ElasticJob‐Lite:自定义作业分片策略
Java SPI機(jī)制
在上一篇博客中介紹了ElasticJob的作業(yè)分片策略:
- ElasticJob‐Lite:作業(yè)分片策略介紹與源碼分析
其中提到了ElasticJob是通過(guò)Java提供的SPI機(jī)制(ServiceLoader類)加載所有作業(yè)分片策略。
ServiceLoader類就是Java提供的SPI,SPI(Service Provider Interface)是JDK內(nèi)置的一種服務(wù)提供發(fā)現(xiàn)機(jī)制,可以用來(lái)啟用框架擴(kuò)展和替換組件,主要是被框架的開發(fā)人員使用,不同廠商可以針對(duì)同一接口做出不同的實(shí)現(xiàn),比如java.sql.Driver接口,MySQL和PostgreSQL都提供了對(duì)應(yīng)的實(shí)現(xiàn)給用戶使用,而Java的SPI機(jī)制可以為某個(gè)接口尋找服務(wù)實(shí)現(xiàn)。Java中SPI機(jī)制主要思想是將裝配的控制權(quán)移到程序之外,在模塊化設(shè)計(jì)中這個(gè)機(jī)制尤其重要,其核心思想就是解耦。
ServiceLoader類正常工作的唯一要求是服務(wù)提供類必須具有無(wú)參構(gòu)造函數(shù),以便它們可以在加載期間實(shí)例化。通過(guò)在資源目錄的META-INF/services中放置服務(wù)提供者配置文件來(lái)標(biāo)識(shí)服務(wù)提供者,文件名是服務(wù)類型的完全限定名(比如ElasticJobListener類的完全限定名),該文件包含具體的服務(wù)提供者類的完全限定名列表(ElasticJobListener實(shí)現(xiàn)類的完全限定名列表),每行一個(gè),每個(gè)名稱周圍的空格和制表符以及空行都將被忽略,該文件必須以UTF-8編碼。
自定義作業(yè)分片策略
所有可用的作業(yè)分片策略在JobShardingStrategyFactory類的靜態(tài)塊中被加載(通過(guò)ElasticJobServiceLoader類,該類是ElasticJob基于Java SPI機(jī)制實(shí)現(xiàn)的特定于作業(yè)的服務(wù)加載器)。
static {ElasticJobServiceLoader.registerTypedService(JobShardingStrategy.class);}加載的類型是JobShardingStrategy.class,因此自定義的作業(yè)分片策略需要實(shí)現(xiàn)該接口。
自定義作業(yè)分片策略ShuffleJobShardingStrategy類:
package com.kaven.job.my;import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance; import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy;import java.util.*;public class ShuffleJobShardingStrategy implements JobShardingStrategy {@Overridepublic Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {if (jobInstances.isEmpty()) {return Collections.emptyMap();}// 先將作業(yè)分片項(xiàng)裝入容器List<Integer> shuffleShardingList = new ArrayList<>(shardingTotalCount);for (int i = 0; i < shardingTotalCount; i++) {shuffleShardingList.add(i);}// 將容器中的作業(yè)分片項(xiàng)順序打亂(使用容器的shuffle方法)Collections.shuffle(shuffleShardingList);// 模仿AverageAllocationJobShardingStrategy作業(yè)分片策略進(jìn)行分配Map<JobInstance, List<Integer>> result = shardingShuffle(jobInstances, shardingTotalCount, shuffleShardingList);addShuffle(jobInstances, shardingTotalCount, result, shuffleShardingList);return result;}private Map<JobInstance, List<Integer>> shardingShuffle(final List<JobInstance> shardingUnits,final int shardingTotalCount,final List<Integer> shuffleShardingList) {Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(shardingUnits.size(), 1);// 每個(gè)作業(yè)服務(wù)器最少應(yīng)該分配的作業(yè)分片項(xiàng)數(shù)int itemCountPerSharding = shardingTotalCount / shardingUnits.size();int count = 0;for (JobInstance each : shardingUnits) {// 每個(gè)作業(yè)服務(wù)器申請(qǐng)的作業(yè)分片項(xiàng)列表(容量為itemCountPerSharding + 1)// itemCountPerSharding + 1為每個(gè)作業(yè)服務(wù)器最多應(yīng)該分配的作業(yè)分片項(xiàng)數(shù)List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {// 給作業(yè)分片項(xiàng)列表添加容器中的第i個(gè)作業(yè)分片項(xiàng)shardingItems.add(shuffleShardingList.get(i));}// 將作業(yè)服務(wù)器與它執(zhí)行的作業(yè)分片項(xiàng)列表進(jìn)行關(guān)聯(lián)result.put(each, shardingItems);count++;}return result;}private void addShuffle(final List<JobInstance> shardingUnits, final int shardingTotalCount,final Map<JobInstance, List<Integer>> shardingResults,final List<Integer> shuffleShardingList) {// 無(wú)法平均分配的分片項(xiàng)數(shù)int aliquant = shardingTotalCount % shardingUnits.size();// 已分配的無(wú)法平均分配的分片項(xiàng)數(shù)int count = 0;for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {// 是否還有無(wú)法平均分配的分片項(xiàng)if (count < aliquant) {// 分配給序號(hào)較小的作業(yè)服務(wù)器entry.getValue().add(shuffleShardingList.get(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count));}// 已分配數(shù)更新count++;}}// 作業(yè)分片策略的標(biāo)識(shí)符@Overridepublic String getType() {return "Shuffle";} }博主自定義的ShuffleJobShardingStrategy作業(yè)分片策略是模仿AverageAllocationJobShardingStrategy作業(yè)分片策略(默認(rèn)的作業(yè)分片策略),只是先將作業(yè)分片項(xiàng)裝入容器,然后將容器中的作業(yè)分片項(xiàng)順序打亂(使用容器的shuffle方法),之后再基于該作業(yè)分片項(xiàng)容器使用AverageAllocationJobShardingStrategy作業(yè)分片策略給作業(yè)服務(wù)器分配該容器中的作業(yè)分片項(xiàng),如果不了解AverageAllocationJobShardingStrategy作業(yè)分片策略,可以去看看最上面列出的博客。
添加服務(wù)實(shí)現(xiàn)
在resources的META-INF/services中放置服務(wù)提供者配置文件來(lái)標(biāo)識(shí)服務(wù)提供者,如下圖所示:
測(cè)試
作業(yè)定義(Simple作業(yè)):
package com.kaven.job;import org.apache.shardingsphere.elasticjob.api.ShardingContext; import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;import java.text.SimpleDateFormat; import java.util.Date;public class MySimpleJob implements SimpleJob {private static final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");@Overridepublic void execute(ShardingContext shardingContext) {String job = shardingContext.getShardingParameter();if(job == null || job.trim().equals("")) {System.out.println("請(qǐng)指定幫[Kaven]執(zhí)行的任務(wù)名稱!");throw new RuntimeException();}System.out.printf("%s 執(zhí)行任務(wù)%d - [%s]!\n", formatter.format(new Date()),shardingContext.getShardingItem(), job);} }啟動(dòng)類:
package com.kaven.job;import org.apache.shardingsphere.elasticjob.api.JobConfiguration; import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap; import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter; import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration; import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;public class Application {public static void main(String[] args) {new ScheduleJobBootstrap(createRegistryCenter(), new MySimpleJob(), createJobConfiguration()).schedule();}// 注冊(cè)中心private static CoordinatorRegistryCenter createRegistryCenter() {ZookeeperConfiguration zc = new ZookeeperConfiguration("192.168.1.200:9000", "my-job");zc.setConnectionTimeoutMilliseconds(40000);zc.setMaxRetries(5);CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zc);regCenter.init();return regCenter;}// 作業(yè)配置private static JobConfiguration createJobConfiguration() {String jobs = "0=看論文,1=做實(shí)驗(yàn),2=打比賽,3=開組會(huì),4=看書,5=寫博客,6=看源碼";return JobConfiguration.newBuilder("MySimpleJob", 7).cron("30 0/2 * * * ?").shardingItemParameters(jobs)// 使用自定義的作業(yè)分片策略.jobShardingStrategyType("Shuffle").overwrite(true).build();} }啟動(dòng)三個(gè)作業(yè)服務(wù)器,輸出如下圖所示:
輸出符合預(yù)期,因?yàn)樽远x作業(yè)分片策略是模仿AverageAllocationJobShardingStrategy作業(yè)分片策略,但自定義作業(yè)分片策略中將作業(yè)的分片項(xiàng)順序打亂了,因此給每個(gè)作業(yè)服務(wù)器分配的作業(yè)分片項(xiàng)可能不是連續(xù)的。
修改作業(yè)配置(使用默認(rèn)的作業(yè)分片策略):
private static JobConfiguration createJobConfiguration() {String jobs = "0=看論文,1=做實(shí)驗(yàn),2=打比賽,3=開組會(huì),4=看書,5=寫博客,6=看源碼";return JobConfiguration.newBuilder("MySimpleJob", 7).cron("30 0/2 * * * ?").shardingItemParameters(jobs) // .jobShardingStrategyType("Shuffle").overwrite(true).build();}啟動(dòng)三個(gè)作業(yè)服務(wù)器,輸出如下圖所示:
輸出符合AverageAllocationJobShardingStrategy作業(yè)分片策略,[0,1,6]、[2,3]、[4,5]很顯然是有序的,而博主自定義的作業(yè)分片策略是亂序的。
ElasticJob如何自定義作業(yè)分片策略就介紹到這里,如果博主有說(shuō)錯(cuò)的地方或者大家有不同的見解,歡迎大家評(píng)論補(bǔ)充。
總結(jié)
以上是生活随笔為你收集整理的ElasticJob‐Lite:自定义作业分片策略的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Python的特殊成员
- 下一篇: JAVA基础知识之字节和字符