RabbitMQ从入门到精通
?
From:http://blog.csdn.net/column/details/rabbitmq.html
?
?
RabbitMQ 介紹
?
歷史
?
? ? RabbitMQ是一個由erlang開發(fā)的AMQP(Advanced Message Queue )的開源實現(xiàn)。AMQP 的出現(xiàn)其實也是應了廣大人民群眾的需求,雖然在同步消息通訊的世界里有很多公開標準(如 COBAR的 IIOP ,或者是 SOAP 等),但是在異步消息處理中卻不是這樣,只有大企業(yè)有一些商業(yè)實現(xiàn)(如微軟的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等聯(lián)合制定了 AMQP 的公開標準。
??? RabbitMQ是由RabbitMQ?Technologies Ltd開發(fā)并且提供商業(yè)支持的。該公司在2010年4月被SpringSource(VMWare的一個部門)收購。在2013年5月被并入Pivotal。其實VMWare,Pivotal和EMC本質(zhì)上是一家的。不同的是VMWare是獨立上市子公司,而Pivotal是整合了EMC的某些資源,現(xiàn)在并沒有上市。
??? RabbitMQ的官網(wǎng)是:http://www.rabbitmq.com
?
?
?
應用場景
?
?
? ? ?言歸正傳。RabbitMQ,或者說AMQP解決了什么問題,或者說它的應用場景是什么?
???? 對于一個大型的軟件系統(tǒng)來說,它會有很多的組件或者說模塊或者說子系統(tǒng)或者(subsystem or Component or submodule)。那么這些模塊的如何通信?這和傳統(tǒng)的IPC有很大的區(qū)別。傳統(tǒng)的IPC很多都是在單一系統(tǒng)上的,模塊耦合性很大,不適合擴展(Scalability);如果使用socket那么不同的模塊的確可以部署到不同的機器上,但是還是有很多問題需要解決。比如:
?1)信息的發(fā)送者和接收者如何維持這個連接,如果一方的連接中斷,這期間的數(shù)據(jù)如何方式丟失?
?2)如何降低發(fā)送者和接收者的耦合度?
?3)如何讓Priority高的接收者先接到數(shù)據(jù)?
?4)如何做到load balance?有效均衡接收者的負載?
?5)如何有效的將數(shù)據(jù)發(fā)送到相關的接收者?也就是說將接收者subscribe 不同的數(shù)據(jù),如何做有效的filter。
?6)如何做到可擴展,甚至將這個通信模塊發(fā)到cluster上?
?7)如何保證接收者接收到了完整,正確的數(shù)據(jù)?
? AMDQ協(xié)議解決了以上的問題,而RabbitMQ實現(xiàn)了AMQP。
?
系統(tǒng)架構
?
?
?
成為系統(tǒng)架構可能不太合適,可能叫應用場景的系統(tǒng)架構更合適。
??
??? 這個系統(tǒng)架構圖版權屬于sunjun041640。
????RabbitMQ Server: 也叫broker server,它不是運送食物的卡車,而是一種傳輸服務。原話是RabbitMQisn’t a food truck, it’s a delivery service. 他的角色就是維護一條從Producer到Consumer的路線,保證數(shù)據(jù)能夠按照指定的方式進行傳輸。但是這個保證也不是100%的保證,但是對于普通的應用來說這已經(jīng)足夠了。當然對于商業(yè)系統(tǒng)來說,可以再做一層數(shù)據(jù)一致性的guard,就可以徹底保證系統(tǒng)的一致性了。
????Client A & B: 也叫Producer,數(shù)據(jù)的發(fā)送方。createmessages and publish (send) them to a broker server (RabbitMQ).一個Message有兩個部分:payload(有效載荷)和label(標簽)。payload顧名思義就是傳輸?shù)臄?shù)據(jù)。label是exchange的名字或者說是一個tag,它描述了payload,而且RabbitMQ也是通過這個label來決定把這個Message發(fā)給哪個Consumer。AMQP僅僅描述了label,而RabbitMQ決定了如何使用這個label的規(guī)則。
????Client 1,2,3:也叫Consumer,數(shù)據(jù)的接收方。Consumersattach to a broker server (RabbitMQ) and subscribe to a queue。把queue比作是一個有名字的郵箱。當有Message到達某個郵箱后,RabbitMQ把它發(fā)送給它的某個訂閱者即Consumer。當然可能會把同一個Message發(fā)送給很多的Consumer。在這個Message中,只有payload,label已經(jīng)被刪掉了。對于Consumer來說,它是不知道誰發(fā)送的這個信息的。就是協(xié)議本身不支持。但是當然了如果Producer發(fā)送的payload包含了Producer的信息就另當別論了。
? ?? 對于一個數(shù)據(jù)從Producer到Consumer的正確傳遞,還有三個概念需要明確:exchanges, queues and bindings。
????????Exchanges?are where producers publish their messages.
????????Queuesare where the messages end up and are received by consumers
????????Bindings?are how the messages get routed from the exchange to particular queues.
?? 還有幾個概念是上述圖中沒有標明的,那就是Connection(連接),Channel(通道,頻道)。
?
???Connection: 就是一個TCP的連接。Producer和Consumer都是通過TCP連接到RabbitMQ Server的。以后我們可以看到,程序的起始處就是建立這個TCP連接。
???Channels:?虛擬連接。它建立在上述的TCP連接中。數(shù)據(jù)流動都是在Channel中進行的。也就是說,一般情況是程序起始建立TCP連接,第二步就是建立這個Channel。
????那么,為什么使用Channel,而不是直接使用TCP連接?
??? 對于OS來說,建立和關閉TCP連接是有代價的,頻繁的建立關閉TCP連接對于系統(tǒng)的性能有很大的影響,而且TCP的連接數(shù)也有限制,這也限制了系統(tǒng)處理高并發(fā)的能力。但是,在TCP連接中建立Channel是沒有上述代價的。對于Producer或者Consumer來說,可以并發(fā)的使用多個Channel進行Publish或者Receive。有實驗表明,1s的數(shù)據(jù)可以Publish10K的數(shù)據(jù)包。當然對于不同的硬件環(huán)境,不同的數(shù)據(jù)包大小這個數(shù)據(jù)肯定不一樣,但是我只想說明,對于普通的Consumer或者Producer來說,這已經(jīng)足夠了。如果不夠用,你考慮的應該是如何細化split你的設計。
?
進一步的細節(jié)闡明
?
?
使用ack確認Message的正確傳遞
?
默認情況下,如果Message 已經(jīng)被某個Consumer正確的接收到了,那么該Message就會被從queue中移除。當然也可以讓同一個Message發(fā)送到很多的Consumer。
??? 如果一個queue沒被任何的Consumer Subscribe(訂閱),那么,如果這個queue有數(shù)據(jù)到達,那么這個數(shù)據(jù)會被cache,不會被丟棄。當有Consumer時,這個數(shù)據(jù)會被立即發(fā)送到這個Consumer,這個數(shù)據(jù)被Consumer正確收到時,這個數(shù)據(jù)就被從queue中刪除。
???? 那么什么是正確收到呢?通過ack。每個Message都要被acknowledged(確認,ack)。我們可以顯示的在程序中去ack,也可以自動的ack。如果有數(shù)據(jù)沒有被ack,那么:
???? RabbitMQ Server會把這個信息發(fā)送到下一個Consumer。
??? 如果這個app有bug,忘記了ack,那么RabbitMQ Server不會再發(fā)送數(shù)據(jù)給它,因為Server認為這個Consumer處理能力有限。
?? 而且ack的機制可以起到限流的作用(Benefitto throttling):在Consumer處理完成數(shù)據(jù)后發(fā)送ack,甚至在額外的延時后發(fā)送ack,將有效的balance Consumer的load。
?? 當然對于實際的例子,比如我們可能會對某些數(shù)據(jù)進行merge,比如merge 4s內(nèi)的數(shù)據(jù),然后sleep 4s后再獲取數(shù)據(jù)。特別是在監(jiān)聽系統(tǒng)的state,我們不希望所有的state實時的傳遞上去,而是希望有一定的延時。這樣可以減少某些IO,而且終端用戶也不會感覺到。
?
Reject a message
?
? ?有兩種方式,第一種的Reject可以讓RabbitMQ Server將該Message 發(fā)送到下一個Consumer。第二種是從queue中立即刪除該Message。
?
Creating a queue
? ? ??Consumer和Procuder都可以通過?queue.declare?創(chuàng)建queue。對于某個Channel來說,Consumer不能declare一個queue,卻訂閱其他的queue。當然也可以創(chuàng)建私有的queue。這樣只有app本身才可以使用這個queue。queue也可以自動刪除,被標為auto-delete的queue在最后一個Consumer unsubscribe后就會被自動刪除。那么如果是創(chuàng)建一個已經(jīng)存在的queue呢?那么不會有任何的影響。需要注意的是沒有任何的影響,也就是說第二次創(chuàng)建如果參數(shù)和第一次不一樣,那么該操作雖然成功,但是queue的屬性并不會被修改。
?
? ??那么誰應該負責創(chuàng)建這個queue呢?是Consumer,還是Producer?
如果queue不存在,當然Consumer不會得到任何的Message。但是如果queue不存在,那么Producer Publish的Message會被丟棄。所以,還是為了數(shù)據(jù)不丟失,Consumer和Producer都try to create the queue!反正不管怎么樣,這個接口都不會出問題。
?? queue對load balance的處理是完美的。對于多個Consumer來說,RabbitMQ 使用循環(huán)的方式(round-robin)的方式均衡的發(fā)送給不同的Consumer。
Exchanges
?
? ? 從架構圖可以看出,Procuder Publish的Message進入了Exchange。接著通過“routing keys”, RabbitMQ會找到應該把這個Message放到哪個queue里。queue也是通過這個routing keys來做的綁定。
???? 有三種類型的Exchanges:direct, fanout,topic。 每個實現(xiàn)了不同的路由算法(routing algorithm)。
·????????Direct exchange: 如果 routing key 匹配, 那么Message就會被傳遞到相應的queue中。其實在queue創(chuàng)建時,它會自動的以queue的名字作為routing key來綁定那個exchange。
·????????Fanout exchange: 會向響應的queue廣播。
·????????Topic exchange: 對key進行模式匹配,比如ab*可以傳遞到所有ab*的queue。
?
Virtual hosts
?
?
? ?每個virtual host本質(zhì)上都是一個RabbitMQ Server,擁有它自己的queue,exchagne,和bings rule等等。這保證了你可以在多個不同的application中使用RabbitMQ。
?? 接下來我會使用Python來說明RabbitMQ的使用方法。
?
python 使用 RabbitMQ 寫 "Hello World"
?
http://www.rabbitmq.com/tutorials/tutorial-one-python.html
?
使用Python(pika 0.9.8)實現(xiàn)從Producer到Consumer傳遞數(shù)據(jù)”Hello, World“。
?
? ? ?RabbitMQ實現(xiàn)了AMQP定義的消息隊列。它實現(xiàn)的功能”非常簡單“:從Producer接收數(shù)據(jù)然后傳遞到Consumer。它能保證多并發(fā),數(shù)據(jù)安全傳遞,可擴展。
???? 和任何的Hello world一樣,它們都不復雜。我們將會設計兩個程序,一個發(fā)送Hello world,另一個接收這個數(shù)據(jù)并且打印到屏幕。
????? 整體的設計如下圖:
?
?
環(huán)境配置
?
?
?
RabbitMQ 實現(xiàn)了AMQP。因此,我們需要安裝AMPQ的library。幸運的是對于多種編程語言都有實現(xiàn)。我們可以使用以下lib的任何一個:
- py-amqplib??
- txAMQP
- pika
在這里我們將使用pika. 可以通過?pip?包管理工具來安裝:
$ sudo pip install pika==0.9.8?
?
這個安裝依賴于pip和Git-core。
-
On Ubuntu:
$ sudo apt-get install python-pip git-core -
On Debian:
$ sudo apt-get install python-setuptools git-core $ sudo easy_install pip -
On Windows:To install easy_install, run the MS Windows Installer for?setuptools
> easy_install pip > pip install pika==0.9.8
?
Sending
?
?
?
第一個program?send.py:發(fā)送Hello world 到queue。正如我們在上篇文章提到的,你程序的第一句話就是建立連接,第二句話就是創(chuàng)建channel:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel()創(chuàng)建連接傳入的參數(shù)就是RabbitMQ Server的ip或者name。關于誰創(chuàng)建queue,上面也討論過:Producer和Consumer都應該去創(chuàng)建。接下來我們創(chuàng)建名字為hello的queue:
?
channel.queue_declare(queue='hello')?
創(chuàng)建了channel,我們可以通過相應的命令來list queue:
?
$ sudo rabbitmqctl list_queues Listing queues ... hello 0 ...done.?
現(xiàn)在我們已經(jīng)準備好了發(fā)送了。
從架構圖可以看出,Producer只能發(fā)送到exchange,它是不能直接發(fā)送到queue的。
現(xiàn)在我們使用默認的exchange(名字是空字符)。這個默認的exchange允許我們發(fā)送給指定的queue。routing_key就是指定的queue名字。
?
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print " [x] Sent 'Hello World!'"?
退出前別忘了關閉connection。
?
connection.close()?
Receiving
?
?
第二個program?receive.py?將從queue中獲取Message并且打印到屏幕。
第一步還是創(chuàng)建connection。第二步創(chuàng)建channel。第三步創(chuàng)建queue,name = hello:
?
?
?
channel.queue_declare(queue='hello')?
接下來要subscribe了。在這之前,需要聲明一個回調(diào)函數(shù)來處理接收到的數(shù)據(jù)。
?
def callback(ch, method, properties, body): print " [x] Received %r" % (body,)?
subscribe:
?
channel.basic_consume(callback, queue='hello', no_ack=True)?
最后,準備好無限循環(huán)監(jiān)聽吧:
?
print ' [*] Waiting for messages. To exit press CTRL+C' channel.start_consuming()?
最終版本
send.py:
?
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print " [x] Sent 'Hello World!'" connection.close()?
?receive.py:
?
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') print ' [*] Waiting for messages. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] Received %r" % (body,) channel.basic_consume(callback, queue='hello', no_ack=True) channel.start_consuming()?
最終運行
先運行?send.py?program:
?
$ python send.py [x] Sent 'Hello World!'?
send.py?每次運行完都會停止。注意:現(xiàn)在數(shù)據(jù)已經(jīng)存到queue里了。接收它:
?
$ python receive.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'Hello World!'?
?
任務分發(fā)機制
?
http://www.rabbitmq.com/tutorials/tutorial-two-Python.html
?
? ?上面解決了從發(fā)送端(Producer)向接收端(Consumer)發(fā)送“Hello World”的問題。在實際的應用場景中,這是遠遠不夠的。現(xiàn)在將結合更加實際的應用場景來講解更多的高級用法。
?? 當有Consumer需要大量的運算時,RabbitMQ Server需要一定的分發(fā)機制來balance每個Consumer的load。試想一下,對于web application來說,在一個很多的HTTP request里是沒有時間來處理復雜的運算的,只能通過后臺的一些工作線程來完成。接下來我們分布講解。?
?? 應用場景就是RabbitMQ Server會將queue的Message分發(fā)給不同的Consumer以處理計算密集型的任務:
?
準備
?
?
在上面,我們簡單在Message中包含了一個字符串"Hello World"。現(xiàn)在為了是Consumer做的是計算密集型的工作,那就不能簡單的字符串了。在現(xiàn)實應用中,Consumer有可能做的是一個圖片的resize,或者是pdf文件的渲染或者內(nèi)容提取。但是作為Demo,還是用字符串模擬吧:通過字符串中的.的數(shù)量來決定計算的復雜度,每個.都會消耗1s,即sleep(1)。
??? 還是復用上面的code,根據(jù)“計算密集型”做一下簡單的修改,為了辨別,我們把send.py 的名字換成new_task.py
import sys message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='hello', body=message) print " [x] Sent %r" % (message,)同樣的道理,把receive.py的名字換成worker.py,并且根據(jù)Message中的.的數(shù)量進行計算密集型模擬:
?
import time def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done"?
Round-robin dispatching 循環(huán)分發(fā)
?
? ? ? ? RabbitMQ的分發(fā)機制非常適合擴展,而且它是專門為并發(fā)程序設計的。如果現(xiàn)在load加重,那么只需要創(chuàng)建更多的Consumer來進行任務處理即可。當然了,對于負載還要加大怎么辦?我沒有遇到過這種情況,那就可以創(chuàng)建多個virtual Host,細化不同的通信類別了。
???? 首先開啟兩個Consumer,即運行兩個worker.py。
Console1:
?
?
?
shell1$ python worker.py [*] Waiting for messages. To exit press CTRL+C?
Consule2:
?
shell2$ python worker.py [*] Waiting for messages. To exit press CTRL+C?
Producer new_task.py要Publish Message了:
?
shell3$ python new_task.py First message. shell3$ python new_task.py Second message.. shell3$ python new_task.py Third message... shell3$ python new_task.py Fourth message.... shell3$ python new_task.py Fifth message.....?
注意一下:.代表的sleep(1)。接著開一下Consumer worker.py收到了什么:
?
Console1:
shell1$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'Console2:
?
shell2$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....'?
默認情況下,RabbitMQ 會順序的分發(fā)每個Message。當每個收到ack后,會將該Message刪除,然后將下一個Message分發(fā)到下一個Consumer。這種分發(fā)方式叫做round-robin。這種分發(fā)還有問題,接著向下讀吧。
Message acknowledgment 消息確認
?
? ? ? 每個Consumer可能需要一段時間才能處理完收到的數(shù)據(jù)。如果在這個過程中,Consumer出錯了,異常退出了,而數(shù)據(jù)還沒有處理完成,那么非常不幸,這段數(shù)據(jù)就丟失了。因為我們采用no-ack的方式進行確認,也就是說,每次Consumer接到數(shù)據(jù)后,而不管是否處理完成,RabbitMQ Server會立即把這個Message標記為完成,然后從queue中刪除了。
???? 如果一個Consumer異常退出了,它處理的數(shù)據(jù)能夠被另外的Consumer處理,這樣數(shù)據(jù)在這種情況下就不會丟失了(注意是這種情況下)。
????? 為了保證數(shù)據(jù)不被丟失,RabbitMQ支持消息確認機制,即acknowledgments。為了保證數(shù)據(jù)能被正確處理而不僅僅是被Consumer收到,那么我們不能采用no-ack。而應該是在處理完數(shù)據(jù)后發(fā)送ack。
??? 在處理數(shù)據(jù)后發(fā)送的ack,就是告訴RabbitMQ數(shù)據(jù)已經(jīng)被接收,處理完成,RabbitMQ可以去安全的刪除它了。
??? 如果Consumer退出了但是沒有發(fā)送ack,那么RabbitMQ就會把這個Message發(fā)送到下一個Consumer。這樣就保證了在Consumer異常退出的情況下數(shù)據(jù)也不會丟失。
??? 這里并沒有用到超時機制。RabbitMQ僅僅通過Consumer的連接中斷來確認該Message并沒有被正確處理。也就是說,RabbitMQ給了Consumer足夠長的時間來做數(shù)據(jù)處理。
??? 默認情況下,消息確認是打開的(enabled)。在上篇文章中我們通過no_ack = True 關閉了ack。重新修改一下callback,以在消息處理完成后發(fā)送ack:
?
?
?
def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue='hello')?
? ? ?這樣即使你通過Ctr-C中斷了worker.py,那么Message也不會丟失了,它會被分發(fā)到下一個Consumer。
?
????? 如果忘記了ack,那么后果很嚴重。當Consumer退出時,Message會重新分發(fā)。然后RabbitMQ會占用越來越多的內(nèi)存,由于RabbitMQ會長時間運行,因此這個“內(nèi)存泄漏”是致命的。去調(diào)試這種錯誤,可以通過一下命令打印un-acked Messages:
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged Listing queues ... hello 0 0 ...done.Message durability消息持久化
?
? ? ?在上一節(jié)中我們知道了即使Consumer異常退出,Message也不會丟失。但是如果RabbitMQ Server退出呢?軟件都有bug,即使RabbitMQ Server是完美毫無bug的(當然這是不可能的,是軟件就有bug,沒有bug的那不叫軟件),它還是有可能退出的:被其它軟件影響,或者系統(tǒng)重啟了,系統(tǒng)panic了。。。
??? 為了保證在RabbitMQ退出或者crash了數(shù)據(jù)仍沒有丟失,需要將queue和Message都要持久化。
queue的持久化需要在聲明時指定durable=True:
channel.queue_declare(queue='hello', durable=True)上述語句執(zhí)行不會有什么錯誤,但是確得不到我們想要的結果,原因就是RabbitMQ Server已經(jīng)維護了一個叫hello的queue,那么上述執(zhí)行不會有任何的作用,也就是hello的任何屬性都不會被影響。這一點在上篇文章也討論過。
?
那么workaround也很簡單,聲明一個另外的名字的queue,比如名字定位task_queue:
?
[python]?view plain?copy
再次強調(diào),Producer和Consumer都應該去創(chuàng)建這個queue,盡管只有一個地方的創(chuàng)建是真正起作用的:
?
接下來,需要持久化Message,即在Publish的時候指定一個properties,方式如下:
?
[python]?view plain?copy
關于持久化的進一步討論:
?
? ? 為了數(shù)據(jù)不丟失,我們采用了:
??? 但是這樣能保證數(shù)據(jù)100%不丟失嗎?
??? 答案是否定的。問題就在與RabbitMQ需要時間去把這些信息存到磁盤上,這個time window雖然短,但是它的確還是有。在這個時間窗口內(nèi)如果數(shù)據(jù)沒有保存,數(shù)據(jù)還會丟失。還有另一個原因就是RabbitMQ并不是為每個Message都做fsync:它可能僅僅是把它保存到Cache里,還沒來得及保存到物理磁盤上。
??? 因此這個持久化還是有問題。但是對于大多數(shù)應用來說,這已經(jīng)足夠了。當然為了保持一致性,你可以把每次的publish放到一個transaction中。這個transaction的實現(xiàn)需要user defined codes。
??? 那么商業(yè)系統(tǒng)會做什么呢?一種可能的方案是在系統(tǒng)panic時或者異常重啟時或者斷電時,應該給各個應用留出時間去flash cache,保證每個應用都能exit gracefully。
Fair dispatch 公平分發(fā)
?
? ? 你可能也注意到了,分發(fā)機制不是那么優(yōu)雅。默認狀態(tài)下,RabbitMQ將第n個Message分發(fā)給第n個Consumer。當然n是取余后的。它不管Consumer是否還有unacked Message,只是按照這個默認機制進行分發(fā)。
?? 那么如果有個Consumer工作比較重,那么就會導致有的Consumer基本沒事可做,有的Consumer卻是毫無休息的機會。那么,RabbitMQ是如何處理這種問題呢?
?
? 通過?basic.qos?方法設置prefetch_count=1?。這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message。換句話說,在接收到該Consumer的ack前,他它不會將新的Message分發(fā)給它。 設置方法如下:
?
[python]?view plain?copy
注意,這種方法可能會導致queue滿。當然,這種情況下你可能需要添加更多的Consumer,或者創(chuàng)建更多的virtualHost來細化你的設計。
?
?
?
最終版本
?
new_task.py?script:
?
[python]?view plain?copy
?
worker.py script:
?
[python]?view plain?copy
?
分發(fā)到多Consumer(Publish/Subscribe)
?
http://www.rabbitmq.com/tutorials/tutorial-three-Python.html
?
? ? ? 上篇文章中,我們把每個Message都是deliver到某個Consumer。在這篇文章中,我們將會將同一個Message deliver到多個Consumer中。這個模式也被成為 "publish / subscribe"。
??? 這篇文章中,我們將創(chuàng)建一個日志系統(tǒng),它包含兩個部分:第一個部分是發(fā)出log(Producer),第二個部分接收到并打印(Consumer)。 我們將構建兩個Consumer,第一個將log寫到物理磁盤上;第二個將log輸出的屏幕。
?
Exchanges
?
? ? RabbitMQ 的Messaging Model就是Producer并不會直接發(fā)送Message到queue。實際上,Producer并不知道它發(fā)送的Message是否已經(jīng)到達queue。
?? Producer發(fā)送的Message實際上是發(fā)到了Exchange中。它的功能也很簡單:從Producer接收Message,然后投遞到queue中。Exchange需要知道如何處理Message,是把它放到那個queue中,還是放到多個queue中?這個rule是通過Exchange 的類型定義的。
?
?? 我們知道有三種類型的Exchange:direct,?topic?和fanout。fanout就是廣播模式,會將所有的Message都放到它所知道的queue中。創(chuàng)建一個名字為logs,類型為fanout的Exchange:
?
[python]?view plain?copy
?
Listing exchanges
通過rabbitmqctl可以列出當前所有的Exchange:
?
[python]?view plain?copy
注意?amq.*?exchanges 和the default (unnamed)exchange是RabbitMQ默認創(chuàng)建的。
?
現(xiàn)在我們可以通過exchange,而不是routing_key來publish Message了:
?
[python]?view plain?copy
?
Temporary queues
?
?
? ? 截至現(xiàn)在,我們用的queue都是有名字的:第一個是hello,第二個是task_queue。使用有名字的queue,使得在Producer和Consumer之前共享queue成為可能。
??? 但是對于我們將要構建的日志系統(tǒng),并不需要有名字的queue。我們希望得到所有的log,而不是它們中間的一部分。而且我們只對當前的log感興趣。為了實現(xiàn)這個目標,我們需要兩件事情:
??? 1) 每當Consumer連接時,我們需要一個新的,空的queue。因為我們不對老的log感興趣。幸運的是,如果在聲明queue時不指定名字,那么RabbitMQ會隨機為我們選擇這個名字。方法:
?
[python]?view plain?copy
??? 通過result.method.queue?可以取得queue的名字。基本上都是這個樣子:amq.gen-JzTY20BRgKO-HjmUJj0wLg。
??? 2)當Consumer關閉連接時,這個queue要被deleted。可以加個exclusive的參數(shù)。方法:
?
[python]?view plain?copy
?
Bindings綁定
?
?
現(xiàn)在我們已經(jīng)創(chuàng)建了fanout類型的exchange和沒有名字的queue(實際上是RabbitMQ幫我們?nèi)×嗣?#xff09;。那exchange怎么樣知道它的Message發(fā)送到哪個queue呢?答案就是通過bindings:綁定。
?
方法:
?
[python]?view plain?copy
現(xiàn)在logs的exchange就將它的Message附加到我們創(chuàng)建的queue了。
?
Listing bindings
使用命令rabbitmqctl list_bindings。
?
最終版本
?
?
? ? 我們最終實現(xiàn)的數(shù)據(jù)流圖如下:
Producer,在這里就是產(chǎn)生log的program,基本上和前幾個都差不多。最主要的區(qū)別就是publish通過了exchange而不是routing_key。
emit_log.py?script:
?
[python]?view plain?copy
?
還有一點要注意的是我們聲明了exchange。publish到一個不存在的exchange是被禁止的。如果沒有queue bindings exchange的話,log是被丟棄的。
Consumer:receive_logs.py:
?
[python]?view plain?copy
我們開始不是說需要兩個Consumer嗎?一個負責記錄到文件;一個負責打印到屏幕?
其實用重定向就可以了,當然你想修改callback自己寫文件也行。我們使用重定向的方法:
We're done. If you want to save logs to a file, just open a console and type:
?
[python]?view plain?copy
Consumer2:打印到屏幕:
?
[python]?view plain?copy
接下來,Producer:
?
[python]?view plain?copy
使用命令rabbitmqctl list_bindings你可以看我們創(chuàng)建的queue。
一個output:
?
[python]?view plain?copy
這個結果還是很好理解的。
?
?
?
Routing 消息路由
http://www.rabbitmq.com/tutorials/tutorial-four-Python.html
?
? ? 上篇文章中,我們構建了一個簡單的日志系統(tǒng)。接下來,我們將豐富它:能夠使用不同的severity來監(jiān)聽不同等級的log。比如我們希望只有error的log才保存到磁盤上。
?
?
Bindings綁定
?
?
??? 上篇文章中我們是這么做的綁定:
?
[python]?view plain?copy
??? 綁定其實就是關聯(lián)了exchange和queue。或者這么說:queue對exchagne的內(nèi)容感興趣,exchange要把它的Message deliver到queue中。
?
??? 實際上,綁定可以帶routing_key?這個參數(shù)。其實這個參數(shù)的名稱和basic_publish?的參數(shù)名是相同了。為了避免混淆,我們把它成為binding key。
??? 使用一個key來創(chuàng)建binding :
?
[python]?view plain?copy
對于fanout的exchange來說,這個參數(shù)是被忽略的。
?
?
Direct exchange
?
?
? Direct exchange的路由算法非常簡單:通過binding key的完全匹配,可以通過下圖來說明。?
??? exchange X和兩個queue綁定在一起。Q1的binding key是orange。Q2的binding key是black和green。
??? 當P publish key是orange時,exchange會把它放到Q1。如果是black或者green那么就會到Q2。其余的Message都會被丟棄。
?
?
Multiple bindings
?
????? 多個queue綁定同一個key是可以的。對于下圖的例子,Q1和Q2都綁定了black。也就是說,對于routing key是black的Message,會被deliver到Q1和Q2。其余的Message都會被丟棄。
??
?
Emitting logs
?
首先是我們要創(chuàng)建一個direct的exchange:
?
[python]?view plain?copy
我們將使用log的severity作為routing key,這樣Consumer可以針對不同severity的log進行不同的處理。
publish:
?
?
[python]?view plain?copy
我們使用三種severity:'info', 'warning', 'error'.
?
?
Subscribing
?
對于queue,我們需要綁定severity:
?
[python]?view plain?copy
?
?
最終版本
?
The code for?emit_log_direct.py:
?
[python]?view plain?copy
The code for?receive_logs_direct.py:
?
?
[python]?view plain?copy
我們想把warning和error的log記錄到一個文件中:
?
?
[python]?view plain?copy
打印所有l(wèi)og到屏幕:
?
?
[python]?view plain?copy
?
?
使用主題進行消息分發(fā)
?
?
http://www.rabbitmq.com/tutorials/tutorial-five-Python.html
?
上面實現(xiàn)了一個簡單的日志系統(tǒng)。Consumer可以監(jiān)聽不同severity的log。但是,這也是它之所以叫做簡單日志系統(tǒng)的原因,因為是僅僅能夠通過severity設定。不支持更多的標準。
??????? 比如syslog?unix的日志工具,它可以通過severity (info/warn/crit...) 和模塊(auth/cron/kern...)。這可能更是我們想要的:我們可以僅僅需要cron模塊的log。
??????? 為了實現(xiàn)類似的功能,我們需要用到topic exchange。
?
Topic exchange
?
?
? ? ? ? 對于Message的routing_key是有限制的,不能使任意的。格式是以點號“."分割的字符表。比如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。你可以放任意的key在routing_key中,當然最長不能超過255 bytes。
??????? 對于routing_key,有兩個特殊字符(在正則表達式里叫元字符):
- *?(星號) 代表任意 一個單詞
- #?(hash) 0個或者多個單詞
??????? 請看下面一個例子:
???? Producer發(fā)送消息時需要設置routing_key,routing_key包含三個單詞和兩個點號。第一個key是描述了celerity(靈巧,敏捷),第二個是colour(色彩),第三個是species(物種):"<celerity>.<colour>.<species>"。
???? 在這里我們創(chuàng)建了兩個綁定: Q1 的binding key 是"*.orange.*"; Q2 是? "*.*.rabbit" 和 "lazy.#":
- Q1 感興趣所有orange顏色的動物
- Q2 感興趣所有的rabbits和所有的lazy的
???? 比如routing_key是 "quick.orange.rabbit"將會發(fā)送到Q1和Q2中。消息"lazy.orange.elephant" 也會發(fā)送到Q1和Q2。但是"quick.orange.fox" 會發(fā)送到Q1;"lazy.brown.fox"會發(fā)送到Q2。"lazy.pink.rabbit" 也會發(fā)送到Q2,但是盡管兩個routing_key都匹配,它也只是發(fā)送一次。"quick.brown.fox" 會被丟棄。
???? 如果發(fā)送的單詞不是3個呢? 答案要看情況,因為#是可以匹配0個或任意個單詞。比如"orange" or "quick.orange.male.rabbit",它們會被丟棄。如果是lazy那么就會進入Q2。類似的還有 "lazy.orange.male.rabbit",盡管它包含四個單詞。
Topic exchange和其他exchange
由于有"*" (star) and "#" (hash), Topic exchange 非常強大并且可以轉化為其他的exchange:
??? 如果binding_key 是 "#" - 它會接收所有的Message,不管routing_key是什么,就像是fanout exchange。
??? 如果 "*" (star) and "#" (hash) 沒有被使用,那么topic exchange就變成了direct exchange。
?
代碼實現(xiàn)
?
?
? ? ?現(xiàn)在我們要refine我們上篇的日志系統(tǒng)。routing keys 有兩個部分: "<facility>.<severity>"。
The code for?emit_log_topic.py:
?
[python]?view plain?copy
The code for?receive_logs_topic.py:
?
?
[python]?view plain?copy
?
運行和結果
?
接收所有的log:
?
[python]?view plain?copy
接收所有kern facility的log:
?
?
?
?
[python]?view plain?copy
僅僅接收critical的log:
?
?
[python]?view plain?copy
可以創(chuàng)建多個綁定:
?
?
[python]?view plain?copy
Producer產(chǎn)生一個log:"kern.critical" type:
?
?
[python]?view plain?copy
課后思考題:
?
- Will "*" binding catch a message sent with an empty routing key?
- Will "#.*" catch a message with a string ".." as a key? Will it catch a message with a single word key?
- How different is "a.*.#" from "a.#"?
?
適用于云計算集群的遠程調(diào)用(RPC)
?
?
http://www.rabbitmq.com/tutorials/tutorial-six-Python.html
? ? ? ? 在云計算環(huán)境中,很多時候需要用它其他機器的計算資源,我們有可能會在接收到Message進行處理時,會把一部分計算任務分配到其他節(jié)點來完成。那么,RabbitMQ如何使用RPC呢?在本篇文章中,我們將會通過其它節(jié)點求來斐波納契完成示例。
客戶端接口 Client interface
?
? ? ? ? 為了展示一個RPC服務是如何使用的,我們將創(chuàng)建一段很簡單的客戶端class。 它將會向外提供名字為call的函數(shù),這個call會發(fā)送RPC請求并且阻塞知道收到RPC運算的結果。代碼如下:
?
[python]?view plain?copy
?
回調(diào)函數(shù)隊列 Callback queue
?
? ? ? ? 總體來說,在RabbitMQ進行RPC遠程調(diào)用是比較容易的。client發(fā)送請求的Message然后server返回響應結果。為了收到響應client在publish message時需要提供一個”callback“(回調(diào))的queue地址。code如下:
?
[python]?view plain?copy
Message properties
?
AMQP 預定義了14個屬性。它們中的絕大多很少會用到。以下幾個是平時用的比較多的:
- delivery_mode: 持久化一個Message(通過設定值為2)。其他任意值都是非持久化。請移步RabbitMQ消息隊列(三):任務分發(fā)機制
- content_type: 描述mime-type 的encoding。比如設置為JSON編碼:設置該property為application/json。
- reply_to: 一般用來指明用于回調(diào)的queue(Commonly used to name a callback queue)。
- correlation_id: 在請求中關聯(lián)處理RPC響應(correlate RPC responses with requests)。
相關id Correlation id
?
? ? ? ?在上個小節(jié)里,實現(xiàn)方法是對每個RPC請求都會創(chuàng)建一個callback queue。這是不高效的。幸運的是,在這里有一個解決方法:為每個client創(chuàng)建唯一的callback queue。
?????? 這又有其他問題了:收到響應后它無法確定是否是它的,因為所有的響應都寫到同一個queue了。上一小節(jié)的correlation_id在這種情況下就派上用場了:對于每個request,都設置唯一的一個值,在收到響應后,通過這個值就可以判斷是否是自己的響應。如果不是自己的響應,就不去處理。
總結
?
?
???? 工作流程:
- 當客戶端啟動時,它創(chuàng)建了匿名的exclusive callback queue.
- 客戶端的RPC請求時將同時設置兩個properties:?reply_to設置為callback queue;correlation_id設置為每個request一個獨一無二的值.
- 請求將被發(fā)送到an?rpc_queue?queue.
- RPC端或者說server一直在等待那個queue的請求。當請求到達時,它將通過在reply_to指定的queue回復一個message給client。
- client一直等待callback queue的數(shù)據(jù)。當message到達時,它將檢查correlation_id的值,如果值和它request發(fā)送時的一致那么就將返回響應。
?
最終實現(xiàn)
?
?
The code for?rpc_server.py:
?
[python]?view plain?copy
The server code is rather straightforward:
?
- (4) As usual we start by establishing the connection and declaring the queue.
- (11) We declare our fibonacci function. It assumes only valid positive integer input. (Don't expect this one to work for big numbers, it's probably the slowest recursive implementation possible).
- (19) We declare a callback for?basic_consume, the core of the RPC server. It's executed when the request is received. It does the work and sends the response back.
- (32) We might want to run more than one server process. In order to spread the load equally over multiple servers we need to set theprefetch_count?setting.
The code for?rpc_client.py:
?
[python]?view plain?copy
The client code is slightly more involved:
?
- (7) We establish a connection, channel and declare an exclusive 'callback' queue for replies.
- (16) We subscribe to the 'callback' queue, so that we can receive RPC responses.
- (18) The 'on_response' callback executed on every response is doing a very simple job, for every response message it checks if thecorrelation_id?is the one we're looking for. If so, it saves the response inself.response?and breaks the consuming loop.
- (23) Next, we define our main?call?method - it does the actual RPC request.
- (24) In this method, first we generate a unique?correlation_id?number and save it - the 'on_response' callback function will use this value to catch the appropriate response.
- (25) Next, we publish the request message, with two properties:?reply_to?and?correlation_id.
- (32) At this point we can sit back and wait until the proper response arrives.
- (33) And finally we return the response back to the user.
開始rpc_server.py:
[python]?view plain?copy
通過client來請求fibonacci數(shù):
?
?
[python]?view plain?copy
????? 現(xiàn)在這個設計并不是唯一的,但是這個實現(xiàn)有以下優(yōu)勢:
?
?
?
- 如何RPC server太慢,你可以擴展它:啟動另外一個RPC server。
- 在client端, 無所進行加鎖能同步操作,他所作的就是發(fā)送請求等待響應。
????? 我們的code還是挺簡單的,并沒有嘗試去解決更復雜和重要的問題,比如:
- 如果沒有server在運行,client需要怎么做?
- RPC應該設置超時機制嗎?
- 如果server運行出錯并且拋出了異常,需要將這個問題轉發(fā)到client嗎?
- 需要邊界檢查嗎?
?
RabbitMQ消息隊列的小伙伴: ProtoBuf(Google Protocol Buffer)
?
?
? ? ?什么是ProtoBuf?
???? 一種輕便高效的結構化數(shù)據(jù)存儲格式,可以用于結構化數(shù)據(jù)串行化,或者說序列化。它很適合做數(shù)據(jù)存儲或 RPC 數(shù)據(jù)交換格式。可用于通訊協(xié)議、數(shù)據(jù)存儲等領域的語言無關、平臺無關、可擴展的序列化結構數(shù)據(jù)格式。目前提供了 C++、Java、Python?三種語言的 API。
???? 它可以作為RabbitMQ的Message的數(shù)據(jù)格式進行傳輸,由于是結構化的數(shù)據(jù),這樣就極大的方便了Consumer的數(shù)據(jù)高效處理。當然了你可能說使用XML不也可以嗎?與XML相比,ProtoBuf有以下優(yōu)勢:
?????? 當然了,的確還有很多類似的技術,比如JSON,Thrift等等,和他們相比,ProtoBuf的優(yōu)勢或者劣勢在哪里?簡單說來,ProtoBuf就是簡單,快。以測試為證:項目 thrift-protobuf-compare 比較了這些類似的技術,下圖 顯示了該項目的一項測試結果。
在占用空間上的性能比較:
由此可見,ProtoBuf具有速度和空間的優(yōu)勢,使得它現(xiàn)在應用非常廣泛。比如Hadoop就使用了它。
更多信息,請閱 http://www.ibm.com/developerworks/cn/Linux/l-cn-gpb/。
?
Publisher的消息確認機制
?
? ? ? ?在前面的文章中提到了queue和consumer之間的消息確認機制:通過設置ack。那么Publisher能不到知道他post的Message有沒有到達queue,甚至更近一步,是否被某個Consumer處理呢?畢竟對于一些非常重要的數(shù)據(jù),可能Publisher需要確認某個消息已經(jīng)被正確處理。
????? 在我們的系統(tǒng)中,我們沒有是實現(xiàn)這種確認,也就是說,不管Message是否被Consume了,Publisher不會去care。他只是將自己的狀態(tài)publish給上層,由上層的邏輯去處理。如果Message沒有被正確處理,可能會導致某些狀態(tài)丟失。但是由于提供了其他強制刷新全部狀態(tài)的機制,因此這種異常情況的影響也就可以忽略不計了。
???? 對于某些異步操作,比如客戶端需要創(chuàng)建一個FileSystem,這個可能需要比較長的時間,甚至要數(shù)秒鐘。這時候通過RPC可以解決這個問題。因此也就不存在Publisher端的確認機制了。
???? 那么,有沒有一種機制能保證Publisher能夠感知它的Message有沒有被處理的?答案肯定的。在這里感謝笑天居士同學:他在我的《RabbitMQ消息隊列(三):任務分發(fā)機制》文后留言一起討論了問題,而且也查找了一些資料。在這里我整理了一下他轉載和一篇文章和原創(chuàng)的一篇文章。銜接已經(jīng)附后。
事務機制 VS Publisher Confirm
?
? ? ? ?如果采用標準的 AMQP 協(xié)議,則唯一能夠保證消息不會丟失的方式是利用事務機制 -- 令 channel 處于 transactional 模式、向其 publish 消息、執(zhí)行 commit 動作。在這種方式下,事務機制會帶來大量的多余開銷,并會導致吞吐量下降 250% 。為了補救事務帶來的問題,引入了 confirmation 機制(即 Publisher Confirm)。
???? 為了使能 confirm 機制,client 首先要發(fā)送 confirm.select 方法幀。取決于是否設置了 no-wait 屬性,broker 會相應的判定是否以 confirm.select-ok 進行應答。一旦在 channel 上使用 confirm.select方法,channel 就將處于 confirm 模式。處于 transactional 模式的 channel 不能再被設置成 confirm 模式,反之亦然。
??? 一旦 channel 處于 confirm 模式,broker 和 client 都將啟動消息計數(shù)(以?confirm.select?為基礎從 1 開始計數(shù))。broker 會在處理完消息后,在當前 channel 上通過發(fā)送?basic.ack?的方式對其進行 confirm 。delivery-tag 域的值標識了被 confirm 消息的序列號。broker 也可以通過設置?basic.ack?中的?multiple 域來表明到指定序列號為止的所有消息都已被 broker 正確的處理了。
?????? 在異常情況中,broker 將無法成功處理相應的消息,此時 broker 將發(fā)送?basic.nack?來代替?basic.ack?。在這個情形下,basic.nack?中各域值的含義與?basic.ack 中相應各域含義是相同的,同時?requeue 域的值應該被忽略。通過?nack 一或多條消息,broker 表明自身無法對相應消息完成處理,并拒絕為這些消息的處理負責。在這種情況下,client 可以選擇將消息 re-publish 。
??????在 channel 被設置成 confirm 模式之后,所有被 publish 的后續(xù)消息都將被 confirm(即 ack) 或者被 nack 一次。但是沒有對消息被 confirm 的快慢做任何保證,并且同一條消息不會既被 confirm 又被 nack 。
?
消息在什么時候確認
?
broker 將在下面的情況中對消息進行 confirm :
- broker 發(fā)現(xiàn)當前消息無法被路由到指定的 queues 中(如果設置了?mandatory 屬性,則 broker 會先發(fā)送?basic.return)
- 非持久屬性的消息到達了其所應該到達的所有 queue 中(和鏡像 queue 中)
- 持久消息到達了其所應該到達的所有 queue 中(和鏡像 queue 中),并被持久化到了磁盤(被 fsync)
- 持久消息從其所在的所有 queue 中被 consume 了(如果必要則會被 acknowledge)
?
broker 會丟失持久化消息,如果 broker 在將上述消息寫入磁盤前異常。在一定條件下,這種情況會導致 broker 以一種奇怪的方式運行。例如,考慮下述情景:
?? 1.? 一個 client 將持久消息 publish 到持久 queue 中
?? 2.? 另一個 client 從 queue 中 consume 消息(注意:該消息具有持久屬性,并且 queue 是持久化的),當尚未對其進行 ack
?? 3.? broker 異常重啟
?? 4.? client 重連并開始 consume 消息
?? 在上述情景下,client 有理由認為消息需要被(broker)重新 deliver 。但這并非事實:重啟(有可能)會令 broker 丟失消息。為了確保持久性,client 應該使用 confirm 機制。如果 publisher 使用的 channel 被設置為 confirm 模式,publisher 將不會收到已丟失消息的 ack(這是因為 consumer 沒有對消息進行 ack ,同時該消息也未被寫入磁盤)。
?
編程實現(xiàn)
?
?
首先要區(qū)別AMQP協(xié)議mandatory和immediate標志位的作用。
mandatory和immediate是AMQP協(xié)議中basic.pulish方法中的兩個標志位,它們都有當消息傳遞過程中不可達目的地時將消息返回給生產(chǎn)者的功能。具體區(qū)別在于:
1. mandatory標志位
當mandatory標志位設置為true時,如果exchange根據(jù)自身類型和消息routeKey無法找到一個符合條件的queue,那么會調(diào)用basic.return方法將消息返還給生產(chǎn)者;當mandatory設為false時,出現(xiàn)上述情形broker會直接將消息扔掉。
2. immediate標志位
當immediate標志位設置為true時,如果exchange在將消息route到queue(s)時發(fā)現(xiàn)對應的queue上沒有消費者,那么這條消息不會放入隊列中。當與消息routeKey關聯(lián)的所有queue(一個或多個)都沒有消費者時,該消息會通過basic.return方法返還給生產(chǎn)者。
具體的代碼參考請參考參考資料1.
?
參考資料:
1. http://blog.csdn.NET/jiao_fuyou/article/details/21594205
2. http://blog.csdn.net/jiao_fuyou/article/details/21594947
3.??http://my.oschina.Net/moooofly/blog/142095
?
?
?
?
總結
以上是生活随笔為你收集整理的RabbitMQ从入门到精通的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python3 爬虫实战:mitmpro
- 下一篇: gdb 调试命令的使用及总结