反序列化问题。

This commit is contained in:
dengqichen 2024-12-20 18:30:52 +08:00
parent 1b0e079da4
commit 4f58a92167
6 changed files with 775 additions and 264 deletions

View File

@ -0,0 +1,78 @@
package com.qqchen.deploy.backend.workflow.delegate;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.JavaDelegate;
import org.flowable.common.engine.api.delegate.Expression;
import org.springframework.beans.factory.annotation.Autowired;
/**
* 任务委派者基类
* 负责处理panelVariables和localVariables的转换和注入
*
* @param <P> Panel变量类型
* @param <L> Local变量类型
*/
@Slf4j
public abstract class BaseTaskDelegate<P, L> implements JavaDelegate {
@Autowired
private ObjectMapper objectMapper;
// 字段注入由Flowable自动设置
protected Expression panelVariables;
protected Expression localVariables;
@Override
public void execute(DelegateExecution execution) {
try {
// 获取并转换Panel变量
P panelVars = null;
if (panelVariables != null) {
String panelVarsJson = panelVariables.getValue(execution).toString();
JsonNode panelVarsNode = objectMapper.readTree(panelVarsJson);
panelVars = objectMapper.treeToValue(panelVarsNode, getPanelVariablesClass());
}
// 获取并转换Local变量
L localVars = null;
if (localVariables != null) {
String localVarsJson = localVariables.getValue(execution).toString();
JsonNode localVarsNode = objectMapper.readTree(localVarsJson);
localVars = objectMapper.treeToValue(localVarsNode, getLocalVariablesClass());
}
// 执行具体的任务逻辑
executeInternal(execution, panelVars, localVars);
} catch (Exception e) {
log.error("Task execution failed", e);
throw new RuntimeException("Task execution failed: " + e.getMessage(), e);
}
}
/**
* 获取Panel变量的类型
*
* @return Panel变量的Class对象
*/
protected abstract Class<P> getPanelVariablesClass();
/**
* 获取Local变量的类型
*
* @return Local变量的Class对象
*/
protected abstract Class<L> getLocalVariablesClass();
/**
* 执行具体的任务逻辑
*
* @param execution DelegateExecution对象
* @param panelVariables 转换后的Panel变量
* @param localVariables 转换后的Local变量
*/
protected abstract void executeInternal(DelegateExecution execution, P panelVariables, L localVariables);
}

View File

@ -1,189 +1,156 @@
package com.qqchen.deploy.backend.workflow.delegate; package com.qqchen.deploy.backend.workflow.delegate;
import com.qqchen.deploy.backend.workflow.constants.WorkFlowConstants; import com.qqchen.deploy.backend.workflow.constants.WorkFlowConstants;
import com.qqchen.deploy.backend.workflow.event.ShellLogEvent; import com.qqchen.deploy.backend.workflow.dto.definition.node.localVariables.ScriptNodeLocalVariables;
import com.qqchen.deploy.backend.workflow.dto.definition.node.panelVariables.ScriptNodePanelVariables;
import com.qqchen.deploy.backend.workflow.enums.NodeLogTypeEnums; import com.qqchen.deploy.backend.workflow.enums.NodeLogTypeEnums;
import com.qqchen.deploy.backend.workflow.event.ShellLogEvent;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.impl.el.FixedValue;
import org.flowable.engine.RuntimeService;
import org.flowable.engine.ManagementService;
import org.flowable.engine.delegate.BpmnError; import org.flowable.engine.delegate.BpmnError;
import org.flowable.engine.delegate.DelegateExecution; import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.JavaDelegate;
import org.flowable.common.engine.api.delegate.Expression;
import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import jakarta.annotation.Resource;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import java.io.*; import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Map; import java.util.Map;
import java.util.concurrent.*; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/** /**
* Shell任务委托执行器 * Shell脚本任务的委派者实现
*/ */
@Slf4j @Slf4j
@Component @Component
public class ShellTaskDelegate implements JavaDelegate { public class ShellTaskDelegate extends BaseTaskDelegate<ScriptNodePanelVariables, ScriptNodeLocalVariables> {
@Resource @Resource
private ApplicationEventPublisher eventPublisher; private ApplicationEventPublisher eventPublisher;
@Resource
private RuntimeService runtimeService;
@Resource
private ManagementService managementService;
private FixedValue code; // 添加字段
private FixedValue name;
private FixedValue script;
private FixedValue language;
private FixedValue interpreter;
private Expression workDir;
private Expression env;
// 用于存储实时输出的Map // 用于存储实时输出的Map
private static final Map<String, StringBuilder> outputMap = new ConcurrentHashMap<>(); private static final Map<String, StringBuilder> outputMap = new ConcurrentHashMap<>();
private static final Map<String, StringBuilder> errorMap = new ConcurrentHashMap<>(); private static final Map<String, StringBuilder> errorMap = new ConcurrentHashMap<>();
@Override
protected Class<ScriptNodePanelVariables> getPanelVariablesClass() {
return ScriptNodePanelVariables.class;
}
@Override @Override
public void execute(DelegateExecution execution) { protected Class<ScriptNodeLocalVariables> getLocalVariablesClass() {
log.info("ShellTaskDelegate开始执行, processInstanceId={}, executionId={}", return ScriptNodeLocalVariables.class;
execution.getProcessInstanceId(), execution.getId());
// 从字段注入中获取值
String scriptValue = script != null ? script.getValue(execution).toString() : null;
String workDirValue = workDir != null ? workDir.getValue(execution).toString() : null;
@SuppressWarnings("unchecked")
Map<String, String> envValue = env != null ? (Map<String, String>) env.getValue(execution) : null;
log.info("字段注入的值: script={}, workDir={}, env={}", scriptValue, workDirValue, envValue);
// 如果流程变量中有值优先使用流程变量
if (execution.hasVariable("script")) {
scriptValue = (String) execution.getVariable("script");
// log.info("从流程变量获取到script={}", scriptValue);
}
if (execution.hasVariable("workDir")) {
workDirValue = (String) execution.getVariable("workDir");
// log.info("从流程变量获取到workDir={}", workDirValue);
}
if (execution.hasVariable("env")) {
@SuppressWarnings("unchecked")
Map<String, String> envFromVar = (Map<String, String>) execution.getVariable("env");
envValue = envFromVar;
// log.info("从流程变量获取到env={}", envValue);
} }
if (scriptValue == null) { @Override
// log.error("脚本内容为空,执行失败"); protected void executeInternal(DelegateExecution execution,
ScriptNodePanelVariables panelVariables,
ScriptNodeLocalVariables localVariables) {
if (panelVariables == null || panelVariables.getScript() == null) {
throw new BpmnError(WorkFlowConstants.WORKFLOW_EXEC_ERROR, "Script is required but not provided"); throw new BpmnError(WorkFlowConstants.WORKFLOW_EXEC_ERROR, "Script is required but not provided");
} }
try { // try {
log.info("准备执行脚本: {}", scriptValue); // log.info("准备执行脚本: {}", panelVariables.getScript());
// 使用processInstanceId而不是executionId // // 使用processInstanceId而不是executionId
String processInstanceId = execution.getProcessInstanceId(); // String processInstanceId = execution.getProcessInstanceId();
outputMap.put(processInstanceId, new StringBuilder()); // outputMap.put(processInstanceId, new StringBuilder());
errorMap.put(processInstanceId, new StringBuilder()); // errorMap.put(processInstanceId, new StringBuilder());
//
// 创建进程构建器 // // 创建进程构建器
ProcessBuilder processBuilder = new ProcessBuilder(); // ProcessBuilder processBuilder = new ProcessBuilder();
//
// 根据操作系统选择合适的shell // // 根据操作系统选择合适的shell
String os = System.getProperty("os.name").toLowerCase(); // String os = System.getProperty("os.name").toLowerCase();
if (os.contains("win")) { // if (os.contains("win")) {
// Windows系统使用cmd // // Windows系统使用cmd
processBuilder.command("cmd", "/c", scriptValue); // processBuilder.command("cmd", "/c", panelVariables.getScript());
} else { // } else {
// Unix-like系统使用bash // // Unix-like系统使用bash
processBuilder.command("bash", "-c", scriptValue); // processBuilder.command("bash", "-c", panelVariables.getScript());
} // }
//
// 设置工作目录 // // 设置工作目录
if (StringUtils.hasText(workDirValue)) { // if (StringUtils.hasText(localVariables.getWorkDir())) {
// Windows系统路径处理 // // Windows系统路径处理
if (os.contains("win")) { // String workDirValue = localVariables.getWorkDir();
// 确保使用Windows风格的路径分隔符 // if (os.contains("win")) {
workDirValue = workDirValue.replace("/", "\\"); // // 确保使用Windows风格的路径分隔符
// 如果路径以\开头去掉第一个\ // workDirValue = workDirValue.replace("/", "\\");
if (workDirValue.startsWith("\\")) { // // 如果路径以\开头去掉第一个\
workDirValue = workDirValue.substring(1); // if (workDirValue.startsWith("\\")) {
} // workDirValue = workDirValue.substring(1);
} // }
File workDirFile = new File(workDirValue); // }
if (!workDirFile.exists()) { // File workDirFile = new File(workDirValue);
workDirFile.mkdirs(); // if (!workDirFile.exists()) {
} // workDirFile.mkdirs();
processBuilder.directory(workDirFile); // }
} // processBuilder.directory(workDirFile);
// }
// 设置环境变量 //
if (envValue != null) { // // 设置环境变量
processBuilder.environment().putAll(envValue); // if (localVariables.getEnv() != null) {
} // processBuilder.environment().putAll(localVariables.getEnv());
// }
// 执行命令 //
log.info("执行shell脚本: {}", scriptValue); // // 执行命令
Process process = processBuilder.start(); // log.info("执行shell脚本: {}", panelVariables.getScript());
// Process process = processBuilder.start();
// 使用BufferedReader实时读取输出 //
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream())); // // 创建线程池处理输出
BufferedReader errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream())); // ExecutorService executorService = Executors.newFixedThreadPool(2);
//
// 创建线程池处理输出 // // 处理标准输出
ExecutorService executorService = Executors.newFixedThreadPool(2); // Future<?> outputFuture = executorService.submit(() ->
// processInputStream(process.getInputStream(), processInstanceId, NodeLogTypeEnums.STDOUT));
// 处理标准输出 //
Future<?> outputFuture = executorService.submit(() -> // // 处理错误输出
processInputStream(process.getInputStream(), processInstanceId, NodeLogTypeEnums.STDOUT)); // Future<?> errorFuture = executorService.submit(() ->
// processInputStream(process.getErrorStream(), processInstanceId, NodeLogTypeEnums.STDERR));
// 处理错误输出 //
Future<?> errorFuture = executorService.submit(() -> // // 等待进程完成
processInputStream(process.getErrorStream(), processInstanceId, NodeLogTypeEnums.STDERR)); // int exitCode = process.waitFor();
//
// 等待进程完成 // // 等待输出处理完成
int exitCode = process.waitFor(); // outputFuture.get(5, TimeUnit.SECONDS);
// errorFuture.get(5, TimeUnit.SECONDS);
// 等待输出处理完成 //
outputFuture.get(5, TimeUnit.SECONDS); // // 关闭线程池
errorFuture.get(5, TimeUnit.SECONDS); // executorService.shutdown();
//
// 关闭线程池 // // 设置最终结果
executorService.shutdown(); // StringBuilder finalOutput = outputMap.get(processInstanceId);
// StringBuilder finalError = errorMap.get(processInstanceId);
// 设置最终结果 //
StringBuilder finalOutput = outputMap.get(processInstanceId); // execution.setVariable("shellOutput", finalOutput.toString());
StringBuilder finalError = errorMap.get(processInstanceId); // execution.setVariable("shellError", finalError.toString());
// execution.setVariable("shellExitCode", exitCode);
execution.setVariable("shellOutput", finalOutput.toString()); //
execution.setVariable("shellError", finalError.toString()); // // 清理缓存
execution.setVariable("shellExitCode", exitCode); // outputMap.remove(processInstanceId);
// errorMap.remove(processInstanceId);
// 清理缓存 //
outputMap.remove(processInstanceId); // if (exitCode != 0) {
errorMap.remove(processInstanceId); // log.error("Shell脚本执行失败退出码: {}", exitCode);
// execution.setVariable("errorDetail", "Shell脚本执行失败退出码: " + exitCode);
if (exitCode != 0) { // throw new BpmnError(WorkFlowConstants.WORKFLOW_EXEC_ERROR, "Shell脚本执行失败退出码: " + exitCode);
log.error("Shell脚本执行失败退出码: {}", exitCode); // }
execution.setVariable("errorDetail", "Shell脚本执行失败退出码: " + exitCode); // log.info("Shell脚本执行成功");
throw new BpmnError(WorkFlowConstants.WORKFLOW_EXEC_ERROR, "Shell脚本执行失败退出码: " + exitCode); // log.debug("脚本输出: {}", finalOutput);
// throw new RuntimeException("Shell脚本执行失败退出码: " + exitCode); //
} // } catch (Exception e) {
log.info("Shell脚本执行成功"); // log.error("Shell脚本执行失败", e);
log.debug("脚本输出: {}", finalOutput); // throw new BpmnError(WorkFlowConstants.WORKFLOW_EXEC_ERROR, e.getMessage());
// }
} catch (Exception e) {
log.error("Shell脚本执行失败", e);
// runtimeService.deleteProcessInstance(execution.getProcessInstanceId(), e.getMessage());
// throw new FlowableException("任务执行失败: " + e.getMessage(), e);
throw new BpmnError(WorkFlowConstants.WORKFLOW_EXEC_ERROR, e.getMessage());
}
} }
private void processInputStream(InputStream inputStream, String processInstanceId, NodeLogTypeEnums logType) { private void processInputStream(InputStream inputStream, String processInstanceId, NodeLogTypeEnums logType) {
@ -212,6 +179,4 @@ public class ShellTaskDelegate implements JavaDelegate {
log.error("Error reading process output", e); log.error("Error reading process output", e);
} }
} }
} }

View File

@ -9,12 +9,4 @@ import lombok.Data;
@Data @Data
public class BaseNodeLocalVariables { public class BaseNodeLocalVariables {
@SchemaProperty(
title = "委派者",
description = "委派者",
defaultValue = "com.qqchen.deploy.backend.workflow.delegate.ShellTaskDelegate",
required = true
)
private String delegate;
} }

View File

@ -1,5 +1,6 @@
package com.qqchen.deploy.backend.workflow.dto.definition.node.localVariables; package com.qqchen.deploy.backend.workflow.dto.definition.node.localVariables;
import com.qqchen.deploy.backend.workflow.annotation.SchemaProperty;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
@ -7,4 +8,12 @@ import lombok.EqualsAndHashCode;
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
public class ScriptNodeLocalVariables extends BaseNodeLocalVariables { public class ScriptNodeLocalVariables extends BaseNodeLocalVariables {
@SchemaProperty(
title = "委派者",
description = "委派者",
defaultValue = "${shellTaskDelegate}",
required = true
)
private String delegate;
} }

View File

@ -41,58 +41,17 @@ public class BpmnConverter {
try { try {
log.debug("开始转换工作流定义为XML, processKey: {}", processKey); log.debug("开始转换工作流定义为XML, processKey: {}", processKey);
// 创建BPMN模型 // 步骤1创建和初始化BPMN模型
BpmnModel bpmnModel = new BpmnModel(); BpmnModel bpmnModel = new BpmnModel();
bpmnModel.setTargetNamespace("http://www.flowable.org/test"); Process process = createAndInitializeBpmnModel(processKey, bpmnModel);
bpmnModel.addError(WorkFlowConstants.WORKFLOW_EXEC_ERROR, WorkFlowConstants.WORKFLOW_EXEC_ERROR);
Process process = new Process(); // 步骤2转换所有节点
// 确保processKey符合NCName规则 Map<String, String> idMapping = convertNodes(graph.getNodes(), process);
String validProcessKey = processKey.replaceAll("[^a-zA-Z0-9-_.]", "_");
process.setId(validProcessKey);
process.setName(processKey); // 保持原始processKey作为显示名称
process.setExecutable(true);
bpmnModel.addProcess(process);
// 创建节点ID到规范化ID的映射 // 步骤3转换连线
Map<String, String> idMapping = new HashMap<>(); convertEdgesToSequenceFlows(graph.getEdges(), idMapping, process);
// 转换节点 // 步骤4自动布局
for (WorkflowDefinitionNode node : graph.getNodes()) {
log.debug("转换节点: {}, 类型: {}", node.getNodeName(), node.getNodeCode());
// 通过NodeTypeEnums获取对应的BpmnTypeEnums中定义的实例类型
@SuppressWarnings("unchecked")
Class<? extends FlowElement> instanceClass = (Class<? extends FlowElement>) NodeTypeEnums.valueOf(node.getNodeCode())
.getBpmnType()
.getInstance();
// 创建节点实例
FlowElement element = instanceClass.getDeclaredConstructor().newInstance();
String validId = sanitizeId(node.getId());
idMapping.put(node.getId(), validId);
element.setId(validId);
element.setName(node.getNodeName());
// 设置节点特定属性
configureFlowElement(element, node, process);
process.addFlowElement(element);
}
// 转换连线
for (WorkflowDefinitionEdge edge : graph.getEdges()) {
log.debug("转换连线: from {} to {}", edge.getFrom(), edge.getTo());
SequenceFlow flow = new SequenceFlow();
flow.setId("FLOW_" + edge.getId().replaceAll("[^a-zA-Z0-9-_.]", "_"));
flow.setName(edge.getName());
flow.setSourceRef(idMapping.get(edge.getFrom()));
flow.setTargetRef(idMapping.get(edge.getTo()));
process.addFlowElement(flow);
}
// 自动布局
new BpmnAutoLayout(bpmnModel).execute(); new BpmnAutoLayout(bpmnModel).execute();
// 转换为XML // 转换为XML
@ -109,6 +68,85 @@ public class BpmnConverter {
} }
} }
/**
* 创建并初始化BPMN模型
*
* @param processKey 流程定义的唯一标识
* @param bpmnModel BPMN模型对象
* @return 初始化后的Process对象
*/
private Process createAndInitializeBpmnModel(String processKey, BpmnModel bpmnModel) {
// 步骤1.1设置BPMN模型的基本属性
bpmnModel.setTargetNamespace("http://www.flowable.org/test");
bpmnModel.addError(WorkFlowConstants.WORKFLOW_EXEC_ERROR, WorkFlowConstants.WORKFLOW_EXEC_ERROR);
// 步骤1.2创建并配置Process对象
Process process = new Process();
// 确保processKey符合NCName规则
String validProcessKey = processKey.replaceAll("[^a-zA-Z0-9-_.]", "_");
process.setId(validProcessKey);
process.setName(processKey); // 保持原始processKey作为显示名称
process.setExecutable(true);
// 步骤1.3将Process添加到BPMN模型中
bpmnModel.addProcess(process);
return process;
}
/**
* 转换工作流节点为BPMN节点
*
* @param nodes 工作流定义节点列表
* @param process 流程定义对象
* @return 节点ID映射关系
*/
private Map<String, String> convertNodes(List<WorkflowDefinitionNode> nodes, Process process) {
// 步骤2.1创建节点ID映射
Map<String, String> idMapping = new HashMap<>();
// 步骤2.2遍历并转换每个节点
for (WorkflowDefinitionNode node : nodes) {
log.debug("转换节点: {}, 类型: {}", node.getNodeName(), node.getNodeCode());
convertSingleNode(node, process, idMapping);
}
return idMapping;
}
/**
* 转换单个工作流节点为BPMN节点
*
* @param node 工作流节点定义
* @param process 流程定义对象
* @param idMapping 节点ID映射关系
*/
private void convertSingleNode(WorkflowDefinitionNode node, Process process, Map<String, String> idMapping) {
try {
// 步骤2.3获取节点对应的BPMN元素类型
@SuppressWarnings("unchecked")
Class<? extends FlowElement> instanceClass = (Class<? extends FlowElement>) NodeTypeEnums.valueOf(node.getNodeCode())
.getBpmnType()
.getInstance();
// 步骤2.4创建节点实例并设置基本属性
FlowElement element = instanceClass.getDeclaredConstructor().newInstance();
String validId = sanitizeId(node.getId());
idMapping.put(node.getId(), validId);
element.setId(validId);
element.setName(node.getNodeName());
// 步骤2.5配置节点的特定属性
configureFlowElement(element, node, process);
// 步骤2.6将节点添加到流程中
process.addFlowElement(element);
} catch (Exception e) {
log.error("节点转换失败: {}", node.getNodeName(), e);
throw new RuntimeException("节点转换失败: " + e.getMessage(), e);
}
}
private String sanitizeId(String id) { private String sanitizeId(String id) {
return "NODE_" + id.replaceAll("[^a-zA-Z0-9-_.]", "_"); return "NODE_" + id.replaceAll("[^a-zA-Z0-9-_.]", "_");
} }
@ -121,29 +159,144 @@ public class BpmnConverter {
* @param process 当前流程 * @param process 当前流程
*/ */
private void configureFlowElement(FlowElement element, WorkflowDefinitionNode node, Process process) { private void configureFlowElement(FlowElement element, WorkflowDefinitionNode node, Process process) {
// 为所有节点添加执行监听器 // 步骤1创建基本的扩展元素
Map<String, List<ExtensionElement>> extensionElements = createBaseExtensionElements();
// 步骤2根据节点类型进行特定配置
if (element instanceof ServiceTask) {
configureServiceTask((ServiceTask) element, node, process, extensionElements);
} else if (element instanceof StartEvent || element instanceof EndEvent) {
// 为开始节点和结束节点设置监<EFBFBD><EFBFBD>
element.setExtensionElements(extensionElements);
}
}
/**
* 创建基本的扩展元素包含执行监听器
*
* @return 包含基本扩展元素的Map
*/
private Map<String, List<ExtensionElement>> createBaseExtensionElements() {
Map<String, List<ExtensionElement>> extensionElements = new HashMap<>(); Map<String, List<ExtensionElement>> extensionElements = new HashMap<>();
List<ExtensionElement> executionListeners = new ArrayList<>(); List<ExtensionElement> executionListeners = new ArrayList<>();
// 开始事件监听器 // 添加开始事件监听器
ExtensionElement startListener = createExecutionListener("start", "${globalNodeExecutionListener}"); ExtensionElement startListener = createExecutionListener("start", "${globalNodeExecutionListener}");
executionListeners.add(startListener); executionListeners.add(startListener);
// 结束事件监听器 // 添加结束事件监听器
ExtensionElement endListener = createExecutionListener("end", "${globalNodeExecutionListener}"); ExtensionElement endListener = createExecutionListener("end", "${globalNodeExecutionListener}");
executionListeners.add(endListener); executionListeners.add(endListener);
extensionElements.put("executionListener", executionListeners); extensionElements.put("executionListener", executionListeners);
return extensionElements;
}
// 根据节点类型进行特定配置 /**
if (element instanceof ServiceTask) { * 配置服务任务节点
ServiceTask serviceTask = (ServiceTask) element; *
// 设置委托表达式 * @param serviceTask 服务任务节点
// String delegate = (String) node.getLocalVariables().get("delegate"); * @param node 工作流节点定义
// serviceTask.setImplementationType("delegateExpression"); * @param process 当前流程
// serviceTask.setImplementation(delegate); * @param extensionElements 扩展元素
*/
private void configureServiceTask(ServiceTask serviceTask, WorkflowDefinitionNode node, Process process, Map<String, List<ExtensionElement>> extensionElements) {
if (node.getLocalVariables() != null && node.getLocalVariables().has("delegate")) {
String delegate = node.getLocalVariables().get("delegate").asText();
serviceTask.setImplementationType("delegateExpression");
serviceTask.setImplementation(delegate);
}
// 配置重试策略 addExecutionVariables(extensionElements, node);
addRetryStrategy(extensionElements);
serviceTask.setExtensionElements(extensionElements);
addErrorBoundaryEventHandler(process, serviceTask);
}
/**
* 创建扩展属性
*
* @param name 属性名
* @param value 属性值
* @return 扩展属性
*/
private ExtensionAttribute createAttribute(String name, String value) {
ExtensionAttribute attribute = new ExtensionAttribute();
attribute.setName(name);
attribute.setValue(value);
return attribute;
}
/**
* 添加执行实例级变量
*
* @param extensionElements 扩展元素
* @param node 工作流节点定义
*/
private void addExecutionVariables(Map<String, List<ExtensionElement>> extensionElements, WorkflowDefinitionNode node) {
// 添加panelVariables变量
if (node.getPanelVariables() != null) {
ExtensionElement fieldElement = new ExtensionElement();
fieldElement.setName("field");
fieldElement.setNamespace("http://flowable.org/bpmn");
fieldElement.setNamespacePrefix("flowable");
// 创建field的子元素string
ExtensionElement stringElement = new ExtensionElement();
stringElement.setName("string");
stringElement.setNamespace("http://flowable.org/bpmn");
stringElement.setNamespacePrefix("flowable");
// 转义JSON内容
String escapedJson = escapeXml(node.getPanelVariables().toString());
stringElement.setElementText(escapedJson);
// 设置field的name属性
Map<String, List<ExtensionAttribute>> fieldAttributes = new HashMap<>();
fieldAttributes.put("name", Collections.singletonList(createAttribute("name", "panelVariables")));
fieldElement.setAttributes(fieldAttributes);
// 添加string子元素到field
fieldElement.addChildElement(stringElement);
// 添加field到extensionElements
extensionElements.computeIfAbsent("field", k -> new ArrayList<>()).add(fieldElement);
}
// 添加localVariables变量
if (node.getLocalVariables() != null) {
ExtensionElement fieldElement = new ExtensionElement();
fieldElement.setName("field");
fieldElement.setNamespace("http://flowable.org/bpmn");
fieldElement.setNamespacePrefix("flowable");
// 创建field的子元素string
ExtensionElement stringElement = new ExtensionElement();
stringElement.setName("string");
stringElement.setNamespace("http://flowable.org/bpmn");
stringElement.setNamespacePrefix("flowable");
// 转义JSON内容
String escapedJson = escapeXml(node.getLocalVariables().toString());
stringElement.setElementText(escapedJson);
// 设置field的name属性
Map<String, List<ExtensionAttribute>> fieldAttributes = new HashMap<>();
fieldAttributes.put("name", Collections.singletonList(createAttribute("name", "localVariables")));
fieldElement.setAttributes(fieldAttributes);
// 添加string子元素到field
fieldElement.addChildElement(stringElement);
// 添加field到extensionElements
extensionElements.computeIfAbsent("field", k -> new ArrayList<>()).add(fieldElement);
}
}
/**
* 添加重试策略配置
*
* @param extensionElements 扩展元素
*/
private void addRetryStrategy(Map<String, List<ExtensionElement>> extensionElements) {
ExtensionElement retryConfig = new ExtensionElement(); ExtensionElement retryConfig = new ExtensionElement();
retryConfig.setName("failedJobRetryTimeCycle"); retryConfig.setName("failedJobRetryTimeCycle");
retryConfig.setNamespace("http://flowable.org/bpmn"); retryConfig.setNamespace("http://flowable.org/bpmn");
@ -153,32 +306,10 @@ public class BpmnConverter {
List<ExtensionElement> retryElements = new ArrayList<>(); List<ExtensionElement> retryElements = new ArrayList<>();
retryElements.add(retryConfig); retryElements.add(retryConfig);
extensionElements.put("failedJobRetryTimeCycle", retryElements); extensionElements.put("failedJobRetryTimeCycle", retryElements);
// 添加字段注入
// List<FieldExtension> fieldExtensions = new ArrayList<>();
// node.getLocalVariables().forEach((key, value) -> {
// if (value != null && !"delegate".equals(key)) {
// FieldExtension fieldExtension = new FieldExtension();
// fieldExtension.setFieldName(key);
// fieldExtension.setStringValue(String.valueOf(value));
// fieldExtensions.add(fieldExtension);
// }
// });
// 设置到服务任务
// serviceTask.setFieldExtensions(fieldExtensions);
serviceTask.setExtensionElements(extensionElements);
// 添加错误边界事件
addErrorBoundaryEventHandler(process, serviceTask);
} else if (element instanceof StartEvent || element instanceof EndEvent) {
// 为开始节点和结束节点设置监听器
element.setExtensionElements(extensionElements);
}
} }
/** /**
* 创建执行监听器扩展 * 创建执行监听器扩展<EFBFBD><EFBFBD>
* *
* @param event 事件类型start/end * @param event 事件类型start/end
* @param delegateExpression 委托表达式 * @param delegateExpression 委托表达式
@ -210,7 +341,7 @@ public class BpmnConverter {
} }
/** /**
* 为服务任务添加错误边界事件和错误结<EFBFBD><EFBFBD><EFBFBD>事件 * 为服务任务添加错误边界事件和错误结事件
* 当服务任务执行失败时会触发错误边界事件并流转到错误结束事件 * 当服务任务执行失败时会触发错误边界事件并流转到错误结束事件
* *
* @param process BPMN流程定义 * @param process BPMN流程定义
@ -286,4 +417,41 @@ public class BpmnConverter {
return errorFlow; return errorFlow;
} }
/**
* 转换连<EFBFBD><EFBFBD>为顺序流
*
* @param edges 工作流定义边集合
* @param idMapping 节点ID映射
* @param process 流程定义
*/
private void convertEdgesToSequenceFlows(List<WorkflowDefinitionEdge> edges, Map<String, String> idMapping, Process process) {
for (WorkflowDefinitionEdge edge : edges) {
log.debug("转换连线: from {} to {}", edge.getFrom(), edge.getTo());
SequenceFlow flow = new SequenceFlow();
flow.setId("FLOW_" + edge.getId().replaceAll("[^a-zA-Z0-9-_.]", "_"));
flow.setName(edge.getName());
flow.setSourceRef(idMapping.get(edge.getFrom()));
flow.setTargetRef(idMapping.get(edge.getTo()));
process.addFlowElement(flow);
}
}
/**
* 转义XML特殊字符
*
* @param input 需要转义的字符串
* @return 转义后的字符串
*/
private String escapeXml(String input) {
if (input == null) {
return null;
}
return input.replace("&", "&amp;")
.replace("<", "&lt;")
.replace(">", "&gt;")
.replace("\"", "&quot;")
.replace("'", "&apos;");
}
} }

View File

@ -0,0 +1,299 @@
package com.qqchen.deploy.backend.workflow.util;
import com.qqchen.deploy.backend.workflow.constants.WorkFlowConstants;
import com.qqchen.deploy.backend.workflow.dto.definition.workflow.WorkflowDefinitionEdge;
import com.qqchen.deploy.backend.workflow.dto.definition.workflow.WorkflowDefinitionGraph;
import com.qqchen.deploy.backend.workflow.dto.definition.workflow.WorkflowDefinitionNode;
import com.qqchen.deploy.backend.workflow.enums.NodeTypeEnums;
import lombok.extern.slf4j.Slf4j;
import org.flowable.bpmn.BpmnAutoLayout;
import org.flowable.bpmn.converter.BpmnXMLConverter;
import org.flowable.bpmn.model.BoundaryEvent;
import org.flowable.bpmn.model.BpmnModel;
import org.flowable.bpmn.model.EndEvent;
import org.flowable.bpmn.model.ErrorEventDefinition;
import org.flowable.bpmn.model.ExtensionAttribute;
import org.flowable.bpmn.model.ExtensionElement;
import org.flowable.bpmn.model.FlowElement;
import org.flowable.bpmn.model.Process;
import org.flowable.bpmn.model.SequenceFlow;
import org.flowable.bpmn.model.ServiceTask;
import org.flowable.bpmn.model.StartEvent;
import org.flowable.bpmn.model.TerminateEventDefinition;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* BPMN 模型转换工具
*
* @author cascade
* @date 2024-12-11
*/
@Slf4j
@Component
public class BpmnConverter1 {
/**
* 将工作流定义图转换为Flowable XML
*
* @param graph 工作流定义图
* @param processKey 流程定义的唯一标识
* @return Flowable XML字符串
* @throws RuntimeException 当转换失败时抛出
*/
public String convertToXml(WorkflowDefinitionGraph graph, String processKey) {
try {
log.debug("开始转换工作流定义为XML, processKey: {}", processKey);
// 创建BPMN模型
BpmnModel bpmnModel = new BpmnModel();
bpmnModel.setTargetNamespace("http://www.flowable.org/test");
bpmnModel.addError(WorkFlowConstants.WORKFLOW_EXEC_ERROR, WorkFlowConstants.WORKFLOW_EXEC_ERROR);
Process process = new Process();
// 确保processKey符合NCName规则
String validProcessKey = processKey.replaceAll("[^a-zA-Z0-9-_.]", "_");
process.setId(validProcessKey);
process.setName(processKey); // 保持原始processKey作为显示名称
process.setExecutable(true);
bpmnModel.addProcess(process);
// 创建节点ID到规范化ID的映射
Map<String, String> idMapping = new HashMap<>();
// 转换节点
for (WorkflowDefinitionNode node : graph.getNodes()) {
log.debug("转换节点: {}, 类型: {}", node.getNodeName(), node.getNodeCode());
// 通过NodeTypeEnums获取对应的BpmnTypeEnums中定义的实例类型
@SuppressWarnings("unchecked")
Class<? extends FlowElement> instanceClass = (Class<? extends FlowElement>) NodeTypeEnums.valueOf(node.getNodeCode())
.getBpmnType()
.getInstance();
// 创建节点实例
FlowElement element = instanceClass.getDeclaredConstructor().newInstance();
String validId = sanitizeId(node.getId());
idMapping.put(node.getId(), validId);
element.setId(validId);
element.setName(node.getNodeName());
// 设置节点特定属性
configureFlowElement(element, node, process);
process.addFlowElement(element);
}
// 转换连线
for (WorkflowDefinitionEdge edge : graph.getEdges()) {
log.debug("转换连线: from {} to {}", edge.getFrom(), edge.getTo());
SequenceFlow flow = new SequenceFlow();
flow.setId("FLOW_" + edge.getId().replaceAll("[^a-zA-Z0-9-_.]", "_"));
flow.setName(edge.getName());
flow.setSourceRef(idMapping.get(edge.getFrom()));
flow.setTargetRef(idMapping.get(edge.getTo()));
process.addFlowElement(flow);
}
// 自动布局
new BpmnAutoLayout(bpmnModel).execute();
// 转换为XML
BpmnXMLConverter converter = new BpmnXMLConverter();
byte[] bytes = converter.convertToXML(bpmnModel);
String xml = new String(bytes, StandardCharsets.UTF_8);
log.debug("工作流定义转换完成");
return xml;
} catch (Exception e) {
log.error("转换工作流定义为XML失败", e);
throw new RuntimeException("转换工作流定义为XML失败: " + e.getMessage(), e);
}
}
private String sanitizeId(String id) {
return "NODE_" + id.replaceAll("[^a-zA-Z0-9-_.]", "_");
}
/**
* 配置流程节点的特定属性
*
* @param element 流程节点元素
* @param node 工作流节点定义
* @param process 当前流程
*/
private void configureFlowElement(FlowElement element, WorkflowDefinitionNode node, Process process) {
// 为所有节点添加执行监听器
Map<String, List<ExtensionElement>> extensionElements = new HashMap<>();
List<ExtensionElement> executionListeners = new ArrayList<>();
// 开始事件监听器
ExtensionElement startListener = createExecutionListener("start", "${globalNodeExecutionListener}");
executionListeners.add(startListener);
// 结束事件监听器
ExtensionElement endListener = createExecutionListener("end", "${globalNodeExecutionListener}");
executionListeners.add(endListener);
extensionElements.put("executionListener", executionListeners);
// 根据节点类型进行特定配置
if (element instanceof ServiceTask) {
ServiceTask serviceTask = (ServiceTask) element;
// 设置委托表达式
// String delegate = (String) node.getLocalVariables().get("delegate");
// serviceTask.setImplementationType("delegateExpression");
// serviceTask.setImplementation(delegate);
// 配置重试策略
ExtensionElement retryConfig = new ExtensionElement();
retryConfig.setName("failedJobRetryTimeCycle");
retryConfig.setNamespace("http://flowable.org/bpmn");
retryConfig.setNamespacePrefix("flowable");
retryConfig.setElementText("R0/PT1H"); // 设置为0次重试
List<ExtensionElement> retryElements = new ArrayList<>();
retryElements.add(retryConfig);
extensionElements.put("failedJobRetryTimeCycle", retryElements);
// 添加字段注入
// List<FieldExtension> fieldExtensions = new ArrayList<>();
// node.getLocalVariables().forEach((key, value) -> {
// if (value != null && !"delegate".equals(key)) {
// FieldExtension fieldExtension = new FieldExtension();
// fieldExtension.setFieldName(key);
// fieldExtension.setStringValue(String.valueOf(value));
// fieldExtensions.add(fieldExtension);
// }
// });
// 设置到服务任务
// serviceTask.setFieldExtensions(fieldExtensions);
serviceTask.setExtensionElements(extensionElements);
// 添加错误边界事件
addErrorBoundaryEventHandler(process, serviceTask);
} else if (element instanceof StartEvent || element instanceof EndEvent) {
// 为开始节点和结束节点设置监听器
element.setExtensionElements(extensionElements);
}
}
/**
* 创建执行监听器扩展元素
*
* @param event 事件类型start/end
* @param delegateExpression 委托表达式
* @return 配置好的监听器扩展元素
*/
private ExtensionElement createExecutionListener(String event, String delegateExpression) {
ExtensionElement listener = new ExtensionElement();
listener.setName("executionListener");
listener.setNamespace("http://flowable.org/bpmn");
listener.setNamespacePrefix("flowable");
// 设置事件属性
ExtensionAttribute eventAttr = new ExtensionAttribute();
eventAttr.setName("event");
eventAttr.setValue(event);
// 设置委托表达式属性
ExtensionAttribute delegateAttr = new ExtensionAttribute();
delegateAttr.setName("delegateExpression");
delegateAttr.setValue(delegateExpression);
// 添加属性到监听器
Map<String, List<ExtensionAttribute>> attributes = new HashMap<>();
attributes.put("event", Collections.singletonList(eventAttr));
attributes.put("delegateExpression", Collections.singletonList(delegateAttr));
listener.setAttributes(attributes);
return listener;
}
/**
* 为服务任务添加错误边界事件和错误结<EFBFBD><EFBFBD><EFBFBD>事件
* 当服务任务执行失败时会触发错误边界事件并流转到错误结束事件
*
* @param process BPMN流程定义
* @param serviceTask 需要添加错误处理的服务任务
*/
private void addErrorBoundaryEventHandler(Process process, ServiceTask serviceTask) {
BoundaryEvent boundaryEvent = createErrorBoundaryEvent(serviceTask);
EndEvent errorEndEvent = createErrorEndEvent(serviceTask);
SequenceFlow errorFlow = createErrorSequenceFlow(boundaryEvent, errorEndEvent);
// 将错误处理相关的元素添加到流程中
process.addFlowElement(boundaryEvent);
process.addFlowElement(errorEndEvent);
process.addFlowElement(errorFlow);
}
/**
* 创建错误边界事件
*
* @param serviceTask 关联的服务任务
* @return 配置好的错误边界事件
*/
private BoundaryEvent createErrorBoundaryEvent(ServiceTask serviceTask) {
BoundaryEvent boundaryEvent = new BoundaryEvent();
boundaryEvent.setId(WorkFlowConstants.BOUNDARY_EVENT_ERROR_PREFIX + serviceTask.getId());
boundaryEvent.setName("边界事件异常");
boundaryEvent.setAttachedToRef(serviceTask);
boundaryEvent.setAttachedToRefId(serviceTask.getId());
boundaryEvent.setCancelActivity(true); // 确保取消原有活动
// 配置错误事件定义
ErrorEventDefinition errorEventDefinition = new ErrorEventDefinition();
errorEventDefinition.setErrorCode(WorkFlowConstants.WORKFLOW_EXEC_ERROR);
boundaryEvent.addEventDefinition(errorEventDefinition);
return boundaryEvent;
}
/**
* 创建错误结束事件
*
* @param serviceTask 关联的服务任务
* @return 配置好的错误结束事件
*/
private EndEvent createErrorEndEvent(ServiceTask serviceTask) {
EndEvent errorEndEvent = new EndEvent();
errorEndEvent.setId(WorkFlowConstants.END_EVENT_ERROR_PREFIX + serviceTask.getId());
errorEndEvent.setName("结束事件异常");
// 添加终止定义
TerminateEventDefinition terminateEventDefinition = new TerminateEventDefinition();
errorEndEvent.addEventDefinition(terminateEventDefinition);
// ErrorEventDefinition errorEventDefinition = new ErrorEventDefinition();
// errorEventDefinition.setErrorCode(WorkFlowConstants.WORKFLOW_EXEC_ERROR);
// errorEndEvent.addEventDefinition(errorEventDefinition);
return errorEndEvent;
}
/**
* 创建错误处理流程的连线
*
* @param boundaryEvent 错误边界事件
* @param errorEndEvent 错误结束事件
* @return 配置好的连线
*/
private SequenceFlow createErrorSequenceFlow(BoundaryEvent boundaryEvent, EndEvent errorEndEvent) {
SequenceFlow errorFlow = new SequenceFlow();
errorFlow.setId(WorkFlowConstants.SEQUENCE_FLOW_ERROR_PREFIX + boundaryEvent.getAttachedToRefId());
errorFlow.setName("连接线异常");
errorFlow.setSourceRef(boundaryEvent.getId());
errorFlow.setTargetRef(errorEndEvent.getId());
return errorFlow;
}
}