11 KiB
11 KiB
工作流实例同步机制设计文档
目录
整体数据流向
[前端流程设计器]
|
| 1. 用户拖拽设计流程
| - 选择节点类型(从workflow_node_definition获取可用节点)
| - 配置节点属性(根据节点的form_config渲染表单)
| - 连接节点、设置条件
↓
[前端数据模型]
| - 图形数据(nodes、edges)
| - 表单数据(各节点的配置信息)
| - 流程基本信息(名称、描述等)
|
| 2. 保存流程设计
↓
[后端 WorkflowDefinitionController]
|
| 3. 接收流程设计数据
↓
[后端 WorkflowDefinitionService]
|
| 4. 处理流程数据
| - 保存流程定义基本信息到 workflow_definition
| - 转换图形数据为BPMN XML
| - 解析节点配置,更新workflow_node_definition
| * 对于新的节点类型:创建新记录
| * 对于已有节点:更新配置
|
| 5. 部署到Flowable
↓
[Flowable引擎]
|
| 6. 部署流程
| - 保存BPMN XML
| - 生成流程定义ID
|
| 7. 启动流程实例
↓
[后端 WorkflowInstanceService]
|
| 8. 创建流程实例
| - 创建workflow_instance记录
| - 关联workflow_definition
|
| 9. 节点实例化
| - 创建workflow_node_instance记录
| - 关联workflow_node_definition
↓
[数据库]
|
| 实时同步的表:
| - workflow_definition(流程定义)
| - workflow_node_definition(节点定义)
| - workflow_instance(流程实例)
| - workflow_node_instance(节点实例)
|
| Flowable表:
| - ACT_RE_*(流程定义相关表)
| - ACT_RU_*(运行时数据表)
| - ACT_HI_*(历史数据表)
↓
[前端任务列表/监控页面]
|
| 10. 展示流程实例
| - 查询实例状态
| - 显示节点执行情况
| - 处理用户任务
| - 查看历史记录
Flowable与系统映射关系
表结构映射
Flowable核心表
[流程定义相关]
ACT_RE_DEPLOYMENT: 流程部署表
ACT_RE_PROCDEF: 流程定义表
[流程实例相关]
ACT_RU_EXECUTION: 运行时流程实例表
ACT_RU_TASK: 运行时任务表
ACT_RU_VARIABLE: 运行时变量表
[历史数据]
ACT_HI_PROCINST: 历史流程实例表
ACT_HI_TASKINST: 历史任务实例表
ACT_HI_ACTINST: 历史活动实例表
系统表与Flowable映射关系
我们的表 Flowable的表
--------------------------------------------------
workflow_definition <-> ACT_RE_PROCDEF
workflow_instance <-> ACT_RU_EXECUTION/ACT_HI_PROCINST
workflow_node_instance <-> ACT_RU_TASK/ACT_HI_TASKINST
数据同步实现
流程实例服务实现
@Service
@Transactional
public class WorkflowInstanceService {
@Resource
private RuntimeService runtimeService; // Flowable的运行时服务
@Resource
private TaskService taskService; // Flowable的任务服务
public WorkflowInstance startProcess(WorkflowInstanceCreateDTO createDTO) {
// 1. 启动Flowable流程实例
ProcessInstance processInstance = runtimeService.startProcessInstanceById(
createDTO.getProcessDefinitionId(),
createDTO.getBusinessKey(),
createDTO.getVariables()
);
// 2. 创建我们自己的流程实例记录
WorkflowInstance workflowInstance = new WorkflowInstance();
workflowInstance.setProcessInstanceId(processInstance.getId());
workflowInstance.setProcessDefinitionId(createDTO.getProcessDefinitionId());
workflowInstance.setBusinessKey(createDTO.getBusinessKey());
workflowInstance.setStatus(WorkflowInstanceStatusEnums.RUNNING);
workflowInstance.setVariables(JsonUtils.toJsonString(createDTO.getVariables()));
workflowInstance.setStartTime(LocalDateTime.now());
workflowInstanceRepository.save(workflowInstance);
// 3. 获取当前活动的任务
List<Task> tasks = taskService.createTaskQuery()
.processInstanceId(processInstance.getId())
.list();
// 4. 为每个活动的任务创建节点实例
for (Task task : tasks) {
WorkflowNodeInstance nodeInstance = new WorkflowNodeInstance();
nodeInstance.setWorkflowInstanceId(workflowInstance.getId());
nodeInstance.setNodeId(task.getTaskDefinitionKey());
nodeInstance.setNodeName(task.getName());
nodeInstance.setNodeType("userTask"); // 或者从定义中获取
nodeInstance.setStatus("ACTIVE");
nodeInstance.setStartTime(LocalDateTime.now());
workflowNodeInstanceRepository.save(nodeInstance);
}
return workflowInstance;
}
}
数据同步服务实现
@Service
public class WorkflowSyncService {
@Scheduled(fixedRate = 60000) // 每分钟执行一次
public void syncWorkflowStatus() {
// 1. 查找所有运行中的实例
List<WorkflowInstance> runningInstances = workflowInstanceRepository
.findByStatus(WorkflowInstanceStatusEnums.RUNNING);
for (WorkflowInstance instance : runningInstances) {
// 2. 检查Flowable中的状态
ProcessInstance processInstance = runtimeService.createProcessInstanceQuery()
.processInstanceId(instance.getProcessInstanceId())
.singleResult();
if (processInstance == null) {
// Flowable中实例已结束,更新我们的状态
HistoricProcessInstance historicInstance = historyService
.createHistoricProcessInstanceQuery()
.processInstanceId(instance.getProcessInstanceId())
.singleResult();
instance.setStatus(convertFlowableStatus(historicInstance.getEndActivityId()));
instance.setEndTime(LocalDateTime.now());
workflowInstanceRepository.save(instance);
}
// 3. 同步节点实例状态
syncNodeInstances(instance.getId(), instance.getProcessInstanceId());
}
}
private void syncNodeInstances(Long workflowInstanceId, String processInstanceId) {
// 1. 获取当前活动的任务
List<Task> activeTasks = taskService.createTaskQuery()
.processInstanceId(processInstanceId)
.list();
// 2. 获取历史任务
List<HistoricTaskInstance> historicTasks = historyService
.createHistoricTaskInstanceQuery()
.processInstanceId(processInstanceId)
.finished()
.list();
// 3. 更新节点实例状态
Set<String> processedTaskIds = new HashSet<>();
// 处理活动任务
for (Task task : activeTasks) {
WorkflowNodeInstance nodeInstance = getOrCreateNodeInstance(
workflowInstanceId, task.getTaskDefinitionKey());
updateNodeInstance(nodeInstance, task, null);
processedTaskIds.add(task.getTaskDefinitionKey());
}
// 处理历史任务
for (HistoricTaskInstance historicTask : historicTasks) {
if (!processedTaskIds.contains(historicTask.getTaskDefinitionKey())) {
WorkflowNodeInstance nodeInstance = getOrCreateNodeInstance(
workflowInstanceId, historicTask.getTaskDefinitionKey());
updateNodeInstance(nodeInstance, null, historicTask);
}
}
}
}
事件监听机制
Flowable事件监听器
@Component
public class FlowableEventListener implements TaskListener, ExecutionListener {
@Resource
private WorkflowInstanceService workflowInstanceService;
@Override
public void notify(DelegateTask task) {
// 任务事件处理
String eventName = task.getEventName();
switch (eventName) {
case "create":
workflowInstanceService.onTaskCreate(task);
break;
case "complete":
workflowInstanceService.onTaskComplete(task);
break;
case "delete":
workflowInstanceService.onTaskDelete(task);
break;
}
}
@Override
public void notify(DelegateExecution execution) {
// 流程执行事件处理
String eventName = execution.getEventName();
switch (eventName) {
case "start":
workflowInstanceService.onProcessStart(execution);
break;
case "end":
workflowInstanceService.onProcessEnd(execution);
break;
}
}
}
状态管理
工作流实例状态枚举
public enum WorkflowInstanceStatusEnums {
NOT_STARTED, // 未开始
RUNNING, // 运行中
SUSPENDED, // 已暂停
COMPLETED, // 已完成
TERMINATED, // 已终止
FAILED; // 执行失败
public static WorkflowInstanceStatusEnums fromFlowableStatus(String flowableStatus) {
switch (flowableStatus) {
case "active":
return RUNNING;
case "suspended":
return SUSPENDED;
case "completed":
return COMPLETED;
case "terminated":
return TERMINATED;
default:
return FAILED;
}
}
}
最佳实践
为什么需要自己维护实例表
-
业务扩展性
- 可以添加更多业务相关的字段
- 可以实现自定义的状态管理
- 可以关联更多业务数据
-
性能优化
- 避免频繁查询Flowable表
- 可以建立更适合业务查询的索引
- 可以实现更好的缓存策略
-
数据完整性
- 保存完整的业务上下文
- 记录更详细的审计信息
- 支持自定义的数据分析
数据一致性保证
- 事务管理
@Transactional
public void startProcess() {
// 1. 启动Flowable流程
// 2. 创建系统流程实例
// 3. 创建节点实例
}
- 定时同步
- 定期检查运行中的实例状态
- 自动修复不一致的数据
- 记录同步日志
- 事件驱动
- 监听Flowable事件
- 实时更新系统状态
- 保证数据实时性
性能优化建议
- 索引优化
- 为常用查询字段建立索引
- 使用复合索引优化多字段查询
- 避免过多索引影响写入性能
- 缓存策略
- 缓存活动的流程实例
- 缓存常用的节点定义
- 使用分布式缓存提高性能
- 批量处理
- 批量同步数据
- 批量更新状态
- 使用队列处理异步任务
总结
- Flowable负责流程的实际执行和调度
- 系统维护业务层面的状态和数据
- 通过事件监听和定时同步保证数据一致性
- 使用状态映射处理不同系统间的状态转换
- 通过事务确保关键操作的原子性
这样的设计可以:
- 保持与Flowable的数据同步
- 支持业务扩展
- 提供更好的性能
- 确保数据一致性
- 便于问题追踪和修复