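Source code analysis of the open-source job scheduling platform elastic-job-lite
A while ago I wrote an article, <Demystifying the scheduled-task framework quartz>, and some readers suggested I also cover the elastic-job scheduling framework. Things are less busy at the end of the year, so let's study elastic-job.
First of all, elastic-job is built on quartz, so understanding how quartz works will help you understand elastic-job quickly.
First, let's look at the architecture of elastic-job-lite.
We know quartz has three important concepts: Job, Trigger and Scheduler. How are these three concepts reflected in elastic-job?
1. Job
LiteJob implements quartz's Job interface: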
import lombok.Setter;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

/**
 * Lite scheduled job.
 *
 * @author zhangliang
 */
public final class LiteJob implements Job {

    @Setter
    private ElasticJob elasticJob;

    @Setter
    private JobFacade jobFacade;

    @Override
    public void execute(final JobExecutionContext context) throws JobExecutionException {
        // Pick the executor that matches the job type and run it
        JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
    }
}
Here:
1.1 ElasticJob is the parent interface of the different job types (SimpleJob, DataflowJob, ScriptJob)
1.2 JobFacade is the facade over the job's internal services
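LiteJob.execute() hands off to an executor chosen by job type. The dispatch can be sketched in plain Java. This sketch assumes JobExecutorFactory checks in this order: first for a null job, meaning a script job, then SimpleJob, then DataflowJob. The null case is consistent with createJobDetail in 5.2.2, which stores no instance for ScriptJob. The interfaces and ExecutorFactorySketch are simplified, hypothetical stand-ins, not the elastic-job API.

```java
import java.util.List;

// Hypothetical dispatcher modelled on a null / SimpleJob / DataflowJob check order.
class ExecutorFactorySketch {

    // Returns the name of the executor that would run the given job.
    static String executorFor(ElasticJobSketch job) {
        if (job == null) {
            // Script jobs have no Java instance in the JobDataMap.
            return "ScriptJobExecutor";
        }
        if (job instanceof SimpleJobSketch) {
            return "SimpleJobExecutor";
        }
        if (job instanceof DataflowJobSketch) {
            return "DataflowJobExecutor";
        }
        throw new IllegalArgumentException("Unsupported job type: " + job.getClass().getName());
    }

    public static void main(String[] args) {
        SimpleJobSketch simple = item -> System.out.println("processing item " + item);
        System.out.println(executorFor(simple));
        System.out.println(executorFor(null));
    }
}

// Simplified stand-ins for elastic-job's job-type interfaces (not the real API).
interface ElasticJobSketch {
}

interface SimpleJobSketch extends ElasticJobSketch {
    void execute(int shardingItem);
}

interface DataflowJobSketch<T> extends ElasticJobSketch {
    List<T> fetchData(int shardingItem);

    void processData(int shardingItem, List<T> data);
}
```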
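Note: JobFacade's methods reflect elastic-job's features, for example:
Job sharding:
Split an overall job into multiple sub-tasks
Scale processing capacity elastically by adding or removing servers
Distributed coordination: job servers going online or offline are discovered and handled automatically
Fault tolerance:
Periodic self-diagnosis and automatic repair
Guaranteed uniqueness of distributed sharding items
Failover and re-triggering of misfired jobs
Job tracing
Job scheduling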
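The misfire feature in this list maps to three methods of the JobFacade interface that follows: misfireIfRunning, isExecuteMisfired and clearMisfire. The sketch models, in a single process, the order in which the executor in section 3 calls them. A trigger that arrives while items are still running only sets a flag, and the running execution re-runs once after it finishes. The in-memory sets and the simulateOverlapOnce hook are assumptions for illustration. elastic-job keeps these flags in the registry center and also checks the misfire setting and pending resharding.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical single-process model of misfire handling.
class MisfireSketch {

    private final Set<Integer> running = new HashSet<>();
    private final Set<Integer> misfired = new HashSet<>();
    private Runnable onFirstRun; // hook that simulates a trigger firing mid-run
    private int executions;

    // Like misfireIfRunning: if any item is still running, flag all items as misfired.
    boolean misfireIfRunning(Collection<Integer> items) {
        for (int item : items) {
            if (running.contains(item)) {
                misfired.addAll(items);
                return true;
            }
        }
        return false;
    }

    // Like isExecuteMisfired, minus the config and resharding checks.
    boolean isExecuteMisfired(Collection<Integer> items) {
        return items.stream().anyMatch(misfired::contains);
    }

    // Like clearMisfire.
    void clearMisfire(Collection<Integer> items) {
        misfired.removeAll(items);
    }

    // One trigger firing, in the order used by AbstractElasticJobExecutor.execute().
    void trigger(Collection<Integer> items) {
        if (misfireIfRunning(items)) {
            return; // skipped now, re-run after the current execution
        }
        runOnce(items);
        while (isExecuteMisfired(items)) {
            clearMisfire(items);
            runOnce(items);
        }
    }

    void simulateOverlapOnce(Runnable overlappingTrigger) {
        onFirstRun = overlappingTrigger;
    }

    int executions() {
        return executions;
    }

    private void runOnce(Collection<Integer> items) {
        running.addAll(items);
        executions++;
        if (onFirstRun != null) {
            Runnable overlapping = onFirstRun;
            onFirstRun = null;
            overlapping.run(); // a second trigger arrives while the items are running
        }
        running.removeAll(items);
    }

    public static void main(String[] args) {
        MisfireSketch job = new MisfireSketch();
        List<Integer> items = Arrays.asList(0, 1);
        job.simulateOverlapOnce(() -> job.trigger(items));
        job.trigger(items);
        System.out.println("executions = " + job.executions()); // normal run + one misfire re-run
    }
}
```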
public interface JobFacade {

    /**
     * Load the job configuration.
     *
     * @param fromCache whether to read from the cache
     * @return job configuration
     */
    JobRootConfiguration loadJobRootConfiguration(boolean fromCache);

    /**
     * Check the job execution environment.
     *
     * @throws JobExecutionEnvironmentException if the execution environment is invalid
     */
    void checkJobExecutionEnvironment() throws JobExecutionEnvironmentException;

    /**
     * Perform job failover if it is needed.
     */
    void failoverIfNecessary();

    /**
     * Register the job start information.
     *
     * @param shardingContexts sharding contexts
     */
    void registerJobBegin(ShardingContexts shardingContexts);

    /**
     * Register the job completion information.
     *
     * @param shardingContexts sharding contexts
     */
    void registerJobCompleted(ShardingContexts shardingContexts);

    /**
     * Get the sharding contexts of the current job server.
     *
     * @return sharding contexts
     */
    ShardingContexts getShardingContexts();

    /**
     * Set the misfire flag for the job.
     *
     * @param shardingItems sharding items to mark as misfired
     * @return whether the misfire condition is met
     */
    boolean misfireIfRunning(Collection<Integer> shardingItems);

    /**
     * Clear the misfire flag for the job.
     *
     * @param shardingItems sharding items whose misfire flag should be cleared
     */
    void clearMisfire(Collection<Integer> shardingItems);

    /**
     * Determine whether the job needs to execute misfired runs.
     *
     * @param shardingItems sharding items
     * @return whether the job needs to execute misfired runs
     */
    boolean isExecuteMisfired(Collection<Integer> shardingItems);

    /**
     * Determine whether the job is eligible to keep running.
     *
     * <p>The job will not keep running if it is stopped, needs resharding, or is not streaming.</p>
     *
     * @return whether the job is eligible to keep running
     */
    boolean isEligibleForJobRunning();

    /**
     * Determine whether resharding is needed.
     *
     * @return whether resharding is needed
     */
    boolean isNeedSharding();

    /**
     * Method called before the job is executed.
     *
     * @param shardingContexts sharding contexts
     */
    void beforeJobExecuted(ShardingContexts shardingContexts);

    /**
     * Method called after the job is executed.
     *
     * @param shardingContexts sharding contexts
     */
    void afterJobExecuted(ShardingContexts shardingContexts);

    /**
     * Post a job execution event.
     *
     * @param jobExecutionEvent job execution event
     */
    void postJobExecutionEvent(JobExecutionEvent jobExecutionEvent);

    /**
     * Post a job status trace event.
     *
     * @param taskId task ID
     * @param state job execution state
     * @param message job execution message
     */
    void postJobStatusTraceEvent(String taskId, State state, String message);
}
2. JobDetail
Common job attributes, defined in job.xsd:
<xsd:complexType name="base">
    <xsd:complexContent>
        <xsd:extension base="beans:identifiedType">
            <xsd:all>
                <xsd:element ref="listener" minOccurs="0" maxOccurs="1" />
                <xsd:element ref="distributed-listener" minOccurs="0" maxOccurs="1" />
            </xsd:all>
            <xsd:attribute name="class" type="xsd:string" />
            <xsd:attribute name="job-ref" type="xsd:string" />
            <xsd:attribute name="registry-center-ref" type="xsd:string" use="required" />
            <xsd:attribute name="cron" type="xsd:string" use="required" />
            <xsd:attribute name="sharding-total-count" type="xsd:string" use="required" />
            <xsd:attribute name="sharding-item-parameters" type="xsd:string" />
            <xsd:attribute name="job-parameter" type="xsd:string" />
            <xsd:attribute name="monitor-execution" type="xsd:string" default="true"/>
            <xsd:attribute name="monitor-port" type="xsd:string" default="-1"/>
            <xsd:attribute name="max-time-diff-seconds" type="xsd:string" default="-1"/>
            <xsd:attribute name="failover" type="xsd:string" default="false"/>
            <xsd:attribute name="reconcile-interval-minutes" type="xsd:int" default="10"/>
            <xsd:attribute name="misfire" type="xsd:string" default="true"/>
            <xsd:attribute name="job-sharding-strategy-class" type="xsd:string" />
            <xsd:attribute name="description" type="xsd:string" />
            <xsd:attribute name="disabled" type="xsd:string" default="false"/>
            <xsd:attribute name="overwrite" type="xsd:string" default="false"/>
            <xsd:attribute name="executor-service-handler" type="xsd:string" default="io.elasticjob.lite.executor.handler.impl.DefaultExecutorServiceHandler"/>
            <xsd:attribute name="job-exception-handler" type="xsd:string" default="io.elasticjob.lite.executor.handler.impl.DefaultJobExceptionHandler"/>
            <xsd:attribute name="event-trace-rdb-data-source" type="xsd:string" />
        </xsd:extension>
    </xsd:complexContent>
</xsd:complexType>
Simple jobs inherit the common attributes unchanged. Dataflow jobs add a streaming-process attribute, and script jobs add a script-command-line attribute.
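The sharding-item-parameters attribute maps sharding item numbers to business parameters, in the form `0=A,1=B,2=C`. For example, it can tell shard 1 which region to process. Here is a hedged sketch of parsing that string. The class name is hypothetical, and elastic-job's own parser may validate differently.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for the "int=value,int=value" sharding-item-parameters format.
class ShardingItemParametersSketch {

    static Map<Integer, String> parse(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            return Collections.emptyMap();
        }
        Map<Integer, String> result = new LinkedHashMap<>();
        for (String pair : raw.split(",")) {
            String[] kv = pair.trim().split("=");
            if (kv.length != 2) {
                throw new IllegalArgumentException("Expected int=value pairs, got: " + raw);
            }
            try {
                result.put(Integer.parseInt(kv[0].trim()), kv[1].trim());
            } catch (NumberFormatException ex) {
                throw new IllegalArgumentException("Sharding item must be an int: " + kv[0], ex);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("0=Beijing,1=Shanghai,2=Guangzhou")); // {0=Beijing, 1=Shanghai, 2=Guangzhou}
    }
}
```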
The namespace parsers are registered in spring.handlers:
http\://www.dangdang.com/schema/ddframe/reg=io.elasticjob.lite.spring.reg.handler.RegNamespaceHandler
http\://www.dangdang.com/schema/ddframe/job=io.elasticjob.lite.spring.job.handler.JobNamespaceHandler
JobNamespaceHandler:
/**
 * Namespace handler for distributed jobs.
 *
 * @author caohao
 */
public final class JobNamespaceHandler extends NamespaceHandlerSupport {

    @Override
    public void init() {
        registerBeanDefinitionParser("simple", new SimpleJobBeanDefinitionParser());
        registerBeanDefinitionParser("dataflow", new DataflowJobBeanDefinitionParser());
        registerBeanDefinitionParser("script", new ScriptJobBeanDefinitionParser());
    }
}
When the elastic distributed job executor AbstractElasticJobExecutor.java is initialized, it loads the configuration properties and resolves the corresponding handlers (the executor-service handler and the job-exception handler):
protected AbstractElasticJobExecutor(final JobFacade jobFacade) {
    this.jobFacade = jobFacade;
    jobRootConfig = jobFacade.loadJobRootConfiguration(true);
    jobName = jobRootConfig.getTypeConfig().getCoreConfig().getJobName();
    executorService = ExecutorServiceHandlerRegistry.getExecutorServiceHandler(jobName, (ExecutorServiceHandler) getHandler(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER));
    jobExceptionHandler = (JobExceptionHandler) getHandler(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER);
    itemErrorMessages = new ConcurrentHashMap<>(jobRootConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(), 1);
}
3. Job execution
The elastic distributed job executor AbstractElasticJobExecutor.java:
/**
 * Execute the job.
 */
public final void execute() {
    try {
        jobFacade.checkJobExecutionEnvironment(); //1
    } catch (final JobExecutionEnvironmentException cause) {
        jobExceptionHandler.handleException(jobName, cause);
    }
    ShardingContexts shardingContexts = jobFacade.getShardingContexts(); //2
    if (shardingContexts.isAllowSendJobEvent()) {
        jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName)); //3
    }
    if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
                    "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName,
                    shardingContexts.getShardingItemParameters().keySet()));
        }
        return;
    }
    try {
        jobFacade.beforeJobExecuted(shardingContexts); //4
    //CHECKSTYLE:OFF
    } catch (final Throwable cause) {
    //CHECKSTYLE:ON
        jobExceptionHandler.handleException(jobName, cause);
    }
    execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER); //5
    while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
        jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
        execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
    }
    jobFacade.failoverIfNecessary(); //6
    try {
        jobFacade.afterJobExecuted(shardingContexts); //7
    //CHECKSTYLE:OFF
    } catch (final Throwable cause) {
    //CHECKSTYLE:ON
        jobExceptionHandler.handleException(jobName, cause);
    }
}
3.1 Environment check
Check whether the clock difference (in seconds) between this machine and the registry center is within the allowed range:
/**
 * Check whether the clock difference (in seconds) between this machine and the registry center is within the allowed range.
 *
 * @throws JobExecutionEnvironmentException thrown if the clock difference exceeds the allowed range
 */
public void checkMaxTimeDiffSecondsTolerable() throws JobExecutionEnvironmentException {
    int maxTimeDiffSeconds = load(true).getMaxTimeDiffSeconds();
    if (-1 == maxTimeDiffSeconds) {
        return;
    }
    long timeDiff = Math.abs(timeService.getCurrentMillis() - jobNodeStorage.getRegistryCenterTime());
    if (timeDiff > maxTimeDiffSeconds * 1000L) {
        throw new JobExecutionEnvironmentException(
                "Time different between job server and register center exceed '%s' seconds, max time different is '%s' seconds.",
                timeDiff / 1000, maxTimeDiffSeconds);
    }
}
3.2 Sharding according to the sharding strategy
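If job-sharding-strategy-class is left empty, elastic-job-lite falls back to its average-allocation strategy (AverageAllocationJobShardingStrategy). Its Javadoc describes the behaviour for 3 servers. With 9 items the result is [0,1,2], [3,4,5], [6,7,8]; with 8 items it is [0,1,6], [2,3,7], [4,5]. Each server first gets total/servers consecutive items. The remaining items are then dealt out one by one, starting from the first server. Below is a stdlib sketch of that rule. Plain strings stand in for JobInstance, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of average-allocation sharding.
class AverageAllocationSketch {

    static Map<String, List<Integer>> sharding(List<String> instances, int shardingTotalCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (instances.isEmpty()) {
            return result; // no available instance: no sharding
        }
        int perInstance = shardingTotalCount / instances.size();
        int next = 0;
        // Step 1: consecutive blocks of equal size.
        for (String instance : instances) {
            List<Integer> items = new ArrayList<>();
            for (int i = 0; i < perInstance; i++) {
                items.add(next++);
            }
            result.put(instance, items);
        }
        // Step 2: hand out the remainder one item per instance, from the first one.
        for (String instance : instances) {
            if (next >= shardingTotalCount) {
                break;
            }
            result.get(instance).add(next++);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(sharding(Arrays.asList("A", "B", "C"), 8)); // {A=[0, 1, 6], B=[2, 3, 7], C=[4, 5]}
    }
}
```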
If sharding is needed and the current node is the leader, the job is sharded; this happens inside step //2, getShardingContexts(). Non-leader nodes block until the leader has finished sharding.
If no instance is available, no sharding takes place.
/**
 * Shard the job if sharding is needed and the current node is the leader.
 *
 * <p>
 * No sharding is performed if no instance is available.
 * </p>
 */
public void shardingIfNecessary() {
    List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
    if (!isNeedSharding() || availableJobInstances.isEmpty()) {
        return;
    }
    if (!leaderService.isLeaderUntilBlock()) {
        blockUntilShardingCompleted();
        return;
    }
    waitingOtherShardingItemCompleted();
    LiteJobConfiguration liteJobConfig = configService.load(false);
    int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
    log.debug("Job '{}' sharding begin.", jobName);
    jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
    resetShardingInfo(shardingTotalCount);
    JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass());
    jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
    log.debug("Job '{}' sharding complete.", jobName);
}
3.3 Notification via EventBus
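Guava's EventBus.post, listed next, walks the event's whole type hierarchy. It hands the event to every subscriber registered for any of those types, and reposts events nobody handled as a DeadEvent. The sketch reproduces just that dispatch rule in plain Java. MiniEventBus is hypothetical. It dispatches synchronously on the caller's thread and keeps unhandled events in a list instead of reposting them.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical, synchronous model of type-hierarchy event dispatch.
class MiniEventBus {

    private final Map<Class<?>, List<Consumer<Object>>> subscribers = new LinkedHashMap<>();
    private final List<Object> deadEvents = new ArrayList<>();

    synchronized void register(Class<?> type, Consumer<Object> handler) {
        subscribers.computeIfAbsent(type, k -> new ArrayList<>()).add(handler);
    }

    // Delivers the event to handlers of its class, superclasses and interfaces.
    synchronized void post(Object event) {
        boolean dispatched = false;
        for (Class<?> type : flattenHierarchy(event.getClass())) {
            List<Consumer<Object>> handlers = subscribers.get(type);
            if (handlers != null && !handlers.isEmpty()) {
                dispatched = true;
                for (Consumer<Object> handler : handlers) {
                    handler.accept(event);
                }
            }
        }
        if (!dispatched) {
            deadEvents.add(event); // Guava would repost it wrapped in a DeadEvent
        }
    }

    synchronized List<Object> deadEvents() {
        return new ArrayList<>(deadEvents);
    }

    // Collects the class, all superclasses and all interfaces, each exactly once.
    private static Set<Class<?>> flattenHierarchy(Class<?> type) {
        Set<Class<?>> result = new LinkedHashSet<>();
        collect(type, result);
        return result;
    }

    private static void collect(Class<?> type, Set<Class<?>> acc) {
        if (type == null || !acc.add(type)) {
            return;
        }
        collect(type.getSuperclass(), acc);
        for (Class<?> iface : type.getInterfaces()) {
            collect(iface, acc);
        }
    }

    public static void main(String[] args) {
        MiniEventBus bus = new MiniEventBus();
        bus.register(CharSequence.class, e -> System.out.println("trace event: " + e));
        bus.post("Job 'demo' execute begin.");
        bus.post(42);
        System.out.println("dead events: " + bus.deadEvents());
    }
}
```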
Guava's com.google.common.eventbus.EventBus, through which the trace events posted at //3 are delivered:
/**
 * Posts an event to all registered subscribers. This method will return
 * successfully after the event has been posted to all subscribers, and
 * regardless of any exceptions thrown by subscribers.
 *
 * <p>If no subscribers have been subscribed for {@code event}'s class, and
 * {@code event} is not already a {@link DeadEvent}, it will be wrapped in a
 * DeadEvent and reposted.
 *
 * @param event event to post.
 */
public void post(Object event) {
    Set<Class<?>> dispatchTypes = flattenHierarchy(event.getClass());

    boolean dispatched = false;
    for (Class<?> eventType : dispatchTypes) {
        subscribersByTypeLock.readLock().lock();
        try {
            Set<EventSubscriber> wrappers = subscribersByType.get(eventType);

            if (!wrappers.isEmpty()) {
                dispatched = true;
                for (EventSubscriber wrapper : wrappers) {
                    enqueueEvent(event, wrapper);
                }
            }
        } finally {
            subscribersByTypeLock.readLock().unlock();
        }
    }

    if (!dispatched && !(event instanceof DeadEvent)) {
        post(new DeadEvent(this, event));
    }

    dispatchQueuedEvents();
}
3.4 Pre-execution: ElasticJobListener callbacks
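beforeJobExecuted (listed next) and afterJobExecuted in 3.7 simply loop over the registered ElasticJobListener instances. In execute() at //4 and //7, a Throwable from either loop goes to jobExceptionHandler and execution continues. The listeners after the failing one are skipped, though. The sketch below shows that ordering. ListenerChainSketch is hypothetical, and its log list stands in for the exception handler.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of the before/after listener hooks around a job run.
class ListenerChainSketch {

    interface JobListener {
        void beforeJobExecuted(List<Integer> items);

        void afterJobExecuted(List<Integer> items);
    }

    private final List<JobListener> listeners = new ArrayList<>();
    private final List<String> log = new ArrayList<>();

    void add(JobListener listener) {
        listeners.add(listener);
    }

    List<String> log() {
        return log;
    }

    void run(List<Integer> items, Consumer<List<Integer>> job) {
        try {
            for (JobListener l : listeners) {
                l.beforeJobExecuted(items);
            }
        } catch (RuntimeException ex) {
            log.add("handled: " + ex.getMessage()); // stands in for jobExceptionHandler
        }
        job.accept(items); // the job runs even if a before-hook failed
        try {
            for (JobListener l : listeners) {
                l.afterJobExecuted(items);
            }
        } catch (RuntimeException ex) {
            log.add("handled: " + ex.getMessage());
        }
    }

    public static void main(String[] args) {
        ListenerChainSketch chain = new ListenerChainSketch();
        chain.add(new JobListener() {
            @Override
            public void beforeJobExecuted(List<Integer> items) { chain.log().add("before " + items); }

            @Override
            public void afterJobExecuted(List<Integer> items) { chain.log().add("after " + items); }
        });
        chain.run(Arrays.asList(0, 1), items -> chain.log().add("job " + items));
        System.out.println(chain.log()); // [before [0, 1], job [0, 1], after [0, 1]]
    }
}
```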
@Override
public void beforeJobExecuted(final ShardingContexts shardingContexts) {
    for (ElasticJobListener each : elasticJobListeners) {
        each.beforeJobExecuted(shardingContexts);
    }
}
3.5 Job execution
private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
    if (shardingContexts.getShardingItemParameters().isEmpty()) {
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobName));
        }
        return;
    }
    jobFacade.registerJobBegin(shardingContexts); //1
    String taskId = shardingContexts.getTaskId();
    if (shardingContexts.isAllowSendJobEvent()) {
        jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
    }
    try {
        process(shardingContexts, executionSource); //2
    } finally {
        // TODO consider adding a job-failure state, and how to handle the overall job-failure loop
        jobFacade.registerJobCompleted(shardingContexts);
        if (itemErrorMessages.isEmpty()) {
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
            }
        } else {
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
            }
        }
    }
}
>>1. Register the job start with the registry center (registerJobBegin marks the sharding items as running).
>>2. process() executes the sharding items: a single item runs on the current thread, while multiple items are submitted to the thread pool.
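The process() method called at //2 is not listed above. Here is a stdlib sketch of the fan-out described in >>2. It assumes per-item failures are collected, as itemErrorMessages suggests, rather than aborting the other items. One item runs inline. Several items go to an ExecutorService, and the caller waits on a CountDownLatch. ShardProcessSketch and its IntConsumer job are hypothetical. As far as I can tell, the real method also builds a JobExecutionEvent per item.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.IntConsumer;

// Hypothetical model of running sharding items inline or on a thread pool.
class ShardProcessSketch {

    // Returns item -> error message for every item whose job threw.
    static Map<Integer, String> process(List<Integer> items, ExecutorService pool, IntConsumer job) {
        Map<Integer, String> errors = new ConcurrentHashMap<>();
        if (items.size() == 1) {
            runItem(items.get(0), job, errors); // single item: no thread hand-off
            return errors;
        }
        CountDownLatch latch = new CountDownLatch(items.size());
        for (int item : items) {
            pool.submit(() -> {
                try {
                    runItem(item, job, errors);
                } finally {
                    latch.countDown(); // always count down, even on failure
                }
            });
        }
        try {
            latch.await(); // wait until every item has finished
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return errors;
    }

    private static void runItem(int item, IntConsumer job, Map<Integer, String> errors) {
        try {
            job.accept(item);
        } catch (RuntimeException ex) {
            errors.put(item, String.valueOf(ex.getMessage()));
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        Map<Integer, String> errors = process(Arrays.asList(0, 1, 2), pool, item -> {
            if (item == 1) {
                throw new IllegalStateException("bad data in item 1");
            }
            System.out.println(Thread.currentThread().getName() + " processed item " + item);
        });
        pool.shutdown();
        System.out.println("errors: " + errors);
    }
}
```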
3.6 Failover
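executeInLeader (shown below) wraps its callback in a Curator LeaderLatch on a job node. That way only one instance in the cluster runs the callback at a time. Here is a single-JVM sketch of the failover idea. It assumes a ReentrantLock per latch name in place of the ZooKeeper latch, and a queue of sharding items left behind by a crashed instance. Each call from an idle instance claims at most one item. FailoverSketch, its latch name and its methods are hypothetical. The real flow also checks the failover setting and re-triggers the job for the claimed item.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical single-JVM model of leader-latched failover.
class FailoverSketch {

    private static final String LATCH = "failover-latch"; // hypothetical latch name

    private final Map<String, ReentrantLock> latches = new ConcurrentHashMap<>();
    private final Deque<Integer> crashedItems = new ArrayDeque<>();                 // guarded by the latch
    private final Map<Integer, String> failoverOwners = new ConcurrentHashMap<>(); // item -> new owner

    // Stand-in for executeInLeader: callers sharing a latch name run the callback one at a time.
    void executeInLeader(String latchNode, Runnable callback) {
        ReentrantLock latch = latches.computeIfAbsent(latchNode, name -> new ReentrantLock());
        latch.lock();
        try {
            callback.run();
        } catch (RuntimeException ex) {
            System.err.println("leader callback failed: " + ex); // like handleException(ex)
        } finally {
            latch.unlock();
        }
    }

    // Records the sharding items left behind by a crashed instance.
    void onInstanceCrashed(int... items) {
        executeInLeader(LATCH, () -> {
            for (int item : items) {
                crashedItems.addLast(item);
            }
        });
    }

    // An idle instance claims at most one crashed item; returns it, or null if none is left.
    Integer failoverIfNecessary(String instanceId) {
        Integer[] claimed = new Integer[1];
        executeInLeader(LATCH, () -> {
            Integer item = crashedItems.pollFirst();
            if (item != null) {
                failoverOwners.put(item, instanceId);
                claimed[0] = item;
            }
        });
        return claimed[0];
    }

    String ownerOf(int item) {
        return failoverOwners.get(item);
    }

    public static void main(String[] args) {
        FailoverSketch cluster = new FailoverSketch();
        cluster.onInstanceCrashed(3, 4);
        System.out.println("B takes " + cluster.failoverIfNecessary("instance-B")); // B takes 3
        System.out.println("C takes " + cluster.failoverIfNecessary("instance-C")); // C takes 4
        System.out.println("B takes " + cluster.failoverIfNecessary("instance-B")); // B takes null
    }
}
```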
If failover is needed, job failover is performed. The failover work runs on the leader, under a distributed latch, through JobNodeStorage.executeInLeader:
/**
 * Execute an operation on the leader node.
 *
 * @param latchNode name of the job node used by the distributed lock
 * @param callback callback that performs the operation
 */
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
    try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
        latch.start();
        latch.await();
        callback.execute();
    //CHECKSTYLE:OFF
    } catch (final Exception ex) {
    //CHECKSTYLE:ON
        handleException(ex);
    }
}
3.7 Post-execution processing
Method executed after the job runs:
@Override
public void afterJobExecuted(final ShardingContexts shardingContexts) {
    for (ElasticJobListener each : elasticJobListeners) {
        each.afterJobExecuted(shardingContexts);
    }
}
4. Trigger
elastic-job uses a cron trigger, configured through the required cron attribute of the job:
<xsd:attribute name="cron" type="xsd:string" use="required" />
5. The job scheduler JobScheduler
/**
 * Initialize the job.
 */
public void init() {
    LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig); //1
    JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
    JobScheduleController jobScheduleController = new JobScheduleController(
            createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName()); //2
    JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter); //3
    schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
    jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron()); //4
}

// createScheduler() and createJobDetail(...) are listed in 5.2.1 and 5.2.2.

protected Optional<ElasticJob> createElasticJobInstance() {
    return Optional.absent();
}

private Properties getBaseQuartzProperties() {
    Properties result = new Properties();
    result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
    result.put("org.quartz.threadPool.threadCount", "1");
    result.put("org.quartz.scheduler.instanceName", liteJobConfig.getJobName());
    result.put("org.quartz.jobStore.misfireThreshold", "1");
    result.put("org.quartz.plugin.shutdownhook.class", JobShutdownHookPlugin.class.getName());
    result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
    return result;
}
5.1 Updating the job configuration
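updateJobConfiguration writes the local configuration and immediately reads it back with load(false). The job is therefore started with whatever the registry center holds. This sketch assumes ConfigurationService.persist only replaces an existing node when the overwrite attribute from job.xsd is true (default false), which is how I read the 2.x source. Under that assumption, the registry copy wins unless overwrite is set. A map stands in for ZooKeeper. ConfigSyncSketch is hypothetical, and the configuration is reduced to a string.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of "persist unless present (or overwrite), then reload".
class ConfigSyncSketch {

    private final Map<String, String> registry = new HashMap<>(); // stand-in for the registry center

    String updateJobConfiguration(String jobName, String localConfig, boolean overwrite) {
        if (!registry.containsKey(jobName) || overwrite) {
            registry.put(jobName, localConfig); // persist
        }
        return registry.get(jobName);           // load: the registry copy is what the job uses
    }

    public static void main(String[] args) {
        ConfigSyncSketch sync = new ConfigSyncSketch();
        System.out.println(sync.updateJobConfiguration("myJob", "cron=0/5 * * * * ?", false));
        System.out.println(sync.updateJobConfiguration("myJob", "cron=0/10 * * * * ?", false)); // still 0/5
        System.out.println(sync.updateJobConfiguration("myJob", "cron=0/10 * * * * ?", true));  // now 0/10
    }
}
```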
/**
 * Update the job configuration.
 *
 * @param liteJobConfig job configuration
 * @return the updated job configuration
 */
public LiteJobConfiguration updateJobConfiguration(final LiteJobConfiguration liteJobConfig) {
    configService.persist(liteJobConfig);
    return configService.load(false);
}
5.2 Initialization steps
5.2.1 Creating the quartz Scheduler
private Scheduler createScheduler() {
    Scheduler result;
    try {
        StdSchedulerFactory factory = new StdSchedulerFactory();
        factory.initialize(getBaseQuartzProperties());
        result = factory.getScheduler();
        result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
    } catch (final SchedulerException ex) {
        throw new JobSystemException(ex);
    }
    return result;
}
5.2.2 Creating the JobDetail
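createJobDetail (listed next) only builds a JobDetail for LiteJob and puts jobFacade and the ElasticJob instance into its JobDataMap. It never calls LiteJob's setters itself. To my understanding, the link is Quartz 2.x's default job factory (PropertySettingJobFactory). That factory creates the job instance and calls a setter for each JobDataMap key that matches a bean property, which is why LiteJob declares @Setter fields. The reflection-only sketch below illustrates the idea. It assumes the keys are the strings "elasticJob" and "jobFacade". SetterInjectionSketch and DemoLiteJob are hypothetical, and Quartz's real factory also handles type conversion and error reporting.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of property-setting job creation.
class SetterInjectionSketch {

    // Calls setXxx(value) on the job for every "xxx" key whose setter accepts the value.
    static void inject(Object job, Map<String, Object> dataMap) throws ReflectiveOperationException {
        for (Map.Entry<String, Object> entry : dataMap.entrySet()) {
            String key = entry.getKey();
            if (key.isEmpty()) {
                continue;
            }
            String setter = "set" + Character.toUpperCase(key.charAt(0)) + key.substring(1);
            for (Method method : job.getClass().getMethods()) {
                if (method.getName().equals(setter) && method.getParameterCount() == 1
                        && method.getParameterTypes()[0].isInstance(entry.getValue())) {
                    method.invoke(job, entry.getValue());
                }
            }
        }
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        Map<String, Object> dataMap = new HashMap<>();
        dataMap.put("elasticJob", "my SimpleJob instance");
        dataMap.put("jobFacade", "my JobFacade instance");
        DemoLiteJob job = new DemoLiteJob();
        inject(job, dataMap);
        System.out.println(job.getElasticJob() + " / " + job.getJobFacade());
    }
}

// Hypothetical job class; the setters stand in for LiteJob's Lombok @Setter fields.
class DemoLiteJob {
    private Object elasticJob;
    private Object jobFacade;

    public void setElasticJob(Object elasticJob) { this.elasticJob = elasticJob; }

    public void setJobFacade(Object jobFacade) { this.jobFacade = jobFacade; }

    public Object getElasticJob() { return elasticJob; }

    public Object getJobFacade() { return jobFacade; }
}
```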
private JobDetail createJobDetail(final String jobClass) {
    JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
    result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
    Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
    if (elasticJobInstance.isPresent()) {
        result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
    } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
        try {
            result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
        } catch (final ReflectiveOperationException ex) {
            throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
        }
    }
    return result;
}
5.2.3 Registering the job schedule controller
/**
 * Register a job schedule controller.
 *
 * @param jobName job name
 * @param jobScheduleController job schedule controller
 * @param regCenter registry center
 */
public void registerJob(final String jobName, final JobScheduleController jobScheduleController, final CoordinatorRegistryCenter regCenter) {
    schedulerMap.put(jobName, jobScheduleController);
    regCenterMap.put(jobName, regCenter);
    regCenter.addCacheData("/" + jobName);
}
5.2.4 Scheduling the job
/**
 * Schedule the job.
 *
 * @param cron CRON expression
 */
public void scheduleJob(final String cron) {
    try {
        if (!scheduler.checkExists(jobDetail.getKey())) {
            scheduler.scheduleJob(jobDetail, createTrigger(cron));
        }
        scheduler.start();
    } catch (final SchedulerException ex) {
        throw new JobSystemException(ex);
    }
}
6. Summary
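>> elastic-job builds on quartz's scheduling mechanism and shares its internal principles, while improving performance and availability.
>> elastic-job replaces quartz's JDBC job store with a registry center (ZooKeeper) for cluster coordination. The local quartz scheduler uses no database (getBaseQuartzProperties configures no JDBC job store), which improves performance considerably.
>> elastic-job adds job tracing (via listeners), which makes monitoring easier.
>> elastic-job adds sharding: a job can be split into multiple sharding items that run on different nodes.
>> elastic-job supports only CronTrigger, whereas quartz offers more trigger implementations.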
Reposted from: https://www.cnblogs.com/davidwang456/p/10346013.html