rabbitmq手动确认ack
生活随笔
收集整理的這篇文章主要介紹了
rabbitmq手动确认ack
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
【README】
參考 https://blog.csdn.net/u012943767/article/details/79300673 ;
?
【0】聲明交換機,隊列 與綁定
/*** 交換機,隊列聲明與綁定 */ public class AckDeclarer {/** 確認交換機 */public static final String ACK_EXCHANGE2 = "ACK_EXCHNAGE2";/** 確認隊列 */public static final String ACK_QUEUE2 = "ACK_QUEUE2";/** 路由鍵 */public static final String ACK_ROUTE2 = "ACK_ROUTE2";// 四種Exchange 模式 // direct :需要生產(chǎn)者和消費者綁定相同的Exchange和routing key。 // fanout:廣播模式需要生產(chǎn)者消費者綁定相同的Exchange。 // topic:支持模糊匹配的廣播模式以點分隔,*表示一個單詞,#表示任意數(shù)量(零個或多個)單詞。 // header:根據(jù)生產(chǎn)者和消費者的header中信息進行匹配性能較差 ,x-match [all 匹配所有/any 任意一個]。public static void main(String[] args) throws Exception {/* 獲取連接*/Connection conn = RBConnectionUtil.getConn();// 創(chuàng)建信道 Channel channel = conn.createChannel(); /* 聲明交換機 */channel.exchangeDeclare(ACK_EXCHANGE2, BuiltinExchangeType.FANOUT);/* 聲明隊列 */channel.queueDeclare(ACK_QUEUE2, true, false, false, null); System.out.println(String.format("聲明交換機【%s】,隊列【%s】成功", ACK_EXCHANGE2, ACK_QUEUE2));/* 把隊列綁定到交換機 */channel.queueBind(ACK_QUEUE2, ACK_EXCHANGE2, ACK_ROUTE2);/* 關(guān)閉信道和連接 */channel.close();conn.close(); } }【1】生產(chǎn)者
/*** 消息確認生產(chǎn)者*/ public class AckProducer {public static void main(String[] args) throws Exception {/* 獲取連接*/Connection conn = RBConnectionUtil.getConn();// 創(chuàng)建信道 Channel channel = conn.createChannel();String[] messages = new String[]{"first.-04130828", "second..-04130828", "third...-04130828", "fourth....-04130828", "fiveth.....-04130828", "6th.....-04130828", "7th.....-04130828", "8th.....-04130828", "9th.....-04130828", "10th.....-04130828"};for (String msg : messages) {channel.basicPublish(AckDeclarer.ACK_EXCHANGE2, AckDeclarer.ACK_ROUTE2, null, msg.getBytes());System.out.println(msg + " is sent"); } System.out.println("消息發(fā)送完成"); channel.close(); conn.close();} }【2】自動確認消費者
/*** 自動確認消費者*/ public class AutoAckConsumer {public static void main(String[] args) throws Exception {/* 獲取連接*/Connection conn = RBConnectionUtil.getConn();// 創(chuàng)建信道 Channel channel = conn.createChannel();System.out.println("等待消費1");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "UTF-8");System.out.println("接收到的消息=" + msg); try {doWork(msg);} catch (InterruptedException e) {e.printStackTrace();}}};channel.basicConsume(AckDeclarer.ACK_QUEUE2, true, consumer);// 為true自動確認 }private static void doWork(String task) throws InterruptedException {for (char ch : task.toCharArray()) {if (ch == '.') Thread.sleep(1000);}} }【3】手動確認消費者
/*** 手動確認消費者*/ public class ManualAckConsumer {public static void main(String[] args) throws Exception {/* 獲取連接*/Connection conn = RBConnectionUtil.getConn();// 創(chuàng)建信道 Channel channel = conn.createChannel();System.out.println("手動確認消費者等待消費1");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "UTF-8");System.out.println("接收到的消息=" + msg); try {doWork(msg);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("消費done,手動確認ack,msg=" + msg);channel.basicAck(envelope.getDeliveryTag(), false); // 手動確認}}};// 手動確認,向rabbitmq 服務(wù)器手動發(fā)送ack成功消費標(biāo)識channel.basicConsume(AckDeclarer.ACK_QUEUE2, false, consumer);// 為false手動確認 }private static void doWork(String task) throws InterruptedException {for (char ch : task.toCharArray()) {if (ch == '.') Thread.sleep(1000);}} }手動確認消費者日志?
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. 手動確認消費者等待消費1 接收到的消息=first.-04130828 消費done,手動確認ack,msg=first.-04130828 接收到的消息=second..-04130828 消費done,手動確認ack,msg=second..-04130828 接收到的消息=third...-04130828 消費done,手動確認ack,msg=third...-04130828 接收到的消息=fourth....-04130828 消費done,手動確認ack,msg=fourth....-04130828 接收到的消息=fiveth.....-04130828 消費done,手動確認ack,msg=fiveth.....-04130828 接收到的消息=6th.....-04130828 消費done,手動確認ack,msg=6th.....-04130828 接收到的消息=7th.....-04130828 消費done,手動確認ack,msg=7th.....-04130828 接收到的消息=8th.....-04130828 消費done,手動確認ack,msg=8th.....-04130828 接收到的消息=9th.....-04130828 消費done,手動確認ack,msg=9th.....-04130828 接收到的消息=10th.....-04130828 消費done,手動確認ack,msg=10th.....-04130828?
總結(jié)
以上是生活随笔為你收集整理的rabbitmq手动确认ack的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 百度收录的怎么删除(百度收录如何删除)
- 下一篇: 怎么在图片中加入网址(怎么在图片中加入网