diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/IK8sServiceIntegration.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/IK8sServiceIntegration.java index 2fe9d67e..ac651de8 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/IK8sServiceIntegration.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/IK8sServiceIntegration.java @@ -241,15 +241,6 @@ public interface IK8sServiceIntegration extends IExternalSystemIntegration { } } - /** - * 获取K8S ApiClient(带缓存) - * 用于直接调用Kubernetes Java Client API - * - * @param system K8S系统配置 - * @return ApiClient实例 - */ - io.kubernetes.client.openapi.ApiClient getApiClient(ExternalSystem system); - /** * 获取用于日志流的K8S ApiClient(长超时) * 日志流是长连接,需要更长的读取超时时间 diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/impl/BaseExternalSystemIntegration.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/impl/BaseExternalSystemIntegration.java index 043cd6e1..aa6f3c3e 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/impl/BaseExternalSystemIntegration.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/impl/BaseExternalSystemIntegration.java @@ -6,23 +6,252 @@ import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + /** * 外部系统集成基类 - * 提供统一的密码解密功能 - * + * 提供统一的凭证缓存管理和敏感数据解密功能 + * + *

子类通过泛型指定凭证类型:

+ * + * + * @param 凭证类型(Credential Type) * @author qqchen * @since 2025-11-11 */ @Slf4j -public abstract class BaseExternalSystemIntegration { +public abstract class BaseExternalSystemIntegration { @Resource protected SensitiveDataEncryptor encryptor; + /** + * 凭证缓存 - 按系统ID存储 + */ + private final Map> credentialCacheMap = new ConcurrentHashMap<>(); + + /** + * 缓存锁 - 避免synchronized无限等待 + */ + private final ReentrantLock cacheLock = new ReentrantLock(); + + /** + * 锁超时时间(秒) + */ + private static final long LOCK_TIMEOUT_SECONDS = 3; + + // ==================== 凭证缓存结构 ==================== + + /** + * 凭证缓存包装类 + * 统一存储凭证对象、解密后的系统信息和过期时间 + */ + protected static class CredentialCache { + private final C credential; + private final ExternalSystem decryptedSystem; + private final long expireTime; + + public CredentialCache(C credential, ExternalSystem decryptedSystem, long expireTimeMs) { + this.credential = credential; + this.decryptedSystem = decryptedSystem; + this.expireTime = System.currentTimeMillis() + expireTimeMs; + } + + public C getCredential() { + return credential; + } + + public ExternalSystem getDecryptedSystem() { + return decryptedSystem; + } + + public boolean isExpired() { + return System.currentTimeMillis() > expireTime; + } + + public long getExpireTime() { + return expireTime; + } + } + + // ==================== 抽象方法 - 子类必须实现 ==================== + + /** + * 创建凭证对象 + * 子类负责:基于已解密的系统信息创建特定类型的凭证 + * + * @param decryptedSystem 已解密的系统配置(密码/Token已是明文) + * @return 凭证对象 + */ + protected abstract C createCredential(ExternalSystem decryptedSystem); + + /** + * 获取凭证缓存过期时间(毫秒) + * 不同系统可以有不同的过期策略 + * + * @return 过期时间(毫秒) + */ + protected abstract long getCredentialExpireTimeMs(); + + // ==================== 可选覆盖方法 ==================== + + /** + * 凭证过期时的清理操作 + * 子类可覆盖此方法进行资源释放(如关闭连接池) + * + * @param credential 过期的凭证对象 + */ + protected void onCredentialExpired(C credential) { + // 默认空实现,子类按需覆盖 + } + + // ==================== 通用凭证获取方法 ==================== + + /** + * 获取凭证(带缓存、锁、降级策略) + * + *

实现逻辑:

+ *
    + *
  1. 快速路径:无锁检查缓存是否有效
  2. + *
  3. 尝试获取锁(最多等待3秒)
  4. + *
  5. 双重检查:获取锁后再次检查缓存
  6. + *
  7. 统一解密 + 创建新凭证并缓存
  8. + *
  9. 降级策略:获取锁超时时返回过期缓存
  10. + *
+ * + * @param system 系统配置(包含加密数据) + * @return 凭证对象 + */ + protected final C getCredential(ExternalSystem system) { + Long systemId = system.getId(); + + // 1. 快速路径:无锁检查缓存 + CredentialCache cache = credentialCacheMap.get(systemId); + if (cache != null && !cache.isExpired()) { + return cache.getCredential(); + } + + // 2. 缓存不存在或已过期,尝试获取锁来更新 + boolean lockAcquired = false; + try { + lockAcquired = cacheLock.tryLock(LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS); + + if (lockAcquired) { + // 3. 双重检查:其他线程可能已更新缓存 + cache = credentialCacheMap.get(systemId); + if (cache == null || cache.isExpired()) { + log.debug("凭证缓存失效,重新创建: systemId={}, systemType={}", + systemId, getClass().getSimpleName()); + + // 关闭旧凭证(如K8S需要关闭连接池) + if (cache != null) { + try { + onCredentialExpired(cache.getCredential()); + } catch (Exception e) { + log.warn("关闭过期凭证时发生异常: systemId={}, error={}", systemId, e.getMessage()); + } + } + + // 4. 统一解密 + 创建新凭证并缓存 + ExternalSystem decryptedSystem = decryptSystem(system); + C newCredential = createCredential(decryptedSystem); + cache = new CredentialCache<>(newCredential, decryptedSystem, getCredentialExpireTimeMs()); + credentialCacheMap.put(systemId, cache); + + log.debug("凭证缓存已更新: systemId={}, expireTime={}", systemId, cache.getExpireTime()); + } + return cache.getCredential(); + } else { + // 5. 获取锁超时,尝试降级策略 + log.warn("获取凭证缓存锁超时({}秒): systemId={}, systemType={}", + LOCK_TIMEOUT_SECONDS, systemId, getClass().getSimpleName()); + + cache = credentialCacheMap.get(systemId); + if (cache != null) { + log.warn("使用过期凭证作为降级: systemId={}", systemId); + return cache.getCredential(); + } + + // 无缓存可用,同步创建(最后保底) + log.warn("无可用凭证缓存,同步创建: systemId={}", systemId); + ExternalSystem decryptedSystem = decryptSystem(system); + C newCredential = createCredential(decryptedSystem); + cache = new CredentialCache<>(newCredential, decryptedSystem, getCredentialExpireTimeMs()); + credentialCacheMap.put(systemId, cache); + return newCredential; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("获取凭证缓存锁被中断: systemId={}", systemId); + + // 被中断时尝试返回现有缓存 + cache = credentialCacheMap.get(systemId); + if (cache != null) { + return cache.getCredential(); + } + + // 无缓存可用,同步创建(最后保底) + log.warn("无可用凭证缓存,同步创建: systemId={}", systemId); + ExternalSystem decryptedSystem = decryptSystem(system); + C newCredential = createCredential(decryptedSystem); + cache = new CredentialCache<>(newCredential, decryptedSystem, getCredentialExpireTimeMs()); + credentialCacheMap.put(systemId, cache); + return newCredential; + } finally { + if (lockAcquired) { + cacheLock.unlock(); + } + } + } + + /** + * 获取缓存(包含凭证和解密后的系统信息) + * 用于需要同时访问凭证和解密系统信息的场景(如Jenkins需要用decryptedSystem构建HttpHeaders) + * + * @param system 系统配置 + * @return 缓存对象 + */ + protected final CredentialCache getCredentialCache(ExternalSystem system) { + Long systemId = system.getId(); + + // 先调用 getCredential 确保缓存已初始化 + getCredential(system); + + // 返回缓存 + return credentialCacheMap.get(systemId); + } + + /** + * 清除指定系统的凭证缓存 + * 用于连接失败后强制下次重新创建 + * + * @param systemId 系统ID + */ + protected final void clearCredentialCache(Long systemId) { + CredentialCache removed = credentialCacheMap.remove(systemId); + if (removed != null) { + log.debug("已清除凭证缓存: systemId={}", systemId); + try { + onCredentialExpired(removed.getCredential()); + } catch (Exception e) { + log.warn("清除缓存时关闭凭证发生异常: systemId={}, error={}", systemId, e.getMessage()); + } + } + } + + // ==================== 解密工具方法 ==================== + /** * 解密外部系统的敏感数据(密码和Token) * 创建一个新对象,不影响原对象 - * + * * @param system 原始的外部系统对象(包含加密的密码) * @return 解密后的外部系统对象 */ @@ -33,7 +262,7 @@ public abstract class BaseExternalSystemIntegration { // 创建新对象,避免修改原对象 ExternalSystem decrypted = new ExternalSystem(); - + // 复制所有属性 decrypted.setId(system.getId()); decrypted.setName(system.getName()); @@ -45,7 +274,7 @@ public abstract class BaseExternalSystemIntegration { decrypted.setConfig(system.getConfig()); decrypted.setRemark(system.getRemark()); decrypted.setSort(system.getSort()); - + // 解密密码 if (StringUtils.isNotBlank(system.getPassword())) { try { @@ -57,7 +286,7 @@ public abstract class BaseExternalSystemIntegration { decrypted.setPassword(system.getPassword()); } } - + // 解密Token if (StringUtils.isNotBlank(system.getToken())) { try { @@ -69,7 +298,7 @@ public abstract class BaseExternalSystemIntegration { decrypted.setToken(system.getToken()); } } - + return decrypted; } } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/impl/GitServiceIntegrationImpl.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/impl/GitServiceIntegrationImpl.java index a314529e..43107c5c 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/impl/GitServiceIntegrationImpl.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/impl/GitServiceIntegrationImpl.java @@ -17,7 +17,6 @@ import org.springframework.web.util.UriComponentsBuilder; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; /** @@ -26,13 +25,16 @@ import java.util.stream.Collectors; */ @Slf4j @Service -public class GitServiceIntegrationImpl extends BaseExternalSystemIntegration implements IGitServiceIntegration { +public class GitServiceIntegrationImpl extends BaseExternalSystemIntegration implements IGitServiceIntegration { private final RestTemplate restTemplate; - // Git系统解密缓存 - 线程安全 - private static final Map SYSTEM_CACHE = new ConcurrentHashMap<>(); - private static final long CACHE_EXPIRE_TIME = 30 * 60 * 1000; // 30分钟过期 + /** 缓存过期时间:30分钟 */ + private static final long CACHE_EXPIRE_TIME = 30 * 60 * 1000; + + // GitLab API 分页配置 + private static final int PER_PAGE = 100; // GitLab API 最大值 + private static final int MAX_PAGES = 100; // 防止无限循环,最多获取 10000 条 public GitServiceIntegrationImpl() { // 配置RestTemplate超时,防止网络故障时长时间阻塞 @@ -42,58 +44,52 @@ public class GitServiceIntegrationImpl extends BaseExternalSystemIntegration imp this.restTemplate = new RestTemplate(factory); } - // GitLab API 分页配置 - private static final int PER_PAGE = 100; // GitLab API 最大值 - private static final int MAX_PAGES = 100; // 防止无限循环,最多获取 10000 条 + // ==================== 实现基类抽象方法 ==================== /** - * Git系统缓存内部类 + * 创建Git认证凭证(HttpHeaders) + * 基于已解密的系统信息构建认证头 + * + * @param decryptedSystem 已解密的系统配置(基类统一解密) */ - private static class GitSystemCache { - final ExternalSystem decryptedSystem; - final long expireTime; + @Override + protected HttpHeaders createCredential(ExternalSystem decryptedSystem) { + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); - GitSystemCache(ExternalSystem system) { - this.decryptedSystem = system; - this.expireTime = System.currentTimeMillis() + CACHE_EXPIRE_TIME; + // 根据认证类型设置认证信息 + switch (decryptedSystem.getAuthType()) { + case BASIC -> { + String auth = decryptedSystem.getUsername() + ":" + decryptedSystem.getPassword(); + byte[] encodedAuth = Base64.getEncoder().encode(auth.getBytes()); + headers.set("Authorization", "Basic " + new String(encodedAuth)); + } + case TOKEN -> { + // Token认证 - GitLab使用Private Token + headers.set("PRIVATE-TOKEN", decryptedSystem.getToken()); + } + case OAUTH -> { + // OAuth认证 + headers.set("Authorization", "Bearer " + decryptedSystem.getToken()); + } + default -> throw new RuntimeException("Unsupported authentication type: " + decryptedSystem.getAuthType()); } - boolean isExpired() { - return System.currentTimeMillis() > expireTime; - } + log.debug("Git认证凭证已创建: systemId={}, authType={}", decryptedSystem.getId(), decryptedSystem.getAuthType()); + return headers; } - /** - * 线程安全地获取Git系统缓存 - * 如果缓存不存在或已过期,会重新解密 - */ - private synchronized GitSystemCache getSystemCache(ExternalSystem system) { - Long systemId = system.getId(); - GitSystemCache cache = SYSTEM_CACHE.get(systemId); - - if (cache == null || cache.isExpired()) { - log.debug("Git系统缓存失效,重新解密: systemId={}", systemId); - - // 解密系统信息(只在这里解密一次) - ExternalSystem decryptedSystem = decryptSystem(system); - - // 创建新缓存 - cache = new GitSystemCache(decryptedSystem); - SYSTEM_CACHE.put(systemId, cache); - - log.debug("Git系统缓存已更新: systemId={}, expireTime={}", systemId, cache.expireTime); - } - - return cache; + @Override + protected long getCredentialExpireTimeMs() { + return CACHE_EXPIRE_TIME; } @Override public boolean testConnection(ExternalSystem system) { try { - // 直接使用原始系统信息构建URL(URL不需要解密) String url = system.getUrl() + "/api/v4/version"; - // 创建请求头(内部自动处理解密) - HttpHeaders headers = createHeaders(system); + // 使用基类统一的凭证获取方法 + HttpHeaders headers = getCredential(system); HttpEntity entity = new HttpEntity<>(headers); ResponseEntity response = restTemplate.exchange( @@ -106,6 +102,8 @@ public class GitServiceIntegrationImpl extends BaseExternalSystemIntegration imp return response.getStatusCode() == HttpStatus.OK; } catch (Exception e) { log.error("Git connection test failed for system: {}", system.getName(), e); + // 连接失败时清除缓存,下次重新创建 + clearCredentialCache(system.getId()); return false; } } @@ -113,7 +111,7 @@ public class GitServiceIntegrationImpl extends BaseExternalSystemIntegration imp @Override public List groups(ExternalSystem system) { List allGroups = new ArrayList<>(); - HttpHeaders headers = createHeaders(system); + HttpHeaders headers = getCredential(system); HttpEntity entity = new HttpEntity<>(headers); try { @@ -155,10 +153,8 @@ public class GitServiceIntegrationImpl extends BaseExternalSystemIntegration imp @Override public List projects(ExternalSystem system) { try { - // 直接使用原始系统信息构建URL(URL不需要解密) String url = String.format("%s/api/v4/projects", system.getUrl()); - // 创建请求头(内部自动处理解密) - HttpHeaders headers = createHeaders(system); + HttpHeaders headers = getCredential(system); HttpEntity entity = new HttpEntity<>(headers); ResponseEntity> response = restTemplate.exchange( @@ -179,7 +175,7 @@ public class GitServiceIntegrationImpl extends BaseExternalSystemIntegration imp @Override public List projectsByGroup(ExternalSystem system, Long groupId) { List allProjects = new ArrayList<>(); - HttpHeaders headers = createHeaders(system); + HttpHeaders headers = getCredential(system); HttpEntity entity = new HttpEntity<>(headers); try { @@ -221,7 +217,7 @@ public class GitServiceIntegrationImpl extends BaseExternalSystemIntegration imp @Override public List branches(ExternalSystem system, Long projectId) { List allBranches = new ArrayList<>(); - HttpHeaders headers = createHeaders(system); + HttpHeaders headers = getCredential(system); HttpEntity entity = new HttpEntity<>(headers); try { @@ -266,41 +262,6 @@ public class GitServiceIntegrationImpl extends BaseExternalSystemIntegration imp } } - /** - * 创建包含认证信息的完整请求头 - * 内部自动处理系统解密和缓存 - */ - private HttpHeaders createHeaders(ExternalSystem system) { - // 获取缓存的解密系统信息 - GitSystemCache cache = getSystemCache(system); - ExternalSystem decryptedSystem = cache.decryptedSystem; - - HttpHeaders headers = new HttpHeaders(); - headers.setContentType(MediaType.APPLICATION_JSON); - - // 根据认证类型设置认证信息(使用解密后的系统信息) - switch (decryptedSystem.getAuthType()) { - case BASIC -> { - // Basic认证 - String auth = decryptedSystem.getUsername() + ":" + decryptedSystem.getPassword(); - byte[] encodedAuth = Base64.getEncoder().encode(auth.getBytes()); - String authHeader = "Basic " + new String(encodedAuth); - headers.set("Authorization", authHeader); - } - case TOKEN -> { - // Token认证 - GitLab使用Private Token - headers.set("PRIVATE-TOKEN", decryptedSystem.getToken()); - } - case OAUTH -> { - // OAuth认证 - headers.set("Authorization", "Bearer " + decryptedSystem.getToken()); - } - default -> throw new RuntimeException("Unsupported authentication type: " + decryptedSystem.getAuthType()); - } - - return headers; - } - @Override public GitBranchResponse getBranch(ExternalSystem system, Long projectId, String branchName) { try { @@ -308,7 +269,7 @@ public class GitServiceIntegrationImpl extends BaseExternalSystemIntegration imp String url = String.format("%s/api/v4/projects/%d/repository/branches/%s", system.getUrl(), projectId, branchName); - HttpHeaders headers = createHeaders(system); + HttpHeaders headers = getCredential(system); HttpEntity entity = new HttpEntity<>(headers); ResponseEntity response = restTemplate.exchange( @@ -337,7 +298,7 @@ public class GitServiceIntegrationImpl extends BaseExternalSystemIntegration imp String url = String.format("%s/api/v4/projects/%d/repository/commits?ref_name=%s&per_page=%d", system.getUrl(), projectId, branchName, limit); - HttpHeaders headers = createHeaders(system); + HttpHeaders headers = getCredential(system); HttpEntity entity = new HttpEntity<>(headers); ResponseEntity> response = diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/impl/JenkinsServiceIntegrationImpl.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/impl/JenkinsServiceIntegrationImpl.java index 3425945e..4760a991 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/impl/JenkinsServiceIntegrationImpl.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/impl/JenkinsServiceIntegrationImpl.java @@ -56,29 +56,21 @@ import java.util.Base64; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; + import org.springframework.http.client.SimpleClientHttpRequestFactory; @Slf4j @Service -public class JenkinsServiceIntegrationImpl extends BaseExternalSystemIntegration implements IJenkinsServiceIntegration { +public class JenkinsServiceIntegrationImpl extends BaseExternalSystemIntegration implements IJenkinsServiceIntegration { @Resource private IExternalSystemRepository systemRepository; private final RestTemplate restTemplate; - // Jenkins Crumb缓存 - 线程安全 - private static final Map CRUMB_CACHE = new ConcurrentHashMap<>(); - - // 每个Jenkins系统独立的锁 - 避免不同系统间互相阻塞 - private static final Map SYSTEM_LOCKS = new ConcurrentHashMap<>(); + /** Crumb缓存过期时间:25分钟 */ + private static final long CRUMB_EXPIRE_TIME = 25 * 60 * 1000; - private static final long CRUMB_EXPIRE_TIME = 25 * 60 * 1000; // 25分钟过期 - private static final long LOCK_TIMEOUT_SECONDS = 3; // 锁超时时间:3秒 - public JenkinsServiceIntegrationImpl() { // 配置RestTemplate超时,防止网络故障时长时间阻塞 SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory(); @@ -87,106 +79,30 @@ public class JenkinsServiceIntegrationImpl extends BaseExternalSystemIntegration this.restTemplate = new RestTemplate(factory); } - /** - * Jenkins Crumb缓存内部类 - */ - private static class JenkinsCrumbCache { - final JenkinsCrumbIssuerResponse crumb; - - final ExternalSystem decryptedSystem; - - final long expireTime; - - JenkinsCrumbCache(JenkinsCrumbIssuerResponse crumb, ExternalSystem system) { - this.crumb = crumb; - this.decryptedSystem = system; - this.expireTime = System.currentTimeMillis() + CRUMB_EXPIRE_TIME; - } - - boolean isExpired() { - return System.currentTimeMillis() > expireTime; - } - } + // ==================== 实现基类抽象方法 ==================== /** - * 线程安全地获取Jenkins Crumb缓存 - * 使用ReentrantLock + tryLock实现带超时的锁机制,避免死锁 - * 如果缓存不存在或已过期,会重新获取和解密 + * 创建Jenkins Crumb凭证 + * 通过HTTP请求从Jenkins服务器获取Crumb + * + * @param decryptedSystem 已解密的系统配置(基类统一解密) */ - private JenkinsCrumbCache getCrumbCache(ExternalSystem system) { - Long systemId = system.getId(); - - // 快速路径:如果缓存有效,直接返回 - JenkinsCrumbCache cache = CRUMB_CACHE.get(systemId); - if (cache != null && !cache.isExpired()) { - return cache; - } - - // 获取该Jenkins系统的独立锁(公平锁,避免线程饥饿) - ReentrantLock lock = SYSTEM_LOCKS.computeIfAbsent(systemId, k -> new ReentrantLock(true)); - - try { - // 尝试获取锁,最多等待3秒 - if (lock.tryLock(LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { - try { - // 双重检查:获取锁后再次检查缓存 - cache = CRUMB_CACHE.get(systemId); - if (cache != null && !cache.isExpired()) { - log.debug("Jenkins Crumb缓存命中(双重检查): systemId={}", systemId); - return cache; - } - - log.debug("Jenkins Crumb缓存失效,重新获取: systemId={}", systemId); - - // 解密系统信息(只在这里解密一次) - ExternalSystem decryptedSystem = decryptSystem(system); - - // 获取新的crumb(网络I/O,但不持有synchronized锁) - JenkinsCrumbIssuerResponse crumb = fetchCrumbFromJenkins(decryptedSystem); - - // 创建新缓存 - cache = new JenkinsCrumbCache(crumb, decryptedSystem); - CRUMB_CACHE.put(systemId, cache); - - log.debug("Jenkins Crumb缓存已更新: systemId={}, expireTime={}", systemId, cache.expireTime); - return cache; - } finally { - lock.unlock(); - } - } else { - // 获取锁超时,使用降级策略 - log.warn("获取Jenkins Crumb锁超时({}秒),systemId={}", LOCK_TIMEOUT_SECONDS, systemId); - - // 降级策略:使用过期缓存(容忍短暂的认证失败) - cache = CRUMB_CACHE.get(systemId); - if (cache != null) { - log.warn("使用过期的Jenkins Crumb缓存作为降级: systemId={}, 过期时间差={}ms", - systemId, System.currentTimeMillis() - cache.expireTime); - return cache; - } - - // 没有缓存可用,抛出异常 - throw new BusinessException(ResponseCode.JENKINS_API_ERROR, - new Object[]{"CRUMB_LOCK_TIMEOUT", "获取Jenkins认证信息超时,请稍后重试"}); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.error("获取Jenkins Crumb锁被中断: systemId={}", systemId, e); - throw new BusinessException(ResponseCode.JENKINS_API_ERROR, - new Object[]{"CRUMB_LOCK_INTERRUPTED", "获取Jenkins认证信息被中断"}); - } - } - - /** - * 直接从Jenkins服务器获取Crumb(不经过缓存) - */ - private JenkinsCrumbIssuerResponse fetchCrumbFromJenkins(ExternalSystem decryptedSystem) { + @Override + protected JenkinsCrumbIssuerResponse createCredential(ExternalSystem decryptedSystem) { String url = decryptedSystem.getUrl() + "/crumbIssuer/api/json"; HttpHeaders headers = createBasicHeaders(decryptedSystem); HttpEntity entity = new HttpEntity<>(headers); ResponseEntity response = restTemplate.exchange(url, HttpMethod.GET, entity, String.class); - return convertResponse(response); + JenkinsCrumbIssuerResponse crumb = convertResponse(response); + + log.debug("Jenkins Crumb凭证已创建: systemId={}", decryptedSystem.getId()); + return crumb; + } + + @Override + protected long getCredentialExpireTimeMs() { + return CRUMB_EXPIRE_TIME; } @Override @@ -199,8 +115,7 @@ public class JenkinsServiceIntegrationImpl extends BaseExternalSystemIntegration HttpHeaders headers = createHeaders(system); // 打印实际发送的请求头 - log.info("Authorization头: {}", headers.getFirst("Authorization")); - log.info("==================================="); + log.debug("Authorization头: {}", headers.getFirst("Authorization")); HttpEntity entity = new HttpEntity<>(headers); @@ -220,13 +135,13 @@ public class JenkinsServiceIntegrationImpl extends BaseExternalSystemIntegration /** * 创建包含认证信息和Jenkins Crumb的完整请求头 - * 内部自动处理缓存获取和解密 + * 使用基类统一的凭证缓存机制 */ private HttpHeaders createHeaders(ExternalSystem system) { - // 获取缓存的解密系统和crumb - JenkinsCrumbCache cache = getCrumbCache(system); - ExternalSystem decryptedSystem = cache.decryptedSystem; - JenkinsCrumbIssuerResponse crumb = cache.crumb; + // 获取缓存(包含已解密的系统信息和Crumb凭证) + CredentialCache cache = getCredentialCache(system); + ExternalSystem decryptedSystem = cache.getDecryptedSystem(); + JenkinsCrumbIssuerResponse crumb = cache.getCredential(); // 创建基础认证头 HttpHeaders headers = createBasicHeaders(decryptedSystem); @@ -519,7 +434,7 @@ public class JenkinsServiceIntegrationImpl extends BaseExternalSystemIntegration (String) executable.get("url") ); } - + // 还在队列中等待 return null; } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/impl/K8sServiceIntegrationImpl.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/impl/K8sServiceIntegrationImpl.java index bfa4d679..a0bf01bb 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/impl/K8sServiceIntegrationImpl.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/impl/K8sServiceIntegrationImpl.java @@ -32,70 +32,74 @@ import java.time.ZoneId; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Arrays; import java.util.concurrent.ConcurrentHashMap; /** * K8S集成服务实现 + * 使用基类统一的凭证缓存管理机制 */ @Slf4j @Service -public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration implements IK8sServiceIntegration { - - // K8S ApiClient缓存 - 线程安全 - private static final Map API_CLIENT_CACHE = new ConcurrentHashMap<>(); +public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration implements IK8sServiceIntegration { // 日志流专用ApiClient缓存(长超时,按k8sSystemId复用) private static final Map LOG_STREAM_API_CLIENT_CACHE = new ConcurrentHashMap<>(); - private static final long CACHE_EXPIRE_TIME = 10 * 60 * 1000; // 10分钟过期(优化:缩短过期时间,避免持有失效连接) - + /** 缓存过期时间:10分钟(缩短过期时间,避免持有失效连接) */ + private static final long CACHE_EXPIRE_TIME = 10 * 60 * 1000; + + // ==================== 实现基类抽象方法 ==================== + /** - * K8S ApiClient缓存内部类 + * 创建K8S ApiClient凭证 + * 解析kubeconfig并强制使用HTTP/1.1协议 + * + * @param decryptedSystem 已解密的系统配置(基类统一解密) */ - private static class K8sApiClientCache { - final ApiClient apiClient; + @Override + protected ApiClient createCredential(ExternalSystem decryptedSystem) { + try { + String config = decryptedSystem.getConfig(); + if (config == null || config.trim().isEmpty()) { + throw new BusinessException(ResponseCode.K8S_CONFIG_EMPTY); + } - final long expireTime; - - K8sApiClientCache(ApiClient apiClient) { - this.apiClient = apiClient; - this.expireTime = System.currentTimeMillis() + CACHE_EXPIRE_TIME; - } - - boolean isExpired() { - return System.currentTimeMillis() > expireTime; + // 直接使用config作为kubeconfig内容 + ApiClient client = Config.fromConfig(new StringReader(config)); + client.setConnectTimeout(15000); // 15秒连接超时 + client.setReadTimeout(120000); // 120秒读取超时(优化日志查询等耗时操作) + + // 关键优化:强制使用 HTTP/1.1,避免 HTTP/2 连接复用问题 + // HTTP/2 在 K8S API 高并发场景下容易出现 ConnectionShutdownException、Broken pipe 等问题 + okhttp3.OkHttpClient originalHttpClient = client.getHttpClient(); + okhttp3.OkHttpClient httpClient = originalHttpClient.newBuilder() + .protocols(Arrays.asList(okhttp3.Protocol.HTTP_1_1)) // 强制 HTTP/1.1 + .retryOnConnectionFailure(true) // 连接失败时重试 + .build(); + client.setHttpClient(httpClient); + + log.debug("K8S ApiClient凭证已创建,使用HTTP/1.1协议: systemId={}", decryptedSystem.getId()); + return client; + } catch (BusinessException e) { + throw e; + } catch (Exception e) { + log.error("创建K8S ApiClient失败: systemId={}, error={}", decryptedSystem.getId(), e.getMessage(), e); + throw new BusinessException(ResponseCode.K8S_CONNECTION_FAILED); } } - + + @Override + protected long getCredentialExpireTimeMs() { + return CACHE_EXPIRE_TIME; + } + /** - * 线程安全地获取K8S ApiClient缓存 - * 如果缓存不存在或已过期,会重新创建 + * 凭证过期时关闭ApiClient,释放OkHttp资源 */ - private synchronized K8sApiClientCache getApiClientCache(ExternalSystem system) { - Long systemId = system.getId(); - K8sApiClientCache cache = API_CLIENT_CACHE.get(systemId); - - if (cache == null || cache.isExpired()) { - log.debug("K8S ApiClient缓存失效,重新创建: systemId={}", systemId); - - // 关闭旧的ApiClient以释放OkHttp资源 - if (cache != null && cache.isExpired()) { - log.debug("关闭过期的K8S ApiClient: systemId={}", systemId); - closeApiClient(cache.apiClient); - } - - try { - ApiClient apiClient = createApiClientInternal(system); - cache = new K8sApiClientCache(apiClient); - API_CLIENT_CACHE.put(systemId, cache); - log.debug("K8S ApiClient缓存已更新: systemId={}, expireTime={}", systemId, cache.expireTime); - } catch (Exception e) { - log.error("创建K8S ApiClient失败: systemId={}", systemId, e); - throw new BusinessException(ResponseCode.K8S_CONNECTION_FAILED); - } - } - - return cache; + @Override + protected void onCredentialExpired(ApiClient credential) { + closeApiClient(credential); } @Override @@ -116,9 +120,9 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp throw new BusinessException(ResponseCode.K8S_CONFIG_EMPTY); } - // 使用缓存的ApiClient进行连接测试(优化:复用连接,减少资源开销) - K8sApiClientCache cache = getApiClientCache(system); - VersionApi versionApi = new VersionApi(cache.apiClient); + // 使用基类统一的凭证获取方法 + ApiClient apiClient = getCredential(system); + VersionApi versionApi = new VersionApi(apiClient); VersionInfo version = versionApi.getCode(); log.info("K8S集群连接成功,版本: {}", version.getGitVersion()); @@ -128,7 +132,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp log.error("K8S连接测试失败,集群: {}, 错误: {}", system.getName(), e.getMessage(), e); // 连接失败时清除缓存,下次重新创建 - API_CLIENT_CACHE.remove(system.getId()); + clearCredentialCache(system.getId()); return false; } @@ -142,8 +146,8 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp log.info("查询K8S命名空间,集群: {}", externalSystem.getName()); try { - K8sApiClientCache cache = getApiClientCache(externalSystem); - CoreV1Api api = new CoreV1Api(cache.apiClient); + ApiClient apiClient = getCredential(externalSystem); + CoreV1Api api = new CoreV1Api(apiClient); V1NamespaceList namespaceList = api.listNamespace( null, null, null, null, null, null, null, null, null, null, null @@ -196,8 +200,8 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp log.info("查询K8S Deployment,集群: {}, 命名空间: {}", externalSystem.getName(), namespace); try { - K8sApiClientCache cache = getApiClientCache(externalSystem); - AppsV1Api api = new AppsV1Api(cache.apiClient); + ApiClient apiClient = getCredential(externalSystem); + AppsV1Api api = new AppsV1Api(apiClient); V1DeploymentList deploymentList = api.listNamespacedDeployment( namespace, null, null, null, null, null, null, null, null, null, null, null @@ -275,8 +279,8 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp log.info("查询所有K8S Deployment,集群: {}", externalSystem.getName()); try { - K8sApiClientCache cache = getApiClientCache(externalSystem); - AppsV1Api api = new AppsV1Api(cache.apiClient); + ApiClient apiClient = getCredential(externalSystem); + AppsV1Api api = new AppsV1Api(apiClient); V1DeploymentList deploymentList = api.listDeploymentForAllNamespaces( null, null, null, null, null, null, null, null, null, null, null @@ -353,8 +357,8 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp log.info("查询K8S Pod,集群: {}, 命名空间: {}", externalSystem.getName(), namespace); try { - K8sApiClientCache cache = getApiClientCache(externalSystem); - CoreV1Api api = new CoreV1Api(cache.apiClient); + ApiClient apiClient = getCredential(externalSystem); + CoreV1Api api = new CoreV1Api(apiClient); V1PodList podList = api.listNamespacedPod( namespace, null, null, null, null, null, null, null, null, null, null, null @@ -384,10 +388,10 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp externalSystem.getName(), namespace, deploymentName); try { - K8sApiClientCache cache = getApiClientCache(externalSystem); + ApiClient apiClient = getCredential(externalSystem); // 1. 先查询Deployment获取selector - AppsV1Api appsApi = new AppsV1Api(cache.apiClient); + AppsV1Api appsApi = new AppsV1Api(apiClient); V1Deployment deployment = appsApi.readNamespacedDeployment(deploymentName, namespace, null); if (deployment.getSpec() == null || deployment.getSpec().getSelector() == null @@ -396,7 +400,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp return new ArrayList<>(); } - // 2. 构建label selector + // 2. 构建Label selector Map matchLabels = deployment.getSpec().getSelector().getMatchLabels(); String labelSelector = matchLabels.entrySet().stream() .map(entry -> entry.getKey() + "=" + entry.getValue()) @@ -404,7 +408,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp .orElse(""); // 3. 使用label selector查询Pod - CoreV1Api coreApi = new CoreV1Api(cache.apiClient); + CoreV1Api coreApi = new CoreV1Api(apiClient); V1PodList podList = coreApi.listNamespacedPod( namespace, null, null, null, null, labelSelector, null, null, null, null, null, null ); @@ -441,8 +445,8 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp externalSystem.getName(), namespace, podName); try { - K8sApiClientCache cache = getApiClientCache(externalSystem); - CoreV1Api api = new CoreV1Api(cache.apiClient); + ApiClient apiClient = getCredential(externalSystem); + CoreV1Api api = new CoreV1Api(apiClient); V1Pod pod = api.readNamespacedPod(podName, namespace, null); @@ -599,8 +603,8 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp externalSystem.getName(), namespace, podName, container, effectiveTail, effectiveSinceSeconds); try { - K8sApiClientCache cache = getApiClientCache(externalSystem); - CoreV1Api api = new CoreV1Api(cache.apiClient); + ApiClient apiClient = getCredential(externalSystem); + CoreV1Api api = new CoreV1Api(apiClient); // 日志大小限制:10MB(防止OOM) Integer limitBytes = 10 * 1024 * 1024; @@ -681,8 +685,8 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp externalSystem.getName(), namespace, deploymentName); try { - K8sApiClientCache cache = getApiClientCache(externalSystem); - AppsV1Api api = new AppsV1Api(cache.apiClient); + ApiClient apiClient = getCredential(externalSystem); + AppsV1Api api = new AppsV1Api(apiClient); // 生成ISO 8601格式的时间戳作为重启标记 String timestamp = java.time.format.DateTimeFormatter.ISO_INSTANT @@ -746,8 +750,8 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp externalSystem.getName(), namespace, deploymentName, replicas); try { - K8sApiClientCache cache = getApiClientCache(externalSystem); - AppsV1Api api = new AppsV1Api(cache.apiClient); + ApiClient apiClient = getCredential(externalSystem); + AppsV1Api api = new AppsV1Api(apiClient); // 构建patch内容:更新spec.replicas String patchBody = String.format("{\"spec\":{\"replicas\":%d}}", replicas); @@ -1057,26 +1061,6 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp return true; } - /** - * 创建K8S ApiClient(内部实现,不使用缓存) - * - * @param externalSystem K8S系统配置 - * @return ApiClient - */ - private ApiClient createApiClientInternal(ExternalSystem externalSystem) throws Exception { - String config = externalSystem.getConfig(); - - if (config == null || config.trim().isEmpty()) { - throw new BusinessException(ResponseCode.K8S_CONFIG_EMPTY); - } - - // 直接使用config作为kubeconfig内容 - ApiClient client = Config.fromConfig(new StringReader(config)); - client.setConnectTimeout(15000); // 15秒连接超时 - client.setReadTimeout(120000); // 120秒读取超时(优化日志查询等耗时操作) - return client; - } - /** * 提取Deployment中第一个容器的镜像 * @@ -1189,19 +1173,6 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp } } - /** - * 获取K8S ApiClient(带缓存) - * 复用内部缓存机制,避免重复创建连接 - * - * @param system K8S系统配置 - * @return ApiClient实例 - */ - @Override - public ApiClient getApiClient(ExternalSystem system) { - K8sApiClientCache cache = getApiClientCache(system); - return cache.apiClient; - } - /** * 获取用于日志流的K8S ApiClient(长超时,按集群缓存复用) * @@ -1221,6 +1192,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp /** * 创建日志流专用ApiClient(内部方法) + * 日志流是长连接,需要更长的读取超时,同样强制使用HTTP/1.1 */ private ApiClient createLogStreamApiClient(ExternalSystem system) { try { @@ -1232,8 +1204,16 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp ApiClient client = Config.fromConfig(new StringReader(config)); client.setConnectTimeout(15000); // 15秒连接超时 client.setReadTimeout(30 * 60 * 1000); // 30分钟读取超时(日志流长连接) + + // 关键优化:强制使用 HTTP/1.1,避免 HTTP/2 连接复用问题 + okhttp3.OkHttpClient originalHttpClient = client.getHttpClient(); + okhttp3.OkHttpClient httpClient = originalHttpClient.newBuilder() + .protocols(Arrays.asList(okhttp3.Protocol.HTTP_1_1)) // 强制 HTTP/1.1 + .retryOnConnectionFailure(true) // 连接失败时重试 + .build(); + client.setHttpClient(httpClient); - log.debug("日志流ApiClient创建完成,readTimeout=30分钟"); + log.debug("日志流ApiClient创建完成,readTimeout=30分钟,使用HTTP/1.1协议"); return client; } catch (BusinessException e) { throw e; diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/scheduler/ExternalSystemHealthCheckScheduler.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/scheduler/ExternalSystemHealthCheckScheduler.java index 63ecef91..ecd420b4 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/scheduler/ExternalSystemHealthCheckScheduler.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/scheduler/ExternalSystemHealthCheckScheduler.java @@ -97,9 +97,7 @@ public class ExternalSystemHealthCheckScheduler { config = new ServerMonitorNotificationConfig(); config.setNotificationChannelId(notificationChannelId); config.setResourceAlertTemplateId(resourceAlertTemplateId); - - log.debug("开始检查外部系统健康状态: channelId={}, alertTemplateId={}", - notificationChannelId, resourceAlertTemplateId); + log.debug("开始检查外部系统健康状态: channelId={}, alertTemplateId={}", notificationChannelId, resourceAlertTemplateId); } else { config = null; log.debug("开始检查外部系统健康状态(不发送通知)"); @@ -112,7 +110,7 @@ public class ExternalSystemHealthCheckScheduler { List systems = externalSystemRepository.findByDeletedFalseOrderBySort() .stream() .filter(system -> Boolean.TRUE.equals(system.getEnabled())) - .collect(Collectors.toList()); + .toList(); if (systems.isEmpty()) { log.debug("没有需要检查的外部系统,跳过健康检查"); @@ -125,7 +123,7 @@ public class ExternalSystemHealthCheckScheduler { List> futures = systems.stream() .map(system -> CompletableFuture.runAsync(() -> checkSingleSystem(system, config), externalSystemHealthCheckExecutor)) - .collect(Collectors.toList()); + .toList(); // 3. 等待所有检测完成(带超时控制,防止线程永久阻塞) CompletableFuture allFutures = CompletableFuture.allOf(