RabbitMQ 安装与简单使用
?
在企業(yè)應(yīng)用系統(tǒng)領(lǐng)域,會(huì)面對(duì)不同系統(tǒng)之間的通信、集成與整合,尤其當(dāng)面臨異構(gòu)系統(tǒng)時(shí),這種分布式的調(diào)用與通信變得越發(fā)重要。其次,系統(tǒng)中一般會(huì)有很多對(duì)實(shí)時(shí)性要求不高的但是執(zhí)行起來(lái)比較較耗時(shí)的地方,比如發(fā)送短信,郵件提醒,更新文章閱讀計(jì)數(shù),記錄用戶操作日志等等,如果實(shí)時(shí)處理的話,在用戶訪問(wèn)量比較大的情況下,對(duì)系統(tǒng)壓力比較大。
面對(duì)這些問(wèn)題,我們一般會(huì)將這些請(qǐng)求,放在消息隊(duì)列中處理;異構(gòu)系統(tǒng)之間使用消息進(jìn)行通訊。消息傳遞相較文件傳遞與遠(yuǎn)程過(guò)程調(diào)用(RPC)而言,似乎更勝一籌,因?yàn)樗哂懈玫钠脚_(tái)無(wú)關(guān)性,并能夠很好地支持并發(fā)與異步調(diào)用。所以如果系統(tǒng)中出現(xiàn)了如下情況:
- 對(duì)操作的實(shí)時(shí)性要求不高,而需要執(zhí)行的任務(wù)極為耗時(shí);
- 存在異構(gòu)系統(tǒng)間的整合;
一般的可以考慮引入消息隊(duì)列。對(duì)于第一種情況,常常會(huì)選擇消息隊(duì)列來(lái)處理執(zhí)行時(shí)間較長(zhǎng)的任務(wù)。引入的消息隊(duì)列就成了消息處理的緩沖區(qū)。消息隊(duì)列引入的異步通信機(jī)制,使得發(fā)送方和接收方都不用等待對(duì)方返回成功消息,就可以繼續(xù)執(zhí)行下面的代碼,從而提高了數(shù)據(jù)處理的能力。尤其是當(dāng)訪問(wèn)量和數(shù)據(jù)流量較大的情況下,就可以結(jié)合消息隊(duì)列與后臺(tái)任務(wù),通過(guò)避開(kāi)高峰期對(duì)大數(shù)據(jù)進(jìn)行處理,就可以有效降低數(shù)據(jù)庫(kù)處理數(shù)據(jù)的負(fù)荷。
在前面的一篇講解CQRS模式的文章中,所有的對(duì)系統(tǒng)的狀態(tài)的更改都是通過(guò)事件來(lái)完成,一般的將事件存儲(chǔ)到消息隊(duì)列中,然后進(jìn)行統(tǒng)一的處理。
本文簡(jiǎn)單介紹在RabbitMQ這一消息代理工具,以及在.NET中如何使用RabbitMQ.
一 環(huán)境搭建
首先,由于RabbitMQ使用Erlang編寫的,需要運(yùn)行在Erlang運(yùn)行時(shí)環(huán)境上,所以在安裝RabbitMQ Server之前需要安裝Erlang 運(yùn)行時(shí)環(huán)境,可以到Erlang官網(wǎng)下載對(duì)應(yīng)平臺(tái)的安裝文件。如果沒(méi)有安裝運(yùn)行時(shí)環(huán)境,安裝RabbitMQ Server的時(shí)候,會(huì)提示需要先安裝Erlang環(huán)境。 安裝完成之后,確保已經(jīng)將Erlang的安裝路徑注冊(cè)到系統(tǒng)的環(huán)境變量中。安裝完Erlang之后,這個(gè)環(huán)境會(huì)自動(dòng)設(shè)置,如果沒(méi)有,在administrator環(huán)境下在控制臺(tái)下面輸入,也可以設(shè)置:
Setx ERLANG_HOME “D:\Program Files (x86)\erl6.3″然后,去RabbitMQ官網(wǎng)下載RabbitMQ Server服務(wù)端程序,選擇合適的平臺(tái)版本下載。安裝完成之后,就可以開(kāi)始使用了。
現(xiàn)在就可以對(duì)RabbitMQ Server進(jìn)行配置了。
首先,切換到RabbitMQ Server的安裝目錄:
在sbin下面有很多batch文件,用來(lái)控制RabbitMQ Server,當(dāng)然您也可以直接在安裝開(kāi)始菜單中來(lái)執(zhí)行相應(yīng)的操作:
最簡(jiǎn)單的方式是使RabbitMQ以Windows Service的方式在后臺(tái)運(yùn)行,所以我們需要以管理員權(quán)限打開(kāi)cmd,然后切換到sbin目錄下,執(zhí)行這三條命令即可:
rabbitmq-service install rabbitmq-service enable rabbitmq-service start現(xiàn)在RabbitMQ的服務(wù)端已經(jīng)啟動(dòng)起來(lái)了。
下面可以使用sbin目錄下面的rabbitmqctl.bat這個(gè)腳本來(lái)查看和控制服務(wù)端狀態(tài)的,在cmd中直接運(yùn)行rabbitmqctl status。如果看到以下結(jié)果:
顯示node沒(méi)有連接上,需要到C:\Windows目錄下,將.erlang.cookie文件,拷貝到用戶目錄下 C:\Users\{用戶名},這是Erlang的Cookie文件,允許與Erlang進(jìn)行交互,現(xiàn)在重復(fù)運(yùn)行剛才的命令就會(huì)得到如下信息:
RabbitMQ Server上面也有用戶概念,安裝好之后,使用rabbitmqctl list_users命令,可以看到上面目前的用戶:
可以看到,現(xiàn)在只有一個(gè)角色為administrator的名為guest的用戶,這個(gè)是RabbitMQ默認(rèn)為我們創(chuàng)建的,他有RabbitMQ的所有權(quán)限,一般的,我們需要新建一個(gè)我們自己的用戶,設(shè)置密碼,并授予權(quán)限,并將其設(shè)置為管理員,可以使用下面的命令來(lái)執(zhí)行這一操作:
rabbitmqctl add_user yy hello! rabbitmqctl set_permissions yy ".*" ".*" ".*" rabbitmqctl set_user_tags yy administrator上面的一條命令添加了一個(gè)名為yy的用戶,并設(shè)置了密碼hello!,下面的命令為用戶yy分別授予對(duì)所有消息隊(duì)列的配置、讀和寫的權(quán)限。
現(xiàn)在我們可以將默認(rèn)的guest用戶刪掉,使用下面的命令即可:
rabbitmqctl delete_user guest如果要修改密碼,可以使用下面的命令:
rabbitmqctl change_password {username} {newpassowrd}二 開(kāi)始使用
在.NET中使用RabbitMQ需要下載RabbitMQ的客戶端程序集,可以到官網(wǎng)下載,下載解壓后就可以得到RabbitMQ.Client.dll,這就是RabbitMQ的客戶端。
在使用RabitMQ之前,需要對(duì)下面的幾個(gè)基本概念說(shuō)明一下:
RabbitMQ是一個(gè)消息代理。他從消息生產(chǎn)者(producers)那里接收消息,然后把消息送給消息消費(fèi)者(consumer)在發(fā)送和接受之間,他能夠根據(jù)設(shè)置的規(guī)則進(jìn)行路由,緩存和持久化。
一般提到RabbitMQ和消息,都用到一些專有名詞。
- 生產(chǎn)(Producing)意思就是發(fā)送。發(fā)送消息的程序就是一個(gè)生產(chǎn)者(producer)。我們一般用"P"來(lái)表示:
- 隊(duì)列(queue)就是郵箱的名稱。消息通過(guò)你的應(yīng)用程序和RabbitMQ進(jìn)行傳輸,它們只能存儲(chǔ)在隊(duì)列(queue)中。 隊(duì)列(queue)容量沒(méi)有限制,你要存儲(chǔ)多少消息都可以——基本上是一個(gè)無(wú)限的緩沖區(qū)。多個(gè)生產(chǎn)者(producers)能夠把消息發(fā)送給同一個(gè)隊(duì)列,同樣,多個(gè)消費(fèi)者(consumers)也能從同一個(gè)隊(duì)列(queue)中獲取數(shù)據(jù)。隊(duì)列可以畫成這樣(圖上是隊(duì)列的名稱):
- 消費(fèi)(Consuming)和獲取消息是一樣的意思。一個(gè)消費(fèi)者(consumer)就是一個(gè)等待獲取消息的程序。我們把它畫作"C":
通常,消息生產(chǎn)者,消息消費(fèi)者和消息代理不在同一臺(tái)機(jī)器上。
2.1 Hello World
為了展示RabbitMQ的基本使用,我們發(fā)送一個(gè)HelloWorld消息,然后接收并處理。
首先創(chuàng)建一個(gè)控制臺(tái)程序,用來(lái)將消息發(fā)送到RabbitMQ的消息隊(duì)列中,代碼如下:
static void Main(string[] args){var factory = new ConnectionFactory();factory.HostName = "localhost";factory.UserName = "yy";factory.Password = "hello!";using (var connection = factory.CreateConnection()){using (var channel = connection.CreateModel()){channel.QueueDeclare("hello", false, false, false, null);string message = "Hello World";var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish("", "hello", null, body);Console.WriteLine(" set {0}", message);}}}首先,需要?jiǎng)?chuàng)建一個(gè)ConnectionFactory,設(shè)置目標(biāo),由于是在本機(jī),所以設(shè)置為localhost,如果RabbitMQ不在本機(jī),只需要設(shè)置目標(biāo)機(jī)器的IP地址或者機(jī)器名稱即可,然后設(shè)置前面創(chuàng)建的用戶名yy和密碼hello!。
緊接著要?jiǎng)?chuàng)建一個(gè)Channel,如果要發(fā)送消息,需要?jiǎng)?chuàng)建一個(gè)隊(duì)列,然后將消息發(fā)布到這個(gè)隊(duì)列中。在創(chuàng)建隊(duì)列的時(shí)候,只有RabbitMQ上該隊(duì)列不存在,才會(huì)去創(chuàng)建。消息是以二進(jìn)制數(shù)組的形式傳輸?shù)?#xff0c;所以如果消息是實(shí)體對(duì)象的話,需要序列化和然后轉(zhuǎn)化為二進(jìn)制數(shù)組。
現(xiàn)在客戶端發(fā)送代碼已經(jīng)寫好了,運(yùn)行之后,消息會(huì)發(fā)布到RabbitMQ的消息隊(duì)列中,現(xiàn)在需要編寫服務(wù)端的代碼連接到RabbitMQ上去獲取這些消息。
同樣,創(chuàng)建一個(gè)名為Receive的服務(wù)端控制臺(tái)應(yīng)用程序,服務(wù)端代碼如下:
static void Main(string[] args) {var factory = new ConnectionFactory();factory.HostName = "localhost";factory.UserName = "yy";factory.Password = "hello!";using (var connection = factory.CreateConnection()){using (var channel = connection.CreateModel()){channel.QueueDeclare("hello", false, false, false, null);var consumer = new QueueingBasicConsumer(channel);channel.BasicConsume("hello", true, consumer);Console.WriteLine(" waiting for message.");while (true){var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();var body = ea.Body;var message = Encoding.UTF8.GetString(body);Console.WriteLine("Received {0}", message);}}} }和發(fā)送一樣,首先需要定義連接,然后聲明消息隊(duì)列。要接收消息,需要定義一個(gè)Consume,然后從消息隊(duì)列中不斷Dequeue消息,然后處理。
現(xiàn)在發(fā)送端和接收端的代碼都寫好了,運(yùn)行發(fā)送端,發(fā)送消息:
現(xiàn)在,名為hello的消息隊(duì)列中,發(fā)送了一條消息。這條消息存儲(chǔ)到了RabbitMQ的服務(wù)器上了。使用rabbitmqctl 的list_queues可以查看所有的消息隊(duì)列,以及里面的消息個(gè)數(shù),可以看到,目前Rabbitmq上只有一個(gè)消息隊(duì)列,里面只有一條消息:
D:\Program Files\RabbitMQ Server\rabbitmq_server-3.4.2\sbin>rabbitmqctl list_queues Listing queues ... hello 1現(xiàn)在運(yùn)行接收端程序,如下:
可以看到,已經(jīng)接受到了客戶端發(fā)送的Hello World,現(xiàn)在再來(lái)看RabitMQ上的消息隊(duì)列信息:
D:\Program Files\RabbitMQ Server\rabbitmq_server-3.4.2\sbin>rabbitmqctl list_queues Listing queues ... hello 0可以看到,hello這個(gè)隊(duì)列中的消息隊(duì)列個(gè)數(shù)為0,這表示,當(dāng)接收端,接收到消息之后,RabbitMQ上就把這個(gè)消息刪掉了。
2.2 工作隊(duì)列
前面的例子展示了如何往一個(gè)指定的消息隊(duì)列中發(fā)送和收取消息。現(xiàn)在我們創(chuàng)建一個(gè)工作隊(duì)列(work queue)來(lái)將一些耗時(shí)的任務(wù)分發(fā)給多個(gè)工作者(workers):
工作隊(duì)列(work queues, 又稱任務(wù)隊(duì)列Task Queues)的主要思想是為了避免立即執(zhí)行并等待一些占用大量資源、時(shí)間的操作完成。而是把任務(wù)(Task)當(dāng)作消息發(fā)送到隊(duì)列中,稍后處理。一個(gè)運(yùn)行在后臺(tái)的工作者(worker)進(jìn)程就會(huì)取出任務(wù)然后處理。當(dāng)運(yùn)行多個(gè)工作者(workers)時(shí),任務(wù)會(huì)在它們之間共享。
這個(gè)在網(wǎng)絡(luò)應(yīng)用中非常有用,它可以在短暫的HTTP請(qǐng)求中處理一些復(fù)雜的任務(wù)。在一些實(shí)時(shí)性要求不太高的地方,我們可以處理完主要操作之后,以消息的方式來(lái)處理其他的不緊要的操作,比如寫日志等等。
準(zhǔn)備
在第一部分,發(fā)送了一個(gè)包含“Hello World!”的字符串消息。現(xiàn)在發(fā)送一些字符串,把這些字符串當(dāng)作復(fù)雜的任務(wù)。這里使用time.sleep()函數(shù)來(lái)模擬耗時(shí)的任務(wù)。在字符串中加上點(diǎn)號(hào)(.)來(lái)表示任務(wù)的復(fù)雜程度,一個(gè)點(diǎn)(.)將會(huì)耗時(shí)1秒鐘。比如"Hello..."就會(huì)耗時(shí)3秒鐘。
對(duì)之前示例的send.cs做些簡(jiǎn)單的調(diào)整,以便可以發(fā)送隨意的消息。這個(gè)程序會(huì)按照計(jì)劃發(fā)送任務(wù)到我們的工作隊(duì)列中。
static void Main(string[] args) {var factory = new ConnectionFactory();factory.HostName = "localhost";factory.UserName = "yy";factory.Password = "hello!";using (var connection = factory.CreateConnection()){using (var channel = connection.CreateModel()){channel.QueueDeclare("hello", false, false, false, null);string message = GetMessage(args); var properties = channel.CreateBasicProperties();properties.DeliveryMode = 2;var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish("", "hello", properties, body);Console.WriteLine(" set {0}", message);}}Console.ReadKey(); }private static string GetMessage(string[] args) {return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); }加粗部分是經(jīng)過(guò)修改過(guò)了的。
接著我們修改接收端,讓他根據(jù)消息中的逗點(diǎn)的個(gè)數(shù)來(lái)Sleep對(duì)應(yīng)的秒數(shù):
static void Main(string[] args) {var factory = new ConnectionFactory();factory.HostName = "localhost";factory.UserName = "yy";factory.Password = "hello!";using (var connection = factory.CreateConnection()){using (var channel = connection.CreateModel()){channel.QueueDeclare("hello", false, false, false, null);var consumer = new QueueingBasicConsumer(channel);channel.BasicConsume("hello", true, consumer);while (true){var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();var body = ea.Body;var message = Encoding.UTF8.GetString(body); int dots = message.Split('.').Length - 1;Thread.Sleep(dots * 1000);Console.WriteLine("Received {0}", message);Console.WriteLine("Done");}}} }輪詢分發(fā)
使用工作隊(duì)列的一個(gè)好處就是它能夠并行的處理隊(duì)列。如果堆積了很多任務(wù),我們只需要添加更多的工作者(workers)就可以了,擴(kuò)展很簡(jiǎn)單。
現(xiàn)在,我們先啟動(dòng)兩個(gè)接收端,等待接受消息,然后啟動(dòng)一個(gè)發(fā)送端開(kāi)始發(fā)送消息。
?
在cmd條件下,發(fā)送了5條消息,每條消息后面的逗點(diǎn)表示該消息需要執(zhí)行的時(shí)長(zhǎng),來(lái)模擬耗時(shí)的操作。
然后可以看到,兩個(gè)接收端依次接收到了發(fā)出的消息:
?
默認(rèn),RabbitMQ會(huì)將每個(gè)消息按照順序依次分發(fā)給下一個(gè)消費(fèi)者。所以每個(gè)消費(fèi)者接收到的消息個(gè)數(shù)大致是平均的。 這種消息分發(fā)的方式稱之為輪詢(round-robin)。
2.3 消息響應(yīng)
當(dāng)處理一個(gè)比較耗時(shí)得任務(wù)的時(shí)候,也許想知道消費(fèi)者(consumers)是否運(yùn)行到一半就掛掉。在當(dāng)前的代碼中,當(dāng)RabbitMQ將消息發(fā)送給消費(fèi)者(consumers)之后,馬上就會(huì)將該消息從隊(duì)列中移除。此時(shí),如果把處理這個(gè)消息的工作者(worker)停掉,正在處理的這條消息就會(huì)丟失。同時(shí),所有發(fā)送到這個(gè)工作者的還沒(méi)有處理的消息都會(huì)丟失。
我們不想丟失任何任務(wù)消息。如果一個(gè)工作者(worker)掛掉了,我們希望該消息會(huì)重新發(fā)送給其他的工作者(worker)。
為了防止消息丟失,RabbitMQ提供了消息響應(yīng)(acknowledgments)機(jī)制。消費(fèi)者會(huì)通過(guò)一個(gè)ack(響應(yīng)),告訴RabbitMQ已經(jīng)收到并處理了某條消息,然后RabbitMQ才會(huì)釋放并刪除這條消息。
如果消費(fèi)者(consumer)掛掉了,沒(méi)有發(fā)送響應(yīng),RabbitMQ就會(huì)認(rèn)為消息沒(méi)有被完全處理,然后重新發(fā)送給其他消費(fèi)者(consumer)。這樣,即使工作者(workers)偶爾的掛掉,也不會(huì)丟失消息。
消息是沒(méi)有超時(shí)這個(gè)概念的;當(dāng)工作者與它斷開(kāi)連的時(shí)候,RabbitMQ會(huì)重新發(fā)送消息。這樣在處理一個(gè)耗時(shí)非常長(zhǎng)的消息任務(wù)的時(shí)候就不會(huì)出問(wèn)題了。
消息響應(yīng)默認(rèn)是開(kāi)啟的。在之前的例子中使用了no_ack=True標(biāo)識(shí)把它關(guān)閉。是時(shí)候移除這個(gè)標(biāo)識(shí)了,當(dāng)工作者(worker)完成了任務(wù),就發(fā)送一個(gè)響應(yīng)。
channel.BasicConsume("hello", false, consumer);while (true) {var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();var body = ea.Body;var message = Encoding.UTF8.GetString(body);int dots = message.Split('.').Length - 1;Thread.Sleep(dots * 1000);Console.WriteLine("Received {0}", message);Console.WriteLine("Done");channel.BasicAck(ea.DeliveryTag, false); }現(xiàn)在,可以保證,即使正在處理消息的工作者被停掉,這些消息也不會(huì)丟失,所有沒(méi)有被應(yīng)答的消息會(huì)被重新發(fā)送給其他工作者.
一個(gè)很常見(jiàn)的錯(cuò)誤就是忘掉了BasicAck這個(gè)方法,這個(gè)錯(cuò)誤很常見(jiàn),但是后果很嚴(yán)重. 當(dāng)客戶端退出時(shí),待處理的消息就會(huì)被重新分發(fā),但是RabitMQ會(huì)消耗越來(lái)越多的內(nèi)存,因?yàn)檫@些沒(méi)有被應(yīng)答的消息不能夠被釋放。調(diào)試這種case,可以使用rabbitmqct打印messages_unacknoledged字段。
rabbitmqctl list_queues name messages_ready messages_unacknowledged Listing queues ... hello 0 0 ...done.2.4 消息持久化
前面已經(jīng)搞定了即使消費(fèi)者down掉,任務(wù)也不會(huì)丟失,但是,如果RabbitMQ Server停掉了,那么這些消息還是會(huì)丟失。
當(dāng)RabbitMQ Server 關(guān)閉或者崩潰,那么里面存儲(chǔ)的隊(duì)列和消息默認(rèn)是不會(huì)保存下來(lái)的。如果要讓RabbitMQ保存住消息,需要在兩個(gè)地方同時(shí)設(shè)置:需要保證隊(duì)列和消息都是持久化的。
首先,要保證RabbitMQ不會(huì)丟失隊(duì)列,所以要做如下設(shè)置:
bool durable = true; channel.QueueDeclare("hello", durable, false, false, null);雖然在語(yǔ)法上是正確的,但是在目前階段是不正確的,因?yàn)槲覀冎耙呀?jīng)定義了一個(gè)非持久化的hello隊(duì)列。RabbitMQ不允許我們使用不同的參數(shù)重新定義一個(gè)已經(jīng)存在的同名隊(duì)列,如果這樣做就會(huì)報(bào)錯(cuò)。現(xiàn)在,定義另外一個(gè)不同名稱的隊(duì)列:
bool durable = true; channel.queueDeclare("task_queue", durable, false, false, null);queueDeclare 這個(gè)改動(dòng)需要在發(fā)送端和接收端同時(shí)設(shè)置。
現(xiàn)在保證了task_queue這個(gè)消息隊(duì)列即使在RabbitMQ Server重啟之后,隊(duì)列也不會(huì)丟失。 然后需要保證消息也是持久化的, 這可以通過(guò)設(shè)置IBasicProperties.SetPersistent 為true來(lái)實(shí)現(xiàn):
var properties = channel.CreateBasicProperties(); properties.SetPersistent(true);需要注意的是,將消息設(shè)置為持久化并不能完全保證消息不丟失。雖然他告訴RabbitMQ將消息保存到磁盤上,但是在RabbitMQ接收到消息和將其保存到磁盤上這之間仍然有一個(gè)小的時(shí)間窗口。 RabbitMQ 可能只是將消息保存到了緩存中,并沒(méi)有將其寫入到磁盤上。持久化是不能夠一定保證的,但是對(duì)于一個(gè)簡(jiǎn)單任務(wù)隊(duì)列來(lái)說(shuō)已經(jīng)足夠。如果需要消息隊(duì)列持久化的強(qiáng)保證,可以使用publisher confirms
2.5 公平分發(fā)
你可能會(huì)注意到,消息的分發(fā)可能并沒(méi)有如我們想要的那樣公平分配。比如,對(duì)于兩個(gè)工作者。當(dāng)奇數(shù)個(gè)消息的任務(wù)比較重,但是偶數(shù)個(gè)消息任務(wù)比較輕時(shí),奇數(shù)個(gè)工作者始終處理忙碌狀態(tài),而偶數(shù)個(gè)工作者始終處理空閑狀態(tài)。但是RabbitMQ并不知道這些,他仍然會(huì)平均依次的分發(fā)消息。
為了改變這一狀態(tài),我們可以使用basicQos方法,設(shè)置perfetchCount=1 。這樣就告訴RabbitMQ 不要在同一時(shí)間給一個(gè)工作者發(fā)送多于1個(gè)的消息,或者換句話說(shuō)。在一個(gè)工作者還在處理消息,并且沒(méi)有響應(yīng)消息之前,不要給他分發(fā)新的消息。相反,將這條新的消息發(fā)送給下一個(gè)不那么忙碌的工作者。
channel.BasicQos(0, 1, false);2.6 完整實(shí)例
現(xiàn)在將所有這些放在一起:
發(fā)送端代碼如下:
static void Main(string[] args) {var factory = new ConnectionFactory();factory.HostName = "localhost";factory.UserName = "yy";factory.Password = "hello!";using (var connection = factory.CreateConnection()){using (var channel = connection.CreateModel()){bool durable = true;channel.QueueDeclare("task_queue", durable, false, false, null);string message = GetMessage(args);var properties = channel.CreateBasicProperties();properties.SetPersistent(true);var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish("", "task_queue", properties, body);Console.WriteLine(" set {0}", message);}}Console.ReadKey(); }private static string GetMessage(string[] args) {return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); }接收端代碼如下:
static void Main(string[] args) {var factory = new ConnectionFactory();factory.HostName = "localhost";factory.UserName = "yy";factory.Password = "hello!";using (var connection = factory.CreateConnection()){using (var channel = connection.CreateModel()){bool durable = true;channel.QueueDeclare("task_queue", durable, false, false, null);channel.BasicQos(0, 1, false);var consumer = new QueueingBasicConsumer(channel);channel.BasicConsume("task_queue", false, consumer);while (true){var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();var body = ea.Body;var message = Encoding.UTF8.GetString(body);int dots = message.Split('.').Length - 1;Thread.Sleep(dots * 1000);Console.WriteLine("Received {0}", message);Console.WriteLine("Done");channel.BasicAck(ea.DeliveryTag, false);}}} }三 管理界面
RabbitMQ還有一個(gè)管理界面,通過(guò)該界面可以查看RabbitMQ Server 當(dāng)前的狀態(tài),該界面是以插件形式提供的,并且在安裝RabbitMQ的時(shí)候已經(jīng)自帶了該插件。需要做的是在RabbitMQ控制臺(tái)界面中啟用該插件,命令如下:
rabbitmq-plugins enable rabbitmq_management現(xiàn)在,在瀏覽器中輸入?http://server-name:15672/?server-name換成機(jī)器地址或者域名(http://localhost:15672/),如果是本地的,直接用localhost(RabbitMQ 3.0之前版本端口號(hào)為55672)在輸入之后,彈出登錄界面,使用我們之前創(chuàng)建的用戶登錄。
?.
在該界面上可以看到當(dāng)前RabbitMQServer的所有狀態(tài)。
四 總結(jié)
本文簡(jiǎn)單介紹了消息隊(duì)列的相關(guān)概念,并介紹了RabbitMQ消息代理的基本原理以及在Windows 上如何安裝RabbitMQ和在.NET中如何使用RabbitMQ。消息隊(duì)列在構(gòu)建分布式系統(tǒng)和提高系統(tǒng)的可擴(kuò)展性和響應(yīng)性方面有著很重要的作用,希望本文對(duì)您了解消息隊(duì)列以及如何使用RabbitMQ有所幫助。
轉(zhuǎn)載于:https://www.cnblogs.com/hf-0712/p/5566024.html
總結(jié)
以上是生活随笔為你收集整理的RabbitMQ 安装与简单使用的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 状态码具体解释
- 下一篇: Mono Compatibility