admin管理员组文章数量:1794759
SpringBoot与quartz框架实现分布式定时任务
目录
- 前言
- 一、介绍
- 1.quartz 核心概念
- 2.原理图
- 二、使用步骤
- 1.引入依赖
- 2.在项目中添加quartz.properties文件(不添加该文件该框架会加载自带的properties文件)
- 3.在数据库中创建quartz相关的表(建议与业务隔离库)
- 4.项目结构
- 5.代码配置
- 1) 多数据源配置 - DataSourceConfig:
- 2) 配置文件 - application.yml:
- 3) Quartz配置 - QuartzConfig:
- 4) 多数据源 Mybatis配置(其一) - DemoMybatisConfig:
- 6.开发流程
- 1) JobCommandLine:
- 2) Job:
- 3) CommonAbstractJob:
- 4) ScheduleJobService:
- 5) SimpleAbstractJob:
- 6) DemoJob:
- 总结
quartz的分布式调度策略是以数据库为边界资源的一种异步策略。各个调度器都遵守一个基于数据库锁的操作规则从而保证了操作的唯一性。 在quartz的集群解决方案里有张表scheduler_locks,quartz采用了悲观锁的方式对triggers表进行行加锁,以保证任务同步的正确性。一旦某一个节点上面的线程获取了该锁,那么这个Job就会在这台机器上被执行,同时这个锁就会被这台机器占用。同时另外一台机器也会想要触发这个任务,但是锁已经被占用了,就只能等待,直到这个锁被释放。
提示:以下是本篇文章正文内容。
一、介绍 1.Quartz 核心概念我们需要明白 Quartz 的几个核心概念,这样理解起 Quartz 的原理就会变得简单了。
JobDetail: 表示一个具体的可执行的调度程序,Job 是这个可执行程调度程序所要执行的内容,另外 JobDetail 还包含了这个任务调度的方案和策略。
Trigger: 代表一个调度参数的配置,什么时候去调。
Scheduler: 代表一个调度容器,一个调度容器中可以注册多个 JobDetail 和 Trigger。当 Trigger 与 JobDetail 组合,就可以被 Scheduler 容器调度了。
代码如下(示例):
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-quartz</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.48</version> </dependency> <!--mybatis--> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>1.3.1</version> </dependency> <dependency> <groupId>com.mchange</groupId> <artifactId>c3p0</artifactId> <version>0.9.5.2</version> </dependency> <!-- Druid是阿里巴巴推出的国产数据库连接池,据网上测试对比,比目前的DBCP或C3P0数据库连接池性能更好--> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.1.10</version> </dependency> <!--google工具类--> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>23.0</version> </dependency> 2.在项目中添加quartz.properties文件(不添加该文件该框架会加载自带的properties文件) # Default Properties file for use by StdSchedulerFactory # to create a Quartz Scheduler Instance, if a different # properties file is not explicitly specified. #使用自己的配置文件 org.quartz.jobStore.useProperties:true #默认或是自己改名字都行 org.quartz.scheduler.instanceName: DefaultQuartzScheduler #如果使用集群,instanceId必须唯一,设置成AUTO org.quartz.scheduler.instanceId = AUTO org.quartz.scheduler.rmi.export: false org.quartz.scheduler.rmi.proxy: false org.quartz.scheduler.wrapJobExecutionInUserTransaction: false org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool org.quartz.threadPool.threadCount: 10 org.quartz.threadPool.threadPriority: 5 org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true org.quartz.jobStore.misfireThreshold: 60000 #============================================================================ # Configure JobStore #============================================================================ #org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore #存储方式使用JobStoreTX,也就是数据库 org.quartz.jobStore.class:org.quartz.impl.jdbcjobstore.JobStoreTX org.quartz.jobStore.driverDelegateClass:org.quartz.impl.jdbcjobstore.StdJDBCDelegate #数据库中quartz表的表名前缀 org.quartz.jobStore.tablePrefix:qrtz_ org.quartz.jobStore.dataSource:qzDS #是否使用集群(如果项目只部署到 一台服务器,就不用了) org.quartz.jobStore.isClustered = true #============================================================================ # Configure Datasources #============================================================================ #配置数据库源(org.quartz.dataSource.qzDS.maxConnections: c3p0配置的是有s的,druid数据源没有s) org.quartz.dataSource.qzDS.connectionProvider.class:com.cbw.quartz02.util.DruidConnectionProvider org.quartz.dataSource.qzDS.driver: com.mysql.jdbc.Driver org.quartz.dataSource.qzDS.URL: jdbc:mysql://localhost:3306/quartz?useUnicode=true&characterEncoding=utf8 org.quartz.dataSource.qzDS.user: root org.quartz.dataSource.qzDS.password: 123 org.quartz.dataSource.qzDS.maxConnection: 10在依赖中可以看到引入了两种连接池,这两种连接池是可选择的。quartz框架默认的选择C3P0连接池,如果想要更换连接池就需要配置文件,如上进行修改。
3.在数据库中创建quartz相关的表(建议与业务隔离库)1)进入quartz的官网www.quartz-scheduler/,点击Downloads,下载后在目录\\docs\\dbTables下有常用数据库创建quartz表的脚本。 例如:“tables_mysql.sql” tables_mysql.sql 、tables_mysql_innodb.sql 上述两者所有的数据库引擎不一样,根据需要进行选择。导入之后,数据会出现下列几张表,但没有数据。
2)本博客最后项目中包含数据库脚本。 3)请问度娘。
4.项目结构包名解释:
代码解释: 1. schedulerFactory方法:注入schedulerFactory。 2. jobFactory方法:注入AutowiringSpringBeanJobFactory。 3. scheduleJobService方法:注入ScheduleJobService。 4. threadPoolTaskExecutor方法:注入ThreadPoolTaskExecutor(线程池任务执行器)。
4) 多数据源 Mybatis配置(其一) - DemoMybatisConfig: @Configuration @MapperScan(value = "com.wyc.demo.dao.demo", sqlSessionTemplateRef = "demoSqlSessionTemplate") public class DemoMybatisConfig { @Bean(name = "demoTransactionManager") public DataSourceTransactionManager adminTransactionManager(@Qualifier("demoDataSource") DataSource dataSource) { return new DataSourceTransactionManager(dataSource); } @Bean(name = "demoSqlSessionFactory") public SqlSessionFactory adminSqlSessionFactory(@Qualifier("demoDataSource") DataSource dataSource, @Value("${mybatis.demo.mapper-locations}") Resource[] mappers) throws Exception { SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean(); factoryBean.setDataSource(dataSource); factoryBean.setMapperLocations(mappers); return factoryBean.getObject(); } @Bean(name = "demoSqlSessionTemplate") public SqlSessionTemplate adminSqlSessionTemplate(@Qualifier("demoSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception { return new SqlSessionTemplate(sqlSessionFactory); } } 6.开发流程 1) JobCommandLine: @Component public class JobCommandLine implements CommandLineRunner { @Resource private DemoJob demoJob; @Override public void run(String... args) throws Exception { demoJob.startJob(); } }代码解释: 1. 此类(JobCommandLine)实现了SpringBoot中的CommandLineRunner接口,这是一个函数式接口,应用启动时会执行其中的run方法,可以搭配定时任务使用。 2. run方法中的demoJob.startJob() 执行的业务数据源。 3. 进入到startJob() 方法中,进入到了SimpleAbstractJob类。
2) Job:代码解释: 1. 此接口来自 package org.quartz;,这就是quartz的四大核心概念之一,这个接口里只有一个方法,就是 execute方法,方法体里的内容就是定时任务的执行程序,需要我们实现该接口来添加。(该方法很重要,下面也有讲述)
3) CommonAbstractJob: abstract class CommonAbstractJob implements Job { @Resource protected ScheduleJobService scheduleJobService; private static final SimpleDateFormat CRON_FORMAT = new SimpleDateFormat("ss mm HH dd MM ? yyyy"); protected final Logger logger = LoggerFactory.getLogger(getClass()); /** * 获取JobGroup * @return */ public abstract JobGroupType getJobGroup(); /** * 获取cron表达式 * * @param date * @return */ public String parseCronExpression(Date date) { return CRON_FORMAT.format(date); } protected void enableSchedule(ScheduleJob job, JobDataMap jobDataMap) throws Exception { scheduleJobService.enableSchedule(job, jobDataMap); } public String getJobGroupString(){ return getJobGroup().getCode(); } }代码解释:
这个方法是什么意思呢?进入到ScheduleJobService中看一下便知晓了。
4) ScheduleJobService: public class ScheduleJobService { @Autowired @Qualifier("schedulerFactory") private Scheduler scheduler; /** * 启用定时任务或重设定时任务的触发时间 * * @param job * @param jobDataMap * @throws Exception */ public void enableSchedule(ScheduleJob job, JobDataMap jobDataMap) throws Exception { if (job == null) { return; } JobDetail jobDetail = JobBuilder.newJob(job.getJobExecuteClass()) .withIdentity(job.getJobName(), job.getJobGroup().getCode()) .withDescription(job.getJobGroup().getDesc()) .build(); if (jobDataMap != null) { jobDetail.getJobDataMap().putAll(jobDataMap); } //表达式调度构建器 CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression()); //按新的cronExpression表达式构建一个新的trigger Trigger trigger = TriggerBuilder.newTrigger() .withIdentity(job.getTriggerName(), job.getJobGroup().getCode()) .withSchedule(scheduleBuilder) .withDescription(job.getJobGroup().getDesc()) .build(); Trigger exists = scheduler.getTrigger(trigger.getKey()); if (exists != null) { if (exists instanceof CronTriggerImpl) { ((CronTriggerImpl) trigger).setPreviousFireTime(exists.getPreviousFireTime()); } else if (exists instanceof CalendarIntervalTriggerImpl) { ((CalendarIntervalTriggerImpl) trigger).setPreviousFireTime(exists.getPreviousFireTime()); } else if (exists instanceof DailyTimeIntervalTriggerImpl) { ((DailyTimeIntervalTriggerImpl) trigger).setPreviousFireTime(exists.getPreviousFireTime()); } else if (exists instanceof SimpleTriggerImpl) { ((SimpleTriggerImpl) trigger).setPreviousFireTime(exists.getPreviousFireTime()); } } scheduler.scheduleJob(jobDetail, Sets.newHashSet(trigger), true); } /** * 删除定时任务 * * @param jobName * @param jobGroup * @throws Exception */ public void removeSchedule(String jobName, String jobGroup) throws Exception { JobKey jobKey = JobKey.jobKey(jobName, jobGroup); scheduler.pauseJob(jobKey); scheduler.deleteJob(jobKey); } /** * 删除定时任务 * * @param keys:jobGroup.jobName * @throws Exception */ public void removeSchedule(String keys) throws Exception { String[] arr = keys.split("[.]"); String jobName = arr[1]; String jobGroup = arr[0]; removeSchedule(jobName, jobGroup); } /** * 立即执行定时任务 * * @param jobName * @param jobGroup * @param delete * @param block * @throws Exception */ public void execSchedule(String jobName, String jobGroup, JobDataMap jobDataMap, boolean delete, boolean block) throws Exception { JobKey jobKey = JobKey.jobKey(jobName, jobGroup); scheduler.triggerJob(jobKey, jobDataMap); RemoveAfterRunListener afterExecListener = new RemoveAfterRunListener(); if (delete) {//如果要执行完后立即取消定时器 scheduler.getListenerManager().addJobListener(afterExecListener, KeyMatcher.keyEquals(jobKey)); } if (block) {//如果要阻塞等待回调结果 long start = System.currentTimeMillis(); int state = afterExecListener.getState(); while (state != 1 && (System.currentTimeMillis() - start) < 1000L) { state = afterExecListener.getState(); } } } /** * 暂停定时任务 * * @param jobName * @param jobGroup * @throws Exception */ public void pauseSchedule(String jobName, String jobGroup) throws Exception { JobKey jobKey = JobKey.jobKey(jobName, jobGroup); scheduler.pauseJob(jobKey); } /** * 恢复定时任务 * * @param jobName * @param jobGroup * @throws Exception */ public void resumeSchedule(String jobName, String jobGroup) throws Exception { JobKey jobKey = JobKey.jobKey(jobName, jobGroup); scheduler.resumeJob(jobKey); } /** * 根据jobName和jobGroup获取jobDataMap * * @param jobName * @param jobGroup * @return * @throws Exception */ public JobDataMap getJobDataMap(String jobName, String jobGroup) throws Exception { JobKey jobKey = JobKey.jobKey(jobName, jobGroup); JobDetail jobDetail = scheduler.getJobDetail(jobKey); JobDataMap jobDataMap = null; if (jobDetail != null) { jobDataMap = jobDetail.getJobDataMap(); } return jobDataMap; } }代码解释: 1.在讲解enableSchedule方法之前,我想先讲解一下Scheduler是怎么被注入的。打开QuartzConfig配置类,如下:
@Bean @ConditionalOnMissingBean(name = "schedulerFactory") public SchedulerFactoryBean schedulerFactory(DataSource quartzDataSource, JobFactory jobFactory, DataSourceTransactionManager quartzTransactionManager) { SchedulerFactoryBean bean = new SchedulerFactoryBean(); bean.setDataSource(quartzDataSource); bean.setTransactionManager(quartzTransactionManager); bean.setApplicationContextSchedulerContextKey("applicationContextKey"); // bean.setConfigLocation(new ClassPathResource("quartz.properties")); bean.setJobFactory(jobFactory); bean.setTriggers(triggers.toArray(new CronTrigger[]{})); return bean; }代码解释: 1. 此处会说明Quartz四大核心概念中的Scheduler。 2. 看 scheduleFactory方法 ,被@Bean修饰,说明是一个注入的类;@ConditionalOnMissingBean(name = “schedulerFactory”),说明是在缺失schedulerFactory的时候才生效。此方法返回类型是SchedulerFactoryBean,进入到这个类中:
public class SchedulerFactoryBean extends SchedulerAccessor implements FactoryBean<Scheduler>, BeanNameAware, ApplicationContextAware, InitializingBean, DisposableBean, SmartLifecycle { // 省略 }代码解释:
代码解释:
总结:
下面回过头来继续看ScheduleJobService中的enableSchedule方法:
/** * 启用定时任务或重设定时任务的触发时间 * * @param job * @param jobDataMap * @throws Exception */ public void enableSchedule(ScheduleJob job, JobDataMap jobDataMap) throws Exception { if (job == null) { return; } JobDetail jobDetail = JobBuilder.newJob(job.getJobExecuteClass()) .withIdentity(job.getJobName(), job.getJobGroup().getCode()) .withDescription(job.getJobGroup().getDesc()) .build(); if (jobDataMap != null) { jobDetail.getJobDataMap().putAll(jobDataMap); } //表达式调度构建器 CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression()); //按新的cronExpression表达式构建一个新的trigger Trigger trigger = TriggerBuilder.newTrigger() .withIdentity(job.getTriggerName(), job.getJobGroup().getCode()) .withSchedule(scheduleBuilder) .withDescription(job.getJobGroup().getDesc()) .build(); Trigger exists = scheduler.getTrigger(trigger.getKey()); if (exists != null) { if (exists instanceof CronTriggerImpl) { ((CronTriggerImpl) trigger).setPreviousFireTime(exists.getPreviousFireTime()); } else if (exists instanceof CalendarIntervalTriggerImpl) { ((CalendarIntervalTriggerImpl) trigger).setPreviousFireTime(exists.getPreviousFireTime()); } else if (exists instanceof DailyTimeIntervalTriggerImpl) { ((DailyTimeIntervalTriggerImpl) trigger).setPreviousFireTime(exists.getPreviousFireTime()); } else if (exists instanceof SimpleTriggerImpl) { ((SimpleTriggerImpl) trigger).setPreviousFireTime(exists.getPreviousFireTime()); } } scheduler.scheduleJob(jobDetail, Sets.newHashSet(trigger), true); }代码解释: Quartz四大概念中的最后二位登场了,它就是JobDetail和Trigger,在这个方法中,将设置JobDetail程序和Trigger触发器并且将其放入Scheduler容器执行。
5) SimpleAbstractJob: public abstract class SimpleAbstractJob extends CommonAbstractJob { /** * 获取执行表达式 * @return */ public abstract String getCronExpression(); /** * 获取JobDataMap * @return */ public abstract JobDataMap getJobDataMap(); /** * 获取JobName * @return */ public abstract String getJobName(); public void startJob() throws Exception { String cronExpression = getCronExpression(); //jobName不要包含时间戳,和group不能同时重复 ScheduleJob job = new ScheduleJob(getJobName(), getJobGroup(), cronExpression, getClass()); enableSchedule(job, getJobDataMap()); logger.info("---设置完成:{}---", cronExpression); } /** * 停止定时器 * * @throws Exception */ public void stopJob() throws Exception { scheduleJobService.removeSchedule(getJobName(), getJobGroupString()); } /** * 立即运行定时器 * * @param delete * @param block * @throws Exception */ public void runJob(boolean delete, boolean block) throws Exception { scheduleJobService.execSchedule(getJobName(), getJobGroupString(), getJobDataMap(), delete, block); } }代码解释: 1. 继承CommonAbstractJob抽象类,此类的重点在于 startJob()方法,这是定时任务的启动方法!ScheduleJob类是我们自己定义的包装了jobName、jobGroup、cronExpression等信的实体类。如下:
public class ScheduleJob implements Serializable { private static final long serialVersionUID = -3454363184589312090L; private String jobName; private JobGroupType jobGroup; private Integer jobStatus; private String cronExpression; private String desc; private Class<? extends Job> jobExecuteClass; // 省略getter、setter代码解释: 1. DemoJob是我们自己的Job类,此类继承SimpleAbstractJob抽象类(继承了startJob方法),间接实现了Job接口的execute方法(我们定时器的业务代码写在这里面),也就是说我们的DemoJob类既拥有了startJob方法也拥有了execute方法。 2. 由上述代码可见,我们定时器任务每隔1分钟打印一次日志。
总结
源码地址:github_quartz_demo
本文标签: 分布式框架SpringBootquartz
版权声明:本文标题:SpringBoot与quartz框架实现分布式定时任务 内容由林淑君副主任自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.xiehuijuan.com/baike/1686819046a106730.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论