This commit is contained in:
asp_ly 2024-12-15 20:00:32 +08:00
parent 42fde0d2d2
commit a18e84b4a7
7 changed files with 283 additions and 18 deletions

View File

@ -1,29 +1,37 @@
package com.qqchen.deploy.backend.workflow.config;
import com.qqchen.deploy.backend.workflow.listener.FlowableJobEventListener;
import jakarta.annotation.Resource;
import org.flowable.spring.SpringProcessEngineConfiguration;
import org.flowable.spring.boot.EngineConfigurationConfigurer;
import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
/**
* Flowable配置类
*/
@Configuration
public class FlowableConfig implements EngineConfigurationConfigurer<SpringProcessEngineConfiguration> {
@Resource
private FlowableJobEventListener flowableJobEventListener;
@Override
public void configure(SpringProcessEngineConfiguration engineConfiguration) {
engineConfiguration.setActivityFontName("宋体");
engineConfiguration.setLabelFontName("宋体");
engineConfiguration.setAnnotationFontName("宋体");
engineConfiguration.setDatabaseSchemaUpdate("true");
// 配置异步执行器
engineConfiguration.setAsyncExecutorActivate(true);
// 设置重试次数为0禁用重试
// engineConfiguration.setAsyncExecutorNumberOfRetries(0);
// 设置失败等待时间为最小值
// engineConfiguration.setAsyncFailedJobWaitTime(1);
// // 配置异步执行器参数
// engineConfiguration.setAsyncExecutorDefaultAsyncJobAcquireWaitTime(1000);
// engineConfiguration.setAsyncExecutorDefaultTimerJobAcquireWaitTime(1000);
@ -36,5 +44,8 @@ public class FlowableConfig implements EngineConfigurationConfigurer<SpringProce
//
// // 禁用定义缓存
// engineConfiguration.setEnableProcessDefinitionInfoCache(false);
// 添加事件监听器
engineConfiguration.setEventListeners(Arrays.asList(flowableJobEventListener));
}
}

View File

@ -2,6 +2,7 @@ package com.qqchen.deploy.backend.workflow.dto;
import com.qqchen.deploy.backend.framework.dto.BaseDTO;
import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatusEnums;
import com.qqchen.deploy.backend.workflow.enums.WorkflowNodeInstanceStatusEnums;
import lombok.Data;
import java.util.Date;
import java.util.List;
@ -32,7 +33,7 @@ public class WorkflowExecutionDTO extends BaseDTO {
/**
* 流程状态
*/
private WorkflowInstanceStatusEnums status;
private WorkflowNodeInstanceStatusEnums status;
/**
* 当前活动节点ID
@ -94,7 +95,7 @@ public class WorkflowExecutionDTO extends BaseDTO {
/**
* 节点状态
*/
private WorkflowInstanceStatusEnums status;
private WorkflowNodeInstanceStatusEnums status;
/**
* 开始时间

View File

@ -0,0 +1,104 @@
package com.qqchen.deploy.backend.workflow.enums;
import lombok.Getter;
@Getter
public enum WorkflowNodeInstanceStatusEnums {
/**
* 未开始节点尚未开始执行
*/
NOT_STARTED("NOT_STARTED", "未开始"),
/**
* 已创建流程实例已创建但还未开始运行
*/
CREATED("CREATED", "已创建"),
/**
* 运行中流程实例正在执行
*/
RUNNING("RUNNING", "运行中"),
/**
* 已暂停流程实例被手动暂停
*/
SUSPENDED("SUSPENDED", "已暂停"),
/**
* 已完成流程实例正常完成
*/
COMPLETED("COMPLETED", "已完成"),
/**
* 已终止流程实例被手动终止
*/
TERMINATED("TERMINATED", "已终止"),
/**
* 执行失败流程实例执行过程中发生错误
*/
FAILED("FAILED", "执行失败");
/**
* 状态编码
*/
private final String code;
/**
* 状态描述
*/
private final String description;
WorkflowNodeInstanceStatusEnums(String code, String description) {
this.code = code;
this.description = description;
}
/**
* 根据状态编码获取枚举实例
*
* @param code 状态编码
* @return 对应的枚举实例
*/
public static WorkflowNodeInstanceStatusEnums fromCode(String code) {
for (WorkflowNodeInstanceStatusEnums status : WorkflowNodeInstanceStatusEnums.values()) {
if (status.getCode().equals(code)) {
return status;
}
}
throw new IllegalArgumentException("Invalid workflow instance status code: " + code);
}
/**
* 判断是否为终态不可再变化的状态
*/
public boolean isFinalState() {
return this == COMPLETED || this == TERMINATED || this == FAILED;
}
/**
* 判断是否可以暂停
*/
public boolean canBeSuspended() {
return this == RUNNING;
}
/**
* 判断是否可以恢复
*/
public boolean canBeResumed() {
return this == SUSPENDED;
}
/**
* 判断是否可以终止
*/
public boolean canBeTerminated() {
return this == RUNNING || this == SUSPENDED;
}
@Override
public String toString() {
return this.code;
}
}

View File

@ -0,0 +1,149 @@
package com.qqchen.deploy.backend.workflow.listener;
import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatusEnums;
import com.qqchen.deploy.backend.workflow.enums.WorkflowNodeInstanceStatusEnums;
import com.qqchen.deploy.backend.workflow.service.IWorkflowInstanceService;
import com.qqchen.deploy.backend.workflow.service.IWorkflowNodeInstanceService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.event.FlowableEngineEntityEvent;
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
import org.flowable.common.engine.api.delegate.event.FlowableEventListener;
import org.flowable.common.engine.api.delegate.event.FlowableEventType;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
@Component
@Slf4j
public class FlowableJobEventListener implements FlowableEventListener {
@Resource
@Lazy
private IWorkflowInstanceService workflowInstanceService;
@Resource
@Lazy
private IWorkflowNodeInstanceService workflowNodeInstanceService;
@Override
@EventListener
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, fallbackExecution = true)
public void onEvent(FlowableEvent event) {
log.info("FlowableJobEventListener: {}", event);
FlowableEventType eventType = event.getType();
if (isProcessLevelEvent(eventType.name())) {
WorkflowInstanceStatusEnums status = convertProcessLevelEventToStatus(eventType.name());
FlowableEngineEntityEvent entity = (FlowableEngineEntityEvent) event;
log.info("Process level event received: {} -> {}, processInstanceId: {}", eventType, status, entity.getProcessInstanceId());
workflowInstanceService.updateInstanceStatus(entity.getProcessInstanceId(), status);
return;
}
if (isActivityInstanceEvent(eventType.name())) {
FlowableEngineEntityEvent entity = (FlowableEngineEntityEvent) event;
log.info("Activity level event received: {}, processInstanceId: {}", eventType, entity.getProcessInstanceId());
WorkflowNodeInstanceStatusEnums status = convertActivityEventToStatus(eventType.name());
if (status != null) {
workflowNodeInstanceService.updateNodeStatus(
entity.getProcessInstanceId(),
entity.getExecutionId(),
status
);
}
}
}
@Override
public boolean isFailOnException() {
return false;
}
@Override
public boolean isFireOnTransactionLifecycleEvent() {
return false;
}
@Override
public String getOnTransaction() {
return "";
}
private boolean isProcessLevelEvent(String eventType) {
return eventType.startsWith("PROCESS_");
}
private boolean isActivityInstanceEvent(String eventType) {
return eventType.startsWith("ACTIVITY_");
}
/**
* 将Flowable流程级别事件类型转换为工作流实例状态
* 只处理流程实例级别的事件不处理活动节点级别的事件
*
* @param eventType Flowable事件类型
* @return 工作流实例状态枚举如果不是流程级别事件则返回null
*/
private WorkflowInstanceStatusEnums convertProcessLevelEventToStatus(String eventType) {
switch (eventType) {
// 创建相关
case "PROCESS_CREATED":
return WorkflowInstanceStatusEnums.CREATED;
// 运行相关
case "PROCESS_STARTED":
return WorkflowInstanceStatusEnums.RUNNING;
// 暂停相关
case "PROCESS_SUSPENDED":
return WorkflowInstanceStatusEnums.SUSPENDED;
// 恢复相关
case "PROCESS_RESUMED":
return WorkflowInstanceStatusEnums.RUNNING;
// 完成相关
case "PROCESS_COMPLETED":
return WorkflowInstanceStatusEnums.COMPLETED;
// 终止相关
case "PROCESS_CANCELLED":
return WorkflowInstanceStatusEnums.TERMINATED;
// 失败相关
case "PROCESS_COMPLETED_WITH_ERROR_END_EVENT":
return WorkflowInstanceStatusEnums.FAILED;
default:
// 不是流程级别的事件返回null
return null;
}
}
/**
* 将Flowable活动节点事件类型转换为工作流节点实例状态
*
* @param eventType Flowable事件类型
* @return 工作流节点实例状态枚举如果不是节点级别事件则返回null
*/
private WorkflowNodeInstanceStatusEnums convertActivityEventToStatus(String eventType) {
switch (eventType) {
case "ACTIVITY_STARTED":
return WorkflowNodeInstanceStatusEnums.RUNNING;
case "ACTIVITY_COMPLETED":
return WorkflowNodeInstanceStatusEnums.COMPLETED;
case "ACTIVITY_CANCELLED":
return WorkflowNodeInstanceStatusEnums.TERMINATED;
case "ACTIVITY_ERROR_RECEIVED":
return WorkflowNodeInstanceStatusEnums.FAILED;
default:
return null;
}
}
}

View File

@ -0,0 +1,2 @@
package com.qqchen.deploy.backend.workflow.listener;

View File

@ -10,6 +10,7 @@ import com.qqchen.deploy.backend.workflow.dto.graph.WorkflowDefinitionGraph;
import com.qqchen.deploy.backend.workflow.entity.WorkflowDefinition;
import com.qqchen.deploy.backend.workflow.entity.WorkflowInstance;
import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatusEnums;
import com.qqchen.deploy.backend.workflow.enums.WorkflowNodeInstanceStatusEnums;
import com.qqchen.deploy.backend.workflow.enums.WorkflowStatusEnums;
import com.qqchen.deploy.backend.workflow.repository.IWorkflowDefinitionRepository;
import com.qqchen.deploy.backend.workflow.repository.IWorkflowInstanceRepository;
@ -189,10 +190,10 @@ public class WorkflowDefinitionServiceImpl extends BaseServiceImpl<WorkflowDefin
// 设置流程状态
if (historicInstance.getEndTime() != null) {
executionDTO.setStatus(historicInstance.getDeleteReason() == null ?
WorkflowInstanceStatusEnums.COMPLETED : WorkflowInstanceStatusEnums.FAILED);
WorkflowNodeInstanceStatusEnums.COMPLETED : WorkflowNodeInstanceStatusEnums.FAILED);
} else {
executionDTO.setStatus(runningInstance != null && runningInstance.isSuspended() ?
WorkflowInstanceStatusEnums.SUSPENDED : WorkflowInstanceStatusEnums.RUNNING);
WorkflowNodeInstanceStatusEnums.SUSPENDED : WorkflowNodeInstanceStatusEnums.RUNNING);
}
// 7. 构建节点状态列表
@ -207,7 +208,7 @@ public class WorkflowDefinitionServiceImpl extends BaseServiceImpl<WorkflowDefin
// 如果已经有节点失败后续节点都是未开始
if (hasFailedNode) {
stage.setStatus(WorkflowInstanceStatusEnums.NOT_STARTED);
stage.setStatus(WorkflowNodeInstanceStatusEnums.NOT_STARTED);
stages.add(stage);
continue;
}
@ -215,7 +216,7 @@ public class WorkflowDefinitionServiceImpl extends BaseServiceImpl<WorkflowDefin
// 判断节点状态
if (runningNodes.contains(element.getId())) {
// 正在执行的节点
stage.setStatus(WorkflowInstanceStatusEnums.RUNNING);
stage.setStatus(WorkflowNodeInstanceStatusEnums.RUNNING);
} else {
// 查找历史记录
HistoricActivityInstance historicActivity = historicNodes.get(element.getId());
@ -226,19 +227,19 @@ public class WorkflowDefinitionServiceImpl extends BaseServiceImpl<WorkflowDefin
if (historicActivity.getEndTime() != null) {
if (historicActivity.getDeleteReason() != null) {
// 节点失败
stage.setStatus(WorkflowInstanceStatusEnums.FAILED);
stage.setStatus(WorkflowNodeInstanceStatusEnums.FAILED);
hasFailedNode = true;
} else {
// 节点完成
stage.setStatus(WorkflowInstanceStatusEnums.COMPLETED);
stage.setStatus(WorkflowNodeInstanceStatusEnums.COMPLETED);
}
} else {
// 有开始时间但没有结束时间说明正在运行
stage.setStatus(WorkflowInstanceStatusEnums.RUNNING);
stage.setStatus(WorkflowNodeInstanceStatusEnums.RUNNING);
}
} else {
// 没有历史记录未开始
stage.setStatus(WorkflowInstanceStatusEnums.NOT_STARTED);
stage.setStatus(WorkflowNodeInstanceStatusEnums.NOT_STARTED);
}
}

View File

@ -20,6 +20,8 @@ import org.flowable.engine.HistoryService;
import org.flowable.engine.RuntimeService;
import org.flowable.engine.history.HistoricActivityInstance;
import org.flowable.engine.runtime.ProcessInstance;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@ -64,16 +66,11 @@ public class WorkflowInstanceServiceImpl extends BaseServiceImpl<WorkflowInstanc
}
@Override
@Transactional
public WorkflowInstance updateInstanceStatus(String processInstanceId, WorkflowInstanceStatusEnums status) {
WorkflowInstance instance = workflowInstanceRepository.findByProcessInstanceId(processInstanceId)
.orElseThrow(() -> new RuntimeException("Workflow instance not found: " + processInstanceId));
instance.setStatus(status);
if (status == WorkflowInstanceStatusEnums.COMPLETED || status == WorkflowInstanceStatusEnums.FAILED) {
instance.setEndTime(LocalDateTime.now());
}
return workflowInstanceRepository.save(instance);
}