增加工作流实例实现

This commit is contained in:
dengqichen 2024-12-06 13:04:10 +08:00
parent 656900d1db
commit 41975feedc

View File

@ -6,6 +6,9 @@ import com.qqchen.deploy.backend.workflow.engine.node.AbstractNodeExecutor;
import com.qqchen.deploy.backend.workflow.enums.NodeTypeEnum; import com.qqchen.deploy.backend.workflow.enums.NodeTypeEnum;
import com.qqchen.deploy.backend.workflow.entity.NodeInstance; import com.qqchen.deploy.backend.workflow.entity.NodeInstance;
import com.qqchen.deploy.backend.system.enums.LogLevelEnum; import com.qqchen.deploy.backend.system.enums.LogLevelEnum;
import com.qqchen.deploy.backend.framework.enums.ResponseCode;
import com.qqchen.deploy.backend.workflow.engine.exception.WorkflowEngineException;
import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
@ -15,6 +18,7 @@ import java.io.InputStreamReader;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.*;
@Slf4j @Slf4j
@Component @Component
@ -23,6 +27,8 @@ public class ShellNodeExecutor extends AbstractNodeExecutor {
@Resource @Resource
private ObjectMapper objectMapper; private ObjectMapper objectMapper;
private final ExecutorService executorService = Executors.newCachedThreadPool();
@Override @Override
public NodeTypeEnum getNodeType() { public NodeTypeEnum getNodeType() {
return NodeTypeEnum.SHELL; return NodeTypeEnum.SHELL;
@ -30,7 +36,23 @@ public class ShellNodeExecutor extends AbstractNodeExecutor {
@Override @Override
public void validate(String config) { public void validate(String config) {
try {
ShellConfig shellConfig = objectMapper.readValue(config, ShellConfig.class);
if (shellConfig.getScript() == null || shellConfig.getScript().trim().isEmpty()) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR);
}
if (shellConfig.getTimeout() != null && shellConfig.getTimeout() <= 0) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR);
}
if (shellConfig.getRetryTimes() != null && shellConfig.getRetryTimes() < 0) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR);
}
if (shellConfig.getRetryInterval() != null && shellConfig.getRetryInterval() < 0) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR);
}
} catch (Exception e) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR);
}
} }
@Override @Override
@ -38,52 +60,98 @@ public class ShellNodeExecutor extends AbstractNodeExecutor {
try { try {
String configJson = nodeInstance.getConfig(); String configJson = nodeInstance.getConfig();
ShellConfig config = objectMapper.readValue(configJson, ShellConfig.class); ShellConfig config = objectMapper.readValue(configJson, ShellConfig.class);
ProcessBuilder processBuilder = new ProcessBuilder();
processBuilder.command("sh", "-c", config.getScript());
if (config.getWorkingDirectory() != null) { // 设置重试次数和间隔
processBuilder.directory(new java.io.File(config.getWorkingDirectory())); int maxAttempts = config.getRetryTimes() != null ? config.getRetryTimes() : 1;
} long retryInterval = config.getRetryInterval() != null ? config.getRetryInterval() : 0;
if (config.getEnvironment() != null) {
Map<String, String> env = processBuilder.environment();
env.putAll(config.getEnvironment());
}
Process process = processBuilder.start();
// 读取标准输出 Exception lastException = null;
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream())); for (int attempt = 1; attempt <= maxAttempts; attempt++) {
List<String> output = new ArrayList<>(); try {
// 执行Shell命令
executeShellCommand(config, nodeInstance, context);
return; // 执行成功直接返回
} catch (Exception e) {
lastException = e;
if (attempt < maxAttempts) {
// 记录重试日志
context.log(String.format("Shell execution failed (attempt %d/%d), retrying in %d seconds",
attempt, maxAttempts, retryInterval), LogLevelEnum.WARN);
Thread.sleep(retryInterval * 1000L);
}
}
}
// 如果所有重试都失败抛出最后一个异常
throw lastException;
} catch (Exception e) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED, e);
}
}
private void executeShellCommand(ShellConfig config, NodeInstance nodeInstance, WorkflowContext context) throws Exception {
ProcessBuilder processBuilder = new ProcessBuilder();
processBuilder.command("sh", "-c", config.getScript());
// 设置工作目录
if (config.getWorkingDirectory() != null) {
processBuilder.directory(new java.io.File(config.getWorkingDirectory()));
}
// 设置环境变量
if (config.getEnvironment() != null) {
Map<String, String> env = processBuilder.environment();
env.putAll(config.getEnvironment());
}
Process process = processBuilder.start();
// 创建用于读取输出的Future
Future<List<String>> outputFuture = executorService.submit(() -> readOutput(process.getInputStream()));
Future<List<String>> errorFuture = executorService.submit(() -> readOutput(process.getErrorStream()));
// 等待进程执行完成或超时
boolean completed = true;
if (config.getTimeout() != null && config.getTimeout() > 0) {
completed = process.waitFor(config.getTimeout(), TimeUnit.SECONDS);
if (!completed) {
process.destroyForcibly();
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED,
String.format("Shell execution timed out after %d seconds", config.getTimeout()));
}
} else {
process.waitFor();
}
// 获取输出结果
List<String> output = outputFuture.get(5, TimeUnit.SECONDS); // 给5秒时间读取输出
List<String> error = errorFuture.get(5, TimeUnit.SECONDS);
// 检查退出码
int exitCode = process.exitValue();
if (config.getSuccessExitCode() != null && exitCode != config.getSuccessExitCode()) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED,
String.format("Shell execution failed with exit code: %d%nError output: %s",
exitCode, String.join("\n", error)));
}
// 设置输出结果
nodeInstance.setOutput(String.join("\n", output));
if (!error.isEmpty()) {
nodeInstance.setError(String.join("\n", error));
}
}
private List<String> readOutput(java.io.InputStream inputStream) throws Exception {
List<String> output = new ArrayList<>();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
String line; String line;
while ((line = reader.readLine()) != null) { while ((line = reader.readLine()) != null) {
output.add(line); output.add(line);
} }
// 读取错误输出
BufferedReader errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
List<String> error = new ArrayList<>();
while ((line = errorReader.readLine()) != null) {
error.add(line);
}
// 等待进程结束
int exitCode = process.waitFor();
// 检查退出码
if (config.getSuccessExitCode() != null && exitCode != config.getSuccessExitCode()) {
throw new RuntimeException("Shell script execution failed with exit code: " + exitCode +
"\nError output: " + String.join("\n", error));
}
// 设置输出结果
nodeInstance.setOutput(String.join("\n", output));
if (!error.isEmpty()) {
nodeInstance.setError(String.join("\n", error));
}
} catch (Exception e) {
throw new RuntimeException("Shell script execution failed", e);
} }
return output;
} }
@Override @Override
@ -92,43 +160,14 @@ public class ShellNodeExecutor extends AbstractNodeExecutor {
context.log("Shell node termination is not implemented yet", LogLevelEnum.WARN); context.log("Shell node termination is not implemented yet", LogLevelEnum.WARN);
} }
@Data
public static class ShellConfig { public static class ShellConfig {
private String script; private String script;
private String workingDirectory; private String workingDirectory; // 保持与前端一致
private Integer retryTimes; // 重试次数
private Integer retryInterval; // 重试间隔
private Integer timeout; // 超时时间
private Map<String, String> environment; private Map<String, String> environment;
private Integer successExitCode; private Integer successExitCode;
// Getters and setters
public String getScript() {
return script;
}
public void setScript(String script) {
this.script = script;
}
public String getWorkingDirectory() {
return workingDirectory;
}
public void setWorkingDirectory(String workingDirectory) {
this.workingDirectory = workingDirectory;
}
public Map<String, String> getEnvironment() {
return environment;
}
public void setEnvironment(Map<String, String> environment) {
this.environment = environment;
}
public Integer getSuccessExitCode() {
return successExitCode;
}
public void setSuccessExitCode(Integer successExitCode) {
this.successExitCode = successExitCode;
}
} }
} }