EasyNetQ笔记-Publish/Subscribe模式(发布订阅)
介紹
asyNetQ支持的最簡單的消息模式是發布/訂閱.這個模式是一個極好的方法用來解耦消息提供者和消費者。消息發布者只要簡單的對世界說,“這里有些事發生” 或者 “我現在有一個信息”。它不關心有沒有人監聽,或者接收者是誰,或者接收者在那里。我們能夠添加和移除特定類型的消息的訂閱者,不需發布者做任何的重新配置。我們也能夠有多個發布者發布相同的消息,添加和刪除發布者也不用其他的發布者或者訂閱者做任何重新配置。
假如你開始去發布消息,而沒有任何訂閱者曾經定義此消息,那么這個消息就簡單的消失了。
一個EasyNetQ訂閱者訂閱一種消息類型(消息類為.NET 類型)。通過調用Subcribe方法一旦對一個類型設置了訂閱,一個持久化隊列就會在RabbitMQ broker上被創建,這個類型的任何消息都會被發送到這個隊列上。訂閱者無論什么時候連接上,RabbitMQ都將會將消息從隊列中發送給訂閱者。
消息發布(Publish)
EasyNetQ支持最簡單的消息模式是發布和訂閱。發布消息后,任意消費者可以訂閱該消息,也可以多個消費者訂閱。并且不需要額外配置。首先,如上文中需要先創建一個IBus對象,然后,在創建一個可序列化的.NET對象。調用Publish方法即可。
var _bus = RabbitHutch.CreateBus("host=xxxxxx;port=5672;virtualHost=my_vhost;username=admin;password=admin;timeout=30;publisherConfirms=true");//連接字符串末尾不要加";"
var message = new MyMessage { Text = "Hello Rabbit" };
for (int i = 0; i < 10; i++)
{
_bus.Publish<MyMessage>(message);
}
警告,Publish只顧發送消息到隊列,但是不管有沒有消費端訂閱,所以,發布之后,如果沒有消費者,該消息將不會被消費甚至丟失。
消息訂閱(Subscribe)
EasyNetQ提供了消息訂閱,當調用Subscribe方法時候,EasyNetQ會創建一個用于接收消息的隊列,不過與消息發布不同的是,消息訂閱增加了一個參數,subscribe_id.代碼如下:
_bus.Subscribe<MyMessage>("subscribe_id", myMessage => {
lbxMessage1.Invoke(new Action(() => { lbxMessage1.Items.Add(myMessage.Text); }));
});
第一個參數是訂閱id,另外一個是delegate參數,用于處理接收到的消息。
這里要注意的是,subscribe_id參數很重要,假如開發者用同一個subscribeid訂閱了同一種消息類型兩次或者多次,RabbitMQ會以輪訓的方式給每個訂閱的隊列發送消息。接收到之后,其他隊列就接收不到該消息。
如果用不同的subscribeid訂閱同一種消息類型,那么生成的每一個隊列都會收到該消息。
舉個例子:出庫發貨,我們有五個商品倉庫,每個倉庫的商品都是一樣的,假如來了一堆訂單,那么我們需要五個倉庫共同工作,分別處理訂單。而同樣,總倉庫需要知道總出貨量,正常情況下,可以用每個倉庫的出貨量相加即可。不過如果我們在總倉庫也監聽商品訂單消息,那么,每次來訂單,總倉庫也都會收到一份,那么可以作相應的統計了。
//不同的subscribe_id將會創建兩個隊列,消息同時發給兩個隊列
private void btn_subscribe11_Click(object sender, EventArgs e)
{
_bus.Subscribe<MyMessage>("subscribe_id_1", myMessage => {
lbxMessage1.Invoke(new Action(() => { lbxMessage1.Items.Add(myMessage.Text); }));
});
}
private void btnSubscribe22_Click(object sender, EventArgs e)
{
_bus.Subscribe<MyMessage>("subscribe_id_2", myMessage => {
lbxMessage2.Invoke(new Action(() => { lbxMessage2.Items.Add(myMessage.Text); }));
});
}
需要注意的是,在收到消息處理消息時候,不要占用太多的時間,會影響消息的處理效率,所以,遇到占用長時間的處理方法,最好用異步處理。代碼如下:
bus.SubscribeAsync<MyMessage>("subscribe_async_test", message =>
new WebClient().DownloadStringTask(new Uri("http://localhost:1338/?timeout=500"))
.ContinueWith(task =>
Console.WriteLine("Received: '{0}', Downloaded: '{1}'",
message.Text,
task.Result)));
非泛型發布訂閱
如果想在項目運行期間發布訂閱消息,EasyNetQ提供了非泛型的發布訂閱
加using
using EasyNetQ.NonGeneric;
提供如下非泛型的發布訂閱方法:
//訂閱
public static IDisposable Subscribe(
this IBus bus,
Type messageType,
string subscriptionId,
Action<object> onMessage,
Action<ISubscriptionConfiguration> configure)
public static IDisposable SubscribeAsync(
this IBus bus,
Type messageType,
string subscriptionId,
Func<object, Task> onMessage,
Action<ISubscriptionConfiguration> configure)
//發布
public static void Publish(
this IBus bus,
Type messageType,
object message,
string topic)
public static Task PublishAsync(
this IBus bus,
Type messageType,
object message,
string topic)
取消訂閱,可以用如下方法:
var subscriptionResult = bus.Subscribe<MyMessage>("sub_id", MyHandler);
subscriptionResult.Dispose();
//或者直接IBus.Dispose();
總結
以上是生活随笔為你收集整理的EasyNetQ笔记-Publish/Subscribe模式(发布订阅)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ARM Trusted Firmware
- 下一篇: C# 脚本代码自动登录淘宝获取用户信息