打印了JENKINS节点日志

This commit is contained in:
dengqichen 2025-11-07 16:02:18 +08:00
parent 344ba25284
commit 28cae45e51
21 changed files with 1292 additions and 229 deletions

View File

@ -90,5 +90,18 @@ public class DeployApiController {
deployService.completeApproval(request);
return Response.success();
}
/**
* 获取节点日志
*/
@Operation(summary = "获取节点日志", description = "获取指定节点的执行日志日志保留7天超过7天将被清除")
@GetMapping("/logs")
@PreAuthorize("isAuthenticated()")
public Response<DeployNodeLogDTO> getNodeLogs(
@Parameter(description = "流程实例ID", required = true) @RequestParam String processInstanceId,
@Parameter(description = "节点ID", required = true) @RequestParam String nodeId
) {
return Response.success(deployService.getNodeLogs(processInstanceId, nodeId));
}
}

View File

@ -0,0 +1,59 @@
package com.qqchen.deploy.backend.deploy.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.util.List;
/**
* 部署节点日志响应DTO
*
* @author qqchen
* @since 2025-11-07
*/
@Data
@Schema(description = "部署节点日志响应")
public class DeployNodeLogDTO {
@Schema(description = "流程实例ID")
private String processInstanceId;
@Schema(description = "节点ID")
private String nodeId;
@Schema(description = "节点名称")
private String nodeName;
@Schema(description = "日志列表")
private List<LogEntry> logs;
@Schema(description = "日志是否已过期超过7天被清除")
private Boolean expired;
@Schema(description = "提示信息")
private String message;
/**
* 日志条目
*/
@Data
@Schema(description = "日志条目")
public static class LogEntry {
@Schema(description = "序列号")
private Long sequenceId;
@Schema(description = "时间戳(毫秒)")
private Long timestamp;
@Schema(description = "日志级别INFO/WARN/ERROR/DEBUG")
private String level;
@Schema(description = "日志来源JENKINS/FLOWABLE/SHELL/NOTIFICATION")
private String source;
@Schema(description = "日志内容")
private String message;
}
}

View File

@ -44,5 +44,15 @@ public interface IDeployService {
* @param request 审批完成请求
*/
void completeApproval(DeployApprovalCompleteRequest request);
/**
* 获取节点日志
* <p>获取指定节点的执行日志日志保留7天超过7天将被清除
*
* @param processInstanceId 流程实例ID
* @param nodeId 节点ID
* @return 节点日志响应
*/
DeployNodeLogDTO getNodeLogs(String processInstanceId, String nodeId);
}

View File

@ -18,9 +18,11 @@ import com.qqchen.deploy.backend.workflow.dto.WorkflowInstanceStartRequest;
import com.qqchen.deploy.backend.workflow.dto.inputmapping.ApprovalInputMapping;
import com.qqchen.deploy.backend.workflow.dto.outputs.ApprovalOutputs;
import com.qqchen.deploy.backend.workflow.entity.WorkflowDefinition;
import com.qqchen.deploy.backend.workflow.entity.WorkflowNodeLog;
import com.qqchen.deploy.backend.workflow.model.NodeContext;
import com.qqchen.deploy.backend.workflow.repository.IWorkflowDefinitionRepository;
import com.qqchen.deploy.backend.workflow.service.IWorkflowInstanceService;
import com.qqchen.deploy.backend.workflow.service.IWorkflowNodeLogService;
import com.qqchen.deploy.backend.deploy.service.IDeployRecordService;
import com.qqchen.deploy.backend.deploy.repository.IDeployRecordRepository;
import com.qqchen.deploy.backend.deploy.entity.DeployRecord;
@ -100,6 +102,9 @@ public class DeployServiceImpl implements IDeployService {
@Resource
private RuntimeService runtimeService;
@Resource
private IWorkflowNodeLogService workflowNodeLogService;
@Override
public List<UserTeamDeployableDTO> getDeployableEnvironments() {
@ -139,7 +144,7 @@ public class DeployServiceImpl implements IDeployService {
// 补充查询作为成员但不是负责人的团队
Set<Long> missingTeamIds = teamIds.stream()
.filter(id -> !teamMap.containsKey(id))
.collect(Collectors.toSet());
.collect(Collectors.toSet());
if (!missingTeamIds.isEmpty()) {
teamRepository.findAllById(missingTeamIds).forEach(team -> teamMap.put(team.getId(), team));
}
@ -218,13 +223,13 @@ public class DeployServiceImpl implements IDeployService {
// 14. 批量查询团队环境配置
List<TeamEnvironmentConfig> teamEnvConfigs = teamEnvironmentConfigRepository.findByTeamIdIn(teamIds);
Map<String, TeamEnvironmentConfig> teamEnvConfigMap = teamEnvConfigs.stream()
.collect(toMap(c -> c.getTeamId() + "_" + c.getEnvironmentId(), c -> c));
.collect(toMap(c -> c.getTeamId() + "_" + c.getEnvironmentId(), c -> c));
// 15. 批量查询审批人信息
Set<Long> approverUserIds = teamEnvConfigs.stream()
.filter(c -> c.getApproverUserIds() != null)
.flatMap(c -> c.getApproverUserIds().stream())
.collect(Collectors.toSet());
.filter(c -> c.getApproverUserIds() != null)
.flatMap(c -> c.getApproverUserIds().stream())
.collect(Collectors.toSet());
Map<Long, User> approverMap = !approverUserIds.isEmpty()
? userRepository.findAllById(approverUserIds).stream().collect(toMap(User::getId, u -> u))
: Collections.emptyMap();
@ -536,32 +541,32 @@ public class DeployServiceImpl implements IDeployService {
*/
private Map<Long, DeployStatisticsDTO> queryDeployStatistics(List<Long> teamApplicationIds) {
Map<Long, DeployStatisticsDTO> statisticsMap = new HashMap<>();
List<Object[]> statisticsList = deployRecordRepository.findDeployStatisticsByTeamApplicationIds(teamApplicationIds);
List<Object[]> statisticsList = deployRecordRepository.findDeployStatisticsByTeamApplicationIds(teamApplicationIds);
for (Object[] row : statisticsList) {
Long teamApplicationId = (Long) row[0];
Long totalCount = ((Number) row[1]).longValue();
Long successCount = ((Number) row[2]).longValue();
Long failedCount = ((Number) row[3]).longValue();
Long runningCount = ((Number) row[4]).longValue();
for (Object[] row : statisticsList) {
Long teamApplicationId = (Long) row[0];
Long totalCount = ((Number) row[1]).longValue();
Long successCount = ((Number) row[2]).longValue();
Long failedCount = ((Number) row[3]).longValue();
Long runningCount = ((Number) row[4]).longValue();
LocalDateTime lastDeployTime = null;
if (row[5] != null) {
if (row[5] instanceof Timestamp) {
lastDeployTime = ((Timestamp) row[5]).toLocalDateTime();
} else if (row[5] instanceof LocalDateTime) {
lastDeployTime = (LocalDateTime) row[5];
LocalDateTime lastDeployTime = null;
if (row[5] != null) {
if (row[5] instanceof Timestamp) {
lastDeployTime = ((Timestamp) row[5]).toLocalDateTime();
} else if (row[5] instanceof LocalDateTime) {
lastDeployTime = (LocalDateTime) row[5];
}
}
}
DeployStatisticsDTO stats = new DeployStatisticsDTO();
stats.setTotalCount(totalCount);
stats.setSuccessCount(successCount);
stats.setFailedCount(failedCount);
stats.setRunningCount(runningCount);
stats.setLastDeployTime(lastDeployTime);
statisticsMap.put(teamApplicationId, stats);
}
DeployStatisticsDTO stats = new DeployStatisticsDTO();
stats.setTotalCount(totalCount);
stats.setSuccessCount(successCount);
stats.setFailedCount(failedCount);
stats.setRunningCount(runningCount);
stats.setLastDeployTime(lastDeployTime);
statisticsMap.put(teamApplicationId, stats);
}
return statisticsMap;
}
@ -570,12 +575,12 @@ public class DeployServiceImpl implements IDeployService {
* 批量查询最新部署记录
*/
private Map<Long, DeployRecord> queryLatestRecords(List<Long> teamApplicationIds) {
List<DeployRecord> latestRecords = deployRecordRepository.findLatestDeployRecordsByTeamApplicationIds(teamApplicationIds);
List<DeployRecord> latestRecords = deployRecordRepository.findLatestDeployRecordsByTeamApplicationIds(teamApplicationIds);
Map<Long, DeployRecord> latestRecordMap = latestRecords.stream()
.collect(toMap(DeployRecord::getTeamApplicationId, r -> r));
// 更新统计信息中的最新状态和部署人
latestRecordMap.forEach((teamAppId, record) -> {
// 更新统计信息中的最新状态和部署人
latestRecordMap.forEach((teamAppId, record) -> {
// 这里可以添加额外的处理逻辑
});
@ -1015,5 +1020,42 @@ public class DeployServiceImpl implements IDeployService {
return false;
}
@Override
public DeployNodeLogDTO getNodeLogs(String processInstanceId, String nodeId) {
DeployNodeLogDTO result = new DeployNodeLogDTO();
result.setProcessInstanceId(processInstanceId);
result.setNodeId(nodeId);
// 查询日志
List<WorkflowNodeLog> logs = workflowNodeLogService.getNodeLogs(processInstanceId, nodeId);
if (logs.isEmpty()) {
// 判断是过期还是还没有日志
result.setExpired(true);
result.setMessage("日志已过期超过7天或不存在");
result.setLogs(Collections.emptyList());
} else {
result.setExpired(false);
result.setMessage("查询成功");
// 转换为 DTO
List<DeployNodeLogDTO.LogEntry> logEntries = logs.stream()
.map(log -> {
DeployNodeLogDTO.LogEntry entry = new DeployNodeLogDTO.LogEntry();
entry.setSequenceId(log.getSequenceId());
entry.setTimestamp(log.getTimestamp());
entry.setLevel(log.getLevel().name());
entry.setSource(log.getSource().name());
entry.setMessage(log.getMessage());
return entry;
})
.collect(Collectors.toList());
result.setLogs(logEntries);
}
return result;
}
}

View File

@ -2,6 +2,9 @@ package com.qqchen.deploy.backend.framework.utils;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
@ -548,5 +551,111 @@ public class RedisUtil {
return Collections.emptySet();
}
}
// ===============================Stream=================================
/**
* 向Stream中添加数据
* @param key Stream的key
* @param data 数据Map格式
* @return 消息ID
*/
public RecordId streamAdd(String key, Map<String, String> data) {
try {
return redisTemplate.opsForStream().add(key, data);
} catch (Exception e) {
log.error("Redis streamAdd error: key={}", key, e);
return null;
}
}
/**
* 向Stream中添加数据并设置过期时间
* @param key Stream的key
* @param data 数据Map格式
* @param time 过期时间
* @return 消息ID
*/
public RecordId streamAdd(String key, Map<String, String> data, long time) {
try {
RecordId recordId = redisTemplate.opsForStream().add(key, data);
if (time > 0) {
expire(key, time);
}
return recordId;
} catch (Exception e) {
log.error("Redis streamAdd with expire error: key={}, time={}", key, time, e);
return null;
}
}
/**
* 限制Stream的长度保留最新的N条消息
* @param key Stream的key
* @param maxLen 最大长度
* @return 是否成功
*/
public boolean streamTrim(String key, long maxLen) {
try {
redisTemplate.opsForStream().trim(key, maxLen);
return true;
} catch (Exception e) {
log.error("Redis streamTrim error: key={}, maxLen={}", key, maxLen, e);
return false;
}
}
/**
* 读取Stream中指定范围的数据
* @param key Stream的key
* @param range 范围使用Range.unbounded()表示所有数据
* @return 消息列表
*/
public List<MapRecord<String, Object, Object>> streamRange(String key, Range<String> range) {
try {
return redisTemplate.opsForStream().range(key, range);
} catch (Exception e) {
log.error("Redis streamRange error: key={}, range={}", key, range, e);
return Collections.emptyList();
}
}
/**
* 读取Stream中所有数据
* @param key Stream的key
* @return 消息列表
*/
public List<MapRecord<String, Object, Object>> streamRangeAll(String key) {
return streamRange(key, Range.unbounded());
}
/**
* 删除Stream中的指定消息
* @param key Stream的key
* @param recordIds 消息ID列表
* @return 删除的数量
*/
public Long streamDelete(String key, String... recordIds) {
try {
return redisTemplate.opsForStream().delete(key, recordIds);
} catch (Exception e) {
log.error("Redis streamDelete error: key={}", key, e);
return 0L;
}
}
/**
* 获取Stream的长度
* @param key Stream的key
* @return 消息数量
*/
public Long streamSize(String key) {
try {
return redisTemplate.opsForStream().size(key);
} catch (Exception e) {
log.error("Redis streamSize error: key={}", key, e);
return 0L;
}
}
}

View File

@ -5,8 +5,12 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.qqchen.deploy.backend.framework.utils.JsonUtils;
import com.qqchen.deploy.backend.framework.utils.SpelExpressionResolver;
import com.qqchen.deploy.backend.workflow.dto.outputs.BaseNodeOutputs;
import com.qqchen.deploy.backend.workflow.enums.LogLevel;
import com.qqchen.deploy.backend.workflow.enums.LogSource;
import com.qqchen.deploy.backend.workflow.enums.NodeExecutionStatusEnum;
import com.qqchen.deploy.backend.workflow.model.NodeContext;
import com.qqchen.deploy.backend.workflow.service.IWorkflowNodeLogService;
import com.qqchen.deploy.backend.workflow.utils.WorkflowUtils;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.JavaDelegate;
@ -35,6 +39,8 @@ public abstract class BaseNodeDelegate<I, O> implements JavaDelegate {
@Autowired
private ObjectMapper objectMapper;
@Autowired(required = false)
private IWorkflowNodeLogService workflowNodeLogService;
// Flowable自动注入的字段
protected Expression nodeId;
@ -50,60 +56,84 @@ public abstract class BaseNodeDelegate<I, O> implements JavaDelegate {
private Class<O> outputsClass;
// 当前执行上下文用于日志记录
private String currentProcessInstanceId;
private String currentNodeId;
// 预初始化的输出对象子类可以直接访问和修改
protected O output;
// 当前输入映射对象用于读取 continueOnFailure 等配置
protected I currentInputMapping;
@Override
public void execute(DelegateExecution execution) {
String currentNodeId = null;
NodeContext<I, O> nodeContext = new NodeContext<>();
Map<String, Object> configsMap = null;
I inputMappingObj = null;
try {
// 1. 获取节点ID
currentNodeId = getFieldValue(nodeId, execution);
// 1. 获取节点ID和流程实例ID用于日志记录
this.currentNodeId = getFieldValue(nodeId, execution);
this.currentProcessInstanceId = execution.getProcessInstanceId();
log.info("Executing node: {}", currentNodeId);
// 2. 解析配置通用Map
configsMap = parseJsonField(configs, execution);
// 3. 解析并转换InputMapping强类型
// 3. 自动创建并初始化输出对象为 SUCCESS 状态
this.output = createSuccessOutputs(configsMap);
// 4. 解析并转换InputMapping强类型
inputMappingObj = parseAndConvertInputMapping(execution);
this.currentInputMapping = inputMappingObj; // 保存引用供子类使用
// 4. 执行具体的业务逻辑返回强类型输出
O outputsObj = executeInternal(execution, configsMap, inputMappingObj);
// 5. 执行具体的业务逻辑子类直接修改 this.output
executeInternal(execution, configsMap, inputMappingObj);
// 5. 使用 NodeContext 保存节点数据
// 6. 使用 NodeContext 保存节点数据
nodeContext.setConfigs(configsMap);
nodeContext.setInputMapping(inputMappingObj);
nodeContext.setOutputs(outputsObj);
nodeContext.setOutputs(this.output);
execution.setVariable(currentNodeId, nodeContext.toMap(objectMapper));
log.info("Stored NodeContext for: {}", currentNodeId);
} catch (Exception e) {
// 即使失败也保存完整的 NodeContext包含 configs inputMapping
BaseNodeOutputs failureNodeOutputs = new BaseNodeOutputs();
failureNodeOutputs.setStatus(NodeExecutionStatusEnum.FAILURE);
failureNodeOutputs.setMessage(e.getMessage());
// 业务异常根据 continueOnFailure 配置决定行为
log.error("Business exception in node: {}", currentNodeId, e);
nodeContext.setConfigs(configsMap); // 保存已解析的配置
nodeContext.setInputMapping(inputMappingObj); // 保存已解析的输入
nodeContext.setOutputs((O) failureNodeOutputs);
boolean continueOnFailure = WorkflowUtils.getContinueOnFailure(currentInputMapping);
if (continueOnFailure) {
// 非阻断模式标记失败但流程继续
log.warn("⚠️ Node failed (continue mode enabled by config): {}", e.getMessage());
markFailure(e);
execution.setVariable(currentNodeId, nodeContext.toMap(objectMapper));
log.error("Task execution failed", e);
// 保存失败状态的 NodeContext
nodeContext.setConfigs(configsMap);
nodeContext.setInputMapping(inputMappingObj);
nodeContext.setOutputs(this.output);
execution.setVariable(currentNodeId, nodeContext.toMap(objectMapper));
} else {
// 阻断模式终止流程默认行为
log.error("❌ Node failed (terminate mode, default): {}", e.getMessage());
terminateWorkflow(e); // 抛出 BpmnError触发流程终止
}
}
}
/**
* 执行具体的业务逻辑子类实现
* <p>
* 子类可以直接修改 {@code this.output} 对象来设置输出结果
* 输出对象已预初始化为 SUCCESS 状态失败时调用 {@code markFailure()} {@code terminateWorkflow()}
*
* @param execution Flowable执行上下文
* @param configs 节点配置
* @param inputMapping 输入映射强类型
* @return 节点输出结果强类型
*/
protected abstract O executeInternal(
protected abstract void executeInternal(
DelegateExecution execution,
Map<String, Object> configs,
I inputMapping
@ -226,4 +256,245 @@ public abstract class BaseNodeDelegate<I, O> implements JavaDelegate {
}
}
// ===================== 便利方法流程控制 =====================
/**
* 终止流程致命错误- 使用自定义错误码
* <p>
* 适用场景需要指定特定错误码用于边界事件捕获
*
* @param errorCode 错误码 WORKFLOW_EXEC_ERROR
* @param errorMessage 错误描述
*/
protected void terminateWorkflow(String errorCode, String errorMessage) {
log.error("🛑 Terminating workflow: errorCode={}, message={}", errorCode, errorMessage);
// 自动记录错误日志到 Redis Stream
logError(errorMessage);
throw new org.flowable.engine.delegate.BpmnError(errorCode, errorMessage);
}
/**
* 终止流程致命错误- 使用默认错误码
* <p>
* 适用场景构建失败审批拒绝系统级异常等
*
* @param errorMessage 错误描述
*/
protected void terminateWorkflow(String errorMessage) {
terminateWorkflow("workflow_exec_error", errorMessage);
}
/**
* 终止流程致命错误- 使用异常对象
* <p>
* 适用场景捕获异常后直接终止流程
*
* @param exception 异常对象
*/
protected void terminateWorkflow(Exception exception) {
String errorMessage = exception.getMessage() != null
? exception.getMessage()
: exception.getClass().getSimpleName();
terminateWorkflow(errorMessage);
}
/**
* 标记当前节点为失败状态非致命错误流程继续
* <p>
* 直接修改 {@code this.output} 对象的状态为 FAILURE并自动记录 WARN 日志到 Redis Stream
* <p>
* 适用场景通知发送失败可选步骤失败日志记录失败等
*
* @param errorMessage 错误描述
*/
protected void markFailure(String errorMessage) {
log.warn("⚠️ Node failed but workflow continues: {}", errorMessage);
// 自动记录警告日志到 Redis Stream
logWarn(errorMessage);
// 设置失败状态
if (output instanceof BaseNodeOutputs) {
((BaseNodeOutputs) output).setStatus(NodeExecutionStatusEnum.FAILURE);
((BaseNodeOutputs) output).setMessage(errorMessage);
}
}
/**
* 标记当前节点为失败状态非致命错误流程继续- 使用异常对象
* <p>
* 适用场景捕获异常后继续流程但标记为失败
*
* @param exception 异常对象
*/
protected void markFailure(Exception exception) {
String errorMessage = exception.getMessage() != null
? exception.getMessage()
: exception.getClass().getSimpleName();
markFailure(errorMessage);
}
/**
* 创建失败状态的输出对象非致命错误流程继续
* <p>
* 已废弃请使用 {@link #markFailure(String)} 代替
* <p>
* 适用场景通知发送失败可选步骤失败日志记录失败等
*
* @param errorMessage 错误描述
* @return 失败状态的输出对象类型为当前节点的输出类型 O
* @deprecated 使用 {@link #markFailure(String)} 代替
*/
@Deprecated
protected O createFailureOutputs(String errorMessage) {
log.warn("⚠️ Node failed but workflow continues: {}", errorMessage);
// 自动记录警告日志到 Redis Stream
logWarn(errorMessage);
try {
// 通过反射创建正确类型的输出对象
Class<O> outputClass = getOutputsClass();
O outputs = outputClass.getDeclaredConstructor().newInstance();
// 设置失败状态假设所有输出都继承自 BaseNodeOutputs
if (outputs instanceof BaseNodeOutputs) {
((BaseNodeOutputs) outputs).setStatus(NodeExecutionStatusEnum.FAILURE);
((BaseNodeOutputs) outputs).setMessage(errorMessage);
}
return outputs;
} catch (Exception e) {
// 降级方案如果反射失败返回 BaseNodeOutputs
log.error("Failed to create failure outputs, using fallback BaseNodeOutputs", e);
BaseNodeOutputs fallback = new BaseNodeOutputs();
fallback.setStatus(NodeExecutionStatusEnum.FAILURE);
fallback.setMessage(errorMessage);
return (O) fallback;
}
}
/**
* 创建失败状态的输出对象非致命错误流程继续- 使用异常对象
* <p>
* 已废弃请使用 {@link #markFailure(Exception)} 代替
* <p>
* 适用场景捕获异常后继续流程但标记为失败
*
* @param exception 异常对象
* @return 失败状态的输出对象
* @deprecated 使用 {@link #markFailure(Exception)} 代替
*/
@Deprecated
protected O createFailureOutputs(Exception exception) {
String errorMessage = exception.getMessage() != null
? exception.getMessage()
: exception.getClass().getSimpleName();
return createFailureOutputs(errorMessage);
}
/**
* 创建成功状态的输出对象便利方法
* <p>
* 自动设置状态为 SUCCESS并根据节点名称生成默认消息"{节点名称}执行成功"
*
* @param configsMap 节点配置用于提取节点名称
* @return 成功状态的输出对象类型为当前节点的输出类型 O
*/
protected O createSuccessOutputs(Map<String, Object> configsMap) {
try {
Class<O> outputClass = getOutputsClass();
O outputs = outputClass.getDeclaredConstructor().newInstance();
if (outputs instanceof BaseNodeOutputs) {
((BaseNodeOutputs) outputs).setStatus(NodeExecutionStatusEnum.SUCCESS);
// 自动生成默认成功消息
String nodeName = WorkflowUtils.extractNodeName(configsMap);
((BaseNodeOutputs) outputs).setMessage(nodeName + "执行成功");
}
return outputs;
} catch (Exception e) {
log.error("Failed to create success outputs, using fallback BaseNodeOutputs", e);
BaseNodeOutputs fallback = new BaseNodeOutputs();
fallback.setStatus(NodeExecutionStatusEnum.SUCCESS);
fallback.setMessage("节点执行成功");
return (O) fallback;
}
}
// ===================== 日志记录辅助方法 =====================
/**
* 记录错误日志到 Redis Stream
*
* @param message 错误信息
*/
protected void logError(String message) {
if (workflowNodeLogService != null && currentProcessInstanceId != null && currentNodeId != null) {
try {
workflowNodeLogService.log(
currentProcessInstanceId,
currentNodeId,
LogSource.FLOWABLE,
LogLevel.ERROR,
message
);
} catch (Exception e) {
// 日志记录失败不影响主流程
log.error("Failed to log to Redis Stream: {}", message, e);
}
}
}
/**
* 记录警告日志到 Redis Stream
*
* @param message 警告信息
*/
protected void logWarn(String message) {
if (workflowNodeLogService != null && currentProcessInstanceId != null && currentNodeId != null) {
try {
workflowNodeLogService.log(
currentProcessInstanceId,
currentNodeId,
LogSource.FLOWABLE,
LogLevel.WARN,
message
);
} catch (Exception e) {
log.warn("Failed to log to Redis Stream: {}", message, e);
}
}
}
/**
* 记录信息日志到 Redis Stream
*
* @param message 信息内容
*/
protected void logInfo(String message) {
if (workflowNodeLogService != null && currentProcessInstanceId != null && currentNodeId != null) {
try {
workflowNodeLogService.log(
currentProcessInstanceId,
currentNodeId,
LogSource.FLOWABLE,
LogLevel.INFO,
message
);
} catch (Exception e) {
log.debug("Failed to log to Redis Stream: {}", message, e);
}
}
}
}

View File

@ -58,11 +58,12 @@ public class JenkinsBuildDelegate extends BaseNodeDelegate<JenkinsBuildInputMapp
private static final int MAX_BUILD_POLLS = 180; // 30分钟超时
@Override
protected JenkinsBuildOutputs executeInternal(DelegateExecution execution, Map<String, Object> configs, JenkinsBuildInputMapping input) {
protected void executeInternal(DelegateExecution execution, Map<String, Object> configs, JenkinsBuildInputMapping input) {
log.info("Jenkins Build - serverId: {}, jobName: {}", input.getServerId(), input.getJobName());
// 1. 获取外部系统
ExternalSystem externalSystem = externalSystemRepository.findById(input.getServerId()).orElseThrow(() -> new RuntimeException("Jenkins服务器不存在: " + input.getServerId()));
// 1. 获取外部系统不存在会抛出异常由基类处理
ExternalSystem externalSystem = externalSystemRepository.findById(input.getServerId())
.orElseThrow(() -> new RuntimeException("Jenkins服务器不存在: " + input.getServerId()));
String jobName = input.getJobName();
@ -95,19 +96,16 @@ public class JenkinsBuildDelegate extends BaseNodeDelegate<JenkinsBuildInputMapp
log.info("Build details - changeSets: {}, artifacts: {}",
buildDetails.getChangeSets(), buildDetails.getArtifacts());
// 6. 构造输出结果执行到这里说明构建成功
JenkinsBuildOutputs outputs = new JenkinsBuildOutputs();
// 设置统一的执行状态为成功
outputs.setStatus(NodeExecutionStatusEnum.SUCCESS);
// 6. 设置输出结果执行到这里说明构建成功
// 直接修改预初始化的 output 对象
// 设置 Jenkins 特有字段
outputs.setBuildStatus(buildStatus.name());
outputs.setBuildNumber(buildInfo.getBuildNumber());
outputs.setBuildUrl(buildInfo.getBuildUrl());
output.setBuildStatus(buildStatus.name());
output.setBuildNumber(buildInfo.getBuildNumber());
output.setBuildUrl(buildInfo.getBuildUrl());
// 从构建详情中提取信息
outputs.setBuildDuration(buildDetails.getDuration() != null ? buildDetails.getDuration().intValue() : 0);
output.setBuildDuration(buildDetails.getDuration() != null ? buildDetails.getDuration().intValue() : 0);
// 提取 Git Commit ID changeSets 中获取第一个
if (buildDetails.getChangeSets() != null && !buildDetails.getChangeSets().isEmpty()) {
@ -115,13 +113,13 @@ public class JenkinsBuildDelegate extends BaseNodeDelegate<JenkinsBuildInputMapp
var changeSet = buildDetails.getChangeSets().get(0);
if (changeSet.getItems() != null && !changeSet.getItems().isEmpty()) {
log.info("Found {} items in changeSet", changeSet.getItems().size());
outputs.setGitCommitId(changeSet.getItems().get(0).getCommitId());
output.setGitCommitId(changeSet.getItems().get(0).getCommitId());
}
} else {
log.warn("No changeSets found in build details");
}
if (outputs.getGitCommitId() == null) {
outputs.setGitCommitId("");
if (output.getGitCommitId() == null) {
output.setGitCommitId("");
}
// 提取构建制品URL如果有多个制品拼接成逗号分隔的列表
@ -130,16 +128,16 @@ public class JenkinsBuildDelegate extends BaseNodeDelegate<JenkinsBuildInputMapp
String artifactUrls = buildDetails.getArtifacts().stream()
.map(artifact -> buildInfo.getBuildUrl() + "artifact/" + artifact.getRelativePath())
.collect(java.util.stream.Collectors.joining(","));
outputs.setArtifactUrl(artifactUrls);
output.setArtifactUrl(artifactUrls);
} else {
log.warn("No artifacts found in build details");
outputs.setArtifactUrl("");
output.setArtifactUrl("");
}
// 记录完成日志
workflowNodeLogService.info(execution.getProcessInstanceId(), execution.getCurrentActivityId(), LogSource.JENKINS, "Jenkins 构建任务执行完成");
return outputs;
// 不需要 returnstatus 已经是 SUCCESS
}
private JenkinsQueueBuildInfoResponse waitForBuildToStart(ExternalSystem externalSystem, String queueId) {
@ -158,11 +156,11 @@ public class JenkinsBuildDelegate extends BaseNodeDelegate<JenkinsBuildInputMapp
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new BpmnError("POLLING_INTERRUPTED", "Interrupted while waiting for build to start");
throw new RuntimeException("Interrupted while waiting for build to start", e);
}
}
throw new BpmnError(WorkFlowConstants.WORKFLOW_EXEC_ERROR, String.format("Build did not start within %d seconds", MAX_QUEUE_POLLS * QUEUE_POLL_INTERVAL));
throw new RuntimeException(String.format("Build did not start within %d seconds", MAX_QUEUE_POLLS * QUEUE_POLL_INTERVAL));
}
private JenkinsBuildStatus pollBuildStatus(DelegateExecution execution, ExternalSystem externalSystem, String jobName, Integer buildNumber) {
@ -197,40 +195,34 @@ public class JenkinsBuildDelegate extends BaseNodeDelegate<JenkinsBuildInputMapp
switch (status) {
case SUCCESS:
// 构建成功拉取剩余日志后返回状态
// 构建成功拉取剩余日志后返回状态
log.info("Jenkins build succeeded: job={}, buildNumber={}", jobName, buildNumber);
fetchRemainingLogs(execution, externalSystem, jobName, buildNumber, logOffset);
workflowNodeLogService.info(execution.getProcessInstanceId(), execution.getCurrentActivityId(), LogSource.JENKINS, String.format("✅ Jenkins 构建成功: buildNumber=%d", buildNumber));
logInfo(String.format("✅ Jenkins 构建成功: buildNumber=%d", buildNumber));
return status;
case FAILURE:
// 构建失败拉取剩余日志后抛出错误
// 构建失败拉取剩余日志后抛出异常
fetchRemainingLogs(execution, externalSystem, jobName, buildNumber, logOffset);
workflowNodeLogService.error(execution.getProcessInstanceId(), execution.getCurrentActivityId(), LogSource.JENKINS, String.format("❌ Jenkins 构建失败: buildNumber=%d", buildNumber));
throw new BpmnError(WorkFlowConstants.WORKFLOW_EXEC_ERROR, String.format("Jenkins build failed: job=%s, buildNumber=%d", jobName, buildNumber));
throw new RuntimeException(String.format("Jenkins build failed: job=%s, buildNumber=%d", jobName, buildNumber));
case ABORTED:
// 构建被取消拉取剩余日志后抛出错误
// 构建被取消拉取剩余日志后抛出异常
fetchRemainingLogs(execution, externalSystem, jobName, buildNumber, logOffset);
workflowNodeLogService.error(execution.getProcessInstanceId(), execution.getCurrentActivityId(), LogSource.JENKINS, String.format("❌ Jenkins 构建被取消: buildNumber=%d", buildNumber));
throw new BpmnError(WorkFlowConstants.WORKFLOW_EXEC_ERROR, String.format("Jenkins build was aborted: job=%s, buildNumber=%d", jobName, buildNumber));
throw new RuntimeException(String.format("Jenkins build was aborted: job=%s, buildNumber=%d", jobName, buildNumber));
case IN_PROGRESS:
// 继续轮询
attempts++;
break;
case NOT_FOUND:
// 构建记录丢失抛出系统异常
workflowNodeLogService.error(execution.getProcessInstanceId(), execution.getCurrentActivityId(), LogSource.JENKINS, String.format("❌ Jenkins 构建记录未找到: buildNumber=%d", buildNumber));
throw new BpmnError(WorkFlowConstants.WORKFLOW_EXEC_ERROR, String.format("Jenkins build not found: job=%s, buildNumber=%d", jobName, buildNumber));
// 构建记录丢失抛出异常
throw new RuntimeException(String.format("Jenkins build not found: job=%s, buildNumber=%d", jobName, buildNumber));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
workflowNodeLogService.error(execution.getProcessInstanceId(), execution.getCurrentActivityId(), LogSource.JENKINS, "构建状态轮询被中断");
throw new BpmnError(WorkFlowConstants.WORKFLOW_EXEC_ERROR, "Build status polling was interrupted");
throw new RuntimeException("Build status polling was interrupted", e);
}
}
// 超过最大轮询次数视为超时系统异常
workflowNodeLogService.error(execution.getProcessInstanceId(), execution.getCurrentActivityId(), LogSource.JENKINS, String.format("❌ Jenkins 构建超时: 超过 %d 分钟", MAX_BUILD_POLLS * BUILD_POLL_INTERVAL / 60)
);
throw new BpmnError(WorkFlowConstants.WORKFLOW_EXEC_ERROR, String.format("Jenkins build timed out after %d minutes: job=%s, buildNumber=%d", MAX_BUILD_POLLS * BUILD_POLL_INTERVAL / 60, jobName, buildNumber));
throw new RuntimeException(String.format("Jenkins build timed out after %d minutes: job=%s, buildNumber=%d",
MAX_BUILD_POLLS * BUILD_POLL_INTERVAL / 60, jobName, buildNumber));
}
/**

View File

@ -3,9 +3,9 @@ package com.qqchen.deploy.backend.workflow.delegate;
import com.qqchen.deploy.backend.notification.service.INotificationSendService;
import com.qqchen.deploy.backend.workflow.dto.inputmapping.NotificationInputMapping;
import com.qqchen.deploy.backend.workflow.dto.outputs.NotificationOutputs;
import com.qqchen.deploy.backend.workflow.enums.NodeExecutionStatusEnum;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.flowable.engine.delegate.DelegateExecution;
import org.springframework.stereotype.Component;
@ -25,27 +25,11 @@ public class NotificationNodeDelegate extends BaseNodeDelegate<NotificationInput
private INotificationSendService notificationSendService;
@Override
protected NotificationOutputs executeInternal(DelegateExecution execution, Map<String, Object> configs, NotificationInputMapping input) {
log.info("Sending notification - channel: {}, title: {}, content: {}", input.getChannelId(), input.getTitle(), input.getContent());
try {
// 使用通知服务发送消息
Long channelId = input.getChannelId() != null ? input.getChannelId() : null;
notificationSendService.send(channelId, input.getTitle(), input.getContent());
// 返回成功结果
NotificationOutputs outputs = new NotificationOutputs();
outputs.setStatus(NodeExecutionStatusEnum.SUCCESS);
outputs.setMessage("通知发送成功");
return outputs;
} catch (Exception e) {
log.error("Failed to send notification", e);
NotificationOutputs outputs = new NotificationOutputs();
outputs.setStatus(NodeExecutionStatusEnum.FAILURE);
outputs.setMessage("通知发送失败: " + e.getMessage());
return outputs;
protected void executeInternal(DelegateExecution execution, Map<String, Object> configs, NotificationInputMapping input) {
if (input.getChannelId() == null || StringUtils.isEmpty(input.getTitle()) || StringUtils.isEmpty(input.getContent())) {
logError(String.format("Notification delegate parameter verification failed %s %s %s", input.getChannelId(), input.getTitle(), input.getContent()));
return;
}
notificationSendService.send(input.getChannelId(), input.getTitle(), input.getContent());
}
}

View File

@ -1,10 +1,8 @@
package com.qqchen.deploy.backend.workflow.delegate;
import com.qqchen.deploy.backend.workflow.constants.WorkFlowConstants;
import com.qqchen.deploy.backend.workflow.dto.inputmapping.ShellInputMapping;
import com.qqchen.deploy.backend.workflow.dto.outputs.ShellOutputs;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.delegate.BpmnError;
import org.flowable.engine.delegate.DelegateExecution;
import org.springframework.stereotype.Component;
@ -21,25 +19,25 @@ import java.util.Map;
public class ShellNodeDelegate extends BaseNodeDelegate<ShellInputMapping, ShellOutputs> {
@Override
protected ShellOutputs executeInternal(
protected void executeInternal(
DelegateExecution execution,
Map<String, Object> configs,
ShellInputMapping input
) {
if (input.getScript() == null || input.getScript().isEmpty()) {
throw new BpmnError(WorkFlowConstants.WORKFLOW_EXEC_ERROR,
"Script is required but not provided");
// 致命错误脚本为空
terminateWorkflow("Script is required but not provided");
}
log.info("Executing shell script: {}", input.getScript());
// TODO: 实现Shell脚本执行逻辑
// 目前先返回模拟结果
ShellOutputs outputs = new ShellOutputs();
outputs.setExitCode(0);
outputs.setStdout("Shell execution completed (mocked)");
outputs.setStderr("");
// 直接修改预初始化的 output 对象
output.setExitCode(0);
output.setStdout("Shell execution completed (mocked)");
output.setStderr("");
return outputs;
// 不需要 returnstatus 已经是 SUCCESS
}
}

View File

@ -16,7 +16,7 @@ import java.util.List;
*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class ApprovalInputMapping {
public class ApprovalInputMapping extends BaseNodeInputMapping {
/**
* 审批模式

View File

@ -0,0 +1,42 @@
package com.qqchen.deploy.backend.workflow.dto.inputmapping;
import lombok.Data;
/**
* 节点输入映射基类
* <p>
* 所有节点的 InputMapping 都应该继承此类提供通用的配置字段
*
* @author qqchen
* @since 2025-11-07
*/
@Data
public class BaseNodeInputMapping {
/**
* 失败时是否继续执行非阻断模式
* <p>
* - true: 节点失败时标记为 FAILURE但流程继续执行后续节点
* - false: 节点失败时抛出 BpmnError终止流程默认
* <p>
* 注意
* <ul>
* <li>仅对<strong>可恢复的业务错误</strong>生效如通知发送失败可选步骤失败</li>
* <li><strong>致命错误</strong>无效如配置缺失系统异常这些错误会直接终止流程</li>
* <li>默认值为 false阻断模式确保流程安全</li>
* </ul>
* <p>
* 使用示例
* <pre>
* // 前端配置通知节点可以失败后继续
* {
* "title": "部署通知",
* "content": "...",
* "channelId": 123,
* "continueOnFailure": true // 通知失败不影响流程
* }
* </pre>
*/
private Boolean continueOnFailure = false;
}

View File

@ -13,7 +13,7 @@ import jakarta.validation.constraints.NotBlank;
*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class JenkinsBuildInputMapping {
public class JenkinsBuildInputMapping extends BaseNodeInputMapping {
/**
* Jenkins服务器ID

View File

@ -12,7 +12,7 @@ import jakarta.validation.constraints.NotNull;
*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class NotificationInputMapping {
public class NotificationInputMapping extends BaseNodeInputMapping {
/**
* 通知渠道ID

View File

@ -13,7 +13,7 @@ import java.util.Map;
*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class ShellInputMapping {
public class ShellInputMapping extends BaseNodeInputMapping {
/**
* Shell脚本内容

View File

@ -10,6 +10,7 @@ import com.qqchen.deploy.backend.workflow.model.NodeContext;
import com.qqchen.deploy.backend.workflow.repository.IWorkflowDefinitionRepository;
import com.qqchen.deploy.backend.workflow.repository.IWorkflowInstanceRepository;
import com.qqchen.deploy.backend.workflow.repository.IWorkflowCategoryRepository;
import com.qqchen.deploy.backend.workflow.utils.WorkflowUtils;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.HistoryService;
@ -36,7 +37,7 @@ import java.util.Map;
*/
@Slf4j
@Component("approvalEndExecutionListener")
public class ApprovalEndExecutionListener implements ExecutionListener {
public class ApprovalEndExecutionListener extends BaseExecutionListener {
@Resource
private ObjectMapper objectMapper;
@ -59,12 +60,13 @@ public class ApprovalEndExecutionListener implements ExecutionListener {
@Override
public void notify(DelegateExecution execution) {
String nodeId = execution.getCurrentActivityId();
NodeContext<ApprovalInputMapping, ApprovalOutputs> nodeContext = null;
try {
log.info("ApprovalExecutionListener: Building outputs for node: {}", nodeId);
// 1. 读取 NodeContext统一使用 NodeContext BaseNodeDelegate 保持一致
NodeContext<ApprovalInputMapping, ApprovalOutputs> nodeContext =
readNodeContext(execution, nodeId);
nodeContext = WorkflowUtils.readNodeContext(execution, nodeId, ApprovalInputMapping.class, ApprovalOutputs.class, objectMapper);
if (nodeContext == null) {
return;
}
@ -81,7 +83,7 @@ public class ApprovalEndExecutionListener implements ExecutionListener {
// 4. 更新并保存 NodeContext BaseNodeDelegate 保持一致
nodeContext.setOutputs(outputs);
saveNodeContext(execution, nodeId, nodeContext);
WorkflowUtils.saveNodeContext(execution, nodeId, nodeContext, objectMapper);
log.info("Stored approval outputs for node: {}, result: {}", nodeId, outputs.getApprovalResult());
@ -91,47 +93,23 @@ public class ApprovalEndExecutionListener implements ExecutionListener {
} catch (Exception e) {
log.error("Failed to build approval outputs for node: {}", nodeId, e);
// 异常处理统一使用 NodeContext 设置失败状态 BaseNodeDelegate 保持一致
handleFailure(execution, nodeId, e);
// 根据 continueOnFailure 配置决定行为
boolean continueOnFailure = WorkflowUtils.getContinueOnFailure(
nodeContext != null ? nodeContext.getInputMapping() : null
);
throw new RuntimeException("Failed to build approval outputs: " + nodeId, e);
}
}
/**
* 读取 NodeContext BaseNodeDelegate 的模式保持一致
*/
private NodeContext<ApprovalInputMapping, ApprovalOutputs> readNodeContext(
DelegateExecution execution, String nodeId) {
try {
Object nodeDataObj = execution.getVariable(nodeId);
if (!(nodeDataObj instanceof Map)) {
log.warn("NodeContext not found for node: {}, skipping ApprovalExecutionListener", nodeId);
return null;
if (continueOnFailure) {
// 非阻断模式记录失败但流程继续
log.warn("⚠️ Approval listener failed (continue mode enabled by config): {}", e.getMessage());
handleFailure(execution, nodeId, e);
} else {
// 阻断模式终止流程默认行为
log.error("❌ Approval listener failed (terminate mode, default): {}", e.getMessage());
terminateWorkflow(e);
}
@SuppressWarnings("unchecked")
Map<String, Object> nodeDataMap = (Map<String, Object>) nodeDataObj;
return NodeContext.fromMap(nodeDataMap, ApprovalInputMapping.class, ApprovalOutputs.class, objectMapper);
} catch (Exception e) {
log.error("Failed to read NodeContext for node: {}", nodeId, e);
return null;
}
}
/**
* 保存 NodeContext BaseNodeDelegate 的模式保持一致
*/
private void saveNodeContext(DelegateExecution execution, String nodeId,
NodeContext<ApprovalInputMapping, ApprovalOutputs> nodeContext) {
try {
execution.setVariable(nodeId, nodeContext.toMap(objectMapper));
log.debug("Saved NodeContext for node: {}", nodeId);
} catch (Exception e) {
log.error("Failed to save NodeContext for node: {}", nodeId, e);
throw new RuntimeException("Failed to save NodeContext: " + nodeId, e);
}
}
/**
* 自动装配 ApprovalOutputs丰富 outputs 的上下文信息
@ -155,7 +133,8 @@ public class ApprovalEndExecutionListener implements ExecutionListener {
*/
private void handleFailure(DelegateExecution execution, String nodeId, Exception e) {
try {
NodeContext<ApprovalInputMapping, ApprovalOutputs> nodeContext = readNodeContext(execution, nodeId);
NodeContext<ApprovalInputMapping, ApprovalOutputs> nodeContext =
WorkflowUtils.readNodeContext(execution, nodeId, ApprovalInputMapping.class, ApprovalOutputs.class, objectMapper);
if (nodeContext == null) {
// 如果无法读取 NodeContext创建新的
nodeContext = new NodeContext<>();
@ -170,7 +149,7 @@ public class ApprovalEndExecutionListener implements ExecutionListener {
failureOutputs.setMessage("审批节点执行异常: " + e.getMessage());
nodeContext.setOutputs(failureOutputs);
saveNodeContext(execution, nodeId, nodeContext);
WorkflowUtils.saveNodeContext(execution, nodeId, nodeContext, objectMapper);
} catch (Exception ex) {
log.error("Failed to set error status for node: {}", nodeId, ex);
}
@ -357,5 +336,6 @@ public class ApprovalEndExecutionListener implements ExecutionListener {
if (date == null) return null;
return date.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime();
}
}

View File

@ -0,0 +1,56 @@
package com.qqchen.deploy.backend.workflow.listener;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.delegate.ExecutionListener;
/**
* 执行监听器基类
* 提供统一的流程控制方法
*
* @author qqchen
* @since 2025-11-07
*/
@Slf4j
public abstract class BaseExecutionListener implements ExecutionListener {
// ===================== 便利方法流程控制 =====================
/**
* 终止流程致命错误- 使用自定义错误码
* <p>
* 适用场景需要指定特定错误码用于边界事件捕获
*
* @param errorCode 错误码 workflow_exec_error
* @param errorMessage 错误描述
*/
protected void terminateWorkflow(String errorCode, String errorMessage) {
log.error("🛑 Listener terminating workflow: errorCode={}, message={}", errorCode, errorMessage);
throw new org.flowable.engine.delegate.BpmnError(errorCode, errorMessage);
}
/**
* 终止流程致命错误- 使用默认错误码
* <p>
* 适用场景监听器中发生致命错误需要立即终止流程
*
* @param errorMessage 错误描述
*/
protected void terminateWorkflow(String errorMessage) {
terminateWorkflow("workflow_exec_error", errorMessage);
}
/**
* 终止流程致命错误- 使用异常对象
* <p>
* 适用场景捕获异常后直接终止流程
*
* @param exception 异常对象
*/
protected void terminateWorkflow(Exception exception) {
String errorMessage = exception.getMessage() != null
? exception.getMessage()
: exception.getClass().getSimpleName();
terminateWorkflow(errorMessage);
}
}

View File

@ -65,9 +65,16 @@ public abstract class BaseTaskListener<I, O> implements TaskListener {
log.info("Task configuration completed for node: {}", currentNodeId);
} catch (org.flowable.engine.delegate.BpmnError e) {
// BpmnError 应该向上传播触发边界事件终止流程
log.error("BpmnError occurred in task listener: nodeId={}, error={}", currentNodeId, e.getMessage());
throw e;
} catch (Exception e) {
log.error("Failed to configure task for node: {}", currentNodeId, e);
throw new RuntimeException("Failed to configure task: " + currentNodeId, e);
// TaskListener 异常根据业务需求决定是否终止流程
// 默认记录日志但不终止流程
log.warn("Task listener failed, but task will still be created");
}
}
@ -139,5 +146,45 @@ public abstract class BaseTaskListener<I, O> implements TaskListener {
return new HashMap<>();
}
}
// ===================== 便利方法流程控制 =====================
/**
* 终止流程致命错误- 使用自定义错误码
* <p>
* 适用场景任务创建阶段发生致命错误需要立即终止流程
*
* @param errorCode 错误码 workflow_exec_error
* @param errorMessage 错误描述
*/
protected void terminateWorkflow(String errorCode, String errorMessage) {
log.error("🛑 TaskListener terminating workflow: errorCode={}, message={}", errorCode, errorMessage);
throw new org.flowable.engine.delegate.BpmnError(errorCode, errorMessage);
}
/**
* 终止流程致命错误- 使用默认错误码
* <p>
* 适用场景任务创建阶段发生致命错误
*
* @param errorMessage 错误描述
*/
protected void terminateWorkflow(String errorMessage) {
terminateWorkflow("workflow_exec_error", errorMessage);
}
/**
* 终止流程致命错误- 使用异常对象
* <p>
* 适用场景捕获异常后直接终止流程
*
* @param exception 异常对象
*/
protected void terminateWorkflow(Exception exception) {
String errorMessage = exception.getMessage() != null
? exception.getMessage()
: exception.getClass().getSimpleName();
terminateWorkflow(errorMessage);
}
}

View File

@ -52,11 +52,6 @@ public interface IWorkflowNodeLogService {
*/
Page<WorkflowNodeLog> getNodeLogs(String processInstanceId, String nodeId, Pageable pageable);
/**
* 查询流程实例的所有日志
*/
List<WorkflowNodeLog> getProcessInstanceLogs(String processInstanceId);
/**
* 删除节点日志
*/

View File

@ -1,24 +1,24 @@
package com.qqchen.deploy.backend.workflow.service.impl;
import com.qqchen.deploy.backend.framework.utils.RedisUtil;
import com.qqchen.deploy.backend.workflow.entity.WorkflowNodeLog;
import com.qqchen.deploy.backend.workflow.enums.LogLevel;
import com.qqchen.deploy.backend.workflow.enums.LogSource;
import com.qqchen.deploy.backend.workflow.repository.IWorkflowNodeLogRepository;
import com.qqchen.deploy.backend.workflow.service.IWorkflowNodeLogService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.Pageable;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.List;
import java.util.*;
import java.util.stream.Collectors;
/**
* 工作流节点日志服务实现
* 使用 Flowable processInstanceId + nodeId 作为日志关联键
* 工作流节点日志服务实现 - 基于 Redis Stream
* 日志保存在 Redis Stream 保留 7 天后自动删除
*
* @author qqchen
* @since 2025-11-03
@ -28,71 +28,80 @@ import java.util.List;
public class WorkflowNodeLogServiceImpl implements IWorkflowNodeLogService {
@Resource
private IWorkflowNodeLogRepository logRepository;
@Resource
private RedisTemplate<String, String> redisTemplate;
private RedisUtil redisUtil;
/**
* 生成日志序列号使用 Redis INCR 保证全局递增
* 日志流保留时间7天
*/
private Long generateSequenceId(String processInstanceId, String nodeId) {
String key = "workflow:node:log:seq:" + processInstanceId + ":" + nodeId;
return redisTemplate.opsForValue().increment(key, 1);
private static final int LOG_STREAM_TTL_DAYS = 7;
/**
* 每个流最多保留的日志条数防止单个流过大
*/
private static final int LOG_STREAM_MAX_LEN = 50000;
/**
* 生成 Redis Stream Key
* 格式: workflow:log:stream:{processInstanceId}:{nodeId}
*/
private String buildStreamKey(String processInstanceId, String nodeId) {
return String.format("workflow:log:stream:%s:%s", processInstanceId, nodeId);
}
@Override
@Transactional
public void log(String processInstanceId, String nodeId, LogSource source, LogLevel level, String message) {
try {
Long sequenceId = generateSequenceId(processInstanceId, nodeId);
String streamKey = buildStreamKey(processInstanceId, nodeId);
WorkflowNodeLog log = new WorkflowNodeLog();
log.setProcessInstanceId(processInstanceId);
log.setNodeId(nodeId);
log.setSequenceId(sequenceId);
log.setTimestamp(System.currentTimeMillis());
log.setSource(source);
log.setLevel(level);
log.setMessage(message);
// 构建日志条目
Map<String, String> logEntry = new HashMap<>();
logEntry.put("timestamp", String.valueOf(System.currentTimeMillis()));
logEntry.put("level", level.name());
logEntry.put("source", source.name());
logEntry.put("message", message);
logRepository.save(log);
// 写入 Redis Stream
redisUtil.streamAdd(streamKey, logEntry);
// 限制流长度保留最新的日志
redisUtil.streamTrim(streamKey, LOG_STREAM_MAX_LEN);
// 设置过期时间7天转换为秒
redisUtil.expire(streamKey, LOG_STREAM_TTL_DAYS * 24 * 60 * 60);
} catch (Exception e) {
log.error("Failed to save workflow node log: processInstanceId={}, nodeId={}, source={}, level={}",
processInstanceId, nodeId, source, level, e);
// Redis 失败不影响业务流程
log.error("Failed to save workflow log to Redis (non-blocking): processInstanceId={}, nodeId={}",
processInstanceId, nodeId, e);
}
}
@Override
@Transactional
public void batchLog(String processInstanceId, String nodeId, LogSource source, LogLevel level, List<String> messages) {
if (messages == null || messages.isEmpty()) {
return;
}
try {
List<WorkflowNodeLog> logs = new ArrayList<>(messages.size());
String streamKey = buildStreamKey(processInstanceId, nodeId);
// 批量添加到 Stream
for (String message : messages) {
Long sequenceId = generateSequenceId(processInstanceId, nodeId);
Map<String, String> logEntry = new HashMap<>();
logEntry.put("timestamp", String.valueOf(System.currentTimeMillis()));
logEntry.put("level", level.name());
logEntry.put("source", source.name());
logEntry.put("message", message);
WorkflowNodeLog log = new WorkflowNodeLog();
log.setProcessInstanceId(processInstanceId);
log.setNodeId(nodeId);
log.setSequenceId(sequenceId);
log.setTimestamp(System.currentTimeMillis());
log.setSource(source);
log.setLevel(level);
log.setMessage(message);
logs.add(log);
redisUtil.streamAdd(streamKey, logEntry);
}
logRepository.saveAll(logs);
// 限制流长度并设置过期转换为秒
redisUtil.streamTrim(streamKey, LOG_STREAM_MAX_LEN);
redisUtil.expire(streamKey, LOG_STREAM_TTL_DAYS * 24 * 60 * 60);
} catch (Exception e) {
log.error("Failed to batch save workflow node logs: processInstanceId={}, nodeId={}, count={}",
log.error("Failed to batch save workflow logs to Redis (non-blocking): processInstanceId={}, nodeId={}, count={}",
processInstanceId, nodeId, messages.size(), e);
}
}
@ -114,23 +123,82 @@ public class WorkflowNodeLogServiceImpl implements IWorkflowNodeLogService {
@Override
public List<WorkflowNodeLog> getNodeLogs(String processInstanceId, String nodeId) {
return logRepository.findByProcessInstanceIdAndNodeIdOrderBySequenceIdAsc(processInstanceId, nodeId);
try {
String streamKey = buildStreamKey(processInstanceId, nodeId);
// 检查流是否存在
if (!redisUtil.hasKey(streamKey)) {
log.warn("日志流不存在,可能已过期: processInstanceId={}, nodeId={}", processInstanceId, nodeId);
return Collections.emptyList();
}
// Stream 读取所有日志 (从头到尾)
List<MapRecord<String, Object, Object>> records = redisUtil.streamRangeAll(streamKey);
if (records == null || records.isEmpty()) {
return Collections.emptyList();
}
// 转换为实体
return records.stream()
.map(record -> convertToWorkflowNodeLog(processInstanceId, nodeId, record))
.collect(Collectors.toList());
} catch (Exception e) {
log.error("Failed to get workflow logs from Redis: processInstanceId={}, nodeId={}",
processInstanceId, nodeId, e);
return Collections.emptyList();
}
}
@Override
public Page<WorkflowNodeLog> getNodeLogs(String processInstanceId, String nodeId, Pageable pageable) {
return logRepository.findByProcessInstanceIdAndNodeIdOrderBySequenceIdAsc(processInstanceId, nodeId, pageable);
List<WorkflowNodeLog> allLogs = getNodeLogs(processInstanceId, nodeId);
int start = (int) pageable.getOffset();
int end = Math.min(start + pageable.getPageSize(), allLogs.size());
if (start >= allLogs.size()) {
return new PageImpl<>(Collections.emptyList(), pageable, allLogs.size());
}
List<WorkflowNodeLog> pageContent = allLogs.subList(start, end);
return new PageImpl<>(pageContent, pageable, allLogs.size());
}
@Override
public List<WorkflowNodeLog> getProcessInstanceLogs(String processInstanceId) {
return logRepository.findByProcessInstanceIdOrderBySequenceIdAsc(processInstanceId);
}
@Override
@Transactional
public void deleteNodeLogs(String processInstanceId, String nodeId) {
logRepository.deleteByProcessInstanceIdAndNodeId(processInstanceId, nodeId);
try {
String streamKey = buildStreamKey(processInstanceId, nodeId);
redisUtil.del(streamKey);
log.info("删除节点日志流: processInstanceId={}, nodeId={}", processInstanceId, nodeId);
} catch (Exception e) {
log.error("Failed to delete workflow logs from Redis: processInstanceId={}, nodeId={}",
processInstanceId, nodeId, e);
}
}
/**
* Redis Stream Record 转换为 WorkflowNodeLog 实体
*/
private WorkflowNodeLog convertToWorkflowNodeLog(String processInstanceId, String nodeId,
MapRecord<String, Object, Object> record) {
WorkflowNodeLog log = new WorkflowNodeLog();
log.setProcessInstanceId(processInstanceId);
log.setNodeId(nodeId);
Map<Object, Object> value = record.getValue();
log.setTimestamp(Long.parseLong(value.get("timestamp").toString()));
log.setLevel(LogLevel.valueOf(value.get("level").toString()));
log.setSource(LogSource.valueOf(value.get("source").toString()));
log.setMessage(value.get("message").toString());
// Redis Stream ID 的第一部分作为序列号 (毫秒时间戳)
String streamId = record.getId().getValue();
log.setSequenceId(Long.parseLong(streamId.split("-")[0]));
return log;
}
}

View File

@ -0,0 +1,191 @@
package com.qqchen.deploy.backend.workflow.utils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.qqchen.deploy.backend.workflow.dto.inputmapping.BaseNodeInputMapping;
import com.qqchen.deploy.backend.workflow.model.NodeContext;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.delegate.DelegateExecution;
import java.util.Map;
/**
* 工作流工具类
* <p>
* 提供工作流相关的通用工具方法
*
* @author qqchen
* @since 2025-11-07
*/
@Slf4j
public class WorkflowUtils {
private WorkflowUtils() {
// 工具类禁止实例化
}
/**
* InputMapping 获取 continueOnFailure 配置
* <p>
* 用于判断节点失败后是否继续执行流程
* <ul>
* <li>true: 节点失败时标记为 FAILURE但流程继续执行</li>
* <li>false: 节点失败时抛出 BpmnError终止流程默认</li>
* </ul>
*
* @param inputMapping 输入映射对象必须继承自 BaseNodeInputMapping
* @return true: 失败后继续执行, false: 失败后终止流程默认
*/
public static boolean getContinueOnFailure(Object inputMapping) {
if (inputMapping == null) {
return false; // 默认阻断模式
}
try {
// 所有 InputMapping 都继承自 BaseNodeInputMapping
if (inputMapping instanceof BaseNodeInputMapping) {
Boolean continueOnFailure = ((BaseNodeInputMapping) inputMapping).getContinueOnFailure();
return Boolean.TRUE.equals(continueOnFailure);
}
} catch (Exception e) {
log.warn("Failed to get continueOnFailure config, using default (false)", e);
}
return false; // 默认阻断模式
}
/**
* InputMapping 获取 continueOnFailure 配置带默认值
*
* @param inputMapping 输入映射对象
* @param defaultValue 默认值
* @return continueOnFailure 配置值
*/
public static boolean getContinueOnFailure(Object inputMapping, boolean defaultValue) {
if (inputMapping == null) {
return defaultValue;
}
try {
if (inputMapping instanceof BaseNodeInputMapping) {
Boolean continueOnFailure = ((BaseNodeInputMapping) inputMapping).getContinueOnFailure();
if (continueOnFailure != null) {
return continueOnFailure;
}
}
} catch (Exception e) {
log.warn("Failed to get continueOnFailure config, using default ({})", defaultValue, e);
}
return defaultValue;
}
// ===================== 节点配置相关 =====================
/**
* 从配置中提取节点名称
* <p>
* 优先级nodeName nodeCode "节点"默认
*
* @param configsMap 节点配置
* @return 节点名称默认返回"节点"
*/
public static String extractNodeName(Map<String, Object> configsMap) {
if (configsMap == null) {
return "节点";
}
// 1. 优先使用 nodeName
Object nodeName = configsMap.get("nodeName");
if (nodeName != null && !nodeName.toString().trim().isEmpty()) {
return nodeName.toString().trim();
}
// 2. 降级使用 nodeCode
Object nodeCode = configsMap.get("nodeCode");
if (nodeCode != null && !nodeCode.toString().trim().isEmpty()) {
return nodeCode.toString().trim();
}
// 3. 默认值
return "节点";
}
// ===================== NodeContext 操作相关 =====================
/**
* Flowable 执行上下文读取 NodeContext
* <p>
* 统一的 NodeContext 读取方法 Delegate Listener 共享
*
* @param execution Flowable 执行上下文
* @param nodeId 节点ID
* @param inputClass 输入映射类型
* @param outputClass 输出映射类型
* @param objectMapper JSON 序列化工具
* @param <I> 输入映射泛型
* @param <O> 输出映射泛型
* @return NodeContext 对象解析失败返回 null
*/
public static <I, O> NodeContext<I, O> readNodeContext(
DelegateExecution execution,
String nodeId,
Class<I> inputClass,
Class<O> outputClass,
ObjectMapper objectMapper) {
if (execution == null || nodeId == null || nodeId.trim().isEmpty()) {
log.warn("Invalid parameters for readNodeContext: execution={}, nodeId={}", execution, nodeId);
return null;
}
try {
Object nodeDataObj = execution.getVariable(nodeId);
if (!(nodeDataObj instanceof Map)) {
log.debug("NodeContext not found or invalid type for node: {}", nodeId);
return null;
}
@SuppressWarnings("unchecked")
Map<String, Object> nodeDataMap = (Map<String, Object>) nodeDataObj;
return NodeContext.fromMap(nodeDataMap, inputClass, outputClass, objectMapper);
} catch (Exception e) {
log.error("Failed to read NodeContext for node: {}", nodeId, e);
return null;
}
}
/**
* 保存 NodeContext Flowable 执行上下文
* <p>
* 统一的 NodeContext 保存方法 Delegate Listener 共享
*
* @param execution Flowable 执行上下文
* @param nodeId 节点ID
* @param nodeContext NodeContext 对象
* @param objectMapper JSON 序列化工具
* @param <I> 输入映射泛型
* @param <O> 输出映射泛型
* @throws RuntimeException 保存失败时抛出
*/
public static <I, O> void saveNodeContext(
DelegateExecution execution,
String nodeId,
NodeContext<I, O> nodeContext,
ObjectMapper objectMapper) {
if (execution == null || nodeId == null || nodeId.trim().isEmpty() || nodeContext == null) {
throw new IllegalArgumentException("Invalid parameters for saveNodeContext");
}
try {
execution.setVariable(nodeId, nodeContext.toMap(objectMapper));
log.debug("Saved NodeContext for node: {}", nodeId);
} catch (Exception e) {
log.error("Failed to save NodeContext for node: {}", nodeId, e);
throw new RuntimeException("Failed to save NodeContext: " + nodeId, e);
}
}
}

View File

@ -0,0 +1,206 @@
import React, { useEffect, useState, useRef } from 'react';
import {
Dialog,
DialogContent,
DialogHeader,
DialogTitle,
DialogBody,
DialogFooter,
} from '@/components/ui/dialog';
import { Button } from '@/components/ui/button';
import { ScrollArea } from '@/components/ui/scroll-area';
import { Loader2, AlertCircle, Clock, FileText, RefreshCw } from 'lucide-react';
import { cn } from '@/lib/utils';
import { getDeployNodeLogs } from '../service';
import type { DeployNodeLogDTO, LogLevel } from '../types';
import dayjs from 'dayjs';
interface DeployNodeLogDialogProps {
open: boolean;
onOpenChange: (open: boolean) => void;
processInstanceId: string;
nodeId: string;
nodeName?: string;
}
const getLevelClass = (level: LogLevel): string => {
const levelMap: Record<LogLevel, string> = {
INFO: 'text-blue-600',
WARN: 'text-yellow-600',
ERROR: 'text-red-600',
};
return levelMap[level] || 'text-gray-600';
};
const DeployNodeLogDialog: React.FC<DeployNodeLogDialogProps> = ({
open,
onOpenChange,
processInstanceId,
nodeId,
nodeName,
}) => {
const [loading, setLoading] = useState(false);
const [logData, setLogData] = useState<DeployNodeLogDTO | null>(null);
const scrollAreaRef = useRef<HTMLDivElement>(null);
const intervalRef = useRef<NodeJS.Timeout | null>(null);
const fetchLogs = async () => {
if (!processInstanceId || !nodeId) return;
setLoading(true);
try {
const response = await getDeployNodeLogs(processInstanceId, nodeId);
setLogData(response);
// 如果日志未过期且有新日志,滚动到底部
if (response && !response.expired && response.logs.length > 0) {
setTimeout(() => {
if (scrollAreaRef.current) {
const viewport = scrollAreaRef.current.querySelector('[data-radix-scroll-area-viewport]');
if (viewport) {
viewport.scrollTop = viewport.scrollHeight;
}
}
}, 100);
}
// 如果日志已过期,停止轮询
if (response?.expired) {
if (intervalRef.current) {
clearInterval(intervalRef.current);
intervalRef.current = null;
}
}
} catch (error) {
console.error('获取节点日志失败:', error);
} finally {
setLoading(false);
}
};
useEffect(() => {
if (open && processInstanceId && nodeId) {
setLogData(null);
fetchLogs();
// 不再自动轮询,只在打开时加载一次
} else {
// 关闭时清除定时器
if (intervalRef.current) {
clearInterval(intervalRef.current);
intervalRef.current = null;
}
}
return () => {
// 组件卸载时清除定时器
if (intervalRef.current) {
clearInterval(intervalRef.current);
intervalRef.current = null;
}
};
}, [open, processInstanceId, nodeId]);
return (
<Dialog open={open} onOpenChange={onOpenChange}>
<DialogContent className="max-w-6xl h-[80vh] flex flex-col">
<DialogHeader>
<DialogTitle className="flex items-center gap-2">
<FileText className="h-5 w-5 text-primary" />
- {nodeName || nodeId}
{logData?.expired && (
<span className="ml-2 text-sm font-normal text-red-500">()</span>
)}
</DialogTitle>
</DialogHeader>
<DialogBody className="flex-1 flex flex-col min-h-0">
{/* 工具栏 */}
<div className="flex items-center justify-between mb-3">
<div className="flex items-center gap-2 text-sm text-muted-foreground">
{logData && !logData.expired && (
<span> {logData.logs.length} </span>
)}
</div>
<Button
variant="outline"
size="sm"
onClick={fetchLogs}
disabled={loading}
>
<RefreshCw className={cn("h-4 w-4 mr-2", loading && "animate-spin")} />
</Button>
</div>
{/* 日志内容区域 */}
{loading && !logData ? (
<div className="flex items-center justify-center h-full">
<div className="text-center">
<Loader2 className="h-8 w-8 animate-spin text-primary mx-auto mb-2" />
<span className="text-sm text-muted-foreground">...</span>
</div>
</div>
) : logData?.expired ? (
<div className="flex flex-col items-center justify-center h-full text-muted-foreground">
<AlertCircle className="h-12 w-12 mb-4 text-muted-foreground/50" />
<p className="text-lg font-medium"></p>
<p className="text-sm mt-2"> 7 </p>
</div>
) : (
<ScrollArea className="flex-1 border rounded-md bg-gray-50" ref={scrollAreaRef}>
<div className="p-2 font-mono text-xs">
{logData?.logs && logData.logs.length > 0 ? (
logData.logs.map((log, index) => (
<div
key={log.sequenceId}
className="flex items-start hover:bg-gray-200 px-2 py-0.5 whitespace-nowrap"
>
{/* 行号 - 根据总行数动态调整宽度 */}
<span
className="text-muted-foreground flex-shrink-0 text-right pr-3 select-none"
style={{ width: `${Math.max(3, String(logData.logs.length).length)}ch` }}
>
{index + 1}
</span>
{/* 时间 - 18个字符宽度 */}
<span className="text-muted-foreground flex-shrink-0 pr-2" style={{ width: '18ch' }}>
{dayjs(log.timestamp).format('MM-DD HH:mm:ss.SSS')}
</span>
{/* 级别 - 5个字符宽度 */}
<span
className={cn('flex-shrink-0 font-semibold pr-2', getLevelClass(log.level))}
style={{ width: '5ch' }}
>
{log.level}
</span>
{/* 日志内容 - 不换行,支持水平滚动 */}
<span className="flex-1 text-gray-800 whitespace-nowrap overflow-x-auto">
{log.message}
</span>
</div>
))
) : (
<div className="flex flex-col items-center justify-center h-64 text-muted-foreground">
<Clock className="h-12 w-12 mb-4 text-muted-foreground/30" />
<p className="text-base font-medium"></p>
<p className="text-sm mt-2"></p>
</div>
)}
</div>
</ScrollArea>
)}
</DialogBody>
<DialogFooter>
<Button variant="outline" onClick={() => onOpenChange(false)}>
</Button>
</DialogFooter>
</DialogContent>
</Dialog>
);
};
export default DeployNodeLogDialog;