當(dāng)前位置:
首頁(yè) >
前端技术
> javascript
>内容正文
javascript
(需求实战_终章) SpringBoot2.x 整合RabbitMQ
生活随笔
收集整理的這篇文章主要介紹了
(需求实战_终章) SpringBoot2.x 整合RabbitMQ
小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
文章目錄
- 1. maven依賴(lài)
- 2. MainConfig
- 3. application.properties
- 4. 發(fā)送字符串 生產(chǎn)者
- 5. 發(fā)送對(duì)象 生產(chǎn)者
- 6. 接收字符串客戶(hù)端
- 7. 接收對(duì)象客戶(hù)端
- 8.confirem 確認(rèn)機(jī)制
- 9. return確認(rèn)機(jī)制
- 10. MQ消息發(fā)送工具類(lèi)封裝
- 11. 分布式id
- 12. 時(shí)間工具類(lèi)
- 13. 對(duì)象
1. maven依賴(lài)
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>2. MainConfig
package com.gblfy.springboot.config;import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration;@Configuration @ComponentScan({"com.gblfy.springboot.*"}) public class MainConfig { }3. application.properties
#應(yīng)用端口信息 server.port=80 #RabbitMQ 連接信息 #IP地址 spring.rabbitmq.addresses=127.0.0.1 #RabbitMQ 端口 spring.rabbitmq.port=5672 #用戶(hù)名 spring.rabbitmq.username=admin #密碼 spring.rabbitmq.password=admin #虛擬主機(jī) spring.rabbitmq.virtual-host=/admin #連接超時(shí)時(shí)間 spring.rabbitmq.connection-timeout=15000spring.profiles.active=devapplication-dev.properties
#服務(wù)端 RabbitMQ 配置 #消息發(fā)送至交換機(jī)消息確認(rèn)模式 是否確認(rèn)回調(diào) spring.rabbitmq.publisher-confirms=true #消息發(fā)送至交換機(jī)消息確認(rèn)模式 是否確認(rèn)消息返回回調(diào) spring.rabbitmq.publisher-returns=true #消息手工簽收 spring.rabbitmq.template.mandatory=true#消費(fèi)端 RabbitMQ 配置 #手動(dòng)簽收 spring.rabbitmq.listener.simple.acknowledge-mode=manual #指定最小的消費(fèi)者數(shù)量 spring.rabbitmq.listener.simple.concurrency=5 #指定最大的消費(fèi)者數(shù)量 spring.rabbitmq.listener.simple.max-concurrency=10 #接收字符串類(lèi)型MQ消息spring.rabbitmq.listener.str.queue.name=queue-1 spring.rabbitmq.listener.str.queue.durable=true spring.rabbitmq.listener.str.exchange.name=exchange-1 spring.rabbitmq.listener.str.exchange.durable=true spring.rabbitmq.listener.str.exchange.type=topic spring.rabbitmq.listener.str.exchange.ignoreDeclarationExceptions=true spring.rabbitmq.listener.str.key=cus-str.##接收object類(lèi)型MQ消息 spring.rabbitmq.listener.order.queue.name=queue-2 spring.rabbitmq.listener.order.queue.durable=true spring.rabbitmq.listener.order.exchange.name=exchange-2 spring.rabbitmq.listener.order.exchange.durable=true spring.rabbitmq.listener.order.exchange.type=topic spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true spring.rabbitmq.listener.order.key=cus-obj.#YML
#----------------------------服務(wù)端(公有)配置---------------------------- spring:rabbitmq:addresses: 192.168.0.XXX #RabbitMQ服務(wù)端地址username: admin #用戶(hù)名password: admin #密碼port: 5672 #端口virtual-host: /admin #虛擬主機(jī)connection-timeout: 15000 #超時(shí)時(shí)間 #----------------------------生產(chǎn)端端配置----------------------------publisher-confirm-type: correlated #確認(rèn)消息已發(fā)送至交換機(jī),選擇交換類(lèi)型為交互publisher-returns: true #在消息沒(méi)有被路由到指定的queue時(shí)將消息返回,而不是丟棄template:mandatory: true #是否手動(dòng)簽收l(shuí)istener:simple:acknowledge-mode: manual #手動(dòng)簽收concurrency: 5 #默認(rèn)線程數(shù)max-concurrency: 10 #最大線程數(shù) #----------------------------消費(fèi)端配置---------------------------- #----------------------------對(duì)象類(lèi)型監(jiān)聽(tīng)----------------------------order:exchange:durable: true #是否持久化ignoreDeclarationExceptions: truename: exchange-2 #交換機(jī)名稱(chēng)type: topic #消息類(lèi)型key: cmiip-obj.# #消息路由key的路由規(guī)則queue:durable: true #是否持久化name: queue-2 #隊(duì)列名稱(chēng) #----------------------------字符串類(lèi)型監(jiān)聽(tīng)----------------------------str:exchange:durable: true #是否持久化ignoreDeclarationExceptions: truename: exchange-1 #交換機(jī)名稱(chēng)type: topic #消息類(lèi)型key: cmiip-str.# #消息路由key的路由規(guī)則queue:durable: true #是否持久化name: queue-1 #隊(duì)列名稱(chēng)4. 發(fā)送字符串 生產(chǎn)者
package com.gblfy.springboot.controller;import com.gblfy.springboot.utils.MQSendMsgUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;@RestController public class MQSendStrMsgController {@Autowiredprivate MQSendMsgUtils mqSendMsgUtils;/*** 發(fā)送字符串類(lèi)型消息** @param exchangeName 交換機(jī)名稱(chēng)* @param queueRouterKey 路由key* @param msg 報(bào)文* @return*/@GetMapping("/mQSendStrMsg")public String mQSendStrMsg(@RequestParam(value = "exchangeName") String exchangeName,@RequestParam(value = "queueRouterKey") String queueRouterKey,@RequestParam(value = "msg") String msg) {mqSendMsgUtils.snedStrMQMsg(exchangeName, queueRouterKey, msg);return "發(fā)送字符串消息成功!";}//測(cè)試連接:http://localhost/mQSendStrMsg?exchangeName=exchange-1&queueRouterKey=cus&msg=測(cè)試2 }5. 發(fā)送對(duì)象 生產(chǎn)者
package com.gblfy.springboot.controller;import com.gblfy.springboot.entity.Order; import com.gblfy.springboot.utils.MQSendMsgUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;@RestController public class MQSendObjMsgController {@Autowiredprivate MQSendMsgUtils mqSendMsgUtils;@GetMapping("/mQSendObjMsg")public String mQSendStrMsg2(@RequestParam(value = "exchangeName") String exchangeName,@RequestParam(value = "queueRouterKey") String queueRouterKey) {//模擬發(fā)送order對(duì)象Order order = new Order().builder().reqXml("我是請(qǐng)求報(bào)文").serviceName("接口名稱(chēng)").resXml("我是響應(yīng)報(bào)文").build();//模擬接口描述String serviceName = "TJHL";String queueDesc = "紐約理賠發(fā)送退單接口";//模擬接口類(lèi)型String queueType = "WEBSERVICE";//調(diào)用MQ工具類(lèi)發(fā)送消息mqSendMsgUtils.snedObjMqMsg(exchangeName, order, queueRouterKey, serviceName, queueDesc, queueType);return "發(fā)送對(duì)象消息成功!";}//測(cè)試連接:http://localhost/mQSendObjMsg?exchangeName=exchange-2&queueRouterKey=cus }6. 接收字符串客戶(hù)端
package com.gblfy.springboot.conusmer;import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.stereotype.Component;@Component public class CusStrQueueMsgHandler implements ChannelAwareMessageListener {//打印日志 實(shí)時(shí)定位private final static Logger log = LoggerFactory.getLogger(CusObjQueueMsgHandler.class);/*** 接收字符串類(lèi)型MQ消息** @param message* @param channel* @throws Exception*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "${spring.rabbitmq.listener.str.queue.name}",durable = "${spring.rabbitmq.listener.str.queue.durable}"),exchange = @Exchange(value = "${spring.rabbitmq.listener.str.exchange.name}",durable = "${spring.rabbitmq.listener.str.exchange.durable}",type = "${spring.rabbitmq.listener.str.exchange.type}",ignoreDeclarationExceptions = "${spring.rabbitmq.listener.str.exchange.ignoreDeclarationExceptions}"),key = "${spring.rabbitmq.listener.str.key}"))@RabbitHandler@Overridepublic void onMessage(Message message, Channel channel) throws Exception {//TODO 接收消息成功 創(chuàng)建一個(gè)消費(fèi)端軌跡表來(lái)存儲(chǔ)消息的軌跡數(shù)據(jù)String jsonMsg = new String(message.getBody());log.info("響應(yīng)報(bào)文 mResXml: {}", jsonMsg);// 同一時(shí)刻服務(wù)器只會(huì)發(fā)一條消息給消費(fèi)者channel.basicQos(1);// 反饋消息的消費(fèi)狀態(tài)channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//TODO 保存數(shù)據(jù)到數(shù)據(jù)庫(kù)} }7. 接收對(duì)象客戶(hù)端
package com.gblfy.springboot.conusmer;import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.gblfy.springboot.entity.Order; import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.stereotype.Component;import java.util.Map;@Component public class CusObjQueueMsgHandler implements ChannelAwareMessageListener {//打印日志 實(shí)時(shí)定位private final static Logger log = LoggerFactory.getLogger(CusObjQueueMsgHandler.class);/*** 接收對(duì)象類(lèi)型MQ消息** @param message* @param channel* @throws Exception*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",durable = "${spring.rabbitmq.listener.order.queue.durable}"),exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",durable = "${spring.rabbitmq.listener.order.exchange.durable}",type = "${spring.rabbitmq.listener.order.exchange.type}",ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),key = "${spring.rabbitmq.listener.order.key}"))@RabbitHandler@Overridepublic void onMessage(Message message, Channel channel) throws Exception {//TODO 接收消息成功 創(chuàng)建一個(gè)消費(fèi)端軌跡表來(lái)存儲(chǔ)消息的軌跡數(shù)據(jù)String jsonMsg = new String(message.getBody());// 同一時(shí)刻服務(wù)器只會(huì)發(fā)一條消息給消費(fèi)者channel.basicQos(1);// 反饋消息的消費(fèi)狀態(tài)channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//通過(guò) 判斷路由routingKey是否等于trace相同即可//fastjson解析MQ接收的json字符串 轉(zhuǎn)換成RequestInfo對(duì)象JSONObject jsonObject = JSON.parseObject(jsonMsg);Order orderInfo = JSON.toJavaObject(jsonObject, Order.class);log.info("接口名稱(chēng) serviceName: {}", orderInfo.getServiceName());log.info("請(qǐng)求報(bào)文 mReqXml: {}", orderInfo.getReqXml());log.info("響應(yīng)報(bào)文 mResXml: {}", orderInfo.getResXml());MessageProperties messageProperties = message.getMessageProperties();log.info("交換機(jī)名稱(chēng) : {}", messageProperties.getReceivedExchange());log.info("路由key名稱(chēng) : {}", messageProperties.getReceivedRoutingKey());log.info("內(nèi)容類(lèi)型 : {}", messageProperties.getContentType());log.info("內(nèi)容編碼 : {}", messageProperties.getContentEncoding());log.info("標(biāo)簽 : {}", messageProperties.getDeliveryTag());// 2. 接收接口信息Map<String, Object> headers = message.getMessageProperties().getHeaders();log.info("隊(duì)列唯一標(biāo)識(shí)ID: {}", headers.get("QUEUE_MSG_ID"));log.info("隊(duì)列名稱(chēng): {}", headers.get("QUEUE_NAME"));log.info("隊(duì)列類(lèi)型: {}", headers.get("QUEUE_TYPE"));log.info("隊(duì)列描述: {}", headers.get("QUEUE_DESC"));//TODO 保存數(shù)據(jù)到數(shù)據(jù)庫(kù)} }8.confirem 確認(rèn)機(jī)制
package com.gblfy.springboot.confirms;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.stereotype.Component;@Component("confirmCallback") public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback {//日志輸出private final static Logger log = LoggerFactory.getLogger(ConfirmCallBackListener.class);/*** 生產(chǎn)者消息發(fā)送成功與失敗確認(rèn)機(jī)制* <p>* 1. ack* true : 標(biāo)志生產(chǎn)者將消息發(fā)出成功* false: 標(biāo)志生產(chǎn)者將消息發(fā)出失敗* 2. ack :true 意味著消息發(fā)送成功 有2種場(chǎng)景* 第一種:生產(chǎn)者將消息成功發(fā)送到指定隊(duì)列中,等待消費(fèi)者消費(fèi)消息* 第兩種:生產(chǎn)者將消息發(fā)送成功,但是,由于無(wú)法路由到指定的消息* 隊(duì)列,這種場(chǎng)景的消息,會(huì)被return機(jī)制監(jiān)聽(tīng)到,后續(xù)進(jìn)行* 補(bǔ)償機(jī)制,做消息補(bǔ)發(fā)處理* </p>** @param correlationData 隊(duì)列消息的唯一標(biāo)識(shí)ID,消息做補(bǔ)償機(jī)制會(huì)用到* @param ack ack 消息是否發(fā)送成功的標(biāo)識(shí)* @param cause 消息發(fā)送失敗的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info("消息隊(duì)列標(biāo)識(shí)ID: {}", correlationData.getId());log.info("發(fā)送消息狀態(tài): {}", ack);//TODO 消息發(fā)送交換機(jī)成功 保存軌跡記錄if (!ack) {//TODO 消息發(fā)送交換機(jī)失敗 保存軌跡記錄log.info("異常處理....");}} } /*** !ack 場(chǎng)景結(jié)果示例:* <p>* correlationData: CorrelationData [id=a37285dc-5dd6-4e22-8cc4-5c0fbf67b568]* ack: false* 異常處理....* 消息: CorrelationData [id=a37285dc-5dd6-4e22-8cc4-5c0fbf67b568],* nack,失敗原因是:channel error;* protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'FIS-TRACE-COMMON-EXCHANGE' in vhost '/admin',* class-id=60, method-id=40)*/9. return確認(rèn)機(jī)制
package com.gblfy.springboot.returns;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component;@Component("returnCallback") public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback {//打印日志 實(shí)時(shí)定位private final static Logger log = LoggerFactory.getLogger(ReturnCallBackListener.class);/*** 消息無(wú)法路由 觸發(fā)消息 return機(jī)制* <p></p>* 1. 消費(fèi)者在消息沒(méi)有被路由到合適隊(duì)列情況下會(huì)被return監(jiān)聽(tīng),而不會(huì)自動(dòng)刪除* 2. 會(huì)監(jiān)聽(tīng)到生產(chǎn)者發(fā)送消息的關(guān)鍵信息* 3. 根據(jù)關(guān)鍵信息,后續(xù)進(jìn)行補(bǔ)償機(jī)制,做消息補(bǔ)發(fā)處理* </p>** @param message 消息實(shí)體* @param replyCode 應(yīng)答碼312* @param replyText NO_ROUTE* @param exchange 交換機(jī)* @param routingKey 路由routingKey*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("DeliveryTag: {}", message.getMessageProperties().getDeliveryTag());log.info("ContentType: {}", message.getMessageProperties().getContentType());log.info("ContentEncoding: {}", message.getMessageProperties().getContentEncoding());log.info("消息發(fā)送的指定交換機(jī): {}", exchange);log.info("隊(duì)列路由的routingKey: {}", routingKey);log.info("隊(duì)列的響應(yīng)碼replyCode: {}", replyCode);log.info("隊(duì)列的響應(yīng)信息: {}", replyText);//TODO 消息發(fā)送交換機(jī)成功 路由失敗 保存軌跡記錄} } /*** 場(chǎng)景結(jié)果示例:* return exchange: FIS-TRACE-COMMON-EXCHANGE, routingKey: fis-str.user, replyCode: 312, replyText: NO_ROUTE* correlationData: CorrelationData [id=30d924db-77b4-41df-bbe6-9a8f0eb3fe7a]* ack: true* 消息: CorrelationData [id=30d924db-77b4-41df-bbe6-9a8f0eb3fe7a],已經(jīng)被ack成功*/10. MQ消息發(fā)送工具類(lèi)封裝
package com.gblfy.springboot.utils;import com.fasterxml.jackson.databind.ObjectMapper; import com.gblfy.springboot.confirms.ConfirmCallBackListener; import com.gblfy.springboot.consts.MQPrefixConst; import com.gblfy.springboot.entity.Order; import com.gblfy.springboot.returns.ReturnCallBackListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;/*** MQ發(fā)送 不同類(lèi)型消息 公用工具類(lèi)* <p>* MQ發(fā)送消息模式采用 訂閱模式(topic)中的通配符模式* order.* 區(qū)配一個(gè)詞* order.# 區(qū)配一個(gè)或者多個(gè)詞* <p>** @author gblfy* @date 2020-04-16*/ @Component public class MQSendMsgUtils {private final static Logger log = LoggerFactory.getLogger(MQSendMsgUtils.class);//引入json工具類(lèi)private static final ObjectMapper MAPPER = new ObjectMapper();@Autowiredprivate RabbitTemplate rabbitTemplate;//注入發(fā)送消息模板@Autowiredprivate ConfirmCallBackListener confirmCallback;@Autowiredprivate ReturnCallBackListener returnCallback;/*** 發(fā)送MQ STRING類(lèi)型消息 第1種** @param exchangeName 指定交換機(jī)名稱(chēng)* @param type 路由routingKey* @param msg MQ STRING類(lèi)型消息*/public void snedStrMQMsg(String exchangeName, String type, String msg) {try {/*** CorrelationData 說(shuō)明:* 1. correlationId 作為生產(chǎn)端和消息綁定消息隊(duì)列全局唯一標(biāo)識(shí)* 2. 當(dāng)生產(chǎn)端發(fā)送的消息無(wú)法路由到指定的消息隊(duì)列時(shí),此種場(chǎng)* 景的消息會(huì)被生產(chǎn)端會(huì)return確認(rèn)機(jī)制監(jiān)聽(tīng)到,對(duì)消息做補(bǔ)* 償機(jī)制處理*///通過(guò)雪花算法生成全局唯一ID,用于消息發(fā)送失敗,后期做消息補(bǔ)償處理CorrelationData correlationId = new CorrelationData(String.valueOf(SnowflakeIdWorker.generateId()));// Confirm 消息確認(rèn)策略rabbitTemplate.setConfirmCallback(confirmCallback);// Return 消息確認(rèn)策略rabbitTemplate.setReturnCallback(returnCallback);//發(fā)送消息到MQ的交換機(jī),通知其他系統(tǒng)rabbitTemplate.convertAndSend(exchangeName, MQPrefixConst.CUS_MQ_STR_PRE + type, msg.getBytes(), correlationId);} catch (Exception e) {e.printStackTrace();}}public void snedObjMqMsg(String exchangeName, Order order, String queueRouteKey, String queueName, String queueDesc, String queueType) {try {/*** CorrelationData 說(shuō)明:* 1. correlationId 作為生產(chǎn)端和消息綁定消息隊(duì)列全局唯一標(biāo)識(shí)* 2. 當(dāng)生產(chǎn)端發(fā)送的消息無(wú)法路由到指定的消息隊(duì)列時(shí),此種場(chǎng)* 景的消息會(huì)被生產(chǎn)端會(huì)return確認(rèn)機(jī)制監(jiān)聽(tīng)到,對(duì)消息做補(bǔ)* 償機(jī)制處理*/// Confirm 消息確認(rèn)策略rabbitTemplate.setConfirmCallback(confirmCallback);// Return 消息確認(rèn)策略rabbitTemplate.setReturnCallback(returnCallback);//1.對(duì)象處理String jsonStrObj = MAPPER.writeValueAsString(order);// 2. 通過(guò)雪花算法生成全局唯一ID,用于消息發(fā)送失敗,后期做消息補(bǔ)償處理CorrelationData correlationId = new CorrelationData(String.valueOf(SnowflakeIdWorker.generateId()));// MQ 添加額外參數(shù)設(shè)置 用于定位該消息屬于什么接口Message message = addExtraParameters(jsonStrObj, correlationId, queueName, queueDesc, queueType);// 3.發(fā)送數(shù)據(jù)消息到指定的MQ交換機(jī),通知其他系統(tǒng)rabbitTemplate.convertAndSend(exchangeName,MQPrefixConst.CUS_MQ_OBJ_PRE + queueRouteKey, message, correlationId);} catch (Exception e) {e.printStackTrace();}}/*** MQ 添加額外參數(shù)設(shè)置** @param jsonStrObj json處理前的數(shù)據(jù)對(duì)象* @param queueDesc 隊(duì)列描述* @param queueType 隊(duì)列類(lèi)型* @return*/public Message addExtraParameters(String jsonStrObj, CorrelationData correlationId, String queueName, String queueDesc, String queueType) {MessageProperties messageProperties = new MessageProperties();//這里注意一定要修改contentType為 application/jsonmessageProperties.setContentType("application/json");messageProperties.setContentEncoding("UTF-8");messageProperties.getHeaders().put("QUEUE_NAME", queueName);messageProperties.getHeaders().put("QUEUE_DESC", queueDesc);messageProperties.getHeaders().put("QUEUE_TYPE", queueType);messageProperties.getHeaders().put("QUEUE_MSG_ID", correlationId.getId());messageProperties.getHeaders().put("SEND_DATE", MQTimeUtils.CURRENT_DATE_TIME);Message message = new Message(jsonStrObj.getBytes(), messageProperties);return message;} }11. 分布式id
package com.gblfy.springboot.utils;import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.SystemUtils;import java.net.Inet4Address; import java.net.UnknownHostException;/*** Twitter_Snowflake<br>* SnowFlake的結(jié)構(gòu)如下(每部分用-分開(kāi)):<br>* 0 - 0000000000 0000000000 0000000000 0000000000 0 - 00000 - 00000 - 000000000000 <br>* 1位標(biāo)識(shí),由于long基本類(lèi)型在Java中是帶符號(hào)的,最高位是符號(hào)位,正數(shù)是0,負(fù)數(shù)是1,所以id一般是正數(shù),最高位是0<br>* 41位時(shí)間截(毫秒級(jí)),注意,41位時(shí)間截不是存儲(chǔ)當(dāng)前時(shí)間的時(shí)間截,而是存儲(chǔ)時(shí)間截的差值(當(dāng)前時(shí)間截 - 開(kāi)始時(shí)間截)* 得到的值),這里的的開(kāi)始時(shí)間截,一般是我們的id生成器開(kāi)始使用的時(shí)間,由我們程序來(lái)指定的(如下下面程序IdWorker類(lèi)的startTime屬性)。41位的時(shí)間截,可以使用69年,年T = (1L << 41) / (1000L * 60 * 60 * 24 * 365) = 69<br>* 10位的數(shù)據(jù)機(jī)器位,可以部署在1024個(gè)節(jié)點(diǎn),包括5位datacenterId和5位workerId<br>* 12位序列,毫秒內(nèi)的計(jì)數(shù),12位的計(jì)數(shù)順序號(hào)支持每個(gè)節(jié)點(diǎn)每毫秒(同一機(jī)器,同一時(shí)間截)產(chǎn)生4096個(gè)ID序號(hào)<br>* 加起來(lái)剛好64位,為一個(gè)Long型。<br>* SnowFlake的優(yōu)點(diǎn)是,整體上按照時(shí)間自增排序,并且整個(gè)分布式系統(tǒng)內(nèi)不會(huì)產(chǎn)生ID碰撞(由數(shù)據(jù)中心ID和機(jī)器ID作區(qū)分),并且效率較高,經(jīng)測(cè)試,SnowFlake每秒能夠產(chǎn)生26萬(wàn)ID左右。*/ public class SnowflakeIdWorker {// ==============================Fields===========================================/*** 開(kāi)始時(shí)間截 (2015-01-01)*/private final long twepoch = 1489111610226L;/*** 機(jī)器id所占的位數(shù)*/private final long workerIdBits = 5L;/*** 數(shù)據(jù)標(biāo)識(shí)id所占的位數(shù)*/private final long dataCenterIdBits = 5L;/*** 支持的最大機(jī)器id,結(jié)果是31 (這個(gè)移位算法可以很快的計(jì)算出幾位二進(jìn)制數(shù)所能表示的最大十進(jìn)制數(shù))*/private final long maxWorkerId = -1L ^ (-1L << workerIdBits);/*** 支持的最大數(shù)據(jù)標(biāo)識(shí)id,結(jié)果是31*/private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits);/*** 序列在id中占的位數(shù)*/private final long sequenceBits = 12L;/*** 機(jī)器ID向左移12位*/private final long workerIdShift = sequenceBits;/*** 數(shù)據(jù)標(biāo)識(shí)id向左移17位(12+5)*/private final long dataCenterIdShift = sequenceBits + workerIdBits;/*** 時(shí)間截向左移22位(5+5+12)*/private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;/*** 生成序列的掩碼,這里為4095 (0b111111111111=0xfff=4095)*/private final long sequenceMask = -1L ^ (-1L << sequenceBits);/*** 工作機(jī)器ID(0~31)*/private long workerId;/*** 數(shù)據(jù)中心ID(0~31)*/private long dataCenterId;/*** 毫秒內(nèi)序列(0~4095)*/private long sequence = 0L;/*** 上次生成ID的時(shí)間截*/private long lastTimestamp = -1L;private static SnowflakeIdWorker idWorker;static {idWorker = new SnowflakeIdWorker(getWorkId(), getDataCenterId());}//==============================Constructors=====================================/*** 構(gòu)造函數(shù)** @param workerId 工作ID (0~31)* @param dataCenterId 數(shù)據(jù)中心ID (0~31)*/public SnowflakeIdWorker(long workerId, long dataCenterId) {if (workerId > maxWorkerId || workerId < 0) {throw new IllegalArgumentException(String.format("workerId can't be greater than %d or less than 0", maxWorkerId));}if (dataCenterId > maxDataCenterId || dataCenterId < 0) {throw new IllegalArgumentException(String.format("dataCenterId can't be greater than %d or less than 0", maxDataCenterId));}this.workerId = workerId;this.dataCenterId = dataCenterId;}// ==============================Methods==========================================/*** 獲得下一個(gè)ID (該方法是線程安全的)** @return SnowflakeId*/public synchronized long nextId() {long timestamp = timeGen();//如果當(dāng)前時(shí)間小于上一次ID生成的時(shí)間戳,說(shuō)明系統(tǒng)時(shí)鐘回退過(guò)這個(gè)時(shí)候應(yīng)當(dāng)拋出異常if (timestamp < lastTimestamp) {throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));}//如果是同一時(shí)間生成的,則進(jìn)行毫秒內(nèi)序列if (lastTimestamp == timestamp) {sequence = (sequence + 1) & sequenceMask;//毫秒內(nèi)序列溢出if (sequence == 0) {//阻塞到下一個(gè)毫秒,獲得新的時(shí)間戳timestamp = tilNextMillis(lastTimestamp);}}//時(shí)間戳改變,毫秒內(nèi)序列重置else {sequence = 0L;}//上次生成ID的時(shí)間截lastTimestamp = timestamp;//移位并通過(guò)或運(yùn)算拼到一起組成64位的IDreturn ((timestamp - twepoch) << timestampLeftShift)| (dataCenterId << dataCenterIdShift)| (workerId << workerIdShift)| sequence;}/*** 阻塞到下一個(gè)毫秒,直到獲得新的時(shí)間戳** @param lastTimestamp 上次生成ID的時(shí)間截* @return 當(dāng)前時(shí)間戳*/protected long tilNextMillis(long lastTimestamp) {long timestamp = timeGen();while (timestamp <= lastTimestamp) {timestamp = timeGen();}return timestamp;}/*** 返回以毫秒為單位的當(dāng)前時(shí)間** @return 當(dāng)前時(shí)間(毫秒)*/protected long timeGen() {return System.currentTimeMillis();}private static Long getWorkId() {try {String hostAddress = Inet4Address.getLocalHost().getHostAddress();int[] ints = StringUtils.toCodePoints(hostAddress);int sums = 0;for (int b : ints) {sums += b;}return (long) (sums % 32);} catch (UnknownHostException e) {// 如果獲取失敗,則使用隨機(jī)數(shù)備用return RandomUtils.nextLong(0, 31);}}private static Long getDataCenterId() {int[] ints = StringUtils.toCodePoints(SystemUtils.getHostName());int sums = 0;for (int i : ints) {sums += i;}return (long) (sums % 32);}/*** 靜態(tài)工具類(lèi)** @return*/public static Long generateId() {long id = idWorker.nextId();return id;}//==============================Test=============================================/*** 測(cè)試*/public static void main(String[] args) {System.out.println(System.currentTimeMillis());long startTime = System.nanoTime();for (int i = 0; i < 50000; i++) {long id = SnowflakeIdWorker.generateId();System.out.println(id);}System.out.println((System.nanoTime() - startTime) / 1000000 + "ms");} }12. 時(shí)間工具類(lèi)
package com.gblfy.springboot.utils;import org.springframework.stereotype.Component;import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date;@Component public class MQTimeUtils {//格式化時(shí)間 日期格式public static final DateFormat DATE_TIME_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");public static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd");// 日期格式public static final DateFormat TIME_FORMAT = new SimpleDateFormat("HH:mm:ss");// 日期格式//當(dāng)前日期+時(shí)間 用于定位 消息隊(duì)列服務(wù)端和和生產(chǎn)發(fā)送消息時(shí)間 確認(rèn)什么類(lèi)型的什么接口public static final String CURRENT_DATE_TIME = DATE_TIME_FORMAT.format(new Date());/*** 獲取當(dāng)前日期 類(lèi)型Date*/public static Date getCurrentDate() {Date currentDate = null;try {currentDate = DATE_FORMAT.parse(DATE_FORMAT.format(new Date()));} catch (ParseException e) {e.printStackTrace();}return currentDate;}/*** 獲取當(dāng)前日期 類(lèi)型String*/public static String getCurrentDateToStr() {String currentDateToStr = null;try {currentDateToStr = DATE_FORMAT.format(new Date());} catch (Exception e) {e.printStackTrace();}return currentDateToStr;}/*** 獲取當(dāng)前時(shí)間 類(lèi)型String*/public static String getCurrenTimeToStr() {String currentTimeToStr = null;try {currentTimeToStr = TIME_FORMAT.format(new Date());} catch (Exception e) {e.printStackTrace();}return currentTimeToStr;}public static void main(String[] args) {System.out.println(MQTimeUtils.getCurrentDate());System.out.println(MQTimeUtils.getCurrentDateToStr());} }13. 對(duì)象
package com.gblfy.springboot.entity;import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor;import java.io.Serializable;@Data @AllArgsConstructor @NoArgsConstructor @Builder public class Order implements Serializable {private String serviceName;private String reqXml;private String resXml; }總結(jié)
以上是生活随笔為你收集整理的(需求实战_终章) SpringBoot2.x 整合RabbitMQ的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: linux下删除目录及其子目录下某种类型
- 下一篇: PMP考试必看的答题技巧分享