# Deploy Ease Workflow 工作流引擎设计文档 ## 一、项目概述 ### 1.1 项目目标 构建一个企业级工作流引擎系统,支持审批流程和CI/CD流程的统一管理和执行。系统需要具备高扩展性、高可用性和易用性。 ### 1.2 技术栈 - 后端:Java 21 + Spring Boot 3.x + Spring Cloud - 前端:React 18 + Ant Design 5.x + TypeScript - 数据库:PostgreSQL 16 - 缓存:Redis 7.x - 消息队列:RabbitMQ - 容器化:Docker + Kubernetes - 服务网关:Spring Cloud Gateway - 注册中心:Nacos - 配置中心:Nacos - 监控:Prometheus + Grafana ### 1.3 项目架构 ``` +------------------+ +------------------+ +------------------+ | 前端应用层 | <-> | 网关层 | <-> | 微服务集群 | +------------------+ +------------------+ +------------------+ ↑ ↑ ↑ | | | v v v +------------------+ +------------------+ +------------------+ | CDN/对象存储 | <-> | 消息总线 | <-> | 存储层 | +------------------+ +------------------+ +------------------+ ``` ## 二、实现优先级和里程碑 ### 2.1 第一阶段:核心框架搭建(2周) 1. 基础框架搭建 - 多模块项目结构 - 统一异常处理 - 统一响应格式 - 基础CRUD封装 2. 核心功能实现 - 用户认证授权 - 基础权限管理 - 系统配置管理 ### 2.2 第二阶段:工作流引擎核心(4周) 1. 工作流引擎实现 - 工作流模型设计 - 工作流执行引擎 - 节点抽象和实现 2. 插件系统实现 - 插件加载机制 - 插件生命周期管理 - 插件配置管理 ### 2.3 第三阶段:节点开发(4周) 1. 审批节点实现 - 表单节点 - 审批节点 - 会签节点 2. CI/CD节点实现 - Git节点 - Maven节点 - Docker节点 - K8S节点 ### 2.4 第四阶段:前端实现(4周) 1. 工作流设计器 - 流程设计器 - 节点配置面板 - 表单设计器 2. 运行监控 - 实例监控 - 日志查看 - 状态管理 ## 三、详细设计 ### 3.1 数据库设计 #### 3.1.1 工作流定义相关表 ```sql -- 工作流定义表 CREATE TABLE workflow_definition ( id BIGINT PRIMARY KEY, code VARCHAR(50) NOT NULL UNIQUE, name VARCHAR(100) NOT NULL, description TEXT, version INT NOT NULL, status VARCHAR(20) NOT NULL, create_time TIMESTAMP NOT NULL, update_time TIMESTAMP NOT NULL, create_by VARCHAR(50) NOT NULL, update_by VARCHAR(50) NOT NULL ); -- 节点定义表 CREATE TABLE node_definition ( id BIGINT PRIMARY KEY, workflow_id BIGINT NOT NULL, node_type VARCHAR(50) NOT NULL, name VARCHAR(100) NOT NULL, description TEXT, parameters TEXT, create_time TIMESTAMP NOT NULL, update_time TIMESTAMP NOT NULL, FOREIGN KEY (workflow_id) REFERENCES workflow_definition(id) ); -- 节点连接表 CREATE TABLE node_connection ( id BIGINT PRIMARY KEY, workflow_id BIGINT NOT NULL, source_node_id BIGINT NOT NULL, target_node_id BIGINT NOT NULL, condition_expr TEXT, create_time TIMESTAMP NOT NULL, FOREIGN KEY (workflow_id) REFERENCES workflow_definition(id), FOREIGN KEY (source_node_id) REFERENCES node_definition(id), FOREIGN KEY (target_node_id) REFERENCES node_definition(id) ); ``` #### 3.1.2 工作流实例相关表 ```sql -- 工作流实例表 CREATE TABLE workflow_instance ( id BIGINT PRIMARY KEY, workflow_id BIGINT NOT NULL, status VARCHAR(20) NOT NULL, start_time TIMESTAMP NOT NULL, end_time TIMESTAMP, variables TEXT, create_time TIMESTAMP NOT NULL, update_time TIMESTAMP NOT NULL, FOREIGN KEY (workflow_id) REFERENCES workflow_definition(id) ); -- 节点实例表 CREATE TABLE node_instance ( id BIGINT PRIMARY KEY, workflow_instance_id BIGINT NOT NULL, node_id BIGINT NOT NULL, status VARCHAR(20) NOT NULL, start_time TIMESTAMP NOT NULL, end_time TIMESTAMP, inputs TEXT, outputs TEXT, error_message TEXT, create_time TIMESTAMP NOT NULL, update_time TIMESTAMP NOT NULL, FOREIGN KEY (workflow_instance_id) REFERENCES workflow_instance(id), FOREIGN KEY (node_id) REFERENCES node_definition(id) ); -- 节点日志表 CREATE TABLE node_log ( id BIGINT PRIMARY KEY, node_instance_id BIGINT NOT NULL, level VARCHAR(10) NOT NULL, content TEXT NOT NULL, create_time TIMESTAMP NOT NULL, FOREIGN KEY (node_instance_id) REFERENCES node_instance(id) ); ``` #### 3.1.3 表单相关表 ```sql -- 表单定义表 CREATE TABLE form_definition ( id BIGINT PRIMARY KEY, code VARCHAR(50) NOT NULL UNIQUE, name VARCHAR(100) NOT NULL, description TEXT, schema TEXT NOT NULL, ui_schema TEXT, validation_rules TEXT, workflow_definition_id BIGINT, version INT NOT NULL, status VARCHAR(20) NOT NULL, create_time TIMESTAMP NOT NULL, update_time TIMESTAMP NOT NULL, create_by VARCHAR(50) NOT NULL, update_by VARCHAR(50) NOT NULL ); -- 表单实例表 CREATE TABLE form_instance ( id BIGINT PRIMARY KEY, form_definition_id BIGINT NOT NULL, form_data TEXT NOT NULL, workflow_instance_id BIGINT, submitter_id BIGINT NOT NULL, submit_time TIMESTAMP NOT NULL, status VARCHAR(20) NOT NULL, create_time TIMESTAMP NOT NULL, update_time TIMESTAMP NOT NULL, FOREIGN KEY (form_definition_id) REFERENCES form_definition(id) ); ``` #### 3.1.4 审批相关表 ```sql -- 审批记录表 CREATE TABLE approval_record ( id BIGINT PRIMARY KEY, workflow_instance_id BIGINT NOT NULL, node_instance_id BIGINT NOT NULL, approver_id BIGINT NOT NULL, approval_time TIMESTAMP NOT NULL, result VARCHAR(20) NOT NULL, comment TEXT, attachments TEXT, create_time TIMESTAMP NOT NULL ); -- 审批配置表 CREATE TABLE approval_config ( id BIGINT PRIMARY KEY, node_definition_id BIGINT NOT NULL, approval_type VARCHAR(20) NOT NULL, approver_ids TEXT, role_ids TEXT, department_ids TEXT, approval_mode VARCHAR(20) NOT NULL, timeout_hours INT, reminder_enabled BOOLEAN DEFAULT FALSE, reminder_interval INT, create_time TIMESTAMP NOT NULL, update_time TIMESTAMP NOT NULL, FOREIGN KEY (node_definition_id) REFERENCES node_definition(id) ); ``` #### 3.1.5 插件相关表 ```sql -- 插件信息表 CREATE TABLE plugin_info ( id BIGINT PRIMARY KEY, code VARCHAR(50) NOT NULL UNIQUE, name VARCHAR(100) NOT NULL, description TEXT, version VARCHAR(20) NOT NULL, status VARCHAR(20) NOT NULL, jar_path VARCHAR(200), config_schema TEXT, create_time TIMESTAMP NOT NULL, update_time TIMESTAMP NOT NULL ); -- 插件配置表 CREATE TABLE plugin_config ( id BIGINT PRIMARY KEY, plugin_id BIGINT NOT NULL, config TEXT NOT NULL, status VARCHAR(20) NOT NULL, create_time TIMESTAMP NOT NULL, update_time TIMESTAMP NOT NULL, FOREIGN KEY (plugin_id) REFERENCES plugin_info(id) ); ``` ### 3.2 核心接口设计 #### 3.2.1 工作流引擎接口 ```java /** * 工作流引擎接口 */ public interface WorkflowEngine { /** * 启动工作流 */ WorkflowInstance startWorkflow(String workflowId, Map params); /** * 暂停工作流 */ void pauseWorkflow(String instanceId); /** * 恢复工作流 */ void resumeWorkflow(String instanceId); /** * 终止工作流 */ void terminateWorkflow(String instanceId, String reason); /** * 获取工作流状态 */ WorkflowStatus getWorkflowStatus(String instanceId); } ``` #### 3.2.2 节点执行器接口 ```java /** * 节点执行器接口 */ public interface NodeExecutor { /** * 执行节点 */ NodeExecuteResult execute(NodeContext context); /** * 取消执行 */ void cancel(NodeContext context); /** * 获取执行状态 */ NodeStatus getStatus(NodeContext context); } ``` #### 3.2.3 插件接口 ```java /** * 插件接口 */ public interface WorkflowPlugin { /** * 获取插件信息 */ PluginInfo getPluginInfo(); /** * 初始化插件 */ void init(PluginContext context); /** * 获取节点执行器 */ NodeExecutor getNodeExecutor(); } ``` ### 3.3 前端组件设计 #### 3.3.1 工作流设计器 ```typescript // 工作流设计器组件 const WorkflowDesigner: React.FC = () => { const [nodes, setNodes] = useState([]); const [connections, setConnections] = useState([]); return (
); }; ``` #### 3.3.2 节点配置面板 ```typescript // 节点配置面板组件 const NodeConfigPanel: React.FC = ({ node, onChange }) => { return (
onChange({ ...node, name: e.target.value })} /> onChange({ ...node, description: e.target.value })} /> onChange({ ...node, config })} />
); }; ``` #### 3.3.3 表单设计器 ```typescript // 表单设计器组件 const FormDesigner: React.FC = () => { const [formSchema, setFormSchema] = useState({}); const [uiSchema, setUiSchema] = useState({}); return (
{ setFormSchema(schema); setUiSchema(ui); }} />
); }; ``` ## 四、部署架构 ### 4.1 系统部署架构 ``` [负载均衡器 Nginx] ↓ [API网关 Spring Cloud Gateway] ↓ +----------------+----------------+----------------+ ↓ ↓ ↓ ↓ [工作流服务] [表单服务] [审批服务] [节点服务] ↓ ↓ ↓ ↓ [消息队列 RabbitMQ] ↓ [数据库 PostgreSQL] ↓ [缓存服务器 Redis] ``` ### 4.2 高可用方案 1. 服务高可用 - 服务多实例部署 - 服务注册与发现 - 负载均衡 2. 数据高可用 - 数据库主从复制 - 数据定期备份 - 数据异地容灾 3. 消息高可用 - 消息队列集群 - 消息持久化 - 消息重试机制 ## 五、安全方案 ### 5.1 认证授权 1. 用户认证 - JWT Token认证 - OAuth2.0集成 - 单点登录支持 2. 权限控制 - RBAC权限模型 - 数据权限控制 - API权限控制 ### 5.2 数据安全 1. 传输安全 - HTTPS加密 - 数据加密传输 - 防重放攻击 2. 存储安全 - 敏感数据加密 - 数据脱敏 - 数据备份 ## 六、监控运维 ### 6.1 系统监控 1. 性能监控 - JVM监控 - 数据库监控 - 接口性能监控 2. 业务监控 - 工作流执行监控 - 节点执行监控 - 审批流程监控 ### 6.2 日志管理 1. 日志收集 - 系统日志 - 业务日志 - 审计日志 2. 日志分析 - ELK日志分析 - 日志告警 - 日志存档 ## 七、后续规划 ### 7.1 功能扩展 1. 更多节点类型支持 2. 工作流模板市场 3. 移动端支持 4. 多语言支持 ### 7.2 性能优化 1. 大规模工作流优化 2. 复杂表单性能优化 3. 查询性能优化 ### 7.3 运维能力 1. 自动化部署 2. 容器化管理 3. 监控告警 4. 智能运维 ## 八、数据源管理 ### 8.1 数据源设计 ```sql -- 数据源定义表 CREATE TABLE datasource_definition ( id BIGINT PRIMARY KEY, code VARCHAR(50) NOT NULL UNIQUE, name VARCHAR(100) NOT NULL, description TEXT, type VARCHAR(20) NOT NULL, -- 数据源类型:MYSQL/POSTGRESQL/ORACLE/MONGODB等 config TEXT NOT NULL, -- 加密存储的连接配置 status VARCHAR(20) NOT NULL, create_time TIMESTAMP NOT NULL, update_time TIMESTAMP NOT NULL, create_by VARCHAR(50) NOT NULL, update_by VARCHAR(50) NOT NULL ); -- 数据源授权表 CREATE TABLE datasource_authorization ( id BIGINT PRIMARY KEY, datasource_id BIGINT NOT NULL, node_definition_id BIGINT NOT NULL, permission_type VARCHAR(20) NOT NULL, -- READ/WRITE/ALL valid_from TIMESTAMP NOT NULL, valid_until TIMESTAMP, approved_by VARCHAR(50) NOT NULL, approved_time TIMESTAMP NOT NULL, status VARCHAR(20) NOT NULL, create_time TIMESTAMP NOT NULL, FOREIGN KEY (datasource_id) REFERENCES datasource_definition(id), FOREIGN KEY (node_definition_id) REFERENCES node_definition(id) ); -- 数据源操作审计表 CREATE TABLE datasource_audit_log ( id BIGINT PRIMARY KEY, datasource_id BIGINT NOT NULL, node_instance_id BIGINT NOT NULL, operation_type VARCHAR(20) NOT NULL, -- SELECT/INSERT/UPDATE/DELETE sql_statement TEXT, affected_rows INT, execution_time BIGINT, -- 毫秒 status VARCHAR(20) NOT NULL, error_message TEXT, create_time TIMESTAMP NOT NULL, FOREIGN KEY (datasource_id) REFERENCES datasource_definition(id), FOREIGN KEY (node_instance_id) REFERENCES node_instance(id) ); ``` ### 8.2 数据源管理核心接口 ```java /** * 数据源管理接口 */ public interface DataSourceManager { /** * 获取节点可用的数据源列表 */ List getAvailableDataSources(String nodeDefinitionId); /** * 验证节点是否有数据源的操作权限 */ boolean validatePermission(String nodeInstanceId, String datasourceId, OperationType operationType); /** * 获取数据源连接 */ Connection getConnection(String nodeInstanceId, String datasourceId); /** * 记录数据源操作审计 */ void auditOperation(DataSourceAuditLog auditLog); } /** * 数据源操作包装器 */ public class DataSourceWrapper implements AutoCloseable { private final Connection connection; private final DataSourceAuditLogger auditLogger; public ResultSet executeQuery(String sql, Object... params) { try { long startTime = System.currentTimeMillis(); ResultSet rs = executeQueryInternal(sql, params); auditLogger.logQuery(sql, System.currentTimeMillis() - startTime); return rs; } catch (SQLException e) { auditLogger.logError(sql, e); throw new DataSourceException("Query execution failed", e); } } public int executeUpdate(String sql, Object... params) { try { long startTime = System.currentTimeMillis(); int affected = executeUpdateInternal(sql, params); auditLogger.logUpdate(sql, affected, System.currentTimeMillis() - startTime); return affected; } catch (SQLException e) { auditLogger.logError(sql, e); throw new DataSourceException("Update execution failed", e); } } } ``` ## 九、插件实现示例 ### 9.1 Git操作插件 ```java /** * Git插件实现 */ @Plugin( id = "git-plugin", name = "Git Plugin", version = "1.0.0", description = "Git operations plugin" ) public class GitPlugin implements WorkflowPlugin { private PluginContext context; @Override public void init(PluginContext context) { this.context = context; } @Override public NodeExecutor getNodeExecutor() { return new GitNodeExecutor(); } /** * Git节点执行器 */ public class GitNodeExecutor implements NodeExecutor { @Override public NodeExecuteResult execute(NodeContext context) { try { // 获取参数 String repoUrl = context.getInput("repoUrl"); String branch = context.getInput("branch"); String targetPath = context.getInput("targetPath"); // 执行Git克隆 context.getLogger().info("开始克隆代码: {}", repoUrl); Git.cloneRepository() .setURI(repoUrl) .setBranch(branch) .setDirectory(new File(targetPath)) .call(); // 设置输出 Map outputs = new HashMap<>(); outputs.put("clonePath", targetPath); return NodeExecuteResult.success(outputs); } catch (Exception e) { return NodeExecuteResult.failure(e.getMessage()); } } @Override public void cancel(NodeContext context) { // 实现取消逻辑 } @Override public NodeStatus getStatus(NodeContext context) { // 实现状态查询逻辑 return NodeStatus.RUNNING; } } } ``` ### 9.2 Maven构建插件 ```java /** * Maven插件实现 */ @Plugin( id = "maven-plugin", name = "Maven Plugin", version = "1.0.0", description = "Maven build plugin" ) public class MavenPlugin implements WorkflowPlugin { private PluginContext context; @Override public void init(PluginContext context) { this.context = context; } @Override public NodeExecutor getNodeExecutor() { return new MavenNodeExecutor(); } /** * Maven节点执行器 */ public class MavenNodeExecutor implements NodeExecutor { @Override public NodeExecuteResult execute(NodeContext context) { try { String projectPath = context.getInput("projectPath"); String goals = context.getInput("goals", "clean package"); String profile = context.getInput("profile"); context.getLogger().info("开始Maven构建: {}", projectPath); // 创建Maven请求 InvocationRequest request = new DefaultInvocationRequest(); request.setPomFile(new File(projectPath + "/pom.xml")); request.setGoals(Arrays.asList(goals.split(" "))); if (StringUtils.isNotBlank(profile)) { request.setProfiles(Collections.singletonList(profile)); } // 执行Maven构建 Invoker invoker = new DefaultInvoker(); InvocationResult result = invoker.execute(request); if (result.getExitCode() != 0) { return NodeExecuteResult.failure("Maven构建失败"); } // 设置输出 Map outputs = new HashMap<>(); outputs.put("buildResult", result.getExitCode()); outputs.put("targetDir", projectPath + "/target"); return NodeExecuteResult.success(outputs); } catch (Exception e) { return NodeExecuteResult.failure(e.getMessage()); } } } } ``` ### 9.3 Docker构建插件 ```java /** * Docker插件实现 */ @Plugin( id = "docker-plugin", name = "Docker Plugin", version = "1.0.0", description = "Docker operations plugin" ) public class DockerPlugin implements WorkflowPlugin { private DockerClient dockerClient; @Override public void init(PluginContext context) { // 初始化Docker客户端 this.dockerClient = DockerClientBuilder.getInstance() .withDockerHost(context.getConfig("dockerHost")) .withRegistryUsername(context.getConfig("registryUsername")) .withRegistryPassword(context.getConfig("registryPassword")) .build(); } @Override public NodeExecutor getNodeExecutor() { return new DockerNodeExecutor(dockerClient); } /** * Docker节点执行器 */ public class DockerNodeExecutor implements NodeExecutor { private final DockerClient dockerClient; public DockerNodeExecutor(DockerClient dockerClient) { this.dockerClient = dockerClient; } @Override public NodeExecuteResult execute(NodeContext context) { try { String dockerfile = context.getInput("dockerfile"); String imageName = context.getInput("imageName"); String imageTag = context.getInput("imageTag"); context.getLogger().info("开始构建Docker镜像: {}:{}", imageName, imageTag); // 构建镜像 String imageId = dockerClient.buildImageCmd() .withDockerfile(new File(dockerfile)) .withTag(imageName + ":" + imageTag) .start() .awaitImageId(); // 推送镜像 dockerClient.pushImageCmd(imageName) .withTag(imageTag) .start() .awaitCompletion(); // 设置输出 Map outputs = new HashMap<>(); outputs.put("imageId", imageId); outputs.put("imageName", imageName); outputs.put("imageTag", imageTag); return NodeExecuteResult.success(outputs); } catch (Exception e) { return NodeExecuteResult.failure(e.getMessage()); } } } } ``` ### 9.4 Kubernetes部署插件 ```java /** * Kubernetes插件实现 */ @Plugin( id = "kubernetes-plugin", name = "Kubernetes Plugin", version = "1.0.0", description = "Kubernetes deployment plugin" ) public class KubernetesPlugin implements WorkflowPlugin { private KubernetesClient k8sClient; @Override public void init(PluginContext context) { // 初始化K8s客户端 Config config = new ConfigBuilder() .withMasterUrl(context.getConfig("masterUrl")) .withOauthToken(context.getConfig("token")) .build(); this.k8sClient = new DefaultKubernetesClient(config); } @Override public NodeExecutor getNodeExecutor() { return new KubernetesNodeExecutor(k8sClient); } /** * Kubernetes节点执行器 */ public class KubernetesNodeExecutor implements NodeExecutor { private final KubernetesClient k8sClient; public KubernetesNodeExecutor(KubernetesClient k8sClient) { this.k8sClient = k8sClient; } @Override public NodeExecuteResult execute(NodeContext context) { try { String namespace = context.getInput("namespace"); String deploymentFile = context.getInput("deploymentFile"); context.getLogger().info("开始部署到Kubernetes: {}", namespace); // 读取部署文件 InputStream deploymentYaml = new FileInputStream(deploymentFile); // 创建或更新部署 k8sClient.load(deploymentYaml) .inNamespace(namespace) .createOrReplace(); // 等待部署完成 k8sClient.apps().deployments() .inNamespace(namespace) .withName(getDeploymentName(deploymentFile)) .waitUntilReady(5, TimeUnit.MINUTES); // 设置输出 Map outputs = new HashMap<>(); outputs.put("namespace", namespace); outputs.put("status", "deployed"); return NodeExecuteResult.success(outputs); } catch (Exception e) { return NodeExecuteResult.failure(e.getMessage()); } } private String getDeploymentName(String deploymentFile) { // 从YAML文件中解析部署名称 return "deployment-name"; } } } ``` ### 9.5 插件配置示例 ```properties # Git插件配置 plugin.id=git-plugin plugin.name=Git Plugin plugin.version=1.0.0 plugin.main=com.example.plugin.git.GitPlugin plugin.description=Git operations plugin plugin.author=Your Name plugin.requires=core>=1.0.0 # Maven插件配置 plugin.id=maven-plugin plugin.name=Maven Plugin plugin.version=1.0.0 plugin.main=com.example.plugin.maven.MavenPlugin plugin.description=Maven build plugin plugin.author=Your Name plugin.requires=core>=1.0.0 # Docker插件配置 plugin.id=docker-plugin plugin.name=Docker Plugin plugin.version=1.0.0 plugin.main=com.example.plugin.docker.DockerPlugin plugin.description=Docker operations plugin plugin.author=Your Name plugin.requires=core>=1.0.0 # Kubernetes插件配置 plugin.id=kubernetes-plugin plugin.name=Kubernetes Plugin plugin.version=1.0.0 plugin.main=com.example.plugin.kubernetes.KubernetesPlugin plugin.description=Kubernetes deployment plugin plugin.author=Your Name plugin.requires=core>=1.0.0 ``` ### 9.6 插件使用示例 ```java // 获取插件实例 GitPlugin gitPlugin = pluginManager.getPlugin("git-plugin"); NodeExecutor gitExecutor = gitPlugin.getNodeExecutor(); // 准备节点上下文 NodeContext context = NodeContext.builder() .input("repoUrl", "https://github.com/example/repo.git") .input("branch", "main") .input("targetPath", "/workspace/code") .build(); // 执行节点 NodeExecuteResult result = gitExecutor.execute(context); if (result.isSuccess()) { String clonePath = result.getOutput("clonePath"); // 处理成功逻辑 } else { String errorMessage = result.getErrorMessage(); // 处理失败逻辑 } ``` 这些插件实现示例展示了如何: 1. 实现基本的插件接口 2. 处理插件配置和初始化 3. 实现节点执行逻辑 4. 处理执行结果和错误 5. 提供插件使用示例 您可以基于这些示例快速开发新的插件。需要注意的是: 1. 每个插件都应该有清晰的职责 2. 做好错误处理和日志记录 3. 提供完整的配置说明 4. 注意资源的释放和清理 5. 考虑并发和性能问题 您觉得这些插件示例是否满足需求?我们可以根据您的��体需求进行调整或添加更多示例。 ## 十、节点执行日志功能 ### 10.1 日志模型设计 ```sql -- 节点执行日志表(分区表) CREATE TABLE node_execution_log ( id BIGINT PRIMARY KEY, workflow_instance_id BIGINT NOT NULL, node_instance_id BIGINT NOT NULL, log_type VARCHAR(20) NOT NULL, -- INFO/ERROR/COMMAND/OUTPUT content TEXT NOT NULL, log_time TIMESTAMP NOT NULL, sequence_no BIGINT NOT NULL, -- 日志序号,保证顺序 create_time TIMESTAMP NOT NULL, INDEX idx_node_seq (node_instance_id, sequence_no) -- 优化查询性能 ) PARTITION BY RANGE (create_time); -- 创建最近一个月的分区 CREATE TABLE node_execution_log_current PARTITION OF node_execution_log FOR VALUES FROM ('2024-01-01') TO ('2024-02-01'); ``` ### 10.2 日志查询接口 ```java @RestController @RequestMapping("/api/v1/workflow/node") @Tag(name = "节点日志", description = "节点执行日志相关接口") public class NodeLogController { @Resource private NodeLogService nodeLogService; /** * 分页查询日志 */ @GetMapping("/{nodeInstanceId}/logs") @Operation(summary = "分页查询节点日志") public PageResponse queryLogs( @PathVariable Long nodeInstanceId, @RequestParam(required = false) Long lastSequenceNo, @RequestParam(defaultValue = "100") Integer pageSize) { return nodeLogService.queryLogs(nodeInstanceId, lastSequenceNo, pageSize); } /** * 导出日志 */ @GetMapping("/{nodeInstanceId}/logs/export") @Operation(summary = "导出节点日志") public void exportLogs( @PathVariable Long nodeInstanceId, HttpServletResponse response) { nodeLogService.exportLogs(nodeInstanceId, response); } } @Service @Slf4j public class NodeLogService { @Resource private NodeLogRepository nodeLogRepository; /** * 分页查询日志 * @param nodeInstanceId 节点实例ID * @param lastSequenceNo 上次查询的最后序号(首次查询传null) * @param pageSize 每页大小 */ public PageResponse queryLogs(Long nodeInstanceId, Long lastSequenceNo, Integer pageSize) { // 首次查询 if (lastSequenceNo == null) { return nodeLogRepository.findFirstPage(nodeInstanceId, pageSize); } // 增量查询 return nodeLogRepository.findIncrementalLogs(nodeInstanceId, lastSequenceNo, pageSize); } /** * 写入日志 */ @Async public void writeLog(NodeLogEvent event) { try { NodeLog log = NodeLog.builder() .nodeInstanceId(event.getNodeInstanceId()) .logType(event.getType()) .content(event.getContent()) .logTime(LocalDateTime.now()) .sequenceNo(generateSequenceNo()) .build(); nodeLogRepository.save(log); } catch (Exception e) { log.error("Failed to write log", e); } } /** * 导出日志 */ public void exportLogs(Long nodeInstanceId, HttpServletResponse response) { response.setContentType("text/plain"); response.setHeader("Content-Disposition", "attachment; filename=node-" + nodeInstanceId + "-logs.txt"); try (PrintWriter writer = response.getWriter()) { nodeLogRepository.streamAllLogs(nodeInstanceId, log -> { writer.println(String.format("[%s] [%s] %s", formatTime(log.getLogTime()), log.getLogType(), log.getContent())); }); } catch (IOException e) { log.error("Failed to export logs", e); throw new BusinessException("导出日志失败"); } } } ``` ### 10.3 日志记录器 ```java /** * 节点日志记录器 */ public class NodeLogger { private final Long nodeInstanceId; private final NodeLogService logService; public void info(String message) { writeLog(LogType.INFO, message); } public void error(String message) { writeLog(LogType.ERROR, message); } public void command(String command) { writeLog(LogType.COMMAND, command); } public void output(String output) { writeLog(LogType.OUTPUT, output); } private void writeLog(LogType type, String content) { NodeLogEvent event = NodeLogEvent.builder() .nodeInstanceId(nodeInstanceId) .type(type) .content(content) .build(); logService.writeLog(event); } } ``` ### 10.4 前端日志组件 ```typescript interface LogDTO { id: number; logType: 'INFO' | 'ERROR' | 'COMMAND' | 'OUTPUT'; content: string; logTime: string; sequenceNo: number; } const NodeLogViewer: React.FC<{ nodeInstanceId: string }> = ({ nodeInstanceId }) => { const [logs, setLogs] = useState([]); const [loading, setLoading] = useState(false); const [error, setError] = useState(); const [lastSequenceNo, setLastSequenceNo] = useState(); const logEndRef = useRef(null); // 首次加载日志 useEffect(() => { loadLogs(); }, [nodeInstanceId]); // 定时增量查询 useEffect(() => { const timer = setInterval(() => { if (lastSequenceNo) { loadIncrementalLogs(); } }, 2000); // 每2秒查询一次 return () => clearInterval(timer); }, [lastSequenceNo]); // 加载首页日志 const loadLogs = async () => { try { setLoading(true); const res = await axios.get(`/api/v1/workflow/node/${nodeInstanceId}/logs`); setLogs(res.data.records); if (res.data.records.length > 0) { setLastSequenceNo(res.data.records[res.data.records.length - 1].sequenceNo); } } catch (e) { setError('加载日志失败'); } finally { setLoading(false); } }; // 增量加载日志 const loadIncrementalLogs = async () => { try { const res = await axios.get(`/api/v1/workflow/node/${nodeInstanceId}/logs`, { params: { lastSequenceNo } }); if (res.data.records.length > 0) { setLogs(logs => [...logs, ...res.data.records]); setLastSequenceNo(res.data.records[res.data.records.length - 1].sequenceNo); // 滚动到最新 logEndRef.current?.scrollIntoView({ behavior: 'smooth' }); } } catch (e) { console.error('增量加载日志失败', e); } }; // 导出日志 const handleExport = () => { window.open(`/api/v1/workflow/node/${nodeInstanceId}/logs/export`); }; return (
{loading && }
{logs.map((log, index) => (
{formatTime(log.logTime)} {log.logType} {log.content}
))}
{error && ( )}
); }; ``` ### 10.5 性能优化措施 1. **数据库优化** - 使用分区表按时间分区 - 建立复合索引(node_instance_id, sequence_no) - 定期归档历史日志 - 使用序列号进行增量查询 2. **查询优化** - 分页限制大小 - 增量查询而不是全量 - 适当的轮询间隔(2秒) - 异步写入日志 3. **前端优化** - 虚拟滚动显示大量日志 - 本地缓存已加载的日志 - 按需加载历史日志 - 智能调整轮询间隔 4. **系统优化** - 日志异步写入 - 批量写入优化 - 定期清理过期日志 - 监控慢查询 这个基于HTTP轮询的方案相比WebSocket有以下优势: 1. **更简单可靠** - 无需维护长连接 - 服务端实现更简单 - 更容易做负载均衡 - 出错重试更容易 2. **更好的性能** - 服务器资源占用更少 - 更容易控制并发 - 更好的伸缩性 - 便于性能优化 3. **更好的兼容性** - 支持更多的客户端 - 穿透防火墙更容易 - 不依赖特殊协议 - 更容易调试 您觉得这个基于HTTP轮询的方案如何?我们可以根据实际需求调整轮询间隔和其他参��。 ## 十一、工作流编排界面实现 [之前提供的完整代码内容] ## 十二、数据库表设计 ### 12.1 工作流定义表 ```sql CREATE TABLE workflow_definition ( id BIGINT PRIMARY KEY, -- 主键 name VARCHAR(100) NOT NULL, -- 工作流名称 code VARCHAR(50) NOT NULL, -- 工作流编码 description TEXT, -- 描述 status VARCHAR(20) NOT NULL, -- 状态:DRAFT/PUBLISHED/DISABLED version INT NOT NULL DEFAULT 1, -- 版本号 content TEXT NOT NULL, -- 工作流定义内容(JSON) create_time DATETIME NOT NULL, -- 创建时间 create_by VARCHAR(50) NOT NULL, -- 创建人 update_time DATETIME NOT NULL, -- 更新时间 update_by VARCHAR(50) NOT NULL, -- 更新人 deleted BOOLEAN NOT NULL DEFAULT FALSE, -- 是否删除 UNIQUE KEY uk_code_version (code, version) ); ### 12.2 工作流实例表 ```sql CREATE TABLE workflow_instance ( id BIGINT PRIMARY KEY, -- 主键 definition_id BIGINT NOT NULL, -- 工作流定义ID name VARCHAR(100) NOT NULL, -- 实例名称 status VARCHAR(20) NOT NULL, -- 状态:RUNNING/COMPLETED/FAILED/CANCELED start_time DATETIME, -- 开始时间 end_time DATETIME, -- 结束时间 variables TEXT, -- 工作流变量(JSON) create_time DATETIME NOT NULL, -- 创建时间 create_by VARCHAR(50) NOT NULL, -- 创建人 update_time DATETIME NOT NULL, -- 更新时间 update_by VARCHAR(50) NOT NULL, -- 更新人 version INT NOT NULL DEFAULT 1, -- 版本号 deleted BOOLEAN NOT NULL DEFAULT FALSE, -- 是否删除 FOREIGN KEY (definition_id) REFERENCES workflow_definition(id) ); ### 12.3 节点实例表 ```sql CREATE TABLE node_instance ( id BIGINT PRIMARY KEY, -- 主键 workflow_instance_id BIGINT NOT NULL, -- 工作流实例ID node_id VARCHAR(50) NOT NULL, -- 节点ID node_name VARCHAR(100) NOT NULL, -- 节点名称 node_type VARCHAR(50) NOT NULL, -- 节点类型 status VARCHAR(20) NOT NULL, -- 状态:PENDING/RUNNING/COMPLETED/FAILED/CANCELED start_time DATETIME, -- 开始时间 end_time DATETIME, -- 结束时间 error TEXT, -- 错误信息 result TEXT, -- 执行结果(JSON) create_time DATETIME NOT NULL, -- 创建时间 create_by VARCHAR(50) NOT NULL, -- 创建人 update_time DATETIME NOT NULL, -- 更新时间 update_by VARCHAR(50) NOT NULL, -- 更新人 version INT NOT NULL DEFAULT 1, -- 版本号 deleted BOOLEAN NOT NULL DEFAULT FALSE, -- 是否删除 FOREIGN KEY (workflow_instance_id) REFERENCES workflow_instance(id) ); ### 12.4 节点执行日志表 ```sql CREATE TABLE node_execution_log ( id BIGINT PRIMARY KEY, -- 主键 workflow_instance_id BIGINT NOT NULL, -- 工作流实例ID node_instance_id BIGINT NOT NULL, -- 节点实例ID log_type VARCHAR(20) NOT NULL, -- 日志类型:INFO/ERROR/COMMAND/OUTPUT content TEXT NOT NULL, -- 日志内容 log_time DATETIME NOT NULL, -- 日志时间 sequence_no BIGINT NOT NULL, -- 日志序号 create_time DATETIME NOT NULL, -- 创建时间 create_by VARCHAR(50) NOT NULL, -- 创建人 update_time DATETIME NOT NULL, -- 更新时间 update_by VARCHAR(50) NOT NULL, -- 更新人 version INT NOT NULL DEFAULT 1, -- 版本号 deleted BOOLEAN NOT NULL DEFAULT FALSE, -- 是否删除 FOREIGN KEY (workflow_instance_id) REFERENCES workflow_instance(id), FOREIGN KEY (node_instance_id) REFERENCES node_instance(id) ) PARTITION BY RANGE (UNIX_TIMESTAMP(create_time)); -- 创建最近一个月的分区 CREATE TABLE node_execution_log_current PARTITION OF node_execution_log FOR VALUES FROM ('2024-01-01') TO ('2024-02-01'); ### 12.5 工作流变量表 ```sql CREATE TABLE workflow_variable ( id BIGINT PRIMARY KEY, -- 主键 workflow_instance_id BIGINT NOT NULL, -- 工作流实例ID name VARCHAR(100) NOT NULL, -- 变量名 value TEXT, -- 变量值 type VARCHAR(50) NOT NULL, -- 变量类型 scope VARCHAR(20) NOT NULL, -- 作用域:GLOBAL/NODE node_id VARCHAR(50), -- 节点ID(作用域为NODE时必填) create_time DATETIME NOT NULL, -- 创建时间 create_by VARCHAR(50) NOT NULL, -- 创建人 update_time DATETIME NOT NULL, -- 更新时间 update_by VARCHAR(50) NOT NULL, -- 更新人 version INT NOT NULL DEFAULT 1, -- 版本号 deleted BOOLEAN NOT NULL DEFAULT FALSE, -- 是否删除 FOREIGN KEY (workflow_instance_id) REFERENCES workflow_instance(id) ); ``` 主要更新内容: 1. **统一基础字段** - id:BIGINT PRIMARY KEY - create_time:DATETIME NOT NULL - create_by:VARCHAR(50) NOT NULL - update_time:DATETIME NOT NULL - update_by:VARCHAR(50) NOT NULL - version:INT NOT NULL DEFAULT 1 - deleted:BOOLEAN NOT NULL DEFAULT FALSE 2. **表关系** - 使用外键关联 - 添加必要的索引 - 分区表支持 3. **字段规范** - 统一命名风格 - 添加字段注释 - 明确字段类型和约束 4. **数据完整性** - NOT NULL 约束 - 唯一索引 - 默认值 您觉得这个数据库设计是否更符合规范?我们可以根据具体需求进行调整。