兼容前端传递过来的config

This commit is contained in:
dengqichen 2024-12-06 16:20:30 +08:00
parent ebea17343f
commit cc395cc0dd
4 changed files with 93 additions and 32 deletions

View File

@ -1,7 +1,10 @@
package com.qqchen.deploy.backend.workflow.engine.executor; package com.qqchen.deploy.backend.workflow.engine.executor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.qqchen.deploy.backend.framework.enums.ResponseCode;
import com.qqchen.deploy.backend.system.enums.LogLevelEnum; import com.qqchen.deploy.backend.system.enums.LogLevelEnum;
import com.qqchen.deploy.backend.workflow.engine.context.WorkflowContext; import com.qqchen.deploy.backend.workflow.engine.context.WorkflowContext;
import com.qqchen.deploy.backend.workflow.engine.exception.WorkflowEngineException;
import com.qqchen.deploy.backend.workflow.entity.NodeInstance; import com.qqchen.deploy.backend.workflow.entity.NodeInstance;
import com.qqchen.deploy.backend.workflow.entity.WorkflowInstance; import com.qqchen.deploy.backend.workflow.entity.WorkflowInstance;
import com.qqchen.deploy.backend.workflow.enums.NodeStatusEnum; import com.qqchen.deploy.backend.workflow.enums.NodeStatusEnum;
@ -9,6 +12,7 @@ import com.qqchen.deploy.backend.workflow.enums.NodeTypeEnum;
import com.qqchen.deploy.backend.workflow.enums.WorkflowStatusEnum; import com.qqchen.deploy.backend.workflow.enums.WorkflowStatusEnum;
import com.qqchen.deploy.backend.workflow.repository.IWorkflowInstanceRepository; import com.qqchen.deploy.backend.workflow.repository.IWorkflowInstanceRepository;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -21,6 +25,9 @@ public class EndNodeExecutor implements NodeExecutor {
@Resource @Resource
private IWorkflowInstanceRepository workflowInstanceRepository; private IWorkflowInstanceRepository workflowInstanceRepository;
@Resource
private ObjectMapper objectMapper;
@Override @Override
public NodeTypeEnum getNodeType() { public NodeTypeEnum getNodeType() {
return NodeTypeEnum.END; return NodeTypeEnum.END;
@ -43,11 +50,21 @@ public class EndNodeExecutor implements NodeExecutor {
@Override @Override
public void validate(String config) { public void validate(String config) {
// 结束节点无需配置 try {
// 验证配置格式是否正确
EndConfig endConfig = objectMapper.readValue(config, EndConfig.class);
} catch (Exception e) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR);
}
} }
@Override @Override
public void terminate(NodeInstance nodeInstance, WorkflowContext context) { public void terminate(NodeInstance nodeInstance, WorkflowContext context) {
// 结束节点无需终止操作 // 结束节点无需终止操作
} }
@Data
public static class EndConfig {
// 结束节点无需特殊配置
}
} }

View File

@ -6,6 +6,7 @@ import com.qqchen.deploy.backend.workflow.engine.context.WorkflowContext;
import com.qqchen.deploy.backend.workflow.engine.exception.WorkflowEngineException; import com.qqchen.deploy.backend.workflow.engine.exception.WorkflowEngineException;
import com.qqchen.deploy.backend.workflow.entity.NodeInstance; import com.qqchen.deploy.backend.workflow.entity.NodeInstance;
import com.qqchen.deploy.backend.workflow.enums.NodeTypeEnum; import com.qqchen.deploy.backend.workflow.enums.NodeTypeEnum;
import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.expression.Expression; import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser; import org.springframework.expression.ExpressionParser;
@ -15,7 +16,7 @@ import org.springframework.stereotype.Component;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
@Slf4j @Slf4j
@Component @Component
@ -35,22 +36,21 @@ public class GatewayNodeExecutor implements NodeExecutor {
GatewayConfig config = objectMapper.readValue(nodeInstance.getConfig(), GatewayConfig.class); GatewayConfig config = objectMapper.readValue(nodeInstance.getConfig(), GatewayConfig.class);
if (config.getType() == null) { if (config.getType() == null) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR, "Gateway type is required"); throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR);
} }
switch (config.getType()) { switch (config.getType()) {
case EXCLUSIVE:
handleExclusiveGateway(nodeInstance, context, config);
break;
case PARALLEL: case PARALLEL:
handleParallelGateway(nodeInstance, context, config); handleParallelGateway(nodeInstance, context, config);
break; break;
case EXCLUSIVE:
handleExclusiveGateway(nodeInstance, context, config);
break;
case INCLUSIVE: case INCLUSIVE:
handleInclusiveGateway(nodeInstance, context, config); handleInclusiveGateway(nodeInstance, context, config);
break; break;
default: default:
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_TYPE_NOT_SUPPORTED, throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR);
"Unsupported gateway type: " + config.getType());
} }
} catch (Exception e) { } catch (Exception e) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED, e); throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED, e);
@ -67,8 +67,7 @@ public class GatewayNodeExecutor implements NodeExecutor {
return; return;
} }
} }
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED, throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED);
"No condition matched in exclusive gateway");
} catch (Exception e) { } catch (Exception e) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED, e); throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED, e);
} }
@ -97,8 +96,7 @@ public class GatewayNodeExecutor implements NodeExecutor {
} }
} }
if (nextNodeIds.isEmpty()) { if (nextNodeIds.isEmpty()) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED, throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED);
"No condition matched in inclusive gateway");
} }
nodeInstance.setOutput(objectMapper.createObjectNode() nodeInstance.setOutput(objectMapper.createObjectNode()
.put("nextNodeIds", String.join(",", nextNodeIds)) .put("nextNodeIds", String.join(",", nextNodeIds))
@ -119,17 +117,12 @@ public class GatewayNodeExecutor implements NodeExecutor {
} }
} }
@Override
public void terminate(NodeInstance nodeInstance, WorkflowContext context) {
// Gateway nodes are instant operations, no need to terminate
}
@Override @Override
public void validate(String config) { public void validate(String config) {
try { try {
GatewayConfig gatewayConfig = objectMapper.readValue(config, GatewayConfig.class); GatewayConfig gatewayConfig = objectMapper.readValue(config, GatewayConfig.class);
if (gatewayConfig.getType() == null) { if (gatewayConfig.getType() == null) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR, "Gateway type is required"); throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR);
} }
// 验证条件表达式 // 验证条件表达式
if (gatewayConfig.getConditions() != null) { if (gatewayConfig.getConditions() != null) {
@ -137,13 +130,29 @@ public class GatewayNodeExecutor implements NodeExecutor {
try { try {
expressionParser.parseExpression(condition.getExpression()); expressionParser.parseExpression(condition.getExpression());
} catch (Exception e) { } catch (Exception e) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR, throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR);
"Invalid condition expression: " + condition.getExpression());
} }
} }
} }
} catch (Exception e) { } catch (Exception e) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR, e); throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR);
} }
} }
@Override
public void terminate(NodeInstance nodeInstance, WorkflowContext context) {
// Gateway nodes are instant operations, no need to terminate
}
@Data
public static class GatewayConfig {
private GatewayType type; // 网关类型PARALLEL, EXCLUSIVE, INCLUSIVE
private List<GatewayCondition> conditions; // 网关条件
}
@Data
public static class GatewayCondition {
private String expression; // 条件表达式
private String nextNodeId; // 下一个节点ID
}
} }

View File

@ -1,10 +1,15 @@
package com.qqchen.deploy.backend.workflow.engine.executor; package com.qqchen.deploy.backend.workflow.engine.executor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.qqchen.deploy.backend.framework.enums.ResponseCode;
import com.qqchen.deploy.backend.system.enums.LogLevelEnum; import com.qqchen.deploy.backend.system.enums.LogLevelEnum;
import com.qqchen.deploy.backend.workflow.engine.context.WorkflowContext; import com.qqchen.deploy.backend.workflow.engine.context.WorkflowContext;
import com.qqchen.deploy.backend.workflow.engine.exception.WorkflowEngineException;
import com.qqchen.deploy.backend.workflow.entity.NodeInstance; import com.qqchen.deploy.backend.workflow.entity.NodeInstance;
import com.qqchen.deploy.backend.workflow.enums.NodeStatusEnum; import com.qqchen.deploy.backend.workflow.enums.NodeStatusEnum;
import com.qqchen.deploy.backend.workflow.enums.NodeTypeEnum; import com.qqchen.deploy.backend.workflow.enums.NodeTypeEnum;
import jakarta.annotation.Resource;
import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -12,6 +17,9 @@ import org.springframework.stereotype.Component;
@Component @Component
public class StartNodeExecutor implements NodeExecutor { public class StartNodeExecutor implements NodeExecutor {
@Resource
private ObjectMapper objectMapper;
@Override @Override
public NodeTypeEnum getNodeType() { public NodeTypeEnum getNodeType() {
return NodeTypeEnum.START; return NodeTypeEnum.START;
@ -26,11 +34,21 @@ public class StartNodeExecutor implements NodeExecutor {
@Override @Override
public void validate(String config) { public void validate(String config) {
// 开始节点无需配置 try {
// 验证配置格式是否正确
StartConfig startConfig = objectMapper.readValue(config, StartConfig.class);
} catch (Exception e) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR);
}
} }
@Override @Override
public void terminate(NodeInstance nodeInstance, WorkflowContext context) { public void terminate(NodeInstance nodeInstance, WorkflowContext context) {
// 开始节点无需终止操作 // 开始节点无需终止操作
} }
@Data
public static class StartConfig {
// 开始节点无需特殊配置
}
} }

View File

@ -38,9 +38,15 @@ public class ShellNodeExecutor extends AbstractNodeExecutor {
public void validate(String config) { public void validate(String config) {
try { try {
ShellConfig shellConfig = objectMapper.readValue(config, ShellConfig.class); ShellConfig shellConfig = objectMapper.readValue(config, ShellConfig.class);
// 验证执行器类型
if (!"SHELL".equals(shellConfig.getExecutor())) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR);
}
// 验证脚本内容
if (shellConfig.getScript() == null || shellConfig.getScript().trim().isEmpty()) { if (shellConfig.getScript() == null || shellConfig.getScript().trim().isEmpty()) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR); throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR);
} }
// 验证其他参数
if (shellConfig.getTimeout() != null && shellConfig.getTimeout() <= 0) { if (shellConfig.getTimeout() != null && shellConfig.getTimeout() <= 0) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR); throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR);
} }
@ -61,6 +67,11 @@ public class ShellNodeExecutor extends AbstractNodeExecutor {
String configJson = nodeInstance.getConfig(); String configJson = nodeInstance.getConfig();
ShellConfig config = objectMapper.readValue(configJson, ShellConfig.class); ShellConfig config = objectMapper.readValue(configJson, ShellConfig.class);
// 验证执行器类型
if (!"SHELL".equals(config.getExecutor())) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_CONFIG_ERROR);
}
// 设置重试次数和间隔 // 设置重试次数和间隔
int maxAttempts = config.getRetryTimes() != null ? config.getRetryTimes() : 1; int maxAttempts = config.getRetryTimes() != null ? config.getRetryTimes() : 1;
long retryInterval = config.getRetryInterval() != null ? config.getRetryInterval() : 0; long retryInterval = config.getRetryInterval() != null ? config.getRetryInterval() : 0;
@ -83,7 +94,9 @@ public class ShellNodeExecutor extends AbstractNodeExecutor {
} }
// 如果所有重试都失败抛出最后一个异常 // 如果所有重试都失败抛出最后一个异常
throw lastException; if (lastException != null) {
throw lastException;
}
} catch (Exception e) { } catch (Exception e) {
throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED, e); throw new WorkflowEngineException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED, e);
@ -95,12 +108,12 @@ public class ShellNodeExecutor extends AbstractNodeExecutor {
processBuilder.command("sh", "-c", config.getScript()); processBuilder.command("sh", "-c", config.getScript());
// 设置工作目录 // 设置工作目录
if (config.getWorkingDirectory() != null) { if (config.getWorkingDirectory() != null && !config.getWorkingDirectory().trim().isEmpty()) {
processBuilder.directory(new java.io.File(config.getWorkingDirectory())); processBuilder.directory(new java.io.File(config.getWorkingDirectory()));
} }
// 设置环境变量 // 设置环境变量
if (config.getEnvironment() != null) { if (config.getEnvironment() != null && !config.getEnvironment().isEmpty()) {
Map<String, String> env = processBuilder.environment(); Map<String, String> env = processBuilder.environment();
env.putAll(config.getEnvironment()); env.putAll(config.getEnvironment());
} }
@ -141,6 +154,9 @@ public class ShellNodeExecutor extends AbstractNodeExecutor {
if (!error.isEmpty()) { if (!error.isEmpty()) {
nodeInstance.setError(String.join("\n", error)); nodeInstance.setError(String.join("\n", error));
} }
// 记录执行日志
context.log(String.format("Shell script executed successfully with exit code: %d", exitCode), LogLevelEnum.INFO);
} }
private List<String> readOutput(java.io.InputStream inputStream) throws Exception { private List<String> readOutput(java.io.InputStream inputStream) throws Exception {
@ -162,12 +178,13 @@ public class ShellNodeExecutor extends AbstractNodeExecutor {
@Data @Data
public static class ShellConfig { public static class ShellConfig {
private String script; private String executor; // 执行器类型
private String workingDirectory; // 保持与前端一致 private String script; // 脚本内容
private Integer retryTimes; // 重试次数 private String workingDirectory; // 工作目录
private Integer retryInterval; // 重试间隔 private Integer retryTimes; // 重试次数
private Integer timeout; // 超时时间 private Integer retryInterval; // 重试间隔
private Map<String, String> environment; private Integer timeout; // 超时时间
private Integer successExitCode; private Map<String, String> environment; // 环境变量
private Integer successExitCode; // 成功退出码
} }
} }