Files
etf/archive/legacy_tests/tests/test_ssh_tunnel.py
aszerW 1fca536c95 refactor: 归档旧代码,保留新框架结构
归档内容:
- core/ (数据源、因子计算、通用工具) → archive/legacy_core/
- strategies/rotation/engine.py, portfolio.py, report.py → archive/legacy_core/
- scripts/ (run_rotation, daily_scheduler) → archive/legacy_scripts/
- examples/ → archive/legacy_examples/
- tests/ (实验、对比测试) → archive/legacy_tests/
- 单独文件 (fetch_*.py, 动量.py, 全球市场.py等) → archive/single_files/

保留新结构:
- framework/ (抽象接口)
- strategies/shared/ (定制组件)
- strategies/rotation/strategy.py (新策略)
- 外层配置: .env, .dockerignore, build-and-push.sh, hk_ecs.pem, README.md, requirements.txt
- Docker相关: Dockerfile, Dockerfile_base, docker-compose.yml

更新README反映新框架架构
2026-05-11 23:34:23 +08:00

218 lines
6.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
使用香港服务器 SSH 隧道测试 UniversalDataFetcher
==============================================
通过阿里云香港 ECS 服务器建立 SOCKS5 代理,获取港美股数据
"""
import sys
from pathlib import Path
# 添加项目根目录到路径
sys.path.insert(0, str(Path(__file__).parent.parent))
from dotenv import load_dotenv
load_dotenv()
from core.datasource.universal_fetcher import UniversalDataFetcher, fetch_kline
def test_with_ssh_tunnel():
"""使用 SSH 隧道获取港美股数据"""
print("\n" + "="*60)
print("使用香港服务器 SSH 隧道获取数据")
print("="*60)
# SSH 隧道配置(使用 hk_ecs.pem
ssh_config = {
"enabled": True,
"host": "8.218.167.69", # 阿里云香港 ECS IP
"port": 22, # SSH 端口
"username": "root", # SSH 用户名
"key_path": "hk_ecs.pem", # SSH 私钥路径(相对于项目根目录)
"local_port": 1080, # 本地 SOCKS5 代理端口
}
# 要测试的标的
test_codes = [
("NDX", "纳斯达克100"),
("SPX", "标普500"),
("HSI", "恒生指数"),
("N225", "日经225"),
("GDAXI", "德国DAX"),
]
print(f"\nSSH 配置:")
print(f" 服务器: {ssh_config['host']}:{ssh_config['port']}")
print(f" 用户名: {ssh_config['username']}")
print(f" 私钥: {ssh_config['key_path']}")
print(f" 本地端口: {ssh_config['local_port']}")
# 创建带 SSH 隧道的数据获取器
fetcher = UniversalDataFetcher(ssh_config=ssh_config)
print("\n正在建立 SSH 隧道...")
with fetcher:
print("✓ SSH 隧道已建立")
print("\n开始获取数据...")
for code, name in test_codes:
print(f"\n[{code}] {name}")
try:
df = fetcher.fetch(code, "2024-01-01", "2024-03-31")
if df is not None and len(df) > 0:
print(f" ✓ 获取成功: {len(df)}")
print(f" 日期范围: {df.index.min().strftime('%Y-%m-%d')} ~ {df.index.max().strftime('%Y-%m-%d')}")
print(f" 最新收盘价: {df['close'].iloc[-1]:.3f}")
print(f" 数据预览:")
print(df.tail(3)[['open', 'high', 'low', 'close']].to_string())
else:
print(f" ✗ 无数据")
except Exception as e:
print(f" ✗ 错误: {e}")
print("\n✓ SSH 隧道已关闭")
def test_without_ssh():
"""不使用 SSH 隧道仅获取A股数据"""
print("\n" + "="*60)
print("不使用 SSH 隧道仅A股")
print("="*60)
test_codes = [
("000300.SH", "沪深300"),
("510300.SH", "沪深300ETF"),
("AU.SHF", "黄金期货"),
]
fetcher = UniversalDataFetcher(ssh_config={"enabled": False})
with fetcher:
for code, name in test_codes:
print(f"\n[{code}] {name}")
df = fetcher.fetch(code, "2024-01-01", "2024-03-31")
if df is not None:
print(f"{len(df)} 条, 最新: {df['close'].iloc[-1]:.3f}")
else:
print(f" ✗ 无数据")
def test_mixed_markets():
"""混合市场测试A股 + 港美股)"""
print("\n" + "="*60)
print("混合市场测试A股 + 港美股)")
print("="*60)
ssh_config = {
"enabled": True,
"host": "8.218.167.69",
"port": 22,
"username": "root",
"key_path": "hk_ecs.pem",
"local_port": 1080,
}
# 混合代码
codes = [
"000300.SH", # A股指数
"NDX", # 美股指数
"HSI", # 港股指数
"510300.SH", # A股ETF
"N225", # 日本指数
]
fetcher = UniversalDataFetcher(ssh_config=ssh_config)
print("\n开始批量获取...")
with fetcher:
results = fetcher.fetch_multiple(codes, "2024-01-01", "2024-03-31")
print("\n获取结果:")
for code, df in results.items():
if df is not None:
print(f"{code:15s} {len(df):4d} 条, "
f"最新: {df['close'].iloc[-1]:.3f}")
else:
print(f"{code:15s} 无数据")
def test_ssh_connection_only():
"""仅测试 SSH 连接"""
print("\n" + "="*60)
print("测试 SSH 连接")
print("="*60)
ssh_config = {
"enabled": True,
"host": "8.218.167.69",
"port": 22,
"username": "root",
"key_path": "hk_ecs.pem",
"local_port": 1080,
}
print("\n正在建立 SSH 隧道...")
fetcher = UniversalDataFetcher(ssh_config=ssh_config)
try:
with fetcher:
print("✓ SSH 隧道建立成功!")
print(f" 本地 SOCKS5 代理: socks5h://127.0.0.1:{ssh_config['local_port']}")
# 测试一个简单的 HTTP 请求
import requests
print("\n测试代理连接...")
try:
response = requests.get(
"https://www.google.com",
proxies={
"http": f"socks5h://127.0.0.1:{ssh_config['local_port']}",
"https": f"socks5h://127.0.0.1:{ssh_config['local_port']}",
},
timeout=10
)
print(f"✓ 代理测试成功!状态码: {response.status_code}")
except Exception as e:
print(f"✗ 代理测试失败: {e}")
print("\n✓ SSH 隧道已关闭")
except Exception as e:
print(f"✗ SSH 隧道建立失败: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
print("\n" + "="*60)
print("UniversalDataFetcher + 香港 SSH 隧道测试")
print("="*60)
# 选择要运行的测试
tests = [
("1", "仅测试 SSH 连接", test_ssh_connection_only),
("2", "不使用 SSH仅A股", test_without_ssh),
("3", "使用 SSH 隧道(港美股)", test_with_ssh_tunnel),
("4", "混合市场测试", test_mixed_markets),
]
print("\n可用测试:")
for num, name, _ in tests:
print(f" {num}. {name}")
# 运行所有测试
print("\n" + "="*60)
print("运行所有测试...")
print("="*60)
for num, name, func in tests:
try:
func()
except Exception as e:
print(f"\n测试 '{name}' 失败: {e}")
import traceback
traceback.print_exc()
print("\n" + "="*60)
print("测试完成")
print("="*60)