# 后端技术设计文档
**版本**: v1.0
**关联**: 01-架构总览.md
---
## 一、技术栈详细说明
### 1.1 核心依赖
```xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
        <version>3.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.flowable</groupId>
        <artifactId>flowable-spring-boot-starter</artifactId>
        <version>7.0.1</version>
    </dependency>
    <dependency>
        <groupId>com.mysql</groupId>
        <artifactId>mysql-connector-j</artifactId>
        <version>8.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    <dependency>
        <groupId>jakarta.el</groupId>
        <artifactId>jakarta.el-api</artifactId>
        <version>5.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.glassfish</groupId>
        <artifactId>jakarta.el</artifactId>
        <version>5.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-mail</artifactId>
    </dependency>
</dependencies>
```
### 1.2 配置文件
```yaml
# application.yml
spring:
  application:
    name: workflow-platform

  # 数据源配置(外部 MySQL,不使用 Docker)
  datasource:
    url: jdbc:mysql://172.22.222.111:3306/flowable-devops?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC
    username: root # 请按实际用户名填写
    password: ${SPRING_DATASOURCE_PASSWORD} # 请通过环境变量安全注入密码
    driver-class-name: com.mysql.cj.jdbc.Driver
    hikari:
      maximum-pool-size: 20
      minimum-idle: 5
      connection-timeout: 30000

  # Redis配置(外部 Redis,不使用 Docker)
  # 注意:Spring Boot 3.x 中 Redis 配置位于 spring.data.redis(spring.redis 已废弃)
  data:
    redis:
      host: 172.22.222.111
      port: 6379
      database: 5
      password: ${SPRING_REDIS_PASSWORD}
      timeout: 5000

  # 邮件配置
  mail:
    host: smtp.example.com
    port: 587
    username: noreply@example.com
    password: ${SMTP_PASSWORD}
    properties:
      mail.smtp.auth: true
      mail.smtp.starttls.enable: true

# Flowable配置(MVP 为同步执行,关闭全局异步执行器)
flowable:
  process-definition-location-prefix: classpath*:/processes/
  database-schema-update: true
  async-executor-activate: false

# 应用配置
app:
  workflow:
    expression-cache-size: 1000
    node-execution-timeout: 300
    enable-execution-logging: true
```
---
## 二、数据模型设计
### 2.1 业务表设计(非Flowable表)
#### 表1: workflow_definitions(工作流定义)
```sql
-- Workflow definitions authored in the visual designer.
-- Target engine: MySQL 8.x (see datasource config).
CREATE TABLE workflow_definitions (
    id VARCHAR(64) PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    description TEXT,

    -- Designer graph stored as JSON
    definition JSON NOT NULL,
    /* JSON structure:
    {
        "nodes": [
            {
                "id": "node1",
                "type": "http_request",
                "name": "Get User",
                "position": {"x": 100, "y": 100},
                "config": {
                    "url": "https://api.example.com",
                    "method": "GET"
                }
            }
        ],
        "edges": [
            {"source": "node1", "target": "node2"}
        ],
        "variables": {}
    }
    */

    -- Flowable identifiers, filled in once the JSON graph has been
    -- converted to BPMN and deployed
    flowable_process_definition_id VARCHAR(64),
    flowable_deployment_id VARCHAR(64),

    -- Lifecycle status
    status VARCHAR(20) DEFAULT 'draft', -- draft, active, archived

    -- Audit metadata
    created_by VARCHAR(100),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    -- Fix: without ON UPDATE CURRENT_TIMESTAMP the column would keep its
    -- insert-time value forever on MySQL
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

    CONSTRAINT chk_status CHECK (status IN ('draft', 'active', 'archived'))
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Indexes for the two common list filters
CREATE INDEX idx_workflow_status ON workflow_definitions(status);
CREATE INDEX idx_workflow_created_at ON workflow_definitions(created_at);
#### 表2: node_types(节点类型元数据)
```sql
-- Node type metadata: drives both the designer palette and runtime dispatch.
CREATE TABLE node_types (
    id VARCHAR(64) PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    display_name VARCHAR(255) NOT NULL,
    category VARCHAR(50), -- api, database, logic, notification
    icon VARCHAR(100),
    description TEXT,

    -- Field definitions rendered by the designer (JSON Schema-like)
    fields JSON NOT NULL,
    /* JSON structure:
    [
        {
            "name": "url",
            "label": "URL",
            "type": "text",
            "required": true,
            "supportsExpression": true,
            "placeholder": "https://api.example.com"
        },
        {
            "name": "method",
            "label": "Method",
            "type": "select",
            "options": ["GET", "POST", "PUT", "DELETE"],
            "defaultValue": "GET"
        }
    ]
    */

    -- Output structure definition (JSON Schema), used for expression hints
    output_schema JSON,
    /* JSON structure:
    {
        "type": "object",
        "properties": {
            "statusCode": {"type": "number"},
            "body": {"type": "object"},
            "headers": {"type": "object"}
        }
    }
    */

    -- Fully-qualified Java implementation class
    implementation_class VARCHAR(255) NOT NULL,

    -- Soft enable/disable switch
    enabled BOOLEAN DEFAULT true,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    -- Fix: auto-refresh on every UPDATE (MySQL)
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Built-in node type seed data.
-- ON DUPLICATE KEY UPDATE id = id makes the seed idempotent: re-running the
-- script is a no-op instead of a duplicate-key error.
INSERT INTO node_types (id, name, display_name, category, icon, fields, output_schema, implementation_class) VALUES
('http_request', 'httpRequest', 'HTTP Request', 'api', 'api',
'[{"name":"url","label":"URL","type":"text","required":true,"supportsExpression":true}]',
'{"type":"object","properties":{"statusCode":{"type":"number"},"body":{"type":"object"}}}',
'com.workflow.nodes.HttpRequestNode')
ON DUPLICATE KEY UPDATE id = id;
#### 表3: workflow_executions(工作流执行记录扩展)
```sql
-- One row per workflow run; complements Flowable's own history tables.
CREATE TABLE workflow_executions (
    id VARCHAR(64) PRIMARY KEY,
    -- NOTE(review): workflow_id appears to duplicate workflow_definition_id;
    -- kept for compatibility — confirm intent with the schema owner.
    workflow_id VARCHAR(64) NOT NULL,
    workflow_definition_id VARCHAR(64) NOT NULL,

    -- Flowable process instance id (correlation key for engine callbacks)
    flowable_process_instance_id VARCHAR(64) NOT NULL,

    -- Input parameters supplied at start
    input JSON,

    -- Execution status
    status VARCHAR(20) DEFAULT 'running', -- running, completed, failed, cancelled

    -- Start / end instants
    started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    ended_at TIMESTAMP,

    -- Failure details
    error_message TEXT,
    error_stack TEXT,

    -- How the run was triggered
    trigger_type VARCHAR(20), -- manual, cron, webhook
    triggered_by VARCHAR(100),

    -- Named constraints so migration/error output is greppable
    CONSTRAINT fk_execution_workflow_definition
        FOREIGN KEY (workflow_definition_id) REFERENCES workflow_definitions(id),
    CONSTRAINT chk_execution_status CHECK (status IN ('running', 'completed', 'failed', 'cancelled'))
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Indexes
CREATE INDEX idx_execution_workflow ON workflow_executions(workflow_definition_id);
CREATE INDEX idx_execution_status ON workflow_executions(status);
CREATE INDEX idx_execution_started_at ON workflow_executions(started_at);
-- Added: engine callbacks and status queries look executions up by the
-- Flowable process-instance id, which had no index.
CREATE INDEX idx_execution_flowable_instance ON workflow_executions(flowable_process_instance_id);
#### 表4: node_execution_logs(节点执行日志)
```sql
-- Per-node execution trace for a workflow run.
CREATE TABLE node_execution_logs (
    -- Fix: BIGSERIAL is PostgreSQL syntax; this schema targets MySQL
    -- (jdbc:mysql datasource), whose equivalent is BIGINT AUTO_INCREMENT.
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    execution_id VARCHAR(64) NOT NULL,
    node_id VARCHAR(64) NOT NULL,
    node_name VARCHAR(255),
    node_type VARCHAR(64),

    -- Resolved input / produced output
    input JSON,
    output JSON,

    -- Node outcome
    status VARCHAR(20), -- success, failed, skipped

    -- Timing
    started_at TIMESTAMP,
    ended_at TIMESTAMP,
    duration_ms INTEGER,

    -- Failure detail
    error_message TEXT,

    CONSTRAINT fk_node_log_execution
        FOREIGN KEY (execution_id) REFERENCES workflow_executions(id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Indexes
CREATE INDEX idx_node_log_execution ON node_execution_logs(execution_id);
CREATE INDEX idx_node_log_status ON node_execution_logs(status);
### 2.2 Flowable表说明(不需要创建,自动生成)
```sql
-- Flowable会自动创建这些表,我们只需要知道它们的用途
-- 流程定义相关
ACT_RE_DEPLOYMENT -- 部署信息
ACT_RE_PROCDEF -- 流程定义
ACT_RE_MODEL -- 模型信息
-- 运行时数据
ACT_RU_EXECUTION -- 流程实例/执行
ACT_RU_TASK -- 任务(User Task)⭐
ACT_RU_VARIABLE -- 流程变量(我们的上下文数据)⭐
ACT_RU_JOB -- 异步任务
-- 历史数据
ACT_HI_PROCINST -- 流程实例历史
ACT_HI_TASKINST -- 任务历史
ACT_HI_VARINST -- 变量历史
ACT_HI_ACTINST -- 活动历史(每个节点的执行记录)⭐
```
---
## 三、核心模块实现
### 3.1 节点类型注册系统
#### NodeTypeRegistry.java
```java
package com.workflow.registry;

import com.workflow.model.NodeTypeMetadata;
import com.workflow.nodes.WorkflowNode;
import com.workflow.repository.NodeTypeRepository;
// Fix: Spring Boot 3.x is Jakarta EE based; javax.annotation.PostConstruct
// is no longer on the classpath — the jakarta variant must be used.
import jakarta.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Node type registry.
 *
 * Responsibilities:
 * 1. Scan and register every node type at startup
 * 2. Serve node type lookups
 * 3. Manage node metadata
 */
@Service
public class NodeTypeRegistry {

    // Fix: generic type arguments were lost in extraction; restored so the
    // class compiles without raw-type warnings and with type safety.
    /** Node metadata keyed by node type id. */
    private final Map<String, NodeTypeMetadata> nodeTypes = new ConcurrentHashMap<>();

    /** Executable node instances keyed by node type id. */
    private final Map<String, WorkflowNode> nodeInstances = new ConcurrentHashMap<>();

    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private NodeTypeRepository repository;

    /**
     * Scans and registers node types automatically on startup.
     */
    @PostConstruct
    public void init() {
        // 1. Register every Spring bean implementing WorkflowNode
        Map<String, WorkflowNode> beans = applicationContext.getBeansOfType(WorkflowNode.class);
        for (WorkflowNode node : beans.values()) {
            registerNode(node);
        }
        // 2. Overlay metadata stored in the database (supports dynamic registration)
        List<NodeTypeMetadata> dbNodeTypes = repository.findAll();
        for (NodeTypeMetadata metadata : dbNodeTypes) {
            nodeTypes.put(metadata.getId(), metadata);
        }
        System.out.println("✅ 已注册 " + nodeTypes.size() + " 个节点类型");
    }

    /**
     * Registers a single node: validates, caches, and persists its metadata.
     */
    private void registerNode(WorkflowNode node) {
        NodeTypeMetadata metadata = node.getMetadata();
        // Validate required fields
        if (metadata.getId() == null || metadata.getImplementationClass() == null) {
            throw new IllegalArgumentException("节点元数据不完整: " + metadata);
        }
        // Cache metadata and the executable instance
        nodeTypes.put(metadata.getId(), metadata);
        nodeInstances.put(metadata.getId(), node);
        // Persist so the frontend can read the catalog from the database
        repository.save(metadata);
        System.out.println(" - " + metadata.getDisplayName() + " (" + metadata.getId() + ")");
    }

    /**
     * Returns the metadata for a node type id, or null if unknown.
     */
    public NodeTypeMetadata getMetadata(String typeId) {
        return nodeTypes.get(typeId);
    }

    /**
     * Returns the executable node instance for a type id (used at runtime).
     */
    public WorkflowNode getNodeInstance(String typeId) {
        return nodeInstances.get(typeId);
    }

    /**
     * Returns all registered node types (consumed by the frontend palette).
     */
    public List<NodeTypeMetadata> getAllNodeTypes() {
        return new ArrayList<>(nodeTypes.values());
    }

    /**
     * Returns the node types belonging to the given category.
     */
    public List<NodeTypeMetadata> getNodeTypesByCategory(String category) {
        return nodeTypes.values().stream()
                .filter(meta -> category.equals(meta.getCategory()))
                .toList();
    }
}
```
### 3.2 表达式引擎
#### ExpressionEngine.java
```java
package com.workflow.engine;
import org.flowable.engine.delegate.DelegateExecution;
import org.springframework.stereotype.Service;
import jakarta.el.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* 表达式解析引擎
*
* 支持语法:
* ${nodes.node1.output.body.email}
* ${workflow.input.username}
* ${env.API_KEY}
* ${nodes.step1.output.count > 10 ? 'high' : 'low'}
*/
@Service
public class ExpressionEngine {
// Factory used both to compile expressions and to wrap injected variables.
private final ExpressionFactory expressionFactory;

// Compiled-expression cache keyed by the raw expression string.
// Fix: generic type arguments were lost in extraction; restored as
// Map<String, ValueExpression> for type safety.
// NOTE(review): a cached ValueExpression captures the VariableMapper it was
// compiled against — see evaluateSingle before relying on this cache.
private final Map<String, ValueExpression> expressionCache = new ConcurrentHashMap<>();

public ExpressionEngine() {
    // Discovers the Jakarta EL implementation (org.glassfish:jakarta.el).
    this.expressionFactory = ExpressionFactory.newInstance();
}
/**
* 解析单个表达式
*
* @param expression 表达式字符串,例如: "${nodes.node1.output.body.email}"
* @param execution Flowable执行上下文
* @return 解析后的值
*/
public Object evaluate(String expression, DelegateExecution execution) {
    if (expression == null) {
        return null;
    }
    // Fast path: no "${" anywhere means there is nothing to resolve.
    if (!expression.contains("${")) {
        return expression;
    }
    Pattern placeholder = Pattern.compile("\\$\\{([^}]+)\\}");
    Matcher m = placeholder.matcher(expression);
    // A string that is exactly one ${...} keeps the evaluated value's type
    // (e.g. a Map or number), so it is returned without stringification.
    if (m.matches()) {
        return evaluateSingle(expression, execution);
    }
    // Mixed text: replace each placeholder with its string form.
    m.reset();
    StringBuffer rendered = new StringBuffer();
    while (m.find()) {
        Object value = evaluateSingle(m.group(0), execution);
        m.appendReplacement(rendered, Matcher.quoteReplacement(String.valueOf(value)));
    }
    m.appendTail(rendered);
    return rendered.toString();
}
/**
* 解析单个完整表达式
*/
private Object evaluateSingle(String expression, DelegateExecution execution) {
    try {
        // Build a context carrying THIS execution's variables.
        StandardELContext context = createContext(execution);
        // Fix: the EL spec resolves the VariableMapper at PARSE time, so a
        // ValueExpression compiled against one execution's context is
        // permanently bound to that execution's nodes/workflow maps. The
        // original code cached such expressions and re-evaluated them for
        // later executions, silently returning stale data. Compile against
        // the current context on every call instead; the cross-execution
        // cache (expressionCache) must not be used for variable-bearing
        // expressions.
        ValueExpression expr =
                expressionFactory.createValueExpression(context, expression, Object.class);
        return expr.getValue(context);
    } catch (Exception e) {
        throw new ExpressionEvaluationException(
            "表达式解析失败: " + expression + ", 错误: " + e.getMessage(),
            e
        );
    }
}
/**
* 创建EL上下文(注入变量)⭐⭐⭐
*/
private StandardELContext createContext(DelegateExecution execution) {
    StandardELContext ctx = new StandardELContext(expressionFactory);
    // Expose the two optional process variables ("nodes": per-node outputs,
    // "workflow": workflow input/variables) as EL variables when present.
    for (String varName : new String[] {"nodes", "workflow"}) {
        Map data = (Map) execution.getVariable(varName);
        if (data != null) {
            ValueExpression wrapped =
                    expressionFactory.createValueExpression(data, Map.class);
            ctx.getVariableMapper().setVariable(varName, wrapped);
        }
    }
    // "env" is always available and resolves to the JVM's environment map.
    ValueExpression envWrapped =
            expressionFactory.createValueExpression(System.getenv(), Map.class);
    ctx.getVariableMapper().setVariable("env", envWrapped);
    return ctx;
}
/**
* 批量解析对象中的所有表达式(递归)
*/
public Map resolveObject(Map input, DelegateExecution execution) {
Map result = new HashMap<>();
for (Map.Entry entry : input.entrySet()) {
Object value = entry.getValue();
if (value instanceof String) {
// 解析字符串表达式
result.put(entry.getKey(), evaluate((String) value, execution));
} else if (value instanceof Map) {
// 递归解析嵌套对象
result.put(entry.getKey(), resolveObject((Map) value, execution));
} else if (value instanceof List) {
// 解析数组
List> list = (List>) value;
List