Elastic Job入门(1) - 简介
1、如果業(yè)務(wù)工程采用集群化的部署,可能會多次重復(fù)執(zhí)行定時(shí)任務(wù)而導(dǎo)致系統(tǒng)的業(yè)務(wù)邏輯錯誤,并產(chǎn)生系統(tǒng)故障。
2、Quartz的集群方案具備HA功能,可以實(shí)現(xiàn)定時(shí)任務(wù)的分發(fā),但是通過增加機(jī)器節(jié)點(diǎn)數(shù)量的方式并不能提高每次定時(shí)任務(wù)的執(zhí)行效率,無法實(shí)現(xiàn)任務(wù)的彈性分片。
開源產(chǎn)品Elastic-Job是當(dāng)當(dāng)開源的一款分布式彈性定時(shí)任務(wù)調(diào)度框架,在2.X版本以后主要分為Elastic-Job-Lite和Elastic-Job-Cloud兩個子項(xiàng)目。其中,Elastic-Job-Lite定位為輕量級無中心化解決方案,使用jar包的形式提供分布式任務(wù)的協(xié)調(diào)服務(wù)。而Elastic-Job-Cloud使用Mesos + Docker的解決方案,額外提供資源治理、應(yīng)用分發(fā)以及進(jìn)程隔離等服務(wù)。本文主要學(xué)習(xí)Elastic-Job-Lite。
GitHub地址為:https://github.com/elasticjob/elastic-job-lite
中文官網(wǎng)為:http://elasticjob.io/index_zh.html
從Elastic-Job的架構(gòu)圖上基本就可以看出,其以Jar的形式為業(yè)務(wù)工程(諸如,Spring Boot工程)的快速集成提供了簡便的方式。同時(shí),其提供的定時(shí)任務(wù)分片、彈性擴(kuò)縮容、失效轉(zhuǎn)移、作業(yè)監(jiān)控和支持多種作業(yè)模式等強(qiáng)大的功能,使業(yè)務(wù)開發(fā)人員無需在這些方面花費(fèi)較大多的精力,而可以更加專注于平臺的業(yè)務(wù)開發(fā)。其主要的功能如下:
1、定時(shí)任務(wù):基于成熟的定時(shí)任務(wù)作業(yè)框架Quartz cron表達(dá)式執(zhí)行定時(shí)任務(wù);
2、作業(yè)注冊中心:基于Zookeeper和其客戶端Curator實(shí)現(xiàn)的全局作業(yè)注冊控制中心;作業(yè)注冊中心僅用于作業(yè)任務(wù)注冊和監(jiān)控信息的暫存;
3、定時(shí)任務(wù)分片:可以將原本一個較大任務(wù)分片成為多小的子任務(wù)項(xiàng)分別在多個服務(wù)器上同時(shí)執(zhí)行,提高總?cè)蝿?wù)的執(zhí)行處理效率;
4、彈性擴(kuò)容縮容:運(yùn)行中定時(shí)任務(wù)所在的服務(wù)器崩潰,或新增加n臺作業(yè)服務(wù)器,作業(yè)框架將在下次任務(wù)執(zhí)行前重新進(jìn)行任務(wù)調(diào)度分發(fā),不影響當(dāng)前任務(wù)的處理與執(zhí)行;
5、支持多種任務(wù)模式:分別支持Simple、Dataflow和Script類型的定時(shí)任務(wù);
6、失效轉(zhuǎn)移:運(yùn)行中的定時(shí)任務(wù)所在的服務(wù)器崩潰不會導(dǎo)致重新分片,會在下次定時(shí)任務(wù)啟動時(shí)重新分發(fā)和調(diào)度;
7、運(yùn)行時(shí)定時(shí)任務(wù)狀態(tài)收集:監(jiān)控任務(wù)運(yùn)行時(shí)的狀態(tài),統(tǒng)計(jì)最近一段時(shí)間任務(wù)處理成功和失敗的數(shù)量,記錄作業(yè)上次運(yùn)行開始時(shí)間,結(jié)束時(shí)間和下次運(yùn)行時(shí)間;
8、支持配置定時(shí)任務(wù)停止、恢復(fù)和禁用:用于操作定時(shí)任務(wù)的啟停,并可以禁止某任務(wù)的執(zhí)行;
9、Spring支持:Elastic-Job-Lite項(xiàng)目完美支持spring的容器,自定義命名空間,支持占位符
10、運(yùn)維平臺:提供運(yùn)維界面,方便開發(fā)和運(yùn)維人員管理生產(chǎn)環(huán)境上已經(jīng)發(fā)布的定時(shí)任務(wù)和注冊中心; 水平分片 橫向拓容 Springboot2默認(rèn)數(shù)據(jù)庫連接池選擇了HikariCP <dependency><groupId>com.zaxxer</groupId><artifactId>HikariCP</artifactId> </dependency> spring:datasource:url: jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&verifyServerCertificate=false&useSSL=false&requireSSL=falsedriver-class-name: com.mysql.jdbc.Driverusername: rootpassword: roottype: com.zaxxer.hikari.HikariDataSource# 自動創(chuàng)建更新驗(yàn)證數(shù)據(jù)庫結(jié)構(gòu)jpa:hibernate:ddl-auto: updateshow-sql: truedatabase: mysql
?
==========================================API配置啟動==========================================
引入Maven
<dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-core</artifactId><version>${latest.release.version}</version> </dependency>?
方法參數(shù)ShardingContext包含作業(yè)配置,分片和運(yùn)行時(shí)信息。可通過getShardingTotalCount(),getShardingItems()等方法分別獲取分片總數(shù),運(yùn)行在本作業(yè)服務(wù)器的分片序列號集合等。
Simple類型作業(yè)
與Quartz接口相似,用于執(zhí)行普通的定時(shí)任務(wù),只是增加了彈性拓容和分片等功能:
public class MyElasticJob implements SimpleJob {@Overridepublic void execute(ShardingContext context) {switch (context.getShardingItem()) {case 0: // do something by sharding item 0break;case 1: // do something by sharding item 1break;case 2: // do something by sharding item 2break;// case n: ... }} }?
Dataflow類型作業(yè)
Dataflow類型用于處理數(shù)據(jù)流,需實(shí)現(xiàn)DataflowJob接口。該接口提供2個方法可供覆蓋,分別用于抓取(fetchData)和處理(processData)數(shù)據(jù)。
public class MyElasticJob implements DataflowJob<Foo> {@Overridepublic List<Foo> fetchData(ShardingContext context) {switch (context.getShardingItem()) {case 0: List<Foo> data = // get data from database by sharding item 0return data;case 1: List<Foo> data = // get data from database by sharding item 1return data;case 2: List<Foo> data = // get data from database by sharding item 2return data;// case n: ... }}@Overridepublic void processData(ShardingContext shardingContext, List<Foo> data) {// process data// ... } }流式處理
可通過DataflowJobConfiguration配置是否流式處理。流式處理數(shù)據(jù)只有fetchData方法的返回值為null或集合長度為空時(shí),作業(yè)才停止抓取,否則作業(yè)將一直運(yùn)行下去; 非流式處理數(shù)據(jù)則只會在每次作業(yè)執(zhí)行過程中執(zhí)行一次fetchData方法和processData方法,隨即完成本次作業(yè)。如果采用流式作業(yè)處理方式,建議processData處理數(shù)據(jù)后更新其狀態(tài),避免fetchData再次抓取到,從而使得作業(yè)永不停止。 流式數(shù)據(jù)處理參照TbSchedule設(shè)計(jì),適用于不間歇的數(shù)據(jù)處理。
?
作業(yè)配置
Elastic-Job配置分為3個層級,分別是Core, Type和Root。每個層級使用相似于裝飾者模式的方式裝配。
Core對應(yīng)JobCoreConfiguration,用于提供作業(yè)核心配置信息,如:作業(yè)名稱、分片總數(shù)、CRON表達(dá)式等。
Type對應(yīng)JobTypeConfiguration,有3個子類分別對應(yīng)SIMPLE, DATAFLOW和SCRIPT類型作業(yè),提供3種作業(yè)需要的不同配置,如:DATAFLOW類型是否流式處理或SCRIPT類型的命令行等。
Root對應(yīng)JobRootConfiguration,有2個子類分別對應(yīng)Lite和Cloud部署類型,提供不同部署類型所需的配置,如:Lite類型的是否需要覆蓋本地配置或Cloud占用CPU或Memory數(shù)量等。
// 定義作業(yè)核心配置 JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/15 * * * * ?", 10).build(); // 定義SIMPLE類型配置 SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, SimpleDemoJob.class.getCanonicalName()); // 定義Lite作業(yè)根配置 LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();?
作業(yè)啟動
public class JobDemo {public static void main(String[] args) {new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();}private static CoordinatorRegistryCenter createRegistryCenter() {CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("zk_host:2181", "elastic-job-demo"));regCenter.init();return regCenter;}private static LiteJobConfiguration createJobConfiguration() {// 創(chuàng)建作業(yè)配置// ... } }?
==========================================Spring配置啟動==========================================
引入Maven
<dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-spring</artifactId><version>${latest.release.version}</version> </dependency>?
作業(yè)配置
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"xmlns:job="http://www.dangdang.com/schema/ddframe/job"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.dangdang.com/schema/ddframe/reghttp://www.dangdang.com/schema/ddframe/reg/reg.xsdhttp://www.dangdang.com/schema/ddframe/jobhttp://www.dangdang.com/schema/ddframe/job/job.xsd"><!--配置作業(yè)注冊中心 --><reg:zookeeper id="regCenter" server-lists="yourhost:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" /><!-- 配置作業(yè)--><job:simple id="demoSimpleSpringJob" class="xxx.MyElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" /> </beans>?
作業(yè)啟動
將配置Spring命名空間的xml通過Spring啟動,作業(yè)將自動加載。
轉(zhuǎn)載于:https://www.cnblogs.com/ijavanese/p/9956094.html
總結(jié)
以上是生活随笔為你收集整理的Elastic Job入门(1) - 简介的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [转]使用HttpOnly提升Cooki
- 下一篇: MySQL之视图