# 工作流实例同步机制设计文档 ## 目录 1. [整体数据流向](#整体数据流向) 2. [Flowable与系统映射关系](#Flowable与系统映射关系) 3. [数据同步实现](#数据同步实现) 4. [事件监听机制](#事件监听机制) 5. [状态管理](#状态管理) 6. [最佳实践](#最佳实践) ## 整体数据流向 ``` [前端流程设计器] | | 1. 用户拖拽设计流程 | - 选择节点类型(从workflow_node_definition获取可用节点) | - 配置节点属性(根据节点的form_config渲染表单) | - 连接节点、设置条件 ↓ [前端数据模型] | - 图形数据(nodes、edges) | - 表单数据(各节点的配置信息) | - 流程基本信息(名称、描述等) | | 2. 保存流程设计 ↓ [后端 WorkflowDefinitionController] | | 3. 接收流程设计数据 ↓ [后端 WorkflowDefinitionService] | | 4. 处理流程数据 | - 保存流程定义基本信息到 workflow_definition | - 转换图形数据为BPMN XML | - 解析节点配置,更新workflow_node_definition | * 对于新的节点类型:创建新记录 | * 对于已有节点:更新配置 | | 5. 部署到Flowable ↓ [Flowable引擎] | | 6. 部署流程 | - 保存BPMN XML | - 生成流程定义ID | | 7. 启动流程实例 ↓ [后端 WorkflowInstanceService] | | 8. 创建流程实例 | - 创建workflow_instance记录 | - 关联workflow_definition | | 9. 节点实例化 | - 创建workflow_node_instance记录 | - 关联workflow_node_definition ↓ [数据库] | | 实时同步的表: | - workflow_definition(流程定义) | - workflow_node_definition(节点定义) | - workflow_instance(流程实例) | - workflow_node_instance(节点实例) | | Flowable表: | - ACT_RE_*(流程定义相关表) | - ACT_RU_*(运行时数据表) | - ACT_HI_*(历史数据表) ↓ [前端任务列表/监控页面] | | 10. 展示流程实例 | - 查询实例状态 | - 显示节点执行情况 | - 处理用户任务 | - 查看历史记录 ``` ## Flowable与系统映射关系 ### 表结构映射 #### Flowable核心表 ``` [流程定义相关] ACT_RE_DEPLOYMENT: 流程部署表 ACT_RE_PROCDEF: 流程定义表 [流程实例相关] ACT_RU_EXECUTION: 运行时流程实例表 ACT_RU_TASK: 运行时任务表 ACT_RU_VARIABLE: 运行时变量表 [历史数据] ACT_HI_PROCINST: 历史流程实例表 ACT_HI_TASKINST: 历史任务实例表 ACT_HI_ACTINST: 历史活动实例表 ``` #### 系统表与Flowable映射关系 ``` 我们的表 Flowable的表 -------------------------------------------------- workflow_definition <-> ACT_RE_PROCDEF workflow_instance <-> ACT_RU_EXECUTION/ACT_HI_PROCINST workflow_node_instance <-> ACT_RU_TASK/ACT_HI_TASKINST ``` ## 数据同步实现 ### 流程实例服务实现 ```java @Service @Transactional public class WorkflowInstanceService { @Resource private RuntimeService runtimeService; // Flowable的运行时服务 @Resource private TaskService taskService; // Flowable的任务服务 public WorkflowInstance startProcess(WorkflowInstanceCreateDTO createDTO) { // 1. 启动Flowable流程实例 ProcessInstance processInstance = runtimeService.startProcessInstanceById( createDTO.getProcessDefinitionId(), createDTO.getBusinessKey(), createDTO.getVariables() ); // 2. 创建我们自己的流程实例记录 WorkflowInstance workflowInstance = new WorkflowInstance(); workflowInstance.setProcessInstanceId(processInstance.getId()); workflowInstance.setProcessDefinitionId(createDTO.getProcessDefinitionId()); workflowInstance.setBusinessKey(createDTO.getBusinessKey()); workflowInstance.setStatus(WorkflowInstanceStatusEnums.RUNNING); workflowInstance.setVariables(JsonUtils.toJsonString(createDTO.getVariables())); workflowInstance.setStartTime(LocalDateTime.now()); workflowInstanceRepository.save(workflowInstance); // 3. 获取当前活动的任务 List tasks = taskService.createTaskQuery() .processInstanceId(processInstance.getId()) .list(); // 4. 为每个活动的任务创建节点实例 for (Task task : tasks) { WorkflowNodeInstance nodeInstance = new WorkflowNodeInstance(); nodeInstance.setWorkflowInstanceId(workflowInstance.getId()); nodeInstance.setNodeId(task.getTaskDefinitionKey()); nodeInstance.setNodeName(task.getName()); nodeInstance.setNodeType("userTask"); // 或者从定义中获取 nodeInstance.setStatus("ACTIVE"); nodeInstance.setStartTime(LocalDateTime.now()); workflowNodeInstanceRepository.save(nodeInstance); } return workflowInstance; } } ``` ### 数据同步服务实现 ```java @Service public class WorkflowSyncService { @Scheduled(fixedRate = 60000) // 每分钟执行一次 public void syncWorkflowStatus() { // 1. 查找所有运行中的实例 List runningInstances = workflowInstanceRepository .findByStatus(WorkflowInstanceStatusEnums.RUNNING); for (WorkflowInstance instance : runningInstances) { // 2. 检查Flowable中的状态 ProcessInstance processInstance = runtimeService.createProcessInstanceQuery() .processInstanceId(instance.getProcessInstanceId()) .singleResult(); if (processInstance == null) { // Flowable中实例已结束,更新我们的状态 HistoricProcessInstance historicInstance = historyService .createHistoricProcessInstanceQuery() .processInstanceId(instance.getProcessInstanceId()) .singleResult(); instance.setStatus(convertFlowableStatus(historicInstance.getEndActivityId())); instance.setEndTime(LocalDateTime.now()); workflowInstanceRepository.save(instance); } // 3. 同步节点实例状态 syncNodeInstances(instance.getId(), instance.getProcessInstanceId()); } } private void syncNodeInstances(Long workflowInstanceId, String processInstanceId) { // 1. 获取当前活动的任务 List activeTasks = taskService.createTaskQuery() .processInstanceId(processInstanceId) .list(); // 2. 获取历史任务 List historicTasks = historyService .createHistoricTaskInstanceQuery() .processInstanceId(processInstanceId) .finished() .list(); // 3. 更新节点实例状态 Set processedTaskIds = new HashSet<>(); // 处理活动任务 for (Task task : activeTasks) { WorkflowNodeInstance nodeInstance = getOrCreateNodeInstance( workflowInstanceId, task.getTaskDefinitionKey()); updateNodeInstance(nodeInstance, task, null); processedTaskIds.add(task.getTaskDefinitionKey()); } // 处理历史任务 for (HistoricTaskInstance historicTask : historicTasks) { if (!processedTaskIds.contains(historicTask.getTaskDefinitionKey())) { WorkflowNodeInstance nodeInstance = getOrCreateNodeInstance( workflowInstanceId, historicTask.getTaskDefinitionKey()); updateNodeInstance(nodeInstance, null, historicTask); } } } } ``` ## 事件监听机制 ### Flowable事件监听器 ```java @Component public class FlowableEventListener implements TaskListener, ExecutionListener { @Resource private WorkflowInstanceService workflowInstanceService; @Override public void notify(DelegateTask task) { // 任务事件处理 String eventName = task.getEventName(); switch (eventName) { case "create": workflowInstanceService.onTaskCreate(task); break; case "complete": workflowInstanceService.onTaskComplete(task); break; case "delete": workflowInstanceService.onTaskDelete(task); break; } } @Override public void notify(DelegateExecution execution) { // 流程执行事件处理 String eventName = execution.getEventName(); switch (eventName) { case "start": workflowInstanceService.onProcessStart(execution); break; case "end": workflowInstanceService.onProcessEnd(execution); break; } } } ``` ## 状态管理 ### 工作流实例状态枚举 ```java public enum WorkflowInstanceStatusEnums { NOT_STARTED, // 未开始 RUNNING, // 运行中 SUSPENDED, // 已暂停 COMPLETED, // 已完成 TERMINATED, // 已终止 FAILED; // 执行失败 public static WorkflowInstanceStatusEnums fromFlowableStatus(String flowableStatus) { switch (flowableStatus) { case "active": return RUNNING; case "suspended": return SUSPENDED; case "completed": return COMPLETED; case "terminated": return TERMINATED; default: return FAILED; } } } ``` ## 最佳实践 ### 为什么需要自己维护实例表 1. **业务扩展性** - 可以添加更多业务相关的字段 - 可以实现自定义的状态管理 - 可以关联更多业务数据 2. **性能优化** - 避免频繁查询Flowable表 - 可以建立更适合业务查询的索引 - 可以实现更好的缓存策略 3. **数据完整性** - 保存完整的业务上下文 - 记录更详细的审计信息 - 支持自定义的数据分析 ### 数据一致性保证 1. **事务管理** ```java @Transactional public void startProcess() { // 1. 启动Flowable流程 // 2. 创建系统流程实例 // 3. 创建节点实例 } ``` 2. **定时同步** - 定期检查运行中的实例状态 - 自动修复不一致的数据 - 记录同步日志 3. **事件驱动** - 监听Flowable事件 - 实时更新系统状态 - 保证数据实时性 ### 性能优化建议 1. **索引优化** - 为常用查询字段建立索引 - 使用复合索引优化多字段查询 - 避免过多索引影响写入性能 2. **缓存策略** - 缓存活动的流程实例 - 缓存常用的节点定义 - 使用分布式缓存提高性能 3. **批量处理** - 批量同步数据 - 批量更新状态 - 使用队列处理异步任务 ## 总结 1. Flowable负责流程的实际执行和调度 2. 系统维护业务层面的状态和数据 3. 通过事件监听和定时同步保证数据一致性 4. 使用状态映射处理不同系统间的状态转换 5. 通过事务确保关键操作的原子性 这样的设计可以: - 保持与Flowable的数据同步 - 支持业务扩展 - 提供更好的性能 - 确保数据一致性 - 便于问题追踪和修复