autobahn-python的使用——sendMessage()和断线自动重连
生活随笔
收集整理的這篇文章主要介紹了
autobahn-python的使用——sendMessage()和断线自动重连
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
客戶端代碼:
# -*-coding:utf8-*-
import asyncio
import threading
import time

from autobahn.asyncio.websocket import WebSocketClientProtocol
from autobahn.asyncio.websocket import WebSocketClientFactory


class BaseSipClientProtocol(WebSocketClientProtocol):
    """WebSocket client protocol with a ping-based keepalive watchdog.

    The watchdog drops the connection when no ping has been seen for more
    than ``2 * KEEPALIVE_INTERVAL`` seconds.  When the transport is lost the
    event loop is stopped, which lets the reconnect loop in the script entry
    point below regain control and dial again.
    """

    # Seconds between two keepalive checks.
    KEEPALIVE_INTERVAL = 5

    # Handle returned by ``loop.call_later``; stays ``None`` until ``onOpen``
    # has scheduled the first check.
    keepalive_fut = None

    def check_keepalive(self):
        """Drop the connection if pings stopped, else schedule the next check."""
        last_interval = time.time() - self.last_ping_time
        if last_interval > 2 * self.KEEPALIVE_INTERVAL:
            # No ping for two intervals: drop the connection.
            self.dropConnection(abort=True)
        else:
            # Still alive: reschedule the next check.
            self.schedule_keepalive()

    def schedule_keepalive(self):
        """Schedule the next check and keep the handle so it can be cancelled."""
        loop = asyncio.get_event_loop()
        self.keepalive_fut = loop.call_later(
            self.KEEPALIVE_INTERVAL, self.check_keepalive)

    def onConnect(self, response):
        """Register this connection on the factory and notify the subclass hook."""
        print("Server connected: {0}".format(response.peer))
        # Save the connection-to-server handle on the factory.
        self.factory.saveConnectionToServer(self)
        self.onConnectToServer(response.peer)

    def onOpen(self):
        """Start scheduling the keepalive check."""
        self.last_ping_time = time.time()
        self.schedule_keepalive()

    def onPing(self, payload):
        """Record the ping time and answer with a pong."""
        self.last_ping_time = time.time()
        self.sendPong(payload)
        print('Ping == ', payload)

    def connection_lost(self, exc):
        """Cancel the pending keepalive check, run base clean-up, stop the loop.

        The transport can be lost before ``onOpen`` ever ran, in which case
        no keepalive handle exists yet, so the cancel is guarded.  The base
        implementation is still invoked so that the library can do its own
        close bookkeeping; the loop is stopped afterwards in all cases so the
        reconnect loop can take over.
        """
        pending = getattr(self, 'keepalive_fut', None)
        if pending is not None:
            pending.cancel()
        try:
            super().connection_lost(exc)
        finally:
            asyncio.get_event_loop().stop()

    def onMessage(self, payload, isBinary):
        """Forward an incoming message, together with the peer, to the hook."""
        self.onMsgReceived(self.peer, payload, isBinary)

    def onClose(self, wasClean, code, reason):
        """Unregister the connection from the factory and notify the hook."""
        print("WebSocket connection closed: {0}".format(reason))
        # Remove the connection-to-server handle.
        self.factory.delConnectionToServer()
        self.onDisConnectFromServer(wasClean, code, reason)

    def onConnectToServer(self, peer):
        """Hook called when the client has connected to the server."""
        pass

    def onDisConnectFromServer(self, wasClean, code, reason):
        """Hook called when the client has disconnected from the server."""
        pass

    def onMsgReceived(self, peer, data, isBinary):
        """Hook called for every received message; prints it by default."""
        print('received {0} from {1}'.format(data, peer))


class BaseSipClientFactory(WebSocketClientFactory):
    """Client factory that keeps a handle to the current server connection."""

    _connectionToServer = None

    def saveConnectionToServer(self, connectedHandle):
        """Remember *connectedHandle* as the connection to the server."""
        self._connectionToServer = connectedHandle

    def delConnectionToServer(self):
        """Forget the connection to the server."""
        self._connectionToServer = None

    def getConnectionToServer(self):
        """Return the saved connection handle, or ``None`` when disconnected."""
        return self._connectionToServer

    def sendMsg(self, data):
        """Send *data* to the server.

        ``bytes`` are sent as a binary message; anything else is expected to
        be a ``str`` and is sent UTF-8 encoded as a text message.

        Raises:
            Exception: if there is currently no connection to the server.
        """
        connection = self._connectionToServer
        if connection is None:
            # Message text: "the connection to the server does not exist".
            raise Exception('與server的連接不存在')
        if isinstance(data, bytes):
            connection.sendMessage(data, True)
        else:
            connection.sendMessage(data.encode('utf-8'))


if __name__ == '__main__':
    factory = BaseSipClientFactory(u"ws://192.168.88.3:9009")
    factory.protocol = BaseSipClientProtocol
    loop = asyncio.get_event_loop()
    try:
        while True:
            fut = loop.create_connection(factory, '192.168.88.3', 9009)
            try:
                transport, protocol = loop.run_until_complete(
                    asyncio.wait_for(fut, 5))
                # Returns once connection_lost() has stopped the loop.
                loop.run_forever()
            except asyncio.TimeoutError:
                print('TimeoutError')
                continue
            except OSError as err:
                print('OSError == ' + str(err))
            # A little pause before trying again.
            loop.run_until_complete(asyncio.sleep(5))
    except KeyboardInterrupt:
        pass
    finally:
        # Reached on Ctrl-C or on an unexpected error; the retry loop itself
        # never ends on its own.
        loop.close()

# Article heading that follows this snippet: "服務端代碼:" (server-side code)
# -*-coding:utf8-*-
from autobahn.asyncio.websocket import WebSocketServerFactory
from autobahn.asyncio.websocket import WebSocketServerProtocol


class BaseSipServerProtocol(WebSocketServerProtocol):
    """Server-side protocol that registers each client on its factory."""

    def onConnect(self, request):
        """Register the connecting client on the factory and notify the hook."""
        print("Client connecting: {0}".format(request.peer))
        # Save the connection in the factory's peer -> connection mapping.
        self.factory.addConnection(request.peer, self)
        self.onClientConnected(request.peer)

    def onOpen(self):
        """Log that the WebSocket connection is open."""
        print("WebSocket connection open.")

    def onMessage(self, payload, isBinary):
        """Forward an incoming message, together with the peer, to the hook."""
        self.onMsgReceived(self.peer, payload, isBinary)

    def onClose(self, wasClean, code, reason):
        """Unregister this connection from the factory and notify the hook."""
        print("WebSocket connection closed: {0}".format(reason))
        # Remove the connection from the factory's mapping.
        self.factory.removeConnection(self)
        self.onClientLostConnected(wasClean, code, reason)

    def onClientConnected(self, peer):
        """Hook called when a client has connected."""
        pass

    def onClientLostConnected(self, wasClean, code, reason):
        """Hook called when a client has disconnected."""
        pass

    def onMsgReceived(self, peer, data, isBinary):
        """Hook called for every received message; prints it by default."""
        print('received {0} from {1}'.format(data, peer))


class BaseSipServerFactory(WebSocketServerFactory):
    """Server factory that tracks the live connections keyed by peer."""

    def __init__(self, *args, **kwargs):
        """Create the factory with its own, initially empty, connection map.

        All arguments are passed through unchanged to the base factory.
        """
        # Per-instance mapping: a class-level dict would be shared by every
        # factory created in the process.
        self._connectionSets = {}
        super().__init__(*args, **kwargs)

    def addConnection(self, peer, connectedHandle):
        """Register *connectedHandle* under *peer*.

        A newer connection replaces an older one stored under the same peer,
        so a stale handle can never shadow the live connection.
        """
        self._connectionSets[peer] = connectedHandle

    def removeConnection(self, connectedHandle):
        """Remove the first entry whose value equals *connectedHandle*.

        Does nothing when the handle is not registered.
        """
        for peer, handle in self._connectionSets.items():
            if handle == connectedHandle:
                break
        else:
            return
        # Deletion happens after the loop so the dict is not mutated while
        # it is being iterated.
        del self._connectionSets[peer]

    def getConnectionByPeer(self, peer):
        """Return the connection registered for *peer*, or ``None``."""
        return self._connectionSets.get(peer)

    def getConnections(self):
        """Return the peer -> connection mapping (the live dict, not a copy)."""
        return self._connectionSets

    def sendMsg(self, peer, data):
        """Send *data* to the client registered under *peer*.

        ``bytes`` are sent as a binary message; anything else is expected to
        be a ``str`` and is sent UTF-8 encoded as a text message.

        Raises:
            Exception: if no connection is registered for *peer*.
        """
        connection = self.getConnectionByPeer(peer)
        if connection is None:
            # Message text: "the connection for this peer does not exist".
            raise Exception('peer的連接不存在')
        if isinstance(data, bytes):
            connection.sendMessage(data, True)
        else:
            connection.sendMessage(data.encode('utf-8'))

# Article heading that follows this snippet: "啟動Server:" (starting the server)
class CommunicationTool:
    """Small launcher that configures and starts a WebSocket server.

    Usage order: ``setFlag`` -> ``setServerProtocol`` / ``setServerFactory``
    -> ``createServer`` -> ``startListen``.
    """

    # Role flags accepted by setFlag().
    Server = 1
    Client = 0

    def __init__(self):
        """Start with nothing configured and auto-reconnect switched off."""
        self.isAutoReconnect = False
        # Every setting starts as None so that the "not set" checks below
        # raise their own explicit errors instead of AttributeError.
        self.flag = None
        self.addr = None
        self.port = None
        self.serverProtocol = None
        self.serverFactory = None

    def setFlag(self, flag):
        """Choose whether this tool acts as ``Server`` or ``Client``."""
        self.flag = flag

    def createServer(self, addr, port):
        """Store the listen address and port.

        Raises:
            Exception: if the flag is unset or is ``Client``, or if *addr* is
                not a ``str`` or *port* is not an ``int``.
        """
        # Check the configured role first.
        if not self._isServer():
            # Message text: "a client-type tool cannot run this method".
            raise Exception('不支持client類型執行此方法')
        if not (isinstance(addr, str) and isinstance(port, int)):
            # Message text: "wrong argument types for createServer".
            raise Exception('createServer的參數類型有誤')
        self.addr = addr
        self.port = port

    def startListen(self):
        """Bind the server and run the event loop in a background thread.

        Raises:
            Exception: if the flag, protocol, factory or address/port has not
                been configured, or if the flag is ``Client``.
        """
        # Check that every required setting has been provided.
        if not self._isServer():
            # Message text: "flag not set, or this method is not supported".
            raise Exception('未設置flag或不支持此方法')
        if self.serverProtocol is None:
            raise Exception('未設置protocol')
        if self.serverFactory is None:
            raise Exception('未設置factory')
        if self.addr is None or self.port is None:
            raise Exception('未設置createServer')

        factory = self.serverFactory
        factory.protocol = self.serverProtocol
        if self.isAutoReconnect:
            # Enable automatic pings on the factory.
            factory.setProtocolOptions(autoPingInterval=5, autoPingTimeout=2)

        loop = asyncio.get_event_loop()
        coro = loop.create_server(factory, self.addr, self.port)
        server = loop.run_until_complete(coro)
        # The server is meant to keep running, so the loop is driven from a
        # separate thread.
        # NOTE(review): the loop is obtained in the calling thread but run in
        # "serverThread"; confirm no other thread touches it afterwards.
        serverThread = threading.Thread(
            target=self._run, args=(loop, server), name='serverThread')
        serverThread.start()

    def _run(self, loop, server):
        """Run *loop* until it stops, then close *server* and *loop*."""
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            pass
        finally:
            server.close()
            loop.close()

    def setAutoReconnect(self, isAutoReconnect):
        """Switch the automatic ping option on or off.

        Raises:
            Exception: if *isAutoReconnect* is not a ``bool``.
        """
        if not isinstance(isAutoReconnect, bool):
            # Message text: "wrong argument type".
            raise Exception('參數類型錯誤')
        self.isAutoReconnect = isAutoReconnect

    def setServerProtocol(self, protocol):
        """Set the protocol class handed to the factory in ``startListen``.

        Raises:
            Exception: if the flag is unset or is ``Client``.
        """
        if not self._isServer():
            # Message text: "a client-type tool cannot call this method".
            raise Exception('Client類型不能調用此方法')
        self.serverProtocol = protocol

    def setServerFactory(self, factory):
        """Set the factory instance used by ``startListen``.

        Raises:
            Exception: if the flag is unset or is ``Client``.
        """
        if not self._isServer():
            # Message text: "a client-type tool cannot call this method".
            raise Exception('Client類型不能調用此方法')
        self.serverFactory = factory

    def _isServer(self):
        """Return ``True`` for the Server flag and ``False`` for Client.

        Raises:
            Exception: if the flag is neither ``Server`` nor ``Client``.
        """
        if self.flag == self.Server:
            return True
        if self.flag == self.Client:
            return False
        # Message text: "flag not set".
        raise Exception('未設置flag')


if __name__ == '__main__':
    cTool = CommunicationTool()
    cTool.setFlag(CommunicationTool.Server)  # choose the role
    cTool.setServerProtocol(BaseSipServerProtocol)
    # The factory is given the WebSocket address.
    factory = BaseSipServerFactory(u"ws://127.0.0.1:9009")
    cTool.setServerFactory(factory)
    factory.setProtocolOptions(autoPingInterval=5)
    cTool.setAutoReconnect(True)  # enable the automatic ping option
    cTool.createServer('0.0.0.0', 9009)  # configure the listen address
    cTool.startListen()  # start listening

# Article heading that follows this snippet: "總結" (Summary)
以上是生活随笔為你收集整理的autobahn-python的使用——sendMessage()和断线自动重连的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 我们的游戏世界(背包【仓库】,交易,任务
- 下一篇: Windows系统切换工具 算法分析+注