javascript
SpringMQ的使用
文章目錄
- SpringMQ的使用
- 1、windows安裝
- 2、添加依賴:
- 3、增加rabbit的配置:
- 4、最簡單的測試:publisher--->MQ-->consumer
- 4.1、建立連接配置:
- 4.2、發送端:建立連接,獲取通道,創建隊列,準備消息,發送消息到隊列:
- 4.3 、接收端:建立連接,獲取通道,聲明隊列,申請隊列的一個消費者(內含監聽消息的方法),在通道線路上接收消息。
- 4.4 消息接收的手動確認:
- 5、work消息模型
- 5.1 消息發送:
- 5.2 消息接收:
- 6、訂閱模型分類:
- 6.1 fanout:也就是交換機廣播消息
- 6.1.1 發送消息:建立連接,獲取通道,聲明交換機,發送消息到交換機
- 6.1.2 接受消息:建立連接,獲取通道,聲明隊列,綁定到交換機,定義消費者(包含監聽),監聽通道。
- 6.2 direct:相當于定向投放
- 6.2.1 發送消息: 交換機類型指定為direct,發送消息時指定routing key
- 6.2.2 接收消息: 獲取通道,聲明隊列,綁定交換機,指定routing key
- 6.3 Topic:交換機采用通配符的方式和隊列匹配
- 6.3.1 發送消息: 指定交換機類型,發送消息攜帶route key
- 6.3.2 接收消息: 聲明隊列,綁定交換機攜帶帶通配符的route key
- 7、持久化
- 7.1 交換機持久化
- 7.2 隊列持久化
- 7.3 消息持久化
- 8、spring和MQ整合
- 8.1 設置接收消息的監聽器: 通過注解@RabbitListener實現,綁定隊列,交換機,和route-key
- 8.2接收消息:通過AmqpTemplate實現
SpringMQ的使用
1、windows安裝
安裝erlang,配置環境變量,安裝rabbitmq-server
訪問:http://localhost:15672/
2、添加依賴:
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.3.2</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>3、增加rabbit的配置:
spring:rabbitmq:host: 127.0.0.1username: rootpassword: 880808virtual-host: /4、最簡單的測試:publisher—>MQ–>consumer
4.1、建立連接配置:
public class ConnectionUtil {/*** 建立與RabbitMQ的連接* @return* @throws Exception*/public static Connection getConnection() throws Exception {//定義連接工廠ConnectionFactory factory = new ConnectionFactory();//設置服務地址factory.setHost("127.0.0.1");//端口factory.setPort(5672);//設置賬號信息,用戶名、密碼、vhostfactory.setVirtualHost("/");factory.setUsername("root");factory.setPassword("12345678");// 通過工程獲取連接Connection connection = factory.newConnection();return connection;}}4.2、發送端:建立連接,獲取通道,創建隊列,準備消息,發送消息到隊列:
public class Send {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 從連接中創建通道,使用通道才能完成消息相關的操作Channel channel = connection.createChannel();// 聲明(創建)隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消息內容String message = "Hello World!";// 向指定的隊列中發送消息channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");//關閉通道和連接channel.close();connection.close();} }4.3 、接收端:建立連接,獲取通道,聲明隊列,申請隊列的一個消費者(內含監聽消息的方法),在通道線路上接收消息。
public class Recv {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 創建通道Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定義隊列的消費者DefaultConsumer consumer = new DefaultConsumer(channel) {// 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息體String msg = new String(body);System.out.println(" [x] received : " + msg + "!");}};// 監聽隊列,第二個參數:是否自動進行消息確認。channel.basicConsume(QUEUE_NAME, true, consumer);} }4.4 消息接收的手動確認:
DefaultConsumer consumer = new DefaultConsumer(channel) {// 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息體String msg = new String(body);System.out.println(" [x] received : " + msg + "!");// 手動進行ACKchannel.basicAck(envelope.getDeliveryTag(), false);}};// 監聽隊列,第二個參數false,手動進行ACKchannel.basicConsume(QUEUE_NAME, false, consumer);5、work消息模型
5.1 消息發送:
public class Send {private final static String QUEUE_NAME = "test_work_queue";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 循環發布任務for (int i = 0; i < 50; i++) {// 消息內容String message = "task .. " + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");Thread.sleep(i * 2);}// 關閉通道和連接channel.close();connection.close();} }5.2 消息接收:
6、訂閱模型分類:
1、1個生產者,多個消費者
2、每一個消費者都有自己的一個隊列
3、生產者沒有將消息直接發送到隊列,而是發送到了交換機
4、每個隊列都要綁定到交換機
5、生產者發送的消息,經過交換機到達隊列,實現一個消息被多個消費者獲取的目的
X(Exchanges):交換機一方面:接收生產者發送的消息。另一方面:知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。
Exchange類型有以下幾種:
? Fanout:廣播,將消息交給所有綁定到交換機的隊列
? Direct:定向,把消息交給符合指定routing key 的隊列
? Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列
我們這里先學習
? Fanout:即廣播模式
Exchange(交換機)只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規則的隊列,那么消息會丟失!
6.1 fanout:也就是交換機廣播消息
在廣播模式下,消息發送流程是這樣的:
- 1) 可以有多個消費者
- 2) 每個消費者有自己的queue(隊列)
- 3) 每個隊列都要綁定到Exchange(交換機)
- 4) 生產者發送的消息,只能發送到交換機,交換機來決定要發給哪個隊列,生產者無法決定。
- 5) 交換機把消息發送給綁定過的所有隊列
- 6) 隊列的消費者都能拿到消息。實現一條消息被多個消費者消費
6.1.1 發送消息:建立連接,獲取通道,聲明交換機,發送消息到交換機
public class Send {private final static String EXCHANGE_NAME = "fanout_exchange_test";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明exchange,指定類型為fanoutchannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 消息內容String message = "Hello everyone";// 發布消息到Exchangechannel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [生產者] Sent '" + message + "'");channel.close();connection.close();} }6.1.2 接受消息:建立連接,獲取通道,聲明隊列,綁定到交換機,定義消費者(包含監聽),監聽通道。
//消費者1 public class Recv {private final static String QUEUE_NAME = "fanout_exchange_queue_1";private final static String EXCHANGE_NAME = "fanout_exchange_test";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 綁定隊列到交換機channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");// 定義隊列的消費者DefaultConsumer consumer = new DefaultConsumer(channel) {// 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息體String msg = new String(body);System.out.println(" [消費者1] received : " + msg + "!");}};// 監聽隊列,自動返回完成channel.basicConsume(QUEUE_NAME, true, consumer);} }6.2 direct:相當于定向投放
P:生產者,向Exchange發送消息,發送消息時,會指定一個routing key。
X:Exchange(交換機),接收生產者的消息,然后把消息遞交給 與routing key完全匹配的隊列
C1:消費者,其所在隊列指定了需要routing key 為 error 的消息
C2:消費者,其所在隊列指定了需要routing key 為 info、error、warning 的消息
6.2.1 發送消息: 交換機類型指定為direct,發送消息時指定routing key
發送消息的RoutingKey分別是:insert、update、delete
public class Send {private final static String EXCHANGE_NAME = "direct_exchange_test";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明exchange,指定類型為directchannel.exchangeDeclare(EXCHANGE_NAME, "direct");// 消息內容String message = "商品新增了, id = 1001";// 發送消息,并且指定routing key 為:insert ,代表新增商品channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());System.out.println(" [商品服務:] Sent '" + message + "'");channel.close();connection.close();} }6.2.2 接收消息: 獲取通道,聲明隊列,綁定交換機,指定routing key
public class Recv {private final static String QUEUE_NAME = "direct_exchange_queue_1";private final static String EXCHANGE_NAME = "direct_exchange_test";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 綁定隊列到交換機,同時指定需要訂閱的routing key。假設此處需要update和delete消息channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");// 定義隊列的消費者DefaultConsumer consumer = new DefaultConsumer(channel) {// 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息體String msg = new String(body);System.out.println(" [消費者1] received : " + msg + "!");}};// 監聽隊列,自動ACKchannel.basicConsume(QUEUE_NAME, true, consumer);} }6.3 Topic:交換機采用通配符的方式和隊列匹配
Topic類型的Exchange與Direct相比,都是可以根據RoutingKey把消息路由到不同的隊列。只不過Topic類型Exchange可以讓隊列在綁定Routing key 的時候使用通配符!
Routingkey 一般都是有一個或多個單詞組成,多個單詞之間以”.”分割,例如: item.insert
通配符規則:
? #:匹配一個或多個詞
? *:匹配不多不少恰好1個詞
6.3.1 發送消息: 指定交換機類型,發送消息攜帶route key
public class Send {private final static String EXCHANGE_NAME = "topic_exchange_test";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明exchange,指定類型為topicchannel.exchangeDeclare(EXCHANGE_NAME, "topic");// 消息內容String message = "新增商品 : id = 1001";// 發送消息,并且指定routing key 為:insert ,代表新增商品channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes());System.out.println(" [商品服務:] Sent '" + message + "'");channel.close();connection.close();} }6.3.2 接收消息: 聲明隊列,綁定交換機攜帶帶通配符的route key
public class Recv2 {private final static String QUEUE_NAME = "topic_exchange_queue_2";private final static String EXCHANGE_NAME = "topic_exchange_test";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 綁定隊列到交換機,同時指定需要訂閱的routing key。訂閱 insert、update、deletechannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*");// 定義隊列的消費者DefaultConsumer consumer = new DefaultConsumer(channel) {// 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息體String msg = new String(body);System.out.println(" [消費者2] received : " + msg + "!");}};// 監聽隊列,自動ACKchannel.basicConsume(QUEUE_NAME, true, consumer);} }7、持久化
7.1 交換機持久化
7.2 隊列持久化
7.3 消息持久化
8、spring和MQ整合
8.1 設置接收消息的監聽器: 通過注解@RabbitListener實現,綁定隊列,交換機,和route-key
@Component public class Listener {@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "spring.test.queue", durable = "true"),exchange = @Exchange(value = "spring.test.exchange",ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),key = {"#.#"}))public void listen(String msg){System.out.println("接收到消息:" + msg);}}8.2接收消息:通過AmqpTemplate實現
@RunWith(SpringRunner.class) @SpringBootTest(classes = Application.class) public class MqDemo {@Autowiredprivate AmqpTemplate amqpTemplate;@Testpublic void testSend() throws InterruptedException {String msg = "hello, Spring boot amqp";this.amqpTemplate.convertAndSend("spring.test.exchange","a.b", msg);// 等待10秒后再結束Thread.sleep(10000);} }總結
以上是生活随笔為你收集整理的SpringMQ的使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Activiti的使用技巧
- 下一篇: D3js(六):支持css的toolti