可正常启动,工作流可以正常运行,增加了其他节点,但是还没运行过,编译无错。

This commit is contained in:
戚辰先生 2024-12-08 16:33:38 +08:00
parent 591abe983d
commit 6a9b0f36b0
12 changed files with 557 additions and 128 deletions

View File

@ -1,11 +1,16 @@
package com.qqchen.deploy.backend.workflow.engine;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.qqchen.deploy.backend.framework.enums.ResponseCode;
import com.qqchen.deploy.backend.workflow.engine.context.DefaultWorkflowContext;
import com.qqchen.deploy.backend.workflow.engine.context.WorkflowContextOperations;
import com.qqchen.deploy.backend.workflow.engine.exception.WorkflowEngineException;
import com.qqchen.deploy.backend.workflow.engine.executor.NodeExecutor;
import com.qqchen.deploy.backend.workflow.engine.parser.WorkflowDefinitionParser;
import com.qqchen.deploy.backend.workflow.entity.NodeConfig;
import com.qqchen.deploy.backend.workflow.entity.NodeInstance;
import com.qqchen.deploy.backend.workflow.entity.TransitionConfig;
import com.qqchen.deploy.backend.workflow.entity.WorkflowDefinition;
import com.qqchen.deploy.backend.workflow.entity.WorkflowInstance;
import com.qqchen.deploy.backend.workflow.enums.NodeStatusEnum;
@ -22,6 +27,8 @@ import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -44,39 +51,51 @@ public class DefaultWorkflowEngine implements WorkflowEngine {
@Resource
private WorkflowVariableOperations variableOperations;
@Resource
private ObjectMapper objectMapper;
@Resource
private WorkflowDefinitionParser workflowDefinitionParser;
@Override
@Transactional
public WorkflowInstance startWorkflow(String workflowCode, String businessKey, Map<String, Object> variables) {
// 1. 检查工作流定义
// 1. 获取工作流定义
WorkflowDefinition definition = workflowDefinitionRepository.findByCodeAndDeletedFalse(workflowCode);
if (definition == null) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NOT_FOUND);
}
// 检查工作流定义状态
// 2. 检查工作流定义状态
if (definition.getStatus() != WorkflowDefinitionStatusEnum.PUBLISHED) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NOT_PUBLISHED);
}
// 2. 创建工作流实例
// 3. 创建工作流实例
WorkflowInstance instance = new WorkflowInstance();
instance.setWorkflowDefinition(definition);
instance.setBusinessKey(businessKey);
// 设置工作流实例初始状态为 PENDING
instance.setStatus(WorkflowInstanceStatusEnum.PENDING);
instance.setStatus(WorkflowInstanceStatusEnum.RUNNING);
instance.setCreateTime(LocalDateTime.now());
workflowInstanceRepository.save(instance);
// 3. 设置工作流变量
// 4. 初始化工作流变量
if (variables != null && !variables.isEmpty()) {
variableOperations.setVariables(instance.getId(), variables);
}
// 4. 创建开始节点实例并启动工作流
NodeInstance startNode = createStartNode(definition, instance);
instance.start();
workflowInstanceRepository.save(instance);
executeNode(startNode.getId());
// 5. 创建并执行开始节点
NodeInstance startNode = new NodeInstance();
startNode.setWorkflowInstance(instance);
startNode.setNodeId("start");
startNode.setNodeType(NodeTypeEnum.START);
startNode.setName("开始节点");
startNode.setStatus(NodeStatusEnum.PENDING);
startNode.setCreateTime(LocalDateTime.now());
nodeInstanceRepository.save(startNode);
// 6. 执行开始节点
executeNode(startNode.getId());
return instance;
}
@ -97,7 +116,7 @@ public class DefaultWorkflowEngine implements WorkflowEngine {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTOR_NOT_FOUND);
}
// 创建工作流上下文
// 创建执行上下文
WorkflowContextOperations context = DefaultWorkflowContext.builder()
.workflowInstance(instance)
.variableOperations(variableOperations)
@ -111,7 +130,33 @@ public class DefaultWorkflowEngine implements WorkflowEngine {
nodeInstance.setEndTime(LocalDateTime.now());
nodeInstanceRepository.save(nodeInstance);
// 4. 检查是否所有节点都已完成
// 4. 获取并执行后续节点
WorkflowDefinition definition = instance.getWorkflowDefinition();
List<TransitionConfig> transitions = workflowDefinitionParser.parseTransitionConfig(definition.getTransitionConfig());
List<NodeConfig> nodeConfigs = workflowDefinitionParser.parseNodeConfig(definition.getNodeConfig());
// 获取当前节点的后续节点
List<String> nextNodeIds = transitions.stream()
.filter(t -> t.getSourceNodeId().equals(nodeInstance.getNodeId()))
.map(TransitionConfig::getTargetNodeId)
.toList();
// 创建并执行后续节点
for (String nextNodeId : nextNodeIds) {
NodeConfig nodeConfig = nodeConfigs.stream()
.filter(n -> n.getNodeId().equals(nextNodeId))
.findFirst()
.orElse(null);
if (nodeConfig == null) {
log.error("Node configuration not found for node: {}", nextNodeId);
continue;
}
createAndExecuteNextNode(instance, nextNodeId, nodeConfig);
}
// 5. 检查是否所有节点都已完成
List<NodeInstance> uncompletedNodes = nodeInstanceRepository.findByWorkflowInstanceAndStatusNot(
instance, NodeStatusEnum.COMPLETED);
@ -121,6 +166,24 @@ public class DefaultWorkflowEngine implements WorkflowEngine {
}
}
private void createAndExecuteNextNode(WorkflowInstance instance, String nextNodeId, NodeConfig nodeConfig) {
try {
NodeInstance nextNode = new NodeInstance();
nextNode.setNodeId(nextNodeId);
nextNode.setWorkflowInstance(instance);
nextNode.setNodeType(nodeConfig.getType());
nextNode.setName(nodeConfig.getName());
nextNode.setConfig(objectMapper.writeValueAsString(nodeConfig.getConfig()));
nextNode.setStatus(NodeStatusEnum.PENDING);
nodeInstanceRepository.save(nextNode);
// 递归执行后续节点
executeNode(nextNode.getId());
} catch (JsonProcessingException e) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR, e);
}
}
@Override
@Transactional
public void completeNode(Long nodeInstanceId, Map<String, Object> variables) {

View File

@ -1,54 +0,0 @@
package com.qqchen.deploy.backend.workflow.engine.executor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.qqchen.deploy.backend.framework.enums.ResponseCode;
import com.qqchen.deploy.backend.system.enums.LogLevelEnum;
import com.qqchen.deploy.backend.workflow.engine.context.WorkflowContextOperations;
import com.qqchen.deploy.backend.workflow.engine.exception.WorkflowEngineException;
import com.qqchen.deploy.backend.workflow.entity.NodeInstance;
import com.qqchen.deploy.backend.workflow.enums.NodeStatusEnum;
import com.qqchen.deploy.backend.workflow.enums.NodeTypeEnum;
import jakarta.annotation.Resource;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class StartNodeExecutor implements NodeExecutor {
@Resource
private ObjectMapper objectMapper;
@Override
public NodeTypeEnum getNodeType() {
return NodeTypeEnum.START;
}
@Override
public void execute(NodeInstance nodeInstance, WorkflowContextOperations context) {
// 开始节点直接完成
nodeInstance.setStatus(NodeStatusEnum.COMPLETED);
context.log("开始节点执行完成", LogLevelEnum.INFO);
}
@Override
public void validate(String config) {
try {
// 验证配置格式是否正确
StartConfig startConfig = objectMapper.readValue(config, StartConfig.class);
} catch (Exception e) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR);
}
}
@Override
public void terminate(NodeInstance nodeInstance, WorkflowContextOperations context) {
// 开始节点无需终止操作
}
@Data
public static class StartConfig {
// 开始节点无需特殊配置
}
}

View File

@ -12,6 +12,7 @@ import lombok.Data;
@JsonSubTypes({
@JsonSubTypes.Type(value = ApprovalNodeConfig.class, name = "APPROVAL"),
@JsonSubTypes.Type(value = ScriptNodeConfig.class, name = "SCRIPT"),
@JsonSubTypes.Type(value = ShellNodeConfig.class, name = "SHELL"),
@JsonSubTypes.Type(value = JenkinsNodeConfig.class, name = "JENKINS"),
@JsonSubTypes.Type(value = GitNodeConfig.class, name = "GIT"),
@JsonSubTypes.Type(value = ConditionNodeConfig.class, name = "CONDITION"),

View File

@ -5,15 +5,63 @@ import lombok.EqualsAndHashCode;
import java.util.Map;
/**
* 脚本节点配置
* 支持多种脚本语言Python, Shell, JavaScript等
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class ScriptNodeConfig extends NodeConfig {
/**
* 脚本内容
*/
private String script;
/**
* 脚本语言python/shell/javascript
*/
private String language;
/**
* 解释器路径可选
* 例如/usr/local/bin/python3
*/
private String interpreter;
/**
* 工作目录
*/
private String workingDirectory;
/**
* 超时时间
*/
private Integer timeout;
/**
* 环境变量
*/
private Map<String, String> environment;
private Integer successExitCode;
/**
* 成功退出码
*/
private Integer successExitCode = 0;
/**
* 重试次数
*/
private Integer retryTimes;
/**
* 重试间隔
*/
private Integer retryInterval;
/**
* 脚本参数
*/
private Map<String, String> arguments;
public String getScript() {
return script;

View File

@ -0,0 +1,55 @@
package com.qqchen.deploy.backend.workflow.engine.node;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.util.Map;
/**
* Shell节点配置
* @deprecated 请使用 {@link ScriptNodeConfig} 替代设置 language="shell"
*/
@Deprecated(since = "1.0", forRemoval = true)
@Data
@EqualsAndHashCode(callSuper = true)
public class ShellNodeConfig extends NodeConfig {
/**
* 执行器类型固定为 SHELL
*/
private String executor = "SHELL";
/**
* Shell脚本内容
*/
private String script;
/**
* 工作目录
*/
private String workingDirectory;
/**
* 超时时间
*/
private Integer timeout;
/**
* 重试次数
*/
private Integer retryTimes;
/**
* 重试间隔
*/
private Integer retryInterval;
/**
* 环境变量
*/
private Map<String, String> environment;
/**
* 成功退出码
*/
private Integer successExitCode = 0;
}

View File

@ -1,70 +1,55 @@
package com.qqchen.deploy.backend.workflow.engine.executor;
package com.qqchen.deploy.backend.workflow.engine.node.executor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.qqchen.deploy.backend.framework.enums.ResponseCode;
import com.qqchen.deploy.backend.system.enums.LogLevelEnum;
import com.qqchen.deploy.backend.workflow.engine.context.WorkflowContextOperations;
import com.qqchen.deploy.backend.workflow.engine.exception.WorkflowEngineException;
import com.qqchen.deploy.backend.workflow.engine.node.AbstractNodeExecutor;
import com.qqchen.deploy.backend.workflow.entity.NodeInstance;
import com.qqchen.deploy.backend.workflow.entity.WorkflowInstance;
import com.qqchen.deploy.backend.workflow.enums.NodeStatusEnum;
import com.qqchen.deploy.backend.workflow.enums.NodeTypeEnum;
import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatusEnum;
import com.qqchen.deploy.backend.workflow.repository.IWorkflowInstanceRepository;
import jakarta.annotation.Resource;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* 结束节点执行器
* 负责完成工作流实例的执行
*/
@Slf4j
@Component
public class EndNodeExecutor implements NodeExecutor {
@Component("endNodeExecutor")
public class EndNodeExecutor extends AbstractNodeExecutor {
@Resource
private IWorkflowInstanceRepository workflowInstanceRepository;
@Resource
private ObjectMapper objectMapper;
@Override
public NodeTypeEnum getNodeType() {
return NodeTypeEnum.END;
}
@Override
public void execute(NodeInstance nodeInstance, WorkflowContextOperations context) {
// 1. 完成结束节点
nodeInstance.setStatus(NodeStatusEnum.COMPLETED);
nodeInstance.setEndTime(LocalDateTime.now());
public void validate(String config) {
// 结束节点不需要配置无需验证
}
// 2. 完成工作流实例
@Override
protected void doExecute(NodeInstance nodeInstance, WorkflowContextOperations context) {
// 完成工作流实例
WorkflowInstance instance = nodeInstance.getWorkflowInstance();
instance.setStatus(WorkflowInstanceStatusEnum.COMPLETED);
instance.setEndTime(LocalDateTime.now());
workflowInstanceRepository.save(instance);
// 记录日志
context.log("工作流执行完成", LogLevelEnum.INFO);
}
@Override
public void validate(String config) {
try {
// 验证配置格式是否正确
EndConfig endConfig = objectMapper.readValue(config, EndConfig.class);
} catch (Exception e) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR);
}
}
@Override
public void terminate(NodeInstance nodeInstance, WorkflowContextOperations context) {
// 结束节点无需终止操作
}
@Data
public static class EndConfig {
// 结束节点无需特殊配置
// 结束节点不需要终止逻辑
context.log("结束节点无需终止操作", LogLevelEnum.INFO);
}
}

View File

@ -0,0 +1,201 @@
package com.qqchen.deploy.backend.workflow.engine.node.executor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.qqchen.deploy.backend.system.enums.LogLevelEnum;
import com.qqchen.deploy.backend.framework.enums.ResponseCode;
import com.qqchen.deploy.backend.workflow.engine.exception.WorkflowEngineException;
import com.qqchen.deploy.backend.workflow.engine.context.WorkflowContextOperations;
import com.qqchen.deploy.backend.workflow.engine.node.AbstractNodeExecutor;
import com.qqchen.deploy.backend.workflow.engine.node.ScriptNodeConfig;
import com.qqchen.deploy.backend.workflow.entity.NodeInstance;
import com.qqchen.deploy.backend.workflow.enums.NodeTypeEnum;
import com.qqchen.deploy.backend.workflow.service.WorkflowVariableOperations;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import jakarta.annotation.Resource;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
/**
* 脚本节点执行器
* 支持多种脚本语言Python, Shell, JavaScript等
*/
@Slf4j
@Component
public class ScriptNodeExecutor extends AbstractNodeExecutor {
@Resource
private ObjectMapper objectMapper;
@Resource
private WorkflowVariableOperations variableOperations;
private final ExecutorService executorService = Executors.newCachedThreadPool();
@Override
public NodeTypeEnum getNodeType() {
return NodeTypeEnum.SCRIPT;
}
@Override
public void validate(String config) {
try {
ScriptNodeConfig scriptConfig = objectMapper.readValue(config, ScriptNodeConfig.class);
// 验证脚本内容
if (scriptConfig.getScript() == null || scriptConfig.getScript().trim().isEmpty()) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR, "Script content cannot be empty");
}
// 验证脚本语言
if (scriptConfig.getLanguage() == null || scriptConfig.getLanguage().trim().isEmpty()) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR, "Script language must be specified");
}
// 验证其他参数
if (scriptConfig.getTimeout() != null && scriptConfig.getTimeout() <= 0) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR, "Timeout must be positive");
}
if (scriptConfig.getRetryTimes() != null && scriptConfig.getRetryTimes() < 0) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR, "Retry times cannot be negative");
}
if (scriptConfig.getRetryInterval() != null && scriptConfig.getRetryInterval() < 0) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR, "Retry interval cannot be negative");
}
} catch (Exception e) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR, e);
}
}
@Override
protected void doExecute(NodeInstance nodeInstance, WorkflowContextOperations context) {
try {
String configJson = nodeInstance.getConfig();
ScriptNodeConfig config = objectMapper.readValue(configJson, ScriptNodeConfig.class);
// 设置重试次数和间隔
int maxAttempts = config.getRetryTimes() != null ? config.getRetryTimes() : 1;
long retryInterval = config.getRetryInterval() != null ? config.getRetryInterval() : 0;
Exception lastException = null;
for (int attempt = 1; attempt <= maxAttempts; attempt++) {
try {
// 执行脚本
executeScript(config, nodeInstance, context);
return; // 执行成功直接返回
} catch (Exception e) {
lastException = e;
if (attempt < maxAttempts) {
context.log(String.format("Script execution failed (attempt %d/%d), retrying in %d seconds",
attempt, maxAttempts, retryInterval), LogLevelEnum.WARN);
Thread.sleep(retryInterval * 1000L);
}
}
}
// 如果所有重试都失败抛出最后一个异常
if (lastException != null) {
throw lastException;
}
} catch (Exception e) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED, e);
}
}
private void executeScript(ScriptNodeConfig config, NodeInstance nodeInstance, WorkflowContextOperations context) throws Exception {
ProcessBuilder processBuilder = new ProcessBuilder();
// 根据脚本语言设置命令
List<String> command = new ArrayList<>();
switch (config.getLanguage().toLowerCase()) {
case "python":
command.add(config.getInterpreter() != null ? config.getInterpreter() : "python");
command.add("-c");
command.add(config.getScript());
break;
case "shell":
command.add("sh");
command.add("-c");
command.add(config.getScript());
break;
// TODO: 添加其他语言支持
default:
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR,
"Unsupported script language: " + config.getLanguage());
}
processBuilder.command(command);
// 设置工作目录
if (config.getWorkingDirectory() != null && !config.getWorkingDirectory().trim().isEmpty()) {
processBuilder.directory(new java.io.File(config.getWorkingDirectory()));
}
// 设置环境变量
if (config.getEnvironment() != null && !config.getEnvironment().isEmpty()) {
Map<String, String> env = processBuilder.environment();
env.putAll(config.getEnvironment());
}
Process process = processBuilder.start();
// 创建用于读取输出的Future
Future<List<String>> outputFuture = executorService.submit(() -> readOutput(process.getInputStream()));
Future<List<String>> errorFuture = executorService.submit(() -> readOutput(process.getErrorStream()));
// 等待进程执行完成或超时
boolean completed = true;
if (config.getTimeout() != null && config.getTimeout() > 0) {
completed = process.waitFor(config.getTimeout(), TimeUnit.SECONDS);
if (!completed) {
process.destroyForcibly();
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED,
String.format("Script execution timed out after %d seconds", config.getTimeout()));
}
} else {
process.waitFor();
}
// 获取输出结果
List<String> output = outputFuture.get(5, TimeUnit.SECONDS); // 给5秒时间读取输出
List<String> error = errorFuture.get(5, TimeUnit.SECONDS);
// 检查退出码
int exitCode = process.exitValue();
if (config.getSuccessExitCode() != null && exitCode != config.getSuccessExitCode()) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED,
String.format("Script execution failed with exit code: %d%nError output: %s",
exitCode, String.join("\n", error)));
}
// 设置输出变量
Map<String, Object> outputVariables = new HashMap<>();
outputVariables.put("scriptOutput", String.join("\n", output));
outputVariables.put("exitCode", exitCode);
variableOperations.setVariables(nodeInstance.getWorkflowInstance().getId(), outputVariables);
// 记录执行日志
context.log(String.format("Script executed successfully with exit code: %d", exitCode), LogLevelEnum.INFO);
}
private List<String> readOutput(java.io.InputStream inputStream) throws Exception {
List<String> output = new ArrayList<>();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
String line;
while ((line = reader.readLine()) != null) {
output.add(line);
}
}
return output;
}
@Override
public void terminate(NodeInstance nodeInstance, WorkflowContextOperations context) {
// TODO: 实现终止脚本进程的逻辑
context.log("Script node termination is not implemented yet", LogLevelEnum.WARN);
}
}

View File

@ -6,10 +6,10 @@ import com.qqchen.deploy.backend.framework.enums.ResponseCode;
import com.qqchen.deploy.backend.workflow.engine.exception.WorkflowEngineException;
import com.qqchen.deploy.backend.workflow.engine.context.WorkflowContextOperations;
import com.qqchen.deploy.backend.workflow.engine.node.AbstractNodeExecutor;
import com.qqchen.deploy.backend.workflow.engine.node.ShellNodeConfig;
import com.qqchen.deploy.backend.workflow.entity.NodeInstance;
import com.qqchen.deploy.backend.workflow.enums.NodeTypeEnum;
import com.qqchen.deploy.backend.workflow.service.WorkflowVariableOperations;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import jakarta.annotation.Resource;
@ -22,6 +22,11 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
/**
* Shell节点执行器
* @deprecated 请使用 {@link ScriptNodeExecutor} 替代
*/
@Deprecated(since = "1.0", forRemoval = true)
@Slf4j
@Component
public class ShellNodeExecutor extends AbstractNodeExecutor {
@ -42,7 +47,7 @@ public class ShellNodeExecutor extends AbstractNodeExecutor {
@Override
public void validate(String config) {
try {
ShellConfig shellConfig = objectMapper.readValue(config, ShellConfig.class);
ShellNodeConfig shellConfig = objectMapper.readValue(config, ShellNodeConfig.class);
// 验证执行器类型
if (!"SHELL".equals(shellConfig.getExecutor())) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR);
@ -70,7 +75,7 @@ public class ShellNodeExecutor extends AbstractNodeExecutor {
protected void doExecute(NodeInstance nodeInstance, WorkflowContextOperations context) {
try {
String configJson = nodeInstance.getConfig();
ShellConfig config = objectMapper.readValue(configJson, ShellConfig.class);
ShellNodeConfig config = objectMapper.readValue(configJson, ShellNodeConfig.class);
// 验证执行器类型
if (!"SHELL".equals(config.getExecutor())) {
@ -106,7 +111,7 @@ public class ShellNodeExecutor extends AbstractNodeExecutor {
}
}
private void executeShellCommand(ShellConfig config, NodeInstance nodeInstance, WorkflowContextOperations context) throws Exception {
private void executeShellCommand(ShellNodeConfig config, NodeInstance nodeInstance, WorkflowContextOperations context) throws Exception {
ProcessBuilder processBuilder = new ProcessBuilder();
processBuilder.command("sh", "-c", config.getScript());
@ -178,16 +183,4 @@ public class ShellNodeExecutor extends AbstractNodeExecutor {
// TODO: 实现终止Shell进程的逻辑
context.log("Shell node termination is not implemented yet", LogLevelEnum.WARN);
}
@Data
public static class ShellConfig {
private String executor; // 执行器类型
private String script; // 脚本内容
private String workingDirectory; // 工作目录
private Integer retryTimes; // 重试次数
private Integer retryInterval; // 重试间隔
private Integer timeout; // 超时时间
private Map<String, String> environment; // 环境变量
private Integer successExitCode; // 成功退出码
}
}

View File

@ -0,0 +1,58 @@
package com.qqchen.deploy.backend.workflow.engine.node.executor;
import com.qqchen.deploy.backend.system.enums.LogLevelEnum;
import com.qqchen.deploy.backend.workflow.engine.context.WorkflowContextOperations;
import com.qqchen.deploy.backend.workflow.engine.node.AbstractNodeExecutor;
import com.qqchen.deploy.backend.workflow.entity.NodeInstance;
import com.qqchen.deploy.backend.workflow.entity.WorkflowInstance;
import com.qqchen.deploy.backend.workflow.enums.NodeTypeEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* 开始节点执行器
* 负责工作流实例的初始化工作
*/
@Slf4j
@Component("startNodeExecutor")
public class StartNodeExecutor extends AbstractNodeExecutor {
@Override
public NodeTypeEnum getNodeType() {
return NodeTypeEnum.START;
}
@Override
public void validate(String config) {
// 开始节点不需要配置无需验证
}
@Override
protected void doExecute(NodeInstance nodeInstance, WorkflowContextOperations context) {
WorkflowInstance instance = context.getInstance();
// 记录启动日志
String message = String.format(
"工作流[%s]开始执行实例ID: %d业务键: %s",
instance.getWorkflowDefinition().getName(),
instance.getId(),
instance.getBusinessKey()
);
context.log(message, LogLevelEnum.INFO);
// 记录启动时间
log.info("Workflow instance {} started at {}", instance.getId(), instance.getCreateTime());
}
@Override
public void terminate(NodeInstance nodeInstance, WorkflowContextOperations context) {
// 开始节点的终止意味着整个工作流的终止
WorkflowInstance instance = nodeInstance.getWorkflowInstance();
String message = String.format(
"工作流[%s]在启动阶段被终止实例ID: %d",
instance.getWorkflowDefinition().getName(),
instance.getId()
);
context.log(message, LogLevelEnum.WARN);
}
}

View File

@ -16,9 +16,10 @@ public enum NodeTypeEnum {
GATEWAY(3, "GATEWAY", "网关节点"),
SUB_PROCESS(4, "SUB_PROCESS", "子流程节点"),
SHELL(5, "SHELL", "Shell脚本节点"),
APPROVAL(6, "APPROVAL", "审批节点"),
JENKINS(7, "JENKINS", "Jenkins任务节点"),
GIT(8, "GIT", "Git操作节点");
SCRIPT(6, "SCRIPT", "脚本节点"),
APPROVAL(7, "APPROVAL", "审批节点"),
JENKINS(8, "JENKINS", "Jenkins任务节点"),
GIT(9, "GIT", "Git操作节点");
private final int value;
private final String code;

View File

@ -0,0 +1,36 @@
package com.qqchen.deploy.backend.workflow.engine;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.qqchen.deploy.backend.workflow.entity.WorkflowDefinition;
import com.qqchen.deploy.backend.workflow.repository.IWorkflowDefinitionRepository;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.io.ClassPathResource;
import java.nio.file.Files;
import java.util.HashMap;
@SpringBootTest
public class WorkflowShellTest {
@Resource
private WorkflowEngine workflowEngine;
@Resource
private IWorkflowDefinitionRepository workflowDefinitionRepository;
@Resource
private ObjectMapper objectMapper;
@Test
public void testShellWorkflow() throws Exception {
// 1. 加载工作流定义
String json = Files.readString(new ClassPathResource("workflow-shell-test.json").getFile().toPath());
WorkflowDefinition definition = objectMapper.readValue(json, WorkflowDefinition.class);
workflowDefinitionRepository.save(definition);
// 2. 启动工作流实例
workflowEngine.startWorkflow(definition.getId(), "TEST-" + System.currentTimeMillis(), new HashMap<>());
}
}

View File

@ -0,0 +1,42 @@
{
"id": "test-shell-workflow",
"name": "Shell测试工作流",
"description": "测试Shell节点执行",
"nodes": {
"start": {
"id": "start",
"name": "开始",
"type": "START"
},
"shell1": {
"id": "shell1",
"name": "Shell命令",
"type": "SHELL",
"config": {
"executor": "SHELL",
"script": "echo 'Hello World' && ls -la",
"timeout": 30,
"retryTimes": 3,
"retryInterval": 5,
"successExitCode": 0
}
},
"end": {
"id": "end",
"name": "结束",
"type": "END"
}
},
"transitions": {
"start-to-shell": {
"from": "start",
"to": "shell1",
"condition": null
},
"shell-to-end": {
"from": "shell1",
"to": "end",
"condition": null
}
}
}