大声道撒旦

This commit is contained in:
dengqichen 2025-01-09 15:47:27 +08:00
parent 285d7e5189
commit 914ad0d1a1
3 changed files with 292 additions and 145 deletions

View File

@ -44,50 +44,82 @@ public class ThreadPoolConfig {
return executor; return executor;
} }
// @Bean("repositorySyncExecutor") @Bean("repositoryProjectExecutor")
// public ThreadPoolTaskExecutor repositorySyncExecutor() { public ThreadPoolTaskExecutor repositoryProjectExecutor() {
// ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//
// // 核心线程数CPU核心数 * 2 // 核心线程数CPU核心数 * 2
// executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2); executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2);
//
// // 最大线程数CPU核心数 * 4 // 最大线程数CPU核心数 * 4
// executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 4); executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 4);
//
// // 队列容量5000 // 队列容量根据平均任务执行时间和期望响应时间来设置
// executor.setQueueCapacity(5000); executor.setQueueCapacity(100);
//
// // 线程名前缀 // 线程名前缀
// executor.setThreadNamePrefix("repository-sync-"); executor.setThreadNamePrefix("repository-project-sync-");
//
// // 线程空闲时间60秒 // 线程空闲时间超过核心线程数的线程在空闲60秒后会被销毁
// executor.setKeepAliveSeconds(60); executor.setKeepAliveSeconds(60);
//
// // 拒绝策略由调用线程处理 // 拒绝策略由调用线程处理
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//
// // 等待所有任务完成再关闭线程池 // 等待所有任务完成再关闭线程池
// executor.setWaitForTasksToCompleteOnShutdown(true); executor.setWaitForTasksToCompleteOnShutdown(true);
//
// // 等待时间 // 等待时间
// executor.setAwaitTerminationSeconds(60); executor.setAwaitTerminationSeconds(60);
//
// executor.initialize(); executor.initialize();
// return executor; return executor;
// } }
//
// @Bean("applicationTaskExecutor") @Bean("repositoryBranchExecutor")
// public AsyncTaskExecutor applicationTaskExecutor() { public ThreadPoolTaskExecutor repositoryBranchExecutor() {
// ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2);
// executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 4); // 核心线程数CPU核心数 * 2
// executor.setQueueCapacity(500); executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2);
// executor.setThreadNamePrefix("application-task-");
// executor.setKeepAliveSeconds(60); // 最大线程数CPU核心数 * 4
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 4);
// executor.setWaitForTasksToCompleteOnShutdown(true);
// executor.setAwaitTerminationSeconds(60); // 队列容量根据平均任务执行时间和期望响应时间来设置
// executor.initialize(); executor.setQueueCapacity(100);
// return executor;
// } // 线程名前缀
executor.setThreadNamePrefix("repository-branch-sync-");
// 线程空闲时间超过核心线程数的线程在空闲60秒后会被销毁
executor.setKeepAliveSeconds(60);
// 拒绝策略由调用线程处理
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任务完成再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
// 等待时间
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}
@Bean("applicationTaskExecutor")
public AsyncTaskExecutor applicationTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2);
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 4);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("application-task-");
executor.setKeepAliveSeconds(60);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}
} }

View File

@ -22,11 +22,19 @@ import com.qqchen.deploy.backend.deploy.service.IRepositorySyncHistoryService;
import com.qqchen.deploy.backend.deploy.repository.IRepositorySyncHistoryRepository; import com.qqchen.deploy.backend.deploy.repository.IRepositorySyncHistoryRepository;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.orm.ObjectOptimisticLockingFailureException;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -56,6 +64,40 @@ public class RepositoryBranchServiceImpl extends BaseServiceImpl<RepositoryBranc
@Resource @Resource
private IRepositorySyncHistoryRepository repositorySyncHistoryRepository; private IRepositorySyncHistoryRepository repositorySyncHistoryRepository;
@Resource(name = "repositoryBranchExecutor")
private ThreadPoolTaskExecutor executor;
private static final int BATCH_SIZE = 100;
private static final int MAX_RETRIES = 3;
private static final long RETRY_DELAY = 100L;
@Transactional
public void saveBatch(List<RepositoryBranch> branches) {
int retryCount = 0;
while (retryCount < MAX_RETRIES) {
try {
for (int i = 0; i < branches.size(); i += BATCH_SIZE) {
int end = Math.min(i + BATCH_SIZE, branches.size());
List<RepositoryBranch> batch = branches.subList(i, end);
repositoryBranchRepository.saveAll(batch);
}
return;
} catch (ObjectOptimisticLockingFailureException e) {
retryCount++;
if (retryCount >= MAX_RETRIES) {
throw e;
}
log.warn("Optimistic locking failure during batch save, retry attempt {}", retryCount);
try {
Thread.sleep(RETRY_DELAY * retryCount);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted during retry delay", ie);
}
}
}
}
@Override @Override
@Transactional @Transactional
public Integer syncBranches(Long externalSystemId) { public Integer syncBranches(Long externalSystemId) {
@ -64,7 +106,8 @@ public class RepositoryBranchServiceImpl extends BaseServiceImpl<RepositoryBranc
try { try {
// 2. 获取外部系统信息 // 2. 获取外部系统信息
ExternalSystem externalSystem = externalSystemRepository.findById(externalSystemId).orElseThrow(() -> new BusinessException(ResponseCode.EXTERNAL_SYSTEM_NOT_FOUND)); ExternalSystem externalSystem = externalSystemRepository.findById(externalSystemId)
.orElseThrow(() -> new BusinessException(ResponseCode.EXTERNAL_SYSTEM_NOT_FOUND));
// 3. 获取所有项目 // 3. 获取所有项目
List<RepositoryProject> projects = repositoryProjectRepository.findByExternalSystemIdAndDeletedFalse(externalSystemId); List<RepositoryProject> projects = repositoryProjectRepository.findByExternalSystemIdAndDeletedFalse(externalSystemId);
@ -94,36 +137,54 @@ public class RepositoryBranchServiceImpl extends BaseServiceImpl<RepositoryBranc
return 0; return 0;
} }
int totalSyncedBranches = 0; AtomicInteger totalSyncedBranches = new AtomicInteger(0);
// 6. 同步需要更新的项目的分支 List<CompletableFuture<Void>> futures = new ArrayList<>();
List<RepositoryBranch> branchesToSave = Collections.synchronizedList(new ArrayList<>());
// 6. 异步同步每个项目的分支
for (RepositoryProject project : projectsToSync) { for (RepositoryProject project : projectsToSync) {
try { CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// 6.1 获取远程分支信息 try {
List<GitBranchResponse> remoteBranches = gitServiceIntegration.branches(externalSystem, project.getProjectId()); // 6.1 获取远程分支信息
if (remoteBranches.isEmpty()) { List<GitBranchResponse> remoteBranches = gitServiceIntegration.branches(externalSystem, project.getProjectId());
log.info("No branches found for project: {}", project.getName()); if (remoteBranches.isEmpty()) {
continue; log.info("No branches found for project: {}", project.getName());
return;
}
// 6.2 获取本地已存在的分支
Map<String, RepositoryBranch> existingBranchMap = repositoryBranchRepository
.findByExternalSystemIdAndProjectIdAndDeletedFalse(externalSystemId, project.getId())
.stream()
.collect(Collectors.toMap(RepositoryBranch::getName, Function.identity()));
// 6.3 更新或创建分支
for (GitBranchResponse remoteBranch : remoteBranches) {
RepositoryBranch branch = updateOrCreateBranch(externalSystemId, project.getId(), remoteBranch, existingBranchMap);
branchesToSave.add(branch);
totalSyncedBranches.incrementAndGet();
}
log.info("Processed {} branches for project: {}", remoteBranches.size(), project.getName());
} catch (Exception e) {
log.error("Failed to sync branches for project: {}", project.getName(), e);
throw new RuntimeException(e);
} }
}, executor);
// 6.2 获取本地已存在的分支 futures.add(future);
List<RepositoryBranch> existingBranches = repositoryBranchRepository.findByExternalSystemIdAndProjectIdAndDeletedFalse( }
externalSystemId, project.getId());
Map<String, RepositoryBranch> existingBranchMap = existingBranches.stream()
.collect(Collectors.toMap(RepositoryBranch::getName, Function.identity()));
// 6.3 更新或创建分支 // 等待所有异步任务完成
List<RepositoryBranch> branchesToSave = remoteBranches.stream() CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
.map(remoteBranch -> updateOrCreateBranch(externalSystemId, project.getId(), remoteBranch, existingBranchMap))
.collect(Collectors.toList());
// 6.4 保存分支 // 批量保存分支
repositoryBranchRepository.saveAll(branchesToSave); if (!branchesToSave.isEmpty()) {
totalSyncedBranches += branchesToSave.size(); try {
saveBatch(branchesToSave);
log.info("Successfully synchronized {} branches for project: {}",
branchesToSave.size(), project.getName());
} catch (Exception e) { } catch (Exception e) {
log.error("Failed to sync branches for project: {}", project.getName(), e); log.error("Error saving branches in batch", e);
throw e;
} }
} }
@ -131,8 +192,8 @@ public class RepositoryBranchServiceImpl extends BaseServiceImpl<RepositoryBranc
repositorySyncHistoryService.updateSyncHistory(syncHistory.getId(), ExternalSystemSyncStatus.SUCCESS, null); repositorySyncHistoryService.updateSyncHistory(syncHistory.getId(), ExternalSystemSyncStatus.SUCCESS, null);
log.info("Successfully synchronized total {} branches for external system: {}", log.info("Successfully synchronized total {} branches for external system: {}",
totalSyncedBranches, externalSystem.getName()); totalSyncedBranches.get(), externalSystem.getName());
return totalSyncedBranches; return totalSyncedBranches.get();
} catch (Exception e) { } catch (Exception e) {
// 8. 更新同步历史为失败 // 8. 更新同步历史为失败
repositorySyncHistoryService.updateSyncHistory(syncHistory.getId(), ExternalSystemSyncStatus.FAILED, e.getMessage()); repositorySyncHistoryService.updateSyncHistory(syncHistory.getId(), ExternalSystemSyncStatus.FAILED, e.getMessage());
@ -176,4 +237,4 @@ public class RepositoryBranchServiceImpl extends BaseServiceImpl<RepositoryBranc
public Long countByProjectId(Long projectId) { public Long countByProjectId(Long projectId) {
return repositoryBranchRepository.countByProjectId(projectId); return repositoryBranchRepository.countByProjectId(projectId);
} }
} }

View File

@ -17,15 +17,22 @@ import com.qqchen.deploy.backend.framework.exception.BusinessException;
import com.qqchen.deploy.backend.framework.service.impl.BaseServiceImpl; import com.qqchen.deploy.backend.framework.service.impl.BaseServiceImpl;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.orm.ObjectOptimisticLockingFailureException;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.Collections;
/** /**
* Git仓库项目服务实现 * Git仓库项目服务实现
@ -49,6 +56,40 @@ public class RepositoryProjectServiceImpl extends BaseServiceImpl<RepositoryProj
@Resource @Resource
private RepositoryProjectConverter repositoryProjectConverter; private RepositoryProjectConverter repositoryProjectConverter;
@Resource(name = "repositoryProjectExecutor")
private ThreadPoolTaskExecutor executor;
private static final int BATCH_SIZE = 100;
private static final int MAX_RETRIES = 3;
private static final long RETRY_DELAY = 100L;
@Transactional
public void saveBatch(List<RepositoryProject> projects) {
int retryCount = 0;
while (retryCount < MAX_RETRIES) {
try {
for (int i = 0; i < projects.size(); i += BATCH_SIZE) {
int end = Math.min(i + BATCH_SIZE, projects.size());
List<RepositoryProject> batch = projects.subList(i, end);
repositoryProjectRepository.saveAll(batch);
}
return;
} catch (ObjectOptimisticLockingFailureException e) {
retryCount++;
if (retryCount >= MAX_RETRIES) {
throw e;
}
log.warn("Optimistic locking failure during batch save, retry attempt {}", retryCount);
try {
Thread.sleep(RETRY_DELAY * retryCount);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted during retry delay", ie);
}
}
}
}
@Override @Override
@Transactional @Transactional
public Integer syncProjects(Long externalSystemId) { public Integer syncProjects(Long externalSystemId) {
@ -67,83 +108,97 @@ public class RepositoryProjectServiceImpl extends BaseServiceImpl<RepositoryProj
log.info("Found {} groups to sync", groups.size()); log.info("Found {} groups to sync", groups.size());
// 3. 获取现有项目映射包括已删除的 // 3. 获取现有项目映射包括已删除的
Map<Long, RepositoryProject> existingProjects = repositoryProjectRepository Map<Long, RepositoryProject> existingProjects = Collections.synchronizedMap(
.findByExternalSystemId(externalSystemId) repositoryProjectRepository
.stream() .findByExternalSystemId(externalSystemId)
.collect(Collectors.toMap( .stream()
RepositoryProject::getProjectId, .collect(Collectors.toMap(
Function.identity(), RepositoryProject::getProjectId,
(existing, replacement) -> { Function.identity(),
log.warn("Duplicate project found with ID: {}, path: {}", (existing, replacement) -> {
existing.getProjectId(), existing.getPathWithNamespace()); log.warn("Duplicate project found with ID: {}, path: {}",
return existing; existing.getProjectId(), existing.getPathWithNamespace());
} return existing;
)); }
))
);
// 4. 用于跟踪已处理的项目ID // 4. 用于跟踪已处理的项目ID
Set<Long> processedProjectIds = new HashSet<>(); Set<Long> processedProjectIds = ConcurrentHashMap.newKeySet();
int totalCount = 0; AtomicInteger totalCount = new AtomicInteger(0);
List<RepositoryProject> projectsToSave = new ArrayList<>(); List<CompletableFuture<Void>> futures = new ArrayList<>();
// 5. 遍历每个组同步项目 // 使用线程安全的列表来存储待保存的项目
List<RepositoryProject> projectsToSave = Collections.synchronizedList(new ArrayList<>());
// 5. 遍历每个组异步同步项目
for (RepositoryGroup group : groups) { for (RepositoryGroup group : groups) {
List<GitProjectResponse> projectResponses = gitServiceIntegration.projectsByGroup(externalSystem, group.getGroupId()); CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
log.info("Processing group: {} (ID: {}), found {} projects", try {
group.getName(), group.getGroupId(), projectResponses.size()); List<GitProjectResponse> projectResponses = gitServiceIntegration.projectsByGroup(externalSystem, group.getGroupId());
log.info("Processing group: {} (ID: {}), found {} projects",
for (GitProjectResponse projectResponse : projectResponses) { group.getName(), group.getGroupId(), projectResponses.size());
if (processedProjectIds.contains(projectResponse.getId())) {
log.info("Project already processed: {} (ID: {}) in group: {}",
projectResponse.getPathWithNamespace(), projectResponse.getId(), group.getName());
continue;
}
processedProjectIds.add(projectResponse.getId());
RepositoryProject project = existingProjects.get(projectResponse.getId()); for (GitProjectResponse projectResponse : projectResponses) {
if (project == null) { if (!processedProjectIds.add(projectResponse.getId())) {
project = new RepositoryProject(); log.info("Project already processed: {} (ID: {}) in group: {}",
project.setExternalSystemId(externalSystemId); projectResponse.getPathWithNamespace(), projectResponse.getId(), group.getName());
project.setProjectId(projectResponse.getId()); continue;
log.info("Creating new project: {} (ID: {})", }
projectResponse.getPathWithNamespace(), projectResponse.getId());
totalCount++;
} else {
log.debug("Updating existing project: {} (ID: {})",
projectResponse.getPathWithNamespace(), projectResponse.getId());
}
project.setDeleted(false); RepositoryProject project = existingProjects.get(projectResponse.getId());
project.setName(projectResponse.getName()); if (project == null) {
project.setPath(projectResponse.getPath()); project = new RepositoryProject();
project.setDescription(projectResponse.getDescription()); project.setExternalSystemId(externalSystemId);
project.setVisibility(projectResponse.getVisibility()); project.setProjectId(projectResponse.getId());
project.setGroupId(group.getId()); log.info("Creating new project: {} (ID: {})",
project.setDefaultBranch(projectResponse.getDefaultBranch()); projectResponse.getPathWithNamespace(), projectResponse.getId());
project.setWebUrl(projectResponse.getWebUrl()); totalCount.incrementAndGet();
project.setSshUrl(projectResponse.getSshUrlToRepo()); } else {
project.setHttpUrl(projectResponse.getHttpUrlToRepo()); log.debug("Updating existing project: {} (ID: {})",
project.setLastActivityAt(projectResponse.getLastActivityAt()); projectResponse.getPathWithNamespace(), projectResponse.getId());
project.setNameWithNamespace(projectResponse.getNameWithNamespace()); }
project.setPathWithNamespace(projectResponse.getPathWithNamespace());
project.setCreatedAt(projectResponse.getCreatedAt());
projectsToSave.add(project);
existingProjects.remove(projectResponse.getId());
// 每100个项目批量保存一次避免内存占用过大 project.setDeleted(false);
if (projectsToSave.size() >= 100) { project.setName(projectResponse.getName());
log.info("Batch saving {} projects", projectsToSave.size()); project.setPath(projectResponse.getPath());
repositoryProjectRepository.saveAll(projectsToSave); project.setDescription(projectResponse.getDescription());
projectsToSave.clear(); project.setVisibility(projectResponse.getVisibility());
project.setGroupId(group.getId());
project.setDefaultBranch(projectResponse.getDefaultBranch());
project.setWebUrl(projectResponse.getWebUrl());
project.setSshUrl(projectResponse.getSshUrlToRepo());
project.setHttpUrl(projectResponse.getHttpUrlToRepo());
project.setLastActivityAt(projectResponse.getLastActivityAt());
project.setNameWithNamespace(projectResponse.getNameWithNamespace());
project.setPathWithNamespace(projectResponse.getPathWithNamespace());
project.setCreatedAt(projectResponse.getCreatedAt());
projectsToSave.add(project);
synchronized (existingProjects) {
existingProjects.remove(projectResponse.getId());
}
}
} catch (Exception e) {
log.error("Error processing group {}: {}", group.getId(), e.getMessage(), e);
throw new CompletionException(e);
} }
} }, executor);
futures.add(future);
} }
// 保存剩余的项目 // 等待所有异步任务完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
// 批量保存项目
if (!projectsToSave.isEmpty()) { if (!projectsToSave.isEmpty()) {
log.info("Batch saving remaining {} projects", projectsToSave.size()); try {
repositoryProjectRepository.saveAll(projectsToSave); saveBatch(projectsToSave);
projectsToSave.clear(); } catch (Exception e) {
log.error("Error saving projects in batch", e);
throw e;
}
} }
// 6. 删除不存在的项目 // 6. 删除不存在的项目
@ -158,9 +213,9 @@ public class RepositoryProjectServiceImpl extends BaseServiceImpl<RepositoryProj
} }
} }
log.info("Successfully synced projects. Added {} new projects, processed {} total projects", log.info("Successfully synced projects. Added {} new projects, processed {} total projects",
totalCount, processedProjectIds.size()); totalCount.get(), processedProjectIds.size());
return totalCount; return totalCount.get();
} catch (Exception e) { } catch (Exception e) {
log.error("Failed to sync repository projects for external system: {}", externalSystemId, e); log.error("Failed to sync repository projects for external system: {}", externalSystemId, e);
throw new BusinessException(ResponseCode.REPOSITORY_PROJECT_SYNC_FAILED); throw new BusinessException(ResponseCode.REPOSITORY_PROJECT_SYNC_FAILED);
@ -176,5 +231,4 @@ public class RepositoryProjectServiceImpl extends BaseServiceImpl<RepositoryProj
public List<RepositoryProjectDTO> findByExternalSystemId(Long externalSystemId) { public List<RepositoryProjectDTO> findByExternalSystemId(Long externalSystemId) {
return repositoryProjectConverter.toDtoList(repositoryProjectRepository.findByExternalSystemId(externalSystemId)); return repositoryProjectConverter.toDtoList(repositoryProjectRepository.findByExternalSystemId(externalSystemId));
} }
} }