42 KiB
42 KiB
工作流引擎架构文档
1. 系统架构
1.1 整体架构
com.qqchen.deploy.backend.workflow
├── engine/ # 工作流引擎核心
│ ├── context/ # 工作流上下文
│ ├── executor/ # 节点执行器
│ ├── parser/ # 工作流定义解析器
│ ├── transition/ # 流转控制
│ └── node/ # 节点实现
├── entity/ # 实体类
├── repository/ # 数据访问层
├── service/ # 业务逻辑层
├── dto/ # 数据传输对象
├── converter/ # 对象转换器
├── query/ # 查询条件
└── enums/ # 枚举定义
1.2 核心模块说明
1.2.1 工作流引擎核心 (engine)
- WorkflowEngine: 工作流引擎接口,定义工作流生命周期管理方法
- DefaultWorkflowEngine: 默认实现,负责工作流实例的创建、执行、暂停、恢复等
- WorkflowContext: 工作流上下文,管理工作流执行过程中的变量和状态
- NodeExecutor: 节点执行器接口,不同类型节点的执行实现
- TransitionExecutor: 流转执行器,负责节点间的流转逻辑
1.2.2 实体模型 (entity)
- WorkflowDefinition: 工作流定义,包含节点和流转配置
- WorkflowInstance: 工作流实例,工作流定义的运行时实例
- NodeInstance: 节点实例,工作流中具体节点的运行时实例
- NodeConfig: 节点配置,定义节点的属性和行为
- TransitionConfig: 流转配置,定义节点间的连接和条件
- WorkflowVariable: 工作流变量,存储工作流执行过程中的数据
- WorkflowLog: 工作流日志,记录工作流执行过程
1.2.3 数据访问层 (repository)
- IWorkflowDefinitionRepository: 工作流定义仓库
- IWorkflowInstanceRepository: 工作流实例仓库
- INodeInstanceRepository: 节点实例仓库
- IWorkflowVariableRepository: 工作流变量仓库
- IWorkflowLogRepository: 工作流日志仓库
1.2.4 业务逻辑层 (service)
- IWorkflowDefinitionService: 工作流定义服务
- IWorkflowInstanceService: 工作流实例服务
- IWorkflowVariableService: 工作流变量服务
- IWorkflowLogService: 工作流日志服务
2. 已实现功能
2.1 工作流定义
- 工作流基本信息管理(创建、修改、删除)
- 工作流状态管理(草稿、发布、禁
- 节点配置管理
- 流转配置管理
- 配置验证
2.2 工作流执行
- 工作流实例创建
- 节点执行框架
- 基本流转控制
- 变量管理
- 日志记录
2.3 节点类型
- 开始节点
- 结束节点
- 任务节点
- Shell执行器
- HTTP执行器
- Java执行器
- 网关节点
- 排他网关
- 并行网关
- 包容网关
2.4 基础设施
- 数据库表结构
- 基础异常处理
- 基本日志记录
- DTO/Entity转换
3. 待改进功能
3.1 核心功能完善
3.1.1 工作流定义
- 工作流定义版本控制
- 工作流模板管理
- 工作流导入导出
- 工作流克隆
- 表单配置管理
3.1.2 工作流执行
- 工作流实例暂停/恢复
- 子流程支持
- 定时调度
- 条件表达式引擎优化
- 变量作用域隔离
- 变量类型自动转换
- 变量生命周期管理
3.1.3 节点功能
- 节点执行重试机制
- 节点超时控制
- 节点执行资源限制
- Shell执行安全控制
- HTTP请求重试配置
- 自定义节点扩展机制
3.2 性能优化
3.2.1 数据库优化
- 添加必要索引
- NodeConfig表工作定ID索引
- NodeConfig表节点类型索引
- TransitionConfig表源节点和目标节点索引
- 大表分表策略
- 工作流实例表
- 节点实例表
- 工作流日志表
- 数据库连接池优化
3.2.2 并发控制
- 工作流实例锁机制
- 节点实例锁机制
- 分布式锁实现
- 并发限制
3.2.3 缓存优化
- 工作流定义缓存
- 节点配置缓存
- 变量缓存
3.3 可用性提升
3.3.1 高可用
- 工作流热备份
- 故障自动恢复
- 限流熔断
- 任务队列
3.3.2 监控运维
- 工作流监控指标
- 性能统计
- 日志归档
- 数据清理策略
3.4 安全性增强
3.4.1 访问控制
- 工作流权限管理
- 节点权限控制
- 变量访问控制
- 操作审计日志
3.4.2 数据安全
- 敏感数据加密
- 节点执行隔离
- 资源使用限制
- 安全规则配置
4. 技术栈
4.1 基础框架
- Spring Boot
- Spring Data JPA
- Hibernate
- MapStruct
4.2 数据库
- MySQL
- Flyway (数据库版本管理)
4.3 工具库
- Jackson (JSON处理)
- Apache Commons
- Lombok
5. 开发规范
5.1 代码规范
- 遵循阿里巴巴Java开发手册
- 使用统一的代码格式化配置
- 必要的注释和文档
5.2 命名规范
- 包名:com.qqchen.deploy.backend.workflow.*
- 类名:驼峰命名,见名知意
- 方法名:动词开头,驼峰命名
- 变量名:驼峰命名,有意义的名称
5.3 异常处理
- 使用自定义异常类
- 统一的错误码管理
- 详细的错误信息
5.4 日志规范
- 使用SLF4J + Logback
- 分级别记录日志
- 包含必要的上下文信息
6. 部署要求
6.1 环境要求
- JDK 17+
- MySQL 8.0+
- Maven 3.8+
6.2 配置要求
- 数据库连接配置
- 线程池配置
- 日志配置
- 缓存配置
6.3 监控要求
- JVM监控
- 数据库监控
- 业务监控
- 日志监控
7. API接口说明
7.1 工作流定义接口
7.1.1 已实现接口
- 暂无已实现的HTTP接口,但Service层已实现以下功能:
- 创建工作流定义
- 更新工作流定义
- 删除工作流定义
- 查询工作流定义列表
- 获取工作流定义详情
7.2 工作流实例接口
7.2.1 已实现接口
- 暂无已实现的HTTP接口,但Service层已实现以下功能:
- 创建工作流实例
- 查询工作流实例列表
- 获取工作流实例详情
7.3 节点实例接口
7.3.1 已实现接口
- 暂无已实现的HTTP接口,但Service层已实现以下功能:
- 查询节点实例列表
- 获取节点实例详情
7.4 工作流变量接口
7.4.1 已实现接口
- 暂无已实现的HTTP接口,但Service层已实现以下功能:
- 查询工作流变量列表
7.5 工作流日志接口
7.5.1 已实现接口
- 暂无已实现的HTTP接口,但Service层已实现以下功能:
- 查询工作流日志列表
7.6 数据结构说明
7.6.1 工作流定义相关
- WorkflowDefinitionDTO
{
Long id; // 主键ID
String name; // 工作流名称
String description; // 工作流描述
WorkflowStatusEnum status; // 状态(DRAFT/PUBLISHED/DISABLED)
List<NodeConfig> nodes; // 节点配置列表
List<TransitionConfig> transitions; // 流转配置列表
String createBy; // 创建人
LocalDateTime createTime; // 创建时间
String updateBy; // 更新人
LocalDateTime updateTime; // 更新时间
Integer version; // 版本号
}
- NodeConfig
{
String id; // 节点ID
String name; // 节点名称
NodeTypeEnum type; // 节点类型
Map<String, Object> config; // 节点配置(JSON)
}
- TransitionConfig
{
String id; // 流转ID
String sourceNodeId; // 源节点ID
String targetNodeId; // 目标节点ID
String condition; // 流转条件
}
7.6.2 工作流实例相关
- WorkflowInstanceDTO
{
Long id; // 主键ID
Long definitionId; // 工作流定义ID
String name; // 工作流名称
WorkflowStatusEnum status; // 状态
String createBy; // 创建人
LocalDateTime createTime; // 创建时间
String updateBy; // 更新人
LocalDateTime updateTime; // 更新时间
}
- NodeInstanceDTO
{
Long id; // 主键ID
Long workflowInstanceId; // 工作流实例ID
String nodeId; // 节点定义ID
String nodeName; // 节点名称
NodeTypeEnum nodeType; // 节点类型
NodeStatusEnum status; // 节点状态
LocalDateTime startTime; // 开始时间
LocalDateTime endTime; // 结束时间
String error; // 错误信息
}
7.6.3 工作流变量相关
- WorkflowVariableDTO
{
Long id; // 主键ID
Long workflowInstanceId; // 工作流实例ID
Long nodeInstanceId; // 节点实例ID
String name; // 变量名
String value; // 变量值
VariableScopeEnum scope; // 作用域
}
7.6.4 工作流日志相关
- WorkflowLogDTO
{
Long id; // 主键ID
Long workflowInstanceId; // 工作流实例ID
Long nodeInstanceId; // 节点实例ID
String level; // 日志级别
String content; // 日志内容
LocalDateTime createTime; // 创建时间
}
7.7 枚举值说明
- WorkflowStatusEnum
{
DRAFT, // 草稿
PUBLISHED, // 已发布
DISABLED // 已禁用
}
- NodeTypeEnum
{
START, // 开始节点
END, // 结束节点
TASK, // 任务节点
GATEWAY // 网关节点
}
- NodeStatusEnum
{
PENDING, // 待执行
RUNNING, // 执行中
COMPLETED, // 已完成
FAILED, // 执行失败
SKIPPED // 已跳过
}
- VariableScopeEnum
{
GLOBAL, // 全局变量
WORKFLOW, // 工作流级变量
NODE // 节点级变量
}
7.8 错误码说明
- 系统错误 (1xxx)
1000: 系统内<E7BB9F><E58685>错误
1001: 数据库操作失败
1002: 并发操作冲突
- 工作流定义错误 (2xxx)
2000: 工作流定义不存在
2001: 工作流定义名称重复
2002: 非法状态转换
2003: 节点配置无效
2004: 流转配置无效
- 工作流实例错误 (3xxx)
3000: 工作流实例不存在
3001: 实例状态不允许操作
3002: 节点执行失败
3003: 变量不存在
- 权限错误 (4xxx)
4000: 无操作权限
4001: 用户未认证
4002: 用户未授权
8. 详细设计
8.1 工作流引擎核心设计
8.1.1 工作流定义解析
- 解析流程
WorkflowDefinition (JSON) -> WorkflowDefinitionParser -> RuntimeWorkflow
- 主要组件
- WorkflowDefinitionParser: 负责解析工作流定义
- NodeParser: 负责解析节点配置
- TransitionParser: 负责解析流转配置
- ValidatorChain: 配置验证链
- 验证规则
- 节点完整性验证
- 流转完整性验证
- 起始节点验证
- 结束节点验证
- 环路检测
8.1.2 工作流执行引擎
- 核心接口
public interface WorkflowEngine {
// 创建工作流实例
WorkflowInstance createWorkflowInstance(Long definitionId, Map<String, Object> variables);
// 执行工作流实例
void executeWorkflowInstance(Long instanceId);
// 暂停工作流实例
void suspendWorkflowInstance(Long instanceId);
// 恢复工作流实例
void resumeWorkflowInstance(Long instanceId);
// 终止工作流实例
void terminateWorkflowInstance(Long instanceId);
}
- 执行流程
开始
↓
加载工作流定义
↓
创建工作流上下文
↓
执行开始节点
↓
while (存在待执行节点) {
获取下一个节点
执行节点
更新节点状态
处理节点输出
确定下一个节点
}
↓
执行结束节点
↓
结束
- 状态管理
- 工作流状态机
PENDING → RUNNING → COMPLETED
↓ ↓ ↑
└→ FAILED ←┘ CANCELLED
- 节点状态机
PENDING → RUNNING → COMPLETED
↓ ↓ ↑
└→ FAILED ←┘ SKIPPED
8.1.3 节点执行器
- 基础抽象类
public abstract class AbstractNodeExecutor implements NodeExecutor {
// 执行前处理
protected void beforeExecute(NodeInstance node, WorkflowContext context);
// 执行节点
protected abstract ExecuteResult doExecute(NodeInstance node, WorkflowContext context);
// 执行后处理
protected void afterExecute(NodeInstance node, WorkflowContext context, ExecuteResult result);
// 异常处理
protected void handleException(NodeInstance node, WorkflowContext context, Exception e);
}
- 内置执行器
- StartNodeExecutor: 启动节点执行器
- EndNodeExecutor: 结束节点执行器
- TaskNodeExecutor: 任务节点执行器
- GatewayNodeExecutor: 网关节点执行器
- 任务执行器
- ShellTaskExecutor: Shell命令执行器
- HttpTaskExecutor: HTTP请求执行器
- JavaTaskExecutor: Java代码执行器
8.1.4 流转控制
- 流转规则引擎
public interface TransitionRuleEngine {
// 评估流转条件
boolean evaluate(String condition, WorkflowContext context);
// 获取下一个节点
List<String> getNextNodes(String currentNodeId, WorkflowContext context);
}
- 条件表达式
- 支持SpEL表达式
- 支持变量引用
- 支持函数调用
- 支持逻辑运算
- 网关类型
- 排他网关(XOR):只选择一个分支
- 并行网关(AND):并行执行所有分支
- 包容网关(OR):选择满足条件的所有分支
8.2 数据模型设计
8.2.1 工作流定义相关表
- 工作流定义表 (wf_workflow_definition)
CREATE TABLE wf_workflow_definition (
id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '主键ID',
create_by VARCHAR(255) NULL COMMENT '创建人',
create_time DATETIME(6) NULL COMMENT '创建时间',
deleted BIT NOT NULL DEFAULT 0 COMMENT '是否删除',
update_by VARCHAR(255) NULL COMMENT '更新人',
update_time DATETIME(6) NULL COMMENT '更新时间',
version INT NOT NULL DEFAULT 0 COMMENT '乐观锁版本号',
code VARCHAR(100) NOT NULL COMMENT '工作流编码',
name VARCHAR(100) NOT NULL COMMENT '工作流名称',
description VARCHAR(255) NULL COMMENT '工作流描述',
status VARCHAR(20) NOT NULL COMMENT '工作流状态',
version_no INT NOT NULL DEFAULT 1 COMMENT '版本号',
node_config TEXT NULL COMMENT '节点配置(JSON)',
transition_config TEXT NULL COMMENT '流转配置(JSON)',
form_definition TEXT NULL COMMENT '表单定义(JSON)',
graph_definition TEXT NULL COMMENT '图形信息(JSON)',
enabled BIT NOT NULL DEFAULT 1 COMMENT '是否启用',
CONSTRAINT UK_workflow_definition_code_version UNIQUE (code, version_no)
);
- 节点定义表 (wf_node_definition)
CREATE TABLE wf_node_definition (
id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '主键ID',
create_by VARCHAR(255) NULL COMMENT '创建人',
create_time DATETIME(6) NULL COMMENT '创建时间',
deleted BIT NOT NULL DEFAULT 0 COMMENT '是否删除',
update_by VARCHAR(255) NULL COMMENT '更新人',
update_time DATETIME(6) NULL COMMENT '更新时间',
version INT NOT NULL DEFAULT 0 COMMENT '乐观锁版本号',
workflow_definition_id BIGINT NOT NULL COMMENT '工作流定义ID',
node_id VARCHAR(100) NOT NULL COMMENT '节点ID',
name VARCHAR(100) NOT NULL COMMENT '节点名称',
type VARCHAR(20) NOT NULL COMMENT '节点类型',
config TEXT NULL COMMENT '节点配置(JSON)',
order_num INT NOT NULL DEFAULT 0 COMMENT '排序号',
CONSTRAINT FK_node_definition_workflow FOREIGN KEY (workflow_definition_id)
REFERENCES wf_workflow_definition (id),
CONSTRAINT UK_node_definition_workflow_node UNIQUE (workflow_definition_id, node_id)
);
- 节点类型表 (wf_node_type)
CREATE TABLE wf_node_type (
id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '主键ID',
create_by VARCHAR(255) NULL COMMENT '创建人',
create_time DATETIME(6) NULL COMMENT '创建时间',
deleted BIT NOT NULL DEFAULT 0 COMMENT '是否删除',
update_by VARCHAR(255) NULL COMMENT '更新人',
update_time DATETIME(6) NULL COMMENT '更新时间',
version INT NOT NULL DEFAULT 0 COMMENT '乐观锁版本号',
code VARCHAR(100) NOT NULL COMMENT '节点类型编码',
name VARCHAR(100) NOT NULL COMMENT '节点类型名称',
description TEXT NULL COMMENT '节点类型描述',
category VARCHAR(50) NOT NULL COMMENT '节点类型分类',
icon VARCHAR(100) NULL COMMENT '节点图标',
color VARCHAR(20) NULL COMMENT '节点颜色',
executors TEXT NULL COMMENT '执行器列表(JSON)',
config_schema TEXT NULL COMMENT '节点配置模式(JSON)',
default_config TEXT NULL COMMENT '默认配置(JSON)',
enabled BIT NOT NULL DEFAULT 1 COMMENT '是否启用',
CONSTRAINT UK_node_type_code UNIQUE (code)
);
8.2.2 工作流实例相关表
- 工作流实例表 (wf_workflow_instance)
CREATE TABLE wf_workflow_instance (
id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '主键ID',
create_by VARCHAR(255) NULL COMMENT '创建人',
create_time DATETIME(6) NULL COMMENT '创建时间',
deleted BIT NOT NULL DEFAULT 0 COMMENT '是否删除',
update_by VARCHAR(255) NULL COMMENT '更新人',
update_time DATETIME(6) NULL COMMENT '更新时间',
version INT NOT NULL DEFAULT 0 COMMENT '乐观锁版本号',
workflow_definition_id BIGINT NOT NULL COMMENT '工作流定义ID',
business_key VARCHAR(100) NOT NULL COMMENT '业务标识',
status VARCHAR(20) NOT NULL COMMENT '状态',
start_time DATETIME(6) NULL COMMENT '开始时间',
end_time DATETIME(6) NULL COMMENT '结束时间',
error TEXT NULL COMMENT '错误信息',
CONSTRAINT FK_workflow_instance_definition FOREIGN KEY (workflow_definition_id)
REFERENCES wf_workflow_definition (id),
CONSTRAINT UK_workflow_instance_business_key UNIQUE (business_key)
);
- 节点实例表 (wf_node_instance)
CREATE TABLE wf_node_instance (
id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '主键ID',
create_by VARCHAR(255) NULL COMMENT '创建人',
create_time DATETIME(6) NULL COMMENT '创建时间',
deleted BIT NOT NULL DEFAULT 0 COMMENT '是否删除',
update_by VARCHAR(255) NULL COMMENT '更新人',
update_time DATETIME(6) NULL COMMENT '更新时间',
version INT NOT NULL DEFAULT 0 COMMENT '乐观锁版本号',
workflow_instance_id BIGINT NOT NULL COMMENT '工作流实例ID',
node_id VARCHAR(100) NOT NULL COMMENT '节点ID',
node_type VARCHAR(20) NOT NULL COMMENT '节点类型',
name VARCHAR(100) NOT NULL COMMENT '节点名称',
status VARCHAR(20) NOT NULL COMMENT '状态',
start_time DATETIME(6) NULL COMMENT '开始时间',
end_time DATETIME(6) NULL COMMENT '结束时间',
config TEXT NULL COMMENT '节点配置(JSON)',
input TEXT NULL COMMENT '输入参<E585A5><E58F82><EFBFBD>(JSON)',
output TEXT NULL COMMENT '输出结果(JSON)',
error TEXT NULL COMMENT '错误信息',
pre_node_id VARCHAR(100) NULL COMMENT '前置节点ID',
CONSTRAINT FK_node_instance_workflow FOREIGN KEY (workflow_instance_id)
REFERENCES wf_workflow_instance (id)
);
8.2.3 工作流变量表 (wf_workflow_variable)
CREATE TABLE wf_workflow_variable (
id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '主键ID',
create_by VARCHAR(255) NULL COMMENT '创建人',
create_time DATETIME(6) NULL COMMENT '创建时间',
deleted BIT NOT NULL DEFAULT 0 COMMENT '是否删除',
update_by VARCHAR(255) NULL COMMENT '更新人',
update_time DATETIME(6) NULL COMMENT '更新时间',
version INT NOT NULL DEFAULT 0 COMMENT '乐观锁版本号',
workflow_instance_id BIGINT NOT NULL COMMENT '工作流实例ID',
name VARCHAR(100) NOT NULL COMMENT '变量名',
value TEXT NULL COMMENT '变量值',
type VARCHAR(20) NOT NULL COMMENT '变量类型',
CONSTRAINT FK_workflow_variable_instance FOREIGN KEY (workflow_instance_id)
REFERENCES wf_workflow_instance (id),
CONSTRAINT UK_workflow_variable_instance_name UNIQUE (workflow_instance_id, name)
);
8.2.4 工作流日志表 (wf_log)
CREATE TABLE wf_log (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
workflow_instance_id BIGINT NOT NULL,
node_id VARCHAR(50),
message VARCHAR(1000) NOT NULL,
level VARCHAR(20) NOT NULL,
detail TEXT,
create_time DATETIME NOT NULL,
create_by VARCHAR(50),
update_time DATETIME,
update_by VARCHAR(50),
version INT DEFAULT 0,
deleted BOOLEAN DEFAULT FALSE
);
-- 索引
CREATE INDEX idx_wf_log_instance ON wf_log (workflow_instance_id);
CREATE INDEX idx_wf_log_node ON wf_log (workflow_instance_id, node_id);
8.2.5 工作流权限表 (wf_workflow_permission)
CREATE TABLE wf_workflow_permission (
id BIGINT PRIMARY KEY AUTO_INCREMENT COMMENT '主键',
workflow_definition_id BIGINT NOT NULL COMMENT '工作流定义ID',
type VARCHAR(50) NOT NULL COMMENT '权限类型',
user_id BIGINT COMMENT '用户ID',
role_id BIGINT COMMENT '角色ID',
department_id BIGINT COMMENT '部门ID',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
create_by BIGINT NOT NULL COMMENT '创建人',
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
update_by BIGINT NOT NULL COMMENT '更新人',
version INT NOT NULL DEFAULT 0 COMMENT '版本号',
deleted TINYINT(1) NOT NULL DEFAULT 0 COMMENT '是否删除',
INDEX idx_workflow_definition_id (workflow_definition_id),
INDEX idx_user_id (user_id),
INDEX idx_role_id (role_id),
INDEX idx_department_id (department_id)
);
8.3 核心功能设计
8.3.1 版本控制
- 版本策略
- 主版本号:重大更新
- 次版本号:功能更新
- 修订版本号:Bug修复
- 版本管理
- 草稿版本:可以修改
- 发布版本:只读
- 禁用版本:不可用
- 版本操作
- 创建新版本
- 复制版本
- 回滚版本
- 删除版本
8.3.2 变量管理
- 变量作用域
- 全局变量:整个工作流可见
- 工作流变量:当前工作流实例可见
- 节点变量:当前节点可见
- 变量生命周期
- 创建:工作流启动时
- 更新:节点执行时
- 销毁:工作流结束时
- 变量操作
- 设置变量
- 获取变量
- 删除变量
- 清空变量
8.3.3 任务执行
- Shell任务
public class ShellTaskExecutor implements TaskExecutor {
ExecuteResult execute(TaskConfig config) {
// 1. 解析命令
String command = config.getCommand();
// 2. 设置环境变量
Map<String, String> env = config.getEnvironment();
// 3. 执行命令
Process process = Runtime.getRuntime().exec(command);
// 4. 获取结果
String output = readOutput(process);
// 5. 检查退出码
int exitCode = process.waitFor();
return new ExecuteResult(exitCode == 0, output);
}
}
- HTTP任务
public class HttpTaskExecutor implements TaskExecutor {
ExecuteResult execute(TaskConfig config) {
// 1. 构建请求
HttpRequest request = buildRequest(config);
// 2. 发送请求
HttpResponse response = httpClient.send(request);
// 3. 处理响应
String body = response.body();
// 4. 检查状态码
boolean success = response.statusCode() >= 200 && response.statusCode() < 300;
return new ExecuteResult(success, body);
}
}
- Java任务
public class JavaTaskExecutor implements TaskExecutor {
ExecuteResult execute(TaskConfig config) {
// 1. 获取类名和方法名
String className = config.getClassName();
String methodName = config.getMethodName();
// 2. 加载类
Class<?> clazz = Class.forName(className);
// 3. 获取方法
Method method = clazz.getMethod(methodName);
// 4. 执行方法
Object result = method.invoke(null);
return new ExecuteResult(true, result.toString());
}
}
8.3.4 错误处理
- 重试机制
public class RetryableTaskExecutor implements TaskExecutor {
ExecuteResult execute(TaskConfig config) {
int maxAttempts = config.getMaxAttempts();
long delay = config.getDelay();
for (int attempt = 1; attempt <= maxAttempts; attempt++) {
try {
return delegate.execute(config);
} catch (Exception e) {
if (attempt == maxAttempts) {
throw e;
}
Thread.sleep(delay);
}
}
}
}
- 补偿机制
public class CompensableTaskExecutor implements TaskExecutor {
ExecuteResult execute(TaskConfig config) {
try {
ExecuteResult result = delegate.execute(config);
if (!result.isSuccess()) {
compensate(config);
}
return result;
} catch (Exception e) {
compensate(config);
throw e;
}
}
void compensate(TaskConfig config) {
// 执行补偿逻辑
}
}
- 超时控制
public class TimeoutTaskExecutor implements TaskExecutor {
ExecuteResult execute(TaskConfig config) {
Future<ExecuteResult> future = executor.submit(() -> {
return delegate.execute(config);
});
try {
return future.get(config.getTimeout(), TimeUnit.SECONDS);
} catch (TimeoutException e) {
future.cancel(true);
throw new TaskTimeoutException();
}
}
}
8.4 扩展点设计
8.4.1 节点类型扩展
- 自定义节点
@Component
public class CustomNodeExecutor extends AbstractNodeExecutor {
@Override
protected ExecuteResult doExecute(NodeInstance node, WorkflowContext context) {
// 实现自定义节点逻辑
}
}
- 节点注册
public class NodeExecutorRegistry {
private Map<String, NodeExecutor> executors = new HashMap<>();
public void register(String type, NodeExecutor executor) {
executors.put(type, executor);
}
public NodeExecutor get(String type) {
return executors.get(type);
}
}
8.4.2 表达式扩展
- 自定义函数
public class CustomFunctions {
@Function
public static boolean checkCondition(String value) {
// 实现自定义函数逻辑
}
}
- 函数注册
public class FunctionRegistry {
private Map<String, Method> functions = new HashMap<>();
public void register(String name, Method method) {
functions.put(name, method);
}
public Method get(String name) {
return functions.get(name);
}
}
8.4.3 变量处理扩展
- 自定义变量处理器
public class CustomVariableHandler implements VariableHandler {
@Override
public Object serialize(Object value) {
// 实现序列化逻辑
}
@Override
public Object deserialize(String value, Class<?> type) {
// 实现反序列化逻辑
}
}
- 处理器注册
public class VariableHandlerRegistry {
private Map<Class<?>, VariableHandler> handlers = new HashMap<>();
public void register(Class<?> type, VariableHandler handler) {
handlers.put(type, handler);
}
public VariableHandler get(Class<?> type) {
return handlers.get(type);
}
}
8.5 性能优化设计
8.5.1 数据库优化
- 索引设计
-- 工作流定义表索引
ALTER TABLE workflow_definition ADD INDEX idx_code (code);
ALTER TABLE workflow_definition ADD INDEX idx_status (status);
-- 工作流实例表索引
ALTER TABLE workflow_instance ADD INDEX idx_definition_id (definition_id);
ALTER TABLE workflow_instance ADD INDEX idx_status (status);
ALTER TABLE workflow_instance ADD INDEX idx_create_time (create_time);
-- 节点实例表索引
ALTER TABLE node_instance ADD INDEX idx_workflow_instance_id (workflow_instance_id);
ALTER TABLE node_instance ADD INDEX idx_status (status);
-- 工作流变量表索引
ALTER TABLE workflow_variable ADD INDEX idx_workflow_instance_id (workflow_instance_id);
ALTER TABLE workflow_variable ADD INDEX idx_scope (scope);
- 分表策略
- 按时间分表
- 按工作流类型分表
- 按租户分表
- 归档策略
- 定期归档历史数据
- 按时间维度归档
- 支持归档数据查询
8.5.2 缓存设计
- 缓存层次
应用缓存 → 分布式缓存 → 数据库
- 缓存对<EFBFBD><EFBFBD><EFBFBD>
- 工作流定义
- 节点配置
- 常用变量
- 执行统计
- 缓存策略
- LRU淘汰
- 定时刷新
- 主动失效
8.5.3 并发控制
- 锁设计
public interface LockManager {
// 获取锁
boolean acquireLock(String key, long timeout);
// 释放锁
void releaseLock(String key);
// 续期锁
boolean renewLock(String key);
}
- 隔离级别
- 工作流实例级
- 节点实例级
- 变量级
- 并发策略
- 乐观锁
- 悲观锁
- 分布式锁
8.6 监控运维设计
8.6.1 监控指标
- 性能指标
- 工作流执行时间
- 节点执行时间
- 资源使用率
- 并发数量
- 业务指标
- 工作流成功率
- 节点成功率
- 重试次数
- 超次数
- 系统指标
- CPU使用率
- 内存使用率
- 磁盘使用率
- 网络使用率
8.6.2 告警设计
- 告警规则
- 执行超时
- 异常失败
- 资源不足
- 并发超限
- 告警级别
- 严重告警
- 警告告警
- 提示告警
- 告警方式
- 邮件通知
- 短信通知
- 钉钉通知
- Webhook通知
8.6.3 日志设计
- 日志分类
- 系统日志
- 业务日志
- 审计日志
- 性能日志
- 日志格式
{
"timestamp": "2023-12-05 10:00:00",
"level": "INFO",
"thread": "main",
"class": "WorkflowEngine",
"message": "工作流开始执行",
"context": {
"workflowInstanceId": "123",
"nodeId": "node1",
"variables": {}
}
}
- 日志存储
- 文件存储
- 数据库存储
- ES存储
8.7 安全设计
8.7.1 认证授权
- 认证方式
- 用户名密码
- Token认证
- OAuth2认证
- SSO认证
- 权限模型
- 角色权限
- 数据权限
- 功能权限
- 字段权限
- 权限控制
@PreAuthorize("hasPermission('workflow', 'execute')")
public void executeWorkflow(Long instanceId) {
// 执行工作流
}
8.7.2 数据安全
- 敏感数据
- 配置信息
- 执行结果
- 变量数据
- 日志内容
- 加密方案
- 配置加密
- 传输加密
- 存储加密
- 字段加密
- 脱敏策略
public class DataMaskingUtils {
public static String maskSensitiveData(String data, String type) {
switch (type) {
case "mobile":
return maskMobile(data);
case "email":
return maskEmail(data);
case "idcard":
return maskIdCard(data);
default:
return data;
}
}
}
8.7.3 审计日志
- 审计内容
- 操作人
- 操作时间
- 操作类型
- 操作内容
- 操作结果
- 审计方式
@Audit(type = "workflow", operation = "execute")
public void executeWorkflow(Long instanceId) {
// 执行工作流
}
- 审计存储
CREATE TABLE audit_log (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
user_id VARCHAR(50) NOT NULL COMMENT '用户ID',
operation_type VARCHAR(50) NOT NULL COMMENT '操作类型',
operation_time DATETIME NOT NULL COMMENT '操作时间',
resource_type VARCHAR(50) NOT NULL COMMENT '资源类型',
resource_id VARCHAR(50) NOT NULL COMMENT '资源ID',
operation_content TEXT COMMENT '操作内容',
operation_result VARCHAR(20) COMMENT '操作结果',
client_ip VARCHAR(50) COMMENT '客户端IP',
user_agent VARCHAR(200) COMMENT '用户代理'
);
9. 接口实现状态
9.1 工作流定义服务 (IWorkflowDefinitionService)
9.1.1 已实现接口
- 创建工作流定义
WorkflowDefinitionDTO create(WorkflowDefinitionDTO dto)
- 功能:创建新的工作流定义
- 参数:
- dto: 工作流定义数据,包含名称、描述、节点配置等
- 返回:创建后的工作流定义
- 说明:
- 自动设置为草稿状态
- 检查编码唯一性
- 启用状态默认为true
- 更新工作流定义
WorkflowDefinitionDTO update(Long id, WorkflowDefinitionDTO dto)
- 功能:更新现有工作流定义
- 参数:
- id: 工作流定义ID
- dto: 更新的数据
- 返回:更新后的工作流定义
- 说明:
- 仅草稿状态可更新
- 检查编码唯一性
- 支持更新基本信息、节点配置、流转配置等
- 发布工作流定义
WorkflowDefinitionDTO publish(Long id)
- 功能:发布工作流定义
- 参数:
- id: 工作流定义ID
- 返回:发布后的工作流定义
- 说明:
- 仅草稿状态可发布
- 发布前进行配置验证
- 发布后状态变更为PUBLISHED
- 禁用工作流定义
WorkflowDefinitionDTO disable(Long id)
- 功能:禁用工作流定义
- 参数:
- id: 工作流定义ID
- 返回:禁用后的工作流定义
- 说明:
- 仅已发布状态可禁用
- 禁用后状态变更为DISABLED
- 启用工作流定义
WorkflowDefinitionDTO enable(Long id)
- 功能:启用工作流定义
- 参数:
- id: 工作流定义ID
- 返回:启用后的工作流定义
- 说明:
- 仅禁用状态可启用
- 启用后状态变更为PUBLISHED
- 创建新版本
WorkflowDefinitionDTO createNewVersion(Long id)
- 功能:基于现有工作流定义创建新版本
- 参数:
- id: 源工作流定义ID
- 返回:新版本的工作流定义
- 说明:
- 复制所有配置信息
- 版本号自动递增
- 新版本为草稿状态
- 查询相关接口
WorkflowDefinitionDTO findLatestByCode(String code)
WorkflowDefinitionDTO findByCodeAndVersion(String code, Integer version)
List<WorkflowDefinitionDTO> findAllVersions(String code)
- 功能:查询工作流定义
- 说明:
- 支持查询最新版本
- 支持查询指定版本
- 支持查询所有版本
- 验证工作流定义
boolean validate(Long id)
- 功能:验证工作流定义的完整性和正确性
- 参数:
- id: 工作流定义ID
- 返回:验证结果
- 说明:
- 验证节点配置完整性
- 验证流转配置完整性
- 验证表单定义完整性
- 验证图形信息完整性
9.2 工作流实例服务 (IWorkflowInstanceService)
9.2.1 已实现接口
- 基础CRUD接口
- 继承自BaseService,包含基础的增删改查功能
9.2.2 待实现接口
- 创建工作流实例
WorkflowInstanceDTO createInstance(Long definitionId, Map<String, Object> variables)
- 功能:创建工作流实例
- 参数:
- definitionId: 工作流定义ID
- variables: 初始变量
- 返回:创建的工作流实例
- 启动工作流实例
void startInstance(Long instanceId)
- 功能:启动工作流实例
- 参数:
- instanceId: 实例ID
- 暂停工作流实例
void suspendInstance(Long instanceId)
- 功能:暂停工作流实例
- 参数:
- instanceId: 实例ID
- 恢复工作流实例
void resumeInstance(Long instanceId)
- 功能:恢复工作流实例
- 参数:
- instanceId: 实例ID
- 终止工作流实例
void terminateInstance(Long instanceId)
- 功能:终止工作流实例
- 参数:
- instanceId: 实例ID
9.3 节点实例服务 (INodeInstanceService)
9.3.1 已实现接口
- 查询节点实例
List<NodeInstanceDTO> findByWorkflowInstanceId(Long workflowInstanceId)
List<NodeInstanceDTO> findByWorkflowInstanceIdAndStatus(Long workflowInstanceId, NodeStatusEnum status)
- 功能:查询节点实例列表
- 参数:
- workflowInstanceId: 工作流实例ID
- status: 节点状态(可选)
- 返回:节点实例列表
- 更新节点状态
void updateStatus(Long id, NodeStatusEnum status, String output, String error)
- 功能:更新节点状态
- 参数:
- id: 节点实例ID
- status: 新状态
- output: 输出结果
- error: 错误信息
- 说明:
- 自动记录状态变更时间
- 支持记录执行结果和错误信息
9.4 工作流变量服务 (IWorkflowVariableService)
9.4.1 已实现接口
- 设置变量
void setVariable(Long workflowInstanceId, String name, Object value)
- 功能:设置工作流变量
- 参数:
- workflowInstanceId: 工作流实例ID
- name: 变量名
- value: 变量值
- 说明:
- 支持任意类型的变量值
- 自动序列化和类型记录
- 支持变量更新
- 获取变量
Map<String, Object> getVariables(Long workflowInstanceId)
Map<String, Object> getVariablesByScope(Long workflowInstanceId, VariableScopeEnum scope)
- 功能:获取工作流变量
- 参数:
- workflowInstanceId: 工作流实例ID
- scope: 变量作用域(可选)
- 返回:变量Map
- 说明:
- 支持获取所有变量
- 支持按作用域获取变量
- 自动反序列化为原始类型
- 删除变量
void deleteVariables(Long workflowInstanceId)
- 功能:删除工作流变量
- 参数:
- workflowInstanceId: 工作流实例ID
- 说明:
- 删除指定实例的所有变量
9.5 工作流日志服务 (IWorkflowLogService)
9.5.1 已实现接口
- 记录日志
void log(Long workflowInstanceId, String nodeId, String message, LogLevelEnum level, String detail)
- 功能:记录工作流日志
- 参数:
- workflowInstanceId: 工作流实例ID
- nodeId: 节点ID(可选)
- message: 日志消息
- level: 日志级别
- detail: 详细信息
- 说明:
- 支持不同级别的日志
- 支持节点级别的日志
- 支持详细信息记录
- 查询日志
List<WorkflowLogDTO> getLogs(Long workflowInstanceId)
List<WorkflowLogDTO> getNodeLogs(Long workflowInstanceId, String nodeId)
- 功能:查询工作流日志
- 参数:
- workflowInstanceId: 工作流实例ID
- nodeId: 节点ID(可选)
- 返回:日志列表
- 说明:
- 支持查询实例所有日志
- 支持查询节点级别日志
- 删除日志
void deleteLogs(Long workflowInstanceId)
- 功能:删除工作流日志
- 参数:
- workflowInstanceId: 工作流实例ID
- 说明:
- 删除指定实例的所有日志
9.6 节点类型服务 (INodeTypeService)
9.6.1 已实现接口
- 查询节点类型
NodeTypeDTO findByCode(String code)
- 功能:根据编码查询节点类型
- 参数:
- code: 节点类型编码
- 返回:节点类型信息
- 获取执行器列表
List<TaskExecutorDefinition> getExecutors(String code)
- 功能:获取节点类型支持的执行器列表
- 参数:
- code: 节点类型编码
- 返回:执行器定义列表
- 说明:
- 仅任务节点类型支持
- 返回执行器配置模式
- 支持多种执行器
- 启用/禁用节点类型
void enable(Long id)
void disable(Long id)
- 功能:启用或禁用节点类型
- 参数:
- id: 节点类型ID
- 说明:
- 控制节点类型是否可用
- 影响工作流设计时的节点选择
9.7 待实现功能
- 工作流实例管理
- 实例暂停/恢复机制
- 实例强制终止
- 实例重试机制
- 子流程支持
- 节点执行增强
- 节点重试机制
- 节点超时控制
- 节点执行补偿
- 自定义节点扩展
- 变量管理增强
- 变量作用域隔离
- 变量类型转换
- 变量生命周期管理
- 变量访问控制
- 监控和统计
- 执行状态监控
- 性能统计分析
- 告警机制
- 审计日志
- 权限控制
- 工作流权限管理
- 节点权限控制
- 变量访问权限
- 操作审计