任务调度之Elastic-Job1
目標
1、了解Elastic-Job 的基本特性
2、掌握Elastic-Job 開發與配置方式(包括Java 開發和Spring Boot 開發),掌握任務類型和任務分片策略
3、了解Elastic-Job 運維平臺的使用
4、掌握Elastic-Job 運行原理
內容定位
適合了解了Quartz 的調度模型之后,想要知道如何基于ZK 配置Quartz 和如何實現任務分片的同學
Quartz-Misfire
什么情況下錯過觸發?錯過觸發怎么辦?
線程池只有5 個線程,當有5 個任務都在執行的時候,第六個任務即將觸發,這個時候任務就不能得到執行。在quartz.properties 有一個屬性misfireThreshold,用來定義觸發器超時的"臨界值",也就是超過了這個時間,就算錯過觸發了。
例如,如果misfireThreshold 是60000(60 秒),9 點整應該執行的任務,9 點零1 分還沒有可用線程執行它,就會超時(misfires)。
下面這些原因可能造成misfired job:
1、沒有可用線程
2、Trigger 被暫停
3、系統重啟
4、禁止并發執行的任務在到達觸發時間時,上次執行還沒有結束。
錯過觸發怎么辦?Misfire 策略設置
每一種Trigger 都定義了自己的Misfire 策略,不同的策略通過不同的方法來設置。
standalone 工程MisfireTest
大體上來說有3 種:
1、忽略
2、立即跑一次
3、下次跑
詳細內容參考:
https://gper.club/articles/7e7e7f7ff7g59gc5g69
怎么避免任務錯過觸發?
合理地設置線程池數量,以及任務觸發間隔。
認識E-Job
任務調度高級需求
Quartz 的不足:
1、作業只能通過DB 搶占隨機負載,無法協調
2、任務不能分片——單個任務數據太多了跑不完,消耗線程,負載不均
3、作業日志可視化監控、統計
發展歷史
E-Job 是怎么來的?
在當當的ddframe 框架中,需要一個任務調度系統(作業系統)
實現的話有兩種思路,一個是修改開源產品,一種是基于開源產品搭建(封裝),當當選擇了后者,最開始這個調度系統叫做dd-job。它是一個無中心化的分布式調度框架。因為數據庫缺少分布式協調功能(比如選主),替換為Zookeeper 后,增加了彈性
擴容和數據分片的功能。
Elastic-Job 是ddframe 中的dd-job 作業模塊分離出來的作業框架,基于Quartz和Curator 開發,在2015 年開源。
輕量級,無中心化解決方案。
為什么說是去中心化呢?因為沒有統一的調度中心。集群的每個節點都是對等的,節點之間通過注冊中心進行分布式協調。E-Job 存在主節點的概念,但是主節點沒有調度的功能,而是用于處理一些集中式任務,如分片,清理運行時信息等。
思考:如果ZK 掛了怎么辦?
每個任務有獨立的線程池。
從官網開始
http://elasticjob.io/docs/elastic-job-lite/00-overview/
https://github.com/elasticjob
Elastic-Job 最開始只有一個elastic-job-core 的項目,在2.X 版本以后主要分為Elastic-Job-Lite 和Elastic-Job-Cloud 兩個子項目。其中,Elastic-Job-Lite 定位為輕量級無中心化解決方案, 使用jar 包的形式提供分布式任務的協調服務。而Elastic-Job-Cloud 使用Mesos + Docker 的解決方案,額外提供資源治理、應用分發以及進程隔離等服務(跟Lite 的區別只是部署方式不同,他們使用相同的API,只要開發一次)。
功能特性
分布式調度協調:用ZK 實現注冊中心
錯過執行作業重觸發(Misfire)
支持并行調度(任務分片)
作業分片一致性,保證同一分片在分布式環境中僅一個執行實例
彈性擴容縮容:將任務拆分為n 個任務項后,各個服務器分別執行各自分配到的任務項。一旦有新的服務器加入集群,或現有服務器下線,elastic-job 將在保留本次任務執行不變的情況下,下次任務開始前觸發任務重分片。
失效轉移failover:彈性擴容縮容在下次作業運行前重分片,但本次作業執行的過程中,下線的服務器所分配的作業將不會重新被分配。失效轉移功能可以在本次作業運行中用空閑服務器抓取孤兒作業分片執行。同樣失效轉移功能也會犧牲部分性能。
支持作業生命周期操作(Listener)
豐富的作業類型(Simple、DataFlow、Script)
Spring 整合以及命名空間提供
運維平臺
項目架構
應用在各自的節點執行任務,通過ZK 注冊中心協調。節點注冊、節點選舉、任務分片、監聽都在E-Job 的代碼中完成。
Java 開發
工程:ejob-standalone
pom 依賴
<dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-core</artifactId><version>2.1.5</version> </dependency>任務類型
standalone 工程
任務類型有三種:
SimpleJob
SimpleJob: 簡單實現,未經任何封裝的類型。需實現SimpleJob 接口。
ejob-standalone MySimpleJob.java
public class MyElasticJob implements SimpleJob {public void execute(ShardingContext context) {System.out.println(String.format("Item: %s | Time: %s | Thread: %s ",context.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()),Thread.currentThread().getId()));} }DataFlowJob
DataFlowJob:Dataflow 類型用于處理數據流,必須實現fetchData()和processData()的方法,一個用來獲取數據,一個用來處理獲取到的數據。
ejob-standalone MyDataFlowJob.java
public class MyDataFlowJob implements DataflowJob<String> {@Overridepublic List<String> fetchData(ShardingContext shardingContext) {// 獲取到了數據return Arrays.asList("leon","jack","seven");}@Overridepublic void processData(ShardingContext shardingContext, List<String> data) {data.forEach(x-> System.out.println("開始處理數據:"+x));} }ScriptJob
Script:Script 類型作業意為腳本類型作業,支持shell,python,perl 等所有類型腳本。D 盤下新建1.bat,內容:
@echo ------【腳本任務】Sharding Context: %*ejob-standalone script.ScriptJobTest
package script;import com.dangdang.ddframe.job.config.JobCoreConfiguration; import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration; import com.dangdang.ddframe.job.config.script.ScriptJobConfiguration; import com.dangdang.ddframe.job.lite.api.JobScheduler; import com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy; import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration; import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter; import dataflow.MyDataFlowJob;public class ScriptJobTest {// 如果修改了代碼,跑之前清空ZKpublic static void main(String[] args) {// ZK注冊中心CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "ejob-standalone"));regCenter.init();// 定義作業核心配置JobCoreConfiguration scriptJobCoreConfig = JobCoreConfiguration.newBuilder("MyScriptJob", "0/4 * * * * ?", 2).build();// 定義SCRIPT類型配置ScriptJobConfiguration scriptJobConfig = new ScriptJobConfiguration(scriptJobCoreConfig,"D:/1.bat");// 作業分片策略// 基于平均分配算法的分片策略String jobShardingStrategyClass = AverageAllocationJobShardingStrategy.class.getCanonicalName();// 定義Lite作業根配置// LiteJobConfiguration scriptJobRootConfig = LiteJobConfiguration.newBuilder(scriptJobConfig).jobShardingStrategyClass(jobShardingStrategyClass).build();LiteJobConfiguration scriptJobRootConfig = LiteJobConfiguration.newBuilder(scriptJobConfig).build();// 構建Jobnew JobScheduler(regCenter, scriptJobRootConfig).init();// new JobScheduler(regCenter, scriptJobRootConfig, jobEventConfig).init();}}只要指定腳本的內容或者位置
E-Job 配置
配置步驟
配置手冊:http://elasticjob.io/docs/elastic-job-lite/02-guide/config-manual/
1、ZK 注冊中心配置(后面繼續分析)
2、作業配置(從底層往上層:Core——Type——Lite)
| Core | JobCoreConfiguration | 用于提供作業核心配置信息,如:作業名稱、CRON 表達式、分片總數等。 |
| Type | JobTypeConfiguration | 有3 個子類分別對應SIMPLE, DATAFLOW 和SCRIPT 類型作業,提供3 種作 業需要的不同配置,如:DATAFLOW 類型是否流式處理或SCRIPT 類型的命 令行等。Simple 和DataFlow 需要指定任務類的路徑。 |
| Root | JobRootConfiguration | 有2 個子類分別對應Lite 和Cloud 部署類型,提供不同部署類型所需的配 置,如:Lite 類型的是否需要覆蓋本地配置或Cloud 占用CPU 或Memory 數量等。 可以定義分片策略。 http://elasticjob.io/docs/elastic-job-lite/02-guide/job-sharding-strategy/ |
作業配置分為3 級,分別是JobCoreConfiguration,JobTypeConfiguration 和LiteJobConfiguration 。LiteJobConfiguration 使用JobTypeConfiguration ,JobTypeConfiguration 使用JobCoreConfiguration,層層嵌套。
JobTypeConfiguration 根據不同實現類型分為SimpleJobConfiguration ,DataflowJobConfiguration 和ScriptJobConfiguration。
E-Job 使用ZK 來做分布式協調,所有的配置都會寫入到ZK 節點。
ZK 注冊中心數據結構
一個任務一個二級節點。
這里面有些節點是臨時節點,只有任務運行的時候才能看到。
注意:修改了任務重新運行任務不生效,是因為ZK 的信息不會更新, 除非把overwrite 修改成true。
config 節點
JSON 格式存儲。
存儲任務的配置信息,包含執行類,cron 表達式,分片算法類,分片數量,分片參數等等。
{"jobName": "MySimpleJob","jobClass": "job.MySimpleJob","jobType": "SIMPLE","cron": "0/2 * * * * ?","shardingTotalCount": 1,"shardingItemParameters": "","jobParameter": "","failover": false,"misfire": true,"description": "","jobProperties": {"job_exception_handler": "com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler","executor_service_handler": "com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler"},"monitorExecution": true,"maxTimeDiffSeconds": -1,"monitorPort": -1,"jobShardingStrategyClass": "","reconcileIntervalMinutes": 10,"disabled": false,"overwrite": false }config 節點的數據是通過ConfigService 持久化到zookeeper 中去的。默認狀態下,如果你修改了Job 的配置比如cron 表達式、分片數量等是不會更新到zookeeper 上去的,除非你在Lite 級別的配置把參數overwrite 修改成true。
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();instances 節點
同一個Job 下的elastic-job 的部署實例。一臺機器上可以啟動多個Job 實例,也就是Jar 包。instances 的命名是IP+@-@+PID。只有在運行的時候能看到。
leader 節點
任務實例的主節點信息,通過zookeeper 的主節點選舉,選出來的主節點信息。在elastic job 中,任務的執行可以分布在不同的實例(節點)中,但任務分片等核心控制,需要由主節點完成。因此,任務執行前,需要選舉出主節點。
下面有三個子節點:
election:主節點選舉
sharding:分片
failover:失效轉移
election 下面的instance 節點顯示了當前主節點的實例ID:jobInstanceId。
election 下面的latch 節點也是一個永久節點用于選舉時候的實現分布式鎖。
sharding 節點下面有一個臨時節點,necessary,是否需要重新分片的標記。如果分片總數變化,或任務實例節點上下線或啟用/禁用,以及主節點選舉,都會觸發設置重分片標記,主節點會進行分片計算。
servers 節點
任務實例的信息,主要是IP 地址,任務實例的IP 地址。跟instances 不同,如果多個任務實例在同一臺機器上運行則只會出現一個IP 子節點。可在IP 地址節點寫入DISABLED 表示該任務實例禁用。
sharding 節點
任務的分片信息,子節點是分片項序號,從0 開始。分片個數是在任務配置中設置的。分片項序號的子節點存儲詳細信息。每個分片項下的子節點用于控制和記錄分片運行狀態。最主要的子節點就是instance。
| instance | 否 | 執行該分片項的作業運行實例主鍵 |
| running | 是 | 分片項正在運行的狀態 僅配置monitorExecution 時有效 |
| failover | 是 | 如果該分片項被失效轉移分配給其他作業服務器,則此節點值記錄執行此分 片的作業服務器IP |
| misfire | 否 | 是否開啟錯過任務重新執行 |
| disabled | 否 | 是否禁用此分片項 |
?
總結
以上是生活随笔為你收集整理的任务调度之Elastic-Job1的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 任务调度之Quartz2
- 下一篇: 任务调度之Elastic-Job2