From 28cae45e51ed89801cd548533129ddd762a8eb34 Mon Sep 17 00:00:00 2001 From: dengqichen Date: Fri, 7 Nov 2025 16:02:18 +0800 Subject: [PATCH] =?UTF-8?q?=E6=89=93=E5=8D=B0=E4=BA=86JENKINS=E8=8A=82?= =?UTF-8?q?=E7=82=B9=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../deploy/api/DeployApiController.java | 13 + .../backend/deploy/dto/DeployNodeLogDTO.java | 59 ++++ .../deploy/service/IDeployService.java | 10 + .../service/impl/DeployServiceImpl.java | 104 ++++-- .../backend/framework/utils/RedisUtil.java | 109 ++++++ .../workflow/delegate/BaseNodeDelegate.java | 313 ++++++++++++++++-- .../delegate/JenkinsBuildDelegate.java | 66 ++-- .../delegate/NotificationNodeDelegate.java | 28 +- .../workflow/delegate/ShellNodeDelegate.java | 18 +- .../inputmapping/ApprovalInputMapping.java | 4 +- .../inputmapping/BaseNodeInputMapping.java | 42 +++ .../JenkinsBuildInputMapping.java | 2 +- .../NotificationInputMapping.java | 2 +- .../dto/inputmapping/ShellInputMapping.java | 8 +- .../ApprovalEndExecutionListener.java | 64 ++-- .../listener/BaseExecutionListener.java | 56 ++++ .../workflow/listener/BaseTaskListener.java | 49 ++- .../service/IWorkflowNodeLogService.java | 5 - .../impl/WorkflowNodeLogServiceImpl.java | 172 +++++++--- .../backend/workflow/utils/WorkflowUtils.java | 191 +++++++++++ .../components/DeployNodeLogDialog.tsx | 206 ++++++++++++ 21 files changed, 1292 insertions(+), 229 deletions(-) create mode 100644 backend/src/main/java/com/qqchen/deploy/backend/deploy/dto/DeployNodeLogDTO.java create mode 100644 backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/inputmapping/BaseNodeInputMapping.java create mode 100644 backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/BaseExecutionListener.java create mode 100644 backend/src/main/java/com/qqchen/deploy/backend/workflow/utils/WorkflowUtils.java create mode 100644 frontend/src/pages/Dashboard/components/DeployNodeLogDialog.tsx diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/api/DeployApiController.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/api/DeployApiController.java index d3f35f2c..321f941b 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/api/DeployApiController.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/api/DeployApiController.java @@ -90,5 +90,18 @@ public class DeployApiController { deployService.completeApproval(request); return Response.success(); } + + /** + * 获取节点日志 + */ + @Operation(summary = "获取节点日志", description = "获取指定节点的执行日志,日志保留7天,超过7天将被清除") + @GetMapping("/logs") + @PreAuthorize("isAuthenticated()") + public Response getNodeLogs( + @Parameter(description = "流程实例ID", required = true) @RequestParam String processInstanceId, + @Parameter(description = "节点ID", required = true) @RequestParam String nodeId + ) { + return Response.success(deployService.getNodeLogs(processInstanceId, nodeId)); + } } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/dto/DeployNodeLogDTO.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/dto/DeployNodeLogDTO.java new file mode 100644 index 00000000..aef87ee2 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/dto/DeployNodeLogDTO.java @@ -0,0 +1,59 @@ +package com.qqchen.deploy.backend.deploy.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +import java.util.List; + +/** + * 部署节点日志响应DTO + * + * @author qqchen + * @since 2025-11-07 + */ +@Data 
+@Schema(description = "部署节点日志响应") +public class DeployNodeLogDTO { + + @Schema(description = "流程实例ID") + private String processInstanceId; + + @Schema(description = "节点ID") + private String nodeId; + + @Schema(description = "节点名称") + private String nodeName; + + @Schema(description = "日志列表") + private List logs; + + @Schema(description = "日志是否已过期(超过7天被清除)") + private Boolean expired; + + @Schema(description = "提示信息") + private String message; + + /** + * 日志条目 + */ + @Data + @Schema(description = "日志条目") + public static class LogEntry { + + @Schema(description = "序列号") + private Long sequenceId; + + @Schema(description = "时间戳(毫秒)") + private Long timestamp; + + @Schema(description = "日志级别(INFO/WARN/ERROR/DEBUG)") + private String level; + + @Schema(description = "日志来源(JENKINS/FLOWABLE/SHELL/NOTIFICATION)") + private String source; + + @Schema(description = "日志内容") + private String message; + } +} + diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/IDeployService.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/IDeployService.java index 6b6993b7..ee237760 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/IDeployService.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/IDeployService.java @@ -44,5 +44,15 @@ public interface IDeployService { * @param request 审批完成请求 */ void completeApproval(DeployApprovalCompleteRequest request); + + /** + * 获取节点日志 + *

获取指定节点的执行日志,日志保留7天,超过7天将被清除 + * + * @param processInstanceId 流程实例ID + * @param nodeId 节点ID + * @return 节点日志响应 + */ + DeployNodeLogDTO getNodeLogs(String processInstanceId, String nodeId); } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/DeployServiceImpl.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/DeployServiceImpl.java index b7b6f863..1a32c86c 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/DeployServiceImpl.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/DeployServiceImpl.java @@ -18,9 +18,11 @@ import com.qqchen.deploy.backend.workflow.dto.WorkflowInstanceStartRequest; import com.qqchen.deploy.backend.workflow.dto.inputmapping.ApprovalInputMapping; import com.qqchen.deploy.backend.workflow.dto.outputs.ApprovalOutputs; import com.qqchen.deploy.backend.workflow.entity.WorkflowDefinition; +import com.qqchen.deploy.backend.workflow.entity.WorkflowNodeLog; import com.qqchen.deploy.backend.workflow.model.NodeContext; import com.qqchen.deploy.backend.workflow.repository.IWorkflowDefinitionRepository; import com.qqchen.deploy.backend.workflow.service.IWorkflowInstanceService; +import com.qqchen.deploy.backend.workflow.service.IWorkflowNodeLogService; import com.qqchen.deploy.backend.deploy.service.IDeployRecordService; import com.qqchen.deploy.backend.deploy.repository.IDeployRecordRepository; import com.qqchen.deploy.backend.deploy.entity.DeployRecord; @@ -99,6 +101,9 @@ public class DeployServiceImpl implements IDeployService { @Resource private RuntimeService runtimeService; + + @Resource + private IWorkflowNodeLogService workflowNodeLogService; @Override @@ -139,7 +144,7 @@ public class DeployServiceImpl implements IDeployService { // 补充查询作为成员但不是负责人的团队 Set missingTeamIds = teamIds.stream() .filter(id -> !teamMap.containsKey(id)) - .collect(Collectors.toSet()); + .collect(Collectors.toSet()); if (!missingTeamIds.isEmpty()) { teamRepository.findAllById(missingTeamIds).forEach(team -> teamMap.put(team.getId(), team)); } @@ -218,13 +223,13 @@ public class DeployServiceImpl implements IDeployService { // 14. 批量查询团队环境配置 List teamEnvConfigs = teamEnvironmentConfigRepository.findByTeamIdIn(teamIds); Map teamEnvConfigMap = teamEnvConfigs.stream() - .collect(toMap(c -> c.getTeamId() + "_" + c.getEnvironmentId(), c -> c)); - + .collect(toMap(c -> c.getTeamId() + "_" + c.getEnvironmentId(), c -> c)); + // 15. 批量查询审批人信息 Set approverUserIds = teamEnvConfigs.stream() - .filter(c -> c.getApproverUserIds() != null) - .flatMap(c -> c.getApproverUserIds().stream()) - .collect(Collectors.toSet()); + .filter(c -> c.getApproverUserIds() != null) + .flatMap(c -> c.getApproverUserIds().stream()) + .collect(Collectors.toSet()); Map approverMap = !approverUserIds.isEmpty() ? 
userRepository.findAllById(approverUserIds).stream().collect(toMap(User::getId, u -> u)) : Collections.emptyMap(); @@ -536,32 +541,32 @@ public class DeployServiceImpl implements IDeployService { */ private Map queryDeployStatistics(List teamApplicationIds) { Map statisticsMap = new HashMap<>(); - List statisticsList = deployRecordRepository.findDeployStatisticsByTeamApplicationIds(teamApplicationIds); + List statisticsList = deployRecordRepository.findDeployStatisticsByTeamApplicationIds(teamApplicationIds); - for (Object[] row : statisticsList) { - Long teamApplicationId = (Long) row[0]; - Long totalCount = ((Number) row[1]).longValue(); - Long successCount = ((Number) row[2]).longValue(); - Long failedCount = ((Number) row[3]).longValue(); - Long runningCount = ((Number) row[4]).longValue(); + for (Object[] row : statisticsList) { + Long teamApplicationId = (Long) row[0]; + Long totalCount = ((Number) row[1]).longValue(); + Long successCount = ((Number) row[2]).longValue(); + Long failedCount = ((Number) row[3]).longValue(); + Long runningCount = ((Number) row[4]).longValue(); - LocalDateTime lastDeployTime = null; - if (row[5] != null) { - if (row[5] instanceof Timestamp) { - lastDeployTime = ((Timestamp) row[5]).toLocalDateTime(); - } else if (row[5] instanceof LocalDateTime) { - lastDeployTime = (LocalDateTime) row[5]; + LocalDateTime lastDeployTime = null; + if (row[5] != null) { + if (row[5] instanceof Timestamp) { + lastDeployTime = ((Timestamp) row[5]).toLocalDateTime(); + } else if (row[5] instanceof LocalDateTime) { + lastDeployTime = (LocalDateTime) row[5]; + } } - } - DeployStatisticsDTO stats = new DeployStatisticsDTO(); - stats.setTotalCount(totalCount); - stats.setSuccessCount(successCount); - stats.setFailedCount(failedCount); - stats.setRunningCount(runningCount); - stats.setLastDeployTime(lastDeployTime); - statisticsMap.put(teamApplicationId, stats); - } + DeployStatisticsDTO stats = new DeployStatisticsDTO(); + stats.setTotalCount(totalCount); + stats.setSuccessCount(successCount); + stats.setFailedCount(failedCount); + stats.setRunningCount(runningCount); + stats.setLastDeployTime(lastDeployTime); + statisticsMap.put(teamApplicationId, stats); + } return statisticsMap; } @@ -570,12 +575,12 @@ public class DeployServiceImpl implements IDeployService { * 批量查询最新部署记录 */ private Map queryLatestRecords(List teamApplicationIds) { - List latestRecords = deployRecordRepository.findLatestDeployRecordsByTeamApplicationIds(teamApplicationIds); + List latestRecords = deployRecordRepository.findLatestDeployRecordsByTeamApplicationIds(teamApplicationIds); Map latestRecordMap = latestRecords.stream() .collect(toMap(DeployRecord::getTeamApplicationId, r -> r)); - // 更新统计信息中的最新状态和部署人 - latestRecordMap.forEach((teamAppId, record) -> { + // 更新统计信息中的最新状态和部署人 + latestRecordMap.forEach((teamAppId, record) -> { // 这里可以添加额外的处理逻辑 }); @@ -1015,5 +1020,42 @@ public class DeployServiceImpl implements IDeployService { return false; } + + @Override + public DeployNodeLogDTO getNodeLogs(String processInstanceId, String nodeId) { + DeployNodeLogDTO result = new DeployNodeLogDTO(); + result.setProcessInstanceId(processInstanceId); + result.setNodeId(nodeId); + + // 查询日志 + List logs = workflowNodeLogService.getNodeLogs(processInstanceId, nodeId); + + if (logs.isEmpty()) { + // 判断是过期还是还没有日志 + result.setExpired(true); + result.setMessage("日志已过期(超过7天)或不存在"); + result.setLogs(Collections.emptyList()); + } else { + result.setExpired(false); + result.setMessage("查询成功"); + + // 转换为 DTO + List logEntries = 
logs.stream() + .map(log -> { + DeployNodeLogDTO.LogEntry entry = new DeployNodeLogDTO.LogEntry(); + entry.setSequenceId(log.getSequenceId()); + entry.setTimestamp(log.getTimestamp()); + entry.setLevel(log.getLevel().name()); + entry.setSource(log.getSource().name()); + entry.setMessage(log.getMessage()); + return entry; + }) + .collect(Collectors.toList()); + + result.setLogs(logEntries); + } + + return result; + } } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/utils/RedisUtil.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/utils/RedisUtil.java index 1e81d42f..02a48090 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/framework/utils/RedisUtil.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/utils/RedisUtil.java @@ -2,6 +2,9 @@ package com.qqchen.deploy.backend.framework.utils; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; +import org.springframework.data.domain.Range; +import org.springframework.data.redis.connection.stream.MapRecord; +import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; @@ -548,5 +551,111 @@ public class RedisUtil { return Collections.emptySet(); } } + + // ===============================Stream================================= + + /** + * 向Stream中添加数据 + * @param key Stream的key + * @param data 数据(Map格式) + * @return 消息ID + */ + public RecordId streamAdd(String key, Map data) { + try { + return redisTemplate.opsForStream().add(key, data); + } catch (Exception e) { + log.error("Redis streamAdd error: key={}", key, e); + return null; + } + } + + /** + * 向Stream中添加数据并设置过期时间 + * @param key Stream的key + * @param data 数据(Map格式) + * @param time 过期时间(秒) + * @return 消息ID + */ + public RecordId streamAdd(String key, Map data, long time) { + try { + RecordId recordId = redisTemplate.opsForStream().add(key, data); + if (time > 0) { + expire(key, time); + } + return recordId; + } catch (Exception e) { + log.error("Redis streamAdd with expire error: key={}, time={}", key, time, e); + return null; + } + } + + /** + * 限制Stream的长度(保留最新的N条消息) + * @param key Stream的key + * @param maxLen 最大长度 + * @return 是否成功 + */ + public boolean streamTrim(String key, long maxLen) { + try { + redisTemplate.opsForStream().trim(key, maxLen); + return true; + } catch (Exception e) { + log.error("Redis streamTrim error: key={}, maxLen={}", key, maxLen, e); + return false; + } + } + + /** + * 读取Stream中指定范围的数据 + * @param key Stream的key + * @param range 范围(使用Range.unbounded()表示所有数据) + * @return 消息列表 + */ + public List> streamRange(String key, Range range) { + try { + return redisTemplate.opsForStream().range(key, range); + } catch (Exception e) { + log.error("Redis streamRange error: key={}, range={}", key, range, e); + return Collections.emptyList(); + } + } + + /** + * 读取Stream中所有数据 + * @param key Stream的key + * @return 消息列表 + */ + public List> streamRangeAll(String key) { + return streamRange(key, Range.unbounded()); + } + + /** + * 删除Stream中的指定消息 + * @param key Stream的key + * @param recordIds 消息ID列表 + * @return 删除的数量 + */ + public Long streamDelete(String key, String... 
recordIds) { + try { + return redisTemplate.opsForStream().delete(key, recordIds); + } catch (Exception e) { + log.error("Redis streamDelete error: key={}", key, e); + return 0L; + } + } + + /** + * 获取Stream的长度 + * @param key Stream的key + * @return 消息数量 + */ + public Long streamSize(String key) { + try { + return redisTemplate.opsForStream().size(key); + } catch (Exception e) { + log.error("Redis streamSize error: key={}", key, e); + return 0L; + } + } } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/delegate/BaseNodeDelegate.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/delegate/BaseNodeDelegate.java index 047efec0..ac095e4d 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/delegate/BaseNodeDelegate.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/delegate/BaseNodeDelegate.java @@ -5,8 +5,12 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.qqchen.deploy.backend.framework.utils.JsonUtils; import com.qqchen.deploy.backend.framework.utils.SpelExpressionResolver; import com.qqchen.deploy.backend.workflow.dto.outputs.BaseNodeOutputs; +import com.qqchen.deploy.backend.workflow.enums.LogLevel; +import com.qqchen.deploy.backend.workflow.enums.LogSource; import com.qqchen.deploy.backend.workflow.enums.NodeExecutionStatusEnum; import com.qqchen.deploy.backend.workflow.model.NodeContext; +import com.qqchen.deploy.backend.workflow.service.IWorkflowNodeLogService; +import com.qqchen.deploy.backend.workflow.utils.WorkflowUtils; import lombok.extern.slf4j.Slf4j; import org.flowable.engine.delegate.DelegateExecution; import org.flowable.engine.delegate.JavaDelegate; @@ -35,6 +39,8 @@ public abstract class BaseNodeDelegate implements JavaDelegate { @Autowired private ObjectMapper objectMapper; + @Autowired(required = false) + private IWorkflowNodeLogService workflowNodeLogService; // Flowable自动注入的字段 protected Expression nodeId; @@ -50,60 +56,84 @@ public abstract class BaseNodeDelegate implements JavaDelegate { private Class outputsClass; + // 当前执行上下文(用于日志记录) + private String currentProcessInstanceId; + private String currentNodeId; + + // ✅ 预初始化的输出对象(子类可以直接访问和修改) + protected O output; + + // ✅ 当前输入映射对象(用于读取 continueOnFailure 等配置) + protected I currentInputMapping; + @Override public void execute(DelegateExecution execution) { - String currentNodeId = null; NodeContext nodeContext = new NodeContext<>(); Map configsMap = null; I inputMappingObj = null; try { - // 1. 获取节点ID - currentNodeId = getFieldValue(nodeId, execution); + // 1. 获取节点ID和流程实例ID(用于日志记录) + this.currentNodeId = getFieldValue(nodeId, execution); + this.currentProcessInstanceId = execution.getProcessInstanceId(); log.info("Executing node: {}", currentNodeId); // 2. 解析配置(通用Map) configsMap = parseJsonField(configs, execution); - // 3. 解析并转换InputMapping(强类型) + // 3. 自动创建并初始化输出对象为 SUCCESS 状态 + this.output = createSuccessOutputs(configsMap); + + // 4. 解析并转换InputMapping(强类型) inputMappingObj = parseAndConvertInputMapping(execution); + this.currentInputMapping = inputMappingObj; // 保存引用,供子类使用 - // 4. 执行具体的业务逻辑,返回强类型输出 - O outputsObj = executeInternal(execution, configsMap, inputMappingObj); + // 5. 执行具体的业务逻辑(子类直接修改 this.output) + executeInternal(execution, configsMap, inputMappingObj); - // 5. 使用 NodeContext 保存节点数据 + // 6. 
使用 NodeContext 保存节点数据 nodeContext.setConfigs(configsMap); nodeContext.setInputMapping(inputMappingObj); - nodeContext.setOutputs(outputsObj); + nodeContext.setOutputs(this.output); execution.setVariable(currentNodeId, nodeContext.toMap(objectMapper)); log.info("Stored NodeContext for: {}", currentNodeId); } catch (Exception e) { - // 即使失败,也保存完整的 NodeContext(包含 configs 和 inputMapping) - BaseNodeOutputs failureNodeOutputs = new BaseNodeOutputs(); - failureNodeOutputs.setStatus(NodeExecutionStatusEnum.FAILURE); - failureNodeOutputs.setMessage(e.getMessage()); - - nodeContext.setConfigs(configsMap); // 保存已解析的配置 - nodeContext.setInputMapping(inputMappingObj); // 保存已解析的输入 - nodeContext.setOutputs((O) failureNodeOutputs); - - execution.setVariable(currentNodeId, nodeContext.toMap(objectMapper)); - log.error("Task execution failed", e); + // ❌ 业务异常:根据 continueOnFailure 配置决定行为 + log.error("Business exception in node: {}", currentNodeId, e); + + boolean continueOnFailure = WorkflowUtils.getContinueOnFailure(currentInputMapping); + if (continueOnFailure) { + // ⚠️ 非阻断模式:标记失败但流程继续 + log.warn("⚠️ Node failed (continue mode enabled by config): {}", e.getMessage()); + markFailure(e); + + // 保存失败状态的 NodeContext + nodeContext.setConfigs(configsMap); + nodeContext.setInputMapping(inputMappingObj); + nodeContext.setOutputs(this.output); + execution.setVariable(currentNodeId, nodeContext.toMap(objectMapper)); + } else { + // ❌ 阻断模式:终止流程(默认行为) + log.error("❌ Node failed (terminate mode, default): {}", e.getMessage()); + terminateWorkflow(e); // 抛出 BpmnError,触发流程终止 + } } } /** * 执行具体的业务逻辑(子类实现) + *

+ * 子类可以直接修改 {@code this.output} 对象来设置输出结果。 + * 输出对象已预初始化为 SUCCESS 状态,失败时调用 {@code markFailure()} 或 {@code terminateWorkflow()}。 * * @param execution Flowable执行上下文 * @param configs 节点配置 * @param inputMapping 输入映射(强类型) - * @return 节点输出结果(强类型) */ - protected abstract O executeInternal( + protected abstract void executeInternal( DelegateExecution execution, Map configs, I inputMapping @@ -226,4 +256,245 @@ public abstract class BaseNodeDelegate implements JavaDelegate { } } + // ===================== 便利方法:流程控制 ===================== + + /** + * 终止流程(致命错误)- 使用自定义错误码 + *

+ * 适用场景:需要指定特定错误码用于边界事件捕获 + * + * @param errorCode 错误码(如 WORKFLOW_EXEC_ERROR) + * @param errorMessage 错误描述 + */ + protected void terminateWorkflow(String errorCode, String errorMessage) { + log.error("🛑 Terminating workflow: errorCode={}, message={}", errorCode, errorMessage); + + // 自动记录错误日志到 Redis Stream + logError(errorMessage); + + throw new org.flowable.engine.delegate.BpmnError(errorCode, errorMessage); + } + + /** + * 终止流程(致命错误)- 使用默认错误码 + *

+ * 适用场景:构建失败、审批拒绝、系统级异常等 + * + * @param errorMessage 错误描述 + */ + protected void terminateWorkflow(String errorMessage) { + terminateWorkflow("workflow_exec_error", errorMessage); + } + + /** + * 终止流程(致命错误)- 使用异常对象 + *

+ * 适用场景:捕获异常后直接终止流程 + * + * @param exception 异常对象 + */ + protected void terminateWorkflow(Exception exception) { + String errorMessage = exception.getMessage() != null + ? exception.getMessage() + : exception.getClass().getSimpleName(); + terminateWorkflow(errorMessage); + } + + /** + * 标记当前节点为失败状态(非致命错误,流程继续) + *

+ * 直接修改 {@code this.output} 对象的状态为 FAILURE,并自动记录 WARN 日志到 Redis Stream。 + *

+ * 适用场景:通知发送失败、可选步骤失败、日志记录失败等 + * + * @param errorMessage 错误描述 + */ + protected void markFailure(String errorMessage) { + log.warn("⚠️ Node failed but workflow continues: {}", errorMessage); + + // 自动记录警告日志到 Redis Stream + logWarn(errorMessage); + + // 设置失败状态 + if (output instanceof BaseNodeOutputs) { + ((BaseNodeOutputs) output).setStatus(NodeExecutionStatusEnum.FAILURE); + ((BaseNodeOutputs) output).setMessage(errorMessage); + } + } + + /** + * 标记当前节点为失败状态(非致命错误,流程继续)- 使用异常对象 + *

+ * 适用场景:捕获异常后继续流程,但标记为失败 + * + * @param exception 异常对象 + */ + protected void markFailure(Exception exception) { + String errorMessage = exception.getMessage() != null + ? exception.getMessage() + : exception.getClass().getSimpleName(); + markFailure(errorMessage); + } + + + + /** + * 创建失败状态的输出对象(非致命错误,流程继续) + *

+ * ⚠️ 已废弃:请使用 {@link #markFailure(String)} 代替 + *

+ * 适用场景:通知发送失败、可选步骤失败、日志记录失败等 + * + * @param errorMessage 错误描述 + * @return 失败状态的输出对象(类型为当前节点的输出类型 O) + * @deprecated 使用 {@link #markFailure(String)} 代替 + */ + @Deprecated + protected O createFailureOutputs(String errorMessage) { + log.warn("⚠️ Node failed but workflow continues: {}", errorMessage); + + // 自动记录警告日志到 Redis Stream + logWarn(errorMessage); + + try { + // 通过反射创建正确类型的输出对象 + Class outputClass = getOutputsClass(); + O outputs = outputClass.getDeclaredConstructor().newInstance(); + + // 设置失败状态(假设所有输出都继承自 BaseNodeOutputs) + if (outputs instanceof BaseNodeOutputs) { + ((BaseNodeOutputs) outputs).setStatus(NodeExecutionStatusEnum.FAILURE); + ((BaseNodeOutputs) outputs).setMessage(errorMessage); + } + + return outputs; + + } catch (Exception e) { + // 降级方案:如果反射失败,返回 BaseNodeOutputs + log.error("Failed to create failure outputs, using fallback BaseNodeOutputs", e); + BaseNodeOutputs fallback = new BaseNodeOutputs(); + fallback.setStatus(NodeExecutionStatusEnum.FAILURE); + fallback.setMessage(errorMessage); + return (O) fallback; + } + } + + /** + * 创建失败状态的输出对象(非致命错误,流程继续)- 使用异常对象 + *

+ * ⚠️ 已废弃:请使用 {@link #markFailure(Exception)} 代替 + *

+ * 适用场景:捕获异常后继续流程,但标记为失败 + * + * @param exception 异常对象 + * @return 失败状态的输出对象 + * @deprecated 使用 {@link #markFailure(Exception)} 代替 + */ + @Deprecated + protected O createFailureOutputs(Exception exception) { + String errorMessage = exception.getMessage() != null + ? exception.getMessage() + : exception.getClass().getSimpleName(); + return createFailureOutputs(errorMessage); + } + + /** + * 创建成功状态的输出对象(便利方法) + *

+ * 自动设置状态为 SUCCESS,并根据节点名称生成默认消息:"{节点名称}执行成功" + * + * @param configsMap 节点配置(用于提取节点名称) + * @return 成功状态的输出对象(类型为当前节点的输出类型 O) + */ + protected O createSuccessOutputs(Map configsMap) { + try { + Class outputClass = getOutputsClass(); + O outputs = outputClass.getDeclaredConstructor().newInstance(); + + if (outputs instanceof BaseNodeOutputs) { + ((BaseNodeOutputs) outputs).setStatus(NodeExecutionStatusEnum.SUCCESS); + + // ✅ 自动生成默认成功消息 + String nodeName = WorkflowUtils.extractNodeName(configsMap); + ((BaseNodeOutputs) outputs).setMessage(nodeName + "执行成功"); + } + + return outputs; + + } catch (Exception e) { + log.error("Failed to create success outputs, using fallback BaseNodeOutputs", e); + BaseNodeOutputs fallback = new BaseNodeOutputs(); + fallback.setStatus(NodeExecutionStatusEnum.SUCCESS); + fallback.setMessage("节点执行成功"); + return (O) fallback; + } + } + + + // ===================== 日志记录辅助方法 ===================== + + /** + * 记录错误日志到 Redis Stream + * + * @param message 错误信息 + */ + protected void logError(String message) { + if (workflowNodeLogService != null && currentProcessInstanceId != null && currentNodeId != null) { + try { + workflowNodeLogService.log( + currentProcessInstanceId, + currentNodeId, + LogSource.FLOWABLE, + LogLevel.ERROR, + message + ); + } catch (Exception e) { + // 日志记录失败不影响主流程 + log.error("Failed to log to Redis Stream: {}", message, e); + } + } + } + + /** + * 记录警告日志到 Redis Stream + * + * @param message 警告信息 + */ + protected void logWarn(String message) { + if (workflowNodeLogService != null && currentProcessInstanceId != null && currentNodeId != null) { + try { + workflowNodeLogService.log( + currentProcessInstanceId, + currentNodeId, + LogSource.FLOWABLE, + LogLevel.WARN, + message + ); + } catch (Exception e) { + log.warn("Failed to log to Redis Stream: {}", message, e); + } + } + } + + /** + * 记录信息日志到 Redis Stream + * + * @param message 信息内容 + */ + protected void logInfo(String message) { + if (workflowNodeLogService != null && currentProcessInstanceId != null && currentNodeId != null) { + try { + workflowNodeLogService.log( + currentProcessInstanceId, + currentNodeId, + LogSource.FLOWABLE, + LogLevel.INFO, + message + ); + } catch (Exception e) { + log.debug("Failed to log to Redis Stream: {}", message, e); + } + } + } + } \ No newline at end of file diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/delegate/JenkinsBuildDelegate.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/delegate/JenkinsBuildDelegate.java index 4b802037..22dce399 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/delegate/JenkinsBuildDelegate.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/delegate/JenkinsBuildDelegate.java @@ -58,11 +58,12 @@ public class JenkinsBuildDelegate extends BaseNodeDelegate configs, JenkinsBuildInputMapping input) { + protected void executeInternal(DelegateExecution execution, Map configs, JenkinsBuildInputMapping input) { log.info("Jenkins Build - serverId: {}, jobName: {}", input.getServerId(), input.getJobName()); - // 1. 获取外部系统 - ExternalSystem externalSystem = externalSystemRepository.findById(input.getServerId()).orElseThrow(() -> new RuntimeException("Jenkins服务器不存在: " + input.getServerId())); + // 1. 
获取外部系统(不存在会抛出异常,由基类处理) + ExternalSystem externalSystem = externalSystemRepository.findById(input.getServerId()) + .orElseThrow(() -> new RuntimeException("Jenkins服务器不存在: " + input.getServerId())); String jobName = input.getJobName(); @@ -95,19 +96,16 @@ public class JenkinsBuildDelegate extends BaseNodeDelegate buildInfo.getBuildUrl() + "artifact/" + artifact.getRelativePath()) .collect(java.util.stream.Collectors.joining(",")); - outputs.setArtifactUrl(artifactUrls); + output.setArtifactUrl(artifactUrls); } else { log.warn("No artifacts found in build details"); - outputs.setArtifactUrl(""); + output.setArtifactUrl(""); } // 记录完成日志 workflowNodeLogService.info(execution.getProcessInstanceId(), execution.getCurrentActivityId(), LogSource.JENKINS, "Jenkins 构建任务执行完成"); - return outputs; + // ✅ 不需要 return,status 已经是 SUCCESS } private JenkinsQueueBuildInfoResponse waitForBuildToStart(ExternalSystem externalSystem, String queueId) { @@ -158,11 +156,11 @@ public class JenkinsBuildDelegate extends BaseNodeDelegate configs, NotificationInputMapping input) { - log.info("Sending notification - channel: {}, title: {}, content: {}", input.getChannelId(), input.getTitle(), input.getContent()); - - try { - // 使用通知服务发送消息 - Long channelId = input.getChannelId() != null ? input.getChannelId() : null; - - notificationSendService.send(channelId, input.getTitle(), input.getContent()); - - // 返回成功结果 - NotificationOutputs outputs = new NotificationOutputs(); - outputs.setStatus(NodeExecutionStatusEnum.SUCCESS); - outputs.setMessage("通知发送成功"); - return outputs; - - } catch (Exception e) { - log.error("Failed to send notification", e); - NotificationOutputs outputs = new NotificationOutputs(); - outputs.setStatus(NodeExecutionStatusEnum.FAILURE); - outputs.setMessage("通知发送失败: " + e.getMessage()); - return outputs; + protected void executeInternal(DelegateExecution execution, Map configs, NotificationInputMapping input) { + if (input.getChannelId() == null || StringUtils.isEmpty(input.getTitle()) || StringUtils.isEmpty(input.getContent())) { + logError(String.format("Notification delegate parameter verification failed %s %s %s", input.getChannelId(), input.getTitle(), input.getContent())); + return; } + notificationSendService.send(input.getChannelId(), input.getTitle(), input.getContent()); } } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/delegate/ShellNodeDelegate.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/delegate/ShellNodeDelegate.java index 56fc7af2..9bb629ef 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/delegate/ShellNodeDelegate.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/delegate/ShellNodeDelegate.java @@ -1,10 +1,8 @@ package com.qqchen.deploy.backend.workflow.delegate; -import com.qqchen.deploy.backend.workflow.constants.WorkFlowConstants; import com.qqchen.deploy.backend.workflow.dto.inputmapping.ShellInputMapping; import com.qqchen.deploy.backend.workflow.dto.outputs.ShellOutputs; import lombok.extern.slf4j.Slf4j; -import org.flowable.engine.delegate.BpmnError; import org.flowable.engine.delegate.DelegateExecution; import org.springframework.stereotype.Component; @@ -21,25 +19,25 @@ import java.util.Map; public class ShellNodeDelegate extends BaseNodeDelegate { @Override - protected ShellOutputs executeInternal( + protected void executeInternal( DelegateExecution execution, Map configs, ShellInputMapping input ) { if (input.getScript() == null || input.getScript().isEmpty()) { - throw new 
BpmnError(WorkFlowConstants.WORKFLOW_EXEC_ERROR, - "Script is required but not provided"); + // ❌ 致命错误:脚本为空 + terminateWorkflow("Script is required but not provided"); } log.info("Executing shell script: {}", input.getScript()); // TODO: 实现Shell脚本执行逻辑 // 目前先返回模拟结果 - ShellOutputs outputs = new ShellOutputs(); - outputs.setExitCode(0); - outputs.setStdout("Shell execution completed (mocked)"); - outputs.setStderr(""); + // ✅ 直接修改预初始化的 output 对象 + output.setExitCode(0); + output.setStdout("Shell execution completed (mocked)"); + output.setStderr(""); - return outputs; + // ✅ 不需要 return,status 已经是 SUCCESS } } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/inputmapping/ApprovalInputMapping.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/inputmapping/ApprovalInputMapping.java index c0a83b08..6c9f007b 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/inputmapping/ApprovalInputMapping.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/inputmapping/ApprovalInputMapping.java @@ -10,13 +10,13 @@ import java.util.List; /** * 审批节点输入映射 - * + * * @author qqchen * @date 2025-10-23 */ @Data @JsonIgnoreProperties(ignoreUnknown = true) -public class ApprovalInputMapping { +public class ApprovalInputMapping extends BaseNodeInputMapping { /** * 审批模式 diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/inputmapping/BaseNodeInputMapping.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/inputmapping/BaseNodeInputMapping.java new file mode 100644 index 00000000..97974b4c --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/inputmapping/BaseNodeInputMapping.java @@ -0,0 +1,42 @@ +package com.qqchen.deploy.backend.workflow.dto.inputmapping; + +import lombok.Data; + +/** + * 节点输入映射基类 + *

+ * 所有节点的 InputMapping 都应该继承此类,提供通用的配置字段 + * + * @author qqchen + * @since 2025-11-07 + */ +@Data +public class BaseNodeInputMapping { + + /** + * 失败时是否继续执行(非阻断模式) + *

+     * - true: 节点失败时标记为 FAILURE,但流程继续执行后续节点
+     * - false: 节点失败时抛出 BpmnError,终止流程(默认)
+     *
+     * ⚠️ 注意:
+     *   • 仅对可恢复的业务错误生效(如通知发送失败、可选步骤失败)
+     *   • 致命错误无效(如配置缺失、系统异常),这些错误会直接终止流程
+     *   • 默认值为 false(阻断模式),确保流程安全
+     *
+     * 使用示例:
+     * <pre>
+     * // 前端配置(通知节点可以失败后继续)
+     * {
+     *   "title": "部署通知",
+     *   "content": "...",
+     *   "channelId": 123,
+     *   "continueOnFailure": true  // 通知失败不影响流程
+     * }
+     * </pre>
+ */ + private Boolean continueOnFailure = false; + +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/inputmapping/JenkinsBuildInputMapping.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/inputmapping/JenkinsBuildInputMapping.java index 506a829b..a9c44f8b 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/inputmapping/JenkinsBuildInputMapping.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/inputmapping/JenkinsBuildInputMapping.java @@ -13,7 +13,7 @@ import jakarta.validation.constraints.NotBlank; */ @Data @JsonIgnoreProperties(ignoreUnknown = true) -public class JenkinsBuildInputMapping { +public class JenkinsBuildInputMapping extends BaseNodeInputMapping { /** * Jenkins服务器ID diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/inputmapping/NotificationInputMapping.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/inputmapping/NotificationInputMapping.java index 2d31bd30..7b9c2e3d 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/inputmapping/NotificationInputMapping.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/inputmapping/NotificationInputMapping.java @@ -12,7 +12,7 @@ import jakarta.validation.constraints.NotNull; */ @Data @JsonIgnoreProperties(ignoreUnknown = true) -public class NotificationInputMapping { +public class NotificationInputMapping extends BaseNodeInputMapping { /** * 通知渠道ID diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/inputmapping/ShellInputMapping.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/inputmapping/ShellInputMapping.java index 5ff74e96..e975a408 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/inputmapping/ShellInputMapping.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/inputmapping/ShellInputMapping.java @@ -13,18 +13,18 @@ import java.util.Map; */ @Data @JsonIgnoreProperties(ignoreUnknown = true) -public class ShellInputMapping { - +public class ShellInputMapping extends BaseNodeInputMapping { + /** * Shell脚本内容 */ private String script; - + /** * 工作目录 */ private String workDir; - + /** * 环境变量 */ diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/ApprovalEndExecutionListener.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/ApprovalEndExecutionListener.java index 0b000e97..4cf5dee2 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/ApprovalEndExecutionListener.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/ApprovalEndExecutionListener.java @@ -10,6 +10,7 @@ import com.qqchen.deploy.backend.workflow.model.NodeContext; import com.qqchen.deploy.backend.workflow.repository.IWorkflowDefinitionRepository; import com.qqchen.deploy.backend.workflow.repository.IWorkflowInstanceRepository; import com.qqchen.deploy.backend.workflow.repository.IWorkflowCategoryRepository; +import com.qqchen.deploy.backend.workflow.utils.WorkflowUtils; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.flowable.engine.HistoryService; @@ -36,7 +37,7 @@ import java.util.Map; */ @Slf4j @Component("approvalEndExecutionListener") -public class ApprovalEndExecutionListener implements ExecutionListener { +public class ApprovalEndExecutionListener extends BaseExecutionListener { @Resource private ObjectMapper objectMapper; @@ -59,12 +60,13 @@ public class ApprovalEndExecutionListener 
implements ExecutionListener { @Override public void notify(DelegateExecution execution) { String nodeId = execution.getCurrentActivityId(); + NodeContext nodeContext = null; + try { log.info("ApprovalExecutionListener: Building outputs for node: {}", nodeId); // 1. 读取 NodeContext(统一使用 NodeContext,与 BaseNodeDelegate 保持一致) - NodeContext nodeContext = - readNodeContext(execution, nodeId); + nodeContext = WorkflowUtils.readNodeContext(execution, nodeId, ApprovalInputMapping.class, ApprovalOutputs.class, objectMapper); if (nodeContext == null) { return; } @@ -81,7 +83,7 @@ public class ApprovalEndExecutionListener implements ExecutionListener { // 4. 更新并保存 NodeContext(与 BaseNodeDelegate 保持一致) nodeContext.setOutputs(outputs); - saveNodeContext(execution, nodeId, nodeContext); + WorkflowUtils.saveNodeContext(execution, nodeId, nodeContext, objectMapper); log.info("Stored approval outputs for node: {}, result: {}", nodeId, outputs.getApprovalResult()); @@ -91,47 +93,23 @@ public class ApprovalEndExecutionListener implements ExecutionListener { } catch (Exception e) { log.error("Failed to build approval outputs for node: {}", nodeId, e); - // 异常处理:统一使用 NodeContext 设置失败状态(与 BaseNodeDelegate 保持一致) - handleFailure(execution, nodeId, e); + // ❌ 根据 continueOnFailure 配置决定行为 + boolean continueOnFailure = WorkflowUtils.getContinueOnFailure( + nodeContext != null ? nodeContext.getInputMapping() : null + ); - throw new RuntimeException("Failed to build approval outputs: " + nodeId, e); - } - } - - /** - * 读取 NodeContext(与 BaseNodeDelegate 的模式保持一致) - */ - private NodeContext readNodeContext( - DelegateExecution execution, String nodeId) { - try { - Object nodeDataObj = execution.getVariable(nodeId); - if (!(nodeDataObj instanceof Map)) { - log.warn("NodeContext not found for node: {}, skipping ApprovalExecutionListener", nodeId); - return null; + if (continueOnFailure) { + // ⚠️ 非阻断模式:记录失败但流程继续 + log.warn("⚠️ Approval listener failed (continue mode enabled by config): {}", e.getMessage()); + handleFailure(execution, nodeId, e); + } else { + // ❌ 阻断模式:终止流程(默认行为) + log.error("❌ Approval listener failed (terminate mode, default): {}", e.getMessage()); + terminateWorkflow(e); } - - @SuppressWarnings("unchecked") - Map nodeDataMap = (Map) nodeDataObj; - return NodeContext.fromMap(nodeDataMap, ApprovalInputMapping.class, ApprovalOutputs.class, objectMapper); - } catch (Exception e) { - log.error("Failed to read NodeContext for node: {}", nodeId, e); - return null; } } - /** - * 保存 NodeContext(与 BaseNodeDelegate 的模式保持一致) - */ - private void saveNodeContext(DelegateExecution execution, String nodeId, - NodeContext nodeContext) { - try { - execution.setVariable(nodeId, nodeContext.toMap(objectMapper)); - log.debug("Saved NodeContext for node: {}", nodeId); - } catch (Exception e) { - log.error("Failed to save NodeContext for node: {}", nodeId, e); - throw new RuntimeException("Failed to save NodeContext: " + nodeId, e); - } - } /** * 自动装配 ApprovalOutputs(丰富 outputs 的上下文信息) @@ -155,7 +133,8 @@ public class ApprovalEndExecutionListener implements ExecutionListener { */ private void handleFailure(DelegateExecution execution, String nodeId, Exception e) { try { - NodeContext nodeContext = readNodeContext(execution, nodeId); + NodeContext nodeContext = + WorkflowUtils.readNodeContext(execution, nodeId, ApprovalInputMapping.class, ApprovalOutputs.class, objectMapper); if (nodeContext == null) { // 如果无法读取 NodeContext,创建新的 nodeContext = new NodeContext<>(); @@ -170,7 +149,7 @@ public class ApprovalEndExecutionListener 
implements ExecutionListener { failureOutputs.setMessage("审批节点执行异常: " + e.getMessage()); nodeContext.setOutputs(failureOutputs); - saveNodeContext(execution, nodeId, nodeContext); + WorkflowUtils.saveNodeContext(execution, nodeId, nodeContext, objectMapper); } catch (Exception ex) { log.error("Failed to set error status for node: {}", nodeId, ex); } @@ -357,5 +336,6 @@ public class ApprovalEndExecutionListener implements ExecutionListener { if (date == null) return null; return date.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime(); } + } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/BaseExecutionListener.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/BaseExecutionListener.java new file mode 100644 index 00000000..e81e8154 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/BaseExecutionListener.java @@ -0,0 +1,56 @@ +package com.qqchen.deploy.backend.workflow.listener; + +import lombok.extern.slf4j.Slf4j; +import org.flowable.engine.delegate.ExecutionListener; + +/** + * 执行监听器基类 + * 提供统一的流程控制方法 + * + * @author qqchen + * @since 2025-11-07 + */ +@Slf4j +public abstract class BaseExecutionListener implements ExecutionListener { + + // ===================== 便利方法:流程控制 ===================== + + /** + * 终止流程(致命错误)- 使用自定义错误码 + *

+ * 适用场景:需要指定特定错误码用于边界事件捕获 + * + * @param errorCode 错误码(如 workflow_exec_error) + * @param errorMessage 错误描述 + */ + protected void terminateWorkflow(String errorCode, String errorMessage) { + log.error("🛑 Listener terminating workflow: errorCode={}, message={}", errorCode, errorMessage); + throw new org.flowable.engine.delegate.BpmnError(errorCode, errorMessage); + } + + /** + * 终止流程(致命错误)- 使用默认错误码 + *

+ * 适用场景:监听器中发生致命错误,需要立即终止流程 + * + * @param errorMessage 错误描述 + */ + protected void terminateWorkflow(String errorMessage) { + terminateWorkflow("workflow_exec_error", errorMessage); + } + + /** + * 终止流程(致命错误)- 使用异常对象 + *

+ * 适用场景:捕获异常后直接终止流程 + * + * @param exception 异常对象 + */ + protected void terminateWorkflow(Exception exception) { + String errorMessage = exception.getMessage() != null + ? exception.getMessage() + : exception.getClass().getSimpleName(); + terminateWorkflow(errorMessage); + } +} + diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/BaseTaskListener.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/BaseTaskListener.java index de4f4808..f8721d7a 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/BaseTaskListener.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/BaseTaskListener.java @@ -65,9 +65,16 @@ public abstract class BaseTaskListener implements TaskListener { log.info("Task configuration completed for node: {}", currentNodeId); + } catch (org.flowable.engine.delegate.BpmnError e) { + // ✅ BpmnError 应该向上传播,触发边界事件终止流程 + log.error("BpmnError occurred in task listener: nodeId={}, error={}", currentNodeId, e.getMessage()); + throw e; + } catch (Exception e) { log.error("Failed to configure task for node: {}", currentNodeId, e); - throw new RuntimeException("Failed to configure task: " + currentNodeId, e); + // ⚠️ TaskListener 异常:根据业务需求决定是否终止流程 + // 默认:记录日志但不终止流程 + log.warn("Task listener failed, but task will still be created"); } } @@ -139,5 +146,45 @@ public abstract class BaseTaskListener implements TaskListener { return new HashMap<>(); } } + + // ===================== 便利方法:流程控制 ===================== + + /** + * 终止流程(致命错误)- 使用自定义错误码 + *

+ * 适用场景:任务创建阶段发生致命错误,需要立即终止流程 + * + * @param errorCode 错误码(如 workflow_exec_error) + * @param errorMessage 错误描述 + */ + protected void terminateWorkflow(String errorCode, String errorMessage) { + log.error("🛑 TaskListener terminating workflow: errorCode={}, message={}", errorCode, errorMessage); + throw new org.flowable.engine.delegate.BpmnError(errorCode, errorMessage); + } + + /** + * 终止流程(致命错误)- 使用默认错误码 + *

+ * 适用场景:任务创建阶段发生致命错误 + * + * @param errorMessage 错误描述 + */ + protected void terminateWorkflow(String errorMessage) { + terminateWorkflow("workflow_exec_error", errorMessage); + } + + /** + * 终止流程(致命错误)- 使用异常对象 + *

+ * 适用场景:捕获异常后直接终止流程 + * + * @param exception 异常对象 + */ + protected void terminateWorkflow(Exception exception) { + String errorMessage = exception.getMessage() != null + ? exception.getMessage() + : exception.getClass().getSimpleName(); + terminateWorkflow(errorMessage); + } } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/IWorkflowNodeLogService.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/IWorkflowNodeLogService.java index 502fc1be..d1d8c3f8 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/IWorkflowNodeLogService.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/IWorkflowNodeLogService.java @@ -52,11 +52,6 @@ public interface IWorkflowNodeLogService { */ Page getNodeLogs(String processInstanceId, String nodeId, Pageable pageable); - /** - * 查询流程实例的所有日志 - */ - List getProcessInstanceLogs(String processInstanceId); - /** * 删除节点日志 */ diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowNodeLogServiceImpl.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowNodeLogServiceImpl.java index e3b8ce04..4dab30a8 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowNodeLogServiceImpl.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowNodeLogServiceImpl.java @@ -1,24 +1,24 @@ package com.qqchen.deploy.backend.workflow.service.impl; +import com.qqchen.deploy.backend.framework.utils.RedisUtil; import com.qqchen.deploy.backend.workflow.entity.WorkflowNodeLog; import com.qqchen.deploy.backend.workflow.enums.LogLevel; import com.qqchen.deploy.backend.workflow.enums.LogSource; -import com.qqchen.deploy.backend.workflow.repository.IWorkflowNodeLogRepository; import com.qqchen.deploy.backend.workflow.service.IWorkflowNodeLogService; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.data.domain.Page; +import org.springframework.data.domain.PageImpl; import org.springframework.data.domain.Pageable; -import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; -import java.util.ArrayList; -import java.util.List; +import java.util.*; +import java.util.stream.Collectors; /** - * 工作流节点日志服务实现 - * 使用 Flowable 的 processInstanceId + nodeId 作为日志关联键 + * 工作流节点日志服务实现 - 基于 Redis Stream + * 日志保存在 Redis Stream 中,保留 7 天后自动删除 * * @author qqchen * @since 2025-11-03 @@ -28,71 +28,80 @@ import java.util.List; public class WorkflowNodeLogServiceImpl implements IWorkflowNodeLogService { @Resource - private IWorkflowNodeLogRepository logRepository; - - @Resource - private RedisTemplate redisTemplate; + private RedisUtil redisUtil; /** - * 生成日志序列号(使用 Redis INCR 保证全局递增) + * 日志流保留时间:7天 */ - private Long generateSequenceId(String processInstanceId, String nodeId) { - String key = "workflow:node:log:seq:" + processInstanceId + ":" + nodeId; - return redisTemplate.opsForValue().increment(key, 1); + private static final int LOG_STREAM_TTL_DAYS = 7; + + /** + * 每个流最多保留的日志条数(防止单个流过大) + */ + private static final int LOG_STREAM_MAX_LEN = 50000; + + /** + * 生成 Redis Stream Key + * 格式: workflow:log:stream:{processInstanceId}:{nodeId} + */ + private String buildStreamKey(String processInstanceId, String nodeId) { + return String.format("workflow:log:stream:%s:%s", 
processInstanceId, nodeId); } @Override - @Transactional public void log(String processInstanceId, String nodeId, LogSource source, LogLevel level, String message) { try { - Long sequenceId = generateSequenceId(processInstanceId, nodeId); + String streamKey = buildStreamKey(processInstanceId, nodeId); - WorkflowNodeLog log = new WorkflowNodeLog(); - log.setProcessInstanceId(processInstanceId); - log.setNodeId(nodeId); - log.setSequenceId(sequenceId); - log.setTimestamp(System.currentTimeMillis()); - log.setSource(source); - log.setLevel(level); - log.setMessage(message); + // 构建日志条目 + Map logEntry = new HashMap<>(); + logEntry.put("timestamp", String.valueOf(System.currentTimeMillis())); + logEntry.put("level", level.name()); + logEntry.put("source", source.name()); + logEntry.put("message", message); - logRepository.save(log); + // 写入 Redis Stream + redisUtil.streamAdd(streamKey, logEntry); + + // 限制流长度(保留最新的日志) + redisUtil.streamTrim(streamKey, LOG_STREAM_MAX_LEN); + + // 设置过期时间:7天(转换为秒) + redisUtil.expire(streamKey, LOG_STREAM_TTL_DAYS * 24 * 60 * 60); } catch (Exception e) { - log.error("Failed to save workflow node log: processInstanceId={}, nodeId={}, source={}, level={}", - processInstanceId, nodeId, source, level, e); + // ❌ Redis 失败不影响业务流程 + log.error("Failed to save workflow log to Redis (non-blocking): processInstanceId={}, nodeId={}", + processInstanceId, nodeId, e); } } @Override - @Transactional public void batchLog(String processInstanceId, String nodeId, LogSource source, LogLevel level, List messages) { if (messages == null || messages.isEmpty()) { return; } try { - List logs = new ArrayList<>(messages.size()); + String streamKey = buildStreamKey(processInstanceId, nodeId); + // 批量添加到 Stream for (String message : messages) { - Long sequenceId = generateSequenceId(processInstanceId, nodeId); + Map logEntry = new HashMap<>(); + logEntry.put("timestamp", String.valueOf(System.currentTimeMillis())); + logEntry.put("level", level.name()); + logEntry.put("source", source.name()); + logEntry.put("message", message); - WorkflowNodeLog log = new WorkflowNodeLog(); - log.setProcessInstanceId(processInstanceId); - log.setNodeId(nodeId); - log.setSequenceId(sequenceId); - log.setTimestamp(System.currentTimeMillis()); - log.setSource(source); - log.setLevel(level); - log.setMessage(message); - - logs.add(log); + redisUtil.streamAdd(streamKey, logEntry); } - logRepository.saveAll(logs); + // 限制流长度并设置过期(转换为秒) + redisUtil.streamTrim(streamKey, LOG_STREAM_MAX_LEN); + redisUtil.expire(streamKey, LOG_STREAM_TTL_DAYS * 24 * 60 * 60); } catch (Exception e) { - log.error("Failed to batch save workflow node logs: processInstanceId={}, nodeId={}, count={}", + log.error("Failed to batch save workflow logs to Redis (non-blocking): processInstanceId={}, nodeId={}, count={}", processInstanceId, nodeId, messages.size(), e); } } @@ -114,23 +123,82 @@ public class WorkflowNodeLogServiceImpl implements IWorkflowNodeLogService { @Override public List getNodeLogs(String processInstanceId, String nodeId) { - return logRepository.findByProcessInstanceIdAndNodeIdOrderBySequenceIdAsc(processInstanceId, nodeId); + try { + String streamKey = buildStreamKey(processInstanceId, nodeId); + + // 检查流是否存在 + if (!redisUtil.hasKey(streamKey)) { + log.warn("日志流不存在,可能已过期: processInstanceId={}, nodeId={}", processInstanceId, nodeId); + return Collections.emptyList(); + } + + // 从 Stream 读取所有日志 (从头到尾) + List> records = redisUtil.streamRangeAll(streamKey); + + if (records == null || records.isEmpty()) { + return 
Collections.emptyList(); + } + + // 转换为实体 + return records.stream() + .map(record -> convertToWorkflowNodeLog(processInstanceId, nodeId, record)) + .collect(Collectors.toList()); + + } catch (Exception e) { + log.error("Failed to get workflow logs from Redis: processInstanceId={}, nodeId={}", + processInstanceId, nodeId, e); + return Collections.emptyList(); + } } @Override public Page getNodeLogs(String processInstanceId, String nodeId, Pageable pageable) { - return logRepository.findByProcessInstanceIdAndNodeIdOrderBySequenceIdAsc(processInstanceId, nodeId, pageable); + List allLogs = getNodeLogs(processInstanceId, nodeId); + + int start = (int) pageable.getOffset(); + int end = Math.min(start + pageable.getPageSize(), allLogs.size()); + + if (start >= allLogs.size()) { + return new PageImpl<>(Collections.emptyList(), pageable, allLogs.size()); + } + + List pageContent = allLogs.subList(start, end); + return new PageImpl<>(pageContent, pageable, allLogs.size()); } @Override - public List getProcessInstanceLogs(String processInstanceId) { - return logRepository.findByProcessInstanceIdOrderBySequenceIdAsc(processInstanceId); - } - - @Override - @Transactional public void deleteNodeLogs(String processInstanceId, String nodeId) { - logRepository.deleteByProcessInstanceIdAndNodeId(processInstanceId, nodeId); + try { + String streamKey = buildStreamKey(processInstanceId, nodeId); + redisUtil.del(streamKey); + log.info("删除节点日志流: processInstanceId={}, nodeId={}", processInstanceId, nodeId); + } catch (Exception e) { + log.error("Failed to delete workflow logs from Redis: processInstanceId={}, nodeId={}", + processInstanceId, nodeId, e); + } + } + + /** + * 将 Redis Stream Record 转换为 WorkflowNodeLog 实体 + */ + private WorkflowNodeLog convertToWorkflowNodeLog(String processInstanceId, String nodeId, + MapRecord record) { + WorkflowNodeLog log = new WorkflowNodeLog(); + log.setProcessInstanceId(processInstanceId); + log.setNodeId(nodeId); + + Map value = record.getValue(); + log.setTimestamp(Long.parseLong(value.get("timestamp").toString())); + log.setLevel(LogLevel.valueOf(value.get("level").toString())); + log.setSource(LogSource.valueOf(value.get("source").toString())); + log.setMessage(value.get("message").toString()); + + // Redis Stream ID 的第一部分作为序列号 (毫秒时间戳) + String streamId = record.getId().getValue(); + log.setSequenceId(Long.parseLong(streamId.split("-")[0])); + + return log; } } + diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/utils/WorkflowUtils.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/utils/WorkflowUtils.java new file mode 100644 index 00000000..fd4a08f3 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/utils/WorkflowUtils.java @@ -0,0 +1,191 @@ +package com.qqchen.deploy.backend.workflow.utils; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.qqchen.deploy.backend.workflow.dto.inputmapping.BaseNodeInputMapping; +import com.qqchen.deploy.backend.workflow.model.NodeContext; +import lombok.extern.slf4j.Slf4j; +import org.flowable.engine.delegate.DelegateExecution; + +import java.util.Map; + +/** + * 工作流工具类 + *

+ * 提供工作流相关的通用工具方法 + * + * @author qqchen + * @since 2025-11-07 + */ +@Slf4j +public class WorkflowUtils { + + private WorkflowUtils() { + // 工具类,禁止实例化 + } + + /** + * 从 InputMapping 获取 continueOnFailure 配置 + *

+     * 用于判断节点失败后是否继续执行流程:
+     *   • true: 节点失败时标记为 FAILURE,但流程继续执行
+     *   • false: 节点失败时抛出 BpmnError,终止流程(默认)
+ * + * @param inputMapping 输入映射对象(必须继承自 BaseNodeInputMapping) + * @return true: 失败后继续执行, false: 失败后终止流程(默认) + */ + public static boolean getContinueOnFailure(Object inputMapping) { + if (inputMapping == null) { + return false; // 默认阻断模式 + } + + try { + // 所有 InputMapping 都继承自 BaseNodeInputMapping + if (inputMapping instanceof BaseNodeInputMapping) { + Boolean continueOnFailure = ((BaseNodeInputMapping) inputMapping).getContinueOnFailure(); + return Boolean.TRUE.equals(continueOnFailure); + } + } catch (Exception e) { + log.warn("Failed to get continueOnFailure config, using default (false)", e); + } + + return false; // 默认阻断模式 + } + + /** + * 从 InputMapping 获取 continueOnFailure 配置(带默认值) + * + * @param inputMapping 输入映射对象 + * @param defaultValue 默认值 + * @return continueOnFailure 配置值 + */ + public static boolean getContinueOnFailure(Object inputMapping, boolean defaultValue) { + if (inputMapping == null) { + return defaultValue; + } + + try { + if (inputMapping instanceof BaseNodeInputMapping) { + Boolean continueOnFailure = ((BaseNodeInputMapping) inputMapping).getContinueOnFailure(); + if (continueOnFailure != null) { + return continueOnFailure; + } + } + } catch (Exception e) { + log.warn("Failed to get continueOnFailure config, using default ({})", defaultValue, e); + } + + return defaultValue; + } + + // ===================== 节点配置相关 ===================== + + /** + * 从配置中提取节点名称 + *

+ * 优先级:nodeName → nodeCode → "节点"(默认) + * + * @param configsMap 节点配置 + * @return 节点名称,默认返回"节点" + */ + public static String extractNodeName(Map configsMap) { + if (configsMap == null) { + return "节点"; + } + + // 1. 优先使用 nodeName + Object nodeName = configsMap.get("nodeName"); + if (nodeName != null && !nodeName.toString().trim().isEmpty()) { + return nodeName.toString().trim(); + } + + // 2. 降级:使用 nodeCode + Object nodeCode = configsMap.get("nodeCode"); + if (nodeCode != null && !nodeCode.toString().trim().isEmpty()) { + return nodeCode.toString().trim(); + } + + // 3. 默认值 + return "节点"; + } + + // ===================== NodeContext 操作相关 ===================== + + /** + * 从 Flowable 执行上下文读取 NodeContext + *

+ * 统一的 NodeContext 读取方法,供 Delegate 和 Listener 共享 + * + * @param execution Flowable 执行上下文 + * @param nodeId 节点ID + * @param inputClass 输入映射类型 + * @param outputClass 输出映射类型 + * @param objectMapper JSON 序列化工具 + * @param 输入映射泛型 + * @param 输出映射泛型 + * @return NodeContext 对象,解析失败返回 null + */ + public static NodeContext readNodeContext( + DelegateExecution execution, + String nodeId, + Class inputClass, + Class outputClass, + ObjectMapper objectMapper) { + + if (execution == null || nodeId == null || nodeId.trim().isEmpty()) { + log.warn("Invalid parameters for readNodeContext: execution={}, nodeId={}", execution, nodeId); + return null; + } + + try { + Object nodeDataObj = execution.getVariable(nodeId); + if (!(nodeDataObj instanceof Map)) { + log.debug("NodeContext not found or invalid type for node: {}", nodeId); + return null; + } + + @SuppressWarnings("unchecked") + Map nodeDataMap = (Map) nodeDataObj; + return NodeContext.fromMap(nodeDataMap, inputClass, outputClass, objectMapper); + + } catch (Exception e) { + log.error("Failed to read NodeContext for node: {}", nodeId, e); + return null; + } + } + + /** + * 保存 NodeContext 到 Flowable 执行上下文 + *

+ * 统一的 NodeContext 保存方法,供 Delegate 和 Listener 共享 + * + * @param execution Flowable 执行上下文 + * @param nodeId 节点ID + * @param nodeContext NodeContext 对象 + * @param objectMapper JSON 序列化工具 + * @param 输入映射泛型 + * @param 输出映射泛型 + * @throws RuntimeException 保存失败时抛出 + */ + public static void saveNodeContext( + DelegateExecution execution, + String nodeId, + NodeContext nodeContext, + ObjectMapper objectMapper) { + + if (execution == null || nodeId == null || nodeId.trim().isEmpty() || nodeContext == null) { + throw new IllegalArgumentException("Invalid parameters for saveNodeContext"); + } + + try { + execution.setVariable(nodeId, nodeContext.toMap(objectMapper)); + log.debug("Saved NodeContext for node: {}", nodeId); + + } catch (Exception e) { + log.error("Failed to save NodeContext for node: {}", nodeId, e); + throw new RuntimeException("Failed to save NodeContext: " + nodeId, e); + } + } +} + diff --git a/frontend/src/pages/Dashboard/components/DeployNodeLogDialog.tsx b/frontend/src/pages/Dashboard/components/DeployNodeLogDialog.tsx new file mode 100644 index 00000000..f04a363b --- /dev/null +++ b/frontend/src/pages/Dashboard/components/DeployNodeLogDialog.tsx @@ -0,0 +1,206 @@ +import React, { useEffect, useState, useRef } from 'react'; +import { + Dialog, + DialogContent, + DialogHeader, + DialogTitle, + DialogBody, + DialogFooter, +} from '@/components/ui/dialog'; +import { Button } from '@/components/ui/button'; +import { ScrollArea } from '@/components/ui/scroll-area'; +import { Loader2, AlertCircle, Clock, FileText, RefreshCw } from 'lucide-react'; +import { cn } from '@/lib/utils'; +import { getDeployNodeLogs } from '../service'; +import type { DeployNodeLogDTO, LogLevel } from '../types'; +import dayjs from 'dayjs'; + +interface DeployNodeLogDialogProps { + open: boolean; + onOpenChange: (open: boolean) => void; + processInstanceId: string; + nodeId: string; + nodeName?: string; +} + +const getLevelClass = (level: LogLevel): string => { + const levelMap: Record = { + INFO: 'text-blue-600', + WARN: 'text-yellow-600', + ERROR: 'text-red-600', + }; + return levelMap[level] || 'text-gray-600'; +}; + +const DeployNodeLogDialog: React.FC = ({ + open, + onOpenChange, + processInstanceId, + nodeId, + nodeName, +}) => { + const [loading, setLoading] = useState(false); + const [logData, setLogData] = useState(null); + const scrollAreaRef = useRef(null); + const intervalRef = useRef(null); + + const fetchLogs = async () => { + if (!processInstanceId || !nodeId) return; + + setLoading(true); + try { + const response = await getDeployNodeLogs(processInstanceId, nodeId); + setLogData(response); + + // 如果日志未过期且有新日志,滚动到底部 + if (response && !response.expired && response.logs.length > 0) { + setTimeout(() => { + if (scrollAreaRef.current) { + const viewport = scrollAreaRef.current.querySelector('[data-radix-scroll-area-viewport]'); + if (viewport) { + viewport.scrollTop = viewport.scrollHeight; + } + } + }, 100); + } + + // 如果日志已过期,停止轮询 + if (response?.expired) { + if (intervalRef.current) { + clearInterval(intervalRef.current); + intervalRef.current = null; + } + } + } catch (error) { + console.error('获取节点日志失败:', error); + } finally { + setLoading(false); + } + }; + + useEffect(() => { + if (open && processInstanceId && nodeId) { + setLogData(null); + fetchLogs(); + + // 不再自动轮询,只在打开时加载一次 + } else { + // 关闭时清除定时器 + if (intervalRef.current) { + clearInterval(intervalRef.current); + intervalRef.current = null; + } + } + + return () => { + // 组件卸载时清除定时器 + if (intervalRef.current) { + 
clearInterval(intervalRef.current); + intervalRef.current = null; + } + }; + }, [open, processInstanceId, nodeId]); + + return ( +

+    <Dialog open={open} onOpenChange={onOpenChange}>
+      <DialogContent className="max-w-4xl">
+        <DialogHeader>
+          <DialogTitle className="flex items-center gap-2">
+            <FileText className="h-4 w-4" />
+            节点日志 - {nodeName || nodeId}
+            {logData?.expired && (
+              <span className="text-sm text-muted-foreground">(日志已过期或不存在)</span>
+            )}
+          </DialogTitle>
+        </DialogHeader>
+
+        <DialogBody>
+          {/* 工具栏 */}
+          <div className="flex items-center justify-between mb-2">
+            <div>
+              {logData && !logData.expired && (
+                <span className="text-sm text-muted-foreground">共 {logData.logs.length} 条日志</span>
+              )}
+            </div>
+            <Button variant="outline" size="sm" onClick={fetchLogs} disabled={loading}>
+              <RefreshCw className={cn('h-4 w-4', loading && 'animate-spin')} />
+            </Button>
+          </div>
+
+          {/* 日志内容区域 */}
+          {loading && !logData ? (
+            <div className="flex items-center justify-center py-12">
+              <div className="flex items-center gap-2 text-muted-foreground">
+                <Loader2 className="h-4 w-4 animate-spin" />
+                加载日志中...
+              </div>
+            </div>
+          ) : logData?.expired ? (
+            <div className="flex flex-col items-center justify-center py-12 text-muted-foreground">
+              <Clock className="h-8 w-8 mb-2" />
+              <div>日志已过期或不存在</div>
+              <div className="text-sm">日志通常保留 7 天,请检查部署时间</div>
+            </div>
+          ) : (
+            <ScrollArea ref={scrollAreaRef} className="h-[480px] rounded border bg-muted/30">
+              <div className="p-2 font-mono text-xs">
+                {logData?.logs && logData.logs.length > 0 ? (
+                  logData.logs.map((log, index) => (
+                    <div key={index} className="flex gap-2 whitespace-nowrap">
+                      {/* 行号 - 根据总行数动态调整宽度 */}
+                      <span
+                        className="text-right text-muted-foreground"
+                        style={{ minWidth: `${String(logData.logs.length).length + 1}ch` }}
+                      >
+                        {index + 1}
+                      </span>
+                      {/* 时间 - 18个字符宽度 */}
+                      <span className="w-[18ch] text-muted-foreground">
+                        {dayjs(log.timestamp).format('MM-DD HH:mm:ss.SSS')}
+                      </span>
+                      {/* 级别 - 5个字符宽度 */}
+                      <span className={cn('w-[5ch]', getLevelClass(log.level))}>
+                        {log.level}
+                      </span>
+                      {/* 日志内容 - 不换行,支持水平滚动 */}
+                      <span className="whitespace-pre">
+                        {log.message}
+                      </span>
+                    </div>
+                  ))
+                ) : (
+                  <div className="flex flex-col items-center justify-center py-12 text-muted-foreground">
+                    <AlertCircle className="h-8 w-8 mb-2" />
+                    <div>暂无日志</div>
+                    <div className="text-sm">任务可能尚未开始或未产生日志</div>
+                  </div>
+                )}
+              </div>
+            </ScrollArea>
+          )}
+        </DialogBody>
+
+        <DialogFooter>
+          <Button variant="outline" onClick={() => onOpenChange(false)}>
+            关闭
+          </Button>
+        </DialogFooter>
+      </DialogContent>
+    </Dialog>
+  );
+};
+
+export default DeployNodeLogDialog;
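Note on the two frontend modules this dialog imports: `getDeployNodeLogs` from `../service` and the `DeployNodeLogDTO` / `LogLevel` types from `../types` are not part of this patch. Below is a minimal sketch of what they might look like, assuming the controller's `@GetMapping("/logs")` is exposed under an `/api/deploy` prefix and that the backend `Response` wrapper returns the DTO in a `data` field — both assumptions, since the class-level request mapping and the `Response` envelope are not shown in this diff. The `DeployNodeLogEntry` name is likewise hypothetical; its fields simply mirror `DeployNodeLogDTO.LogEntry` on the backend.

    // types.ts (sketch; field names mirror the backend DeployNodeLogDTO)
    export type LogLevel = 'INFO' | 'WARN' | 'ERROR' | 'DEBUG';
    export type LogSource = 'JENKINS' | 'FLOWABLE' | 'SHELL' | 'NOTIFICATION';

    export interface DeployNodeLogEntry {
      sequenceId: number;
      timestamp: number; // epoch milliseconds
      level: LogLevel;
      source: LogSource;
      message: string;
    }

    export interface DeployNodeLogDTO {
      processInstanceId: string;
      nodeId: string;
      nodeName?: string;
      logs: DeployNodeLogEntry[];
      expired: boolean;
      message: string;
    }

    // service.ts (sketch; the '/api/deploy' prefix and the `data` envelope are assumptions)
    export async function getDeployNodeLogs(
      processInstanceId: string,
      nodeId: string,
    ): Promise<DeployNodeLogDTO> {
      const params = new URLSearchParams({ processInstanceId, nodeId });
      const res = await fetch(`/api/deploy/logs?${params.toString()}`, { credentials: 'include' });
      if (!res.ok) {
        throw new Error(`Failed to fetch node logs: ${res.status}`);
      }
      const body = await res.json();
      // Assumes the backend Response wrapper exposes the DTO under `data`.
      return body.data as DeployNodeLogDTO;
    }

Because the endpoint requires an authenticated session (`@PreAuthorize("isAuthenticated()")`), the request is sent with credentials; in the real project this call would more likely go through whatever shared request client the frontend already uses.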