优化
This commit is contained in:
parent
e6507ae8a6
commit
af7a9ca39d
@ -289,17 +289,17 @@
|
||||
<version>21.0.2-legacy</version>
|
||||
</dependency>
|
||||
|
||||
<!-- 覆盖 kubernetes-client 的 OkHttp 版本,使用 5.x 以支持虚拟线程 -->
|
||||
<!-- OkHttp 5.x 内部将 synchronized 改为 Lock/Condition,避免虚拟线程 pinning 和 HTTP/2 死锁 -->
|
||||
<!-- 覆盖 kubernetes-client 的 OkHttp 版本,使用 5.x 稳定版 -->
|
||||
<!-- OkHttp 5.3.0 修复了 alpha 版本的 TaskRunner 死锁问题 -->
|
||||
<dependency>
|
||||
<groupId>com.squareup.okhttp3</groupId>
|
||||
<artifactId>okhttp</artifactId>
|
||||
<version>5.0.0-alpha.14</version>
|
||||
<version>5.3.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.squareup.okhttp3</groupId>
|
||||
<artifactId>logging-interceptor</artifactId>
|
||||
<version>5.0.0-alpha.14</version>
|
||||
<version>5.3.0</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
|
||||
@ -2,6 +2,7 @@ package com.qqchen.deploy.backend.deploy.config;
|
||||
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Primary;
|
||||
import org.springframework.core.task.SimpleAsyncTaskExecutor;
|
||||
import org.springframework.scheduling.annotation.EnableAsync;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
@ -117,7 +118,10 @@ public class ThreadPoolConfig {
|
||||
* 4. 平台线程对CPU密集型任务更高效
|
||||
*
|
||||
* 💡 如果确认只用于I/O密集型任务,可改为虚拟线程
|
||||
*
|
||||
* 🎯 标记为 @Primary,作为未指定 executor 的 @Async 方法的默认线程池
|
||||
*/
|
||||
@Primary
|
||||
@Bean("applicationTaskExecutor")
|
||||
public AsyncTaskExecutor applicationTaskExecutor() {
|
||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||
|
||||
@ -1,312 +1,103 @@
|
||||
package com.qqchen.deploy.backend.deploy.lock;
|
||||
|
||||
import com.qqchen.deploy.backend.framework.lock.LocalLockManager;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 智能同步任务本地锁管理器
|
||||
* <p>支持两种使用方式:</p>
|
||||
* <ol>
|
||||
* <li>自动识别调用者(类名+方法名)生成锁key - 适用于单方法独立锁</li>
|
||||
* <li>显式指定锁类型 - 适用于跨方法/跨服务协调锁</li>
|
||||
* </ol>
|
||||
* 同步任务锁管理器
|
||||
* <p>
|
||||
* 基于 {@link LocalLockManager} 实现,专用于同步任务的锁管理。
|
||||
* 提供类型安全的锁操作,通过 {@link SyncLockType} 避免字符串拼写错误。
|
||||
* </p>
|
||||
*
|
||||
* <p>使用示例1(自动识别):
|
||||
* <pre>
|
||||
* if (!syncLockManager.tryLock(externalSystemId)) {
|
||||
* return;
|
||||
* }
|
||||
* try {
|
||||
* // 执行同步逻辑
|
||||
* } finally {
|
||||
* syncLockManager.unlock(externalSystemId);
|
||||
* }
|
||||
* </pre>
|
||||
* <p><b>设计说明:</b></p>
|
||||
* <ul>
|
||||
* <li>同一外部系统的不同同步操作(如 Jenkins 的 View/Job/Build)共享同一把锁</li>
|
||||
* <li>锁粒度为 externalSystemId,避免同一系统的多个同步任务并发执行</li>
|
||||
* <li>锁自动超时释放(默认5分钟),防止死锁</li>
|
||||
* </ul>
|
||||
*
|
||||
* <p>使用示例2(显式锁类型 - 推荐用于跨服务协调):
|
||||
* <pre>
|
||||
* <p>使用示例:
|
||||
* <pre>{@code
|
||||
* if (!syncLockManager.tryLock(SyncLockType.JENKINS_SYNC, externalSystemId)) {
|
||||
* return;
|
||||
* return; // 已有同步任务在执行
|
||||
* }
|
||||
* try {
|
||||
* // 执行同步逻辑
|
||||
* doSync();
|
||||
* } finally {
|
||||
* syncLockManager.unlock(SyncLockType.JENKINS_SYNC, externalSystemId);
|
||||
* }
|
||||
* </pre>
|
||||
* }</pre>
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class SyncLockManager {
|
||||
|
||||
// 统一的锁存储
|
||||
private final ConcurrentHashMap<String, Boolean> locks = new ConcurrentHashMap<>();
|
||||
|
||||
// ==================== 显式锁类型方法(推荐用于跨服务协调) ====================
|
||||
@Resource
|
||||
private LocalLockManager localLockManager;
|
||||
|
||||
/**
|
||||
* 尝试获取锁(显式指定锁类型)
|
||||
* <p>推荐用于需要跨方法/跨服务协调的场景</p>
|
||||
* 尝试获取同步锁
|
||||
*
|
||||
* @param lockType 锁类型
|
||||
* @param params 锁参数(如 externalSystemId)
|
||||
* @return true-获取成功,false-已被锁定
|
||||
* @return true-获取成功,false-锁已被持有
|
||||
*/
|
||||
public boolean tryLock(SyncLockType lockType, Object... params) {
|
||||
String lockKey = buildLockKey(lockType, params);
|
||||
boolean acquired = locks.putIfAbsent(lockKey, true) == null;
|
||||
|
||||
if (!acquired) {
|
||||
log.warn("同步任务正在执行中,跳过本次同步: lockType={}, params={}",
|
||||
lockType.getKey(), formatParams(params));
|
||||
} else {
|
||||
log.debug("获取同步锁成功: lockType={}, params={}",
|
||||
lockType.getKey(), formatParams(params));
|
||||
}
|
||||
|
||||
return acquired;
|
||||
String key = buildKey(lockType, params);
|
||||
return localLockManager.tryLock(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* 释放锁(显式指定锁类型)
|
||||
* 释放同步锁
|
||||
*
|
||||
* @param lockType 锁类型
|
||||
* @param params 锁参数(必须与tryLock时完全一致)
|
||||
* @param params 锁参数(必须与 tryLock 时一致)
|
||||
*/
|
||||
public void unlock(SyncLockType lockType, Object... params) {
|
||||
String lockKey = buildLockKey(lockType, params);
|
||||
locks.remove(lockKey);
|
||||
log.debug("同步任务完成,释放锁: lockType={}, params={}",
|
||||
lockType.getKey(), formatParams(params));
|
||||
String key = buildKey(lockType, params);
|
||||
localLockManager.unlock(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查锁是否被持有(显式指定锁类型)
|
||||
*
|
||||
* @param lockType 锁类型
|
||||
* @param params 锁参数
|
||||
* @return true-已被锁定,false-未锁定
|
||||
* 检查锁是否被持有
|
||||
*/
|
||||
public boolean isLocked(SyncLockType lockType, Object... params) {
|
||||
String lockKey = buildLockKey(lockType, params);
|
||||
return locks.containsKey(lockKey);
|
||||
String key = buildKey(lockType, params);
|
||||
return localLockManager.isLocked(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建锁key(显式锁类型)
|
||||
* 获取当前锁数量
|
||||
*/
|
||||
private String buildLockKey(SyncLockType lockType, Object... params) {
|
||||
public long getLockCount() {
|
||||
return localLockManager.getLockCount();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取所有锁状态(用于监控)
|
||||
*/
|
||||
public List<String> getLockStatus() {
|
||||
return localLockManager.getLockStatus();
|
||||
}
|
||||
|
||||
/**
|
||||
* 强制释放指定锁(紧急情况使用)
|
||||
*/
|
||||
public boolean forceUnlock(SyncLockType lockType, Object... params) {
|
||||
String key = buildKey(lockType, params);
|
||||
return localLockManager.forceUnlock(key);
|
||||
}
|
||||
|
||||
private String buildKey(SyncLockType lockType, Object... params) {
|
||||
StringBuilder key = new StringBuilder(lockType.getKey());
|
||||
for (Object param : params) {
|
||||
key.append(":").append(param != null ? param : "null");
|
||||
}
|
||||
return key.toString();
|
||||
}
|
||||
|
||||
// ==================== 自动识别调用者方法(向后兼容) ====================
|
||||
|
||||
/**
|
||||
* 尝试获取锁(自动识别调用者)
|
||||
* <p>适用于单方法独立锁场景,不需要跨服务协调</p>
|
||||
*
|
||||
* @param params 锁参数(如 externalSystemId, viewId等)
|
||||
* @return true-获取成功,false-已被锁定
|
||||
*/
|
||||
public boolean tryLock(Object... params) {
|
||||
CallerInfo callerInfo = getCallerInfo();
|
||||
String lockKey = buildLockKey(callerInfo, params);
|
||||
boolean acquired = locks.putIfAbsent(lockKey, true) == null;
|
||||
|
||||
if (!acquired) {
|
||||
log.warn("同步任务正在执行中,跳过本次同步: method={}, params={}",
|
||||
callerInfo.getFullMethodName(), formatParams(params));
|
||||
} else {
|
||||
log.debug("获取同步锁成功: method={}, params={}",
|
||||
callerInfo.getFullMethodName(), formatParams(params));
|
||||
}
|
||||
|
||||
return acquired;
|
||||
}
|
||||
|
||||
/**
|
||||
* 释放锁(自动识别调用者)
|
||||
*
|
||||
* @param params 锁参数(必须与tryLock时完全一致)
|
||||
*/
|
||||
public void unlock(Object... params) {
|
||||
CallerInfo callerInfo = getCallerInfo();
|
||||
String lockKey = buildLockKey(callerInfo, params);
|
||||
locks.remove(lockKey);
|
||||
log.debug("同步任务完成,释放锁: method={}, params={}",
|
||||
callerInfo.getFullMethodName(), formatParams(params));
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查锁是否被持有(自动识别调用者)
|
||||
*
|
||||
* @param params 锁参数
|
||||
* @return true-已被锁定,false-未锁定
|
||||
*/
|
||||
public boolean isLocked(Object... params) {
|
||||
CallerInfo callerInfo = getCallerInfo();
|
||||
String lockKey = buildLockKey(callerInfo, params);
|
||||
return locks.containsKey(lockKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取当前锁数量(用于监控)
|
||||
*
|
||||
* @return 当前持有的锁数量
|
||||
*/
|
||||
public int getLockCount() {
|
||||
return locks.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取调用者信息(通过栈追踪,智能跳过Spring代理层)
|
||||
*
|
||||
* @return 调用者信息
|
||||
*/
|
||||
private CallerInfo getCallerInfo() {
|
||||
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
|
||||
|
||||
// 从栈顶开始遍历,跳过本类和Spring的代理/拦截器类
|
||||
// 栈结构示例:
|
||||
// 0: Thread.getStackTrace()
|
||||
// 1: SyncLockManager.getCallerInfo()
|
||||
// 2: SyncLockManager.tryLock/unlock/isLocked()
|
||||
// 3+: 可能是Spring代理类或真实调用者
|
||||
|
||||
for (int i = 3; i < stackTrace.length; i++) {
|
||||
StackTraceElement element = stackTrace[i];
|
||||
String className = element.getClassName();
|
||||
String methodName = element.getMethodName();
|
||||
|
||||
// 跳过 Spring 和 JDK 的内部类/代理类
|
||||
if (shouldSkipFrame(className, methodName)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 找到真正的业务调用者
|
||||
String simpleClassName = className.substring(className.lastIndexOf('.') + 1);
|
||||
return new CallerInfo(simpleClassName, methodName, className + "." + methodName);
|
||||
}
|
||||
|
||||
// 兜底:使用unknown
|
||||
return new CallerInfo("Unknown", "unknown", "Unknown.unknown");
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否应该跳过该栈帧
|
||||
*
|
||||
* @param className 类名
|
||||
* @param methodName 方法名
|
||||
* @return true-跳过,false-不跳过
|
||||
*/
|
||||
private boolean shouldSkipFrame(String className, String methodName) {
|
||||
// 跳过 CGLIB 动态代理类(所有包含$$的类都是CGLIB代理)
|
||||
if (className.contains("$$")) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// 跳过 JDK 动态代理
|
||||
if (className.startsWith("jdk.proxy") || className.startsWith("com.sun.proxy")) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// 跳过 Spring AOP 拦截器
|
||||
if (className.contains("AsyncExecutionInterceptor") ||
|
||||
className.contains("TransactionInterceptor") ||
|
||||
className.contains("CglibAopProxy") ||
|
||||
className.contains("ReflectiveMethodInvocation") ||
|
||||
className.contains("DynamicAdvisedInterceptor")) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// 跳过线程池和任务执行器
|
||||
if (className.contains("ThreadPoolExecutor") ||
|
||||
className.contains("FutureTask") ||
|
||||
className.contains("CompletableFuture")) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// 跳过 Java 反射
|
||||
if (className.startsWith("java.lang.reflect") ||
|
||||
className.startsWith("jdk.internal.reflect")) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// 跳过 lambda 表达式的合成方法
|
||||
if (methodName.contains("lambda$")) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建锁的唯一key
|
||||
*
|
||||
* @param callerInfo 调用者信息
|
||||
* @param params 参数列表
|
||||
* @return 锁key
|
||||
*/
|
||||
private String buildLockKey(CallerInfo callerInfo, Object... params) {
|
||||
StringBuilder key = new StringBuilder(callerInfo.getFullMethodName());
|
||||
for (Object param : params) {
|
||||
key.append(":").append(param != null ? param : "null");
|
||||
}
|
||||
return key.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* 格式化参数用于日志输出
|
||||
*
|
||||
* @param params 参数列表
|
||||
* @return 格式化字符串
|
||||
*/
|
||||
private String formatParams(Object... params) {
|
||||
if (params.length == 0) {
|
||||
return "[]";
|
||||
}
|
||||
|
||||
StringBuilder sb = new StringBuilder("[");
|
||||
for (int i = 0; i < params.length; i++) {
|
||||
if (i > 0) {
|
||||
sb.append(", ");
|
||||
}
|
||||
sb.append(params[i]);
|
||||
}
|
||||
sb.append("]");
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* 调用者信息
|
||||
*/
|
||||
private static class CallerInfo {
|
||||
private final String simpleClassName;
|
||||
private final String methodName;
|
||||
private final String fullMethodName;
|
||||
|
||||
public CallerInfo(String simpleClassName, String methodName, String fullMethodName) {
|
||||
this.simpleClassName = simpleClassName;
|
||||
this.methodName = methodName;
|
||||
this.fullMethodName = fullMethodName;
|
||||
}
|
||||
|
||||
public String getSimpleClassName() {
|
||||
return simpleClassName;
|
||||
}
|
||||
|
||||
public String getMethodName() {
|
||||
return methodName;
|
||||
}
|
||||
|
||||
public String getFullMethodName() {
|
||||
return fullMethodName;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -16,8 +16,9 @@ import com.qqchen.deploy.backend.deploy.service.IRepositoryBranchService;
|
||||
import com.qqchen.deploy.backend.framework.enums.ResponseCode;
|
||||
import com.qqchen.deploy.backend.framework.exception.BusinessException;
|
||||
import com.qqchen.deploy.backend.framework.service.impl.BaseServiceImpl;
|
||||
import com.qqchen.deploy.backend.deploy.dto.RepositorySyncHistoryDTO;
|
||||
import com.qqchen.deploy.backend.deploy.lock.SyncLockType;
|
||||
import com.qqchen.deploy.backend.deploy.service.IRepositorySyncHistoryService;
|
||||
import com.qqchen.deploy.backend.deploy.dto.RepositorySyncHistoryDTO;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.orm.ObjectOptimisticLockingFailureException;
|
||||
@ -200,15 +201,15 @@ public class RepositoryBranchServiceImpl extends BaseServiceImpl<RepositoryBranc
|
||||
@Async
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void syncBranches(Long externalSystemId) {
|
||||
// 尝试获取锁
|
||||
if (!syncLockManager.tryLock(externalSystemId)) {
|
||||
// 尝试获取锁(使用 GITLAB_SYNC,锁粒度为 externalSystemId)
|
||||
if (!syncLockManager.tryLock(SyncLockType.GITLAB_SYNC, externalSystemId)) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
doSyncBranches(externalSystemId, null, null);
|
||||
} finally {
|
||||
syncLockManager.unlock(externalSystemId);
|
||||
syncLockManager.unlock(SyncLockType.GITLAB_SYNC, externalSystemId);
|
||||
}
|
||||
}
|
||||
|
||||
@ -216,15 +217,15 @@ public class RepositoryBranchServiceImpl extends BaseServiceImpl<RepositoryBranc
|
||||
@Async
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void syncBranches(Long externalSystemId, Long repoGroupId) {
|
||||
// 尝试获取锁
|
||||
if (!syncLockManager.tryLock(externalSystemId, repoGroupId)) {
|
||||
// 尝试获取锁(使用 GITLAB_SYNC,锁粒度为 externalSystemId)
|
||||
if (!syncLockManager.tryLock(SyncLockType.GITLAB_SYNC, externalSystemId)) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
doSyncBranches(externalSystemId, repoGroupId, null);
|
||||
} finally {
|
||||
syncLockManager.unlock(externalSystemId, repoGroupId);
|
||||
syncLockManager.unlock(SyncLockType.GITLAB_SYNC, externalSystemId);
|
||||
}
|
||||
}
|
||||
|
||||
@ -232,15 +233,15 @@ public class RepositoryBranchServiceImpl extends BaseServiceImpl<RepositoryBranc
|
||||
@Async
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void syncBranches(Long externalSystemId, Long repoGroupId, Long repoProjectId) {
|
||||
// 尝试获取锁
|
||||
if (!syncLockManager.tryLock(externalSystemId, repoGroupId, repoProjectId)) {
|
||||
// 尝试获取锁(使用 GITLAB_SYNC,锁粒度为 externalSystemId)
|
||||
if (!syncLockManager.tryLock(SyncLockType.GITLAB_SYNC, externalSystemId)) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
doSyncBranches(externalSystemId, repoGroupId, repoProjectId);
|
||||
} finally {
|
||||
syncLockManager.unlock(externalSystemId, repoGroupId, repoProjectId);
|
||||
syncLockManager.unlock(SyncLockType.GITLAB_SYNC, externalSystemId);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -18,6 +18,7 @@ import com.qqchen.deploy.backend.deploy.service.IRepositorySyncHistoryService;
|
||||
import com.qqchen.deploy.backend.framework.enums.ResponseCode;
|
||||
import com.qqchen.deploy.backend.framework.exception.BusinessException;
|
||||
import com.qqchen.deploy.backend.framework.service.impl.BaseServiceImpl;
|
||||
import com.qqchen.deploy.backend.deploy.lock.SyncLockType;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.data.domain.Page;
|
||||
@ -67,15 +68,15 @@ public class RepositoryGroupServiceImpl extends BaseServiceImpl<RepositoryGroup,
|
||||
@Async
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void syncGroups(Long externalSystemId) {
|
||||
// 尝试获取锁
|
||||
if (!syncLockManager.tryLock(externalSystemId)) {
|
||||
// 尝试获取锁(使用 GITLAB_SYNC,锁粒度为 externalSystemId)
|
||||
if (!syncLockManager.tryLock(SyncLockType.GITLAB_SYNC, externalSystemId)) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
doSyncGroups(externalSystemId);
|
||||
} finally {
|
||||
syncLockManager.unlock(externalSystemId);
|
||||
syncLockManager.unlock(SyncLockType.GITLAB_SYNC, externalSystemId);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -20,6 +20,7 @@ import com.qqchen.deploy.backend.deploy.service.IRepositorySyncHistoryService;
|
||||
import com.qqchen.deploy.backend.framework.enums.ResponseCode;
|
||||
import com.qqchen.deploy.backend.framework.exception.BusinessException;
|
||||
import com.qqchen.deploy.backend.framework.service.impl.BaseServiceImpl;
|
||||
import com.qqchen.deploy.backend.deploy.lock.SyncLockType;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.data.domain.Page;
|
||||
@ -118,15 +119,15 @@ public class RepositoryProjectServiceImpl extends BaseServiceImpl<RepositoryProj
|
||||
@Async
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void syncProjects(Long externalSystemId) {
|
||||
// 尝试获取锁
|
||||
if (!syncLockManager.tryLock(externalSystemId)) {
|
||||
// 尝试获取锁(使用 GITLAB_SYNC,锁粒度为 externalSystemId)
|
||||
if (!syncLockManager.tryLock(SyncLockType.GITLAB_SYNC, externalSystemId)) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
doSyncProjects(externalSystemId, null);
|
||||
} finally {
|
||||
syncLockManager.unlock(externalSystemId);
|
||||
syncLockManager.unlock(SyncLockType.GITLAB_SYNC, externalSystemId);
|
||||
}
|
||||
}
|
||||
|
||||
@ -134,15 +135,15 @@ public class RepositoryProjectServiceImpl extends BaseServiceImpl<RepositoryProj
|
||||
@Async
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void syncProjects(Long externalSystemId, Long repoGroupId) {
|
||||
// 尝试获取锁
|
||||
if (!syncLockManager.tryLock(externalSystemId, repoGroupId)) {
|
||||
// 尝试获取锁(使用 GITLAB_SYNC,锁粒度为 externalSystemId)
|
||||
if (!syncLockManager.tryLock(SyncLockType.GITLAB_SYNC, externalSystemId)) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
doSyncProjects(externalSystemId, repoGroupId);
|
||||
} finally {
|
||||
syncLockManager.unlock(externalSystemId, repoGroupId);
|
||||
syncLockManager.unlock(SyncLockType.GITLAB_SYNC, externalSystemId);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -0,0 +1,250 @@
|
||||
package com.qqchen.deploy.backend.framework.lock;
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Cache;
|
||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
/**
|
||||
* 本地锁管理器(基于 Caffeine,自带超时保护)
|
||||
* <p>
|
||||
* 特性:
|
||||
* <ul>
|
||||
* <li>锁自动超时释放,防止死锁(默认5分钟)</li>
|
||||
* <li>基于 Caffeine 实现,高性能、线程安全</li>
|
||||
* <li>支持监控:查看当前锁状态、强制释放</li>
|
||||
* </ul>
|
||||
*
|
||||
* <p>使用示例:
|
||||
* <pre>{@code
|
||||
* // 方式1:try-finally(推荐)
|
||||
* if (lockManager.tryLock("jenkins-sync", systemId)) {
|
||||
* try {
|
||||
* doSync();
|
||||
* } finally {
|
||||
* lockManager.unlock("jenkins-sync", systemId);
|
||||
* }
|
||||
* }
|
||||
*
|
||||
* // 方式2:使用 AutoCloseable
|
||||
* try (var lock = lockManager.lock("jenkins-sync", systemId)) {
|
||||
* if (lock.isAcquired()) {
|
||||
* doSync();
|
||||
* }
|
||||
* }
|
||||
* }</pre>
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class LocalLockManager {
|
||||
|
||||
/**
|
||||
* 默认锁超时时间:5分钟
|
||||
*/
|
||||
private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(5);
|
||||
|
||||
/**
|
||||
* 最大锁数量
|
||||
*/
|
||||
private static final int MAX_LOCK_SIZE = 10000;
|
||||
|
||||
/**
|
||||
* 锁存储(key -> 锁持有者信息),Caffeine 自动过期
|
||||
*/
|
||||
private Cache<String, LockHolder> locks;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
locks = Caffeine.newBuilder()
|
||||
.maximumSize(MAX_LOCK_SIZE)
|
||||
.expireAfterWrite(DEFAULT_TIMEOUT)
|
||||
.evictionListener((key, value, cause) -> {
|
||||
if (value != null && cause.wasEvicted()) {
|
||||
LockHolder holder = (LockHolder) value;
|
||||
log.warn("锁已过期自动释放: key={}, thread={}, heldFor={}s", key, holder.threadName, holder.getHeldSeconds());
|
||||
}
|
||||
})
|
||||
.build();
|
||||
log.info("LocalLockManager 初始化完成(超时={}分钟)", DEFAULT_TIMEOUT.toMinutes());
|
||||
}
|
||||
|
||||
// ==================== 核心方法 ====================
|
||||
|
||||
/**
|
||||
* 尝试获取锁
|
||||
*
|
||||
* @param key 锁标识(如 "jenkins-sync:3")
|
||||
* @return true-获取成功,false-锁已被持有
|
||||
*/
|
||||
public boolean tryLock(String key) {
|
||||
LockHolder newHolder = new LockHolder();
|
||||
LockHolder existing = locks.asMap().putIfAbsent(key, newHolder);
|
||||
|
||||
if (existing != null) {
|
||||
log.warn("获取锁失败,已被占用: key={}, holder={}, heldFor={}s", key, existing.threadName, existing.getHeldSeconds());
|
||||
return false;
|
||||
}
|
||||
|
||||
log.debug("获取锁成功: key={}, thread={}", key, newHolder.threadName);
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* 尝试获取锁(组合 key)
|
||||
*
|
||||
* @param prefix 锁前缀(如 "jenkins-sync")
|
||||
* @param id 资源ID
|
||||
* @return true-获取成功,false-锁已被持有
|
||||
*/
|
||||
public boolean tryLock(String prefix, Object id) {
|
||||
return tryLock(buildKey(prefix, id));
|
||||
}
|
||||
|
||||
/**
|
||||
* 释放锁
|
||||
*/
|
||||
public void unlock(String key) {
|
||||
LockHolder holder = locks.asMap().remove(key);
|
||||
if (holder != null) {
|
||||
log.debug("释放锁: key={}, heldFor={}s", key, holder.getHeldSeconds());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 释放锁(组合 key)
|
||||
*/
|
||||
public void unlock(String prefix, Object id) {
|
||||
unlock(buildKey(prefix, id));
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查锁是否被持有
|
||||
*/
|
||||
public boolean isLocked(String key) {
|
||||
return locks.getIfPresent(key) != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查锁是否被持有(组合 key)
|
||||
*/
|
||||
public boolean isLocked(String prefix, Object id) {
|
||||
return isLocked(buildKey(prefix, id));
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取锁(返回 AutoCloseable,支持 try-with-resources)
|
||||
*
|
||||
* @return LockHandle,调用 isAcquired() 检查是否成功
|
||||
*/
|
||||
public LockHandle lock(String key) {
|
||||
boolean acquired = tryLock(key);
|
||||
return new LockHandle(key, acquired, this);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取锁(组合 key,返回 AutoCloseable)
|
||||
*/
|
||||
public LockHandle lock(String prefix, Object id) {
|
||||
return lock(buildKey(prefix, id));
|
||||
}
|
||||
|
||||
// ==================== 监控方法 ====================
|
||||
|
||||
/**
|
||||
* 获取当前锁数量
|
||||
*/
|
||||
public long getLockCount() {
|
||||
return locks.estimatedSize();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取所有锁的状态(用于监控)
|
||||
*/
|
||||
public List<String> getLockStatus() {
|
||||
List<String> status = new ArrayList<>();
|
||||
ConcurrentMap<String, LockHolder> map = locks.asMap();
|
||||
for (var entry : map.entrySet()) {
|
||||
LockHolder holder = entry.getValue();
|
||||
status.add(String.format("key=%s, thread=%s, heldFor=%ds", entry.getKey(), holder.threadName, holder.getHeldSeconds()));
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
/**
|
||||
* 强制释放指定锁(紧急情况使用)
|
||||
*/
|
||||
public boolean forceUnlock(String key) {
|
||||
LockHolder holder = locks.asMap().remove(key);
|
||||
if (holder != null) {
|
||||
log.warn("强制释放锁: key={}, thread={}, heldFor={}s", key, holder.threadName, holder.getHeldSeconds());
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* 强制释放所有锁(紧急情况使用)
|
||||
*/
|
||||
public int forceUnlockAll() {
|
||||
int count = (int) locks.estimatedSize();
|
||||
if (count > 0) {
|
||||
log.warn("强制释放所有锁,共 {} 个", count);
|
||||
locks.invalidateAll();
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
// ==================== 内部类 ====================
|
||||
|
||||
private String buildKey(String prefix, Object id) {
|
||||
return prefix + ":" + id;
|
||||
}
|
||||
|
||||
/**
|
||||
* 锁持有者信息
|
||||
*/
|
||||
private static class LockHolder {
|
||||
final String threadName = Thread.currentThread().getName();
|
||||
|
||||
final Instant acquiredAt = Instant.now();
|
||||
|
||||
long getHeldSeconds() {
|
||||
return Duration.between(acquiredAt, Instant.now()).getSeconds();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 锁句柄(支持 try-with-resources 自动释放)
|
||||
*/
|
||||
public static class LockHandle implements AutoCloseable {
|
||||
private final String key;
|
||||
|
||||
private final boolean acquired;
|
||||
|
||||
private final LocalLockManager manager;
|
||||
|
||||
LockHandle(String key, boolean acquired, LocalLockManager manager) {
|
||||
this.key = key;
|
||||
this.acquired = acquired;
|
||||
this.manager = manager;
|
||||
}
|
||||
|
||||
public boolean isAcquired() {
|
||||
return acquired;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (acquired) {
|
||||
manager.unlock(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user