增加动态定时器
This commit is contained in:
parent
f60bcf704b
commit
712f664ea6
@ -17,8 +17,6 @@ public class RepositoryBranchDTO extends BaseDTO {
|
||||
|
||||
private Boolean isDefaultBranch;
|
||||
|
||||
private Boolean isProtected;
|
||||
|
||||
private Boolean canPush;
|
||||
|
||||
private Boolean developersCanPush;
|
||||
@ -27,14 +25,18 @@ public class RepositoryBranchDTO extends BaseDTO {
|
||||
|
||||
private String lastCommitId;
|
||||
|
||||
private String lastCommitMessage;
|
||||
private String commitMessage;
|
||||
|
||||
private String commitAuthor;
|
||||
|
||||
private LocalDateTime commitDate;
|
||||
|
||||
private LocalDateTime lastUpdateTime;
|
||||
|
||||
private String webUrl;
|
||||
|
||||
private Long projectId;
|
||||
|
||||
private Long repoProjectId;
|
||||
|
||||
private Long externalSystemId;
|
||||
|
||||
@ -34,9 +34,6 @@ public class RepositoryBranch extends Entity<Long> {
|
||||
@Column(name = "last_update_time")
|
||||
private LocalDateTime lastUpdateTime;
|
||||
|
||||
@Column(name = "last_commit_time")
|
||||
private LocalDateTime lastCommitTime;
|
||||
|
||||
@Column(name = "developers_can_push")
|
||||
private Boolean developersCanPush = true;
|
||||
|
||||
|
||||
@ -67,8 +67,4 @@ public class GitBranchResponse {
|
||||
return commit != null && commit.committedDate != null ?
|
||||
commit.committedDate.atZoneSameInstant(ZoneId.systemDefault()).toLocalDateTime() : null;
|
||||
}
|
||||
|
||||
public LocalDateTime getLastCommitTime() {
|
||||
return getLastUpdateTime();
|
||||
}
|
||||
}
|
||||
@ -14,28 +14,28 @@ import java.time.LocalDateTime;
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
public class JenkinsBuildQuery extends BaseQuery {
|
||||
|
||||
@QueryField(field = "build_number")
|
||||
@QueryField(field = "buildNumber")
|
||||
private Integer buildNumber;
|
||||
|
||||
@QueryField(field = "build_status", type = QueryType.LIKE)
|
||||
@QueryField(field = "buildStatus", type = QueryType.LIKE)
|
||||
private String buildStatus;
|
||||
|
||||
@QueryField(field = "build_url", type = QueryType.LIKE)
|
||||
@QueryField(field = "buildUrl", type = QueryType.LIKE)
|
||||
private String buildUrl;
|
||||
|
||||
@QueryField(field = "duration")
|
||||
private Long duration;
|
||||
|
||||
@QueryField(field = "startTime")
|
||||
@QueryField(field = "starttime")
|
||||
private LocalDateTime starttime;
|
||||
|
||||
@QueryField(field = "actions", type = QueryType.LIKE)
|
||||
private String actions;
|
||||
|
||||
@QueryField(field = "external_system_id")
|
||||
@QueryField(field = "externalSystemId")
|
||||
private Long externalSystemId;
|
||||
|
||||
@QueryField(field = "job_id")
|
||||
@QueryField(field = "jobId")
|
||||
private Long jobId;
|
||||
|
||||
}
|
||||
@ -20,25 +20,25 @@ public class JenkinsJobQuery extends BaseQuery {
|
||||
@QueryField(field = "description", type = QueryType.LIKE)
|
||||
private String description;
|
||||
|
||||
@QueryField(field = "job_name", type = QueryType.LIKE)
|
||||
@QueryField(field = "jobName", type = QueryType.LIKE)
|
||||
private String jobName;
|
||||
|
||||
@QueryField(field = "job_url", type = QueryType.LIKE)
|
||||
@QueryField(field = "jobUrl", type = QueryType.LIKE)
|
||||
private String jobUrl;
|
||||
|
||||
@QueryField(field = "next_build_number")
|
||||
@QueryField(field = "nextBuildNumber")
|
||||
private Integer nextBuildNumber;
|
||||
|
||||
@QueryField(field = "last_build_number")
|
||||
@QueryField(field = "lastBuildNumber")
|
||||
private Integer lastBuildNumber;
|
||||
|
||||
@QueryField(field = "last_build_status", type = QueryType.LIKE)
|
||||
@QueryField(field = "lastBuildStatus", type = QueryType.LIKE)
|
||||
private String lastBuildStatus;
|
||||
|
||||
@QueryField(field = "health_report_score")
|
||||
@QueryField(field = "healthReportScore")
|
||||
private Integer healthReportScore;
|
||||
|
||||
@QueryField(field = "last_build_time")
|
||||
@QueryField(field = "lastBuildTime")
|
||||
private LocalDateTime lastBuildTime;
|
||||
|
||||
@QueryField(field = "externalSystemId")
|
||||
|
||||
@ -17,13 +17,13 @@ public class JenkinsViewQuery extends BaseQuery {
|
||||
@QueryField(field = "description", type = QueryType.LIKE)
|
||||
private String description;
|
||||
|
||||
@QueryField(field = "external_system_id")
|
||||
@QueryField(field = "externalSystemId")
|
||||
private Long externalSystemId;
|
||||
|
||||
@QueryField(field = "view_name", type = QueryType.LIKE)
|
||||
@QueryField(field = "viewName", type = QueryType.LIKE)
|
||||
private String viewName;
|
||||
|
||||
@QueryField(field = "view_url", type = QueryType.LIKE)
|
||||
@QueryField(field = "viewUrl", type = QueryType.LIKE)
|
||||
private String viewUrl;
|
||||
|
||||
}
|
||||
@ -16,21 +16,42 @@ public class RepositoryBranchQuery extends BaseQuery {
|
||||
@QueryField(field = "name", type = QueryType.LIKE)
|
||||
private String name;
|
||||
|
||||
@QueryField(field = "is_default_branch")
|
||||
private Boolean isDefaultBranch;
|
||||
@QueryField(field = "lastCommitId", type = QueryType.LIKE)
|
||||
private String lastCommitId;
|
||||
|
||||
@QueryField(field = "is_protected")
|
||||
private Boolean isProtected;
|
||||
@QueryField(field = "commitMessage", type = QueryType.LIKE)
|
||||
private String commitMessage;
|
||||
|
||||
@QueryField(field = "commit_author", type = QueryType.LIKE)
|
||||
@QueryField(field = "commitAuthor", type = QueryType.LIKE)
|
||||
private String commitAuthor;
|
||||
|
||||
@QueryField(field = "externalSystemId", type = QueryType.EQUAL)
|
||||
private Long externalSystemId;
|
||||
@QueryField(field = "commitDate")
|
||||
private java.time.LocalDateTime commitDate;
|
||||
|
||||
@QueryField(field = "projectId", type = QueryType.EQUAL)
|
||||
@QueryField(field = "lastUpdateTime")
|
||||
private java.time.LocalDateTime lastUpdateTime;
|
||||
|
||||
@QueryField(field = "developersCanPush")
|
||||
private Boolean developersCanPush;
|
||||
|
||||
@QueryField(field = "developersCanMerge")
|
||||
private Boolean developersCanMerge;
|
||||
|
||||
@QueryField(field = "canPush")
|
||||
private Boolean canPush;
|
||||
|
||||
@QueryField(field = "isDefaultBranch")
|
||||
private Boolean isDefaultBranch;
|
||||
|
||||
@QueryField(field = "webUrl", type = QueryType.LIKE)
|
||||
private String webUrl;
|
||||
|
||||
@QueryField(field = "projectId")
|
||||
private Long projectId;
|
||||
|
||||
@QueryField(field = "repoProjectId", type = QueryType.EQUAL)
|
||||
@QueryField(field = "repoProjectId")
|
||||
private Long repoProjectId;
|
||||
|
||||
@QueryField(field = "externalSystemId")
|
||||
private Long externalSystemId;
|
||||
}
|
||||
@ -2,8 +2,11 @@ package com.qqchen.deploy.backend.deploy.repository;
|
||||
|
||||
import com.qqchen.deploy.backend.deploy.entity.JenkinsBuild;
|
||||
import com.qqchen.deploy.backend.framework.repository.IBaseRepository;
|
||||
import org.springframework.data.jpa.repository.Query;
|
||||
import org.springframework.data.repository.query.Param;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Optional;
|
||||
import java.util.List;
|
||||
|
||||
@ -50,4 +53,15 @@ public interface IJenkinsBuildRepository extends IBaseRepository<JenkinsBuild, L
|
||||
* @return 构建数量
|
||||
*/
|
||||
Long countByJobIdAndDeletedFalse(Long jobId);
|
||||
|
||||
/**
|
||||
* 批量查询任务的构建数量(用于解决N+1查询问题)
|
||||
* @param jobIds 任务ID集合
|
||||
* @return Object数组列表,[0]=jobId, [1]=count
|
||||
*/
|
||||
@Query("SELECT b.jobId as jobId, COUNT(b.id) as count " +
|
||||
"FROM JenkinsBuild b " +
|
||||
"WHERE b.jobId IN :jobIds AND b.deleted = false " +
|
||||
"GROUP BY b.jobId")
|
||||
List<Object[]> countByJobIds(@Param("jobIds") Collection<Long> jobIds);
|
||||
}
|
||||
@ -2,8 +2,11 @@ package com.qqchen.deploy.backend.deploy.repository;
|
||||
|
||||
import com.qqchen.deploy.backend.deploy.entity.JenkinsJob;
|
||||
import com.qqchen.deploy.backend.framework.repository.IBaseRepository;
|
||||
import org.springframework.data.jpa.repository.Query;
|
||||
import org.springframework.data.repository.query.Param;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Optional;
|
||||
import java.util.List;
|
||||
|
||||
@ -39,4 +42,15 @@ public interface IJenkinsJobRepository extends IBaseRepository<JenkinsJob, Long>
|
||||
* @return 任务数量
|
||||
*/
|
||||
Long countByViewIdAndDeletedFalse(Long viewId);
|
||||
|
||||
/**
|
||||
* 批量查询视图的任务数量(用于解决N+1查询问题)
|
||||
* @param viewIds 视图ID集合
|
||||
* @return Object数组列表,[0]=viewId, [1]=count
|
||||
*/
|
||||
@Query("SELECT j.viewId as viewId, COUNT(j.id) as count " +
|
||||
"FROM JenkinsJob j " +
|
||||
"WHERE j.viewId IN :viewIds AND j.deleted = false " +
|
||||
"GROUP BY j.viewId")
|
||||
List<Object[]> countByViewIds(@Param("viewIds") Collection<Long> viewIds);
|
||||
}
|
||||
@ -2,8 +2,11 @@ package com.qqchen.deploy.backend.deploy.repository;
|
||||
|
||||
import com.qqchen.deploy.backend.framework.repository.IBaseRepository;
|
||||
import com.qqchen.deploy.backend.deploy.entity.RepositoryBranch;
|
||||
import org.springframework.data.jpa.repository.Query;
|
||||
import org.springframework.data.repository.query.Param;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
@Repository
|
||||
@ -46,4 +49,15 @@ public interface IRepositoryBranchRepository extends IBaseRepository<RepositoryB
|
||||
List<RepositoryBranch> findByExternalSystemIdAndProjectIdAndDeletedFalse(Long externalSystemId, Long id);
|
||||
|
||||
List<RepositoryBranch> findByExternalSystemIdAndDeletedFalse(Long externalSystemId);
|
||||
|
||||
/**
|
||||
* 批量查询项目的分支数量(用于解决N+1查询问题)
|
||||
* @param projectIds 项目ID集合
|
||||
* @return Object数组列表,[0]=projectId, [1]=count
|
||||
*/
|
||||
@Query("SELECT b.projectId as projectId, COUNT(b.id) as count " +
|
||||
"FROM RepositoryBranch b " +
|
||||
"WHERE b.projectId IN :projectIds AND b.deleted = false " +
|
||||
"GROUP BY b.projectId")
|
||||
List<Object[]> countByProjectIds(@Param("projectIds") Collection<Long> projectIds);
|
||||
}
|
||||
@ -40,4 +40,9 @@ public interface IRepositoryGroupRepository extends IBaseRepository<RepositoryGr
|
||||
List<RepositoryGroup> findByExternalSystemIdAndDeletedFalse(Long externalSystemId);
|
||||
|
||||
Optional<RepositoryGroup> findByRepoGroupId(Long repoGroupId);
|
||||
|
||||
/**
|
||||
* 根据Git系统组ID、外部系统ID查询仓库组
|
||||
*/
|
||||
Optional<RepositoryGroup> findByRepoGroupIdAndExternalSystemIdAndDeletedFalse(Long repoGroupId, Long externalSystemId);
|
||||
}
|
||||
@ -2,8 +2,11 @@ package com.qqchen.deploy.backend.deploy.repository;
|
||||
|
||||
import com.qqchen.deploy.backend.framework.repository.IBaseRepository;
|
||||
import com.qqchen.deploy.backend.deploy.entity.RepositoryProject;
|
||||
import org.springframework.data.jpa.repository.Query;
|
||||
import org.springframework.data.repository.query.Param;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
@ -52,4 +55,20 @@ public interface IRepositoryProjectRepository extends IBaseRepository<Repository
|
||||
List<RepositoryProject> findByExternalSystemIdAndRepoGroupIdAndDeletedFalse(Long externalSystemId, Long repoGroupId);
|
||||
|
||||
Optional<RepositoryProject> findByRepoProjectId(Long repoProjectId);
|
||||
|
||||
/**
|
||||
* 根据Git系统项目ID、外部系统ID查询项目
|
||||
*/
|
||||
Optional<RepositoryProject> findByRepoProjectIdAndExternalSystemIdAndDeletedFalse(Long repoProjectId, Long externalSystemId);
|
||||
|
||||
/**
|
||||
* 批量查询仓库组的项目数量(用于解决N+1查询问题)
|
||||
* @param repoGroupIds 仓库组ID集合
|
||||
* @return Object数组列表,[0]=repoGroupId, [1]=count
|
||||
*/
|
||||
@Query("SELECT p.repoGroupId as repoGroupId, COUNT(p.id) as count " +
|
||||
"FROM RepositoryProject p " +
|
||||
"WHERE p.repoGroupId IN :repoGroupIds AND p.deleted = false " +
|
||||
"GROUP BY p.repoGroupId")
|
||||
List<Object[]> countByRepoGroupIds(@Param("repoGroupIds") Collection<Long> repoGroupIds);
|
||||
}
|
||||
@ -33,6 +33,7 @@ import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.OptionalDouble;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -327,11 +328,7 @@ public class JenkinsJobServiceImpl extends BaseServiceImpl<JenkinsJob, JenkinsJo
|
||||
@Override
|
||||
public org.springframework.data.domain.Page<JenkinsJobDTO> page(JenkinsJobQuery query) {
|
||||
org.springframework.data.domain.Page<JenkinsJobDTO> page = super.page(query);
|
||||
// 填充每个任务的构建数量
|
||||
page.getContent().forEach(dto -> {
|
||||
Long buildCount = jenkinsBuildRepository.countByJobIdAndDeletedFalse(dto.getId());
|
||||
dto.setBuildCount(buildCount);
|
||||
});
|
||||
fillBuildCounts(page.getContent());
|
||||
return page;
|
||||
}
|
||||
|
||||
@ -341,11 +338,35 @@ public class JenkinsJobServiceImpl extends BaseServiceImpl<JenkinsJob, JenkinsJo
|
||||
@Override
|
||||
public List<JenkinsJobDTO> findAll(JenkinsJobQuery query) {
|
||||
List<JenkinsJobDTO> list = super.findAll(query);
|
||||
// 填充每个任务的构建数量
|
||||
list.forEach(dto -> {
|
||||
Long buildCount = jenkinsBuildRepository.countByJobIdAndDeletedFalse(dto.getId());
|
||||
dto.setBuildCount(buildCount);
|
||||
});
|
||||
fillBuildCounts(list);
|
||||
return list;
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量填充任务的构建数量(解决N+1查询问题)
|
||||
*/
|
||||
private void fillBuildCounts(List<JenkinsJobDTO> jobs) {
|
||||
if (jobs.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 1. 收集所有任务ID
|
||||
Set<Long> jobIds = jobs.stream()
|
||||
.map(JenkinsJobDTO::getId)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
// 2. 批量查询构建数量(1条SQL)
|
||||
List<Object[]> countResults = jenkinsBuildRepository.countByJobIds(jobIds);
|
||||
Map<Long, Long> buildCountMap = countResults.stream()
|
||||
.collect(Collectors.toMap(
|
||||
arr -> (Long) arr[0], // jobId
|
||||
arr -> (Long) arr[1] // count
|
||||
));
|
||||
|
||||
// 3. 填充数据
|
||||
jobs.forEach(job -> {
|
||||
Long buildCount = buildCountMap.getOrDefault(job.getId(), 0L);
|
||||
job.setBuildCount(buildCount);
|
||||
});
|
||||
}
|
||||
}
|
||||
@ -26,7 +26,10 @@ import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Jenkins视图 Service实现
|
||||
@ -148,11 +151,7 @@ public class JenkinsViewServiceImpl extends BaseServiceImpl<JenkinsView, Jenkins
|
||||
@Override
|
||||
public org.springframework.data.domain.Page<JenkinsViewDTO> page(JenkinsViewQuery query) {
|
||||
org.springframework.data.domain.Page<JenkinsViewDTO> page = super.page(query);
|
||||
// 填充每个视图的任务数量
|
||||
page.getContent().forEach(dto -> {
|
||||
Long jobCount = jenkinsJobRepository.countByViewIdAndDeletedFalse(dto.getId());
|
||||
dto.setJobCount(jobCount);
|
||||
});
|
||||
fillJobCounts(page.getContent());
|
||||
return page;
|
||||
}
|
||||
|
||||
@ -162,11 +161,35 @@ public class JenkinsViewServiceImpl extends BaseServiceImpl<JenkinsView, Jenkins
|
||||
@Override
|
||||
public List<JenkinsViewDTO> findAll(JenkinsViewQuery query) {
|
||||
List<JenkinsViewDTO> list = super.findAll(query);
|
||||
// 填充每个视图的任务数量
|
||||
list.forEach(dto -> {
|
||||
Long jobCount = jenkinsJobRepository.countByViewIdAndDeletedFalse(dto.getId());
|
||||
dto.setJobCount(jobCount);
|
||||
});
|
||||
fillJobCounts(list);
|
||||
return list;
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量填充视图的任务数量(解决N+1查询问题)
|
||||
*/
|
||||
private void fillJobCounts(List<JenkinsViewDTO> views) {
|
||||
if (views.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 1. 收集所有视图ID
|
||||
Set<Long> viewIds = views.stream()
|
||||
.map(JenkinsViewDTO::getId)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
// 2. 批量查询任务数量(1条SQL)
|
||||
List<Object[]> countResults = jenkinsJobRepository.countByViewIds(viewIds);
|
||||
Map<Long, Long> jobCountMap = countResults.stream()
|
||||
.collect(Collectors.toMap(
|
||||
arr -> (Long) arr[0], // viewId
|
||||
arr -> (Long) arr[1] // count
|
||||
));
|
||||
|
||||
// 3. 填充数据
|
||||
views.forEach(view -> {
|
||||
Long jobCount = jobCountMap.getOrDefault(view.getId(), 0L);
|
||||
view.setJobCount(jobCount);
|
||||
});
|
||||
}
|
||||
}
|
||||
@ -361,21 +361,21 @@ public class RepositoryBranchServiceImpl extends BaseServiceImpl<RepositoryBranc
|
||||
*/
|
||||
private List<RepositoryProject> getProjectsToSync(Long externalSystemId, Long repoGroupId, Long repoProjectId) {
|
||||
if (repoProjectId != null) {
|
||||
// 同步单个项目
|
||||
RepositoryProject project = repositoryProjectRepository.findById(repoProjectId)
|
||||
.orElseThrow(() -> new BusinessException(ResponseCode.DATA_NOT_FOUND));
|
||||
// 同步单个项目(repoProjectId是Git系统中的项目ID,不是数据库主键)
|
||||
RepositoryProject project = repositoryProjectRepository
|
||||
.findByRepoProjectIdAndExternalSystemIdAndDeletedFalse(repoProjectId, externalSystemId)
|
||||
.orElseThrow(() -> new BusinessException(ResponseCode.DATA_NOT_FOUND,
|
||||
new Object[]{"Git项目ID", repoProjectId + "(外部系统:" + externalSystemId + ")"}));
|
||||
|
||||
// 验证项目属于指定的外部系统和仓库组
|
||||
if (!project.getExternalSystemId().equals(externalSystemId)) {
|
||||
throw new BusinessException(ResponseCode.DATA_NOT_FOUND);
|
||||
}
|
||||
// 验证项目属于指定的仓库组(如果指定了repoGroupId)
|
||||
if (repoGroupId != null && !project.getRepoGroupId().equals(repoGroupId)) {
|
||||
throw new BusinessException(ResponseCode.DATA_NOT_FOUND);
|
||||
throw new BusinessException(ResponseCode.DATA_NOT_FOUND,
|
||||
new Object[]{"Git项目ID", repoProjectId + "(不属于仓库组:" + repoGroupId + ")"});
|
||||
}
|
||||
|
||||
return List.of(project);
|
||||
} else if (repoGroupId != null) {
|
||||
// 同步仓库组下所有项目
|
||||
// 同步仓库组下所有项目(repoGroupId是Git系统中的组ID)
|
||||
return repositoryProjectRepository.findByExternalSystemIdAndRepoGroupIdAndDeletedFalse(externalSystemId, repoGroupId);
|
||||
} else {
|
||||
// 全量同步
|
||||
@ -436,7 +436,6 @@ public class RepositoryBranchServiceImpl extends BaseServiceImpl<RepositoryBranc
|
||||
branch.setCommitDate(remoteBranch.getCommitDate());
|
||||
branch.setWebUrl(remoteBranch.getWebUrl());
|
||||
branch.setLastUpdateTime(remoteBranch.getLastUpdateTime());
|
||||
branch.setLastCommitTime(remoteBranch.getLastCommitTime());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -29,6 +29,7 @@ import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.ArrayList;
|
||||
@ -137,28 +138,42 @@ public class RepositoryGroupServiceImpl extends BaseServiceImpl<RepositoryGroup,
|
||||
@Override
|
||||
public Page<RepositoryGroupDTO> page(RepositoryGroupQuery query) {
|
||||
Page<RepositoryGroupDTO> page = super.page(query);
|
||||
List<RepositoryGroupDTO> result = page.getContent().stream().peek(group -> {
|
||||
// 统计该仓库组下的项目数量
|
||||
Long projectCount = repositoryProjectRepository.countByExternalSystemIdAndRepoGroupIdAndDeletedFalse(
|
||||
group.getExternalSystemId(),
|
||||
group.getRepoGroupId()
|
||||
);
|
||||
group.setProjectCount(projectCount);
|
||||
}).collect(Collectors.toList());
|
||||
return new PageImpl<>(result, page.getPageable(), page.getTotalElements());
|
||||
fillProjectCounts(page.getContent());
|
||||
return page;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<RepositoryGroupDTO> findAll(RepositoryGroupQuery query) {
|
||||
List<RepositoryGroupDTO> list = super.findAll(query);
|
||||
list.forEach(group -> {
|
||||
// 统计该仓库组下的项目数量
|
||||
Long projectCount = repositoryProjectRepository.countByExternalSystemIdAndRepoGroupIdAndDeletedFalse(
|
||||
group.getExternalSystemId(),
|
||||
group.getRepoGroupId()
|
||||
);
|
||||
group.setProjectCount(projectCount);
|
||||
});
|
||||
fillProjectCounts(list);
|
||||
return list;
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量填充仓库组的项目数量(解决N+1查询问题)
|
||||
*/
|
||||
private void fillProjectCounts(List<RepositoryGroupDTO> groups) {
|
||||
if (groups.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 1. 收集所有仓库组的repoGroupId
|
||||
Set<Long> repoGroupIds = groups.stream()
|
||||
.map(RepositoryGroupDTO::getRepoGroupId)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
// 2. 批量查询项目数量(1条SQL)
|
||||
List<Object[]> countResults = repositoryProjectRepository.countByRepoGroupIds(repoGroupIds);
|
||||
Map<Long, Long> projectCountMap = countResults.stream()
|
||||
.collect(Collectors.toMap(
|
||||
arr -> (Long) arr[0], // repoGroupId
|
||||
arr -> (Long) arr[1] // count
|
||||
));
|
||||
|
||||
// 3. 填充数据
|
||||
groups.forEach(group -> {
|
||||
Long projectCount = projectCountMap.getOrDefault(group.getRepoGroupId(), 0L);
|
||||
group.setProjectCount(projectCount);
|
||||
});
|
||||
}
|
||||
}
|
||||
@ -278,14 +278,11 @@ public class RepositoryProjectServiceImpl extends BaseServiceImpl<RepositoryProj
|
||||
*/
|
||||
private List<RepositoryGroup> getGroupsToSync(Long externalSystemId, Long repoGroupId) {
|
||||
if (repoGroupId != null) {
|
||||
// 同步单个仓库组
|
||||
RepositoryGroup group = repositoryGroupRepository.findById(repoGroupId)
|
||||
.orElseThrow(() -> new BusinessException(ResponseCode.DATA_NOT_FOUND));
|
||||
|
||||
// 验证仓库组属于指定的外部系统
|
||||
if (!group.getExternalSystemId().equals(externalSystemId)) {
|
||||
throw new BusinessException(ResponseCode.DATA_NOT_FOUND);
|
||||
}
|
||||
// 同步单个仓库组(repoGroupId是Git系统中的组ID,不是数据库主键)
|
||||
RepositoryGroup group = repositoryGroupRepository
|
||||
.findByRepoGroupIdAndExternalSystemIdAndDeletedFalse(repoGroupId, externalSystemId)
|
||||
.orElseThrow(() -> new BusinessException(ResponseCode.DATA_NOT_FOUND,
|
||||
new Object[]{"Git仓库组ID", repoGroupId + "(外部系统:" + externalSystemId + ")"}));
|
||||
|
||||
return List.of(group);
|
||||
} else {
|
||||
@ -307,28 +304,42 @@ public class RepositoryProjectServiceImpl extends BaseServiceImpl<RepositoryProj
|
||||
@Override
|
||||
public Page<RepositoryProjectDTO> page(RepositoryProjectQuery query) {
|
||||
Page<RepositoryProjectDTO> page = super.page(query);
|
||||
List<RepositoryProjectDTO> result = page.getContent().stream().peek(project -> {
|
||||
// 统计该项目下的分支数量
|
||||
Long branchCount = repositoryBranchRepository.countByExternalSystemIdAndRepoProjectIdAndDeletedFalse(
|
||||
project.getExternalSystemId(),
|
||||
project.getRepoProjectId()
|
||||
);
|
||||
project.setBranchCount(branchCount);
|
||||
}).collect(Collectors.toList());
|
||||
return new PageImpl<>(result, page.getPageable(), page.getTotalElements());
|
||||
fillBranchCounts(page.getContent());
|
||||
return page;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<RepositoryProjectDTO> findAll(RepositoryProjectQuery query) {
|
||||
List<RepositoryProjectDTO> list = super.findAll(query);
|
||||
list.forEach(project -> {
|
||||
// 统计该项目下的分支数量
|
||||
Long branchCount = repositoryBranchRepository.countByExternalSystemIdAndRepoProjectIdAndDeletedFalse(
|
||||
project.getExternalSystemId(),
|
||||
project.getRepoProjectId()
|
||||
);
|
||||
project.setBranchCount(branchCount);
|
||||
});
|
||||
fillBranchCounts(list);
|
||||
return list;
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量填充项目的分支数量(解决N+1查询问题)
|
||||
*/
|
||||
private void fillBranchCounts(List<RepositoryProjectDTO> projects) {
|
||||
if (projects.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 1. 收集所有项目ID
|
||||
Set<Long> projectIds = projects.stream()
|
||||
.map(RepositoryProjectDTO::getId)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
// 2. 批量查询分支数量(1条SQL)
|
||||
List<Object[]> countResults = repositoryBranchRepository.countByProjectIds(projectIds);
|
||||
Map<Long, Long> branchCountMap = countResults.stream()
|
||||
.collect(Collectors.toMap(
|
||||
arr -> (Long) arr[0], // projectId
|
||||
arr -> (Long) arr[1] // count
|
||||
));
|
||||
|
||||
// 3. 填充数据
|
||||
projects.forEach(project -> {
|
||||
Long branchCount = branchCountMap.getOrDefault(project.getId(), 0L);
|
||||
project.setBranchCount(branchCount);
|
||||
});
|
||||
}
|
||||
}
|
||||
@ -1,26 +0,0 @@
|
||||
package com.qqchen.deploy.backend.schedule.annotation;
|
||||
|
||||
import java.lang.annotation.*;
|
||||
|
||||
/**
|
||||
* 定时任务监控注解
|
||||
* 标记需要自动监控执行状态的任务方法
|
||||
*
|
||||
* @author qichen
|
||||
*/
|
||||
@Target(ElementType.METHOD)
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Documented
|
||||
public @interface MonitoredJob {
|
||||
|
||||
/**
|
||||
* 任务ID(对应数据库中的schedule_job.id)
|
||||
*/
|
||||
long jobId();
|
||||
|
||||
/**
|
||||
* 任务名称
|
||||
*/
|
||||
String jobName();
|
||||
}
|
||||
|
||||
@ -1,238 +0,0 @@
|
||||
package com.qqchen.deploy.backend.schedule.aspect;
|
||||
|
||||
import com.qqchen.deploy.backend.schedule.annotation.MonitoredJob;
|
||||
import com.qqchen.deploy.backend.schedule.dto.JobStatusDTO;
|
||||
import com.qqchen.deploy.backend.schedule.entity.ScheduleJob;
|
||||
import com.qqchen.deploy.backend.schedule.entity.ScheduleJobLog;
|
||||
import com.qqchen.deploy.backend.schedule.enums.JobStatusEnum;
|
||||
import com.qqchen.deploy.backend.schedule.repository.IScheduleJobLogRepository;
|
||||
import com.qqchen.deploy.backend.schedule.repository.IScheduleJobRepository;
|
||||
import com.qqchen.deploy.backend.schedule.service.JobProgressReporter;
|
||||
import com.qqchen.deploy.backend.schedule.service.JobStatusRedisService;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.aspectj.lang.ProceedingJoinPoint;
|
||||
import org.aspectj.lang.annotation.Around;
|
||||
import org.aspectj.lang.annotation.Aspect;
|
||||
import org.aspectj.lang.reflect.MethodSignature;
|
||||
import org.quartz.CronTrigger;
|
||||
import org.quartz.Scheduler;
|
||||
import org.quartz.SchedulerException;
|
||||
import org.quartz.TriggerKey;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.net.InetAddress;
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.util.Arrays;
|
||||
import java.util.Date;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 定时任务监控切面
|
||||
* 自动拦截带有@MonitoredJob注解的方法,记录执行状态
|
||||
*
|
||||
* @author qichen
|
||||
*/
|
||||
@Slf4j
|
||||
@Aspect
|
||||
@Component
|
||||
public class JobMonitorAspect {
|
||||
|
||||
@Resource
|
||||
private JobStatusRedisService jobStatusRedisService;
|
||||
|
||||
@Resource
|
||||
private JobProgressReporter jobProgressReporter;
|
||||
|
||||
@Resource
|
||||
private IScheduleJobLogRepository jobLogRepository;
|
||||
|
||||
@Resource
|
||||
private IScheduleJobRepository jobRepository;
|
||||
|
||||
@Resource
|
||||
private Scheduler scheduler;
|
||||
|
||||
@Around("@annotation(monitoredJob)")
|
||||
public Object monitor(ProceedingJoinPoint joinPoint, MonitoredJob monitoredJob) throws Throwable {
|
||||
Long jobId = monitoredJob.jobId();
|
||||
String jobName = monitoredJob.jobName();
|
||||
LocalDateTime startTime = LocalDateTime.now();
|
||||
|
||||
// 获取执行器信息
|
||||
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
|
||||
String beanName = joinPoint.getTarget().getClass().getSimpleName();
|
||||
String methodName = signature.getName();
|
||||
String methodParams = Arrays.stream(joinPoint.getArgs())
|
||||
.map(Object::toString)
|
||||
.collect(Collectors.joining(", "));
|
||||
|
||||
// 设置ThreadLocal,供进度报告使用
|
||||
jobProgressReporter.setCurrentJob(jobId, jobName, startTime, beanName, methodName, methodParams);
|
||||
|
||||
try {
|
||||
// 1. 记录开始状态到Redis
|
||||
log.info("任务开始执行: jobId={}, jobName={}", jobId, jobName);
|
||||
saveStatus(jobId, jobName, "RUNNING", 0, "任务执行中", startTime);
|
||||
|
||||
// 2. 执行业务逻辑
|
||||
Object result = joinPoint.proceed();
|
||||
|
||||
// 3. 记录成功状态到Redis
|
||||
log.info("任务执行成功: jobId={}", jobId);
|
||||
saveStatus(jobId, jobName, "SUCCESS", 100, "任务执行成功", startTime);
|
||||
|
||||
// 4. 保存成功日志到数据库
|
||||
saveLog(jobId, jobName, beanName, methodName, startTime, JobStatusEnum.SUCCESS, "任务执行成功", null);
|
||||
|
||||
// 5. 更新任务统计信息(成功)
|
||||
updateJobStatistics(jobId, true);
|
||||
|
||||
return result;
|
||||
|
||||
} catch (Throwable e) {
|
||||
// 6. 记录失败状态到Redis
|
||||
log.error("任务执行失败: jobId={}", jobId, e);
|
||||
saveStatus(jobId, jobName, "FAIL", null, "任务执行失败: " + e.getMessage(), startTime);
|
||||
|
||||
// 7. 保存失败日志到数据库
|
||||
saveLog(jobId, jobName, beanName, methodName, startTime, JobStatusEnum.FAIL, "任务执行失败", e);
|
||||
|
||||
// 8. 更新任务统计信息(失败)
|
||||
updateJobStatistics(jobId, false);
|
||||
|
||||
throw e;
|
||||
|
||||
} finally {
|
||||
// 清除ThreadLocal
|
||||
jobProgressReporter.clearCurrentJob();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存任务状态到Redis
|
||||
*/
|
||||
private void saveStatus(Long jobId, String jobName, String status, Integer progress,
|
||||
String message, LocalDateTime startTime) {
|
||||
JobStatusDTO statusDTO = JobStatusDTO.builder()
|
||||
.jobId(jobId)
|
||||
.jobName(jobName)
|
||||
.status(status)
|
||||
.progress(progress != null ? progress : 0)
|
||||
.currentStep(message)
|
||||
.message(message)
|
||||
.startTime(startTime)
|
||||
.build();
|
||||
|
||||
jobStatusRedisService.saveJobStatus(jobId, statusDTO);
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存任务日志到数据库
|
||||
*/
|
||||
private void saveLog(Long jobId, String jobName, String beanName, String methodName,
|
||||
LocalDateTime startTime, JobStatusEnum status, String message, Throwable exception) {
|
||||
try {
|
||||
ScheduleJobLog log = new ScheduleJobLog();
|
||||
log.setJobId(jobId);
|
||||
log.setJobName(jobName);
|
||||
log.setBeanName(beanName);
|
||||
log.setMethodName(methodName);
|
||||
log.setExecuteTime(startTime);
|
||||
log.setFinishTime(LocalDateTime.now());
|
||||
log.setDuration(Duration.between(startTime, LocalDateTime.now()).toMillis());
|
||||
log.setStatus(status);
|
||||
log.setResultMessage(message);
|
||||
|
||||
if (exception != null) {
|
||||
log.setExceptionInfo(getStackTrace(exception));
|
||||
}
|
||||
|
||||
// 获取服务器信息
|
||||
try {
|
||||
InetAddress addr = InetAddress.getLocalHost();
|
||||
log.setServerIp(addr.getHostAddress());
|
||||
log.setServerHost(addr.getHostName());
|
||||
} catch (Exception e) {
|
||||
this.log.warn("获取服务器信息失败", e);
|
||||
}
|
||||
|
||||
jobLogRepository.save(log);
|
||||
|
||||
} catch (Exception e) {
|
||||
this.log.error("保存任务日志失败", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新任务统计信息
|
||||
*
|
||||
* @param jobId 任务ID
|
||||
* @param success 是否执行成功
|
||||
*/
|
||||
private void updateJobStatistics(Long jobId, boolean success) {
|
||||
try {
|
||||
ScheduleJob job = jobRepository.findById(jobId).orElse(null);
|
||||
if (job == null) {
|
||||
log.warn("更新任务统计失败:任务不存在,jobId={}", jobId);
|
||||
return;
|
||||
}
|
||||
|
||||
// 更新执行次数
|
||||
Integer executeCount = job.getExecuteCount();
|
||||
job.setExecuteCount(executeCount == null ? 1 : executeCount + 1);
|
||||
|
||||
// 更新成功/失败次数
|
||||
if (success) {
|
||||
Integer successCount = job.getSuccessCount();
|
||||
job.setSuccessCount(successCount == null ? 1 : successCount + 1);
|
||||
} else {
|
||||
Integer failCount = job.getFailCount();
|
||||
job.setFailCount(failCount == null ? 1 : failCount + 1);
|
||||
}
|
||||
|
||||
// 更新上次执行时间
|
||||
job.setLastExecuteTime(LocalDateTime.now());
|
||||
|
||||
// 更新下次执行时间(从Quartz Trigger获取)
|
||||
try {
|
||||
TriggerKey triggerKey = TriggerKey.triggerKey("trigger_" + jobId, "DEFAULT");
|
||||
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
|
||||
if (trigger != null) {
|
||||
Date nextFireTime = trigger.getNextFireTime();
|
||||
if (nextFireTime != null) {
|
||||
job.setNextExecuteTime(LocalDateTime.ofInstant(
|
||||
nextFireTime.toInstant(),
|
||||
ZoneId.systemDefault()
|
||||
));
|
||||
}
|
||||
}
|
||||
} catch (SchedulerException e) {
|
||||
log.warn("获取下次执行时间失败:jobId={}", jobId, e);
|
||||
}
|
||||
|
||||
// 保存更新
|
||||
jobRepository.save(job);
|
||||
log.info("任务统计信息已更新:jobId={}, executeCount={}, successCount={}, failCount={}",
|
||||
jobId, job.getExecuteCount(), job.getSuccessCount(), job.getFailCount());
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("更新任务统计失败:jobId={}", jobId, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取异常堆栈信息
|
||||
*/
|
||||
private String getStackTrace(Throwable throwable) {
|
||||
StringWriter sw = new StringWriter();
|
||||
PrintWriter pw = new PrintWriter(sw);
|
||||
throwable.printStackTrace(pw);
|
||||
return sw.toString();
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,129 @@
|
||||
package com.qqchen.deploy.backend.schedule.job;
|
||||
|
||||
import com.qqchen.deploy.backend.schedule.service.JobProgressReporter;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 定时任务抽象基类
|
||||
* 封装了公共的进度报告和日志记录逻辑
|
||||
*
|
||||
* 使用方式:
|
||||
* 1. 继承此类
|
||||
* 2. 实现 doExecute() 方法
|
||||
* 3. 使用 updateProgress() 报告进度
|
||||
* 4. 添加 @Component 注解并指定bean名称
|
||||
*
|
||||
* @author qichen
|
||||
*/
|
||||
@Slf4j
|
||||
public abstract class AbstractScheduleJob implements IScheduleJob {
|
||||
|
||||
@Resource
|
||||
protected JobProgressReporter progressReporter;
|
||||
|
||||
/**
|
||||
* 执行任务(模板方法)
|
||||
* 统一处理进度报告、异常捕获等
|
||||
*/
|
||||
@Override
|
||||
public final void execute(Map<String, Object> params) throws Exception {
|
||||
log.info("开始执行定时任务: {} - {}", getJobName(), getJobDescription());
|
||||
|
||||
try {
|
||||
// 执行前处理
|
||||
beforeExecute(params);
|
||||
|
||||
// 报告开始状态
|
||||
if (supportProgress()) {
|
||||
updateProgress(0, "任务开始执行");
|
||||
}
|
||||
|
||||
// 执行核心业务逻辑
|
||||
doExecute(params);
|
||||
|
||||
// 报告完成状态
|
||||
if (supportProgress()) {
|
||||
updateProgress(100, "任务执行完成");
|
||||
}
|
||||
|
||||
// 执行后处理
|
||||
afterExecute(params, true);
|
||||
|
||||
log.info("定时任务执行完成: {}", getJobName());
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("定时任务执行失败: {}", getJobName(), e);
|
||||
afterExecute(params, false);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 核心业务逻辑实现
|
||||
* 子类必须实现此方法
|
||||
*
|
||||
* @param params 任务参数
|
||||
* @throws Exception 执行异常
|
||||
*/
|
||||
protected abstract void doExecute(Map<String, Object> params) throws Exception;
|
||||
|
||||
/**
|
||||
* 更新任务进度
|
||||
*
|
||||
* @param progress 进度百分比 (0-100)
|
||||
* @param message 进度消息
|
||||
*/
|
||||
protected void updateProgress(int progress, String message) {
|
||||
if (progressReporter != null) {
|
||||
progressReporter.updateProgress(progress, message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 从参数中获取值
|
||||
*
|
||||
* @param params 参数Map
|
||||
* @param key 参数键
|
||||
* @param defaultValue 默认值
|
||||
* @param <T> 值类型
|
||||
* @return 参数值
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
protected <T> T getParam(Map<String, Object> params, String key, T defaultValue) {
|
||||
if (params == null || !params.containsKey(key)) {
|
||||
return defaultValue;
|
||||
}
|
||||
try {
|
||||
return (T) params.get(key);
|
||||
} catch (ClassCastException e) {
|
||||
log.warn("参数类型转换失败: {} = {}, 使用默认值: {}", key, params.get(key), defaultValue);
|
||||
return defaultValue;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 从参数中获取必填值
|
||||
*
|
||||
* @param params 参数Map
|
||||
* @param key 参数键
|
||||
* @param <T> 值类型
|
||||
* @return 参数值
|
||||
* @throws IllegalArgumentException 如果参数不存在
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
protected <T> T getRequiredParam(Map<String, Object> params, String key) {
|
||||
if (params == null || !params.containsKey(key)) {
|
||||
throw new IllegalArgumentException("缺少必填参数: " + key);
|
||||
}
|
||||
return (T) params.get(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean supportProgress() {
|
||||
return true; // 默认支持进度报告
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,19 +1,46 @@
|
||||
package com.qqchen.deploy.backend.schedule.job;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.qqchen.deploy.backend.schedule.dto.JobStatusDTO;
|
||||
import com.qqchen.deploy.backend.schedule.entity.ScheduleJob;
|
||||
import com.qqchen.deploy.backend.schedule.entity.ScheduleJobLog;
|
||||
import com.qqchen.deploy.backend.schedule.enums.JobStatusEnum;
|
||||
import com.qqchen.deploy.backend.schedule.repository.IScheduleJobLogRepository;
|
||||
import com.qqchen.deploy.backend.schedule.repository.IScheduleJobRepository;
|
||||
import com.qqchen.deploy.backend.schedule.service.JobStatusRedisService;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.quartz.Job;
|
||||
import org.quartz.JobExecutionContext;
|
||||
import org.quartz.JobExecutionException;
|
||||
import org.quartz.*;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.lang.reflect.Method;
|
||||
import java.net.InetAddress;
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 动态任务执行器
|
||||
* 通用的Quartz Job实现,根据配置动态调用Spring Bean的方法
|
||||
* 智能双模式任务调度器
|
||||
*
|
||||
* 支持两种执行模式:
|
||||
* 1. IScheduleJob接口模式:适用于复杂任务(有进度报告、多步骤等)
|
||||
* 2. Service方法调用模式:适用于简单任务(直接调用Service方法,纯配置化)
|
||||
*
|
||||
* 使用示例:
|
||||
*
|
||||
* 【模式1 - 复杂任务】
|
||||
* 配置: beanName="workflowCleanJob", methodName="execute", methodParams="{...}"
|
||||
* 要求: WorkflowCleanJob 需要继承 AbstractScheduleJob
|
||||
*
|
||||
* 【模式2 - 简单任务】
|
||||
* 配置: beanName="repositoryGroupService", methodName="syncGroups", methodParams='{"externalSystemId":1}'
|
||||
* 要求: 方法必须存在,参数会自动解析和匹配
|
||||
*
|
||||
* @author qichen
|
||||
*/
|
||||
@ -26,6 +53,18 @@ public class DynamicJob implements Job {
|
||||
@Resource
|
||||
private ObjectMapper objectMapper;
|
||||
|
||||
@Resource
|
||||
private JobStatusRedisService jobStatusRedisService;
|
||||
|
||||
@Resource
|
||||
private IScheduleJobLogRepository jobLogRepository;
|
||||
|
||||
@Resource
|
||||
private IScheduleJobRepository jobRepository;
|
||||
|
||||
@Resource
|
||||
private Scheduler scheduler;
|
||||
|
||||
@Override
|
||||
public void execute(JobExecutionContext context) throws JobExecutionException {
|
||||
// 从JobDataMap中获取参数
|
||||
@ -34,135 +73,424 @@ public class DynamicJob implements Job {
|
||||
String methodParams = context.getMergedJobDataMap().getString("methodParams");
|
||||
Long jobId = context.getMergedJobDataMap().getLong("jobId");
|
||||
String jobName = context.getMergedJobDataMap().getString("jobName");
|
||||
LocalDateTime startTime = LocalDateTime.now();
|
||||
|
||||
log.info("开始执行定时任务:jobId={}, jobName={}, beanName={}, methodName={}",
|
||||
jobId, jobName, beanName, methodName);
|
||||
|
||||
try {
|
||||
// 获取Bean实例
|
||||
// 1. 记录开始状态到Redis
|
||||
saveStartStatus(jobId, jobName, startTime);
|
||||
|
||||
// 2. 获取Bean实例
|
||||
Object bean = applicationContext.getBean(beanName);
|
||||
if (bean == null) {
|
||||
throw new JobExecutionException("找不到Bean:" + beanName);
|
||||
}
|
||||
|
||||
// 解析参数
|
||||
Object[] params = parseMethodParams(methodParams);
|
||||
|
||||
// 调用方法
|
||||
if (params == null || params.length == 0) {
|
||||
// 无参方法
|
||||
Method method = bean.getClass().getMethod(methodName);
|
||||
method.invoke(bean);
|
||||
// 3. 执行业务逻辑(智能识别模式)
|
||||
if (bean instanceof IScheduleJob) {
|
||||
executeScheduleJob((IScheduleJob) bean, methodParams);
|
||||
} else {
|
||||
// 有参方法 - 尝试匹配方法签名
|
||||
Method method = findMatchingMethod(bean.getClass(), methodName, params);
|
||||
if (method == null) {
|
||||
throw new JobExecutionException("找不到匹配的方法:" + methodName);
|
||||
}
|
||||
method.invoke(bean, params);
|
||||
executeServiceMethod(bean, methodName, methodParams);
|
||||
}
|
||||
|
||||
// 4. 记录成功状态和日志
|
||||
log.info("定时任务执行成功:jobId={}, jobName={}", jobId, jobName);
|
||||
saveSuccessStatus(jobId, jobName, startTime);
|
||||
saveSuccessLog(jobId, jobName, beanName, methodName, startTime);
|
||||
updateJobStatistics(jobId, true);
|
||||
|
||||
} catch (Exception e) {
|
||||
// 5. 记录失败状态和日志
|
||||
log.error("定时任务执行失败:jobId={}, jobName={}", jobId, jobName, e);
|
||||
saveFailStatus(jobId, jobName, startTime, e);
|
||||
saveFailLog(jobId, jobName, beanName, methodName, startTime, e);
|
||||
updateJobStatistics(jobId, false);
|
||||
throw new JobExecutionException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析方法参数
|
||||
* 执行IScheduleJob接口任务(模式1:复杂任务)
|
||||
*/
|
||||
private Object[] parseMethodParams(String methodParams) {
|
||||
if (methodParams == null || methodParams.trim().isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
// 假设参数是JSON格式的Map
|
||||
Map<String, Object> paramsMap = objectMapper.readValue(methodParams, Map.class);
|
||||
if (paramsMap.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// 简单处理:如果只有一个参数,直接返回其值
|
||||
if (paramsMap.size() == 1) {
|
||||
Object value = paramsMap.values().iterator().next();
|
||||
// 尝试转换为Integer(常见情况)
|
||||
if (value instanceof Number) {
|
||||
return new Object[]{((Number) value).intValue()};
|
||||
}
|
||||
return new Object[]{value};
|
||||
}
|
||||
|
||||
// 多个参数时,返回所有值
|
||||
return paramsMap.values().toArray();
|
||||
|
||||
} catch (Exception e) {
|
||||
log.warn("解析方法参数失败,将作为无参方法调用:{}", methodParams, e);
|
||||
return null;
|
||||
}
|
||||
private void executeScheduleJob(IScheduleJob scheduleJob, String methodParams) throws Exception {
|
||||
Map<String, Object> params = parseParamsToMap(methodParams);
|
||||
scheduleJob.execute(params);
|
||||
}
|
||||
|
||||
/**
|
||||
* 查找匹配的方法
|
||||
* 执行普通Service方法(模式2:简单任务,配置化调用)
|
||||
*/
|
||||
private Method findMatchingMethod(Class<?> clazz, String methodName, Object[] params) {
|
||||
private void executeServiceMethod(Object bean, String methodName, String methodParams) throws Exception {
|
||||
// 解析参数
|
||||
Map<String, Object> paramsMap = parseParamsToMap(methodParams);
|
||||
|
||||
// 查找匹配的方法
|
||||
Method method = findBestMatchMethod(bean.getClass(), methodName, paramsMap);
|
||||
if (method == null) {
|
||||
throw new NoSuchMethodException(
|
||||
String.format("找不到方法:%s.%s,参数:%s",
|
||||
bean.getClass().getSimpleName(), methodName, paramsMap.keySet()));
|
||||
}
|
||||
|
||||
// 准备方法参数
|
||||
Object[] args = prepareMethodArgs(method, paramsMap);
|
||||
|
||||
// 调用方法
|
||||
log.debug("调用方法:{}.{},参数:{}", bean.getClass().getSimpleName(), methodName, args);
|
||||
method.invoke(bean, args);
|
||||
}
|
||||
|
||||
/**
|
||||
* 查找最佳匹配的方法
|
||||
* 支持参数名匹配(推荐)和参数数量匹配(兼容)
|
||||
*/
|
||||
private Method findBestMatchMethod(Class<?> clazz, String methodName, Map<String, Object> paramsMap) {
|
||||
Method[] methods = clazz.getMethods();
|
||||
Method bestMatch = null;
|
||||
int bestScore = -1;
|
||||
|
||||
for (Method method : methods) {
|
||||
if (!method.getName().equals(methodName)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
Class<?>[] paramTypes = method.getParameterTypes();
|
||||
if (paramTypes.length != params.length) {
|
||||
continue;
|
||||
}
|
||||
int paramCount = method.getParameterCount();
|
||||
|
||||
// 简单的类型匹配
|
||||
boolean matches = true;
|
||||
for (int i = 0; i < paramTypes.length; i++) {
|
||||
if (params[i] == null) {
|
||||
continue; // null可以匹配任何引用类型
|
||||
}
|
||||
|
||||
Class<?> paramType = paramTypes[i];
|
||||
Class<?> argType = params[i].getClass();
|
||||
|
||||
// 处理基本类型和包装类型
|
||||
if (!isAssignableFrom(paramType, argType)) {
|
||||
matches = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (matches) {
|
||||
// 无参方法
|
||||
if (paramCount == 0 && paramsMap.isEmpty()) {
|
||||
return method;
|
||||
}
|
||||
|
||||
// 参数数量匹配检查
|
||||
if (paramCount == paramsMap.size()) {
|
||||
// 计算匹配得分(参数类型兼容性)
|
||||
int score = calculateMatchScore(method, paramsMap);
|
||||
if (score > bestScore) {
|
||||
bestScore = score;
|
||||
bestMatch = method;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
return bestMatch;
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否可赋值(包括基本类型和包装类型的转换)
|
||||
* 计算方法参数匹配得分
|
||||
*/
|
||||
private boolean isAssignableFrom(Class<?> target, Class<?> source) {
|
||||
private int calculateMatchScore(Method method, Map<String, Object> paramsMap) {
|
||||
Class<?>[] paramTypes = method.getParameterTypes();
|
||||
List<Object> paramValues = new ArrayList<>(paramsMap.values());
|
||||
|
||||
int score = 0;
|
||||
for (int i = 0; i < paramTypes.length; i++) {
|
||||
if (i >= paramValues.size()) {
|
||||
return -1; // 参数不足
|
||||
}
|
||||
|
||||
Object value = paramValues.get(i);
|
||||
Class<?> targetType = paramTypes[i];
|
||||
|
||||
if (value == null) {
|
||||
// null可以赋值给任何引用类型
|
||||
if (!targetType.isPrimitive()) {
|
||||
score += 50;
|
||||
} else {
|
||||
return -1; // null不能赋值给基本类型
|
||||
}
|
||||
} else if (isTypeCompatible(targetType, value.getClass())) {
|
||||
score += 100; // 完全兼容
|
||||
} else if (isNumberConvertible(targetType, value)) {
|
||||
score += 80; // 数字类型可转换
|
||||
} else {
|
||||
return -1; // 类型不兼容
|
||||
}
|
||||
}
|
||||
|
||||
return score;
|
||||
}
|
||||
|
||||
/**
|
||||
* 准备方法参数数组
|
||||
*/
|
||||
private Object[] prepareMethodArgs(Method method, Map<String, Object> paramsMap) {
|
||||
Class<?>[] paramTypes = method.getParameterTypes();
|
||||
List<Object> paramValues = new ArrayList<>(paramsMap.values());
|
||||
Object[] args = new Object[paramTypes.length];
|
||||
|
||||
for (int i = 0; i < paramTypes.length; i++) {
|
||||
Object value = i < paramValues.size() ? paramValues.get(i) : null;
|
||||
args[i] = convertParameter(value, paramTypes[i]);
|
||||
}
|
||||
|
||||
return args;
|
||||
}
|
||||
|
||||
/**
|
||||
* 转换参数类型
|
||||
*/
|
||||
private Object convertParameter(Object value, Class<?> targetType) {
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// 已经是目标类型
|
||||
if (targetType.isInstance(value)) {
|
||||
return value;
|
||||
}
|
||||
|
||||
// 数字类型转换
|
||||
if (value instanceof Number) {
|
||||
Number num = (Number) value;
|
||||
if (targetType == Long.class || targetType == long.class) {
|
||||
return num.longValue();
|
||||
} else if (targetType == Integer.class || targetType == int.class) {
|
||||
return num.intValue();
|
||||
} else if (targetType == Double.class || targetType == double.class) {
|
||||
return num.doubleValue();
|
||||
} else if (targetType == Float.class || targetType == float.class) {
|
||||
return num.floatValue();
|
||||
}
|
||||
}
|
||||
|
||||
return value;
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查类型是否兼容
|
||||
*/
|
||||
private boolean isTypeCompatible(Class<?> target, Class<?> source) {
|
||||
if (target.isAssignableFrom(source)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// 处理基本类型和包装类型
|
||||
// 基本类型和包装类型互转
|
||||
if (target == int.class && source == Integer.class) return true;
|
||||
if (target == Integer.class && source == int.class) return true;
|
||||
if (target == long.class && source == Long.class) return true;
|
||||
if (target == Long.class && source == long.class) return true;
|
||||
if (target == double.class && source == Double.class) return true;
|
||||
if (target == float.class && source == Float.class) return true;
|
||||
if (target == Double.class && source == double.class) return true;
|
||||
if (target == boolean.class && source == Boolean.class) return true;
|
||||
if (target == byte.class && source == Byte.class) return true;
|
||||
if (target == short.class && source == Short.class) return true;
|
||||
if (target == char.class && source == Character.class) return true;
|
||||
if (target == Boolean.class && source == boolean.class) return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查数字是否可转换
|
||||
*/
|
||||
private boolean isNumberConvertible(Class<?> target, Object value) {
|
||||
if (!(value instanceof Number)) {
|
||||
return false;
|
||||
}
|
||||
return target == Long.class || target == long.class ||
|
||||
target == Integer.class || target == int.class ||
|
||||
target == Double.class || target == double.class ||
|
||||
target == Float.class || target == float.class;
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析参数为Map格式
|
||||
*/
|
||||
private Map<String, Object> parseParamsToMap(String methodParams) {
|
||||
if (methodParams == null || methodParams.trim().isEmpty()) {
|
||||
return Map.of();
|
||||
}
|
||||
|
||||
try {
|
||||
return objectMapper.readValue(methodParams, Map.class);
|
||||
} catch (Exception e) {
|
||||
log.warn("解析参数失败,返回空Map:{}", methodParams, e);
|
||||
return Map.of();
|
||||
}
|
||||
}
|
||||
|
||||
// ======================== 任务监控相关方法 ========================
|
||||
|
||||
/**
|
||||
* 保存任务开始状态到Redis
|
||||
*/
|
||||
private void saveStartStatus(Long jobId, String jobName, LocalDateTime startTime) {
|
||||
try {
|
||||
JobStatusDTO statusDTO = JobStatusDTO.builder()
|
||||
.jobId(jobId)
|
||||
.jobName(jobName)
|
||||
.status("RUNNING")
|
||||
.progress(0)
|
||||
.currentStep("任务执行中")
|
||||
.message("任务执行中")
|
||||
.startTime(startTime)
|
||||
.build();
|
||||
jobStatusRedisService.saveJobStatus(jobId, statusDTO);
|
||||
} catch (Exception e) {
|
||||
log.warn("保存任务开始状态失败:jobId={}", jobId, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存任务成功状态到Redis
|
||||
*/
|
||||
private void saveSuccessStatus(Long jobId, String jobName, LocalDateTime startTime) {
|
||||
try {
|
||||
JobStatusDTO statusDTO = JobStatusDTO.builder()
|
||||
.jobId(jobId)
|
||||
.jobName(jobName)
|
||||
.status("SUCCESS")
|
||||
.progress(100)
|
||||
.currentStep("任务执行成功")
|
||||
.message("任务执行成功")
|
||||
.startTime(startTime)
|
||||
.build();
|
||||
jobStatusRedisService.saveJobStatus(jobId, statusDTO);
|
||||
} catch (Exception e) {
|
||||
log.warn("保存任务成功状态失败:jobId={}", jobId, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存任务失败状态到Redis
|
||||
*/
|
||||
private void saveFailStatus(Long jobId, String jobName, LocalDateTime startTime, Exception exception) {
|
||||
try {
|
||||
JobStatusDTO statusDTO = JobStatusDTO.builder()
|
||||
.jobId(jobId)
|
||||
.jobName(jobName)
|
||||
.status("FAIL")
|
||||
.progress(null)
|
||||
.currentStep("任务执行失败")
|
||||
.message("任务执行失败: " + exception.getMessage())
|
||||
.startTime(startTime)
|
||||
.build();
|
||||
jobStatusRedisService.saveJobStatus(jobId, statusDTO);
|
||||
} catch (Exception e) {
|
||||
log.warn("保存任务失败状态失败:jobId={}", jobId, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存任务成功日志到数据库
|
||||
*/
|
||||
private void saveSuccessLog(Long jobId, String jobName, String beanName, String methodName, LocalDateTime startTime) {
|
||||
try {
|
||||
ScheduleJobLog jobLog = new ScheduleJobLog();
|
||||
jobLog.setJobId(jobId);
|
||||
jobLog.setJobName(jobName);
|
||||
jobLog.setBeanName(beanName);
|
||||
jobLog.setMethodName(methodName);
|
||||
jobLog.setExecuteTime(startTime);
|
||||
jobLog.setFinishTime(LocalDateTime.now());
|
||||
jobLog.setDuration(Duration.between(startTime, LocalDateTime.now()).toMillis());
|
||||
jobLog.setStatus(JobStatusEnum.SUCCESS);
|
||||
jobLog.setResultMessage("任务执行成功");
|
||||
setServerInfo(jobLog);
|
||||
jobLogRepository.save(jobLog);
|
||||
} catch (Exception e) {
|
||||
log.error("保存任务成功日志失败:jobId={}", jobId, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存任务失败日志到数据库
|
||||
*/
|
||||
private void saveFailLog(Long jobId, String jobName, String beanName, String methodName,
|
||||
LocalDateTime startTime, Exception exception) {
|
||||
try {
|
||||
ScheduleJobLog jobLog = new ScheduleJobLog();
|
||||
jobLog.setJobId(jobId);
|
||||
jobLog.setJobName(jobName);
|
||||
jobLog.setBeanName(beanName);
|
||||
jobLog.setMethodName(methodName);
|
||||
jobLog.setExecuteTime(startTime);
|
||||
jobLog.setFinishTime(LocalDateTime.now());
|
||||
jobLog.setDuration(Duration.between(startTime, LocalDateTime.now()).toMillis());
|
||||
jobLog.setStatus(JobStatusEnum.FAIL);
|
||||
jobLog.setResultMessage("任务执行失败");
|
||||
jobLog.setExceptionInfo(getStackTrace(exception));
|
||||
setServerInfo(jobLog);
|
||||
jobLogRepository.save(jobLog);
|
||||
} catch (Exception e) {
|
||||
log.error("保存任务失败日志失败:jobId={}", jobId, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置服务器信息
|
||||
*/
|
||||
private void setServerInfo(ScheduleJobLog jobLog) {
|
||||
try {
|
||||
InetAddress addr = InetAddress.getLocalHost();
|
||||
jobLog.setServerIp(addr.getHostAddress());
|
||||
jobLog.setServerHost(addr.getHostName());
|
||||
} catch (Exception e) {
|
||||
log.warn("获取服务器信息失败", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新任务统计信息
|
||||
*/
|
||||
private void updateJobStatistics(Long jobId, boolean success) {
|
||||
try {
|
||||
ScheduleJob job = jobRepository.findById(jobId).orElse(null);
|
||||
if (job == null) {
|
||||
log.warn("更新任务统计失败:任务不存在,jobId={}", jobId);
|
||||
return;
|
||||
}
|
||||
|
||||
// 更新执行次数
|
||||
Integer executeCount = job.getExecuteCount();
|
||||
job.setExecuteCount(executeCount == null ? 1 : executeCount + 1);
|
||||
|
||||
// 更新成功/失败次数
|
||||
if (success) {
|
||||
Integer successCount = job.getSuccessCount();
|
||||
job.setSuccessCount(successCount == null ? 1 : successCount + 1);
|
||||
} else {
|
||||
Integer failCount = job.getFailCount();
|
||||
job.setFailCount(failCount == null ? 1 : failCount + 1);
|
||||
}
|
||||
|
||||
// 更新上次执行时间
|
||||
job.setLastExecuteTime(LocalDateTime.now());
|
||||
|
||||
// 更新下次执行时间(从Quartz Trigger获取)
|
||||
try {
|
||||
TriggerKey triggerKey = TriggerKey.triggerKey("trigger_" + jobId, "DEFAULT");
|
||||
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
|
||||
if (trigger != null) {
|
||||
Date nextFireTime = trigger.getNextFireTime();
|
||||
if (nextFireTime != null) {
|
||||
job.setNextExecuteTime(LocalDateTime.ofInstant(
|
||||
nextFireTime.toInstant(),
|
||||
ZoneId.systemDefault()
|
||||
));
|
||||
}
|
||||
}
|
||||
} catch (SchedulerException e) {
|
||||
log.warn("获取下次执行时间失败:jobId={}", jobId, e);
|
||||
}
|
||||
|
||||
// 保存更新
|
||||
jobRepository.save(job);
|
||||
log.info("任务统计信息已更新:jobId={}, executeCount={}, successCount={}, failCount={}",
|
||||
jobId, job.getExecuteCount(), job.getSuccessCount(), job.getFailCount());
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("更新任务统计失败:jobId={}", jobId, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取异常堆栈信息
|
||||
*/
|
||||
private String getStackTrace(Throwable throwable) {
|
||||
StringWriter sw = new StringWriter();
|
||||
PrintWriter pw = new PrintWriter(sw);
|
||||
throwable.printStackTrace(pw);
|
||||
return sw.toString();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -0,0 +1,71 @@
|
||||
package com.qqchen.deploy.backend.schedule.job;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 定时任务统一接口
|
||||
* 所有定时任务都应实现此接口,以便统一管理和调度
|
||||
*
|
||||
* @author qichen
|
||||
*/
|
||||
public interface IScheduleJob {
|
||||
|
||||
/**
|
||||
* 执行任务
|
||||
*
|
||||
* @param params 任务参数(从表单定义或数据库配置中获取)
|
||||
* @throws Exception 任务执行异常
|
||||
*/
|
||||
void execute(Map<String, Object> params) throws Exception;
|
||||
|
||||
/**
|
||||
* 获取任务名称
|
||||
*
|
||||
* @return 任务名称
|
||||
*/
|
||||
default String getJobName() {
|
||||
return this.getClass().getSimpleName();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取任务描述
|
||||
*
|
||||
* @return 任务描述
|
||||
*/
|
||||
default String getJobDescription() {
|
||||
return "定时任务";
|
||||
}
|
||||
|
||||
/**
|
||||
* 是否支持进度报告
|
||||
* 返回true表示该任务会主动上报进度,返回false则只记录开始和结束
|
||||
*
|
||||
* @return 是否支持进度报告
|
||||
*/
|
||||
default boolean supportProgress() {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* 任务执行前的预处理
|
||||
* 可用于参数验证、资源准备等
|
||||
*
|
||||
* @param params 任务参数
|
||||
* @throws Exception 预处理异常
|
||||
*/
|
||||
default void beforeExecute(Map<String, Object> params) throws Exception {
|
||||
// 默认空实现
|
||||
}
|
||||
|
||||
/**
|
||||
* 任务执行后的后处理
|
||||
* 可用于资源释放、结果处理等
|
||||
*
|
||||
* @param params 任务参数
|
||||
* @param success 是否执行成功
|
||||
*/
|
||||
default void afterExecute(Map<String, Object> params, boolean success) {
|
||||
// 默认空实现
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,78 +1,68 @@
|
||||
package com.qqchen.deploy.backend.schedule.job;
|
||||
|
||||
import com.qqchen.deploy.backend.schedule.annotation.MonitoredJob;
|
||||
import com.qqchen.deploy.backend.schedule.service.JobProgressReporter;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 工作流历史数据清理任务
|
||||
* 示例定时任务:演示如何使用MonitoredJob注解和JobProgressReporter进行进度报告
|
||||
*
|
||||
* 使用统一的任务接口和抽象基类,大幅简化代码
|
||||
* 不再需要手动注入 JobProgressReporter 和添加 @MonitoredJob 注解
|
||||
*
|
||||
* @author qichen
|
||||
*/
|
||||
@Slf4j
|
||||
@Component("workflowCleanJob")
|
||||
public class WorkflowCleanJob {
|
||||
public class WorkflowCleanJob extends AbstractScheduleJob {
|
||||
|
||||
@Resource
|
||||
private JobProgressReporter progressReporter;
|
||||
@Override
|
||||
public String getJobName() {
|
||||
return "工作流历史数据清理任务";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getJobDescription() {
|
||||
return "定期清理过期的工作流实例、节点和表单数据";
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行工作流历史数据清理
|
||||
*
|
||||
* @param retentionDays 保留天数
|
||||
* 核心业务逻辑
|
||||
*/
|
||||
@MonitoredJob(jobId = 1, jobName = "工作流历史数据清理任务")
|
||||
public void execute(Integer retentionDays) {
|
||||
log.info("开始执行工作流历史数据清理任务,保留天数: {}", retentionDays);
|
||||
@Override
|
||||
protected void doExecute(Map<String, Object> params) throws Exception {
|
||||
// 从参数中获取保留天数,默认90天
|
||||
Integer retentionDays = getParam(params, "retentionDays", 90);
|
||||
log.info("保留天数: {}", retentionDays);
|
||||
|
||||
try {
|
||||
// 步骤1: 查询需要清理的数据
|
||||
progressReporter.updateProgress(10, "正在查询需要清理的数据...");
|
||||
LocalDateTime cutoffDate = LocalDateTime.now().minusDays(retentionDays != null ? retentionDays : 90);
|
||||
log.info("清理截止日期: {}", cutoffDate);
|
||||
// 步骤1: 查询需要清理的数据
|
||||
updateProgress(10, "正在查询需要清理的数据...");
|
||||
LocalDateTime cutoffDate = LocalDateTime.now().minusDays(retentionDays);
|
||||
log.info("清理截止日期: {}", cutoffDate);
|
||||
Thread.sleep(1000); // 模拟耗时
|
||||
|
||||
// 模拟耗时操作
|
||||
Thread.sleep(1000);
|
||||
// 步骤2: 清理工作流实例数据
|
||||
updateProgress(30, "正在清理工作流实例数据...");
|
||||
int deletedInstances = cleanWorkflowInstances(cutoffDate);
|
||||
log.info("已清理工作流实例数: {}", deletedInstances);
|
||||
Thread.sleep(1000);
|
||||
|
||||
// 步骤2: 清理工作流实例数据
|
||||
progressReporter.updateProgress(30, "正在清理工作流实例数据...");
|
||||
int deletedInstances = cleanWorkflowInstances(cutoffDate);
|
||||
log.info("已清理工作流实例数: {}", deletedInstances);
|
||||
// 步骤3: 清理节点实例数据
|
||||
updateProgress(60, "正在清理节点实例数据...");
|
||||
int deletedNodes = cleanNodeInstances(cutoffDate);
|
||||
log.info("已清理节点实例数: {}", deletedNodes);
|
||||
Thread.sleep(1000);
|
||||
|
||||
Thread.sleep(1000);
|
||||
// 步骤4: 清理表单数据
|
||||
updateProgress(80, "正在清理表单数据...");
|
||||
int deletedForms = cleanFormData(cutoffDate);
|
||||
log.info("已清理表单数据数: {}", deletedForms);
|
||||
Thread.sleep(1000);
|
||||
|
||||
// 步骤3: 清理节点实例数据
|
||||
progressReporter.updateProgress(60, "正在清理节点实例数据...");
|
||||
int deletedNodes = cleanNodeInstances(cutoffDate);
|
||||
log.info("已清理节点实例数: {}", deletedNodes);
|
||||
|
||||
Thread.sleep(1000);
|
||||
|
||||
// 步骤4: 清理表单数据
|
||||
progressReporter.updateProgress(80, "正在清理表单数据...");
|
||||
int deletedForms = cleanFormData(cutoffDate);
|
||||
log.info("已清理表单数据数: {}", deletedForms);
|
||||
|
||||
Thread.sleep(1000);
|
||||
|
||||
// 步骤5: 完成
|
||||
progressReporter.updateProgress(100, "清理任务完成");
|
||||
log.info("工作流历史数据清理任务执行完成,共清理实例: {}, 节点: {}, 表单: {}",
|
||||
deletedInstances, deletedNodes, deletedForms);
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
log.error("任务执行被中断", e);
|
||||
Thread.currentThread().interrupt();
|
||||
throw new RuntimeException("任务执行被中断", e);
|
||||
} catch (Exception e) {
|
||||
log.error("工作流历史数据清理任务执行失败", e);
|
||||
throw new RuntimeException("工作流历史数据清理任务执行失败", e);
|
||||
}
|
||||
log.info("清理完成,共清理实例: {}, 节点: {}, 表单: {}",
|
||||
deletedInstances, deletedNodes, deletedForms);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -85,6 +85,53 @@ public class ScheduleJobServiceImpl extends BaseServiceImpl<ScheduleJob, Schedul
|
||||
return super.create(dto);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public ScheduleJobDTO update(Long id, ScheduleJobDTO dto) {
|
||||
// 更新数据库
|
||||
ScheduleJobDTO updated = super.update(id, dto);
|
||||
|
||||
// 如果任务正在运行,需要重新调度以应用新参数
|
||||
try {
|
||||
JobKey jobKey = getJobKey(id);
|
||||
if (scheduler.checkExists(jobKey)) {
|
||||
ScheduleJob job = findEntityById(id);
|
||||
|
||||
// 先删除旧的Job
|
||||
scheduler.deleteJob(jobKey);
|
||||
|
||||
// 重新创建JobDetail(使用最新参数)
|
||||
JobDetail jobDetail = JobBuilder.newJob(DynamicJob.class)
|
||||
.withIdentity(jobKey)
|
||||
.withDescription(job.getJobDescription())
|
||||
.build();
|
||||
|
||||
// 设置最新的JobDataMap
|
||||
jobDetail.getJobDataMap().put("jobId", job.getId());
|
||||
jobDetail.getJobDataMap().put("jobName", job.getJobName());
|
||||
jobDetail.getJobDataMap().put("beanName", job.getBeanName());
|
||||
jobDetail.getJobDataMap().put("methodName", job.getMethodName());
|
||||
jobDetail.getJobDataMap().put("methodParams", job.getMethodParams());
|
||||
|
||||
// 重新创建Trigger
|
||||
Trigger trigger = TriggerBuilder.newTrigger()
|
||||
.withIdentity(getTriggerKey(id))
|
||||
.withSchedule(CronScheduleBuilder.cronSchedule(job.getCronExpression()))
|
||||
.build();
|
||||
|
||||
// 重新调度
|
||||
scheduler.scheduleJob(jobDetail, trigger);
|
||||
|
||||
log.info("任务参数已更新并重新调度:jobId={}, jobName={}", job.getId(), job.getJobName());
|
||||
}
|
||||
} catch (SchedulerException e) {
|
||||
log.error("更新任务调度失败:jobId={}", id, e);
|
||||
// 不抛异常,因为数据库已经更新成功,下次重启会生效
|
||||
}
|
||||
|
||||
return updated;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Page<ScheduleJobDTO> page(ScheduleJobQuery query) {
|
||||
Page<ScheduleJobDTO> page = super.page(query);
|
||||
|
||||
@ -886,6 +886,33 @@ INSERT INTO schedule_job (id, job_name, job_description, category_id, bean_name,
|
||||
cron_expression, status, concurrent, timeout_seconds, retry_count,
|
||||
create_by, create_time, update_by, update_time, version, deleted)
|
||||
VALUES
|
||||
-- 数据清理任务
|
||||
(1, '工作流历史数据清理', '清理90天前已完成或已取消的工作流实例数据', 1, 'workflowCleanJob', 'execute',
|
||||
'{"retentionDays": 90}', '0 0 2 * * ?', 'ENABLED', 0, 3600, 2,
|
||||
'system', '2024-01-01 00:00:00', 'system', '2024-01-01 00:00:00', 1, 0),
|
||||
|
||||
-- Git数据同步任务(需要先配置外部系统,修改externalSystemId参数)
|
||||
(2, 'Git仓库组同步', '定期同步Git仓库组信息,每天凌晨2点执行', 2, 'repositoryGroupServiceImpl', 'syncGroups',
|
||||
'{"externalSystemId": 1}', '0 0 2 * * ?', 'DISABLED', 0, 3600, 2,
|
||||
'system', '2024-01-01 00:00:00', 'system', '2024-01-01 00:00:00', 1, 0),
|
||||
|
||||
(3, 'Git项目同步', '定期同步Git项目信息,每天凌晨3点执行', 2, 'repositoryProjectServiceImpl', 'syncProjects',
|
||||
'{"externalSystemId": 1}', '0 0 3 * * ?', 'DISABLED', 0, 3600, 2,
|
||||
'system', '2024-01-01 00:00:00', 'system', '2024-01-01 00:00:00', 1, 0),
|
||||
|
||||
(4, 'Git分支同步', '定期同步Git仓库分支信息,每5分钟执行一次', 2, 'repositoryBranchServiceImpl', 'syncBranches',
|
||||
'{"externalSystemId": 1}', '0 */5 * * * ?', 'DISABLED', 0, 3600, 2,
|
||||
'system', '2024-01-01 00:00:00', 'system', '2024-01-01 00:00:00', 1, 0),
|
||||
|
||||
-- Jenkins数据同步任务(需要先配置外部系统,修改externalSystemId参数)
|
||||
(5, 'Jenkins视图同步', '定期同步Jenkins视图信息,每天凌晨4点执行', 2, 'jenkinsViewServiceImpl', 'syncViews',
|
||||
'{"externalSystemId": 1}', '0 0 4 * * ?', 'DISABLED', 0, 3600, 2,
|
||||
'system', '2024-01-01 00:00:00', 'system', '2024-01-01 00:00:00', 1, 0),
|
||||
|
||||
(6, 'Jenkins任务同步', '定期同步Jenkins任务信息,每天凌晨5点执行', 2, 'jenkinsJobServiceImpl', 'syncJobs',
|
||||
'{"externalSystemId": 1}', '0 0 5 * * ?', 'DISABLED', 0, 3600, 2,
|
||||
'system', '2024-01-01 00:00:00', 'system', '2024-01-01 00:00:00', 1, 0),
|
||||
|
||||
(7, 'Jenkins构建同步', '定期同步Jenkins构建信息,每5分钟执行一次', 2, 'jenkinsBuildServiceImpl', 'syncBuilds',
|
||||
'{"externalSystemId": 1}', '0 */5 * * * ?', 'DISABLED', 0, 3600, 2,
|
||||
'system', '2024-01-01 00:00:00', 'system', '2024-01-01 00:00:00', 1, 0);
|
||||
|
||||
@ -350,9 +350,8 @@ CREATE TABLE deploy_repo_branch
|
||||
last_commit_id VARCHAR(64) NULL COMMENT '最新提交ID',
|
||||
commit_message TEXT NULL COMMENT '最新提交信息',
|
||||
commit_author VARCHAR(100) NULL COMMENT '最新提交作者',
|
||||
commit_date DATETIME(6) NULL COMMENT '最新提交时间',
|
||||
last_update_time DATETIME(6) NULL COMMENT '分支最后更新时间',
|
||||
last_commit_time DATETIME(6) NULL COMMENT '分支最后提交时间',
|
||||
commit_date DATETIME(6) NULL COMMENT '最新提交时间(authored_date)',
|
||||
last_update_time DATETIME(6) NULL COMMENT '分支最后更新时间(committed_date)',
|
||||
web_url VARCHAR(1000) NULL COMMENT '网页URL',
|
||||
project_id BIGINT NOT NULL COMMENT '所属项目ID',
|
||||
external_system_id BIGINT NOT NULL COMMENT '外部系统ID',
|
||||
|
||||
Loading…
Reference in New Issue
Block a user