反序列化问题。

This commit is contained in:
dengqichen 2024-12-16 13:19:59 +08:00
parent 2d3cd8db92
commit 0f5f590906
11 changed files with 350 additions and 56 deletions

View File

@ -6,6 +6,7 @@ import com.qqchen.deploy.backend.framework.enums.ResponseCode;
import com.qqchen.deploy.backend.workflow.dto.WorkflowDefinitionDTO;
import com.qqchen.deploy.backend.workflow.dto.WorkflowExecutionDTO;
import com.qqchen.deploy.backend.workflow.dto.WorkflowInstanceDTO;
import com.qqchen.deploy.backend.workflow.dto.WorkflowInstanceStartRequest;
import com.qqchen.deploy.backend.workflow.entity.WorkflowDefinition;
import com.qqchen.deploy.backend.workflow.entity.WorkflowInstance;
import com.qqchen.deploy.backend.workflow.query.WorkflowDefinitionQuery;
@ -50,10 +51,9 @@ public class WorkflowInstanceApiController extends BaseController<WorkflowInstan
@Operation(summary = "启动工作流")
@PostMapping("/start")
public Response<WorkflowInstanceDTO> startWorkflow(
@RequestParam String processKey,
@RequestParam String businessKey) {
return Response.success(workflowInstanceService.startWorkflow(processKey, businessKey));
public Response<Void> startWorkflow(@RequestBody WorkflowInstanceStartRequest request) {
workflowInstanceService.startWorkflow(request);
return Response.success();
}
@Override

View File

@ -1,6 +1,6 @@
package com.qqchen.deploy.backend.workflow.config;
import com.qqchen.deploy.backend.workflow.listener.FlowableJobEventListener;
import com.qqchen.deploy.backend.workflow.listener.FlowableEventDispatcher;
import jakarta.annotation.Resource;
import org.flowable.spring.SpringProcessEngineConfiguration;
import org.flowable.spring.boot.EngineConfigurationConfigurer;
@ -14,9 +14,8 @@ import java.util.Arrays;
@Configuration
public class FlowableConfig implements EngineConfigurationConfigurer<SpringProcessEngineConfiguration> {
@Resource
private FlowableJobEventListener flowableJobEventListener;
private FlowableEventDispatcher flowableEventDispatcher;
@Override
public void configure(SpringProcessEngineConfiguration engineConfiguration) {
@ -46,6 +45,6 @@ public class FlowableConfig implements EngineConfigurationConfigurer<SpringProce
// engineConfiguration.setEnableProcessDefinitionInfoCache(false);
// 添加事件监听器
engineConfiguration.setEventListeners(Arrays.asList(flowableJobEventListener));
engineConfiguration.setEventListeners(Arrays.asList(flowableEventDispatcher));
}
}

View File

@ -0,0 +1,24 @@
package com.qqchen.deploy.backend.workflow.dto;
import com.qqchen.deploy.backend.framework.dto.BaseDTO;
import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatusEnums;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.time.LocalDateTime;
@Data
@Schema(description = "工作流实例启动入参")
public class WorkflowInstanceStartRequest {
private String processKey;
/**
* 业务标识
*/
private String businessKey;
}

View File

@ -0,0 +1,50 @@
package com.qqchen.deploy.backend.workflow.listener;
import com.qqchen.deploy.backend.workflow.listener.handler.IFlowableEventHandler;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
import org.flowable.common.engine.api.delegate.event.FlowableEventListener;
import org.springframework.stereotype.Component;
import jakarta.annotation.Resource;
import java.util.List;
@Slf4j
@Component
public class FlowableEventDispatcher implements FlowableEventListener {
@Resource
private List<IFlowableEventHandler> eventHandlers;
@Override
public void onEvent(FlowableEvent event) {
String eventType = event.getType().name();
log.info("Dispatching Flowable event: {}, event class: {}", eventType, event.getClass().getName());
for (IFlowableEventHandler handler : eventHandlers) {
if (handler.canHandle(eventType)) {
try {
handler.handle(event);
} catch (Exception e) {
log.error("Error handling event {} by handler {}: {}",
eventType, handler.getClass().getSimpleName(), e.getMessage(), e);
}
}
}
}
@Override
public boolean isFailOnException() {
return false;
}
@Override
public boolean isFireOnTransactionLifecycleEvent() {
return true;
}
@Override
public String getOnTransaction() {
return "committed";
}
}

View File

@ -4,22 +4,20 @@ import cn.hutool.core.date.DateUtil;
import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatusEnums;
import com.qqchen.deploy.backend.workflow.enums.WorkflowNodeInstanceStatusEnums;
import com.qqchen.deploy.backend.workflow.event.WorkflowInstanceStatusChangeEvent;
import com.qqchen.deploy.backend.workflow.event.WorkflowNodeInstanceStatusChangeEvent;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.event.FlowableEngineEntityEvent;
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
import org.flowable.common.engine.api.delegate.event.FlowableEventListener;
import org.flowable.common.engine.api.delegate.event.FlowableEventType;
import org.flowable.engine.HistoryService;
import org.flowable.engine.delegate.event.FlowableProcessEngineEvent;
import org.flowable.engine.history.HistoricProcessInstance;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
@Component
//@Component
@Slf4j
public class FlowableJobEventListener implements FlowableEventListener {
@ -35,20 +33,21 @@ public class FlowableJobEventListener implements FlowableEventListener {
FlowableEventType eventType = event.getType();
log.info("Received Flowable event: {}, event class: {}", eventType, event.getClass().getName());
// 只处理实体事件
if (!(event instanceof FlowableEngineEntityEvent entity)) {
return;
}
// if (!(event instanceof FlowableEngineEntityEvent entity)) {
// return;
// }
String processInstanceId = entity.getProcessInstanceId();
// String processInstanceId = entity.getProcessInstanceId();
if (isProcessLevelEvent(eventType.name())) {
FlowableProcessEngineEvent flowableEntity = (FlowableProcessEngineEvent) event;
WorkflowInstanceStatusEnums status = convertProcessLevelEventToStatus(eventType.name());
log.info("Process level event received: {} -> {}, processInstanceId: {}", eventType, status, processInstanceId);
log.info("Process level event received: {} -> {}, processInstanceId: {}", eventType, status, null);
HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery()
.processInstanceId(processInstanceId)
.processInstanceId(flowableEntity.getProcessInstanceId())
.singleResult();
publisher.publishEvent(WorkflowInstanceStatusChangeEvent.builder()
.processInstanceId(processInstanceId)
.processInstanceId(flowableEntity.getProcessInstanceId())
.status(status)
.endTime(DateUtil.toLocalDateTime(historicProcessInstance.getEndTime()))
.build()
@ -56,35 +55,35 @@ public class FlowableJobEventListener implements FlowableEventListener {
}
if (isTaskInstanceEvent(eventType.name())) {
WorkflowNodeInstanceStatusEnums status = convertJobEventToStatus(eventType.name());
// 获取Job信息
String executionId = entity.getExecutionId();
org.flowable.job.api.Job job = (org.flowable.job.service.impl.persistence.entity.JobEntityImpl) entity.getEntity();
String activityId = job.getElementId();
String activityName = job.getElementName();
String activityType = job.getJobHandlerType();
LocalDateTime startTime = job.getCreateTime() != null ? DateUtil.toLocalDateTime(job.getCreateTime()) : null;
LocalDateTime endTime = null;
if (status == WorkflowNodeInstanceStatusEnums.COMPLETED ||
status == WorkflowNodeInstanceStatusEnums.FAILED) {
endTime = LocalDateTime.now(); // 对于完成或失败状态使用当前时间作为结束时间
}
publisher.publishEvent(WorkflowNodeInstanceStatusChangeEvent.builder()
.processInstanceId(processInstanceId)
.executionId(executionId)
.nodeId(activityId)
.nodeName(activityName)
.nodeType(activityType)
.status(status)
.startTime(startTime)
.endTime(endTime)
.build()
);
log.info("Job event received: {} -> {}, processInstanceId: {}, nodeId: {}, nodeName: {}",
eventType, status, processInstanceId, activityId, activityName);
// WorkflowNodeInstanceStatusEnums status = convertJobEventToStatus(eventType.name());
// // 获取Job信息
// String executionId = entity.getExecutionId();
// org.flowable.job.api.Job job = (org.flowable.job.service.impl.persistence.entity.JobEntityImpl) entity.getEntity();
// String activityId = job.getElementId();
// String activityName = job.getElementName();
// String activityType = job.getJobHandlerType();
//
// LocalDateTime startTime = job.getCreateTime() != null ? DateUtil.toLocalDateTime(job.getCreateTime()) : null;
// LocalDateTime endTime = null;
// if (status == WorkflowNodeInstanceStatusEnums.COMPLETED ||
// status == WorkflowNodeInstanceStatusEnums.FAILED) {
// endTime = LocalDateTime.now(); // 对于完成或失败状态使用当前时间作为结束时间
// }
//
// publisher.publishEvent(WorkflowNodeInstanceStatusChangeEvent.builder()
// .processInstanceId(processInstanceId)
// .executionId(executionId)
// .nodeId(activityId)
// .nodeName(activityName)
// .nodeType(activityType)
// .status(status)
// .startTime(startTime)
// .endTime(endTime)
// .build()
// );
//
// log.info("Job event received: {} -> {}, processInstanceId: {}, nodeId: {}, nodeName: {}",
// eventType, status, processInstanceId, activityId, activityName);
}
}

View File

@ -0,0 +1,61 @@
package com.qqchen.deploy.backend.workflow.listener.handler;
import com.qqchen.deploy.backend.workflow.enums.WorkflowNodeInstanceStatusEnums;
import com.qqchen.deploy.backend.workflow.event.WorkflowNodeInstanceStatusChangeEvent;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
import org.flowable.engine.delegate.event.FlowableActivityEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
import jakarta.annotation.Resource;
import java.time.LocalDateTime;
@Slf4j
@Component
public class ActivityEventHandler implements IFlowableEventHandler {
@Resource
private ApplicationEventPublisher publisher;
@Override
public boolean canHandle(String eventType) {
return eventType.startsWith("ACTIVITY_");
}
@Override
public void handle(FlowableEvent event) {
String eventType = event.getType().name();
log.info("Processing activity event: {}", eventType);
if (!(event instanceof FlowableActivityEvent activityEvent)) {
return;
}
WorkflowNodeInstanceStatusEnums status = convertToNodeStatus(eventType);
if (status != null) {
publisher.publishEvent(WorkflowNodeInstanceStatusChangeEvent.builder()
.processInstanceId(activityEvent.getProcessInstanceId())
.executionId(activityEvent.getExecutionId())
.nodeId(activityEvent.getActivityId())
.nodeName(activityEvent.getActivityName())
.nodeType(activityEvent.getActivityType())
.status(status)
.startTime(status == WorkflowNodeInstanceStatusEnums.RUNNING ? LocalDateTime.now() : null)
.endTime(status.isFinalState() ? LocalDateTime.now() : null)
.build()
);
}
}
private WorkflowNodeInstanceStatusEnums convertToNodeStatus(String eventType) {
return switch (eventType) {
case "ACTIVITY_STARTED" -> WorkflowNodeInstanceStatusEnums.RUNNING;
case "ACTIVITY_COMPLETED" -> WorkflowNodeInstanceStatusEnums.COMPLETED;
case "ACTIVITY_CANCELLED" -> WorkflowNodeInstanceStatusEnums.TERMINATED;
case "ACTIVITY_ERROR_RECEIVED" -> WorkflowNodeInstanceStatusEnums.FAILED;
default -> null;
};
}
}

View File

@ -0,0 +1,24 @@
package com.qqchen.deploy.backend.workflow.listener.handler;
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
/**
* Flowable事件处理器接口
*/
public interface IFlowableEventHandler {
/**
* 判断是否可以处理该事件
*
* @param eventType 事件类型名称
* @return 是否可以处理
*/
boolean canHandle(String eventType);
/**
* 处理事件
*
* @param event Flowable事件
*/
void handle(FlowableEvent event);
}

View File

@ -0,0 +1,65 @@
package com.qqchen.deploy.backend.workflow.listener.handler;
import cn.hutool.core.date.DateUtil;
import com.qqchen.deploy.backend.workflow.enums.WorkflowNodeInstanceStatusEnums;
import com.qqchen.deploy.backend.workflow.event.WorkflowNodeInstanceStatusChangeEvent;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.event.FlowableEngineEntityEvent;
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
import org.flowable.job.service.impl.persistence.entity.JobEntityImpl;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
import jakarta.annotation.Resource;
import java.time.LocalDateTime;
@Slf4j
@Component
public class JobEventHandler implements IFlowableEventHandler {
@Resource
private ApplicationEventPublisher publisher;
@Override
public boolean canHandle(String eventType) {
return eventType.startsWith("JOB_EXECUTION_");
}
@Override
public void handle(FlowableEvent event) {
String eventType = event.getType().name();
log.info("Processing job event: {}", eventType);
if (!(event instanceof FlowableEngineEntityEvent entityEvent)) {
return;
}
JobEntityImpl job = (JobEntityImpl) entityEvent.getEntity();
WorkflowNodeInstanceStatusEnums status = convertToNodeStatus(eventType);
if (status != null) {
publisher.publishEvent(WorkflowNodeInstanceStatusChangeEvent.builder()
.processInstanceId(job.getProcessInstanceId())
.executionId(job.getExecutionId())
.nodeId(job.getElementId())
.nodeName(job.getElementName())
.nodeType(job.getJobHandlerType())
.status(status)
.startTime(job.getCreateTime() != null ? DateUtil.toLocalDateTime(job.getCreateTime()) : null)
.endTime(status.isFinalState() ? LocalDateTime.now() : null)
.build()
);
}
}
private WorkflowNodeInstanceStatusEnums convertToNodeStatus(String eventType) {
return switch (eventType) {
case "JOB_EXECUTION_SUCCESS" -> WorkflowNodeInstanceStatusEnums.COMPLETED;
case "JOB_EXECUTION_START" -> WorkflowNodeInstanceStatusEnums.RUNNING;
case "JOB_EXECUTION_FAILURE" -> WorkflowNodeInstanceStatusEnums.FAILED;
case "JOB_EXECUTION_REJECTED" -> WorkflowNodeInstanceStatusEnums.FAILED;
case "JOB_EXECUTION_NOJOB_FOUND" -> WorkflowNodeInstanceStatusEnums.FAILED;
default -> null;
};
}
}

View File

@ -0,0 +1,69 @@
package com.qqchen.deploy.backend.workflow.listener.handler;
import cn.hutool.core.date.DateUtil;
import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatusEnums;
import com.qqchen.deploy.backend.workflow.event.WorkflowInstanceStatusChangeEvent;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
import org.flowable.engine.ProcessEngine;
import org.flowable.engine.delegate.event.FlowableProcessEngineEvent;
import org.flowable.engine.history.HistoricProcessInstance;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import jakarta.annotation.Resource;
@Slf4j
@Component
public class ProcessEventHandler implements IFlowableEventHandler {
@Resource
private ApplicationEventPublisher publisher;
@Resource
@Lazy
private ProcessEngine processEngine;
@Override
public boolean canHandle(String eventType) {
return eventType.startsWith("PROCESS_");
}
@Override
public void handle(FlowableEvent event) {
String eventType = event.getType().name();
log.info("Processing event: {}", eventType);
FlowableProcessEngineEvent processEvent = (FlowableProcessEngineEvent) event;
WorkflowInstanceStatusEnums status = convertToWorkflowStatus(eventType);
if (status != null) {
HistoricProcessInstance historicProcessInstance = processEngine.getHistoryService()
.createHistoricProcessInstanceQuery()
.processInstanceId(processEvent.getProcessInstanceId())
.singleResult();
publisher.publishEvent(WorkflowInstanceStatusChangeEvent.builder()
.processInstanceId(processEvent.getProcessInstanceId())
.status(status)
.endTime(historicProcessInstance != null && historicProcessInstance.getEndTime() != null ?
DateUtil.toLocalDateTime(historicProcessInstance.getEndTime()) : null)
.build()
);
}
}
private WorkflowInstanceStatusEnums convertToWorkflowStatus(String eventType) {
return switch (eventType) {
case "PROCESS_CREATED" -> WorkflowInstanceStatusEnums.CREATED;
case "PROCESS_STARTED" -> WorkflowInstanceStatusEnums.RUNNING;
case "PROCESS_COMPLETED" -> WorkflowInstanceStatusEnums.COMPLETED;
case "PROCESS_COMPLETED_WITH_ERROR_END_EVENT" -> WorkflowInstanceStatusEnums.FAILED;
case "PROCESS_CANCELLED" -> WorkflowInstanceStatusEnums.TERMINATED;
case "PROCESS_SUSPENDED" -> WorkflowInstanceStatusEnums.SUSPENDED;
case "PROCESS_RESUMED" -> WorkflowInstanceStatusEnums.RUNNING;
default -> null;
};
}
}

View File

@ -3,6 +3,7 @@ package com.qqchen.deploy.backend.workflow.service;
import com.qqchen.deploy.backend.framework.service.IBaseService;
import com.qqchen.deploy.backend.workflow.dto.WorkflowDefinitionDTO;
import com.qqchen.deploy.backend.workflow.dto.WorkflowInstanceDTO;
import com.qqchen.deploy.backend.workflow.dto.WorkflowInstanceStartRequest;
import com.qqchen.deploy.backend.workflow.entity.WorkflowDefinition;
import com.qqchen.deploy.backend.workflow.entity.WorkflowInstance;
import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatusEnums;
@ -56,5 +57,5 @@ public interface IWorkflowInstanceService extends IBaseService<WorkflowInstance,
*/
void completeInstance(String processInstanceId, Map<String, Object> variables);
WorkflowInstanceDTO startWorkflow(String processKey, String businessKey);
WorkflowInstanceDTO startWorkflow(WorkflowInstanceStartRequest request);
}

View File

@ -6,6 +6,7 @@ import com.qqchen.deploy.backend.framework.service.impl.BaseServiceImpl;
import com.qqchen.deploy.backend.workflow.converter.WorkflowInstanceConverter;
import com.qqchen.deploy.backend.workflow.dto.WorkflowDefinitionDTO;
import com.qqchen.deploy.backend.workflow.dto.WorkflowInstanceDTO;
import com.qqchen.deploy.backend.workflow.dto.WorkflowInstanceStartRequest;
import com.qqchen.deploy.backend.workflow.entity.WorkflowDefinition;
import com.qqchen.deploy.backend.workflow.entity.WorkflowInstance;
import com.qqchen.deploy.backend.workflow.enums.WorkflowInstanceStatusEnums;
@ -148,16 +149,17 @@ public class WorkflowInstanceServiceImpl extends BaseServiceImpl<WorkflowInstanc
@Override
@Transactional
public WorkflowInstanceDTO startWorkflow(String processKey, String businessKey) {
public WorkflowInstanceDTO startWorkflow(WorkflowInstanceStartRequest request) {
try {
WorkflowDefinition workflowDefinition = workflowDefinitionRepository.findByKey(processKey).orElseThrow(() -> new RuntimeException("Workflow definition process key not found: " + processKey));
WorkflowDefinition workflowDefinition = workflowDefinitionRepository.findByKey(request.getProcessKey())
.orElseThrow(() -> new RuntimeException("Workflow definition process key not found: " + request.getProcessKey()));
ProcessInstance processInstance = runtimeService.createProcessInstanceBuilder()
.processDefinitionKey(processKey)
.businessKey(businessKey)
.processDefinitionKey(request.getProcessKey())
.businessKey(request.getBusinessKey())
.startAsync(); // 异步启动会自动执行 shell 任务
return createWorkflowInstance(workflowDefinition.getId(), businessKey, processInstance);
return createWorkflowInstance(workflowDefinition.getId(), request.getBusinessKey(), processInstance);
} catch (Exception e) {
log.error("Failed to create workflow: {}", processKey, e);
log.error("Failed to create workflow: {}", request.getProcessKey(), e);
throw new RuntimeException("Failed to create workflow", e);
}
}