二.java下使用RabbitMQ实现hello world
上一篇文章介紹了windows環(huán)境下的安裝和配置rabbitMQ,具體戳這邊,一.windows環(huán)境下rabbit的的安裝和配置。
現(xiàn)在我們可以著手編寫hello world程序了,一窺RabbitMQ的效用,從rabbitmq的官網(wǎng)的get start進(jìn)入rabbitMQ文檔學(xué)習(xí)區(qū),即這個頁面https://www.rabbitmq.com/getstarted.html。
由于網(wǎng)上關(guān)于rabbitMQ的中文材料和教程不是很多,所以只好硬著頭皮看官網(wǎng)文檔了。
可以看到官網(wǎng)主要從6個步驟來介紹學(xué)習(xí)軌跡,并且每個步驟均有多種編程語言的版本。由于本人采用的是java語言,所以就從一個java版本的hello world開始rabbitMQ的學(xué)習(xí)吧。
一.Introduction(簡介)
1.可以將RabbitMQ理解為一個消息代理,它接收、存儲、和分發(fā)數(shù)據(jù)信息。
2.RabbitMQ主要由三個元素組成,producer(生產(chǎn)者),隊(duì)列(queue),和消費(fèi)者(Consumer).
3.生產(chǎn)者生產(chǎn)消息,隊(duì)列存儲消息,消費(fèi)者接收消息。他們之間的關(guān)系是多對多的,即多個生產(chǎn)者可以向一個隊(duì)列中存放消息,多個消費(fèi)者可以從一個隊(duì)列中獲取消息。
4.值得注意的是,RabbitMQ代理器和生產(chǎn)者、消費(fèi)者并不需要在同一個服務(wù)器上,他們可以是分布式的。
?
二.hello world
現(xiàn)在我們可以進(jìn)入正題,用RabbitMQ來寫一個hello world 的demo,以對RabbitMQ這個中間件有個直觀的認(rèn)識。
在這個demo中,我們將編寫兩個類,一個是生產(chǎn)者類,一個是消費(fèi)者類,生產(chǎn)者類負(fù)責(zé)發(fā)送一個簡單的message,而消費(fèi)者類負(fù)責(zé)接收這個消息并且打印出來。
1.首先添加maven依賴包,如下。
<!-- rabbitMQ --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>4.0.2</version></dependency>2.新建Send類,如下所示。
package com.xdx.learn;import java.io.IOException; import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;public class Send {private final static String QUEUE_NAME="hello";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.1.195");Connection connection=factory.newConnection();Channel channel=connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message="hello world";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("[x] Sent '"+message+"'");channel.close();connection.close();}}
運(yùn)行上述代碼,報錯如下。
這是因?yàn)槲艺粘倬W(wǎng)的代碼,官網(wǎng)的demo是基于本地的連接,而我是遠(yuǎn)程連接,所以必須顯式地指定連接端口,用戶名,密碼之類的信息,修改上述代碼,修改后如下。
import java.io.IOException; import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;public class Send {private final static String QUEUE_NAME="hello";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.1.195");//服務(wù)器ipfactory.setPort(5672);//端口factory.setUsername("xdx");//登錄名factory.setPassword("xxxxxx");//密碼Connection connection=factory.newConnection();Channel channel=connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message="hello world";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("[x] Sent '"+message+"'");channel.close();connection.close();} }
然后再運(yùn)行,這次可以運(yùn)行成功了。
然后我們?nèi)abbitMQ的管理后臺,就可以看到隊(duì)列中有一個queue了,名字就叫做hello。如下圖所示。
如果我再執(zhí)行以下剛才那段代碼,就會發(fā)現(xiàn)messages的數(shù)量又多了一個,如下所示。
?
3.接下來是Recv.java類,用于接收消息,不同意發(fā)布消息的類,接收消息的類必須一直保持運(yùn)行的狀態(tài),以便監(jiān)聽消息的到來。
package com.xdx.learn;import java.io.IOException; import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Envelope;public class Recv {private final static String QUEUE_NAME="hello";public static void main(String[] args) throws IOException, TimeoutException {//下面的配置與生產(chǎn)者相對應(yīng)ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.1.195");//服務(wù)器ipfactory.setPort(5672);//端口factory.setUsername("xdx");//登錄名factory.setPassword("xxxx");//密碼Connection connection=factory.newConnection();//連接Channel channel=connection.createChannel();//頻道channel.queueDeclare(QUEUE_NAME, false, false, false, null);//隊(duì)列System.out.println(" [*] Waiting for messages. To exit press CTRL+C");//defaultConsumer實(shí)現(xiàn)了Consumer,我們將使用它來緩存生產(chǎn)者發(fā)送過來儲存在隊(duì)列中的消息。當(dāng)我們可以接收消息的時候,從中獲取。Consumer consumer=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body)throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");}};//接收到消息以后,推送給RabbitMQ,確認(rèn)收到了消息。channel.basicConsume(QUEUE_NAME, true, consumer);}}
運(yùn)行結(jié)果如下:
此時我們再去RabbitMQ的控制臺查看,發(fā)現(xiàn)hello隊(duì)列中已經(jīng)沒有message了。
? 注意到消費(fèi)者的代碼,有一個實(shí)現(xiàn)了DefaultConsumer接口的Consumer對象,去查看Consumer的源碼,我們可以知道它的handleDelivery方法被一個一直存在的線程(該線程不是Connection所在的線程)調(diào)用,當(dāng)有消息的時候,就會被執(zhí)行。
以上就是一個簡單的生產(chǎn)者和消費(fèi)者的例子,其實(shí)RabbitMQ在這個過程中充當(dāng)了一個消息存儲器的角色,它負(fù)責(zé)接收,分配消息,而發(fā)送,接收消息的工作由我們編程來實(shí)現(xiàn)。經(jīng)過這個例子,我們對RabbitMQ有了一個直觀的簡單的理解。更多的細(xì)節(jié)將在下面的文章中來學(xué)習(xí)。
轉(zhuǎn)載于:https://www.cnblogs.com/roy-blog/p/8023791.html
總結(jié)
以上是生活随笔為你收集整理的二.java下使用RabbitMQ实现hello world的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java 打包下载文件_java下载打包
- 下一篇: 修改java启动参数_如何修改jvm启动