This commit is contained in:
dengqichen 2025-12-30 11:01:22 +08:00
parent 16a7b9513d
commit a798dee401
9 changed files with 692 additions and 828 deletions

View File

@ -250,6 +250,15 @@ public interface IK8sServiceIntegration extends IExternalSystemIntegration {
*/
io.kubernetes.client.openapi.ApiClient getApiClient(ExternalSystem system);
/**
* 获取用于日志流的K8S ApiClient长超时
* 日志流是长连接需要更长的读取超时时间
*
* @param system K8S系统配置
* @return ApiClient实例readTimeout=30分钟
*/
io.kubernetes.client.openapi.ApiClient getApiClientForLogStream(ExternalSystem system);
/**
* 获取系统类型
*

View File

@ -1198,6 +1198,36 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp
return cache.apiClient;
}
/**
* 获取用于日志流的K8S ApiClient长超时
* 日志流是长连接需要更长的读取超时时间
* 注意此方法每次调用都会创建新的ApiClient不使用缓存
*
* @param system K8S系统配置
* @return ApiClient实例readTimeout=30分钟
*/
@Override
public ApiClient getApiClientForLogStream(ExternalSystem system) {
try {
String config = system.getConfig();
if (config == null || config.trim().isEmpty()) {
throw new BusinessException(ResponseCode.K8S_CONFIG_EMPTY);
}
ApiClient client = Config.fromConfig(new StringReader(config));
client.setConnectTimeout(15000); // 15秒连接超时
client.setReadTimeout(30 * 60 * 1000); // 30分钟读取超时日志流长连接
log.debug("创建日志流专用ApiClientreadTimeout=30分钟");
return client;
} catch (BusinessException e) {
throw e;
} catch (Exception e) {
log.error("创建日志流ApiClient失败: {}", e.getMessage(), e);
throw new BusinessException(ResponseCode.K8S_CONNECTION_FAILED);
}
}
/**
* 关闭K8S ApiClient释放OkHttp资源
* 包括连接池和调度器线程

View File

@ -3,223 +3,116 @@ package com.qqchen.deploy.backend.deploy.strategy.log;
import com.qqchen.deploy.backend.deploy.enums.RuntimeTypeEnum;
import com.qqchen.deploy.backend.framework.ssh.ISSHCommandService;
import com.qqchen.deploy.backend.framework.ssh.SSHCommandServiceFactory;
import com.qqchen.deploy.backend.framework.websocket.log.ILogStreamStrategy;
import com.qqchen.deploy.backend.framework.websocket.log.AbstractLogStreamStrategy;
import com.qqchen.deploy.backend.framework.websocket.log.LogStreamTarget;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import net.schmizz.sshj.SSHClient;
import net.schmizz.sshj.connection.channel.direct.Session;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketSession;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
/**
* Docker日志流策略
* 通过SSH连接执行docker logs -f命令获取日志流
*
* @author qqchen
* @since 2025-12-16
*/
@Slf4j
@Component
public class DockerLogStreamStrategy implements ILogStreamStrategy {
public class DockerLogStreamStrategy extends AbstractLogStreamStrategy<DockerLogStreamStrategy.SSHConnection> {
public record SSHConnection(SSHClient client, Session session, Session.Command command) {}
@Resource
private SSHCommandServiceFactory sshCommandServiceFactory;
/**
* 关键修复保存每个会话的SSH连接引用用于stop()时强制关闭
* sessionId SSHClient
*/
private final Map<String, SSHClient> sshClients = new ConcurrentHashMap<>();
/**
* 关键修复保存每个会话的SSH Session引用
* sessionId SSH Session
*/
private final Map<String, Session> sshSessions = new ConcurrentHashMap<>();
/**
* 关键修复保存每个会话的Command引用
* sessionId Session.Command
*/
private final Map<String, Session.Command> sshCommands = new ConcurrentHashMap<>();
@Override
public RuntimeTypeEnum supportedType() {
return RuntimeTypeEnum.DOCKER;
}
@Override
public void streamLogs(WebSocketSession session,
LogStreamTarget target,
AtomicBoolean paused,
LogLineCallback callback) throws Exception {
protected SSHConnection doCreateConnection(String sessionId, LogStreamTarget target) throws Exception {
log.info("创建Docker日志流连接: sessionId={}, container={}, host={}",
sessionId, target.getName(), target.getHost());
// 关键修复从session attributes获取增强后的sessionId与AbstractLogStreamWebSocketHandler保持一致
String sessionId = (String) session.getAttributes().get("logStreamSessionId");
if (sessionId == null) {
sessionId = session.getId(); // 降级方案
}
log.info("开始Docker日志流: sessionId={}, container={}, host={}, session.isOpen={}",
sessionId, target.getName(), target.getHost(), session.isOpen());
SSHClient sshClient = null;
Session sshSession = null;
Session.Command cmd = null;
try {
// 1. 建立SSH连接
ISSHCommandService sshService = sshCommandServiceFactory.getService(target.getOsType());
sshClient = sshService.createConnection(
target.getHost(),
target.getPort(),
target.getUsername(),
target.getPassword(),
target.getPrivateKey(),
target.getPassphrase()
SSHClient sshClient = sshService.createConnection(
target.getHost(), target.getPort(), target.getUsername(),
target.getPassword(), target.getPrivateKey(), target.getPassphrase()
);
// 关键修复保存SSH连接引用供stop()方法使用
sshClients.put(sessionId, sshClient);
// 2. 构建docker logs命令
String command = String.format("docker logs -f %s --tail %d",
target.getName(), target.getLines());
log.debug("Docker日志命令: {}", command);
log.debug("执行Docker日志命令: {}", command);
Session sshSession = sshClient.startSession();
Session.Command cmd = sshSession.exec(command);
// 3. 执行命令
sshSession = sshClient.startSession();
sshSessions.put(sessionId, sshSession);
return new SSHConnection(sshClient, sshSession, cmd);
}
cmd = sshSession.exec(command);
sshCommands.put(sessionId, cmd);
@Override
protected void doStreamLogs(String sessionId,
LogStreamTarget target,
Supplier<Boolean> isPaused,
LogLineCallback callback) throws Exception {
SSHConnection conn = getConnection(sessionId);
if (conn == null) {
log.warn("获取连接失败: sessionId={}", sessionId);
return;
}
// 4. 持续读取输出流
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(cmd.getInputStream()))) {
new InputStreamReader(conn.command().getInputStream()))) {
String line;
// 关键修复增加线程中断检查确保stop()能够中断阻塞的readLine()
while (session.isOpen() && !Thread.currentThread().isInterrupted()) {
while (!Thread.currentThread().isInterrupted()) {
line = reader.readLine();
// readLine()返回null表示流已关闭
if (line == null) {
log.debug("SSH输出流已关闭: sessionId={}", sessionId);
break;
}
// 检查暂停标志
if (paused.get()) {
Thread.sleep(100);
continue;
}
// 推送日志行Docker日志没有时间戳使用当前时间
if (line == null) break;
if (isPaused.get()) { Thread.sleep(100); continue; }
callback.sendLogLine(Instant.now().toString(), line);
}
log.debug("Docker日志流正常结束: sessionId={}, session.isOpen={}, interrupted={}",
sessionId, session.isOpen(), Thread.currentThread().isInterrupted());
}
} catch (java.net.SocketException e) {
// Socket关闭是正常的清理流程
log.debug("SSH Socket已关闭: sessionId={}", sessionId);
} catch (java.io.InterruptedIOException e) {
// 线程被中断正常的清理流程
log.debug("Docker日志流线程被中断: sessionId={}", sessionId);
} catch (java.net.SocketException | java.io.InterruptedIOException e) {
log.debug("Docker日志流中断: sessionId={}", sessionId);
} catch (InterruptedException e) {
// 恢复中断状态
Thread.currentThread().interrupt();
log.debug("Docker日志流线程被中断(sleep): sessionId={}", sessionId);
} catch (Exception e) {
if (session.isOpen()) {
log.error("Docker日志流异常: sessionId={}", sessionId, e);
throw e;
} else {
log.debug("Docker日志流异常(session已关闭): sessionId={}", sessionId);
}
} finally {
// 关键修复确保资源被清理
cleanupResources(sessionId, cmd, sshSession, sshClient);
log.info("Docker日志流结束: sessionId={}", sessionId);
}
@Override
protected void doStop(String sessionId, SSHConnection conn) {
if (conn.command() != null) {
try { conn.command().getInputStream().close(); } catch (Exception ignored) {}
}
}
@Override
public void stop(String sessionId) {
log.info("停止Docker日志流: sessionId={}", sessionId);
// 关键修复主动清理SSH资源不依赖finally块
Session.Command cmd = sshCommands.remove(sessionId);
Session sshSession = sshSessions.remove(sessionId);
SSHClient sshClient = sshClients.remove(sessionId);
cleanupResources(sessionId, cmd, sshSession, sshClient);
protected void doCleanupConnection(String sessionId, SSHConnection conn) {
if (conn.command() != null) {
try { conn.command().getInputStream().close(); } catch (Exception ignored) {}
try { conn.command().close(); } catch (Exception ignored) {}
}
/**
* 关键修复统一的资源清理方法
* 确保SSH连接SessionCommand都被正确关闭
*
* @param sessionId 会话ID
* @param cmd SSH Command
* @param sshSession SSH Session
* @param sshClient SSH Client
*/
private void cleanupResources(String sessionId, Session.Command cmd, Session sshSession, SSHClient sshClient) {
// 1. 先关闭Command的输入输出流强制中断阻塞的readLine()
if (cmd != null) {
if (conn.session() != null) {
try { conn.session().close(); } catch (Exception ignored) {}
}
if (conn.client() != null) {
try {
cmd.getInputStream().close();
} catch (Exception e) {
log.debug("关闭Command输入流失败: sessionId={}", sessionId, e);
if (conn.client().isConnected()) conn.client().disconnect();
conn.client().close();
} catch (Exception ignored) {}
}
try {
cmd.close();
} catch (Exception e) {
log.debug("关闭Command失败: sessionId={}", sessionId, e);
}
}
// 2. 关闭SSH Session
if (sshSession != null) {
try {
sshSession.close();
log.debug("SSH Session已关闭: sessionId={}", sessionId);
} catch (Exception e) {
log.debug("关闭SSH Session失败: sessionId={}", sessionId, e);
}
}
// 3. 断开SSH Client
if (sshClient != null) {
try {
if (sshClient.isConnected()) {
sshClient.disconnect();
}
sshClient.close();
log.debug("SSH Client已断开: sessionId={}", sessionId);
} catch (Exception e) {
log.debug("断开SSH Client失败: sessionId={}", sessionId, e);
}
}
// 4. 从Map中移除引用
sshCommands.remove(sessionId);
sshSessions.remove(sessionId);
sshClients.remove(sessionId);
log.debug("Docker日志流资源清理完成: sessionId={}", sessionId);
log.debug("Docker连接清理完成: sessionId={}", sessionId);
}
}

View File

@ -6,32 +6,35 @@ import com.qqchen.deploy.backend.deploy.integration.IK8sServiceIntegration;
import com.qqchen.deploy.backend.deploy.repository.IExternalSystemRepository;
import com.qqchen.deploy.backend.framework.enums.ResponseCode;
import com.qqchen.deploy.backend.framework.exception.BusinessException;
import com.qqchen.deploy.backend.framework.websocket.log.ILogStreamStrategy;
import com.qqchen.deploy.backend.framework.websocket.log.AbstractLogStreamStrategy;
import com.qqchen.deploy.backend.framework.websocket.log.LogStreamTarget;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Call;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketSession;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
/**
* K8S日志流策略
* 使用Kubernetes Java Client API获取Pod日志流
*
* @author qqchen
* @since 2025-12-16
*/
@Slf4j
@Component
public class K8sLogStreamStrategy implements ILogStreamStrategy {
public class K8sLogStreamStrategy extends AbstractLogStreamStrategy<K8sLogStreamStrategy.K8sConnection> {
public record K8sConnection(ApiClient apiClient, Call call) {}
@Resource
private IK8sServiceIntegration k8sServiceIntegration;
@ -45,82 +48,106 @@ public class K8sLogStreamStrategy implements ILogStreamStrategy {
}
@Override
public void streamLogs(WebSocketSession session,
LogStreamTarget target,
AtomicBoolean paused,
LogLineCallback callback) throws Exception {
protected K8sConnection doCreateConnection(String sessionId, LogStreamTarget target) throws Exception {
log.info("创建K8S日志流连接: sessionId={}, pod={}, namespace={}",
sessionId, target.getName(), target.getK8sNamespace());
// 注意: 不要使用session.getId()因为AbstractLogStreamWebSocketHandler使用的是增强后的sessionId
// 这里仅用于日志输出实际的session管理由AbstractLogStreamWebSocketHandler负责
String webSocketId = session.getId();
log.info("开始K8S日志流: webSocketId={}, pod={}, namespace={}",
webSocketId, target.getName(), target.getK8sNamespace());
try {
// 1. 获取K8S系统配置
ExternalSystem k8sSystem = externalSystemRepository.findById(target.getK8sSystemId())
.orElseThrow(() -> new BusinessException(ResponseCode.K8S_SYSTEM_NOT_FOUND));
// 2. 获取ApiClient使用集成服务的缓存机制
ApiClient apiClient = k8sServiceIntegration.getApiClient(k8sSystem);
ApiClient apiClient = k8sServiceIntegration.getApiClientForLogStream(k8sSystem);
CoreV1Api api = new CoreV1Api(apiClient);
// 3. 调用K8S API获取日志流
Call call = api.readNamespacedPodLogCall(
target.getName(), // podName
target.getK8sNamespace(), // namespace
null, // container (null = default container)
true, // follow = true (实时流)
null, // insecureSkipTLSVerifyBackend
null, // limitBytes
"false", // pretty
false, // previous
null, // sinceSeconds
target.getLines(), // tailLines
true, // timestamps
null // callback
target.getName(), target.getK8sNamespace(),
null, true, null, null, "false", false, null,
target.getLines(), true, null
);
// 4. 执行调用并获取Response使用try-with-resources自动关闭
try (okhttp3.Response response = call.execute()) {
InputStream inputStream = response.body().byteStream();
// 5. 持续读取日志流
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
String line;
while (session.isOpen() && (line = reader.readLine()) != null) {
// 检查暂停标志
if (paused.get()) {
Thread.sleep(100);
continue;
return new K8sConnection(apiClient, call);
}
// 解析K8S日志行格式timestamp content
@Override
protected void doStreamLogs(String sessionId,
LogStreamTarget target,
Supplier<Boolean> isPaused,
LogLineCallback callback) throws Exception {
K8sConnection conn = getConnection(sessionId);
if (conn == null) {
log.warn("获取连接失败: sessionId={}", sessionId);
return;
}
AtomicLong lineCount = new AtomicLong(0);
try (Response response = conn.call().execute()) {
if (!response.isSuccessful()) {
String errorBody = response.body() != null ? response.body().string() : "无响应体";
log.error("K8S日志API错误: sessionId={}, code={}, body={}",
sessionId, response.code(), errorBody);
throw new BusinessException(ResponseCode.K8S_OPERATION_FAILED);
}
ResponseBody body = response.body();
if (body == null) {
log.warn("K8S日志API返回空响应体: sessionId={}", sessionId);
return;
}
InputStream inputStream = body.byteStream();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
String line;
while (!Thread.currentThread().isInterrupted()) {
line = reader.readLine();
if (line == null) break;
if (isPaused.get()) { Thread.sleep(100); continue; }
String[] parts = line.split(" ", 2);
String timestamp = parts.length > 0 ? parts[0] : Instant.now().toString();
String content = parts.length > 1 ? parts[1] : line;
// 推送日志行
callback.sendLogLine(timestamp, content);
}
log.debug("K8S日志流退出while循环: webSocketId={}, session.isOpen={}",
webSocketId, session.isOpen());
lineCount.incrementAndGet();
}
}
} catch (java.net.SocketException | java.io.InterruptedIOException e) {
log.debug("K8S日志流中断: sessionId={}, lineCount={}", sessionId, lineCount.get());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
log.info("K8S日志流正常结束: webSocketId={}", webSocketId);
log.info("K8S日志流结束: sessionId={}, lineCount={}", sessionId, lineCount.get());
}
} catch (Exception e) {
log.error("K8S日志流异常: webSocketId={}", webSocketId, e);
throw e;
@Override
protected void doStop(String sessionId, K8sConnection conn) {
if (conn.call() != null && !conn.call().isCanceled()) {
conn.call().cancel();
log.debug("K8S Call已取消: sessionId={}", sessionId);
}
}
@Override
public void stop(String sessionId) {
log.info("停止K8S日志流: logStreamSessionId={}", sessionId);
// K8S使用Kubernetes API连接由ApiClient管理无需手动清理
// 当输入流关闭时K8S API会自动断开连接
protected void doCleanupConnection(String sessionId, K8sConnection conn) {
if (conn.call() != null && !conn.call().isCanceled()) {
try { conn.call().cancel(); } catch (Exception ignored) {}
}
if (conn.apiClient() != null) {
try {
okhttp3.OkHttpClient httpClient = conn.apiClient().getHttpClient();
if (httpClient.connectionPool() != null) {
httpClient.connectionPool().evictAll();
}
if (httpClient.dispatcher() != null &&
httpClient.dispatcher().executorService() != null) {
httpClient.dispatcher().executorService().shutdown();
}
} catch (Exception ignored) {}
}
log.debug("K8S连接清理完成: sessionId={}", sessionId);
}
}

View File

@ -3,223 +3,115 @@ package com.qqchen.deploy.backend.deploy.strategy.log;
import com.qqchen.deploy.backend.deploy.enums.RuntimeTypeEnum;
import com.qqchen.deploy.backend.framework.ssh.ISSHCommandService;
import com.qqchen.deploy.backend.framework.ssh.SSHCommandServiceFactory;
import com.qqchen.deploy.backend.framework.websocket.log.ILogStreamStrategy;
import com.qqchen.deploy.backend.framework.websocket.log.AbstractLogStreamStrategy;
import com.qqchen.deploy.backend.framework.websocket.log.LogStreamTarget;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import net.schmizz.sshj.SSHClient;
import net.schmizz.sshj.connection.channel.direct.Session;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketSession;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
/**
* Server日志流策略
* 通过SSH连接执行tail -f命令获取日志流
*
* @author qqchen
* @since 2025-12-16
*/
@Slf4j
@Component
public class ServerLogStreamStrategy implements ILogStreamStrategy {
public class ServerLogStreamStrategy extends AbstractLogStreamStrategy<ServerLogStreamStrategy.SSHConnection> {
public record SSHConnection(SSHClient client, Session session, Session.Command command) {}
@Resource
private SSHCommandServiceFactory sshCommandServiceFactory;
/**
* 关键修复保存每个会话的SSH连接引用用于stop()时强制关闭
* sessionId SSHClient
*/
private final Map<String, SSHClient> sshClients = new ConcurrentHashMap<>();
/**
* 关键修复保存每个会话的SSH Session引用
* sessionId SSH Session
*/
private final Map<String, Session> sshSessions = new ConcurrentHashMap<>();
/**
* 关键修复保存每个会话的Command引用
* sessionId Session.Command
*/
private final Map<String, Session.Command> sshCommands = new ConcurrentHashMap<>();
@Override
public RuntimeTypeEnum supportedType() {
return RuntimeTypeEnum.SERVER;
}
@Override
public void streamLogs(WebSocketSession session,
LogStreamTarget target,
AtomicBoolean paused,
LogLineCallback callback) throws Exception {
protected SSHConnection doCreateConnection(String sessionId, LogStreamTarget target) throws Exception {
log.info("创建Server日志流连接: sessionId={}, logFile={}, host={}",
sessionId, target.getLogFilePath(), target.getHost());
// 关键修复从session attributes获取增强后的sessionId与AbstractLogStreamWebSocketHandler保持一致
String sessionId = (String) session.getAttributes().get("logStreamSessionId");
if (sessionId == null) {
sessionId = session.getId(); // 降级方案
}
log.info("开始Server日志流: sessionId={}, logFile={}, host={}, session.isOpen={}",
sessionId, target.getLogFilePath(), target.getHost(), session.isOpen());
SSHClient sshClient = null;
Session sshSession = null;
Session.Command cmd = null;
try {
// 1. 建立SSH连接
ISSHCommandService sshService = sshCommandServiceFactory.getService(target.getOsType());
sshClient = sshService.createConnection(
target.getHost(),
target.getPort(),
target.getUsername(),
target.getPassword(),
target.getPrivateKey(),
target.getPassphrase()
SSHClient sshClient = sshService.createConnection(
target.getHost(), target.getPort(), target.getUsername(),
target.getPassword(), target.getPrivateKey(), target.getPassphrase()
);
// 关键修复保存SSH连接引用供stop()方法使用
sshClients.put(sessionId, sshClient);
// 2. 构建tail命令
// logQueryCommand已经包含完整的tail命令,只需要在后面加上-n参数
String command = target.getLogFilePath() + " -n " + target.getLines();
log.debug("Server日志命令: {}", command);
log.debug("执行Server日志命令: {}", command);
Session sshSession = sshClient.startSession();
Session.Command cmd = sshSession.exec(command);
// 3. 执行命令
sshSession = sshClient.startSession();
sshSessions.put(sessionId, sshSession);
return new SSHConnection(sshClient, sshSession, cmd);
}
cmd = sshSession.exec(command);
sshCommands.put(sessionId, cmd);
@Override
protected void doStreamLogs(String sessionId,
LogStreamTarget target,
Supplier<Boolean> isPaused,
LogLineCallback callback) throws Exception {
SSHConnection conn = getConnection(sessionId);
if (conn == null) {
log.warn("获取连接失败: sessionId={}", sessionId);
return;
}
// 4. 持续读取输出流
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(cmd.getInputStream()))) {
new InputStreamReader(conn.command().getInputStream()))) {
String line;
// 关键修复增加线程中断检查确保stop()能够中断阻塞的readLine()
while (session.isOpen() && !Thread.currentThread().isInterrupted()) {
while (!Thread.currentThread().isInterrupted()) {
line = reader.readLine();
// readLine()返回null表示流已关闭
if (line == null) {
log.debug("SSH输出流已关闭: sessionId={}", sessionId);
break;
}
// 检查暂停标志
if (paused.get()) {
Thread.sleep(100);
continue;
}
// 推送日志行使用当前时间作为时间戳
if (line == null) break;
if (isPaused.get()) { Thread.sleep(100); continue; }
callback.sendLogLine(Instant.now().toString(), line);
}
log.debug("Server日志流正常结束: sessionId={}, session.isOpen={}, interrupted={}",
sessionId, session.isOpen(), Thread.currentThread().isInterrupted());
}
} catch (java.net.SocketException e) {
// Socket关闭是正常的清理流程
log.debug("SSH Socket已关闭: sessionId={}", sessionId);
} catch (java.io.InterruptedIOException e) {
// 线程被中断正常的清理流程
log.debug("Server日志流线程被中断: sessionId={}", sessionId);
} catch (java.net.SocketException | java.io.InterruptedIOException e) {
log.debug("Server日志流中断: sessionId={}", sessionId);
} catch (InterruptedException e) {
// 恢复中断状态
Thread.currentThread().interrupt();
log.debug("Server日志流线程被中断(sleep): sessionId={}", sessionId);
} catch (Exception e) {
if (session.isOpen()) {
log.error("Server日志流异常: sessionId={}", sessionId, e);
throw e;
} else {
log.debug("Server日志流异常(session已关闭): sessionId={}", sessionId);
}
} finally {
// 关键修复确保资源被清理
cleanupResources(sessionId, cmd, sshSession, sshClient);
log.info("Server日志流结束: sessionId={}", sessionId);
}
@Override
protected void doStop(String sessionId, SSHConnection conn) {
if (conn.command() != null) {
try { conn.command().getInputStream().close(); } catch (Exception ignored) {}
}
}
@Override
public void stop(String sessionId) {
log.info("停止Server日志流: sessionId={}", sessionId);
// 关键修复主动清理SSH资源不依赖finally块
Session.Command cmd = sshCommands.remove(sessionId);
Session sshSession = sshSessions.remove(sessionId);
SSHClient sshClient = sshClients.remove(sessionId);
cleanupResources(sessionId, cmd, sshSession, sshClient);
protected void doCleanupConnection(String sessionId, SSHConnection conn) {
if (conn.command() != null) {
try { conn.command().getInputStream().close(); } catch (Exception ignored) {}
try { conn.command().close(); } catch (Exception ignored) {}
}
/**
* 关键修复统一的资源清理方法
* 确保SSH连接SessionCommand都被正确关闭
*
* @param sessionId 会话ID
* @param cmd SSH Command
* @param sshSession SSH Session
* @param sshClient SSH Client
*/
private void cleanupResources(String sessionId, Session.Command cmd, Session sshSession, SSHClient sshClient) {
// 1. 先关闭Command的输入输出流强制中断阻塞的readLine()
if (cmd != null) {
if (conn.session() != null) {
try { conn.session().close(); } catch (Exception ignored) {}
}
if (conn.client() != null) {
try {
cmd.getInputStream().close();
} catch (Exception e) {
log.debug("关闭Command输入流失败: sessionId={}", sessionId, e);
if (conn.client().isConnected()) conn.client().disconnect();
conn.client().close();
} catch (Exception ignored) {}
}
try {
cmd.close();
} catch (Exception e) {
log.debug("关闭Command失败: sessionId={}", sessionId, e);
}
}
// 2. 关闭SSH Session
if (sshSession != null) {
try {
sshSession.close();
log.debug("SSH Session已关闭: sessionId={}", sessionId);
} catch (Exception e) {
log.debug("关闭SSH Session失败: sessionId={}", sessionId, e);
}
}
// 3. 断开SSH Client
if (sshClient != null) {
try {
if (sshClient.isConnected()) {
sshClient.disconnect();
}
sshClient.close();
log.debug("SSH Client已断开: sessionId={}", sessionId);
} catch (Exception e) {
log.debug("断开SSH Client失败: sessionId={}", sessionId, e);
}
}
// 4. 从Map中移除引用
sshCommands.remove(sessionId);
sshSessions.remove(sessionId);
sshClients.remove(sessionId);
log.debug("Server日志流资源清理完成: sessionId={}", sessionId);
log.debug("Server连接清理完成: sessionId={}", sessionId);
}
}

View File

@ -0,0 +1,130 @@
package com.qqchen.deploy.backend.framework.websocket.log;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
/**
* 日志流策略抽象基类
*
* <p>使用模板方法模式统一管理连接的生命周期
*
* <p>职责边界
* <ul>
* <li>只负责连接管理创建存储清理</li>
* <li>只负责日志读取逻辑</li>
* <li>不涉及 WebSocket 协议sessionId Handler 传入</li>
* </ul>
*
* @param <T> 连接类型子类定义的 record
* @author Framework
* @since 2025-12-30
*/
@Slf4j
public abstract class AbstractLogStreamStrategy<T> implements ILogStreamStrategy {
/**
* 连接容器sessionId 连接对象
*/
private final Map<String, T> connections = new ConcurrentHashMap<>();
// ==================== 模板方法 ====================
@Override
public final void streamLogs(String sessionId,
LogStreamTarget target,
Supplier<Boolean> isPaused,
LogLineCallback callback) throws Exception {
try {
// 1. 创建连接
T connection = doCreateConnection(sessionId, target);
if (connection != null) {
connections.put(sessionId, connection);
}
// 2. 执行日志流读取
doStreamLogs(sessionId, target, isPaused, callback);
} finally {
// 3. 清理连接
cleanup(sessionId);
}
}
@Override
public final void stop(String sessionId) {
log.info("停止日志流: sessionId={}, strategy={}", sessionId, getClass().getSimpleName());
T connection = connections.remove(sessionId);
if (connection == null) {
log.debug("连接已被清理或不存在: sessionId={}", sessionId);
return;
}
try {
doStop(sessionId, connection);
} catch (Exception e) {
log.warn("停止逻辑异常: sessionId={}", sessionId, e);
}
try {
doCleanupConnection(sessionId, connection);
} catch (Exception e) {
log.warn("清理连接异常: sessionId={}", sessionId, e);
}
}
/**
* 流结束时清理 finally 块调用
*/
private void cleanup(String sessionId) {
T connection = connections.remove(sessionId);
if (connection == null) {
return;
}
log.debug("流结束清理连接: sessionId={}", sessionId);
try {
doCleanupConnection(sessionId, connection);
} catch (Exception e) {
log.warn("清理连接异常: sessionId={}", sessionId, e);
}
}
// ==================== 连接访问 ====================
/**
* 获取连接供子类在 doStreamLogs 中使用
*/
protected T getConnection(String sessionId) {
return connections.get(sessionId);
}
// ==================== 抽象方法 ====================
/**
* 创建连接
*/
protected abstract T doCreateConnection(String sessionId, LogStreamTarget target) throws Exception;
/**
* 执行日志流读取
*/
protected abstract void doStreamLogs(String sessionId,
LogStreamTarget target,
Supplier<Boolean> isPaused,
LogLineCallback callback) throws Exception;
/**
* 停止逻辑如取消正在进行的 IO 操作
*/
protected abstract void doStop(String sessionId, T connection);
/**
* 清理连接资源
*/
protected abstract void doCleanupConnection(String sessionId, T connection);
}

View File

@ -22,7 +22,6 @@ import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
@ -30,12 +29,16 @@ import java.util.concurrent.atomic.AtomicBoolean;
/**
* 抽象日志流WebSocket处理器Framework层
* 提供通用的日志流WebSocket能力
*
* 子类需要实现3个方法
* 1. getLogStreamTarget(session) - 获取日志流目标信息
* 2. checkPermission(userId, target) - 权限验证
* 3. streamLogs(session, target, paused) - 执行日志流推送
* <p>使用模板方法模式提供通用的日志流WebSocket能力
* 子类需要实现3个抽象方法来定制业务逻辑
*
* <p>设计要点
* <ul>
* <li>单例Bean通过sessionId隔离不同用户的会话</li>
* <li>使用ConcurrentHashMap保证线程安全</li>
* <li>每个session独立的callback日志不会串</li>
* </ul>
*
* @author Framework
* @since 2025-12-16
@ -43,115 +46,67 @@ import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
public abstract class AbstractLogStreamWebSocketHandler extends TextWebSocketHandler {
// ========== 会话存储 ==========
// ==================== 会话数据结构 ====================
/**
* WebSocket会话存储sessionId WebSocketSession
* 日志流会话封装单个连接的所有状态
*
* @param webSocket WebSocket会话用于主动关闭统计
* @param task 异步任务用于取消
* @param paused 暂停标志
* @param strategy 日志流策略
* @param userId 用户ID用于SSH配额管理
*/
protected final Map<String, WebSocketSession> webSocketSessions = new ConcurrentHashMap<>();
public record LogStreamSession(
WebSocketSession webSocket,
Future<?> task,
AtomicBoolean paused,
ILogStreamStrategy strategy,
Long userId
) {}
/**
* 日志流任务存储sessionId Future
* 会话存储sessionId LogStreamSession
* <p>一个Map管理所有会话状态保证原子性
*/
protected final Map<String, Future<?>> streamTasks = new ConcurrentHashMap<>();
private final Map<String, LogStreamSession> sessions = new ConcurrentHashMap<>();
/**
* 暂停标志存储sessionId AtomicBoolean
*/
protected final Map<String, AtomicBoolean> pausedFlags = new ConcurrentHashMap<>();
// ==================== 依赖注入 ====================
/**
* 目标信息存储sessionId LogStreamTarget
*/
protected final Map<String, LogStreamTarget> sessionTargets = new ConcurrentHashMap<>();
/**
* 策略实例存储sessionId ILogStreamStrategy
*/
protected final Map<String, ILogStreamStrategy> sessionStrategies = new ConcurrentHashMap<>();
/**
* SSH会话管理器用于配额管理
*/
@Resource
private SSHSessionManager sshSessionManager;
/**
* 日志流输出监听线程池Framework自动注入
*/
@Resource(name = "logStreamOutputExecutor")
private AsyncTaskExecutor logStreamOutputExecutor;
/**
* 最大SSH连接数与SSH终端共享配额
*/
/** 最大SSH连接数与SSH终端共享配额 */
private static final int MAX_SSH_SESSIONS = 5;
// ========== 辅助方法 ==========
// ==================== 抽象方法子类实现 ====================
/**
* 获取增强的SessionId线程安全
*
* 使用SessionIdGenerator增强原始WebSocket SessionId确保并发场景下的唯一性
*
* @param session WebSocket会话
* @return 增强的SessionId
*/
protected String getSessionId(WebSocketSession session) {
// 1. 先尝试从session attributes中获取避免重复生成
String sessionId = (String) session.getAttributes().get("logStreamSessionId");
if (sessionId == null) {
// 2. 懒加载如果没有就生成并存储保证一致性
sessionId = SessionIdGenerator.enhanceWebSocketSessionId(session.getId());
session.getAttributes().put("logStreamSessionId", sessionId);
}
return sessionId;
}
// ========== 子类必须实现的抽象方法 ==========
/**
* 获取日志流目标信息由子类实现
*
* @param session WebSocket会话
* @param request 日志流请求
* @return 日志流目标信息
* @throws Exception 获取失败时抛出
* 获取日志流目标信息
*/
protected abstract LogStreamTarget getLogStreamTarget(WebSocketSession session, LogStreamRequest request) throws Exception;
/**
* 检查用户权限由子类实现
*
* @param userId 用户ID
* @param target 日志流目标
* @return 是否有权限
* 检查用户权限
*/
protected abstract boolean checkPermission(Long userId, LogStreamTarget target);
/**
* 获取日志流策略由子类实现
*
* @param target 日志流目标
* @return 日志流策略
* 获取日志流策略
*/
protected abstract ILogStreamStrategy getLogStreamStrategy(LogStreamTarget target);
// ========== Framework 提供的核心能力 ==========
// ==================== WebSocket生命周期 ====================
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
String sessionId = getSessionId(session);
log.info("日志流WebSocket连接建立: webSocketId={}, logStreamSessionId={}",
session.getId(), sessionId);
log.info("日志流WebSocket连接建立: sessionId={}", sessionId);
try {
// 1. 获取用户信息
Long userId = (Long) session.getAttributes().get("userId");
String username = (String) session.getAttributes().get("username");
if (userId == null) {
log.error("无法获取用户信息: sessionId={}", sessionId);
sendError(session, "认证失败");
@ -159,24 +114,10 @@ public abstract class AbstractLogStreamWebSocketHandler extends TextWebSocketHan
return;
}
// 2. 保存会话
webSocketSessions.put(sessionId, session);
// 预注册会话只有基本信息task/strategy在START时填充
sessions.put(sessionId, new LogStreamSession(session, null, null, null, userId));
// 3. 不立即发送状态消息等待客户端发送START消息
// 参照SSH WebSocket的做法避免在客户端未准备好时发送消息
log.info("日志流连接成功等待START消息: sessionId={}, userId={}, username={}",
sessionId, userId, username);
} catch (Exception e) {
log.error("建立日志流连接失败: sessionId={}", sessionId, e);
cleanupSession(sessionId);
try {
session.close(CloseStatus.SERVER_ERROR);
} catch (IOException ex) {
log.error("关闭WebSocket会话失败: sessionId={}", sessionId, ex);
}
}
log.info("日志流连接成功等待START消息: sessionId={}, userId={}", sessionId, userId);
}
@Override
@ -186,118 +127,113 @@ public abstract class AbstractLogStreamWebSocketHandler extends TextWebSocketHan
try {
LogWebSocketMessage msg = JsonUtils.fromJson(message.getPayload(), LogWebSocketMessage.class);
if (msg.getType() == LogMessageType.START) {
// 启动日志流
handleStartMessage(session, msg);
} else if (msg.getType() == LogMessageType.CONTROL) {
// 控制消息
handleControlMessage(session, msg);
} else {
log.warn("未知的消息类型: sessionId={}, type={}", sessionId, msg.getType());
switch (msg.getType()) {
case START -> handleStartMessage(session, sessionId, msg);
case CONTROL -> handleControlMessage(sessionId, msg);
default -> log.warn("未知的消息类型: sessionId={}, type={}", sessionId, msg.getType());
}
} catch (Exception e) {
log.error("处理WebSocket消息失败: sessionId={}", sessionId, e);
sendError(session, "消息处理失败: " + e.getMessage());
}
}
/**
* 处理START消息
*/
private void handleStartMessage(WebSocketSession session, LogWebSocketMessage msg) {
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
String sessionId = getSessionId(session);
log.info("日志流WebSocket连接关闭: sessionId={}, status={}", sessionId, status);
cleanupSession(sessionId);
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
String sessionId = getSessionId(session);
if (exception instanceof java.io.EOFException) {
log.debug("客户端关闭连接: sessionId={}", sessionId);
} else {
log.error("日志流WebSocket传输错误: sessionId={}", sessionId, exception);
sendError(session, "传输错误: " + exception.getMessage());
}
cleanupSession(sessionId);
try {
// 1. 提取请求参数
session.close(CloseStatus.SERVER_ERROR);
} catch (IOException ignored) {}
}
// ==================== 消息处理 ====================
private void handleStartMessage(WebSocketSession session, String sessionId, LogWebSocketMessage msg) {
try {
// 1. 验证请求
LogStreamRequest request = msg.getRequest(LogStreamRequest.class);
if (request == null || !request.isValid()) {
log.warn("START消息参数无效: sessionId={}", sessionId);
sendError(session, "请求参数无效");
return;
}
// 2. 获取用户信息
Long userId = (Long) session.getAttributes().get("userId");
// 2. 获取当前会话
LogStreamSession currentSession = sessions.get(sessionId);
if (currentSession == null) {
sendError(session, "会话不存在");
return;
}
// 3. 获取日志流目标信息
Long userId = currentSession.userId();
// 3. 获取日志流目标
LogStreamTarget target = getLogStreamTarget(session, request);
if (target == null) {
log.error("无法获取日志流目标: sessionId={}", sessionId);
sendError(session, "无法获取日志流目标");
return;
}
// 保存target信息
sessionTargets.put(sessionId, target);
log.info("获取日志流目标成功: sessionId={}, runtimeType={}, name={}",
log.info("获取日志流目标: sessionId={}, runtimeType={}, name={}",
sessionId, target.getRuntimeType(), target.getName());
// 4. 权限验证
if (!checkPermission(userId, target)) {
log.warn("用户无权访问日志: userId={}, target={}", userId, target.getName());
sendError(session, "无权访问此日志");
return;
}
// 5. 对于需要SSH的运行时类型检查配额
if (target.getRuntimeType() == RuntimeTypeEnum.SERVER
|| target.getRuntimeType() == RuntimeTypeEnum.DOCKER) {
// 尝试注册会话包含配额检查
boolean registered = sshSessionManager.tryRegisterSession(
sessionId, userId,
SSHTargetType.LOG_STREAM,
target.getName(), MAX_SSH_SESSIONS);
if (!registered) {
long currentCount = sshSessionManager.countUserTotalSessions(userId);
log.warn("用户SSH连接数超过限制: userId={}, current={}, max={}",
userId, currentCount, MAX_SSH_SESSIONS);
sendError(session, "SSH连接数已达上限(" + MAX_SSH_SESSIONS + "个),请关闭其他连接后重试");
return; // 不启动日志流
// 5. SSH配额检查仅Server/Docker
if (requiresSSH(target.getRuntimeType())) {
if (!tryRegisterSSHSession(sessionId, userId, target.getName())) {
sendError(session, "SSH连接数已达上限(" + MAX_SSH_SESSIONS + "个)");
return;
}
}
log.debug("日志流SSH会话注册成功: sessionId={}, userId={}", sessionId, userId);
}
// 6. 发送流式传输状态
sendStatus(session, LogStatusEnum.STREAMING);
// 6. 创建暂停标志
AtomicBoolean paused = new AtomicBoolean(false);
pausedFlags.put(sessionId, paused);
// 7. 获取日志流策略
// 6. 获取策略
ILogStreamStrategy strategy = getLogStreamStrategy(target);
if (strategy == null) {
log.error("无法获取日志流策略: sessionId={}, runtimeType={}",
sessionId, target.getRuntimeType());
sendError(session, "不支持的运行时类型");
return;
}
// 保存策略实例用于后续清理
sessionStrategies.put(sessionId, strategy);
// 7. 创建暂停标志
AtomicBoolean paused = new AtomicBoolean(false);
// 8. 启动日志流任务异步使用Spring管理的虚拟线程池
// 8. 启动日志流任务
Future<?> task = logStreamOutputExecutor.submit(() -> {
try {
// 使用策略执行日志流
strategy.streamLogs(session, target, paused,
// Strategy 只接收 sessionId不接触 WebSocket
strategy.streamLogs(sessionId, target, paused::get,
(timestamp, content) -> sendLogLine(session, timestamp, content));
} catch (Exception e) {
log.error("日志流异常: sessionId={}", sessionId, e);
try {
sendError(session, "日志流中断: " + e.getMessage());
} catch (Exception ex) {
log.error("发送错误消息失败: sessionId={}", sessionId, ex);
}
}
});
streamTasks.put(sessionId, task);
// 9. 更新会话原子替换
sessions.put(sessionId, new LogStreamSession(session, task, paused, strategy, userId));
// 10. 发送状态
sendStatus(session, LogStatusEnum.STREAMING);
log.info("日志流已启动: sessionId={}", sessionId);
} catch (Exception e) {
@ -306,228 +242,173 @@ public abstract class AbstractLogStreamWebSocketHandler extends TextWebSocketHan
}
}
/**
* 处理CONTROL消息
*/
private void handleControlMessage(WebSocketSession session, LogWebSocketMessage msg) {
String sessionId = getSessionId(session);
private void handleControlMessage(String sessionId, LogWebSocketMessage msg) {
LogStreamSession session = sessions.get(sessionId);
if (session == null || session.paused() == null) {
log.warn("日志流未启动,无法控制: sessionId={}", sessionId);
return;
}
try {
LogControlRequest request = msg.getRequest(LogControlRequest.class);
if (request == null || !request.isValid()) {
log.warn("CONTROL消息参数无效: sessionId={}", sessionId);
return;
}
AtomicBoolean paused = pausedFlags.get(sessionId);
if (paused == null) {
log.warn("日志流未启动,无法控制: sessionId={}", sessionId);
return;
}
LogControlAction action = request.getAction();
if (action == LogControlAction.PAUSE) {
paused.set(true);
sendStatus(session, LogStatusEnum.PAUSED);
switch (action) {
case PAUSE -> {
session.paused().set(true);
sendStatus(session.webSocket(), LogStatusEnum.PAUSED);
log.info("日志流已暂停: sessionId={}", sessionId);
} else if (action == LogControlAction.RESUME) {
paused.set(false);
sendStatus(session, LogStatusEnum.STREAMING);
log.info("日志流已恢复: sessionId={}", sessionId);
} else if (action == LogControlAction.STOP) {
log.info("收到停止请求: sessionId={}", sessionId);
session.close(CloseStatus.NORMAL);
}
case RESUME -> {
session.paused().set(false);
sendStatus(session.webSocket(), LogStatusEnum.STREAMING);
log.info("日志流已恢复: sessionId={}", sessionId);
}
case STOP -> {
log.info("收到停止请求: sessionId={}", sessionId);
session.webSocket().close(CloseStatus.NORMAL);
}
}
} catch (Exception e) {
log.error("处理CONTROL消息失败: sessionId={}", sessionId, e);
}
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
String sessionId = getSessionId(session);
log.info("日志流WebSocket连接关闭: logStreamSessionId={}, status={}", sessionId, status);
cleanupSession(sessionId);
}
// ==================== 资源清理 ====================
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
String sessionId = getSessionId(session);
// EOFException通常表示客户端正常关闭连接不需要记录ERROR日志
if (exception instanceof java.io.EOFException) {
log.debug("客户端关闭连接: sessionId={}", sessionId);
cleanupSession(sessionId);
return;
}
log.error("日志流WebSocket传输错误: sessionId={}", sessionId, exception);
try {
sendError(session, "传输错误: " + exception.getMessage());
} catch (Exception e) {
// 忽略发送错误消息时的异常
log.debug("发送错误消息失败: sessionId={}", sessionId);
}
cleanupSession(sessionId);
try {
session.close(CloseStatus.SERVER_ERROR);
} catch (IOException e) {
log.debug("关闭WebSocket会话失败: sessionId={}", sessionId);
}
}
/**
* 清理会话资源
*/
private void cleanupSession(String sessionId) {
log.info("开始清理日志流会话资源: sessionId={}", sessionId);
try {
// 1. 调用Strategy的stop方法清理资源SSH连接等
ILogStreamStrategy strategy = sessionStrategies.remove(sessionId);
if (strategy != null) {
try {
log.debug("调用Strategy.stop清理资源: sessionId={}", sessionId);
strategy.stop(sessionId);
} catch (Exception e) {
log.error("Strategy清理资源失败: sessionId={}", sessionId, e);
}
}
// 2. 从SSH会话管理器移除释放配额
try {
sshSessionManager.removeSession(sessionId);
log.debug("SSH会话已从配额管理器移除: sessionId={}", sessionId);
} catch (Exception e) {
log.error("从SSH会话管理器移除失败: sessionId={}", sessionId, e);
}
// 3. 取消日志流任务
Future<?> task = streamTasks.remove(sessionId);
if (task != null && !task.isDone()) {
log.debug("取消日志流任务: sessionId={}", sessionId);
task.cancel(true);
}
// 4. 移除WebSocketSession
webSocketSessions.remove(sessionId);
// 5. 移除暂停标志
pausedFlags.remove(sessionId);
// 6. 移除target信息
sessionTargets.remove(sessionId);
log.info("日志流会话资源清理完成: sessionId={}", sessionId);
} catch (Exception e) {
log.error("清理会话资源失败: sessionId={}", sessionId, e);
}
}
// ========== 辅助方法供子类使用 ==========
/**
* 发送日志行到前端
*
* @param session WebSocket会话
* @param timestamp 时间戳
* @param content 日志内容
*/
protected void sendLogLine(WebSocketSession session, String timestamp, String content) {
String sessionId = getSessionId(session);
try {
LogStreamSession session = sessions.remove(sessionId);
if (session == null) {
log.warn("发送日志行失败: session为null, sessionId={}", sessionId);
return;
}
if (!session.isOpen()) {
log.warn("发送日志行失败: session已关闭, sessionId={}", sessionId);
return;
}
log.info("清理日志流会话: sessionId={}", sessionId);
LogLineResponse response = new LogLineResponse(timestamp, content);
Map<String, Object> data = new HashMap<>();
data.put("response", response);
LogWebSocketMessage msg = new LogWebSocketMessage(LogMessageType.LOG, data);
// WebSocketSession.sendMessage()不是线程安全的必须加锁
synchronized (session) {
session.sendMessage(new TextMessage(JsonUtils.toJson(msg)));
}
log.trace("日志行已发送: sessionId={}, content={}", sessionId, content.substring(0, Math.min(50, content.length())));
} catch (IOException e) {
log.error("发送日志行失败(IOException): sessionId={}, error={}", sessionId, e.getMessage(), e);
// 1. 停止策略清理SSH/K8S连接
if (session.strategy() != null) {
try {
session.strategy().stop(sessionId);
} catch (Exception e) {
log.error("发送日志行失败(Exception): sessionId={}, error={}", sessionId, e.getMessage(), e);
log.error("Strategy清理失败: sessionId={}", sessionId, e);
}
}
/**
* 发送状态消息到前端
*/
protected void sendStatus(WebSocketSession session, LogStatusEnum status) {
try {
// 2. 释放SSH配额
sshSessionManager.removeSession(sessionId);
// 3. 取消异步任务
if (session.task() != null && !session.task().isDone()) {
session.task().cancel(true);
}
log.info("日志流会话清理完成: sessionId={}", sessionId);
}
// ==================== 辅助方法 ====================
private boolean requiresSSH(RuntimeTypeEnum runtimeType) {
return runtimeType == RuntimeTypeEnum.SERVER || runtimeType == RuntimeTypeEnum.DOCKER;
}
private boolean tryRegisterSSHSession(String sessionId, Long userId, String targetName) {
boolean registered = sshSessionManager.tryRegisterSession(
sessionId, userId, SSHTargetType.LOG_STREAM, targetName, MAX_SSH_SESSIONS);
if (!registered) {
long currentCount = sshSessionManager.countUserTotalSessions(userId);
log.warn("SSH连接数超限: userId={}, current={}, max={}", userId, currentCount, MAX_SSH_SESSIONS);
}
return registered;
}
protected String getSessionId(WebSocketSession session) {
String sessionId = (String) session.getAttributes().get("logStreamSessionId");
if (sessionId == null) {
sessionId = SessionIdGenerator.enhanceWebSocketSessionId(session.getId());
session.getAttributes().put("logStreamSessionId", sessionId);
}
return sessionId;
}
// ==================== 消息发送 ====================
private void sendLogLine(WebSocketSession session, String timestamp, String content) {
if (session == null || !session.isOpen()) {
log.debug("会话未打开,跳过发送状态消息: status={}", status);
return;
}
LogStatusResponse response = new LogStatusResponse(status);
try {
LogWebSocketMessage msg = LogWebSocketMessage.of(
LogMessageType.LOG,
new LogLineResponse(timestamp, content)
);
Map<String, Object> data = new HashMap<>();
data.put("response", response);
LogWebSocketMessage msg = new LogWebSocketMessage(LogMessageType.STATUS, data);
// WebSocketSession.sendMessage()不是线程安全的必须加锁
synchronized (session) {
session.sendMessage(new TextMessage(JsonUtils.toJson(msg)));
}
} catch (IOException e) {
// 降低日志级别客户端断开是正常情况
log.debug("发送状态消息失败(客户端可能已断开): logStreamSessionId={}, status={}",
getSessionId(session), status);
log.debug("发送日志行失败: sessionId={}", getSessionId(session));
}
}
protected void sendStatus(WebSocketSession session, LogStatusEnum status) {
if (session == null || !session.isOpen()) {
return;
}
try {
LogWebSocketMessage msg = LogWebSocketMessage.of(
LogMessageType.STATUS,
new LogStatusResponse(status)
);
synchronized (session) {
session.sendMessage(new TextMessage(JsonUtils.toJson(msg)));
}
} catch (IOException e) {
log.debug("发送状态消息失败: sessionId={}", getSessionId(session));
}
}
protected void sendError(WebSocketSession session, String error) {
if (session == null || !session.isOpen()) {
return;
}
try {
LogWebSocketMessage msg = LogWebSocketMessage.of(
LogMessageType.ERROR,
new LogErrorResponse(error)
);
synchronized (session) {
session.sendMessage(new TextMessage(JsonUtils.toJson(msg)));
}
} catch (IOException e) {
log.debug("发送错误消息失败: sessionId={}", getSessionId(session));
}
}
// ==================== 监控方法供子类或管理接口使用 ====================
/**
* 获取当前活跃会话数
*/
protected int getActiveSessionCount() {
return sessions.size();
}
/**
* 发送错误消息到前端
* 获取指定用户的会话数
*/
protected void sendError(WebSocketSession session, String error) {
try {
if (!session.isOpen()) {
return;
}
LogErrorResponse response = new LogErrorResponse(error);
Map<String, Object> data = new HashMap<>();
data.put("response", response);
LogWebSocketMessage msg = new LogWebSocketMessage(LogMessageType.ERROR, data);
// WebSocketSession.sendMessage()不是线程安全的必须加锁
synchronized (session) {
session.sendMessage(new TextMessage(JsonUtils.toJson(msg)));
}
} catch (IOException e) {
if (session.isOpen()) {
log.error("发送错误消息失败: logStreamSessionId={}", getSessionId(session), e);
}
}
protected long getUserSessionCount(Long userId) {
return sessions.values().stream()
.filter(s -> userId.equals(s.userId()))
.count();
}
}

View File

@ -1,13 +1,14 @@
package com.qqchen.deploy.backend.framework.websocket.log;
import com.qqchen.deploy.backend.deploy.enums.RuntimeTypeEnum;
import org.springframework.web.socket.WebSocketSession;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
/**
* 日志流策略接口
* 定义不同运行时类型的日志流获取方式
*
* <p>定义不同运行时类型的日志流获取方式
* Strategy 只负责连接管理和日志读取不涉及 WebSocket 协议
*
* @author Framework
* @since 2025-12-16
@ -16,31 +17,27 @@ public interface ILogStreamStrategy {
/**
* 支持的运行时类型
*
* @return 运行时类型
*/
RuntimeTypeEnum supportedType();
/**
* 执行日志流推送
* 此方法应该持续读取日志并通过callback推送到前端
*
* @param session WebSocket会话
* @param sessionId 会话ID Handler 生成
* @param target 日志流目标信息
* @param paused 暂停标志实现应定期检查此标志
* @param callback 日志行回调接口
* @param isPaused 暂停状态查询 Handler 控制
* @param callback 日志行回调
* @throws Exception 流式推送失败时抛出
*/
void streamLogs(WebSocketSession session,
void streamLogs(String sessionId,
LogStreamTarget target,
AtomicBoolean paused,
Supplier<Boolean> isPaused,
LogLineCallback callback) throws Exception;
/**
* 停止日志流并清理资源
* 当WebSocket连接关闭时调用确保SSH连接等资源被正确释放
*
* @param sessionId WebSocket会话ID
* @param sessionId 会话ID
*/
void stop(String sessionId);
@ -49,12 +46,6 @@ public interface ILogStreamStrategy {
*/
@FunctionalInterface
interface LogLineCallback {
/**
* 发送日志行
*
* @param timestamp 时间戳
* @param content 日志内容
*/
void sendLogLine(String timestamp, String content);
}
}

View File

@ -34,6 +34,17 @@ public class LogWebSocketMessage {
*/
private Map<String, Object> data;
/**
* 创建响应消息的工厂方法
*
* @param type 消息类型
* @param response 响应对象
* @return LogWebSocketMessage
*/
public static LogWebSocketMessage of(LogMessageType type, Object response) {
return new LogWebSocketMessage(type, Map.of("response", response));
}
/**
* 从data中提取request对象强类型
*