Python zmq 讲解
安裝 pip install pyzmq-18.0.1
?
1.?請求應答模式(Request-Reply)(rep 和 req)
??消息雙向的,有來有往,req端請求的消息,rep端必須答復給req端
2.?訂閱發布模式?(pub 和?sub)
??消息單向的,有去無回的。可按照發布端可發布制定主題的消息,訂閱端可訂閱喜歡的主題,訂閱端只會收到自己已經訂閱的主題。發布端發布一條消息,可被多個訂閱端同事收到。
3.?push?pull模式
消息單向的,也是有去無回的。push的任何一個消息,始終只會有一個pull端收到消息.
后續的代理模式和路由模式等都是在三種基本模式上面的擴展或變異。
?
1.請求回應模型。由請求端發起請求,并等待回應端回應請求。從請求端來看,一定是一對對收發配對的;
反之,在回應端一定是發收對。請求端和回應端都可以是1:N的模型。通常把1認為是server,N認為是Client。
0MQ可以很好的支持路由功能(實現路由功能的組件叫做Device),把1:N擴展為N:M(只需要加入若干路由節點)。
從這個模型看,更底層的端點地址是對上層隱藏的。每個請求都隱含回應地址,而應用則不關心它。
2.發布訂閱模型。這個模型里,發布端是單向只發送數據的,且不關心是否把全部的信息都發送給訂閱者。
如果發布端開始發布信息的時候,訂閱端尚未連接上,這些信息直接丟棄。
不過一旦訂閱端連接上來,中間會保證沒有信息丟失。
同樣,訂閱端則只負責接收,而不能反饋。
如果發布端和訂閱端需要交互(比如要確認訂閱者是否已經連接上),則使用額外的socket采用請求回應模型滿足這個需求。
3.管道模型。這個模型里,管道是單向的,從PUSH端單向的向PULL端單向的推送數據流。
?
server為REP模式,等待消息,client為REQ模式,向server請求消息。
一個最簡單的例子:
server.py
import zmq context = zmq.Context() socket = context.socket(zmq.REP) socket.bind("tcp://*:5555") message = socket.recv() print "message from client:", message # Send reply back to client socket.send("World")client.py
import zmq context = zmq.Context() print "Connecting to server..." socket = context.socket(zmq.REQ) socket.connect ("tcp://localhost:5555") socket.send ("Hello") message = socket.recv() print "Received reply: ", message一個線程中如果有多個sokect,同時需要收發數據時,zmq提供polling sockets實現,不用在send()或者recv()時阻塞socket。
下面是一個在recv()端接受信息的poller()輪詢接受代碼。其中socks = dict(poller.poll(1000))中的1000位延遲時間。
?
結果:
Running server on port: 5556 Running server on port: 5558 8 server#5230 Connected to server with port 5556 Connected to publisher with port 5558 Recieved control command: Continue 8 server#5230 Recieved control command: Continue 9 server#5230 Processing ... 9 server#5230 Recieved control command: Continue 9 server#5230 Processing ... 9 server#5230 Recieved control command: Continue 9 server#5230 Processing ... 9 server#5230 Recieved control command: Continue 9 server#5230 Processing ... 9 server#5230 Recieved control command: Continue 9 server#5230 Processing ... 9 server#5230 Recieved control command: Exit Recieved exit command, client will stop recieving messages 8 server#5230 9 server#5230 9 server#5230Process finished with exit code 0from zmq.auth.certs import create_certificates?
創建證書
public_key, private_key = create_certificates(tmp_key_dir, id)
?
from zmq.auth.ioloop import IOLoopAuthenticator
IO循環驗證程序
?
from zmq.auth.certs import load_certificate
讀取證書
?
ZMQ開源源碼實例
from gevent import Greenlet import zmq.green as zmq from zmq.auth.certs import create_certificatesdef initcontext = beeswarm.shared.zmq_contextself.config_commands = context.socket(zmq.REP)self.enabled = Truedef runself.config_commands.bind(SocketNames.CONFIG_COMMANDS.value)poller = zmq.Poller()poller.register(self.config_commands, zmq.POLLIN)while self.enabled:socks = dict(poller.poll(500))if self.config_commands in socks and socks[self.config_commands] == zmq.POLLIN:self._handle_commands()def _handle_commands(self):msg = self.config_commands.recv()if ' ' in msg:cmd, data = msg.split(' ', 1)else:cmd = msglogger.debug('Received command: {0}'.format(cmd))if cmd == Messages.SET_CONFIG_ITEM.value: #SET 重新設置配置文件self._handle_command_set(data)self.config_commands.send('{0} {1}'.format(Messages.OK.value, '{}'))elif cmd == Messages.GET_CONFIG_ITEM.value: #GET 獲取配置文件某KEY值value = self._handle_command_get(data)self.config_commands.send('{0} {1}'.format(Messages.OK.value, value))elif cmd == Messages.GET_ZMQ_KEYS.value: #返回客戶端證書self._handle_command_getkeys(data)elif cmd == Messages.DELETE_ZMQ_KEYS.value: #刪除客戶端證書self._remove_zmq_keys(data)self.config_commands.send('{0} {1}'.format(Messages.OK.value, '{}'))else:logger.error('Unknown command received: {0}'.format(cmd))self.config_commands.send(Messages.FAIL.value)def _handle_command_set(self, data):new_config = json.loads(data)self.config.update(new_config)self._save_config_file()def _handle_command_get(self, data):# example: 'network,host' will lookup self.config['network']['host']#示例:“network,host”將查找self.config[“network”][“host”]keys = data.split(',')value = self._retrieve_nested_config(keys, self.config)return valuedef _retrieve_nested_config(self, keys, dict):if keys[0] in dict:if len(keys) == 1:return dict[keys[0]]else:return self._retrieve_nested_config(keys[1:], dict[keys[0]])def _handle_command_getkeys(self, name):private_key, publickey = self._get_zmq_keys(name)self.config_commands.send(Messages.OK.value + ' ' + json.dumps({'public_key': publickey,'private_key': private_key}))def _save_config_file(self):with open(self.config_file, 'w+') as config_file:config_file.write(json.dumps(self.config, indent=4))def _get_zmq_keys(self, id):cert_path = os.path.join(self.work_dir, 'certificates')public_keys = os.path.join(cert_path, 'public_keys')private_keys = os.path.join(cert_path, 'private_keys')public_key_path = os.path.join(public_keys, '{0}.pub'.format(id))private_key_path = os.path.join(private_keys, '{0}.pri'.format(id))if not os.path.isfile(public_key_path) or not os.path.isfile(private_key_path):logging.debug('Generating ZMQ keys for: {0}.'.format(id))for _path in [cert_path, public_keys, private_keys]:if not os.path.isdir(_path):os.mkdir(_path)tmp_key_dir = tempfile.mkdtemp()try:public_key, private_key = create_certificates(tmp_key_dir, id)# the final location for keys#鑰匙的最終位置shutil.move(public_key, public_key_path)shutil.move(private_key, private_key_path)finally:shutil.rmtree(tmp_key_dir)# return copy of keys#返回密鑰副本return open(private_key_path, "r").readlines(), open(public_key_path, "r").readlines()def _remove_zmq_keys(self, id):cert_path = os.path.join(self.work_dir, 'certificates')public_keys = os.path.join(cert_path, 'public_keys')private_keys = os.path.join(cert_path, 'private_keys')public_key_path = os.path.join(public_keys, '{0}.pub'.format(id))private_key_path = os.path.join(private_keys, '{0}.pri'.format(id))for _file in [public_key_path, private_key_path]:if os.path.isfile(_file):os.remove(_file)def message_proxy(self, work_dir):"""drone_data_inboud is for data comming from dronesdrone_data_outbound is for commands to the drones, topic must either be a drone ID or all for sending a broadcast message to all drones無人機內部數據用于無人機的數據混合。無人機數據輸出用于向無人機發送命令,主題必須是無人機ID或全部用于向所有無人機發送廣播消息"""public_keys_dir = os.path.join(work_dir, 'certificates', 'public_keys')secret_keys_dir = os.path.join(work_dir, 'certificates', 'private_keys')# start and configure auth worker#啟動并配置身份驗證工作程序auth = IOLoopAuthenticator()auth.start()auth.allow('127.0.0.1')auth.configure_curve(domain='*', location=public_keys_dir)# external interfaces for communicating with drones#與無人機通信的外部接口#服務器證書server_secret_file = os.path.join(secret_keys_dir, 'beeswarm_server.pri')#獲取公鑰和密鑰server_public, server_secret = load_certificate(server_secret_file)drone_data_inbound = beeswarm.shared.zmq_context.socket(zmq.PULL)drone_data_inbound.curve_secretkey = server_secretdrone_data_inbound.curve_publickey = server_publicdrone_data_inbound.curve_server = Truedrone_data_inbound.bind('tcp://*:{0}'.format(self.config['network']['zmq_port']))drone_data_outbound = beeswarm.shared.zmq_context.socket(zmq.PUB)drone_data_outbound.curve_secretkey = server_secretdrone_data_outbound.curve_publickey = server_publicdrone_data_outbound.curve_server = Truedrone_data_outbound.bind('tcp://*:{0}'.format(self.config['network']['zmq_command_port']))# internal interfaces# all inbound session data from drones will be replayed on this socket#內部接口,來自無人機的所有入站會話數據將在此套接字上重播drone_data_socket = beeswarm.shared.zmq_context.socket(zmq.PUB)drone_data_socket.bind(SocketNames.DRONE_DATA.value)# all commands received on this will be published on the external interface#在此上接收到的所有命令都將在外部接口上發布drone_command_socket = beeswarm.shared.zmq_context.socket(zmq.PULL)drone_command_socket.bind(SocketNames.DRONE_COMMANDS.value)poller = zmq.Poller()poller.register(drone_data_inbound, zmq.POLLIN)poller.register(drone_command_socket, zmq.POLLIN)while True:# .recv() gives no context switch - why not? using poller with timeout instead#recv()不提供上下文切換-為什么不?將輪詢器與超時一起使用socks = dict(poller.poll(100))gevent.sleep()if drone_command_socket in socks and socks[drone_command_socket] == zmq.POLLIN:data = drone_command_socket.recv()drone_id, _ = data.split(' ', 1)logger.debug("Sending drone command to: {0}".format(drone_id))# pub socket takes care of filtering#pub socket負責過濾drone_data_outbound.send(data)elif drone_data_inbound in socks and socks[drone_data_inbound] == zmq.POLLIN:raw_msg = drone_data_inbound.recv()split_data = raw_msg.split(' ', 2)if len(split_data) == 3:topic, drone_id, data = split_dataelse:data = Nonetopic, drone_id, = split_datalogger.debug("Received {0} message from {1}.".format(topic, drone_id))# relay message on internal socket#內部插座上的中繼消息drone_data_socket.send(raw_msg)?
總結
以上是生活随笔為你收集整理的Python zmq 讲解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Ubuntu 解决 pip 安装 lxm
- 下一篇: C# string转double,dou