使用rabbitMQ实现数据同步
思路分析
發送方:商品微服務
-
什么時候發?
當商品服務對商品進行寫操作:增、刪、改的時候,需要發送一條消息,通知其它服務。
-
發送什么內容?
對商品的增刪改時其它服務可能需要新的商品數據,但是如果消息內容中包含全部商品信息,數據量太大,而且并不是每個服務都需要全部的信息。因此我們只發送商品id,其它服務可以根據id查詢自己需要的信息。
接收方:搜索微服務、靜態頁微服務
接收消息后如何處理?
-
搜索微服務:
-
增/改:添加新的數據到索引庫
-
刪:刪除索引庫數據
-
-
靜態頁微服務:
-
增/改:創建新的靜態頁
-
刪:刪除原來的靜態頁
-
商品服務發送消息
我們先在商品微服務learn-item-service中實現發送消息。
引入依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>配置文件
我們在application.yml中添加一些有關RabbitMQ的配置:
spring:rabbitmq:host: 192.168.56.101username: learnpassword: learnvirtual-host: /learntemplate:exchange: learn.item.exchangepublisher-confirms: true-
template:有關AmqpTemplate的配置
-
exchange:缺省的交換機名稱,此處配置后,發送消息如果不指定交換機就會使用這個
-
-
publisher-confirms:生產者確認機制,確保消息會正確發送,如果發送失敗會有錯誤回執,從而觸發重試
改造GoodsService
在GoodsService中封裝一個發送消息到mq的方法:(需要注入AmqpTemplate模板)
private void sendMessage(Long id, String type){// 發送消息try {this.amqpTemplate.convertAndSend("item." + type, id);} catch (Exception e) {logger.error("{}商品消息發送異常,商品id:{}", type, id, e);} }這里沒有指定交換機,因此默認發送到了配置中的:leyou.item.exchange
注意:這里要把所有異常都try起來,不能讓消息的發送影響到正常的業務邏輯
?
然后在新增的時候調用:
修改的時候調用:
搜索服務接收消息
搜索服務接收到消息后要做的事情:
-
增:添加新的數據到索引庫
-
刪:刪除索引庫數據
-
改:修改索引庫數據
因為索引庫的新增和修改方法是合二為一的,因此我們可以將這兩類消息一同處理,刪除另外處理。
引入依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>添加配置
spring:rabbitmq:host: 192.168.56.101username: learnpassword: learnvirtual-host: /learn這里只是接收消息而不發送,所以不用配置template相關內容。
編寫監聽器
代碼:
@Component public class GoodsListener {@Autowiredprivate SearchService searchService;/*** 處理insert和update的消息** @param id* @throws Exception*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "learn.create.index.queue", durable = "true"),exchange = @Exchange(value = "learn.item.exchange",ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),key = {"item.insert", "item.update"}))public void listenCreate(Long id) throws Exception {if (id == null) {return;}// 創建或更新索引this.searchService.createIndex(id);}/*** 處理delete的消息** @param id*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "learn.delete.index.queue", durable = "true"),exchange = @Exchange(value = "learn.item.exchange",ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),key = "item.delete"))public void listenDelete(Long id) {if (id == null) {return;}// 刪除索引this.searchService.deleteIndex(id);} }編寫創建和刪除索引方法
這里因為要創建和刪除索引,我們需要在SearchService中拓展兩個方法,創建和刪除索引:
public void createIndex(Long id) throws IOException {Spu spu = this.goodsClient.querySpuById(id);// 構建商品Goods goods = this.buildGoods(spu);// 保存數據到索引庫this.goodsRepository.save(goods); }public void deleteIndex(Long id) {this.goodsRepository.deleteById(id); }創建索引的方法可以從之前導入數據的測試類中拷貝和改造。
?
靜態頁服務接收消息
商品靜態頁服務接收到消息后的處理:
-
增:創建新的靜態頁
-
刪:刪除原來的靜態頁
-
改:創建新的靜態頁并覆蓋原來的
不過,我們編寫的創建靜態頁的方法也具備覆蓋以前頁面的功能,因此:增和改的消息可以放在一個方法中處理,刪除消息放在另一個方法處理。
引入依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>添加配置
spring:rabbitmq:host: 192.168.56.101username: learnpassword: learnvirtual-host: /learn這里只是接收消息而不發送,所以不用配置template相關內容。
?
編寫監聽器
代碼:
@Component public class GoodsListener {@Autowiredprivate GoodsHtmlService goodsHtmlService;@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "learn.create.web.queue", durable = "true"),exchange = @Exchange(value = "learn.item.exchange",ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),key = {"item.insert", "item.update"}))public void listenCreate(Long id) throws Exception {if (id == null) {return;}// 創建頁面goodsHtmlService.createHtml(id);}@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "learn.delete.web.queue", durable = "true"),exchange = @Exchange(value = "learn.item.exchange",ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),key = "item.delete"))public void listenDelete(Long id) {if (id == null) {return;}// 刪除頁面goodsHtmlService.deleteHtml(id);} }添加刪除頁面方法
public void deleteHtml(Long id) {File file = new File("C:\\project\\nginx-1.14.0\\html\\item\\", id + ".html");file.deleteOnExit(); }測試
3.5.1.查看RabbitMQ控制臺
重新啟動項目,并且登錄RabbitMQ管理界面:http://192.168.56.101:15672
可以看到,交換機已經創建出來了:
隊列也已經創建完畢:
并且隊列都已經綁定到交換機:
總結
以上是生活随笔為你收集整理的使用rabbitMQ实现数据同步的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: springAMQP
- 下一篇: 数据是否可用校验