删除掉所有没用的工作流相关的类,重新开发。

This commit is contained in:
dengqichen 2024-12-10 11:12:28 +08:00
parent 3484ace4bf
commit 26b0fb6a3e
12 changed files with 779 additions and 229 deletions

View File

@ -882,3 +882,32 @@ mvn spring-boot:run
## 许可证
[MIT License](LICENSE)
```
workflow_definition工作流定义表
存储我们自定义的工作流定义信息
包含流程名称、标识、版本、BPMN XML内容等
与Flowable的act_re_procdef表是一对一的关系
额外存储了图形编辑器的JSON数据方便前端展示和编辑
workflow_instance工作流实例表
记录工作流的执行实例信息
存储流程实例ID、业务标识、状态、变量等
与Flowable的act_ru_execution表是一对一的关系
提供了更多的业务字段,如开始时间、结束时间等
workflow_node_instance工作流节点实例表
记录工作流中每个节点的执行情况
存储节点ID、名称、类型、状态等
与Flowable的act_ru_task和act_hi_actinst表有关联
特别记录了Shell任务的执行结果、错误信息等
workflow_log工作流日志表
记录工作流执行过程中的详细日志
可以关联到具体的实例和节点
包含日志类型、级别、内容等
方便排查问题和审计
这些表与Flowable的原生表的主要区别是
更贴近业务需求,字段设计更符合我们的使用场景
提供了更丰富的元数据和状态信息
支持更细粒度的日志记录
可以存储自定义的业务数据
这样的设计让我们可以在使用Flowable强大的工作流引擎的同时也能满足特定的业务需求。

View File

@ -8,6 +8,7 @@ import com.qqchen.deploy.backend.workflow.dto.WorkflowExecutionDTO;
import com.qqchen.deploy.backend.workflow.dto.WorkflowInstanceDTO;
import com.qqchen.deploy.backend.workflow.dto.WorkflowInstanceCreateDTO;
import com.qqchen.deploy.backend.workflow.entity.WorkflowDefinition;
import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatus;
import com.qqchen.deploy.backend.workflow.query.WorkflowDefinitionQuery;
import com.qqchen.deploy.backend.workflow.service.IWorkflowDefinitionService;
import io.swagger.v3.oas.annotations.Operation;
@ -103,11 +104,11 @@ public class WorkflowDefinitionApiController extends BaseController<WorkflowDefi
.processInstanceId(processInstanceId)
.includeProcessVariables()
.singleResult();
if (historicProcessInstance == null) {
return Response.error(ResponseCode.WORKFLOW_NOT_FOUND);
}
WorkflowInstanceDTO instanceDTO = new WorkflowInstanceDTO();
instanceDTO.setId(historicProcessInstance.getId());
instanceDTO.setProcessDefinitionId(historicProcessInstance.getProcessDefinitionId());
@ -116,16 +117,16 @@ public class WorkflowDefinitionApiController extends BaseController<WorkflowDefi
instanceDTO.setEndTime(historicProcessInstance.getEndTime());
instanceDTO.setDurationInMillis(historicProcessInstance.getDurationInMillis());
instanceDTO.setStartUserId(historicProcessInstance.getStartUserId());
instanceDTO.setStatus(historicProcessInstance.getEndTime() != null ? "COMPLETED" : "RUNNING");
instanceDTO.setStatus(historicProcessInstance.getEndTime() != null ? WorkflowInstanceStatus.COMPLETED : WorkflowInstanceStatus.RUNNING);
instanceDTO.setVariables(historicProcessInstance.getProcessVariables());
// 查询活动节点历史
List<HistoricActivityInstance> activities = historyService.createHistoricActivityInstanceQuery()
.processInstanceId(processInstanceId)
.orderByHistoricActivityInstanceStartTime()
.asc()
.list();
List<WorkflowInstanceDTO.ActivityInstance> activityInstances = activities.stream()
.map(activity -> {
WorkflowInstanceDTO.ActivityInstance activityInstance = new WorkflowInstanceDTO.ActivityInstance();
@ -136,7 +137,7 @@ public class WorkflowDefinitionApiController extends BaseController<WorkflowDefi
activityInstance.setStartTime(activity.getStartTime());
activityInstance.setEndTime(activity.getEndTime());
activityInstance.setDurationInMillis(activity.getDurationInMillis());
// 如果是Shell任务获取Shell相关变量
if ("serviceTask".equals(activity.getActivityType())) {
Map<String, Object> variables = historicProcessInstance.getProcessVariables();
@ -144,97 +145,97 @@ public class WorkflowDefinitionApiController extends BaseController<WorkflowDefi
activityInstance.setShellError((String) variables.get("shellError"));
activityInstance.setShellExitCode((Integer) variables.get("shellExitCode"));
}
return activityInstance;
})
.collect(Collectors.toList());
instanceDTO.setActivities(activityInstances);
return Response.success(instanceDTO);
}
@Operation(summary = "获取工作流执行状态")
@GetMapping("/instance/{processInstanceId}/execution")
public ResponseEntity<WorkflowExecutionDTO> getWorkflowExecution(@PathVariable String processInstanceId) {
// 获取历史活动实例
List<HistoricActivityInstance> activities = historyService.createHistoricActivityInstanceQuery()
.processInstanceId(processInstanceId)
.orderByHistoricActivityInstanceStartTime()
.asc()
.list();
// // 获取历史活动实例
// List<HistoricActivityInstance> activities = historyService.createHistoricActivityInstanceQuery()
// .processInstanceId(processInstanceId)
// .orderByHistoricActivityInstanceStartTime()
// .asc()
// .list();
//
// // 获取流程实例
// HistoricProcessInstance processInstance = historyService.createHistoricProcessInstanceQuery()
// .processInstanceId(processInstanceId)
// .singleResult();
//
// if (processInstance == null) {
// return ResponseEntity.notFound().build();
// }
//
// // 获取所有流程变量
// List<HistoricVariableInstance> allVariables = historyService.createHistoricVariableInstanceQuery()
// .processInstanceId(processInstanceId)
// .list();
//
// // 构建变量映射
// Map<String, Object> variableMap = new HashMap<>();
// for (HistoricVariableInstance variable : allVariables) {
// variableMap.put(variable.getVariableName(), variable.getValue());
// }
//
// // 构建执行状态DTO
// WorkflowExecutionDTO executionDTO = new WorkflowExecutionDTO();
// executionDTO.setProcessInstanceId(processInstance.getId());
// executionDTO.setProcessDefinitionId(processInstance.getProcessDefinitionId());
//// executionDTO.setProcessDefinitionKey(processInstance.getProcessDefinitionKey());
// executionDTO.setBusinessKey(processInstance.getBusinessKey());
//
// // 设置流程状态
// if (processInstance.getEndTime() != null) {
// executionDTO.setStatus(processInstance.getDeleteReason() == null ? WorkflowInstanceStatus.COMPLETED : WorkflowInstanceStatus.FAILED);
// } else {
// executionDTO.setStatus(WorkflowInstanceStatus.RUNNING);
// }
//
// // 构建阶段列表
// List<WorkflowExecutionDTO.StageDTO> stages = new ArrayList<>();
// for (HistoricActivityInstance activity : activities) {
// WorkflowExecutionDTO.StageDTO stage = new WorkflowExecutionDTO.StageDTO();
// stage.setId(activity.getActivityId());
// stage.setName(activity.getActivityName());
// stage.setType(activity.getActivityType());
//// stage.setStartTime(activity.getStartTime() != null ? activity.getStartTime().toString() : null);
//// stage.setEndTime(activity.getEndTime() != null ? activity.getEndTime().toString() : null);
//
// // 设置阶段状态
// if (activity.getEndTime() == null) {
// stage.setStatus(WorkflowInstanceStatus.RUNNING);
// } else {
// stage.setStatus(activity.getDeleteReason() == null ? WorkflowInstanceStatus.COMPLETED : WorkflowInstanceStatus.FAILED);
// }
//
// // 如果是Shell任务获取输出
// if ("serviceTask".equals(activity.getActivityType()) &&
// activity.getActivityId().toLowerCase().contains("shell")) {
//
// // 获取Shell任务的输出
// String output = (String) variableMap.get("shellOutput");
// String error = (String) variableMap.get("shellError");
// Integer exitCode = (Integer) variableMap.get("shellExitCode");
//
// stage.setOutput(output);
// stage.setError(error);
// stage.setExitCode(exitCode);
// }
//
// stages.add(stage);
// }
// executionDTO.setStages(stages);
// executionDTO.setVariables(variableMap);
// 获取流程实例
HistoricProcessInstance processInstance = historyService.createHistoricProcessInstanceQuery()
.processInstanceId(processInstanceId)
.singleResult();
if (processInstance == null) {
return ResponseEntity.notFound().build();
}
// 获取所有流程变量
List<HistoricVariableInstance> allVariables = historyService.createHistoricVariableInstanceQuery()
.processInstanceId(processInstanceId)
.list();
// 构建变量映射
Map<String, Object> variableMap = new HashMap<>();
for (HistoricVariableInstance variable : allVariables) {
variableMap.put(variable.getVariableName(), variable.getValue());
}
// 构建执行状态DTO
WorkflowExecutionDTO executionDTO = new WorkflowExecutionDTO();
executionDTO.setProcessInstanceId(processInstance.getId());
executionDTO.setProcessDefinitionId(processInstance.getProcessDefinitionId());
// executionDTO.setProcessDefinitionKey(processInstance.getProcessDefinitionKey());
executionDTO.setBusinessKey(processInstance.getBusinessKey());
// 设置流程状态
if (processInstance.getEndTime() != null) {
executionDTO.setStatus(processInstance.getDeleteReason() == null ? "COMPLETED" : "FAILED");
} else {
executionDTO.setStatus("RUNNING");
}
// 构建阶段列表
List<WorkflowExecutionDTO.StageDTO> stages = new ArrayList<>();
for (HistoricActivityInstance activity : activities) {
WorkflowExecutionDTO.StageDTO stage = new WorkflowExecutionDTO.StageDTO();
stage.setId(activity.getActivityId());
stage.setName(activity.getActivityName());
stage.setType(activity.getActivityType());
// stage.setStartTime(activity.getStartTime() != null ? activity.getStartTime().toString() : null);
// stage.setEndTime(activity.getEndTime() != null ? activity.getEndTime().toString() : null);
// 设置阶段状态
if (activity.getEndTime() == null) {
stage.setStatus("RUNNING");
} else {
stage.setStatus(activity.getDeleteReason() == null ? "COMPLETED" : "FAILED");
}
// 如果是Shell任务获取输出
if ("serviceTask".equals(activity.getActivityType()) &&
activity.getActivityId().toLowerCase().contains("shell")) {
// 获取Shell任务的输出
String output = (String) variableMap.get("shellOutput");
String error = (String) variableMap.get("shellError");
Integer exitCode = (Integer) variableMap.get("shellExitCode");
stage.setOutput(output);
stage.setError(error);
stage.setExitCode(exitCode);
}
stages.add(stage);
}
executionDTO.setStages(stages);
executionDTO.setVariables(variableMap);
return ResponseEntity.ok(executionDTO);
return ResponseEntity.ok(workflowDefinitionService.getWorkflowExecution(processInstanceId));
}
@Operation(summary = "查询工作流实例列表")
@ -261,7 +262,7 @@ public class WorkflowDefinitionApiController extends BaseController<WorkflowDefi
instanceDTO.setEndTime(historicProcessInstance.getEndTime());
instanceDTO.setDurationInMillis(historicProcessInstance.getDurationInMillis());
instanceDTO.setStartUserId(historicProcessInstance.getStartUserId());
instanceDTO.setStatus(historicProcessInstance.getEndTime() != null ? "COMPLETED" : "RUNNING");
instanceDTO.setStatus(historicProcessInstance.getEndTime() != null ? WorkflowInstanceStatus.COMPLETED : WorkflowInstanceStatus.RUNNING);
instanceDTO.setVariables(historicProcessInstance.getProcessVariables());
return instanceDTO;
})
@ -270,6 +271,53 @@ public class WorkflowDefinitionApiController extends BaseController<WorkflowDefi
return Response.success(instanceDTOs);
}
@Operation(summary = "获取节点实时日志")
@GetMapping("/instance/{processInstanceId}/node/{nodeId}/logs")
public Response<Map<String, Object>> getNodeLogs(
@Parameter(description = "流程实例ID", required = true) @PathVariable String processInstanceId,
@Parameter(description = "节点ID", required = true) @PathVariable String nodeId
) {
Map<String, Object> result = new HashMap<>();
// 获取历史活动实例
HistoricActivityInstance activity = historyService.createHistoricActivityInstanceQuery()
.processInstanceId(processInstanceId)
.activityId(nodeId)
.singleResult();
if (activity == null) {
return Response.error(ResponseCode.WORKFLOW_NODE_NOT_FOUND);
}
// 获取执行ID
String executionId = activity.getExecutionId();
// 获取节点相关的变量
List<HistoricVariableInstance> variables = historyService.createHistoricVariableInstanceQuery()
.processInstanceId(processInstanceId)
.list();
// 收集日志信息
variables.forEach(variable -> {
String variableName = variable.getVariableName();
if (variableName.startsWith(executionId)) {
// 移除executionId前缀
String key = variableName.substring(executionId.length() + 1); // +1 for the underscore
result.put(key, variable.getValue());
}
});
// 添加节点基本信息
result.put("nodeId", nodeId);
result.put("nodeName", activity.getActivityName());
result.put("nodeType", activity.getActivityType());
result.put("startTime", activity.getStartTime());
result.put("endTime", activity.getEndTime());
result.put("status", activity.getEndTime() == null ? "RUNNING" : "COMPLETED");
return Response.success(result);
}
@Override
protected void exportData(HttpServletResponse response, List<WorkflowDefinitionDTO> data) {

View File

@ -4,8 +4,10 @@ import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.JavaDelegate;
import org.flowable.common.engine.api.delegate.Expression;
import org.flowable.engine.RuntimeService;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import jakarta.annotation.Resource;
import java.io.BufferedReader;
import java.io.File;
@ -16,6 +18,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ConcurrentHashMap;
/**
* Shell任务委托执行器
@ -24,10 +27,17 @@ import java.util.concurrent.TimeUnit;
@Component
public class ShellTaskDelegate implements JavaDelegate {
@Resource
private RuntimeService runtimeService;
private Expression script;
private Expression workDir;
private Expression env;
// 用于存储实时输出的Map
private static final Map<String, StringBuilder> outputMap = new ConcurrentHashMap<>();
private static final Map<String, StringBuilder> errorMap = new ConcurrentHashMap<>();
@Override
public void execute(DelegateExecution execution) {
// 从字段注入中获取值
@ -54,13 +64,40 @@ public class ShellTaskDelegate implements JavaDelegate {
}
try {
// 初始化输出缓存
String executionId = execution.getId();
outputMap.put(executionId, new StringBuilder());
errorMap.put(executionId, new StringBuilder());
// 创建进程构建器
ProcessBuilder processBuilder = new ProcessBuilder();
processBuilder.command("bash", "-c", scriptValue);
// 根据操作系统选择合适的shell
String os = System.getProperty("os.name").toLowerCase();
if (os.contains("win")) {
// Windows系统使用cmd
processBuilder.command("cmd", "/c", scriptValue);
} else {
// Unix-like系统使用bash
processBuilder.command("bash", "-c", scriptValue);
}
// 设置工作目录
if (StringUtils.hasText(workDirValue)) {
processBuilder.directory(new File(workDirValue));
// Windows系统路径处理
if (os.contains("win")) {
// 确保使用Windows风格的路径分隔符
workDirValue = workDirValue.replace("/", "\\");
// 如果路径以\开头去掉第一个\
if (workDirValue.startsWith("\\")) {
workDirValue = workDirValue.substring(1);
}
}
File workDirFile = new File(workDirValue);
if (!workDirFile.exists()) {
workDirFile.mkdirs();
}
processBuilder.directory(workDirFile);
}
// 设置环境变量
@ -76,9 +113,6 @@ public class ShellTaskDelegate implements JavaDelegate {
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
BufferedReader errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
StringBuilder output = new StringBuilder();
StringBuilder error = new StringBuilder();
// 创建线程池处理输出
ExecutorService executorService = Executors.newFixedThreadPool(2);
@ -87,11 +121,15 @@ public class ShellTaskDelegate implements JavaDelegate {
String line;
try {
while ((line = reader.readLine()) != null) {
StringBuilder output = outputMap.get(executionId);
synchronized (output) {
output.append(line).append("\n");
}
log.info("Shell output: {}", line);
Thread.sleep(500); // 增加延迟让输出更容易观察
// 使用RuntimeService更新变量
runtimeService.setVariable(execution.getProcessInstanceId(),
executionId + "_shellOutput", output.toString());
Thread.sleep(500);
}
} catch (Exception e) {
log.error("Error reading process output", e);
@ -103,27 +141,20 @@ public class ShellTaskDelegate implements JavaDelegate {
String line;
try {
while ((line = errorReader.readLine()) != null) {
StringBuilder error = errorMap.get(executionId);
synchronized (error) {
error.append(line).append("\n");
}
log.error("Shell error: {}", line);
// 使用RuntimeService更新变量
runtimeService.setVariable(execution.getProcessInstanceId(),
executionId + "_shellError", error.toString());
}
} catch (IOException e) {
log.error("Error reading process error", e);
}
});
// 定期更新变量
while (!outputFuture.isDone() || !errorFuture.isDone()) {
synchronized (output) {
execution.setVariable("shellOutput", output.toString());
}
synchronized (error) {
execution.setVariable("shellError", error.toString());
}
Thread.sleep(1000); // 每秒更新一次变量
}
// 等待进程完成
int exitCode = process.waitFor();
@ -135,22 +166,25 @@ public class ShellTaskDelegate implements JavaDelegate {
executorService.shutdown();
// 设置最终结果
synchronized (output) {
execution.setVariable("shellOutput", output.toString());
}
synchronized (error) {
execution.setVariable("shellError", error.toString());
}
StringBuilder finalOutput = outputMap.get(executionId);
StringBuilder finalError = errorMap.get(executionId);
execution.setVariable("shellOutput", finalOutput.toString());
execution.setVariable("shellError", finalError.toString());
execution.setVariable("shellExitCode", exitCode);
// 清理缓存
outputMap.remove(executionId);
errorMap.remove(executionId);
if (exitCode != 0) {
log.error("Shell script execution failed with exit code: {}", exitCode);
log.error("Error output: {}", error);
log.error("Error output: {}", finalError);
throw new RuntimeException("Shell script execution failed with exit code: " + exitCode);
}
log.info("Shell script executed successfully");
log.debug("Script output: {}", output);
log.debug("Script output: {}", finalOutput);
} catch (Exception e) {
log.error("Shell script execution failed", e);

View File

@ -1,5 +1,6 @@
package com.qqchen.deploy.backend.workflow.dto;
import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatus;
import lombok.Data;
import java.util.Date;
import java.util.List;
@ -28,9 +29,9 @@ public class WorkflowExecutionDTO {
private String businessKey;
/**
* 流程状态RUNNING, SUSPENDED, COMPLETED
* 流程状态
*/
private String status;
private WorkflowInstanceStatus status;
/**
* 当前活动节点ID
@ -90,9 +91,9 @@ public class WorkflowExecutionDTO {
private String type;
/**
* 节点状态PENDING, RUNNING, COMPLETED, FAILED
* 节点状态
*/
private String status;
private WorkflowInstanceStatus status;
/**
* 开始时间

View File

@ -1,5 +1,6 @@
package com.qqchen.deploy.backend.workflow.dto;
import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatus;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.util.Date;
@ -31,7 +32,7 @@ public class WorkflowInstanceDTO {
private String startUserId;
@Schema(description = "状态(RUNNING/COMPLETED/FAILED)")
private String status;
private WorkflowInstanceStatus status;
@Data
@Schema(description = "活动节点信息")

View File

@ -1,7 +1,10 @@
package com.qqchen.deploy.backend.workflow.entity;
import com.qqchen.deploy.backend.framework.domain.Entity;
import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatus;
import jakarta.persistence.Column;
import jakarta.persistence.EnumType;
import jakarta.persistence.Enumerated;
import jakarta.persistence.Table;
import lombok.Data;
import lombok.EqualsAndHashCode;
@ -39,7 +42,8 @@ public class WorkflowInstance extends Entity<Long> {
* 实例状态
*/
@Column(nullable = false)
private String status;
@Enumerated(EnumType.STRING)
private WorkflowInstanceStatus status;
/**
* 流程变量(JSON)

View File

@ -0,0 +1,110 @@
package com.qqchen.deploy.backend.workflow.enums;
import lombok.Getter;
/**
* 工作流实例状态枚举
* 工作流实例的生命周期
* NOT_STARTED -> CREATED -> RUNNING -> (SUSPENDED) -> COMPLETED/TERMINATED/FAILED
*/
@Getter
public enum WorkflowInstanceStatus {
/**
* 未开始节点尚未开始执行
*/
NOT_STARTED("NOT_STARTED", "未开始"),
/**
* 已创建流程实例已创建但还未开始运行
*/
CREATED("CREATED", "已创建"),
/**
* 运行中流程实例正在执行
*/
RUNNING("RUNNING", "运行中"),
/**
* 已暂停流程实例被手动暂停
*/
SUSPENDED("SUSPENDED", "已暂停"),
/**
* 已完成流程实例正常完成
*/
COMPLETED("COMPLETED", "已完成"),
/**
* 已终止流程实例被手动终止
*/
TERMINATED("TERMINATED", "已终止"),
/**
* 执行失败流程实例执行过程中发生错误
*/
FAILED("FAILED", "执行失败");
/**
* 状态编码
*/
private final String code;
/**
* 状态描述
*/
private final String description;
WorkflowInstanceStatus(String code, String description) {
this.code = code;
this.description = description;
}
/**
* 根据状态编码获取枚举实例
*
* @param code 状态编码
* @return 对应的枚举实例
*/
public static WorkflowInstanceStatus fromCode(String code) {
for (WorkflowInstanceStatus status : WorkflowInstanceStatus.values()) {
if (status.getCode().equals(code)) {
return status;
}
}
throw new IllegalArgumentException("Invalid workflow instance status code: " + code);
}
/**
* 判断是否为终态不可再变化的状态
*/
public boolean isFinalState() {
return this == COMPLETED || this == TERMINATED || this == FAILED;
}
/**
* 判断是否可以暂停
*/
public boolean canBeSuspended() {
return this == RUNNING;
}
/**
* 判断是否可以恢复
*/
public boolean canBeResumed() {
return this == SUSPENDED;
}
/**
* 判断是否可以终止
*/
public boolean canBeTerminated() {
return this == RUNNING || this == SUSPENDED;
}
@Override
public String toString() {
return this.code;
}
}

View File

@ -0,0 +1,27 @@
package com.qqchen.deploy.backend.workflow.repository;
import com.qqchen.deploy.backend.workflow.entity.WorkflowInstance;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
import java.util.List;
import java.util.Optional;
@Repository
public interface WorkflowInstanceRepository extends JpaRepository<WorkflowInstance, Long> {
/**
* 根据Flowable流程实例ID查询工作流实例
*/
Optional<WorkflowInstance> findByProcessInstanceId(String processInstanceId);
/**
* 根据业务标识查询工作流实例列表
*/
List<WorkflowInstance> findByBusinessKey(String businessKey);
/**
* 根据流程定义ID查询工作流实例列表
*/
List<WorkflowInstance> findByProcessDefinitionId(Long processDefinitionId);
}

View File

@ -0,0 +1,54 @@
package com.qqchen.deploy.backend.workflow.service;
import com.qqchen.deploy.backend.workflow.dto.WorkflowInstanceDTO;
import com.qqchen.deploy.backend.workflow.entity.WorkflowInstance;
import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatus;
import org.flowable.engine.runtime.ProcessInstance;
import java.util.List;
import java.util.Map;
public interface IWorkflowInstanceService {
/**
* 创建工作流实例并关联Flowable实例
*
* @param processInstance Flowable流程实例
* @param variables 流程变量
* @return 工作流实例
*/
WorkflowInstance createWorkflowInstance(ProcessInstance processInstance, Map<String, Object> variables);
/**
* 更新工作流实例状态
*
* @param processInstanceId Flowable流程实例ID
* @param status 新状态
* @return 更新后的工作流实例
*/
WorkflowInstance updateInstanceStatus(String processInstanceId, WorkflowInstanceStatus status);
/**
* 获取工作流实例详情
*
* @param processInstanceId Flowable流程实例ID
* @return 工作流实例详情DTO
*/
WorkflowInstanceDTO getInstanceDetails(String processInstanceId);
/**
* 根据业务标识查询工作流实例
*
* @param businessKey 业务标识
* @return 工作流实例列表
*/
List<WorkflowInstanceDTO> findByBusinessKey(String businessKey);
/**
* 完成工作流实例
*
* @param processInstanceId Flowable流程实例ID
* @param variables 完成时的变量
*/
void completeInstance(String processInstanceId, Map<String, Object> variables);
}

View File

@ -5,6 +5,7 @@ import com.qqchen.deploy.backend.workflow.dto.WorkflowDefinitionDTO;
import com.qqchen.deploy.backend.workflow.dto.WorkflowExecutionDTO;
import com.qqchen.deploy.backend.workflow.dto.WorkflowInstanceCreateDTO;
import com.qqchen.deploy.backend.workflow.entity.WorkflowDefinition;
import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatus;
import com.qqchen.deploy.backend.workflow.repository.IWorkflowDefinitionRepository;
import com.qqchen.deploy.backend.workflow.service.IWorkflowDefinitionService;
import jakarta.annotation.Resource;
@ -27,17 +28,22 @@ import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.flowable.variable.api.history.HistoricVariableInstance;
/**
* 工作流定义服务实现
*/
@Slf4j
@Service
public class WorkflowDefinitionServiceImpl extends BaseServiceImpl<WorkflowDefinition, WorkflowDefinitionDTO, Long>
implements IWorkflowDefinitionService {
implements IWorkflowDefinitionService {
@Resource
private RepositoryService repositoryService;
@ -57,15 +63,15 @@ public class WorkflowDefinitionServiceImpl extends BaseServiceImpl<WorkflowDefin
try {
// 部署流程
Deployment deployment = repositoryService.createDeployment()
.addString(dto.getName() + ".bpmn20.xml", dto.getBpmnXml())
.name(dto.getName())
.deploy();
.addString(dto.getName() + ".bpmn20.xml", dto.getBpmnXml())
.name(dto.getName())
.deploy();
log.info("Successfully deployed workflow: {}", dto.getName());
// 查找已存在的工作流定义
WorkflowDefinition definition = workflowDefinitionRepository.findById(dto.getId())
.orElseThrow(() -> new RuntimeException("Workflow definition not found: " + dto.getId()));
.orElseThrow(() -> new RuntimeException("Workflow definition not found: " + dto.getId()));
// 转换为DTO
dto.setFlowVersion(definition.getFlowVersion());
@ -83,10 +89,10 @@ public class WorkflowDefinitionServiceImpl extends BaseServiceImpl<WorkflowDefin
try {
// 1. 创建并异步启动流程实例
ProcessInstance processInstance = runtimeService.createProcessInstanceBuilder()
.processDefinitionKey(processKey)
.businessKey(businessKey)
.variables(variables)
.startAsync(); // 异步启动会自动执行 shell 任务
.processDefinitionKey(processKey)
.businessKey(businessKey)
.variables(variables)
.startAsync(); // 异步启动会自动执行 shell 任务
// 2. 返回实例信息
WorkflowInstanceCreateDTO dto = new WorkflowInstanceCreateDTO();
@ -127,124 +133,202 @@ public class WorkflowDefinitionServiceImpl extends BaseServiceImpl<WorkflowDefin
@Override
public WorkflowExecutionDTO getWorkflowExecution(String processInstanceId) {
try {
// 1. 查询流程实例
ProcessInstance processInstance = runtimeService.createProcessInstanceQuery()
.processInstanceId(processInstanceId)
.singleResult();
// 1. 获取流程实例信息
ProcessInstance runningInstance = runtimeService.createProcessInstanceQuery()
.processInstanceId(processInstanceId)
.singleResult();
if (processInstance == null) {
HistoricProcessInstance historicInstance = historyService.createHistoricProcessInstanceQuery()
.processInstanceId(processInstanceId)
.singleResult();
if (historicInstance == null) {
throw new RuntimeException("Process instance not found: " + processInstanceId);
}
// 2. 查询当前活动的节点
List<String> activeActivityIds = runtimeService.getActiveActivityIds(processInstanceId);
// 3. 获取历史活动节点
List<HistoricActivityInstance> historicActivities = historyService.createHistoricActivityInstanceQuery()
.processInstanceId(processInstanceId)
.orderByHistoricActivityInstanceStartTime()
.asc()
.list();
log.info("=== 开始获取工作流执行状态 ===");
log.info("流程实例ID: {}", processInstanceId);
log.info("流程定义ID: {}", historicInstance.getProcessDefinitionId());
log.info("是否有运行实例: {}", runningInstance != null);
// 4. 获取流程实例的历史信息
HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery()
.processInstanceId(processInstanceId)
.singleResult();
// 5. 获取流程定义模型
String processDefinitionId = processInstance.getProcessDefinitionId();
BpmnModel bpmnModel = repositoryService.getBpmnModel(processDefinitionId);
// 2. 获取流程定义
BpmnModel bpmnModel = repositoryService.getBpmnModel(historicInstance.getProcessDefinitionId());
Process process = bpmnModel.getMainProcess();
Collection<FlowElement> flowElements = process.getFlowElements();
// 6. 创建返回对象
WorkflowExecutionDTO dto = new WorkflowExecutionDTO();
dto.setProcessInstanceId(processInstanceId);
dto.setProcessDefinitionId(processInstance.getProcessDefinitionId());
dto.setProcessDefinitionName(processInstance.getProcessDefinitionName());
dto.setBusinessKey(processInstance.getBusinessKey());
dto.setStatus(processInstance.isSuspended() ? "SUSPENDED" :
(historicProcessInstance != null && historicProcessInstance.getEndTime() != null) ? "COMPLETED" : "RUNNING");
// 7. 设置时间信息
if (historicProcessInstance != null) {
dto.setStartTime(historicProcessInstance.getStartTime());
dto.setEndTime(historicProcessInstance.getEndTime());
dto.setDurationInMillis(historicProcessInstance.getDurationInMillis());
// 打印流程定义中的所有节点
log.info("=== 流程定义节点信息 ===");
process.getFlowElements().forEach(element -> {
log.info("节点信息 - ID: {}, 名称: {}, 类型: {}",
element.getId(),
element.getName(),
element.getClass().getSimpleName());
});
// 3. 获取所有历史活动实例
List<HistoricActivityInstance> historicActivities = historyService.createHistoricActivityInstanceQuery()
.processInstanceId(processInstanceId)
.orderByHistoricActivityInstanceStartTime()
.asc()
.list();
log.info("=== 历史活动实例信息 ===");
historicActivities.forEach(activity -> {
log.info("历史活动 - ID: {}, 名称: {}, 类型: {}, 开始时间: {}, 结束时间: {}",
activity.getActivityId(),
activity.getActivityName(),
activity.getActivityType(),
activity.getStartTime(),
activity.getEndTime());
});
// 4. 获取当前活动的节点
List<String> activeActivityIds = new ArrayList<>();
if (runningInstance != null) {
activeActivityIds = runtimeService.getActiveActivityIds(processInstanceId);
log.info("=== 当前活动节点 ===");
log.info("活动节点IDs: {}", activeActivityIds);
}
// 8. 获取流程变量
Map<String, Object> variables = runtimeService.getVariables(processInstanceId);
dto.setVariables(variables);
// 5. 获取所有流程变量
List<HistoricVariableInstance> variables = historyService.createHistoricVariableInstanceQuery()
.processInstanceId(processInstanceId)
.list();
// 9. 设置当前活动节点
log.info("=== 流程变量信息 ===");
Map<String, Object> variableMap = new HashMap<>();
variables.forEach(variable -> {
variableMap.put(variable.getVariableName(), variable.getValue());
log.info("变量 - 名称: {}, 值: {}", variable.getVariableName(), variable.getValue());
});
// 6. 构建执行状态DTO
WorkflowExecutionDTO executionDTO = new WorkflowExecutionDTO();
executionDTO.setProcessInstanceId(processInstanceId);
executionDTO.setProcessDefinitionId(historicInstance.getProcessDefinitionId());
executionDTO.setProcessDefinitionName(historicInstance.getProcessDefinitionName());
executionDTO.setBusinessKey(historicInstance.getBusinessKey());
executionDTO.setStartTime(historicInstance.getStartTime());
executionDTO.setEndTime(historicInstance.getEndTime());
executionDTO.setDurationInMillis(historicInstance.getDurationInMillis());
executionDTO.setVariables(variableMap);
// 设置状态
if (historicInstance.getEndTime() != null) {
executionDTO.setStatus(historicInstance.getDeleteReason() == null ?
WorkflowInstanceStatus.COMPLETED : WorkflowInstanceStatus.FAILED);
} else {
executionDTO.setStatus(runningInstance != null && runningInstance.isSuspended() ?
WorkflowInstanceStatus.SUSPENDED : WorkflowInstanceStatus.RUNNING);
}
// 7. 创建历史活动实例的映射用于快速查找
Map<String, HistoricActivityInstance> historicActivityMap = historicActivities.stream()
.collect(Collectors.toMap(
HistoricActivityInstance::getActivityId,
activity -> activity,
(existing, replacement) -> existing
));
// 8. 获取所有流程节点并构建阶段列表
List<WorkflowExecutionDTO.StageDTO> stages = new ArrayList<>();
Collection<FlowElement> flowElements = process.getFlowElements();
// 记录已执行过的节点ID
Set<String> executedActivityIds = historicActivities.stream()
.map(HistoricActivityInstance::getActivityId)
.collect(Collectors.toSet());
// 获取所有节点的列表按照流程定义的顺序
List<FlowElement> orderedElements = new ArrayList<>();
for (FlowElement element : flowElements) {
if (element instanceof Task || element instanceof Event || element instanceof ServiceTask) {
orderedElements.add(element);
}
}
// 遍历所有节点
for (int i = 0; i < orderedElements.size(); i++) {
FlowElement element = orderedElements.get(i);
WorkflowExecutionDTO.StageDTO stage = new WorkflowExecutionDTO.StageDTO();
stage.setId(element.getId());
stage.setName(element.getName());
// 设置节点类型
if (element instanceof ServiceTask) {
stage.setType("serviceTask");
} else if (element instanceof Event) {
stage.setType("startEvent");
if (element.getId().equals("end")) {
stage.setType("endEvent");
}
} else {
stage.setType(element.getClass().getSimpleName());
}
// 获取历史活动实例
HistoricActivityInstance historicActivity = historicActivityMap.get(element.getId());
// 设置开始和结束时间
if (historicActivity != null) {
stage.setStartTime(historicActivity.getStartTime());
stage.setEndTime(historicActivity.getEndTime());
}
// 设置节点状态
if (historicActivity != null) {
if (historicActivity.getEndTime() != null) {
// 已完成的节点
stage.setStatus(historicActivity.getDeleteReason() == null ?
WorkflowInstanceStatus.COMPLETED : WorkflowInstanceStatus.FAILED);
} else if (activeActivityIds.contains(element.getId())) {
// 正在执行的节点
stage.setStatus(WorkflowInstanceStatus.RUNNING);
} else {
// 已开始但未完成的节点
stage.setStatus(WorkflowInstanceStatus.RUNNING);
}
} else {
// 未执行的节点
if (i == 0 && executedActivityIds.isEmpty()) {
// 如果是第一个节点且没有任何节点执行过设置为RUNNING
stage.setStatus(WorkflowInstanceStatus.RUNNING);
} else if (i > 0 && stages.get(i-1).getStatus() == WorkflowInstanceStatus.COMPLETED) {
// 如果前一个节点已完成当前节点设置为RUNNING
stage.setStatus(WorkflowInstanceStatus.RUNNING);
} else {
// 其他情况设置为NOT_STARTED
stage.setStatus(WorkflowInstanceStatus.NOT_STARTED);
}
}
// 如果是Shell任务获取执行结果
if (element instanceof ServiceTask) {
ServiceTask serviceTask = (ServiceTask) element;
if ("shell".equals(serviceTask.getType()) && historicActivity != null) {
String executionId = historicActivity.getExecutionId();
stage.setOutput((String) variableMap.get(executionId + "_shellOutput"));
stage.setError((String) variableMap.get(executionId + "_shellError"));
stage.setExitCode((Integer) variableMap.get(executionId + "_exitCode"));
}
}
stages.add(stage);
}
executionDTO.setStages(stages);
// 10. 设置当前活动节点
if (!activeActivityIds.isEmpty()) {
String currentActivityId = activeActivityIds.get(0);
dto.setCurrentActivityId(currentActivityId);
// 从流程定义中找到当前节点的名称和类型
FlowElement currentElement = process.getFlowElement(currentActivityId);
if (currentElement != null) {
dto.setCurrentActivityName(currentElement.getName());
dto.setCurrentActivityType(currentElement.getClass().getSimpleName());
executionDTO.setCurrentActivityId(currentActivityId);
executionDTO.setCurrentActivityName(currentElement.getName());
executionDTO.setCurrentActivityType(currentElement.getClass().getSimpleName());
}
}
// 10. 创建历史活动实例的映射用于快速查找
Map<String, HistoricActivityInstance> historicActivityMap = historicActivities.stream()
.collect(Collectors.toMap(
HistoricActivityInstance::getActivityId,
activity -> activity,
(existing, replacement) -> existing // 如果有重复保留第一个
));
// 11. 设置所有节点信息
List<WorkflowExecutionDTO.StageDTO> stages = new ArrayList<>();
for (FlowElement element : flowElements) {
// 只处理任务节点和事件节点
if (element instanceof Task || element instanceof Event) {
WorkflowExecutionDTO.StageDTO stage = new WorkflowExecutionDTO.StageDTO();
stage.setId(element.getId());
stage.setName(element.getName());
stage.setType(element.getClass().getSimpleName());
// 获取历史活动实例如果存在
HistoricActivityInstance historicActivity = historicActivityMap.get(element.getId());
if (historicActivity != null) {
// 节点已经执行过
stage.setStartTime(historicActivity.getStartTime());
stage.setEndTime(historicActivity.getEndTime());
if (historicActivity.getEndTime() != null) {
stage.setStatus("COMPLETED");
} else if (activeActivityIds.contains(element.getId())) {
stage.setStatus("RUNNING");
} else {
stage.setStatus("PENDING");
}
// 如果是Shell任务获取输出
if (element instanceof ServiceTask && "shellTask".equals(((ServiceTask) element).getType())) {
String executionId = historicActivity.getExecutionId();
if (executionId != null) {
stage.setOutput((String) variables.get(executionId + "_shellOutput"));
stage.setError((String) variables.get(executionId + "_shellError"));
Integer exitCode = (Integer) variables.get(executionId + "_exitCode");
stage.setExitCode(exitCode != null ? exitCode : -1);
}
}
} else {
// 节点尚未执行
stage.setStatus("PENDING");
}
stages.add(stage);
}
}
dto.setStages(stages);
return dto;
return executionDTO;
} catch (Exception e) {
log.error("Failed to get workflow execution: {}", processInstanceId, e);
throw new RuntimeException("Failed to get workflow execution", e);

View File

@ -0,0 +1,158 @@
package com.qqchen.deploy.backend.workflow.service.impl;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.qqchen.deploy.backend.workflow.dto.WorkflowInstanceDTO;
import com.qqchen.deploy.backend.workflow.entity.WorkflowInstance;
import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatus;
import com.qqchen.deploy.backend.workflow.repository.WorkflowInstanceRepository;
import com.qqchen.deploy.backend.workflow.service.IWorkflowInstanceService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.HistoryService;
import org.flowable.engine.RuntimeService;
import org.flowable.engine.history.HistoricActivityInstance;
import org.flowable.engine.runtime.ProcessInstance;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;
@Slf4j
@Service
@RequiredArgsConstructor
public class WorkflowInstanceServiceImpl implements IWorkflowInstanceService {
private final WorkflowInstanceRepository workflowInstanceRepository;
private final RuntimeService runtimeService;
private final HistoryService historyService;
private final ObjectMapper objectMapper;
@Override
@Transactional
public WorkflowInstance createWorkflowInstance(ProcessInstance processInstance, Map<String, Object> variables) {
WorkflowInstance instance = new WorkflowInstance();
instance.setProcessInstanceId(processInstance.getId());
instance.setProcessDefinitionId(Long.valueOf(processInstance.getProcessDefinitionId().split(":")[0]));
instance.setBusinessKey(processInstance.getBusinessKey());
instance.setStatus(WorkflowInstanceStatus.RUNNING);
instance.setStartTime(LocalDateTime.now());
try {
instance.setVariables(objectMapper.writeValueAsString(variables));
} catch (JsonProcessingException e) {
log.error("Failed to serialize variables", e);
instance.setVariables("{}");
}
return workflowInstanceRepository.save(instance);
}
@Override
@Transactional
public WorkflowInstance updateInstanceStatus(String processInstanceId, WorkflowInstanceStatus status) {
WorkflowInstance instance = workflowInstanceRepository.findByProcessInstanceId(processInstanceId)
.orElseThrow(() -> new RuntimeException("Workflow instance not found: " + processInstanceId));
instance.setStatus(status);
if (status == WorkflowInstanceStatus.COMPLETED || status == WorkflowInstanceStatus.FAILED) {
instance.setEndTime(LocalDateTime.now());
}
return workflowInstanceRepository.save(instance);
}
@Override
public WorkflowInstanceDTO getInstanceDetails(String processInstanceId) {
WorkflowInstance instance = workflowInstanceRepository.findByProcessInstanceId(processInstanceId)
.orElseThrow(() -> new RuntimeException("Workflow instance not found: " + processInstanceId));
WorkflowInstanceDTO dto = new WorkflowInstanceDTO();
dto.setId(instance.getProcessInstanceId());
dto.setProcessDefinitionId(String.valueOf(instance.getProcessDefinitionId()));
dto.setBusinessKey(instance.getBusinessKey());
dto.setStatus(instance.getStatus());
// Convert LocalDateTime to Date
dto.setStartTime(java.sql.Timestamp.valueOf(instance.getStartTime()));
if (instance.getEndTime() != null) {
dto.setEndTime(java.sql.Timestamp.valueOf(instance.getEndTime()));
}
// Calculate duration if both start and end time exist
if (instance.getStartTime() != null && instance.getEndTime() != null) {
dto.setDurationInMillis(java.time.Duration.between(instance.getStartTime(), instance.getEndTime()).toMillis());
}
// Get variables
try {
@SuppressWarnings("unchecked")
Map<String, Object> variables = objectMapper.readValue(instance.getVariables(), Map.class);
dto.setVariables(variables);
} catch (JsonProcessingException e) {
log.error("Failed to deserialize variables", e);
dto.setVariables(new HashMap<>());
}
// Get activity instances
List<HistoricActivityInstance> activities = historyService.createHistoricActivityInstanceQuery()
.processInstanceId(processInstanceId)
.orderByHistoricActivityInstanceStartTime().asc()
.list();
dto.setActivities(activities.stream().map(this::convertToActivityInstance).collect(Collectors.toList()));
return dto;
}
@Override
public List<WorkflowInstanceDTO> findByBusinessKey(String businessKey) {
return workflowInstanceRepository.findByBusinessKey(businessKey).stream()
.map(instance -> getInstanceDetails(instance.getProcessInstanceId()))
.collect(Collectors.toList());
}
@Override
@Transactional
public void completeInstance(String processInstanceId, Map<String, Object> variables) {
// Update Flowable variables
runtimeService.setVariables(processInstanceId, variables);
// Update our instance
WorkflowInstance instance = workflowInstanceRepository.findByProcessInstanceId(processInstanceId)
.orElseThrow(() -> new RuntimeException("Workflow instance not found: " + processInstanceId));
try {
instance.setVariables(objectMapper.writeValueAsString(variables));
} catch (JsonProcessingException e) {
log.error("Failed to serialize variables", e);
}
instance.setStatus(WorkflowInstanceStatus.COMPLETED);
instance.setEndTime(LocalDateTime.now());
workflowInstanceRepository.save(instance);
}
private WorkflowInstanceDTO.ActivityInstance convertToActivityInstance(HistoricActivityInstance hai) {
WorkflowInstanceDTO.ActivityInstance ai = new WorkflowInstanceDTO.ActivityInstance();
ai.setId(hai.getId());
ai.setActivityId(hai.getActivityId());
ai.setActivityName(hai.getActivityName());
ai.setActivityType(hai.getActivityType());
ai.setStartTime(hai.getStartTime());
ai.setEndTime(hai.getEndTime());
ai.setDurationInMillis(hai.getDurationInMillis());
// Get shell task execution details from variables if available
if ("serviceTask".equals(hai.getActivityType())) {
Map<String, Object> variables = runtimeService.getVariables(hai.getExecutionId());
ai.setShellOutput((String) variables.get("shellOutput"));
ai.setShellError((String) variables.get("shellError"));
ai.setShellExitCode((Integer) variables.get("shellExitCode"));
}
return ai;
}
}

View File

@ -2,7 +2,7 @@ server:
port: 8080
spring:
datasource:
url: jdbc:mysql://localhost:3306/deploy-ease-platform?useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true
url: jdbc:mysql://localhost:3306/deploy-ease-platform?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true&createDatabaseIfNotExist=true
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver