Getting Started with RabbitMQ: The Five Commonly Used Working Modes
RabbitMQ
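About RabbitMQ
AMQP (Advanced Message Queuing Protocol) is an open application-layer standard for message-oriented middleware that provides a unified messaging service. Clients and message brokers that implement the protocol can exchange messages regardless of which vendor's client or broker is used and which programming language they are written in. RabbitMQ is an implementation written in Erlang.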
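Notes:
The six RabbitMQ working modes
The official RabbitMQ introduction to the six working modes: https://www.rabbitmq.com/getstarted.html
Using the common RabbitMQ working modes
Code shared by the working-mode examples below
Dependencies used in pom.xml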
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.3.0</version>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.5</version>
</dependency>
Utility classes (used by all the working modes below)
public class RabbitConstant {
    public static final String QUEUE_HELLOWORLD = "helloworld";
    public static final String QUEUE_SMS = "sms";
    public static final String EXCHANGE_WEATHER = "weather";
    public static final String EXCHANGE_WEATHER_ROUTING = "weather_routing";
    public static final String QUEUE_BAIDU = "baidu";
    public static final String QUEUE_SINA = "sina";
    public static final String EXCHANGE_WEATHER_TOPIC = "weather_topic";
}

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitUtils {
    private static ConnectionFactory factory = new ConnectionFactory();

    static {
        // Connection settings shared by every example
        factory.setHost("39.105.91.158");
        factory.setPort(5672);
        factory.setUsername("jack");
        factory.setPassword("123456");
        factory.setVirtualHost("/test");
    }

    // Opens a new connection; checked exceptions are wrapped in a RuntimeException
    public static Connection getConnection() {
        try {
            return factory.newConnection();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
Hello World mode
Code
Producer
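import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // Physical TCP connection
        Connection conn = RabbitUtils.getConnection();
        // Create a channel, which acts as a virtual connection on top of the TCP connection
        Channel channel = conn.createChannel();
        // Declare the queue: it is created if it does not exist, otherwise the existing queue is used
        // Argument 1: queue name
        // Argument 2: durable; false means the queue is not persisted and is lost when the broker stops
        // Argument 3: exclusive; false means any consumer may use the queue, true restricts it to the connection that declared it
        // Argument 4: autoDelete; false means the queue is not deleted automatically once it is no longer in use
        // Argument 5: additional arguments, null here
        channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);
        // basicPublish takes four arguments
        // Argument 1: exchange; "" selects the default exchange (named exchanges appear in the publish/subscribe examples)
        // Argument 2: routing key; with the default exchange this is the queue name
        // Argument 3: additional message properties
        // Argument 4: the message body as a byte array
        String message = "helloworld!";
        channel.basicPublish("", RabbitConstant.QUEUE_HELLOWORLD, null, message.getBytes());
        channel.close();
        conn.close();
        System.out.println("Message sent");
    }
}
Consumer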
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conn = RabbitUtils.getConnection();
        // Create a channel
        Channel channel = conn.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);
        // Argument 1: the queue to consume from
        // Argument 2: autoAck; false means messages are acknowledged manually in code, the approach recommended here
        // Argument 3: a Consumer implementation, here a subclass of DefaultConsumer
        channel.basicConsume(RabbitConstant.QUEUE_HELLOWORLD, false, new Reciver(channel));
    }
}

class Reciver extends DefaultConsumer {
    private Channel channel;

    // The channel is passed in from outside because handleDelivery needs it to acknowledge messages
    public Reciver(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String messageBody = new String(body);
        System.out.println("Consumer received: " + messageBody);
        // Acknowledge the message
        // envelope.getDeliveryTag() returns the delivery tag of this message
        // false acknowledges only this message; true acknowledges all unacknowledged messages up to and including this one
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
}
Work Queue mode
Introduction
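In a work queue, several consumers share one queue and each message is handled by exactly one of them. The sketch below is a pure in-memory simulation with no broker involved; `DispatchSimulation` and its methods are hypothetical names used only for illustration. It compares plain round-robin dispatch with the "one unacknowledged message per consumer" behaviour that `basicQos(1)` combined with manual acknowledgement aims for, assuming three consumers that need 10, 100 and 1000 ms per message and 101 messages in total, as in the example of this section.

```java
import java.util.Arrays;

final class DispatchSimulation {

    /** Round-robin: message i goes to worker i % n, regardless of how fast the worker is. */
    static int[] roundRobin(int messages, long[] costMillis) {
        int[] handled = new int[costMillis.length];
        for (int i = 0; i < messages; i++) {
            handled[i % costMillis.length]++;
        }
        return handled;
    }

    /** Prefetch 1: each message goes to the worker that becomes idle first. */
    static int[] prefetchOne(int messages, long[] costMillis) {
        int[] handled = new int[costMillis.length];
        long[] freeAt = new long[costMillis.length]; // virtual time at which each worker is idle again
        for (int i = 0; i < messages; i++) {
            int next = 0;
            for (int w = 1; w < freeAt.length; w++) {
                if (freeAt[w] < freeAt[next]) {
                    next = w; // ties go to the lower index
                }
            }
            freeAt[next] += costMillis[next];
            handled[next]++;
        }
        return handled;
    }

    public static void main(String[] args) {
        long[] cost = {10, 100, 1000};
        System.out.println("round-robin: " + Arrays.toString(roundRobin(101, cost)));
        System.out.println("prefetch 1:  " + Arrays.toString(prefetchOne(101, cost)));
    }
}
```

In this idealised model the round-robin split is 34/34/33, while the prefetch-1 split is 91/9/1, so the fastest consumer does most of the work. A real broker will not reproduce these numbers exactly, because network latency and scheduling are ignored here.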
Code
Message object
public class SMS {
    private String name;
    private String mobile;
    private String content;

    public SMS(String name, String mobile, String content) {
        this.name = name;
        this.mobile = mobile;
        this.content = content;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getMobile() {
        return mobile;
    }

    public void setMobile(String mobile) {
        this.mobile = mobile;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}
Producer
import com.google.gson.Gson;
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * OrderSystem is the producer of order messages.
 * SMSSender1, SMSSender2 and SMSSender3 are three consumers that take different messages from the same queue.
 */
public class OrderSystem {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
        // Publish 101 messages (i = 0..100), each serialized to JSON
        for (int i = 0; i <= 100; i++) {
            SMS sms = new SMS("Passenger" + i, "13900000" + i, "Your ticket has been booked");
            String jsonSMS = new Gson().toJson(sms);
            channel.basicPublish("", RabbitConstant.QUEUE_SMS, null, jsonSMS.getBytes());
        }
        System.out.println("Messages sent");
        channel.close();
        connection.close();
    }
}
Consumer 1
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * SMSSender1 is consumer 1 of the order messages.
 */
public class SMSSender1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
        // Without basicQos(1) the broker distributes the messages evenly across all consumers.
        // With basicQos(1) the broker no longer pushes several messages to a consumer at once;
        // a consumer receives the next message only after it has acknowledged the previous one,
        // so faster machines end up consuming more messages.
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String jsonSMS = new String(body);
                System.out.println("SMSSender1 - SMS sent: " + jsonSMS);
                try {
                    // Simulate a fast consumer
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
Consumer 2
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * SMSSender2 is consumer 2 of the order messages.
 */
public class SMSSender2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String jsonSMS = new String(body);
                System.out.println("SMSSender2 - SMS sent: " + jsonSMS);
                try {
                    // Simulate a consumer of medium speed
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
Consumer 3
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * SMSSender3 is consumer 3 of the order messages.
 */
public class SMSSender3 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String jsonSMS = new String(body);
                System.out.println("SMSSender3 - SMS sent: " + jsonSMS);
                try {
                    // Simulate a slow consumer
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
Publish/Subscribe mode
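Introduction
Use cases
Because every consumer receives the same messages, publish/subscribe is a good fit for a "data provider and application vendors" setup.
For example, the China Meteorological Administration publishes weather forecasts to an exchange; portals such as NetEase, Sina, Baidu and Sohu each bind a queue to that exchange and automatically receive the weather data it pushes.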
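The fan-out behaviour described above can be sketched in memory, without a broker. `FanoutSketch` and its methods are hypothetical names for illustration only, not part of the RabbitMQ client; the sketch assumes the exchange simply copies each message into every queue bound to it.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

final class FanoutSketch {
    // Queue name -> messages waiting in that queue
    private final Map<String, Queue<String>> boundQueues = new LinkedHashMap<>();

    /** Binds a queue to the exchange; no routing key is involved. */
    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayDeque<>());
    }

    /** Copies the message into every bound queue. */
    void publish(String message) {
        for (Queue<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    /** Takes the next message from a queue, or returns null if the queue is empty or unknown. */
    String poll(String queueName) {
        Queue<String> queue = boundQueues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        FanoutSketch weather = new FanoutSketch();
        weather.bind("baidu");
        weather.bind("sina");
        weather.publish("sunny");
        System.out.println("baidu got: " + weather.poll("baidu"));
        System.out.println("sina got:  " + weather.poll("sina"));
    }
}
```

Each subscriber owns its queue, so one subscriber consuming a message does not remove it from the others.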
Code
Producer
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

/**
 * WeatherBureau is the message producer and publishes to the exchange "weather".
 * Baidu and Sina each create their own queue and bind it to the exchange "weather".
 */
public class WeatherBureau {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitUtils.getConnection();
        // Read one token from standard input as the message
        String input = new Scanner(System.in).next();
        Channel channel = connection.createChannel();
        // The exchange is not declared here, so it must already exist on the broker
        channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER, "", null, input.getBytes());
        channel.close();
        connection.close();
    }
}
Consumer 1
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Baidu {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
        // queueBind binds a queue to an exchange
        // Argument 1: queue name
        // Argument 2: exchange name
        // Argument 3: routing key (not used in this mode)
        channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_BAIDU, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Baidu received weather data: " + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
Consumer 2
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Sina {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
        // queueBind binds a queue to an exchange
        // Argument 1: queue name; argument 2: exchange name; argument 3: routing key (not used in this mode)
        channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER, "");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SINA, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Sina received weather data: " + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
Routing mode
Introduction
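In routing mode a queue receives only the messages whose routing key equals one of the keys it was bound with. The following in-memory sketch illustrates that exact-match rule; it assumes the exchange behaves like a direct exchange, and `DirectRoutingSketch` with its methods is a hypothetical name, not a RabbitMQ API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class DirectRoutingSketch {
    // Routing key -> names of the queues bound with exactly that key
    private final Map<String, Set<String>> bindings = new HashMap<>();

    /** Records that the queue wants messages published with this routing key. */
    void bind(String queueName, String routingKey) {
        bindings.computeIfAbsent(routingKey, key -> new LinkedHashSet<>()).add(queueName);
    }

    /** Returns the queues that would receive a message published with this routing key. */
    List<String> route(String routingKey) {
        return new ArrayList<>(bindings.getOrDefault(routingKey, Collections.emptySet()));
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        exchange.bind("baidu", "china.shandong.qingdao.20991011");
        exchange.bind("sina", "us.cal.la.20991011");
        System.out.println(exchange.route("china.shandong.qingdao.20991011"));
        System.out.println(exchange.route("china.hebei.shijiazhuang.20991011"));
    }
}
```

A message whose key matches no binding reaches no queue, which is why a queue is bound once for every key it is interested in.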
Code
Producer
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class WeatherBureau {
    public static void main(String[] args) throws IOException, TimeoutException {
        // Routing key -> message body
        Map<String, String> area = new LinkedHashMap<>();
        area.put("china.hebei.shijiazhuang.20991011", "Weather data for Shijiazhuang, Hebei, China, 20991011");
        area.put("china.shandong.qingdao.20991011", "Weather data for Qingdao, Shandong, China, 20991011");
        area.put("china.henan.zhengzhou.20991011", "Weather data for Zhengzhou, Henan, China, 20991011");
        area.put("us.cal.la.20991011", "Weather data for Los Angeles, California, US, 20991011");
        area.put("china.hebei.shijiazhuang.20991012", "Weather data for Shijiazhuang, Hebei, China, 20991012");
        area.put("china.shandong.qingdao.20991012", "Weather data for Qingdao, Shandong, China, 20991012");
        area.put("china.henan.zhengzhou.20991012", "Weather data for Zhengzhou, Henan, China, 20991012");
        area.put("us.cal.la.20991012", "Weather data for Los Angeles, California, US, 20991012");

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        for (Map.Entry<String, String> me : area.entrySet()) {
            // The second argument is the routing key, which acts as the filter condition
            // The exchange is not declared here, so it must already exist on the broker
            channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING, me.getKey(), null, me.getValue().getBytes());
        }
        channel.close();
        connection.close();
    }
}
Consumer 1
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Baidu {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
        // queueBind binds a queue to an exchange
        // Argument 1: queue name; argument 2: exchange name; argument 3: routing key
        channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.shandong.qingdao.20991011");
        channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.shandong.qingdao.20991012");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_BAIDU, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Baidu received weather data: " + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
Consumer 2
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Sina {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
        // One binding per routing key this queue is interested in
        channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.la.20991011");
        channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.henan.zhengzhou.20991011");
        channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.la.20991012");
        channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.henan.zhengzhou.20991012");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SINA, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Sina received weather data: " + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
Topic mode
Introduction
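Topic mode extends routing with wildcard binding patterns. As a minimal sketch, assuming the standard AMQP topic rules (routing keys are dot-separated words, `*` matches exactly one word and `#` matches zero or more words), the matching can be written as below. `TopicMatcher` is a hypothetical helper for illustration; the broker's own implementation is not shown here.

```java
final class TopicMatcher {

    /** Returns true if the routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int pi, String[] key, int ki) {
        if (pi == pattern.length) {
            return ki == key.length; // pattern used up: the key must be used up too
        }
        if (pattern[pi].equals("#")) {
            // '#' may absorb zero or more words: try every possible continuation point
            for (int next = ki; next <= key.length; next++) {
                if (matches(pattern, pi + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (ki == key.length) {
            return false; // pattern still expects a word but the key has none left
        }
        if (pattern[pi].equals("*") || pattern[pi].equals(key[ki])) {
            return matches(pattern, pi + 1, key, ki + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("*.*.*.20991011", "china.shandong.qingdao.20991011"));
        System.out.println(matches("us.#", "us.cal.la.20991012"));
        System.out.println(matches("us.*", "us.cal.la.20991012"));
    }
}
```

Under these rules a pattern such as `us.*` matches `us.20190101` but not `us.cal.la.20991012`, because `*` stands for a single word only, whereas `us.#` matches both.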
Code
Producer
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class WeatherBureau {
    public static void main(String[] args) throws IOException, TimeoutException {
        // Routing key -> message body
        Map<String, String> area = new LinkedHashMap<>();
        area.put("china.hebei.shijiazhuang.20991011", "Weather data for Shijiazhuang, Hebei, China, 20991011");
        area.put("china.shandong.qingdao.20991011", "Weather data for Qingdao, Shandong, China, 20991011");
        area.put("china.henan.zhengzhou.20991011", "Weather data for Zhengzhou, Henan, China, 20991011");
        area.put("us.cal.la.20991011", "Weather data for Los Angeles, California, US, 20991011");
        area.put("china.hebei.shijiazhuang.20991012", "Weather data for Shijiazhuang, Hebei, China, 20991012");
        area.put("china.shandong.qingdao.20991012", "Weather data for Qingdao, Shandong, China, 20991012");
        area.put("china.henan.zhengzhou.20991012", "Weather data for Zhengzhou, Henan, China, 20991012");
        area.put("us.cal.la.20991012", "Weather data for Los Angeles, California, US, 20991012");

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        for (Map.Entry<String, String> me : area.entrySet()) {
            // The second argument is the routing key, which acts as the filter condition
            // The exchange is not declared here, so it must already exist on the broker
            channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, me.getKey(), null, me.getValue().getBytes());
        }
        channel.close();
        connection.close();
    }
}
Consumer 1
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Baidu {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
        // queueBind binds a queue to an exchange
        // Argument 1: queue name; argument 2: exchange name; argument 3: routing key pattern
        channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "*.*.*.20991011");
        // channel.queueUnbind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "*.*.*.20991011");
        // Another possible pattern: *.hebei.*.*
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_BAIDU, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Baidu received weather data: " + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
Consumer 2
import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Sina {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
        // Receive every message whose routing key starts with "us"
        channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "us.#");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SINA, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Sina received weather data: " + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
Integrating Spring with RabbitMQ (exchange mode)
Code
Dependencies shared by the producer and the consumer (pom.xml)
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.3.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.0.5.RELEASE</version>
</dependency>
Producer-side code and configuration
Spring configuration file applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!-- Connection factory with the basic connection parameters -->
    <rabbit:connection-factory id="connectionFactory"
                               host="127.0.0.1"
                               port="5672"
                               username="guest"
                               password="guest"
                               virtual-host="/test" />

    <!--
        fanout-exchange | direct-exchange | topic-exchange
        Declares a topic exchange named topicExchange; it is created automatically if it does not exist
    -->
    <rabbit:topic-exchange name="topicExchange" auto-declare="true"></rabbit:topic-exchange>

    <!-- Spring provides RabbitTemplate, which wraps the common operations and simplifies sending messages from a producer -->
    <rabbit:template id="template" connection-factory="connectionFactory" exchange="topicExchange"></rabbit:template>

    <!-- Inject the template into the producer so it can send messages -->
    <bean id="newsProducer" class="com.itlaoqi.rabbit.exchange.NewsProducer">
        <property name="rabbitTemplate" ref="template"/>
    </bean>

    <!-- RabbitAdmin creates, binds and manages queues and exchanges -->
    <rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory"/>
</beans>
Message object News
import java.io.Serializable;
import java.util.Date;

public class News implements Serializable {
    private String source;
    private String title;
    private Date createTime;
    private String content;

    public News(String source, String title, Date createTime, String content) {
        this.source = source;
        this.title = title;
        this.createTime = createTime;
        this.content = content;
    }

    public String getSource() {
        return source;
    }

    public void setSource(String source) {
        this.source = source;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public Date getCreateTime() {
        return createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}
Message producer
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.util.Date;

public class NewsProducer {
    private RabbitTemplate rabbitTemplate = null;

    public RabbitTemplate getRabbitTemplate() {
        return rabbitTemplate;
    }

    public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendNews(String routingKey, News news) {
        // convertAndSend sends data to the exchange configured on the template
        // Argument 1: the routing key
        // Argument 2: the object to send; it can be a String, a byte[] or any object that implements Serializable
        rabbitTemplate.convertAndSend(routingKey, news);
        System.out.println("News sent");
    }

    public static void main(String[] args) {
        ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
        NewsProducer np = (NewsProducer) ctx.getBean("newsProducer");
        np.sendNews("us.20190101", new News("Xinhua News Agency", "Trump withdraws from yet another agreement", new Date(), "International news content"));
        np.sendNews("china.20190101", new News("Phoenix TV", "Company XXX makes the Global 500", new Date(), "Domestic news content"));
    }
}
Consumer-side code and configuration
Spring configuration file applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <rabbit:connection-factory id="connectionFactory"
                               host="39.105.91.158"
                               port="5672"
                               username="jack"
                               password="123456"
                               virtual-host="/test"/>

    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Create the queue -->
    <rabbit:queue name="topicQueue" auto-declare="true" auto-delete="false" durable="false" exclusive="false"/>

    <!-- Bind the queue to the exchange and specify the filter pattern -->
    <rabbit:topic-exchange name="topicExchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding queue="topicQueue" pattern="us.*"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!-- Once the consumer starts, Spring listens on topicQueue; each new message is passed to the News parameter
         of the consumer bean's recv method, where the program can process it further -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener ref="consumer" method="recv" queue-names="topicQueue"/>
    </rabbit:listener-container>

    <bean id="consumer" class="com.itlaoqi.rabbitmq.NewsConsumer"></bean>
</beans>
Message consumer
package com.itlaoqi.rabbitmq;

import com.itlaoqi.rabbit.exchange.News;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class NewsConsumer {
    // Called by the Spring listener container for every message that arrives in topicQueue
    public void recv(News news) {
        System.out.println("Received latest news: " + news.getTitle() + ": " + news.getSource());
    }

    public static void main(String[] args) {
        // Initializing the IoC container loads the Spring configuration file, which creates the configured queue
        // and starts consuming the messages routed to it from the bound exchange
        ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
    }
}
Integrating Spring Boot 2 with RabbitMQ