diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/api/WorkflowInstanceApiController.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/api/WorkflowInstanceApiController.java index 64719354..f4cb5729 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/api/WorkflowInstanceApiController.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/api/WorkflowInstanceApiController.java @@ -6,6 +6,7 @@ import com.qqchen.deploy.backend.framework.enums.ResponseCode; import com.qqchen.deploy.backend.workflow.dto.WorkflowDefinitionDTO; import com.qqchen.deploy.backend.workflow.dto.WorkflowExecutionDTO; import com.qqchen.deploy.backend.workflow.dto.WorkflowInstanceDTO; +import com.qqchen.deploy.backend.workflow.dto.WorkflowInstanceStartRequest; import com.qqchen.deploy.backend.workflow.entity.WorkflowDefinition; import com.qqchen.deploy.backend.workflow.entity.WorkflowInstance; import com.qqchen.deploy.backend.workflow.query.WorkflowDefinitionQuery; @@ -50,10 +51,9 @@ public class WorkflowInstanceApiController extends BaseController startWorkflow( - @RequestParam String processKey, - @RequestParam String businessKey) { - return Response.success(workflowInstanceService.startWorkflow(processKey, businessKey)); + public Response startWorkflow(@RequestBody WorkflowInstanceStartRequest request) { + workflowInstanceService.startWorkflow(request); + return Response.success(); } @Override diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/config/FlowableConfig.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/config/FlowableConfig.java index ac290730..6503350e 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/config/FlowableConfig.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/config/FlowableConfig.java @@ -1,6 +1,6 @@ package com.qqchen.deploy.backend.workflow.config; -import com.qqchen.deploy.backend.workflow.listener.FlowableJobEventListener; +import com.qqchen.deploy.backend.workflow.listener.FlowableEventDispatcher; import jakarta.annotation.Resource; import org.flowable.spring.SpringProcessEngineConfiguration; import org.flowable.spring.boot.EngineConfigurationConfigurer; @@ -14,9 +14,8 @@ import java.util.Arrays; @Configuration public class FlowableConfig implements EngineConfigurationConfigurer { - @Resource - private FlowableJobEventListener flowableJobEventListener; + private FlowableEventDispatcher flowableEventDispatcher; @Override public void configure(SpringProcessEngineConfiguration engineConfiguration) { @@ -46,6 +45,6 @@ public class FlowableConfig implements EngineConfigurationConfigurer eventHandlers; + + @Override + public void onEvent(FlowableEvent event) { + String eventType = event.getType().name(); + log.info("Dispatching Flowable event: {}, event class: {}", eventType, event.getClass().getName()); + + for (IFlowableEventHandler handler : eventHandlers) { + if (handler.canHandle(eventType)) { + try { + handler.handle(event); + } catch (Exception e) { + log.error("Error handling event {} by handler {}: {}", + eventType, handler.getClass().getSimpleName(), e.getMessage(), e); + } + } + } + } + + @Override + public boolean isFailOnException() { + return false; + } + + @Override + public boolean isFireOnTransactionLifecycleEvent() { + return true; + } + + @Override + public String getOnTransaction() { + return "committed"; + } +} \ No newline at end of file diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/FlowableJobEventListener.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/FlowableJobEventListener.java index 93fa2615..58432aa2 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/FlowableJobEventListener.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/FlowableJobEventListener.java @@ -4,22 +4,20 @@ import cn.hutool.core.date.DateUtil; import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatusEnums; import com.qqchen.deploy.backend.workflow.enums.WorkflowNodeInstanceStatusEnums; import com.qqchen.deploy.backend.workflow.event.WorkflowInstanceStatusChangeEvent; -import com.qqchen.deploy.backend.workflow.event.WorkflowNodeInstanceStatusChangeEvent; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; -import org.flowable.common.engine.api.delegate.event.FlowableEngineEntityEvent; import org.flowable.common.engine.api.delegate.event.FlowableEvent; import org.flowable.common.engine.api.delegate.event.FlowableEventListener; import org.flowable.common.engine.api.delegate.event.FlowableEventType; import org.flowable.engine.HistoryService; +import org.flowable.engine.delegate.event.FlowableProcessEngineEvent; import org.flowable.engine.history.HistoricProcessInstance; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; -import java.time.LocalDateTime; -@Component +//@Component @Slf4j public class FlowableJobEventListener implements FlowableEventListener { @@ -35,20 +33,21 @@ public class FlowableJobEventListener implements FlowableEventListener { FlowableEventType eventType = event.getType(); log.info("Received Flowable event: {}, event class: {}", eventType, event.getClass().getName()); // 只处理实体事件 - if (!(event instanceof FlowableEngineEntityEvent entity)) { - return; - } +// if (!(event instanceof FlowableEngineEntityEvent entity)) { +// return; +// } - String processInstanceId = entity.getProcessInstanceId(); +// String processInstanceId = entity.getProcessInstanceId(); if (isProcessLevelEvent(eventType.name())) { + FlowableProcessEngineEvent flowableEntity = (FlowableProcessEngineEvent) event; WorkflowInstanceStatusEnums status = convertProcessLevelEventToStatus(eventType.name()); - log.info("Process level event received: {} -> {}, processInstanceId: {}", eventType, status, processInstanceId); + log.info("Process level event received: {} -> {}, processInstanceId: {}", eventType, status, null); HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery() - .processInstanceId(processInstanceId) + .processInstanceId(flowableEntity.getProcessInstanceId()) .singleResult(); publisher.publishEvent(WorkflowInstanceStatusChangeEvent.builder() - .processInstanceId(processInstanceId) + .processInstanceId(flowableEntity.getProcessInstanceId()) .status(status) .endTime(DateUtil.toLocalDateTime(historicProcessInstance.getEndTime())) .build() @@ -56,35 +55,35 @@ public class FlowableJobEventListener implements FlowableEventListener { } if (isTaskInstanceEvent(eventType.name())) { - WorkflowNodeInstanceStatusEnums status = convertJobEventToStatus(eventType.name()); - // 获取Job信息 - String executionId = entity.getExecutionId(); - org.flowable.job.api.Job job = (org.flowable.job.service.impl.persistence.entity.JobEntityImpl) entity.getEntity(); - String activityId = job.getElementId(); - String activityName = job.getElementName(); - String activityType = job.getJobHandlerType(); - - LocalDateTime startTime = job.getCreateTime() != null ? DateUtil.toLocalDateTime(job.getCreateTime()) : null; - LocalDateTime endTime = null; - if (status == WorkflowNodeInstanceStatusEnums.COMPLETED || - status == WorkflowNodeInstanceStatusEnums.FAILED) { - endTime = LocalDateTime.now(); // 对于完成或失败状态,使用当前时间作为结束时间 - } - - publisher.publishEvent(WorkflowNodeInstanceStatusChangeEvent.builder() - .processInstanceId(processInstanceId) - .executionId(executionId) - .nodeId(activityId) - .nodeName(activityName) - .nodeType(activityType) - .status(status) - .startTime(startTime) - .endTime(endTime) - .build() - ); - - log.info("Job event received: {} -> {}, processInstanceId: {}, nodeId: {}, nodeName: {}", - eventType, status, processInstanceId, activityId, activityName); +// WorkflowNodeInstanceStatusEnums status = convertJobEventToStatus(eventType.name()); +// // 获取Job信息 +// String executionId = entity.getExecutionId(); +// org.flowable.job.api.Job job = (org.flowable.job.service.impl.persistence.entity.JobEntityImpl) entity.getEntity(); +// String activityId = job.getElementId(); +// String activityName = job.getElementName(); +// String activityType = job.getJobHandlerType(); +// +// LocalDateTime startTime = job.getCreateTime() != null ? DateUtil.toLocalDateTime(job.getCreateTime()) : null; +// LocalDateTime endTime = null; +// if (status == WorkflowNodeInstanceStatusEnums.COMPLETED || +// status == WorkflowNodeInstanceStatusEnums.FAILED) { +// endTime = LocalDateTime.now(); // 对于完成或失败状态,使用当前时间作为结束时间 +// } +// +// publisher.publishEvent(WorkflowNodeInstanceStatusChangeEvent.builder() +// .processInstanceId(processInstanceId) +// .executionId(executionId) +// .nodeId(activityId) +// .nodeName(activityName) +// .nodeType(activityType) +// .status(status) +// .startTime(startTime) +// .endTime(endTime) +// .build() +// ); +// +// log.info("Job event received: {} -> {}, processInstanceId: {}, nodeId: {}, nodeName: {}", +// eventType, status, processInstanceId, activityId, activityName); } } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/handler/ActivityEventHandler.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/handler/ActivityEventHandler.java new file mode 100644 index 00000000..4cce1ffc --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/handler/ActivityEventHandler.java @@ -0,0 +1,61 @@ +package com.qqchen.deploy.backend.workflow.listener.handler; + +import com.qqchen.deploy.backend.workflow.enums.WorkflowNodeInstanceStatusEnums; +import com.qqchen.deploy.backend.workflow.event.WorkflowNodeInstanceStatusChangeEvent; +import lombok.extern.slf4j.Slf4j; +import org.flowable.common.engine.api.delegate.event.FlowableEvent; +import org.flowable.engine.delegate.event.FlowableActivityEvent; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.stereotype.Component; + +import jakarta.annotation.Resource; +import java.time.LocalDateTime; + +@Slf4j +@Component +public class ActivityEventHandler implements IFlowableEventHandler { + + @Resource + private ApplicationEventPublisher publisher; + + @Override + public boolean canHandle(String eventType) { + return eventType.startsWith("ACTIVITY_"); + } + + @Override + public void handle(FlowableEvent event) { + String eventType = event.getType().name(); + log.info("Processing activity event: {}", eventType); + + if (!(event instanceof FlowableActivityEvent activityEvent)) { + return; + } + + WorkflowNodeInstanceStatusEnums status = convertToNodeStatus(eventType); + + if (status != null) { + publisher.publishEvent(WorkflowNodeInstanceStatusChangeEvent.builder() + .processInstanceId(activityEvent.getProcessInstanceId()) + .executionId(activityEvent.getExecutionId()) + .nodeId(activityEvent.getActivityId()) + .nodeName(activityEvent.getActivityName()) + .nodeType(activityEvent.getActivityType()) + .status(status) + .startTime(status == WorkflowNodeInstanceStatusEnums.RUNNING ? LocalDateTime.now() : null) + .endTime(status.isFinalState() ? LocalDateTime.now() : null) + .build() + ); + } + } + + private WorkflowNodeInstanceStatusEnums convertToNodeStatus(String eventType) { + return switch (eventType) { + case "ACTIVITY_STARTED" -> WorkflowNodeInstanceStatusEnums.RUNNING; + case "ACTIVITY_COMPLETED" -> WorkflowNodeInstanceStatusEnums.COMPLETED; + case "ACTIVITY_CANCELLED" -> WorkflowNodeInstanceStatusEnums.TERMINATED; + case "ACTIVITY_ERROR_RECEIVED" -> WorkflowNodeInstanceStatusEnums.FAILED; + default -> null; + }; + } +} \ No newline at end of file diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/handler/IFlowableEventHandler.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/handler/IFlowableEventHandler.java new file mode 100644 index 00000000..0a400bc0 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/handler/IFlowableEventHandler.java @@ -0,0 +1,24 @@ +package com.qqchen.deploy.backend.workflow.listener.handler; + +import org.flowable.common.engine.api.delegate.event.FlowableEvent; + +/** + * Flowable事件处理器接口 + */ +public interface IFlowableEventHandler { + + /** + * 判断是否可以处理该事件 + * + * @param eventType 事件类型名称 + * @return 是否可以处理 + */ + boolean canHandle(String eventType); + + /** + * 处理事件 + * + * @param event Flowable事件 + */ + void handle(FlowableEvent event); +} \ No newline at end of file diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/handler/JobEventHandler.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/handler/JobEventHandler.java new file mode 100644 index 00000000..088f3672 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/handler/JobEventHandler.java @@ -0,0 +1,65 @@ +package com.qqchen.deploy.backend.workflow.listener.handler; + +import cn.hutool.core.date.DateUtil; +import com.qqchen.deploy.backend.workflow.enums.WorkflowNodeInstanceStatusEnums; +import com.qqchen.deploy.backend.workflow.event.WorkflowNodeInstanceStatusChangeEvent; +import lombok.extern.slf4j.Slf4j; +import org.flowable.common.engine.api.delegate.event.FlowableEngineEntityEvent; +import org.flowable.common.engine.api.delegate.event.FlowableEvent; +import org.flowable.job.service.impl.persistence.entity.JobEntityImpl; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.stereotype.Component; + +import jakarta.annotation.Resource; +import java.time.LocalDateTime; + +@Slf4j +@Component +public class JobEventHandler implements IFlowableEventHandler { + + @Resource + private ApplicationEventPublisher publisher; + + @Override + public boolean canHandle(String eventType) { + return eventType.startsWith("JOB_EXECUTION_"); + } + + @Override + public void handle(FlowableEvent event) { + String eventType = event.getType().name(); + log.info("Processing job event: {}", eventType); + + if (!(event instanceof FlowableEngineEntityEvent entityEvent)) { + return; + } + + JobEntityImpl job = (JobEntityImpl) entityEvent.getEntity(); + WorkflowNodeInstanceStatusEnums status = convertToNodeStatus(eventType); + + if (status != null) { + publisher.publishEvent(WorkflowNodeInstanceStatusChangeEvent.builder() + .processInstanceId(job.getProcessInstanceId()) + .executionId(job.getExecutionId()) + .nodeId(job.getElementId()) + .nodeName(job.getElementName()) + .nodeType(job.getJobHandlerType()) + .status(status) + .startTime(job.getCreateTime() != null ? DateUtil.toLocalDateTime(job.getCreateTime()) : null) + .endTime(status.isFinalState() ? LocalDateTime.now() : null) + .build() + ); + } + } + + private WorkflowNodeInstanceStatusEnums convertToNodeStatus(String eventType) { + return switch (eventType) { + case "JOB_EXECUTION_SUCCESS" -> WorkflowNodeInstanceStatusEnums.COMPLETED; + case "JOB_EXECUTION_START" -> WorkflowNodeInstanceStatusEnums.RUNNING; + case "JOB_EXECUTION_FAILURE" -> WorkflowNodeInstanceStatusEnums.FAILED; + case "JOB_EXECUTION_REJECTED" -> WorkflowNodeInstanceStatusEnums.FAILED; + case "JOB_EXECUTION_NOJOB_FOUND" -> WorkflowNodeInstanceStatusEnums.FAILED; + default -> null; + }; + } +} \ No newline at end of file diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/handler/ProcessEventHandler.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/handler/ProcessEventHandler.java new file mode 100644 index 00000000..a23ff7ab --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/handler/ProcessEventHandler.java @@ -0,0 +1,69 @@ +package com.qqchen.deploy.backend.workflow.listener.handler; + +import cn.hutool.core.date.DateUtil; +import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatusEnums; +import com.qqchen.deploy.backend.workflow.event.WorkflowInstanceStatusChangeEvent; +import lombok.extern.slf4j.Slf4j; +import org.flowable.common.engine.api.delegate.event.FlowableEvent; +import org.flowable.engine.ProcessEngine; +import org.flowable.engine.delegate.event.FlowableProcessEngineEvent; +import org.flowable.engine.history.HistoricProcessInstance; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Component; + +import jakarta.annotation.Resource; + +@Slf4j +@Component +public class ProcessEventHandler implements IFlowableEventHandler { + + @Resource + private ApplicationEventPublisher publisher; + + @Resource + @Lazy + private ProcessEngine processEngine; + + @Override + public boolean canHandle(String eventType) { + return eventType.startsWith("PROCESS_"); + } + + @Override + public void handle(FlowableEvent event) { + String eventType = event.getType().name(); + log.info("Processing event: {}", eventType); + + FlowableProcessEngineEvent processEvent = (FlowableProcessEngineEvent) event; + WorkflowInstanceStatusEnums status = convertToWorkflowStatus(eventType); + + if (status != null) { + HistoricProcessInstance historicProcessInstance = processEngine.getHistoryService() + .createHistoricProcessInstanceQuery() + .processInstanceId(processEvent.getProcessInstanceId()) + .singleResult(); + + publisher.publishEvent(WorkflowInstanceStatusChangeEvent.builder() + .processInstanceId(processEvent.getProcessInstanceId()) + .status(status) + .endTime(historicProcessInstance != null && historicProcessInstance.getEndTime() != null ? + DateUtil.toLocalDateTime(historicProcessInstance.getEndTime()) : null) + .build() + ); + } + } + + private WorkflowInstanceStatusEnums convertToWorkflowStatus(String eventType) { + return switch (eventType) { + case "PROCESS_CREATED" -> WorkflowInstanceStatusEnums.CREATED; + case "PROCESS_STARTED" -> WorkflowInstanceStatusEnums.RUNNING; + case "PROCESS_COMPLETED" -> WorkflowInstanceStatusEnums.COMPLETED; + case "PROCESS_COMPLETED_WITH_ERROR_END_EVENT" -> WorkflowInstanceStatusEnums.FAILED; + case "PROCESS_CANCELLED" -> WorkflowInstanceStatusEnums.TERMINATED; + case "PROCESS_SUSPENDED" -> WorkflowInstanceStatusEnums.SUSPENDED; + case "PROCESS_RESUMED" -> WorkflowInstanceStatusEnums.RUNNING; + default -> null; + }; + } +} \ No newline at end of file diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/IWorkflowInstanceService.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/IWorkflowInstanceService.java index 67f20fd9..999e7d66 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/IWorkflowInstanceService.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/IWorkflowInstanceService.java @@ -3,6 +3,7 @@ package com.qqchen.deploy.backend.workflow.service; import com.qqchen.deploy.backend.framework.service.IBaseService; import com.qqchen.deploy.backend.workflow.dto.WorkflowDefinitionDTO; import com.qqchen.deploy.backend.workflow.dto.WorkflowInstanceDTO; +import com.qqchen.deploy.backend.workflow.dto.WorkflowInstanceStartRequest; import com.qqchen.deploy.backend.workflow.entity.WorkflowDefinition; import com.qqchen.deploy.backend.workflow.entity.WorkflowInstance; import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatusEnums; @@ -56,5 +57,5 @@ public interface IWorkflowInstanceService extends IBaseService variables); - WorkflowInstanceDTO startWorkflow(String processKey, String businessKey); + WorkflowInstanceDTO startWorkflow(WorkflowInstanceStartRequest request); } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowInstanceServiceImpl.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowInstanceServiceImpl.java index 8fea1b65..82c710fa 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowInstanceServiceImpl.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowInstanceServiceImpl.java @@ -6,6 +6,7 @@ import com.qqchen.deploy.backend.framework.service.impl.BaseServiceImpl; import com.qqchen.deploy.backend.workflow.converter.WorkflowInstanceConverter; import com.qqchen.deploy.backend.workflow.dto.WorkflowDefinitionDTO; import com.qqchen.deploy.backend.workflow.dto.WorkflowInstanceDTO; +import com.qqchen.deploy.backend.workflow.dto.WorkflowInstanceStartRequest; import com.qqchen.deploy.backend.workflow.entity.WorkflowDefinition; import com.qqchen.deploy.backend.workflow.entity.WorkflowInstance; import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatusEnums; @@ -31,7 +32,7 @@ import java.util.stream.Collectors; @Slf4j @Service -public class WorkflowInstanceServiceImpl extends BaseServiceImpl implements IWorkflowInstanceService { +public class WorkflowInstanceServiceImpl extends BaseServiceImpl implements IWorkflowInstanceService { @Resource private IWorkflowDefinitionRepository workflowDefinitionRepository; @@ -148,16 +149,17 @@ public class WorkflowInstanceServiceImpl extends BaseServiceImpl new RuntimeException("Workflow definition process key not found: " + processKey)); + WorkflowDefinition workflowDefinition = workflowDefinitionRepository.findByKey(request.getProcessKey()) + .orElseThrow(() -> new RuntimeException("Workflow definition process key not found: " + request.getProcessKey())); ProcessInstance processInstance = runtimeService.createProcessInstanceBuilder() - .processDefinitionKey(processKey) - .businessKey(businessKey) + .processDefinitionKey(request.getProcessKey()) + .businessKey(request.getBusinessKey()) .startAsync(); // 异步启动,会自动执行 shell 任务 - return createWorkflowInstance(workflowDefinition.getId(), businessKey, processInstance); + return createWorkflowInstance(workflowDefinition.getId(), request.getBusinessKey(), processInstance); } catch (Exception e) { - log.error("Failed to create workflow: {}", processKey, e); + log.error("Failed to create workflow: {}", request.getProcessKey(), e); throw new RuntimeException("Failed to create workflow", e); } }