A First Look at RocketMQ (Part 5)
Hi everyone, I'm 烤鴨 (Kaoya):
The previous post briefly covered some of the NettyRequestProcessor implementations (AdminBrokerProcessor, ClientManageProcessor, ConsumerManageProcessor, EndTransactionProcessor). This post covers the rest:
PullMessageProcessor, QueryMessageProcessor, ReplyMessageProcessor, SendMessageProcessor
NettyRequestProcessor
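PullMessageProcessor
The main method is nearly 300 lines long and almost finished me off. To save space, I have replaced some of the less relevant checks with "...".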
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
    throws RemotingCommandException {
    RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
    final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
    final PullMessageRequestHeader requestHeader =
        (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);

    response.setOpaque(request.getOpaque());

    log.debug("receive PullMessage request command, {}", request);

    // Does this broker have read permission?
    if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(String.format("the broker[%s] pulling message is forbidden",
            this.brokerController.getBrokerConfig().getBrokerIP1()));
        return response;
    }

    // Look up the subscription group config and validate it
    SubscriptionGroupConfig subscriptionGroupConfig =
        this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
    // ... null check; check that the group (its consumers) is allowed to consume
    // ...

    // Look up the topic config and validate it
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    // ... null check; read permission; reject queueId < 0 or queueId >= the topic's readQueueNums

    SubscriptionData subscriptionData = null;
    ConsumerFilterData consumerFilterData = null;
    // The request carries a subscription: parse it
    if (hasSubscriptionFlag) {
        try {
            subscriptionData = FilterAPI.build(
                requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType());
            // Non-tag expressionType: build a consumerFilterData from the request (it holds the topic and consumerGroup)
            if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                consumerFilterData = ConsumerFilterManager.build(
                    requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
                    requestHeader.getExpressionType(), requestHeader.getSubVersion());
                assert consumerFilterData != null;
            }
        } catch (Exception e) {
            // ...
        }
    } else {
        // Get the consumer group info
        ConsumerGroupInfo consumerGroupInfo =
            this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
        // ... null check; return an error if the group config disables broadcast but the consumer uses the broadcasting model
        // Find the subscription data for the topic
        subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
        // ... null check; version (timestamp) check
        // ... non-tag expressionType: fetch the consumerFilterData from the filter manager
    }

    // Non-tag expressionType and the broker does not enable property filtering: return an error
    if (!ExpressionType.isTagType(subscriptionData.getExpressionType())
        && !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());
        return response;
    }

    MessageFilter messageFilter;
    // Build a different messageFilter depending on whether filtering supports retry topics
    if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
        messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
            this.brokerController.getConsumerFilterManager());
    } else {
        messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
            this.brokerController.getConsumerFilterManager());
    }

    // Read messages from the commitlog; a null result sets an error on the response
    final GetMessageResult getMessageResult =
        this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
            requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
    if (getMessageResult != null) {
        response.setRemark(getMessageResult.getStatus().name());
        responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
        responseHeader.setMinOffset(getMessageResult.getMinOffset());
        responseHeader.setMaxOffset(getMessageResult.getMaxOffset());

        // Consumption is too slow: suggest pulling from a slave (the slave brokerId configured for the group)
        if (getMessageResult.isSuggestPullingFromSlave()) {
            responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
        } else {
            responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
        }

        switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
            case ASYNC_MASTER:
            case SYNC_MASTER:
                break;
            case SLAVE:
                // A slave must have reads enabled
                if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                    responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
                }
                break;
        }

        if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
            // consume too slow ,redirect to another machine
            if (getMessageResult.isSuggestPullingFromSlave()) {
                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
            }
            // consume ok
            else {
                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
            }
        } else {
            responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
        }

        switch (getMessageResult.getStatus()) {
            case FOUND:
                response.setCode(ResponseCode.SUCCESS);
                break;
            // ...
            // The queue holds no messages: branch on whether the requested offset is 0
            case NO_MESSAGE_IN_QUEUE:
                if (0 != requestHeader.getQueueOffset()) {
                    response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                    // XXX: warn and notify me
                } else {
                    response.setCode(ResponseCode.PULL_NOT_FOUND);
                }
                break;
            default:
                assert false;
                break;
        }

        // Is a consume-message hook registered?
        if (this.hasConsumeMessageHook()) {
            ConsumeMessageContext context = new ConsumeMessageContext();
            context.setConsumerGroup(requestHeader.getConsumerGroup());
            context.setTopic(requestHeader.getTopic());
            context.setQueueId(requestHeader.getQueueId());

            String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);

            switch (response.getCode()) {
                // ... fill the context differently for each code
            }
            // Invoke the hook
            this.executeConsumeMessageHookBefore(context);
        }

        switch (response.getCode()) {
            case ResponseCode.SUCCESS:
                // Add the message count to the group's get-nums counter
                this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(),
                    requestHeader.getTopic(), getMessageResult.getMessageCount());
                // Add the total buffer size to the group's get-size counter
                this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(),
                    requestHeader.getTopic(), getMessageResult.getBufferTotalSize());
                // Add the message count to the broker's get-nums counter
                this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
                // By default messages are copied through heap memory
                if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
                    final long beginTimeMills = this.brokerController.getMessageStore().now();
                    // Copy the messages out of the byte buffers, timing the copy
                    final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(),
                        requestHeader.getTopic(), requestHeader.getQueueId());
                    // Record the elapsed time as the group's get latency
                    this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
                        requestHeader.getTopic(), requestHeader.getQueueId(),
                        (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
                    response.setBody(r);
                } else {
                    // No latency is recorded; the response is written back through Netty as a FileRegion
                    try {
                        FileRegion fileRegion =
                            new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
                        channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
                            @Override
                            public void operationComplete(ChannelFuture future) throws Exception {
                                getMessageResult.release();
                                if (!future.isSuccess()) {
                                    log.error("transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause());
                                }
                            }
                        });
                    } catch (Throwable e) {
                        log.error("transfer many message by pagecache exception", e);
                        getMessageResult.release();
                    }

                    response = null;
                }
                break;
            case ResponseCode.PULL_NOT_FOUND:
                // If suspension is allowed, hold the request and pull again later (long polling)
                if (brokerAllowSuspend && hasSuspendFlag) {
                    long pollingTimeMills = suspendTimeoutMillisLong;
                    if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                        pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
                    }

                    String topic = requestHeader.getTopic();
                    long offset = requestHeader.getQueueOffset();
                    int queueId = requestHeader.getQueueId();
                    PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                        this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
                    this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                    response = null;
                    break;
                }
            // Retry immediately
            case ResponseCode.PULL_RETRY_IMMEDIATELY:
                break;
            case ResponseCode.PULL_OFFSET_MOVED:
                // A master, or a slave with offset checking enabled
                if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
                    || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
                    // Generate an offset-moved event and log a warning
                    MessageQueue mq = new MessageQueue();
                    mq.setTopic(requestHeader.getTopic());
                    mq.setQueueId(requestHeader.getQueueId());
                    mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());

                    OffsetMovedEvent event = new OffsetMovedEvent();
                    event.setConsumerGroup(requestHeader.getConsumerGroup());
                    event.setMessageQueue(mq);
                    event.setOffsetRequest(requestHeader.getQueueOffset());
                    event.setOffsetNew(getMessageResult.getNextBeginOffset());
                    this.generateOffsetMovedEvent(event);
                    log.warn(
                        "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
                        requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
                        responseHeader.getSuggestWhichBrokerId());
                } else {
                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                    log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
                        requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
                        responseHeader.getSuggestWhichBrokerId());
                }

                break;
            default:
                assert false;
        }
    } else {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("store getMessage return null");
    }

    boolean storeOffsetEnable = brokerAllowSuspend;
    storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
    storeOffsetEnable = storeOffsetEnable
        && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
    // Update the offsetTable (Map: key topic@group, value (queueId, offset))
    if (storeOffsetEnable) {
        this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
            requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
    }
    return response;
}

QueryMessageProcessor
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    // Admin (console) APIs
    switch (request.getCode()) {
        // Conditional message query (topic, key, maxNum, timestamps)
        case RequestCode.QUERY_MESSAGE:
            return this.queryMessage(ctx, request);
        // Fetch a message from the commitlog by offset
        case RequestCode.VIEW_MESSAGE_BY_ID:
            return this.viewMessageById(ctx, request);
        default:
            break;
    }

    return null;
}

ReplyMessageProcessor
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    SendMessageContext mqtraceContext = null;
    SendMessageRequestHeader requestHeader = parseRequestHeader(request);
    if (requestHeader == null) {
        return null;
    }
    // Build the trace context
    mqtraceContext = buildMsgContext(ctx, requestHeader);
    // Before-hook: fill in the sendMessageContext object
    this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
    // See processReplyMessageRequest below
    RemotingCommand response = this.processReplyMessageRequest(ctx, request, mqtraceContext, requestHeader);
    // do nothing
    this.executeSendMessageHookAfter(response, mqtraceContext);
    return response;
}

private RemotingCommand processReplyMessageRequest(final ChannelHandlerContext ctx,
    final RemotingCommand request,
    final SendMessageContext sendMessageContext,
    final SendMessageRequestHeader requestHeader) {
    final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();

    response.setOpaque(request.getOpaque());

    response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
    response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));

    log.debug("receive SendReplyMessage request command, {}", request);
    final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
    if (this.brokerController.getMessageStore().now() < startTimstamp) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
        return response;
    }

    response.setCode(-1);
    // Validate the topicConfig (created if missing for a retry message) and the queueId
    // (an error is returned if it exceeds the read/write queue count in the topicConfig)
    super.msgCheck(ctx, requestHeader, response);
    if (response.getCode() != -1) {
        return response;
    }

    final byte[] body = request.getBody();

    int queueIdInt = requestHeader.getQueueId();
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

    if (queueIdInt < 0) {
        queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
    }
    // Build the message to push over the channel
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic());
    msgInner.setQueueId(queueIdInt);
    msgInner.setBody(body);
    msgInner.setFlag(requestHeader.getFlag());
    MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
    msgInner.setPropertiesString(requestHeader.getProperties());
    msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
    msgInner.setBornHost(ctx.channel().remoteAddress());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
    // Complete the header and push the reply to the client connected to this broker
    PushReplyResult pushReplyResult = this.pushReplyMessage(ctx, requestHeader, msgInner);
    this.handlePushReplyResult(pushReplyResult, response, responseHeader, queueIdInt);
    // Store the reply message if the broker is configured to do so
    if (this.brokerController.getBrokerConfig().isStoreReplyMessageEnable()) {
        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        this.handlePutMessageResult(putMessageResult, request, msgInner, responseHeader, sendMessageContext, queueIdInt);
    }

    return response;
}

SendMessageProcessor
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final SendMessageContext mqtraceContext;
    switch (request.getCode()) {
        // A message sent back by a consumer
        case RequestCode.CONSUMER_SEND_MSG_BACK:
            return this.asyncConsumerSendMsgBack(ctx, request);
        default:
            SendMessageRequestHeader requestHeader = parseRequestHeader(request);
            if (requestHeader == null) {
                return CompletableFuture.completedFuture(null);
            }
            mqtraceContext = buildMsgContext(ctx, requestHeader);
            this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
            if (requestHeader.isBatch()) {
                return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
            } else {
                // Compared with the batch path, this one adds sync/async flush handling and the transaction check
                return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
            }
    }
}

asyncConsumerSendMsgBack
private CompletableFuture<RemotingCommand> asyncConsumerSendMsgBack(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final ConsumerSendMsgBackRequestHeader requestHeader =
        (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
    String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());
    // Run the consume-message after-hook, filling in the context object
    if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
        ConsumeMessageContext context = buildConsumeMessageContext(namespace, requestHeader, request);
        this.executeConsumeMessageHookAfter(context);
    }
    // Subscription group lookup and some checks (not null, write permission)
    SubscriptionGroupConfig subscriptionGroupConfig =
        this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
    if (null == subscriptionGroupConfig) {
        response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
        response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
            + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
        return CompletableFuture.completedFuture(response);
    }
    if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
        return CompletableFuture.completedFuture(response);
    }
    // No retry queues configured: nothing to retry, return success
    if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return CompletableFuture.completedFuture(response);
    }
    // A retry is needed: build the new (retry) topic
    String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
    int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
    int topicSysFlag = 0;
    if (requestHeader.isUnitMode()) {
        topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
    }
    // Create the topicConfig
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
        newTopic,
        subscriptionGroupConfig.getRetryQueueNums(),
        PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
    // Checks (not null, write permission)
    if (null == topicConfig) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("topic[" + newTopic + "] not exist");
        return CompletableFuture.completedFuture(response);
    }
    if (!PermName.isWriteable(topicConfig.getPerm())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
        return CompletableFuture.completedFuture(response);
    }
    // Look up the original message by offset
    MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
    if (null == msgExt) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("look message by offset failed, " + requestHeader.getOffset());
        return CompletableFuture.completedFuture(response);
    }
    // If the retry-topic property is missing, default it to the message's own topic
    final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
    if (null == retryTopic) {
        MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
    }
    msgExt.setWaitStoreMsgOK(false);
    // Delay level
    int delayLevel = requestHeader.getDelayLevel();
    // For client versions >= 3.4.9, the max reconsume times comes from the request when it is set
    int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
    if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
        Integer times = requestHeader.getMaxReconsumeTimes();
        if (times != null) {
            maxReconsumeTimes = times;
        }
    }
    // Reconsume times >= the maximum, or a negative delay level
    if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
        || delayLevel < 0) {
        // Route the message to the dead-letter (DLQ) topic
        newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
        queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
        topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
            DLQ_NUMS_PER_GROUP,
            PermName.PERM_WRITE, 0);
        if (null == topicConfig) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("topic[" + newTopic + "] not exist");
            return CompletableFuture.completedFuture(response);
        }
    } else {
        // When the delay level is 0, delay level = reconsume times + 3
        // (levels: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)
        if (0 == delayLevel) {
            delayLevel = 3 + msgExt.getReconsumeTimes();
        }
        msgExt.setDelayTimeLevel(delayLevel);
    }
    // Build the message to store
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(newTopic);
    msgInner.setBody(msgExt.getBody());
    msgInner.setFlag(msgExt.getFlag());
    MessageAccessor.setProperties(msgInner, msgExt.getProperties());
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
    msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
    msgInner.setQueueId(queueIdInt);
    msgInner.setSysFlag(msgExt.getSysFlag());
    msgInner.setBornTimestamp(msgExt.getBornTimestamp());
    msgInner.setBornHost(msgExt.getBornHost());
    msgInner.setStoreHost(msgExt.getStoreHost());
    msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
    String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
    MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
    // Append to the commitlog and update the last operation time
    CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    return putMessageResult.thenApply((r) -> {
        if (r != null) {
            switch (r.getPutMessageStatus()) {
                case PUT_OK:
                    String backTopic = msgExt.getTopic();
                    String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
                    if (correctTopic != null) {
                        backTopic = correctTopic;
                    }
                    // Increment the send-back count for the group and topic
                    this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
                    response.setCode(ResponseCode.SUCCESS);
                    response.setRemark(null);
                    return response;
                default:
                    break;
            }
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(r.getPutMessageStatus().name());
            return response;
        }
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("putMessageResult is null");
        return response;
    });
}

asyncSendBatchMessage
private CompletableFuture<RemotingCommand> asyncSendBatchMessage(ChannelHandlerContext ctx, RemotingCommand request,
    SendMessageContext mqtraceContext,
    SendMessageRequestHeader requestHeader) {
    final RemotingCommand response = preSend(ctx, request, requestHeader);
    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();

    if (response.getCode() != -1) {
        return CompletableFuture.completedFuture(response);
    }

    int queueIdInt = requestHeader.getQueueId();
    // Get the topic config
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

    if (queueIdInt < 0) {
        queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
    }

    if (requestHeader.getTopic().length() > Byte.MAX_VALUE) {
        response.setCode(ResponseCode.MESSAGE_ILLEGAL);
        response.setRemark("message topic length too long " + requestHeader.getTopic().length());
        return CompletableFuture.completedFuture(response);
    }
    // Build the batch message
    MessageExtBatch messageExtBatch = new MessageExtBatch();
    messageExtBatch.setTopic(requestHeader.getTopic());
    messageExtBatch.setQueueId(queueIdInt);

    int sysFlag = requestHeader.getSysFlag();
    if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
        sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
    }
    messageExtBatch.setSysFlag(sysFlag);

    messageExtBatch.setFlag(requestHeader.getFlag());
    MessageAccessor.setProperties(messageExtBatch, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
    messageExtBatch.setBody(request.getBody());
    messageExtBatch.setBornTimestamp(requestHeader.getBornTimestamp());
    messageExtBatch.setBornHost(ctx.channel().remoteAddress());
    messageExtBatch.setStoreHost(this.getStoreHost());
    messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
    String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
    MessageAccessor.putProperty(messageExtBatch, MessageConst.PROPERTY_CLUSTER, clusterName);
    // Write the batch to the store
    CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessages(messageExtBatch);
    // Handle the store result
    return handlePutMessageResultFuture(putMessageResult, response, request, messageExtBatch, responseHeader, mqtraceContext, ctx, queueIdInt);
}

handlePutMessageResultFuture -> handlePutMessageResult
private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,
    RemotingCommand request, MessageExt msg,
    SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,
    int queueIdInt) {
    if (putMessageResult == null) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("store putMessage return null");
        return response;
    }
    boolean sendOK = false;

    switch (putMessageResult.getPutMessageStatus()) {
        // Success
        case PUT_OK:
            sendOK = true;
            response.setCode(ResponseCode.SUCCESS);
            break;
        // ...
        // Failed
        case CREATE_MAPEDFILE_FAILED:
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("create mapped file failed, server is busy or broken.");
            break;
        // ...
        default:
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("UNKNOWN_ERROR DEFAULT");
            break;
    }

    String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
    // The send succeeded
    if (sendOK) {
        // Count: number of messages put for the topic
        this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
        // Count: bytes put for the topic
        this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),
            putMessageResult.getAppendMessageResult().getWroteBytes());
        // Count: number of messages put on the broker
        this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());

        response.setRemark(null);

        responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
        responseHeader.setQueueId(queueIdInt);
        responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
        // Write the response back
        doResponse(ctx, request, response);
        // If a hook is registered, fill in the sendMessageContext object
        if (hasSendMessageHook()) {
            sendMessageContext.setMsgId(responseHeader.getMsgId());
            sendMessageContext.setQueueId(responseHeader.getQueueId());
            sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());

            int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
            int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
            int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;

            sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
            sendMessageContext.setCommercialSendTimes(incValue);
            sendMessageContext.setCommercialSendSize(wroteSize);
            sendMessageContext.setCommercialOwner(owner);
        }
        return null;
    } else {
        // The send failed: if a hook is registered, fill in the sendMessageContext object with a failure status
        if (hasSendMessageHook()) {
            int wroteSize = request.getBody().length;
            int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);

            sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
            sendMessageContext.setCommercialSendTimes(incValue);
            sendMessageContext.setCommercialSendSize(wroteSize);
            sendMessageContext.setCommercialOwner(owner);
        }
    }
    return response;
}

Summary
PullMessageProcessor:
First, the call flow. When the broker receives a message, a listener fires and the held pull request is handed back to PullMessageProcessor.processRequest() for processing:
DefaultMessageStore#ReputMessageService.doReput() -> NotifyMessageArrivingListener.arriving() ->
pullRequestHoldService.notifyMessageArriving() -> PullMessageProcessor.executeRequestWhenWakeup()
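The call chain above is the broker's long-polling path: a pull that finds nothing is parked, and a newly arrived message wakes it up. Here is a minimal sketch of that idea, assuming a simplified in-memory holder. `HoldServiceSketch`, `HeldPull`, `suspend` and `notifyArriving` are hypothetical names for illustration, not the RocketMQ classes or methods.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the hold-and-wake-up idea (not RocketMQ's implementation).
class HoldServiceSketch {
    // A parked pull: the offset the consumer asked for, plus a callback that re-runs the pull.
    static final class HeldPull {
        final long pullFromOffset;
        final Runnable retry;

        HeldPull(long pullFromOffset, Runnable retry) {
            this.pullFromOffset = pullFromOffset;
            this.retry = retry;
        }
    }

    // key: "topic@queueId" -> pulls waiting on that queue
    private final Map<String, List<HeldPull>> held = new HashMap<>();

    // Park a pull request that found no message.
    synchronized void suspend(String topic, int queueId, HeldPull pull) {
        held.computeIfAbsent(topic + "@" + queueId, k -> new ArrayList<>()).add(pull);
    }

    // Called when the queue's max offset advances.
    // Wakes every parked pull whose requested offset is now below maxOffset; returns how many were woken.
    int notifyArriving(String topic, int queueId, long maxOffset) {
        List<HeldPull> ready = new ArrayList<>();
        synchronized (this) {
            List<HeldPull> waiting = held.get(topic + "@" + queueId);
            if (waiting == null) {
                return 0;
            }
            for (Iterator<HeldPull> it = waiting.iterator(); it.hasNext(); ) {
                HeldPull p = it.next();
                if (maxOffset > p.pullFromOffset) {
                    it.remove();
                    ready.add(p);
                }
            }
        }
        // Run the callbacks outside the lock so a slow pull does not block other notifications.
        for (HeldPull p : ready) {
            p.retry.run();
        }
        return ready.size();
    }
}
```

In the real broker the parked request also carries a timeout, so a held pull is eventually answered even if no message arrives; the sketch leaves that out.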
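Fetch the subscription group config and the topic config, and run the necessary checks.
Read the subscription flag and produce subscriptionData and consumerFilterData, with some version and tag-type checks.
Initialise the message filter according to the messageFilter configuration.
When consumption is too slow, the broker can suggest pulling from a slave (the configured slave brokerId). The slave must have read permission.
If a consume-message hook is registered, invoke it.
Pull succeeded: update the group's message count and buffer size. By default the messages are read through heap memory, the elapsed time is recorded, and the response goes back over Netty.
Nothing found: if suspension is allowed, hold the request for a delayed pull.
Offset moved: log a warning.
Update the offsetTable (Map: key topic@group, value (queueId, offset)).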
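The offset commit in the last step can be pictured as a nested map. A minimal sketch, assuming the outer key joins topic and group with `@` and the inner map goes from queue ID to offset; `OffsetTableSketch` and its two methods are hypothetical names, and returning -1 for an unknown queue is a choice made for this example only.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical illustration of a two-level consumer offset table.
class OffsetTableSketch {
    // outer key: "topic@group" (assumed layout), inner map: queueId -> committed offset
    private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable = new ConcurrentHashMap<>();

    // Record the offset a consumer group has committed for one queue of a topic.
    void commitOffset(String group, String topic, int queueId, long offset) {
        offsetTable.computeIfAbsent(topic + "@" + group, k -> new ConcurrentHashMap<>()).put(queueId, offset);
    }

    // Return the committed offset, or -1 when nothing has been committed yet.
    long queryOffset(String group, String topic, int queueId) {
        ConcurrentMap<Integer, Long> queues = offsetTable.get(topic + "@" + group);
        if (queues == null) {
            return -1L;
        }
        Long offset = queues.get(queueId);
        return offset == null ? -1L : offset;
    }
}
```

Note that in the processor the commit only happens when suspension is allowed, the request carries the commit-offset flag, and the broker is not a slave.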
QueryMessageProcessor:
Admin API: conditional message query (topic, key, maxNum, timestamps).
Admin API: fetch a message from the commitlog by offset.
ReplyMessageProcessor:
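DefaultMQProducerImpl.sendMessage(SEND_REPLY_MESSAGE/SEND_REPLY_MESSAGE_V2) -> ReplyMessageProcessor.processRequest()
Build the traceContext.
Run the before-hook, filling in the sendMessageContext object.
Validate the topicConfig (created if missing for a retry message) and the queueId (an error is returned if it exceeds the read/write queue count in the topicConfig).
Build the message to push over the channel.
Complete the header and push the reply to the client connected to the broker.
Store the reply message.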
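The queue ID handling in the steps above (reject an out-of-range ID, pick a random write queue when the request carries a negative one) can be sketched as follows. The random pick mirrors the expression in processReplyMessageRequest; the range check against the larger of the write and read queue counts is my assumption about what the validation does. `QueueIdSketch`, `isInRange` and `resolve` are hypothetical names.

```java
import java.util.Random;

// Hypothetical helper illustrating queue ID validation and random queue selection.
class QueueIdSketch {
    // Assumed rule: a queue ID is acceptable when it is below max(writeQueueNums, readQueueNums).
    static boolean isInRange(int queueId, int writeQueueNums, int readQueueNums) {
        return queueId < Math.max(writeQueueNums, readQueueNums);
    }

    // Keep an explicit queue ID; for a negative one pick a queue in [0, writeQueueNums).
    // The inner modulo keeps the value strictly inside the int range, so Math.abs cannot overflow.
    static int resolve(int requestedQueueId, int writeQueueNums, Random random) {
        if (requestedQueueId >= 0) {
            return requestedQueueId;
        }
        return Math.abs(random.nextInt() % 99999999) % writeQueueNums;
    }
}
```

Passing the `Random` in as a parameter is only for the sake of the example; it makes the selection easy to test with a seeded generator.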
SendMessageProcessor:
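DefaultMQProducerImpl.sendMessage(SEND_MESSAGE/SEND_MESSAGE_V2/SEND_BATCH_MESSAGE) / DefaultMQPullConsumerImpl/DefaultMQPushConsumerImpl.consumerSendMessageBack -> SendMessageProcessor.processRequest()
Messages sent back by a consumer:
Run the checks and define the retry topic.
If the reconsume count >= the maximum, or the delay level is negative, create the dead-letter topic.
When the delay level is 0, delay level = reconsume count + 3.
Build the store message, append it to the commitlog, and increment the send-back count.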
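The retry versus dead-letter decision above can be sketched in a few lines. This assumes the delay-level table quoted in the code comment (level 1 = 1s through level 18 = 2h) and the `%RETRY%` and `%DLQ%` topic prefixes; `SendBackSketch`, `DEAD_LETTER`, `decide`, `delayOf` and `targetTopic` are hypothetical names introduced for the example.

```java
// Hypothetical illustration of the send-back routing rules.
class SendBackSketch {
    // Delay levels as listed in the article; level N maps to index N - 1.
    static final String[] DELAY_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h".split(" ");

    // Sentinel meaning "route to the dead-letter topic" (an assumption of this sketch).
    static final int DEAD_LETTER = -1;

    // Returns the delay level to use for the retry, or DEAD_LETTER.
    static int decide(int reconsumeTimes, int maxReconsumeTimes, int requestedDelayLevel) {
        if (reconsumeTimes >= maxReconsumeTimes || requestedDelayLevel < 0) {
            return DEAD_LETTER;
        }
        // A requested level of 0 means "let the broker choose": reconsume count + 3.
        return requestedDelayLevel == 0 ? 3 + reconsumeTimes : requestedDelayLevel;
    }

    // Human-readable delay for a level in [1, 18].
    static String delayOf(int level) {
        return DELAY_LEVELS[level - 1];
    }

    // Topic the message is re-stored under.
    static String targetTopic(String group, int decision) {
        return (decision == DEAD_LETTER ? "%DLQ%" : "%RETRY%") + group;
    }
}
```

With these rules a first failure (reconsume count 0) is retried at level 3, that is after 10s, and each further failure moves one level up until the maximum is reached.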
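Normal asynchronous sends, single or batch:
Single: sync/async flush; for a transactional message, a half message is built.
Batch: write to the store, then update the counters (messages put and bytes put for the topic, messages put on the broker). Build the sendMessageContext for the hook and mark its status.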
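The three counters updated after a successful put can be pictured with a small stats holder. This is a sketch under my own assumptions about structure, using `LongAdder` for contention-friendly counting; `PutStatsSketch` and its methods are hypothetical and only mirror the roles of incTopicPutNums, incTopicPutSize and incBrokerPutNums.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical illustration of per-topic and broker-wide put counters.
class PutStatsSketch {
    private final Map<String, LongAdder> topicPutNums = new ConcurrentHashMap<>();
    private final Map<String, LongAdder> topicPutSize = new ConcurrentHashMap<>();
    private final LongAdder brokerPutNums = new LongAdder();

    // Record one successful put: msgNum messages totalling wroteBytes bytes.
    void recordPut(String topic, int msgNum, int wroteBytes) {
        topicPutNums.computeIfAbsent(topic, k -> new LongAdder()).add(msgNum);
        topicPutSize.computeIfAbsent(topic, k -> new LongAdder()).add(wroteBytes);
        brokerPutNums.add(msgNum);
    }

    long topicNums(String topic) {
        LongAdder a = topicPutNums.get(topic);
        return a == null ? 0L : a.sum();
    }

    long topicSize(String topic) {
        LongAdder a = topicPutSize.get(topic);
        return a == null ? 0L : a.sum();
    }

    long brokerNums() {
        return brokerPutNums.sum();
    }
}
```

For a batch, `msgNum` is the number of messages in the batch, which is why the counters add a count rather than simply incrementing by one.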