反序列化问题。

This commit is contained in:
dengqichen 2024-12-25 14:18:48 +08:00
parent c6c1c164b4
commit 4b2e75733d
8 changed files with 231 additions and 14 deletions

View File

@ -56,10 +56,10 @@ public class DeployNodeDelegate extends BaseNodeDelegate<DeployNodePanelVariable
@Override
protected void executeInternal(DelegateExecution execution, DeployNodePanelVariables panelVariables, DeployNodeLocalVariables localVariables) {
String queueId = jenkinsServiceIntegration.buildWithParameters();
JenkinsQueueBuildInfoResponse buildInfo = waitForBuildToStart(queueId);
// 3. 轮询构建状态
pollBuildStatus("scp-meta", buildInfo.getBuildNumber());
// String queueId = jenkinsServiceIntegration.buildWithParameters();
// JenkinsQueueBuildInfoResponse buildInfo = waitForBuildToStart(queueId);
// // 3. 轮询构建状态
// pollBuildStatus("scp-meta", buildInfo.getBuildNumber());
}
private JenkinsQueueBuildInfoResponse waitForBuildToStart(String queueId) {

View File

@ -0,0 +1,64 @@
package com.qqchen.deploy.backend.workflow.delegate;
import com.qqchen.deploy.backend.deploy.enums.JenkinsBuildStatus;
import com.qqchen.deploy.backend.deploy.integration.IJenkinsServiceIntegration;
import com.qqchen.deploy.backend.deploy.integration.response.JenkinsQueueBuildInfoResponse;
import com.qqchen.deploy.backend.workflow.constants.WorkFlowConstants;
import com.qqchen.deploy.backend.workflow.dto.definition.node.localVariables.DeployNodeLocalVariables;
import com.qqchen.deploy.backend.workflow.dto.definition.node.localVariables.NotificationNodeLocalVariables;
import com.qqchen.deploy.backend.workflow.dto.definition.node.panelVariables.DeployNodePanelVariables;
import com.qqchen.deploy.backend.workflow.dto.definition.node.panelVariables.NotificationNodePanelVariables;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.delegate.BpmnError;
import org.flowable.engine.delegate.DelegateExecution;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Shell脚本任务的委派者实现
*/
@Slf4j
@Component
public class NotificationNodeDelegate extends BaseNodeDelegate<NotificationNodePanelVariables, NotificationNodeLocalVariables> {
@Resource
private ApplicationEventPublisher eventPublisher;
@Resource
private IJenkinsServiceIntegration jenkinsServiceIntegration;
private static final int QUEUE_POLL_INTERVAL = 10; // 10秒
private static final int MAX_QUEUE_POLLS = 30; // 最多等待5分钟
// 轮询间隔
private static final int BUILD_POLL_INTERVAL = 10;
// 最大轮询次数
private static final int MAX_BUILD_POLLS = 180; // 30分钟超时
// 用于存储实时输出的Map
private static final Map<String, StringBuilder> outputMap = new ConcurrentHashMap<>();
private static final Map<String, StringBuilder> errorMap = new ConcurrentHashMap<>();
@Override
protected Class<NotificationNodePanelVariables> getPanelVariablesClass() {
return NotificationNodePanelVariables.class;
}
@Override
protected Class<NotificationNodeLocalVariables> getLocalVariablesClass() {
return NotificationNodeLocalVariables.class;
}
@Override
protected void executeInternal(DelegateExecution execution, NotificationNodePanelVariables panelVariables, NotificationNodeLocalVariables localVariables) {
System.out.println(panelVariables.getText());
}
}

View File

@ -0,0 +1,19 @@
package com.qqchen.deploy.backend.workflow.dto.definition.node.localVariables;
import com.qqchen.deploy.backend.workflow.annotation.SchemaProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
@Data
@EqualsAndHashCode(callSuper = true)
public class NotificationNodeLocalVariables extends BaseNodeLocalVariables {
@SchemaProperty(
title = "委派者",
description = "委派者",
defaultValue = "${notificationNodeDelegate}",
required = true
)
private String delegate;
}

View File

@ -0,0 +1,21 @@
package com.qqchen.deploy.backend.workflow.dto.definition.node.panelVariables;
import com.qqchen.deploy.backend.workflow.annotation.SchemaProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* 脚本执行器配置
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class NotificationNodePanelVariables extends BaseNodePanelVariables {
@SchemaProperty(
title = "测试输出",
description = "测试输出",
required = true
)
private String text;
}

View File

@ -10,14 +10,8 @@ public enum GatewayTypeEnums {
PARALLEL_GATEWAY("parallelGateway", "并行网关"),
INCLUSIVE_GATEWAY("inclusiveGateway", "包容网关");
/**
* 分类编码
*/
private final String code;
/**
* 分类名称
*/
private final String name;
GatewayTypeEnums(String code, String name) {

View File

@ -4,10 +4,12 @@ import com.fasterxml.jackson.annotation.JsonValue;
import com.qqchen.deploy.backend.workflow.dto.definition.node.fromVariables.DeployNodeFormVariables;
import com.qqchen.deploy.backend.workflow.dto.definition.node.fromVariables.ScriptNodeFormVariables;
import com.qqchen.deploy.backend.workflow.dto.definition.node.localVariables.DeployNodeLocalVariables;
import com.qqchen.deploy.backend.workflow.dto.definition.node.localVariables.NotificationNodeLocalVariables;
import com.qqchen.deploy.backend.workflow.dto.definition.node.localVariables.ScriptNodeLocalVariables;
import com.qqchen.deploy.backend.workflow.dto.definition.node.panelVariables.DeployNodePanelVariables;
import com.qqchen.deploy.backend.workflow.dto.definition.node.panelVariables.EndNodePanelVariables;
import com.qqchen.deploy.backend.workflow.dto.definition.node.panelVariables.GatewayNodePanelVariables;
import com.qqchen.deploy.backend.workflow.dto.definition.node.panelVariables.NotificationNodePanelVariables;
import com.qqchen.deploy.backend.workflow.dto.definition.node.panelVariables.ScriptNodePanelVariables;
import com.qqchen.deploy.backend.workflow.dto.definition.node.panelVariables.StartNodePanelVariables;
import com.qqchen.deploy.backend.workflow.dto.definition.node.uiVariables.NodeUiVariables;
@ -89,6 +91,17 @@ public enum NodeTypeEnums {
BpmnNodeTypeEnums.EXCLUSIVE_GATEWAY,
NodeCategoryEnums.GATEWAY,
"条件分支控制"
),
NOTIFICATION_NODE(
"NOTIFICATION_NODE",
"通知节点",
NotificationNodeLocalVariables.class,
NotificationNodePanelVariables.class,
null,
NodeUiVariables.class,
BpmnNodeTypeEnums.SERVICE_TASK,
NodeCategoryEnums.TASK,
"通知节点"
);
//
// /**

View File

@ -0,0 +1,52 @@
package com.qqchen.deploy.backend.workflow.listener.event.handler;
import com.qqchen.deploy.backend.workflow.enums.WorkflowNodeInstanceStatusEnums;
import jakarta.annotation.Resource;
import jakarta.transaction.Transactional;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.event.FlowableEngineEntityEvent;
import org.flowable.common.engine.api.delegate.event.FlowableEngineEventType;
import org.flowable.common.engine.api.delegate.event.FlowableEntityEvent;
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
import org.flowable.engine.RuntimeService;
import org.flowable.job.api.Job;
import org.flowable.job.service.impl.persistence.entity.JobEntityImpl;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionTemplate;
@Slf4j
@Component
public class DeadLetterJobEventHandler implements IFlowableEventHandler {
@Resource
private RuntimeService runtimeService;
@Override
public boolean canHandle(String eventType) {
return eventType.startsWith("JOB_MOVED_TO_DEADLETTER");
}
@Override
public void handle(FlowableEvent event) {
if (event instanceof FlowableEntityEvent && event.getType() == FlowableEngineEventType.JOB_MOVED_TO_DEADLETTER) {
FlowableEntityEvent entityEvent = (FlowableEntityEvent) event;
Job job = (Job) entityEvent.getEntity();
String processInstanceId = job.getProcessInstanceId();
try {
// 直接在 Flowable 的事务中执行
runtimeService.deleteProcessInstance(
processInstanceId,
"Gateway condition evaluation failed: " + job.getExceptionMessage()
);
log.info("Process instance {} terminated due to dead letter job", processInstanceId);
} catch (Exception e) {
log.error("Failed to terminate process instance: " + processInstanceId, e);
// 这里我们不抛出异常因为我们希望流程能够正常结束
}
}
}
}

View File

@ -1,9 +1,11 @@
package com.qqchen.deploy.backend.workflow.util;
import com.fasterxml.jackson.databind.JsonNode;
import com.qqchen.deploy.backend.workflow.constants.WorkFlowConstants;
import com.qqchen.deploy.backend.workflow.dto.definition.workflow.WorkflowDefinitionGraphEdge;
import com.qqchen.deploy.backend.workflow.dto.definition.workflow.WorkflowDefinitionGraph;
import com.qqchen.deploy.backend.workflow.dto.definition.workflow.WorkflowDefinitionGraphNode;
import com.qqchen.deploy.backend.workflow.enums.GatewayTypeEnums;
import com.qqchen.deploy.backend.workflow.enums.NodeTypeEnums;
import lombok.extern.slf4j.Slf4j;
import org.flowable.bpmn.BpmnAutoLayout;
@ -133,8 +135,14 @@ public class BpmnConverter {
FlowElement element = instanceClass.getDeclaredConstructor().newInstance();
String validId = sanitizeId(node.getId());
idMapping.put(node.getId(), validId);
element.setId(validId);
element.setName(node.getNodeName());
// 如果是网关节点需要特殊处理
if (element instanceof Gateway) {
element = createGatewayElement(node, validId);
} else {
element.setId(validId);
element.setName(node.getNodeName());
}
// 步骤2.5配置节点的特定属性
configureFlowElement(element, node, process);
@ -415,6 +423,43 @@ public class BpmnConverter {
return errorFlow;
}
/**
* 创建网关节点
*
* @param node 工作流节点定义
* @param validId 有效的节点ID
* @return 创建的网关节点
*/
private Gateway createGatewayElement(WorkflowDefinitionGraphNode node, String validId) {
if (node.getPanelVariables() == null) {
throw new IllegalArgumentException("Gateway node must have panel variables");
}
String gatewayTypeCode = node.getPanelVariables().get("gatewayType").asText();
GatewayTypeEnums gatewayType = GatewayTypeEnums.fromCode(gatewayTypeCode);
// 根据网关类型创建对应的网关
Gateway gateway;
switch (gatewayType) {
case EXCLUSIVE_GATEWAY:
gateway = new ExclusiveGateway();
break;
case PARALLEL_GATEWAY:
gateway = new ParallelGateway();
break;
case INCLUSIVE_GATEWAY:
gateway = new InclusiveGateway();
break;
default:
throw new IllegalArgumentException("Unsupported gateway type: " + gatewayType);
}
gateway.setId(validId);
gateway.setName(node.getNodeName());
return gateway;
}
/**
* 转换连线为顺序流
*
@ -431,6 +476,15 @@ public class BpmnConverter {
flow.setName(edge.getName());
flow.setSourceRef(idMapping.get(edge.getFrom()));
flow.setTargetRef(idMapping.get(edge.getTo()));
// 处理条件
if (edge.getConfig() != null && edge.getConfig().getCondition() != null) {
if ("EXPRESSION".equals(edge.getConfig().getCondition().getType())) {
String expression = edge.getConfig().getCondition().getExpression();
flow.setConditionExpression(expression);
}
}
process.addFlowElement(flow);
}
}