RabbitMQ 原文译03--发布和订阅
發(fā)布/訂閱
在之前的案例中我們創(chuàng)建了一個(gè)工作隊(duì)列,這個(gè)工作隊(duì)列的實(shí)現(xiàn)思想就是一個(gè)把每一個(gè)任務(wù)平均分配給每一個(gè)執(zhí)行者,在這個(gè)篇文章我們會(huì)做一些不一樣的東西,把一個(gè)消息發(fā)送給多個(gè)消費(fèi)者,這種模式就被稱作"發(fā)布/訂閱".
為了說(shuō)明這個(gè)模式,我們將要?jiǎng)?chuàng)建一個(gè)簡(jiǎn)單的日志系統(tǒng),一個(gè)負(fù)責(zé)發(fā)布消息,另外一個(gè)負(fù)責(zé)接收打印他們.
在我們的日志系統(tǒng)中,每一個(gè)運(yùn)行中的接收者副本將都會(huì)獲得消息,這種方式可以讓我們?cè)谶\(yùn)行一個(gè)接收者直接把消息保存在磁盤的同時(shí),另外一個(gè)消費(fèi)者可以把消息打印到屏幕上.
本質(zhì)上,發(fā)布一個(gè)日志消息將會(huì)廣播給所有的接收者
交換機(jī)(Exchanges)
在之前的文章中,我們接受和發(fā)送消息都是通過(guò)一個(gè)隊(duì)列來(lái)完成了,現(xiàn)在是時(shí)候引入RabbitMQ的全部工作模型了.
讓我們快速回憶一下之前涉及到的模型
--生產(chǎn)者(發(fā)布者),是一個(gè)負(fù)責(zé)發(fā)送消息的用戶應(yīng)用程序.
--隊(duì)列,負(fù)責(zé)存儲(chǔ)消息
--消費(fèi)者(接收者),負(fù)責(zé)接收消息的用戶程序.
RabbitMQ的核心思想是生產(chǎn)者永遠(yuǎn)不會(huì)直接把消息發(fā)送給隊(duì)列,事實(shí)上生產(chǎn)者甚至經(jīng)常不知道一個(gè)發(fā)出去的消息是否可以有隊(duì)列去接收它.
相應(yīng)的,生產(chǎn)者只能消息發(fā)送給交換機(jī),交換機(jī)的工作機(jī)制非常簡(jiǎn)單,一方面它從生產(chǎn)者那里接收到消息,另一方面它會(huì)把消息發(fā)送給相應(yīng)的隊(duì)列上.交換機(jī)必須要知道怎么處理接收到的消息,它應(yīng)該被放入一個(gè)特殊的隊(duì)列嗎?它是否應(yīng)該被放入多個(gè)隊(duì)列?或者它是否需要被忽略.
處理這工作的方式是通過(guò)交換機(jī)類型來(lái)實(shí)現(xiàn)的.
這里有幾個(gè)可用的交換機(jī)類型:direct,topic,headers,fanout 我們將會(huì)關(guān)注最后一個(gè)(fanout),讓我們創(chuàng)建一個(gè)fanout的交換機(jī),名字叫做'logs'
channel.ExchangeDeclare("logs", "fanout");這個(gè)fanout的交換機(jī)功能非常簡(jiǎn)單(你也許已經(jīng)從名字中猜到了他的方式),把接收到的消息廣播給所有已知的隊(duì)列,這個(gè)這是我們的日志系統(tǒng)需要的.
列出RabbitMQ已添加的交換機(jī):
cmd:rabbitmqctl list_exchanges
?
無(wú)命名的交換機(jī):在之前的案例中我們對(duì)于交換機(jī)一無(wú)所知,但是仍然可以把消息發(fā)送到隊(duì)列上,這是因?yàn)槲覀兪褂玫氖且粋€(gè)默認(rèn)的交互機(jī),名字為空(""),回顧一下我們之前發(fā)送消息的方式
var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "",routingKey: "hello", basicProperties: null,body: body);第一個(gè)參數(shù)就是交換機(jī)的名稱,空字符串表示默認(rèn)的無(wú)命名的交換機(jī):消息通過(guò)存在的RoutingKey被發(fā)送到隊(duì)列上.
現(xiàn)在我們發(fā)送命名的交換機(jī)代替:
var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);臨時(shí)隊(duì)列
在之前的案例中,我們使用的隊(duì)列是一個(gè)指定了名字的隊(duì)列(記得hello 和task_queue 嗎),給一個(gè)隊(duì)名命名是嚴(yán)格的,我們需要執(zhí)行者連接的同樣的隊(duì)列來(lái)工作,當(dāng)你想在生產(chǎn)者和消費(fèi)者之間共享隊(duì)列的時(shí)候指定一個(gè)隊(duì)列名是非常重要的.但是我們的日志系統(tǒng)則不在此列,
我們想要監(jiān)聽(tīng)到所有的日志消息,而不僅僅是他們的子集,我們也僅僅對(duì)當(dāng)前正在流轉(zhuǎn)的消息感興趣,而不是老的消息,結(jié)局這個(gè)問(wèn)題我們需要2件事情.
首先,無(wú)論何時(shí)我們連接到隊(duì)列,我們都需要一個(gè)新鮮的,空的隊(duì)列,為了實(shí)現(xiàn)這個(gè)目標(biāo)我們可以每次創(chuàng)建一個(gè)隨機(jī)名稱的隊(duì)列,或者更加便捷的方式--讓服務(wù)為我們的隊(duì)列隨機(jī)命名.
第二,一旦我們斷開(kāi)到消費(fèi)者到隊(duì)列的連接,我們需要自動(dòng)刪除隊(duì)列.
在.Net客戶端,我們使用無(wú)參的queueDeclare()方法來(lái)創(chuàng)建一個(gè)隨機(jī)命名的非持久的,自動(dòng)刪除的排他隊(duì)列.
var queueName = channel.QueueDeclare().QueueName;queueName就是一個(gè)隨機(jī)的隊(duì)列名,如:amq.gen-JzTY20BRgKO-HjmUJj0wLg.
綁定
我們已經(jīng)創(chuàng)建了一個(gè)fanout的交換機(jī)和一個(gè)隊(duì)列,現(xiàn)在我們需要告訴我們交換機(jī)發(fā)送消息到我們的隊(duì)列,交換機(jī)和隊(duì)列之間的關(guān)系叫做綁定.
channel.QueueBind(queue: queueName,exchange: "logs", routingKey: "");從現(xiàn)在開(kāi)始logs 交換機(jī)將會(huì)把消息放入我們的隊(duì)列當(dāng)中.
列出隊(duì)列cmd:?rabbitmqctl list_bindings
匯總
負(fù)責(zé)發(fā)送消息的生產(chǎn)者可之前案例基本上是一樣的,最大的不同是我們將消息發(fā)送到了我們的命名隊(duì)列l(wèi)ogs上而不是默認(rèn)的隊(duì)列上,發(fā)送的時(shí)候我們需要使用routingKey,但是它的值是被fanout交換機(jī)忽略的.
EmitLog.cs
class EmitLog {public static void Main(string[] args){var factory = new ConnectionFactory() { HostName = "localhost" };using(var connection = factory.CreateConnection())using(var channel = connection.CreateModel()){channel.ExchangeDeclare(exchange: "logs", type: "fanout");var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);Console.WriteLine(" [x] Sent {0}", message);}Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}private static string GetMessage(string[] args){return ((args.Length > 0)? string.Join(" ", args): "info: Hello World!");} }正如你看到的,我們?cè)诮⑦B接之后創(chuàng)建了一個(gè)隊(duì)列,這一步是必須的,因?yàn)榘l(fā)送到一個(gè)不存在的交換機(jī)是不被允許的。
當(dāng)隊(duì)列還沒(méi)有綁定到交換機(jī)是發(fā)送的消息將會(huì)丟失,但是這對(duì)我們?nèi)罩鞠到y(tǒng)來(lái)說(shuō)沒(méi)有問(wèn)題,當(dāng)沒(méi)有消費(fèi)者監(jiān)聽(tīng)時(shí)我們可以安全的忽略這個(gè)消息。
ReceiveLogs.cs:
class ReceiveLogs {public static void Main(){var factory = new ConnectionFactory() { HostName = "localhost" };using(var connection = factory.CreateConnection())using(var channel = connection.CreateModel()){channel.ExchangeDeclare(exchange: "logs", type: "fanout");var queueName = channel.QueueDeclare().QueueName;channel.QueueBind(queue: queueName,exchange: "logs",routingKey: "");Console.WriteLine(" [*] Waiting for logs.");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body;var message = Encoding.UTF8.GetString(body);Console.WriteLine(" [x] {0}", message);};channel.BasicConsume(queue: queueName,noAck: true,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}} }同時(shí)運(yùn)行兩個(gè)receive,可以看到兩個(gè)接收端可以同時(shí)接收到一個(gè)消息。
轉(zhuǎn)載于:https://www.cnblogs.com/grayguo/p/5356070.html
總結(jié)
以上是生活随笔為你收集整理的RabbitMQ 原文译03--发布和订阅的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Atitit. 数据约束 校验 原理理论
- 下一篇: js函数引用、函数调用与回调函数