大声道撒旦

This commit is contained in:
dengqichen 2024-12-25 17:09:57 +08:00
parent 4b2e75733d
commit daf582544f
8 changed files with 206 additions and 75 deletions

View File

@ -2,11 +2,13 @@ package com.qqchen.deploy.backend.workflow.config;
import com.qqchen.deploy.backend.workflow.listener.event.FlowableEventDispatcher; import com.qqchen.deploy.backend.workflow.listener.event.FlowableEventDispatcher;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import org.flowable.common.engine.api.delegate.event.FlowableEngineEventType;
import org.flowable.spring.SpringProcessEngineConfiguration; import org.flowable.spring.SpringProcessEngineConfiguration;
import org.flowable.spring.boot.EngineConfigurationConfigurer; import org.flowable.spring.boot.EngineConfigurationConfigurer;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import java.util.Collections; import java.util.Collections;
import java.util.Map;
/** /**
* Flowable配置类 * Flowable配置类
@ -46,5 +48,13 @@ public class FlowableConfig implements EngineConfigurationConfigurer<SpringProce
// 添加事件监听器 // 添加事件监听器
engineConfiguration.setEventListeners(Collections.singletonList(flowableEventDispatcher)); engineConfiguration.setEventListeners(Collections.singletonList(flowableEventDispatcher));
engineConfiguration.setTypedEventListeners(
Map.of(
FlowableEngineEventType.PROCESS_CANCELLED.name(), Collections.singletonList(flowableEventDispatcher),
FlowableEngineEventType.PROCESS_COMPLETED.name(), Collections.singletonList(flowableEventDispatcher),
FlowableEngineEventType.PROCESS_COMPLETED_WITH_ERROR_END_EVENT.name(), Collections.singletonList(flowableEventDispatcher),
FlowableEngineEventType.PROCESS_COMPLETED_WITH_TERMINATE_END_EVENT.name(), Collections.singletonList(flowableEventDispatcher)
)
);
} }
} }

View File

@ -0,0 +1,15 @@
package com.qqchen.deploy.backend.workflow.event;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class TerminationProcessInstanceListenerEvent {
private String jobId;
private String processInstanceId;
private String reason;
}

View File

@ -0,0 +1,58 @@
package com.qqchen.deploy.backend.workflow.listener;
import com.qqchen.deploy.backend.workflow.entity.WorkflowNodeInstance;
import com.qqchen.deploy.backend.workflow.enums.WorkflowNodeInstanceStatusEnums;
import com.qqchen.deploy.backend.workflow.event.TerminationProcessInstanceListenerEvent;
import com.qqchen.deploy.backend.workflow.repository.IWorkflowNodeInstanceRepository;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.flowable.bpmn.model.BpmnModel;
import org.flowable.bpmn.model.FlowElement;
import org.flowable.bpmn.model.Gateway;
import org.flowable.bpmn.model.ServiceTask;
import org.flowable.engine.ManagementService;
import org.flowable.engine.RepositoryService;
import org.flowable.engine.RuntimeService;
import org.flowable.engine.TaskService;
import org.flowable.job.api.Job;
import org.flowable.task.api.Task;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List;
@Slf4j
@Component
public class TerminationProcessInstanceListener {
@Resource
private RuntimeService runtimeService;
@Resource
private IWorkflowNodeInstanceRepository workflowNodeInstanceRepository;
@Async
@EventListener
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void handler(TerminationProcessInstanceListenerEvent event) {
try {
// 先删除死信任务
// managementService.deleteDeadLetterJob(event.getJobId());
WorkflowNodeInstance workflowNodeInstance = workflowNodeInstanceRepository.findTop1ByProcessInstanceIdOrderByStartTimeDesc(event.getProcessInstanceId());
if (workflowNodeInstance != null) {
workflowNodeInstance.setStatus(WorkflowNodeInstanceStatusEnums.FAILED);
workflowNodeInstance.setEndTime(LocalDateTime.now());
workflowNodeInstanceRepository.save(workflowNodeInstance);
}
runtimeService.deleteProcessInstance(event.getProcessInstanceId(), event.getReason());
log.info("Process instance {} deleted due to: {}", event.getProcessInstanceId(), event.getReason());
} catch (Exception e) {
log.error("Failed to delete process instance: " + event.getProcessInstanceId(), e);
}
}
}

View File

@ -1,52 +1,74 @@
package com.qqchen.deploy.backend.workflow.listener.event.handler; //package com.qqchen.deploy.backend.workflow.listener.event.handler;
//
import com.qqchen.deploy.backend.workflow.enums.WorkflowNodeInstanceStatusEnums; //import com.qqchen.deploy.backend.workflow.event.ProcessInstanceDeleteEvent;
import jakarta.annotation.Resource; //import jakarta.annotation.Resource;
import jakarta.transaction.Transactional; //import lombok.extern.slf4j.Slf4j;
import lombok.extern.slf4j.Slf4j; //import org.flowable.common.engine.api.delegate.event.FlowableEngineEventType;
import org.flowable.common.engine.api.delegate.event.FlowableEngineEntityEvent; //import org.flowable.common.engine.api.delegate.event.FlowableEntityEvent;
import org.flowable.common.engine.api.delegate.event.FlowableEngineEventType; //import org.flowable.common.engine.api.delegate.event.FlowableEvent;
import org.flowable.common.engine.api.delegate.event.FlowableEntityEvent; //import org.flowable.engine.ManagementService;
import org.flowable.common.engine.api.delegate.event.FlowableEvent; //import org.flowable.engine.RuntimeService;
import org.flowable.engine.RuntimeService; //import org.flowable.engine.runtime.Execution;
import org.flowable.job.api.Job; //import org.flowable.job.api.Job;
import org.flowable.job.service.impl.persistence.entity.JobEntityImpl; //import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisher; //import org.springframework.stereotype.Component;
import org.springframework.stereotype.Component; //import org.flowable.engine.HistoryService;
import org.springframework.transaction.support.TransactionTemplate; //
//@Slf4j
@Slf4j //@Component
@Component //public class DeadLetterJobEventHandler implements IFlowableEventHandler {
public class DeadLetterJobEventHandler implements IFlowableEventHandler { //
// @Resource
@Resource // private RuntimeService runtimeService;
private RuntimeService runtimeService; //
// @Resource
// private ManagementService managementService;
@Override //
public boolean canHandle(String eventType) { // @Resource
return eventType.startsWith("JOB_MOVED_TO_DEADLETTER"); // private ApplicationEventPublisher eventPublisher;
} //
// @Resource
@Override // private HistoryService historyService;
public void handle(FlowableEvent event) { //
if (event instanceof FlowableEntityEvent && event.getType() == FlowableEngineEventType.JOB_MOVED_TO_DEADLETTER) { // @Override
FlowableEntityEvent entityEvent = (FlowableEntityEvent) event; // public boolean canHandle(String eventType) {
Job job = (Job) entityEvent.getEntity(); // return eventType.startsWith("JOB_EXECUTION_FAILURE");
String processInstanceId = job.getProcessInstanceId(); // }
//
try { // @Override
// 直接在 Flowable 的事务中执行 // public void handle(FlowableEvent event) {
runtimeService.deleteProcessInstance( // if (event instanceof FlowableEntityEvent && event.getType() == FlowableEngineEventType.JOB_EXECUTION_FAILURE) {
processInstanceId, // FlowableEntityEvent entityEvent = (FlowableEntityEvent) event;
"Gateway condition evaluation failed: " + job.getExceptionMessage() // Job job = (Job) entityEvent.getEntity();
); //
log.info("Process instance {} terminated due to dead letter job", processInstanceId); // // 获取执行ID和当前活动ID
} catch (Exception e) { // String executionId = job.getExecutionId();
log.error("Failed to terminate process instance: " + processInstanceId, e); // Execution execution = runtimeService.createExecutionQuery()
// 这里我们不抛出异常因为我们希望流程能够正常结束 // .executionId(executionId)
} // .singleResult();
} //
} // // 从历史服务中获取最后一个活动节点
// String lastActivityId = historyService.createHistoricActivityInstanceQuery()
} // .executionId(executionId)
// .orderByHistoricActivityInstanceEndTime()
// .desc()
// .list()
// .stream()
// .findFirst()
// .map(activity -> activity.getActivityId())
// .orElse(null);
//
// log.info("Job full details - executionId: {}, elementId: {}, jobHandlerType: {}, jobHandlerConfiguration: {}, currentActivityId: {}, lastActivityId: {}",
// executionId,
// job.getElementId(),
// job.getJobHandlerType(),
// job.getJobHandlerConfiguration(),
// execution != null ? execution.getActivityId() : "null",
// lastActivityId);
//
// String processInstanceId = job.getProcessInstanceId();
// String errorMessage = job.getExceptionMessage();
// eventPublisher.publishEvent(new ProcessInstanceDeleteEvent(job.getId(), processInstanceId, errorMessage));
// }
// }
//}

View File

@ -1,16 +1,24 @@
package com.qqchen.deploy.backend.workflow.listener.event.handler; package com.qqchen.deploy.backend.workflow.listener.event.handler;
import cn.hutool.json.JSONUtil;
import com.qqchen.deploy.backend.workflow.enums.WorkflowNodeInstanceStatusEnums; import com.qqchen.deploy.backend.workflow.enums.WorkflowNodeInstanceStatusEnums;
import com.qqchen.deploy.backend.workflow.event.TerminationProcessInstanceListenerEvent;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.event.FlowableEngineEntityEvent; import org.flowable.common.engine.api.delegate.event.FlowableEngineEntityEvent;
import org.flowable.common.engine.api.delegate.event.FlowableEvent; import org.flowable.common.engine.api.delegate.event.FlowableEvent;
import org.flowable.common.engine.impl.event.FlowableEntityExceptionEventImpl;
import org.flowable.engine.ManagementService;
import org.flowable.engine.RuntimeService;
import org.flowable.engine.runtime.Execution;
import org.flowable.job.api.Job;
import org.flowable.job.service.impl.persistence.entity.JobEntityImpl; import org.flowable.job.service.impl.persistence.entity.JobEntityImpl;
import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import java.util.List;
import java.util.Map;
@Slf4j @Slf4j
@Component @Component
public class JobEventHandler implements IFlowableEventHandler { public class JobEventHandler implements IFlowableEventHandler {
@ -18,6 +26,12 @@ public class JobEventHandler implements IFlowableEventHandler {
@Resource @Resource
private ApplicationEventPublisher publisher; private ApplicationEventPublisher publisher;
@Resource
private RuntimeService runtimeService;
@Resource
private ManagementService managementService;
@Override @Override
public boolean canHandle(String eventType) { public boolean canHandle(String eventType) {
return eventType.startsWith("JOB_EXECUTION_"); return eventType.startsWith("JOB_EXECUTION_");
@ -29,11 +43,14 @@ public class JobEventHandler implements IFlowableEventHandler {
if (!(event instanceof FlowableEngineEntityEvent entityEvent)) { if (!(event instanceof FlowableEngineEntityEvent entityEvent)) {
return; return;
} }
//log.info("Processing job event: {}, jobType: {}", eventType, entityEvent.getType());
JobEntityImpl job = (JobEntityImpl) entityEvent.getEntity(); JobEntityImpl job = (JobEntityImpl) entityEvent.getEntity();
//log.info("Processing job event: {}", JSONUtil.toJsonStr(job));
WorkflowNodeInstanceStatusEnums status = convertToNodeStatus(eventType); WorkflowNodeInstanceStatusEnums status = convertToNodeStatus(eventType);
if (status != null) { switch (status) {
case FAILED -> {
publisher.publishEvent(new TerminationProcessInstanceListenerEvent(job.getId(), job.getProcessInstanceId(), ((FlowableEntityExceptionEventImpl) event).getCause().getMessage()));
}
}
// if (status != null) {
// publisher.publishEvent(WorkflowNodeInstanceStatusChangeEvent.builder() // publisher.publishEvent(WorkflowNodeInstanceStatusChangeEvent.builder()
// .processInstanceId(job.getProcessInstanceId()) // .processInstanceId(job.getProcessInstanceId())
// .executionId(job.getExecutionId()) // .executionId(job.getExecutionId())
@ -45,17 +62,17 @@ public class JobEventHandler implements IFlowableEventHandler {
// .endTime(status.isFinalState() ? LocalDateTime.now() : null) // .endTime(status.isFinalState() ? LocalDateTime.now() : null)
// .build() // .build()
// ); // );
} // }
} }
private WorkflowNodeInstanceStatusEnums convertToNodeStatus(String eventType) { private WorkflowNodeInstanceStatusEnums convertToNodeStatus(String eventType) {
return switch (eventType) { return switch (eventType) {
case "JOB_EXECUTION_SUCCESS" -> WorkflowNodeInstanceStatusEnums.RUNNING; // case "JOB_EXECUTION_SUCCESS" -> WorkflowNodeInstanceStatusEnums.RUNNING;
// case "JOB_EXECUTION_START" -> WorkflowNodeInstanceStatusEnums.RUNNING; // case "JOB_EXECUTION_START" -> WorkflowNodeInstanceStatusEnums.RUNNING;
// case "JOB_EXECUTION_FAILURE" -> WorkflowNodeInstanceStatusEnums.FAILED; case "JOB_EXECUTION_FAILURE" -> WorkflowNodeInstanceStatusEnums.FAILED;
// case "JOB_EXECUTION_REJECTED" -> WorkflowNodeInstanceStatusEnums.FAILED; // case "JOB_EXECUTION_REJECTED" -> WorkflowNodeInstanceStatusEnums.FAILED;
// case "JOB_EXECUTION_NOJOB_FOUND" -> WorkflowNodeInstanceStatusEnums.FAILED; // case "JOB_EXECUTION_NOJOB_FOUND" -> WorkflowNodeInstanceStatusEnums.FAILED;
default -> null; default -> null;
}; };
} }
} }

View File

@ -5,6 +5,7 @@ import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatusEnums;
import com.qqchen.deploy.backend.workflow.event.WorkflowInstanceStatusChangeEvent; import com.qqchen.deploy.backend.workflow.event.WorkflowInstanceStatusChangeEvent;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.event.FlowableEvent; import org.flowable.common.engine.api.delegate.event.FlowableEvent;
import org.flowable.engine.HistoryService;
import org.flowable.engine.ProcessEngine; import org.flowable.engine.ProcessEngine;
import org.flowable.engine.delegate.event.FlowableProcessEngineEvent; import org.flowable.engine.delegate.event.FlowableProcessEngineEvent;
import org.flowable.engine.history.HistoricProcessInstance; import org.flowable.engine.history.HistoricProcessInstance;
@ -25,6 +26,9 @@ public class ProcessEventHandler implements IFlowableEventHandler {
@Lazy @Lazy
private ProcessEngine processEngine; private ProcessEngine processEngine;
@Resource
private HistoryService historyService;
@Override @Override
public boolean canHandle(String eventType) { public boolean canHandle(String eventType) {
return eventType.startsWith("PROCESS_"); return eventType.startsWith("PROCESS_");
@ -37,6 +41,7 @@ public class ProcessEventHandler implements IFlowableEventHandler {
WorkflowInstanceStatusEnums status = convertToWorkflowStatus(eventType); WorkflowInstanceStatusEnums status = convertToWorkflowStatus(eventType);
if (status != null) { if (status != null) {
HistoricProcessInstance historicProcessInstance = processEngine.getHistoryService() HistoricProcessInstance historicProcessInstance = processEngine.getHistoryService()
.createHistoricProcessInstanceQuery() .createHistoricProcessInstanceQuery()
.processInstanceId(processEvent.getProcessInstanceId()) .processInstanceId(processEvent.getProcessInstanceId())

View File

@ -18,4 +18,6 @@ public interface IWorkflowNodeInstanceRepository extends IBaseRepository<Workflo
WorkflowNodeInstance findByProcessInstanceIdAndNodeId(String processInstanceId, String nodeId); WorkflowNodeInstance findByProcessInstanceIdAndNodeId(String processInstanceId, String nodeId);
List<WorkflowNodeInstance> findByWorkflowInstanceId(Long workflowInstanceId); List<WorkflowNodeInstance> findByWorkflowInstanceId(Long workflowInstanceId);
WorkflowNodeInstance findTop1ByProcessInstanceIdOrderByStartTimeDesc(String processInstanceId);
} }

View File

@ -173,6 +173,9 @@ public class BpmnConverter {
// 步骤2根据节点类型进行特定配置 // 步骤2根据节点类型进行特定配置
if (element instanceof ServiceTask) { if (element instanceof ServiceTask) {
configureServiceTask((ServiceTask) element, node, process, extensionElements); configureServiceTask((ServiceTask) element, node, process, extensionElements);
} else if (element instanceof Gateway) {
// 为网关节点只添加监听器不添加边界事件
element.setExtensionElements(extensionElements);
} else if (element instanceof StartEvent || element instanceof EndEvent) { } else if (element instanceof StartEvent || element instanceof EndEvent) {
// 为开始节点和结束节点设置监听器 // 为开始节点和结束节点设置监听器
element.setExtensionElements(extensionElements); element.setExtensionElements(extensionElements);
@ -347,15 +350,15 @@ public class BpmnConverter {
} }
/** /**
* 服务任务添加错误边界事件和错误结束事件 * 活动节点添加错误边界事件和错误结束事件
* 服务任务执行失败时会触发错误边界事件并流转到错误结束事件 * 节点执行失败时会触发错误边界事件并流转到错误结束事件
* *
* @param process BPMN流程定义 * @param process BPMN流程定义
* @param serviceTask 需要添加错误处理的服务任务 * @param activity 需要添加错误处理的活动节点
*/ */
private void addErrorBoundaryEventHandler(Process process, ServiceTask serviceTask) { private void addErrorBoundaryEventHandler(Process process, Activity activity) {
BoundaryEvent boundaryEvent = createErrorBoundaryEvent(serviceTask); BoundaryEvent boundaryEvent = createErrorBoundaryEvent(activity);
EndEvent errorEndEvent = createErrorEndEvent(serviceTask); EndEvent errorEndEvent = createErrorEndEvent(activity);
SequenceFlow errorFlow = createErrorSequenceFlow(boundaryEvent, errorEndEvent); SequenceFlow errorFlow = createErrorSequenceFlow(boundaryEvent, errorEndEvent);
process.addFlowElement(boundaryEvent); process.addFlowElement(boundaryEvent);
@ -366,15 +369,15 @@ public class BpmnConverter {
/** /**
* 创建错误边界事件 * 创建错误边界事件
* *
* @param serviceTask 关联的服务任务 * @param activity 关联的活动节点
* @return 配置好的错误边界事件 * @return 配置好的错误边界事件
*/ */
private BoundaryEvent createErrorBoundaryEvent(ServiceTask serviceTask) { private BoundaryEvent createErrorBoundaryEvent(Activity activity) {
BoundaryEvent boundaryEvent = new BoundaryEvent(); BoundaryEvent boundaryEvent = new BoundaryEvent();
boundaryEvent.setId(WorkFlowConstants.BOUNDARY_EVENT_ERROR_PREFIX + serviceTask.getId()); boundaryEvent.setId(WorkFlowConstants.BOUNDARY_EVENT_ERROR_PREFIX + activity.getId());
boundaryEvent.setName("边界事件异常"); boundaryEvent.setName("边界事件异常");
boundaryEvent.setAttachedToRef(serviceTask); boundaryEvent.setAttachedToRef(activity);
boundaryEvent.setAttachedToRefId(serviceTask.getId()); boundaryEvent.setAttachedToRefId(activity.getId());
boundaryEvent.setCancelActivity(true); // 确保取消原有活动 boundaryEvent.setCancelActivity(true); // 确保取消原有活动
// 配置错误事件定义 // 配置错误事件定义
@ -388,12 +391,12 @@ public class BpmnConverter {
/** /**
* 创建错误结束事件 * 创建错误结束事件
* *
* @param serviceTask 关联的服务任务 * @param activity 关联的活动节点
* @return 配置好的错误结束事件 * @return 配置好的错误结束事件
*/ */
private EndEvent createErrorEndEvent(ServiceTask serviceTask) { private EndEvent createErrorEndEvent(Activity activity) {
EndEvent errorEndEvent = new EndEvent(); EndEvent errorEndEvent = new EndEvent();
errorEndEvent.setId(WorkFlowConstants.END_EVENT_ERROR_PREFIX + serviceTask.getId()); errorEndEvent.setId(WorkFlowConstants.END_EVENT_ERROR_PREFIX + activity.getId());
errorEndEvent.setName("结束事件异常"); errorEndEvent.setName("结束事件异常");
// 添加终止定义 // 添加终止定义
TerminateEventDefinition terminateEventDefinition = new TerminateEventDefinition(); TerminateEventDefinition terminateEventDefinition = new TerminateEventDefinition();
@ -438,7 +441,6 @@ public class BpmnConverter {
String gatewayTypeCode = node.getPanelVariables().get("gatewayType").asText(); String gatewayTypeCode = node.getPanelVariables().get("gatewayType").asText();
GatewayTypeEnums gatewayType = GatewayTypeEnums.fromCode(gatewayTypeCode); GatewayTypeEnums gatewayType = GatewayTypeEnums.fromCode(gatewayTypeCode);
// 根据网关类型创建对应的网关
Gateway gateway; Gateway gateway;
switch (gatewayType) { switch (gatewayType) {
case EXCLUSIVE_GATEWAY: case EXCLUSIVE_GATEWAY: