生活随笔
收集整理的這篇文章主要介紹了
使用Seata彻底解决Spring Cloud中的分布式事务问题!
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
摘要
Seata是Alibaba開源的一款分布式事務解決方案,致力于提供高性能和簡單易用的分布式事務服務,本文將通過一個簡單的下單業務場景來對其用法進行詳細介紹。
什么是分布式事務問題?
單體應用
單體應用中,一個業務操作需要調用三個模塊完成,此時數據的一致性由本地事務來保證。
微服務應用
隨著業務需求的變化,單體應用被拆分成微服務應用,原來的三個模塊被拆分成三個獨立的應用,分別使用獨立的數據源,業務操作需要調用三個服務來完成。此時每個服務內部的數據一致性由本地事務來保證,但是全局的數據一致性問題沒法保證。
小結
在微服務架構中由于全局數據一致性沒法保證產生的問題就是分布式事務問題。簡單來說,一次業務操作需要操作多個數據源或需要進行遠程調用,就會產生分布式事務問題。
Seata簡介
定義一個分布式事務
我們可以把一個分布式事務理解成一個包含了若干分支事務的全局事務,全局事務的職責是協調其下管轄的分支事務達成一致,要么一起成功提交,要么一起失敗回滾。此外,通常分支事務本身就是一個滿足ACID的本地事務。這是我們對分布式事務結構的基本認識,與 XA 是一致的
協議分布式事務處理過程的三個組件
Transaction Coordinator (TC): 事務協調器,維護全局事務的運行狀態,負責協調并驅動全局事務的提交或回滾; Transaction Manager ?: 控制全局事務的邊界,負責開啟一個全局事務,并最終發起全局提交或全局回滾的決議; Resource Manager (RM): 控制分支事務,負責分支注冊、狀態匯報,并接收事務協調器的指令,驅動分支(本地)事務的提交和回滾。
一個典型的分布式事務過程
TM 向 TC 申請開啟一個全局事務,全局事務創建成功并生成一個全局唯一的 XID; XID 在微服務調用鏈路的上下文中傳播; RM 向 TC 注冊分支事務,將其納入 XID 對應全局事務的管轄; TM 向 TC 發起針對 XID 的全局提交或回滾決議; TC 調度 XID 下管轄的全部分支事務完成提交或回滾請求。
seata-server的安裝與配置
我們先從官網下載seata-server,這里下載的是seata-server-0.9.0.zip,下載地址:seata-server
這里我們使用Nacos作為注冊中心,Nacos的安裝及使用可以參考:Spring Cloud Alibaba:Nacos 作為注冊中心和配置中心使用;
解壓seata-server安裝包到指定目錄,修改conf目錄下的file.conf配置文件,主要修改自定義事務組名稱,事務日志存儲模式為db及數據庫連接信息;
service
{ #vgroup->rgroup vgroup_mapping
. fsp_tx_group
= "default" #修改事務組名稱為:fsp_tx_group,和客戶端自定義的名稱對應
#only support single node default . grouplist
= "127.0.0.1:8091" #degrade current not support enableDegrade
= false
#disable disable
= false
#unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent max
. commit
. retry
. timeout
= "-1" max
. rollback
. retry
. timeout
= "-1"
} ## transaction log store
store
{ ## store mode
: file、dbmode
= "db" #修改此處將事務信息存儲到數據庫中## database storedb
{ ## the implement of javax
. sql
. DataSource
, such as
DruidDataSource ( druid
) / BasicDataSource ( dbcp
) etc
. datasource
= "dbcp" ## mysql
/ oracle
/ h2
/ oceanbase etc
. db
- type
= "mysql" driver
- class
- name
= "com.mysql.jdbc.Driver" url
= "jdbc:mysql://localhost:3306/seat-server" #修改數據庫連接地址user
= "root" #修改數據庫用戶名password
= "root" #修改數據庫密碼min
- conn
= 1 max
- conn
= 3 global
. table
= "global_table" branch
. table
= "branch_table" lock
- table
= "lock_table" query
- limit
= 100 }
}
由于我們使用了db模式存儲事務日志,所以我們需要創建一個seat-server數據庫,建表sql在seata-server的/conf/db_store.sql中; 修改conf目錄下的registry.conf配置文件,指明注冊中心為nacos,及修改nacos連接信息即可;
registry
{ # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa type
= "nacos" #改為nacosnacos
{ serverAddr
= "localhost:8848" #改為nacos的連接地址namespace
= "" cluster
= "default" }
}
先啟動Nacos,再使用seata-server中/bin/seata-server.bat文件啟動seata-server。
數據庫準備
創建業務數據庫
seat-order:存儲訂單的數據庫; seat-storage:存儲庫存的數據庫; seat-account:存儲賬戶信息的數據庫。
初始化業務表
order表
CREATE TABLE ` order ` ( ` id
` bigint ( 11 ) NOT NULL AUTO_INCREMENT , ` user_id
` bigint ( 11 ) DEFAULT NULL COMMENT '用戶id' , ` product_id
` bigint ( 11 ) DEFAULT NULL COMMENT '產品id' , ` count
` int ( 11 ) DEFAULT NULL COMMENT '數量' , ` money
` decimal ( 11 , 0 ) DEFAULT NULL COMMENT '金額' , PRIMARY KEY ( ` id
` )
) ENGINE = InnoDB AUTO_INCREMENT = 7 DEFAULT CHARSET = utf8
; ALTER TABLE ` order ` ADD COLUMN ` status ` int ( 1 ) DEFAULT NULL COMMENT '訂單狀態:0:創建中;1:已完結' AFTER ` money
` ;
storage表
CREATE TABLE ` storage
` ( ` id
` bigint ( 11 ) NOT NULL AUTO_INCREMENT , ` product_id
` bigint ( 11 ) DEFAULT NULL COMMENT '產品id' , ` total
` int ( 11 ) DEFAULT NULL COMMENT '總庫存' , ` used
` int ( 11 ) DEFAULT NULL COMMENT '已用庫存' , ` residue
` int ( 11 ) DEFAULT NULL COMMENT '剩余庫存' , PRIMARY KEY ( ` id
` )
) ENGINE = InnoDB AUTO_INCREMENT = 2 DEFAULT CHARSET = utf8
; INSERT INTO ` seat
- storage
` . ` storage
` ( ` id
` , ` product_id
` , ` total
` , ` used
` , ` residue
` ) VALUES ( '1' , '1' , '100' , '0' , '100' ) ;
account表
CREATE TABLE ` account
` ( ` id
` bigint ( 11 ) NOT NULL AUTO_INCREMENT COMMENT 'id' , ` user_id
` bigint ( 11 ) DEFAULT NULL COMMENT '用戶id' , ` total
` decimal ( 10 , 0 ) DEFAULT NULL COMMENT '總額度' , ` used
` decimal ( 10 , 0 ) DEFAULT NULL COMMENT '已用余額' , ` residue
` decimal ( 10 , 0 ) DEFAULT '0' COMMENT '剩余可用額度' , PRIMARY KEY ( ` id
` )
) ENGINE = InnoDB AUTO_INCREMENT = 2 DEFAULT CHARSET = utf8
; INSERT INTO ` seat
- account
` . ` account
` ( ` id
` , ` user_id
` , ` total
` , ` used
` , ` residue
` ) VALUES ( '1' , '1' , '1000' , '0' , '1000' ) ;
創建日志回滾表
使用Seata還需要在每個數據庫中創建日志表,建表sql在seata-server的/conf/db_undo_log.sql中。
完整數據庫示意圖
制造一個分布式事務問題
這里我們會創建三個服務,一個訂單服務,一個庫存服務,一個賬戶服務。當用戶下單時,會在訂單服務中創建一個訂單,然后通過遠程調用庫存服務來扣減下單商品的庫存,再通過遠程調用賬戶服務來扣減用戶賬戶里面的余額,最后在訂單服務中修改訂單狀態為已完成。該操作跨越三個數據庫,有兩次遠程調用,很明顯會有分布式事務問題。
客戶端配置
spring : cloud : alibaba : seata : tx-service-group : fsp_tx_group
添加并修改file.conf配置文件,主要是修改自定義事務組名稱;
service
{ #vgroup->rgroup vgroup_mapping
. fsp_tx_group
= "default" #修改自定義事務組名稱
#only support single node default . grouplist
= "127.0.0.1:8091" #degrade current not support enableDegrade
= false
#disable disable
= false
#unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent max
. commit
. retry
. timeout
= "-1" max
. rollback
. retry
. timeout
= "-1" disableGlobalTransaction
= false
}
添加并修改registry.conf配置文件,主要是將注冊中心改為nacos;
registry
{ # file 、nacos 、eureka、redis、zk type
= "nacos" #修改為nacosnacos
{ serverAddr
= "localhost:8848" #修改為nacos的連接地址namespace
= "" cluster
= "default" }
}
@SpringBootApplication ( exclude
= DataSourceAutoConfiguration
. class )
@EnableDiscoveryClient
@EnableFeignClients
public class SeataOrderServiceApplication { public static void main ( String
[ ] args
) { SpringApplication
. run ( SeataOrderServiceApplication
. class , args
) ; } }
@Configuration
public class DataSourceProxyConfig { @Value ( "${mybatis.mapperLocations}" ) private String mapperLocations
; @Bean @ConfigurationProperties ( prefix
= "spring.datasource" ) public DataSource
druidDataSource ( ) { return new DruidDataSource ( ) ; } @Bean public DataSourceProxy
dataSourceProxy ( DataSource dataSource
) { return new DataSourceProxy ( dataSource
) ; } @Bean public SqlSessionFactory
sqlSessionFactoryBean ( DataSourceProxy dataSourceProxy
) throws Exception
{ SqlSessionFactoryBean sqlSessionFactoryBean
= new SqlSessionFactoryBean ( ) ; sqlSessionFactoryBean
. setDataSource ( dataSourceProxy
) ; sqlSessionFactoryBean
. setMapperLocations ( new PathMatchingResourcePatternResolver ( ) . getResources ( mapperLocations
) ) ; sqlSessionFactoryBean
. setTransactionFactory ( new SpringManagedTransactionFactory ( ) ) ; return sqlSessionFactoryBean
. getObject ( ) ; } }
使用@GlobalTransactional注解開啟分布式事務:
package com
. macro
. cloud
. service
. impl
; import com
. macro
. cloud
. dao
. OrderDao
;
import com
. macro
. cloud
. domain
. Order
;
import com
. macro
. cloud
. service
. AccountService
;
import com
. macro
. cloud
. service
. OrderService
;
import com
. macro
. cloud
. service
. StorageService
;
import io
. seata
. spring
. annotation
. GlobalTransactional
;
import org
. slf4j
. Logger
;
import org
. slf4j
. LoggerFactory
;
import org
. springframework
. beans
. factory
. annotation
. Autowired
;
import org
. springframework
. stereotype
. Service
;
@Service
public class OrderServiceImpl implements OrderService { private static final Logger LOGGER
= LoggerFactory
. getLogger ( OrderServiceImpl
. class ) ; @Autowired private OrderDao orderDao
; @Autowired private StorageService storageService
; @Autowired private AccountService accountService
; @Override @GlobalTransactional ( name
= "fsp-create-order" , rollbackFor
= Exception
. class ) public void create ( Order order
) { LOGGER
. info ( "------->下單開始" ) ; orderDao
. create ( order
) ; LOGGER
. info ( "------->order-service中扣減庫存開始" ) ; storageService
. decrease ( order
. getProductId ( ) , order
. getCount ( ) ) ; LOGGER
. info ( "------->order-service中扣減庫存結束:{}" , order
. getId ( ) ) ; LOGGER
. info ( "------->order-service中扣減余額開始" ) ; accountService
. decrease ( order
. getUserId ( ) , order
. getMoney ( ) ) ; LOGGER
. info ( "------->order-service中扣減余額結束" ) ; LOGGER
. info ( "------->order-service中修改訂單狀態開始" ) ; orderDao
. update ( order
. getUserId ( ) , 0 ) ; LOGGER
. info ( "------->order-service中修改訂單狀態結束" ) ; LOGGER
. info ( "------->下單結束" ) ; }
}
分布式事務功能演示
@Service
public class AccountServiceImpl implements AccountService { private static final Logger LOGGER
= LoggerFactory
. getLogger ( AccountServiceImpl
. class ) ; @Autowired private AccountDao accountDao
; @Override public void decrease ( Long userId
, BigDecimal money
) { LOGGER
. info ( "------->account-service中扣減賬戶余額開始" ) ; try { Thread
. sleep ( 30 * 1000 ) ; } catch ( InterruptedException e
) { e
. printStackTrace ( ) ; } accountDao
. decrease ( userId
, money
) ; LOGGER
. info ( "------->account-service中扣減賬戶余額結束" ) ; }
}
此時我們可以發現下單后數據庫數據并沒有任何改變; 我們可以在seata-order-service中注釋掉@GlobalTransactional來看看沒有Seata的分布式事務管理會發生什么情況:
@Service
public class OrderServiceImpl implements OrderService { @Override
public void create ( Order order
) { LOGGER
. info ( "------->下單開始" ) ; LOGGER
. info ( "------->下單結束" ) ; }
}
由于seata-account-service的超時會導致當庫存和賬戶金額扣減后訂單狀態并沒有設置為已經完成,而且由于遠程調用的重試機制,賬戶余額還會被多次扣減。
參考資料
Seata官方文檔:https://github.com/seata/seata/wiki
使用到的模塊
springcloud
- learning
├── seata
- order
- service
├── seata
- storage
- service
└── seata
- account
- service
總結
以上是生活随笔 為你收集整理的使用Seata彻底解决Spring Cloud中的分布式事务问题! 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。