的撒的

This commit is contained in:
戚辰先生 2024-12-09 23:16:51 +08:00
parent 01ab6721c6
commit 3484ace4bf
17 changed files with 983 additions and 108 deletions

View File

@ -70,6 +70,13 @@
<artifactId>mysql-connector-j</artifactId> <artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope> <scope>runtime</scope>
</dependency> </dependency>
<!-- Hibernate Types -->
<dependency>
<groupId>com.vladmihalcea</groupId>
<artifactId>hibernate-types-60</artifactId>
<version>2.21.1</version>
</dependency>
<!-- QueryDSL --> <!-- QueryDSL -->
<dependency> <dependency>

View File

@ -0,0 +1,277 @@
package com.qqchen.deploy.backend.workflow.api;
import com.qqchen.deploy.backend.framework.api.Response;
import com.qqchen.deploy.backend.framework.controller.BaseController;
import com.qqchen.deploy.backend.framework.enums.ResponseCode;
import com.qqchen.deploy.backend.workflow.dto.WorkflowDefinitionDTO;
import com.qqchen.deploy.backend.workflow.dto.WorkflowExecutionDTO;
import com.qqchen.deploy.backend.workflow.dto.WorkflowInstanceDTO;
import com.qqchen.deploy.backend.workflow.dto.WorkflowInstanceCreateDTO;
import com.qqchen.deploy.backend.workflow.entity.WorkflowDefinition;
import com.qqchen.deploy.backend.workflow.query.WorkflowDefinitionQuery;
import com.qqchen.deploy.backend.workflow.service.IWorkflowDefinitionService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.annotation.Resource;
import jakarta.servlet.http.HttpServletResponse;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.HistoryService;
import org.flowable.engine.RuntimeService;
import org.flowable.engine.history.HistoricActivityInstance;
import org.flowable.engine.history.HistoricProcessInstance;
import org.flowable.engine.runtime.ProcessInstance;
import org.flowable.variable.api.history.HistoricVariableInstance;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* 工作流定义控制器
*/
@Slf4j
@RestController
@RequestMapping("/api/v1/workflow/definition")
@Tag(name = "工作流定义管理", description = "工作流定义管理相关接口")
public class WorkflowDefinitionApiController extends BaseController<WorkflowDefinition, WorkflowDefinitionDTO, Long, WorkflowDefinitionQuery> {
@Resource
private IWorkflowDefinitionService workflowDefinitionService;
@Resource
private RuntimeService runtimeService;
@Resource
private HistoryService historyService;
@Operation(summary = "部署工作流")
@PostMapping("/deploy")
public Response<WorkflowDefinitionDTO> deployWorkflow(@RequestBody WorkflowDefinitionDTO dto) {
return Response.success(workflowDefinitionService.deployWorkflow(dto));
}
@Operation(summary = "启动工作流实例")
@PostMapping("/start")
public Response<WorkflowInstanceCreateDTO> startWorkflow(
@Parameter(description = "流程标识", required = true) @RequestParam String processKey,
@Parameter(description = "业务标识", required = true) @RequestParam String businessKey
) {
Map<String, Object> variables = new HashMap<>();
try {
// 同步创建实例立即返回实例ID
WorkflowInstanceCreateDTO result = workflowDefinitionService.startWorkflow(processKey, businessKey, variables);
return Response.success(result);
} catch (Exception e) {
log.error("Failed to start workflow", e);
return Response.error(ResponseCode.WORKFLOW_EXECUTION_ERROR);
}
}
@Operation(summary = "挂起工作流实例")
@PostMapping("/{processInstanceId}/suspend")
public Response<Void> suspendWorkflow(
@Parameter(description = "流程实例ID", required = true) @PathVariable String processInstanceId
) {
workflowDefinitionService.suspendWorkflow(processInstanceId);
return Response.success();
}
@Operation(summary = "恢复工作流实例")
@PostMapping("/{processInstanceId}/resume")
public Response<Void> resumeWorkflow(
@Parameter(description = "流程实例ID", required = true) @PathVariable String processInstanceId
) {
workflowDefinitionService.resumeWorkflow(processInstanceId);
return Response.success();
}
@Operation(summary = "查询工作流实例")
@GetMapping("/instance/{processInstanceId}")
public Response<WorkflowInstanceDTO> getWorkflowInstance(
@Parameter(description = "流程实例ID", required = true) @PathVariable String processInstanceId
) {
HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery()
.processInstanceId(processInstanceId)
.includeProcessVariables()
.singleResult();
if (historicProcessInstance == null) {
return Response.error(ResponseCode.WORKFLOW_NOT_FOUND);
}
WorkflowInstanceDTO instanceDTO = new WorkflowInstanceDTO();
instanceDTO.setId(historicProcessInstance.getId());
instanceDTO.setProcessDefinitionId(historicProcessInstance.getProcessDefinitionId());
instanceDTO.setBusinessKey(historicProcessInstance.getBusinessKey());
instanceDTO.setStartTime(historicProcessInstance.getStartTime());
instanceDTO.setEndTime(historicProcessInstance.getEndTime());
instanceDTO.setDurationInMillis(historicProcessInstance.getDurationInMillis());
instanceDTO.setStartUserId(historicProcessInstance.getStartUserId());
instanceDTO.setStatus(historicProcessInstance.getEndTime() != null ? "COMPLETED" : "RUNNING");
instanceDTO.setVariables(historicProcessInstance.getProcessVariables());
// 查询活动节点历史
List<HistoricActivityInstance> activities = historyService.createHistoricActivityInstanceQuery()
.processInstanceId(processInstanceId)
.orderByHistoricActivityInstanceStartTime()
.asc()
.list();
List<WorkflowInstanceDTO.ActivityInstance> activityInstances = activities.stream()
.map(activity -> {
WorkflowInstanceDTO.ActivityInstance activityInstance = new WorkflowInstanceDTO.ActivityInstance();
activityInstance.setId(activity.getId());
activityInstance.setActivityId(activity.getActivityId());
activityInstance.setActivityName(activity.getActivityName());
activityInstance.setActivityType(activity.getActivityType());
activityInstance.setStartTime(activity.getStartTime());
activityInstance.setEndTime(activity.getEndTime());
activityInstance.setDurationInMillis(activity.getDurationInMillis());
// 如果是Shell任务获取Shell相关变量
if ("serviceTask".equals(activity.getActivityType())) {
Map<String, Object> variables = historicProcessInstance.getProcessVariables();
activityInstance.setShellOutput((String) variables.get("shellOutput"));
activityInstance.setShellError((String) variables.get("shellError"));
activityInstance.setShellExitCode((Integer) variables.get("shellExitCode"));
}
return activityInstance;
})
.collect(Collectors.toList());
instanceDTO.setActivities(activityInstances);
return Response.success(instanceDTO);
}
@Operation(summary = "获取工作流执行状态")
@GetMapping("/instance/{processInstanceId}/execution")
public ResponseEntity<WorkflowExecutionDTO> getWorkflowExecution(@PathVariable String processInstanceId) {
// 获取历史活动实例
List<HistoricActivityInstance> activities = historyService.createHistoricActivityInstanceQuery()
.processInstanceId(processInstanceId)
.orderByHistoricActivityInstanceStartTime()
.asc()
.list();
// 获取流程实例
HistoricProcessInstance processInstance = historyService.createHistoricProcessInstanceQuery()
.processInstanceId(processInstanceId)
.singleResult();
if (processInstance == null) {
return ResponseEntity.notFound().build();
}
// 获取所有流程变量
List<HistoricVariableInstance> allVariables = historyService.createHistoricVariableInstanceQuery()
.processInstanceId(processInstanceId)
.list();
// 构建变量映射
Map<String, Object> variableMap = new HashMap<>();
for (HistoricVariableInstance variable : allVariables) {
variableMap.put(variable.getVariableName(), variable.getValue());
}
// 构建执行状态DTO
WorkflowExecutionDTO executionDTO = new WorkflowExecutionDTO();
executionDTO.setProcessInstanceId(processInstance.getId());
executionDTO.setProcessDefinitionId(processInstance.getProcessDefinitionId());
// executionDTO.setProcessDefinitionKey(processInstance.getProcessDefinitionKey());
executionDTO.setBusinessKey(processInstance.getBusinessKey());
// 设置流程状态
if (processInstance.getEndTime() != null) {
executionDTO.setStatus(processInstance.getDeleteReason() == null ? "COMPLETED" : "FAILED");
} else {
executionDTO.setStatus("RUNNING");
}
// 构建阶段列表
List<WorkflowExecutionDTO.StageDTO> stages = new ArrayList<>();
for (HistoricActivityInstance activity : activities) {
WorkflowExecutionDTO.StageDTO stage = new WorkflowExecutionDTO.StageDTO();
stage.setId(activity.getActivityId());
stage.setName(activity.getActivityName());
stage.setType(activity.getActivityType());
// stage.setStartTime(activity.getStartTime() != null ? activity.getStartTime().toString() : null);
// stage.setEndTime(activity.getEndTime() != null ? activity.getEndTime().toString() : null);
// 设置阶段状态
if (activity.getEndTime() == null) {
stage.setStatus("RUNNING");
} else {
stage.setStatus(activity.getDeleteReason() == null ? "COMPLETED" : "FAILED");
}
// 如果是Shell任务获取输出
if ("serviceTask".equals(activity.getActivityType()) &&
activity.getActivityId().toLowerCase().contains("shell")) {
// 获取Shell任务的输出
String output = (String) variableMap.get("shellOutput");
String error = (String) variableMap.get("shellError");
Integer exitCode = (Integer) variableMap.get("shellExitCode");
stage.setOutput(output);
stage.setError(error);
stage.setExitCode(exitCode);
}
stages.add(stage);
}
executionDTO.setStages(stages);
executionDTO.setVariables(variableMap);
return ResponseEntity.ok(executionDTO);
}
@Operation(summary = "查询工作流实例列表")
@GetMapping("/instances")
public Response<List<WorkflowInstanceDTO>> listWorkflowInstances(
@Parameter(description = "流程标识") @RequestParam(required = false) String processKey,
@Parameter(description = "业务标识") @RequestParam(required = false) String businessKey
) {
List<HistoricProcessInstance> historicProcessInstances = historyService.createHistoricProcessInstanceQuery()
.processDefinitionKey(processKey)
.processInstanceBusinessKey(businessKey)
.includeProcessVariables()
.orderByProcessInstanceStartTime()
.desc()
.list();
List<WorkflowInstanceDTO> instanceDTOs = historicProcessInstances.stream()
.map(historicProcessInstance -> {
WorkflowInstanceDTO instanceDTO = new WorkflowInstanceDTO();
instanceDTO.setId(historicProcessInstance.getId());
instanceDTO.setProcessDefinitionId(historicProcessInstance.getProcessDefinitionId());
instanceDTO.setBusinessKey(historicProcessInstance.getBusinessKey());
instanceDTO.setStartTime(historicProcessInstance.getStartTime());
instanceDTO.setEndTime(historicProcessInstance.getEndTime());
instanceDTO.setDurationInMillis(historicProcessInstance.getDurationInMillis());
instanceDTO.setStartUserId(historicProcessInstance.getStartUserId());
instanceDTO.setStatus(historicProcessInstance.getEndTime() != null ? "COMPLETED" : "RUNNING");
instanceDTO.setVariables(historicProcessInstance.getProcessVariables());
return instanceDTO;
})
.collect(Collectors.toList());
return Response.success(instanceDTOs);
}
@Override
protected void exportData(HttpServletResponse response, List<WorkflowDefinitionDTO> data) {
}
}

View File

@ -1,4 +1,4 @@
package com.qqchen.deploy.backend.workflow.controller; package com.qqchen.deploy.backend.workflow.api;
import com.qqchen.deploy.backend.framework.api.Response; import com.qqchen.deploy.backend.framework.api.Response;
import com.qqchen.deploy.backend.framework.controller.BaseController; import com.qqchen.deploy.backend.framework.controller.BaseController;
@ -22,7 +22,7 @@ import java.util.List;
@RestController @RestController
@RequestMapping("/api/v1/workflow/design") @RequestMapping("/api/v1/workflow/design")
@Tag(name = "工作流设计", description = "工作流设计相关接口") @Tag(name = "工作流设计", description = "工作流设计相关接口")
public class WorkflowDesignController extends BaseController<WorkflowDefinition, WorkflowDesignDTO, Long, WorkflowDefinitionQuery> { public class WorkflowDesignApiController extends BaseController<WorkflowDefinition, WorkflowDesignDTO, Long, WorkflowDefinitionQuery> {
@Resource @Resource
private IWorkflowDesignService workflowDesignService; private IWorkflowDesignService workflowDesignService;

View File

@ -1,72 +0,0 @@
package com.qqchen.deploy.backend.workflow.controller;
import com.qqchen.deploy.backend.framework.api.Response;
import com.qqchen.deploy.backend.framework.controller.BaseController;
import com.qqchen.deploy.backend.workflow.dto.WorkflowDefinitionDTO;
import com.qqchen.deploy.backend.workflow.entity.WorkflowDefinition;
import com.qqchen.deploy.backend.workflow.query.WorkflowDefinitionQuery;
import com.qqchen.deploy.backend.workflow.service.IWorkflowDefinitionService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.annotation.Resource;
import jakarta.servlet.http.HttpServletResponse;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.runtime.ProcessInstance;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.Map;
/**
* 工作流定义控制器
*/
@Slf4j
@RestController
@RequestMapping("/api/v1/workflow/definition")
@Tag(name = "工作流定义管理", description = "工作流定义管理相关接口")
public class WorkflowDefinitionController extends BaseController<WorkflowDefinition, WorkflowDefinitionDTO, Long, WorkflowDefinitionQuery> {
@Resource
private IWorkflowDefinitionService workflowDefinitionService;
@Operation(summary = "部署工作流")
@PostMapping("/deploy")
public Response<WorkflowDefinitionDTO> deployWorkflow(@RequestBody WorkflowDefinitionDTO dto) {
return Response.success(workflowDefinitionService.deployWorkflow(dto));
}
@Operation(summary = "启动工作流实例")
@PostMapping("/start")
public Response<String> startWorkflow(
@Parameter(description = "流程标识", required = true) @RequestParam String processKey,
@Parameter(description = "业务标识", required = true) @RequestParam String businessKey,
@Parameter(description = "流程变量") @RequestBody(required = false) Map<String, Object> variables
) {
ProcessInstance instance = workflowDefinitionService.startWorkflow(processKey, businessKey, variables);
return Response.success(instance.getId());
}
@Operation(summary = "挂起工作流实例")
@PostMapping("/{processInstanceId}/suspend")
public Response<Void> suspendWorkflow(
@Parameter(description = "流程实例ID", required = true) @PathVariable String processInstanceId
) {
workflowDefinitionService.suspendWorkflow(processInstanceId);
return Response.success();
}
@Operation(summary = "恢复工作流实例")
@PostMapping("/{processInstanceId}/resume")
public Response<Void> resumeWorkflow(
@Parameter(description = "流程实例ID", required = true) @PathVariable String processInstanceId
) {
workflowDefinitionService.resumeWorkflow(processInstanceId);
return Response.success();
}
@Override
protected void exportData(HttpServletResponse response, List<WorkflowDefinitionDTO> data) {
}
}

View File

@ -0,0 +1,160 @@
package com.qqchen.deploy.backend.workflow.delegate;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.JavaDelegate;
import org.flowable.common.engine.api.delegate.Expression;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* Shell任务委托执行器
*/
@Slf4j
@Component
public class ShellTaskDelegate implements JavaDelegate {
private Expression script;
private Expression workDir;
private Expression env;
@Override
public void execute(DelegateExecution execution) {
// 从字段注入中获取值
String scriptValue = script != null ? script.getValue(execution).toString() : null;
String workDirValue = workDir != null ? workDir.getValue(execution).toString() : null;
@SuppressWarnings("unchecked")
Map<String, String> envValue = env != null ? (Map<String, String>) env.getValue(execution) : null;
// 如果流程变量中有值优先使用流程变量
if (execution.hasVariable("script")) {
scriptValue = (String) execution.getVariable("script");
}
if (execution.hasVariable("workDir")) {
workDirValue = (String) execution.getVariable("workDir");
}
if (execution.hasVariable("env")) {
@SuppressWarnings("unchecked")
Map<String, String> envFromVar = (Map<String, String>) execution.getVariable("env");
envValue = envFromVar;
}
if (scriptValue == null) {
throw new RuntimeException("Script is required but not provided");
}
try {
// 创建进程构建器
ProcessBuilder processBuilder = new ProcessBuilder();
processBuilder.command("bash", "-c", scriptValue);
// 设置工作目录
if (StringUtils.hasText(workDirValue)) {
processBuilder.directory(new File(workDirValue));
}
// 设置环境变量
if (envValue != null) {
processBuilder.environment().putAll(envValue);
}
// 执行命令
log.info("Executing shell script: {}", scriptValue);
Process process = processBuilder.start();
// 使用BufferedReader实时读取输出
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
BufferedReader errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
StringBuilder output = new StringBuilder();
StringBuilder error = new StringBuilder();
// 创建线程池处理输出
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 处理标准输出
Future<?> outputFuture = executorService.submit(() -> {
String line;
try {
while ((line = reader.readLine()) != null) {
synchronized (output) {
output.append(line).append("\n");
}
log.info("Shell output: {}", line);
Thread.sleep(500); // 增加延迟让输出更容易观察
}
} catch (Exception e) {
log.error("Error reading process output", e);
}
});
// 处理错误输出
Future<?> errorFuture = executorService.submit(() -> {
String line;
try {
while ((line = errorReader.readLine()) != null) {
synchronized (error) {
error.append(line).append("\n");
}
log.error("Shell error: {}", line);
}
} catch (IOException e) {
log.error("Error reading process error", e);
}
});
// 定期更新变量
while (!outputFuture.isDone() || !errorFuture.isDone()) {
synchronized (output) {
execution.setVariable("shellOutput", output.toString());
}
synchronized (error) {
execution.setVariable("shellError", error.toString());
}
Thread.sleep(1000); // 每秒更新一次变量
}
// 等待进程完成
int exitCode = process.waitFor();
// 等待输出处理完成
outputFuture.get(5, TimeUnit.SECONDS);
errorFuture.get(5, TimeUnit.SECONDS);
// 关闭线程池
executorService.shutdown();
// 设置最终结果
synchronized (output) {
execution.setVariable("shellOutput", output.toString());
}
synchronized (error) {
execution.setVariable("shellError", error.toString());
}
execution.setVariable("shellExitCode", exitCode);
if (exitCode != 0) {
log.error("Shell script execution failed with exit code: {}", exitCode);
log.error("Error output: {}", error);
throw new RuntimeException("Shell script execution failed with exit code: " + exitCode);
}
log.info("Shell script executed successfully");
log.debug("Script output: {}", output);
} catch (Exception e) {
log.error("Shell script execution failed", e);
throw new RuntimeException("Shell script execution failed: " + e.getMessage(), e);
}
}
}

View File

@ -24,7 +24,7 @@ public class WorkflowDefinitionDTO extends BaseDTO {
/** /**
* 流程版本 * 流程版本
*/ */
private Integer version; private Integer flowVersion;
/** /**
* BPMN XML内容 * BPMN XML内容

View File

@ -1,5 +1,6 @@
package com.qqchen.deploy.backend.workflow.dto; package com.qqchen.deploy.backend.workflow.dto;
import com.fasterxml.jackson.databind.JsonNode;
import com.qqchen.deploy.backend.framework.dto.BaseDTO; import com.qqchen.deploy.backend.framework.dto.BaseDTO;
import lombok.Data; import lombok.Data;
@ -26,5 +27,5 @@ public class WorkflowDesignDTO extends BaseDTO {
/** /**
* 流程图数据 * 流程图数据
*/ */
private String graphJson; private JsonNode graphJson;
} }

View File

@ -0,0 +1,122 @@
package com.qqchen.deploy.backend.workflow.dto;
import lombok.Data;
import java.util.Date;
import java.util.List;
import java.util.Map;
@Data
public class WorkflowExecutionDTO {
/**
* 流程实例ID
*/
private String processInstanceId;
/**
* 流程定义ID
*/
private String processDefinitionId;
/**
* 流程定义名称
*/
private String processDefinitionName;
/**
* 业务标识
*/
private String businessKey;
/**
* 流程状态RUNNING, SUSPENDED, COMPLETED
*/
private String status;
/**
* 当前活动节点ID
*/
private String currentActivityId;
/**
* 当前活动节点名称
*/
private String currentActivityName;
/**
* 当前活动节点类型
*/
private String currentActivityType;
/**
* 开始时间
*/
private Date startTime;
/**
* 结束时间
*/
private Date endTime;
/**
* 持续时间毫秒
*/
private Long durationInMillis;
/**
* 流程变量
*/
private Map<String, Object> variables;
/**
* 流程阶段列表
*/
private List<StageDTO> stages;
@Data
public static class StageDTO {
/**
* 节点ID
*/
private String id;
/**
* 节点名称
*/
private String name;
/**
* 节点类型START_EVENT, USER_TASK, SERVICE_TASK, SHELL_TASK
*/
private String type;
/**
* 节点状态PENDING, RUNNING, COMPLETED, FAILED
*/
private String status;
/**
* 开始时间
*/
private Date startTime;
/**
* 结束时间
*/
private Date endTime;
/**
* Shell任务的输出如果是 SHELL_TASK 类型
*/
private String output;
/**
* Shell任务的错误信息如果是 SHELL_TASK 类型
*/
private String error;
/**
* Shell任务的退出码如果是 SHELL_TASK 类型
*/
private Integer exitCode;
}
}

View File

@ -0,0 +1,20 @@
package com.qqchen.deploy.backend.workflow.dto;
import lombok.Data;
@Data
public class WorkflowInstanceCreateDTO {
private String processInstanceId;
private String processDefinitionKey;
private String businessKey;
private String status; // CREATING, RUNNING, ERROR
public static WorkflowInstanceCreateDTO of(String processInstanceId, String processDefinitionKey, String businessKey) {
WorkflowInstanceCreateDTO dto = new WorkflowInstanceCreateDTO();
dto.setProcessInstanceId(processInstanceId);
dto.setProcessDefinitionKey(processDefinitionKey);
dto.setBusinessKey(businessKey);
dto.setStatus("CREATING");
return dto;
}
}

View File

@ -0,0 +1,75 @@
package com.qqchen.deploy.backend.workflow.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.util.Date;
import java.util.List;
import java.util.Map;
@Data
@Schema(description = "工作流实例信息")
public class WorkflowInstanceDTO {
@Schema(description = "实例ID")
private String id;
@Schema(description = "流程定义ID")
private String processDefinitionId;
@Schema(description = "业务标识")
private String businessKey;
@Schema(description = "开始时间")
private Date startTime;
@Schema(description = "结束时间")
private Date endTime;
@Schema(description = "执行时长(毫秒)")
private Long durationInMillis;
@Schema(description = "启动用户ID")
private String startUserId;
@Schema(description = "状态(RUNNING/COMPLETED/FAILED)")
private String status;
@Data
@Schema(description = "活动节点信息")
public static class ActivityInstance {
@Schema(description = "活动ID")
private String id;
@Schema(description = "活动定义ID")
private String activityId;
@Schema(description = "活动名称")
private String activityName;
@Schema(description = "活动类型")
private String activityType;
@Schema(description = "开始时间")
private Date startTime;
@Schema(description = "结束时间")
private Date endTime;
@Schema(description = "执行时长(毫秒)")
private Long durationInMillis;
@Schema(description = "Shell输出")
private String shellOutput;
@Schema(description = "Shell错误")
private String shellError;
@Schema(description = "Shell退出码")
private Integer shellExitCode;
}
@Schema(description = "活动节点列表")
private List<ActivityInstance> activities;
@Schema(description = "流程变量")
private Map<String, Object> variables;
}

View File

@ -1,10 +1,13 @@
package com.qqchen.deploy.backend.workflow.entity; package com.qqchen.deploy.backend.workflow.entity;
import com.fasterxml.jackson.databind.JsonNode;
import com.vladmihalcea.hibernate.type.json.JsonType;
import com.qqchen.deploy.backend.framework.domain.Entity; import com.qqchen.deploy.backend.framework.domain.Entity;
import jakarta.persistence.Column; import jakarta.persistence.Column;
import jakarta.persistence.Table; import jakarta.persistence.Table;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import org.hibernate.annotations.Type;
/** /**
* 工作流定义实体 * 工作流定义实体
@ -24,14 +27,14 @@ public class WorkflowDefinition extends Entity<Long> {
/** /**
* 流程标识 * 流程标识
*/ */
@Column(nullable = false) @Column(name = "`key`", nullable = false)
private String key; private String key;
/** /**
* 流程版本 * 流程版本
*/ */
@Column(nullable = false) @Column(name = "flow_version", nullable = false)
private Integer version; private Integer flowVersion;
/** /**
* BPMN XML内容 * BPMN XML内容
@ -42,8 +45,9 @@ public class WorkflowDefinition extends Entity<Long> {
/** /**
* 流程图JSON数据 * 流程图JSON数据
*/ */
@Column(name = "graph_json", columnDefinition = "TEXT") @Type(JsonType.class)
private String graphJson; @Column(name = "graph_json", columnDefinition = "json")
private JsonNode graphJson;
/** /**
* 流程描述 * 流程描述

View File

@ -2,6 +2,8 @@ package com.qqchen.deploy.backend.workflow.service;
import com.qqchen.deploy.backend.framework.service.IBaseService; import com.qqchen.deploy.backend.framework.service.IBaseService;
import com.qqchen.deploy.backend.workflow.dto.WorkflowDefinitionDTO; import com.qqchen.deploy.backend.workflow.dto.WorkflowDefinitionDTO;
import com.qqchen.deploy.backend.workflow.dto.WorkflowExecutionDTO;
import com.qqchen.deploy.backend.workflow.dto.WorkflowInstanceCreateDTO;
import com.qqchen.deploy.backend.workflow.entity.WorkflowDefinition; import com.qqchen.deploy.backend.workflow.entity.WorkflowDefinition;
import org.flowable.engine.runtime.ProcessInstance; import org.flowable.engine.runtime.ProcessInstance;
@ -28,7 +30,7 @@ public interface IWorkflowDefinitionService extends IBaseService<WorkflowDefinit
* @param variables 流程变量 * @param variables 流程变量
* @return 流程实例 * @return 流程实例
*/ */
ProcessInstance startWorkflow(String processKey, String businessKey, Map<String, Object> variables); WorkflowInstanceCreateDTO startWorkflow(String processKey, String businessKey, Map<String, Object> variables);
/** /**
* 挂起工作流实例 * 挂起工作流实例
@ -43,4 +45,12 @@ public interface IWorkflowDefinitionService extends IBaseService<WorkflowDefinit
* @param processInstanceId 流程实例ID * @param processInstanceId 流程实例ID
*/ */
void resumeWorkflow(String processInstanceId); void resumeWorkflow(String processInstanceId);
/**
* 获取工作流执行状态
*
* @param processInstanceId 流程实例ID
* @return 工作流执行状态DTO
*/
WorkflowExecutionDTO getWorkflowExecution(String processInstanceId);
} }

View File

@ -2,18 +2,34 @@ package com.qqchen.deploy.backend.workflow.service.impl;
import com.qqchen.deploy.backend.framework.service.impl.BaseServiceImpl; import com.qqchen.deploy.backend.framework.service.impl.BaseServiceImpl;
import com.qqchen.deploy.backend.workflow.dto.WorkflowDefinitionDTO; import com.qqchen.deploy.backend.workflow.dto.WorkflowDefinitionDTO;
import com.qqchen.deploy.backend.workflow.dto.WorkflowExecutionDTO;
import com.qqchen.deploy.backend.workflow.dto.WorkflowInstanceCreateDTO;
import com.qqchen.deploy.backend.workflow.entity.WorkflowDefinition; import com.qqchen.deploy.backend.workflow.entity.WorkflowDefinition;
import com.qqchen.deploy.backend.workflow.repository.IWorkflowDefinitionRepository;
import com.qqchen.deploy.backend.workflow.service.IWorkflowDefinitionService; import com.qqchen.deploy.backend.workflow.service.IWorkflowDefinitionService;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.HistoryService;
import org.flowable.engine.RepositoryService; import org.flowable.engine.RepositoryService;
import org.flowable.engine.RuntimeService; import org.flowable.engine.RuntimeService;
import org.flowable.engine.history.HistoricActivityInstance;
import org.flowable.engine.history.HistoricProcessInstance;
import org.flowable.engine.repository.Deployment; import org.flowable.engine.repository.Deployment;
import org.flowable.bpmn.model.BpmnModel;
import org.flowable.engine.runtime.ProcessInstance; import org.flowable.engine.runtime.ProcessInstance;
import org.flowable.bpmn.model.Process;
import org.flowable.bpmn.model.FlowElement;
import org.flowable.bpmn.model.Task;
import org.flowable.bpmn.model.Event;
import org.flowable.bpmn.model.ServiceTask;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors;
/** /**
* 工作流定义服务实现 * 工作流定义服务实现
@ -29,6 +45,12 @@ public class WorkflowDefinitionServiceImpl extends BaseServiceImpl<WorkflowDefin
@Resource @Resource
private RuntimeService runtimeService; private RuntimeService runtimeService;
@Resource
private HistoryService historyService;
@Resource
private IWorkflowDefinitionRepository workflowDefinitionRepository;
@Override @Override
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
public WorkflowDefinitionDTO deployWorkflow(WorkflowDefinitionDTO dto) { public WorkflowDefinitionDTO deployWorkflow(WorkflowDefinitionDTO dto) {
@ -40,9 +62,15 @@ public class WorkflowDefinitionServiceImpl extends BaseServiceImpl<WorkflowDefin
.deploy(); .deploy();
log.info("Successfully deployed workflow: {}", dto.getName()); log.info("Successfully deployed workflow: {}", dto.getName());
// 保存流程定义 // 查找已存在的工作流定义
return super.create(dto); WorkflowDefinition definition = workflowDefinitionRepository.findById(dto.getId())
.orElseThrow(() -> new RuntimeException("Workflow definition not found: " + dto.getId()));
// 转换为DTO
dto.setFlowVersion(definition.getFlowVersion());
return dto;
} catch (Exception e) { } catch (Exception e) {
log.error("Failed to deploy workflow: {}", dto.getName(), e); log.error("Failed to deploy workflow: {}", dto.getName(), e);
throw new RuntimeException("Failed to deploy workflow", e); throw new RuntimeException("Failed to deploy workflow", e);
@ -50,19 +78,27 @@ public class WorkflowDefinitionServiceImpl extends BaseServiceImpl<WorkflowDefin
} }
@Override @Override
@Transactional(rollbackFor = Exception.class) @Transactional
public ProcessInstance startWorkflow(String processKey, String businessKey, Map<String, Object> variables) { public WorkflowInstanceCreateDTO startWorkflow(String processKey, String businessKey, Map<String, Object> variables) {
try { try {
ProcessInstance processInstance = runtimeService.startProcessInstanceByKey( // 1. 创建并异步启动流程实例
processKey, ProcessInstance processInstance = runtimeService.createProcessInstanceBuilder()
businessKey, .processDefinitionKey(processKey)
variables .businessKey(businessKey)
); .variables(variables)
log.info("Started workflow instance: {}", processInstance.getId()); .startAsync(); // 异步启动会自动执行 shell 任务
return processInstance;
// 2. 返回实例信息
WorkflowInstanceCreateDTO dto = new WorkflowInstanceCreateDTO();
dto.setProcessInstanceId(processInstance.getId());
dto.setProcessDefinitionKey(processKey);
dto.setBusinessKey(businessKey);
dto.setStatus("RUNNING"); // 因为实例已经在运行了
return dto;
} catch (Exception e) { } catch (Exception e) {
log.error("Failed to start workflow: {}", processKey, e); log.error("Failed to create workflow: {}", processKey, e);
throw new RuntimeException("Failed to start workflow", e); throw new RuntimeException("Failed to create workflow", e);
} }
} }
@ -87,4 +123,132 @@ public class WorkflowDefinitionServiceImpl extends BaseServiceImpl<WorkflowDefin
throw new RuntimeException("Failed to resume workflow instance", e); throw new RuntimeException("Failed to resume workflow instance", e);
} }
} }
@Override
public WorkflowExecutionDTO getWorkflowExecution(String processInstanceId) {
try {
// 1. 查询流程实例
ProcessInstance processInstance = runtimeService.createProcessInstanceQuery()
.processInstanceId(processInstanceId)
.singleResult();
if (processInstance == null) {
throw new RuntimeException("Process instance not found: " + processInstanceId);
}
// 2. 查询当前活动的节点
List<String> activeActivityIds = runtimeService.getActiveActivityIds(processInstanceId);
// 3. 获取历史活动节点
List<HistoricActivityInstance> historicActivities = historyService.createHistoricActivityInstanceQuery()
.processInstanceId(processInstanceId)
.orderByHistoricActivityInstanceStartTime()
.asc()
.list();
// 4. 获取流程实例的历史信息
HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery()
.processInstanceId(processInstanceId)
.singleResult();
// 5. 获取流程定义模型
String processDefinitionId = processInstance.getProcessDefinitionId();
BpmnModel bpmnModel = repositoryService.getBpmnModel(processDefinitionId);
Process process = bpmnModel.getMainProcess();
Collection<FlowElement> flowElements = process.getFlowElements();
// 6. 创建返回对象
WorkflowExecutionDTO dto = new WorkflowExecutionDTO();
dto.setProcessInstanceId(processInstanceId);
dto.setProcessDefinitionId(processInstance.getProcessDefinitionId());
dto.setProcessDefinitionName(processInstance.getProcessDefinitionName());
dto.setBusinessKey(processInstance.getBusinessKey());
dto.setStatus(processInstance.isSuspended() ? "SUSPENDED" :
(historicProcessInstance != null && historicProcessInstance.getEndTime() != null) ? "COMPLETED" : "RUNNING");
// 7. 设置时间信息
if (historicProcessInstance != null) {
dto.setStartTime(historicProcessInstance.getStartTime());
dto.setEndTime(historicProcessInstance.getEndTime());
dto.setDurationInMillis(historicProcessInstance.getDurationInMillis());
}
// 8. 获取流程变量
Map<String, Object> variables = runtimeService.getVariables(processInstanceId);
dto.setVariables(variables);
// 9. 设置当前活动节点
if (!activeActivityIds.isEmpty()) {
String currentActivityId = activeActivityIds.get(0);
dto.setCurrentActivityId(currentActivityId);
// 从流程定义中找到当前节点的名称和类型
FlowElement currentElement = process.getFlowElement(currentActivityId);
if (currentElement != null) {
dto.setCurrentActivityName(currentElement.getName());
dto.setCurrentActivityType(currentElement.getClass().getSimpleName());
}
}
// 10. 创建历史活动实例的映射用于快速查找
Map<String, HistoricActivityInstance> historicActivityMap = historicActivities.stream()
.collect(Collectors.toMap(
HistoricActivityInstance::getActivityId,
activity -> activity,
(existing, replacement) -> existing // 如果有重复保留第一个
));
// 11. 设置所有节点信息
List<WorkflowExecutionDTO.StageDTO> stages = new ArrayList<>();
for (FlowElement element : flowElements) {
// 只处理任务节点和事件节点
if (element instanceof Task || element instanceof Event) {
WorkflowExecutionDTO.StageDTO stage = new WorkflowExecutionDTO.StageDTO();
stage.setId(element.getId());
stage.setName(element.getName());
stage.setType(element.getClass().getSimpleName());
// 获取历史活动实例如果存在
HistoricActivityInstance historicActivity = historicActivityMap.get(element.getId());
if (historicActivity != null) {
// 节点已经执行过
stage.setStartTime(historicActivity.getStartTime());
stage.setEndTime(historicActivity.getEndTime());
if (historicActivity.getEndTime() != null) {
stage.setStatus("COMPLETED");
} else if (activeActivityIds.contains(element.getId())) {
stage.setStatus("RUNNING");
} else {
stage.setStatus("PENDING");
}
// 如果是Shell任务获取输出
if (element instanceof ServiceTask && "shellTask".equals(((ServiceTask) element).getType())) {
String executionId = historicActivity.getExecutionId();
if (executionId != null) {
stage.setOutput((String) variables.get(executionId + "_shellOutput"));
stage.setError((String) variables.get(executionId + "_shellError"));
Integer exitCode = (Integer) variables.get(executionId + "_exitCode");
stage.setExitCode(exitCode != null ? exitCode : -1);
}
}
} else {
// 节点尚未执行
stage.setStatus("PENDING");
}
stages.add(stage);
}
}
dto.setStages(stages);
return dto;
} catch (Exception e) {
log.error("Failed to get workflow execution: {}", processInstanceId, e);
throw new RuntimeException("Failed to get workflow execution", e);
}
}
} }

View File

@ -2,6 +2,7 @@ package com.qqchen.deploy.backend.workflow.service.impl;
import com.qqchen.deploy.backend.framework.service.impl.BaseServiceImpl; import com.qqchen.deploy.backend.framework.service.impl.BaseServiceImpl;
import com.qqchen.deploy.backend.workflow.dto.WorkflowDesignDTO; import com.qqchen.deploy.backend.workflow.dto.WorkflowDesignDTO;
import com.qqchen.deploy.backend.workflow.dto.WorkflowDefinitionDTO;
import com.qqchen.deploy.backend.workflow.entity.WorkflowDefinition; import com.qqchen.deploy.backend.workflow.entity.WorkflowDefinition;
import com.qqchen.deploy.backend.workflow.query.WorkflowDefinitionQuery; import com.qqchen.deploy.backend.workflow.query.WorkflowDefinitionQuery;
import com.qqchen.deploy.backend.workflow.repository.IWorkflowDefinitionRepository; import com.qqchen.deploy.backend.workflow.repository.IWorkflowDefinitionRepository;
@ -40,8 +41,27 @@ public class WorkflowDesignServiceImpl extends BaseServiceImpl<WorkflowDefinitio
definition.setDescription(dto.getDescription()); definition.setDescription(dto.getDescription());
definition.setGraphJson(dto.getGraphJson()); definition.setGraphJson(dto.getGraphJson());
definition.setBpmnXml(bpmnXml); definition.setBpmnXml(bpmnXml);
definition.setFlowVersion(1); // 设置初始版本为1
// 保存工作流定义 // 保存工作流定义
return super.converter.toDto(workflowDefinitionRepository.save(definition)); definition = workflowDefinitionRepository.save(definition);
// 部署工作流
WorkflowDefinitionDTO deployDto = new WorkflowDefinitionDTO();
deployDto.setId(definition.getId());
deployDto.setName(definition.getName());
deployDto.setKey(definition.getKey());
deployDto.setBpmnXml(definition.getBpmnXml());
deployDto.setDescription(definition.getDescription());
workflowDefinitionService.deployWorkflow(deployDto);
// 手动转换为 WorkflowDesignDTO
WorkflowDesignDTO result = new WorkflowDesignDTO();
result.setId(definition.getId());
result.setName(definition.getName());
result.setKey(definition.getKey());
result.setDescription(definition.getDescription());
result.setGraphJson(definition.getGraphJson());
return result;
} }
} }

View File

@ -25,9 +25,7 @@ public class BpmnConverter {
/** /**
* 将X6 JSON转换为BPMN XML * 将X6 JSON转换为BPMN XML
*/ */
public String convertToBpmnXml(String x6Json, String processId) throws Exception { public String convertToBpmnXml(JsonNode x6Json, String processId) throws Exception {
JsonNode jsonNode = objectMapper.readTree(x6Json);
// 创建BPMN模型 // 创建BPMN模型
BpmnModel bpmnModel = new BpmnModel(); BpmnModel bpmnModel = new BpmnModel();
Process process = new Process(); Process process = new Process();
@ -38,7 +36,7 @@ public class BpmnConverter {
// 解析节点 // 解析节点
Map<String, FlowElement> elementMap = new HashMap<>(); Map<String, FlowElement> elementMap = new HashMap<>();
JsonNode cells = jsonNode.get("cells"); JsonNode cells = x6Json.get("cells");
if (cells != null) { if (cells != null) {
for (JsonNode cell : cells) { for (JsonNode cell : cells) {
String shape = cell.get("shape").asText(); String shape = cell.get("shape").asText();
@ -69,6 +67,29 @@ public class BpmnConverter {
ServiceTask serviceTask = new ServiceTask(); ServiceTask serviceTask = new ServiceTask();
serviceTask.setId(id); serviceTask.setId(id);
serviceTask.setName(label); serviceTask.setName(label);
// 解析服务任务配置
JsonNode serviceTaskConfig = cell.path("data").path("serviceTask");
if (serviceTaskConfig != null) {
String type = serviceTaskConfig.path("type").asText();
String implementation = serviceTaskConfig.path("implementation").asText();
if ("shell".equals(type)) {
// 设置委托表达式指向 ShellTaskDelegate
serviceTask.setImplementationType(ImplementationType.IMPLEMENTATION_TYPE_DELEGATEEXPRESSION);
serviceTask.setImplementation("${shellTaskDelegate}");
JsonNode fields = serviceTaskConfig.path("fields");
if (fields != null) {
fields.fields().forEachRemaining(entry -> {
FieldExtension field = new FieldExtension();
field.setFieldName(entry.getKey());
field.setStringValue(entry.getValue().asText());
serviceTask.getFieldExtensions().add(field);
});
}
}
}
element = serviceTask; element = serviceTask;
break; break;
} }
@ -82,8 +103,8 @@ public class BpmnConverter {
// 解析连线 // 解析连线
for (JsonNode cell : cells) { for (JsonNode cell : cells) {
if (cell.has("source") && cell.has("target")) { if (cell.has("source") && cell.has("target")) {
String sourceId = cell.get("source").get("cell").asText(); String sourceId = cell.get("source").asText();
String targetId = cell.get("target").get("cell").asText(); String targetId = cell.get("target").asText();
String id = cell.get("id").asText(); String id = cell.get("id").asText();
FlowElement sourceElement = elementMap.get(sourceId); FlowElement sourceElement = elementMap.get(sourceId);
@ -104,9 +125,9 @@ public class BpmnConverter {
new BpmnAutoLayout(bpmnModel).execute(); new BpmnAutoLayout(bpmnModel).execute();
// 转换为XML // 转换为XML
org.flowable.bpmn.converter.BpmnXMLConverter converter = org.flowable.bpmn.converter.BpmnXMLConverter converter = new org.flowable.bpmn.converter.BpmnXMLConverter();
new org.flowable.bpmn.converter.BpmnXMLConverter(); byte[] xml = converter.convertToXML(bpmnModel);
byte[] xmlBytes = converter.convertToXML(bpmnModel);
return new String(xmlBytes); return new String(xml);
} }
} }

View File

@ -396,7 +396,7 @@ CREATE TABLE workflow_definition (
`key` VARCHAR(50) NOT NULL COMMENT '流程标识', `key` VARCHAR(50) NOT NULL COMMENT '流程标识',
flow_version INT NOT NULL COMMENT '流程版本', flow_version INT NOT NULL COMMENT '流程版本',
bpmn_xml TEXT NOT NULL COMMENT 'BPMN XML内容', bpmn_xml TEXT NOT NULL COMMENT 'BPMN XML内容',
graph_json TEXT COMMENT 'x6 JSON内容', graph_json JSON COMMENT 'x6 JSON内容',
description VARCHAR(255) NULL COMMENT '流程描述', description VARCHAR(255) NULL COMMENT '流程描述',
CONSTRAINT UK_workflow_definition_key_version UNIQUE (`key`, flow_version) CONSTRAINT UK_workflow_definition_key_version UNIQUE (`key`, flow_version)

View File

@ -0,0 +1,66 @@
{
"cells": [
{
"id": "start",
"shape": "start",
"data": {
"label": "开始"
},
"position": {
"x": 100,
"y": 100
}
},
{
"id": "shell",
"shape": "serviceTask",
"data": {
"label": "Shell脚本",
"serviceTask": {
"type": "shell",
"implementation": "${shellTaskDelegate}",
"fields": {
"script": "for i in {1..20}; do echo \"Step $i: Starting...\"; sleep 1; echo \"Step $i: Running tests...\"; sleep 1; echo \"Step $i: Deploying...\"; sleep 1; echo \"Step $i: Completed\"; echo \"-------------------\"; done",
"workDir": "/tmp",
"env": {
"TEST_VAR": "test_value"
}
}
}
},
"position": {
"x": 300,
"y": 100
}
},
{
"id": "end",
"shape": "end",
"data": {
"label": "结束"
},
"position": {
"x": 500,
"y": 100
}
},
{
"id": "flow1",
"shape": "edge",
"source": "start",
"target": "shell",
"data": {
"label": "流转到Shell"
}
},
{
"id": "flow2",
"shape": "edge",
"source": "shell",
"target": "end",
"data": {
"label": "完成"
}
}
]
}