This commit is contained in:
dengqichen 2025-12-30 15:09:49 +08:00
parent 1aa84e9ffe
commit 9567a43418
10 changed files with 262 additions and 4163 deletions

View File

@ -180,21 +180,15 @@ public class ThreadPoolConfig {
* 日志流输出监听线程池 - 使用虚拟线程Java 21+
*
* 为什么使用虚拟线程
* 1. 日志流监听是**I/O密集型阻塞任务**BufferedReader.readLine()阻塞等待
* 2. 虚拟线程在I/O阻塞时会自动让出载体线程不占用OS线程资源
* 3. 支持数百个并发日志流连接无需担心线程池耗尽
* 4. 每个日志流虽然长时间运行但大部分时间在等待I/O阻塞状态
* 1. 日志流是典型的**网络I/O密集型**任务
* 2. 等待K8S/Docker/SSH日志输出时线程会长时间阻塞
* 3. 虚拟线程在阻塞时不占用OS线程资源消耗极低
* 4. 支持数百个并发日志流无需担心线程池耗尽
*
* 💡 场景
* - 监听SSH exec()命令的输出流tail -fdocker logs -f
* - 监听K8S Pod日志流
* - 持续读取日志行并推送到WebSocket客户端
* - 每个日志流连接占用一个虚拟线程直到连接关闭
*
* 🎯 虚拟线程优势
* - 轻量级每个虚拟线程只占用几KB内存
* - 高并发支持数千个并发日志流连接
* - 自动调度I/O阻塞时自动释放载体线程
* - 无需配置线程池大小无限制并发
*/
@Bean("logStreamOutputExecutor")
public SimpleAsyncTaskExecutor logStreamOutputExecutor() {

View File

@ -1,9 +1,10 @@
package com.qqchen.deploy.backend.deploy.handler;
import com.qqchen.deploy.backend.deploy.entity.K8sNamespace;
import com.qqchen.deploy.backend.deploy.entity.K8sDeployment;
import com.qqchen.deploy.backend.deploy.entity.Server;
import com.qqchen.deploy.backend.deploy.entity.TeamApplication;
import com.qqchen.deploy.backend.deploy.enums.RuntimeTypeEnum;
import com.qqchen.deploy.backend.deploy.repository.IK8sDeploymentRepository;
import com.qqchen.deploy.backend.deploy.repository.IK8sNamespaceRepository;
import com.qqchen.deploy.backend.deploy.repository.IServerRepository;
import com.qqchen.deploy.backend.deploy.repository.ITeamApplicationRepository;
@ -12,6 +13,7 @@ import com.qqchen.deploy.backend.deploy.strategy.log.K8sLogStreamStrategy;
import com.qqchen.deploy.backend.deploy.strategy.log.ServerLogStreamStrategy;
import com.qqchen.deploy.backend.framework.enums.ResponseCode;
import com.qqchen.deploy.backend.framework.exception.BusinessException;
import com.qqchen.deploy.backend.framework.utils.K8sYamlUtils;
import com.qqchen.deploy.backend.framework.websocket.log.AbstractLogStreamWebSocketHandler;
import com.qqchen.deploy.backend.framework.websocket.log.ILogStreamStrategy;
import com.qqchen.deploy.backend.framework.websocket.log.LogStreamTarget;
@ -41,6 +43,9 @@ public class TeamApplicationLogStreamWebSocketHandler extends AbstractLogStreamW
@Resource
private IK8sNamespaceRepository k8sNamespaceRepository;
@Resource
private IK8sDeploymentRepository k8sDeploymentRepository;
@Resource
private K8sLogStreamStrategy k8sLogStreamStrategy;
@ -132,6 +137,45 @@ public class TeamApplicationLogStreamWebSocketHandler extends AbstractLogStreamW
// 直接使用TeamApplication中的namespace名称
target.setK8sSystemId(teamApp.getK8sSystemId());
target.setK8sNamespace(teamApp.getK8sNamespaceName());
// 从K8sDeployment的YAML配置中解析容器名
String containerName = getContainerNameFromDeployment(teamApp);
if (containerName != null) {
target.setK8sContainer(containerName);
log.debug("从Deployment YAML解析到容器名: {}", containerName);
}
}
/**
* 从K8sDeployment获取第一个容器名
*/
private String getContainerNameFromDeployment(TeamApplication teamApp) {
if (teamApp.getK8sDeploymentName() == null || teamApp.getK8sDeploymentName().isBlank()) {
log.warn("TeamApplication未配置K8sDeploymentName无法获取容器名");
return null;
}
// 查询K8sDeployment
K8sDeployment deployment = k8sDeploymentRepository
.findByExternalSystemIdAndNamespaceNameAndDeploymentName(
teamApp.getK8sSystemId(),
teamApp.getK8sNamespaceName(),
teamApp.getK8sDeploymentName())
.orElse(null);
if (deployment == null) {
log.warn("未找到K8sDeployment: systemId={}, namespace={}, deployment={}",
teamApp.getK8sSystemId(), teamApp.getK8sNamespaceName(), teamApp.getK8sDeploymentName());
return null;
}
// 使用工具类解析YAML获取容器名
String containerName = K8sYamlUtils.extractFirstContainerName(deployment.getYamlConfig());
if (containerName != null) {
log.info("从Deployment YAML解析容器名: deployment={}, containerName={}",
deployment.getDeploymentName(), containerName);
}
return containerName;
}
/**

View File

@ -253,6 +253,7 @@ public interface IK8sServiceIntegration extends IExternalSystemIntegration {
/**
* 获取用于日志流的K8S ApiClient长超时
* 日志流是长连接需要更长的读取超时时间
* 同一K8S集群的所有日志流共享一个ApiClient
*
* @param system K8S系统配置
* @return ApiClient实例readTimeout=30分钟

View File

@ -44,6 +44,9 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp
// K8S ApiClient缓存 - 线程安全
private static final Map<Long, K8sApiClientCache> API_CLIENT_CACHE = new ConcurrentHashMap<>();
// 日志流专用ApiClient缓存长超时按k8sSystemId复用
private static final Map<Long, ApiClient> LOG_STREAM_API_CLIENT_CACHE = new ConcurrentHashMap<>();
private static final long CACHE_EXPIRE_TIME = 30 * 60 * 1000; // 30分钟过期
/**
@ -1199,15 +1202,26 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp
}
/**
* 获取用于日志流的K8S ApiClient长超时
* 日志流是长连接需要更长的读取超时时间
* 注意此方法每次调用都会创建新的ApiClient不使用缓存
* 获取用于日志流的K8S ApiClient长超时按集群缓存复用
*
* <p>日志流是长连接需要更长的读取超时时间
* 同一K8S集群的所有日志流共享一个ApiClient避免重复创建
*
* @param system K8S系统配置
* @return ApiClient实例readTimeout=30分钟
*/
@Override
public ApiClient getApiClientForLogStream(ExternalSystem system) {
return LOG_STREAM_API_CLIENT_CACHE.computeIfAbsent(system.getId(), id -> {
log.info("创建日志流专用ApiClient: k8sSystemId={}, clusterName={}", id, system.getName());
return createLogStreamApiClient(system);
});
}
/**
* 创建日志流专用ApiClient内部方法
*/
private ApiClient createLogStreamApiClient(ExternalSystem system) {
try {
String config = system.getConfig();
if (config == null || config.trim().isEmpty()) {
@ -1218,7 +1232,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp
client.setConnectTimeout(15000); // 15秒连接超时
client.setReadTimeout(30 * 60 * 1000); // 30分钟读取超时日志流长连接
log.debug("创建日志流专用ApiClientreadTimeout=30分钟");
log.debug("日志流ApiClient创建完成readTimeout=30分钟");
return client;
} catch (BusinessException e) {
throw e;

View File

@ -15,6 +15,21 @@ public interface IK8sDeploymentRepository extends IBaseRepository<K8sDeployment,
Optional<K8sDeployment> findByNamespaceIdAndDeploymentName(Long namespaceId, String deploymentName);
/**
* 通过外部系统ID命名空间名称Deployment名称查询
* 用于日志流查询时获取Deployment的YAML配置以解析容器名
*/
@Query(value = "SELECT d.* FROM deploy_k8s_deployment d " +
"INNER JOIN deploy_k8s_namespace n ON d.namespace_id = n.id " +
"WHERE d.external_system_id = :externalSystemId " +
"AND n.namespace_name = :namespaceName " +
"AND d.deployment_name = :deploymentName " +
"AND d.deleted = 0", nativeQuery = true)
Optional<K8sDeployment> findByExternalSystemIdAndNamespaceNameAndDeploymentName(
@Param("externalSystemId") Long externalSystemId,
@Param("namespaceName") String namespaceName,
@Param("deploymentName") String deploymentName);
List<K8sDeployment> findByExternalSystemId(Long externalSystemId);
List<K8sDeployment> findByNamespaceId(Long namespaceId);

View File

@ -58,12 +58,29 @@ public class K8sLogStreamStrategy extends AbstractLogStreamStrategy<K8sLogStream
ApiClient apiClient = k8sServiceIntegration.getApiClientForLogStream(k8sSystem);
CoreV1Api api = new CoreV1Api(apiClient);
// 先获取历史日志tailLines再follow新日志
// sinceSeconds=null 表示不限制时间范围依赖tailLines获取历史
// container: 多容器Pod必须指定否则K8S API可能阻塞
String container = target.getK8sContainer();
Call call = api.readNamespacedPodLogCall(
target.getName(), target.getK8sNamespace(),
null, true, null, null, "false", false, null,
target.getLines(), true, null
target.getName(), // podName
target.getK8sNamespace(), // namespace
container, // container (多容器Pod必须指定)
true, // follow
null, // insecureSkipTLSVerifyBackend
null, // limitBytes
"false", // pretty
false, // previous (不查询上一个容器)
null, // sinceSeconds (null=不限制依赖tailLines)
target.getLines(), // tailLines (历史行数)
true, // timestamps
null // callback
);
log.info("K8S日志流Call创建完成: sessionId={}, pod={}, container={}, tailLines={}",
sessionId, target.getName(), container, target.getLines());
return new K8sConnection(apiClient, call);
}
@ -81,11 +98,28 @@ public class K8sLogStreamStrategy extends AbstractLogStreamStrategy<K8sLogStream
AtomicLong lineCount = new AtomicLong(0);
// 调试检查Call状态
log.info("执行K8S日志API调用: sessionId={}, pod={}, callExecuted={}, callCanceled={}",
sessionId, target.getName(), conn.call().isExecuted(), conn.call().isCanceled());
// 先发送连接提示因为K8S API可能需要较长时间响应
callback.sendLogLine(Instant.now().toString(),
"[系统] 正在连接K8S日志流请稍候...");
try (Response response = conn.call().execute()) {
log.info("K8S日志API响应: sessionId={}, pod={}, code={}, protocol={}, message={}",
sessionId, target.getName(), response.code(), response.protocol(), response.message());
// 连接成功提示
callback.sendLogLine(Instant.now().toString(),
"[系统] K8S日志流连接成功开始接收日志...");
if (!response.isSuccessful()) {
String errorBody = response.body() != null ? response.body().string() : "无响应体";
log.error("K8S日志API错误: sessionId={}, code={}, body={}",
sessionId, response.code(), errorBody);
callback.sendLogLine(Instant.now().toString(),
"[错误] K8S API返回错误: " + response.code() + " - " + errorBody);
throw new BusinessException(ResponseCode.K8S_OPERATION_FAILED);
}
@ -95,13 +129,38 @@ public class K8sLogStreamStrategy extends AbstractLogStreamStrategy<K8sLogStream
return;
}
// 调试检查响应体信息
log.info("K8S日志响应体: sessionId={}, contentType={}, contentLength={}",
sessionId, body.contentType(), body.contentLength());
InputStream inputStream = body.byteStream();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
String line;
log.info("开始读取K8S日志流: sessionId={}", sessionId);
// 首次读取超时检测如果5秒内没有数据发送等待提示
long startTime = System.currentTimeMillis();
boolean sentWaitingHint = false;
while (!Thread.currentThread().isInterrupted()) {
// 检查是否有数据可读非阻塞检查
if (!sentWaitingHint && lineCount.get() == 0) {
long elapsed = System.currentTimeMillis() - startTime;
if (elapsed > 5000) {
// 5秒没有数据发送等待提示
callback.sendLogLine(Instant.now().toString(),
"[系统提示] 正在等待Pod输出新日志如果Pod没有日志输出则会一直等待...");
sentWaitingHint = true;
log.info("K8S日志流等待中已发送提示: sessionId={}", sessionId);
}
}
line = reader.readLine();
if (line == null) break;
if (line == null) {
log.info("K8S日志流EOF: sessionId={}, lineCount={}", sessionId, lineCount.get());
break;
}
if (isPaused.get()) { Thread.sleep(100); continue; }
String[] parts = line.split(" ", 2);
@ -109,13 +168,20 @@ public class K8sLogStreamStrategy extends AbstractLogStreamStrategy<K8sLogStream
String content = parts.length > 1 ? parts[1] : line;
callback.sendLogLine(timestamp, content);
lineCount.incrementAndGet();
long count = lineCount.incrementAndGet();
if (count == 1) {
log.info("K8S日志流首行数据: sessionId={}", sessionId);
}
}
}
} catch (java.net.SocketException | java.io.InterruptedIOException e) {
log.debug("K8S日志流中断: sessionId={}, lineCount={}", sessionId, lineCount.get());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error("K8S日志流异常: sessionId={}, error={}", sessionId, e.getMessage(), e);
callback.sendLogLine(Instant.now().toString(),
"[错误] K8S日志流异常: " + e.getMessage());
}
log.info("K8S日志流结束: sessionId={}, lineCount={}", sessionId, lineCount.get());
@ -131,23 +197,11 @@ public class K8sLogStreamStrategy extends AbstractLogStreamStrategy<K8sLogStream
@Override
protected void doCleanupConnection(String sessionId, K8sConnection conn) {
// 只取消Call不关闭ApiClientApiClient是按集群共享的
if (conn.call() != null && !conn.call().isCanceled()) {
try { conn.call().cancel(); } catch (Exception ignored) {}
}
if (conn.apiClient() != null) {
try {
okhttp3.OkHttpClient httpClient = conn.apiClient().getHttpClient();
if (httpClient.connectionPool() != null) {
httpClient.connectionPool().evictAll();
}
if (httpClient.dispatcher() != null &&
httpClient.dispatcher().executorService() != null) {
httpClient.dispatcher().executorService().shutdown();
}
} catch (Exception ignored) {}
}
log.debug("K8S连接清理完成: sessionId={}", sessionId);
}
}

View File

@ -0,0 +1,93 @@
package com.qqchen.deploy.backend.framework.utils;
import lombok.extern.slf4j.Slf4j;
import org.yaml.snakeyaml.Yaml;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* K8S YAML解析工具类
*
* @author qqchen
* @since 2025-12-30
*/
@Slf4j
public class K8sYamlUtils {
private K8sYamlUtils() {
// 工具类禁止实例化
}
/**
* 从Deployment YAML中提取第一个容器名
* YAML路径: spec.template.spec.containers[0].name
*
* @param yamlContent Deployment的YAML配置内容
* @return 第一个容器名解析失败返回null
*/
public static String extractFirstContainerName(String yamlContent) {
List<Map<String, Object>> containers = parseContainers(yamlContent);
if (containers == null || containers.isEmpty()) {
return null;
}
return (String) containers.get(0).get("name");
}
/**
* 从Deployment YAML中提取所有容器名列表
* YAML路径: spec.template.spec.containers[*].name
*
* @param yamlContent Deployment的YAML配置内容
* @return 容器名列表解析失败返回空列表
*/
public static List<String> extractAllContainerNames(String yamlContent) {
List<String> result = new ArrayList<>();
List<Map<String, Object>> containers = parseContainers(yamlContent);
if (containers == null) {
return result;
}
for (Map<String, Object> container : containers) {
String name = (String) container.get("name");
if (name != null) {
result.add(name);
}
}
return result;
}
/**
* 解析YAML获取containers列表
* YAML路径: spec.template.spec.containers
*
* @param yamlContent Deployment的YAML配置内容
* @return containers列表解析失败返回null
*/
@SuppressWarnings("unchecked")
private static List<Map<String, Object>> parseContainers(String yamlContent) {
if (yamlContent == null || yamlContent.isBlank()) {
return null;
}
try {
Yaml yaml = new Yaml();
Map<String, Object> deploymentMap = yaml.load(yamlContent);
Map<String, Object> spec = (Map<String, Object>) deploymentMap.get("spec");
if (spec == null) return null;
Map<String, Object> template = (Map<String, Object>) spec.get("template");
if (template == null) return null;
Map<String, Object> templateSpec = (Map<String, Object>) template.get("spec");
if (templateSpec == null) return null;
return (List<Map<String, Object>>) templateSpec.get("containers");
} catch (Exception e) {
log.error("解析Deployment YAML获取containers失败: {}", e.getMessage());
return null;
}
}
}

View File

@ -48,6 +48,11 @@ public class LogStreamTarget {
*/
private String k8sNamespace;
/**
* K8S容器名称多容器Pod必须指定
*/
private String k8sContainer;
// ========== Docker/Server相关字段需要SSH ==========

View File

@ -12,9 +12,14 @@ INSERT INTO system_release (
)
VALUES (
'system', NOW(), 'system', NOW(), 1, 0,
1.45, 'ALL', NOW(),
1.47, 'ALL', NOW(),
'【后端】
- enabledCount disabledCount totalElements data
',
1K8S日志流多并发连接只有部分能返回数据的问题
2K8S日志流ApiClient缓存机制
3线
1Monaco滚动条残影问题
2
3/',
0, NULL, NULL, 0
);

File diff suppressed because it is too large Load Diff