增加ssh链接框架

This commit is contained in:
dengqichen 2025-12-06 17:01:15 +08:00
parent d4eb907536
commit 00628d2811
41 changed files with 2730 additions and 843 deletions

View File

@ -1,6 +1,7 @@
package com.qqchen.deploy.backend.deploy.api; package com.qqchen.deploy.backend.deploy.api;
import com.qqchen.deploy.backend.deploy.dto.ServerDTO; import com.qqchen.deploy.backend.deploy.dto.ServerDTO;
import com.qqchen.deploy.backend.deploy.dto.ServerInfoDTO;
import com.qqchen.deploy.backend.deploy.dto.ServerInitializeDTO; import com.qqchen.deploy.backend.deploy.dto.ServerInitializeDTO;
import com.qqchen.deploy.backend.deploy.entity.Server; import com.qqchen.deploy.backend.deploy.entity.Server;
import com.qqchen.deploy.backend.deploy.query.ServerQuery; import com.qqchen.deploy.backend.deploy.query.ServerQuery;
@ -84,13 +85,13 @@ public class ServerApiController
return Response.success(result); return Response.success(result);
} }
@Operation(summary = "测试SSH连接", description = "测试服务器SSH连接是否正常") @Operation(summary = "测试SSH连接并获取服务器信息", description = "测试服务器SSH连接并自动采集硬件信息")
@PostMapping("/{id}/test-connection") @PostMapping("/{id}/test-connection")
public Response<Boolean> testConnection( public Response<ServerInfoDTO> testConnection(
@Parameter(description = "服务器ID", required = true) @PathVariable Long id @Parameter(description = "服务器ID", required = true) @PathVariable Long id
) { ) {
boolean success = serverService.testConnection(id); ServerInfoDTO info = serverService.testConnection(id);
return Response.success(success); return Response.success(info);
} }
@Override @Override

View File

@ -88,41 +88,8 @@ public class ThreadPoolConfig {
return executor; return executor;
} }
/** // ========== 注意 ==========
* SSH输出监听线程池 - 使用虚拟线程Java 21+ // sshOutputExecutor 已迁移到 Framework
* // : framework.ssh.websocket.SSHWebSocketConfig
* 为什么使用虚拟线程 // SSH WebSocket 框架自己管理线程池业务层无需关心
* 1. SSH输出监听是典型的**阻塞I/O密集型**任务
* 2. 每个SSH连接需要2个长期阻塞的线程stdout + stderr
* 3. 虚拟线程几乎无资源开销支持数百万并发
* 4. 完美适配大量SSH长连接场景
*
* 📊 性能对比
* - 平台线程50个SSH连接 = 100个线程 100-200MB内存
* - 虚拟线程50个SSH连接 = 100个虚拟线程 几MB内存
*
* 💡 方案选择
* - 方案1当前SimpleAsyncTaskExecutor - Spring集成支持优雅关闭可定制线程名
* - 方案2Executors.newVirtualThreadPerTaskExecutor() - 原生API最简洁性能略优
*/
@Bean("sshOutputExecutor")
public org.springframework.core.task.SimpleAsyncTaskExecutor sshOutputExecutor() {
// 方案1Spring封装的虚拟线程Executor推荐
// 优点与Spring集成支持优雅关闭线程名可定制便于调试
org.springframework.core.task.SimpleAsyncTaskExecutor executor =
new org.springframework.core.task.SimpleAsyncTaskExecutor("ssh-virtual-");
// 关键启用虚拟线程Java 21+
executor.setVirtualThreads(true);
// 并发限制-1表示无限制虚拟线程资源消耗极低
executor.setConcurrencyLimit(-1);
return executor;
// 方案2原生虚拟线程Executor可选
// 如果需要纯Java实现无Spring依赖可以使用
// return Executors.newVirtualThreadPerTaskExecutor();
// 注意需要手动管理生命周期线程名为 VirtualThread-#1
}
} }

View File

@ -1,83 +0,0 @@
package com.qqchen.deploy.backend.deploy.dto;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.qqchen.deploy.backend.deploy.enums.SSHMessageTypeEnum;
import com.qqchen.deploy.backend.deploy.enums.SSHStatusEnum;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* SSH WebSocket消息DTO
*
* 消息格式规范
* 1. 状态消息{"type":"status","status":"connecting"}
* 2. SSH输出{"type":"output","data":"..."}
* 3. 错误消息{"type":"error","message":"..."}
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL) // 只序列化非null字段
public class SSHMessage {
/**
* 消息类型
*/
private SSHMessageTypeEnum type;
/**
* 消息数据input/output时使用
*/
private String data;
/**
* 错误消息error时使用
*/
private String message;
/**
* 连接状态仅type=STATUS时使用
*/
private SSHStatusEnum status;
/**
* 创建input类型消息
*/
public static SSHMessage input(String data) {
SSHMessage msg = new SSHMessage();
msg.setType(SSHMessageTypeEnum.INPUT);
msg.setData(data);
return msg;
}
/**
* 创建output类型消息
*/
public static SSHMessage output(String data) {
SSHMessage msg = new SSHMessage();
msg.setType(SSHMessageTypeEnum.OUTPUT);
msg.setData(data);
return msg;
}
/**
* 创建error类型消息
*/
public static SSHMessage error(String message) {
SSHMessage msg = new SSHMessage();
msg.setType(SSHMessageTypeEnum.ERROR);
msg.setMessage(message);
return msg;
}
/**
* 创建status类型消息
*/
public static SSHMessage status(SSHStatusEnum status) {
SSHMessage msg = new SSHMessage();
msg.setType(SSHMessageTypeEnum.STATUS);
msg.setStatus(status);
return msg;
}
}

View File

@ -1,7 +1,7 @@
package com.qqchen.deploy.backend.deploy.dto; package com.qqchen.deploy.backend.deploy.dto;
import com.qqchen.deploy.backend.deploy.enums.AuthTypeEnum; import com.qqchen.deploy.backend.framework.enums.AuthTypeEnum;
import com.qqchen.deploy.backend.deploy.enums.OsTypeEnum; import com.qqchen.deploy.backend.framework.enums.OsTypeEnum;
import com.qqchen.deploy.backend.deploy.enums.ServerStatusEnum; import com.qqchen.deploy.backend.deploy.enums.ServerStatusEnum;
import com.qqchen.deploy.backend.framework.dto.BaseDTO; import com.qqchen.deploy.backend.framework.dto.BaseDTO;
import lombok.Data; import lombok.Data;

View File

@ -0,0 +1,62 @@
package com.qqchen.deploy.backend.deploy.dto;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 服务器信息DTO
* 用于测试连接接口的返回
*/
@Data
public class ServerInfoDTO {
// ===== 连接状态 =====
/**
* 连接是否成功
*/
private Boolean connected;
/**
* 错误信息连接失败时
*/
private String errorMessage;
/**
* 连接时间
*/
private LocalDateTime connectTime;
/**
* 响应时间毫秒
*/
private Long responseTime;
// ===== 基础硬件信息 =====
/**
* 主机名
*/
private String hostname;
/**
* 操作系统版本
*/
private String osVersion;
/**
* CPU核心数
*/
private Integer cpuCores;
/**
* 内存大小(GB)
*/
private Integer memorySize;
/**
* 磁盘大小(GB)
*/
private Integer diskSize;
}

View File

@ -27,10 +27,10 @@ public class JenkinsBuild extends Entity<Long> {
@Column(name = "duration", nullable = false) @Column(name = "duration", nullable = false)
private Long duration; private Long duration;
@Column(name = "startTime", nullable = false) @Column(name = "start_time", nullable = false)
private LocalDateTime starttime; private LocalDateTime starttime;
@Column(name = "actions", columnDefinition = "TEXT") @Column(name = "actions", columnDefinition = "MEDIUMTEXT")
private String actions; private String actions;
@Column(name = "external_system_id", nullable = false) @Column(name = "external_system_id", nullable = false)

View File

@ -1,7 +1,7 @@
package com.qqchen.deploy.backend.deploy.entity; package com.qqchen.deploy.backend.deploy.entity;
import com.qqchen.deploy.backend.deploy.enums.AuthTypeEnum; import com.qqchen.deploy.backend.framework.enums.AuthTypeEnum;
import com.qqchen.deploy.backend.deploy.enums.OsTypeEnum; import com.qqchen.deploy.backend.framework.enums.OsTypeEnum;
import com.qqchen.deploy.backend.deploy.enums.ServerStatusEnum; import com.qqchen.deploy.backend.deploy.enums.ServerStatusEnum;
import com.qqchen.deploy.backend.framework.domain.Entity; import com.qqchen.deploy.backend.framework.domain.Entity;
import jakarta.persistence.*; import jakarta.persistence.*;

View File

@ -1,19 +0,0 @@
package com.qqchen.deploy.backend.deploy.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* SSH认证方式枚举
*/
@Getter
@AllArgsConstructor
public enum AuthTypeEnum {
PASSWORD("密码认证", "使用用户名和密码进行SSH认证"),
KEY("密钥认证", "使用SSH私钥进行认证");
private final String name;
private final String description;
}

View File

@ -1,22 +0,0 @@
package com.qqchen.deploy.backend.deploy.enums;
import com.fasterxml.jackson.annotation.JsonValue;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* SSH WebSocket消息类型枚举
*/
@Getter
@AllArgsConstructor
public enum SSHMessageTypeEnum {
INPUT("input", "用户输入"),
OUTPUT("output", "SSH输出"),
ERROR("error", "错误消息"),
STATUS("status", "连接状态");
@JsonValue // JSON序列化时使用此字段的值
private final String value;
private final String description;
}

View File

@ -1,21 +0,0 @@
package com.qqchen.deploy.backend.deploy.enums;
import com.fasterxml.jackson.annotation.JsonValue;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* SSH连接状态枚举
*/
@Getter
@AllArgsConstructor
public enum SSHStatusEnum {
CONNECTING("connecting", "连接中"),
CONNECTED("connected", "已连接"),
DISCONNECTED("disconnected", "已断开");
@JsonValue // JSON序列化时使用此字段的值
private final String value;
private final String description;
}

View File

@ -0,0 +1,182 @@
package com.qqchen.deploy.backend.deploy.handler;
import com.qqchen.deploy.backend.framework.enums.SSHEvent;
import com.qqchen.deploy.backend.framework.enums.SSHTargetType;
import com.qqchen.deploy.backend.framework.ssh.SSHCommandServiceFactory;
import com.qqchen.deploy.backend.framework.ssh.websocket.AbstractSSHWebSocketHandler;
import com.qqchen.deploy.backend.framework.ssh.websocket.SSHSessionManager;
import com.qqchen.deploy.backend.framework.ssh.websocket.SSHTarget;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketSession;
import java.util.Map;
/**
* Docker Container SSH WebSocket处理器业务层
* 继承Framework的抽象Handler只需实现5个方法
*
* 示例实现展示如何为Docker容器提供SSH终端能力
*/
@Slf4j
@Component
public class ContainerSSHWebSocketHandler extends AbstractSSHWebSocketHandler {
// TODO: 注入Docker相关的Service
// @Resource
// private IDockerContainerService containerService;
// @Resource
// private IDockerAuditLogService dockerAuditLogService;
private static final int MAX_SESSIONS_PER_CONTAINER = 3;
/**
* 构造函数注入Framework依赖
*/
public ContainerSSHWebSocketHandler(
SSHCommandServiceFactory sshCommandServiceFactory,
SSHSessionManager sessionManager) {
super(sshCommandServiceFactory, sessionManager);
}
// ========== 实现Framework要求的5个抽象方法 ==========
/**
* 1. 获取SSH连接目标业务逻辑
*/
@Override
protected SSHTarget getSSHTarget(WebSocketSession session) throws Exception {
// 从URL提取containerId
Long containerId = extractContainerId(session);
if (containerId == null) {
throw new IllegalArgumentException("无效的Container ID");
}
// TODO: 从数据库获取Container信息
// DockerContainer container = containerService.findById(containerId);
// if (container == null) {
// throw new IllegalArgumentException("容器不存在: " + containerId);
// }
// 构建SSH目标
SSHTarget target = new SSHTarget();
target.setTargetType(SSHTargetType.CONTAINER);
// TODO: 从Docker API获取容器的SSH信息
// Docker容器通常通过宿主机SSH + docker exec
// target.setHost(container.getHostIp());
// target.setPort(container.getSshPort());
// target.setUsername(container.getUsername());
// target.setAuthType(container.getAuthType());
// target.setPassword(container.getPassword());
// target.setOsType(container.getOsType());
// 示例配置
target.setHost("docker-host-ip");
target.setPort(22);
target.setUsername("docker-user");
target.setMetadata(containerId); // 保存containerId供后续使用
return target;
}
/**
* 2. 权限验证业务逻辑
*/
@Override
protected boolean checkPermission(Long userId, SSHTarget target) {
// TODO: 实现Docker容器权限验证
// Long containerId = (Long) target.getMetadata();
// return dockerPermissionService.hasPermission(userId, containerId);
log.warn("Docker容器权限验证尚未实现默认允许所有用户访问");
return true;
}
/**
* 3. 获取最大并发连接数
*/
@Override
protected int getMaxSessions() {
return MAX_SESSIONS_PER_CONTAINER;
}
/**
* 4. 获取WebSocket路径模式
*/
@Override
protected String getPathPattern() {
return "/api/v1/docker-container-ssh/connect/*";
}
/**
* 5. 事件钩子审计日志等业务逻辑
*/
@Override
protected void onEvent(SSHEvent event, Map<String, Object> data) {
String sessionId = (String) data.get("sessionId");
SSHTarget target = (SSHTarget) data.get("target");
Long containerId = (Long) target.getMetadata();
switch (event) {
case AFTER_CONNECT:
// 创建Docker审计日志
try {
Long userId = (Long) data.get("userId");
log.info("Docker容器连接审计: sessionId={}, containerId={}, userId={}",
sessionId, containerId, userId);
// TODO: dockerAuditLogService.createAuditLog(...)
} catch (Exception e) {
log.error("创建Docker审计日志失败: sessionId={}", sessionId, e);
}
break;
case AFTER_DISCONNECT:
// 关闭Docker审计日志
log.info("Docker容器断开连接: sessionId={}, containerId={}", sessionId, containerId);
// TODO: dockerAuditLogService.closeAuditLog(...)
break;
case ON_COMMAND:
// 记录命令可选
String command = (String) data.get("command");
log.debug("Docker容器命令: sessionId={}, command={}", sessionId, command);
// TODO: dockerAuditLogService.recordCommand(...)
break;
case ON_ERROR:
// 记录错误
String error = (String) data.get("error");
log.error("Docker容器连接错误: sessionId={}, error={}", sessionId, error);
// TODO: dockerAuditLogService.closeAuditLog(sessionId, "FAILED", error);
break;
case BEFORE_SHUTDOWN:
// 优雅下线
String reason = (String) data.get("reason");
log.info("Docker容器优雅下线: sessionId={}, reason={}", sessionId, reason);
// TODO: dockerAuditLogService.closeAuditLog(sessionId, "SERVER_SHUTDOWN", reason);
break;
}
}
// ========== 辅助方法 ==========
/**
* 从WebSocket session URL中提取containerId
*/
private Long extractContainerId(WebSocketSession session) {
try {
String path = session.getUri().getPath();
// /api/v1/docker-container-ssh/connect/{containerId}
String[] parts = path.split("/");
if (parts.length > 0) {
return Long.parseLong(parts[parts.length - 1]);
}
} catch (Exception e) {
log.error("提取containerId失败", e);
}
return null;
}
}

View File

@ -0,0 +1,216 @@
package com.qqchen.deploy.backend.deploy.handler;
import com.qqchen.deploy.backend.framework.enums.SSHEvent;
import com.qqchen.deploy.backend.framework.enums.SSHTargetType;
import com.qqchen.deploy.backend.framework.ssh.SSHCommandServiceFactory;
import com.qqchen.deploy.backend.framework.ssh.websocket.AbstractSSHWebSocketHandler;
import com.qqchen.deploy.backend.framework.ssh.websocket.SSHSessionManager;
import com.qqchen.deploy.backend.framework.ssh.websocket.SSHTarget;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketSession;
import java.util.Map;
/**
* K8S Pod SSH WebSocket处理器业务层
* 继承Framework的抽象Handler只需实现5个方法
*
* 示例实现展示如何为K8S Pod提供SSH终端能力
*/
@Slf4j
@Component
public class PodSSHWebSocketHandler extends AbstractSSHWebSocketHandler {
// TODO: 注入K8S相关的Service
// @Resource
// private IK8sPodService podService;
// @Resource
// private IK8sAuditLogService k8sAuditLogService;
private static final int MAX_SESSIONS_PER_POD = 3;
/**
* 构造函数注入Framework依赖
*/
public PodSSHWebSocketHandler(
SSHCommandServiceFactory sshCommandServiceFactory,
SSHSessionManager sessionManager) {
super(sshCommandServiceFactory, sessionManager);
}
// ========== 实现Framework要求的5个抽象方法 ==========
/**
* 1. 获取SSH连接目标业务逻辑
*/
@Override
protected SSHTarget getSSHTarget(WebSocketSession session) throws Exception {
// 从URL提取podId
Long podId = extractPodId(session);
if (podId == null) {
throw new IllegalArgumentException("无效的Pod ID");
}
// TODO: 从数据库获取Pod信息
// K8sPod pod = podService.findById(podId);
// if (pod == null) {
// throw new IllegalArgumentException("Pod不存在: " + podId);
// }
// 构建SSH目标示例
SSHTarget target = new SSHTarget();
target.setTargetType(SSHTargetType.POD);
// TODO: 从K8S API获取Pod的SSH信息
// 注意K8S Pod可能需要通过Node的SSH + kubectl exec
// target.setHost(pod.getNodeIp());
// target.setPort(pod.getSshPort());
// target.setUsername(pod.getUsername());
// target.setAuthType(pod.getAuthType());
// target.setPassword(pod.getPassword());
// target.setOsType(pod.getOsType());
// 示例配置
target.setHost("k8s-node-ip");
target.setPort(22);
target.setUsername("k8s-user");
target.setMetadata(podId); // 保存podId供后续使用
return target;
}
/**
* 2. 权限验证业务逻辑
*/
@Override
protected boolean checkPermission(Long userId, SSHTarget target) {
// TODO: 实现K8S RBAC权限验证
// Long podId = (Long) target.getMetadata();
// return k8sRbacService.hasPermission(userId, "pod", "exec", podId);
log.warn("K8S Pod权限验证尚未实现默认允许所有用户访问");
return true;
}
/**
* 3. 获取最大并发连接数
*/
@Override
protected int getMaxSessions() {
return MAX_SESSIONS_PER_POD;
}
/**
* 4. 获取WebSocket路径模式
*/
@Override
protected String getPathPattern() {
return "/api/v1/k8s-pod-ssh/connect/*";
}
/**
* 5. 事件钩子审计日志等业务逻辑
*/
@Override
protected void onEvent(SSHEvent event, Map<String, Object> data) {
String sessionId = (String) data.get("sessionId");
SSHTarget target = (SSHTarget) data.get("target");
Long podId = (Long) target.getMetadata();
switch (event) {
case AFTER_CONNECT:
// 创建K8S审计日志
try {
Long userId = (Long) data.get("userId");
String clientIp = (String) data.get("clientIp");
String userAgent = (String) data.get("userAgent");
// TODO: 调用K8S审计服务
// Long auditLogId = k8sAuditLogService.createAuditLog(
// userId, podId, sessionId, clientIp, userAgent);
// data.put("auditLogId", auditLogId);
log.info("K8S Pod连接审计日志已创建: sessionId={}, podId={}", sessionId, podId);
} catch (Exception e) {
log.error("创建K8S审计日志失败: sessionId={}", sessionId, e);
}
break;
case AFTER_DISCONNECT:
// 关闭K8S审计日志
try {
// TODO: k8sAuditLogService.closeAuditLog(sessionId, "SUCCESS", "正常断开");
log.info("K8S Pod审计日志已关闭: sessionId={}", sessionId);
} catch (Exception e) {
log.error("关闭K8S审计日志失败: sessionId={}", sessionId, e);
}
break;
case ON_COMMAND:
// 记录命令到K8S审计日志
try {
String command = (String) data.get("command");
if (command != null && command.length() > 0) {
// TODO: k8sAuditLogService.recordCommand(sessionId, command);
}
} catch (Exception e) {
log.error("记录K8S命令失败: sessionId={}", sessionId, e);
}
break;
case ON_ERROR:
// 记录错误
try {
String error = (String) data.get("error");
// TODO: k8sAuditLogService.closeAuditLog(sessionId, "FAILED", error);
log.error("K8S Pod连接错误: sessionId={}, error={}", sessionId, error);
} catch (Exception e) {
log.error("记录K8S错误失败: sessionId={}", sessionId, e);
}
break;
case BEFORE_SHUTDOWN:
// 优雅下线前处理审计日志
try {
String reason = (String) data.get("reason");
// TODO: k8sAuditLogService.closeAuditLog(sessionId, "SERVER_SHUTDOWN", reason);
log.info("K8S审计日志已更新(优雅下线): sessionId={}", sessionId);
} catch (Exception e) {
log.error("更新K8S审计日志失败(优雅下线): sessionId={}", sessionId, e);
}
break;
}
}
// ========== 辅助方法 ==========
/**
* 从WebSocket session URL中提取podId
*/
private Long extractPodId(WebSocketSession session) {
try {
String path = session.getUri().getPath();
// /api/v1/k8s-pod-ssh/connect/{podId}
String[] parts = path.split("/");
if (parts.length > 0) {
return Long.parseLong(parts[parts.length - 1]);
}
} catch (Exception e) {
log.error("提取podId失败", e);
}
return null;
}
/**
* 可选重写createConnection方法以支持kubectl exec
* K8S Pod可能不是标准SSH连接而是通过kubectl exec
*/
// @Override
// protected SSHClient createConnection(SSHTarget target) throws Exception {
// // 使用K8S API创建exec连接
// // 而不是标准SSH连接
// return k8sExecService.createExecConnection(target);
// }
}

View File

@ -1,46 +1,30 @@
package com.qqchen.deploy.backend.deploy.handler; package com.qqchen.deploy.backend.deploy.handler;
import com.qqchen.deploy.backend.deploy.dto.SSHMessage;
import com.qqchen.deploy.backend.deploy.entity.Server; import com.qqchen.deploy.backend.deploy.entity.Server;
import com.qqchen.deploy.backend.deploy.enums.AuthTypeEnum;
import com.qqchen.deploy.backend.deploy.enums.SSHMessageTypeEnum;
import com.qqchen.deploy.backend.deploy.enums.SSHStatusEnum;
import com.qqchen.deploy.backend.deploy.service.ISSHAuditLogService; import com.qqchen.deploy.backend.deploy.service.ISSHAuditLogService;
import com.qqchen.deploy.backend.deploy.service.IServerService; import com.qqchen.deploy.backend.deploy.service.IServerService;
import com.qqchen.deploy.backend.framework.enums.ResponseCode; import com.qqchen.deploy.backend.framework.enums.SSHEvent;
import com.qqchen.deploy.backend.framework.exception.BusinessException; import com.qqchen.deploy.backend.framework.enums.SSHTargetType;
import com.qqchen.deploy.backend.framework.utils.JsonUtils; import com.qqchen.deploy.backend.framework.ssh.SSHCommandServiceFactory;
import com.qqchen.deploy.backend.framework.ssh.websocket.AbstractSSHWebSocketHandler;
import com.qqchen.deploy.backend.framework.ssh.websocket.SSHSessionManager;
import com.qqchen.deploy.backend.framework.ssh.websocket.SSHTarget;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import net.schmizz.sshj.SSHClient;
import net.schmizz.sshj.common.IOUtils;
import net.schmizz.sshj.connection.channel.direct.Session;
import net.schmizz.sshj.transport.verification.PromiscuousVerifier;
import net.schmizz.sshj.userauth.keyprovider.KeyProvider;
import net.schmizz.sshj.userauth.password.PasswordUtils;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
/** /**
* Server SSH WebSocket处理器 * Server SSH WebSocket处理器业务层
* 处理Web SSH终端的WebSocket连接和SSH交互 * 继承Framework的抽象Handler只需实现5个方法
*
* 注意asyncTaskExecutor 线程池由Framework自动注入业务层无需关心
*/ */
@Slf4j @Slf4j
@Component @Component
public class ServerSSHWebSocketHandler extends TextWebSocketHandler { public class ServerSSHWebSocketHandler extends AbstractSSHWebSocketHandler {
@Resource @Resource
private IServerService serverService; private IServerService serverService;
@ -48,427 +32,156 @@ public class ServerSSHWebSocketHandler extends TextWebSocketHandler {
@Resource @Resource
private ISSHAuditLogService auditLogService; private ISSHAuditLogService auditLogService;
@Resource(name = "sshOutputExecutor") private static final int MAX_SESSIONS_PER_SERVER = 5;
private AsyncTaskExecutor sshOutputExecutor;
/** /**
* 最大并发SSH会话数每个用户 * 构造函数注入业务依赖
*
* @param sshCommandServiceFactory SSH命令服务工厂
* @param sessionManager SSH会话管理器
*/ */
private static final int MAX_SESSIONS_PER_USER = 5; public ServerSSHWebSocketHandler(
SSHCommandServiceFactory sshCommandServiceFactory,
SSHSessionManager sessionManager) {
super(sshCommandServiceFactory, sessionManager);
}
// ========== 实现Framework要求的5个抽象方法 ==========
/** /**
* WebSocket会话存储sessionId -> WebSocketSession * 1. 获取SSH连接目标业务逻辑
*/
private final Map<String, WebSocketSession> webSocketSessions = new ConcurrentHashMap<>();
/**
* SSH会话存储sessionId -> SSHClient
*/
private final Map<String, SSHClient> sshClients = new ConcurrentHashMap<>();
/**
* SSH会话通道存储sessionId -> Session.Shell
*/
private final Map<String, Session.Shell> sshShells = new ConcurrentHashMap<>();
/**
* 输出监听任务存储sessionId -> Future
*/
private final Map<String, Future<?>> outputTasks = new ConcurrentHashMap<>();
/**
* WebSocket连接建立时触发
*/ */
@Override @Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception { protected SSHTarget getSSHTarget(WebSocketSession session) throws Exception {
String sessionId = session.getId(); // 从URL提取serverId
log.info("WebSocket连接建立: sessionId={}", sessionId); Long serverId = extractServerId(session);
if (serverId == null) {
throw new IllegalArgumentException("无效的服务器ID");
}
try { // 获取服务器信息
// 1. 从attributes中获取用户信息由认证拦截器设置 Server server = serverService.findEntityById(serverId);
Long userId = (Long) session.getAttributes().get("userId"); if (server == null) {
String username = (String) session.getAttributes().get("username"); throw new IllegalArgumentException("服务器不存在: " + serverId);
String clientIp = (String) session.getAttributes().get("clientIp"); }
String userAgent = (String) session.getAttributes().get("userAgent");
if (userId == null) { // 构建SSH目标
log.error("无法获取用户信息: sessionId={}", sessionId); SSHTarget target = new SSHTarget();
sendError(session, "认证失败"); target.setTargetType(SSHTargetType.SERVER);
session.close(CloseStatus.POLICY_VIOLATION); target.setHost(server.getHostIp());
return; target.setPort(server.getSshPort());
} target.setUsername(server.getSshUser());
target.setAuthType(server.getAuthType());
target.setPassword(server.getSshPassword());
target.setPrivateKey(server.getSshPrivateKey());
target.setPassphrase(server.getSshPassphrase());
target.setOsType(server.getOsType());
target.setMetadata(serverId); // 保存serverId供后续使用
// 2. 从URL中提取serverId return target;
Long serverId = extractServerId(session); }
if (serverId == null) {
sendError(session, "无效的服务器ID");
session.close(CloseStatus.BAD_DATA);
return;
}
// 3. 获取服务器信息 /**
Server server = serverService.findEntityById(serverId); * 2. 权限验证业务逻辑
if (server == null) { */
sendError(session, "服务器不存在: " + serverId); @Override
session.close(CloseStatus.NOT_ACCEPTABLE); protected boolean checkPermission(Long userId, SSHTarget target) {
return; // TODO: 实现更细粒度的权限验证
} // 例如检查用户是否有访问该服务器的权限
return true; // 暂时允许所有
}
// 4. 检查用户对该服务器的SSH会话数 /**
long activeSessions = auditLogService.countUserActiveSessionsForServer(userId, serverId); * 3. 获取最大并发连接数
log.info("用户当前对该服务器的SSH连接数: userId={}, serverId={}, serverName={}, current={}, max={}", */
userId, serverId, server.getServerName(), activeSessions, MAX_SESSIONS_PER_USER); @Override
protected int getMaxSessions() {
return MAX_SESSIONS_PER_SERVER;
}
if (activeSessions >= MAX_SESSIONS_PER_USER) { /**
log.warn("用户对该服务器的SSH会话数超过限制: userId={}, serverId={}, serverName={}, current={}, max={}", * 4. 获取WebSocket路径模式
userId, serverId, server.getServerName(), activeSessions, MAX_SESSIONS_PER_USER); */
sendError(session, "对服务器【" + server.getServerName() + "】的SSH连接数超过限制最多" + MAX_SESSIONS_PER_USER + "个)"); @Override
session.close(CloseStatus.POLICY_VIOLATION); protected String getPathPattern() {
return; return "/api/v1/server-ssh/connect/*";
} }
// 5. 权限校验预留实际项目中需要实现 /**
// TODO: 根据业务需求实现权限校验逻辑 * 5. 事件钩子审计日志等业务逻辑
// 例如检查用户是否是管理员或者服务器是否允许该用户访问 */
@Override
protected void onEvent(SSHEvent event, Map<String, Object> data) {
String sessionId = (String) data.get("sessionId");
SSHTarget target = (SSHTarget) data.get("target");
Long serverId = (Long) target.getMetadata();
// 6. 发送连接中状态 switch (event) {
sendStatus(session, SSHStatusEnum.CONNECTING); case AFTER_CONNECT:
// 创建审计日志
// 7. 建立SSH连接
SSHClient sshClient = createSSHConnection(server);
sshClients.put(sessionId, sshClient);
// 8. 打开Shell通道并分配PTY伪终端
Session sshSession = sshClient.startSession();
// 关键分配PTY启用交互式Shell回显提示符
// 参数终端类型, 列数, 行数, 宽度(像素), 高度(像素), 终端模式
sshSession.allocatePTY("xterm", 80, 24, 0, 0, java.util.Collections.emptyMap());
log.debug("PTY已分配: sessionId={}, termType=xterm, cols=80, rows=24", sessionId);
Session.Shell shell = sshSession.startShell();
log.debug("Shell已启动: sessionId={}", sessionId);
// 保存会话信息
webSocketSessions.put(sessionId, session);
sshShells.put(sessionId, shell);
// 9. 优化先启动输出监听线程确保不错过任何SSH输出
Future<?> stdoutTask = sshOutputExecutor.submit(() -> readSSHOutput(session, shell));
outputTasks.put(sessionId, stdoutTask);
// 同时启动错误流监听某些SSH服务器会将输出发送到错误流
Future<?> stderrTask = sshOutputExecutor.submit(() -> readSSHError(session, shell));
outputTasks.put(sessionId + "_stderr", stderrTask);
// 10. 发送连接成功状态
sendStatus(session, SSHStatusEnum.CONNECTED);
log.info("SSH连接建立成功: sessionId={}, userId={}, username={}, server={}@{}",
sessionId, userId, username, server.getSshUser(), server.getHostIp());
// 11. 异步创建审计日志不阻塞主线程
// 使用CompletableFuture异步执行避免数据库操作延迟影响SSH输出接收
CompletableFuture.runAsync(() -> {
try { try {
Long auditLogId = auditLogService.createAuditLog(userId, server, sessionId, clientIp, userAgent); Long userId = (Long) data.get("userId");
session.getAttributes().put("auditLogId", auditLogId); String clientIp = (String) data.get("clientIp");
log.info("SSH审计日志已创建: auditLogId={}, sessionId={}", auditLogId, sessionId); String userAgent = (String) data.get("userAgent");
} catch (Exception e) {
log.error("创建SSH审计日志失败: sessionId={}", sessionId, e);
}
});
} catch (Exception e) {
log.error("建立SSH连接失败: sessionId={}", sessionId, e);
sendError(session, "连接失败: " + e.getMessage());
// 记录失败的审计日志
try {
// 异步场景直接尝试创建审计日志有锁保护已存在则直接返回
// 无需检查 attributes因为异步任务可能还未完成
Long userId = (Long) session.getAttributes().get("userId");
String clientIp = (String) session.getAttributes().get("clientIp");
String userAgent = (String) session.getAttributes().get("userAgent");
Long serverId = extractServerId(session);
if (userId != null && serverId != null) {
Server server = serverService.findEntityById(serverId); Server server = serverService.findEntityById(serverId);
if (server != null) { if (server != null) {
// 先创建如果已存在则返回已有ID有锁保护不会重复 Long auditLogId = auditLogService.createAuditLog(
Long auditLogId = auditLogService.createAuditLog(userId, server, sessionId, clientIp, userAgent); userId, server, sessionId, clientIp, userAgent);
session.getAttributes().put("auditLogId", auditLogId); data.put("auditLogId", auditLogId);
// 再关闭 log.info("审计日志已创建: auditLogId={}, sessionId={}", auditLogId, sessionId);
auditLogService.closeAuditLog(sessionId, "FAILED", e.getMessage());
} }
} catch (Exception e) {
log.error("创建审计日志失败: sessionId={}", sessionId, e);
} }
} catch (Exception auditEx) { break;
log.error("记录失败审计日志异常", auditEx);
}
cleanupSession(sessionId); case AFTER_DISCONNECT:
session.close(CloseStatus.SERVER_ERROR); // 关闭审计日志
}
}
/**
* 接收前端消息
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String sessionId = session.getId();
try {
// 解析消息
SSHMessage sshMessage = JsonUtils.fromJson(message.getPayload(), SSHMessage.class);
if (sshMessage == null) {
log.warn("解析消息失败: sessionId={}", sessionId);
return;
}
if (sshMessage.getType() != SSHMessageTypeEnum.INPUT) {
log.warn("收到非input类型消息: sessionId={}, type={}", sessionId, sshMessage.getType());
return;
}
// 获取SSH Shell
Session.Shell shell = sshShells.get(sessionId);
if (shell == null) {
sendError(session, "SSH连接未建立");
return;
}
// 发送命令到SSH
String input = sshMessage.getData();
if (input != null) {
OutputStream outputStream = shell.getOutputStream();
outputStream.write(input.getBytes(StandardCharsets.UTF_8));
outputStream.flush();
// 记录命令到审计日志只记录有意义的命令过滤掉单个字符的按键
if (input.length() > 0) {
auditLogService.recordCommand(sessionId, input);
}
log.debug("发送命令到SSH: sessionId={}, length={}", sessionId, input.length());
}
} catch (Exception e) {
log.error("处理WebSocket消息失败: sessionId={}", sessionId, e);
sendError(session, "命令执行失败: " + e.getMessage());
}
}
/**
* WebSocket连接关闭时触发
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
String sessionId = session.getId();
log.info("WebSocket连接关闭: sessionId={}, status={}", sessionId, status);
// 关闭审计日志
try {
String auditStatus = status.getCode() == CloseStatus.NORMAL.getCode() ? "SUCCESS" : "INTERRUPTED";
auditLogService.closeAuditLog(sessionId, auditStatus, status.getReason());
} catch (Exception e) {
log.error("关闭审计日志失败: sessionId={}", sessionId, e);
}
// 清理资源
cleanupSession(sessionId);
}
/**
* WebSocket传输错误时触发
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
String sessionId = session.getId();
log.error("WebSocket传输错误: sessionId={}", sessionId, exception);
// 记录错误到审计日志
try {
auditLogService.closeAuditLog(sessionId, "FAILED", "传输错误: " + exception.getMessage());
} catch (Exception e) {
log.error("关闭审计日志失败: sessionId={}", sessionId, e);
}
sendError(session, "传输错误: " + exception.getMessage());
cleanupSession(sessionId);
session.close(CloseStatus.SERVER_ERROR);
}
/**
* 创建SSH连接
*/
private SSHClient createSSHConnection(Server server) throws IOException {
SSHClient sshClient = new SSHClient();
// 跳过主机密钥验证生产环境建议使用正式的验证方式
sshClient.addHostKeyVerifier(new PromiscuousVerifier());
// 设置超时
sshClient.setTimeout(30000);
sshClient.setConnectTimeout(30000);
// 连接服务器
sshClient.connect(server.getHostIp(), server.getSshPort());
// 认证
if (server.getAuthType() == AuthTypeEnum.PASSWORD) {
// 密码认证
sshClient.authPassword(server.getSshUser(), server.getSshPassword());
} else if (server.getAuthType() == AuthTypeEnum.KEY) {
// 密钥认证
KeyProvider keyProvider;
if (server.getSshPassphrase() != null && !server.getSshPassphrase().isEmpty()) {
keyProvider = sshClient.loadKeys(server.getSshPrivateKey(), null,
PasswordUtils.createOneOff(server.getSshPassphrase().toCharArray()));
} else {
keyProvider = sshClient.loadKeys(server.getSshPrivateKey(), null, null);
}
sshClient.authPublickey(server.getSshUser(), keyProvider);
} else {
throw new BusinessException(ResponseCode.INVALID_PARAM, new Object[]{"不支持的认证类型: " + server.getAuthType()});
}
return sshClient;
}
/**
* 读取SSH输出并发送到前端
*/
private void readSSHOutput(WebSocketSession session, Session.Shell shell) {
String sessionId = session.getId();
log.debug("开始监听SSH输出: sessionId={}", sessionId);
try {
InputStream inputStream = shell.getInputStream();
byte[] buffer = new byte[1024];
int len;
log.debug("SSH输出流已获取开始循环读取: sessionId={}", sessionId);
while (session.isOpen() && (len = inputStream.read(buffer)) > 0) {
String output = new String(buffer, 0, len, StandardCharsets.UTF_8);
log.debug("读取到SSH输出: sessionId={}, length={}, content={}",
sessionId, len, output.replaceAll("\\r", "\\\\r").replaceAll("\\n", "\\\\n"));
sendOutput(session, output);
log.debug("SSH输出已发送到前端: sessionId={}", sessionId);
}
log.debug("SSH输出监听结束: sessionId={}, session.isOpen={}", sessionId, session.isOpen());
} catch (java.io.InterruptedIOException e) {
// 线程被中断正常的清理过程检查是否是WebSocket关闭导致的
if (!session.isOpen()) {
log.debug("SSH输出监听线程被正常中断WebSocket已关闭: sessionId={}", sessionId);
} else {
log.error("SSH输出监听线程被异常中断: sessionId={}", sessionId, e);
// 只在session仍然打开时尝试发送错误消息
try { try {
sendError(session, "SSH连接被中断"); auditLogService.closeAuditLog(sessionId, "SUCCESS", "正常断开");
} catch (Exception ex) { log.info("审计日志已关闭: sessionId={}", sessionId);
log.debug("发送错误消息失败session可能已关闭: sessionId={}", sessionId); } catch (Exception e) {
log.error("关闭审计日志失败: sessionId={}", sessionId, e);
} }
} break;
} catch (IOException e) {
// 其他IO异常真正的错误 case ON_COMMAND:
if (session.isOpen()) { // 记录命令到审计日志
log.error("读取SSH输出失败: sessionId={}", sessionId, e);
try { try {
sendError(session, "读取SSH输出失败: " + e.getMessage()); String command = (String) data.get("command");
} catch (Exception ex) { if (command != null && command.length() > 0) {
log.debug("发送错误消息失败session可能已关闭: sessionId={}", sessionId); auditLogService.recordCommand(sessionId, command);
}
} catch (Exception e) {
log.error("记录命令失败: sessionId={}", sessionId, e);
} }
} else { break;
log.debug("读取SSH输出时发生IO异常但session已关闭正常: sessionId={}", sessionId);
} case ON_ERROR:
// 记录错误
try {
String error = (String) data.get("error");
auditLogService.closeAuditLog(sessionId, "FAILED", error);
} catch (Exception e) {
log.error("记录错误失败: sessionId={}", sessionId, e);
}
break;
case BEFORE_SHUTDOWN:
// 优雅下线前处理审计日志Framework统一调用
try {
String reason = (String) data.get("reason");
auditLogService.closeAuditLog(sessionId, "SERVER_SHUTDOWN", reason);
log.info("审计日志已更新(优雅下线): sessionId={}", sessionId);
} catch (Exception e) {
log.error("更新审计日志失败(优雅下线): sessionId={}", sessionId, e);
}
break;
} }
} }
/** // ========== 辅助方法 ==========
* 读取SSH错误流并发送到前端
* 某些SSH服务器可能将输出发送到标准错误流
*/
private void readSSHError(WebSocketSession session, Session.Shell shell) {
String sessionId = session.getId();
log.debug("开始监听SSH错误流: sessionId={}", sessionId);
try {
InputStream errorStream = shell.getErrorStream();
byte[] buffer = new byte[1024];
int len;
log.debug("SSH错误流已获取开始循环读取: sessionId={}", sessionId);
while (session.isOpen() && (len = errorStream.read(buffer)) > 0) {
String output = new String(buffer, 0, len, StandardCharsets.UTF_8);
log.debug("读取到SSH错误流输出: sessionId={}, length={}, content={}",
sessionId, len, output.replaceAll("\\r", "\\\\r").replaceAll("\\n", "\\\\n"));
sendOutput(session, output); // 错误流也作为output发送到前端
log.debug("SSH错误流输出已发送到前端: sessionId={}", sessionId);
}
log.debug("SSH错误流监听结束: sessionId={}", sessionId);
} catch (java.io.InterruptedIOException e) {
if (!session.isOpen()) {
log.debug("SSH错误流监听线程被正常中断WebSocket已关闭: sessionId={}", sessionId);
} else {
log.error("SSH错误流监听线程被异常中断: sessionId={}", sessionId, e);
}
} catch (IOException e) {
if (session.isOpen()) {
log.error("读取SSH错误流失败: sessionId={}", sessionId, e);
} else {
log.debug("读取SSH错误流时发生IO异常但session已关闭正常: sessionId={}", sessionId);
}
}
}
/**
* 清理会话资源
*/
private void cleanupSession(String sessionId) {
log.debug("清理会话资源: sessionId={}", sessionId);
// 移除WebSocketSession
webSocketSessions.remove(sessionId);
// 取消输出监听任务标准输出
Future<?> stdoutTask = outputTasks.remove(sessionId);
if (stdoutTask != null && !stdoutTask.isDone()) {
stdoutTask.cancel(true);
}
// 取消错误流监听任务
Future<?> stderrTask = outputTasks.remove(sessionId + "_stderr");
if (stderrTask != null && !stderrTask.isDone()) {
stderrTask.cancel(true);
}
// 关闭SSH Shell
Session.Shell shell = sshShells.remove(sessionId);
if (shell != null) {
try {
shell.close();
} catch (IOException e) {
log.warn("关闭SSH Shell失败: sessionId={}", sessionId, e);
}
}
// 关闭SSH连接
SSHClient sshClient = sshClients.remove(sessionId);
if (sshClient != null) {
try {
sshClient.disconnect();
} catch (IOException e) {
log.warn("关闭SSH连接失败: sessionId={}", sessionId, e);
}
}
}
/** /**
* 从WebSocket session URL中提取serverId * 从WebSocket session URL中提取serverId
@ -486,125 +199,4 @@ public class ServerSSHWebSocketHandler extends TextWebSocketHandler {
} }
return null; return null;
} }
/**
* 发送output类型消息到前端
*/
private void sendOutput(WebSocketSession session, String output) throws IOException {
if (!session.isOpen()) {
return; // session已关闭直接返回
}
SSHMessage message = SSHMessage.output(output);
String json = JsonUtils.toJson(message);
if (json != null) {
session.sendMessage(new TextMessage(json));
}
}
/**
* 发送error类型消息到前端
*/
private void sendError(WebSocketSession session, String errorMessage) throws IOException {
if (!session.isOpen()) {
return; // session已关闭直接返回
}
SSHMessage message = SSHMessage.error(errorMessage);
String json = JsonUtils.toJson(message);
if (json != null) {
session.sendMessage(new TextMessage(json));
}
}
/**
* 发送status类型消息到前端
*/
private void sendStatus(WebSocketSession session, SSHStatusEnum status) throws IOException {
if (!session.isOpen()) {
return; // session已关闭直接返回
}
SSHMessage message = SSHMessage.status(status);
String json = JsonUtils.toJson(message);
if (json != null) {
session.sendMessage(new TextMessage(json));
}
}
/**
* 优雅下线应用关闭时清理所有活跃的SSH会话
* 使用 @PreDestroy 注解确保在Spring容器销毁前执行
*/
@jakarta.annotation.PreDestroy
public void gracefulShutdown() {
log.warn("====== 应用准备关闭开始优雅下线所有SSH会话 ======");
log.warn("当前活跃SSH会话数: {}", webSocketSessions.size());
if (webSocketSessions.isEmpty()) {
log.info("没有活跃的SSH会话跳过优雅下线");
return;
}
// 记录开始时间
long startTime = System.currentTimeMillis();
int successCount = 0;
int failureCount = 0;
// 遍历所有活跃会话
for (Map.Entry<String, WebSocketSession> entry : webSocketSessions.entrySet()) {
String sessionId = entry.getKey();
WebSocketSession session = entry.getValue();
try {
log.info("关闭SSH会话: sessionId={}", sessionId);
// 1. 尝试向前端发送服务器下线通知
try {
if (session.isOpen()) {
sendError(session, "服务器正在重启,连接即将关闭");
// 给前端一点时间接收消息
Thread.sleep(100);
}
} catch (Exception e) {
log.debug("发送下线通知失败: sessionId={}", sessionId, e);
}
// 2. 更新审计日志最重要防止僵尸会话
try {
auditLogService.closeAuditLog(sessionId, "SERVER_SHUTDOWN", "服务器优雅下线");
log.info("审计日志已更新: sessionId={}", sessionId);
} catch (Exception e) {
log.error("更新审计日志失败: sessionId={}", sessionId, e);
}
// 3. 清理资源
cleanupSession(sessionId);
// 4. 关闭WebSocket连接
try {
if (session.isOpen()) {
session.close(new CloseStatus(1001, "服务器正在重启"));
}
} catch (Exception e) {
log.debug("关闭WebSocket失败: sessionId={}", sessionId, e);
}
successCount++;
log.info("SSH会话关闭成功: sessionId={}", sessionId);
} catch (Exception e) {
failureCount++;
log.error("关闭SSH会话失败: sessionId={}", sessionId, e);
}
}
// 清空所有缓存
webSocketSessions.clear();
sshClients.clear();
sshShells.clear();
outputTasks.clear();
long duration = System.currentTimeMillis() - startTime;
log.warn("====== 优雅下线完成 ======");
log.warn("总会话数: {}, 成功: {}, 失败: {}, 耗时: {}ms",
successCount + failureCount, successCount, failureCount, duration);
}
} }

View File

@ -531,9 +531,6 @@ public class JenkinsServiceIntegrationImpl extends BaseExternalSystemIntegration
); );
if (response.getStatusCode() == HttpStatus.OK && response.getBody() != null) { if (response.getStatusCode() == HttpStatus.OK && response.getBody() != null) {
// 打印原始响应用于调试
log.info("Jenkins build details raw response: {}", response.getBody());
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();
JenkinsBuildResponse buildResponse = mapper.readValue(response.getBody(), JenkinsBuildResponse.class); JenkinsBuildResponse buildResponse = mapper.readValue(response.getBody(), JenkinsBuildResponse.class);

View File

@ -1,7 +1,7 @@
package com.qqchen.deploy.backend.deploy.query; package com.qqchen.deploy.backend.deploy.query;
import com.qqchen.deploy.backend.deploy.enums.AuthTypeEnum; import com.qqchen.deploy.backend.framework.enums.AuthTypeEnum;
import com.qqchen.deploy.backend.deploy.enums.OsTypeEnum; import com.qqchen.deploy.backend.framework.enums.OsTypeEnum;
import com.qqchen.deploy.backend.deploy.enums.ServerStatusEnum; import com.qqchen.deploy.backend.deploy.enums.ServerStatusEnum;
import com.qqchen.deploy.backend.framework.annotation.QueryField; import com.qqchen.deploy.backend.framework.annotation.QueryField;
import com.qqchen.deploy.backend.framework.enums.QueryType; import com.qqchen.deploy.backend.framework.enums.QueryType;

View File

@ -1,6 +1,7 @@
package com.qqchen.deploy.backend.deploy.service; package com.qqchen.deploy.backend.deploy.service;
import com.qqchen.deploy.backend.deploy.dto.ServerDTO; import com.qqchen.deploy.backend.deploy.dto.ServerDTO;
import com.qqchen.deploy.backend.deploy.dto.ServerInfoDTO;
import com.qqchen.deploy.backend.deploy.dto.ServerInitializeDTO; import com.qqchen.deploy.backend.deploy.dto.ServerInitializeDTO;
import com.qqchen.deploy.backend.deploy.entity.Server; import com.qqchen.deploy.backend.deploy.entity.Server;
import com.qqchen.deploy.backend.deploy.query.ServerQuery; import com.qqchen.deploy.backend.deploy.query.ServerQuery;
@ -21,11 +22,12 @@ public interface IServerService extends IBaseService<Server, ServerDTO, ServerQu
ServerDTO initializeServerInfo(Long serverId, ServerInitializeDTO dto); ServerDTO initializeServerInfo(Long serverId, ServerInitializeDTO dto);
/** /**
* 测试服务器SSH连接 * 测试服务器SSH连接并获取服务器信息
* 通过SSH自动采集服务器硬件信息
* *
* @param serverId 服务器ID * @param serverId 服务器ID
* @return 是否连接成功 * @return 服务器详细信息包含连接状态和硬件信息
*/ */
boolean testConnection(Long serverId); ServerInfoDTO testConnection(Long serverId);
} }

View File

@ -43,6 +43,7 @@ import org.springframework.http.ResponseEntity;
import org.springframework.orm.ObjectOptimisticLockingFailureException; import org.springframework.orm.ObjectOptimisticLockingFailureException;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.LinkedMultiValueMap;
@ -112,7 +113,7 @@ public class JenkinsBuildServiceImpl extends BaseServiceImpl<JenkinsBuild, Jenki
private INotificationService notificationService; private INotificationService notificationService;
@Resource(name = "jenkinsTaskExecutor") @Resource(name = "jenkinsTaskExecutor")
private ThreadPoolTaskExecutor threadPoolTaskExecutor; private AsyncTaskExecutor threadPoolTaskExecutor;
@Resource @Resource
private SyncLockManager syncLockManager; private SyncLockManager syncLockManager;

View File

@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.orm.ObjectOptimisticLockingFailureException; import org.springframework.orm.ObjectOptimisticLockingFailureException;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
@ -64,7 +65,7 @@ public class RepositoryBranchServiceImpl extends BaseServiceImpl<RepositoryBranc
private IRepositorySyncHistoryService repositorySyncHistoryService; private IRepositorySyncHistoryService repositorySyncHistoryService;
@Resource(name = "repositoryBranchExecutor") @Resource(name = "repositoryBranchExecutor")
private ThreadPoolTaskExecutor executor; private AsyncTaskExecutor executor;
@Resource @Resource
private com.qqchen.deploy.backend.deploy.lock.SyncLockManager syncLockManager; private com.qqchen.deploy.backend.deploy.lock.SyncLockManager syncLockManager;

View File

@ -28,6 +28,7 @@ import org.springframework.data.domain.Pageable;
import org.springframework.orm.ObjectOptimisticLockingFailureException; import org.springframework.orm.ObjectOptimisticLockingFailureException;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.core.task.AsyncTaskExecutor;
import java.time.ZoneId; import java.time.ZoneId;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -71,7 +72,7 @@ public class RepositoryProjectServiceImpl extends BaseServiceImpl<RepositoryProj
private RepositoryProjectConverter repositoryProjectConverter; private RepositoryProjectConverter repositoryProjectConverter;
@Resource(name = "repositoryProjectExecutor") @Resource(name = "repositoryProjectExecutor")
private ThreadPoolTaskExecutor executor; private AsyncTaskExecutor executor;
@Resource @Resource
private IRepositorySyncHistoryService repositorySyncHistoryService; private IRepositorySyncHistoryService repositorySyncHistoryService;

View File

@ -1,10 +1,11 @@
package com.qqchen.deploy.backend.deploy.service.impl; package com.qqchen.deploy.backend.deploy.service.impl;
import com.qqchen.deploy.backend.deploy.dto.ServerDTO; import com.qqchen.deploy.backend.deploy.dto.ServerDTO;
import com.qqchen.deploy.backend.deploy.dto.ServerInfoDTO;
import com.qqchen.deploy.backend.deploy.dto.ServerInitializeDTO; import com.qqchen.deploy.backend.deploy.dto.ServerInitializeDTO;
import com.qqchen.deploy.backend.deploy.entity.Server; import com.qqchen.deploy.backend.deploy.entity.Server;
import com.qqchen.deploy.backend.deploy.enums.AuthTypeEnum;
import com.qqchen.deploy.backend.deploy.enums.ServerStatusEnum; import com.qqchen.deploy.backend.deploy.enums.ServerStatusEnum;
import com.qqchen.deploy.backend.framework.enums.AuthTypeEnum;
import com.qqchen.deploy.backend.deploy.query.ServerQuery; import com.qqchen.deploy.backend.deploy.query.ServerQuery;
import com.qqchen.deploy.backend.deploy.repository.IServerRepository; import com.qqchen.deploy.backend.deploy.repository.IServerRepository;
import com.qqchen.deploy.backend.deploy.service.IServerService; import com.qqchen.deploy.backend.deploy.service.IServerService;
@ -12,17 +13,15 @@ import com.qqchen.deploy.backend.framework.annotation.ServiceType;
import com.qqchen.deploy.backend.framework.enums.ResponseCode; import com.qqchen.deploy.backend.framework.enums.ResponseCode;
import com.qqchen.deploy.backend.framework.exception.BusinessException; import com.qqchen.deploy.backend.framework.exception.BusinessException;
import com.qqchen.deploy.backend.framework.service.impl.BaseServiceImpl; import com.qqchen.deploy.backend.framework.service.impl.BaseServiceImpl;
import com.qqchen.deploy.backend.framework.ssh.ISSHCommandService;
import com.qqchen.deploy.backend.framework.ssh.SSHCommandServiceFactory;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import net.schmizz.sshj.SSHClient; import net.schmizz.sshj.SSHClient;
import net.schmizz.sshj.transport.verification.PromiscuousVerifier;
import net.schmizz.sshj.userauth.keyprovider.KeyProvider;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import java.io.IOException;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;
/** /**
@ -37,6 +36,9 @@ public class ServerServiceImpl
private final IServerRepository serverRepository; private final IServerRepository serverRepository;
@Resource
private SSHCommandServiceFactory sshCommandServiceFactory;
public ServerServiceImpl(IServerRepository serverRepository) { public ServerServiceImpl(IServerRepository serverRepository) {
this.serverRepository = serverRepository; this.serverRepository = serverRepository;
} }
@ -83,108 +85,126 @@ public class ServerServiceImpl
@Override @Override
@Transactional @Transactional
public boolean testConnection(Long serverId) { public ServerInfoDTO testConnection(Long serverId) {
long startTime = System.currentTimeMillis();
// 1. 查询服务器信息 // 1. 查询服务器信息
Server server = serverRepository.findById(serverId) Server server = serverRepository.findById(serverId)
.orElseThrow(() -> new BusinessException(ResponseCode.DATA_NOT_FOUND)); .orElseThrow(() -> new BusinessException(ResponseCode.DATA_NOT_FOUND));
// 2. 验证必要字段 // 2. 验证必要字段
if (server.getHostIp() == null || server.getSshUser() == null) { if (server.getHostIp() == null || server.getSshUser() == null) {
throw new BusinessException(ResponseCode.INVALID_PARAM, new Object[]{"服务器IP和SSH用户名不能为空"}); throw new BusinessException(ResponseCode.INVALID_PARAM,
new Object[]{"服务器IP和SSH用户名不能为空"});
} }
SSHClient ssh = new SSHClient(); if (server.getOsType() == null) {
throw new BusinessException(ResponseCode.INVALID_PARAM,
new Object[]{"请先选择操作系统类型"});
}
ServerInfoDTO info = new ServerInfoDTO();
SSHClient sshClient = null;
ISSHCommandService sshService = null;
try { try {
// 3. 配置SSH客户端 // 3. 获取对应OS的SSH命令服务
// TODO: 安全改进生产环境应使用更安全的主机密钥验证方式 sshService = sshCommandServiceFactory.getService(server.getOsType());
// 当前使用 PromiscuousVerifier 跳过主机密钥验证不验证服务器身份存在中间人攻击风险 log.info("使用{}服务测试连接: {}@{}:{} [认证方式: {}]",
// 建议改为 server.getOsType(), server.getSshUser(), server.getHostIp(),
// 1. ssh.loadKnownHosts() - 使用 ~/.ssh/known_hosts 文件验证 server.getSshPort(), server.getAuthType());
// 2. ssh.addHostKeyVerifier(new ConsoleVerifyingHostKeyVerifier()) - 首次自动接受后续验证
// 3. 在数据库中存储服务器指纹首次连接时记录后续验证
ssh.addHostKeyVerifier(new PromiscuousVerifier());
ssh.setTimeout(10000); // 10秒超时
ssh.setConnectTimeout(10000);
// 4. 连接服务器 // 4. 根据认证类型创建SSH连接
int port = server.getSshPort() != null ? server.getSshPort() : 22; if (server.getAuthType() == null) {
log.info("尝试连接服务器: {}@{}:{}", server.getSshUser(), server.getHostIp(), port); throw new BusinessException(ResponseCode.INVALID_PARAM,
ssh.connect(server.getHostIp(), port); new Object[]{"请选择认证方式"});
// 5. 根据认证方式进行认证
if (server.getAuthType() == AuthTypeEnum.KEY) {
// 密钥认证
if (server.getSshPrivateKey() == null || server.getSshPrivateKey().isEmpty()) {
throw new BusinessException(ResponseCode.INVALID_PARAM, new Object[]{"SSH私钥不能为空"});
}
KeyProvider keyProvider;
if (server.getSshPassphrase() != null && !server.getSshPassphrase().isEmpty()) {
// 私钥有密码保护
keyProvider = ssh.loadKeys(server.getSshPrivateKey(), null,
net.schmizz.sshj.userauth.password.PasswordUtils.createOneOff(server.getSshPassphrase().toCharArray()));
} else {
// 私钥无密码保护
keyProvider = ssh.loadKeys(server.getSshPrivateKey(), null, null);
}
ssh.authPublickey(server.getSshUser(), keyProvider);
log.info("使用密钥认证成功");
} else {
// 密码认证
if (server.getSshPassword() == null || server.getSshPassword().isEmpty()) {
throw new BusinessException(ResponseCode.INVALID_PARAM, new Object[]{"SSH密码不能为空"});
}
ssh.authPassword(server.getSshUser(), server.getSshPassword());
log.info("使用密码认证成功");
} }
// 6. 测试执行简单命令 // 根据认证类型传递对应的参数
try (var session = ssh.startSession()) { String password = null;
session.allocateDefaultPTY(); String privateKey = null;
var cmd = session.exec("echo 'Connection test successful'"); String passphrase = null;
cmd.join(5, TimeUnit.SECONDS);
if (cmd.getExitStatus() == 0) { switch (server.getAuthType()) {
log.info("SSH连接测试成功: {}", server.getHostIp()); case PASSWORD:
// 密码认证只传密码
if (server.getSshPassword() == null || server.getSshPassword().isEmpty()) {
throw new BusinessException(ResponseCode.INVALID_PARAM,
new Object[]{"SSH密码不能为空"});
}
password = server.getSshPassword();
break;
// 7. 更新服务器状态和最后连接时间 case KEY:
server.setStatus(ServerStatusEnum.ONLINE); // 密钥认证只传密钥和密码短语
server.setLastConnectTime(LocalDateTime.now()); if (server.getSshPrivateKey() == null || server.getSshPrivateKey().isEmpty()) {
serverRepository.save(server); throw new BusinessException(ResponseCode.INVALID_PARAM,
log.info("已更新服务器状态为ONLINE: serverId={}", serverId); new Object[]{"SSH私钥不能为空"});
}
privateKey = server.getSshPrivateKey();
passphrase = server.getSshPassphrase();
break;
return true; default:
} else { throw new BusinessException(ResponseCode.INVALID_PARAM,
log.warn("SSH连接测试失败命令执行异常: {}", server.getHostIp()); new Object[]{"不支持的认证类型: " + server.getAuthType()});
// 8. 更新服务器状态为离线
server.setStatus(ServerStatusEnum.OFFLINE);
serverRepository.save(server);
return false;
}
} }
} catch (IOException e) { sshClient = sshService.createConnection(
log.error("SSH连接测试失败: {} - {}", server.getHostIp(), e.getMessage()); server.getHostIp(),
server.getSshPort(),
server.getSshUser(),
password,
privateKey,
passphrase
);
// 9. 连接失败更新服务器状态为离线 // 5. 采集服务器信息
info.setConnected(true);
info.setHostname(sshService.getHostname(sshClient));
info.setOsVersion(sshService.getOsVersion(sshClient));
info.setCpuCores(sshService.getCpuCores(sshClient));
info.setMemorySize(sshService.getMemorySize(sshClient));
info.setDiskSize(sshService.getDiskSize(sshClient));
log.info("服务器信息采集成功: serverId={}, hostname={}, cpu={}核, mem={}GB, disk={}GB",
serverId, info.getHostname(), info.getCpuCores(), info.getMemorySize(), info.getDiskSize());
// 6. 更新服务器信息到数据库
server.setStatus(ServerStatusEnum.ONLINE);
server.setLastConnectTime(LocalDateTime.now());
server.setHostname(info.getHostname());
server.setOsVersion(info.getOsVersion());
server.setCpuCores(info.getCpuCores());
server.setMemorySize(info.getMemorySize());
server.setDiskSize(info.getDiskSize());
serverRepository.save(server);
log.info("服务器状态已更新为ONLINE: serverId={}", serverId);
} catch (Exception e) {
log.error("测试连接失败: serverId={}, error={}", serverId, e.getMessage(), e);
// 连接失败设置错误信息
info.setConnected(false);
info.setErrorMessage(e.getMessage());
// 更新服务器状态为离线
server.setStatus(ServerStatusEnum.OFFLINE); server.setStatus(ServerStatusEnum.OFFLINE);
serverRepository.save(server); serverRepository.save(server);
throw new BusinessException(ResponseCode.ERROR, new Object[]{"SSH连接失败: " + e.getMessage()});
} finally { } finally {
// 10. 关闭连接 // 7. 关闭SSH连接
try { if (sshService != null) {
if (ssh.isConnected()) { sshService.closeConnection(sshClient);
ssh.disconnect();
}
} catch (IOException e) {
log.error("关闭SSH连接失败", e);
} }
// 8. 设置响应时间
info.setResponseTime(System.currentTimeMillis() - startTime);
info.setConnectTime(LocalDateTime.now());
} }
return info;
} }
} }

View File

@ -0,0 +1,16 @@
package com.qqchen.deploy.backend.framework.enums;
/**
* SSH认证方式枚举Framework层
*/
public enum AuthTypeEnum {
/**
* 密码认证
*/
PASSWORD,
/**
* 密钥认证
*/
KEY
}

View File

@ -1,10 +1,10 @@
package com.qqchen.deploy.backend.deploy.enums; package com.qqchen.deploy.backend.framework.enums;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;
/** /**
* 操作系统类型枚举 * 操作系统类型枚举Framework层
*/ */
@Getter @Getter
@AllArgsConstructor @AllArgsConstructor
@ -19,4 +19,3 @@ public enum OsTypeEnum {
private final String code; private final String code;
private final String description; private final String description;
} }

View File

@ -0,0 +1,74 @@
package com.qqchen.deploy.backend.framework.enums;
/**
* SSH断开连接原因枚举Framework层
*
* 用于标识SSH连接断开的具体原因便于审计和追踪
*/
public enum SSHDisconnectReason {
/**
* 正常关闭用户主动关闭
*/
NORMAL_CLOSE("NORMAL_CLOSE", "正常关闭"),
/**
* 服务器优雅下线
*/
SERVER_SHUTDOWN("SERVER_SHUTDOWN", "服务器关闭"),
/**
* 客户端断开连接
*/
CLIENT_DISCONNECT("CLIENT_DISCONNECT", "客户端断开"),
/**
* 网络错误
*/
NETWORK_ERROR("NETWORK_ERROR", "网络错误"),
/**
* 会话超时
*/
SESSION_TIMEOUT("SESSION_TIMEOUT", "会话超时"),
/**
* SSH连接失败
*/
CONNECTION_FAILED("CONNECTION_FAILED", "连接失败"),
/**
* 认证失败
*/
AUTH_FAILED("AUTH_FAILED", "认证失败"),
/**
* 并发连接数超限
*/
CONCURRENT_LIMIT("CONCURRENT_LIMIT", "并发限制"),
/**
* 传输错误
*/
TRANSPORT_ERROR("TRANSPORT_ERROR", "传输错误"),
/**
* 未知原因
*/
UNKNOWN("UNKNOWN", "未知原因");
private final String code;
private final String description;
SSHDisconnectReason(String code, String description) {
this.code = code;
this.description = description;
}
public String getCode() {
return code;
}
public String getDescription() {
return description;
}
}

View File

@ -0,0 +1,41 @@
package com.qqchen.deploy.backend.framework.enums;
/**
* SSH事件枚举Framework层
*/
public enum SSHEvent {
/**
* 连接前
*/
BEFORE_CONNECT,
/**
* 连接后
*/
AFTER_CONNECT,
/**
* 断开前
*/
BEFORE_DISCONNECT,
/**
* 断开后
*/
AFTER_DISCONNECT,
/**
* 命令输入
*/
ON_COMMAND,
/**
* 错误
*/
ON_ERROR,
/**
* 优雅下线前
*/
BEFORE_SHUTDOWN
}

View File

@ -0,0 +1,45 @@
package com.qqchen.deploy.backend.framework.enums;
import com.fasterxml.jackson.annotation.JsonValue;
/**
* SSH WebSocket消息类型枚举Framework层
*
* 用于标识不同类型的SSH WebSocket消息
*/
public enum SSHMessageType {
/**
* 用户输入前端 后端
*/
INPUT("input"),
/**
* 终端输出后端 前端
*/
OUTPUT("output"),
/**
* 连接状态后端 前端
*/
STATUS("status"),
/**
* 错误信息后端 前端
*/
ERROR("error");
private final String value;
SSHMessageType(String value) {
this.value = value;
}
/**
* 序列化时输出小写字符串
* 例如OUTPUT "output"
*/
@JsonValue
public String getValue() {
return value;
}
}

View File

@ -0,0 +1,31 @@
package com.qqchen.deploy.backend.framework.enums;
/**
* SSH连接状态枚举Framework层
*/
public enum SSHStatusEnum {
/**
* 连接中首次连接
*/
CONNECTING,
/**
* 已连接
*/
CONNECTED,
/**
* 重新连接中网络闪断手动重连
*/
RECONNECTING,
/**
* 已断开
*/
DISCONNECTED,
/**
* 错误
*/
ERROR
}

View File

@ -0,0 +1,26 @@
package com.qqchen.deploy.backend.framework.enums;
/**
* SSH连接目标类型枚举Framework层
*/
public enum SSHTargetType {
/**
* 服务器
*/
SERVER,
/**
* Kubernetes Pod
*/
POD,
/**
* Docker容器
*/
CONTAINER,
/**
* 其他自定义类型
*/
CUSTOM
}

View File

@ -0,0 +1,139 @@
package com.qqchen.deploy.backend.framework.ssh;
import com.qqchen.deploy.backend.framework.enums.ResponseCode;
import com.qqchen.deploy.backend.framework.exception.BusinessException;
import lombok.extern.slf4j.Slf4j;
import net.schmizz.sshj.SSHClient;
import net.schmizz.sshj.connection.channel.direct.Session;
import net.schmizz.sshj.transport.verification.PromiscuousVerifier;
import net.schmizz.sshj.userauth.keyprovider.KeyProvider;
import net.schmizz.sshj.userauth.password.PasswordUtils;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* SSH命令服务抽象基类Framework层
* 提供通用的SSH连接和命令执行能力
*
* 子类只需实现OS特定的命令逻辑
*/
@Slf4j
public abstract class AbstractSSHCommandService implements ISSHCommandService {
private static final int DEFAULT_TIMEOUT_MS = 10000;
private static final int COMMAND_TIMEOUT_SECONDS = 5;
@Override
public SSHClient createConnection(String host, Integer port, String username,
String password, String privateKey, String passphrase) throws Exception {
SSHClient sshClient = new SSHClient();
try {
// 配置SSH客户端
// TODO: 安全改进生产环境应使用更安全的主机密钥验证方式
sshClient.addHostKeyVerifier(new PromiscuousVerifier());
sshClient.setTimeout(DEFAULT_TIMEOUT_MS);
sshClient.setConnectTimeout(DEFAULT_TIMEOUT_MS);
// 连接服务器
int sshPort = port != null ? port : 22;
log.debug("连接SSH服务器: {}:{}", host, sshPort);
sshClient.connect(host, sshPort);
// 认证
if (privateKey != null && !privateKey.isEmpty()) {
// 密钥认证
KeyProvider keyProvider;
if (passphrase != null && !passphrase.isEmpty()) {
keyProvider = sshClient.loadKeys(privateKey, null,
PasswordUtils.createOneOff(passphrase.toCharArray()));
} else {
keyProvider = sshClient.loadKeys(privateKey, null, null);
}
sshClient.authPublickey(username, keyProvider);
log.debug("使用密钥认证成功: {}", username);
} else if (password != null && !password.isEmpty()) {
// 密码认证
sshClient.authPassword(username, password);
log.debug("使用密码认证成功: {}", username);
} else {
throw new BusinessException(ResponseCode.INVALID_PARAM,
new Object[]{"密码和私钥不能同时为空"});
}
return sshClient;
} catch (Exception e) {
// 连接失败关闭客户端
closeConnection(sshClient);
log.error("SSH连接失败: {}@{}:{}", username, host, port, e);
throw e;
}
}
@Override
public String executeCommand(SSHClient sshClient, String command) throws Exception {
try (Session session = sshClient.startSession()) {
Session.Command cmd = session.exec(command);
cmd.join(COMMAND_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (cmd.getExitStatus() == 0) {
String output = new String(cmd.getInputStream().readAllBytes()).trim();
log.debug("命令执行成功: {} -> {}", command, output);
return output;
} else {
String error = new String(cmd.getErrorStream().readAllBytes()).trim();
log.warn("命令执行失败: {} -> {}", command, error);
throw new BusinessException(ResponseCode.ERROR,
new Object[]{"命令执行失败: " + error});
}
} catch (IOException e) {
log.error("执行SSH命令异常: {}", command, e);
throw new BusinessException(ResponseCode.ERROR,
new Object[]{"执行SSH命令异常: " + e.getMessage()});
}
}
@Override
public void closeConnection(SSHClient sshClient) {
if (sshClient != null) {
try {
if (sshClient.isConnected()) {
sshClient.disconnect();
log.debug("SSH连接已关闭");
}
} catch (IOException e) {
log.error("关闭SSH连接失败", e);
}
}
}
/**
* 安全执行命令子类使用
* 出错时返回null而不是抛异常
*/
protected String safeExecute(SSHClient sshClient, String command) {
try {
return executeCommand(sshClient, command);
} catch (Exception e) {
log.warn("执行命令失败: {}", command, e);
return null;
}
}
/**
* 解析整数结果
*/
protected Integer parseInteger(String value) {
if (value == null || value.isEmpty()) {
return null;
}
try {
return Integer.parseInt(value.trim());
} catch (NumberFormatException e) {
log.warn("解析整数失败: {}", value);
return null;
}
}
}

View File

@ -0,0 +1,90 @@
package com.qqchen.deploy.backend.framework.ssh;
import com.qqchen.deploy.backend.framework.enums.OsTypeEnum;
import net.schmizz.sshj.SSHClient;
/**
* SSH命令服务接口Framework层
* 封装了SSH连接和OS命令执行能力
*/
public interface ISSHCommandService {
/**
* 创建SSH连接
*
* @param host IP地址
* @param port SSH端口
* @param username 用户名
* @param password 密码密码认证
* @param privateKey 私钥密钥认证
* @param passphrase 私钥密码
* @return SSH客户端
* @throws Exception 连接失败时抛出
*/
SSHClient createConnection(String host, Integer port, String username,
String password, String privateKey, String passphrase) throws Exception;
/**
* 执行命令并返回结果
*
* @param sshClient SSH客户端
* @param command 要执行的命令
* @return 命令输出结果
* @throws Exception 执行失败时抛出
*/
String executeCommand(SSHClient sshClient, String command) throws Exception;
/**
* 关闭SSH连接
*
* @param sshClient SSH客户端
*/
void closeConnection(SSHClient sshClient);
/**
* 获取主机名
*
* @param sshClient SSH客户端
* @return 主机名
*/
String getHostname(SSHClient sshClient);
/**
* 获取操作系统版本
*
* @param sshClient SSH客户端
* @return 操作系统版本
*/
String getOsVersion(SSHClient sshClient);
/**
* 获取CPU核心数
*
* @param sshClient SSH客户端
* @return CPU核心数
*/
Integer getCpuCores(SSHClient sshClient);
/**
* 获取内存大小(GB)
*
* @param sshClient SSH客户端
* @return 内存大小
*/
Integer getMemorySize(SSHClient sshClient);
/**
* 获取磁盘大小(GB)
*
* @param sshClient SSH客户端
* @return 磁盘大小
*/
Integer getDiskSize(SSHClient sshClient);
/**
* 获取支持的操作系统类型
*
* @return OS类型枚举
*/
OsTypeEnum getSupportedOsType();
}

View File

@ -0,0 +1,78 @@
package com.qqchen.deploy.backend.framework.ssh;
import com.qqchen.deploy.backend.framework.enums.OsTypeEnum;
import com.qqchen.deploy.backend.framework.enums.ResponseCode;
import com.qqchen.deploy.backend.framework.exception.BusinessException;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* SSH命令服务工厂Framework层
* 通过反射自动注册所有SSH命令服务实现
*/
@Slf4j
@Component
public class SSHCommandServiceFactory {
@Autowired(required = false)
private List<ISSHCommandService> sshCommandServices;
private final Map<OsTypeEnum, ISSHCommandService> serviceMap = new HashMap<>();
@PostConstruct
public void init() {
if (sshCommandServices == null || sshCommandServices.isEmpty()) {
log.warn("未发现任何SSH命令服务实现");
return;
}
for (ISSHCommandService service : sshCommandServices) {
OsTypeEnum osType = service.getSupportedOsType();
serviceMap.put(osType, service);
log.info("注册SSH命令服务: {} -> {}", osType, service.getClass().getSimpleName());
}
log.info("SSH命令服务工厂初始化完成共注册{}个服务", serviceMap.size());
}
/**
* 根据操作系统类型获取对应的SSH命令服务
*
* @param osType 操作系统类型
* @return SSH命令服务实现
* @throws BusinessException 不支持的OS类型时抛出
*/
public ISSHCommandService getService(OsTypeEnum osType) {
ISSHCommandService service = serviceMap.get(osType);
if (service == null) {
throw new BusinessException(ResponseCode.INVALID_PARAM,
new Object[]{"不支持的操作系统类型: " + osType});
}
return service;
}
/**
* 判断是否支持某个操作系统
*
* @param osType 操作系统类型
* @return 是否支持
*/
public boolean isSupported(OsTypeEnum osType) {
return serviceMap.containsKey(osType);
}
/**
* 获取所有支持的操作系统类型
*
* @return 支持的OS类型集合
*/
public java.util.Set<OsTypeEnum> getSupportedOsTypes() {
return serviceMap.keySet();
}
}

View File

@ -0,0 +1,54 @@
package com.qqchen.deploy.backend.framework.ssh.impl;
import com.qqchen.deploy.backend.framework.enums.OsTypeEnum;
import com.qqchen.deploy.backend.framework.ssh.AbstractSSHCommandService;
import lombok.extern.slf4j.Slf4j;
import net.schmizz.sshj.SSHClient;
import org.springframework.stereotype.Service;
/**
* Linux SSH命令服务实现Framework层
*/
@Slf4j
@Service
public class LinuxSSHCommandServiceImpl extends AbstractSSHCommandService {
@Override
public String getHostname(SSHClient sshClient) {
return safeExecute(sshClient, "hostname");
}
@Override
public String getOsVersion(SSHClient sshClient) {
// 优先使用 /etc/os-releasefallback lsb_release
String command = "cat /etc/os-release 2>/dev/null | grep PRETTY_NAME | cut -d'\"' -f2 || " +
"lsb_release -d 2>/dev/null | cut -f2 || " +
"cat /etc/issue | head -1";
return safeExecute(sshClient, command);
}
@Override
public Integer getCpuCores(SSHClient sshClient) {
String result = safeExecute(sshClient, "nproc");
return parseInteger(result);
}
@Override
public Integer getMemorySize(SSHClient sshClient) {
// 使用 free -g 获取GB单位的内存
String result = safeExecute(sshClient, "free -g | grep Mem | awk '{print $2}'");
return parseInteger(result);
}
@Override
public Integer getDiskSize(SSHClient sshClient) {
// 获取根分区的磁盘大小GB
String result = safeExecute(sshClient, "df -BG / | tail -1 | awk '{print $2}' | sed 's/G//'");
return parseInteger(result);
}
@Override
public OsTypeEnum getSupportedOsType() {
return OsTypeEnum.LINUX;
}
}

View File

@ -0,0 +1,55 @@
package com.qqchen.deploy.backend.framework.ssh.impl;
import com.qqchen.deploy.backend.framework.enums.OsTypeEnum;
import com.qqchen.deploy.backend.framework.ssh.AbstractSSHCommandService;
import lombok.extern.slf4j.Slf4j;
import net.schmizz.sshj.SSHClient;
import org.springframework.stereotype.Service;
/**
* MacOS SSH命令服务实现Framework层
*/
@Slf4j
@Service
public class MacOSSSHCommandServiceImpl extends AbstractSSHCommandService {
@Override
public String getHostname(SSHClient sshClient) {
return safeExecute(sshClient, "hostname");
}
@Override
public String getOsVersion(SSHClient sshClient) {
// 获取MacOS版本信息
String command = "sw_vers -productName && sw_vers -productVersion";
return safeExecute(sshClient, command);
}
@Override
public Integer getCpuCores(SSHClient sshClient) {
// 使用sysctl获取CPU核心数
String result = safeExecute(sshClient, "sysctl -n hw.ncpu");
return parseInteger(result);
}
@Override
public Integer getMemorySize(SSHClient sshClient) {
// 获取内存大小GB
String command = "sysctl -n hw.memsize | awk '{print int($1/1024/1024/1024)}'";
String result = safeExecute(sshClient, command);
return parseInteger(result);
}
@Override
public Integer getDiskSize(SSHClient sshClient) {
// 获取根分区磁盘大小GB
String command = "df -g / | tail -1 | awk '{print $2}'";
String result = safeExecute(sshClient, command);
return parseInteger(result);
}
@Override
public OsTypeEnum getSupportedOsType() {
return OsTypeEnum.MACOS;
}
}

View File

@ -0,0 +1,57 @@
package com.qqchen.deploy.backend.framework.ssh.impl;
import com.qqchen.deploy.backend.framework.enums.OsTypeEnum;
import com.qqchen.deploy.backend.framework.ssh.AbstractSSHCommandService;
import lombok.extern.slf4j.Slf4j;
import net.schmizz.sshj.SSHClient;
import org.springframework.stereotype.Service;
/**
* Windows SSH命令服务实现Framework层
* 需要Windows服务器安装OpenSSH Server
*/
@Slf4j
@Service
public class WindowsSSHCommandServiceImpl extends AbstractSSHCommandService {
@Override
public String getHostname(SSHClient sshClient) {
return safeExecute(sshClient, "hostname");
}
@Override
public String getOsVersion(SSHClient sshClient) {
// Windows版本信息
return safeExecute(sshClient, "ver");
}
@Override
public Integer getCpuCores(SSHClient sshClient) {
// 使用环境变量获取CPU核心数
String result = safeExecute(sshClient, "echo %NUMBER_OF_PROCESSORS%");
return parseInteger(result);
}
@Override
public Integer getMemorySize(SSHClient sshClient) {
// 使用PowerShell获取内存大小GB
String command = "powershell \"(Get-CimInstance Win32_PhysicalMemory | " +
"Measure-Object -Property capacity -Sum).sum /1gb\"";
String result = safeExecute(sshClient, command);
return parseInteger(result);
}
@Override
public Integer getDiskSize(SSHClient sshClient) {
// 使用PowerShell获取C盘大小GB
String command = "powershell \"(Get-PSDrive C | " +
"Select-Object @{N='Size';E={[math]::Round($_.Free/1GB + $_.Used/1GB,0)}}).Size\"";
String result = safeExecute(sshClient, command);
return parseInteger(result);
}
@Override
public OsTypeEnum getSupportedOsType() {
return OsTypeEnum.WINDOWS;
}
}

View File

@ -0,0 +1,739 @@
package com.qqchen.deploy.backend.framework.ssh.websocket;
import com.qqchen.deploy.backend.framework.enums.AuthTypeEnum;
import com.qqchen.deploy.backend.framework.enums.ResponseCode;
import com.qqchen.deploy.backend.framework.enums.SSHDisconnectReason;
import com.qqchen.deploy.backend.framework.enums.SSHEvent;
import com.qqchen.deploy.backend.framework.enums.SSHMessageType;
import com.qqchen.deploy.backend.framework.enums.SSHStatusEnum;
import com.qqchen.deploy.backend.framework.enums.SSHTargetType;
import com.qqchen.deploy.backend.framework.exception.BusinessException;
import com.qqchen.deploy.backend.framework.ssh.ISSHCommandService;
import com.qqchen.deploy.backend.framework.ssh.SSHCommandServiceFactory;
import com.qqchen.deploy.backend.framework.utils.JsonUtils;
import com.qqchen.deploy.backend.framework.utils.SessionIdGenerator;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import net.schmizz.sshj.SSHClient;
import net.schmizz.sshj.connection.channel.direct.Session;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
/**
* SSH WebSocket 抽象处理器Framework层
* 提供通用的SSH WebSocket能力
*
* 子类需要实现5个方法
* 1. getSSHTarget(session) - 获取连接目标信息
* 2. checkPermission(userId, target) - 权限验证
* 3. getMaxSessions() - 最大并发连接数
* 4. getPathPattern() - WebSocket路径模式
* 5. onEvent(event, data) - 事件钩子可选
*
* @author Framework
*/
@Slf4j
public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler {
// ========== 会话存储 ==========
protected final Map<String, WebSocketSession> webSocketSessions = new ConcurrentHashMap<>();
protected final Map<String, SSHClient> sshClients = new ConcurrentHashMap<>();
protected final Map<String, Session.Shell> sshShells = new ConcurrentHashMap<>();
protected final Map<String, Future<?>> outputTasks = new ConcurrentHashMap<>();
// ========== 依赖注入 ==========
/**
* SSH命令服务工厂由子类注入
*/
private final SSHCommandServiceFactory sshCommandServiceFactory;
/**
* SSH输出监听线程池Framework自动注入
*/
@Resource(name = "sshOutputExecutor")
private AsyncTaskExecutor asyncTaskExecutor;
/**
* SSH会话管理器由子类注入
*/
private final SSHSessionManager sessionManager;
/**
* 会话目标信息存储sessionId SSHTarget
* 用于在事件中访问连接目标信息
*/
private final Map<String, SSHTarget> sessionTargets = new ConcurrentHashMap<>();
/**
* 构造函数子类必须调用
*
* @param sshCommandServiceFactory SSH命令服务工厂
* @param sessionManager SSH会话管理器
*/
protected AbstractSSHWebSocketHandler(
SSHCommandServiceFactory sshCommandServiceFactory,
SSHSessionManager sessionManager) {
this.sshCommandServiceFactory = sshCommandServiceFactory;
this.sessionManager = sessionManager;
}
// ========== 子类必须实现的抽象方法 ==========
/**
* 获取SSH连接目标信息由子类实现
*
* @param session WebSocket会话
* @return SSH目标信息
* @throws Exception 获取失败时抛出
*/
protected abstract SSHTarget getSSHTarget(WebSocketSession session) throws Exception;
/**
* 检查用户权限由子类实现
*
* @param userId 用户ID
* @param target SSH目标
* @return 是否有权限
*/
protected abstract boolean checkPermission(Long userId, SSHTarget target);
/**
* 获取最大并发连接数由子类实现
*
* @return 最大连接数
*/
protected abstract int getMaxSessions();
/**
* 获取WebSocket路径模式由子类实现
*
* @return 路径模式 "/api/v1/server-ssh/connect/*"
*/
protected abstract String getPathPattern();
/**
* 事件钩子可选子类可以重写
* 用于审计日志统计等业务逻辑
*
* @param event 事件类型
* @param eventData 事件数据强类型
*/
protected void onEvent(SSHEvent event, SSHEventData eventData) {
// 默认空实现
}
/**
* 从WebSocketSession中获取SSH SessionId
*
* @param session WebSocketSession
* @return SSH SessionId
*/
private String getSessionId(WebSocketSession session) {
String sessionId = (String) session.getAttributes().get("sshSessionId");
if (sessionId == null) {
// 兼容处理如果没有sshSessionId使用WebSocket原始ID
sessionId = session.getId();
}
return sessionId;
}
// ========== Framework 提供的核心能力 ==========
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// 关键生成线程安全的唯一SessionId避免并发冲突
// 使用增强型SessionIdWebSocket原始ID + 时间戳 + 序列号
String sessionId = SessionIdGenerator.enhanceWebSocketSessionId(session.getId());
// 将生成的sessionId保存到WebSocketSession attributes中
session.getAttributes().put("sshSessionId", sessionId);
log.info("WebSocket连接建立: webSocketId={}, sshSessionId={}", session.getId(), sessionId);
try {
// 1. 获取用户信息
Long userId = (Long) session.getAttributes().get("userId");
String username = (String) session.getAttributes().get("username");
String clientIp = (String) session.getAttributes().get("clientIp");
String userAgent = (String) session.getAttributes().get("userAgent");
if (userId == null) {
log.error("无法获取用户信息: sessionId={}", sessionId);
sendError(session, "认证失败");
session.close(CloseStatus.POLICY_VIOLATION);
return;
}
// 2. 获取连接目标
SSHTarget target = getSSHTarget(session);
validateSSHTarget(target);
// 保存target信息供后续事件使用
sessionTargets.put(sessionId, target);
log.info("获取SSH目标成功: targetType={}, target={}", target.getTargetType(), target.getMetadata());
// 3. 触发连接前事件
SSHEventData eventData = SSHEventData.builder()
.sessionId(sessionId)
.userId(userId)
.username(username)
.target(target)
.clientIp(clientIp)
.userAgent(userAgent)
.build();
onEvent(SSHEvent.BEFORE_CONNECT, eventData);
// 4. 权限验证
if (!checkPermission(userId, target)) {
log.warn("用户无权访问目标: userId={}, target={}:{}",
userId, target.getTargetType(), target.getMetadata());
sendError(session, "无权访问此目标");
session.close(CloseStatus.POLICY_VIOLATION);
return;
}
// 5. 提前注册会话解决并发竞态问题
sessionManager.registerSession(sessionId, userId, target.getTargetType(), target.getMetadata());
boolean sessionRegistered = true;
try {
// 6. 检查并发连接数注册后立即检查
long activeCount = sessionManager.countActiveSessions(userId, target.getTargetType(), target.getMetadata());
if (activeCount > getMaxSessions()) {
log.warn("用户连接数超过限制: userId={}, target={}:{}, current={}, max={}",
userId, target.getTargetType(), target.getMetadata(), activeCount, getMaxSessions());
sendError(session, "连接数超过限制(最多" + getMaxSessions() + "个)");
sessionManager.removeSession(sessionId);
sessionRegistered = false;
session.close(CloseStatus.POLICY_VIOLATION);
return;
}
// 7. 发送连接中状态
sendStatus(session, SSHStatusEnum.CONNECTING);
// 8. 建立SSH连接
SSHClient sshClient = createSSHConnection(target);
sshClients.put(sessionId, sshClient);
// 9. 打开Shell通道并分配PTY伪终端
Session sshSession = sshClient.startSession();
// 关键分配PTY启用交互式Shell回显提示符
// 参数终端类型, 列数, 行数, 宽度(像素), 高度(像素), 终端模式
sshSession.allocatePTY("xterm", 80, 24, 0, 0, java.util.Collections.emptyMap());
log.debug("PTY已分配: sessionId={}, termType=xterm, cols=80, rows=24", sessionId);
Session.Shell shell = sshSession.startShell();
log.debug("Shell已启动: sessionId={}", sessionId);
// 保存会话信息
webSocketSessions.put(sessionId, session);
sshShells.put(sessionId, shell);
// 10. 优化先启动输出监听线程确保不错过任何SSH输出
Future<?> stdoutTask = asyncTaskExecutor.submit(() -> readSSHOutput(session, shell));
outputTasks.put(sessionId, stdoutTask);
// 同时启动错误流监听某些SSH服务器会将输出发送到错误流
Future<?> stderrTask = asyncTaskExecutor.submit(() -> readSSHError(session, shell));
outputTasks.put(sessionId + "_stderr", stderrTask);
// 11. 发送连接成功状态
sendStatus(session, SSHStatusEnum.CONNECTED);
log.info("SSH连接成功: sessionId={}, userId={}, target={}:{}",
sessionId, userId, target.getTargetType(), target.getMetadata());
// 12. 异步创建审计日志不阻塞主线程
// 使用CompletableFuture异步执行避免数据库操作延迟影响SSH输出接收
java.util.concurrent.CompletableFuture.runAsync(() -> {
try {
onEvent(SSHEvent.AFTER_CONNECT, eventData);
} catch (Exception ex) {
log.error("AFTER_CONNECT事件处理失败: sessionId={}", sessionId, ex);
}
});
} catch (Exception ex) {
// 如果会话已注册需要清理
if (sessionRegistered) {
sessionManager.removeSession(sessionId);
}
throw ex; // 重新抛出由外层catch处理
}
} catch (Exception e) {
log.error("建立SSH连接失败: sessionId={}", sessionId, e);
sendError(session, "连接失败: " + e.getMessage());
// 构建失败事件数据
SSHEventData failureEventData = SSHEventData.builder()
.sessionId(sessionId)
.errorMessage(e.getMessage())
.exception(e)
.build();
cleanupSession(sessionId, failureEventData);
try {
session.close(CloseStatus.SERVER_ERROR);
} catch (IOException ex) {
log.error("关闭WebSocket会话失败: sessionId={}", sessionId, ex);
}
}
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String sessionId = getSessionId(session);
try {
SSHWebSocketMessage msg = JsonUtils.fromJson(message.getPayload(), SSHWebSocketMessage.class);
if (msg.getType() == SSHMessageType.INPUT) {
// 用户输入命令
Session.Shell shell = sshShells.get(sessionId);
if (shell != null) {
OutputStream outputStream = shell.getOutputStream();
outputStream.write(msg.getData().getBytes(StandardCharsets.UTF_8));
outputStream.flush();
// 触发命令事件
SSHTarget target = sessionTargets.get(sessionId);
SSHEventData eventData = SSHEventData.builder()
.sessionId(sessionId)
.command(msg.getData())
.target(target)
.build();
onEvent(SSHEvent.ON_COMMAND, eventData);
}
}
} catch (Exception e) {
log.error("处理WebSocket消息失败: sessionId={}", sessionId, e);
}
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
String sessionId = getSessionId(session);
log.info("WebSocket连接关闭: sessionId={}, status={}", sessionId, status);
SSHTarget target = sessionTargets.get(sessionId);
SSHEventData eventData = SSHEventData.builder()
.sessionId(sessionId)
.target(target)
.closeStatus(status)
.build();
cleanupSession(sessionId, eventData);
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
String sessionId = getSessionId(session);
log.error("WebSocket传输错误: sessionId={}", sessionId, exception);
SSHTarget target = sessionTargets.get(sessionId);
SSHEventData eventData = SSHEventData.builder()
.sessionId(sessionId)
.target(target)
.errorMessage(exception.getMessage())
.exception(exception)
.build();
onEvent(SSHEvent.ON_ERROR, eventData);
sendError(session, "传输错误: " + exception.getMessage());
cleanupSession(sessionId, eventData);
try {
session.close(CloseStatus.SERVER_ERROR);
} catch (IOException e) {
log.error("关闭WebSocket会话失败: sessionId={}", sessionId, e);
}
}
// ========== 私有方法 ==========
/**
* 创建SSH连接
*/
private SSHClient createSSHConnection(SSHTarget target) throws Exception {
ISSHCommandService sshService = sshCommandServiceFactory.getService(target.getOsType());
String password = null, privateKey = null, passphrase = null;
switch (target.getAuthType()) {
case PASSWORD:
if (target.getPassword() == null || target.getPassword().isEmpty()) {
throw new BusinessException(ResponseCode.INVALID_PARAM,
new Object[]{"SSH密码不能为空"});
}
password = target.getPassword();
break;
case KEY:
if (target.getPrivateKey() == null || target.getPrivateKey().isEmpty()) {
throw new BusinessException(ResponseCode.INVALID_PARAM,
new Object[]{"SSH私钥不能为空"});
}
privateKey = target.getPrivateKey();
passphrase = target.getPassphrase();
break;
default:
throw new BusinessException(ResponseCode.INVALID_PARAM,
new Object[]{"不支持的认证类型: " + target.getAuthType()});
}
return sshService.createConnection(target.getHost(), target.getPort(), target.getUsername(),
password, privateKey, passphrase);
}
/**
* 验证SSH目标参数
*/
private void validateSSHTarget(SSHTarget target) {
if (target == null) {
throw new BusinessException(ResponseCode.INVALID_PARAM, new Object[]{"SSH目标不能为空"});
}
if (target.getTargetType() == null) {
throw new BusinessException(ResponseCode.INVALID_PARAM, new Object[]{"目标类型不能为空"});
}
if (target.getHost() == null || target.getHost().isEmpty()) {
throw new BusinessException(ResponseCode.INVALID_PARAM, new Object[]{"主机地址不能为空"});
}
if (target.getUsername() == null || target.getUsername().isEmpty()) {
throw new BusinessException(ResponseCode.INVALID_PARAM, new Object[]{"用户名不能为空"});
}
if (target.getAuthType() == null) {
throw new BusinessException(ResponseCode.INVALID_PARAM, new Object[]{"认证方式不能为空"});
}
if (target.getOsType() == null) {
throw new BusinessException(ResponseCode.INVALID_PARAM, new Object[]{"操作系统类型不能为空"});
}
}
/**
* 读取SSH输出并发送到前端
*/
private void readSSHOutput(WebSocketSession session, Session.Shell shell) {
String sessionId = session.getId();
log.debug("开始监听SSH输出: sessionId={}", sessionId);
try {
InputStream inputStream = shell.getInputStream();
byte[] buffer = new byte[1024];
int len;
log.debug("SSH输出流已获取开始循环读取: sessionId={}", sessionId);
while (session.isOpen() && (len = inputStream.read(buffer)) > 0) {
String output = new String(buffer, 0, len, StandardCharsets.UTF_8);
log.debug("读取到SSH输出: sessionId={}, length={}, content={}",
sessionId, len, output.replaceAll("\\r", "\\\\r").replaceAll("\\n", "\\\\n"));
sendOutput(session, output);
log.debug("SSH输出已发送到前端: sessionId={}", sessionId);
}
log.debug("SSH输出监听结束: sessionId={}, session.isOpen={}", sessionId, session.isOpen());
} catch (java.io.InterruptedIOException e) {
// 线程被中断正常的清理过程检查是否是WebSocket关闭导致的
if (!session.isOpen()) {
log.debug("SSH输出监听线程被正常中断WebSocket已关闭: sessionId={}", sessionId);
} else {
log.error("SSH输出监听线程被异常中断: sessionId={}", sessionId, e);
// 只在session仍然打开时尝试发送错误消息
try {
sendError(session, "SSH连接被中断");
} catch (Exception ex) {
log.debug("发送错误消息失败session可能已关闭: sessionId={}", sessionId);
}
}
} catch (IOException e) {
// 其他IO异常真正的错误
if (session.isOpen()) {
log.error("读取SSH输出失败: sessionId={}", sessionId, e);
try {
sendError(session, "读取SSH输出失败: " + e.getMessage());
} catch (Exception ex) {
log.debug("发送错误消息失败session可能已关闭: sessionId={}", sessionId);
}
} else {
log.debug("读取SSH输出时发生IO异常但session已关闭正常: sessionId={}", sessionId);
}
}
}
/**
* 读取SSH错误流并发送到前端
* 某些SSH服务器可能将输出发送到标准错误流
*/
private void readSSHError(WebSocketSession session, Session.Shell shell) {
String sessionId = session.getId();
log.debug("开始监听SSH错误流: sessionId={}", sessionId);
try {
InputStream errorStream = shell.getErrorStream();
byte[] buffer = new byte[1024];
int len;
log.debug("SSH错误流已获取开始循环读取: sessionId={}", sessionId);
while (session.isOpen() && (len = errorStream.read(buffer)) > 0) {
String output = new String(buffer, 0, len, StandardCharsets.UTF_8);
log.debug("读取到SSH错误流输出: sessionId={}, length={}, content={}",
sessionId, len, output.replaceAll("\\r", "\\\\r").replaceAll("\\n", "\\\\n"));
sendOutput(session, output); // 错误流也作为output发送到前端
log.debug("SSH错误流输出已发送到前端: sessionId={}", sessionId);
}
log.debug("SSH错误流监听结束: sessionId={}", sessionId);
} catch (java.io.InterruptedIOException e) {
if (!session.isOpen()) {
log.debug("SSH错误流监听线程被正常中断WebSocket已关闭: sessionId={}", sessionId);
} else {
log.error("SSH错误流监听线程被异常中断: sessionId={}", sessionId, e);
}
} catch (IOException e) {
if (session.isOpen()) {
log.error("读取SSH错误流失败: sessionId={}", sessionId, e);
} else {
log.debug("读取SSH错误流时发生IO异常但session已关闭正常: sessionId={}", sessionId);
}
}
}
/**
* 清理单个会话供子类调用如gracefulShutdown
*
* @param sessionId 会话ID
*/
protected void cleanupSingleSession(String sessionId) {
SSHTarget target = sessionTargets.get(sessionId);
SSHEventData eventData = SSHEventData.builder()
.sessionId(sessionId)
.target(target)
.build();
cleanupSession(sessionId, eventData);
}
/**
* 清理会话资源
*
* @param sessionId 会话ID
* @param eventData 事件数据
*/
private void cleanupSession(String sessionId, SSHEventData eventData) {
log.debug("清理会话资源: sessionId={}", sessionId);
try {
// 1. 触发断开前事件异步不阻塞清理
asyncTaskExecutor.submit(() -> {
try {
onEvent(SSHEvent.BEFORE_DISCONNECT, eventData);
} catch (Exception e) {
log.error("BEFORE_DISCONNECT事件处理失败: sessionId={}", sessionId, e);
}
});
// 2. 移除WebSocketSession
webSocketSessions.remove(sessionId);
// 3. 取消输出监听任务
Future<?> stdoutTask = outputTasks.remove(sessionId);
if (stdoutTask != null && !stdoutTask.isDone()) {
stdoutTask.cancel(true);
}
Future<?> stderrTask = outputTasks.remove(sessionId + "_stderr");
if (stderrTask != null && !stderrTask.isDone()) {
stderrTask.cancel(true);
}
// 4. 关闭SSH Shell
Session.Shell shell = sshShells.remove(sessionId);
if (shell != null) {
try {
shell.close();
} catch (IOException e) {
log.error("关闭SSH Shell失败: sessionId={}", sessionId, e);
}
}
// 5. 关闭SSH客户端
SSHClient sshClient = sshClients.remove(sessionId);
if (sshClient != null) {
try {
if (sshClient.isConnected()) {
sshClient.disconnect();
}
} catch (IOException e) {
log.error("关闭SSH客户端失败: sessionId={}", sessionId, e);
}
}
// 6. 从会话管理器移除最重要必须执行
sessionManager.removeSession(sessionId);
// 7. 移除target信息
sessionTargets.remove(sessionId);
log.info("会话资源清理完成: sessionId={}", sessionId);
} finally {
// 8. 触发断开后事件异步即使清理失败也要触发
asyncTaskExecutor.submit(() -> {
try {
onEvent(SSHEvent.AFTER_DISCONNECT, eventData);
} catch (Exception e) {
log.error("AFTER_DISCONNECT事件处理失败: sessionId={}", sessionId, e);
}
});
}
}
/**
* 发送输出消息到前端
*/
protected void sendOutput(WebSocketSession session, String output) {
try {
SSHWebSocketMessage msg = SSHWebSocketMessage.output(output);
session.sendMessage(new TextMessage(JsonUtils.toJson(msg)));
} catch (IOException e) {
log.error("发送输出消息失败: sessionId={}", session.getId(), e);
}
}
/**
* 发送状态消息到前端
*/
protected void sendStatus(WebSocketSession session, SSHStatusEnum status) {
try {
SSHWebSocketMessage msg = SSHWebSocketMessage.status(status);
session.sendMessage(new TextMessage(JsonUtils.toJson(msg)));
} catch (IOException e) {
log.error("发送状态消息失败: sessionId={}", session.getId(), e);
}
}
/**
* 发送错误消息到前端
*/
protected void sendError(WebSocketSession session, String error) {
try {
SSHWebSocketMessage msg = SSHWebSocketMessage.error(error);
session.sendMessage(new TextMessage(JsonUtils.toJson(msg)));
} catch (IOException e) {
log.error("发送错误消息失败: sessionId={}", session.getId(), e);
}
}
/**
* 优雅下线应用关闭时清理所有活跃的SSH会话Framework统一实现
* 使用 @PreDestroy 注解确保在Spring容器销毁前执行
*/
@jakarta.annotation.PreDestroy
public void gracefulShutdown() {
log.warn("====== SSH Handler准备关闭开始优雅下线所有SSH会话 ======");
log.warn("当前活跃SSH会话数: {}", webSocketSessions.size());
if (webSocketSessions.isEmpty()) {
log.info("没有活跃的SSH会话跳过优雅下线");
return;
}
// 记录开始时间
long startTime = System.currentTimeMillis();
int successCount = 0;
int failureCount = 0;
// 遍历所有活跃会话
for (Map.Entry<String, WebSocketSession> entry : webSocketSessions.entrySet()) {
String sessionId = entry.getKey();
WebSocketSession session = entry.getValue();
try {
log.info("关闭SSH会话: sessionId={}", sessionId);
// 1. 尝试向前端发送服务器下线通知
try {
if (session.isOpen()) {
sendError(session, "服务器正在重启,连接即将关闭");
// 给前端一点时间接收消息
Thread.sleep(100);
}
} catch (Exception e) {
log.debug("发送下线通知失败: sessionId={}", sessionId, e);
}
// 2. 触发BEFORE_SHUTDOWN事件异步但等待完成
SSHTarget target = sessionTargets.get(sessionId);
Future<?> shutdownEventTask = asyncTaskExecutor.submit(() -> {
try {
SSHEventData eventData = SSHEventData.builder()
.sessionId(sessionId)
.target(target)
.disconnectReason(SSHDisconnectReason.SERVER_SHUTDOWN.getCode())
.disconnectReasonDesc(SSHDisconnectReason.SERVER_SHUTDOWN.getDescription())
.build();
onEvent(SSHEvent.BEFORE_SHUTDOWN, eventData);
} catch (Exception e) {
log.error("BEFORE_SHUTDOWN事件处理失败: sessionId={}", sessionId, e);
}
});
// 等待审计日志写入完成最多1秒
try {
shutdownEventTask.get(1, java.util.concurrent.TimeUnit.SECONDS);
} catch (Exception e) {
log.warn("等待BEFORE_SHUTDOWN事件超时: sessionId={}", sessionId);
}
// 3. 清理SSH资源
try {
cleanupSingleSession(sessionId);
} catch (Exception e) {
log.error("清理SSH资源失败: sessionId={}", sessionId, e);
}
// 4. 关闭WebSocket连接
try {
if (session.isOpen()) {
session.close(new CloseStatus(1001, "服务器正在重启"));
}
} catch (Exception e) {
log.debug("关闭WebSocket失败: sessionId={}", sessionId, e);
}
successCount++;
log.info("SSH会话关闭成功: sessionId={}", sessionId);
} catch (Exception e) {
failureCount++;
log.error("关闭SSH会话失败: sessionId={}", sessionId, e);
}
}
long duration = System.currentTimeMillis() - startTime;
log.warn("====== 优雅下线完成 ======");
log.warn("总会话数: {}, 成功: {}, 失败: {}, 耗时: {}ms",
successCount + failureCount, successCount, failureCount, duration);
}
}

View File

@ -0,0 +1,81 @@
package com.qqchen.deploy.backend.framework.ssh.websocket;
import lombok.Builder;
import lombok.Data;
import org.springframework.web.socket.CloseStatus;
/**
* SSH事件数据Framework层
*
* 强类型的事件数据类替代 Map<String, Object>
* 提供类型安全IDE提示和编译时检查
*/
@Data
@Builder
public class SSHEventData {
/**
* 会话ID必需
*/
private String sessionId;
/**
* 用户ID连接时必需
*/
private Long userId;
/**
* 用户名连接时必需
*/
private String username;
/**
* SSH连接目标必需
*/
private SSHTarget target;
/**
* 用户输入的命令ON_COMMAND事件
*/
private String command;
/**
* WebSocket关闭状态AFTER_DISCONNECT事件
*/
private CloseStatus closeStatus;
/**
* 错误信息ON_ERROR事件
*/
private String errorMessage;
/**
* 异常对象ON_ERROR事件
*/
private Throwable exception;
/**
* 断开原因BEFORE_SHUTDOWN事件
*/
private String disconnectReason;
/**
* 断开原因描述BEFORE_SHUTDOWN事件
*/
private String disconnectReasonDesc;
/**
* 客户端IP连接时可选
*/
private String clientIp;
/**
* 客户端User-Agent连接时可选
*/
private String userAgent;
/**
* 扩展数据特殊场景
*/
private java.util.Map<String, Object> metadata;
}

View File

@ -0,0 +1,105 @@
package com.qqchen.deploy.backend.framework.ssh.websocket;
import com.qqchen.deploy.backend.framework.enums.SSHTargetType;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* SSH会话管理器Framework层
* 管理所有活跃的SSH会话提供并发控制能力
*/
@Slf4j
@Component
public class SSHSessionManager {
/**
* 会话信息存储sessionId -> SSHSessionInfo
*/
private final Map<String, SSHSessionInfo> sessions = new ConcurrentHashMap<>();
/**
* 注册会话
*
* @param sessionId WebSocket会话ID
* @param userId 用户ID
* @param targetType 目标类型
* @param targetId 目标ID
*/
public void registerSession(String sessionId, Long userId, SSHTargetType targetType, Object targetId) {
SSHSessionInfo info = new SSHSessionInfo();
info.setSessionId(sessionId);
info.setUserId(userId);
info.setTargetType(targetType);
info.setTargetId(targetId);
info.setConnectTime(LocalDateTime.now());
sessions.put(sessionId, info);
log.debug("会话已注册: sessionId={}, userId={}, target={}:{}",
sessionId, userId, targetType, targetId);
}
/**
* 移除会话
*
* @param sessionId WebSocket会话ID
*/
public void removeSession(String sessionId) {
SSHSessionInfo removed = sessions.remove(sessionId);
if (removed != null) {
log.debug("会话已移除: sessionId={}, userId={}, target={}:{}",
sessionId, removed.getUserId(), removed.getTargetType(), removed.getTargetId());
}
}
/**
* 统计用户对指定目标的活跃会话数
*
* @param userId 用户ID
* @param targetType 目标类型
* @param targetId 目标ID
* @return 活跃会话数
*/
public long countActiveSessions(Long userId, SSHTargetType targetType, Object targetId) {
return sessions.values().stream()
.filter(s -> s.getUserId().equals(userId)
&& s.getTargetType() == targetType
&& s.getTargetId().equals(targetId))
.count();
}
/**
* 获取会话信息
*
* @param sessionId WebSocket会话ID
* @return 会话信息
*/
public SSHSessionInfo getSession(String sessionId) {
return sessions.get(sessionId);
}
/**
* 获取所有活跃会话数
*
* @return 会话总数
*/
public int getTotalSessions() {
return sessions.size();
}
/**
* SSH会话信息
*/
@Data
public static class SSHSessionInfo {
private String sessionId;
private Long userId;
private SSHTargetType targetType;
private Object targetId;
private LocalDateTime connectTime;
}
}

View File

@ -0,0 +1,63 @@
package com.qqchen.deploy.backend.framework.ssh.websocket;
import com.qqchen.deploy.backend.framework.enums.AuthTypeEnum;
import com.qqchen.deploy.backend.framework.enums.OsTypeEnum;
import com.qqchen.deploy.backend.framework.enums.SSHTargetType;
import lombok.Data;
/**
* SSH连接目标信息Framework层
*/
@Data
public class SSHTarget {
/**
* 目标类型
*/
private SSHTargetType targetType;
/**
* 主机IP
*/
private String host;
/**
* SSH端口
*/
private Integer port;
/**
* SSH用户名
*/
private String username;
/**
* 认证方式
*/
private AuthTypeEnum authType;
/**
* 密码密码认证时使用
*/
private String password;
/**
* 私钥密钥认证时使用
*/
private String privateKey;
/**
* 私钥密码短语密钥认证时可选
*/
private String passphrase;
/**
* 操作系统类型用于选择SSH服务
*/
private OsTypeEnum osType;
/**
* 业务自定义元数据如serverIdpodId等
*/
private Object metadata;
}

View File

@ -0,0 +1,46 @@
package com.qqchen.deploy.backend.framework.ssh.websocket;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
/**
* SSH WebSocket框架配置Framework层
*
* 职责
* 1. 提供SSH输出监听专用的虚拟线程池
* 2. 框架内部使用业务层无需关心
*
* @author Framework
*/
@Configuration
public class SSHWebSocketConfig {
/**
* SSH输出监听专用线程池虚拟线程
*
* 为什么使用虚拟线程
* 1. SSH输出监听是典型的**阻塞I/O密集型**任务
* 2. 每个SSH连接需要2个长期阻塞的线程stdout + stderr
* 3. 虚拟线程几乎无资源开销支持数百万并发
* 4. 完美适配大量SSH长连接场景
*
* 📊 性能对比
* - 平台线程50个SSH连接 = 100个线程 100-200MB内存
* - 虚拟线程50个SSH连接 = 100个虚拟线程 几MB内存
*/
@Bean("sshOutputExecutor")
public AsyncTaskExecutor sshOutputExecutor() {
SimpleAsyncTaskExecutor executor =
new SimpleAsyncTaskExecutor("ssh-virtual-");
// 启用虚拟线程Java 21+
executor.setVirtualThreads(true);
// 并发限制-1表示无限制虚拟线程资源消耗极低
executor.setConcurrencyLimit(-1);
return executor;
}
}

View File

@ -0,0 +1,97 @@
package com.qqchen.deploy.backend.framework.ssh.websocket;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.qqchen.deploy.backend.framework.enums.SSHMessageType;
import com.qqchen.deploy.backend.framework.enums.SSHStatusEnum;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Map;
/**
* SSH WebSocket消息Framework层
*
* 统一的消息格式规范
* {
* "type": "output" | "input" | "status" | "error",
* "data": "消息内容(字符串)",
* "timestamp": 1733475005408,
* "metadata": { ... } // 可选
* }
*/
@Data
@NoArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
public class SSHWebSocketMessage {
/**
* 消息类型
*/
private SSHMessageType type;
/**
* 消息内容字符串
* - type=OUTPUT: 终端输出内容
* - type=INPUT: 用户输入内容
* - type=STATUS: 状态值connecting/connected/reconnecting/disconnected/error
* - type=ERROR: 错误描述信息
*/
private String data;
/**
* 消息时间戳Unix毫秒
* 服务端自动生成
*/
private Long timestamp;
/**
* 可选元数据
* 通常为null特殊场景下使用
*/
private Map<String, Object> metadata;
/**
* 构造函数自动填充时间戳
*/
public SSHWebSocketMessage(SSHMessageType type, String data) {
this.type = type;
this.data = data;
this.timestamp = System.currentTimeMillis();
}
/**
* 创建OUTPUT消息
*
* @param data 终端输出内容
*/
public static SSHWebSocketMessage output(String data) {
return new SSHWebSocketMessage(SSHMessageType.OUTPUT, data);
}
/**
* 创建STATUS消息
*
* @param status 状态枚举
*/
public static SSHWebSocketMessage status(SSHStatusEnum status) {
return new SSHWebSocketMessage(SSHMessageType.STATUS, status.name().toLowerCase());
}
/**
* 创建ERROR消息
*
* @param message 错误描述
*/
public static SSHWebSocketMessage error(String message) {
return new SSHWebSocketMessage(SSHMessageType.ERROR, message);
}
/**
* 创建INPUT消息前端发送后端解析用
*
* @param data 用户输入内容
*/
public static SSHWebSocketMessage input(String data) {
return new SSHWebSocketMessage(SSHMessageType.INPUT, data);
}
}

View File

@ -0,0 +1,126 @@
package com.qqchen.deploy.backend.framework.utils;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicLong;
/**
* 会话ID生成器Framework层
*
* 线程安全的SessionId生成避免并发场景下的ID冲突
* 使用策略UUID + 时间戳 + 递增序列号
*/
public class SessionIdGenerator {
/**
* 重入锁保证并发安全
*/
private static final Lock lock = new ReentrantLock();
/**
* 递增序列号用于同一毫秒内的并发请求
*/
private static final AtomicLong sequence = new AtomicLong(0);
/**
* 上次生成ID的时间戳
*/
private static volatile long lastTimestamp = -1L;
/**
* 生成唯一的SessionId
*
* 格式{UUID}-{timestamp}-{sequence}
* 例如f47ac10b-58cc-4372-a567-0e02b2c3d479-1733475005408-001
*
* @return 唯一的SessionId
*/
public static String generateSessionId() {
lock.lock();
try {
long timestamp = System.currentTimeMillis();
// 如果时间戳相同递增序列号
if (timestamp == lastTimestamp) {
sequence.incrementAndGet();
} else {
// 新的毫秒重置序列号
sequence.set(0);
lastTimestamp = timestamp;
}
// 生成UUID去掉连字符缩短长度
String uuid = UUID.randomUUID().toString().replace("-", "");
// 组合UUID + 时间戳 + 序列号3位补零
return String.format("%s-%d-%03d", uuid, timestamp, sequence.get());
} finally {
lock.unlock();
}
}
/**
* 生成短格式SessionId更紧凑
*
* 格式{timestamp}{sequence}{randomSuffix}
* 例如1733475005408001A3F9
*
* @return 短格式唯一SessionId
*/
public static String generateShortSessionId() {
lock.lock();
try {
long timestamp = System.currentTimeMillis();
if (timestamp == lastTimestamp) {
sequence.incrementAndGet();
} else {
sequence.set(0);
lastTimestamp = timestamp;
}
// 生成4位随机后缀
String randomSuffix = UUID.randomUUID().toString().substring(0, 4).toUpperCase();
// 组合时间戳 + 序列号3位 + 随机后缀
return String.format("%d%03d%s", timestamp, sequence.get(), randomSuffix);
} finally {
lock.unlock();
}
}
/**
* 从WebSocket Session中生成增强的SessionId
*
* 在原始WebSocket SessionId基础上添加时间戳和序列号确保绝对唯一
*
* @param webSocketSessionId WebSocket原始SessionId
* @return 增强的SessionId
*/
public static String enhanceWebSocketSessionId(String webSocketSessionId) {
if (webSocketSessionId == null || webSocketSessionId.isEmpty()) {
return generateSessionId();
}
lock.lock();
try {
long timestamp = System.currentTimeMillis();
if (timestamp == lastTimestamp) {
sequence.incrementAndGet();
} else {
sequence.set(0);
lastTimestamp = timestamp;
}
// WebSocket SessionId + 时间戳 + 序列号
return String.format("%s-%d-%03d", webSocketSessionId, timestamp, sequence.get());
} finally {
lock.unlock();
}
}
}

View File

@ -903,6 +903,34 @@ CREATE TABLE deploy_team_environment_notification_config
INDEX idx_deleted (deleted) INDEX idx_deleted (deleted)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='团队环境通知配置表'; ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='团队环境通知配置表';
-- Jenkins构建记录表
CREATE TABLE deploy_jenkins_build
(
id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '主键ID',
create_by VARCHAR(100) NULL COMMENT '创建人',
create_time DATETIME(6) NULL COMMENT '创建时间',
update_by VARCHAR(100) NULL COMMENT '更新人',
update_time DATETIME(6) NULL COMMENT '更新时间',
version INT NOT NULL DEFAULT 1 COMMENT '版本号',
deleted BIT NOT NULL DEFAULT 0 COMMENT '是否删除',
build_number INT NOT NULL COMMENT '构建编号',
build_status VARCHAR(50) NOT NULL COMMENT '构建状态SUCCESS、FAILURE、UNSTABLE、ABORTED等',
build_url VARCHAR(500) NOT NULL COMMENT '构建URL',
duration BIGINT NOT NULL COMMENT '构建持续时间(毫秒)',
start_time DATETIME(6) NOT NULL COMMENT '构建开始时间',
actions MEDIUMTEXT NULL COMMENT 'Jenkins构建动作JSON数据可能包含大量参数、触发器等信息',
external_system_id BIGINT NOT NULL COMMENT '外部系统ID关联deploy_external_system',
job_id BIGINT NOT NULL COMMENT '任务ID关联deploy_jenkins_job',
UNIQUE INDEX uk_job_build (job_id, build_number),
INDEX idx_external_system (external_system_id),
INDEX idx_job (job_id),
INDEX idx_status (build_status),
INDEX idx_start_time (start_time),
INDEX idx_deleted (deleted)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='Jenkins构建记录表';
-- Jenkins构建通知记录表记录通知状态防止重复通知 -- Jenkins构建通知记录表记录通知状态防止重复通知
CREATE TABLE deploy_jenkins_build_notification CREATE TABLE deploy_jenkins_build_notification
( (