rabbitmq简单收发服务搭建
消息發送、接收簡單代碼示例
mq.xml//rabbitmq config spring.rabbitmq.host=ip:port spring.rabbitmq.username= spring.rabbitmq.password= spring.rabbitmq.virtual-host=//發送隊列 send.exchange.name= send.queue.name=//接收listen.queue.name.system=@Configuration public class AmqpConfig {@Value("${spring.rabbitmq.host}")private String address;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.virtual-host}")private String virtualHost;@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setAddresses(address);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualHost);connectionFactory.setPublisherConfirms(true); //必須要設置、消息發送確認return connectionFactory;}/*** 常用spring為singleton單例模式,此處mq消息需將其改為非單例模式*/@Bean@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)//必須是prototype類型public RabbitTemplate rabbitTemplate() {return new RabbitTemplate(connectionFactory());}@Beanpublic RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {return new RabbitAdmin(connectionFactory);}}//消息發送 @Component public class SystemMqMessageSender {private static final Logger logger = LoggerFactory.getLogger(SystemMqMessageSender.class);@Autowiredprivate AmqpTemplate rabbitTemplate;@Value("${send.exchange.name}")private String exchangeSystem;@Value("${send.queue.name}")private String queueSystem;@Resourceprivate RabbitAdmin rabbitAdmin;public void sendMessage(EventModel eventModel) {String message = JsonUtils.json(eventModel);logger.info("發送消息:{}", message);rabbitTemplate.convertAndSend(exchangeSystem, queueSystem, message);}//聲明持久化隊列,并綁定到exchange上@Beanpublic Binding bindingExchangeSystem() {Queue queue = QueueBuilder.durable(queueSystem).build();//隊列持久化rabbitAdmin.declareQueue(queue);//聲明隊列DirectExchange exchange = (DirectExchange) ExchangeBuilder.directExchange(exchangeSystem).build();rabbitAdmin.declareExchange(exchange);//創建路由Binding binding = BindingBuilder.bind(queue).to(exchange).withQueueName();//綁定路由rabbitAdmin.declareBinding(binding);return binding;}}//消息接收 @Component @RabbitListener(queues = "${listen.queue.name.system}") public class SystemMessageListener extends BaseListener implements EventModelConsumer,InitializingBean {private static final Logger logger = LoggerFactory.getLogger(SystemMessageListener.class);@Value("${listen.queue.name.system}")private String queueName;@RabbitHandlerpublic void process(String message) {//監聽消息logger.info("接收到消息:{}", message);processMessage(message, queueName);}public void processMessage(String content, String queueName) {//業務處理} }rabbitmq如何保證高可用呢
答案是消息應答機制,一下是rabbitmq消息應答機制的原文:
Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the customer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.
But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.
In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back by the consumer to tell RabbitMQ that a particular message has been received, processed and that RabbitMQ is free to delete it.
If a consumer dies (its channel is closed, connection is closed, or TCP connection is lost) without sending an ack, RabbitMQ will understand that a message wasn't processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.
There aren't any message timeouts; RabbitMQ will redeliver the message when the consumer dies. It's fine even if processing a message takes a very, very long time.
Manual message acknowledgments are turned on by default. In previous examples we explicitly turned them off via the autoAck=true flag. It's time to set this flag to false and send a proper acknowledgment from the worker, once we're done with a task.
執行一個任務可能需要花費幾秒鐘,你可能會擔心如果一個消費者在執行任務過程中掛掉了。一旦RabbitMQ將消息分發給了消費者,就會從內存中刪除。在這種情況下,如果正在執行任務的消費者宕機,會丟失正在處理的消息和分發給這個消費者但尚未處理的消息。
但是,我們不想丟失任何任務,如果有一個消費者掛掉了,那么我們應該將分發給它的任務交付給另一個消費者去處理。
為了確保消息不會丟失,RabbitMQ支持消息應答。消費者發送一個消息應答,告訴RabbitMQ這個消息已經接收并且處理完畢了。RabbitMQ就可以刪除它了。
如果一個消費者掛掉卻沒有發送應答,RabbitMQ會理解為這個消息沒有處理完全,然后交給另一個消費者去重新處理。這樣,你就可以確認即使消費者偶爾掛掉也不會丟失任何消息了。
沒有任何消息超時限制;只有當消費者掛掉時,RabbitMQ才會重新投遞。即使處理一條消息會花費很長的時間。
消息應答是默認打開的。我們通過顯示的設置autoAsk=true關閉這種機制?,F即自動應答開,一旦我們完成任務,消費者會自動發送應答。通知RabbitMQ消息已被處理,可以從內存刪除。如果消費者因宕機或鏈接失敗等原因沒有發送ACK(不同于ActiveMQ,在RabbitMQ里,消息沒有過期的概念),則RabbitMQ會將消息重新發送給其他監聽在隊列的下一個消費者。
轉載于:https://www.cnblogs.com/canmeng-cn/p/8543113.html
總結
以上是生活随笔為你收集整理的rabbitmq简单收发服务搭建的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Ubuntu16.04下Hadoop的本
- 下一篇: 从零开始学PowerShell(5)自定