deploy-ease-platform/backend/docs/workflow/workflow_instance_sync.md
2024-12-11 23:33:05 +08:00

11 KiB
Raw Blame History

工作流实例同步机制设计文档

目录

  1. 整体数据流向
  2. Flowable与系统映射关系
  3. 数据同步实现
  4. 事件监听机制
  5. 状态管理
  6. 最佳实践

整体数据流向

[前端流程设计器]
    |
    | 1. 用户拖拽设计流程
    | - 选择节点类型从workflow_node_definition获取可用节点
    | - 配置节点属性根据节点的form_config渲染表单
    | - 连接节点、设置条件
    ↓
[前端数据模型]
    | - 图形数据nodes、edges
    | - 表单数据(各节点的配置信息)
    | - 流程基本信息(名称、描述等)
    |
    | 2. 保存流程设计
    ↓
[后端 WorkflowDefinitionController]
    |
    | 3. 接收流程设计数据
    ↓
[后端 WorkflowDefinitionService]
    | 
    | 4. 处理流程数据
    | - 保存流程定义基本信息到 workflow_definition
    | - 转换图形数据为BPMN XML
    | - 解析节点配置更新workflow_node_definition
    |   * 对于新的节点类型:创建新记录
    |   * 对于已有节点:更新配置
    |
    | 5. 部署到Flowable
    ↓
[Flowable引擎]
    |
    | 6. 部署流程
    | - 保存BPMN XML
    | - 生成流程定义ID
    |
    | 7. 启动流程实例
    ↓
[后端 WorkflowInstanceService]
    |
    | 8. 创建流程实例
    | - 创建workflow_instance记录
    | - 关联workflow_definition
    |
    | 9. 节点实例化
    | - 创建workflow_node_instance记录
    | - 关联workflow_node_definition
    ↓
[数据库]
    |
    | 实时同步的表:
    | - workflow_definition流程定义
    | - workflow_node_definition节点定义
    | - workflow_instance流程实例
    | - workflow_node_instance节点实例
    |
    | Flowable表
    | - ACT_RE_*(流程定义相关表)
    | - ACT_RU_*(运行时数据表)
    | - ACT_HI_*(历史数据表)
    ↓
[前端任务列表/监控页面]
    |
    | 10. 展示流程实例
    | - 查询实例状态
    | - 显示节点执行情况
    | - 处理用户任务
    | - 查看历史记录

Flowable与系统映射关系

表结构映射

Flowable核心表

[流程定义相关]
ACT_RE_DEPLOYMENT: 流程部署表
ACT_RE_PROCDEF: 流程定义表

[流程实例相关]
ACT_RU_EXECUTION: 运行时流程实例表
ACT_RU_TASK: 运行时任务表
ACT_RU_VARIABLE: 运行时变量表

[历史数据]
ACT_HI_PROCINST: 历史流程实例表
ACT_HI_TASKINST: 历史任务实例表
ACT_HI_ACTINST: 历史活动实例表

系统表与Flowable映射关系

我们的表                    Flowable的表
--------------------------------------------------
workflow_definition     <-> ACT_RE_PROCDEF
workflow_instance      <-> ACT_RU_EXECUTION/ACT_HI_PROCINST
workflow_node_instance <-> ACT_RU_TASK/ACT_HI_TASKINST

数据同步实现

流程实例服务实现

@Service
@Transactional
public class WorkflowInstanceService {
    
    @Resource
    private RuntimeService runtimeService;  // Flowable的运行时服务
    
    @Resource
    private TaskService taskService;        // Flowable的任务服务
    
    public WorkflowInstance startProcess(WorkflowInstanceCreateDTO createDTO) {
        // 1. 启动Flowable流程实例
        ProcessInstance processInstance = runtimeService.startProcessInstanceById(
            createDTO.getProcessDefinitionId(),
            createDTO.getBusinessKey(),
            createDTO.getVariables()
        );
        
        // 2. 创建我们自己的流程实例记录
        WorkflowInstance workflowInstance = new WorkflowInstance();
        workflowInstance.setProcessInstanceId(processInstance.getId());
        workflowInstance.setProcessDefinitionId(createDTO.getProcessDefinitionId());
        workflowInstance.setBusinessKey(createDTO.getBusinessKey());
        workflowInstance.setStatus(WorkflowInstanceStatusEnums.RUNNING);
        workflowInstance.setVariables(JsonUtils.toJsonString(createDTO.getVariables()));
        workflowInstance.setStartTime(LocalDateTime.now());
        workflowInstanceRepository.save(workflowInstance);
        
        // 3. 获取当前活动的任务
        List<Task> tasks = taskService.createTaskQuery()
            .processInstanceId(processInstance.getId())
            .list();
            
        // 4. 为每个活动的任务创建节点实例
        for (Task task : tasks) {
            WorkflowNodeInstance nodeInstance = new WorkflowNodeInstance();
            nodeInstance.setWorkflowInstanceId(workflowInstance.getId());
            nodeInstance.setNodeId(task.getTaskDefinitionKey());
            nodeInstance.setNodeName(task.getName());
            nodeInstance.setNodeType("userTask");  // 或者从定义中获取
            nodeInstance.setStatus("ACTIVE");
            nodeInstance.setStartTime(LocalDateTime.now());
            workflowNodeInstanceRepository.save(nodeInstance);
        }
        
        return workflowInstance;
    }
}

数据同步服务实现

@Service
public class WorkflowSyncService {
    
    @Scheduled(fixedRate = 60000)  // 每分钟执行一次
    public void syncWorkflowStatus() {
        // 1. 查找所有运行中的实例
        List<WorkflowInstance> runningInstances = workflowInstanceRepository
            .findByStatus(WorkflowInstanceStatusEnums.RUNNING);
            
        for (WorkflowInstance instance : runningInstances) {
            // 2. 检查Flowable中的状态
            ProcessInstance processInstance = runtimeService.createProcessInstanceQuery()
                .processInstanceId(instance.getProcessInstanceId())
                .singleResult();
                
            if (processInstance == null) {
                // Flowable中实例已结束更新我们的状态
                HistoricProcessInstance historicInstance = historyService
                    .createHistoricProcessInstanceQuery()
                    .processInstanceId(instance.getProcessInstanceId())
                    .singleResult();
                    
                instance.setStatus(convertFlowableStatus(historicInstance.getEndActivityId()));
                instance.setEndTime(LocalDateTime.now());
                workflowInstanceRepository.save(instance);
            }
            
            // 3. 同步节点实例状态
            syncNodeInstances(instance.getId(), instance.getProcessInstanceId());
        }
    }
    
    private void syncNodeInstances(Long workflowInstanceId, String processInstanceId) {
        // 1. 获取当前活动的任务
        List<Task> activeTasks = taskService.createTaskQuery()
            .processInstanceId(processInstanceId)
            .list();
            
        // 2. 获取历史任务
        List<HistoricTaskInstance> historicTasks = historyService
            .createHistoricTaskInstanceQuery()
            .processInstanceId(processInstanceId)
            .finished()
            .list();
            
        // 3. 更新节点实例状态
        Set<String> processedTaskIds = new HashSet<>();
        
        // 处理活动任务
        for (Task task : activeTasks) {
            WorkflowNodeInstance nodeInstance = getOrCreateNodeInstance(
                workflowInstanceId, task.getTaskDefinitionKey());
            updateNodeInstance(nodeInstance, task, null);
            processedTaskIds.add(task.getTaskDefinitionKey());
        }
        
        // 处理历史任务
        for (HistoricTaskInstance historicTask : historicTasks) {
            if (!processedTaskIds.contains(historicTask.getTaskDefinitionKey())) {
                WorkflowNodeInstance nodeInstance = getOrCreateNodeInstance(
                    workflowInstanceId, historicTask.getTaskDefinitionKey());
                updateNodeInstance(nodeInstance, null, historicTask);
            }
        }
    }
}

事件监听机制

Flowable事件监听器

@Component
public class FlowableEventListener implements TaskListener, ExecutionListener {
    
    @Resource
    private WorkflowInstanceService workflowInstanceService;
    
    @Override
    public void notify(DelegateTask task) {
        // 任务事件处理
        String eventName = task.getEventName();
        switch (eventName) {
            case "create":
                workflowInstanceService.onTaskCreate(task);
                break;
            case "complete":
                workflowInstanceService.onTaskComplete(task);
                break;
            case "delete":
                workflowInstanceService.onTaskDelete(task);
                break;
        }
    }
    
    @Override
    public void notify(DelegateExecution execution) {
        // 流程执行事件处理
        String eventName = execution.getEventName();
        switch (eventName) {
            case "start":
                workflowInstanceService.onProcessStart(execution);
                break;
            case "end":
                workflowInstanceService.onProcessEnd(execution);
                break;
        }
    }
}

状态管理

工作流实例状态枚举

public enum WorkflowInstanceStatusEnums {
    NOT_STARTED,   // 未开始
    RUNNING,       // 运行中
    SUSPENDED,     // 已暂停
    COMPLETED,     // 已完成
    TERMINATED,    // 已终止
    FAILED;        // 执行失败
    
    public static WorkflowInstanceStatusEnums fromFlowableStatus(String flowableStatus) {
        switch (flowableStatus) {
            case "active":
                return RUNNING;
            case "suspended":
                return SUSPENDED;
            case "completed":
                return COMPLETED;
            case "terminated":
                return TERMINATED;
            default:
                return FAILED;
        }
    }
}

最佳实践

为什么需要自己维护实例表

  1. 业务扩展性

    • 可以添加更多业务相关的字段
    • 可以实现自定义的状态管理
    • 可以关联更多业务数据
  2. 性能优化

    • 避免频繁查询Flowable表
    • 可以建立更适合业务查询的索引
    • 可以实现更好的缓存策略
  3. 数据完整性

    • 保存完整的业务上下文
    • 记录更详细的审计信息
    • 支持自定义的数据分析

数据一致性保证

  1. 事务管理
@Transactional
public void startProcess() {
    // 1. 启动Flowable流程
    // 2. 创建系统流程实例
    // 3. 创建节点实例
}
  1. 定时同步
  • 定期检查运行中的实例状态
  • 自动修复不一致的数据
  • 记录同步日志
  1. 事件驱动
  • 监听Flowable事件
  • 实时更新系统状态
  • 保证数据实时性

性能优化建议

  1. 索引优化
  • 为常用查询字段建立索引
  • 使用复合索引优化多字段查询
  • 避免过多索引影响写入性能
  1. 缓存策略
  • 缓存活动的流程实例
  • 缓存常用的节点定义
  • 使用分布式缓存提高性能
  1. 批量处理
  • 批量同步数据
  • 批量更新状态
  • 使用队列处理异步任务

总结

  1. Flowable负责流程的实际执行和调度
  2. 系统维护业务层面的状态和数据
  3. 通过事件监听和定时同步保证数据一致性
  4. 使用状态映射处理不同系统间的状态转换
  5. 通过事务确保关键操作的原子性

这样的设计可以:

  • 保持与Flowable的数据同步
  • 支持业务扩展
  • 提供更好的性能
  • 确保数据一致性
  • 便于问题追踪和修复