前言 在复盘 ieg 一面看到定时任务阻塞的问题时,研究了下 @EnableScheduling 的源码,觉得可以单开一篇文章讲一讲
本文主要讲述了使用 @EnableScheduling 可能出现的线程阻塞导致定时任务延期的问题,也顺便解释了动态定时任务源码上的实现
引用文章:
@Schedule定时任务+分布式环境:@Schedule定时任务+分布式环境,这些坑你一定得注意!!! (qq.com) 
java 中的线程池参数:java中四种线程池及poolSize、corePoolSize、maximumPoolSize_maximum-pool-size-CSDN博客 
线程池的拒绝策略:线程池的RejectedExecutionHandler(拒绝策略)-CSDN博客 
Java 中实现定时任务:Java中实现定时任务,有多少种解决方案?好久没更新博客了,最近上班做了点小东西,总结复盘一下。主要介绍了定时任务的三种 - 掘金 (juejin.cn) 
线程阻塞问题 问题根源 Java中 使用 Springboot 自带的定时任务 @EnableScheduling 和 @Scheduled 注解,会装配一个 SchedulingConfiguration  的类
1 2 3 4 5 6 7 @Target(ElementType.TYPE)   @Retention(RetentionPolicy.RUNTIME)   @Import(SchedulingConfiguration.class)   @Documented   public  @interface  EnableScheduling {     } 
 
1 2 3 4 5 6 7 8 9 @Configuration(proxyBeanMethods = false)   @Role(BeanDefinition.ROLE_INFRASTRUCTURE)   public  class  SchedulingConfiguration  {      @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)        @Role(BeanDefinition.ROLE_INFRASTRUCTURE)        public  ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor ()  {           return  new  ScheduledAnnotationBeanPostProcessor ();       }   } 
 
这个配置类又会创建一个 ScheduledAnnotationBeanPostProcessor  的 Bean
在这个类的无参构造中又初始化了一个 ScheduledTaskRegistrar  的对象
1 2 3 public  ScheduledAnnotationBeanPostProcessor ()  {      this .registrar = new  ScheduledTaskRegistrar ();   } 
 
在 创建单例或刷新上下文之后,会执行 finishRegistration  方法,最后执行 registrar 的 afterPropertiesSet  方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Override   public  void  afterSingletonsInstantiated ()  {           this .nonAnnotatedClasses.clear();       if  (this .applicationContext == null ) {                    finishRegistration();       }   }      @Override   public  void  onApplicationEvent (ContextRefreshedEvent event)  {      if  (event.getApplicationContext() == this .applicationContext) {                                      finishRegistration();       }   }      private  void  finishRegistration ()  {      if  (this .scheduler != null ) {           this .registrar.setScheduler(this .scheduler);       }                      this .registrar.afterPropertiesSet();   } 
 
ScheduledTaskRegistrar  的成员变量包括任务的执行器以及几种类型的定时任务列表
1 2 3 4 5 6 7 8 9 10 11 @Nullable   private  TaskScheduler taskScheduler;     @Nullable   private  ScheduledExecutorService localExecutor;     @Nullable   private  List<TriggerTask> triggerTasks;     @Nullable   private  List<CronTask> cronTasks;
 
afterPropertiesSet  方法会获取一个执行器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Override   public  void  afterPropertiesSet ()  {      scheduleTasks();   }         @SuppressWarnings("deprecation")   protected  void  scheduleTasks ()  {      if  (this .taskScheduler == null ) {           this .localExecutor = Executors.newSingleThreadScheduledExecutor();           this .taskScheduler = new  ConcurrentTaskScheduler (this .localExecutor);       }       if  (this .triggerTasks != null ) {           for  (TriggerTask task : this .triggerTasks) {               addScheduledTask(scheduleTriggerTask(task));           }       }       if  (this .cronTasks != null ) {           for  (CronTask task : this .cronTasks) {               addScheduledTask(scheduleCronTask(task));           }       }       if  (this .fixedRateTasks != null ) {           for  (IntervalTask task : this .fixedRateTasks) {               addScheduledTask(scheduleFixedRateTask(task));           }       }       if  (this .fixedDelayTasks != null ) {           for  (IntervalTask task : this .fixedDelayTasks) {               addScheduledTask(scheduleFixedDelayTask(task));           }       }   } 
 
进入 newSingleThreadScheduledExecutor  可以看到,默认使用了一个 corePoolSize 为 1, maximumPoolSize 为 Integer.MAX_VALUE 的线程池
1 2 3 4 public  static  ScheduledExecutorService newSingleThreadScheduledExecutor ()  {      return  new  DelegatedScheduledExecutorService            (new  ScheduledThreadPoolExecutor (1 ));   } 
 
1 2 3 4 5 public  ScheduledThreadPoolExecutor (int  corePoolSize)  {      super (corePoolSize, Integer.MAX_VALUE,           DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,           new  DelayedWorkQueue ());   } 
 
1 2 3 4 5 6 7 8 public  ThreadPoolExecutor (int  corePoolSize,                           int  maximumPoolSize,                           long  keepAliveTime,                           TimeUnit unit,                           BlockingQueue<Runnable> workQueue)  {      this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,       Executors.defaultThreadFactory(), defaultHandler);   } 
 
而线程池主要有几个重要的参数分别是:
corePoolSize:线程池的基本大小。 
maximumPoolSize:线程池中允许的最大线程数。 
poolSize:线程池中当前线程的数量。 
 
当提交一个新任务时,若
poolSize < corePoolSize : 创建新线程处理该任务 
poolSize = corePoolSize : 将任务置于阻塞队列中 
阻塞队列的容量达到上限,且这时 poolSize < maximumPoolSize : 
阻塞队列满了,且 poolSize = maximumPoolSize : 那么线程池已经达到极限,会根据饱和策略 RejectedExecutionHandler 拒绝新的任务,默认是 AbortPolicy 会丢掉任务并抛出异常 
 
解决方案 注入自己编写的线程池,自行设置参数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Configuration    public  class  MyTheadPoolConfig  {            @Bean         public  TaskExecutor taskExecutor ()  {            ThreadPoolTaskExecutor  executor  =  new  ThreadPoolTaskExecutor ();                      executor.setCorePoolSize(10 );                      executor.setMaxPoolSize(20 );                      executor.setQueueCapacity(200 );                      executor.setKeepAliveSeconds(60 );                                executor.setThreadNamePrefix("nzc-create-scheduling-" );                      executor.setRejectedExecutionHandler(new  ThreadPoolExecutor .CallerRunsPolicy());            executor.setWaitForTasksToCompleteOnShutdown(true );            return  executor;        }    } 
 
在定时任务的类上再加一个 @EnableAsync 注解,给方法添加一个 @Async 即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Slf4j   @Component   @EnableAsync   @EnableScheduling   public  class  ScheduleService  {      @Autowired        TaskExecutor taskExecutor;       @Async(value = "taskExecutor")        @Scheduled(cron = "0/5 * * * * ? ")        public  void  testSchedule ()  {            try  {                Thread.sleep(10000 );                log.info("当前执行任务的线程号ID===>{}" , Thread.currentThread().getId());            } catch  (Exception e) {                e.printStackTrace();            }        }   } 
 
动态定时任务 上面提到了
@EnableScheduling 导入了 SchedulingConfiguration,SchedulingConfiguration 又创建了 ScheduledAnnotationBeanPostProcessor 的Bean,ScheduledAnnotationBeanPostProcessor 又实例化了 ScheduledTaskRegistrar 对象,即
@EnableScheduling  -> SchedulingConfiguration  -> ScheduledAnnotationBeanPostProcessor  -> ScheduledTaskRegistrar 
实际上,在 ScheduledAnnotationBeanPostProcessor 的 finishRegistration 方法中,会先获取所有实现了 SchedulingConfigurer  接口的 Bean,并执行他们的 configureTasks 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private  void  finishRegistration ()  {      if  (this .scheduler != null ) {           this .registrar.setScheduler(this .scheduler);       }       if  (this .beanFactory instanceof  ListableBeanFactory) {           Map<String, SchedulingConfigurer> beans =           ((ListableBeanFactory) this .beanFactory).getBeansOfType(SchedulingConfigurer.class);           List<SchedulingConfigurer> configurers = new  ArrayList <>(beans.values());           AnnotationAwareOrderComparator.sort(configurers);           for  (SchedulingConfigurer configurer : configurers) {               configurer.configureTasks(this .registrar);           }       }      } 
 
我们可以通过配置一个实现了 SchedulingConfigurer  接口的 Bean,实现动态加载定时任务的执行时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Data   @Slf4j   @Component   @RequiredArgsConstructor   @PropertySource("classpath:task-config.ini")   public  class  ScheduleTask  implements  SchedulingConfigurer  {              @Value("${printTime.cron}")        private  String cron;       @Override        public  void  configureTasks (ScheduledTaskRegistrar taskRegistrar)  {                    taskRegistrar.addTriggerTask(new  Runnable () {           @Override            public  void  run () {                       }         }, new  Trigger () {           @Override            public  Date nextExecutionTime (TriggerContext triggerContext)  {                            CronTrigger  cronTrigger  =  new  CronTrigger (cron);               Date  nextExecutionTime  =  cronTrigger.nextExecutionTime(triggerContext);               return  nextExecutionTime;                                                                            }           });       }   }