This commit is contained in:
dengqichen 2026-01-21 17:49:39 +08:00
parent af8e638f0b
commit 60395e46ed
7 changed files with 133 additions and 14 deletions

View File

@ -282,11 +282,24 @@
<version>5.0.1</version> <version>5.0.1</version>
</dependency> </dependency>
<!-- Kubernetes Java Client (支持K8S 1.23-1.28) --> <!-- Kubernetes Java Client (支持K8S 1.23-1.29+) -->
<dependency> <dependency>
<groupId>io.kubernetes</groupId> <groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId> <artifactId>client-java</artifactId>
<version>18.0.1</version> <version>21.0.2-legacy</version>
</dependency>
<!-- 覆盖 kubernetes-client 的 OkHttp 版本,使用 5.x 以支持虚拟线程 -->
<!-- OkHttp 5.x 内部将 synchronized 改为 Lock/Condition,避免虚拟线程 pinning 和 HTTP/2 死锁 -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>5.0.0-alpha.14</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>logging-interceptor</artifactId>
<version>5.0.0-alpha.14</version>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -146,7 +146,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp
CoreV1Api api = new CoreV1Api(cache.apiClient); CoreV1Api api = new CoreV1Api(cache.apiClient);
V1NamespaceList namespaceList = api.listNamespace( V1NamespaceList namespaceList = api.listNamespace(
null, null, null, null, null, null, null, null, null, null null, null, null, null, null, null, null, null, null, null, null
); );
List<K8sNamespaceResponse> namespaces = new ArrayList<>(); List<K8sNamespaceResponse> namespaces = new ArrayList<>();
@ -200,7 +200,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp
AppsV1Api api = new AppsV1Api(cache.apiClient); AppsV1Api api = new AppsV1Api(cache.apiClient);
V1DeploymentList deploymentList = api.listNamespacedDeployment( V1DeploymentList deploymentList = api.listNamespacedDeployment(
namespace, null, null, null, null, null, null, null, null, null, null namespace, null, null, null, null, null, null, null, null, null, null, null
); );
List<K8sDeploymentResponse> deployments = new ArrayList<>(); List<K8sDeploymentResponse> deployments = new ArrayList<>();
@ -279,7 +279,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp
AppsV1Api api = new AppsV1Api(cache.apiClient); AppsV1Api api = new AppsV1Api(cache.apiClient);
V1DeploymentList deploymentList = api.listDeploymentForAllNamespaces( V1DeploymentList deploymentList = api.listDeploymentForAllNamespaces(
null, null, null, null, null, null, null, null, null, null null, null, null, null, null, null, null, null, null, null, null
); );
List<K8sDeploymentResponse> deployments = new ArrayList<>(); List<K8sDeploymentResponse> deployments = new ArrayList<>();
@ -357,7 +357,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp
CoreV1Api api = new CoreV1Api(cache.apiClient); CoreV1Api api = new CoreV1Api(cache.apiClient);
V1PodList podList = api.listNamespacedPod( V1PodList podList = api.listNamespacedPod(
namespace, null, null, null, null, null, null, null, null, null, null namespace, null, null, null, null, null, null, null, null, null, null, null
); );
List<K8sPodResponse> pods = new ArrayList<>(); List<K8sPodResponse> pods = new ArrayList<>();
@ -406,7 +406,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp
// 3. 使用label selector查询Pod // 3. 使用label selector查询Pod
CoreV1Api coreApi = new CoreV1Api(cache.apiClient); CoreV1Api coreApi = new CoreV1Api(cache.apiClient);
V1PodList podList = coreApi.listNamespacedPod( V1PodList podList = coreApi.listNamespacedPod(
namespace, null, null, null, null, labelSelector, null, null, null, null, null namespace, null, null, null, null, labelSelector, null, null, null, null, null, null
); );
List<K8sPodResponse> pods = new ArrayList<>(); List<K8sPodResponse> pods = new ArrayList<>();

View File

@ -22,6 +22,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -60,6 +61,9 @@ public class ExternalSystemHealthCheckScheduler {
private static final String FAILURE_COUNT_KEY_PREFIX = "external_system:failure_count:"; private static final String FAILURE_COUNT_KEY_PREFIX = "external_system:failure_count:";
private static final long FAILURE_COUNT_TTL_SECONDS = 5 * 60; // 失败计数过期时间 private static final long FAILURE_COUNT_TTL_SECONDS = 5 * 60; // 失败计数过期时间
// 防重复执行标记
private final AtomicBoolean isRunning = new AtomicBoolean(false);
/** /**
* 检查所有外部系统的健康状态 * 检查所有外部系统的健康状态
* 此方法由定时任务管理系统调用 * 此方法由定时任务管理系统调用
@ -69,6 +73,24 @@ public class ExternalSystemHealthCheckScheduler {
*/ */
public void checkExternalSystemsHealth(Long notificationChannelId, public void checkExternalSystemsHealth(Long notificationChannelId,
Long resourceAlertTemplateId) { Long resourceAlertTemplateId) {
// 防止重复执行:如果上一次检查还在执行中,跳过本次
if (!isRunning.compareAndSet(false, true)) {
log.warn("上一次外部系统健康检查任务仍在执行中,跳过本次执行");
return;
}
try {
doCheckExternalSystemsHealth(notificationChannelId, resourceAlertTemplateId);
} finally {
isRunning.set(false);
}
}
/**
* 执行健康检查的核心逻辑
*/
private void doCheckExternalSystemsHealth(Long notificationChannelId,
Long resourceAlertTemplateId) {
// 构建通知配置对象 // 构建通知配置对象
final ServerMonitorNotificationConfig config; final ServerMonitorNotificationConfig config;
if (notificationChannelId != null) { if (notificationChannelId != null) {

View File

@ -32,6 +32,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -67,6 +68,9 @@ public class ServerMonitorScheduler {
@Qualifier("serverMonitorExecutor") @Qualifier("serverMonitorExecutor")
private Executor serverMonitorExecutor; private Executor serverMonitorExecutor;
// 防重复执行标记
private final AtomicBoolean isRunning = new AtomicBoolean(false);
/** /**
* 采集所有在线服务器的监控数据 * 采集所有在线服务器的监控数据
* 此方法由定时任务管理系统调用 * 此方法由定时任务管理系统调用
@ -76,6 +80,24 @@ public class ServerMonitorScheduler {
*/ */
public void collectServerMetrics(Long notificationChannelId, public void collectServerMetrics(Long notificationChannelId,
Long resourceAlertTemplateId) { Long resourceAlertTemplateId) {
// 防止重复执行:如果上一次采集还在执行中,跳过本次
if (!isRunning.compareAndSet(false, true)) {
log.warn("上一次服务器监控采集任务仍在执行中,跳过本次执行");
return;
}
try {
doCollectServerMetrics(notificationChannelId, resourceAlertTemplateId);
} finally {
isRunning.set(false);
}
}
/**
* 执行服务器监控采集的核心逻辑
*/
private void doCollectServerMetrics(Long notificationChannelId,
Long resourceAlertTemplateId) {
// 构建通知配置对象 // 构建通知配置对象
ServerMonitorNotificationConfig config = null; ServerMonitorNotificationConfig config = null;
if (notificationChannelId != null) { if (notificationChannelId != null) {

View File

@ -33,7 +33,10 @@ import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@Slf4j @Slf4j
@ -249,12 +252,31 @@ public class K8sDeploymentServiceImpl extends BaseServiceImpl<K8sDeployment, K8s
futures.toArray(new CompletableFuture[0]) futures.toArray(new CompletableFuture[0])
); );
// 阻塞等待所有任务完成 // 阻塞等待所有任务完成(最多5分钟超时)
allFutures.join(); try {
allFutures.get(5, TimeUnit.MINUTES);
} catch (TimeoutException e) {
log.error("K8S Deployment同步超时5分钟", e);
futures.forEach(f -> f.cancel(true));
throw new BusinessException(ResponseCode.K8S_DEPLOYMENT_SYNC_FAILED);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new BusinessException(ResponseCode.K8S_DEPLOYMENT_SYNC_FAILED);
} catch (ExecutionException e) {
log.error("K8S Deployment同步执行异常", e);
throw new BusinessException(ResponseCode.K8S_DEPLOYMENT_SYNC_FAILED);
}
// 汇总同步数量 // 汇总同步数量
int totalSyncCount = futures.stream() int totalSyncCount = futures.stream()
.map(CompletableFuture::join) .filter(f -> !f.isCancelled() && f.isDone())
.map(f -> {
try {
return f.getNow(0);
} catch (Exception e) {
return 0;
}
})
.mapToInt(Integer::intValue) .mapToInt(Integer::intValue)
.sum(); .sum();

View File

@ -34,6 +34,9 @@ import java.util.Map;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -349,9 +352,31 @@ public class RepositoryBranchServiceImpl extends BaseServiceImpl<RepositoryBranc
futures.add(future); futures.add(future);
} }
// 6. 等待所有异步任务完成并收集结果 // 6. 等待所有异步任务完成并收集结果(最多5分钟超时)
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
try {
allFutures.get(5, TimeUnit.MINUTES);
} catch (TimeoutException e) {
log.error("分支同步超时5分钟", e);
futures.forEach(f -> f.cancel(true));
throw new BusinessException(ResponseCode.REPOSITORY_BRANCH_SYNC_FAILED);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new BusinessException(ResponseCode.REPOSITORY_BRANCH_SYNC_FAILED);
} catch (ExecutionException e) {
log.error("分支同步执行异常", e);
throw new BusinessException(ResponseCode.REPOSITORY_BRANCH_SYNC_FAILED);
}
List<RepositoryBranch> allBranchesToSave = futures.stream() List<RepositoryBranch> allBranchesToSave = futures.stream()
.map(CompletableFuture::join) .filter(f -> !f.isCancelled() && f.isDone())
.map(f -> {
try {
return f.getNow(List.of());
} catch (Exception e) {
return List.<RepositoryBranch>of();
}
})
.flatMap(List::stream) .flatMap(List::stream)
.collect(Collectors.toList()); .collect(Collectors.toList());

View File

@ -43,6 +43,9 @@ import java.util.ArrayList;
import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.Collections; import java.util.Collections;
@ -255,8 +258,20 @@ public class RepositoryProjectServiceImpl extends BaseServiceImpl<RepositoryProj
futures.add(future); futures.add(future);
} }
// 等待所有异步任务完成 // 等待所有异步任务完成(最多5分钟超时)
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); try {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(5, TimeUnit.MINUTES);
} catch (TimeoutException e) {
log.error("项目同步超时5分钟", e);
futures.forEach(f -> f.cancel(true));
throw new BusinessException(ResponseCode.REPOSITORY_PROJECT_SYNC_FAILED);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new BusinessException(ResponseCode.REPOSITORY_PROJECT_SYNC_FAILED);
} catch (ExecutionException e) {
log.error("项目同步执行异常", e);
throw new BusinessException(ResponseCode.REPOSITORY_PROJECT_SYNC_FAILED);
}
// 批量保存项目 // 批量保存项目
if (!projectsToSave.isEmpty()) { if (!projectsToSave.isEmpty()) {