Python and RabbitMQ
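1. Simple RabbitMQ queue communication
In RabbitMQ, data is first sent to an exchange, and the exchange then passes it on to the appropriate queue. The pika module is Python's client API for RabbitMQ. The receiving side registers a callback function, which is called as soon as a message arrives. Once a message has been delivered to one consumer (and acknowledged, if acknowledgements are enabled), it is removed from the queue. With that background, let's look at a simple example of RabbitMQ queue communication.
Sender: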
import pika

# Connect to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()  # open a channel; queues are used over this channel

# Declare the queue
channel.queue_declare(queue='hello1')

# In RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
# Send data to the queue
channel.basic_publish(exchange='',  # data goes to the exchange first ('' = default exchange), which forwards it to the queue
                      routing_key='hello1',  # send to the 'hello1' queue
                      body='HelloWorld!!')  # the message
print(" [x] Sent 'HelloWorld!!'")
connection.close()

Receiver:
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# You may ask why we declare the queue again - we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
# was run before. But we're not yet sure which program to run first. In such cases it's a good
# practice to repeat declaring the queue in both programs.
channel.queue_declare(queue='hello1')  # declare the queue so the program won't fail if it doesn't exist yet


def callback(ch, method, properties, body):
    print("-->ch", ch)
    print("-->method", method)
    print("-->properties", properties)
    print(" [x] Received %r" % body)
    # Acknowledge the message; only then does RabbitMQ delete it from the queue
    ch.basic_ack(delivery_tag=method.delivery_tag)


# pika >= 1.0 signature; older pika used basic_consume(callback, queue='hello1', no_ack=False)
channel.basic_consume(queue='hello1',
                      on_message_callback=callback,  # called as soon as a message arrives
                      auto_ack=False)  # False (the default): the consumer must ack each message itself

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

(With the comments, the code above should be easy to follow.)
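After some more testing, I found two things:
First, run rabbitMQ_1_send.py to send data while rabbitMQ_2_receive.py is not running. When the receiver is started later, it still receives the data.
Second, start several (e.g. 3) receiving clients and then run the sender: client 1 gets the message; run the sender again and client 2 gets it; run it once more and client 3 gets it.
By default, RabbitMQ dispatches the producer's (P) messages to the consumers (C) one after another in turn (round-robin), much like load balancing.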
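A toy, in-memory illustration of the round-robin dispatch described above (plain Python, no pika or broker involved; `round_robin_dispatch` and the consumer names are hypothetical): each message goes to the next consumer in turn, regardless of how busy that consumer is.

```python
from itertools import cycle


def round_robin_dispatch(messages, consumers):
    """Toy model: hand each message to the next consumer in turn.

    Returns a dict mapping consumer name -> list of messages it received.
    """
    received = {name: [] for name in consumers}
    turn = cycle(consumers)  # endless c1, c2, c3, c1, ...
    for msg in messages:
        received[next(turn)].append(msg)
    return received
```

For example, `round_robin_dispatch(['m1', 'm2', 'm3', 'm4'], ['c1', 'c2', 'c3'])` gives c1 both the first and the fourth message, matching the client 1, 2, 3, 1 order observed above.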
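2. Message acknowledgement (ack)
In the example above you'll notice the setting no_ack=False (spelled auto_ack=False in pika 1.0 and later), which means the consumer must send an acknowledgement to the server once it has finished with a message; False is the default. My English only just scraped through CET-4, but I found the following explanation of ack really well written, so I'm sharing it: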
Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately removes it from memory. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.
But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.
In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back from the consumer to tell RabbitMQ that a particular message had been received, processed and that RabbitMQ is free to delete it.
If a consumer dies (its channel is closed, connection is closed, or TCP connection is lost) without sending an ack, RabbitMQ will understand that a message wasn't processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.
There aren't any message timeouts; RabbitMQ will redeliver the message when the consumer dies. It's fine even if processing a message takes a very, very long time.
Message acknowledgments are turned on by default. In previous examples we explicitly turned them off via the no_ack=True flag. It's time to remove this flag and send a proper acknowledgment from the worker, once we're done with a task.
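A minimal sketch of such a worker callback, which acknowledges only after the work is done. It assumes pika 1.x and a consumer registered with manual acks; the one-second-per-dot fake workload follows the official RabbitMQ "Work Queues" tutorial, and the function is illustrative rather than the author's code.

```python
import time


def callback(ch, method, properties, body):
    """Do the work first, then ack.

    If the worker dies before basic_ack runs, RabbitMQ still holds the
    message as unacknowledged and can redeliver it to another worker.
    """
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))  # fake work: one second per '.' in the body
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Registered with acknowledgements left on (pika >= 1.0 keyword names):
# channel.basic_consume(queue='hello', on_message_callback=callback)
```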
Using this code we can be sure that even if you kill a worker using CTRL+C while it was processing a message, nothing will be lost. Soon after the worker dies all unacknowledged messages will be redelivered.
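Think of the sender as the producer and the receiver as the consumer. The producer sends task A; a consumer receives task A and processes it, and after that task A is deleted from the message queue. Now suppose the consumer receives task A but crashes in the middle of processing it. If RabbitMQ has already deleted task A from the queue at that point, the task was never actually completed: it has effectively been lost. To solve this, the consumer should send an ack to RabbitMQ only after it has received the task and finished processing it successfully. When RabbitMQ receives the ack, it knows task A has been handled and only then deletes it from the queue; if no ack arrives because the consumer died, RabbitMQ sends task A to the next consumer, until task A is processed successfully.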
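The mechanism can be sketched as a toy model (plain Python, not RabbitMQ's implementation; `ToyQueue` and its methods are hypothetical names): a delivered message is only parked as "unacked", and it goes back to the queue if its consumer dies before acking.

```python
from collections import deque


class ToyQueue:
    """Toy queue with manual acks; not the real broker."""

    def __init__(self, messages):
        self.ready = deque(messages)  # waiting to be delivered
        self.unacked = {}             # delivery_tag -> (consumer, message)
        self.next_tag = 1

    def deliver(self, consumer):
        """Hand the next ready message to `consumer`; return (tag, message)."""
        message = self.ready.popleft()
        tag = self.next_tag
        self.next_tag += 1
        self.unacked[tag] = (consumer, message)
        return tag, message

    def ack(self, tag):
        """Consumer finished: the message is deleted for good."""
        del self.unacked[tag]

    def consumer_died(self, consumer):
        """Put every unacked message of `consumer` back at the front, in order."""
        lost = [t for t, (c, _) in self.unacked.items() if c == consumer]
        messages = [self.unacked.pop(t)[1] for t in lost]
        self.ready.extendleft(reversed(messages))
```

For example, if worker1 takes "task A" and dies, `consumer_died('worker1')` puts task A back in front, so the next `deliver('worker2')` hands it to worker2.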
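3. Message persistence
We already know that if the producer publishes data first, a consumer started afterwards can still receive it.
However, if the producer publishes data and RabbitMQ is then restarted, the consumer can no longer receive it.
For example, if the RabbitMQ server goes down while messages are in transit, you will find that the earlier queue no longer exists. This is where message persistence comes in: it stores the queue and its messages on disk so that they are not lost when the server goes down. Here is the English explanation of message persistence: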
We have learned how to make sure that even if the consumer dies, the task isn't lost (this is the default; to disable it, use no_ack=True). But our tasks will still be lost if RabbitMQ server stops.
When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark both the queue and messages as durable.
First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as durable:
channel.queue_declare(queue='hello', durable=True)
Although this command is correct by itself, it won't work in our setup. That's because we've already defined a queue called hello which is not durable. RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error to any program that tries to do that. But there is a quick workaround - let's declare a queue with a different name, for example task_queue:
channel.queue_declare(queue='task_queue', durable=True)
This queue_declare change needs to be applied to both the producer and consumer code.
At that point we're sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent - by supplying a delivery_mode property with a value of 2.
channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
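The English above explains message persistence well. Persistence takes two steps:
- Make the queue durable, e.g. for the hello queue: channel.queue_declare(queue='hello', durable=True)
- Make the messages in the queue persistent: properties=pika.BasicProperties(delivery_mode=2)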
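One thing to watch out for:
If a queue named hello already exists without durability (for example, declared by the earlier examples), changing your code to declare hello with durable=True will raise an error!
Because: RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error.
To get around this, declare the durable queue under a different queue name (queue_name) from the existing one.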
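4. Fair dispatch
If RabbitMQ simply hands messages to consumers in turn without considering their load, a consumer on a low-spec machine may pile up messages it cannot keep up with, while a high-spec consumer stays mostly idle. To fix this, set prefetch_count=1 on each consumer with channel.basic_qos(prefetch_count=1). This tells RabbitMQ not to send this consumer a new message while it still has an unfinished (unacknowledged) one.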
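A toy discrete-time model of prefetch_count=1 (plain Python, no broker; `dispatch_with_prefetch_one` and the worker names are hypothetical): a worker is only offered a new message when it has nothing in progress, so a fast worker naturally ends up doing more of the work.

```python
def dispatch_with_prefetch_one(n_messages, work_time):
    """Toy model of fair dispatch with basic_qos(prefetch_count=1).

    work_time maps worker name -> ticks needed per message. A worker is
    only handed a new message once it has none in progress (it has acked
    the previous one). Returns worker name -> messages processed.
    """
    handled = {w: 0 for w in work_time}
    free_at = {w: 0 for w in work_time}  # tick at which each worker is idle again
    remaining = n_messages
    tick = 0
    while remaining:
        for w in work_time:  # offer the next message to idle workers in a fixed order
            if remaining and free_at[w] <= tick:
                handled[w] += 1
                free_at[w] = tick + work_time[w]
                remaining -= 1
        tick += 1
    return handled
```

With a fast worker (1 tick per message) and a slow one (3 ticks), `dispatch_with_prefetch_one(8, {'fast': 1, 'slow': 3})` returns `{'fast': 6, 'slow': 2}`, whereas strict round-robin would give each worker 4. In real code the only line involved is `channel.basic_qos(prefetch_count=1)` on each consumer.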
Complete code with message persistence + fair dispatch
Producer:
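A minimal sketch of a producer with a durable queue and persistent messages (not the author's original code), assuming the pika 1.x channel API. The channel and the message properties are passed in, so the function itself needs no running broker; `publish_task` and the queue name `task_queue` are illustrative.

```python
def publish_task(channel, message, properties):
    """Publish one task to a durable queue.

    `properties` is expected to be pika.BasicProperties(delivery_mode=2),
    which marks the message itself as persistent.
    """
    # durable=True: the queue definition survives a broker restart
    channel.queue_declare(queue='task_queue', durable=True)
    channel.basic_publish(exchange='',               # default exchange routes by queue name
                          routing_key='task_queue',
                          body=message,
                          properties=properties)     # delivery_mode=2 -> persistent message
    print(" [x] Sent %r" % message)
```

With a broker on localhost: `connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))`, then `publish_task(connection.channel(), 'Hello World...', pika.BasicProperties(delivery_mode=2))`, then `connection.close()`.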
Consumer:
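A matching consumer sketch (again not the author's code), assuming the pika 1.x channel API: the same durable queue declaration, fair dispatch via basic_qos(prefetch_count=1), and an explicit ack in the callback. `start_worker` is an illustrative name, and the channel is injected as in the producer sketch.

```python
import time

QUEUE = 'task_queue'


def callback(ch, method, properties, body):
    """Process one task, then ack it so RabbitMQ can delete it."""
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))  # fake work: one second per '.'
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)


def start_worker(channel):
    """Set up a durable, fair-dispatch consumer and start consuming."""
    channel.queue_declare(queue=QUEUE, durable=True)  # must match the producer's declaration
    channel.basic_qos(prefetch_count=1)  # at most one unacked message per worker
    # manual acks: auto_ack defaults to False
    channel.basic_consume(queue=QUEUE, on_message_callback=callback)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
```

With a broker on localhost: `start_worker(pika.BlockingConnection(pika.ConnectionParameters('localhost')).channel())`.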
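When running the program above, I was really puzzled by one line in the consumer's callback: ch.basic_ack(delivery_tag=method.delivery_tag). Even without it the consumer still receives messages, so what on earth is it for?
The answer: with manual acknowledgements, ch.basic_ack(delivery_tag=method.delivery_tag) is how the consumer tells the server that a message has been processed, and only then does the server delete it from the queue. Without it the message stays unacknowledged: RabbitMQ redelivers it once the consumer disconnects, and with prefetch_count=1 the consumer is not sent any further messages.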
5. Publish/subscribe
The examples so far were basically one-to-one: a message could only be delivered to one specified queue. Sometimes, though, you want a message to be received by every queue, like a broadcast; that is where exchange types come in. PS: if you're interested in Redis publish/subscribe, have a look at my blog post python之redis.
An exchange is a very simple thing. On one side it receives messages from producers and on the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded? The rules for that are defined by the exchange type.
An exchange is given a type when it is declared, and the type decides which queues qualify to receive a message:
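fanout: every queue bound to this exchange receives the message
direct: the queue(s) whose binding key exactly equals the message's routing key receive the message
topic: every queue whose binding key (which may be a pattern) matches the message's routing key receives the message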
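Pattern symbols: routing keys are words separated by dots; # matches zero or more words, and * matches exactly one word.
For example, #.a matches a.a, aa.a, aaa.a, etc.
*.a matches a.a, b.a, c.a, etc.
Note: on a topic exchange, a binding key of # behaves like a fanout exchange.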
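A toy model of how the three exchange types pick queues (plain Python, not RabbitMQ's implementation; `topic_matches`, `route`, and the queue names are hypothetical), useful for checking the matching rules above on paper.

```python
def topic_matches(binding_key, routing_key):
    """True if a topic binding key matches a routing key.

    Keys are dot-separated words; '*' matches exactly one word and
    '#' matches zero or more words.
    """
    pattern = binding_key.split('.')
    words = routing_key.split('.')

    def match(p, w):
        if p == len(pattern):
            return w == len(words)  # pattern used up: the words must be too
        if pattern[p] == '#':
            # '#' swallows zero words, or one word and stays active
            return match(p + 1, w) or (w < len(words) and match(p, w + 1))
        if w == len(words):
            return False
        if pattern[p] == '*' or pattern[p] == words[w]:
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


def route(exchange_type, bindings, routing_key):
    """Return the sorted queue names that receive a message.

    bindings is a list of (queue, binding_key) pairs.
    """
    if exchange_type == 'fanout':
        hits = [q for q, _ in bindings]  # binding key is ignored
    elif exchange_type == 'direct':
        hits = [q for q, key in bindings if key == routing_key]
    elif exchange_type == 'topic':
        hits = [q for q, key in bindings if topic_matches(key, routing_key)]
    else:
        raise ValueError('unknown exchange type: %s' % exchange_type)
    return sorted(set(hits))
```

For example, with bindings `[('q1', '*.error'), ('q2', 'mysql.#')]` on a topic exchange, the routing key `mysql.error` reaches both queues, while `kern.error` reaches only q1.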
Below I go through fanout, direct and topic in turn:
(1) fanout
fanout: every queue bound to this exchange receives the message.
Sender:
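A minimal fanout sender sketch (not the author's code), assuming the pika 1.x channel API; `emit_log` and the exchange name `logs` are illustrative, and the channel is passed in so the function can be exercised without a broker.

```python
def emit_log(channel, message):
    """Broadcast `message` to every queue bound to the 'logs' fanout exchange."""
    channel.exchange_declare(exchange='logs', exchange_type='fanout')
    channel.basic_publish(exchange='logs',
                          routing_key='',  # ignored by fanout exchanges, so left empty
                          body=message)
    print(" [x] Sent %r" % message)
```

With a broker on localhost: `emit_log(pika.BlockingConnection(pika.ConnectionParameters('localhost')).channel(), 'info: Hello World!')`.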
Receiver:
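A matching fanout receiver sketch (not the author's code), assuming the pika 1.x channel API, where `queue=''` asks RabbitMQ for a randomly named queue. `receive_logs` and `on_log` are illustrative names; the caller starts consuming afterwards.

```python
def on_log(ch, method, properties, body):
    """Print each broadcast message."""
    print(" [x] %r" % body)


def receive_logs(channel):
    """Subscribe to the 'logs' fanout exchange through a temporary queue."""
    channel.exchange_declare(exchange='logs', exchange_type='fanout')
    # queue='' -> RabbitMQ picks a random name; exclusive=True -> the queue
    # is deleted when this consumer's connection closes
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue
    channel.queue_bind(exchange='logs', queue=queue_name)
    channel.basic_consume(queue=queue_name, on_message_callback=on_log, auto_ack=True)
    return queue_name
```

With a broker on localhost: create `channel` with `pika.BlockingConnection(pika.ConnectionParameters('localhost')).channel()`, call `receive_logs(channel)`, then `channel.start_consuming()`. Every receiver started this way gets its own queue, so each one sees every broadcast.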
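Two points to note:
- fanout is a broadcast, so the sender uses routing_key='' (for fanout it is left empty, the default, since fanout ignores it).
- The receiver contains result=channel.queue_declare(exclusive=True). Since no queue name is given (we just want to receive broadcasts), RabbitMQ assigns a random queue name, and exclusive=True deletes the queue automatically once the consumer using it disconnects. In pika 1.0 and later the queue argument is required, so write channel.queue_declare(queue='', exclusive=True).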
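(2) Receiving messages selectively (exchange type=direct)
RabbitMQ can also route by keyword: each queue is bound with a keyword, the sender publishes data to the exchange with a keyword, and the exchange uses that keyword to decide which queue(s) the data goes to.
Sender: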
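A minimal direct-exchange sender sketch (not the author's code), assuming the pika 1.x channel API; `emit_log_direct`, the exchange name `direct_logs`, and the severity keywords are illustrative.

```python
def emit_log_direct(channel, severity, message):
    """Publish `message` with routing key `severity` (e.g. 'info', 'warning', 'error')."""
    channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
    channel.basic_publish(exchange='direct_logs',
                          routing_key=severity,  # only queues bound with this exact key get it
                          body=message)
    print(" [x] Sent %r:%r" % (severity, message))
```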
Receiver:
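A matching receiver sketch (not the author's code), assuming the pika 1.x channel API: one temporary queue is bound once per keyword it wants. `receive_logs_direct` is an illustrative name; in a two-receiver test like the one below, one receiver might bind `['error']` and the other `['info', 'warning', 'error']` (a hypothetical setup).

```python
def receive_logs_direct(channel, severities, callback):
    """Bind one temporary queue to 'direct_logs' for each wanted severity."""
    channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue
    for severity in severities:
        # a queue can have several bindings; a message arrives if any key matches exactly
        channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
    return queue_name
```

After calling it, start the loop with `channel.start_consuming()`.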
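To be honest, the code left me completely confused at first. Below are screenshots of my test in cmd (following along with them makes it easier to understand): one sender and two receivers (start the receivers first, then the sender).
[Screenshots: sender; receiver 1; receiver 2]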
(3) Finer-grained message filtering: topic (for reference)
Although using the direct exchange improved our system, it still has limitations - it can't do routing based on multiple criteria.
In our logging system we might want to subscribe to not only logs based on severity, but also based on the source which emitted the log. You might know this concept from the syslog unix tool, which routes logs based on both severity (info/warn/crit...) and facility (auth/cron/kern...).
That would give us a lot of flexibility - we may want to listen to just critical errors coming from 'cron' but also all logs from 'kern'.
My English isn't great; with the lousy Youdao translator plus my own understanding, I roughly get what the passage above is saying.
An example: system errors are sent to A and MySQL errors are sent to B. For B, receiving MySQL errors could be done with selective receiving (exchange type=direct) by using error as the keyword. Now B has a new requirement: rather than every error message, it only wants specific errors. Filtering further within one kind of message is what the finer-grained topic filtering does.
Sender:
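A minimal topic sender sketch (not the author's code), assuming the pika 1.x channel API; `emit_log_topic`, the exchange name `topic_logs`, and routing keys such as `mysql.error` are illustrative.

```python
def emit_log_topic(channel, routing_key, message):
    """Publish to 'topic_logs'; routing_key is dot-separated, e.g. 'mysql.error'."""
    channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
    channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message)
    print(" [x] Sent %r:%r" % (routing_key, message))
```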
Receiver:
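A matching topic receiver sketch (not the author's code), assuming the pika 1.x channel API. The binding keys are patterns, so in the A/B example above, B could bind only `['mysql.error']`, or `['mysql.*']` for every MySQL message (hypothetical keys); `receive_logs_topic` is an illustrative name.

```python
def receive_logs_topic(channel, binding_keys, callback):
    """Bind a temporary queue to 'topic_logs' with one or more patterns.

    '*' matches exactly one word and '#' zero or more words, so
    ['kern.*', '*.critical'] receives every kern log plus every critical log.
    """
    channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue
    for binding_key in binding_keys:
        channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)
    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
    return queue_name
```

As before, call `channel.start_consuming()` afterwards.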
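6. RPC (Remote Procedure Call)
I looked up the concept of RPC on Baidu; it works much like the FTP program I wrote earlier: the client sends a command and the server returns the relevant information.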
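Now for the main topic, RPC communication. I found it hard at first, but once I'd learned it I found the idea behind it really instructive, and the example code is excellent!
After the server receives a message from the client, it calls its callback function to run the task, and then it has to send the result back to the client. But how does the server send the result back to the client? And if several clients are connected, how does the server know which client to send it to?
By default the RPC server listens on rpc_queue. It obviously can't put the results meant for the client into rpc_queue (rpc_queue carries data from the clients to the server).
A sensible approach would be for the server to open another queue and return results through it. But that raises a new problem: if the server creates the queue, the client doesn't know its queue_name, and without the queue_name how is the client supposed to receive anything?
Solution:
When sending a command, the client also tells the server which queue to return the result through once the task is done (the reply_to property), and then simply listens on that queue. Each command also carries a random correlation_id, which the server copies onto its reply so the client can tell which command a reply belongs to.
Client: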
import pika
import uuid


class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

        self.channel = self.connection.channel()
        # Declare a randomly named queue on which to listen for results
        # (pika >= 1.0 requires queue=''; older pika accepted queue_declare(exclusive=True))
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue  # the queue name

        # Call on_response whenever a reply arrives on callback_queue
        # (pika < 1.0: basic_consume(self.on_response, no_ack=True, queue=self.callback_queue))
        self.channel.basic_consume(queue=self.callback_queue,
                                   on_message_callback=self.on_response,
                                   auto_ack=True)

    def on_response(self, ch, method, props, body):  # callback
        # Commands can take different amounts of time: command 1 may be sent before command 2,
        # yet command 2's result may reach the client first. Without the check below, the client
        # would mistake command 2's result for command 1's.
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None  # result returned for this command
        self.corr_id = str(uuid.uuid4())  # unique identifier for this command
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',  # the client sends the command to rpc_queue
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,  # the server returns the result to this queue
                                       correlation_id=self.corr_id,
                                   ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()  # fetch data from the queue (non-blocking)
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

Server:
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='rpc_queue')


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    n = int(body)  # the number sent by the client

    print(" [.] fib(%s)" % n)
    response = fib(n)

    # Send the result to the props.reply_to queue (declared by the client when it sent the command),
    # echoing the correlation_id (a random, per-command identifier) so the client can match it
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)  # ack the request so it is removed from rpc_queue


channel.basic_qos(prefetch_count=1)  # fair dispatch
# pika < 1.0: channel.basic_consume(on_request, queue='rpc_queue')
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)  # call on_request for each message

print(" [x] Awaiting RPC requests")
channel.start_consuming()
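In the client above only one call is in flight at a time, so on_response simply compares props.correlation_id with self.corr_id. A client with several outstanding requests could keep a dictionary keyed by correlation_id instead. The toy below (plain Python, no broker; `PendingCalls` is a hypothetical name) shows that matching when replies arrive out of order.

```python
import uuid


class PendingCalls:
    """Toy client-side bookkeeping: match replies to requests by correlation_id."""

    def __init__(self):
        self.results = {}  # correlation_id -> reply body (None until it arrives)

    def new_request(self):
        """Create a random correlation_id for a new request."""
        corr_id = str(uuid.uuid4())
        self.results[corr_id] = None
        return corr_id

    def on_reply(self, correlation_id, body):
        """Store a reply, ignoring ids this client never issued."""
        if correlation_id in self.results:
            self.results[correlation_id] = body
```

For example, if request b's reply arrives before request a's, each result still ends up under the right id, and a stray reply with an unknown id is dropped.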
This article is reposted from 帥氣的頭頭's 51CTO blog. Original link: http://blog.51cto.com/12902932/1924608. Please contact the original author before reposting.