flowable-devops/backend/docs/02-后端技术设计.md
dengqichen d42166d2c0 提交
2025-10-13 16:25:13 +08:00

51 KiB
Raw Blame History

后端技术设计文档

版本: v1.0
关联: 01-架构总览.md


一、技术栈详细说明

1.1 核心依赖

<!-- pom.xml -->
<dependencies>
    <!-- Spring Boot (WebFlux仅使用 WebClient) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
        <version>3.2.0</version>
    </dependency>
    
    <!-- Flowable -->
    <dependency>
        <groupId>org.flowable</groupId>
        <artifactId>flowable-spring-boot-starter</artifactId>
        <version>7.0.1</version>
    </dependency>
    
    <!-- MySQL -->
    <dependency>
        <groupId>com.mysql</groupId>
        <artifactId>mysql-connector-j</artifactId>
        <version>8.3.0</version>
    </dependency>
    
    <!-- Redis -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    
    <!-- JSON处理 -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    
    <!-- 表达式引擎 (JUEL - Jakarta EL) -->
    <dependency>
        <groupId>jakarta.el</groupId>
        <artifactId>jakarta.el-api</artifactId>
        <version>5.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.glassfish</groupId>
        <artifactId>jakarta.el</artifactId>
        <version>5.0.0</version>
    </dependency>
    
    <!-- 邮件 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-mail</artifactId>
    </dependency>
</dependencies>

1.2 配置文件

# application.yml
spring:
  application:
    name: workflow-platform
  
  # 数据源配置(外部 MySQL不使用 Docker
  datasource:
    url: jdbc:mysql://172.22.222.111:3306/flowable-devops?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC
    username: root  # 请按实际用户名填写
    password: ${SPRING_DATASOURCE_PASSWORD}  # 请通过环境变量安全注入密码
    driver-class-name: com.mysql.cj.jdbc.Driver
    hikari:
      maximum-pool-size: 20
      minimum-idle: 5
      connection-timeout: 30000
  
  # Redis配置外部 Redis不使用 Docker
  redis:
    host: 172.22.222.111
    port: 6379
    database: 5
    password: ${SPRING_REDIS_PASSWORD}
    timeout: 5000
  
  # 邮件配置
  mail:
    host: smtp.example.com
    port: 587
    username: noreply@example.com
    password: ${SMTP_PASSWORD}
    properties:
      mail.smtp.auth: true
      mail.smtp.starttls.enable: true

# Flowable配置MVP 为同步执行,关闭全局异步执行器)
flowable:
  process-definition-location-prefix: classpath*:/processes/
  database-schema-update: true
  async-executor-activate: false
  
# 应用配置
app:
  workflow:
    expression-cache-size: 1000
    node-execution-timeout: 300
    enable-execution-logging: true

二、数据模型设计

2.1 业务表设计非Flowable表

表1: workflow_definitions工作流定义

CREATE TABLE workflow_definitions (
    id VARCHAR(64) PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    description TEXT,
    
    -- 工作流定义JSON格式    definition JSON NOT NULL,
    /* JSON 结构:
    {
      "nodes": [
        {
          "id": "node1",
          "type": "http_request",
          "name": "Get User",
          "position": {"x": 100, "y": 100},
          "config": {
            "url": "https://api.example.com",
            "method": "GET"
          }
        }
      ],
      "edges": [
        {"source": "node1", "target": "node2"}
      ],
      "variables": {}
    }
    */
    
    -- Flowable流程定义ID转换后
    flowable_process_definition_id VARCHAR(64),
    flowable_deployment_id VARCHAR(64),
    
    -- 状态
    status VARCHAR(20) DEFAULT 'draft', -- draft, active, archived
    
    -- 元数据
    created_by VARCHAR(100),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    
    -- 索引
    CONSTRAINT chk_status CHECK (status IN ('draft', 'active', 'archived'))
);

-- 索引
CREATE INDEX idx_workflow_status ON workflow_definitions(status);
CREATE INDEX idx_workflow_created_at ON workflow_definitions(created_at);

表2: node_types节点类型元数据

CREATE TABLE node_types (
    id VARCHAR(64) PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    display_name VARCHAR(255) NOT NULL,
    category VARCHAR(50), -- api, database, logic, notification
    icon VARCHAR(100),
    description TEXT,
    
    -- 字段定义JSON Schema    fields JSON NOT NULL,
    /* JSON 结构:
    [
      {
        "name": "url",
        "label": "URL",
        "type": "text",
        "required": true,
        "supportsExpression": true,
        "placeholder": "https://api.example.com"
      },
      {
        "name": "method",
        "label": "Method",
        "type": "select",
        "options": ["GET", "POST", "PUT", "DELETE"],
        "defaultValue": "GET"
      }
    ]
    */
    
    -- 输出结构定义JSON Schema    output_schema JSON,
    /* JSON 结构:
    {
      "type": "object",
      "properties": {
        "statusCode": {"type": "number"},
        "body": {"type": "object"},
        "headers": {"type": "object"}
      }
    }
    */
    
    -- Java实现类
    implementation_class VARCHAR(255) NOT NULL,
    
    -- 是否启用
    enabled BOOLEAN DEFAULT true,
    
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 内置节点类型初始数据
INSERT INTO node_types (id, name, display_name, category, icon, fields, output_schema, implementation_class) VALUES
('http_request', 'httpRequest', 'HTTP Request', 'api', 'api', 
 '[{"name":"url","label":"URL","type":"text","required":true,"supportsExpression":true}]',
 '{"type":"object","properties":{"statusCode":{"type":"number"},"body":{"type":"object"}}}',
 'com.workflow.nodes.HttpRequestNode');

表3: workflow_executions工作流执行记录扩展

CREATE TABLE workflow_executions (
    id VARCHAR(64) PRIMARY KEY,
    workflow_id VARCHAR(64) NOT NULL,
    workflow_definition_id VARCHAR(64) NOT NULL,
    
    -- Flowable流程实例ID
    flowable_process_instance_id VARCHAR(64) NOT NULL,
    
    -- 输入参数
    input JSON,
    
    -- 执行状态
    status VARCHAR(20) DEFAULT 'running', -- running, completed, failed, cancelled
    
    -- 开始/结束时间
    started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    ended_at TIMESTAMP,
    
    -- 错误信息
    error_message TEXT,
    error_stack TEXT,
    
    -- 触发方式
    trigger_type VARCHAR(20), -- manual, cron, webhook
    triggered_by VARCHAR(100),
    
    FOREIGN KEY (workflow_definition_id) REFERENCES workflow_definitions(id),
    CONSTRAINT chk_execution_status CHECK (status IN ('running', 'completed', 'failed', 'cancelled'))
);

-- 索引
CREATE INDEX idx_execution_workflow ON workflow_executions(workflow_definition_id);
CREATE INDEX idx_execution_status ON workflow_executions(status);
CREATE INDEX idx_execution_started_at ON workflow_executions(started_at);

表4: node_execution_logs节点执行日志

CREATE TABLE node_execution_logs (
    id BIGSERIAL PRIMARY KEY,
    execution_id VARCHAR(64) NOT NULL,
    node_id VARCHAR(64) NOT NULL,
    node_name VARCHAR(255),
    node_type VARCHAR(64),
    
    -- 输入/输出
    input JSON,
    output JSON,
    
    -- 状态
    status VARCHAR(20), -- success, failed, skipped
    
    -- 时间
    started_at TIMESTAMP,
    ended_at TIMESTAMP,
    duration_ms INTEGER,
    
    -- 错误
    error_message TEXT,
    
    FOREIGN KEY (execution_id) REFERENCES workflow_executions(id)
);

-- 索引
CREATE INDEX idx_node_log_execution ON node_execution_logs(execution_id);
CREATE INDEX idx_node_log_status ON node_execution_logs(status);

2.2 Flowable表说明不需要创建自动生成

-- Flowable会自动创建这些表我们只需要知道它们的用途

-- 流程定义相关
ACT_RE_DEPLOYMENT       -- 部署信息
ACT_RE_PROCDEF          -- 流程定义
ACT_RE_MODEL            -- 模型信息

-- 运行时数据
ACT_RU_EXECUTION        -- 流程实例/执行
ACT_RU_TASK             -- 任务User TaskACT_RU_VARIABLE         -- 流程变量(我们的上下文数据)⭐
ACT_RU_JOB              -- 异步任务

-- 历史数据
ACT_HI_PROCINST         -- 流程实例历史
ACT_HI_TASKINST         -- 任务历史
ACT_HI_VARINST          -- 变量历史
ACT_HI_ACTINST          -- 活动历史(每个节点的执行记录)⭐

三、核心模块实现

3.1 节点类型注册系统

NodeTypeRegistry.java

package com.workflow.registry;

import com.workflow.model.NodeTypeMetadata;
import com.workflow.nodes.WorkflowNode;
import com.workflow.repository.NodeTypeRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 节点类型注册中心
 * 
 * 职责:
 * 1. 扫描并注册所有节点类型
 * 2. 提供节点类型查询
 * 3. 管理节点元数据
 */
@Service
public class NodeTypeRegistry {
    
    private final Map<String, NodeTypeMetadata> nodeTypes = new ConcurrentHashMap<>();
    private final Map<String, WorkflowNode> nodeInstances = new ConcurrentHashMap<>();
    
    @Autowired
    private ApplicationContext applicationContext;
    
    @Autowired
    private NodeTypeRepository repository;
    
    /**
     * 启动时自动扫描注册
     */
    @PostConstruct
    public void init() {
        // 1. 扫描所有WorkflowNode实现类
        Map<String, WorkflowNode> beans = applicationContext.getBeansOfType(WorkflowNode.class);
        
        for (WorkflowNode node : beans.values()) {
            registerNode(node);
        }
        
        // 2. 从数据库加载(支持动态注册)
        List<NodeTypeMetadata> dbNodeTypes = repository.findAll();
        for (NodeTypeMetadata metadata : dbNodeTypes) {
            nodeTypes.put(metadata.getId(), metadata);
        }
        
        System.out.println("✅ 已注册 " + nodeTypes.size() + " 个节点类型");
    }
    
    /**
     * 注册单个节点
     */
    private void registerNode(WorkflowNode node) {
        NodeTypeMetadata metadata = node.getMetadata();
        
        // 验证必填字段
        if (metadata.getId() == null || metadata.getImplementationClass() == null) {
            throw new IllegalArgumentException("节点元数据不完整: " + metadata);
        }
        
        // 保存元数据
        nodeTypes.put(metadata.getId(), metadata);
        nodeInstances.put(metadata.getId(), node);
        
        // 持久化到数据库
        repository.save(metadata);
        
        System.out.println("  - " + metadata.getDisplayName() + " (" + metadata.getId() + ")");
    }
    
    /**
     * 获取节点元数据
     */
    public NodeTypeMetadata getMetadata(String typeId) {
        return nodeTypes.get(typeId);
    }
    
    /**
     * 获取节点实例(用于执行)
     */
    public WorkflowNode getNodeInstance(String typeId) {
        return nodeInstances.get(typeId);
    }
    
    /**
     * 获取所有节点类型(给前端用)
     */
    public List<NodeTypeMetadata> getAllNodeTypes() {
        return new ArrayList<>(nodeTypes.values());
    }
    
    /**
     * 按分类获取节点类型
     */
    public List<NodeTypeMetadata> getNodeTypesByCategory(String category) {
        return nodeTypes.values().stream()
            .filter(meta -> category.equals(meta.getCategory()))
            .toList();
    }
}

3.2 表达式引擎

ExpressionEngine.java

package com.workflow.engine;

import org.flowable.engine.delegate.DelegateExecution;
import org.springframework.stereotype.Service;

import jakarta.el.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * 表达式解析引擎
 * 
 * 支持语法:
 * ${nodes.node1.output.body.email}
 * ${workflow.input.username}
 * ${env.API_KEY}
 * ${nodes.step1.output.count > 10 ? 'high' : 'low'}
 */
@Service
public class ExpressionEngine {
    
    private final ExpressionFactory expressionFactory;
    
    // 表达式编译缓存(性能优化)⭐
    private final Map<String, ValueExpression> expressionCache = new ConcurrentHashMap<>();
    
    public ExpressionEngine() {
        this.expressionFactory = ExpressionFactory.newInstance();
    }
    
    /**
     * 解析单个表达式
     * 
     * @param expression 表达式字符串,例如: "${nodes.node1.output.body.email}"
     * @param execution Flowable执行上下文
     * @return 解析后的值
     */
    public Object evaluate(String expression, DelegateExecution execution) {
        if (expression == null) {
            return null;
        }
        
        // 快速路径:无表达式,直接返回 ⭐
        if (!expression.contains("${")) {
            return expression;
        }
        
        // 提取所有表达式: ${...}
        Pattern pattern = Pattern.compile("\\$\\{([^}]+)\\}");
        Matcher matcher = pattern.matcher(expression);
        
        // 如果是纯表达式(整个字符串就是一个表达式)
        if (matcher.matches()) {
            return evaluateSingle(expression, execution);
        }
        
        // 混合字符串,替换所有表达式
        StringBuffer result = new StringBuffer();
        while (matcher.find()) {
            String fullExpr = matcher.group(0); // ${...}
            Object value = evaluateSingle(fullExpr, execution);
            matcher.appendReplacement(result, Matcher.quoteReplacement(String.valueOf(value)));
        }
        matcher.appendTail(result);
        
        return result.toString();
    }
    
    /**
     * 解析单个完整表达式
     */
    private Object evaluateSingle(String expression, DelegateExecution execution) {
        try {
            // 尝试从缓存获取
            ValueExpression expr = expressionCache.get(expression);
            
            if (expr == null) {
                // 编译表达式
                StandardELContext context = createContext(execution);
                expr = expressionFactory.createValueExpression(context, expression, Object.class);
                
                // 缓存(限制大小)
                if (expressionCache.size() < 1000) {
                    expressionCache.put(expression, expr);
                }
            }
            
            // 求值
            StandardELContext context = createContext(execution);
            return expr.getValue(context);
            
        } catch (Exception e) {
            throw new ExpressionEvaluationException(
                "表达式解析失败: " + expression + ", 错误: " + e.getMessage(), 
                e
            );
        }
    }
    
    /**
     * 创建EL上下文注入变量⭐⭐⭐
     */
    private StandardELContext createContext(DelegateExecution execution) {
        StandardELContext context = new StandardELContext(expressionFactory);
        
        // 1. 注入 nodes节点输出
        Map<String, Object> nodesData = (Map<String, Object>) execution.getVariable("nodes");
        if (nodesData != null) {
            ValueExpression nodesExpr = expressionFactory.createValueExpression(
                nodesData, Map.class
            );
            context.getVariableMapper().setVariable("nodes", nodesExpr);
        }
        
        // 2. 注入 workflow工作流输入和变量
        Map<String, Object> workflowData = (Map<String, Object>) execution.getVariable("workflow");
        if (workflowData != null) {
            ValueExpression workflowExpr = expressionFactory.createValueExpression(
                workflowData, Map.class
            );
            context.getVariableMapper().setVariable("workflow", workflowExpr);
        }
        
        // 3. 注入 env环境变量
        ValueExpression envExpr = expressionFactory.createValueExpression(
            System.getenv(), Map.class
        );
        context.getVariableMapper().setVariable("env", envExpr);
        
        return context;
    }
    
    /**
     * 批量解析对象中的所有表达式(递归)
     */
    public Map<String, Object> resolveObject(Map<String, Object> input, DelegateExecution execution) {
        Map<String, Object> result = new HashMap<>();
        
        for (Map.Entry<String, Object> entry : input.entrySet()) {
            Object value = entry.getValue();
            
            if (value instanceof String) {
                // 解析字符串表达式
                result.put(entry.getKey(), evaluate((String) value, execution));
                
            } else if (value instanceof Map) {
                // 递归解析嵌套对象
                result.put(entry.getKey(), resolveObject((Map<String, Object>) value, execution));
                
            } else if (value instanceof List) {
                // 解析数组
                List<?> list = (List<?>) value;
                List<Object> resolvedList = new ArrayList<>();
                for (Object item : list) {
                    if (item instanceof String) {
                        resolvedList.add(evaluate((String) item, execution));
                    } else if (item instanceof Map) {
                        resolvedList.add(resolveObject((Map<String, Object>) item, execution));
                    } else {
                        resolvedList.add(item);
                    }
                }
                result.put(entry.getKey(), resolvedList);
                
            } else {
                // 其他类型直接返回
                result.put(entry.getKey(), value);
            }
        }
        
        return result;
    }
}

/**
 * 表达式解析异常
 */
class ExpressionEvaluationException extends RuntimeException {
    public ExpressionEvaluationException(String message, Throwable cause) {
        super(message, cause);
    }
}

3.3 JSON → BPMN 转换器

WorkflowConverter.java

package com.workflow.converter;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.workflow.model.*;
import org.flowable.bpmn.BpmnAutoLayout;
import org.flowable.bpmn.model.*;
import org.flowable.bpmn.model.Process;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.*;

/**
 * 工作流转换器JSON ↔ BPMN XML
 * 
 * 核心职责:
 * 1. 将前端的JSON工作流定义转换为Flowable的BPMN XML
 * 2. 将BPMN XML转换回JSON用于编辑
 */
@Service
public class WorkflowConverter {
    
    private final ObjectMapper objectMapper = new ObjectMapper();
    
    /**
     * JSON → BPMN XML
     * 
     * @param workflow JSON格式的工作流定义
     * @return BPMN 2.0 XML字符串
     */
    public String convertToBpmn(WorkflowDefinition workflow) {
        // 1. 创建BPMN模型
        BpmnModel bpmnModel = new BpmnModel();
        
        // 2. 创建流程
        Process process = new Process();
        process.setId(workflow.getId());
        process.setName(workflow.getName());
        
        // 3. 添加开始事件
        StartEvent startEvent = new StartEvent();
        startEvent.setId("start");
        startEvent.setName("开始");
        process.addFlowElement(startEvent);
        
        // 4. 转换节点
        Map<String, FlowElement> elementMap = new HashMap<>();
        for (WorkflowNode node : workflow.getNodes()) {
            FlowElement element = convertNode(node);
            process.addFlowElement(element);
            elementMap.put(node.getId(), element);
        }
        
        // 5. 添加结束事件
        EndEvent endEvent = new EndEvent();
        endEvent.setId("end");
        endEvent.setName("结束");
        process.addFlowElement(endEvent);
        
        // 6. 转换连线
        convertEdges(workflow, process, elementMap);
        
        // 7. 添加流程到模型
        bpmnModel.addProcess(process);
        
        // 8. 自动布局(可选)
        new BpmnAutoLayout(bpmnModel).execute();
        
        // 9. 转换为XML
        return convertBpmnModelToXml(bpmnModel);
    }
    
    /**
     * 转换单个节点
     */
    private FlowElement convertNode(WorkflowNode node) {
        // 根据节点类型创建不同的BPMN元素
        
        // User Task审批节点
        if ("approval".equals(node.getType()) || "user_task".equals(node.getType())) {
            return createUserTask(node);
        }
        
        // Service Task普通节点
        return createServiceTask(node);
    }
    
    /**
     * 创建Service Task
     */
    private ServiceTask createServiceTask(WorkflowNode node) {
        ServiceTask task = new ServiceTask();
        task.setId(node.getId());
        task.setName(node.getName());
        
        // 使用通用执行器
        task.setImplementationType(ImplementationType.IMPLEMENTATION_TYPE_DELEGATEEXPRESSION);
        task.setImplementation("${genericNodeExecutor}");
        
        // 通过Field Extension传递节点配置 ⭐⭐⭐
        List<FieldExtension> fields = new ArrayList<>();
        
        // 节点类型
        FieldExtension typeField = new FieldExtension();
        typeField.setFieldName("nodeType");
        typeField.setStringValue(node.getType());
        fields.add(typeField);
        
        // 节点配置JSON
        try {
            String configJson = objectMapper.writeValueAsString(node.getConfig());
            FieldExtension configField = new FieldExtension();
            configField.setFieldName("nodeConfig");
            configField.setStringValue(configJson);
            fields.add(configField);
        } catch (Exception e) {
            throw new RuntimeException("序列化节点配置失败: " + node.getId(), e);
        }
        
        task.setFieldExtensions(fields);
        
        return task;
    }
    
    /**
     * 创建User Task审批节点
     */
    private UserTask createUserTask(WorkflowNode node) {
        UserTask task = new UserTask();
        task.setId(node.getId());
        task.setName(node.getName());
        
        // 审批人(支持表达式)
        String assignee = (String) node.getConfig().get("assignee");
        if (assignee != null) {
            task.setAssignee(assignee);
        }
        
        // 候选组
        List<String> candidateGroups = (List<String>) node.getConfig().get("candidateGroups");
        if (candidateGroups != null) {
            task.setCandidateGroups(candidateGroups);
        }
        
        // 表单字段
        List<FormProperty> formProperties = new ArrayList<>();
        List<Map<String, Object>> formFields = 
            (List<Map<String, Object>>) node.getConfig().get("formFields");
        
        if (formFields != null) {
            for (Map<String, Object> field : formFields) {
                FormProperty prop = new FormProperty();
                prop.setId((String) field.get("id"));
                prop.setName((String) field.get("label"));
                prop.setType((String) field.get("type"));
                prop.setRequired((Boolean) field.getOrDefault("required", false));
                formProperties.add(prop);
            }
        }
        
        task.setFormProperties(formProperties);
        
        return task;
    }
    
    /**
     * 转换连线
     */
    private void convertEdges(
        WorkflowDefinition workflow,
        Process process,
        Map<String, FlowElement> elementMap
    ) {
        // 按拓扑排序连接节点
        List<WorkflowNode> sortedNodes = topologicalSort(workflow);
        
        // 连接 start → 第一个节点
        if (!sortedNodes.isEmpty()) {
            WorkflowNode firstNode = sortedNodes.get(0);
            addSequenceFlow(process, "start", firstNode.getId());
        }
        
        // 连接节点间的边
        for (WorkflowEdge edge : workflow.getEdges()) {
            SequenceFlow flow = addSequenceFlow(process, edge.getSource(), edge.getTarget());
            if (edge.getCondition() != null && !edge.getCondition().isBlank()) {
                flow.setConditionExpression(edge.getCondition());
            }
        }
        
        // 连接 最后一个节点 → end
        if (!sortedNodes.isEmpty()) {
            // 找到所有出度为0的节点没有出边的节点
            Set<String> targetNodes = workflow.getEdges().stream()
                .map(WorkflowEdge::getTarget)
                .collect(Collectors.toSet());
            
            for (WorkflowNode node : workflow.getNodes()) {
                if (!targetNodes.contains(node.getId())) {
                    // 这是最后的节点
                    addSequenceFlow(process, node.getId(), "end");
                }
            }
        }
    }
    
    /**
     * 添加序列流
     */
    private SequenceFlow addSequenceFlow(Process process, String sourceId, String targetId) {
        SequenceFlow flow = new SequenceFlow();
        flow.setId("flow_" + sourceId + "_" + targetId);
        flow.setSourceRef(sourceId);
        flow.setTargetRef(targetId);
        process.addFlowElement(flow);
        return flow;
    }
    
    /**
     * 拓扑排序(确保节点执行顺序正确)
     */
    private List<WorkflowNode> topologicalSort(WorkflowDefinition workflow) {
        // 实现拓扑排序算法
        Map<String, WorkflowNode> nodeMap = workflow.getNodes().stream()
            .collect(Collectors.toMap(WorkflowNode::getId, n -> n));
        
        Map<String, Integer> inDegree = new HashMap<>();
        Map<String, List<String>> adjacency = new HashMap<>();
        
        // 初始化
        for (WorkflowNode node : workflow.getNodes()) {
            inDegree.put(node.getId(), 0);
            adjacency.put(node.getId(), new ArrayList<>());
        }
        
        // 构建图
        for (WorkflowEdge edge : workflow.getEdges()) {
            adjacency.get(edge.getSource()).add(edge.getTarget());
            inDegree.put(edge.getTarget(), inDegree.get(edge.getTarget()) + 1);
        }
        
        // BFS
        Queue<String> queue = new LinkedList<>();
        for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {
            if (entry.getValue() == 0) {
                queue.offer(entry.getKey());
            }
        }
        
        List<WorkflowNode> result = new ArrayList<>();
        while (!queue.isEmpty()) {
            String nodeId = queue.poll();
            result.add(nodeMap.get(nodeId));
            
            for (String neighbor : adjacency.get(nodeId)) {
                inDegree.put(neighbor, inDegree.get(neighbor) - 1);
                if (inDegree.get(neighbor) == 0) {
                    queue.offer(neighbor);
                }
            }
        }
        
        // 检测环
        if (result.size() != workflow.getNodes().size()) {
            throw new IllegalArgumentException("工作流存在环,无法执行");
        }
        
        return result;
    }
    
    /**
     * BPMN Model → XML
     */
    private String convertBpmnModelToXml(BpmnModel bpmnModel) {
        BpmnXMLConverter converter = new BpmnXMLConverter();
        byte[] xmlBytes = converter.convertToXML(bpmnModel);
        return new String(xmlBytes, StandardCharsets.UTF_8);
    }
}

四、节点实现示例

4.1 节点接口定义

package com.workflow.nodes;

import com.workflow.model.*;

/**
 * 工作流节点接口
 * 
 * 所有节点必须实现此接口
 */
public interface WorkflowNode {
    
    /**
     * 获取节点元数据(字段定义、输出结构等)
     */
    NodeTypeMetadata getMetadata();
    
    /**
     * 执行节点
     * 
     * @param input 输入参数(已解析表达式)
     * @param context 执行上下文
     * @return 执行结果
     */
    NodeExecutionResult execute(NodeInput input, NodeExecutionContext context);
}

4.2 HTTP Request 节点

package com.workflow.nodes;

import com.workflow.model.*;
import org.springframework.http.*;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

import java.time.Instant;
import java.util.*;

/**
 * HTTP请求节点
 */
@Component
public class HttpRequestNode implements WorkflowNode {
    
    private final RestTemplate restTemplate = new RestTemplate();
    
    @Override
    public NodeTypeMetadata getMetadata() {
        return NodeTypeMetadata.builder()
            .id("http_request")
            .name("httpRequest")
            .displayName("HTTP Request")
            .category("api")
            .icon("api")
            .description("发送HTTP请求到指定URL")
            .fields(List.of(
                // URL字段
                FieldDefinition.builder()
                    .name("url")
                    .label("URL")
                    .type(FieldType.TEXT)
                    .required(true)
                    .supportsExpression(true)
                    .placeholder("https://api.example.com/users")
                    .build(),
                    
                // 请求方法
                FieldDefinition.builder()
                    .name("method")
                    .label("Method")
                    .type(FieldType.SELECT)
                    .required(true)
                    .options(List.of("GET", "POST", "PUT", "DELETE", "PATCH"))
                    .defaultValue("GET")
                    .build(),
                    
                // 请求头
                FieldDefinition.builder()
                    .name("headers")
                    .label("Headers")
                    .type(FieldType.KEY_VALUE)
                    .supportsFieldMapping(true)
                    .build(),
                    
                // 请求体
                FieldDefinition.builder()
                    .name("body")
                    .label("Request Body")
                    .type(FieldType.CODE)
                    .language("json")
                    .supportsExpression(true)
                    .build(),
                    
                // 超时设置
                FieldDefinition.builder()
                    .name("timeout")
                    .label("Timeout (ms)")
                    .type(FieldType.NUMBER)
                    .defaultValue(30000)
                    .build()
            ))
            .outputSchema(Map.of(
                "type", "object",
                "properties", Map.of(
                    "statusCode", Map.of("type", "number", "description", "HTTP状态码"),
                    "body", Map.of("type", "object", "description", "响应体"),
                    "headers", Map.of("type", "object", "description", "响应头"),
                    "elapsed", Map.of("type", "number", "description", "耗时(ms)")
                )
            ))
            .implementationClass("com.workflow.nodes.HttpRequestNode")
            .build();
    }
    
    @Override
    public NodeExecutionResult execute(NodeInput input, NodeExecutionContext context) {
        Instant startTime = Instant.now();
        
        try {
            // 1. 获取参数(表达式已被解析)⭐
            String url = input.getStringRequired("url");
            String method = input.getString("method", "GET");
            Map<String, String> headers = input.getMap("headers", new HashMap<>());
            String body = input.getString("body");
            Integer timeout = input.getInteger("timeout", 30000);
            
            // 2. 构建HTTP请求
            HttpHeaders httpHeaders = new HttpHeaders();
            headers.forEach(httpHeaders::set);
            
            HttpEntity<String> entity = new HttpEntity<>(body, httpHeaders);
            
            // 3. 发送请求
            ResponseEntity<String> response = restTemplate.exchange(
                url,
                HttpMethod.valueOf(method),
                entity,
                String.class
            );
            
            // 4. 解析响应体尝试JSON
            Object responseBody;
            try {
                responseBody = new ObjectMapper().readValue(
                    response.getBody(),
                    Object.class
                );
            } catch (Exception e) {
                // 不是JSON返回原始文本
                responseBody = response.getBody();
            }
            
            // 5. 构建输出
            Instant endTime = Instant.now();
            long elapsed = endTime.toEpochMilli() - startTime.toEpochMilli();
            
            Map<String, Object> output = new HashMap<>();
            output.put("statusCode", response.getStatusCodeValue());
            output.put("body", responseBody);
            output.put("headers", response.getHeaders().toSingleValueMap());
            output.put("elapsed", elapsed);
            
            return NodeExecutionResult.success(output, startTime, endTime);
            
        } catch (Exception e) {
            Instant endTime = Instant.now();
            return NodeExecutionResult.failed(
                e.getMessage(),
                e.getClass().getSimpleName(),
                startTime,
                endTime
            );
        }
    }
}

4.3 通用节点执行器

package com.workflow.executor;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.workflow.engine.ExpressionEngine;
import com.workflow.model.*;
import com.workflow.nodes.WorkflowNode;
import com.workflow.registry.NodeTypeRegistry;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.JavaDelegate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/**
 * 通用节点执行器
 * 
 * 所有Service Task都使用这个执行器
 * 根据节点类型动态调用对应的实现类
 */
@Component("genericNodeExecutor")
public class GenericNodeExecutor implements JavaDelegate {
    
    @Autowired
    private NodeTypeRegistry nodeTypeRegistry;
    
    @Autowired
    private ExpressionEngine expressionEngine;
    
    private final ObjectMapper objectMapper = new ObjectMapper();
    
    @Override
    public void execute(DelegateExecution execution) {
        String nodeId = execution.getCurrentActivityId();
        
        try {
            // 1. 获取节点类型和配置从Field Extension
            String nodeType = getFieldValue(execution, "nodeType");
            String nodeConfigJson = getFieldValue(execution, "nodeConfig");
            
            Map<String, Object> nodeConfig = objectMapper.readValue(
                nodeConfigJson,
                new TypeReference<Map<String, Object>>() {}
            );
            
            // 2. 解析表达式(处理数据映射)⭐⭐⭐
            Map<String, Object> resolvedConfig = expressionEngine.resolveObject(
                nodeConfig,
                execution
            );
            
            // 3. 获取节点实现类
            WorkflowNode nodeImpl = nodeTypeRegistry.getNodeInstance(nodeType);
            if (nodeImpl == null) {
                throw new RuntimeException("未找到节点实现: " + nodeType);
            }
            
            // 4. 构建执行上下文
            NodeExecutionContext context = buildContext(execution, nodeId);
            
            // 5. 执行节点 ⭐
            NodeInput input = new NodeInput(resolvedConfig);
            NodeExecutionResult result = nodeImpl.execute(input, context);
            
            // 6. 保存节点输出到流程变量 ⭐⭐⭐
            saveNodeOutput(execution, nodeId, resolvedConfig, result);
            
            // 7. 记录日志
            logExecution(execution, nodeId, nodeType, resolvedConfig, result);
            
        } catch (Exception e) {
            // 错误处理
            handleError(execution, nodeId, e);
            throw new RuntimeException("节点执行失败: " + nodeId, e);
        }
    }
    
    /**
     * 保存节点输出到流程变量(关键)⭐⭐⭐
     * 
     * 结构:
     * {
     *   "nodes": {
     *     "node1": {
     *       "status": "success",
     *       "input": {...},
     *       "output": {...},
     *       "startTime": "...",
     *       "endTime": "..."
     *     }
     *   }
     * }
     */
    private void saveNodeOutput(
        DelegateExecution execution,
        String nodeId,
        Map<String, Object> input,
        NodeExecutionResult result
    ) {
        // 获取或创建nodes对象
        Map<String, Object> nodesData = 
            (Map<String, Object>) execution.getVariable("nodes");
        
        if (nodesData == null) {
            nodesData = new HashMap<>();
        }
        
        // 构建节点数据
        Map<String, Object> nodeData = new HashMap<>();
        nodeData.put("status", result.getStatus().name().toLowerCase());
        nodeData.put("input", input);
        nodeData.put("output", result.getOutput());
        nodeData.put("startTime", result.getStartTime().toString());
        nodeData.put("endTime", result.getEndTime().toString());
        
        if (result.getError() != null) {
            nodeData.put("error", result.getError());
        }
        
        // 保存
        nodesData.put(nodeId, nodeData);
        execution.setVariable("nodes", nodesData);
    }
    
    /**
     * 构建执行上下文
     */
    private NodeExecutionContext buildContext(DelegateExecution execution, String nodeId) {
        return NodeExecutionContext.builder()
            .workflowId(execution.getProcessDefinitionId())
            .executionId(execution.getProcessInstanceId())
            .nodeId(nodeId)
            .variables((Map<String, Object>) execution.getVariable("workflow"))
            .nodes((Map<String, Object>) execution.getVariable("nodes"))
            .env(System.getenv())
            .build();
    }
    
    /**
     * 记录执行日志
     */
    private void logExecution(
        DelegateExecution execution,
        String nodeId,
        String nodeType,
        Map<String, Object> input,
        NodeExecutionResult result
    ) {
        // 保存到数据库
        NodeExecutionLog log = new NodeExecutionLog();
        log.setExecutionId(execution.getProcessInstanceId());
        log.setNodeId(nodeId);
        log.setNodeType(nodeType);
        log.setInput(input);
        log.setOutput(result.getOutput());
        log.setStatus(result.getStatus());
        log.setStartedAt(result.getStartTime());
        log.setEndedAt(result.getEndTime());
        
        if (result.getError() != null) {
            log.setErrorMessage(result.getError());
        }
        
        // nodeExecutionLogRepository.save(log);
    }
    
    /**
     * 错误处理
     */
    private void handleError(DelegateExecution execution, String nodeId, Exception e) {
        Map<String, Object> nodesData = 
            (Map<String, Object>) execution.getVariable("nodes");
        
        if (nodesData == null) {
            nodesData = new HashMap<>();
        }
        
        Map<String, Object> nodeData = new HashMap<>();
        nodeData.put("status", "failed");
        nodeData.put("error", e.getMessage());
        nodeData.put("errorType", e.getClass().getSimpleName());
        nodeData.put("endTime", Instant.now().toString());
        
        nodesData.put(nodeId, nodeData);
        execution.setVariable("nodes", nodesData);
    }
    
    /**
     * 获取Field Extension的值
     */
    private String getFieldValue(DelegateExecution execution, String fieldName) {
        // 从当前 ServiceTask 的 FieldExtension 读取
        org.flowable.bpmn.model.FlowElement fe = execution.getCurrentFlowElement();
        if (fe instanceof org.flowable.bpmn.model.ServiceTask) {
            org.flowable.bpmn.model.ServiceTask st = (org.flowable.bpmn.model.ServiceTask) fe;
            return st.getFieldExtensions().stream()
                .filter(f -> fieldName.equals(f.getFieldName()))
                .map(org.flowable.bpmn.model.FieldExtension::getStringValue)
                .findFirst()
                .orElse(null);
        }
        return null;
    }
}

五、REST API 设计

5.1 工作流管理 API

package com.workflow.controller;

import com.workflow.model.*;
import com.workflow.service.WorkflowService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.List;
import java.util.Map;

/**
 * 工作流管理API
 */
@RestController
@RequestMapping("/api/workflows")
public class WorkflowController {
    
    @Autowired
    private WorkflowService workflowService;
    
    /**
     * 创建工作流
     */
    @PostMapping
    public ResponseEntity<WorkflowDefinition> createWorkflow(
        @RequestBody WorkflowDefinition workflow
    ) {
        WorkflowDefinition created = workflowService.create(workflow);
        return ResponseEntity.ok(created);
    }
    
    /**
     * 更新工作流
     */
    @PutMapping("/{id}")
    public ResponseEntity<WorkflowDefinition> updateWorkflow(
        @PathVariable String id,
        @RequestBody WorkflowDefinition workflow
    ) {
        WorkflowDefinition updated = workflowService.update(id, workflow);
        return ResponseEntity.ok(updated);
    }
    
    /**
     * 获取工作流详情
     */
    @GetMapping("/{id}")
    public ResponseEntity<WorkflowDefinition> getWorkflow(@PathVariable String id) {
        WorkflowDefinition workflow = workflowService.getById(id);
        return ResponseEntity.ok(workflow);
    }
    
    /**
     * 获取工作流列表
     */
    @GetMapping
    public ResponseEntity<List<WorkflowDefinition>> listWorkflows(
        @RequestParam(required = false) String status
    ) {
        List<WorkflowDefinition> workflows = workflowService.list(status);
        return ResponseEntity.ok(workflows);
    }
    
    /**
     * 删除工作流
     */
    @DeleteMapping("/{id}")
    public ResponseEntity<Void> deleteWorkflow(@PathVariable String id) {
        workflowService.delete(id);
        return ResponseEntity.noContent().build();
    }
    
    /**
     * 执行工作流
     */
    @PostMapping("/{id}/execute")
    public ResponseEntity<WorkflowExecutionResult> executeWorkflow(
        @PathVariable String id,
        @RequestBody Map<String, Object> input
    ) {
        WorkflowExecutionResult result = workflowService.execute(id, input);
        return ResponseEntity.ok(result);
    }
    
    /**
     * 获取执行记录
     */
    @GetMapping("/{id}/executions")
    public ResponseEntity<List<WorkflowExecutionRecord>> getExecutions(
        @PathVariable String id
    ) {
        List<WorkflowExecutionRecord> executions = workflowService.getExecutions(id);
        return ResponseEntity.ok(executions);
    }
    
    /**
     * 获取单个执行详情
     */
    @GetMapping("/executions/{executionId}")
    public ResponseEntity<WorkflowExecutionDetail> getExecutionDetail(
        @PathVariable String executionId
    ) {
        WorkflowExecutionDetail detail = workflowService.getExecutionDetail(executionId);
        return ResponseEntity.ok(detail);
    }
}

5.2 节点类型 API

package com.workflow.controller;

import com.workflow.model.NodeTypeMetadata;
import com.workflow.registry.NodeTypeRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.List;

/**
 * 节点类型API
 */
@RestController
@RequestMapping("/api/node-types")
public class NodeTypeController {
    
    @Autowired
    private NodeTypeRegistry nodeTypeRegistry;
    
    /**
     * 获取所有节点类型
     */
    @GetMapping
    public ResponseEntity<List<NodeTypeMetadata>> getAllNodeTypes() {
        List<NodeTypeMetadata> nodeTypes = nodeTypeRegistry.getAllNodeTypes();
        return ResponseEntity.ok(nodeTypes);
    }
    
    /**
     * 获取单个节点类型
     */
    @GetMapping("/{typeId}")
    public ResponseEntity<NodeTypeMetadata> getNodeType(@PathVariable String typeId) {
        NodeTypeMetadata metadata = nodeTypeRegistry.getMetadata(typeId);
        return ResponseEntity.ok(metadata);
    }
    
    /**
     * 按分类获取节点类型
     */
    @GetMapping("/category/{category}")
    public ResponseEntity<List<NodeTypeMetadata>> getNodeTypesByCategory(
        @PathVariable String category
    ) {
        List<NodeTypeMetadata> nodeTypes = 
            nodeTypeRegistry.getNodeTypesByCategory(category);
        return ResponseEntity.ok(nodeTypes);
    }
}

5.3 审批任务 API

package com.workflow.controller;

import com.workflow.model.*;
import com.workflow.service.TaskService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.List;
import java.util.Map;

/**
 * 审批任务API
 */
@RestController
@RequestMapping("/api/tasks")
public class TaskController {
    
    @Autowired
    private TaskService taskService;
    
    /**
     * 获取待办任务列表
     */
    @GetMapping
    public ResponseEntity<List<TaskInfo>> getTasks(
        @RequestParam(required = false) String assignee
    ) {
        List<TaskInfo> tasks = taskService.getTasks(assignee);
        return ResponseEntity.ok(tasks);
    }
    
    /**
     * 获取任务详情
     */
    @GetMapping("/{taskId}")
    public ResponseEntity<TaskDetail> getTaskDetail(@PathVariable String taskId) {
        TaskDetail detail = taskService.getTaskDetail(taskId);
        return ResponseEntity.ok(detail);
    }
    
    /**
     * 完成任务(审批)
     */
    @PostMapping("/{taskId}/complete")
    public ResponseEntity<Void> completeTask(
        @PathVariable String taskId,
        @RequestBody Map<String, Object> variables
    ) {
        taskService.complete(taskId, variables);
        return ResponseEntity.ok().build();
    }
    
    /**
     * 获取任务表单
     */
    @GetMapping("/{taskId}/form")
    public ResponseEntity<TaskForm> getTaskForm(@PathVariable String taskId) {
        TaskForm form = taskService.getTaskForm(taskId);
        return ResponseEntity.ok(form);
    }
}

六、关键实现细节(重要)

6.1 如何确保数据一致性?

问题:工作流执行过程中,如果服务重启怎么办?

解决方案

1. Flowable自动持久化
   - 流程状态保存在ACT_RU_*
   - 变量保存在ACT_RU_VARIABLE表
   - 任务保存在ACT_RU_TASK表
   
2. 服务重启后自动恢复
   - Flowable的AsyncExecutor会自动重试未完成的任务
   
3. 幂等性设计
   - 每个节点执行前检查是否已执行
   - 使用节点ID作为幂等键

6.2 如何处理节点执行超时?

实现

@Component
public class TimeoutAwareNodeExecutor implements JavaDelegate {
    
    @Override
    public void execute(DelegateExecution execution) {
        String nodeId = execution.getCurrentActivityId();
        int timeout = getTimeout(execution); // 从配置获取
        
        Future<NodeExecutionResult> future = executorService.submit(() -> {
            return actuallyExecuteNode(execution);
        });
        
        try {
            NodeExecutionResult result = future.get(timeout, TimeUnit.SECONDS);
            saveNodeOutput(execution, nodeId, result);
            
        } catch (TimeoutException e) {
            future.cancel(true);
            throw new RuntimeException("节点执行超时: " + nodeId);
        }
    }
}

6.3 如何防止表达式注入攻击?

风险:用户输入 ${Runtime.getRuntime().exec("rm -rf /")}

防护

@Service
public class SecureExpressionEngine extends ExpressionEngine {
    
    // 禁止访问的类
    private static final Set<String> BLOCKED_CLASSES = Set.of(
        "Runtime",
        "ProcessBuilder",
        "System",
        "Class",
        "ClassLoader"
    );
    
    @Override
    public Object evaluate(String expression, DelegateExecution execution) {
        // 1. 检查是否包含危险类
        for (String blocked : BLOCKED_CLASSES) {
            if (expression.contains(blocked)) {
                throw new SecurityException("表达式包含禁止使用的类: " + blocked);
            }
        }
        
        // 2. 限制表达式长度
        if (expression.length() > 1000) {
            throw new SecurityException("表达式过长");
        }
        
        // 3. 正常解析
        return super.evaluate(expression, execution);
    }
}

七、测试策略

7.1 单元测试

@SpringBootTest
public class ExpressionEngineTest {
    
    @Autowired
    private ExpressionEngine expressionEngine;
    
    @Mock
    private DelegateExecution execution;
    
    @Test
    public void testSimpleExpression() {
        // 准备上下文
        Map<String, Object> nodes = Map.of(
            "node1", Map.of(
                "output", Map.of(
                    "body", Map.of("email", "test@example.com")
                )
            )
        );
        when(execution.getVariable("nodes")).thenReturn(nodes);
        
        // 执行
        String result = (String) expressionEngine.evaluate(
            "${nodes.node1.output.body.email}",
            execution
        );
        
        // 验证
        assertEquals("test@example.com", result);
    }
}

7.2 集成测试

@SpringBootTest
public class WorkflowIntegrationTest {
    
    @Autowired
    private WorkflowService workflowService;
    
    @Test
    public void testCompleteWorkflow() {
        // 1. 创建工作流
        WorkflowDefinition workflow = createTestWorkflow();
        WorkflowDefinition created = workflowService.create(workflow);
        
        // 2. 执行
        Map<String, Object> input = Map.of("username", "testuser");
        WorkflowExecutionResult result = workflowService.execute(created.getId(), input);
        
        // 3. 验证
        assertEquals("completed", result.getStatus());
        assertNotNull(result.getOutput());
    }
}

八、部署配置

8.1 Docker Compose

# docker-compose.yml
version: '3.8'

services:
  mysql:
    image: mysql:8
    environment:
      MYSQL_DATABASE: workflow_db
      MYSQL_ROOT_PASSWORD: root
    ports:
      - "3306:3306"
    command: ["--default-authentication-plugin=mysql_native_password", "--character-set-server=utf8mb4", "--collation-server=utf8mb4_unicode_ci"]
    volumes:
      - mysql_data:/var/lib/mysql
  
  redis:
    image: redis:7
    ports:
      - "6379:6379"
  
  workflow-backend:
    build: ./backend
    ports:
      - "8080:8080"
    environment:
      SPRING_DATASOURCE_URL: jdbc:mysql://mysql:3306/workflow_db?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC
      SPRING_DATASOURCE_USERNAME: root
      SPRING_DATASOURCE_PASSWORD: root
      SPRING_REDIS_HOST: redis
    depends_on:
      - mysql
      - redis

volumes:
  mysql_data:

8.2 生产环境配置

# application-prod.yml
spring:
  datasource:
    hikari:
      maximum-pool-size: 50
      minimum-idle: 10
  
flowable:
  async-executor-core-pool-size: 20
  async-executor-max-pool-size: 100

logging:
  level:
    com.workflow: INFO
    org.flowable: WARN

下一步:查看 03-前端技术设计.md