Dynamic Management of Scheduled Tasks - Scheduled
Founder
2024-04-04 22:22:26

Contents

  • Preface
  • I. Architecture flow diagram
  • II. Implementation walkthrough
    • 1. Add dependencies
    • 2. Code walkthrough


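Preface

Scheduled tasks can be managed dynamically in two ways:
Approach 1: triggers (each associated with a cron expression) are configured from the web front end, and the tasks are scheduled through a ThreadPoolTaskScheduler, so that each schedule can be managed at runtime.
Approach 2: dynamic management of schedules that already exist, that is, schedules declared with the @Scheduled annotation on a component class and initialized once at application startup, for example: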

@Component
public class TestTask {
    private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Scheduled(cron = "0/2 * * * * ?")
    public void robReceiveExpireTask() {
        System.out.println(df.format(LocalDateTime.now()) + "test");
    }
}

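For a solution along these lines, see "Dynamic management of scheduled tasks in SpringBoot".
Limitation: with approach 2, schedules currently cannot be added at runtime, nor can they be stopped, started, or reset.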
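The cron expression in the @Scheduled example, "0/2 * * * * ?", uses Spring's six-field layout: second, minute, hour, day of month, month, day of week, so it fires every two seconds. As a small illustration, the sketch below labels the fields of such an expression. CronFields is a hypothetical helper, not part of Spring or of this project, and it only checks the field count; it does not validate the values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only.
class CronFields {
    private static final String[] NAMES =
            {"second", "minute", "hour", "day-of-month", "month", "day-of-week"};

    /** Splits a six-field cron expression into named fields, in order. */
    static Map<String, String> describe(String cron) {
        String[] parts = cron.trim().split("\\s+");
        if (parts.length != NAMES.length) {
            throw new IllegalArgumentException(
                    "expected 6 fields but got " + parts.length + ": " + cron);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < NAMES.length; i++) {
            fields.put(NAMES[i], parts[i]);
        }
        return fields;
    }

    public static void main(String[] args) {
        // prints {second=0/2, minute=*, hour=*, day-of-month=*, month=*, day-of-week=?}
        System.out.println(describe("0/2 * * * * ?"));
    }
}
```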

This article covers the implementation of approach 1, explained through an architecture flow diagram together with the code.

I. Architecture flow diagram

[Figure: flow diagram]

II. Implementation walkthrough

The stack is SpringBoot + Spring + mybatis-plus.

1. Add dependencies

pom.xml


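<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>merak-hyper-automation-boot</artifactId>
        <groupId>com.merak.automation</groupId>
        <version>1.0.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>automation-quartz</artifactId>
    <packaging>jar</packaging>

    <repositories>
        <repository>
            <id>aliyun</id>
            <name>aliyun Repository</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <dependency>
            <groupId>jakarta.validation</groupId>
            <artifactId>jakarta.validation-api</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
            <version>2.2.5.RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>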

File application.yml under the resources directory:

spring:
  profiles:
    active: dev

File application-dev.yml under the resources directory:

server:
  port: 12105
  servlet:
    context-path: /automation-quartz
management:
  endpoints:
    web:
      exposure:
        include: '*'
# Spring configuration
spring:
  resources:
    static-locations: classpath:/static/,classpath:/templates/
  mvc:
    throw-exception-if-no-handler-found: true
    static-path-pattern: /**
  application:
    name: automation-workflow
  main:
    allow-bean-definition-overriding: true
  # File upload
  servlet:
    multipart:
      # Maximum size of a single file
      max-file-size: 2000MB
      # Maximum total size of an upload request
      max-request-size: 4000MB
  # Uniform JSON date/time formatting
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
  aop:
    proxy-target-class: true
  autoconfigure:
    exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
  datasource:
    dynamic:
      druid:
        # Global druid parameters; most values match the defaults.
        # (The supported parameters are listed below; do not change one unless you know what it does.)
        # Connection pool settings
        # Initial, minimum, and maximum pool size
        initial-size: 1
        min-idle: 1
        maxActive: 20
        # Maximum wait time when acquiring a connection
        maxWait: 60000
        # Interval between checks for idle connections that should be closed, in milliseconds
        timeBetweenEvictionRunsMillis: 60000
        # Minimum time a connection stays in the pool, in milliseconds
        minEvictableIdleTimeMillis: 300000
        validationQuery: SELECT 1
        testWhileIdle: true
        testOnBorrow: false
        testOnReturn: false
        # Enable PSCache and set its size per connection
        poolPreparedStatements: true
        maxPoolPreparedStatementPerConnectionSize: 20
        # Filters for monitoring statistics; without them the monitoring page cannot collect SQL statistics. 'wall' is the SQL firewall
        filters: stat,wall,slf4j
        # Enable mergeSql and slow-SQL logging through connectionProperties
        connectionProperties: druid.stat.mergeSql\=true;druid.stat.slowSqlMillis\=5000
      datasource:
        master:
          url: jdbc:mysql://127.0.0.1:3308/merak_dev?characterEncoding=UTF-8&useUnicode=true&useSSL=false
          username: root
          password: root
          driver-class-name: com.mysql.jdbc.Driver
# mybatis-plus settings
mybatis-plus:
  mapper-locations: classpath*:com/merak/hyper/automation/persist/**/xml/*Mapper.xml
  global-config:
    # Disable the banner that ships with MP 3.0
    banner: false
    db-config:
      id-type: ID_WORKER_STR
      # Use underscore naming for database tables by default
      table-underline: true
  configuration:
    log-impl: org.apache.ibatis.logging.slf4j.Slf4jImpl
#    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
logging:
  level:
    com.merak.hyper: debug
    com.merak.hyper.automation.persist.**.mapper: debug
    org.springframework: warn

2. Code walkthrough

The MerakQuartzApplication startup class:

package com.merak.hyper.automation;
import org.mybatis.spring.annotation.MapperScan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author chenjun
 * @version 1.0
 * @ClassName: MerakQuartzApplication
 * @description: Work order task scheduling
 * @date 2022/9/22 10:30
 */
@EnableScheduling
@EnableAsync
@MapperScan(basePackages = {"com.merak.hyper.automation.persist.**.mapper"})
@SpringBootApplication(scanBasePackages = {"com.merak.hyper.automation.**"}, exclude = {SecurityAutoConfiguration.class})
public class MerakQuartzApplication {
    public static final Logger log = LoggerFactory.getLogger(MerakQuartzApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(MerakQuartzApplication.class, args);
    }

    private int taskSchedulerCorePoolSize = 15;
    private int awaitTerminationSeconds = 60;
    private String threadNamePrefix = "taskExecutor-";

    /**
     * @description: Creates the ThreadPoolTaskScheduler that is used to obtain each ScheduledFuture
     */
    @Bean
    public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(taskSchedulerCorePoolSize);
        taskScheduler.setThreadNamePrefix(threadNamePrefix);
        taskScheduler.setWaitForTasksToCompleteOnShutdown(false);
        taskScheduler.setAwaitTerminationSeconds(awaitTerminationSeconds);
        /** The scheduler's threads must be initialized */
        taskScheduler.initialize();
        // isinitialized = true;
        log.info("Initialized ThreadPoolTaskScheduler ThreadNamePrefix=" + threadNamePrefix
                + ",PoolSize=" + taskSchedulerCorePoolSize
                + ",awaitTerminationSeconds=" + awaitTerminationSeconds);
        return taskScheduler;
    }

    /**
     * @description: Creates the ThreadPoolTaskExecutor that manages the threads started for async tasks; it is used by ScheduledHelper
     */
    @Bean("asyncTaskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("asyncTaskExecutor-");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        // Change the rejection policy so that rejected tasks run on the calling thread
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Initialize the thread pool
        taskExecutor.initialize();
        return taskExecutor;
    }
}
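The asyncTaskExecutor bean replaces the default rejection policy with CallerRunsPolicy. As a rough illustration of what that policy does, the sketch below uses a plain java.util.concurrent.ThreadPoolExecutor (not Spring's ThreadPoolTaskExecutor) with a deliberately tiny pool: one worker and a queue of one. The class and method names are hypothetical. Once the worker and the queue are both occupied, the next task runs on the submitting thread instead of being rejected with an exception.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it is not part of the project described in this article.
class CallerRunsDemo {

    /** Returns the name of the thread that ends up running a task submitted to a saturated pool. */
    static String threadThatRunsOverflowTask() {
        // One worker thread and room for one queued task, so the third task overflows.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch workerBusy = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        String[] ranOn = new String[1];
        try {
            // Task 1 occupies the single worker until it is released.
            pool.execute(() -> {
                workerBusy.countDown();
                awaitQuietly(release);
            });
            awaitQuietly(workerBusy);
            // Task 2 fills the queue.
            pool.execute(() -> { });
            // Task 3 cannot be queued, so CallerRunsPolicy runs it on the submitting thread.
            pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
            return ranOn[0];
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("overflow task ran on: " + threadThatRunsOverflowTask());
    }
}
```

With the bean's actual settings (up to 50 threads and a queue of 200), this fallback would only come into play once all of that capacity is in use.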

Step 1: At project startup, load the triggers associated with the tasks and schedule all of them.
The initLineRunner class:

package com.merak.hyper.automation.Scheduling;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.merak.hyper.automation.persist.entity.AutoTriggerInfo;
import com.merak.hyper.automation.persist.entity.BusWorkflow;
import com.merak.hyper.automation.persist.service.IAutoTriggerInfoService;
import com.merak.hyper.automation.persist.service.IBusWorkflowService;
import com.merak.hyper.automation.util.CommonUtil;
import com.merak.hyper.automation.util.ScheduleUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * At project startup, loads the triggers associated with the digital workers and schedules all of them
 * @Date: 2020/12/25:16:00
 **/
@Component
@Order(1)
public class initLineRunner implements CommandLineRunner {
    public static final Logger log = LoggerFactory.getLogger(initLineRunner.class);
    private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    @Autowired
    private TaskService taskService;
    @Autowired
    private IAutoTriggerInfoService triggerInfoService;
    @Autowired
    private IBusWorkflowService workflowService;

    @Override
    public void run(String... args) {
        log.info("Project startup: loading the digital workers' triggers and scheduling all of them, " + df.format(LocalDateTime.now()));
        QueryWrapper<BusWorkflow> wrapper = new QueryWrapper<>();
        wrapper.eq("wf_type", "3"); // 3: cloud hosted
        wrapper.eq("wf_state", "1");
        List<BusWorkflow> busWorkflows = workflowService.list(wrapper);
        List<AutoTriggerInfo> triggerInfos = triggerInfoService.list();
        if (0 == busWorkflows.size() || 0 == triggerInfos.size()) {
            log.info("Digital worker trigger data is incomplete, worker records: " + busWorkflows.size()
                    + ", trigger records: " + triggerInfos.size());
        } else {
            // Trigger information associated with each digital worker
            Map<String, AutoTriggerInfo> loadWfidAndTriggerInfo = CommonUtil.loadWfidAndTriggerInfo(busWorkflows, triggerInfos);
            Iterator<Map.Entry<String, AutoTriggerInfo>> entries = loadWfidAndTriggerInfo.entrySet().iterator();
            while (entries.hasNext()) {
                Map.Entry<String, AutoTriggerInfo> entry = entries.next();
                String wfId = entry.getKey();
                BusWorkflow workflow = busWorkflows.stream().filter(t -> wfId.equals(t.getWfId())).findAny().orElse(null);
                if (null != workflow) {
                    ScheduleUtil.start(new ScheduleTask(wfId, String.valueOf(workflow.getWfCreateuserId()), taskService), entry.getValue());
                }
            }
            log.info("All digital worker triggers have been scheduled, number of scheduled workers: "
                    + loadWfidAndTriggerInfo.size() + ", " + df.format(LocalDateTime.now()));
        }
    }
}

Core code:

```java
ScheduleUtil.start(new ScheduleTask(wfId, String.valueOf(workflow.getWfCreateuserId()), taskService), entry.getValue());
```
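CommonUtil.loadWfidAndTriggerInfo is not shown in this article. Purely as a guess at its shape, the sketch below builds a map from workflow id to trigger, assuming each workflow row carries the id of its trigger. The Workflow and Trigger classes, the triggerId field, and TriggerLookup are all hypothetical stand-ins for BusWorkflow, AutoTriggerInfo, and the real helper.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for BusWorkflow.
class Workflow {
    final String wfId;
    final String triggerId; // assumed link to the trigger; the real column is not shown in the article

    Workflow(String wfId, String triggerId) {
        this.wfId = wfId;
        this.triggerId = triggerId;
    }
}

// Hypothetical, simplified stand-in for AutoTriggerInfo.
class Trigger {
    final String id;
    final String logicConfig; // the cron expression

    Trigger(String id, String logicConfig) {
        this.id = id;
        this.logicConfig = logicConfig;
    }
}

class TriggerLookup {
    /** Maps each workflow id to its trigger, skipping workflows whose trigger cannot be found. */
    static Map<String, Trigger> byWorkflowId(List<Workflow> workflows, List<Trigger> triggers) {
        // Index the triggers by id first so each workflow needs only one lookup.
        Map<String, Trigger> triggersById = new LinkedHashMap<>();
        for (Trigger trigger : triggers) {
            triggersById.put(trigger.id, trigger);
        }
        Map<String, Trigger> result = new LinkedHashMap<>();
        for (Workflow workflow : workflows) {
            Trigger trigger = triggersById.get(workflow.triggerId);
            if (trigger != null) {
                result.put(workflow.wfId, trigger);
            }
        }
        return result;
    }
}
```

Each entry of such a map then supplies the two arguments the startup loop needs: the key is the wfId for the ScheduleTask and the value is the trigger passed to ScheduleUtil.start.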

The scheduler management utility class: starts, cancels, and modifies schedules

package com.merak.hyper.automation.util;
import com.merak.hyper.automation.Scheduling.ScheduleTask;
import com.merak.hyper.automation.persist.entity.AutoTriggerInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
/**
 * @author chenjun
 * @version 1.0
 * @ClassName: ScheduleUtil
 * @description: Scheduler management utility class: start, cancel, modify
 * @date 2022/10/22 14:37
 */
public class ScheduleUtil {
    public static final Logger log = LoggerFactory.getLogger(ScheduleUtil.class);
    private static ThreadPoolTaskScheduler threadPoolTaskScheduler = SpringContextUtils.getBean(ThreadPoolTaskScheduler.class);
    // Stores [digital worker wfId, ScheduledFuture] pairs
    private static Map<String, ScheduledFuture<?>> scheduledFutureMap = new HashMap<>();

    /**
     * Start
     *
     * @param scheduleTask the scheduled task
     * @param triggerInfo  the trigger whose logicConfig holds the cron expression
     */
    // synchronized because these methods are called from several threads and the map is a plain HashMap
    public static synchronized boolean start(ScheduleTask scheduleTask, AutoTriggerInfo triggerInfo) {
        String wfId = scheduleTask.getId();
        log.info("Starting the scheduled task thread for digital worker " + wfId);
        ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(scheduleTask, new CronTrigger(triggerInfo.getLogicConfig()));
        // Cancel any schedule already registered for this wfId so that it does not keep running untracked
        ScheduledFuture<?> previous = scheduledFutureMap.put(wfId, scheduledFuture);
        if (previous != null) {
            previous.cancel(false);
        }
        return true;
    }

    /**
     * Cancel
     *
     * @param scheduleTask the scheduled task
     */
    public static synchronized boolean cancel(ScheduleTask scheduleTask) {
        log.info("Stopping the scheduled task thread, taskId " + scheduleTask.getId());
        ScheduledFuture<?> scheduledFuture = scheduledFutureMap.get(scheduleTask.getId());
        if (scheduledFuture != null && !scheduledFuture.isCancelled()) {
            scheduledFuture.cancel(false);
        }
        scheduledFutureMap.remove(scheduleTask.getId());
        return true;
    }

    /**
     * Modify
     *
     * @param scheduleTask the scheduled task
     * @param triggerInfo  the trigger carrying the new cron expression
     */
    public static synchronized boolean reset(ScheduleTask scheduleTask, AutoTriggerInfo triggerInfo) {
        // Cancel the existing schedule first
        cancel(scheduleTask);
        // Then start a new one
        start(scheduleTask, triggerInfo);
        return true;
    }
}
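The pattern behind ScheduleUtil is a registry of ScheduledFuture handles keyed by task id: starting a task stores its handle, cancelling looks the handle up and cancels it, and a reset is a cancel followed by a start. The sketch below shows the same idea without Spring, as a minimal illustration rather than the project's implementation. It uses the JDK's ScheduledExecutorService with a fixed-rate period in place of ThreadPoolTaskScheduler and CronTrigger, and FutureRegistry is a hypothetical name.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for ScheduleUtil: fixed-rate periods replace cron triggers.
class FutureRegistry {
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
    private final Map<String, ScheduledFuture<?>> futures = new ConcurrentHashMap<>();

    /** Schedules the task under the given id, replacing any schedule already registered for it. */
    void start(String id, Runnable task, long periodMillis) {
        ScheduledFuture<?> future =
                scheduler.scheduleAtFixedRate(task, 0, periodMillis, TimeUnit.MILLISECONDS);
        ScheduledFuture<?> previous = futures.put(id, future);
        if (previous != null) {
            previous.cancel(false);
        }
    }

    /** Cancels the schedule registered for the id; returns false if there was none. */
    boolean cancel(String id) {
        ScheduledFuture<?> future = futures.remove(id);
        if (future == null) {
            return false;
        }
        // false: let a run that is already in progress finish
        future.cancel(false);
        return true;
    }

    /** A reset is a cancel followed by a start with the new period. */
    void reset(String id, Runnable task, long periodMillis) {
        cancel(id);
        start(id, task, periodMillis);
    }

    boolean isScheduled(String id) {
        ScheduledFuture<?> future = futures.get(id);
        return future != null && !future.isCancelled();
    }

    /** Stops the scheduler threads so the JVM can exit. */
    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

A caller would use it as `registry.start("wf-1", task, 2000)` followed later by `registry.reset("wf-1", task, 5000)` or `registry.cancel("wf-1")`, mirroring the start, reset, and cancel operations of ScheduleUtil.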

The ScheduleTask class: the task that is scheduled

package com.merak.hyper.automation.Scheduling;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author chenjun
 * @version 1.0
 * @ClassName: ScheduleTask
 * @description: ScheduleTask ties together the task id, the user id, and the TaskService that does the work; implements Runnable
 * @date 2022/10/22 14:38
 */
public class ScheduleTask implements Runnable {
    private static final int TIMEOUT = 30000;
    private String id;
    private String userId;
    private TaskService service;
    public static final Logger log = LoggerFactory.getLogger(ScheduleTask.class);

    public String getId() {
        return id;
    }

    /**
     * @param id      task ID
     * @param userId  user ID
     * @param service business service
     */
    public ScheduleTask(String id, String userId, TaskService service) {
        this.id = id;
        this.userId = userId;
        this.service = service;
    }

    @Override
    public void run() {
        log.info("ScheduleTask - sending the digital worker message, id: " + this.id + ", user id: " + userId);
        service.work(this.id, this.userId);
    }
}

/**
 * @author chenjun
 * @version 1.0
 * @ClassName: TaskService
 * @description: TaskService
 * @date 2022/10/22 14:42
 */
public interface TaskService {
    /**
     * Business handling method
     * @param keyword key parameter
     * @param userId  user ID
     */
    void work(String keyword, String userId);
}

/**
 * @description: TaskService implementation that performs the scheduled business logic
 */
@Service
public class TaskServiceImpl implements TaskService {
    public static final Logger log = LoggerFactory.getLogger(TaskServiceImpl.class);
    @Autowired
    private IAutoDeviceInfoService deviceInfoService;

    @Override
    public void work(String wfId, String userId) {
        try {
            log.info("Scheduled task: digital worker wfId " + wfId + ", user id: " + userId + ", sending message...");
            //sendRobotMsg(wfId,userId);
            Thread.sleep(200);
        } catch (InterruptedException e) {
            log.warn("Scheduled task for wfId " + wfId + " was interrupted", e);
            // Restore the interrupt flag so the calling thread can see the interruption
            Thread.currentThread().interrupt();
        }
    }
}
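Step 2: Manage scheduled tasks dynamically as the configuration changes through the web.
The ScheduledController class: the web layer for schedules, which starts, cancels, and modifies them
in response to changes in the scheduling information (for example: 1. a trigger's cron changes; 2. a task is stopped; 3. a task is added).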

package com.merak.hyper.automation.controller;
import com.merak.hyper.automation.common.core.domain.AjaxResult;
import com.merak.hyper.automation.common.core.vo.ScheduledApiVo;
import com.merak.hyper.automation.persist.entity.AutoTriggerInfo;
import com.merak.hyper.automation.persist.service.IAutoTriggerInfoService;
import com.merak.hyper.automation.util.ScheduledHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
/**
 * @author chenjun
 * @version 1.0
 * @ClassName: ScheduledController
 * @description: Web layer for schedules: start, cancel, modify
 * @date 2022/10/21 17:19
 */
@RestController
@RequestMapping("/api/scheduled")
public class ScheduledController {
    public static final Logger log = LoggerFactory.getLogger(ScheduledController.class);
    @Autowired
    private IAutoTriggerInfoService triggerInfoService;
    @Autowired
    private ScheduledHelper scheduledHelper;

    @PostMapping("/add")
    public AjaxResult addScheduleds(@RequestBody ScheduledApiVo scheduledApiVo) {
        AutoTriggerInfo autoTriggerInfo = triggerInfoService.getById(scheduledApiVo.getTriggerId());
        scheduledHelper.addScheduleds(scheduledApiVo, autoTriggerInfo);
        return AjaxResult.success();
    }

    @PostMapping("/reset")
    public AjaxResult resetScheduleds(@RequestBody ScheduledApiVo scheduledApiVo) {
        AutoTriggerInfo autoTriggerInfo = triggerInfoService.getById(scheduledApiVo.getTriggerId());
        scheduledHelper.resetScheduleds(scheduledApiVo, autoTriggerInfo);
        return AjaxResult.success();
    }

    @PostMapping("/stop")
    public AjaxResult stopScheduleds(@RequestBody ScheduledApiVo scheduledApiVo) {
        // Stopping only needs the wfId, so the trigger is not looked up here
        scheduledHelper.stopScheduleds(scheduledApiVo);
        return AjaxResult.success();
    }
}

The ScheduledHelper class: exposes schedule management to callers (create, modify, stop)
```java
package com.merak.hyper.automation.util;

import com.merak.hyper.automation.Scheduling.ScheduleTask;
import com.merak.hyper.automation.Scheduling.TaskService;
import com.merak.hyper.automation.common.core.vo.ScheduledApiVo;
import com.merak.hyper.automation.persist.entity.AutoTriggerInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * @author chenjun
 * @version 1.0
 * @ClassName: ScheduledHelper
 * @description: Exposes schedule management to callers: create, modify, stop
 * @date 2022/10/21 17:16
 */
@Component
public class ScheduledHelper {
    public static final Logger log = LoggerFactory.getLogger(ScheduledHelper.class);

    /**
     * @description: Asynchronous "add schedule" operation exposed to the web layer
     */
    @Async("asyncTaskExecutor")
    public void addScheduleds(ScheduledApiVo scheduledApiVo, AutoTriggerInfo triggerInfo) {
        // Add the schedule
        log.warn("Digital worker [" + scheduledApiVo.getWfId() + "] created, starting its Schedule task");
        TaskService taskService = SpringContextUtils.getBean(TaskService.class);
        ScheduleUtil.start(new ScheduleTask(scheduledApiVo.getWfId(), scheduledApiVo.getUserId(), taskService), triggerInfo);
    }

    @Async("asyncTaskExecutor")
    public void resetScheduleds(ScheduledApiVo scheduledApiVo, AutoTriggerInfo triggerInfo) {
        // The cron value changed, so reschedule the task
        log.warn("The cron value of the trigger for digital worker [" + scheduledApiVo.getWfId() + "] changed, rescheduling its Schedule task");
        TaskService taskService = SpringContextUtils.getBean(TaskService.class);
        ScheduleUtil.reset(new ScheduleTask(scheduledApiVo.getWfId(), scheduledApiVo.getUserId(), taskService), triggerInfo);
    }

    @Async("asyncTaskExecutor")
    public void stopScheduleds(ScheduledApiVo scheduledApiVo) {
        // Remove the wfId and stop its existing schedule
        log.warn("Digital worker [" + scheduledApiVo.getWfId() + "] is no longer valid, stopping its Schedule task");
        TaskService taskService = SpringContextUtils.getBean(TaskService.class);
        ScheduleUtil.cancel(new ScheduleTask(scheduledApiVo.getWfId(), scheduledApiVo.getUserId(), taskService));
    }
}
```

The SpringContextUtils class:

package com.merak.hyper.automation.util;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * @author chenjun
 * @version 1.0
 * @ClassName: SpringContextUtils
 * @description: Looks up beans from the application context
 * @date 2022/10/22 14:15
 */
@Component
public class SpringContextUtils implements ApplicationContextAware {
    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringContextUtils.applicationContext = applicationContext;
    }

    public static Object getBean(String name) {
        return applicationContext.getBean(name);
    }

    public static <T> T getBean(Class<T> requiredType) {
        return applicationContext.getBean(requiredType);
    }

    public static <T> T getBean(String name, Class<T> requiredType) {
        return applicationContext.getBean(name, requiredType);
    }

    public static boolean containsBean(String name) {
        return applicationContext.containsBean(name);
    }

    public static boolean isSingleton(String name) {
        return applicationContext.isSingleton(name);
    }

    public static Class<?> getType(String name) {
        return applicationContext.getType(name);
    }
}

The ScheduledApiVo class:

import java.io.Serializable;

/**
 * @author chenjun
 * @version 1.0
 * @ClassName: ScheduledApiVo
 * @description: Parameter object passed to the schedule web API
 */
public class ScheduledApiVo implements Serializable {
    private String wfId;
    private String userId;
    private String triggerId;
    // getters and setters omitted
}

Finally, the web side sends HTTP requests that reach the ScheduledHelper management methods, which create, modify, and stop schedules:

log.info("3: cloud-hosted update, starting the digital worker");
ScheduledApiVo scheduledApiVo = new ScheduledApiVo();
scheduledApiVo.setWfId(wfId);
scheduledApiVo.setUserId(String.valueOf(updateUserId));
scheduledApiVo.setTriggerId(newTriggerInfo.getId());
String webHookBody = JSON.toJSONString(scheduledApiVo);
EmsApiUtil.SendQuartzMessage(url, "add", webHookBody);

******************** separator ************************

public static boolean SendQuartzMessage(String quartzUrl, String method, String webHookBody) {
    boolean result = false;
    try {
        // sendPost built on org.apache.httpcomponents httpclient; the pom dependency is listed below
        String resp = HttpClientUtil.sendPostByJson(quartzUrl + "/" + method, webHookBody, 0);
        if ("error".equals(resp) || resp.contains("405 Not Allowed")) {
            log.error("Failed to send the message to the task scheduling center, address: " + quartzUrl);
        } else {
            JSONObject jsonObject = JSON.parseObject(resp);
            if ("200".equals(String.valueOf(jsonObject.get("code")))) {
                result = true;
            } else {
                log.error("Call to the task scheduling center failed, msg: " + String.valueOf(jsonObject.get("msg")));
            }
        }
    } catch (Exception e) {
        log.error("Call to the task scheduling center failed, msg: " + e.getMessage());
    }
    return result;
}

<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.2</version>
</dependency>
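For readers without the project's HttpClientUtil wrapper, the request it sends can be approximated with the JDK's own java.net.http API (Java 11 or later) instead of Apache HttpClient. The sketch below only builds the request. The path comes from server.port, context-path, and the controller mapping shown earlier; the host 127.0.0.1, the class name ScheduledApiRequests, and the hand-rolled toJson helper are assumptions made for illustration.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical helper; the article itself uses HttpClientUtil.sendPostByJson.
class ScheduledApiRequests {
    // Port, context path, and mapping come from the configuration above; the host is an assumption.
    static final String BASE_URL = "http://127.0.0.1:12105/automation-quartz/api/scheduled";

    /** Builds the JSON body for ScheduledApiVo; assumes the ids contain no characters that need escaping. */
    static String toJson(String wfId, String userId, String triggerId) {
        return String.format("{\"wfId\":\"%s\",\"userId\":\"%s\",\"triggerId\":\"%s\"}",
                wfId, userId, triggerId);
    }

    /** Builds a POST request for one of the controller endpoints: add, reset, or stop. */
    static HttpRequest build(String method, String jsonBody) {
        return HttpRequest.newBuilder()
                .uri(URI.create(BASE_URL + "/" + method))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody, StandardCharsets.UTF_8))
                .build();
    }
}
```

The request could then be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`, and the response checked for a "code" of 200 as SendQuartzMessage does.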
