admin管理员组

文章数量:1794759

springboot的schedule集群实现

springboot的schedule集群实现

原理:

1.利用spring的schedule功能实现定时任务 2.利用redis的过期策略实现集群中定时任务的分配(单机版redis,集群的redis请考虑redission)

内容概述:

1.多线程调度定时任务 2.增加定时任务管理表(数据库或者redis中持久化) 3.基于反射机制实现动态调用不同的自定义定时任务 4.自动的根据定时任务管理表对定时任务进行增删改查 5.通过定时任务管理使定时任务只执行一次。

1.多线程调度定时任务

默认的schedule使用的是单线程,即多个定时任务需要排队执行,如果某些定时任务耗时过长,会导致其他任务排队过久,且不利于使用redis的过期策略实现分布式定时任务的分配,所以在集群情况下,定时任务需要使用多线程实现,建议:线程数大于同时可执行任务数

/** * @author Bight Chen * @Date: 2021/9/17 10:27 * 定时任务线程池 */ @Configuration public class ScheduleConfig { @Bean public TaskScheduler taskScheduler(){ //此bean对象支持根据cron表达式创建周期性任务 ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); //定时任务执行线程池核心数 //线程数一定大于当前可执行任务数 taskScheduler.setPoolSize(50); //此方法会使得任务一旦被取消将立即被移除 taskScheduler.setRemoveOnCancelPolicy(true); taskScheduler.setThreadNamePrefix("Schedule-"); return taskScheduler; } } 2.增加定时任务管理表(数据库或者redis中持久化)

1.需要的原因是利于自动增加或者删除所需定时任务 ps: 在mysql中做为数据表 @Data是基于lombok来简化编写 get() set() toString()等方法

@Data public class ScheduleJob { //id private Long scheduleJobId; //动态bean private String beanName; //方法 private String methodName; //参数 private String jobParams; //表达式 private String jobCron; //任务名 private String jobName; private String remark; //0停止,1正常,2已完成 private String status; private String createdBy; private Date createdTime; private String lastUpdatedBy; } 3.基于反射机制实现动态调用不同的自定义定时任务

1.反射调用bean的工具类

/** * @author Bight Chen * @Date: 2021/9/17 11:54 */ @SuppressWarnings("unchecked") @Component public class SpringToolsConfig implements ApplicationContextAware { private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { SpringToolsConfig.applicationContext = applicationContext; } public static Object getBean(String name) { return applicationContext.getBean(name); } }

2.动态调用bean

/** * @author Bight Chen * @Date: 2021/9/17 11:40 */ public class TaskRunable implements Runnable { private static final Logger logger = LogManager.getLogger(TaskRunable.class); private ScheduleJob scheduleJob; public TaskRunable(ScheduleJob scheduleJob) { this.scheduleJob = scheduleJob; } private RedisTemplate redisTemplate = (RedisTemplate) SpringToolsConfig.getBean("redisTemplate"); @Override public void run() { //周期性任务:加锁的过期时间要大于不同服务器时间之差,且小于同个任务2次执行间隔(cron) //抢到锁的执行,没抢到锁的等待下一次任务执行 try { Object target = SpringToolsConfig.getBean(scheduleJob.getBeanName()); Method method = null; if (!StringUtils.isEmpty(scheduleJob.getJobParams())) { method = target.getClass().getDeclaredMethod(scheduleJob.getMethodName(), String.class); } else { method = target.getClass().getDeclaredMethod(scheduleJob.getMethodName()); } //考虑到集群部署时,旧版本的查找不到新增的定时任务时处理,就需要先找到bean再存redis //保证原子性 if(!redisTemplate.opsForValue().setIfAbsent("schdeleJob:"+scheduleJob.getScheduleJobId().toString(), "1", 15000L,TimeUnit.MILLISECONDS)){ logger.info("{}正在执行!!,不能重复执行",scheduleJob.getJobName()); return; } logger.info("定时任务开始执行 - 参数:{}", scheduleJob.toString()); long startTime = System.currentTimeMillis(); try { ReflectionUtils.makeAccessible(method); if (!StringUtils.isEmpty(scheduleJob.getJobParams())) { JSONObject jsonObject = JSONObject.parseObject(scheduleJob.getJobParams()); jsonObject.put("scheduleJobId", scheduleJob.getScheduleJobId()); method.invoke(target, JSONObject.toJSONString(jsonObject)); } else { method.invoke(target); } } catch (Exception e) { logger.error("定时任务执行异常 -参数:{} ,异常:{}", scheduleJob.toString(), e); } finally { Object oDelete = redisTemplate.opsForValue().get("schdeleJob:" + scheduleJob.getScheduleJobId()); if (oDelete != null) { //锁依旧存在则自动删除 redisTemplate.delete("schdeleJob:" + scheduleJob.getScheduleJobId().toString()); } } long times = System.currentTimeMillis() - startTime; logger.info("定时任务执行结束 -参数:{},耗时:{} 毫秒", scheduleJob.toString(), times); } catch (Exception e) { logger.error("定时任务执行异常 -参数:{} ,异常:{}", scheduleJob.toString(), e); } } } 4.自动的根据定时任务管理表对定时任务进行增删改查 /** * @author Bight Chen * @Date: 2021/9/17 13:59 * 分布式定时任务初始化 */ @Component public class ScheduleInitConfig { private static final Logger logger = LogManager.getLogger(ScheduleInitConfig.class); //内存中保存定时任务数据 private HashMap<Long, ScheduledFuture> map = new HashMap<>(); @Autowired private TaskScheduler autoTaskScheduler; @Autowired private ScheduleJobService scheduleJobService; /** * 定时自动查询增加任务数据,注入定时任务 * schedule.cron:0/15 * * * * ? */ @Scheduled(cron = "${schedule.cron}") public void autoAddTask() { Long time = System.currentTimeMillis(); try { List<ScheduleJob> list = scheduleJobService.selectByExample(new ScheduleJobExample()); for (ScheduleJob scheduleJob : list) { if (map.get(scheduleJob.getScheduleJobId()) == null && "1".equals(scheduleJob.getStatus())) { //存在 启动状态的 定时任务自动增加 TaskRunable taskRunable = new TaskRunable(scheduleJob); ScheduledFuture future = autoTaskScheduler.schedule(taskRunable, new CronTrigger(scheduleJob.getJobCron())); map.put(scheduleJob.getScheduleJobId(), future); logger.info("autoAddTask,自动增加任务,参数:{}", scheduleJob.toString()); } } } catch (Exception e) { logger.error("autoAddTask,error:{}", e); } //logger.info("autoAddTask,end:{}", System.currentTimeMillis() - time); } /** * 定时自动查询任务数据,删除过期任务列表 * schedule.cron:0/15 * * * * ? */ @Scheduled(cron = "${schedule.cron}") public void autoDeleteTask() { Long time = System.currentTimeMillis(); try { List<ScheduleJob> list = scheduleJobService.selectByExample(new ScheduleJobExample()); for (ScheduleJob scheduleJob : list) { if (map.get(scheduleJob.getScheduleJobId()) != null &&!"1".equals(scheduleJob.getStatus())) { //非启动中的都删除 ScheduledFuture future = map.get(scheduleJob.getScheduleJobId()); future.cancel(true); map.remove(scheduleJob.getScheduleJobId()); logger.info("autoDeleteTask,自动删除任务,参数:{}", scheduleJob.toString()); } } } catch (Exception e) { logger.error("autoDeleteTask,error:{}", e); } //logger.info("autoDeleteTask,end:{}", System.currentTimeMillis() - time); } } 5.通过定时任务管理使定时任务只执行一次。 /** * @author Bight Chen * @Date: 2021/9/17 13:53 */ @Component("testTask") public class TestTask implements BaseTask { private static final Logger logger = LogManager.getLogger(TestTask.class); @Autowired private ScheduleJobService scheduleJobService; @Override public void runTask() { } @Override public void runTask(String params) { JSONObject jsonObject = JSON.parseObject(params); Long scheduleJobId = Long.parseLong( jsonObject.get("scheduleJobId")+""); try { Thread.sleep(16000); } catch (Exception e) { logger.info("runTask>>>"+e.getMessage()); }finally { if (scheduleJobId != null) { //最后关闭当前任务,使任务执行一次 scheduleJobService.updateStatusById(scheduleJobId, "2"); } } logger.info(Thread.currentThread().getName() + ":" + params); } }

本文标签: 集群SpringBootschedule