增加动态定时器

This commit is contained in:
dengqichen 2025-10-29 17:39:59 +08:00
parent e4cabb125a
commit 98cef3f68f
12 changed files with 875 additions and 29 deletions

View File

@ -135,6 +135,28 @@ public enum ResponseCode {
WORKFLOW_FORM_CONFIG_EMPTY(2762, "workflow.form.config.empty"),
WORKFLOW_GRAPH_CONFIG_EMPTY(2763, "workflow.graph.config.empty"),
// 定时任务相关错误码 (2800-2899)
SCHEDULE_JOB_NOT_FOUND(2800, "schedule.job.not.found"),
SCHEDULE_JOB_NAME_EXISTS(2801, "schedule.job.name.exists"),
SCHEDULE_JOB_START_FAILED(2802, "schedule.job.start.failed"),
SCHEDULE_JOB_PAUSE_FAILED(2803, "schedule.job.pause.failed"),
SCHEDULE_JOB_RESUME_FAILED(2804, "schedule.job.resume.failed"),
SCHEDULE_JOB_STOP_FAILED(2805, "schedule.job.stop.failed"),
SCHEDULE_JOB_TRIGGER_FAILED(2806, "schedule.job.trigger.failed"),
SCHEDULE_JOB_UPDATE_CRON_FAILED(2807, "schedule.job.update.cron.failed"),
SCHEDULE_JOB_CRON_INVALID(2808, "schedule.job.cron.invalid"),
SCHEDULE_JOB_BEAN_NOT_FOUND(2809, "schedule.job.bean.not.found"),
SCHEDULE_JOB_METHOD_NOT_FOUND(2810, "schedule.job.method.not.found"),
SCHEDULE_JOB_TRIGGER_NOT_FOUND(2811, "schedule.job.trigger.not.found"),
SCHEDULE_JOB_ALREADY_RUNNING(2812, "schedule.job.already.running"),
SCHEDULE_JOB_NOT_RUNNING(2813, "schedule.job.not.running"),
SCHEDULE_JOB_CATEGORY_NOT_FOUND(2820, "schedule.job.category.not.found"),
SCHEDULE_JOB_CATEGORY_CODE_EXISTS(2821, "schedule.job.category.code.exists"),
SCHEDULE_JOB_CATEGORY_HAS_JOBS(2822, "schedule.job.category.has.jobs"),
SCHEDULE_JOB_CATEGORY_IN_USE(2823, "schedule.job.category.in.use"),
SCHEDULE_JOB_LOG_NOT_FOUND(2830, "schedule.job.log.not.found"),
SCHEDULE_JOB_EXECUTOR_NOT_FOUND(2831, "schedule.job.executor.not.found"),
// 2200-2299 工作流节点类型错误
WORKFLOW_NODE_TYPE_NOT_FOUND(2200, "workflow.node.type.not.found"),
WORKFLOW_NODE_TYPE_DISABLED(2201, "workflow.node.type.disabled"),
@ -172,17 +194,7 @@ public enum ResponseCode {
TEAM_MEMBER_NOT_FOUND(2924, "team.member.not.found"),
TEAM_MEMBER_ALREADY_EXISTS(2925, "team.member.already.exists"),
TEAM_APPLICATION_NOT_FOUND(2926, "team.application.not.found"),
TEAM_APPLICATION_ALREADY_EXISTS(2927, "team.application.already.exists"),
// 定时任务相关错误码 (2950-2979)
SCHEDULE_JOB_NOT_FOUND(2950, "schedule.job.not.found"),
SCHEDULE_JOB_NAME_EXISTS(2951, "schedule.job.name.exists"),
SCHEDULE_JOB_CATEGORY_NOT_FOUND(2952, "schedule.job.category.not.found"),
SCHEDULE_JOB_CATEGORY_CODE_EXISTS(2953, "schedule.job.category.code.exists"),
SCHEDULE_JOB_CATEGORY_HAS_JOBS(2954, "schedule.job.category.has.jobs"),
SCHEDULE_JOB_LOG_NOT_FOUND(2955, "schedule.job.log.not.found"),
SCHEDULE_JOB_EXECUTOR_NOT_FOUND(2956, "schedule.job.executor.not.found"),
SCHEDULE_JOB_CRON_INVALID(2957, "schedule.job.cron.invalid");
TEAM_APPLICATION_ALREADY_EXISTS(2927, "team.application.already.exists");
private final int code;
private final String messageKey; // 国际化消息key

View File

@ -1,14 +1,19 @@
package com.qqchen.deploy.backend.schedule.api;
import com.qqchen.deploy.backend.framework.api.Response;
import com.qqchen.deploy.backend.framework.controller.BaseController;
import com.qqchen.deploy.backend.schedule.dto.JobDashboardDTO;
import com.qqchen.deploy.backend.schedule.dto.ScheduleJobDTO;
import com.qqchen.deploy.backend.schedule.entity.ScheduleJob;
import com.qqchen.deploy.backend.schedule.query.ScheduleJobQuery;
import com.qqchen.deploy.backend.schedule.service.IScheduleJobService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.annotation.Resource;
import jakarta.servlet.http.HttpServletResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@ -20,9 +25,95 @@ import java.util.List;
@Slf4j
@RestController
@RequestMapping("/api/v1/schedule/jobs")
@Tag(name = "定时任务管理", description = "定时任务的增删改查接口")
@Tag(name = "定时任务管理", description = "定时任务的增删改查及任务控制接口")
public class ScheduleJobApiController extends BaseController<ScheduleJob, ScheduleJobDTO, Long, ScheduleJobQuery> {
@Resource
private IScheduleJobService scheduleJobService;
/**
* 获取Dashboard数据
*/
@Operation(summary = "获取Dashboard数据", description = "获取任务统计、正在执行的任务、最近执行日志")
@GetMapping("/dashboard")
public Response<JobDashboardDTO> getDashboard() {
JobDashboardDTO dashboard = scheduleJobService.getDashboard();
return Response.success(dashboard);
}
/**
* 启动任务
*/
@Operation(summary = "启动定时任务", description = "将任务加载到Quartz调度器并开始执行")
@PostMapping("/{id}/start")
public Response<Void> startJob(
@Parameter(description = "任务ID", required = true) @PathVariable Long id
) {
scheduleJobService.startJob(id);
return Response.success();
}
/**
* 暂停任务
*/
@Operation(summary = "暂停定时任务", description = "暂停正在运行的任务,保留调度信息")
@PostMapping("/{id}/pause")
public Response<Void> pauseJob(
@Parameter(description = "任务ID", required = true) @PathVariable Long id
) {
scheduleJobService.pauseJob(id);
return Response.success();
}
/**
* 恢复任务
*/
@Operation(summary = "恢复定时任务", description = "恢复已暂停的任务继续执行")
@PostMapping("/{id}/resume")
public Response<Void> resumeJob(
@Parameter(description = "任务ID", required = true) @PathVariable Long id
) {
scheduleJobService.resumeJob(id);
return Response.success();
}
/**
* 停止任务
*/
@Operation(summary = "停止定时任务", description = "从Quartz调度器中移除任务停止执行")
@PostMapping("/{id}/stop")
public Response<Void> stopJob(
@Parameter(description = "任务ID", required = true) @PathVariable Long id
) {
scheduleJobService.stopJob(id);
return Response.success();
}
/**
* 立即触发任务
*/
@Operation(summary = "立即触发任务", description = "不影响原有调度计划,立即执行一次任务")
@PostMapping("/{id}/trigger")
public Response<Void> triggerJob(
@Parameter(description = "任务ID", required = true) @PathVariable Long id
) {
scheduleJobService.triggerJob(id);
return Response.success();
}
/**
* 更新Cron表达式
*/
@Operation(summary = "更新Cron表达式", description = "动态更新任务的Cron调度表达式")
@PutMapping("/{id}/cron")
public Response<Void> updateCron(
@Parameter(description = "任务ID", required = true) @PathVariable Long id,
@Parameter(description = "新的Cron表达式", required = true) @RequestParam String cronExpression
) {
scheduleJobService.updateCron(id, cronExpression);
return Response.success();
}
@Override
protected void exportData(HttpServletResponse response, List<ScheduleJobDTO> data) {
// TODO: 实现导出功能

View File

@ -0,0 +1,34 @@
package com.qqchen.deploy.backend.schedule.config;
import com.qqchen.deploy.backend.schedule.service.IScheduleJobService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
/**
* 定时任务加载器
* 在应用启动时自动加载数据库中的定时任务到Quartz调度器
*
* @author qichen
*/
@Slf4j
@Component
public class JobLoader implements ApplicationRunner {
@Resource
private IScheduleJobService scheduleJobService;
@Override
public void run(ApplicationArguments args) throws Exception {
log.info("========== 开始加载定时任务 ==========");
try {
scheduleJobService.loadAllEnabledJobs();
log.info("========== 定时任务加载完成 ==========");
} catch (Exception e) {
log.error("========== 定时任务加载失败 ==========", e);
}
}
}

View File

@ -0,0 +1,33 @@
package com.qqchen.deploy.backend.schedule.config;
import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.quartz.SpringBeanJobFactory;
import org.springframework.stereotype.Component;
/**
* Quartz Job工厂
* 支持Spring依赖注入
*
* @author qichen
*/
@Component
public class QuartzJobFactory extends SpringBeanJobFactory {
private final AutowireCapableBeanFactory beanFactory;
public QuartzJobFactory(ApplicationContext applicationContext) {
this.beanFactory = applicationContext.getAutowireCapableBeanFactory();
}
@Override
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
// 调用父类创建Job实例
Object jobInstance = super.createJobInstance(bundle);
// 注入Spring依赖
beanFactory.autowireBean(jobInstance);
return jobInstance;
}
}

View File

@ -1,12 +1,18 @@
package com.qqchen.deploy.backend.schedule.config;
import org.quartz.Scheduler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import javax.sql.DataSource;
import java.util.Properties;
/**
* 定时任务配置
* 启用Spring调度功能
* 启用Spring调度功能和Quartz调度器
*
* @author qichen
*/
@ -14,7 +20,63 @@ import org.springframework.scheduling.annotation.EnableScheduling;
@EnableScheduling
@EnableAsync
public class ScheduleConfig {
// Spring @Scheduled 会自动处理定时任务
// 配合 @MonitoredJob 注解实现自动监控
/**
* 配置Quartz调度器工厂
*/
@Bean
public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource, QuartzJobFactory quartzJobFactory) {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
// 设置数据源可选如果需要持久化到数据库
// factory.setDataSource(dataSource);
// 设置JobFactory支持Spring依赖注入
factory.setJobFactory(quartzJobFactory);
// Quartz配置
Properties properties = new Properties();
// 调度器名称
properties.put("org.quartz.scheduler.instanceName", "DeployEaseScheduler");
properties.put("org.quartz.scheduler.instanceId", "AUTO");
// 线程池配置
properties.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
properties.put("org.quartz.threadPool.threadCount", "10");
properties.put("org.quartz.threadPool.threadPriority", "5");
// 内存存储不持久化到数据库
properties.put("org.quartz.jobStore.class", "org.quartz.simpl.RAMJobStore");
// 如果需要持久化到数据库使用以下配置
// properties.put("org.quartz.jobStore.class", "org.springframework.scheduling.quartz.LocalDataSourceJobStore");
// properties.put("org.quartz.jobStore.driverDelegateClass", "org.quartz.impl.jdbcjobstore.StdJDBCDelegate");
// properties.put("org.quartz.jobStore.tablePrefix", "QRTZ_");
// properties.put("org.quartz.jobStore.isClustered", "false");
factory.setQuartzProperties(properties);
// 延迟启动
factory.setStartupDelay(10);
// 应用关闭时等待任务完成
factory.setWaitForJobsToCompleteOnShutdown(true);
// 覆盖已存在的任务
factory.setOverwriteExistingJobs(true);
// 自动启动
factory.setAutoStartup(true);
return factory;
}
/**
* 获取Scheduler实例
*/
@Bean
public Scheduler scheduler(SchedulerFactoryBean schedulerFactoryBean) {
return schedulerFactoryBean.getScheduler();
}
}

View File

@ -0,0 +1,109 @@
package com.qqchen.deploy.backend.schedule.dto;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
/**
* 定时任务Dashboard数据
*
* @author qichen
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class JobDashboardDTO {
/**
* 任务统计汇总
*/
private JobSummaryDTO summary;
/**
* 正在执行的任务列表
*/
private List<RunningJobDTO> runningJobs;
/**
* 最近执行日志最近10条
*/
private List<ScheduleJobLogDTO> recentLogs;
/**
* 任务统计汇总
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public static class JobSummaryDTO {
/**
* 任务总数
*/
private Long total;
/**
* 已启用数量
*/
private Long enabled;
/**
* 已禁用数量
*/
private Long disabled;
/**
* 已暂停数量
*/
private Long paused;
/**
* 正在运行数量
*/
private Long running;
}
/**
* 正在执行的任务信息
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public static class RunningJobDTO {
/**
* 任务ID
*/
private Long jobId;
/**
* 任务名称
*/
private String jobName;
/**
* 开始时间
*/
private String startTime;
/**
* 执行进度0-100
*/
private Integer progress;
/**
* 当前执行消息
*/
private String message;
/**
* 执行状态
*/
private String status;
}
}

View File

@ -0,0 +1,168 @@
package com.qqchen.deploy.backend.schedule.job;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.context.ApplicationContext;
import java.lang.reflect.Method;
import java.util.Map;
/**
* 动态任务执行器
* 通用的Quartz Job实现根据配置动态调用Spring Bean的方法
*
* @author qichen
*/
@Slf4j
public class DynamicJob implements Job {
@Resource
private ApplicationContext applicationContext;
@Resource
private ObjectMapper objectMapper;
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
// 从JobDataMap中获取参数
String beanName = context.getMergedJobDataMap().getString("beanName");
String methodName = context.getMergedJobDataMap().getString("methodName");
String methodParams = context.getMergedJobDataMap().getString("methodParams");
Long jobId = context.getMergedJobDataMap().getLong("jobId");
String jobName = context.getMergedJobDataMap().getString("jobName");
log.info("开始执行定时任务jobId={}, jobName={}, beanName={}, methodName={}",
jobId, jobName, beanName, methodName);
try {
// 获取Bean实例
Object bean = applicationContext.getBean(beanName);
if (bean == null) {
throw new JobExecutionException("找不到Bean" + beanName);
}
// 解析参数
Object[] params = parseMethodParams(methodParams);
// 调用方法
if (params == null || params.length == 0) {
// 无参方法
Method method = bean.getClass().getMethod(methodName);
method.invoke(bean);
} else {
// 有参方法 - 尝试匹配方法签名
Method method = findMatchingMethod(bean.getClass(), methodName, params);
if (method == null) {
throw new JobExecutionException("找不到匹配的方法:" + methodName);
}
method.invoke(bean, params);
}
log.info("定时任务执行成功jobId={}, jobName={}", jobId, jobName);
} catch (Exception e) {
log.error("定时任务执行失败jobId={}, jobName={}", jobId, jobName, e);
throw new JobExecutionException(e);
}
}
/**
* 解析方法参数
*/
private Object[] parseMethodParams(String methodParams) {
if (methodParams == null || methodParams.trim().isEmpty()) {
return null;
}
try {
// 假设参数是JSON格式的Map
Map<String, Object> paramsMap = objectMapper.readValue(methodParams, Map.class);
if (paramsMap.isEmpty()) {
return null;
}
// 简单处理如果只有一个参数直接返回其值
if (paramsMap.size() == 1) {
Object value = paramsMap.values().iterator().next();
// 尝试转换为Integer常见情况
if (value instanceof Number) {
return new Object[]{((Number) value).intValue()};
}
return new Object[]{value};
}
// 多个参数时返回所有值
return paramsMap.values().toArray();
} catch (Exception e) {
log.warn("解析方法参数失败,将作为无参方法调用:{}", methodParams, e);
return null;
}
}
/**
* 查找匹配的方法
*/
private Method findMatchingMethod(Class<?> clazz, String methodName, Object[] params) {
Method[] methods = clazz.getMethods();
for (Method method : methods) {
if (!method.getName().equals(methodName)) {
continue;
}
Class<?>[] paramTypes = method.getParameterTypes();
if (paramTypes.length != params.length) {
continue;
}
// 简单的类型匹配
boolean matches = true;
for (int i = 0; i < paramTypes.length; i++) {
if (params[i] == null) {
continue; // null可以匹配任何引用类型
}
Class<?> paramType = paramTypes[i];
Class<?> argType = params[i].getClass();
// 处理基本类型和包装类型
if (!isAssignableFrom(paramType, argType)) {
matches = false;
break;
}
}
if (matches) {
return method;
}
}
return null;
}
/**
* 判断是否可赋值包括基本类型和包装类型的转换
*/
private boolean isAssignableFrom(Class<?> target, Class<?> source) {
if (target.isAssignableFrom(source)) {
return true;
}
// 处理基本类型和包装类型
if (target == int.class && source == Integer.class) return true;
if (target == long.class && source == Long.class) return true;
if (target == double.class && source == Double.class) return true;
if (target == float.class && source == Float.class) return true;
if (target == boolean.class && source == Boolean.class) return true;
if (target == byte.class && source == Byte.class) return true;
if (target == short.class && source == Short.class) return true;
if (target == char.class && source == Character.class) return true;
return false;
}
}

View File

@ -2,8 +2,11 @@ package com.qqchen.deploy.backend.schedule.repository;
import com.qqchen.deploy.backend.framework.repository.IBaseRepository;
import com.qqchen.deploy.backend.schedule.entity.ScheduleJob;
import com.qqchen.deploy.backend.schedule.enums.ScheduleJobStatusEnum;
import org.springframework.stereotype.Repository;
import java.util.List;
/**
* 定时任务Repository
*
@ -21,5 +24,14 @@ public interface IScheduleJobRepository extends IBaseRepository<ScheduleJob, Lon
* 统计分类下的任务数量
*/
Long countByCategoryIdAndDeletedFalse(Long categoryId);
}
/**
* 根据状态查找启用的任务
*/
List<ScheduleJob> findByStatusAndDeletedFalse(ScheduleJobStatusEnum status);
/**
* 统计指定状态的任务数量
*/
Long countByStatusAndDeletedFalse(ScheduleJobStatusEnum status);
}

View File

@ -1,6 +1,7 @@
package com.qqchen.deploy.backend.schedule.service;
import com.qqchen.deploy.backend.framework.service.IBaseService;
import com.qqchen.deploy.backend.schedule.dto.JobDashboardDTO;
import com.qqchen.deploy.backend.schedule.dto.ScheduleJobDTO;
import com.qqchen.deploy.backend.schedule.entity.ScheduleJob;
import com.qqchen.deploy.backend.schedule.query.ScheduleJobQuery;
@ -11,5 +12,60 @@ import com.qqchen.deploy.backend.schedule.query.ScheduleJobQuery;
* @author qichen
*/
public interface IScheduleJobService extends IBaseService<ScheduleJob, ScheduleJobDTO, ScheduleJobQuery, Long> {
}
/**
* 启动任务
*
* @param jobId 任务ID
*/
void startJob(Long jobId);
/**
* 暂停任务
*
* @param jobId 任务ID
*/
void pauseJob(Long jobId);
/**
* 恢复任务
*
* @param jobId 任务ID
*/
void resumeJob(Long jobId);
/**
* 停止任务
*
* @param jobId 任务ID
*/
void stopJob(Long jobId);
/**
* 立即执行一次任务
*
* @param jobId 任务ID
*/
void triggerJob(Long jobId);
/**
* 更新任务的Cron表达式
*
* @param jobId 任务ID
* @param cronExpression 新的Cron表达式
*/
void updateCron(Long jobId, String cronExpression);
/**
* 加载所有启用的任务到调度器
*/
void loadAllEnabledJobs();
/**
* 获取任务Dashboard数据
* 包含任务统计正在执行的任务最近执行日志
*
* @return Dashboard数据
*/
JobDashboardDTO getDashboard();
}

View File

@ -3,12 +3,20 @@ package com.qqchen.deploy.backend.schedule.service.impl;
import com.qqchen.deploy.backend.framework.enums.ResponseCode;
import com.qqchen.deploy.backend.framework.exception.BusinessException;
import com.qqchen.deploy.backend.framework.service.impl.BaseServiceImpl;
import com.qqchen.deploy.backend.framework.utils.RedisUtil;
import com.qqchen.deploy.backend.schedule.converter.ScheduleJobCategoryConverter;
import com.qqchen.deploy.backend.schedule.dto.JobDashboardDTO;
import com.qqchen.deploy.backend.schedule.dto.JobStatusDTO;
import com.qqchen.deploy.backend.schedule.dto.ScheduleJobDTO;
import com.qqchen.deploy.backend.schedule.dto.ScheduleJobLogDTO;
import com.qqchen.deploy.backend.schedule.entity.ScheduleJob;
import com.qqchen.deploy.backend.schedule.entity.ScheduleJobCategory;
import com.qqchen.deploy.backend.schedule.entity.ScheduleJobLog;
import com.qqchen.deploy.backend.schedule.query.ScheduleJobQuery;
import com.qqchen.deploy.backend.schedule.enums.ScheduleJobStatusEnum;
import com.qqchen.deploy.backend.schedule.job.DynamicJob;
import com.qqchen.deploy.backend.schedule.repository.IScheduleJobCategoryRepository;
import com.qqchen.deploy.backend.schedule.repository.IScheduleJobLogRepository;
import com.qqchen.deploy.backend.schedule.repository.IScheduleJobRepository;
import com.qqchen.deploy.backend.schedule.service.IScheduleJobService;
import com.qqchen.deploy.backend.workflow.converter.FormDefinitionConverter;
@ -16,11 +24,16 @@ import com.qqchen.deploy.backend.workflow.entity.FormDefinition;
import com.qqchen.deploy.backend.workflow.repository.IFormDefinitionRepository;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@ -41,6 +54,9 @@ public class ScheduleJobServiceImpl extends BaseServiceImpl<ScheduleJob, Schedul
@Resource
private IScheduleJobCategoryRepository categoryRepository;
@Resource
private IScheduleJobLogRepository jobLogRepository;
@Resource
private ScheduleJobCategoryConverter categoryConverter;
@ -50,6 +66,12 @@ public class ScheduleJobServiceImpl extends BaseServiceImpl<ScheduleJob, Schedul
@Resource
private FormDefinitionConverter formDefinitionConverter;
@Resource
private Scheduler scheduler;
@Resource
private RedisUtil redisUtil;
@Override
@Transactional
public ScheduleJobDTO create(ScheduleJobDTO dto) {
@ -83,5 +105,239 @@ public class ScheduleJobServiceImpl extends BaseServiceImpl<ScheduleJob, Schedul
return new PageImpl<>(content, page.getPageable(), page.getTotalElements());
}
@Override
public void startJob(Long jobId) {
try {
ScheduleJob job = findEntityById(jobId);
// 创建JobDetail
JobDetail jobDetail = JobBuilder.newJob(DynamicJob.class)
.withIdentity(getJobKey(jobId))
.withDescription(job.getJobDescription())
.build();
// 设置JobDataMap
jobDetail.getJobDataMap().put("jobId", job.getId());
jobDetail.getJobDataMap().put("jobName", job.getJobName());
jobDetail.getJobDataMap().put("beanName", job.getBeanName());
jobDetail.getJobDataMap().put("methodName", job.getMethodName());
jobDetail.getJobDataMap().put("methodParams", job.getMethodParams());
// 创建Trigger
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity(getTriggerKey(jobId))
.withSchedule(CronScheduleBuilder.cronSchedule(job.getCronExpression()))
.build();
// 调度任务
scheduler.scheduleJob(jobDetail, trigger);
// 更新任务状态
job.setStatus(ScheduleJobStatusEnum.ENABLED);
jobRepository.save(job);
log.info("任务启动成功jobId={}, jobName={}", jobId, job.getJobName());
} catch (SchedulerException e) {
log.error("启动任务失败jobId={}", jobId, e);
throw new BusinessException(ResponseCode.SCHEDULE_JOB_START_FAILED);
}
}
@Override
public void pauseJob(Long jobId) {
try {
scheduler.pauseJob(getJobKey(jobId));
// 更新任务状态
ScheduleJob job = findEntityById(jobId);
job.setStatus(ScheduleJobStatusEnum.PAUSED);
jobRepository.save(job);
log.info("任务暂停成功jobId={}", jobId);
} catch (SchedulerException e) {
log.error("暂停任务失败jobId={}", jobId, e);
throw new BusinessException(ResponseCode.SCHEDULE_JOB_PAUSE_FAILED);
}
}
@Override
public void resumeJob(Long jobId) {
try {
scheduler.resumeJob(getJobKey(jobId));
// 更新任务状态
ScheduleJob job = findEntityById(jobId);
job.setStatus(ScheduleJobStatusEnum.ENABLED);
jobRepository.save(job);
log.info("任务恢复成功jobId={}", jobId);
} catch (SchedulerException e) {
log.error("恢复任务失败jobId={}", jobId, e);
throw new BusinessException(ResponseCode.SCHEDULE_JOB_RESUME_FAILED);
}
}
@Override
public void stopJob(Long jobId) {
try {
scheduler.deleteJob(getJobKey(jobId));
// 更新任务状态
ScheduleJob job = findEntityById(jobId);
job.setStatus(ScheduleJobStatusEnum.DISABLED);
jobRepository.save(job);
log.info("任务停止成功jobId={}", jobId);
} catch (SchedulerException e) {
log.error("停止任务失败jobId={}", jobId, e);
throw new BusinessException(ResponseCode.SCHEDULE_JOB_STOP_FAILED);
}
}
@Override
public void triggerJob(Long jobId) {
try {
scheduler.triggerJob(getJobKey(jobId));
log.info("立即触发任务jobId={}", jobId);
} catch (SchedulerException e) {
log.error("触发任务失败jobId={}", jobId, e);
throw new BusinessException(ResponseCode.SCHEDULE_JOB_TRIGGER_FAILED);
}
}
@Override
public void updateCron(Long jobId, String cronExpression) {
try {
TriggerKey triggerKey = getTriggerKey(jobId);
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
if (trigger == null) {
throw new BusinessException(ResponseCode.SCHEDULE_JOB_TRIGGER_NOT_FOUND);
}
// 创建新的Trigger
CronTrigger newTrigger = trigger.getTriggerBuilder()
.withIdentity(triggerKey)
.withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
.build();
// 重新调度
scheduler.rescheduleJob(triggerKey, newTrigger);
// 更新数据库
ScheduleJob job = findEntityById(jobId);
job.setCronExpression(cronExpression);
jobRepository.save(job);
log.info("更新任务Cron表达式成功jobId={}, cron={}", jobId, cronExpression);
} catch (SchedulerException e) {
log.error("更新任务Cron表达式失败jobId={}", jobId, e);
throw new BusinessException(ResponseCode.SCHEDULE_JOB_UPDATE_CRON_FAILED);
}
}
@Override
public void loadAllEnabledJobs() {
log.info("开始加载所有启用的定时任务...");
List<ScheduleJob> enabledJobs = jobRepository.findByStatusAndDeletedFalse(ScheduleJobStatusEnum.ENABLED);
for (ScheduleJob job : enabledJobs) {
try {
startJob(job.getId());
log.info("加载任务成功jobId={}, jobName={}", job.getId(), job.getJobName());
} catch (Exception e) {
log.error("加载任务失败jobId={}, jobName={}", job.getId(), job.getJobName(), e);
}
}
log.info("定时任务加载完成,成功加载 {} 个任务", enabledJobs.size());
}
@Override
public JobDashboardDTO getDashboard() {
// 1. 统计各状态任务数量
long total = jobRepository.count();
long enabled = jobRepository.countByStatusAndDeletedFalse(ScheduleJobStatusEnum.ENABLED);
long disabled = jobRepository.countByStatusAndDeletedFalse(ScheduleJobStatusEnum.DISABLED);
long paused = jobRepository.countByStatusAndDeletedFalse(ScheduleJobStatusEnum.PAUSED);
// 2. 获取正在执行的任务从Redis
List<JobDashboardDTO.RunningJobDTO> runningJobs = new ArrayList<>();
List<ScheduleJob> allJobs = jobRepository.findAll();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
for (ScheduleJob job : allJobs) {
String statusKey = "schedule:job:status:" + job.getId();
JobStatusDTO statusDTO = (JobStatusDTO) redisUtil.get(statusKey);
if (statusDTO != null && "RUNNING".equals(statusDTO.getStatus())) {
JobDashboardDTO.RunningJobDTO runningJob = JobDashboardDTO.RunningJobDTO.builder()
.jobId(job.getId())
.jobName(job.getJobName())
.startTime(statusDTO.getStartTime() != null ?
statusDTO.getStartTime().format(formatter) : null)
.progress(statusDTO.getProgress())
.message(statusDTO.getMessage())
.status(statusDTO.getStatus())
.build();
runningJobs.add(runningJob);
}
}
// 3. 获取最近10条执行日志
PageRequest pageRequest = PageRequest.of(0, 10, Sort.by(Sort.Direction.DESC, "executeTime"));
Page<ScheduleJobLog> logPage = jobLogRepository.findAll(pageRequest);
List<ScheduleJobLogDTO> recentLogs = logPage.getContent().stream()
.map(log -> {
ScheduleJobLogDTO dto = new ScheduleJobLogDTO();
dto.setId(log.getId());
dto.setJobId(log.getJobId());
dto.setJobName(log.getJobName());
dto.setExecuteTime(log.getExecuteTime());
dto.setFinishTime(log.getFinishTime());
dto.setStatus(log.getStatus());
dto.setResultMessage(log.getResultMessage());
dto.setExceptionInfo(log.getExceptionInfo());
return dto;
})
.collect(Collectors.toList());
// 4. 构建返回结果
JobDashboardDTO.JobSummaryDTO summary = JobDashboardDTO.JobSummaryDTO.builder()
.total(total)
.enabled(enabled)
.disabled(disabled)
.paused(paused)
.running((long) runningJobs.size())
.build();
return JobDashboardDTO.builder()
.summary(summary)
.runningJobs(runningJobs)
.recentLogs(recentLogs)
.build();
}
/**
* 获取JobKey
*/
private JobKey getJobKey(Long jobId) {
return JobKey.jobKey("job_" + jobId, "DEFAULT");
}
/**
* 获取TriggerKey
*/
private TriggerKey getTriggerKey(Long jobId) {
return TriggerKey.triggerKey("trigger_" + jobId, "DEFAULT");
}
}

View File

@ -59,9 +59,6 @@ VALUES
(102, '工作流实例', '/workflow/instance', '/src/pages/workflow/instance/index', 'BranchesOutlined', 2, 100, 20, FALSE, TRUE, 'system', '2024-01-01 00:00:00', 0, FALSE),
-- 表单管理
(104, '表单管理', '/workflow/form', '/src/pages/workflow/form/index', 'FormOutlined', 2, 100, 30, FALSE, TRUE, 'system', '2024-01-01 00:00:00', 0, FALSE),
-- 节点管理
(103, '节点管理', '/workflow/node-design', '/src/pages/workflow/nodedesign/design/index', 'ControlOutlined', 2, 100, 40, FALSE, TRUE, 'system', '2024-01-01 00:00:00', 0, FALSE),
-- 运维管理
(200, '运维管理', '/deploy', 'Layout', 'DeploymentUnitOutlined', 1, NULL, 2, FALSE, TRUE, 'system', '2024-01-01 00:00:00', 0, FALSE),
-- 团队管理

View File

@ -188,13 +188,29 @@ team.member.already.exists=该用户已是团队成员
team.application.not.found=团队应用关联不存在或已删除
team.application.already.exists=该应用已关联到此团队
# 定时任务相关 (2950-2969)
# 定时任务相关 (2800-2899)
# 任务基础错误 (2800-2819)
schedule.job.not.found=定时任务不存在或已删除
schedule.job.name.exists=任务名称{0}已存在
schedule.job.start.failed=启动定时任务失败
schedule.job.pause.failed=暂停定时任务失败
schedule.job.resume.failed=恢复定时任务失败
schedule.job.stop.failed=停止定时任务失败
schedule.job.trigger.failed=触发定时任务失败
schedule.job.update.cron.failed=更新Cron表达式失败
schedule.job.cron.invalid=Cron表达式{0}无效
schedule.job.bean.not.found=找不到执行器Bean{0}
schedule.job.method.not.found=找不到执行方法:{0}
schedule.job.trigger.not.found=任务触发器不存在
schedule.job.already.running=定时任务已在运行中
schedule.job.not.running=定时任务未运行
# 任务分类错误 (2820-2829)
schedule.job.category.not.found=定时任务分类不存在或已删除
schedule.job.category.code.exists=分类编码{0}已存在
schedule.job.category.has.jobs=该分类下存在定时任务,无法删除
schedule.job.not.found=定时任务不存在或已删除
schedule.job.name.exists=任务名称{0}已存在
schedule.job.category.has.jobs=该分类下存在任务,无法删除
schedule.job.category.in.use=任务分类正在使用中,无法删除
# 任务日志错误 (2830-2839)
schedule.job.log.not.found=任务执行日志不存在
schedule.job.executor.not.found=找不到任务执行器:{0}
schedule.job.cron.invalid=Cron表达式{0}无效