From 04f1cbd251b6012e13d87a17f3e25604398c9f64 Mon Sep 17 00:00:00 2001 From: dengqichen Date: Sun, 7 Dec 2025 02:21:05 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0ssh=E9=93=BE=E6=8E=A5?= =?UTF-8?q?=E6=A1=86=E6=9E=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/ServerSSHFileApiController.java | 79 +++++ .../service/impl/ServerSSHFileService.java | 268 +++++++++++++++ .../framework/exception/BaseException.java | 4 + .../exception/BusinessException.java | 8 + .../ssh/file/AsyncFileUploadService.java | 317 ++++++++++++++++++ .../framework/ssh/file/FileUploadTask.java | 82 +++++ .../framework/ssh/file/SSHFileException.java | 3 +- .../ssh/websocket/SSHWebSocketConfig.java | 32 ++ 8 files changed, 791 insertions(+), 2 deletions(-) create mode 100644 backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/file/AsyncFileUploadService.java create mode 100644 backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/file/FileUploadTask.java diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/api/ServerSSHFileApiController.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/api/ServerSSHFileApiController.java index 735ed4f4..318760d4 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/api/ServerSSHFileApiController.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/api/ServerSSHFileApiController.java @@ -4,12 +4,14 @@ import com.qqchen.deploy.backend.deploy.dto.FileUploadResultDTO; import com.qqchen.deploy.backend.deploy.dto.RemoteFileInfoDTO; import com.qqchen.deploy.backend.deploy.service.impl.ServerSSHFileService; import com.qqchen.deploy.backend.framework.api.Response; +import com.qqchen.deploy.backend.framework.ssh.file.FileUploadTask; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.*; import org.springframework.web.multipart.MultipartFile; import java.util.List; +import java.util.Map; /** * 服务器SSH文件管理API(业务层) @@ -134,4 +136,81 @@ public class ServerSSHFileApiController { return Response.success(); } + + // ========== 异步上传相关接口 ========== + + /** + * 异步上传文件(立即返回任务ID) + * + * @param serverId 服务器ID + * @param file 文件 + * @param remotePath 远程路径 + * @param overwrite 是否覆盖 + * @return 任务ID和提示信息 + */ + @PostMapping("/{serverId}/files/upload-async") + public Response> uploadFileAsync( + @PathVariable Long serverId, + @RequestParam("file") MultipartFile file, + @RequestParam("remotePath") String remotePath, + @RequestParam(value = "overwrite", defaultValue = "false") Boolean overwrite + ) { + log.info("提交异步上传任务: serverId={}, fileName={}, remotePath={}, size={}", + serverId, file.getOriginalFilename(), remotePath, file.getSize()); + + String taskId = serverSSHFileService.uploadFileAsync(serverId, file, remotePath, overwrite); + + return Response.success(Map.of( + "taskId", taskId, + "message", "上传任务已提交,请通过WebSocket订阅进度:/topic/upload/" + taskId + )); + } + + /** + * 查询上传任务状态 + * + * @param serverId 服务器ID + * @param taskId 任务ID + * @return 任务状态 + */ + @GetMapping("/{serverId}/files/upload-task/{taskId}") + public Response getUploadTaskStatus( + @PathVariable Long serverId, + @PathVariable String taskId + ) { + FileUploadTask task = serverSSHFileService.getUploadTaskStatus(taskId); + if (task == null) { + return Response.error(com.qqchen.deploy.backend.framework.enums.ResponseCode.DATA_NOT_FOUND, "任务不存在"); + } + return Response.success(task); + } + + /** + * 取消上传任务 + * + * @param serverId 服务器ID + * @param taskId 任务ID + * @return 是否取消成功 + */ + @DeleteMapping("/{serverId}/files/upload-task/{taskId}") + public Response> cancelUploadTask( + @PathVariable Long serverId, + @PathVariable String taskId + ) { + boolean cancelled = serverSSHFileService.cancelUploadTask(taskId); + return Response.success(Map.of("cancelled", cancelled)); + } + + /** + * 获取上传任务统计 + * + * @param serverId 服务器ID + * @return 任务统计信息 + */ + @GetMapping("/{serverId}/files/upload-statistics") + public Response> getUploadStatistics( + @PathVariable Long serverId + ) { + return Response.success(serverSSHFileService.getUploadTaskStatistics()); + } } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/ServerSSHFileService.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/ServerSSHFileService.java index 67bb8269..95cde110 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/ServerSSHFileService.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/ServerSSHFileService.java @@ -10,6 +10,8 @@ import com.qqchen.deploy.backend.framework.enums.SSHTargetType; import com.qqchen.deploy.backend.framework.exception.BusinessException; import com.qqchen.deploy.backend.framework.ssh.SSHCommandServiceFactory; import com.qqchen.deploy.backend.framework.ssh.file.AbstractSSHFileOperations; +import com.qqchen.deploy.backend.framework.ssh.file.AsyncFileUploadService; +import com.qqchen.deploy.backend.framework.ssh.file.FileUploadTask; import com.qqchen.deploy.backend.framework.ssh.file.SSHFileInfo; import com.qqchen.deploy.backend.framework.ssh.websocket.SSHTarget; import jakarta.annotation.Resource; @@ -22,10 +24,14 @@ import net.schmizz.sshj.xfer.InMemorySourceFile; import org.springframework.stereotype.Service; import org.springframework.web.multipart.MultipartFile; +import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.util.Date; import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; import java.util.stream.Collectors; /** @@ -44,6 +50,9 @@ public class ServerSSHFileService extends AbstractSSHFileOperations { @Resource private RemoteFileInfoConverter remoteFileInfoConverter; + @Resource + private AsyncFileUploadService asyncFileUploadService; + /** * 构造函数 */ @@ -82,6 +91,213 @@ public class ServerSSHFileService extends AbstractSSHFileOperations { return remoteFileInfoConverter.toUploadResultDTO(fileInfo); } + /** + * 异步上传文件(立即返回任务ID) + * + * @param serverId 服务器ID + * @param file 文件 + * @param remotePath 远程路径 + * @param overwrite 是否覆盖 + * @return 任务ID + */ + public String uploadFileAsync(Long serverId, MultipartFile file, + String remotePath, boolean overwrite) { + // ⚠️ 关键:在HTTP请求结束前,先复制文件内容到内存 + // 因为异步任务执行时,Spring已经清理了MultipartFile的临时文件 + final byte[] fileBytes; + final String fileName = file.getOriginalFilename(); + final long fileSize = file.getSize(); + + try { + fileBytes = file.getBytes(); // 将文件内容读入内存 + } catch (IOException e) { + throw new BusinessException(ResponseCode.INVALID_PARAM, new Object[]{"读取文件失败: " + e.getMessage()}); + } + + // 准备元数据 + Map metadata = Map.of( + "serverId", serverId, + "remotePath", remotePath, + "overwrite", overwrite + ); + + // 提交任务到Framework层 + return asyncFileUploadService.submitTask( + fileName, + fileSize, + metadata, + progressCallback -> { + // 执行SSH文件上传(带进度) + // 使用字节数组创建临时的MultipartFile包装 + uploadFileWithProgressFromBytes(serverId, fileBytes, fileName, + remotePath, overwrite, progressCallback::onProgress); + } + ); + } + + /** + * 查询上传任务状态 + */ + public FileUploadTask getUploadTaskStatus(String taskId) { + return asyncFileUploadService.getTaskStatus(taskId); + } + + /** + * 取消上传任务 + */ + public boolean cancelUploadTask(String taskId) { + return asyncFileUploadService.cancelTask(taskId); + } + + /** + * 获取上传任务统计 + */ + public Map getUploadTaskStatistics() { + return asyncFileUploadService.getTaskStatistics(); + } + + /** + * 从字节数组上传文件(带进度) + * + * @param serverId 服务器ID + * @param fileBytes 文件字节数组 + * @param fileName 文件名 + * @param remotePath 远程目标路径 + * @param overwrite 是否覆盖 + * @param progressCallback 进度回调 + * @return 文件信息 + */ + private SSHFileInfo uploadFileWithProgressFromBytes(Long serverId, byte[] fileBytes, + String fileName, String remotePath, + boolean overwrite, + BiConsumer progressCallback) { + validatePath(remotePath); + + if (fileBytes == null || fileBytes.length == 0) { + throw new BusinessException(ResponseCode.INVALID_PARAM, new Object[]{"文件不能为空"}); + } + + SSHTarget target = buildSSHTarget(serverId); + final long fileSize = fileBytes.length; + final AtomicLong uploadedBytes = new AtomicLong(0); + + return executeFileOperation(target, new FileOperation() { + @Override + public SSHFileInfo execute(SFTPClient sftpClient) throws Exception { + // 检查文件是否已存在 + if (!overwrite) { + try { + sftpClient.stat(remotePath); + throw new BusinessException(ResponseCode.SSH_FILE_ALREADY_EXISTS); + } catch (IOException e) { + // 文件不存在,继续上传 + } + } + + // 使用字节数组创建输入流并监控进度 + try (InputStream inputStream = new java.io.ByteArrayInputStream(fileBytes)) { + ProgressMonitorInputStream progressStream = new ProgressMonitorInputStream( + inputStream, fileSize, uploadedBytes, progressCallback); + + sftpClient.put(new InMemorySourceFile() { + @Override + public String getName() { + return fileName; + } + + @Override + public long getLength() { + return fileSize; + } + + @Override + public InputStream getInputStream() { + return progressStream; + } + }, remotePath); + + FileAttributes attrs = sftpClient.stat(remotePath); + return convertToSSHFileInfo(remotePath, attrs); + } + } + + @Override + public String getName() { + return "上传文件(带进度)"; + } + }); + } + + /** + * 带进度回调的文件上传 + * + * @param serverId 服务器ID + * @param file 要上传的文件 + * @param remotePath 远程目标路径(完整路径,包括文件名) + * @param overwrite 是否覆盖已存在的文件 + * @param progressCallback 进度回调函数 (已上传字节数, 总字节数) + * @return 文件信息 + */ + public SSHFileInfo uploadFileWithProgress(Long serverId, MultipartFile file, + String remotePath, boolean overwrite, + BiConsumer progressCallback) { + validatePath(remotePath); + + if (file == null || file.isEmpty()) { + throw new BusinessException(ResponseCode.INVALID_PARAM, new Object[]{"文件不能为空"}); + } + + SSHTarget target = buildSSHTarget(serverId); + final long fileSize = file.getSize(); + final AtomicLong uploadedBytes = new AtomicLong(0); + + return executeFileOperation(target, new FileOperation() { + @Override + public SSHFileInfo execute(SFTPClient sftpClient) throws Exception { + // 检查文件是否已存在 + if (!overwrite) { + try { + sftpClient.stat(remotePath); + throw new BusinessException(ResponseCode.SSH_FILE_ALREADY_EXISTS); + } catch (IOException e) { + // 文件不存在,继续上传 + } + } + + // 包装输入流以监控进度 + try (InputStream inputStream = file.getInputStream()) { + ProgressMonitorInputStream progressStream = new ProgressMonitorInputStream( + inputStream, fileSize, uploadedBytes, progressCallback); + + sftpClient.put(new InMemorySourceFile() { + @Override + public String getName() { + return file.getOriginalFilename(); + } + + @Override + public long getLength() { + return fileSize; + } + + @Override + public InputStream getInputStream() { + return progressStream; + } + }, remotePath); + + FileAttributes attrs = sftpClient.stat(remotePath); + return convertToSSHFileInfo(remotePath, attrs); + } + } + + @Override + public String getName() { + return "上传文件(带进度)"; + } + }); + } + // ========== 公共API方法(返回实体) ========== /** @@ -449,4 +665,56 @@ public class ServerSSHFileService extends AbstractSSHFileOperations { // TODO: 可以在这里触发错误事件 } + + /** + * 进度监控输入流 + * + * 包装原始输入流,在读取时记录进度并回调 + */ + private static class ProgressMonitorInputStream extends FilterInputStream { + private final long totalSize; + private final AtomicLong uploadedBytes; + private final BiConsumer progressCallback; + private long lastReportedBytes = 0; + private static final long REPORT_INTERVAL = 1024 * 1024; // 每1MB报告一次 + + public ProgressMonitorInputStream(InputStream in, long totalSize, + AtomicLong uploadedBytes, + BiConsumer progressCallback) { + super(in); + this.totalSize = totalSize; + this.uploadedBytes = uploadedBytes; + this.progressCallback = progressCallback; + } + + @Override + public int read() throws IOException { + int b = super.read(); + if (b != -1) { + updateProgress(1); + } + return b; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + int bytesRead = super.read(b, off, len); + if (bytesRead > 0) { + updateProgress(bytesRead); + } + return bytesRead; + } + + private void updateProgress(int bytes) { + long current = uploadedBytes.addAndGet(bytes); + + // 每1MB或结束时报告一次进度,避免回调过于频繁 + if (current - lastReportedBytes >= REPORT_INTERVAL || current >= totalSize) { + lastReportedBytes = current; + if (progressCallback != null) { + progressCallback.accept(current, totalSize); + } + } + } + } } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/exception/BaseException.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/exception/BaseException.java index 39eb64d5..cc740148 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/framework/exception/BaseException.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/exception/BaseException.java @@ -18,6 +18,10 @@ public abstract class BaseException extends RuntimeException { protected BaseException(ResponseCode errorCode, Object[] args) { this(errorCode, args, null); } + + protected BaseException(ResponseCode errorCode, Throwable cause) { + this(errorCode, null, cause); + } protected BaseException(ResponseCode errorCode, Object[] args, Throwable cause) { super(buildMessage(errorCode, args), cause); diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/exception/BusinessException.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/exception/BusinessException.java index 68f3b12f..332161c1 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/framework/exception/BusinessException.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/exception/BusinessException.java @@ -14,5 +14,13 @@ public class BusinessException extends BaseException { public BusinessException(ResponseCode errorCode, Object[] args) { super(errorCode, args); } + + public BusinessException(ResponseCode errorCode, Throwable cause) { + super(errorCode, cause); + } + + public BusinessException(ResponseCode errorCode, Object[] args, Throwable cause) { + super(errorCode, args, cause); + } } \ No newline at end of file diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/file/AsyncFileUploadService.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/file/AsyncFileUploadService.java new file mode 100644 index 00000000..692a9334 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/file/AsyncFileUploadService.java @@ -0,0 +1,317 @@ +package com.qqchen.deploy.backend.framework.ssh.file; + +import com.qqchen.deploy.backend.framework.utils.FileUtils; +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.messaging.simp.SimpMessagingTemplate; +import org.springframework.stereotype.Service; + +import java.util.Date; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; + +/** + * 异步文件上传服务(Framework层) + * + * 提供通用的异步上传、进度追踪、任务管理功能 + * 可被不同的文件上传场景(SSH、FTP、云存储等)复用 + */ +@Service +@Slf4j +public class AsyncFileUploadService { + + private final SimpMessagingTemplate messagingTemplate; // WebSocket,可选 + + private final AsyncTaskExecutor executor; // 虚拟线程池(来自SSHWebSocketConfig) + + // 任务缓存(内存存储,实际生产环境建议用Redis) + private final Map tasks = new ConcurrentHashMap<>(); + + /** + * 构造函数 + * + * @param messagingTemplate WebSocket消息模板(可选,如果没有配置WebSocket则为null) + * @param executor 文件上传专用虚拟线程池 + */ + public AsyncFileUploadService( + @Autowired(required = false) SimpMessagingTemplate messagingTemplate, + @Qualifier("fileUploadExecutor") AsyncTaskExecutor executor) { + this.messagingTemplate = messagingTemplate; + this.executor = executor; + + if (messagingTemplate == null) { + log.warn("SimpMessagingTemplate未配置,进度推送功能不可用(仅支持轮询查询)"); + } else { + log.info("SimpMessagingTemplate已配置,支持WebSocket实时进度推送"); + } + + log.info("文件上传服务初始化完成,使用虚拟线程池"); + } + + /** + * 提交上传任务 + * + * @param fileName 文件名 + * @param fileSize 文件大小 + * @param uploadAction 上传动作(接收进度回调,执行实际上传) + * @return 任务ID + */ + public String submitTask(String fileName, long fileSize, + Consumer uploadAction) { + return submitTask(fileName, fileSize, null, uploadAction); + } + + /** + * 提交上传任务(带元数据) + * + * @param fileName 文件名 + * @param fileSize 文件大小 + * @param metadata 元数据(如服务器ID、目标路径等) + * @param uploadAction 上传动作(接收进度回调,执行实际上传) + * @return 任务ID + */ + public String submitTask(String fileName, long fileSize, Map metadata, + Consumer uploadAction) { + // 生成任务ID + String taskId = UUID.randomUUID().toString(); + + // 创建任务 + FileUploadTask task = FileUploadTask.builder() + .taskId(taskId) + .fileName(fileName) + .fileSize(fileSize) + .fileSizeFormatted(FileUtils.formatFileSize(fileSize)) + .uploadedSize(0L) + .uploadedSizeFormatted("0 B") + .progress(0) + .status(TaskStatus.PENDING) + .metadata(metadata) + .createTime(new Date()) + .updateTime(new Date()) + .build(); + + tasks.put(taskId, task); + + log.info("提交上传任务: taskId={}, fileName={}, size={}", + taskId, fileName, FileUtils.formatFileSize(fileSize)); + + // 异步执行上传 + executor.submit(() -> executeUpload(taskId, task, uploadAction)); + + return taskId; + } + + /** + * 执行上传(异步) + */ + private void executeUpload(String taskId, FileUploadTask task, + Consumer uploadAction) { + try { + // 更新状态为上传中 + updateTaskStatus(task, TaskStatus.UPLOADING, 0); + sendTaskUpdate(task); + + log.info("开始上传文件: taskId={}, fileName={}", taskId, task.getFileName()); + + // 创建进度回调 + ProgressCallback progressCallback = (uploaded, total) -> { + // 计算进度 + int progress = (int) ((uploaded * 100) / total); + task.setUploadedSize(uploaded); + task.setUploadedSizeFormatted(FileUtils.formatFileSize(uploaded)); + task.setProgress(progress); + task.setUpdateTime(new Date()); + + // 发送进度更新 + sendProgressUpdate(taskId, progress, uploaded, total); + }; + + // 执行上传动作 + uploadAction.accept(progressCallback); + + // 上传成功 + updateTaskStatus(task, TaskStatus.SUCCESS, 100); + task.setUploadedSize(task.getFileSize()); + task.setUploadedSizeFormatted(task.getFileSizeFormatted()); + sendTaskUpdate(task); + + log.info("文件上传成功: taskId={}, fileName={}", taskId, task.getFileName()); + + } catch (Exception e) { + log.error("文件上传失败: taskId={}, fileName={}", taskId, task.getFileName(), e); + task.setStatus(TaskStatus.FAILED); + task.setErrorMessage(e.getMessage()); + task.setUpdateTime(new Date()); + + // 发送失败通知 + sendTaskUpdate(task); + } + } + + /** + * 更新任务状态 + */ + private void updateTaskStatus(FileUploadTask task, TaskStatus status, int progress) { + task.setStatus(status); + task.setProgress(progress); + task.setUpdateTime(new Date()); + } + + /** + * 发送进度更新(通过WebSocket) + */ + private void sendProgressUpdate(String taskId, int progress, long uploaded, long total) { + if (messagingTemplate != null) { + try { + messagingTemplate.convertAndSend("/topic/upload/" + taskId, Map.of( + "taskId", taskId, + "status", TaskStatus.UPLOADING.name(), + "progress", progress, + "uploaded", uploaded, + "total", total, + "uploadedFormatted", FileUtils.formatFileSize(uploaded), + "totalFormatted", FileUtils.formatFileSize(total), + "timestamp", System.currentTimeMillis() + )); + } catch (Exception e) { + log.warn("发送进度更新失败: taskId={}", taskId, e); + } + } + } + + /** + * 发送任务更新(通过WebSocket) + */ + private void sendTaskUpdate(FileUploadTask task) { + if (messagingTemplate != null) { + try { + messagingTemplate.convertAndSend("/topic/upload/" + task.getTaskId(), Map.of( + "taskId", task.getTaskId(), + "status", task.getStatus().name(), + "progress", task.getProgress(), + "uploaded", task.getUploadedSize(), + "total", task.getFileSize(), + "uploadedFormatted", task.getUploadedSizeFormatted(), + "totalFormatted", task.getFileSizeFormatted(), + "errorMessage", task.getErrorMessage() != null ? task.getErrorMessage() : "", + "timestamp", System.currentTimeMillis() + )); + } catch (Exception e) { + log.warn("发送任务更新失败: taskId={}", task.getTaskId(), e); + } + } + } + + /** + * 查询任务状态 + * + * @param taskId 任务ID + * @return 任务信息 + */ + public FileUploadTask getTaskStatus(String taskId) { + return tasks.get(taskId); + } + + /** + * 取消任务(仅能取消排队中的任务) + * + * @param taskId 任务ID + * @return 是否取消成功 + */ + public boolean cancelTask(String taskId) { + FileUploadTask task = tasks.get(taskId); + if (task != null && task.getStatus() == TaskStatus.PENDING) { + task.setStatus(TaskStatus.CANCELLED); + task.setUpdateTime(new Date()); + sendTaskUpdate(task); + log.info("任务已取消: taskId={}", taskId); + return true; + } + return false; + } + + /** + * 清理已完成的任务(定时清理,避免内存泄漏) + * + * @param olderThanHours 清理多少小时前的任务 + * @return 清理的任务数 + */ + public int cleanupCompletedTasks(int olderThanHours) { + long cutoffTime = System.currentTimeMillis() - (olderThanHours * 60 * 60 * 1000L); + int[] count = {0}; + + tasks.entrySet().removeIf(entry -> { + FileUploadTask task = entry.getValue(); + boolean shouldRemove = (task.getStatus() == TaskStatus.SUCCESS || + task.getStatus() == TaskStatus.FAILED || + task.getStatus() == TaskStatus.CANCELLED) + && task.getUpdateTime().getTime() < cutoffTime; + if (shouldRemove) { + count[0]++; + log.debug("清理旧任务: taskId={}, status={}", entry.getKey(), task.getStatus()); + } + return shouldRemove; + }); + + return count[0]; + } + + /** + * 获取当前任务统计 + */ + public Map getTaskStatistics() { + int pending = 0, uploading = 0, success = 0, failed = 0, cancelled = 0; + + for (FileUploadTask task : tasks.values()) { + switch (task.getStatus()) { + case PENDING -> pending++; + case UPLOADING -> uploading++; + case SUCCESS -> success++; + case FAILED -> failed++; + case CANCELLED -> cancelled++; + } + } + + return Map.of( + "pending", pending, + "uploading", uploading, + "success", success, + "failed", failed, + "cancelled", cancelled, + "total", tasks.size() + ); + } + + /** + * 关闭服务时的清理(虚拟线程池由Spring自动管理) + */ + @PreDestroy + public void shutdown() { + log.info("异步上传服务关闭,剩余任务: {}", tasks.size()); + // 虚拟线程池由Spring容器管理,无需手动关闭 + } + + /** + * 进度回调接口 + */ + @FunctionalInterface + public interface ProgressCallback { + void onProgress(long uploaded, long total); + } + + /** + * 任务状态枚举 + */ + public enum TaskStatus { + PENDING, // 等待中 + UPLOADING, // 上传中 + SUCCESS, // 成功 + FAILED, // 失败 + CANCELLED // 已取消 + } +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/file/FileUploadTask.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/file/FileUploadTask.java new file mode 100644 index 00000000..b54e91eb --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/file/FileUploadTask.java @@ -0,0 +1,82 @@ +package com.qqchen.deploy.backend.framework.ssh.file; + +import com.qqchen.deploy.backend.framework.ssh.file.AsyncFileUploadService.TaskStatus; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Date; +import java.util.Map; + +/** + * 文件上传任务(Framework层) + * + * 通用的文件上传任务模型,可用于各种文件上传场景 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class FileUploadTask { + + /** + * 任务ID + */ + private String taskId; + + /** + * 文件名 + */ + private String fileName; + + /** + * 文件大小(字节) + */ + private Long fileSize; + + /** + * 格式化的文件大小 + */ + private String fileSizeFormatted; + + /** + * 已上传大小(字节) + */ + private Long uploadedSize; + + /** + * 格式化的已上传大小 + */ + private String uploadedSizeFormatted; + + /** + * 进度百分比(0-100) + */ + private Integer progress; + + /** + * 状态 + */ + private TaskStatus status; + + /** + * 错误信息 + */ + private String errorMessage; + + /** + * 元数据(扩展字段,如服务器ID、目标路径等) + */ + private Map metadata; + + /** + * 创建时间 + */ + private Date createTime; + + /** + * 更新时间 + */ + private Date updateTime; +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/file/SSHFileException.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/file/SSHFileException.java index fad03259..2d32ab29 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/file/SSHFileException.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/file/SSHFileException.java @@ -34,10 +34,9 @@ public class SSHFileException extends BusinessException { } public SSHFileException(SSHFileErrorType errorType, ResponseCode errorCode, Throwable cause) { - super(errorCode); + super(errorCode, cause); this.errorType = errorType; this.originalCause = cause; - initCause(cause); } public SSHFileErrorType getErrorType() { diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/websocket/SSHWebSocketConfig.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/websocket/SSHWebSocketConfig.java index 4c387e5f..21a8014b 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/websocket/SSHWebSocketConfig.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/websocket/SSHWebSocketConfig.java @@ -43,4 +43,36 @@ public class SSHWebSocketConfig { return executor; } + + /** + * 文件上传专用线程池(虚拟线程) + * + * ⚠️ 为什么使用虚拟线程? + * 1. 文件上传是典型的**I/O密集型**任务(网络传输、磁盘读取) + * 2. 大文件上传时线程主要在等待I/O完成 + * 3. 虚拟线程在阻塞时自动释放底层平台线程 + * 4. 支持大量并发上传而不消耗过多资源 + * + * 📊 性能对比: + * - 平台线程:100个并发上传 = 100个线程 ≈ 100-200MB内存 + CPU调度开销 ❌ + * - 虚拟线程:100个并发上传 = 100个虚拟线程 ≈ 几MB内存 + 零调度开销 ✅ + * + * 💡 使用场景: + * - 异步文件上传 + * - 文件传输监控 + * - 进度追踪任务 + */ + @Bean("fileUploadExecutor") + public AsyncTaskExecutor fileUploadExecutor() { + SimpleAsyncTaskExecutor executor = + new SimpleAsyncTaskExecutor("file-upload-"); + + // 启用虚拟线程(Java 21+) + executor.setVirtualThreads(true); + + // 并发限制:-1表示无限制(虚拟线程适合I/O密集型任务) + executor.setConcurrencyLimit(-1); + + return executor; + } }