大声道撒旦
This commit is contained in:
parent
5e85c51bc4
commit
dab7eb177d
@ -0,0 +1,45 @@
|
||||
package com.qqchen.deploy.backend.deploy.config;
|
||||
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.scheduling.annotation.EnableAsync;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
@Configuration
|
||||
@EnableAsync
|
||||
public class ThreadPoolConfig {
|
||||
|
||||
@Bean("jenkinsTaskExecutor")
|
||||
public ThreadPoolTaskExecutor jenkinsTaskExecutor() {
|
||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||
|
||||
// 核心线程数:CPU核心数 + 1
|
||||
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() + 1);
|
||||
|
||||
// 最大线程数:CPU核心数 * 2
|
||||
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);
|
||||
|
||||
// 队列容量:根据平均任务执行时间和期望响应时间来设置
|
||||
executor.setQueueCapacity(50);
|
||||
|
||||
// 线程名前缀
|
||||
executor.setThreadNamePrefix("jenkins-sync-");
|
||||
|
||||
// 线程空闲时间:超过核心线程数的线程在空闲60秒后会被销毁
|
||||
executor.setKeepAliveSeconds(60);
|
||||
|
||||
// 拒绝策略:由调用线程处理
|
||||
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
|
||||
|
||||
// 等待所有任务完成再关闭线程池
|
||||
executor.setWaitForTasksToCompleteOnShutdown(true);
|
||||
|
||||
// 等待时间(秒)
|
||||
executor.setAwaitTerminationSeconds(60);
|
||||
|
||||
executor.initialize();
|
||||
return executor;
|
||||
}
|
||||
}
|
||||
@ -75,19 +75,6 @@ public class ExternalSystem extends Entity<Long> {
|
||||
*/
|
||||
private String token;
|
||||
|
||||
/**
|
||||
* 最后同步状态
|
||||
*/
|
||||
@Column(name = "sync_status")
|
||||
@Enumerated(EnumType.STRING)
|
||||
private ExternalSystemSyncStatusEnum syncStatus;
|
||||
|
||||
/**
|
||||
* 最后同步时间
|
||||
*/
|
||||
@Column(name = "last_sync_time")
|
||||
private LocalDateTime lastSyncTime;
|
||||
|
||||
/**
|
||||
* 最近连接成功时间
|
||||
*/
|
||||
|
||||
@ -26,6 +26,7 @@ import com.qqchen.deploy.backend.framework.exception.BusinessException;
|
||||
import com.qqchen.deploy.backend.framework.service.impl.BaseServiceImpl;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
@ -35,6 +36,11 @@ import java.time.ZoneId;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Jenkins构建信息 Service实现
|
||||
@ -65,6 +71,9 @@ public class JenkinsBuildServiceImpl extends BaseServiceImpl<JenkinsBuild, Jenki
|
||||
@Resource
|
||||
private IJenkinsSyncHistoryService jenkinsSyncHistoryService;
|
||||
|
||||
@Resource(name = "jenkinsTaskExecutor")
|
||||
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public Integer syncAllBuilds(Long externalSystemId) {
|
||||
@ -134,18 +143,34 @@ public class JenkinsBuildServiceImpl extends BaseServiceImpl<JenkinsBuild, Jenki
|
||||
return 0;
|
||||
}
|
||||
|
||||
// 2. 同步每个任务的构建信息
|
||||
int totalSyncedBuilds = 0;
|
||||
for (JenkinsJob job : jobs) {
|
||||
try {
|
||||
Integer syncedBuilds = syncBuilds(externalSystem, job);
|
||||
totalSyncedBuilds += syncedBuilds;
|
||||
log.info("Successfully synchronized {} builds for job: {}", syncedBuilds, job.getJobName());
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to sync builds for job: {}", job.getJobName(), e);
|
||||
}
|
||||
}
|
||||
// 2. 使用线程池并发同步每个任务的构建信息
|
||||
List<CompletableFuture<Integer>> futures = jobs.stream()
|
||||
.map(job -> CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
Integer syncedBuilds = syncBuilds(externalSystem, job);
|
||||
log.info("Successfully synchronized {} builds for job: {}", syncedBuilds, job.getJobName());
|
||||
return syncedBuilds;
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to sync builds for job: {}", job.getJobName(), e);
|
||||
return 0;
|
||||
}
|
||||
}, threadPoolTaskExecutor))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
// 3. 等待所有任务完成并汇总结果
|
||||
int totalSyncedBuilds = futures.stream()
|
||||
.map(future -> {
|
||||
try {
|
||||
return future.get(5, TimeUnit.MINUTES);
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to get sync result", e);
|
||||
return 0;
|
||||
}
|
||||
})
|
||||
.mapToInt(Integer::intValue)
|
||||
.sum();
|
||||
|
||||
log.info("Successfully synchronized total {} builds for view: {}", totalSyncedBuilds, view.getViewName());
|
||||
return totalSyncedBuilds;
|
||||
}
|
||||
|
||||
@ -153,40 +178,57 @@ public class JenkinsBuildServiceImpl extends BaseServiceImpl<JenkinsBuild, Jenki
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public Integer syncBuilds(ExternalSystem externalSystem, JenkinsJob job) {
|
||||
JenkinsJobResponse queryJob = jenkinsServiceIntegration.job(externalSystem, job.getJobName());
|
||||
if (queryJob == null || queryJob.getLastBuild() == null) {
|
||||
log.info("No builds found for job: {}", job.getJobName());
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Check if sync is needed
|
||||
if (job.getLastBuildNumber() != null && queryJob.getLastBuild().getNumber() <= job.getLastBuildNumber()) {
|
||||
log.info("No new builds to sync for job: {}", job.getJobName());
|
||||
return 0;
|
||||
}
|
||||
|
||||
List<JenkinsBuildResponse> buildResponses = jenkinsServiceIntegration.listBuilds(externalSystem, job.getJobName());
|
||||
if (buildResponses.isEmpty()) {
|
||||
log.info("No builds found for job: {}", job.getJobName());
|
||||
return 0;
|
||||
}
|
||||
|
||||
// 3. 转换并保存/更新构建数据
|
||||
// Convert and save/update build data
|
||||
List<JenkinsBuild> jenkinsBuilds = new ArrayList<>();
|
||||
for (JenkinsBuildResponse buildResponse : buildResponses) {
|
||||
// 查找是否存在相同的构建
|
||||
Optional<JenkinsBuild> existingBuild = jenkinsBuildRepository.findByExternalSystemIdAndJobIdAndBuildNumber(externalSystem.getId(), job.getId(), buildResponse.getNumber());
|
||||
|
||||
JenkinsBuild jenkinsBuild;
|
||||
if (existingBuild.isPresent()) {
|
||||
// 更新已存在的构建
|
||||
jenkinsBuild = existingBuild.get();
|
||||
updateBuildFromResponse(jenkinsBuild, buildResponse);
|
||||
log.debug("Updating existing Jenkins build: {} for job: {}", jenkinsBuild.getBuildNumber(), job.getJobName());
|
||||
} else {
|
||||
// 创建新的构建
|
||||
jenkinsBuild = new JenkinsBuild();
|
||||
jenkinsBuild.setExternalSystemId(externalSystem.getId());
|
||||
jenkinsBuild.setJobId(job.getId());
|
||||
jenkinsBuild.setBuildNumber(buildResponse.getNumber());
|
||||
updateBuildFromResponse(jenkinsBuild, buildResponse);
|
||||
log.debug("Creating new Jenkins build: {} for job: {}", jenkinsBuild.getBuildNumber(), job.getJobName());
|
||||
// Skip existing builds
|
||||
if (job.getLastBuildNumber() != null && buildResponse.getNumber() <= job.getLastBuildNumber()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Create new build
|
||||
JenkinsBuild jenkinsBuild = new JenkinsBuild();
|
||||
jenkinsBuild.setExternalSystemId(externalSystem.getId());
|
||||
jenkinsBuild.setJobId(job.getId());
|
||||
jenkinsBuild.setBuildNumber(buildResponse.getNumber());
|
||||
updateBuildFromResponse(jenkinsBuild, buildResponse);
|
||||
jenkinsBuilds.add(jenkinsBuild);
|
||||
log.debug("Creating new Jenkins build: {} for job: {}", jenkinsBuild.getBuildNumber(), job.getJobName());
|
||||
}
|
||||
|
||||
// 4. 批量保存或更新
|
||||
jenkinsBuildRepository.saveAll(jenkinsBuilds);
|
||||
|
||||
log.info("Successfully synchronized {} Jenkins builds for job: {}", jenkinsBuilds.size(), job.getJobName());
|
||||
// Batch save
|
||||
if (!jenkinsBuilds.isEmpty()) {
|
||||
jenkinsBuildRepository.saveAll(jenkinsBuilds);
|
||||
|
||||
// Update job's last build info
|
||||
job.setLastBuildNumber(queryJob.getLastBuild().getNumber());
|
||||
if (queryJob.getLastBuild().getTimestamp() != null) {
|
||||
job.setLastBuildTime(LocalDateTime.ofInstant(
|
||||
Instant.ofEpochMilli(queryJob.getLastBuild().getTimestamp()),
|
||||
ZoneId.systemDefault()
|
||||
));
|
||||
}
|
||||
jenkinsJobRepository.save(job);
|
||||
|
||||
log.info("Successfully synchronized {} builds for job: {}", jenkinsBuilds.size(), job.getJobName());
|
||||
}
|
||||
|
||||
return jenkinsBuilds.size();
|
||||
}
|
||||
|
||||
@ -39,10 +39,6 @@ public class ExternalSystemDTO extends BaseDTO {
|
||||
|
||||
private String token;
|
||||
|
||||
private ExternalSystemSyncStatusEnum syncStatus;
|
||||
|
||||
private LocalDateTime lastSyncTime;
|
||||
|
||||
private LocalDateTime lastConnectTime;
|
||||
|
||||
private String config;
|
||||
|
||||
@ -23,9 +23,6 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
@Component
|
||||
public class DeployNodeDelegate extends BaseNodeDelegate<DeployNodePanelVariables, DeployNodeLocalVariables> {
|
||||
|
||||
@Resource
|
||||
private ApplicationEventPublisher eventPublisher;
|
||||
|
||||
@Resource
|
||||
private IJenkinsServiceIntegration jenkinsServiceIntegration;
|
||||
|
||||
@ -39,11 +36,6 @@ public class DeployNodeDelegate extends BaseNodeDelegate<DeployNodePanelVariable
|
||||
// 最大轮询次数
|
||||
private static final int MAX_BUILD_POLLS = 180; // 30分钟超时
|
||||
|
||||
// 用于存储实时输出的Map
|
||||
private static final Map<String, StringBuilder> outputMap = new ConcurrentHashMap<>();
|
||||
|
||||
private static final Map<String, StringBuilder> errorMap = new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
protected Class<DeployNodePanelVariables> getPanelVariablesClass() {
|
||||
return DeployNodePanelVariables.class;
|
||||
|
||||
Loading…
Reference in New Issue
Block a user