谷粒商城项目篇13_分布式高级篇_订单业务模块(提交订单幂等性、分布式事务、延时MQ实现定时任务)
目錄
一、訂單業務模塊
訂單流程
購物車跳轉訂單確認頁
提交訂單接口冪等性
二、分布式事務
- CAP定理
- BASE理論
- 強一致性、弱一致性、最終一致性
- 2PC模式
- 柔性事務-TCC事務補償方案
- 柔性事務-最大努力通知型方案
- 柔性事務-可靠消息+最終一致性方案(異步確認)
三、延時隊列實現定時任務
- 消息丟失
- 消息重復
- 消息積壓
一、訂單業務模塊
概述
- 電商系統涉及到 3 流,分別時信息流,資金流,物流,而訂單系統作為中樞將三者有機的集
合起來。 - 訂單模塊是電商系統的樞紐,在訂單這個環節上需求獲取多個模塊的數據和信息,同時對這
些信息進行加工處理后流向下個環節,這一系列就構成了訂單的信息流通
1.訂單流程
不管類型如何訂單都包括正向流程和逆向流程,對應的場景就是購買商品和退換貨流程,正
向流程就是一個正常的網購步驟:訂單生成–>支付訂單–>賣家發貨–>確認收貨–>交易成功。
而每個步驟的背后,訂單是如何在多系統之間交互流轉的,可概括如下圖
1、訂單創建與支付
(1) 、訂單創建前需要預覽訂單,選擇收貨信息等
(2) 、訂單創建需要鎖定庫存,庫存有才可創建,否則不能創建
(3) 、訂單創建后超時未支付需要解鎖庫存
(4) 、支付成功后,需要進行拆單,根據商品打包方式,所在倉庫,物流等進行拆單
(5) 、支付的每筆流水都需要記錄,以待查賬
(6) 、訂單創建,支付成功等狀態都需要給 MQ 發送消息,方便其他系統感知訂閱
2、逆向流程
(1) 、修改訂單,用戶沒有提交訂單,可以對訂單一些信息進行修改,比如配送信息,
優惠信息,及其他一些訂單可修改范圍的內容,此時只需對數據進行變更即可。
(2) 、訂單取消,用戶主動取消訂單和用戶超時未支付,兩種情況下訂單都會取消訂
單,而超時情況是系統自動關閉訂單,所以在訂單支付的響應機制上面要做支付的
限時處理,尤其是在前面說的下單減庫存的情形下面,可以保證快速的釋放庫存。
另外需要需要處理的是促銷優惠中使用的優惠券,權益等視平臺規則,進行相應補
回給用戶。
(3) 、退款,在待發貨訂單狀態下取消訂單時,分為缺貨退款和用戶申請退款。如果是
全部退款則訂單更新為關閉狀態,若只是做部分退款則訂單仍需進行進行,同時生
成一條退款的售后訂單,走退款流程。退款金額需原路返回用戶的賬戶。
(4) 、發貨后的退款,發生在倉儲貨物配送,在配送過程中商品遺失,用戶拒收,用戶
收貨后對商品不滿意,這樣情況下用戶發起退款的售后訴求后,需要商戶進行退款
的審核,雙方達成一致后,系統更新退款狀態,對訂單進行退款操作,金額原路返
回用戶的賬戶,同時關閉原訂單數據。僅退款情況下暫不考慮倉庫系統變化。如果
發生雙方協調不一致情況下,可以申請平臺客服介入。在退款訂單商戶不處理的情
況下,系統需要做限期判斷,比如 5 天商戶不處理,退款單自動變更同意退款
2.購物車頁跳轉到訂單確認頁
用戶登錄狀態下查看購物車商品信息,點擊去結算,訂單詳情頁需要顯示
- 商品最新價格、優惠信息
- 用戶的基本信息、地址
- 支付方式等
- 訂單總價格
- …
1.登錄攔截器
首先訂單業務都需要登錄狀態,設置一個全局攔截器LoginInterceptor
package henu.soft.xiaosi.order.interceptor;import henu.soft.common.constant.AuthServerConstant; import henu.soft.common.to.MemberResponseTo; import org.springframework.util.AntPathMatcher; import org.springframework.web.servlet.HandlerInterceptor; import org.springframework.web.servlet.ModelAndView;import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpSession;/*** 登錄攔截器,未登錄的用戶不能進入訂單服務*/ public class LoginInterceptor implements HandlerInterceptor {public static ThreadLocal<MemberResponseTo> loginUser = new ThreadLocal<>();@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {// 獲取登錄狀態HttpSession session = request.getSession();MemberResponseTo memberResponseVo = (MemberResponseTo) session.getAttribute(AuthServerConstant.LOGIN_USER);//登陸了if (memberResponseVo != null) {loginUser.set(memberResponseVo);return true;}else {session.setAttribute("msg","請先登錄!");response.sendRedirect("http://auth.gulishop.cn/login.html");return false;}}@Overridepublic void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {}@Overridepublic void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {} }注冊攔截器
package henu.soft.xiaosi.order.config;import henu.soft.xiaosi.order.interceptor.LoginInterceptor; import org.springframework.context.annotation.Configuration;import org.springframework.web.servlet.config.annotation.InterceptorRegistry; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;@Configuration public class MyWebConfig implements WebMvcConfigurer {@Overridepublic void addInterceptors(InterceptorRegistry registry) {registry.addInterceptor(new LoginInterceptor()).addPathPatterns("/**");} }2.封裝vo
package henu.soft.xiaosi.order.vo;import lombok.Getter; import lombok.Setter;import java.math.BigDecimal; import java.util.List; import java.util.Map;public class OrderConfirmVo {@Getter@Setter/** 會員收獲地址列表 **/private List<MemberAddressVo> memberAddressVos;@Getter @Setter/** 所有選中的購物項 **/private List<OrderItemVo> items;/** 發票記錄 **/@Getter @Setter/** 優惠券(會員積分) **/private Integer integration;/** 防止重復提交的令牌 **/@Getter @Setterprivate String orderToken;@Getter @SetterMap<Long,Boolean> stocks;public Integer getCount() {Integer count = 0;if (items != null && items.size() > 0) {for (OrderItemVo item : items) {count += item.getCount();}}return count;}/** 訂單總額 **///BigDecimal total;//計算訂單總額public BigDecimal getTotal() {BigDecimal totalNum = BigDecimal.ZERO;if (items != null && items.size() > 0) {for (OrderItemVo item : items) {//計算當前商品的總價格BigDecimal itemPrice = item.getPrice().multiply(new BigDecimal(item.getCount().toString()));//再計算全部商品的總價格totalNum = totalNum.add(itemPrice);}}return totalNum;}/** 應付價格 **///BigDecimal payPrice;public BigDecimal getPayPrice() {return getTotal();} }3.Feign遠程調用丟失請求頭信息
登錄信息保存在分布式session中,瀏覽器的cookie保存這些信息,直接瀏覽器訪問controller會帶上cookie
但是遠程調用的方法創建一個新的request對應的controller不會帶上cookie,解決辦法是加上feign的攔截器
package henu.soft.xiaosi.cart.config;import feign.RequestInterceptor; import feign.RequestTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.context.request.RequestAttributes; import org.springframework.web.context.request.RequestContextHolder; import org.springframework.web.context.request.ServletRequestAttributes;import javax.servlet.http.HttpServletRequest;@Configuration public class MyFeignConfig {@Beanpublic RequestInterceptor requestInterceptor() {return new RequestInterceptor() {@Overridepublic void apply(RequestTemplate template) {//1. 使用RequestContextHolder拿到常常請求的請求數據,同一個線程內可以獲取的到ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();if (requestAttributes != null) {HttpServletRequest request = requestAttributes.getRequest();if (request != null) {//2. 將老請求得到cookie信息放到feign請求上String cookie = request.getHeader("Cookie");template.header("Cookie", cookie);}}}};} }4.Feign遠程異步調用丟失上下文信息
Feign遠程方法調用異步方法,會開啟新的線程,
- 之前是 主線程---> 攔截器--->controller--->service都是一個主線程,可以拿到ThreadLocal線程共享的cookie信息
- 現在是 新的異步線程--->攔截器--->controller--->service 每個異步任務對應一個線程,拿不到主線程的cookie信息
解決辦法
在異步方法內部重新設置 上下文信息 RequestContextHolder.setRequestAttributes(requestAttributes);
3.提交訂單接口冪等性
概念
- 接口冪等性就是用戶對于同一操作發起的一次請求或者多次請求的結果是一致的,不會因
為多次點擊而產生了副作用; - 比如說支付場景,用戶購買了商品支付扣款成功,但是返回結果的時候網絡異常,此時錢已經扣了,用戶再次點擊按鈕,此時會進行第二次扣款,返回結果成功,用戶查詢余額返發現多扣錢了,流水記錄也變成了兩條...,這就沒有保證接口
的冪等性。
防止場景
- 用戶多次點擊按鈕
- 用戶頁面回退再次提交
- 微服務互相調用,由于網絡問題,導致請求失敗。feign 觸發重試機制
- 其他業務情況
冪等情況, 以 SQL 為例,有些操作是天然冪等的。
- SELECT * FROM table WHER id=?,無論執行多少次都不會改變狀態,是天然的冪等。
- UPDATE tab1 SET col1=1 WHERE col2=2,無論執行成功多少次狀態都是一致的,也是冪等操作。
- delete from user where userid=1,多次操作,結果一樣,具備冪等性
- insert into user(userid,name) values(1,'a')如 userid 為唯一主鍵,即重復操作上面的業務,只
會插入一條用戶數據,具備冪等性。 - UPDATE tab1 SET col1=col1+1 WHERE col2=2,每次執行的結果都會發生變化,不是冪等的。
- insert into user(userid,name) values(1,'a')如 userid 不是主鍵,可以重復,那上面業務多次操
作,數據都會新增多條,不具備冪等性。
1.令牌token機制
1、服務端提供了發送 token 的接口。我們在分析業務的時候,哪些業務是存在冪等問題的,
就必須在執行業務前,先去獲取 token,服務器會把 token 保存到 redis 中。
2、然后調用業務接口請求時,把 token 攜帶過去,一般放在請求頭部。
3、服務器判斷 token 是否存在 redis 中,存在表示第一次請求,然后刪除 token,繼續執行業
務。
4、如果判斷 token 不存在 redis 中,就表示是重復操作,直接返回重復標記給 client,這樣
就保證了業務代碼,不被重復執行。
危險性:
1、先刪除 token 還是后刪除 token;
-
先刪除可能導致,業務確實沒有執行,重試還帶上之前 token,由于防重設計導致,
請求還是不能執行。 -
后刪除可能導致,業務處理成功,但是服務閃斷,出現超時,沒有刪除 token,別
人繼續重試,導致業務被執行兩邊 -
我們最好設計為先刪除 token,如果業務調用失敗,就重新獲取 token 再次請求。
2、Token 獲取、比較和刪除必須是原子性
-
redis.get(token) 、token.equals、redis.del(token)如果這兩個操作不是原子,可能導
致,高并發下,都 get 到同樣的數據,判斷都成功,繼續業務并發執行 -
可以在 redis 使用 lua 腳本完成這個操作
if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 en
1.準備令牌
// 常量 package henu.soft.common.constant;public class OrderConstant {public static final String USER_ORDER_TOKEN_PREFIX = "order:token"; }//6. 防重令牌String token = UUID.randomUUID().toString().replace("-", "");// 存入redisredisTemplate.opsForValue().set(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberResponseTo.getId(), token, 30, TimeUnit.MINUTES);// 返回給頁面confirmVo.setOrderToken(token); <form action="http://order.gulishop.cn/submitOrder" method="post"><input id="addrInput" type="hidden" name="addrId"/><input id="payPriceInput" type="hidden" name="payPrice"><input name="orderToken" th:value="${confirmOrder.orderToken}" type="hidden"/><button class="tijiao" type="submit">提交訂單</button></form>2.封裝實體
訂單實體
package henu.soft.xiaosi.order.entity;import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName;import java.math.BigDecimal; import java.io.Serializable; import java.util.Date; import lombok.Data;/*** 訂單* * @author xiaosi* @email 2589165806@qq.com* @date 2021-07-22 23:34:48*/ @Data @TableName("oms_order") public class OrderEntity implements Serializable {private static final long serialVersionUID = 1L;/*** id*/@TableIdprivate Long id;/*** member_id*/private Long memberId;/*** 訂單號*/private String orderSn;/*** 使用的優惠券*/private Long couponId;/*** create_time*/private Date createTime;/*** 用戶名*/private String memberUsername;/*** 訂單總額*/private BigDecimal totalAmount;/*** 應付總額*/private BigDecimal payAmount;/*** 運費金額*/private BigDecimal freightAmount;/*** 促銷優化金額(促銷價、滿減、階梯價)*/private BigDecimal promotionAmount;/*** 積分抵扣金額*/private BigDecimal integrationAmount;/*** 優惠券抵扣金額*/private BigDecimal couponAmount;/*** 后臺調整訂單使用的折扣金額*/private BigDecimal discountAmount;/*** 支付方式【1->支付寶;2->微信;3->銀聯; 4->貨到付款;】*/private Integer payType;/*** 訂單來源[0->PC訂單;1->app訂單]*/private Integer sourceType;/*** 訂單狀態【0->待付款;1->待發貨;2->已發貨;3->已完成;4->已關閉;5->無效訂單】*/private Integer status;/*** 物流公司(配送方式)*/private String deliveryCompany;/*** 物流單號*/private String deliverySn;/*** 自動確認時間(天)*/private Integer autoConfirmDay;/*** 可以獲得的積分*/private Integer integration;/*** 可以獲得的成長值*/private Integer growth;/*** 發票類型[0->不開發票;1->電子發票;2->紙質發票]*/private Integer billType;/*** 發票抬頭*/private String billHeader;/*** 發票內容*/private String billContent;/*** 收票人電話*/private String billReceiverPhone;/*** 收票人郵箱*/private String billReceiverEmail;/*** 收貨人姓名*/private String receiverName;/*** 收貨人電話*/private String receiverPhone;/*** 收貨人郵編*/private String receiverPostCode;/*** 省份/直轄市*/private String receiverProvince;/*** 城市*/private String receiverCity;/*** 區*/private String receiverRegion;/*** 詳細地址*/private String receiverDetailAddress;/*** 訂單備注*/private String note;/*** 確認收貨狀態[0->未確認;1->已確認]*/private Integer confirmStatus;/*** 刪除狀態【0->未刪除;1->已刪除】*/private Integer deleteStatus;/*** 下單時使用的積分*/private Integer useIntegration;/*** 支付時間*/private Date paymentTime;/*** 發貨時間*/private Date deliveryTime;/*** 確認收貨時間*/private Date receiveTime;/*** 評價時間*/private Date commentTime;/*** 修改時間*/private Date modifyTime;}封裝的訂單vo
package henu.soft.xiaosi.order.vo;import lombok.Data;import java.math.BigDecimal;@Data public class OrderSubmitVo {/** 收獲地址的id **/private Long addrId;/** 支付方式 **/private Integer payType;//無需提交要購買的商品,去購物車再獲取一遍//優惠、發票/** 防重令牌 **/private String orderToken;/** 應付價格 **/private BigDecimal payPrice;/** 訂單備注 **/private String remarks;//用戶相關的信息,直接去session中取出即可 }3.對應controller
/*** 確認訂單* @param submitVo* @param model* @param attributes* @return*/@RequestMapping("/submitOrder")public String submitOrder(OrderSubmitVo submitVo, Model model, RedirectAttributes attributes) {try {SubmitOrderResponseVo responseVo = orderService.submitOrder(submitVo);Integer code = responseVo.getCode();if (code == 0) {model.addAttribute("order", responseVo.getOrder());return "pay";} else {String msg = "下單失敗;";switch (code) {case 1:msg += "防重令牌校驗失敗";break;case 2:msg += "商品價格發生變化";break;}attributes.addFlashAttribute("msg", msg);return "redirect:http://order.gulishop.cn/toTrade";}} catch (Exception e) {if (e instanceof NoStockException) {String msg = "下單失敗,商品無庫存";attributes.addFlashAttribute("msg", msg);}return "redirect:http://order.gulishop.cn/toTrade";}}4.對應service
步驟
- //1. 驗證令牌,前端傳遞的令牌和redis存儲的令牌對比
- //2. 創建訂單、訂單項
- //3. 驗價
- //4. 保存訂單
- //5. 鎖定庫存(RabbitMQ延時隊列)
數據庫表信息
驗價
鎖庫存
封裝vo
package henu.soft.xiaosi.order.vo;import henu.soft.common.to.OrderItemTo; import lombok.Data;import java.util.List;@Data public class WareSkuLockVo {private String OrderSn;private List<OrderItemTo> locks; } //5. 鎖定庫存List<OrderItemTo> orderItemTos = order.getOrderItems().stream().map((item) -> {OrderItemTo orderItemTo = new OrderItemTo();orderItemTo.setSkuId(item.getSkuId());orderItemTo.setCount(item.getSkuQuantity());return orderItemTo;}).collect(Collectors.toList());WareSkuLockVo lockVo = new WareSkuLockVo();lockVo.setOrderSn(order.getOrder().getOrderSn());lockVo.setLocks(orderItemTos);R r = wareFeignService.orderLockStock(lockVo);//5.1 鎖定庫存成功if (r.getCode() == 0) { // int i = 10 / 0;responseVo.setOrder(order.getOrder());responseVo.setCode(0);//發送消息到訂單延遲隊列,判斷過期訂單rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", order.getOrder());//清除購物車記錄BoundHashOperations<String, Object, Object> ops = redisTemplate.boundHashOps(CartConstant.CART_PREFIX + memberResponseTo.getId());for (OrderItemEntity orderItem : order.getOrderItems()) {ops.delete(orderItem.getSkuId().toString());}return responseVo;} else {//5.1 鎖定庫存失敗String msg = (String) r.get("msg");throw new NoStockException(msg);}2.各種鎖機制
1、數據庫悲觀鎖
-
select * from xxxx where id = 1 for update;
-
悲觀鎖使用時一般伴隨事務一起使用,數據鎖定時間可能會很長,需要根據實際情況選用。
另外要注意的是,id 字段一定是主鍵或者唯一索引,不然可能造成鎖表的結果,處理起來會
非常麻煩。
2、數據庫樂觀鎖
-
update t_goods set count = count -1 , version = version + 1 where good_id=2 and version = 1
-
這種方法適合在更新的場景中,根據 version 版本,也就是在操作庫存前先獲取當前商品的 version 版本號,然后操作的時候帶上此 version 號。我們梳理下,我們第一次操作庫存時,得到 version 為 1,調用庫存服務version 變成了 2;但返回給訂單服務出現了問題,訂單服務又一次發起調用庫存服務,當訂單服務傳如的 version 還是 1,再執行上面的 sql 語句時,就不會執行;因為 version 已經變為 2 了,where 條件就不成立。這樣就保證了不管調用幾次,只會真正的處理一次。
-
樂觀鎖主要使用于處理讀多寫少的問題
3、業務層分布式鎖
- 如果多個機器可能在同一時間同時處理相同的數據,比如多臺機器定時任務都拿到了相同數
據處理,我們就可以加分布式鎖,鎖定此數據,處理完成后釋放鎖。獲取到鎖的必須先判斷
這個數據是否被處理過。
3.各種唯一約束
1、數據庫唯一約束
- 插入數據,應該按照唯一索引進行插入,比如訂單號,相同的訂單就不可能有兩條記錄插入。
我們在數據庫層面防止重復。 - 這個機制是利用了數據庫的主鍵唯一約束的特性,解決了在 insert 場景時冪等問題。但主鍵
的要求不是自增的主鍵,這樣就需要業務生成全局唯一的主鍵。 - 如果是分庫分表場景下,路由規則要保證相同請求下,落地在同一個數據庫和同一表中,要
不然數據庫主鍵約束就不起效果了,因為是不同的數據庫和表主鍵不相關。
2、redis set 防重
- 很多數據需要處理,只能被處理一次,比如我們可以計算數據的 MD5 將其放入 redis 的 set,
每次處理數據,先看這個 MD5 是否已經存在,存在就不處理。
4.防重表
-
使用訂單號 orderNo 做為去重表的唯一索引,把唯一索引插入去重表,再進行業務操作,且
他們在同一個事務中。這個保證了重復請求時,因為去重表有唯一約束,導致請求失敗,避
免了冪等問題。這里要注意的是,去重表和業務表應該在同一庫中,這樣就保證了在同一個
事務,即使業務操作失敗了,也會把去重表的數據回滾。這個很好的保證了數據一致性。 -
之前說的 redis 防重也算
5.全局請求唯一id
- 調用接口時,生成一個唯一 id,redis 將數據保存到集合中(去重),存在即處理過。
- 可以使用 nginx 設置每一個請求的唯一 id;
proxy_set_header X-Request-Id $request_id
二、分布式事務(提交訂單、鎖庫存)
前面提交訂單調用遠程的倉庫服務鎖庫存,加上的事務是 本地事務
- 提交訂單加上,出現異常回滾
- 訂單保存成功,但是鎖庫存失敗,還是回滾
但是由于不確定因素
- 訂單保存成功了,庫存也鎖成功了,但是 由于網絡原因 并未正常完成邏輯,導致訂單保存回滾,但是鎖庫存沒有回滾
- 訂單提交保存訂單之后,還可能調用多個其他的遠程服務,遠程父事務 并不能 很好的管理 子事務
因此本地事務只能控制本地方法的調用,對于遠程調用,因此要求統一的分布式事務管理
1.本地事務
1、事務的基本性質
數據庫事務的幾個特性:原子性(Atomicity )、一致性( Consistency )、隔離性或獨立性( Isolation)
和持久性(Durabilily),簡稱就是 ACID;
- 原子性:一系列的操作整體不可拆分,要么同時成功,要么同時失敗
- 一致性:數據在事務的前后,業務整體一致。 轉賬。A:1000;B:1000; 轉 200 事務成功; A:800 B:1200
- 隔離性:事務之間互相隔離。
- 持久性:一旦事務成功,數據一定會落盤在數據庫
2、事務的隔離級別isolation
- READ UNCOMMITTED(讀未提交)
該隔離級別的事務會讀到其它未提交事務的數據,此現象也稱之為臟讀。 - READ COMMITTED(讀提交)
一個事務可以讀取另一個已提交的事務,多次讀取會造成不一樣的結果,此現象稱為不可重
復讀問題,Oracle 和 SQL Server 的默認隔離級別。 - REPEATABLE READ(可重復讀)
該隔離級別是 MySQL 默認的隔離級別,在同一個事務里,select 的結果是事務開始時時間
點的狀態,因此,同樣的 select 操作讀到的結果會是一致的,但是,會有幻讀現象。MySQL
的 InnoDB 引擎可以通過 next-key locks 機制(參考下文"行鎖的算法"一節)來避免幻讀。 - SERIALIZABLE(序列化)
在該隔離級別下事務都是串行順序執行的,MySQL 數據庫的 InnoDB 引擎會給讀操作隱式
加一把讀共享鎖,從而避免了臟讀、不可重讀復讀和幻讀問題。
3、事務的傳播行為propagation
就加入該事務,該設置是最常用的設置。
前不存在事務,就以非事務執行。
當前不存在事務,就拋出異常。
前事務掛起。
則執行與 PROPAGATION_REQUIRED 類似的操作。
4、SpringBoot 事務關鍵點
- 在同一個類里面,編寫兩個方法,內部調用的時候,會導致事務設置失效。原因是沒有用到
代理對象的緣故。同一個service方法子事務的調用繞過了代理對象,導致直接是方法調用 - 解決:
0)、導入 spring-boot-starter-aop
1)、@EnableTransactionManagement(proxyTargetClass = true)
2)、@EnableAspectJAutoProxy(exposeProxy=true)
3)、AopContext.currentProxy() 調用方
2.分布式事務
1、CAP 定理
CAP 原則又稱 CAP 定理,指的是在一個分布式系統中
- 一致性(Consistency):
在分布式系統中的所有數據備份,在同一時刻是否同樣的值。(等同于所有節點訪
問同一份最新的數據副本) - 可用性(Availability)
在集群中一部分節點故障后,集群整體是否還能響應客戶端的讀寫請求。(對數據
更新具備高可用性) - 分區容錯性(Partition tolerance)
大多數分布式系統都分布在多個子網絡。每個子網絡就叫做一個區(partition)。
分區容錯的意思是,區間通信可能失敗。比如,一臺服務器放在中國,另一臺服務
器放在美國,這就是兩個區,它們之間可能無法通信。
CAP 原則指的是,這三個要素最多只能同時實現兩點,不可能三者兼顧。
-
一般來說,分區容錯無法避免,因此可以認為 CAP 的 P 總是成立。CAP 定理告訴我們,剩下的 C 和 A 無法同時做到。
-
分布式系統中實現一致性的 raft 算法,類似redis的主從復制、哨兵模式
paxos:http://thesecretlivesofdata.com/raft/ -
對于多數大型互聯網應用的場景,主機眾多、部署分散,而且現在的集群規模越來越大,所
以節點故障、網絡故障是常態,而且要保證服務可用性達到 99.99999%(N 個 9),即保證
P 和 A,舍棄 C,即不能保證強一致,但是可以彌補強一致
2.BASE理論
是對 CAP 理論的延伸,思想是即使無法做到強一致性(CAP 的一致性就是強一致性),但
- 以采用適當的采取弱一致性,即最終一致性。
- 即不能保證強一致,但是可以彌補強一致
BASE 是指
- 基本可用(Basically Available)
(1)基本可用是指分布式系統在出現故障的時候,允許損失部分可用性(例如響應時間、
功能上的可用性),允許損失部分可用性。需要注意的是,基本可用絕不等價于系
統不可用。
(2)響應時間上的損失:正常情況下搜索引擎需要在 0.5 秒之內返回給用戶相應的
查詢結果,但由于出現故障(比如系統部分機房發生斷電或斷網故障),查詢
結果的響應時間增加到了 1~2 秒。
(3)功能上的損失:購物網站在購物高峰(如雙十一)時,為了保護系統的穩定性,
部分消費者可能會被引導到一個降級頁面。 - 軟狀態( Soft State)(處于失敗、成功的中間狀態)
軟狀態是指允許系統存在中間狀態,而該中間狀態不會影響系統整體可用性。分布
式存儲中一般一份數據會有多個副本,允許不同副本同步的延時就是軟狀態的體
現。mysql replication 的異步復制也是一種體現。 - 最終一致性( Eventual Consistency)
最終一致性是指系統中的所有數據副本經過一定時間后,最終能夠達到一致的狀
態。弱一致性和強一致性相反,最終一致性是弱一致性的一種特殊情況
3. 強一致性、弱一致性、最終一致性
從客戶端角度,多進程并發訪問時,更新過的數據在不同進程如何獲取的不同策略,決定了
不同的一致性。
-
對于關系型數據庫,要求更新過的數據能被后續的訪問都能看到,這是強一致性。
-
如果能容忍后續的部分或者全部訪問不到,則是弱一致性。(容忍軟件態,容忍彌補一致性的時間操作)
-
如果經過一段時間后要求能訪問到更新后的數據,則是最終一致性
三、分布式事務解決方案
1. 2PC 模式
數據庫支持的 2PC【2 phase commit 二階提交】,又叫做 XA Transactions。MySQL 從 5.5 版本開始支持,SQL Server 2005 開始支持,Oracle 7 開始支持。其中,XA 是一個兩階段提交協議,該協議分為以下兩個階段:
- 第一階段:事務協調器要求每個涉及到事務的數據庫預提交(precommit)此操作,并反映是
否可以提交. - 第二階段:事務協調器要求每個數據庫提交數據。
其中,如果有任何一個數據庫否決此次提交,那么所有數據庫都會被要求回滾它們在此事務
中的那部分信息。
- XA 協議比較簡單,而且一旦商業數據庫實現了 XA 協議,使用分布式事務的成本也比較
低。 - XA 性能不理想,特別是在交易下單鏈路,往往并發量很高,XA 無法滿足高并發場景
- XA 目前在商業數據庫支持的比較理想,在 mysql 數據庫中支持的不太理想,mysql 的XA 實現,沒有記錄 prepare 階段日志,主備切換回導致主庫與備庫數據不一致。
- 許多 nosql 也沒有支持 XA,這讓 XA 的應用場景變得非常狹隘。
- 也有 3PC,引入了超時機制(無論協調者還是參與者,在向對方發送請求后,若長時間
未收到回應則做出相應處理)
2. 柔性事務-TCC 事務補償型方案(遵循BASE原則)
- 剛性事務:遵循 ACID 原則,強一致性。
- 柔性事務:遵循 BASE 理論,最終一致性;與剛性事務不同,柔性事務允許一定時間內,不同節點的數據不一致,但要求最終一致。
- 一階段 prepare 行為:調用 自定義 的 prepare 邏輯。該邏輯為父事務、各個子事務調用自己的try接口方法
- 二階段 commit 行為:調用 自定義 的 commit 邏輯。該邏輯為父事務、各個子事務調用自己的confirm接口方法
- 二階段 rollback 行為:調用 自定義 的 rollback 邏輯。該邏輯為父事務、各個子事務調用自己的cancle接口方法
所謂 TCC 模式,是指支持把 自定義 的分支事務納入到全局事務的管理中(抽取一層)
3.柔性事務-最大努力通知型方案
按規律進行通知,不保證數據一定能通知成功,但會提供可查詢操作接口進行核對。
- 這種方案主要用在與第三方系統通訊時,比如:調用微信或支付寶支付后的支付結果通知。這種方案也是結合 MQ 進行實現,例如:通過 MQ 發送 http 請求,設置最大通知次數。達到通知次數后即不再通知。
- 案例:銀行通知、商戶通知等(各大交易業務平臺間的商戶通知:多次通知、查詢校對、對賬文件),支付寶的支付成功異步回調
- 案例:不斷提醒訂閱MQ的服務父事務執行失敗,直到作出回應(如訂單下失敗解鎖庫存)
4.柔性事務-可靠消息+最終一致性方案(異步確保型)
- 實現:業務處理服務在業務事務提交之前,向實時消息服務請求發送消息,實時消息服務只
記錄消息數據,而不是真正的發送。業務處理服務在業務事務提交之后,向實時消息服務確
認發送。只有在得到確認發送指令后,實時消息服務才會真正發送
四、整合Seata
1.概述
- 官網:http://seata.io/zh-cn/docs/overview/what-is-seata.html
- Seata 是一款開源的分布式事務解決方案,致力于提供高性能和簡單易用的分布式事務服務。Seata 將為用戶提供了 AT、TCC、SAGA 和 XA 事務模式,為用戶打造一站式的分布式解決方案。
- TC:協調全局
- TM:控制整個大的事務
- RM:各個微服務獨立的資源管理器,每一個微服務都需要一個回滾日志表,即使不能回滾也要補償修改的內容
2.建立Seata日志表
每個微服務創建 UNDO_LOG 表,SEATA AT 模式需要 UNDO_LOG 表
-- 注意此處0.3.0+ 增加唯一索引 ux_undo_log CREATE TABLE `undo_log` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`branch_id` bigint(20) NOT NULL,`xid` varchar(100) NOT NULL,`context` varchar(128) NOT NULL,`rollback_info` longblob NOT NULL,`log_status` int(11) NOT NULL,`log_created` datetime NOT NULL,`log_modified` datetime NOT NULL,`ext` varchar(100) DEFAULT NULL,PRIMARY KEY (`id`),UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;3.安裝事務協調器TC
- 下載地址:https://github.com/seata/seata/releases/tag/v1.2.0
- 解壓配置
配置文件file.conf
啟動nacos和seata,如果報錯參考:報錯2_Seata啟動報錯:Initialization of output ‘file=xxxlogs/seata_gc.log‘ using options ‘(null)‘ failed.
4.整合
- 導入依賴:https://github.com/seata/seata
- 配置DataSourceProxy代理各個微服務的數據源
- 每個微服務的resource需要放入registry.conf、file.conf,并配置分組
- 父事務加注解@GlobalTransactional,子事務不用,測試
5.說明
對于普通業務如后臺管理的業務,分布式事務解決方案可以使用 Seata的 AT 分布式事務管理,也就是上面的操作
但是對于高并發下的場景,這種是不適合的,需要使用 柔性事務-可靠消息 + 最終一致性方案
- 父事務失敗,發送失敗消息給子事務,子事務回滾
- 延時隊列實現定時任務(定時任務太耗費資源),掃描數據庫表保存的鎖庫存記錄,根據訂單狀態判斷,然后將失敗的鎖庫存自動解鎖
五、延時隊列實現定時任務
1.場景分析
定時任務的缺點
- 耗費系統內存、數據庫資源
- 時效性不能保證,30分鐘訂單未支付被關閉可能需要多輪才能被掃描出來
父事務下訂單,子事務1鎖庫存,子事務…
-
不需要分布式事務的場景
- 訂單失敗,未進行到鎖庫存,只需要父事務自動回滾即可。
- 訂單成功,庫存鎖定成功,其他子事務成功。無需回滾
-
需要分布式事務的場景
- 訂單成功,鎖庫存業務也成功,其他遠程子事務失敗,導致訂單回滾,需要自動解鎖庫存。
- 訂單成功,用戶未支付、手動取消,需要自動解鎖庫存
使用RabbitMQ的延時隊列
- 訂單提交之后,先被放到消息隊列,到達指定時間30分鐘后發送給邏輯業務進行數據庫訂單保存
- 訂單提交之后,庫存鎖定成功信息先被放到消息隊列,達到指定時間40分鐘后檢查訂單,訂單不存在的話自動解鎖庫存
- 其實延時隊列就是保證 訂單狀態更新后(已支付、手動取消),判斷庫存鎖定是否邏輯正確,不正確就更正
2.概念
- 消息的TTL(Time To Live)消息的存活時間
- 可以對隊列、消息設置TTL,前者沒有該隊列消費者時消息保留最大時間,后者是該消息沒有消費者是保留最大時間,超過這個時間成為死信
- 如果隊列和消息都設置了,取二者最小的
- 通過消息的expiration字段或者 x-message-ttl屬性來設置時間
死信會進入死信路由(DLX對應多個隊列的路由的Dead Letter Exchage是在普通的路由加上消息轉發機制)
- 被消費者reject拒收的消息,并且參數為requeue為false,即該消息不會被重新放入隊列
- 設置TTL到期的消息
- 隊列長度限制滿了。排在前面的消息被丟棄或者扔到死路由的
使用MQ
- 1、Queue、Exchange、Binding可以@Bean進去
- 2、監聽消息的方法可以有三種參數(不分數量,順序)
- Object content, Message message, Channel channel
- 3、channel可以用來拒絕消息,否則自動ack;
可以控制消息在一段時間變成死信,也可一控制死信轉到對應的交換機,結合二者,實現延時隊列
建議使用隊列過期時間
升級版
-
不需要分布式事務的場景
- 訂單失敗,未進行到鎖庫存,只需要父事務自動回滾即可。
- 訂單成功,庫存鎖定成功,其他子事務成功。無需回滾。
-
需要分布式事務的場景
- 訂單成功,鎖庫存業務也成功,發送庫存鎖定信息到延時隊列,其他遠程子事務失敗,導致訂單回滾,需要自動解鎖庫存。
- 訂單成功,鎖庫存業務也成功,發送庫存鎖定信息到延時隊列,用戶未支付、手動取消,需要自動解鎖庫存
3.代碼實現
1.訂單服務
訂單服務
- 訂單微服務交換機order-event-exchange
- 延時隊列order.delay.queue
- 死信消費隊列order.release.order.queue
- 兩個隊列的綁定
2.庫存服務
庫存服務
- 一個交換機stock-event-exchange
- 普通隊列 stock.release.stock.queue,需要service監聽,有消息證明要回滾
- 延時隊列 stock.delay.stock.queue
- 兩個綁定
保存訂單詳情日志發給mq
// 待鎖的商品、數量、倉庫id信息for (SkuLockVo lockVo : lockVos) {boolean lock = true;Long skuId = lockVo.getSkuId();List<Long> wareIds = lockVo.getWareIds();//如果沒有滿足條件的倉庫,拋出異常if (wareIds == null || wareIds.size() == 0) {throw new NoStockException(skuId);} else {for (Long wareId : wareIds) {// 一個個倉庫的鎖定// 成功返回1Long count = baseMapper.lockWareSku(skuId, lockVo.getNum(), wareId);if (count == 0) {lock = false;} else {//1. 鎖定成功,保存訂單詳情WareOrderTaskDetailEntity detailEntity = WareOrderTaskDetailEntity.builder().skuId(skuId).skuName("").skuNum(lockVo.getNum()).taskId(taskEntity.getId()).wareId(wareId).lockStatus(1).build();wareOrderTaskDetailService.save(detailEntity);//2. 發送庫存鎖定消息至延遲隊列StockLockedTo lockedTo = new StockLockedTo();lockedTo.setId(taskEntity.getId());StockDetailTo detailTo = new StockDetailTo();try {BeanUtils.copyProperties(detailEntity, detailTo);} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}lockedTo.setDetailTo(detailTo);rabbitTemplate.convertAndSend("stock-event-exchange", "stock.locked", lockedTo);lock = true;break;}}}if (!lock) throw new NoStockException(skuId);}package henu.soft.common.to.mq;import lombok.Data;@Data public class StockDetailTo {private Long id;/*** sku_id*/private Long skuId;/*** sku_name*/private String skuName;/*** 購買個數*/private Integer skuNum;/*** 工作單id*/private Long taskId;/*** 倉庫id*/private Long wareId;/*** 鎖定狀態*/private Integer lockStatus; }3.庫存服務監聽死信隊列
package henu.soft.xiaosi.ware.listener;import com.rabbitmq.client.Channel;import henu.soft.common.to.mq.OrderTo; import henu.soft.common.to.mq.StockLockedTo; import henu.soft.xiaosi.ware.service.WareSkuService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;import java.io.IOException;@Slf4j @Component @RabbitListener(queues = {"stock.release.stock.queue"}) public class StockReleaseListener {@Autowiredprivate WareSkuService wareSkuService;// 庫存回滾@RabbitHandlerpublic void handleStockLockedRelease(StockLockedTo stockLockedTo, Message message, Channel channel) throws IOException {log.info("************************收到庫存解鎖的消息********************************");try {wareSkuService.unlock(stockLockedTo);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 重新放到消息隊列,重試機制channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}之前即使訂單失敗,父事務回滾了,但是分布式事務鎖庫存并未回滾,但是保存了
- 成功的庫存鎖定信息到消息隊列(只有庫存鎖定成功才會發消息到延時隊列)
- 數據庫中間表中也保存了訂單、鎖庫存關系(便于回滾)
現在需要庫存服務監聽 用于 實現回滾,即從死信隊列獲取訂單詳情日志信息
- 再次查詢訂單表,無該訂單號,證明父事務已經回滾,說明必須回滾鎖定的庫存,調用unlock()方法再次查詢中間表,回滾庫存
- 再次查詢訂單表,有該訂單號,說明沒有被取消,變成了支付的訂單(因為訂單死信隊列30分鐘會被訂單模塊監聽,若是待付款則訂單號直接被關閉了,此時有訂單號,證明一定是支付過了),則無需回滾
4.訂單服務監聽死信隊列關單
分析
- 下訂單業務成功之后,會被放到延時隊列,30分鐘后進入死信隊列,訂單模塊監聽死信隊列,此時需要先判斷訂單狀態
- 訂單狀態為代付款狀態:需要關閉訂單
- 訂單狀態為已付款、已發貨、已完成、已取消、售后中、售后完成等狀態:不需要關閉訂單
訂單狀態枚舉類
package henu.soft.xiaosi.order.enume;public enum OrderStatusEnume {CREATE_NEW(0,"待付款"),PAYED(1,"已付款"),SENDED(2,"已發貨"),RECIEVED(3,"已完成"),CANCLED(4,"已取消"),SERVICING(5,"售后中"),SERVICED(6,"售后完成");private String msg;private Integer code;public String getMsg() {return msg;}public Integer getCode() {return code;}OrderStatusEnume(Integer code, String msg){this.msg = msg;this.code = code;} }判斷關閉訂單
/*** 收到過期的訂單信息,準備關閉訂單* @param orderEntity*//*** 關閉過期的的訂單* @param orderEntity*/@Overridepublic void closeOrder(OrderEntity orderEntity) throws InvocationTargetException, IllegalAccessException {//因為消息發送過來的訂單已經是很久前的了,中間可能被改動,因此要查詢最新的訂單OrderEntity newOrderEntity = this.getById(orderEntity.getId());//如果訂單還處于新創建的狀態,說明超時未支付,進行關單if (newOrderEntity.getStatus() == OrderStatusEnume.CREATE_NEW.getCode()) {OrderEntity updateOrder = new OrderEntity();updateOrder.setId(newOrderEntity.getId());updateOrder.setStatus(OrderStatusEnume.CANCLED.getCode());this.updateById(updateOrder);//關單后發送消息通知其他服務進行關單相關的操作,如解鎖庫存OrderTo orderTo = new OrderTo();BeanUtils.copyProperties(newOrderEntity,orderTo);rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other",orderTo);}}現在出現的問題是
- 理論上庫存死信隊列的時長比訂單死信隊列的時長要長,即 庫存是否解鎖在 訂單狀態關閉前判斷
- 現在可能出現網絡延遲的原因,導致 庫存服務監聽的死信隊列 先 獲得數據,這時候判斷訂單狀態還存在,就沒解鎖庫存,過了一段時間才收到訂單關閉的信息,這時候庫存就會一直鎖定
解決辦法
- 在訂單模塊在設置一個路由鍵order.release.order.other.# 該路由鍵直接 轉發到 庫存的死信隊列stock.release.stock.queue,庫存服務監聽用于解鎖
- 訂單釋放和庫存釋放直接綁定,也就是當有訂單關閉的時候通知庫存服務,再次判斷是否進行庫存解鎖
訂單服務
庫存服務
5.訂單提交保存
4.如何保證消息的可靠性
1.消息丟失
- 消息發送出去,由于網絡問題沒有抵達服務器
? 做好容錯方法(try-catch),發送消息可能會網絡失敗,失敗后要有重試機制,可記錄到數據庫,采用定期掃描重發的方式
? 做好日志記錄,每個消息狀態是否都被服務器收到都應該記錄
? 做好定期重發,如果消息沒有發送成功,定期去數據庫掃描未成功的消息進行重發 - 消息抵達Broker,Broker要將消息寫入queue、磁盤(持久化)才算成功。此時Broker尚未持久化完成,宕機。 publisher也必須加入確認回調機制,確認成功的消息,修改數據庫消息狀態。
- 自動ACK的狀態下。消費者收到消息,但沒來得及消息然后宕機
? 一定開啟手動ACK,消費成功才移除,失敗或者沒來得及處理就noAck并重新入隊
2.消息重復
- 消息消費成功,事務已經提交,ack時,機器宕機。導致沒有ack成功,Broker的消息重新由unack變為ready,并發送給其他消費者
? 消息消費失敗,由于重試機制,自動又將消息發送出去
? 成功消費,ack時宕機,消息由unack變為ready,Broker又重新發送
? 消費者的業務消費接口應該設計為冪等性的。比如扣庫存有工作單的狀態標志 - 使用防重表(redis/mysql),發送消息每一個都有業務的唯一標識,處理過就不用處理
- rabbitMQ的每一個消息都有redelivered字段,可以獲取是否是被重新投遞過來的,而不是第一次投遞過來的
3.消息積壓
- 消費者宕機積壓
- 消費者消費能力不足積壓
- 發送者發送流量太大
? 上線更多的消費者,進行正常消費
? 上線專門的隊列消費服務,將消息先批量取出來,記錄數據庫,離線慢慢處理
總結
以上是生活随笔為你收集整理的谷粒商城项目篇13_分布式高级篇_订单业务模块(提交订单幂等性、分布式事务、延时MQ实现定时任务)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 江苏大学885程序设计历年代码题题型整理
- 下一篇: 20考研北理885软工经验贴——初试篇