From ff149be46f36f260ccae1c3a642a986317c8fd5d Mon Sep 17 00:00:00 2001 From: dengqichen Date: Sun, 7 Dec 2025 18:26:42 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0ssh=E9=93=BE=E6=8E=A5?= =?UTF-8?q?=E6=A1=86=E6=9E=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/ServerSSHFileApiController.java | 13 +- .../service/impl/ServerSSHFileService.java | 83 ++++-- .../backend/framework/enums/ResponseCode.java | 1 + .../exception/UploadCancelledException.java | 15 ++ .../ssh/file/AsyncFileUploadService.java | 240 ++++++++++++++---- .../framework/ssh/file/FileUploadTask.java | 20 ++ .../AbstractSSHWebSocketHandler.java | 63 +---- .../src/main/resources/messages.properties | 1 + .../main/resources/messages_en_US.properties | 1 + .../main/resources/messages_zh_CN.properties | 1 + 10 files changed, 296 insertions(+), 142 deletions(-) create mode 100644 backend/src/main/java/com/qqchen/deploy/backend/framework/exception/UploadCancelledException.java diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/api/ServerSSHFileApiController.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/api/ServerSSHFileApiController.java index 318760d4..73a38d50 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/api/ServerSSHFileApiController.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/api/ServerSSHFileApiController.java @@ -178,10 +178,7 @@ public class ServerSSHFileApiController { @PathVariable Long serverId, @PathVariable String taskId ) { - FileUploadTask task = serverSSHFileService.getUploadTaskStatus(taskId); - if (task == null) { - return Response.error(com.qqchen.deploy.backend.framework.enums.ResponseCode.DATA_NOT_FOUND, "任务不存在"); - } + FileUploadTask task = serverSSHFileService.getUploadTaskStatus(serverId, taskId); return Response.success(task); } @@ -190,15 +187,15 @@ public class ServerSSHFileApiController { * * @param serverId 服务器ID * @param taskId 任务ID - * @return 是否取消成功 + * @return 是否取消成功(true=取消成功,false=无法取消) */ @DeleteMapping("/{serverId}/files/upload-task/{taskId}") - public Response> cancelUploadTask( + public Response cancelUploadTask( @PathVariable Long serverId, @PathVariable String taskId ) { - boolean cancelled = serverSSHFileService.cancelUploadTask(taskId); - return Response.success(Map.of("cancelled", cancelled)); + boolean cancelled = serverSSHFileService.cancelUploadTask(serverId, taskId); + return Response.success(cancelled); } /** diff --git a/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/ServerSSHFileService.java b/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/ServerSSHFileService.java index 95cde110..1fdaeae1 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/ServerSSHFileService.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/deploy/service/impl/ServerSSHFileService.java @@ -8,6 +8,7 @@ import com.qqchen.deploy.backend.framework.enums.ResponseCode; import com.qqchen.deploy.backend.framework.enums.SSHEvent; import com.qqchen.deploy.backend.framework.enums.SSHTargetType; import com.qqchen.deploy.backend.framework.exception.BusinessException; +import com.qqchen.deploy.backend.framework.exception.UploadCancelledException; import com.qqchen.deploy.backend.framework.ssh.SSHCommandServiceFactory; import com.qqchen.deploy.backend.framework.ssh.file.AbstractSSHFileOperations; import com.qqchen.deploy.backend.framework.ssh.file.AsyncFileUploadService; @@ -28,6 +29,7 @@ import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; @@ -114,15 +116,15 @@ public class ServerSSHFileService extends AbstractSSHFileOperations { throw new BusinessException(ResponseCode.INVALID_PARAM, new Object[]{"读取文件失败: " + e.getMessage()}); } - // 准备元数据 - Map metadata = Map.of( - "serverId", serverId, - "remotePath", remotePath, - "overwrite", overwrite - ); + // 准备元数据(使用HashMap,可变) + Map metadata = new HashMap<>(); + metadata.put("serverId", serverId); + metadata.put("remotePath", remotePath); + metadata.put("overwrite", overwrite); // 提交任务到Framework层 return asyncFileUploadService.submitTask( + serverId, // 🔒 传递serverId fileName, fileSize, metadata, @@ -137,16 +139,32 @@ public class ServerSSHFileService extends AbstractSSHFileOperations { /** * 查询上传任务状态 + * + * @param serverId 服务器ID + * @param taskId 任务ID + * @return 任务信息 + * @throws BusinessException 如果任务不存在或不属于该服务器 */ - public FileUploadTask getUploadTaskStatus(String taskId) { - return asyncFileUploadService.getTaskStatus(taskId); + public FileUploadTask getUploadTaskStatus(Long serverId, String taskId) { + FileUploadTask task = asyncFileUploadService.getTaskStatus(serverId, taskId); + + if (task == null) { + throw new BusinessException(ResponseCode.DATA_NOT_FOUND, + new Object[]{"任务不存在或不属于该服务器"}); + } + + return task; } /** * 取消上传任务 + * + * @param serverId 服务器ID + * @param taskId 任务ID + * @return 是否取消成功 */ - public boolean cancelUploadTask(String taskId) { - return asyncFileUploadService.cancelTask(taskId); + public boolean cancelUploadTask(Long serverId, String taskId) { + return asyncFileUploadService.cancelTask(serverId, taskId); } /** @@ -199,25 +217,38 @@ public class ServerSSHFileService extends AbstractSSHFileOperations { ProgressMonitorInputStream progressStream = new ProgressMonitorInputStream( inputStream, fileSize, uploadedBytes, progressCallback); - sftpClient.put(new InMemorySourceFile() { - @Override - public String getName() { - return fileName; - } + try { + sftpClient.put(new InMemorySourceFile() { + @Override + public String getName() { + return fileName; + } + + @Override + public long getLength() { + return fileSize; + } + + @Override + public InputStream getInputStream() { + return progressStream; + } + }, remotePath); - @Override - public long getLength() { - return fileSize; - } + FileAttributes attrs = sftpClient.stat(remotePath); + return convertToSSHFileInfo(remotePath, attrs); - @Override - public InputStream getInputStream() { - return progressStream; + } catch (UploadCancelledException e) { + // 🗑️ 用户取消上传,删除部分上传的文件 + try { + sftpClient.rm(remotePath); + log.info("已删除部分上传的文件: path={}, uploadedBytes={}/{}", + remotePath, uploadedBytes.get(), fileSize); + } catch (IOException deleteEx) { + log.warn("删除部分上传文件失败: path={}", remotePath, deleteEx); } - }, remotePath); - - FileAttributes attrs = sftpClient.stat(remotePath); - return convertToSSHFileInfo(remotePath, attrs); + throw e; // 重新抛出原异常 + } } } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/enums/ResponseCode.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/enums/ResponseCode.java index a715a4c9..fbfd9555 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/framework/enums/ResponseCode.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/enums/ResponseCode.java @@ -40,6 +40,7 @@ public enum ResponseCode { SSH_FILE_TIMEOUT(1211, "ssh.file.timeout"), SSH_FILE_OPERATION_FAILED(1212, "ssh.file.operation.failed"), SSH_FILE_UPLOAD_SIZE_EXCEEDED(1213, "ssh.file.upload.size.exceeded"), + SSH_FILE_UPLOAD_CANCELLED(1214, "ssh.file.upload.cancelled"), // 业务异常 (2开头) TENANT_NOT_FOUND(2001, "tenant.not.found"), diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/exception/UploadCancelledException.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/exception/UploadCancelledException.java new file mode 100644 index 00000000..d5827e44 --- /dev/null +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/exception/UploadCancelledException.java @@ -0,0 +1,15 @@ +package com.qqchen.deploy.backend.framework.exception; + +import com.qqchen.deploy.backend.framework.enums.ResponseCode; + +/** + * 上传取消异常 + * + * 当用户主动取消文件上传时抛出此异常 + */ +public class UploadCancelledException extends BusinessException { + + public UploadCancelledException() { + super(ResponseCode.SSH_FILE_UPLOAD_CANCELLED); + } +} diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/file/AsyncFileUploadService.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/file/AsyncFileUploadService.java index 692a9334..52a512a5 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/file/AsyncFileUploadService.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/file/AsyncFileUploadService.java @@ -1,5 +1,6 @@ package com.qqchen.deploy.backend.framework.ssh.file; +import com.qqchen.deploy.backend.framework.exception.UploadCancelledException; import com.qqchen.deploy.backend.framework.utils.FileUtils; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; @@ -10,6 +11,7 @@ import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.stereotype.Service; import java.util.Date; +import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -29,8 +31,16 @@ public class AsyncFileUploadService { private final AsyncTaskExecutor executor; // 虚拟线程池(来自SSHWebSocketConfig) - // 任务缓存(内存存储,实际生产环境建议用Redis) - private final Map tasks = new ConcurrentHashMap<>(); + // ========== 线程安全的任务存储 ========== + + // 按服务器分组存储任务(外层:serverId → 内层:taskId → Task) + private final Map> tasksByServer = new ConcurrentHashMap<>(); + + // 反向索引:taskId → serverId(快速查找任务归属) + private final Map taskToServer = new ConcurrentHashMap<>(); + + // 分段锁:每个服务器一个锁,减少锁竞争 + private final ConcurrentHashMap serverLocks = new ConcurrentHashMap<>(); /** * 构造函数 @@ -56,30 +66,42 @@ public class AsyncFileUploadService { /** * 提交上传任务 * + * @param serverId 服务器ID * @param fileName 文件名 * @param fileSize 文件大小 * @param uploadAction 上传动作(接收进度回调,执行实际上传) * @return 任务ID */ - public String submitTask(String fileName, long fileSize, + public String submitTask(Long serverId, String fileName, long fileSize, Consumer uploadAction) { - return submitTask(fileName, fileSize, null, uploadAction); + return submitTask(serverId, fileName, fileSize, null, uploadAction); } /** - * 提交上传任务(带元数据) + * 提交上传任务(带元数据,线程安全) * + * @param serverId 服务器ID * @param fileName 文件名 * @param fileSize 文件大小 - * @param metadata 元数据(如服务器ID、目标路径等) + * @param metadata 元数据(如目标路径、覆盖标记等) * @param uploadAction 上传动作(接收进度回调,执行实际上传) * @return 任务ID */ - public String submitTask(String fileName, long fileSize, Map metadata, + public String submitTask(Long serverId, String fileName, long fileSize, Map metadata, Consumer uploadAction) { // 生成任务ID String taskId = UUID.randomUUID().toString(); + // 初始化 metadata(如果为null) + if (metadata == null) { + metadata = new HashMap<>(); + } + + // 确保 metadata 包含 serverId(如果没有则添加) + if (!metadata.containsKey("serverId")) { + metadata.put("serverId", serverId); + } + // 创建任务 FileUploadTask task = FileUploadTask.builder() .taskId(taskId) @@ -95,13 +117,22 @@ public class AsyncFileUploadService { .updateTime(new Date()) .build(); - tasks.put(taskId, task); + // 🔒 获取服务器锁,保证原子性 + Object lock = serverLocks.computeIfAbsent(serverId, k -> new Object()); - log.info("提交上传任务: taskId={}, fileName={}, size={}", - taskId, fileName, FileUtils.formatFileSize(fileSize)); + synchronized (lock) { + // 同时更新两个 Map(原子操作) + tasksByServer.computeIfAbsent(serverId, k -> new ConcurrentHashMap<>()) + .put(taskId, task); + + taskToServer.put(taskId, serverId); + } + + log.info("提交上传任务: serverId={}, taskId={}, fileName={}, size={}", + serverId, taskId, fileName, FileUtils.formatFileSize(fileSize)); // 异步执行上传 - executor.submit(() -> executeUpload(taskId, task, uploadAction)); + executor.submit(() -> executeUpload(serverId, taskId, task, uploadAction)); return taskId; } @@ -109,7 +140,7 @@ public class AsyncFileUploadService { /** * 执行上传(异步) */ - private void executeUpload(String taskId, FileUploadTask task, + private void executeUpload(Long serverId, String taskId, FileUploadTask task, Consumer uploadAction) { try { // 更新状态为上传中 @@ -120,6 +151,13 @@ public class AsyncFileUploadService { // 创建进度回调 ProgressCallback progressCallback = (uploaded, total) -> { + // ⚠️ 检查任务是否被取消 + if (task.isCancelled()) { + log.info("检测到取消标记,中断上传: taskId={}, progress={}%", + taskId, (int) ((uploaded * 100) / total)); + throw new UploadCancelledException(); + } + // 计算进度 int progress = (int) ((uploaded * 100) / total); task.setUploadedSize(uploaded); @@ -143,12 +181,19 @@ public class AsyncFileUploadService { log.info("文件上传成功: taskId={}, fileName={}", taskId, task.getFileName()); } catch (Exception e) { - log.error("文件上传失败: taskId={}, fileName={}", taskId, task.getFileName(), e); - task.setStatus(TaskStatus.FAILED); - task.setErrorMessage(e.getMessage()); + // 判断是取消还是失败 + if (task.isCancelled()) { + log.info("上传已取消: taskId={}, fileName={}", taskId, task.getFileName()); + task.setStatus(TaskStatus.CANCELLED); + task.setErrorMessage("用户取消上传"); + } else { + log.error("文件上传失败: taskId={}, fileName={}", taskId, task.getFileName(), e); + task.setStatus(TaskStatus.FAILED); + task.setErrorMessage(e.getMessage()); + } task.setUpdateTime(new Date()); - // 发送失败通知 + // 发送状态更新通知 sendTaskUpdate(task); } } @@ -208,72 +253,162 @@ public class AsyncFileUploadService { } /** - * 查询任务状态 + * 查询任务状态(线程安全) * * @param taskId 任务ID * @return 任务信息 */ public FileUploadTask getTaskStatus(String taskId) { - return tasks.get(taskId); + Long serverId = taskToServer.get(taskId); + if (serverId == null) { + return null; + } + + Map serverTasks = tasksByServer.get(serverId); + return serverTasks != null ? serverTasks.get(taskId) : null; } /** - * 取消任务(仅能取消排队中的任务) + * 查询任务状态(带服务器验证,线程安全) * + * @param serverId 服务器ID + * @param taskId 任务ID + * @return 任务信息,如果任务不属于该服务器则返回null + */ + public FileUploadTask getTaskStatus(Long serverId, String taskId) { + // 验证任务归属 + Long actualServerId = taskToServer.get(taskId); + if (actualServerId == null || !actualServerId.equals(serverId)) { + return null; + } + + Map serverTasks = tasksByServer.get(serverId); + return serverTasks != null ? serverTasks.get(taskId) : null; + } + + /** + * 取消任务(支持取消排队中和上传中的任务,带服务器验证,线程安全) + * + * @param serverId 服务器ID * @param taskId 任务ID * @return 是否取消成功 */ - public boolean cancelTask(String taskId) { - FileUploadTask task = tasks.get(taskId); - if (task != null && task.getStatus() == TaskStatus.PENDING) { - task.setStatus(TaskStatus.CANCELLED); - task.setUpdateTime(new Date()); - sendTaskUpdate(task); - log.info("任务已取消: taskId={}", taskId); - return true; + public boolean cancelTask(Long serverId, String taskId) { + // 验证任务归属 + Long actualServerId = taskToServer.get(taskId); + if (actualServerId == null || !actualServerId.equals(serverId)) { + log.warn("任务不属于该服务器: taskId={}, requestServerId={}, actualServerId={}", + taskId, serverId, actualServerId); + return false; + } + + Object lock = serverLocks.get(serverId); + if (lock == null) { + return false; + } + + synchronized (lock) { + Map serverTasks = tasksByServer.get(serverId); + if (serverTasks == null) { + return false; + } + + FileUploadTask task = serverTasks.get(taskId); + if (task == null) { + return false; + } + + TaskStatus status = task.getStatus(); + + // ✅ 支持取消 PENDING 和 UPLOADING 状态的任务 + if (status == TaskStatus.PENDING || status == TaskStatus.UPLOADING) { + task.cancel(); // 设置取消标记 + + // 如果是 PENDING 状态,直接更新为 CANCELLED + if (status == TaskStatus.PENDING) { + task.setStatus(TaskStatus.CANCELLED); + task.setUpdateTime(new Date()); + sendTaskUpdate(task); + log.info("任务已取消: serverId={}, taskId={}, status=PENDING", serverId, taskId); + } else { + // UPLOADING 状态,会在下次进度回调时中断 + log.info("任务已标记为取消: serverId={}, taskId={}, status=UPLOADING(将在下次进度回调时中断)", + serverId, taskId); + } + + return true; + } } return false; } /** - * 清理已完成的任务(定时清理,避免内存泄漏) + * 清理已完成的任务(定时清理,避免内存泄漏,线程安全) * * @param olderThanHours 清理多少小时前的任务 * @return 清理的任务数 */ public int cleanupCompletedTasks(int olderThanHours) { long cutoffTime = System.currentTimeMillis() - (olderThanHours * 60 * 60 * 1000L); - int[] count = {0}; + int count = 0; - tasks.entrySet().removeIf(entry -> { - FileUploadTask task = entry.getValue(); - boolean shouldRemove = (task.getStatus() == TaskStatus.SUCCESS || - task.getStatus() == TaskStatus.FAILED || - task.getStatus() == TaskStatus.CANCELLED) - && task.getUpdateTime().getTime() < cutoffTime; - if (shouldRemove) { - count[0]++; - log.debug("清理旧任务: taskId={}, status={}", entry.getKey(), task.getStatus()); + // 遍历所有服务器 + for (Map.Entry> serverEntry : tasksByServer.entrySet()) { + Long serverId = serverEntry.getKey(); + Map serverTasks = serverEntry.getValue(); + + Object lock = serverLocks.get(serverId); + if (lock == null) { + continue; } - return shouldRemove; - }); + + synchronized (lock) { + // 清理该服务器的旧任务 + serverTasks.entrySet().removeIf(taskEntry -> { + FileUploadTask task = taskEntry.getValue(); + boolean shouldRemove = (task.getStatus() == TaskStatus.SUCCESS || + task.getStatus() == TaskStatus.FAILED || + task.getStatus() == TaskStatus.CANCELLED) + && task.getUpdateTime().getTime() < cutoffTime; + if (shouldRemove) { + // 同时从反向索引中删除 + taskToServer.remove(taskEntry.getKey()); + log.debug("清理旧任务: serverId={}, taskId={}, status={}", + serverId, taskEntry.getKey(), task.getStatus()); + } + return shouldRemove; + }); + + count += serverTasks.size(); + + // 如果服务器的任务列表空了,清理整个 Map + if (serverTasks.isEmpty()) { + tasksByServer.remove(serverId); + serverLocks.remove(serverId); + } + } + } - return count[0]; + return count; } /** - * 获取当前任务统计 + * 获取当前任务统计(线程安全) */ public Map getTaskStatistics() { - int pending = 0, uploading = 0, success = 0, failed = 0, cancelled = 0; + int pending = 0, uploading = 0, success = 0, failed = 0, cancelled = 0, total = 0; - for (FileUploadTask task : tasks.values()) { - switch (task.getStatus()) { - case PENDING -> pending++; - case UPLOADING -> uploading++; - case SUCCESS -> success++; - case FAILED -> failed++; - case CANCELLED -> cancelled++; + // 遍历所有服务器的任务 + for (Map serverTasks : tasksByServer.values()) { + for (FileUploadTask task : serverTasks.values()) { + total++; + switch (task.getStatus()) { + case PENDING -> pending++; + case UPLOADING -> uploading++; + case SUCCESS -> success++; + case FAILED -> failed++; + case CANCELLED -> cancelled++; + } } } @@ -283,7 +418,7 @@ public class AsyncFileUploadService { "success", success, "failed", failed, "cancelled", cancelled, - "total", tasks.size() + "total", total ); } @@ -292,7 +427,8 @@ public class AsyncFileUploadService { */ @PreDestroy public void shutdown() { - log.info("异步上传服务关闭,剩余任务: {}", tasks.size()); + int totalTasks = taskToServer.size(); + log.info("异步上传服务关闭,剩余任务: {}", totalTasks); // 虚拟线程池由Spring容器管理,无需手动关闭 } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/file/FileUploadTask.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/file/FileUploadTask.java index b54e91eb..98cf76ae 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/file/FileUploadTask.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/file/FileUploadTask.java @@ -79,4 +79,24 @@ public class FileUploadTask { * 更新时间 */ private Date updateTime; + + /** + * 取消标记(volatile 保证多线程可见性) + * 用于支持取消正在上传的任务 + */ + private volatile boolean cancelled = false; + + /** + * 标记任务为取消状态 + */ + public void cancel() { + this.cancelled = true; + } + + /** + * 检查任务是否已被取消 + */ + public boolean isCancelled() { + return cancelled; + } } diff --git a/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/websocket/AbstractSSHWebSocketHandler.java b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/websocket/AbstractSSHWebSocketHandler.java index 6eb566ec..b4419a3f 100644 --- a/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/websocket/AbstractSSHWebSocketHandler.java +++ b/backend/src/main/java/com/qqchen/deploy/backend/framework/ssh/websocket/AbstractSSHWebSocketHandler.java @@ -151,8 +151,6 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler { // 懒加载:如果没有,就生成并存储(保证一致性) sessionId = SessionIdGenerator.enhanceWebSocketSessionId(session.getId()); session.getAttributes().put("sshSessionId", sessionId); - log.debug("懒加载生成sessionId: webSocketId={}, sshSessionId={}", - session.getId(), sessionId); } return sessionId; } @@ -241,10 +239,7 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler { // 参数:终端类型, 列数, 行数, 宽度(像素), 高度(像素), 终端模式 sshSession.allocatePTY("xterm", 80, 24, 0, 0, java.util.Collections.emptyMap()); - log.debug("PTY已分配: sessionId={}, termType=xterm, cols=80, rows=24", sessionId); - Session.Shell shell = sshSession.startShell(); - log.debug("Shell已启动: sessionId={}", sessionId); // 保存会话信息 webSocketSessions.put(sessionId, session); @@ -376,9 +371,6 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler { shell.changeWindowDimensions(request.getCols(), request.getRows(), widthPixels, heightPixels); - - log.debug("SSH终端尺寸已调整: sessionId={}, cols={}, rows={}", - sessionId, request.getCols(), request.getRows()); } else { log.warn("未找到SSH Shell,无法调整尺寸: sessionId={}", sessionId); } @@ -495,53 +487,35 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler { * @param shell SSH Shell */ private void readSSHOutput(WebSocketSession session, Session.Shell shell) { - // 从session.attributes中获取增强后的sessionId String sessionId = getSessionId(session); - log.debug("开始监听SSH输出: sessionId={}", sessionId); try { InputStream inputStream = shell.getInputStream(); byte[] buffer = new byte[1024]; int len; - log.debug("SSH输出流已获取,开始循环读取: sessionId={}", sessionId); - while (session.isOpen() && (len = inputStream.read(buffer)) > 0) { String output = new String(buffer, 0, len, StandardCharsets.UTF_8); - log.debug("读取到SSH输出: sessionId={}, length={}, content={}", - sessionId, len, output.replaceAll("\\r", "\\\\r").replaceAll("\\n", "\\\\n")); - - log.debug("准备发送输出到前端: sessionId={}, session.isOpen={}", sessionId, session.isOpen()); sendOutput(session, output); - log.debug("SSH输出已发送到前端: sessionId={}", sessionId); } - log.debug("SSH输出监听结束: sessionId={}, session.isOpen={}", sessionId, session.isOpen()); - } catch (java.io.InterruptedIOException e) { - // 线程被中断(正常的清理过程),检查是否是WebSocket关闭导致的 - if (!session.isOpen()) { - log.debug("SSH输出监听线程被正常中断(WebSocket已关闭): sessionId={}", sessionId); - } else { - log.warn("SSH输出监听线程被中断,但WebSocket仍打开: sessionId={}", sessionId); - // 只在session仍然打开时尝试发送错误消息 + if (session.isOpen()) { + log.warn("SSH输出监听线程被中断: sessionId={}", sessionId); try { sendError(session, "SSH连接被中断"); } catch (Exception ex) { - log.debug("发送错误消息失败(session可能已关闭): sessionId={}", sessionId); + // 忽略 } } } catch (IOException e) { - // 其他IO异常(真正的错误) if (session.isOpen()) { log.error("读取SSH输出失败: sessionId={}", sessionId, e); try { sendError(session, "读取SSH输出失败: " + e.getMessage()); } catch (Exception ex) { - log.debug("发送错误消息失败(session可能已关闭): sessionId={}", sessionId); + // 忽略 } - } else { - log.debug("读取SSH输出时发生IO异常,但session已关闭(正常): sessionId={}", sessionId); } } } @@ -554,38 +528,25 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler { * @param shell SSH Shell */ private void readSSHError(WebSocketSession session, Session.Shell shell) { - // 从session.attributes中获取增强后的sessionId String sessionId = getSessionId(session); - log.debug("开始监听SSH错误流: sessionId={}", sessionId); try { InputStream errorStream = shell.getErrorStream(); byte[] buffer = new byte[1024]; int len; - log.debug("SSH错误流已获取,开始循环读取: sessionId={}", sessionId); - while (session.isOpen() && (len = errorStream.read(buffer)) > 0) { String output = new String(buffer, 0, len, StandardCharsets.UTF_8); - log.debug("读取到SSH错误流输出: sessionId={}, length={}, content={}", - sessionId, len, output.replaceAll("\\r", "\\\\r").replaceAll("\\n", "\\\\n")); - sendOutput(session, output); // 错误流也作为output发送到前端 - log.debug("SSH错误流输出已发送到前端: sessionId={}", sessionId); + sendOutput(session, output); } - log.debug("SSH错误流监听结束: sessionId={}", sessionId); - } catch (java.io.InterruptedIOException e) { - if (!session.isOpen()) { - log.debug("SSH错误流监听线程被正常中断(WebSocket已关闭): sessionId={}", sessionId); - } else { - log.warn("SSH错误流监听线程被中断,但WebSocket仍打开: sessionId={}", sessionId); + if (session.isOpen()) { + log.warn("SSH错误流监听线程被中断: sessionId={}", sessionId); } } catch (IOException e) { if (session.isOpen()) { log.error("读取SSH错误流失败: sessionId={}", sessionId, e); - } else { - log.debug("读取SSH错误流时发生IO异常,但session已关闭(正常): sessionId={}", sessionId); } } } @@ -611,7 +572,6 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler { * @param eventData 事件数据 */ private void cleanupSession(String sessionId, SSHEventData eventData) { - log.debug("清理会话资源: sessionId={}", sessionId); try { // 1. 触发断开前事件(异步,不阻塞清理) @@ -686,13 +646,10 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler { * @param output 输出内容 */ protected void sendOutput(WebSocketSession session, String output) { - // 从session.attributes中获取sessionId String sessionId = getSessionId(session); - log.debug("→ sendOutput开始: sessionId={}, outputLength={}", sessionId, output.length()); try { if (!session.isOpen()) { - log.warn("WebSocket已关闭,跳过发送输出: sessionId={}", sessionId); return; } @@ -706,11 +663,7 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler { // 创建消息 SSHWebSocketMessage msg = new SSHWebSocketMessage(SSHMessageType.OUTPUT, data); - - log.debug(" ├─ 准备调用session.sendMessage: sessionId={}", sessionId); session.sendMessage(new TextMessage(JsonUtils.toJson(msg))); - - log.debug(" └─ sendOutput完成: sessionId={}", sessionId); } catch (IOException e) { log.error("发送输出消息失败(IOException): sessionId={}", sessionId, e); } catch (Exception e) { @@ -724,7 +677,6 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler { protected void sendStatus(WebSocketSession session, SSHStatusEnum status) { try { if (!session.isOpen()) { - log.debug("WebSocket已关闭,跳过发送状态: sessionId={}", session.getId()); return; } @@ -750,7 +702,6 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler { protected void sendError(WebSocketSession session, String error) { try { if (!session.isOpen()) { - log.debug("WebSocket已关闭,跳过发送错误消息: sessionId={}", session.getId()); return; } diff --git a/backend/src/main/resources/messages.properties b/backend/src/main/resources/messages.properties index 0969a899..060a3529 100644 --- a/backend/src/main/resources/messages.properties +++ b/backend/src/main/resources/messages.properties @@ -47,6 +47,7 @@ ssh.file.io.error=文件IO错误 ssh.file.timeout=操作超时 ssh.file.operation.failed=文件操作失败 ssh.file.upload.size.exceeded=文件过大,最大允许上传 {0} +ssh.file.upload.cancelled=用户取消上传 # -------------------------------------------------------------------------------------- # 业务异常 (Business Exceptions) - 2xxx diff --git a/backend/src/main/resources/messages_en_US.properties b/backend/src/main/resources/messages_en_US.properties index 1ebf30a9..cd8876c8 100644 --- a/backend/src/main/resources/messages_en_US.properties +++ b/backend/src/main/resources/messages_en_US.properties @@ -47,6 +47,7 @@ ssh.file.io.error=File IO error ssh.file.timeout=Operation timeout ssh.file.operation.failed=File operation failed ssh.file.upload.size.exceeded=File too large, maximum upload size is {0} +ssh.file.upload.cancelled=Upload cancelled by user # -------------------------------------------------------------------------------------- # Business Exceptions - 2xxx diff --git a/backend/src/main/resources/messages_zh_CN.properties b/backend/src/main/resources/messages_zh_CN.properties index da8d1bad..d023ef8e 100644 --- a/backend/src/main/resources/messages_zh_CN.properties +++ b/backend/src/main/resources/messages_zh_CN.properties @@ -47,6 +47,7 @@ ssh.file.io.error=文件IO错误 ssh.file.timeout=操作超时 ssh.file.operation.failed=文件操作失败 ssh.file.upload.size.exceeded=文件过大,最大允许上传 {0} +ssh.file.upload.cancelled=用户取消上传 # -------------------------------------------------------------------------------------- # 业务异常 (Business Exceptions) - 2xxx