反序列化问题。

This commit is contained in:
dengqichen 2024-12-16 15:27:03 +08:00
parent 4559f9f9b5
commit 88b5588531
12 changed files with 71 additions and 37 deletions

View File

@ -3,11 +3,13 @@ package com.qqchen.deploy.backend.workflow.delegate;
import com.qqchen.deploy.backend.workflow.event.ShellLogEvent; import com.qqchen.deploy.backend.workflow.event.ShellLogEvent;
import com.qqchen.deploy.backend.workflow.enums.NodeLogTypeEnums; import com.qqchen.deploy.backend.workflow.enums.NodeLogTypeEnums;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.FlowableException;
import org.flowable.engine.RuntimeService; import org.flowable.engine.RuntimeService;
import org.flowable.engine.ManagementService; import org.flowable.engine.ManagementService;
import org.flowable.engine.delegate.DelegateExecution; import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.JavaDelegate; import org.flowable.engine.delegate.JavaDelegate;
import org.flowable.common.engine.api.delegate.Expression; import org.flowable.common.engine.api.delegate.Expression;
import org.flowable.job.api.Job;
import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
@ -176,7 +178,14 @@ public class ShellTaskDelegate implements JavaDelegate {
} catch (Exception e) { } catch (Exception e) {
log.error("Shell脚本执行失败", e); log.error("Shell脚本执行失败", e);
handleFailure(execution, "Shell脚本执行失败: " + e.getMessage()); // Job job = managementService.createJobQuery()
// .processInstanceId(execution.getProcessInstanceId())
// .singleResult();
// if (job != null) {
// // 设置重试次数为 0这样就不会进入死信队列
// managementService.setJobRetries(job.getId(), 0);
// }
throw new FlowableException("脚本执行失败。");
} }
} }
@ -210,11 +219,23 @@ public class ShellTaskDelegate implements JavaDelegate {
private void handleFailure(DelegateExecution execution, String errorMessage) { private void handleFailure(DelegateExecution execution, String errorMessage) {
String processInstanceId = execution.getProcessInstanceId(); String processInstanceId = execution.getProcessInstanceId();
try {
// 获取当前 Job
Job job = managementService.createJobQuery()
.processInstanceId(execution.getProcessInstanceId())
.singleResult();
if (job != null) {
// 设置重试次数为 0这样就不会进入死信队列
managementService.setJobRetries(job.getId(), 0);
}
throw new FlowableException(errorMessage);
// try {
// 直接终止流程实例 // 直接终止流程实例
runtimeService.deleteProcessInstance(processInstanceId, errorMessage); // runtimeService.deleteProcessInstance(processInstanceId, errorMessage);
} catch (Exception e) {
log.error("处理Shell任务失败时出现错误流程实例: {}", processInstanceId, e); // } catch (Exception e) {
} // log.error("处理Shell任务失败时出现错误流程实例: {}", processInstanceId, e);
// }
} }
} }

View File

@ -57,7 +57,7 @@ public class WorkflowInstance extends Entity<Long> {
/** /**
* 开始时间 * 开始时间
*/ */
@Column(name = "start_time", nullable = false) @Column(name = "start_time")
private LocalDateTime startTime; private LocalDateTime startTime;
/** /**

View File

@ -64,7 +64,7 @@ public class WorkflowNodeInstance extends Entity<Long> {
/** /**
* 开始时间 * 开始时间
*/ */
@Column(name = "start_time", nullable = false) @Column(name = "start_time")
private LocalDateTime startTime; private LocalDateTime startTime;
/** /**

View File

@ -19,7 +19,7 @@ public class WorkflowInstanceStatusChangeListener {
@EventListener @EventListener
@Transactional(propagation = Propagation.REQUIRES_NEW) @Transactional(propagation = Propagation.REQUIRES_NEW)
public void handleWorkflowStatusChange(WorkflowInstanceStatusChangeEvent event) { public void handleWorkflowStatusChange(WorkflowInstanceStatusChangeEvent event) {
log.info("Handling workflow status change event: {}", event); // log.info("Handling workflow status change event: {}", event);
workflowInstanceService.updateInstanceStatus(event.getProcessInstanceId(), event.getStatus(), event.getEndTime()); workflowInstanceService.updateInstanceStatus(event.getProcessInstanceId(), event.getStatus(), event.getEndTime());
} }
} }

View File

@ -19,7 +19,7 @@ public class WorkflowNodeInstanceStatusChangeListener {
@EventListener @EventListener
@Transactional(propagation = Propagation.REQUIRES_NEW) @Transactional(propagation = Propagation.REQUIRES_NEW)
public void handleWorkflowStatusChange(WorkflowNodeInstanceStatusChangeEvent event) { public void handleWorkflowStatusChange(WorkflowNodeInstanceStatusChangeEvent event) {
log.info("Handling workflow node instance status change event: {}", event); // log.info("Handling workflow node instance status change event: {}", event);
workflowNodeInstanceService.saveWorkflowNodeInstance(event); workflowNodeInstanceService.saveOrUpdateWorkflowNodeInstance(event);
} }
} }

View File

@ -34,6 +34,8 @@ public class JobEventHandler implements IFlowableEventHandler {
return; return;
} }
log.info("Processing job event: {}, jobType: {}", eventType, entityEvent.getType());
JobEntityImpl job = (JobEntityImpl) entityEvent.getEntity(); JobEntityImpl job = (JobEntityImpl) entityEvent.getEntity();
WorkflowNodeInstanceStatusEnums status = convertToNodeStatus(eventType); WorkflowNodeInstanceStatusEnums status = convertToNodeStatus(eventType);

View File

@ -12,4 +12,6 @@ public interface IWorkflowNodeInstanceRepository extends IBaseRepository<Workflo
Optional<WorkflowNodeInstance> findByProcessInstanceIdAndExecutionId(String processInstanceId, String executionId); Optional<WorkflowNodeInstance> findByProcessInstanceIdAndExecutionId(String processInstanceId, String executionId);
List<WorkflowNodeInstance> findByProcessInstanceId(String processInstanceId); List<WorkflowNodeInstance> findByProcessInstanceId(String processInstanceId);
WorkflowNodeInstance findByNodeId(String nodeId);
} }

View File

@ -4,9 +4,7 @@ import com.qqchen.deploy.backend.framework.service.IBaseService;
import com.qqchen.deploy.backend.workflow.dto.WorkflowNodeInstanceDTO; import com.qqchen.deploy.backend.workflow.dto.WorkflowNodeInstanceDTO;
import com.qqchen.deploy.backend.workflow.entity.WorkflowNodeInstance; import com.qqchen.deploy.backend.workflow.entity.WorkflowNodeInstance;
import com.qqchen.deploy.backend.workflow.enums.WorkflowNodeInstanceStatusEnums; import com.qqchen.deploy.backend.workflow.enums.WorkflowNodeInstanceStatusEnums;
import com.qqchen.deploy.backend.workflow.event.WorkflowInstanceStatusChangeEvent;
import com.qqchen.deploy.backend.workflow.event.WorkflowNodeInstanceStatusChangeEvent; import com.qqchen.deploy.backend.workflow.event.WorkflowNodeInstanceStatusChangeEvent;
import com.qqchen.deploy.backend.workflow.query.WorkflowNodeInstanceQuery;
import java.util.List; import java.util.List;
@ -29,5 +27,5 @@ public interface IWorkflowNodeInstanceService extends IBaseService<WorkflowNodeI
*/ */
List<WorkflowNodeInstanceDTO> getNodesByProcessInstanceId(String processInstanceId); List<WorkflowNodeInstanceDTO> getNodesByProcessInstanceId(String processInstanceId);
void saveWorkflowNodeInstance(WorkflowNodeInstanceStatusChangeEvent event); void saveOrUpdateWorkflowNodeInstance(WorkflowNodeInstanceStatusChangeEvent event);
} }

View File

@ -63,12 +63,19 @@ public class WorkflowNodeInstanceServiceImpl extends BaseServiceImpl<WorkflowNod
@Override @Override
@Transactional @Transactional
public void saveWorkflowNodeInstance(WorkflowNodeInstanceStatusChangeEvent event) { public void saveOrUpdateWorkflowNodeInstance(WorkflowNodeInstanceStatusChangeEvent event) {
WorkflowNodeInstance nodeInstance = workflowNodeInstanceConverter.eventToEntity(event); WorkflowNodeInstance nodeInstance = workflowNodeInstanceConverter.eventToEntity(event);
WorkflowInstance workflowInstance = workflowInstanceRepository.findByProcessInstanceId(nodeInstance.getProcessInstanceId()) WorkflowInstance workflowInstance = workflowInstanceRepository.findByProcessInstanceId(nodeInstance.getProcessInstanceId())
.orElseThrow(() -> new RuntimeException("Node instance not found for processInstanceId: " + nodeInstance.getProcessInstanceId())); .orElseThrow(() -> new RuntimeException("Node instance not found for processInstanceId: " + nodeInstance.getProcessInstanceId()));
WorkflowNodeInstance workflowNodeInstance = workflowNodeInstanceRepository.findByNodeId(event.getNodeId());
if (workflowNodeInstance == null) {
nodeInstance.setWorkflowInstanceId(workflowInstance.getId()); nodeInstance.setWorkflowInstanceId(workflowInstance.getId());
nodeInstance.setWorkflowDefinitionId(workflowInstance.getWorkflowDefinitionId()); nodeInstance.setWorkflowDefinitionId(workflowInstance.getWorkflowDefinitionId());
super.repository.save(nodeInstance); super.repository.save(nodeInstance);
return;
}
workflowNodeInstance.setEndTime(event.getEndTime());
workflowNodeInstance.setStatus(event.getStatus());
super.repository.save(workflowNodeInstance);
} }
} }

View File

@ -119,25 +119,29 @@ public class BpmnConverter {
serviceTask.setImplementation(delegate); serviceTask.setImplementation(delegate);
serviceTask.setAsynchronous(true); // 设置为异步执行 serviceTask.setAsynchronous(true); // 设置为异步执行
// 添加字段注入和扩展属性 // 设置失败时不重试
List<FieldExtension> fieldExtensions = new ArrayList<>(); ExtensionElement failedJobRetryTimeCycle = new ExtensionElement();
failedJobRetryTimeCycle.setName("failedJobRetryTimeCycle");
failedJobRetryTimeCycle.setNamespace("http://flowable.org/bpmn");
failedJobRetryTimeCycle.setNamespacePrefix("flowable");
failedJobRetryTimeCycle.setElementText("R0/PT1H"); // 设置为0次重试
// 添加错误处理配置
ExtensionElement errorHandling = new ExtensionElement();
errorHandling.setName("failOnError");
errorHandling.setNamespace("http://flowable.org/bpmn");
errorHandling.setNamespacePrefix("flowable");
errorHandling.setElementText("true");
Map<String, List<ExtensionElement>> extensionElements = new HashMap<>(); Map<String, List<ExtensionElement>> extensionElements = new HashMap<>();
List<ExtensionElement> retryElements = new ArrayList<>();
retryElements.add(failedJobRetryTimeCycle);
extensionElements.put("failedJobRetryTimeCycle", retryElements);
// 遍历所有配置项 List<ExtensionElement> errorElements = new ArrayList<>();
node.getConfig().forEach((key, value) -> { errorElements.add(errorHandling);
if (value != null && !"delegate".equals(key)) { extensionElements.put("failOnError", errorElements);
// 添加为字段注入
FieldExtension fieldExtension = new FieldExtension();
fieldExtension.setFieldName(key);
fieldExtension.setStringValue(String.valueOf(value));
fieldExtensions.add(fieldExtension);
// 同时也添加为扩展属性用于兼容性和可视化
addExtensionElement(extensionElements, key, String.valueOf(value));
}
});
serviceTask.setFieldExtensions(fieldExtensions);
serviceTask.setExtensionElements(extensionElements); serviceTask.setExtensionElements(extensionElements);
} }
// 后续可以添加其他类型的节点配置 // 后续可以添加其他类型的节点配置

View File

@ -475,7 +475,7 @@ CREATE TABLE workflow_instance
business_key VARCHAR(64) NULL COMMENT '业务标识', business_key VARCHAR(64) NULL COMMENT '业务标识',
status VARCHAR(32) NOT NULL COMMENT '实例状态', status VARCHAR(32) NOT NULL COMMENT '实例状态',
variables TEXT NULL COMMENT '流程变量(JSON)', variables TEXT NULL COMMENT '流程变量(JSON)',
start_time DATETIME(6) NOT NULL COMMENT '开始时间', start_time DATETIME(6) NULL COMMENT '开始时间',
end_time DATETIME(6) NULL COMMENT '结束时间' end_time DATETIME(6) NULL COMMENT '结束时间'
-- CONSTRAINT FK_workflow_instance_definition FOREIGN KEY (process_definition_id) REFERENCES workflow_definition(id) -- CONSTRAINT FK_workflow_instance_definition FOREIGN KEY (process_definition_id) REFERENCES workflow_definition(id)
@ -500,7 +500,7 @@ CREATE TABLE workflow_node_instance
node_name VARCHAR(100) NOT NULL COMMENT '节点名称', node_name VARCHAR(100) NOT NULL COMMENT '节点名称',
node_type VARCHAR(32) NOT NULL COMMENT '节点类型', node_type VARCHAR(32) NOT NULL COMMENT '节点类型',
status VARCHAR(32) NOT NULL COMMENT '节点状态', status VARCHAR(32) NOT NULL COMMENT '节点状态',
start_time DATETIME(6) NOT NULL COMMENT '开始时间', start_time DATETIME(6) NULL COMMENT '开始时间',
end_time DATETIME(6) NULL COMMENT '结束时间', end_time DATETIME(6) NULL COMMENT '结束时间',
variables TEXT NULL COMMENT '节点变量(JSON)', variables TEXT NULL COMMENT '节点变量(JSON)',
error_message TEXT NULL COMMENT '错误信息', error_message TEXT NULL COMMENT '错误信息',