diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/delegate/ShellTaskDelegate.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/delegate/ShellTaskDelegate.java index e3c6e79a..52d89988 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/delegate/ShellTaskDelegate.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/delegate/ShellTaskDelegate.java @@ -3,11 +3,13 @@ package com.qqchen.deploy.backend.workflow.delegate; import com.qqchen.deploy.backend.workflow.event.ShellLogEvent; import com.qqchen.deploy.backend.workflow.enums.NodeLogTypeEnums; import lombok.extern.slf4j.Slf4j; +import org.flowable.common.engine.api.FlowableException; import org.flowable.engine.RuntimeService; import org.flowable.engine.ManagementService; import org.flowable.engine.delegate.DelegateExecution; import org.flowable.engine.delegate.JavaDelegate; import org.flowable.common.engine.api.delegate.Expression; +import org.flowable.job.api.Job; import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Component; import jakarta.annotation.Resource; @@ -176,7 +178,14 @@ public class ShellTaskDelegate implements JavaDelegate { } catch (Exception e) { log.error("Shell脚本执行失败", e); - handleFailure(execution, "Shell脚本执行失败: " + e.getMessage()); +// Job job = managementService.createJobQuery() +// .processInstanceId(execution.getProcessInstanceId()) +// .singleResult(); +// if (job != null) { +// // 设置重试次数为 0,这样就不会进入死信队列 +// managementService.setJobRetries(job.getId(), 0); +// } + throw new FlowableException("脚本执行失败。"); } } @@ -210,11 +219,23 @@ public class ShellTaskDelegate implements JavaDelegate { private void handleFailure(DelegateExecution execution, String errorMessage) { String processInstanceId = execution.getProcessInstanceId(); - try { - // 直接终止流程实例 - runtimeService.deleteProcessInstance(processInstanceId, errorMessage); - } catch (Exception e) { - log.error("处理Shell任务失败时出现错误,流程实例: {}", processInstanceId, e); + + // 获取当前 Job + Job job = managementService.createJobQuery() + .processInstanceId(execution.getProcessInstanceId()) + .singleResult(); + if (job != null) { + // 设置重试次数为 0,这样就不会进入死信队列 + managementService.setJobRetries(job.getId(), 0); } + throw new FlowableException(errorMessage); + +// try { + // 直接终止流程实例 +// runtimeService.deleteProcessInstance(processInstanceId, errorMessage); + +// } catch (Exception e) { +// log.error("处理Shell任务失败时出现错误,流程实例: {}", processInstanceId, e); +// } } } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/entity/WorkflowInstance.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/entity/WorkflowInstance.java index 5958c946..f3319a10 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/entity/WorkflowInstance.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/entity/WorkflowInstance.java @@ -57,7 +57,7 @@ public class WorkflowInstance extends Entity { /** * 开始时间 */ - @Column(name = "start_time", nullable = false) + @Column(name = "start_time") private LocalDateTime startTime; /** diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/entity/WorkflowNodeInstance.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/entity/WorkflowNodeInstance.java index 6139f98c..b8706eb9 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/entity/WorkflowNodeInstance.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/entity/WorkflowNodeInstance.java @@ -64,7 +64,7 @@ public class WorkflowNodeInstance extends Entity { /** * 开始时间 */ - @Column(name = "start_time", nullable = false) + @Column(name = "start_time") private LocalDateTime startTime; /** diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/WorkflowInstanceStatusChangeListener.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/WorkflowInstanceStatusChangeListener.java index 04a70afa..f8b009bf 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/WorkflowInstanceStatusChangeListener.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/WorkflowInstanceStatusChangeListener.java @@ -19,7 +19,7 @@ public class WorkflowInstanceStatusChangeListener { @EventListener @Transactional(propagation = Propagation.REQUIRES_NEW) public void handleWorkflowStatusChange(WorkflowInstanceStatusChangeEvent event) { - log.info("Handling workflow status change event: {}", event); +// log.info("Handling workflow status change event: {}", event); workflowInstanceService.updateInstanceStatus(event.getProcessInstanceId(), event.getStatus(), event.getEndTime()); } } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/WorkflowNodeInstanceStatusChangeListener.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/WorkflowNodeInstanceStatusChangeListener.java index 28b04ce7..58e2a3d5 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/WorkflowNodeInstanceStatusChangeListener.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/WorkflowNodeInstanceStatusChangeListener.java @@ -19,7 +19,7 @@ public class WorkflowNodeInstanceStatusChangeListener { @EventListener @Transactional(propagation = Propagation.REQUIRES_NEW) public void handleWorkflowStatusChange(WorkflowNodeInstanceStatusChangeEvent event) { - log.info("Handling workflow node instance status change event: {}", event); - workflowNodeInstanceService.saveWorkflowNodeInstance(event); +// log.info("Handling workflow node instance status change event: {}", event); + workflowNodeInstanceService.saveOrUpdateWorkflowNodeInstance(event); } } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/handler/ActivityEventHandler.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/handler/ActivityEventHandler.java index 4cce1ffc..89205780 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/handler/ActivityEventHandler.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/handler/ActivityEventHandler.java @@ -27,7 +27,7 @@ public class ActivityEventHandler implements IFlowableEventHandler { public void handle(FlowableEvent event) { String eventType = event.getType().name(); log.info("Processing activity event: {}", eventType); - + if (!(event instanceof FlowableActivityEvent activityEvent)) { return; } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/handler/JobEventHandler.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/handler/JobEventHandler.java index 088f3672..e94fbdfb 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/handler/JobEventHandler.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/handler/JobEventHandler.java @@ -33,6 +33,8 @@ public class JobEventHandler implements IFlowableEventHandler { if (!(event instanceof FlowableEngineEntityEvent entityEvent)) { return; } + + log.info("Processing job event: {}, jobType: {}", eventType, entityEvent.getType()); JobEntityImpl job = (JobEntityImpl) entityEvent.getEntity(); WorkflowNodeInstanceStatusEnums status = convertToNodeStatus(eventType); diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/repository/IWorkflowNodeInstanceRepository.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/repository/IWorkflowNodeInstanceRepository.java index d5dbdd64..5c4c7f97 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/repository/IWorkflowNodeInstanceRepository.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/repository/IWorkflowNodeInstanceRepository.java @@ -12,4 +12,6 @@ public interface IWorkflowNodeInstanceRepository extends IBaseRepository findByProcessInstanceIdAndExecutionId(String processInstanceId, String executionId); List findByProcessInstanceId(String processInstanceId); + + WorkflowNodeInstance findByNodeId(String nodeId); } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/IWorkflowNodeInstanceService.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/IWorkflowNodeInstanceService.java index 0654cbf6..bde0e24a 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/IWorkflowNodeInstanceService.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/IWorkflowNodeInstanceService.java @@ -4,9 +4,7 @@ import com.qqchen.deploy.backend.framework.service.IBaseService; import com.qqchen.deploy.backend.workflow.dto.WorkflowNodeInstanceDTO; import com.qqchen.deploy.backend.workflow.entity.WorkflowNodeInstance; import com.qqchen.deploy.backend.workflow.enums.WorkflowNodeInstanceStatusEnums; -import com.qqchen.deploy.backend.workflow.event.WorkflowInstanceStatusChangeEvent; import com.qqchen.deploy.backend.workflow.event.WorkflowNodeInstanceStatusChangeEvent; -import com.qqchen.deploy.backend.workflow.query.WorkflowNodeInstanceQuery; import java.util.List; @@ -29,5 +27,5 @@ public interface IWorkflowNodeInstanceService extends IBaseService getNodesByProcessInstanceId(String processInstanceId); - void saveWorkflowNodeInstance(WorkflowNodeInstanceStatusChangeEvent event); + void saveOrUpdateWorkflowNodeInstance(WorkflowNodeInstanceStatusChangeEvent event); } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowNodeInstanceServiceImpl.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowNodeInstanceServiceImpl.java index ecdc2578..cbad113d 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowNodeInstanceServiceImpl.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/impl/WorkflowNodeInstanceServiceImpl.java @@ -63,12 +63,19 @@ public class WorkflowNodeInstanceServiceImpl extends BaseServiceImpl new RuntimeException("Node instance not found for processInstanceId: " + nodeInstance.getProcessInstanceId())); - nodeInstance.setWorkflowInstanceId(workflowInstance.getId()); - nodeInstance.setWorkflowDefinitionId(workflowInstance.getWorkflowDefinitionId()); - super.repository.save(nodeInstance); + WorkflowNodeInstance workflowNodeInstance = workflowNodeInstanceRepository.findByNodeId(event.getNodeId()); + if (workflowNodeInstance == null) { + nodeInstance.setWorkflowInstanceId(workflowInstance.getId()); + nodeInstance.setWorkflowDefinitionId(workflowInstance.getWorkflowDefinitionId()); + super.repository.save(nodeInstance); + return; + } + workflowNodeInstance.setEndTime(event.getEndTime()); + workflowNodeInstance.setStatus(event.getStatus()); + super.repository.save(workflowNodeInstance); } } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/util/BpmnConverter.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/util/BpmnConverter.java index 181c8f1e..b7edce33 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/util/BpmnConverter.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/util/BpmnConverter.java @@ -119,25 +119,29 @@ public class BpmnConverter { serviceTask.setImplementation(delegate); serviceTask.setAsynchronous(true); // 设置为异步执行 - // 添加字段注入和扩展属性 - List fieldExtensions = new ArrayList<>(); + // 设置失败时不重试 + ExtensionElement failedJobRetryTimeCycle = new ExtensionElement(); + failedJobRetryTimeCycle.setName("failedJobRetryTimeCycle"); + failedJobRetryTimeCycle.setNamespace("http://flowable.org/bpmn"); + failedJobRetryTimeCycle.setNamespacePrefix("flowable"); + failedJobRetryTimeCycle.setElementText("R0/PT1H"); // 设置为0次重试 + + // 添加错误处理配置 + ExtensionElement errorHandling = new ExtensionElement(); + errorHandling.setName("failOnError"); + errorHandling.setNamespace("http://flowable.org/bpmn"); + errorHandling.setNamespacePrefix("flowable"); + errorHandling.setElementText("true"); + Map> extensionElements = new HashMap<>(); + List retryElements = new ArrayList<>(); + retryElements.add(failedJobRetryTimeCycle); + extensionElements.put("failedJobRetryTimeCycle", retryElements); - // 遍历所有配置项 - node.getConfig().forEach((key, value) -> { - if (value != null && !"delegate".equals(key)) { - // 添加为字段注入 - FieldExtension fieldExtension = new FieldExtension(); - fieldExtension.setFieldName(key); - fieldExtension.setStringValue(String.valueOf(value)); - fieldExtensions.add(fieldExtension); - - // 同时也添加为扩展属性(用于兼容性和可视化) - addExtensionElement(extensionElements, key, String.valueOf(value)); - } - }); + List errorElements = new ArrayList<>(); + errorElements.add(errorHandling); + extensionElements.put("failOnError", errorElements); - serviceTask.setFieldExtensions(fieldExtensions); serviceTask.setExtensionElements(extensionElements); } // 后续可以添加其他类型的节点配置 diff --git a/backend/src/main/resources/db/migration/V1.0.0__init_schema.sql b/backend/src/main/resources/db/migration/V1.0.0__init_schema.sql index aa80efb6..a0fb6b0e 100644 --- a/backend/src/main/resources/db/migration/V1.0.0__init_schema.sql +++ b/backend/src/main/resources/db/migration/V1.0.0__init_schema.sql @@ -475,7 +475,7 @@ CREATE TABLE workflow_instance business_key VARCHAR(64) NULL COMMENT '业务标识', status VARCHAR(32) NOT NULL COMMENT '实例状态', variables TEXT NULL COMMENT '流程变量(JSON)', - start_time DATETIME(6) NOT NULL COMMENT '开始时间', + start_time DATETIME(6) NULL COMMENT '开始时间', end_time DATETIME(6) NULL COMMENT '结束时间' -- CONSTRAINT FK_workflow_instance_definition FOREIGN KEY (process_definition_id) REFERENCES workflow_definition(id) @@ -500,7 +500,7 @@ CREATE TABLE workflow_node_instance node_name VARCHAR(100) NOT NULL COMMENT '节点名称', node_type VARCHAR(32) NOT NULL COMMENT '节点类型', status VARCHAR(32) NOT NULL COMMENT '节点状态', - start_time DATETIME(6) NOT NULL COMMENT '开始时间', + start_time DATETIME(6) NULL COMMENT '开始时间', end_time DATETIME(6) NULL COMMENT '结束时间', variables TEXT NULL COMMENT '节点变量(JSON)', error_message TEXT NULL COMMENT '错误信息',