This commit is contained in:
dengqichen 2025-12-30 16:27:52 +08:00
parent 05802210b2
commit 3d6ca74fed

View File

@ -21,12 +21,17 @@ import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
/**
* K8S日志流策略
*
* <p> 重要OkHttp Response必须显式关闭否则会导致连接泄漏
* <p>当用户关闭WebSocket时需要同时关闭Response而不仅仅是cancel Call
*
* @author qqchen
* @since 2025-12-16
*/
@ -36,6 +41,12 @@ public class K8sLogStreamStrategy extends AbstractLogStreamStrategy<K8sLogStream
public record K8sConnection(ApiClient apiClient, Call call) {}
/**
* Response缓存sessionId Response
* <p>用于在stop/cleanup时显式关闭Response避免OkHttp连接泄漏
*/
private final Map<String, Response> responseCache = new ConcurrentHashMap<>();
@Resource
private IK8sServiceIntegration k8sServiceIntegration;
@ -106,7 +117,13 @@ public class K8sLogStreamStrategy extends AbstractLogStreamStrategy<K8sLogStream
callback.sendLogLine(Instant.now().toString(),
"[系统] 正在连接K8S日志流请稍候...");
try (Response response = conn.call().execute()) {
Response response = null;
try {
response = conn.call().execute();
// 关键缓存Response引用用于stop/cleanup时显式关闭
responseCache.put(sessionId, response);
log.info("K8S日志API响应: sessionId={}, pod={}, code={}, protocol={}, message={}",
sessionId, target.getName(), response.code(), response.protocol(), response.message());
@ -182,6 +199,9 @@ public class K8sLogStreamStrategy extends AbstractLogStreamStrategy<K8sLogStream
log.error("K8S日志流异常: sessionId={}, error={}", sessionId, e.getMessage(), e);
callback.sendLogLine(Instant.now().toString(),
"[错误] K8S日志流异常: " + e.getMessage());
} finally {
// 关键确保Response被关闭避免OkHttp连接泄漏
closeResponse(sessionId, response);
}
log.info("K8S日志流结束: sessionId={}, lineCount={}", sessionId, lineCount.get());
@ -189,6 +209,10 @@ public class K8sLogStreamStrategy extends AbstractLogStreamStrategy<K8sLogStream
@Override
protected void doStop(String sessionId, K8sConnection conn) {
// 1. 先关闭Response释放HTTP连接
closeResponse(sessionId, null);
// 2. 再取消Call
if (conn.call() != null && !conn.call().isCanceled()) {
conn.call().cancel();
log.debug("K8S Call已取消: sessionId={}", sessionId);
@ -197,11 +221,35 @@ public class K8sLogStreamStrategy extends AbstractLogStreamStrategy<K8sLogStream
@Override
protected void doCleanupConnection(String sessionId, K8sConnection conn) {
// 只取消Call不关闭ApiClientApiClient是按集群共享的
// 1. 确保Response被关闭
closeResponse(sessionId, null);
// 2. 取消Call如果还没取消
if (conn.call() != null && !conn.call().isCanceled()) {
try { conn.call().cancel(); } catch (Exception ignored) {}
}
log.debug("K8S连接清理完成: sessionId={}", sessionId);
}
/**
* 关闭Response避免OkHttp连接泄漏
*
* @param sessionId 会话ID
* @param response 要关闭的Response如果为null则从缓存中获取
*/
private void closeResponse(String sessionId, Response response) {
// 从缓存中移除并获取Response
Response cachedResponse = responseCache.remove(sessionId);
Response toClose = response != null ? response : cachedResponse;
if (toClose != null) {
try {
toClose.close();
log.debug("K8S Response已关闭: sessionId={}", sessionId);
} catch (Exception e) {
log.debug("关闭K8S Response异常可忽略: sessionId={}, error={}", sessionId, e.getMessage());
}
}
}
}