12 KiB
12 KiB
工作流引擎开发文档
一、已完成工作
1. 核心实体设计
-
WorkflowDefinition: 工作流定义
- 包含基本信息(编码、名称、描述等)
- 包含流程定义内容(JSON格式)
- 支持版本控制和状态管理
-
WorkflowInstance: 工作流实例
- 关联工作流定义
- 记录执行状态和进度
- 支持暂停、恢复、取消等操作
-
NodeInstance: 节点实例
- 关联工作流实例
- 记录节点执行状态和结果
- 支持重试和跳过等操作
2. 节点类型体系
-
基础节点类型:
- START: 开始节点
- END: 结束节点
- CONDITION: 条件节点
- PARALLEL: 并行节点
-
功能节点类型:
- APPROVAL: 审批节点
- JENKINS: Jenkins构建节点
- SCRIPT: 脚本执行节点
- GIT: Git操作节点
- NACOS: 配置中心节点
- HTTP: HTTP请求节点
- NOTIFY: 通知节点
3. 工作流引擎实现
-
核心接口设计
- WorkflowEngine: 工作流引擎接口
- NodeExecutor: 节点执行器接口
-
默认实现
- DefaultWorkflowEngine: 默认工作流引擎实现
- AbstractNodeExecutor: 抽象节点执行器
- ShellNodeExecutor等具体执行器
4. 变量与日志管理
-
WorkflowVariable: 工作流变量
- 支持全局变量和节点变量
- 支持变量引用和替换
-
WorkflowLog: 工作流日志
- 支持不同日志类型和级别
- 支持详细的执行记录
5. 权限管理
- WorkflowPermission: 工作流权限
- 支持角色和用户级别的权限控制
- 支持操作级别的权限控制
6. API接口设计
- 工作流定义管理
- 工作流实例操作
- 节点实例管理
- 日志查询等功能
7. 错误码和消息
- 工作流相关错误码(2700-2799)
- 详细的错误消息定义
二、待完成工作
1. 高级功能实现
1.1 节点执行引擎增强
public interface NodeExecutor {
// 新增预执行检查
boolean preCheck(NodeInstance node, WorkflowContext context);
// 新增补偿操作
void compensate(NodeInstance node, WorkflowContext context);
// 新增超时处理
void handleTimeout(NodeInstance node, WorkflowContext context);
}
// 节点执行状态追踪
public class NodeExecutionTracker {
private Long nodeInstanceId;
private Date startTime;
private Date endTime;
private String status;
private Map<String, Object> metrics;
private List<String> logs;
}
1.2 工作流调度管理
@Service
public class WorkflowScheduleService {
// 创建调度任务
public void createSchedule(WorkflowSchedule schedule);
// 修改调度配置
public void updateSchedule(WorkflowSchedule schedule);
// 启用/禁用调度
public void toggleSchedule(Long scheduleId, boolean enabled);
// 手动触发
public void triggerSchedule(Long scheduleId);
}
// 调度配置表
CREATE TABLE wf_workflow_schedule (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
workflow_definition_id BIGINT NOT NULL COMMENT '工作流定义ID',
name VARCHAR(100) NOT NULL COMMENT '调度名称',
cron VARCHAR(100) NOT NULL COMMENT 'cron表达式',
variables JSON COMMENT '工作流变量',
enabled BIT NOT NULL DEFAULT 1 COMMENT '是否启用',
last_fire_time DATETIME COMMENT '上次触发时间',
next_fire_time DATETIME COMMENT '下次触发时间',
-- 其他基础字段
);
1.3 工作流监控告警
public interface WorkflowMonitor {
// 收集性能指标
void collectMetrics(WorkflowInstance instance);
// 检查健康状态
HealthStatus checkHealth();
// 触发告警
void triggerAlert(AlertType type, String message);
}
// 监控指标表
CREATE TABLE wf_workflow_metrics (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
workflow_instance_id BIGINT NOT NULL,
metric_name VARCHAR(100) NOT NULL,
metric_value DECIMAL(19,2) NOT NULL,
collect_time DATETIME NOT NULL,
-- 其他基础字段
);
// 告警记录表
CREATE TABLE wf_workflow_alert (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
alert_type VARCHAR(50) NOT NULL,
target_type VARCHAR(50) NOT NULL,
target_id BIGINT NOT NULL,
level VARCHAR(20) NOT NULL,
message TEXT NOT NULL,
status VARCHAR(20) NOT NULL,
-- 其他基础字段
);
1.4 工作流分析统计
public interface WorkflowAnalytics {
// 执行时长分析
Map<String, Duration> analyzeExecutionTime(Long workflowId);
// 成功率分析
Map<String, Double> analyzeSuccessRate(Long workflowId);
// 节点耗时分析
List<NodeTimeAnalysis> analyzeNodeTime(Long workflowId);
}
// 分析结果表
CREATE TABLE wf_workflow_analytics (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
workflow_definition_id BIGINT NOT NULL,
analysis_type VARCHAR(50) NOT NULL,
time_range VARCHAR(50) NOT NULL,
metrics JSON NOT NULL,
-- 其他基础字段
);
2. 功能优化计划
2.1 性能优化
- 引入本地缓存和分布式缓存
- 实现工作流实例分片执行
- 优化日志存储和查询
@Configuration
public class WorkflowCacheConfig {
@Bean
public Cache<String, WorkflowDefinition> definitionCache() {
return CacheBuilder.newBuilder()
.maximumSize(1000)
.expireAfterWrite(1, TimeUnit.HOURS)
.build();
}
}
public class ShardingWorkflowEngine implements WorkflowEngine {
// 分片执行实现
public void executeWithSharding(WorkflowInstance instance) {
String shardingKey = calculateShardingKey(instance);
ShardingContext context = createShardingContext(shardingKey);
executeInShard(instance, context);
}
}
2.2 可靠性增强
- 实现节点执行幂等性
- 增加全局事务控制
- 完善补偿机制
public abstract class IdempotentNodeExecutor implements NodeExecutor {
// 幂等性检查
protected boolean checkIdempotent(String executionId) {
return redisTemplate.opsForValue()
.setIfAbsent("node:execution:" + executionId, "1", 24, TimeUnit.HOURS);
}
}
@Service
public class CompensationService {
// 注册补偿操作
public void registerCompensation(NodeInstance node, Runnable compensation);
// 执行补偿
public void executeCompensation(WorkflowInstance instance);
}
2.3 扩展性优化
- 支持自定义节点类型
- 支持插件化扩展
- 提供更多扩展点
public interface WorkflowPlugin {
// 插件初始化
void init(WorkflowEngine engine);
// 注册扩展点
void registerExtensions();
// 清理资源
void destroy();
}
public class PluginManager {
// 加载插件
public void loadPlugins();
// 启用插件
public void enablePlugin(String pluginId);
// 禁用插件
public void disablePlugin(String pluginId);
}
3. 新特性规划
3.1 工作流模板功能
// 模板定义表
CREATE TABLE wf_workflow_template (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
code VARCHAR(100) NOT NULL UNIQUE,
name VARCHAR(100) NOT NULL,
description TEXT,
content JSON NOT NULL,
category VARCHAR(50),
tags JSON,
-- 其他基础字段
);
// 模板参数表
CREATE TABLE wf_template_parameter (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
template_id BIGINT NOT NULL,
name VARCHAR(100) NOT NULL,
type VARCHAR(50) NOT NULL,
required BIT NOT NULL DEFAULT 0,
default_value VARCHAR(255),
validation_rule VARCHAR(255),
-- 其他基础字段
);
3.2 工作流版本管理
// 版本管理表
CREATE TABLE wf_workflow_version (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
workflow_definition_id BIGINT NOT NULL,
version VARCHAR(20) NOT NULL,
content JSON NOT NULL,
change_log TEXT,
status VARCHAR(20) NOT NULL,
-- 其他基础字段
);
public interface VersionManager {
// 创建新版本
String createVersion(Long workflowId, String content);
// 发布版本
void publishVersion(Long workflowId, String version);
// 回滚版本
void rollbackVersion(Long workflowId, String version);
}
3.3 工作流迁移功能
public interface WorkflowMigration {
// 导出工作流
byte[] exportWorkflow(Long workflowId);
// 导入工作流
Long importWorkflow(byte[] content);
// 迁移实例到新版本
void migrateInstance(Long instanceId, String targetVersion);
}
// 迁移记录表
CREATE TABLE wf_workflow_migration (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
source_workflow_id BIGINT NOT NULL,
target_workflow_id BIGINT NOT NULL,
migration_type VARCHAR(50) NOT NULL,
status VARCHAR(20) NOT NULL,
error_message TEXT,
-- 其他基础字段
);
3.4 工作流测试功能
public interface WorkflowTesting {
// 模拟执行
TestResult simulateExecution(Long workflowId, Map<String, Object> variables);
// 节点单元测试
TestResult testNode(NodeConfig config, Map<String, Object> inputs);
// 生成测试报告
TestReport generateReport(Long testExecutionId);
}
// 测试用例表
CREATE TABLE wf_workflow_test_case (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
workflow_definition_id BIGINT NOT NULL,
name VARCHAR(100) NOT NULL,
description TEXT,
variables JSON,
expected_result JSON,
-- 其他基础字段
);
// 测试执行记录表
CREATE TABLE wf_workflow_test_execution (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
test_case_id BIGINT NOT NULL,
execution_time DATETIME NOT NULL,
status VARCHAR(20) NOT NULL,
actual_result JSON,
error_message TEXT,
-- 其他基础字段
);
三、技术架构
1. 整体架构
+------------------+
| API Layer |
+------------------+
| Service Layer |
+------------------+
| Engine Core |
+------------------+
| Storage Layer |
+------------------+
2. 关键组件
- 工作流引擎核心
- 节点执行引擎
- 变量管理器
- 日志管理器
- 权限管理器
- 调度管理器
- 监控告警组件
- 分析统计组件
3. 存储设计
- 核心业务表
- 执行记录表
- 监控指标表
- 分析统计表
- 日志记录表
4. 缓存设计
-
本地缓存
- 工作流定义缓存
- 节点配置缓存
-
分布式缓存
- 执行状态缓存
- 变量数据缓存
四、部署运维
1. 部署架构
+---------------+ +---------------+
| API Server | | API Server |
+---------------+ +---------------+
| |
+---------------+ +---------------+
| Engine Worker | | Engine Worker |
+---------------+ +---------------+
| |
+---------------+ +---------------+
| Node Worker | | Node Worker |
+---------------+ +---------------+
2. 监控方案
-
系统监控
- JVM指标
- 线程池状态
- 数据库连接池
-
业务监控
- 工作流执行状态
- 节点执行性能
- 错误率统计
3. 告警方案
-
系统告警
- 资源使用率
- 错误率阈值
- 响应时间
-
业务告警
- 执行超时
- 节点失败
- 异常终止
4. 运维工具
- 管理控制台
- 监控面板
- 运维脚本
- 诊断工具
五、项目管理
1. 开发计划
- Phase 1: 核心功能实现 (已完成)
- Phase 2: 高级特性开发 (进行中)
- Phase 3: 性能优化和稳定性提升
- Phase 4: 运维工具和监控体系建设
2. 测试策略
- 单元测试
- 集成测试
- 性能测试
- 稳定性测试
3. 文档规划
- 设计文档
- API文档
- 使用手册
- 运维手册
4. 版本规划
- v1.0: 基础功能版本
- v1.1: 高级特性版本
- v1.2: 性能优化版本
- v2.0: 企业版本