Direct交换器-编写消费者
生活随笔
收集整理的這篇文章主要介紹了
Direct交换器-编写消费者
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
我們已經把環境搭建好了,我們先從Consumer開始寫起,看一下如何編寫Consumer,回到我們的代碼當中,然后我們來看一下我們的代碼,這個代碼是我們copy過來的,QueueConfig我們當時是用來創建一個隊列,那么其實這個隊列其實并不是我們每次啟動都需要去new一個,這個是我們第一個入門案例,給大家講RabbitMQ的時候,創建隊列提到的一個Queue,但是在這個案例當中,你不new,我們RabbitMQ也會幫你去創建隊列,對于這個隊列來講就不需要了,這個QueueConfig去掉,然后我們還有Sender,Sender我們也不要了,因為我們現在是接受方,所以把這兩個去掉,我們留一個Consumer和啟動類,我們再來看一下,然后注意看我們這個圖,他現在應該有兩個處理消息的服務,我們用一個項目來模擬兩個服務,那我這里是不是得有兩個類來去接收不同的消息,所以這塊我還得創建一個Receive,咱們先改個名,這個Receiver叫什么呢,先叫InfoReceiver,然后我們再去拷貝一個叫ErrorReceiver,我們先寫好再去拷貝,這樣我們的代碼修改量就少一些,首先先來看InfoReceiver里的代碼,我們之前在寫這個案例的時候呢,是在方法上加了一個@RabbitListener的注解,然后一個queues的一個名稱,這樣當你的隊列里有消息進來了,這個注解主要是配置隊列的一個監聽,一旦有消息進來了,我們標記有@RabbitListener的方法,就會自動的從調用這個方法給傳遞過來,然后這個方法對這個消息做一個操作,這是我們之前寫的案例,但是我們現在寫的這個案例呢,這塊要做一個改造,改造的原因是什么呢,是這樣的,其實這個@RabbitListener我告訴大家,他不僅僅可以定義在方法上,他還可以定義在class上,定義在class上表示什么含義呢,就是在這個類下,有某個方法,能夠從消息隊列當中去獲取消息,那么至于是哪個方法呢,因為一個類下可能會有很多方法,如果你把它定義到類上了,你在你類下的方法當中,就是能夠去處理消息的方法上,還得加一個注解,這個注解叫什么呢,叫@RabbitHandler,那么你在哪個方法上加上這個注解了,表示未來某個隊列有變化了,那么他就會調用這個方法,來消費消息,來把消息發送過來,我們這個方法處理這個消息就可以了,然后再來看,由于我們現在用的是direct的交換器,所以我們現在對于@RabbitListener這一塊,需要一個更復雜的一個設置了,而不是原來的,因為什么呢,原來我們就直接一個隊列,就一個hello-queue,但是我們現在這個圖里,是有兩個queue的,到底按照哪個交換器去處理信息,這些我們是不是都得給他去做一個配置,沒錯是這樣的,所以這一塊我們得把queues去掉了,這里我們要配什么呢,大家注意,這里配置起來內容可能會有點多,不要緊,我們按照他的格式去做就可以了,首先在@RabbitListener注解當中呢,還有一個屬性,這個屬性叫什么呢,叫bindings,這個binds屬性表示什么呢,我們先說一下,這個屬性是@RabbitListener注解下的一個屬性,這個屬性是什么呢,表示綁定隊列,也就是說,我這個消費者去哪個隊列里去取消息,你得告訴我他的位置,那么我們的綁定其實就是,告訴我們的消費者,去哪兒取那個消息,這也是完成一個綁定的,那么這個綁定都綁什么呢,首先來看第一個,我們先看@QueueBinding的注解,注意啊,接下來我們做的所有的配置,用的都是@QueueBinding下的屬性來完成的,這里先跟大家說一下,我們現在在@RabbitListener里就用到了他的一個屬性,他的一個屬性就叫bindings,接下來我們所有的配置,都是基于@QueueBinding注解去完成的,所以這里他有好幾個屬性,所以這這塊別搞混了,這個注解就是來完成一些綁定隊列詳細的配置,我們來看這里有什么屬性呢,第一個這里有一個value屬性,這個value屬性是配置什么呢,就是需要我們指定queue的名字,叫什么,他在綁定之前需要知道你的queue叫什么名字,那么這塊怎么給呢,還是通過一個叫queue的注解,然后這里也有一個value,這個value表示的就是,給你當前的queue,起個名字,那么我們現在在配置文件里,是不是給queue起好名了,注意看我們現在配置的是不是InfoReceiver,所以這里是不是應該把info的隊列名字給他,說白了就是這個key對應的這個value,那如果你就直接這么拿過來給他,那么我們就放到配置文件里就沒有意義了,我們目的不就是通過key給value,這樣以后我們以后改隊列的名稱,通過配置文件修改就更方便一些,所以這樣就可以防止硬編碼的問題,這塊我肯定不能給他,我應是給這個key,那這個key我怎么給呢,也是同樣的道理,通過${},就可以把這個信息取出來,這是第一個,對于queue的屬性當中,value隊列的名稱,這個屬性表示的是配置隊列名稱,然后在queue當中還有一個屬性,叫autoDelete,他要的是一個布爾類型,這個屬性是什么意思呢,就是表示你當前的隊列,是不是一個臨時隊列,是否是一個可刪除的臨時隊列,這個我們就是一個臨時的隊列,你不會都不會有問題,這是Queue下的兩個屬性,然后咱們再來看,我們現在除了用到value屬性以外,我們還要用到@QueueBinding里的另一個屬性,第二個屬性是誰呢,是exchange,大家注意,我們現在用的屬性都是@QueueBinding,屬性全都是@QueueBinding里的,包括value和exchange都是他的,那么exchange里都是配置什么呢,就是配置交換器的,這個屬性是配置交換器,那么怎么配置交換器呢,我們也得用一個注解,這個注解就叫@Exchange,然后在這個注解當中呢,也有一個value,這個value指的是什么呢,為交換器起個名字,就是你的交換器得有個名字,然后這個名字我們是不是已經起好了,就是配置文件里的log.direct,那同樣我們把這個key拿過來,然后放到value當中,然后在exchange當中呢,他有了一個value以外,他還有一個屬性,叫type,這個type屬性是配置什么的呢,指定具體的交換器類型,就是你必須要指定你的交換器是什么類型的,那么怎么指定的呢,有一個常量類,叫ExchangeTypes,在這里羅列出了當前所支持的交換器類型,我們選Direct就可以了,這樣我們就完成了一個在消費者這一塊,隊列和交換器的一個綁定,也就是我們看圖,完成了這一塊,但是現在還少一個路由鍵,所以我們還得配置一個路由鍵,再來一個屬性,這個屬性叫什么呢,叫key,這個屬性也是@QueueBinding下的,然后這個路由鍵是什么呢,這個就是我們配置文件里的這個路由鍵,所以我們把info的路由鍵拿過來,這樣就搞定了,這會配置完畢以后,其實對于InfoReceiver消費Receiver來講,就知道去哪個交換器下,所綁定的哪個隊列,根據什么樣的路由規則,去取消息,這樣就可以取出想要的消息了,就這樣的一個過程,這樣對于info的代碼就寫完了然后在ErrorReceiver里面,我們要把info的改成error就可以了,就是我們下面這個,第一個看隊列名稱,我們的key叫error,不就是他mq.config.queue.error,然后還有一個路由key,也是把info改成error就可以了,其他的都不用動,對于error級別的我們也是用的發布訂閱器
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.learn</groupId><artifactId>rabbitmq-direct-consumer</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.5.12.RELEASE</version><relativePath/> </parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version><thymeleaf.version>3.0.9.RELEASE</thymeleaf.version><thymeleaf-layout-dialect.version>2.2.2</thymeleaf-layout-dialect.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><!-- 這個插件,可以將應用打包成一個可執行的jar包 --><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
spring.application.name=rabbitmq-direct-consumerspring.rabbitmq.host=59.110.158.145
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
mq.config.exchange=log.direct
mq.config.queue.info=log.info
mq.config.queue.info.routing.key=log.info.routing.key
mq.config.queue.error=log.error
mq.config.queue.error.routing.key=log.error.routing.key
package com.learn;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消息接收者* @author Administrator* @RabbitListener bindings:綁定隊列* @QueueBinding value:綁定隊列的名稱* exchange:配置交換器* * @Queue value:配置隊列名稱* autoDelete:是否是一個可刪除的臨時隊列* * @Exchange value:為交換器起個名稱* type:指定具體的交換器類型*/
@Component
@RabbitListener(bindings=@QueueBinding(value=@Queue(value="${mq.config.queue.info}",autoDelete="true"),exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.DIRECT),key="${mq.config.queue.info.routing.key}"))
public class InfoReceiver {/*** 接收消息的方法。采用消息隊列監聽機制* @param msg*/@RabbitHandlerpublic void process(String msg){System.out.println("Info........receiver: "+msg);}
}
package com.learn;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消息接收者* @author Administrator* @RabbitListener bindings:綁定隊列* @QueueBinding value:綁定隊列的名稱* exchange:配置交換器* * @Queue value:配置隊列名稱* autoDelete:是否是一個可刪除的臨時隊列* * @Exchange value:為交換器起個名稱* type:指定具體的交換器類型*/
@Component
@RabbitListener(bindings=@QueueBinding(value=@Queue(value="${mq.config.queue.error}",autoDelete="true"),exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.DIRECT),key="${mq.config.queue.error.routing.key}"))
public class ErrorReceiver {/*** 接收消息的方法。采用消息隊列監聽機制* @param msg*/@RabbitHandlerpublic void process(String msg){System.out.println("Error..........receiver: "+msg);}
}
package com.learn;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class RabbitDirectConsumerApplication {public static void main(String[] args) {SpringApplication.run(RabbitDirectConsumerApplication.class, args);}
}
?
總結
以上是生活随笔為你收集整理的Direct交换器-编写消费者的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RaDirect交换器-搭建环境
- 下一篇: Direct交换器-编写生产者