""" SSH隧道管理器 通过SSH隧道建立本地SOCKS5代理,用于访问境外数据源 """ import os import sys import time import subprocess from pathlib import Path from typing import Optional class SSHTunnelManager: """SSH隧道管理器""" def __init__(self, config: dict): """ 初始化SSH隧道 Args: config: SSH配置字典 - host: SSH服务器地址 - port: SSH端口(默认22) - username: SSH用户名 - key_path: SSH私钥路径(相对或绝对) - local_port: 本地SOCKS5端口(默认1080) """ self.enabled = config.get("enabled", False) self.host = config.get("host", "") self.port = config.get("port", 22) self.username = config.get("username", "root") self.local_port = config.get("local_port", 1080) self._process: Optional[subprocess.Popen] = None # 处理 key_path:相对路径转换为绝对路径 key_path = config.get("key_path", "") if key_path and not os.path.isabs(key_path): # 相对于项目根目录 project_root = Path(__file__).parent.parent key_path = str(project_root / key_path) self.key_path = key_path def _cleanup_old_processes(self): """清理残留的同端口SSH进程""" try: # 查找监听同一端口的SSH进程 result = subprocess.run( ['pgrep', '-f', f'ssh.*-D.*{self.local_port}'], capture_output=True, text=True ) if result.returncode == 0 and result.stdout.strip(): pids = result.stdout.strip().split('\n') for pid in pids: try: subprocess.run(['kill', '-9', pid], check=True) print(f" 清理残留SSH进程: PID {pid}") except subprocess.CalledProcessError: pass except Exception: pass # pgrep不可用或其他问题,忽略 def start(self) -> bool: """启动SSH隧道""" if not self.enabled: return True if not all([self.host, self.username, self.key_path]): print("SSH配置不完整,跳过隧道建立") return False # 先清理残留的同端口SSH进程 self._cleanup_old_processes() print(f"建立SSH隧道: {self.host}:{self.port} -> 本地SOCKS5端口 {self.local_port}") cmd = [ "ssh", "-N", "-D", f"127.0.0.1:{self.local_port}", "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null", "-i", self.key_path, "-p", str(self.port), f"{self.username}@{self.host}" ] try: self._process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) time.sleep(2) if self._process.poll() is not None: stdout, stderr = self._process.communicate() print("✗ SSH隧道建立失败") if stderr: print(f"错误: {stderr.decode()}") return False # 设置代理环境变量 # 使用 socks5h:// 让代理服务器远程解析DNS,避免IPv6问题 proxy_url = f"socks5h://127.0.0.1:{self.local_port}" os.environ["HTTP_PROXY"] = proxy_url os.environ["HTTPS_PROXY"] = proxy_url os.environ["ALL_PROXY"] = proxy_url print(f"✓ SSH隧道已建立: {proxy_url}") time.sleep(1) return True except Exception as e: print(f"✗ SSH隧道异常: {e}") return False def stop(self): """停止SSH隧道""" if self._process: self._process.terminate() self._process.wait() for key in ["HTTP_PROXY", "HTTPS_PROXY", "ALL_PROXY"]: os.environ.pop(key, None) print("SSH隧道已关闭") def __enter__(self): self.start() return self def __exit__(self, exc_type, exc_val, exc_tb): self.stop() def create_ssh_tunnel_from_yaml(config_path: str) -> SSHTunnelManager: """从YAML配置创建SSH隧道""" import yaml with open(config_path, 'r', encoding='utf-8') as f: config = yaml.safe_load(f) ssh_config = config.get('ssh_tunnel', {}) return SSHTunnelManager(ssh_config)