diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/api/ServerApiController.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/api/ServerApiController.java index 478898c4..428c9fdc 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/api/ServerApiController.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/api/ServerApiController.java @@ -1,6 +1,7 @@ package com.qqchen.deploy.backend.deploy.api; import com.qqchen.deploy.backend.deploy.dto.ServerDTO; +import com.qqchen.deploy.backend.deploy.dto.ServerInfoDTO; import com.qqchen.deploy.backend.deploy.dto.ServerInitializeDTO; import com.qqchen.deploy.backend.deploy.entity.Server; import com.qqchen.deploy.backend.deploy.query.ServerQuery; @@ -84,13 +85,13 @@ public class ServerApiController return Response.success(result); } - @Operation(summary = "测试SSH连接", description = "测试服务器SSH连接是否正常") + @Operation(summary = "测试SSH连接并获取服务器信息", description = "测试服务器SSH连接并自动采集硬件信息") @PostMapping("/{id}/test-connection") - public Response testConnection( + public Response testConnection( @Parameter(description = "服务器ID", required = true) @PathVariable Long id ) { - boolean success = serverService.testConnection(id); - return Response.success(success); + ServerInfoDTO info = serverService.testConnection(id); + return Response.success(info); } @Override diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/config/ThreadPoolConfig.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/config/ThreadPoolConfig.java index 974a5bf7..fef1647e 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/config/ThreadPoolConfig.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/config/ThreadPoolConfig.java @@ -87,42 +87,9 @@ public class ThreadPoolConfig { executor.initialize(); return executor; } - - /** - * SSH输出监听线程池 - 使用虚拟线程(Java 21+) - * - * ⚠️ 为什么使用虚拟线程? - * 1. SSH输出监听是典型的**阻塞I/O密集型**任务 - * 2. 每个SSH连接需要2个长期阻塞的线程(stdout + stderr) - * 3. 虚拟线程几乎无资源开销,支持数百万并发 - * 4. 完美适配大量SSH长连接场景 - * - * 📊 性能对比: - * - 平台线程:50个SSH连接 = 100个线程 ≈ 100-200MB内存 ❌ - * - 虚拟线程:50个SSH连接 = 100个虚拟线程 ≈ 几MB内存 ✅ - * - * 💡 方案选择: - * - 方案1【当前】:SimpleAsyncTaskExecutor - Spring集成,支持优雅关闭,可定制线程名 - * - 方案2:Executors.newVirtualThreadPerTaskExecutor() - 原生API,最简洁,性能略优 - */ - @Bean("sshOutputExecutor") - public org.springframework.core.task.SimpleAsyncTaskExecutor sshOutputExecutor() { - // 方案1:Spring封装的虚拟线程Executor(推荐) - // 优点:与Spring集成,支持优雅关闭,线程名可定制(便于调试) - org.springframework.core.task.SimpleAsyncTaskExecutor executor = - new org.springframework.core.task.SimpleAsyncTaskExecutor("ssh-virtual-"); - - // ⚠️ 关键:启用虚拟线程(Java 21+) - executor.setVirtualThreads(true); - - // 并发限制:-1表示无限制(虚拟线程资源消耗极低) - executor.setConcurrencyLimit(-1); - - return executor; - - // 方案2:原生虚拟线程Executor(可选) - // 如果需要纯Java实现,无Spring依赖,可以使用: - // return Executors.newVirtualThreadPerTaskExecutor(); - // 注意:需要手动管理生命周期,线程名为 VirtualThread-#1 - } + + // ========== 注意 ========== + // sshOutputExecutor 已迁移到 Framework 层 + // 见: framework.ssh.websocket.SSHWebSocketConfig + // SSH WebSocket 框架自己管理线程池,业务层无需关心 } \ No newline at end of file diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/dto/SSHMessage.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/dto/SSHMessage.java deleted file mode 100644 index 8b95192d..00000000 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/dto/SSHMessage.java +++ /dev/null @@ -1,83 +0,0 @@ -package com.qqchen.deploy.backend.deploy.dto; - -import com.fasterxml.jackson.annotation.JsonInclude; -import com.qqchen.deploy.backend.deploy.enums.SSHMessageTypeEnum; -import com.qqchen.deploy.backend.deploy.enums.SSHStatusEnum; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - -/** - * SSH WebSocket消息DTO - * - * 消息格式规范: - * 1. 状态消息:{"type":"status","status":"connecting"} - * 2. SSH输出:{"type":"output","data":"..."} - * 3. 错误消息:{"type":"error","message":"..."} - */ -@Data -@NoArgsConstructor -@AllArgsConstructor -@JsonInclude(JsonInclude.Include.NON_NULL) // 只序列化非null字段 -public class SSHMessage { - - /** - * 消息类型 - */ - private SSHMessageTypeEnum type; - - /** - * 消息数据(input/output时使用) - */ - private String data; - - /** - * 错误消息(error时使用) - */ - private String message; - - /** - * 连接状态(仅type=STATUS时使用) - */ - private SSHStatusEnum status; - - /** - * 创建input类型消息 - */ - public static SSHMessage input(String data) { - SSHMessage msg = new SSHMessage(); - msg.setType(SSHMessageTypeEnum.INPUT); - msg.setData(data); - return msg; - } - - /** - * 创建output类型消息 - */ - public static SSHMessage output(String data) { - SSHMessage msg = new SSHMessage(); - msg.setType(SSHMessageTypeEnum.OUTPUT); - msg.setData(data); - return msg; - } - - /** - * 创建error类型消息 - */ - public static SSHMessage error(String message) { - SSHMessage msg = new SSHMessage(); - msg.setType(SSHMessageTypeEnum.ERROR); - msg.setMessage(message); - return msg; - } - - /** - * 创建status类型消息 - */ - public static SSHMessage status(SSHStatusEnum status) { - SSHMessage msg = new SSHMessage(); - msg.setType(SSHMessageTypeEnum.STATUS); - msg.setStatus(status); - return msg; - } -} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/dto/ServerDTO.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/dto/ServerDTO.java index 2bd80c09..c2dca37e 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/dto/ServerDTO.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/dto/ServerDTO.java @@ -1,7 +1,7 @@ package com.qqchen.deploy.backend.deploy.dto; -import com.qqchen.deploy.backend.deploy.enums.AuthTypeEnum; -import com.qqchen.deploy.backend.deploy.enums.OsTypeEnum; +import com.qqchen.deploy.backend.framework.enums.AuthTypeEnum; +import com.qqchen.deploy.backend.framework.enums.OsTypeEnum; import com.qqchen.deploy.backend.deploy.enums.ServerStatusEnum; import com.qqchen.deploy.backend.framework.dto.BaseDTO; import lombok.Data; diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/dto/ServerInfoDTO.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/dto/ServerInfoDTO.java new file mode 100644 index 00000000..dfeee04b --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/dto/ServerInfoDTO.java @@ -0,0 +1,62 @@ +package com.qqchen.deploy.backend.deploy.dto; + +import lombok.Data; + +import java.time.LocalDateTime; + +/** + * 服务器信息DTO + * 用于测试连接接口的返回 + */ +@Data +public class ServerInfoDTO { + + // ===== 连接状态 ===== + + /** + * 连接是否成功 + */ + private Boolean connected; + + /** + * 错误信息(连接失败时) + */ + private String errorMessage; + + /** + * 连接时间 + */ + private LocalDateTime connectTime; + + /** + * 响应时间(毫秒) + */ + private Long responseTime; + + // ===== 基础硬件信息 ===== + + /** + * 主机名 + */ + private String hostname; + + /** + * 操作系统版本 + */ + private String osVersion; + + /** + * CPU核心数 + */ + private Integer cpuCores; + + /** + * 内存大小(GB) + */ + private Integer memorySize; + + /** + * 磁盘大小(GB) + */ + private Integer diskSize; +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/entity/JenkinsBuild.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/entity/JenkinsBuild.java index b3c08062..ec26bbed 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/entity/JenkinsBuild.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/entity/JenkinsBuild.java @@ -27,10 +27,10 @@ public class JenkinsBuild extends Entity { @Column(name = "duration", nullable = false) private Long duration; - @Column(name = "startTime", nullable = false) + @Column(name = "start_time", nullable = false) private LocalDateTime starttime; - @Column(name = "actions", columnDefinition = "TEXT") + @Column(name = "actions", columnDefinition = "MEDIUMTEXT") private String actions; @Column(name = "external_system_id", nullable = false) diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/entity/Server.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/entity/Server.java index 01daee5f..b7285f31 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/entity/Server.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/entity/Server.java @@ -1,7 +1,7 @@ package com.qqchen.deploy.backend.deploy.entity; -import com.qqchen.deploy.backend.deploy.enums.AuthTypeEnum; -import com.qqchen.deploy.backend.deploy.enums.OsTypeEnum; +import com.qqchen.deploy.backend.framework.enums.AuthTypeEnum; +import com.qqchen.deploy.backend.framework.enums.OsTypeEnum; import com.qqchen.deploy.backend.deploy.enums.ServerStatusEnum; import com.qqchen.deploy.backend.framework.domain.Entity; import jakarta.persistence.*; diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/enums/AuthTypeEnum.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/enums/AuthTypeEnum.java deleted file mode 100644 index ca79b792..00000000 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/enums/AuthTypeEnum.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.qqchen.deploy.backend.deploy.enums; - -import lombok.AllArgsConstructor; -import lombok.Getter; - -/** - * SSH认证方式枚举 - */ -@Getter -@AllArgsConstructor -public enum AuthTypeEnum { - - PASSWORD("密码认证", "使用用户名和密码进行SSH认证"), - KEY("密钥认证", "使用SSH私钥进行认证"); - - private final String name; - private final String description; -} - diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/enums/SSHMessageTypeEnum.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/enums/SSHMessageTypeEnum.java deleted file mode 100644 index 2362ed45..00000000 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/enums/SSHMessageTypeEnum.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.qqchen.deploy.backend.deploy.enums; - -import com.fasterxml.jackson.annotation.JsonValue; -import lombok.AllArgsConstructor; -import lombok.Getter; - -/** - * SSH WebSocket消息类型枚举 - */ -@Getter -@AllArgsConstructor -public enum SSHMessageTypeEnum { - - INPUT("input", "用户输入"), - OUTPUT("output", "SSH输出"), - ERROR("error", "错误消息"), - STATUS("status", "连接状态"); - - @JsonValue // JSON序列化时使用此字段的值 - private final String value; - private final String description; -} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/enums/SSHStatusEnum.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/enums/SSHStatusEnum.java deleted file mode 100644 index 06532b75..00000000 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/enums/SSHStatusEnum.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.qqchen.deploy.backend.deploy.enums; - -import com.fasterxml.jackson.annotation.JsonValue; -import lombok.AllArgsConstructor; -import lombok.Getter; - -/** - * SSH连接状态枚举 - */ -@Getter -@AllArgsConstructor -public enum SSHStatusEnum { - - CONNECTING("connecting", "连接中"), - CONNECTED("connected", "已连接"), - DISCONNECTED("disconnected", "已断开"); - - @JsonValue // JSON序列化时使用此字段的值 - private final String value; - private final String description; -} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/handler/ContainerSSHWebSocketHandler.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/handler/ContainerSSHWebSocketHandler.java new file mode 100644 index 00000000..a78d365b --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/handler/ContainerSSHWebSocketHandler.java @@ -0,0 +1,182 @@ +package com.qqchen.deploy.backend.deploy.handler; + +import com.qqchen.deploy.backend.framework.enums.SSHEvent; +import com.qqchen.deploy.backend.framework.enums.SSHTargetType; +import com.qqchen.deploy.backend.framework.ssh.SSHCommandServiceFactory; +import com.qqchen.deploy.backend.framework.ssh.websocket.AbstractSSHWebSocketHandler; +import com.qqchen.deploy.backend.framework.ssh.websocket.SSHSessionManager; +import com.qqchen.deploy.backend.framework.ssh.websocket.SSHTarget; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.WebSocketSession; + +import java.util.Map; + +/** + * Docker Container SSH WebSocket处理器(业务层) + * 继承Framework的抽象Handler,只需实现5个方法 + * + * 示例实现:展示如何为Docker容器提供SSH终端能力 + */ +@Slf4j +@Component +public class ContainerSSHWebSocketHandler extends AbstractSSHWebSocketHandler { + + // TODO: 注入Docker相关的Service + // @Resource + // private IDockerContainerService containerService; + + // @Resource + // private IDockerAuditLogService dockerAuditLogService; + + private static final int MAX_SESSIONS_PER_CONTAINER = 3; + + /** + * 构造函数(注入Framework依赖) + */ + public ContainerSSHWebSocketHandler( + SSHCommandServiceFactory sshCommandServiceFactory, + SSHSessionManager sessionManager) { + super(sshCommandServiceFactory, sessionManager); + } + + // ========== 实现Framework要求的5个抽象方法 ========== + + /** + * 1. 获取SSH连接目标(业务逻辑) + */ + @Override + protected SSHTarget getSSHTarget(WebSocketSession session) throws Exception { + // 从URL提取containerId + Long containerId = extractContainerId(session); + if (containerId == null) { + throw new IllegalArgumentException("无效的Container ID"); + } + + // TODO: 从数据库获取Container信息 + // DockerContainer container = containerService.findById(containerId); + // if (container == null) { + // throw new IllegalArgumentException("容器不存在: " + containerId); + // } + + // 构建SSH目标 + SSHTarget target = new SSHTarget(); + target.setTargetType(SSHTargetType.CONTAINER); + + // TODO: 从Docker API获取容器的SSH信息 + // Docker容器通常通过宿主机SSH + docker exec + // target.setHost(container.getHostIp()); + // target.setPort(container.getSshPort()); + // target.setUsername(container.getUsername()); + // target.setAuthType(container.getAuthType()); + // target.setPassword(container.getPassword()); + // target.setOsType(container.getOsType()); + + // 示例配置 + target.setHost("docker-host-ip"); + target.setPort(22); + target.setUsername("docker-user"); + target.setMetadata(containerId); // 保存containerId供后续使用 + + return target; + } + + /** + * 2. 权限验证(业务逻辑) + */ + @Override + protected boolean checkPermission(Long userId, SSHTarget target) { + // TODO: 实现Docker容器权限验证 + // Long containerId = (Long) target.getMetadata(); + // return dockerPermissionService.hasPermission(userId, containerId); + + log.warn("Docker容器权限验证尚未实现,默认允许所有用户访问"); + return true; + } + + /** + * 3. 获取最大并发连接数 + */ + @Override + protected int getMaxSessions() { + return MAX_SESSIONS_PER_CONTAINER; + } + + /** + * 4. 获取WebSocket路径模式 + */ + @Override + protected String getPathPattern() { + return "/api/v1/docker-container-ssh/connect/*"; + } + + /** + * 5. 事件钩子(审计日志等业务逻辑) + */ + @Override + protected void onEvent(SSHEvent event, Map data) { + String sessionId = (String) data.get("sessionId"); + SSHTarget target = (SSHTarget) data.get("target"); + Long containerId = (Long) target.getMetadata(); + + switch (event) { + case AFTER_CONNECT: + // 创建Docker审计日志 + try { + Long userId = (Long) data.get("userId"); + log.info("Docker容器连接审计: sessionId={}, containerId={}, userId={}", + sessionId, containerId, userId); + // TODO: dockerAuditLogService.createAuditLog(...) + } catch (Exception e) { + log.error("创建Docker审计日志失败: sessionId={}", sessionId, e); + } + break; + + case AFTER_DISCONNECT: + // 关闭Docker审计日志 + log.info("Docker容器断开连接: sessionId={}, containerId={}", sessionId, containerId); + // TODO: dockerAuditLogService.closeAuditLog(...) + break; + + case ON_COMMAND: + // 记录命令(可选) + String command = (String) data.get("command"); + log.debug("Docker容器命令: sessionId={}, command={}", sessionId, command); + // TODO: dockerAuditLogService.recordCommand(...) + break; + + case ON_ERROR: + // 记录错误 + String error = (String) data.get("error"); + log.error("Docker容器连接错误: sessionId={}, error={}", sessionId, error); + // TODO: dockerAuditLogService.closeAuditLog(sessionId, "FAILED", error); + break; + + case BEFORE_SHUTDOWN: + // 优雅下线 + String reason = (String) data.get("reason"); + log.info("Docker容器优雅下线: sessionId={}, reason={}", sessionId, reason); + // TODO: dockerAuditLogService.closeAuditLog(sessionId, "SERVER_SHUTDOWN", reason); + break; + } + } + + // ========== 辅助方法 ========== + + /** + * 从WebSocket session URL中提取containerId + */ + private Long extractContainerId(WebSocketSession session) { + try { + String path = session.getUri().getPath(); + // /api/v1/docker-container-ssh/connect/{containerId} + String[] parts = path.split("/"); + if (parts.length > 0) { + return Long.parseLong(parts[parts.length - 1]); + } + } catch (Exception e) { + log.error("提取containerId失败", e); + } + return null; + } +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/handler/PodSSHWebSocketHandler.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/handler/PodSSHWebSocketHandler.java new file mode 100644 index 00000000..c8156f77 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/handler/PodSSHWebSocketHandler.java @@ -0,0 +1,216 @@ +package com.qqchen.deploy.backend.deploy.handler; + +import com.qqchen.deploy.backend.framework.enums.SSHEvent; +import com.qqchen.deploy.backend.framework.enums.SSHTargetType; +import com.qqchen.deploy.backend.framework.ssh.SSHCommandServiceFactory; +import com.qqchen.deploy.backend.framework.ssh.websocket.AbstractSSHWebSocketHandler; +import com.qqchen.deploy.backend.framework.ssh.websocket.SSHSessionManager; +import com.qqchen.deploy.backend.framework.ssh.websocket.SSHTarget; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.WebSocketSession; + +import java.util.Map; + +/** + * K8S Pod SSH WebSocket处理器(业务层) + * 继承Framework的抽象Handler,只需实现5个方法 + * + * 示例实现:展示如何为K8S Pod提供SSH终端能力 + */ +@Slf4j +@Component +public class PodSSHWebSocketHandler extends AbstractSSHWebSocketHandler { + + // TODO: 注入K8S相关的Service + // @Resource + // private IK8sPodService podService; + + // @Resource + // private IK8sAuditLogService k8sAuditLogService; + + private static final int MAX_SESSIONS_PER_POD = 3; + + /** + * 构造函数(注入Framework依赖) + */ + public PodSSHWebSocketHandler( + SSHCommandServiceFactory sshCommandServiceFactory, + SSHSessionManager sessionManager) { + super(sshCommandServiceFactory, sessionManager); + } + + // ========== 实现Framework要求的5个抽象方法 ========== + + /** + * 1. 获取SSH连接目标(业务逻辑) + */ + @Override + protected SSHTarget getSSHTarget(WebSocketSession session) throws Exception { + // 从URL提取podId + Long podId = extractPodId(session); + if (podId == null) { + throw new IllegalArgumentException("无效的Pod ID"); + } + + // TODO: 从数据库获取Pod信息 + // K8sPod pod = podService.findById(podId); + // if (pod == null) { + // throw new IllegalArgumentException("Pod不存在: " + podId); + // } + + // 构建SSH目标(示例) + SSHTarget target = new SSHTarget(); + target.setTargetType(SSHTargetType.POD); + + // TODO: 从K8S API获取Pod的SSH信息 + // 注意:K8S Pod可能需要通过Node的SSH + kubectl exec + // target.setHost(pod.getNodeIp()); + // target.setPort(pod.getSshPort()); + // target.setUsername(pod.getUsername()); + // target.setAuthType(pod.getAuthType()); + // target.setPassword(pod.getPassword()); + // target.setOsType(pod.getOsType()); + + // 示例配置 + target.setHost("k8s-node-ip"); + target.setPort(22); + target.setUsername("k8s-user"); + target.setMetadata(podId); // 保存podId供后续使用 + + return target; + } + + /** + * 2. 权限验证(业务逻辑) + */ + @Override + protected boolean checkPermission(Long userId, SSHTarget target) { + // TODO: 实现K8S RBAC权限验证 + // Long podId = (Long) target.getMetadata(); + // return k8sRbacService.hasPermission(userId, "pod", "exec", podId); + + log.warn("K8S Pod权限验证尚未实现,默认允许所有用户访问"); + return true; + } + + /** + * 3. 获取最大并发连接数 + */ + @Override + protected int getMaxSessions() { + return MAX_SESSIONS_PER_POD; + } + + /** + * 4. 获取WebSocket路径模式 + */ + @Override + protected String getPathPattern() { + return "/api/v1/k8s-pod-ssh/connect/*"; + } + + /** + * 5. 事件钩子(审计日志等业务逻辑) + */ + @Override + protected void onEvent(SSHEvent event, Map data) { + String sessionId = (String) data.get("sessionId"); + SSHTarget target = (SSHTarget) data.get("target"); + Long podId = (Long) target.getMetadata(); + + switch (event) { + case AFTER_CONNECT: + // 创建K8S审计日志 + try { + Long userId = (Long) data.get("userId"); + String clientIp = (String) data.get("clientIp"); + String userAgent = (String) data.get("userAgent"); + + // TODO: 调用K8S审计服务 + // Long auditLogId = k8sAuditLogService.createAuditLog( + // userId, podId, sessionId, clientIp, userAgent); + // data.put("auditLogId", auditLogId); + + log.info("K8S Pod连接审计日志已创建: sessionId={}, podId={}", sessionId, podId); + } catch (Exception e) { + log.error("创建K8S审计日志失败: sessionId={}", sessionId, e); + } + break; + + case AFTER_DISCONNECT: + // 关闭K8S审计日志 + try { + // TODO: k8sAuditLogService.closeAuditLog(sessionId, "SUCCESS", "正常断开"); + log.info("K8S Pod审计日志已关闭: sessionId={}", sessionId); + } catch (Exception e) { + log.error("关闭K8S审计日志失败: sessionId={}", sessionId, e); + } + break; + + case ON_COMMAND: + // 记录命令到K8S审计日志 + try { + String command = (String) data.get("command"); + if (command != null && command.length() > 0) { + // TODO: k8sAuditLogService.recordCommand(sessionId, command); + } + } catch (Exception e) { + log.error("记录K8S命令失败: sessionId={}", sessionId, e); + } + break; + + case ON_ERROR: + // 记录错误 + try { + String error = (String) data.get("error"); + // TODO: k8sAuditLogService.closeAuditLog(sessionId, "FAILED", error); + log.error("K8S Pod连接错误: sessionId={}, error={}", sessionId, error); + } catch (Exception e) { + log.error("记录K8S错误失败: sessionId={}", sessionId, e); + } + break; + + case BEFORE_SHUTDOWN: + // 优雅下线前处理审计日志 + try { + String reason = (String) data.get("reason"); + // TODO: k8sAuditLogService.closeAuditLog(sessionId, "SERVER_SHUTDOWN", reason); + log.info("K8S审计日志已更新(优雅下线): sessionId={}", sessionId); + } catch (Exception e) { + log.error("更新K8S审计日志失败(优雅下线): sessionId={}", sessionId, e); + } + break; + } + } + + // ========== 辅助方法 ========== + + /** + * 从WebSocket session URL中提取podId + */ + private Long extractPodId(WebSocketSession session) { + try { + String path = session.getUri().getPath(); + // /api/v1/k8s-pod-ssh/connect/{podId} + String[] parts = path.split("/"); + if (parts.length > 0) { + return Long.parseLong(parts[parts.length - 1]); + } + } catch (Exception e) { + log.error("提取podId失败", e); + } + return null; + } + + /** + * 可选:重写createConnection方法以支持kubectl exec + * K8S Pod可能不是标准SSH连接,而是通过kubectl exec + */ + // @Override + // protected SSHClient createConnection(SSHTarget target) throws Exception { + // // 使用K8S API创建exec连接 + // // 而不是标准SSH连接 + // return k8sExecService.createExecConnection(target); + // } +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/handler/ServerSSHWebSocketHandler.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/handler/ServerSSHWebSocketHandler.java index dad2db92..d9270400 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/handler/ServerSSHWebSocketHandler.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/handler/ServerSSHWebSocketHandler.java @@ -1,475 +1,188 @@ package com.qqchen.deploy.backend.deploy.handler; -import com.qqchen.deploy.backend.deploy.dto.SSHMessage; import com.qqchen.deploy.backend.deploy.entity.Server; -import com.qqchen.deploy.backend.deploy.enums.AuthTypeEnum; -import com.qqchen.deploy.backend.deploy.enums.SSHMessageTypeEnum; -import com.qqchen.deploy.backend.deploy.enums.SSHStatusEnum; import com.qqchen.deploy.backend.deploy.service.ISSHAuditLogService; import com.qqchen.deploy.backend.deploy.service.IServerService; -import com.qqchen.deploy.backend.framework.enums.ResponseCode; -import com.qqchen.deploy.backend.framework.exception.BusinessException; -import com.qqchen.deploy.backend.framework.utils.JsonUtils; +import com.qqchen.deploy.backend.framework.enums.SSHEvent; +import com.qqchen.deploy.backend.framework.enums.SSHTargetType; +import com.qqchen.deploy.backend.framework.ssh.SSHCommandServiceFactory; +import com.qqchen.deploy.backend.framework.ssh.websocket.AbstractSSHWebSocketHandler; +import com.qqchen.deploy.backend.framework.ssh.websocket.SSHSessionManager; +import com.qqchen.deploy.backend.framework.ssh.websocket.SSHTarget; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; -import net.schmizz.sshj.SSHClient; -import net.schmizz.sshj.common.IOUtils; -import net.schmizz.sshj.connection.channel.direct.Session; -import net.schmizz.sshj.transport.verification.PromiscuousVerifier; -import net.schmizz.sshj.userauth.keyprovider.KeyProvider; -import net.schmizz.sshj.userauth.password.PasswordUtils; -import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.stereotype.Component; -import org.springframework.web.socket.CloseStatus; -import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; -import org.springframework.web.socket.handler.TextWebSocketHandler; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Future; /** - * Server SSH WebSocket处理器 - * 处理Web SSH终端的WebSocket连接和SSH交互 + * Server SSH WebSocket处理器(业务层) + * 继承Framework的抽象Handler,只需实现5个方法 + * + * 注意:asyncTaskExecutor 线程池由Framework自动注入,业务层无需关心 */ @Slf4j @Component -public class ServerSSHWebSocketHandler extends TextWebSocketHandler { +public class ServerSSHWebSocketHandler extends AbstractSSHWebSocketHandler { @Resource private IServerService serverService; - + @Resource private ISSHAuditLogService auditLogService; + + private static final int MAX_SESSIONS_PER_SERVER = 5; + + /** + * 构造函数(注入业务依赖) + * + * @param sshCommandServiceFactory SSH命令服务工厂 + * @param sessionManager SSH会话管理器 + */ + public ServerSSHWebSocketHandler( + SSHCommandServiceFactory sshCommandServiceFactory, + SSHSessionManager sessionManager) { + super(sshCommandServiceFactory, sessionManager); + } - @Resource(name = "sshOutputExecutor") - private AsyncTaskExecutor sshOutputExecutor; - - /** - * 最大并发SSH会话数(每个用户) - */ - private static final int MAX_SESSIONS_PER_USER = 5; + // ========== 实现Framework要求的5个抽象方法 ========== /** - * WebSocket会话存储:sessionId -> WebSocketSession - */ - private final Map webSocketSessions = new ConcurrentHashMap<>(); - - /** - * SSH会话存储:sessionId -> SSHClient - */ - private final Map sshClients = new ConcurrentHashMap<>(); - - /** - * SSH会话通道存储:sessionId -> Session.Shell - */ - private final Map sshShells = new ConcurrentHashMap<>(); - - /** - * 输出监听任务存储:sessionId -> Future - */ - private final Map> outputTasks = new ConcurrentHashMap<>(); - - /** - * WebSocket连接建立时触发 + * 1. 获取SSH连接目标(业务逻辑) */ @Override - public void afterConnectionEstablished(WebSocketSession session) throws Exception { - String sessionId = session.getId(); - log.info("WebSocket连接建立: sessionId={}", sessionId); + protected SSHTarget getSSHTarget(WebSocketSession session) throws Exception { + // 从URL提取serverId + Long serverId = extractServerId(session); + if (serverId == null) { + throw new IllegalArgumentException("无效的服务器ID"); + } - try { - // 1. 从attributes中获取用户信息(由认证拦截器设置) - Long userId = (Long) session.getAttributes().get("userId"); - String username = (String) session.getAttributes().get("username"); - String clientIp = (String) session.getAttributes().get("clientIp"); - String userAgent = (String) session.getAttributes().get("userAgent"); - - if (userId == null) { - log.error("无法获取用户信息: sessionId={}", sessionId); - sendError(session, "认证失败"); - session.close(CloseStatus.POLICY_VIOLATION); - return; - } - - // 2. 从URL中提取serverId - Long serverId = extractServerId(session); - if (serverId == null) { - sendError(session, "无效的服务器ID"); - session.close(CloseStatus.BAD_DATA); - return; - } - - // 3. 获取服务器信息 - Server server = serverService.findEntityById(serverId); - if (server == null) { - sendError(session, "服务器不存在: " + serverId); - session.close(CloseStatus.NOT_ACCEPTABLE); - return; - } - - // 4. 检查用户对该服务器的SSH会话数 - long activeSessions = auditLogService.countUserActiveSessionsForServer(userId, serverId); - log.info("用户当前对该服务器的SSH连接数: userId={}, serverId={}, serverName={}, current={}, max={}", - userId, serverId, server.getServerName(), activeSessions, MAX_SESSIONS_PER_USER); - - if (activeSessions >= MAX_SESSIONS_PER_USER) { - log.warn("用户对该服务器的SSH会话数超过限制: userId={}, serverId={}, serverName={}, current={}, max={}", - userId, serverId, server.getServerName(), activeSessions, MAX_SESSIONS_PER_USER); - sendError(session, "对服务器【" + server.getServerName() + "】的SSH连接数超过限制(最多" + MAX_SESSIONS_PER_USER + "个)"); - session.close(CloseStatus.POLICY_VIOLATION); - return; - } - - // 5. 权限校验(预留,实际项目中需要实现) - // TODO: 根据业务需求实现权限校验逻辑 - // 例如:检查用户是否是管理员,或者服务器是否允许该用户访问 - - // 6. 发送连接中状态 - sendStatus(session, SSHStatusEnum.CONNECTING); - - // 7. 建立SSH连接 - SSHClient sshClient = createSSHConnection(server); - sshClients.put(sessionId, sshClient); - - // 8. 打开Shell通道并分配PTY(伪终端) - Session sshSession = sshClient.startSession(); - - // ⚠️ 关键:分配PTY,启用交互式Shell、回显、提示符 - // 参数:终端类型, 列数, 行数, 宽度(像素), 高度(像素), 终端模式 - sshSession.allocatePTY("xterm", 80, 24, 0, 0, java.util.Collections.emptyMap()); - - log.debug("PTY已分配: sessionId={}, termType=xterm, cols=80, rows=24", sessionId); - - Session.Shell shell = sshSession.startShell(); - log.debug("Shell已启动: sessionId={}", sessionId); - - // 保存会话信息 - webSocketSessions.put(sessionId, session); - sshShells.put(sessionId, shell); - - // 9. ⚠️ 优化:先启动输出监听线程,确保不错过任何SSH输出 - Future stdoutTask = sshOutputExecutor.submit(() -> readSSHOutput(session, shell)); - outputTasks.put(sessionId, stdoutTask); - - // 同时启动错误流监听(某些SSH服务器会将输出发送到错误流) - Future stderrTask = sshOutputExecutor.submit(() -> readSSHError(session, shell)); - outputTasks.put(sessionId + "_stderr", stderrTask); - - // 10. 发送连接成功状态 - sendStatus(session, SSHStatusEnum.CONNECTED); - log.info("SSH连接建立成功: sessionId={}, userId={}, username={}, server={}@{}", - sessionId, userId, username, server.getSshUser(), server.getHostIp()); - - // 11. ⚠️ 异步创建审计日志,不阻塞主线程 - // 使用CompletableFuture异步执行,避免数据库操作延迟影响SSH输出接收 - CompletableFuture.runAsync(() -> { + // 获取服务器信息 + Server server = serverService.findEntityById(serverId); + if (server == null) { + throw new IllegalArgumentException("服务器不存在: " + serverId); + } + + // 构建SSH目标 + SSHTarget target = new SSHTarget(); + target.setTargetType(SSHTargetType.SERVER); + target.setHost(server.getHostIp()); + target.setPort(server.getSshPort()); + target.setUsername(server.getSshUser()); + target.setAuthType(server.getAuthType()); + target.setPassword(server.getSshPassword()); + target.setPrivateKey(server.getSshPrivateKey()); + target.setPassphrase(server.getSshPassphrase()); + target.setOsType(server.getOsType()); + target.setMetadata(serverId); // 保存serverId供后续使用 + + return target; + } + + /** + * 2. 权限验证(业务逻辑) + */ + @Override + protected boolean checkPermission(Long userId, SSHTarget target) { + // TODO: 实现更细粒度的权限验证 + // 例如:检查用户是否有访问该服务器的权限 + return true; // 暂时允许所有 + } + + /** + * 3. 获取最大并发连接数 + */ + @Override + protected int getMaxSessions() { + return MAX_SESSIONS_PER_SERVER; + } + + /** + * 4. 获取WebSocket路径模式 + */ + @Override + protected String getPathPattern() { + return "/api/v1/server-ssh/connect/*"; + } + + /** + * 5. 事件钩子(审计日志等业务逻辑) + */ + @Override + protected void onEvent(SSHEvent event, Map data) { + String sessionId = (String) data.get("sessionId"); + SSHTarget target = (SSHTarget) data.get("target"); + Long serverId = (Long) target.getMetadata(); + + switch (event) { + case AFTER_CONNECT: + // 创建审计日志 try { - Long auditLogId = auditLogService.createAuditLog(userId, server, sessionId, clientIp, userAgent); - session.getAttributes().put("auditLogId", auditLogId); - log.info("SSH审计日志已创建: auditLogId={}, sessionId={}", auditLogId, sessionId); - } catch (Exception e) { - log.error("创建SSH审计日志失败: sessionId={}", sessionId, e); - } - }); - - } catch (Exception e) { - log.error("建立SSH连接失败: sessionId={}", sessionId, e); - sendError(session, "连接失败: " + e.getMessage()); - - // 记录失败的审计日志 - try { - // ⚠️ 异步场景:直接尝试创建审计日志(有锁保护,已存在则直接返回) - // 无需检查 attributes,因为异步任务可能还未完成 - Long userId = (Long) session.getAttributes().get("userId"); - String clientIp = (String) session.getAttributes().get("clientIp"); - String userAgent = (String) session.getAttributes().get("userAgent"); - Long serverId = extractServerId(session); - - if (userId != null && serverId != null) { + Long userId = (Long) data.get("userId"); + String clientIp = (String) data.get("clientIp"); + String userAgent = (String) data.get("userAgent"); Server server = serverService.findEntityById(serverId); + if (server != null) { - // 先创建(如果已存在则返回已有ID,有锁保护不会重复) - Long auditLogId = auditLogService.createAuditLog(userId, server, sessionId, clientIp, userAgent); - session.getAttributes().put("auditLogId", auditLogId); - // 再关闭 - auditLogService.closeAuditLog(sessionId, "FAILED", e.getMessage()); + Long auditLogId = auditLogService.createAuditLog( + userId, server, sessionId, clientIp, userAgent); + data.put("auditLogId", auditLogId); + log.info("审计日志已创建: auditLogId={}, sessionId={}", auditLogId, sessionId); } + } catch (Exception e) { + log.error("创建审计日志失败: sessionId={}", sessionId, e); } - } catch (Exception auditEx) { - log.error("记录失败审计日志异常", auditEx); - } - - cleanupSession(sessionId); - session.close(CloseStatus.SERVER_ERROR); - } - } - - /** - * 接收前端消息 - */ - @Override - protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { - String sessionId = session.getId(); - - try { - // 解析消息 - SSHMessage sshMessage = JsonUtils.fromJson(message.getPayload(), SSHMessage.class); - if (sshMessage == null) { - log.warn("解析消息失败: sessionId={}", sessionId); - return; - } - - if (sshMessage.getType() != SSHMessageTypeEnum.INPUT) { - log.warn("收到非input类型消息: sessionId={}, type={}", sessionId, sshMessage.getType()); - return; - } - - // 获取SSH Shell - Session.Shell shell = sshShells.get(sessionId); - if (shell == null) { - sendError(session, "SSH连接未建立"); - return; - } - - // 发送命令到SSH - String input = sshMessage.getData(); - if (input != null) { - OutputStream outputStream = shell.getOutputStream(); - outputStream.write(input.getBytes(StandardCharsets.UTF_8)); - outputStream.flush(); + break; - // 记录命令到审计日志(只记录有意义的命令,过滤掉单个字符的按键) - if (input.length() > 0) { - auditLogService.recordCommand(sessionId, input); + case AFTER_DISCONNECT: + // 关闭审计日志 + try { + auditLogService.closeAuditLog(sessionId, "SUCCESS", "正常断开"); + log.info("审计日志已关闭: sessionId={}", sessionId); + } catch (Exception e) { + log.error("关闭审计日志失败: sessionId={}", sessionId, e); } + break; - log.debug("发送命令到SSH: sessionId={}, length={}", sessionId, input.length()); - } - - } catch (Exception e) { - log.error("处理WebSocket消息失败: sessionId={}", sessionId, e); - sendError(session, "命令执行失败: " + e.getMessage()); - } - } - - /** - * WebSocket连接关闭时触发 - */ - @Override - public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { - String sessionId = session.getId(); - log.info("WebSocket连接关闭: sessionId={}, status={}", sessionId, status); - - // 关闭审计日志 - try { - String auditStatus = status.getCode() == CloseStatus.NORMAL.getCode() ? "SUCCESS" : "INTERRUPTED"; - auditLogService.closeAuditLog(sessionId, auditStatus, status.getReason()); - } catch (Exception e) { - log.error("关闭审计日志失败: sessionId={}", sessionId, e); - } - - // 清理资源 - cleanupSession(sessionId); - } - - /** - * WebSocket传输错误时触发 - */ - @Override - public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { - String sessionId = session.getId(); - log.error("WebSocket传输错误: sessionId={}", sessionId, exception); - - // 记录错误到审计日志 - try { - auditLogService.closeAuditLog(sessionId, "FAILED", "传输错误: " + exception.getMessage()); - } catch (Exception e) { - log.error("关闭审计日志失败: sessionId={}", sessionId, e); - } - - sendError(session, "传输错误: " + exception.getMessage()); - cleanupSession(sessionId); - session.close(CloseStatus.SERVER_ERROR); - } - - /** - * 创建SSH连接 - */ - private SSHClient createSSHConnection(Server server) throws IOException { - SSHClient sshClient = new SSHClient(); - - // 跳过主机密钥验证(生产环境建议使用正式的验证方式) - sshClient.addHostKeyVerifier(new PromiscuousVerifier()); - - // 设置超时 - sshClient.setTimeout(30000); - sshClient.setConnectTimeout(30000); - - // 连接服务器 - sshClient.connect(server.getHostIp(), server.getSshPort()); - - // 认证 - if (server.getAuthType() == AuthTypeEnum.PASSWORD) { - // 密码认证 - sshClient.authPassword(server.getSshUser(), server.getSshPassword()); - } else if (server.getAuthType() == AuthTypeEnum.KEY) { - // 密钥认证 - KeyProvider keyProvider; - if (server.getSshPassphrase() != null && !server.getSshPassphrase().isEmpty()) { - keyProvider = sshClient.loadKeys(server.getSshPrivateKey(), null, - PasswordUtils.createOneOff(server.getSshPassphrase().toCharArray())); - } else { - keyProvider = sshClient.loadKeys(server.getSshPrivateKey(), null, null); - } - sshClient.authPublickey(server.getSshUser(), keyProvider); - } else { - throw new BusinessException(ResponseCode.INVALID_PARAM, new Object[]{"不支持的认证类型: " + server.getAuthType()}); - } - - return sshClient; - } - - /** - * 读取SSH输出并发送到前端 - */ - private void readSSHOutput(WebSocketSession session, Session.Shell shell) { - String sessionId = session.getId(); - log.debug("开始监听SSH输出: sessionId={}", sessionId); - - try { - InputStream inputStream = shell.getInputStream(); - byte[] buffer = new byte[1024]; - int len; - - log.debug("SSH输出流已获取,开始循环读取: sessionId={}", sessionId); - - while (session.isOpen() && (len = inputStream.read(buffer)) > 0) { - String output = new String(buffer, 0, len, StandardCharsets.UTF_8); - log.debug("读取到SSH输出: sessionId={}, length={}, content={}", - sessionId, len, output.replaceAll("\\r", "\\\\r").replaceAll("\\n", "\\\\n")); - sendOutput(session, output); - log.debug("SSH输出已发送到前端: sessionId={}", sessionId); - } - - log.debug("SSH输出监听结束: sessionId={}, session.isOpen={}", sessionId, session.isOpen()); - - } catch (java.io.InterruptedIOException e) { - // 线程被中断(正常的清理过程),检查是否是WebSocket关闭导致的 - if (!session.isOpen()) { - log.debug("SSH输出监听线程被正常中断(WebSocket已关闭): sessionId={}", sessionId); - } else { - log.error("SSH输出监听线程被异常中断: sessionId={}", sessionId, e); - // 只在session仍然打开时尝试发送错误消息 + case ON_COMMAND: + // 记录命令到审计日志 try { - sendError(session, "SSH连接被中断"); - } catch (Exception ex) { - log.debug("发送错误消息失败(session可能已关闭): sessionId={}", sessionId); + String command = (String) data.get("command"); + if (command != null && command.length() > 0) { + auditLogService.recordCommand(sessionId, command); + } + } catch (Exception e) { + log.error("记录命令失败: sessionId={}", sessionId, e); } - } - } catch (IOException e) { - // 其他IO异常(真正的错误) - if (session.isOpen()) { - log.error("读取SSH输出失败: sessionId={}", sessionId, e); + break; + + case ON_ERROR: + // 记录错误 try { - sendError(session, "读取SSH输出失败: " + e.getMessage()); - } catch (Exception ex) { - log.debug("发送错误消息失败(session可能已关闭): sessionId={}", sessionId); + String error = (String) data.get("error"); + auditLogService.closeAuditLog(sessionId, "FAILED", error); + } catch (Exception e) { + log.error("记录错误失败: sessionId={}", sessionId, e); } - } else { - log.debug("读取SSH输出时发生IO异常,但session已关闭(正常): sessionId={}", sessionId); - } - } - } - - /** - * 读取SSH错误流并发送到前端 - * 某些SSH服务器可能将输出发送到标准错误流 - */ - private void readSSHError(WebSocketSession session, Session.Shell shell) { - String sessionId = session.getId(); - log.debug("开始监听SSH错误流: sessionId={}", sessionId); - - try { - InputStream errorStream = shell.getErrorStream(); - byte[] buffer = new byte[1024]; - int len; - - log.debug("SSH错误流已获取,开始循环读取: sessionId={}", sessionId); - - while (session.isOpen() && (len = errorStream.read(buffer)) > 0) { - String output = new String(buffer, 0, len, StandardCharsets.UTF_8); - log.debug("读取到SSH错误流输出: sessionId={}, length={}, content={}", - sessionId, len, output.replaceAll("\\r", "\\\\r").replaceAll("\\n", "\\\\n")); - sendOutput(session, output); // 错误流也作为output发送到前端 - log.debug("SSH错误流输出已发送到前端: sessionId={}", sessionId); - } - - log.debug("SSH错误流监听结束: sessionId={}", sessionId); - - } catch (java.io.InterruptedIOException e) { - if (!session.isOpen()) { - log.debug("SSH错误流监听线程被正常中断(WebSocket已关闭): sessionId={}", sessionId); - } else { - log.error("SSH错误流监听线程被异常中断: sessionId={}", sessionId, e); - } - } catch (IOException e) { - if (session.isOpen()) { - log.error("读取SSH错误流失败: sessionId={}", sessionId, e); - } else { - log.debug("读取SSH错误流时发生IO异常,但session已关闭(正常): sessionId={}", sessionId); - } - } - } - - /** - * 清理会话资源 - */ - private void cleanupSession(String sessionId) { - log.debug("清理会话资源: sessionId={}", sessionId); - - // 移除WebSocketSession - webSocketSessions.remove(sessionId); - - // 取消输出监听任务(标准输出) - Future stdoutTask = outputTasks.remove(sessionId); - if (stdoutTask != null && !stdoutTask.isDone()) { - stdoutTask.cancel(true); - } - - // 取消错误流监听任务 - Future stderrTask = outputTasks.remove(sessionId + "_stderr"); - if (stderrTask != null && !stderrTask.isDone()) { - stderrTask.cancel(true); - } - - // 关闭SSH Shell - Session.Shell shell = sshShells.remove(sessionId); - if (shell != null) { - try { - shell.close(); - } catch (IOException e) { - log.warn("关闭SSH Shell失败: sessionId={}", sessionId, e); - } - } - - // 关闭SSH连接 - SSHClient sshClient = sshClients.remove(sessionId); - if (sshClient != null) { - try { - sshClient.disconnect(); - } catch (IOException e) { - log.warn("关闭SSH连接失败: sessionId={}", sessionId, e); - } + break; + + case BEFORE_SHUTDOWN: + // 优雅下线前处理审计日志(Framework统一调用) + try { + String reason = (String) data.get("reason"); + auditLogService.closeAuditLog(sessionId, "SERVER_SHUTDOWN", reason); + log.info("审计日志已更新(优雅下线): sessionId={}", sessionId); + } catch (Exception e) { + log.error("更新审计日志失败(优雅下线): sessionId={}", sessionId, e); + } + break; } } + // ========== 辅助方法 ========== + /** * 从WebSocket session URL中提取serverId */ @@ -486,125 +199,4 @@ public class ServerSSHWebSocketHandler extends TextWebSocketHandler { } return null; } - - /** - * 发送output类型消息到前端 - */ - private void sendOutput(WebSocketSession session, String output) throws IOException { - if (!session.isOpen()) { - return; // session已关闭,直接返回 - } - SSHMessage message = SSHMessage.output(output); - String json = JsonUtils.toJson(message); - if (json != null) { - session.sendMessage(new TextMessage(json)); - } - } - - /** - * 发送error类型消息到前端 - */ - private void sendError(WebSocketSession session, String errorMessage) throws IOException { - if (!session.isOpen()) { - return; // session已关闭,直接返回 - } - SSHMessage message = SSHMessage.error(errorMessage); - String json = JsonUtils.toJson(message); - if (json != null) { - session.sendMessage(new TextMessage(json)); - } - } - - /** - * 发送status类型消息到前端 - */ - private void sendStatus(WebSocketSession session, SSHStatusEnum status) throws IOException { - if (!session.isOpen()) { - return; // session已关闭,直接返回 - } - SSHMessage message = SSHMessage.status(status); - String json = JsonUtils.toJson(message); - if (json != null) { - session.sendMessage(new TextMessage(json)); - } - } - - /** - * 优雅下线:应用关闭时清理所有活跃的SSH会话 - * 使用 @PreDestroy 注解,确保在Spring容器销毁前执行 - */ - @jakarta.annotation.PreDestroy - public void gracefulShutdown() { - log.warn("====== 应用准备关闭,开始优雅下线所有SSH会话 ======"); - log.warn("当前活跃SSH会话数: {}", webSocketSessions.size()); - - if (webSocketSessions.isEmpty()) { - log.info("没有活跃的SSH会话,跳过优雅下线"); - return; - } - - // 记录开始时间 - long startTime = System.currentTimeMillis(); - int successCount = 0; - int failureCount = 0; - - // 遍历所有活跃会话 - for (Map.Entry entry : webSocketSessions.entrySet()) { - String sessionId = entry.getKey(); - WebSocketSession session = entry.getValue(); - - try { - log.info("关闭SSH会话: sessionId={}", sessionId); - - // 1. 尝试向前端发送服务器下线通知 - try { - if (session.isOpen()) { - sendError(session, "服务器正在重启,连接即将关闭"); - // 给前端一点时间接收消息 - Thread.sleep(100); - } - } catch (Exception e) { - log.debug("发送下线通知失败: sessionId={}", sessionId, e); - } - - // 2. 更新审计日志(最重要!防止僵尸会话) - try { - auditLogService.closeAuditLog(sessionId, "SERVER_SHUTDOWN", "服务器优雅下线"); - log.info("审计日志已更新: sessionId={}", sessionId); - } catch (Exception e) { - log.error("更新审计日志失败: sessionId={}", sessionId, e); - } - - // 3. 清理资源 - cleanupSession(sessionId); - - // 4. 关闭WebSocket连接 - try { - if (session.isOpen()) { - session.close(new CloseStatus(1001, "服务器正在重启")); - } - } catch (Exception e) { - log.debug("关闭WebSocket失败: sessionId={}", sessionId, e); - } - - successCount++; - log.info("SSH会话关闭成功: sessionId={}", sessionId); - - } catch (Exception e) { - failureCount++; - log.error("关闭SSH会话失败: sessionId={}", sessionId, e); - } - } - - // 清空所有缓存 - webSocketSessions.clear(); - sshClients.clear(); - sshShells.clear(); - outputTasks.clear(); - - long duration = System.currentTimeMillis() - startTime; - log.warn("====== 优雅下线完成 ======"); - log.warn("总会话数: {}, 成功: {}, 失败: {}, 耗时: {}ms", - successCount + failureCount, successCount, failureCount, duration); - } } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/impl/JenkinsServiceIntegrationImpl.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/impl/JenkinsServiceIntegrationImpl.java index 32566b17..36dc5692 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/impl/JenkinsServiceIntegrationImpl.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/impl/JenkinsServiceIntegrationImpl.java @@ -531,9 +531,6 @@ public class JenkinsServiceIntegrationImpl extends BaseExternalSystemIntegration ); if (response.getStatusCode() == HttpStatus.OK && response.getBody() != null) { - // 打印原始响应用于调试 - log.info("Jenkins build details raw response: {}", response.getBody()); - ObjectMapper mapper = new ObjectMapper(); JenkinsBuildResponse buildResponse = mapper.readValue(response.getBody(), JenkinsBuildResponse.class); diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/query/ServerQuery.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/query/ServerQuery.java index ffea80a5..3f76540e 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/query/ServerQuery.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/query/ServerQuery.java @@ -1,7 +1,7 @@ package com.qqchen.deploy.backend.deploy.query; -import com.qqchen.deploy.backend.deploy.enums.AuthTypeEnum; -import com.qqchen.deploy.backend.deploy.enums.OsTypeEnum; +import com.qqchen.deploy.backend.framework.enums.AuthTypeEnum; +import com.qqchen.deploy.backend.framework.enums.OsTypeEnum; import com.qqchen.deploy.backend.deploy.enums.ServerStatusEnum; import com.qqchen.deploy.backend.framework.annotation.QueryField; import com.qqchen.deploy.backend.framework.enums.QueryType; diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/IServerService.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/IServerService.java index 72b30089..b151987c 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/IServerService.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/IServerService.java @@ -1,6 +1,7 @@ package com.qqchen.deploy.backend.deploy.service; import com.qqchen.deploy.backend.deploy.dto.ServerDTO; +import com.qqchen.deploy.backend.deploy.dto.ServerInfoDTO; import com.qqchen.deploy.backend.deploy.dto.ServerInitializeDTO; import com.qqchen.deploy.backend.deploy.entity.Server; import com.qqchen.deploy.backend.deploy.query.ServerQuery; @@ -21,11 +22,12 @@ public interface IServerService extends IBaseService new BusinessException(ResponseCode.DATA_NOT_FOUND)); // 2. 验证必要字段 if (server.getHostIp() == null || server.getSshUser() == null) { - throw new BusinessException(ResponseCode.INVALID_PARAM, new Object[]{"服务器IP和SSH用户名不能为空"}); + throw new BusinessException(ResponseCode.INVALID_PARAM, + new Object[]{"服务器IP和SSH用户名不能为空"}); } - SSHClient ssh = new SSHClient(); + if (server.getOsType() == null) { + throw new BusinessException(ResponseCode.INVALID_PARAM, + new Object[]{"请先选择操作系统类型"}); + } + + ServerInfoDTO info = new ServerInfoDTO(); + SSHClient sshClient = null; + ISSHCommandService sshService = null; + try { - // 3. 配置SSH客户端 - // TODO: 【安全改进】生产环境应使用更安全的主机密钥验证方式 - // 当前:使用 PromiscuousVerifier 跳过主机密钥验证(不验证服务器身份,存在中间人攻击风险) - // 建议改为: - // 1. ssh.loadKnownHosts() - 使用 ~/.ssh/known_hosts 文件验证 - // 2. ssh.addHostKeyVerifier(new ConsoleVerifyingHostKeyVerifier()) - 首次自动接受,后续验证 - // 3. 在数据库中存储服务器指纹,首次连接时记录,后续验证 - ssh.addHostKeyVerifier(new PromiscuousVerifier()); - ssh.setTimeout(10000); // 10秒超时 - ssh.setConnectTimeout(10000); + // 3. 获取对应OS的SSH命令服务 + sshService = sshCommandServiceFactory.getService(server.getOsType()); + log.info("使用{}服务测试连接: {}@{}:{} [认证方式: {}]", + server.getOsType(), server.getSshUser(), server.getHostIp(), + server.getSshPort(), server.getAuthType()); - // 4. 连接服务器 - int port = server.getSshPort() != null ? server.getSshPort() : 22; - log.info("尝试连接服务器: {}@{}:{}", server.getSshUser(), server.getHostIp(), port); - ssh.connect(server.getHostIp(), port); - - // 5. 根据认证方式进行认证 - if (server.getAuthType() == AuthTypeEnum.KEY) { - // 密钥认证 - if (server.getSshPrivateKey() == null || server.getSshPrivateKey().isEmpty()) { - throw new BusinessException(ResponseCode.INVALID_PARAM, new Object[]{"SSH私钥不能为空"}); - } - - KeyProvider keyProvider; - if (server.getSshPassphrase() != null && !server.getSshPassphrase().isEmpty()) { - // 私钥有密码保护 - keyProvider = ssh.loadKeys(server.getSshPrivateKey(), null, - net.schmizz.sshj.userauth.password.PasswordUtils.createOneOff(server.getSshPassphrase().toCharArray())); - } else { - // 私钥无密码保护 - keyProvider = ssh.loadKeys(server.getSshPrivateKey(), null, null); - } - - ssh.authPublickey(server.getSshUser(), keyProvider); - log.info("使用密钥认证成功"); - } else { - // 密码认证 - if (server.getSshPassword() == null || server.getSshPassword().isEmpty()) { - throw new BusinessException(ResponseCode.INVALID_PARAM, new Object[]{"SSH密码不能为空"}); - } - - ssh.authPassword(server.getSshUser(), server.getSshPassword()); - log.info("使用密码认证成功"); + // 4. 根据认证类型创建SSH连接 + if (server.getAuthType() == null) { + throw new BusinessException(ResponseCode.INVALID_PARAM, + new Object[]{"请选择认证方式"}); } - // 6. 测试执行简单命令 - try (var session = ssh.startSession()) { - session.allocateDefaultPTY(); - var cmd = session.exec("echo 'Connection test successful'"); - cmd.join(5, TimeUnit.SECONDS); - - if (cmd.getExitStatus() == 0) { - log.info("SSH连接测试成功: {}", server.getHostIp()); + // 根据认证类型传递对应的参数 + String password = null; + String privateKey = null; + String passphrase = null; + + switch (server.getAuthType()) { + case PASSWORD: + // 密码认证:只传密码 + if (server.getSshPassword() == null || server.getSshPassword().isEmpty()) { + throw new BusinessException(ResponseCode.INVALID_PARAM, + new Object[]{"SSH密码不能为空"}); + } + password = server.getSshPassword(); + break; - // 7. 更新服务器状态和最后连接时间 - server.setStatus(ServerStatusEnum.ONLINE); - server.setLastConnectTime(LocalDateTime.now()); - serverRepository.save(server); - log.info("已更新服务器状态为ONLINE: serverId={}", serverId); + case KEY: + // 密钥认证:只传密钥和密码短语 + if (server.getSshPrivateKey() == null || server.getSshPrivateKey().isEmpty()) { + throw new BusinessException(ResponseCode.INVALID_PARAM, + new Object[]{"SSH私钥不能为空"}); + } + privateKey = server.getSshPrivateKey(); + passphrase = server.getSshPassphrase(); + break; - return true; - } else { - log.warn("SSH连接测试失败,命令执行异常: {}", server.getHostIp()); - - // 8. 更新服务器状态为离线 - server.setStatus(ServerStatusEnum.OFFLINE); - serverRepository.save(server); - - return false; - } + default: + throw new BusinessException(ResponseCode.INVALID_PARAM, + new Object[]{"不支持的认证类型: " + server.getAuthType()}); } - } catch (IOException e) { - log.error("SSH连接测试失败: {} - {}", server.getHostIp(), e.getMessage()); + sshClient = sshService.createConnection( + server.getHostIp(), + server.getSshPort(), + server.getSshUser(), + password, + privateKey, + passphrase + ); - // 9. 连接失败,更新服务器状态为离线 + // 5. 采集服务器信息 + info.setConnected(true); + info.setHostname(sshService.getHostname(sshClient)); + info.setOsVersion(sshService.getOsVersion(sshClient)); + info.setCpuCores(sshService.getCpuCores(sshClient)); + info.setMemorySize(sshService.getMemorySize(sshClient)); + info.setDiskSize(sshService.getDiskSize(sshClient)); + + log.info("服务器信息采集成功: serverId={}, hostname={}, cpu={}核, mem={}GB, disk={}GB", + serverId, info.getHostname(), info.getCpuCores(), info.getMemorySize(), info.getDiskSize()); + + // 6. 更新服务器信息到数据库 + server.setStatus(ServerStatusEnum.ONLINE); + server.setLastConnectTime(LocalDateTime.now()); + server.setHostname(info.getHostname()); + server.setOsVersion(info.getOsVersion()); + server.setCpuCores(info.getCpuCores()); + server.setMemorySize(info.getMemorySize()); + server.setDiskSize(info.getDiskSize()); + serverRepository.save(server); + + log.info("服务器状态已更新为ONLINE: serverId={}", serverId); + + } catch (Exception e) { + log.error("测试连接失败: serverId={}, error={}", serverId, e.getMessage(), e); + + // 连接失败,设置错误信息 + info.setConnected(false); + info.setErrorMessage(e.getMessage()); + + // 更新服务器状态为离线 server.setStatus(ServerStatusEnum.OFFLINE); serverRepository.save(server); - throw new BusinessException(ResponseCode.ERROR, new Object[]{"SSH连接失败: " + e.getMessage()}); } finally { - // 10. 关闭连接 - try { - if (ssh.isConnected()) { - ssh.disconnect(); - } - } catch (IOException e) { - log.error("关闭SSH连接失败", e); + // 7. 关闭SSH连接 + if (sshService != null) { + sshService.closeConnection(sshClient); } + + // 8. 设置响应时间 + info.setResponseTime(System.currentTimeMillis() - startTime); + info.setConnectTime(LocalDateTime.now()); } + + return info; } } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/enums/AuthTypeEnum.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/enums/AuthTypeEnum.java new file mode 100644 index 00000000..b3f4b513 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/enums/AuthTypeEnum.java @@ -0,0 +1,16 @@ +package com.qqchen.deploy.backend.framework.enums; + +/** + * SSH认证方式枚举(Framework层) + */ +public enum AuthTypeEnum { + /** + * 密码认证 + */ + PASSWORD, + + /** + * 密钥认证 + */ + KEY +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/enums/OsTypeEnum.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/enums/OsTypeEnum.java similarity index 81% rename from backend/src/main/java/com/qqchen/deploy/backend/deploy/enums/OsTypeEnum.java rename to backend/src/main/java/com/qqchen/deploy/backend/framework/enums/OsTypeEnum.java index a3921c96..00270b14 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/enums/OsTypeEnum.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/enums/OsTypeEnum.java @@ -1,10 +1,10 @@ -package com.qqchen.deploy.backend.deploy.enums; +package com.qqchen.deploy.backend.framework.enums; import lombok.AllArgsConstructor; import lombok.Getter; /** - * 操作系统类型枚举 + * 操作系统类型枚举(Framework层) */ @Getter @AllArgsConstructor @@ -19,4 +19,3 @@ public enum OsTypeEnum { private final String code; private final String description; } - diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/enums/SSHDisconnectReason.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/enums/SSHDisconnectReason.java new file mode 100644 index 00000000..74708cbf --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/enums/SSHDisconnectReason.java @@ -0,0 +1,74 @@ +package com.qqchen.deploy.backend.framework.enums; + +/** + * SSH断开连接原因枚举(Framework层) + * + * 用于标识SSH连接断开的具体原因,便于审计和追踪 + */ +public enum SSHDisconnectReason { + /** + * 正常关闭(用户主动关闭) + */ + NORMAL_CLOSE("NORMAL_CLOSE", "正常关闭"), + + /** + * 服务器优雅下线 + */ + SERVER_SHUTDOWN("SERVER_SHUTDOWN", "服务器关闭"), + + /** + * 客户端断开连接 + */ + CLIENT_DISCONNECT("CLIENT_DISCONNECT", "客户端断开"), + + /** + * 网络错误 + */ + NETWORK_ERROR("NETWORK_ERROR", "网络错误"), + + /** + * 会话超时 + */ + SESSION_TIMEOUT("SESSION_TIMEOUT", "会话超时"), + + /** + * SSH连接失败 + */ + CONNECTION_FAILED("CONNECTION_FAILED", "连接失败"), + + /** + * 认证失败 + */ + AUTH_FAILED("AUTH_FAILED", "认证失败"), + + /** + * 并发连接数超限 + */ + CONCURRENT_LIMIT("CONCURRENT_LIMIT", "并发限制"), + + /** + * 传输错误 + */ + TRANSPORT_ERROR("TRANSPORT_ERROR", "传输错误"), + + /** + * 未知原因 + */ + UNKNOWN("UNKNOWN", "未知原因"); + + private final String code; + private final String description; + + SSHDisconnectReason(String code, String description) { + this.code = code; + this.description = description; + } + + public String getCode() { + return code; + } + + public String getDescription() { + return description; + } +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/enums/SSHEvent.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/enums/SSHEvent.java new file mode 100644 index 00000000..a2dc9e51 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/enums/SSHEvent.java @@ -0,0 +1,41 @@ +package com.qqchen.deploy.backend.framework.enums; + +/** + * SSH事件枚举(Framework层) + */ +public enum SSHEvent { + /** + * 连接前 + */ + BEFORE_CONNECT, + + /** + * 连接后 + */ + AFTER_CONNECT, + + /** + * 断开前 + */ + BEFORE_DISCONNECT, + + /** + * 断开后 + */ + AFTER_DISCONNECT, + + /** + * 命令输入 + */ + ON_COMMAND, + + /** + * 错误 + */ + ON_ERROR, + + /** + * 优雅下线前 + */ + BEFORE_SHUTDOWN +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/enums/SSHMessageType.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/enums/SSHMessageType.java new file mode 100644 index 00000000..6e5c21c4 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/enums/SSHMessageType.java @@ -0,0 +1,45 @@ +package com.qqchen.deploy.backend.framework.enums; + +import com.fasterxml.jackson.annotation.JsonValue; + +/** + * SSH WebSocket消息类型枚举(Framework层) + * + * 用于标识不同类型的SSH WebSocket消息 + */ +public enum SSHMessageType { + /** + * 用户输入(前端 → 后端) + */ + INPUT("input"), + + /** + * 终端输出(后端 → 前端) + */ + OUTPUT("output"), + + /** + * 连接状态(后端 → 前端) + */ + STATUS("status"), + + /** + * 错误信息(后端 → 前端) + */ + ERROR("error"); + + private final String value; + + SSHMessageType(String value) { + this.value = value; + } + + /** + * 序列化时输出小写字符串 + * 例如:OUTPUT → "output" + */ + @JsonValue + public String getValue() { + return value; + } +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/enums/SSHStatusEnum.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/enums/SSHStatusEnum.java new file mode 100644 index 00000000..fcacdd0f --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/enums/SSHStatusEnum.java @@ -0,0 +1,31 @@ +package com.qqchen.deploy.backend.framework.enums; + +/** + * SSH连接状态枚举(Framework层) + */ +public enum SSHStatusEnum { + /** + * 连接中(首次连接) + */ + CONNECTING, + + /** + * 已连接 + */ + CONNECTED, + + /** + * 重新连接中(网络闪断、手动重连) + */ + RECONNECTING, + + /** + * 已断开 + */ + DISCONNECTED, + + /** + * 错误 + */ + ERROR +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/enums/SSHTargetType.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/enums/SSHTargetType.java new file mode 100644 index 00000000..e0a93217 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/enums/SSHTargetType.java @@ -0,0 +1,26 @@ +package com.qqchen.deploy.backend.framework.enums; + +/** + * SSH连接目标类型枚举(Framework层) + */ +public enum SSHTargetType { + /** + * 服务器 + */ + SERVER, + + /** + * Kubernetes Pod + */ + POD, + + /** + * Docker容器 + */ + CONTAINER, + + /** + * 其他自定义类型 + */ + CUSTOM +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/AbstractSSHCommandService.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/AbstractSSHCommandService.java new file mode 100644 index 00000000..93ae49b8 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/AbstractSSHCommandService.java @@ -0,0 +1,139 @@ +package com.qqchen.deploy.backend.framework.ssh; + +import com.qqchen.deploy.backend.framework.enums.ResponseCode; +import com.qqchen.deploy.backend.framework.exception.BusinessException; +import lombok.extern.slf4j.Slf4j; +import net.schmizz.sshj.SSHClient; +import net.schmizz.sshj.connection.channel.direct.Session; +import net.schmizz.sshj.transport.verification.PromiscuousVerifier; +import net.schmizz.sshj.userauth.keyprovider.KeyProvider; +import net.schmizz.sshj.userauth.password.PasswordUtils; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * SSH命令服务抽象基类(Framework层) + * 提供通用的SSH连接和命令执行能力 + * + * 子类只需实现OS特定的命令逻辑 + */ +@Slf4j +public abstract class AbstractSSHCommandService implements ISSHCommandService { + + private static final int DEFAULT_TIMEOUT_MS = 10000; + private static final int COMMAND_TIMEOUT_SECONDS = 5; + + @Override + public SSHClient createConnection(String host, Integer port, String username, + String password, String privateKey, String passphrase) throws Exception { + SSHClient sshClient = new SSHClient(); + + try { + // 配置SSH客户端 + // TODO: 【安全改进】生产环境应使用更安全的主机密钥验证方式 + sshClient.addHostKeyVerifier(new PromiscuousVerifier()); + sshClient.setTimeout(DEFAULT_TIMEOUT_MS); + sshClient.setConnectTimeout(DEFAULT_TIMEOUT_MS); + + // 连接服务器 + int sshPort = port != null ? port : 22; + log.debug("连接SSH服务器: {}:{}", host, sshPort); + sshClient.connect(host, sshPort); + + // 认证 + if (privateKey != null && !privateKey.isEmpty()) { + // 密钥认证 + KeyProvider keyProvider; + if (passphrase != null && !passphrase.isEmpty()) { + keyProvider = sshClient.loadKeys(privateKey, null, + PasswordUtils.createOneOff(passphrase.toCharArray())); + } else { + keyProvider = sshClient.loadKeys(privateKey, null, null); + } + sshClient.authPublickey(username, keyProvider); + log.debug("使用密钥认证成功: {}", username); + } else if (password != null && !password.isEmpty()) { + // 密码认证 + sshClient.authPassword(username, password); + log.debug("使用密码认证成功: {}", username); + } else { + throw new BusinessException(ResponseCode.INVALID_PARAM, + new Object[]{"密码和私钥不能同时为空"}); + } + + return sshClient; + + } catch (Exception e) { + // 连接失败,关闭客户端 + closeConnection(sshClient); + log.error("SSH连接失败: {}@{}:{}", username, host, port, e); + throw e; + } + } + + @Override + public String executeCommand(SSHClient sshClient, String command) throws Exception { + try (Session session = sshClient.startSession()) { + Session.Command cmd = session.exec(command); + cmd.join(COMMAND_TIMEOUT_SECONDS, TimeUnit.SECONDS); + + if (cmd.getExitStatus() == 0) { + String output = new String(cmd.getInputStream().readAllBytes()).trim(); + log.debug("命令执行成功: {} -> {}", command, output); + return output; + } else { + String error = new String(cmd.getErrorStream().readAllBytes()).trim(); + log.warn("命令执行失败: {} -> {}", command, error); + throw new BusinessException(ResponseCode.ERROR, + new Object[]{"命令执行失败: " + error}); + } + } catch (IOException e) { + log.error("执行SSH命令异常: {}", command, e); + throw new BusinessException(ResponseCode.ERROR, + new Object[]{"执行SSH命令异常: " + e.getMessage()}); + } + } + + @Override + public void closeConnection(SSHClient sshClient) { + if (sshClient != null) { + try { + if (sshClient.isConnected()) { + sshClient.disconnect(); + log.debug("SSH连接已关闭"); + } + } catch (IOException e) { + log.error("关闭SSH连接失败", e); + } + } + } + + /** + * 安全执行命令(子类使用) + * 出错时返回null而不是抛异常 + */ + protected String safeExecute(SSHClient sshClient, String command) { + try { + return executeCommand(sshClient, command); + } catch (Exception e) { + log.warn("执行命令失败: {}", command, e); + return null; + } + } + + /** + * 解析整数结果 + */ + protected Integer parseInteger(String value) { + if (value == null || value.isEmpty()) { + return null; + } + try { + return Integer.parseInt(value.trim()); + } catch (NumberFormatException e) { + log.warn("解析整数失败: {}", value); + return null; + } + } +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/ISSHCommandService.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/ISSHCommandService.java new file mode 100644 index 00000000..791e8ba0 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/ISSHCommandService.java @@ -0,0 +1,90 @@ +package com.qqchen.deploy.backend.framework.ssh; + +import com.qqchen.deploy.backend.framework.enums.OsTypeEnum; +import net.schmizz.sshj.SSHClient; + +/** + * SSH命令服务接口(Framework层) + * 封装了SSH连接和OS命令执行能力 + */ +public interface ISSHCommandService { + + /** + * 创建SSH连接 + * + * @param host IP地址 + * @param port SSH端口 + * @param username 用户名 + * @param password 密码(密码认证) + * @param privateKey 私钥(密钥认证) + * @param passphrase 私钥密码 + * @return SSH客户端 + * @throws Exception 连接失败时抛出 + */ + SSHClient createConnection(String host, Integer port, String username, + String password, String privateKey, String passphrase) throws Exception; + + /** + * 执行命令并返回结果 + * + * @param sshClient SSH客户端 + * @param command 要执行的命令 + * @return 命令输出结果 + * @throws Exception 执行失败时抛出 + */ + String executeCommand(SSHClient sshClient, String command) throws Exception; + + /** + * 关闭SSH连接 + * + * @param sshClient SSH客户端 + */ + void closeConnection(SSHClient sshClient); + + /** + * 获取主机名 + * + * @param sshClient SSH客户端 + * @return 主机名 + */ + String getHostname(SSHClient sshClient); + + /** + * 获取操作系统版本 + * + * @param sshClient SSH客户端 + * @return 操作系统版本 + */ + String getOsVersion(SSHClient sshClient); + + /** + * 获取CPU核心数 + * + * @param sshClient SSH客户端 + * @return CPU核心数 + */ + Integer getCpuCores(SSHClient sshClient); + + /** + * 获取内存大小(GB) + * + * @param sshClient SSH客户端 + * @return 内存大小 + */ + Integer getMemorySize(SSHClient sshClient); + + /** + * 获取磁盘大小(GB) + * + * @param sshClient SSH客户端 + * @return 磁盘大小 + */ + Integer getDiskSize(SSHClient sshClient); + + /** + * 获取支持的操作系统类型 + * + * @return OS类型枚举 + */ + OsTypeEnum getSupportedOsType(); +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/SSHCommandServiceFactory.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/SSHCommandServiceFactory.java new file mode 100644 index 00000000..ab6afa0d --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/SSHCommandServiceFactory.java @@ -0,0 +1,78 @@ +package com.qqchen.deploy.backend.framework.ssh; + +import com.qqchen.deploy.backend.framework.enums.OsTypeEnum; +import com.qqchen.deploy.backend.framework.enums.ResponseCode; +import com.qqchen.deploy.backend.framework.exception.BusinessException; +import jakarta.annotation.PostConstruct; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * SSH命令服务工厂(Framework层) + * 通过反射自动注册所有SSH命令服务实现 + */ +@Slf4j +@Component +public class SSHCommandServiceFactory { + + @Autowired(required = false) + private List sshCommandServices; + + private final Map serviceMap = new HashMap<>(); + + @PostConstruct + public void init() { + if (sshCommandServices == null || sshCommandServices.isEmpty()) { + log.warn("未发现任何SSH命令服务实现"); + return; + } + + for (ISSHCommandService service : sshCommandServices) { + OsTypeEnum osType = service.getSupportedOsType(); + serviceMap.put(osType, service); + log.info("注册SSH命令服务: {} -> {}", osType, service.getClass().getSimpleName()); + } + + log.info("SSH命令服务工厂初始化完成,共注册{}个服务", serviceMap.size()); + } + + /** + * 根据操作系统类型获取对应的SSH命令服务 + * + * @param osType 操作系统类型 + * @return SSH命令服务实现 + * @throws BusinessException 不支持的OS类型时抛出 + */ + public ISSHCommandService getService(OsTypeEnum osType) { + ISSHCommandService service = serviceMap.get(osType); + if (service == null) { + throw new BusinessException(ResponseCode.INVALID_PARAM, + new Object[]{"不支持的操作系统类型: " + osType}); + } + return service; + } + + /** + * 判断是否支持某个操作系统 + * + * @param osType 操作系统类型 + * @return 是否支持 + */ + public boolean isSupported(OsTypeEnum osType) { + return serviceMap.containsKey(osType); + } + + /** + * 获取所有支持的操作系统类型 + * + * @return 支持的OS类型集合 + */ + public java.util.Set getSupportedOsTypes() { + return serviceMap.keySet(); + } +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/impl/LinuxSSHCommandServiceImpl.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/impl/LinuxSSHCommandServiceImpl.java new file mode 100644 index 00000000..22799726 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/impl/LinuxSSHCommandServiceImpl.java @@ -0,0 +1,54 @@ +package com.qqchen.deploy.backend.framework.ssh.impl; + +import com.qqchen.deploy.backend.framework.enums.OsTypeEnum; +import com.qqchen.deploy.backend.framework.ssh.AbstractSSHCommandService; +import lombok.extern.slf4j.Slf4j; +import net.schmizz.sshj.SSHClient; +import org.springframework.stereotype.Service; + +/** + * Linux SSH命令服务实现(Framework层) + */ +@Slf4j +@Service +public class LinuxSSHCommandServiceImpl extends AbstractSSHCommandService { + + @Override + public String getHostname(SSHClient sshClient) { + return safeExecute(sshClient, "hostname"); + } + + @Override + public String getOsVersion(SSHClient sshClient) { + // 优先使用 /etc/os-release,fallback 到 lsb_release + String command = "cat /etc/os-release 2>/dev/null | grep PRETTY_NAME | cut -d'\"' -f2 || " + + "lsb_release -d 2>/dev/null | cut -f2 || " + + "cat /etc/issue | head -1"; + return safeExecute(sshClient, command); + } + + @Override + public Integer getCpuCores(SSHClient sshClient) { + String result = safeExecute(sshClient, "nproc"); + return parseInteger(result); + } + + @Override + public Integer getMemorySize(SSHClient sshClient) { + // 使用 free -g 获取GB单位的内存 + String result = safeExecute(sshClient, "free -g | grep Mem | awk '{print $2}'"); + return parseInteger(result); + } + + @Override + public Integer getDiskSize(SSHClient sshClient) { + // 获取根分区的磁盘大小(GB) + String result = safeExecute(sshClient, "df -BG / | tail -1 | awk '{print $2}' | sed 's/G//'"); + return parseInteger(result); + } + + @Override + public OsTypeEnum getSupportedOsType() { + return OsTypeEnum.LINUX; + } +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/impl/MacOSSSHCommandServiceImpl.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/impl/MacOSSSHCommandServiceImpl.java new file mode 100644 index 00000000..441d9b75 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/impl/MacOSSSHCommandServiceImpl.java @@ -0,0 +1,55 @@ +package com.qqchen.deploy.backend.framework.ssh.impl; + +import com.qqchen.deploy.backend.framework.enums.OsTypeEnum; +import com.qqchen.deploy.backend.framework.ssh.AbstractSSHCommandService; +import lombok.extern.slf4j.Slf4j; +import net.schmizz.sshj.SSHClient; +import org.springframework.stereotype.Service; + +/** + * MacOS SSH命令服务实现(Framework层) + */ +@Slf4j +@Service +public class MacOSSSHCommandServiceImpl extends AbstractSSHCommandService { + + @Override + public String getHostname(SSHClient sshClient) { + return safeExecute(sshClient, "hostname"); + } + + @Override + public String getOsVersion(SSHClient sshClient) { + // 获取MacOS版本信息 + String command = "sw_vers -productName && sw_vers -productVersion"; + return safeExecute(sshClient, command); + } + + @Override + public Integer getCpuCores(SSHClient sshClient) { + // 使用sysctl获取CPU核心数 + String result = safeExecute(sshClient, "sysctl -n hw.ncpu"); + return parseInteger(result); + } + + @Override + public Integer getMemorySize(SSHClient sshClient) { + // 获取内存大小(GB) + String command = "sysctl -n hw.memsize | awk '{print int($1/1024/1024/1024)}'"; + String result = safeExecute(sshClient, command); + return parseInteger(result); + } + + @Override + public Integer getDiskSize(SSHClient sshClient) { + // 获取根分区磁盘大小(GB) + String command = "df -g / | tail -1 | awk '{print $2}'"; + String result = safeExecute(sshClient, command); + return parseInteger(result); + } + + @Override + public OsTypeEnum getSupportedOsType() { + return OsTypeEnum.MACOS; + } +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/impl/WindowsSSHCommandServiceImpl.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/impl/WindowsSSHCommandServiceImpl.java new file mode 100644 index 00000000..f5e28cea --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/impl/WindowsSSHCommandServiceImpl.java @@ -0,0 +1,57 @@ +package com.qqchen.deploy.backend.framework.ssh.impl; + +import com.qqchen.deploy.backend.framework.enums.OsTypeEnum; +import com.qqchen.deploy.backend.framework.ssh.AbstractSSHCommandService; +import lombok.extern.slf4j.Slf4j; +import net.schmizz.sshj.SSHClient; +import org.springframework.stereotype.Service; + +/** + * Windows SSH命令服务实现(Framework层) + * 需要Windows服务器安装OpenSSH Server + */ +@Slf4j +@Service +public class WindowsSSHCommandServiceImpl extends AbstractSSHCommandService { + + @Override + public String getHostname(SSHClient sshClient) { + return safeExecute(sshClient, "hostname"); + } + + @Override + public String getOsVersion(SSHClient sshClient) { + // Windows版本信息 + return safeExecute(sshClient, "ver"); + } + + @Override + public Integer getCpuCores(SSHClient sshClient) { + // 使用环境变量获取CPU核心数 + String result = safeExecute(sshClient, "echo %NUMBER_OF_PROCESSORS%"); + return parseInteger(result); + } + + @Override + public Integer getMemorySize(SSHClient sshClient) { + // 使用PowerShell获取内存大小(GB) + String command = "powershell \"(Get-CimInstance Win32_PhysicalMemory | " + + "Measure-Object -Property capacity -Sum).sum /1gb\""; + String result = safeExecute(sshClient, command); + return parseInteger(result); + } + + @Override + public Integer getDiskSize(SSHClient sshClient) { + // 使用PowerShell获取C盘大小(GB) + String command = "powershell \"(Get-PSDrive C | " + + "Select-Object @{N='Size';E={[math]::Round($_.Free/1GB + $_.Used/1GB,0)}}).Size\""; + String result = safeExecute(sshClient, command); + return parseInteger(result); + } + + @Override + public OsTypeEnum getSupportedOsType() { + return OsTypeEnum.WINDOWS; + } +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/websocket/AbstractSSHWebSocketHandler.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/websocket/AbstractSSHWebSocketHandler.java new file mode 100644 index 00000000..baa002d0 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/websocket/AbstractSSHWebSocketHandler.java @@ -0,0 +1,739 @@ +package com.qqchen.deploy.backend.framework.ssh.websocket; + +import com.qqchen.deploy.backend.framework.enums.AuthTypeEnum; +import com.qqchen.deploy.backend.framework.enums.ResponseCode; +import com.qqchen.deploy.backend.framework.enums.SSHDisconnectReason; +import com.qqchen.deploy.backend.framework.enums.SSHEvent; +import com.qqchen.deploy.backend.framework.enums.SSHMessageType; +import com.qqchen.deploy.backend.framework.enums.SSHStatusEnum; +import com.qqchen.deploy.backend.framework.enums.SSHTargetType; +import com.qqchen.deploy.backend.framework.exception.BusinessException; +import com.qqchen.deploy.backend.framework.ssh.ISSHCommandService; +import com.qqchen.deploy.backend.framework.ssh.SSHCommandServiceFactory; +import com.qqchen.deploy.backend.framework.utils.JsonUtils; +import com.qqchen.deploy.backend.framework.utils.SessionIdGenerator; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import net.schmizz.sshj.SSHClient; +import net.schmizz.sshj.connection.channel.direct.Session; +import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.TextWebSocketHandler; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; + +/** + * SSH WebSocket 抽象处理器(Framework层) + * 提供通用的SSH WebSocket能力 + * + * 子类需要实现5个方法: + * 1. getSSHTarget(session) - 获取连接目标信息 + * 2. checkPermission(userId, target) - 权限验证 + * 3. getMaxSessions() - 最大并发连接数 + * 4. getPathPattern() - WebSocket路径模式 + * 5. onEvent(event, data) - 事件钩子(可选) + * + * @author Framework + */ +@Slf4j +public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler { + + // ========== 会话存储 ========== + + protected final Map webSocketSessions = new ConcurrentHashMap<>(); + protected final Map sshClients = new ConcurrentHashMap<>(); + protected final Map sshShells = new ConcurrentHashMap<>(); + protected final Map> outputTasks = new ConcurrentHashMap<>(); + + // ========== 依赖注入 ========== + + /** + * SSH命令服务工厂(由子类注入) + */ + private final SSHCommandServiceFactory sshCommandServiceFactory; + + /** + * SSH输出监听线程池(Framework自动注入) + */ + @Resource(name = "sshOutputExecutor") + private AsyncTaskExecutor asyncTaskExecutor; + + /** + * SSH会话管理器(由子类注入) + */ + private final SSHSessionManager sessionManager; + + /** + * 会话目标信息存储:sessionId → SSHTarget + * 用于在事件中访问连接目标信息 + */ + private final Map sessionTargets = new ConcurrentHashMap<>(); + + /** + * 构造函数(子类必须调用) + * + * @param sshCommandServiceFactory SSH命令服务工厂 + * @param sessionManager SSH会话管理器 + */ + protected AbstractSSHWebSocketHandler( + SSHCommandServiceFactory sshCommandServiceFactory, + SSHSessionManager sessionManager) { + this.sshCommandServiceFactory = sshCommandServiceFactory; + this.sessionManager = sessionManager; + } + + // ========== 子类必须实现的抽象方法 ========== + + /** + * 获取SSH连接目标信息(由子类实现) + * + * @param session WebSocket会话 + * @return SSH目标信息 + * @throws Exception 获取失败时抛出 + */ + protected abstract SSHTarget getSSHTarget(WebSocketSession session) throws Exception; + + /** + * 检查用户权限(由子类实现) + * + * @param userId 用户ID + * @param target SSH目标 + * @return 是否有权限 + */ + protected abstract boolean checkPermission(Long userId, SSHTarget target); + + /** + * 获取最大并发连接数(由子类实现) + * + * @return 最大连接数 + */ + protected abstract int getMaxSessions(); + + /** + * 获取WebSocket路径模式(由子类实现) + * + * @return 路径模式,如 "/api/v1/server-ssh/connect/*" + */ + protected abstract String getPathPattern(); + + /** + * 事件钩子(可选,子类可以重写) + * 用于审计日志、统计等业务逻辑 + * + * @param event 事件类型 + * @param eventData 事件数据(强类型) + */ + protected void onEvent(SSHEvent event, SSHEventData eventData) { + // 默认空实现 + } + + /** + * 从WebSocketSession中获取SSH SessionId + * + * @param session WebSocketSession + * @return SSH SessionId + */ + private String getSessionId(WebSocketSession session) { + String sessionId = (String) session.getAttributes().get("sshSessionId"); + if (sessionId == null) { + // 兼容处理:如果没有sshSessionId,使用WebSocket原始ID + sessionId = session.getId(); + } + return sessionId; + } + + // ========== Framework 提供的核心能力 ========== + + @Override + public void afterConnectionEstablished(WebSocketSession session) throws Exception { + // ⚠️ 关键:生成线程安全的唯一SessionId(避免并发冲突) + // 使用增强型SessionId:WebSocket原始ID + 时间戳 + 序列号 + String sessionId = SessionIdGenerator.enhanceWebSocketSessionId(session.getId()); + + // 将生成的sessionId保存到WebSocketSession attributes中 + session.getAttributes().put("sshSessionId", sessionId); + + log.info("WebSocket连接建立: webSocketId={}, sshSessionId={}", session.getId(), sessionId); + + try { + // 1. 获取用户信息 + Long userId = (Long) session.getAttributes().get("userId"); + String username = (String) session.getAttributes().get("username"); + String clientIp = (String) session.getAttributes().get("clientIp"); + String userAgent = (String) session.getAttributes().get("userAgent"); + + if (userId == null) { + log.error("无法获取用户信息: sessionId={}", sessionId); + sendError(session, "认证失败"); + session.close(CloseStatus.POLICY_VIOLATION); + return; + } + + // 2. 获取连接目标 + SSHTarget target = getSSHTarget(session); + validateSSHTarget(target); + + // 保存target信息,供后续事件使用 + sessionTargets.put(sessionId, target); + + log.info("获取SSH目标成功: targetType={}, target={}", target.getTargetType(), target.getMetadata()); + + // 3. 触发连接前事件 + SSHEventData eventData = SSHEventData.builder() + .sessionId(sessionId) + .userId(userId) + .username(username) + .target(target) + .clientIp(clientIp) + .userAgent(userAgent) + .build(); + onEvent(SSHEvent.BEFORE_CONNECT, eventData); + + // 4. 权限验证 + if (!checkPermission(userId, target)) { + log.warn("用户无权访问目标: userId={}, target={}:{}", + userId, target.getTargetType(), target.getMetadata()); + sendError(session, "无权访问此目标"); + session.close(CloseStatus.POLICY_VIOLATION); + return; + } + + // 5. 提前注册会话(解决并发竞态问题) + sessionManager.registerSession(sessionId, userId, target.getTargetType(), target.getMetadata()); + boolean sessionRegistered = true; + + try { + // 6. 检查并发连接数(注册后立即检查) + long activeCount = sessionManager.countActiveSessions(userId, target.getTargetType(), target.getMetadata()); + if (activeCount > getMaxSessions()) { + log.warn("用户连接数超过限制: userId={}, target={}:{}, current={}, max={}", + userId, target.getTargetType(), target.getMetadata(), activeCount, getMaxSessions()); + sendError(session, "连接数超过限制(最多" + getMaxSessions() + "个)"); + sessionManager.removeSession(sessionId); + sessionRegistered = false; + session.close(CloseStatus.POLICY_VIOLATION); + return; + } + + // 7. 发送连接中状态 + sendStatus(session, SSHStatusEnum.CONNECTING); + + // 8. 建立SSH连接 + SSHClient sshClient = createSSHConnection(target); + sshClients.put(sessionId, sshClient); + + // 9. 打开Shell通道并分配PTY(伪终端) + Session sshSession = sshClient.startSession(); + + // ⚠️ 关键:分配PTY,启用交互式Shell、回显、提示符 + // 参数:终端类型, 列数, 行数, 宽度(像素), 高度(像素), 终端模式 + sshSession.allocatePTY("xterm", 80, 24, 0, 0, java.util.Collections.emptyMap()); + + log.debug("PTY已分配: sessionId={}, termType=xterm, cols=80, rows=24", sessionId); + + Session.Shell shell = sshSession.startShell(); + log.debug("Shell已启动: sessionId={}", sessionId); + + // 保存会话信息 + webSocketSessions.put(sessionId, session); + sshShells.put(sessionId, shell); + + // 10. ⚠️ 优化:先启动输出监听线程,确保不错过任何SSH输出 + Future stdoutTask = asyncTaskExecutor.submit(() -> readSSHOutput(session, shell)); + outputTasks.put(sessionId, stdoutTask); + + // 同时启动错误流监听(某些SSH服务器会将输出发送到错误流) + Future stderrTask = asyncTaskExecutor.submit(() -> readSSHError(session, shell)); + outputTasks.put(sessionId + "_stderr", stderrTask); + + // 11. 发送连接成功状态 + sendStatus(session, SSHStatusEnum.CONNECTED); + log.info("SSH连接成功: sessionId={}, userId={}, target={}:{}", + sessionId, userId, target.getTargetType(), target.getMetadata()); + + // 12. ⚠️ 异步创建审计日志,不阻塞主线程 + // 使用CompletableFuture异步执行,避免数据库操作延迟影响SSH输出接收 + java.util.concurrent.CompletableFuture.runAsync(() -> { + try { + onEvent(SSHEvent.AFTER_CONNECT, eventData); + } catch (Exception ex) { + log.error("AFTER_CONNECT事件处理失败: sessionId={}", sessionId, ex); + } + }); + + } catch (Exception ex) { + // 如果会话已注册,需要清理 + if (sessionRegistered) { + sessionManager.removeSession(sessionId); + } + throw ex; // 重新抛出,由外层catch处理 + } + + } catch (Exception e) { + log.error("建立SSH连接失败: sessionId={}", sessionId, e); + sendError(session, "连接失败: " + e.getMessage()); + + // 构建失败事件数据 + SSHEventData failureEventData = SSHEventData.builder() + .sessionId(sessionId) + .errorMessage(e.getMessage()) + .exception(e) + .build(); + cleanupSession(sessionId, failureEventData); + + try { + session.close(CloseStatus.SERVER_ERROR); + } catch (IOException ex) { + log.error("关闭WebSocket会话失败: sessionId={}", sessionId, ex); + } + } + } + + @Override + protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { + String sessionId = getSessionId(session); + + try { + SSHWebSocketMessage msg = JsonUtils.fromJson(message.getPayload(), SSHWebSocketMessage.class); + + if (msg.getType() == SSHMessageType.INPUT) { + // 用户输入命令 + Session.Shell shell = sshShells.get(sessionId); + if (shell != null) { + OutputStream outputStream = shell.getOutputStream(); + outputStream.write(msg.getData().getBytes(StandardCharsets.UTF_8)); + outputStream.flush(); + + // 触发命令事件 + SSHTarget target = sessionTargets.get(sessionId); + SSHEventData eventData = SSHEventData.builder() + .sessionId(sessionId) + .command(msg.getData()) + .target(target) + .build(); + onEvent(SSHEvent.ON_COMMAND, eventData); + } + } + } catch (Exception e) { + log.error("处理WebSocket消息失败: sessionId={}", sessionId, e); + } + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { + String sessionId = getSessionId(session); + log.info("WebSocket连接关闭: sessionId={}, status={}", sessionId, status); + + SSHTarget target = sessionTargets.get(sessionId); + SSHEventData eventData = SSHEventData.builder() + .sessionId(sessionId) + .target(target) + .closeStatus(status) + .build(); + + cleanupSession(sessionId, eventData); + } + + @Override + public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { + String sessionId = getSessionId(session); + log.error("WebSocket传输错误: sessionId={}", sessionId, exception); + + SSHTarget target = sessionTargets.get(sessionId); + SSHEventData eventData = SSHEventData.builder() + .sessionId(sessionId) + .target(target) + .errorMessage(exception.getMessage()) + .exception(exception) + .build(); + onEvent(SSHEvent.ON_ERROR, eventData); + + sendError(session, "传输错误: " + exception.getMessage()); + cleanupSession(sessionId, eventData); + + try { + session.close(CloseStatus.SERVER_ERROR); + } catch (IOException e) { + log.error("关闭WebSocket会话失败: sessionId={}", sessionId, e); + } + } + + // ========== 私有方法 ========== + + /** + * 创建SSH连接 + */ + private SSHClient createSSHConnection(SSHTarget target) throws Exception { + ISSHCommandService sshService = sshCommandServiceFactory.getService(target.getOsType()); + + String password = null, privateKey = null, passphrase = null; + + switch (target.getAuthType()) { + case PASSWORD: + if (target.getPassword() == null || target.getPassword().isEmpty()) { + throw new BusinessException(ResponseCode.INVALID_PARAM, + new Object[]{"SSH密码不能为空"}); + } + password = target.getPassword(); + break; + + case KEY: + if (target.getPrivateKey() == null || target.getPrivateKey().isEmpty()) { + throw new BusinessException(ResponseCode.INVALID_PARAM, + new Object[]{"SSH私钥不能为空"}); + } + privateKey = target.getPrivateKey(); + passphrase = target.getPassphrase(); + break; + + default: + throw new BusinessException(ResponseCode.INVALID_PARAM, + new Object[]{"不支持的认证类型: " + target.getAuthType()}); + } + + return sshService.createConnection(target.getHost(), target.getPort(), target.getUsername(), + password, privateKey, passphrase); + } + + /** + * 验证SSH目标参数 + */ + private void validateSSHTarget(SSHTarget target) { + if (target == null) { + throw new BusinessException(ResponseCode.INVALID_PARAM, new Object[]{"SSH目标不能为空"}); + } + if (target.getTargetType() == null) { + throw new BusinessException(ResponseCode.INVALID_PARAM, new Object[]{"目标类型不能为空"}); + } + if (target.getHost() == null || target.getHost().isEmpty()) { + throw new BusinessException(ResponseCode.INVALID_PARAM, new Object[]{"主机地址不能为空"}); + } + if (target.getUsername() == null || target.getUsername().isEmpty()) { + throw new BusinessException(ResponseCode.INVALID_PARAM, new Object[]{"用户名不能为空"}); + } + if (target.getAuthType() == null) { + throw new BusinessException(ResponseCode.INVALID_PARAM, new Object[]{"认证方式不能为空"}); + } + if (target.getOsType() == null) { + throw new BusinessException(ResponseCode.INVALID_PARAM, new Object[]{"操作系统类型不能为空"}); + } + } + + /** + * 读取SSH输出并发送到前端 + */ + private void readSSHOutput(WebSocketSession session, Session.Shell shell) { + String sessionId = session.getId(); + log.debug("开始监听SSH输出: sessionId={}", sessionId); + + try { + InputStream inputStream = shell.getInputStream(); + byte[] buffer = new byte[1024]; + int len; + + log.debug("SSH输出流已获取,开始循环读取: sessionId={}", sessionId); + + while (session.isOpen() && (len = inputStream.read(buffer)) > 0) { + String output = new String(buffer, 0, len, StandardCharsets.UTF_8); + log.debug("读取到SSH输出: sessionId={}, length={}, content={}", + sessionId, len, output.replaceAll("\\r", "\\\\r").replaceAll("\\n", "\\\\n")); + sendOutput(session, output); + log.debug("SSH输出已发送到前端: sessionId={}", sessionId); + } + + log.debug("SSH输出监听结束: sessionId={}, session.isOpen={}", sessionId, session.isOpen()); + + } catch (java.io.InterruptedIOException e) { + // 线程被中断(正常的清理过程),检查是否是WebSocket关闭导致的 + if (!session.isOpen()) { + log.debug("SSH输出监听线程被正常中断(WebSocket已关闭): sessionId={}", sessionId); + } else { + log.error("SSH输出监听线程被异常中断: sessionId={}", sessionId, e); + // 只在session仍然打开时尝试发送错误消息 + try { + sendError(session, "SSH连接被中断"); + } catch (Exception ex) { + log.debug("发送错误消息失败(session可能已关闭): sessionId={}", sessionId); + } + } + } catch (IOException e) { + // 其他IO异常(真正的错误) + if (session.isOpen()) { + log.error("读取SSH输出失败: sessionId={}", sessionId, e); + try { + sendError(session, "读取SSH输出失败: " + e.getMessage()); + } catch (Exception ex) { + log.debug("发送错误消息失败(session可能已关闭): sessionId={}", sessionId); + } + } else { + log.debug("读取SSH输出时发生IO异常,但session已关闭(正常): sessionId={}", sessionId); + } + } + } + + /** + * 读取SSH错误流并发送到前端 + * 某些SSH服务器可能将输出发送到标准错误流 + */ + private void readSSHError(WebSocketSession session, Session.Shell shell) { + String sessionId = session.getId(); + log.debug("开始监听SSH错误流: sessionId={}", sessionId); + + try { + InputStream errorStream = shell.getErrorStream(); + byte[] buffer = new byte[1024]; + int len; + + log.debug("SSH错误流已获取,开始循环读取: sessionId={}", sessionId); + + while (session.isOpen() && (len = errorStream.read(buffer)) > 0) { + String output = new String(buffer, 0, len, StandardCharsets.UTF_8); + log.debug("读取到SSH错误流输出: sessionId={}, length={}, content={}", + sessionId, len, output.replaceAll("\\r", "\\\\r").replaceAll("\\n", "\\\\n")); + sendOutput(session, output); // 错误流也作为output发送到前端 + log.debug("SSH错误流输出已发送到前端: sessionId={}", sessionId); + } + + log.debug("SSH错误流监听结束: sessionId={}", sessionId); + + } catch (java.io.InterruptedIOException e) { + if (!session.isOpen()) { + log.debug("SSH错误流监听线程被正常中断(WebSocket已关闭): sessionId={}", sessionId); + } else { + log.error("SSH错误流监听线程被异常中断: sessionId={}", sessionId, e); + } + } catch (IOException e) { + if (session.isOpen()) { + log.error("读取SSH错误流失败: sessionId={}", sessionId, e); + } else { + log.debug("读取SSH错误流时发生IO异常,但session已关闭(正常): sessionId={}", sessionId); + } + } + } + + /** + * 清理单个会话(供子类调用,如gracefulShutdown) + * + * @param sessionId 会话ID + */ + protected void cleanupSingleSession(String sessionId) { + SSHTarget target = sessionTargets.get(sessionId); + SSHEventData eventData = SSHEventData.builder() + .sessionId(sessionId) + .target(target) + .build(); + cleanupSession(sessionId, eventData); + } + + /** + * 清理会话资源 + * + * @param sessionId 会话ID + * @param eventData 事件数据 + */ + private void cleanupSession(String sessionId, SSHEventData eventData) { + log.debug("清理会话资源: sessionId={}", sessionId); + + try { + // 1. 触发断开前事件(异步,不阻塞清理) + asyncTaskExecutor.submit(() -> { + try { + onEvent(SSHEvent.BEFORE_DISCONNECT, eventData); + } catch (Exception e) { + log.error("BEFORE_DISCONNECT事件处理失败: sessionId={}", sessionId, e); + } + }); + + // 2. 移除WebSocketSession + webSocketSessions.remove(sessionId); + + // 3. 取消输出监听任务 + Future stdoutTask = outputTasks.remove(sessionId); + if (stdoutTask != null && !stdoutTask.isDone()) { + stdoutTask.cancel(true); + } + + Future stderrTask = outputTasks.remove(sessionId + "_stderr"); + if (stderrTask != null && !stderrTask.isDone()) { + stderrTask.cancel(true); + } + + // 4. 关闭SSH Shell + Session.Shell shell = sshShells.remove(sessionId); + if (shell != null) { + try { + shell.close(); + } catch (IOException e) { + log.error("关闭SSH Shell失败: sessionId={}", sessionId, e); + } + } + + // 5. 关闭SSH客户端 + SSHClient sshClient = sshClients.remove(sessionId); + if (sshClient != null) { + try { + if (sshClient.isConnected()) { + sshClient.disconnect(); + } + } catch (IOException e) { + log.error("关闭SSH客户端失败: sessionId={}", sessionId, e); + } + } + + // 6. 从会话管理器移除(最重要,必须执行) + sessionManager.removeSession(sessionId); + + // 7. 移除target信息 + sessionTargets.remove(sessionId); + + log.info("会话资源清理完成: sessionId={}", sessionId); + + } finally { + // 8. 触发断开后事件(异步,即使清理失败也要触发) + asyncTaskExecutor.submit(() -> { + try { + onEvent(SSHEvent.AFTER_DISCONNECT, eventData); + } catch (Exception e) { + log.error("AFTER_DISCONNECT事件处理失败: sessionId={}", sessionId, e); + } + }); + } + } + + /** + * 发送输出消息到前端 + */ + protected void sendOutput(WebSocketSession session, String output) { + try { + SSHWebSocketMessage msg = SSHWebSocketMessage.output(output); + session.sendMessage(new TextMessage(JsonUtils.toJson(msg))); + } catch (IOException e) { + log.error("发送输出消息失败: sessionId={}", session.getId(), e); + } + } + + /** + * 发送状态消息到前端 + */ + protected void sendStatus(WebSocketSession session, SSHStatusEnum status) { + try { + SSHWebSocketMessage msg = SSHWebSocketMessage.status(status); + session.sendMessage(new TextMessage(JsonUtils.toJson(msg))); + } catch (IOException e) { + log.error("发送状态消息失败: sessionId={}", session.getId(), e); + } + } + + /** + * 发送错误消息到前端 + */ + protected void sendError(WebSocketSession session, String error) { + try { + SSHWebSocketMessage msg = SSHWebSocketMessage.error(error); + session.sendMessage(new TextMessage(JsonUtils.toJson(msg))); + } catch (IOException e) { + log.error("发送错误消息失败: sessionId={}", session.getId(), e); + } + } + + /** + * 优雅下线:应用关闭时清理所有活跃的SSH会话(Framework统一实现) + * 使用 @PreDestroy 注解,确保在Spring容器销毁前执行 + */ + @jakarta.annotation.PreDestroy + public void gracefulShutdown() { + log.warn("====== SSH Handler准备关闭,开始优雅下线所有SSH会话 ======"); + log.warn("当前活跃SSH会话数: {}", webSocketSessions.size()); + + if (webSocketSessions.isEmpty()) { + log.info("没有活跃的SSH会话,跳过优雅下线"); + return; + } + + // 记录开始时间 + long startTime = System.currentTimeMillis(); + int successCount = 0; + int failureCount = 0; + + // 遍历所有活跃会话 + for (Map.Entry entry : webSocketSessions.entrySet()) { + String sessionId = entry.getKey(); + WebSocketSession session = entry.getValue(); + + try { + log.info("关闭SSH会话: sessionId={}", sessionId); + + // 1. 尝试向前端发送服务器下线通知 + try { + if (session.isOpen()) { + sendError(session, "服务器正在重启,连接即将关闭"); + // 给前端一点时间接收消息 + Thread.sleep(100); + } + } catch (Exception e) { + log.debug("发送下线通知失败: sessionId={}", sessionId, e); + } + + // 2. 触发BEFORE_SHUTDOWN事件(异步,但等待完成) + SSHTarget target = sessionTargets.get(sessionId); + Future shutdownEventTask = asyncTaskExecutor.submit(() -> { + try { + SSHEventData eventData = SSHEventData.builder() + .sessionId(sessionId) + .target(target) + .disconnectReason(SSHDisconnectReason.SERVER_SHUTDOWN.getCode()) + .disconnectReasonDesc(SSHDisconnectReason.SERVER_SHUTDOWN.getDescription()) + .build(); + onEvent(SSHEvent.BEFORE_SHUTDOWN, eventData); + } catch (Exception e) { + log.error("BEFORE_SHUTDOWN事件处理失败: sessionId={}", sessionId, e); + } + }); + + // 等待审计日志写入完成(最多1秒) + try { + shutdownEventTask.get(1, java.util.concurrent.TimeUnit.SECONDS); + } catch (Exception e) { + log.warn("等待BEFORE_SHUTDOWN事件超时: sessionId={}", sessionId); + } + + // 3. 清理SSH资源 + try { + cleanupSingleSession(sessionId); + } catch (Exception e) { + log.error("清理SSH资源失败: sessionId={}", sessionId, e); + } + + // 4. 关闭WebSocket连接 + try { + if (session.isOpen()) { + session.close(new CloseStatus(1001, "服务器正在重启")); + } + } catch (Exception e) { + log.debug("关闭WebSocket失败: sessionId={}", sessionId, e); + } + + successCount++; + log.info("SSH会话关闭成功: sessionId={}", sessionId); + + } catch (Exception e) { + failureCount++; + log.error("关闭SSH会话失败: sessionId={}", sessionId, e); + } + } + + long duration = System.currentTimeMillis() - startTime; + log.warn("====== 优雅下线完成 ======"); + log.warn("总会话数: {}, 成功: {}, 失败: {}, 耗时: {}ms", + successCount + failureCount, successCount, failureCount, duration); + } +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/websocket/SSHEventData.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/websocket/SSHEventData.java new file mode 100644 index 00000000..bec13d63 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/websocket/SSHEventData.java @@ -0,0 +1,81 @@ +package com.qqchen.deploy.backend.framework.ssh.websocket; + +import lombok.Builder; +import lombok.Data; +import org.springframework.web.socket.CloseStatus; + +/** + * SSH事件数据(Framework层) + * + * 强类型的事件数据类,替代 Map + * 提供类型安全、IDE提示和编译时检查 + */ +@Data +@Builder +public class SSHEventData { + + /** + * 会话ID(必需) + */ + private String sessionId; + + /** + * 用户ID(连接时必需) + */ + private Long userId; + + /** + * 用户名(连接时必需) + */ + private String username; + + /** + * SSH连接目标(必需) + */ + private SSHTarget target; + + /** + * 用户输入的命令(ON_COMMAND事件) + */ + private String command; + + /** + * WebSocket关闭状态(AFTER_DISCONNECT事件) + */ + private CloseStatus closeStatus; + + /** + * 错误信息(ON_ERROR事件) + */ + private String errorMessage; + + /** + * 异常对象(ON_ERROR事件) + */ + private Throwable exception; + + /** + * 断开原因(BEFORE_SHUTDOWN事件) + */ + private String disconnectReason; + + /** + * 断开原因描述(BEFORE_SHUTDOWN事件) + */ + private String disconnectReasonDesc; + + /** + * 客户端IP(连接时可选) + */ + private String clientIp; + + /** + * 客户端User-Agent(连接时可选) + */ + private String userAgent; + + /** + * 扩展数据(特殊场景) + */ + private java.util.Map metadata; +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/websocket/SSHSessionManager.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/websocket/SSHSessionManager.java new file mode 100644 index 00000000..533d11cf --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/websocket/SSHSessionManager.java @@ -0,0 +1,105 @@ +package com.qqchen.deploy.backend.framework.ssh.websocket; + +import com.qqchen.deploy.backend.framework.enums.SSHTargetType; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * SSH会话管理器(Framework层) + * 管理所有活跃的SSH会话,提供并发控制能力 + */ +@Slf4j +@Component +public class SSHSessionManager { + + /** + * 会话信息存储:sessionId -> SSHSessionInfo + */ + private final Map sessions = new ConcurrentHashMap<>(); + + /** + * 注册会话 + * + * @param sessionId WebSocket会话ID + * @param userId 用户ID + * @param targetType 目标类型 + * @param targetId 目标ID + */ + public void registerSession(String sessionId, Long userId, SSHTargetType targetType, Object targetId) { + SSHSessionInfo info = new SSHSessionInfo(); + info.setSessionId(sessionId); + info.setUserId(userId); + info.setTargetType(targetType); + info.setTargetId(targetId); + info.setConnectTime(LocalDateTime.now()); + + sessions.put(sessionId, info); + log.debug("会话已注册: sessionId={}, userId={}, target={}:{}", + sessionId, userId, targetType, targetId); + } + + /** + * 移除会话 + * + * @param sessionId WebSocket会话ID + */ + public void removeSession(String sessionId) { + SSHSessionInfo removed = sessions.remove(sessionId); + if (removed != null) { + log.debug("会话已移除: sessionId={}, userId={}, target={}:{}", + sessionId, removed.getUserId(), removed.getTargetType(), removed.getTargetId()); + } + } + + /** + * 统计用户对指定目标的活跃会话数 + * + * @param userId 用户ID + * @param targetType 目标类型 + * @param targetId 目标ID + * @return 活跃会话数 + */ + public long countActiveSessions(Long userId, SSHTargetType targetType, Object targetId) { + return sessions.values().stream() + .filter(s -> s.getUserId().equals(userId) + && s.getTargetType() == targetType + && s.getTargetId().equals(targetId)) + .count(); + } + + /** + * 获取会话信息 + * + * @param sessionId WebSocket会话ID + * @return 会话信息 + */ + public SSHSessionInfo getSession(String sessionId) { + return sessions.get(sessionId); + } + + /** + * 获取所有活跃会话数 + * + * @return 会话总数 + */ + public int getTotalSessions() { + return sessions.size(); + } + + /** + * SSH会话信息 + */ + @Data + public static class SSHSessionInfo { + private String sessionId; + private Long userId; + private SSHTargetType targetType; + private Object targetId; + private LocalDateTime connectTime; + } +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/websocket/SSHTarget.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/websocket/SSHTarget.java new file mode 100644 index 00000000..7facdc67 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/websocket/SSHTarget.java @@ -0,0 +1,63 @@ +package com.qqchen.deploy.backend.framework.ssh.websocket; + +import com.qqchen.deploy.backend.framework.enums.AuthTypeEnum; +import com.qqchen.deploy.backend.framework.enums.OsTypeEnum; +import com.qqchen.deploy.backend.framework.enums.SSHTargetType; +import lombok.Data; + +/** + * SSH连接目标信息(Framework层) + */ +@Data +public class SSHTarget { + + /** + * 目标类型 + */ + private SSHTargetType targetType; + + /** + * 主机IP + */ + private String host; + + /** + * SSH端口 + */ + private Integer port; + + /** + * SSH用户名 + */ + private String username; + + /** + * 认证方式 + */ + private AuthTypeEnum authType; + + /** + * 密码(密码认证时使用) + */ + private String password; + + /** + * 私钥(密钥认证时使用) + */ + private String privateKey; + + /** + * 私钥密码短语(密钥认证时可选) + */ + private String passphrase; + + /** + * 操作系统类型(用于选择SSH服务) + */ + private OsTypeEnum osType; + + /** + * 业务自定义元数据(如serverId、podId等) + */ + private Object metadata; +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/websocket/SSHWebSocketConfig.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/websocket/SSHWebSocketConfig.java new file mode 100644 index 00000000..4c387e5f --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/websocket/SSHWebSocketConfig.java @@ -0,0 +1,46 @@ +package com.qqchen.deploy.backend.framework.ssh.websocket; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.core.task.SimpleAsyncTaskExecutor; + +/** + * SSH WebSocket框架配置(Framework层) + * + * 职责: + * 1. 提供SSH输出监听专用的虚拟线程池 + * 2. 框架内部使用,业务层无需关心 + * + * @author Framework + */ +@Configuration +public class SSHWebSocketConfig { + + /** + * SSH输出监听专用线程池(虚拟线程) + * + * ⚠️ 为什么使用虚拟线程? + * 1. SSH输出监听是典型的**阻塞I/O密集型**任务 + * 2. 每个SSH连接需要2个长期阻塞的线程(stdout + stderr) + * 3. 虚拟线程几乎无资源开销,支持数百万并发 + * 4. 完美适配大量SSH长连接场景 + * + * 📊 性能对比: + * - 平台线程:50个SSH连接 = 100个线程 ≈ 100-200MB内存 ❌ + * - 虚拟线程:50个SSH连接 = 100个虚拟线程 ≈ 几MB内存 ✅ + */ + @Bean("sshOutputExecutor") + public AsyncTaskExecutor sshOutputExecutor() { + SimpleAsyncTaskExecutor executor = + new SimpleAsyncTaskExecutor("ssh-virtual-"); + + // 启用虚拟线程(Java 21+) + executor.setVirtualThreads(true); + + // 并发限制:-1表示无限制(虚拟线程资源消耗极低) + executor.setConcurrencyLimit(-1); + + return executor; + } +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/websocket/SSHWebSocketMessage.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/websocket/SSHWebSocketMessage.java new file mode 100644 index 00000000..6aadae2b --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/websocket/SSHWebSocketMessage.java @@ -0,0 +1,97 @@ +package com.qqchen.deploy.backend.framework.ssh.websocket; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.qqchen.deploy.backend.framework.enums.SSHMessageType; +import com.qqchen.deploy.backend.framework.enums.SSHStatusEnum; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Map; + +/** + * SSH WebSocket消息(Framework层) + * + * 统一的消息格式规范: + * { + * "type": "output" | "input" | "status" | "error", + * "data": "消息内容(字符串)", + * "timestamp": 1733475005408, + * "metadata": { ... } // 可选 + * } + */ +@Data +@NoArgsConstructor +@JsonInclude(JsonInclude.Include.NON_NULL) +public class SSHWebSocketMessage { + + /** + * 消息类型 + */ + private SSHMessageType type; + + /** + * 消息内容(字符串) + * - type=OUTPUT: 终端输出内容 + * - type=INPUT: 用户输入内容 + * - type=STATUS: 状态值(connecting/connected/reconnecting/disconnected/error) + * - type=ERROR: 错误描述信息 + */ + private String data; + + /** + * 消息时间戳(Unix毫秒) + * 服务端自动生成 + */ + private Long timestamp; + + /** + * 可选元数据 + * 通常为null,特殊场景下使用 + */ + private Map metadata; + + /** + * 构造函数(自动填充时间戳) + */ + public SSHWebSocketMessage(SSHMessageType type, String data) { + this.type = type; + this.data = data; + this.timestamp = System.currentTimeMillis(); + } + + /** + * 创建OUTPUT消息 + * + * @param data 终端输出内容 + */ + public static SSHWebSocketMessage output(String data) { + return new SSHWebSocketMessage(SSHMessageType.OUTPUT, data); + } + + /** + * 创建STATUS消息 + * + * @param status 状态枚举 + */ + public static SSHWebSocketMessage status(SSHStatusEnum status) { + return new SSHWebSocketMessage(SSHMessageType.STATUS, status.name().toLowerCase()); + } + + /** + * 创建ERROR消息 + * + * @param message 错误描述 + */ + public static SSHWebSocketMessage error(String message) { + return new SSHWebSocketMessage(SSHMessageType.ERROR, message); + } + + /** + * 创建INPUT消息(前端发送,后端解析用) + * + * @param data 用户输入内容 + */ + public static SSHWebSocketMessage input(String data) { + return new SSHWebSocketMessage(SSHMessageType.INPUT, data); + } +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/utils/SessionIdGenerator.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/utils/SessionIdGenerator.java new file mode 100644 index 00000000..def63641 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/utils/SessionIdGenerator.java @@ -0,0 +1,126 @@ +package com.qqchen.deploy.backend.framework.utils; + +import java.util.UUID; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicLong; + +/** + * 会话ID生成器(Framework层) + * + * 线程安全的SessionId生成,避免并发场景下的ID冲突 + * 使用策略:UUID + 时间戳 + 递增序列号 + */ +public class SessionIdGenerator { + + /** + * 重入锁,保证并发安全 + */ + private static final Lock lock = new ReentrantLock(); + + /** + * 递增序列号(用于同一毫秒内的并发请求) + */ + private static final AtomicLong sequence = new AtomicLong(0); + + /** + * 上次生成ID的时间戳 + */ + private static volatile long lastTimestamp = -1L; + + /** + * 生成唯一的SessionId + * + * 格式:{UUID}-{timestamp}-{sequence} + * 例如:f47ac10b-58cc-4372-a567-0e02b2c3d479-1733475005408-001 + * + * @return 唯一的SessionId + */ + public static String generateSessionId() { + lock.lock(); + try { + long timestamp = System.currentTimeMillis(); + + // 如果时间戳相同,递增序列号 + if (timestamp == lastTimestamp) { + sequence.incrementAndGet(); + } else { + // 新的毫秒,重置序列号 + sequence.set(0); + lastTimestamp = timestamp; + } + + // 生成UUID(去掉连字符,缩短长度) + String uuid = UUID.randomUUID().toString().replace("-", ""); + + // 组合:UUID + 时间戳 + 序列号(3位补零) + return String.format("%s-%d-%03d", uuid, timestamp, sequence.get()); + + } finally { + lock.unlock(); + } + } + + /** + * 生成短格式SessionId(更紧凑) + * + * 格式:{timestamp}{sequence}{randomSuffix} + * 例如:1733475005408001A3F9 + * + * @return 短格式唯一SessionId + */ + public static String generateShortSessionId() { + lock.lock(); + try { + long timestamp = System.currentTimeMillis(); + + if (timestamp == lastTimestamp) { + sequence.incrementAndGet(); + } else { + sequence.set(0); + lastTimestamp = timestamp; + } + + // 生成4位随机后缀 + String randomSuffix = UUID.randomUUID().toString().substring(0, 4).toUpperCase(); + + // 组合:时间戳 + 序列号(3位) + 随机后缀 + return String.format("%d%03d%s", timestamp, sequence.get(), randomSuffix); + + } finally { + lock.unlock(); + } + } + + /** + * 从WebSocket Session中生成增强的SessionId + * + * 在原始WebSocket SessionId基础上添加时间戳和序列号,确保绝对唯一 + * + * @param webSocketSessionId WebSocket原始SessionId + * @return 增强的SessionId + */ + public static String enhanceWebSocketSessionId(String webSocketSessionId) { + if (webSocketSessionId == null || webSocketSessionId.isEmpty()) { + return generateSessionId(); + } + + lock.lock(); + try { + long timestamp = System.currentTimeMillis(); + + if (timestamp == lastTimestamp) { + sequence.incrementAndGet(); + } else { + sequence.set(0); + lastTimestamp = timestamp; + } + + // WebSocket SessionId + 时间戳 + 序列号 + return String.format("%s-%d-%03d", webSocketSessionId, timestamp, sequence.get()); + + } finally { + lock.unlock(); + } + } +} diff --git a/backend/src/main/resources/db/changelog/changes/v1.0.0-schema.sql b/backend/src/main/resources/db/changelog/changes/v1.0.0-schema.sql index d4003929..09b346f7 100644 --- a/backend/src/main/resources/db/changelog/changes/v1.0.0-schema.sql +++ b/backend/src/main/resources/db/changelog/changes/v1.0.0-schema.sql @@ -903,6 +903,34 @@ CREATE TABLE deploy_team_environment_notification_config INDEX idx_deleted (deleted) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='团队环境通知配置表'; +-- Jenkins构建记录表 +CREATE TABLE deploy_jenkins_build +( + id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '主键ID', + create_by VARCHAR(100) NULL COMMENT '创建人', + create_time DATETIME(6) NULL COMMENT '创建时间', + update_by VARCHAR(100) NULL COMMENT '更新人', + update_time DATETIME(6) NULL COMMENT '更新时间', + version INT NOT NULL DEFAULT 1 COMMENT '版本号', + deleted BIT NOT NULL DEFAULT 0 COMMENT '是否删除', + + build_number INT NOT NULL COMMENT '构建编号', + build_status VARCHAR(50) NOT NULL COMMENT '构建状态(SUCCESS、FAILURE、UNSTABLE、ABORTED等)', + build_url VARCHAR(500) NOT NULL COMMENT '构建URL', + duration BIGINT NOT NULL COMMENT '构建持续时间(毫秒)', + start_time DATETIME(6) NOT NULL COMMENT '构建开始时间', + actions MEDIUMTEXT NULL COMMENT 'Jenkins构建动作JSON数据(可能包含大量参数、触发器等信息)', + external_system_id BIGINT NOT NULL COMMENT '外部系统ID(关联deploy_external_system)', + job_id BIGINT NOT NULL COMMENT '任务ID(关联deploy_jenkins_job)', + + UNIQUE INDEX uk_job_build (job_id, build_number), + INDEX idx_external_system (external_system_id), + INDEX idx_job (job_id), + INDEX idx_status (build_status), + INDEX idx_start_time (start_time), + INDEX idx_deleted (deleted) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='Jenkins构建记录表'; + -- Jenkins构建通知记录表(记录通知状态,防止重复通知) CREATE TABLE deploy_jenkins_build_notification (