From 41975feedca72d9e44145f7340cb959c64b5f42e Mon Sep 17 00:00:00 2001 From: dengqichen Date: Fri, 6 Dec 2024 13:04:10 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=B7=A5=E4=BD=9C=E6=B5=81?= =?UTF-8?q?=E5=AE=9E=E4=BE=8B=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../node/executor/ShellNodeExecutor.java | 187 +++++++++++------- 1 file changed, 113 insertions(+), 74 deletions(-) diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/executor/ShellNodeExecutor.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/executor/ShellNodeExecutor.java index 1b13d2d2..83816ec7 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/executor/ShellNodeExecutor.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/engine/node/executor/ShellNodeExecutor.java @@ -6,6 +6,9 @@ import com.qqchen.deploy.backend.workflow.engine.node.AbstractNodeExecutor; import com.qqchen.deploy.backend.workflow.enums.NodeTypeEnum; import com.qqchen.deploy.backend.workflow.entity.NodeInstance; import com.qqchen.deploy.backend.system.enums.LogLevelEnum; +import com.qqchen.deploy.backend.framework.enums.ResponseCode; +import com.qqchen.deploy.backend.workflow.engine.exception.WorkflowEngineException; +import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import jakarta.annotation.Resource; @@ -15,6 +18,7 @@ import java.io.InputStreamReader; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.*; @Slf4j @Component @@ -23,6 +27,8 @@ public class ShellNodeExecutor extends AbstractNodeExecutor { @Resource private ObjectMapper objectMapper; + private final ExecutorService executorService = Executors.newCachedThreadPool(); + @Override public NodeTypeEnum getNodeType() { return NodeTypeEnum.SHELL; @@ -30,7 +36,23 @@ public class ShellNodeExecutor extends AbstractNodeExecutor { @Override public void validate(String config) { - + try { + ShellConfig shellConfig = objectMapper.readValue(config, ShellConfig.class); + if (shellConfig.getScript() == null || shellConfig.getScript().trim().isEmpty()) { + throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR); + } + if (shellConfig.getTimeout() != null && shellConfig.getTimeout() <= 0) { + throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR); + } + if (shellConfig.getRetryTimes() != null && shellConfig.getRetryTimes() < 0) { + throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR); + } + if (shellConfig.getRetryInterval() != null && shellConfig.getRetryInterval() < 0) { + throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR); + } + } catch (Exception e) { + throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR); + } } @Override @@ -38,52 +60,98 @@ public class ShellNodeExecutor extends AbstractNodeExecutor { try { String configJson = nodeInstance.getConfig(); ShellConfig config = objectMapper.readValue(configJson, ShellConfig.class); - ProcessBuilder processBuilder = new ProcessBuilder(); - processBuilder.command("sh", "-c", config.getScript()); - if (config.getWorkingDirectory() != null) { - processBuilder.directory(new java.io.File(config.getWorkingDirectory())); - } - - if (config.getEnvironment() != null) { - Map env = processBuilder.environment(); - env.putAll(config.getEnvironment()); - } - - Process process = processBuilder.start(); + // 设置重试次数和间隔 + int maxAttempts = config.getRetryTimes() != null ? config.getRetryTimes() : 1; + long retryInterval = config.getRetryInterval() != null ? config.getRetryInterval() : 0; - // 读取标准输出 - BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream())); - List output = new ArrayList<>(); + Exception lastException = null; + for (int attempt = 1; attempt <= maxAttempts; attempt++) { + try { + // 执行Shell命令 + executeShellCommand(config, nodeInstance, context); + return; // 执行成功,直接返回 + } catch (Exception e) { + lastException = e; + if (attempt < maxAttempts) { + // 记录重试日志 + context.log(String.format("Shell execution failed (attempt %d/%d), retrying in %d seconds", + attempt, maxAttempts, retryInterval), LogLevelEnum.WARN); + Thread.sleep(retryInterval * 1000L); + } + } + } + + // 如果所有重试都失败,抛出最后一个异常 + throw lastException; + + } catch (Exception e) { + throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED, e); + } + } + + private void executeShellCommand(ShellConfig config, NodeInstance nodeInstance, WorkflowContext context) throws Exception { + ProcessBuilder processBuilder = new ProcessBuilder(); + processBuilder.command("sh", "-c", config.getScript()); + + // 设置工作目录 + if (config.getWorkingDirectory() != null) { + processBuilder.directory(new java.io.File(config.getWorkingDirectory())); + } + + // 设置环境变量 + if (config.getEnvironment() != null) { + Map env = processBuilder.environment(); + env.putAll(config.getEnvironment()); + } + + Process process = processBuilder.start(); + + // 创建用于读取输出的Future + Future> outputFuture = executorService.submit(() -> readOutput(process.getInputStream())); + Future> errorFuture = executorService.submit(() -> readOutput(process.getErrorStream())); + + // 等待进程执行完成或超时 + boolean completed = true; + if (config.getTimeout() != null && config.getTimeout() > 0) { + completed = process.waitFor(config.getTimeout(), TimeUnit.SECONDS); + if (!completed) { + process.destroyForcibly(); + throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED, + String.format("Shell execution timed out after %d seconds", config.getTimeout())); + } + } else { + process.waitFor(); + } + + // 获取输出结果 + List output = outputFuture.get(5, TimeUnit.SECONDS); // 给5秒时间读取输出 + List error = errorFuture.get(5, TimeUnit.SECONDS); + + // 检查退出码 + int exitCode = process.exitValue(); + if (config.getSuccessExitCode() != null && exitCode != config.getSuccessExitCode()) { + throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED, + String.format("Shell execution failed with exit code: %d%nError output: %s", + exitCode, String.join("\n", error))); + } + + // 设置输出结果 + nodeInstance.setOutput(String.join("\n", output)); + if (!error.isEmpty()) { + nodeInstance.setError(String.join("\n", error)); + } + } + + private List readOutput(java.io.InputStream inputStream) throws Exception { + List output = new ArrayList<>(); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) { String line; while ((line = reader.readLine()) != null) { output.add(line); } - - // 读取错误输出 - BufferedReader errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream())); - List error = new ArrayList<>(); - while ((line = errorReader.readLine()) != null) { - error.add(line); - } - - // 等待进程结束 - int exitCode = process.waitFor(); - - // 检查退出码 - if (config.getSuccessExitCode() != null && exitCode != config.getSuccessExitCode()) { - throw new RuntimeException("Shell script execution failed with exit code: " + exitCode + - "\nError output: " + String.join("\n", error)); - } - - // 设置输出结果 - nodeInstance.setOutput(String.join("\n", output)); - if (!error.isEmpty()) { - nodeInstance.setError(String.join("\n", error)); - } - } catch (Exception e) { - throw new RuntimeException("Shell script execution failed", e); } + return output; } @Override @@ -92,43 +160,14 @@ public class ShellNodeExecutor extends AbstractNodeExecutor { context.log("Shell node termination is not implemented yet", LogLevelEnum.WARN); } + @Data public static class ShellConfig { private String script; - private String workingDirectory; + private String workingDirectory; // 保持与前端一致 + private Integer retryTimes; // 重试次数 + private Integer retryInterval; // 重试间隔(秒) + private Integer timeout; // 超时时间(秒) private Map environment; private Integer successExitCode; - - // Getters and setters - public String getScript() { - return script; - } - - public void setScript(String script) { - this.script = script; - } - - public String getWorkingDirectory() { - return workingDirectory; - } - - public void setWorkingDirectory(String workingDirectory) { - this.workingDirectory = workingDirectory; - } - - public Map getEnvironment() { - return environment; - } - - public void setEnvironment(Map environment) { - this.environment = environment; - } - - public Integer getSuccessExitCode() { - return successExitCode; - } - - public void setSuccessExitCode(Integer successExitCode) { - this.successExitCode = successExitCode; - } } } \ No newline at end of file