RabbitMQ 学习一 了解+点对点模式
消息隊列
1、了解
RabbitMQ:
RabbitMQ是使用Erlang語言開發的開源消息隊列系統,基于AMQP協議來實現。AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。AMQP協議更多用在企業系統內對數據一致性、穩定性和可靠性要求很高的場景,對性能和吞吐量的要求還在其次。
消息中間件 用于消息的接收 基于AMPQ協議,具有高度的一致性,安全性,跨平臺性等。
生產者,RabbitMQ,消費者:
生產者將消息放在消息中間件的交換機中,交換機對應著消息隊列,消費者從消息隊列中取出生產者所存放的消息。實現消息的分發。消費者與消息隊列綁定。
點對點的模式中是將消息放在默認的交換機中。
2、安裝
盡量不要安裝在windows中,安裝在虛擬機中,可以直接下載安裝包,也可以使用docker拉取。
docker拉取:先查詢是否有該鏡像
docker search rabbitmq:management
有則可以拉取
docker pull rabbitmq:management
然后創建容器并發布
docker run -d --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
命令中的【RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin】是web管理平臺的用戶名和密碼
【 -p 15672:15672】 是控制平臺docker映射到系統的對應端口
【 -p 5672:5672】 是應用程序的訪問端口
http://自己的虛擬機ip:15672
3、使用說明:
首先要新建一個虛擬主機,然后給虛擬主機綁定用戶,通過生產者發送消息時,要連接到server,連接到server中對應的某一個虛機主機,通過具體的用戶名密碼,緊接著才可以把消息發布給交換機或者消息隊列中,進而消費者才能去消息隊列中消費消息(消費者也需要連接到rabbitMQ中的server,以及它對應的虛擬主機),當生產者發送完消息后,它就可以進行別的工作了,它與消費者是完全解耦的,消費者只需要去消息隊列中查是否有對應的消息即可,當需要再次發送消息時,是需要重新建立連接即可。
虛擬主機就類似于數據庫中的庫。
學習時,生產者可以使用Test環境進行測試,而消費不行,因為消費者需要一直監聽隊列中的信息,若使用Test,那么或許當消費崗剛消費了這個消息時,線程控制權已經失去了,那么它便沒有可能去對body做處理。
所以要放在main函數中。
點對點模式“HelloWorld”
引入依賴
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.7.3</version> </dependency>
需要一個消息提供者,一個消息消費者和RabbitMQ
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
/**
* 點對點 只能有一個消費者 業務場景 用戶注冊進行短信驗證時,短信驗證部分可以交由消費這邊的系統去完成
* @param args
* @throws IOException
* @throws TimeoutException
*/
//消費消息的代碼
public static void main(String[] args) throws IOException, TimeoutException {
//獲取虛擬主機的連接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.235.130");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/ems");
connectionFactory.setUsername("ems");
connectionFactory.setPassword("ems");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello",false,false,false,null);
channel.basicConsume("hello", true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); } } ); } }
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Provider {
//生產消息的代碼
@Test
public void testSendMessage() throws IOException, TimeoutException {
//創建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
//設置連接rabbitmq的主機
connectionFactory.setHost("192.168.235.130");
//設置端口號
connectionFactory.setPort(5672);
//設置連接哪個虛擬主機
connectionFactory.setVirtualHost("/ems");
//設置訪問虛擬主機的用戶名密碼
connectionFactory.setUsername("ems");
connectionFactory.setPassword("ems");
//獲取連接對象
Connection connection = connectionFactory.newConnection();
//通過連接獲取連接中的通道對象
Channel channel = connection.createChannel();
//通道綁定對應的消息隊列
//參數一 隊列的名稱,不存在會自動創建
//參數二 用來定義隊列的特性 是否持久化 true持久化(不管是否重啟Rabbitmq,隊列都會存在,當RabbitMQ關閉時他會把隊列存在磁盤中)
//參數三 是否獨占隊列 true獨占 false可以被其他連接使用
//參數四 是否在消費完成后自動刪除隊列 true自動刪除 false 不會自動刪除
//參數五 附加參數
channel.queueDeclare("hello",false,false,false,null);
//發布消息
//參數一 交換機名稱
//參數二 隊列名稱
//參數三 傳遞消息的額外設置
//參數四 傳遞的消息
channel.basicPublish("","hello",null,"hello rabbitmq".getBytes());
//關閉通道和連接
channel.close();
connection.close();
}
}
為了方便后續學習,可以將共同的代碼抽取出來
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Objects;
public class RabbitMQConnection {
//工廠是重量級的 每次都創建耗費資源 所以做一個靜態的成員 只創建一次
private static ConnectionFactory connectionFactory;
//具體賦值是在類加載時執行,只執行一次
static{
connectionFactory = new ConnectionFactory();
//屬性的賦值一旦確定下來很少變 所以也放在static中
connectionFactory.setHost("192.168.235.130");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/ems");
connectionFactory.setUsername("ems");
connectionFactory.setPassword("ems");
}
//定義提供連接對象的方法
public static Connection getConnection(){
try {
return connectionFactory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
//關閉通道和關閉連接的方法
public static void closeConnectionAndChanel(Channel channel,Connection connection){
try {
if(Objects.nonNull(channel))
{channel.close();}
if (Objects.nonNull(connection))
{connection.close();}
} catch (Exception e) {
e.printStackTrace();
}
}
}
總結
以上是生活随笔為你收集整理的RabbitMQ 学习一 了解+点对点模式的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: phpmyadmin出现1862错误怎么
- 下一篇: [HNOI2010 Planar平面图判