增加动态定时器

This commit is contained in:
dengqichen 2025-10-30 12:55:33 +08:00
parent 712f664ea6
commit 7fa9b08e3d
6 changed files with 99 additions and 112 deletions

View File

@ -141,7 +141,7 @@ public enum ResponseCode {
SCHEDULE_JOB_START_FAILED(2802, "schedule.job.start.failed"), SCHEDULE_JOB_START_FAILED(2802, "schedule.job.start.failed"),
SCHEDULE_JOB_PAUSE_FAILED(2803, "schedule.job.pause.failed"), SCHEDULE_JOB_PAUSE_FAILED(2803, "schedule.job.pause.failed"),
SCHEDULE_JOB_RESUME_FAILED(2804, "schedule.job.resume.failed"), SCHEDULE_JOB_RESUME_FAILED(2804, "schedule.job.resume.failed"),
SCHEDULE_JOB_STOP_FAILED(2805, "schedule.job.stop.failed"), SCHEDULE_JOB_DISABLE_FAILED(2805, "schedule.job.disable.failed"),
SCHEDULE_JOB_TRIGGER_FAILED(2806, "schedule.job.trigger.failed"), SCHEDULE_JOB_TRIGGER_FAILED(2806, "schedule.job.trigger.failed"),
SCHEDULE_JOB_UPDATE_CRON_FAILED(2807, "schedule.job.update.cron.failed"), SCHEDULE_JOB_UPDATE_CRON_FAILED(2807, "schedule.job.update.cron.failed"),
SCHEDULE_JOB_CRON_INVALID(2808, "schedule.job.cron.invalid"), SCHEDULE_JOB_CRON_INVALID(2808, "schedule.job.cron.invalid"),
@ -150,6 +150,9 @@ public enum ResponseCode {
SCHEDULE_JOB_TRIGGER_NOT_FOUND(2811, "schedule.job.trigger.not.found"), SCHEDULE_JOB_TRIGGER_NOT_FOUND(2811, "schedule.job.trigger.not.found"),
SCHEDULE_JOB_ALREADY_RUNNING(2812, "schedule.job.already.running"), SCHEDULE_JOB_ALREADY_RUNNING(2812, "schedule.job.already.running"),
SCHEDULE_JOB_NOT_RUNNING(2813, "schedule.job.not.running"), SCHEDULE_JOB_NOT_RUNNING(2813, "schedule.job.not.running"),
SCHEDULE_JOB_STATUS_CANNOT_UPDATE(2814, "schedule.job.status.cannot.update"),
SCHEDULE_JOB_DISABLED_CANNOT_TRIGGER(2815, "schedule.job.disabled.cannot.trigger"),
SCHEDULE_JOB_UPDATE_FAILED(2816, "schedule.job.update.failed"),
SCHEDULE_JOB_CATEGORY_NOT_FOUND(2820, "schedule.job.category.not.found"), SCHEDULE_JOB_CATEGORY_NOT_FOUND(2820, "schedule.job.category.not.found"),
SCHEDULE_JOB_CATEGORY_CODE_EXISTS(2821, "schedule.job.category.code.exists"), SCHEDULE_JOB_CATEGORY_CODE_EXISTS(2821, "schedule.job.category.code.exists"),
SCHEDULE_JOB_CATEGORY_HAS_JOBS(2822, "schedule.job.category.has.jobs"), SCHEDULE_JOB_CATEGORY_HAS_JOBS(2822, "schedule.job.category.has.jobs"),

View File

@ -78,14 +78,14 @@ public class ScheduleJobApiController extends BaseController<ScheduleJob, Schedu
} }
/** /**
* 停止任务 * 禁用任务长期停用但保留配置
*/ */
@Operation(summary = "停止定时任务", description = "从Quartz调度器中移除任务停止执行") @Operation(summary = "禁用定时任务", description = "长期停用任务从Quartz调度器中移除但保留配置信息。禁用后不可触发执行。")
@PostMapping("/{id}/stop") @PostMapping("/{id}/disable")
public Response<Void> stopJob( public Response<Void> disableJob(
@Parameter(description = "任务ID", required = true) @PathVariable Long id @Parameter(description = "任务ID", required = true) @PathVariable Long id
) { ) {
scheduleJobService.stopJob(id); scheduleJobService.disableJob(id);
return Response.success(); return Response.success();
} }

View File

@ -1,60 +0,0 @@
package com.qqchen.deploy.backend.schedule.api;
import com.qqchen.deploy.backend.framework.api.Response;
import com.qqchen.deploy.backend.schedule.dto.JobStatusDTO;
import com.qqchen.deploy.backend.schedule.service.JobStatusRedisService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import java.util.Map;
/**
* 定时任务状态查询API控制器
*
* @author qichen
*/
@Slf4j
@RestController
@RequestMapping("/api/v1/schedule/jobs")
@Tag(name = "定时任务状态查询", description = "查询任务实时执行状态")
public class ScheduleJobStatusApiController {
@Resource
private JobStatusRedisService jobStatusRedisService;
/**
* 获取单个任务状态
*/
@Operation(summary = "获取任务执行状态", description = "获取指定任务的实时执行状态")
@GetMapping("/{jobId}/status")
public Response<JobStatusDTO> getJobStatus(
@Parameter(description = "任务ID") @PathVariable Long jobId) {
JobStatusDTO status = jobStatusRedisService.getJobStatus(jobId);
return Response.success(status);
}
/**
* 批量获取所有任务状态
*/
@Operation(summary = "批量获取任务状态", description = "获取所有任务的执行状态,用于列表页展示")
@GetMapping("/status/all")
public Response<Map<String, JobStatusDTO>> getAllJobStatus() {
Map<String, JobStatusDTO> statusMap = jobStatusRedisService.getAllJobStatus();
return Response.success(statusMap);
}
/**
* 获取状态版本号
*/
@Operation(summary = "获取状态版本号", description = "获取当前状态版本号,用于判断状态是否有变化")
@GetMapping("/status/version")
public Response<Long> getStatusVersion() {
Long version = jobStatusRedisService.getStatusVersion();
return Response.success(version);
}
}

View File

@ -35,14 +35,14 @@ public interface IScheduleJobService extends IBaseService<ScheduleJob, ScheduleJ
void resumeJob(Long jobId); void resumeJob(Long jobId);
/** /**
* 停止任务 * 禁用任务长期停用但保留配置
* *
* @param jobId 任务ID * @param jobId 任务ID
*/ */
void stopJob(Long jobId); void disableJob(Long jobId);
/** /**
* 立即执行一次任务 * 立即执行一次任务仅限ENABLED和PAUSED状态
* *
* @param jobId 任务ID * @param jobId 任务ID
*/ */

View File

@ -88,10 +88,24 @@ public class ScheduleJobServiceImpl extends BaseServiceImpl<ScheduleJob, Schedul
@Override @Override
@Transactional @Transactional
public ScheduleJobDTO update(Long id, ScheduleJobDTO dto) { public ScheduleJobDTO update(Long id, ScheduleJobDTO dto) {
// 更新数据库 // 1. 查询旧数据
ScheduleJob oldJob = findEntityById(id);
ScheduleJobStatusEnum oldStatus = oldJob.getStatus();
// 2. 检查是否试图修改 status强制拦截
if (dto.getStatus() != null && dto.getStatus() != oldStatus) {
throw new BusinessException(ResponseCode.SCHEDULE_JOB_STATUS_CANNOT_UPDATE,
new Object[]{"请使用 /start、/pause、/resume、/disable 接口管理任务状态"});
}
// 3. 强制保持原状态防止前端传入
dto.setStatus(oldStatus);
// 4. 更新数据库
ScheduleJobDTO updated = super.update(id, dto); ScheduleJobDTO updated = super.update(id, dto);
// 如果任务正在运行需要重新调度以应用新参数 // 5. 只有非 DISABLED 状态才需要重新调度
if (oldStatus != ScheduleJobStatusEnum.DISABLED) {
try { try {
JobKey jobKey = getJobKey(id); JobKey jobKey = getJobKey(id);
if (scheduler.checkExists(jobKey)) { if (scheduler.checkExists(jobKey)) {
@ -122,12 +136,20 @@ public class ScheduleJobServiceImpl extends BaseServiceImpl<ScheduleJob, Schedul
// 重新调度 // 重新调度
scheduler.scheduleJob(jobDetail, trigger); scheduler.scheduleJob(jobDetail, trigger);
log.info("任务参数已更新并重新调度jobId={}, jobName={}", job.getId(), job.getJobName()); // 如果原来是 PAUSED重新调度后也要暂停
if (oldStatus == ScheduleJobStatusEnum.PAUSED) {
scheduler.pauseJob(jobKey);
}
log.info("任务配置已更新并重新调度jobId={}, jobName={}", job.getId(), job.getJobName());
} }
} catch (SchedulerException e) { } catch (SchedulerException e) {
log.error("更新任务调度失败jobId={}", id, e); log.error("更新任务调度失败jobId={}", id, e);
// 不抛异常因为数据库已经更新成功下次重启会生效 // 不抛异常因为数据库已经更新成功下次重启会生效
} }
} else {
log.info("任务配置已更新DISABLED状态不调度jobId={}", id);
}
return updated; return updated;
} }
@ -232,31 +254,50 @@ public class ScheduleJobServiceImpl extends BaseServiceImpl<ScheduleJob, Schedul
} }
@Override @Override
public void stopJob(Long jobId) { public void disableJob(Long jobId) {
try {
scheduler.deleteJob(getJobKey(jobId));
// 更新任务状态
ScheduleJob job = findEntityById(jobId); ScheduleJob job = findEntityById(jobId);
// 幂等性检查
if (job.getStatus() == ScheduleJobStatusEnum.DISABLED) {
log.info("任务已经是禁用状态jobId={}, jobName={}", jobId, job.getJobName());
return;
}
try {
// Quartz 删除任务
JobKey jobKey = getJobKey(jobId);
if (scheduler.checkExists(jobKey)) {
scheduler.deleteJob(jobKey);
log.info("从Quartz删除任务jobId={}, jobName={}", jobId, job.getJobName());
}
// 更新数据库状态
job.setStatus(ScheduleJobStatusEnum.DISABLED); job.setStatus(ScheduleJobStatusEnum.DISABLED);
jobRepository.save(job); jobRepository.save(job);
log.info("任务停止成功jobId={}", jobId); log.info("任务已禁用jobId={}, jobName={}", jobId, job.getJobName());
} catch (SchedulerException e) { } catch (SchedulerException e) {
log.error("停止任务失败jobId={}", jobId, e); log.error("禁用任务失败jobId={}, jobName={}", jobId, job.getJobName(), e);
throw new BusinessException(ResponseCode.SCHEDULE_JOB_STOP_FAILED); throw new BusinessException(ResponseCode.SCHEDULE_JOB_DISABLE_FAILED);
} }
} }
@Override @Override
public void triggerJob(Long jobId) { public void triggerJob(Long jobId) {
// 检查任务状态
ScheduleJob job = findEntityById(jobId);
if (job.getStatus() == ScheduleJobStatusEnum.DISABLED) {
log.warn("禁用状态的任务不允许触发jobId={}, jobName={}", jobId, job.getJobName());
throw new BusinessException(ResponseCode.SCHEDULE_JOB_DISABLED_CANNOT_TRIGGER);
}
try { try {
scheduler.triggerJob(getJobKey(jobId)); scheduler.triggerJob(getJobKey(jobId));
log.info("立即触发任务jobId={}", jobId); log.info("立即触发任务jobId={}, jobName={}", jobId, job.getJobName());
} catch (SchedulerException e) { } catch (SchedulerException e) {
log.error("触发任务失败jobId={}", jobId, e); log.error("触发任务失败jobId={}, jobName={}", jobId, job.getJobName(), e);
throw new BusinessException(ResponseCode.SCHEDULE_JOB_TRIGGER_FAILED); throw new BusinessException(ResponseCode.SCHEDULE_JOB_TRIGGER_FAILED);
} }
} }

View File

@ -195,7 +195,7 @@ schedule.job.name.exists=任务名称{0}已存在
schedule.job.start.failed=启动定时任务失败 schedule.job.start.failed=启动定时任务失败
schedule.job.pause.failed=暂停定时任务失败 schedule.job.pause.failed=暂停定时任务失败
schedule.job.resume.failed=恢复定时任务失败 schedule.job.resume.failed=恢复定时任务失败
schedule.job.stop.failed=停止定时任务失败 schedule.job.disable.failed=禁用定时任务失败
schedule.job.trigger.failed=触发定时任务失败 schedule.job.trigger.failed=触发定时任务失败
schedule.job.update.cron.failed=更新Cron表达式失败 schedule.job.update.cron.failed=更新Cron表达式失败
schedule.job.cron.invalid=Cron表达式{0}无效 schedule.job.cron.invalid=Cron表达式{0}无效
@ -204,6 +204,9 @@ schedule.job.method.not.found=找不到执行方法:{0}
schedule.job.trigger.not.found=任务触发器不存在 schedule.job.trigger.not.found=任务触发器不存在
schedule.job.already.running=定时任务已在运行中 schedule.job.already.running=定时任务已在运行中
schedule.job.not.running=定时任务未运行 schedule.job.not.running=定时任务未运行
schedule.job.status.cannot.update=不允许通过update接口修改任务状态{0}
schedule.job.disabled.cannot.trigger=禁用状态的任务不允许触发执行
schedule.job.update.failed=更新定时任务失败
# 任务分类错误 (2820-2829) # 任务分类错误 (2820-2829)
schedule.job.category.not.found=定时任务分类不存在或已删除 schedule.job.category.not.found=定时任务分类不存在或已删除