Files
etf/datasource/ssh_tunnel.py
aszerW 2fba6d82f4 fix: SSH隧道启动前清理残留进程
问题:
- 多次运行回测后残留SSH进程干扰代理连接
- yfinance因代理冲突无法获取数据

修复:
- SSHTunnelManager添加 _cleanup_old_processes 方法
- 启动新隧道前自动清理同端口残留进程

验证:
- 清理后YFinance成功下载纳指、日经、DAX等数据
2026-05-12 22:40:35 +08:00

138 lines
4.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
SSH隧道管理器
通过SSH隧道建立本地SOCKS5代理用于访问境外数据源
"""
import os
import sys
import time
import subprocess
from pathlib import Path
from typing import Optional
class SSHTunnelManager:
"""SSH隧道管理器"""
def __init__(self, config: dict):
"""
初始化SSH隧道
Args:
config: SSH配置字典
- host: SSH服务器地址
- port: SSH端口默认22
- username: SSH用户名
- key_path: SSH私钥路径相对或绝对
- local_port: 本地SOCKS5端口默认1080
"""
self.enabled = config.get("enabled", False)
self.host = config.get("host", "")
self.port = config.get("port", 22)
self.username = config.get("username", "root")
self.local_port = config.get("local_port", 1080)
self._process: Optional[subprocess.Popen] = None
# 处理 key_path相对路径转换为绝对路径
key_path = config.get("key_path", "")
if key_path and not os.path.isabs(key_path):
# 相对于项目根目录
project_root = Path(__file__).parent.parent
key_path = str(project_root / key_path)
self.key_path = key_path
def _cleanup_old_processes(self):
"""清理残留的同端口SSH进程"""
try:
# 查找监听同一端口的SSH进程
result = subprocess.run(
['pgrep', '-f', f'ssh.*-D.*{self.local_port}'],
capture_output=True, text=True
)
if result.returncode == 0 and result.stdout.strip():
pids = result.stdout.strip().split('\n')
for pid in pids:
try:
subprocess.run(['kill', '-9', pid], check=True)
print(f" 清理残留SSH进程: PID {pid}")
except subprocess.CalledProcessError:
pass
except Exception:
pass # pgrep不可用或其他问题忽略
def start(self) -> bool:
"""启动SSH隧道"""
if not self.enabled:
return True
if not all([self.host, self.username, self.key_path]):
print("SSH配置不完整跳过隧道建立")
return False
# 先清理残留的同端口SSH进程
self._cleanup_old_processes()
print(f"建立SSH隧道: {self.host}:{self.port} -> 本地SOCKS5端口 {self.local_port}")
cmd = [
"ssh", "-N", "-D", f"127.0.0.1:{self.local_port}",
"-o", "StrictHostKeyChecking=no",
"-o", "UserKnownHostsFile=/dev/null",
"-i", self.key_path,
"-p", str(self.port),
f"{self.username}@{self.host}"
]
try:
self._process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
time.sleep(2)
if self._process.poll() is not None:
stdout, stderr = self._process.communicate()
print("✗ SSH隧道建立失败")
if stderr:
print(f"错误: {stderr.decode()}")
return False
# 设置代理环境变量
# 使用 socks5h:// 让代理服务器远程解析DNS避免IPv6问题
proxy_url = f"socks5h://127.0.0.1:{self.local_port}"
os.environ["HTTP_PROXY"] = proxy_url
os.environ["HTTPS_PROXY"] = proxy_url
os.environ["ALL_PROXY"] = proxy_url
print(f"✓ SSH隧道已建立: {proxy_url}")
time.sleep(1)
return True
except Exception as e:
print(f"✗ SSH隧道异常: {e}")
return False
def stop(self):
"""停止SSH隧道"""
if self._process:
self._process.terminate()
self._process.wait()
for key in ["HTTP_PROXY", "HTTPS_PROXY", "ALL_PROXY"]:
os.environ.pop(key, None)
print("SSH隧道已关闭")
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()
def create_ssh_tunnel_from_yaml(config_path: str) -> SSHTunnelManager:
"""从YAML配置创建SSH隧道"""
import yaml
with open(config_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
ssh_config = config.get('ssh_tunnel', {})
return SSHTunnelManager(ssh_config)