diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/IK8sServiceIntegration.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/IK8sServiceIntegration.java index 2b140448..d2d5869c 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/IK8sServiceIntegration.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/IK8sServiceIntegration.java @@ -250,6 +250,15 @@ public interface IK8sServiceIntegration extends IExternalSystemIntegration { */ io.kubernetes.client.openapi.ApiClient getApiClient(ExternalSystem system); + /** + * 获取用于日志流的K8S ApiClient(长超时) + * 日志流是长连接,需要更长的读取超时时间 + * + * @param system K8S系统配置 + * @return ApiClient实例(readTimeout=30分钟) + */ + io.kubernetes.client.openapi.ApiClient getApiClientForLogStream(ExternalSystem system); + /** * 获取系统类型 * diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/impl/K8sServiceIntegrationImpl.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/impl/K8sServiceIntegrationImpl.java index a5f9a594..0ae6d517 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/impl/K8sServiceIntegrationImpl.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/impl/K8sServiceIntegrationImpl.java @@ -590,7 +590,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp // 智能默认参数:优化性能,避免超时 Integer effectiveTail = tail != null ? tail : 500; // 默认最后500行 Integer effectiveSinceSeconds = sinceSeconds != null ? sinceSeconds : 3600; // 默认最近1小时 - + log.info("查询K8S Pod日志,集群: {}, 命名空间: {}, Pod: {}, 容器: {}, tail: {}, sinceSeconds: {}", externalSystem.getName(), namespace, podName, container, effectiveTail, effectiveSinceSeconds); @@ -624,7 +624,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp log.warn("查询Pod日志返回空body: {}/{}", namespace, podName); return ""; } - + String logs = body.string(); int logLength = logs != null ? logs.length() : 0; log.info("查询Pod日志成功,日志长度: {} bytes, 行数约: {}", logLength, logLength > 0 ? logs.split("\n").length : 0); @@ -653,14 +653,14 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp /** * 重启Deployment(通过更新annotation触发滚动更新) - * + * *

实现原理:

* - * + * *

安全性说明:

* - * + * *

等价于执行命令:kubectl rollout restart deployment/{deploymentName} -n {namespace}

*/ @Override @@ -683,7 +683,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp // 生成ISO 8601格式的时间戳作为重启标记 String timestamp = java.time.format.DateTimeFormatter.ISO_INSTANT .format(java.time.Instant.now()); - + // 构建Strategic Merge Patch内容 // 只更新 spec.template.metadata.annotations 中的 kubectl.kubernetes.io/restartedAt 字段 // 其他所有字段(replicas、image、env、resources等)都不会受影响 @@ -692,7 +692,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp timestamp ); - log.info("执行Strategic Merge Patch,Deployment: {}/{}, 重启时间戳: {}", + log.info("执行Strategic Merge Patch,Deployment: {}/{}, 重启时间戳: {}", namespace, deploymentName, timestamp); // 使用PatchUtils进行strategic merge patch @@ -719,7 +719,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp } catch (ApiException e) { if (e.getCode() == 404) { - log.warn("重启失败:Deployment不存在,集群: {}, 命名空间: {}, Deployment: {}", + log.warn("重启失败:Deployment不存在,集群: {}, 命名空间: {}, Deployment: {}", externalSystem.getName(), namespace, deploymentName); throw new BusinessException(ResponseCode.K8S_RESOURCE_NOT_FOUND); } @@ -814,18 +814,18 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp /** * 批量计算namespace下所有Deployment的重启次数 * 性能优化:一次性查询所有Pod,在内存中按Deployment分组计算 - * + * * @param externalSystem K8s集群 - * @param namespace 命名空间 - * @param deployments Deployment列表 + * @param namespace 命名空间 + * @param deployments Deployment列表 * @return Map */ @Override public Map batchCalculateRestartCounts( - ExternalSystem externalSystem, - String namespace, - List deployments) { - + ExternalSystem externalSystem, + String namespace, + List deployments) { + log.debug("批量计算Deployment重启次数,集群: {}, 命名空间: {}, Deployment数量: {}", externalSystem.getName(), namespace, deployments.size()); @@ -834,7 +834,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp try { // 1. 一次性查询namespace下所有Pod List allPods = listPods(externalSystem, namespace); - + if (allPods.isEmpty()) { log.debug("命名空间 {} 下没有Pod", namespace); return restartCountMap; @@ -879,18 +879,18 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp /** * 批量计算namespace下所有Deployment的Pod统计信息(重启次数和实际Pod数量) * 性能优化:一次性查询所有Pod,在内存中按Deployment分组计算 - * + * * @param externalSystem K8s集群 - * @param namespace 命名空间 - * @param deployments Deployment列表 + * @param namespace 命名空间 + * @param deployments Deployment列表 * @return Map */ @Override public Map batchCalculatePodStats( - ExternalSystem externalSystem, - String namespace, - List deployments) { - + ExternalSystem externalSystem, + String namespace, + List deployments) { + log.debug("批量计算Deployment Pod统计信息,集群: {}, 命名空间: {}, Deployment数量: {}", externalSystem.getName(), namespace, deployments.size()); @@ -899,7 +899,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp try { // 1. 一次性查询namespace下所有Pod List allPods = listPods(externalSystem, namespace); - + if (allPods.isEmpty()) { log.debug("命名空间 {} 下没有Pod", namespace); return statsMap; @@ -911,7 +911,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp Map selector = deployment.getSelector(); if (selector == null || selector.isEmpty()) { log.debug("Deployment {} 没有selector,跳过", deployment.getName()); - statsMap.put(deployment.getName(), + statsMap.put(deployment.getName(), new IK8sServiceIntegration.PodStats(0, 0, 0, 0, 0, 0, 0, 0, 0, "0", "0", "0", "0")); continue; } @@ -925,7 +925,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp int totalRestartCount = matchedPods.stream() .mapToInt(pod -> pod.getRestartCount() != null ? pod.getRestartCount() : 0) .sum(); - + // 5. 计算Pod总数 int actualPodCount = matchedPods.size(); @@ -1003,16 +1003,16 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp (int) readyCount, notReadyCount, totalCpuRequest, totalMemoryRequest, totalCpuLimit, totalMemoryLimit ); - + statsMap.put(deployment.getName(), stats); - log.debug("Deployment {} 的统计信息: 重启次数={}, 实际Pod数={}, Running={}, Pending={}, Failed={}, Succeeded={}, Unknown={}, Ready={}, NotReady={}, CPU请求={}, 内存请求={}, CPU限制={}, 内存限制={}", + log.debug("Deployment {} 的统计信息: 重启次数={}, 实际Pod数={}, Running={}, Pending={}, Failed={}, Succeeded={}, Unknown={}, Ready={}, NotReady={}, CPU请求={}, 内存请求={}, CPU限制={}, 内存限制={}", deployment.getName(), totalRestartCount, actualPodCount, runningCount, pendingCount, failedCount, succeededCount, unknownCount, readyCount, notReadyCount, totalCpuRequest, totalMemoryRequest, totalCpuLimit, totalMemoryLimit); } catch (Exception e) { log.warn("计算Deployment {} 统计信息失败: {}", deployment.getName(), e.getMessage()); - statsMap.put(deployment.getName(), + statsMap.put(deployment.getName(), new IK8sServiceIntegration.PodStats(0, 0, 0, 0, 0, 0, 0, 0, 0, "0", "0", "0", "0")); } } @@ -1030,9 +1030,9 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp /** * 判断Pod的labels是否匹配Deployment的selector - * + * * @param podLabels Pod的labels - * @param selector Deployment的selector + * @param selector Deployment的selector * @return 是否匹配 */ private boolean matchesSelector(Map podLabels, Map selector) { @@ -1044,7 +1044,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp for (Map.Entry entry : selector.entrySet()) { String selectorValue = entry.getValue(); String podValue = podLabels.get(entry.getKey()); - + if (podValue == null || !podValue.equals(selectorValue)) { return false; } @@ -1075,7 +1075,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp /** * 提取Deployment中第一个容器的镜像 - * + * * @param deployment K8s Deployment对象 * @return 镜像名称,如果不存在则返回null */ @@ -1083,29 +1083,29 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp if (deployment.getSpec() == null) { return null; } - + V1PodTemplateSpec template = deployment.getSpec().getTemplate(); if (template == null) { return null; } - + V1PodSpec podSpec = template.getSpec(); if (podSpec == null) { return null; } - + List containers = podSpec.getContainers(); if (containers == null || containers.isEmpty()) { return null; } - + return containers.get(0).getImage(); } /** * 填充容器资源配置信息 * 按容器名称匹配,确保资源信息正确填充到对应容器 - * + * * @param specContainers Pod Spec中的容器列表 * @param containerInfos 响应对象中的容器信息列表 */ @@ -1113,11 +1113,11 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp if (specContainers == null || containerInfos == null) { return; } - + // 将Spec容器列表转换为Map,key为容器名称 Map specContainerMap = specContainers.stream() .collect(java.util.stream.Collectors.toMap(V1Container::getName, c -> c)); - + // 遍历ContainerInfo,根据名称匹配Spec容器 for (K8sPodResponse.ContainerInfo containerInfo : containerInfos) { V1Container specContainer = specContainerMap.get(containerInfo.getName()); @@ -1125,15 +1125,15 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp log.debug("容器 {} 在Spec中不存在,跳过资源填充", containerInfo.getName()); continue; } - + V1ResourceRequirements resources = specContainer.getResources(); if (resources == null) { continue; } - + // 填充资源请求(requests) fillResourceRequests(resources, containerInfo); - + // 填充资源限制(limits) fillResourceLimits(resources, containerInfo); } @@ -1141,8 +1141,8 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp /** * 填充容器资源请求配置 - * - * @param resources 资源配置对象 + * + * @param resources 资源配置对象 * @param containerInfo 容器信息对象 */ private void fillResourceRequests(V1ResourceRequirements resources, K8sPodResponse.ContainerInfo containerInfo) { @@ -1150,12 +1150,12 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp if (requests == null) { return; } - + io.kubernetes.client.custom.Quantity cpu = requests.get("cpu"); if (cpu != null) { containerInfo.setCpuRequest(cpu.toSuffixedString()); } - + io.kubernetes.client.custom.Quantity memory = requests.get("memory"); if (memory != null) { containerInfo.setMemoryRequest(memory.toSuffixedString()); @@ -1164,8 +1164,8 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp /** * 填充容器资源限制配置 - * - * @param resources 资源配置对象 + * + * @param resources 资源配置对象 * @param containerInfo 容器信息对象 */ private void fillResourceLimits(V1ResourceRequirements resources, K8sPodResponse.ContainerInfo containerInfo) { @@ -1173,12 +1173,12 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp if (limits == null) { return; } - + io.kubernetes.client.custom.Quantity cpu = limits.get("cpu"); if (cpu != null) { containerInfo.setCpuLimit(cpu.toSuffixedString()); } - + io.kubernetes.client.custom.Quantity memory = limits.get("memory"); if (memory != null) { containerInfo.setMemoryLimit(memory.toSuffixedString()); @@ -1188,7 +1188,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp /** * 获取K8S ApiClient(带缓存) * 复用内部缓存机制,避免重复创建连接 - * + * * @param system K8S系统配置 * @return ApiClient实例 */ @@ -1198,10 +1198,40 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp return cache.apiClient; } + /** + * 获取用于日志流的K8S ApiClient(长超时) + * 日志流是长连接,需要更长的读取超时时间 + * 注意:此方法每次调用都会创建新的ApiClient,不使用缓存 + * + * @param system K8S系统配置 + * @return ApiClient实例(readTimeout=30分钟) + */ + @Override + public ApiClient getApiClientForLogStream(ExternalSystem system) { + try { + String config = system.getConfig(); + if (config == null || config.trim().isEmpty()) { + throw new BusinessException(ResponseCode.K8S_CONFIG_EMPTY); + } + + ApiClient client = Config.fromConfig(new StringReader(config)); + client.setConnectTimeout(15000); // 15秒连接超时 + client.setReadTimeout(30 * 60 * 1000); // 30分钟读取超时(日志流长连接) + + log.debug("创建日志流专用ApiClient,readTimeout=30分钟"); + return client; + } catch (BusinessException e) { + throw e; + } catch (Exception e) { + log.error("创建日志流ApiClient失败: {}", e.getMessage(), e); + throw new BusinessException(ResponseCode.K8S_CONNECTION_FAILED); + } + } + /** * 关闭K8S ApiClient,释放OkHttp资源 * 包括连接池和调度器线程 - * + * * @param apiClient 要关闭的ApiClient */ private void closeApiClient(ApiClient apiClient) { @@ -1211,19 +1241,19 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp try { okhttp3.OkHttpClient httpClient = apiClient.getHttpClient(); - + // 关闭连接池,驱逐所有空闲连接 if (httpClient.connectionPool() != null) { httpClient.connectionPool().evictAll(); log.debug("已关闭K8S ApiClient连接池"); } - + // 关闭调度器线程池 if (httpClient.dispatcher() != null && httpClient.dispatcher().executorService() != null) { httpClient.dispatcher().executorService().shutdown(); log.debug("已关闭K8S ApiClient调度器"); } - + } catch (Exception e) { log.warn("关闭K8S ApiClient时发生异常(不影响主流程): {}", e.getMessage()); } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/strategy/log/DockerLogStreamStrategy.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/strategy/log/DockerLogStreamStrategy.java index ed9c188e..881d3fb1 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/strategy/log/DockerLogStreamStrategy.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/strategy/log/DockerLogStreamStrategy.java @@ -3,223 +3,116 @@ package com.qqchen.deploy.backend.deploy.strategy.log; import com.qqchen.deploy.backend.deploy.enums.RuntimeTypeEnum; import com.qqchen.deploy.backend.framework.ssh.ISSHCommandService; import com.qqchen.deploy.backend.framework.ssh.SSHCommandServiceFactory; -import com.qqchen.deploy.backend.framework.websocket.log.ILogStreamStrategy; +import com.qqchen.deploy.backend.framework.websocket.log.AbstractLogStreamStrategy; import com.qqchen.deploy.backend.framework.websocket.log.LogStreamTarget; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import net.schmizz.sshj.SSHClient; import net.schmizz.sshj.connection.channel.direct.Session; import org.springframework.stereotype.Component; -import org.springframework.web.socket.WebSocketSession; import java.io.BufferedReader; import java.io.InputStreamReader; import java.time.Instant; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; /** * Docker日志流策略 - * 通过SSH连接执行docker logs -f命令获取日志流 * * @author qqchen * @since 2025-12-16 */ @Slf4j @Component -public class DockerLogStreamStrategy implements ILogStreamStrategy { +public class DockerLogStreamStrategy extends AbstractLogStreamStrategy { + + public record SSHConnection(SSHClient client, Session session, Session.Command command) {} @Resource private SSHCommandServiceFactory sshCommandServiceFactory; - /** - * ⚠️ 关键修复:保存每个会话的SSH连接引用,用于stop()时强制关闭 - * sessionId → SSHClient - */ - private final Map sshClients = new ConcurrentHashMap<>(); - - /** - * ⚠️ 关键修复:保存每个会话的SSH Session引用 - * sessionId → SSH Session - */ - private final Map sshSessions = new ConcurrentHashMap<>(); - - /** - * ⚠️ 关键修复:保存每个会话的Command引用 - * sessionId → Session.Command - */ - private final Map sshCommands = new ConcurrentHashMap<>(); - @Override public RuntimeTypeEnum supportedType() { return RuntimeTypeEnum.DOCKER; } @Override - public void streamLogs(WebSocketSession session, - LogStreamTarget target, - AtomicBoolean paused, - LogLineCallback callback) throws Exception { + protected SSHConnection doCreateConnection(String sessionId, LogStreamTarget target) throws Exception { + log.info("创建Docker日志流连接: sessionId={}, container={}, host={}", + sessionId, target.getName(), target.getHost()); - // ⚠️ 关键修复:从session attributes获取增强后的sessionId(与AbstractLogStreamWebSocketHandler保持一致) - String sessionId = (String) session.getAttributes().get("logStreamSessionId"); - if (sessionId == null) { - sessionId = session.getId(); // 降级方案 + ISSHCommandService sshService = sshCommandServiceFactory.getService(target.getOsType()); + SSHClient sshClient = sshService.createConnection( + target.getHost(), target.getPort(), target.getUsername(), + target.getPassword(), target.getPrivateKey(), target.getPassphrase() + ); + + String command = String.format("docker logs -f %s --tail %d", + target.getName(), target.getLines()); + log.debug("Docker日志命令: {}", command); + + Session sshSession = sshClient.startSession(); + Session.Command cmd = sshSession.exec(command); + + return new SSHConnection(sshClient, sshSession, cmd); + } + + @Override + protected void doStreamLogs(String sessionId, + LogStreamTarget target, + Supplier isPaused, + LogLineCallback callback) throws Exception { + + SSHConnection conn = getConnection(sessionId); + if (conn == null) { + log.warn("获取连接失败: sessionId={}", sessionId); + return; } - log.info("开始Docker日志流: sessionId={}, container={}, host={}, session.isOpen={}", - sessionId, target.getName(), target.getHost(), session.isOpen()); - - SSHClient sshClient = null; - Session sshSession = null; - Session.Command cmd = null; - - try { - // 1. 建立SSH连接 - ISSHCommandService sshService = sshCommandServiceFactory.getService(target.getOsType()); - sshClient = sshService.createConnection( - target.getHost(), - target.getPort(), - target.getUsername(), - target.getPassword(), - target.getPrivateKey(), - target.getPassphrase() - ); + try (BufferedReader reader = new BufferedReader( + new InputStreamReader(conn.command().getInputStream()))) { - // ⚠️ 关键修复:保存SSH连接引用,供stop()方法使用 - sshClients.put(sessionId, sshClient); - - // 2. 构建docker logs命令 - String command = String.format("docker logs -f %s --tail %d", - target.getName(), target.getLines()); - - log.debug("执行Docker日志命令: {}", command); - - // 3. 执行命令 - sshSession = sshClient.startSession(); - sshSessions.put(sessionId, sshSession); - - cmd = sshSession.exec(command); - sshCommands.put(sessionId, cmd); - - // 4. 持续读取输出流 - try (BufferedReader reader = new BufferedReader( - new InputStreamReader(cmd.getInputStream()))) { - - String line; - // ⚠️ 关键修复:增加线程中断检查,确保stop()能够中断阻塞的readLine() - while (session.isOpen() && !Thread.currentThread().isInterrupted()) { - line = reader.readLine(); - - // readLine()返回null表示流已关闭 - if (line == null) { - log.debug("SSH输出流已关闭: sessionId={}", sessionId); - break; - } - - // 检查暂停标志 - if (paused.get()) { - Thread.sleep(100); - continue; - } - - // 推送日志行(Docker日志没有时间戳,使用当前时间) - callback.sendLogLine(Instant.now().toString(), line); - } - - log.debug("Docker日志流正常结束: sessionId={}, session.isOpen={}, interrupted={}", - sessionId, session.isOpen(), Thread.currentThread().isInterrupted()); + String line; + while (!Thread.currentThread().isInterrupted()) { + line = reader.readLine(); + if (line == null) break; + if (isPaused.get()) { Thread.sleep(100); continue; } + callback.sendLogLine(Instant.now().toString(), line); } - - } catch (java.net.SocketException e) { - // Socket关闭是正常的清理流程 - log.debug("SSH Socket已关闭: sessionId={}", sessionId); - } catch (java.io.InterruptedIOException e) { - // 线程被中断,正常的清理流程 - log.debug("Docker日志流线程被中断: sessionId={}", sessionId); + } catch (java.net.SocketException | java.io.InterruptedIOException e) { + log.debug("Docker日志流中断: sessionId={}", sessionId); } catch (InterruptedException e) { - // 恢复中断状态 Thread.currentThread().interrupt(); - log.debug("Docker日志流线程被中断(sleep): sessionId={}", sessionId); - } catch (Exception e) { - if (session.isOpen()) { - log.error("Docker日志流异常: sessionId={}", sessionId, e); - throw e; - } else { - log.debug("Docker日志流异常(session已关闭): sessionId={}", sessionId); - } - } finally { - // ⚠️ 关键修复:确保资源被清理 - cleanupResources(sessionId, cmd, sshSession, sshClient); + } + + log.info("Docker日志流结束: sessionId={}", sessionId); + } + + @Override + protected void doStop(String sessionId, SSHConnection conn) { + if (conn.command() != null) { + try { conn.command().getInputStream().close(); } catch (Exception ignored) {} } } @Override - public void stop(String sessionId) { - log.info("停止Docker日志流: sessionId={}", sessionId); - - // ⚠️ 关键修复:主动清理SSH资源,不依赖finally块 - Session.Command cmd = sshCommands.remove(sessionId); - Session sshSession = sshSessions.remove(sessionId); - SSHClient sshClient = sshClients.remove(sessionId); - - cleanupResources(sessionId, cmd, sshSession, sshClient); - } - - /** - * ⚠️ 关键修复:统一的资源清理方法 - * 确保SSH连接、Session、Command都被正确关闭 - * - * @param sessionId 会话ID - * @param cmd SSH Command - * @param sshSession SSH Session - * @param sshClient SSH Client - */ - private void cleanupResources(String sessionId, Session.Command cmd, Session sshSession, SSHClient sshClient) { - // 1. 先关闭Command的输入输出流,强制中断阻塞的readLine() - if (cmd != null) { - try { - cmd.getInputStream().close(); - } catch (Exception e) { - log.debug("关闭Command输入流失败: sessionId={}", sessionId, e); - } - - try { - cmd.close(); - } catch (Exception e) { - log.debug("关闭Command失败: sessionId={}", sessionId, e); - } + protected void doCleanupConnection(String sessionId, SSHConnection conn) { + if (conn.command() != null) { + try { conn.command().getInputStream().close(); } catch (Exception ignored) {} + try { conn.command().close(); } catch (Exception ignored) {} } - // 2. 关闭SSH Session - if (sshSession != null) { - try { - sshSession.close(); - log.debug("SSH Session已关闭: sessionId={}", sessionId); - } catch (Exception e) { - log.debug("关闭SSH Session失败: sessionId={}", sessionId, e); - } + if (conn.session() != null) { + try { conn.session().close(); } catch (Exception ignored) {} } - // 3. 断开SSH Client - if (sshClient != null) { + if (conn.client() != null) { try { - if (sshClient.isConnected()) { - sshClient.disconnect(); - } - sshClient.close(); - log.debug("SSH Client已断开: sessionId={}", sessionId); - } catch (Exception e) { - log.debug("断开SSH Client失败: sessionId={}", sessionId, e); - } + if (conn.client().isConnected()) conn.client().disconnect(); + conn.client().close(); + } catch (Exception ignored) {} } - // 4. 从Map中移除引用 - sshCommands.remove(sessionId); - sshSessions.remove(sessionId); - sshClients.remove(sessionId); - - log.debug("Docker日志流资源清理完成: sessionId={}", sessionId); + log.debug("Docker连接清理完成: sessionId={}", sessionId); } } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/strategy/log/K8sLogStreamStrategy.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/strategy/log/K8sLogStreamStrategy.java index d967434c..edb81032 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/strategy/log/K8sLogStreamStrategy.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/strategy/log/K8sLogStreamStrategy.java @@ -6,32 +6,35 @@ import com.qqchen.deploy.backend.deploy.integration.IK8sServiceIntegration; import com.qqchen.deploy.backend.deploy.repository.IExternalSystemRepository; import com.qqchen.deploy.backend.framework.enums.ResponseCode; import com.qqchen.deploy.backend.framework.exception.BusinessException; -import com.qqchen.deploy.backend.framework.websocket.log.ILogStreamStrategy; +import com.qqchen.deploy.backend.framework.websocket.log.AbstractLogStreamStrategy; import com.qqchen.deploy.backend.framework.websocket.log.LogStreamTarget; import io.kubernetes.client.openapi.ApiClient; import io.kubernetes.client.openapi.apis.CoreV1Api; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import okhttp3.Call; +import okhttp3.Response; +import okhttp3.ResponseBody; import org.springframework.stereotype.Component; -import org.springframework.web.socket.WebSocketSession; import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; import java.time.Instant; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; /** * K8S日志流策略 - * 使用Kubernetes Java Client API获取Pod日志流 * * @author qqchen * @since 2025-12-16 */ @Slf4j @Component -public class K8sLogStreamStrategy implements ILogStreamStrategy { +public class K8sLogStreamStrategy extends AbstractLogStreamStrategy { + + public record K8sConnection(ApiClient apiClient, Call call) {} @Resource private IK8sServiceIntegration k8sServiceIntegration; @@ -45,82 +48,106 @@ public class K8sLogStreamStrategy implements ILogStreamStrategy { } @Override - public void streamLogs(WebSocketSession session, - LogStreamTarget target, - AtomicBoolean paused, - LogLineCallback callback) throws Exception { + protected K8sConnection doCreateConnection(String sessionId, LogStreamTarget target) throws Exception { + log.info("创建K8S日志流连接: sessionId={}, pod={}, namespace={}", + sessionId, target.getName(), target.getK8sNamespace()); - // 注意: 不要使用session.getId(),因为AbstractLogStreamWebSocketHandler使用的是增强后的sessionId - // 这里仅用于日志输出,实际的session管理由AbstractLogStreamWebSocketHandler负责 - String webSocketId = session.getId(); - log.info("开始K8S日志流: webSocketId={}, pod={}, namespace={}", - webSocketId, target.getName(), target.getK8sNamespace()); + ExternalSystem k8sSystem = externalSystemRepository.findById(target.getK8sSystemId()) + .orElseThrow(() -> new BusinessException(ResponseCode.K8S_SYSTEM_NOT_FOUND)); - try { - // 1. 获取K8S系统配置 - ExternalSystem k8sSystem = externalSystemRepository.findById(target.getK8sSystemId()) - .orElseThrow(() -> new BusinessException(ResponseCode.K8S_SYSTEM_NOT_FOUND)); - - // 2. 获取ApiClient(使用集成服务的缓存机制) - ApiClient apiClient = k8sServiceIntegration.getApiClient(k8sSystem); - CoreV1Api api = new CoreV1Api(apiClient); - - // 3. 调用K8S API获取日志流 - Call call = api.readNamespacedPodLogCall( - target.getName(), // podName - target.getK8sNamespace(), // namespace - null, // container (null = default container) - true, // follow = true (实时流) - null, // insecureSkipTLSVerifyBackend - null, // limitBytes - "false", // pretty - false, // previous - null, // sinceSeconds - target.getLines(), // tailLines - true, // timestamps - null // callback - ); - - // 4. 执行调用并获取Response(使用try-with-resources自动关闭) - try (okhttp3.Response response = call.execute()) { - InputStream inputStream = response.body().byteStream(); - - // 5. 持续读取日志流 - try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) { - String line; - while (session.isOpen() && (line = reader.readLine()) != null) { - // 检查暂停标志 - if (paused.get()) { - Thread.sleep(100); - continue; - } - - // 解析K8S日志行(格式:timestamp content) - String[] parts = line.split(" ", 2); - String timestamp = parts.length > 0 ? parts[0] : Instant.now().toString(); - String content = parts.length > 1 ? parts[1] : line; - - // 推送日志行 - callback.sendLogLine(timestamp, content); - } - - log.debug("K8S日志流退出while循环: webSocketId={}, session.isOpen={}", - webSocketId, session.isOpen()); - } + ApiClient apiClient = k8sServiceIntegration.getApiClientForLogStream(k8sSystem); + CoreV1Api api = new CoreV1Api(apiClient); + + Call call = api.readNamespacedPodLogCall( + target.getName(), target.getK8sNamespace(), + null, true, null, null, "false", false, null, + target.getLines(), true, null + ); + + return new K8sConnection(apiClient, call); + } + + @Override + protected void doStreamLogs(String sessionId, + LogStreamTarget target, + Supplier isPaused, + LogLineCallback callback) throws Exception { + + K8sConnection conn = getConnection(sessionId); + if (conn == null) { + log.warn("获取连接失败: sessionId={}", sessionId); + return; + } + + AtomicLong lineCount = new AtomicLong(0); + + try (Response response = conn.call().execute()) { + if (!response.isSuccessful()) { + String errorBody = response.body() != null ? response.body().string() : "无响应体"; + log.error("K8S日志API错误: sessionId={}, code={}, body={}", + sessionId, response.code(), errorBody); + throw new BusinessException(ResponseCode.K8S_OPERATION_FAILED); } - log.info("K8S日志流正常结束: webSocketId={}", webSocketId); + ResponseBody body = response.body(); + if (body == null) { + log.warn("K8S日志API返回空响应体: sessionId={}", sessionId); + return; + } - } catch (Exception e) { - log.error("K8S日志流异常: webSocketId={}", webSocketId, e); - throw e; + InputStream inputStream = body.byteStream(); + + try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) { + String line; + while (!Thread.currentThread().isInterrupted()) { + line = reader.readLine(); + if (line == null) break; + if (isPaused.get()) { Thread.sleep(100); continue; } + + String[] parts = line.split(" ", 2); + String timestamp = parts.length > 0 ? parts[0] : Instant.now().toString(); + String content = parts.length > 1 ? parts[1] : line; + + callback.sendLogLine(timestamp, content); + lineCount.incrementAndGet(); + } + } + } catch (java.net.SocketException | java.io.InterruptedIOException e) { + log.debug("K8S日志流中断: sessionId={}, lineCount={}", sessionId, lineCount.get()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + log.info("K8S日志流结束: sessionId={}, lineCount={}", sessionId, lineCount.get()); + } + + @Override + protected void doStop(String sessionId, K8sConnection conn) { + if (conn.call() != null && !conn.call().isCanceled()) { + conn.call().cancel(); + log.debug("K8S Call已取消: sessionId={}", sessionId); } } @Override - public void stop(String sessionId) { - log.info("停止K8S日志流: logStreamSessionId={}", sessionId); - // K8S使用Kubernetes API,连接由ApiClient管理,无需手动清理 - // 当输入流关闭时,K8S API会自动断开连接 + protected void doCleanupConnection(String sessionId, K8sConnection conn) { + if (conn.call() != null && !conn.call().isCanceled()) { + try { conn.call().cancel(); } catch (Exception ignored) {} + } + + if (conn.apiClient() != null) { + try { + okhttp3.OkHttpClient httpClient = conn.apiClient().getHttpClient(); + if (httpClient.connectionPool() != null) { + httpClient.connectionPool().evictAll(); + } + if (httpClient.dispatcher() != null && + httpClient.dispatcher().executorService() != null) { + httpClient.dispatcher().executorService().shutdown(); + } + } catch (Exception ignored) {} + } + + log.debug("K8S连接清理完成: sessionId={}", sessionId); } } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/strategy/log/ServerLogStreamStrategy.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/strategy/log/ServerLogStreamStrategy.java index 7b7b1d40..d2775933 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/strategy/log/ServerLogStreamStrategy.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/strategy/log/ServerLogStreamStrategy.java @@ -3,223 +3,115 @@ package com.qqchen.deploy.backend.deploy.strategy.log; import com.qqchen.deploy.backend.deploy.enums.RuntimeTypeEnum; import com.qqchen.deploy.backend.framework.ssh.ISSHCommandService; import com.qqchen.deploy.backend.framework.ssh.SSHCommandServiceFactory; -import com.qqchen.deploy.backend.framework.websocket.log.ILogStreamStrategy; +import com.qqchen.deploy.backend.framework.websocket.log.AbstractLogStreamStrategy; import com.qqchen.deploy.backend.framework.websocket.log.LogStreamTarget; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import net.schmizz.sshj.SSHClient; import net.schmizz.sshj.connection.channel.direct.Session; import org.springframework.stereotype.Component; -import org.springframework.web.socket.WebSocketSession; import java.io.BufferedReader; import java.io.InputStreamReader; import java.time.Instant; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; /** * Server日志流策略 - * 通过SSH连接执行tail -f命令获取日志流 * * @author qqchen * @since 2025-12-16 */ @Slf4j @Component -public class ServerLogStreamStrategy implements ILogStreamStrategy { +public class ServerLogStreamStrategy extends AbstractLogStreamStrategy { + + public record SSHConnection(SSHClient client, Session session, Session.Command command) {} @Resource private SSHCommandServiceFactory sshCommandServiceFactory; - /** - * ⚠️ 关键修复:保存每个会话的SSH连接引用,用于stop()时强制关闭 - * sessionId → SSHClient - */ - private final Map sshClients = new ConcurrentHashMap<>(); - - /** - * ⚠️ 关键修复:保存每个会话的SSH Session引用 - * sessionId → SSH Session - */ - private final Map sshSessions = new ConcurrentHashMap<>(); - - /** - * ⚠️ 关键修复:保存每个会话的Command引用 - * sessionId → Session.Command - */ - private final Map sshCommands = new ConcurrentHashMap<>(); - @Override public RuntimeTypeEnum supportedType() { return RuntimeTypeEnum.SERVER; } @Override - public void streamLogs(WebSocketSession session, - LogStreamTarget target, - AtomicBoolean paused, - LogLineCallback callback) throws Exception { + protected SSHConnection doCreateConnection(String sessionId, LogStreamTarget target) throws Exception { + log.info("创建Server日志流连接: sessionId={}, logFile={}, host={}", + sessionId, target.getLogFilePath(), target.getHost()); - // ⚠️ 关键修复:从session attributes获取增强后的sessionId(与AbstractLogStreamWebSocketHandler保持一致) - String sessionId = (String) session.getAttributes().get("logStreamSessionId"); - if (sessionId == null) { - sessionId = session.getId(); // 降级方案 + ISSHCommandService sshService = sshCommandServiceFactory.getService(target.getOsType()); + SSHClient sshClient = sshService.createConnection( + target.getHost(), target.getPort(), target.getUsername(), + target.getPassword(), target.getPrivateKey(), target.getPassphrase() + ); + + String command = target.getLogFilePath() + " -n " + target.getLines(); + log.debug("Server日志命令: {}", command); + + Session sshSession = sshClient.startSession(); + Session.Command cmd = sshSession.exec(command); + + return new SSHConnection(sshClient, sshSession, cmd); + } + + @Override + protected void doStreamLogs(String sessionId, + LogStreamTarget target, + Supplier isPaused, + LogLineCallback callback) throws Exception { + + SSHConnection conn = getConnection(sessionId); + if (conn == null) { + log.warn("获取连接失败: sessionId={}", sessionId); + return; } - log.info("开始Server日志流: sessionId={}, logFile={}, host={}, session.isOpen={}", - sessionId, target.getLogFilePath(), target.getHost(), session.isOpen()); - - SSHClient sshClient = null; - Session sshSession = null; - Session.Command cmd = null; - - try { - // 1. 建立SSH连接 - ISSHCommandService sshService = sshCommandServiceFactory.getService(target.getOsType()); - sshClient = sshService.createConnection( - target.getHost(), - target.getPort(), - target.getUsername(), - target.getPassword(), - target.getPrivateKey(), - target.getPassphrase() - ); + try (BufferedReader reader = new BufferedReader( + new InputStreamReader(conn.command().getInputStream()))) { - // ⚠️ 关键修复:保存SSH连接引用,供stop()方法使用 - sshClients.put(sessionId, sshClient); - - // 2. 构建tail命令 - // logQueryCommand已经包含完整的tail命令,只需要在后面加上-n参数 - String command = target.getLogFilePath() + " -n " + target.getLines(); - - log.debug("执行Server日志命令: {}", command); - - // 3. 执行命令 - sshSession = sshClient.startSession(); - sshSessions.put(sessionId, sshSession); - - cmd = sshSession.exec(command); - sshCommands.put(sessionId, cmd); - - // 4. 持续读取输出流 - try (BufferedReader reader = new BufferedReader( - new InputStreamReader(cmd.getInputStream()))) { - - String line; - // ⚠️ 关键修复:增加线程中断检查,确保stop()能够中断阻塞的readLine() - while (session.isOpen() && !Thread.currentThread().isInterrupted()) { - line = reader.readLine(); - - // readLine()返回null表示流已关闭 - if (line == null) { - log.debug("SSH输出流已关闭: sessionId={}", sessionId); - break; - } - - // 检查暂停标志 - if (paused.get()) { - Thread.sleep(100); - continue; - } - - // 推送日志行(使用当前时间作为时间戳) - callback.sendLogLine(Instant.now().toString(), line); - } - - log.debug("Server日志流正常结束: sessionId={}, session.isOpen={}, interrupted={}", - sessionId, session.isOpen(), Thread.currentThread().isInterrupted()); + String line; + while (!Thread.currentThread().isInterrupted()) { + line = reader.readLine(); + if (line == null) break; + if (isPaused.get()) { Thread.sleep(100); continue; } + callback.sendLogLine(Instant.now().toString(), line); } - - } catch (java.net.SocketException e) { - // Socket关闭是正常的清理流程 - log.debug("SSH Socket已关闭: sessionId={}", sessionId); - } catch (java.io.InterruptedIOException e) { - // 线程被中断,正常的清理流程 - log.debug("Server日志流线程被中断: sessionId={}", sessionId); + } catch (java.net.SocketException | java.io.InterruptedIOException e) { + log.debug("Server日志流中断: sessionId={}", sessionId); } catch (InterruptedException e) { - // 恢复中断状态 Thread.currentThread().interrupt(); - log.debug("Server日志流线程被中断(sleep): sessionId={}", sessionId); - } catch (Exception e) { - if (session.isOpen()) { - log.error("Server日志流异常: sessionId={}", sessionId, e); - throw e; - } else { - log.debug("Server日志流异常(session已关闭): sessionId={}", sessionId); - } - } finally { - // ⚠️ 关键修复:确保资源被清理 - cleanupResources(sessionId, cmd, sshSession, sshClient); + } + + log.info("Server日志流结束: sessionId={}", sessionId); + } + + @Override + protected void doStop(String sessionId, SSHConnection conn) { + if (conn.command() != null) { + try { conn.command().getInputStream().close(); } catch (Exception ignored) {} } } @Override - public void stop(String sessionId) { - log.info("停止Server日志流: sessionId={}", sessionId); - - // ⚠️ 关键修复:主动清理SSH资源,不依赖finally块 - Session.Command cmd = sshCommands.remove(sessionId); - Session sshSession = sshSessions.remove(sessionId); - SSHClient sshClient = sshClients.remove(sessionId); - - cleanupResources(sessionId, cmd, sshSession, sshClient); - } - - /** - * ⚠️ 关键修复:统一的资源清理方法 - * 确保SSH连接、Session、Command都被正确关闭 - * - * @param sessionId 会话ID - * @param cmd SSH Command - * @param sshSession SSH Session - * @param sshClient SSH Client - */ - private void cleanupResources(String sessionId, Session.Command cmd, Session sshSession, SSHClient sshClient) { - // 1. 先关闭Command的输入输出流,强制中断阻塞的readLine() - if (cmd != null) { - try { - cmd.getInputStream().close(); - } catch (Exception e) { - log.debug("关闭Command输入流失败: sessionId={}", sessionId, e); - } - - try { - cmd.close(); - } catch (Exception e) { - log.debug("关闭Command失败: sessionId={}", sessionId, e); - } + protected void doCleanupConnection(String sessionId, SSHConnection conn) { + if (conn.command() != null) { + try { conn.command().getInputStream().close(); } catch (Exception ignored) {} + try { conn.command().close(); } catch (Exception ignored) {} } - // 2. 关闭SSH Session - if (sshSession != null) { - try { - sshSession.close(); - log.debug("SSH Session已关闭: sessionId={}", sessionId); - } catch (Exception e) { - log.debug("关闭SSH Session失败: sessionId={}", sessionId, e); - } + if (conn.session() != null) { + try { conn.session().close(); } catch (Exception ignored) {} } - // 3. 断开SSH Client - if (sshClient != null) { + if (conn.client() != null) { try { - if (sshClient.isConnected()) { - sshClient.disconnect(); - } - sshClient.close(); - log.debug("SSH Client已断开: sessionId={}", sessionId); - } catch (Exception e) { - log.debug("断开SSH Client失败: sessionId={}", sessionId, e); - } + if (conn.client().isConnected()) conn.client().disconnect(); + conn.client().close(); + } catch (Exception ignored) {} } - // 4. 从Map中移除引用 - sshCommands.remove(sessionId); - sshSessions.remove(sessionId); - sshClients.remove(sessionId); - - log.debug("Server日志流资源清理完成: sessionId={}", sessionId); + log.debug("Server连接清理完成: sessionId={}", sessionId); } } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/websocket/log/AbstractLogStreamStrategy.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/websocket/log/AbstractLogStreamStrategy.java new file mode 100644 index 00000000..a7244028 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/websocket/log/AbstractLogStreamStrategy.java @@ -0,0 +1,130 @@ +package com.qqchen.deploy.backend.framework.websocket.log; + +import lombok.extern.slf4j.Slf4j; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +/** + * 日志流策略抽象基类 + * + *

使用模板方法模式,统一管理连接的生命周期。 + * + *

职责边界: + *

    + *
  • 只负责连接管理(创建、存储、清理)
  • + *
  • 只负责日志读取逻辑
  • + *
  • 不涉及 WebSocket 协议(sessionId 由 Handler 传入)
  • + *
+ * + * @param 连接类型(子类定义的 record) + * @author Framework + * @since 2025-12-30 + */ +@Slf4j +public abstract class AbstractLogStreamStrategy implements ILogStreamStrategy { + + /** + * 连接容器:sessionId → 连接对象 + */ + private final Map connections = new ConcurrentHashMap<>(); + + // ==================== 模板方法 ==================== + + @Override + public final void streamLogs(String sessionId, + LogStreamTarget target, + Supplier isPaused, + LogLineCallback callback) throws Exception { + try { + // 1. 创建连接 + T connection = doCreateConnection(sessionId, target); + if (connection != null) { + connections.put(sessionId, connection); + } + + // 2. 执行日志流读取 + doStreamLogs(sessionId, target, isPaused, callback); + + } finally { + // 3. 清理连接 + cleanup(sessionId); + } + } + + @Override + public final void stop(String sessionId) { + log.info("停止日志流: sessionId={}, strategy={}", sessionId, getClass().getSimpleName()); + + T connection = connections.remove(sessionId); + if (connection == null) { + log.debug("连接已被清理或不存在: sessionId={}", sessionId); + return; + } + + try { + doStop(sessionId, connection); + } catch (Exception e) { + log.warn("停止逻辑异常: sessionId={}", sessionId, e); + } + + try { + doCleanupConnection(sessionId, connection); + } catch (Exception e) { + log.warn("清理连接异常: sessionId={}", sessionId, e); + } + } + + /** + * 流结束时清理(由 finally 块调用) + */ + private void cleanup(String sessionId) { + T connection = connections.remove(sessionId); + if (connection == null) { + return; + } + + log.debug("流结束清理连接: sessionId={}", sessionId); + + try { + doCleanupConnection(sessionId, connection); + } catch (Exception e) { + log.warn("清理连接异常: sessionId={}", sessionId, e); + } + } + + // ==================== 连接访问 ==================== + + /** + * 获取连接(供子类在 doStreamLogs 中使用) + */ + protected T getConnection(String sessionId) { + return connections.get(sessionId); + } + + // ==================== 抽象方法 ==================== + + /** + * 创建连接 + */ + protected abstract T doCreateConnection(String sessionId, LogStreamTarget target) throws Exception; + + /** + * 执行日志流读取 + */ + protected abstract void doStreamLogs(String sessionId, + LogStreamTarget target, + Supplier isPaused, + LogLineCallback callback) throws Exception; + + /** + * 停止逻辑(如取消正在进行的 IO 操作) + */ + protected abstract void doStop(String sessionId, T connection); + + /** + * 清理连接资源 + */ + protected abstract void doCleanupConnection(String sessionId, T connection); +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/websocket/log/AbstractLogStreamWebSocketHandler.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/websocket/log/AbstractLogStreamWebSocketHandler.java index 06525fa8..55330050 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/framework/websocket/log/AbstractLogStreamWebSocketHandler.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/websocket/log/AbstractLogStreamWebSocketHandler.java @@ -22,7 +22,6 @@ import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.TextWebSocketHandler; import java.io.IOException; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; @@ -30,12 +29,16 @@ import java.util.concurrent.atomic.AtomicBoolean; /** * 抽象日志流WebSocket处理器(Framework层) - * 提供通用的日志流WebSocket能力 * - * 子类需要实现3个方法: - * 1. getLogStreamTarget(session) - 获取日志流目标信息 - * 2. checkPermission(userId, target) - 权限验证 - * 3. streamLogs(session, target, paused) - 执行日志流推送 + *

使用模板方法模式,提供通用的日志流WebSocket能力。 + * 子类需要实现3个抽象方法来定制业务逻辑。 + * + *

设计要点: + *

    + *
  • 单例Bean,通过sessionId隔离不同用户的会话
  • + *
  • 使用ConcurrentHashMap保证线程安全
  • + *
  • 每个session独立的callback,日志不会串
  • + *
* * @author Framework * @since 2025-12-16 @@ -43,140 +46,78 @@ import java.util.concurrent.atomic.AtomicBoolean; @Slf4j public abstract class AbstractLogStreamWebSocketHandler extends TextWebSocketHandler { - // ========== 会话存储 ========== + // ==================== 会话数据结构 ==================== /** - * WebSocket会话存储:sessionId → WebSocketSession + * 日志流会话(封装单个连接的所有状态) + * + * @param webSocket WebSocket会话(用于主动关闭、统计) + * @param task 异步任务(用于取消) + * @param paused 暂停标志 + * @param strategy 日志流策略 + * @param userId 用户ID(用于SSH配额管理) */ - protected final Map webSocketSessions = new ConcurrentHashMap<>(); + public record LogStreamSession( + WebSocketSession webSocket, + Future task, + AtomicBoolean paused, + ILogStreamStrategy strategy, + Long userId + ) {} /** - * 日志流任务存储:sessionId → Future + * 会话存储:sessionId → LogStreamSession + *

一个Map管理所有会话状态,保证原子性 */ - protected final Map> streamTasks = new ConcurrentHashMap<>(); + private final Map sessions = new ConcurrentHashMap<>(); - /** - * 暂停标志存储:sessionId → AtomicBoolean - */ - protected final Map pausedFlags = new ConcurrentHashMap<>(); + // ==================== 依赖注入 ==================== - /** - * 目标信息存储:sessionId → LogStreamTarget - */ - protected final Map sessionTargets = new ConcurrentHashMap<>(); - - /** - * 策略实例存储:sessionId → ILogStreamStrategy - */ - protected final Map sessionStrategies = new ConcurrentHashMap<>(); - - /** - * SSH会话管理器(用于配额管理) - */ @Resource private SSHSessionManager sshSessionManager; - /** - * 日志流输出监听线程池(Framework自动注入) - */ @Resource(name = "logStreamOutputExecutor") private AsyncTaskExecutor logStreamOutputExecutor; - /** - * 最大SSH连接数(与SSH终端共享配额) - */ + /** 最大SSH连接数(与SSH终端共享配额) */ private static final int MAX_SSH_SESSIONS = 5; - // ========== 辅助方法 ========== + // ==================== 抽象方法(子类实现) ==================== /** - * 获取增强的SessionId(线程安全) - * - * 使用SessionIdGenerator增强原始WebSocket SessionId,确保并发场景下的唯一性 - * - * @param session WebSocket会话 - * @return 增强的SessionId - */ - protected String getSessionId(WebSocketSession session) { - // 1. 先尝试从session attributes中获取(避免重复生成) - String sessionId = (String) session.getAttributes().get("logStreamSessionId"); - - if (sessionId == null) { - // 2. 懒加载:如果没有,就生成并存储(保证一致性) - sessionId = SessionIdGenerator.enhanceWebSocketSessionId(session.getId()); - session.getAttributes().put("logStreamSessionId", sessionId); - } - - return sessionId; - } - - // ========== 子类必须实现的抽象方法 ========== - - /** - * 获取日志流目标信息(由子类实现) - * - * @param session WebSocket会话 - * @param request 日志流请求 - * @return 日志流目标信息 - * @throws Exception 获取失败时抛出 + * 获取日志流目标信息 */ protected abstract LogStreamTarget getLogStreamTarget(WebSocketSession session, LogStreamRequest request) throws Exception; /** - * 检查用户权限(由子类实现) - * - * @param userId 用户ID - * @param target 日志流目标 - * @return 是否有权限 + * 检查用户权限 */ protected abstract boolean checkPermission(Long userId, LogStreamTarget target); /** - * 获取日志流策略(由子类实现) - * - * @param target 日志流目标 - * @return 日志流策略 + * 获取日志流策略 */ protected abstract ILogStreamStrategy getLogStreamStrategy(LogStreamTarget target); - // ========== Framework 提供的核心能力 ========== + // ==================== WebSocket生命周期 ==================== @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { String sessionId = getSessionId(session); - log.info("日志流WebSocket连接建立: webSocketId={}, logStreamSessionId={}", - session.getId(), sessionId); + log.info("日志流WebSocket连接建立: sessionId={}", sessionId); - try { - // 1. 获取用户信息 - Long userId = (Long) session.getAttributes().get("userId"); - String username = (String) session.getAttributes().get("username"); - - if (userId == null) { - log.error("无法获取用户信息: sessionId={}", sessionId); - sendError(session, "认证失败"); - session.close(CloseStatus.POLICY_VIOLATION); - return; - } - - // 2. 保存会话 - webSocketSessions.put(sessionId, session); - - // 3. 不立即发送状态消息,等待客户端发送START消息 - // 参照SSH WebSocket的做法,避免在客户端未准备好时发送消息 - log.info("日志流连接成功,等待START消息: sessionId={}, userId={}, username={}", - sessionId, userId, username); - - } catch (Exception e) { - log.error("建立日志流连接失败: sessionId={}", sessionId, e); - cleanupSession(sessionId); - - try { - session.close(CloseStatus.SERVER_ERROR); - } catch (IOException ex) { - log.error("关闭WebSocket会话失败: sessionId={}", sessionId, ex); - } + Long userId = (Long) session.getAttributes().get("userId"); + if (userId == null) { + log.error("无法获取用户信息: sessionId={}", sessionId); + sendError(session, "认证失败"); + session.close(CloseStatus.POLICY_VIOLATION); + return; } + + // 预注册会话(只有基本信息,task/strategy在START时填充) + sessions.put(sessionId, new LogStreamSession(session, null, null, null, userId)); + + log.info("日志流连接成功,等待START消息: sessionId={}, userId={}", sessionId, userId); } @Override @@ -186,118 +127,113 @@ public abstract class AbstractLogStreamWebSocketHandler extends TextWebSocketHan try { LogWebSocketMessage msg = JsonUtils.fromJson(message.getPayload(), LogWebSocketMessage.class); - if (msg.getType() == LogMessageType.START) { - // 启动日志流 - handleStartMessage(session, msg); - } else if (msg.getType() == LogMessageType.CONTROL) { - // 控制消息 - handleControlMessage(session, msg); - } else { - log.warn("未知的消息类型: sessionId={}, type={}", sessionId, msg.getType()); + switch (msg.getType()) { + case START -> handleStartMessage(session, sessionId, msg); + case CONTROL -> handleControlMessage(sessionId, msg); + default -> log.warn("未知的消息类型: sessionId={}, type={}", sessionId, msg.getType()); } - } catch (Exception e) { log.error("处理WebSocket消息失败: sessionId={}", sessionId, e); sendError(session, "消息处理失败: " + e.getMessage()); } } - /** - * 处理START消息 - */ - private void handleStartMessage(WebSocketSession session, LogWebSocketMessage msg) { + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { + String sessionId = getSessionId(session); + log.info("日志流WebSocket连接关闭: sessionId={}, status={}", sessionId, status); + cleanupSession(sessionId); + } + + @Override + public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { String sessionId = getSessionId(session); + if (exception instanceof java.io.EOFException) { + log.debug("客户端关闭连接: sessionId={}", sessionId); + } else { + log.error("日志流WebSocket传输错误: sessionId={}", sessionId, exception); + sendError(session, "传输错误: " + exception.getMessage()); + } + + cleanupSession(sessionId); + try { - // 1. 提取请求参数 + session.close(CloseStatus.SERVER_ERROR); + } catch (IOException ignored) {} + } + + // ==================== 消息处理 ==================== + + private void handleStartMessage(WebSocketSession session, String sessionId, LogWebSocketMessage msg) { + try { + // 1. 验证请求 LogStreamRequest request = msg.getRequest(LogStreamRequest.class); if (request == null || !request.isValid()) { - log.warn("START消息参数无效: sessionId={}", sessionId); sendError(session, "请求参数无效"); return; } - // 2. 获取用户信息 - Long userId = (Long) session.getAttributes().get("userId"); + // 2. 获取当前会话 + LogStreamSession currentSession = sessions.get(sessionId); + if (currentSession == null) { + sendError(session, "会话不存在"); + return; + } - // 3. 获取日志流目标信息 + Long userId = currentSession.userId(); + + // 3. 获取日志流目标 LogStreamTarget target = getLogStreamTarget(session, request); if (target == null) { - log.error("无法获取日志流目标: sessionId={}", sessionId); sendError(session, "无法获取日志流目标"); return; } - // 保存target信息 - sessionTargets.put(sessionId, target); - - log.info("获取日志流目标成功: sessionId={}, runtimeType={}, name={}", + log.info("获取日志流目标: sessionId={}, runtimeType={}, name={}", sessionId, target.getRuntimeType(), target.getName()); // 4. 权限验证 if (!checkPermission(userId, target)) { - log.warn("用户无权访问日志: userId={}, target={}", userId, target.getName()); sendError(session, "无权访问此日志"); return; } - // 5. 对于需要SSH的运行时类型,检查配额 - if (target.getRuntimeType() == RuntimeTypeEnum.SERVER - || target.getRuntimeType() == RuntimeTypeEnum.DOCKER) { - - // 尝试注册会话(包含配额检查) - boolean registered = sshSessionManager.tryRegisterSession( - sessionId, userId, - SSHTargetType.LOG_STREAM, - target.getName(), MAX_SSH_SESSIONS); - - if (!registered) { - long currentCount = sshSessionManager.countUserTotalSessions(userId); - log.warn("用户SSH连接数超过限制: userId={}, current={}, max={}", - userId, currentCount, MAX_SSH_SESSIONS); - sendError(session, "SSH连接数已达上限(" + MAX_SSH_SESSIONS + "个),请关闭其他连接后重试"); - return; // 不启动日志流 + // 5. SSH配额检查(仅Server/Docker) + if (requiresSSH(target.getRuntimeType())) { + if (!tryRegisterSSHSession(sessionId, userId, target.getName())) { + sendError(session, "SSH连接数已达上限(" + MAX_SSH_SESSIONS + "个)"); + return; } - - log.debug("日志流SSH会话注册成功: sessionId={}, userId={}", sessionId, userId); } - // 6. 发送流式传输状态 - sendStatus(session, LogStatusEnum.STREAMING); - - // 6. 创建暂停标志 - AtomicBoolean paused = new AtomicBoolean(false); - pausedFlags.put(sessionId, paused); - - // 7. 获取日志流策略 + // 6. 获取策略 ILogStreamStrategy strategy = getLogStreamStrategy(target); if (strategy == null) { - log.error("无法获取日志流策略: sessionId={}, runtimeType={}", - sessionId, target.getRuntimeType()); sendError(session, "不支持的运行时类型"); return; } - // 保存策略实例,用于后续清理 - sessionStrategies.put(sessionId, strategy); + // 7. 创建暂停标志 + AtomicBoolean paused = new AtomicBoolean(false); - // 8. 启动日志流任务(异步,使用Spring管理的虚拟线程池) + // 8. 启动日志流任务 Future task = logStreamOutputExecutor.submit(() -> { try { - // 使用策略执行日志流 - strategy.streamLogs(session, target, paused, + // Strategy 只接收 sessionId,不接触 WebSocket + strategy.streamLogs(sessionId, target, paused::get, (timestamp, content) -> sendLogLine(session, timestamp, content)); } catch (Exception e) { log.error("日志流异常: sessionId={}", sessionId, e); - try { - sendError(session, "日志流中断: " + e.getMessage()); - } catch (Exception ex) { - log.error("发送错误消息失败: sessionId={}", sessionId, ex); - } + sendError(session, "日志流中断: " + e.getMessage()); } }); - streamTasks.put(sessionId, task); + // 9. 更新会话(原子替换) + sessions.put(sessionId, new LogStreamSession(session, task, paused, strategy, userId)); + + // 10. 发送状态 + sendStatus(session, LogStatusEnum.STREAMING); log.info("日志流已启动: sessionId={}", sessionId); } catch (Exception e) { @@ -306,228 +242,173 @@ public abstract class AbstractLogStreamWebSocketHandler extends TextWebSocketHan } } - /** - * 处理CONTROL消息 - */ - private void handleControlMessage(WebSocketSession session, LogWebSocketMessage msg) { - String sessionId = getSessionId(session); + private void handleControlMessage(String sessionId, LogWebSocketMessage msg) { + LogStreamSession session = sessions.get(sessionId); + if (session == null || session.paused() == null) { + log.warn("日志流未启动,无法控制: sessionId={}", sessionId); + return; + } try { LogControlRequest request = msg.getRequest(LogControlRequest.class); if (request == null || !request.isValid()) { - log.warn("CONTROL消息参数无效: sessionId={}", sessionId); - return; - } - - AtomicBoolean paused = pausedFlags.get(sessionId); - if (paused == null) { - log.warn("日志流未启动,无法控制: sessionId={}", sessionId); return; } LogControlAction action = request.getAction(); - if (action == LogControlAction.PAUSE) { - paused.set(true); - sendStatus(session, LogStatusEnum.PAUSED); - log.info("日志流已暂停: sessionId={}", sessionId); - } else if (action == LogControlAction.RESUME) { - paused.set(false); - sendStatus(session, LogStatusEnum.STREAMING); - log.info("日志流已恢复: sessionId={}", sessionId); - } else if (action == LogControlAction.STOP) { - log.info("收到停止请求: sessionId={}", sessionId); - session.close(CloseStatus.NORMAL); + switch (action) { + case PAUSE -> { + session.paused().set(true); + sendStatus(session.webSocket(), LogStatusEnum.PAUSED); + log.info("日志流已暂停: sessionId={}", sessionId); + } + case RESUME -> { + session.paused().set(false); + sendStatus(session.webSocket(), LogStatusEnum.STREAMING); + log.info("日志流已恢复: sessionId={}", sessionId); + } + case STOP -> { + log.info("收到停止请求: sessionId={}", sessionId); + session.webSocket().close(CloseStatus.NORMAL); + } } - } catch (Exception e) { log.error("处理CONTROL消息失败: sessionId={}", sessionId, e); } } - @Override - public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { - String sessionId = getSessionId(session); - log.info("日志流WebSocket连接关闭: logStreamSessionId={}, status={}", sessionId, status); - cleanupSession(sessionId); - } + // ==================== 资源清理 ==================== - @Override - public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { - String sessionId = getSessionId(session); - - // EOFException通常表示客户端正常关闭连接,不需要记录ERROR日志 - if (exception instanceof java.io.EOFException) { - log.debug("客户端关闭连接: sessionId={}", sessionId); - cleanupSession(sessionId); + private void cleanupSession(String sessionId) { + LogStreamSession session = sessions.remove(sessionId); + if (session == null) { return; } - log.error("日志流WebSocket传输错误: sessionId={}", sessionId, exception); + log.info("清理日志流会话: sessionId={}", sessionId); - try { - sendError(session, "传输错误: " + exception.getMessage()); - } catch (Exception e) { - // 忽略发送错误消息时的异常 - log.debug("发送错误消息失败: sessionId={}", sessionId); - } - - cleanupSession(sessionId); - - try { - session.close(CloseStatus.SERVER_ERROR); - } catch (IOException e) { - log.debug("关闭WebSocket会话失败: sessionId={}", sessionId); - } - } - - /** - * 清理会话资源 - */ - private void cleanupSession(String sessionId) { - log.info("开始清理日志流会话资源: sessionId={}", sessionId); - - try { - // 1. 调用Strategy的stop方法清理资源(SSH连接等) - ILogStreamStrategy strategy = sessionStrategies.remove(sessionId); - if (strategy != null) { - try { - log.debug("调用Strategy.stop清理资源: sessionId={}", sessionId); - strategy.stop(sessionId); - } catch (Exception e) { - log.error("Strategy清理资源失败: sessionId={}", sessionId, e); - } - } - - // 2. 从SSH会话管理器移除(释放配额) + // 1. 停止策略(清理SSH/K8S连接) + if (session.strategy() != null) { try { - sshSessionManager.removeSession(sessionId); - log.debug("SSH会话已从配额管理器移除: sessionId={}", sessionId); + session.strategy().stop(sessionId); } catch (Exception e) { - log.error("从SSH会话管理器移除失败: sessionId={}", sessionId, e); + log.error("Strategy清理失败: sessionId={}", sessionId, e); } - - // 3. 取消日志流任务 - Future task = streamTasks.remove(sessionId); - if (task != null && !task.isDone()) { - log.debug("取消日志流任务: sessionId={}", sessionId); - task.cancel(true); - } - - // 4. 移除WebSocketSession - webSocketSessions.remove(sessionId); - - // 5. 移除暂停标志 - pausedFlags.remove(sessionId); - - // 6. 移除target信息 - sessionTargets.remove(sessionId); - - log.info("日志流会话资源清理完成: sessionId={}", sessionId); - - } catch (Exception e) { - log.error("清理会话资源失败: sessionId={}", sessionId, e); } + + // 2. 释放SSH配额 + sshSessionManager.removeSession(sessionId); + + // 3. 取消异步任务 + if (session.task() != null && !session.task().isDone()) { + session.task().cancel(true); + } + + log.info("日志流会话清理完成: sessionId={}", sessionId); } - // ========== 辅助方法(供子类使用) ========== + // ==================== 辅助方法 ==================== - /** - * 发送日志行到前端 - * - * @param session WebSocket会话 - * @param timestamp 时间戳 - * @param content 日志内容 - */ - protected void sendLogLine(WebSocketSession session, String timestamp, String content) { - String sessionId = getSessionId(session); + private boolean requiresSSH(RuntimeTypeEnum runtimeType) { + return runtimeType == RuntimeTypeEnum.SERVER || runtimeType == RuntimeTypeEnum.DOCKER; + } + + private boolean tryRegisterSSHSession(String sessionId, Long userId, String targetName) { + boolean registered = sshSessionManager.tryRegisterSession( + sessionId, userId, SSHTargetType.LOG_STREAM, targetName, MAX_SSH_SESSIONS); + + if (!registered) { + long currentCount = sshSessionManager.countUserTotalSessions(userId); + log.warn("SSH连接数超限: userId={}, current={}, max={}", userId, currentCount, MAX_SSH_SESSIONS); + } + + return registered; + } + + protected String getSessionId(WebSocketSession session) { + String sessionId = (String) session.getAttributes().get("logStreamSessionId"); + if (sessionId == null) { + sessionId = SessionIdGenerator.enhanceWebSocketSessionId(session.getId()); + session.getAttributes().put("logStreamSessionId", sessionId); + } + return sessionId; + } + + // ==================== 消息发送 ==================== + + private void sendLogLine(WebSocketSession session, String timestamp, String content) { + if (session == null || !session.isOpen()) { + return; + } try { - if (session == null) { - log.warn("发送日志行失败: session为null, sessionId={}", sessionId); - return; - } + LogWebSocketMessage msg = LogWebSocketMessage.of( + LogMessageType.LOG, + new LogLineResponse(timestamp, content) + ); - if (!session.isOpen()) { - log.warn("发送日志行失败: session已关闭, sessionId={}", sessionId); - return; - } - - LogLineResponse response = new LogLineResponse(timestamp, content); - - Map data = new HashMap<>(); - data.put("response", response); - - LogWebSocketMessage msg = new LogWebSocketMessage(LogMessageType.LOG, data); - - // WebSocketSession.sendMessage()不是线程安全的,必须加锁 synchronized (session) { session.sendMessage(new TextMessage(JsonUtils.toJson(msg))); } - - log.trace("日志行已发送: sessionId={}, content={}", sessionId, content.substring(0, Math.min(50, content.length()))); - } catch (IOException e) { - log.error("发送日志行失败(IOException): sessionId={}, error={}", sessionId, e.getMessage(), e); - } catch (Exception e) { - log.error("发送日志行失败(Exception): sessionId={}, error={}", sessionId, e.getMessage(), e); + log.debug("发送日志行失败: sessionId={}", getSessionId(session)); } } - /** - * 发送状态消息到前端 - */ protected void sendStatus(WebSocketSession session, LogStatusEnum status) { + if (session == null || !session.isOpen()) { + return; + } + try { - if (session == null || !session.isOpen()) { - log.debug("会话未打开,跳过发送状态消息: status={}", status); - return; - } + LogWebSocketMessage msg = LogWebSocketMessage.of( + LogMessageType.STATUS, + new LogStatusResponse(status) + ); - LogStatusResponse response = new LogStatusResponse(status); - - Map data = new HashMap<>(); - data.put("response", response); - - LogWebSocketMessage msg = new LogWebSocketMessage(LogMessageType.STATUS, data); - - // WebSocketSession.sendMessage()不是线程安全的,必须加锁 synchronized (session) { session.sendMessage(new TextMessage(JsonUtils.toJson(msg))); } - } catch (IOException e) { - // 降低日志级别,客户端断开是正常情况 - log.debug("发送状态消息失败(客户端可能已断开): logStreamSessionId={}, status={}", - getSessionId(session), status); + log.debug("发送状态消息失败: sessionId={}", getSessionId(session)); } } - /** - * 发送错误消息到前端 - */ protected void sendError(WebSocketSession session, String error) { + if (session == null || !session.isOpen()) { + return; + } + try { - if (!session.isOpen()) { - return; - } + LogWebSocketMessage msg = LogWebSocketMessage.of( + LogMessageType.ERROR, + new LogErrorResponse(error) + ); - LogErrorResponse response = new LogErrorResponse(error); - - Map data = new HashMap<>(); - data.put("response", response); - - LogWebSocketMessage msg = new LogWebSocketMessage(LogMessageType.ERROR, data); - - // WebSocketSession.sendMessage()不是线程安全的,必须加锁 synchronized (session) { session.sendMessage(new TextMessage(JsonUtils.toJson(msg))); } - } catch (IOException e) { - if (session.isOpen()) { - log.error("发送错误消息失败: logStreamSessionId={}", getSessionId(session), e); - } + log.debug("发送错误消息失败: sessionId={}", getSessionId(session)); } } + + // ==================== 监控方法(供子类或管理接口使用) ==================== + + /** + * 获取当前活跃会话数 + */ + protected int getActiveSessionCount() { + return sessions.size(); + } + + /** + * 获取指定用户的会话数 + */ + protected long getUserSessionCount(Long userId) { + return sessions.values().stream() + .filter(s -> userId.equals(s.userId())) + .count(); + } } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/websocket/log/ILogStreamStrategy.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/websocket/log/ILogStreamStrategy.java index 0ffd6598..a9d910c1 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/framework/websocket/log/ILogStreamStrategy.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/websocket/log/ILogStreamStrategy.java @@ -1,13 +1,14 @@ package com.qqchen.deploy.backend.framework.websocket.log; import com.qqchen.deploy.backend.deploy.enums.RuntimeTypeEnum; -import org.springframework.web.socket.WebSocketSession; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; /** * 日志流策略接口 - * 定义不同运行时类型的日志流获取方式 + * + *

定义不同运行时类型的日志流获取方式。 + * Strategy 只负责连接管理和日志读取,不涉及 WebSocket 协议。 * * @author Framework * @since 2025-12-16 @@ -16,31 +17,27 @@ public interface ILogStreamStrategy { /** * 支持的运行时类型 - * - * @return 运行时类型 */ RuntimeTypeEnum supportedType(); /** * 执行日志流推送 - * 此方法应该持续读取日志并通过callback推送到前端 * - * @param session WebSocket会话 + * @param sessionId 会话ID(由 Handler 生成) * @param target 日志流目标信息 - * @param paused 暂停标志(实现应定期检查此标志) - * @param callback 日志行回调接口 + * @param isPaused 暂停状态查询(由 Handler 控制) + * @param callback 日志行回调 * @throws Exception 流式推送失败时抛出 */ - void streamLogs(WebSocketSession session, - LogStreamTarget target, - AtomicBoolean paused, - LogLineCallback callback) throws Exception; + void streamLogs(String sessionId, + LogStreamTarget target, + Supplier isPaused, + LogLineCallback callback) throws Exception; /** * 停止日志流并清理资源 - * 当WebSocket连接关闭时调用,确保SSH连接等资源被正确释放 * - * @param sessionId WebSocket会话ID + * @param sessionId 会话ID */ void stop(String sessionId); @@ -49,12 +46,6 @@ public interface ILogStreamStrategy { */ @FunctionalInterface interface LogLineCallback { - /** - * 发送日志行 - * - * @param timestamp 时间戳 - * @param content 日志内容 - */ void sendLogLine(String timestamp, String content); } } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/websocket/log/LogWebSocketMessage.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/websocket/log/LogWebSocketMessage.java index 9ee720d0..900e0df8 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/framework/websocket/log/LogWebSocketMessage.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/websocket/log/LogWebSocketMessage.java @@ -34,6 +34,17 @@ public class LogWebSocketMessage { */ private Map data; + /** + * 创建响应消息的工厂方法 + * + * @param type 消息类型 + * @param response 响应对象 + * @return LogWebSocketMessage + */ + public static LogWebSocketMessage of(LogMessageType type, Object response) { + return new LogWebSocketMessage(type, Map.of("response", response)); + } + /** * 从data中提取request对象(强类型) *