quartz分布式集群部署并且可视化配置job定时任务
2019獨角獸企業重金招聘Python工程師標準>>>
?????項目使用quartz框架完成了定時任務集群部署調度,并且對quartz進一步封裝完成在web界面可動態配置定時任務。定時任務如果集群部署,到達時間點時,同一個任務只能在其中一臺機器上執行。對于quartz框架,其支持分布式集群的方案是使用數據庫來加鎖調度。
? ? ?
????以下是quartz分布式集群部署,并且可以動態配置job的代碼。使用了spring和mybatis,數據庫使用了postgresql(用mysql也差不多,只要改下數據源dataSource,以及quartz.properties中的org.quartz.jobStore.driverDelegateClass)。
quartz.properties:
# Default Properties file for use by StdSchedulerFactory # to create a Quartz Scheduler Instance, if a different # properties file is not explicitly specified. ##org.quartz.scheduler.instanceName: DefaultQuartzScheduler org.quartz.scheduler.instanceName: ClusteredScheduler org.quartz.scheduler.instanceId: AUTOorg.quartz.scheduler.rmi.export: false org.quartz.scheduler.rmi.proxy: false org.quartz.scheduler.wrapJobExecutionInUserTransaction: falseorg.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool org.quartz.threadPool.threadCount: 2 org.quartz.threadPool.threadPriority: 5 org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: trueorg.quartz.jobStore.class : org.quartz.impl.jdbcjobstore.JobStoreTX ##這里使用postgresql數據庫 org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate org.quartz.jobStore.misfireThreshold : 60000 org.quartz.jobStore.useProperties : true org.quartz.jobStore.tablePrefix : QRTZ_org.quartz.jobStore.isClustered : true org.quartz.jobStore.clusterCheckinInterval : 15000application-scheduler.xml:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.springframework.org/schema/beans"xmlns:p="http://www.springframework.org/schema/p" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"default-autowire="byName" default-lazy-init="true"><bean id="schedulerFactoryBean" lazy-init="true"class="org.springframework.scheduling.quartz.SchedulerFactoryBean"><!-- dataSource 需另外配 --><property name="dataSource" ref="dataSource" /><property name="waitForJobsToCompleteOnShutdown" value="false" /><property name="autoStartup" value="true" /><property name="overwriteExistingJobs" value="true" /><property name="configLocation" value="classpath:quartz.properties" /><!-- 由于是在web界面動態創建job,這里不定義 <property name="triggers"><list><ref bean="testTrigger" /> </list> </property> --></bean> </beans>?
SchedulerJobBO:
/*** job信息bean* @author hanxuetong**/ @SuppressWarnings("serial") public class SchedulerJobBO extends BasePO{/*** 定時任務運行時狀態*/public static final int SCHEDULER_RUN_STATE=1;/*** 定時任務關閉時狀態*/public static final int SCHEDULER_STOP_STATE=0;/*** 任務名*/private String jobName;private String jobGroup;/*** 任務類的路徑*/private String jobClassPath;/*** cron表達式*/private String cronExpression;/*** 是否啟動定時*/private Integer isRun;/*** 世紀運行中的狀態*/private String triggerState;/*** 世紀運行中的狀態名*/private String triggerStateName;/*** 描述*/private String description;public String getJobName() {return jobName;}public void setJobName(String jobName) {this.jobName = jobName;}public String getJobClassPath() {return jobClassPath;}public void setJobClassPath(String jobClassPath) {this.jobClassPath = jobClassPath;}public String getCronExpression() {return cronExpression;}public void setCronExpression(String cronExpression) {this.cronExpression = cronExpression;}public Integer getIsRun() {return isRun;}public void setIsRun(Integer isRun) {this.isRun = isRun;}public String getDescription() {return description;}public void setDescription(String description) {this.description = description;}public String getJobGroup() {return jobGroup;}public void setJobGroup(String jobGroup) {this.jobGroup = jobGroup;}public String getTriggerState() {return triggerState;}public void setTriggerState(String triggerState) {this.triggerState = triggerState;}public String getTriggerStateName() {return triggerStateName;}public void setTriggerStateName(String triggerStateName) {this.triggerStateName = triggerStateName;}}TriggerStateEnum:
/*** quartz 任務實時狀態枚舉* @author hanxuetong**/ public enum TriggerStateEnum {WAITING("WAITING", "等待"),PAUSED("PAUSED", "暫停"),ACQUIRED("ACQUIRED", "正常執行"),BLOCKED("BLOCKED", "阻塞"),ERROR("ERROR", "錯誤"),NORUN("NORUN", "未開啟");String key;String desc;public static String getDescByKey(String key) {if (key==null) {return "";}for (TriggerStateEnum triggerStateEnum : TriggerStateEnum.values()) {if (triggerStateEnum.getKey().equals(key)) {return triggerStateEnum.getDesc();}}return key;}private TriggerStateEnum(String key, String desc) {this.key = key;this.desc = desc;}public String getKey() {return key;}public void setKey(String key) {this.key = key;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc = desc;}}SchedulerJobDao:
public interface SchedulerJobDao {public SchedulerJobBO loadById(String id);public void insert(SchedulerJobBO schedulerJobBO);public void update(SchedulerJobBO schedulerJobBO);public void delete(String id);public List<SchedulerJobBO> list(Map<String, Object> params);}scheduler_job.xml:
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "classpath:mybatis-3-mapper.dtd"> <mapper namespace="scheduler_job"><sql id="Base_Column_List" >id ,scheduler_job.job_name jobName ,scheduler_job.job_group jobGroup,cron_expression cronExpression ,job_class_path jobClassPath ,is_run isRun ,scheduler_job.description ,date_create dateCreate ,date_update dateUpdate,date_delete dateDelete,COALESCE(qrtz_triggers.trigger_state, 'NORUN') triggerState</sql><select id="loadById" resultType="com.hxt.common.bean.bo.scheduler.SchedulerJobBO" parameterType="String">SELECT <include refid="Base_Column_List" />FROM scheduler_job left join qrtz_triggers on qrtz_triggers.job_name=scheduler_job.job_name and qrtz_triggers.job_group=scheduler_job.job_groupwhere scheduler_job.id=#{id} </select><insert id="insert" parameterType="com.hxt.common.bean.bo.scheduler.SchedulerJobBO">insert into scheduler_job(id ,job_name ,job_group,cron_expression ,job_class_path ,is_run ,description ,date_create ,date_update )values(#{id},#{jobName},#{jobGroup},#{cronExpression},#{jobClassPath},#{isRun},#{description}, now(),now())</insert><select id="list" resultType="com.hxt.common.bean.bo.scheduler.SchedulerJobBO" parameterType="map">select <include refid="Base_Column_List" /> from scheduler_job left join qrtz_triggers on qrtz_triggers.job_name=scheduler_job.job_name and qrtz_triggers.job_group=scheduler_job.job_group<trim prefix = "where" prefixOverrides="and|or"> scheduler_job.date_delete is null<if test="id != '' and id != null "> and scheduler_job.id= #{id}</if><if test="status != '' and status != null"> and scheduler_job.status= #{status}</if><if test="job_name != '' and job_name != null"> and scheduler_job.job_name= #{jobName}</if><if test="job_group != '' and job_group != null"> and scheduler_job.job_group= #{jobGroup}</if></trim> order by scheduler_job.job_name</select><update id="update" parameterType="com.hxt.common.bean.bo.scheduler.SchedulerJobBO">update scheduler_job set job_name=#{jobName},job_group=#{jobGroup},cron_expression=#{cronExpression},job_class_path=#{jobClassPath},is_run=#{isRun},description=#{description},date_update=now()where id=#{id}</update><update id="delete" parameterType="String">update scheduler_job set date_delete=now()where id=#{id}</update></mapper>QuartzSchedulerManage:
/*** * 動態添加任務* * @author hanxuetong**/ public interface QuartzSchedulerManage {/*** 創建定時任務* @param jobName* @param jobGroup* @param cronExpression* @param jobClass* @param jobClassParam 運行時的任務類方法可以獲取* @throws SchedulerException*/public void createScheduleJob(String jobName, String jobGroup,String cronExpression,String jobClassPath ,Map<String,Object> jobClassParam) throws Exception;/*** 運行一次任務* * @param scheduler* @param jobName* @param jobGroup* @throws SchedulerException */public void runOnce(String jobName, String jobGroup) throws SchedulerException ;/*** 暫停任務* * @param scheduler* @param jobName* @param jobGroup* @throws SchedulerException */public void pauseJob(String jobName, String jobGroup) throws SchedulerException ;/*** 恢復任務** @param scheduler* @param jobName* @param jobGroup* @throws SchedulerException */public void resumeJob(String jobName, String jobGroup) throws SchedulerException;/*** 更新定時任務** @param scheduler the scheduler* @param jobName the job name* @param jobGroup the job group* @param cronExpression the cron expression* @param isSync the is sync* @param param the param* @throws SchedulerException */public void updateScheduleJob(String jobName, String jobGroup,String cronExpression) throws SchedulerException;/*** 刪除定時任務** @param scheduler* @param jobName* @param jobGroup* @throws SchedulerException */public void deleteScheduleJob(String jobName, String jobGroup) throws SchedulerException;}QuartzSchedulerManageImpl:
/*** * 動態添加任務* * @author hanxuetong**/ @Service public class QuartzSchedulerManageImpl implements QuartzSchedulerManage{@AutowiredSchedulerFactoryBean schedulerFactoryBean;private Scheduler getScheduler(){return schedulerFactoryBean.getScheduler();}/*** 獲取觸發器key* * @param jobName* @param jobGroup* @return*/private TriggerKey getTriggerKey(String jobName, String jobGroup) {return TriggerKey.triggerKey(jobName, jobGroup);}private Class<? extends Job> getClassByPath(String jobClassPath) throws Exception {Class<? extends Job> clazz;try {clazz = (Class<? extends Job>) Class.forName(jobClassPath);} catch (Exception e) {throw new SchedulerException("任務類加載失敗!!");}return clazz;}/*** 獲取表達式觸發器** @param scheduler the scheduler* @param jobName the job name* @param jobGroup the job group* @return cron trigger* @throws SchedulerException */public CronTrigger getCronTrigger(String jobName, String jobGroup) throws SchedulerException {Scheduler scheduler = getScheduler();try {TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);return (CronTrigger) scheduler.getTrigger(triggerKey);} catch (SchedulerException e) {throw new SchedulerException("獲取定時任務CronTrigger出現異常");}}/*** 創建定時任務* @param jobName* @param jobGroup* @param cronExpression* @param jobClass* @param jobClassParam 運行時的任務類方法可以獲取* @throws SchedulerException*/public void createScheduleJob(String jobName, String jobGroup,String cronExpression,String jobClassPath ,Map<String,Object> jobClassParam) throws Exception {//同步或異步// Class<? extends Job> jobClass = isSync ? JobSyncFactory.class : JobFactory.class;try {Class<? extends Job> jobClass=getClassByPath( jobClassPath);Scheduler scheduler = getScheduler();//構建job信息JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroup).build();if(jobClassParam!=null&&jobClassParam.size()>0){//放入參數,運行時的方法可以獲取for(Map.Entry<String, Object> entry:jobClassParam.entrySet()){jobDetail.getJobDataMap().put(entry.getKey(), entry.getValue());} }//表達式調度構建器 加上 withMisfireHandlingInstructionDoNothing防止啟動就運行CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing();//按新的cronExpression表達式構建一個新的triggerCronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroup).withSchedule(scheduleBuilder).build();scheduler.scheduleJob(jobDetail, trigger);} catch (SchedulerException e) {throw new SchedulerException("創建定時任務失敗");}catch (Exception e) {throw e;}}/*** 運行一次任務* * @param scheduler* @param jobName* @param jobGroup* @throws SchedulerException */public void runOnce(String jobName, String jobGroup) throws SchedulerException {JobKey jobKey = JobKey.jobKey(jobName, jobGroup);try {Scheduler scheduler = getScheduler();scheduler.triggerJob(jobKey);} catch (SchedulerException e) {throw new SchedulerException("運行一次定時任務失敗");}}/*** 暫停任務* * @param scheduler* @param jobName* @param jobGroup* @throws SchedulerException */public void pauseJob(String jobName, String jobGroup) throws SchedulerException {JobKey jobKey = JobKey.jobKey(jobName, jobGroup);try {Scheduler scheduler = getScheduler();scheduler.pauseJob(jobKey);} catch (SchedulerException e) {throw new SchedulerException("暫停定時任務失敗");}}/*** 恢復任務** @param scheduler* @param jobName* @param jobGroup* @throws SchedulerException */public void resumeJob(String jobName, String jobGroup) throws SchedulerException {JobKey jobKey = JobKey.jobKey(jobName, jobGroup);try {Scheduler scheduler = getScheduler();scheduler.resumeJob(jobKey);} catch (SchedulerException e) {throw new SchedulerException("暫停定時任務失敗");}}/*** 獲取jobKey** @param jobName the job name* @param jobGroup the job group* @return the job key*/public JobKey getJobKey(String jobName, String jobGroup) {return JobKey.jobKey(jobName, jobGroup);}/*** 更新定時任務** @param scheduler the scheduler* @param jobName the job name* @param jobGroup the job group* @param cronExpression the cron expression* @param isSync the is sync* @param param the param* @throws SchedulerException */public void updateScheduleJob(String jobName, String jobGroup,String cronExpression) throws SchedulerException {//同步或異步 // Class<? extends Job> jobClass = isSync ? JobSyncFactory.class : JobFactory.class;try {Scheduler scheduler = getScheduler();TriggerKey triggerKey = getTriggerKey(jobName, jobGroup);//表達式調度構建器CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);//按新的cronExpression表達式重新構建triggertrigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();//按新的trigger重新設置job執行scheduler.rescheduleJob(triggerKey, trigger);} catch (SchedulerException e) {throw new SchedulerException("更新定時任務失敗");}}/*** 刪除定時任務** @param scheduler* @param jobName* @param jobGroup* @throws SchedulerException */public void deleteScheduleJob(String jobName, String jobGroup) throws SchedulerException {try {Scheduler scheduler = getScheduler();scheduler.deleteJob(getJobKey(jobName, jobGroup));} catch (SchedulerException e) {throw new SchedulerException("刪除定時任務失敗");}}public SchedulerFactoryBean getSchedulerFactoryBean() {return schedulerFactoryBean;}public void setSchedulerFactoryBean(SchedulerFactoryBean schedulerFactoryBean) {this.schedulerFactoryBean = schedulerFactoryBean;}}SchedulerJobService :
public interface SchedulerJobService {public void add(SchedulerJobBO schedulerJobBO) throws Exception ;public void update(SchedulerJobBO schedulerJobBO) throws Exception ;public void delete(String id) throws Exception ;public SchedulerJobBO loadById(String id);public List<SchedulerJobBO> list();public void startAllJob(); }SchedulerJobServiceImpl:
@Service public class SchedulerJobServiceImpl implements SchedulerJobService{private static final Logger logger = LoggerFactory.getLogger(SchedulerJobServiceImpl.class);@Autowiredprivate SchedulerJobDao schedulerJobDao;@Autowiredprivate QuartzSchedulerManage quartzSchedulerManage;@Overridepublic void add(SchedulerJobBO schedulerJobBO) throws Exception {if(schedulerJobBO.getIsRun()==SchedulerJobBO.SCHEDULER_RUN_STATE){quartzSchedulerManage.createScheduleJob(schedulerJobBO.getJobName(), schedulerJobBO.getJobGroup(), schedulerJobBO.getCronExpression(), schedulerJobBO.getJobClassPath(), null);}schedulerJobDao.insert(schedulerJobBO); }@Overridepublic void update(SchedulerJobBO schedulerJobBO) throws Exception {if(StringUtil.isEmpty(schedulerJobBO.getId())){throw new SchedulerException("id can not null!");}SchedulerJobBO selectSchedulerJobBO=schedulerJobDao.loadById(schedulerJobBO.getId());if(selectSchedulerJobBO==null){throw new SchedulerException("schedulerJob is null!");}if(schedulerJobBO.getIsRun()==SchedulerJobBO.SCHEDULER_RUN_STATE){ //任務啟動if(!selectSchedulerJobBO.getJobClassPath().equals(schedulerJobBO.getJobClassPath())){// 任務類路徑已經變,需刪除定時,再重新建立新任務if(checkHasJobRun(selectSchedulerJobBO)){quartzSchedulerManage.deleteScheduleJob(schedulerJobBO.getJobName(), schedulerJobBO.getJobGroup());}quartzSchedulerManage.createScheduleJob(schedulerJobBO.getJobName(), schedulerJobBO.getJobGroup(), schedulerJobBO.getCronExpression(), schedulerJobBO.getJobClassPath(), null);}else{if(!checkHasJobRun(selectSchedulerJobBO)){ // quartz中沒有該任務quartzSchedulerManage.createScheduleJob(schedulerJobBO.getJobName(), schedulerJobBO.getJobGroup(), schedulerJobBO.getCronExpression(), schedulerJobBO.getJobClassPath(), null);}else{if(!selectSchedulerJobBO.getCronExpression().equals(schedulerJobBO.getCronExpression())){ //Cron表達式改變quartzSchedulerManage.updateScheduleJob(schedulerJobBO.getJobName(), schedulerJobBO.getJobGroup(), schedulerJobBO.getCronExpression());}}}}else{ //任務關閉if(checkHasJobRun(selectSchedulerJobBO)){ //當前任務quartz中存在quartzSchedulerManage.deleteScheduleJob(schedulerJobBO.getJobName(), schedulerJobBO.getJobGroup());}}schedulerJobDao.update(schedulerJobBO);}/*** 判斷quartz中是否有該任務* @param selectSchedulerJobBO* @return true: quartz有任務 , false:quartz無任務*/private boolean checkHasJobRun(SchedulerJobBO selectSchedulerJobBO){return !TriggerStateEnum.NORUN.getKey().equals(selectSchedulerJobBO.getTriggerState());}@Overridepublic void delete(String id) throws Exception {SchedulerJobBO selectSchedulerJobBO=schedulerJobDao.loadById(id);if(selectSchedulerJobBO==null){throw new SchedulerException("schedulerJob is null!");}if(selectSchedulerJobBO.getIsRun()==SchedulerJobBO.SCHEDULER_RUN_STATE){quartzSchedulerManage.deleteScheduleJob(selectSchedulerJobBO.getJobName(), selectSchedulerJobBO.getJobGroup());}schedulerJobDao.delete(id);}@Overridepublic SchedulerJobBO loadById(String id) {return schedulerJobDao.loadById(id);}@Overridepublic List<SchedulerJobBO> list() {return schedulerJobDao.list(new HashMap<String,Object>());} /*public QuartzSchedulerManageImpl getQuartzSchedulerEngine() {return quartzSchedulerEngine;}public void setQuartzSchedulerEngine(QuartzSchedulerManageImpl quartzSchedulerEngine) {this.quartzSchedulerEngine = quartzSchedulerEngine;}public void setSchedulerJobDao(SchedulerJobDao schedulerJobDao) {this.schedulerJobDao = schedulerJobDao;} */@Overridepublic void startAllJob() {logger.info("start up all jobs!");List<SchedulerJobBO> schedulerJobs= list();for(SchedulerJobBO schedulerJobBO:schedulerJobs){if(schedulerJobBO.getIsRun()==SchedulerJobBO.SCHEDULER_RUN_STATE&&!checkHasJobRun(schedulerJobBO) ){try {quartzSchedulerManage.createScheduleJob(schedulerJobBO.getJobName(), schedulerJobBO.getJobGroup(), schedulerJobBO.getCronExpression(), schedulerJobBO.getJobClassPath(), null);} catch (Exception e) {e.printStackTrace();}}}logger.info("end all jobs load!");}}BaseJob:
/*** job具體實現需要繼承此類* @author hanxuetong**/ public abstract class BaseJob implements Job {protected final Logger logger= LoggerFactory.getLogger(BaseJob.class);public abstract void run();@Overridepublic void execute(JobExecutionContext context)throws JobExecutionException {try{run();}catch(Exception t){logger.error("Job throw exception", t);t.printStackTrace();}}/*** 根據BeanId獲取Bean實例*/public <T> T getBean(String beanId, Class<T> clazz){return ApplicationContextUtil.getBean(beanId, clazz);}/*** 根據BeanId獲取Bean實例*/public <T> T getBean(Class<T> clazz){return ApplicationContextUtil.getApplicationContext().getBean(clazz);} }TestJobTask:
public class TestJobTask extends BaseJob{private static final Logger logger = LoggerFactory.getLogger(TestJobTask.class);@Overridepublic void run() {System.out.println("run test start ");logger.debug(" run test!");try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("run test end ");}}當然數據庫中還要導入quartz的對應數據源的sql,在下載的quartz-2.2.2-distribution中\docs\dbTables\ 目錄下。本項目中用到postgresql,則是tables_postgres.sql。
?
轉載于:https://my.oschina.net/passerman/blog/705004
總結
以上是生活随笔為你收集整理的quartz分布式集群部署并且可视化配置job定时任务的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Gradle学习网站
- 下一篇: 222. Count Complete