From af7a9ca39d53a1daf9d1247663dd50a5cf3ab379 Mon Sep 17 00:00:00 2001 From: dengqichen Date: Thu, 22 Jan 2026 12:50:57 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/pom.xml | 8 +- .../deploy/config/ThreadPoolConfig.java | 4 + .../backend/deploy/lock/SyncLockManager.java | 319 +++--------------- .../impl/RepositoryBranchServiceImpl.java | 21 +- .../impl/RepositoryGroupServiceImpl.java | 7 +- .../impl/RepositoryProjectServiceImpl.java | 13 +- .../framework/lock/LocalLockManager.java | 250 ++++++++++++++ 7 files changed, 335 insertions(+), 287 deletions(-) create mode 100644 backend/src/main/java/com/qqchen/deploy/backend/framework/lock/LocalLockManager.java diff --git a/backend/pom.xml b/backend/pom.xml index 324bb058..1a5b03fe 100644 --- a/backend/pom.xml +++ b/backend/pom.xml @@ -289,17 +289,17 @@ 21.0.2-legacy - - + + com.squareup.okhttp3 okhttp - 5.0.0-alpha.14 + 5.3.0 com.squareup.okhttp3 logging-interceptor - 5.0.0-alpha.14 + 5.3.0 diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/config/ThreadPoolConfig.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/config/ThreadPoolConfig.java index dff14702..3d2a91a7 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/config/ThreadPoolConfig.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/config/ThreadPoolConfig.java @@ -2,6 +2,7 @@ package com.qqchen.deploy.backend.deploy.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @@ -117,7 +118,10 @@ public class ThreadPoolConfig { * 4. 平台线程对CPU密集型任务更高效 * * 💡 如果确认只用于I/O密集型任务,可改为虚拟线程 + * + * 🎯 标记为 @Primary,作为未指定 executor 的 @Async 方法的默认线程池 */ + @Primary @Bean("applicationTaskExecutor") public AsyncTaskExecutor applicationTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/lock/SyncLockManager.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/lock/SyncLockManager.java index 14060780..8670414e 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/lock/SyncLockManager.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/lock/SyncLockManager.java @@ -1,312 +1,103 @@ package com.qqchen.deploy.backend.deploy.lock; +import com.qqchen.deploy.backend.framework.lock.LocalLockManager; +import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; -import java.util.concurrent.ConcurrentHashMap; +import java.util.List; /** - * 智能同步任务本地锁管理器 - *

支持两种使用方式:

- *
    - *
  1. 自动识别调用者(类名+方法名)生成锁key - 适用于单方法独立锁
  2. - *
  3. 显式指定锁类型 - 适用于跨方法/跨服务协调锁
  4. - *
- * - *

使用示例1(自动识别): - *

- * if (!syncLockManager.tryLock(externalSystemId)) {
- *     return;
- * }
- * try {
- *     // 执行同步逻辑
- * } finally {
- *     syncLockManager.unlock(externalSystemId);
- * }
- * 
- * - *

使用示例2(显式锁类型 - 推荐用于跨服务协调): - *

+ * 同步任务锁管理器
+ * 

+ * 基于 {@link LocalLockManager} 实现,专用于同步任务的锁管理。 + * 提供类型安全的锁操作,通过 {@link SyncLockType} 避免字符串拼写错误。 + *

+ * + *

设计说明:

+ * + * + *

使用示例: + *

{@code
  * if (!syncLockManager.tryLock(SyncLockType.JENKINS_SYNC, externalSystemId)) {
- *     return;
+ *     return;  // 已有同步任务在执行
  * }
  * try {
- *     // 执行同步逻辑
+ *     doSync();
  * } finally {
  *     syncLockManager.unlock(SyncLockType.JENKINS_SYNC, externalSystemId);
  * }
- * 
+ * }
*/ @Slf4j @Component public class SyncLockManager { - // 统一的锁存储 - private final ConcurrentHashMap locks = new ConcurrentHashMap<>(); - - // ==================== 显式锁类型方法(推荐用于跨服务协调) ==================== + @Resource + private LocalLockManager localLockManager; /** - * 尝试获取锁(显式指定锁类型) - *

推荐用于需要跨方法/跨服务协调的场景

+ * 尝试获取同步锁 * * @param lockType 锁类型 * @param params 锁参数(如 externalSystemId) - * @return true-获取成功,false-已被锁定 + * @return true-获取成功,false-锁已被持有 */ public boolean tryLock(SyncLockType lockType, Object... params) { - String lockKey = buildLockKey(lockType, params); - boolean acquired = locks.putIfAbsent(lockKey, true) == null; - - if (!acquired) { - log.warn("同步任务正在执行中,跳过本次同步: lockType={}, params={}", - lockType.getKey(), formatParams(params)); - } else { - log.debug("获取同步锁成功: lockType={}, params={}", - lockType.getKey(), formatParams(params)); - } - - return acquired; + String key = buildKey(lockType, params); + return localLockManager.tryLock(key); } /** - * 释放锁(显式指定锁类型) + * 释放同步锁 * * @param lockType 锁类型 - * @param params 锁参数(必须与tryLock时完全一致) + * @param params 锁参数(必须与 tryLock 时一致) */ public void unlock(SyncLockType lockType, Object... params) { - String lockKey = buildLockKey(lockType, params); - locks.remove(lockKey); - log.debug("同步任务完成,释放锁: lockType={}, params={}", - lockType.getKey(), formatParams(params)); + String key = buildKey(lockType, params); + localLockManager.unlock(key); } /** - * 检查锁是否被持有(显式指定锁类型) - * - * @param lockType 锁类型 - * @param params 锁参数 - * @return true-已被锁定,false-未锁定 + * 检查锁是否被持有 */ public boolean isLocked(SyncLockType lockType, Object... params) { - String lockKey = buildLockKey(lockType, params); - return locks.containsKey(lockKey); + String key = buildKey(lockType, params); + return localLockManager.isLocked(key); } /** - * 构建锁key(显式锁类型) + * 获取当前锁数量 */ - private String buildLockKey(SyncLockType lockType, Object... params) { + public long getLockCount() { + return localLockManager.getLockCount(); + } + + /** + * 获取所有锁状态(用于监控) + */ + public List getLockStatus() { + return localLockManager.getLockStatus(); + } + + /** + * 强制释放指定锁(紧急情况使用) + */ + public boolean forceUnlock(SyncLockType lockType, Object... params) { + String key = buildKey(lockType, params); + return localLockManager.forceUnlock(key); + } + + private String buildKey(SyncLockType lockType, Object... params) { StringBuilder key = new StringBuilder(lockType.getKey()); for (Object param : params) { key.append(":").append(param != null ? param : "null"); } return key.toString(); } - - // ==================== 自动识别调用者方法(向后兼容) ==================== - - /** - * 尝试获取锁(自动识别调用者) - *

适用于单方法独立锁场景,不需要跨服务协调

- * - * @param params 锁参数(如 externalSystemId, viewId等) - * @return true-获取成功,false-已被锁定 - */ - public boolean tryLock(Object... params) { - CallerInfo callerInfo = getCallerInfo(); - String lockKey = buildLockKey(callerInfo, params); - boolean acquired = locks.putIfAbsent(lockKey, true) == null; - - if (!acquired) { - log.warn("同步任务正在执行中,跳过本次同步: method={}, params={}", - callerInfo.getFullMethodName(), formatParams(params)); - } else { - log.debug("获取同步锁成功: method={}, params={}", - callerInfo.getFullMethodName(), formatParams(params)); - } - - return acquired; - } - - /** - * 释放锁(自动识别调用者) - * - * @param params 锁参数(必须与tryLock时完全一致) - */ - public void unlock(Object... params) { - CallerInfo callerInfo = getCallerInfo(); - String lockKey = buildLockKey(callerInfo, params); - locks.remove(lockKey); - log.debug("同步任务完成,释放锁: method={}, params={}", - callerInfo.getFullMethodName(), formatParams(params)); - } - - /** - * 检查锁是否被持有(自动识别调用者) - * - * @param params 锁参数 - * @return true-已被锁定,false-未锁定 - */ - public boolean isLocked(Object... params) { - CallerInfo callerInfo = getCallerInfo(); - String lockKey = buildLockKey(callerInfo, params); - return locks.containsKey(lockKey); - } - - /** - * 获取当前锁数量(用于监控) - * - * @return 当前持有的锁数量 - */ - public int getLockCount() { - return locks.size(); - } - - /** - * 获取调用者信息(通过栈追踪,智能跳过Spring代理层) - * - * @return 调用者信息 - */ - private CallerInfo getCallerInfo() { - StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); - - // 从栈顶开始遍历,跳过本类和Spring的代理/拦截器类 - // 栈结构示例: - // 0: Thread.getStackTrace() - // 1: SyncLockManager.getCallerInfo() - // 2: SyncLockManager.tryLock/unlock/isLocked() - // 3+: 可能是Spring代理类或真实调用者 - - for (int i = 3; i < stackTrace.length; i++) { - StackTraceElement element = stackTrace[i]; - String className = element.getClassName(); - String methodName = element.getMethodName(); - - // 跳过 Spring 和 JDK 的内部类/代理类 - if (shouldSkipFrame(className, methodName)) { - continue; - } - - // 找到真正的业务调用者 - String simpleClassName = className.substring(className.lastIndexOf('.') + 1); - return new CallerInfo(simpleClassName, methodName, className + "." + methodName); - } - - // 兜底:使用unknown - return new CallerInfo("Unknown", "unknown", "Unknown.unknown"); - } - - /** - * 判断是否应该跳过该栈帧 - * - * @param className 类名 - * @param methodName 方法名 - * @return true-跳过,false-不跳过 - */ - private boolean shouldSkipFrame(String className, String methodName) { - // 跳过 CGLIB 动态代理类(所有包含$$的类都是CGLIB代理) - if (className.contains("$$")) { - return true; - } - - // 跳过 JDK 动态代理 - if (className.startsWith("jdk.proxy") || className.startsWith("com.sun.proxy")) { - return true; - } - - // 跳过 Spring AOP 拦截器 - if (className.contains("AsyncExecutionInterceptor") || - className.contains("TransactionInterceptor") || - className.contains("CglibAopProxy") || - className.contains("ReflectiveMethodInvocation") || - className.contains("DynamicAdvisedInterceptor")) { - return true; - } - - // 跳过线程池和任务执行器 - if (className.contains("ThreadPoolExecutor") || - className.contains("FutureTask") || - className.contains("CompletableFuture")) { - return true; - } - - // 跳过 Java 反射 - if (className.startsWith("java.lang.reflect") || - className.startsWith("jdk.internal.reflect")) { - return true; - } - - // 跳过 lambda 表达式的合成方法 - if (methodName.contains("lambda$")) { - return true; - } - - return false; - } - - /** - * 构建锁的唯一key - * - * @param callerInfo 调用者信息 - * @param params 参数列表 - * @return 锁key - */ - private String buildLockKey(CallerInfo callerInfo, Object... params) { - StringBuilder key = new StringBuilder(callerInfo.getFullMethodName()); - for (Object param : params) { - key.append(":").append(param != null ? param : "null"); - } - return key.toString(); - } - - /** - * 格式化参数用于日志输出 - * - * @param params 参数列表 - * @return 格式化字符串 - */ - private String formatParams(Object... params) { - if (params.length == 0) { - return "[]"; - } - - StringBuilder sb = new StringBuilder("["); - for (int i = 0; i < params.length; i++) { - if (i > 0) { - sb.append(", "); - } - sb.append(params[i]); - } - sb.append("]"); - return sb.toString(); - } - - /** - * 调用者信息 - */ - private static class CallerInfo { - private final String simpleClassName; - private final String methodName; - private final String fullMethodName; - - public CallerInfo(String simpleClassName, String methodName, String fullMethodName) { - this.simpleClassName = simpleClassName; - this.methodName = methodName; - this.fullMethodName = fullMethodName; - } - - public String getSimpleClassName() { - return simpleClassName; - } - - public String getMethodName() { - return methodName; - } - - public String getFullMethodName() { - return fullMethodName; - } - } } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/RepositoryBranchServiceImpl.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/RepositoryBranchServiceImpl.java index 11ae01f5..8cc0b078 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/RepositoryBranchServiceImpl.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/RepositoryBranchServiceImpl.java @@ -16,8 +16,9 @@ import com.qqchen.deploy.backend.deploy.service.IRepositoryBranchService; import com.qqchen.deploy.backend.framework.enums.ResponseCode; import com.qqchen.deploy.backend.framework.exception.BusinessException; import com.qqchen.deploy.backend.framework.service.impl.BaseServiceImpl; -import com.qqchen.deploy.backend.deploy.dto.RepositorySyncHistoryDTO; +import com.qqchen.deploy.backend.deploy.lock.SyncLockType; import com.qqchen.deploy.backend.deploy.service.IRepositorySyncHistoryService; +import com.qqchen.deploy.backend.deploy.dto.RepositorySyncHistoryDTO; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.orm.ObjectOptimisticLockingFailureException; @@ -200,15 +201,15 @@ public class RepositoryBranchServiceImpl extends BaseServiceImpl + * 特性: + *
    + *
  • 锁自动超时释放,防止死锁(默认5分钟)
  • + *
  • 基于 Caffeine 实现,高性能、线程安全
  • + *
  • 支持监控:查看当前锁状态、强制释放
  • + *
+ * + *

使用示例: + *

{@code
+ * // 方式1:try-finally(推荐)
+ * if (lockManager.tryLock("jenkins-sync", systemId)) {
+ *     try {
+ *         doSync();
+ *     } finally {
+ *         lockManager.unlock("jenkins-sync", systemId);
+ *     }
+ * }
+ *
+ * // 方式2:使用 AutoCloseable
+ * try (var lock = lockManager.lock("jenkins-sync", systemId)) {
+ *     if (lock.isAcquired()) {
+ *         doSync();
+ *     }
+ * }
+ * }
+ */ +@Slf4j +@Component +public class LocalLockManager { + + /** + * 默认锁超时时间:5分钟 + */ + private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(5); + + /** + * 最大锁数量 + */ + private static final int MAX_LOCK_SIZE = 10000; + + /** + * 锁存储(key -> 锁持有者信息),Caffeine 自动过期 + */ + private Cache locks; + + @PostConstruct + public void init() { + locks = Caffeine.newBuilder() + .maximumSize(MAX_LOCK_SIZE) + .expireAfterWrite(DEFAULT_TIMEOUT) + .evictionListener((key, value, cause) -> { + if (value != null && cause.wasEvicted()) { + LockHolder holder = (LockHolder) value; + log.warn("锁已过期自动释放: key={}, thread={}, heldFor={}s", key, holder.threadName, holder.getHeldSeconds()); + } + }) + .build(); + log.info("LocalLockManager 初始化完成(超时={}分钟)", DEFAULT_TIMEOUT.toMinutes()); + } + + // ==================== 核心方法 ==================== + + /** + * 尝试获取锁 + * + * @param key 锁标识(如 "jenkins-sync:3") + * @return true-获取成功,false-锁已被持有 + */ + public boolean tryLock(String key) { + LockHolder newHolder = new LockHolder(); + LockHolder existing = locks.asMap().putIfAbsent(key, newHolder); + + if (existing != null) { + log.warn("获取锁失败,已被占用: key={}, holder={}, heldFor={}s", key, existing.threadName, existing.getHeldSeconds()); + return false; + } + + log.debug("获取锁成功: key={}, thread={}", key, newHolder.threadName); + return true; + } + + /** + * 尝试获取锁(组合 key) + * + * @param prefix 锁前缀(如 "jenkins-sync") + * @param id 资源ID + * @return true-获取成功,false-锁已被持有 + */ + public boolean tryLock(String prefix, Object id) { + return tryLock(buildKey(prefix, id)); + } + + /** + * 释放锁 + */ + public void unlock(String key) { + LockHolder holder = locks.asMap().remove(key); + if (holder != null) { + log.debug("释放锁: key={}, heldFor={}s", key, holder.getHeldSeconds()); + } + } + + /** + * 释放锁(组合 key) + */ + public void unlock(String prefix, Object id) { + unlock(buildKey(prefix, id)); + } + + /** + * 检查锁是否被持有 + */ + public boolean isLocked(String key) { + return locks.getIfPresent(key) != null; + } + + /** + * 检查锁是否被持有(组合 key) + */ + public boolean isLocked(String prefix, Object id) { + return isLocked(buildKey(prefix, id)); + } + + /** + * 获取锁(返回 AutoCloseable,支持 try-with-resources) + * + * @return LockHandle,调用 isAcquired() 检查是否成功 + */ + public LockHandle lock(String key) { + boolean acquired = tryLock(key); + return new LockHandle(key, acquired, this); + } + + /** + * 获取锁(组合 key,返回 AutoCloseable) + */ + public LockHandle lock(String prefix, Object id) { + return lock(buildKey(prefix, id)); + } + + // ==================== 监控方法 ==================== + + /** + * 获取当前锁数量 + */ + public long getLockCount() { + return locks.estimatedSize(); + } + + /** + * 获取所有锁的状态(用于监控) + */ + public List getLockStatus() { + List status = new ArrayList<>(); + ConcurrentMap map = locks.asMap(); + for (var entry : map.entrySet()) { + LockHolder holder = entry.getValue(); + status.add(String.format("key=%s, thread=%s, heldFor=%ds", entry.getKey(), holder.threadName, holder.getHeldSeconds())); + } + return status; + } + + /** + * 强制释放指定锁(紧急情况使用) + */ + public boolean forceUnlock(String key) { + LockHolder holder = locks.asMap().remove(key); + if (holder != null) { + log.warn("强制释放锁: key={}, thread={}, heldFor={}s", key, holder.threadName, holder.getHeldSeconds()); + return true; + } + return false; + } + + /** + * 强制释放所有锁(紧急情况使用) + */ + public int forceUnlockAll() { + int count = (int) locks.estimatedSize(); + if (count > 0) { + log.warn("强制释放所有锁,共 {} 个", count); + locks.invalidateAll(); + } + return count; + } + + // ==================== 内部类 ==================== + + private String buildKey(String prefix, Object id) { + return prefix + ":" + id; + } + + /** + * 锁持有者信息 + */ + private static class LockHolder { + final String threadName = Thread.currentThread().getName(); + + final Instant acquiredAt = Instant.now(); + + long getHeldSeconds() { + return Duration.between(acquiredAt, Instant.now()).getSeconds(); + } + } + + /** + * 锁句柄(支持 try-with-resources 自动释放) + */ + public static class LockHandle implements AutoCloseable { + private final String key; + + private final boolean acquired; + + private final LocalLockManager manager; + + LockHandle(String key, boolean acquired, LocalLockManager manager) { + this.key = key; + this.acquired = acquired; + this.manager = manager; + } + + public boolean isAcquired() { + return acquired; + } + + @Override + public void close() { + if (acquired) { + manager.unlock(key); + } + } + } +}