42 KiB
42 KiB
Deploy Ease Workflow 工作流引擎设计文档
一、项目概述
1.1 项目目标
构建一个企业级工作流引擎系统,支持审批流程和CI/CD流程的统一管理和执行。系统需要具备高扩展性、高可用性和易用性。
1.2 技术栈
- 后端:Java 21 + Spring Boot 3.x + Spring Cloud
- 前端:React 18 + Ant Design 5.x + TypeScript
- 数据库:PostgreSQL 16
- 缓存:Redis 7.x
- 消息队列:RabbitMQ
- 容器化:Docker + Kubernetes
- 服务网关:Spring Cloud Gateway
- 注册中心:Nacos
- 配置中心:Nacos
- 监控:Prometheus + Grafana
1.3 项目架构
+------------------+ +------------------+ +------------------+
| 前端应用层 | <-> | 网关层 | <-> | 微服务集群 |
+------------------+ +------------------+ +------------------+
↑ ↑ ↑
| | |
v v v
+------------------+ +------------------+ +------------------+
| CDN/对象存储 | <-> | 消息总线 | <-> | 存储层 |
+------------------+ +------------------+ +------------------+
二、实现优先级和里程碑
2.1 第一阶段:核心框架搭建(2周)
-
基础框架搭建
- 多模块项目结构
- 统一异常处理
- 统一响应格式
- 基础CRUD封装
-
核心功能实现
- 用户认证授权
- 基础权限管理
- 系统配置管理
2.2 第二阶段:工作流引擎核心(4周)
-
工作流引擎实现
- 工作流模型设计
- 工作流执行引擎
- 节点抽象和实现
-
插件系统实现
- 插件加载机制
- 插件生命周期管理
- 插件配置管理
2.3 第三阶段:节点开发(4周)
-
审批节点实现
- 表单节点
- 审批节点
- 会签节点
-
CI/CD节点实现
- Git节点
- Maven节点
- Docker节点
- K8S节点
2.4 第四阶段:前端实现(4周)
-
工作流设计器
- 流程设计器
- 节点配置面板
- 表单设计器
-
运行监控
- 实例监控
- 日志查看
- 状态管理
三、详细设计
3.1 数据库设计
3.1.1 工作流定义相关表
-- 工作流定义表
CREATE TABLE workflow_definition (
id BIGINT PRIMARY KEY,
code VARCHAR(50) NOT NULL UNIQUE,
name VARCHAR(100) NOT NULL,
description TEXT,
version INT NOT NULL,
status VARCHAR(20) NOT NULL,
create_time TIMESTAMP NOT NULL,
update_time TIMESTAMP NOT NULL,
create_by VARCHAR(50) NOT NULL,
update_by VARCHAR(50) NOT NULL
);
-- 节点定义表
CREATE TABLE node_definition (
id BIGINT PRIMARY KEY,
workflow_id BIGINT NOT NULL,
node_type VARCHAR(50) NOT NULL,
name VARCHAR(100) NOT NULL,
description TEXT,
parameters TEXT,
create_time TIMESTAMP NOT NULL,
update_time TIMESTAMP NOT NULL,
FOREIGN KEY (workflow_id) REFERENCES workflow_definition(id)
);
-- 节点连接表
CREATE TABLE node_connection (
id BIGINT PRIMARY KEY,
workflow_id BIGINT NOT NULL,
source_node_id BIGINT NOT NULL,
target_node_id BIGINT NOT NULL,
condition_expr TEXT,
create_time TIMESTAMP NOT NULL,
FOREIGN KEY (workflow_id) REFERENCES workflow_definition(id),
FOREIGN KEY (source_node_id) REFERENCES node_definition(id),
FOREIGN KEY (target_node_id) REFERENCES node_definition(id)
);
3.1.2 工作流实例相关表
-- 工作流实例表
CREATE TABLE workflow_instance (
id BIGINT PRIMARY KEY,
workflow_id BIGINT NOT NULL,
status VARCHAR(20) NOT NULL,
start_time TIMESTAMP NOT NULL,
end_time TIMESTAMP,
variables TEXT,
create_time TIMESTAMP NOT NULL,
update_time TIMESTAMP NOT NULL,
FOREIGN KEY (workflow_id) REFERENCES workflow_definition(id)
);
-- 节点实例表
CREATE TABLE node_instance (
id BIGINT PRIMARY KEY,
workflow_instance_id BIGINT NOT NULL,
node_id BIGINT NOT NULL,
status VARCHAR(20) NOT NULL,
start_time TIMESTAMP NOT NULL,
end_time TIMESTAMP,
inputs TEXT,
outputs TEXT,
error_message TEXT,
create_time TIMESTAMP NOT NULL,
update_time TIMESTAMP NOT NULL,
FOREIGN KEY (workflow_instance_id) REFERENCES workflow_instance(id),
FOREIGN KEY (node_id) REFERENCES node_definition(id)
);
-- 节点日志表
CREATE TABLE node_log (
id BIGINT PRIMARY KEY,
node_instance_id BIGINT NOT NULL,
level VARCHAR(10) NOT NULL,
content TEXT NOT NULL,
create_time TIMESTAMP NOT NULL,
FOREIGN KEY (node_instance_id) REFERENCES node_instance(id)
);
3.1.3 表单相关表
-- 表单定义表
CREATE TABLE form_definition (
id BIGINT PRIMARY KEY,
code VARCHAR(50) NOT NULL UNIQUE,
name VARCHAR(100) NOT NULL,
description TEXT,
schema TEXT NOT NULL,
ui_schema TEXT,
validation_rules TEXT,
workflow_definition_id BIGINT,
version INT NOT NULL,
status VARCHAR(20) NOT NULL,
create_time TIMESTAMP NOT NULL,
update_time TIMESTAMP NOT NULL,
create_by VARCHAR(50) NOT NULL,
update_by VARCHAR(50) NOT NULL
);
-- 表单实例表
CREATE TABLE form_instance (
id BIGINT PRIMARY KEY,
form_definition_id BIGINT NOT NULL,
form_data TEXT NOT NULL,
workflow_instance_id BIGINT,
submitter_id BIGINT NOT NULL,
submit_time TIMESTAMP NOT NULL,
status VARCHAR(20) NOT NULL,
create_time TIMESTAMP NOT NULL,
update_time TIMESTAMP NOT NULL,
FOREIGN KEY (form_definition_id) REFERENCES form_definition(id)
);
3.1.4 审批相关表
-- 审批记录表
CREATE TABLE approval_record (
id BIGINT PRIMARY KEY,
workflow_instance_id BIGINT NOT NULL,
node_instance_id BIGINT NOT NULL,
approver_id BIGINT NOT NULL,
approval_time TIMESTAMP NOT NULL,
result VARCHAR(20) NOT NULL,
comment TEXT,
attachments TEXT,
create_time TIMESTAMP NOT NULL
);
-- 审批配置表
CREATE TABLE approval_config (
id BIGINT PRIMARY KEY,
node_definition_id BIGINT NOT NULL,
approval_type VARCHAR(20) NOT NULL,
approver_ids TEXT,
role_ids TEXT,
department_ids TEXT,
approval_mode VARCHAR(20) NOT NULL,
timeout_hours INT,
reminder_enabled BOOLEAN DEFAULT FALSE,
reminder_interval INT,
create_time TIMESTAMP NOT NULL,
update_time TIMESTAMP NOT NULL,
FOREIGN KEY (node_definition_id) REFERENCES node_definition(id)
);
3.1.5 插件相关表
-- 插件信息表
CREATE TABLE plugin_info (
id BIGINT PRIMARY KEY,
code VARCHAR(50) NOT NULL UNIQUE,
name VARCHAR(100) NOT NULL,
description TEXT,
version VARCHAR(20) NOT NULL,
status VARCHAR(20) NOT NULL,
jar_path VARCHAR(200),
config_schema TEXT,
create_time TIMESTAMP NOT NULL,
update_time TIMESTAMP NOT NULL
);
-- 插件配置表
CREATE TABLE plugin_config (
id BIGINT PRIMARY KEY,
plugin_id BIGINT NOT NULL,
config TEXT NOT NULL,
status VARCHAR(20) NOT NULL,
create_time TIMESTAMP NOT NULL,
update_time TIMESTAMP NOT NULL,
FOREIGN KEY (plugin_id) REFERENCES plugin_info(id)
);
3.2 核心接口设计
3.2.1 工作流引擎接口
/**
* 工作流引擎接口
*/
public interface WorkflowEngine {
/**
* 启动工作流
*/
WorkflowInstance startWorkflow(String workflowId, Map<String, Object> params);
/**
* 暂停工作流
*/
void pauseWorkflow(String instanceId);
/**
* 恢复工作流
*/
void resumeWorkflow(String instanceId);
/**
* 终止工作流
*/
void terminateWorkflow(String instanceId, String reason);
/**
* 获取工作流状态
*/
WorkflowStatus getWorkflowStatus(String instanceId);
}
3.2.2 节点执行器接口
/**
* 节点执行器接口
*/
public interface NodeExecutor {
/**
* 执行节点
*/
NodeExecuteResult execute(NodeContext context);
/**
* 取消执行
*/
void cancel(NodeContext context);
/**
* 获取执行状态
*/
NodeStatus getStatus(NodeContext context);
}
3.2.3 插件接口
/**
* 插件接口
*/
public interface WorkflowPlugin {
/**
* 获取插件信息
*/
PluginInfo getPluginInfo();
/**
* 初始化插件
*/
void init(PluginContext context);
/**
* 获取节点执行器
*/
NodeExecutor getNodeExecutor();
}
3.3 前端组件设计
3.3.1 工作流设计器
// 工作流设计器组件
const WorkflowDesigner: React.FC = () => {
const [nodes, setNodes] = useState<INodeDesigner[]>([]);
const [connections, setConnections] = useState<IConnection[]>([]);
return (
<div className="workflow-designer">
<Toolbar />
<NodePalette />
<Canvas
nodes={nodes}
connections={connections}
onDrop={handleDrop}
onConnect={handleConnect}
/>
<PropertiesPanel />
</div>
);
};
3.3.2 节点配置面板
// 节点配置面板组件
const NodeConfigPanel: React.FC<NodeConfigPanelProps> = ({ node, onChange }) => {
return (
<div className="node-config-panel">
<Form layout="vertical">
<Form.Item label="节点名称" required>
<Input
value={node.name}
onChange={e => onChange({ ...node, name: e.target.value })}
/>
</Form.Item>
<Form.Item label="节点描述">
<Input.TextArea
value={node.description}
onChange={e => onChange({ ...node, description: e.target.value })}
/>
</Form.Item>
<NodeTypeConfig
type={node.type}
config={node.config}
onChange={config => onChange({ ...node, config })}
/>
</Form>
</div>
);
};
3.3.3 表单设计器
// 表单设计器组件
const FormDesigner: React.FC = () => {
const [formSchema, setFormSchema] = useState<FormSchema>({});
const [uiSchema, setUiSchema] = useState<UiSchema>({});
return (
<div className="form-designer">
<div className="toolbar">
<ComponentPalette />
<ActionButtons />
</div>
<div className="design-area">
<FormCanvas
schema={formSchema}
uiSchema={uiSchema}
onChange={(schema, ui) => {
setFormSchema(schema);
setUiSchema(ui);
}}
/>
</div>
<div className="properties-panel">
<SchemaEditor schema={formSchema} />
<ValidationRules schema={formSchema} />
</div>
</div>
);
};
四、部署架构
4.1 系统部署架构
[负载均衡器 Nginx]
↓
[API网关 Spring Cloud Gateway]
↓
+----------------+----------------+----------------+
↓ ↓ ↓ ↓
[工作流服务] [表单服务] [审批服务] [节点服务]
↓ ↓ ↓ ↓
[消息队列 RabbitMQ]
↓
[数据库 PostgreSQL]
↓
[缓存服务器 Redis]
4.2 高可用方案
-
服务高可用
- 服务多实例部署
- 服务注册与发现
- 负载均衡
-
数据高可用
- 数据库主从复制
- 数据定期备份
- 数据异地容灾
-
消息高可用
- 消息队列集群
- 消息持久化
- 消息重试机制
五、安全方案
5.1 认证授权
-
用户认证
- JWT Token认证
- OAuth2.0集成
- 单点登录支持
-
权限控制
- RBAC权限模型
- 数据权限控制
- API权限控制
5.2 数据安全
-
传输安全
- HTTPS加密
- 数据加密传输
- 防重放攻击
-
存储安全
- 敏感数据加密
- 数据脱敏
- 数据备份
六、监控运维
6.1 系统监控
-
性能监控
- JVM监控
- 数据库监控
- 接口性能监控
-
业务监控
- 工作流执行监控
- 节点执行监控
- 审批流程监控
6.2 日志管理
-
日志收集
- 系统日志
- 业务日志
- 审计日志
-
日志分析
- ELK日志分析
- 日志告警
- 日志存档
七、后续规划
7.1 功能扩展
- 更多节点类型支持
- 工作流模板市场
- 移动端支持
- 多语言支持
7.2 性能优化
- 大规模工作流优化
- 复杂表单性能优化
- 查询性能优化
7.3 运维能力
- 自动化部署
- 容器化管理
- 监控告警
- 智能运维
八、数据源管理
8.1 数据源设计
-- 数据源定义表
CREATE TABLE datasource_definition (
id BIGINT PRIMARY KEY,
code VARCHAR(50) NOT NULL UNIQUE,
name VARCHAR(100) NOT NULL,
description TEXT,
type VARCHAR(20) NOT NULL, -- 数据源类型:MYSQL/POSTGRESQL/ORACLE/MONGODB等
config TEXT NOT NULL, -- 加密存储的连接配置
status VARCHAR(20) NOT NULL,
create_time TIMESTAMP NOT NULL,
update_time TIMESTAMP NOT NULL,
create_by VARCHAR(50) NOT NULL,
update_by VARCHAR(50) NOT NULL
);
-- 数据源授权表
CREATE TABLE datasource_authorization (
id BIGINT PRIMARY KEY,
datasource_id BIGINT NOT NULL,
node_definition_id BIGINT NOT NULL,
permission_type VARCHAR(20) NOT NULL, -- READ/WRITE/ALL
valid_from TIMESTAMP NOT NULL,
valid_until TIMESTAMP,
approved_by VARCHAR(50) NOT NULL,
approved_time TIMESTAMP NOT NULL,
status VARCHAR(20) NOT NULL,
create_time TIMESTAMP NOT NULL,
FOREIGN KEY (datasource_id) REFERENCES datasource_definition(id),
FOREIGN KEY (node_definition_id) REFERENCES node_definition(id)
);
-- 数据源操作审计表
CREATE TABLE datasource_audit_log (
id BIGINT PRIMARY KEY,
datasource_id BIGINT NOT NULL,
node_instance_id BIGINT NOT NULL,
operation_type VARCHAR(20) NOT NULL, -- SELECT/INSERT/UPDATE/DELETE
sql_statement TEXT,
affected_rows INT,
execution_time BIGINT, -- 毫秒
status VARCHAR(20) NOT NULL,
error_message TEXT,
create_time TIMESTAMP NOT NULL,
FOREIGN KEY (datasource_id) REFERENCES datasource_definition(id),
FOREIGN KEY (node_instance_id) REFERENCES node_instance(id)
);
8.2 数据源管理核心接口
/**
* 数据源管理接口
*/
public interface DataSourceManager {
/**
* 获取节点可用的数据源列表
*/
List<DataSourceDefinition> getAvailableDataSources(String nodeDefinitionId);
/**
* 验证节点是否有数据源的操作权限
*/
boolean validatePermission(String nodeInstanceId, String datasourceId, OperationType operationType);
/**
* 获取数据源连接
*/
Connection getConnection(String nodeInstanceId, String datasourceId);
/**
* 记录数据源操作审计
*/
void auditOperation(DataSourceAuditLog auditLog);
}
/**
* 数据源操作包装器
*/
public class DataSourceWrapper implements AutoCloseable {
private final Connection connection;
private final DataSourceAuditLogger auditLogger;
public ResultSet executeQuery(String sql, Object... params) {
try {
long startTime = System.currentTimeMillis();
ResultSet rs = executeQueryInternal(sql, params);
auditLogger.logQuery(sql, System.currentTimeMillis() - startTime);
return rs;
} catch (SQLException e) {
auditLogger.logError(sql, e);
throw new DataSourceException("Query execution failed", e);
}
}
public int executeUpdate(String sql, Object... params) {
try {
long startTime = System.currentTimeMillis();
int affected = executeUpdateInternal(sql, params);
auditLogger.logUpdate(sql, affected, System.currentTimeMillis() - startTime);
return affected;
} catch (SQLException e) {
auditLogger.logError(sql, e);
throw new DataSourceException("Update execution failed", e);
}
}
}
九、插件实现示例
9.1 Git操作插件
/**
* Git插件实现
*/
@Plugin(
id = "git-plugin",
name = "Git Plugin",
version = "1.0.0",
description = "Git operations plugin"
)
public class GitPlugin implements WorkflowPlugin {
private PluginContext context;
@Override
public void init(PluginContext context) {
this.context = context;
}
@Override
public NodeExecutor getNodeExecutor() {
return new GitNodeExecutor();
}
/**
* Git节点执行器
*/
public class GitNodeExecutor implements NodeExecutor {
@Override
public NodeExecuteResult execute(NodeContext context) {
try {
// 获取参数
String repoUrl = context.getInput("repoUrl");
String branch = context.getInput("branch");
String targetPath = context.getInput("targetPath");
// 执行Git克隆
context.getLogger().info("开始克隆代码: {}", repoUrl);
Git.cloneRepository()
.setURI(repoUrl)
.setBranch(branch)
.setDirectory(new File(targetPath))
.call();
// 设置输出
Map<String, Object> outputs = new HashMap<>();
outputs.put("clonePath", targetPath);
return NodeExecuteResult.success(outputs);
} catch (Exception e) {
return NodeExecuteResult.failure(e.getMessage());
}
}
@Override
public void cancel(NodeContext context) {
// 实现取消逻辑
}
@Override
public NodeStatus getStatus(NodeContext context) {
// 实现状态查询逻辑
return NodeStatus.RUNNING;
}
}
}
9.2 Maven构建插件
/**
* Maven插件实现
*/
@Plugin(
id = "maven-plugin",
name = "Maven Plugin",
version = "1.0.0",
description = "Maven build plugin"
)
public class MavenPlugin implements WorkflowPlugin {
private PluginContext context;
@Override
public void init(PluginContext context) {
this.context = context;
}
@Override
public NodeExecutor getNodeExecutor() {
return new MavenNodeExecutor();
}
/**
* Maven节点执行器
*/
public class MavenNodeExecutor implements NodeExecutor {
@Override
public NodeExecuteResult execute(NodeContext context) {
try {
String projectPath = context.getInput("projectPath");
String goals = context.getInput("goals", "clean package");
String profile = context.getInput("profile");
context.getLogger().info("开始Maven构建: {}", projectPath);
// 创建Maven请求
InvocationRequest request = new DefaultInvocationRequest();
request.setPomFile(new File(projectPath + "/pom.xml"));
request.setGoals(Arrays.asList(goals.split(" ")));
if (StringUtils.isNotBlank(profile)) {
request.setProfiles(Collections.singletonList(profile));
}
// 执行Maven构建
Invoker invoker = new DefaultInvoker();
InvocationResult result = invoker.execute(request);
if (result.getExitCode() != 0) {
return NodeExecuteResult.failure("Maven构建失败");
}
// 设置输出
Map<String, Object> outputs = new HashMap<>();
outputs.put("buildResult", result.getExitCode());
outputs.put("targetDir", projectPath + "/target");
return NodeExecuteResult.success(outputs);
} catch (Exception e) {
return NodeExecuteResult.failure(e.getMessage());
}
}
}
}
9.3 Docker构建插件
/**
* Docker插件实现
*/
@Plugin(
id = "docker-plugin",
name = "Docker Plugin",
version = "1.0.0",
description = "Docker operations plugin"
)
public class DockerPlugin implements WorkflowPlugin {
private DockerClient dockerClient;
@Override
public void init(PluginContext context) {
// 初始化Docker客户端
this.dockerClient = DockerClientBuilder.getInstance()
.withDockerHost(context.getConfig("dockerHost"))
.withRegistryUsername(context.getConfig("registryUsername"))
.withRegistryPassword(context.getConfig("registryPassword"))
.build();
}
@Override
public NodeExecutor getNodeExecutor() {
return new DockerNodeExecutor(dockerClient);
}
/**
* Docker节点执行器
*/
public class DockerNodeExecutor implements NodeExecutor {
private final DockerClient dockerClient;
public DockerNodeExecutor(DockerClient dockerClient) {
this.dockerClient = dockerClient;
}
@Override
public NodeExecuteResult execute(NodeContext context) {
try {
String dockerfile = context.getInput("dockerfile");
String imageName = context.getInput("imageName");
String imageTag = context.getInput("imageTag");
context.getLogger().info("开始构建Docker镜像: {}:{}", imageName, imageTag);
// 构建镜像
String imageId = dockerClient.buildImageCmd()
.withDockerfile(new File(dockerfile))
.withTag(imageName + ":" + imageTag)
.start()
.awaitImageId();
// 推送镜像
dockerClient.pushImageCmd(imageName)
.withTag(imageTag)
.start()
.awaitCompletion();
// 设置输出
Map<String, Object> outputs = new HashMap<>();
outputs.put("imageId", imageId);
outputs.put("imageName", imageName);
outputs.put("imageTag", imageTag);
return NodeExecuteResult.success(outputs);
} catch (Exception e) {
return NodeExecuteResult.failure(e.getMessage());
}
}
}
}
9.4 Kubernetes部署插件
/**
* Kubernetes插件实现
*/
@Plugin(
id = "kubernetes-plugin",
name = "Kubernetes Plugin",
version = "1.0.0",
description = "Kubernetes deployment plugin"
)
public class KubernetesPlugin implements WorkflowPlugin {
private KubernetesClient k8sClient;
@Override
public void init(PluginContext context) {
// 初始化K8s客户端
Config config = new ConfigBuilder()
.withMasterUrl(context.getConfig("masterUrl"))
.withOauthToken(context.getConfig("token"))
.build();
this.k8sClient = new DefaultKubernetesClient(config);
}
@Override
public NodeExecutor getNodeExecutor() {
return new KubernetesNodeExecutor(k8sClient);
}
/**
* Kubernetes节点执行器
*/
public class KubernetesNodeExecutor implements NodeExecutor {
private final KubernetesClient k8sClient;
public KubernetesNodeExecutor(KubernetesClient k8sClient) {
this.k8sClient = k8sClient;
}
@Override
public NodeExecuteResult execute(NodeContext context) {
try {
String namespace = context.getInput("namespace");
String deploymentFile = context.getInput("deploymentFile");
context.getLogger().info("开始部署到Kubernetes: {}", namespace);
// 读取部署文件
InputStream deploymentYaml = new FileInputStream(deploymentFile);
// 创建或更新部署
k8sClient.load(deploymentYaml)
.inNamespace(namespace)
.createOrReplace();
// 等待部署完成
k8sClient.apps().deployments()
.inNamespace(namespace)
.withName(getDeploymentName(deploymentFile))
.waitUntilReady(5, TimeUnit.MINUTES);
// 设置输出
Map<String, Object> outputs = new HashMap<>();
outputs.put("namespace", namespace);
outputs.put("status", "deployed");
return NodeExecuteResult.success(outputs);
} catch (Exception e) {
return NodeExecuteResult.failure(e.getMessage());
}
}
private String getDeploymentName(String deploymentFile) {
// 从YAML文件中解析部署名称
return "deployment-name";
}
}
}
9.5 插件配置示例
# Git插件配置
plugin.id=git-plugin
plugin.name=Git Plugin
plugin.version=1.0.0
plugin.main=com.example.plugin.git.GitPlugin
plugin.description=Git operations plugin
plugin.author=Your Name
plugin.requires=core>=1.0.0
# Maven插件配置
plugin.id=maven-plugin
plugin.name=Maven Plugin
plugin.version=1.0.0
plugin.main=com.example.plugin.maven.MavenPlugin
plugin.description=Maven build plugin
plugin.author=Your Name
plugin.requires=core>=1.0.0
# Docker插件配置
plugin.id=docker-plugin
plugin.name=Docker Plugin
plugin.version=1.0.0
plugin.main=com.example.plugin.docker.DockerPlugin
plugin.description=Docker operations plugin
plugin.author=Your Name
plugin.requires=core>=1.0.0
# Kubernetes插件配置
plugin.id=kubernetes-plugin
plugin.name=Kubernetes Plugin
plugin.version=1.0.0
plugin.main=com.example.plugin.kubernetes.KubernetesPlugin
plugin.description=Kubernetes deployment plugin
plugin.author=Your Name
plugin.requires=core>=1.0.0
9.6 插件使用示例
// 获取插件实例
GitPlugin gitPlugin = pluginManager.getPlugin("git-plugin");
NodeExecutor gitExecutor = gitPlugin.getNodeExecutor();
// 准备节点上下文
NodeContext context = NodeContext.builder()
.input("repoUrl", "https://github.com/example/repo.git")
.input("branch", "main")
.input("targetPath", "/workspace/code")
.build();
// 执行节点
NodeExecuteResult result = gitExecutor.execute(context);
if (result.isSuccess()) {
String clonePath = result.getOutput("clonePath");
// 处理成功逻辑
} else {
String errorMessage = result.getErrorMessage();
// 处理失败逻辑
}
这些插件实现示例展示了如何:
- 实现基本的插件接口
- 处理插件配置和初始化
- 实现节点执行逻辑
- 处理执行结果和错误
- 提供插件使用示例
您可以基于这些示例快速开发新的插件。需要注意的是:
- 每个插件都应该有清晰的职责
- 做好错误处理和日志记录
- 提供完整的配置说明
- 注意资源的释放和清理
- 考虑并发和性能问题
您觉得这些插件示例是否满足需求?我们可以根据您的<EFBFBD><EFBFBD>体需求进行调整或添加更多示例。
十、节点执行日志功能
10.1 日志模型设计
-- 节点执行日志表(分区表)
CREATE TABLE node_execution_log (
id BIGINT PRIMARY KEY,
workflow_instance_id BIGINT NOT NULL,
node_instance_id BIGINT NOT NULL,
log_type VARCHAR(20) NOT NULL, -- INFO/ERROR/COMMAND/OUTPUT
content TEXT NOT NULL,
log_time TIMESTAMP NOT NULL,
sequence_no BIGINT NOT NULL, -- 日志序号,保证顺序
create_time TIMESTAMP NOT NULL,
INDEX idx_node_seq (node_instance_id, sequence_no) -- 优化查询性能
) PARTITION BY RANGE (create_time);
-- 创建最近一个月的分区
CREATE TABLE node_execution_log_current PARTITION OF node_execution_log
FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
10.2 日志查询接口
@RestController
@RequestMapping("/api/v1/workflow/node")
@Tag(name = "节点日志", description = "节点执行日志相关接口")
public class NodeLogController {
@Resource
private NodeLogService nodeLogService;
/**
* 分页查询日志
*/
@GetMapping("/{nodeInstanceId}/logs")
@Operation(summary = "分页查询节点日志")
public PageResponse<LogDTO> queryLogs(
@PathVariable Long nodeInstanceId,
@RequestParam(required = false) Long lastSequenceNo,
@RequestParam(defaultValue = "100") Integer pageSize) {
return nodeLogService.queryLogs(nodeInstanceId, lastSequenceNo, pageSize);
}
/**
* 导出日志
*/
@GetMapping("/{nodeInstanceId}/logs/export")
@Operation(summary = "导出节点日志")
public void exportLogs(
@PathVariable Long nodeInstanceId,
HttpServletResponse response) {
nodeLogService.exportLogs(nodeInstanceId, response);
}
}
@Service
@Slf4j
public class NodeLogService {
@Resource
private NodeLogRepository nodeLogRepository;
/**
* 分页查询日志
* @param nodeInstanceId 节点实例ID
* @param lastSequenceNo 上次查询的最后序号(首次查询传null)
* @param pageSize 每页大小
*/
public PageResponse<LogDTO> queryLogs(Long nodeInstanceId, Long lastSequenceNo, Integer pageSize) {
// 首次查询
if (lastSequenceNo == null) {
return nodeLogRepository.findFirstPage(nodeInstanceId, pageSize);
}
// 增量查询
return nodeLogRepository.findIncrementalLogs(nodeInstanceId, lastSequenceNo, pageSize);
}
/**
* 写入日志
*/
@Async
public void writeLog(NodeLogEvent event) {
try {
NodeLog log = NodeLog.builder()
.nodeInstanceId(event.getNodeInstanceId())
.logType(event.getType())
.content(event.getContent())
.logTime(LocalDateTime.now())
.sequenceNo(generateSequenceNo())
.build();
nodeLogRepository.save(log);
} catch (Exception e) {
log.error("Failed to write log", e);
}
}
/**
* 导出日志
*/
public void exportLogs(Long nodeInstanceId, HttpServletResponse response) {
response.setContentType("text/plain");
response.setHeader("Content-Disposition",
"attachment; filename=node-" + nodeInstanceId + "-logs.txt");
try (PrintWriter writer = response.getWriter()) {
nodeLogRepository.streamAllLogs(nodeInstanceId, log -> {
writer.println(String.format("[%s] [%s] %s",
formatTime(log.getLogTime()),
log.getLogType(),
log.getContent()));
});
} catch (IOException e) {
log.error("Failed to export logs", e);
throw new BusinessException("导出日志失败");
}
}
}
10.3 日志记录器
/**
* 节点日志记录器
*/
public class NodeLogger {
private final Long nodeInstanceId;
private final NodeLogService logService;
public void info(String message) {
writeLog(LogType.INFO, message);
}
public void error(String message) {
writeLog(LogType.ERROR, message);
}
public void command(String command) {
writeLog(LogType.COMMAND, command);
}
public void output(String output) {
writeLog(LogType.OUTPUT, output);
}
private void writeLog(LogType type, String content) {
NodeLogEvent event = NodeLogEvent.builder()
.nodeInstanceId(nodeInstanceId)
.type(type)
.content(content)
.build();
logService.writeLog(event);
}
}
10.4 前端日志组件
interface LogDTO {
id: number;
logType: 'INFO' | 'ERROR' | 'COMMAND' | 'OUTPUT';
content: string;
logTime: string;
sequenceNo: number;
}
const NodeLogViewer: React.FC<{ nodeInstanceId: string }> = ({ nodeInstanceId }) => {
const [logs, setLogs] = useState<LogDTO[]>([]);
const [loading, setLoading] = useState(false);
const [error, setError] = useState<string>();
const [lastSequenceNo, setLastSequenceNo] = useState<number>();
const logEndRef = useRef<HTMLDivElement>(null);
// 首次加载日志
useEffect(() => {
loadLogs();
}, [nodeInstanceId]);
// 定时增量查询
useEffect(() => {
const timer = setInterval(() => {
if (lastSequenceNo) {
loadIncrementalLogs();
}
}, 2000); // 每2秒查询一次
return () => clearInterval(timer);
}, [lastSequenceNo]);
// 加载首页日志
const loadLogs = async () => {
try {
setLoading(true);
const res = await axios.get(`/api/v1/workflow/node/${nodeInstanceId}/logs`);
setLogs(res.data.records);
if (res.data.records.length > 0) {
setLastSequenceNo(res.data.records[res.data.records.length - 1].sequenceNo);
}
} catch (e) {
setError('加载日志失败');
} finally {
setLoading(false);
}
};
// 增量加载日志
const loadIncrementalLogs = async () => {
try {
const res = await axios.get(`/api/v1/workflow/node/${nodeInstanceId}/logs`, {
params: { lastSequenceNo }
});
if (res.data.records.length > 0) {
setLogs(logs => [...logs, ...res.data.records]);
setLastSequenceNo(res.data.records[res.data.records.length - 1].sequenceNo);
// 滚动到最新
logEndRef.current?.scrollIntoView({ behavior: 'smooth' });
}
} catch (e) {
console.error('增量加载日志失败', e);
}
};
// 导出日志
const handleExport = () => {
window.open(`/api/v1/workflow/node/${nodeInstanceId}/logs/export`);
};
return (
<div className="node-log-viewer">
<div className="log-header">
<div className="status">
{loading && <Spin size="small" />}
</div>
<div className="actions">
<Button onClick={() => setLogs([])}>清空日志</Button>
<Button onClick={handleExport}>导出日志</Button>
</div>
</div>
<div className="log-content">
{logs.map((log, index) => (
<div key={index} className={`log-line log-${log.logType.toLowerCase()}`}>
<span className="log-time">{formatTime(log.logTime)}</span>
<span className="log-level">{log.logType}</span>
<span className="log-content">{log.content}</span>
</div>
))}
<div ref={logEndRef} />
</div>
{error && (
<Alert type="error" message={error} />
)}
</div>
);
};
10.5 性能优化措施
-
数据库优化
- 使用分区表按时间分区
- 建立复合索引(node_instance_id, sequence_no)
- 定期归档历史日志
- 使用序列号进行增量查询
-
查询优化
- 分页限制大小
- 增量查询而不是全量
- 适当的轮询间隔(2秒)
- 异步写入日志
-
前端优化
- 虚拟滚动显示大量日志
- 本地缓存已加载的日志
- 按需加载历史日志
- 智能调整轮询间隔
-
系统优化
- 日志异步写入
- 批量写入优化
- 定期清理过期日志
- 监控慢查询
这个基于HTTP轮询的方案相比WebSocket有以下优势:
-
更简单可靠
- 无需维护长连接
- 服务端实现更简单
- 更容易做负载均衡
- 出错重试更容易
-
更好的性能
- 服务器资源占用更少
- 更容易控制并发
- 更好的伸缩性
- 便于性能优化
-
更好的兼容性
- 支持更多的客户端
- 穿透防火墙更容易
- 不依赖特殊协议
- 更容易调试
您觉得这个基于HTTP轮询的方案如何?我们可以根据实际需求调整轮询间隔和其他参<EFBFBD><EFBFBD>。
十一、工作流编排界面实现
[之前提供的完整代码内容]
十二、数据库表设计
12.1 工作流定义表
CREATE TABLE workflow_definition (
id BIGINT PRIMARY KEY, -- 主键
name VARCHAR(100) NOT NULL, -- 工作流名称
code VARCHAR(50) NOT NULL, -- 工作流编码
description TEXT, -- 描述
status VARCHAR(20) NOT NULL, -- 状态:DRAFT/PUBLISHED/DISABLED
version INT NOT NULL DEFAULT 1, -- 版本号
content TEXT NOT NULL, -- 工作流定义内容(JSON)
create_time DATETIME NOT NULL, -- 创建时间
create_by VARCHAR(50) NOT NULL, -- 创建人
update_time DATETIME NOT NULL, -- 更新时间
update_by VARCHAR(50) NOT NULL, -- 更新人
deleted BOOLEAN NOT NULL DEFAULT FALSE, -- 是否删除
UNIQUE KEY uk_code_version (code, version)
);
### 12.2 工作流实例表
```sql
CREATE TABLE workflow_instance (
id BIGINT PRIMARY KEY, -- 主键
definition_id BIGINT NOT NULL, -- 工作流定义ID
name VARCHAR(100) NOT NULL, -- 实例名称
status VARCHAR(20) NOT NULL, -- 状态:RUNNING/COMPLETED/FAILED/CANCELED
start_time DATETIME, -- 开始时间
end_time DATETIME, -- 结束时间
variables TEXT, -- 工作流变量(JSON)
create_time DATETIME NOT NULL, -- 创建时间
create_by VARCHAR(50) NOT NULL, -- 创建人
update_time DATETIME NOT NULL, -- 更新时间
update_by VARCHAR(50) NOT NULL, -- 更新人
version INT NOT NULL DEFAULT 1, -- 版本号
deleted BOOLEAN NOT NULL DEFAULT FALSE, -- 是否删除
FOREIGN KEY (definition_id) REFERENCES workflow_definition(id)
);
### 12.3 节点实例表
```sql
CREATE TABLE node_instance (
id BIGINT PRIMARY KEY, -- 主键
workflow_instance_id BIGINT NOT NULL, -- 工作流实例ID
node_id VARCHAR(50) NOT NULL, -- 节点ID
node_name VARCHAR(100) NOT NULL, -- 节点名称
node_type VARCHAR(50) NOT NULL, -- 节点类型
status VARCHAR(20) NOT NULL, -- 状态:PENDING/RUNNING/COMPLETED/FAILED/CANCELED
start_time DATETIME, -- 开始时间
end_time DATETIME, -- 结束时间
error TEXT, -- 错误信息
result TEXT, -- 执行结果(JSON)
create_time DATETIME NOT NULL, -- 创建时间
create_by VARCHAR(50) NOT NULL, -- 创建人
update_time DATETIME NOT NULL, -- 更新时间
update_by VARCHAR(50) NOT NULL, -- 更新人
version INT NOT NULL DEFAULT 1, -- 版本号
deleted BOOLEAN NOT NULL DEFAULT FALSE, -- 是否删除
FOREIGN KEY (workflow_instance_id) REFERENCES workflow_instance(id)
);
### 12.4 节点执行日志表
```sql
CREATE TABLE node_execution_log (
id BIGINT PRIMARY KEY, -- 主键
workflow_instance_id BIGINT NOT NULL, -- 工作流实例ID
node_instance_id BIGINT NOT NULL, -- 节点实例ID
log_type VARCHAR(20) NOT NULL, -- 日志类型:INFO/ERROR/COMMAND/OUTPUT
content TEXT NOT NULL, -- 日志内容
log_time DATETIME NOT NULL, -- 日志时间
sequence_no BIGINT NOT NULL, -- 日志序号
create_time DATETIME NOT NULL, -- 创建时间
create_by VARCHAR(50) NOT NULL, -- 创建人
update_time DATETIME NOT NULL, -- 更新时间
update_by VARCHAR(50) NOT NULL, -- 更新人
version INT NOT NULL DEFAULT 1, -- 版本号
deleted BOOLEAN NOT NULL DEFAULT FALSE, -- 是否删除
FOREIGN KEY (workflow_instance_id) REFERENCES workflow_instance(id),
FOREIGN KEY (node_instance_id) REFERENCES node_instance(id)
) PARTITION BY RANGE (UNIX_TIMESTAMP(create_time));
-- 创建最近一个月的分区
CREATE TABLE node_execution_log_current
PARTITION OF node_execution_log
FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
### 12.5 工作流变量表
```sql
CREATE TABLE workflow_variable (
id BIGINT PRIMARY KEY, -- 主键
workflow_instance_id BIGINT NOT NULL, -- 工作流实例ID
name VARCHAR(100) NOT NULL, -- 变量名
value TEXT, -- 变量值
type VARCHAR(50) NOT NULL, -- 变量类型
scope VARCHAR(20) NOT NULL, -- 作用域:GLOBAL/NODE
node_id VARCHAR(50), -- 节点ID(作用域为NODE时必填)
create_time DATETIME NOT NULL, -- 创建时间
create_by VARCHAR(50) NOT NULL, -- 创建人
update_time DATETIME NOT NULL, -- 更新时间
update_by VARCHAR(50) NOT NULL, -- 更新人
version INT NOT NULL DEFAULT 1, -- 版本号
deleted BOOLEAN NOT NULL DEFAULT FALSE, -- 是否删除
FOREIGN KEY (workflow_instance_id) REFERENCES workflow_instance(id)
);
主要更新内容:
-
统一基础字段
- id:BIGINT PRIMARY KEY
- create_time:DATETIME NOT NULL
- create_by:VARCHAR(50) NOT NULL
- update_time:DATETIME NOT NULL
- update_by:VARCHAR(50) NOT NULL
- version:INT NOT NULL DEFAULT 1
- deleted:BOOLEAN NOT NULL DEFAULT FALSE
-
表关系
- 使用外键关联
- 添加必要的索引
- 分区表支持
-
字段规范
- 统一命名风格
- 添加字段注释
- 明确字段类型和约束
-
数据完整性
- NOT NULL 约束
- 唯一索引
- 默认值
您觉得这个数据库设计是否更符合规范?我们可以根据具体需求进行调整。