From 82e613d2376b37ef738d77e97559dbd68d1d2834 Mon Sep 17 00:00:00 2001 From: dengqichen Date: Fri, 13 Dec 2024 17:35:53 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8F=8D=E5=BA=8F=E5=88=97=E5=8C=96=E9=97=AE?= =?UTF-8?q?=E9=A2=98=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/WorkflowDefinitionApiController.java | 6 + .../workflow/delegate/ShellTaskDelegate.java | 205 ++++++++++++++++++ .../service/IWorkflowDefinitionService.java | 2 + .../impl/WorkflowDefinitionServiceImpl.java | 10 + 4 files changed, 223 insertions(+) create mode 100644 backend/src/main/java/com/qqchen/deploy/backend/workflow/delegate/ShellTaskDelegate.java diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/api/WorkflowDefinitionApiController.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/api/WorkflowDefinitionApiController.java index 3a0294b0..5c13570b 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/api/WorkflowDefinitionApiController.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/api/WorkflowDefinitionApiController.java @@ -55,6 +55,12 @@ public class WorkflowDefinitionApiController extends BaseController publishedWorkflowDesign(@PathVariable Long workflowDefinitionId){ + workflowDefinitionService.publishedWorkflowDesign(workflowDefinitionId); + return Response.success(); + } + // @Operation(summary = "部署工作流") // @PostMapping("/deploy") // public Response deployWorkflow(@RequestBody WorkflowDefinitionDTO dto) { diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/delegate/ShellTaskDelegate.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/delegate/ShellTaskDelegate.java new file mode 100644 index 00000000..d51cbb75 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/delegate/ShellTaskDelegate.java @@ -0,0 +1,205 @@ +package com.qqchen.deploy.backend.workflow.delegate; + +import com.qqchen.deploy.backend.workflow.event.ShellLogEvent; +import com.qqchen.deploy.backend.workflow.enums.NodeLogTypeEnums; +import lombok.extern.slf4j.Slf4j; +import org.flowable.engine.RuntimeService; +import org.flowable.engine.ManagementService; +import org.flowable.engine.delegate.DelegateExecution; +import org.flowable.engine.delegate.JavaDelegate; +import org.flowable.common.engine.api.delegate.Expression; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.stereotype.Component; +import jakarta.annotation.Resource; +import org.springframework.util.StringUtils; + +import java.io.*; +import java.util.Map; +import java.util.concurrent.*; + +/** + * Shell任务委托执行器 + */ +@Slf4j +@Component +public class ShellTaskDelegate implements JavaDelegate { + + @Resource + private ApplicationEventPublisher eventPublisher; + + @Resource + private RuntimeService runtimeService; + + @Resource + private ManagementService managementService; + + private Expression script; + private Expression workDir; + private Expression env; + + // 用于存储实时输出的Map + private static final Map outputMap = new ConcurrentHashMap<>(); + private static final Map errorMap = new ConcurrentHashMap<>(); + + private void processInputStream(InputStream inputStream, String processInstanceId, NodeLogTypeEnums logType) { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) { + String line; + while ((line = reader.readLine()) != null) { + // 发布日志事件 + eventPublisher.publishEvent(new ShellLogEvent(processInstanceId, line, logType)); + + // 同时保存到StringBuilder中 + if (logType == NodeLogTypeEnums.STDOUT) { + StringBuilder output = outputMap.get(processInstanceId); + synchronized (output) { + output.append(line).append("\n"); + } + log.info("Shell output: {}", line); + } else { + StringBuilder error = errorMap.get(processInstanceId); + synchronized (error) { + error.append(line).append("\n"); + } + log.error("Shell error: {}", line); + } + } + } catch (IOException e) { + log.error("Error reading process output", e); + } + } + + @Override + public void execute(DelegateExecution execution) { + // 从字段注入中获取值 + String scriptValue = script != null ? script.getValue(execution).toString() : null; + String workDirValue = workDir != null ? workDir.getValue(execution).toString() : null; + @SuppressWarnings("unchecked") + Map envValue = env != null ? (Map) env.getValue(execution) : null; + + // 如果流程变量中有值,优先使用流程变量 + if (execution.hasVariable("script")) { + scriptValue = (String) execution.getVariable("script"); + } + if (execution.hasVariable("workDir")) { + workDirValue = (String) execution.getVariable("workDir"); + } + if (execution.hasVariable("env")) { + @SuppressWarnings("unchecked") + Map envFromVar = (Map) execution.getVariable("env"); + envValue = envFromVar; + } + + if (scriptValue == null) { + handleFailure(execution, "Script is required but not provided"); + return; + } + + try { + // 使用processInstanceId而不是executionId + String processInstanceId = execution.getProcessInstanceId(); + outputMap.put(processInstanceId, new StringBuilder()); + errorMap.put(processInstanceId, new StringBuilder()); + + // 创建进程构建器 + ProcessBuilder processBuilder = new ProcessBuilder(); + + // 根据操作系统选择合适的shell + String os = System.getProperty("os.name").toLowerCase(); + if (os.contains("win")) { + // Windows系统使用cmd + processBuilder.command("cmd", "/c", scriptValue); + } else { + // Unix-like系统使用bash + processBuilder.command("bash", "-c", scriptValue); + } + + // 设置工作目录 + if (StringUtils.hasText(workDirValue)) { + // Windows系统路径处理 + if (os.contains("win")) { + // 确保使用Windows风格的路径分隔符 + workDirValue = workDirValue.replace("/", "\\"); + // 如果路径以\开头,去掉第一个\ + if (workDirValue.startsWith("\\")) { + workDirValue = workDirValue.substring(1); + } + } + File workDirFile = new File(workDirValue); + if (!workDirFile.exists()) { + workDirFile.mkdirs(); + } + processBuilder.directory(workDirFile); + } + + // 设置环境变量 + if (envValue != null) { + processBuilder.environment().putAll(envValue); + } + + // 执行命令 + log.info("Executing shell script: {}", scriptValue); + Process process = processBuilder.start(); + + // 使用BufferedReader实时读取输出 + BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream())); + BufferedReader errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream())); + + // 创建线程池处理输出 + ExecutorService executorService = Executors.newFixedThreadPool(2); + + // 处理标准输出 + Future outputFuture = executorService.submit(() -> + processInputStream(process.getInputStream(), processInstanceId, NodeLogTypeEnums.STDOUT)); + + // 处理错误输出 + Future errorFuture = executorService.submit(() -> + processInputStream(process.getErrorStream(), processInstanceId, NodeLogTypeEnums.STDERR)); + + // 等待进程完成 + int exitCode = process.waitFor(); + + // 等待输出处理完成 + outputFuture.get(5, TimeUnit.SECONDS); + errorFuture.get(5, TimeUnit.SECONDS); + + // 关闭线程池 + executorService.shutdown(); + + // 设置最终结果 + StringBuilder finalOutput = outputMap.get(processInstanceId); + StringBuilder finalError = errorMap.get(processInstanceId); + + execution.setVariable("shellOutput", finalOutput.toString()); + execution.setVariable("shellError", finalError.toString()); + execution.setVariable("shellExitCode", exitCode); + + // 清理缓存 + outputMap.remove(processInstanceId); + errorMap.remove(processInstanceId); + + if (exitCode != 0) { + log.error("Shell script execution failed with exit code: {}", exitCode); + log.error("Error output: {}", finalError); + handleFailure(execution, "Shell script execution failed with exit code: " + exitCode); + return; + } + + log.info("Shell script executed successfully"); + log.debug("Script output: {}", finalOutput); + + } catch (Exception e) { + log.error("Shell script execution failed", e); + handleFailure(execution, "Shell script execution failed: " + e.getMessage()); + } + } + + private void handleFailure(DelegateExecution execution, String errorMessage) { + String processInstanceId = execution.getProcessInstanceId(); + try { + // 直接终止流程实例 + runtimeService.deleteProcessInstance(processInstanceId, errorMessage); + } catch (Exception e) { + log.error("Error while handling shell task failure for process instance: {}", processInstanceId, e); + } + } +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/IWorkflowDefinitionService.java b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/IWorkflowDefinitionService.java index 9eb47556..f18ed7d8 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/IWorkflowDefinitionService.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/workflow/service/IWorkflowDefinitionService.java @@ -61,4 +61,6 @@ public interface IWorkflowDefinitionService extends IBaseService new RuntimeException("Workflow definition not found: " + workflowDefinitionId)); + definition.setStatus(WorkflowStatusEnums.PUBLISHED); + workflowDefinitionRepository.save(definition); + log.info("Successfully published workflow definition: {}", workflowDefinitionId); + } + }