java死信队列_RabbitMQ 死信队列是什么鬼?
死信隊(duì)列
死信隊(duì)列:沒有被及時(shí)消費(fèi)的消息存放的隊(duì)列。
消息沒有被及時(shí)消費(fèi)的原因:
a.消息被拒絕(basic.reject/ basic.nack)并且不再重新投遞 requeue=false
b.TTL(time-to-live) 消息超時(shí)未消費(fèi)
c.達(dá)到最大隊(duì)列長度
實(shí)現(xiàn)死信隊(duì)列步驟
首先需要設(shè)置死信隊(duì)列的 exchange 和 queue,然后進(jìn)行綁定:
Exchange: dlx.exchange
Queue: dlx.queue
RoutingKey: # 代表接收所有路由 key
然后我們進(jìn)行正常聲明交換機(jī)、隊(duì)列、綁定,只不過我們需要在普通隊(duì)列加上一個(gè)參數(shù)即可: arguments.put("x-dead-letter-exchange",' dlx.exchange' )
這樣消息在過期、requeue失敗、 隊(duì)列在達(dá)到最大長度時(shí),消息就可以直接路由到死信隊(duì)列!
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class DlxProducer {
public static void main(String[] args) throws Exception {
//設(shè)置連接以及創(chuàng)建 channel 湖綠
String exchangeName = "test_dlx_exchange";
String routingKey = "item.update";
String msg = "this is dlx msg";
//我們?cè)O(shè)置消息過期時(shí)間,10秒后再消費(fèi) 讓消息進(jìn)入死信隊(duì)列
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.expiration("10000")
.build();
channel.basicPublish(exchangeName, routingKey, true, properties, msg.getBytes());
System.out.println("Send message : " + msg);
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class DlxConsumer {
public static void main(String[] args) throws Exception {
//創(chuàng)建連接、創(chuàng)建channel忽略 內(nèi)容可以在上面代碼中獲取
String exchangeName = "test_dlx_exchange";
String queueName = "test_dlx_queue";
String routingKey = "item.#";
//必須設(shè)置參數(shù)到 arguments 中
Map arguments = new HashMap();
arguments.put("x-dead-letter-exchange", "dlx.exchange");
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
//將 arguments 放入隊(duì)列的聲明中
channel.queueDeclare(queueName, true, false, false, arguments);
//一般不用代碼綁定,在管理界面手動(dòng)綁定
channel.queueBind(queueName, exchangeName, routingKey);
//聲明死信隊(duì)列
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
//路由鍵為 # 代表可以路由到所有消息
channel.queueBind("dlx.queue", "dlx.exchange", "#");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
//6. 設(shè)置 Channel 消費(fèi)者綁定隊(duì)列
channel.basicConsume(queueName, true, consumer);
}
}
總結(jié)
DLX也是一個(gè)正常的 Exchange,和一般的 Exchange 沒有區(qū)別,它能在任何的隊(duì)列上被指定,實(shí)際上就是設(shè)置某個(gè)隊(duì)列的屬性。
當(dāng)這個(gè)隊(duì)列中有死信時(shí),RabbitMQ 就會(huì)自動(dòng)的將這個(gè)消息重新發(fā)布到設(shè)置的 Exchange 上去,進(jìn)而被路由到另一個(gè)隊(duì)列。可以監(jiān)聽這個(gè)隊(duì)列中消息做相應(yīng)的處理。
近期熱文推薦:
覺得不錯(cuò),別忘了隨手點(diǎn)贊+轉(zhuǎn)發(fā)哦!
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)總結(jié)
以上是生活随笔為你收集整理的java死信队列_RabbitMQ 死信队列是什么鬼?的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JAVA的思维逻辑_[Java教程]计算
- 下一篇: java和opencv配置_Java——