deploy-ease-platform/backend/docs/workflow-development.md
2024-12-03 21:16:39 +08:00

12 KiB

工作流引擎开发文档

一、已完成工作

1. 核心实体设计

  • WorkflowDefinition: 工作流定义

    • 包含基本信息(编码、名称、描述等)
    • 包含流程定义内容(JSON格式)
    • 支持版本控制和状态管理
  • WorkflowInstance: 工作流实例

    • 关联工作流定义
    • 记录执行状态和进度
    • 支持暂停、恢复、取消等操作
  • NodeInstance: 节点实例

    • 关联工作流实例
    • 记录节点执行状态和结果
    • 支持重试和跳过等操作

2. 节点类型体系

  • 基础节点类型:

    • START: 开始节点
    • END: 结束节点
    • CONDITION: 条件节点
    • PARALLEL: 并行节点
  • 功能节点类型:

    • APPROVAL: 审批节点
    • JENKINS: Jenkins构建节点
    • SCRIPT: 脚本执行节点
    • GIT: Git操作节点
    • NACOS: 配置中心节点
    • HTTP: HTTP请求节点
    • NOTIFY: 通知节点

3. 工作流引擎实现

  • 核心接口设计

    • WorkflowEngine: 工作流引擎接口
    • NodeExecutor: 节点执行器接口
  • 默认实现

    • DefaultWorkflowEngine: 默认工作流引擎实现
    • AbstractNodeExecutor: 抽象节点执行器
    • ShellNodeExecutor等具体执行器

4. 变量与日志管理

  • WorkflowVariable: 工作流变量

    • 支持全局变量和节点变量
    • 支持变量引用和替换
  • WorkflowLog: 工作流日志

    • 支持不同日志类型和级别
    • 支持详细的执行记录

5. 权限管理

  • WorkflowPermission: 工作流权限
    • 支持角色和用户级别的权限控制
    • 支持操作级别的权限控制

6. API接口设计

  • 工作流定义管理
  • 工作流实例操作
  • 节点实例管理
  • 日志查询等功能

7. 错误码和消息

  • 工作流相关错误码(2700-2799)
  • 详细的错误消息定义

二、待完成工作

1. 高级功能实现

1.1 节点执行引擎增强

public interface NodeExecutor {
    // 新增预执行检查
    boolean preCheck(NodeInstance node, WorkflowContext context);
    
    // 新增补偿操作
    void compensate(NodeInstance node, WorkflowContext context);
    
    // 新增超时处理
    void handleTimeout(NodeInstance node, WorkflowContext context);
}

// 节点执行状态追踪
public class NodeExecutionTracker {
    private Long nodeInstanceId;
    private Date startTime;
    private Date endTime;
    private String status;
    private Map<String, Object> metrics;
    private List<String> logs;
}

1.2 工作流调度管理

@Service
public class WorkflowScheduleService {
    // 创建调度任务
    public void createSchedule(WorkflowSchedule schedule);
    
    // 修改调度配置
    public void updateSchedule(WorkflowSchedule schedule);
    
    // 启用/禁用调度
    public void toggleSchedule(Long scheduleId, boolean enabled);
    
    // 手动触发
    public void triggerSchedule(Long scheduleId);
}

// 调度配置表
CREATE TABLE wf_workflow_schedule (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    workflow_definition_id BIGINT NOT NULL COMMENT '工作流定义ID',
    name VARCHAR(100) NOT NULL COMMENT '调度名称',
    cron VARCHAR(100) NOT NULL COMMENT 'cron表达式',
    variables JSON COMMENT '工作流变量',
    enabled BIT NOT NULL DEFAULT 1 COMMENT '是否启用',
    last_fire_time DATETIME COMMENT '上次触发时间',
    next_fire_time DATETIME COMMENT '下次触发时间',
    -- 其他基础字段
);

1.3 工作流监控告警

public interface WorkflowMonitor {
    // 收集性能指标
    void collectMetrics(WorkflowInstance instance);
    
    // 检查健康状态
    HealthStatus checkHealth();
    
    // 触发告警
    void triggerAlert(AlertType type, String message);
}

// 监控指标表
CREATE TABLE wf_workflow_metrics (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    workflow_instance_id BIGINT NOT NULL,
    metric_name VARCHAR(100) NOT NULL,
    metric_value DECIMAL(19,2) NOT NULL,
    collect_time DATETIME NOT NULL,
    -- 其他基础字段
);

// 告警记录表
CREATE TABLE wf_workflow_alert (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    alert_type VARCHAR(50) NOT NULL,
    target_type VARCHAR(50) NOT NULL,
    target_id BIGINT NOT NULL,
    level VARCHAR(20) NOT NULL,
    message TEXT NOT NULL,
    status VARCHAR(20) NOT NULL,
    -- 其他基础字段
);

1.4 工作流分析统计

public interface WorkflowAnalytics {
    // 执行时长分析
    Map<String, Duration> analyzeExecutionTime(Long workflowId);
    
    // 成功率分析
    Map<String, Double> analyzeSuccessRate(Long workflowId);
    
    // 节点耗时分析
    List<NodeTimeAnalysis> analyzeNodeTime(Long workflowId);
}

// 分析结果表
CREATE TABLE wf_workflow_analytics (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    workflow_definition_id BIGINT NOT NULL,
    analysis_type VARCHAR(50) NOT NULL,
    time_range VARCHAR(50) NOT NULL,
    metrics JSON NOT NULL,
    -- 其他基础字段
);

2. 功能优化计划

2.1 性能优化

  • 引入本地缓存和分布式缓存
  • 实现工作流实例分片执行
  • 优化日志存储和查询
@Configuration
public class WorkflowCacheConfig {
    @Bean
    public Cache<String, WorkflowDefinition> definitionCache() {
        return CacheBuilder.newBuilder()
            .maximumSize(1000)
            .expireAfterWrite(1, TimeUnit.HOURS)
            .build();
    }
}

public class ShardingWorkflowEngine implements WorkflowEngine {
    // 分片执行实现
    public void executeWithSharding(WorkflowInstance instance) {
        String shardingKey = calculateShardingKey(instance);
        ShardingContext context = createShardingContext(shardingKey);
        executeInShard(instance, context);
    }
}

2.2 可靠性增强

  • 实现节点执行幂等性
  • 增加全局事务控制
  • 完善补偿机制
public abstract class IdempotentNodeExecutor implements NodeExecutor {
    // 幂等性检查
    protected boolean checkIdempotent(String executionId) {
        return redisTemplate.opsForValue()
            .setIfAbsent("node:execution:" + executionId, "1", 24, TimeUnit.HOURS);
    }
}

@Service
public class CompensationService {
    // 注册补偿操作
    public void registerCompensation(NodeInstance node, Runnable compensation);
    
    // 执行补偿
    public void executeCompensation(WorkflowInstance instance);
}

2.3 扩展性优化

  • 支持自定义节点类型
  • 支持插件化扩展
  • 提供更多扩展点
public interface WorkflowPlugin {
    // 插件初始化
    void init(WorkflowEngine engine);
    
    // 注册扩展点
    void registerExtensions();
    
    // 清理资源
    void destroy();
}

public class PluginManager {
    // 加载插件
    public void loadPlugins();
    
    // 启用插件
    public void enablePlugin(String pluginId);
    
    // 禁用插件
    public void disablePlugin(String pluginId);
}

3. 新特性规划

3.1 工作流模板功能

// 模板定义表
CREATE TABLE wf_workflow_template (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    code VARCHAR(100) NOT NULL UNIQUE,
    name VARCHAR(100) NOT NULL,
    description TEXT,
    content JSON NOT NULL,
    category VARCHAR(50),
    tags JSON,
    -- 其他基础字段
);

// 模板参数表
CREATE TABLE wf_template_parameter (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    template_id BIGINT NOT NULL,
    name VARCHAR(100) NOT NULL,
    type VARCHAR(50) NOT NULL,
    required BIT NOT NULL DEFAULT 0,
    default_value VARCHAR(255),
    validation_rule VARCHAR(255),
    -- 其他基础字段
);

3.2 工作流版本管理

// 版本管理表
CREATE TABLE wf_workflow_version (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    workflow_definition_id BIGINT NOT NULL,
    version VARCHAR(20) NOT NULL,
    content JSON NOT NULL,
    change_log TEXT,
    status VARCHAR(20) NOT NULL,
    -- 其他基础字段
);

public interface VersionManager {
    // 创建新版本
    String createVersion(Long workflowId, String content);
    
    // 发布版本
    void publishVersion(Long workflowId, String version);
    
    // 回滚版本
    void rollbackVersion(Long workflowId, String version);
}

3.3 工作流迁移功能

public interface WorkflowMigration {
    // 导出工作流
    byte[] exportWorkflow(Long workflowId);
    
    // 导入工作流
    Long importWorkflow(byte[] content);
    
    // 迁移实例到新版本
    void migrateInstance(Long instanceId, String targetVersion);
}

// 迁移记录表
CREATE TABLE wf_workflow_migration (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    source_workflow_id BIGINT NOT NULL,
    target_workflow_id BIGINT NOT NULL,
    migration_type VARCHAR(50) NOT NULL,
    status VARCHAR(20) NOT NULL,
    error_message TEXT,
    -- 其他基础字段
);

3.4 工作流测试功能

public interface WorkflowTesting {
    // 模拟执行
    TestResult simulateExecution(Long workflowId, Map<String, Object> variables);
    
    // 节点单元测试
    TestResult testNode(NodeConfig config, Map<String, Object> inputs);
    
    // 生成测试报告
    TestReport generateReport(Long testExecutionId);
}

// 测试用例表
CREATE TABLE wf_workflow_test_case (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    workflow_definition_id BIGINT NOT NULL,
    name VARCHAR(100) NOT NULL,
    description TEXT,
    variables JSON,
    expected_result JSON,
    -- 其他基础字段
);

// 测试执行记录表
CREATE TABLE wf_workflow_test_execution (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    test_case_id BIGINT NOT NULL,
    execution_time DATETIME NOT NULL,
    status VARCHAR(20) NOT NULL,
    actual_result JSON,
    error_message TEXT,
    -- 其他基础字段
);

三、技术架构

1. 整体架构

+------------------+
|   API Layer      |
+------------------+
|   Service Layer  |
+------------------+
|   Engine Core    |
+------------------+
|   Storage Layer  |
+------------------+

2. 关键组件

  • 工作流引擎核心
  • 节点执行引擎
  • 变量管理器
  • 日志管理器
  • 权限管理器
  • 调度管理器
  • 监控告警组件
  • 分析统计组件

3. 存储设计

  • 核心业务表
  • 执行记录表
  • 监控指标表
  • 分析统计表
  • 日志记录表

4. 缓存设计

  • 本地缓存

    • 工作流定义缓存
    • 节点配置缓存
  • 分布式缓存

    • 执行状态缓存
    • 变量数据缓存

四、部署运维

1. 部署架构

+---------------+    +---------------+
|  API Server   |    |  API Server   |
+---------------+    +---------------+
        |                   |
+---------------+    +---------------+
| Engine Worker |    | Engine Worker |
+---------------+    +---------------+
        |                   |
+---------------+    +---------------+
|  Node Worker  |    |  Node Worker  |
+---------------+    +---------------+

2. 监控方案

  • 系统监控

    • JVM指标
    • 线程池状态
    • 数据库连接池
  • 业务监控

    • 工作流执行状态
    • 节点执行性能
    • 错误率统计

3. 告警方案

  • 系统告警

    • 资源使用率
    • 错误率阈值
    • 响应时间
  • 业务告警

    • 执行超时
    • 节点失败
    • 异常终止

4. 运维工具

  • 管理控制台
  • 监控面板
  • 运维脚本
  • 诊断工具

五、项目管理

1. 开发计划

  • Phase 1: 核心功能实现 (已完成)
  • Phase 2: 高级特性开发 (进行中)
  • Phase 3: 性能优化和稳定性提升
  • Phase 4: 运维工具和监控体系建设

2. 测试策略

  • 单元测试
  • 集成测试
  • 性能测试
  • 稳定性测试

3. 文档规划

  • 设计文档
  • API文档
  • 使用手册
  • 运维手册

4. 版本规划

  • v1.0: 基础功能版本
  • v1.1: 高级特性版本
  • v1.2: 性能优化版本
  • v2.0: 企业版本