From cc395cc0dd10c38d59edd0bb9482d13557b7e3aa Mon Sep 17 00:00:00 2001 From: dengqichen Date: Fri, 6 Dec 2024 16:20:30 +0800 Subject: [PATCH] =?UTF-8?q?=E5=85=BC=E5=AE=B9=E5=89=8D=E7=AB=AF=E4=BC=A0?= =?UTF-8?q?=E9=80=92=E8=BF=87=E6=9D=A5=E7=9A=84config?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../engine/executor/EndNodeExecutor.java | 19 ++++++- .../engine/executor/GatewayNodeExecutor.java | 49 +++++++++++-------- .../engine/executor/StartNodeExecutor.java | 20 +++++++- .../node/executor/ShellNodeExecutor.java | 37 ++++++++++---- 4 files changed, 93 insertions(+), 32 deletions(-) diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/executor/EndNodeExecutor.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/executor/EndNodeExecutor.java index 411d184c..d57331aa 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/executor/EndNodeExecutor.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/executor/EndNodeExecutor.java @@ -1,7 +1,10 @@ package com.qqchen.deploy.backend.workflow.engine.executor; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.qqchen.deploy.backend.framework.enums.ResponseCode; import com.qqchen.deploy.backend.system.enums.LogLevelEnum; import com.qqchen.deploy.backend.workflow.engine.context.WorkflowContext; +import com.qqchen.deploy.backend.workflow.engine.exception.WorkflowEngineException; import com.qqchen.deploy.backend.workflow.entity.NodeInstance; import com.qqchen.deploy.backend.workflow.entity.WorkflowInstance; import com.qqchen.deploy.backend.workflow.enums.NodeStatusEnum; @@ -9,6 +12,7 @@ import com.qqchen.deploy.backend.workflow.enums.NodeTypeEnum; import com.qqchen.deploy.backend.workflow.enums.WorkflowStatusEnum; import com.qqchen.deploy.backend.workflow.repository.IWorkflowInstanceRepository; import jakarta.annotation.Resource; +import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -21,6 +25,9 @@ public class EndNodeExecutor implements NodeExecutor { @Resource private IWorkflowInstanceRepository workflowInstanceRepository; + @Resource + private ObjectMapper objectMapper; + @Override public NodeTypeEnum getNodeType() { return NodeTypeEnum.END; @@ -43,11 +50,21 @@ public class EndNodeExecutor implements NodeExecutor { @Override public void validate(String config) { - // 结束节点无需配置 + try { + // 验证配置格式是否正确 + EndConfig endConfig = objectMapper.readValue(config, EndConfig.class); + } catch (Exception e) { + throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR); + } } @Override public void terminate(NodeInstance nodeInstance, WorkflowContext context) { // 结束节点无需终止操作 } + + @Data + public static class EndConfig { + // 结束节点无需特殊配置 + } } \ No newline at end of file diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/executor/GatewayNodeExecutor.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/executor/GatewayNodeExecutor.java index 82329553..541b2462 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/executor/GatewayNodeExecutor.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/executor/GatewayNodeExecutor.java @@ -6,6 +6,7 @@ import com.qqchen.deploy.backend.workflow.engine.context.WorkflowContext; import com.qqchen.deploy.backend.workflow.engine.exception.WorkflowEngineException; import com.qqchen.deploy.backend.workflow.entity.NodeInstance; import com.qqchen.deploy.backend.workflow.enums.NodeTypeEnum; +import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.expression.Expression; import org.springframework.expression.ExpressionParser; @@ -15,7 +16,7 @@ import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; -import java.util.Map; + @Slf4j @Component @@ -35,22 +36,21 @@ public class GatewayNodeExecutor implements NodeExecutor { GatewayConfig config = objectMapper.readValue(nodeInstance.getConfig(), GatewayConfig.class); if (config.getType() == null) { - throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR, "Gateway type is required"); + throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR); } switch (config.getType()) { - case EXCLUSIVE: - handleExclusiveGateway(nodeInstance, context, config); - break; case PARALLEL: handleParallelGateway(nodeInstance, context, config); break; + case EXCLUSIVE: + handleExclusiveGateway(nodeInstance, context, config); + break; case INCLUSIVE: handleInclusiveGateway(nodeInstance, context, config); break; default: - throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_TYPE_NOT_SUPPORTED, - "Unsupported gateway type: " + config.getType()); + throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR); } } catch (Exception e) { throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED, e); @@ -67,8 +67,7 @@ public class GatewayNodeExecutor implements NodeExecutor { return; } } - throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED, - "No condition matched in exclusive gateway"); + throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED); } catch (Exception e) { throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED, e); } @@ -97,8 +96,7 @@ public class GatewayNodeExecutor implements NodeExecutor { } } if (nextNodeIds.isEmpty()) { - throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED, - "No condition matched in inclusive gateway"); + throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED); } nodeInstance.setOutput(objectMapper.createObjectNode() .put("nextNodeIds", String.join(",", nextNodeIds)) @@ -119,17 +117,12 @@ public class GatewayNodeExecutor implements NodeExecutor { } } - @Override - public void terminate(NodeInstance nodeInstance, WorkflowContext context) { - // Gateway nodes are instant operations, no need to terminate - } - @Override public void validate(String config) { try { GatewayConfig gatewayConfig = objectMapper.readValue(config, GatewayConfig.class); if (gatewayConfig.getType() == null) { - throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR, "Gateway type is required"); + throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR); } // 验证条件表达式 if (gatewayConfig.getConditions() != null) { @@ -137,13 +130,29 @@ public class GatewayNodeExecutor implements NodeExecutor { try { expressionParser.parseExpression(condition.getExpression()); } catch (Exception e) { - throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR, - "Invalid condition expression: " + condition.getExpression()); + throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR); } } } } catch (Exception e) { - throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR, e); + throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR); } } + + @Override + public void terminate(NodeInstance nodeInstance, WorkflowContext context) { + // Gateway nodes are instant operations, no need to terminate + } + + @Data + public static class GatewayConfig { + private GatewayType type; // 网关类型:PARALLEL, EXCLUSIVE, INCLUSIVE + private List conditions; // 网关条件 + } + + @Data + public static class GatewayCondition { + private String expression; // 条件表达式 + private String nextNodeId; // 下一个节点ID + } } \ No newline at end of file diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/executor/StartNodeExecutor.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/executor/StartNodeExecutor.java index 7c5d47ba..9a262d0b 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/executor/StartNodeExecutor.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/executor/StartNodeExecutor.java @@ -1,10 +1,15 @@ package com.qqchen.deploy.backend.workflow.engine.executor; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.qqchen.deploy.backend.framework.enums.ResponseCode; import com.qqchen.deploy.backend.system.enums.LogLevelEnum; import com.qqchen.deploy.backend.workflow.engine.context.WorkflowContext; +import com.qqchen.deploy.backend.workflow.engine.exception.WorkflowEngineException; import com.qqchen.deploy.backend.workflow.entity.NodeInstance; import com.qqchen.deploy.backend.workflow.enums.NodeStatusEnum; import com.qqchen.deploy.backend.workflow.enums.NodeTypeEnum; +import jakarta.annotation.Resource; +import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -12,6 +17,9 @@ import org.springframework.stereotype.Component; @Component public class StartNodeExecutor implements NodeExecutor { + @Resource + private ObjectMapper objectMapper; + @Override public NodeTypeEnum getNodeType() { return NodeTypeEnum.START; @@ -26,11 +34,21 @@ public class StartNodeExecutor implements NodeExecutor { @Override public void validate(String config) { - // 开始节点无需配置 + try { + // 验证配置格式是否正确 + StartConfig startConfig = objectMapper.readValue(config, StartConfig.class); + } catch (Exception e) { + throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR); + } } @Override public void terminate(NodeInstance nodeInstance, WorkflowContext context) { // 开始节点无需终止操作 } + + @Data + public static class StartConfig { + // 开始节点无需特殊配置 + } } \ No newline at end of file diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/executor/ShellNodeExecutor.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/executor/ShellNodeExecutor.java index 83816ec7..3f20d868 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/executor/ShellNodeExecutor.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/executor/ShellNodeExecutor.java @@ -38,9 +38,15 @@ public class ShellNodeExecutor extends AbstractNodeExecutor { public void validate(String config) { try { ShellConfig shellConfig = objectMapper.readValue(config, ShellConfig.class); + // 验证执行器类型 + if (!"SHELL".equals(shellConfig.getExecutor())) { + throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR); + } + // 验证脚本内容 if (shellConfig.getScript() == null || shellConfig.getScript().trim().isEmpty()) { throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR); } + // 验证其他参数 if (shellConfig.getTimeout() != null && shellConfig.getTimeout() <= 0) { throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR); } @@ -61,6 +67,11 @@ public class ShellNodeExecutor extends AbstractNodeExecutor { String configJson = nodeInstance.getConfig(); ShellConfig config = objectMapper.readValue(configJson, ShellConfig.class); + // 验证执行器类型 + if (!"SHELL".equals(config.getExecutor())) { + throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR); + } + // 设置重试次数和间隔 int maxAttempts = config.getRetryTimes() != null ? config.getRetryTimes() : 1; long retryInterval = config.getRetryInterval() != null ? config.getRetryInterval() : 0; @@ -83,7 +94,9 @@ public class ShellNodeExecutor extends AbstractNodeExecutor { } // 如果所有重试都失败,抛出最后一个异常 - throw lastException; + if (lastException != null) { + throw lastException; + } } catch (Exception e) { throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED, e); @@ -95,12 +108,12 @@ public class ShellNodeExecutor extends AbstractNodeExecutor { processBuilder.command("sh", "-c", config.getScript()); // 设置工作目录 - if (config.getWorkingDirectory() != null) { + if (config.getWorkingDirectory() != null && !config.getWorkingDirectory().trim().isEmpty()) { processBuilder.directory(new java.io.File(config.getWorkingDirectory())); } // 设置环境变量 - if (config.getEnvironment() != null) { + if (config.getEnvironment() != null && !config.getEnvironment().isEmpty()) { Map env = processBuilder.environment(); env.putAll(config.getEnvironment()); } @@ -141,6 +154,9 @@ public class ShellNodeExecutor extends AbstractNodeExecutor { if (!error.isEmpty()) { nodeInstance.setError(String.join("\n", error)); } + + // 记录执行日志 + context.log(String.format("Shell script executed successfully with exit code: %d", exitCode), LogLevelEnum.INFO); } private List readOutput(java.io.InputStream inputStream) throws Exception { @@ -162,12 +178,13 @@ public class ShellNodeExecutor extends AbstractNodeExecutor { @Data public static class ShellConfig { - private String script; - private String workingDirectory; // 保持与前端一致 - private Integer retryTimes; // 重试次数 - private Integer retryInterval; // 重试间隔(秒) - private Integer timeout; // 超时时间(秒) - private Map environment; - private Integer successExitCode; + private String executor; // 执行器类型 + private String script; // 脚本内容 + private String workingDirectory; // 工作目录 + private Integer retryTimes; // 重试次数 + private Integer retryInterval; // 重试间隔(秒) + private Integer timeout; // 超时时间(秒) + private Map environment; // 环境变量 + private Integer successExitCode; // 成功退出码 } } \ No newline at end of file