RocketMQ:NameServer架构设计以及启动关闭流程源码分析
文章目錄
- NameServer
- 1.架構設計
- 2.核心類與配置
- 3.啟動與關閉流程
- 3.1.步驟一
- 3.2.步驟二
- 3.3.步驟三
NameServer
1.架構設計
消息中間件的設計思路一般都是基于主題訂閱與發(fā)布的機制,RocketMQ也不例外。RocketMQ中,消息生產者(Producer)發(fā)送某主題的消息到消息服務器,消息服務器對消息進行持久化存儲,而消息消費者(Consumer)訂閱所需要的主題,消息服務器根據(jù)訂閱信息(路由信息)將消息推送至消息消費者(Push模式)或者消息消費者主動向消息服務器進行拉取(Pull模式),從而實現(xiàn)消息生產者與消息消費者之間解耦。
為了避免消息服務器單點故障而導致的系統(tǒng)癱瘓,消息服務器常常會集群分布,部署多臺服務器共同處理消息并且承擔消息的存儲,消息生產者如何知道要將消息發(fā)送至哪臺服務器和消息消費者如何知道要從哪臺消息服務器進行消息的拉取等等問題,都要由NameServer來處理,其實NameServer充當?shù)慕巧cZookeeper十分相似。
Broker消息服務器啟動時,需要向NameServer集群進行信息注冊,消息生產者Producer發(fā)送消息之前主動向NameServer獲取Broker服務器地址列表,然后根據(jù)負載均衡算法從列表中選出一臺服務器進行消息的發(fā)送。NameServer與每臺Broker保持長連接,并每隔30s對Broker存活狀態(tài)進行檢測,如果檢測到Broker宕機并且長時間沒有進行連接重試,則會將該Broker從路由注冊表中刪除,以此保證Broker集群的高可用,但是路由變化不會立馬對生產者進行通知,需要Producer一段時間之后重新向NameServer進行獲取并更新路由信息。這也是NameServer與Zookeeper的不同,NameServer這樣的設計降低了整個NameServer實現(xiàn)的復雜度,整個NameServer代碼實現(xiàn)不超過一千行,簡單而高效!
以下是NameServer整個項目預覽:
可以看到NameServer主要有以下幾個作用:
- 配置信息管理
- 請求處理
- 路由信息管理
2.核心類與配置
NamesrvController
NameserController 是 NameServer 模塊的核心控制類。
private final NamesrvConfig namesrvConfig;//主要指定 nameserver 的相關配置屬性 private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("NSScheduledThread"));//NameServer定時任務執(zhí)行線程池-->每隔10s掃描broker,對存活的Broker信息進行維護并且打印KVConfig private final KVConfigManager kvConfigManager;//讀取或變更NameServer的配置屬性,加載 NamesrvConfig中配置到內存 private final RouteInfoManager routeInfoManager;//NameServer 數(shù)據(jù)的載體,記錄 Broker、Topic 等信息。private final NettyServerConfig nettyServerConfig;//與網絡通訊相關的配置 private RemotingServer remotingServer;//網絡通信服務 private ExecutorService remotingExecutor;//網絡通信服務NamesrvConfig
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json"; private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties"; private String productEnvName = "center"; private boolean clusterTest = false; private boolean orderMessageEnable = false;rocketmqHome:rocketmq主目錄
kvConfigPath:NameServer存儲KV配置屬性的持久化路徑
configStorePath:nameServer默認配置文件路徑
orderMessageEnable:是否支持順序消息
NettyServerConfig
private int listenPort = 8888; private int serverWorkerThreads = 8; private int serverCallbackExecutorThreads = 0; private int serverSelectorThreads = 3; private int serverOnewaySemaphoreValue = 256; private int serverAsyncSemaphoreValue = 64; private int serverChannelMaxIdleTimeSeconds = 120;private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize; private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize; private boolean serverPooledByteBufAllocatorEnable = true;listenPort:NameServer監(jiān)聽端口,該值默認會被初始化為9876
serverWorkerThreads:Netty業(yè)務線程池線程個數(shù)
serverCallbackExecutorThreads:Netty public任務線程池線程個數(shù),Netty網絡設計,根據(jù)業(yè)務類型會創(chuàng)建不同的線程池,比如處理消息發(fā)送、消息消費、心跳檢測等。
serverSelectorThreads:IO線程池個數(shù),主要是NameServer、Broker端解析請求、返回相應的線程個數(shù),這類線程主要是處理網路請求的,解析請求包,然后轉發(fā)到各個業(yè)務線程池完成具體的操作,然后將結果返回給調用方;
serverOnewaySemaphoreValue:send oneway消息請求;
serverAsyncSemaphoreValue:異步消息發(fā)送最大并發(fā)數(shù);
serverChannelMaxIdleTimeSeconds :網絡連接最大的空閑時間,默認120s。
serverSocketSndBufSize:網絡socket發(fā)送端緩沖區(qū)大小。
serverSocketRcvBufSize: 網絡socket接收端緩存區(qū)大小。
serverPooledByteBufAllocatorEnable:ByteBuffer是否開啟緩存;
useEpollNativeSelector:是否啟用Epoll IO模型。
RouteInfoManager
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2; private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;BROKER_CHANNEL_EXPIRED_TIME:NameServer與Broker空閑連接時長,在2 minNameServer之內沒有收到Broker的心跳包,則NameServer會關閉與該Broker的連接并刪除Broker的路由信息。
lock:讀寫鎖,用來保護以下用于存儲關鍵信息的非線程安全容器HashMap。
topicQueueTable:用于存儲主題與隊列的映射關系,記錄一個主題topic的隊列分布在哪些Broker上。以下是QueueData屬性值:
private String brokerName; //broker名稱 private int readQueueNums; //讀隊列個數(shù) private int writeQueueNums; //寫隊列個數(shù) private int perm; //操作權限 private int topicSysFlag; //同步復制還是異步復制的標識brokerAddrTable:用于記錄所有Broker信息。以下是BrokerData屬性值:
private String cluster; //當前Broker所屬集群 private String brokerName; //Broker名稱 //BrokerId=0表示主節(jié)點,BrokerId>0表示從節(jié)點 //記錄BrokerId與對應節(jié)點地址的映射信息 private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;clusterAddrTable:用于記錄Broker集群信息
brokerLiveTable:用于記錄活躍狀態(tài)的Broker,NameServer每隔10s對所有Broker進行掃描,如果有Broker宕機,會將該Broker從該表中刪去,以此維護可用的Broker列表信息。以下是BrokerLiveInfo的屬性值:
private long lastUpdateTimestamp; //上次發(fā)送心跳包的時間戳 private DataVersion dataVersion; //記錄數(shù)據(jù)版本信息 private Channel channel; private String haServerAddr; //Master節(jié)點地址3.啟動與關閉流程
NameServer啟動時序圖:
啟動類:org.apache.rocketmq.namesrv.NamesrvStartup.java
3.1.步驟一
解析配置文件,填充NamesrvConfig、NettyServerConfig并創(chuàng)建NamesrvController:
啟動類:
public static void main(String[] args) {main0(args); }public static NamesrvController main0(String[] args) {try {//創(chuàng)建NamesrvController的入口NamesrvController controller = createNamesrvController(args);start(controller);String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();log.info(tip);System.out.printf("%s%n", tip);return controller;} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null; }NamesrvController#createNamesrvController:
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {//....//創(chuàng)建namesrvConfigfinal NamesrvConfig namesrvConfig = new NamesrvConfig();//創(chuàng)建nettyServerConfigfinal NettyServerConfig nettyServerConfig = new NettyServerConfig();//設置默認端口9876nettyServerConfig.setListenPort(9876);//-c 指定屬性配置文件的位置if (commandLine.hasOption('c')) {String file = commandLine.getOptionValue('c');if (file != null) {InputStream in = new BufferedInputStream(new FileInputStream(file));properties = new Properties();properties.load(in);MixAll.properties2Object(properties, namesrvConfig);MixAll.properties2Object(properties, nettyServerConfig);namesrvConfig.setConfigStorePath(file);System.out.printf("load config properties file OK, %s%n", file);in.close();}}//-p 屬性名=屬性值if (commandLine.hasOption('p')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);MixAll.printObjectProperties(console, namesrvConfig);MixAll.printObjectProperties(console, nettyServerConfig);System.exit(0);}//將啟動參數(shù)填充到namesrvConfig中MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);//如果未指定'ROCKETMQ_HOME'環(huán)境變量if (null == namesrvConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}//....//打印配置信息日志MixAll.printObjectProperties(log, namesrvConfig);MixAll.printObjectProperties(log, nettyServerConfig);//根據(jù)namesrvConfig和nettyServerConfig創(chuàng)建NamesrvControllerfinal NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);// 將配置存入controller.configuration以防止配置丟失controller.getConfiguration().registerConfig(properties);return controller; }3.2.步驟二
根據(jù)配置創(chuàng)建好NamesrvController之后,對其進行初始化:
//NamesrvStartup#start public static NamesrvController start(final NamesrvController controller) throws Exception {//進行簡單的檢查if (null == controller) {throw new IllegalArgumentException("NamesrvController is null");}//controller初始化boolean initResult = controller.initialize();if (!initResult) {controller.shutdown();System.exit(-3);}//....controller.start(); //開啟遠程服務-this.remotingServer.start();return controller; }//NamesrvController#initialize public boolean initialize() {//加載配置管理器this.kvConfigManager.load();//創(chuàng)建Netty遠程服務this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);//創(chuàng)建遠程服務線程池this.remotingExecutor =Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));//注冊線程池this.registerProcessor();//定時任務線程池--->每隔十秒掃描活躍狀態(tài)異常的Broker信息this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {/*** 對Not Active Broker 進行掃描*/@Overridepublic void run() {NamesrvController.this.routeInfoManager.scanNotActiveBroker();}}, 5, 10, TimeUnit.SECONDS);//定時任務線程池--->每隔十秒打印KVConfig信息this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {NamesrvController.this.kvConfigManager.printAllPeriodically();}}, 1, 10, TimeUnit.MINUTES);//....return true;}3.3.步驟三
在JVM進程關閉之前,先將線程池關閉,及時釋放資源。
public static NamesrvController start(final NamesrvController controller) throws Exception {//....//JVM進程關閉之前,將線程池關閉,資源釋放Runtime.getRuntime().addShutdownHook/*注冊JVM鉤子函數(shù)*/(new ShutdownHookThread(log, new Callable<Void>() {@Overridepublic Void call() throws Exception {controller.shutdown();return null;}}));//.... }以上僅供個人學習使用,如有不足請指正!
總結
以上是生活随笔為你收集整理的RocketMQ:NameServer架构设计以及启动关闭流程源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RocketMQ集群启动报错:java.
- 下一篇: RocketMQ:NameServer路