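In the game industry, game clients and servers need to exchange large volumes of messages quickly, and this is where WebSocket comes in.
Autobahn is a high-performance WebSocket library that offers two implementations.
1 Twisted-based
2 asyncio-based
Development language: Python
PS: I found Twisted's reactor.callLater callback unreliable: after running for a while the server would appear to hang. This may simply be because I have not dug into Twisted deeply enough. I therefore replaced Twisted with Tornado for my WebSocket implementation, which I will cover in detail in the next post. Tornado works very well for WebSocket: it is stable, efficient, concise, and simpler to use. See my blog post at https://editor.csdn.net/md/?articleId=106819559.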
Autobahn has the following features:
framework for WebSocket and WAMP clients
compatible with Python 2.7 and 3.3+
runs on CPython, PyPy and Jython
runs under Twisted and asyncio (you choose which of the two to run on)
implements WebSocket RFC6455 (and draft versions Hybi-10+)
implements WebSocket compression
implements WAMP, the Web Application Messaging Protocol
supports TLS (secure WebSocket) and proxies
open source (MIT license)
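To make "implements WebSocket RFC6455" a little more concrete: during the opening handshake the server has to derive a Sec-WebSocket-Accept header from the client's Sec-WebSocket-Key. Autobahn does this for you internally. The following is only a minimal standard-library sketch of that single step, and the function name compute_accept_key is made up for this illustration.

```python
import base64
import hashlib

# Fixed GUID defined by RFC 6455 for the opening handshake
WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"


def compute_accept_key(client_key):
    """Derive the Sec-WebSocket-Accept value from a Sec-WebSocket-Key value."""
    digest = hashlib.sha1((client_key + WS_GUID).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")


# Sample key taken from the handshake example in RFC 6455
print(compute_accept_key("dGhlIHNhbXBsZSBub25jZQ=="))
# prints s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```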
What Autobahn can be used for
Autobahn is well suited to building trading systems, multiplayer games, real-time chat and similar applications.
A simple WebSocket server program
    from autobahn.twisted.websocket import WebSocketServerProtocol


    class MyServerProtocol(WebSocketServerProtocol):

        def onConnect(self, request):
            print("Client connecting: {}".format(request.peer))

        def onOpen(self):
            print("WebSocket connection open.")

        def onMessage(self, payload, isBinary):
            if isBinary:
                print("Binary message received: {} bytes".format(len(payload)))
            else:
                print("Text message received: {}".format(payload.decode('utf8')))
            # echo the message back to the sender unchanged
            self.sendMessage(payload, isBinary)

        def onClose(self, wasClean, code, reason):
            print("WebSocket connection closed: {}".format(reason))
Several important methods:
onConnect: called when a new client connects
onOpen: called once the connection has been established successfully
onMessage: called when a message arrives from the client
onClose: called when the connection is closed
Installing Autobahn
Autobahn runs on top of Twisted or asyncio, so before installing it, confirm that your Python supports the one you plan to use.
Support for Twisted and asyncio across Python versions is as follows
Install with pip
pip install autobahn[twisted]
pip install autobahn[asyncio]
Verify that the installation succeeded
from autobahn import version
print(version)
0.9.1
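If the import above fails, it can help to see which of the relevant packages Python can actually find. The following is a small sketch for Python 3 using only the standard library; the helper name installed_backends is an assumption introduced for this example and is not part of Autobahn.

```python
import importlib.util


def installed_backends():
    """Report which of the relevant top-level packages can be imported."""
    names = ("autobahn", "twisted", "asyncio")
    # find_spec returns None when a top-level package is not installed
    return {name: importlib.util.find_spec(name) is not None for name in names}


print(installed_backends())
```

A False next to twisted means that pip install autobahn[twisted] has not been run in the current environment.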
Starting the WebSocket server
Twisted version:
    import sys

    from twisted.python import log
    from twisted.internet import reactor

    log.startLogging(sys.stdout)

    from autobahn.twisted.websocket import WebSocketServerFactory

    factory = WebSocketServerFactory()
    factory.protocol = MyServerProtocol

    reactor.listenTCP(9000, factory)
    reactor.run()
asyncio version:
    try:
        import asyncio
    except ImportError:
        # Python 2: Trollius is a backport of asyncio
        import trollius as asyncio

    from autobahn.asyncio.websocket import WebSocketServerFactory

    # MyServerProtocol must derive from
    # autobahn.asyncio.websocket.WebSocketServerProtocol in this variant
    factory = WebSocketServerFactory()
    factory.protocol = MyServerProtocol

    loop = asyncio.get_event_loop()
    coro = loop.create_server(factory, '127.0.0.1', 9000)
    server = loop.run_until_complete(coro)

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        server.close()
        loop.close()
Creating a WebSocket client
    # or: from autobahn.asyncio.websocket import WebSocketClientProtocol
    from autobahn.twisted.websocket import WebSocketClientProtocol


    class MyClientProtocol(WebSocketClientProtocol):

        def onOpen(self):
            self.sendMessage(u"Hello, world!".encode('utf8'))

        def onMessage(self, payload, isBinary):
            if isBinary:
                print("Binary message received: {0} bytes".format(len(payload)))
            else:
                print("Text message received: {0}".format(payload.decode('utf8')))
WebSocketClientProtocol can be taken from either of:
autobahn.twisted.websocket.WebSocketClientProtocol
autobahn.asyncio.websocket.WebSocketClientProtocol
Using the client
Twisted version
    import sys

    from twisted.python import log
    from twisted.internet import reactor

    log.startLogging(sys.stdout)

    from autobahn.twisted.websocket import WebSocketClientFactory

    factory = WebSocketClientFactory()
    factory.protocol = MyClientProtocol

    reactor.connectTCP("127.0.0.1", 9000, factory)
    reactor.run()
asyncio version
    try:
        import asyncio
    except ImportError:
        # Python 2: Trollius is a backport of asyncio
        import trollius as asyncio

    from autobahn.asyncio.websocket import WebSocketClientFactory

    factory = WebSocketClientFactory()
    factory.protocol = MyClientProtocol

    loop = asyncio.get_event_loop()
    coro = loop.create_connection(factory, '127.0.0.1', 9000)
    loop.run_until_complete(coro)
    loop.run_forever()
    loop.close()
Sending messages
Once our server or client class subclasses the corresponding Autobahn protocol class, it can call the sendMessage method to send messages.
Receiving messages
Once our server or client defines onMessage(self, payload, isBinary), it can receive messages.
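Since sendMessage expects bytes and onMessage delivers bytes, structured data usually has to be serialized on the way out and parsed on the way in. The following is a minimal sketch assuming JSON as the wire format. The helper names encode_message and decode_message are hypothetical and not part of Autobahn.

```python
import json


def encode_message(obj):
    """Serialize a JSON-compatible object to UTF-8 bytes for sendMessage."""
    return json.dumps(obj, ensure_ascii=False).encode("utf8")


def decode_message(payload, is_binary):
    """Parse a text payload as JSON; return None for binary or malformed input."""
    if is_binary:
        return None
    try:
        return json.loads(payload.decode("utf8"))
    except ValueError:
        # covers both invalid UTF-8 and invalid JSON
        return None


payload = encode_message({"unit_id": 7, "handle": "real"})
print(decode_message(payload, False))
```

Inside a protocol class this would be used as self.sendMessage(encode_message(obj)) and, in onMessage, obj = decode_message(payload, isBinary). Parsing with json.loads rather than eval means a client cannot make the server execute arbitrary code.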
For more details, see: https://autobahn.readthedocs.io/en/latest/index.html
Twisted demo: a test script I wrote myself
The data-fetching method below, and the method that assembles the request data and attaches it to the client groups, need to be rewritten for your own use. The overall flow is as shown, and I have tested that it works.
PS: I found Twisted's reactor.callLater callback unreliable: after running for a while the server would appear to hang. I therefore replaced Twisted with Tornado for my WebSocket implementation, which I will cover in the next post; Tornado works very well for WebSocket.
    """
    -------------------------------------------------
    File Name   : views
    Description :
    Author      : cjh
    date        : 2020-02-20
    -------------------------------------------------
    """
    import ast
    import json
    import sys
    import time

    from utils.dbapi import DBPool  # the author's own DB helper, not shown in this post

    from twisted.python import log
    from twisted.internet import reactor
    from autobahn.twisted.websocket import WebSocketServerFactory, \
        WebSocketServerProtocol, listenWS

    log.startLogging(sys.stdout)


    class BroadcastServerProtocol(WebSocketServerProtocol):
        """WebSocket server protocol"""

        def onOpen(self):
            """Connection opened: register this client with the factory."""
            self.factory.register(self)
            print("WebSocket connection open.")

        def onMessage(self, payload, isBinary):
            """
            Receive a message.
            :param payload: raw message bytes
            :param isBinary: Boolean, whether payload is binary data
            """
            if isBinary:
                print('WebSocket Data Type Error')
            else:
                # The original used eval here; literal_eval accepts only Python
                # literals, so a client cannot make the server run arbitrary code.
                obj = ast.literal_eval(payload.decode('utf-8'))
                unit_id = obj.get('unit_id')
                handle = obj.get('handle')
                db_code = obj.get('db_code')
                config = ast.literal_eval(obj.get('config'))
                self.factory.client_to_group(self, db_code, handle, unit_id, config)

        def onConnect(self, request):
            """
            A client is connecting.
            :param request: the connection request
            """
            print("Client connecting: {}".format(request.peer))

        def onClose(self, wasClean, code, reason):
            """Connection closed: unregister this client."""
            print("WebSocket connection closed: {}".format(reason))
            self.factory.un_register(self)

        def send_msg(self, payload, is_binary):
            """
            Wrapper around sendMessage.
            :param payload: must already be encoded, e.g. .encode('utf-8')
            :param is_binary: whether payload is binary data
            """
            self.sendMessage(payload=payload, isBinary=is_binary)


    class BroadcastServerFactory(WebSocketServerFactory, DBPool):
        """Factory that creates the protocol instances"""

        def __init__(self, url):
            WebSocketServerFactory.__init__(self, url)
            self.data = []
            self.send_list = []
            self.client_list = []
            self.client_group = {}
            self.polling()

        def polling(self):
            """Callback: process all client groups, then run again in 5 seconds."""
            try:
                self.item_client_group()
            finally:
                # Reschedule even if this round raised; otherwise a single
                # failing query would silently stop all further polling.
                reactor.callLater(5, self.polling)

        def register(self, client):
            """Register a client by adding it to the client list."""
            if client not in self.client_list:
                print("registered client {}".format(client.peer))
                self.client_list.append(client)
                print(self.client_list)
                print(self.client_group)

        def un_register(self, client):
            """Remove a client from the client list."""
            if client in self.client_list:
                print("unregistered client {}".format(client.peer))
                self.client_list.remove(client)
                print(self.client_list)
                print(self.client_group)

        def send_msg(self):
            """Send the current data to each client in send_list (runs on every poll)."""
            payload = json.dumps(self.data).encode('utf8')
            for client in self.send_list:
                client.sendMessage(payload)
                print("broadcasting message to {}".format(client.peer))

        def client_to_group(self, client, db_code, handle, unit_id, config):
            """
            Attach a client and its request parameters to
            client_group[db_code][handle][unit_id].
            """
            units = self.client_group.setdefault(db_code, {}).setdefault(handle, {})
            if unit_id in units:
                units[unit_id]['client'].append(client)
            else:
                units[unit_id] = {'client': [client], 'config': config}

        def item_client_group(self):
            """Iterate over all clients and their request parameters."""
            for db_code, v_1 in self.client_group.items():
                for handle, v_2 in v_1.items():
                    for unit_id, v_3 in v_2.items():
                        self.get_close_client(v_3['client'])
                        v_3['client'] = self.send_list
                        self.get_data(handle, unit_id, v_3['config'])
                        self.send_msg()

        def get_close_client(self, client_send):
            """Drop clients that have already closed: keep only those still registered."""
            self.send_list = list(set(self.client_list) & set(client_send))

        def get_data(self, handle, unit_id, config):
            """
            Fetch the data for one unit (blocking database queries).
            :param handle: request type, only 'real' is handled
            :param unit_id: unit to query
            :param config: database connection settings sent by the client
            """
            print('start')
            print(time.time())
            self.data = []
            if handle == 'real' and unit_id and config:
                # int() replaces the original eval() on the port string
                config = dict(host=config.get('HOST'), port=int(config.get('PORT')),
                              db=config.get('NAME'), user=config.get('USER'),
                              passwd=config.get('PASSWORD'))
                DBPool.__init__(self, config)
                sql_1 = "select `code`,recode,`name`,`rename`,show_id,gateway_address address,gateway_id,lot_type " \
                        "address_type from archive_device where status='a' and unit_id={}".format(unit_id)
                sql_data = "select * from device_data_real where equip_address='{}' limit 1"
                self.data = self.query(sql_1)
                for i in self.data:
                    data = self.query(sql_data.format(i.get('address')))
                    if data:
                        value = data[0].get(i.get('code'))
                        i['value'] = value
                    else:
                        i['value'] = ''
            print(time.time())
            print('_________________________________________')


    def run():
        factory = BroadcastServerFactory('ws://0.0.0.0:4053')
        factory.protocol = BroadcastServerProtocol
        listenWS(factory)
        reactor.run()


    if __name__ == '__main__':
        run()
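The demo polls by having a method reschedule itself every 5 seconds and runs its database queries directly in the event-loop thread. Two things can make such a loop look hung. This is an assumption about periodic polling in general, not something the post confirms as the cause of the hangs it describes. First, an exception raised before the next call is scheduled ends the polling. Second, a slow blocking query stalls every connection. The following sketch shows one way to guard against both, using only the standard-library asyncio, which is Autobahn's other backend. It is not the Tornado solution mentioned above, and fetch_rows, poll_forever and demo are hypothetical names introduced for this example.

```python
import asyncio
import time


def fetch_rows(unit_id):
    """Hypothetical stand-in for a blocking database query."""
    time.sleep(0.01)
    if unit_id == "bad":
        raise RuntimeError("query failed")
    return [{"unit_id": unit_id, "value": 1}]


async def poll_forever(unit_ids, interval, on_data, rounds=None):
    """Poll every unit, then sleep for `interval` seconds, repeatedly.

    Blocking work runs in the default thread pool so the event loop stays
    responsive, and a failing query is reported instead of ending the loop.
    `rounds` limits the number of iterations (None means run until cancelled).
    """
    loop = asyncio.get_running_loop()
    done = 0
    while rounds is None or done < rounds:
        for unit_id in unit_ids:
            try:
                rows = await loop.run_in_executor(None, fetch_rows, unit_id)
            except Exception as exc:
                print("poll failed for {}: {}".format(unit_id, exc))
                continue
            # on_data runs in the event-loop thread
            on_data(unit_id, rows)
        done += 1
        await asyncio.sleep(interval)


def demo():
    """Run two polling rounds and return the unit ids that produced data."""
    received = []
    asyncio.run(poll_forever(["a", "bad", "b"], 0.01,
                             lambda unit_id, rows: received.append(unit_id),
                             rounds=2))
    return received


print(demo())
```

In a real server, on_data would serialize the rows and send them to the clients subscribed to that unit. On the Twisted side, comparable building blocks are twisted.internet.task.LoopingCall for the periodic call and twisted.internet.threads.deferToThread for the blocking query.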