解决死信队列的问题。

This commit is contained in:
dengqichen 2024-12-10 20:00:00 +08:00
parent 76ffdb89ad
commit 412ead95a0
5 changed files with 155 additions and 42 deletions

View File

@ -1036,4 +1036,19 @@ workflow_log工作流日志表
死信队列的问题:
需要再异常里直接移除:
runtimeService.deleteProcessInstance(processInstanceId, errorMessage);
runtimeService.deleteProcessInstance(processInstanceId, errorMessage);
const eventSource = new EventSource(`/api/v1/shell-logs/${processInstanceId}`);
eventSource.addEventListener('STDOUT', (event) => {
console.log('Output:', event.data);
});
eventSource.addEventListener('STDERR', (event) => {
console.error('Error:', event.data);
});
// 当流程结束时关闭连接
eventSource.onerror = () => {
eventSource.close();
};

View File

@ -0,0 +1,71 @@
package com.qqchen.deploy.backend.workflow.controller;
import com.qqchen.deploy.backend.workflow.event.ShellLogEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@RestController
@CrossOrigin(origins = "*", allowedHeaders = "*")
@RequestMapping("/api/v1/shell-logs")
public class ShellLogController {
// 存储每个进程的SSE发射器
private static final Map<String, SseEmitter> EMITTERS = new ConcurrentHashMap<>();
@GetMapping(value = "/{processInstanceId}", produces = "text/event-stream")
public SseEmitter streamLogs(@PathVariable String processInstanceId) {
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
// 设置超时回调
emitter.onTimeout(() -> {
EMITTERS.remove(processInstanceId);
log.debug("SSE connection timeout for process: {}", processInstanceId);
});
emitter.onCompletion(() -> {
EMITTERS.remove(processInstanceId);
log.debug("SSE connection completed for process: {}", processInstanceId);
});
emitter.onError(ex -> {
EMITTERS.remove(processInstanceId);
log.error("SSE connection error for process: {}", processInstanceId, ex);
});
EMITTERS.put(processInstanceId, emitter);
// 发送初始连接成功消息
try {
emitter.send(SseEmitter.event()
.name("STDOUT")
.data("Connected successfully to process " + processInstanceId));
} catch (IOException e) {
log.error("Error sending initial message", e);
}
return emitter;
}
@EventListener
public void handleShellLogEvent(ShellLogEvent event) {
SseEmitter emitter = EMITTERS.get(event.getProcessInstanceId());
if (emitter != null) {
try {
// 发送日志事件到客户端
emitter.send(SseEmitter.event()
.name(event.getLogType().toString())
.data(event.getLogMessage()));
} catch (IOException e) {
log.error("Error sending event for process: {}", event.getProcessInstanceId(), e);
EMITTERS.remove(event.getProcessInstanceId());
emitter.completeWithError(e);
}
}
}
}

View File

@ -1,25 +1,21 @@
package com.qqchen.deploy.backend.workflow.delegate;
import com.qqchen.deploy.backend.workflow.event.ShellLogEvent;
import com.qqchen.deploy.backend.workflow.enums.LogType;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.RuntimeService;
import org.flowable.engine.ManagementService;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.JavaDelegate;
import org.flowable.common.engine.api.delegate.Expression;
import org.flowable.engine.RuntimeService;
import org.flowable.engine.ManagementService;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import jakarta.annotation.Resource;
import org.springframework.util.StringUtils;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.*;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.*;
/**
* Shell任务委托执行器
@ -28,6 +24,9 @@ import java.util.concurrent.ConcurrentHashMap;
@Component
public class ShellTaskDelegate implements JavaDelegate {
@Resource
private ApplicationEventPublisher eventPublisher;
@Resource
private RuntimeService runtimeService;
@ -42,6 +41,33 @@ public class ShellTaskDelegate implements JavaDelegate {
private static final Map<String, StringBuilder> outputMap = new ConcurrentHashMap<>();
private static final Map<String, StringBuilder> errorMap = new ConcurrentHashMap<>();
private void processInputStream(InputStream inputStream, String processInstanceId, LogType logType) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
String line;
while ((line = reader.readLine()) != null) {
// 发布日志事件
eventPublisher.publishEvent(new ShellLogEvent(processInstanceId, line, logType));
// 同时保存到StringBuilder中
if (logType == LogType.STDOUT) {
StringBuilder output = outputMap.get(processInstanceId);
synchronized (output) {
output.append(line).append("\n");
}
log.info("Shell output: {}", line);
} else {
StringBuilder error = errorMap.get(processInstanceId);
synchronized (error) {
error.append(line).append("\n");
}
log.error("Shell error: {}", line);
}
}
} catch (IOException e) {
log.error("Error reading process output", e);
}
}
@Override
public void execute(DelegateExecution execution) {
// 从字段注入中获取值
@ -122,36 +148,12 @@ public class ShellTaskDelegate implements JavaDelegate {
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 处理标准输出
Future<?> outputFuture = executorService.submit(() -> {
String line;
try {
while ((line = reader.readLine()) != null) {
StringBuilder output = outputMap.get(executionId);
synchronized (output) {
output.append(line).append("\n");
}
log.info("Shell output: {}", line);
}
} catch (Exception e) {
log.error("Error reading process output", e);
}
});
Future<?> outputFuture = executorService.submit(() ->
processInputStream(process.getInputStream(), executionId, LogType.STDOUT));
// 处理错误输出
Future<?> errorFuture = executorService.submit(() -> {
String line;
try {
while ((line = errorReader.readLine()) != null) {
StringBuilder error = errorMap.get(executionId);
synchronized (error) {
error.append(line).append("\n");
}
log.error("Shell error: {}", line);
}
} catch (IOException e) {
log.error("Error reading process error", e);
}
});
Future<?> errorFuture = executorService.submit(() ->
processInputStream(process.getErrorStream(), executionId, LogType.STDERR));
// 等待进程完成
int exitCode = process.waitFor();

View File

@ -0,0 +1,9 @@
package com.qqchen.deploy.backend.workflow.enums;
/**
* 日志类型枚举
*/
public enum LogType {
STDOUT, // 标准输出
STDERR // 错误输出
}

View File

@ -0,0 +1,16 @@
package com.qqchen.deploy.backend.workflow.event;
import com.qqchen.deploy.backend.workflow.enums.LogType;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* Shell日志事件
*/
@Getter
@AllArgsConstructor
public class ShellLogEvent {
private String processInstanceId;
private String logMessage;
private LogType logType;
}