1.30 k8s pods查询
This commit is contained in:
parent
69a4dc8e1f
commit
e5f4fee51d
@ -21,6 +21,7 @@ import com.qqchen.deploy.backend.framework.exception.BusinessException;
|
||||
import com.qqchen.deploy.backend.framework.service.impl.BaseServiceImpl;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
@ -31,6 +32,9 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@ -55,6 +59,10 @@ public class K8sDeploymentServiceImpl extends BaseServiceImpl<K8sDeployment, K8s
|
||||
@Resource
|
||||
private IK8sSyncHistoryService k8sSyncHistoryService;
|
||||
|
||||
@Resource
|
||||
@Qualifier("k8sTaskExecutor")
|
||||
private Executor k8sTaskExecutor;
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public Integer syncDeployments(ExternalSystem externalSystem, K8sNamespace namespace) {
|
||||
@ -135,7 +143,6 @@ public class K8sDeploymentServiceImpl extends BaseServiceImpl<K8sDeployment, K8s
|
||||
|
||||
@Override
|
||||
@Async("k8sTaskExecutor")
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void syncDeployments(Long externalSystemId) {
|
||||
log.info("异步同步K8S Deployment(所有命名空间),集群ID: {}", externalSystemId);
|
||||
|
||||
@ -154,17 +161,44 @@ public class K8sDeploymentServiceImpl extends BaseServiceImpl<K8sDeployment, K8s
|
||||
|
||||
List<K8sNamespace> namespaces = k8sNamespaceRepository.findByExternalSystemId(externalSystemId);
|
||||
|
||||
int totalSyncCount = 0;
|
||||
for (K8sNamespace namespace : namespaces) {
|
||||
int count = syncDeployments(externalSystem, namespace);
|
||||
totalSyncCount += count;
|
||||
}
|
||||
log.info("开始并行同步 {} 个命名空间的Deployment", namespaces.size());
|
||||
|
||||
// 并行处理所有namespace
|
||||
List<CompletableFuture<Integer>> futures = namespaces.stream()
|
||||
.map(namespace -> CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
log.debug("开始同步命名空间: {}", namespace.getNamespaceName());
|
||||
int count = syncDeployments(externalSystem, namespace);
|
||||
log.debug("命名空间 {} 同步完成,同步数量: {}", namespace.getNamespaceName(), count);
|
||||
return count;
|
||||
} catch (Exception e) {
|
||||
log.error("命名空间 {} 同步失败: {}", namespace.getNamespaceName(), e.getMessage(), e);
|
||||
// 单个namespace失败不影响其他namespace,返回0
|
||||
return 0;
|
||||
}
|
||||
}, k8sTaskExecutor))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
// 等待所有任务完成并汇总结果
|
||||
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
|
||||
futures.toArray(new CompletableFuture[0])
|
||||
);
|
||||
|
||||
// 阻塞等待所有任务完成
|
||||
allFutures.join();
|
||||
|
||||
// 汇总同步数量
|
||||
int totalSyncCount = futures.stream()
|
||||
.map(CompletableFuture::join)
|
||||
.mapToInt(Integer::intValue)
|
||||
.sum();
|
||||
|
||||
syncHistory.setStatus(ExternalSystemSyncStatus.SUCCESS);
|
||||
syncHistory.setEndTime(LocalDateTime.now());
|
||||
k8sSyncHistoryService.update(syncHistory.getId(), syncHistory);
|
||||
|
||||
log.info("K8S Deployment异步同步成功,集群ID: {}, 总同步数量: {}", externalSystemId, totalSyncCount);
|
||||
log.info("K8S Deployment并行同步成功,集群ID: {}, 命名空间数: {}, 总同步数量: {}",
|
||||
externalSystemId, namespaces.size(), totalSyncCount);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("K8S Deployment异步同步失败,集群ID: {}, 错误: {}", externalSystemId, e.getMessage(), e);
|
||||
|
||||
Loading…
Reference in New Issue
Block a user