生活随笔
收集整理的這篇文章主要介紹了
分布式消息通信ActiveMQ原理-消费消息策略-笔记
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
2019獨角獸企業重金招聘Python工程師標準>>>
消息消費流程圖
消費端消費消息的原理
- 我們通過上一節課的講解,知道有兩種方法可以接收消息,
- 一種是使用同步阻塞的MessageConsumer#receive方法。
- 另一種是使用消息監聽器MessageListener。
- 這里需要注意的是,在同一個session下,這兩者不能同時工作,
- 也就是說不能針對不同消息采用不同的接收方式。
- 否則會拋出異常。
- 至于為什么這么做,最大的原因還是在事務性會話中,兩種消費模式的事務不好管控
ActiveMQMessageConsumer.receive
public Message receive() throws JMSException {checkClosed();checkMessageListener(); //檢查receive和MessageListener是否同時配置在當前的會話中sendPullCommand(0); //如果PrefetchSizeSize為0并且unconsumerMessage為空,則發起pull命令MessageDispatch md = dequeue(-1); //從unconsumerMessage出隊列獲取消息if (md == null) {return null;}beforeMessageIsConsumed(md);afterMessageIsConsumed(md, false); //發送ack給到brokerreturn createActiveMQMessage(md);//獲取消息并返回}
sendPullCommand
- 發送pull命令從broker上獲取消息,前提是prefetchSize=0并且unconsumedMessages為空。
- unconsumedMessage表示未消費的消息,這里面預讀取的消息大小為prefetchSize的值
protected void sendPullCommand(long timeout) throws JMSException {clearDeliveredList();if (info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {MessagePull messagePull = new MessagePull();messagePull.configure(info);messagePull.setTimeout(timeout);session.asyncSendPacket(messagePull); //向服務端異步發送messagePull指令}}
clearDeliveredList
- 在上面的sendPullCommand方法中,會先調用clearDeliveredList方法,
- 主要用來清理已經分發的消息鏈表deliveredMessages
- deliveredMessages,存儲分發給消費者但還為應答的消息鏈表
- ? 如果session是事務的,則會遍歷deliveredMessage中的消息放入到previouslyDeliveredMessage中來做重發
- ? 如果session是非事務的,根據ACK的模式來選擇不同的應答操作
private void clearDeliveredList() {if (clearDeliveredList) {synchronized (deliveredMessages) {if (clearDeliveredList) {if (!deliveredMessages.isEmpty()) {if (session.isTransacted()) {if (previouslyDeliveredMessages == null) {previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId,Boolean>(session.getTransactionContext().getTransactionId());}for (MessageDispatch delivered : deliveredMessages) {previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false);}LOG.debug("{} tracking existing transacted {} delivered list ({}) on transport interrupt",getConsumerId(), previouslyDeliveredMessages.transactionId,deliveredMessages.size());} else {if (session.isClientAcknowledge()) {LOG.debug("{} rolling back delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size());// allow redeliveryif (!this.info.isBrowser()) {for (MessageDispatch md : deliveredMessages) {this.session.connection.rollbackDuplicate(this,md.getMessage());}}}LOG.debug("{} clearing delivered list ({}) on transport interrupt",getConsumerId(), deliveredMessages.size());deliveredMessages.clear();pendingAck = null;}}clearDeliveredList = false;}}}}
dequeue
- 從unconsumedMessage中取出一個消息,
- 在創建一個消費者時,就會未這個消費者創建一個為消費的消息通道,這個通道分為兩種,
- 一種是簡單優先級隊列分發通道SimplePriorityMessageDispatchChannel ;
- 另一種是先進先出的分發通道FifoMessageDispatchChannel.
- 至于為什么要存在這樣一個消息分發通道,大家可以想象一下,
- 如果消費者每次去消費完一個消息以后再去broker拿一個消息,效率是比較低的。
- 所以通過這樣的設計可以允許session能夠一次性將多條消息分發給一個消費者。
- 默認情況下對于queue來說,prefetchSize的值是1000
beforeMessageIsConsumed
- 這里面主要是做消息消費之前的一些準備工作,
- 如果ACK類型不是DUPS_OK_ACKNOWLEDGE或者隊列模式(簡單來說就是除了Topic和DupAck這兩種情況),
- 所有的消息先放到deliveredMessages鏈表的開頭。
- 并且如果當前是事務類型的會話,
- 則判斷transactedIndividualAck,如果為true,表示單條消息直接返回ack。
- 否則,調用ackLater,批量應答,
- client端在消費消息后暫且不發送ACK,而是把它緩存下來(pendingACK),
- 等到這些消息的條數達到一定閥值時,只需要通過一個ACK指令把它們全部確認;
- 這比對每條消息都逐個確認,在性能上要提高很多
private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {md.setDeliverySequenceId(session.getNextDeliveryId());lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();if (!isAutoAcknowledgeBatch()) {synchronized(deliveredMessages) {deliveredMessages.addFirst(md);}if (session.getTransacted()) {if (transactedIndividualAck) {immediateIndividualTransactedAck(md);} else {ackLater(md, MessageAck.DELIVERED_ACK_TYPE);}}}}
afterMessageIsConsumed
- 這個方法的主要作用是執行應答操作,這里面做以下幾個操作
- ? 如果消息過期,則返回消息過期的ack
- ? 如果是事務類型的會話,則不做任何處理
- ? 如果是AUTOACK或者(DUPS_OK_ACK且是隊列),并且是優化ack操作,則走批量確認ack
- ? 如果是DUPS_OK_ACK,則走ackLater邏輯
- ? 如果是CLIENT_ACK,則執行ackLater
private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throwsJMSException {if (unconsumedMessages.isClosed()) {return;}if (messageExpired) {acknowledge(md, MessageAck.EXPIRED_ACK_TYPE);stats.getExpiredMessageCount().increment();} else {stats.onMessage();if (session.getTransacted()) {// Do nothing.} else if (isAutoAcknowledgeEach()) {if (deliveryingAcknowledgements.compareAndSet(false, true)) {synchronized (deliveredMessages) {if (!deliveredMessages.isEmpty()) {if (optimizeAcknowledge) {ackCounter++;// AMQ-3956 evaluate both expired and normal msgs as// otherwise consumer may get stalledif (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65)|| (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp +optimizeAcknowledgeTimeOut))) {MessageAck ack =makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);if (ack != null) {deliveredMessages.clear();ackCounter = 0;session.sendAck(ack);optimizeAckTimestamp = System.currentTimeMillis();}// AMQ-3956 - as further optimization send// ack for expired msgs when there are any.// This resets the deliveredCounter to 0 so that// we won't sent standard acks with every msg just// because the deliveredCounter just below// 0.5 * prefetch as used in ackLater()if (pendingAck != null && deliveredCounter > 0) {session.sendAck(pendingAck);pendingAck = null;deliveredCounter = 0;}}} else {MessageAck ack =makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);if (ack != null) {deliveredMessages.clear();session.sendAck(ack);}}}}deliveryingAcknowledgements.set(false);}} else if (isAutoAcknowledgeBatch()) {ackLater(md, MessageAck.STANDARD_ACK_TYPE);} else if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) {boolean messageUnackedByConsumer = false;synchronized (deliveredMessages) {messageUnackedByConsumer = deliveredMessages.contains(md);}if (messageUnackedByConsumer) {ackLater(md, MessageAck.DELIVERED_ACK_TYPE);}} else {throw new IllegalStateException("Invalid session state.");}}}
?
轉載于:https://my.oschina.net/u/3847203/blog/2989560
總結
以上是生活随笔為你收集整理的分布式消息通信ActiveMQ原理-消费消息策略-笔记的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。