admin管理员组

文章数量:1794759

Quartz实现分布式集群schedule调度(定时任务执行)

Quartz实现分布式集群schedule调度(定时任务执行)

项目需求: 需要执行一些定时任务,比如每间隔几小时同步其他系统中信,或者定期监控一些服务是否正常。如果定时任务程序有多个实例在运行,就是运行多次,因此需要集群分布式的schedule。保证定时任务有且只有一个执行。

调研各种schedule框架后,选择Quartz,原因如下:1, 项目已经在使用quartz,大家比较熟悉,2,quartz提供的clustering功能完备,满足项目的需求。

说明, 本文主要参考quartz的官方文档,从www.quartz-scheduler/downloads下载, 解压后参考example13 - Clustering Quartz and JDBC Job Stores。 我下载的是quartz-2.2.3-distribution.tar.gz

具体步骤, 我们使用springboot程序,完整代码在这里, 欢迎加星,fork。 我使用了本机的数据库,用户名和密码也配置了,运行时需要先创建数据库,创建表,然后修改application.properties中的配置信。 因此具体步骤如下

  • 初始数据库 初始脚本在 quartz-2.2.3\\docs\\dbTables下面。我用的是mysql数据innodb引擎,因此运行tables_mysql_innodb.sql脚本。

  • 引入依赖包

  • 这里主要列出quartz的依赖包,springboot的可以参考完整的代码

    <quartz.version>2.3.0</quartz.version> ... <dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> <version>${quartz.version}</version> </dependency> <dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz-jobs</artifactId> <version>${quartz.version}</version> </dependency>
  • 创建数据源配置
  • package com.yq.config; import javax.sql.DataSource; import com.alibaba.druid.pool.DruidDataSource; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DataSourceConfig { @Value("${spring.datasource.url:jdbc:mysql://127.0.0.1:3306/myscheduler?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false}") private String url; @Value("${spring.datasource.driver-class-name:com.mysql.jdbc.Driver}") private String driverClassName; @Value("${spring.datasource.username:user1}") private String userName; @Value("${spring.datasource.password:password}") private String password; @Value("${spring.datasource.initialSize:6}") private int initialSize; @Value("${spring.datasource.minIdle:6}") private int minIdle; @Value("${pring.datasource.maxActive:24}") private int maxActive; @Value("${spring.datasource.maxWait:60000}") private int maxWait; @Value("${spring.datasource.timeBetweenEvictionRunsMilli:50000}") private int timeBetweenEvictionRunsMillis; @Value("${spring.datasource.minEvictableIdleTimeMillis:200000}") private int minEvictableIdleTimeMillis; @Value("${spring.datasource.validationQuery:SELECT 1 FROM DUAL}") private String validationQuery; @Value("${spring.datasource.testWhileIdle:true}") private boolean testWhileIdle; @Value("${spring.datasource.testOnBorrow:false}") private boolean testOnBorrow; @Value("${spring.datasource.testOnReturn:false}") private boolean testOnReturn; @Value("${spring.datasource.poolPreparedStatements:true}") private boolean poolPreparedStatements; @Value("${spring.datasource.connectionProperties:druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000}") private String properties; @Bean public DataSource dataSource(){ DruidDataSource configDataSource = new DruidDataSource(); configDataSource.setUrl(url); configDataSource.setDriverClassName(driverClassName); configDataSource.setUsername(userName); configDataSource.setPassword(password); configDataSource.setInitialSize(initialSize); configDataSource.setDefaultAutoCommit(true); configDataSource.setMinIdle(minIdle); configDataSource.setMaxActive(maxActive); configDataSource.setMaxWait(maxWait); configDataSource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis); configDataSource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis); configDataSource.setValidationQuery(validationQuery); configDataSource.setTestWhileIdle(testWhileIdle); configDataSource.setTestOnBorrow(testOnBorrow); configDataSource.setTestOnReturn(testOnReturn); configDataSource.setConnectionProperties(properties); return configDataSource; } }

    3 ,创建Quartz调度配置类

    package com.yq.config; import com.yq.job.HelloWorldJob; import com.yq.job.UpdateRunningDaysJob; import javax.sql.DataSource; import lombok.extern.slf4j.Slf4j; import org.quartz.CronTrigger; import org.quartz.JobDetail; import org.quartz.spi.JobFactory; import org.quartz.spi.TriggerFiredBundle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.config.AutowireCapableBeanFactory; import org.springframework.beans.factory.config.PropertiesFactoryBean; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; import org.springframework.scheduling.quartz.CronTriggerFactoryBean; import org.springframework.scheduling.quartz.JobDetailFactoryBean; import org.springframework.scheduling.quartz.SchedulerFactoryBean; import org.springframework.scheduling.quartz.SpringBeanJobFactory; import java.io.IOException; import java.util.Properties; @Configuration @Slf4j public class QuartzSchedulerConfig { @Autowired private DataSource dataSource; private static final String QUARTZ_PROPERTIES_NAME = "/quartz.properties"; @Bean public JobFactory jobFactory(ApplicationContext applicationContext) { AutowiringSpringBeanJobFactory jobFactory = new AutowiringSpringBeanJobFactory(); jobFactory.setApplicationContext(applicationContext); return jobFactory; } @Bean public SchedulerFactoryBean schedulerFactoryBean(JobFactory jobFactory, CronTrigger[] cronTrigger, JobDetail[] jobDetails) { SchedulerFactoryBean factoryBean = new SchedulerFactoryBean(); try { factoryBean.setQuartzProperties(quartzProperties()); factoryBean.setDataSource(dataSource); factoryBean.setJobFactory(jobFactory); factoryBean.setTriggers(cronTrigger); factoryBean.setJobDetails(jobDetails); factoryBean.setOverwriteExistingJobs(true); } catch (Exception e) { log.error("Failed to load config file {}.", QUARTZ_PROPERTIES_NAME, e); throw new RuntimeException("LoadingConfigFileError", e); } return factoryBean; } @Bean(name = "job1Trigger") public CronTriggerFactoryBean job1Trigger(@Qualifier("jobHelloWorldDetail") JobDetail jobDetail) { CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean(); cronTriggerFactoryBean.setJobDetail(jobDetail); //这里为了演示效果,直接hardcode为"0/15 * * * * ?", 实际项目中可以配置在application.propertis中 cronTriggerFactoryBean.setCronExpression("0/15 * * * * ?"); return cronTriggerFactoryBean; } @Bean(name = "jobHelloWorldDetail") public JobDetailFactoryBean job1Detail() { JobDetailFactoryBean jobDetailFactoryBean = new JobDetailFactoryBean(); //HelloWorldJob.class是具体的执行任务的job类 jobDetailFactoryBean.setJobClass(HelloWorldJob.class); jobDetailFactoryBean.setDurability(true); return jobDetailFactoryBean; } @Bean(name = "job2Trigger") public CronTriggerFactoryBean job2Trigger(@Qualifier("jobUpdateRunningDaysDetail") JobDetail jobDetail) { CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean(); cronTriggerFactoryBean.setJobDetail(jobDetail); //这里为了演示效果,直接hardcode为"0/15 * * * * ?", 实际项目中可以配置在application.propertis中 cronTriggerFactoryBean.setCronExpression("0/5 * * * * ?"); return cronTriggerFactoryBean; } @Bean(name = "jobUpdateRunningDaysDetail") public JobDetailFactoryBean job2Detail() { JobDetailFactoryBean jobDetailFactoryBean = new JobDetailFactoryBean(); //UpdateRunningDaysJob.class是具体的执行任务的job类 jobDetailFactoryBean.setJobClass(UpdateRunningDaysJob.class); jobDetailFactoryBean.setDurability(true); return jobDetailFactoryBean; } @Bean public Properties quartzProperties() throws IOException { PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean(); propertiesFactoryBean.setLocation(new ClassPathResource(QUARTZ_PROPERTIES_NAME)); propertiesFactoryBean.afterPropertiesSet(); return propertiesFactoryBean.getObject(); } class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory implements ApplicationContextAware { private transient AutowireCapableBeanFactory beanFactory; @Override public void setApplicationContext(final ApplicationContext context) { beanFactory = context.getAutowireCapableBeanFactory(); } @Override protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception { final Object job = super.createJobInstance(bundle); beanFactory.autowireBean(job); return job; } } }

    4, 创建job job是schedule具体执行的任务,我们可以在job中引入serviceImpl类,这里是示例代码因此只打印日志。job继承QuartzJobBean ,只需要实现executeInternal方法即可。

    package com.yq.job; import lombok.extern.slf4j.Slf4j; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.springframework.scheduling.quartz.QuartzJobBean; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date; @Component @Slf4j public class HelloWorldJob extends QuartzJobBean { @Override protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException { log.info("HelloWorldJob at {}", (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")).format(new Date())); } }

    5, 配置quartz.properties

    org.quartz.scheduler.instanceName = quartzScheduler org.quartz.scheduler.instanceId = AUTO org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate org.quartz.jobStore.tablePrefix = QRTZ_ org.quartz.jobStore.isClustered = true org.quartz.jobStore.useProperties = false org.quartz.jobStore.clusterCheckinInterval = 20000 org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool org.quartz.threadPool.threadCount = 10 org.quartz.threadPool.threadPriority = 5 org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true

    完整的代码在这里,运行多个实例后可以观察日志,只有一个实例执行schedule,多个实例之间随机获得机会执行schedule任务。

    本文标签: 分布式集群quartzschedule