RabbitMQ消息持久化处理
生活随笔
收集整理的這篇文章主要介紹了
RabbitMQ消息持久化处理
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
我們來看一下RabbitMQ的消息處理,我們先來看第一個知識點,關于RabbitMQ持久化的消息處理,消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保證消息的可靠性的呢,就是靠他的消息持久化,那么什么是消息持久化處理呢,在這里我給大家做一個簡單的介紹,比如我現在的消息提供者,不停的向我的消息消費者發(fā)送消息,在某一個時間段內,或者在某一個時間點上,突然我的消息消費者宕機了,然后經過搶修以后,這個服務又立即啟動了,那么就在宕機和啟動的時間軸上,這個時候的消息,時間段所發(fā)出的消息,那么對于消費者來講,對于消息消費者來講,他是否還能夠接收得到這個消息了呢,答案肯定是可以接收的到的,最主要是依賴于Rabbit消息持久化的技術,接下來我們就通過編寫這樣一個案例,來展示消息持久化的一個體現
首先我們去創(chuàng)建一個Provider,創(chuàng)建一個Consumer,我們就拿direct交換器來作為案例交換器,你用fanout,topic都可以,我就那direct,我就先去做一個Provider,在這里我們給他起一個名字,在項目名當中,我們加一個durable,表示是一個持久的,rabbitmq-durable-direct-provider然后我們再去創(chuàng)建一個Consumerrabbitmq-durable-direct-consumer兩個項目創(chuàng)建好以后呢,我們先去修改一下他的pom文件,在pom文件當中呢, 把artifactId和name,做一個更改,這樣我們兩個項目就創(chuàng)建好了,創(chuàng)建項目,我們先看我們拷貝過來的項目,在這里我們兩個消息的接收者,一個是ErrorReceiver,一個是InfoReceiver,然后他們都是根據自己的路由器,決定從哪個隊列里去取消息,然后我們去看消息的提供者,Sender,現在我們用的是error的路由key@Value("${mq.config.queue.error.routing.key}")
private String routingkey;我們看配置文件mq.config.queue.error.routing.key=log.error.routing.key那么按照這個的發(fā)現規(guī)則來看呢,我們應該是ErrorReceiver,接收到消息,然后我們再看一下測試代碼,測試代碼當中呢,我們這里有一個while死循環(huán),然后里面休眠了一秒,一秒發(fā)送一條消息,這個時候應該會被誰接收呢,被我們的ErrorReceiver接收,我們發(fā)送是有他來發(fā)送的,代碼的一個整體結構,現在我在這里做一個代碼的修改,我首先讓他休眠兩秒,讓他慢一點,這樣看的清楚一點,然后我在外面設置一個int的變量,是0,我在這里flag++,加到我們的消息當中,什么意思呢,給我們的消息加一個編號,這樣知道是第幾條消息,然后我們再打開我們的管理頁面59.110.158.145:15672大家還記得這個管理頁面吧,然后我們看這個queues,現在這里是沒有任何隊列的,我們接下來先去啟動消費者,消費者啟動成功,我們再來啟動消息的發(fā)送者,找到他的測試代碼,我們來運行他,這個時候就兩秒發(fā)送一次消息,兩秒發(fā)送一次消息,這個時候ErrorReceiver已經接受消息了,兩秒鐘發(fā)送一次,兩秒鐘接收一次,然后我們再看管理頁面,這個時候queues就直接有變化了,這個時候是不是出現了兩個隊列,一個好似log.error,一個是log.info,只不過我們現在發(fā)送消息是log.error,不是log.info,所以現在只有一個隊列里有數據流,然后注意看他的Features,特征,現在特征是一個AD,鼠標往那里一放,auto-delete:true,那么auto-delete是什么含義呢,別著急,我們一會再來講解他,給大家演示一下消息丟失的現象,怎么辦呢,現在顯示的消費者的線程,我把消息消費者關掉,你看我在哪里關,點,然后我們的消息在第51條停止了,宕機了,消費者宕機了,然后我的發(fā)送者這一側,由于它是在一個死循環(huán)里,所以消息還是源源不斷的在發(fā)送,我們這個時候再點queues,這里就已經動態(tài)更新了是不是又變成了no queues的動態(tài)了
現在沒有隊列了,你那個原來的log.error,log.info,隊列都沒有了,然后現在再看,我再把消費者啟動,注意我們是在51條停的,運行走,這個時候我們就接收到消息了,第208條,這里這個隊列就出來了,那么也就是說,從51到208之間的消息,就沒有了,丟失了,這就是一個典型的消息丟失現象,那我們現在之所以會出現丟失消息的原因是什么呢,關鍵在這,看我們的Error Receiver,我們現在在設置這個Queue的時候,我們就是用了一個@Queue的一個注解,這里有一個autoDelete屬性,然后我們給的是true,然后當時我們給的屬性的時候,你就可以理解為一個臨時隊列,那么什么叫臨時隊列呢,如果我們在Queue里,autoDelete除了可以在@Queue的注解當中,在@Exchange這里也有一個autoDelete,也有這個屬性,他里面賦的值也是true或者false,那么我們就把autoDelete這個屬性設置一下,如果我們在@Queue注解上,加了autoDelete,這里是什么含義呢,當所有消費客戶端,鏈接斷開后,是否自動刪除隊列,如果設置成true,表示刪除,如果設置成false,就表示不刪除,看到了嗎,我們現在只要所有的客戶端,都不再連接這個隊列了,這個隊列就會被自動的刪除,如果你設置true的話,如果設置成false呢,即便是所有的客戶端都斷開連接了,除了@Queue這一塊可以設置,在@Exchange的注解里,也有autoDelete屬性,如果我們要在這個注解里設置了,表示什么含義呢,當綁定隊列都不再使用時,是否自動刪除交換器,同樣的道理,true表示刪除,false表示不刪除,其實你在哪個注解里設置,刪隊列還是刪交換器,明白這個意思吧,所以@Queue作為我們講解的重點,比如我們要解決丟失消息的問題,怎么辦呢,我們剛才也看到了,其實之所以會產生消息丟失的原因就是,當我們的消費者服務一停掉,那么對于RabbitMQ來講,會自動的把這個queue刪除掉,所以我們再次啟動的時候,在另一個Queue是沒法拿到另一個消息的,所以我們要解決這個問題,很簡單,當你的消費客戶端,斷開連接以后,RabbitMQ也不會刪除這個Queue,那當我再次啟動的時候,由于這個Queue還存在,那我們是不是還可以繼續(xù)從這個Queue里去獲取剛才我沒有獲取的消息,我們再來演示一下,現在我把這個true改成false@RabbitListener(bindings=@QueueBinding(value=@Queue(value="${mq.config.queue.error}",autoDelete="false"),exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.DIRECT),key="${mq.config.queue.error.routing.key}")
)然后我們先去啟動消費者,然后再去啟動消息的提供者,我們看這個時候消息就來了,我們再去看管理界面,這個時候已經刷新了,然后注意看這個Features,由于我們只是在log.error的消息隊列里,我們只改了這個的autoDelete,沒有改log.info的,所以log.info還是AD狀態(tài)的,就是當客戶端,所有客戶端一斷開以后,而現在在log.error這一塊,他的Feature里面,里面沒有AD了,表示他是一個持久的隊列,他并不會隨著你客戶端的關閉而刪除,我們來看一下是不是這樣的,我們現在還是把服務停掉,在第28條消息給關掉了,我們把它清空一下,然后我們的消息提供者還是在不斷地發(fā)送消息,回過來,我們再去請求消費者,等一下我們先去看這個隊列,看這個管理界面,是不是log.info沒有了,但是log.error一直存在,而且還有數據向隊列里進行輸入,然后注意看這個ready,這個Ready是什么呢,其實就是對于未接收到的數據的一個顯示,也就是說RabbitMQ,在隊列里存放的消息,如果消息并沒有被消費者所消費,那么他就會給這個消息加一個標記,表示當前這個消息是未被消費的,那我們現在看到的這個Ready里面,顯示當前有多少條消息沒有被消費,現在已經變成27條了,然后我們現在再去啟動消費者,運行,觀察他的控制臺,看到了嗎,他啟動以后,我們從28條我們停了,從第29條開始讀到消息,一直往后,讀到現在,現在ready是不是沒有了,現在已經沒有被消費的消息,都被消費到了,所以我們解決了丟失消息的現象,只要是你的服務停了,只要你把autoDelete設置成false了,服務再啟動的時候,由于隊列,還存在,所以這些消息我們仍然可以接收到,我么之所以接收到的原因就是因為隊列存在了,如果我們關閉以后,我們的消費者關閉以后,其實發(fā)送者發(fā)送的消息一直在那里存著呢,就是在RabbitMQ服務器的內存當中存放著,因為隊列也在內存當中,隊列里包含了這些數據,都在內存里存放著,但是我的RabbitMQ,整個服務都關掉了,服務都關掉了,那隊列肯定也沒了,消息肯定也沒了,這就是我們采用autoDelete來對知識點的講解,消息持久化的內容就到這
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.learn</groupId><artifactId>rabbitmq-durable-direct-consumer</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.5.12.RELEASE</version><relativePath/> </parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version><thymeleaf.version>3.0.9.RELEASE</thymeleaf.version><thymeleaf-layout-dialect.version>2.2.2</thymeleaf-layout-dialect.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><!-- 這個插件,可以將應用打包成一個可執(zhí)行的jar包 --><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
spring.application.name=rabbitmq-durable-direct-consumerspring.rabbitmq.host=59.110.158.145
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
mq.config.exchange=log.direct
mq.config.queue.info=log.info
mq.config.queue.info.routing.key=log.info.routing.key
mq.config.queue.error=log.error
mq.config.queue.error.routing.key=log.error.routing.key
package com.learn;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消息接收者* @author Administrator* @RabbitListener bindings:綁定隊列* @QueueBinding value:綁定隊列的名稱* exchange:配置交換器* * @Queue value:配置隊列名稱* autoDelete:是否是一個可刪除的臨時隊列* * @Exchange value:為交換器起個名稱* type:指定具體的交換器類型*/
@Component
@RabbitListener(bindings=@QueueBinding(value=@Queue(value="${mq.config.queue.error}",autoDelete="false"),exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.DIRECT),key="${mq.config.queue.error.routing.key}"))
public class ErrorReceiver {/*** 接收消息的方法。采用消息隊列監(jiān)聽機制* @param msg*/@RabbitHandlerpublic void process(String msg){System.out.println("Error..........receiver: "+msg);}
}
package com.learn;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消息接收者* @author Administrator* @RabbitListener bindings:綁定隊列* @QueueBinding value:綁定隊列的名稱* exchange:配置交換器* * @Queue value:配置隊列名稱* autoDelete:是否是一個可刪除的臨時隊列* * @Exchange value:為交換器起個名稱* type:指定具體的交換器類型*/
@Component
@RabbitListener(bindings=@QueueBinding(value=@Queue(value="${mq.config.queue.info}",autoDelete="true"),exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.DIRECT),key="${mq.config.queue.info.routing.key}"))
public class InfoReceiver {/*** 接收消息的方法。采用消息隊列監(jiān)聽機制* @param msg*/@RabbitHandlerpublic void process(String msg){System.out.println("Info........receiver: "+msg);}
}
package com.learn;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class RabbitDurableDirectConsumerApplication {public static void main(String[] args) {SpringApplication.run(RabbitDurableDirectConsumerApplication.class, args);}
}
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.learn</groupId><artifactId>rabbitmq-durable-direct-provider</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.5.12.RELEASE</version><relativePath/> </parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version><thymeleaf.version>3.0.9.RELEASE</thymeleaf.version><thymeleaf-layout-dialect.version>2.2.2</thymeleaf-layout-dialect.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><!-- 這個插件,可以將應用打包成一個可執(zhí)行的jar包 --><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
spring.application.name=rabbitmq-durable-direct-providerspring.rabbitmq.host=59.110.158.145
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guestmq.config.exchange=log.direct
mq.config.queue.info.routing.key=log.info.routing.key
mq.config.queue.error.routing.key=log.error.routing.key
mq.config.queue.error=log.error
package com.learn;import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;/*** 消息發(fā)送者* @author Administrator**/
@Component
public class Sender {@Autowiredprivate AmqpTemplate rabbitAmqpTemplate;//exchange 交換器名稱@Value("${mq.config.exchange}")private String exchange;//routingkey 路由鍵@Value("${mq.config.queue.error.routing.key}")private String routingkey;/** 發(fā)送消息的方法*/public void send(String msg){//向消息隊列發(fā)送消息//參數一:交換器名稱。//參數二:路由鍵//參數三:消息this.rabbitAmqpTemplate.convertAndSend(this.exchange, this.routingkey, msg);}
}
package com.learn;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class RabbitDurableDirectProviderApplication {public static void main(String[] args) {SpringApplication.run(RabbitDurableDirectProviderApplication.class, args);}
}
?
總結
以上是生活随笔為你收集整理的RabbitMQ消息持久化处理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 使用RabbitMQ实现松耦合设计
- 下一篇: 什么是服务注册中心