mqtt js 中乱码_mqtt之上RRPC同步调用实战
1.背景
MQTT協(xié)議是基于PUB/SUB的異步通信模式,不適用于服務(wù)端同步控制設(shè)備端返回結(jié)果的場(chǎng)景。物聯(lián)網(wǎng)平臺(tái)基于MQTT協(xié)議制定了一套請(qǐng)求和響應(yīng)的同步機(jī)制,無需改動(dòng)MQTT協(xié)議即可實(shí)現(xiàn)同步通信。物聯(lián)網(wǎng)平臺(tái)提供API給服務(wù)端,設(shè)備端只需要按照固定的格式回復(fù)PUB消息,服務(wù)端使用API,即可同步獲取設(shè)備端的響應(yīng)結(jié)果。
2.名詞解釋
RRPC:Revert-RPC。RPC(Remote Procedure Call)采用客戶機(jī)/服務(wù)器模式,用戶不需要了解底層技術(shù)協(xié)議,即可遠(yuǎn)程請(qǐng)求服務(wù)。RRPC則可以實(shí)現(xiàn)由服務(wù)端請(qǐng)求設(shè)備端并能夠使設(shè)備端響應(yīng)的功能。
3.RRPC原理
物聯(lián)網(wǎng)平臺(tái)收到來自用戶服務(wù)器的RRPC調(diào)用,下發(fā)一條RRPC請(qǐng)求消息給設(shè)備。消息體為用戶傳入的數(shù)據(jù),Topic為物聯(lián)網(wǎng)平臺(tái)定義的Topic,其中含有唯一的RRPC消息ID。
設(shè)備收到下行消息后,按照指定Topic格式(包含之前云端下發(fā)的唯一的RRPC消息ID)回復(fù)一條RRPC響應(yīng)消息給云端,云端提取出Topic中的消息ID,和之前的RRPC請(qǐng)求消息匹配上,然后回復(fù)給用戶服務(wù)器。
如果調(diào)用時(shí)設(shè)備不在線,云端會(huì)給用戶服務(wù)器返回設(shè)備離線的錯(cuò)誤;如果設(shè)備沒有在超時(shí)時(shí)間內(nèi)(5秒內(nèi))回復(fù)RRPC響應(yīng)消息,云端會(huì)給用戶服務(wù)器返回超時(shí)錯(cuò)誤。
3.1 設(shè)備端標(biāo)識(shí) ext=1
為了配合RRPC調(diào)用,設(shè)備端必須在進(jìn)行MQTT CONNECT協(xié)議設(shè)置時(shí),在clientId中增加ext=1參數(shù):
3.2?RRPC通信Topic
RRPC調(diào)用的Topic格式如下:
其中${messageId}是IoT物聯(lián)網(wǎng)平臺(tái)生成的唯一的RRPC消息id,黑體部分是IoT物聯(lián)網(wǎng)平臺(tái)約定,紅色部分可以根據(jù)業(yè)務(wù)場(chǎng)景自定義。
3.3 云端POP API調(diào)用
4.開發(fā)實(shí)戰(zhàn)
4.1 設(shè)備端代碼(Nodejs)
/**"dependencies":?{?"mqtt":?"2.18.8"?}
*/
const?crypto?=?require('crypto');
const?mqtt?=?require('mqtt');
//設(shè)備身份三元組+區(qū)域?
const?deviceConfig?=?{
????productKey:?"替換productKey",
????deviceName:?"替換deviceName",
????deviceSecret:?"替換deviceSecret",
????regionId:?"cn-shanghai"
};
const?url?=?`tcp://${deviceConfig.productKey}.iot-as-mqtt.${deviceConfig.regionId}.aliyuncs.com:1883`;
//2.建立連接
const?client?=?mqtt.connect(url,?makeMqttOptions(deviceConfig));
//3.監(jiān)聽RRPC指令
client.subscribe(`/ext/rrpc/+/this/is/my/topic`)
client.on('message',?function(topic,?message)?{
????console.log("topic="?+?topic)
????console.log("payloadJson="?+?message)
????//4.響應(yīng)RRPC指令
????if?(topic.indexOf(`/ext/rrpc/`)?>?-1)?{
????????client.publish(topic,?JSON.stringify({?code:?200,?msg:?"rrpc?ok"?}));
????}
})
/*
??生成MQTT連接參數(shù)
*/
function?makeMqttOptions()?{
????const?params?=?{
????????productKey:?deviceConfig.productKey,
????????deviceName:?deviceConfig.deviceName,
????????timestamp:?Date.now(),
????????clientId:?Math.random().toString(36).substr(2),
????}
????//CONNECT參數(shù)
????const?options?=?{
????????keepalive:?60,?//60s
????????clean:?false,?//cleanSession保持持久會(huì)話
????????protocolVersion:?4?//MQTT?v3.1.1
????}
????//1.生成clientId,username,password
????options.password?=?signHmacSha1(params,?deviceConfig.deviceSecret);
????options.clientId?=?`${params.clientId}|securemode=3,signmethod=hmacsha1,timestamp=${params.timestamp},ext=1|`;
????options.username?=?`${params.deviceName}&${params.productKey}`;
????return?options;
}
/*
??生成基于HmacSha1的password
??參考文檔:https://help.aliyun.com/document_detail/73742.html?#h2-url-1
*/
function?signHmacSha1(params,?deviceSecret)?{
????let?keys?=?Object.keys(params).sort();
????//?按字典序排序
????keys?=?keys.sort();
????const?list?=?[];
????keys.map((key)?=>?{
????????list.push(`${key}${params[key]}`);
????});
????const?contentStr?=?list.join('');
????return?crypto.createHmac('sha1',?deviceSecret).update(contentStr).digest('hex');
}
4.2 服務(wù)端POP API調(diào)用代碼(Nodejs)
const?co?=?require('co');const?RPCClient?=?require('@alicloud/pop-core').RPCClient;
const?options?=?{
????accessKey:"替換accessKey",
????accessKeySecret:?"替換accessKeySecret"
};
//1.初始化client
const?client?=?new?RPCClient({
????accessKeyId:?options.accessKey,
????secretAccessKey:?options.accessKeySecret,
????endpoint:?'https://iot.cn-shanghai.aliyuncs.com',
????apiVersion:?'2018-01-20'
});
//2.構(gòu)造RRPC參數(shù)
const?params?=?{
????ProductKey:?"你的產(chǎn)品ProductKey",
????DeviceName:?"你的設(shè)備DeviceName",
????RequestBase64Byte:?Buffer.from("Hello?World").toString('base64'),
????Timeout:?5000,
????Topic:'/this/is/my/topic'
};
co(function*()?{
????try?{
????????//3.發(fā)起API調(diào)用
????????const?response?=?yield?client.request('RRpc',?params);
????????console.log(JSON.stringify(response));
????}?catch?(err)?{
????????console.log(err);
????}
});
4.3 實(shí)際運(yùn)行效果
#?設(shè)備端日志$?node?rrpc_client.js?
topic=/ext/rrpc/1128893568908287488/this/is/my/topic
payloadJson=Hello?World
#?服務(wù)端日志
$?node?rrpc_Server.js?
{
????"MessageId":?"1128893568908287488",
????"RequestId":?"F9540921-8FDB-4671-B50A-84D119DA56D4",
????"PayloadBase64Byte":?"eyJjb2RlIjoyMDAsIm1zZyI6InJycGMgb2sifQ==",
????"Success":?true,
????"RrpcCode":?"SUCCESS"
}
其中?eyJjb2RlIjoyMDAsIm1zZyI6InJycGMgb2sifQ==?
解碼后即:{?code:?200,?msg:?"rrpc?ok"?}
4.4 IoT物聯(lián)網(wǎng)平臺(tái) 日志服務(wù)
總結(jié)
以上是生活随笔為你收集整理的mqtt js 中乱码_mqtt之上RRPC同步调用实战的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 35平方铝线可以代替18平方的铝线吗
- 下一篇: 两线怎么接三线插座图_一文搞懂电工配电二