第二百九十二节,RabbitMQ多设备消息队列-Python开发
RabbitMQ多設(shè)備消息隊(duì)列-Python開(kāi)發(fā)
首先安裝Python開(kāi)發(fā)連接RabbitMQ的API,pika模塊
pika模塊為第三方模塊
?
?對(duì)于RabbitMQ來(lái)說(shuō),生產(chǎn)和消費(fèi)不再針對(duì)內(nèi)存里的一個(gè)Queue對(duì)象,而是某臺(tái)服務(wù)器上的RabbitMQ Server實(shí)現(xiàn)的消息隊(duì)列。
?
?
生產(chǎn)者消費(fèi)者一對(duì)一
不使用交換機(jī)
?
?
生產(chǎn)者主機(jī)
pika.PlainCredentials()設(shè)置RabbitMQ Server用戶名和密碼
ConnectionParameters()設(shè)置ip和端口
BlockingConnection()創(chuàng)建連接,自帶了socket邏輯部分
channel()獲取連接句柄
queue_declare(queue='隊(duì)列名稱')向RabbitMQ Server創(chuàng)建一個(gè)消息隊(duì)列,如果此隊(duì)列存在則不創(chuàng)建
basic_publish(exchange='交換機(jī)狀態(tài)',routing_key='隊(duì)列名稱',body='數(shù)據(jù)類容')向指定隊(duì)列里寫(xiě)入數(shù)據(jù)
close()關(guān)閉連接
生產(chǎn)者執(zhí)行了7次,可以看到hello隊(duì)列里有7條數(shù)據(jù)
?
消費(fèi)者主機(jī)
callback(ch, method, properties, (body接收隊(duì)列里的數(shù)據(jù)))自定義獲取隊(duì)列數(shù)據(jù)后的回調(diào)函數(shù)
basic_consume(回調(diào)函數(shù)名稱,queue='隊(duì)列名稱',no_ack=True)在指定隊(duì)列里獲取數(shù)據(jù),no_ack=True設(shè)置獲取到數(shù)據(jù)后,是否刪除隊(duì)列里對(duì)應(yīng)的數(shù)據(jù)
start_consuming()等待獲取隊(duì)列數(shù)據(jù)
執(zhí)行后循環(huán)獲取了7次,將hello隊(duì)列里的7條數(shù)據(jù)拿了出來(lái)
可以看到hello隊(duì)列里已經(jīng)沒(méi)有了數(shù)據(jù)
?
?
?
?
保證數(shù)據(jù)不丟失介紹
保證消費(fèi)者數(shù)據(jù)不丟失
當(dāng)消費(fèi)者主機(jī)從隊(duì)列里拿出數(shù)據(jù)時(shí)basic_consume()方法參數(shù)no_ack=True,表示拿出數(shù)據(jù)后立即刪除隊(duì)列里對(duì)應(yīng)的數(shù)據(jù),
如果消費(fèi)者主機(jī)拿出數(shù)據(jù),隊(duì)列也刪除了對(duì)應(yīng)的數(shù)據(jù),還沒(méi)來(lái)得及處理數(shù)據(jù),消費(fèi)者主機(jī)死機(jī)了,這樣數(shù)據(jù)就丟了
解決方法:
當(dāng)消費(fèi)者主機(jī)從隊(duì)列里拿出數(shù)據(jù)時(shí)basic_consume()方法參數(shù)no_ack=False,表示拿出數(shù)據(jù)后不刪除隊(duì)列里對(duì)應(yīng)的數(shù)據(jù)
在消費(fèi)者回調(diào)函數(shù)里處理數(shù)據(jù),當(dāng)數(shù)據(jù)處理完成后寫(xiě)上ch.basic_ack(delivery_tag = method.delivery_tag),表示執(zhí)行這串代碼后才刪除隊(duì)列里對(duì)應(yīng)的數(shù)據(jù)
#!/usr/bin/env python # -*- coding:utf8 -*-import pika #導(dǎo)入連接操作RabbitMQ Server主機(jī)的模塊# ######################### 消費(fèi)者 ######################### credentials = pika.PlainCredentials('guest', 'guest') #設(shè)置RabbitMQ Server用戶名和密碼 parameters = pika.ConnectionParameters('localhost', 5672,'/',credentials) #設(shè)置ip和端口 connection = pika.BlockingConnection(parameters) #創(chuàng)建連接,自帶了socket邏輯部分 channel = connection.channel() #獲取連接句柄 channel.queue_declare(queue='helloa',durable=True) #向RabbitMQ Server創(chuàng)建一個(gè)消息隊(duì)列,queue='hello'設(shè)置隊(duì)列名稱,如果此隊(duì)列存在則不創(chuàng)建def callback(ch, method, properties, body): #定義獲取隊(duì)列數(shù)據(jù)后的回調(diào)函數(shù),body接收隊(duì)列里的數(shù)據(jù)內(nèi)容print(" 你好 %r" % body)ch.basic_ack(delivery_tag=method.delivery_tag) #執(zhí)行這串代碼后才刪除隊(duì)列里對(duì)應(yīng)的數(shù)據(jù)#在指定隊(duì)列里獲取數(shù)據(jù) channel.basic_consume(callback, #獲取到數(shù)據(jù)后執(zhí)行回調(diào)函數(shù)queue='helloa', #指定獲取數(shù)據(jù)的隊(duì)列名稱no_ack=False) #設(shè)置獲取到數(shù)據(jù)后,是否刪除隊(duì)列里對(duì)應(yīng)的數(shù)據(jù),如果回調(diào)函數(shù)處理數(shù)據(jù)異常時(shí)會(huì)都丟失數(shù)據(jù)#如果要保證數(shù)據(jù)必須不丟失設(shè)置為False不刪除數(shù)據(jù),當(dāng)接執(zhí)行回調(diào)函數(shù)里ch.basic_ack(delivery_tag=method.delivery_tag)時(shí)才刪除,但是效率不高 channel.start_consuming() #等待獲取隊(duì)列數(shù)據(jù)?
保證生產(chǎn)者數(shù)據(jù)不丟失
如果RabbitMQ Server主機(jī)隊(duì)列里有很多數(shù)據(jù),此時(shí)RabbitMQ Server主機(jī)死機(jī)了,那么隊(duì)列里的數(shù)據(jù)也就丟了
解決方法:
當(dāng)生產(chǎn)者向RabbitMQ Server主機(jī)隊(duì)列投遞數(shù)據(jù)時(shí),數(shù)據(jù)同時(shí)也在RabbitMQ Server主機(jī)硬盤(pán)保存一份,那么即使死機(jī)重啟后數(shù)據(jù)也存在
basic_publish(properties=pika.BasicProperties(delivery_mode=2))投遞模式默認(rèn)為1,修改成2表示投遞的數(shù)據(jù)在RabbitMQ Server主機(jī)硬盤(pán)上保存一份,當(dāng)消費(fèi)者操作后刪除隊(duì)列數(shù)據(jù)時(shí),也跟隨刪除
queue_declare(durable=True)表示隊(duì)列里的數(shù)據(jù)開(kāi)啟硬盤(pán)保存,注意:如果生產(chǎn)者設(shè)置了那么消費(fèi)者也要設(shè)置
?
?
?
消息獲取順序
默認(rèn)消息隊(duì)列里的數(shù)據(jù)是按照順序被消費(fèi)者拿走,例如:消費(fèi)者1 去隊(duì)列中獲取 奇數(shù) 序列的任務(wù),消費(fèi)者2去隊(duì)列中獲取 偶數(shù) 序列的任務(wù)。
誰(shuí)來(lái)誰(shuí)取,不再按照奇偶數(shù)排列
在消費(fèi)者獲取隊(duì)列數(shù)據(jù)方法basic_consume()之前寫(xiě)一個(gè)channel.basic_qos(prefetch_count=1)表示誰(shuí)來(lái)誰(shuí)取,不再按照奇偶數(shù)排列
#!/usr/bin/env python # -*- coding:utf8 -*-import pika #導(dǎo)入連接操作RabbitMQ Server主機(jī)的模塊# ######################### 消費(fèi)者 ######################### credentials = pika.PlainCredentials('guest', 'guest') #設(shè)置RabbitMQ Server用戶名和密碼 parameters = pika.ConnectionParameters('localhost', 5672,'/',credentials) #設(shè)置ip和端口 connection = pika.BlockingConnection(parameters) #創(chuàng)建連接,自帶了socket邏輯部分 channel = connection.channel() #獲取連接句柄 channel.queue_declare(queue='helloa',durable=True) #向RabbitMQ Server創(chuàng)建一個(gè)消息隊(duì)列,queue='hello'設(shè)置隊(duì)列名稱,如果此隊(duì)列存在則不創(chuàng)建def callback(ch, method, properties, body): #定義獲取隊(duì)列數(shù)據(jù)后的回調(diào)函數(shù),body接收隊(duì)列里的數(shù)據(jù)內(nèi)容print(" 你好 %r" % body)ch.basic_ack(delivery_tag=method.delivery_tag) #執(zhí)行這串代碼后才刪除隊(duì)列里對(duì)應(yīng)的數(shù)據(jù) channel.basic_qos(prefetch_count=1) #表示誰(shuí)來(lái)誰(shuí)取,不再按照奇偶數(shù)排列 #在指定隊(duì)列里獲取數(shù)據(jù) channel.basic_consume(callback, #獲取到數(shù)據(jù)后執(zhí)行回調(diào)函數(shù)queue='helloa', #指定獲取數(shù)據(jù)的隊(duì)列名稱no_ack=False) #設(shè)置獲取到數(shù)據(jù)后,是否刪除隊(duì)列里對(duì)應(yīng)的數(shù)據(jù),如果回調(diào)函數(shù)處理數(shù)據(jù)異常時(shí)會(huì)都丟失數(shù)據(jù)#如果要保證數(shù)據(jù)必須不丟失設(shè)置為False不刪除數(shù)據(jù),當(dāng)接執(zhí)行回調(diào)函數(shù)里ch.basic_ack(delivery_tag=method.delivery_tag)時(shí)才刪除,但是效率不高 channel.start_consuming() #等待獲取隊(duì)列數(shù)據(jù)?
?
?
exchange交換機(jī)工作模型(fanout發(fā)布訂閱,direct關(guān)鍵字發(fā)送,topic模糊匹配)
fanout交換機(jī),發(fā)布訂閱模式
發(fā)布者發(fā)布數(shù)據(jù)到交換機(jī),交換機(jī)將數(shù)據(jù)分發(fā)到所有訂閱者創(chuàng)建的隊(duì)列里,每個(gè)訂閱者主機(jī)都獲取一份數(shù)據(jù)
列隊(duì)只能由訂閱者創(chuàng)建,訂閱者創(chuàng)建的列隊(duì)只要綁定了交換機(jī)都會(huì)獲取到,交換機(jī)分發(fā)的數(shù)據(jù)
發(fā)布訂閱和簡(jiǎn)單的消息隊(duì)列區(qū)別在于,發(fā)布訂閱會(huì)將消息發(fā)送給所有的訂閱者,而消息隊(duì)列中的數(shù)據(jù)被消費(fèi)一次便消失。所以,RabbitMQ實(shí)現(xiàn)發(fā)布和訂閱時(shí),會(huì)為每一個(gè)訂閱者創(chuàng)建一個(gè)隊(duì)列,而發(fā)布者發(fā)布消息時(shí),交換機(jī)會(huì)將消息放置在所有相關(guān)隊(duì)列中。
exchange type = fanout
?發(fā)布者
#!/usr/bin/env python # -*- coding:utf8 -*-import pika #導(dǎo)入連接操作RabbitMQ Server主機(jī)的模塊 import sys# ######################### 發(fā)布者 ######################### credentials = pika.PlainCredentials('guest', 'guest') #設(shè)置RabbitMQ Server用戶名和密碼 parameters = pika.ConnectionParameters('localhost', 5672,'/',credentials) #設(shè)置ip和端口 connection = pika.BlockingConnection(parameters) #創(chuàng)建連接,自帶了socket邏輯部分 channel = connection.channel() #獲取連接句柄 channel.exchange_declare(exchange='logs', #創(chuàng)建交換機(jī),exchange='交換機(jī)名稱',type='fanout'交互機(jī)工作模式fanout為訂閱模式type='fanout')message = ' '.join(sys.argv[1:]) or "info: Hello World!" #設(shè)置向交換機(jī)發(fā)布的數(shù)據(jù)內(nèi)容#向交換機(jī)發(fā)布內(nèi)容 channel.basic_publish(exchange='logs', #設(shè)置要發(fā)布的交換機(jī)名稱routing_key='', #隊(duì)列名稱為空,因?yàn)閿?shù)據(jù)是發(fā)布到交換機(jī)而不是隊(duì)列,所以為空body=message) #設(shè)置要發(fā)布的數(shù)據(jù)內(nèi)容 print("數(shù)據(jù)內(nèi)容以發(fā)布到交換機(jī)")connection.close() #關(guān)閉連接訂閱者
#!/usr/bin/env python # -*- coding:utf8 -*-import pika #導(dǎo)入連接操作RabbitMQ Server主機(jī)的模塊# ######################### 訂閱者 ######################### credentials = pika.PlainCredentials('guest', 'guest') #設(shè)置RabbitMQ Server用戶名和密碼 parameters = pika.ConnectionParameters('localhost', 5672,'/',credentials) #設(shè)置ip和端口 connection = pika.BlockingConnection(parameters) #創(chuàng)建連接,自帶了socket邏輯部分 channel = connection.channel() #獲取連接句柄 channel.exchange_declare(exchange='logs', #創(chuàng)建交換機(jī),exchange='交換機(jī)名稱',type='fanout'交互機(jī)工作模式fanout為訂閱模式type='fanout')result = channel.queue_declare(exclusive=True) #創(chuàng)建專一訂閱消息隊(duì)列, queue_name = result.method.queue #隨機(jī)生成隊(duì)列名稱#將訂閱隊(duì)列綁定交換機(jī) channel.queue_bind(exchange='logs', #exchange='要綁定的交換機(jī)名稱'queue=queue_name) #queue=要綁定交換機(jī)的隊(duì)列名稱def callback(ch, method, properties, body): #定義獲取隊(duì)列數(shù)據(jù)后的回調(diào)函數(shù),body接收隊(duì)列里的數(shù)據(jù)內(nèi)容print("%r" % body)#在指定隊(duì)列里獲取數(shù)據(jù) channel.basic_consume(callback, #獲取到數(shù)據(jù)后執(zhí)行回調(diào)函數(shù)queue=queue_name, #指定獲取數(shù)據(jù)的隊(duì)列名稱no_ack=True) #如果要保證數(shù)據(jù)必須不丟失設(shè)置為False不刪除數(shù)據(jù),當(dāng)接執(zhí)行回調(diào)函數(shù)里ch.basic_ack(delivery_tag=method.delivery_tag)時(shí)才刪除,但是效率不高 channel.start_consuming() #等待獲取隊(duì)列數(shù)據(jù)#!/usr/bin/env python # -*- coding:utf8 -*-import pika #導(dǎo)入連接操作RabbitMQ Server主機(jī)的模塊# ######################### 訂閱者 ######################### credentials = pika.PlainCredentials('guest', 'guest') #設(shè)置RabbitMQ Server用戶名和密碼 parameters = pika.ConnectionParameters('localhost', 5672,'/',credentials) #設(shè)置ip和端口 connection = pika.BlockingConnection(parameters) #創(chuàng)建連接,自帶了socket邏輯部分 channel = connection.channel() #獲取連接句柄 channel.exchange_declare(exchange='logs', #創(chuàng)建交換機(jī),exchange='交換機(jī)名稱',type='fanout'交互機(jī)工作模式fanout為訂閱模式type='fanout')result = channel.queue_declare(exclusive=True) #創(chuàng)建專一訂閱消息隊(duì)列, queue_name = result.method.queue #隨機(jī)生成隊(duì)列名稱#將訂閱隊(duì)列綁定交換機(jī) channel.queue_bind(exchange='logs', #exchange='要綁定的交換機(jī)名稱'queue=queue_name) #queue=要綁定交換機(jī)的隊(duì)列名稱def callback(ch, method, properties, body): #定義獲取隊(duì)列數(shù)據(jù)后的回調(diào)函數(shù),body接收隊(duì)列里的數(shù)據(jù)內(nèi)容print("%r" % body)#在指定隊(duì)列里獲取數(shù)據(jù) channel.basic_consume(callback, #獲取到數(shù)據(jù)后執(zhí)行回調(diào)函數(shù)queue=queue_name, #指定獲取數(shù)據(jù)的隊(duì)列名稱no_ack=True) #如果要保證數(shù)據(jù)必須不丟失設(shè)置為False不刪除數(shù)據(jù),當(dāng)接執(zhí)行回調(diào)函數(shù)里ch.basic_ack(delivery_tag=method.delivery_tag)時(shí)才刪除,但是效率不高 channel.start_consuming() #等待獲取隊(duì)列數(shù)據(jù)
?
?
?
direct交換機(jī),關(guān)鍵字發(fā)送模式
exchange type = direct
也叫做完全匹配模式
RabbitMQ還支持根據(jù)關(guān)鍵字發(fā)送,即:隊(duì)列綁定關(guān)鍵字,生產(chǎn)者設(shè)置關(guān)鍵字將數(shù)據(jù)發(fā)送到exchange交換機(jī),exchange交換機(jī)根據(jù) 消費(fèi)者列隊(duì)設(shè)置的關(guān)鍵字 判定應(yīng)該將數(shù)據(jù)發(fā)送至指定隊(duì)列。
也就是說(shuō)消費(fèi)者列隊(duì)設(shè)置的關(guān)鍵字,和生產(chǎn)者發(fā)送數(shù)據(jù)設(shè)置的關(guān)鍵字,兩者只要是一樣關(guān)鍵字的,消費(fèi)者主機(jī)都會(huì)獲取到一份數(shù)據(jù)
【重點(diǎn)】消費(fèi)者可以設(shè)置多個(gè)關(guān)鍵字
?
生產(chǎn)者
#!/usr/bin/env python # -*- coding:utf8 -*-import pika #導(dǎo)入連接操作RabbitMQ Server主機(jī)的模塊 import sys# ######################### 生產(chǎn)者 ######################### credentials = pika.PlainCredentials('guest', 'guest') #設(shè)置RabbitMQ Server用戶名和密碼 parameters = pika.ConnectionParameters('localhost', 5672,'/',credentials) #設(shè)置ip和端口 connection = pika.BlockingConnection(parameters) #創(chuàng)建連接,自帶了socket邏輯部分 channel = connection.channel() #獲取連接句柄 channel.exchange_declare(exchange='logs', #創(chuàng)建交換機(jī),exchange='交換機(jī)名稱',type='direct'交互機(jī)工作模式direct為關(guān)鍵字發(fā)送模式type='direct')message = ' '.join(sys.argv[1:]) or "info: Hello World!" #設(shè)置向交換機(jī)發(fā)布的數(shù)據(jù)內(nèi)容#向交換機(jī)發(fā)布內(nèi)容 channel.basic_publish(exchange='logs', #設(shè)置要發(fā)布的交換機(jī)名稱routing_key='severity', #設(shè)置信道關(guān)鍵字body=message) #設(shè)置要發(fā)布的數(shù)據(jù)內(nèi)容 print("數(shù)據(jù)內(nèi)容以發(fā)布到交換機(jī)")connection.close() #關(guān)閉連接消費(fèi)者,設(shè)置一個(gè)信道關(guān)鍵字
#!/usr/bin/env python # -*- coding:utf8 -*-import pika #導(dǎo)入連接操作RabbitMQ Server主機(jī)的模塊# ######################### 消費(fèi)者 ######################### credentials = pika.PlainCredentials('guest', 'guest') #設(shè)置RabbitMQ Server用戶名和密碼 parameters = pika.ConnectionParameters('localhost', 5672,'/',credentials) #設(shè)置ip和端口 connection = pika.BlockingConnection(parameters) #創(chuàng)建連接,自帶了socket邏輯部分 channel = connection.channel() #獲取連接句柄 channel.exchange_declare(exchange='logs', #創(chuàng)建交換機(jī),exchange='交換機(jī)名稱',type='direct'交互機(jī)工作模式direct為關(guān)鍵字發(fā)送模式type='direct')result = channel.queue_declare(exclusive=True) #創(chuàng)建專一消息隊(duì)列, queue_name = result.method.queue #隨機(jī)生成隊(duì)列名稱#將隊(duì)列綁定交換機(jī) channel.queue_bind(exchange='logs', #exchange='要綁定的交換機(jī)名稱'queue=queue_name, #queue=要綁定交換機(jī)的隊(duì)列名稱routing_key='severity',) #設(shè)置信道關(guān)鍵字def callback(ch, method, properties, body): #定義獲取隊(duì)列數(shù)據(jù)后的回調(diào)函數(shù),body接收隊(duì)列里的數(shù)據(jù)內(nèi)容print("%r" % body)#在指定隊(duì)列里獲取數(shù)據(jù) channel.basic_consume(callback, #獲取到數(shù)據(jù)后執(zhí)行回調(diào)函數(shù)queue=queue_name, #指定獲取數(shù)據(jù)的隊(duì)列名稱no_ack=True) #如果要保證數(shù)據(jù)必須不丟失設(shè)置為False不刪除數(shù)據(jù),當(dāng)接執(zhí)行回調(diào)函數(shù)里ch.basic_ack(delivery_tag=method.delivery_tag)時(shí)才刪除,但是效率不高 channel.start_consuming() #等待獲取隊(duì)列數(shù)據(jù)消費(fèi)者,設(shè)置多個(gè)信道關(guān)鍵字
#!/usr/bin/env python # -*- coding:utf8 -*-import pika #導(dǎo)入連接操作RabbitMQ Server主機(jī)的模塊# ######################### 消費(fèi)者 ######################### credentials = pika.PlainCredentials('guest', 'guest') #設(shè)置RabbitMQ Server用戶名和密碼 parameters = pika.ConnectionParameters('localhost', 5672,'/',credentials) #設(shè)置ip和端口 connection = pika.BlockingConnection(parameters) #創(chuàng)建連接,自帶了socket邏輯部分 channel = connection.channel() #獲取連接句柄 channel.exchange_declare(exchange='logs', #創(chuàng)建交換機(jī),exchange='交換機(jī)名稱',type='direct'交互機(jī)工作模式direct為關(guān)鍵字發(fā)送模式type='direct')result = channel.queue_declare(exclusive=True) #創(chuàng)建專一消息隊(duì)列, queue_name = result.method.queue #隨機(jī)生成隊(duì)列名稱#將隊(duì)列綁定交換機(jī) channel.queue_bind(exchange='logs', #exchange='要綁定的交換機(jī)名稱'queue=queue_name, #queue=要綁定交換機(jī)的隊(duì)列名稱routing_key='severity',) #設(shè)置信道關(guān)鍵字 channel.queue_bind(exchange='logs', #exchange='要綁定的交換機(jī)名稱'queue=queue_name, #queue=要綁定交換機(jī)的隊(duì)列名稱routing_key='severity2',) #設(shè)置信道關(guān)鍵字 channel.queue_bind(exchange='logs', #exchange='要綁定的交換機(jī)名稱'queue=queue_name, #queue=要綁定交換機(jī)的隊(duì)列名稱routing_key='severity3',) #設(shè)置信道關(guān)鍵字def callback(ch, method, properties, body): #定義獲取隊(duì)列數(shù)據(jù)后的回調(diào)函數(shù),body接收隊(duì)列里的數(shù)據(jù)內(nèi)容print("%r" % body)#在指定隊(duì)列里獲取數(shù)據(jù) channel.basic_consume(callback, #獲取到數(shù)據(jù)后執(zhí)行回調(diào)函數(shù)queue=queue_name, #指定獲取數(shù)據(jù)的隊(duì)列名稱no_ack=True) #如果要保證數(shù)據(jù)必須不丟失設(shè)置為False不刪除數(shù)據(jù),當(dāng)接執(zhí)行回調(diào)函數(shù)里ch.basic_ack(delivery_tag=method.delivery_tag)時(shí)才刪除,但是效率不高 channel.start_consuming() #等待獲取隊(duì)列數(shù)據(jù)?
?
?
?topic交換機(jī),模糊匹配發(fā)送模式
exchange type = topic
在topic類型下,可以讓隊(duì)列綁定幾個(gè)模糊的關(guān)鍵字,之后發(fā)送者將數(shù)據(jù)發(fā)送到exchange交換機(jī),exchange交換機(jī)將傳入的”列隊(duì)關(guān)鍵字“和 ”生產(chǎn)者關(guān)鍵字“進(jìn)行匹配,匹配成功,則將數(shù)據(jù)發(fā)送到指定隊(duì)列。
- # 表示可以匹配 0 個(gè) 或 多個(gè) 單詞
- * ?表示只能匹配 一個(gè) 單詞
例如:
發(fā)送者關(guān)鍵字 生產(chǎn)者匹配 old.boy.python old.* -- 不匹配 old.boy.python old.# -- 匹配?
生產(chǎn)者
#!/usr/bin/env python # -*- coding:utf8 -*-import pika #導(dǎo)入連接操作RabbitMQ Server主機(jī)的模塊 import sys# ######################### 生產(chǎn)者 ######################### credentials = pika.PlainCredentials('guest', 'guest') #設(shè)置RabbitMQ Server用戶名和密碼 parameters = pika.ConnectionParameters('localhost', 5672,'/',credentials) #設(shè)置ip和端口 connection = pika.BlockingConnection(parameters) #創(chuàng)建連接,自帶了socket邏輯部分 channel = connection.channel() #獲取連接句柄 channel.exchange_declare(exchange='logs', #創(chuàng)建交換機(jī),exchange='交換機(jī)名稱',type='topic'交互機(jī)工作模式topic為模糊匹配發(fā)送模式type='topic')routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' #設(shè)置發(fā)送關(guān)鍵字 message = ' '.join(sys.argv[2:]) or 'Hello World!' #設(shè)置發(fā)送內(nèi)容#向交換機(jī)發(fā)送內(nèi)容 channel.basic_publish(exchange='logs', #設(shè)置要發(fā)布的交換機(jī)名稱routing_key=routing_key, #設(shè)置信道關(guān)鍵字:anonymous.infobody=message) #設(shè)置要發(fā)布的數(shù)據(jù)內(nèi)容:Hello World!print(" [x] Sent %r:%r" % (routing_key, message)) #打印出相關(guān)內(nèi)容 connection.close() #關(guān)閉連接消費(fèi)者
#!/usr/bin/env python # -*- coding:utf8 -*-import pika #導(dǎo)入連接操作RabbitMQ Server主機(jī)的模塊# ######################### 消費(fèi)者 ######################### credentials = pika.PlainCredentials('guest', 'guest') #設(shè)置RabbitMQ Server用戶名和密碼 parameters = pika.ConnectionParameters('localhost', 5672,'/',credentials) #設(shè)置ip和端口 connection = pika.BlockingConnection(parameters) #創(chuàng)建連接,自帶了socket邏輯部分 channel = connection.channel() #獲取連接句柄 channel.exchange_declare(exchange='logs', #創(chuàng)建交換機(jī),exchange='交換機(jī)名稱',type='topic'交互機(jī)工作模式topic為模糊匹配發(fā)送模式type='topic')result = channel.queue_declare(exclusive=True) #創(chuàng)建專一消息隊(duì)列, queue_name = result.method.queue #隨機(jī)生成隊(duì)列名稱#將隊(duì)列綁定交換機(jī) channel.queue_bind(exchange='logs', #exchange='要綁定的交換機(jī)名稱'queue=queue_name, #queue=要綁定交換機(jī)的隊(duì)列名稱routing_key='anonymous.*',) #設(shè)置信道關(guān)鍵字匹配def callback(ch, method, properties, body): #定義獲取隊(duì)列數(shù)據(jù)后的回調(diào)函數(shù),body接收隊(duì)列里的數(shù)據(jù)內(nèi)容print("%r" % body)#在指定隊(duì)列里獲取數(shù)據(jù) channel.basic_consume(callback, #獲取到數(shù)據(jù)后執(zhí)行回調(diào)函數(shù)queue=queue_name, #指定獲取數(shù)據(jù)的隊(duì)列名稱no_ack=True) #如果要保證數(shù)據(jù)必須不丟失設(shè)置為False不刪除數(shù)據(jù),當(dāng)接執(zhí)行回調(diào)函數(shù)里ch.basic_ack(delivery_tag=method.delivery_tag)時(shí)才刪除,但是效率不高 channel.start_consuming() #等待獲取隊(duì)列數(shù)據(jù)?
更多教程,可以查看官方教程
http://www.rabbitmq.com/getstarted.html
?
轉(zhuǎn)載于:https://www.cnblogs.com/adc8868/p/7064389.html
總結(jié)
以上是生活随笔為你收集整理的第二百九十二节,RabbitMQ多设备消息队列-Python开发的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 安装elasticsearch5.4.1
- 下一篇: Pycharm 创建 Django ad