前言 在复盘 ieg 一面看到定时任务阻塞的问题时,研究了下 @EnableScheduling 的源码,觉得可以单开一篇文章讲一讲
本文主要讲述了使用 @EnableScheduling 可能出现的线程阻塞导致定时任务延期的问题,也顺便解释了动态定时任务源码上的实现
引用文章:
@Schedule定时任务+分布式环境:@Schedule定时任务+分布式环境,这些坑你一定得注意!!! (qq.com)
java 中的线程池参数:java中四种线程池及poolSize、corePoolSize、maximumPoolSize_maximum-pool-size-CSDN博客
线程池的拒绝策略:线程池的RejectedExecutionHandler(拒绝策略)-CSDN博客
Java 中实现定时任务:Java中实现定时任务,有多少种解决方案?好久没更新博客了,最近上班做了点小东西,总结复盘一下。主要介绍了定时任务的三种 - 掘金 (juejin.cn)
线程阻塞问题 问题根源 Java中 使用 Springboot 自带的定时任务 @EnableScheduling 和 @Scheduled 注解,会装配一个 SchedulingConfiguration 的类
1 2 3 4 5 6 7 @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Import(SchedulingConfiguration.class) @Documented public @interface EnableScheduling { }
1 2 3 4 5 6 7 8 9 @Configuration(proxyBeanMethods = false) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public class SchedulingConfiguration { @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor () { return new ScheduledAnnotationBeanPostProcessor (); } }
这个配置类又会创建一个 ScheduledAnnotationBeanPostProcessor 的 Bean
在这个类的无参构造中又初始化了一个 ScheduledTaskRegistrar 的对象
1 2 3 public ScheduledAnnotationBeanPostProcessor () { this .registrar = new ScheduledTaskRegistrar (); }
在 创建单例或刷新上下文之后,会执行 finishRegistration 方法,最后执行 registrar 的 afterPropertiesSet 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Override public void afterSingletonsInstantiated () { this .nonAnnotatedClasses.clear(); if (this .applicationContext == null ) { finishRegistration(); } } @Override public void onApplicationEvent (ContextRefreshedEvent event) { if (event.getApplicationContext() == this .applicationContext) { finishRegistration(); } } private void finishRegistration () { if (this .scheduler != null ) { this .registrar.setScheduler(this .scheduler); } this .registrar.afterPropertiesSet(); }
ScheduledTaskRegistrar 的成员变量包括任务的执行器以及几种类型的定时任务列表
1 2 3 4 5 6 7 8 9 10 11 @Nullable private TaskScheduler taskScheduler; @Nullable private ScheduledExecutorService localExecutor; @Nullable private List<TriggerTask> triggerTasks; @Nullable private List<CronTask> cronTasks;
afterPropertiesSet 方法会获取一个执行器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Override public void afterPropertiesSet () { scheduleTasks(); } @SuppressWarnings("deprecation") protected void scheduleTasks () { if (this .taskScheduler == null ) { this .localExecutor = Executors.newSingleThreadScheduledExecutor(); this .taskScheduler = new ConcurrentTaskScheduler (this .localExecutor); } if (this .triggerTasks != null ) { for (TriggerTask task : this .triggerTasks) { addScheduledTask(scheduleTriggerTask(task)); } } if (this .cronTasks != null ) { for (CronTask task : this .cronTasks) { addScheduledTask(scheduleCronTask(task)); } } if (this .fixedRateTasks != null ) { for (IntervalTask task : this .fixedRateTasks) { addScheduledTask(scheduleFixedRateTask(task)); } } if (this .fixedDelayTasks != null ) { for (IntervalTask task : this .fixedDelayTasks) { addScheduledTask(scheduleFixedDelayTask(task)); } } }
进入 newSingleThreadScheduledExecutor 可以看到,默认使用了一个 corePoolSize 为 1, maximumPoolSize 为 Integer.MAX_VALUE 的线程池
1 2 3 4 public static ScheduledExecutorService newSingleThreadScheduledExecutor () { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor (1 )); }
1 2 3 4 5 public ScheduledThreadPoolExecutor (int corePoolSize) { super (corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue ()); }
1 2 3 4 5 6 7 8 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
而线程池主要有几个重要的参数分别是:
corePoolSize:线程池的基本大小。
maximumPoolSize:线程池中允许的最大线程数。
poolSize:线程池中当前线程的数量。
当提交一个新任务时,若
poolSize < corePoolSize : 创建新线程处理该任务
poolSize = corePoolSize : 将任务置于阻塞队列中
阻塞队列的容量达到上限,且这时 poolSize < maximumPoolSize :
阻塞队列满了,且 poolSize = maximumPoolSize : 那么线程池已经达到极限,会根据饱和策略 RejectedExecutionHandler 拒绝新的任务,默认是 AbortPolicy 会丢掉任务并抛出异常
解决方案 注入自己编写的线程池,自行设置参数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Configuration public class MyTheadPoolConfig { @Bean public TaskExecutor taskExecutor () { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor (); executor.setCorePoolSize(10 ); executor.setMaxPoolSize(20 ); executor.setQueueCapacity(200 ); executor.setKeepAliveSeconds(60 ); executor.setThreadNamePrefix("nzc-create-scheduling-" ); executor.setRejectedExecutionHandler(new ThreadPoolExecutor .CallerRunsPolicy()); executor.setWaitForTasksToCompleteOnShutdown(true ); return executor; } }
在定时任务的类上再加一个 @EnableAsync 注解,给方法添加一个 @Async 即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Slf4j @Component @EnableAsync @EnableScheduling public class ScheduleService { @Autowired TaskExecutor taskExecutor; @Async(value = "taskExecutor") @Scheduled(cron = "0/5 * * * * ? ") public void testSchedule () { try { Thread.sleep(10000 ); log.info("当前执行任务的线程号ID===>{}" , Thread.currentThread().getId()); } catch (Exception e) { e.printStackTrace(); } } }
动态定时任务 上面提到了
@EnableScheduling 导入了 SchedulingConfiguration,SchedulingConfiguration 又创建了 ScheduledAnnotationBeanPostProcessor 的Bean,ScheduledAnnotationBeanPostProcessor 又实例化了 ScheduledTaskRegistrar 对象,即
@EnableScheduling -> SchedulingConfiguration -> ScheduledAnnotationBeanPostProcessor -> ScheduledTaskRegistrar
实际上,在 ScheduledAnnotationBeanPostProcessor 的 finishRegistration 方法中,会先获取所有实现了 SchedulingConfigurer 接口的 Bean,并执行他们的 configureTasks 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private void finishRegistration () { if (this .scheduler != null ) { this .registrar.setScheduler(this .scheduler); } if (this .beanFactory instanceof ListableBeanFactory) { Map<String, SchedulingConfigurer> beans = ((ListableBeanFactory) this .beanFactory).getBeansOfType(SchedulingConfigurer.class); List<SchedulingConfigurer> configurers = new ArrayList <>(beans.values()); AnnotationAwareOrderComparator.sort(configurers); for (SchedulingConfigurer configurer : configurers) { configurer.configureTasks(this .registrar); } } }
我们可以通过配置一个实现了 SchedulingConfigurer 接口的 Bean,实现动态加载定时任务的执行时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Data @Slf4j @Component @RequiredArgsConstructor @PropertySource("classpath:task-config.ini") public class ScheduleTask implements SchedulingConfigurer { @Value("${printTime.cron}") private String cron; @Override public void configureTasks (ScheduledTaskRegistrar taskRegistrar) { taskRegistrar.addTriggerTask(new Runnable () { @Override public void run () { } }, new Trigger () { @Override public Date nextExecutionTime (TriggerContext triggerContext) { CronTrigger cronTrigger = new CronTrigger (cron); Date nextExecutionTime = cronTrigger.nextExecutionTime(triggerContext); return nextExecutionTime; } }); } }