解决死信队列的问题。

This commit is contained in:
dengqichen 2024-12-10 18:06:47 +08:00
parent 9312c3a2b3
commit 76ffdb89ad
6 changed files with 188 additions and 5 deletions

View File

@ -910,4 +910,130 @@ workflow_log工作流日志表
提供了更丰富的元数据和状态信息 提供了更丰富的元数据和状态信息
支持更细粒度的日志记录 支持更细粒度的日志记录
可以存储自定义的业务数据 可以存储自定义的业务数据
这样的设计让我们可以在使用Flowable强大的工作流引擎的同时也能满足特定的业务需求。 这样的设计让我们可以在使用Flowable强大的工作流引擎的同时也能满足特定的业务需求。
{
"name": "Simple Shell Workflow6",
"key": "simple_shell_workflow",
"description": "A simple workflow with shell task",
"graphJson": {
"cells": [
{
"id": "start",
"shape": "start",
"data": {
"label": "开始"
},
"position": {
"x": 100,
"y": 100
}
},
{
"id": "shell",
"shape": "serviceTask",
"data": {
"label": "Shell脚本",
"serviceTask": {
"type": "shell",
"implementation": "${shellTaskDelegate}",
"fields": {
"script": "timeout 10000000",
"workDir": "/tmp",
"env": {
"TEST_VAR": "test_value"
}
}
}
},
"position": {
"x": 300,
"y": 100
}
},
{
"id": "shellErrorBoundary",
"shape": "boundaryEvent",
"data": {
"label": "Shell错误",
"boundaryEvent": {
"type": "error",
"attachedToRef": "shell",
"errorRef": "SHELL_EXECUTION_ERROR"
}
},
"position": {
"x": 300,
"y": 160
}
},
{
"id": "errorHandler",
"shape": "serviceTask",
"data": {
"label": "错误处理",
"serviceTask": {
"type": "service",
"implementation": "${errorHandlerDelegate}"
}
},
"position": {
"x": 300,
"y": 220
}
},
{
"id": "end",
"shape": "end",
"data": {
"label": "结束"
},
"position": {
"x": 500,
"y": 100
}
},
{
"id": "flow1",
"shape": "edge",
"source": "start",
"target": "shell",
"data": {
"label": "流转到Shell"
}
},
{
"id": "flow2",
"shape": "edge",
"source": "shell",
"target": "end",
"data": {
"label": "完成"
}
},
{
"id": "flow_error",
"shape": "edge",
"source": "shellErrorBoundary",
"target": "errorHandler",
"data": {
"label": "错误处理"
}
},
{
"id": "flow_error_end",
"shape": "edge",
"source": "errorHandler",
"target": "end",
"data": {
"label": "处理完成"
}
}
]
}
}
死信队列的问题:
需要再异常里直接移除:
runtimeService.deleteProcessInstance(processInstanceId, errorMessage);

View File

@ -70,7 +70,7 @@
<artifactId>mysql-connector-j</artifactId> <artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope> <scope>runtime</scope>
</dependency> </dependency>
<!-- Hibernate Types --> <!-- Hibernate Types -->
<dependency> <dependency>
<groupId>com.vladmihalcea</groupId> <groupId>com.vladmihalcea</groupId>
@ -220,6 +220,11 @@
<artifactId>flowable-bpmn-layout</artifactId> <artifactId>flowable-bpmn-layout</artifactId>
<version>${flowable.version}</version> <version>${flowable.version}</version>
</dependency> </dependency>
<!-- <dependency>-->
<!-- <groupId>org.flowable</groupId>-->
<!-- <artifactId>flowable-spring-boot-starter-rest</artifactId>-->
<!-- <version>${flowable.version}</version>-->
<!-- </dependency>-->
</dependencies> </dependencies>
<build> <build>

View File

@ -16,5 +16,25 @@ public class FlowableConfig implements EngineConfigurationConfigurer<SpringProce
engineConfiguration.setLabelFontName("宋体"); engineConfiguration.setLabelFontName("宋体");
engineConfiguration.setAnnotationFontName("宋体"); engineConfiguration.setAnnotationFontName("宋体");
engineConfiguration.setDatabaseSchemaUpdate("true"); engineConfiguration.setDatabaseSchemaUpdate("true");
// 配置异步执行器
engineConfiguration.setAsyncExecutorActivate(true);
// 设置重试次数为0禁用重试
// engineConfiguration.setAsyncExecutorNumberOfRetries(0);
// 设置失败等待时间为最小值
// engineConfiguration.setAsyncFailedJobWaitTime(1);
// // 配置异步执行器参数
// engineConfiguration.setAsyncExecutorDefaultAsyncJobAcquireWaitTime(1000);
// engineConfiguration.setAsyncExecutorDefaultTimerJobAcquireWaitTime(1000);
// engineConfiguration.setAsyncExecutorDefaultQueueSizeFullWaitTime(1000);
// engineConfiguration.setAsyncExecutorMaxAsyncJobsDuePerAcquisition(1);
// engineConfiguration.setAsyncExecutorMaxTimerJobsPerAcquisition(1);
//
// // 禁用过期任务重置
// engineConfiguration.setAsyncExecutorResetExpiredJobsInterval(0);
//
// // 禁用定义缓存
// engineConfiguration.setEnableProcessDefinitionInfoCache(false);
} }
} }

View File

@ -5,6 +5,7 @@ import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.JavaDelegate; import org.flowable.engine.delegate.JavaDelegate;
import org.flowable.common.engine.api.delegate.Expression; import org.flowable.common.engine.api.delegate.Expression;
import org.flowable.engine.RuntimeService; import org.flowable.engine.RuntimeService;
import org.flowable.engine.ManagementService;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
@ -29,6 +30,9 @@ public class ShellTaskDelegate implements JavaDelegate {
@Resource @Resource
private RuntimeService runtimeService; private RuntimeService runtimeService;
@Resource
private ManagementService managementService;
private Expression script; private Expression script;
private Expression workDir; private Expression workDir;
@ -60,7 +64,8 @@ public class ShellTaskDelegate implements JavaDelegate {
} }
if (scriptValue == null) { if (scriptValue == null) {
throw new RuntimeException("Script is required but not provided"); handleFailure(execution, "Script is required but not provided");
return;
} }
try { try {
@ -173,7 +178,8 @@ public class ShellTaskDelegate implements JavaDelegate {
if (exitCode != 0) { if (exitCode != 0) {
log.error("Shell script execution failed with exit code: {}", exitCode); log.error("Shell script execution failed with exit code: {}", exitCode);
log.error("Error output: {}", finalError); log.error("Error output: {}", finalError);
throw new RuntimeException("Shell script execution failed with exit code: " + exitCode); handleFailure(execution, "Shell script execution failed with exit code: " + exitCode);
return;
} }
log.info("Shell script executed successfully"); log.info("Shell script executed successfully");
@ -181,7 +187,17 @@ public class ShellTaskDelegate implements JavaDelegate {
} catch (Exception e) { } catch (Exception e) {
log.error("Shell script execution failed", e); log.error("Shell script execution failed", e);
throw new RuntimeException("Shell script execution failed: " + e.getMessage(), e); handleFailure(execution, "Shell script execution failed: " + e.getMessage());
}
}
private void handleFailure(DelegateExecution execution, String errorMessage) {
String processInstanceId = execution.getProcessInstanceId();
try {
// 直接终止流程实例
runtimeService.deleteProcessInstance(processInstanceId, errorMessage);
} catch (Exception e) {
log.error("Error while handling shell task failure for process instance: {}", processInstanceId, e);
} }
} }
} }

View File

@ -104,6 +104,7 @@ public class WorkflowDefinitionServiceImpl extends BaseServiceImpl<WorkflowDefin
.businessKey(businessKey) .businessKey(businessKey)
.variables(variables) .variables(variables)
.startAsync(); // 异步启动会自动执行 shell 任务 .startAsync(); // 异步启动会自动执行 shell 任务
// .start();
// 2. 返回实例信息 // 2. 返回实例信息
WorkflowInstanceCreateDTO dto = new WorkflowInstanceCreateDTO(); WorkflowInstanceCreateDTO dto = new WorkflowInstanceCreateDTO();

View File

@ -30,8 +30,23 @@ spring:
always-use-message-format: false always-use-message-format: false
use-code-as-default-message: true use-code-as-default-message: true
cache-duration: 3600 cache-duration: 3600
flowable:
database-schema-update: true
# id-generator: org.flowable.common.engine.impl.db.DbIdGenerator
#flowable:
# async-executor:
# default-async-job-acquire-wait-time: 60000
# default-timer-job-acquire-wait-time: 60000
# max-async-jobs-due-per-acquisition: 1
# retry-wait-time-in-millis: 60000
# number-of-retries: 1
# # 自动清理死信任务
# move-to-dead-letter-on-failure: true
# # 死信任务的保留时间(毫秒)
# dead-letter-timeout: 1000
logging: logging:
level: level:
springframework: DEBUG
org.springframework.web: DEBUG org.springframework.web: DEBUG
org.springframework.context.i18n: DEBUG org.springframework.context.i18n: DEBUG
org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerMapping: TRACE org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerMapping: TRACE