获取rabbitmq连接对象_RabbitMQ——简单队列
RabbitMQ 簡述#
RabbitMQ是一個消息代理:它接受并轉發消息。 您可以將其視為郵局:當您將要把寄發的郵件投遞到郵箱中時,您可以確信Postman 先生最終會將郵件發送給收件人。 在這個比喻中,RabbitMQ是一個郵箱,郵局和郵遞員,用來接受,存儲和轉發二進制數據塊的消息。
隊列就像是在RabbitMQ中扮演郵箱的角色。 雖然消息經過RabbitMQ和應用程序,但它們只能存儲在隊列中。 隊列只受主機的內存和磁盤限制的限制,它本質上是一個大的消息緩沖區。 許多生產者可以發送到一個隊列的消息,許多消費者可以嘗試從一個隊列接收數據。
producer即為生產者,用來產生消息發送給隊列。consumer是消費者,需要去讀隊列內的消息。producer,consumer和broker(rabbitMQ server)不必駐留在同一個主機上;確實在大多數應用程序中它們是這樣分布的。
簡單隊列#
簡單隊列是最簡單的一種模式,由生產者、隊列、消費者組成。生產者將消息發送給隊列,消費者從隊列中讀取消息完成消費。
在下圖中,“P”是我們的生產者,“C”是我們的消費者。 中間的框是隊列 - RabbitMQ代表消費者的消息緩沖區。
java 方式#
生產者#
Copypackage com.anqi.mq.nat;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class MyProducer { private static final String QUEUE_NAME = "ITEM_QUEUE"; public static void main(String[] args) throws Exception { //1. 創建一個 ConnectionFactory 并進行設置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); //2. 通過連接工廠來創建連接 Connection connection = factory.newConnection(); //3. 通過 Connection 來創建 Channel Channel channel = connection.createChannel(); //實際場景中,消息多為json格式的對象 String msg = "hello"; //4. 發送三條數據 for (int i = 1; i <= 3 ; i++) { channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("Send message" + i +" : " + msg); } //5. 關閉連接 channel.close(); connection.close(); }}Copy /** * Declare a queue * @param queue the name of the queue * @param durable true if we are declaring a durable queue (the queue will survive a server restart) * @param exclusive true if we are declaring an exclusive queue (restricted to this connection) * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) * @param arguments other properties (construction arguments) for the queue * @return a declaration-confirm method to indicate the queue was successfully declared * @throws java.io.IOException if an error is encountered */ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map arguments) throws IOException; /** * Publish a message * @see com.rabbitmq.client.AMQP.Basic.Publish * @param exchange the exchange to publish the message to * @param routingKey the routing key * @param props other properties for the message - routing headers etc * @param body the message body * @throws java.io.IOException if an error is encountered */ void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException; /** * Start a non-nolocal, non-exclusive consumer, with * a server-generated consumerTag. * @param queue the name of the queue * @param autoAck true if the server should consider messages * acknowledged once delivered; false if the server should expect * explicit acknowledgements * @param callback an interface to the consumer object * @return the consumerTag generated by the server * @throws java.io.IOException if an error is encountered * @see com.rabbitmq.client.AMQP.Basic.Consume * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer) */ String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;消費者#
Copypackage com.anqi.mq.nat;import com.rabbitmq.client.*;import java.io.IOException;public class MyConsumer { private static final String QUEUE_NAME = "ITEM_QUEUE"; public static void main(String[] args) throws Exception { //1. 創建一個 ConnectionFactory 并進行設置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); //2. 通過連接工廠來創建連接 Connection connection = factory.newConnection(); //3. 通過 Connection 來創建 Channel Channel channel = connection.createChannel(); //4. 聲明一個隊列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); /* true:表示自動確認,只要消息從隊列中獲取,無論消費者獲取到消息后是否成功消費,都會認為消息已經成功消費 false:表示手動確認,消費者獲取消息后,服務器會將該消息標記為不可用狀態,等待消費者的反饋,如果消費者一 直沒有反饋,那么該消息將一直處于不可用狀態,并且服務器會認為該消費者已經掛掉,不會再給其發送消息, 直到該消費者反饋。 */ //5. 創建消費者并接收消息 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; //6. 設置 Channel 消費者綁定隊列 channel.basicConsume(QUEUE_NAME, true, consumer); }}CopySend message1 : helloSend message2 : helloSend message3 : hello [*] Waiting for messages. To exit press CTRL+C [x] Received 'hello' [x] Received 'hello' [x] Received 'hello'當我們啟動生產者之后查看RabbitMQ管理后臺可以看到有一條消息正在等待被消費。
當我們啟動消費者之后再次查看,可以看到積壓的一條消息已經被消費。
總結#
- 隊列聲明queueDeclare的參數:第一個參數表示隊列名稱、第二個參數為是否持久化(true表示是,隊列將在服務器重啟時生存)、第三個參數為是否是獨占隊列(創建者可以使用的私有隊列,斷開后自動刪除)、第四個參數為當所有消費者客戶端連接斷開時是否自動刪除隊列、第五個參數為隊列的其他參數。
- basicConsume的第二個參數autoAck: 應答模式,true:自動應答,即消費者獲取到消息,該消息就會從隊列中刪除掉,false:手動應答,當從隊列中取出消息后,需要程序員手動調用方法應答,如果沒有應答,該消息還會再放進隊列中,就會出現該消息一直沒有被消費掉的現象。
- 這種簡單隊列的模式,系統會為每個隊列隱式地綁定一個默認交換機,交換機名稱為" (AMQP default)",類型為直連 direct,當你手動創建一個隊列時,系統會自動將這個隊列綁定到一個名稱為空的 Direct 類型的交換機上,綁定的路由鍵 routing key 與隊列名稱相同,相當于channel.queueBind(queue:"QUEUE_NAME", exchange:"(AMQP default)“, routingKey:"QUEUE_NAME");雖然實例沒有顯式聲明交換機,但是當路由鍵和隊列名稱一樣時,就會將消息發送到這個默認的交換機中。這種方式比較簡單,但是無法滿足復雜的業務需求,所以通常在生產環境中很少使用這種方式。
- The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted.默認交換機隱式綁定到每個隊列,其中路由鍵等于隊列名稱。不可能顯式綁定到,或從缺省交換中解除綁定。它也不能被刪除。? ——引自 RabbitMQ 官方文檔?
spring-amqp方式#
引入 Maven 依賴
Copy com.rabbitmq amqp-client 5.6.0org.springframework.amqp spring-rabbit 2.1.5.RELEASEspring 配置文件
Copy使用測試
Copyimport org.springframework.amqp.core.AmqpTemplate;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class Main { public static void main(String[] args) { ApplicationContext app = new ClassPathXmlApplicationContext("spring/rabbit-context.xml"); AmqpTemplate amqpTemplate = app.getBean(AmqpTemplate.class); amqpTemplate.convertAndSend("MY-QUEUE", "Item"); String msg = (String) amqpTemplate.receiveAndConvert("MY-QUEUE"); System.out.println(msg); }}參考方法
Copy/** * Convert a Java object to an Amqp {@link Message} and send it to a specific exchange * with a specific routing key. * * @param exchange the name of the exchange * @param routingKey the routing key * @param message a message to send * @throws AmqpException if there is a problem */void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;/** * Receive a message if there is one from a specific queue and convert it to a Java * object. Returns immediately, possibly with a null value. * * @param queueName the name of the queue to poll * @return a message or null if there is none waiting * @throws AmqpException if there is a problem */@NullableObject receiveAndConvert(String queueName) throws AmqpException;作者: 海向
出處:https://www.cnblogs.com/haixiang/p/10826710.html
本站使用「CC BY 4.0」創作共享協議,轉載請在文章明顯位置注明作者及出處。
總結
以上是生活随笔為你收集整理的获取rabbitmq连接对象_RabbitMQ——简单队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 观念什么意思_观念真不是凭空出现的,也不
- 下一篇: python 数据库 实战_干货!pyt