A Look at RocketMQ's BrokerHousekeepingService
Preface
This article looks at RocketMQ's BrokerHousekeepingService.
BrokerHousekeepingService
org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java
public class BrokerHousekeepingService implements ChannelEventListener {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final NamesrvController namesrvController;

    public BrokerHousekeepingService(NamesrvController namesrvController) {
        this.namesrvController = namesrvController;
    }

    @Override
    public void onChannelConnect(String remoteAddr, Channel channel) {
    }

    @Override
    public void onChannelClose(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }

    @Override
    public void onChannelException(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }

    @Override
    public void onChannelIdle(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }
}

- It implements the ChannelEventListener interface. onChannelConnect is a no-op; the other three callbacks (onChannelClose, onChannelException, onChannelIdle) all delegate to the onChannelDestroy method of namesrvController's routeInfoManager
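The delegation pattern above can be sketched without any RocketMQ or Netty dependency. This is only an illustration, not the library's code: the `ChannelEventListener` interface here is a simplified stand-in that drops the Netty `Channel` parameter, and `RecordingRouteManager`, `Housekeeping`, and the sample addresses are hypothetical names introduced for the example.

```java
import java.util.ArrayList;
import java.util.List;

public class HousekeepingSketch {
    /** Simplified stand-in for the listener interface (the real one also receives a Netty Channel). */
    interface ChannelEventListener {
        void onChannelConnect(String remoteAddr);
        void onChannelClose(String remoteAddr);
        void onChannelException(String remoteAddr);
        void onChannelIdle(String remoteAddr);
    }

    /** Hypothetical stand-in for RouteInfoManager: it only records which addresses were destroyed. */
    static class RecordingRouteManager {
        final List<String> destroyed = new ArrayList<>();

        void onChannelDestroy(String remoteAddr) {
            destroyed.add(remoteAddr);
        }
    }

    /** Close, exception and idle events are all treated as "this peer is gone". */
    static class Housekeeping implements ChannelEventListener {
        private final RecordingRouteManager manager;

        Housekeeping(RecordingRouteManager manager) {
            this.manager = manager;
        }

        @Override
        public void onChannelConnect(String remoteAddr) {
            // Nothing to clean up when a connection is established.
        }

        @Override
        public void onChannelClose(String remoteAddr) {
            manager.onChannelDestroy(remoteAddr);
        }

        @Override
        public void onChannelException(String remoteAddr) {
            manager.onChannelDestroy(remoteAddr);
        }

        @Override
        public void onChannelIdle(String remoteAddr) {
            manager.onChannelDestroy(remoteAddr);
        }
    }

    public static void main(String[] args) {
        RecordingRouteManager manager = new RecordingRouteManager();
        Housekeeping listener = new Housekeeping(manager);
        listener.onChannelConnect("10.0.0.1:10911");   // ignored
        listener.onChannelIdle("10.0.0.1:10911");      // triggers cleanup
        System.out.println(manager.destroyed);
    }
}
```

Routing all three failure-like events to one method keeps the cleanup logic in a single place, so the route manager does not need to know why the channel went away.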
RouteInfoManager
org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
public class RouteInfoManager {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

    public RouteInfoManager() {
        this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
        this.brokerAddrTable = new HashMap<String, BrokerData>(128);
        this.clusterAddrTable = new HashMap<String, Set<String>>(32);
        this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
        this.filterServerTable = new HashMap<String, List<String>>(256);
    }

    public void onChannelDestroy(String remoteAddr, Channel channel) {
        String brokerAddrFound = null;
        // Step 1: under the read lock, find the broker address that owns this channel.
        if (channel != null) {
            try {
                try {
                    this.lock.readLock().lockInterruptibly();
                    Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                        this.brokerLiveTable.entrySet().iterator();
                    while (itBrokerLiveTable.hasNext()) {
                        Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
                        if (entry.getValue().getChannel() == channel) {
                            brokerAddrFound = entry.getKey();
                            break;
                        }
                    }
                } finally {
                    this.lock.readLock().unlock();
                }
            } catch (Exception e) {
                log.error("onChannelDestroy Exception", e);
            }
        }

        // Fall back to the remote address when no live entry matches the channel.
        if (null == brokerAddrFound) {
            brokerAddrFound = remoteAddr;
        } else {
            log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
        }

        // Step 2: under the write lock, remove the broker from every routing table.
        if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
            try {
                try {
                    this.lock.writeLock().lockInterruptibly();
                    this.brokerLiveTable.remove(brokerAddrFound);
                    this.filterServerTable.remove(brokerAddrFound);
                    String brokerNameFound = null;
                    boolean removeBrokerName = false;
                    Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                        this.brokerAddrTable.entrySet().iterator();
                    while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
                        BrokerData brokerData = itBrokerAddrTable.next().getValue();

                        Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
                        while (it.hasNext()) {
                            Entry<Long, String> entry = it.next();
                            Long brokerId = entry.getKey();
                            String brokerAddr = entry.getValue();
                            if (brokerAddr.equals(brokerAddrFound)) {
                                brokerNameFound = brokerData.getBrokerName();
                                it.remove();
                                log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                                    brokerId, brokerAddr);
                                break;
                            }
                        }

                        if (brokerData.getBrokerAddrs().isEmpty()) {
                            removeBrokerName = true;
                            itBrokerAddrTable.remove();
                            log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                                brokerData.getBrokerName());
                        }
                    }

                    if (brokerNameFound != null && removeBrokerName) {
                        Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
                        while (it.hasNext()) {
                            Entry<String, Set<String>> entry = it.next();
                            String clusterName = entry.getKey();
                            Set<String> brokerNames = entry.getValue();
                            boolean removed = brokerNames.remove(brokerNameFound);
                            if (removed) {
                                log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                                    brokerNameFound, clusterName);

                                if (brokerNames.isEmpty()) {
                                    log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                                        clusterName);
                                    it.remove();
                                }

                                break;
                            }
                        }
                    }

                    if (removeBrokerName) {
                        Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
                            this.topicQueueTable.entrySet().iterator();
                        while (itTopicQueueTable.hasNext()) {
                            Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
                            String topic = entry.getKey();
                            List<QueueData> queueDataList = entry.getValue();

                            Iterator<QueueData> itQueueData = queueDataList.iterator();
                            while (itQueueData.hasNext()) {
                                QueueData queueData = itQueueData.next();
                                if (queueData.getBrokerName().equals(brokerNameFound)) {
                                    itQueueData.remove();
                                    log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                                        topic, queueData);
                                }
                            }

                            if (queueDataList.isEmpty()) {
                                itTopicQueueTable.remove();
                                log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                                    topic);
                            }
                        }
                    }
                } finally {
                    this.lock.writeLock().unlock();
                }
            } catch (Exception e) {
                log.error("onChannelDestroy Exception", e);
            }
        }
    }

    //......
}

- Five tables are defined as HashMaps: topicQueueTable, brokerAddrTable, clusterAddrTable, brokerLiveTable, and filterServerTable
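- Because plain HashMaps are not thread-safe, onChannelDestroy guards them with a read-write lock: the lookup takes the read lock and the removal takes the write lock
- It first looks up the broker address that owns the event's channel in brokerLiveTable (falling back to remoteAddr if none matches), then removes that address from brokerLiveTable, filterServerTable, and brokerAddrTable; the entries in clusterAddrTable and topicQueueTable are removed only once the broker name has no addresses left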
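The cascading removal described above can be sketched on simplified tables. This is a reduced illustration under stated assumptions, not RocketMQ's implementation: `RouteCleanupSketch` and its `register` helper are hypothetical, the value types are plain stand-ins for `BrokerLiveInfo`, `BrokerData`, and `QueueData`, the cluster and filter-server tables are omitted, and it uses `lock()` where the original uses `lockInterruptibly()`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class RouteCleanupSketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // brokerAddr -> last-seen timestamp (stand-in for BrokerLiveInfo)
    final Map<String, Long> brokerLiveTable = new HashMap<>();
    // brokerName -> (brokerId -> brokerAddr) (stand-in for BrokerData)
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();
    // topic -> names of brokers hosting its queues (stand-in for List<QueueData>)
    final Map<String, List<String>> topicQueueTable = new HashMap<>();

    /** Hypothetical helper that fills the tables so there is something to clean up. */
    void register(String brokerName, long brokerId, String addr, String topic) {
        lock.writeLock().lock();
        try {
            brokerLiveTable.put(addr, System.currentTimeMillis());
            brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, addr);
            List<String> brokers = topicQueueTable.computeIfAbsent(topic, k -> new ArrayList<>());
            if (!brokers.contains(brokerName)) {
                brokers.add(brokerName);
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Removes one broker address; topic routes go only when the broker name has no address left. */
    void onChannelDestroy(String brokerAddr) {
        lock.writeLock().lock();
        try {
            brokerLiveTable.remove(brokerAddr);

            String emptiedBroker = null;
            Iterator<Map.Entry<String, Map<Long, String>>> it = brokerAddrTable.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Map<Long, String>> entry = it.next();
                // values().remove(...) drops the mapping whose value equals brokerAddr, if any.
                if (entry.getValue().values().remove(brokerAddr)) {
                    if (entry.getValue().isEmpty()) {
                        emptiedBroker = entry.getKey();
                        it.remove();
                    }
                    break;
                }
            }

            if (emptiedBroker != null) {
                Iterator<List<String>> topics = topicQueueTable.values().iterator();
                while (topics.hasNext()) {
                    List<String> brokers = topics.next();
                    brokers.remove(emptiedBroker);
                    if (brokers.isEmpty()) {
                        topics.remove(); // no broker serves this topic any more
                    }
                }
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        RouteCleanupSketch routes = new RouteCleanupSketch();
        routes.register("broker-a", 0L, "10.0.0.1:10911", "TopicTest");
        routes.register("broker-a", 1L, "10.0.0.2:10911", "TopicTest");
        routes.onChannelDestroy("10.0.0.2:10911"); // slave gone, master still serves the topic
        System.out.println(routes.topicQueueTable.containsKey("TopicTest"));
        routes.onChannelDestroy("10.0.0.1:10911"); // last address gone, topic route is dropped
        System.out.println(routes.topicQueueTable.containsKey("TopicTest"));
    }
}
```

Removing entries through the iterators, as the original does, avoids a ConcurrentModificationException while the maps are being traversed.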
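Summary
RocketMQ's BrokerHousekeepingService implements the ChannelEventListener interface. Apart from onChannelConnect, every callback delegates to the onChannelDestroy method of namesrvController's routeInfoManager, which removes the offline broker's information from the in-memory routing tables.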
doc
- BrokerHousekeepingService
Reposted from: https://my.oschina.net/go4it/blog/1922317