diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/DefaultWorkflowEngine.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/DefaultWorkflowEngine.java index e9414b61..4225bbaa 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/DefaultWorkflowEngine.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/DefaultWorkflowEngine.java @@ -1,11 +1,16 @@ package com.qqchen.deploy.backend.workflow.engine; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.qqchen.deploy.backend.framework.enums.ResponseCode; import com.qqchen.deploy.backend.workflow.engine.context.DefaultWorkflowContext; import com.qqchen.deploy.backend.workflow.engine.context.WorkflowContextOperations; import com.qqchen.deploy.backend.workflow.engine.exception.WorkflowEngineException; import com.qqchen.deploy.backend.workflow.engine.executor.NodeExecutor; +import com.qqchen.deploy.backend.workflow.engine.parser.WorkflowDefinitionParser; +import com.qqchen.deploy.backend.workflow.entity.NodeConfig; import com.qqchen.deploy.backend.workflow.entity.NodeInstance; +import com.qqchen.deploy.backend.workflow.entity.TransitionConfig; import com.qqchen.deploy.backend.workflow.entity.WorkflowDefinition; import com.qqchen.deploy.backend.workflow.entity.WorkflowInstance; import com.qqchen.deploy.backend.workflow.enums.NodeStatusEnum; @@ -22,6 +27,8 @@ import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -44,39 +51,51 @@ public class DefaultWorkflowEngine implements WorkflowEngine { @Resource private WorkflowVariableOperations variableOperations; + @Resource + private ObjectMapper objectMapper; + + @Resource + private WorkflowDefinitionParser workflowDefinitionParser; + @Override @Transactional public WorkflowInstance startWorkflow(String workflowCode, String businessKey, Map variables) { - // 1. 检查工作流定义 + // 1. 获取工作流定义 WorkflowDefinition definition = workflowDefinitionRepository.findByCodeAndDeletedFalse(workflowCode); if (definition == null) { throw new WorkflowEngineException(ResponseCode.WORKFLOW_NOT_FOUND); } - // 检查工作流定义状态 + // 2. 检查工作流定义状态 if (definition.getStatus() != WorkflowDefinitionStatusEnum.PUBLISHED) { throw new WorkflowEngineException(ResponseCode.WORKFLOW_NOT_PUBLISHED); } - // 2. 创建工作流实例 + // 3. 创建工作流实例 WorkflowInstance instance = new WorkflowInstance(); instance.setWorkflowDefinition(definition); instance.setBusinessKey(businessKey); - // 设置工作流实例初始状态为 PENDING - instance.setStatus(WorkflowInstanceStatusEnum.PENDING); + instance.setStatus(WorkflowInstanceStatusEnum.RUNNING); + instance.setCreateTime(LocalDateTime.now()); workflowInstanceRepository.save(instance); - // 3. 设置工作流变量 + // 4. 初始化工作流变量 if (variables != null && !variables.isEmpty()) { variableOperations.setVariables(instance.getId(), variables); } - // 4. 创建开始节点实例并启动工作流 - NodeInstance startNode = createStartNode(definition, instance); - instance.start(); - workflowInstanceRepository.save(instance); - executeNode(startNode.getId()); + // 5. 创建并执行开始节点 + NodeInstance startNode = new NodeInstance(); + startNode.setWorkflowInstance(instance); + startNode.setNodeId("start"); + startNode.setNodeType(NodeTypeEnum.START); + startNode.setName("开始节点"); + startNode.setStatus(NodeStatusEnum.PENDING); + startNode.setCreateTime(LocalDateTime.now()); + nodeInstanceRepository.save(startNode); + // 6. 执行开始节点 + executeNode(startNode.getId()); return instance; } @@ -97,12 +116,12 @@ public class DefaultWorkflowEngine implements WorkflowEngine { throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTOR_NOT_FOUND); } - // 创建工作流上下文 + // 创建执行上下文 WorkflowContextOperations context = DefaultWorkflowContext.builder() .workflowInstance(instance) .variableOperations(variableOperations) .build(); - + // 执行节点 executor.execute(nodeInstance, context); @@ -111,16 +130,60 @@ public class DefaultWorkflowEngine implements WorkflowEngine { nodeInstance.setEndTime(LocalDateTime.now()); nodeInstanceRepository.save(nodeInstance); - // 4. 检查是否所有节点都已完成 + // 4. 获取并执行后续节点 + WorkflowDefinition definition = instance.getWorkflowDefinition(); + List transitions = workflowDefinitionParser.parseTransitionConfig(definition.getTransitionConfig()); + List nodeConfigs = workflowDefinitionParser.parseNodeConfig(definition.getNodeConfig()); + + // 获取当前节点的后续节点 + List nextNodeIds = transitions.stream() + .filter(t -> t.getSourceNodeId().equals(nodeInstance.getNodeId())) + .map(TransitionConfig::getTargetNodeId) + .toList(); + + // 创建并执行后续节点 + for (String nextNodeId : nextNodeIds) { + NodeConfig nodeConfig = nodeConfigs.stream() + .filter(n -> n.getNodeId().equals(nextNodeId)) + .findFirst() + .orElse(null); + + if (nodeConfig == null) { + log.error("Node configuration not found for node: {}", nextNodeId); + continue; + } + + createAndExecuteNextNode(instance, nextNodeId, nodeConfig); + } + + // 5. 检查是否所有节点都已完成 List uncompletedNodes = nodeInstanceRepository.findByWorkflowInstanceAndStatusNot( instance, NodeStatusEnum.COMPLETED); - + if (uncompletedNodes.isEmpty()) { instance.complete(); workflowInstanceRepository.save(instance); } } + private void createAndExecuteNextNode(WorkflowInstance instance, String nextNodeId, NodeConfig nodeConfig) { + try { + NodeInstance nextNode = new NodeInstance(); + nextNode.setNodeId(nextNodeId); + nextNode.setWorkflowInstance(instance); + nextNode.setNodeType(nodeConfig.getType()); + nextNode.setName(nodeConfig.getName()); + nextNode.setConfig(objectMapper.writeValueAsString(nodeConfig.getConfig())); + nextNode.setStatus(NodeStatusEnum.PENDING); + nodeInstanceRepository.save(nextNode); + + // 递归执行后续节点 + executeNode(nextNode.getId()); + } catch (JsonProcessingException e) { + throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR, e); + } + } + @Override @Transactional public void completeNode(Long nodeInstanceId, Map variables) { @@ -128,7 +191,7 @@ public class DefaultWorkflowEngine implements WorkflowEngine { .orElseThrow(() -> new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_NOT_FOUND)); WorkflowInstance instance = nodeInstance.getWorkflowInstance(); - + // 设置节点输出变量 if (variables != null && !variables.isEmpty()) { variableOperations.setVariables(instance.getId(), variables); @@ -144,10 +207,10 @@ public class DefaultWorkflowEngine implements WorkflowEngine { public void terminateWorkflow(Long instanceId, String reason) { WorkflowInstance instance = workflowInstanceRepository.findById(instanceId) .orElseThrow(() -> new WorkflowEngineException(ResponseCode.WORKFLOW_INSTANCE_NOT_FOUND)); - + instance.terminate(reason); workflowInstanceRepository.save(instance); - + // 清理上下文缓存 variableOperations.clearVariables(instance.getId()); } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/executor/StartNodeExecutor.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/executor/StartNodeExecutor.java deleted file mode 100644 index c131da70..00000000 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/executor/StartNodeExecutor.java +++ /dev/null @@ -1,54 +0,0 @@ -package com.qqchen.deploy.backend.workflow.engine.executor; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.qqchen.deploy.backend.framework.enums.ResponseCode; -import com.qqchen.deploy.backend.system.enums.LogLevelEnum; -import com.qqchen.deploy.backend.workflow.engine.context.WorkflowContextOperations; -import com.qqchen.deploy.backend.workflow.engine.exception.WorkflowEngineException; -import com.qqchen.deploy.backend.workflow.entity.NodeInstance; -import com.qqchen.deploy.backend.workflow.enums.NodeStatusEnum; -import com.qqchen.deploy.backend.workflow.enums.NodeTypeEnum; -import jakarta.annotation.Resource; -import lombok.Data; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -@Slf4j -@Component -public class StartNodeExecutor implements NodeExecutor { - - @Resource - private ObjectMapper objectMapper; - - @Override - public NodeTypeEnum getNodeType() { - return NodeTypeEnum.START; - } - - @Override - public void execute(NodeInstance nodeInstance, WorkflowContextOperations context) { - // 开始节点直接完成 - nodeInstance.setStatus(NodeStatusEnum.COMPLETED); - context.log("开始节点执行完成", LogLevelEnum.INFO); - } - - @Override - public void validate(String config) { - try { - // 验证配置格式是否正确 - StartConfig startConfig = objectMapper.readValue(config, StartConfig.class); - } catch (Exception e) { - throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR); - } - } - - @Override - public void terminate(NodeInstance nodeInstance, WorkflowContextOperations context) { - // 开始节点无需终止操作 - } - - @Data - public static class StartConfig { - // 开始节点无需特殊配置 - } -} \ No newline at end of file diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/NodeConfig.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/NodeConfig.java index f36b3d3a..68ae8a93 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/NodeConfig.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/NodeConfig.java @@ -12,6 +12,7 @@ import lombok.Data; @JsonSubTypes({ @JsonSubTypes.Type(value = ApprovalNodeConfig.class, name = "APPROVAL"), @JsonSubTypes.Type(value = ScriptNodeConfig.class, name = "SCRIPT"), + @JsonSubTypes.Type(value = ShellNodeConfig.class, name = "SHELL"), @JsonSubTypes.Type(value = JenkinsNodeConfig.class, name = "JENKINS"), @JsonSubTypes.Type(value = GitNodeConfig.class, name = "GIT"), @JsonSubTypes.Type(value = ConditionNodeConfig.class, name = "CONDITION"), diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/ScriptNodeConfig.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/ScriptNodeConfig.java index a02bda49..ca82a3dc 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/ScriptNodeConfig.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/ScriptNodeConfig.java @@ -5,15 +5,63 @@ import lombok.EqualsAndHashCode; import java.util.Map; +/** + * 脚本节点配置 + * 支持多种脚本语言(Python, Shell, JavaScript等) + */ @Data @EqualsAndHashCode(callSuper = true) public class ScriptNodeConfig extends NodeConfig { + /** + * 脚本内容 + */ private String script; + + /** + * 脚本语言:python/shell/javascript + */ private String language; + + /** + * 解释器路径(可选) + * 例如:/usr/local/bin/python3 + */ + private String interpreter; + + /** + * 工作目录 + */ private String workingDirectory; + + /** + * 超时时间(秒) + */ private Integer timeout; + + /** + * 环境变量 + */ private Map environment; - private Integer successExitCode; + + /** + * 成功退出码 + */ + private Integer successExitCode = 0; + + /** + * 重试次数 + */ + private Integer retryTimes; + + /** + * 重试间隔(秒) + */ + private Integer retryInterval; + + /** + * 脚本参数 + */ + private Map arguments; public String getScript() { return script; @@ -22,4 +70,4 @@ public class ScriptNodeConfig extends NodeConfig { public Map getEnvironment() { return environment; } -} \ No newline at end of file +} \ No newline at end of file diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/ShellNodeConfig.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/ShellNodeConfig.java new file mode 100644 index 00000000..aa4fa076 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/ShellNodeConfig.java @@ -0,0 +1,55 @@ +package com.qqchen.deploy.backend.workflow.engine.node; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +import java.util.Map; + +/** + * Shell节点配置 + * @deprecated 请使用 {@link ScriptNodeConfig} 替代,设置 language="shell" + */ +@Deprecated(since = "1.0", forRemoval = true) +@Data +@EqualsAndHashCode(callSuper = true) +public class ShellNodeConfig extends NodeConfig { + /** + * 执行器类型,固定为 SHELL + */ + private String executor = "SHELL"; + + /** + * Shell脚本内容 + */ + private String script; + + /** + * 工作目录 + */ + private String workingDirectory; + + /** + * 超时时间(秒) + */ + private Integer timeout; + + /** + * 重试次数 + */ + private Integer retryTimes; + + /** + * 重试间隔(秒) + */ + private Integer retryInterval; + + /** + * 环境变量 + */ + private Map environment; + + /** + * 成功退出码 + */ + private Integer successExitCode = 0; +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/executor/EndNodeExecutor.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/executor/EndNodeExecutor.java similarity index 53% rename from backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/executor/EndNodeExecutor.java rename to backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/executor/EndNodeExecutor.java index ec6f63bc..226a6c59 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/executor/EndNodeExecutor.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/executor/EndNodeExecutor.java @@ -1,70 +1,55 @@ -package com.qqchen.deploy.backend.workflow.engine.executor; +package com.qqchen.deploy.backend.workflow.engine.node.executor; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.qqchen.deploy.backend.framework.enums.ResponseCode; import com.qqchen.deploy.backend.system.enums.LogLevelEnum; import com.qqchen.deploy.backend.workflow.engine.context.WorkflowContextOperations; -import com.qqchen.deploy.backend.workflow.engine.exception.WorkflowEngineException; +import com.qqchen.deploy.backend.workflow.engine.node.AbstractNodeExecutor; import com.qqchen.deploy.backend.workflow.entity.NodeInstance; import com.qqchen.deploy.backend.workflow.entity.WorkflowInstance; -import com.qqchen.deploy.backend.workflow.enums.NodeStatusEnum; import com.qqchen.deploy.backend.workflow.enums.NodeTypeEnum; import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatusEnum; import com.qqchen.deploy.backend.workflow.repository.IWorkflowInstanceRepository; import jakarta.annotation.Resource; -import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.time.LocalDateTime; +/** + * 结束节点执行器 + * 负责完成工作流实例的执行 + */ @Slf4j -@Component -public class EndNodeExecutor implements NodeExecutor { +@Component("endNodeExecutor") +public class EndNodeExecutor extends AbstractNodeExecutor { @Resource private IWorkflowInstanceRepository workflowInstanceRepository; - @Resource - private ObjectMapper objectMapper; - @Override public NodeTypeEnum getNodeType() { return NodeTypeEnum.END; } @Override - public void execute(NodeInstance nodeInstance, WorkflowContextOperations context) { - // 1. 完成结束节点 - nodeInstance.setStatus(NodeStatusEnum.COMPLETED); - nodeInstance.setEndTime(LocalDateTime.now()); - - // 2. 完成工作流实例 + public void validate(String config) { + // 结束节点不需要配置,无需验证 + } + + @Override + protected void doExecute(NodeInstance nodeInstance, WorkflowContextOperations context) { + // 完成工作流实例 WorkflowInstance instance = nodeInstance.getWorkflowInstance(); instance.setStatus(WorkflowInstanceStatusEnum.COMPLETED); instance.setEndTime(LocalDateTime.now()); workflowInstanceRepository.save(instance); + // 记录日志 context.log("工作流执行完成", LogLevelEnum.INFO); } - @Override - public void validate(String config) { - try { - // 验证配置格式是否正确 - EndConfig endConfig = objectMapper.readValue(config, EndConfig.class); - } catch (Exception e) { - throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR); - } - } - @Override public void terminate(NodeInstance nodeInstance, WorkflowContextOperations context) { - // 结束节点无需终止操作 + // 结束节点不需要终止逻辑 + context.log("结束节点无需终止操作", LogLevelEnum.INFO); } - - @Data - public static class EndConfig { - // 结束节点无需特殊配置 - } -} \ No newline at end of file +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/executor/ScriptNodeExecutor.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/executor/ScriptNodeExecutor.java new file mode 100644 index 00000000..fdf30f64 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/executor/ScriptNodeExecutor.java @@ -0,0 +1,201 @@ +package com.qqchen.deploy.backend.workflow.engine.node.executor; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.qqchen.deploy.backend.system.enums.LogLevelEnum; +import com.qqchen.deploy.backend.framework.enums.ResponseCode; +import com.qqchen.deploy.backend.workflow.engine.exception.WorkflowEngineException; +import com.qqchen.deploy.backend.workflow.engine.context.WorkflowContextOperations; +import com.qqchen.deploy.backend.workflow.engine.node.AbstractNodeExecutor; +import com.qqchen.deploy.backend.workflow.engine.node.ScriptNodeConfig; +import com.qqchen.deploy.backend.workflow.entity.NodeInstance; +import com.qqchen.deploy.backend.workflow.enums.NodeTypeEnum; +import com.qqchen.deploy.backend.workflow.service.WorkflowVariableOperations; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import jakarta.annotation.Resource; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; + +/** + * 脚本节点执行器 + * 支持多种脚本语言(Python, Shell, JavaScript等) + */ +@Slf4j +@Component +public class ScriptNodeExecutor extends AbstractNodeExecutor { + + @Resource + private ObjectMapper objectMapper; + + @Resource + private WorkflowVariableOperations variableOperations; + + private final ExecutorService executorService = Executors.newCachedThreadPool(); + + @Override + public NodeTypeEnum getNodeType() { + return NodeTypeEnum.SCRIPT; + } + + @Override + public void validate(String config) { + try { + ScriptNodeConfig scriptConfig = objectMapper.readValue(config, ScriptNodeConfig.class); + // 验证脚本内容 + if (scriptConfig.getScript() == null || scriptConfig.getScript().trim().isEmpty()) { + throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR, "Script content cannot be empty"); + } + // 验证脚本语言 + if (scriptConfig.getLanguage() == null || scriptConfig.getLanguage().trim().isEmpty()) { + throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR, "Script language must be specified"); + } + // 验证其他参数 + if (scriptConfig.getTimeout() != null && scriptConfig.getTimeout() <= 0) { + throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR, "Timeout must be positive"); + } + if (scriptConfig.getRetryTimes() != null && scriptConfig.getRetryTimes() < 0) { + throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR, "Retry times cannot be negative"); + } + if (scriptConfig.getRetryInterval() != null && scriptConfig.getRetryInterval() < 0) { + throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR, "Retry interval cannot be negative"); + } + } catch (Exception e) { + throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR, e); + } + } + + @Override + protected void doExecute(NodeInstance nodeInstance, WorkflowContextOperations context) { + try { + String configJson = nodeInstance.getConfig(); + ScriptNodeConfig config = objectMapper.readValue(configJson, ScriptNodeConfig.class); + + // 设置重试次数和间隔 + int maxAttempts = config.getRetryTimes() != null ? config.getRetryTimes() : 1; + long retryInterval = config.getRetryInterval() != null ? config.getRetryInterval() : 0; + + Exception lastException = null; + for (int attempt = 1; attempt <= maxAttempts; attempt++) { + try { + // 执行脚本 + executeScript(config, nodeInstance, context); + return; // 执行成功,直接返回 + } catch (Exception e) { + lastException = e; + if (attempt < maxAttempts) { + context.log(String.format("Script execution failed (attempt %d/%d), retrying in %d seconds", + attempt, maxAttempts, retryInterval), LogLevelEnum.WARN); + Thread.sleep(retryInterval * 1000L); + } + } + } + + // 如果所有重试都失败,抛出最后一个异常 + if (lastException != null) { + throw lastException; + } + + } catch (Exception e) { + throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED, e); + } + } + + private void executeScript(ScriptNodeConfig config, NodeInstance nodeInstance, WorkflowContextOperations context) throws Exception { + ProcessBuilder processBuilder = new ProcessBuilder(); + + // 根据脚本语言设置命令 + List command = new ArrayList<>(); + switch (config.getLanguage().toLowerCase()) { + case "python": + command.add(config.getInterpreter() != null ? config.getInterpreter() : "python"); + command.add("-c"); + command.add(config.getScript()); + break; + case "shell": + command.add("sh"); + command.add("-c"); + command.add(config.getScript()); + break; + // TODO: 添加其他语言支持 + default: + throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR, + "Unsupported script language: " + config.getLanguage()); + } + + processBuilder.command(command); + + // 设置工作目录 + if (config.getWorkingDirectory() != null && !config.getWorkingDirectory().trim().isEmpty()) { + processBuilder.directory(new java.io.File(config.getWorkingDirectory())); + } + + // 设置环境变量 + if (config.getEnvironment() != null && !config.getEnvironment().isEmpty()) { + Map env = processBuilder.environment(); + env.putAll(config.getEnvironment()); + } + + Process process = processBuilder.start(); + + // 创建用于读取输出的Future + Future> outputFuture = executorService.submit(() -> readOutput(process.getInputStream())); + Future> errorFuture = executorService.submit(() -> readOutput(process.getErrorStream())); + + // 等待进程执行完成或超时 + boolean completed = true; + if (config.getTimeout() != null && config.getTimeout() > 0) { + completed = process.waitFor(config.getTimeout(), TimeUnit.SECONDS); + if (!completed) { + process.destroyForcibly(); + throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED, + String.format("Script execution timed out after %d seconds", config.getTimeout())); + } + } else { + process.waitFor(); + } + + // 获取输出结果 + List output = outputFuture.get(5, TimeUnit.SECONDS); // 给5秒时间读取输出 + List error = errorFuture.get(5, TimeUnit.SECONDS); + + // 检查退出码 + int exitCode = process.exitValue(); + if (config.getSuccessExitCode() != null && exitCode != config.getSuccessExitCode()) { + throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED, + String.format("Script execution failed with exit code: %d%nError output: %s", + exitCode, String.join("\n", error))); + } + + // 设置输出变量 + Map outputVariables = new HashMap<>(); + outputVariables.put("scriptOutput", String.join("\n", output)); + outputVariables.put("exitCode", exitCode); + variableOperations.setVariables(nodeInstance.getWorkflowInstance().getId(), outputVariables); + + // 记录执行日志 + context.log(String.format("Script executed successfully with exit code: %d", exitCode), LogLevelEnum.INFO); + } + + private List readOutput(java.io.InputStream inputStream) throws Exception { + List output = new ArrayList<>(); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) { + String line; + while ((line = reader.readLine()) != null) { + output.add(line); + } + } + return output; + } + + @Override + public void terminate(NodeInstance nodeInstance, WorkflowContextOperations context) { + // TODO: 实现终止脚本进程的逻辑 + context.log("Script node termination is not implemented yet", LogLevelEnum.WARN); + } +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/executor/ShellNodeExecutor.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/executor/ShellNodeExecutor.java index c8109ecc..a4d98a45 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/executor/ShellNodeExecutor.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/executor/ShellNodeExecutor.java @@ -6,10 +6,10 @@ import com.qqchen.deploy.backend.framework.enums.ResponseCode; import com.qqchen.deploy.backend.workflow.engine.exception.WorkflowEngineException; import com.qqchen.deploy.backend.workflow.engine.context.WorkflowContextOperations; import com.qqchen.deploy.backend.workflow.engine.node.AbstractNodeExecutor; +import com.qqchen.deploy.backend.workflow.engine.node.ShellNodeConfig; import com.qqchen.deploy.backend.workflow.entity.NodeInstance; import com.qqchen.deploy.backend.workflow.enums.NodeTypeEnum; import com.qqchen.deploy.backend.workflow.service.WorkflowVariableOperations; -import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import jakarta.annotation.Resource; @@ -22,6 +22,11 @@ import java.util.List; import java.util.Map; import java.util.concurrent.*; +/** + * Shell节点执行器 + * @deprecated 请使用 {@link ScriptNodeExecutor} 替代 + */ +@Deprecated(since = "1.0", forRemoval = true) @Slf4j @Component public class ShellNodeExecutor extends AbstractNodeExecutor { @@ -42,7 +47,7 @@ public class ShellNodeExecutor extends AbstractNodeExecutor { @Override public void validate(String config) { try { - ShellConfig shellConfig = objectMapper.readValue(config, ShellConfig.class); + ShellNodeConfig shellConfig = objectMapper.readValue(config, ShellNodeConfig.class); // 验证执行器类型 if (!"SHELL".equals(shellConfig.getExecutor())) { throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR); @@ -70,7 +75,7 @@ public class ShellNodeExecutor extends AbstractNodeExecutor { protected void doExecute(NodeInstance nodeInstance, WorkflowContextOperations context) { try { String configJson = nodeInstance.getConfig(); - ShellConfig config = objectMapper.readValue(configJson, ShellConfig.class); + ShellNodeConfig config = objectMapper.readValue(configJson, ShellNodeConfig.class); // 验证执行器类型 if (!"SHELL".equals(config.getExecutor())) { @@ -106,7 +111,7 @@ public class ShellNodeExecutor extends AbstractNodeExecutor { } } - private void executeShellCommand(ShellConfig config, NodeInstance nodeInstance, WorkflowContextOperations context) throws Exception { + private void executeShellCommand(ShellNodeConfig config, NodeInstance nodeInstance, WorkflowContextOperations context) throws Exception { ProcessBuilder processBuilder = new ProcessBuilder(); processBuilder.command("sh", "-c", config.getScript()); @@ -178,16 +183,4 @@ public class ShellNodeExecutor extends AbstractNodeExecutor { // TODO: 实现终止Shell进程的逻辑 context.log("Shell node termination is not implemented yet", LogLevelEnum.WARN); } - - @Data - public static class ShellConfig { - private String executor; // 执行器类型 - private String script; // 脚本内容 - private String workingDirectory; // 工作目录 - private Integer retryTimes; // 重试次数 - private Integer retryInterval; // 重试间隔(秒) - private Integer timeout; // 超时时间(秒) - private Map environment; // 环境变量 - private Integer successExitCode; // 成功退出码 - } } \ No newline at end of file diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/executor/StartNodeExecutor.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/executor/StartNodeExecutor.java new file mode 100644 index 00000000..ce1468d0 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/executor/StartNodeExecutor.java @@ -0,0 +1,58 @@ +package com.qqchen.deploy.backend.workflow.engine.node.executor; + +import com.qqchen.deploy.backend.system.enums.LogLevelEnum; +import com.qqchen.deploy.backend.workflow.engine.context.WorkflowContextOperations; +import com.qqchen.deploy.backend.workflow.engine.node.AbstractNodeExecutor; +import com.qqchen.deploy.backend.workflow.entity.NodeInstance; +import com.qqchen.deploy.backend.workflow.entity.WorkflowInstance; +import com.qqchen.deploy.backend.workflow.enums.NodeTypeEnum; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * 开始节点执行器 + * 负责工作流实例的初始化工作 + */ +@Slf4j +@Component("startNodeExecutor") +public class StartNodeExecutor extends AbstractNodeExecutor { + + @Override + public NodeTypeEnum getNodeType() { + return NodeTypeEnum.START; + } + + @Override + public void validate(String config) { + // 开始节点不需要配置,无需验证 + } + + @Override + protected void doExecute(NodeInstance nodeInstance, WorkflowContextOperations context) { + WorkflowInstance instance = context.getInstance(); + + // 记录启动日志 + String message = String.format( + "工作流[%s]开始执行,实例ID: %d,业务键: %s", + instance.getWorkflowDefinition().getName(), + instance.getId(), + instance.getBusinessKey() + ); + context.log(message, LogLevelEnum.INFO); + + // 记录启动时间 + log.info("Workflow instance {} started at {}", instance.getId(), instance.getCreateTime()); + } + + @Override + public void terminate(NodeInstance nodeInstance, WorkflowContextOperations context) { + // 开始节点的终止意味着整个工作流的终止 + WorkflowInstance instance = nodeInstance.getWorkflowInstance(); + String message = String.format( + "工作流[%s]在启动阶段被终止,实例ID: %d", + instance.getWorkflowDefinition().getName(), + instance.getId() + ); + context.log(message, LogLevelEnum.WARN); + } +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/NodeTypeEnum.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/NodeTypeEnum.java index 1477441b..0629ea4e 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/NodeTypeEnum.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/NodeTypeEnum.java @@ -16,11 +16,12 @@ public enum NodeTypeEnum { GATEWAY(3, "GATEWAY", "网关节点"), SUB_PROCESS(4, "SUB_PROCESS", "子流程节点"), SHELL(5, "SHELL", "Shell脚本节点"), - APPROVAL(6, "APPROVAL", "审批节点"), - JENKINS(7, "JENKINS", "Jenkins任务节点"), - GIT(8, "GIT", "Git操作节点"); + SCRIPT(6, "SCRIPT", "脚本节点"), + APPROVAL(7, "APPROVAL", "审批节点"), + JENKINS(8, "JENKINS", "Jenkins任务节点"), + GIT(9, "GIT", "Git操作节点"); private final int value; private final String code; private final String description; -} \ No newline at end of file +} \ No newline at end of file diff --git a/backend/src/test/java/com/qqchen/deploy/backend/workflow/engine/WorkflowShellTest.java b/backend/src/test/java/com/qqchen/deploy/backend/workflow/engine/WorkflowShellTest.java new file mode 100644 index 00000000..95b47791 --- /dev/null +++ b/backend/src/test/java/com/qqchen/deploy/backend/workflow/engine/WorkflowShellTest.java @@ -0,0 +1,36 @@ +package com.qqchen.deploy.backend.workflow.engine; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.qqchen.deploy.backend.workflow.entity.WorkflowDefinition; +import com.qqchen.deploy.backend.workflow.repository.IWorkflowDefinitionRepository; +import jakarta.annotation.Resource; +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.core.io.ClassPathResource; + +import java.nio.file.Files; +import java.util.HashMap; + +@SpringBootTest +public class WorkflowShellTest { + + @Resource + private WorkflowEngine workflowEngine; + + @Resource + private IWorkflowDefinitionRepository workflowDefinitionRepository; + + @Resource + private ObjectMapper objectMapper; + + @Test + public void testShellWorkflow() throws Exception { + // 1. 加载工作流定义 + String json = Files.readString(new ClassPathResource("workflow-shell-test.json").getFile().toPath()); + WorkflowDefinition definition = objectMapper.readValue(json, WorkflowDefinition.class); + workflowDefinitionRepository.save(definition); + + // 2. 启动工作流实例 + workflowEngine.startWorkflow(definition.getId(), "TEST-" + System.currentTimeMillis(), new HashMap<>()); + } +} diff --git a/backend/src/test/resources/workflow-shell-test.json b/backend/src/test/resources/workflow-shell-test.json new file mode 100644 index 00000000..f851460e --- /dev/null +++ b/backend/src/test/resources/workflow-shell-test.json @@ -0,0 +1,42 @@ +{ + "id": "test-shell-workflow", + "name": "Shell测试工作流", + "description": "测试Shell节点执行", + "nodes": { + "start": { + "id": "start", + "name": "开始", + "type": "START" + }, + "shell1": { + "id": "shell1", + "name": "Shell命令", + "type": "SHELL", + "config": { + "executor": "SHELL", + "script": "echo 'Hello World' && ls -la", + "timeout": 30, + "retryTimes": 3, + "retryInterval": 5, + "successExitCode": 0 + } + }, + "end": { + "id": "end", + "name": "结束", + "type": "END" + } + }, + "transitions": { + "start-to-shell": { + "from": "start", + "to": "shell1", + "condition": null + }, + "shell-to-end": { + "from": "shell1", + "to": "end", + "condition": null + } + } +}