打印了JENKINS节点日志

This commit is contained in:
dengqichen 2025-11-03 16:48:19 +08:00
parent a2c08ad75d
commit 3307b5cb46
24 changed files with 1024 additions and 549 deletions

View File

@ -76,6 +76,12 @@
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
<!-- Mail -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
<!-- Database -->
<dependency>
<groupId>com.mysql</groupId>

View File

@ -22,6 +22,7 @@ import com.qqchen.deploy.backend.deploy.entity.DeployRecord;
import com.qqchen.deploy.backend.deploy.enums.DeployRecordStatusEnums;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.event.FlowableEngineEventType;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@ -107,18 +108,13 @@ public class DeployServiceImpl implements IDeployService {
List<Long> teamIds = new ArrayList<>(teamIdSet);
// 3. 批量查询团队信息
Map<Long, Team> teamMap = teamRepository.findAllById(teamIds)
.stream().collect(toMap(Team::getId, t -> t));
Map<Long, Team> teamMap = teamRepository.findAllById(teamIds).stream().collect(toMap(Team::getId, t -> t));
// 4. 批量查询团队配置
Map<Long, TeamConfig> configMap = teamConfigRepository.findByTeamIdIn(teamIds)
.stream().collect(toMap(TeamConfig::getTeamId, c -> c));
Map<Long, TeamConfig> configMap = teamConfigRepository.findByTeamIdIn(teamIds).stream().collect(toMap(TeamConfig::getTeamId, c -> c));
// 5. 收集所有环境ID
Set<Long> allEnvIds = configMap.values().stream()
.filter(c -> c.getAllowedEnvironmentIds() != null)
.flatMap(c -> c.getAllowedEnvironmentIds().stream())
.collect(Collectors.toSet());
Set<Long> allEnvIds = configMap.values().stream().filter(c -> c.getAllowedEnvironmentIds() != null).flatMap(c -> c.getAllowedEnvironmentIds().stream()).collect(Collectors.toSet());
if (allEnvIds.isEmpty()) {
log.info("用户 {} 所属团队未配置任何环境", user.getUsername());
@ -126,73 +122,50 @@ public class DeployServiceImpl implements IDeployService {
}
// 6. 批量查询环境信息
Map<Long, Environment> envMap = environmentRepository.findAllById(allEnvIds)
.stream().collect(toMap(Environment::getId, e -> e));
Map<Long, Environment> envMap = environmentRepository.findAllById(allEnvIds).stream().collect(toMap(Environment::getId, e -> e));
// 7. 批量查询所有团队的应用配置
List<TeamApplication> allTeamApps = teamApplicationRepository.findByTeamIdIn(teamIds);
Map<Long, List<TeamApplication>> teamAppsMap = allTeamApps.stream()
.collect(groupingBy(TeamApplication::getTeamId));
Map<Long, List<TeamApplication>> teamAppsMap = allTeamApps.stream().collect(groupingBy(TeamApplication::getTeamId));
// 8. 批量查询应用信息
Set<Long> appIds = allTeamApps.stream()
.map(TeamApplication::getApplicationId)
.collect(Collectors.toSet());
Set<Long> appIds = allTeamApps.stream().map(TeamApplication::getApplicationId).collect(Collectors.toSet());
final Map<Long, Application> appMap;
if (!appIds.isEmpty()) {
appMap = applicationRepository.findAllById(appIds)
.stream().collect(toMap(Application::getId, a -> a));
appMap = applicationRepository.findAllById(appIds).stream().collect(toMap(Application::getId, a -> a));
} else {
appMap = Collections.emptyMap();
}
// 9. 批量查询部署系统
Set<Long> systemIds = allTeamApps.stream()
.map(TeamApplication::getDeploySystemId)
.filter(Objects::nonNull)
.collect(Collectors.toSet());
Set<Long> systemIds = allTeamApps.stream().map(TeamApplication::getDeploySystemId).filter(Objects::nonNull).collect(Collectors.toSet());
final Map<Long, ExternalSystem> systemMap;
if (!systemIds.isEmpty()) {
systemMap = externalSystemRepository.findAllById(systemIds)
.stream().collect(toMap(ExternalSystem::getId, s -> s));
systemMap = externalSystemRepository.findAllById(systemIds).stream().collect(toMap(ExternalSystem::getId, s -> s));
} else {
systemMap = Collections.emptyMap();
}
// 10. 批量查询工作流定义
Set<Long> workflowIds = allTeamApps.stream()
.map(TeamApplication::getWorkflowDefinitionId)
.filter(Objects::nonNull)
.collect(Collectors.toSet());
Set<Long> workflowIds = allTeamApps.stream().map(TeamApplication::getWorkflowDefinitionId).filter(Objects::nonNull).collect(Collectors.toSet());
final Map<Long, WorkflowDefinition> workflowMap;
if (!workflowIds.isEmpty()) {
workflowMap = workflowDefinitionRepository.findAllById(workflowIds)
.stream().collect(toMap(WorkflowDefinition::getId, w -> w));
workflowMap = workflowDefinitionRepository.findAllById(workflowIds).stream().collect(toMap(WorkflowDefinition::getId, w -> w));
} else {
workflowMap = Collections.emptyMap();
}
// 11. 批量查询审批人信息
Set<Long> approverUserIds = configMap.values().stream()
.filter(c -> c.getApproverUserIds() != null)
.flatMap(c -> c.getApproverUserIds().stream())
.filter(obj -> obj instanceof List)
.flatMap(obj -> ((List<?>) obj).stream())
.filter(id -> id instanceof Number)
.map(id -> ((Number) id).longValue())
.collect(Collectors.toSet());
Set<Long> approverUserIds = configMap.values().stream().filter(c -> c.getApproverUserIds() != null).flatMap(c -> c.getApproverUserIds().stream()).filter(obj -> obj instanceof List).flatMap(obj -> ((List<?>) obj).stream()).filter(id -> id instanceof Number).map(id -> ((Number) id).longValue()).collect(Collectors.toSet());
final Map<Long, User> approverMap;
if (!approverUserIds.isEmpty()) {
approverMap = userRepository.findAllById(approverUserIds)
.stream().collect(toMap(User::getId, u -> u));
approverMap = userRepository.findAllById(approverUserIds).stream().collect(toMap(User::getId, u -> u));
} else {
approverMap = Collections.emptyMap();
}
// 12. 批量查询部署记录信息
List<Long> teamApplicationIds = allTeamApps.stream()
.map(TeamApplication::getId)
.collect(toList());
List<Long> teamApplicationIds = allTeamApps.stream().map(TeamApplication::getId).collect(toList());
// 12.1 批量查询部署统计信息
final Map<Long, DeployStatisticsDTO> statisticsMap = new HashMap<>();
@ -201,8 +174,7 @@ public class DeployServiceImpl implements IDeployService {
if (!teamApplicationIds.isEmpty()) {
// 查询统计信息
List<Object[]> statisticsList = deployRecordRepository
.findDeployStatisticsByTeamApplicationIds(teamApplicationIds);
List<Object[]> statisticsList = deployRecordRepository.findDeployStatisticsByTeamApplicationIds(teamApplicationIds);
for (Object[] row : statisticsList) {
Long teamApplicationId = (Long) row[0];
Long totalCount = ((Number) row[1]).longValue();
@ -229,10 +201,8 @@ public class DeployServiceImpl implements IDeployService {
}
// 查询最新部署记录用于获取最新状态和部署人
List<DeployRecord> latestRecords = deployRecordRepository
.findLatestDeployRecordsByTeamApplicationIds(teamApplicationIds);
latestRecordMap.putAll(latestRecords.stream()
.collect(toMap(DeployRecord::getTeamApplicationId, r -> r)));
List<DeployRecord> latestRecords = deployRecordRepository.findLatestDeployRecordsByTeamApplicationIds(teamApplicationIds);
latestRecordMap.putAll(latestRecords.stream().collect(toMap(DeployRecord::getTeamApplicationId, r -> r)));
// 更新统计信息中的最新状态和部署人
latestRecordMap.forEach((teamAppId, record) -> {
@ -246,35 +216,14 @@ public class DeployServiceImpl implements IDeployService {
});
// 查询最近10条部署记录
List<DeployRecord> recentRecords = deployRecordRepository
.findRecentDeployRecordsByTeamApplicationIds(teamApplicationIds, 10);
recentRecordsMap.putAll(recentRecords.stream()
.collect(groupingBy(DeployRecord::getTeamApplicationId)));
List<DeployRecord> recentRecords = deployRecordRepository.findRecentDeployRecordsByTeamApplicationIds(teamApplicationIds, 10);
recentRecordsMap.putAll(recentRecords.stream().collect(groupingBy(DeployRecord::getTeamApplicationId)));
}
// 13. 组装团队数据
Map<Long, TeamMember> teamMemberMap = teamMembers.stream()
.collect(toMap(TeamMember::getTeamId, tm -> tm));
Map<Long, TeamMember> teamMemberMap = teamMembers.stream().collect(toMap(TeamMember::getTeamId, tm -> tm));
List<TeamDeployableDTO> teamDTOs = teamIds.stream()
.map(teamId -> buildTeamDTO(
teamId,
currentUserId,
teamMap,
teamMemberMap,
configMap,
envMap,
teamAppsMap,
appMap,
systemMap,
workflowMap,
approverMap,
statisticsMap,
latestRecordMap,
recentRecordsMap
))
.filter(Objects::nonNull)
.collect(toList());
List<TeamDeployableDTO> teamDTOs = teamIds.stream().map(teamId -> buildTeamDTO(teamId, currentUserId, teamMap, teamMemberMap, configMap, envMap, teamAppsMap, appMap, systemMap, workflowMap, approverMap, statisticsMap, latestRecordMap, recentRecordsMap)).filter(Objects::nonNull).collect(toList());
// 14. 组装最终结果
UserDeployableDTO result = new UserDeployableDTO();
@ -302,22 +251,7 @@ public class DeployServiceImpl implements IDeployService {
/**
* 构建团队DTO
*/
private TeamDeployableDTO buildTeamDTO(
Long teamId,
Long currentUserId,
Map<Long, Team> teamMap,
Map<Long, TeamMember> teamMemberMap,
Map<Long, TeamConfig> configMap,
Map<Long, Environment> envMap,
Map<Long, List<TeamApplication>> teamAppsMap,
Map<Long, Application> appMap,
Map<Long, ExternalSystem> systemMap,
Map<Long, WorkflowDefinition> workflowMap,
Map<Long, User> approverMap,
Map<Long, DeployStatisticsDTO> statisticsMap,
Map<Long, DeployRecord> latestRecordMap,
Map<Long, List<DeployRecord>> recentRecordsMap
) {
private TeamDeployableDTO buildTeamDTO(Long teamId, Long currentUserId, Map<Long, Team> teamMap, Map<Long, TeamMember> teamMemberMap, Map<Long, TeamConfig> configMap, Map<Long, Environment> envMap, Map<Long, List<TeamApplication>> teamAppsMap, Map<Long, Application> appMap, Map<Long, ExternalSystem> systemMap, Map<Long, WorkflowDefinition> workflowMap, Map<Long, User> approverMap, Map<Long, DeployStatisticsDTO> statisticsMap, Map<Long, DeployRecord> latestRecordMap, Map<Long, List<DeployRecord>> recentRecordsMap) {
Team team = teamMap.get(teamId);
if (team == null) {
return null;
@ -347,8 +281,7 @@ public class DeployServiceImpl implements IDeployService {
List<TeamApplication> teamApps = teamAppsMap.getOrDefault(teamId, Collections.emptyList());
// 按环境分组应用
Map<Long, List<TeamApplication>> appsByEnv = teamApps.stream()
.collect(groupingBy(TeamApplication::getEnvironmentId));
Map<Long, List<TeamApplication>> appsByEnv = teamApps.stream().collect(groupingBy(TeamApplication::getEnvironmentId));
List<DeployableEnvironmentDTO> envDTOs = new ArrayList<>();
for (int i = 0; i < allowedEnvIds.size(); i++) {
@ -358,19 +291,7 @@ public class DeployServiceImpl implements IDeployService {
continue;
}
DeployableEnvironmentDTO envDTO = buildEnvironmentDTO(
env,
config,
i,
appsByEnv.getOrDefault(envId, Collections.emptyList()),
appMap,
systemMap,
workflowMap,
approverMap,
statisticsMap,
latestRecordMap,
recentRecordsMap
);
DeployableEnvironmentDTO envDTO = buildEnvironmentDTO(env, config, i, appsByEnv.getOrDefault(envId, Collections.emptyList()), appMap, systemMap, workflowMap, approverMap, statisticsMap, latestRecordMap, recentRecordsMap);
envDTOs.add(envDTO);
}
@ -387,19 +308,7 @@ public class DeployServiceImpl implements IDeployService {
/**
* 构建环境DTO
*/
private DeployableEnvironmentDTO buildEnvironmentDTO(
Environment env,
TeamConfig config,
int envIndex,
List<TeamApplication> teamApps,
Map<Long, Application> appMap,
Map<Long, ExternalSystem> systemMap,
Map<Long, WorkflowDefinition> workflowMap,
Map<Long, User> approverMap,
Map<Long, DeployStatisticsDTO> statisticsMap,
Map<Long, DeployRecord> latestRecordMap,
Map<Long, List<DeployRecord>> recentRecordsMap
) {
private DeployableEnvironmentDTO buildEnvironmentDTO(Environment env, TeamConfig config, int envIndex, List<TeamApplication> teamApps, Map<Long, Application> appMap, Map<Long, ExternalSystem> systemMap, Map<Long, WorkflowDefinition> workflowMap, Map<Long, User> approverMap, Map<Long, DeployStatisticsDTO> statisticsMap, Map<Long, DeployRecord> latestRecordMap, Map<Long, List<DeployRecord>> recentRecordsMap) {
DeployableEnvironmentDTO envDTO = new DeployableEnvironmentDTO();
envDTO.setEnvironmentId(env.getId());
envDTO.setEnvironmentCode(env.getEnvCode());
@ -420,11 +329,7 @@ public class DeployServiceImpl implements IDeployService {
Object approverObj = config.getApproverUserIds().get(envIndex);
if (approverObj instanceof List) {
List<?> approverList = (List<?>) approverObj;
List<ApproverDTO> approverDTOs = approverList.stream()
.filter(id -> id instanceof Number)
.map(id -> buildApproverDTO(((Number) id).longValue(), approverMap))
.filter(Objects::nonNull)
.collect(toList());
List<ApproverDTO> approverDTOs = approverList.stream().filter(id -> id instanceof Number).map(id -> buildApproverDTO(((Number) id).longValue(), approverMap)).filter(Objects::nonNull).collect(toList());
envDTO.setApprovers(approverDTOs);
} else {
envDTO.setApprovers(Collections.emptyList());
@ -434,18 +339,7 @@ public class DeployServiceImpl implements IDeployService {
}
// 应用列表
List<DeployableApplicationDTO> appDTOs = teamApps.stream()
.map(ta -> buildApplicationDTO(
ta,
appMap,
systemMap,
workflowMap,
statisticsMap,
latestRecordMap,
recentRecordsMap
))
.filter(Objects::nonNull)
.collect(toList());
List<DeployableApplicationDTO> appDTOs = teamApps.stream().map(ta -> buildApplicationDTO(ta, appMap, systemMap, workflowMap, statisticsMap, latestRecordMap, recentRecordsMap)).filter(Objects::nonNull).collect(toList());
envDTO.setApplications(appDTOs);
@ -471,15 +365,7 @@ public class DeployServiceImpl implements IDeployService {
/**
* 构建应用DTO
*/
private DeployableApplicationDTO buildApplicationDTO(
TeamApplication ta,
Map<Long, Application> appMap,
Map<Long, ExternalSystem> systemMap,
Map<Long, WorkflowDefinition> workflowMap,
Map<Long, DeployStatisticsDTO> statisticsMap,
Map<Long, DeployRecord> latestRecordMap,
Map<Long, List<DeployRecord>> recentRecordsMap
) {
private DeployableApplicationDTO buildApplicationDTO(TeamApplication ta, Map<Long, Application> appMap, Map<Long, ExternalSystem> systemMap, Map<Long, WorkflowDefinition> workflowMap, Map<Long, DeployStatisticsDTO> statisticsMap, Map<Long, DeployRecord> latestRecordMap, Map<Long, List<DeployRecord>> recentRecordsMap) {
Application app = appMap.get(ta.getApplicationId());
if (app == null) {
return null;
@ -522,8 +408,7 @@ public class DeployServiceImpl implements IDeployService {
DeployRecord latestRecord = latestRecordMap.get(ta.getId());
if (latestRecord != null) {
DeployRecordStatusEnums status = latestRecord.getStatus();
dto.setIsDeploying(status == DeployRecordStatusEnums.CREATED ||
status == DeployRecordStatusEnums.RUNNING);
dto.setIsDeploying(status == DeployRecordStatusEnums.CREATED || status == DeployRecordStatusEnums.RUNNING);
} else {
dto.setIsDeploying(false);
}
@ -540,9 +425,7 @@ public class DeployServiceImpl implements IDeployService {
// 最近部署记录列表
List<DeployRecord> recentRecords = recentRecordsMap.getOrDefault(ta.getId(), Collections.emptyList());
List<DeployRecordSummaryDTO> recordSummaryList = recentRecords.stream()
.map(this::buildDeployRecordSummary)
.collect(toList());
List<DeployRecordSummaryDTO> recordSummaryList = recentRecords.stream().map(this::buildDeployRecordSummary).collect(toList());
dto.setRecentDeployRecords(recordSummaryList);
return dto;
@ -577,20 +460,16 @@ public class DeployServiceImpl implements IDeployService {
@Transactional
public DeployResultDTO executeDeploy(DeployRequestDTO request) {
// 1. 查询团队应用配置
TeamApplication teamApp = teamApplicationRepository.findById(request.getTeamApplicationId())
.orElseThrow(() -> new BusinessException(ResponseCode.NOT_FOUND));
TeamApplication teamApp = teamApplicationRepository.findById(request.getTeamApplicationId()).orElseThrow(() -> new BusinessException(ResponseCode.NOT_FOUND));
// 2. 查询工作流定义获取 processKey
WorkflowDefinition workflowDefinition = workflowDefinitionRepository.findById(teamApp.getWorkflowDefinitionId())
.orElseThrow(() -> new BusinessException(ResponseCode.NOT_FOUND, new Object[]{"工作流定义"}));
WorkflowDefinition workflowDefinition = workflowDefinitionRepository.findById(teamApp.getWorkflowDefinitionId()).orElseThrow(() -> new BusinessException(ResponseCode.NOT_FOUND, new Object[] {"工作流定义"}));
// 3. 查询应用信息
Application application = applicationRepository.findById(teamApp.getApplicationId())
.orElseThrow(() -> new BusinessException(ResponseCode.NOT_FOUND, new Object[]{"应用"}));
Application application = applicationRepository.findById(teamApp.getApplicationId()).orElseThrow(() -> new BusinessException(ResponseCode.NOT_FOUND, new Object[] {"应用"}));
// 4. 查询环境信息
Environment environment = environmentRepository.findById(teamApp.getEnvironmentId())
.orElseThrow(() -> new BusinessException(ResponseCode.NOT_FOUND, new Object[]{"环境"}));
Environment environment = environmentRepository.findById(teamApp.getEnvironmentId()).orElseThrow(() -> new BusinessException(ResponseCode.NOT_FOUND, new Object[] {"环境"}));
// 5. 生成业务标识UUID
String businessKey = UUID.randomUUID().toString();
@ -623,8 +502,10 @@ public class DeployServiceImpl implements IDeployService {
// 转换为 MapFlowable 只支持基本类型
variables.put("jenkins", objectMapper.convertValue(jenkinsInput, Map.class));
variables.put("approval", Map.of("required", true, "userIds", "admin"));
}
// 7. 构造工作流启动请求
WorkflowInstanceStartRequest workflowRequest = new WorkflowInstanceStartRequest();
workflowRequest.setProcessKey(workflowDefinition.getKey());
@ -634,20 +515,10 @@ public class DeployServiceImpl implements IDeployService {
// 8. 启动工作流
WorkflowInstanceDTO workflowInstance = workflowInstanceService.startWorkflow(workflowRequest);
log.info("部署流程已启动: businessKey={}, workflowInstanceId={}, application={}, environment={}",
businessKey, workflowInstance.getId(), application.getAppCode(), environment.getEnvCode());
log.info("部署流程已启动: businessKey={}, workflowInstanceId={}, application={}, environment={}", businessKey, workflowInstance.getId(), application.getAppCode(), environment.getEnvCode());
// 9. 创建部署记录此时已有实例ID
deployRecordService.createDeployRecord(
workflowInstance.getId(),
businessKey,
teamApp.getId(),
teamApp.getTeamId(),
teamApp.getApplicationId(),
teamApp.getEnvironmentId(),
SecurityUtils.getCurrentUsername(),
request.getRemark()
);
deployRecordService.createDeployRecord(workflowInstance.getId(), businessKey, teamApp.getId(), teamApp.getTeamId(), teamApp.getApplicationId(), teamApp.getEnvironmentId(), SecurityUtils.getCurrentUsername(), request.getRemark());
// 10. 返回结果
DeployResultDTO result = new DeployResultDTO();

View File

@ -0,0 +1,42 @@
package com.qqchen.deploy.backend.notification.adapter;
import com.qqchen.deploy.backend.notification.dto.NotificationRequest;
import com.qqchen.deploy.backend.notification.enums.NotificationChannelTypeEnum;
import java.util.Map;
/**
* 通知渠道适配器接口
*
* @author qqchen
* @since 2025-11-03
*/
public interface INotificationChannelAdapter {
/**
* 发送通知
*
* @param config 渠道配置从数据库config字段解析
* @param request 通知请求
* @throws Exception 发送失败时抛出异常
*/
void send(Map<String, Object> config, NotificationRequest request) throws Exception;
/**
* 支持的渠道类型
*
* @return 渠道类型枚举
*/
NotificationChannelTypeEnum supportedType();
/**
* 校验配置是否有效可选实现
*
* @param config 渠道配置
* @return 校验结果消息
*/
default String validateConfig(Map<String, Object> config) {
return "配置有效";
}
}

View File

@ -0,0 +1,164 @@
package com.qqchen.deploy.backend.notification.adapter.impl;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.qqchen.deploy.backend.notification.adapter.INotificationChannelAdapter;
import com.qqchen.deploy.backend.notification.dto.EmailNotificationConfig;
import com.qqchen.deploy.backend.notification.dto.NotificationRequest;
import com.qqchen.deploy.backend.notification.enums.NotificationChannelTypeEnum;
import jakarta.annotation.Resource;
import jakarta.mail.internet.InternetAddress;
import jakarta.mail.internet.MimeMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.mail.javamail.JavaMailSenderImpl;
import org.springframework.mail.javamail.MimeMessageHelper;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
* 邮件通知渠道适配器
*
* @author qqchen
* @since 2025-11-03
*/
@Slf4j
@Component
public class EmailChannelAdapter implements INotificationChannelAdapter {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void send(Map<String, Object> config, NotificationRequest request) throws Exception {
// 1. 解析配置
EmailNotificationConfig emailConfig = objectMapper.convertValue(config, EmailNotificationConfig.class);
validateEmailConfig(emailConfig);
// 2. 创建JavaMailSender
JavaMailSenderImpl mailSender = createMailSender(emailConfig);
// 3. 确定收件人
List<String> receivers = determineReceivers(emailConfig, request);
if (CollectionUtils.isEmpty(receivers)) {
throw new IllegalArgumentException("收件人列表为空,且未配置默认收件人");
}
// 4. 构建邮件
MimeMessage mimeMessage = mailSender.createMimeMessage();
MimeMessageHelper helper = new MimeMessageHelper(mimeMessage, true, "UTF-8");
// 设置发件人
if (emailConfig.getFromName() != null && !emailConfig.getFromName().isEmpty()) {
helper.setFrom(new InternetAddress(emailConfig.getFrom(), emailConfig.getFromName(), "UTF-8"));
} else {
helper.setFrom(emailConfig.getFrom());
}
// 设置收件人
helper.setTo(receivers.toArray(new String[0]));
// 设置抄送
if (!CollectionUtils.isEmpty(request.getMentions())) {
helper.setCc(request.getMentions().toArray(new String[0]));
}
// 设置主题
String subject = request.getTitle() != null && !request.getTitle().isEmpty()
? request.getTitle()
: "系统通知";
helper.setSubject(subject);
// 设置内容支持HTML
helper.setText(request.getContent(), false);
// 5. 发送邮件
log.info("发送邮件通知 - 收件人: {}, 主题: {}", receivers, subject);
mailSender.send(mimeMessage);
log.info("邮件通知发送成功");
}
@Override
public NotificationChannelTypeEnum supportedType() {
return NotificationChannelTypeEnum.EMAIL;
}
@Override
public String validateConfig(Map<String, Object> config) {
try {
EmailNotificationConfig emailConfig = objectMapper.convertValue(config, EmailNotificationConfig.class);
validateEmailConfig(emailConfig);
return "配置有效";
} catch (Exception e) {
return "配置验证失败: " + e.getMessage();
}
}
/**
* 校验邮件配置
*/
private void validateEmailConfig(EmailNotificationConfig config) {
if (config.getSmtpHost() == null || config.getSmtpHost().isEmpty()) {
throw new IllegalArgumentException("SMTP服务器地址不能为空");
}
if (config.getSmtpPort() == null) {
throw new IllegalArgumentException("SMTP端口不能为空");
}
if (config.getUsername() == null || config.getUsername().isEmpty()) {
throw new IllegalArgumentException("SMTP用户名不能为空");
}
if (config.getPassword() == null || config.getPassword().isEmpty()) {
throw new IllegalArgumentException("SMTP密码不能为空");
}
if (config.getFrom() == null || config.getFrom().isEmpty()) {
throw new IllegalArgumentException("发件人邮箱不能为空");
}
}
/**
* 创建JavaMailSender
*/
private JavaMailSenderImpl createMailSender(EmailNotificationConfig config) {
JavaMailSenderImpl mailSender = new JavaMailSenderImpl();
mailSender.setHost(config.getSmtpHost());
mailSender.setPort(config.getSmtpPort());
mailSender.setUsername(config.getUsername());
mailSender.setPassword(config.getPassword());
// 设置邮件属性
Properties props = mailSender.getJavaMailProperties();
props.put("mail.transport.protocol", "smtp");
props.put("mail.smtp.auth", "true");
props.put("mail.smtp.starttls.enable", "true");
props.put("mail.debug", "false");
// 如果使用SSL
if (Boolean.TRUE.equals(config.getUseSsl())) {
props.put("mail.smtp.ssl.enable", "true");
props.put("mail.smtp.socketFactory.class", "javax.net.ssl.SSLSocketFactory");
}
return mailSender;
}
/**
* 确定收件人列表
*/
private List<String> determineReceivers(EmailNotificationConfig config, NotificationRequest request) {
// 优先使用请求中的收件人
if (!CollectionUtils.isEmpty(request.getReceivers())) {
return request.getReceivers();
}
// 使用配置中的默认收件人
return config.getDefaultReceivers();
}
}

View File

@ -0,0 +1,152 @@
package com.qqchen.deploy.backend.notification.adapter.impl;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.qqchen.deploy.backend.notification.adapter.INotificationChannelAdapter;
import com.qqchen.deploy.backend.notification.dto.NotificationRequest;
import com.qqchen.deploy.backend.notification.dto.WeworkNotificationConfig;
import com.qqchen.deploy.backend.notification.enums.NotificationChannelTypeEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.web.client.RestTemplate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 企业微信通知渠道适配器
*
* @author qqchen
* @since 2025-11-03
*/
@Slf4j
@Component
public class WeworkChannelAdapter implements INotificationChannelAdapter {
private final RestTemplate restTemplate = new RestTemplate();
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void send(Map<String, Object> config, NotificationRequest request) throws Exception {
// 1. 解析配置
WeworkNotificationConfig weworkConfig = objectMapper.convertValue(config, WeworkNotificationConfig.class);
if (weworkConfig.getWebhookUrl() == null || weworkConfig.getWebhookUrl().isEmpty()) {
throw new IllegalArgumentException("企业微信Webhook URL未配置");
}
// 2. 构建消息内容
String message = buildMessage(request);
// 3. 构建@人列表
List<String> mentionedList = buildMentionedList(weworkConfig, request);
List<String> mentionedMobileList = buildMentionedMobileList(weworkConfig, request);
// 4. 构建企业微信消息体
Map<String, Object> messageBody = new HashMap<>();
messageBody.put("msgtype", "text");
Map<String, Object> textContent = new HashMap<>();
textContent.put("content", message);
if (!CollectionUtils.isEmpty(mentionedList)) {
textContent.put("mentioned_list", mentionedList);
}
if (!CollectionUtils.isEmpty(mentionedMobileList)) {
textContent.put("mentioned_mobile_list", mentionedMobileList);
}
messageBody.put("text", textContent);
// 5. 发送请求
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
String jsonBody = objectMapper.writeValueAsString(messageBody);
HttpEntity<String> entity = new HttpEntity<>(jsonBody, headers);
log.info("发送企业微信通知 - URL: {}, 消息: {}", weworkConfig.getWebhookUrl(), message);
String response = restTemplate.exchange(
weworkConfig.getWebhookUrl(),
HttpMethod.POST,
entity,
String.class
).getBody();
log.info("企业微信通知发送成功 - 响应: {}", response);
}
@Override
public NotificationChannelTypeEnum supportedType() {
return NotificationChannelTypeEnum.WEWORK;
}
@Override
public String validateConfig(Map<String, Object> config) {
try {
WeworkNotificationConfig weworkConfig = objectMapper.convertValue(config, WeworkNotificationConfig.class);
if (weworkConfig.getWebhookUrl() == null || weworkConfig.getWebhookUrl().isEmpty()) {
return "Webhook URL不能为空";
}
if (!weworkConfig.getWebhookUrl().startsWith("https://qyapi.weixin.qq.com")) {
return "Webhook URL格式不正确";
}
return "配置有效";
} catch (Exception e) {
return "配置解析失败: " + e.getMessage();
}
}
/**
* 构建消息内容
*/
private String buildMessage(NotificationRequest request) {
if (request.getTitle() != null && !request.getTitle().isEmpty()) {
return request.getTitle() + "\n" + request.getContent();
}
return request.getContent();
}
/**
* 构建@人列表userid
*/
private List<String> buildMentionedList(WeworkNotificationConfig config, NotificationRequest request) {
List<String> mentionedList = new ArrayList<>();
// 优先使用请求中的mentions
if (!CollectionUtils.isEmpty(request.getMentions())) {
mentionedList.addAll(request.getMentions());
} else if (!CollectionUtils.isEmpty(config.getMentionedList())) {
// 使用配置中的默认值
mentionedList.addAll(config.getMentionedList());
}
return mentionedList;
}
/**
* 构建@人列表手机号
*/
private List<String> buildMentionedMobileList(WeworkNotificationConfig config, NotificationRequest request) {
List<String> mentionedMobileList = new ArrayList<>();
// 使用配置中的默认手机号
if (!CollectionUtils.isEmpty(config.getMentionedMobileList())) {
mentionedMobileList.addAll(config.getMentionedMobileList());
}
return mentionedMobileList;
}
}

View File

@ -4,9 +4,11 @@ import com.qqchen.deploy.backend.framework.api.Response;
import com.qqchen.deploy.backend.framework.controller.BaseController;
import com.qqchen.deploy.backend.notification.dto.NotificationChannelDTO;
import com.qqchen.deploy.backend.notification.dto.NotificationChannelQuery;
import com.qqchen.deploy.backend.notification.dto.NotificationRequest;
import com.qqchen.deploy.backend.notification.entity.NotificationChannel;
import com.qqchen.deploy.backend.notification.enums.NotificationChannelTypeEnum;
import com.qqchen.deploy.backend.notification.service.INotificationChannelService;
import com.qqchen.deploy.backend.notification.service.INotificationSendService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
@ -39,6 +41,9 @@ public class NotificationChannelApiController
@Resource
private INotificationChannelService notificationChannelService;
@Resource
private INotificationSendService notificationSendService;
@Override
public Response<NotificationChannelDTO> create(NotificationChannelDTO dto) {
return super.create(dto);
@ -122,6 +127,15 @@ public class NotificationChannelApiController
return Response.success();
}
@Operation(summary = "发送通知消息")
@PostMapping("/send")
public Response<Void> send(
@Parameter(description = "通知请求", required = true) @RequestBody NotificationRequest request
) {
notificationSendService.send(request);
return Response.success();
}
@Override
protected void exportData(HttpServletResponse response, List<NotificationChannelDTO> data) {
// TODO: 实现导出功能

View File

@ -0,0 +1,56 @@
package com.qqchen.deploy.backend.notification.dto;
import lombok.Data;
import java.util.List;
/**
* 邮件通知配置DTO
*
* @author qqchen
* @since 2025-11-03
*/
@Data
public class EmailNotificationConfig {
/**
* SMTP服务器地址必填
*/
private String smtpHost;
/**
* SMTP服务器端口必填
*/
private Integer smtpPort;
/**
* SMTP用户名必填
*/
private String username;
/**
* SMTP密码必填
*/
private String password;
/**
* 发件人邮箱必填
*/
private String from;
/**
* 发件人名称可选
*/
private String fromName;
/**
* 默认收件人列表可选
*/
private List<String> defaultReceivers;
/**
* 是否使用SSL可选默认true
*/
private Boolean useSsl = true;
}

View File

@ -0,0 +1,60 @@
package com.qqchen.deploy.backend.notification.dto;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
/**
* 通知请求DTO
*
* @author qqchen
* @since 2025-11-03
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class NotificationRequest {
/**
* 通知渠道ID必填
* 从sys_notification_channel表查询
*/
@NotNull(message = "渠道ID不能为空")
private Long channelId;
/**
* 消息标题可选
* - 企业微信会与content拼接
* - 邮件作为邮件主题
*/
private String title;
/**
* 消息内容必填
* - 企业微信文本内容
* - 邮件邮件正文
*/
@NotBlank(message = "消息内容不能为空")
private String content;
/**
* 收件人列表可选
* - 企业微信不需要发送到配置的群
* - 邮件收件人邮箱列表如果为空则使用渠道配置中的默认收件人
*/
private List<String> receivers;
/**
* @人列表可选
* - 企业微信@specific用户手机号或userid
* - 邮件抄送列表
*/
private List<String> mentions;
}

View File

@ -0,0 +1,32 @@
package com.qqchen.deploy.backend.notification.dto;
import lombok.Data;
import java.util.List;
/**
* 企业微信通知配置DTO
*
* @author qqchen
* @since 2025-11-03
*/
@Data
public class WeworkNotificationConfig {
/**
* Webhook URL必填
*/
private String webhookUrl;
/**
* 默认@的手机号列表可选
*/
private List<String> mentionedMobileList;
/**
* 默认@的用户列表可选
* 例如["@all"] 表示@所有人
*/
private List<String> mentionedList;
}

View File

@ -14,7 +14,12 @@ public enum NotificationChannelTypeEnum {
/**
* 企业微信
*/
WEWORK("企业微信", "通过企业微信群机器人发送消息");
WEWORK("企业微信", "通过企业微信群机器人发送消息"),
/**
* 邮件
*/
EMAIL("邮件", "通过SMTP发送邮件");
// /**
// * 飞书
@ -32,11 +37,6 @@ public enum NotificationChannelTypeEnum {
// SMS("短信", "通过短信平台发送消息"),
//
// /**
// * 邮件
// */
// EMAIL("邮件", "通过SMTP发送邮件"),
//
// /**
// * Slack
// */
// SLACK("Slack", "通过Slack Webhook发送消息");

View File

@ -0,0 +1,58 @@
package com.qqchen.deploy.backend.notification.factory;
import com.qqchen.deploy.backend.notification.adapter.INotificationChannelAdapter;
import com.qqchen.deploy.backend.notification.enums.NotificationChannelTypeEnum;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 通知渠道适配器工厂
*
* @author qqchen
* @since 2025-11-03
*/
@Slf4j
@Component
public class NotificationChannelAdapterFactory {
@Resource
private List<INotificationChannelAdapter> adapters;
private final Map<NotificationChannelTypeEnum, INotificationChannelAdapter> adapterMap = new HashMap<>();
/**
* 初始化适配器映射
*/
@PostConstruct
public void init() {
for (INotificationChannelAdapter adapter : adapters) {
NotificationChannelTypeEnum type = adapter.supportedType();
adapterMap.put(type, adapter);
log.info("注册通知渠道适配器: {} -> {}", type, adapter.getClass().getSimpleName());
}
}
/**
* 根据渠道类型获取适配器
*
* @param type 渠道类型
* @return 适配器实例
* @throws IllegalArgumentException 如果不支持该渠道类型
*/
public INotificationChannelAdapter getAdapter(NotificationChannelTypeEnum type) {
INotificationChannelAdapter adapter = adapterMap.get(type);
if (adapter == null) {
throw new IllegalArgumentException("不支持的通知渠道类型: " + type);
}
return adapter;
}
}

View File

@ -0,0 +1,51 @@
package com.qqchen.deploy.backend.notification.service;
import com.qqchen.deploy.backend.notification.dto.NotificationRequest;
/**
* 通知发送服务接口
*
* @author qqchen
* @since 2025-11-03
*/
public interface INotificationSendService {
/**
* 发送通知统一接口
*
* @param request 通知请求
* @throws com.qqchen.deploy.backend.framework.exception.BusinessException 渠道不存在渠道已禁用发送失败
*/
void send(NotificationRequest request);
/**
* 便捷方法发送简单文本通知
*
* @param channelId 渠道ID
* @param content 消息内容
*/
default void sendSimple(Long channelId, String content) {
NotificationRequest request = NotificationRequest.builder()
.channelId(channelId)
.content(content)
.build();
send(request);
}
/**
* 便捷方法发送带标题的通知
*
* @param channelId 渠道ID
* @param title 标题
* @param content 内容
*/
default void send(Long channelId, String title, String content) {
NotificationRequest request = NotificationRequest.builder()
.channelId(channelId)
.title(title)
.content(content)
.build();
send(request);
}
}

View File

@ -5,13 +5,17 @@ import com.qqchen.deploy.backend.framework.enums.ResponseCode;
import com.qqchen.deploy.backend.framework.exception.BusinessException;
import com.qqchen.deploy.backend.framework.exception.UniqueConstraintException;
import com.qqchen.deploy.backend.framework.service.impl.BaseServiceImpl;
import com.qqchen.deploy.backend.notification.adapter.INotificationChannelAdapter;
import com.qqchen.deploy.backend.notification.converter.NotificationChannelConverter;
import com.qqchen.deploy.backend.notification.dto.NotificationChannelDTO;
import com.qqchen.deploy.backend.notification.dto.NotificationChannelQuery;
import com.qqchen.deploy.backend.notification.dto.NotificationRequest;
import com.qqchen.deploy.backend.notification.entity.NotificationChannel;
import com.qqchen.deploy.backend.notification.enums.NotificationChannelStatusEnum;
import com.qqchen.deploy.backend.notification.factory.NotificationChannelAdapterFactory;
import com.qqchen.deploy.backend.notification.repository.INotificationChannelRepository;
import com.qqchen.deploy.backend.notification.service.INotificationChannelService;
import com.qqchen.deploy.backend.notification.service.INotificationSendService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@ -30,7 +34,7 @@ import static com.qqchen.deploy.backend.framework.annotation.ServiceType.Type.DA
@ServiceType(DATABASE)
public class NotificationChannelServiceImpl
extends BaseServiceImpl<NotificationChannel, NotificationChannelDTO, NotificationChannelQuery, Long>
implements INotificationChannelService {
implements INotificationChannelService, INotificationSendService {
@Resource
private INotificationChannelRepository notificationChannelRepository;
@ -38,6 +42,9 @@ public class NotificationChannelServiceImpl
@Resource
private NotificationChannelConverter notificationChannelConverter;
@Resource
private NotificationChannelAdapterFactory adapterFactory;
@Override
protected void validateUniqueConstraints(NotificationChannelDTO dto) {
// 检查渠道名称唯一性
@ -81,5 +88,48 @@ public class NotificationChannelServiceImpl
log.info("禁用通知渠道: id={}, name={}", id, channel.getName());
}
@Override
public void send(NotificationRequest request) {
// 1. 参数校验
if (request == null || request.getChannelId() == null) {
throw new BusinessException(ResponseCode.INVALID_PARAM);
}
if (request.getContent() == null || request.getContent().isEmpty()) {
throw new BusinessException(ResponseCode.INVALID_PARAM);
}
// 2. 查询渠道配置
NotificationChannel channel = notificationChannelRepository.findById(request.getChannelId())
.orElseThrow(() -> new BusinessException(ResponseCode.DATA_NOT_FOUND));
// 3. 校验渠道状态
if (channel.getStatus() != NotificationChannelStatusEnum.ENABLED) {
throw new BusinessException(ResponseCode.DATA_NOT_FOUND);
}
// 4. 获取对应的适配器
INotificationChannelAdapter adapter;
try {
adapter = adapterFactory.getAdapter(channel.getChannelType());
} catch (IllegalArgumentException e) {
log.error("获取通知渠道适配器失败: {}", e.getMessage());
throw new BusinessException(ResponseCode.ERROR);
}
// 5. 发送通知
try {
log.info("发送通知 - 渠道ID: {}, 渠道类型: {}, 标题: {}",
channel.getId(), channel.getChannelType(), request.getTitle());
adapter.send(channel.getConfig(), request);
log.info("通知发送成功 - 渠道ID: {}", channel.getId());
} catch (Exception e) {
log.error("通知发送失败 - 渠道ID: {}, 错误: {}", channel.getId(), e.getMessage(), e);
throw new BusinessException(ResponseCode.ERROR, new Object[]{e.getMessage()});
}
}
}

View File

@ -4,10 +4,13 @@ import com.qqchen.deploy.backend.workflow.dto.inputmapping.ApprovalInputMapping;
import com.qqchen.deploy.backend.workflow.dto.outputs.ApprovalOutputs;
import com.qqchen.deploy.backend.workflow.enums.ApprovalModeEnum;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.ProcessEngineConfiguration;
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.flowable.task.service.delegate.DelegateTask;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
/**
@ -106,16 +109,87 @@ public class ApprovalTaskListener extends BaseTaskListener<ApprovalInputMapping,
case VARIABLE:
// 从流程变量中获取审批人
if (input.getApproverVariable() != null) {
Object approverValue = task.getVariable(input.getApproverVariable());
String approverVariableExpression = input.getApproverVariable();
log.info("解析审批人变量表达式: {}", approverVariableExpression);
// 解析表达式 ${approval.userIds}
Object approverValue = resolveExpression(task, approverVariableExpression);
if (approverValue != null) {
task.setAssignee(approverValue.toString());
log.info("Set assignee from variable {}: {}", input.getApproverVariable(), approverValue);
log.info("解析得到审批人: {}", approverValue);
// 处理不同类型的审批人数据
if (approverValue instanceof List) {
// 列表形式可能是多个审批人
List<?> approvers = (List<?>) approverValue;
if (!approvers.isEmpty()) {
if (input.getApprovalMode() == ApprovalModeEnum.SINGLE) {
// 单人审批取第一个
task.setAssignee(String.valueOf(approvers.get(0)));
log.info("Set single assignee from list: {}", approvers.get(0));
} else {
// 会签/或签设置所有候选人
for (Object approver : approvers) {
task.addCandidateUser(String.valueOf(approver));
}
log.info("Set candidate users from list: {}", approvers);
}
}
} else {
// 单个值直接设置为审批人
task.setAssignee(String.valueOf(approverValue));
log.info("Set assignee from variable {}: {}", approverVariableExpression, approverValue);
}
} else {
log.warn("无法解析审批人变量: {}", approverVariableExpression);
}
}
break;
}
}
/**
* 解析流程变量表达式
* 支持简单表达式 ${approval.userIds}和直接变量名 approval.userIds
*/
private Object resolveExpression(DelegateTask task, String expression) {
try {
// 如果是表达式格式${...}手动解析
if (expression.startsWith("${") && expression.endsWith("}")) {
// 去掉 ${ }
String variablePath = expression.substring(2, expression.length() - 1);
return resolveVariablePath(task, variablePath);
}
// 如果不是表达式格式直接解析路径 approval.userIds
return resolveVariablePath(task, expression);
} catch (Exception e) {
log.error("解析表达式失败: {}", expression, e);
return null;
}
}
/**
* 手动解析变量路径 approval.userIds
*/
private Object resolveVariablePath(DelegateTask task, String path) {
String[] parts = path.split("\\.");
Object value = task.getVariable(parts[0]);
// 如果是嵌套路径继续解析
for (int i = 1; i < parts.length && value != null; i++) {
if (value instanceof Map) {
value = ((Map<?, ?>) value).get(parts[i]);
} else {
log.warn("变量 {} 不是 Map 类型,无法继续解析路径", parts[i - 1]);
return null;
}
}
return value;
}
/**
* 将审批相关信息保存为任务变量
* 这样前端查询任务时可以获取到这些信息

View File

@ -13,6 +13,7 @@ import com.qqchen.deploy.backend.workflow.constants.WorkFlowConstants;
import com.qqchen.deploy.backend.workflow.dto.inputmapping.JenkinsBuildInputMapping;
import com.qqchen.deploy.backend.workflow.dto.outputs.JenkinsBuildOutputs;
import com.qqchen.deploy.backend.workflow.entity.WorkflowNodeInstance;
import com.qqchen.deploy.backend.workflow.enums.LogLevel;
import com.qqchen.deploy.backend.workflow.enums.LogSource;
import com.qqchen.deploy.backend.workflow.enums.NodeExecutionStatusEnum;
import com.qqchen.deploy.backend.workflow.service.IWorkflowNodeInstanceService;
@ -45,9 +46,6 @@ public class JenkinsBuildDelegate extends BaseNodeDelegate<JenkinsBuildInputMapp
@Resource
private IJenkinsJobRepository jenkinsJobRepository;
@Resource
private IWorkflowNodeInstanceService workflowNodeInstanceService;
@Resource
private IWorkflowNodeLogService workflowNodeLogService;
@ -59,110 +57,89 @@ public class JenkinsBuildDelegate extends BaseNodeDelegate<JenkinsBuildInputMapp
private static final int MAX_BUILD_POLLS = 180; // 30分钟超时
// 用于存储当前节点实例ID在线程内共享
private ThreadLocal<Long> currentNodeInstanceId = new ThreadLocal<>();
@Override
protected JenkinsBuildOutputs executeInternal(DelegateExecution execution, Map<String, Object> configs, JenkinsBuildInputMapping input) {
try {
log.info("Jenkins Build - serverId: {}, jobName: {}",
input.getServerId(), input.getJobName());
log.info("Jenkins Build - serverId: {}, jobName: {}", input.getServerId(), input.getJobName());
// 1. 获取外部系统
ExternalSystem externalSystem = externalSystemRepository.findById(input.getServerId())
.orElseThrow(() -> new RuntimeException("Jenkins服务器不存在: " + input.getServerId()));
// 1. 获取外部系统
ExternalSystem externalSystem = externalSystemRepository.findById(input.getServerId()).orElseThrow(() -> new RuntimeException("Jenkins服务器不存在: " + input.getServerId()));
String jobName = input.getJobName();
String jobName = input.getJobName();
// 2. 触发构建
Map<String, String> parameters = new HashMap<>();
// 可以根据需要添加构建参数
// parameters.put("BRANCH", "main");
// 2. 触发构建
Map<String, String> parameters = new HashMap<>();
// 可以根据需要添加构建参数
// parameters.put("BRANCH", "main");
String queueId = jenkinsServiceIntegration.buildWithParameters(
externalSystem, jobName, parameters);
String queueId = jenkinsServiceIntegration.buildWithParameters(externalSystem, jobName, parameters);
log.info("Jenkins build queued: queueId={}", queueId);
log.info("Jenkins build queued: queueId={}", queueId);
// 3. 等待构建从队列中开始
JenkinsQueueBuildInfoResponse buildInfo = waitForBuildToStart(queueId);
// 3. 等待构建从队列中开始
JenkinsQueueBuildInfoResponse buildInfo = waitForBuildToStart(queueId);
log.info("Jenkins build started: buildNumber={}", buildInfo.getBuildNumber());
log.info("Jenkins build started: buildNumber={}", buildInfo.getBuildNumber());
// 4. 获取节点实例ID延迟获取此时节点实例应该已经创建
Long nodeInstanceId = getNodeInstanceIdSafely(execution);
if (nodeInstanceId != null) {
currentNodeInstanceId.set(nodeInstanceId);
workflowNodeLogService.info(nodeInstanceId, LogSource.JENKINS,
String.format("Jenkins 构建已启动: job=%s, buildNumber=%d", jobName, buildInfo.getBuildNumber()));
// 4. 记录构建启动日志
workflowNodeLogService.info(execution.getProcessInstanceId(), execution.getCurrentActivityId(), LogSource.JENKINS, String.format("Jenkins 构建已启动: job=%s, buildNumber=%d", jobName, buildInfo.getBuildNumber()));
// 5. 轮询构建状态直到完成
// 注意如果构建失败或被取消pollBuildStatus 会抛出 BpmnError触发错误边界事件
// 只有成功时才会返回到这里
JenkinsBuildStatus buildStatus = pollBuildStatus(execution, externalSystem, jobName, buildInfo.getBuildNumber());
// 5. 获取构建详细信息包括 duration, changeSets, artifacts
JenkinsBuildResponse buildDetails = jenkinsServiceIntegration.getBuildDetails(externalSystem, jobName, buildInfo.getBuildNumber());
// 打印调试信息
log.info("Build details - changeSets: {}, artifacts: {}",
buildDetails.getChangeSets(), buildDetails.getArtifacts());
// 6. 构造输出结果执行到这里说明构建成功
JenkinsBuildOutputs outputs = new JenkinsBuildOutputs();
// 设置统一的执行状态为成功
outputs.setStatus(NodeExecutionStatusEnum.SUCCESS);
// 设置 Jenkins 特有字段
outputs.setBuildStatus(buildStatus.name());
outputs.setBuildNumber(buildInfo.getBuildNumber());
outputs.setBuildUrl(buildInfo.getBuildUrl());
// 从构建详情中提取信息
outputs.setBuildDuration(buildDetails.getDuration() != null ? buildDetails.getDuration().intValue() : 0);
// 提取 Git Commit ID changeSets 中获取第一个
if (buildDetails.getChangeSets() != null && !buildDetails.getChangeSets().isEmpty()) {
log.info("Found {} changeSets", buildDetails.getChangeSets().size());
var changeSet = buildDetails.getChangeSets().get(0);
if (changeSet.getItems() != null && !changeSet.getItems().isEmpty()) {
log.info("Found {} items in changeSet", changeSet.getItems().size());
outputs.setGitCommitId(changeSet.getItems().get(0).getCommitId());
}
// 5. 轮询构建状态直到完成
// 注意如果构建失败或被取消pollBuildStatus 会抛出 BpmnError触发错误边界事件
// 只有成功时才会返回到这里
JenkinsBuildStatus buildStatus = pollBuildStatus(externalSystem, jobName, buildInfo.getBuildNumber());
// 5. 获取构建详细信息包括 duration, changeSets, artifacts
JenkinsBuildResponse buildDetails = jenkinsServiceIntegration.getBuildDetails(externalSystem, jobName, buildInfo.getBuildNumber());
// 打印调试信息
log.info("Build details - changeSets: {}, artifacts: {}",
buildDetails.getChangeSets(), buildDetails.getArtifacts());
// 6. 构造输出结果执行到这里说明构建成功
JenkinsBuildOutputs outputs = new JenkinsBuildOutputs();
// 设置统一的执行状态为成功
outputs.setStatus(NodeExecutionStatusEnum.SUCCESS);
// 设置 Jenkins 特有字段
outputs.setBuildStatus(buildStatus.name());
outputs.setBuildNumber(buildInfo.getBuildNumber());
outputs.setBuildUrl(buildInfo.getBuildUrl());
// 从构建详情中提取信息
outputs.setBuildDuration(buildDetails.getDuration() != null ? buildDetails.getDuration().intValue() : 0);
// 提取 Git Commit ID changeSets 中获取第一个
if (buildDetails.getChangeSets() != null && !buildDetails.getChangeSets().isEmpty()) {
log.info("Found {} changeSets", buildDetails.getChangeSets().size());
var changeSet = buildDetails.getChangeSets().get(0);
if (changeSet.getItems() != null && !changeSet.getItems().isEmpty()) {
log.info("Found {} items in changeSet", changeSet.getItems().size());
outputs.setGitCommitId(changeSet.getItems().get(0).getCommitId());
}
} else {
log.warn("No changeSets found in build details");
}
if (outputs.getGitCommitId() == null) {
outputs.setGitCommitId("");
}
// 提取构建制品URL如果有多个制品拼接成逗号分隔的列表
if (buildDetails.getArtifacts() != null && !buildDetails.getArtifacts().isEmpty()) {
log.info("Found {} artifacts", buildDetails.getArtifacts().size());
String artifactUrls = buildDetails.getArtifacts().stream()
.map(artifact -> buildInfo.getBuildUrl() + "artifact/" + artifact.getRelativePath())
.collect(java.util.stream.Collectors.joining(","));
outputs.setArtifactUrl(artifactUrls);
} else {
log.warn("No artifacts found in build details");
outputs.setArtifactUrl("");
}
// 记录完成日志
Long finalNodeInstanceId = currentNodeInstanceId.get();
if (finalNodeInstanceId != null) {
workflowNodeLogService.info(finalNodeInstanceId, LogSource.JENKINS,
"Jenkins 构建任务执行完成");
}
return outputs;
} finally {
// 清理 ThreadLocal避免内存泄漏
currentNodeInstanceId.remove();
} else {
log.warn("No changeSets found in build details");
}
if (outputs.getGitCommitId() == null) {
outputs.setGitCommitId("");
}
// 提取构建制品URL如果有多个制品拼接成逗号分隔的列表
if (buildDetails.getArtifacts() != null && !buildDetails.getArtifacts().isEmpty()) {
log.info("Found {} artifacts", buildDetails.getArtifacts().size());
String artifactUrls = buildDetails.getArtifacts().stream()
.map(artifact -> buildInfo.getBuildUrl() + "artifact/" + artifact.getRelativePath())
.collect(java.util.stream.Collectors.joining(","));
outputs.setArtifactUrl(artifactUrls);
} else {
log.warn("No artifacts found in build details");
outputs.setArtifactUrl("");
}
// 记录完成日志
workflowNodeLogService.info(execution.getProcessInstanceId(), execution.getCurrentActivityId(), LogSource.JENKINS, "Jenkins 构建任务执行完成");
return outputs;
}
private JenkinsQueueBuildInfoResponse waitForBuildToStart(String queueId) {
@ -188,10 +165,9 @@ public class JenkinsBuildDelegate extends BaseNodeDelegate<JenkinsBuildInputMapp
throw new BpmnError(WorkFlowConstants.WORKFLOW_EXEC_ERROR, String.format("Build did not start within %d seconds", MAX_QUEUE_POLLS * QUEUE_POLL_INTERVAL));
}
private JenkinsBuildStatus pollBuildStatus(ExternalSystem externalSystem, String jobName, Integer buildNumber) {
private JenkinsBuildStatus pollBuildStatus(DelegateExecution execution, ExternalSystem externalSystem, String jobName, Integer buildNumber) {
int attempts = 0;
long logOffset = 0L; // 记录日志读取位置
Long nodeInstanceId = currentNodeInstanceId.get();
while (attempts < MAX_BUILD_POLLS) {
try {
@ -200,33 +176,19 @@ public class JenkinsBuildDelegate extends BaseNodeDelegate<JenkinsBuildInputMapp
// 1. 增量拉取并保存 Jenkins 构建日志
try {
JenkinsConsoleOutputResponse consoleOutput =
jenkinsServiceIntegration.getConsoleOutput(externalSystem, jobName, buildNumber, logOffset);
JenkinsConsoleOutputResponse consoleOutput = jenkinsServiceIntegration.getConsoleOutput(externalSystem, jobName, buildNumber, logOffset);
// 批量保存日志到数据库同时也输出到控制台
if (consoleOutput.getLines() != null && !consoleOutput.getLines().isEmpty()) {
// 保存到数据库如果有节点实例ID
if (nodeInstanceId != null) {
workflowNodeLogService.batchLog(nodeInstanceId, LogSource.JENKINS,
com.qqchen.deploy.backend.workflow.enums.LogLevel.INFO,
consoleOutput.getLines());
}
// 同时输出到控制台方便开发调试
consoleOutput.getLines().forEach(line -> {
log.info("[Jenkins Build #{}] {}", buildNumber, line);
});
// 保存到数据库
workflowNodeLogService.batchLog(execution.getProcessInstanceId(), execution.getCurrentActivityId(), LogSource.JENKINS, LogLevel.INFO, consoleOutput.getLines());
}
// 更新日志偏移量
logOffset = consoleOutput.getNextOffset();
} catch (Exception logEx) {
log.warn("Failed to fetch Jenkins console log, continuing: {}", logEx.getMessage());
if (nodeInstanceId != null) {
workflowNodeLogService.warn(nodeInstanceId, LogSource.JENKINS,
"获取 Jenkins 日志失败: " + logEx.getMessage());
}
workflowNodeLogService.warn(execution.getProcessInstanceId(), execution.getCurrentActivityId(), LogSource.JENKINS, "获取 Jenkins 日志失败: " + logEx.getMessage());
}
// 2. 获取构建状态
@ -237,148 +199,59 @@ public class JenkinsBuildDelegate extends BaseNodeDelegate<JenkinsBuildInputMapp
case SUCCESS:
// 构建成功拉取剩余日志后返回状态
log.info("Jenkins build succeeded: job={}, buildNumber={}", jobName, buildNumber);
fetchRemainingLogs(externalSystem, jobName, buildNumber, logOffset);
if (nodeInstanceId != null) {
workflowNodeLogService.info(nodeInstanceId, LogSource.JENKINS,
String.format("✅ Jenkins 构建成功: buildNumber=%d", buildNumber));
}
fetchRemainingLogs(execution, externalSystem, jobName, buildNumber, logOffset);
workflowNodeLogService.info(execution.getProcessInstanceId(), execution.getCurrentActivityId(), LogSource.JENKINS, String.format("✅ Jenkins 构建成功: buildNumber=%d", buildNumber));
return status;
case FAILURE:
// 构建失败拉取剩余日志后抛出错误
fetchRemainingLogs(externalSystem, jobName, buildNumber, logOffset);
if (nodeInstanceId != null) {
workflowNodeLogService.error(nodeInstanceId, LogSource.JENKINS,
String.format("❌ Jenkins 构建失败: buildNumber=%d", buildNumber));
}
throw new BpmnError(WorkFlowConstants.WORKFLOW_EXEC_ERROR,
String.format("Jenkins build failed: job=%s, buildNumber=%d", jobName, buildNumber));
fetchRemainingLogs(execution, externalSystem, jobName, buildNumber, logOffset);
workflowNodeLogService.error(execution.getProcessInstanceId(), execution.getCurrentActivityId(), LogSource.JENKINS, String.format("❌ Jenkins 构建失败: buildNumber=%d", buildNumber));
throw new BpmnError(WorkFlowConstants.WORKFLOW_EXEC_ERROR, String.format("Jenkins build failed: job=%s, buildNumber=%d", jobName, buildNumber));
case ABORTED:
// 构建被取消拉取剩余日志后抛出错误
fetchRemainingLogs(externalSystem, jobName, buildNumber, logOffset);
if (nodeInstanceId != null) {
workflowNodeLogService.error(nodeInstanceId, LogSource.JENKINS,
String.format("❌ Jenkins 构建被取消: buildNumber=%d", buildNumber));
}
throw new BpmnError(WorkFlowConstants.WORKFLOW_EXEC_ERROR,
String.format("Jenkins build was aborted: job=%s, buildNumber=%d", jobName, buildNumber));
fetchRemainingLogs(execution, externalSystem, jobName, buildNumber, logOffset);
workflowNodeLogService.error(execution.getProcessInstanceId(), execution.getCurrentActivityId(), LogSource.JENKINS, String.format("❌ Jenkins 构建被取消: buildNumber=%d", buildNumber));
throw new BpmnError(WorkFlowConstants.WORKFLOW_EXEC_ERROR, String.format("Jenkins build was aborted: job=%s, buildNumber=%d", jobName, buildNumber));
case IN_PROGRESS:
// 继续轮询
attempts++;
break;
case NOT_FOUND:
// 构建记录丢失抛出系统异常
if (nodeInstanceId != null) {
workflowNodeLogService.error(nodeInstanceId, LogSource.JENKINS,
String.format("❌ Jenkins 构建记录未找到: buildNumber=%d", buildNumber));
}
throw new BpmnError(WorkFlowConstants.WORKFLOW_EXEC_ERROR,
String.format("Jenkins build not found: job=%s, buildNumber=%d", jobName, buildNumber));
workflowNodeLogService.error(execution.getProcessInstanceId(), execution.getCurrentActivityId(), LogSource.JENKINS, String.format("❌ Jenkins 构建记录未找到: buildNumber=%d", buildNumber));
throw new BpmnError(WorkFlowConstants.WORKFLOW_EXEC_ERROR, String.format("Jenkins build not found: job=%s, buildNumber=%d", jobName, buildNumber));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (nodeInstanceId != null) {
workflowNodeLogService.error(nodeInstanceId, LogSource.JENKINS, "构建状态轮询被中断");
}
workflowNodeLogService.error(execution.getProcessInstanceId(), execution.getCurrentActivityId(), LogSource.JENKINS, "构建状态轮询被中断");
throw new BpmnError(WorkFlowConstants.WORKFLOW_EXEC_ERROR, "Build status polling was interrupted");
}
}
// 超过最大轮询次数视为超时系统异常
if (nodeInstanceId != null) {
workflowNodeLogService.error(nodeInstanceId, LogSource.JENKINS,
String.format("❌ Jenkins 构建超时: 超过 %d 分钟", MAX_BUILD_POLLS * BUILD_POLL_INTERVAL / 60));
}
throw new BpmnError(WorkFlowConstants.WORKFLOW_EXEC_ERROR,
String.format("Jenkins build timed out after %d minutes: job=%s, buildNumber=%d",
MAX_BUILD_POLLS * BUILD_POLL_INTERVAL / 60, jobName, buildNumber));
workflowNodeLogService.error(execution.getProcessInstanceId(), execution.getCurrentActivityId(), LogSource.JENKINS, String.format("❌ Jenkins 构建超时: 超过 %d 分钟", MAX_BUILD_POLLS * BUILD_POLL_INTERVAL / 60)
);
throw new BpmnError(WorkFlowConstants.WORKFLOW_EXEC_ERROR, String.format("Jenkins build timed out after %d minutes: job=%s, buildNumber=%d", MAX_BUILD_POLLS * BUILD_POLL_INTERVAL / 60, jobName, buildNumber));
}
/**
* 拉取剩余的日志构建完成时调用
*/
private void fetchRemainingLogs(ExternalSystem externalSystem, String jobName, Integer buildNumber, long lastOffset) {
Long nodeInstanceId = currentNodeInstanceId.get();
private void fetchRemainingLogs(DelegateExecution execution, ExternalSystem externalSystem, String jobName, Integer buildNumber, long lastOffset) {
try {
JenkinsConsoleOutputResponse consoleOutput =
jenkinsServiceIntegration.getConsoleOutput(externalSystem, jobName, buildNumber, lastOffset);
JenkinsConsoleOutputResponse consoleOutput = jenkinsServiceIntegration.getConsoleOutput(externalSystem, jobName, buildNumber, lastOffset);
if (consoleOutput.getLines() != null && !consoleOutput.getLines().isEmpty()) {
// 保存到数据库如果有节点实例ID
if (nodeInstanceId != null) {
workflowNodeLogService.batchLog(nodeInstanceId, LogSource.JENKINS,
com.qqchen.deploy.backend.workflow.enums.LogLevel.INFO,
consoleOutput.getLines());
}
// 输出到控制台
consoleOutput.getLines().forEach(line -> {
log.info("[Jenkins Build #{}] {}", buildNumber, line);
});
// 保存到数据库
workflowNodeLogService.batchLog(execution.getProcessInstanceId(), execution.getCurrentActivityId(), LogSource.JENKINS, LogLevel.INFO, consoleOutput.getLines());
}
log.info("Jenkins build log complete: job={}, buildNumber={}", jobName, buildNumber);
if (nodeInstanceId != null) {
workflowNodeLogService.info(nodeInstanceId, LogSource.JENKINS, "Jenkins 构建日志已完整收集");
}
workflowNodeLogService.info(execution.getProcessInstanceId(), execution.getCurrentActivityId(), LogSource.JENKINS, "Jenkins 构建日志已完整收集");
} catch (Exception e) {
log.warn("Failed to fetch remaining Jenkins logs: {}", e.getMessage());
if (nodeInstanceId != null) {
workflowNodeLogService.warn(nodeInstanceId, LogSource.JENKINS,
"获取剩余日志失败: " + e.getMessage());
}
workflowNodeLogService.warn(execution.getProcessInstanceId(), execution.getCurrentActivityId(), LogSource.JENKINS, "获取剩余日志失败: " + e.getMessage());
}
}
/**
* 安全地获取节点实例ID带重试机制
*/
private Long getNodeInstanceIdSafely(DelegateExecution execution) {
String processInstanceId = execution.getProcessInstanceId();
String nodeId = execution.getCurrentActivityId();
// 最多重试 5 每次间隔 500ms
int maxRetries = 5;
int retryCount = 0;
while (retryCount < maxRetries) {
try {
// 通过 processInstanceId nodeId 查询 WorkflowNodeInstance
WorkflowNodeInstance nodeInstance = workflowNodeInstanceService
.findByProcessInstanceIdAndNodeId(processInstanceId, nodeId);
if (nodeInstance != null) {
log.info("成功获取节点实例ID: nodeInstanceId={}, retry={}", nodeInstance.getId(), retryCount);
return nodeInstance.getId();
}
// 还没创建等待后重试
retryCount++;
if (retryCount < maxRetries) {
log.debug("节点实例尚未创建,等待重试 ({}/{}): processInstanceId={}, nodeId={}",
retryCount, maxRetries, processInstanceId, nodeId);
Thread.sleep(500); // 等待 500ms
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("等待节点实例创建被中断");
return null;
} catch (Exception e) {
log.warn("获取节点实例ID失败 (retry {}): {}", retryCount, e.getMessage());
retryCount++;
if (retryCount < maxRetries) {
try {
Thread.sleep(500);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
return null;
}
}
}
}
log.warn("经过 {} 次重试后仍未获取到节点实例ID: processInstanceId={}, nodeId={}",
maxRetries, processInstanceId, nodeId);
return null;
}
}

View File

@ -1,15 +1,13 @@
package com.qqchen.deploy.backend.workflow.delegate;
import com.qqchen.deploy.backend.notification.service.INotificationSendService;
import com.qqchen.deploy.backend.workflow.dto.inputmapping.NotificationInputMapping;
import com.qqchen.deploy.backend.workflow.dto.outputs.NotificationOutputs;
import com.qqchen.deploy.backend.workflow.enums.NodeExecutionStatusEnum;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.delegate.DelegateExecution;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import java.util.Map;
@ -23,68 +21,35 @@ import java.util.Map;
@Component("notificationDelegate")
public class NotificationNodeDelegate extends BaseNodeDelegate<NotificationInputMapping, NotificationOutputs> {
private final RestTemplate restTemplate = new RestTemplate();
// TODO: 从数据库中读取webhook配置
private final String WX_HOOK_API = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=614b110b-8957-4be8-95b9-4eca84c15028";
@Resource
private INotificationSendService notificationSendService;
@Override
protected NotificationOutputs executeInternal(
DelegateExecution execution,
Map<String, Object> configs,
NotificationInputMapping input
DelegateExecution execution,
Map<String, Object> configs,
NotificationInputMapping input
) {
log.info("Sending notification - channel: {}, title: {}, content: {}",
input.getNotificationChannel(), input.getTitle(), input.getContent());
log.info("Sending notification - channel: {}, title: {}, content: {}", input.getChannelId(), input.getTitle(), input.getContent());
// 1. 根据notificationChannel获取webhook配置
// TODO: 从数据库查询通知渠道配置
// 2. 发送通知
try {
sendWeChatNotification(input.getTitle(), input.getContent());
// 使用通知服务发送消息
Long channelId = input.getChannelId() != null ? input.getChannelId() : null;
// 3. 返回成功结果
notificationSendService.send(channelId, input.getTitle(), input.getContent());
// 返回成功结果
NotificationOutputs outputs = new NotificationOutputs();
outputs.setStatus(com.qqchen.deploy.backend.workflow.enums.NodeExecutionStatusEnum.SUCCESS);
outputs.setStatus(NodeExecutionStatusEnum.SUCCESS);
outputs.setMessage("通知发送成功");
return outputs;
} catch (Exception e) {
log.error("Failed to send notification", e);
NotificationOutputs outputs = new NotificationOutputs();
outputs.setStatus(com.qqchen.deploy.backend.workflow.enums.NodeExecutionStatusEnum.FAILURE);
outputs.setStatus(NodeExecutionStatusEnum.FAILURE);
outputs.setMessage("通知发送失败: " + e.getMessage());
return outputs;
}
}
/**
* 发送企业微信通知
*/
private void sendWeChatNotification(String title, String content) {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
String message = String.format("%s\n%s", title, content);
String body = String.format(
"{\n" +
" \"msgtype\": \"text\",\n" +
" \"text\": {\n" +
" \"content\": \"%s\"\n" +
" }\n" +
"}", message
);
HttpEntity<String> entity = new HttpEntity<>(body, headers);
restTemplate.exchange(
WX_HOOK_API,
HttpMethod.POST,
entity,
String.class
);
log.info("WeChat notification sent successfully");
}
}

View File

@ -18,7 +18,7 @@ public class NotificationInputMapping {
* 通知渠道ID
*/
@NotNull(message = "通知渠道ID不能为空")
private Integer notificationChannel;
private Long channelId;
/**
* 通知标题

View File

@ -20,10 +20,16 @@ import lombok.EqualsAndHashCode;
public class WorkflowNodeLog extends Entity<Long> {
/**
* 节点实例ID关联 workflow_node_instance.id
* 流程实例IDFlowable processInstanceId
*/
@Column(name = "node_instance_id", nullable = false)
private Long nodeInstanceId;
@Column(name = "process_instance_id", nullable = false, length = 64)
private String processInstanceId;
/**
* 节点IDFlowable nodeId, 例如: sid_xxx
*/
@Column(name = "node_id", nullable = false, length = 64)
private String nodeId;
/**
* 日志序号保证同一节点内日志有序从1开始递增

View File

@ -20,34 +20,30 @@ import java.util.List;
public interface IWorkflowNodeLogRepository extends IBaseRepository<WorkflowNodeLog, Long> {
/**
* 根据节点实例ID查询日志按序号排序
* 根据流程实例ID和节点ID查询日志按序号排序
*/
List<WorkflowNodeLog> findByNodeInstanceIdOrderBySequenceIdAsc(Long nodeInstanceId);
List<WorkflowNodeLog> findByProcessInstanceIdAndNodeIdOrderBySequenceIdAsc(
String processInstanceId, String nodeId);
/**
* 根据节点实例ID分页查询日志
* 根据流程实例ID和节点ID分页查询日志
*/
Page<WorkflowNodeLog> findByNodeInstanceIdOrderBySequenceIdAsc(Long nodeInstanceId, Pageable pageable);
Page<WorkflowNodeLog> findByProcessInstanceIdAndNodeIdOrderBySequenceIdAsc(
String processInstanceId, String nodeId, Pageable pageable);
/**
* 根据工作流实例ID查询所有节点的日志使用原生 SQL
* 根据实例ID查询所有节点的日志
*/
@Query(value = "SELECT l.* FROM workflow_node_log l " +
"WHERE l.node_instance_id IN (" +
" SELECT n.id FROM workflow_node_instance n WHERE n.workflow_instance_id = :workflowInstanceId" +
") " +
"ORDER BY l.node_instance_id, l.sequence_id",
nativeQuery = true)
List<WorkflowNodeLog> findByWorkflowInstanceId(@Param("workflowInstanceId") Long workflowInstanceId);
List<WorkflowNodeLog> findByProcessInstanceIdOrderBySequenceIdAsc(String processInstanceId);
/**
* 删除节点的所有日志
*/
void deleteByNodeInstanceId(Long nodeInstanceId);
void deleteByProcessInstanceIdAndNodeId(String processInstanceId, String nodeId);
/**
* 统计节点的日志数量
*/
long countByNodeInstanceId(Long nodeInstanceId);
long countByProcessInstanceIdAndNodeId(String processInstanceId, String nodeId);
}

View File

@ -10,6 +10,7 @@ import java.util.List;
/**
* 工作流节点日志服务接口
* 使用 Flowable processInstanceId + nodeId 作为日志关联键
*
* @author qqchen
* @since 2025-11-03
@ -19,46 +20,46 @@ public interface IWorkflowNodeLogService {
/**
* 记录日志单条
*/
void log(Long nodeInstanceId, LogSource source, LogLevel level, String message);
void log(String processInstanceId, String nodeId, LogSource source, LogLevel level, String message);
/**
* 批量记录日志
*/
void batchLog(Long nodeInstanceId, LogSource source, LogLevel level, List<String> messages);
void batchLog(String processInstanceId, String nodeId, LogSource source, LogLevel level, List<String> messages);
/**
* 便捷方法记录 INFO 日志
*/
void info(Long nodeInstanceId, LogSource source, String message);
void info(String processInstanceId, String nodeId, LogSource source, String message);
/**
* 便捷方法记录 WARN 日志
*/
void warn(Long nodeInstanceId, LogSource source, String message);
void warn(String processInstanceId, String nodeId, LogSource source, String message);
/**
* 便捷方法记录 ERROR 日志
*/
void error(Long nodeInstanceId, LogSource source, String message);
void error(String processInstanceId, String nodeId, LogSource source, String message);
/**
* 查询节点的所有日志
*/
List<WorkflowNodeLog> getNodeLogs(Long nodeInstanceId);
List<WorkflowNodeLog> getNodeLogs(String processInstanceId, String nodeId);
/**
* 分页查询节点日志
*/
Page<WorkflowNodeLog> getNodeLogs(Long nodeInstanceId, Pageable pageable);
Page<WorkflowNodeLog> getNodeLogs(String processInstanceId, String nodeId, Pageable pageable);
/**
* 查询工作流的所有日志
* 查询程实例的所有日志
*/
List<WorkflowNodeLog> getWorkflowLogs(Long workflowInstanceId);
List<WorkflowNodeLog> getProcessInstanceLogs(String processInstanceId);
/**
* 删除节点日志
*/
void deleteNodeLogs(Long nodeInstanceId);
void deleteNodeLogs(String processInstanceId, String nodeId);
}

View File

@ -18,6 +18,7 @@ import java.util.List;
/**
* 工作流节点日志服务实现
* 使用 Flowable processInstanceId + nodeId 作为日志关联键
*
* @author qqchen
* @since 2025-11-03
@ -33,21 +34,22 @@ public class WorkflowNodeLogServiceImpl implements IWorkflowNodeLogService {
private RedisTemplate<String, String> redisTemplate;
/**
* 生成日志序列号
* 生成日志序列号使用 Redis INCR 保证全局递增
*/
private Long generateSequenceId(Long nodeInstanceId) {
String key = "workflow:node:log:seq:" + nodeInstanceId;
private Long generateSequenceId(String processInstanceId, String nodeId) {
String key = "workflow:node:log:seq:" + processInstanceId + ":" + nodeId;
return redisTemplate.opsForValue().increment(key, 1);
}
@Override
@Transactional
public void log(Long nodeInstanceId, LogSource source, LogLevel level, String message) {
public void log(String processInstanceId, String nodeId, LogSource source, LogLevel level, String message) {
try {
Long sequenceId = generateSequenceId(nodeInstanceId);
Long sequenceId = generateSequenceId(processInstanceId, nodeId);
WorkflowNodeLog log = new WorkflowNodeLog();
log.setNodeInstanceId(nodeInstanceId);
log.setProcessInstanceId(processInstanceId);
log.setNodeId(nodeId);
log.setSequenceId(sequenceId);
log.setTimestamp(System.currentTimeMillis());
log.setSource(source);
@ -57,14 +59,14 @@ public class WorkflowNodeLogServiceImpl implements IWorkflowNodeLogService {
logRepository.save(log);
} catch (Exception e) {
log.error("Failed to save workflow node log: nodeInstanceId={}, source={}, level={}",
nodeInstanceId, source, level, e);
log.error("Failed to save workflow node log: processInstanceId={}, nodeId={}, source={}, level={}",
processInstanceId, nodeId, source, level, e);
}
}
@Override
@Transactional
public void batchLog(Long nodeInstanceId, LogSource source, LogLevel level, List<String> messages) {
public void batchLog(String processInstanceId, String nodeId, LogSource source, LogLevel level, List<String> messages) {
if (messages == null || messages.isEmpty()) {
return;
}
@ -73,10 +75,11 @@ public class WorkflowNodeLogServiceImpl implements IWorkflowNodeLogService {
List<WorkflowNodeLog> logs = new ArrayList<>(messages.size());
for (String message : messages) {
Long sequenceId = generateSequenceId(nodeInstanceId);
Long sequenceId = generateSequenceId(processInstanceId, nodeId);
WorkflowNodeLog log = new WorkflowNodeLog();
log.setNodeInstanceId(nodeInstanceId);
log.setProcessInstanceId(processInstanceId);
log.setNodeId(nodeId);
log.setSequenceId(sequenceId);
log.setTimestamp(System.currentTimeMillis());
log.setSource(source);
@ -89,45 +92,45 @@ public class WorkflowNodeLogServiceImpl implements IWorkflowNodeLogService {
logRepository.saveAll(logs);
} catch (Exception e) {
log.error("Failed to batch save workflow node logs: nodeInstanceId={}, count={}",
nodeInstanceId, messages.size(), e);
log.error("Failed to batch save workflow node logs: processInstanceId={}, nodeId={}, count={}",
processInstanceId, nodeId, messages.size(), e);
}
}
@Override
public void info(Long nodeInstanceId, LogSource source, String message) {
log(nodeInstanceId, source, LogLevel.INFO, message);
public void info(String processInstanceId, String nodeId, LogSource source, String message) {
log(processInstanceId, nodeId, source, LogLevel.INFO, message);
}
@Override
public void warn(Long nodeInstanceId, LogSource source, String message) {
log(nodeInstanceId, source, LogLevel.WARN, message);
public void warn(String processInstanceId, String nodeId, LogSource source, String message) {
log(processInstanceId, nodeId, source, LogLevel.WARN, message);
}
@Override
public void error(Long nodeInstanceId, LogSource source, String message) {
log(nodeInstanceId, source, LogLevel.ERROR, message);
public void error(String processInstanceId, String nodeId, LogSource source, String message) {
log(processInstanceId, nodeId, source, LogLevel.ERROR, message);
}
@Override
public List<WorkflowNodeLog> getNodeLogs(Long nodeInstanceId) {
return logRepository.findByNodeInstanceIdOrderBySequenceIdAsc(nodeInstanceId);
public List<WorkflowNodeLog> getNodeLogs(String processInstanceId, String nodeId) {
return logRepository.findByProcessInstanceIdAndNodeIdOrderBySequenceIdAsc(processInstanceId, nodeId);
}
@Override
public Page<WorkflowNodeLog> getNodeLogs(Long nodeInstanceId, Pageable pageable) {
return logRepository.findByNodeInstanceIdOrderBySequenceIdAsc(nodeInstanceId, pageable);
public Page<WorkflowNodeLog> getNodeLogs(String processInstanceId, String nodeId, Pageable pageable) {
return logRepository.findByProcessInstanceIdAndNodeIdOrderBySequenceIdAsc(processInstanceId, nodeId, pageable);
}
@Override
public List<WorkflowNodeLog> getWorkflowLogs(Long workflowInstanceId) {
return logRepository.findByWorkflowInstanceId(workflowInstanceId);
public List<WorkflowNodeLog> getProcessInstanceLogs(String processInstanceId) {
return logRepository.findByProcessInstanceIdOrderBySequenceIdAsc(processInstanceId);
}
@Override
@Transactional
public void deleteNodeLogs(Long nodeInstanceId) {
logRepository.deleteByNodeInstanceId(nodeInstanceId);
public void deleteNodeLogs(String processInstanceId, String nodeId) {
logRepository.deleteByProcessInstanceIdAndNodeId(processInstanceId, nodeId);
}
}

View File

@ -662,24 +662,25 @@ CREATE TABLE workflow_node_instance
-- 工作流节点日志表
CREATE TABLE workflow_node_log
(
id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '主键ID',
create_by VARCHAR(255) NULL COMMENT '创建人',
create_time DATETIME(6) NULL COMMENT '创建时间',
deleted BIT NOT NULL DEFAULT 0 COMMENT '是否删除0未删除1已删除',
update_by VARCHAR(255) NULL COMMENT '更新人',
update_time DATETIME(6) NULL COMMENT '更新时间',
version INT NOT NULL DEFAULT 0 COMMENT '乐观锁版本号',
id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '主键ID',
create_by VARCHAR(255) NULL COMMENT '创建人',
create_time DATETIME(6) NULL COMMENT '创建时间',
deleted BIT NOT NULL DEFAULT 0 COMMENT '是否删除0未删除1已删除',
update_by VARCHAR(255) NULL COMMENT '更新人',
update_time DATETIME(6) NULL COMMENT '更新时间',
version INT NOT NULL DEFAULT 0 COMMENT '乐观锁版本号',
node_instance_id BIGINT NOT NULL COMMENT '节点实例ID关联 workflow_node_instance.id',
sequence_id BIGINT NOT NULL COMMENT '日志序号保证同一节点内日志有序从1开始递增',
timestamp BIGINT NOT NULL COMMENT '时间戳Unix毫秒',
level VARCHAR(10) NOT NULL COMMENT '日志级别INFO, WARN, ERROR, DEBUG',
source VARCHAR(20) NOT NULL COMMENT '日志来源JENKINS, FLOWABLE, SHELL, NOTIFICATION',
message TEXT NOT NULL COMMENT '日志内容',
process_instance_id VARCHAR(64) NOT NULL COMMENT '流程实例IDFlowable processInstanceId',
node_id VARCHAR(64) NOT NULL COMMENT '节点IDFlowable nodeId, 例如: sid_xxx',
sequence_id BIGINT NOT NULL COMMENT '日志序号保证同一节点内日志有序从1开始递增',
timestamp BIGINT NOT NULL COMMENT '时间戳Unix毫秒',
level VARCHAR(10) NOT NULL COMMENT '日志级别INFO, WARN, ERROR, DEBUG',
source VARCHAR(20) NOT NULL COMMENT '日志来源JENKINS, FLOWABLE, SHELL, NOTIFICATION',
message TEXT NOT NULL COMMENT '日志内容',
INDEX idx_node_seq (node_instance_id, sequence_id),
INDEX idx_node_time (node_instance_id, timestamp),
CONSTRAINT FK_workflow_node_log_instance FOREIGN KEY (node_instance_id) REFERENCES workflow_node_instance (id) ON DELETE CASCADE
INDEX idx_process_node_seq (process_instance_id, node_id, sequence_id),
INDEX idx_process_seq (process_instance_id, sequence_id),
INDEX idx_timestamp (timestamp)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='工作流节点日志表';
-- --------------------------------------------------------------------------------------