基于WebSocket协议实现Broker
寫在前面:
前兩篇文字<<基于MQTT協議談談物聯網開發-華佗寫代碼>>,<<基于MQTT協議實現Broker-華佗寫代碼>>主要敘述了MQTT協議的編解碼以及基于MQTT協議的一些常見應用場景,并以一個簡單的消息推送系統作為例子具體闡述了Mqtt Broker部分的實現,之前主要以原生android或者iOS或者服務端代理作為例子,考慮到在移動端開發時,選擇的技術棧有所不同,有的選擇web前端開發.作為例子,這里以之前的消息推送系統為例基于web前端開發,繼續敘述基于WebSocket協議實現Broker.
?
1.WebSocket協議主要特點:
(1)基于http協議握手建立tcp長連接;
(2)相比http,WebSocket協議交換最小化,降低網絡流量;
(3)雙向通信,服務器可以主動推送數據給客戶端;
?
2.Mqtt Broker具體實現(WebSocket部分):
2.1Mqtt Broker架構草圖:
?
2.2Mqtt Broker實現細節:
(1)新增實現websocket server,監聽不同端口;
(2)每個websocket連接,實例化一個mqttclient負責其協議解析,消息發布和訂閱等;
(3)復用之前Mqtt Broker與RabbitMQ通信部分,具體參考上一篇文字;
(4)其他...
?
2.3Mqtt Broker代碼實現(WebSocket部分):
type tcpKeepAliveListener struct { ????*net.TCPListener }var upgrader = websocket.Upgrader{ ????ReadBufferSize: 1024, ????WriteBufferSize: 1024, } ? //監聽websocket server地址,注冊websocket handler func (mb *MqttBroker) ListenAndServeWeb() { defer mb.wg.Done()http.HandleFunc("/", mb.webHandler)webserver := &http.Server{Addr: mb.webaddr, Handler: nil}var listener net.Listenervar err errorlistener, err = net.Listen("tcp", mb.webaddr)if err != nil {return}U.GetLog().Printf("listen and serve web broker on %s", mb.webaddr)err = webserver.Serve(tcpKeepAliveListener{listener.(*net.TCPListener)})
}
//每一個Websocket連接,實例化一個MqttClient負責其協議解析,以及與rabbitmq的通信 func (mb *MqttBroker) webHandler(w http.ResponseWriter, r *http.Request) { ????upgrader.CheckOrigin = checkSameOrigin ????conn, err := upgrader.Upgrade(w, r, nil) ????if err != nil { ????????U.GetLog().Printf("upgrade error:%v", err) ????????return ????} ????mqttclient, err := NewMqttClient(mb.wg, mb, nil, conn, "web") ????if err != nil { ????????return ????} ????mb.clientMap[mqttclient.GetClientID()] = mqttclient ????mb.wg.Add(1) ????go mqttclient.ServeWeb() }
?
2.4Mqtt Client代碼實現(WebSocket部分):
//定義WebSocket通信消息格式//Action選項有publish,subscribe,unsubscribe
type WebMessage struct {Action stringTopic stringPayload string }type MqttClient struct {wg *sync.WaitGroupbroker *MqttBrokertcpconn net.Connwconn *websocket.Conn...needDisConn bool }//通過WebMessage.Action區分消息指令類型 func (mc *MqttClient) ServeWeb() {defer mc.wg.Done()defer mc.commonDefer()if mc.wconn == nil {return}for {if mc.needDisConn {break}_, message, err := mc.wconn.ReadMessage()if err != nil {U.GetLog().Printf("handle message error:%v", err)mc.needDisConn = truecontinue}wm := WebMessage{}err = json.Unmarshal(message, &wm)if err != nil {U.GetLog().Printf("json.Unmarshal(message, &wm) error:%v", err)continue}switch wm.Action {case "subscribe":err = mc.handleWebSubscibe(wm.Topic)case "publish":err = mc.handleWebPublish(wm.Topic, wm.Payload)case "unsubscribe":err = mc.handleWebUnSubscribe(wm.Topic)case "ping":mc.lastheartbeat = 0default:U.GetLog().Printf("unexpected WebMessage Action:%s", wm.Action)continue}if err != nil {U.GetLog().Printf("handle message error:%v", err)}mc.lastheartbeat = 0} }
?
3.WebSocket Client端實現:
3.1實現細節:
(1)建立與WebSocket Server的連接;
(2)初始化WebSocket,注冊相關回調函數;
(3)實現WebSocket斷線重連機制;
(4)封裝類似mqtt基于topic的發布訂閱等接口;
(5)Nodejs端需要browserify相關js文件,Javascript端可以直接調用WebSocket;
(6)其他...
?
3.2具體代碼實現:
var WebSocket = require('ws'); var WEBSOCKET_MQTT_BROKER = 'ws://your_server_ip/ws/'; var ping = {Action: "ping" };var _listeners = {}; var _websocket = null; var _connected = false;_access = function () {console.log('try mqtt.connect');_connect_websocket();setInterval(function () {_reconnect_websocket();if (_websocket != null && _connected) {_websocket.send(JSON.stringify(ping));}}, 3000); }; //websocket初始化,并實現相關回調函數 _init_websocket = function () {if (_websocket == null) {return;}_websocket.onopen = function () {_connected = true;console.log("Connected to WebSocket server.");for (var topic in _listeners) {var sub = {Action: "subscribe",Topic: topic,Payload: ""};_websocket.send(JSON.stringify(sub));}};_websocket.onclose = function () {_connected = false;_websocket = null;console.log("Disconnected");};_websocket.onmessage = function (evt) {console.log('recv data from server: ' + evt.data);var dataObj = JSON.parse(evt.data);_listeners[dataObj.Topic] && _listeners[dataObj.Topic](dataObj.Payload);};_websocket.onerror = function (evt) {_connected = false;_websocket = null;console.log('Error occured: ' + evt);}; };_connect_websocket = function () {if (_connected) {return;}_websocket = new WebSocket(WEBSOCKET_MQTT_BROKER);_init_websocket(); }; //斷線重連,通過定時器實現每三秒斷線重連 _reconnect_websocket = function () {if (_connected) {return;}_websocket = new WebSocket(WEBSOCKET_MQTT_BROKER);_init_websocket(); }; //模擬mqtt發布消息 sendMessage = function (topic, data) {if (!_websocket || !_connected) {var err = new Error('iot client not ready.');console.warn(err);return;}var send_data = JSON.stringify(data);var pub = {Action: "publish",Topic: topic,Payload: send_data};_websocket.send(JSON.stringify(pub)); }; //模擬mqtt訂閱消息,并根據topic注冊回調函數 onMessage = function (topic, callback) {_listeners[topic] = callback;if (!_websocket || !_connected) {console.warn('onMessage, but iot client not ready.');return;}var sub = {Action: "subscribe",Topic: topic,Payload: ""};_websocket.send(JSON.stringify(sub)); }; //模擬mqtt取消訂閱,并根據topic刪除對應回調函數 stopReceiveMessage = function (topic) {delete _listeners[topic];if (!_websocket || !_connected) {console.warn('stopReceiveMessage, but iot client not ready.');return;}var unsub = {Action: "unsubscribe",Topic: topic,Payload: ""};_websocket.send(JSON.stringify(unsub)); };_access();?
4.WebSocket相關nginx配置:
server { listen 80;server_name your_server_name;...location /ws/ {proxy_redirect off;add_header Access-Control-Allow-Origin *;add_header Access-Control-Allow-Methods 'GET, POST, OPTIONS';add_header Access-Control-Allow-Headers 'DNT,X-Mx-ReqToken,Keep-Alive,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Authorization';proxy_pass http://127.0.0.1:2884/;proxy_http_version 1.1;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection "upgrade";}}
?
出于篇幅考慮,之前兩篇文字敘述過的內容,比如Mqtt Broker其他實現部分以及與RabbitMQ通信部分,都是復用之前的代碼邏輯,這里不再贅述,Mqtt Broker中WebSocket部分相當于使用WebSocket協議做了MQTT協議的翻譯轉換,也有一些成員變量,用到了也不一一具體注釋了,主要通過代碼關鍵路徑敘述實現的一些細節,如有錯誤,懇請指出,轉載也請注明出處!!!
?
未完待續...
轉載于:https://www.cnblogs.com/huatuo/p/9323729.html
總結
以上是生活随笔為你收集整理的基于WebSocket协议实现Broker的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: #ifndef/#define/#end
- 下一篇: jsonp的原理?