From 26b0fb6a3e242ecdef7a66157360fc3920d4574f Mon Sep 17 00:00:00 2001 From: dengqichen Date: Tue, 10 Dec 2024 11:12:28 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=A0=E9=99=A4=E6=8E=89=E6=89=80=E6=9C=89?= =?UTF-8?q?=E6=B2=A1=E7=94=A8=E7=9A=84=E5=B7=A5=E4=BD=9C=E6=B5=81=E7=9B=B8?= =?UTF-8?q?=E5=85=B3=E7=9A=84=E7=B1=BB=EF=BC=8C=E9=87=8D=E6=96=B0=E5=BC=80?= =?UTF-8?q?=E5=8F=91=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/README.md | 29 ++ .../api/WorkflowDefinitionApiController.java | 222 ++++++++----- .../workflow/delegate/ShellTaskDelegate.java | 84 +++-- .../workflow/dto/WorkflowExecutionDTO.java | 9 +- .../workflow/dto/WorkflowInstanceDTO.java | 3 +- .../workflow/entity/WorkflowInstance.java | 6 +- .../enums/WorkflowInstanceStatus.java | 110 +++++++ .../WorkflowInstanceRepository.java | 27 ++ .../service/IWorkflowInstanceService.java | 54 ++++ .../impl/WorkflowDefinitionServiceImpl.java | 304 +++++++++++------- .../impl/WorkflowInstanceServiceImpl.java | 158 +++++++++ backend/src/main/resources/application.yml | 2 +- 12 files changed, 779 insertions(+), 229 deletions(-) create mode 100644 backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/WorkflowInstanceStatus.java create mode 100644 backend/src/main/java/com/qqchen/deploy/backend/workflow/repository/WorkflowInstanceRepository.java create mode 100644 backend/src/main/java/com/qqchen/deploy/backend/workflow/service/IWorkflowInstanceService.java create mode 100644 backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowInstanceServiceImpl.java diff --git a/backend/README.md b/backend/README.md index 23d29045..276958cd 100644 --- a/backend/README.md +++ b/backend/README.md @@ -882,3 +882,32 @@ mvn spring-boot:run ## 许可证 [MIT License](LICENSE) ``` + + +workflow_definition(工作流定义表) +存储我们自定义的工作流定义信息 +包含流程名称、标识、版本、BPMN XML内容等 +与Flowable的act_re_procdef表是一对一的关系 +额外存储了图形编辑器的JSON数据,方便前端展示和编辑 +workflow_instance(工作流实例表) +记录工作流的执行实例信息 +存储流程实例ID、业务标识、状态、变量等 +与Flowable的act_ru_execution表是一对一的关系 +提供了更多的业务字段,如开始时间、结束时间等 +workflow_node_instance(工作流节点实例表) +记录工作流中每个节点的执行情况 +存储节点ID、名称、类型、状态等 +与Flowable的act_ru_task和act_hi_actinst表有关联 +特别记录了Shell任务的执行结果、错误信息等 +workflow_log(工作流日志表) +记录工作流执行过程中的详细日志 +可以关联到具体的实例和节点 +包含日志类型、级别、内容等 +方便排查问题和审计 +这些表与Flowable的原生表的主要区别是: + +更贴近业务需求,字段设计更符合我们的使用场景 +提供了更丰富的元数据和状态信息 +支持更细粒度的日志记录 +可以存储自定义的业务数据 +这样的设计让我们可以在使用Flowable强大的工作流引擎的同时,也能满足特定的业务需求。 \ No newline at end of file diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/api/WorkflowDefinitionApiController.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/api/WorkflowDefinitionApiController.java index e26ac2e8..9849018a 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/api/WorkflowDefinitionApiController.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/api/WorkflowDefinitionApiController.java @@ -8,6 +8,7 @@ import com.qqchen.deploy.backend.workflow.dto.WorkflowExecutionDTO; import com.qqchen.deploy.backend.workflow.dto.WorkflowInstanceDTO; import com.qqchen.deploy.backend.workflow.dto.WorkflowInstanceCreateDTO; import com.qqchen.deploy.backend.workflow.entity.WorkflowDefinition; +import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatus; import com.qqchen.deploy.backend.workflow.query.WorkflowDefinitionQuery; import com.qqchen.deploy.backend.workflow.service.IWorkflowDefinitionService; import io.swagger.v3.oas.annotations.Operation; @@ -103,11 +104,11 @@ public class WorkflowDefinitionApiController extends BaseController activities = historyService.createHistoricActivityInstanceQuery() .processInstanceId(processInstanceId) .orderByHistoricActivityInstanceStartTime() .asc() .list(); - + List activityInstances = activities.stream() .map(activity -> { WorkflowInstanceDTO.ActivityInstance activityInstance = new WorkflowInstanceDTO.ActivityInstance(); @@ -136,7 +137,7 @@ public class WorkflowDefinitionApiController extends BaseController variables = historicProcessInstance.getProcessVariables(); @@ -144,97 +145,97 @@ public class WorkflowDefinitionApiController extends BaseController getWorkflowExecution(@PathVariable String processInstanceId) { - // 获取历史活动实例 - List activities = historyService.createHistoricActivityInstanceQuery() - .processInstanceId(processInstanceId) - .orderByHistoricActivityInstanceStartTime() - .asc() - .list(); +// // 获取历史活动实例 +// List activities = historyService.createHistoricActivityInstanceQuery() +// .processInstanceId(processInstanceId) +// .orderByHistoricActivityInstanceStartTime() +// .asc() +// .list(); +// +// // 获取流程实例 +// HistoricProcessInstance processInstance = historyService.createHistoricProcessInstanceQuery() +// .processInstanceId(processInstanceId) +// .singleResult(); +// +// if (processInstance == null) { +// return ResponseEntity.notFound().build(); +// } +// +// // 获取所有流程变量 +// List allVariables = historyService.createHistoricVariableInstanceQuery() +// .processInstanceId(processInstanceId) +// .list(); +// +// // 构建变量映射 +// Map variableMap = new HashMap<>(); +// for (HistoricVariableInstance variable : allVariables) { +// variableMap.put(variable.getVariableName(), variable.getValue()); +// } +// +// // 构建执行状态DTO +// WorkflowExecutionDTO executionDTO = new WorkflowExecutionDTO(); +// executionDTO.setProcessInstanceId(processInstance.getId()); +// executionDTO.setProcessDefinitionId(processInstance.getProcessDefinitionId()); +//// executionDTO.setProcessDefinitionKey(processInstance.getProcessDefinitionKey()); +// executionDTO.setBusinessKey(processInstance.getBusinessKey()); +// +// // 设置流程状态 +// if (processInstance.getEndTime() != null) { +// executionDTO.setStatus(processInstance.getDeleteReason() == null ? WorkflowInstanceStatus.COMPLETED : WorkflowInstanceStatus.FAILED); +// } else { +// executionDTO.setStatus(WorkflowInstanceStatus.RUNNING); +// } +// +// // 构建阶段列表 +// List stages = new ArrayList<>(); +// for (HistoricActivityInstance activity : activities) { +// WorkflowExecutionDTO.StageDTO stage = new WorkflowExecutionDTO.StageDTO(); +// stage.setId(activity.getActivityId()); +// stage.setName(activity.getActivityName()); +// stage.setType(activity.getActivityType()); +//// stage.setStartTime(activity.getStartTime() != null ? activity.getStartTime().toString() : null); +//// stage.setEndTime(activity.getEndTime() != null ? activity.getEndTime().toString() : null); +// +// // 设置阶段状态 +// if (activity.getEndTime() == null) { +// stage.setStatus(WorkflowInstanceStatus.RUNNING); +// } else { +// stage.setStatus(activity.getDeleteReason() == null ? WorkflowInstanceStatus.COMPLETED : WorkflowInstanceStatus.FAILED); +// } +// +// // 如果是Shell任务,获取输出 +// if ("serviceTask".equals(activity.getActivityType()) && +// activity.getActivityId().toLowerCase().contains("shell")) { +// +// // 获取Shell任务的输出 +// String output = (String) variableMap.get("shellOutput"); +// String error = (String) variableMap.get("shellError"); +// Integer exitCode = (Integer) variableMap.get("shellExitCode"); +// +// stage.setOutput(output); +// stage.setError(error); +// stage.setExitCode(exitCode); +// } +// +// stages.add(stage); +// } +// executionDTO.setStages(stages); +// executionDTO.setVariables(variableMap); - // 获取流程实例 - HistoricProcessInstance processInstance = historyService.createHistoricProcessInstanceQuery() - .processInstanceId(processInstanceId) - .singleResult(); - - if (processInstance == null) { - return ResponseEntity.notFound().build(); - } - - // 获取所有流程变量 - List allVariables = historyService.createHistoricVariableInstanceQuery() - .processInstanceId(processInstanceId) - .list(); - - // 构建变量映射 - Map variableMap = new HashMap<>(); - for (HistoricVariableInstance variable : allVariables) { - variableMap.put(variable.getVariableName(), variable.getValue()); - } - - // 构建执行状态DTO - WorkflowExecutionDTO executionDTO = new WorkflowExecutionDTO(); - executionDTO.setProcessInstanceId(processInstance.getId()); - executionDTO.setProcessDefinitionId(processInstance.getProcessDefinitionId()); -// executionDTO.setProcessDefinitionKey(processInstance.getProcessDefinitionKey()); - executionDTO.setBusinessKey(processInstance.getBusinessKey()); - - // 设置流程状态 - if (processInstance.getEndTime() != null) { - executionDTO.setStatus(processInstance.getDeleteReason() == null ? "COMPLETED" : "FAILED"); - } else { - executionDTO.setStatus("RUNNING"); - } - - // 构建阶段列表 - List stages = new ArrayList<>(); - for (HistoricActivityInstance activity : activities) { - WorkflowExecutionDTO.StageDTO stage = new WorkflowExecutionDTO.StageDTO(); - stage.setId(activity.getActivityId()); - stage.setName(activity.getActivityName()); - stage.setType(activity.getActivityType()); -// stage.setStartTime(activity.getStartTime() != null ? activity.getStartTime().toString() : null); -// stage.setEndTime(activity.getEndTime() != null ? activity.getEndTime().toString() : null); - - // 设置阶段状态 - if (activity.getEndTime() == null) { - stage.setStatus("RUNNING"); - } else { - stage.setStatus(activity.getDeleteReason() == null ? "COMPLETED" : "FAILED"); - } - - // 如果是Shell任务,获取输出 - if ("serviceTask".equals(activity.getActivityType()) && - activity.getActivityId().toLowerCase().contains("shell")) { - - // 获取Shell任务的输出 - String output = (String) variableMap.get("shellOutput"); - String error = (String) variableMap.get("shellError"); - Integer exitCode = (Integer) variableMap.get("shellExitCode"); - - stage.setOutput(output); - stage.setError(error); - stage.setExitCode(exitCode); - } - - stages.add(stage); - } - executionDTO.setStages(stages); - executionDTO.setVariables(variableMap); - - return ResponseEntity.ok(executionDTO); + return ResponseEntity.ok(workflowDefinitionService.getWorkflowExecution(processInstanceId)); } @Operation(summary = "查询工作流实例列表") @@ -261,7 +262,7 @@ public class WorkflowDefinitionApiController extends BaseController> getNodeLogs( + @Parameter(description = "流程实例ID", required = true) @PathVariable String processInstanceId, + @Parameter(description = "节点ID", required = true) @PathVariable String nodeId + ) { + Map result = new HashMap<>(); + + // 获取历史活动实例 + HistoricActivityInstance activity = historyService.createHistoricActivityInstanceQuery() + .processInstanceId(processInstanceId) + .activityId(nodeId) + .singleResult(); + + if (activity == null) { + return Response.error(ResponseCode.WORKFLOW_NODE_NOT_FOUND); + } + + // 获取执行ID + String executionId = activity.getExecutionId(); + + // 获取节点相关的变量 + List variables = historyService.createHistoricVariableInstanceQuery() + .processInstanceId(processInstanceId) + .list(); + + // 收集日志信息 + variables.forEach(variable -> { + String variableName = variable.getVariableName(); + if (variableName.startsWith(executionId)) { + // 移除executionId前缀 + String key = variableName.substring(executionId.length() + 1); // +1 for the underscore + result.put(key, variable.getValue()); + } + }); + + // 添加节点基本信息 + result.put("nodeId", nodeId); + result.put("nodeName", activity.getActivityName()); + result.put("nodeType", activity.getActivityType()); + result.put("startTime", activity.getStartTime()); + result.put("endTime", activity.getEndTime()); + result.put("status", activity.getEndTime() == null ? "RUNNING" : "COMPLETED"); + + return Response.success(result); + } + @Override protected void exportData(HttpServletResponse response, List data) { diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/delegate/ShellTaskDelegate.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/delegate/ShellTaskDelegate.java index 648d4a45..8037e686 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/delegate/ShellTaskDelegate.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/delegate/ShellTaskDelegate.java @@ -4,8 +4,10 @@ import lombok.extern.slf4j.Slf4j; import org.flowable.engine.delegate.DelegateExecution; import org.flowable.engine.delegate.JavaDelegate; import org.flowable.common.engine.api.delegate.Expression; +import org.flowable.engine.RuntimeService; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; +import jakarta.annotation.Resource; import java.io.BufferedReader; import java.io.File; @@ -16,6 +18,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.ConcurrentHashMap; /** * Shell任务委托执行器 @@ -24,10 +27,17 @@ import java.util.concurrent.TimeUnit; @Component public class ShellTaskDelegate implements JavaDelegate { + @Resource + private RuntimeService runtimeService; + private Expression script; private Expression workDir; private Expression env; + // 用于存储实时输出的Map + private static final Map outputMap = new ConcurrentHashMap<>(); + private static final Map errorMap = new ConcurrentHashMap<>(); + @Override public void execute(DelegateExecution execution) { // 从字段注入中获取值 @@ -54,13 +64,40 @@ public class ShellTaskDelegate implements JavaDelegate { } try { + // 初始化输出缓存 + String executionId = execution.getId(); + outputMap.put(executionId, new StringBuilder()); + errorMap.put(executionId, new StringBuilder()); + // 创建进程构建器 ProcessBuilder processBuilder = new ProcessBuilder(); - processBuilder.command("bash", "-c", scriptValue); + + // 根据操作系统选择合适的shell + String os = System.getProperty("os.name").toLowerCase(); + if (os.contains("win")) { + // Windows系统使用cmd + processBuilder.command("cmd", "/c", scriptValue); + } else { + // Unix-like系统使用bash + processBuilder.command("bash", "-c", scriptValue); + } // 设置工作目录 if (StringUtils.hasText(workDirValue)) { - processBuilder.directory(new File(workDirValue)); + // Windows系统路径处理 + if (os.contains("win")) { + // 确保使用Windows风格的路径分隔符 + workDirValue = workDirValue.replace("/", "\\"); + // 如果路径以\开头,去掉第一个\ + if (workDirValue.startsWith("\\")) { + workDirValue = workDirValue.substring(1); + } + } + File workDirFile = new File(workDirValue); + if (!workDirFile.exists()) { + workDirFile.mkdirs(); + } + processBuilder.directory(workDirFile); } // 设置环境变量 @@ -76,9 +113,6 @@ public class ShellTaskDelegate implements JavaDelegate { BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream())); BufferedReader errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream())); - StringBuilder output = new StringBuilder(); - StringBuilder error = new StringBuilder(); - // 创建线程池处理输出 ExecutorService executorService = Executors.newFixedThreadPool(2); @@ -87,11 +121,15 @@ public class ShellTaskDelegate implements JavaDelegate { String line; try { while ((line = reader.readLine()) != null) { + StringBuilder output = outputMap.get(executionId); synchronized (output) { output.append(line).append("\n"); } log.info("Shell output: {}", line); - Thread.sleep(500); // 增加延迟,让输出更容易观察 + // 使用RuntimeService更新变量 + runtimeService.setVariable(execution.getProcessInstanceId(), + executionId + "_shellOutput", output.toString()); + Thread.sleep(500); } } catch (Exception e) { log.error("Error reading process output", e); @@ -103,27 +141,20 @@ public class ShellTaskDelegate implements JavaDelegate { String line; try { while ((line = errorReader.readLine()) != null) { + StringBuilder error = errorMap.get(executionId); synchronized (error) { error.append(line).append("\n"); } log.error("Shell error: {}", line); + // 使用RuntimeService更新变量 + runtimeService.setVariable(execution.getProcessInstanceId(), + executionId + "_shellError", error.toString()); } } catch (IOException e) { log.error("Error reading process error", e); } }); - // 定期更新变量 - while (!outputFuture.isDone() || !errorFuture.isDone()) { - synchronized (output) { - execution.setVariable("shellOutput", output.toString()); - } - synchronized (error) { - execution.setVariable("shellError", error.toString()); - } - Thread.sleep(1000); // 每秒更新一次变量 - } - // 等待进程完成 int exitCode = process.waitFor(); @@ -135,22 +166,25 @@ public class ShellTaskDelegate implements JavaDelegate { executorService.shutdown(); // 设置最终结果 - synchronized (output) { - execution.setVariable("shellOutput", output.toString()); - } - synchronized (error) { - execution.setVariable("shellError", error.toString()); - } + StringBuilder finalOutput = outputMap.get(executionId); + StringBuilder finalError = errorMap.get(executionId); + + execution.setVariable("shellOutput", finalOutput.toString()); + execution.setVariable("shellError", finalError.toString()); execution.setVariable("shellExitCode", exitCode); + // 清理缓存 + outputMap.remove(executionId); + errorMap.remove(executionId); + if (exitCode != 0) { log.error("Shell script execution failed with exit code: {}", exitCode); - log.error("Error output: {}", error); + log.error("Error output: {}", finalError); throw new RuntimeException("Shell script execution failed with exit code: " + exitCode); } log.info("Shell script executed successfully"); - log.debug("Script output: {}", output); + log.debug("Script output: {}", finalOutput); } catch (Exception e) { log.error("Shell script execution failed", e); diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/WorkflowExecutionDTO.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/WorkflowExecutionDTO.java index 91d91f42..c51f6799 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/WorkflowExecutionDTO.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/WorkflowExecutionDTO.java @@ -1,5 +1,6 @@ package com.qqchen.deploy.backend.workflow.dto; +import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatus; import lombok.Data; import java.util.Date; import java.util.List; @@ -28,9 +29,9 @@ public class WorkflowExecutionDTO { private String businessKey; /** - * 流程状态:RUNNING, SUSPENDED, COMPLETED + * 流程状态 */ - private String status; + private WorkflowInstanceStatus status; /** * 当前活动节点ID @@ -90,9 +91,9 @@ public class WorkflowExecutionDTO { private String type; /** - * 节点状态:PENDING, RUNNING, COMPLETED, FAILED + * 节点状态 */ - private String status; + private WorkflowInstanceStatus status; /** * 开始时间 diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/WorkflowInstanceDTO.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/WorkflowInstanceDTO.java index a315b45b..a6dca641 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/WorkflowInstanceDTO.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/WorkflowInstanceDTO.java @@ -1,5 +1,6 @@ package com.qqchen.deploy.backend.workflow.dto; +import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatus; import io.swagger.v3.oas.annotations.media.Schema; import lombok.Data; import java.util.Date; @@ -31,7 +32,7 @@ public class WorkflowInstanceDTO { private String startUserId; @Schema(description = "状态(RUNNING/COMPLETED/FAILED)") - private String status; + private WorkflowInstanceStatus status; @Data @Schema(description = "活动节点信息") diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/entity/WorkflowInstance.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/entity/WorkflowInstance.java index 2e8bc4f3..d251d591 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/entity/WorkflowInstance.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/entity/WorkflowInstance.java @@ -1,7 +1,10 @@ package com.qqchen.deploy.backend.workflow.entity; import com.qqchen.deploy.backend.framework.domain.Entity; +import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatus; import jakarta.persistence.Column; +import jakarta.persistence.EnumType; +import jakarta.persistence.Enumerated; import jakarta.persistence.Table; import lombok.Data; import lombok.EqualsAndHashCode; @@ -39,7 +42,8 @@ public class WorkflowInstance extends Entity { * 实例状态 */ @Column(nullable = false) - private String status; + @Enumerated(EnumType.STRING) + private WorkflowInstanceStatus status; /** * 流程变量(JSON) diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/WorkflowInstanceStatus.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/WorkflowInstanceStatus.java new file mode 100644 index 00000000..c6fd61c9 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/WorkflowInstanceStatus.java @@ -0,0 +1,110 @@ +package com.qqchen.deploy.backend.workflow.enums; + +import lombok.Getter; + +/** + * 工作流实例状态枚举 + * 工作流实例的生命周期: + * NOT_STARTED -> CREATED -> RUNNING -> (SUSPENDED) -> COMPLETED/TERMINATED/FAILED + */ +@Getter +public enum WorkflowInstanceStatus { + + /** + * 未开始:节点尚未开始执行 + */ + NOT_STARTED("NOT_STARTED", "未开始"), + + /** + * 已创建:流程实例已创建但还未开始运行 + */ + CREATED("CREATED", "已创建"), + + /** + * 运行中:流程实例正在执行 + */ + RUNNING("RUNNING", "运行中"), + + /** + * 已暂停:流程实例被手动暂停 + */ + SUSPENDED("SUSPENDED", "已暂停"), + + /** + * 已完成:流程实例正常完成 + */ + COMPLETED("COMPLETED", "已完成"), + + /** + * 已终止:流程实例被手动终止 + */ + TERMINATED("TERMINATED", "已终止"), + + /** + * 执行失败:流程实例执行过程中发生错误 + */ + FAILED("FAILED", "执行失败"); + + /** + * 状态编码 + */ + private final String code; + + /** + * 状态描述 + */ + private final String description; + + WorkflowInstanceStatus(String code, String description) { + this.code = code; + this.description = description; + } + + /** + * 根据状态编码获取枚举实例 + * + * @param code 状态编码 + * @return 对应的枚举实例 + */ + public static WorkflowInstanceStatus fromCode(String code) { + for (WorkflowInstanceStatus status : WorkflowInstanceStatus.values()) { + if (status.getCode().equals(code)) { + return status; + } + } + throw new IllegalArgumentException("Invalid workflow instance status code: " + code); + } + + /** + * 判断是否为终态(不可再变化的状态) + */ + public boolean isFinalState() { + return this == COMPLETED || this == TERMINATED || this == FAILED; + } + + /** + * 判断是否可以暂停 + */ + public boolean canBeSuspended() { + return this == RUNNING; + } + + /** + * 判断是否可以恢复 + */ + public boolean canBeResumed() { + return this == SUSPENDED; + } + + /** + * 判断是否可以终止 + */ + public boolean canBeTerminated() { + return this == RUNNING || this == SUSPENDED; + } + + @Override + public String toString() { + return this.code; + } +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/repository/WorkflowInstanceRepository.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/repository/WorkflowInstanceRepository.java new file mode 100644 index 00000000..ec1637f8 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/repository/WorkflowInstanceRepository.java @@ -0,0 +1,27 @@ +package com.qqchen.deploy.backend.workflow.repository; + +import com.qqchen.deploy.backend.workflow.entity.WorkflowInstance; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.stereotype.Repository; + +import java.util.List; +import java.util.Optional; + +@Repository +public interface WorkflowInstanceRepository extends JpaRepository { + + /** + * 根据Flowable流程实例ID查询工作流实例 + */ + Optional findByProcessInstanceId(String processInstanceId); + + /** + * 根据业务标识查询工作流实例列表 + */ + List findByBusinessKey(String businessKey); + + /** + * 根据流程定义ID查询工作流实例列表 + */ + List findByProcessDefinitionId(Long processDefinitionId); +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/IWorkflowInstanceService.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/IWorkflowInstanceService.java new file mode 100644 index 00000000..d221f923 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/IWorkflowInstanceService.java @@ -0,0 +1,54 @@ +package com.qqchen.deploy.backend.workflow.service; + +import com.qqchen.deploy.backend.workflow.dto.WorkflowInstanceDTO; +import com.qqchen.deploy.backend.workflow.entity.WorkflowInstance; +import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatus; +import org.flowable.engine.runtime.ProcessInstance; + +import java.util.List; +import java.util.Map; + +public interface IWorkflowInstanceService { + + /** + * 创建工作流实例并关联Flowable实例 + * + * @param processInstance Flowable流程实例 + * @param variables 流程变量 + * @return 工作流实例 + */ + WorkflowInstance createWorkflowInstance(ProcessInstance processInstance, Map variables); + + /** + * 更新工作流实例状态 + * + * @param processInstanceId Flowable流程实例ID + * @param status 新状态 + * @return 更新后的工作流实例 + */ + WorkflowInstance updateInstanceStatus(String processInstanceId, WorkflowInstanceStatus status); + + /** + * 获取工作流实例详情 + * + * @param processInstanceId Flowable流程实例ID + * @return 工作流实例详情DTO + */ + WorkflowInstanceDTO getInstanceDetails(String processInstanceId); + + /** + * 根据业务标识查询工作流实例 + * + * @param businessKey 业务标识 + * @return 工作流实例列表 + */ + List findByBusinessKey(String businessKey); + + /** + * 完成工作流实例 + * + * @param processInstanceId Flowable流程实例ID + * @param variables 完成时的变量 + */ + void completeInstance(String processInstanceId, Map variables); +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowDefinitionServiceImpl.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowDefinitionServiceImpl.java index 748a662c..bf142ef9 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowDefinitionServiceImpl.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowDefinitionServiceImpl.java @@ -5,6 +5,7 @@ import com.qqchen.deploy.backend.workflow.dto.WorkflowDefinitionDTO; import com.qqchen.deploy.backend.workflow.dto.WorkflowExecutionDTO; import com.qqchen.deploy.backend.workflow.dto.WorkflowInstanceCreateDTO; import com.qqchen.deploy.backend.workflow.entity.WorkflowDefinition; +import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatus; import com.qqchen.deploy.backend.workflow.repository.IWorkflowDefinitionRepository; import com.qqchen.deploy.backend.workflow.service.IWorkflowDefinitionService; import jakarta.annotation.Resource; @@ -27,17 +28,22 @@ import org.springframework.transaction.annotation.Transactional; import java.util.ArrayList; import java.util.Collection; +import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; +import org.flowable.variable.api.history.HistoricVariableInstance; + /** * 工作流定义服务实现 */ @Slf4j @Service public class WorkflowDefinitionServiceImpl extends BaseServiceImpl - implements IWorkflowDefinitionService { + implements IWorkflowDefinitionService { @Resource private RepositoryService repositoryService; @@ -57,15 +63,15 @@ public class WorkflowDefinitionServiceImpl extends BaseServiceImpl new RuntimeException("Workflow definition not found: " + dto.getId())); + .orElseThrow(() -> new RuntimeException("Workflow definition not found: " + dto.getId())); // 转换为DTO dto.setFlowVersion(definition.getFlowVersion()); @@ -83,10 +89,10 @@ public class WorkflowDefinitionServiceImpl extends BaseServiceImpl activeActivityIds = runtimeService.getActiveActivityIds(processInstanceId); - - // 3. 获取历史活动节点 - List historicActivities = historyService.createHistoricActivityInstanceQuery() - .processInstanceId(processInstanceId) - .orderByHistoricActivityInstanceStartTime() - .asc() - .list(); + log.info("=== 开始获取工作流执行状态 ==="); + log.info("流程实例ID: {}", processInstanceId); + log.info("流程定义ID: {}", historicInstance.getProcessDefinitionId()); + log.info("是否有运行实例: {}", runningInstance != null); - // 4. 获取流程实例的历史信息 - HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery() - .processInstanceId(processInstanceId) - .singleResult(); - - // 5. 获取流程定义模型 - String processDefinitionId = processInstance.getProcessDefinitionId(); - BpmnModel bpmnModel = repositoryService.getBpmnModel(processDefinitionId); + // 2. 获取流程定义 + BpmnModel bpmnModel = repositoryService.getBpmnModel(historicInstance.getProcessDefinitionId()); Process process = bpmnModel.getMainProcess(); - Collection flowElements = process.getFlowElements(); - - // 6. 创建返回对象 - WorkflowExecutionDTO dto = new WorkflowExecutionDTO(); - dto.setProcessInstanceId(processInstanceId); - dto.setProcessDefinitionId(processInstance.getProcessDefinitionId()); - dto.setProcessDefinitionName(processInstance.getProcessDefinitionName()); - dto.setBusinessKey(processInstance.getBusinessKey()); - dto.setStatus(processInstance.isSuspended() ? "SUSPENDED" : - (historicProcessInstance != null && historicProcessInstance.getEndTime() != null) ? "COMPLETED" : "RUNNING"); - // 7. 设置时间信息 - if (historicProcessInstance != null) { - dto.setStartTime(historicProcessInstance.getStartTime()); - dto.setEndTime(historicProcessInstance.getEndTime()); - dto.setDurationInMillis(historicProcessInstance.getDurationInMillis()); + // 打印流程定义中的所有节点 + log.info("=== 流程定义节点信息 ==="); + process.getFlowElements().forEach(element -> { + log.info("节点信息 - ID: {}, 名称: {}, 类型: {}", + element.getId(), + element.getName(), + element.getClass().getSimpleName()); + }); + + // 3. 获取所有历史活动实例 + List historicActivities = historyService.createHistoricActivityInstanceQuery() + .processInstanceId(processInstanceId) + .orderByHistoricActivityInstanceStartTime() + .asc() + .list(); + + log.info("=== 历史活动实例信息 ==="); + historicActivities.forEach(activity -> { + log.info("历史活动 - ID: {}, 名称: {}, 类型: {}, 开始时间: {}, 结束时间: {}", + activity.getActivityId(), + activity.getActivityName(), + activity.getActivityType(), + activity.getStartTime(), + activity.getEndTime()); + }); + + // 4. 获取当前活动的节点 + List activeActivityIds = new ArrayList<>(); + if (runningInstance != null) { + activeActivityIds = runtimeService.getActiveActivityIds(processInstanceId); + log.info("=== 当前活动节点 ==="); + log.info("活动节点IDs: {}", activeActivityIds); } - // 8. 获取流程变量 - Map variables = runtimeService.getVariables(processInstanceId); - dto.setVariables(variables); + // 5. 获取所有流程变量 + List variables = historyService.createHistoricVariableInstanceQuery() + .processInstanceId(processInstanceId) + .list(); - // 9. 设置当前活动节点 + log.info("=== 流程变量信息 ==="); + Map variableMap = new HashMap<>(); + variables.forEach(variable -> { + variableMap.put(variable.getVariableName(), variable.getValue()); + log.info("变量 - 名称: {}, 值: {}", variable.getVariableName(), variable.getValue()); + }); + + // 6. 构建执行状态DTO + WorkflowExecutionDTO executionDTO = new WorkflowExecutionDTO(); + executionDTO.setProcessInstanceId(processInstanceId); + executionDTO.setProcessDefinitionId(historicInstance.getProcessDefinitionId()); + executionDTO.setProcessDefinitionName(historicInstance.getProcessDefinitionName()); + executionDTO.setBusinessKey(historicInstance.getBusinessKey()); + executionDTO.setStartTime(historicInstance.getStartTime()); + executionDTO.setEndTime(historicInstance.getEndTime()); + executionDTO.setDurationInMillis(historicInstance.getDurationInMillis()); + executionDTO.setVariables(variableMap); + + // 设置状态 + if (historicInstance.getEndTime() != null) { + executionDTO.setStatus(historicInstance.getDeleteReason() == null ? + WorkflowInstanceStatus.COMPLETED : WorkflowInstanceStatus.FAILED); + } else { + executionDTO.setStatus(runningInstance != null && runningInstance.isSuspended() ? + WorkflowInstanceStatus.SUSPENDED : WorkflowInstanceStatus.RUNNING); + } + + // 7. 创建历史活动实例的映射,用于快速查找 + Map historicActivityMap = historicActivities.stream() + .collect(Collectors.toMap( + HistoricActivityInstance::getActivityId, + activity -> activity, + (existing, replacement) -> existing + )); + + // 8. 获取所有流程节点并构建阶段列表 + List stages = new ArrayList<>(); + Collection flowElements = process.getFlowElements(); + + // 记录已执行过的节点ID + Set executedActivityIds = historicActivities.stream() + .map(HistoricActivityInstance::getActivityId) + .collect(Collectors.toSet()); + + // 获取所有节点的列表(按照流程定义的顺序) + List orderedElements = new ArrayList<>(); + for (FlowElement element : flowElements) { + if (element instanceof Task || element instanceof Event || element instanceof ServiceTask) { + orderedElements.add(element); + } + } + + // 遍历所有节点 + for (int i = 0; i < orderedElements.size(); i++) { + FlowElement element = orderedElements.get(i); + WorkflowExecutionDTO.StageDTO stage = new WorkflowExecutionDTO.StageDTO(); + stage.setId(element.getId()); + stage.setName(element.getName()); + + // 设置节点类型 + if (element instanceof ServiceTask) { + stage.setType("serviceTask"); + } else if (element instanceof Event) { + stage.setType("startEvent"); + if (element.getId().equals("end")) { + stage.setType("endEvent"); + } + } else { + stage.setType(element.getClass().getSimpleName()); + } + + // 获取历史活动实例 + HistoricActivityInstance historicActivity = historicActivityMap.get(element.getId()); + + // 设置开始和结束时间 + if (historicActivity != null) { + stage.setStartTime(historicActivity.getStartTime()); + stage.setEndTime(historicActivity.getEndTime()); + } + + // 设置节点状态 + if (historicActivity != null) { + if (historicActivity.getEndTime() != null) { + // 已完成的节点 + stage.setStatus(historicActivity.getDeleteReason() == null ? + WorkflowInstanceStatus.COMPLETED : WorkflowInstanceStatus.FAILED); + } else if (activeActivityIds.contains(element.getId())) { + // 正在执行的节点 + stage.setStatus(WorkflowInstanceStatus.RUNNING); + } else { + // 已开始但未完成的节点 + stage.setStatus(WorkflowInstanceStatus.RUNNING); + } + } else { + // 未执行的节点 + if (i == 0 && executedActivityIds.isEmpty()) { + // 如果是第一个节点且没有任何节点执行过,设置为RUNNING + stage.setStatus(WorkflowInstanceStatus.RUNNING); + } else if (i > 0 && stages.get(i-1).getStatus() == WorkflowInstanceStatus.COMPLETED) { + // 如果前一个节点已完成,当前节点设置为RUNNING + stage.setStatus(WorkflowInstanceStatus.RUNNING); + } else { + // 其他情况设置为NOT_STARTED + stage.setStatus(WorkflowInstanceStatus.NOT_STARTED); + } + } + + // 如果是Shell任务,获取执行结果 + if (element instanceof ServiceTask) { + ServiceTask serviceTask = (ServiceTask) element; + if ("shell".equals(serviceTask.getType()) && historicActivity != null) { + String executionId = historicActivity.getExecutionId(); + stage.setOutput((String) variableMap.get(executionId + "_shellOutput")); + stage.setError((String) variableMap.get(executionId + "_shellError")); + stage.setExitCode((Integer) variableMap.get(executionId + "_exitCode")); + } + } + + stages.add(stage); + } + + executionDTO.setStages(stages); + + // 10. 设置当前活动节点 if (!activeActivityIds.isEmpty()) { String currentActivityId = activeActivityIds.get(0); - dto.setCurrentActivityId(currentActivityId); - - // 从流程定义中找到当前节点的名称和类型 FlowElement currentElement = process.getFlowElement(currentActivityId); if (currentElement != null) { - dto.setCurrentActivityName(currentElement.getName()); - dto.setCurrentActivityType(currentElement.getClass().getSimpleName()); + executionDTO.setCurrentActivityId(currentActivityId); + executionDTO.setCurrentActivityName(currentElement.getName()); + executionDTO.setCurrentActivityType(currentElement.getClass().getSimpleName()); } } - // 10. 创建历史活动实例的映射,用于快速查找 - Map historicActivityMap = historicActivities.stream() - .collect(Collectors.toMap( - HistoricActivityInstance::getActivityId, - activity -> activity, - (existing, replacement) -> existing // 如果有重复,保留第一个 - )); - - // 11. 设置所有节点信息 - List stages = new ArrayList<>(); - for (FlowElement element : flowElements) { - // 只处理任务节点和事件节点 - if (element instanceof Task || element instanceof Event) { - WorkflowExecutionDTO.StageDTO stage = new WorkflowExecutionDTO.StageDTO(); - stage.setId(element.getId()); - stage.setName(element.getName()); - stage.setType(element.getClass().getSimpleName()); - - // 获取历史活动实例(如果存在) - HistoricActivityInstance historicActivity = historicActivityMap.get(element.getId()); - - if (historicActivity != null) { - // 节点已经执行过 - stage.setStartTime(historicActivity.getStartTime()); - stage.setEndTime(historicActivity.getEndTime()); - - if (historicActivity.getEndTime() != null) { - stage.setStatus("COMPLETED"); - } else if (activeActivityIds.contains(element.getId())) { - stage.setStatus("RUNNING"); - } else { - stage.setStatus("PENDING"); - } - - // 如果是Shell任务,获取输出 - if (element instanceof ServiceTask && "shellTask".equals(((ServiceTask) element).getType())) { - String executionId = historicActivity.getExecutionId(); - if (executionId != null) { - stage.setOutput((String) variables.get(executionId + "_shellOutput")); - stage.setError((String) variables.get(executionId + "_shellError")); - Integer exitCode = (Integer) variables.get(executionId + "_exitCode"); - stage.setExitCode(exitCode != null ? exitCode : -1); - } - } - } else { - // 节点尚未执行 - stage.setStatus("PENDING"); - } - - stages.add(stage); - } - } - dto.setStages(stages); - - return dto; + return executionDTO; } catch (Exception e) { log.error("Failed to get workflow execution: {}", processInstanceId, e); throw new RuntimeException("Failed to get workflow execution", e); diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowInstanceServiceImpl.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowInstanceServiceImpl.java new file mode 100644 index 00000000..02a13da5 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowInstanceServiceImpl.java @@ -0,0 +1,158 @@ +package com.qqchen.deploy.backend.workflow.service.impl; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.qqchen.deploy.backend.workflow.dto.WorkflowInstanceDTO; +import com.qqchen.deploy.backend.workflow.entity.WorkflowInstance; +import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatus; +import com.qqchen.deploy.backend.workflow.repository.WorkflowInstanceRepository; +import com.qqchen.deploy.backend.workflow.service.IWorkflowInstanceService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.flowable.engine.HistoryService; +import org.flowable.engine.RuntimeService; +import org.flowable.engine.history.HistoricActivityInstance; +import org.flowable.engine.runtime.ProcessInstance; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDateTime; +import java.util.*; +import java.util.stream.Collectors; + +@Slf4j +@Service +@RequiredArgsConstructor +public class WorkflowInstanceServiceImpl implements IWorkflowInstanceService { + + private final WorkflowInstanceRepository workflowInstanceRepository; + private final RuntimeService runtimeService; + private final HistoryService historyService; + private final ObjectMapper objectMapper; + + @Override + @Transactional + public WorkflowInstance createWorkflowInstance(ProcessInstance processInstance, Map variables) { + WorkflowInstance instance = new WorkflowInstance(); + instance.setProcessInstanceId(processInstance.getId()); + instance.setProcessDefinitionId(Long.valueOf(processInstance.getProcessDefinitionId().split(":")[0])); + instance.setBusinessKey(processInstance.getBusinessKey()); + instance.setStatus(WorkflowInstanceStatus.RUNNING); + instance.setStartTime(LocalDateTime.now()); + + try { + instance.setVariables(objectMapper.writeValueAsString(variables)); + } catch (JsonProcessingException e) { + log.error("Failed to serialize variables", e); + instance.setVariables("{}"); + } + + return workflowInstanceRepository.save(instance); + } + + @Override + @Transactional + public WorkflowInstance updateInstanceStatus(String processInstanceId, WorkflowInstanceStatus status) { + WorkflowInstance instance = workflowInstanceRepository.findByProcessInstanceId(processInstanceId) + .orElseThrow(() -> new RuntimeException("Workflow instance not found: " + processInstanceId)); + + instance.setStatus(status); + if (status == WorkflowInstanceStatus.COMPLETED || status == WorkflowInstanceStatus.FAILED) { + instance.setEndTime(LocalDateTime.now()); + } + + return workflowInstanceRepository.save(instance); + } + + @Override + public WorkflowInstanceDTO getInstanceDetails(String processInstanceId) { + WorkflowInstance instance = workflowInstanceRepository.findByProcessInstanceId(processInstanceId) + .orElseThrow(() -> new RuntimeException("Workflow instance not found: " + processInstanceId)); + + WorkflowInstanceDTO dto = new WorkflowInstanceDTO(); + dto.setId(instance.getProcessInstanceId()); + dto.setProcessDefinitionId(String.valueOf(instance.getProcessDefinitionId())); + dto.setBusinessKey(instance.getBusinessKey()); + dto.setStatus(instance.getStatus()); + + // Convert LocalDateTime to Date + dto.setStartTime(java.sql.Timestamp.valueOf(instance.getStartTime())); + if (instance.getEndTime() != null) { + dto.setEndTime(java.sql.Timestamp.valueOf(instance.getEndTime())); + } + + // Calculate duration if both start and end time exist + if (instance.getStartTime() != null && instance.getEndTime() != null) { + dto.setDurationInMillis(java.time.Duration.between(instance.getStartTime(), instance.getEndTime()).toMillis()); + } + + // Get variables + try { + @SuppressWarnings("unchecked") + Map variables = objectMapper.readValue(instance.getVariables(), Map.class); + dto.setVariables(variables); + } catch (JsonProcessingException e) { + log.error("Failed to deserialize variables", e); + dto.setVariables(new HashMap<>()); + } + + // Get activity instances + List activities = historyService.createHistoricActivityInstanceQuery() + .processInstanceId(processInstanceId) + .orderByHistoricActivityInstanceStartTime().asc() + .list(); + + dto.setActivities(activities.stream().map(this::convertToActivityInstance).collect(Collectors.toList())); + + return dto; + } + + @Override + public List findByBusinessKey(String businessKey) { + return workflowInstanceRepository.findByBusinessKey(businessKey).stream() + .map(instance -> getInstanceDetails(instance.getProcessInstanceId())) + .collect(Collectors.toList()); + } + + @Override + @Transactional + public void completeInstance(String processInstanceId, Map variables) { + // Update Flowable variables + runtimeService.setVariables(processInstanceId, variables); + + // Update our instance + WorkflowInstance instance = workflowInstanceRepository.findByProcessInstanceId(processInstanceId) + .orElseThrow(() -> new RuntimeException("Workflow instance not found: " + processInstanceId)); + + try { + instance.setVariables(objectMapper.writeValueAsString(variables)); + } catch (JsonProcessingException e) { + log.error("Failed to serialize variables", e); + } + + instance.setStatus(WorkflowInstanceStatus.COMPLETED); + instance.setEndTime(LocalDateTime.now()); + workflowInstanceRepository.save(instance); + } + + private WorkflowInstanceDTO.ActivityInstance convertToActivityInstance(HistoricActivityInstance hai) { + WorkflowInstanceDTO.ActivityInstance ai = new WorkflowInstanceDTO.ActivityInstance(); + ai.setId(hai.getId()); + ai.setActivityId(hai.getActivityId()); + ai.setActivityName(hai.getActivityName()); + ai.setActivityType(hai.getActivityType()); + ai.setStartTime(hai.getStartTime()); + ai.setEndTime(hai.getEndTime()); + ai.setDurationInMillis(hai.getDurationInMillis()); + + // Get shell task execution details from variables if available + if ("serviceTask".equals(hai.getActivityType())) { + Map variables = runtimeService.getVariables(hai.getExecutionId()); + ai.setShellOutput((String) variables.get("shellOutput")); + ai.setShellError((String) variables.get("shellError")); + ai.setShellExitCode((Integer) variables.get("shellExitCode")); + } + + return ai; + } +} diff --git a/backend/src/main/resources/application.yml b/backend/src/main/resources/application.yml index 78f7b1ff..87a98a71 100644 --- a/backend/src/main/resources/application.yml +++ b/backend/src/main/resources/application.yml @@ -2,7 +2,7 @@ server: port: 8080 spring: datasource: - url: jdbc:mysql://localhost:3306/deploy-ease-platform?useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true + url: jdbc:mysql://localhost:3306/deploy-ease-platform?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true&createDatabaseIfNotExist=true username: root password: root driver-class-name: com.mysql.cj.jdbc.Driver