快速掌握消息队列RabbitMQ
※快速掌握消息隊列RabbitMQ
一.RabbitMQ概述
(一)什么是消息隊列MQ
- 消息隊列(Message Queue),后文稱MQ,是一種 跨進程的通信機制,用于上下游傳遞消息。
- MQ作為消息中間件,最主要的作用系統之間的信息傳 遞進行“解耦”,MQ是數據可靠性的重要保障。
(二)什么是RabbitMQ
- RabbitMQ是全世界最火的開源消息代理服務器, 在全世界擁有超過35000個項目部署在 RabbitMQ。
- RabbitMQ支持幾乎所有的操作系統與編程語言。
- Rabbit提供了高并發、高可用的成熟方案,支持 多種消息協議,易于部署與使用。
(三)和同類產品比較
(四)RabbitMQ應用場景
- 異構系統的數據傳遞
- 高并發程序的流量控制
- 基于P2P,P2PPP的程序
- 分布式系統的事務一致性TCC
- 高可靠性的交易系統
(五)消息狀態
Ready – 消息已被送入隊列,等待被消費
Unacked – 消息已經被消費者認領,但還未被確認“已被消費” – Unacked狀態下,消費者斷開連接則消息回到”Ready” – 沒有確認,客戶有沒有斷開連接,則一直處于Unacked。 Finished – 調用basicAck()方法后,表示消息已被消費,從隊列中移除。
(六)RabbitMQ六中工作模式
二.RabbitMQ單點安裝
(一)安裝步驟
(二)常用命令
啟動與關閉
rabbitmq-server 前臺啟動服務
rabbitmq-server -detached 后臺啟動服務
rabbitmqctl stop 停止服務(相當于關閉進程)
終止與啟動應用
rabbitmqctl start_app 啟動應用 (不會關閉進程)
rabbitmqctl stop_app 終止應用
用戶管理
rabbitmqctl add_user {username} {password} – 創建新用戶
rabbitmqctl delete_user {username} – 刪除用戶
rabbitmqctl change_password {username} {newpassword} – 重置密碼
rabbitmqctl set_user_tags {username} {tag} – 授予用戶角色(Tag)
rabbitmqctl set_permissions -p / user_admin ‘.’ '.’ ‘.*’ – 設置用戶允許訪問的vhost
RabbitMQ用戶四種Tag
超級管理員(administrator) –
可登陸管理控制臺(啟用management plugin的情況下),可查看所有 的信息,并且可以對用戶,策略(policy)進行操作。
監控者(monitoring) – 登陸管理控制臺(啟用management plugin的情況下),同時可以查看 rabbitmq節點的相關信息(進程數,內存使用情況,磁盤使用情況等) u 策略制定者(policymaker) – 可登陸管理控制臺(啟用management plugin的情況下), 同時可以對 policy進行管理。但無法查看節點的相關信息(上圖紅框標識的部分)。
普通管理者(management) – 僅可登陸管理控制臺(啟用management plugin的情況下),無法看到 節點信息,也無法對策略進行管理。
三.rabbitMQ集群安裝
1.搭建RabbitMQ集群采用主備模式、鏡像模式、遠程模式、多活模式中的鏡像模式,部署在騰訊云輝:106.53.70.31,騰訊云芳:129.204.181.161,騰訊云傳:129.204.152.2,搭建圖如下:
2.搭建步驟
(1)https://www.erlang-solutions.com/resources/download.html下載RabbitMQ必備
組件
Erlang插件esl-erlang_22.0.7-1_centos_7_amd64.rpm;https://www.rabbitmq.com/download.html下載RabbitMQ安裝包rabbitmq-server-3.7.17-1.el7.noarch.rpm
把這兩個文件上傳到騰訊云服務器芳Ip地址:129.204.181.161的/usr/local/temp
騰訊云服務器傳Ip地址:129.204.152.2
(2)在兩臺服務器終端執行以下命令來搭建RabbitMQ
[root@VM_0_12_centos ~]# cd /usr/local/temp
[root@VM_0_12_centos temp]# rpm -ivh --nodeps esl-erlang_22.0.7-1_centos_7_amd64.rpm
[root@VM_0_12_centos temp]# rpm -ivh --nodeps rabbitmq-server-3.7.17-1.el7.noarch.rpm
[root@VM_0_12_centos temp]# rabbitmq-plugins enable rabbitmq_management
[root@VM_0_12_centos temp]# chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
[root@VM_0_12_centos temp]# rabbitmq-server -detached
[root@VM_0_12_centos temp]# rabbitmqctl add_user root root
[root@VM_0_12_centos temp]# rabbitmqctl set_user_tags root administrator
[root@VM_0_12_centos temp]# rabbitmqctl set_permissions -p / root ‘.’ '.’ ‘.*’
最后開啟15672控制臺端口, 5672客戶端訪問端口
(3)在兩臺服務器終端執行以下命令以搭建Mirror鏡像集群
129.204.181.161服務器
[root@VM_0_12_centos rabbitmq]# vim /etc/hostname 修改為m1
[root@VM_0_12_centos rabbitmq]# vim /etc/hosts
添加
129.204.181.161 m1
129.204.152.2 m2
開放4369 25672端口
[root@VM_0_12_centos rabbitmq]# scp /var/lib/rabbitmq/.erlang.cookie 129.204.152.2:/var/lib/rabbitmq/
[root@VM_0_12_centos rabbitmq]# chmod 400 /var/lib/rabbitmq/.erlang.cookie
129.204.152.2服務器
[root@VM_0_12_centos rabbitmq]# vim /etc/hostname 修改為m2
[root@VM_0_12_centos rabbitmq]# vim /etc/hosts
添加
129.204.181.161 m1
129.204.152.2 m2
開放4369 25672端口
[root@VM_0_12_centos rabbitmq]# chmod 400 /var/lib/rabbitmq/.erlang.cookie
[root@VM_0_12_centos temp]# rabbitmqctl stop_app
[root@VM_0_12_centos temp]# rabbitmqctl join_cluster rabbit@m1
[root@VM_0_12_centos temp]# rabbitmqctl start_app
(4)在106.53.70.31(輝)服務器終端執行以下命令來搭建Haproxy對RabbitMQ負載均衡
[root@VM_0_16_centos ~]# yum install haproxy
vim /etc/haproxy/haproxy.cfg
在末尾添加內容如下
#---------------------------------------------------------------------
my
#---------------------------------------------------------------------
listen rabbitmq
bind 0.0.0.0:5672
mode tcp
option tcplog
balance roundrobin
#option tcpka
server rabbit1 xx.xx.1.1:5672 check inter 5s rise 2 fall 3
server rabbit2 xx.xx.1.2:5672 check inter 5s rise 2 fall 3
listen http_front
bind 0.0.0.0:1080
stats refresh 30s
stats uri /haproxy?stats
stats auth admin:admin
然后運行Haproxy
haproxy -f /etc/haproxy/haproxy.cfg
重啟的方式是
service haproxy restart
四.java端使用RabbitMQ
(一)創建Maven工程,導入jar包
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.3.0</version></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.5</version></dependency>(二)創建工具類
package com.itlaoqi.rabbitmq.utils;import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.util.concurrent.TimeoutException;public class RabbitUtils {private static ConnectionFactory connectionFactory = new ConnectionFactory();static {connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//5672是RabbitMQ的默認端口號connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");connectionFactory.setVirtualHost("/test");}public static Connection getConnection(){Connection conn = null;try {conn = connectionFactory.newConnection();return conn;} catch (Exception e) {throw new RuntimeException(e);}} }常量類
package com.itlaoqi.rabbitmq.utils;public class RabbitConstant {public static final String QUEUE_HELLOWORLD = "helloworld";public static final String QUEUE_SMS = "sms";public static final String EXCHANGE_WEATHER = "weather";public static final String EXCHANGE_WEATHER_ROUTING = "weather_routing";public static final String QUEUE_BAIDU = "baidu";public static final String QUEUE_SINA = "sina";public static final String EXCHANGE_WEATHER_TOPIC = "weather_topic"; }(三)簡單模式
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//TCP 物理連接Connection conn= RabbitUtils.getConnection();//創建通信“通道”,相當于TCP中的虛擬連接Channel channel = conn.createChannel();//創建隊列,聲明并創建一個隊列,如果隊列已存在,則使用這個隊列//第一個參數:隊列名稱ID//第二個參數:是否持久化,false對應不持久化數據,MQ停掉數據就會丟失//第三個參數:是否隊列私有化,false則代表所有消費者都可以訪問,true代表只有第一次擁有它的消費者才能一直使用,其他消費者不讓訪問//第四個:是否自動刪除,false代表連接停掉后不自動刪除掉這個隊列//其他額外的參數, nullchannel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);//四個參數//exchange 交換機,暫時用不到,在后面進行發布訂閱時才會用到//隊列名稱//額外的設置屬性//最后一個參數是要傳遞的消息字節數組String message = "helloworld!";channel.basicPublish("" , RabbitConstant.QUEUE_HELLOWORLD,null , message.getBytes());channel.close();conn.close();System.out.println("發送數據成功");} }消費者
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {Connection conn= RabbitUtils.getConnection();//創建通道Channel channel = conn.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);//創建一個消息消費者//第二個參數代表是否自動確認收到消息,false代表手動編程來確認消息,這是MQ的推薦做法//第三個參數要傳入DefaultConsumer的實現類channel.basicConsume(RabbitConstant.QUEUE_HELLOWORLD, false, new Reciver(channel));} }class Reciver extends DefaultConsumer{private Channel channel;//重寫構造函數,Channel通道對象需要從外層傳入,在handleDelivery中要用到public Reciver(Channel channel) {super(channel);this.channel = channel;}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {/*super.handleDelivery(consumerTag,envelope,properties,body);*/String messageBody = new String(body);System.out.println("消費者接收到:" + messageBody);//簽收消息,確認消息//envelope.getDeliveryTag() 獲取這個消息的TagId//false只確認簽收當前的消息,設置為true的時候則代表簽收該消費者所有未簽收的消息channel.basicAck(envelope.getDeliveryTag() , false);} }(四)工作隊列模式
實體類
生產者
public class OrderSystem {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);for(int i = 100 ; i <= 200 ; i++) {SMS sms = new SMS("乘客" + i, "13900000" + i, "您的車票已預訂成功");String jsonSMS = new Gson().toJson(sms);channel.basicPublish("" , RabbitConstant.QUEUE_SMS , null , jsonSMS.getBytes());}System.out.println("發送數據成功");channel.close();connection.close();} }消費者,這里的話通過負責多個消費者達到負載均衡地消費消息
public class SMSSender1 {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);//如果不寫basicQos(1),則自動MQ會將所有請求平均發送給所有消費者//basicQos,MQ不再對消費者一次發送多個請求,而是消費者處理完一個消息后(確認后),在從隊列中獲取一個新的channel.basicQos(1);//處理完一個取一個channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String jsonSMS = new String(body);System.out.println("SMSSender1-短信發送成功:" + jsonSMS);try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(envelope.getDeliveryTag() , false);}});} }(五)發布訂閱模式(創建交換機為fanout)
生產者
消費者(可復制多個),然后改下隊列名稱
百度消費者
新浪消費者
public class Sina {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);//queueBind用于將隊列與交換機綁定//參數1:隊列名 參數2:交互機名 參數三:路由key(暫時用不到)channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER, "");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("新浪收到氣象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag() , false);}});} }(六)路由模式(精確匹配)(創建交換機為direct)
生產者
百度消費者
public class Baidu {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);//queueBind用于將隊列與交換機綁定//參數1:隊列名 參數2:交互機名 參數三:路由keychannel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.shandong.qingdao.20991011");channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.shandong.qingdao.20991012");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("百度收到氣象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag() , false);}});} }新浪消費者
public class Sina {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.la.20991011");channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.henan.zhengzhou.20991011");channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.la.20991012");channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.henan.zhengzhou.20991012");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("新浪收到氣象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag() , false);}});} }(七)主體模式(模糊匹配)(交換機為topic,*匹配單個字符,#匹配所有)
生產者
public class WeatherBureau {public static void main(String[] args) throws IOException, TimeoutException {Map area = new LinkedHashMap<String, String>();area.put("china.hebei.shijiazhuang.20991011", "中國河北石家莊20991011天氣數據");area.put("china.shandong.qingdao.20991011", "中國山東青島20991011天氣數據");area.put("china.henan.zhengzhou.20991011", "中國河南鄭州20991011天氣數據");area.put("us.cal.la.20991011", "美國加州洛杉磯20991011天氣數據");area.put("china.hebei.shijiazhuang.20991012", "中國河北石家莊20991012天氣數據");area.put("china.shandong.qingdao.20991012", "中國山東青島20991012天氣數據");area.put("china.henan.zhengzhou.20991012", "中國河南鄭州20991012天氣數據");area.put("us.cal.la.20991012", "美國加州洛杉磯20991012天氣數據");Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();while (itr.hasNext()) {Map.Entry<String, String> me = itr.next();//Routing key 第二個參數相當于數據篩選的條件channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC,me.getKey() , null , me.getValue().getBytes());}channel.close();connection.close();} }百度消費者
public class Baidu {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);//queueBind用于將隊列與交換機綁定//參數1:隊列名 參數2:交互機名 參數三:路由keychannel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "*.*.*.20991011");//channel.queueUnbind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "*.*.*.20991011");//*.hebei.*.*channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("百度收到氣象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag() , false);}});} }新浪消費者
public class Sina {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "us.#");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("新浪收到氣象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag() , false);}});} }(八)生產者與MQ的交互
生產者
五.Spring與RabbitMQ整合
(一)生產者
1.新建Maven工程,導入jar包
2.增加配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><!-- 設置連接工廠,配置基本參數 --><rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="guest" password="guest"virtual-host="/test"></rabbit:connection-factory><!--fanout-exchange | direct-exchange | topic-exchange聲明一個名為topicExchange的topic交換機,如果這個交換機不存在,則自動創建--><rabbit:topic-exchange name="topicExchange" auto-declare="true"></rabbit:topic-exchange><!-- Spring為我們封裝了RabbitTemplate對象來簡化生產者發送數據的過程,對常用的方法進行了封裝。 --><rabbit:template id="template" connection-factory="connectionFactory" exchange="topicExchange"></rabbit:template><!--在生產者中配置template對象,用于發送數據--><bean id="newsProducer" class="com.itlaoqi.rabbit.exchange.NewsProducer"><property name="rabbitTemplate" ref="template"/></bean><!-- RabbitAdmin對象用于創建、綁定、管理隊列與交換機 --><rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory"/> </beans>3.創建實體類,并序列化
public class News implements Serializable{private String source;private String title;private Date createTime;private String content;public News(String source, String title, Date createTime, String content) {this.source = source;this.title = title;this.createTime = createTime;this.content = content;}public String getSource() {return source;}public void setSource(String source) {this.source = source;}public String getTitle() {return title;}public void setTitle(String title) {this.title = title;}public Date getCreateTime() {return createTime;}public void setCreateTime(Date createTime) {this.createTime = createTime;}public String getContent() {return content;}public void setContent(String content) {this.content = content;} }4.編寫生產者
public class NewsProducer {private RabbitTemplate rabbitTemplate = null;public RabbitTemplate getRabbitTemplate() {return rabbitTemplate;}public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}public void sendNews(String routingKey , News news){//convertAndSend 用于向exchange發送數據//第一個參數是routingkey//第二個參數是要傳遞的對象,可以是字符串、byte【】或者任何實現了【序列化接口】的對象rabbitTemplate.convertAndSend(routingKey , news);System.out.println("新聞發送成功");}public static void main(String[] args) {ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");NewsProducer np = (NewsProducer)ctx.getBean("newsProducer");np.sendNews("us.20190101" , new News("新華社" , "特朗普又又又退群啦" , new Date() , "國際新聞內容"));np.sendNews("china.20190101" , new News("鳳凰TV" , "XXX企業榮登世界500強" , new Date() , "國內新聞內容"));} }5.利用rabbitAdmin直接操作隊列和交換機
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = {"classpath:applicationContext.xml"}) public class RabbitAdminTestor {@Resource(name="rabbitAdmin")private RabbitAdmin rabbitAdmin;@Resourceprivate RabbitTemplate rabbitTemplate;@Testpublic void testCreateExchange(){rabbitAdmin.declareExchange(new FanoutExchange("test.exchange.fanout" , true ,false));rabbitAdmin.declareExchange(new DirectExchange("test.exchange.direct" , true ,false));rabbitAdmin.declareExchange(new TopicExchange("test.exchange.topic" , true ,false));}@Testpublic void testQueueAndBind(){rabbitAdmin.declareQueue(new Queue("test.queue"));rabbitAdmin.declareBinding(new Binding("test.queue", Binding.DestinationType.QUEUE,"test.exchange.topic", "#", new HashMap<String, Object>()));rabbitTemplate.convertAndSend("test.exchange.topic" , "abc" , "abc123");}@Testpublic void testDelete(){rabbitAdmin.deleteQueue("test.queue");rabbitAdmin.deleteExchange("test.exchange.fanout");rabbitAdmin.deleteExchange("test.exchange.direct");rabbitAdmin.deleteExchange("test.exchange.topic");}}(二)消費者
1.創建Maven工程,導入jar包
2.增加配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="guest" password="guest"virtual-host="/test"></rabbit:connection-factory><rabbit:admin connection-factory="connectionFactory"></rabbit:admin><!--創建隊列--><rabbit:queue name="topicQueue" auto-declare="true" auto-delete="false" durable="false" exclusive="false"/><!--交換機與隊列綁定,并指明篩選條件--><rabbit:topic-exchange name="topicExchange" auto-declare="true"><rabbit:bindings><rabbit:binding queue="topicQueue" pattern="us.*"></rabbit:binding></rabbit:bindings></rabbit:topic-exchange><!--啟動消費者后,Spring底層自動監聽對應的topicQueue數據,一旦有新的消息進來,自動傳入到consumer Bean的recv的News參數中,之后再程序對News進一步處理--><rabbit:listener-container connection-factory="connectionFactory"><rabbit:listener ref="consumer" method="recv" queue-names="topicQueue"/></rabbit:listener-container><bean id="consumer" class="com.itlaoqi.rabbitmq.NewsConsumer"></bean> </beans>3.編寫實體類
public class News implements Serializable{private String source;private String title;private Date createTime;private String content;public News(String source, String title, Date createTime, String content) {this.source = source;this.title = title;this.createTime = createTime;this.content = content;}public String getSource() {return source;}public void setSource(String source) {this.source = source;}public String getTitle() {return title;}public void setTitle(String title) {this.title = title;}public Date getCreateTime() {return createTime;}public void setCreateTime(Date createTime) {this.createTime = createTime;}public String getContent() {return content;}public void setContent(String content) {this.content = content;} }4.編寫消費者方法類
public class NewsConsumer {public void recv(News news){System.out.println("接收到最新新聞:" + news.getTitle() + ":" + news.getSource());}public static void main(String[] args) {//初始化IOC容器ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");} }六.Spring Boot與RabbitMQ整合
(一)生產者
1.創建spring boot工程,導入jar包
2.修改配置文件
spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/test spring.rabbitmq.connection-timeout=1000ms#producer #confirmlistener spring.rabbitmq.publisher-confirms=true #returnlistener spring.rabbitmq.publisher-returns=true spring.rabbitmq.template.mandatory=true3.創建實體類,并序列化
public class Employee implements Serializable{private String empno;private String name;private Integer age;public Employee(String empno, String name, Integer age) {this.empno = empno;this.name = name;this.age = age;}public String getEmpno() {return empno;}public void setEmpno(String empno) {this.empno = empno;}public String getName() {return name;}public void setName(String name) {this.name = name;}public Integer getAge() {return age;}public void setAge(Integer age) {this.age = age;} }4.編寫生產者方法(需要自己創建好虛擬機,交換機,隊列和綁定)
@Component public class MessageProducer {@Resourceprivate RabbitTemplate rabbitTemplate ;RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {@Override/*** CorrelationData 消息的附加信息,即自定義id* isack 代表消息是否被broker(MQ)接收 true 代表接收 false代表拒收。* cause 如果拒收cause則說明拒收的原因,幫助我們進行后續處理*/public void confirm(CorrelationData correlationData, boolean isack, String cause) {System.out.println(correlationData);System.out.println("ack:" + isack);if(isack == false){System.err.println(cause);}}};RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingkey) {System.err.println("Code:" + replyCode + ",Text:" + replyText );System.err.println("Exchange:" + exchange + ",RoutingKey:" + routingkey );}};public void sendMsg(Employee emp){//CorrelationData對象的作用是作為消息的附加信息傳遞,通常我們用它來保存消息的自定義idCorrelationData cd = new CorrelationData(emp.getEmpno() + "-" + new Date().getTime());rabbitTemplate.setConfirmCallback(confirmCallback);rabbitTemplate.setReturnCallback(returnCallback);rabbitTemplate.convertAndSend("springboot-exchange" , "hr.employee" , emp , cd);}}(二)消費者
1.創建spring boot工程,導入jar包
2.編寫配置文件
spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/test spring.rabbitmq.connection-timeout=1000ms #手動確認 spring.rabbitmq.listener.simple.acknowledge-mode=manual spring.rabbitmq.listener.simple.concurrency=1 spring.rabbitmq.listener.simple.max-concurrency=53.編寫實體類
public class Employee implements Serializable{private String empno;private String name;private Integer age;public Employee(String empno, String name, Integer age) {this.empno = empno;this.name = name;this.age = age;}public String getEmpno() {return empno;}public void setEmpno(String empno) {this.empno = empno;}public String getName() {return name;}public void setName(String name) {this.name = name;}public Integer getAge() {return age;}public void setAge(Integer age) {this.age = age;} }4.編寫消費者類
@Component public class MessageConsumer {//@RabbitListener注解用于聲明式定義消息接受的隊列與exhcange綁定的信息//在SpringBoot中,消費者這端使用注解獲取消息@RabbitListener(bindings = @QueueBinding(value = @Queue(value="springboot-queue" , durable="true"),exchange = @Exchange(value = "springboot-exchange" , durable = "true" , type = "topic") ,key = "#"))//用于接收消息的方法@RabbitHandler //通知SpringBoot下面的方法用于接收消息。// 這個方法運行后將處于等待的狀態,有新的消息進來就會自動觸發下面的方法處理消息//@Payload 代表運行時將消息反序列化后注入到后面的參數中public void handleMessage(@Payload Employee employee , Channel channel ,@Headers Map<String,Object> headers) {System.out.println("=========================================");System.out.println("接收到" + employee.getEmpno() + ":" + employee.getName());//所有消息處理后必須進行消息的ack,channel.basicAck()Long tag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);try {channel.basicAck(tag , false);} catch (IOException e) {e.printStackTrace();}System.out.println("=========================================");} }七.Spring Boot與RabbitMQ集群整合
只需要ip地址改成harproxy的ip,端口5672
總結
以上是生活随笔為你收集整理的快速掌握消息队列RabbitMQ的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 我的子平真诠学习笔记
- 下一篇: java jisuan da xie z