简单消息模型
五種消息模型
RabbitMQ提供了6種消息模型,但是第6種其實(shí)是RPC,并不是MQ,因此不予學(xué)習(xí)。那么也就剩下5種。
但是其實(shí)3、4、5這三種都屬于訂閱模型,只不過進(jìn)行路由的方式不同。
?
我們通過一個demo工程來了解下RabbitMQ的工作方式:
導(dǎo)入工程:
依賴:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.learn.rabbitmq</groupId><artifactId>learn-rabbitmq</artifactId><version>0.0.1-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.2.RELEASE</version></parent><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.3.2</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies> </project>我們抽取一個建立RabbitMQ連接的工具類,方便其他程序獲取連接:
public class ConnectionUtil {/*** 建立與RabbitMQ的連接* @return* @throws Exception*/public static Connection getConnection() throws Exception {//定義連接工廠ConnectionFactory factory = new ConnectionFactory();//設(shè)置服務(wù)地址factory.setHost("192.168.56.101");//端口factory.setPort(5672);//設(shè)置賬號信息,用戶名、密碼、vhostfactory.setVirtualHost("/learn");factory.setUsername("learn");factory.setPassword("learn");// 通過工程獲取連接Connection connection = factory.newConnection();return connection;} }基本消息模型
官方介紹:
RabbitMQ是一個消息代理:它接受和轉(zhuǎn)發(fā)消息。 你可以把它想象成一個郵局:當(dāng)你把郵件放在郵箱里時,你可以確定郵差先生最終會把郵件發(fā)送給你的收件人。 在這個比喻中,RabbitMQ是郵政信箱,郵局和郵遞員。
RabbitMQ與郵局的主要區(qū)別是它不處理紙張,而是接受,存儲和轉(zhuǎn)發(fā)數(shù)據(jù)消息的二進(jìn)制數(shù)據(jù)塊。
P(producer/ publisher):生產(chǎn)者,一個發(fā)送消息的用戶應(yīng)用程序。
C(consumer):消費(fèi)者,消費(fèi)和接收有類似的意思,消費(fèi)者是一個主要用來等待接收消息的用戶應(yīng)用程序
隊(duì)列(紅色區(qū)域):rabbitmq內(nèi)部類似于郵箱的一個概念。雖然消息流經(jīng)rabbitmq和你的應(yīng)用程序,但是它們只能存儲在隊(duì)列中。隊(duì)列只受主機(jī)的內(nèi)存和磁盤限制,實(shí)質(zhì)上是一個大的消息緩沖區(qū)。許多生產(chǎn)者可以發(fā)送消息到一個隊(duì)列,許多消費(fèi)者可以嘗試從一個隊(duì)列接收數(shù)據(jù)。
總之:
生產(chǎn)者將消息發(fā)送到隊(duì)列,消費(fèi)者從隊(duì)列中獲取消息,隊(duì)列是存儲消息的緩沖區(qū)。
?
我們將用Java編寫兩個程序;發(fā)送單個消息的生產(chǎn)者,以及接收消息并將其打印出來的消費(fèi)者。我們將詳細(xì)介紹Java API中的一些細(xì)節(jié),這是一個消息傳遞的“Hello World”。
我們將調(diào)用我們的消息發(fā)布者(發(fā)送者)Send和我們的消息消費(fèi)者(接收者)Recv。發(fā)布者將連接到RabbitMQ,發(fā)送一條消息,然后退出。
生產(chǎn)者發(fā)送消息
public class Send {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {// 獲取到連接以及mq通道Connection connection = ConnectionUtil.getConnection();// 從連接中創(chuàng)建通道,這是完成大部分API的地方。Channel channel = connection.createChannel();// 聲明(創(chuàng)建)隊(duì)列,必須聲明隊(duì)列才能夠發(fā)送消息,我們可以把消息發(fā)送到隊(duì)列中。// 聲明一個隊(duì)列是冪等的 - 只有當(dāng)它不存在時才會被創(chuàng)建channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消息內(nèi)容String message = "Hello World!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");//關(guān)閉通道和連接channel.close();connection.close();} }控制臺:
管理工具中查看消息
進(jìn)入隊(duì)列頁面,可以看到新建了一個隊(duì)列:simple_queue
點(diǎn)擊隊(duì)列名稱,進(jìn)入詳情頁,可以查看消息:
在控制臺查看消息并不會將消息消費(fèi),所以消息還在。
消費(fèi)者獲取消息
public class Recv {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 創(chuàng)建通道Channel channel = connection.createChannel();// 聲明隊(duì)列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定義隊(duì)列的消費(fèi)者DefaultConsumer consumer = new DefaultConsumer(channel) {// 獲取消息,并且處理,這個方法類似事件監(jiān)聽,如果有消息的時候,會被自動調(diào)用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息體String msg = new String(body);System.out.println(" [x] received : " + msg + "!");}};// 監(jiān)聽隊(duì)列,第二個參數(shù):是否自動進(jìn)行消息確認(rèn)。channel.basicConsume(QUEUE_NAME, true, consumer);} }控制臺:
這個時候,隊(duì)列中的消息就沒了:
我們發(fā)現(xiàn),消費(fèi)者已經(jīng)獲取了消息,但是程序沒有停止,一直在監(jiān)聽隊(duì)列中是否有新的消息。一旦有新的消息進(jìn)入隊(duì)列,就會立即打印.
總結(jié)