【JEECG TBSchedule】详解应对平台高并发的分布式调度框架TBSchedule
原文地址:http://geek.csdn.net/news/detail/65738
【編者按】?TBSchedule是一款非常優秀的高性能分布式調度框架,本文是作者結合多年使用TBSchedule的經驗,在研讀三遍源碼的基礎上完成。期間作者也與阿里空玄有過不少技術交流,并非常感謝空玄給予的大力支持。另外,作者寫這篇文章的目的一是出于對TBSchedule的一種熱愛,二是現在是一個資源共享、技術共享的時代,希望把它展現給大家(送人玫瑰,手留余香),能給大家的工作帶來幫助。
以下為文章正文:
一、TBSchedule初識
? ? 時下互聯網和電商領域,各個平臺都存在大數據、高并發的特點,對數據處理的要求越來越高,既要保證高效性,又要保證安全性、準確性。TBSchedule的使命就是將調度作業從業務系統中分離出來,降低或者是消除和業務系統的耦合度,進行高效異步任務處理。其實在互聯網和電商領域TBSchedule的使用非常廣泛,目前被應用于阿里巴巴、淘寶、支付寶、京東、聚美、汽車之家、國美等很多互聯網企業的流程調度系統。
? ? 在深入了解TBSchedule之前我們先從內部和外部形態對它有個初步認識,如圖1.1、圖1.2。
圖1.1 TBSchedule關鍵字 圖1.2 TBSchedule外部形態? ? 從TBSchedule的內部形態來說,與他有關的關鍵詞包括批量任務、動態擴展、多主機、多線程、并發、分片……,這些詞看起來非常的高大上,都是時下互聯網技術比較流行的詞匯。從TBSchedule的外部架構來看,一目了然,宿主在調度應用中與ZooKeeper進行通信。一個框架結構是否是優秀的,從美感的角度就可以看出來,一個好的架構一定是隱藏了內部復雜的原理,外部視覺上美好的,讓用戶使用起來簡單易懂。
二、TBSchedule原理
? ? 為什么TBSchedule值得推廣呢?
? ? TBSchedule到底有多強大呢?我對TBSchedule的優勢特點進行了如下總結:
? ? TBSchedule支持Cluster,可以宿主在多臺服務器多個線程組并行進行任務調度,或者說可以將一個大的任務拆成多個小任務分配到不同的服務器。
? ? TBSchedule的分布式機制是通過靈活的Sharding方式實現的,比如可以按所有數據的ID按10取模分片(分片規則如圖2.1)、按月份分片等等,根據不同的需求,不同的場景由客戶端配置分片規則。然后就是TBSchedule的宿主服務器可以進行動態擴容和資源回收,這個特點主要是因為它后端依賴的ZooKeeper,這里的ZooKeeper對于TBSchedule來說是一個NoSQL,用于存儲策略、任務、心跳信息數據,它的數據結構類似文件系統的目錄結構,它的節點有臨時節點、持久節點之分。調度引擎上線后,隨著業務量數據量的增多,當前Cluster可能不能滿足目前的處理需求,那么就需要增加服務器數量,一個新的服務器上線后會在ZooKeeper中創建一個代表當前服務器的一個唯一性路徑(臨時節點),并且新上線的服務器會和ZooKeeper保持長連接,當通信斷開后,節點會自動摘除。
? ? TBSchedule會定時掃描當前服務器的數量,重新進行任務分配。TBSchedule不僅提供了服務端的高性能調度服務,還提供了一個scheduleConsole war隨著宿主應用的部署直接部署到服務器,可以通過web的方式對調度的任務、策略進行監控管理,以及實時更新調整。
圖2.1 TBSchedule分片規則? ? 是不是已經對TBSchedule稍微了有些好感呢?我們接著往下看。
? ? TBSchedule提供了兩個核心組件ScheduleServer、TBScheduleManagerFactory和兩類核心接口IScheduleTaskDeal、IScheduleTaskDealSingle、IScheduleTaskDealMuti,這兩部分是客戶端研發的關鍵部分,是使用TBSchedule必須要了解的。
? ? ScheduleServer即任務處理器,的主要作用是任務和策略的管理、任務采集和執行,由一組工作線程組成,這組工作線程是基于隊列實現的,進行任務抓取和任務處理(有兩種處理模式,下面會講)。每個任務處理器和ZooKeeper有一個心跳通信連接,用于檢測Server的狀態和進行任務動態分配。舉個例子,比如3臺服務器的worker集群執行出票消息生成任務,對于這個任務類型每臺服務器可以配置一個ScheduleSever(即一個線程組),也可以配置兩個線程組,那么就相當于6臺服務器在并行執行此任務類型。當某臺服務器宕機或者其他原因與ZooKeeper通信斷開時,它的任務將被其他服務器接管。ScheduleServer參數定義如圖2.2
圖2.2 ScheduleServer參數定義? ? 在這些參數中taskItems是一個非常重要的屬性,是客戶單可以自由發揮的地方,是任務分片的基礎,比如我們處理一個任務可以根據ID按10取模,那么任務項就是0-9,3臺服務器分別拿到4、 3、 3個任務項,服務器的上下線都會對任務項進行重新分配。任務項是進行任務分配的最小單位。一個任務項只能由一個ScheduleServer來進行處理,但一個Server可以處理任意數量的任務項。這就是剛才我們說的分片特性。
? ? 調度服務器TBScheduleManagerFactory的主要工作ZooKeeper連接參數配置和ZooKeeper的初始化、調度管理。
? ? 兩類核心接口是需要被我們定義的目標任務實現的,根據自己的需要進行任務采集(重寫selectTasks方法)和任務執行(重寫execute方法),這兩類接口也是客戶端研發根據需求自由發揮的地方。
? ? 接下來我們深入了解下TBSchedule,看看它的內部是如何實現的。圖2.3流程圖是我花了很多心血通過一周時間畫出來的,基本是清晰的展現了TBSchedule內部的執行流程以及每個步驟ZooKeeper節點路徑和數據的變化。因為圖中的注釋已經描述的很詳細了,每個節點右側是ZooKeeper的信息(數據結構見圖2.4),這里就不再做過多的文字描述了,有任何建議或者不明白的地方可以找我交流。
圖2.3 TBSchedule內部流程 圖2.4 TBSchedule之ZooKeeper數據結構? ? TBSchedule還有個強大之處是它提供了兩種處理器模式模式:
? ? 1. SLEEP模式
? ? 當某一個線程任務處理完畢,從任務池中取不到任務的時候,檢查其它線程是否處于活動狀態。如果是,則自己休眠;如果其它線程都已經因為沒有任務進入休眠,當前線程是最后一個活動線程的時候,就調用業務接口,獲取需要處理的任務,放入任務池中,同時喚醒其它休眠線程開始工作。
? ?2. NOTSLEEP模式
? ? 當一個線程任務處理完畢,從任務池中取不到任務的時候,立即調用業務接口獲取需要處理的任務,放入任務池中。
? ? SLEEP模式內部邏輯相對較簡單,如果遇到大任務需要處理較長時間,可能會造成其他線程被動阻塞的情況。但其實生產環境一般都是小而快的任務,即使出現阻塞的情況ScheduleConsole也會及時的監控到。NOTSLEEP模式減少了線程休眠的時間,避免了因大任務造成阻塞的情況,但為了避免數據被重復處理,增加了CPU在數據比較上的開銷。TBSchedule默認是SLEEP模式。
? ? 到目前為止我相信大家對TBSchedule有了一個深刻的了解,心中的疑霧逐漸散開了。理論是實踐的基礎,實踐才是最終的目的,下一節我們將結合理論知識進行TBSchedule實戰。
三、TBSchedule實戰
? ? 在項目中使用TBSchedule需要依賴ZooKeeper、TBSchedule。
? ? ZooKeeper依賴:
<dependency><groupId>org.apache.ZooKeeper</groupId><artifactId>ZooKeeper</artifactId><version>3.4.6</version></dependency>? ? TBSchedule依賴:
<dependency><groupId>com.taobao.pamirs.schedule</groupId><artifactId>TBSchedule</artifactId><version>3.3.3.2</version></dependency>? ? TBSchedule有三種引入方式:
? ? TBSchedule隨著宿主調度應用部署到服務器后,可以通過Web瀏覽器的方式訪問其提供監控平臺。
? ? 第一步,初始化ZooKeeper
? ? 第二步,創建調度策略
? ? 第三步,創建調度任務
? ? 第四步,監控調度任務
? ? 2、通過原生Java引入
// 初始化SpringApplicationContext ctx = new FileSystemXmlApplicationContext("spring-config.xml");// 初始化調度工廠TBScheduleManagerFactory scheduleManagerFactory = new TBScheduleManagerFactory();Properties p = new Properties();p.put("zkConnectString", "127.0.0.1:2181");p.put("rootPath", "/taobao-schedule/train_worker");p.put("zkSessionTimeout", "60000"); p.put("userName", "train_dev");p.put("password", " train_dev ");p.put("isCheckParentPath", "true");scheduleManagerFactory.setApplicationContext(ctx);scheduleManagerFactory.init(p); // 創建任務調度任務的基本信息 String baseTaskTypeName = "DemoTask";ScheduleTaskType baseTaskType = new ScheduleTaskType();baseTaskType.setBaseTaskType(baseTaskTypeName);baseTaskType.setDealBeanName("demoTaskBean");baseTaskType.setHeartBeatRate(10000);baseTaskType.setJudgeDeadInterval(100000);baseTaskType.setTaskParameter("AREA=BJ,YEAR>30");baseTaskType.setTaskItems(ScheduleTaskType.splitTaskItem("0:{TYPE=A,KIND=1},1:{TYPE=A,KIND=2},2:{TYPE=A,KIND=3},3:{TYPE=A,KIND=4}," +"4:{TYPE=A,KIND=5},5:{TYPE=A,KIND=6},6:{TYPE=A,KIND=7},7:{TYPE=A,KIND=8}," +"8:{TYPE=A,KIND=9},9:{TYPE=A,KIND=10}"));baseTaskType.setFetchDataNumber(500);baseTaskType.setThreadNumber(5);this.scheduleManagerFactory.getScheduleDataManager().createBaseTaskType(baseTaskType);log.info("創建調度任務成功:" + baseTaskType.toString());// 創建任務的調度策略String taskName = baseTaskTypeName;String strategyName =taskName +"-Strategy";try {this.scheduleManagerFactory.getScheduleStrategyManager().deleteMachineStrategy(strategyName,true);} catch (Exception e) {e.printStackTrace();}ScheduleStrategy strategy = new ScheduleStrategy();strategy.setStrategyName(strategyName);strategy.setKind(ScheduleStrategy.Kind.Schedule);strategy.setTaskName(taskName);strategy.setTaskParameter("china");strategy.setNumOfSingleServer(1);strategy.setAssignNum(10);strategy.setIPList("127.0.0.1".split(","));this.scheduleManagerFactory.getScheduleStrategyManager().createScheduleStrategy(strategy);log.info("創建調度策略成功:" + strategy.toString());? ? 3、通過Spring容器引入
<!-- 初始化ZooKeeper --> <bean id="scheduleManagerFactory"class="xx.xx.TBScheduleManagerFactory"> <property name="zkConfig"> <map><entry key="zkConnectString" value="127.0.0.1:2181" /><entry key="rootPath" value="/taobao-schedule/train_worker" /><entry key="zkSessionTimeout" value="60000" /><entry key="userName" value="train_dev" /><entry key="password" value="train_dev" /><entry key="isCheckParentPath" value="true" /> </map> </property> </bean> <!-- 配置調度策略 凌晨1點到3點執行 --> <bean id="abstractDemoScheduleTask" class="com.xx.core.TBSchedule.InitScheduleTask" abstract="true"> <property name="scheduleTaskType.heartBeatRate" value="10000" /> <property name="scheduleTaskType.judgeDeadInterval" value="100000" /> <property name="scheduleTaskType.permitRunStartTime" value="0 0 1 * * ?"/> <property name="scheduleTaskType.permitRunEndTime" value="0 0 3 * * ?"/> <property name="scheduleTaskType.taskParameter" value="AREA=BJ,YEAR>30" /> <property name="scheduleTaskType.sleepTimeNoData" value="60000"/> <property name="scheduleTaskType.sleepTimeInterval" value="60000"/> <property name="scheduleTaskType.fetchDataNumber" value="500" /> <property name="scheduleTaskType.executeNumber" value="1" /> <property name="scheduleTaskType.threadNumber" value="5" /> <property name="scheduleTaskType.taskItems"> <list><value>0:{TYPE=A,KIND=1}</value><value>1:{TYPE=A,KIND=2}</value><value>2:{TYPE=A,KIND=3}</value><value>3:{TYPE=A,KIND=4}</value><value>4:{TYPE=A,KIND=5}</value><value>5:{TYPE=A,KIND=6}</value><value>6:{TYPE=A,KIND=7}</value><value>7:{TYPE=A,KIND=8}</value><value>8:{TYPE=A,KIND=9}</value><value>9:{TYPE=A,KIND=10}</value></list> </property> <property name="scheduleStrategy.kind" value="Schedule" /> <property name="scheduleStrategy.numOfSingleServer" value="1" /> <property name="scheduleStrategy.assignNum" value="10" /> <property name="scheduleStrategy.iPList"><list><value>127.0.0.1</value></list></property></bean> <!-- 配置調度任務 --> <bean id="demoTask" class="com.xx.worker.task.DemoTask" parent="abstractDemoScheduleTask"> <property name="scheduleTaskType.baseTaskType" value="demoTask" /> <property name="scheduleTaskType.dealBeanName" value="demoTaskBean" /> <property name="scheduleStrategy.strategyName" value="demoTaskBean-Strategy" /> <property name="scheduleStrategy.taskName" value="demoTaskBean" /> </bean> 調度任務具體實現 DemoTask.java/*** DemoTask任務類*/ public class DemoTask mplementsIScheduleTaskDealSingle,TScheduleTaskDeal {/*** 數據采集* @param taskItemNum--分配的任務項 taskItemList--總任務項 * eachFetchDataNum--采集任務數量*/@Overridepublic List<DemoTask> selectTasks(String taskParameter,String ownSign, int taskItemNum, List<TaskItemDefine> taskItemList,int eachFetchDataNum) throws Exception {List<DemoTask> taskList = new LinkedList<DemoTask>();//客戶端根據條件進行數據采集start//客戶端根據條件進行數據采集endreturn rt;}/*** 數據處理*/@Overridepublic boolean execute(DemoTask task, String ownSign)throws Exception {//客戶端pop任務進行處理start//客戶端pop任務進行處理endreturn true;} }? ? 其實我們看對于TBSchedule客戶端的使用非常簡單,初始化ZooKeeper、配置調度策略、配置調度任務,對調度任務進行具體實現,就這幾個步驟?,F在可以慶祝下了,你又掌握了一個優秀開源框架的設計思想和使用方式。
四、TBSchedule挑戰
? ? 任何事物都是沒有最好只有更好,TBSchedule也一樣,雖然它現在已經很完美了,我們不能放棄對更完美的追求。阿里團隊可以在下面幾個方面進行優化。
? ? 至此,我們已經完成了對TBSchedule的全部介紹,盡快使用起來吧!
總結
以上是生活随笔為你收集整理的【JEECG TBSchedule】详解应对平台高并发的分布式调度框架TBSchedule的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 微信小程序,技术创业的时代可能要来了,但
- 下一篇: Flying to the Mars