elasticjob disable JOB
生活随笔
收集整理的這篇文章主要介紹了
elasticjob disable JOB
小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
先上總結(jié):
disable 入口代碼:
private void disableOrEnableJobs(final Optional<String> jobName, final Optional<String> serverIp, final boolean disabled) {Preconditions.checkArgument(jobName.isPresent() || serverIp.isPresent(), "At least indicate jobName or serverIp.");if (jobName.isPresent() && serverIp.isPresent()) {persistDisabledOrEnabledJob(jobName.get(), serverIp.get(), disabled);} else if (jobName.isPresent()) {JobNodePath jobNodePath = new JobNodePath(jobName.get());for (String each : regCenter.getChildrenKeys(jobNodePath.getServerNodePath())) {if (disabled) {regCenter.persist(jobNodePath.getServerNodePath(each), "DISABLED");} else {regCenter.persist(jobNodePath.getServerNodePath(each), "");}}} else if (serverIp.isPresent()) {List<String> jobNames = regCenter.getChildrenKeys("/");for (String each : jobNames) {if (regCenter.isExisted(new JobNodePath(each).getServerNodePath(serverIp.get()))) {persistDisabledOrEnabledJob(each, serverIp.get(), disabled);}}}}disable的nodepath 為 /test-disablejobName/servers/192.168.157.1,即按server 進(jìn)行disable
JOB調(diào)度執(zhí)行函數(shù)為類AbstractElasticJobExecutor的如下代碼:
private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {if (shardingContexts.getShardingItemParameters().isEmpty()) {if (shardingContexts.isAllowSendJobEvent()) {jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobName));}return;}上面代碼的getShardingItemParameters 會(huì)返回empty,shardingContexts的獲取來(lái)自以下代碼:
@Overridepublic ShardingContexts getShardingContexts() {boolean isFailover = configService.load(true).isFailover();if (isFailover) {List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();if (!failoverShardingItems.isEmpty()) {return executionContextService.getJobShardingContext(failoverShardingItems);}}shardingService.shardingIfNecessary();List<Integer> shardingItems = shardingService.getLocalShardingItems();if (isFailover) {shardingItems.removeAll(failoverService.getLocalTakeOffItems());}shardingItems.removeAll(executionService.getDisabledItems(shardingItems));return executionContextService.getJobShardingContext(shardingItems);}而關(guān)鍵的獲取getLocalShardingItems,本地sharding item的相關(guān)代碼如下:
/*** 獲取運(yùn)行在本作業(yè)實(shí)例的分片項(xiàng)集合.* * @return 運(yùn)行在本作業(yè)實(shí)例的分片項(xiàng)集合*/public List<Integer> getLocalShardingItems() {if (JobRegistry.getInstance().isShutdown(jobName) || !serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp())) {return Collections.emptyList();}return getShardingItems(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());}/*** 判斷作業(yè)服務(wù)器是否可用.* * @param ip 作業(yè)服務(wù)器IP地址* @return 作業(yè)服務(wù)器是否可用*/public boolean isAvailableServer(final String ip) {return isEnableServer(ip) && hasOnlineInstances(ip);}private boolean hasOnlineInstances(final String ip) {for (String each : jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT)) {if (each.startsWith(ip)) {return true;}}return false;}/*** 判斷服務(wù)器是否啟用.** @param ip 作業(yè)服務(wù)器IP地址* @return 服務(wù)器是否啟用*/public boolean isEnableServer(final String ip) {return !ServerStatus.DISABLED.name().equals(jobNodeStorage.getJobNodeData(serverNode.getServerNode(ip)));}注意以上的hasOnlineInstances函數(shù)及isEnableServer,去zookeeper獲取的相關(guān)path里面的狀態(tài)被disabled。
總結(jié)
以上是生活随笔為你收集整理的elasticjob disable JOB的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: CS229 7.1应用机器学习中的一些
- 下一篇: Mybatis用#{}从传递过来的参数中