RabbitMQ(mall学习)
延遲隊列
業務場景說明
用于解決用戶下單以后,訂單超時如何取消訂單的問題。
用戶進行下單操作(會有鎖定商品庫存、使用優惠券、積分一系列的操作);
生成訂單,獲取訂單的id;
獲取到設置的訂單超時時間(假設設置的為60分鐘不支付取消訂單);
按訂單超時時間發送一個延遲消息給RabbitMQ,讓它在訂單超時后觸發取消訂單的操作;
如果用戶沒有支付,進行取消訂單操作(釋放鎖定商品庫存、返還優惠券、返回積分一系列操作)。
在pom.xml中添加相關依賴
<!--消息隊列相關依賴--> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!--lombok依賴--> <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional> </dependency>修改SpringBoot配置文件
spring:rabbitmq:host: localhost # rabbitmq的連接地址port: 5672 # rabbitmq的連接端口號virtual-host: /mall # rabbitmq的虛擬hostusername: mall # rabbitmq的用戶名password: mall # rabbitmq的密碼publisher-confirms: true #如果對異步消息需要回調必須設置為true添加消息隊列的枚舉配置類QueueEnum
用于延遲消息隊列及處理取消訂單消息隊列的常量定義,包括交換機名稱、隊列名稱、路由鍵名稱。
package com.macro.mall.tiny.dto;import lombok.Getter;/*** 消息隊列枚舉配置* Created by macro on 2018/9/14.*/ @Getter public enum QueueEnum {/*** 消息通知隊列*/QUEUE_ORDER_CANCEL("mall.order.direct", "mall.order.cancel", "mall.order.cancel"),/*** 消息通知ttl隊列*/QUEUE_TTL_ORDER_CANCEL("mall.order.direct.ttl", "mall.order.cancel.ttl", "mall.order.cancel.ttl");/*** 交換名稱*/private String exchange;/*** 隊列名稱*/private String name;/*** 路由鍵*/private String routeKey;QueueEnum(String exchange, String name, String routeKey) {this.exchange = exchange;this.name = name;this.routeKey = routeKey;} }添加RabbitMQ的配置
用于配置交換機、隊列及隊列與交換機的綁定關系。
package com.macro.mall.tiny.config;import com.macro.mall.tiny.dto.QueueEnum; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;/*** 消息隊列配置*/ @Configuration public class RabbitMqConfig {/*** 訂單消息實際消費隊列所綁定的交換機* 此處可以選擇交換機的類型 direct、fanout、topic*/@BeanDirectExchange orderDirect() {return (DirectExchange) ExchangeBuilder.directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange()).durable(true).build();}/*** 訂單延遲隊列隊列所綁定的交換機*/@BeanDirectExchange orderTtlDirect() {return (DirectExchange) ExchangeBuilder.directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange()).durable(true).build();}/*** 訂單實際消費隊列*/@Beanpublic Queue orderQueue() {return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName());}/*** 訂單延遲隊列(死信隊列)*/@Beanpublic Queue orderTtlQueue() {return QueueBuilder.durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName()).withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())//到期后轉發的交換機.withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())//到期后轉發的路由鍵.build();}/*** 將訂單隊列綁定到交換機*/@BeanBinding orderBinding(DirectExchange orderDirect,Queue orderQueue){return BindingBuilder.bind(orderQueue).to(orderDirect).with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());}/*** 將訂單延遲隊列綁定到交換機*/@BeanBinding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){return BindingBuilder.bind(orderTtlQueue).to(orderTtlDirect).with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());}}交換機及隊列說明
mall.order.direct(取消訂單消息隊列所綁定的交換機):綁定的隊列為mall.order.cancel,一旦有消息以mall.order.cancel為路由鍵發過來,會發送到此隊列。
mall.order.direct.ttl(訂單延遲消息隊列所綁定的交換機):綁定的隊列為mall.order.cancel.ttl,一旦有消息以mall.order.cancel.ttl為路由鍵發送過來,會轉發到此隊列,并在此隊列保存一定時間,等到超時后會自動將消息發送到mall.order.cancel(取消訂單消息消費隊列)。
添加延遲消息的發送者CancelOrderSender
用于向訂單延遲消息隊列(mall.order.cancel.ttl)里發送消息。
package com.macro.mall.tiny.component;import com.macro.mall.tiny.dto.QueueEnum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;/*** 取消訂單消息的發出者*/ @Component public class CancelOrderSender {private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class);@Autowiredprivate AmqpTemplate amqpTemplate;public void sendMessage(Long orderId,final long delayTimes){//給延遲隊列發送消息amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//給消息設置延遲毫秒值message.getMessageProperties().setExpiration(String.valueOf(delayTimes));return message;}});LOGGER.info("send delay message orderId:{}",orderId);} }添加取消訂單消息的接收者CancelOrderReceiver
用于從取消訂單的消息隊列(mall.order.cancel)里接收消息。
package com.macro.mall.tiny.component;import com.macro.mall.tiny.service.OmsPortalOrderService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;/*** 取消訂單消息的處理者* Created by macro on 2018/9/14.*/ @Component @RabbitListener(queues = "mall.order.cancel") public class CancelOrderReceiver {private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class);@Autowiredprivate OmsPortalOrderService portalOrderService;@RabbitHandlerpublic void handle(Long orderId){LOGGER.info("receive delay message orderId:{}",orderId);portalOrderService.cancelOrder(orderId);} }添加OmsPortalOrderService接口
package com.macro.mall.tiny.service;import com.macro.mall.tiny.common.api.CommonResult; import com.macro.mall.tiny.dto.OrderParam; import org.springframework.transaction.annotation.Transactional;/*** 前臺訂單管理Service* Created by macro on 2018/8/30.*/ public interface OmsPortalOrderService {/*** 根據提交信息生成訂單*/@TransactionalCommonResult generateOrder(OrderParam orderParam);/*** 取消單個超時訂單*/@Transactionalvoid cancelOrder(Long orderId); }添加OmsPortalOrderService的實現類OmsPortalOrderServiceImpl
package com.macro.mall.tiny.service.impl;import com.macro.mall.tiny.common.api.CommonResult; import com.macro.mall.tiny.component.CancelOrderSender; import com.macro.mall.tiny.dto.OrderParam; import com.macro.mall.tiny.service.OmsPortalOrderService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;/*** 前臺訂單管理Service* Created by macro on 2018/8/30.*/ @Service public class OmsPortalOrderServiceImpl implements OmsPortalOrderService {private static Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderServiceImpl.class);@Autowiredprivate CancelOrderSender cancelOrderSender;@Overridepublic CommonResult generateOrder(OrderParam orderParam) {//todo 執行一系類下單操作,具體參考mall項目LOGGER.info("process generateOrder");//下單完成后開啟一個延遲消息,用于當用戶沒有付款時取消訂單(orderId應該在下單后生成)sendDelayMessageCancelOrder(11L);return CommonResult.success(null, "下單成功");}@Overridepublic void cancelOrder(Long orderId) {//todo 執行一系類取消訂單操作,具體參考mall項目LOGGER.info("process cancelOrder orderId:{}",orderId);}private void sendDelayMessageCancelOrder(Long orderId) {//獲取訂單超時時間,假設為60分鐘long delayTimes = 30 * 1000;//發送延遲消息cancelOrderSender.sendMessage(orderId, delayTimes);}}添加OmsPortalOrderController定義接口
package com.macro.mall.tiny.controller;import com.macro.mall.tiny.dto.OrderParam; import com.macro.mall.tiny.service.OmsPortalOrderService; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.ResponseBody;/*** 訂單管理Controller*/ @Controller @Api(tags = "OmsPortalOrderController", description = "訂單管理") @RequestMapping("/order") public class OmsPortalOrderController {@Autowiredprivate OmsPortalOrderService portalOrderService;@ApiOperation("根據購物車信息生成訂單")@RequestMapping(value = "/generateOrder", method = RequestMethod.POST)@ResponseBodypublic Object generateOrder(@RequestBody OrderParam orderParam) {return portalOrderService.generateOrder(orderParam);} }Rabbitmq 插件實現延遲隊列
RabbitMQ 只會檢查第一個消息是否過期,如果過期則丟到死信隊列,如果第一個消息的延時時長很長,而第二個消息的延時時長很短,第二個消息并不會優先得到執行。
在官網上下載 https://www.rabbitmq.com/community-plugins.html,下載
rabbitmq_delayed_message_exchange 插件,然后解壓放置到 RabbitMQ 的插件目錄。
進入 RabbitMQ 的安裝目錄下的 plgins 目錄,執行下面命令讓該插件生效,然后重啟 RabbitMQ
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
添加RabbitMQ的配置
用于配置交換機、隊列及隊列與交換機的綁定關系。
@Configuration public class DelayedQueueConfig {public static final String DELAYED_QUEUE_NAME = "delayed.queue";public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";@Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE_NAME);}//自定義交換機 我們在這里定義的是一個延遲交換機@Beanpublic CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();//自定義交換機的類型args.put("x-delayed-type", "direct");return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false,args);}@Beanpublic Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,@Qualifier("delayedExchange") CustomExchangedelayedExchange) {return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();} }消息生產者代碼
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange"; public static final String DELAYED_ROUTING_KEY = "delayed.routingkey"; @GetMapping("sendDelayMsg/{message}/{delayTime}") public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime) {rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,correlationData ->{correlationData.getMessageProperties().setDelay(delayTime);return correlationData;});log.info(" 當 前 時 間 : {}, 發送一條延遲 {} 毫秒的信息給隊列 delayed.queue:{}", newDate(),delayTime, message);}消息消費者代碼
public static final String DELAYED_QUEUE_NAME = "delayed.queue"; @RabbitListener(queues = DELAYED_QUEUE_NAME) public void receiveDelayedQueue(Message message){String msg = new String(message.getBody());log.info("當前時間:{},收到延時隊列的消息:{}", new Date().toString(), msg);}發起請求:
http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000
http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000
總結
以上是生活随笔為你收集整理的RabbitMQ(mall学习)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021十款最佳性价比高的电脑配置?
- 下一篇: Redis(案例一:注册登录-图形验证码