增加ssh链接框架
This commit is contained in:
parent
ba7663ebd6
commit
04f1cbd251
@ -4,12 +4,14 @@ import com.qqchen.deploy.backend.deploy.dto.FileUploadResultDTO;
|
||||
import com.qqchen.deploy.backend.deploy.dto.RemoteFileInfoDTO;
|
||||
import com.qqchen.deploy.backend.deploy.service.impl.ServerSSHFileService;
|
||||
import com.qqchen.deploy.backend.framework.api.Response;
|
||||
import com.qqchen.deploy.backend.framework.ssh.file.FileUploadTask;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
import org.springframework.web.multipart.MultipartFile;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 服务器SSH文件管理API(业务层)
|
||||
@ -134,4 +136,81 @@ public class ServerSSHFileApiController {
|
||||
|
||||
return Response.success();
|
||||
}
|
||||
|
||||
// ========== 异步上传相关接口 ==========
|
||||
|
||||
/**
|
||||
* 异步上传文件(立即返回任务ID)
|
||||
*
|
||||
* @param serverId 服务器ID
|
||||
* @param file 文件
|
||||
* @param remotePath 远程路径
|
||||
* @param overwrite 是否覆盖
|
||||
* @return 任务ID和提示信息
|
||||
*/
|
||||
@PostMapping("/{serverId}/files/upload-async")
|
||||
public Response<Map<String, String>> uploadFileAsync(
|
||||
@PathVariable Long serverId,
|
||||
@RequestParam("file") MultipartFile file,
|
||||
@RequestParam("remotePath") String remotePath,
|
||||
@RequestParam(value = "overwrite", defaultValue = "false") Boolean overwrite
|
||||
) {
|
||||
log.info("提交异步上传任务: serverId={}, fileName={}, remotePath={}, size={}",
|
||||
serverId, file.getOriginalFilename(), remotePath, file.getSize());
|
||||
|
||||
String taskId = serverSSHFileService.uploadFileAsync(serverId, file, remotePath, overwrite);
|
||||
|
||||
return Response.success(Map.of(
|
||||
"taskId", taskId,
|
||||
"message", "上传任务已提交,请通过WebSocket订阅进度:/topic/upload/" + taskId
|
||||
));
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询上传任务状态
|
||||
*
|
||||
* @param serverId 服务器ID
|
||||
* @param taskId 任务ID
|
||||
* @return 任务状态
|
||||
*/
|
||||
@GetMapping("/{serverId}/files/upload-task/{taskId}")
|
||||
public Response<FileUploadTask> getUploadTaskStatus(
|
||||
@PathVariable Long serverId,
|
||||
@PathVariable String taskId
|
||||
) {
|
||||
FileUploadTask task = serverSSHFileService.getUploadTaskStatus(taskId);
|
||||
if (task == null) {
|
||||
return Response.error(com.qqchen.deploy.backend.framework.enums.ResponseCode.DATA_NOT_FOUND, "任务不存在");
|
||||
}
|
||||
return Response.success(task);
|
||||
}
|
||||
|
||||
/**
|
||||
* 取消上传任务
|
||||
*
|
||||
* @param serverId 服务器ID
|
||||
* @param taskId 任务ID
|
||||
* @return 是否取消成功
|
||||
*/
|
||||
@DeleteMapping("/{serverId}/files/upload-task/{taskId}")
|
||||
public Response<Map<String, Boolean>> cancelUploadTask(
|
||||
@PathVariable Long serverId,
|
||||
@PathVariable String taskId
|
||||
) {
|
||||
boolean cancelled = serverSSHFileService.cancelUploadTask(taskId);
|
||||
return Response.success(Map.of("cancelled", cancelled));
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取上传任务统计
|
||||
*
|
||||
* @param serverId 服务器ID
|
||||
* @return 任务统计信息
|
||||
*/
|
||||
@GetMapping("/{serverId}/files/upload-statistics")
|
||||
public Response<Map<String, Integer>> getUploadStatistics(
|
||||
@PathVariable Long serverId
|
||||
) {
|
||||
return Response.success(serverSSHFileService.getUploadTaskStatistics());
|
||||
}
|
||||
}
|
||||
|
||||
@ -10,6 +10,8 @@ import com.qqchen.deploy.backend.framework.enums.SSHTargetType;
|
||||
import com.qqchen.deploy.backend.framework.exception.BusinessException;
|
||||
import com.qqchen.deploy.backend.framework.ssh.SSHCommandServiceFactory;
|
||||
import com.qqchen.deploy.backend.framework.ssh.file.AbstractSSHFileOperations;
|
||||
import com.qqchen.deploy.backend.framework.ssh.file.AsyncFileUploadService;
|
||||
import com.qqchen.deploy.backend.framework.ssh.file.FileUploadTask;
|
||||
import com.qqchen.deploy.backend.framework.ssh.file.SSHFileInfo;
|
||||
import com.qqchen.deploy.backend.framework.ssh.websocket.SSHTarget;
|
||||
import jakarta.annotation.Resource;
|
||||
@ -22,10 +24,14 @@ import net.schmizz.sshj.xfer.InMemorySourceFile;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.web.multipart.MultipartFile;
|
||||
|
||||
import java.io.FilterInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
@ -44,6 +50,9 @@ public class ServerSSHFileService extends AbstractSSHFileOperations {
|
||||
@Resource
|
||||
private RemoteFileInfoConverter remoteFileInfoConverter;
|
||||
|
||||
@Resource
|
||||
private AsyncFileUploadService asyncFileUploadService;
|
||||
|
||||
/**
|
||||
* 构造函数
|
||||
*/
|
||||
@ -82,6 +91,213 @@ public class ServerSSHFileService extends AbstractSSHFileOperations {
|
||||
return remoteFileInfoConverter.toUploadResultDTO(fileInfo);
|
||||
}
|
||||
|
||||
/**
|
||||
* 异步上传文件(立即返回任务ID)
|
||||
*
|
||||
* @param serverId 服务器ID
|
||||
* @param file 文件
|
||||
* @param remotePath 远程路径
|
||||
* @param overwrite 是否覆盖
|
||||
* @return 任务ID
|
||||
*/
|
||||
public String uploadFileAsync(Long serverId, MultipartFile file,
|
||||
String remotePath, boolean overwrite) {
|
||||
// ⚠️ 关键:在HTTP请求结束前,先复制文件内容到内存
|
||||
// 因为异步任务执行时,Spring已经清理了MultipartFile的临时文件
|
||||
final byte[] fileBytes;
|
||||
final String fileName = file.getOriginalFilename();
|
||||
final long fileSize = file.getSize();
|
||||
|
||||
try {
|
||||
fileBytes = file.getBytes(); // 将文件内容读入内存
|
||||
} catch (IOException e) {
|
||||
throw new BusinessException(ResponseCode.INVALID_PARAM, new Object[]{"读取文件失败: " + e.getMessage()});
|
||||
}
|
||||
|
||||
// 准备元数据
|
||||
Map<String, Object> metadata = Map.of(
|
||||
"serverId", serverId,
|
||||
"remotePath", remotePath,
|
||||
"overwrite", overwrite
|
||||
);
|
||||
|
||||
// 提交任务到Framework层
|
||||
return asyncFileUploadService.submitTask(
|
||||
fileName,
|
||||
fileSize,
|
||||
metadata,
|
||||
progressCallback -> {
|
||||
// 执行SSH文件上传(带进度)
|
||||
// 使用字节数组创建临时的MultipartFile包装
|
||||
uploadFileWithProgressFromBytes(serverId, fileBytes, fileName,
|
||||
remotePath, overwrite, progressCallback::onProgress);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询上传任务状态
|
||||
*/
|
||||
public FileUploadTask getUploadTaskStatus(String taskId) {
|
||||
return asyncFileUploadService.getTaskStatus(taskId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 取消上传任务
|
||||
*/
|
||||
public boolean cancelUploadTask(String taskId) {
|
||||
return asyncFileUploadService.cancelTask(taskId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取上传任务统计
|
||||
*/
|
||||
public Map<String, Integer> getUploadTaskStatistics() {
|
||||
return asyncFileUploadService.getTaskStatistics();
|
||||
}
|
||||
|
||||
/**
|
||||
* 从字节数组上传文件(带进度)
|
||||
*
|
||||
* @param serverId 服务器ID
|
||||
* @param fileBytes 文件字节数组
|
||||
* @param fileName 文件名
|
||||
* @param remotePath 远程目标路径
|
||||
* @param overwrite 是否覆盖
|
||||
* @param progressCallback 进度回调
|
||||
* @return 文件信息
|
||||
*/
|
||||
private SSHFileInfo uploadFileWithProgressFromBytes(Long serverId, byte[] fileBytes,
|
||||
String fileName, String remotePath,
|
||||
boolean overwrite,
|
||||
BiConsumer<Long, Long> progressCallback) {
|
||||
validatePath(remotePath);
|
||||
|
||||
if (fileBytes == null || fileBytes.length == 0) {
|
||||
throw new BusinessException(ResponseCode.INVALID_PARAM, new Object[]{"文件不能为空"});
|
||||
}
|
||||
|
||||
SSHTarget target = buildSSHTarget(serverId);
|
||||
final long fileSize = fileBytes.length;
|
||||
final AtomicLong uploadedBytes = new AtomicLong(0);
|
||||
|
||||
return executeFileOperation(target, new FileOperation<SSHFileInfo>() {
|
||||
@Override
|
||||
public SSHFileInfo execute(SFTPClient sftpClient) throws Exception {
|
||||
// 检查文件是否已存在
|
||||
if (!overwrite) {
|
||||
try {
|
||||
sftpClient.stat(remotePath);
|
||||
throw new BusinessException(ResponseCode.SSH_FILE_ALREADY_EXISTS);
|
||||
} catch (IOException e) {
|
||||
// 文件不存在,继续上传
|
||||
}
|
||||
}
|
||||
|
||||
// 使用字节数组创建输入流并监控进度
|
||||
try (InputStream inputStream = new java.io.ByteArrayInputStream(fileBytes)) {
|
||||
ProgressMonitorInputStream progressStream = new ProgressMonitorInputStream(
|
||||
inputStream, fileSize, uploadedBytes, progressCallback);
|
||||
|
||||
sftpClient.put(new InMemorySourceFile() {
|
||||
@Override
|
||||
public String getName() {
|
||||
return fileName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLength() {
|
||||
return fileSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream getInputStream() {
|
||||
return progressStream;
|
||||
}
|
||||
}, remotePath);
|
||||
|
||||
FileAttributes attrs = sftpClient.stat(remotePath);
|
||||
return convertToSSHFileInfo(remotePath, attrs);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "上传文件(带进度)";
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 带进度回调的文件上传
|
||||
*
|
||||
* @param serverId 服务器ID
|
||||
* @param file 要上传的文件
|
||||
* @param remotePath 远程目标路径(完整路径,包括文件名)
|
||||
* @param overwrite 是否覆盖已存在的文件
|
||||
* @param progressCallback 进度回调函数 (已上传字节数, 总字节数)
|
||||
* @return 文件信息
|
||||
*/
|
||||
public SSHFileInfo uploadFileWithProgress(Long serverId, MultipartFile file,
|
||||
String remotePath, boolean overwrite,
|
||||
BiConsumer<Long, Long> progressCallback) {
|
||||
validatePath(remotePath);
|
||||
|
||||
if (file == null || file.isEmpty()) {
|
||||
throw new BusinessException(ResponseCode.INVALID_PARAM, new Object[]{"文件不能为空"});
|
||||
}
|
||||
|
||||
SSHTarget target = buildSSHTarget(serverId);
|
||||
final long fileSize = file.getSize();
|
||||
final AtomicLong uploadedBytes = new AtomicLong(0);
|
||||
|
||||
return executeFileOperation(target, new FileOperation<SSHFileInfo>() {
|
||||
@Override
|
||||
public SSHFileInfo execute(SFTPClient sftpClient) throws Exception {
|
||||
// 检查文件是否已存在
|
||||
if (!overwrite) {
|
||||
try {
|
||||
sftpClient.stat(remotePath);
|
||||
throw new BusinessException(ResponseCode.SSH_FILE_ALREADY_EXISTS);
|
||||
} catch (IOException e) {
|
||||
// 文件不存在,继续上传
|
||||
}
|
||||
}
|
||||
|
||||
// 包装输入流以监控进度
|
||||
try (InputStream inputStream = file.getInputStream()) {
|
||||
ProgressMonitorInputStream progressStream = new ProgressMonitorInputStream(
|
||||
inputStream, fileSize, uploadedBytes, progressCallback);
|
||||
|
||||
sftpClient.put(new InMemorySourceFile() {
|
||||
@Override
|
||||
public String getName() {
|
||||
return file.getOriginalFilename();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLength() {
|
||||
return fileSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream getInputStream() {
|
||||
return progressStream;
|
||||
}
|
||||
}, remotePath);
|
||||
|
||||
FileAttributes attrs = sftpClient.stat(remotePath);
|
||||
return convertToSSHFileInfo(remotePath, attrs);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "上传文件(带进度)";
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// ========== 公共API方法(返回实体) ==========
|
||||
|
||||
/**
|
||||
@ -449,4 +665,56 @@ public class ServerSSHFileService extends AbstractSSHFileOperations {
|
||||
|
||||
// TODO: 可以在这里触发错误事件
|
||||
}
|
||||
|
||||
/**
|
||||
* 进度监控输入流
|
||||
*
|
||||
* 包装原始输入流,在读取时记录进度并回调
|
||||
*/
|
||||
private static class ProgressMonitorInputStream extends FilterInputStream {
|
||||
private final long totalSize;
|
||||
private final AtomicLong uploadedBytes;
|
||||
private final BiConsumer<Long, Long> progressCallback;
|
||||
private long lastReportedBytes = 0;
|
||||
private static final long REPORT_INTERVAL = 1024 * 1024; // 每1MB报告一次
|
||||
|
||||
public ProgressMonitorInputStream(InputStream in, long totalSize,
|
||||
AtomicLong uploadedBytes,
|
||||
BiConsumer<Long, Long> progressCallback) {
|
||||
super(in);
|
||||
this.totalSize = totalSize;
|
||||
this.uploadedBytes = uploadedBytes;
|
||||
this.progressCallback = progressCallback;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
int b = super.read();
|
||||
if (b != -1) {
|
||||
updateProgress(1);
|
||||
}
|
||||
return b;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(byte[] b, int off, int len) throws IOException {
|
||||
int bytesRead = super.read(b, off, len);
|
||||
if (bytesRead > 0) {
|
||||
updateProgress(bytesRead);
|
||||
}
|
||||
return bytesRead;
|
||||
}
|
||||
|
||||
private void updateProgress(int bytes) {
|
||||
long current = uploadedBytes.addAndGet(bytes);
|
||||
|
||||
// 每1MB或结束时报告一次进度,避免回调过于频繁
|
||||
if (current - lastReportedBytes >= REPORT_INTERVAL || current >= totalSize) {
|
||||
lastReportedBytes = current;
|
||||
if (progressCallback != null) {
|
||||
progressCallback.accept(current, totalSize);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -19,6 +19,10 @@ public abstract class BaseException extends RuntimeException {
|
||||
this(errorCode, args, null);
|
||||
}
|
||||
|
||||
protected BaseException(ResponseCode errorCode, Throwable cause) {
|
||||
this(errorCode, null, cause);
|
||||
}
|
||||
|
||||
protected BaseException(ResponseCode errorCode, Object[] args, Throwable cause) {
|
||||
super(buildMessage(errorCode, args), cause);
|
||||
this.errorCode = errorCode;
|
||||
|
||||
@ -15,4 +15,12 @@ public class BusinessException extends BaseException {
|
||||
super(errorCode, args);
|
||||
}
|
||||
|
||||
public BusinessException(ResponseCode errorCode, Throwable cause) {
|
||||
super(errorCode, cause);
|
||||
}
|
||||
|
||||
public BusinessException(ResponseCode errorCode, Object[] args, Throwable cause) {
|
||||
super(errorCode, args, cause);
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,317 @@
|
||||
package com.qqchen.deploy.backend.framework.ssh.file;
|
||||
|
||||
import com.qqchen.deploy.backend.framework.utils.FileUtils;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.core.task.AsyncTaskExecutor;
|
||||
import org.springframework.messaging.simp.SimpMessagingTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* 异步文件上传服务(Framework层)
|
||||
*
|
||||
* 提供通用的异步上传、进度追踪、任务管理功能
|
||||
* 可被不同的文件上传场景(SSH、FTP、云存储等)复用
|
||||
*/
|
||||
@Service
|
||||
@Slf4j
|
||||
public class AsyncFileUploadService {
|
||||
|
||||
private final SimpMessagingTemplate messagingTemplate; // WebSocket,可选
|
||||
|
||||
private final AsyncTaskExecutor executor; // 虚拟线程池(来自SSHWebSocketConfig)
|
||||
|
||||
// 任务缓存(内存存储,实际生产环境建议用Redis)
|
||||
private final Map<String, FileUploadTask> tasks = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 构造函数
|
||||
*
|
||||
* @param messagingTemplate WebSocket消息模板(可选,如果没有配置WebSocket则为null)
|
||||
* @param executor 文件上传专用虚拟线程池
|
||||
*/
|
||||
public AsyncFileUploadService(
|
||||
@Autowired(required = false) SimpMessagingTemplate messagingTemplate,
|
||||
@Qualifier("fileUploadExecutor") AsyncTaskExecutor executor) {
|
||||
this.messagingTemplate = messagingTemplate;
|
||||
this.executor = executor;
|
||||
|
||||
if (messagingTemplate == null) {
|
||||
log.warn("SimpMessagingTemplate未配置,进度推送功能不可用(仅支持轮询查询)");
|
||||
} else {
|
||||
log.info("SimpMessagingTemplate已配置,支持WebSocket实时进度推送");
|
||||
}
|
||||
|
||||
log.info("文件上传服务初始化完成,使用虚拟线程池");
|
||||
}
|
||||
|
||||
/**
|
||||
* 提交上传任务
|
||||
*
|
||||
* @param fileName 文件名
|
||||
* @param fileSize 文件大小
|
||||
* @param uploadAction 上传动作(接收进度回调,执行实际上传)
|
||||
* @return 任务ID
|
||||
*/
|
||||
public String submitTask(String fileName, long fileSize,
|
||||
Consumer<ProgressCallback> uploadAction) {
|
||||
return submitTask(fileName, fileSize, null, uploadAction);
|
||||
}
|
||||
|
||||
/**
|
||||
* 提交上传任务(带元数据)
|
||||
*
|
||||
* @param fileName 文件名
|
||||
* @param fileSize 文件大小
|
||||
* @param metadata 元数据(如服务器ID、目标路径等)
|
||||
* @param uploadAction 上传动作(接收进度回调,执行实际上传)
|
||||
* @return 任务ID
|
||||
*/
|
||||
public String submitTask(String fileName, long fileSize, Map<String, Object> metadata,
|
||||
Consumer<ProgressCallback> uploadAction) {
|
||||
// 生成任务ID
|
||||
String taskId = UUID.randomUUID().toString();
|
||||
|
||||
// 创建任务
|
||||
FileUploadTask task = FileUploadTask.builder()
|
||||
.taskId(taskId)
|
||||
.fileName(fileName)
|
||||
.fileSize(fileSize)
|
||||
.fileSizeFormatted(FileUtils.formatFileSize(fileSize))
|
||||
.uploadedSize(0L)
|
||||
.uploadedSizeFormatted("0 B")
|
||||
.progress(0)
|
||||
.status(TaskStatus.PENDING)
|
||||
.metadata(metadata)
|
||||
.createTime(new Date())
|
||||
.updateTime(new Date())
|
||||
.build();
|
||||
|
||||
tasks.put(taskId, task);
|
||||
|
||||
log.info("提交上传任务: taskId={}, fileName={}, size={}",
|
||||
taskId, fileName, FileUtils.formatFileSize(fileSize));
|
||||
|
||||
// 异步执行上传
|
||||
executor.submit(() -> executeUpload(taskId, task, uploadAction));
|
||||
|
||||
return taskId;
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行上传(异步)
|
||||
*/
|
||||
private void executeUpload(String taskId, FileUploadTask task,
|
||||
Consumer<ProgressCallback> uploadAction) {
|
||||
try {
|
||||
// 更新状态为上传中
|
||||
updateTaskStatus(task, TaskStatus.UPLOADING, 0);
|
||||
sendTaskUpdate(task);
|
||||
|
||||
log.info("开始上传文件: taskId={}, fileName={}", taskId, task.getFileName());
|
||||
|
||||
// 创建进度回调
|
||||
ProgressCallback progressCallback = (uploaded, total) -> {
|
||||
// 计算进度
|
||||
int progress = (int) ((uploaded * 100) / total);
|
||||
task.setUploadedSize(uploaded);
|
||||
task.setUploadedSizeFormatted(FileUtils.formatFileSize(uploaded));
|
||||
task.setProgress(progress);
|
||||
task.setUpdateTime(new Date());
|
||||
|
||||
// 发送进度更新
|
||||
sendProgressUpdate(taskId, progress, uploaded, total);
|
||||
};
|
||||
|
||||
// 执行上传动作
|
||||
uploadAction.accept(progressCallback);
|
||||
|
||||
// 上传成功
|
||||
updateTaskStatus(task, TaskStatus.SUCCESS, 100);
|
||||
task.setUploadedSize(task.getFileSize());
|
||||
task.setUploadedSizeFormatted(task.getFileSizeFormatted());
|
||||
sendTaskUpdate(task);
|
||||
|
||||
log.info("文件上传成功: taskId={}, fileName={}", taskId, task.getFileName());
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("文件上传失败: taskId={}, fileName={}", taskId, task.getFileName(), e);
|
||||
task.setStatus(TaskStatus.FAILED);
|
||||
task.setErrorMessage(e.getMessage());
|
||||
task.setUpdateTime(new Date());
|
||||
|
||||
// 发送失败通知
|
||||
sendTaskUpdate(task);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新任务状态
|
||||
*/
|
||||
private void updateTaskStatus(FileUploadTask task, TaskStatus status, int progress) {
|
||||
task.setStatus(status);
|
||||
task.setProgress(progress);
|
||||
task.setUpdateTime(new Date());
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送进度更新(通过WebSocket)
|
||||
*/
|
||||
private void sendProgressUpdate(String taskId, int progress, long uploaded, long total) {
|
||||
if (messagingTemplate != null) {
|
||||
try {
|
||||
messagingTemplate.convertAndSend("/topic/upload/" + taskId, Map.of(
|
||||
"taskId", taskId,
|
||||
"status", TaskStatus.UPLOADING.name(),
|
||||
"progress", progress,
|
||||
"uploaded", uploaded,
|
||||
"total", total,
|
||||
"uploadedFormatted", FileUtils.formatFileSize(uploaded),
|
||||
"totalFormatted", FileUtils.formatFileSize(total),
|
||||
"timestamp", System.currentTimeMillis()
|
||||
));
|
||||
} catch (Exception e) {
|
||||
log.warn("发送进度更新失败: taskId={}", taskId, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送任务更新(通过WebSocket)
|
||||
*/
|
||||
private void sendTaskUpdate(FileUploadTask task) {
|
||||
if (messagingTemplate != null) {
|
||||
try {
|
||||
messagingTemplate.convertAndSend("/topic/upload/" + task.getTaskId(), Map.of(
|
||||
"taskId", task.getTaskId(),
|
||||
"status", task.getStatus().name(),
|
||||
"progress", task.getProgress(),
|
||||
"uploaded", task.getUploadedSize(),
|
||||
"total", task.getFileSize(),
|
||||
"uploadedFormatted", task.getUploadedSizeFormatted(),
|
||||
"totalFormatted", task.getFileSizeFormatted(),
|
||||
"errorMessage", task.getErrorMessage() != null ? task.getErrorMessage() : "",
|
||||
"timestamp", System.currentTimeMillis()
|
||||
));
|
||||
} catch (Exception e) {
|
||||
log.warn("发送任务更新失败: taskId={}", task.getTaskId(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询任务状态
|
||||
*
|
||||
* @param taskId 任务ID
|
||||
* @return 任务信息
|
||||
*/
|
||||
public FileUploadTask getTaskStatus(String taskId) {
|
||||
return tasks.get(taskId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 取消任务(仅能取消排队中的任务)
|
||||
*
|
||||
* @param taskId 任务ID
|
||||
* @return 是否取消成功
|
||||
*/
|
||||
public boolean cancelTask(String taskId) {
|
||||
FileUploadTask task = tasks.get(taskId);
|
||||
if (task != null && task.getStatus() == TaskStatus.PENDING) {
|
||||
task.setStatus(TaskStatus.CANCELLED);
|
||||
task.setUpdateTime(new Date());
|
||||
sendTaskUpdate(task);
|
||||
log.info("任务已取消: taskId={}", taskId);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理已完成的任务(定时清理,避免内存泄漏)
|
||||
*
|
||||
* @param olderThanHours 清理多少小时前的任务
|
||||
* @return 清理的任务数
|
||||
*/
|
||||
public int cleanupCompletedTasks(int olderThanHours) {
|
||||
long cutoffTime = System.currentTimeMillis() - (olderThanHours * 60 * 60 * 1000L);
|
||||
int[] count = {0};
|
||||
|
||||
tasks.entrySet().removeIf(entry -> {
|
||||
FileUploadTask task = entry.getValue();
|
||||
boolean shouldRemove = (task.getStatus() == TaskStatus.SUCCESS ||
|
||||
task.getStatus() == TaskStatus.FAILED ||
|
||||
task.getStatus() == TaskStatus.CANCELLED)
|
||||
&& task.getUpdateTime().getTime() < cutoffTime;
|
||||
if (shouldRemove) {
|
||||
count[0]++;
|
||||
log.debug("清理旧任务: taskId={}, status={}", entry.getKey(), task.getStatus());
|
||||
}
|
||||
return shouldRemove;
|
||||
});
|
||||
|
||||
return count[0];
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取当前任务统计
|
||||
*/
|
||||
public Map<String, Integer> getTaskStatistics() {
|
||||
int pending = 0, uploading = 0, success = 0, failed = 0, cancelled = 0;
|
||||
|
||||
for (FileUploadTask task : tasks.values()) {
|
||||
switch (task.getStatus()) {
|
||||
case PENDING -> pending++;
|
||||
case UPLOADING -> uploading++;
|
||||
case SUCCESS -> success++;
|
||||
case FAILED -> failed++;
|
||||
case CANCELLED -> cancelled++;
|
||||
}
|
||||
}
|
||||
|
||||
return Map.of(
|
||||
"pending", pending,
|
||||
"uploading", uploading,
|
||||
"success", success,
|
||||
"failed", failed,
|
||||
"cancelled", cancelled,
|
||||
"total", tasks.size()
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* 关闭服务时的清理(虚拟线程池由Spring自动管理)
|
||||
*/
|
||||
@PreDestroy
|
||||
public void shutdown() {
|
||||
log.info("异步上传服务关闭,剩余任务: {}", tasks.size());
|
||||
// 虚拟线程池由Spring容器管理,无需手动关闭
|
||||
}
|
||||
|
||||
/**
|
||||
* 进度回调接口
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface ProgressCallback {
|
||||
void onProgress(long uploaded, long total);
|
||||
}
|
||||
|
||||
/**
|
||||
* 任务状态枚举
|
||||
*/
|
||||
public enum TaskStatus {
|
||||
PENDING, // 等待中
|
||||
UPLOADING, // 上传中
|
||||
SUCCESS, // 成功
|
||||
FAILED, // 失败
|
||||
CANCELLED // 已取消
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,82 @@
|
||||
package com.qqchen.deploy.backend.framework.ssh.file;
|
||||
|
||||
import com.qqchen.deploy.backend.framework.ssh.file.AsyncFileUploadService.TaskStatus;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 文件上传任务(Framework层)
|
||||
*
|
||||
* 通用的文件上传任务模型,可用于各种文件上传场景
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class FileUploadTask {
|
||||
|
||||
/**
|
||||
* 任务ID
|
||||
*/
|
||||
private String taskId;
|
||||
|
||||
/**
|
||||
* 文件名
|
||||
*/
|
||||
private String fileName;
|
||||
|
||||
/**
|
||||
* 文件大小(字节)
|
||||
*/
|
||||
private Long fileSize;
|
||||
|
||||
/**
|
||||
* 格式化的文件大小
|
||||
*/
|
||||
private String fileSizeFormatted;
|
||||
|
||||
/**
|
||||
* 已上传大小(字节)
|
||||
*/
|
||||
private Long uploadedSize;
|
||||
|
||||
/**
|
||||
* 格式化的已上传大小
|
||||
*/
|
||||
private String uploadedSizeFormatted;
|
||||
|
||||
/**
|
||||
* 进度百分比(0-100)
|
||||
*/
|
||||
private Integer progress;
|
||||
|
||||
/**
|
||||
* 状态
|
||||
*/
|
||||
private TaskStatus status;
|
||||
|
||||
/**
|
||||
* 错误信息
|
||||
*/
|
||||
private String errorMessage;
|
||||
|
||||
/**
|
||||
* 元数据(扩展字段,如服务器ID、目标路径等)
|
||||
*/
|
||||
private Map<String, Object> metadata;
|
||||
|
||||
/**
|
||||
* 创建时间
|
||||
*/
|
||||
private Date createTime;
|
||||
|
||||
/**
|
||||
* 更新时间
|
||||
*/
|
||||
private Date updateTime;
|
||||
}
|
||||
@ -34,10 +34,9 @@ public class SSHFileException extends BusinessException {
|
||||
}
|
||||
|
||||
public SSHFileException(SSHFileErrorType errorType, ResponseCode errorCode, Throwable cause) {
|
||||
super(errorCode);
|
||||
super(errorCode, cause);
|
||||
this.errorType = errorType;
|
||||
this.originalCause = cause;
|
||||
initCause(cause);
|
||||
}
|
||||
|
||||
public SSHFileErrorType getErrorType() {
|
||||
|
||||
@ -43,4 +43,36 @@ public class SSHWebSocketConfig {
|
||||
|
||||
return executor;
|
||||
}
|
||||
|
||||
/**
|
||||
* 文件上传专用线程池(虚拟线程)
|
||||
*
|
||||
* ⚠️ 为什么使用虚拟线程?
|
||||
* 1. 文件上传是典型的**I/O密集型**任务(网络传输、磁盘读取)
|
||||
* 2. 大文件上传时线程主要在等待I/O完成
|
||||
* 3. 虚拟线程在阻塞时自动释放底层平台线程
|
||||
* 4. 支持大量并发上传而不消耗过多资源
|
||||
*
|
||||
* 📊 性能对比:
|
||||
* - 平台线程:100个并发上传 = 100个线程 ≈ 100-200MB内存 + CPU调度开销 ❌
|
||||
* - 虚拟线程:100个并发上传 = 100个虚拟线程 ≈ 几MB内存 + 零调度开销 ✅
|
||||
*
|
||||
* 💡 使用场景:
|
||||
* - 异步文件上传
|
||||
* - 文件传输监控
|
||||
* - 进度追踪任务
|
||||
*/
|
||||
@Bean("fileUploadExecutor")
|
||||
public AsyncTaskExecutor fileUploadExecutor() {
|
||||
SimpleAsyncTaskExecutor executor =
|
||||
new SimpleAsyncTaskExecutor("file-upload-");
|
||||
|
||||
// 启用虚拟线程(Java 21+)
|
||||
executor.setVirtualThreads(true);
|
||||
|
||||
// 并发限制:-1表示无限制(虚拟线程适合I/O密集型任务)
|
||||
executor.setConcurrencyLimit(-1);
|
||||
|
||||
return executor;
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user