基于可靠消息方案的分布式事务(四):接入Lottor服务
在上一篇文章中,通過Lottor Sample介紹了快速體驗分布式事務Lottor。本文將會介紹如何將微服務中的生產方和消費方服務接入Lottor。
場景描述
- 生產方:User服務
- 消費方:Auth服務
- 事務管理方:Lottor Server
Lottor-Samples中的場景為:客戶端調用User服務創建一個用戶,用戶服務的user表中增加了一條用戶記錄。除此之外,還會調用Auth服務創建該用戶對應的角色和權限信息。
我們通過上面的請求流程圖入手,介紹接入Lottor服務。當您啟動好docker-compose中的組件時,會創建好兩個服務對應的user和auth數據庫。其中User和Auth服務所需要的初始化數據已經準備好,放在各自的classpath下,服務在啟動時會自動初始化數據庫,所需要的預置數據(如角色、權限信息)也放在sql文件中。
Lottor客戶端API
Lottor Client中提供了一個ExternalNettyService接口,用以發送三類消息到Lottor Server:
- 預提交消息
- 確認提交消息
- 消費完成消息
預發送#preSend的入參為預提交的消息列表,一個生產者可能有對應的多個消費者;確認提交#postSend的入參為生產方本地事務執行的狀態,如果失敗,第二個參數記錄異常信息;#consumedSend為消費方消費成功的發送的異步消息,第一個入參為其接收到的事務消息,第二個為消費的狀態。
事務消息TransactionMsg
public class TransactionMsg implements Serializable {/*** 用于消息的追溯*/private String groupId;/*** 事務消息id*/private String subTaskId;/*** 源服務,即調用發起方*/private String source;/*** 目標方服務*/private String target;/*** 執行的方法,適配成枚舉*/private String method;/*** 參數,即要傳遞的內容,可以為null*/private Object args;/*** 創建時間*/private Long createTime = Timestamp.valueOf(DateUtils.getCurrentDateTime()).getTime();/*** 操作結果信息*/private String message;/*** 更新時間*/private Long updateTime;/*** 是否消費,默認為否** {@linkplain com.blueskykong.lottor.common.enums.ConsumedStatus}*/private int consumed = ConsumedStatus.UNCONSUMED.getStatus();... } 復制代碼在構建事務消息時,事務消息id、源服務、目標服務、目標方法和目標方法的傳參args都是必不可少的。消費方消費完之后,將會設置consumed的狀態,出現異常將會設置異常message信息。
生產方-User服務
創建用戶時,需要創建對應的角色。生產方接入分為三步:
- 發送預提交消息
- 執行本地事務
- 發送確認提交的消息
引入依賴
首先,需要引入Lottor客戶端的依賴:
<dependency><groupId>com.blueskykong</groupId><artifactId>lottor-starter</artifactId><version>2.0.0-SNAPSHOT</version></dependency> 復制代碼發起調用
在UserService中定義了創建用戶的方法,我們需要在執行本地事務之前,構造事務消息并預發送到Lottor Server(對應流程圖中的步驟1)。如果遇到預發送失敗,則直接停止本地事務的執行。如果本地事務執行成功(對應步驟3),則發送confirm消息,否則發送回滾消息到Lottor Server(對應步驟4)。
public class UserServiceImpl implements UserService {private static final Logger LOGGER = LoggerFactory.getLogger(UserServiceImpl.class);//注入ExternalNettyServiceprivate ExternalNettyService nettyService;private UserMapper userMapper;public Boolean createUser(UserEntity userEntity, StateEnum flag) {UserRoleDTO userRoleDTO = new UserRoleDTO(RoleEnum.ADMIN, userEntity.getId());//構造消費方的TransactionMsgTransactionMsg transactionMsg = new TransactionMsg.Builder().setSource(ServiceNameEnum.TEST_USER.getServiceName()).setTarget(ServiceNameEnum.TEST_AUTH.getServiceName()).setMethod(MethodNameEnum.AUTH_ROLE.getMethod()).setSubTaskId(IdWorkerUtils.getInstance().createUUID()).setArgs(userRoleDTO).build();if (flag == StateEnum.CONSUME_FAIL) {userRoleDTO.setUserId(null);transactionMsg.setArgs(userRoleDTO);}//發送預處理消息if (!nettyService.preSend(Collections.singletonList(transactionMsg))) {return false;//預發送失敗,本地事務停止執行}//local transaction本地事務try {LOGGER.debug("執行本地事務!");if (flag != StateEnum.PRODUCE_FAIL) {userMapper.saveUser(userEntity);} else {userMapper.saveUserFailure(userEntity);}} catch (Exception e) {//本地事務異常,發送回滾消息nettyService.postSend(false, e.getMessage());LOGGER.error("執行本地事務失敗,cause is 【{}】", e.getLocalizedMessage());return false;}//發送確認消息nettyService.postSend(true, null);return true;}} 復制代碼代碼如上所示,實現不是很復雜。本地事務執行前,必然已經成功發送了預提交消息,當本地事務執行成功,Lottor Client將會記錄本地事務執行的狀態,避免異步發送的確認消息的丟失,便于后續的Lottor Server回查。
配置文件
lottor: enabled: true core: cache: true cache-type: redis tx-redis-config: host-name: localhost port: 6379 serializer: kryo netty-serializer: kryo tx-manager-id: lottorspring: datasource: url: jdbc:mysql://localhost:3306/user?autoReconnect=true&useSSL=false continue-on-error: false initialize: true max-active: 50 max-idle: 10 max-wait: 10000 min-evictable-idle-time-millis: 60000 min-idle: 8 name: dbcp1 test-on-borrow: false test-on-return: false test-while-idle: false time-between-eviction-runs-millis: 5000 username: root password: _123456_schema[0]: classpath:/user.sql 復制代碼如上為User服務的部分配置文件,lottor.enabled: true開啟Lottor 客戶端服務。cache 開啟本地緩存記錄。cache-type指定了本地事務記錄的緩存方式,可以為redis或者MongoDB。serializer為序列化和反序列化方式。tx-manager-id為對應的Lottor Server的服務名。
Lottor Server
多個微服務的接入,對Lottor Server其實沒什么侵入性。這里需要注意的是,TransactionMsg中設置的source和target字段來源于lottor-common中的com.blueskykong.lottor.common.enums.ServiceNameEnum:
public enum ServiceNameEnum {TEST_USER("user", "tx-user"),TEST_AUTH("auth", "tx-auth");//服務名String serviceName;//消息中間件的topicString topic;... } 復制代碼消息中間件的topic是在服務名的基礎上,加上tx-前綴。消費方在設置訂閱的topic時,需要按照這樣的規則命名。Lottor Server完成的步驟為上面流程圖中的2(成功收到預提交消息)和5(發送事務消息到指定的消費方),除此之外,還會定時輪詢異常狀態的事務組和事務消息。
消費方-Auth服務
引入依賴
<dependency><groupId>com.blueskykong</groupId><artifactId>lottor-starter</artifactId><version>2.0.0-SNAPSHOT</version></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency> 復制代碼引入了Lottor客戶端starter,spring-cloud-stream用于消費方接收來自Lottor Server的事務消息。
topic監聽
({TestSink.class}) public class ListenerStream extends InitStreamHandler {private static final Logger LOGGER = LoggerFactory.getLogger(ListenerStream.class);private RoleUserService roleUserService;public ListenerStream(ExternalNettyService nettyService, ObjectSerializer objectSerializer) {super(nettyService, objectSerializer);}(TestSink.INPUT)public void processSMS(Message message) {//解析接收到的TransactionMsgprocess(init(message));}public void process(TransactionMsg message) {try {if (Objects.nonNull(message)) {LOGGER.info("===============consume notification message: =======================" + message.toString());if (StringUtils.isNotBlank(message.getMethod())) {MethodNameEnum method = MethodNameEnum.fromString(message.getMethod());LOGGER.info(message.getMethod());//根據目標方法進行處理,因為一個服務可以對應多個生產方,有多個目標方法switch (method) {case AUTH_ROLE:UserRoleDTO userRoleDTO = (UserRoleDTO) message.getArgs();RoleEntity roleEntity = roleUserService.getRole(userRoleDTO.getRoleEnum().getName());String roleId = "";if (Objects.nonNull(roleEntity)) {roleId = roleEntity.getId();}roleUserService.saveRoleUser(new UserRole(UUID.randomUUID().toString(), userRoleDTO.getUserId(), roleId));LOGGER.info("matched case {}", MethodNameEnum.AUTH_ROLE);break;default:LOGGER.warn("no matched consumer case!");message.setMessage("no matched consumer case!");nettyService.consumedSend(message, false);return;}}}} catch (Exception e) {//處理異常,發送消費失敗的消息LOGGER.error(e.getLocalizedMessage());message.setMessage(e.getLocalizedMessage());nettyService.consumedSend(message, false);return;}//成功消費nettyService.consumedSend(message, true);return;} } 復制代碼消費方監聽指定的topic(如上實現中,為test-input中指定的topic,spring-cloud-stream更加簡便調用的接口),解析接收到的TransactionMsg。根據目標方法進行處理,因為一個服務可以對應多個生產方,有多個目標方法。執行本地事務時,Auth會根據TransactionMsg中提供的args作為入參執行指定的方法(對應步驟7),最后向Lottor Server發送消費的結果(對應步驟8)。
配置文件
spring: cloud: stream: bindings: test-input: group: testGroup content-type: application/x-java-object;type=com.blueskykong.lottor.common.entity.TransactionMsgAdapter destination: tx-auth binder: rabbit1 binders: rabbit1: type: rabbit environment: spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: / lottor: enabled: true core: cache: true cache-type: redis tx-redis-config: host-name: localhost port: 6379 serializer: kryo netty-serializer: kryo tx-manager-id: lottor 復制代碼配置和User服務的差別在于增加了spring-cloud-stream的配置,配置了rabbitmq的相關信息,監聽的topic為tx-auth。
小結
本文主要通過User和Auth的示例服務講解了如何接入Lottor客戶端。生產方構造涉及的事務消息,首先預發送事務消息到Lottor Server,成功預提交之后便執行本地事務;本地事務執行完則異步發送確認消息(可能成功,也可能失敗)。Lottor Server根據接收到的確認消息決定是否將對應的事務組消息發送到對應的消費方。Lottor Server還會定時輪詢異常狀態的事務組和事務消息,以防因為異步的確認消息發送失敗。消費方收到事務消息之后,將會根據目標方法執行對應的處理操作,最后將消費結果異步回寫到Lottor Server。
推薦閱讀
基于可靠消息方案的分布式事務
Lottor項目地址:https://github.com/keets2012/Lottor
訂閱最新文章,歡迎關注我的公眾號
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的基于可靠消息方案的分布式事务(四):接入Lottor服务的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: TiDB 源码阅读系列文章(十五)Sor
- 下一篇: OWASP TOP 10