diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/api/WorkflowNodeInstanceApiController.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/api/WorkflowNodeInstanceApiController.java index ea24e5fc..60f8d9cd 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/api/WorkflowNodeInstanceApiController.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/api/WorkflowNodeInstanceApiController.java @@ -1,25 +1,56 @@ package com.qqchen.deploy.backend.workflow.api; +import com.qqchen.deploy.backend.framework.api.Response; +import com.qqchen.deploy.backend.framework.controller.BaseController; +import com.qqchen.deploy.backend.workflow.dto.WorkflowNodeInstanceDTO; +import com.qqchen.deploy.backend.workflow.entity.WorkflowNodeInstance; import com.qqchen.deploy.backend.workflow.event.ShellLogEvent; +import com.qqchen.deploy.backend.workflow.query.WorkflowNodeInstanceQuery; +import com.qqchen.deploy.backend.workflow.service.IWorkflowNodeInstanceService; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.tags.Tag; +import jakarta.annotation.Resource; +import jakarta.servlet.http.HttpServletResponse; import lombok.extern.slf4j.Slf4j; import org.springframework.context.event.EventListener; import org.springframework.web.bind.annotation.*; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +/** + * 工作流节点实例控制器 + */ @Slf4j @RestController +@RequestMapping("/api/v1/workflow/node") +@Tag(name = "工作流节点实例管理", description = "工作流节点实例管理相关接口") @CrossOrigin(origins = "*", allowedHeaders = "*") -@RequestMapping("/api/v1/workflow/instance") -public class WorkflowNodeInstanceApiController { +public class WorkflowNodeInstanceApiController extends BaseController { + + @Resource + private IWorkflowNodeInstanceService workflowNodeInstanceService; // 存储每个进程的SSE发射器 private static final Map EMITTERS = new ConcurrentHashMap<>(); + @Operation(summary = "获取流程实例的所有节点") + @GetMapping("/list/{processInstanceId}") + public Response> getNodeInstances( + @Parameter(description = "流程实例ID") @PathVariable String processInstanceId + ) { + return Response.success(workflowNodeInstanceService.getNodesByProcessInstanceId(processInstanceId)); + } + + @Operation(summary = "获取节点执行日志") @GetMapping(value = "/log/{processInstanceId}", produces = "text/event-stream") - public SseEmitter streamLogs(@PathVariable String processInstanceId) { + public SseEmitter streamLogs( + @Parameter(description = "流程实例ID") @PathVariable String processInstanceId + ) { SseEmitter emitter = new SseEmitter(Long.MAX_VALUE); // 设置超时回调 @@ -75,4 +106,9 @@ public class WorkflowNodeInstanceApiController { log.warn("No emitter found for process: {}", processInstanceId); } } -} \ No newline at end of file + + @Override + protected void exportData(HttpServletResponse response, List data) { + + } +} \ No newline at end of file diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/converter/WorkflowNodeInstanceConverter.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/converter/WorkflowNodeInstanceConverter.java new file mode 100644 index 00000000..6cd4ac7e --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/converter/WorkflowNodeInstanceConverter.java @@ -0,0 +1,25 @@ +package com.qqchen.deploy.backend.workflow.converter; + +import com.qqchen.deploy.backend.framework.converter.BaseConverter; +import com.qqchen.deploy.backend.workflow.dto.WorkflowNodeInstanceDTO; +import com.qqchen.deploy.backend.workflow.entity.WorkflowNodeInstance; +import com.qqchen.deploy.backend.workflow.event.WorkflowNodeInstanceStatusChangeEvent; +import org.mapstruct.Mapper; +import org.mapstruct.Mapping; +import org.mapstruct.Mappings; + +@Mapper(config = BaseConverter.class) +public interface WorkflowNodeInstanceConverter extends BaseConverter { + + @Mappings({ + @Mapping(target = "processInstanceId", source = "processInstanceId"), + @Mapping(target = "startTime", source = "startTime"), + @Mapping(target = "endTime", source = "endTime"), + @Mapping(target = "status", source = "status"), + @Mapping(target = "executionId", source = "executionId"), + @Mapping(target = "nodeId", source = "nodeId"), + @Mapping(target = "nodeName", source = "nodeName"), + @Mapping(target = "nodeType", source = "nodeType"), + }) + WorkflowNodeInstance eventToEntity(WorkflowNodeInstanceStatusChangeEvent event); +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/WorkflowNodeInstanceDTO.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/WorkflowNodeInstanceDTO.java new file mode 100644 index 00000000..65ab1af4 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/WorkflowNodeInstanceDTO.java @@ -0,0 +1,33 @@ +package com.qqchen.deploy.backend.workflow.dto; + +import com.qqchen.deploy.backend.framework.dto.BaseDTO; +import com.qqchen.deploy.backend.workflow.enums.WorkflowNodeInstanceStatusEnums; +import lombok.Data; + +import java.time.LocalDateTime; + +@Data +public class WorkflowNodeInstanceDTO extends BaseDTO { + + private Long id; + + private String processInstanceId; + + private String executionId; + + private String nodeId; + + private String nodeName; + + private String nodeType; + + private WorkflowNodeInstanceStatusEnums status; + + private LocalDateTime startTime; + + private LocalDateTime endTime; + + private LocalDateTime createTime; + + private LocalDateTime updateTime; +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/entity/WorkflowNodeInstance.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/entity/WorkflowNodeInstance.java index cf10d229..6139f98c 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/entity/WorkflowNodeInstance.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/entity/WorkflowNodeInstance.java @@ -1,7 +1,11 @@ package com.qqchen.deploy.backend.workflow.entity; import com.qqchen.deploy.backend.framework.domain.Entity; +import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatusEnums; +import com.qqchen.deploy.backend.workflow.enums.WorkflowNodeInstanceStatusEnums; import jakarta.persistence.Column; +import jakarta.persistence.EnumType; +import jakarta.persistence.Enumerated; import jakarta.persistence.Table; import lombok.Data; import lombok.EqualsAndHashCode; @@ -16,55 +20,65 @@ import java.time.LocalDateTime; @jakarta.persistence.Entity @EqualsAndHashCode(callSuper = true) public class WorkflowNodeInstance extends Entity { - + + @Column(name = "workflow_definition_id", nullable = false) + private Long workflowDefinitionId; + /** * 工作流实例ID */ @Column(name = "workflow_instance_id", nullable = false) private Long workflowInstanceId; - + + @Column(name = "process_instance_id", nullable = false) + private String processInstanceId; + + @Column(name = "execution_id", nullable = false) + private String executionId; + /** * 节点ID */ @Column(name = "node_id", nullable = false) private String nodeId; - + /** * 节点名称 */ @Column(name = "node_name", nullable = false) private String nodeName; - + /** * 节点类型 */ @Column(name = "node_type", nullable = false) private String nodeType; - + /** * 节点状态 */ @Column(nullable = false) - private String status; - + @Enumerated(EnumType.STRING) + private WorkflowNodeInstanceStatusEnums status; + /** * 开始时间 */ @Column(name = "start_time", nullable = false) private LocalDateTime startTime; - + /** * 结束时间 */ @Column(name = "end_time") private LocalDateTime endTime; - + /** * 节点变量(JSON) */ @Column(columnDefinition = "TEXT") private String variables; - + /** * 错误信息 */ diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/WorkflowNodeInstanceStatusEnums.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/WorkflowNodeInstanceStatusEnums.java index a5f6f517..4075719d 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/WorkflowNodeInstanceStatusEnums.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/WorkflowNodeInstanceStatusEnums.java @@ -9,10 +9,6 @@ public enum WorkflowNodeInstanceStatusEnums { * 未开始:节点尚未开始执行 */ NOT_STARTED("NOT_STARTED", "未开始"), - /** - * 已创建:流程实例已创建但还未开始运行 - */ - CREATED("CREATED", "已创建"), /** * 运行中:流程实例正在执行 diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/event/WorkflowNodeInstanceStatusChangeEvent.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/event/WorkflowNodeInstanceStatusChangeEvent.java new file mode 100644 index 00000000..89d724a1 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/event/WorkflowNodeInstanceStatusChangeEvent.java @@ -0,0 +1,30 @@ +package com.qqchen.deploy.backend.workflow.event; + +import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatusEnums; +import com.qqchen.deploy.backend.workflow.enums.WorkflowNodeInstanceStatusEnums; +import lombok.Builder; +import lombok.Data; + +import java.time.LocalDateTime; + +@Data +@Builder +public class WorkflowNodeInstanceStatusChangeEvent { + + private String processInstanceId; + + private String executionId; + + private String nodeId; + + private String nodeName; + + private String nodeType; + + private WorkflowNodeInstanceStatusEnums status; + + private LocalDateTime startTime; + + private LocalDateTime endTime; + +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/FlowableJobEventListener.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/FlowableJobEventListener.java index 7efa1f58..93fa2615 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/FlowableJobEventListener.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/FlowableJobEventListener.java @@ -2,7 +2,9 @@ package com.qqchen.deploy.backend.workflow.listener; import cn.hutool.core.date.DateUtil; import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatusEnums; +import com.qqchen.deploy.backend.workflow.enums.WorkflowNodeInstanceStatusEnums; import com.qqchen.deploy.backend.workflow.event.WorkflowInstanceStatusChangeEvent; +import com.qqchen.deploy.backend.workflow.event.WorkflowNodeInstanceStatusChangeEvent; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.flowable.common.engine.api.delegate.event.FlowableEngineEntityEvent; @@ -15,11 +17,12 @@ import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; +import java.time.LocalDateTime; + @Component @Slf4j public class FlowableJobEventListener implements FlowableEventListener { - @Resource private ApplicationEventPublisher publisher; @@ -27,17 +30,23 @@ public class FlowableJobEventListener implements FlowableEventListener { @Lazy private HistoryService historyService; - @Override public void onEvent(FlowableEvent event) { FlowableEventType eventType = event.getType(); + log.info("Received Flowable event: {}, event class: {}", eventType, event.getClass().getName()); + // 只处理实体事件 + if (!(event instanceof FlowableEngineEntityEvent entity)) { + return; + } + + String processInstanceId = entity.getProcessInstanceId(); + if (isProcessLevelEvent(eventType.name())) { - FlowableEngineEntityEvent entity = (FlowableEngineEntityEvent) event; - String processInstanceId = entity.getProcessInstanceId(); WorkflowInstanceStatusEnums status = convertProcessLevelEventToStatus(eventType.name()); log.info("Process level event received: {} -> {}, processInstanceId: {}", eventType, status, processInstanceId); - HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery().processInstanceId(processInstanceId).singleResult(); - log.info("historicProcessInstance: {}", historicProcessInstance.getEndTime()); + HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery() + .processInstanceId(processInstanceId) + .singleResult(); publisher.publishEvent(WorkflowInstanceStatusChangeEvent.builder() .processInstanceId(processInstanceId) .status(status) @@ -45,11 +54,38 @@ public class FlowableJobEventListener implements FlowableEventListener { .build() ); } -// -// if (isActivityInstanceEvent(flowableEvent.name())) { -// System.out.println("节点:" + flowableEvent.name()); -// return; -// } + + if (isTaskInstanceEvent(eventType.name())) { + WorkflowNodeInstanceStatusEnums status = convertJobEventToStatus(eventType.name()); + // 获取Job信息 + String executionId = entity.getExecutionId(); + org.flowable.job.api.Job job = (org.flowable.job.service.impl.persistence.entity.JobEntityImpl) entity.getEntity(); + String activityId = job.getElementId(); + String activityName = job.getElementName(); + String activityType = job.getJobHandlerType(); + + LocalDateTime startTime = job.getCreateTime() != null ? DateUtil.toLocalDateTime(job.getCreateTime()) : null; + LocalDateTime endTime = null; + if (status == WorkflowNodeInstanceStatusEnums.COMPLETED || + status == WorkflowNodeInstanceStatusEnums.FAILED) { + endTime = LocalDateTime.now(); // 对于完成或失败状态,使用当前时间作为结束时间 + } + + publisher.publishEvent(WorkflowNodeInstanceStatusChangeEvent.builder() + .processInstanceId(processInstanceId) + .executionId(executionId) + .nodeId(activityId) + .nodeName(activityName) + .nodeType(activityType) + .status(status) + .startTime(startTime) + .endTime(endTime) + .build() + ); + + log.info("Job event received: {} -> {}, processInstanceId: {}, nodeId: {}, nodeName: {}", + eventType, status, processInstanceId, activityId, activityName); + } } @Override @@ -71,6 +107,11 @@ public class FlowableJobEventListener implements FlowableEventListener { return eventType.startsWith("PROCESS_"); } + private boolean isTaskInstanceEvent(String eventType) { + return eventType.startsWith("JOB_EXECUTION_") || + eventType.startsWith("ACTIVITY_"); // 添加对 ACTIVITY_ 事件的监听 + } + /** * 将Flowable流程级别事件类型转换为工作流实例状态 * 只处理流程实例级别的事件,不处理活动节点级别的事件 @@ -113,4 +154,27 @@ public class FlowableJobEventListener implements FlowableEventListener { return null; } } + + /** + * 将Flowable活动节点事件类型转换为工作流节点实例状态 + * + * @param eventType Flowable事件类型 + * @return 工作流节点实例状态枚举,如果不是节点级别事件则返回null + */ + private WorkflowNodeInstanceStatusEnums convertJobEventToStatus(String eventType) { + switch (eventType) { + case "JOB_EXECUTION_SUCCESS": + return WorkflowNodeInstanceStatusEnums.COMPLETED; + case "JOB_EXECUTION_START": + return WorkflowNodeInstanceStatusEnums.RUNNING; + case "JOB_EXECUTION_FAILURE": + return WorkflowNodeInstanceStatusEnums.FAILED; + case "JOB_EXECUTION_REJECTED": // 当作业被拒绝执行时 + return WorkflowNodeInstanceStatusEnums.FAILED; + case "JOB_EXECUTION_NOJOB_FOUND": // 当找不到要执行的作业时 + return WorkflowNodeInstanceStatusEnums.FAILED; + default: + return null; + } + } } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/WorkflowEventListener.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/WorkflowEventListener.java deleted file mode 100644 index 61383817..00000000 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/WorkflowEventListener.java +++ /dev/null @@ -1,2 +0,0 @@ -package com.qqchen.deploy.backend.workflow.listener; - diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/WorkflowNodeInstanceStatusChangeListener.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/WorkflowNodeInstanceStatusChangeListener.java new file mode 100644 index 00000000..28b04ce7 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/WorkflowNodeInstanceStatusChangeListener.java @@ -0,0 +1,25 @@ +package com.qqchen.deploy.backend.workflow.listener; + +import com.qqchen.deploy.backend.workflow.event.WorkflowNodeInstanceStatusChangeEvent; +import com.qqchen.deploy.backend.workflow.service.IWorkflowNodeInstanceService; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; + +@Slf4j +@Component +public class WorkflowNodeInstanceStatusChangeListener { + + @Resource + private IWorkflowNodeInstanceService workflowNodeInstanceService; + + @EventListener + @Transactional(propagation = Propagation.REQUIRES_NEW) + public void handleWorkflowStatusChange(WorkflowNodeInstanceStatusChangeEvent event) { + log.info("Handling workflow node instance status change event: {}", event); + workflowNodeInstanceService.saveWorkflowNodeInstance(event); + } +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/query/WorkflowNodeInstanceQuery.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/query/WorkflowNodeInstanceQuery.java new file mode 100644 index 00000000..090f7766 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/query/WorkflowNodeInstanceQuery.java @@ -0,0 +1,11 @@ +package com.qqchen.deploy.backend.workflow.query; + +import com.qqchen.deploy.backend.framework.query.BaseQuery; +import lombok.Data; +import lombok.EqualsAndHashCode; + +@Data +@EqualsAndHashCode(callSuper = true) +public class WorkflowNodeInstanceQuery extends BaseQuery { + +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/repository/IWorkflowNodeInstanceRepository.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/repository/IWorkflowNodeInstanceRepository.java new file mode 100644 index 00000000..d5dbdd64 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/repository/IWorkflowNodeInstanceRepository.java @@ -0,0 +1,15 @@ +package com.qqchen.deploy.backend.workflow.repository; + +import com.qqchen.deploy.backend.framework.repository.IBaseRepository; +import com.qqchen.deploy.backend.workflow.entity.WorkflowNodeInstance; +import org.springframework.stereotype.Repository; + +import java.util.List; +import java.util.Optional; + +@Repository +public interface IWorkflowNodeInstanceRepository extends IBaseRepository { + Optional findByProcessInstanceIdAndExecutionId(String processInstanceId, String executionId); + + List findByProcessInstanceId(String processInstanceId); +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/IWorkflowNodeInstanceService.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/IWorkflowNodeInstanceService.java new file mode 100644 index 00000000..0654cbf6 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/IWorkflowNodeInstanceService.java @@ -0,0 +1,33 @@ +package com.qqchen.deploy.backend.workflow.service; + +import com.qqchen.deploy.backend.framework.service.IBaseService; +import com.qqchen.deploy.backend.workflow.dto.WorkflowNodeInstanceDTO; +import com.qqchen.deploy.backend.workflow.entity.WorkflowNodeInstance; +import com.qqchen.deploy.backend.workflow.enums.WorkflowNodeInstanceStatusEnums; +import com.qqchen.deploy.backend.workflow.event.WorkflowInstanceStatusChangeEvent; +import com.qqchen.deploy.backend.workflow.event.WorkflowNodeInstanceStatusChangeEvent; +import com.qqchen.deploy.backend.workflow.query.WorkflowNodeInstanceQuery; + +import java.util.List; + +public interface IWorkflowNodeInstanceService extends IBaseService { + /** + * 更新节点实例状态 + * + * @param processInstanceId 流程实例ID + * @param executionId 执行ID + * @param status 状态 + * @return 更新后的节点实例 + */ + WorkflowNodeInstance updateNodeStatus(String processInstanceId, String executionId, WorkflowNodeInstanceStatusEnums status); + + /** + * 获取流程实例的所有节点 + * + * @param processInstanceId 流程实例ID + * @return 节点列表 + */ + List getNodesByProcessInstanceId(String processInstanceId); + + void saveWorkflowNodeInstance(WorkflowNodeInstanceStatusChangeEvent event); +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowNodeInstanceServiceImpl.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowNodeInstanceServiceImpl.java new file mode 100644 index 00000000..ecdc2578 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowNodeInstanceServiceImpl.java @@ -0,0 +1,74 @@ +package com.qqchen.deploy.backend.workflow.service.impl; + +import com.qqchen.deploy.backend.framework.service.impl.BaseServiceImpl; +import com.qqchen.deploy.backend.workflow.converter.WorkflowNodeInstanceConverter; +import com.qqchen.deploy.backend.workflow.dto.WorkflowNodeInstanceDTO; +import com.qqchen.deploy.backend.workflow.entity.WorkflowInstance; +import com.qqchen.deploy.backend.workflow.entity.WorkflowNodeInstance; +import com.qqchen.deploy.backend.workflow.enums.WorkflowNodeInstanceStatusEnums; +import com.qqchen.deploy.backend.workflow.event.WorkflowInstanceStatusChangeEvent; +import com.qqchen.deploy.backend.workflow.event.WorkflowNodeInstanceStatusChangeEvent; +import com.qqchen.deploy.backend.workflow.repository.IWorkflowInstanceRepository; +import com.qqchen.deploy.backend.workflow.repository.IWorkflowNodeInstanceRepository; +import com.qqchen.deploy.backend.workflow.service.IWorkflowInstanceService; +import com.qqchen.deploy.backend.workflow.service.IWorkflowNodeInstanceService; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDateTime; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +@Slf4j +@Service +public class WorkflowNodeInstanceServiceImpl extends BaseServiceImpl implements IWorkflowNodeInstanceService { + + @Resource + private IWorkflowNodeInstanceRepository workflowNodeInstanceRepository; + + @Resource + private WorkflowNodeInstanceConverter workflowNodeInstanceConverter; + + @Resource + private IWorkflowInstanceRepository workflowInstanceRepository; + + @Override + @Transactional(rollbackFor = Exception.class) + public WorkflowNodeInstance updateNodeStatus(String processInstanceId, String executionId, WorkflowNodeInstanceStatusEnums status) { + WorkflowNodeInstance nodeInstance = workflowNodeInstanceRepository.findByProcessInstanceIdAndExecutionId(processInstanceId, executionId) + .orElseThrow(() -> new RuntimeException("Node instance not found for processInstanceId: " + processInstanceId + ", executionId: " + executionId)); + + nodeInstance.setStatus(status); + nodeInstance.setUpdateTime(LocalDateTime.now()); + + if (status == WorkflowNodeInstanceStatusEnums.COMPLETED || + status == WorkflowNodeInstanceStatusEnums.TERMINATED || + status == WorkflowNodeInstanceStatusEnums.FAILED) { + nodeInstance.setEndTime(LocalDateTime.now()); + } + + return workflowNodeInstanceRepository.save(nodeInstance); + } + + @Override + public List getNodesByProcessInstanceId(String processInstanceId) { + List nodes = workflowNodeInstanceRepository.findByProcessInstanceId(processInstanceId); + return nodes.stream() + .map(workflowNodeInstanceConverter::toDto) + .collect(Collectors.toList()); + } + + @Override + @Transactional + public void saveWorkflowNodeInstance(WorkflowNodeInstanceStatusChangeEvent event) { + WorkflowNodeInstance nodeInstance = workflowNodeInstanceConverter.eventToEntity(event); + WorkflowInstance workflowInstance = workflowInstanceRepository.findByProcessInstanceId(nodeInstance.getProcessInstanceId()) + .orElseThrow(() -> new RuntimeException("Node instance not found for processInstanceId: " + nodeInstance.getProcessInstanceId())); + nodeInstance.setWorkflowInstanceId(workflowInstance.getId()); + nodeInstance.setWorkflowDefinitionId(workflowInstance.getWorkflowDefinitionId()); + super.repository.save(nodeInstance); + } +} diff --git a/backend/src/main/resources/db/migration/V1.0.0__init_schema.sql b/backend/src/main/resources/db/migration/V1.0.0__init_schema.sql index 7fa6fa8a..a2d40bf7 100644 --- a/backend/src/main/resources/db/migration/V1.0.0__init_schema.sql +++ b/backend/src/main/resources/db/migration/V1.0.0__init_schema.sql @@ -483,23 +483,26 @@ CREATE TABLE workflow_instance -- 工作流节点实例表 CREATE TABLE workflow_node_instance ( - id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '主键ID', - create_by VARCHAR(255) NULL COMMENT '创建人', - create_time DATETIME(6) NULL COMMENT '创建时间', - deleted BIT NOT NULL DEFAULT 0 COMMENT '是否删除(0:未删除,1:已删除)', - update_by VARCHAR(255) NULL COMMENT '更新人', - update_time DATETIME(6) NULL COMMENT '更新时间', - version INT NOT NULL DEFAULT 0 COMMENT '乐观锁版本号', + id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '主键ID', + create_by VARCHAR(255) NULL COMMENT '创建人', + create_time DATETIME(6) NULL COMMENT '创建时间', + deleted BIT NOT NULL DEFAULT 0 COMMENT '是否删除(0:未删除,1:已删除)', + update_by VARCHAR(255) NULL COMMENT '更新人', + update_time DATETIME(6) NULL COMMENT '更新时间', + version INT NOT NULL DEFAULT 0 COMMENT '乐观锁版本号', - workflow_instance_id BIGINT NOT NULL COMMENT '工作流实例ID', - node_id VARCHAR(64) NOT NULL COMMENT '节点ID', - node_name VARCHAR(100) NOT NULL COMMENT '节点名称', - node_type VARCHAR(32) NOT NULL COMMENT '节点类型', - status VARCHAR(32) NOT NULL COMMENT '节点状态', - start_time DATETIME(6) NOT NULL COMMENT '开始时间', - end_time DATETIME(6) NULL COMMENT '结束时间', - variables TEXT NULL COMMENT '节点变量(JSON)', - error_message TEXT NULL COMMENT '错误信息', + workflow_definition_id BIGINT NOT NULL COMMENT '工作流定义ID', + workflow_instance_id BIGINT NOT NULL COMMENT '工作流实例ID', + process_instance_id VARCHAR(64) NOT NULL COMMENT '流程实例ID', + execution_id VARCHAR(100) NOT NULL COMMENT '执行实例ID', + node_id VARCHAR(64) NOT NULL COMMENT '节点ID', + node_name VARCHAR(100) NOT NULL COMMENT '节点名称', + node_type VARCHAR(32) NOT NULL COMMENT '节点类型', + status VARCHAR(32) NOT NULL COMMENT '节点状态', + start_time DATETIME(6) NOT NULL COMMENT '开始时间', + end_time DATETIME(6) NULL COMMENT '结束时间', + variables TEXT NULL COMMENT '节点变量(JSON)', + error_message TEXT NULL COMMENT '错误信息', CONSTRAINT FK_workflow_node_instance_instance FOREIGN KEY (workflow_instance_id) REFERENCES workflow_instance (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='工作流节点实例表';