任务调度之Elastic-Job2
運維平臺
下載解壓運行
git 下載源碼https://github.com/elasticjob/elastic-job-lite
對elastic-job-lite-console 打包得到安裝包
解壓縮elastic-job-lite-console-${version}.tar.gz 并執行bin\start.sh(Windows運行.bat)。打開瀏覽器訪問http://localhost:8899/即可訪問控制臺。
8899 為默認端口號,可通過啟動腳本輸入-p 自定義端口號。
默認管理員用戶名和密碼是root/root。右上角可以切換語言。
添加ZK 注冊中心
第一步,添加注冊中心,輸入ZK 地址和命名空間,并連接。
運維平臺和elastic-job-lite 并無直接關系,是通過讀取作業注冊中心數據展現作業狀態,或更新注冊中心數據修改全局配置。
控制臺只能控制作業本身是否運行,但不能控制作業進程的啟動,因為控制臺和作業本身服務器是完全分離的,控制臺并不能控制作業服務器。
可以對作業進行操作。
事件追蹤
http://elasticjob.io/docs/elastic-job-lite/02-guide/event-trace/
Elastic-Job 提供了事件追蹤功能,可通過事件訂閱的方式處理調度過程的重要事件,用于查詢、統計和監控。
Elastic-Job-Lite 在配置中提供了JobEventConfiguration,目前支持數據庫方式配置。
ejob-standalone:simple.SimpleJobTest
BasicDataSource dataSource = new BasicDataSource();dataSource.setDriverClassName("com.mysql.jdbc.Driver");dataSource.setUrl("jdbc:mysql://localhost:3306/elastic_job_log");dataSource.setUsername("root");dataSource.setPassword("123456");JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(dataSource); ………… new JobScheduler(regCenter, simpleJobRootConfig, jobEventConfig).init(); package simple;import com.dangdang.ddframe.job.config.JobCoreConfiguration; import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration; import com.dangdang.ddframe.job.lite.api.JobScheduler; import com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy; import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration; import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;public class SimpleJobTest {// TODO 如果修改了代碼,跑之前清空ZKpublic static void main(String[] args) {// ZK注冊中心CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "ejob-standalone"));regCenter.init();// 數據源,使用DBCP /* BasicDataSource dataSource = new BasicDataSource();dataSource.setDriverClassName("com.mysql.jdbc.Driver");dataSource.setUrl("jdbc:mysql://localhost:3306/elastic_job_log");dataSource.setUsername("root");dataSource.setPassword("123456");JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(dataSource);*/// 定義作業核心配置// TODO 如果修改了代碼,跑之前清空ZKJobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("MySimpleJob", "0/2 * * * * ?", 4).shardingItemParameters("0=RDP, 1=CORE, 2=SIMS, 3=ECIF").failover(true).build();// 定義SIMPLE類型配置SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, MySimpleJob.class.getCanonicalName());// 作業分片策略// 基于平均分配算法的分片策略String jobShardingStrategyClass = AverageAllocationJobShardingStrategy.class.getCanonicalName();// 定義Lite作業根配置// LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).jobShardingStrategyClass(jobShardingStrategyClass).build();LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();// 構建Jobnew JobScheduler(regCenter, simpleJobRootConfig).init();// new JobScheduler(regCenter, simpleJobRootConfig, jobEventConfig).init();}}事件追蹤的event_trace_rdb_url 屬性對應庫自動創建JOB_EXECUTION_LOG 和JOB_STATUS_TRACE_LOG 兩張表以及若干索引。
需要在運維平臺中添加數據源信息,并且連接:
在作業歷史中查詢:
Spring 集成與分片詳解
ejob-springboot 工程
pom 依賴
<properties><elastic-job.version>2.1.5</elastic-job.version> </properties> <dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-core</artifactId><version>${elastic-job.version}</version> </dependency> <!-- elastic-job-lite-spring --> <dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-spring</artifactId><version>${elastic-job.version}</version> </dependency>application.properties
定義配置類和任務類中要用到的參數
server.port=${random.int[10000,19999]} regCenter.serverList = localhost:2181 regCenter.namespace = ejob-springboot leonJob.cron = 0/3 * * * * ? leonJob.shardingTotalCount = 2 leonJob.shardingItemParameters = 0=0,1=1創建任務
創建任務類,加上@Component 注解
@Component public class SimpleJobDemo implements SimpleJob {public void execute(ShardingContext shardingContext) {System.out.println(String.format("------Thread ID: %s, %s,任務總片數: %s, " +"當前分片項: %s.當前參數: %s," +"當前任務名稱: %s.當前任務參數%s",Thread.currentThread().getId(),new SimpleDateFormat("HH:mm:ss").format(new Date()),shardingContext.getShardingTotalCount(),shardingContext.getShardingItem(),shardingContext.getShardingParameter(),shardingContext.getJobName(),shardingContext.getJobParameter()));} }注冊中心配置
Bean 的initMethod 屬性用來指定Bean 初始化完成之后要執行的方法,用來替代繼承InitializingBean 接口,以便在容器啟動的時候創建注冊中心。
@Configuration public class ElasticRegCenterConfig {@Bean(initMethod = "init")public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList,@Value("${regCenter.namespace}") final String namespace) {return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));} }作業三級配置
Core——Type——Lite
return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(
@Configuration public class ElasticJobConfig {@Autowiredprivate ZookeeperRegistryCenter regCenter;@Bean(initMethod = "init")public JobScheduler simpleJobScheduler(final SimpleJobDemo simpleJob,@Value("${leonJob.cron}") final String cron,@Value("${leonJob.shardingTotalCount}") final int shardingTotalCount,@Value("${leonJob.shardingItemParameters}") final StringshardingItemParameters) {return new SpringJobScheduler(simpleJob, regCenter,getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters));}private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,final String cron,final int shardingTotalCount,final String shardingItemParameters) {return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName())).overwrite(true).build();} }作業運行
先把application.properties 中的分片數全部改成1
啟動com.leon.EjobApp 的main 方法
package com.leon;import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class EjobApp {public static void main(String[] args) {SpringApplication.run(EjobApp.class, args);} }分片策略
分片項與分片參數
任務分片,是為了實現把一個任務拆分成多個子任務,在不同的ejob 示例上執行。例如100W 條數據,在配置文件中指定分成10 個子任務(分片項),這10 個子任務再按照一定的規則分配到5 個實際運行的服務器上執行。除了直接用分片項ShardingItem獲取分片任務之外,還可以用item 對應的parameter 獲取任務。
standalone 工程:simple.SimpleJobTest
JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("MySimpleJob", "0/2 * * * * ?", 4).shardingItemParameters("0=RDP, 1=CORE, 2=SIMS, 3=ECIF").build(); package simple;import com.dangdang.ddframe.job.config.JobCoreConfiguration; import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration; import com.dangdang.ddframe.job.lite.api.JobScheduler; import com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy; import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration; import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;public class SimpleJobTest {// TODO 如果修改了代碼,跑之前清空ZKpublic static void main(String[] args) {// ZK注冊中心CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "leon-ejob-standalone"));regCenter.init();// 數據源,使用DBCP /* BasicDataSource dataSource = new BasicDataSource();dataSource.setDriverClassName("com.mysql.jdbc.Driver");dataSource.setUrl("jdbc:mysql://localhost:3306/elastic_job_log");dataSource.setUsername("root");dataSource.setPassword("123456");JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(dataSource);*/// 定義作業核心配置// TODO 如果修改了代碼,跑之前清空ZKJobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("MySimpleJob", "0/2 * * * * ?", 4).shardingItemParameters("0=RDP, 1=CORE, 2=SIMS, 3=ECIF").failover(true).build();// 定義SIMPLE類型配置SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, MySimpleJob.class.getCanonicalName());// 作業分片策略// 基于平均分配算法的分片策略String jobShardingStrategyClass = AverageAllocationJobShardingStrategy.class.getCanonicalName();// 定義Lite作業根配置// LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).jobShardingStrategyClass(jobShardingStrategyClass).build();LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();// 構建Jobnew JobScheduler(regCenter, simpleJobRootConfig).init();// new JobScheduler(regCenter, simpleJobRootConfig, jobEventConfig).init();}}springboot 工程,在application.properties 中定義。
定義幾個分片項,一個任務就會有幾個線程去運行它。
注意:分片個數和分片參數要一一對應。通常把分片項設置得比E-Job 服務器個數大一些,比如3 臺服務器,分成9 片,這樣如果有服務器宕機,分片還可以相對均勻。4.7.2 分片驗證
為避免運行的任務太多看不清楚運行結果,可以注釋在ElasticJobConfig 中注釋DataFlowJob 和ScriptJob。SimpleJob 的分片項改成2。
直接運行com.leon.EjobApp。
或者打成jar 包:mvn package -Dmaven.test.skip=true
Jar 包路徑:ejob-springboot\target\ejob-springboot-0.0.1-SNAPSHOT.jar
修改名稱為ejob.jar 放到D 盤下。
多實例運行(單機):
java –jar ejob.jar
1、多運行一個點,任務不會重跑(兩個節點各獲得一個分片項)
2、關閉一個節點,任務不會漏跑
分片策略
http://elasticjob.io/docs/elastic-job-lite/02-guide/job-sharding-strategy/
分片項如何分配到服務器?這個跟分片策略有關。
| AverageAllocationJobShardin gStrategy | 基于平均分配算法的分片策 略,也是默認的分片策略。 | 如果分片不能整除,則不能整除的多余分片將依 次追加到序號小的服務器。如: ? 如果有3 臺服務器,分成9 片,則每臺服務 器分到的分片是: 1=[0,1,2], 2=[3,4,5], 3=[6,7,8] ? 如果有3 臺服務器,分成8 片,則每臺服務 器分到的分片是:1=[0,1,6], 2=[2,3,7], 3=[4,5] ? 如果有3 臺服務器,分成10 片,則每臺服務 器分到的分片是: 1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8] |
| OdevitySortByNameJobShar dingStrategy | 根據作業名的哈希值奇偶數 決定IP 升降序算法的分片策 略。 | 根據作業名的哈希值奇偶數決定IP 升降序算法的 分片策略。 ? 作業名的哈希值為奇數則IP 升序。 ? 作業名的哈希值為偶數則IP 降序。 用于不同的作業平均分配負載至不同的服務器。 |
| RotateServerByNameJobSha rdingStrategy | 根據作業名的哈希值對服務 器列表進行輪轉的分片策 略。 | ? |
| 自定義分片策略 | ? | 實現JobShardingStrategy 接口并實現sharding 方 法,接口方法參數為作業服務器IP 列表和分片策 略選項,分片策略選項包括作業名稱,分片總數 以及分片序列號和個性化參數對照表,可以根據 需求定制化自己的分片策略。 |
AverageAllocationJobShardingStrategy 的缺點是,一旦分片數小于作業服務器數,作業將永遠分配至IP 地址靠前的服務
器,導致IP 地址靠后的服務器空閑。而OdevitySortByNameJobShardingStrategy 則可以根據作業名稱重新分配服務器負
載。如:
如果有3 臺服務器,分成2 片,作業名稱的哈希值為奇數,則每臺服務器分到的分片是:1=[0], 2=[1], 3=[]
如果有3 臺服務器,分成2 片,作業名稱的哈希值為偶數,則每臺服務器分到的分片是:3=[0], 2=[1], 1=[]
在Lite 配置中指定分片策略:
String jobShardingStrategyClass = AverageAllocationJobShardingStrategy.class.getCanonicalName(); LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).jobShardingStrategyClass(jobShardingStrategyClass).build();分片方案
獲取到分片項shardingItem 之后,怎么對數據進行分片嗯?
1、對業務主鍵進行取模,獲取余數等于分片項的數據
舉例:獲取到的sharding item 是0,1
在SQL 中加入過濾條件:where mod(id, 4) in (1, 2)。
這種方式的缺點:會導致索引失效,查詢數據時會全表掃描。
解決方案:在查詢條件中在增加一個索引條件進行過濾。
2、在表中增加一個字段,根據分片數生成一個mod 值。取模的基數要大于機器數。否則在增加機器后,會導致機器空閑。例如取模基數是2,而服務器有5 臺,那么有三臺服務器永遠空閑。而取模基數是10,生成10 個shardingItem,可以分配到5 臺服務器。當然,取模基數也可以調整。
3、如果從業務層面,可以用ShardingParamter 進行分片。
例如0=RDP, 1=CORE, 2=SIMS, 3=ECIF
List<users> = SELECT * FROM user WHERE status = 0 AND SYSTEM_ID = 'RDP' limit 0, 100。
在Spring Boot 中要Elastic-Job 要配置的內容太多了,有沒有更簡單的添加任務的方法呢?比如在類上添加一個注解?這個時候我們就要用到starter 了。
e-job starter
Git 上有一個現成的實現
https://github.com/TFdream/elasticjob-spring-boot-starter
工程:elasticjob-spring-boot-starter
需求(一個starter 應該有什么樣子):
| 可以在啟動類上使用@Enable 功 能開啟E-Job 任務調度 | 注解@EnableElasticJob | 在自動配置類上用@ConditionalOnBean 決定是否自動配置 |
| 可以在properties 或yml 中識別配 置內容 | 配置類RegCenterProperties.java | 支持在properties 文件中使用 elasticjob.regCenter 前綴,配置注冊中心 參數 |
| 在類上加上注解,直接創建任務 | 注解@JobScheduled | 配置任務參數,包括定分片項、分片參 數等等 |
| 不用創建ZK注冊中心 | 自動配置類 RegCentreAutoConfiguration.java | 注入從RegCenterProperties.java 讀取到 的參數,自動創ZookeeperConfiguration |
| 不用創建三級(Core、Type、Lite) 配置 | 自動配置類 JobAutoConfiguration.java | 讀取注解的參數, 創建 JobCoreConfiguration 、 JobTypeConfiguration 、 LiteJobConfiguration 在注冊中心創建之后再創建 |
| Spring Boot 啟動時自動配置 | 創建 Resource/META-INF/spring.factori es | 指定兩個自動配置類 |
打包starter 的工程,引入starter 的依賴,即可在項目中使用注解開啟任務調度功能。
E-Job 原理
啟動
standalone 工程
new JobScheduler(regCenter, simpleJobRootConfig).init();init 方法
public void init() {LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);// 設置分片數JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(),liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());// 構建任務,創建調度器JobScheduleController jobScheduleController = new JobScheduleController(createScheduler(),createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()),liteJobConfigFromRegCenter.getJobName());// 在ZK 上注冊任務JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);// 添加任務信息并進行節點選舉schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());// 啟動調度器jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron()); }registerStartUpInfo 方法
public void registerStartUpInfo(final boolean enabled) {// 啟動所有的監聽器listenerManager.startAllListeners();// 節點選舉leaderService.electLeader();// 服務信息持久化(寫到ZK)serverService.persistOnline(enabled);// 實例信息持久化(寫到ZK)instanceService.persistOnline();// 重新分片shardingService.setReshardingFlag();// 監控信息監聽器monitorService.listen();// 自診斷修復,使本地節點與ZK 數據一致if (!reconcileService.isRunning()) {reconcileService.startAsync();} }監聽器用于監聽ZK 節點信息的變化。
啟動的時候進行主節點選舉
/** * 選舉主節點. */ public void electLeader() {log.debug("Elect a new leader now.");jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());log.debug("Leader election completed."); }Latch 是一個分布式鎖,選舉成功后在instance 寫入服務器信息。
// 服務信息持久化(寫到ZK servers 節點)
serverService.persistOnline(enabled);以下是單機運行多個實例:
// 實例信息持久化(寫到ZK instances 節點)
instanceService.persistOnline();運行了兩個實例:
任務執行與分片原理
關注兩個問題:
1、LiteJob 是怎么被執行的?
2、分片項是怎么分配給不同的服務實例的?
在創建Job 的時候(createJobDetail),創建的是實現了Quartz 的Job 接口的LiteJob 類,LiteJob 類實現了Quartz 的Job 接口。
在LiteJob 的execute 方法中獲取對應類型的執行器,調用execute()方法。
public static AbstractElasticJobExecutor getJobExecutor(final ElasticJob elasticJob, final JobFacade jobFacade) {if (null == elasticJob) {return new ScriptJobExecutor(jobFacade);}if (elasticJob instanceof SimpleJob) {return new SimpleJobExecutor((SimpleJob) elasticJob, jobFacade);}if (elasticJob instanceof DataflowJob) {return new DataflowJobExecutor((DataflowJob) elasticJob, jobFacade);}throw new JobConfigurationException("Cannot support job type '%s'", elasticJob.getClass().getCanonicalName()); }EJOB 提供管理任務執行器的抽象類AbstractElasticJobExecutor,核心動作在execute()方法中執行。
public final void execute() {調用了另一個execute()方法,122 行:
execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER); private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {在這個execute 方法中又調用了process()方法,150 行
private void process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();// 只有一個分片項時,直接執行if (1 == items.size()) {int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName,executionSource, item);process(shardingContexts, item, jobExecutionEvent);return;}final CountDownLatch latch = new CountDownLatch(items.size());// 本節點遍歷執行相應的分片信息for (final int each : items) {final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName,executionSource, each);if (executorService.isShutdown()) {return;}executorService.submit(new Runnable() {@Overridepublic void run() {try {process(shardingContexts, each, jobExecutionEvent);} finally {latch.countDown();}}});}try {// 等待所有的分片項任務執行完畢latch.await();} catch (final InterruptedException ex) {Thread.currentThread().interrupt();} }又調用了另一個process()方法,206 行
protected abstract void process(ShardingContext shardingContext);交給具體的實現類(SimpleJobExecutor、DataflowJobExecutor、ScriptJobExecutor)去處理。
最終調用到任務類
@Override protected void process(final ShardingContext shardingContext) {simpleJob.execute(shardingContext); }失效轉移
所謂失效轉移,就是在執行任務的過程中發生異常時,這個分片任務可以在其他節點再次執行。
simple.SimpleJobTest,failover 方法:
// 設置失效轉移 JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("MySimpleJob", "0/2 * * * * ?", 4).shardingItemParameters("0=RDP, 1=CORE, 2=SIMS, 3=ECIF").failover(true).build();FailoverListenerManager 監聽的是zk 的instance 節點刪除事件。如果任務配置了failover 等于true,其中某個instance 與zk 失去聯系或被刪除,并且失效的節點又不是本身,就會觸發失效轉移邏輯。
Job 的失效轉移監聽來源于FailoverListenerManager 中內部類JobCrashedJobListener 的dataChanged 方法。
當節點任務失效時會調用JobCrashedJobListener 監聽器,此監聽器會根據實例id獲取所有的分片,然后調用FailoverService 的setCrashedFailoverFlag 方法,將每個分片id 寫到/jobName/leader/failover/items 下,例如原來的實例負責1、2 分片項,
那么items 節點就會寫入1、2,代表這兩個分片項需要失效轉移。
然后接下來調用FailoverService 的failoverIfNessary 方法,首先判斷是否需要失敗轉移,如果可以需要則只需作業失敗轉移。
public void failoverIfNecessary() {if (needFailover()) {jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());} }條件一:${JOB_NAME}/leader/failover/items/${ITEM_ID} 有失效轉移的作業分片項。
條件二:當前作業不在運行中。
private boolean needFailover() {return jobNodeStorage.isJobNodeExisted(FailoverNode.ITEMS_ROOT)&& !jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).isEmpty()&& !JobRegistry.getInstance().isJobRunning(jobName); }在主節點執行操作
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {latch.start();latch.await();callback.execute();//CHECKSTYLE:OFF} catch (final Exception ex) {//CHECKSTYLE:ONhandleException(ex);} }1、再次判斷是否需要失效轉移;
2、從注冊中心獲得一個`${JOB_NAME}/leader/failover/items/${ITEM_ID}` 作業分片項;
3、在注冊中心節點`${JOB_NAME}/sharding/${ITEM_ID}/failover` 注冊作業分片項為當前作業節點;
4、然后移除任務轉移分片項;
5、最后調用執行,提交任務。
class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {@Overridepublic void execute() {// 判斷是否需要失效轉移if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {return;}// 從${JOB_NAME}/leader/failover/items/${ITEM_ID}獲得一個分片項int crashedItem =Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);// 在注冊中心節點`${JOB_NAME}/sharding/${ITEM_ID}/failover`注冊作業分片項為當前作業節點jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem),JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());// 移除任務轉移分片項jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);if (null != jobScheduleController) {// 提交任務jobScheduleController.triggerJob();}} }這里僅僅是觸發作業,而不是立即執行。
?
總結
以上是生活随笔為你收集整理的任务调度之Elastic-Job2的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 任务调度之Elastic-Job1
- 下一篇: sonar提示总结