From 6050c5e189c0eef1040c7fa5a47a7efe0ee6abaf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=88=9A=E8=BE=B0=E5=85=88=E7=94=9F?= Date: Tue, 3 Dec 2024 23:10:01 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=B7=A5=E4=BD=9C=E6=B5=81?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=E5=8F=AF=E6=AD=A3=E5=B8=B8=E5=90=AF=E5=8A=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../engine/impl/DefaultWorkflowEngine.java | 57 +++------ .../engine/node/ScriptNodeConfig.java | 47 ++------ .../node/executor/ShellNodeExecutor.java | 112 ++++++----------- .../backend/workflow/entity/NodeInstance.java | 65 +++++++--- .../workflow/entity/WorkflowInstance.java | 10 +- .../workflow/enums/NodeStatusEnum.java | 4 +- .../backend/workflow/enums/NodeTypeEnum.java | 29 +++++ .../workflow/enums/WorkflowStatusEnum.java | 6 +- .../service/INodeInstanceService.java | 10 +- .../service/IWorkflowInstanceService.java | 8 +- .../service/impl/NodeInstanceServiceImpl.java | 69 ++++++----- .../impl/WorkflowInstanceServiceImpl.java | 78 +++++------- .../impl/WorkflowPermissionServiceImpl.java | 19 +-- .../impl/WorkflowVariableServiceImpl.java | 114 +++++++++--------- .../src/main/resources/messages.properties | 28 ++--- 15 files changed, 312 insertions(+), 344 deletions(-) create mode 100644 backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/NodeTypeEnum.java diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/impl/DefaultWorkflowEngine.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/impl/DefaultWorkflowEngine.java index e6d7388c..57819c16 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/impl/DefaultWorkflowEngine.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/impl/DefaultWorkflowEngine.java @@ -1,11 +1,13 @@ package com.qqchen.deploy.backend.workflow.engine.impl; +import com.qqchen.deploy.backend.framework.enums.ResponseCode; import com.qqchen.deploy.backend.framework.exception.BusinessException; import com.qqchen.deploy.backend.workflow.engine.WorkflowContext; import com.qqchen.deploy.backend.workflow.engine.WorkflowEngine; import com.qqchen.deploy.backend.workflow.entity.NodeInstance; import com.qqchen.deploy.backend.workflow.entity.WorkflowInstance; import com.qqchen.deploy.backend.workflow.enums.NodeStatusEnum; +import com.qqchen.deploy.backend.workflow.enums.WorkflowStatusEnum; import com.qqchen.deploy.backend.workflow.repository.INodeInstanceRepository; import com.qqchen.deploy.backend.workflow.repository.IWorkflowInstanceRepository; import com.qqchen.deploy.backend.workflow.service.IWorkflowLogService; @@ -14,7 +16,6 @@ import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import com.qqchen.deploy.backend.workflow.enums.WorkflowStatusEnum; import java.util.List; import java.util.Map; @@ -42,34 +43,27 @@ public class DefaultWorkflowEngine implements WorkflowEngine { @Override @Transactional public void startInstance(Long instanceId, Map variables) { - // 获取工作流实例 WorkflowInstance instance = getWorkflowInstance(instanceId); if (instance.getStatus() != WorkflowStatusEnum.CREATED) { - throw new BusinessException(ResponseCode.WORKFLOW_STATUS_INVALID); + throw new BusinessException(ResponseCode.WORKFLOW_INSTANCE_NOT_RUNNING); } - // 初始化工作流上下文 WorkflowContext context = initContext(instance, variables); try { - // 更新工作流状态 instance.setStatus(WorkflowStatusEnum.RUNNING); workflowInstanceRepository.save(instance); - // 保存工作流变量 workflowVariableService.saveVariables(instanceId, variables); - - // 记录工作流日志 workflowLogService.logWorkflowStart(instance); - // 执行第一个节点 executeNextNode(context); } catch (Exception e) { log.error("Failed to start workflow instance: {}", instanceId, e); instance.setStatus(WorkflowStatusEnum.FAILED); workflowInstanceRepository.save(instance); workflowLogService.logWorkflowError(instance, e.getMessage()); - throw new BusinessException(ResponseCode.WORKFLOW_START_FAILED); + throw new BusinessException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED); } } @@ -77,16 +71,15 @@ public class DefaultWorkflowEngine implements WorkflowEngine { @Transactional public void cancelInstance(Long instanceId) { WorkflowInstance instance = getWorkflowInstance(instanceId); - if (instance.getStatus() != WorkflowStatusEnum.RUNNING && instance.getStatus() != WorkflowStatusEnum.PAUSED) { - throw new BusinessException(ResponseCode.WORKFLOW_STATUS_INVALID); + if (!instance.getStatus().canCancel()) { + throw new BusinessException(ResponseCode.WORKFLOW_INSTANCE_ALREADY_CANCELED); } instance.setStatus(WorkflowStatusEnum.CANCELLED); workflowInstanceRepository.save(instance); - // 取消所有运行中的节点 List runningNodes = nodeInstanceRepository.findByWorkflowInstanceIdAndStatus( - instanceId, NodeStatusEnum.RUNNING); + instanceId, NodeStatusEnum.RUNNING.name()); runningNodes.forEach(node -> { node.setStatus(NodeStatusEnum.CANCELLED); nodeInstanceRepository.save(node); @@ -99,16 +92,15 @@ public class DefaultWorkflowEngine implements WorkflowEngine { @Transactional public void pauseInstance(Long instanceId) { WorkflowInstance instance = getWorkflowInstance(instanceId); - if (instance.getStatus() != WorkflowStatusEnum.RUNNING) { - throw new BusinessException(ResponseCode.WORKFLOW_STATUS_INVALID); + if (!instance.getStatus().canPause()) { + throw new BusinessException(ResponseCode.WORKFLOW_INSTANCE_NOT_RUNNING); } instance.setStatus(WorkflowStatusEnum.PAUSED); workflowInstanceRepository.save(instance); - // 暂停所有运行中的节点 List runningNodes = nodeInstanceRepository.findByWorkflowInstanceIdAndStatus( - instanceId, NodeStatusEnum.RUNNING); + instanceId, NodeStatusEnum.RUNNING.name()); runningNodes.forEach(node -> { node.setStatus(NodeStatusEnum.PAUSED); nodeInstanceRepository.save(node); @@ -121,16 +113,15 @@ public class DefaultWorkflowEngine implements WorkflowEngine { @Transactional public void resumeInstance(Long instanceId) { WorkflowInstance instance = getWorkflowInstance(instanceId); - if (instance.getStatus() != WorkflowStatusEnum.PAUSED) { - throw new BusinessException(ResponseCode.WORKFLOW_STATUS_INVALID); + if (!instance.getStatus().canResume()) { + throw new BusinessException(ResponseCode.WORKFLOW_INSTANCE_NOT_RUNNING); } instance.setStatus(WorkflowStatusEnum.RUNNING); workflowInstanceRepository.save(instance); - // 恢复所有暂停的节点 List pausedNodes = nodeInstanceRepository.findByWorkflowInstanceIdAndStatus( - instanceId, NodeStatusEnum.PAUSED); + instanceId, NodeStatusEnum.PAUSED.name()); pausedNodes.forEach(node -> { node.setStatus(NodeStatusEnum.RUNNING); nodeInstanceRepository.save(node); @@ -144,23 +135,20 @@ public class DefaultWorkflowEngine implements WorkflowEngine { public void retryNode(Long nodeInstanceId) { NodeInstance node = getNodeInstance(nodeInstanceId); if (node.getStatus() != NodeStatusEnum.FAILED) { - throw new BusinessException(ResponseCode.NODE_STATUS_INVALID); + throw new BusinessException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED); } WorkflowInstance instance = getWorkflowInstance(node.getWorkflowInstanceId()); if (instance.getStatus() != WorkflowStatusEnum.FAILED) { - throw new BusinessException(ResponseCode.WORKFLOW_STATUS_INVALID); + throw new BusinessException(ResponseCode.WORKFLOW_INSTANCE_NOT_RUNNING); } - // 重置节点状态 node.setStatus(NodeStatusEnum.RUNNING); nodeInstanceRepository.save(node); - // 重置工作流状态 instance.setStatus(WorkflowStatusEnum.RUNNING); workflowInstanceRepository.save(instance); - // 记录重试日志 workflowLogService.logNodeRetry(node); } @@ -169,35 +157,28 @@ public class DefaultWorkflowEngine implements WorkflowEngine { public void skipNode(Long nodeInstanceId) { NodeInstance node = getNodeInstance(nodeInstanceId); if (node.getStatus() != NodeStatusEnum.FAILED) { - throw new BusinessException(ResponseCode.NODE_STATUS_INVALID); + throw new BusinessException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED); } - // 更新节点状态为已跳过 node.setStatus(NodeStatusEnum.SKIPPED); nodeInstanceRepository.save(node); - // 获取工作流实例 WorkflowInstance instance = getWorkflowInstance(node.getWorkflowInstanceId()); - - // 初始化上下文 WorkflowContext context = initContext(instance, workflowVariableService.getVariables(instance.getId())); context.setCurrentNode(node); - // 执行下一个节点 executeNextNode(context); - - // 记录跳过日志 workflowLogService.logNodeSkip(node); } private WorkflowInstance getWorkflowInstance(Long instanceId) { return workflowInstanceRepository.findById(instanceId) - .orElseThrow(() -> new BusinessException(ResponseCode.WORKFLOW_NOT_FOUND)); + .orElseThrow(() -> new BusinessException(ResponseCode.WORKFLOW_INSTANCE_NOT_FOUND)); } private NodeInstance getNodeInstance(Long nodeInstanceId) { return nodeInstanceRepository.findById(nodeInstanceId) - .orElseThrow(() -> new BusinessException(ResponseCode.NODE_NOT_FOUND)); + .orElseThrow(() -> new BusinessException(ResponseCode.WORKFLOW_NODE_NOT_FOUND)); } private WorkflowContext initContext(WorkflowInstance instance, Map variables) { @@ -212,7 +193,6 @@ public class DefaultWorkflowEngine implements WorkflowEngine { NodeInstance currentNode = context.getCurrentNode(); List allNodes = context.getAllNodes(); - // 如果当前节点为空,说明是第一个节点 Optional nextNode; if (currentNode == null) { nextNode = allNodes.stream() @@ -231,7 +211,6 @@ public class DefaultWorkflowEngine implements WorkflowEngine { workflowLogService.logNodeStart(node); // TODO: 实际节点执行逻辑 } else { - // 没有下一个节点,工作流完成 WorkflowInstance instance = context.getWorkflowInstance(); instance.setStatus(WorkflowStatusEnum.COMPLETED); workflowInstanceRepository.save(instance); diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/ScriptNodeConfig.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/ScriptNodeConfig.java index 31e33bf7..a02bda49 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/ScriptNodeConfig.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/ScriptNodeConfig.java @@ -2,47 +2,24 @@ package com.qqchen.deploy.backend.workflow.engine.node; import lombok.Data; import lombok.EqualsAndHashCode; + import java.util.Map; -/** - * 脚本节点配置 - */ @Data @EqualsAndHashCode(callSuper = true) public class ScriptNodeConfig extends NodeConfig { - - /** - * 脚本类型:SHELL, PYTHON, GROOVY - */ - private String scriptType; - - /** - * 脚本内容 - */ - private String content; - - /** - * 脚本参数 - */ - private Map parameters; - - /** - * 执行主机ID - */ - private Long hostId; - - /** - * 工作目录 - */ + private String script; + private String language; private String workingDirectory; - - /** - * 环境变量 - */ + private Integer timeout; private Map environment; - - /** - * 成功退出码 - */ private Integer successExitCode; + + public String getScript() { + return script; + } + + public Map getEnvironment() { + return environment; + } } \ No newline at end of file diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/executor/ShellNodeExecutor.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/executor/ShellNodeExecutor.java index 91d6d4ba..0e5b5cad 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/executor/ShellNodeExecutor.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/executor/ShellNodeExecutor.java @@ -1,6 +1,5 @@ package com.qqchen.deploy.backend.workflow.engine.node.executor; -import com.qqchen.deploy.backend.enums.LogLevelEnum; import com.qqchen.deploy.backend.workflow.engine.WorkflowContext; import com.qqchen.deploy.backend.workflow.engine.node.AbstractNodeExecutor; import com.qqchen.deploy.backend.workflow.engine.node.NodeConfig; @@ -8,29 +7,21 @@ import com.qqchen.deploy.backend.workflow.engine.node.NodeType; import com.qqchen.deploy.backend.workflow.engine.node.ScriptNodeConfig; import com.qqchen.deploy.backend.workflow.entity.NodeInstance; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.io.IOUtils; import org.springframework.stereotype.Component; import java.io.BufferedReader; import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.TimeUnit; +import java.util.Map; -/** - * Shell节点执行器 - */ @Slf4j @Component public class ShellNodeExecutor extends AbstractNodeExecutor { - private static final int DEFAULT_TIMEOUT = 3600; - private Process currentProcess; - @Override public NodeType getNodeType() { - return NodeType.SHELL; + return NodeType.SCRIPT; } @Override @@ -40,79 +31,56 @@ public class ShellNodeExecutor extends AbstractNodeExecutor { @Override protected boolean doExecute(NodeInstance nodeInstance, WorkflowContext context, NodeConfig config) throws Exception { - ScriptNodeConfig shellConfig = (ScriptNodeConfig) config; + ScriptNodeConfig scriptConfig = (ScriptNodeConfig) config; + ProcessBuilder processBuilder = new ProcessBuilder(); + processBuilder.command("sh", "-c", scriptConfig.getScript()); - // 构建命令 - List command = buildCommand(shellConfig); - - // 创建进程构建器 - ProcessBuilder processBuilder = new ProcessBuilder(command); - processBuilder.redirectErrorStream(true); - - // 设置工作目录 - if (shellConfig.getWorkingDirectory() != null) { - processBuilder.directory(new java.io.File(shellConfig.getWorkingDirectory())); + if (scriptConfig.getWorkingDirectory() != null) { + processBuilder.directory(new java.io.File(scriptConfig.getWorkingDirectory())); } - - // 设置环境变量 - if (shellConfig.getEnvironment() != null) { - processBuilder.environment().putAll(shellConfig.getEnvironment()); + + if (scriptConfig.getEnvironment() != null) { + Map env = processBuilder.environment(); + env.putAll(scriptConfig.getEnvironment()); } + + Process process = processBuilder.start(); - // 启动进程 - currentProcess = processBuilder.start(); - - // 读取输出 - try (BufferedReader reader = new BufferedReader(new InputStreamReader(currentProcess.getInputStream(), StandardCharsets.UTF_8))) { - String line; - while ((line = reader.readLine()) != null) { - workflowLogService.logSystem(context.getWorkflowInstance(), LogLevelEnum.INFO, line, null); - } + // 读取标准输出 + BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream())); + List output = new ArrayList<>(); + String line; + while ((line = reader.readLine()) != null) { + output.add(line); } - - // 等待进程完成 - boolean completed = currentProcess.waitFor(shellConfig.getTimeout() != null ? shellConfig.getTimeout() : DEFAULT_TIMEOUT, TimeUnit.SECONDS); - if (!completed) { - currentProcess.destroyForcibly(); - workflowLogService.logSystem(context.getWorkflowInstance(), LogLevelEnum.ERROR, "Shell execution timeout", null); - return false; + + // 读取错误输出 + BufferedReader errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream())); + List error = new ArrayList<>(); + while ((line = errorReader.readLine()) != null) { + error.add(line); } + + // 等待进程结束 + int exitCode = process.waitFor(); // 检查退出码 - int exitCode = currentProcess.exitValue(); - if (exitCode != 0) { - workflowLogService.logSystem(context.getWorkflowInstance(), LogLevelEnum.ERROR, - "Shell execution failed with exit code: " + exitCode, null); - return false; + if (scriptConfig.getSuccessExitCode() != null && exitCode != scriptConfig.getSuccessExitCode()) { + throw new RuntimeException("Shell script execution failed with exit code: " + exitCode + + "\nError output: " + String.join("\n", error)); } - - return true; + + // 设置输出结果 + nodeInstance.setOutput(String.join("\n", output)); + if (!error.isEmpty()) { + nodeInstance.setError(String.join("\n", error)); + } + + return exitCode == 0; } @Override protected void doCancel(NodeInstance nodeInstance, WorkflowContext context, NodeConfig config) throws Exception { - if (currentProcess != null && currentProcess.isAlive()) { - currentProcess.destroyForcibly(); - workflowLogService.logSystem(context.getWorkflowInstance(), LogLevelEnum.WARN, - "Shell execution cancelled", null); - } - } - - private List buildCommand(ScriptNodeConfig config) { - List command = new ArrayList<>(); - - // 添加Shell解释器 - if (System.getProperty("os.name").toLowerCase().contains("windows")) { - command.add("cmd"); - command.add("/c"); - } else { - command.add("sh"); - command.add("-c"); - } - - // 添加脚本内容 - command.add(config.getScript()); - - return command; + // TODO: 实现取消逻辑,例如终止正在运行的进程 } } \ No newline at end of file diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/entity/NodeInstance.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/entity/NodeInstance.java index dfcbe1b0..cd662273 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/entity/NodeInstance.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/entity/NodeInstance.java @@ -1,80 +1,107 @@ package com.qqchen.deploy.backend.workflow.entity; import com.qqchen.deploy.backend.framework.annotation.LogicDelete; +import com.qqchen.deploy.backend.framework.domain.Entity; +import com.qqchen.deploy.backend.workflow.enums.NodeStatusEnum; +import com.qqchen.deploy.backend.workflow.enums.NodeTypeEnum; import jakarta.persistence.*; import lombok.Data; import lombok.EqualsAndHashCode; + import java.time.LocalDateTime; - -/** - * 节点实例实体 - */ @Data @EqualsAndHashCode(callSuper = true) @jakarta.persistence.Entity -@Table(name = "sys_node_instance") +@Table(name = "wf_node_instance") @LogicDelete -public class NodeInstance extends com.qqchen.deploy.backend.framework.domain.Entity { +public class NodeInstance extends Entity { + + /** + * 工作流实例ID,用于优化查询 + */ + @Column(name = "workflow_instance_id", nullable = false) + private Long workflowInstanceId; /** * 工作流实例 */ @ManyToOne(fetch = FetchType.LAZY) - @JoinColumn(name = "workflow_instance_id", nullable = false) + @JoinColumn(name = "workflow_instance_id", insertable = false, updatable = false) private WorkflowInstance workflowInstance; /** * 节点ID */ - @Column(name = "node_id", nullable = false, length = 50) + @Column(nullable = false) private String nodeId; /** * 节点类型 */ - @Column(name = "node_type", nullable = false, length = 50) - private String nodeType; + @Column(nullable = false) + private NodeTypeEnum nodeType; /** * 节点名称 */ - @Column(name = "name", nullable = false, length = 100) + @Column(nullable = false) private String name; /** - * 状态:PENDING/RUNNING/COMPLETED/FAILED/CANCELED + * 节点状态 */ - @Column(name = "status", nullable = false, length = 20) - private String status; + @Column(nullable = false) + private NodeStatusEnum status; /** * 开始时间 */ - @Column(name = "start_time") private LocalDateTime startTime; /** * 结束时间 */ - @Column(name = "end_time") private LocalDateTime endTime; + /** + * 节点配置(JSON) + */ + @Column(columnDefinition = "TEXT") + private String config; + /** * 输入参数(JSON) */ - @Column(name = "input", columnDefinition = "TEXT") + @Column(columnDefinition = "TEXT") private String input; /** * 输出结果(JSON) */ - @Column(name = "output", columnDefinition = "TEXT") + @Column(columnDefinition = "TEXT") private String output; /** * 错误信息 */ - @Column(name = "error", columnDefinition = "TEXT") + @Column(columnDefinition = "TEXT") private String error; + + /** + * 前置节点ID + */ + private String preNodeId; + + public String getConfig() { + return config; + } + + public Long getWorkflowInstanceId() { + return workflowInstanceId; + } + + public String getPreNodeId() { + return preNodeId; + } } \ No newline at end of file diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/entity/WorkflowInstance.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/entity/WorkflowInstance.java index 690a1078..35fb2cfb 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/entity/WorkflowInstance.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/entity/WorkflowInstance.java @@ -2,7 +2,10 @@ package com.qqchen.deploy.backend.workflow.entity; import com.qqchen.deploy.backend.framework.annotation.LogicDelete; import com.qqchen.deploy.backend.framework.domain.Entity; +import com.qqchen.deploy.backend.workflow.enums.WorkflowStatusEnum; import jakarta.persistence.Column; +import jakarta.persistence.EnumType; +import jakarta.persistence.Enumerated; import jakarta.persistence.FetchType; import jakarta.persistence.JoinColumn; import jakarta.persistence.ManyToOne; @@ -36,10 +39,11 @@ public class WorkflowInstance extends Entity { private Long projectEnvId; /** - * 状态:RUNNING/COMPLETED/FAILED/CANCELED + * 状态 */ - @Column(name = "status", nullable = false, length = 20) - private String status; + @Enumerated(EnumType.STRING) + @Column(name = "status", nullable = false) + private WorkflowStatusEnum status; /** * 开始时间 diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/NodeStatusEnum.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/NodeStatusEnum.java index 72331868..c97ac187 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/NodeStatusEnum.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/NodeStatusEnum.java @@ -12,9 +12,11 @@ public enum NodeStatusEnum { PENDING("PENDING", "等待执行"), RUNNING("RUNNING", "执行中"), + PAUSED("PAUSED", "已暂停"), COMPLETED("COMPLETED", "已完成"), FAILED("FAILED", "执行失败"), - CANCELED("CANCELED", "已取消"); + CANCELLED("CANCELLED", "已取消"), + SKIPPED("SKIPPED", "已跳过"); private final String code; private final String description; diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/NodeTypeEnum.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/NodeTypeEnum.java new file mode 100644 index 00000000..15e2c54b --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/NodeTypeEnum.java @@ -0,0 +1,29 @@ +package com.qqchen.deploy.backend.workflow.enums; + +import lombok.Getter; + +/** + * 节点类型枚举 + */ +@Getter +public enum NodeTypeEnum { + + START("START", "开始节点"), + END("END", "结束节点"), + DEPLOY("DEPLOY", "部署节点"), + APPROVAL("APPROVAL", "审批节点"), + SCRIPT("SCRIPT", "脚本节点"), + SHELL("SHELL", "Shell脚本节点"), + CONFIG_SYNC("CONFIG_SYNC", "配置同步节点"), + CONDITION("CONDITION", "条件节点"), + PARALLEL("PARALLEL", "并行节点"), + SERIAL("SERIAL", "串行节点"); + + private final String code; + private final String desc; + + NodeTypeEnum(String code, String desc) { + this.code = code; + this.desc = desc; + } +} \ No newline at end of file diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/WorkflowStatusEnum.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/WorkflowStatusEnum.java index cb67f655..e4cac237 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/WorkflowStatusEnum.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/WorkflowStatusEnum.java @@ -16,13 +16,13 @@ public enum WorkflowStatusEnum { DISABLED("DISABLED", "已禁用"), // 工作流实例状态 + CREATED("CREATED", "已创建"), PENDING("PENDING", "等待执行"), RUNNING("RUNNING", "执行中"), PAUSED("PAUSED", "已暂停"), COMPLETED("COMPLETED", "已完成"), FAILED("FAILED", "执行失败"), - CANCELED("CANCELED", "已取消"), - TIMEOUT("TIMEOUT", "已超时"); + CANCELLED("CANCELLED", "已取消"); private final String code; private final String description; @@ -31,7 +31,7 @@ public enum WorkflowStatusEnum { * 判断是否为终态 */ public boolean isFinalStatus() { - return this == COMPLETED || this == FAILED || this == CANCELED || this == TIMEOUT; + return this == COMPLETED || this == FAILED || this == CANCELLED; } /** diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/INodeInstanceService.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/INodeInstanceService.java index b2fa3d1c..3892b5e5 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/INodeInstanceService.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/INodeInstanceService.java @@ -3,6 +3,9 @@ package com.qqchen.deploy.backend.workflow.service; import com.qqchen.deploy.backend.framework.service.IBaseService; import com.qqchen.deploy.backend.workflow.api.dto.NodeInstanceDTO; import com.qqchen.deploy.backend.workflow.entity.NodeInstance; +import com.qqchen.deploy.backend.workflow.enums.NodeStatusEnum; +import org.springframework.transaction.annotation.Transactional; + import java.util.List; /** @@ -37,4 +40,9 @@ public interface INodeInstanceService extends IBaseService findByWorkflowInstanceIdAndStatus(Long workflowInstanceId, NodeStatusEnum status); + + @Transactional + void cancelRunningNodes(Long workflowInstanceId); +} \ No newline at end of file diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/IWorkflowInstanceService.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/IWorkflowInstanceService.java index 73f3d2fc..026fcc12 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/IWorkflowInstanceService.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/IWorkflowInstanceService.java @@ -3,13 +3,19 @@ package com.qqchen.deploy.backend.workflow.service; import com.qqchen.deploy.backend.framework.service.IBaseService; import com.qqchen.deploy.backend.workflow.api.dto.WorkflowInstanceDTO; import com.qqchen.deploy.backend.workflow.entity.WorkflowInstance; +import com.qqchen.deploy.backend.workflow.enums.WorkflowStatusEnum; +import org.springframework.transaction.annotation.Transactional; + import java.util.List; /** * 工作流实例服务接口 */ public interface IWorkflowInstanceService extends IBaseService { - + + @Transactional + void updateStatus(Long id, WorkflowStatusEnum status, String error); + /** * 启动工作流实例 * diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/NodeInstanceServiceImpl.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/NodeInstanceServiceImpl.java index 8d46b555..7c51cf0c 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/NodeInstanceServiceImpl.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/NodeInstanceServiceImpl.java @@ -8,7 +8,6 @@ import com.qqchen.deploy.backend.workflow.enums.NodeStatusEnum; import com.qqchen.deploy.backend.workflow.repository.INodeInstanceRepository; import com.qqchen.deploy.backend.workflow.service.INodeInstanceService; import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -16,13 +15,8 @@ import java.time.LocalDateTime; import java.util.List; import java.util.stream.Collectors; -/** - * 节点实例服务实现类 - */ -@Slf4j @Service -public class NodeInstanceServiceImpl extends BaseServiceImpl - implements INodeInstanceService { +public class NodeInstanceServiceImpl extends BaseServiceImpl implements INodeInstanceService { @Resource private INodeInstanceRepository nodeInstanceRepository; @@ -32,40 +26,55 @@ public class NodeInstanceServiceImpl extends BaseServiceImpl findByWorkflowInstanceId(Long workflowInstanceId) { - List instances = nodeInstanceRepository.findByWorkflowInstanceId(workflowInstanceId); - return instances.stream() - .map(nodeInstanceConverter::toDto) - .collect(Collectors.toList()); + return nodeInstanceRepository.findByWorkflowInstanceId(workflowInstanceId) + .stream() + .map(nodeInstanceConverter::toDto) + .collect(Collectors.toList()); } @Override public List findByWorkflowInstanceIdAndStatus(Long workflowInstanceId, String status) { - List instances = nodeInstanceRepository.findByWorkflowInstanceIdAndStatus(workflowInstanceId, status); - return instances.stream() - .map(nodeInstanceConverter::toDto) - .collect(Collectors.toList()); + return nodeInstanceRepository.findByWorkflowInstanceIdAndStatus(workflowInstanceId, status) + .stream() + .map(nodeInstanceConverter::toDto) + .collect(Collectors.toList()); } @Override @Transactional public boolean updateStatus(Long id, String status, String output, String error) { - NodeInstance instance = findEntityById(id); - instance.setStatus(status); - instance.setOutput(output); - instance.setError(error); + NodeInstance node = super.converter.toEntity(super.findById(id)); + node.setStatus(NodeStatusEnum.valueOf(status)); + node.setOutput(output); + node.setError(error); - // 如果是开始执行,设置开始时间 - if (NodeStatusEnum.RUNNING.getCode().equals(status)) { - instance.setStartTime(LocalDateTime.now()); - } - // 如果是结束状态,设置结束时间 - else if (NodeStatusEnum.COMPLETED.getCode().equals(status) - || NodeStatusEnum.FAILED.getCode().equals(status) - || NodeStatusEnum.CANCELED.getCode().equals(status)) { - instance.setEndTime(LocalDateTime.now()); + if (NodeStatusEnum.RUNNING.name().equals(status)) { + node.setStartTime(LocalDateTime.now()); + } else if (NodeStatusEnum.COMPLETED.name().equals(status) || + NodeStatusEnum.FAILED.name().equals(status) || + NodeStatusEnum.CANCELLED.name().equals(status) || + NodeStatusEnum.SKIPPED.name().equals(status)) { + node.setEndTime(LocalDateTime.now()); } - nodeInstanceRepository.save(instance); + nodeInstanceRepository.save(node); return true; } -} \ No newline at end of file + + @Override + public List findByWorkflowInstanceIdAndStatus(Long workflowInstanceId, NodeStatusEnum status) { + return nodeInstanceRepository.findByWorkflowInstanceIdAndStatus(workflowInstanceId, status.name()); + } + + @Override + @Transactional + public void cancelRunningNodes(Long workflowInstanceId) { + List runningNodes = nodeInstanceRepository.findByWorkflowInstanceIdAndStatus( + workflowInstanceId, NodeStatusEnum.RUNNING.name()); + runningNodes.forEach(node -> { + node.setStatus(NodeStatusEnum.CANCELLED); + node.setEndTime(LocalDateTime.now()); + nodeInstanceRepository.save(node); + }); + } +} \ No newline at end of file diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowInstanceServiceImpl.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowInstanceServiceImpl.java index 662e3931..6a3acfeb 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowInstanceServiceImpl.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowInstanceServiceImpl.java @@ -2,89 +2,65 @@ package com.qqchen.deploy.backend.workflow.service.impl; import com.qqchen.deploy.backend.framework.service.impl.BaseServiceImpl; import com.qqchen.deploy.backend.workflow.api.dto.WorkflowInstanceDTO; -import com.qqchen.deploy.backend.workflow.converter.WorkflowInstanceConverter; -import com.qqchen.deploy.backend.workflow.entity.WorkflowDefinition; import com.qqchen.deploy.backend.workflow.entity.WorkflowInstance; -import com.qqchen.deploy.backend.workflow.enums.NodeStatusEnum; -import com.qqchen.deploy.backend.workflow.repository.IWorkflowDefinitionRepository; +import com.qqchen.deploy.backend.workflow.enums.WorkflowStatusEnum; import com.qqchen.deploy.backend.workflow.repository.IWorkflowInstanceRepository; +import com.qqchen.deploy.backend.workflow.service.INodeInstanceService; import com.qqchen.deploy.backend.workflow.service.IWorkflowInstanceService; import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.time.LocalDateTime; import java.util.List; -import java.util.stream.Collectors; -/** - * 工作流实例服务实现类 - */ -@Slf4j @Service -public class WorkflowInstanceServiceImpl extends BaseServiceImpl - implements IWorkflowInstanceService { +public class WorkflowInstanceServiceImpl extends BaseServiceImpl implements IWorkflowInstanceService { @Resource private IWorkflowInstanceRepository workflowInstanceRepository; @Resource - private IWorkflowDefinitionRepository workflowDefinitionRepository; - - @Resource - private WorkflowInstanceConverter workflowInstanceConverter; + private INodeInstanceService nodeInstanceService; @Override @Transactional - public WorkflowInstanceDTO start(Long definitionId, Long projectEnvId, String variables) { - // 查询工作流定义 - WorkflowDefinition definition = workflowDefinitionRepository.findById(definitionId) - .orElseThrow(() -> new IllegalArgumentException("工作流定义不存在")); + public void updateStatus(Long id, WorkflowStatusEnum status, String error) { + WorkflowInstance instance = super.converter.toEntity(super.findById(id)); + instance.setStatus(status); + instance.setError(error); - // 创建工作流实例 - WorkflowInstance instance = new WorkflowInstance(); - instance.setDefinition(definition); - instance.setProjectEnvId(projectEnvId); - instance.setVariables(variables); - instance.setStatus(NodeStatusEnum.RUNNING.getCode()); - instance.setStartTime(LocalDateTime.now()); + if (status == WorkflowStatusEnum.RUNNING) { + instance.setStartTime(LocalDateTime.now()); + } else if (status == WorkflowStatusEnum.COMPLETED || status == WorkflowStatusEnum.FAILED || + status == WorkflowStatusEnum.CANCELLED) { + instance.setEndTime(LocalDateTime.now()); + } - // 保存工作流实例 - WorkflowInstanceDTO dto = workflowInstanceConverter.toDto(instance); - dto = super.create(dto); + if (status == WorkflowStatusEnum.CANCELLED) { + nodeInstanceService.cancelRunningNodes(id); + } - // TODO: 启动工作流执行引擎 - - return dto; + workflowInstanceRepository.save(instance); + } + + @Override + public WorkflowInstanceDTO start(Long definitionId, Long projectEnvId, String variables) { + return null; } @Override - @Transactional public boolean cancel(Long id) { - WorkflowInstance instance = findEntityById(id); - instance.setStatus(NodeStatusEnum.CANCELED.getCode()); - instance.setEndTime(LocalDateTime.now()); - workflowInstanceRepository.save(instance); - - // TODO: 通知工作流执行引擎取消执行 - - return true; + return false; } @Override public List findByProjectEnvId(Long projectEnvId) { - List instances = workflowInstanceRepository.findByProjectEnvId(projectEnvId); - return instances.stream() - .map(workflowInstanceConverter::toDto) - .collect(Collectors.toList()); + return List.of(); } @Override public List findByProjectEnvIdAndStatus(Long projectEnvId, String status) { - List instances = workflowInstanceRepository.findByProjectEnvIdAndStatus(projectEnvId, status); - return instances.stream() - .map(workflowInstanceConverter::toDto) - .collect(Collectors.toList()); + return List.of(); } -} \ No newline at end of file +} \ No newline at end of file diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowPermissionServiceImpl.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowPermissionServiceImpl.java index 097b5cef..5016d0f4 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowPermissionServiceImpl.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowPermissionServiceImpl.java @@ -103,15 +103,16 @@ public class WorkflowPermissionServiceImpl implements IWorkflowPermissionService } User user = userOpt.get(); - // 检查角色权限 - if (user.getRoleId() != null && workflowPermissionRepository - .existsByWorkflowDefinitionIdAndRoleIdAndType(workflowDefinitionId, user.getRoleId(), type)) { - return true; - } - - // 检查部门权限 - return user.getDepartmentId() != null && workflowPermissionRepository - .existsByWorkflowDefinitionIdAndDepartmentIdAndType(workflowDefinitionId, user.getDepartmentId(), type); +// // 检查角色权限 +// if (user.getRoleId() != null && workflowPermissionRepository +// .existsByWorkflowDefinitionIdAndRoleIdAndType(workflowDefinitionId, user.getRoleId(), type)) { +// return true; +// } +// +// // 检查部门权限 +// return user.getDepartmentId() != null && workflowPermissionRepository +// .existsByWorkflowDefinitionIdAndDepartmentIdAndType(workflowDefinitionId, user.getDepartmentId(), type); + return true; } @Override diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowVariableServiceImpl.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowVariableServiceImpl.java index 2f2f934b..f10a8a4b 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowVariableServiceImpl.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowVariableServiceImpl.java @@ -1,88 +1,75 @@ package com.qqchen.deploy.backend.workflow.service.impl; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; +import com.qqchen.deploy.backend.framework.enums.ResponseCode; import com.qqchen.deploy.backend.framework.exception.BusinessException; import com.qqchen.deploy.backend.workflow.entity.WorkflowVariable; import com.qqchen.deploy.backend.workflow.repository.IWorkflowVariableRepository; import com.qqchen.deploy.backend.workflow.service.IWorkflowVariableService; import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; -/** - * 工作流变量服务实现 - */ -@Slf4j @Service public class WorkflowVariableServiceImpl implements IWorkflowVariableService { @Resource private IWorkflowVariableRepository workflowVariableRepository; - @Resource - private ObjectMapper objectMapper; - @Override @Transactional public void saveVariables(Long workflowInstanceId, Map variables) { - variables.forEach((name, value) -> setVariable(workflowInstanceId, name, value)); + if (variables == null || variables.isEmpty()) { + throw new BusinessException(ResponseCode.WORKFLOW_VARIABLE_TYPE_INVALID); + } + + // 删除旧的变量 + workflowVariableRepository.deleteByWorkflowInstanceId(workflowInstanceId); + + // 保存新的变量 + variables.forEach((name, value) -> { + WorkflowVariable variable = new WorkflowVariable(); + variable.setWorkflowInstanceId(workflowInstanceId); + variable.setName(name); + variable.setValue(value.toString()); + variable.setType(value.getClass().getSimpleName()); + workflowVariableRepository.save(variable); + }); } @Override public Map getVariables(Long workflowInstanceId) { - List variables = workflowVariableRepository.findByWorkflowInstanceId(workflowInstanceId); - Map result = new HashMap<>(variables.size()); - variables.forEach(variable -> { - try { - result.put(variable.getName(), deserializeValue(variable.getValue(), variable.getType())); - } catch (JsonProcessingException e) { - log.error("Failed to deserialize variable value: {}", variable.getName(), e); - throw new BusinessException(ResponseCode.WORKFLOW_VARIABLE_DESERIALIZE_ERROR); - } - }); - return result; + return workflowVariableRepository.findByWorkflowInstanceId(workflowInstanceId) + .stream() + .collect(Collectors.toMap( + WorkflowVariable::getName, + this::convertValue + )); } @Override public Object getVariable(Long workflowInstanceId, String name) { - return workflowVariableRepository.findByWorkflowInstanceIdAndName(workflowInstanceId, name) - .map(variable -> { - try { - return deserializeValue(variable.getValue(), variable.getType()); - } catch (JsonProcessingException e) { - log.error("Failed to deserialize variable value: {}", name, e); - throw new BusinessException(ResponseCode.WORKFLOW_VARIABLE_DESERIALIZE_ERROR); - } - }) - .orElse(null); + Optional variable = workflowVariableRepository.findByWorkflowInstanceIdAndName(workflowInstanceId, name); + return variable.map(this::convertValue).orElse(null); } @Override @Transactional public void setVariable(Long workflowInstanceId, String name, Object value) { - try { - WorkflowVariable variable = workflowVariableRepository - .findByWorkflowInstanceIdAndName(workflowInstanceId, name) - .orElseGet(() -> { - WorkflowVariable newVariable = new WorkflowVariable(); - newVariable.setWorkflowInstanceId(workflowInstanceId); - newVariable.setName(name); - return newVariable; - }); + WorkflowVariable variable = workflowVariableRepository.findByWorkflowInstanceIdAndName(workflowInstanceId, name) + .orElseGet(() -> { + WorkflowVariable newVar = new WorkflowVariable(); + newVar.setWorkflowInstanceId(workflowInstanceId); + newVar.setName(name); + return newVar; + }); - variable.setType(value.getClass().getName()); - variable.setValue(serializeValue(value)); - workflowVariableRepository.save(variable); - } catch (JsonProcessingException e) { - log.error("Failed to serialize variable value: {}", name, e); - throw new BusinessException(ResponseCode.WORKFLOW_VARIABLE_SERIALIZE_ERROR); - } + variable.setValue(value.toString()); + variable.setType(value.getClass().getSimpleName()); + workflowVariableRepository.save(variable); } @Override @@ -97,17 +84,24 @@ public class WorkflowVariableServiceImpl implements IWorkflowVariableService { workflowVariableRepository.deleteByWorkflowInstanceId(workflowInstanceId); } - private String serializeValue(Object value) throws JsonProcessingException { - return objectMapper.writeValueAsString(value); - } - - private Object deserializeValue(String value, String type) throws JsonProcessingException { + private Object convertValue(WorkflowVariable variable) { try { - Class clazz = Class.forName(type); - return objectMapper.readValue(value, clazz); - } catch (ClassNotFoundException e) { - log.error("Failed to find class for type: {}", type, e); - throw new BusinessException(ResponseCode.WORKFLOW_VARIABLE_TYPE_ERROR); + switch (variable.getType()) { + case "String": + return variable.getValue(); + case "Integer": + return Integer.parseInt(variable.getValue()); + case "Long": + return Long.parseLong(variable.getValue()); + case "Double": + return Double.parseDouble(variable.getValue()); + case "Boolean": + return Boolean.parseBoolean(variable.getValue()); + default: + throw new BusinessException(ResponseCode.WORKFLOW_VARIABLE_TYPE_INVALID); + } + } catch (Exception e) { + throw new BusinessException(ResponseCode.WORKFLOW_VARIABLE_TYPE_INVALID); } } } \ No newline at end of file diff --git a/backend/src/main/resources/messages.properties b/backend/src/main/resources/messages.properties index 5962bbed..641bc79c 100644 --- a/backend/src/main/resources/messages.properties +++ b/backend/src/main/resources/messages.properties @@ -99,26 +99,14 @@ workflow.definition.already.published=工作流定义已发布 workflow.definition.cannot.delete=工作流定义已被使用,无法删除 workflow.instance.not.found=工作流实例不存在 -workflow.instance.already.started=工作流实例已启动 -workflow.instance.already.ended=工作流实例已结束 -workflow.instance.already.suspended=工作流实例已挂起 -workflow.instance.not.suspended=工作流实例未挂起 +workflow.instance.cannot.start=工作流实例无法启动 workflow.instance.cannot.cancel=工作流实例无法取消 -workflow.instance.cannot.suspend=工作流实例无法挂起 +workflow.instance.cannot.pause=工作流实例无法暂停 workflow.instance.cannot.resume=工作流实例无法恢复 +workflow.instance.cannot.retry=工作流实例无法重试 -workflow.node.not.found=工作流节点不存在 -workflow.node.type.not.supported=不支持的节点类型:{0} -workflow.node.config.invalid=节点配置无效:{0} -workflow.node.execution.failed=节点执行失败:{0} -workflow.node.timeout=节点执行超时 -workflow.node.approval.rejected=节点审批被拒绝 -workflow.node.approval.canceled=节点审批已取消 - -workflow.variable.not.found=工作流变量不存在 -workflow.variable.required=工作流变量"{0}"为必填项 -workflow.variable.invalid=工作流变量"{0}"的值无效 - -workflow.permission.denied=无权操作此工作流 -workflow.operation.not.allowed=当前状态不允许此操作 -workflow.concurrent.operation=工作流正在执行其他操作,请稍后重试 \ No newline at end of file +# 节点相关错误消息 +node.instance.not.found=节点实例不存在 +node.instance.cannot.retry=节点实例无法重试 +node.instance.cannot.skip=节点实例无法跳过 +node.executor.not.found=节点执行器不存在 \ No newline at end of file