rabbitmq 查询版本_基于rabbitmq解决分布式事务
分布式事務(wù)要解決的問題是保證二個(gè)數(shù)據(jù)庫(kù)數(shù)據(jù)的一致性,本地事務(wù)ACID屬于剛性事務(wù),基于CAP理論,分布式事務(wù)的核心要點(diǎn)柔性事務(wù),最終一致性。
基于rabbitmq解決分布式事務(wù)要點(diǎn)如下
- 生產(chǎn)者采用發(fā)送方確認(rèn)機(jī)制加上mandatory參數(shù)或者備份交換機(jī),保證消息被正確地到達(dá)隊(duì)列中。
- 隊(duì)列、交換機(jī)、消息都需要持久化(可以考慮鏡像隊(duì)列機(jī)制,如果業(yè)務(wù)不是那么重要,比如短信郵件通知)。
- 消費(fèi)端采用手動(dòng)應(yīng)答的方式,確認(rèn)消息已經(jīng)被正確消費(fèi)。
rabbitmq配置
spring:application:name: rabbitmq-servicerabbitmq:addresses: 192.168.137.128:5672,192.168.137.129:5672username: rootpassword: rootvirtual-host: transactionpublisher-confirms: true #開啟發(fā)送端確認(rèn)機(jī)制publisher-returns: true # 開啟returnstemplate:mandatory: true # 交換機(jī)沒有匹配的隊(duì)列時(shí),會(huì)將消息返回給生產(chǎn)者,避免消息丟失listener:simple:acknowledge-mode: manualretry:enabled: truemax-attempts: 5initial-interval: 3000server:port: 8400eureka:client:serviceUrl:defaultZone: http://localhost:8100/eureka/instance:prefer-ip-address: trueinstance-id: ${spring.cloud.client.ip-address}:${server.port}mq:common:exchange: common.exchangequeue: common.queueroutingkey: common.routing
代碼案例是下單減庫(kù)存
生產(chǎn)者代碼
package com.liwen.mqservice.producer;import com.liwen.entity.Order;
import com.liwen.feign.IOrderServiceFeign;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class MQOrderProducer implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {@Value("${mq.common.exchange}")private String exchangeName;@Value("${mq.common.routingkey}")private String routingKey;@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate IOrderServiceFeign orderServiceFeign;public void send(JSONObject json){rabbitTemplate.setConfirmCallback(this);//發(fā)送到確認(rèn)機(jī)制rabbitTemplate.setReturnCallback(this);//消息Return回調(diào)Order order= (Order) JSONObject.toBean(json.getJSONObject("order"), Order.class);orderServiceFeign.saveOrder(order);String orderId = order.getOrderId();String goodId = order.getGoodId();json.put("orderId", orderId);json.put("goodId", goodId);CorrelationData correlationData = new CorrelationData(orderId);rabbitTemplate.convertAndSend(exchangeName,routingKey, json.toString(), new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {MessageProperties messageProperties = message.getMessageProperties();messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);//設(shè)置消息持久化return message;}}, correlationData);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(!ack){/** 處理消息沒有到達(dá)交換機(jī),數(shù)據(jù)丟失的情況* 根據(jù)訂單號(hào)查詢到訂單數(shù)據(jù),并將數(shù)據(jù)保存到異常消息表中,定時(shí)補(bǔ)發(fā),并報(bào)警人工處理* */String orderId = correlationData.getId();}else{//查詢訂單號(hào)是否在異常消息表,在的話要?jiǎng)h除log.info(">>>下單消息發(fā)送成功{}<<<",correlationData);}}@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {//消息到達(dá)交換機(jī),沒有路由到隊(duì)列,根據(jù)訂單號(hào)查詢到訂單數(shù)據(jù),并將數(shù)據(jù)保存到異常消息表中,定時(shí)補(bǔ)發(fā),并報(bào)警人工處理/** 1 交換機(jī)沒有綁定隊(duì)列* 2 交換機(jī)根據(jù)路由鍵沒有匹配到隊(duì)列* 3 隊(duì)列消息已滿* */byte[] body = message.getBody();JSONObject json = JSONObject.fromObject(new String(body));System.out.println("return============================");System.out.println(message);}
}對(duì)應(yīng)生產(chǎn)者發(fā)送消息到rabbitmq有以下幾種情況:
1 沒有發(fā)送到交換機(jī)的數(shù)據(jù),會(huì)回調(diào)public void confirm(CorrelationData correlationData, boolean ack, String cause) 方法(ack為false),我們可以在發(fā)送消息時(shí)把業(yè)務(wù)參數(shù)比如訂單號(hào)設(shè)置到correlationData參數(shù)中,回調(diào)時(shí)把相關(guān)消息保存到異常消息表,采用定時(shí)任務(wù)兜底,并報(bào)警通知相關(guān)人員。
2 對(duì)于發(fā)送方確認(rèn)機(jī)制,只能保證消息到達(dá)rabbitmq的交換機(jī)(ack為true),如果此交換機(jī)沒有匹配的隊(duì) 列,那么消息會(huì)丟失,所以需要結(jié)合mandatory(設(shè)置為true),當(dāng)交換機(jī)沒有匹配的隊(duì)列時(shí),會(huì)回調(diào) public void returnedMessage(Message message, int i, String s, String s1, String s2)將消息返回給生產(chǎn)者,我們保存消息body部分,采用定時(shí)任務(wù)兜底,并報(bào)警通知相關(guān)人員。
2 交換機(jī)、隊(duì)列、消息(重要的數(shù)據(jù)比如支付數(shù)據(jù))需要持久化,避免消息在rabbitmq重啟、宕機(jī)等異常情況下造成消息丟失。
3 對(duì)應(yīng)消費(fèi)者,需要手動(dòng)應(yīng)答,明確告訴rabbitmq服務(wù)器,已經(jīng)正確消費(fèi)了消息,然后rabbitmq服務(wù)器會(huì)刪除已經(jīng)被正確消費(fèi)的消息。如果rabbitmq沒有收到應(yīng)答消息(比如消費(fèi)者處理超時(shí)或者網(wǎng)絡(luò)不好),rabbitmq會(huì)間隔的講消息重新發(fā)給消費(fèi)者消費(fèi),這是消費(fèi)者需要根據(jù)消息id或者全局唯一業(yè)務(wù)字段做好冪等處理。如果是消費(fèi)者代碼有問題需要重新發(fā)布版本解決。兜底方案依然是保存消息到異常消息表,定時(shí)處理并報(bào)警通知相關(guān)人員處理。
總結(jié):基于rabbitmq解決分布式事務(wù),核心是最終一致性,比如電商下單減庫(kù)存、外賣下單派單等場(chǎng)景,核心是最終一致性。經(jīng)過一定的時(shí)間(具體能容忍多久看業(yè)務(wù)場(chǎng)景)二個(gè)數(shù)據(jù)庫(kù)中的數(shù)據(jù)最終達(dá)到一致。
完結(jié),不正確之處,望大佬指正!求點(diǎn)贊鼓勵(lì)~
總結(jié)
以上是生活随笔為你收集整理的rabbitmq 查询版本_基于rabbitmq解决分布式事务的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 电影下映后还会不会再次播出?
- 下一篇: 一个好一点的头戴式耳机多少钱?