From 60395e46ed39ac6253ebb40f4a9c7be4cb613f9a Mon Sep 17 00:00:00 2001 From: dengqichen Date: Wed, 21 Jan 2026 17:49:39 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/pom.xml | 17 +++++++++-- .../impl/K8sServiceIntegrationImpl.java | 10 +++---- .../ExternalSystemHealthCheckScheduler.java | 22 ++++++++++++++ .../scheduler/ServerMonitorScheduler.java | 22 ++++++++++++++ .../impl/K8sDeploymentServiceImpl.java | 28 ++++++++++++++++-- .../impl/RepositoryBranchServiceImpl.java | 29 +++++++++++++++++-- .../impl/RepositoryProjectServiceImpl.java | 19 ++++++++++-- 7 files changed, 133 insertions(+), 14 deletions(-) diff --git a/backend/pom.xml b/backend/pom.xml index 93fade5a..324bb058 100644 --- a/backend/pom.xml +++ b/backend/pom.xml @@ -282,11 +282,24 @@ 5.0.1 - + io.kubernetes client-java - 18.0.1 + 21.0.2-legacy + + + + + + com.squareup.okhttp3 + okhttp + 5.0.0-alpha.14 + + + com.squareup.okhttp3 + logging-interceptor + 5.0.0-alpha.14 diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/impl/K8sServiceIntegrationImpl.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/impl/K8sServiceIntegrationImpl.java index d1222a35..bfa4d679 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/impl/K8sServiceIntegrationImpl.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/integration/impl/K8sServiceIntegrationImpl.java @@ -146,7 +146,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp CoreV1Api api = new CoreV1Api(cache.apiClient); V1NamespaceList namespaceList = api.listNamespace( - null, null, null, null, null, null, null, null, null, null + null, null, null, null, null, null, null, null, null, null, null ); List namespaces = new ArrayList<>(); @@ -200,7 +200,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp AppsV1Api api = new AppsV1Api(cache.apiClient); V1DeploymentList deploymentList = api.listNamespacedDeployment( - namespace, null, null, null, null, null, null, null, null, null, null + namespace, null, null, null, null, null, null, null, null, null, null, null ); List deployments = new ArrayList<>(); @@ -279,7 +279,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp AppsV1Api api = new AppsV1Api(cache.apiClient); V1DeploymentList deploymentList = api.listDeploymentForAllNamespaces( - null, null, null, null, null, null, null, null, null, null + null, null, null, null, null, null, null, null, null, null, null ); List deployments = new ArrayList<>(); @@ -357,7 +357,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp CoreV1Api api = new CoreV1Api(cache.apiClient); V1PodList podList = api.listNamespacedPod( - namespace, null, null, null, null, null, null, null, null, null, null + namespace, null, null, null, null, null, null, null, null, null, null, null ); List pods = new ArrayList<>(); @@ -406,7 +406,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp // 3. 使用label selector查询Pod CoreV1Api coreApi = new CoreV1Api(cache.apiClient); V1PodList podList = coreApi.listNamespacedPod( - namespace, null, null, null, null, labelSelector, null, null, null, null, null + namespace, null, null, null, null, labelSelector, null, null, null, null, null, null ); List pods = new ArrayList<>(); diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/scheduler/ExternalSystemHealthCheckScheduler.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/scheduler/ExternalSystemHealthCheckScheduler.java index ffb10f00..63ecef91 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/scheduler/ExternalSystemHealthCheckScheduler.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/scheduler/ExternalSystemHealthCheckScheduler.java @@ -22,6 +22,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; /** @@ -60,6 +61,9 @@ public class ExternalSystemHealthCheckScheduler { private static final String FAILURE_COUNT_KEY_PREFIX = "external_system:failure_count:"; private static final long FAILURE_COUNT_TTL_SECONDS = 5 * 60; // 失败计数过期时间(秒) + // 防重复执行标记 + private final AtomicBoolean isRunning = new AtomicBoolean(false); + /** * 检查所有外部系统的健康状态 * 此方法由定时任务管理系统调用 @@ -69,6 +73,24 @@ public class ExternalSystemHealthCheckScheduler { */ public void checkExternalSystemsHealth(Long notificationChannelId, Long resourceAlertTemplateId) { + // 防止重复执行:如果上一次检查还在执行中,跳过本次 + if (!isRunning.compareAndSet(false, true)) { + log.warn("上一次外部系统健康检查任务仍在执行中,跳过本次执行"); + return; + } + + try { + doCheckExternalSystemsHealth(notificationChannelId, resourceAlertTemplateId); + } finally { + isRunning.set(false); + } + } + + /** + * 执行健康检查的核心逻辑 + */ + private void doCheckExternalSystemsHealth(Long notificationChannelId, + Long resourceAlertTemplateId) { // 构建通知配置对象 final ServerMonitorNotificationConfig config; if (notificationChannelId != null) { diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/scheduler/ServerMonitorScheduler.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/scheduler/ServerMonitorScheduler.java index 4d80d8fc..bb36d581 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/scheduler/ServerMonitorScheduler.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/scheduler/ServerMonitorScheduler.java @@ -32,6 +32,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; /** @@ -67,6 +68,9 @@ public class ServerMonitorScheduler { @Qualifier("serverMonitorExecutor") private Executor serverMonitorExecutor; + // 防重复执行标记 + private final AtomicBoolean isRunning = new AtomicBoolean(false); + /** * 采集所有在线服务器的监控数据 * 此方法由定时任务管理系统调用 @@ -76,6 +80,24 @@ public class ServerMonitorScheduler { */ public void collectServerMetrics(Long notificationChannelId, Long resourceAlertTemplateId) { + // 防止重复执行:如果上一次采集还在执行中,跳过本次 + if (!isRunning.compareAndSet(false, true)) { + log.warn("上一次服务器监控采集任务仍在执行中,跳过本次执行"); + return; + } + + try { + doCollectServerMetrics(notificationChannelId, resourceAlertTemplateId); + } finally { + isRunning.set(false); + } + } + + /** + * 执行服务器监控采集的核心逻辑 + */ + private void doCollectServerMetrics(Long notificationChannelId, + Long resourceAlertTemplateId) { // 构建通知配置对象 ServerMonitorNotificationConfig config = null; if (notificationChannelId != null) { diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/K8sDeploymentServiceImpl.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/K8sDeploymentServiceImpl.java index fdaf9764..d3830051 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/K8sDeploymentServiceImpl.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/K8sDeploymentServiceImpl.java @@ -33,7 +33,10 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @Slf4j @@ -249,12 +252,31 @@ public class K8sDeploymentServiceImpl extends BaseServiceImpl f.cancel(true)); + throw new BusinessException(ResponseCode.K8S_DEPLOYMENT_SYNC_FAILED); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new BusinessException(ResponseCode.K8S_DEPLOYMENT_SYNC_FAILED); + } catch (ExecutionException e) { + log.error("K8S Deployment同步执行异常", e); + throw new BusinessException(ResponseCode.K8S_DEPLOYMENT_SYNC_FAILED); + } // 汇总同步数量 int totalSyncCount = futures.stream() - .map(CompletableFuture::join) + .filter(f -> !f.isCancelled() && f.isDone()) + .map(f -> { + try { + return f.getNow(0); + } catch (Exception e) { + return 0; + } + }) .mapToInt(Integer::intValue) .sum(); diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/RepositoryBranchServiceImpl.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/RepositoryBranchServiceImpl.java index a45c4749..11ae01f5 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/RepositoryBranchServiceImpl.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/RepositoryBranchServiceImpl.java @@ -34,6 +34,9 @@ import java.util.Map; import java.util.ArrayList; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; @@ -349,9 +352,31 @@ public class RepositoryBranchServiceImpl extends BaseServiceImpl allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + try { + allFutures.get(5, TimeUnit.MINUTES); + } catch (TimeoutException e) { + log.error("分支同步超时(5分钟)", e); + futures.forEach(f -> f.cancel(true)); + throw new BusinessException(ResponseCode.REPOSITORY_BRANCH_SYNC_FAILED); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new BusinessException(ResponseCode.REPOSITORY_BRANCH_SYNC_FAILED); + } catch (ExecutionException e) { + log.error("分支同步执行异常", e); + throw new BusinessException(ResponseCode.REPOSITORY_BRANCH_SYNC_FAILED); + } + List allBranchesToSave = futures.stream() - .map(CompletableFuture::join) + .filter(f -> !f.isCancelled() && f.isDone()) + .map(f -> { + try { + return f.getNow(List.of()); + } catch (Exception e) { + return List.of(); + } + }) .flatMap(List::stream) .collect(Collectors.toList()); diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/RepositoryProjectServiceImpl.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/RepositoryProjectServiceImpl.java index 84d9e9aa..0248d356 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/RepositoryProjectServiceImpl.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/RepositoryProjectServiceImpl.java @@ -43,6 +43,9 @@ import java.util.ArrayList; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.Collections; @@ -255,8 +258,20 @@ public class RepositoryProjectServiceImpl extends BaseServiceImpl f.cancel(true)); + throw new BusinessException(ResponseCode.REPOSITORY_PROJECT_SYNC_FAILED); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new BusinessException(ResponseCode.REPOSITORY_PROJECT_SYNC_FAILED); + } catch (ExecutionException e) { + log.error("项目同步执行异常", e); + throw new BusinessException(ResponseCode.REPOSITORY_PROJECT_SYNC_FAILED); + } // 批量保存项目 if (!projectsToSave.isEmpty()) {