1.33 日志通用查询
This commit is contained in:
parent
3469c8ccb1
commit
beb2cd6544
@ -1,8 +1,6 @@
|
||||
package com.qqchen.deploy.backend.deploy.api;
|
||||
|
||||
import com.qqchen.deploy.backend.deploy.dto.TeamApplicationDTO;
|
||||
import com.qqchen.deploy.backend.deploy.dto.log.BaseLogQueryRequest;
|
||||
import com.qqchen.deploy.backend.deploy.dto.log.LogQueryResponse;
|
||||
import com.qqchen.deploy.backend.deploy.entity.TeamApplication;
|
||||
import com.qqchen.deploy.backend.deploy.query.TeamApplicationQuery;
|
||||
import com.qqchen.deploy.backend.deploy.service.ITeamApplicationService;
|
||||
@ -79,56 +77,101 @@ public class TeamApplicationApiController extends BaseController<TeamApplication
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询应用日志
|
||||
* 统一的日志查询接口,支持K8S/Docker/Server三种运行时类型
|
||||
* 根据请求中的runtimeType字段自动选择对应的日志查询策略
|
||||
* 查询应用的Pod/容器名称列表
|
||||
*
|
||||
* <p>注意:此接口不影响现有的K8s Deployment日志查询接口
|
||||
* (/api/v1/k8s-deployment/{deploymentId}/pods/{podName}/logs)
|
||||
* <p>用于WebSocket日志流前,获取可用的Pod/容器名称列表供用户选择
|
||||
*
|
||||
* <p>请求示例:
|
||||
* <pre>
|
||||
* // K8S日志查询
|
||||
* POST /api/v1/team-applications/123/logs
|
||||
* {
|
||||
* "runtimeType": "K8S",
|
||||
* "podName": "backend-xxx",
|
||||
* "container": "app",
|
||||
* "referenceTimestamp": "newest",
|
||||
* "direction": "next",
|
||||
* "logCount": 500
|
||||
* }
|
||||
*
|
||||
* // Docker日志查询
|
||||
* POST /api/v1/team-applications/123/logs
|
||||
* {
|
||||
* "runtimeType": "DOCKER",
|
||||
* "containerName": "my-container",
|
||||
* "lines": 100,
|
||||
* "since": "1h"
|
||||
* }
|
||||
*
|
||||
* // Server日志查询
|
||||
* POST /api/v1/team-applications/123/logs
|
||||
* {
|
||||
* "runtimeType": "SERVER",
|
||||
* "lines": 100
|
||||
* }
|
||||
* </pre>
|
||||
* @param teamAppId 团队应用ID
|
||||
* @return Pod/容器名称列表
|
||||
*/
|
||||
@Operation(
|
||||
summary = "查询应用日志",
|
||||
description = "统一的日志查询接口,支持K8S/Docker/Server三种运行时类型。\n\n" +
|
||||
"使用POST请求,请求体中包含runtimeType字段用于区分不同类型的参数。\n\n" +
|
||||
"返回格式统一,前端无需关心运行时类型差异。"
|
||||
summary = "查询Pod/容器名称列表",
|
||||
description = "根据团队应用ID查询可用的Pod/容器名称列表。\n\n" +
|
||||
"- K8S运行时:返回Deployment下的所有Pod名称\n" +
|
||||
"- Docker运行时:返回配置的容器名称\n" +
|
||||
"- Server运行时:返回空列表(不需要选择)"
|
||||
)
|
||||
@PostMapping("/{teamAppId}/logs")
|
||||
public Response<LogQueryResponse> queryLogs(
|
||||
@Parameter(description = "团队应用ID", required = true) @PathVariable Long teamAppId,
|
||||
@Parameter(description = "日志查询请求(根据runtimeType自动反序列化为对应的子类)", required = true)
|
||||
@Validated @RequestBody BaseLogQueryRequest request
|
||||
@GetMapping("/{teamAppId}/pod-names")
|
||||
public Response<List<String>> listPodNames(
|
||||
@Parameter(description = "团队应用ID", required = true) @PathVariable Long teamAppId
|
||||
) {
|
||||
return Response.success(teamApplicationService.queryLogs(teamAppId, request));
|
||||
return Response.success(teamApplicationService.listPodNames(teamAppId));
|
||||
}
|
||||
|
||||
/**
|
||||
* WebSocket实时日志流接口
|
||||
*
|
||||
* <p>端点:ws://host/api/v1/team-applications/{teamAppId}/logs/stream
|
||||
*
|
||||
* <p>连接建立后,前端需要发送START消息来启动日志流:
|
||||
* <pre>
|
||||
* {
|
||||
* "type": "START",
|
||||
* "data": {
|
||||
* "request": {
|
||||
* "name": "pod-name", // Pod名称(K8S必填)/容器名称(Docker可选)
|
||||
* "lines": 100 // 初始日志行数(可选,默认100)
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
* <p>服务端推送消息格式:
|
||||
* <pre>
|
||||
* // 日志行
|
||||
* {
|
||||
* "type": "LOG",
|
||||
* "data": {
|
||||
* "response": {
|
||||
* "timestamp": "2025-12-16T10:30:00.123Z",
|
||||
* "content": "Application started"
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
*
|
||||
* // 状态消息
|
||||
* {
|
||||
* "type": "STATUS",
|
||||
* "data": {
|
||||
* "response": {
|
||||
* "status": "STREAMING" // STREAMING | PAUSED | STOPPED | ERROR
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
*
|
||||
* // 错误消息
|
||||
* {
|
||||
* "type": "ERROR",
|
||||
* "data": {
|
||||
* "response": {
|
||||
* "error": "Connection failed"
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
* <p>前端可发送控制消息:
|
||||
* <pre>
|
||||
* {
|
||||
* "type": "CONTROL",
|
||||
* "data": {
|
||||
* "request": {
|
||||
* "action": "PAUSE" // PAUSE | RESUME | STOP
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
* <p>注意:此WebSocket接口替代了之前的REST API日志查询接口,提供真正的实时日志流。
|
||||
*/
|
||||
@Operation(
|
||||
summary = "WebSocket实时日志流",
|
||||
description = "通过WebSocket实时推送应用日志,支持K8S/Docker/Server三种运行时类型。\n\n" +
|
||||
"连接地址:ws://host/api/v1/team-applications/{teamAppId}/logs/stream\n\n" +
|
||||
"详细使用方式请参见方法注释。"
|
||||
)
|
||||
public void logStreamWebSocketEndpoint() {
|
||||
// 此方法仅用于API文档展示,实际WebSocket端点在WebSocketConfig中注册
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package com.qqchen.deploy.backend.deploy.config;
|
||||
|
||||
import com.qqchen.deploy.backend.deploy.handler.ServerSSHWebSocketHandler;
|
||||
import com.qqchen.deploy.backend.deploy.handler.TeamApplicationLogStreamWebSocketHandler;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
@ -19,6 +20,9 @@ public class WebSocketConfig implements WebSocketConfigurer {
|
||||
@Resource
|
||||
private ServerSSHWebSocketHandler serverSSHWebSocketHandler;
|
||||
|
||||
@Resource
|
||||
private TeamApplicationLogStreamWebSocketHandler teamApplicationLogStreamWebSocketHandler;
|
||||
|
||||
@Resource
|
||||
private WebSocketAuthInterceptor webSocketAuthInterceptor;
|
||||
|
||||
@ -30,5 +34,12 @@ public class WebSocketConfig implements WebSocketConfigurer {
|
||||
registry.addHandler(serverSSHWebSocketHandler, "/api/v1/server-ssh/connect/{serverId}")
|
||||
.addInterceptors(webSocketAuthInterceptor) // 添加认证拦截器
|
||||
.setAllowedOrigins("*"); // 生产环境建议配置具体的域名
|
||||
|
||||
log.info("注册WebSocket处理器: /api/v1/team-applications/{teamAppId}/logs/stream");
|
||||
|
||||
// 注册日志流WebSocket处理器(添加认证拦截器)
|
||||
registry.addHandler(teamApplicationLogStreamWebSocketHandler, "/api/v1/team-applications/{teamAppId}/logs/stream")
|
||||
.addInterceptors(webSocketAuthInterceptor) // 添加认证拦截器
|
||||
.setAllowedOrigins("*"); // 生产环境建议配置具体的域名
|
||||
}
|
||||
}
|
||||
|
||||
@ -2,6 +2,7 @@ package com.qqchen.deploy.backend.deploy.dto;
|
||||
|
||||
import com.qqchen.deploy.backend.deploy.enums.BuildTypeEnum;
|
||||
import com.qqchen.deploy.backend.deploy.enums.DevelopmentLanguageTypeEnum;
|
||||
import com.qqchen.deploy.backend.deploy.enums.RuntimeTypeEnum;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.Data;
|
||||
|
||||
@ -38,6 +39,9 @@ public class UserDeployableTeamEnvironmentApplicationDTO {
|
||||
@Schema(description = "构建类型(JENKINS-Jenkins构建,NATIVE-脚本部署)")
|
||||
private BuildTypeEnum buildType;
|
||||
|
||||
@Schema(description = "运行时类型(K8S-Kubernetes,DOCKER-Docker容器,SERVER-传统服务器)")
|
||||
private RuntimeTypeEnum runtimeType;
|
||||
|
||||
// ==================== 源Git配置(公司内部Git) ====================
|
||||
|
||||
@Schema(description = "源Git系统ID(公司Git)")
|
||||
|
||||
@ -1,30 +0,0 @@
|
||||
package com.qqchen.deploy.backend.deploy.dto.log;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import com.qqchen.deploy.backend.deploy.enums.RuntimeTypeEnum;
|
||||
|
||||
/**
|
||||
* 日志查询请求基类
|
||||
* 不同运行时类型有不同的查询参数,使用多态设计
|
||||
*
|
||||
* @author qqchen
|
||||
* @since 2025-12-16
|
||||
*/
|
||||
@JsonTypeInfo(
|
||||
use = JsonTypeInfo.Id.NAME,
|
||||
include = JsonTypeInfo.As.PROPERTY,
|
||||
property = "runtimeType"
|
||||
)
|
||||
@JsonSubTypes({
|
||||
@JsonSubTypes.Type(value = K8sLogQueryRequest.class, name = "K8S"),
|
||||
@JsonSubTypes.Type(value = DockerLogQueryRequest.class, name = "DOCKER"),
|
||||
@JsonSubTypes.Type(value = ServerLogQueryRequest.class, name = "SERVER")
|
||||
})
|
||||
public abstract class BaseLogQueryRequest {
|
||||
|
||||
/**
|
||||
* 获取运行时类型
|
||||
*/
|
||||
public abstract RuntimeTypeEnum getRuntimeType();
|
||||
}
|
||||
@ -1,49 +0,0 @@
|
||||
package com.qqchen.deploy.backend.deploy.dto.log;
|
||||
|
||||
import com.qqchen.deploy.backend.deploy.enums.RuntimeTypeEnum;
|
||||
import jakarta.validation.constraints.Positive;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
|
||||
/**
|
||||
* Docker日志查询请求
|
||||
*
|
||||
* @author qqchen
|
||||
* @since 2025-12-16
|
||||
*/
|
||||
@Data
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
public class DockerLogQueryRequest extends BaseLogQueryRequest {
|
||||
|
||||
/**
|
||||
* 运行时类型(用于Jackson反序列化)
|
||||
*/
|
||||
private final RuntimeTypeEnum runtimeType = RuntimeTypeEnum.DOCKER;
|
||||
|
||||
/**
|
||||
* 容器名称(可选,默认使用TeamApplication.dockerContainerName)
|
||||
*/
|
||||
private String containerName;
|
||||
|
||||
/**
|
||||
* 日志行数(可选,默认100)
|
||||
*/
|
||||
@Positive(message = "日志行数必须大于0")
|
||||
private Integer lines = 100;
|
||||
|
||||
/**
|
||||
* 时间范围(可选)
|
||||
* 例如:1h(1小时)、30m(30分钟)、2d(2天)
|
||||
*/
|
||||
private String since;
|
||||
|
||||
/**
|
||||
* 是否显示时间戳(可选,默认false)
|
||||
*/
|
||||
private Boolean timestamps = false;
|
||||
|
||||
@Override
|
||||
public RuntimeTypeEnum getRuntimeType() {
|
||||
return RuntimeTypeEnum.DOCKER;
|
||||
}
|
||||
}
|
||||
@ -1,62 +0,0 @@
|
||||
package com.qqchen.deploy.backend.deploy.dto.log;
|
||||
|
||||
import com.qqchen.deploy.backend.deploy.enums.RuntimeTypeEnum;
|
||||
import jakarta.validation.constraints.NotBlank;
|
||||
import jakarta.validation.constraints.Positive;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
|
||||
/**
|
||||
* K8S日志查询请求
|
||||
* 复用现有K8S日志查询的参数设计
|
||||
*
|
||||
* @author qqchen
|
||||
* @since 2025-12-16
|
||||
*/
|
||||
@Data
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
public class K8sLogQueryRequest extends BaseLogQueryRequest {
|
||||
|
||||
/**
|
||||
* 运行时类型(用于Jackson反序列化)
|
||||
*/
|
||||
private final RuntimeTypeEnum runtimeType = RuntimeTypeEnum.K8S;
|
||||
|
||||
/**
|
||||
* Pod名称(必填)
|
||||
*/
|
||||
@NotBlank(message = "Pod名称不能为空")
|
||||
private String podName;
|
||||
|
||||
/**
|
||||
* 容器名称(可选,默认第一个容器)
|
||||
*/
|
||||
private String container;
|
||||
|
||||
/**
|
||||
* 引用点时间戳(可选,默认newest)
|
||||
* 特殊值:
|
||||
* - "newest": 最新的日志行
|
||||
* - "oldest": 最早的日志行
|
||||
* - RFC3339时间戳:具体的时间点
|
||||
*/
|
||||
private String referenceTimestamp = "newest";
|
||||
|
||||
/**
|
||||
* 方向(可选,默认next)
|
||||
* - prev: 向上加载历史日志
|
||||
* - next: 向下加载新日志
|
||||
*/
|
||||
private String direction = "next";
|
||||
|
||||
/**
|
||||
* 每次加载的行数(可选,默认100)
|
||||
*/
|
||||
@Positive(message = "日志行数必须大于0")
|
||||
private Integer logCount = 100;
|
||||
|
||||
@Override
|
||||
public RuntimeTypeEnum getRuntimeType() {
|
||||
return RuntimeTypeEnum.K8S;
|
||||
}
|
||||
}
|
||||
@ -1,51 +0,0 @@
|
||||
package com.qqchen.deploy.backend.deploy.dto.log;
|
||||
|
||||
import com.qqchen.deploy.backend.deploy.enums.RuntimeTypeEnum;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 统一日志查询响应
|
||||
* 适用于K8S/Docker/Server等所有运行时类型
|
||||
*
|
||||
* @author qqchen
|
||||
* @since 2025-12-16
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Builder
|
||||
public class LogQueryResponse {
|
||||
|
||||
/**
|
||||
* 运行时类型
|
||||
*/
|
||||
private RuntimeTypeEnum runtimeType;
|
||||
|
||||
/**
|
||||
* 日志行列表
|
||||
*/
|
||||
private List<LogLine> logs;
|
||||
|
||||
/**
|
||||
* 是否还有更多日志
|
||||
*/
|
||||
private Boolean hasMore;
|
||||
|
||||
/**
|
||||
* 扩展元数据
|
||||
* 用于存放特定运行时类型的额外信息
|
||||
* 例如K8S的引用点信息:
|
||||
* - referenceForPrevious: K8sLogSelection对象
|
||||
* - referenceForNext: K8sLogSelection对象
|
||||
* - podName: Pod名称
|
||||
* - containerName: 容器名称
|
||||
* - truncated: 是否被截断
|
||||
*/
|
||||
private Map<String, Object> metadata;
|
||||
}
|
||||
@ -1,34 +0,0 @@
|
||||
package com.qqchen.deploy.backend.deploy.dto.log;
|
||||
|
||||
import com.qqchen.deploy.backend.deploy.enums.RuntimeTypeEnum;
|
||||
import jakarta.validation.constraints.Positive;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
|
||||
/**
|
||||
* Server日志查询请求
|
||||
*
|
||||
* @author qqchen
|
||||
* @since 2025-12-16
|
||||
*/
|
||||
@Data
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
public class ServerLogQueryRequest extends BaseLogQueryRequest {
|
||||
|
||||
/**
|
||||
* 运行时类型(用于Jackson反序列化)
|
||||
*/
|
||||
private final RuntimeTypeEnum runtimeType = RuntimeTypeEnum.SERVER;
|
||||
|
||||
/**
|
||||
* 日志行数(可选,默认100)
|
||||
* 将作为占位符{lines}替换到logQueryCommand中
|
||||
*/
|
||||
@Positive(message = "日志行数必须大于0")
|
||||
private Integer lines = 100;
|
||||
|
||||
@Override
|
||||
public RuntimeTypeEnum getRuntimeType() {
|
||||
return RuntimeTypeEnum.SERVER;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,217 @@
|
||||
package com.qqchen.deploy.backend.deploy.handler;
|
||||
|
||||
import com.qqchen.deploy.backend.deploy.entity.K8sNamespace;
|
||||
import com.qqchen.deploy.backend.deploy.entity.Server;
|
||||
import com.qqchen.deploy.backend.deploy.entity.TeamApplication;
|
||||
import com.qqchen.deploy.backend.deploy.enums.RuntimeTypeEnum;
|
||||
import com.qqchen.deploy.backend.deploy.repository.IK8sNamespaceRepository;
|
||||
import com.qqchen.deploy.backend.deploy.repository.IServerRepository;
|
||||
import com.qqchen.deploy.backend.deploy.repository.ITeamApplicationRepository;
|
||||
import com.qqchen.deploy.backend.deploy.strategy.log.DockerLogStreamStrategy;
|
||||
import com.qqchen.deploy.backend.deploy.strategy.log.K8sLogStreamStrategy;
|
||||
import com.qqchen.deploy.backend.deploy.strategy.log.ServerLogStreamStrategy;
|
||||
import com.qqchen.deploy.backend.framework.enums.ResponseCode;
|
||||
import com.qqchen.deploy.backend.framework.exception.BusinessException;
|
||||
import com.qqchen.deploy.backend.framework.websocket.log.AbstractLogStreamWebSocketHandler;
|
||||
import com.qqchen.deploy.backend.framework.websocket.log.ILogStreamStrategy;
|
||||
import com.qqchen.deploy.backend.framework.websocket.log.LogStreamTarget;
|
||||
import com.qqchen.deploy.backend.framework.websocket.log.request.LogStreamRequest;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.socket.WebSocketSession;
|
||||
|
||||
/**
|
||||
* 团队应用日志流WebSocket处理器
|
||||
* 统一的日志流入口,根据runtimeType分发到不同的策略
|
||||
*
|
||||
* @author qqchen
|
||||
* @since 2025-12-16
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class TeamApplicationLogStreamWebSocketHandler extends AbstractLogStreamWebSocketHandler {
|
||||
|
||||
@Resource
|
||||
private ITeamApplicationRepository teamApplicationRepository;
|
||||
|
||||
@Resource
|
||||
private IServerRepository serverRepository;
|
||||
|
||||
@Resource
|
||||
private IK8sNamespaceRepository k8sNamespaceRepository;
|
||||
|
||||
@Resource
|
||||
private K8sLogStreamStrategy k8sLogStreamStrategy;
|
||||
|
||||
@Resource
|
||||
private DockerLogStreamStrategy dockerLogStreamStrategy;
|
||||
|
||||
@Resource
|
||||
private ServerLogStreamStrategy serverLogStreamStrategy;
|
||||
|
||||
@Override
|
||||
protected LogStreamTarget getLogStreamTarget(WebSocketSession session, LogStreamRequest request) throws Exception {
|
||||
// 1. 从URL提取teamAppId
|
||||
Long teamAppId = extractTeamAppId(session);
|
||||
if (teamAppId == null) {
|
||||
throw new BusinessException(ResponseCode.INVALID_PARAM, new Object[]{"无效的团队应用ID"});
|
||||
}
|
||||
|
||||
// 2. 查询TeamApplication
|
||||
TeamApplication teamApp = teamApplicationRepository.findById(teamAppId)
|
||||
.orElseThrow(() -> new BusinessException(ResponseCode.TEAM_APP_NOT_FOUND));
|
||||
|
||||
// 3. 根据runtimeType构建LogStreamTarget
|
||||
RuntimeTypeEnum runtimeType = teamApp.getRuntimeType();
|
||||
if (runtimeType == null) {
|
||||
throw new BusinessException(ResponseCode.TEAM_APP_RUNTIME_TYPE_NOT_CONFIGURED);
|
||||
}
|
||||
|
||||
LogStreamTarget target = LogStreamTarget.builder()
|
||||
.runtimeType(runtimeType)
|
||||
.name(request.getName())
|
||||
.lines(request.getLines())
|
||||
.teamAppId(teamAppId)
|
||||
.build();
|
||||
|
||||
// 4. 根据不同的运行时类型填充特定字段
|
||||
switch (runtimeType) {
|
||||
case K8S:
|
||||
fillK8sTarget(target, teamApp);
|
||||
break;
|
||||
case DOCKER:
|
||||
fillDockerTarget(target, teamApp, request);
|
||||
break;
|
||||
case SERVER:
|
||||
fillServerTarget(target, teamApp);
|
||||
break;
|
||||
default:
|
||||
throw new BusinessException(ResponseCode.UNSUPPORTED_RUNTIME_TYPE,
|
||||
new Object[]{runtimeType});
|
||||
}
|
||||
|
||||
return target;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean checkPermission(Long userId, LogStreamTarget target) {
|
||||
// TODO: 实现权限验证
|
||||
// 检查用户是否有权限查看该团队应用的日志
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ILogStreamStrategy getLogStreamStrategy(LogStreamTarget target) {
|
||||
switch (target.getRuntimeType()) {
|
||||
case K8S:
|
||||
return k8sLogStreamStrategy;
|
||||
case DOCKER:
|
||||
return dockerLogStreamStrategy;
|
||||
case SERVER:
|
||||
return serverLogStreamStrategy;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 填充K8S目标信息
|
||||
*/
|
||||
private void fillK8sTarget(LogStreamTarget target, TeamApplication teamApp) {
|
||||
if (teamApp.getK8sSystemId() == null || teamApp.getK8sNamespaceId() == null) {
|
||||
throw new BusinessException(ResponseCode.TEAM_APP_K8S_CONFIG_INCOMPLETE);
|
||||
}
|
||||
|
||||
// K8S运行时必须指定Pod名称
|
||||
if (target.getName() == null || target.getName().isBlank()) {
|
||||
throw new BusinessException(ResponseCode.INVALID_PARAM,
|
||||
new Object[]{"K8S运行时必须指定Pod名称(name字段)"});
|
||||
}
|
||||
|
||||
// 查询K8sNamespace获取实际的namespace名称
|
||||
K8sNamespace k8sNamespace =
|
||||
k8sNamespaceRepository.findById(teamApp.getK8sNamespaceId())
|
||||
.orElseThrow(() -> new BusinessException(ResponseCode.K8S_NAMESPACE_NOT_FOUND));
|
||||
|
||||
target.setK8sSystemId(teamApp.getK8sSystemId());
|
||||
target.setK8sNamespace(k8sNamespace.getNamespaceName());
|
||||
target.setK8sDeploymentId(teamApp.getK8sDeploymentId());
|
||||
}
|
||||
|
||||
/**
|
||||
* 填充Docker目标信息
|
||||
*/
|
||||
private void fillDockerTarget(LogStreamTarget target, TeamApplication teamApp, LogStreamRequest request) {
|
||||
if (teamApp.getDockerServerId() == null) {
|
||||
throw new BusinessException(ResponseCode.TEAM_APP_DOCKER_CONFIG_INCOMPLETE);
|
||||
}
|
||||
|
||||
// 查询Docker服务器信息
|
||||
Server server = serverRepository.findById(teamApp.getDockerServerId())
|
||||
.orElseThrow(() -> new BusinessException(ResponseCode.DOCKER_SERVER_NOT_FOUND));
|
||||
|
||||
// 填充SSH连接信息
|
||||
target.setServerId(server.getId());
|
||||
target.setHost(server.getHostIp());
|
||||
target.setPort(server.getSshPort());
|
||||
target.setUsername(server.getSshUser());
|
||||
target.setAuthType(server.getAuthType());
|
||||
target.setPassword(server.getSshPassword());
|
||||
target.setPrivateKey(server.getSshPrivateKey());
|
||||
target.setPassphrase(server.getSshPassphrase());
|
||||
target.setOsType(server.getOsType());
|
||||
|
||||
// 容器名称:优先使用请求中的name,否则使用配置的默认值
|
||||
if (target.getName() == null || target.getName().isBlank()) {
|
||||
target.setName(teamApp.getDockerContainerName());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 填充Server目标信息
|
||||
*/
|
||||
private void fillServerTarget(LogStreamTarget target, TeamApplication teamApp) {
|
||||
if (teamApp.getServerId() == null) {
|
||||
throw new BusinessException(ResponseCode.TEAM_APP_SERVER_CONFIG_INCOMPLETE);
|
||||
}
|
||||
|
||||
// 查询服务器信息
|
||||
Server server = serverRepository.findById(teamApp.getServerId())
|
||||
.orElseThrow(() -> new BusinessException(ResponseCode.SERVER_NOT_FOUND));
|
||||
|
||||
// 填充SSH连接信息
|
||||
target.setServerId(server.getId());
|
||||
target.setHost(server.getHostIp());
|
||||
target.setPort(server.getSshPort());
|
||||
target.setUsername(server.getSshUser());
|
||||
target.setAuthType(server.getAuthType());
|
||||
target.setPassword(server.getSshPassword());
|
||||
target.setPrivateKey(server.getSshPrivateKey());
|
||||
target.setPassphrase(server.getSshPassphrase());
|
||||
target.setOsType(server.getOsType());
|
||||
|
||||
// 日志文件路径:从TeamApplication配置中获取
|
||||
if (teamApp.getLogQueryCommand() == null || teamApp.getLogQueryCommand().isBlank()) {
|
||||
throw new BusinessException(ResponseCode.TEAM_APP_LOG_COMMAND_NOT_CONFIGURED);
|
||||
}
|
||||
target.setLogFilePath(teamApp.getLogQueryCommand());
|
||||
}
|
||||
|
||||
/**
|
||||
* 从WebSocket session URL中提取teamAppId
|
||||
*/
|
||||
private Long extractTeamAppId(WebSocketSession session) {
|
||||
try {
|
||||
String path = session.getUri().getPath();
|
||||
// /api/v1/team-applications/{teamAppId}/logs/stream
|
||||
String[] parts = path.split("/");
|
||||
if (parts.length >= 5) {
|
||||
return Long.parseLong(parts[4]);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("提取teamAppId失败", e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@ -241,6 +241,15 @@ public interface IK8sServiceIntegration extends IExternalSystemIntegration {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取K8S ApiClient(带缓存)
|
||||
* 用于直接调用Kubernetes Java Client API
|
||||
*
|
||||
* @param system K8S系统配置
|
||||
* @return ApiClient实例
|
||||
*/
|
||||
io.kubernetes.client.openapi.ApiClient getApiClient(ExternalSystem system);
|
||||
|
||||
/**
|
||||
* 获取系统类型
|
||||
*
|
||||
|
||||
@ -1164,4 +1164,17 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp
|
||||
containerInfo.setMemoryLimit(memory.toSuffixedString());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取K8S ApiClient(带缓存)
|
||||
* 复用内部缓存机制,避免重复创建连接
|
||||
*
|
||||
* @param system K8S系统配置
|
||||
* @return ApiClient实例
|
||||
*/
|
||||
@Override
|
||||
public ApiClient getApiClient(ExternalSystem system) {
|
||||
K8sApiClientCache cache = getApiClientCache(system);
|
||||
return cache.apiClient;
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,22 +1,21 @@
|
||||
package com.qqchen.deploy.backend.deploy.service;
|
||||
|
||||
import com.qqchen.deploy.backend.deploy.dto.TeamApplicationDTO;
|
||||
import com.qqchen.deploy.backend.deploy.dto.log.BaseLogQueryRequest;
|
||||
import com.qqchen.deploy.backend.deploy.dto.log.LogQueryResponse;
|
||||
import com.qqchen.deploy.backend.deploy.entity.TeamApplication;
|
||||
import com.qqchen.deploy.backend.deploy.query.TeamApplicationQuery;
|
||||
import com.qqchen.deploy.backend.framework.service.IBaseService;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface ITeamApplicationService extends IBaseService<TeamApplication, TeamApplicationDTO, TeamApplicationQuery, Long> {
|
||||
|
||||
/**
|
||||
* 查询应用日志
|
||||
* 统一的日志查询接口,支持K8S/Docker/Server三种运行时类型
|
||||
*
|
||||
* 查询应用的Pod/容器名称列表
|
||||
* 用于前端下拉选择,仅返回名称列表
|
||||
*
|
||||
* @param teamAppId 团队应用ID
|
||||
* @param request 日志查询请求(多态,根据runtimeType自动反序列化为具体子类)
|
||||
* @return 统一的日志查询响应
|
||||
* @return Pod/容器名称列表
|
||||
*/
|
||||
LogQueryResponse queryLogs(Long teamAppId, BaseLogQueryRequest request);
|
||||
List<String> listPodNames(Long teamAppId);
|
||||
}
|
||||
|
||||
|
||||
@ -597,6 +597,7 @@ public class DeployServiceImpl implements IDeployService {
|
||||
dto.setApplicationDesc(app.getAppDesc());
|
||||
dto.setLanguage(app.getLanguage());
|
||||
dto.setBuildType(teamApp.getBuildType());
|
||||
dto.setRuntimeType(teamApp.getRuntimeType());
|
||||
|
||||
// 设置源Git配置
|
||||
dto.setSourceGitSystemId(teamApp.getSourceGitSystemId());
|
||||
|
||||
@ -2,8 +2,6 @@ package com.qqchen.deploy.backend.deploy.service.impl;
|
||||
|
||||
import com.qqchen.deploy.backend.deploy.converter.TeamApplicationConverter;
|
||||
import com.qqchen.deploy.backend.deploy.dto.TeamApplicationDTO;
|
||||
import com.qqchen.deploy.backend.deploy.dto.log.BaseLogQueryRequest;
|
||||
import com.qqchen.deploy.backend.deploy.dto.log.LogQueryResponse;
|
||||
import com.qqchen.deploy.backend.deploy.entity.Application;
|
||||
import com.qqchen.deploy.backend.deploy.entity.DeployRecord;
|
||||
import com.qqchen.deploy.backend.deploy.entity.Environment;
|
||||
@ -12,6 +10,7 @@ import com.qqchen.deploy.backend.deploy.entity.TeamApplication;
|
||||
import com.qqchen.deploy.backend.deploy.entity.K8sNamespace;
|
||||
import com.qqchen.deploy.backend.deploy.entity.K8sDeployment;
|
||||
import com.qqchen.deploy.backend.deploy.entity.Server;
|
||||
import com.qqchen.deploy.backend.deploy.enums.RuntimeTypeEnum;
|
||||
import com.qqchen.deploy.backend.deploy.enums.BuildTypeEnum;
|
||||
import com.qqchen.deploy.backend.deploy.query.TeamApplicationQuery;
|
||||
import com.qqchen.deploy.backend.deploy.entity.ExternalSystem;
|
||||
@ -26,9 +25,9 @@ import com.qqchen.deploy.backend.deploy.repository.ITeamRepository;
|
||||
import com.qqchen.deploy.backend.deploy.repository.IK8sNamespaceRepository;
|
||||
import com.qqchen.deploy.backend.deploy.repository.IK8sDeploymentRepository;
|
||||
import com.qqchen.deploy.backend.deploy.repository.IServerRepository;
|
||||
import com.qqchen.deploy.backend.deploy.service.IK8sPodService;
|
||||
import com.qqchen.deploy.backend.deploy.integration.response.K8sPodResponse;
|
||||
import com.qqchen.deploy.backend.deploy.service.ITeamApplicationService;
|
||||
import com.qqchen.deploy.backend.deploy.strategy.ILogQueryStrategy;
|
||||
import com.qqchen.deploy.backend.deploy.strategy.LogQueryStrategyFactory;
|
||||
import com.qqchen.deploy.backend.framework.enums.ResponseCode;
|
||||
import com.qqchen.deploy.backend.framework.exception.BusinessException;
|
||||
import com.qqchen.deploy.backend.framework.service.impl.BaseServiceImpl;
|
||||
@ -75,9 +74,6 @@ public class TeamApplicationServiceImpl extends BaseServiceImpl<TeamApplication,
|
||||
@Resource
|
||||
private TeamApplicationConverter teamApplicationConverter;
|
||||
|
||||
@Resource
|
||||
private LogQueryStrategyFactory logQueryStrategyFactory;
|
||||
|
||||
@Resource
|
||||
private IK8sNamespaceRepository k8sNamespaceRepository;
|
||||
|
||||
@ -87,6 +83,9 @@ public class TeamApplicationServiceImpl extends BaseServiceImpl<TeamApplication,
|
||||
@Resource
|
||||
private IServerRepository serverRepository;
|
||||
|
||||
@Resource
|
||||
private IK8sPodService k8sPodService;
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public TeamApplicationDTO create(TeamApplicationDTO dto) {
|
||||
@ -94,27 +93,31 @@ public class TeamApplicationServiceImpl extends BaseServiceImpl<TeamApplication,
|
||||
}
|
||||
|
||||
@Override
|
||||
public LogQueryResponse queryLogs(Long teamAppId, BaseLogQueryRequest request) {
|
||||
log.info("查询应用日志,teamAppId: {}, runtimeType: {}", teamAppId, request.getRuntimeType());
|
||||
public List<String> listPodNames(Long teamAppId) {
|
||||
log.info("查询应用的Pod名称列表,teamAppId: {}", teamAppId);
|
||||
|
||||
// 1. 查询团队应用
|
||||
TeamApplication teamApp = teamApplicationRepository.findById(teamAppId)
|
||||
.orElseThrow(() -> new BusinessException(ResponseCode.TEAM_APPLICATION_NOT_FOUND));
|
||||
|
||||
// 2. 校验运行时类型是否匹配
|
||||
if (teamApp.getRuntimeType() != request.getRuntimeType()) {
|
||||
throw new BusinessException(ResponseCode.TEAM_APP_RUNTIME_TYPE_MISMATCH,
|
||||
new Object[]{teamApp.getRuntimeType(), request.getRuntimeType()});
|
||||
// 2. 只有K8S类型才有Pod概念,其他类型返回空列表
|
||||
if (teamApp.getRuntimeType() != RuntimeTypeEnum.K8S) {
|
||||
log.debug("非K8S类型应用,返回空Pod列表,runtimeType: {}", teamApp.getRuntimeType());
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
// 3. 获取对应的日志查询策略
|
||||
ILogQueryStrategy strategy = logQueryStrategyFactory.getStrategy(request.getRuntimeType());
|
||||
// 3. K8S类型:查询Pod列表
|
||||
if (teamApp.getK8sDeploymentId() == null) {
|
||||
throw new BusinessException(ResponseCode.TEAM_APP_K8S_CONFIG_INCOMPLETE);
|
||||
}
|
||||
|
||||
// 4. 执行日志查询
|
||||
LogQueryResponse response = strategy.queryLogs(teamApp, request);
|
||||
List<K8sPodResponse> pods = k8sPodService.listPodsByDeployment(teamApp.getK8sDeploymentId());
|
||||
List<String> podNames = pods.stream()
|
||||
.map(K8sPodResponse::getName)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
log.info("日志查询完成,返回 {} 条日志", response.getLogs() != null ? response.getLogs().size() : 0);
|
||||
return response;
|
||||
log.info("查询到 {} 个Pod", podNames.size());
|
||||
return podNames;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -1,32 +0,0 @@
|
||||
package com.qqchen.deploy.backend.deploy.strategy;
|
||||
|
||||
import com.qqchen.deploy.backend.deploy.dto.log.BaseLogQueryRequest;
|
||||
import com.qqchen.deploy.backend.deploy.dto.log.LogQueryResponse;
|
||||
import com.qqchen.deploy.backend.deploy.entity.TeamApplication;
|
||||
import com.qqchen.deploy.backend.deploy.enums.RuntimeTypeEnum;
|
||||
|
||||
/**
|
||||
* 日志查询策略接口
|
||||
* 使用策略模式支持不同运行时类型的日志查询
|
||||
*
|
||||
* @author qqchen
|
||||
* @since 2025-12-16
|
||||
*/
|
||||
public interface ILogQueryStrategy {
|
||||
|
||||
/**
|
||||
* 支持的运行时类型
|
||||
*
|
||||
* @return 运行时类型枚举
|
||||
*/
|
||||
RuntimeTypeEnum supportedType();
|
||||
|
||||
/**
|
||||
* 查询日志
|
||||
*
|
||||
* @param teamApp 团队应用实体
|
||||
* @param request 日志查询请求
|
||||
* @return 统一的日志查询响应
|
||||
*/
|
||||
LogQueryResponse queryLogs(TeamApplication teamApp, BaseLogQueryRequest request);
|
||||
}
|
||||
@ -1,59 +0,0 @@
|
||||
package com.qqchen.deploy.backend.deploy.strategy;
|
||||
|
||||
import com.qqchen.deploy.backend.deploy.enums.RuntimeTypeEnum;
|
||||
import com.qqchen.deploy.backend.framework.enums.ResponseCode;
|
||||
import com.qqchen.deploy.backend.framework.exception.BusinessException;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 日志查询策略工厂
|
||||
* 根据运行时类型选择对应的策略实现
|
||||
*
|
||||
* @author qqchen
|
||||
* @since 2025-12-16
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class LogQueryStrategyFactory {
|
||||
|
||||
@Resource
|
||||
private List<ILogQueryStrategy> strategies;
|
||||
|
||||
private Map<RuntimeTypeEnum, ILogQueryStrategy> strategyMap;
|
||||
|
||||
/**
|
||||
* 初始化策略映射
|
||||
*/
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
strategyMap = new HashMap<>();
|
||||
for (ILogQueryStrategy strategy : strategies) {
|
||||
strategyMap.put(strategy.supportedType(), strategy);
|
||||
log.info("注册日志查询策略: {} -> {}",
|
||||
strategy.supportedType(), strategy.getClass().getSimpleName());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取日志查询策略
|
||||
*
|
||||
* @param runtimeType 运行时类型
|
||||
* @return 对应的策略实现
|
||||
* @throws BusinessException 如果找不到对应的策略
|
||||
*/
|
||||
public ILogQueryStrategy getStrategy(RuntimeTypeEnum runtimeType) {
|
||||
ILogQueryStrategy strategy = strategyMap.get(runtimeType);
|
||||
if (strategy == null) {
|
||||
throw new BusinessException(ResponseCode.LOG_QUERY_STRATEGY_NOT_FOUND,
|
||||
new Object[]{runtimeType});
|
||||
}
|
||||
return strategy;
|
||||
}
|
||||
}
|
||||
@ -1,57 +0,0 @@
|
||||
package com.qqchen.deploy.backend.deploy.strategy.impl;
|
||||
|
||||
import com.qqchen.deploy.backend.deploy.dto.log.BaseLogQueryRequest;
|
||||
import com.qqchen.deploy.backend.deploy.dto.log.DockerLogQueryRequest;
|
||||
import com.qqchen.deploy.backend.deploy.dto.log.LogQueryResponse;
|
||||
import com.qqchen.deploy.backend.deploy.entity.TeamApplication;
|
||||
import com.qqchen.deploy.backend.deploy.enums.RuntimeTypeEnum;
|
||||
import com.qqchen.deploy.backend.deploy.strategy.ILogQueryStrategy;
|
||||
import com.qqchen.deploy.backend.framework.enums.ResponseCode;
|
||||
import com.qqchen.deploy.backend.framework.exception.BusinessException;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* Docker日志查询策略(骨架实现)
|
||||
*
|
||||
* @author qqchen
|
||||
* @since 2025-12-16
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class DockerLogQueryStrategy implements ILogQueryStrategy {
|
||||
|
||||
@Override
|
||||
public RuntimeTypeEnum supportedType() {
|
||||
return RuntimeTypeEnum.DOCKER;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LogQueryResponse queryLogs(TeamApplication teamApp, BaseLogQueryRequest request) {
|
||||
// 类型转换
|
||||
DockerLogQueryRequest dockerRequest = (DockerLogQueryRequest) request;
|
||||
|
||||
// 校验Docker配置
|
||||
validateDockerConfig(teamApp);
|
||||
|
||||
// TODO: 实现Docker日志查询逻辑
|
||||
// 1. 获取Docker服务器连接信息
|
||||
// 2. 执行docker logs命令
|
||||
// 3. 解析日志输出
|
||||
// 4. 转换为统一格式
|
||||
|
||||
throw new BusinessException(ResponseCode.FEATURE_NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
/**
|
||||
* 校验Docker配置是否完整
|
||||
*/
|
||||
private void validateDockerConfig(TeamApplication teamApp) {
|
||||
if (teamApp.getDockerServerId() == null) {
|
||||
throw new BusinessException(ResponseCode.TEAM_APP_DOCKER_CONFIG_INCOMPLETE);
|
||||
}
|
||||
if (teamApp.getDockerContainerName() == null || teamApp.getDockerContainerName().isBlank()) {
|
||||
throw new BusinessException(ResponseCode.TEAM_APP_DOCKER_CONFIG_INCOMPLETE);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,115 +0,0 @@
|
||||
package com.qqchen.deploy.backend.deploy.strategy.impl;
|
||||
|
||||
import com.qqchen.deploy.backend.deploy.dto.K8sLogLine;
|
||||
import com.qqchen.deploy.backend.deploy.dto.K8sPodLogsResponse;
|
||||
import com.qqchen.deploy.backend.deploy.dto.log.BaseLogQueryRequest;
|
||||
import com.qqchen.deploy.backend.deploy.dto.log.K8sLogQueryRequest;
|
||||
import com.qqchen.deploy.backend.deploy.dto.log.LogLine;
|
||||
import com.qqchen.deploy.backend.deploy.dto.log.LogQueryResponse;
|
||||
import com.qqchen.deploy.backend.deploy.entity.TeamApplication;
|
||||
import com.qqchen.deploy.backend.deploy.enums.RuntimeTypeEnum;
|
||||
import com.qqchen.deploy.backend.deploy.service.IK8sPodService;
|
||||
import com.qqchen.deploy.backend.deploy.strategy.ILogQueryStrategy;
|
||||
import com.qqchen.deploy.backend.framework.enums.ResponseCode;
|
||||
import com.qqchen.deploy.backend.framework.exception.BusinessException;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* K8S日志查询策略
|
||||
* 复用现有的IK8sPodService实现
|
||||
*
|
||||
* @author qqchen
|
||||
* @since 2025-12-16
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class K8sLogQueryStrategy implements ILogQueryStrategy {
|
||||
|
||||
@Resource
|
||||
private IK8sPodService k8sPodService;
|
||||
|
||||
@Override
|
||||
public RuntimeTypeEnum supportedType() {
|
||||
return RuntimeTypeEnum.K8S;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LogQueryResponse queryLogs(TeamApplication teamApp, BaseLogQueryRequest request) {
|
||||
// 类型转换
|
||||
K8sLogQueryRequest k8sRequest = (K8sLogQueryRequest) request;
|
||||
|
||||
// 校验K8S配置
|
||||
validateK8sConfig(teamApp);
|
||||
|
||||
// 直接使用K8S Deployment ID
|
||||
Long deploymentId = teamApp.getK8sDeploymentId();
|
||||
|
||||
// 调用现有的K8S日志查询服务
|
||||
K8sPodLogsResponse k8sResponse = k8sPodService.getPodLogsWithReference(
|
||||
deploymentId,
|
||||
k8sRequest.getPodName(),
|
||||
k8sRequest.getContainer(),
|
||||
k8sRequest.getReferenceTimestamp(),
|
||||
k8sRequest.getDirection(),
|
||||
k8sRequest.getLogCount()
|
||||
);
|
||||
|
||||
// 转换为统一格式
|
||||
return convertToLogQueryResponse(k8sResponse);
|
||||
}
|
||||
|
||||
/**
|
||||
* 校验K8S配置是否完整
|
||||
*/
|
||||
private void validateK8sConfig(TeamApplication teamApp) {
|
||||
if (teamApp.getK8sSystemId() == null) {
|
||||
throw new BusinessException(ResponseCode.TEAM_APP_K8S_CONFIG_INCOMPLETE);
|
||||
}
|
||||
if (teamApp.getK8sNamespaceId() == null) {
|
||||
throw new BusinessException(ResponseCode.TEAM_APP_K8S_CONFIG_INCOMPLETE);
|
||||
}
|
||||
if (teamApp.getK8sDeploymentId() == null) {
|
||||
throw new BusinessException(ResponseCode.TEAM_APP_K8S_CONFIG_INCOMPLETE);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 将K8sPodLogsResponse转换为统一的LogQueryResponse
|
||||
*/
|
||||
private LogQueryResponse convertToLogQueryResponse(K8sPodLogsResponse k8sResponse) {
|
||||
// 转换日志行
|
||||
List<LogLine> logs = k8sResponse.getLogs().stream()
|
||||
.map(this::convertLogLine)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
// 构建元数据
|
||||
Map<String, Object> metadata = new HashMap<>();
|
||||
metadata.put("podName", k8sResponse.getPodName());
|
||||
metadata.put("containerName", k8sResponse.getContainerName());
|
||||
metadata.put("referenceForPrevious", k8sResponse.getReferenceForPrevious());
|
||||
metadata.put("referenceForNext", k8sResponse.getReferenceForNext());
|
||||
metadata.put("truncated", k8sResponse.getTruncated());
|
||||
|
||||
// 构建响应
|
||||
return LogQueryResponse.builder()
|
||||
.runtimeType(RuntimeTypeEnum.K8S)
|
||||
.logs(logs)
|
||||
.hasMore(!Boolean.TRUE.equals(k8sResponse.getTruncated()))
|
||||
.metadata(metadata)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* 转换单个日志行
|
||||
*/
|
||||
private LogLine convertLogLine(K8sLogLine k8sLogLine) {
|
||||
return new LogLine(k8sLogLine.getTimestamp(), k8sLogLine.getContent());
|
||||
}
|
||||
}
|
||||
@ -1,59 +0,0 @@
|
||||
package com.qqchen.deploy.backend.deploy.strategy.impl;
|
||||
|
||||
import com.qqchen.deploy.backend.deploy.dto.log.BaseLogQueryRequest;
|
||||
import com.qqchen.deploy.backend.deploy.dto.log.LogQueryResponse;
|
||||
import com.qqchen.deploy.backend.deploy.dto.log.ServerLogQueryRequest;
|
||||
import com.qqchen.deploy.backend.deploy.entity.TeamApplication;
|
||||
import com.qqchen.deploy.backend.deploy.enums.RuntimeTypeEnum;
|
||||
import com.qqchen.deploy.backend.deploy.strategy.ILogQueryStrategy;
|
||||
import com.qqchen.deploy.backend.framework.enums.ResponseCode;
|
||||
import com.qqchen.deploy.backend.framework.exception.BusinessException;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* Server日志查询策略(骨架实现)
|
||||
* TODO: 待后续实现时注入ISSHCommandService
|
||||
*
|
||||
* @author qqchen
|
||||
* @since 2025-12-16
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class ServerLogQueryStrategy implements ILogQueryStrategy {
|
||||
|
||||
@Override
|
||||
public RuntimeTypeEnum supportedType() {
|
||||
return RuntimeTypeEnum.SERVER;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LogQueryResponse queryLogs(TeamApplication teamApp, BaseLogQueryRequest request) {
|
||||
// 类型转换
|
||||
ServerLogQueryRequest serverRequest = (ServerLogQueryRequest) request;
|
||||
|
||||
// 校验Server配置
|
||||
validateServerConfig(teamApp);
|
||||
|
||||
// TODO: 实现Server日志查询逻辑
|
||||
// 1. 获取服务器连接信息
|
||||
// 2. 替换logQueryCommand中的占位符(如{lines})
|
||||
// 3. 通过SSH执行命令
|
||||
// 4. 解析命令输出
|
||||
// 5. 转换为统一格式
|
||||
|
||||
throw new BusinessException(ResponseCode.FEATURE_NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
/**
|
||||
* 校验Server配置是否完整
|
||||
*/
|
||||
private void validateServerConfig(TeamApplication teamApp) {
|
||||
if (teamApp.getServerId() == null) {
|
||||
throw new BusinessException(ResponseCode.TEAM_APP_SERVER_CONFIG_INCOMPLETE);
|
||||
}
|
||||
if (teamApp.getLogQueryCommand() == null || teamApp.getLogQueryCommand().isBlank()) {
|
||||
throw new BusinessException(ResponseCode.TEAM_APP_SERVER_CONFIG_INCOMPLETE);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,151 @@
|
||||
package com.qqchen.deploy.backend.deploy.strategy.log;
|
||||
|
||||
import com.qqchen.deploy.backend.deploy.enums.RuntimeTypeEnum;
|
||||
import com.qqchen.deploy.backend.framework.ssh.ISSHCommandService;
|
||||
import com.qqchen.deploy.backend.framework.ssh.SSHCommandServiceFactory;
|
||||
import com.qqchen.deploy.backend.framework.websocket.log.ILogStreamStrategy;
|
||||
import com.qqchen.deploy.backend.framework.websocket.log.LogStreamTarget;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import net.schmizz.sshj.SSHClient;
|
||||
import net.schmizz.sshj.connection.channel.direct.Session;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.socket.WebSocketSession;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.InputStreamReader;
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* Docker日志流策略
|
||||
* 通过SSH连接执行docker logs -f命令获取日志流
|
||||
*
|
||||
* @author qqchen
|
||||
* @since 2025-12-16
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class DockerLogStreamStrategy implements ILogStreamStrategy {
|
||||
|
||||
@Resource
|
||||
private SSHCommandServiceFactory sshCommandServiceFactory;
|
||||
|
||||
/**
|
||||
* SSH连接存储:sessionId → SSHClient
|
||||
*/
|
||||
private final Map<String, SSHClient> sshClients = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* SSH会话存储:sessionId → Session
|
||||
*/
|
||||
private final Map<String, Session> sshSessions = new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
public RuntimeTypeEnum supportedType() {
|
||||
return RuntimeTypeEnum.DOCKER;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void streamLogs(WebSocketSession session,
|
||||
LogStreamTarget target,
|
||||
AtomicBoolean paused,
|
||||
LogLineCallback callback) throws Exception {
|
||||
|
||||
String sessionId = session.getId();
|
||||
log.info("开始Docker日志流: sessionId={}, container={}, host={}",
|
||||
sessionId, target.getName(), target.getHost());
|
||||
|
||||
SSHClient sshClient = null;
|
||||
Session sshSession = null;
|
||||
|
||||
try {
|
||||
// 1. 建立SSH连接
|
||||
ISSHCommandService sshService = sshCommandServiceFactory.getService(target.getOsType());
|
||||
sshClient = sshService.createConnection(
|
||||
target.getHost(),
|
||||
target.getPort(),
|
||||
target.getUsername(),
|
||||
target.getPassword(),
|
||||
target.getPrivateKey(),
|
||||
target.getPassphrase()
|
||||
);
|
||||
|
||||
// 保存SSH连接,用于后续清理
|
||||
sshClients.put(sessionId, sshClient);
|
||||
|
||||
// 2. 构建docker logs命令
|
||||
String command = String.format("docker logs -f %s --tail %d",
|
||||
target.getName(), target.getLines());
|
||||
|
||||
log.debug("执行Docker日志命令: {}", command);
|
||||
|
||||
// 3. 执行命令
|
||||
sshSession = sshClient.startSession();
|
||||
sshSessions.put(sessionId, sshSession);
|
||||
|
||||
Session.Command cmd = sshSession.exec(command);
|
||||
|
||||
// 4. 持续读取输出流
|
||||
try (BufferedReader reader = new BufferedReader(
|
||||
new InputStreamReader(cmd.getInputStream()))) {
|
||||
|
||||
String line;
|
||||
while (session.isOpen() && (line = reader.readLine()) != null) {
|
||||
// 检查暂停标志
|
||||
if (paused.get()) {
|
||||
Thread.sleep(100);
|
||||
continue;
|
||||
}
|
||||
|
||||
// 推送日志行(Docker日志没有时间戳,使用当前时间)
|
||||
callback.sendLogLine(Instant.now().toString(), line);
|
||||
}
|
||||
}
|
||||
|
||||
log.info("Docker日志流正常结束: sessionId={}", sessionId);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("Docker日志流异常: sessionId={}", sessionId, e);
|
||||
throw e;
|
||||
} finally {
|
||||
// 清理资源(正常结束时)
|
||||
cleanupResources(sessionId, sshSession, sshClient);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop(String sessionId) {
|
||||
log.info("停止Docker日志流并清理资源: sessionId={}", sessionId);
|
||||
|
||||
Session sshSession = sshSessions.remove(sessionId);
|
||||
SSHClient sshClient = sshClients.remove(sessionId);
|
||||
|
||||
cleanupResources(sessionId, sshSession, sshClient);
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理SSH资源
|
||||
*/
|
||||
private void cleanupResources(String sessionId, Session sshSession, SSHClient sshClient) {
|
||||
if (sshSession != null) {
|
||||
try {
|
||||
sshSession.close();
|
||||
log.debug("SSH Session已关闭: sessionId={}", sessionId);
|
||||
} catch (Exception e) {
|
||||
log.error("关闭SSH Session失败: sessionId={}", sessionId, e);
|
||||
}
|
||||
}
|
||||
|
||||
if (sshClient != null) {
|
||||
try {
|
||||
sshClient.disconnect();
|
||||
log.debug("SSH Client已断开: sessionId={}", sessionId);
|
||||
} catch (Exception e) {
|
||||
log.error("断开SSH Client失败: sessionId={}", sessionId, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,119 @@
|
||||
package com.qqchen.deploy.backend.deploy.strategy.log;
|
||||
|
||||
import com.qqchen.deploy.backend.deploy.entity.ExternalSystem;
|
||||
import com.qqchen.deploy.backend.deploy.enums.RuntimeTypeEnum;
|
||||
import com.qqchen.deploy.backend.deploy.integration.IK8sServiceIntegration;
|
||||
import com.qqchen.deploy.backend.deploy.repository.IExternalSystemRepository;
|
||||
import com.qqchen.deploy.backend.framework.enums.ResponseCode;
|
||||
import com.qqchen.deploy.backend.framework.exception.BusinessException;
|
||||
import com.qqchen.deploy.backend.framework.websocket.log.ILogStreamStrategy;
|
||||
import com.qqchen.deploy.backend.framework.websocket.log.LogStreamTarget;
|
||||
import io.kubernetes.client.openapi.ApiClient;
|
||||
import io.kubernetes.client.openapi.apis.CoreV1Api;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import okhttp3.Call;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.socket.WebSocketSession;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.time.Instant;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* K8S日志流策略
|
||||
* 使用Kubernetes Java Client API获取Pod日志流
|
||||
*
|
||||
* @author qqchen
|
||||
* @since 2025-12-16
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class K8sLogStreamStrategy implements ILogStreamStrategy {
|
||||
|
||||
@Resource
|
||||
private IK8sServiceIntegration k8sServiceIntegration;
|
||||
|
||||
@Resource
|
||||
private IExternalSystemRepository externalSystemRepository;
|
||||
|
||||
@Override
|
||||
public RuntimeTypeEnum supportedType() {
|
||||
return RuntimeTypeEnum.K8S;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void streamLogs(WebSocketSession session,
|
||||
LogStreamTarget target,
|
||||
AtomicBoolean paused,
|
||||
LogLineCallback callback) throws Exception {
|
||||
|
||||
String sessionId = session.getId();
|
||||
log.info("开始K8S日志流: sessionId={}, pod={}, namespace={}",
|
||||
sessionId, target.getName(), target.getK8sNamespace());
|
||||
|
||||
try {
|
||||
// 1. 获取K8S系统配置
|
||||
ExternalSystem k8sSystem = externalSystemRepository.findById(target.getK8sSystemId())
|
||||
.orElseThrow(() -> new BusinessException(ResponseCode.K8S_SYSTEM_NOT_FOUND));
|
||||
|
||||
// 2. 获取ApiClient(使用集成服务的缓存机制)
|
||||
ApiClient apiClient = k8sServiceIntegration.getApiClient(k8sSystem);
|
||||
CoreV1Api api = new CoreV1Api(apiClient);
|
||||
|
||||
// 3. 调用K8S API获取日志流
|
||||
Call call = api.readNamespacedPodLogCall(
|
||||
target.getName(), // podName
|
||||
target.getK8sNamespace(), // namespace
|
||||
null, // container (null = default container)
|
||||
true, // follow = true (实时流)
|
||||
null, // insecureSkipTLSVerifyBackend
|
||||
null, // limitBytes
|
||||
"false", // pretty
|
||||
false, // previous
|
||||
null, // sinceSeconds
|
||||
target.getLines(), // tailLines
|
||||
true, // timestamps
|
||||
null // callback
|
||||
);
|
||||
|
||||
// 4. 执行调用并获取输入流
|
||||
InputStream inputStream = call.execute().body().byteStream();
|
||||
|
||||
// 5. 持续读取日志流
|
||||
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
|
||||
String line;
|
||||
while (session.isOpen() && (line = reader.readLine()) != null) {
|
||||
// 检查暂停标志
|
||||
if (paused.get()) {
|
||||
Thread.sleep(100);
|
||||
continue;
|
||||
}
|
||||
|
||||
// 解析K8S日志行(格式:timestamp content)
|
||||
String[] parts = line.split(" ", 2);
|
||||
String timestamp = parts.length > 0 ? parts[0] : Instant.now().toString();
|
||||
String content = parts.length > 1 ? parts[1] : line;
|
||||
|
||||
// 推送日志行
|
||||
callback.sendLogLine(timestamp, content);
|
||||
}
|
||||
}
|
||||
|
||||
log.info("K8S日志流正常结束: sessionId={}", sessionId);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("K8S日志流异常: sessionId={}", sessionId, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop(String sessionId) {
|
||||
log.info("停止K8S日志流: sessionId={}", sessionId);
|
||||
// K8S使用Kubernetes API,连接由ApiClient管理,无需手动清理
|
||||
// 当输入流关闭时,K8S API会自动断开连接
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,151 @@
|
||||
package com.qqchen.deploy.backend.deploy.strategy.log;
|
||||
|
||||
import com.qqchen.deploy.backend.deploy.enums.RuntimeTypeEnum;
|
||||
import com.qqchen.deploy.backend.framework.ssh.ISSHCommandService;
|
||||
import com.qqchen.deploy.backend.framework.ssh.SSHCommandServiceFactory;
|
||||
import com.qqchen.deploy.backend.framework.websocket.log.ILogStreamStrategy;
|
||||
import com.qqchen.deploy.backend.framework.websocket.log.LogStreamTarget;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import net.schmizz.sshj.SSHClient;
|
||||
import net.schmizz.sshj.connection.channel.direct.Session;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.socket.WebSocketSession;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.InputStreamReader;
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* Server日志流策略
|
||||
* 通过SSH连接执行tail -f命令获取日志流
|
||||
*
|
||||
* @author qqchen
|
||||
* @since 2025-12-16
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class ServerLogStreamStrategy implements ILogStreamStrategy {
|
||||
|
||||
@Resource
|
||||
private SSHCommandServiceFactory sshCommandServiceFactory;
|
||||
|
||||
/**
|
||||
* SSH连接存储:sessionId → SSHClient
|
||||
*/
|
||||
private final Map<String, SSHClient> sshClients = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* SSH会话存储:sessionId → Session
|
||||
*/
|
||||
private final Map<String, Session> sshSessions = new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
public RuntimeTypeEnum supportedType() {
|
||||
return RuntimeTypeEnum.SERVER;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void streamLogs(WebSocketSession session,
|
||||
LogStreamTarget target,
|
||||
AtomicBoolean paused,
|
||||
LogLineCallback callback) throws Exception {
|
||||
|
||||
String sessionId = session.getId();
|
||||
log.info("开始Server日志流: sessionId={}, logFile={}, host={}",
|
||||
sessionId, target.getLogFilePath(), target.getHost());
|
||||
|
||||
SSHClient sshClient = null;
|
||||
Session sshSession = null;
|
||||
|
||||
try {
|
||||
// 1. 建立SSH连接
|
||||
ISSHCommandService sshService = sshCommandServiceFactory.getService(target.getOsType());
|
||||
sshClient = sshService.createConnection(
|
||||
target.getHost(),
|
||||
target.getPort(),
|
||||
target.getUsername(),
|
||||
target.getPassword(),
|
||||
target.getPrivateKey(),
|
||||
target.getPassphrase()
|
||||
);
|
||||
|
||||
// 保存SSH连接,用于后续清理
|
||||
sshClients.put(sessionId, sshClient);
|
||||
|
||||
// 2. 构建tail命令
|
||||
String command = String.format("tail -f %s -n %d",
|
||||
target.getLogFilePath(), target.getLines());
|
||||
|
||||
log.debug("执行Server日志命令: {}", command);
|
||||
|
||||
// 3. 执行命令
|
||||
sshSession = sshClient.startSession();
|
||||
sshSessions.put(sessionId, sshSession);
|
||||
|
||||
Session.Command cmd = sshSession.exec(command);
|
||||
|
||||
// 4. 持续读取输出流
|
||||
try (BufferedReader reader = new BufferedReader(
|
||||
new InputStreamReader(cmd.getInputStream()))) {
|
||||
|
||||
String line;
|
||||
while (session.isOpen() && (line = reader.readLine()) != null) {
|
||||
// 检查暂停标志
|
||||
if (paused.get()) {
|
||||
Thread.sleep(100);
|
||||
continue;
|
||||
}
|
||||
|
||||
// 推送日志行(使用当前时间作为时间戳)
|
||||
callback.sendLogLine(Instant.now().toString(), line);
|
||||
}
|
||||
}
|
||||
|
||||
log.info("Server日志流正常结束: sessionId={}", sessionId);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("Server日志流异常: sessionId={}", sessionId, e);
|
||||
throw e;
|
||||
} finally {
|
||||
// 清理资源(正常结束时)
|
||||
cleanupResources(sessionId, sshSession, sshClient);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop(String sessionId) {
|
||||
log.info("停止Server日志流并清理资源: sessionId={}", sessionId);
|
||||
|
||||
Session sshSession = sshSessions.remove(sessionId);
|
||||
SSHClient sshClient = sshClients.remove(sessionId);
|
||||
|
||||
cleanupResources(sessionId, sshSession, sshClient);
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理SSH资源
|
||||
*/
|
||||
private void cleanupResources(String sessionId, Session sshSession, SSHClient sshClient) {
|
||||
if (sshSession != null) {
|
||||
try {
|
||||
sshSession.close();
|
||||
log.debug("SSH Session已关闭: sessionId={}", sessionId);
|
||||
} catch (Exception e) {
|
||||
log.error("关闭SSH Session失败: sessionId={}", sessionId, e);
|
||||
}
|
||||
}
|
||||
|
||||
if (sshClient != null) {
|
||||
try {
|
||||
sshClient.disconnect();
|
||||
log.debug("SSH Client已断开: sessionId={}", sessionId);
|
||||
} catch (Exception e) {
|
||||
log.error("断开SSH Client失败: sessionId={}", sessionId, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,36 @@
|
||||
package com.qqchen.deploy.backend.framework.enums;
|
||||
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* 日志流控制动作枚举
|
||||
*
|
||||
* @author Framework
|
||||
* @since 2025-12-16
|
||||
*/
|
||||
@Getter
|
||||
public enum LogControlAction {
|
||||
|
||||
/**
|
||||
* 暂停日志流
|
||||
*/
|
||||
PAUSE("pause", "暂停日志流"),
|
||||
|
||||
/**
|
||||
* 恢复日志流
|
||||
*/
|
||||
RESUME("resume", "恢复日志流"),
|
||||
|
||||
/**
|
||||
* 停止日志流
|
||||
*/
|
||||
STOP("stop", "停止日志流");
|
||||
|
||||
private final String code;
|
||||
private final String description;
|
||||
|
||||
LogControlAction(String code, String description) {
|
||||
this.code = code;
|
||||
this.description = description;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,47 @@
|
||||
package com.qqchen.deploy.backend.framework.enums;
|
||||
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* 日志WebSocket消息类型枚举
|
||||
*
|
||||
* @author Framework
|
||||
* @since 2025-12-16
|
||||
*/
|
||||
@Getter
|
||||
public enum LogMessageType {
|
||||
|
||||
/**
|
||||
* 启动日志流(客户端 → 服务端)
|
||||
*/
|
||||
START("start", "启动日志流"),
|
||||
|
||||
/**
|
||||
* 控制消息(客户端 → 服务端)
|
||||
* 用于暂停、恢复、停止日志流
|
||||
*/
|
||||
CONTROL("control", "控制消息"),
|
||||
|
||||
/**
|
||||
* 日志行(服务端 → 客户端)
|
||||
*/
|
||||
LOG("log", "日志行"),
|
||||
|
||||
/**
|
||||
* 状态消息(服务端 → 客户端)
|
||||
*/
|
||||
STATUS("status", "状态消息"),
|
||||
|
||||
/**
|
||||
* 错误消息(服务端 → 客户端)
|
||||
*/
|
||||
ERROR("error", "错误消息");
|
||||
|
||||
private final String code;
|
||||
private final String description;
|
||||
|
||||
LogMessageType(String code, String description) {
|
||||
this.code = code;
|
||||
this.description = description;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,46 @@
|
||||
package com.qqchen.deploy.backend.framework.enums;
|
||||
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* 日志流状态枚举
|
||||
*
|
||||
* @author Framework
|
||||
* @since 2025-12-16
|
||||
*/
|
||||
@Getter
|
||||
public enum LogStatusEnum {
|
||||
|
||||
/**
|
||||
* 连接中
|
||||
*/
|
||||
CONNECTING("connecting", "连接中"),
|
||||
|
||||
/**
|
||||
* 已连接
|
||||
*/
|
||||
CONNECTED("connected", "已连接"),
|
||||
|
||||
/**
|
||||
* 流式传输中
|
||||
*/
|
||||
STREAMING("streaming", "流式传输中"),
|
||||
|
||||
/**
|
||||
* 已暂停
|
||||
*/
|
||||
PAUSED("paused", "已暂停"),
|
||||
|
||||
/**
|
||||
* 已关闭
|
||||
*/
|
||||
CLOSED("closed", "已关闭");
|
||||
|
||||
private final String code;
|
||||
private final String description;
|
||||
|
||||
LogStatusEnum(String code, String description) {
|
||||
this.code = code;
|
||||
this.description = description;
|
||||
}
|
||||
}
|
||||
@ -223,17 +223,34 @@ public enum ResponseCode {
|
||||
TEAM_MEMBER_NOT_FOUND(2925, "team.member.not.found"),
|
||||
TEAM_MEMBER_ALREADY_EXISTS(2926, "team.member.already.exists"),
|
||||
TEAM_APPLICATION_NOT_FOUND(2927, "team.application.not.found"),
|
||||
TEAM_APP_NOT_FOUND(2927, "team.application.not.found"), // 别名
|
||||
TEAM_APPLICATION_ALREADY_EXISTS(2928, "team.application.already.exists"),
|
||||
TEAM_CONFIG_NOT_FOUND(2929, "team.config.not.found"),
|
||||
TEAM_APPLICATION_DEPLOY_JOB_EXISTS(2930, "team.application.deploy.job.exists"),
|
||||
TEAM_APPLICATION_RUNTIME_TYPE_NOT_CONFIGURED(2931, "team.application.runtime.type.not.configured"),
|
||||
TEAM_APP_RUNTIME_TYPE_NOT_CONFIGURED(2931, "team.application.runtime.type.not.configured"), // 别名
|
||||
TEAM_APPLICATION_LOG_QUERY_STRATEGY_NOT_FOUND(2932, "team.application.log.query.strategy.not.found"),
|
||||
TEAM_APP_RUNTIME_TYPE_MISMATCH(2933, "team.application.runtime.type.mismatch"),
|
||||
TEAM_APP_K8S_CONFIG_INCOMPLETE(2934, "team.application.k8s.config.incomplete"),
|
||||
TEAM_APP_DOCKER_CONFIG_INCOMPLETE(2935, "team.application.docker.config.incomplete"),
|
||||
TEAM_APP_SERVER_CONFIG_INCOMPLETE(2936, "team.application.server.config.incomplete"),
|
||||
TEAM_APP_LOG_COMMAND_NOT_CONFIGURED(2948, "team.application.log.command.not.configured"),
|
||||
LOG_QUERY_STRATEGY_NOT_FOUND(2937, "log.query.strategy.not.found"),
|
||||
FEATURE_NOT_IMPLEMENTED(2938, "feature.not.implemented"),
|
||||
UNSUPPORTED_RUNTIME_TYPE(2949, "unsupported.runtime.type"),
|
||||
|
||||
// 运行时状态检查相关错误码 (2939-2947)
|
||||
K8S_SYSTEM_NOT_FOUND(2950, "k8s.system.not.found"),
|
||||
K8S_NAMESPACE_NOT_FOUND(2951, "k8s.namespace.not.found"),
|
||||
K8S_DEPLOYMENT_NOT_FOUND(2939, "k8s.deployment.not.found"),
|
||||
K8S_POD_NOT_FOUND(2940, "k8s.pod.not.found"),
|
||||
K8S_POD_NAME_NOT_FOUND(2941, "k8s.pod.name.not.found"),
|
||||
DOCKER_SERVER_NOT_FOUND(2942, "docker.server.not.found"),
|
||||
DOCKER_SERVER_UNREACHABLE(2943, "docker.server.unreachable"),
|
||||
DOCKER_CONTAINER_NOT_FOUND(2944, "docker.container.not.found"),
|
||||
DOCKER_CONTAINER_NOT_RUNNING(2945, "docker.container.not.running"),
|
||||
SERVER_NOT_FOUND_RUNTIME(2946, "server.not.found.runtime"),
|
||||
SERVER_UNREACHABLE(2947, "server.unreachable"),
|
||||
|
||||
// 服务器管理相关错误码 (2950-2969)
|
||||
SERVER_NOT_FOUND(2950, "server.not.found"),
|
||||
@ -288,14 +305,11 @@ public enum ResponseCode {
|
||||
K8S_CONFIG_EMPTY(3224, "k8s.config.empty"),
|
||||
K8S_KUBECONFIG_INVALID(3225, "k8s.kubeconfig.invalid"),
|
||||
K8S_KUBECONFIG_EMPTY(3226, "k8s.kubeconfig.empty"),
|
||||
K8S_NAMESPACE_NOT_FOUND(3227, "k8s.namespace.not.found"),
|
||||
K8S_DEPLOYMENT_NOT_FOUND(3228, "k8s.deployment.not.found"),
|
||||
K8S_API_ERROR(3229, "k8s.api.error"),
|
||||
K8S_SERVER_ERROR(3230, "k8s.server.error"),
|
||||
K8S_SYNC_FAILED(3231, "k8s.sync.failed"),
|
||||
K8S_NAMESPACE_SYNC_FAILED(3232, "k8s.namespace.sync.failed"),
|
||||
K8S_DEPLOYMENT_SYNC_FAILED(3233, "k8s.deployment.sync.failed"),
|
||||
K8S_POD_NOT_FOUND(3234, "k8s.pod.not.found"),
|
||||
K8S_RESOURCE_NOT_FOUND(3235, "k8s.resource.not.found"),
|
||||
K8S_OPERATION_FAILED(3236, "k8s.operation.failed"),
|
||||
|
||||
|
||||
@ -0,0 +1,433 @@
|
||||
package com.qqchen.deploy.backend.framework.websocket.log;
|
||||
|
||||
import com.qqchen.deploy.backend.framework.enums.LogControlAction;
|
||||
import com.qqchen.deploy.backend.framework.enums.LogMessageType;
|
||||
import com.qqchen.deploy.backend.framework.enums.LogStatusEnum;
|
||||
import com.qqchen.deploy.backend.framework.utils.JsonUtils;
|
||||
import com.qqchen.deploy.backend.framework.websocket.log.request.LogControlRequest;
|
||||
import com.qqchen.deploy.backend.framework.websocket.log.request.LogStreamRequest;
|
||||
import com.qqchen.deploy.backend.framework.websocket.log.response.LogErrorResponse;
|
||||
import com.qqchen.deploy.backend.framework.websocket.log.response.LogLineResponse;
|
||||
import com.qqchen.deploy.backend.framework.websocket.log.response.LogStatusResponse;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.web.socket.CloseStatus;
|
||||
import org.springframework.web.socket.TextMessage;
|
||||
import org.springframework.web.socket.WebSocketSession;
|
||||
import org.springframework.web.socket.handler.TextWebSocketHandler;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* 抽象日志流WebSocket处理器(Framework层)
|
||||
* 提供通用的日志流WebSocket能力
|
||||
*
|
||||
* 子类需要实现3个方法:
|
||||
* 1. getLogStreamTarget(session) - 获取日志流目标信息
|
||||
* 2. checkPermission(userId, target) - 权限验证
|
||||
* 3. streamLogs(session, target, paused) - 执行日志流推送
|
||||
*
|
||||
* @author Framework
|
||||
* @since 2025-12-16
|
||||
*/
|
||||
@Slf4j
|
||||
public abstract class AbstractLogStreamWebSocketHandler extends TextWebSocketHandler {
|
||||
|
||||
// ========== 会话存储 ==========
|
||||
|
||||
/**
|
||||
* WebSocket会话存储:sessionId → WebSocketSession
|
||||
*/
|
||||
protected final Map<String, WebSocketSession> webSocketSessions = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 日志流任务存储:sessionId → Future
|
||||
*/
|
||||
protected final Map<String, Future<?>> streamTasks = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 暂停标志存储:sessionId → AtomicBoolean
|
||||
*/
|
||||
protected final Map<String, AtomicBoolean> pausedFlags = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 目标信息存储:sessionId → LogStreamTarget
|
||||
*/
|
||||
protected final Map<String, LogStreamTarget> sessionTargets = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 策略实例存储:sessionId → ILogStreamStrategy
|
||||
*/
|
||||
protected final Map<String, ILogStreamStrategy> sessionStrategies = new ConcurrentHashMap<>();
|
||||
|
||||
// ========== 子类必须实现的抽象方法 ==========
|
||||
|
||||
/**
|
||||
* 获取日志流目标信息(由子类实现)
|
||||
*
|
||||
* @param session WebSocket会话
|
||||
* @param request 日志流请求
|
||||
* @return 日志流目标信息
|
||||
* @throws Exception 获取失败时抛出
|
||||
*/
|
||||
protected abstract LogStreamTarget getLogStreamTarget(WebSocketSession session, LogStreamRequest request) throws Exception;
|
||||
|
||||
/**
|
||||
* 检查用户权限(由子类实现)
|
||||
*
|
||||
* @param userId 用户ID
|
||||
* @param target 日志流目标
|
||||
* @return 是否有权限
|
||||
*/
|
||||
protected abstract boolean checkPermission(Long userId, LogStreamTarget target);
|
||||
|
||||
/**
|
||||
* 获取日志流策略(由子类实现)
|
||||
*
|
||||
* @param target 日志流目标
|
||||
* @return 日志流策略
|
||||
*/
|
||||
protected abstract ILogStreamStrategy getLogStreamStrategy(LogStreamTarget target);
|
||||
|
||||
// ========== Framework 提供的核心能力 ==========
|
||||
|
||||
@Override
|
||||
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
|
||||
String sessionId = session.getId();
|
||||
log.info("日志流WebSocket连接建立: sessionId={}", sessionId);
|
||||
|
||||
try {
|
||||
// 1. 获取用户信息
|
||||
Long userId = (Long) session.getAttributes().get("userId");
|
||||
String username = (String) session.getAttributes().get("username");
|
||||
|
||||
if (userId == null) {
|
||||
log.error("无法获取用户信息: sessionId={}", sessionId);
|
||||
sendError(session, "认证失败");
|
||||
session.close(CloseStatus.POLICY_VIOLATION);
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 保存会话
|
||||
webSocketSessions.put(sessionId, session);
|
||||
|
||||
// 3. 不立即发送状态消息,等待客户端发送START消息
|
||||
// 参照SSH WebSocket的做法,避免在客户端未准备好时发送消息
|
||||
log.info("日志流连接成功,等待START消息: sessionId={}, userId={}, username={}",
|
||||
sessionId, userId, username);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("建立日志流连接失败: sessionId={}", sessionId, e);
|
||||
cleanupSession(sessionId);
|
||||
|
||||
try {
|
||||
session.close(CloseStatus.SERVER_ERROR);
|
||||
} catch (IOException ex) {
|
||||
log.error("关闭WebSocket会话失败: sessionId={}", sessionId, ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
|
||||
String sessionId = session.getId();
|
||||
|
||||
try {
|
||||
LogWebSocketMessage msg = JsonUtils.fromJson(message.getPayload(), LogWebSocketMessage.class);
|
||||
|
||||
if (msg.getType() == LogMessageType.START) {
|
||||
// 启动日志流
|
||||
handleStartMessage(session, msg);
|
||||
} else if (msg.getType() == LogMessageType.CONTROL) {
|
||||
// 控制消息
|
||||
handleControlMessage(session, msg);
|
||||
} else {
|
||||
log.warn("未知的消息类型: sessionId={}, type={}", sessionId, msg.getType());
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("处理WebSocket消息失败: sessionId={}", sessionId, e);
|
||||
sendError(session, "消息处理失败: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理START消息
|
||||
*/
|
||||
private void handleStartMessage(WebSocketSession session, LogWebSocketMessage msg) {
|
||||
String sessionId = session.getId();
|
||||
|
||||
try {
|
||||
// 1. 提取请求参数
|
||||
LogStreamRequest request = msg.getRequest(LogStreamRequest.class);
|
||||
if (request == null || !request.isValid()) {
|
||||
log.warn("START消息参数无效: sessionId={}", sessionId);
|
||||
sendError(session, "请求参数无效");
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 获取用户信息
|
||||
Long userId = (Long) session.getAttributes().get("userId");
|
||||
|
||||
// 3. 获取日志流目标信息
|
||||
LogStreamTarget target = getLogStreamTarget(session, request);
|
||||
if (target == null) {
|
||||
log.error("无法获取日志流目标: sessionId={}", sessionId);
|
||||
sendError(session, "无法获取日志流目标");
|
||||
return;
|
||||
}
|
||||
|
||||
// 保存target信息
|
||||
sessionTargets.put(sessionId, target);
|
||||
|
||||
log.info("获取日志流目标成功: sessionId={}, runtimeType={}, name={}",
|
||||
sessionId, target.getRuntimeType(), target.getName());
|
||||
|
||||
// 4. 权限验证
|
||||
if (!checkPermission(userId, target)) {
|
||||
log.warn("用户无权访问日志: userId={}, target={}", userId, target.getName());
|
||||
sendError(session, "无权访问此日志");
|
||||
return;
|
||||
}
|
||||
|
||||
// 5. 发送流式传输状态
|
||||
sendStatus(session, LogStatusEnum.STREAMING);
|
||||
|
||||
// 6. 创建暂停标志
|
||||
AtomicBoolean paused = new AtomicBoolean(false);
|
||||
pausedFlags.put(sessionId, paused);
|
||||
|
||||
// 7. 获取日志流策略
|
||||
ILogStreamStrategy strategy = getLogStreamStrategy(target);
|
||||
if (strategy == null) {
|
||||
log.error("无法获取日志流策略: sessionId={}, runtimeType={}",
|
||||
sessionId, target.getRuntimeType());
|
||||
sendError(session, "不支持的运行时类型");
|
||||
return;
|
||||
}
|
||||
|
||||
// 保存策略实例,用于后续清理
|
||||
sessionStrategies.put(sessionId, strategy);
|
||||
|
||||
// 8. 启动日志流任务(异步)
|
||||
Future<?> task = java.util.concurrent.Executors.newSingleThreadExecutor().submit(() -> {
|
||||
try {
|
||||
// 使用策略执行日志流
|
||||
strategy.streamLogs(session, target, paused,
|
||||
(timestamp, content) -> sendLogLine(session, timestamp, content));
|
||||
} catch (Exception e) {
|
||||
log.error("日志流异常: sessionId={}", sessionId, e);
|
||||
try {
|
||||
sendError(session, "日志流中断: " + e.getMessage());
|
||||
} catch (Exception ex) {
|
||||
log.error("发送错误消息失败: sessionId={}", sessionId, ex);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
streamTasks.put(sessionId, task);
|
||||
log.info("日志流已启动: sessionId={}", sessionId);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("处理START消息失败: sessionId={}", sessionId, e);
|
||||
sendError(session, "启动日志流失败: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理CONTROL消息
|
||||
*/
|
||||
private void handleControlMessage(WebSocketSession session, LogWebSocketMessage msg) {
|
||||
String sessionId = session.getId();
|
||||
|
||||
try {
|
||||
LogControlRequest request = msg.getRequest(LogControlRequest.class);
|
||||
if (request == null || !request.isValid()) {
|
||||
log.warn("CONTROL消息参数无效: sessionId={}", sessionId);
|
||||
return;
|
||||
}
|
||||
|
||||
AtomicBoolean paused = pausedFlags.get(sessionId);
|
||||
if (paused == null) {
|
||||
log.warn("日志流未启动,无法控制: sessionId={}", sessionId);
|
||||
return;
|
||||
}
|
||||
|
||||
LogControlAction action = request.getAction();
|
||||
|
||||
if (action == LogControlAction.PAUSE) {
|
||||
paused.set(true);
|
||||
sendStatus(session, LogStatusEnum.PAUSED);
|
||||
log.info("日志流已暂停: sessionId={}", sessionId);
|
||||
} else if (action == LogControlAction.RESUME) {
|
||||
paused.set(false);
|
||||
sendStatus(session, LogStatusEnum.STREAMING);
|
||||
log.info("日志流已恢复: sessionId={}", sessionId);
|
||||
} else if (action == LogControlAction.STOP) {
|
||||
log.info("收到停止请求: sessionId={}", sessionId);
|
||||
session.close(CloseStatus.NORMAL);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("处理CONTROL消息失败: sessionId={}", sessionId, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
|
||||
String sessionId = session.getId();
|
||||
log.info("日志流WebSocket连接关闭: sessionId={}, status={}", sessionId, status);
|
||||
cleanupSession(sessionId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
|
||||
String sessionId = session.getId();
|
||||
|
||||
// EOFException通常表示客户端正常关闭连接,不需要记录ERROR日志
|
||||
if (exception instanceof java.io.EOFException) {
|
||||
log.debug("客户端关闭连接: sessionId={}", sessionId);
|
||||
cleanupSession(sessionId);
|
||||
return;
|
||||
}
|
||||
|
||||
log.error("日志流WebSocket传输错误: sessionId={}", sessionId, exception);
|
||||
|
||||
try {
|
||||
sendError(session, "传输错误: " + exception.getMessage());
|
||||
} catch (Exception e) {
|
||||
// 忽略发送错误消息时的异常
|
||||
log.debug("发送错误消息失败: sessionId={}", sessionId);
|
||||
}
|
||||
|
||||
cleanupSession(sessionId);
|
||||
|
||||
try {
|
||||
session.close(CloseStatus.SERVER_ERROR);
|
||||
} catch (IOException e) {
|
||||
log.debug("关闭WebSocket会话失败: sessionId={}", sessionId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理会话资源
|
||||
*/
|
||||
private void cleanupSession(String sessionId) {
|
||||
log.info("开始清理日志流会话资源: sessionId={}", sessionId);
|
||||
|
||||
try {
|
||||
// 1. 调用Strategy的stop方法清理资源(SSH连接等)
|
||||
ILogStreamStrategy strategy = sessionStrategies.remove(sessionId);
|
||||
if (strategy != null) {
|
||||
try {
|
||||
log.debug("调用Strategy.stop清理资源: sessionId={}", sessionId);
|
||||
strategy.stop(sessionId);
|
||||
} catch (Exception e) {
|
||||
log.error("Strategy清理资源失败: sessionId={}", sessionId, e);
|
||||
}
|
||||
}
|
||||
|
||||
// 2. 取消日志流任务
|
||||
Future<?> task = streamTasks.remove(sessionId);
|
||||
if (task != null && !task.isDone()) {
|
||||
log.debug("取消日志流任务: sessionId={}", sessionId);
|
||||
task.cancel(true);
|
||||
}
|
||||
|
||||
// 3. 移除WebSocketSession
|
||||
webSocketSessions.remove(sessionId);
|
||||
|
||||
// 4. 移除暂停标志
|
||||
pausedFlags.remove(sessionId);
|
||||
|
||||
// 5. 移除target信息
|
||||
sessionTargets.remove(sessionId);
|
||||
|
||||
log.info("日志流会话资源清理完成: sessionId={}", sessionId);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("清理会话资源失败: sessionId={}", sessionId, e);
|
||||
}
|
||||
}
|
||||
|
||||
// ========== 辅助方法(供子类使用) ==========
|
||||
|
||||
/**
|
||||
* 发送日志行到前端
|
||||
*
|
||||
* @param session WebSocket会话
|
||||
* @param timestamp 时间戳
|
||||
* @param content 日志内容
|
||||
*/
|
||||
protected void sendLogLine(WebSocketSession session, String timestamp, String content) {
|
||||
try {
|
||||
if (session == null || !session.isOpen()) {
|
||||
return;
|
||||
}
|
||||
|
||||
LogLineResponse response = new LogLineResponse(timestamp, content);
|
||||
|
||||
Map<String, Object> data = new HashMap<>();
|
||||
data.put("response", response);
|
||||
|
||||
LogWebSocketMessage msg = new LogWebSocketMessage(LogMessageType.LOG, data);
|
||||
session.sendMessage(new TextMessage(JsonUtils.toJson(msg)));
|
||||
|
||||
} catch (IOException e) {
|
||||
log.debug("发送日志行失败: sessionId={}", session.getId());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送状态消息到前端
|
||||
*/
|
||||
protected void sendStatus(WebSocketSession session, LogStatusEnum status) {
|
||||
try {
|
||||
if (session == null || !session.isOpen()) {
|
||||
log.debug("会话未打开,跳过发送状态消息: status={}", status);
|
||||
return;
|
||||
}
|
||||
|
||||
LogStatusResponse response = new LogStatusResponse(status);
|
||||
|
||||
Map<String, Object> data = new HashMap<>();
|
||||
data.put("response", response);
|
||||
|
||||
LogWebSocketMessage msg = new LogWebSocketMessage(LogMessageType.STATUS, data);
|
||||
session.sendMessage(new TextMessage(JsonUtils.toJson(msg)));
|
||||
|
||||
} catch (IOException e) {
|
||||
// 降低日志级别,客户端断开是正常情况
|
||||
log.debug("发送状态消息失败(客户端可能已断开): sessionId={}, status={}",
|
||||
session.getId(), status);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送错误消息到前端
|
||||
*/
|
||||
protected void sendError(WebSocketSession session, String error) {
|
||||
try {
|
||||
if (!session.isOpen()) {
|
||||
return;
|
||||
}
|
||||
|
||||
LogErrorResponse response = new LogErrorResponse(error);
|
||||
|
||||
Map<String, Object> data = new HashMap<>();
|
||||
data.put("response", response);
|
||||
|
||||
LogWebSocketMessage msg = new LogWebSocketMessage(LogMessageType.ERROR, data);
|
||||
session.sendMessage(new TextMessage(JsonUtils.toJson(msg)));
|
||||
|
||||
} catch (IOException e) {
|
||||
if (session.isOpen()) {
|
||||
log.error("发送错误消息失败: sessionId={}", session.getId(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,116 @@
|
||||
package com.qqchen.deploy.backend.framework.websocket.log;
|
||||
|
||||
import com.qqchen.deploy.backend.deploy.enums.RuntimeTypeEnum;
|
||||
import com.qqchen.deploy.backend.framework.enums.AuthTypeEnum;
|
||||
import com.qqchen.deploy.backend.framework.enums.OsTypeEnum;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* 日志流目标信息
|
||||
* 封装日志流的连接目标信息
|
||||
*
|
||||
* @author Framework
|
||||
* @since 2025-12-16
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Builder
|
||||
public class LogStreamTarget {
|
||||
|
||||
/**
|
||||
* 运行时类型
|
||||
*/
|
||||
private RuntimeTypeEnum runtimeType;
|
||||
|
||||
/**
|
||||
* 实例名称(Pod名称/容器名称/进程名称)
|
||||
*/
|
||||
private String name;
|
||||
|
||||
/**
|
||||
* 日志行数
|
||||
*/
|
||||
private Integer lines;
|
||||
|
||||
// ========== K8S相关字段 ==========
|
||||
|
||||
/**
|
||||
* K8S系统ID
|
||||
*/
|
||||
private Long k8sSystemId;
|
||||
|
||||
/**
|
||||
* K8S命名空间
|
||||
*/
|
||||
private String k8sNamespace;
|
||||
|
||||
/**
|
||||
* K8S Deployment ID
|
||||
*/
|
||||
private Long k8sDeploymentId;
|
||||
|
||||
// ========== Docker/Server相关字段(需要SSH) ==========
|
||||
|
||||
/**
|
||||
* 服务器主机地址
|
||||
*/
|
||||
private String host;
|
||||
|
||||
/**
|
||||
* SSH端口
|
||||
*/
|
||||
private Integer port;
|
||||
|
||||
/**
|
||||
* SSH用户名
|
||||
*/
|
||||
private String username;
|
||||
|
||||
/**
|
||||
* 认证类型
|
||||
*/
|
||||
private AuthTypeEnum authType;
|
||||
|
||||
/**
|
||||
* SSH密码
|
||||
*/
|
||||
private String password;
|
||||
|
||||
/**
|
||||
* SSH私钥
|
||||
*/
|
||||
private String privateKey;
|
||||
|
||||
/**
|
||||
* SSH私钥密码
|
||||
*/
|
||||
private String passphrase;
|
||||
|
||||
/**
|
||||
* 操作系统类型
|
||||
*/
|
||||
private OsTypeEnum osType;
|
||||
|
||||
// ========== Server特有字段 ==========
|
||||
|
||||
/**
|
||||
* 日志文件路径(Server类型)
|
||||
*/
|
||||
private String logFilePath;
|
||||
|
||||
// ========== 元数据 ==========
|
||||
|
||||
/**
|
||||
* 团队应用ID
|
||||
*/
|
||||
private Long teamAppId;
|
||||
|
||||
/**
|
||||
* 服务器ID
|
||||
*/
|
||||
private Long serverId;
|
||||
}
|
||||
@ -0,0 +1,90 @@
|
||||
package com.qqchen.deploy.backend.framework.websocket.log;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.qqchen.deploy.backend.framework.enums.LogMessageType;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 日志WebSocket消息包装类
|
||||
* 统一的消息格式,包含type和data字段
|
||||
*
|
||||
* @author Framework
|
||||
* @since 2025-12-16
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||
public class LogWebSocketMessage {
|
||||
|
||||
/**
|
||||
* 消息类型
|
||||
*/
|
||||
private LogMessageType type;
|
||||
|
||||
/**
|
||||
* 消息数据(动态内容)
|
||||
* 客户端→服务端:包含request字段
|
||||
* 服务端→客户端:包含response字段
|
||||
*/
|
||||
private Map<String, Object> data;
|
||||
|
||||
/**
|
||||
* 从data中提取request对象(强类型)
|
||||
*
|
||||
* @param clazz Request类型
|
||||
* @param <T> 泛型类型
|
||||
* @return Request对象
|
||||
*/
|
||||
public <T> T getRequest(Class<T> clazz) {
|
||||
if (data == null || !data.containsKey("request")) {
|
||||
return null;
|
||||
}
|
||||
|
||||
Object request = data.get("request");
|
||||
if (request == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// 如果已经是目标类型,直接返回
|
||||
if (clazz.isInstance(request)) {
|
||||
return clazz.cast(request);
|
||||
}
|
||||
|
||||
// 否则通过Jackson转换
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
return mapper.convertValue(request, clazz);
|
||||
}
|
||||
|
||||
/**
|
||||
* 从data中提取response对象(强类型)
|
||||
*
|
||||
* @param clazz Response类型
|
||||
* @param <T> 泛型类型
|
||||
* @return Response对象
|
||||
*/
|
||||
public <T> T getResponse(Class<T> clazz) {
|
||||
if (data == null || !data.containsKey("response")) {
|
||||
return null;
|
||||
}
|
||||
|
||||
Object response = data.get("response");
|
||||
if (response == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// 如果已经是目标类型,直接返回
|
||||
if (clazz.isInstance(response)) {
|
||||
return clazz.cast(response);
|
||||
}
|
||||
|
||||
// 否则通过Jackson转换
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
return mapper.convertValue(response, clazz);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,31 @@
|
||||
package com.qqchen.deploy.backend.framework.websocket.log.request;
|
||||
|
||||
import com.qqchen.deploy.backend.framework.enums.LogControlAction;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* 日志流控制请求
|
||||
* 客户端发送CONTROL消息时使用
|
||||
*
|
||||
* @author Framework
|
||||
* @since 2025-12-16
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class LogControlRequest {
|
||||
|
||||
/**
|
||||
* 控制动作
|
||||
*/
|
||||
private LogControlAction action;
|
||||
|
||||
/**
|
||||
* 验证请求参数
|
||||
*/
|
||||
public boolean isValid() {
|
||||
return action != null;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,47 @@
|
||||
package com.qqchen.deploy.backend.framework.websocket.log.request;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* 日志流启动请求
|
||||
* 客户端发送START消息时使用
|
||||
*
|
||||
* @author Framework
|
||||
* @since 2025-12-16
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class LogStreamRequest {
|
||||
|
||||
/**
|
||||
* 实例名称(可选)
|
||||
* - K8S: Pod名称(可选,不传则使用TeamApplication配置的默认值)
|
||||
* - Docker: 容器名称(可选,不传则使用TeamApplication配置的默认值)
|
||||
* - Server: 不使用此字段
|
||||
*/
|
||||
private String name;
|
||||
|
||||
/**
|
||||
* 日志行数(默认100)
|
||||
*/
|
||||
private Integer lines;
|
||||
|
||||
/**
|
||||
* 验证请求参数
|
||||
* 注意:name可以为空,lines也可以为空(使用默认值100)
|
||||
*/
|
||||
public boolean isValid() {
|
||||
// lines为null时使用默认值,为非null时必须大于0
|
||||
return lines == null || lines > 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取日志行数(如果为null则返回默认值100)
|
||||
*/
|
||||
public Integer getLines() {
|
||||
return lines != null ? lines : 100;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,23 @@
|
||||
package com.qqchen.deploy.backend.framework.websocket.log.response;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* 日志错误响应
|
||||
* 服务端推送ERROR消息时使用
|
||||
*
|
||||
* @author Framework
|
||||
* @since 2025-12-16
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class LogErrorResponse {
|
||||
|
||||
/**
|
||||
* 错误消息
|
||||
*/
|
||||
private String error;
|
||||
}
|
||||
@ -0,0 +1,28 @@
|
||||
package com.qqchen.deploy.backend.framework.websocket.log.response;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* 日志行响应
|
||||
* 服务端推送LOG消息时使用
|
||||
*
|
||||
* @author Framework
|
||||
* @since 2025-12-16
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class LogLineResponse {
|
||||
|
||||
/**
|
||||
* 日志时间戳(RFC3339格式)
|
||||
*/
|
||||
private String timestamp;
|
||||
|
||||
/**
|
||||
* 日志内容
|
||||
*/
|
||||
private String content;
|
||||
}
|
||||
@ -0,0 +1,24 @@
|
||||
package com.qqchen.deploy.backend.framework.websocket.log.response;
|
||||
|
||||
import com.qqchen.deploy.backend.framework.enums.LogStatusEnum;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* 日志状态响应
|
||||
* 服务端推送STATUS消息时使用
|
||||
*
|
||||
* @author Framework
|
||||
* @since 2025-12-16
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class LogStatusResponse {
|
||||
|
||||
/**
|
||||
* 日志流状态
|
||||
*/
|
||||
private LogStatusEnum status;
|
||||
}
|
||||
@ -317,6 +317,7 @@ k8s.kubeconfig.empty=kubeconfig内容不能为空,请提供有效的kubeconfig
|
||||
k8s.namespace.not.found=K8S命名空间不存在
|
||||
k8s.deployment.not.found=K8S Deployment不存在
|
||||
k8s.pod.not.found=K8S Pod不存在
|
||||
k8s.pod.name.not.found=指定的Pod "{0}" 不存在,请检查Pod名称是否正确
|
||||
k8s.resource.not.found=K8S资源不存在
|
||||
|
||||
# K8S API错误
|
||||
@ -330,3 +331,19 @@ k8s.deployment.sync.failed=K8S Deployment同步失败,请检查集群配置和
|
||||
|
||||
# K8S操作错误
|
||||
k8s.operation.failed=K8S操作失败,请稍后重试
|
||||
|
||||
# --------------------------------------------------------------------------------------
|
||||
# Docker集成相关 (Docker Integration) - 3240-3259
|
||||
# --------------------------------------------------------------------------------------
|
||||
docker.server.not.found=Docker服务器不存在,请检查服务器配置
|
||||
docker.server.unreachable=Docker服务器 {0} 无法连接,请检查服务器状态和网络连接
|
||||
docker.container.not.found=Docker容器 "{0}" 不存在,请检查容器名称是否正确
|
||||
docker.container.not.running=Docker容器 "{0}" 未运行,当前状态: {1}
|
||||
docker.log.query.not.implemented=Docker日志查询功能待实现,请联系管理员
|
||||
|
||||
# --------------------------------------------------------------------------------------
|
||||
# 服务器日志查询相关 (Server Log Query) - 3260-3279
|
||||
# --------------------------------------------------------------------------------------
|
||||
server.not.found.runtime=服务器不存在,请检查服务器配置
|
||||
server.unreachable=服务器 {0} 无法连接,请检查服务器状态和网络连接
|
||||
server.log.query.not.implemented=服务器日志查询功能待实现,请联系管理员
|
||||
|
||||
@ -258,6 +258,7 @@ k8s.kubeconfig.empty=Kubeconfig content cannot be empty, please provide valid ku
|
||||
k8s.namespace.not.found=K8S namespace not found
|
||||
k8s.deployment.not.found=K8S deployment not found
|
||||
k8s.pod.not.found=K8S pod not found
|
||||
k8s.pod.name.not.found=Specified pod "{0}" not found, please check the pod name
|
||||
k8s.resource.not.found=K8S resource not found
|
||||
|
||||
# K8S API Errors
|
||||
@ -273,7 +274,13 @@ k8s.deployment.sync.failed=K8S deployment synchronization failed, please check c
|
||||
k8s.operation.failed=K8S operation failed, please try again later
|
||||
|
||||
# Docker Integration Errors
|
||||
docker.server.not.found=Docker server not found, please check server configuration
|
||||
docker.server.unreachable=Docker server {0} is unreachable, please check server status and network connection
|
||||
docker.container.not.found=Docker container "{0}" not found, please check container name
|
||||
docker.container.not.running=Docker container "{0}" is not running, current status: {1}
|
||||
docker.log.query.not.implemented=Docker log query feature not implemented yet, please contact administrator
|
||||
|
||||
# Server Log Query Errors
|
||||
server.not.found.runtime=Server not found, please check server configuration
|
||||
server.unreachable=Server {0} is unreachable, please check server status and network connection
|
||||
server.log.query.not.implemented=Server log query feature not implemented yet, please contact administrator
|
||||
|
||||
@ -258,6 +258,7 @@ k8s.kubeconfig.empty=kubeconfig内容不能为空,请提供有效的kubeconfig
|
||||
k8s.namespace.not.found=K8S命名空间不存在
|
||||
k8s.deployment.not.found=K8S Deployment不存在
|
||||
k8s.pod.not.found=K8S Pod不存在
|
||||
k8s.pod.name.not.found=指定的Pod "{0}" 不存在,请检查Pod名称是否正确
|
||||
k8s.resource.not.found=K8S资源不存在
|
||||
|
||||
# K8S API错误
|
||||
@ -273,7 +274,13 @@ k8s.deployment.sync.failed=K8S Deployment同步失败,请检查集群配置和
|
||||
k8s.operation.failed=K8S操作失败,请稍后重试
|
||||
|
||||
# Docker集成错误
|
||||
docker.server.not.found=Docker服务器不存在,请检查服务器配置
|
||||
docker.server.unreachable=Docker服务器 {0} 无法连接,请检查服务器状态和网络连接
|
||||
docker.container.not.found=Docker容器 "{0}" 不存在,请检查容器名称是否正确
|
||||
docker.container.not.running=Docker容器 "{0}" 未运行,当前状态: {1}
|
||||
docker.log.query.not.implemented=Docker日志查询功能待实现,请联系管理员
|
||||
|
||||
# 服务器日志查询错误
|
||||
server.not.found.runtime=服务器不存在,请检查服务器配置
|
||||
server.unreachable=服务器 {0} 无法连接,请检查服务器状态和网络连接
|
||||
server.log.query.not.implemented=服务器日志查询功能待实现,请联系管理员
|
||||
|
||||
Loading…
Reference in New Issue
Block a user