rabbit和mysql事务_分布式事务原理及SpringBoot整合RabbitMQ实现可靠事件,TCC事务模型及接口幂等性...
分布式事務
我們知道在單數據庫系統中,實現數據的一致性,通過數據庫的事務來處理比較簡單。在微服務或分布式系統中,各個獨立的服務都會有自己的數據庫,而不是在同一個數據庫中,所以當一組事務(如商品交易中,商品的庫存、用戶的賬戶資金和交易記錄等)的處理是分布在不同數據庫中的,分布式事務就是為了解決在多個數據庫節點中保證這些數據的一致性。
分布式事務里有個BASE理論,在分布式數據庫中,存在強一致性和弱一致性。
強一致性的好處是,對于開發者來說比較友好,數據始終可以讀取到最新值,但這種方式需要復雜的協議,并且需要犧牲很多的性能。
弱一致性,對于開發者來說相對沒有那么友好,無法保證讀取的值是最新的,但是不需要引入復雜的協議,也不需要犧牲很多的性能。
弱一致性是當今企業采用的主流方案,它并不能保證所有數據的實時一致性,所以有時候實時讀取數據是不可信的。它只是在正常的流程中,加入了提供修復數據的可能性,從而減少數據不一致的可能性,大大降低數據不一致的可能性。
什么時候使用分布式事務
對于像電商中用戶隱私信息、商品信息、交易記錄以及資金等數據,這些具備價值的核心數據,關系到用戶隱私和財產的內容,應該考慮使用分布式事務來保證一致性。
但對于用戶評價、自身裝飾和其他一些非重要的個性化信息,可以采用非事務的處理。因為一個正常的系統出現不一致的情況是小概率事件,而非大概率事件,對于一些小概率的數據丟失,一般來說是允許的。之所以這樣選擇,主要基于兩點,一個是開發者的開發難度;另一個是用戶的體驗,過多的分布式事務會造成性能的不斷丟失
弱一致性分布式事務解決方案有如下幾種:
狀態表
RabbitMQ可靠事件
最大嘗試
TCC模式
冪等性
在分布式事務中,各個訪問操作的接口,都需要保證冪等性。
所謂冪等性,是指在HTTP協議中,一次和多次請求某一個資源,對于資源本身應該具有同樣的結果,也就是其執行任意多次時,對資源本身所產生的影響,與執行一次時的相同。
實現方式有以下幾種:
唯一索引 -- 防止新增臟數據
token機制 -- 防止頁面重復提交
悲觀鎖 -- 獲取數據的時候加鎖(鎖表或鎖行)
樂觀鎖 -- 基于版本號version實現, 在更新數據那一刻校驗數據
分布式鎖 -- redis(jedis、redisson)或zookeeper實現
狀態機 -- 狀態變更, 更新數據時判斷狀態
※說明:如何實現接口的冪等性,可以分篇在接口的冪等性文章里解說。
狀態表實現分布式事務
這里拿電商的商品交易為例,講述下思路:
需要商品數據庫:商品表、商品交易明細表;資金數據庫:用戶賬戶表、賬戶交易明細表
主要流程包括:
商品表減商品庫存、
商品交易明細表中添加新的交易記錄、
用戶賬戶表中扣減用戶賬戶表的資金、
資金交易明細表中記錄賬戶交易明細表
需要準備一個狀態表,用redis的Hset數據類型比較合適
這里假設相關的明細記錄表中,有4個狀態:
1--準備交易,
2--交易成功,
3--被沖正,
4--沖正記錄
交易流程
流程說明
在商品服務中,商品減庫存后,記錄商品交易明細,如果沒有異常,就將商品交易記錄的狀態位設置為“1—準備提交”,并且記錄在Redis的狀態表中。
商品服務通過RESTFUL調用資金服務,如果成功,就將賬戶交易明細表的記錄的狀態位設置為“1—準備提交”,并且記錄在Redis的狀態表中。
最后,讀取Redis相關的所有狀態位,確定是否所有的操作都為“1—準備提交”狀態,如果是,則更新產品服務的記錄狀態為“2—提交成功”,然后發起資金服務調用,將對應的記錄(可通過業務流水號關聯)的狀態也更新為“2—提交成功”,這樣就完成了整個交易。
如果不全部為“1—準備提交”狀態,則發起各庫的沖正交易,沖掉原有的記錄,并且歸還商品庫存和賬戶金額。發起沖正交易,把原明細記錄狀態更新為3--被沖正,并往明細表中添加對應的新記錄,狀態為4--沖正記錄
RabbitMQ可靠事件
使用RabbitMQ等消息隊列中間件的可靠事件,來實現分布式事務,這里結合SpringBoot
前面有介紹過SpringBoot整合多數據庫的文章,這里可以用到,具體參考《Spring Boot學習:MyBatis配置Druid多數據源》,切換數據源使用@DataSource注解,如下
@DataSource(value = DataSourceType.MASTER) //切換到商品數據庫
@DataSource(value = DataSourceType.SLAVE) //切換到賬戶數據庫
在此基礎上我們加入RabbitMQ實現分布式事務功能
在pom.xml文件中加入依賴
org.springframework.boot
spring-boot-starter-amqp
yml配置文件中,關于RabbitMQ的配置如下:
# Spring 配置
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: 123456
#使用發布者確認模式,發布消息者會得到一個“消息是否被服務提供者接收”的確認消息
publisher-confirms: true
#RabbitMQ 隊列名稱配置
rabbitmq:
queue:
fund: fund
3.創建RabbitMQ配置文件RabbitConfig.java
package com.zhlab.demo.config;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @ClassName RabbitConfig
* @Description //RabbitMQ消息隊列配置
* @Author singleZhang
* @Email 405780096@qq.com
* @Date 2020/12/11 0011 上午 11:10
**/
@Configuration
public class RabbitConfig {
// 讀取配置屬性
@Value("${rabbitmq.queue.fund}")
private String fundQueueName = null;
// 創建RabbitMQ消息隊列
@Bean(name="fundQueue")
public Queue createFundQueue() {
return new Queue(fundQueueName);
}
}
創建數據傳輸對象FundParams.java
package com.zhlab.demo.model;
import java.io.Serializable;
/**
* @ClassName FundParams
* @Description //FundParams
* @Author singleZhang
* @Email 405780096@qq.com
* @Date 2020/12/11 0011 上午 11:30
**/
public class FundParams implements Serializable {
// 序列化版本號
public static final long serialVersionUID = 989878441231256478L;
private Long xid; // 業務流水號
private Long userId; // 用戶編號
private Double amount; // 交易金額
public FundParams() {
}
public FundParams(Long xid, Long userId, Double amount) {
this.xid = xid;
this.userId = userId;
this.amount = amount;
}
public Long getXid() {
return xid;
}
public void setXid(Long xid) {
this.xid = xid;
}
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
public Double getAmount() {
return amount;
}
public void setAmount(Double amount) {
this.amount = amount;
}
}
創建商品服務 業務邏輯PurchaseService.java
package com.zhlab.demo.service.goods;
import com.zhlab.demo.db.DataSourceType;
import com.zhlab.demo.db.annotation.DataSource;
import com.zhlab.demo.model.FundParams;
import com.zhlab.demo.utils.SnowFlakeUtil;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
/**
* @ClassName PurchaseService
* @Description //商品 業務邏輯
* @Author singleZhang
* @Email 405780096@qq.com
* @Date 2020/12/11 0011 上午 11:24
**/
@Service
public class PurchaseService implements RabbitTemplate.ConfirmCallback {
//實現RabbitTemplate.ConfirmCallback接口
//需要實現它定義的confirm方法,這樣它便可以作為一個發布者檢測消息是否被消費者所接收的確認類
// SnowFlake算法生成ID
SnowFlakeUtil worker = new SnowFlakeUtil(003);
// RabbitMQ模板
@Autowired
private RabbitTemplate rabbitTemplate;
// 讀取配置屬性
@Value("${rabbitmq.queue.fund}")
private String fundQueueName;
// 購買業務方法
@DataSource(value = DataSourceType.MASTER) //切換到商品數據庫
public Long purchase(Long productId, Long userId, Double amount) {
rabbitTemplate.setConfirmCallback(this);//設置了回調類為當前類
// SnowFlake算法生成序列號,用戶跨服務的關聯,這里用本地自定義方法,可以借助Leaf TinyID等分布式ID生成服務中間件
Long xid = worker.nextId();
// 傳遞給消費者的參數
FundParams params = new FundParams(xid, userId, amount);
// 發送消息給資金服務做扣款
this.rabbitTemplate.convertAndSend(fundQueueName, params); // ④
System.out.println("執行產品服務邏輯");
return xid;
}
/**
* 確認回調,會異步執行
* @param correlationData --相關數據
* @param ack -- 是否被消費
* @param cause -- 失敗原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
/*
* ack代表是否成功。
* 如果投遞消息失敗,就會先停滯1秒,然后嘗試進行沖正交易,沖掉原有交易,這樣就可以使得數據平整
*/
if (ack){ // 消息投遞成功
System.out.println("執行交易成功");
} else { // 消息投遞失敗
try {
// 停滯1秒(稍微等待可能沒有完成的正常流程),然后發起沖正交易
Thread.sleep(1000);
} catch (Exception ex) {
ex.printStackTrace();
}
System.out.println("嘗試產品減庫存沖正交易。");
System.out.println("嘗試賬戶扣減沖正交易。");
//在confirm方法中,如果參數ack為false,則說明消息傳遞失敗,就要嘗試執行沖正交易,把數據還原回來
System.out.println(cause); // 打印消息投遞失敗的原因
}
}
}
創建賬戶服務業務邏輯AccountService.java
package com.zhlab.demo.service.fund;
import com.zhlab.demo.db.DataSourceType;
import com.zhlab.demo.db.annotation.DataSource;
import com.zhlab.demo.model.FundParams;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
* @ClassName AccountService
* @Description //賬戶 業務邏輯
* @Author singleZhang
* @Email 405780096@qq.com
* @Date 2020/12/11 0011 上午 11:25
**/
@Service
public class AccountService {
/* 消息監聽,取YAML文件配置的隊列名
*因為消息被消費,所以觸發PurchaseService類的confirm方法
*spring.rabbitmq.listener.simple.acknowledge-mode = manual
*如果配置為手動,這里就需要手動確認消息,默認為自動的
*自動確認:這種模式下,當發送者發送完消息之后,它會自動認為消費者已經成功接收到該條消息。
*這種方式效率較高,當時如果在發送過程中,如果網絡中斷或者連接斷開,將會導致消息丟失
*手動確認:消費者成功消費完消息之后,會顯式發回一個應答(ack信號),
*RabbitMQ只有成功接收到這個應答消息,才將消息從內存或磁盤中移除消息。
*這種方式效率較低點,但是能保證絕大部分的消息不會丟失,當然肯定還有一些小概率會發生消息丟失的情況
*主要方法:basicAck、basicNack、basicReject根據具體業務情況使用,配合redis做冪等檢驗
*/
@RabbitListener(queues = "${rabbitmq.queue.fund}")
@DataSource(value = DataSourceType.SLAVE) //切換到賬戶數據庫
public void dealAccount(FundParams params) {
//TODO具體業務邏輯需自己實現
System.out.println("扣減賬戶金額邏輯......");
}
}
7.寫個測試接口來測試一下,創建MqController.java
package com.zhlab.demo.controller;
import com.zhlab.demo.service.goods.PurchaseService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @ClassName MqController
* @Description //RabbitMQ可靠消息 接口測試
* @Author singleZhang
* @Email 405780096@qq.com
* @Date 2020/12/11 0011 下午 2:25
**/
@RestController
@RequestMapping("/mq")
public class MqController {
@Autowired
private PurchaseService purchaseService;
@GetMapping("/test")
public String testMq() {
return purchaseService.purchase(1L, 1L, 200.0) + "";
}
}
以上就是基于RabbitMQ可靠消息 實現的分布式事務處理,邏輯和說明都在注釋里了。
※說明:這樣的確認方式,只是保證了事件的有效傳遞,但是不能保證消費類能夠沒有異常或者錯誤發生,當消費類有異常或錯誤發生時,數據依舊會存在不一致的情況。這樣的方式,只是保證了消息傳遞的有效性,降低了不一致的可能性,從而大大降低了后續需要運維和業務人員處理的不一致數據的數量
TCC補償事務
TCC代表的是
try(嘗試)
confirm(確認)
cancel(取消)
在TCC事務中,要求任何一個服務邏輯都有3個接口,它們對應的就是嘗試(try)方法、確認(confirm)方法和取消(cancel)方法。
TCC事務模型
TCC事務的一致性可達99.99%,是一種較為成熟的方案,因此在目前有著較為廣泛的應用。
繼續通過上面的商品交易流程來解析這個模型:
一階段
商品表減庫存,商品交易明細表記錄商品交易明細,并且將對應記錄狀態設置為“1—準備提交”。
調用賬戶服務,用戶賬戶表扣減賬戶資金,賬戶交易明細表記錄交易明細,并且將對應記錄狀態設置為“1—準備提交”
在一階段的調用中,如果沒有發生異常,就可以執行正常二階段進行提交了
正常二階段
商品服務 更新對應記錄的狀態為“2—提交成功”,使得數據生效
調用賬戶服務,使得對應的記錄狀態也為“2—提交成功”,這樣正常的提交就完成了
如果在一階段發生異常,需要取消操作,可以執行異常二階段
異常二階段
商品服務執行沖正交易,沖掉原有的產品交易,將庫存歸還給商品表
調用賬戶服務,發起沖正交易,沖掉原有的資金交易,將資金歸還到賬戶里
注意,這些提交和退出機制在TCC中,都需要開發者對接口作冪等性處理
TCC事務機制,也并不能保證所有的數據都是完全一致的,它只是提供了一個可以修復的機制,來降低不一致的情況,從而大大降低后續維護數據的代價。TCC事務也會帶來兩個較大的麻煩:第一個是,原本的一個方法實現,現在需要拆分為3個方法,代價較大;第二個是,需要開發者自已實現提交和取消方法的冪等性
總結
使用分布式事務,并不是很容易的事情,甚至有些方法還相當復雜。
在互聯網中,并不是所有的數據都需要使用分布式事務,所以首先要考慮的是:在什么時候使用分布式事務。即使需要使用分布式事務,有時候也并非需要實時實現數據的一致性,因為可以在后續通過一定的手段來完成。例如電商網站,對買家來說,需要的是快速響應,但對商家來說,就未必需要得到實時數據了,過段時間得到數據也是可以的,而這段時間就可以考慮進行數據補償了。無論我們如何使用分布式事務,也無法使數據完全達到百分之百的一致性,因此一般金融和電商企業會通過對賬等形式來完成最終一致性的操作。
在分布式事務的選擇中,都會采用弱一致性代替強一致性,相對來說,弱一致性更加靈活,更方便我們開發。從網站的角度來說,弱一致性可以獲得更佳的性能,提升用戶的體驗,這是互聯網應用需要首先考慮的要素。
拓展---電商中的高并發和分布式事務
電商網站中高并發是常見的,高并發是針對用戶而言的,比如搶購中,用戶只希望短時間內快速搶到商品,而商家對于交易信息可以延遲處理得到。
這就是意味著,對于用戶交易部分,要盡可能通過分布式事務進行保證,但而對于商戶數據部分,實時性要求相對不是那么高,可以過段時間通過后續手段來補償修復,從而縮小分布式事務的范圍。
確定需要分布式事務的范圍
這里可以看出使用分布式事務的主要是請求數據,保證這個過程可以提高數據可靠性。對于商戶數據,不需要使用分布式事務,這樣可以提升性能,使搶購進行得更快,滿足買家的需求,但是這也會引發數據的丟失。為了解決這個問題,后續可以通過和請求數據進行對比來修復數據,使數據達到一致,這個過程可以在高并發過后(一般高并發都是時間段性的,如性價比高的產品發布點、購物節開始時間段)進行,這樣商戶最終也可以得到可靠的數據,只是不是實時的,但是這并不影響商戶和用戶的業務。
總結
以上是生活随笔為你收集整理的rabbit和mysql事务_分布式事务原理及SpringBoot整合RabbitMQ实现可靠事件,TCC事务模型及接口幂等性...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 校招笔试C语言,校招c ++笔试题汇总
- 下一篇: c 语言 16进制写法,C语言16进制中