RabbitMQ(一):Hello World程序
內容翻譯自:RabbitMQ Tutorials Java版
RabbitMQ(一):Hello World程序
RabbitMQ(二):Work Queues、循環分發、消息確認、持久化、公平分發
RabbitMQ(三):Exchange交換器--fanout
RabbitMQ(四):Exchange交換器--direct
RabbitMQ(五):Exchange交換器--topic
RabbitMQ(六):回調隊列callback queue、關聯標識correlation id、實現簡單的RPC系統
RabbitMQ(七):常用方法說明 與 學習小結
介紹:
RabbitMQ是一個消息代理:它接受并轉發消息。你可以把它當成一個郵局:當你想郵寄信件的時候,你會把信件放在投遞箱中,并確信郵遞員最終會將信件送到收件人的手里。在這個例子中,RabbitMQ就相當與投遞箱、郵局和郵遞員。
RabbitMQ與郵局的區別在于:RabbitMQ并不處理紙質信件,而是接受、存儲并轉發二進制數據---消息。
談到RabbitMQ的消息,通常有幾個術語:
(1)生產者:是指發送消息的程序
(2)隊列:相當于RabbitMQ的投遞箱。盡管消息在RabbitMQ和你的應用之間傳遞,但是消息僅僅會在隊列之中存儲。隊列只能存儲在內存或磁盤中,本質上是一個大的消息緩沖區。不同的生產者可以發送消息到同一個對隊列,不同的消費者也可以從同一個隊列中獲取消息。
(3)消費者:等待接受消息的程序。
注意,生產者、消費者以及RabbitMQ并不一定要在同一個主機上,在絕大部分的應用中它們都不在同一主機上。
在開始教程之前,請確保:你已經安裝了RabbitMQ,并且在localhost上運行起來(默認端口5672)。如果你使用了不同的主機或端口,請在下文中的連接設置中
更改相應的參數。
一、Hello World:
在這一部分,我們將會使用Java編寫兩個小程序:一個發送單個消息的生產者、一個接受消息并打印出消息的消費者。這個消息就是Hello World。
下圖中,P代表生產者,C代表消費者,中間紅色的小箱子就代表隊列--RabbitMQ為了讓消費者收到消息而保持的消息緩沖區。
在這一部分,只需要引入Java客戶端依賴即可:amqp-client.jar,也可以通過maven的方式引入:
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>4.1.0</version> </dependency>1、生產者:
我們將消息的發布者(生產者)命名為Send,將消息的消費者命名為Recv。發布者將會連接到RabbitMQ,并且發送一條消息,然后退出。
在Send.java中,首先引入相關類:
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;再定義隊列的名字:
private final static String QUEUE_NAME = "hello";然后,創建一個連接到Rabbit服務器的連接:
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();上面的代碼中,connection是socket連接的抽象,為我們處理了通信協議版本協商以及認證等。這樣,我們就連接到了本地機器上的一個消息代理(broker)。如果想連接到其他機器上的broker,只要修改IP即可。
之后,我們又創建了一個通道(channel),大部分的API操作均在這里完成。
對于Send來說,必須指明消息要發到哪個隊列:
channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");隊列的定義是冪等的,它僅僅在不存在時才會創建。消息的內容是一個字節數組,所以你可以隨意編碼(encode)。
最后,必須將通道和連接關閉。
channel.close(); connection.close();完整代碼
//引入相關Class文件 import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;public class Send {//定義隊列名字private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {//創建連接和通道ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//為通道指明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello World!";//發布消息channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");//關閉連接channel.close();connection.close();} }2、接收者(消費者):
消費者從RabbitMQ中取出消息。不同于發布者只發送一條消息就退出,這里我們讓消費者一直監聽消息,并把接受到的消息打印出來。
與Send.java類似,首先引入相關類:
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer;上面引入的DefaultConsumer是Consumer接口的實現類,我們使用它來緩沖從服務器push來的消息。
接下來的設置與發布者類似,打開連接和通道,聲明我們想消費的隊列。注意,這里的隊列的名字要與發布者中聲明的隊列的名字一致。
注意,消費者同樣聲明了隊列。這是因為,我們可能在啟動生產者之前啟動了消費者應用,我們想確保在從一個隊列消費消息之前,這個隊列是存在的。
接下來,告訴服務器(RabbitMQ)把隊列中的消息發過來。因為這個過程是異步的,可以通過DefaultConsumer來進行回調。
Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body)throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");} }; channel.basicConsume(QUEUE_NAME, true, consumer);Consumer的完整代碼如下:
package com.maxwell.rabbitdemo;import com.rabbitmq.client.*;import java.io.IOException;public class Recv {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {//建立連接和通道ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//聲明要消費的隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");//回調消費消息Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");}};channel.basicConsume(QUEUE_NAME, true, consumer);} }這樣,消費者就會一直監聽聲明的隊列。運行一次生產者(即Send.java中的main方法),消費者就會打印出接受到的消息。
?
說明:
①與原文略有出入,如有疑問,請參考原文。
②RabbitMQ的官方rabbitmq-tutorials的java示例中,amqp-client版本為3.5,我改為了4.1,否則后續的示例教程中會報錯說找不到文件。
總結
以上是生活随笔為你收集整理的RabbitMQ(一):Hello World程序的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java虚拟机:深入详细分析Java C
- 下一篇: RabbitMQ(二):Work Que