rabbitmq系列(三)消息幂等性处理
一、springboot整合rabbitmq
小說網(wǎng) m.198200.com我們啟動生產(chǎn)者,然后請求send接口,然后打開rabbitmq控制臺發(fā)現(xiàn)多了一個名為”byte-zb“的交換機和隊列,并且隊列中出現(xiàn)了一個未消費的消息,然后啟動消費者,我們會在控制臺上發(fā)現(xiàn)打印了一條消息,同時rabbitmq控制臺中”byte-zb“的隊列中消息沒有了。
二、自動補償機制
如果消費者消息消費不成功的話,會出現(xiàn)什么情況呢?我們修改一下消費者代碼,然后看看。
@Component public class Consumer {public static final String QUEUE_NAME = "byte-zb";@RabbitListener(queues = QUEUE_NAME)public void receiveMessage(String message) throws Exception {System.out.println("接收到的消息為"+message);int i = 1 / 0;} }我們會看到消費者工程控制臺一直在刷新報錯,當消費者配出異常,也就是說當消息消費不成功的話,該消息會存放在rabbitmq的服務端,一直進行重試,直到不拋出異常為止。
如果一直拋異常,我們的服務很容易掛掉,那有沒有辦法控制重試幾次不成功就不再重試了呢?答案是有的。我們在消費者application.yml中增加一段配置。
spring:rabbitmq:# 連接地址host: 127.0.0.1# 端口port: 5672# 登錄賬號username: guest# 登錄密碼password: guest# 虛擬主機virtual-host: /listener:simple:retry:enabled: true # 開啟消費者進行重試max-attempts: 5 # 最大重試次數(shù)initial-interval: 3000 # 重試時間間隔上面配置的意思是消費異常后,重試五次,每次隔3s。繼續(xù)啟動消費者看看效果,我們發(fā)現(xiàn)重試五次以后,就不再重試了。
三、結(jié)合實際案例來使用消息補償機制
像上面那種情況出現(xiàn)的異常其實不管怎么重試都不會成功,實際上用到消息補償?shù)木褪钦{(diào)用第三方接口的這種。
案例:生者往隊列中扔一條消息,包含郵箱和發(fā)送內(nèi)容。消費者拿到消息后將調(diào)用郵件接口發(fā)送郵件。有時候可能郵件接口由于網(wǎng)絡等原因不通,這時候就需要去重試了。
在調(diào)用接口的工具類中,如果出現(xiàn)異常我們直接返回null,工具類具體代碼就不貼了,如果返回null之后怎么處理呢?我們只需要拋出異常,rabbitListener捕獲到異常后就會自動重試。
我們改造一下消費者代碼:
@Component public class Consumer {public static final String QUEUE_NAME = "byte-zb";@RabbitListener(queues = QUEUE_NAME)public void receiveMessage(String message) throws Exception {System.out.println("接收到的消息為"+message);JSONObject jsonObject = JSONObject.parseObject(message);String email = jsonObject.getString("email");String content = jsonObject.getString("timestamp");String httpUrl = "http://127.0.0.1:8080/email?email"+email+"&content="+content;// 如果發(fā)生異常則返回nullString body = HttpUtils.httpGet(httpUrl, "utf-8");//if(body == null){throw new Exception();}} }當然我們可以自定義異常拋出。具體怎么試驗呢,第一步啟動生產(chǎn)者和消費者,這時候我們發(fā)現(xiàn)消費者在重試,第二步我們啟動郵件服務,這時候我們會發(fā)現(xiàn)郵件發(fā)送成功了,消費者不再重試了。
四、解決消息冪等性問題
一些剛接觸java的同學可能對冪等性不太清楚。冪等性就是重復消費造成結(jié)果不一致。為了保證冪等性,因此消費者消費消息只能消費一次消息。我么可以是用全局的消息id來控制冪等性。當消息被消費了之后我們可以選擇緩存保存這個消息id,然后當再次消費的時候,我們可以查詢緩存,如果存在這個消息id,我們就不錯處理直接return即可。先改造生產(chǎn)者代碼,在消息中添加消息id:
@RequestMapping("/send")public void sendMessage(){JSONObject jsonObject = new JSONObject();jsonObject.put("email","11111111111");jsonObject.put("timestamp",System.currentTimeMillis());String json = jsonObject.toJSONString();System.out.println(json);Message message = MessageBuilder.withBody(json.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("UTF-8").setMessageId(UUID.randomUUID()+"").build();amqpTemplate.convertAndSend(EXCHANGE_NAME,QUEUE_NAME,message);}消費者代碼改造:
@Component public class Consumer {public static final String QUEUE_NAME = "byte-zb";@RabbitListener(queues = QUEUE_NAME)public void receiveMessage(Message message) throws Exception {Jedis jedis = new Jedis("localhost", 6379);String messageId = message.getMessageProperties().getMessageId();String msg = new String(message.getBody(),"UTF-8");System.out.println("接收導的消息為:"+msg+"==消息id為:"+messageId);String messageIdRedis = jedis.get("messageId");if(messageId == messageIdRedis){return;}JSONObject jsonObject = JSONObject.parseObject(msg);String email = jsonObject.getString("email");String content = jsonObject.getString("timestamp");String httpUrl = "http://127.0.0.1:8080/email?email"+email+"&content="+content;// 如果發(fā)生異常則返回nullString body = HttpUtils.httpGet(httpUrl, "utf-8");//if(body == null){throw new Exception();}jedis.set("messageId",messageId);} }我們在消費者端使用redis存儲消息id,只做演示,具體項目請根據(jù)實際情況選擇相應的工具進行存儲。
如果文章對您有幫助,請記得點贊關注喲~
歡迎大家關注我的公眾號:字節(jié)傳說,每日推送技術文章供大家學習參考。
總結(jié)
以上是生活随笔為你收集整理的rabbitmq系列(三)消息幂等性处理的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java基础-简聊类与对象
- 下一篇: babe的l使用步骤记录