RabbitMQ(二):Work Queues、循环分发、消息确认、持久化、公平分发
內容翻譯自:RabbitMQ Tutorials Java版
RabbitMQ(一):Hello World程序
RabbitMQ(二):Work Queues、循環分發、消息確認、持久化、公平分發
RabbitMQ(三):Exchange交換器--fanout
RabbitMQ(四):Exchange交換器--direct
RabbitMQ(五):Exchange交換器--topic
RabbitMQ(六):回調隊列callback queue、關聯標識correlation id、實現簡單的RPC系統
RabbitMQ(七):常用方法說明 與 學習小結
Work Queues:
在上一篇博客中,我們實現了從一個指定的隊列中發送和接收消息。在這一部分,我們將會創建一個工作隊列:用來將耗時的任務分發給多個工作者。
工作隊列的主要思想是避免這樣的情況:直接去做一件資源密集型的任務,并且還得等它完成。相反,我們將任務安排到之后再去做。我們將任務封裝為一個消息,并發到隊列中。一個工作進程將會在后臺取出任務并最終完成工作。如果開啟多個工作進程,任務將會在這多個工作進程間共享。
這個概念在web應用中是非常有用的,因為web應用不可能在一個HTTP請求中去處理一個復雜的任務。
準備:
在上一篇博客中,我們發送了“hello world”的消息。現在,我們會發送一些代表復雜任務的字符串。我們沒有真實的任務(比如調整圖片大小、PDF文件加載等),所以我們使用?Thread.sleep()?方法來偽造耗時任務。我們用字符串中的點號.來表示任務的復雜性,一個點就表示需要耗時1秒,比如一個描述為hello...的假任務,它需要耗時3秒。
將上個教程中的Send.java中的代碼稍作修改。因為這個程序會調度任務到工作隊列,所以我們將它命名為NewTask.java:
String message = "1.";channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");之前的Recv.java同樣也要做些修改,它需要模擬消息中的點代表的耗時。因為它負責接收消息并處理任務,所以,將它命名為Worker.java:
final Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");}} }; boolean autoAck = true; // acknowledgment is covered below channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);我們的假任務的執行:
private static void doWork(String task) throws InterruptedException {for (char ch: task.toCharArray()) {if (ch == '.') Thread.sleep(1000);} }循環分發:
使用任務隊列的一個優勢在于容易并行處理。如果積壓了大量的工作,我們只需要添加更多的工作者(上文中的Worker.java中的概念),這樣很容易擴展。
首先,我們來嘗試同時運行兩個工作者實例(Worker.java)。它們都會從隊列中獲取消息,但具體是如何獲取的呢?
啟動NewTask,之后,可以依次將message修改為"2.."、"3..."、"4...."、"5....."等,每修改一次就運行一次。
觀察console中兩個工作者的接收消息情況:
可以看出,默認情況下,RabbitMQ是輪流發送消息給下一個消費者,平均每個消費者接收到的消息數量是相等的。這種分發消息的方式叫做循環分發。你可以試一下開3個或更多工作者的情況。
消息確認:
完成一項任務可能會耗費幾秒鐘,你可能會問,假如其中一個消費者開始了一個非常耗時的任務,并在執行這個任務的時候崩潰了(也就是沒有完成這個任務),將會發生什么事情。按照上面的代碼,一旦RabbitMQ向消費者發出消息,消息就會立即從內存中移除。在這種情況下,如果你殺死一個工作者,我們將會失去它正在處理的消息,同時也會丟失所有發給這個工作者但這個工作者還未處理的消息。
但我們不想丟掉任務,如果一個工作者死掉,我們想將這個任務發給其他的工作者。
為了確保消息永遠不會丟失,RabbitMQ支持消息確認。消費者將會發送一個確認信息來告訴RabbitMQ,我已經接收到了消息,并且處理完了,你可以隨便刪它了。
如果一個消費者在發送確認信息前死去(連接或通道關閉、TCP連接丟失等),RabbitMQ將會認為該消息沒有被完全處理并會重新將消息加入隊列。如果此時有其他的消費者,RabbitMQ很快就會重新發送該消息到其他的消費者。通過這種方式,你完全可以保證沒有消息丟失,即使某個消費者意外死亡。
對RabbitMQ而言,沒有消息超時這一說。如果消費者死去,RabbitMQ將會重新發送消息。即使處理一個消息需要耗時很久很久也沒有關系。
消息確認機制是默認打開的。只是在前面的代碼中,我們顯示地關掉了:boolean autoAck=true。將代碼做如下修改:
channel.basicQos(1); // accept only one unack-ed message at a time (see below)final Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");channel.basicAck(envelope.getDeliveryTag(), false);}} }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);注意到最上面的那句代碼:
channel.basicQos(int prefetchCount);其中的參數prefetchCount表示:maximum number of messages that the server will deliver。
這樣,就可以確保即使消費者掛了,消息也不會丟失。
消息持久化:
通過上面的教程,我們知道如何確保消費者掛掉也不會丟失消息。但是,加入RabbitMQ服務器掛掉了怎么辦?
如果關閉RabbitMQ服務或者RabbitMQ服務崩潰了,RabbitMQ就會丟掉所有的隊列和消息:除非你告訴它不要這樣。要確保RabbitMQ服務關閉或崩潰后消息不會丟失,要做兩件事情:持久化隊列、持久化消息。
首先,我們要確保RabbitMQ永遠不會丟失我們的隊列。怎么做呢?在聲明隊列的時候,指定durable參數為true。
boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);盡管上面的代碼沒有錯,但是它不會按所想的那樣將隊列持久化:因為之前我們已經將hello這個隊列設置了不持久化,RabbitMQ不允許重新定義已經存在的隊列,否則就會報錯。但是,我們有一個快速的解決辦法:聲明另外一個隊列就行了,只要不叫hello,比如task_queue:
boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);現在,我們已經確保隊列不會丟失了,那么如何將消息持久化呢:將MessageProperties的值設置為PERSISTENT_TEXT_PLAIN。
import com.rabbitmq.client.MessageProperties;channel.basicPublish("", "task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());將消息標記為持久化并不能完全保證消息不會丟失。盡管它告訴RabbitMQ將消息保存到磁盤中,但是在RabbitMQ接收到消息和保存消息之間會與一個很短的時間窗。同時,RabbitMQ不會為每個消息做fsync(2)處理,消息可能僅僅保存到緩存中而不會真正地寫入到磁盤中。這種持久化保證盡管不夠健壯,但已經遠遠足夠我們的簡單任務隊列。如果你需要更強大的保證,可以使用[publisher confirms](https://www.rabbitmq.com/confirms.html)。
公平分發:
你可能已經發現,循環消息分發并不是我們想要的。比如,有兩個工作者,當奇數消息(如上文中的"1..."、"3..."、"5..."、"7...")很耗時,而偶數消息(如上文中的"2."、"4."、"6."、"8.")很簡單的時候,其中一個工作者就會一直很忙而另一個工作者就會閑。然而RabbitMQ對這些一概不知,它只是在輪流平均地發消息。
這種情況的發生是因為,RabbitMQ 只是當消息進入隊列時就分發出去,而沒有查看每個工作者未返回確認信息的數量。
為了改變這種情況,我們可以使用basicQos方法,并將參數prefetchCount設為1。這樣做,工作者就會告訴RabbitMQ:不要同時發送多個消息給我,每次只發1個,當我處理完這個消息并給你確認信息后,你再發給我下一個消息。這時候,RabbitMQ就不會輪流平均發送消息了,而是尋找閑著的工作者。
int prefetchCount = 1; channel.basicQos(prefetchCount);注意,如果所有的工作者都很忙,你的隊列可能會裝滿,你必須留意這種情況:或者添加更多的工作者,或者采取其他策略。
完整代碼:
NewTask.java:
import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties;public class NewTask {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] argv)throws java.io.IOException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);String message = getMessage(argv);channel.basicPublish( "", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();} //... }Worker.java:
import com.rabbitmq.client.*;import java.io.IOException;public class Worker {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");final Connection connection = factory.newConnection();final Channel channel = connection.createChannel();channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");channel.basicQos(1);final Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");channel.basicAck(envelope.getDeliveryTag(), false);}}};boolean autoAck = false;channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);}private static void doWork(String task) {for (char ch : task.toCharArray()) {if (ch == '.') {try {Thread.sleep(1000);} catch (InterruptedException _ignored) {Thread.currentThread().interrupt();}}}} }?
說明
①與原文略有出入,如有疑問,請參考原文。
②原文是直接用javacp命令運行代碼,用IDE更方便。
博客轉自:https://www.jianshu.com/p/37c23ed0a5f1
?
總結
以上是生活随笔為你收集整理的RabbitMQ(二):Work Queues、循环分发、消息确认、持久化、公平分发的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RabbitMQ(一):Hello Wo
- 下一篇: RabbitMQ(三):Exchange