【Python之路Day12】网络篇之Python操作RabbitMQ
基礎知識
分布式系統中,我們廣泛運用消息中間件進行系統間的數據交換,便于異步解耦。消息中間件這塊在我們前面的學習中,是使用python中的queue模塊來提供,但這個模塊僅限于在本機的內存中使用,假設這個隊列需要其他服務器的程序也訪問的話,就需要利用socket了。不過,現成的方案很多,輪子已經有了,我們沒有必要反復造輪子。直接拿來用就可以了。
消息中間件解決方案
流行的消息隊列解決方案很多:
ZeroMQ,號稱最快的消息隊列,由于支持的模式特別多: TCP、IPC、inproc、Multicas,基本已經打到替代Socket的地步了。站點地址:http://zeromq.org/
Kafka,是LinkedIn開源的分布式發布-訂閱消息系統,目前歸屬于Apache定級項目。 一個消息發布訂閱系統,現在常用于日志團隊使用的工具,如程序將操作日志批量異步的發送到Kafka集群中,而不是保存在本地或者DB中。Kafka可以提供批量提交消息/壓縮等,對Producer而言,幾乎感覺不到性能的開銷。Consumer可以使用Hadoop等其他系統化的存儲和數據分析等。站點:http://kafka.apache.org/
RocketMQ, 阿里開源的一款高性能、高吞吐量的消息中間件, 純Java開發。RocketMQ思路起源于Kafka,但并不是Kafka的一個Copy,它對消息的可靠傳輸及事務性做了優化,目前在阿里集團被廣泛應用于交易、充值、流計算、消息推送、日志流式處理、binglog分發等場景。站點:?https://github.com/alibaba/RocketMQ
RabbitMQ,?RabbitMQ是一個由erlang開發的AMQP(Advanced Message Queue )的開源實現。
等等....
使用消息中間件的理由?
使用消息中間件的10個理由,請參照oschina的這篇博文:?http://www.oschina.net/translate/top-10-uses-for-message-queue
RabbitMQ
一. 歷史
? ?RabbitMQ是一個由erlang開發的AMQP(Advanced Message Queue )的開源實現。AMQP 的出現其實也是應了廣大人民群眾的需求,雖然在同步消息通訊的世界里有很多公開標準(如 COBAR的 IIOP ,或者是 SOAP 等),但是在異步消息處理中卻不是這樣,只有大企業有一些商業實現(如微軟的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等聯合制定了 AMQP 的公開標準。
? ?RabbitMQ是由RabbitMQ Technologies Ltd開發并且提供商業支持的。該公司在2010年4月被SpringSource(VMWare的一個部門)收購。在2013年5月被并入Pivotal。其實VMWare,Pivotal和EMC本質上是一家的。不同的是VMWare是獨立上市子公司,而Pivotal是整合了EMC的某些資源,現在并沒有上市。
官方站點:https://www.rabbitmq.com/
RabbitMQ中文:?https://rabbitmq-into-chinese.readthedocs.io/zh_CN/latest/
二. 應用架構
?
這個系統架構圖版權屬于sunjun041640。
RabbitMQ Server: 是一種傳入服務。 它的角色是維護一條從生產者(Producer) 到 消費者(Consumer)的路線,從而保證數據能夠按照指定的方式進行傳入。但是也并不是100%的保證,但杜宇普通的應用來說,應該是足夠的。當然對于要求可靠性、完整性絕對的場景,可以再走一層數據一致性的guard, 就可以保證了。
Client A 和 Client B:生產者(Producer), 數據的生產者, 發送方。一個消息(Message)有兩個部分:有效載荷(payload) 和 標簽(label).?
- payload: 傳入的數據
- lable: exchange的名字或者說是一個tag, payload的描述信息,而且RabbitMQ是通過這個lable來決定把這個消息(Message)發給那個消費者(Consumer).AMQP僅僅描述了lable, 而RabbitMQ決定了如何使用這個lable的規則。
Client1, client2, client3: 消費者(Consumer), 接受消息的應用程序。當有消息(Message)到達某個和Consumer關聯的某個隊列后,RabbitMQ會把它發送Consumer。當然也可能會發送給多個Consumer。
一個數據從Producer到Consumer,還需要明白三個概念: exchanges, queue 和 bindings
queue:用于消息存儲的緩沖
Connection: 一個TCP連接
Channels:虛擬連接。它建立在TCP連接中,數據流動都是channel中進行的。一般情況是起始建立TCP連接,第二部就是建立這個Channel。
三. 安裝RabbitMQ
Ubuntu/Debian
apt-get install -y rabbitmq-serverRHEL6:
#安裝配置epel源# rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm#安裝erlang# yum -y install erlang#安裝RabbitMQ# yum -y install rabbitmq-server服務啟動/停止
#停止服務 #service rabbitmq-server stop#啟動服務 #service rabbitmq-server start#重啟服務 #service rabbitmq-server restart安裝Python API(Python3)
pip3 install pika#源碼安裝: https://pypi.python.org/pypi/pika#過程略...四. 基本操作
基于queue(Python2.7中模塊是Queue)實現的生產者消費者模型
#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Author: DBQ(Du Baoqiang)import queue import threadingmessage = queue.Queue(10)def producer(i):'''生產者:param i::return:'''while True:message.put(i)def consumer(i):'''消費者:param i::return:'''while True:msg = message.get()for i in range(12):t = threading.Thread(target=producer, args=(i,))t.start()for j in range(10):t = threading.Thread(target=consumer, args=(j,))t.start() View Code?
#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Author: DBQ(Du Baoqiang)import pika #導入模塊########### 生產者 ################# connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.16.30.162') #端口如果是默認的話,不用謝) #建立TCP連接 channel = connection.channel() #虛擬連接,建立在上面的TCP連接基礎上 #為什么使用Channel,而不用TCP連接? #因為對于OS來說,建立和關閉TCP連接是有代價的,尤其是頻繁的建立和關閉. 而且TCP的連接數默認在系統內核中也有限制, 這也限制了系統處理高并發的能力. #但是,如果在TCP連接中建立 Channel是沒有代價的,對于Procuder或者Consumer來講,可以并發的使用多個channel來進行publish或者receive. channel.queue_declare(queue='hello') #Consumer和Procuder都可以使用 queue_declar創建queue.對某個channel來說,Consumer不能declare一個queue,卻可以訂閱queue,當然也可以創建私有的queue. #這樣就只有APP本身才能使用這個queue. queue也可以自動刪除,被標記auto-delete的queue在最后一個Consumer unsubscribe后會被自動刪除. #如果創建一個已經存在的queue是不會有任何影響的, 就是說第二次創建如果參數和第一次不一樣,那么該操作雖然成功,但是queue的屬性并不會被修改. # queue 對 load balance的處理是完美的,對于多個Consumer來說, RabbitMQ使用循環的方式輪訓(Round-robin)來均衡發送給不同的Consumer channel.basic_publish(exchange='', #消息是不能直接發送到隊列的,它需要發送到交換機(exchange),下面會談這個,此處使用一個空字符串來標識.routing_key='hello', #必須指定為隊列的名稱body='How are you' #消息主體 )print('Send ok') connection.close() #關閉連接 #!/usr/bin/env python3 # -*- coding: utf-8 -*- # Author: DBQ(Du Baoqiang)################################消費者########################################### import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='172.16.30.162') )channel = connection.channel() channel.queue_declare('hello') #使用queue_declare創建一個隊列——我們可以運行這個命令很多次,但是只有一個隊列會被創建。def callback(ch, method, properties, body):'''回調方法,當我們獲取到消息的時候,Pika庫就會調用此回調函數。這個回調函數會將接收到的消息內容輸出到屏幕上。:param ch: 虛擬通道:param method: 方法:param properties: 消息屬性:param body: 消息主體:return:'''print('Received: %r' %body)while True:channel.basic_consume( #需要告訴RabbitMQ這個回調函數將會從名為"hello"的隊列中接收消息: callback,queue='hello',no_ack=True)print('Wating for messages. To exit press CTRL+C')channel.start_consuming()1. 消息確認
為了防止消息丟失,RabbitMQ提供了消息響應(acknowledgments)。消費者會通過一個ack(響應),告訴RabbitMQ已經收到并處理了某條消息,然后RabbitMQ就會釋放并刪除這條消息。如果消費者(consumer)掛掉了,沒有發送響應,RabbitMQ就會認為消息沒有被完全處理,然后重新發送給其他消費者(consumer)。這樣,及時工作者(workers)偶爾的掛掉,也不會丟失消息。
消息是沒有超時這個概念的;當工作者與它斷開連的時候,RabbitMQ會重新發送消息。這樣在處理一個耗時非常長的消息任務的時候就不會出問題了。
消息響應:默認是開啟的 no_ack, 在上面的例子中,我們把標識給置為True(關閉了)。開啟后,完成一個任務后,會發送一個響應。
def callback(ch, method, properties, body):print " [x] Received %r" % (body,)time.sleep( body.count('.') )print " [x] Done"ch.basic_ack(delivery_tag = method.delivery_tag)channel.basic_consume(callback,queue='hello')PS: 這一來一往的是提高了消息的安全性,但是,開銷是難免的,和默認的不加確認消息相比,這種方案性能肯定會有下降,但是這不就是一種妥協么?就看應用場景了.
切記, 不要忘記確認(basic_ack)!消息會在你退出程序之后就重新發送,如果它不能夠釋放沒響應的信息,RabbitMQ就會占用越來越多的內存!
2. 消息持久化
有個參數durable, 默認情況下,如果沒有顯式告訴RabbitMQ這條消息需要持久化,那么它(rabbitmq)在自己退出或者崩潰的時候,將會丟失所有隊列和消息。
為了確保不丟失,需要注意:
必須把隊列和消息設置為持久化。
#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Author: DBQ(Du Baoqiang)import pika #導入模塊 connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.16.30.162') #端口如果是默認的話,不用謝) #建立TCP連接 channel = connection.channel() #虛擬連接,建立在上面的TCP連接基礎上#隊列持久化 channel.queue_declare(queue='hello',durable=True)channel.basic_publish(exchange='', #消息是不能直接發送到隊列的,它需要發送到交換機(exchange),下面會談這個,此處使用一個空字符串來標識.routing_key='hello', #必須指定為隊列的名稱body='How are you' , #消息主體properties=pika.BasicProperties(delivery_mode=2) #持久化 )print(" [x] Sent 'Hello World!'") connection.close() #關閉連接 生產者 #!/usr/bin/env python3 # -*- coding: utf-8 -*- # Author: DBQ(Du Baoqiang)################################消費者########################################### import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='172.16.30.162') )channel = connection.channel() channel.queue_declare('hello') #使用queue_declare創建一個隊列——我們可以運行這個命令很多次,但是只有一個隊列會被創建。def callback(ch, method, properties, body):'''回調方法,當我們獲取到消息的時候,Pika庫就會調用此回調函數。這個回調函數會將接收到的消息內容輸出到屏幕上。:param ch: 虛擬通道:param method: 方法:param properties: 消息屬性:param body: 消息主體:return:'''print('Received: %r' %body)ch.basic_ack(delivery_tag=method.delivery_tag)while True:channel.basic_consume( #需要告訴RabbitMQ這個回調函數將會從名為"hello"的隊列中接收消息: callback,queue='hello',no_ack=False #no_ack 置為False )print('Wating for messages. To exit press CTRL+C')channel.start_consuming() 消費者3. 公平調度
默認消息隊列中的數據是按照順序來消費的。比如有兩個workers,處理奇數消息的特別忙,而處理偶數的比較輕松,而RabbitMQ默認的規則還是一如既往的派發消息。它默認才不管你忙不忙!
可以使用basic_qos方法,并設置 prefetch_count=1,告訴RabbitMQ,在同一時刻,不要發送超過一條消息給一個worker,直到它處理了上一條消息并且做出了響應。這樣,RabbitMQ就能把消息分給下一個空閑的Worker了。
#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Author: DBQ(Du Baoqiang)################################消費者########################################### import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='172.16.30.162') )channel = connection.channel() channel.queue_declare('hello') #使用queue_declare創建一個隊列——我們可以運行這個命令很多次,但是只有一個隊列會被創建。def callback(ch, method, properties, body):'''回調方法,當我們獲取到消息的時候,Pika庫就會調用此回調函數。這個回調函數會將接收到的消息內容輸出到屏幕上。:param ch: 虛擬通道:param method: 方法:param properties: 消息屬性:param body: 消息主體:return:'''print('Received: %r' %body)ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_qos(prefetch_count=1) #設置 prefetch_count=1,告訴RabbitMQ,在同一時刻,不要發送超過一條消息給一個worker,直到它處理了上一條消息并且做出了響應。while True:channel.basic_consume( #需要告訴RabbitMQ這個回調函數將會從名為"hello"的隊列中接收消息: callback,queue='hello',no_ack=False)print('Wating for messages. To exit press CTRL+C')channel.start_consuming() 消費者PS: 如果所有的工作者都處理很繁忙,你的隊列就可能會被填滿,需要留意這個問題,要么添加更多的Workers,要么使用其他策略!
4. 發布/訂閱
分發一個消息給多個消費者(Consumers), 這種模式,稱為"發布/訂閱"?
發布者只需要把消息發送給exchange。exchange一邊從發布者放接受消息,一邊推送到隊列。
exchange必須知道如何處理它接收到的消息,是應該推送到指定的隊列還是是多個隊列,或者是直接忽略消息。這些規則是通過交換機類型(exchange type)來定義的。
exchange的幾個類型:
direct: 直連, 通過binding key的完全匹配來傳遞消息到相應的隊列中
topic:主題交換機,exchange將傳入的”路由值“ 和”關鍵字“進行匹配,匹配成功,則將數據發送到指定隊列。
- # 表示可以匹配0個 或者多個單詞
- * 表示只能匹配一個單詞
fanout:扇形交換機, 把消息發送給和他關聯的所有的隊列
headers: 頭交換機
root@test2-ubunut:~# rabbitmqctl list_exchanges Listing exchanges ...direct amq.direct direct amq.fanout fanout amq.headers headers amq.match headers amq.rabbitmq.log topic amq.rabbitmq.trace topic amq.topic topic direct_logs direct logs fanout ...done.fanout,綁定(binding)
創建一個fanout類型的exchange 和隊列, 而后告訴交換機如何發送消息給我們的隊列。exchange和queue之間的關系為 綁定(binding)。
#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Author: DBQ(Du Baoqiang)#fanout import pika #導入模塊 connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.16.30.162') #端口如果是默認的話,不用謝) #建立TCP連接 channel = connection.channel() #虛擬連接,建立在上面的TCP連接基礎上 channel.exchange_declare(exchange='logs',type='fanout') #使用fanout類型 message = 'baslkdfk2sdf' channel.basic_publish(exchange='logs', #消息是不能直接發送到隊列的,它需要發送到交換機(exchange),exchange值必須為定義好的exchange值routing_key='',body=message #消息主體 )print(" [x] Sent %s"%message) connection.close() #關閉連接 發布 #!/usr/bin/env python3 # -*- coding: utf-8 -*- # Author: DBQ(Du Baoqiang)# fanout ################################消費者########################################### import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='172.16.30.162') )channel = connection.channel() channel.exchange_declare(exchange='logs',type='fanout' )result = channel.queue_declare(exclusive=True) queue_name = result.method.queuechannel.queue_bind(exchange='logs',queue=queue_name ) print('Wating for messages. To exit press CTRL+C')def callback(ch, method, properties, body):'''回調方法,當我們獲取到消息的時候,Pika庫就會調用此回調函數。這個回調函數會將接收到的消息內容輸出到屏幕上。:param ch: 虛擬通道:param method: 方法:param properties: 消息屬性:param body: 消息主體:return:'''print('Received: %r' %body)ch.basic_ack(delivery_tag=method.delivery_tag)while True:channel.basic_consume( #需要告訴RabbitMQ這個回調函數將會從名為"hello"的隊列中接收消息: callback,queue=queue_name,no_ack=False)channel.start_consuming() 訂閱direct,多個綁定(Multiple bindings)
?
多個隊列使用相同的綁定鍵是合法的。上圖這個例子中,我們可以添加一個X和Q1之間的綁定,使用black綁定鍵。這樣一來,直連交換機就和扇型交換機的行為一樣,會將消息廣播到所有匹配的隊列。帶有black路由鍵的消息會同時發送到Q1和Q2。
#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Author: DBQ(Du Baoqiang)#fanout import pika #導入模塊 connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.16.30.162') #端口如果是默認的話,不用寫) #建立TCP連接 channel = connection.channel() #虛擬連接,建立在上面的TCP連接基礎上 channel.exchange_declare(exchange='direct_logs',type='direct') #使用direct類型 message = 'info message' facility = 'info' #如果生產者發送一個info消息的話,兩個消費者都能收到; 如果發送一個error級別的消息,只有 error級別的能收到.. channel.basic_publish(exchange='direct_logs', #消息是不能直接發送到隊列的,它需要發送到交換機(exchange),exchange值必須為定義好的exchange值routing_key=facility,body=message #消息主體 )print(" [x] Sent %s"%message) connection.close() #關閉連接 生產 #!/usr/bin/env python3 # -*- coding: utf-8 -*- # Author: DBQ(Du Baoqiang)# fanout ################################消費者########################################### import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='172.16.30.162') )channel = connection.channel() channel.exchange_declare(exchange='direct_logs',type='direct' )result = channel.queue_declare(exclusive=True) queue_name = result.method.queuefacility = ['info'] #日志級別定義為info for severity in facility: #遞歸綁定到隊列 channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity) print('Wating for messages. To exit press CTRL+C')def callback(ch, method, properties, body):'''回調方法,當我們獲取到消息的時候,Pika庫就會調用此回調函數。這個回調函數會將接收到的消息內容輸出到屏幕上。:param ch: 虛擬通道:param method: 方法:param properties: 消息屬性:param body: 消息主體:return:'''print('Received: %r' %body)ch.basic_ack(delivery_tag=method.delivery_tag)while True:channel.basic_consume( #需要告訴RabbitMQ這個回調函數將會從名為"hello"的隊列中接收消息: callback,queue=queue_name,no_ack=False)channel.start_consuming() 訂閱一(info) #!/usr/bin/env python3 # -*- coding: utf-8 -*- # Author: DBQ(Du Baoqiang)# fanout ################################消費者########################################### import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='172.16.30.162') )channel = connection.channel() channel.exchange_declare(exchange='direct_logs',type='direct' )result = channel.queue_declare(exclusive=True) queue_name = result.method.queuefacility = ['info','warnning','error'] #定義為info, warnning, error的級別 for severity in facility: #遞歸綁定隊列 channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity) print('Wating for messages. To exit press CTRL+C')def callback(ch, method, properties, body):'''回調方法,當我們獲取到消息的時候,Pika庫就會調用此回調函數。這個回調函數會將接收到的消息內容輸出到屏幕上。:param ch: 虛擬通道:param method: 方法:param properties: 消息屬性:param body: 消息主體:return:'''print('Received: %r' %body)ch.basic_ack(delivery_tag=method.delivery_tag)while True:channel.basic_consume( #需要告訴RabbitMQ這個回調函數將會從名為"hello"的隊列中接收消息: callback,queue=queue_name,no_ack=False)channel.start_consuming() 訂閱二(info,warnning,error)Topic
發送到主題交換機(topic exchange)的消息不可以攜帶隨意什么樣子的路由鍵(routing_key),它的路由鍵必須是一個由.分隔開的詞語列表。這些單詞隨便是什么都可以,但是最好是跟攜帶它們的消息有關系的詞匯。以下是幾個推薦的例子:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。詞語的個數可以隨意,但是不要超過255字節。
- *?(星號) 用來表示一個單詞.
- #?(井號) 用來表示任意數量(零個或多個)單詞
?
更多,請參照官方文檔
?
參考博客:
- http://www.cnblogs.com/wupeiqi/articles/5132791.html
- http://blog.csdn.net/anzhsoft/article/details/19563091
- http://rabbitmq.mr-ping.com/
MySQL、ORM框架
移步另一篇博文>>>?
Paramiko
移步另一篇博文>>?
轉載于:https://www.cnblogs.com/dubq/p/5702663.html
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的【Python之路Day12】网络篇之Python操作RabbitMQ的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: eclipse新建maven项目(2)
- 下一篇: 360度旋转图片小特效