diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/config/ThreadPoolConfig.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/config/ThreadPoolConfig.java index 42eadc51..5621c5cc 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/config/ThreadPoolConfig.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/config/ThreadPoolConfig.java @@ -44,50 +44,82 @@ public class ThreadPoolConfig { return executor; } -// @Bean("repositorySyncExecutor") -// public ThreadPoolTaskExecutor repositorySyncExecutor() { -// ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); -// -// // 核心线程数:CPU核心数 * 2 -// executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2); -// -// // 最大线程数:CPU核心数 * 4 -// executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 4); -// -// // 队列容量:5000 -// executor.setQueueCapacity(5000); -// -// // 线程名前缀 -// executor.setThreadNamePrefix("repository-sync-"); -// -// // 线程空闲时间:60秒 -// executor.setKeepAliveSeconds(60); -// -// // 拒绝策略:由调用线程处理 -// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); -// -// // 等待所有任务完成再关闭线程池 -// executor.setWaitForTasksToCompleteOnShutdown(true); -// -// // 等待时间(秒) -// executor.setAwaitTerminationSeconds(60); -// -// executor.initialize(); -// return executor; -// } -// -// @Bean("applicationTaskExecutor") -// public AsyncTaskExecutor applicationTaskExecutor() { -// ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); -// executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2); -// executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 4); -// executor.setQueueCapacity(500); -// executor.setThreadNamePrefix("application-task-"); -// executor.setKeepAliveSeconds(60); -// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); -// executor.setWaitForTasksToCompleteOnShutdown(true); -// executor.setAwaitTerminationSeconds(60); -// executor.initialize(); -// return executor; -// } + @Bean("repositoryProjectExecutor") + public ThreadPoolTaskExecutor repositoryProjectExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + + // 核心线程数:CPU核心数 * 2 + executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2); + + // 最大线程数:CPU核心数 * 4 + executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 4); + + // 队列容量:根据平均任务执行时间和期望响应时间来设置 + executor.setQueueCapacity(100); + + // 线程名前缀 + executor.setThreadNamePrefix("repository-project-sync-"); + + // 线程空闲时间:超过核心线程数的线程在空闲60秒后会被销毁 + executor.setKeepAliveSeconds(60); + + // 拒绝策略:由调用线程处理 + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + + // 等待所有任务完成再关闭线程池 + executor.setWaitForTasksToCompleteOnShutdown(true); + + // 等待时间(秒) + executor.setAwaitTerminationSeconds(60); + + executor.initialize(); + return executor; + } + + @Bean("repositoryBranchExecutor") + public ThreadPoolTaskExecutor repositoryBranchExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + + // 核心线程数:CPU核心数 * 2 + executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2); + + // 最大线程数:CPU核心数 * 4 + executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 4); + + // 队列容量:根据平均任务执行时间和期望响应时间来设置 + executor.setQueueCapacity(100); + + // 线程名前缀 + executor.setThreadNamePrefix("repository-branch-sync-"); + + // 线程空闲时间:超过核心线程数的线程在空闲60秒后会被销毁 + executor.setKeepAliveSeconds(60); + + // 拒绝策略:由调用线程处理 + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + + // 等待所有任务完成再关闭线程池 + executor.setWaitForTasksToCompleteOnShutdown(true); + + // 等待时间(秒) + executor.setAwaitTerminationSeconds(60); + + executor.initialize(); + return executor; + } + + @Bean("applicationTaskExecutor") + public AsyncTaskExecutor applicationTaskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2); + executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 4); + executor.setQueueCapacity(500); + executor.setThreadNamePrefix("application-task-"); + executor.setKeepAliveSeconds(60); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.setWaitForTasksToCompleteOnShutdown(true); + executor.setAwaitTerminationSeconds(60); + executor.initialize(); + return executor; + } } \ No newline at end of file diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/RepositoryBranchServiceImpl.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/RepositoryBranchServiceImpl.java index 04698860..965038e6 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/RepositoryBranchServiceImpl.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/RepositoryBranchServiceImpl.java @@ -22,11 +22,19 @@ import com.qqchen.deploy.backend.deploy.service.IRepositorySyncHistoryService; import com.qqchen.deploy.backend.deploy.repository.IRepositorySyncHistoryRepository; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; +import org.springframework.orm.ObjectOptimisticLockingFailureException; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.List; import java.util.Map; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; @@ -56,6 +64,40 @@ public class RepositoryBranchServiceImpl extends BaseServiceImpl branches) { + int retryCount = 0; + while (retryCount < MAX_RETRIES) { + try { + for (int i = 0; i < branches.size(); i += BATCH_SIZE) { + int end = Math.min(i + BATCH_SIZE, branches.size()); + List batch = branches.subList(i, end); + repositoryBranchRepository.saveAll(batch); + } + return; + } catch (ObjectOptimisticLockingFailureException e) { + retryCount++; + if (retryCount >= MAX_RETRIES) { + throw e; + } + log.warn("Optimistic locking failure during batch save, retry attempt {}", retryCount); + try { + Thread.sleep(RETRY_DELAY * retryCount); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted during retry delay", ie); + } + } + } + } + @Override @Transactional public Integer syncBranches(Long externalSystemId) { @@ -64,7 +106,8 @@ public class RepositoryBranchServiceImpl extends BaseServiceImpl new BusinessException(ResponseCode.EXTERNAL_SYSTEM_NOT_FOUND)); + ExternalSystem externalSystem = externalSystemRepository.findById(externalSystemId) + .orElseThrow(() -> new BusinessException(ResponseCode.EXTERNAL_SYSTEM_NOT_FOUND)); // 3. 获取所有项目 List projects = repositoryProjectRepository.findByExternalSystemIdAndDeletedFalse(externalSystemId); @@ -94,36 +137,54 @@ public class RepositoryBranchServiceImpl extends BaseServiceImpl> futures = new ArrayList<>(); + List branchesToSave = Collections.synchronizedList(new ArrayList<>()); + + // 6. 异步同步每个项目的分支 for (RepositoryProject project : projectsToSync) { - try { - // 6.1 获取远程分支信息 - List remoteBranches = gitServiceIntegration.branches(externalSystem, project.getProjectId()); - if (remoteBranches.isEmpty()) { - log.info("No branches found for project: {}", project.getName()); - continue; + CompletableFuture future = CompletableFuture.runAsync(() -> { + try { + // 6.1 获取远程分支信息 + List remoteBranches = gitServiceIntegration.branches(externalSystem, project.getProjectId()); + if (remoteBranches.isEmpty()) { + log.info("No branches found for project: {}", project.getName()); + return; + } + + // 6.2 获取本地已存在的分支 + Map existingBranchMap = repositoryBranchRepository + .findByExternalSystemIdAndProjectIdAndDeletedFalse(externalSystemId, project.getId()) + .stream() + .collect(Collectors.toMap(RepositoryBranch::getName, Function.identity())); + + // 6.3 更新或创建分支 + for (GitBranchResponse remoteBranch : remoteBranches) { + RepositoryBranch branch = updateOrCreateBranch(externalSystemId, project.getId(), remoteBranch, existingBranchMap); + branchesToSave.add(branch); + totalSyncedBranches.incrementAndGet(); + } + + log.info("Processed {} branches for project: {}", remoteBranches.size(), project.getName()); + } catch (Exception e) { + log.error("Failed to sync branches for project: {}", project.getName(), e); + throw new RuntimeException(e); } + }, executor); - // 6.2 获取本地已存在的分支 - List existingBranches = repositoryBranchRepository.findByExternalSystemIdAndProjectIdAndDeletedFalse( - externalSystemId, project.getId()); - Map existingBranchMap = existingBranches.stream() - .collect(Collectors.toMap(RepositoryBranch::getName, Function.identity())); + futures.add(future); + } - // 6.3 更新或创建分支 - List branchesToSave = remoteBranches.stream() - .map(remoteBranch -> updateOrCreateBranch(externalSystemId, project.getId(), remoteBranch, existingBranchMap)) - .collect(Collectors.toList()); + // 等待所有异步任务完成 + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); - // 6.4 保存分支 - repositoryBranchRepository.saveAll(branchesToSave); - totalSyncedBranches += branchesToSave.size(); - - log.info("Successfully synchronized {} branches for project: {}", - branchesToSave.size(), project.getName()); + // 批量保存分支 + if (!branchesToSave.isEmpty()) { + try { + saveBatch(branchesToSave); } catch (Exception e) { - log.error("Failed to sync branches for project: {}", project.getName(), e); + log.error("Error saving branches in batch", e); + throw e; } } @@ -131,8 +192,8 @@ public class RepositoryBranchServiceImpl extends BaseServiceImpl projects) { + int retryCount = 0; + while (retryCount < MAX_RETRIES) { + try { + for (int i = 0; i < projects.size(); i += BATCH_SIZE) { + int end = Math.min(i + BATCH_SIZE, projects.size()); + List batch = projects.subList(i, end); + repositoryProjectRepository.saveAll(batch); + } + return; + } catch (ObjectOptimisticLockingFailureException e) { + retryCount++; + if (retryCount >= MAX_RETRIES) { + throw e; + } + log.warn("Optimistic locking failure during batch save, retry attempt {}", retryCount); + try { + Thread.sleep(RETRY_DELAY * retryCount); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted during retry delay", ie); + } + } + } + } + @Override @Transactional public Integer syncProjects(Long externalSystemId) { @@ -67,83 +108,97 @@ public class RepositoryProjectServiceImpl extends BaseServiceImpl existingProjects = repositoryProjectRepository - .findByExternalSystemId(externalSystemId) - .stream() - .collect(Collectors.toMap( - RepositoryProject::getProjectId, - Function.identity(), - (existing, replacement) -> { - log.warn("Duplicate project found with ID: {}, path: {}", - existing.getProjectId(), existing.getPathWithNamespace()); - return existing; - } - )); + Map existingProjects = Collections.synchronizedMap( + repositoryProjectRepository + .findByExternalSystemId(externalSystemId) + .stream() + .collect(Collectors.toMap( + RepositoryProject::getProjectId, + Function.identity(), + (existing, replacement) -> { + log.warn("Duplicate project found with ID: {}, path: {}", + existing.getProjectId(), existing.getPathWithNamespace()); + return existing; + } + )) + ); // 4. 用于跟踪已处理的项目ID - Set processedProjectIds = new HashSet<>(); - int totalCount = 0; - List projectsToSave = new ArrayList<>(); + Set processedProjectIds = ConcurrentHashMap.newKeySet(); + AtomicInteger totalCount = new AtomicInteger(0); + List> futures = new ArrayList<>(); - // 5. 遍历每个组,同步项目 + // 使用线程安全的列表来存储待保存的项目 + List projectsToSave = Collections.synchronizedList(new ArrayList<>()); + + // 5. 遍历每个组,异步同步项目 for (RepositoryGroup group : groups) { - List projectResponses = gitServiceIntegration.projectsByGroup(externalSystem, group.getGroupId()); - log.info("Processing group: {} (ID: {}), found {} projects", - group.getName(), group.getGroupId(), projectResponses.size()); - - for (GitProjectResponse projectResponse : projectResponses) { - if (processedProjectIds.contains(projectResponse.getId())) { - log.info("Project already processed: {} (ID: {}) in group: {}", - projectResponse.getPathWithNamespace(), projectResponse.getId(), group.getName()); - continue; - } - processedProjectIds.add(projectResponse.getId()); + CompletableFuture future = CompletableFuture.runAsync(() -> { + try { + List projectResponses = gitServiceIntegration.projectsByGroup(externalSystem, group.getGroupId()); + log.info("Processing group: {} (ID: {}), found {} projects", + group.getName(), group.getGroupId(), projectResponses.size()); - RepositoryProject project = existingProjects.get(projectResponse.getId()); - if (project == null) { - project = new RepositoryProject(); - project.setExternalSystemId(externalSystemId); - project.setProjectId(projectResponse.getId()); - log.info("Creating new project: {} (ID: {})", - projectResponse.getPathWithNamespace(), projectResponse.getId()); - totalCount++; - } else { - log.debug("Updating existing project: {} (ID: {})", - projectResponse.getPathWithNamespace(), projectResponse.getId()); - } + for (GitProjectResponse projectResponse : projectResponses) { + if (!processedProjectIds.add(projectResponse.getId())) { + log.info("Project already processed: {} (ID: {}) in group: {}", + projectResponse.getPathWithNamespace(), projectResponse.getId(), group.getName()); + continue; + } - project.setDeleted(false); - project.setName(projectResponse.getName()); - project.setPath(projectResponse.getPath()); - project.setDescription(projectResponse.getDescription()); - project.setVisibility(projectResponse.getVisibility()); - project.setGroupId(group.getId()); - project.setDefaultBranch(projectResponse.getDefaultBranch()); - project.setWebUrl(projectResponse.getWebUrl()); - project.setSshUrl(projectResponse.getSshUrlToRepo()); - project.setHttpUrl(projectResponse.getHttpUrlToRepo()); - project.setLastActivityAt(projectResponse.getLastActivityAt()); - project.setNameWithNamespace(projectResponse.getNameWithNamespace()); - project.setPathWithNamespace(projectResponse.getPathWithNamespace()); - project.setCreatedAt(projectResponse.getCreatedAt()); - - projectsToSave.add(project); - existingProjects.remove(projectResponse.getId()); + RepositoryProject project = existingProjects.get(projectResponse.getId()); + if (project == null) { + project = new RepositoryProject(); + project.setExternalSystemId(externalSystemId); + project.setProjectId(projectResponse.getId()); + log.info("Creating new project: {} (ID: {})", + projectResponse.getPathWithNamespace(), projectResponse.getId()); + totalCount.incrementAndGet(); + } else { + log.debug("Updating existing project: {} (ID: {})", + projectResponse.getPathWithNamespace(), projectResponse.getId()); + } - // 每100个项目批量保存一次,避免内存占用过大 - if (projectsToSave.size() >= 100) { - log.info("Batch saving {} projects", projectsToSave.size()); - repositoryProjectRepository.saveAll(projectsToSave); - projectsToSave.clear(); + project.setDeleted(false); + project.setName(projectResponse.getName()); + project.setPath(projectResponse.getPath()); + project.setDescription(projectResponse.getDescription()); + project.setVisibility(projectResponse.getVisibility()); + project.setGroupId(group.getId()); + project.setDefaultBranch(projectResponse.getDefaultBranch()); + project.setWebUrl(projectResponse.getWebUrl()); + project.setSshUrl(projectResponse.getSshUrlToRepo()); + project.setHttpUrl(projectResponse.getHttpUrlToRepo()); + project.setLastActivityAt(projectResponse.getLastActivityAt()); + project.setNameWithNamespace(projectResponse.getNameWithNamespace()); + project.setPathWithNamespace(projectResponse.getPathWithNamespace()); + project.setCreatedAt(projectResponse.getCreatedAt()); + + projectsToSave.add(project); + synchronized (existingProjects) { + existingProjects.remove(projectResponse.getId()); + } + } + } catch (Exception e) { + log.error("Error processing group {}: {}", group.getId(), e.getMessage(), e); + throw new CompletionException(e); } - } + }, executor); + + futures.add(future); } - // 保存剩余的项目 + // 等待所有异步任务完成 + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + + // 批量保存项目 if (!projectsToSave.isEmpty()) { - log.info("Batch saving remaining {} projects", projectsToSave.size()); - repositoryProjectRepository.saveAll(projectsToSave); - projectsToSave.clear(); + try { + saveBatch(projectsToSave); + } catch (Exception e) { + log.error("Error saving projects in batch", e); + throw e; + } } // 6. 删除不存在的项目 @@ -158,9 +213,9 @@ public class RepositoryProjectServiceImpl extends BaseServiceImpl findByExternalSystemId(Long externalSystemId) { return repositoryProjectConverter.toDtoList(repositoryProjectRepository.findByExternalSystemId(externalSystemId)); } - } \ No newline at end of file