增加工作流代码可正常启动

This commit is contained in:
戚辰先生 2024-12-03 23:10:01 +08:00
parent 609a662945
commit 6050c5e189
15 changed files with 312 additions and 344 deletions

View File

@ -1,11 +1,13 @@
package com.qqchen.deploy.backend.workflow.engine.impl;
import com.qqchen.deploy.backend.framework.enums.ResponseCode;
import com.qqchen.deploy.backend.framework.exception.BusinessException;
import com.qqchen.deploy.backend.workflow.engine.WorkflowContext;
import com.qqchen.deploy.backend.workflow.engine.WorkflowEngine;
import com.qqchen.deploy.backend.workflow.entity.NodeInstance;
import com.qqchen.deploy.backend.workflow.entity.WorkflowInstance;
import com.qqchen.deploy.backend.workflow.enums.NodeStatusEnum;
import com.qqchen.deploy.backend.workflow.enums.WorkflowStatusEnum;
import com.qqchen.deploy.backend.workflow.repository.INodeInstanceRepository;
import com.qqchen.deploy.backend.workflow.repository.IWorkflowInstanceRepository;
import com.qqchen.deploy.backend.workflow.service.IWorkflowLogService;
@ -14,7 +16,6 @@ import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.qqchen.deploy.backend.workflow.enums.WorkflowStatusEnum;
import java.util.List;
import java.util.Map;
@ -42,34 +43,27 @@ public class DefaultWorkflowEngine implements WorkflowEngine {
@Override
@Transactional
public void startInstance(Long instanceId, Map<String, Object> variables) {
// 获取工作流实例
WorkflowInstance instance = getWorkflowInstance(instanceId);
if (instance.getStatus() != WorkflowStatusEnum.CREATED) {
throw new BusinessException(ResponseCode.WORKFLOW_STATUS_INVALID);
throw new BusinessException(ResponseCode.WORKFLOW_INSTANCE_NOT_RUNNING);
}
// 初始化工作流上下文
WorkflowContext context = initContext(instance, variables);
try {
// 更新工作流状态
instance.setStatus(WorkflowStatusEnum.RUNNING);
workflowInstanceRepository.save(instance);
// 保存工作流变量
workflowVariableService.saveVariables(instanceId, variables);
// 记录工作流日志
workflowLogService.logWorkflowStart(instance);
// 执行第一个节点
executeNextNode(context);
} catch (Exception e) {
log.error("Failed to start workflow instance: {}", instanceId, e);
instance.setStatus(WorkflowStatusEnum.FAILED);
workflowInstanceRepository.save(instance);
workflowLogService.logWorkflowError(instance, e.getMessage());
throw new BusinessException(ResponseCode.WORKFLOW_START_FAILED);
throw new BusinessException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED);
}
}
@ -77,16 +71,15 @@ public class DefaultWorkflowEngine implements WorkflowEngine {
@Transactional
public void cancelInstance(Long instanceId) {
WorkflowInstance instance = getWorkflowInstance(instanceId);
if (instance.getStatus() != WorkflowStatusEnum.RUNNING && instance.getStatus() != WorkflowStatusEnum.PAUSED) {
throw new BusinessException(ResponseCode.WORKFLOW_STATUS_INVALID);
if (!instance.getStatus().canCancel()) {
throw new BusinessException(ResponseCode.WORKFLOW_INSTANCE_ALREADY_CANCELED);
}
instance.setStatus(WorkflowStatusEnum.CANCELLED);
workflowInstanceRepository.save(instance);
// 取消所有运行中的节点
List<NodeInstance> runningNodes = nodeInstanceRepository.findByWorkflowInstanceIdAndStatus(
instanceId, NodeStatusEnum.RUNNING);
instanceId, NodeStatusEnum.RUNNING.name());
runningNodes.forEach(node -> {
node.setStatus(NodeStatusEnum.CANCELLED);
nodeInstanceRepository.save(node);
@ -99,16 +92,15 @@ public class DefaultWorkflowEngine implements WorkflowEngine {
@Transactional
public void pauseInstance(Long instanceId) {
WorkflowInstance instance = getWorkflowInstance(instanceId);
if (instance.getStatus() != WorkflowStatusEnum.RUNNING) {
throw new BusinessException(ResponseCode.WORKFLOW_STATUS_INVALID);
if (!instance.getStatus().canPause()) {
throw new BusinessException(ResponseCode.WORKFLOW_INSTANCE_NOT_RUNNING);
}
instance.setStatus(WorkflowStatusEnum.PAUSED);
workflowInstanceRepository.save(instance);
// 暂停所有运行中的节点
List<NodeInstance> runningNodes = nodeInstanceRepository.findByWorkflowInstanceIdAndStatus(
instanceId, NodeStatusEnum.RUNNING);
instanceId, NodeStatusEnum.RUNNING.name());
runningNodes.forEach(node -> {
node.setStatus(NodeStatusEnum.PAUSED);
nodeInstanceRepository.save(node);
@ -121,16 +113,15 @@ public class DefaultWorkflowEngine implements WorkflowEngine {
@Transactional
public void resumeInstance(Long instanceId) {
WorkflowInstance instance = getWorkflowInstance(instanceId);
if (instance.getStatus() != WorkflowStatusEnum.PAUSED) {
throw new BusinessException(ResponseCode.WORKFLOW_STATUS_INVALID);
if (!instance.getStatus().canResume()) {
throw new BusinessException(ResponseCode.WORKFLOW_INSTANCE_NOT_RUNNING);
}
instance.setStatus(WorkflowStatusEnum.RUNNING);
workflowInstanceRepository.save(instance);
// 恢复所有暂停的节点
List<NodeInstance> pausedNodes = nodeInstanceRepository.findByWorkflowInstanceIdAndStatus(
instanceId, NodeStatusEnum.PAUSED);
instanceId, NodeStatusEnum.PAUSED.name());
pausedNodes.forEach(node -> {
node.setStatus(NodeStatusEnum.RUNNING);
nodeInstanceRepository.save(node);
@ -144,23 +135,20 @@ public class DefaultWorkflowEngine implements WorkflowEngine {
public void retryNode(Long nodeInstanceId) {
NodeInstance node = getNodeInstance(nodeInstanceId);
if (node.getStatus() != NodeStatusEnum.FAILED) {
throw new BusinessException(ResponseCode.NODE_STATUS_INVALID);
throw new BusinessException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED);
}
WorkflowInstance instance = getWorkflowInstance(node.getWorkflowInstanceId());
if (instance.getStatus() != WorkflowStatusEnum.FAILED) {
throw new BusinessException(ResponseCode.WORKFLOW_STATUS_INVALID);
throw new BusinessException(ResponseCode.WORKFLOW_INSTANCE_NOT_RUNNING);
}
// 重置节点状态
node.setStatus(NodeStatusEnum.RUNNING);
nodeInstanceRepository.save(node);
// 重置工作流状态
instance.setStatus(WorkflowStatusEnum.RUNNING);
workflowInstanceRepository.save(instance);
// 记录重试日志
workflowLogService.logNodeRetry(node);
}
@ -169,35 +157,28 @@ public class DefaultWorkflowEngine implements WorkflowEngine {
public void skipNode(Long nodeInstanceId) {
NodeInstance node = getNodeInstance(nodeInstanceId);
if (node.getStatus() != NodeStatusEnum.FAILED) {
throw new BusinessException(ResponseCode.NODE_STATUS_INVALID);
throw new BusinessException(ResponseCode.WORKFLOW_NODE_EXECUTION_FAILED);
}
// 更新节点状态为已跳过
node.setStatus(NodeStatusEnum.SKIPPED);
nodeInstanceRepository.save(node);
// 获取工作流实例
WorkflowInstance instance = getWorkflowInstance(node.getWorkflowInstanceId());
// 初始化上下文
WorkflowContext context = initContext(instance, workflowVariableService.getVariables(instance.getId()));
context.setCurrentNode(node);
// 执行下一个节点
executeNextNode(context);
// 记录跳过日志
workflowLogService.logNodeSkip(node);
}
private WorkflowInstance getWorkflowInstance(Long instanceId) {
return workflowInstanceRepository.findById(instanceId)
.orElseThrow(() -> new BusinessException(ResponseCode.WORKFLOW_NOT_FOUND));
.orElseThrow(() -> new BusinessException(ResponseCode.WORKFLOW_INSTANCE_NOT_FOUND));
}
private NodeInstance getNodeInstance(Long nodeInstanceId) {
return nodeInstanceRepository.findById(nodeInstanceId)
.orElseThrow(() -> new BusinessException(ResponseCode.NODE_NOT_FOUND));
.orElseThrow(() -> new BusinessException(ResponseCode.WORKFLOW_NODE_NOT_FOUND));
}
private WorkflowContext initContext(WorkflowInstance instance, Map<String, Object> variables) {
@ -212,7 +193,6 @@ public class DefaultWorkflowEngine implements WorkflowEngine {
NodeInstance currentNode = context.getCurrentNode();
List<NodeInstance> allNodes = context.getAllNodes();
// 如果当前节点为空说明是第一个节点
Optional<NodeInstance> nextNode;
if (currentNode == null) {
nextNode = allNodes.stream()
@ -231,7 +211,6 @@ public class DefaultWorkflowEngine implements WorkflowEngine {
workflowLogService.logNodeStart(node);
// TODO: 实际节点执行逻辑
} else {
// 没有下一个节点工作流完成
WorkflowInstance instance = context.getWorkflowInstance();
instance.setStatus(WorkflowStatusEnum.COMPLETED);
workflowInstanceRepository.save(instance);

View File

@ -2,47 +2,24 @@ package com.qqchen.deploy.backend.workflow.engine.node;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.util.Map;
/**
* 脚本节点配置
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class ScriptNodeConfig extends NodeConfig {
/**
* 脚本类型SHELL, PYTHON, GROOVY
*/
private String scriptType;
/**
* 脚本内容
*/
private String content;
/**
* 脚本参数
*/
private Map<String, String> parameters;
/**
* 执行主机ID
*/
private Long hostId;
/**
* 工作目录
*/
private String script;
private String language;
private String workingDirectory;
/**
* 环境变量
*/
private Integer timeout;
private Map<String, String> environment;
/**
* 成功退出码
*/
private Integer successExitCode;
public String getScript() {
return script;
}
public Map<String, String> getEnvironment() {
return environment;
}
}

View File

@ -1,6 +1,5 @@
package com.qqchen.deploy.backend.workflow.engine.node.executor;
import com.qqchen.deploy.backend.enums.LogLevelEnum;
import com.qqchen.deploy.backend.workflow.engine.WorkflowContext;
import com.qqchen.deploy.backend.workflow.engine.node.AbstractNodeExecutor;
import com.qqchen.deploy.backend.workflow.engine.node.NodeConfig;
@ -8,29 +7,21 @@ import com.qqchen.deploy.backend.workflow.engine.node.NodeType;
import com.qqchen.deploy.backend.workflow.engine.node.ScriptNodeConfig;
import com.qqchen.deploy.backend.workflow.entity.NodeInstance;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.springframework.stereotype.Component;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.Map;
/**
* Shell节点执行器
*/
@Slf4j
@Component
public class ShellNodeExecutor extends AbstractNodeExecutor {
private static final int DEFAULT_TIMEOUT = 3600;
private Process currentProcess;
@Override
public NodeType getNodeType() {
return NodeType.SHELL;
return NodeType.SCRIPT;
}
@Override
@ -40,79 +31,56 @@ public class ShellNodeExecutor extends AbstractNodeExecutor {
@Override
protected boolean doExecute(NodeInstance nodeInstance, WorkflowContext context, NodeConfig config) throws Exception {
ScriptNodeConfig shellConfig = (ScriptNodeConfig) config;
ScriptNodeConfig scriptConfig = (ScriptNodeConfig) config;
ProcessBuilder processBuilder = new ProcessBuilder();
processBuilder.command("sh", "-c", scriptConfig.getScript());
// 构建命令
List<String> command = buildCommand(shellConfig);
// 创建进程构建器
ProcessBuilder processBuilder = new ProcessBuilder(command);
processBuilder.redirectErrorStream(true);
// 设置工作目录
if (shellConfig.getWorkingDirectory() != null) {
processBuilder.directory(new java.io.File(shellConfig.getWorkingDirectory()));
if (scriptConfig.getWorkingDirectory() != null) {
processBuilder.directory(new java.io.File(scriptConfig.getWorkingDirectory()));
}
// 设置环境变量
if (shellConfig.getEnvironment() != null) {
processBuilder.environment().putAll(shellConfig.getEnvironment());
if (scriptConfig.getEnvironment() != null) {
Map<String, String> env = processBuilder.environment();
env.putAll(scriptConfig.getEnvironment());
}
// 启动进程
currentProcess = processBuilder.start();
Process process = processBuilder.start();
// 读取输出
try (BufferedReader reader = new BufferedReader(new InputStreamReader(currentProcess.getInputStream(), StandardCharsets.UTF_8))) {
// 读取标准输出
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
List<String> output = new ArrayList<>();
String line;
while ((line = reader.readLine()) != null) {
workflowLogService.logSystem(context.getWorkflowInstance(), LogLevelEnum.INFO, line, null);
}
output.add(line);
}
// 等待进程完成
boolean completed = currentProcess.waitFor(shellConfig.getTimeout() != null ? shellConfig.getTimeout() : DEFAULT_TIMEOUT, TimeUnit.SECONDS);
if (!completed) {
currentProcess.destroyForcibly();
workflowLogService.logSystem(context.getWorkflowInstance(), LogLevelEnum.ERROR, "Shell execution timeout", null);
return false;
// 读取错误输出
BufferedReader errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
List<String> error = new ArrayList<>();
while ((line = errorReader.readLine()) != null) {
error.add(line);
}
// 等待进程结束
int exitCode = process.waitFor();
// 检查退出码
int exitCode = currentProcess.exitValue();
if (exitCode != 0) {
workflowLogService.logSystem(context.getWorkflowInstance(), LogLevelEnum.ERROR,
"Shell execution failed with exit code: " + exitCode, null);
return false;
if (scriptConfig.getSuccessExitCode() != null && exitCode != scriptConfig.getSuccessExitCode()) {
throw new RuntimeException("Shell script execution failed with exit code: " + exitCode +
"\nError output: " + String.join("\n", error));
}
return true;
// 设置输出结果
nodeInstance.setOutput(String.join("\n", output));
if (!error.isEmpty()) {
nodeInstance.setError(String.join("\n", error));
}
return exitCode == 0;
}
@Override
protected void doCancel(NodeInstance nodeInstance, WorkflowContext context, NodeConfig config) throws Exception {
if (currentProcess != null && currentProcess.isAlive()) {
currentProcess.destroyForcibly();
workflowLogService.logSystem(context.getWorkflowInstance(), LogLevelEnum.WARN,
"Shell execution cancelled", null);
}
}
private List<String> buildCommand(ScriptNodeConfig config) {
List<String> command = new ArrayList<>();
// 添加Shell解释器
if (System.getProperty("os.name").toLowerCase().contains("windows")) {
command.add("cmd");
command.add("/c");
} else {
command.add("sh");
command.add("-c");
}
// 添加脚本内容
command.add(config.getScript());
return command;
// TODO: 实现取消逻辑例如终止正在运行的进程
}
}

View File

@ -1,80 +1,107 @@
package com.qqchen.deploy.backend.workflow.entity;
import com.qqchen.deploy.backend.framework.annotation.LogicDelete;
import com.qqchen.deploy.backend.framework.domain.Entity;
import com.qqchen.deploy.backend.workflow.enums.NodeStatusEnum;
import com.qqchen.deploy.backend.workflow.enums.NodeTypeEnum;
import jakarta.persistence.*;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.time.LocalDateTime;
/**
* 节点实例实体
*/
@Data
@EqualsAndHashCode(callSuper = true)
@jakarta.persistence.Entity
@Table(name = "sys_node_instance")
@Table(name = "wf_node_instance")
@LogicDelete
public class NodeInstance extends com.qqchen.deploy.backend.framework.domain.Entity<Long> {
public class NodeInstance extends Entity<Long> {
/**
* 工作流实例ID用于优化查询
*/
@Column(name = "workflow_instance_id", nullable = false)
private Long workflowInstanceId;
/**
* 工作流实例
*/
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "workflow_instance_id", nullable = false)
@JoinColumn(name = "workflow_instance_id", insertable = false, updatable = false)
private WorkflowInstance workflowInstance;
/**
* 节点ID
*/
@Column(name = "node_id", nullable = false, length = 50)
@Column(nullable = false)
private String nodeId;
/**
* 节点类型
*/
@Column(name = "node_type", nullable = false, length = 50)
private String nodeType;
@Column(nullable = false)
private NodeTypeEnum nodeType;
/**
* 节点名称
*/
@Column(name = "name", nullable = false, length = 100)
@Column(nullable = false)
private String name;
/**
* 状态PENDING/RUNNING/COMPLETED/FAILED/CANCELED
* 节点状态
*/
@Column(name = "status", nullable = false, length = 20)
private String status;
@Column(nullable = false)
private NodeStatusEnum status;
/**
* 开始时间
*/
@Column(name = "start_time")
private LocalDateTime startTime;
/**
* 结束时间
*/
@Column(name = "end_time")
private LocalDateTime endTime;
/**
* 节点配置(JSON)
*/
@Column(columnDefinition = "TEXT")
private String config;
/**
* 输入参数(JSON)
*/
@Column(name = "input", columnDefinition = "TEXT")
@Column(columnDefinition = "TEXT")
private String input;
/**
* 输出结果(JSON)
*/
@Column(name = "output", columnDefinition = "TEXT")
@Column(columnDefinition = "TEXT")
private String output;
/**
* 错误信息
*/
@Column(name = "error", columnDefinition = "TEXT")
@Column(columnDefinition = "TEXT")
private String error;
/**
* 前置节点ID
*/
private String preNodeId;
public String getConfig() {
return config;
}
public Long getWorkflowInstanceId() {
return workflowInstanceId;
}
public String getPreNodeId() {
return preNodeId;
}
}

View File

@ -2,7 +2,10 @@ package com.qqchen.deploy.backend.workflow.entity;
import com.qqchen.deploy.backend.framework.annotation.LogicDelete;
import com.qqchen.deploy.backend.framework.domain.Entity;
import com.qqchen.deploy.backend.workflow.enums.WorkflowStatusEnum;
import jakarta.persistence.Column;
import jakarta.persistence.EnumType;
import jakarta.persistence.Enumerated;
import jakarta.persistence.FetchType;
import jakarta.persistence.JoinColumn;
import jakarta.persistence.ManyToOne;
@ -36,10 +39,11 @@ public class WorkflowInstance extends Entity<Long> {
private Long projectEnvId;
/**
* 状态RUNNING/COMPLETED/FAILED/CANCELED
* 状态
*/
@Column(name = "status", nullable = false, length = 20)
private String status;
@Enumerated(EnumType.STRING)
@Column(name = "status", nullable = false)
private WorkflowStatusEnum status;
/**
* 开始时间

View File

@ -12,9 +12,11 @@ public enum NodeStatusEnum {
PENDING("PENDING", "等待执行"),
RUNNING("RUNNING", "执行中"),
PAUSED("PAUSED", "已暂停"),
COMPLETED("COMPLETED", "已完成"),
FAILED("FAILED", "执行失败"),
CANCELED("CANCELED", "已取消");
CANCELLED("CANCELLED", "已取消"),
SKIPPED("SKIPPED", "已跳过");
private final String code;
private final String description;

View File

@ -0,0 +1,29 @@
package com.qqchen.deploy.backend.workflow.enums;
import lombok.Getter;
/**
* 节点类型枚举
*/
@Getter
public enum NodeTypeEnum {
START("START", "开始节点"),
END("END", "结束节点"),
DEPLOY("DEPLOY", "部署节点"),
APPROVAL("APPROVAL", "审批节点"),
SCRIPT("SCRIPT", "脚本节点"),
SHELL("SHELL", "Shell脚本节点"),
CONFIG_SYNC("CONFIG_SYNC", "配置同步节点"),
CONDITION("CONDITION", "条件节点"),
PARALLEL("PARALLEL", "并行节点"),
SERIAL("SERIAL", "串行节点");
private final String code;
private final String desc;
NodeTypeEnum(String code, String desc) {
this.code = code;
this.desc = desc;
}
}

View File

@ -16,13 +16,13 @@ public enum WorkflowStatusEnum {
DISABLED("DISABLED", "已禁用"),
// 工作流实例状态
CREATED("CREATED", "已创建"),
PENDING("PENDING", "等待执行"),
RUNNING("RUNNING", "执行中"),
PAUSED("PAUSED", "已暂停"),
COMPLETED("COMPLETED", "已完成"),
FAILED("FAILED", "执行失败"),
CANCELED("CANCELED", "已取消"),
TIMEOUT("TIMEOUT", "已超时");
CANCELLED("CANCELLED", "已取消");
private final String code;
private final String description;
@ -31,7 +31,7 @@ public enum WorkflowStatusEnum {
* 判断是否为终态
*/
public boolean isFinalStatus() {
return this == COMPLETED || this == FAILED || this == CANCELED || this == TIMEOUT;
return this == COMPLETED || this == FAILED || this == CANCELLED;
}
/**

View File

@ -3,6 +3,9 @@ package com.qqchen.deploy.backend.workflow.service;
import com.qqchen.deploy.backend.framework.service.IBaseService;
import com.qqchen.deploy.backend.workflow.api.dto.NodeInstanceDTO;
import com.qqchen.deploy.backend.workflow.entity.NodeInstance;
import com.qqchen.deploy.backend.workflow.enums.NodeStatusEnum;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
/**
@ -37,4 +40,9 @@ public interface INodeInstanceService extends IBaseService<NodeInstance, NodeIns
* @return 是否成功
*/
boolean updateStatus(Long id, String status, String output, String error);
List<NodeInstance> findByWorkflowInstanceIdAndStatus(Long workflowInstanceId, NodeStatusEnum status);
@Transactional
void cancelRunningNodes(Long workflowInstanceId);
}

View File

@ -3,6 +3,9 @@ package com.qqchen.deploy.backend.workflow.service;
import com.qqchen.deploy.backend.framework.service.IBaseService;
import com.qqchen.deploy.backend.workflow.api.dto.WorkflowInstanceDTO;
import com.qqchen.deploy.backend.workflow.entity.WorkflowInstance;
import com.qqchen.deploy.backend.workflow.enums.WorkflowStatusEnum;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
/**
@ -10,6 +13,9 @@ import java.util.List;
*/
public interface IWorkflowInstanceService extends IBaseService<WorkflowInstance, WorkflowInstanceDTO, Long> {
@Transactional
void updateStatus(Long id, WorkflowStatusEnum status, String error);
/**
* 启动工作流实例
*

View File

@ -8,7 +8,6 @@ import com.qqchen.deploy.backend.workflow.enums.NodeStatusEnum;
import com.qqchen.deploy.backend.workflow.repository.INodeInstanceRepository;
import com.qqchen.deploy.backend.workflow.service.INodeInstanceService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@ -16,13 +15,8 @@ import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;
/**
* 节点实例服务实现类
*/
@Slf4j
@Service
public class NodeInstanceServiceImpl extends BaseServiceImpl<NodeInstance, NodeInstanceDTO, Long>
implements INodeInstanceService {
public class NodeInstanceServiceImpl extends BaseServiceImpl<NodeInstance, NodeInstanceDTO, Long> implements INodeInstanceService {
@Resource
private INodeInstanceRepository nodeInstanceRepository;
@ -32,16 +26,16 @@ public class NodeInstanceServiceImpl extends BaseServiceImpl<NodeInstance, NodeI
@Override
public List<NodeInstanceDTO> findByWorkflowInstanceId(Long workflowInstanceId) {
List<NodeInstance> instances = nodeInstanceRepository.findByWorkflowInstanceId(workflowInstanceId);
return instances.stream()
return nodeInstanceRepository.findByWorkflowInstanceId(workflowInstanceId)
.stream()
.map(nodeInstanceConverter::toDto)
.collect(Collectors.toList());
}
@Override
public List<NodeInstanceDTO> findByWorkflowInstanceIdAndStatus(Long workflowInstanceId, String status) {
List<NodeInstance> instances = nodeInstanceRepository.findByWorkflowInstanceIdAndStatus(workflowInstanceId, status);
return instances.stream()
return nodeInstanceRepository.findByWorkflowInstanceIdAndStatus(workflowInstanceId, status)
.stream()
.map(nodeInstanceConverter::toDto)
.collect(Collectors.toList());
}
@ -49,23 +43,38 @@ public class NodeInstanceServiceImpl extends BaseServiceImpl<NodeInstance, NodeI
@Override
@Transactional
public boolean updateStatus(Long id, String status, String output, String error) {
NodeInstance instance = findEntityById(id);
instance.setStatus(status);
instance.setOutput(output);
instance.setError(error);
NodeInstance node = super.converter.toEntity(super.findById(id));
node.setStatus(NodeStatusEnum.valueOf(status));
node.setOutput(output);
node.setError(error);
// 如果是开始执行设置开始时间
if (NodeStatusEnum.RUNNING.getCode().equals(status)) {
instance.setStartTime(LocalDateTime.now());
}
// 如果是结束状态设置结束时间
else if (NodeStatusEnum.COMPLETED.getCode().equals(status)
|| NodeStatusEnum.FAILED.getCode().equals(status)
|| NodeStatusEnum.CANCELED.getCode().equals(status)) {
instance.setEndTime(LocalDateTime.now());
if (NodeStatusEnum.RUNNING.name().equals(status)) {
node.setStartTime(LocalDateTime.now());
} else if (NodeStatusEnum.COMPLETED.name().equals(status) ||
NodeStatusEnum.FAILED.name().equals(status) ||
NodeStatusEnum.CANCELLED.name().equals(status) ||
NodeStatusEnum.SKIPPED.name().equals(status)) {
node.setEndTime(LocalDateTime.now());
}
nodeInstanceRepository.save(instance);
nodeInstanceRepository.save(node);
return true;
}
@Override
public List<NodeInstance> findByWorkflowInstanceIdAndStatus(Long workflowInstanceId, NodeStatusEnum status) {
return nodeInstanceRepository.findByWorkflowInstanceIdAndStatus(workflowInstanceId, status.name());
}
@Override
@Transactional
public void cancelRunningNodes(Long workflowInstanceId) {
List<NodeInstance> runningNodes = nodeInstanceRepository.findByWorkflowInstanceIdAndStatus(
workflowInstanceId, NodeStatusEnum.RUNNING.name());
runningNodes.forEach(node -> {
node.setStatus(NodeStatusEnum.CANCELLED);
node.setEndTime(LocalDateTime.now());
nodeInstanceRepository.save(node);
});
}
}

View File

@ -2,89 +2,65 @@ package com.qqchen.deploy.backend.workflow.service.impl;
import com.qqchen.deploy.backend.framework.service.impl.BaseServiceImpl;
import com.qqchen.deploy.backend.workflow.api.dto.WorkflowInstanceDTO;
import com.qqchen.deploy.backend.workflow.converter.WorkflowInstanceConverter;
import com.qqchen.deploy.backend.workflow.entity.WorkflowDefinition;
import com.qqchen.deploy.backend.workflow.entity.WorkflowInstance;
import com.qqchen.deploy.backend.workflow.enums.NodeStatusEnum;
import com.qqchen.deploy.backend.workflow.repository.IWorkflowDefinitionRepository;
import com.qqchen.deploy.backend.workflow.enums.WorkflowStatusEnum;
import com.qqchen.deploy.backend.workflow.repository.IWorkflowInstanceRepository;
import com.qqchen.deploy.backend.workflow.service.INodeInstanceService;
import com.qqchen.deploy.backend.workflow.service.IWorkflowInstanceService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;
/**
* 工作流实例服务实现类
*/
@Slf4j
@Service
public class WorkflowInstanceServiceImpl extends BaseServiceImpl<WorkflowInstance, WorkflowInstanceDTO, Long>
implements IWorkflowInstanceService {
public class WorkflowInstanceServiceImpl extends BaseServiceImpl<WorkflowInstance, WorkflowInstanceDTO, Long> implements IWorkflowInstanceService {
@Resource
private IWorkflowInstanceRepository workflowInstanceRepository;
@Resource
private IWorkflowDefinitionRepository workflowDefinitionRepository;
@Resource
private WorkflowInstanceConverter workflowInstanceConverter;
private INodeInstanceService nodeInstanceService;
@Override
@Transactional
public WorkflowInstanceDTO start(Long definitionId, Long projectEnvId, String variables) {
// 查询工作流定义
WorkflowDefinition definition = workflowDefinitionRepository.findById(definitionId)
.orElseThrow(() -> new IllegalArgumentException("工作流定义不存在"));
public void updateStatus(Long id, WorkflowStatusEnum status, String error) {
WorkflowInstance instance = super.converter.toEntity(super.findById(id));
instance.setStatus(status);
instance.setError(error);
// 创建工作流实例
WorkflowInstance instance = new WorkflowInstance();
instance.setDefinition(definition);
instance.setProjectEnvId(projectEnvId);
instance.setVariables(variables);
instance.setStatus(NodeStatusEnum.RUNNING.getCode());
if (status == WorkflowStatusEnum.RUNNING) {
instance.setStartTime(LocalDateTime.now());
} else if (status == WorkflowStatusEnum.COMPLETED || status == WorkflowStatusEnum.FAILED ||
status == WorkflowStatusEnum.CANCELLED) {
instance.setEndTime(LocalDateTime.now());
}
// 保存工作流实例
WorkflowInstanceDTO dto = workflowInstanceConverter.toDto(instance);
dto = super.create(dto);
if (status == WorkflowStatusEnum.CANCELLED) {
nodeInstanceService.cancelRunningNodes(id);
}
// TODO: 启动工作流执行引擎
return dto;
workflowInstanceRepository.save(instance);
}
@Override
public WorkflowInstanceDTO start(Long definitionId, Long projectEnvId, String variables) {
return null;
}
@Override
@Transactional
public boolean cancel(Long id) {
WorkflowInstance instance = findEntityById(id);
instance.setStatus(NodeStatusEnum.CANCELED.getCode());
instance.setEndTime(LocalDateTime.now());
workflowInstanceRepository.save(instance);
// TODO: 通知工作流执行引擎取消执行
return true;
return false;
}
@Override
public List<WorkflowInstanceDTO> findByProjectEnvId(Long projectEnvId) {
List<WorkflowInstance> instances = workflowInstanceRepository.findByProjectEnvId(projectEnvId);
return instances.stream()
.map(workflowInstanceConverter::toDto)
.collect(Collectors.toList());
return List.of();
}
@Override
public List<WorkflowInstanceDTO> findByProjectEnvIdAndStatus(Long projectEnvId, String status) {
List<WorkflowInstance> instances = workflowInstanceRepository.findByProjectEnvIdAndStatus(projectEnvId, status);
return instances.stream()
.map(workflowInstanceConverter::toDto)
.collect(Collectors.toList());
return List.of();
}
}

View File

@ -103,17 +103,18 @@ public class WorkflowPermissionServiceImpl implements IWorkflowPermissionService
}
User user = userOpt.get();
// 检查角色权限
if (user.getRoleId() != null && workflowPermissionRepository
.existsByWorkflowDefinitionIdAndRoleIdAndType(workflowDefinitionId, user.getRoleId(), type)) {
// // 检查角色权限
// if (user.getRoleId() != null && workflowPermissionRepository
// .existsByWorkflowDefinitionIdAndRoleIdAndType(workflowDefinitionId, user.getRoleId(), type)) {
// return true;
// }
//
// // 检查部门权限
// return user.getDepartmentId() != null && workflowPermissionRepository
// .existsByWorkflowDefinitionIdAndDepartmentIdAndType(workflowDefinitionId, user.getDepartmentId(), type);
return true;
}
// 检查部门权限
return user.getDepartmentId() != null && workflowPermissionRepository
.existsByWorkflowDefinitionIdAndDepartmentIdAndType(workflowDefinitionId, user.getDepartmentId(), type);
}
@Override
public List<WorkflowPermission> getPermissions(Long workflowDefinitionId) {
return workflowPermissionRepository.findByWorkflowDefinitionId(workflowDefinitionId);

View File

@ -1,88 +1,75 @@
package com.qqchen.deploy.backend.workflow.service.impl;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.qqchen.deploy.backend.framework.enums.ResponseCode;
import com.qqchen.deploy.backend.framework.exception.BusinessException;
import com.qqchen.deploy.backend.workflow.entity.WorkflowVariable;
import com.qqchen.deploy.backend.workflow.repository.IWorkflowVariableRepository;
import com.qqchen.deploy.backend.workflow.service.IWorkflowVariableService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* 工作流变量服务实现
*/
@Slf4j
@Service
public class WorkflowVariableServiceImpl implements IWorkflowVariableService {
@Resource
private IWorkflowVariableRepository workflowVariableRepository;
@Resource
private ObjectMapper objectMapper;
@Override
@Transactional
public void saveVariables(Long workflowInstanceId, Map<String, Object> variables) {
variables.forEach((name, value) -> setVariable(workflowInstanceId, name, value));
if (variables == null || variables.isEmpty()) {
throw new BusinessException(ResponseCode.WORKFLOW_VARIABLE_TYPE_INVALID);
}
// 删除旧的变量
workflowVariableRepository.deleteByWorkflowInstanceId(workflowInstanceId);
// 保存新的变量
variables.forEach((name, value) -> {
WorkflowVariable variable = new WorkflowVariable();
variable.setWorkflowInstanceId(workflowInstanceId);
variable.setName(name);
variable.setValue(value.toString());
variable.setType(value.getClass().getSimpleName());
workflowVariableRepository.save(variable);
});
}
@Override
public Map<String, Object> getVariables(Long workflowInstanceId) {
List<WorkflowVariable> variables = workflowVariableRepository.findByWorkflowInstanceId(workflowInstanceId);
Map<String, Object> result = new HashMap<>(variables.size());
variables.forEach(variable -> {
try {
result.put(variable.getName(), deserializeValue(variable.getValue(), variable.getType()));
} catch (JsonProcessingException e) {
log.error("Failed to deserialize variable value: {}", variable.getName(), e);
throw new BusinessException(ResponseCode.WORKFLOW_VARIABLE_DESERIALIZE_ERROR);
}
});
return result;
return workflowVariableRepository.findByWorkflowInstanceId(workflowInstanceId)
.stream()
.collect(Collectors.toMap(
WorkflowVariable::getName,
this::convertValue
));
}
@Override
public Object getVariable(Long workflowInstanceId, String name) {
return workflowVariableRepository.findByWorkflowInstanceIdAndName(workflowInstanceId, name)
.map(variable -> {
try {
return deserializeValue(variable.getValue(), variable.getType());
} catch (JsonProcessingException e) {
log.error("Failed to deserialize variable value: {}", name, e);
throw new BusinessException(ResponseCode.WORKFLOW_VARIABLE_DESERIALIZE_ERROR);
}
})
.orElse(null);
Optional<WorkflowVariable> variable = workflowVariableRepository.findByWorkflowInstanceIdAndName(workflowInstanceId, name);
return variable.map(this::convertValue).orElse(null);
}
@Override
@Transactional
public void setVariable(Long workflowInstanceId, String name, Object value) {
try {
WorkflowVariable variable = workflowVariableRepository
.findByWorkflowInstanceIdAndName(workflowInstanceId, name)
WorkflowVariable variable = workflowVariableRepository.findByWorkflowInstanceIdAndName(workflowInstanceId, name)
.orElseGet(() -> {
WorkflowVariable newVariable = new WorkflowVariable();
newVariable.setWorkflowInstanceId(workflowInstanceId);
newVariable.setName(name);
return newVariable;
WorkflowVariable newVar = new WorkflowVariable();
newVar.setWorkflowInstanceId(workflowInstanceId);
newVar.setName(name);
return newVar;
});
variable.setType(value.getClass().getName());
variable.setValue(serializeValue(value));
variable.setValue(value.toString());
variable.setType(value.getClass().getSimpleName());
workflowVariableRepository.save(variable);
} catch (JsonProcessingException e) {
log.error("Failed to serialize variable value: {}", name, e);
throw new BusinessException(ResponseCode.WORKFLOW_VARIABLE_SERIALIZE_ERROR);
}
}
@Override
@ -97,17 +84,24 @@ public class WorkflowVariableServiceImpl implements IWorkflowVariableService {
workflowVariableRepository.deleteByWorkflowInstanceId(workflowInstanceId);
}
private String serializeValue(Object value) throws JsonProcessingException {
return objectMapper.writeValueAsString(value);
}
private Object deserializeValue(String value, String type) throws JsonProcessingException {
private Object convertValue(WorkflowVariable variable) {
try {
Class<?> clazz = Class.forName(type);
return objectMapper.readValue(value, clazz);
} catch (ClassNotFoundException e) {
log.error("Failed to find class for type: {}", type, e);
throw new BusinessException(ResponseCode.WORKFLOW_VARIABLE_TYPE_ERROR);
switch (variable.getType()) {
case "String":
return variable.getValue();
case "Integer":
return Integer.parseInt(variable.getValue());
case "Long":
return Long.parseLong(variable.getValue());
case "Double":
return Double.parseDouble(variable.getValue());
case "Boolean":
return Boolean.parseBoolean(variable.getValue());
default:
throw new BusinessException(ResponseCode.WORKFLOW_VARIABLE_TYPE_INVALID);
}
} catch (Exception e) {
throw new BusinessException(ResponseCode.WORKFLOW_VARIABLE_TYPE_INVALID);
}
}
}

View File

@ -99,26 +99,14 @@ workflow.definition.already.published=工作流定义已发布
workflow.definition.cannot.delete=工作流定义已被使用,无法删除
workflow.instance.not.found=工作流实例不存在
workflow.instance.already.started=工作流实例已启动
workflow.instance.already.ended=工作流实例已结束
workflow.instance.already.suspended=工作流实例已挂起
workflow.instance.not.suspended=工作流实例未挂起
workflow.instance.cannot.start=工作流实例无法启动
workflow.instance.cannot.cancel=工作流实例无法取消
workflow.instance.cannot.suspend=工作流实例无法挂起
workflow.instance.cannot.pause=工作流实例无法暂停
workflow.instance.cannot.resume=工作流实例无法恢复
workflow.instance.cannot.retry=工作流实例无法重试
workflow.node.not.found=工作流节点不存在
workflow.node.type.not.supported=不支持的节点类型:{0}
workflow.node.config.invalid=节点配置无效:{0}
workflow.node.execution.failed=节点执行失败:{0}
workflow.node.timeout=节点执行超时
workflow.node.approval.rejected=节点审批被拒绝
workflow.node.approval.canceled=节点审批已取消
workflow.variable.not.found=工作流变量不存在
workflow.variable.required=工作流变量"{0}"为必填项
workflow.variable.invalid=工作流变量"{0}"的值无效
workflow.permission.denied=无权操作此工作流
workflow.operation.not.allowed=当前状态不允许此操作
workflow.concurrent.operation=工作流正在执行其他操作,请稍后重试
# 节点相关错误消息
node.instance.not.found=节点实例不存在
node.instance.cannot.retry=节点实例无法重试
node.instance.cannot.skip=节点实例无法跳过
node.executor.not.found=节点执行器不存在