From e5f4fee51d32cd48145b9aed3bd4c4beaa17d03b Mon Sep 17 00:00:00 2001 From: dengqichen Date: Sat, 13 Dec 2025 21:31:01 +0800 Subject: [PATCH] =?UTF-8?q?1.30=20k8s=20pods=E6=9F=A5=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../impl/K8sDeploymentServiceImpl.java | 48 ++++++++++++++++--- 1 file changed, 41 insertions(+), 7 deletions(-) diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/K8sDeploymentServiceImpl.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/K8sDeploymentServiceImpl.java index 80b85f80..39f4df4c 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/K8sDeploymentServiceImpl.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/K8sDeploymentServiceImpl.java @@ -21,6 +21,7 @@ import com.qqchen.deploy.backend.framework.exception.BusinessException; import com.qqchen.deploy.backend.framework.service.impl.BaseServiceImpl; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -31,6 +32,9 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; @Slf4j @Service @@ -55,6 +59,10 @@ public class K8sDeploymentServiceImpl extends BaseServiceImpl namespaces = k8sNamespaceRepository.findByExternalSystemId(externalSystemId); - int totalSyncCount = 0; - for (K8sNamespace namespace : namespaces) { - int count = syncDeployments(externalSystem, namespace); - totalSyncCount += count; - } + log.info("开始并行同步 {} 个命名空间的Deployment", namespaces.size()); + + // 并行处理所有namespace + List> futures = namespaces.stream() + .map(namespace -> CompletableFuture.supplyAsync(() -> { + try { + log.debug("开始同步命名空间: {}", namespace.getNamespaceName()); + int count = syncDeployments(externalSystem, namespace); + log.debug("命名空间 {} 同步完成,同步数量: {}", namespace.getNamespaceName(), count); + return count; + } catch (Exception e) { + log.error("命名空间 {} 同步失败: {}", namespace.getNamespaceName(), e.getMessage(), e); + // 单个namespace失败不影响其他namespace,返回0 + return 0; + } + }, k8sTaskExecutor)) + .collect(Collectors.toList()); + + // 等待所有任务完成并汇总结果 + CompletableFuture allFutures = CompletableFuture.allOf( + futures.toArray(new CompletableFuture[0]) + ); + + // 阻塞等待所有任务完成 + allFutures.join(); + + // 汇总同步数量 + int totalSyncCount = futures.stream() + .map(CompletableFuture::join) + .mapToInt(Integer::intValue) + .sum(); syncHistory.setStatus(ExternalSystemSyncStatus.SUCCESS); syncHistory.setEndTime(LocalDateTime.now()); k8sSyncHistoryService.update(syncHistory.getId(), syncHistory); - log.info("K8S Deployment异步同步成功,集群ID: {}, 总同步数量: {}", externalSystemId, totalSyncCount); + log.info("K8S Deployment并行同步成功,集群ID: {}, 命名空间数: {}, 总同步数量: {}", + externalSystemId, namespaces.size(), totalSyncCount); } catch (Exception e) { log.error("K8S Deployment异步同步失败,集群ID: {}, 错误: {}", externalSystemId, e.getMessage(), e);