python消息队列中间件_python-RabbtiMQ消息队列
1.RabbitMQ簡介
AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。消息中間件主要用于組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。
RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。
2.RabbitMQ能為你做些什么?
消息系統允許軟件、應用相互連接和擴展.這些應用可以相互鏈接起來組成一個更大的應用,或者將用戶設備和數據進行連接.消息系統通過將消息的發送和接收分離來實現應用程序的異步和解偶.
或許你正在考慮進行數據投遞,非阻塞操作或推送通知。或許你想要實現發布/訂閱,異步處理,或者工作隊列。所有這些都可以通過消息系統實現。
RabbitMQ是一個消息代理- 一個消息系統的媒介。它可以為你的應用提供一個通用的消息發送和接收平臺,并且保證消息在傳輸過程中的安全。
3.RabbitMQ 安裝使用
4.Python應用RabbitMQ
python操作RabbitMQ的模塊有三種:pika,Celery,Haigha。
本文使用的是pika。
"""RabbitMQ-生產者。"""
importpika"""聲明socket"""connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)"""聲明一個管道"""channel=connection.channel()"""定義一個queue,定義queue名稱,標識"""channel.queue_declare(queue='hello')"""定義queue中的消息內容"""channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')print("[x] Sent 'Hello World!'")
"""RabbitMQ-消費者。"""
importpika"""聲明socket"""connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)"""聲明一個管道"""channel=connection.channel()"""定義一個queue,定義queue名稱,標識,與生產者隊列中對應"""channel.queue_declare(queue='hello')defcallback(ch,method,properties,body):print('rev-->',ch,method,properties,body)print('rev messages-->',body)"""消費,接收消息..."""channel.basic_consume(
consumer_callback=callback, #如果收到消息,則回調這個函數處理消息
queue='hello', #queue_declare(queue='hello') 對應
no_ack=True
)"""消費者會一直監聽這queue,如果隊列中沒有消息,則會卡在這里,等待消息隊列中生成消息。"""
print('waiting for meassages, to exit press CTRL+C')
channel.start_consuming()
5.RabbitMQ消息持久化
importpika
queue_name= 'xiaoxi_'
"""聲明socket"""connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)"""聲明一個管道"""channel=connection.channel()"""定義一個queue,定義queue名稱,標識
queue,durable 持久化"""channel.queue_declare(queue=queue_name)whileTrue:
input_value= input(">>:").strip()ifinput_value:"""定義queue中的消息內容"""
print('producer messages:{0}'.format(input_value))
channel.basic_publish(
exchange='',
routing_key=queue_name,
body=input_value,
properties=pika.BasicProperties( #消息持久化.....
delivery_mode=2,
)
)continue
producer.py
importpika,time
queue_name= 'xiaoxi_'
"""聲明socket"""connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)"""聲明一個管道"""channel=connection.channel()"""定義一個queue,定義queue名稱,標識"""channel.queue_declare(queue=queue_name)defcallback(ch,method,properties,body):print('rev-->',ch,method,properties,body)#time.sleep(5) # 模擬消費者丟失生產者發送的消息,生產者消息隊列中的這一條消息則不會刪除。
print('rev messages-->',body)"""手動向生產者確認收到消息"""
#ch.basic_ack(delivery_tag=method.delivery_tag)
"""消費,接收消息..."""channel.basic_consume(
consumer_callback=callback, #如果收到消息,則回調這個函數處理消息
queue=queue_name,#no_ack=True #接收到消息,主動向生產者確認已經接收到消息。
)print('waiting for meassages, to exit press CTRL+C')
channel.start_consuming()
consumer.py
6.RabbitMQ消息公平分發
importpika
queue_name= 'xiaoxi_1'
"""聲明socket"""connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)"""聲明一個管道"""channel=connection.channel()"""定義一個queue,定義queue名稱,標識
queue,durable 持久化"""channel.queue_declare(queue=queue_name)whileTrue:
input_value= input(">>:").strip()ifinput_value:"""定義queue中的消息內容"""
print('producer messages:{0}'.format(input_value))
channel.basic_publish(
exchange='',
routing_key=queue_name,
body=input_value,
)continue
producer.py
importpika,time
queue_name= 'xiaoxi_1'
"""聲明socket"""connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)"""聲明一個管道"""channel=connection.channel()"""定義一個queue,定義queue名稱,標識
queue,durable 持久化"""channel.queue_declare(queue=queue_name)defcallback(ch,method,properties,body):print('rev-->',ch,method,properties,body)print('rev messages-->',body)"""模擬處理消息快慢速度"""time.sleep(1)
ch.basic_ack(delivery_tag=method.delivery_tag)"""根據消費者處理消息的快慢公平分發消息"""channel.basic_qos(prefetch_count=1)"""消費,接收消息..."""channel.basic_consume(
consumer_callback=callback, #如果收到消息,則回調這個函數處理消息
queue=queue_name,#no_ack=True #接收到消息,主動向生產者確認已經接收到消息。
)print('waiting for meassages, to exit press CTRL+C')
channel.start_consuming()
consumer.py
7.RabbitMQ-廣播模式。
消息的發送模式類型1.fanout: 所有bind到此exchange的queue都可以接收消息 即是廣播模式,所有的consumer都能收到。2.direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息 ,指定唯一的。3.topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息。符合條件的。
表達式符號說明:#代表一個或多個字符,*代表任何字符
例:#.a會匹配a.a,aa.a,aaa.a等
*.a會匹配a.a,b.a,c.a等
注:使用RoutingKey為#,Exchange Type為topic的時候相當于使用fanout
4.headers: 通過headers 來決定把消息發給哪些queue (少用)
7.1 topic 廣播模式。
importpika"""聲明socket"""connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)"""聲明一個管道"""channel=connection.channel()"""通過routingKey和exchange決定的那個唯一的queue可以接收消息 ,指定唯一的。"""exchange_name= 'topic_messages1'routing_key= 'my_topic'
"""定義exchage模式 direct廣播模式"""channel.exchange_declare(exchange=exchange_name,exchange_type='topic')"""消息的發送模式類型
1.fanout: 所有bind到此exchange的queue都可以接收消息 即是廣播模式,所有的consumer都能收到。
2.direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息 ,指定唯一的。
3.topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息。符合條件的。
表達式符號說明:#代表一個或多個字符,*代表任何字符
例:#.a會匹配a.a,aa.a,aaa.a等
*.a會匹配a.a,b.a,c.a等
注:使用RoutingKey為#,Exchange Type為topic的時候相當于使用fanout
4.headers: 通過headers 來決定把消息發給哪些queue (少用)"""
whileTrue:
input_value= input(">>:").strip()ifinput_value:"""定義queue中的消息內容"""
print('producer messages:{0}'.format(input_value))
channel.basic_publish(
exchange=exchange_name,
routing_key=routing_key,
body=input_value,
)continue
producer.py
importpika,time"""聲明socket"""connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)"""聲明一個管道"""channel=connection.channel()"""通過routingKey和exchange決定的那個唯一的queue可以接收消息 ,指定唯一的。"""exchange_name= 'topic_messages1'routing_key= 'my_topic'channel.exchange_declare(exchange=exchange_name,exchange_type='topic')"""不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除"""res= channel.queue_declare(exclusive=True)
queue_name=res.method.queue
channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key=routing_key)print('direct_key:{0}'.format(routing_key))print('queue_name:{0}'.format(queue_name))defcallback(ch,method,properties,body):print('rev-->',ch,method,properties,body)print('rev messages-->',body)
ch.basic_ack(delivery_tag=method.delivery_tag)"""消費,接收消息..."""channel.basic_consume(
consumer_callback=callback, #如果收到消息,則回調這個函數處理消息
queue=queue_name,
)print('waiting for meassages, to exit press CTRL+C')
channel.start_consuming()
consumer.py
7.2?direct 廣播模式
importpika
connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel=connection.channel()"""通過routingKey和exchange決定的那個唯一的queue可以接收消息 ,指定唯一的。"""exchange_name= 'direct_messages'routing_key= 'my_direct'
"""定義exchage模式 direct廣播模式
消息的發送模式類型
1.fanout: 所有bind到此exchange的queue都可以接收消息 即是廣播模式,所有的consumer都能收到。
2.direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息 ,指定唯一的。
3.topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息。符合條件的。
表達式符號說明:#代表一個或多個字符,*代表任何字符
例:#.a會匹配a.a,aa.a,aaa.a等
*.a會匹配a.a,b.a,c.a等
注:使用RoutingKey為#,Exchange Type為topic的時候相當于使用fanout
4.headers: 通過headers 來決定把消息發給哪些queue (少用)"""channel.exchange_declare(exchange=exchange_name,exchange_type='direct')
channel.basic_publish(
exchange=exchange_name,
routing_key=routing_key,
body='hello word!',
)#while True:#input_value = input(">>:").strip()#if input_value:#"""定義queue中的消息內容"""#print('producer messages:{0}'.format(input_value))#channel.basic_publish(#exchange=exchange_name,#routing_key=routing_key,#body=input_value,#)#continue
producer.py
importpika,time
connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel=connection.channel()
exchange_name= 'direct_messages'routing_key= 'my_direct'channel.exchange_declare(exchange=exchange_name,exchange_type='direct')"""不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除"""res= channel.queue_declare(exclusive=True)
queue_name=res.method.queue
channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key=routing_key)print('direct_key:{0}'.format(routing_key))print('queue_name:{0}'.format(queue_name))defcallback(ch,method,properties,body):print('rev-->',ch,method,properties,body)print('rev messages-->',body)
ch.basic_ack(delivery_tag=method.delivery_tag)"""消費,接收消息..."""channel.basic_consume(
consumer_callback=callback, #如果收到消息,則回調這個函數處理消息
queue=queue_name,
)print('waiting for meassages, to exit press CTRL+C')
channel.start_consuming()
consumer.py
7.3?fanout 廣播模式
importpika"""聲明socket"""connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)"""聲明一個管道"""channel=connection.channel()
exchange_name= 'messages'
"""定義exchage模式 fanout廣播模式"""channel.exchange_declare(exchange=exchange_name,exchange_type='fanout')"""消息的發送模式類型
1.fanout: 所有bind到此exchange的queue都可以接收消息 即是廣播模式,所有的consumer都能收到。
2.direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息 ,指定唯一的。
3.topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息。符合條件的。
表達式符號說明:#代表一個或多個字符,*代表任何字符
例:#.a會匹配a.a,aa.a,aaa.a等
*.a會匹配a.a,b.a,c.a等
注:使用RoutingKey為#,Exchange Type為topic的時候相當于使用fanout
4.headers: 通過headers 來決定把消息發給哪些queue (少用)"""
whileTrue:
input_value= input(">>:").strip()ifinput_value:"""定義queue中的消息內容"""
print('producer messages:{0}'.format(input_value))
channel.basic_publish(
exchange=exchange_name,
routing_key='',
body=input_value,
)continue
producer.py
importpika,time"""聲明socket"""connection=pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)"""聲明一個管道"""channel=connection.channel()"""
"""exchange_name= 'messages'channel.exchange_declare(exchange=exchange_name,exchange_type='fanout')"""不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除"""res= channel.queue_declare(exclusive=True)
queue_name=res.method.queue
channel.queue_bind(exchange=exchange_name,queue=queue_name)"""每一個消費者隨機一個唯一的queue_name"""
print('queue_name:{0}',format(queue_name))defcallback(ch,method,properties,body):print('rev-->',ch,method,properties,body)print('rev messages-->',body)
ch.basic_ack(delivery_tag=method.delivery_tag)"""消費,接收消息..."""channel.basic_consume(
consumer_callback=callback, #如果收到消息,則回調這個函數處理消息
queue=queue_name,#no_ack=True #接收到消息,主動向生產者確認已經接收到消息。
)print('waiting for meassages, to exit press CTRL+C')
channel.start_consuming()
consumer.py
8?RabbitMQ 實現 RPC
"""RabbitMQ-生產者。
利用rabbitMQ 實現一個能收能發的RPC小程序。
重點需要注意的是:queue的綁定。接收的一端必選預先綁定queue生成隊列,發送端才能根據queue發送。"""
importpika,uuid,timeclassrabbitmqClient(object):def __init__(self,rpc_queue):
self.rpc_queue=rpc_queue
self.app_id=str(uuid.uuid4())
self.connection= pika.BlockingConnection(pika.ConnectionParameters('localhost'))
self.channel=self.connection.channel()"""生成一個自動queue,傳過去server,server再往這個自動queue回復數據"""autoqueue= self.channel.queue_declare(exclusive=True)
self.callback_queue=autoqueue.method.queue"""先定義一個接收回復的動作"""self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)defon_response(self,ch,method,properties,body):if self.app_id ==properties.app_id:
self.response=bodydefsend(self,msg):
self.response=None
self.channel.basic_publish(
exchange='',
routing_key=self.rpc_queue,
properties=pika.BasicProperties(
reply_to=self.callback_queue,
app_id=self.app_id,
),
body=str(msg)
)#發送完消息,進入接收模式。
while self.response isNone:#print('callback_queue:{0} app_id:{1} wait...'.format(self.callback_queue,self.app_id))
self.connection.process_data_events()#time.sleep(0.5)
returnself.response
rpc_request_queue= 'rpc_request_queue'rb=rabbitmqClient(rpc_request_queue)whileTrue:
msg= input('input >> :').strip()ifmsg:print('rpc_queue:{0} app_id:{1}'.format(rb.rpc_queue,rb.app_id))print('send msg:{}'.format(msg))
reponses=rb.send(msg)print('reponses msg:{}'.format(reponses.decode('utf-8')))continue
client.py
"""RabbitMQ-消費者。"""
importpikaclassrabbitmqServer(object):def __init__(self,rpc_queue):
self.rpc_queue=rpc_queue
self.connection= pika.BlockingConnection(pika.ConnectionParameters('localhost'))
self.channel=self.connection.channel()
self.channel.queue_declare(queue=self.rpc_queue)defon_reponses(self,ch,method,properties,body):ifbody:#reponser ...
ch.basic_publish(exchange='',
routing_key=properties.reply_to,
properties=pika.BasicProperties(
reply_to=properties.reply_to,
app_id=properties.app_id,
),
body='reponses ok! msg is:{}'.format(body.decode('utf-8')))defstart_consuming(self):
self.channel.basic_consume(consumer_callback=self.on_reponses,queue=self.rpc_queue,no_ack=True)print('waiting for meassages, to exit press CTRL+C')
self.channel.start_consuming()
rpc_request_queue= 'rpc_request_queue'rd_server=rabbitmqServer(rpc_request_queue)
rd_server.start_consuming()
server.py
總結
以上是生活随笔為你收集整理的python消息队列中间件_python-RabbtiMQ消息队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 陇南看输卵管粘连最好的医院推荐
- 下一篇: 北京环球影城淡季是什么时候