RabbitMQ 消息队列六种模式
RabbitMQ 的第一個(gè)程序
RabbitMQ-生產(chǎn)者|消費(fèi)者
搭建環(huán)境
java client
生產(chǎn)者和消費(fèi)者都屬于客戶端, rabbitMQ的java客戶端如下
創(chuàng)建 maven 工程
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version> </dependency>AMQP協(xié)議的回顧
RabbitMQ支持的消息模型
第一種模型(直連)
在上圖的模型中,有以下概念:
- P:生產(chǎn)者,也就是要發(fā)送消息的程序
- C:消費(fèi)者:消息的接受者,會(huì)一直等待消息到來。
- queue:消息隊(duì)列,圖中紅色部分。類似一個(gè)郵箱,可以緩存消息;生產(chǎn)者向其中投遞消息,消費(fèi)者從其中取出消息。
開發(fā)生產(chǎn)者
/*** 生產(chǎn)者* <p>* 直連模式** @author mxz*/ @Component public class Provider {public static void main(String[] args) throws IOException, TimeoutException {// 獲取連接對象Connection connection = RabbitMQUtils.getConnection();// 獲取連接中通道Channel channel = connection.createChannel();// 通道綁定消息隊(duì)列// 參數(shù)1 隊(duì)列的名稱, 如果不存在則自動(dòng)創(chuàng)建// 參數(shù)2 用來定義隊(duì)列是否需要持久化, true 持久化隊(duì)列(mq關(guān)閉時(shí), 會(huì)存到磁盤中) false 不持久化(關(guān)閉即失)// 參數(shù)3 exclusive 是否獨(dú)占隊(duì)列 true 獨(dú)占隊(duì)列 false 不獨(dú)占// 參數(shù)4 autoDelete 是否在消費(fèi)后自動(dòng)刪除隊(duì)列 true 自動(dòng)刪除 false 不刪除// 參數(shù)5 額外的附加參數(shù)channel.queueDeclare("hello", false, false, false, null);// 發(fā)布消息// 參數(shù)1 交換機(jī)名稱// 參數(shù)2 隊(duì)列名稱// 參數(shù)3 傳遞消息額外設(shè)置// 參數(shù)4 消息的具體內(nèi)容channel.basicPublish("", "hello", null, "hello rabbitMQ".getBytes());RabbitMQUtils.closeConnectionAndChannel(channel, connection);} }開發(fā)消費(fèi)者
/*** 消費(fèi)者** @author mxz*/ @Component public class Customer {public static void main(String[] args) throws IOException, TimeoutException {// 獲取連接對象Connection connection = RabbitMQUtils.getConnection();// 創(chuàng)建通道Channel channel = connection.createChannel();// 通道綁定對象channel.queueDeclare("hello", false, false, false, null);// 消費(fèi)消息// 參數(shù)1 消息隊(duì)列的消息, 隊(duì)列名稱// 參數(shù)2 開啟消息的確認(rèn)機(jī)制// 參數(shù)3 消息時(shí)的回調(diào)接口channel.basicConsume("hello", true, new DefaultConsumer(channel) {// 最后一個(gè)參數(shù) 消息隊(duì)列中取出的消息@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("new String(body)" + new String(body));}});// channel.close(); // connection.close();}}工具類
/*** @author mxz*/ public class RabbitMQUtils {private static ConnectionFactory connectionFactory;// 重量級資源 類加載執(zhí)行一次(即可)static {// 創(chuàng)建連接 mq 的連接工廠connectionFactory = new ConnectionFactory();// 設(shè)置 rabbitmq 主機(jī)connectionFactory.setHost("127.0.0.1");// 設(shè)置端口號connectionFactory.setPort(5672);// 設(shè)置連接哪個(gè)虛擬主機(jī)connectionFactory.setVirtualHost("/codingce");// 設(shè)置訪問虛擬主機(jī)用戶名密碼connectionFactory.setUsername("codingce");connectionFactory.setPassword("123456");}/*** 定義提供連接對象的方法** @return*/public static Connection getConnection() {try {return connectionFactory.newConnection();} catch (Exception e) {e.printStackTrace();}return null;}/*** 關(guān)閉通道和關(guān)閉連接工具方法** @param connection* @param channel*/public static void closeConnectionAndChannel(Channel channel, Connection connection) {try {// 先關(guān) channelif (channel != null)channel.close();if (connection != null)connection.close();} catch (Exception e) {e.printStackTrace();}} }第二種模型(work quene)
Work queues,也被稱為(Task queues),任務(wù)模型。當(dāng)消息處理比較耗時(shí)的時(shí)候,可能生產(chǎn)消息的速度會(huì)遠(yuǎn)遠(yuǎn)大于消息的消費(fèi)速度。長此以往,消息就會(huì)堆積越來越多,無法及時(shí)處理。此時(shí)就可以使用work 模型:讓多個(gè)消費(fèi)者綁定到一個(gè)隊(duì)列,共同消費(fèi)隊(duì)列中的消息。隊(duì)列中的消息一旦消費(fèi),就會(huì)消失,因此任務(wù)是不會(huì)被重復(fù)執(zhí)行的。
角色:
- P:生產(chǎn)者:任務(wù)的發(fā)布者
- C1:消費(fèi)者-1,領(lǐng)取任務(wù)并且完成任務(wù),假設(shè)完成速度較慢
- C2:消費(fèi)者-2:領(lǐng)取任務(wù)并完成任務(wù),假設(shè)完成速度快
開發(fā)生產(chǎn)者
/*** 生產(chǎn)者* <p>* 任務(wù)模型 work quenue** @author mxz*/ @Component public class Provider {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();// 通過通道聲明隊(duì)列channel.queueDeclare("work", true, false, false, null);for (int i = 0; i < 10; i++) {// 生產(chǎn)消息channel.basicPublish("", "work", null, (" " + i + "work quenue").getBytes());}// 關(guān)閉資源RabbitMQUtils.closeConnectionAndChannel(channel, connection);} }開發(fā)消費(fèi)者-1
/*** 自動(dòng)確認(rèn)消費(fèi) autoAck true 12搭配測試* <p>* 消費(fèi)者 1** @author mxz*/ @Component public class CustomerOne {public static void main(String[] args) throws IOException, TimeoutException {// 獲取連接對象Connection connection = RabbitMQUtils.getConnection();// 創(chuàng)建通道Channel channel = connection.createChannel();// 通道綁定對象channel.queueDeclare("work", true, false, false, null);// 消費(fèi)消息// 參數(shù)1 消息隊(duì)列的消息, 隊(duì)列名稱// 參數(shù)2 開啟消息的確認(rèn)機(jī)制// 參數(shù)3 消息時(shí)的回調(diào)接口channel.basicConsume("work", true, new DefaultConsumer(channel) {// 最后一個(gè)參數(shù) 消息隊(duì)列中取出的消息// 默認(rèn)分配是平均的@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費(fèi)者-1" + new String(body));try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}});// channel.close(); // connection.close();}}開發(fā)消費(fèi)者-2
/*** 自動(dòng)確認(rèn)消費(fèi) autoAck true 12搭配測試* <p>* 消費(fèi)者 2** @author mxz*/ @Component public class CustomerTwo {public static void main(String[] args) throws IOException {// 獲取連接對象Connection connection = RabbitMQUtils.getConnection();// 創(chuàng)建通道Channel channel = connection.createChannel();// 通道綁定對象channel.queueDeclare("work", true, false, false, null);channel.basicConsume("work", true, new DefaultConsumer(channel) {// 最后一個(gè)參數(shù) 消息隊(duì)列中取出的消息@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費(fèi)者-1" + new String(body));}});// channel.close(); // connection.close();}}測試結(jié)果
總結(jié):默認(rèn)情況下,RabbitMQ將按順序?qū)⒚總€(gè)消息發(fā)送給下一個(gè)使用者。平均而言,每個(gè)消費(fèi)者都會(huì)收到相同數(shù)量的消息。這種分發(fā)消息的方式稱為循環(huán)。
消息自動(dòng)確認(rèn)機(jī)制
Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We’ll also lose all the messages that were dispatched to this particular worker but were not yet handled.
But we don’t want to lose any tasks. If a worker dies, we’d like the task to be delivered to another worker.
消費(fèi)者3
/*** 能者多勞 34 搭配測試* <p>* 消費(fèi)者 3** @author mxz*/ @Component public class CustomerThree {public static void main(String[] args) throws IOException, TimeoutException {// 獲取連接對象Connection connection = RabbitMQUtils.getConnection();// 創(chuàng)建通道Channel channel = connection.createChannel();// 每一次只能消費(fèi)一個(gè)消息channel.basicQos(1);// 通道綁定對象channel.queueDeclare("work", true, false, false, null);// 參數(shù)1 隊(duì)列名稱 參數(shù)2(autoAck) 消息自動(dòng)確認(rèn) true 消費(fèi)者自動(dòng)向 rabbitMQ 確認(rèn)消息消費(fèi) false 不會(huì)自動(dòng)確認(rèn)消息// 若出現(xiàn)消費(fèi)者宕機(jī)情況 消費(fèi)者三可以進(jìn)行消費(fèi)channel.basicConsume("work", false, new DefaultConsumer(channel) {// 最后一個(gè)參數(shù) 消息隊(duì)列中取出的消息// 默認(rèn)分配是平均的@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費(fèi)者-1" + new String(body));// 手動(dòng)確認(rèn) 參數(shù)1 確認(rèn)隊(duì)列中channel.basicAck(envelope.getDeliveryTag(), false);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}});// channel.close(); // connection.close();}}消費(fèi)者4
/*** 能者多勞 34 搭配測試* <p>* 消費(fèi)者 4** @author mxz*/ @Component public class CustomerFour {public static void main(String[] args) throws IOException {// 獲取連接對象Connection connection = RabbitMQUtils.getConnection();// 創(chuàng)建通道Channel channel = connection.createChannel();// 每一次只能消費(fèi)一個(gè)消息channel.basicQos(1);// 通道綁定對象channel.queueDeclare("work", true, false, false, null);channel.basicConsume("work", false, new DefaultConsumer(channel) {// 最后一個(gè)參數(shù) 消息隊(duì)列中取出的消息@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費(fèi)者-1" + new String(body));// 手動(dòng)確認(rèn) 參數(shù)1 手動(dòng)確認(rèn)channel.basicAck(envelope.getDeliveryTag(), false);}});// channel.close(); // connection.close();}}消費(fèi)者3
/*** 能者多勞 34 搭配測試* <p>* 消費(fèi)者 3** @author mxz*/ @Component public class CustomerThree {public static void main(String[] args) throws IOException, TimeoutException {// 獲取連接對象Connection connection = RabbitMQUtils.getConnection();// 創(chuàng)建通道Channel channel = connection.createChannel();// 每一次只能消費(fèi)一個(gè)消息channel.basicQos(1);// 通道綁定對象channel.queueDeclare("work", true, false, false, null);// 參數(shù)1 隊(duì)列名稱 參數(shù)2(autoAck) 消息自動(dòng)確認(rèn) true 消費(fèi)者自動(dòng)向 rabbitMQ 確認(rèn)消息消費(fèi) false 不會(huì)自動(dòng)確認(rèn)消息// 若出現(xiàn)消費(fèi)者宕機(jī)情況 消費(fèi)者三可以進(jìn)行消費(fèi)channel.basicConsume("work", false, new DefaultConsumer(channel) {// 最后一個(gè)參數(shù) 消息隊(duì)列中取出的消息// 默認(rèn)分配是平均的@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費(fèi)者-1" + new String(body));// 手動(dòng)確認(rèn) 參數(shù)1 確認(rèn)隊(duì)列中channel.basicAck(envelope.getDeliveryTag(), false);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}});// channel.close(); // connection.close();}}消費(fèi)者4
/*** 能者多勞 34 搭配測試* <p>* 消費(fèi)者 4** @author mxz*/ @Component public class CustomerFour {public static void main(String[] args) throws IOException {// 獲取連接對象Connection connection = RabbitMQUtils.getConnection();// 創(chuàng)建通道Channel channel = connection.createChannel();// 每一次只能消費(fèi)一個(gè)消息channel.basicQos(1);// 通道綁定對象channel.queueDeclare("work", true, false, false, null);channel.basicConsume("work", false, new DefaultConsumer(channel) {// 最后一個(gè)參數(shù) 消息隊(duì)列中取出的消息@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費(fèi)者-1" + new String(body));// 手動(dòng)確認(rèn) 參數(shù)1 手動(dòng)確認(rèn)channel.basicAck(envelope.getDeliveryTag(), false);}});// channel.close(); // connection.close();}}第三種模型(fanout)
fanout 扇出 也稱為廣播
在廣播模式下,消息發(fā)送流程是這樣的:
- 可以有多個(gè)消費(fèi)者
- 每個(gè)消費(fèi)者有自己的queue(隊(duì)列)
- 每個(gè)隊(duì)列都要綁定到Exchange(交換機(jī))
- 生產(chǎn)者發(fā)送的消息,只能發(fā)送到交換機(jī),交換機(jī)來決定要發(fā)給哪個(gè)隊(duì)列,生產(chǎn)者無法決定。
- 交換機(jī)把消息發(fā)送給綁定過的所有隊(duì)列
- 隊(duì)列的消費(fèi)者都能拿到消息。實(shí)現(xiàn)一條消息被多個(gè)消費(fèi)者消費(fèi)
開發(fā)開發(fā)生產(chǎn)者
/*** 生產(chǎn)者* <p>* 任務(wù)模型 fanout** @author mxz*/ @Component public class Provider {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();// 將通道聲明指定交換機(jī) 參數(shù)1 交換機(jī)名稱 參數(shù)2 代表交換機(jī)類型 fanout 廣播類型channel.exchangeDeclare("logs", "fanout");// 發(fā)送消息channel.basicPublish("logs", "", null, "fanout type message".getBytes());// 關(guān)閉資源RabbitMQUtils.closeConnectionAndChannel(channel, connection);} }開發(fā)消費(fèi)者
- 消費(fèi)者 1
- 消費(fèi)者 2
- 消費(fèi)者 3
測試結(jié)果
第四種模型(Routing)
Routing 之訂閱模型-Direct(直連)
在Fanout模式中,一條消息,會(huì)被所有訂閱的隊(duì)列都消費(fèi)。但是,在某些場景下,我們希望不同的消息被不同的隊(duì)列消費(fèi)。這時(shí)就要用到Direct類型的Exchange。
在Direct模型下:
- 隊(duì)列與交換機(jī)的綁定,不能是任意綁定了,而是要指定一個(gè)RoutingKey(路由key)
- 消息的發(fā)送方在 向 Exchange發(fā)送消息時(shí),也必須指定消息的 RoutingKey。
- Exchange不再把消息交給每一個(gè)綁定的隊(duì)列,而是根據(jù)消息的Routing Key進(jìn)行判斷,只有隊(duì)列的Routingkey與消息的 Routing key完全一致,才會(huì)接收到消息
流程:
圖解:
- P:生產(chǎn)者,向Exchange發(fā)送消息,發(fā)送消息時(shí),會(huì)指定一個(gè)routing key。
- X:Exchange(交換機(jī)),接收生產(chǎn)者的消息,然后把消息遞交給 與routing key完全匹配的隊(duì)列
- C1:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 error 的消息
- C2:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 info、error、warning 的消息
開發(fā)生產(chǎn)者
/*** @author mxz*/ public class Provider {public static void main(String[] args) throws IOException {Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();// 通過通道聲明交換機(jī) 參數(shù)1 交換機(jī)名稱 參數(shù)2 路由模式channel.exchangeDeclare("logs_direct", "direct");// 發(fā)送消息String routingKey = "error";channel.basicPublish("logs_direct", routingKey, null, ("這是 direct 模式發(fā)布基于 route_key [" + routingKey + "]").getBytes());// 關(guān)閉資源RabbitMQUtils.closeConnectionAndChannel(channel, connection);} }開發(fā)消費(fèi)者
- 消費(fèi)者1
- 消費(fèi)者2
Routing 之訂閱模型-Topic
Topic類型的Exchange與Direct相比,都是可以根據(jù)RoutingKey把消息路由到不同的隊(duì)列。只不過Topic類型Exchange可以讓隊(duì)列在綁定Routing key 的時(shí)候使用通配符!這種模型Routingkey 一般都是由一個(gè)或多個(gè)單詞組成,多個(gè)單詞之間以”.”分割,例如: item.insert
# 統(tǒng)配符* (star) can substitute for exactly one word. 匹配不多不少恰好1個(gè)詞# (hash) can substitute for zero or more words. 匹配一個(gè)或多個(gè)詞 # 如:audit.# 匹配audit.irs.corporate或者 audit.irs 等audit.* 只能匹配 audit.irs開發(fā)生產(chǎn)者
/*** 生產(chǎn)者* <p>** @author mxz*/ @Component public class Provider {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();// 聲明交換機(jī)以及交換機(jī)類型channel.exchangeDeclare("topics", "topic");// 路由keyString routeKey = "user.save";channel.basicPublish("topics", routeKey, null, ("這里是 topic 動(dòng)態(tài)路由模型, routeKey:[" + routeKey + "]").getBytes());// 關(guān)閉資源RabbitMQUtils.closeConnectionAndChannel(channel, connection);} }開發(fā)消費(fèi)者
- 消費(fèi)者
- 消費(fèi)者
文章已上傳gitee https://gitee.com/codingce/hexo-blog
項(xiàng)目地址: https://github.com/xzMhehe/codingce-java
文章已上傳gitee https://gitee.com/codingce/hexo-blog
項(xiàng)目地址github: https://github.com/xzMhehe/codingce-java
總結(jié)
以上是生活随笔為你收集整理的RabbitMQ 消息队列六种模式的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: idea 解决查看源码没有注释
- 下一篇: SpringBoot整合RabbitMQ