spring集成RabbitMQ配置文件详解(生产者和消费者)
1,首先引入配置文件org.springframework.amqp,如下:
<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>1.7.1.RELEASE</version></dependency>2,準(zhǔn)備工作:安裝好rabbitmq,并在項(xiàng)目中增加配置文件?? rabbit.properties 內(nèi)容如下:
rmq.ip=192.188.113.114 rmq.port=5672 rmq.producer.num=20 rmq.manager.user=admin rmq.manager.password=admin3,rabbitmq屬性介紹:
概念解釋:
Brocker:消息隊(duì)列服務(wù)器實(shí)體。Exchange:消息交換機(jī),指定消息按什么規(guī)則,路由到哪個(gè)隊(duì)列。Queue:消息隊(duì)列,每個(gè)消息都會(huì)被投入到一個(gè)或者多個(gè)隊(duì)列里。Binding:綁定,它的作用是把exchange和queue按照路由規(guī)則binding起來。Routing Key:路由關(guān)鍵字,exchange根據(jù)這個(gè)關(guān)鍵字進(jìn)行消息投遞。Virtual Host: 虛擬主機(jī),一個(gè)broker里可以開設(shè)多個(gè)vhost,用作不用用戶的權(quán)限分離。每個(gè)virtual host本質(zhì)上都是一個(gè)RabbitMQ Server(但是一個(gè)server中可以有多個(gè)virtual host),擁有它自己若干的個(gè)Exchange、Queue和bings rule等等。其實(shí)這是一個(gè)虛擬概念,類似于權(quán)限控制組。Virtual Host是權(quán)限控制的最小粒度。Producer:消息生產(chǎn)者,就是投遞消息的程序。Consumer:消息消費(fèi)者,就是接受消息的程序。Connection: 就是一個(gè)TCP的連接。Producer和Consumer都是通過TCP連接到RabbitMQ Server的。接下來的實(shí)踐案例中我們就可以看到,producer和consumer與exchange的通信的前提是先建立TCP連接。僅僅創(chuàng)建了TCP連接,producer和consumer與exchange還是不能通信的。我們還需要為每一個(gè)Connection創(chuàng)建Channel。Channel: 它是建立在上述TCP連接之上的虛擬連接。數(shù)據(jù)傳輸都是在Channel中進(jìn)行的。AMQP協(xié)議規(guī)定只有通過Channel才能執(zhí)行AMQP的命令。一個(gè)Connection可以包含多個(gè)Channel。有人要問了,為什么要使用Channel呢,直接用TCP連接不就好了么?對于一個(gè)消息服務(wù)器來說,它的任務(wù)是處理海量的消息,當(dāng)有很多線程需要從RabbitMQ中消費(fèi)消息或者生產(chǎn)消息,那么必須建立很多個(gè)connection,也就是許多個(gè)TCP連接。然而對于操作系統(tǒng)而言,建立和關(guān)閉TCP連接是非常昂貴的開銷,而且TCP的連接數(shù)也有限制,頻繁的建立關(guān)閉TCP連接對于系統(tǒng)的性能有很大的影響,如果遇到高峰,性能瓶頸也隨之顯現(xiàn)。RabbitMQ采用類似NIO的做法,選擇TCP連接服用,不僅可以減少性能開銷,同時(shí)也便于管理。在TCP連接中建立Channel是沒有上述代價(jià)的,可以復(fù)用TCP連接。對于Producer或者Consumer來說,可以并發(fā)的使用多個(gè)Channel進(jìn)行Publish或者Receive。有實(shí)驗(yàn)表明,在Channel中,1秒可以Publish10K的數(shù)據(jù)包。對于普通的Consumer或者Producer來說,這已經(jīng)足夠了。除非有非常大的流量時(shí),一個(gè)connection可能會(huì)產(chǎn)生性能瓶頸,此時(shí)就需要開辟多個(gè)connection。消息隊(duì)列的使用過程大概如下:
消息接收
消息發(fā)布
客戶端投遞消息到exchange。 exchange接收到消息后,就根據(jù)消息的key和已經(jīng)設(shè)置的binding,進(jìn)行消息路由,將消息投遞到一個(gè)或多個(gè)隊(duì)列里。AMQP 里主要要說兩個(gè)組件:
Exchange 和 Queue 綠色的X就是Exchange ,紅色的是Queue,這兩者都在Server端,又稱作Broker 這部分是RabbitMQ實(shí)現(xiàn)的,而藍(lán)色的則是客戶端,通常有Producer和Consumer兩種類型。?
4,配置spring-rabbitmq.xml,內(nèi)容如下:
<!-- 公共部分 --> <!-- 創(chuàng)建連接類 連接安裝好的 rabbitmq --> <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"><constructor-arg value="localhost" /> <!-- username,訪問RabbitMQ服務(wù)器的賬戶,默認(rèn)是guest --><property name="username" value="${rmq.manager.user}" /><!-- username,訪問RabbitMQ服務(wù)器的密碼,默認(rèn)是guest --> <property name="password" value="${rmq.manager.password}" /><!-- host,RabbitMQ服務(wù)器地址,默認(rèn)值"localhost" --> <property name="host" value="${rmq.ip}" /> <!-- port,RabbitMQ服務(wù)端口,默認(rèn)值為5672 --><property name="port" value="${rmq.port}" /><!-- channel-cache-size,channel的緩存數(shù)量,默認(rèn)值為25 --><property name="channel-cache-size" value="50" /><!-- cache-mode,緩存連接模式,默認(rèn)值為CHANNEL(單個(gè)connection連接,連接之后關(guān)閉,自動(dòng)銷毀) --><property name="cache-mode" value="CHANNEL" /> </bean> <!--或者這樣配置,connection-factory元素實(shí)際就是注冊一個(gè)org.springframework.amqp.rabbit.connection.CachingConnectionFactory實(shí)例 <rabbit:connection-factory id="connectionFactory" host="${rmq.ip}" port="${rmq.port}" username="${rmq.manager.user}" password="${rmq.manager.password}" />--> <rabbit:admin connection-factory="connectionFactory"/><!--定義消息隊(duì)列,durable:是否持久化,如果想在RabbitMQ退出或崩潰的時(shí)候,不會(huì)失去所有的queue和消息,需要同時(shí)標(biāo)志隊(duì)列(queue)和交換機(jī)(exchange)是持久化的,即rabbit:queue標(biāo)簽和rabbit:direct-exchange中的durable=true,而消息(message)默認(rèn)是持久化的可以看類org.springframework.amqp.core.MessageProperties中的屬性public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;exclusive: 僅創(chuàng)建者可以使用的私有隊(duì)列,斷開后自動(dòng)刪除;auto_delete: 當(dāng)所有消費(fèi)客戶端連接斷開后,是否自動(dòng)刪除隊(duì)列 --> <rabbit:queue name="spittle.alert.queue.1" id="queue_1" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue name="spittle.alert.queue.2" id="queue_2" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue name="spittle.alert.queue.3" id="queue_3" durable="true" auto-delete="false" exclusive="false" /><!--綁定隊(duì)列,rabbitmq的exchangeType常用的三種模式:direct,fanout,topic三種,我們用direct模式,即rabbit:direct-exchange標(biāo)簽,Direct交換器很簡單,如果是Direct類型,就會(huì)將消息中的RoutingKey與該Exchange關(guān)聯(lián)的所有Binding中的BindingKey進(jìn)行比較,如果相等,則發(fā)送到該Binding對應(yīng)的Queue中。有一個(gè)需要注意的地方:如果找不到指定的exchange,就會(huì)報(bào)錯(cuò)。但routing key找不到的話,不會(huì)報(bào)錯(cuò),這條消息會(huì)直接丟失,所以此處要小心,auto-delete:自動(dòng)刪除,如果為Yes,則該交換機(jī)所有隊(duì)列queue刪除后,自動(dòng)刪除交換機(jī),默認(rèn)為false --> <rabbit:direct-exchange id="spittle.fanout" name="spittle.fanout" durable="true" auto-delete="false"><rabbit:bindings><rabbit:binding queue="spittle.alert.queue.1" key="{alert.queue.1}"></rabbit:binding><rabbit:binding queue="spittle.alert.queue.2" key="{alert.queue.2}"></rabbit:binding><rabbit:binding queue="spittle.alert.queue.3" key="{alert.queue.3}"></rabbit:binding></rabbit:bindings> </rabbit:fanout-exchange><!-- 生產(chǎn)者部分 --> <!-- 發(fā)送消息的producer類,也就是生產(chǎn)者 --> <bean id="msgProducer" class="com.asdf.sdf.ClassA"><!-- value中的值就是producer中的的routingKey,也就是隊(duì)列名稱,它與上面的rabbit:bindings標(biāo)簽中的key必須相同 --><property name="queueName" value="{alert.queue.1}"/> </bean><!-- spring amqp默認(rèn)的是jackson 的一個(gè)插件,目的將生產(chǎn)者生產(chǎn)的數(shù)據(jù)轉(zhuǎn)換為json存入消息隊(duì)列,由于fastjson的速度快于jackson,這里替換為fastjson的一個(gè)實(shí)現(xiàn) --> <bean id="jsonMessageConverter" class="com.jy.utils.FastJsonMessageConverter"></bean> <!-- 或者配置jackson --> <!-- <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /> --><rabbit:template exchange="test-exchange" id="rabbitTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" /><!-- 消費(fèi)者部分 --> <!-- 自定義接口類 --> <bean id="testHandler" class="com.rabbit.TestHandler"></bean><!-- 用于消息的監(jiān)聽的代理類MessageListenerAdapter --> <bean id="testQueueListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter" ><!-- 類名 --><constructor-arg ref="testHandler" /><!-- 方法名 --><property name="defaultListenerMethod" value="handlerTest"></property><property name="messageConverter" ref="jsonMessageConverter"></property> </bean><!-- 配置監(jiān)聽acknowledeg="manual"設(shè)置手動(dòng)應(yīng)答,它能夠保證即使在一個(gè)worker處理消息的時(shí)候用CTRL+C來殺掉這個(gè)worker,或者一個(gè)consumer掛了(channel關(guān)閉了、connection關(guān)閉了或者TCP連接斷了),也不會(huì)丟失消息。因?yàn)镽abbitMQ知道沒發(fā)送ack確認(rèn)消息導(dǎo)致這個(gè)消息沒有被完全處理,將會(huì)對這條消息做re-queue處理。如果此時(shí)有另一個(gè)consumer連接,消息會(huì)被重新發(fā)送至另一個(gè)consumer會(huì)一直重發(fā),直到消息處理成功,監(jiān)聽容器acknowledge="auto" concurrency="30"設(shè)置發(fā)送次數(shù),最多發(fā)送30次 --> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" concurrency="20"><rabbit:listener queues="spittle.alert.queue.1" ref="testQueueListenerAdapter" /><rabbit:listener queues="spittle.alert.queue.2" ref="testQueueListenerAdapter" /><rabbit:listener queues="spittle.alert.queue.2" ref="testQueueListenerAdapter" /> </rabbit:listener-container>?
5,生產(chǎn)者(發(fā)送端)代碼:
@Resource private RabbitTemplate rabbitTemplate; private String queueName; public void sendMessage(CommonMessage msg){try { logger.error("發(fā)送信息開始");System.out.println(rabbitTemplate.getConnectionFactory().getHost()); //發(fā)送信息 queueName交換機(jī),就是上面的routingKey msg.getSource() 為 test_key rabbitTemplate.convertAndSend(queueName,msg.getSource(), msg);//如果是普通字符串消息需要先序列化,再發(fā)送消息//rabbitTemplate.convertAndSend(queueName,msg.getSource(), SerializationUtils.serialize(msg));logger.error("發(fā)送信息結(jié)束");} catch (Exception e) { e.printStackTrace();}}public void setQueueName(String queueName) {this.queueName = queueName; }?
6,消費(fèi)端代碼:TestHandler 類
public class TestHandler {@Overridepublic void handlerTest(CommonMessage commonMessage) {System.out.println("DetailQueueConsumer: " + new String(message.getBody()));} }其他exchangeType介紹:
fanOut:
<!-- Fanout 扇出,顧名思義,就是像風(fēng)扇吹面粉一樣,吹得到處都是。如果使用fanout類型的exchange,那么routing key就不重要了。因?yàn)榉彩墙壎ǖ竭@個(gè)exchange的queue,都會(huì)受到消息。 --> <rabbit:fanout-exchange name="delayed_message_exchange" durable="true" auto-delete="false" id="delayed_message_exchange"> <rabbit:bindings> <rabbit:binding queue="test_delay_queue"/> </rabbit:bindings> </rabbit:fanout-exchange>topic:如果說direct是將消息放到exchange綁定的一個(gè)queue里(一對一);fanout是將消息放到exchange綁定的所有queue里(一對所有);那么topic類型的exchange就可以實(shí)現(xiàn)(一對部分),應(yīng)用場景就是打印不同級別的錯(cuò)誤日志,我們的系統(tǒng)出錯(cuò)后會(huì)根據(jù)不同的錯(cuò)誤級別生成error_levelX.log日志,我們在后臺(tái)首先要把所有的error保存在一個(gè)總的queue(綁定了一個(gè)*.error的路由鍵)里,然后再按level分別存放在不同的queue。
<!-- 發(fā)送端不是按固定的routing key發(fā)送消息,而是按字符串“匹配”發(fā)送,接收端同樣如此 --> <rabbit:topic-exchange name="message-exchange" durable="true" auto-delete="false" id="message-exchange"><rabbit:bindings><rabbit:binding queue="Q1" pattern="error.*.log" /><rabbit:binding queue="Q2" pattern="error.level1.log" /><rabbit:binding queue="Q3" pattern="error.level2.log" /></rabbit:bindings> </rabbit:topic-exchange>routing key綁定如下圖:
?
本文轉(zhuǎn)自:
https://blog.csdn.net/nandao158/article/details/81065892
https://www.cnblogs.com/LipeiNet/p/6079427.html
https://www.toutiao.com/a6598154241037042189/?tt_from=mobile_qq&utm_campaign=client_share×tamp=1536277584&app=news_article&utm_source=mobile_qq&iid=43157585039&utm_medium=toutiao_android&group_id=6598154241037042189
總結(jié)
以上是生活随笔為你收集整理的spring集成RabbitMQ配置文件详解(生产者和消费者)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Taro+react开发(83):tar
- 下一篇: CentOS配置postgresql+p