Files
etf/archive/legacy_tests/tests/test_universal_fetcher.py
aszerW 1fca536c95 refactor: 归档旧代码,保留新框架结构
归档内容:
- core/ (数据源、因子计算、通用工具) → archive/legacy_core/
- strategies/rotation/engine.py, portfolio.py, report.py → archive/legacy_core/
- scripts/ (run_rotation, daily_scheduler) → archive/legacy_scripts/
- examples/ → archive/legacy_examples/
- tests/ (实验、对比测试) → archive/legacy_tests/
- 单独文件 (fetch_*.py, 动量.py, 全球市场.py等) → archive/single_files/

保留新结构:
- framework/ (抽象接口)
- strategies/shared/ (定制组件)
- strategies/rotation/strategy.py (新策略)
- 外层配置: .env, .dockerignore, build-and-push.sh, hk_ecs.pem, README.md, requirements.txt
- Docker相关: Dockerfile, Dockerfile_base, docker-compose.yml

更新README反映新框架架构
2026-05-11 23:34:23 +08:00

229 lines
6.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
统一数据获取接口测试
====================
测试各种资产类型的K线数据获取
"""
import sys
from pathlib import Path
# 添加项目根目录到路径
sys.path.insert(0, str(Path(__file__).parent.parent))
from dotenv import load_dotenv
load_dotenv()
from core.datasource.universal_fetcher import (
UniversalDataFetcher,
AssetTypeDetector,
detect_asset_type,
fetch_kline
)
def test_asset_detection():
"""测试资产类型检测"""
print("\n" + "="*60)
print("测试1: 资产类型检测")
print("="*60)
test_cases = [
# A股指数
("000300.SH", "china_index"),
("399006.SZ", "china_index"),
("H30269.CSI", "china_index"),
# A股ETF
("510300.SH", "china_etf"),
("159915.SZ", "china_etf"),
("513100.SH", "china_etf"),
# A股股票
("600000.SH", "china_stock"),
("000001.SZ", "china_stock"),
# 港股
("HSI", "hk_index"),
("HSTECH.HK", "hk_index"),
# 美股
("NDX", "us_index"),
("SPX", "us_index"),
("AAPL", "us_stock"),
# 期货
("AU.SHF", "futures"),
("CU.SHF", "futures"),
# 加密货币
("BTC", "crypto"),
("ETH", "crypto"),
]
correct = 0
for code, expected in test_cases:
result = detect_asset_type(code)
status = "" if result == expected else ""
if result == expected:
correct += 1
print(f" {status} {code:15s} -> {result:15s} (期望: {expected})")
print(f"\n检测准确率: {correct}/{len(test_cases)} ({100*correct/len(test_cases):.1f}%)")
def test_single_fetch():
"""测试单只标的获取"""
print("\n" + "="*60)
print("测试2: 单只标的获取")
print("="*60)
# 测试A股指数
print("\n[A股指数] 000300.SH (沪深300)")
df = fetch_kline("000300.SH", "2024-01-01", "2024-03-31")
if df is not None:
print(f" ✓ 获取成功: {len(df)}")
print(f" 日期范围: {df.index.min()} ~ {df.index.max()}")
print(f" 列: {list(df.columns)}")
print(f" 最新数据:\n{df.tail(3)}")
else:
print(" ✗ 获取失败")
# 测试A股ETF
print("\n[A股ETF] 510300.SH (沪深300ETF)")
df = fetch_kline("510300.SH", "2024-01-01", "2024-03-31")
if df is not None:
print(f" ✓ 获取成功: {len(df)}")
print(f" 最新收盘价: {df['close'].iloc[-1]:.3f}")
else:
print(" ✗ 获取失败")
# 测试美股指数
print("\n[美股指数] NDX (纳斯达克100)")
df = fetch_kline("NDX", "2024-01-01", "2024-03-31")
if df is not None:
print(f" ✓ 获取成功: {len(df)}")
print(f" 最新收盘价: {df['close'].iloc[-1]:.3f}")
else:
print(" ✗ 获取失败可能需要SSH隧道")
# 测试港股指数
print("\n[港股指数] HSI (恒生指数)")
df = fetch_kline("HSI", "2024-01-01", "2024-03-31")
if df is not None:
print(f" ✓ 获取成功: {len(df)}")
print(f" 最新收盘价: {df['close'].iloc[-1]:.3f}")
else:
print(" ✗ 获取失败可能需要SSH隧道")
def test_multiple_fetch():
"""测试批量获取"""
print("\n" + "="*60)
print("测试3: 批量获取")
print("="*60)
codes = [
"000300.SH", # A股指数
"510300.SH", # A股ETF
"NDX", # 美股指数
"HSI", # 港股指数
"AU.SHF", # 期货
# "BTC", # 加密货币需要SSH隧道
]
fetcher = UniversalDataFetcher()
with fetcher:
results = fetcher.fetch_multiple(codes, "2024-01-01", "2024-03-31")
print(f"\n获取结果:")
for code, df in results.items():
if df is not None:
print(f"{code:15s} {len(df):4d} 条, "
f"最新收盘价: {df['close'].iloc[-1]:.3f}")
else:
print(f"{code:15s} 无数据")
def test_context_manager():
"""测试上下文管理器SSH隧道"""
print("\n" + "="*60)
print("测试4: 上下文管理器SSH隧道")
print("="*60)
# 不启用SSH
print("\n[不启用SSH] 获取A股数据应成功")
fetcher = UniversalDataFetcher(ssh_config={"enabled": False})
with fetcher:
df = fetcher.fetch("000300.SH", "2024-01-01", "2024-01-31")
if df is not None:
print(f" ✓ 成功: {len(df)}")
else:
print(" ✗ 失败")
# 启用SSH如果配置了
ssh_config = {
"enabled": False, # 改为 True 并填入实际配置以测试
"host": "",
"port": 22,
"username": "",
"key_path": "",
"local_port": 1080,
}
if ssh_config["enabled"]:
print("\n[启用SSH] 获取美股数据")
fetcher = UniversalDataFetcher(ssh_config=ssh_config)
with fetcher:
df = fetcher.fetch("NDX", "2024-01-01", "2024-01-31")
if df is not None:
print(f" ✓ 成功: {len(df)}")
else:
print(" ✗ 失败")
else:
print("\n[跳过SSH测试] SSH未启用")
def test_edge_cases():
"""测试边界情况"""
print("\n" + "="*60)
print("测试5: 边界情况")
print("="*60)
# 测试无效代码
print("\n[无效代码] INVALID")
df = fetch_kline("INVALID", "2024-01-01", "2024-01-31")
print(f" 结果: {df}")
# 测试日期范围
print("\n[空日期范围]")
df = fetch_kline("000300.SH", "2030-01-01", "2030-01-31")
if df is None or len(df) == 0:
print(" ✓ 正确处理(无数据)")
else:
print(f" ✗ 意外获取到数据: {len(df)}")
# 测试代码格式转换
print("\n[代码格式转换] 000300.SS -> 000300.SH")
df = fetch_kline("000300.SS", "2024-01-01", "2024-01-31")
if df is not None:
print(f" ✓ 转换成功: {len(df)}")
else:
print(" ✗ 失败")
if __name__ == "__main__":
print("\n" + "="*60)
print("统一数据获取接口测试")
print("="*60)
# 运行所有测试
test_asset_detection()
test_single_fetch()
test_multiple_fetch()
test_context_manager()
test_edge_cases()
print("\n" + "="*60)
print("测试完成")
print("="*60)