rabbitmq-死信队列
【README】死信隊列是什么?
1)死信隊列:當消息在一個隊列中變成死信之后,它能被重新發(fā)送到另一個交換機中,這個交換機被稱為DLX(Dead-Letter-Exchange-DLX , 死信交換機);綁定DLX的隊列被稱之為死信隊列;
2)消息變成死信的原因有三個;
消息被拒絕;
消息過期;
隊列達到最大長度;
?
【1】如何配置死信隊列
1)首先死信隊列是相對于正常業(yè)務(wù)隊列而言的;
如?? 白條還款隊列為業(yè)務(wù)隊列, 當該隊列有消息過期或被消費者拒絕或隊列達到最大長度,則這些消息會被轉(zhuǎn)發(fā)到私信隊列; 又死信隊列通常情況下通過 路由鍵進行轉(zhuǎn)發(fā),所以需要在死信隊列前加一個交換機(俗稱死信交換機);
2)要想使用死信隊列,必須聲明死信交換機,可以在聲明正常業(yè)務(wù)隊列的時候設(shè)置? 正常業(yè)務(wù)隊列對應(yīng)的死信交換機(需要注意這個映射關(guān)系),如下:
/* 2-聲明正常隊列時,配置其對應(yīng)死信隊列 */ Map<String, Object> agruments = new HashMap<String, Object>(); agruments.put("x-dead-letter-exchange", DLX_EXCHANGE); //這個agruments屬性,要設(shè)置到聲明隊列上 channel.queueDeclare(NORM_QUEUE, true, false, false, agruments);【2】死信隊列荔枝
【2.1】 聲明隊列,交換機
step1)聲明正常業(yè)務(wù)隊列,正常業(yè)務(wù)交換機,定義正常業(yè)務(wù)路由鍵,以及三者間的綁定; 同時,在聲明正常業(yè)務(wù)隊列時,一定要設(shè)置其 死信交換機(DLX_EXCHANGE),作為屬性傳入聲明方法;
step2)聲明死信隊列,死信交換機(DLX_EXCHANGE), 定義死信隊列接收的路由鍵(#表示所有),以及三者間的綁定;
/*** 聲明正常隊列+交換機;聲明死信隊列+交換機 */ public class DlxDeclare {public static final String POSTFIX = "_tr1";/*** 正常隊列*/public static final String NORM_QUEUE = "norm_queue" + POSTFIX;/*** 正常交換機 */public static final String NORM_EXCHANGE = "norm_exchange" + POSTFIX;/*** 正常路由鍵*/public static final String NORM_ROUTE = "dlx.save" + POSTFIX;/*** 死信隊列*/public static final String DLX_QUEUE = "dlx_queue" + POSTFIX; /*** 死信交換機*/public static final String DLX_EXCHANGE = "dlx_exchange" + POSTFIX;/*** 死信隊列路由鍵-#-所有鍵 */public static final String DLX_ROUTE = "#"; public static void main(String[] args) throws Exception {/* 獲取連接*/Connection conn = RBConnectionUtil.getConn();// 創(chuàng)建信道 Channel channel = conn.createChannel();/*1-聲明正常交換機*/channel.exchangeDeclare(NORM_EXCHANGE, BuiltinExchangeType.DIRECT);/* 2-聲明正常隊列時,配置其對應(yīng)死信隊列 */ Map<String, Object> agruments = new HashMap<String, Object>();agruments.put("x-dead-letter-exchange", DLX_EXCHANGE); //這個agruments屬性,要設(shè)置到聲明隊列上channel.queueDeclare(NORM_QUEUE, true, false, false, agruments);/* 3-綁定正常交換機,正常隊列和路由鍵 */ channel.queueBind(NORM_QUEUE, NORM_EXCHANGE, NORM_ROUTE);/*4-聲明死信交換機,死信隊列,并綁定兩者*/ channel.exchangeDeclare(DLX_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, null);channel.queueDeclare(DLX_QUEUE, true, false, false, null);channel.queueBind(DLX_QUEUE, DLX_EXCHANGE, DLX_ROUTE);} }【2.2】生產(chǎn)者
1)我們把消息的 ttl 設(shè)置為10秒,即該消息10秒內(nèi)沒有被成功消費,則消息轉(zhuǎn)發(fā)到死信隊列;
/*** 死信消息生產(chǎn)者 */ public class DlxProducer {public static void main(String[] args) throws Exception {/* 獲取連接*/Connection conn = RBConnectionUtil.getConn();// 創(chuàng)建信道 Channel channel = conn.createChannel();AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("10000") // 設(shè)置消息的ttl .build();/* 發(fā)送消息 */ long temp = System.currentTimeMillis();for (int i = 1; i <= 20; i++) { String msg = "我是 msg_norm消息,序號=" + (temp+i) + "時間=" + MyDateUtil.getNow(); channel.basicPublish(DlxDeclare.NORM_EXCHANGE, DlxDeclare.NORM_ROUTE, properties, msg.getBytes("UTF-8")); System.out.println("生產(chǎn)者發(fā)送消息" + msg); Thread.sleep(10);} /* 關(guān)閉連接和信道 */ channel.close();conn.close(); } }【3】效果
1)生產(chǎn)者把消息發(fā)送到正常交換機 norm_exchange_tr1
2)10秒后,由于沒有消費,消息被轉(zhuǎn)發(fā)到死信隊列 dlx_queue_tr1
?
總結(jié)
以上是生活随笔為你收集整理的rabbitmq-死信队列的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: (思科anti ddos)
- 下一篇: 百度收录的怎么删除(百度收录如何删除)