通用解析
This commit is contained in:
parent
2b90d32d8d
commit
559e335d9a
@ -4,7 +4,6 @@ import com.qqchen.deploy.backend.framework.api.Response;
|
||||
import com.qqchen.deploy.backend.framework.controller.BaseController;
|
||||
import com.qqchen.deploy.backend.framework.enums.ResponseCode;
|
||||
import com.qqchen.deploy.backend.workflow.dto.WorkflowDefinitionDTO;
|
||||
import com.qqchen.deploy.backend.workflow.dto.WorkflowDesignDTO;
|
||||
import com.qqchen.deploy.backend.workflow.dto.WorkflowExecutionDTO;
|
||||
import com.qqchen.deploy.backend.workflow.dto.WorkflowInstanceDTO;
|
||||
import com.qqchen.deploy.backend.workflow.dto.WorkflowInstanceCreateDTO;
|
||||
@ -52,7 +51,7 @@ public class WorkflowDefinitionApiController extends BaseController<WorkflowDefi
|
||||
|
||||
@Operation(summary = "保存工作流设计")
|
||||
@PostMapping("/design")
|
||||
public Response<WorkflowDesignDTO> saveWorkflowDesign(@RequestBody WorkflowDefinitionDTO dto) throws Exception {
|
||||
public Response<WorkflowDefinitionDTO> saveWorkflowDesign(@RequestBody WorkflowDefinitionDTO dto) throws Exception {
|
||||
return Response.success(workflowDefinitionService.saveWorkflowDesign(dto));
|
||||
}
|
||||
|
||||
|
||||
@ -1,205 +0,0 @@
|
||||
package com.qqchen.deploy.backend.workflow.delegate;
|
||||
|
||||
import com.qqchen.deploy.backend.workflow.event.ShellLogEvent;
|
||||
import com.qqchen.deploy.backend.workflow.enums.NodeLogTypeEnums;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flowable.engine.RuntimeService;
|
||||
import org.flowable.engine.ManagementService;
|
||||
import org.flowable.engine.delegate.DelegateExecution;
|
||||
import org.flowable.engine.delegate.JavaDelegate;
|
||||
import org.flowable.common.engine.api.delegate.Expression;
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
import org.springframework.stereotype.Component;
|
||||
import jakarta.annotation.Resource;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import java.io.*;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
/**
|
||||
* Shell任务委托执行器
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class ShellTaskDelegate implements JavaDelegate {
|
||||
|
||||
@Resource
|
||||
private ApplicationEventPublisher eventPublisher;
|
||||
|
||||
@Resource
|
||||
private RuntimeService runtimeService;
|
||||
|
||||
@Resource
|
||||
private ManagementService managementService;
|
||||
|
||||
private Expression script;
|
||||
private Expression workDir;
|
||||
private Expression env;
|
||||
|
||||
// 用于存储实时输出的Map
|
||||
private static final Map<String, StringBuilder> outputMap = new ConcurrentHashMap<>();
|
||||
private static final Map<String, StringBuilder> errorMap = new ConcurrentHashMap<>();
|
||||
|
||||
private void processInputStream(InputStream inputStream, String processInstanceId, NodeLogTypeEnums logType) {
|
||||
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
|
||||
String line;
|
||||
while ((line = reader.readLine()) != null) {
|
||||
// 发布日志事件
|
||||
eventPublisher.publishEvent(new ShellLogEvent(processInstanceId, line, logType));
|
||||
|
||||
// 同时保存到StringBuilder中
|
||||
if (logType == NodeLogTypeEnums.STDOUT) {
|
||||
StringBuilder output = outputMap.get(processInstanceId);
|
||||
synchronized (output) {
|
||||
output.append(line).append("\n");
|
||||
}
|
||||
log.info("Shell output: {}", line);
|
||||
} else {
|
||||
StringBuilder error = errorMap.get(processInstanceId);
|
||||
synchronized (error) {
|
||||
error.append(line).append("\n");
|
||||
}
|
||||
log.error("Shell error: {}", line);
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
log.error("Error reading process output", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(DelegateExecution execution) {
|
||||
// 从字段注入中获取值
|
||||
String scriptValue = script != null ? script.getValue(execution).toString() : null;
|
||||
String workDirValue = workDir != null ? workDir.getValue(execution).toString() : null;
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, String> envValue = env != null ? (Map<String, String>) env.getValue(execution) : null;
|
||||
|
||||
// 如果流程变量中有值,优先使用流程变量
|
||||
if (execution.hasVariable("script")) {
|
||||
scriptValue = (String) execution.getVariable("script");
|
||||
}
|
||||
if (execution.hasVariable("workDir")) {
|
||||
workDirValue = (String) execution.getVariable("workDir");
|
||||
}
|
||||
if (execution.hasVariable("env")) {
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, String> envFromVar = (Map<String, String>) execution.getVariable("env");
|
||||
envValue = envFromVar;
|
||||
}
|
||||
|
||||
if (scriptValue == null) {
|
||||
handleFailure(execution, "Script is required but not provided");
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
// 使用processInstanceId而不是executionId
|
||||
String processInstanceId = execution.getProcessInstanceId();
|
||||
outputMap.put(processInstanceId, new StringBuilder());
|
||||
errorMap.put(processInstanceId, new StringBuilder());
|
||||
|
||||
// 创建进程构建器
|
||||
ProcessBuilder processBuilder = new ProcessBuilder();
|
||||
|
||||
// 根据操作系统选择合适的shell
|
||||
String os = System.getProperty("os.name").toLowerCase();
|
||||
if (os.contains("win")) {
|
||||
// Windows系统使用cmd
|
||||
processBuilder.command("cmd", "/c", scriptValue);
|
||||
} else {
|
||||
// Unix-like系统使用bash
|
||||
processBuilder.command("bash", "-c", scriptValue);
|
||||
}
|
||||
|
||||
// 设置工作目录
|
||||
if (StringUtils.hasText(workDirValue)) {
|
||||
// Windows系统路径处理
|
||||
if (os.contains("win")) {
|
||||
// 确保使用Windows风格的路径分隔符
|
||||
workDirValue = workDirValue.replace("/", "\\");
|
||||
// 如果路径以\开头,去掉第一个\
|
||||
if (workDirValue.startsWith("\\")) {
|
||||
workDirValue = workDirValue.substring(1);
|
||||
}
|
||||
}
|
||||
File workDirFile = new File(workDirValue);
|
||||
if (!workDirFile.exists()) {
|
||||
workDirFile.mkdirs();
|
||||
}
|
||||
processBuilder.directory(workDirFile);
|
||||
}
|
||||
|
||||
// 设置环境变量
|
||||
if (envValue != null) {
|
||||
processBuilder.environment().putAll(envValue);
|
||||
}
|
||||
|
||||
// 执行命令
|
||||
log.info("Executing shell script: {}", scriptValue);
|
||||
Process process = processBuilder.start();
|
||||
|
||||
// 使用BufferedReader实时读取输出
|
||||
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
|
||||
BufferedReader errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
|
||||
|
||||
// 创建线程池处理输出
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(2);
|
||||
|
||||
// 处理标准输出
|
||||
Future<?> outputFuture = executorService.submit(() ->
|
||||
processInputStream(process.getInputStream(), processInstanceId, NodeLogTypeEnums.STDOUT));
|
||||
|
||||
// 处理错误输出
|
||||
Future<?> errorFuture = executorService.submit(() ->
|
||||
processInputStream(process.getErrorStream(), processInstanceId, NodeLogTypeEnums.STDERR));
|
||||
|
||||
// 等待进程完成
|
||||
int exitCode = process.waitFor();
|
||||
|
||||
// 等待输出处理完成
|
||||
outputFuture.get(5, TimeUnit.SECONDS);
|
||||
errorFuture.get(5, TimeUnit.SECONDS);
|
||||
|
||||
// 关闭线程池
|
||||
executorService.shutdown();
|
||||
|
||||
// 设置最终结果
|
||||
StringBuilder finalOutput = outputMap.get(processInstanceId);
|
||||
StringBuilder finalError = errorMap.get(processInstanceId);
|
||||
|
||||
execution.setVariable("shellOutput", finalOutput.toString());
|
||||
execution.setVariable("shellError", finalError.toString());
|
||||
execution.setVariable("shellExitCode", exitCode);
|
||||
|
||||
// 清理缓存
|
||||
outputMap.remove(processInstanceId);
|
||||
errorMap.remove(processInstanceId);
|
||||
|
||||
if (exitCode != 0) {
|
||||
log.error("Shell script execution failed with exit code: {}", exitCode);
|
||||
log.error("Error output: {}", finalError);
|
||||
handleFailure(execution, "Shell script execution failed with exit code: " + exitCode);
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("Shell script executed successfully");
|
||||
log.debug("Script output: {}", finalOutput);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("Shell script execution failed", e);
|
||||
handleFailure(execution, "Shell script execution failed: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private void handleFailure(DelegateExecution execution, String errorMessage) {
|
||||
String processInstanceId = execution.getProcessInstanceId();
|
||||
try {
|
||||
// 直接终止流程实例
|
||||
runtimeService.deleteProcessInstance(processInstanceId, errorMessage);
|
||||
} catch (Exception e) {
|
||||
log.error("Error while handling shell task failure for process instance: {}", processInstanceId, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,29 +0,0 @@
|
||||
package com.qqchen.deploy.backend.workflow.delegate;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flowable.engine.delegate.DelegateExecution;
|
||||
import org.flowable.engine.delegate.JavaDelegate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 简单任务委托实现
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class SimpleTaskDelegate implements JavaDelegate {
|
||||
|
||||
@Override
|
||||
public void execute(DelegateExecution execution) {
|
||||
String taskName = execution.getCurrentFlowElement().getName();
|
||||
log.info("Executing task: {}", taskName);
|
||||
|
||||
// 获取流程变量
|
||||
String input = (String) execution.getVariable("input");
|
||||
log.info("Task input: {}", input);
|
||||
|
||||
// 设置输出变量
|
||||
execution.setVariable("output", "Processed: " + input);
|
||||
|
||||
log.info("Task completed: {}", taskName);
|
||||
}
|
||||
}
|
||||
@ -1,6 +1,7 @@
|
||||
package com.qqchen.deploy.backend.workflow.entity;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.qqchen.deploy.backend.framework.annotation.LogicDelete;
|
||||
import com.qqchen.deploy.backend.workflow.enums.WorkflowStatusEnums;
|
||||
import com.vladmihalcea.hibernate.type.json.JsonType;
|
||||
import com.qqchen.deploy.backend.framework.domain.Entity;
|
||||
@ -19,6 +20,7 @@ import org.hibernate.annotations.Type;
|
||||
@Table(name = "workflow_definition")
|
||||
@jakarta.persistence.Entity
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
@LogicDelete
|
||||
public class WorkflowDefinition extends Entity<Long> {
|
||||
|
||||
/**
|
||||
|
||||
@ -16,7 +16,7 @@ import java.util.Map;
|
||||
*/
|
||||
public interface IWorkflowDefinitionService extends IBaseService<WorkflowDefinition, WorkflowDefinitionDTO, Long> {
|
||||
|
||||
WorkflowDesignDTO saveWorkflowDesign(WorkflowDefinitionDTO dto) throws Exception;
|
||||
WorkflowDefinitionDTO saveWorkflowDesign(WorkflowDefinitionDTO dto) throws Exception;
|
||||
|
||||
/**
|
||||
* 部署工作流
|
||||
|
||||
@ -69,39 +69,42 @@ public class WorkflowDefinitionServiceImpl extends BaseServiceImpl<WorkflowDefin
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public WorkflowDesignDTO saveWorkflowDesign(WorkflowDefinitionDTO dto) throws Exception {
|
||||
public WorkflowDefinitionDTO saveWorkflowDesign(WorkflowDefinitionDTO dto) throws Exception {
|
||||
// 转换图形JSON为BPMN XML
|
||||
String bpmnXml = bpmnConverter.convertToBpmnXml(null, dto.getKey());
|
||||
String bpmnXml = bpmnConverter.convertToBpmnXml(dto.getGraphJson().toString(), dto.getKey());
|
||||
|
||||
// 创建工作流定义
|
||||
WorkflowDefinition definition = new WorkflowDefinition();
|
||||
definition.setName(dto.getName());
|
||||
definition.setKey(dto.getKey());
|
||||
definition.setDescription(dto.getDescription());
|
||||
definition.setGraphJson(dto.getGraphJson());
|
||||
definition.setBpmnXml(bpmnXml);
|
||||
definition.setFlowVersion(1); // 设置初始版本为1
|
||||
|
||||
// 保存工作流定义
|
||||
definition = workflowDefinitionRepository.save(definition);
|
||||
|
||||
// 部署工作流
|
||||
WorkflowDefinitionDTO deployDto = new WorkflowDefinitionDTO();
|
||||
deployDto.setId(definition.getId());
|
||||
deployDto.setName(definition.getName());
|
||||
deployDto.setKey(definition.getKey());
|
||||
deployDto.setBpmnXml(definition.getBpmnXml());
|
||||
deployDto.setDescription(definition.getDescription());
|
||||
// workflowDefinitionService.deployWorkflow(deployDto);
|
||||
|
||||
// 手动转换为 WorkflowDesignDTO
|
||||
WorkflowDesignDTO result = new WorkflowDesignDTO();
|
||||
result.setId(definition.getId());
|
||||
result.setName(definition.getName());
|
||||
result.setKey(definition.getKey());
|
||||
result.setDescription(definition.getDescription());
|
||||
result.setGraphJson(definition.getGraphJson());
|
||||
return result;
|
||||
dto.setFlowVersion(1);
|
||||
dto.setBpmnXml(bpmnXml);
|
||||
// // 创建工作流定义
|
||||
// WorkflowDefinition definition = new WorkflowDefinition();
|
||||
// definition.setName(dto.getName());
|
||||
// definition.setKey(dto.getKey());
|
||||
// definition.setDescription(dto.getDescription());
|
||||
// definition.setGraphJson(dto.getGraphJson());
|
||||
// definition.setBpmnXml(bpmnXml);
|
||||
// definition.setFlowVersion(1); // 设置初始版本为1
|
||||
//
|
||||
// // 保存工作流定义
|
||||
WorkflowDefinition definition = super.converter.toEntity(dto);
|
||||
workflowDefinitionRepository.save(definition);
|
||||
//
|
||||
// // 部署工作流
|
||||
// WorkflowDefinitionDTO deployDto = new WorkflowDefinitionDTO();
|
||||
// deployDto.setId(definition.getId());
|
||||
// deployDto.setName(definition.getName());
|
||||
// deployDto.setKey(definition.getKey());
|
||||
// deployDto.setBpmnXml(definition.getBpmnXml());
|
||||
// deployDto.setDescription(definition.getDescription());
|
||||
//// workflowDefinitionService.deployWorkflow(deployDto);
|
||||
//
|
||||
// // 手动转换为 WorkflowDesignDTO
|
||||
// WorkflowDesignDTO result = new WorkflowDesignDTO();
|
||||
// result.setId(definition.getId());
|
||||
// result.setName(definition.getName());
|
||||
// result.setKey(definition.getKey());
|
||||
// result.setDescription(definition.getDescription());
|
||||
// result.setGraphJson(definition.getGraphJson());
|
||||
return super.converter.toDto(definition);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Loading…
Reference in New Issue
Block a user