增加ssh链接框架

This commit is contained in:
dengqichen 2025-12-06 17:22:40 +08:00
parent 5b6a06016e
commit 80c40bb6ab
4 changed files with 102 additions and 44 deletions

View File

@ -4,6 +4,7 @@ import com.qqchen.deploy.backend.framework.enums.SSHEvent;
import com.qqchen.deploy.backend.framework.enums.SSHTargetType;
import com.qqchen.deploy.backend.framework.ssh.SSHCommandServiceFactory;
import com.qqchen.deploy.backend.framework.ssh.websocket.AbstractSSHWebSocketHandler;
import com.qqchen.deploy.backend.framework.ssh.websocket.SSHEventData;
import com.qqchen.deploy.backend.framework.ssh.websocket.SSHSessionManager;
import com.qqchen.deploy.backend.framework.ssh.websocket.SSHTarget;
import lombok.extern.slf4j.Slf4j;
@ -114,16 +115,21 @@ public class ContainerSSHWebSocketHandler extends AbstractSSHWebSocketHandler {
* 5. 事件钩子审计日志等业务逻辑
*/
@Override
protected void onEvent(SSHEvent event, Map<String, Object> data) {
String sessionId = (String) data.get("sessionId");
SSHTarget target = (SSHTarget) data.get("target");
protected void onEvent(SSHEvent event, SSHEventData eventData) {
String sessionId = eventData.getSessionId();
SSHTarget target = eventData.getTarget();
if (target == null) {
log.warn("事件处理失败target为null: event={}, sessionId={}", event, sessionId);
return;
}
Long containerId = (Long) target.getMetadata();
switch (event) {
case AFTER_CONNECT:
// 创建Docker审计日志
try {
Long userId = (Long) data.get("userId");
Long userId = eventData.getUserId();
log.info("Docker容器连接审计: sessionId={}, containerId={}, userId={}",
sessionId, containerId, userId);
// TODO: dockerAuditLogService.createAuditLog(...)
@ -140,21 +146,21 @@ public class ContainerSSHWebSocketHandler extends AbstractSSHWebSocketHandler {
case ON_COMMAND:
// 记录命令可选
String command = (String) data.get("command");
String command = eventData.getCommand();
log.debug("Docker容器命令: sessionId={}, command={}", sessionId, command);
// TODO: dockerAuditLogService.recordCommand(...)
break;
case ON_ERROR:
// 记录错误
String error = (String) data.get("error");
String error = eventData.getErrorMessage();
log.error("Docker容器连接错误: sessionId={}, error={}", sessionId, error);
// TODO: dockerAuditLogService.closeAuditLog(sessionId, "FAILED", error);
break;
case BEFORE_SHUTDOWN:
// 优雅下线
String reason = (String) data.get("reason");
String reason = eventData.getDisconnectReason();
log.info("Docker容器优雅下线: sessionId={}, reason={}", sessionId, reason);
// TODO: dockerAuditLogService.closeAuditLog(sessionId, "SERVER_SHUTDOWN", reason);
break;

View File

@ -4,6 +4,7 @@ import com.qqchen.deploy.backend.framework.enums.SSHEvent;
import com.qqchen.deploy.backend.framework.enums.SSHTargetType;
import com.qqchen.deploy.backend.framework.ssh.SSHCommandServiceFactory;
import com.qqchen.deploy.backend.framework.ssh.websocket.AbstractSSHWebSocketHandler;
import com.qqchen.deploy.backend.framework.ssh.websocket.SSHEventData;
import com.qqchen.deploy.backend.framework.ssh.websocket.SSHSessionManager;
import com.qqchen.deploy.backend.framework.ssh.websocket.SSHTarget;
import lombok.extern.slf4j.Slf4j;
@ -114,23 +115,27 @@ public class PodSSHWebSocketHandler extends AbstractSSHWebSocketHandler {
* 5. 事件钩子审计日志等业务逻辑
*/
@Override
protected void onEvent(SSHEvent event, Map<String, Object> data) {
String sessionId = (String) data.get("sessionId");
SSHTarget target = (SSHTarget) data.get("target");
protected void onEvent(SSHEvent event, SSHEventData eventData) {
String sessionId = eventData.getSessionId();
SSHTarget target = eventData.getTarget();
if (target == null) {
log.warn("事件处理失败target为null: event={}, sessionId={}", event, sessionId);
return;
}
Long podId = (Long) target.getMetadata();
switch (event) {
case AFTER_CONNECT:
// 创建K8S审计日志
try {
Long userId = (Long) data.get("userId");
String clientIp = (String) data.get("clientIp");
String userAgent = (String) data.get("userAgent");
Long userId = eventData.getUserId();
String clientIp = eventData.getClientIp();
String userAgent = eventData.getUserAgent();
// TODO: 调用K8S审计服务
// Long auditLogId = k8sAuditLogService.createAuditLog(
// userId, podId, sessionId, clientIp, userAgent);
// data.put("auditLogId", auditLogId);
log.info("K8S Pod连接审计日志已创建: sessionId={}, podId={}", sessionId, podId);
} catch (Exception e) {
@ -151,7 +156,7 @@ public class PodSSHWebSocketHandler extends AbstractSSHWebSocketHandler {
case ON_COMMAND:
// 记录命令到K8S审计日志
try {
String command = (String) data.get("command");
String command = eventData.getCommand();
if (command != null && command.length() > 0) {
// TODO: k8sAuditLogService.recordCommand(sessionId, command);
}
@ -163,7 +168,7 @@ public class PodSSHWebSocketHandler extends AbstractSSHWebSocketHandler {
case ON_ERROR:
// 记录错误
try {
String error = (String) data.get("error");
String error = eventData.getErrorMessage();
// TODO: k8sAuditLogService.closeAuditLog(sessionId, "FAILED", error);
log.error("K8S Pod连接错误: sessionId={}, error={}", sessionId, error);
} catch (Exception e) {
@ -174,7 +179,7 @@ public class PodSSHWebSocketHandler extends AbstractSSHWebSocketHandler {
case BEFORE_SHUTDOWN:
// 优雅下线前处理审计日志
try {
String reason = (String) data.get("reason");
String reason = eventData.getDisconnectReason();
// TODO: k8sAuditLogService.closeAuditLog(sessionId, "SERVER_SHUTDOWN", reason);
log.info("K8S审计日志已更新(优雅下线): sessionId={}", sessionId);
} catch (Exception e) {

View File

@ -7,6 +7,7 @@ import com.qqchen.deploy.backend.framework.enums.SSHEvent;
import com.qqchen.deploy.backend.framework.enums.SSHTargetType;
import com.qqchen.deploy.backend.framework.ssh.SSHCommandServiceFactory;
import com.qqchen.deploy.backend.framework.ssh.websocket.AbstractSSHWebSocketHandler;
import com.qqchen.deploy.backend.framework.ssh.websocket.SSHEventData;
import com.qqchen.deploy.backend.framework.ssh.websocket.SSHSessionManager;
import com.qqchen.deploy.backend.framework.ssh.websocket.SSHTarget;
import jakarta.annotation.Resource;
@ -111,24 +112,29 @@ public class ServerSSHWebSocketHandler extends AbstractSSHWebSocketHandler {
* 5. 事件钩子审计日志等业务逻辑
*/
@Override
protected void onEvent(SSHEvent event, Map<String, Object> data) {
String sessionId = (String) data.get("sessionId");
SSHTarget target = (SSHTarget) data.get("target");
protected void onEvent(SSHEvent event, SSHEventData eventData) {
String sessionId = eventData.getSessionId();
SSHTarget target = eventData.getTarget();
if (target == null) {
log.warn("事件处理失败target为null: event={}, sessionId={}", event, sessionId);
return;
}
Long serverId = (Long) target.getMetadata();
switch (event) {
case AFTER_CONNECT:
// 创建审计日志
try {
Long userId = (Long) data.get("userId");
String clientIp = (String) data.get("clientIp");
String userAgent = (String) data.get("userAgent");
Long userId = eventData.getUserId();
String clientIp = eventData.getClientIp();
String userAgent = eventData.getUserAgent();
Server server = serverService.findEntityById(serverId);
if (server != null) {
Long auditLogId = auditLogService.createAuditLog(
userId, server, sessionId, clientIp, userAgent);
data.put("auditLogId", auditLogId);
log.info("审计日志已创建: auditLogId={}, sessionId={}", auditLogId, sessionId);
}
} catch (Exception e) {
@ -149,7 +155,7 @@ public class ServerSSHWebSocketHandler extends AbstractSSHWebSocketHandler {
case ON_COMMAND:
// 记录命令到审计日志
try {
String command = (String) data.get("command");
String command = eventData.getCommand();
if (command != null && command.length() > 0) {
auditLogService.recordCommand(sessionId, command);
}
@ -161,8 +167,8 @@ public class ServerSSHWebSocketHandler extends AbstractSSHWebSocketHandler {
case ON_ERROR:
// 记录错误
try {
String error = (String) data.get("error");
auditLogService.closeAuditLog(sessionId, "FAILED", error);
String errorMessage = eventData.getErrorMessage();
auditLogService.closeAuditLog(sessionId, "FAILED", errorMessage);
} catch (Exception e) {
log.error("记录错误失败: sessionId={}", sessionId, e);
}
@ -171,7 +177,7 @@ public class ServerSSHWebSocketHandler extends AbstractSSHWebSocketHandler {
case BEFORE_SHUTDOWN:
// 优雅下线前处理审计日志Framework统一调用
try {
String reason = (String) data.get("reason");
String reason = eventData.getDisconnectReason();
auditLogService.closeAuditLog(sessionId, "SERVER_SHUTDOWN", reason);
log.info("审计日志已更新(优雅下线): sessionId={}", sessionId);
} catch (Exception e) {

View File

@ -137,16 +137,22 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler {
}
/**
* 从WebSocketSession中获取SSH SessionId
* 从WebSocketSession中获取SSH SessionId懒加载模式
*
* 如果session.attributes中没有sshSessionId会自动生成并存储
* 这是获取sessionId的唯一标准方法保证sessionId的一致性
*
* @param session WebSocketSession
* @return SSH SessionId
* @return SSH SessionId增强后的唯一ID
*/
private String getSessionId(WebSocketSession session) {
String sessionId = (String) session.getAttributes().get("sshSessionId");
if (sessionId == null) {
// 兼容处理如果没有sshSessionId使用WebSocket原始ID
sessionId = session.getId();
// 懒加载如果没有就生成并存储保证一致性
sessionId = SessionIdGenerator.enhanceWebSocketSessionId(session.getId());
session.getAttributes().put("sshSessionId", sessionId);
log.debug("懒加载生成sessionId: webSocketId={}, sshSessionId={}",
session.getId(), sessionId);
}
return sessionId;
}
@ -155,12 +161,9 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler {
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// 关键生成线程安全的唯一SessionId避免并发冲突
// 使用增强型SessionIdWebSocket原始ID + 时间戳 + 序列号
String sessionId = SessionIdGenerator.enhanceWebSocketSessionId(session.getId());
// 将生成的sessionId保存到WebSocketSession attributes中
session.getAttributes().put("sshSessionId", sessionId);
// 关键使用统一的getSessionId方法获取线程安全的唯一SessionId
// 这会自动生成增强型SessionIdWebSocket原始ID + 时间戳 + 序列号
String sessionId = getSessionId(session);
log.info("WebSocket连接建立: webSocketId={}, sshSessionId={}", session.getId(), sessionId);
@ -248,6 +251,7 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler {
sshShells.put(sessionId, shell);
// 10. 优化先启动输出监听线程确保不错过任何SSH输出
// 直接传递session和shellsessionId从session.attributes中获取
Future<?> stdoutTask = asyncTaskExecutor.submit(() -> readSSHOutput(session, shell));
outputTasks.put(sessionId, stdoutTask);
@ -430,9 +434,13 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler {
/**
* 读取SSH输出并发送到前端
*
* @param session WebSocket会话
* @param shell SSH Shell
*/
private void readSSHOutput(WebSocketSession session, Session.Shell shell) {
String sessionId = session.getId();
// 从session.attributes中获取增强后的sessionId
String sessionId = getSessionId(session);
log.debug("开始监听SSH输出: sessionId={}", sessionId);
try {
@ -446,6 +454,8 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler {
String output = new String(buffer, 0, len, StandardCharsets.UTF_8);
log.debug("读取到SSH输出: sessionId={}, length={}, content={}",
sessionId, len, output.replaceAll("\\r", "\\\\r").replaceAll("\\n", "\\\\n"));
log.debug("准备发送输出到前端: sessionId={}, session.isOpen={}", sessionId, session.isOpen());
sendOutput(session, output);
log.debug("SSH输出已发送到前端: sessionId={}", sessionId);
}
@ -457,7 +467,7 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler {
if (!session.isOpen()) {
log.debug("SSH输出监听线程被正常中断WebSocket已关闭: sessionId={}", sessionId);
} else {
log.error("SSH输出监听线程被异常中断: sessionId={}", sessionId, e);
log.warn("SSH输出监听线程被中断但WebSocket仍打开: sessionId={}", sessionId);
// 只在session仍然打开时尝试发送错误消息
try {
sendError(session, "SSH连接被中断");
@ -483,9 +493,13 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler {
/**
* 读取SSH错误流并发送到前端
* 某些SSH服务器可能将输出发送到标准错误流
*
* @param session WebSocket会话
* @param shell SSH Shell
*/
private void readSSHError(WebSocketSession session, Session.Shell shell) {
String sessionId = session.getId();
// 从session.attributes中获取增强后的sessionId
String sessionId = getSessionId(session);
log.debug("开始监听SSH错误流: sessionId={}", sessionId);
try {
@ -509,7 +523,7 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler {
if (!session.isOpen()) {
log.debug("SSH错误流监听线程被正常中断WebSocket已关闭: sessionId={}", sessionId);
} else {
log.error("SSH错误流监听线程被异常中断: sessionId={}", sessionId, e);
log.warn("SSH错误流监听线程被中断但WebSocket仍打开: sessionId={}", sessionId);
}
} catch (IOException e) {
if (session.isOpen()) {
@ -611,13 +625,32 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler {
/**
* 发送输出消息到前端
*
* @param session WebSocket会话
* @param output 输出内容
*/
protected void sendOutput(WebSocketSession session, String output) {
// 从session.attributes中获取sessionId
String sessionId = getSessionId(session);
log.debug("→ sendOutput开始: sessionId={}, outputLength={}", sessionId, output.length());
try {
if (!session.isOpen()) {
log.warn("WebSocket已关闭跳过发送输出: sessionId={}", sessionId);
return;
}
log.debug(" ├─ 创建SSHWebSocketMessage: sessionId={}", sessionId);
SSHWebSocketMessage msg = SSHWebSocketMessage.output(output);
log.debug(" ├─ 准备调用session.sendMessage: sessionId={}", sessionId);
session.sendMessage(new TextMessage(JsonUtils.toJson(msg)));
log.debug(" └─ sendOutput完成: sessionId={}", sessionId);
} catch (IOException e) {
log.error("发送输出消息失败: sessionId={}", session.getId(), e);
log.error("发送输出消息失败(IOException): sessionId={}", sessionId, e);
} catch (Exception e) {
log.error("发送输出消息失败(Exception): sessionId={}", sessionId, e);
}
}
@ -638,10 +671,18 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler {
*/
protected void sendError(WebSocketSession session, String error) {
try {
if (!session.isOpen()) {
log.debug("WebSocket已关闭跳过发送错误消息: sessionId={}", session.getId());
return;
}
SSHWebSocketMessage msg = SSHWebSocketMessage.error(error);
session.sendMessage(new TextMessage(JsonUtils.toJson(msg)));
} catch (IOException e) {
if (session.isOpen()) {
log.error("发送错误消息失败: sessionId={}", session.getId(), e);
} else {
log.debug("发送错误消息失败session已关闭: sessionId={}", session.getId());
}
}
}