目錄
- 1、工作隊列的概念
- 2、輪循分發(round-robin)
- 3、公平分發(fair dispatch)
- 4、消息應答機制
- 4.1 為什么需要消息應答機制
- 4.2 消息應答機制的作用
- 5、消息持久化
1、工作隊列的概念
簡單隊列不足 : 不支持多個消費者;
工作隊列即一個生產者可以對應多個消費者同時消費;
相比簡單隊列支持多消費者; 因為實際工作中,生產者服務一般都是很簡單的業務邏輯處理之后就發送到隊列,消費者接收到隊列的消息之后,進行復雜的業務邏輯處理,所以一般都是多個消費者進行處理.如是是一個消費者進行處理,那么隊列會積壓很多消息.
2、輪循分發(round-robin)
在默認情況下, RabbitMQ將逐個發送消息到在序列中的下一個消費者(而不考慮每個任務處理的時長等等,且是提前一次性分配,并非一個一個的分配) . 平均每個消費者獲取相同數量的消息. 這種分發消息機制稱為 輪詢分發
當消息進入隊列 ,RabbitMQ就會分發消息 .它不看消費者的應答的數目 ,也不關心消費者處理消息的能力,只是盲目的將第n條消息發給第n個消費者
/*** @author zhaod* @description* @date 2018/9/27 11:11*/
public class Producer {private static final Logger log = LoggerFactory.getLogger(Producer.class);private static final String QUEUE_NAME = "my-work-queue";public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {// 獲取連接Connection connection = MqConnectionUtil.getConnection();// 創建信道Channel channel = connection.createChannel();// 申明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);for (int i = 1; i < 11; i++) {String msg = "I am " + i + " old";channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());System.out.println("P---->" + msg);Thread.sleep(500);}channel.close();connection.close();}}
/*** @author zhaodi* @description 工作隊列* @date 2018/9/27 11:30*/
public class Consumer01 {private static final Logger log = LoggerFactory.getLogger(Consumer01.class);private static final String QUEUE_NAME = "my-work-queue";public static void main(String[] args) throws IOException {// 連接Connection connection = MqConnectionUtil.getConnection();// 信道Channel channel = connection.createChannel();// 隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消費者Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = "C-->{1}接收:" +new String(body );try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(msg);}};// 監聽channel.basicConsume(QUEUE_NAME,true,consumer);}
}
/*** @author zhaodi* @description 工作隊列* @date 2018/9/27 11:30*/
public class Consumer02 {private static final Logger log = LoggerFactory.getLogger(Consumer02.class);private static final String QUEUE_NAME = "my-work-queue";public static void main(String[] args) throws IOException {// 連接Connection connection = MqConnectionUtil.getConnection();// 信道Channel channel = connection.createChannel();// 隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消費者Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = "C-->{2}接收:" +new String(body);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(msg);}};// 監聽channel.basicConsume(QUEUE_NAME,true,consumer);}
}
3、公平分發(fair dispatch)
根據消費者處理性能,性能好的消費的數據量多,性能差的消費的數據量少 .這種分發消息機制稱為 公平分發
如何實現公平分發
/*** @author zhaod* @description 公平分發* @date 2018/9/27 11:11*/
public class Producer03 {private static final Logger log = LoggerFactory.getLogger(Producer03.class);private static final String QUEUE_NAME = "my-work-queue";public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {// 獲取連接Connection connection = MqConnectionUtil.getConnection();// 創建信道Channel channel = connection.createChannel();// 申明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 每個消費者在發送確認消息之前,消息隊列不發送下一個消息給該消費者,保證每次只處理一條消息int prefetch = 1;channel.basicQos(prefetch);for (int i = 1; i < 20; i++) {String msg = "I am " + i + " old";channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());System.out.println("P---->" + msg);Thread.sleep(500);}channel.close();connection.close();}
}
/*** @author zhaodi* @description 工作隊列公平分發* @date 2018/9/27 11:30*/
public class Consumer03 {private static final Logger log = LoggerFactory.getLogger(Consumer03.class);private static final String QUEUE_NAME = "my-work-queue";public static void main(String[] args) throws IOException {// 連接Connection connection = MqConnectionUtil.getConnection();// 信道Channel channel = connection.createChannel();// 隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//int prefetch = 1;channel.basicQos(prefetch);// 消費者Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = "C-->{3}接收:" +new String(body );try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(msg);// 處理完消息之后,發送回執,告訴隊列,你給我的消息我處理完了,你可以發送下一條消息給我了// envelope.getDeliveryTag(),標出是哪條消息channel.basicAck(envelope.getDeliveryTag(),false);}};// 自動應答關閉boolean autoAck = false;// 監聽channel.basicConsume(QUEUE_NAME,autoAck,consumer);}
}
代碼同上面
4、消息應答機制
4.1 為什么需要消息應答機制
完成一個任務需要花費幾秒鐘,但是如果某個消費者開始執行某個任務花費了很長的時間并且在執行到某個部分的時候崩潰了怎么辦。
在我們目前的代碼中,在向消費者推送了某一條消息后,RabiitMQ會立即刪除這條消息的。
如果我們kill掉某個worker的話,那么我們將會丟失該worker正在處理的消息,我們也會丟失掉所有被發送到這個消費者且未被處理完成的消息。
4.2 消息應答機制的作用
為了保證消息永遠不會被丟失,RabbitMQ采用消息應答機制。
當消費者接收到消息并完成任務后會往RabbitMQ服務器發送一條確認的命令,然后RabbitMQ才會將消息刪除。
如果某個消費者在還有發送確認信息就掛了,RabbitMQ將會視為服務沒有執行完成,然后把執行消息的服務再發給另外一個消費者。這種方式下,即時某個worker掛了,也不會使得消息丟失。
這里不是用超時來判斷的,只有在某個消費者連接斷開時,RabbitMQ才會把重新發送該消費者沒有返回確認的消息到其它消費者那。即時處理某條任務花費了很長的時間,在這里也是沒有問題的。
消息應答機制默認是開啟的,也就是說當消費者接收到消息的時候,不管是否開始處理接收到的消息,它已經向RabbitMQ發送確認消息,這時候RabbitMQ服務器就會刪除該條消息。
// 自動應答關閉
boolean autoAck = false;
// 監聽
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
很多人都會忘記調用basicAck方法,雖然這是一個很簡單的錯誤,但往往卻是致命。消費者退出后消息將會被重發,但是由于一些未能被確認消息不能被釋放,RabbitMQ將會消耗掉越來越多的內存
channel.basicAck(envelope.getDeliveryTag(),false);
5、消息持久化
如果RabbitMQ的服務器宕機那么怎么保證消息不丟失呢?
當MQ重啟,那么之前的隊列消息是會丟失的;
解決:將隊列和消息都持久化存儲
注意點:隊列持久化的時候,生產者和消費者都要申明
// 申明隊列,并且指明該隊列是持久化的
Boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
...// 發送消息,并且持久化消息
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
消息的持久化不能百分百的保證消息不會丟失,雖然RabbitMQ會把消息寫到磁盤上,但是從RabbitMQ接收到消息到寫到磁盤上,這個短時間的過程中發生的RabbitMQ重啟依然會使得為寫入到磁盤的消息被丟失;
事實上是這樣的,RabbitMQ接收到消息后,首先會把該消息寫到內存緩沖區中,并不是直接把單條消息實時寫到磁盤上的。消息的持久化不是健壯的,但是對于簡單的任務隊列是夠用了。如果你需要一套很健壯的持久化方案,那么你可以使用publisher confirms
轉載于:https://www.cnblogs.com/zhaod/p/11389487.html
總結
以上是生活随笔為你收集整理的2 工作队列的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。