deploy-ease-platform/backend/docs/deploy-ease-workflow.md
2024-12-02 17:22:31 +08:00

42 KiB
Raw Blame History

Deploy Ease Workflow 工作流引擎设计文档

一、项目概述

1.1 项目目标

构建一个企业级工作流引擎系统支持审批流程和CI/CD流程的统一管理和执行。系统需要具备高扩展性、高可用性和易用性。

1.2 技术栈

  • 后端Java 21 + Spring Boot 3.x + Spring Cloud
  • 前端React 18 + Ant Design 5.x + TypeScript
  • 数据库PostgreSQL 16
  • 缓存Redis 7.x
  • 消息队列RabbitMQ
  • 容器化Docker + Kubernetes
  • 服务网关Spring Cloud Gateway
  • 注册中心Nacos
  • 配置中心Nacos
  • 监控Prometheus + Grafana

1.3 项目架构

+------------------+     +------------------+     +------------------+
|   前端应用层      | <-> |    网关层        | <-> |   微服务集群      |
+------------------+     +------------------+     +------------------+
         ↑                       ↑                       ↑
         |                       |                       |
         v                       v                       v
+------------------+     +------------------+     +------------------+
|   CDN/对象存储    | <-> |    消息总线       | <-> |    存储层        |
+------------------+     +------------------+     +------------------+

二、实现优先级和里程碑

2.1 第一阶段核心框架搭建2周

  1. 基础框架搭建

    • 多模块项目结构
    • 统一异常处理
    • 统一响应格式
    • 基础CRUD封装
  2. 核心功能实现

    • 用户认证授权
    • 基础权限管理
    • 系统配置管理

2.2 第二阶段工作流引擎核心4周

  1. 工作流引擎实现

    • 工作流模型设计
    • 工作流执行引擎
    • 节点抽象和实现
  2. 插件系统实现

    • 插件加载机制
    • 插件生命周期管理
    • 插件配置管理

2.3 第三阶段节点开发4周

  1. 审批节点实现

    • 表单节点
    • 审批节点
    • 会签节点
  2. CI/CD节点实现

    • Git节点
    • Maven节点
    • Docker节点
    • K8S节点

2.4 第四阶段前端实现4周

  1. 工作流设计器

    • 流程设计器
    • 节点配置面板
    • 表单设计器
  2. 运行监控

    • 实例监控
    • 日志查看
    • 状态管理

三、详细设计

3.1 数据库设计

3.1.1 工作流定义相关表

-- 工作流定义表
CREATE TABLE workflow_definition (
    id BIGINT PRIMARY KEY,
    code VARCHAR(50) NOT NULL UNIQUE,
    name VARCHAR(100) NOT NULL,
    description TEXT,
    version INT NOT NULL,
    status VARCHAR(20) NOT NULL,
    create_time TIMESTAMP NOT NULL,
    update_time TIMESTAMP NOT NULL,
    create_by VARCHAR(50) NOT NULL,
    update_by VARCHAR(50) NOT NULL
);

-- 节点定义表
CREATE TABLE node_definition (
    id BIGINT PRIMARY KEY,
    workflow_id BIGINT NOT NULL,
    node_type VARCHAR(50) NOT NULL,
    name VARCHAR(100) NOT NULL,
    description TEXT,
    parameters TEXT,
    create_time TIMESTAMP NOT NULL,
    update_time TIMESTAMP NOT NULL,
    FOREIGN KEY (workflow_id) REFERENCES workflow_definition(id)
);

-- 节点连接表
CREATE TABLE node_connection (
    id BIGINT PRIMARY KEY,
    workflow_id BIGINT NOT NULL,
    source_node_id BIGINT NOT NULL,
    target_node_id BIGINT NOT NULL,
    condition_expr TEXT,
    create_time TIMESTAMP NOT NULL,
    FOREIGN KEY (workflow_id) REFERENCES workflow_definition(id),
    FOREIGN KEY (source_node_id) REFERENCES node_definition(id),
    FOREIGN KEY (target_node_id) REFERENCES node_definition(id)
);

3.1.2 工作流实例相关表

-- 工作流实例表
CREATE TABLE workflow_instance (
    id BIGINT PRIMARY KEY,
    workflow_id BIGINT NOT NULL,
    status VARCHAR(20) NOT NULL,
    start_time TIMESTAMP NOT NULL,
    end_time TIMESTAMP,
    variables TEXT,
    create_time TIMESTAMP NOT NULL,
    update_time TIMESTAMP NOT NULL,
    FOREIGN KEY (workflow_id) REFERENCES workflow_definition(id)
);

-- 节点实例表
CREATE TABLE node_instance (
    id BIGINT PRIMARY KEY,
    workflow_instance_id BIGINT NOT NULL,
    node_id BIGINT NOT NULL,
    status VARCHAR(20) NOT NULL,
    start_time TIMESTAMP NOT NULL,
    end_time TIMESTAMP,
    inputs TEXT,
    outputs TEXT,
    error_message TEXT,
    create_time TIMESTAMP NOT NULL,
    update_time TIMESTAMP NOT NULL,
    FOREIGN KEY (workflow_instance_id) REFERENCES workflow_instance(id),
    FOREIGN KEY (node_id) REFERENCES node_definition(id)
);

-- 节点日志表
CREATE TABLE node_log (
    id BIGINT PRIMARY KEY,
    node_instance_id BIGINT NOT NULL,
    level VARCHAR(10) NOT NULL,
    content TEXT NOT NULL,
    create_time TIMESTAMP NOT NULL,
    FOREIGN KEY (node_instance_id) REFERENCES node_instance(id)
);

3.1.3 表单相关表

-- 表单定义表
CREATE TABLE form_definition (
    id BIGINT PRIMARY KEY,
    code VARCHAR(50) NOT NULL UNIQUE,
    name VARCHAR(100) NOT NULL,
    description TEXT,
    schema TEXT NOT NULL,
    ui_schema TEXT,
    validation_rules TEXT,
    workflow_definition_id BIGINT,
    version INT NOT NULL,
    status VARCHAR(20) NOT NULL,
    create_time TIMESTAMP NOT NULL,
    update_time TIMESTAMP NOT NULL,
    create_by VARCHAR(50) NOT NULL,
    update_by VARCHAR(50) NOT NULL
);

-- 表单实例表
CREATE TABLE form_instance (
    id BIGINT PRIMARY KEY,
    form_definition_id BIGINT NOT NULL,
    form_data TEXT NOT NULL,
    workflow_instance_id BIGINT,
    submitter_id BIGINT NOT NULL,
    submit_time TIMESTAMP NOT NULL,
    status VARCHAR(20) NOT NULL,
    create_time TIMESTAMP NOT NULL,
    update_time TIMESTAMP NOT NULL,
    FOREIGN KEY (form_definition_id) REFERENCES form_definition(id)
);

3.1.4 审批相关表

-- 审批记录表
CREATE TABLE approval_record (
    id BIGINT PRIMARY KEY,
    workflow_instance_id BIGINT NOT NULL,
    node_instance_id BIGINT NOT NULL,
    approver_id BIGINT NOT NULL,
    approval_time TIMESTAMP NOT NULL,
    result VARCHAR(20) NOT NULL,
    comment TEXT,
    attachments TEXT,
    create_time TIMESTAMP NOT NULL
);

-- 审批配置表
CREATE TABLE approval_config (
    id BIGINT PRIMARY KEY,
    node_definition_id BIGINT NOT NULL,
    approval_type VARCHAR(20) NOT NULL,
    approver_ids TEXT,
    role_ids TEXT,
    department_ids TEXT,
    approval_mode VARCHAR(20) NOT NULL,
    timeout_hours INT,
    reminder_enabled BOOLEAN DEFAULT FALSE,
    reminder_interval INT,
    create_time TIMESTAMP NOT NULL,
    update_time TIMESTAMP NOT NULL,
    FOREIGN KEY (node_definition_id) REFERENCES node_definition(id)
);

3.1.5 插件相关表

-- 插件信息表
CREATE TABLE plugin_info (
    id BIGINT PRIMARY KEY,
    code VARCHAR(50) NOT NULL UNIQUE,
    name VARCHAR(100) NOT NULL,
    description TEXT,
    version VARCHAR(20) NOT NULL,
    status VARCHAR(20) NOT NULL,
    jar_path VARCHAR(200),
    config_schema TEXT,
    create_time TIMESTAMP NOT NULL,
    update_time TIMESTAMP NOT NULL
);

-- 插件配置表
CREATE TABLE plugin_config (
    id BIGINT PRIMARY KEY,
    plugin_id BIGINT NOT NULL,
    config TEXT NOT NULL,
    status VARCHAR(20) NOT NULL,
    create_time TIMESTAMP NOT NULL,
    update_time TIMESTAMP NOT NULL,
    FOREIGN KEY (plugin_id) REFERENCES plugin_info(id)
);

3.2 核心接口设计

3.2.1 工作流引擎接口

/**
 * 工作流引擎接口
 */
public interface WorkflowEngine {
    /**
     * 启动工作流
     */
    WorkflowInstance startWorkflow(String workflowId, Map<String, Object> params);
    
    /**
     * 暂停工作流
     */
    void pauseWorkflow(String instanceId);
    
    /**
     * 恢复工作流
     */
    void resumeWorkflow(String instanceId);
    
    /**
     * 终止工作流
     */
    void terminateWorkflow(String instanceId, String reason);
    
    /**
     * 获取工作流状态
     */
    WorkflowStatus getWorkflowStatus(String instanceId);
}

3.2.2 节点执行器接口

/**
 * 节点执行器接口
 */
public interface NodeExecutor {
    /**
     * 执行节点
     */
    NodeExecuteResult execute(NodeContext context);
    
    /**
     * 取消执行
     */
    void cancel(NodeContext context);
    
    /**
     * 获取执行状态
     */
    NodeStatus getStatus(NodeContext context);
}

3.2.3 插件接口

/**
 * 插件接口
 */
public interface WorkflowPlugin {
    /**
     * 获取插件信息
     */
    PluginInfo getPluginInfo();
    
    /**
     * 初始化插件
     */
    void init(PluginContext context);
    
    /**
     * 获取节点执行器
     */
    NodeExecutor getNodeExecutor();
}

3.3 前端组件设计

3.3.1 工作流设计器

// 工作流设计器组件
const WorkflowDesigner: React.FC = () => {
  const [nodes, setNodes] = useState<INodeDesigner[]>([]);
  const [connections, setConnections] = useState<IConnection[]>([]);
  
  return (
    <div className="workflow-designer">
      <Toolbar />
      <NodePalette />
      <Canvas
        nodes={nodes}
        connections={connections}
        onDrop={handleDrop}
        onConnect={handleConnect}
      />
      <PropertiesPanel />
    </div>
  );
};

3.3.2 节点配置面板

// 节点配置面板组件
const NodeConfigPanel: React.FC<NodeConfigPanelProps> = ({ node, onChange }) => {
  return (
    <div className="node-config-panel">
      <Form layout="vertical">
        <Form.Item label="节点名称" required>
          <Input 
            value={node.name}
            onChange={e => onChange({ ...node, name: e.target.value })}
          />
        </Form.Item>
        
        <Form.Item label="节点描述">
          <Input.TextArea
            value={node.description}
            onChange={e => onChange({ ...node, description: e.target.value })}
          />
        </Form.Item>
        
        <NodeTypeConfig
          type={node.type}
          config={node.config}
          onChange={config => onChange({ ...node, config })}
        />
      </Form>
    </div>
  );
};

3.3.3 表单设计器

// 表单设计器组件
const FormDesigner: React.FC = () => {
  const [formSchema, setFormSchema] = useState<FormSchema>({});
  const [uiSchema, setUiSchema] = useState<UiSchema>({});
  
  return (
    <div className="form-designer">
      <div className="toolbar">
        <ComponentPalette />
        <ActionButtons />
      </div>
      
      <div className="design-area">
        <FormCanvas
          schema={formSchema}
          uiSchema={uiSchema}
          onChange={(schema, ui) => {
            setFormSchema(schema);
            setUiSchema(ui);
          }}
        />
      </div>
      
      <div className="properties-panel">
        <SchemaEditor schema={formSchema} />
        <ValidationRules schema={formSchema} />
      </div>
    </div>
  );
};

四、部署架构

4.1 系统部署架构

                        [负载均衡器 Nginx]
                              ↓
                    [API网关 Spring Cloud Gateway]
                              ↓
     +----------------+----------------+----------------+
     ↓                ↓                ↓                ↓
[工作流服务]     [表单服务]      [审批服务]      [节点服务]
     ↓                ↓                ↓                ↓
                    [消息队列 RabbitMQ]
                              ↓
                    [数据库 PostgreSQL]
                              ↓
                    [缓存服务器 Redis]

4.2 高可用方案

  1. 服务高可用

    • 服务多实例部署
    • 服务注册与发现
    • 负载均衡
  2. 数据高可用

    • 数据库主从复制
    • 数据定期备份
    • 数据异地容灾
  3. 消息高可用

    • 消息队列集群
    • 消息持久化
    • 消息重试机制

五、安全方案

5.1 认证授权

  1. 用户认证

    • JWT Token认证
    • OAuth2.0集成
    • 单点登录支持
  2. 权限控制

    • RBAC权限模型
    • 数据权限控制
    • API权限控制

5.2 数据安全

  1. 传输安全

    • HTTPS加密
    • 数据加密传输
    • 防重放攻击
  2. 存储安全

    • 敏感数据加密
    • 数据脱敏
    • 数据备份

六、监控运维

6.1 系统监控

  1. 性能监控

    • JVM监控
    • 数据库监控
    • 接口性能监控
  2. 业务监控

    • 工作流执行监控
    • 节点执行监控
    • 审批流程监控

6.2 日志管理

  1. 日志收集

    • 系统日志
    • 业务日志
    • 审计日志
  2. 日志分析

    • ELK日志分析
    • 日志告警
    • 日志存档

七、后续规划

7.1 功能扩展

  1. 更多节点类型支持
  2. 工作流模板市场
  3. 移动端支持
  4. 多语言支持

7.2 性能优化

  1. 大规模工作流优化
  2. 复杂表单性能优化
  3. 查询性能优化

7.3 运维能力

  1. 自动化部署
  2. 容器化管理
  3. 监控告警
  4. 智能运维

八、数据源管理

8.1 数据源设计

-- 数据源定义表
CREATE TABLE datasource_definition (
    id BIGINT PRIMARY KEY,
    code VARCHAR(50) NOT NULL UNIQUE,
    name VARCHAR(100) NOT NULL,
    description TEXT,
    type VARCHAR(20) NOT NULL, -- 数据源类型MYSQL/POSTGRESQL/ORACLE/MONGODB等
    config TEXT NOT NULL,      -- 加密存储的连接配置
    status VARCHAR(20) NOT NULL,
    create_time TIMESTAMP NOT NULL,
    update_time TIMESTAMP NOT NULL,
    create_by VARCHAR(50) NOT NULL,
    update_by VARCHAR(50) NOT NULL
);

-- 数据源授权表
CREATE TABLE datasource_authorization (
    id BIGINT PRIMARY KEY,
    datasource_id BIGINT NOT NULL,
    node_definition_id BIGINT NOT NULL,
    permission_type VARCHAR(20) NOT NULL, -- READ/WRITE/ALL
    valid_from TIMESTAMP NOT NULL,
    valid_until TIMESTAMP,
    approved_by VARCHAR(50) NOT NULL,
    approved_time TIMESTAMP NOT NULL,
    status VARCHAR(20) NOT NULL,
    create_time TIMESTAMP NOT NULL,
    FOREIGN KEY (datasource_id) REFERENCES datasource_definition(id),
    FOREIGN KEY (node_definition_id) REFERENCES node_definition(id)
);

-- 数据源操作审计表
CREATE TABLE datasource_audit_log (
    id BIGINT PRIMARY KEY,
    datasource_id BIGINT NOT NULL,
    node_instance_id BIGINT NOT NULL,
    operation_type VARCHAR(20) NOT NULL, -- SELECT/INSERT/UPDATE/DELETE
    sql_statement TEXT,
    affected_rows INT,
    execution_time BIGINT, -- 毫秒
    status VARCHAR(20) NOT NULL,
    error_message TEXT,
    create_time TIMESTAMP NOT NULL,
    FOREIGN KEY (datasource_id) REFERENCES datasource_definition(id),
    FOREIGN KEY (node_instance_id) REFERENCES node_instance(id)
);

8.2 数据源管理核心接口

/**
 * 数据源管理接口
 */
public interface DataSourceManager {
    /**
     * 获取节点可用的数据源列表
     */
    List<DataSourceDefinition> getAvailableDataSources(String nodeDefinitionId);
    
    /**
     * 验证节点是否有数据源的操作权限
     */
    boolean validatePermission(String nodeInstanceId, String datasourceId, OperationType operationType);
    
    /**
     * 获取数据源连接
     */
    Connection getConnection(String nodeInstanceId, String datasourceId);
    
    /**
     * 记录数据源操作审计
     */
    void auditOperation(DataSourceAuditLog auditLog);
}

/**
 * 数据源操作包装器
 */
public class DataSourceWrapper implements AutoCloseable {
    private final Connection connection;
    private final DataSourceAuditLogger auditLogger;
    
    public ResultSet executeQuery(String sql, Object... params) {
        try {
            long startTime = System.currentTimeMillis();
            ResultSet rs = executeQueryInternal(sql, params);
            auditLogger.logQuery(sql, System.currentTimeMillis() - startTime);
            return rs;
        } catch (SQLException e) {
            auditLogger.logError(sql, e);
            throw new DataSourceException("Query execution failed", e);
        }
    }
    
    public int executeUpdate(String sql, Object... params) {
        try {
            long startTime = System.currentTimeMillis();
            int affected = executeUpdateInternal(sql, params);
            auditLogger.logUpdate(sql, affected, System.currentTimeMillis() - startTime);
            return affected;
        } catch (SQLException e) {
            auditLogger.logError(sql, e);
            throw new DataSourceException("Update execution failed", e);
        }
    }
}

九、插件实现示例

9.1 Git操作插件

/**
 * Git插件实现
 */
@Plugin(
    id = "git-plugin",
    name = "Git Plugin",
    version = "1.0.0",
    description = "Git operations plugin"
)
public class GitPlugin implements WorkflowPlugin {
    private PluginContext context;
    
    @Override
    public void init(PluginContext context) {
        this.context = context;
    }
    
    @Override
    public NodeExecutor getNodeExecutor() {
        return new GitNodeExecutor();
    }
    
    /**
     * Git节点执行器
     */
    public class GitNodeExecutor implements NodeExecutor {
        @Override
        public NodeExecuteResult execute(NodeContext context) {
            try {
                // 获取参数
                String repoUrl = context.getInput("repoUrl");
                String branch = context.getInput("branch");
                String targetPath = context.getInput("targetPath");
                
                // 执行Git克隆
                context.getLogger().info("开始克隆代码: {}", repoUrl);
                Git.cloneRepository()
                   .setURI(repoUrl)
                   .setBranch(branch)
                   .setDirectory(new File(targetPath))
                   .call();
                
                // 设置输出
                Map<String, Object> outputs = new HashMap<>();
                outputs.put("clonePath", targetPath);
                
                return NodeExecuteResult.success(outputs);
            } catch (Exception e) {
                return NodeExecuteResult.failure(e.getMessage());
            }
        }
        
        @Override
        public void cancel(NodeContext context) {
            // 实现取消逻辑
        }
        
        @Override
        public NodeStatus getStatus(NodeContext context) {
            // 实现状态查询逻辑
            return NodeStatus.RUNNING;
        }
    }
}

9.2 Maven构建插件

/**
 * Maven插件实现
 */
@Plugin(
    id = "maven-plugin",
    name = "Maven Plugin",
    version = "1.0.0",
    description = "Maven build plugin"
)
public class MavenPlugin implements WorkflowPlugin {
    private PluginContext context;
    
    @Override
    public void init(PluginContext context) {
        this.context = context;
    }
    
    @Override
    public NodeExecutor getNodeExecutor() {
        return new MavenNodeExecutor();
    }
    
    /**
     * Maven节点执行器
     */
    public class MavenNodeExecutor implements NodeExecutor {
        @Override
        public NodeExecuteResult execute(NodeContext context) {
            try {
                String projectPath = context.getInput("projectPath");
                String goals = context.getInput("goals", "clean package");
                String profile = context.getInput("profile");
                
                context.getLogger().info("开始Maven构建: {}", projectPath);
                
                // 创建Maven请求
                InvocationRequest request = new DefaultInvocationRequest();
                request.setPomFile(new File(projectPath + "/pom.xml"));
                request.setGoals(Arrays.asList(goals.split(" ")));
                
                if (StringUtils.isNotBlank(profile)) {
                    request.setProfiles(Collections.singletonList(profile));
                }
                
                // 执行Maven构建
                Invoker invoker = new DefaultInvoker();
                InvocationResult result = invoker.execute(request);
                
                if (result.getExitCode() != 0) {
                    return NodeExecuteResult.failure("Maven构建失败");
                }
                
                // 设置输出
                Map<String, Object> outputs = new HashMap<>();
                outputs.put("buildResult", result.getExitCode());
                outputs.put("targetDir", projectPath + "/target");
                
                return NodeExecuteResult.success(outputs);
            } catch (Exception e) {
                return NodeExecuteResult.failure(e.getMessage());
            }
        }
    }
}

9.3 Docker构建插件

/**
 * Docker插件实现
 */
@Plugin(
    id = "docker-plugin",
    name = "Docker Plugin",
    version = "1.0.0",
    description = "Docker operations plugin"
)
public class DockerPlugin implements WorkflowPlugin {
    private DockerClient dockerClient;
    
    @Override
    public void init(PluginContext context) {
        // 初始化Docker客户端
        this.dockerClient = DockerClientBuilder.getInstance()
            .withDockerHost(context.getConfig("dockerHost"))
            .withRegistryUsername(context.getConfig("registryUsername"))
            .withRegistryPassword(context.getConfig("registryPassword"))
            .build();
    }
    
    @Override
    public NodeExecutor getNodeExecutor() {
        return new DockerNodeExecutor(dockerClient);
    }
    
    /**
     * Docker节点执行器
     */
    public class DockerNodeExecutor implements NodeExecutor {
        private final DockerClient dockerClient;
        
        public DockerNodeExecutor(DockerClient dockerClient) {
            this.dockerClient = dockerClient;
        }
        
        @Override
        public NodeExecuteResult execute(NodeContext context) {
            try {
                String dockerfile = context.getInput("dockerfile");
                String imageName = context.getInput("imageName");
                String imageTag = context.getInput("imageTag");
                
                context.getLogger().info("开始构建Docker镜像: {}:{}", imageName, imageTag);
                
                // 构建镜像
                String imageId = dockerClient.buildImageCmd()
                    .withDockerfile(new File(dockerfile))
                    .withTag(imageName + ":" + imageTag)
                    .start()
                    .awaitImageId();
                
                // 推送镜像
                dockerClient.pushImageCmd(imageName)
                    .withTag(imageTag)
                    .start()
                    .awaitCompletion();
                
                // 设置输出
                Map<String, Object> outputs = new HashMap<>();
                outputs.put("imageId", imageId);
                outputs.put("imageName", imageName);
                outputs.put("imageTag", imageTag);
                
                return NodeExecuteResult.success(outputs);
            } catch (Exception e) {
                return NodeExecuteResult.failure(e.getMessage());
            }
        }
    }
}

9.4 Kubernetes部署插件

/**
 * Kubernetes插件实现
 */
@Plugin(
    id = "kubernetes-plugin",
    name = "Kubernetes Plugin",
    version = "1.0.0",
    description = "Kubernetes deployment plugin"
)
public class KubernetesPlugin implements WorkflowPlugin {
    private KubernetesClient k8sClient;
    
    @Override
    public void init(PluginContext context) {
        // 初始化K8s客户端
        Config config = new ConfigBuilder()
            .withMasterUrl(context.getConfig("masterUrl"))
            .withOauthToken(context.getConfig("token"))
            .build();
            
        this.k8sClient = new DefaultKubernetesClient(config);
    }
    
    @Override
    public NodeExecutor getNodeExecutor() {
        return new KubernetesNodeExecutor(k8sClient);
    }
    
    /**
     * Kubernetes节点执行器
     */
    public class KubernetesNodeExecutor implements NodeExecutor {
        private final KubernetesClient k8sClient;
        
        public KubernetesNodeExecutor(KubernetesClient k8sClient) {
            this.k8sClient = k8sClient;
        }
        
        @Override
        public NodeExecuteResult execute(NodeContext context) {
            try {
                String namespace = context.getInput("namespace");
                String deploymentFile = context.getInput("deploymentFile");
                
                context.getLogger().info("开始部署到Kubernetes: {}", namespace);
                
                // 读取部署文件
                InputStream deploymentYaml = new FileInputStream(deploymentFile);
                
                // 创建或更新部署
                k8sClient.load(deploymentYaml)
                    .inNamespace(namespace)
                    .createOrReplace();
                
                // 等待部署完成
                k8sClient.apps().deployments()
                    .inNamespace(namespace)
                    .withName(getDeploymentName(deploymentFile))
                    .waitUntilReady(5, TimeUnit.MINUTES);
                
                // 设置输出
                Map<String, Object> outputs = new HashMap<>();
                outputs.put("namespace", namespace);
                outputs.put("status", "deployed");
                
                return NodeExecuteResult.success(outputs);
            } catch (Exception e) {
                return NodeExecuteResult.failure(e.getMessage());
            }
        }
        
        private String getDeploymentName(String deploymentFile) {
            // 从YAML文件中解析部署名称
            return "deployment-name";
        }
    }
}

9.5 插件配置示例

# Git插件配置
plugin.id=git-plugin
plugin.name=Git Plugin
plugin.version=1.0.0
plugin.main=com.example.plugin.git.GitPlugin
plugin.description=Git operations plugin
plugin.author=Your Name
plugin.requires=core>=1.0.0

# Maven插件配置
plugin.id=maven-plugin
plugin.name=Maven Plugin
plugin.version=1.0.0
plugin.main=com.example.plugin.maven.MavenPlugin
plugin.description=Maven build plugin
plugin.author=Your Name
plugin.requires=core>=1.0.0

# Docker插件配置
plugin.id=docker-plugin
plugin.name=Docker Plugin
plugin.version=1.0.0
plugin.main=com.example.plugin.docker.DockerPlugin
plugin.description=Docker operations plugin
plugin.author=Your Name
plugin.requires=core>=1.0.0

# Kubernetes插件配置
plugin.id=kubernetes-plugin
plugin.name=Kubernetes Plugin
plugin.version=1.0.0
plugin.main=com.example.plugin.kubernetes.KubernetesPlugin
plugin.description=Kubernetes deployment plugin
plugin.author=Your Name
plugin.requires=core>=1.0.0

9.6 插件使用示例

// 获取插件实例
GitPlugin gitPlugin = pluginManager.getPlugin("git-plugin");
NodeExecutor gitExecutor = gitPlugin.getNodeExecutor();

// 准备节点上下文
NodeContext context = NodeContext.builder()
    .input("repoUrl", "https://github.com/example/repo.git")
    .input("branch", "main")
    .input("targetPath", "/workspace/code")
    .build();

// 执行节点
NodeExecuteResult result = gitExecutor.execute(context);

if (result.isSuccess()) {
    String clonePath = result.getOutput("clonePath");
    // 处理成功逻辑
} else {
    String errorMessage = result.getErrorMessage();
    // 处理失败逻辑
}

这些插件实现示例展示了如何:

  1. 实现基本的插件接口
  2. 处理插件配置和初始化
  3. 实现节点执行逻辑
  4. 处理执行结果和错误
  5. 提供插件使用示例

您可以基于这些示例快速开发新的插件。需要注意的是:

  1. 每个插件都应该有清晰的职责
  2. 做好错误处理和日志记录
  3. 提供完整的配置说明
  4. 注意资源的释放和清理
  5. 考虑并发和性能问题

您觉得这些插件示例是否满足需求我们可以根据您的<EFBFBD><EFBFBD>体需求进行调整或添加更多示例。

十、节点执行日志功能

10.1 日志模型设计

-- 节点执行日志表(分区表)
CREATE TABLE node_execution_log (
    id BIGINT PRIMARY KEY,
    workflow_instance_id BIGINT NOT NULL,
    node_instance_id BIGINT NOT NULL,
    log_type VARCHAR(20) NOT NULL,  -- INFO/ERROR/COMMAND/OUTPUT
    content TEXT NOT NULL,
    log_time TIMESTAMP NOT NULL,
    sequence_no BIGINT NOT NULL,    -- 日志序号,保证顺序
    create_time TIMESTAMP NOT NULL,
    INDEX idx_node_seq (node_instance_id, sequence_no)  -- 优化查询性能
) PARTITION BY RANGE (create_time);

-- 创建最近一个月的分区
CREATE TABLE node_execution_log_current PARTITION OF node_execution_log
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');

10.2 日志查询接口

@RestController
@RequestMapping("/api/v1/workflow/node")
@Tag(name = "节点日志", description = "节点执行日志相关接口")
public class NodeLogController {
    
    @Resource
    private NodeLogService nodeLogService;
    
    /**
     * 分页查询日志
     */
    @GetMapping("/{nodeInstanceId}/logs")
    @Operation(summary = "分页查询节点日志")
    public PageResponse<LogDTO> queryLogs(
            @PathVariable Long nodeInstanceId,
            @RequestParam(required = false) Long lastSequenceNo,
            @RequestParam(defaultValue = "100") Integer pageSize) {
        return nodeLogService.queryLogs(nodeInstanceId, lastSequenceNo, pageSize);
    }
    
    /**
     * 导出日志
     */
    @GetMapping("/{nodeInstanceId}/logs/export")
    @Operation(summary = "导出节点日志")
    public void exportLogs(
            @PathVariable Long nodeInstanceId,
            HttpServletResponse response) {
        nodeLogService.exportLogs(nodeInstanceId, response);
    }
}

@Service
@Slf4j
public class NodeLogService {
    
    @Resource
    private NodeLogRepository nodeLogRepository;
    
    /**
     * 分页查询日志
     * @param nodeInstanceId 节点实例ID
     * @param lastSequenceNo 上次查询的最后序号首次查询传null
     * @param pageSize 每页大小
     */
    public PageResponse<LogDTO> queryLogs(Long nodeInstanceId, Long lastSequenceNo, Integer pageSize) {
        // 首次查询
        if (lastSequenceNo == null) {
            return nodeLogRepository.findFirstPage(nodeInstanceId, pageSize);
        }
        
        // 增量查询
        return nodeLogRepository.findIncrementalLogs(nodeInstanceId, lastSequenceNo, pageSize);
    }
    
    /**
     * 写入日志
     */
    @Async
    public void writeLog(NodeLogEvent event) {
        try {
            NodeLog log = NodeLog.builder()
                .nodeInstanceId(event.getNodeInstanceId())
                .logType(event.getType())
                .content(event.getContent())
                .logTime(LocalDateTime.now())
                .sequenceNo(generateSequenceNo())
                .build();
            
            nodeLogRepository.save(log);
        } catch (Exception e) {
            log.error("Failed to write log", e);
        }
    }
    
    /**
     * 导出日志
     */
    public void exportLogs(Long nodeInstanceId, HttpServletResponse response) {
        response.setContentType("text/plain");
        response.setHeader("Content-Disposition", 
            "attachment; filename=node-" + nodeInstanceId + "-logs.txt");
            
        try (PrintWriter writer = response.getWriter()) {
            nodeLogRepository.streamAllLogs(nodeInstanceId, log -> {
                writer.println(String.format("[%s] [%s] %s",
                    formatTime(log.getLogTime()),
                    log.getLogType(),
                    log.getContent()));
            });
        } catch (IOException e) {
            log.error("Failed to export logs", e);
            throw new BusinessException("导出日志失败");
        }
    }
}

10.3 日志记录器

/**
 * 节点日志记录器
 */
public class NodeLogger {
    private final Long nodeInstanceId;
    private final NodeLogService logService;
    
    public void info(String message) {
        writeLog(LogType.INFO, message);
    }
    
    public void error(String message) {
        writeLog(LogType.ERROR, message);
    }
    
    public void command(String command) {
        writeLog(LogType.COMMAND, command);
    }
    
    public void output(String output) {
        writeLog(LogType.OUTPUT, output);
    }
    
    private void writeLog(LogType type, String content) {
        NodeLogEvent event = NodeLogEvent.builder()
            .nodeInstanceId(nodeInstanceId)
            .type(type)
            .content(content)
            .build();
            
        logService.writeLog(event);
    }
}

10.4 前端日志组件

interface LogDTO {
  id: number;
  logType: 'INFO' | 'ERROR' | 'COMMAND' | 'OUTPUT';
  content: string;
  logTime: string;
  sequenceNo: number;
}

const NodeLogViewer: React.FC<{ nodeInstanceId: string }> = ({ nodeInstanceId }) => {
  const [logs, setLogs] = useState<LogDTO[]>([]);
  const [loading, setLoading] = useState(false);
  const [error, setError] = useState<string>();
  const [lastSequenceNo, setLastSequenceNo] = useState<number>();
  const logEndRef = useRef<HTMLDivElement>(null);
  
  // 首次加载日志
  useEffect(() => {
    loadLogs();
  }, [nodeInstanceId]);
  
  // 定时增量查询
  useEffect(() => {
    const timer = setInterval(() => {
      if (lastSequenceNo) {
        loadIncrementalLogs();
      }
    }, 2000); // 每2秒查询一次
    
    return () => clearInterval(timer);
  }, [lastSequenceNo]);
  
  // 加载首页日志
  const loadLogs = async () => {
    try {
      setLoading(true);
      const res = await axios.get(`/api/v1/workflow/node/${nodeInstanceId}/logs`);
      setLogs(res.data.records);
      if (res.data.records.length > 0) {
        setLastSequenceNo(res.data.records[res.data.records.length - 1].sequenceNo);
      }
    } catch (e) {
      setError('加载日志失败');
    } finally {
      setLoading(false);
    }
  };
  
  // 增量加载日志
  const loadIncrementalLogs = async () => {
    try {
      const res = await axios.get(`/api/v1/workflow/node/${nodeInstanceId}/logs`, {
        params: { lastSequenceNo }
      });
      if (res.data.records.length > 0) {
        setLogs(logs => [...logs, ...res.data.records]);
        setLastSequenceNo(res.data.records[res.data.records.length - 1].sequenceNo);
        // 滚动到最新
        logEndRef.current?.scrollIntoView({ behavior: 'smooth' });
      }
    } catch (e) {
      console.error('增量加载日志失败', e);
    }
  };
  
  // 导出日志
  const handleExport = () => {
    window.open(`/api/v1/workflow/node/${nodeInstanceId}/logs/export`);
  };
  
  return (
    <div className="node-log-viewer">
      <div className="log-header">
        <div className="status">
          {loading && <Spin size="small" />}
        </div>
        <div className="actions">
          <Button onClick={() => setLogs([])}>清空日志</Button>
          <Button onClick={handleExport}>导出日志</Button>
        </div>
      </div>
      
      <div className="log-content">
        {logs.map((log, index) => (
          <div key={index} className={`log-line log-${log.logType.toLowerCase()}`}>
            <span className="log-time">{formatTime(log.logTime)}</span>
            <span className="log-level">{log.logType}</span>
            <span className="log-content">{log.content}</span>
          </div>
        ))}
        <div ref={logEndRef} />
      </div>
      
      {error && (
        <Alert type="error" message={error} />
      )}
    </div>
  );
};

10.5 性能优化措施

  1. 数据库优化

    • 使用分区表按时间分区
    • 建立复合索引(node_instance_id, sequence_no)
    • 定期归档历史日志
    • 使用序列号进行增量查询
  2. 查询优化

    • 分页限制大小
    • 增量查询而不是全量
    • 适当的轮询间隔(2秒)
    • 异步写入日志
  3. 前端优化

    • 虚拟滚动显示大量日志
    • 本地缓存已加载的日志
    • 按需加载历史日志
    • 智能调整轮询间隔
  4. 系统优化

    • 日志异步写入
    • 批量写入优化
    • 定期清理过期日志
    • 监控慢查询

这个基于HTTP轮询的方案相比WebSocket有以下优势

  1. 更简单可靠

    • 无需维护长连接
    • 服务端实现更简单
    • 更容易做负载均衡
    • 出错重试更容易
  2. 更好的性能

    • 服务器资源占用更少
    • 更容易控制并发
    • 更好的伸缩性
    • 便于性能优化
  3. 更好的兼容性

    • 支持更多的客户端
    • 穿透防火墙更容易
    • 不依赖特殊协议
    • 更容易调试

您觉得这个基于HTTP轮询的方案如何我们可以根据实际需求调整轮询间隔和其他参<EFBFBD><EFBFBD>

十一、工作流编排界面实现

[之前提供的完整代码内容]

十二、数据库表设计

12.1 工作流定义表

CREATE TABLE workflow_definition (
    id BIGINT PRIMARY KEY,                    -- 主键
    name VARCHAR(100) NOT NULL,               -- 工作流名称
    code VARCHAR(50) NOT NULL,                -- 工作流编码
    description TEXT,                         -- 描述
    status VARCHAR(20) NOT NULL,              -- 状态DRAFT/PUBLISHED/DISABLED
    version INT NOT NULL DEFAULT 1,           -- 版本号
    content TEXT NOT NULL,                    -- 工作流定义内容(JSON)
    create_time DATETIME NOT NULL,            -- 创建时间
    create_by VARCHAR(50) NOT NULL,           -- 创建人
    update_time DATETIME NOT NULL,            -- 更新时间
    update_by VARCHAR(50) NOT NULL,           -- 更新人
    deleted BOOLEAN NOT NULL DEFAULT FALSE,   -- 是否删除
    UNIQUE KEY uk_code_version (code, version)
);

### 12.2 工作流实例表
```sql
CREATE TABLE workflow_instance (
    id BIGINT PRIMARY KEY,                    -- 主键
    definition_id BIGINT NOT NULL,            -- 工作流定义ID
    name VARCHAR(100) NOT NULL,               -- 实例名称
    status VARCHAR(20) NOT NULL,              -- 状态RUNNING/COMPLETED/FAILED/CANCELED
    start_time DATETIME,                      -- 开始时间
    end_time DATETIME,                        -- 结束时间
    variables TEXT,                           -- 工作流变量(JSON)
    create_time DATETIME NOT NULL,            -- 创建时间
    create_by VARCHAR(50) NOT NULL,           -- 创建人
    update_time DATETIME NOT NULL,            -- 更新时间
    update_by VARCHAR(50) NOT NULL,           -- 更新人
    version INT NOT NULL DEFAULT 1,           -- 版本号
    deleted BOOLEAN NOT NULL DEFAULT FALSE,   -- 是否删除
    FOREIGN KEY (definition_id) REFERENCES workflow_definition(id)
);

### 12.3 节点实例表
```sql
CREATE TABLE node_instance (
    id BIGINT PRIMARY KEY,                    -- 主键
    workflow_instance_id BIGINT NOT NULL,     -- 工作流实例ID
    node_id VARCHAR(50) NOT NULL,             -- 节点ID
    node_name VARCHAR(100) NOT NULL,          -- 节点名称
    node_type VARCHAR(50) NOT NULL,           -- 节点类型
    status VARCHAR(20) NOT NULL,              -- 状态PENDING/RUNNING/COMPLETED/FAILED/CANCELED
    start_time DATETIME,                      -- 开始时间
    end_time DATETIME,                        -- 结束时间
    error TEXT,                               -- 错误信息
    result TEXT,                              -- 执行结果(JSON)
    create_time DATETIME NOT NULL,            -- 创建时间
    create_by VARCHAR(50) NOT NULL,           -- 创建人
    update_time DATETIME NOT NULL,            -- 更新时间
    update_by VARCHAR(50) NOT NULL,           -- 更新人
    version INT NOT NULL DEFAULT 1,           -- 版本号
    deleted BOOLEAN NOT NULL DEFAULT FALSE,   -- 是否删除
    FOREIGN KEY (workflow_instance_id) REFERENCES workflow_instance(id)
);

### 12.4 节点执行日志表
```sql
CREATE TABLE node_execution_log (
    id BIGINT PRIMARY KEY,                    -- 主键
    workflow_instance_id BIGINT NOT NULL,     -- 工作流实例ID
    node_instance_id BIGINT NOT NULL,         -- 节点实例ID
    log_type VARCHAR(20) NOT NULL,            -- 日志类型INFO/ERROR/COMMAND/OUTPUT
    content TEXT NOT NULL,                    -- 日志内容
    log_time DATETIME NOT NULL,               -- 日志时间
    sequence_no BIGINT NOT NULL,              -- 日志序号
    create_time DATETIME NOT NULL,            -- 创建时间
    create_by VARCHAR(50) NOT NULL,           -- 创建人
    update_time DATETIME NOT NULL,            -- 更新时间
    update_by VARCHAR(50) NOT NULL,           -- 更新人
    version INT NOT NULL DEFAULT 1,           -- 版本号
    deleted BOOLEAN NOT NULL DEFAULT FALSE,   -- 是否删除
    FOREIGN KEY (workflow_instance_id) REFERENCES workflow_instance(id),
    FOREIGN KEY (node_instance_id) REFERENCES node_instance(id)
) PARTITION BY RANGE (UNIX_TIMESTAMP(create_time));

-- 创建最近一个月的分区
CREATE TABLE node_execution_log_current 
    PARTITION OF node_execution_log
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');

### 12.5 工作流变量表
```sql
CREATE TABLE workflow_variable (
    id BIGINT PRIMARY KEY,                    -- 主键
    workflow_instance_id BIGINT NOT NULL,     -- 工作流实例ID
    name VARCHAR(100) NOT NULL,               -- 变量名
    value TEXT,                               -- 变量值
    type VARCHAR(50) NOT NULL,                -- 变量类型
    scope VARCHAR(20) NOT NULL,               -- 作用域GLOBAL/NODE
    node_id VARCHAR(50),                      -- 节点ID(作用域为NODE时必填)
    create_time DATETIME NOT NULL,            -- 创建时间
    create_by VARCHAR(50) NOT NULL,           -- 创建人
    update_time DATETIME NOT NULL,            -- 更新时间
    update_by VARCHAR(50) NOT NULL,           -- 更新人
    version INT NOT NULL DEFAULT 1,           -- 版本号
    deleted BOOLEAN NOT NULL DEFAULT FALSE,   -- 是否删除
    FOREIGN KEY (workflow_instance_id) REFERENCES workflow_instance(id)
);

主要更新内容:

  1. 统一基础字段

    • idBIGINT PRIMARY KEY
    • create_timeDATETIME NOT NULL
    • create_byVARCHAR(50) NOT NULL
    • update_timeDATETIME NOT NULL
    • update_byVARCHAR(50) NOT NULL
    • versionINT NOT NULL DEFAULT 1
    • deletedBOOLEAN NOT NULL DEFAULT FALSE
  2. 表关系

    • 使用外键关联
    • 添加必要的索引
    • 分区表支持
  3. 字段规范

    • 统一命名风格
    • 添加字段注释
    • 明确字段类型和约束
  4. 数据完整性

    • NOT NULL 约束
    • 唯一索引
    • 默认值

您觉得这个数据库设计是否更符合规范?我们可以根据具体需求进行调整。