javascript
rabbitmq java集群_RabbitMQ集群整合SpringBoot2.x
RabbitMQ相信大家已經再熟悉不過了,作為業界四大主流消息中間件之一(Apache RocketMQ、Apache Kafka、Apache ActiveMQ、RabbitMQ),它具有非常好的性能和可靠性的集群模式,不僅僅在各大互聯網大廠中廣泛使用(比如同程藝龍、美團點評等),而且在互聯網金融行業也常常被作為首選!SpringBoot作為互聯網開發利器已經不需要我再過多介紹什么,接下來我們一起從零開始構建RabbitMQ、并且與SpringBoot2.x的整合吧!
1.安裝RabbitMQ集群十步走!(3.6.5版本、采用rpm安裝方式,我們要安裝3個節點的集群,192.168.11.71 192.168.11.72 192.168.11.73、以71節點為例:)第一步:下載所需依賴包
Rabbitmq基于erlang語言編寫,所以必須要有。
wget?www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
socat包為秘鑰包,如果沒有會提示缺少此依賴。
wget?http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
rabbitmq-server-3.6.5核心服務包。
wget?www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
第二步:安裝并配置
安裝命令:
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm安裝成功后:修改配置
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app文件
修改:loopback_users?中的?<>,只保留guest第三步:3個節點同時進行前兩步驟操作:(71、72、73)
第四步:啟停單點服務與開啟管控臺
/etc/init.d/rabbitmq-server start? |? stop?? |? status?? |??? restart
分別啟動三個節點,然后執行命令啟動控制臺插件如下:rabbitmq-plugins enable? rabbitmq_management最后使用?guest/guest登錄成功即可!第五步:接下來進行集群構建:
71、72、73任意一個節點為Master(這里選擇71為Master)
也就是說我們需要把71的Cookie文件同步到72、73節點上去,先停止所有服務器:/etc/init.d/rabbitmq-server stop
然后進入制定目錄(/var/lib/rabbitmq/)并遠程copy 文件到72、73節點,如下操作:scp /var/lib/rabbitmq/.erlang.cookie到192.168.11.72和192.168.11.73中
第六步:組成集群
首先啟動三個節點:rabbitmq-server -detached
然后把72和73分別加入到71中,組成集群 [--ram]為節點以什么方式加入到集群中
ram為內存 存儲 默認不加為disk磁盤存儲,操作如下:
node72:rabbitmqctl stop_app
node72:rabbitmqctl join_cluster [--ram] rabbit@bhz71
node72:rabbitmqctl start_app
第七步:修改集群名稱:
rabbitmqctl set_cluster_name rabbitmq_cluster1
第八步:查看集群狀態:
rabbitmqctl cluster_status?,如下所示表示集群構建OK!
第九步:構建鏡像隊列,任意節點執行命令如下:
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
第十步:查看管控臺發現集群已經構建成功!
2.RabbitMQ與SpringBoot2.x整合
生產者端:第一步:pom.xml配置如下<?xml ?version="1.0"?encoding="UTF-8"?>
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0?http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
com.bfxy
rabbitmq-springboot-producer
0.0.1-SNAPSHOT
jar
rabbitmq-springboot-producer
rabbitmq-springboot-producer
org.springframework.boot
spring-boot-starter-parent
2.0.2.RELEASE
UTF-8
UTF-8
1.8
org.springframework.boot
spring-boot-starter
org.springframework.boot
spring-boot-starter-test
test
org.springframework.boot
spring-boot-starter-amqp
org.springframework.boot
spring-boot-maven-plugin
第二步:application.properties配置文件spring.rabbitmq.addresses=192.168.11.71:5672spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
spring.rabbitmq.publisher-confirms=true??#confirm模式
spring.rabbitmq.publisher-returns=true???#return機制
spring.rabbitmq.template.mandatory=true??#與return機制結合配置次屬性第三步:編寫RabbitSender生產端代碼package?com.bfxy.springboot.producer;
import?java.util.Map;
import?org.springframework.amqp.rabbit.core.RabbitTemplate;
import?org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import?org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import?org.springframework.amqp.rabbit.support.CorrelationData;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.messaging.Message;
import?org.springframework.messaging.MessageHeaders;
import?org.springframework.messaging.support.MessageBuilder;
import?org.springframework.stereotype.Component;
@Component
public?class?RabbitSender?{
//自動注入RabbitTemplate模板類
@Autowired
private?RabbitTemplate?rabbitTemplate;
//回調函數:?confirm確認
final?ConfirmCallback?confirmCallback?=?new?RabbitTemplate.ConfirmCallback()?{
@Override
public?void?confirm(CorrelationData?correlationData,?boolean?ack,?String?cause)?{
System.err.println("correlationData:?"?+?correlationData);
System.err.println("ack:?"?+?ack);
if(!ack){
System.err.println("異常處理....");
}
}
};
//回調函數:?return返回
final?ReturnCallback?returnCallback?=?new?RabbitTemplate.ReturnCallback()?{
@Override
public?void?returnedMessage(org.springframework.amqp.core.Message?message,?int?replyCode,?String?replyText,
String?exchange,?String?routingKey)?{
System.err.println("return?exchange:?"?+?exchange?+?",?routingKey:?"
+?routingKey?+?",?replyCode:?"?+?replyCode?+?",?replyText:?"?+?replyText);
}
};
//發送消息方法調用:?構建Message消息
public?void?send(Object?message,?Map?properties)?throws?Exception?{
MessageHeaders?mhs?=?new?MessageHeaders(properties);
Message?msg?=?MessageBuilder.createMessage(message,?mhs);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
//id?+?時間戳?全局唯一
CorrelationData?correlationData?=?new?CorrelationData("1234567890");
rabbitTemplate.convertAndSend("exchange-1",?"springboot.abc",?msg,?correlationData);
}
}
消費者:第一步:pom文件同生產者一致
第二步:application.properties配置文件spring.rabbitmq.addresses=192.168.11.76:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
spring.rabbitmq.listener.simple.acknowledge-mode=manual?#手工簽收
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10第三步:RabbitRecever消費端代碼package?com.bfxy.springboot.conusmer;
import?org.springframework.amqp.rabbit.annotation.Exchange;
import?org.springframework.amqp.rabbit.annotation.Queue;
import?org.springframework.amqp.rabbit.annotation.QueueBinding;
import?org.springframework.amqp.rabbit.annotation.RabbitHandler;
import?org.springframework.amqp.rabbit.annotation.RabbitListener;
import?org.springframework.amqp.support.AmqpHeaders;
import?org.springframework.messaging.Message;
import?org.springframework.stereotype.Component;
import?com.rabbitmq.client.Channel;
@Component
public?class?RabbitReceiver?{
@RabbitListener(bindings?=?@QueueBinding(
value?=?@Queue(value?=?"queue-1",
durable="true"),
exchange?=?@Exchange(value?=?"exchange-1",
durable="true",
type=?"topic",
ignoreDeclarationExceptions?=?"true"),
key?=?"springboot.*"
)
)
@RabbitHandler
public?void?onMessage(Message?message,?Channel?channel)?throws?Exception?{
System.err.println("--------------------------------------");
System.err.println("消費端Payload:?"?+?message.getPayload());
Long?deliveryTag?=?(Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag,?false);
}
}
生產端測試:package?com.bfxy.springboot;
import?java.text.SimpleDateFormat;
import?java.util.Date;
import?java.util.HashMap;
import?java.util.Map;
import?org.junit.Test;
import?org.junit.runner.RunWith;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.boot.test.context.SpringBootTest;
import?org.springframework.test.context.junit4.SpringRunner;
import?com.bfxy.springboot.producer.RabbitSender;
@RunWith(SpringRunner.class)
@SpringBootTest
public?class?ApplicationTests?{
@Test
public?void?contextLoads()?{
}
@Autowired
private?RabbitSender?rabbitSender;
private?static?SimpleDateFormat?simpleDateFormat?=?new?SimpleDateFormat("yyyy-MM-dd?HH:mm:ss.SSS");
@Test
public?void?testSender1()?throws?Exception?{
Map?properties?=?new?HashMap<>();
properties.put("number",?"12345");
properties.put("send_time",?simpleDateFormat.format(new?Date()));
rabbitSender.send("Hello?RabbitMQ?For?Spring?Boot!",?properties);
}
}
總結
以上是生活随笔為你收集整理的rabbitmq java集群_RabbitMQ集群整合SpringBoot2.x的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java super this_Java
- 下一篇: java protobuf 例子_用 M