java 双工模式_rabbitMq-工作模式(双工模式)-java
P:代表生產者,向隊列中發送消息。
隊列:保存生產者提供的數據。(隊列的特點,先進先出)
C1,C2:代表消費者,負責將對列中的消息全部讀取,并且完成特定的任務。
工作原理:當生產者向對列中發送消息時,C1和C2一起爭搶消息的執行權,誰搶到,誰執行。內部有負載的策略。
[root@bogon rabbitmq-server-3.6.1]# cd /etc/rabbitmq
[root@bogon rabbitmq]# service rabbitmq-server start
Starting rabbitmq-server: SUCCESS
rabbitmq-server.er.
pom.xml
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
com.rabbit
schoolmanage
0.0.1-SNAPSHOT
war
com.rabbitmq
amqp-client
3.5.1
org.springframework.amqp
spring-rabbit
1.4.0.RELEASE
junit
junit
4.11
package schoolmanage;
import java.io.IOException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class TestStudentMsgWork {
private Connection connection = null;
// 工作模式
private String queueName = "work";
@Before
public void init() throws IOException {
// 創建連接工廠
ConnectionFactory factory = new ConnectionFactory();
// 為工廠對象添加數據
// 遠程主機
factory.setHost("192.168.6.130");
// 端口號
factory.setPort(5672);
// 虛擬主機
factory.setVirtualHost("/school");
// 用戶名
factory.setUsername("student");
// 密碼
factory.setPassword("student");
// 創建連接
connection = factory.newConnection();
}
// 消息生產者
@Test
public void provider() throws IOException {
// 創建通道
Channel channel = connection.createChannel();
String msg = "我要吃飯!!!——工作模式";
// 創建隊列
/**
* String queue, 隊列的名稱 boolean durable,持久化 false表示不持久化,
* true表示rabbitmq重啟后會恢復隊列的內容 boolean exclusive,服務器所獨有,如果設置為true則消費者不能使用
* boolean autoDelete,是否自動刪除 當隊列中沒有消息時,該隊列是否自動刪除 Map
* arguments 額外的參數
*/
channel.queueDeclare(queueName, false, false, false, null);
/**
* String exchange, 交換機的名稱,如果需要交換機則添加名稱 如果沒有交換機 則為""串 String routingKey,
* 路由key 尋址的關鍵字,如果需要使用路由key定義特定的關鍵字(orderKey.xxx)
* 如果不需要路由key,在簡單模式中的添加隊列的名稱 BasicProperties props, 其他的配置,一般為null byte[]
* body,表示需要發送的消息-字節碼文件
*/
channel.basicPublish("", queueName, null, msg.getBytes());
// 將流關閉
channel.close();
System.out.println("消息發送成功!!!");
}
// 消費者1
@Test
public void consumer1() throws Exception {
// 創建通道
Channel channel = connection.createChannel();
channel.queueDeclare(queueName, false, false, false, null);
// 需要對消費個數進行定義
// 消費者允許有三次沒有響應給rabbitMQ,如果長時間沒有響應則不再允許再次消費消息
channel.basicQos(1);// 當前執行的總數,一個消費者一次只能拿到一個隊列
// 定義消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 將消費者與隊列進行綁定
/**
* autoAck為false表示手動返回
*/
channel.basicConsume(queueName, false, consumer);
System.out.println("消費者1,啟動。。。");
// 獲取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = "消費者1收到 :" + new String(delivery.getBody());
System.out.println(msg);
// 告知rabbitMq當前消費的是哪一個消息
/**
* multiple false表示不擴展
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
// 消費者2
@Test
public void consumer2() throws Exception {
// 創建通道
Channel channel = connection.createChannel();
channel.queueDeclare(queueName, false, false, false, null);
// 需要對消費個數進行定義
// 消費者允許有三次沒有響應給rabbitMQ,如果長時間沒有響應則不再允許再次消費消息
channel.basicQos(1);// 當前執行的總數,一個消費者一次只能拿到一個隊列
// 定義消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 將消費者與隊列進行綁定
/**
* autoAck為false表示手動返回
*/
channel.basicConsume(queueName, false, consumer);
System.out.println("消費者2,啟動。。。");
// 獲取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = "消費者2收到 :" + new String(delivery.getBody());
System.out.println(msg);
// 告知rabbitMq當前消費的是哪一個消息
/**
* multiple false表示不擴展
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
@After
public void close() throws IOException {
connection.close();
}
}
執行結果
啟動consumer1
消費者1,啟動。。。
啟動consumer2
消費者2,啟動。。。
啟動provider
消息發送成功!!!
查看consumer1
消費者1收到 :我要吃飯!!!——工作模式
啟動provider
消息發送成功!!!
查看啟動consumer2
消費者2收到 :我要吃飯!!!——工作模式
總結
以上是生活随笔為你收集整理的java 双工模式_rabbitMq-工作模式(双工模式)-java的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java一次查询900w数据_一次SQL
- 下一篇: Java在电脑桌面的图标_电脑桌面图标都