diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/handler/ContainerSSHWebSocketHandler.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/handler/ContainerSSHWebSocketHandler.java index a78d365b..45b562d2 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/handler/ContainerSSHWebSocketHandler.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/handler/ContainerSSHWebSocketHandler.java @@ -4,6 +4,7 @@ import com.qqchen.deploy.backend.framework.enums.SSHEvent; import com.qqchen.deploy.backend.framework.enums.SSHTargetType; import com.qqchen.deploy.backend.framework.ssh.SSHCommandServiceFactory; import com.qqchen.deploy.backend.framework.ssh.websocket.AbstractSSHWebSocketHandler; +import com.qqchen.deploy.backend.framework.ssh.websocket.SSHEventData; import com.qqchen.deploy.backend.framework.ssh.websocket.SSHSessionManager; import com.qqchen.deploy.backend.framework.ssh.websocket.SSHTarget; import lombok.extern.slf4j.Slf4j; @@ -114,16 +115,21 @@ public class ContainerSSHWebSocketHandler extends AbstractSSHWebSocketHandler { * 5. 事件钩子(审计日志等业务逻辑) */ @Override - protected void onEvent(SSHEvent event, Map data) { - String sessionId = (String) data.get("sessionId"); - SSHTarget target = (SSHTarget) data.get("target"); + protected void onEvent(SSHEvent event, SSHEventData eventData) { + String sessionId = eventData.getSessionId(); + SSHTarget target = eventData.getTarget(); + + if (target == null) { + log.warn("事件处理失败,target为null: event={}, sessionId={}", event, sessionId); + return; + } Long containerId = (Long) target.getMetadata(); switch (event) { case AFTER_CONNECT: // 创建Docker审计日志 try { - Long userId = (Long) data.get("userId"); + Long userId = eventData.getUserId(); log.info("Docker容器连接审计: sessionId={}, containerId={}, userId={}", sessionId, containerId, userId); // TODO: dockerAuditLogService.createAuditLog(...) @@ -140,21 +146,21 @@ public class ContainerSSHWebSocketHandler extends AbstractSSHWebSocketHandler { case ON_COMMAND: // 记录命令(可选) - String command = (String) data.get("command"); + String command = eventData.getCommand(); log.debug("Docker容器命令: sessionId={}, command={}", sessionId, command); // TODO: dockerAuditLogService.recordCommand(...) break; case ON_ERROR: // 记录错误 - String error = (String) data.get("error"); + String error = eventData.getErrorMessage(); log.error("Docker容器连接错误: sessionId={}, error={}", sessionId, error); // TODO: dockerAuditLogService.closeAuditLog(sessionId, "FAILED", error); break; case BEFORE_SHUTDOWN: // 优雅下线 - String reason = (String) data.get("reason"); + String reason = eventData.getDisconnectReason(); log.info("Docker容器优雅下线: sessionId={}, reason={}", sessionId, reason); // TODO: dockerAuditLogService.closeAuditLog(sessionId, "SERVER_SHUTDOWN", reason); break; diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/handler/PodSSHWebSocketHandler.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/handler/PodSSHWebSocketHandler.java index c8156f77..1d0a09bd 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/handler/PodSSHWebSocketHandler.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/handler/PodSSHWebSocketHandler.java @@ -4,6 +4,7 @@ import com.qqchen.deploy.backend.framework.enums.SSHEvent; import com.qqchen.deploy.backend.framework.enums.SSHTargetType; import com.qqchen.deploy.backend.framework.ssh.SSHCommandServiceFactory; import com.qqchen.deploy.backend.framework.ssh.websocket.AbstractSSHWebSocketHandler; +import com.qqchen.deploy.backend.framework.ssh.websocket.SSHEventData; import com.qqchen.deploy.backend.framework.ssh.websocket.SSHSessionManager; import com.qqchen.deploy.backend.framework.ssh.websocket.SSHTarget; import lombok.extern.slf4j.Slf4j; @@ -114,23 +115,27 @@ public class PodSSHWebSocketHandler extends AbstractSSHWebSocketHandler { * 5. 事件钩子(审计日志等业务逻辑) */ @Override - protected void onEvent(SSHEvent event, Map data) { - String sessionId = (String) data.get("sessionId"); - SSHTarget target = (SSHTarget) data.get("target"); + protected void onEvent(SSHEvent event, SSHEventData eventData) { + String sessionId = eventData.getSessionId(); + SSHTarget target = eventData.getTarget(); + + if (target == null) { + log.warn("事件处理失败,target为null: event={}, sessionId={}", event, sessionId); + return; + } Long podId = (Long) target.getMetadata(); switch (event) { case AFTER_CONNECT: // 创建K8S审计日志 try { - Long userId = (Long) data.get("userId"); - String clientIp = (String) data.get("clientIp"); - String userAgent = (String) data.get("userAgent"); + Long userId = eventData.getUserId(); + String clientIp = eventData.getClientIp(); + String userAgent = eventData.getUserAgent(); // TODO: 调用K8S审计服务 // Long auditLogId = k8sAuditLogService.createAuditLog( // userId, podId, sessionId, clientIp, userAgent); - // data.put("auditLogId", auditLogId); log.info("K8S Pod连接审计日志已创建: sessionId={}, podId={}", sessionId, podId); } catch (Exception e) { @@ -151,7 +156,7 @@ public class PodSSHWebSocketHandler extends AbstractSSHWebSocketHandler { case ON_COMMAND: // 记录命令到K8S审计日志 try { - String command = (String) data.get("command"); + String command = eventData.getCommand(); if (command != null && command.length() > 0) { // TODO: k8sAuditLogService.recordCommand(sessionId, command); } @@ -163,7 +168,7 @@ public class PodSSHWebSocketHandler extends AbstractSSHWebSocketHandler { case ON_ERROR: // 记录错误 try { - String error = (String) data.get("error"); + String error = eventData.getErrorMessage(); // TODO: k8sAuditLogService.closeAuditLog(sessionId, "FAILED", error); log.error("K8S Pod连接错误: sessionId={}, error={}", sessionId, error); } catch (Exception e) { @@ -174,7 +179,7 @@ public class PodSSHWebSocketHandler extends AbstractSSHWebSocketHandler { case BEFORE_SHUTDOWN: // 优雅下线前处理审计日志 try { - String reason = (String) data.get("reason"); + String reason = eventData.getDisconnectReason(); // TODO: k8sAuditLogService.closeAuditLog(sessionId, "SERVER_SHUTDOWN", reason); log.info("K8S审计日志已更新(优雅下线): sessionId={}", sessionId); } catch (Exception e) { diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/handler/ServerSSHWebSocketHandler.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/handler/ServerSSHWebSocketHandler.java index d9270400..c981d3f4 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/handler/ServerSSHWebSocketHandler.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/handler/ServerSSHWebSocketHandler.java @@ -7,6 +7,7 @@ import com.qqchen.deploy.backend.framework.enums.SSHEvent; import com.qqchen.deploy.backend.framework.enums.SSHTargetType; import com.qqchen.deploy.backend.framework.ssh.SSHCommandServiceFactory; import com.qqchen.deploy.backend.framework.ssh.websocket.AbstractSSHWebSocketHandler; +import com.qqchen.deploy.backend.framework.ssh.websocket.SSHEventData; import com.qqchen.deploy.backend.framework.ssh.websocket.SSHSessionManager; import com.qqchen.deploy.backend.framework.ssh.websocket.SSHTarget; import jakarta.annotation.Resource; @@ -111,24 +112,29 @@ public class ServerSSHWebSocketHandler extends AbstractSSHWebSocketHandler { * 5. 事件钩子(审计日志等业务逻辑) */ @Override - protected void onEvent(SSHEvent event, Map data) { - String sessionId = (String) data.get("sessionId"); - SSHTarget target = (SSHTarget) data.get("target"); + protected void onEvent(SSHEvent event, SSHEventData eventData) { + String sessionId = eventData.getSessionId(); + SSHTarget target = eventData.getTarget(); + + if (target == null) { + log.warn("事件处理失败,target为null: event={}, sessionId={}", event, sessionId); + return; + } + Long serverId = (Long) target.getMetadata(); switch (event) { case AFTER_CONNECT: // 创建审计日志 try { - Long userId = (Long) data.get("userId"); - String clientIp = (String) data.get("clientIp"); - String userAgent = (String) data.get("userAgent"); + Long userId = eventData.getUserId(); + String clientIp = eventData.getClientIp(); + String userAgent = eventData.getUserAgent(); Server server = serverService.findEntityById(serverId); if (server != null) { Long auditLogId = auditLogService.createAuditLog( userId, server, sessionId, clientIp, userAgent); - data.put("auditLogId", auditLogId); log.info("审计日志已创建: auditLogId={}, sessionId={}", auditLogId, sessionId); } } catch (Exception e) { @@ -149,7 +155,7 @@ public class ServerSSHWebSocketHandler extends AbstractSSHWebSocketHandler { case ON_COMMAND: // 记录命令到审计日志 try { - String command = (String) data.get("command"); + String command = eventData.getCommand(); if (command != null && command.length() > 0) { auditLogService.recordCommand(sessionId, command); } @@ -161,8 +167,8 @@ public class ServerSSHWebSocketHandler extends AbstractSSHWebSocketHandler { case ON_ERROR: // 记录错误 try { - String error = (String) data.get("error"); - auditLogService.closeAuditLog(sessionId, "FAILED", error); + String errorMessage = eventData.getErrorMessage(); + auditLogService.closeAuditLog(sessionId, "FAILED", errorMessage); } catch (Exception e) { log.error("记录错误失败: sessionId={}", sessionId, e); } @@ -171,7 +177,7 @@ public class ServerSSHWebSocketHandler extends AbstractSSHWebSocketHandler { case BEFORE_SHUTDOWN: // 优雅下线前处理审计日志(Framework统一调用) try { - String reason = (String) data.get("reason"); + String reason = eventData.getDisconnectReason(); auditLogService.closeAuditLog(sessionId, "SERVER_SHUTDOWN", reason); log.info("审计日志已更新(优雅下线): sessionId={}", sessionId); } catch (Exception e) { diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/websocket/AbstractSSHWebSocketHandler.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/websocket/AbstractSSHWebSocketHandler.java index baa002d0..4f89b266 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/websocket/AbstractSSHWebSocketHandler.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/websocket/AbstractSSHWebSocketHandler.java @@ -137,16 +137,22 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler { } /** - * 从WebSocketSession中获取SSH SessionId + * 从WebSocketSession中获取SSH SessionId(懒加载模式) + * + * 如果session.attributes中没有sshSessionId,会自动生成并存储 + * 这是获取sessionId的唯一标准方法,保证sessionId的一致性 * * @param session WebSocketSession - * @return SSH SessionId + * @return SSH SessionId(增强后的唯一ID) */ private String getSessionId(WebSocketSession session) { String sessionId = (String) session.getAttributes().get("sshSessionId"); if (sessionId == null) { - // 兼容处理:如果没有sshSessionId,使用WebSocket原始ID - sessionId = session.getId(); + // 懒加载:如果没有,就生成并存储(保证一致性) + sessionId = SessionIdGenerator.enhanceWebSocketSessionId(session.getId()); + session.getAttributes().put("sshSessionId", sessionId); + log.debug("懒加载生成sessionId: webSocketId={}, sshSessionId={}", + session.getId(), sessionId); } return sessionId; } @@ -155,12 +161,9 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler { @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { - // ⚠️ 关键:生成线程安全的唯一SessionId(避免并发冲突) - // 使用增强型SessionId:WebSocket原始ID + 时间戳 + 序列号 - String sessionId = SessionIdGenerator.enhanceWebSocketSessionId(session.getId()); - - // 将生成的sessionId保存到WebSocketSession attributes中 - session.getAttributes().put("sshSessionId", sessionId); + // ⚠️ 关键:使用统一的getSessionId方法获取线程安全的唯一SessionId + // 这会自动生成增强型SessionId:WebSocket原始ID + 时间戳 + 序列号 + String sessionId = getSessionId(session); log.info("WebSocket连接建立: webSocketId={}, sshSessionId={}", session.getId(), sessionId); @@ -248,6 +251,7 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler { sshShells.put(sessionId, shell); // 10. ⚠️ 优化:先启动输出监听线程,确保不错过任何SSH输出 + // 直接传递session和shell,sessionId从session.attributes中获取 Future stdoutTask = asyncTaskExecutor.submit(() -> readSSHOutput(session, shell)); outputTasks.put(sessionId, stdoutTask); @@ -430,9 +434,13 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler { /** * 读取SSH输出并发送到前端 + * + * @param session WebSocket会话 + * @param shell SSH Shell */ private void readSSHOutput(WebSocketSession session, Session.Shell shell) { - String sessionId = session.getId(); + // 从session.attributes中获取增强后的sessionId + String sessionId = getSessionId(session); log.debug("开始监听SSH输出: sessionId={}", sessionId); try { @@ -446,6 +454,8 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler { String output = new String(buffer, 0, len, StandardCharsets.UTF_8); log.debug("读取到SSH输出: sessionId={}, length={}, content={}", sessionId, len, output.replaceAll("\\r", "\\\\r").replaceAll("\\n", "\\\\n")); + + log.debug("准备发送输出到前端: sessionId={}, session.isOpen={}", sessionId, session.isOpen()); sendOutput(session, output); log.debug("SSH输出已发送到前端: sessionId={}", sessionId); } @@ -457,7 +467,7 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler { if (!session.isOpen()) { log.debug("SSH输出监听线程被正常中断(WebSocket已关闭): sessionId={}", sessionId); } else { - log.error("SSH输出监听线程被异常中断: sessionId={}", sessionId, e); + log.warn("SSH输出监听线程被中断,但WebSocket仍打开: sessionId={}", sessionId); // 只在session仍然打开时尝试发送错误消息 try { sendError(session, "SSH连接被中断"); @@ -483,9 +493,13 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler { /** * 读取SSH错误流并发送到前端 * 某些SSH服务器可能将输出发送到标准错误流 + * + * @param session WebSocket会话 + * @param shell SSH Shell */ private void readSSHError(WebSocketSession session, Session.Shell shell) { - String sessionId = session.getId(); + // 从session.attributes中获取增强后的sessionId + String sessionId = getSessionId(session); log.debug("开始监听SSH错误流: sessionId={}", sessionId); try { @@ -509,7 +523,7 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler { if (!session.isOpen()) { log.debug("SSH错误流监听线程被正常中断(WebSocket已关闭): sessionId={}", sessionId); } else { - log.error("SSH错误流监听线程被异常中断: sessionId={}", sessionId, e); + log.warn("SSH错误流监听线程被中断,但WebSocket仍打开: sessionId={}", sessionId); } } catch (IOException e) { if (session.isOpen()) { @@ -611,13 +625,32 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler { /** * 发送输出消息到前端 + * + * @param session WebSocket会话 + * @param output 输出内容 */ protected void sendOutput(WebSocketSession session, String output) { + // 从session.attributes中获取sessionId + String sessionId = getSessionId(session); + log.debug("→ sendOutput开始: sessionId={}, outputLength={}", sessionId, output.length()); + try { + if (!session.isOpen()) { + log.warn("WebSocket已关闭,跳过发送输出: sessionId={}", sessionId); + return; + } + + log.debug(" ├─ 创建SSHWebSocketMessage: sessionId={}", sessionId); SSHWebSocketMessage msg = SSHWebSocketMessage.output(output); + + log.debug(" ├─ 准备调用session.sendMessage: sessionId={}", sessionId); session.sendMessage(new TextMessage(JsonUtils.toJson(msg))); + + log.debug(" └─ sendOutput完成: sessionId={}", sessionId); } catch (IOException e) { - log.error("发送输出消息失败: sessionId={}", session.getId(), e); + log.error("发送输出消息失败(IOException): sessionId={}", sessionId, e); + } catch (Exception e) { + log.error("发送输出消息失败(Exception): sessionId={}", sessionId, e); } } @@ -638,10 +671,18 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler { */ protected void sendError(WebSocketSession session, String error) { try { + if (!session.isOpen()) { + log.debug("WebSocket已关闭,跳过发送错误消息: sessionId={}", session.getId()); + return; + } SSHWebSocketMessage msg = SSHWebSocketMessage.error(error); session.sendMessage(new TextMessage(JsonUtils.toJson(msg))); } catch (IOException e) { - log.error("发送错误消息失败: sessionId={}", session.getId(), e); + if (session.isOpen()) { + log.error("发送错误消息失败: sessionId={}", session.getId(), e); + } else { + log.debug("发送错误消息失败(session已关闭): sessionId={}", session.getId()); + } } }