大声道撒旦

This commit is contained in:
dengqichen 2025-01-09 17:18:51 +08:00
parent 20df7a8488
commit d7fb6c8eaa
2 changed files with 75 additions and 61 deletions

View File

@ -35,4 +35,6 @@ public interface IRepositoryBranchRepository extends IBaseRepository<RepositoryB
Long countByProjectId(Long projectId); Long countByProjectId(Long projectId);
List<RepositoryBranch> findByExternalSystemIdAndProjectIdAndDeletedFalse(Long externalSystemId, Long id); List<RepositoryBranch> findByExternalSystemIdAndProjectIdAndDeletedFalse(Long externalSystemId, Long id);
List<RepositoryBranch> findByExternalSystemIdAndDeletedFalse(Long externalSystemId);
} }

View File

@ -27,6 +27,8 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.ArrayList; import java.util.ArrayList;
@ -70,7 +72,9 @@ public class RepositoryBranchServiceImpl extends BaseServiceImpl<RepositoryBranc
private ThreadPoolTaskExecutor executor; private ThreadPoolTaskExecutor executor;
private static final int BATCH_SIZE = 100; private static final int BATCH_SIZE = 100;
private static final int MAX_RETRIES = 3; private static final int MAX_RETRIES = 3;
private static final long RETRY_DELAY = 100L; private static final long RETRY_DELAY = 100L;
@Transactional @Transactional
@ -121,54 +125,65 @@ public class RepositoryBranchServiceImpl extends BaseServiceImpl<RepositoryBranc
} }
log.info("Found {} projects to sync", projects.size()); log.info("Found {} projects to sync", projects.size());
AtomicInteger totalSyncedBranches = new AtomicInteger(0); // 4. 一次性获取所有现有分支
List<CompletableFuture<Void>> futures = new ArrayList<>(); List<RepositoryBranch> existingBranches = repositoryBranchRepository.findByExternalSystemIdAndDeletedFalse(externalSystemId);
List<RepositoryBranch> branchesToSave = Collections.synchronizedList(new ArrayList<>()); Map<String, Map<Long, RepositoryBranch>> existingBranchMap = existingBranches.stream()
.collect(Collectors.groupingBy(
RepositoryBranch::getName,
Collectors.toMap(
RepositoryBranch::getProjectId,
Function.identity(),
(existing, replacement) -> existing
)
));
log.info("Found {} existing branches in database", existingBranches.size());
// 4. 异步处理每个项目 AtomicInteger totalSyncedBranches = new AtomicInteger(0);
List<CompletableFuture<List<RepositoryBranch>>> futures = new ArrayList<>();
// 5. 并行处理每个项目
for (RepositoryProject project : projects) { for (RepositoryProject project : projects) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { CompletableFuture<List<RepositoryBranch>> future = CompletableFuture.supplyAsync(() -> {
try { try {
log.info("Syncing branches for project: {} (ID: {}, GitLab ID: {})", log.info("Syncing branches for project: {} (ID: {}, GitLab ID: {})", project.getName(), project.getId(), project.getProjectId());
project.getName(), project.getId(), project.getProjectId());
// 5.1 获取当前项目在GitLab上的所有分支
// 4.1 获取当前项目在GitLab上的所有分支使用GitLab的真实project_id
List<GitBranchResponse> remoteBranches = gitServiceIntegration.branches(externalSystem, project.getProjectId()); List<GitBranchResponse> remoteBranches = gitServiceIntegration.branches(externalSystem, project.getProjectId());
log.info("Found {} remote branches for project: {}", remoteBranches.size(), project.getName()); log.info("Found {} remote branches for project: {}", remoteBranches.size(), project.getName());
Set<String> remoteBranchNames = remoteBranches.stream()
.map(GitBranchResponse::getName)
.collect(Collectors.toSet());
log.info("Remote branch names: {}", remoteBranchNames);
// 4.2 获取数据库中的所有分支 List<RepositoryBranch> branchesToUpdate = new ArrayList<>();
Map<String, RepositoryBranch> existingBranchMap = repositoryBranchRepository Set<String> processedBranches = new HashSet<>();
.findByExternalSystemIdAndProjectIdAndDeletedFalse(externalSystemId, project.getId())
.stream()
.collect(Collectors.toMap(RepositoryBranch::getName, Function.identity()));
log.info("Found {} existing branches in database for project: {}", existingBranchMap.size(), project.getName());
log.info("Existing branch names: {}", existingBranchMap.keySet());
// 4.3 处理所有远程分支新增或更新 // 5.2 处理远程分支
for (GitBranchResponse remoteBranch : remoteBranches) { for (GitBranchResponse remoteBranch : remoteBranches) {
RepositoryBranch branch = updateOrCreateBranch(externalSystemId, project.getId(), remoteBranch, existingBranchMap); Map<Long, RepositoryBranch> projectBranches = existingBranchMap.getOrDefault(remoteBranch.getName(), new HashMap<>());
branchesToSave.add(branch); RepositoryBranch branch = projectBranches.getOrDefault(project.getId(), new RepositoryBranch());
// 更新分支信息
updateBranchInfo(branch, externalSystemId, project.getId(), project.getProjectId(), remoteBranch);
branchesToUpdate.add(branch);
processedBranches.add(remoteBranch.getName());
totalSyncedBranches.incrementAndGet(); totalSyncedBranches.incrementAndGet();
existingBranchMap.remove(remoteBranch.getName());
log.info("Processed remote branch: {}", remoteBranch.getName());
} }
// 4.4 处理已删除的分支 // 5.3 处理已删除的分支
existingBranchMap.forEach((branchName, branch) -> { Map<String, RepositoryBranch> projectBranches = existingBranchMap.values().stream()
branch.setDeleted(true); .flatMap(map -> map.values().stream())
branchesToSave.add(branch); .filter(b -> b.getProjectId().equals(project.getId()))
totalSyncedBranches.incrementAndGet(); .collect(Collectors.toMap(RepositoryBranch::getName, Function.identity()));
log.info("Marked branch as deleted: {}", branchName);
}); for (RepositoryBranch branch : projectBranches.values()) {
if (!processedBranches.contains(branch.getName())) {
branch.setDeleted(true);
branchesToUpdate.add(branch);
totalSyncedBranches.incrementAndGet();
}
}
log.info("Completed processing {} branches for project: {}", remoteBranches.size(), project.getName()); log.info("Processed {} branches for project: {}", branchesToUpdate.size(), project.getName());
return branchesToUpdate;
} catch (Exception e) { } catch (Exception e) {
log.error("Failed to sync branches for project: {} (ID: {}, GitLab ID: {})", log.error("Failed to sync branches for project: {} (ID: {}, GitLab ID: {})",
project.getName(), project.getId(), project.getProjectId(), e); project.getName(), project.getId(), project.getProjectId(), e);
throw new RuntimeException(e); throw new RuntimeException(e);
} }
@ -177,50 +192,49 @@ public class RepositoryBranchServiceImpl extends BaseServiceImpl<RepositoryBranc
futures.add(future); futures.add(future);
} }
// 等待所有异步任务完成 // 6. 等待所有异步任务完成并收集结果
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); List<RepositoryBranch> allBranchesToSave = futures.stream()
.map(CompletableFuture::join)
.flatMap(List::stream)
.collect(Collectors.toList());
// 批量保存分支 // 7. 批量保存所有分支
if (!branchesToSave.isEmpty()) { if (!allBranchesToSave.isEmpty()) {
try { try {
log.info("Saving {} branches to database", branchesToSave.size()); log.info("Saving {} branches to database in batch", allBranchesToSave.size());
saveBatch(branchesToSave); saveBatch(allBranchesToSave);
} catch (Exception e) { } catch (Exception e) {
log.error("Error saving branches in batch", e); log.error("Error saving branches in batch", e);
throw e; throw e;
} }
} }
// 5. 更新同步历史为成功 // 8. 更新同步历史为成功
repositorySyncHistoryService.updateSyncHistory(syncHistory.getId(), ExternalSystemSyncStatus.SUCCESS, null); repositorySyncHistoryService.updateSyncHistory(syncHistory.getId(), ExternalSystemSyncStatus.SUCCESS, null);
log.info("Successfully synchronized {} branches for external system: {}", log.info("Successfully synchronized {} branches for external system: {}",
totalSyncedBranches.get(), externalSystem.getName()); totalSyncedBranches.get(), externalSystem.getName());
return totalSyncedBranches.get(); return totalSyncedBranches.get();
} catch (Exception e) { } catch (Exception e) {
// 6. 更新同步历史为失败 // 9. 更新同步历史为失败
log.error("Failed to sync branches", e); log.error("Failed to sync branches", e);
repositorySyncHistoryService.updateSyncHistory(syncHistory.getId(), ExternalSystemSyncStatus.FAILED, e.getMessage()); repositorySyncHistoryService.updateSyncHistory(syncHistory.getId(), ExternalSystemSyncStatus.FAILED, e.getMessage());
throw new BusinessException(ResponseCode.REPOSITORY_BRANCH_SYNC_FAILED); throw new BusinessException(ResponseCode.REPOSITORY_BRANCH_SYNC_FAILED);
} }
} }
private RepositoryBranch updateOrCreateBranch( /**
Long externalSystemId, * 更新分支信息
Long projectId, */
GitBranchResponse remoteBranch, private void updateBranchInfo(
Map<String, RepositoryBranch> existingBranchMap) { RepositoryBranch branch,
Long externalSystemId,
RepositoryBranch branch = existingBranchMap.getOrDefault(remoteBranch.getName(), new RepositoryBranch()); Long projectId,
Long gitProjectId,
// 获取对应的RepositoryProject以获取GitLab的真实project_id GitBranchResponse remoteBranch) {
RepositoryProject project = repositoryProjectRepository.findById(projectId)
.orElseThrow(() -> new BusinessException(ResponseCode.REPOSITORY_PROJECT_NOT_FOUND));
// 更新基本信息
branch.setExternalSystemId(externalSystemId); branch.setExternalSystemId(externalSystemId);
branch.setProjectId(projectId); // 设置我们数据库的project ID branch.setProjectId(projectId);
branch.setGitProjectId(project.getProjectId()); // 设置GitLab的真实project ID branch.setGitProjectId(gitProjectId);
branch.setName(remoteBranch.getName()); branch.setName(remoteBranch.getName());
branch.setIsDefaultBranch(remoteBranch.getIsDefaultBranch()); branch.setIsDefaultBranch(remoteBranch.getIsDefaultBranch());
branch.setCanPush(remoteBranch.getCanPush()); branch.setCanPush(remoteBranch.getCanPush());
@ -233,8 +247,6 @@ public class RepositoryBranchServiceImpl extends BaseServiceImpl<RepositoryBranc
branch.setWebUrl(remoteBranch.getWebUrl()); branch.setWebUrl(remoteBranch.getWebUrl());
branch.setLastUpdateTime(remoteBranch.getLastUpdateTime()); branch.setLastUpdateTime(remoteBranch.getLastUpdateTime());
branch.setLastCommitTime(remoteBranch.getLastCommitTime()); branch.setLastCommitTime(remoteBranch.getLastCommitTime());
return branch;
} }
@Override @Override