RabbitMq队列 queue
目錄
?
RabbitMq隊列
消息確認機制
負載均衡
生產者代碼
消費者1
消費者2
RabbitMq隊列
在上篇文章中講了mq的隊列,這篇用代碼實現。在例子中存在一個生產者,和兩個消費者。生產者將生產的消息傳遞給隊列(queue),由消費者一、消費者二區消費。
?
消息確認機制
在處理消息的過程中,消費者由于服務器、網絡、網卡等原因出現故障不能接受消息,那可能這條正在處理的消息或者任務就沒有完成,就會失去這個消息和任務。 rabbitmq為了確保消息或者任務不會丟失,RabbitMQ提供了消息確認機制ACK。
ACK是消費者從RabbitMQ收到消息并處理完成后,反饋給RabbitMQ,RabbitMQ收到反饋后才將此消息從隊列中刪除。但是如果消費者由于網絡不穩定、服務器異常等原因在處理消息時掛掉,那么他就不會有ACK確認反饋,RabbitMQ會認為這個消息沒有正常消費,會將此消息重新放入隊列中。如果有其他消費者同時在線,RabbitMQ會立即將這個消息推送給這個在線的消費者。這種機制保證了在消費者服務器故障的時候,能不丟失任何消息和任務。?
消息的ACK確認機制默認是打開的。在上面的代碼中,我們顯示返回autoAck=true 這個標簽。
負載均衡
在正常情況下,隊列是將消息隨機分配給每一個消費者,這時候就有可能出現分配不均的問題。這時候mq不會負責調度消息,不會根據確認機制來分析哪一個消費者確認慢。這時候為了解決這個問題可以在代碼中設置 prefetchcount = 1。這個設置告訴RabbitMQ,不要一次將多個消息發送給一個消費者。這樣做的好處是只有當消費者處理完成當前消息并反饋后,才會收到另外一條消息或任務。這樣就避免了負載不均衡的事情了
注意:如果服務器所有消費者負載都很高,你的隊列很可能會被塞滿。這時我們就要考慮增加更多的消費者或者其他方案進行解決
channel.queueDeclare(QUEUE_NAME, true, false, false, null);//確定獲取數量channel.basicQos(1);生產者代碼
package com.ll.mq.hellomq.queue;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties;/*** * @author ll 生產者**/ public class Producer {public final static String QUENE_NAME = "hello";// 定義隊列名稱public static void main(String[] args) {try {// 創建連接工廠ConnectionFactory factory = new ConnectionFactory();// RabbitMQ地址factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("kysc");factory.setPassword("123456");// 創建一個連接Connection connection = factory.newConnection();// 創建一個頻道Channel channel = connection.createChannel();//設置為持久化channel.queueDeclare(QUENE_NAME, true, false, false, null);// 發送消息到隊列中for(int i = 0 ; i < 6; i++){String message = "Hello mq! " + i;channel.basicPublish("", QUENE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());System.out.println(" [Producer] Sent '" + message + "'");}// 關閉頻道和連接channel.close();connection.close();} catch (Exception e) {e.printStackTrace();} }}消費者1
package com.ll.mq.hellomq.queue;import java.io.IOException;import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope;/*** * @author ll ConsumerOne **/ public class ConsumerOne {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {// 創建連接工廠ConnectionFactory factory = new ConnectionFactory(); // 設置RabbitMQ地址factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("kysc");factory.setPassword("123456"); // 創建一個連接Connection connection = factory.newConnection(); // 創建一個頻道final Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, true, false, false, null);// DefaultConsumer類實現了Consumer接口,通過傳入一個頻道,告訴服務器我們需要那個頻道的消息,如果頻道中有消息,就會執行回調函數handleDeliveryDefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("ConsumerOne [x] Received '" + message + "'");try {work(message);} finally {System.out.println("ConsumerOne [x] Done");// 消息處理完成確認channel.basicAck(envelope.getDeliveryTag(), false);}}}; // 自動回復隊列應答channel.basicConsume(QUEUE_NAME, false, consumer);}//睡眠public static void work(String task) {try {System.out.println("task++++========"+task);Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();}} }運行結果:
ConsumerOne [x] Received 'Hello mq! 1'
task++++========Hello mq! 1
ConsumerOne [x] Done
ConsumerOne [x] Received 'Hello mq! 4'
task++++========Hello mq! 4
ConsumerOne [x] Done
消費者2
package com.ll.mq.hellomq.queue;import java.io.IOException;import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; /*** * @author ll ConsumerTwo **/ public class ConsumerTwo {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {// 創建連接工廠ConnectionFactory factory = new ConnectionFactory(); // 設置RabbitMQ地址factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("kysc");factory.setPassword("123456"); // 創建一個連接Connection connection = factory.newConnection(); // 創建一個頻道final Channel channel = connection.createChannel(); // 聲明要關注的隊列 -- 在RabbitMQ中,隊列聲明是冪等性的(一個冪等操作的特點是其任意多次執行所產生的影響均與一次執行的影響相同),也就是說,如果不存在,就創建,如果存在,不會對已經存在的隊列產生任何影響。channel.queueDeclare(QUEUE_NAME, true, false, false, null);// DefaultConsumer類實現了Consumer接口,通過傳入一個頻道,告訴服務器我們需要那個頻道的消息,如果頻道中有消息,就會執行回調函數handleDeliveryDefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("ConsumerTwo [x] Received '" + message + "'");try {work(message);} finally {System.out.println("ConsumerTwo [x] Done");// 消息處理完成確認channel.basicAck(envelope.getDeliveryTag(), false);}}}; // 自動回復隊列應答channel.basicConsume(QUEUE_NAME, false, consumer);}//睡眠public static void work(String task) {try {System.out.println("task++++========"+task);Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();}} }
運行結果
ConsumerTwo [x] Received 'Hello mq! 2'
task++++========Hello mq! 2
ConsumerTwo [x] Done
ConsumerTwo [x] Received 'Hello mq! 5'
task++++========Hello mq! 5
ConsumerTwo [x] Done
? ? ?
參考?https://www.rabbitmq.com/api-guide.html
下一篇 發布訂閱?https://blog.csdn.net/lilongwangyamin/article/details/105112696
總結
以上是生活随笔為你收集整理的RabbitMq队列 queue的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【云服务】浅析XaaS
- 下一篇: 移动语义-右值引用-完美转发-万字长文让