定时任务多线程-springboot
创始人
2024-03-03 21:49:31
0

定时任务

在项目开发过程中,经常需要定时任务来帮助我们实现某些业务功能,比如定时生成数据报表、生成对账单、订单超时处理等。Spring Boot提供了内置的@Scheduled注解实现定时任务的功能。


步骤

1.修改启动类

在启动类上加上@EnableScheduling开启定时任务。

@SpringBootApplication
@EnableSchedulingpublic class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}
}

使用@EnableScheduling注解打开定时功能之后,默认情况下,系统会自动启动一个线程,调度执行定义的后台定时任务。

2.创建定时任务类

package com.qsdbl.malldemo.component;import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;/*** @author: 轻率的保罗* @since: 2022-11* @Description: 定时任务类* 注意:需在启动类上加上@EnableScheduling开启定时任务*/
@Slf4j
@Component
public class SchedulerTask {/*** 定时任务:定期从数据库中移除已删除的数据(deleted=1的数据)* 测试每10秒执行一次!*/@Scheduled(cron="*/10 * * * * ?")//每10秒执行一次
//    @Scheduled(fixedRate=10*1000)//每10秒执行一次
//    @Scheduled(initialDelay=3000, fixedRate=10000)//首次延迟3秒后执行,之后每10秒执行一次public void taskCron(){SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");log.info("执行定时【从数据库中移除已删除超30天的数据】,时间: " + dateFormat.format(new Date()));}
}

类上别忘了添加注解@Component,交由Spring管理


参数

@Scheduled注解可以接受两种定时的参数设置:

  • 一种是我们常用的cron参数,设定按Cron表达式方式执行;
    • cron参数:使用Cron表达式规则执行,比如@Scheduled(cron=“*/5 * * * * *”)为间隔5秒执行。
  • 另一种是按fixedRate设定的固定时间执行。
    • fixedRate参数:按指定的固定时间间隔执行,单位为毫秒,如@Scheduled(fixedDelay =5000)为上一次执行完毕时间点之后5秒再次执行,每次执行间隔5秒。
    • 还支持简单的延时操作,比如fixedDelay、initialDelay后面填写相应的毫秒数即可:@Scheduled(initialDelay=1000, fixedRate=10000):首次延迟1秒后执行,之后按fixedRate指定的固定时间间隔执行,即每10秒执行一次。

cron表达式

Cron表达式主要由秒、分、小时、日期、月份、星期、年份7个字段构成,其中年份可选。示例中的Cron表达式“*/5 * * * *?”表示每5秒执行一次。


各个字段的含义

Cron一共有7位,最后一位是年份,可以留空。因此,一般我们可以写6位。另外,第6位星期(DayofWeek)的取值范围为1~7,从星期日(SUN)开始。


常用的Cron表达式

Cron表达式看起来晦涩难懂,但是只要明白了字段和通配符的含义,就能一眼看出表达式的触发执行规则。下边是一些常用的Cron表达式:


问题

默认情况下,Spring Boot定时任务是按单线程方式执行的。如果只有一个定时任务,这样做肯定没问题;当定时任务增多时,如果一个任务被阻塞,则会导致其他任务无法正常执行。


测试阻塞

测试多个定时任务造成阻塞现象,添加两个定时任务taskCron、taskFixedRate:

//定时任务类 测试多个定时任务造成阻塞现象/*** 定时任务1,每15秒执行一次,会执行10秒(造成10秒阻塞)*/
@Scheduled(fixedRate=15*1000)//每15秒执行一次
public void taskCron() throws InterruptedException {SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");log.info("taskCron - 执行定时任务【从数据库中移除已删除超30天的数据】,时间: " + dateFormat.format(new Date()));//模拟延时Thread.sleep(10*1000);
}/*** 定时任务2,每3秒执行一次*/
@Scheduled(fixedRate=3*1000)//每3秒执行一次
public void taskFixedRate(){SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");log.info("taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: " + dateFormat.format(new Date()));
}

分析日志

  • 定时任务taskFixedRate、taskCron都有按指定的配置运行
  • 所有的定时任务都是在线程"scheduling-1"执行 - 单线程方式执行
  • 定时任务taskCron在【18:16:05】开始执行,执行10秒钟;在【18:16:15】执行了三个定时任务taskFixedRate
    • 说明,在定时任务taskCron执行期间阻塞了其他定时任务的执行。本应分别在08、11、14秒执行的,推迟到了15秒才执行。
... [   scheduling-1] ...  : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:16:05
... [   scheduling-1] ...  : taskCron - 执行定时任务【从数据库中移除已删除超30天的数据】,时间: 18:16:05
... [   scheduling-1] ...  : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:16:15
... [   scheduling-1] ...  : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:16:15
... [   scheduling-1] ...  : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:16:15... [   scheduling-1] ...  : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:16:17
... [   scheduling-1] ...  : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:16:20
... [   scheduling-1] ...  : taskCron - 执行定时任务【从数据库中移除已删除超30天的数据】,时间: 18:16:20

要解决这个问题,需使用线程池。见下边多线程笔记。



多线程

配置文件

springboot配置文件中添加如下配置:

# 异步线程(线程池)配置
# 阿里巴巴编程规范:线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
# 配置核心线程数
async.executor.thread.core_pool_size=5
# 配置最大线程数
async.executor.thread.max_pool_size=10
# 配置队列大小
async.executor.thread.queue_capacity=100
# 配置线程池中的线程的名称前缀
# 阿里巴巴编程规范:创建线程或线程池时请指定有意义的线程名称,方便出错时回溯。
async.executor.thread.name.prefix=async-service-

配置类

使用@Configuration和@EnableAsync这两个注解,表示这是个配置类,并且是线程池的配置类。自定义线程池,名为asyncServiceExecutor。

package com.qsdbl.malldemo.configuration.thread;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;/*** @author: 轻率的保罗* @since: 2022-11* @Description: 多线程配置类(注解@EnableAsync,开启异步事件支持)*/
@Slf4j
@EnableAsync
@Configuration
public class ThreadPoolConfig {@Value("${async.executor.thread.core_pool_size}")//从配置文件中获取配置项private int corePoolSize;@Value("${async.executor.thread.max_pool_size}")private int maxPoolSize;@Value("${async.executor.thread.queue_capacity}")private int queueCapacity;@Value("${async.executor.thread.name.prefix}")private String namePrefix;/*** 自定义线程池配置类。* 不要命名为 taskScheduler,与spring框架的bean重名。* @return*/@Bean(name = "asyncServiceExecutor")public Executor asyncServiceExecutor() {//阿里巴巴编程规范:线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。//SpringBoot项目,可使用Spring提供的对 ThreadPoolExecutor 封装的线程池 ThreadPoolTaskExecutor:ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//        ThreadPoolTaskExecutor executor = new MyThreadPoolTaskExecutor();//自定义ThreadPoolTaskExecutor,会打印线程池情况//配置核心线程数executor.setCorePoolSize(corePoolSize);//配置最大线程数executor.setMaxPoolSize(maxPoolSize);//配置队列大小executor.setQueueCapacity(queueCapacity);//配置线程池中的线程的名称前缀executor.setThreadNamePrefix(namePrefix);// rejection-policy:当pool已经达到max size的时候,如何处理新任务//     1、CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行。//        "该策略既不会抛弃任务,也不会抛出异常,而是将任务回推到调用者。"顾名思义,在饱和的情况下,调用者会执行该任务(而不是由多线程执行)//     2、AbortPolicy:拒绝策略,直接拒绝抛出异常//     3、。。。executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//执行初始化executor.initialize();return executor;}
}

使用

  • 1、类上要加注解@Component(衍生注解也行,需交由spring容器管理)
    • 扩展:交由spring容器管理之后,可通过注解@Autowired在其他地方启用该异步事件(发送电子邮件、批量处理过期订单、批量同步数据等等)
  • 2、方法上增加@Async注解,使任务能够异步执行,这样各个后台任务就不会阻塞。@Async注解中指定使用我们配置的线程池asyncServiceExecutor。示例:@Async(value = "asyncServiceExecutor")

与上边的测试相比多了个@Async(value = "asyncServiceExecutor")配置:

//定时任务类 测试多个定时任务造成阻塞现象/*** 定时任务1,每15秒执行一次,会执行10秒(造成10秒阻塞)*/
@Async(value = "asyncServiceExecutor")
@Scheduled(fixedRate=15*1000)//每15秒执行一次
public void taskCron() throws InterruptedException {SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");log.info("taskCron - 执行定时任务【从数据库中移除已删除超30天的数据】,时间: " + dateFormat.format(new Date()));//模拟延时Thread.sleep(10*1000);
}/*** 定时任务2,每3秒执行一次*/
@Async(value = "asyncServiceExecutor")
@Scheduled(fixedRate=3*1000)//每3秒执行一次
public void taskFixedRate(){SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");log.info("taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: " + dateFormat.format(new Date()));
}

分析日志

  • 定时任务taskFixedRate、taskCron都有按指定的配置运行
  • 所有的定时任务都是在我们配置的线程池"async-service-xxx"中执行 - 不再是单线程方式执行
  • 定时任务taskCron在【18:34:26】开始执行,执行10秒钟;在【18:34:29】执行了定时任务taskFixedRate
    • 没有出现阻塞其他定时任务执行的情况
... [async-service-1] ...  : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:34:26
... [async-service-2] ...  : taskCron - 执行定时任务【从数据库中移除已删除超30天的数据】,时间: 18:34:26... [async-service-3] ...  : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:34:29
... [async-service-4] ...  : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:34:32
... [async-service-5] ...  : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:34:35
... [async-service-1] ...  : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:34:38
... [async-service-4] ...  : taskCron - 执行定时任务【从数据库中移除已删除超30天的数据】,时间: 18:34:41
... [async-service-3] ...  : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:34:41
... [async-service-5] ...  : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 18:34:44

已解决,默认情况下Spring Boot定时任务按单线程方式执行,造成阻塞的问题!


扩展

监控 线程池。往线程池提交任务前,在日志中打印线程池情况。

自定义Executor

编写ThreadPoolTaskExecutor的子类MyThreadPoolTaskExecutor,往线程池提交任务前,在日志中打印线程池情况。

package com.qsdbl.malldemo.configuration.thread;import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;/*** @author: 轻率的保罗* @since: 2022-11-30* @Description: 监控 线程池。往线程池提交任务前,在日志中打印线程池情况。*/
@Slf4j
public class MyThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {/*** 打印 多线程信息* 在父类的execute、submit等方法里调用,每次有任务被提交到线程池的时候,都会将当前线程池的基本情况打印到日志中* @param method 提交任务执行的方法。*               submit():提交任务,返回Future,可以返回结果,可以传入Callable、Runnable。*               execute():提交任务,没有返回值,只可以传入Runnable。*/private void showThreadPoolInfo(String method) {ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();if (null == threadPoolExecutor) {return;}log.info("\n\n提交任务前【线程池】情况:\n    {}\n    线程池当前线程数量 = {}\n    当前线程池中正在执行任务的线程数量 = {}\n    队列大小 = {}\n    线程池已执行和未执行的任务总数 = {}\n    已完成的任务数量 = {}\n",method,threadPoolExecutor.getPoolSize()+1,threadPoolExecutor.getActiveCount()+1,threadPoolExecutor.getQueue().size(),threadPoolExecutor.getTaskCount(),threadPoolExecutor.getCompletedTaskCount());}@Overridepublic void execute(Runnable task) {showThreadPoolInfo("do execute 提交任务");super.execute(task);}@Overridepublic void execute(Runnable task, long startTimeout) {showThreadPoolInfo("do execute 提交任务");super.execute(task, startTimeout);}@Overridepublic Future submit(Runnable task) {showThreadPoolInfo("do submit 提交任务");return super.submit(task);}@Overridepublic  Future submit(Callable task) {showThreadPoolInfo("do submit 提交任务");return super.submit(task);}@Overridepublic ListenableFuture submitListenable(Runnable task) {showThreadPoolInfo("do submitListenable 提交任务");return super.submitListenable(task);}@Overridepublic  ListenableFuture submitListenable(Callable task) {showThreadPoolInfo("do submitListenable 提交任务");return super.submitListenable(task);}
}

修改配置类

将上边配置类做如下修改:

在创建ThreadPoolTaskExecutor对象时,使用我们自定义的MyThreadPoolTaskExecutor。

...
//SpringBoot项目,可使用Spring提供的对 ThreadPoolExecutor 封装的线程池 ThreadPoolTaskExecutor:
//ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
ThreadPoolTaskExecutor executor = new MyThreadPoolTaskExecutor();//自定义ThreadPoolTaskExecutor,会打印线程池情况
//配置核心线程数
...

运行效果

提交任务前【线程池】情况:do submit 提交任务线程池当前线程数量 = 3当前线程池中正在执行任务的线程数量 = 2队列大小 = 0线程池已执行和未执行的任务总数 = 2已完成的任务数量 = 1... [async-service-3] ...  : taskFixedRate - 执行定时任务【注销已超1天未支付的订单】,时间: 19:07:47

如何合理配置线程池

从两个方面考虑,你的任务是CPU密集型还是IO密集型。

CPU密集型

CPU密集的意思是该任务需要大量的运算,而没有阻塞,CPU一直全速运行。

CPU密集任务只有在真正的多核CPU上才可能得到加速(通过多线程),而在单核CPU上,无论你开几个模拟的多线程该任务都不可能得到加速,因为CPU总的运算能力就那些。

CPU密集型任务配置尽可能少的线程数量,一般公式:CPU核数+1个线程的线程池。

示例:双核CPU,配置 2+1 = 3。

IO密集型

IO密集型,即该任务需要大量的IO,即大量的阻塞。

在单线程上运行IO密集型的任务会导致大量的CPU运算能力浪费在等待上。所以在IO密集型任务中使用多线程可以大大的加速程序运行,即使在单核CPU上,这种加速主要就是利用了被浪费掉的阻塞!

IO密集型时,大部分线程都阻塞,故需要多配置线程数,参考公式:CPU核数/(1-阻塞系数),阻塞系数在0.8~0.9之间。

另一种说法:由于IO密集型任务,线程并不是一直在执行任务,则应配置尽可能多的线程,如CPU核数*2

示例:

8核CPU:8/(1-0.9) = 80 个线程数

双核: 2/(1-0.9) = 2/0.1 = 20


扩展

System.out.println(Runtime.getRuntime().availableProcessors()); // 查看CPU核数
MacOS 命令查看CPU信息:sysctl machdep.cpu

qsdbl@macbook mysql % sysctl machdep.cpu 
machdep.cpu.cores_per_package: 8
machdep.cpu.core_count: 8
machdep.cpu.logical_per_package: 8
machdep.cpu.thread_count: 8
machdep.cpu.brand_string: Apple M1


笔记摘自:微信公众号-小哈学Java、CSDN-yyangqqian、《Spring Boot从入门到实战》-章为忠

相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
【PdgCntEditor】解... 一、问题背景 大部分的图书对应的PDF,目录中的页码并非PDF中直接索引的页码...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
修复 爱普生 EPSON L4... L4151 L4153 L4156 L4158 L4163 L4165 L4166 L4168 L4...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...