From 4b2e75733df2b5e8df4fdd4f3bffeb94f2a5267a Mon Sep 17 00:00:00 2001 From: dengqichen Date: Wed, 25 Dec 2024 14:18:48 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8F=8D=E5=BA=8F=E5=88=97=E5=8C=96=E9=97=AE?= =?UTF-8?q?=E9=A2=98=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../workflow/delegate/DeployNodeDelegate.java | 8 +-- .../delegate/NotificationNodeDelegate.java | 64 +++++++++++++++++++ .../NotificationNodeLocalVariables.java | 19 ++++++ .../NotificationNodePanelVariables.java | 21 ++++++ .../workflow/enums/GatewayTypeEnums.java | 6 -- .../backend/workflow/enums/NodeTypeEnums.java | 13 ++++ .../handler/DeadLetterJobEventHandler.java | 52 +++++++++++++++ .../backend/workflow/util/BpmnConverter.java | 62 ++++++++++++++++-- 8 files changed, 231 insertions(+), 14 deletions(-) create mode 100644 backend/src/main/java/com/qqchen/deploy/backend/workflow/delegate/NotificationNodeDelegate.java create mode 100644 backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/definition/node/localVariables/NotificationNodeLocalVariables.java create mode 100644 backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/definition/node/panelVariables/NotificationNodePanelVariables.java create mode 100644 backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/event/handler/DeadLetterJobEventHandler.java diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/delegate/DeployNodeDelegate.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/delegate/DeployNodeDelegate.java index 48e22b58..e81f4eb3 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/delegate/DeployNodeDelegate.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/delegate/DeployNodeDelegate.java @@ -56,10 +56,10 @@ public class DeployNodeDelegate extends BaseNodeDelegate { + + @Resource + private ApplicationEventPublisher eventPublisher; + + @Resource + private IJenkinsServiceIntegration jenkinsServiceIntegration; + + private static final int QUEUE_POLL_INTERVAL = 10; // 10秒 + + private static final int MAX_QUEUE_POLLS = 30; // 最多等待5分钟 + + // 轮询间隔(秒) + private static final int BUILD_POLL_INTERVAL = 10; + + // 最大轮询次数 + private static final int MAX_BUILD_POLLS = 180; // 30分钟超时 + + // 用于存储实时输出的Map + private static final Map outputMap = new ConcurrentHashMap<>(); + + private static final Map errorMap = new ConcurrentHashMap<>(); + + @Override + protected Class getPanelVariablesClass() { + return NotificationNodePanelVariables.class; + } + + @Override + protected Class getLocalVariablesClass() { + return NotificationNodeLocalVariables.class; + } + + @Override + protected void executeInternal(DelegateExecution execution, NotificationNodePanelVariables panelVariables, NotificationNodeLocalVariables localVariables) { + System.out.println(panelVariables.getText()); + } + +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/definition/node/localVariables/NotificationNodeLocalVariables.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/definition/node/localVariables/NotificationNodeLocalVariables.java new file mode 100644 index 00000000..44c922ab --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/definition/node/localVariables/NotificationNodeLocalVariables.java @@ -0,0 +1,19 @@ +package com.qqchen.deploy.backend.workflow.dto.definition.node.localVariables; + +import com.qqchen.deploy.backend.workflow.annotation.SchemaProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; + +@Data +@EqualsAndHashCode(callSuper = true) +public class NotificationNodeLocalVariables extends BaseNodeLocalVariables { + + @SchemaProperty( + title = "委派者", + description = "委派者", + defaultValue = "${notificationNodeDelegate}", + required = true + ) + private String delegate; + +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/definition/node/panelVariables/NotificationNodePanelVariables.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/definition/node/panelVariables/NotificationNodePanelVariables.java new file mode 100644 index 00000000..3744a62f --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/dto/definition/node/panelVariables/NotificationNodePanelVariables.java @@ -0,0 +1,21 @@ +package com.qqchen.deploy.backend.workflow.dto.definition.node.panelVariables; + +import com.qqchen.deploy.backend.workflow.annotation.SchemaProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; + +/** + * 脚本执行器配置 + */ +@Data +@EqualsAndHashCode(callSuper = true) +public class NotificationNodePanelVariables extends BaseNodePanelVariables { + + + @SchemaProperty( + title = "测试输出", + description = "测试输出", + required = true + ) + private String text; +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/GatewayTypeEnums.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/GatewayTypeEnums.java index e6a73422..bb12caf5 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/GatewayTypeEnums.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/GatewayTypeEnums.java @@ -10,14 +10,8 @@ public enum GatewayTypeEnums { PARALLEL_GATEWAY("parallelGateway", "并行网关"), INCLUSIVE_GATEWAY("inclusiveGateway", "包容网关"); - /** - * 分类编码 - */ private final String code; - /** - * 分类名称 - */ private final String name; GatewayTypeEnums(String code, String name) { diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/NodeTypeEnums.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/NodeTypeEnums.java index ceabf342..3c5d8d45 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/NodeTypeEnums.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/enums/NodeTypeEnums.java @@ -4,10 +4,12 @@ import com.fasterxml.jackson.annotation.JsonValue; import com.qqchen.deploy.backend.workflow.dto.definition.node.fromVariables.DeployNodeFormVariables; import com.qqchen.deploy.backend.workflow.dto.definition.node.fromVariables.ScriptNodeFormVariables; import com.qqchen.deploy.backend.workflow.dto.definition.node.localVariables.DeployNodeLocalVariables; +import com.qqchen.deploy.backend.workflow.dto.definition.node.localVariables.NotificationNodeLocalVariables; import com.qqchen.deploy.backend.workflow.dto.definition.node.localVariables.ScriptNodeLocalVariables; import com.qqchen.deploy.backend.workflow.dto.definition.node.panelVariables.DeployNodePanelVariables; import com.qqchen.deploy.backend.workflow.dto.definition.node.panelVariables.EndNodePanelVariables; import com.qqchen.deploy.backend.workflow.dto.definition.node.panelVariables.GatewayNodePanelVariables; +import com.qqchen.deploy.backend.workflow.dto.definition.node.panelVariables.NotificationNodePanelVariables; import com.qqchen.deploy.backend.workflow.dto.definition.node.panelVariables.ScriptNodePanelVariables; import com.qqchen.deploy.backend.workflow.dto.definition.node.panelVariables.StartNodePanelVariables; import com.qqchen.deploy.backend.workflow.dto.definition.node.uiVariables.NodeUiVariables; @@ -89,6 +91,17 @@ public enum NodeTypeEnums { BpmnNodeTypeEnums.EXCLUSIVE_GATEWAY, NodeCategoryEnums.GATEWAY, "条件分支控制" + ), + NOTIFICATION_NODE( + "NOTIFICATION_NODE", + "通知节点", + NotificationNodeLocalVariables.class, + NotificationNodePanelVariables.class, + null, + NodeUiVariables.class, + BpmnNodeTypeEnums.SERVICE_TASK, + NodeCategoryEnums.TASK, + "通知节点" ); // // /** diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/event/handler/DeadLetterJobEventHandler.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/event/handler/DeadLetterJobEventHandler.java new file mode 100644 index 00000000..8ce56b4f --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/listener/event/handler/DeadLetterJobEventHandler.java @@ -0,0 +1,52 @@ +package com.qqchen.deploy.backend.workflow.listener.event.handler; + +import com.qqchen.deploy.backend.workflow.enums.WorkflowNodeInstanceStatusEnums; +import jakarta.annotation.Resource; +import jakarta.transaction.Transactional; +import lombok.extern.slf4j.Slf4j; +import org.flowable.common.engine.api.delegate.event.FlowableEngineEntityEvent; +import org.flowable.common.engine.api.delegate.event.FlowableEngineEventType; +import org.flowable.common.engine.api.delegate.event.FlowableEntityEvent; +import org.flowable.common.engine.api.delegate.event.FlowableEvent; +import org.flowable.engine.RuntimeService; +import org.flowable.job.api.Job; +import org.flowable.job.service.impl.persistence.entity.JobEntityImpl; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.stereotype.Component; +import org.springframework.transaction.support.TransactionTemplate; + +@Slf4j +@Component +public class DeadLetterJobEventHandler implements IFlowableEventHandler { + + @Resource + private RuntimeService runtimeService; + + + @Override + public boolean canHandle(String eventType) { + return eventType.startsWith("JOB_MOVED_TO_DEADLETTER"); + } + + @Override + public void handle(FlowableEvent event) { + if (event instanceof FlowableEntityEvent && event.getType() == FlowableEngineEventType.JOB_MOVED_TO_DEADLETTER) { + FlowableEntityEvent entityEvent = (FlowableEntityEvent) event; + Job job = (Job) entityEvent.getEntity(); + String processInstanceId = job.getProcessInstanceId(); + + try { + // 直接在 Flowable 的事务中执行 + runtimeService.deleteProcessInstance( + processInstanceId, + "Gateway condition evaluation failed: " + job.getExceptionMessage() + ); + log.info("Process instance {} terminated due to dead letter job", processInstanceId); + } catch (Exception e) { + log.error("Failed to terminate process instance: " + processInstanceId, e); + // 这里我们不抛出异常,因为我们希望流程能够正常结束 + } + } + } + +} \ No newline at end of file diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/util/BpmnConverter.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/util/BpmnConverter.java index a7b27587..1b2d4204 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/util/BpmnConverter.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/util/BpmnConverter.java @@ -1,9 +1,11 @@ package com.qqchen.deploy.backend.workflow.util; +import com.fasterxml.jackson.databind.JsonNode; import com.qqchen.deploy.backend.workflow.constants.WorkFlowConstants; import com.qqchen.deploy.backend.workflow.dto.definition.workflow.WorkflowDefinitionGraphEdge; import com.qqchen.deploy.backend.workflow.dto.definition.workflow.WorkflowDefinitionGraph; import com.qqchen.deploy.backend.workflow.dto.definition.workflow.WorkflowDefinitionGraphNode; +import com.qqchen.deploy.backend.workflow.enums.GatewayTypeEnums; import com.qqchen.deploy.backend.workflow.enums.NodeTypeEnums; import lombok.extern.slf4j.Slf4j; import org.flowable.bpmn.BpmnAutoLayout; @@ -70,7 +72,7 @@ public class BpmnConverter { /** * 创建并初始化BPMN模型 - * + * * @param processKey 流程定义的唯一标识 * @param bpmnModel BPMN模型对象 * @return 初始化后的Process对象 @@ -96,7 +98,7 @@ public class BpmnConverter { /** * 转换工作流节点为BPMN节点 - * + * * @param nodes 工作流定义节点列表 * @param process 流程定义对象 * @return 节点ID映射关系 @@ -133,8 +135,14 @@ public class BpmnConverter { FlowElement element = instanceClass.getDeclaredConstructor().newInstance(); String validId = sanitizeId(node.getId()); idMapping.put(node.getId(), validId); - element.setId(validId); - element.setName(node.getNodeName()); + + // 如果是网关节点,需要特殊处理 + if (element instanceof Gateway) { + element = createGatewayElement(node, validId); + } else { + element.setId(validId); + element.setName(node.getNodeName()); + } // 步骤2.5:配置节点的特定属性 configureFlowElement(element, node, process); @@ -415,6 +423,43 @@ public class BpmnConverter { return errorFlow; } + /** + * 创建网关节点 + * + * @param node 工作流节点定义 + * @param validId 有效的节点ID + * @return 创建的网关节点 + */ + private Gateway createGatewayElement(WorkflowDefinitionGraphNode node, String validId) { + if (node.getPanelVariables() == null) { + throw new IllegalArgumentException("Gateway node must have panel variables"); + } + + String gatewayTypeCode = node.getPanelVariables().get("gatewayType").asText(); + GatewayTypeEnums gatewayType = GatewayTypeEnums.fromCode(gatewayTypeCode); + + // 根据网关类型创建对应的网关 + Gateway gateway; + switch (gatewayType) { + case EXCLUSIVE_GATEWAY: + gateway = new ExclusiveGateway(); + break; + case PARALLEL_GATEWAY: + gateway = new ParallelGateway(); + break; + case INCLUSIVE_GATEWAY: + gateway = new InclusiveGateway(); + break; + default: + throw new IllegalArgumentException("Unsupported gateway type: " + gatewayType); + } + + gateway.setId(validId); + gateway.setName(node.getNodeName()); + + return gateway; + } + /** * 转换连线为顺序流 * @@ -431,6 +476,15 @@ public class BpmnConverter { flow.setName(edge.getName()); flow.setSourceRef(idMapping.get(edge.getFrom())); flow.setTargetRef(idMapping.get(edge.getTo())); + + // 处理条件 + if (edge.getConfig() != null && edge.getConfig().getCondition() != null) { + if ("EXPRESSION".equals(edge.getConfig().getCondition().getType())) { + String expression = edge.getConfig().getCondition().getExpression(); + flow.setConditionExpression(expression); + } + } + process.addFlowElement(flow); } }