反序列化问题。

This commit is contained in:
dengqichen 2024-12-13 17:35:53 +08:00
parent e2d06ece26
commit 82e613d237
4 changed files with 223 additions and 0 deletions

View File

@ -55,6 +55,12 @@ public class WorkflowDefinitionApiController extends BaseController<WorkflowDefi
return Response.success(workflowDefinitionService.saveWorkflowDesign(dto)); return Response.success(workflowDefinitionService.saveWorkflowDesign(dto));
} }
@PostMapping("/{workflowDefinitionId}/published")
public Response<Void> publishedWorkflowDesign(@PathVariable Long workflowDefinitionId){
workflowDefinitionService.publishedWorkflowDesign(workflowDefinitionId);
return Response.success();
}
// @Operation(summary = "部署工作流") // @Operation(summary = "部署工作流")
// @PostMapping("/deploy") // @PostMapping("/deploy")
// public Response<WorkflowDefinitionDTO> deployWorkflow(@RequestBody WorkflowDefinitionDTO dto) { // public Response<WorkflowDefinitionDTO> deployWorkflow(@RequestBody WorkflowDefinitionDTO dto) {

View File

@ -0,0 +1,205 @@
package com.qqchen.deploy.backend.workflow.delegate;
import com.qqchen.deploy.backend.workflow.event.ShellLogEvent;
import com.qqchen.deploy.backend.workflow.enums.NodeLogTypeEnums;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.RuntimeService;
import org.flowable.engine.ManagementService;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.JavaDelegate;
import org.flowable.common.engine.api.delegate.Expression;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
import jakarta.annotation.Resource;
import org.springframework.util.StringUtils;
import java.io.*;
import java.util.Map;
import java.util.concurrent.*;
/**
* Shell任务委托执行器
*/
@Slf4j
@Component
public class ShellTaskDelegate implements JavaDelegate {
@Resource
private ApplicationEventPublisher eventPublisher;
@Resource
private RuntimeService runtimeService;
@Resource
private ManagementService managementService;
private Expression script;
private Expression workDir;
private Expression env;
// 用于存储实时输出的Map
private static final Map<String, StringBuilder> outputMap = new ConcurrentHashMap<>();
private static final Map<String, StringBuilder> errorMap = new ConcurrentHashMap<>();
private void processInputStream(InputStream inputStream, String processInstanceId, NodeLogTypeEnums logType) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
String line;
while ((line = reader.readLine()) != null) {
// 发布日志事件
eventPublisher.publishEvent(new ShellLogEvent(processInstanceId, line, logType));
// 同时保存到StringBuilder中
if (logType == NodeLogTypeEnums.STDOUT) {
StringBuilder output = outputMap.get(processInstanceId);
synchronized (output) {
output.append(line).append("\n");
}
log.info("Shell output: {}", line);
} else {
StringBuilder error = errorMap.get(processInstanceId);
synchronized (error) {
error.append(line).append("\n");
}
log.error("Shell error: {}", line);
}
}
} catch (IOException e) {
log.error("Error reading process output", e);
}
}
@Override
public void execute(DelegateExecution execution) {
// 从字段注入中获取值
String scriptValue = script != null ? script.getValue(execution).toString() : null;
String workDirValue = workDir != null ? workDir.getValue(execution).toString() : null;
@SuppressWarnings("unchecked")
Map<String, String> envValue = env != null ? (Map<String, String>) env.getValue(execution) : null;
// 如果流程变量中有值优先使用流程变量
if (execution.hasVariable("script")) {
scriptValue = (String) execution.getVariable("script");
}
if (execution.hasVariable("workDir")) {
workDirValue = (String) execution.getVariable("workDir");
}
if (execution.hasVariable("env")) {
@SuppressWarnings("unchecked")
Map<String, String> envFromVar = (Map<String, String>) execution.getVariable("env");
envValue = envFromVar;
}
if (scriptValue == null) {
handleFailure(execution, "Script is required but not provided");
return;
}
try {
// 使用processInstanceId而不是executionId
String processInstanceId = execution.getProcessInstanceId();
outputMap.put(processInstanceId, new StringBuilder());
errorMap.put(processInstanceId, new StringBuilder());
// 创建进程构建器
ProcessBuilder processBuilder = new ProcessBuilder();
// 根据操作系统选择合适的shell
String os = System.getProperty("os.name").toLowerCase();
if (os.contains("win")) {
// Windows系统使用cmd
processBuilder.command("cmd", "/c", scriptValue);
} else {
// Unix-like系统使用bash
processBuilder.command("bash", "-c", scriptValue);
}
// 设置工作目录
if (StringUtils.hasText(workDirValue)) {
// Windows系统路径处理
if (os.contains("win")) {
// 确保使用Windows风格的路径分隔符
workDirValue = workDirValue.replace("/", "\\");
// 如果路径以\开头去掉第一个\
if (workDirValue.startsWith("\\")) {
workDirValue = workDirValue.substring(1);
}
}
File workDirFile = new File(workDirValue);
if (!workDirFile.exists()) {
workDirFile.mkdirs();
}
processBuilder.directory(workDirFile);
}
// 设置环境变量
if (envValue != null) {
processBuilder.environment().putAll(envValue);
}
// 执行命令
log.info("Executing shell script: {}", scriptValue);
Process process = processBuilder.start();
// 使用BufferedReader实时读取输出
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
BufferedReader errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
// 创建线程池处理输出
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 处理标准输出
Future<?> outputFuture = executorService.submit(() ->
processInputStream(process.getInputStream(), processInstanceId, NodeLogTypeEnums.STDOUT));
// 处理错误输出
Future<?> errorFuture = executorService.submit(() ->
processInputStream(process.getErrorStream(), processInstanceId, NodeLogTypeEnums.STDERR));
// 等待进程完成
int exitCode = process.waitFor();
// 等待输出处理完成
outputFuture.get(5, TimeUnit.SECONDS);
errorFuture.get(5, TimeUnit.SECONDS);
// 关闭线程池
executorService.shutdown();
// 设置最终结果
StringBuilder finalOutput = outputMap.get(processInstanceId);
StringBuilder finalError = errorMap.get(processInstanceId);
execution.setVariable("shellOutput", finalOutput.toString());
execution.setVariable("shellError", finalError.toString());
execution.setVariable("shellExitCode", exitCode);
// 清理缓存
outputMap.remove(processInstanceId);
errorMap.remove(processInstanceId);
if (exitCode != 0) {
log.error("Shell script execution failed with exit code: {}", exitCode);
log.error("Error output: {}", finalError);
handleFailure(execution, "Shell script execution failed with exit code: " + exitCode);
return;
}
log.info("Shell script executed successfully");
log.debug("Script output: {}", finalOutput);
} catch (Exception e) {
log.error("Shell script execution failed", e);
handleFailure(execution, "Shell script execution failed: " + e.getMessage());
}
}
private void handleFailure(DelegateExecution execution, String errorMessage) {
String processInstanceId = execution.getProcessInstanceId();
try {
// 直接终止流程实例
runtimeService.deleteProcessInstance(processInstanceId, errorMessage);
} catch (Exception e) {
log.error("Error while handling shell task failure for process instance: {}", processInstanceId, e);
}
}
}

View File

@ -61,4 +61,6 @@ public interface IWorkflowDefinitionService extends IBaseService<WorkflowDefinit
void disable(Long id); void disable(Long id);
void enable(Long id); void enable(Long id);
void publishedWorkflowDesign(Long workflowDefinitionId);
} }

View File

@ -363,4 +363,14 @@ public class WorkflowDefinitionServiceImpl extends BaseServiceImpl<WorkflowDefin
.key(definition.getKey()).name(definition.getName()).deploy(); .key(definition.getKey()).name(definition.getName()).deploy();
} }
@Override
@Transactional(rollbackFor = Exception.class)
public void publishedWorkflowDesign(Long workflowDefinitionId) {
WorkflowDefinition definition = workflowDefinitionRepository.findById(workflowDefinitionId)
.orElseThrow(() -> new RuntimeException("Workflow definition not found: " + workflowDefinitionId));
definition.setStatus(WorkflowStatusEnums.PUBLISHED);
workflowDefinitionRepository.save(definition);
log.info("Successfully published workflow definition: {}", workflowDefinitionId);
}
} }