producer send源码_RocketMq系列之Producer顺序消息发送源码分析(四)
有序消息
消息有序指的是可以按照消息的發送順序來消費。
RocketMQ可以嚴格的保證消息有序。但這個順序,不是全局順序,只是分區(queue)順序。
順序消息生產者
public static void main(String[] args) throws UnsupportedEncodingException {try {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("localhost:9876");producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 100; i++) {int orderId = i % 10;Message msg =new Message("TopicTest2", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 發送消息時,需要實現MessageQueueSelector , 用來選擇合適的queueSendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;// int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", sendResult);}producer.shutdown();} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();}}上面實現的順序消息時,通過orderId來進行順序消息,同一個訂單ID的消息,發送到同一個Queue里面
順序消息消費者
public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");// 設置NameServer地址consumer.setNamesrvAddr("localhost:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest1", "TagA || TagC || TagD");consumer.registerMessageListener(new MessageListenerOrderly() {AtomicLong consumeTimes = new AtomicLong(0);@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(true);System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}需要注意,registerMessageListener 注冊的消息監聽器 , 需要使用MessageListenerOrderly , ConsumeOrderlyContext , 不可以使用
MessageListenerConcurrently , ConsumeConcurrentlyContext , 否則消費的順序無法保證。
源碼分析
/*** @param msg 消息* @param selector 消息隊列選擇器* @param arg 分片值 (類似分庫分表里面的分片鍵)*/ @Override public SendResult send(Message msg, MessageQueueSelector selector, Object arg)throws MQClientException, RemotingException, MQBrokerException, InterruptedException {msg.setTopic(withNamespace(msg.getTopic()));return this.defaultMQProducerImpl.send(msg, selector, arg); }實際發送
public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout);}private SendResult sendSelectImpl(Message msg,MessageQueueSelector selector,Object arg,final CommunicationMode communicationMode,final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);// 1. 獲取topic信息,TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {MessageQueue mq = null;try {// 2. 獲取當前topic的內部隊列信息List<MessageQueue> messageQueueList = mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());// 復制一個消息Message userMessage = MessageAccessor.cloneMessage(msg);// topic信息String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());userMessage.setTopic(userTopic);//3. 獲取消息隊列mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));} catch (Throwable e) {throw new MQClientException("select message queue throwed exception.", e);}long costTime = System.currentTimeMillis() - beginStartTime;if (timeout < costTime) {throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");}if (mq != null) {// 獲取到隊列了,執行發送消息, 跟普通消息的發送一樣的return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);} else {throw new MQClientException("select message queue return null.", null);}}throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);}步驟說明:
總結: 順序消息的核心就是將你希望按照順序的消息,通過某種特定的條件,計算發送到對應的隊列里面去。
順序消息的缺點:
思考: 通過上面那種順序消息的模式,在broker發生宕機 , 隊列數量發生變化時,會造成消費亂序
比如在多master集群的情況下 ,
topic: TP_TEST 總共8個隊列MASTER-1 : 1,2,3,4 MASTER-2 : 5,6,7,8一個topic分別在多個master上面有隊列, 如果其中一個master宕機了,那么隊列數會變成4個,那么順序消息通過 orderId % queueSize 的這種方式,會造成原來往一個隊列里面發送的,會發送到另外一個隊列里面去,造成消費亂序。
所以如果是要嚴格的順序消息,則不要使用rocketMq, 在極端情況下會造成消費亂序。
http://weixin.qq.com/r/eC-YwJDE7s2RrdSj93pq (二維碼自動識別)
總結
以上是生活随笔為你收集整理的producer send源码_RocketMq系列之Producer顺序消息发送源码分析(四)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 轻微干眼症是什么样的
- 下一篇: 牙周炎可以喝冰水吗