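- 1. Overview
- 2. Filtersrv registration with the Broker
- 3. Filter classes
- 3.1 Consumer sets the filter class code when subscribing
- 3.2 Consumer uploads the filter class code
- 3.3 Filtersrv compiles the filter class code
- 4. Filtering messages
- 4.1 Consumer pulls messages from Filtersrv
- 4.2 Filtersrv pulls messages from the Broker
- 5. Filtersrv high availability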
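1. Overview
Filtersrv applies user-defined rules to filter the messages that a Consumer pulls from the Broker.
(Figure: Filtersrv.png)
Why doesn't the Broker run this custom filtering itself? Here is the official explanation:
- Broker-side message filtering
Filtering in the Broker according to the Consumer's requirements has the advantage of avoiding network transfer of messages the Consumer does not need. The drawbacks are extra load on the Broker and a relatively complex implementation.
(1). Taobao Notify supports several filtering methods, including filtering directly by message type and filtering by flexible syntax expressions, which can satisfy almost the most demanding filtering requirements.
(2). Taobao RocketMQ supports filtering by a simple Message Tag, and also filtering by Message Header and body.
(3). The CORBA Notification specification also supports filtering by flexible syntax expressions.
- Consumer-side message filtering
This approach can be fully customized by the application, but its drawback is that many useless messages have to be transferred to the Consumer.
Filtersrv came out of this trade-off. It reduces the load on the Broker and also reduces the useless messages a Consumer receives. It has a cost too: one more network hop through Filtersrv.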
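2. Filtersrv registration with the Broker
- A Filtersrv corresponds to exactly one Broker.
- A Broker can have multiple Filtersrv instances. Filtersrv high availability is achieved by starting several of them.
- If registration fails, the Filtersrv exits on its own.
The core code is as follows: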

    // FiltersrvController (excerpt)
    public boolean initialize() {
        // ... (omitted)

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                FiltersrvController.this.registerFilterServerToBroker();
            }
        }, 15, 10, TimeUnit.SECONDS);

        // ... (omitted)
    }

    public void registerFilterServerToBroker() {
        try {
            RegisterFilterServerResponseHeader responseHeader =
                this.filterServerOuterAPI.registerFilterServerToBroker(
                    this.filtersrvConfig.getConnectWhichBroker(), this.localAddr());
            this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper()
                .setDefaultBrokerId(responseHeader.getBrokerId());

            if (null == this.brokerName) {
                this.brokerName = responseHeader.getBrokerName();
            }

            log.info("register filter server<{}> to broker<{}> OK, Return: {} {}",
                this.localAddr(),
                this.filtersrvConfig.getConnectWhichBroker(),
                responseHeader.getBrokerName(),
                responseHeader.getBrokerId());
        } catch (Exception e) {
            log.warn("register filter server Exception", e);

            log.warn("access broker failed, kill oneself");
            System.exit(-1);
        }
    }

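The register-or-exit behavior above can be illustrated in isolation. The following is a minimal sketch, not RocketMQ code: `BrokerRegistrar`, `RegistrationLoop`, and the `onFatal` callback are hypothetical names introduced here, and the callback stands in for `System.exit(-1)` so the logic can be exercised without killing the JVM.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the remote call that registers a Filtersrv with its Broker. */
interface BrokerRegistrar {
    /** Returns the broker name on success; throws if the broker cannot be reached. */
    String register(String brokerAddr, String localAddr) throws Exception;
}

class RegistrationLoop {
    private final BrokerRegistrar registrar;
    private final Runnable onFatal; // a real process might pass () -> System.exit(-1)
    private volatile String brokerName;

    RegistrationLoop(BrokerRegistrar registrar, Runnable onFatal) {
        this.registrar = registrar;
        this.onFatal = onFatal;
    }

    /** One attempt: remember the broker name the first time, give up on any failure. */
    void registerOnce(String brokerAddr, String localAddr) {
        try {
            String name = registrar.register(brokerAddr, localAddr);
            if (brokerName == null) {
                brokerName = name;
            }
        } catch (Exception e) {
            onFatal.run();
        }
    }

    /** Repeats the attempt at a fixed rate; the caller owns and shuts down the executor. */
    ScheduledExecutorService start(String brokerAddr, String localAddr,
                                   long initialDelaySec, long periodSec) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        ses.scheduleAtFixedRate(() -> registerOnce(brokerAddr, localAddr),
            initialDelaySec, periodSec, TimeUnit.SECONDS);
        return ses;
    }

    String brokerName() {
        return brokerName;
    }
}
```

Because registration is repeated on a schedule, it also acts as a liveness signal: a Filtersrv that loses its Broker stops itself rather than lingering in a half-connected state.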
3. Filter classes
(Figure: Filtersrv filter classes)
3.1 Consumer sets the filter class code when subscribing
- A Consumer can subscribe with different filter class code for each Topic.

    @Override
    public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
        this.defaultMQPushConsumerImpl.subscribe(topic, fullClassName, filterClassSource);
    }

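3.2 Consumer uploads the filter class code
- When the Consumer sends its heartbeat to register with the Broker, it also uploads the filter class code to every Filtersrv that belongs to that Broker.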

    public void sendHeartbeatToAllBrokerWithLock() {
        if (this.lockHeartbeat.tryLock()) {
            try {
                this.sendHeartbeatToAllBroker();
                this.uploadFilterClassSource();
            } catch (final Exception e) {
                log.error("sendHeartbeatToAllBroker exception", e);
            } finally {
                this.lockHeartbeat.unlock();
            }
        } else {
            log.warn("lock heartBeat, but failed.");
        }
    }

    private void uploadFilterClassSource() {
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQConsumerInner> next = it.next();
            MQConsumerInner consumer = next.getValue();
            if (ConsumeType.CONSUME_PASSIVELY == consumer.consumeType()) {
                Set<SubscriptionData> subscriptions = consumer.subscriptions();
                for (SubscriptionData sub : subscriptions) {
                    if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) {
                        final String consumerGroup = consumer.groupName();
                        final String className = sub.getSubString();
                        final String topic = sub.getTopic();
                        final String filterClassSource = sub.getFilterClassSource();
                        try {
                            this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource);
                        } catch (Exception e) {
                            log.error("uploadFilterClassToAllFilterServer Exception", e);
                        }
                    }
                }
            }
        }
    }

3.3 Filtersrv compiles the filter class code
- Filtersrv processes the filter class code uploaded by the Consumer, compiles it, and then uses it.
The core code is as follows:

    public boolean registerFilterClass(final String consumerGroup, final String topic,
        final String className, final int classCRC, final byte[] filterSourceBinary) {
        final String key = buildKey(consumerGroup, topic);

        boolean registerNew = false;
        FilterClassInfo filterClassInfoPrev = this.filterClassTable.get(key);
        if (null == filterClassInfoPrev) {
            registerNew = true;
        } else {
            if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {
                if (filterClassInfoPrev.getClassCRC() != classCRC && classCRC != 0) {
                    registerNew = true;
                }
            }
        }

        if (registerNew) {
            synchronized (this.compileLock) {
                filterClassInfoPrev = this.filterClassTable.get(key);
                if (null != filterClassInfoPrev && filterClassInfoPrev.getClassCRC() == classCRC) {
                    return true;
                }

                try {
                    FilterClassInfo filterClassInfoNew = new FilterClassInfo();
                    filterClassInfoNew.setClassName(className);
                    filterClassInfoNew.setClassCRC(0);
                    filterClassInfoNew.setMessageFilter(null);

                    if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {
                        String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET);

                        Class<?> newClass = DynaCode.compileAndLoadClass(className, javaSource);

                        Object newInstance = newClass.newInstance();
                        filterClassInfoNew.setMessageFilter((MessageFilter) newInstance);
                        filterClassInfoNew.setClassCRC(classCRC);
                    }

                    this.filterClassTable.put(key, filterClassInfoNew);
                } catch (Throwable e) {
                    String info = String.format(
                        "FilterServer, registerFilterClass Exception, consumerGroup: %s topic: %s className: %s",
                        consumerGroup, topic, className);
                    log.error(info, e);

                    return false;
                }
            }
        }

        return true;
    }

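The pattern in this method is "recompile only when the checksum changes, and re-check under the lock". Below is a simplified, self-contained sketch of that idea. It is not the RocketMQ implementation: `FilterRegistry`, its `compiler` function, and the `topic + "@" + consumerGroup` key format are assumptions made for illustration, and the checksum is computed locally with `java.util.zip.CRC32` instead of being supplied by the client.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.zip.CRC32;

class FilterRegistry {
    /** Cached entry: the checksum of the source and the filter built from it. */
    static final class Entry {
        final long crc;
        final Object filter;

        Entry(long crc, Object filter) {
            this.crc = crc;
            this.filter = filter;
        }
    }

    private final Map<String, Entry> table = new ConcurrentHashMap<>();
    private final Object compileLock = new Object();
    private final Function<String, Object> compiler; // hypothetical: source text -> filter instance
    private int compileCount = 0;                    // guarded by compileLock, for demonstration

    FilterRegistry(Function<String, Object> compiler) {
        this.compiler = compiler;
    }

    static long crc32(byte[] data) {
        CRC32 crc = new CRC32();
        crc.update(data);
        return crc.getValue();
    }

    /** Returns true if a filter for this group and topic is available after the call. */
    boolean register(String consumerGroup, String topic, byte[] source) {
        String key = topic + "@" + consumerGroup; // assumed key format
        long crc = crc32(source);

        Entry prev = table.get(key);
        if (prev != null && prev.crc == crc) {
            return true; // fast path: same source already compiled
        }
        synchronized (compileLock) {
            prev = table.get(key);
            if (prev != null && prev.crc == crc) {
                return true; // another thread compiled it while we waited
            }
            try {
                Object filter = compiler.apply(new String(source, StandardCharsets.UTF_8));
                table.put(key, new Entry(crc, filter));
                compileCount++;
                return true;
            } catch (RuntimeException e) {
                return false; // compilation failed: keep any previous entry
            }
        }
    }

    int compileCount() {
        synchronized (compileLock) {
            return compileCount;
        }
    }
}
```

Since Consumers re-upload the source on every heartbeat, the checksum comparison is what keeps repeated uploads cheap: unchanged source never reaches the compiler.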
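4. Filtering messages
(Figure: Filtersrv.png)
4.1 Consumer pulls messages from Filtersrv
- When a Consumer pulls messages for a subscription that uses a filter class, it picks one Filtersrv at random from the list belonging to the Broker and pulls from it. If no Filtersrv can be selected, the pull fails, so Filtersrv must be deployed for high availability.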

    protected PullResult pullKernelImpl(
        final MessageQueue mq,
        final String subExpression,
        final long subVersion,
        final long offset,
        final int maxNums,
        final int sysFlag,
        final long commitOffset,
        final long brokerSuspendMaxTimeMillis,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final PullCallback pullCallback
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // ... (omitted)

        if (findBrokerResult != null) {
            // ... (omitted)

            String brokerAddr = findBrokerResult.getBrokerAddr();
            if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
                brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
            }

            PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
                brokerAddr,
                requestHeader,
                timeoutMillis,
                communicationMode,
                pullCallback);

            return pullResult;
        }

        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

    private String computPullFromWhichFilterServer(final String topic, final String brokerAddr)
        throws MQClientException {
        ConcurrentHashMap<String, TopicRouteData> topicRouteTable = this.mQClientFactory.getTopicRouteTable();
        if (topicRouteTable != null) {
            TopicRouteData topicRouteData = topicRouteTable.get(topic);
            List<String> list = topicRouteData.getFilterServerTable().get(brokerAddr);

            if (list != null && !list.isEmpty()) {
                return list.get(randomNum() % list.size());
            }
        }

        throw new MQClientException("Find Filter Server Failed, Broker Addr: " + brokerAddr + " topic: "
            + topic, null);
    }

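The selection step can be shown on its own. This is a minimal sketch under stated assumptions: `FilterServerPicker` and the plain `Map<String, List<String>>` table are illustrative stand-ins for the client's route data, and `ThreadLocalRandom` is swapped in for the `randomNum()` helper so the index is always within bounds.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

class FilterServerPicker {
    /**
     * Picks one filter server address at random for the given broker address.
     * Throws if the broker has no registered filter server, mirroring the
     * "cannot pull without a Filtersrv" rule described above.
     */
    static String pick(Map<String, List<String>> filterServerTable, String brokerAddr) {
        List<String> servers = filterServerTable.get(brokerAddr);
        if (servers == null || servers.isEmpty()) {
            throw new IllegalStateException("no filter server for broker " + brokerAddr);
        }
        // nextInt(bound) returns a value in [0, bound), so the index is always valid.
        return servers.get(ThreadLocalRandom.current().nextInt(servers.size()));
    }
}
```

Random selection spreads pull requests across the available Filtersrv instances without any shared state on the client, which is why adding instances is enough to scale and to tolerate the loss of one.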
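4.2 Filtersrv pulls messages from the Broker
- After Filtersrv pulls messages, it suggests that the Consumer pull from the Broker master node.
- Filtersrv can be thought of as a Consumer: when it pulls from the Broker, it actually uses the methods and logic of DefaultMQPullConsumer.java.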

    private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request) throws Exception {
        final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
        final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
        final PullMessageRequestHeader requestHeader =
            (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);

        final FilterContext filterContext = new FilterContext();
        filterContext.setConsumerGroup(requestHeader.getConsumerGroup());

        response.setOpaque(request.getOpaque());

        DefaultMQPullConsumer pullConsumer = this.filtersrvController.getDefaultMQPullConsumer();

        final FilterClassInfo findFilterClass =
            this.filtersrvController.getFilterClassManager().findFilterClass(requestHeader.getConsumerGroup(), requestHeader.getTopic());
        if (null == findFilterClass) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("Find Filter class failed, not registered");
            return response;
        }
        if (null == findFilterClass.getMessageFilter()) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("Find Filter class failed, registered but no class");
            return response;
        }

        responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);

        MessageQueue mq = new MessageQueue();
        mq.setTopic(requestHeader.getTopic());
        mq.setQueueId(requestHeader.getQueueId());
        mq.setBrokerName(this.filtersrvController.getBrokerName());
        long offset = requestHeader.getQueueOffset();
        int maxNums = requestHeader.getMaxMsgNums();

        final PullCallback pullCallback = new PullCallback() {

            @Override
            public void onSuccess(PullResult pullResult) {
                responseHeader.setMaxOffset(pullResult.getMaxOffset());
                responseHeader.setMinOffset(pullResult.getMinOffset());
                responseHeader.setNextBeginOffset(pullResult.getNextBeginOffset());
                response.setRemark(null);

                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        response.setCode(ResponseCode.SUCCESS);

                        List<MessageExt> msgListOK = new ArrayList<MessageExt>();
                        try {
                            for (MessageExt msg : pullResult.getMsgFoundList()) {
                                boolean match = findFilterClass.getMessageFilter().match(msg, filterContext);
                                if (match) {
                                    msgListOK.add(msg);
                                }
                            }

                            if (!msgListOK.isEmpty()) {
                                returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, msgListOK);
                                return;
                            } else {
                                response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                            }
                        } catch (Throwable e) {
                            final String error =
                                String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ",
                                    requestHeader.getConsumerGroup(), requestHeader.getTopic());
                            log.error(error, e);

                            response.setCode(ResponseCode.SYSTEM_ERROR);
                            response.setRemark(error + RemotingHelper.exceptionSimpleDesc(e));
                            returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
                            return;
                        }

                        break;
                    case NO_MATCHED_MSG:
                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                        break;
                    case NO_NEW_MSG:
                        response.setCode(ResponseCode.PULL_NOT_FOUND);
                        break;
                    case OFFSET_ILLEGAL:
                        response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                        break;
                    default:
                        break;
                }

                returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
            }

            @Override
            public void onException(Throwable e) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("Pull Callback Exception, " + RemotingHelper.exceptionSimpleDesc(e));
                returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
                return;
            }
        };

        pullConsumer.pullBlockIfNotFound(mq, null, offset, maxNums, pullCallback);
        return null;
    }

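The decision made inside the callback, stripped of networking, is a mapping from pull status plus filter result to a response code. The sketch below illustrates that mapping with stand-in types: `ForwardFilter`, its two enums, and `Reply` are hypothetical and only mirror the names seen in the excerpt; a `Predicate` plays the role of the compiled filter class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class ForwardFilter {
    enum PullStatus { FOUND, NO_MATCHED_MSG, NO_NEW_MSG, OFFSET_ILLEGAL }

    enum Code { SUCCESS, PULL_RETRY_IMMEDIATELY, PULL_NOT_FOUND, PULL_OFFSET_MOVED, SYSTEM_ERROR }

    /** What would be sent back to the Consumer: a code plus the surviving messages. */
    static final class Reply<T> {
        final Code code;
        final List<T> messages;

        Reply(Code code, List<T> messages) {
            this.code = code;
            this.messages = messages;
        }
    }

    private static <T> Reply<T> empty(Code code) {
        return new Reply<T>(code, new ArrayList<T>());
    }

    static <T> Reply<T> handle(PullStatus status, List<T> pulled, Predicate<T> filter) {
        switch (status) {
            case FOUND:
                try {
                    List<T> matched = new ArrayList<T>();
                    for (T msg : pulled) {
                        if (filter.test(msg)) {
                            matched.add(msg);
                        }
                    }
                    // Everything was filtered out: tell the Consumer to pull again right away.
                    if (matched.isEmpty()) {
                        return empty(Code.PULL_RETRY_IMMEDIATELY);
                    }
                    return new Reply<T>(Code.SUCCESS, matched);
                } catch (RuntimeException e) {
                    // A failing user filter is reported as a system error.
                    return empty(Code.SYSTEM_ERROR);
                }
            case NO_MATCHED_MSG:
                return empty(Code.PULL_RETRY_IMMEDIATELY);
            case NO_NEW_MSG:
                return empty(Code.PULL_NOT_FOUND);
            case OFFSET_ILLEGAL:
                return empty(Code.PULL_OFFSET_MOVED);
            default:
                throw new IllegalArgumentException("unknown status: " + status);
        }
    }
}
```

Note the case where messages were found but none matched: the reply asks for an immediate retry rather than reporting "not found", because the Consumer's offset has still advanced past the filtered batch and more messages may follow.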
5. Filtersrv high availability
(Figure: Filtersrv high availability)