优化
This commit is contained in:
parent
60395e46ed
commit
2fbd77b74e
@ -241,15 +241,6 @@ public interface IK8sServiceIntegration extends IExternalSystemIntegration {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取K8S ApiClient(带缓存)
|
||||
* 用于直接调用Kubernetes Java Client API
|
||||
*
|
||||
* @param system K8S系统配置
|
||||
* @return ApiClient实例
|
||||
*/
|
||||
io.kubernetes.client.openapi.ApiClient getApiClient(ExternalSystem system);
|
||||
|
||||
/**
|
||||
* 获取用于日志流的K8S ApiClient(长超时)
|
||||
* 日志流是长连接,需要更长的读取超时时间
|
||||
|
||||
@ -6,19 +6,248 @@ import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
/**
|
||||
* 外部系统集成基类
|
||||
* 提供统一的密码解密功能
|
||||
* 提供统一的凭证缓存管理和敏感数据解密功能
|
||||
*
|
||||
* <p>子类通过泛型指定凭证类型:</p>
|
||||
* <ul>
|
||||
* <li>Git: HttpHeaders(包含PRIVATE-TOKEN)</li>
|
||||
* <li>K8S: ApiClient(包含kubeconfig配置)</li>
|
||||
* <li>Jenkins: JenkinsCrumbIssuerResponse(包含Crumb和Cookie)</li>
|
||||
* </ul>
|
||||
*
|
||||
* @param <C> 凭证类型(Credential Type)
|
||||
* @author qqchen
|
||||
* @since 2025-11-11
|
||||
*/
|
||||
@Slf4j
|
||||
public abstract class BaseExternalSystemIntegration {
|
||||
public abstract class BaseExternalSystemIntegration<C> {
|
||||
|
||||
@Resource
|
||||
protected SensitiveDataEncryptor encryptor;
|
||||
|
||||
/**
|
||||
* 凭证缓存 - 按系统ID存储
|
||||
*/
|
||||
private final Map<Long, CredentialCache<C>> credentialCacheMap = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 缓存锁 - 避免synchronized无限等待
|
||||
*/
|
||||
private final ReentrantLock cacheLock = new ReentrantLock();
|
||||
|
||||
/**
|
||||
* 锁超时时间(秒)
|
||||
*/
|
||||
private static final long LOCK_TIMEOUT_SECONDS = 3;
|
||||
|
||||
// ==================== 凭证缓存结构 ====================
|
||||
|
||||
/**
|
||||
* 凭证缓存包装类
|
||||
* 统一存储凭证对象、解密后的系统信息和过期时间
|
||||
*/
|
||||
protected static class CredentialCache<C> {
|
||||
private final C credential;
|
||||
private final ExternalSystem decryptedSystem;
|
||||
private final long expireTime;
|
||||
|
||||
public CredentialCache(C credential, ExternalSystem decryptedSystem, long expireTimeMs) {
|
||||
this.credential = credential;
|
||||
this.decryptedSystem = decryptedSystem;
|
||||
this.expireTime = System.currentTimeMillis() + expireTimeMs;
|
||||
}
|
||||
|
||||
public C getCredential() {
|
||||
return credential;
|
||||
}
|
||||
|
||||
public ExternalSystem getDecryptedSystem() {
|
||||
return decryptedSystem;
|
||||
}
|
||||
|
||||
public boolean isExpired() {
|
||||
return System.currentTimeMillis() > expireTime;
|
||||
}
|
||||
|
||||
public long getExpireTime() {
|
||||
return expireTime;
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== 抽象方法 - 子类必须实现 ====================
|
||||
|
||||
/**
|
||||
* 创建凭证对象
|
||||
* 子类负责:基于已解密的系统信息创建特定类型的凭证
|
||||
*
|
||||
* @param decryptedSystem 已解密的系统配置(密码/Token已是明文)
|
||||
* @return 凭证对象
|
||||
*/
|
||||
protected abstract C createCredential(ExternalSystem decryptedSystem);
|
||||
|
||||
/**
|
||||
* 获取凭证缓存过期时间(毫秒)
|
||||
* 不同系统可以有不同的过期策略
|
||||
*
|
||||
* @return 过期时间(毫秒)
|
||||
*/
|
||||
protected abstract long getCredentialExpireTimeMs();
|
||||
|
||||
// ==================== 可选覆盖方法 ====================
|
||||
|
||||
/**
|
||||
* 凭证过期时的清理操作
|
||||
* 子类可覆盖此方法进行资源释放(如关闭连接池)
|
||||
*
|
||||
* @param credential 过期的凭证对象
|
||||
*/
|
||||
protected void onCredentialExpired(C credential) {
|
||||
// 默认空实现,子类按需覆盖
|
||||
}
|
||||
|
||||
// ==================== 通用凭证获取方法 ====================
|
||||
|
||||
/**
|
||||
* 获取凭证(带缓存、锁、降级策略)
|
||||
*
|
||||
* <p>实现逻辑:</p>
|
||||
* <ol>
|
||||
* <li>快速路径:无锁检查缓存是否有效</li>
|
||||
* <li>尝试获取锁(最多等待3秒)</li>
|
||||
* <li>双重检查:获取锁后再次检查缓存</li>
|
||||
* <li>统一解密 + 创建新凭证并缓存</li>
|
||||
* <li>降级策略:获取锁超时时返回过期缓存</li>
|
||||
* </ol>
|
||||
*
|
||||
* @param system 系统配置(包含加密数据)
|
||||
* @return 凭证对象
|
||||
*/
|
||||
protected final C getCredential(ExternalSystem system) {
|
||||
Long systemId = system.getId();
|
||||
|
||||
// 1. 快速路径:无锁检查缓存
|
||||
CredentialCache<C> cache = credentialCacheMap.get(systemId);
|
||||
if (cache != null && !cache.isExpired()) {
|
||||
return cache.getCredential();
|
||||
}
|
||||
|
||||
// 2. 缓存不存在或已过期,尝试获取锁来更新
|
||||
boolean lockAcquired = false;
|
||||
try {
|
||||
lockAcquired = cacheLock.tryLock(LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS);
|
||||
|
||||
if (lockAcquired) {
|
||||
// 3. 双重检查:其他线程可能已更新缓存
|
||||
cache = credentialCacheMap.get(systemId);
|
||||
if (cache == null || cache.isExpired()) {
|
||||
log.debug("凭证缓存失效,重新创建: systemId={}, systemType={}",
|
||||
systemId, getClass().getSimpleName());
|
||||
|
||||
// 关闭旧凭证(如K8S需要关闭连接池)
|
||||
if (cache != null) {
|
||||
try {
|
||||
onCredentialExpired(cache.getCredential());
|
||||
} catch (Exception e) {
|
||||
log.warn("关闭过期凭证时发生异常: systemId={}, error={}", systemId, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
// 4. 统一解密 + 创建新凭证并缓存
|
||||
ExternalSystem decryptedSystem = decryptSystem(system);
|
||||
C newCredential = createCredential(decryptedSystem);
|
||||
cache = new CredentialCache<>(newCredential, decryptedSystem, getCredentialExpireTimeMs());
|
||||
credentialCacheMap.put(systemId, cache);
|
||||
|
||||
log.debug("凭证缓存已更新: systemId={}, expireTime={}", systemId, cache.getExpireTime());
|
||||
}
|
||||
return cache.getCredential();
|
||||
} else {
|
||||
// 5. 获取锁超时,尝试降级策略
|
||||
log.warn("获取凭证缓存锁超时({}秒): systemId={}, systemType={}",
|
||||
LOCK_TIMEOUT_SECONDS, systemId, getClass().getSimpleName());
|
||||
|
||||
cache = credentialCacheMap.get(systemId);
|
||||
if (cache != null) {
|
||||
log.warn("使用过期凭证作为降级: systemId={}", systemId);
|
||||
return cache.getCredential();
|
||||
}
|
||||
|
||||
// 无缓存可用,同步创建(最后保底)
|
||||
log.warn("无可用凭证缓存,同步创建: systemId={}", systemId);
|
||||
ExternalSystem decryptedSystem = decryptSystem(system);
|
||||
C newCredential = createCredential(decryptedSystem);
|
||||
cache = new CredentialCache<>(newCredential, decryptedSystem, getCredentialExpireTimeMs());
|
||||
credentialCacheMap.put(systemId, cache);
|
||||
return newCredential;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
log.warn("获取凭证缓存锁被中断: systemId={}", systemId);
|
||||
|
||||
// 被中断时尝试返回现有缓存
|
||||
cache = credentialCacheMap.get(systemId);
|
||||
if (cache != null) {
|
||||
return cache.getCredential();
|
||||
}
|
||||
|
||||
// 无缓存可用,同步创建(最后保底)
|
||||
log.warn("无可用凭证缓存,同步创建: systemId={}", systemId);
|
||||
ExternalSystem decryptedSystem = decryptSystem(system);
|
||||
C newCredential = createCredential(decryptedSystem);
|
||||
cache = new CredentialCache<>(newCredential, decryptedSystem, getCredentialExpireTimeMs());
|
||||
credentialCacheMap.put(systemId, cache);
|
||||
return newCredential;
|
||||
} finally {
|
||||
if (lockAcquired) {
|
||||
cacheLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取缓存(包含凭证和解密后的系统信息)
|
||||
* 用于需要同时访问凭证和解密系统信息的场景(如Jenkins需要用decryptedSystem构建HttpHeaders)
|
||||
*
|
||||
* @param system 系统配置
|
||||
* @return 缓存对象
|
||||
*/
|
||||
protected final CredentialCache<C> getCredentialCache(ExternalSystem system) {
|
||||
Long systemId = system.getId();
|
||||
|
||||
// 先调用 getCredential 确保缓存已初始化
|
||||
getCredential(system);
|
||||
|
||||
// 返回缓存
|
||||
return credentialCacheMap.get(systemId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 清除指定系统的凭证缓存
|
||||
* 用于连接失败后强制下次重新创建
|
||||
*
|
||||
* @param systemId 系统ID
|
||||
*/
|
||||
protected final void clearCredentialCache(Long systemId) {
|
||||
CredentialCache<C> removed = credentialCacheMap.remove(systemId);
|
||||
if (removed != null) {
|
||||
log.debug("已清除凭证缓存: systemId={}", systemId);
|
||||
try {
|
||||
onCredentialExpired(removed.getCredential());
|
||||
} catch (Exception e) {
|
||||
log.warn("清除缓存时关闭凭证发生异常: systemId={}, error={}", systemId, e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== 解密工具方法 ====================
|
||||
|
||||
/**
|
||||
* 解密外部系统的敏感数据(密码和Token)
|
||||
* 创建一个新对象,不影响原对象
|
||||
|
||||
@ -17,7 +17,6 @@ import org.springframework.web.util.UriComponentsBuilder;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
@ -26,13 +25,16 @@ import java.util.stream.Collectors;
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
public class GitServiceIntegrationImpl extends BaseExternalSystemIntegration implements IGitServiceIntegration {
|
||||
public class GitServiceIntegrationImpl extends BaseExternalSystemIntegration<HttpHeaders> implements IGitServiceIntegration {
|
||||
|
||||
private final RestTemplate restTemplate;
|
||||
|
||||
// Git系统解密缓存 - 线程安全
|
||||
private static final Map<Long, GitSystemCache> SYSTEM_CACHE = new ConcurrentHashMap<>();
|
||||
private static final long CACHE_EXPIRE_TIME = 30 * 60 * 1000; // 30分钟过期
|
||||
/** 缓存过期时间:30分钟 */
|
||||
private static final long CACHE_EXPIRE_TIME = 30 * 60 * 1000;
|
||||
|
||||
// GitLab API 分页配置
|
||||
private static final int PER_PAGE = 100; // GitLab API 最大值
|
||||
private static final int MAX_PAGES = 100; // 防止无限循环,最多获取 10000 条
|
||||
|
||||
public GitServiceIntegrationImpl() {
|
||||
// 配置RestTemplate超时,防止网络故障时长时间阻塞
|
||||
@ -42,58 +44,52 @@ public class GitServiceIntegrationImpl extends BaseExternalSystemIntegration imp
|
||||
this.restTemplate = new RestTemplate(factory);
|
||||
}
|
||||
|
||||
// GitLab API 分页配置
|
||||
private static final int PER_PAGE = 100; // GitLab API 最大值
|
||||
private static final int MAX_PAGES = 100; // 防止无限循环,最多获取 10000 条
|
||||
// ==================== 实现基类抽象方法 ====================
|
||||
|
||||
/**
|
||||
* Git系统缓存内部类
|
||||
* 创建Git认证凭证(HttpHeaders)
|
||||
* 基于已解密的系统信息构建认证头
|
||||
*
|
||||
* @param decryptedSystem 已解密的系统配置(基类统一解密)
|
||||
*/
|
||||
private static class GitSystemCache {
|
||||
final ExternalSystem decryptedSystem;
|
||||
final long expireTime;
|
||||
@Override
|
||||
protected HttpHeaders createCredential(ExternalSystem decryptedSystem) {
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
headers.setContentType(MediaType.APPLICATION_JSON);
|
||||
|
||||
GitSystemCache(ExternalSystem system) {
|
||||
this.decryptedSystem = system;
|
||||
this.expireTime = System.currentTimeMillis() + CACHE_EXPIRE_TIME;
|
||||
// 根据认证类型设置认证信息
|
||||
switch (decryptedSystem.getAuthType()) {
|
||||
case BASIC -> {
|
||||
String auth = decryptedSystem.getUsername() + ":" + decryptedSystem.getPassword();
|
||||
byte[] encodedAuth = Base64.getEncoder().encode(auth.getBytes());
|
||||
headers.set("Authorization", "Basic " + new String(encodedAuth));
|
||||
}
|
||||
case TOKEN -> {
|
||||
// Token认证 - GitLab使用Private Token
|
||||
headers.set("PRIVATE-TOKEN", decryptedSystem.getToken());
|
||||
}
|
||||
case OAUTH -> {
|
||||
// OAuth认证
|
||||
headers.set("Authorization", "Bearer " + decryptedSystem.getToken());
|
||||
}
|
||||
default -> throw new RuntimeException("Unsupported authentication type: " + decryptedSystem.getAuthType());
|
||||
}
|
||||
|
||||
boolean isExpired() {
|
||||
return System.currentTimeMillis() > expireTime;
|
||||
}
|
||||
log.debug("Git认证凭证已创建: systemId={}, authType={}", decryptedSystem.getId(), decryptedSystem.getAuthType());
|
||||
return headers;
|
||||
}
|
||||
|
||||
/**
|
||||
* 线程安全地获取Git系统缓存
|
||||
* 如果缓存不存在或已过期,会重新解密
|
||||
*/
|
||||
private synchronized GitSystemCache getSystemCache(ExternalSystem system) {
|
||||
Long systemId = system.getId();
|
||||
GitSystemCache cache = SYSTEM_CACHE.get(systemId);
|
||||
|
||||
if (cache == null || cache.isExpired()) {
|
||||
log.debug("Git系统缓存失效,重新解密: systemId={}", systemId);
|
||||
|
||||
// 解密系统信息(只在这里解密一次)
|
||||
ExternalSystem decryptedSystem = decryptSystem(system);
|
||||
|
||||
// 创建新缓存
|
||||
cache = new GitSystemCache(decryptedSystem);
|
||||
SYSTEM_CACHE.put(systemId, cache);
|
||||
|
||||
log.debug("Git系统缓存已更新: systemId={}, expireTime={}", systemId, cache.expireTime);
|
||||
}
|
||||
|
||||
return cache;
|
||||
@Override
|
||||
protected long getCredentialExpireTimeMs() {
|
||||
return CACHE_EXPIRE_TIME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean testConnection(ExternalSystem system) {
|
||||
try {
|
||||
// 直接使用原始系统信息构建URL(URL不需要解密)
|
||||
String url = system.getUrl() + "/api/v4/version";
|
||||
// 创建请求头(内部自动处理解密)
|
||||
HttpHeaders headers = createHeaders(system);
|
||||
// 使用基类统一的凭证获取方法
|
||||
HttpHeaders headers = getCredential(system);
|
||||
HttpEntity<String> entity = new HttpEntity<>(headers);
|
||||
|
||||
ResponseEntity<String> response = restTemplate.exchange(
|
||||
@ -106,6 +102,8 @@ public class GitServiceIntegrationImpl extends BaseExternalSystemIntegration imp
|
||||
return response.getStatusCode() == HttpStatus.OK;
|
||||
} catch (Exception e) {
|
||||
log.error("Git connection test failed for system: {}", system.getName(), e);
|
||||
// 连接失败时清除缓存,下次重新创建
|
||||
clearCredentialCache(system.getId());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@ -113,7 +111,7 @@ public class GitServiceIntegrationImpl extends BaseExternalSystemIntegration imp
|
||||
@Override
|
||||
public List<GitGroupResponse> groups(ExternalSystem system) {
|
||||
List<GitGroupResponse> allGroups = new ArrayList<>();
|
||||
HttpHeaders headers = createHeaders(system);
|
||||
HttpHeaders headers = getCredential(system);
|
||||
HttpEntity<String> entity = new HttpEntity<>(headers);
|
||||
|
||||
try {
|
||||
@ -155,10 +153,8 @@ public class GitServiceIntegrationImpl extends BaseExternalSystemIntegration imp
|
||||
@Override
|
||||
public List<GitProjectResponse> projects(ExternalSystem system) {
|
||||
try {
|
||||
// 直接使用原始系统信息构建URL(URL不需要解密)
|
||||
String url = String.format("%s/api/v4/projects", system.getUrl());
|
||||
// 创建请求头(内部自动处理解密)
|
||||
HttpHeaders headers = createHeaders(system);
|
||||
HttpHeaders headers = getCredential(system);
|
||||
HttpEntity<String> entity = new HttpEntity<>(headers);
|
||||
|
||||
ResponseEntity<List<GitProjectResponse>> response = restTemplate.exchange(
|
||||
@ -179,7 +175,7 @@ public class GitServiceIntegrationImpl extends BaseExternalSystemIntegration imp
|
||||
@Override
|
||||
public List<GitProjectResponse> projectsByGroup(ExternalSystem system, Long groupId) {
|
||||
List<GitProjectResponse> allProjects = new ArrayList<>();
|
||||
HttpHeaders headers = createHeaders(system);
|
||||
HttpHeaders headers = getCredential(system);
|
||||
HttpEntity<String> entity = new HttpEntity<>(headers);
|
||||
|
||||
try {
|
||||
@ -221,7 +217,7 @@ public class GitServiceIntegrationImpl extends BaseExternalSystemIntegration imp
|
||||
@Override
|
||||
public List<GitBranchResponse> branches(ExternalSystem system, Long projectId) {
|
||||
List<GitBranchResponse> allBranches = new ArrayList<>();
|
||||
HttpHeaders headers = createHeaders(system);
|
||||
HttpHeaders headers = getCredential(system);
|
||||
HttpEntity<String> entity = new HttpEntity<>(headers);
|
||||
|
||||
try {
|
||||
@ -266,41 +262,6 @@ public class GitServiceIntegrationImpl extends BaseExternalSystemIntegration imp
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建包含认证信息的完整请求头
|
||||
* 内部自动处理系统解密和缓存
|
||||
*/
|
||||
private HttpHeaders createHeaders(ExternalSystem system) {
|
||||
// 获取缓存的解密系统信息
|
||||
GitSystemCache cache = getSystemCache(system);
|
||||
ExternalSystem decryptedSystem = cache.decryptedSystem;
|
||||
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
headers.setContentType(MediaType.APPLICATION_JSON);
|
||||
|
||||
// 根据认证类型设置认证信息(使用解密后的系统信息)
|
||||
switch (decryptedSystem.getAuthType()) {
|
||||
case BASIC -> {
|
||||
// Basic认证
|
||||
String auth = decryptedSystem.getUsername() + ":" + decryptedSystem.getPassword();
|
||||
byte[] encodedAuth = Base64.getEncoder().encode(auth.getBytes());
|
||||
String authHeader = "Basic " + new String(encodedAuth);
|
||||
headers.set("Authorization", authHeader);
|
||||
}
|
||||
case TOKEN -> {
|
||||
// Token认证 - GitLab使用Private Token
|
||||
headers.set("PRIVATE-TOKEN", decryptedSystem.getToken());
|
||||
}
|
||||
case OAUTH -> {
|
||||
// OAuth认证
|
||||
headers.set("Authorization", "Bearer " + decryptedSystem.getToken());
|
||||
}
|
||||
default -> throw new RuntimeException("Unsupported authentication type: " + decryptedSystem.getAuthType());
|
||||
}
|
||||
|
||||
return headers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GitBranchResponse getBranch(ExternalSystem system, Long projectId, String branchName) {
|
||||
try {
|
||||
@ -308,7 +269,7 @@ public class GitServiceIntegrationImpl extends BaseExternalSystemIntegration imp
|
||||
String url = String.format("%s/api/v4/projects/%d/repository/branches/%s",
|
||||
system.getUrl(), projectId, branchName);
|
||||
|
||||
HttpHeaders headers = createHeaders(system);
|
||||
HttpHeaders headers = getCredential(system);
|
||||
HttpEntity<String> entity = new HttpEntity<>(headers);
|
||||
|
||||
ResponseEntity<GitBranchResponse> response = restTemplate.exchange(
|
||||
@ -337,7 +298,7 @@ public class GitServiceIntegrationImpl extends BaseExternalSystemIntegration imp
|
||||
String url = String.format("%s/api/v4/projects/%d/repository/commits?ref_name=%s&per_page=%d",
|
||||
system.getUrl(), projectId, branchName, limit);
|
||||
|
||||
HttpHeaders headers = createHeaders(system);
|
||||
HttpHeaders headers = getCredential(system);
|
||||
HttpEntity<String> entity = new HttpEntity<>(headers);
|
||||
|
||||
ResponseEntity<List<com.qqchen.deploy.backend.deploy.integration.response.GitCommitResponse>> response =
|
||||
|
||||
@ -56,28 +56,20 @@ import java.util.Base64;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.springframework.http.client.SimpleClientHttpRequestFactory;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
public class JenkinsServiceIntegrationImpl extends BaseExternalSystemIntegration implements IJenkinsServiceIntegration {
|
||||
public class JenkinsServiceIntegrationImpl extends BaseExternalSystemIntegration<JenkinsCrumbIssuerResponse> implements IJenkinsServiceIntegration {
|
||||
|
||||
@Resource
|
||||
private IExternalSystemRepository systemRepository;
|
||||
|
||||
private final RestTemplate restTemplate;
|
||||
|
||||
// Jenkins Crumb缓存 - 线程安全
|
||||
private static final Map<Long, JenkinsCrumbCache> CRUMB_CACHE = new ConcurrentHashMap<>();
|
||||
|
||||
// 每个Jenkins系统独立的锁 - 避免不同系统间互相阻塞
|
||||
private static final Map<Long, ReentrantLock> SYSTEM_LOCKS = new ConcurrentHashMap<>();
|
||||
|
||||
private static final long CRUMB_EXPIRE_TIME = 25 * 60 * 1000; // 25分钟过期
|
||||
private static final long LOCK_TIMEOUT_SECONDS = 3; // 锁超时时间:3秒
|
||||
/** Crumb缓存过期时间:25分钟 */
|
||||
private static final long CRUMB_EXPIRE_TIME = 25 * 60 * 1000;
|
||||
|
||||
public JenkinsServiceIntegrationImpl() {
|
||||
// 配置RestTemplate超时,防止网络故障时长时间阻塞
|
||||
@ -87,106 +79,30 @@ public class JenkinsServiceIntegrationImpl extends BaseExternalSystemIntegration
|
||||
this.restTemplate = new RestTemplate(factory);
|
||||
}
|
||||
|
||||
/**
|
||||
* Jenkins Crumb缓存内部类
|
||||
*/
|
||||
private static class JenkinsCrumbCache {
|
||||
final JenkinsCrumbIssuerResponse crumb;
|
||||
|
||||
final ExternalSystem decryptedSystem;
|
||||
|
||||
final long expireTime;
|
||||
|
||||
JenkinsCrumbCache(JenkinsCrumbIssuerResponse crumb, ExternalSystem system) {
|
||||
this.crumb = crumb;
|
||||
this.decryptedSystem = system;
|
||||
this.expireTime = System.currentTimeMillis() + CRUMB_EXPIRE_TIME;
|
||||
}
|
||||
|
||||
boolean isExpired() {
|
||||
return System.currentTimeMillis() > expireTime;
|
||||
}
|
||||
}
|
||||
// ==================== 实现基类抽象方法 ====================
|
||||
|
||||
/**
|
||||
* 线程安全地获取Jenkins Crumb缓存
|
||||
* 使用ReentrantLock + tryLock实现带超时的锁机制,避免死锁
|
||||
* 如果缓存不存在或已过期,会重新获取和解密
|
||||
* 创建Jenkins Crumb凭证
|
||||
* 通过HTTP请求从Jenkins服务器获取Crumb
|
||||
*
|
||||
* @param decryptedSystem 已解密的系统配置(基类统一解密)
|
||||
*/
|
||||
private JenkinsCrumbCache getCrumbCache(ExternalSystem system) {
|
||||
Long systemId = system.getId();
|
||||
|
||||
// 快速路径:如果缓存有效,直接返回
|
||||
JenkinsCrumbCache cache = CRUMB_CACHE.get(systemId);
|
||||
if (cache != null && !cache.isExpired()) {
|
||||
return cache;
|
||||
}
|
||||
|
||||
// 获取该Jenkins系统的独立锁(公平锁,避免线程饥饿)
|
||||
ReentrantLock lock = SYSTEM_LOCKS.computeIfAbsent(systemId, k -> new ReentrantLock(true));
|
||||
|
||||
try {
|
||||
// 尝试获取锁,最多等待3秒
|
||||
if (lock.tryLock(LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
|
||||
try {
|
||||
// 双重检查:获取锁后再次检查缓存
|
||||
cache = CRUMB_CACHE.get(systemId);
|
||||
if (cache != null && !cache.isExpired()) {
|
||||
log.debug("Jenkins Crumb缓存命中(双重检查): systemId={}", systemId);
|
||||
return cache;
|
||||
}
|
||||
|
||||
log.debug("Jenkins Crumb缓存失效,重新获取: systemId={}", systemId);
|
||||
|
||||
// 解密系统信息(只在这里解密一次)
|
||||
ExternalSystem decryptedSystem = decryptSystem(system);
|
||||
|
||||
// 获取新的crumb(网络I/O,但不持有synchronized锁)
|
||||
JenkinsCrumbIssuerResponse crumb = fetchCrumbFromJenkins(decryptedSystem);
|
||||
|
||||
// 创建新缓存
|
||||
cache = new JenkinsCrumbCache(crumb, decryptedSystem);
|
||||
CRUMB_CACHE.put(systemId, cache);
|
||||
|
||||
log.debug("Jenkins Crumb缓存已更新: systemId={}, expireTime={}", systemId, cache.expireTime);
|
||||
return cache;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
} else {
|
||||
// 获取锁超时,使用降级策略
|
||||
log.warn("获取Jenkins Crumb锁超时({}秒),systemId={}", LOCK_TIMEOUT_SECONDS, systemId);
|
||||
|
||||
// 降级策略:使用过期缓存(容忍短暂的认证失败)
|
||||
cache = CRUMB_CACHE.get(systemId);
|
||||
if (cache != null) {
|
||||
log.warn("使用过期的Jenkins Crumb缓存作为降级: systemId={}, 过期时间差={}ms",
|
||||
systemId, System.currentTimeMillis() - cache.expireTime);
|
||||
return cache;
|
||||
}
|
||||
|
||||
// 没有缓存可用,抛出异常
|
||||
throw new BusinessException(ResponseCode.JENKINS_API_ERROR,
|
||||
new Object[]{"CRUMB_LOCK_TIMEOUT", "获取Jenkins认证信息超时,请稍后重试"});
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
log.error("获取Jenkins Crumb锁被中断: systemId={}", systemId, e);
|
||||
throw new BusinessException(ResponseCode.JENKINS_API_ERROR,
|
||||
new Object[]{"CRUMB_LOCK_INTERRUPTED", "获取Jenkins认证信息被中断"});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 直接从Jenkins服务器获取Crumb(不经过缓存)
|
||||
*/
|
||||
private JenkinsCrumbIssuerResponse fetchCrumbFromJenkins(ExternalSystem decryptedSystem) {
|
||||
@Override
|
||||
protected JenkinsCrumbIssuerResponse createCredential(ExternalSystem decryptedSystem) {
|
||||
String url = decryptedSystem.getUrl() + "/crumbIssuer/api/json";
|
||||
HttpHeaders headers = createBasicHeaders(decryptedSystem);
|
||||
HttpEntity<String> entity = new HttpEntity<>(headers);
|
||||
|
||||
ResponseEntity<String> response = restTemplate.exchange(url, HttpMethod.GET, entity, String.class);
|
||||
return convertResponse(response);
|
||||
JenkinsCrumbIssuerResponse crumb = convertResponse(response);
|
||||
|
||||
log.debug("Jenkins Crumb凭证已创建: systemId={}", decryptedSystem.getId());
|
||||
return crumb;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected long getCredentialExpireTimeMs() {
|
||||
return CRUMB_EXPIRE_TIME;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -199,8 +115,7 @@ public class JenkinsServiceIntegrationImpl extends BaseExternalSystemIntegration
|
||||
HttpHeaders headers = createHeaders(system);
|
||||
|
||||
// 打印实际发送的请求头
|
||||
log.info("Authorization头: {}", headers.getFirst("Authorization"));
|
||||
log.info("===================================");
|
||||
log.debug("Authorization头: {}", headers.getFirst("Authorization"));
|
||||
|
||||
HttpEntity<String> entity = new HttpEntity<>(headers);
|
||||
|
||||
@ -220,13 +135,13 @@ public class JenkinsServiceIntegrationImpl extends BaseExternalSystemIntegration
|
||||
|
||||
/**
|
||||
* 创建包含认证信息和Jenkins Crumb的完整请求头
|
||||
* 内部自动处理缓存获取和解密
|
||||
* 使用基类统一的凭证缓存机制
|
||||
*/
|
||||
private HttpHeaders createHeaders(ExternalSystem system) {
|
||||
// 获取缓存的解密系统和crumb
|
||||
JenkinsCrumbCache cache = getCrumbCache(system);
|
||||
ExternalSystem decryptedSystem = cache.decryptedSystem;
|
||||
JenkinsCrumbIssuerResponse crumb = cache.crumb;
|
||||
// 获取缓存(包含已解密的系统信息和Crumb凭证)
|
||||
CredentialCache<JenkinsCrumbIssuerResponse> cache = getCredentialCache(system);
|
||||
ExternalSystem decryptedSystem = cache.getDecryptedSystem();
|
||||
JenkinsCrumbIssuerResponse crumb = cache.getCredential();
|
||||
|
||||
// 创建基础认证头
|
||||
HttpHeaders headers = createBasicHeaders(decryptedSystem);
|
||||
|
||||
@ -32,70 +32,74 @@ import java.time.ZoneId;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* K8S集成服务实现
|
||||
* 使用基类统一的凭证缓存管理机制
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration implements IK8sServiceIntegration {
|
||||
|
||||
// K8S ApiClient缓存 - 线程安全
|
||||
private static final Map<Long, K8sApiClientCache> API_CLIENT_CACHE = new ConcurrentHashMap<>();
|
||||
public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration<ApiClient> implements IK8sServiceIntegration {
|
||||
|
||||
// 日志流专用ApiClient缓存(长超时,按k8sSystemId复用)
|
||||
private static final Map<Long, ApiClient> LOG_STREAM_API_CLIENT_CACHE = new ConcurrentHashMap<>();
|
||||
|
||||
private static final long CACHE_EXPIRE_TIME = 10 * 60 * 1000; // 10分钟过期(优化:缩短过期时间,避免持有失效连接)
|
||||
/** 缓存过期时间:10分钟(缩短过期时间,避免持有失效连接) */
|
||||
private static final long CACHE_EXPIRE_TIME = 10 * 60 * 1000;
|
||||
|
||||
// ==================== 实现基类抽象方法 ====================
|
||||
|
||||
/**
|
||||
* K8S ApiClient缓存内部类
|
||||
* 创建K8S ApiClient凭证
|
||||
* 解析kubeconfig并强制使用HTTP/1.1协议
|
||||
*
|
||||
* @param decryptedSystem 已解密的系统配置(基类统一解密)
|
||||
*/
|
||||
private static class K8sApiClientCache {
|
||||
final ApiClient apiClient;
|
||||
@Override
|
||||
protected ApiClient createCredential(ExternalSystem decryptedSystem) {
|
||||
try {
|
||||
String config = decryptedSystem.getConfig();
|
||||
if (config == null || config.trim().isEmpty()) {
|
||||
throw new BusinessException(ResponseCode.K8S_CONFIG_EMPTY);
|
||||
}
|
||||
|
||||
final long expireTime;
|
||||
// 直接使用config作为kubeconfig内容
|
||||
ApiClient client = Config.fromConfig(new StringReader(config));
|
||||
client.setConnectTimeout(15000); // 15秒连接超时
|
||||
client.setReadTimeout(120000); // 120秒读取超时(优化日志查询等耗时操作)
|
||||
|
||||
K8sApiClientCache(ApiClient apiClient) {
|
||||
this.apiClient = apiClient;
|
||||
this.expireTime = System.currentTimeMillis() + CACHE_EXPIRE_TIME;
|
||||
}
|
||||
// 关键优化:强制使用 HTTP/1.1,避免 HTTP/2 连接复用问题
|
||||
// HTTP/2 在 K8S API 高并发场景下容易出现 ConnectionShutdownException、Broken pipe 等问题
|
||||
okhttp3.OkHttpClient originalHttpClient = client.getHttpClient();
|
||||
okhttp3.OkHttpClient httpClient = originalHttpClient.newBuilder()
|
||||
.protocols(Arrays.asList(okhttp3.Protocol.HTTP_1_1)) // 强制 HTTP/1.1
|
||||
.retryOnConnectionFailure(true) // 连接失败时重试
|
||||
.build();
|
||||
client.setHttpClient(httpClient);
|
||||
|
||||
boolean isExpired() {
|
||||
return System.currentTimeMillis() > expireTime;
|
||||
log.debug("K8S ApiClient凭证已创建,使用HTTP/1.1协议: systemId={}", decryptedSystem.getId());
|
||||
return client;
|
||||
} catch (BusinessException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
log.error("创建K8S ApiClient失败: systemId={}, error={}", decryptedSystem.getId(), e.getMessage(), e);
|
||||
throw new BusinessException(ResponseCode.K8S_CONNECTION_FAILED);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected long getCredentialExpireTimeMs() {
|
||||
return CACHE_EXPIRE_TIME;
|
||||
}
|
||||
|
||||
/**
|
||||
* 线程安全地获取K8S ApiClient缓存
|
||||
* 如果缓存不存在或已过期,会重新创建
|
||||
* 凭证过期时关闭ApiClient,释放OkHttp资源
|
||||
*/
|
||||
private synchronized K8sApiClientCache getApiClientCache(ExternalSystem system) {
|
||||
Long systemId = system.getId();
|
||||
K8sApiClientCache cache = API_CLIENT_CACHE.get(systemId);
|
||||
|
||||
if (cache == null || cache.isExpired()) {
|
||||
log.debug("K8S ApiClient缓存失效,重新创建: systemId={}", systemId);
|
||||
|
||||
// 关闭旧的ApiClient以释放OkHttp资源
|
||||
if (cache != null && cache.isExpired()) {
|
||||
log.debug("关闭过期的K8S ApiClient: systemId={}", systemId);
|
||||
closeApiClient(cache.apiClient);
|
||||
}
|
||||
|
||||
try {
|
||||
ApiClient apiClient = createApiClientInternal(system);
|
||||
cache = new K8sApiClientCache(apiClient);
|
||||
API_CLIENT_CACHE.put(systemId, cache);
|
||||
log.debug("K8S ApiClient缓存已更新: systemId={}, expireTime={}", systemId, cache.expireTime);
|
||||
} catch (Exception e) {
|
||||
log.error("创建K8S ApiClient失败: systemId={}", systemId, e);
|
||||
throw new BusinessException(ResponseCode.K8S_CONNECTION_FAILED);
|
||||
}
|
||||
}
|
||||
|
||||
return cache;
|
||||
@Override
|
||||
protected void onCredentialExpired(ApiClient credential) {
|
||||
closeApiClient(credential);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -116,9 +120,9 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp
|
||||
throw new BusinessException(ResponseCode.K8S_CONFIG_EMPTY);
|
||||
}
|
||||
|
||||
// 使用缓存的ApiClient进行连接测试(优化:复用连接,减少资源开销)
|
||||
K8sApiClientCache cache = getApiClientCache(system);
|
||||
VersionApi versionApi = new VersionApi(cache.apiClient);
|
||||
// 使用基类统一的凭证获取方法
|
||||
ApiClient apiClient = getCredential(system);
|
||||
VersionApi versionApi = new VersionApi(apiClient);
|
||||
VersionInfo version = versionApi.getCode();
|
||||
log.info("K8S集群连接成功,版本: {}", version.getGitVersion());
|
||||
|
||||
@ -128,7 +132,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp
|
||||
log.error("K8S连接测试失败,集群: {}, 错误: {}", system.getName(), e.getMessage(), e);
|
||||
|
||||
// 连接失败时清除缓存,下次重新创建
|
||||
API_CLIENT_CACHE.remove(system.getId());
|
||||
clearCredentialCache(system.getId());
|
||||
|
||||
return false;
|
||||
}
|
||||
@ -142,8 +146,8 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp
|
||||
log.info("查询K8S命名空间,集群: {}", externalSystem.getName());
|
||||
|
||||
try {
|
||||
K8sApiClientCache cache = getApiClientCache(externalSystem);
|
||||
CoreV1Api api = new CoreV1Api(cache.apiClient);
|
||||
ApiClient apiClient = getCredential(externalSystem);
|
||||
CoreV1Api api = new CoreV1Api(apiClient);
|
||||
|
||||
V1NamespaceList namespaceList = api.listNamespace(
|
||||
null, null, null, null, null, null, null, null, null, null, null
|
||||
@ -196,8 +200,8 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp
|
||||
log.info("查询K8S Deployment,集群: {}, 命名空间: {}", externalSystem.getName(), namespace);
|
||||
|
||||
try {
|
||||
K8sApiClientCache cache = getApiClientCache(externalSystem);
|
||||
AppsV1Api api = new AppsV1Api(cache.apiClient);
|
||||
ApiClient apiClient = getCredential(externalSystem);
|
||||
AppsV1Api api = new AppsV1Api(apiClient);
|
||||
|
||||
V1DeploymentList deploymentList = api.listNamespacedDeployment(
|
||||
namespace, null, null, null, null, null, null, null, null, null, null, null
|
||||
@ -275,8 +279,8 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp
|
||||
log.info("查询所有K8S Deployment,集群: {}", externalSystem.getName());
|
||||
|
||||
try {
|
||||
K8sApiClientCache cache = getApiClientCache(externalSystem);
|
||||
AppsV1Api api = new AppsV1Api(cache.apiClient);
|
||||
ApiClient apiClient = getCredential(externalSystem);
|
||||
AppsV1Api api = new AppsV1Api(apiClient);
|
||||
|
||||
V1DeploymentList deploymentList = api.listDeploymentForAllNamespaces(
|
||||
null, null, null, null, null, null, null, null, null, null, null
|
||||
@ -353,8 +357,8 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp
|
||||
log.info("查询K8S Pod,集群: {}, 命名空间: {}", externalSystem.getName(), namespace);
|
||||
|
||||
try {
|
||||
K8sApiClientCache cache = getApiClientCache(externalSystem);
|
||||
CoreV1Api api = new CoreV1Api(cache.apiClient);
|
||||
ApiClient apiClient = getCredential(externalSystem);
|
||||
CoreV1Api api = new CoreV1Api(apiClient);
|
||||
|
||||
V1PodList podList = api.listNamespacedPod(
|
||||
namespace, null, null, null, null, null, null, null, null, null, null, null
|
||||
@ -384,10 +388,10 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp
|
||||
externalSystem.getName(), namespace, deploymentName);
|
||||
|
||||
try {
|
||||
K8sApiClientCache cache = getApiClientCache(externalSystem);
|
||||
ApiClient apiClient = getCredential(externalSystem);
|
||||
|
||||
// 1. 先查询Deployment获取selector
|
||||
AppsV1Api appsApi = new AppsV1Api(cache.apiClient);
|
||||
AppsV1Api appsApi = new AppsV1Api(apiClient);
|
||||
V1Deployment deployment = appsApi.readNamespacedDeployment(deploymentName, namespace, null);
|
||||
|
||||
if (deployment.getSpec() == null || deployment.getSpec().getSelector() == null
|
||||
@ -396,7 +400,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
// 2. 构建label selector
|
||||
// 2. 构建Label selector
|
||||
Map<String, String> matchLabels = deployment.getSpec().getSelector().getMatchLabels();
|
||||
String labelSelector = matchLabels.entrySet().stream()
|
||||
.map(entry -> entry.getKey() + "=" + entry.getValue())
|
||||
@ -404,7 +408,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp
|
||||
.orElse("");
|
||||
|
||||
// 3. 使用label selector查询Pod
|
||||
CoreV1Api coreApi = new CoreV1Api(cache.apiClient);
|
||||
CoreV1Api coreApi = new CoreV1Api(apiClient);
|
||||
V1PodList podList = coreApi.listNamespacedPod(
|
||||
namespace, null, null, null, null, labelSelector, null, null, null, null, null, null
|
||||
);
|
||||
@ -441,8 +445,8 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp
|
||||
externalSystem.getName(), namespace, podName);
|
||||
|
||||
try {
|
||||
K8sApiClientCache cache = getApiClientCache(externalSystem);
|
||||
CoreV1Api api = new CoreV1Api(cache.apiClient);
|
||||
ApiClient apiClient = getCredential(externalSystem);
|
||||
CoreV1Api api = new CoreV1Api(apiClient);
|
||||
|
||||
V1Pod pod = api.readNamespacedPod(podName, namespace, null);
|
||||
|
||||
@ -599,8 +603,8 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp
|
||||
externalSystem.getName(), namespace, podName, container, effectiveTail, effectiveSinceSeconds);
|
||||
|
||||
try {
|
||||
K8sApiClientCache cache = getApiClientCache(externalSystem);
|
||||
CoreV1Api api = new CoreV1Api(cache.apiClient);
|
||||
ApiClient apiClient = getCredential(externalSystem);
|
||||
CoreV1Api api = new CoreV1Api(apiClient);
|
||||
|
||||
// 日志大小限制:10MB(防止OOM)
|
||||
Integer limitBytes = 10 * 1024 * 1024;
|
||||
@ -681,8 +685,8 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp
|
||||
externalSystem.getName(), namespace, deploymentName);
|
||||
|
||||
try {
|
||||
K8sApiClientCache cache = getApiClientCache(externalSystem);
|
||||
AppsV1Api api = new AppsV1Api(cache.apiClient);
|
||||
ApiClient apiClient = getCredential(externalSystem);
|
||||
AppsV1Api api = new AppsV1Api(apiClient);
|
||||
|
||||
// 生成ISO 8601格式的时间戳作为重启标记
|
||||
String timestamp = java.time.format.DateTimeFormatter.ISO_INSTANT
|
||||
@ -746,8 +750,8 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp
|
||||
externalSystem.getName(), namespace, deploymentName, replicas);
|
||||
|
||||
try {
|
||||
K8sApiClientCache cache = getApiClientCache(externalSystem);
|
||||
AppsV1Api api = new AppsV1Api(cache.apiClient);
|
||||
ApiClient apiClient = getCredential(externalSystem);
|
||||
AppsV1Api api = new AppsV1Api(apiClient);
|
||||
|
||||
// 构建patch内容:更新spec.replicas
|
||||
String patchBody = String.format("{\"spec\":{\"replicas\":%d}}", replicas);
|
||||
@ -1057,26 +1061,6 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建K8S ApiClient(内部实现,不使用缓存)
|
||||
*
|
||||
* @param externalSystem K8S系统配置
|
||||
* @return ApiClient
|
||||
*/
|
||||
private ApiClient createApiClientInternal(ExternalSystem externalSystem) throws Exception {
|
||||
String config = externalSystem.getConfig();
|
||||
|
||||
if (config == null || config.trim().isEmpty()) {
|
||||
throw new BusinessException(ResponseCode.K8S_CONFIG_EMPTY);
|
||||
}
|
||||
|
||||
// 直接使用config作为kubeconfig内容
|
||||
ApiClient client = Config.fromConfig(new StringReader(config));
|
||||
client.setConnectTimeout(15000); // 15秒连接超时
|
||||
client.setReadTimeout(120000); // 120秒读取超时(优化日志查询等耗时操作)
|
||||
return client;
|
||||
}
|
||||
|
||||
/**
|
||||
* 提取Deployment中第一个容器的镜像
|
||||
*
|
||||
@ -1189,19 +1173,6 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取K8S ApiClient(带缓存)
|
||||
* 复用内部缓存机制,避免重复创建连接
|
||||
*
|
||||
* @param system K8S系统配置
|
||||
* @return ApiClient实例
|
||||
*/
|
||||
@Override
|
||||
public ApiClient getApiClient(ExternalSystem system) {
|
||||
K8sApiClientCache cache = getApiClientCache(system);
|
||||
return cache.apiClient;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取用于日志流的K8S ApiClient(长超时,按集群缓存复用)
|
||||
*
|
||||
@ -1221,6 +1192,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp
|
||||
|
||||
/**
|
||||
* 创建日志流专用ApiClient(内部方法)
|
||||
* 日志流是长连接,需要更长的读取超时,同样强制使用HTTP/1.1
|
||||
*/
|
||||
private ApiClient createLogStreamApiClient(ExternalSystem system) {
|
||||
try {
|
||||
@ -1233,7 +1205,15 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp
|
||||
client.setConnectTimeout(15000); // 15秒连接超时
|
||||
client.setReadTimeout(30 * 60 * 1000); // 30分钟读取超时(日志流长连接)
|
||||
|
||||
log.debug("日志流ApiClient创建完成,readTimeout=30分钟");
|
||||
// 关键优化:强制使用 HTTP/1.1,避免 HTTP/2 连接复用问题
|
||||
okhttp3.OkHttpClient originalHttpClient = client.getHttpClient();
|
||||
okhttp3.OkHttpClient httpClient = originalHttpClient.newBuilder()
|
||||
.protocols(Arrays.asList(okhttp3.Protocol.HTTP_1_1)) // 强制 HTTP/1.1
|
||||
.retryOnConnectionFailure(true) // 连接失败时重试
|
||||
.build();
|
||||
client.setHttpClient(httpClient);
|
||||
|
||||
log.debug("日志流ApiClient创建完成,readTimeout=30分钟,使用HTTP/1.1协议");
|
||||
return client;
|
||||
} catch (BusinessException e) {
|
||||
throw e;
|
||||
|
||||
@ -97,9 +97,7 @@ public class ExternalSystemHealthCheckScheduler {
|
||||
config = new ServerMonitorNotificationConfig();
|
||||
config.setNotificationChannelId(notificationChannelId);
|
||||
config.setResourceAlertTemplateId(resourceAlertTemplateId);
|
||||
|
||||
log.debug("开始检查外部系统健康状态: channelId={}, alertTemplateId={}",
|
||||
notificationChannelId, resourceAlertTemplateId);
|
||||
log.debug("开始检查外部系统健康状态: channelId={}, alertTemplateId={}", notificationChannelId, resourceAlertTemplateId);
|
||||
} else {
|
||||
config = null;
|
||||
log.debug("开始检查外部系统健康状态(不发送通知)");
|
||||
@ -112,7 +110,7 @@ public class ExternalSystemHealthCheckScheduler {
|
||||
List<ExternalSystem> systems = externalSystemRepository.findByDeletedFalseOrderBySort()
|
||||
.stream()
|
||||
.filter(system -> Boolean.TRUE.equals(system.getEnabled()))
|
||||
.collect(Collectors.toList());
|
||||
.toList();
|
||||
|
||||
if (systems.isEmpty()) {
|
||||
log.debug("没有需要检查的外部系统,跳过健康检查");
|
||||
@ -125,7 +123,7 @@ public class ExternalSystemHealthCheckScheduler {
|
||||
List<CompletableFuture<Void>> futures = systems.stream()
|
||||
.map(system -> CompletableFuture.runAsync(() ->
|
||||
checkSingleSystem(system, config), externalSystemHealthCheckExecutor))
|
||||
.collect(Collectors.toList());
|
||||
.toList();
|
||||
|
||||
// 3. 等待所有检测完成(带超时控制,防止线程永久阻塞)
|
||||
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
|
||||
|
||||
Loading…
Reference in New Issue
Block a user