javascript
手把手教你搭建SpringCloud项目(十六)集成Stream消息驱动
Spring Cloud全集文章目錄:
零、什么是微服務(wù)?一看就會(huì)系列!
一、手把手教你搭建SpringCloud項(xiàng)目(一)圖文詳解,傻瓜式操作
二、手把手教你搭建SpringCloud項(xiàng)目(二)生產(chǎn)者與消費(fèi)者
三、手把手教你搭建SpringCloud項(xiàng)目(三)集成Eureka服務(wù)注冊(cè)中心
四、手把手教你搭建SpringCloud項(xiàng)目(四)Eureka集群版搭建
五、手把手教你搭建SpringCloud項(xiàng)目(五)生產(chǎn)者集群版搭建
六、手把手教你搭建SpringCloud項(xiàng)目(六)Eureka實(shí)現(xiàn)服務(wù)發(fā)現(xiàn)
七、手把手教你搭建SpringCloud項(xiàng)目(七)集成Consul服務(wù)注冊(cè)中心
八、手把手教你搭建SpringCloud項(xiàng)目(八)集成Ribbon負(fù)載均衡器
九、手把手教你搭建SpringCloud項(xiàng)目(九)集成OpenFeign服務(wù)接口調(diào)用
十、手把手教你搭建SpringCloud項(xiàng)目(十)集成Hystrix之服務(wù)降級(jí)
十一、手把手教你搭建SpringCloud項(xiàng)目(十一)集成Hystrix之服務(wù)熔斷
十二、手把手教你搭建SpringCloud項(xiàng)目(十二 )集成Hystrix之圖形化Dashboard實(shí)時(shí)監(jiān)控
十三、手把手教你搭建SpringCloud項(xiàng)目(十三?)集成Gateway新一代網(wǎng)關(guān)
十四、手把手教你搭建SpringCloud項(xiàng)目(十四?)集成Config分布式配置中心
十五、手把手教你搭建SpringCloud項(xiàng)目(十五)集成Bus消息總線
十六、手把手教你搭建SpringCloud項(xiàng)目(十六)集成Stream消息驅(qū)動(dòng)
十七、手把手教你搭建SpringCloud項(xiàng)目(十七)集成Sleuth分布式鏈路跟蹤
文章繼續(xù)更新中,歡迎點(diǎn)贊關(guān)注!
一、消息驅(qū)動(dòng)概述
1. 消息驅(qū)動(dòng)是什么?
在實(shí)際應(yīng)用中有很多消息中間件,比如現(xiàn)在企業(yè)里常用的有ActiveMQ、RabbitMQ、RocketMQ、Kafka等,學(xué)習(xí)所有這些消息中間件無疑需要大量時(shí)間經(jīng)歷成本,那有沒有一種技術(shù),使我們不再需要關(guān)注具體的消息中間件的細(xì)節(jié),而只需要用一種適配綁定的方式,自動(dòng)的在各種消息中間件內(nèi)切換呢?消息驅(qū)動(dòng)就是這樣的技術(shù),它能屏蔽底層消息中間件的差異,降低切換成本,統(tǒng)一消息的編程模型。
SpringCloud Stream是一個(gè)構(gòu)件消息驅(qū)動(dòng)微服務(wù)的框架。應(yīng)用程序通過inputs和outputs來與SpringCloud Stream中的綁定器(binder)對(duì)象交互,通過配置來綁定,而SpringCloud Stream的綁定器對(duì)象負(fù)責(zé)與消息中間件交互,所以,我們只需要搞清楚如何與SpringCloud Stream交互就可以方便使用消息驅(qū)動(dòng)的方式。但是截至到目前時(shí)間,SpringCloud Stream目前僅支持RabbitMQ和Kafka。
2. 設(shè)計(jì)思想
在經(jīng)典的消息隊(duì)列中,生產(chǎn)者/消費(fèi)者之間靠消息媒介傳遞信息內(nèi)容,消息必須走特定的通道Message Channel,消息通道里的子接口Subscribable Channel消費(fèi)消息,然后MessageHandler負(fù)責(zé)收發(fā)處理。
在SpringCloud Stream中,通過定義綁定器(binder)作為中間層,實(shí)現(xiàn)了應(yīng)用程序與消息中間件細(xì)節(jié)之間的隔離。在消息綁定器中,INPUT對(duì)應(yīng)于消費(fèi)者,OUTPUT對(duì)應(yīng)于生產(chǎn)者,Stream中的消息通信方式遵循了發(fā)布—訂閱模式:用Topic(主題)進(jìn)行廣播(RabbitMQ中對(duì)應(yīng)于Exchange交換機(jī),Kafka中就是Topic)。
3. SpringCloud Stream編碼API和常用注解
| Middleware | 中間件,目前只支持RabbitMQ和Kaf |
| Binder | Binder是應(yīng)用與消息中間件之間的封裝,目前實(shí)行了RabbitMQ和Kafka的Binder,通過Binder可以很方便的連接中間件,可以動(dòng)態(tài)的改變消息類型(對(duì)應(yīng)于Kafka的topic,RabbitMQ的exchange),這些都可以通過配置文件來實(shí)現(xiàn) |
| @Input | 注解標(biāo)識(shí)輸入通道,通過該輸入通道接收到的消息進(jìn)入應(yīng)用程序 |
| @Output | 注解標(biāo)識(shí)輸出通道,發(fā)布的消息將通過該通道離開應(yīng)用程序 |
| @StreamListner | 監(jiān)聽隊(duì)列,用于消費(fèi)者的隊(duì)列的消息接收 |
| @EnableBinding | 使信道Channel和交換機(jī)/主題(Exchange/Topic)綁定在一起 |
二. SpringCloud Stream 案例實(shí)操
新建三個(gè)子模塊分別對(duì)應(yīng)于消息的生產(chǎn)者和消費(fèi)者:
| cloud-stream-rabbitmq-provider8801 | 生產(chǎn)者,發(fā)送消息模塊 |
| cloud-stream-rabbitmq-consumer8802 | 消費(fèi)者,接收消息模塊 |
| cloud-stream-rabbitmq-consumer8803 | 消費(fèi)者,接收消息模塊 |
1. 消息驅(qū)動(dòng)之消息生產(chǎn)者
新建Module:cloud-stream-rabbitmq-provider8801作為消息的生產(chǎn)者用來發(fā)送消息,在其POM文件中除引入web、actuator、eureka-client等必要啟動(dòng)器外,還需要引入SpringCloud Stream對(duì)應(yīng)實(shí)現(xiàn)RabbitMQ的啟動(dòng)器依賴:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>編寫其配置文件application.yml:
server:port: 8801spring:application:name: cloud-stream-providercloud:stream:binders: # 在此處配置要綁定的rabbitmq的服務(wù)信息;defaultRabbit: # 表示定義的名稱,用于于binding整合type: rabbit # 消息組件類型environment: # 設(shè)置rabbitmq的相關(guān)的環(huán)境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服務(wù)的整合處理output: # 這個(gè)名字是一個(gè)通道的名稱,OUTPUT表示這是消息的發(fā)送方destination: testExchange # 表示要使用的Exchange名稱定義content-type: application/json # 設(shè)置消息類型,本次為json,文本則設(shè)置“text/plain”binder: defaultRabbit # 設(shè)置要綁定的消息服務(wù)的具體設(shè)置eureka:client: # 客戶端進(jìn)行Eureka注冊(cè)的配置service-url:defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eurekainstance:lease-renewal-interval-in-seconds: 2 # 設(shè)置心跳的時(shí)間間隔(默認(rèn)是30秒)lease-expiration-duration-in-seconds: 5 # 如果現(xiàn)在超過了5秒的間隔(默認(rèn)是90秒)instance-id: send-8801.com # 在信息列表時(shí)顯示主機(jī)名稱prefer-ip-address: true # 訪問的路徑變?yōu)镮P地址編寫其主啟動(dòng)類后,然后編寫業(yè)務(wù)類,在業(yè)務(wù)來中分別要編寫發(fā)送消息接口及其實(shí)現(xiàn)類,并在發(fā)送接口消息的實(shí)現(xiàn)類中添加**@EnableBinding注解用來綁定消息的推送管道,消息生產(chǎn)者綁定的消息推送管道為org.springframework.cloud.stream.messaging.Source**:
package cn.sher6j.sprincloud.service;/*** 發(fā)送消息接口* @author sher6j* @create 2020-05-25-12:20*/ public interface IMessageProvider {public String send(); } ---------------------------------------------------------------------- package cn.sher6j.sprincloud.service.impl;import cn.sher6j.sprincloud.service.IMessageProvider; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.MessageBuilder; import java.util.UUID;/*** 發(fā)送消息接口實(shí)現(xiàn)類* @author sher6j* @create 2020-05-25-12:21*/ @EnableBinding(Source.class) //定義消息的推送管道 public class MessageProviderImpl implements IMessageProvider {@Autowiredprivate MessageChannel output; //消息發(fā)送管道@Overridepublic String send() {String serial = UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(serial).build());//發(fā)送消息System.out.println("========serial:" + serial);return null;} }注意我們?cè)趕ervice的實(shí)現(xiàn)類中不再需要@Service注解,因?yàn)檫@個(gè)service不再是傳統(tǒng)意義上的和Controller、DAO數(shù)據(jù)等進(jìn)行交互的service,而是要綁定綁定器打交道的service。
然后編寫其業(yè)務(wù)層的Controller:
@RestController public class SendMessageController {@Autowiredprivate IMessageProvider messageProvider;@GetMapping("/sendMessage")public String sendMessage() {return messageProvider.send();} }啟動(dòng)服務(wù)注冊(cè)中心后和RabbitMQ后,啟動(dòng)消息生產(chǎn)者微服務(wù),我們?cè)赗abbitMQ的控制面板中可以看見多出了一個(gè)名為testExchange的交換機(jī),這個(gè)交換機(jī)恰恰就是我們之前在配置文件中配置的交換機(jī)名字:
然后我們?cè)L問 http://localhost:8801/sendMessage 使用消息生產(chǎn)者微服務(wù)發(fā)送消息,在其微服務(wù)后臺(tái)我們看到了打印的消息:
在RabbitMQ的控制面板中我們也看到了確實(shí)發(fā)送了消息。
2. 消息驅(qū)動(dòng)之消息消費(fèi)者
新建Module:cloud-stream-rabbitmq-consumer8802/8803作為消息的生產(chǎn)者用來接收消息,其POM文件中引入的啟動(dòng)器依賴和消息生產(chǎn)者微服務(wù)的依賴幾乎相同,然后編寫其配置文件application.yml,其配置文件的書寫和消息生產(chǎn)者的幾乎一致,特別需要注意的是,消息生產(chǎn)者微服務(wù)用到的通道為OUTPUT,而消息消費(fèi)者微服務(wù)用到的通道為INPUT,其他的配置文件信息就只需要注意端口號(hào)、注冊(cè)服務(wù)名的區(qū)別即可:
spring:cloud:bindings: input: # 這個(gè)名字是一個(gè)通道的名稱,INPUT表示消息消費(fèi)者編寫完主啟動(dòng)類,編寫消息消費(fèi)者的業(yè)務(wù)類,由于是消費(fèi)者,所以只需要編寫其Controller即可,在其Controller上同樣需要添加**@EnableBinding注解用來綁定消息的推送管道,消息消費(fèi)者綁定的消息推送管道為import org.springframework.cloud.stream.messaging.Sink,在接收消息的方法中需要使用@StreamListner**注解來監(jiān)聽其綁定的消息推送管道:
package cn.sher6j.springcloud.controller;import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; import org.springframework.stereotype.Component;/*** @author sher6j* @create 2020-05-25-12:58*/ @Component @EnableBinding(Sink.class) public class ReceiveMessageController {@Value("${server.port}")private String serverPort;@StreamListener(Sink.INPUT)public void input(Message<String> message) {System.out.println("消費(fèi)者" + serverPort + "號(hào),收到消息:" + message.getPayload());} }然后啟動(dòng)消息發(fā)送消費(fèi)者服務(wù),用生產(chǎn)者發(fā)送消息,我們可以發(fā)現(xiàn)在消費(fèi)者端可以成功接收到消息:
3. 分組消費(fèi)和持久化
1. 重復(fù)消費(fèi)問題
當(dāng)生產(chǎn)者發(fā)送消息后,此時(shí)的我們的消費(fèi)者都接受了消息并進(jìn)行了消費(fèi),也就是說同一條消息被多個(gè)消息消費(fèi)者所消費(fèi):
上述的問題就是消息的重復(fù)消費(fèi)問題,那么這個(gè)問題為什么如此重要呢?其實(shí)重復(fù)消費(fèi)這個(gè)問題本身不可怕,可怕的是沒考慮到重復(fù)消費(fèi)之后,怎么保證冪等性。(冪等性,通俗的說,就一個(gè)數(shù)據(jù),或者一個(gè)請(qǐng)求,重復(fù)很多次,需要確保對(duì)應(yīng)的數(shù)據(jù)是不會(huì)改變的,不能出錯(cuò))。分布式微服務(wù)應(yīng)用為了實(shí)現(xiàn)高可用和負(fù)載均衡,實(shí)際上同一功能的服務(wù)都會(huì)部署多個(gè)具體的服務(wù)實(shí)例。舉個(gè)例子,假設(shè)有一個(gè)系統(tǒng),有一條消息要求往數(shù)據(jù)庫里插入一條數(shù)據(jù),要是這個(gè)消息重復(fù)消費(fèi)兩次,結(jié)果就是向數(shù)據(jù)庫里插入了兩條數(shù)據(jù),這樣數(shù)據(jù)就錯(cuò)了,就違背了冪等性原則,但是要是該消息消費(fèi)到第二次的時(shí)候,可以判斷一下已經(jīng)消費(fèi)過了,然后直接將該消息丟棄,這就實(shí)現(xiàn)了只插入一條數(shù)據(jù),一條消息重復(fù)出現(xiàn)了兩次,但是只有第一次真正被消費(fèi)了,數(shù)據(jù)庫里也就只插入了一條數(shù)據(jù),這就保證了系統(tǒng)的冪等性。
上面簡(jiǎn)單的介紹了消息的重復(fù)消費(fèi)問題,那如何解決這種重復(fù)消費(fèi)問題呢,那就需要我們進(jìn)行分組和持久化屬性組操作,利用SpringCloud Stream中的消息分組來解決這個(gè)問題,需要注意的是在Stream中處于同一組中的多個(gè)消息消費(fèi)者是競(jìng)爭(zhēng)關(guān)系,也就是保證生產(chǎn)者所發(fā)送的同一個(gè)消息只會(huì)被其中一個(gè)消費(fèi)者消費(fèi)一次。在不同組的消費(fèi)者是是可以對(duì)消息進(jìn)行全面消費(fèi)(重復(fù)消費(fèi))的,只有同一組內(nèi)才會(huì)發(fā)生競(jìng)爭(zhēng)關(guān)系。
在RabbitMQ中,默認(rèn)分組是不同的,組流水號(hào)不一樣,被認(rèn)為不同組,我們查看testExchange交換機(jī),可以發(fā)現(xiàn)8802和8803兩個(gè)消息消費(fèi)者處于不同的組,所以8801消息生產(chǎn)者發(fā)送的消息可以被這兩個(gè)消費(fèi)者重復(fù)消費(fèi):
2. 分組解決重復(fù)消費(fèi)問題
上面在RabbitMQ控制面板中我們看到的組流水號(hào)是系統(tǒng)隨機(jī)分配的,這樣無疑不好控制,所以我們應(yīng)該自定義配置分組,將8802/8803兩個(gè)消息消費(fèi)者微服務(wù)分為同一個(gè)組,以此來解決消息的重復(fù)消費(fèi)問題,先來演示如何自定義分組。
在8802/8803微服務(wù)中的配置文件中分別添加組名屬性:
spring:cloud:stream:bindings:input:group: A/B ## 分組名稱這里我們將8802設(shè)置為A組,8803設(shè)置為B組,然后我們將消息消費(fèi)方的兩個(gè)微服務(wù)重啟,我們?cè)俅尾榭雌浣M流水號(hào),發(fā)現(xiàn)不再是長(zhǎng)長(zhǎng)的隨機(jī)組流水號(hào),而變成了我們自定義的分組:
此時(shí)由于8802/8803位于兩個(gè)不同分組下,所以沒有競(jìng)爭(zhēng)關(guān)系,消息生產(chǎn)者發(fā)送消息后,仍然可以重復(fù)消費(fèi)。
下面我們將這兩個(gè)消息消費(fèi)方微服務(wù)分到相同的消費(fèi)組中,這樣每次就只有一個(gè)消費(fèi)者,消息生產(chǎn)者發(fā)送的消息只能被8802或8803其中一個(gè)接受到,這樣就避免了重復(fù)消費(fèi),將8802和8803的分組名都改為A,再次重啟兩個(gè)消息消費(fèi)方微服務(wù),此時(shí)我們可以看到在分組A下已經(jīng)有了兩個(gè)消費(fèi)者:
?
再用生產(chǎn)者發(fā)送5條消息,我們發(fā)現(xiàn)8802/8803分別消費(fèi)了3條和2條不同的消息,而沒有出現(xiàn)重復(fù)消費(fèi)的問題:
由于時(shí)間原因就轉(zhuǎn)發(fā)的其他小伙伴六甲橫寶的文章,點(diǎn)擊查看原文!
我們Spring Cloud Stream到這里就學(xué)習(xí)完畢了,下一篇文章我們學(xué)習(xí)Spring Cloud Stream分布式鏈路跟蹤,持續(xù)學(xué)習(xí),持續(xù)更新,下一節(jié)更精彩!歡迎朋友們點(diǎn)贊關(guān)注!感謝!
總結(jié)
以上是生活随笔為你收集整理的手把手教你搭建SpringCloud项目(十六)集成Stream消息驱动的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: TCP/IP协议栈之LwIP(四)---
- 下一篇: 光线传感器使用