1 简单队列
目錄
- 1、獲取MQ連接
- 2、發(fā)送消息
- 3、消費者舊API
- 4、消費者新API
采用老的API實現(xiàn)的,所謂的簡單隊列就是一個消費者一個生產(chǎn)者,是1:1的關系
- P :消息的生產(chǎn)者
- 紅色的:隊列
- C:消費者
1、獲取MQ連接
public class MqConnectionUtil {private static final Logger log =LoggerFactory.getLogger(RedisUtil.class);public static Connection getConnection() {Connection connection = null;ConnectionFactory factory = new ConnectionFactory();// 用戶名factory.setUsername("guest");// 密碼factory.setPassword("guest'");// 服務器地址factory.setHost("47.*.*.9");// 端口號,也就是AMQPfactory.setPort(5672);// 數(shù)據(jù)庫 “/”代表所有的數(shù)據(jù)庫factory.setVirtualHost("/");try {connection = factory.newConnection();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {log.error("RabbitMQ connection create failed!!");e.printStackTrace();}return connection;} }2、發(fā)送消息
public static void main(String[] args) throws IOException, TimeoutException {// 獲取連接Connection connection = MqConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 創(chuàng)建隊列申明channel.queueDeclare(QUENU_NAME,false,false,false,null);channel.basicPublish("",QUENU_NAME,null,"hello zhaodi".getBytes());channel.close();connection.close();}3、消費者舊API
這段代碼中的許多方法已經(jīng)被JAVA棄用,但是目前我們作為學習的使用
public static void main(String[] args) throws IOException, InterruptedException {Connection connection = MqConnectionUtil.getConnection();Channel channel = connection.createChannel();// 定義消費者QueueingConsumer consumer = new QueueingConsumer(channel);//監(jiān)聽隊列channel.basicConsume(QUENU_NAME,true,consumer);while(true) {Delivery delivery = consumer.nextDelivery();System.out.println("收到的消息 :"+new String(delivery.getBody()));}}4、消費者新API
public class Consumer2 {private static final String QUEUE_NAME = "my-simple-queue";public static void main(String[] args) throws IOException, InterruptedException {Connection connection = MqConnectionUtil.getConnection();Channel channel = connection.createChannel();// 申明隊列 如果該隊列已經(jīng)存在/或者生產(chǎn)者已經(jīng)申明了在這里就不需要再次申明channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 定義消費者DefaultConsumer consumer = new DefaultConsumer(channel){/***獲取到到達的消息,觸發(fā)回調(diào)事件**/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body);System.out.println("接受的消息是:"+msg);}};//監(jiān)聽隊列channel.basicConsume(QUEUE_NAME,true,consumer);} }轉(zhuǎn)載于:https://www.cnblogs.com/zhaod/p/11388994.html
總結
- 上一篇: 从零写一个编译器(十二):代码生成之生成
- 下一篇: 2 工作队列