增加ssh链接框架

This commit is contained in:
dengqichen 2025-12-07 18:26:42 +08:00
parent 4d4ffabe05
commit ff149be46f
10 changed files with 296 additions and 142 deletions

View File

@ -178,10 +178,7 @@ public class ServerSSHFileApiController {
@PathVariable Long serverId, @PathVariable Long serverId,
@PathVariable String taskId @PathVariable String taskId
) { ) {
FileUploadTask task = serverSSHFileService.getUploadTaskStatus(taskId); FileUploadTask task = serverSSHFileService.getUploadTaskStatus(serverId, taskId);
if (task == null) {
return Response.error(com.qqchen.deploy.backend.framework.enums.ResponseCode.DATA_NOT_FOUND, "任务不存在");
}
return Response.success(task); return Response.success(task);
} }
@ -190,15 +187,15 @@ public class ServerSSHFileApiController {
* *
* @param serverId 服务器ID * @param serverId 服务器ID
* @param taskId 任务ID * @param taskId 任务ID
* @return 是否取消成功 * @return 是否取消成功true=取消成功false=无法取消
*/ */
@DeleteMapping("/{serverId}/files/upload-task/{taskId}") @DeleteMapping("/{serverId}/files/upload-task/{taskId}")
public Response<Map<String, Boolean>> cancelUploadTask( public Response<Boolean> cancelUploadTask(
@PathVariable Long serverId, @PathVariable Long serverId,
@PathVariable String taskId @PathVariable String taskId
) { ) {
boolean cancelled = serverSSHFileService.cancelUploadTask(taskId); boolean cancelled = serverSSHFileService.cancelUploadTask(serverId, taskId);
return Response.success(Map.of("cancelled", cancelled)); return Response.success(cancelled);
} }
/** /**

View File

@ -8,6 +8,7 @@ import com.qqchen.deploy.backend.framework.enums.ResponseCode;
import com.qqchen.deploy.backend.framework.enums.SSHEvent; import com.qqchen.deploy.backend.framework.enums.SSHEvent;
import com.qqchen.deploy.backend.framework.enums.SSHTargetType; import com.qqchen.deploy.backend.framework.enums.SSHTargetType;
import com.qqchen.deploy.backend.framework.exception.BusinessException; import com.qqchen.deploy.backend.framework.exception.BusinessException;
import com.qqchen.deploy.backend.framework.exception.UploadCancelledException;
import com.qqchen.deploy.backend.framework.ssh.SSHCommandServiceFactory; import com.qqchen.deploy.backend.framework.ssh.SSHCommandServiceFactory;
import com.qqchen.deploy.backend.framework.ssh.file.AbstractSSHFileOperations; import com.qqchen.deploy.backend.framework.ssh.file.AbstractSSHFileOperations;
import com.qqchen.deploy.backend.framework.ssh.file.AsyncFileUploadService; import com.qqchen.deploy.backend.framework.ssh.file.AsyncFileUploadService;
@ -28,6 +29,7 @@ import java.io.FilterInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Date; import java.util.Date;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -114,15 +116,15 @@ public class ServerSSHFileService extends AbstractSSHFileOperations {
throw new BusinessException(ResponseCode.INVALID_PARAM, new Object[]{"读取文件失败: " + e.getMessage()}); throw new BusinessException(ResponseCode.INVALID_PARAM, new Object[]{"读取文件失败: " + e.getMessage()});
} }
// 准备元数据 // 准备元数据使用HashMap可变
Map<String, Object> metadata = Map.of( Map<String, Object> metadata = new HashMap<>();
"serverId", serverId, metadata.put("serverId", serverId);
"remotePath", remotePath, metadata.put("remotePath", remotePath);
"overwrite", overwrite metadata.put("overwrite", overwrite);
);
// 提交任务到Framework层 // 提交任务到Framework层
return asyncFileUploadService.submitTask( return asyncFileUploadService.submitTask(
serverId, // 🔒 传递serverId
fileName, fileName,
fileSize, fileSize,
metadata, metadata,
@ -137,16 +139,32 @@ public class ServerSSHFileService extends AbstractSSHFileOperations {
/** /**
* 查询上传任务状态 * 查询上传任务状态
*
* @param serverId 服务器ID
* @param taskId 任务ID
* @return 任务信息
* @throws BusinessException 如果任务不存在或不属于该服务器
*/ */
public FileUploadTask getUploadTaskStatus(String taskId) { public FileUploadTask getUploadTaskStatus(Long serverId, String taskId) {
return asyncFileUploadService.getTaskStatus(taskId); FileUploadTask task = asyncFileUploadService.getTaskStatus(serverId, taskId);
if (task == null) {
throw new BusinessException(ResponseCode.DATA_NOT_FOUND,
new Object[]{"任务不存在或不属于该服务器"});
}
return task;
} }
/** /**
* 取消上传任务 * 取消上传任务
*
* @param serverId 服务器ID
* @param taskId 任务ID
* @return 是否取消成功
*/ */
public boolean cancelUploadTask(String taskId) { public boolean cancelUploadTask(Long serverId, String taskId) {
return asyncFileUploadService.cancelTask(taskId); return asyncFileUploadService.cancelTask(serverId, taskId);
} }
/** /**
@ -199,6 +217,7 @@ public class ServerSSHFileService extends AbstractSSHFileOperations {
ProgressMonitorInputStream progressStream = new ProgressMonitorInputStream( ProgressMonitorInputStream progressStream = new ProgressMonitorInputStream(
inputStream, fileSize, uploadedBytes, progressCallback); inputStream, fileSize, uploadedBytes, progressCallback);
try {
sftpClient.put(new InMemorySourceFile() { sftpClient.put(new InMemorySourceFile() {
@Override @Override
public String getName() { public String getName() {
@ -218,6 +237,18 @@ public class ServerSSHFileService extends AbstractSSHFileOperations {
FileAttributes attrs = sftpClient.stat(remotePath); FileAttributes attrs = sftpClient.stat(remotePath);
return convertToSSHFileInfo(remotePath, attrs); return convertToSSHFileInfo(remotePath, attrs);
} catch (UploadCancelledException e) {
// 🗑 用户取消上传删除部分上传的文件
try {
sftpClient.rm(remotePath);
log.info("已删除部分上传的文件: path={}, uploadedBytes={}/{}",
remotePath, uploadedBytes.get(), fileSize);
} catch (IOException deleteEx) {
log.warn("删除部分上传文件失败: path={}", remotePath, deleteEx);
}
throw e; // 重新抛出原异常
}
} }
} }

View File

@ -40,6 +40,7 @@ public enum ResponseCode {
SSH_FILE_TIMEOUT(1211, "ssh.file.timeout"), SSH_FILE_TIMEOUT(1211, "ssh.file.timeout"),
SSH_FILE_OPERATION_FAILED(1212, "ssh.file.operation.failed"), SSH_FILE_OPERATION_FAILED(1212, "ssh.file.operation.failed"),
SSH_FILE_UPLOAD_SIZE_EXCEEDED(1213, "ssh.file.upload.size.exceeded"), SSH_FILE_UPLOAD_SIZE_EXCEEDED(1213, "ssh.file.upload.size.exceeded"),
SSH_FILE_UPLOAD_CANCELLED(1214, "ssh.file.upload.cancelled"),
// 业务异常 (2开头) // 业务异常 (2开头)
TENANT_NOT_FOUND(2001, "tenant.not.found"), TENANT_NOT_FOUND(2001, "tenant.not.found"),

View File

@ -0,0 +1,15 @@
package com.qqchen.deploy.backend.framework.exception;
import com.qqchen.deploy.backend.framework.enums.ResponseCode;
/**
* 上传取消异常
*
* 当用户主动取消文件上传时抛出此异常
*/
public class UploadCancelledException extends BusinessException {
public UploadCancelledException() {
super(ResponseCode.SSH_FILE_UPLOAD_CANCELLED);
}
}

View File

@ -1,5 +1,6 @@
package com.qqchen.deploy.backend.framework.ssh.file; package com.qqchen.deploy.backend.framework.ssh.file;
import com.qqchen.deploy.backend.framework.exception.UploadCancelledException;
import com.qqchen.deploy.backend.framework.utils.FileUtils; import com.qqchen.deploy.backend.framework.utils.FileUtils;
import jakarta.annotation.PreDestroy; import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -10,6 +11,7 @@ import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.Date; import java.util.Date;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -29,8 +31,16 @@ public class AsyncFileUploadService {
private final AsyncTaskExecutor executor; // 虚拟线程池来自SSHWebSocketConfig private final AsyncTaskExecutor executor; // 虚拟线程池来自SSHWebSocketConfig
// 任务缓存内存存储实际生产环境建议用Redis // ========== 线程安全的任务存储 ==========
private final Map<String, FileUploadTask> tasks = new ConcurrentHashMap<>();
// 按服务器分组存储任务外层serverId 内层taskId Task
private final Map<Long, Map<String, FileUploadTask>> tasksByServer = new ConcurrentHashMap<>();
// 反向索引taskId serverId快速查找任务归属
private final Map<String, Long> taskToServer = new ConcurrentHashMap<>();
// 分段锁每个服务器一个锁减少锁竞争
private final ConcurrentHashMap<Long, Object> serverLocks = new ConcurrentHashMap<>();
/** /**
* 构造函数 * 构造函数
@ -56,30 +66,42 @@ public class AsyncFileUploadService {
/** /**
* 提交上传任务 * 提交上传任务
* *
* @param serverId 服务器ID
* @param fileName 文件名 * @param fileName 文件名
* @param fileSize 文件大小 * @param fileSize 文件大小
* @param uploadAction 上传动作接收进度回调执行实际上传 * @param uploadAction 上传动作接收进度回调执行实际上传
* @return 任务ID * @return 任务ID
*/ */
public String submitTask(String fileName, long fileSize, public String submitTask(Long serverId, String fileName, long fileSize,
Consumer<ProgressCallback> uploadAction) { Consumer<ProgressCallback> uploadAction) {
return submitTask(fileName, fileSize, null, uploadAction); return submitTask(serverId, fileName, fileSize, null, uploadAction);
} }
/** /**
* 提交上传任务带元数据 * 提交上传任务带元数据线程安全
* *
* @param serverId 服务器ID
* @param fileName 文件名 * @param fileName 文件名
* @param fileSize 文件大小 * @param fileSize 文件大小
* @param metadata 元数据服务器ID目标路径 * @param metadata 元数据目标路径覆盖标记
* @param uploadAction 上传动作接收进度回调执行实际上传 * @param uploadAction 上传动作接收进度回调执行实际上传
* @return 任务ID * @return 任务ID
*/ */
public String submitTask(String fileName, long fileSize, Map<String, Object> metadata, public String submitTask(Long serverId, String fileName, long fileSize, Map<String, Object> metadata,
Consumer<ProgressCallback> uploadAction) { Consumer<ProgressCallback> uploadAction) {
// 生成任务ID // 生成任务ID
String taskId = UUID.randomUUID().toString(); String taskId = UUID.randomUUID().toString();
// 初始化 metadata如果为null
if (metadata == null) {
metadata = new HashMap<>();
}
// 确保 metadata 包含 serverId如果没有则添加
if (!metadata.containsKey("serverId")) {
metadata.put("serverId", serverId);
}
// 创建任务 // 创建任务
FileUploadTask task = FileUploadTask.builder() FileUploadTask task = FileUploadTask.builder()
.taskId(taskId) .taskId(taskId)
@ -95,13 +117,22 @@ public class AsyncFileUploadService {
.updateTime(new Date()) .updateTime(new Date())
.build(); .build();
tasks.put(taskId, task); // 🔒 获取服务器锁保证原子性
Object lock = serverLocks.computeIfAbsent(serverId, k -> new Object());
log.info("提交上传任务: taskId={}, fileName={}, size={}", synchronized (lock) {
taskId, fileName, FileUtils.formatFileSize(fileSize)); // 同时更新两个 Map原子操作
tasksByServer.computeIfAbsent(serverId, k -> new ConcurrentHashMap<>())
.put(taskId, task);
taskToServer.put(taskId, serverId);
}
log.info("提交上传任务: serverId={}, taskId={}, fileName={}, size={}",
serverId, taskId, fileName, FileUtils.formatFileSize(fileSize));
// 异步执行上传 // 异步执行上传
executor.submit(() -> executeUpload(taskId, task, uploadAction)); executor.submit(() -> executeUpload(serverId, taskId, task, uploadAction));
return taskId; return taskId;
} }
@ -109,7 +140,7 @@ public class AsyncFileUploadService {
/** /**
* 执行上传异步 * 执行上传异步
*/ */
private void executeUpload(String taskId, FileUploadTask task, private void executeUpload(Long serverId, String taskId, FileUploadTask task,
Consumer<ProgressCallback> uploadAction) { Consumer<ProgressCallback> uploadAction) {
try { try {
// 更新状态为上传中 // 更新状态为上传中
@ -120,6 +151,13 @@ public class AsyncFileUploadService {
// 创建进度回调 // 创建进度回调
ProgressCallback progressCallback = (uploaded, total) -> { ProgressCallback progressCallback = (uploaded, total) -> {
// 检查任务是否被取消
if (task.isCancelled()) {
log.info("检测到取消标记,中断上传: taskId={}, progress={}%",
taskId, (int) ((uploaded * 100) / total));
throw new UploadCancelledException();
}
// 计算进度 // 计算进度
int progress = (int) ((uploaded * 100) / total); int progress = (int) ((uploaded * 100) / total);
task.setUploadedSize(uploaded); task.setUploadedSize(uploaded);
@ -143,12 +181,19 @@ public class AsyncFileUploadService {
log.info("文件上传成功: taskId={}, fileName={}", taskId, task.getFileName()); log.info("文件上传成功: taskId={}, fileName={}", taskId, task.getFileName());
} catch (Exception e) { } catch (Exception e) {
// 判断是取消还是失败
if (task.isCancelled()) {
log.info("上传已取消: taskId={}, fileName={}", taskId, task.getFileName());
task.setStatus(TaskStatus.CANCELLED);
task.setErrorMessage("用户取消上传");
} else {
log.error("文件上传失败: taskId={}, fileName={}", taskId, task.getFileName(), e); log.error("文件上传失败: taskId={}, fileName={}", taskId, task.getFileName(), e);
task.setStatus(TaskStatus.FAILED); task.setStatus(TaskStatus.FAILED);
task.setErrorMessage(e.getMessage()); task.setErrorMessage(e.getMessage());
}
task.setUpdateTime(new Date()); task.setUpdateTime(new Date());
// 发送失败通知 // 发送状态更新通知
sendTaskUpdate(task); sendTaskUpdate(task);
} }
} }
@ -208,66 +253,155 @@ public class AsyncFileUploadService {
} }
/** /**
* 查询任务状态 * 查询任务状态线程安全
* *
* @param taskId 任务ID * @param taskId 任务ID
* @return 任务信息 * @return 任务信息
*/ */
public FileUploadTask getTaskStatus(String taskId) { public FileUploadTask getTaskStatus(String taskId) {
return tasks.get(taskId); Long serverId = taskToServer.get(taskId);
if (serverId == null) {
return null;
}
Map<String, FileUploadTask> serverTasks = tasksByServer.get(serverId);
return serverTasks != null ? serverTasks.get(taskId) : null;
} }
/** /**
* 取消任务仅能取消排队中的任务 * 查询任务状态带服务器验证线程安全
* *
* @param serverId 服务器ID
* @param taskId 任务ID
* @return 任务信息如果任务不属于该服务器则返回null
*/
public FileUploadTask getTaskStatus(Long serverId, String taskId) {
// 验证任务归属
Long actualServerId = taskToServer.get(taskId);
if (actualServerId == null || !actualServerId.equals(serverId)) {
return null;
}
Map<String, FileUploadTask> serverTasks = tasksByServer.get(serverId);
return serverTasks != null ? serverTasks.get(taskId) : null;
}
/**
* 取消任务支持取消排队中和上传中的任务带服务器验证线程安全
*
* @param serverId 服务器ID
* @param taskId 任务ID * @param taskId 任务ID
* @return 是否取消成功 * @return 是否取消成功
*/ */
public boolean cancelTask(String taskId) { public boolean cancelTask(Long serverId, String taskId) {
FileUploadTask task = tasks.get(taskId); // 验证任务归属
if (task != null && task.getStatus() == TaskStatus.PENDING) { Long actualServerId = taskToServer.get(taskId);
if (actualServerId == null || !actualServerId.equals(serverId)) {
log.warn("任务不属于该服务器: taskId={}, requestServerId={}, actualServerId={}",
taskId, serverId, actualServerId);
return false;
}
Object lock = serverLocks.get(serverId);
if (lock == null) {
return false;
}
synchronized (lock) {
Map<String, FileUploadTask> serverTasks = tasksByServer.get(serverId);
if (serverTasks == null) {
return false;
}
FileUploadTask task = serverTasks.get(taskId);
if (task == null) {
return false;
}
TaskStatus status = task.getStatus();
// 支持取消 PENDING UPLOADING 状态的任务
if (status == TaskStatus.PENDING || status == TaskStatus.UPLOADING) {
task.cancel(); // 设置取消标记
// 如果是 PENDING 状态直接更新为 CANCELLED
if (status == TaskStatus.PENDING) {
task.setStatus(TaskStatus.CANCELLED); task.setStatus(TaskStatus.CANCELLED);
task.setUpdateTime(new Date()); task.setUpdateTime(new Date());
sendTaskUpdate(task); sendTaskUpdate(task);
log.info("任务已取消: taskId={}", taskId); log.info("任务已取消: serverId={}, taskId={}, status=PENDING", serverId, taskId);
} else {
// UPLOADING 状态会在下次进度回调时中断
log.info("任务已标记为取消: serverId={}, taskId={}, status=UPLOADING将在下次进度回调时中断",
serverId, taskId);
}
return true; return true;
} }
}
return false; return false;
} }
/** /**
* 清理已完成的任务定时清理避免内存泄漏 * 清理已完成的任务定时清理避免内存泄漏线程安全
* *
* @param olderThanHours 清理多少小时前的任务 * @param olderThanHours 清理多少小时前的任务
* @return 清理的任务数 * @return 清理的任务数
*/ */
public int cleanupCompletedTasks(int olderThanHours) { public int cleanupCompletedTasks(int olderThanHours) {
long cutoffTime = System.currentTimeMillis() - (olderThanHours * 60 * 60 * 1000L); long cutoffTime = System.currentTimeMillis() - (olderThanHours * 60 * 60 * 1000L);
int[] count = {0}; int count = 0;
tasks.entrySet().removeIf(entry -> { // 遍历所有服务器
FileUploadTask task = entry.getValue(); for (Map.Entry<Long, Map<String, FileUploadTask>> serverEntry : tasksByServer.entrySet()) {
Long serverId = serverEntry.getKey();
Map<String, FileUploadTask> serverTasks = serverEntry.getValue();
Object lock = serverLocks.get(serverId);
if (lock == null) {
continue;
}
synchronized (lock) {
// 清理该服务器的旧任务
serverTasks.entrySet().removeIf(taskEntry -> {
FileUploadTask task = taskEntry.getValue();
boolean shouldRemove = (task.getStatus() == TaskStatus.SUCCESS || boolean shouldRemove = (task.getStatus() == TaskStatus.SUCCESS ||
task.getStatus() == TaskStatus.FAILED || task.getStatus() == TaskStatus.FAILED ||
task.getStatus() == TaskStatus.CANCELLED) task.getStatus() == TaskStatus.CANCELLED)
&& task.getUpdateTime().getTime() < cutoffTime; && task.getUpdateTime().getTime() < cutoffTime;
if (shouldRemove) { if (shouldRemove) {
count[0]++; // 同时从反向索引中删除
log.debug("清理旧任务: taskId={}, status={}", entry.getKey(), task.getStatus()); taskToServer.remove(taskEntry.getKey());
log.debug("清理旧任务: serverId={}, taskId={}, status={}",
serverId, taskEntry.getKey(), task.getStatus());
} }
return shouldRemove; return shouldRemove;
}); });
return count[0]; count += serverTasks.size();
// 如果服务器的任务列表空了清理整个 Map
if (serverTasks.isEmpty()) {
tasksByServer.remove(serverId);
serverLocks.remove(serverId);
}
}
}
return count;
} }
/** /**
* 获取当前任务统计 * 获取当前任务统计线程安全
*/ */
public Map<String, Integer> getTaskStatistics() { public Map<String, Integer> getTaskStatistics() {
int pending = 0, uploading = 0, success = 0, failed = 0, cancelled = 0; int pending = 0, uploading = 0, success = 0, failed = 0, cancelled = 0, total = 0;
for (FileUploadTask task : tasks.values()) { // 遍历所有服务器的任务
for (Map<String, FileUploadTask> serverTasks : tasksByServer.values()) {
for (FileUploadTask task : serverTasks.values()) {
total++;
switch (task.getStatus()) { switch (task.getStatus()) {
case PENDING -> pending++; case PENDING -> pending++;
case UPLOADING -> uploading++; case UPLOADING -> uploading++;
@ -276,6 +410,7 @@ public class AsyncFileUploadService {
case CANCELLED -> cancelled++; case CANCELLED -> cancelled++;
} }
} }
}
return Map.of( return Map.of(
"pending", pending, "pending", pending,
@ -283,7 +418,7 @@ public class AsyncFileUploadService {
"success", success, "success", success,
"failed", failed, "failed", failed,
"cancelled", cancelled, "cancelled", cancelled,
"total", tasks.size() "total", total
); );
} }
@ -292,7 +427,8 @@ public class AsyncFileUploadService {
*/ */
@PreDestroy @PreDestroy
public void shutdown() { public void shutdown() {
log.info("异步上传服务关闭,剩余任务: {}", tasks.size()); int totalTasks = taskToServer.size();
log.info("异步上传服务关闭,剩余任务: {}", totalTasks);
// 虚拟线程池由Spring容器管理无需手动关闭 // 虚拟线程池由Spring容器管理无需手动关闭
} }

View File

@ -79,4 +79,24 @@ public class FileUploadTask {
* 更新时间 * 更新时间
*/ */
private Date updateTime; private Date updateTime;
/**
* 取消标记volatile 保证多线程可见性
* 用于支持取消正在上传的任务
*/
private volatile boolean cancelled = false;
/**
* 标记任务为取消状态
*/
public void cancel() {
this.cancelled = true;
}
/**
* 检查任务是否已被取消
*/
public boolean isCancelled() {
return cancelled;
}
} }

View File

@ -151,8 +151,6 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler {
// 懒加载如果没有就生成并存储保证一致性 // 懒加载如果没有就生成并存储保证一致性
sessionId = SessionIdGenerator.enhanceWebSocketSessionId(session.getId()); sessionId = SessionIdGenerator.enhanceWebSocketSessionId(session.getId());
session.getAttributes().put("sshSessionId", sessionId); session.getAttributes().put("sshSessionId", sessionId);
log.debug("懒加载生成sessionId: webSocketId={}, sshSessionId={}",
session.getId(), sessionId);
} }
return sessionId; return sessionId;
} }
@ -241,10 +239,7 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler {
// 参数终端类型, 列数, 行数, 宽度(像素), 高度(像素), 终端模式 // 参数终端类型, 列数, 行数, 宽度(像素), 高度(像素), 终端模式
sshSession.allocatePTY("xterm", 80, 24, 0, 0, java.util.Collections.emptyMap()); sshSession.allocatePTY("xterm", 80, 24, 0, 0, java.util.Collections.emptyMap());
log.debug("PTY已分配: sessionId={}, termType=xterm, cols=80, rows=24", sessionId);
Session.Shell shell = sshSession.startShell(); Session.Shell shell = sshSession.startShell();
log.debug("Shell已启动: sessionId={}", sessionId);
// 保存会话信息 // 保存会话信息
webSocketSessions.put(sessionId, session); webSocketSessions.put(sessionId, session);
@ -376,9 +371,6 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler {
shell.changeWindowDimensions(request.getCols(), request.getRows(), shell.changeWindowDimensions(request.getCols(), request.getRows(),
widthPixels, heightPixels); widthPixels, heightPixels);
log.debug("SSH终端尺寸已调整: sessionId={}, cols={}, rows={}",
sessionId, request.getCols(), request.getRows());
} else { } else {
log.warn("未找到SSH Shell无法调整尺寸: sessionId={}", sessionId); log.warn("未找到SSH Shell无法调整尺寸: sessionId={}", sessionId);
} }
@ -495,53 +487,35 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler {
* @param shell SSH Shell * @param shell SSH Shell
*/ */
private void readSSHOutput(WebSocketSession session, Session.Shell shell) { private void readSSHOutput(WebSocketSession session, Session.Shell shell) {
// 从session.attributes中获取增强后的sessionId
String sessionId = getSessionId(session); String sessionId = getSessionId(session);
log.debug("开始监听SSH输出: sessionId={}", sessionId);
try { try {
InputStream inputStream = shell.getInputStream(); InputStream inputStream = shell.getInputStream();
byte[] buffer = new byte[1024]; byte[] buffer = new byte[1024];
int len; int len;
log.debug("SSH输出流已获取开始循环读取: sessionId={}", sessionId);
while (session.isOpen() && (len = inputStream.read(buffer)) > 0) { while (session.isOpen() && (len = inputStream.read(buffer)) > 0) {
String output = new String(buffer, 0, len, StandardCharsets.UTF_8); String output = new String(buffer, 0, len, StandardCharsets.UTF_8);
log.debug("读取到SSH输出: sessionId={}, length={}, content={}",
sessionId, len, output.replaceAll("\\r", "\\\\r").replaceAll("\\n", "\\\\n"));
log.debug("准备发送输出到前端: sessionId={}, session.isOpen={}", sessionId, session.isOpen());
sendOutput(session, output); sendOutput(session, output);
log.debug("SSH输出已发送到前端: sessionId={}", sessionId);
} }
log.debug("SSH输出监听结束: sessionId={}, session.isOpen={}", sessionId, session.isOpen());
} catch (java.io.InterruptedIOException e) { } catch (java.io.InterruptedIOException e) {
// 线程被中断正常的清理过程检查是否是WebSocket关闭导致的 if (session.isOpen()) {
if (!session.isOpen()) { log.warn("SSH输出监听线程被中断: sessionId={}", sessionId);
log.debug("SSH输出监听线程被正常中断WebSocket已关闭: sessionId={}", sessionId);
} else {
log.warn("SSH输出监听线程被中断但WebSocket仍打开: sessionId={}", sessionId);
// 只在session仍然打开时尝试发送错误消息
try { try {
sendError(session, "SSH连接被中断"); sendError(session, "SSH连接被中断");
} catch (Exception ex) { } catch (Exception ex) {
log.debug("发送错误消息失败session可能已关闭: sessionId={}", sessionId); // 忽略
} }
} }
} catch (IOException e) { } catch (IOException e) {
// 其他IO异常真正的错误
if (session.isOpen()) { if (session.isOpen()) {
log.error("读取SSH输出失败: sessionId={}", sessionId, e); log.error("读取SSH输出失败: sessionId={}", sessionId, e);
try { try {
sendError(session, "读取SSH输出失败: " + e.getMessage()); sendError(session, "读取SSH输出失败: " + e.getMessage());
} catch (Exception ex) { } catch (Exception ex) {
log.debug("发送错误消息失败session可能已关闭: sessionId={}", sessionId); // 忽略
} }
} else {
log.debug("读取SSH输出时发生IO异常但session已关闭正常: sessionId={}", sessionId);
} }
} }
} }
@ -554,38 +528,25 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler {
* @param shell SSH Shell * @param shell SSH Shell
*/ */
private void readSSHError(WebSocketSession session, Session.Shell shell) { private void readSSHError(WebSocketSession session, Session.Shell shell) {
// 从session.attributes中获取增强后的sessionId
String sessionId = getSessionId(session); String sessionId = getSessionId(session);
log.debug("开始监听SSH错误流: sessionId={}", sessionId);
try { try {
InputStream errorStream = shell.getErrorStream(); InputStream errorStream = shell.getErrorStream();
byte[] buffer = new byte[1024]; byte[] buffer = new byte[1024];
int len; int len;
log.debug("SSH错误流已获取开始循环读取: sessionId={}", sessionId);
while (session.isOpen() && (len = errorStream.read(buffer)) > 0) { while (session.isOpen() && (len = errorStream.read(buffer)) > 0) {
String output = new String(buffer, 0, len, StandardCharsets.UTF_8); String output = new String(buffer, 0, len, StandardCharsets.UTF_8);
log.debug("读取到SSH错误流输出: sessionId={}, length={}, content={}", sendOutput(session, output);
sessionId, len, output.replaceAll("\\r", "\\\\r").replaceAll("\\n", "\\\\n"));
sendOutput(session, output); // 错误流也作为output发送到前端
log.debug("SSH错误流输出已发送到前端: sessionId={}", sessionId);
} }
log.debug("SSH错误流监听结束: sessionId={}", sessionId);
} catch (java.io.InterruptedIOException e) { } catch (java.io.InterruptedIOException e) {
if (!session.isOpen()) { if (session.isOpen()) {
log.debug("SSH错误流监听线程被正常中断WebSocket已关闭: sessionId={}", sessionId); log.warn("SSH错误流监听线程被中断: sessionId={}", sessionId);
} else {
log.warn("SSH错误流监听线程被中断但WebSocket仍打开: sessionId={}", sessionId);
} }
} catch (IOException e) { } catch (IOException e) {
if (session.isOpen()) { if (session.isOpen()) {
log.error("读取SSH错误流失败: sessionId={}", sessionId, e); log.error("读取SSH错误流失败: sessionId={}", sessionId, e);
} else {
log.debug("读取SSH错误流时发生IO异常但session已关闭正常: sessionId={}", sessionId);
} }
} }
} }
@ -611,7 +572,6 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler {
* @param eventData 事件数据 * @param eventData 事件数据
*/ */
private void cleanupSession(String sessionId, SSHEventData eventData) { private void cleanupSession(String sessionId, SSHEventData eventData) {
log.debug("清理会话资源: sessionId={}", sessionId);
try { try {
// 1. 触发断开前事件异步不阻塞清理 // 1. 触发断开前事件异步不阻塞清理
@ -686,13 +646,10 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler {
* @param output 输出内容 * @param output 输出内容
*/ */
protected void sendOutput(WebSocketSession session, String output) { protected void sendOutput(WebSocketSession session, String output) {
// 从session.attributes中获取sessionId
String sessionId = getSessionId(session); String sessionId = getSessionId(session);
log.debug("→ sendOutput开始: sessionId={}, outputLength={}", sessionId, output.length());
try { try {
if (!session.isOpen()) { if (!session.isOpen()) {
log.warn("WebSocket已关闭跳过发送输出: sessionId={}", sessionId);
return; return;
} }
@ -706,11 +663,7 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler {
// 创建消息 // 创建消息
SSHWebSocketMessage msg = new SSHWebSocketMessage(SSHMessageType.OUTPUT, data); SSHWebSocketMessage msg = new SSHWebSocketMessage(SSHMessageType.OUTPUT, data);
log.debug(" ├─ 准备调用session.sendMessage: sessionId={}", sessionId);
session.sendMessage(new TextMessage(JsonUtils.toJson(msg))); session.sendMessage(new TextMessage(JsonUtils.toJson(msg)));
log.debug(" └─ sendOutput完成: sessionId={}", sessionId);
} catch (IOException e) { } catch (IOException e) {
log.error("发送输出消息失败(IOException): sessionId={}", sessionId, e); log.error("发送输出消息失败(IOException): sessionId={}", sessionId, e);
} catch (Exception e) { } catch (Exception e) {
@ -724,7 +677,6 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler {
protected void sendStatus(WebSocketSession session, SSHStatusEnum status) { protected void sendStatus(WebSocketSession session, SSHStatusEnum status) {
try { try {
if (!session.isOpen()) { if (!session.isOpen()) {
log.debug("WebSocket已关闭跳过发送状态: sessionId={}", session.getId());
return; return;
} }
@ -750,7 +702,6 @@ public abstract class AbstractSSHWebSocketHandler extends TextWebSocketHandler {
protected void sendError(WebSocketSession session, String error) { protected void sendError(WebSocketSession session, String error) {
try { try {
if (!session.isOpen()) { if (!session.isOpen()) {
log.debug("WebSocket已关闭跳过发送错误消息: sessionId={}", session.getId());
return; return;
} }

View File

@ -47,6 +47,7 @@ ssh.file.io.error=文件IO错误
ssh.file.timeout=操作超时 ssh.file.timeout=操作超时
ssh.file.operation.failed=文件操作失败 ssh.file.operation.failed=文件操作失败
ssh.file.upload.size.exceeded=文件过大,最大允许上传 {0} ssh.file.upload.size.exceeded=文件过大,最大允许上传 {0}
ssh.file.upload.cancelled=用户取消上传
# -------------------------------------------------------------------------------------- # --------------------------------------------------------------------------------------
# 业务异常 (Business Exceptions) - 2xxx # 业务异常 (Business Exceptions) - 2xxx

View File

@ -47,6 +47,7 @@ ssh.file.io.error=File IO error
ssh.file.timeout=Operation timeout ssh.file.timeout=Operation timeout
ssh.file.operation.failed=File operation failed ssh.file.operation.failed=File operation failed
ssh.file.upload.size.exceeded=File too large, maximum upload size is {0} ssh.file.upload.size.exceeded=File too large, maximum upload size is {0}
ssh.file.upload.cancelled=Upload cancelled by user
# -------------------------------------------------------------------------------------- # --------------------------------------------------------------------------------------
# Business Exceptions - 2xxx # Business Exceptions - 2xxx

View File

@ -47,6 +47,7 @@ ssh.file.io.error=文件IO错误
ssh.file.timeout=操作超时 ssh.file.timeout=操作超时
ssh.file.operation.failed=文件操作失败 ssh.file.operation.failed=文件操作失败
ssh.file.upload.size.exceeded=文件过大,最大允许上传 {0} ssh.file.upload.size.exceeded=文件过大,最大允许上传 {0}
ssh.file.upload.cancelled=用户取消上传
# -------------------------------------------------------------------------------------- # --------------------------------------------------------------------------------------
# 业务异常 (Business Exceptions) - 2xxx # 业务异常 (Business Exceptions) - 2xxx