This commit is contained in:
dengqichen 2026-01-21 13:39:39 +08:00
parent 3fa1d53368
commit af8e638f0b
7 changed files with 318 additions and 10 deletions

View File

@ -245,6 +245,29 @@ public class ThreadPoolConfig {
return executor;
}
/**
* 外部系统健康检查线程池 - 使用虚拟线程Java 21+
*
* 为什么使用虚拟线程
* 1. 外部系统连接检测是典型的**网络I/O密集型**任务
* 2. 调用Jenkins/Git/K8s的testConnection()时会长时间等待网络响应
* 3. 虚拟线程在阻塞时不占用OS线程资源消耗极低
* 4. 支持数十个外部系统并发检测无需担心线程池耗尽
*
* 💡 场景
* - 定时检测Jenkins/Git/K8s等外部系统的连通性
* - 并发调用多个外部系统的API
* - 检测VPN连接状态
* - 快速失败机制超时5-15秒
*/
@Bean("externalSystemHealthCheckExecutor")
public SimpleAsyncTaskExecutor externalSystemHealthCheckExecutor() {
SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("external-system-health-virtual-");
executor.setVirtualThreads(true);
executor.setConcurrencyLimit(-1); // 无限制支持大量并发检测
return executor;
}
// ========== 注意 ==========
// sshOutputExecutor 已迁移到 Framework
// : framework.ssh.websocket.SSHWebSocketConfig

View File

@ -9,6 +9,7 @@ import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.*;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;
@ -27,12 +28,20 @@ import java.util.stream.Collectors;
@Service
public class GitServiceIntegrationImpl extends BaseExternalSystemIntegration implements IGitServiceIntegration {
private final RestTemplate restTemplate = new RestTemplate();
private final RestTemplate restTemplate;
// Git系统解密缓存 - 线程安全
private static final Map<Long, GitSystemCache> SYSTEM_CACHE = new ConcurrentHashMap<>();
private static final long CACHE_EXPIRE_TIME = 30 * 60 * 1000; // 30分钟过期
public GitServiceIntegrationImpl() {
// 配置RestTemplate超时防止网络故障时长时间阻塞
SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
factory.setConnectTimeout(5000); // 连接超时5秒
factory.setReadTimeout(10000); // 读取超时10秒
this.restTemplate = new RestTemplate(factory);
}
// GitLab API 分页配置
private static final int PER_PAGE = 100; // GitLab API 最大值
private static final int MAX_PAGES = 100; // 防止无限循环最多获取 10000

View File

@ -47,7 +47,7 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp
// 日志流专用ApiClient缓存长超时按k8sSystemId复用
private static final Map<Long, ApiClient> LOG_STREAM_API_CLIENT_CACHE = new ConcurrentHashMap<>();
private static final long CACHE_EXPIRE_TIME = 30 * 60 * 1000; // 30分钟过期
private static final long CACHE_EXPIRE_TIME = 10 * 60 * 1000; // 10分钟过期优化缩短过期时间避免持有失效连接
/**
* K8S ApiClient缓存内部类
@ -116,12 +116,9 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp
throw new BusinessException(ResponseCode.K8S_CONFIG_EMPTY);
}
// 创建K8S ApiClient并测试连接直接使用config作为kubeconfig
ApiClient client = Config.fromConfig(new StringReader(config));
client.setConnectTimeout(15000); // 15秒连接超时
client.setReadTimeout(120000); // 120秒读取超时优化日志查询等耗时操作
VersionApi versionApi = new VersionApi(client);
// 使用缓存的ApiClient进行连接测试优化复用连接减少资源开销
K8sApiClientCache cache = getApiClientCache(system);
VersionApi versionApi = new VersionApi(cache.apiClient);
VersionInfo version = versionApi.getCode();
log.info("K8S集群连接成功版本: {}", version.getGitVersion());
@ -129,6 +126,10 @@ public class K8sServiceIntegrationImpl extends BaseExternalSystemIntegration imp
} catch (Exception e) {
log.error("K8S连接测试失败集群: {}, 错误: {}", system.getName(), e.getMessage(), e);
// 连接失败时清除缓存下次重新创建
API_CLIENT_CACHE.remove(system.getId());
return false;
}
}

View File

@ -0,0 +1,225 @@
package com.qqchen.deploy.backend.deploy.scheduler;
import com.qqchen.deploy.backend.deploy.dto.ServerMonitorNotificationConfig;
import com.qqchen.deploy.backend.deploy.entity.ExternalSystem;
import com.qqchen.deploy.backend.deploy.repository.IExternalSystemRepository;
import com.qqchen.deploy.backend.deploy.service.IExternalSystemService;
import com.qqchen.deploy.backend.framework.utils.RedisUtil;
import com.qqchen.deploy.backend.notification.dto.SendNotificationRequest;
import com.qqchen.deploy.backend.notification.service.INotificationService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
/**
* 外部系统健康检查定时任务
* 定期检测Jenkins/Git/K8s等外部系统的连通性
* 由定时任务管理系统调用不使用@Scheduled注解
*
* @author qqchen
* @since 2025-01-21
*/
@Slf4j
@Component
public class ExternalSystemHealthCheckScheduler {
@Resource
private IExternalSystemRepository externalSystemRepository;
@Resource
private IExternalSystemService externalSystemService;
@Resource
private RedisUtil redisUtil;
@Resource
private INotificationService notificationService;
@Resource
@Qualifier("externalSystemHealthCheckExecutor")
private Executor externalSystemHealthCheckExecutor;
// 失败阈值配置硬编码
private static final int WARNING_THRESHOLD = 3; // 连续失败3次触发警告
private static final int CRITICAL_THRESHOLD = 5; // 连续失败5次触发严重告警
// Redis Key前缀
private static final String FAILURE_COUNT_KEY_PREFIX = "external_system:failure_count:";
private static final long FAILURE_COUNT_TTL_SECONDS = 5 * 60; // 失败计数过期时间
/**
* 检查所有外部系统的健康状态
* 此方法由定时任务管理系统调用
*
* @param notificationChannelId 通知渠道ID可选为null则不发送通知
* @param resourceAlertTemplateId 资源告警通知模板ID可选
*/
public void checkExternalSystemsHealth(Long notificationChannelId,
Long resourceAlertTemplateId) {
// 构建通知配置对象
final ServerMonitorNotificationConfig config;
if (notificationChannelId != null) {
config = new ServerMonitorNotificationConfig();
config.setNotificationChannelId(notificationChannelId);
config.setResourceAlertTemplateId(resourceAlertTemplateId);
log.debug("开始检查外部系统健康状态: channelId={}, alertTemplateId={}",
notificationChannelId, resourceAlertTemplateId);
} else {
config = null;
log.debug("开始检查外部系统健康状态(不发送通知)");
}
long startTime = System.currentTimeMillis();
try {
// 1. 查询所有启用的外部系统
List<ExternalSystem> systems = externalSystemRepository.findByDeletedFalseOrderBySort()
.stream()
.filter(system -> Boolean.TRUE.equals(system.getEnabled()))
.collect(Collectors.toList());
if (systems.isEmpty()) {
log.debug("没有需要检查的外部系统,跳过健康检查");
return;
}
log.info("开始检查 {} 个外部系统的健康状态", systems.size());
// 2. 使用虚拟线程并发检测所有系统
List<CompletableFuture<Void>> futures = systems.stream()
.map(system -> CompletableFuture.runAsync(() ->
checkSingleSystem(system, config), externalSystemHealthCheckExecutor))
.collect(Collectors.toList());
// 3. 等待所有检测完成带超时控制防止线程永久阻塞
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
try {
// 设置3分钟超时外部系统连接测试比服务器监控快
allFutures.get(3, TimeUnit.MINUTES);
} catch (TimeoutException e) {
log.error("⏱️ 外部系统健康检查超时3分钟部分系统可能连接失败或响应缓慢", e);
// 取消未完成的任务释放资源
futures.forEach(f -> f.cancel(true));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("外部系统健康检查被中断", e);
} catch (ExecutionException e) {
log.error("外部系统健康检查执行异常", e);
}
long duration = System.currentTimeMillis() - startTime;
log.info("外部系统健康检查完成,耗时: {}ms", duration);
} catch (Exception e) {
log.error("外部系统健康检查失败", e);
}
}
/**
* 检查单个外部系统的健康状态
*/
private void checkSingleSystem(ExternalSystem system, ServerMonitorNotificationConfig config) {
String systemName = system.getName();
Long systemId = system.getId();
try {
log.debug("检查外部系统: {} ({})", systemName, system.getType());
// 调用Service层的testConnection方法会自动更新lastConnectTime
boolean healthy = externalSystemService.testConnection(systemId);
// Redis失败计数key
String failureKey = FAILURE_COUNT_KEY_PREFIX + systemId;
if (!healthy) {
// 失败累加计数
long failureCount = redisUtil.incr(failureKey, 1);
redisUtil.expire(failureKey, FAILURE_COUNT_TTL_SECONDS);
log.warn("外部系统连接失败: {} ({}), 连续失败次数: {}",
systemName, system.getType(), failureCount);
// 检查是否达到告警阈值
if (failureCount == WARNING_THRESHOLD) {
sendAlert(system, "WARNING", failureCount, config);
} else if (failureCount == CRITICAL_THRESHOLD) {
sendAlert(system, "CRITICAL", failureCount, config);
}
} else {
// 成功检查之前是否有失败记录
Object previousCountObj = redisUtil.get(failureKey);
if (previousCountObj != null) {
long previousCount = Long.parseLong(previousCountObj.toString());
if (previousCount >= WARNING_THRESHOLD) {
log.info("外部系统连接已恢复: {} ({}), 之前连续失败: {}次",
systemName, system.getType(), previousCount);
}
}
// 清除失败计数
redisUtil.del(failureKey);
}
} catch (Exception e) {
log.error("检测外部系统健康状态异常: {} ({})", systemName, system.getType(), e);
}
}
/**
* 发送告警通知
*/
private void sendAlert(ExternalSystem system, String level,
Long failureCount, ServerMonitorNotificationConfig config) {
if (config == null || config.getNotificationChannelId() == null) {
log.debug("通知配置不完整,跳过发送告警通知");
return;
}
try {
// 构建通知参数
Map<String, Object> params = new HashMap<>();
params.put("systemName", system.getName());
params.put("systemType", system.getType().name());
params.put("systemUrl", system.getUrl());
params.put("alertLevel", level.equals("WARNING") ? "警告" : "严重");
params.put("failureCount", failureCount);
params.put("threshold", level.equals("WARNING") ? WARNING_THRESHOLD : CRITICAL_THRESHOLD);
params.put("alertTime", LocalDateTime.now().format(
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
params.put("suggestion", level.equals("CRITICAL") ?
"请立即检查网络连接或VPN状态" :
"请关注系统连接状态,如持续失败将触发严重告警");
// 发送通知
SendNotificationRequest request = new SendNotificationRequest();
request.setChannelId(config.getNotificationChannelId());
request.setNotificationTemplateId(config.getResourceAlertTemplateId());
request.setTemplateParams(params);
notificationService.send(request);
log.info("✅ 外部系统告警通知已发送: system={}, level={}, count={}, channelId={}",
system.getName(), level, failureCount, config.getNotificationChannelId());
} catch (Exception e) {
log.error("发送外部系统告警通知失败: system={}", system.getName(), e);
}
}
}

View File

@ -28,7 +28,10 @@ import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
/**
@ -108,11 +111,24 @@ public class ServerMonitorScheduler {
serverMonitorExecutor)) // 使用专用虚拟线程池
.collect(Collectors.toList());
// 3. 等待所有任务完成
// 3. 等待所有任务完成带超时控制防止线程永久阻塞
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
allFutures.join();
try {
// 设置5分钟超时避免Quartz工作线程被长时间占用
allFutures.get(5, TimeUnit.MINUTES);
} catch (TimeoutException e) {
log.error("⏱️ 服务器监控数据采集超时5分钟部分服务器可能连接失败或响应缓慢", e);
// 取消未完成的任务释放资源
futures.forEach(f -> f.cancel(true));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("服务器监控数据采集被中断", e);
} catch (ExecutionException e) {
log.error("服务器监控数据采集执行异常", e);
}
// 4. 收集结果
List<ServerMonitorDataDTO> monitorDataList = futures.stream()

View File

@ -0,0 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.3.xsd">
<!-- Jenkins构建同步日志优化 - 无数据库变更 -->
<changeSet id="20260121103600" author="dengqichen">
<sqlFile path="../sql/20260121103600-01.sql" relativeToChangelogFile="true"/>
</changeSet>
</databaseChangeLog>

View File

@ -0,0 +1,23 @@
-- --------------------------------------------------------------------------------------
-- 系统版本发布记录 - v1.1
-- 功能记录Jenkins日志优化版本
-- 作者qqchen
-- 日期2025-12-09 14:13
-- --------------------------------------------------------------------------------------
-- 插入 1.1 日志优化发布记录
INSERT INTO system_release (
create_by, create_time, update_by, update_time, version, deleted,
release_version, module, release_date, changes, notified, delay_minutes, estimated_duration, enable_auto_shutdown
)
VALUES (
'system', NOW(), 'system', NOW(), 1, 0,
1.48, 'ALL', NOW(),
'【后端】
Git集成RestTemplate未配置超时导致VPN断开时线程永久阻塞的问题510
ExternalSystemHealthCheckSchedulerJenkins/Git/K8s连通性检测
线externalSystemHealthCheckExecutor
35使Redis记录失败计数
',
0, NULL, NULL, 0
);