Files
etf/archive/legacy_examples/examples/universal_fetcher_examples.py
aszerW 1fca536c95 refactor: 归档旧代码,保留新框架结构
归档内容:
- core/ (数据源、因子计算、通用工具) → archive/legacy_core/
- strategies/rotation/engine.py, portfolio.py, report.py → archive/legacy_core/
- scripts/ (run_rotation, daily_scheduler) → archive/legacy_scripts/
- examples/ → archive/legacy_examples/
- tests/ (实验、对比测试) → archive/legacy_tests/
- 单独文件 (fetch_*.py, 动量.py, 全球市场.py等) → archive/single_files/

保留新结构:
- framework/ (抽象接口)
- strategies/shared/ (定制组件)
- strategies/rotation/strategy.py (新策略)
- 外层配置: .env, .dockerignore, build-and-push.sh, hk_ecs.pem, README.md, requirements.txt
- Docker相关: Dockerfile, Dockerfile_base, docker-compose.yml

更新README反映新框架架构
2026-05-11 23:34:23 +08:00

300 lines
9.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
统一数据获取接口使用示例
========================
展示如何使用 UniversalDataFetcher 获取各种资产的K线数据
"""
import sys
from pathlib import Path
# 添加项目根目录到路径
sys.path.insert(0, str(Path(__file__).parent.parent))
from dotenv import load_dotenv
load_dotenv()
from core.datasource.universal_fetcher import (
UniversalDataFetcher,
detect_asset_type,
fetch_kline
)
import pandas as pd
# ============================================================
# 示例1: 快速上手 - 获取单只标的
# ============================================================
def example_basic():
"""基础用法获取单只标的的K线数据"""
print("\n" + "="*60)
print("示例1: 基础用法")
print("="*60)
# 获取A股指数
df = fetch_kline("000300.SH", "2024-01-01", "2024-03-31")
if df is not None:
print(f"\n沪深300指数数据:")
print(f" 数据量: {len(df)}")
print(f" 日期范围: {df.index.min()} ~ {df.index.max()}")
print(f" 列: {list(df.columns)}")
print(f"\n最新5条数据:")
print(df.tail())
# ============================================================
# 示例2: 检测资产类型
# ============================================================
def example_detect_type():
"""自动检测资产类型"""
print("\n" + "="*60)
print("示例2: 资产类型检测")
print("="*60)
codes = [
"000300.SH", # A股指数
"510300.SH", # A股ETF
"600000.SH", # A股股票
"NDX", # 美股指数
"AAPL", # 美股股票
"HSI", # 港股指数
"AU.SHF", # 期货
"BTC", # 加密货币
]
print("\n资产类型检测结果:")
for code in codes:
asset_type = detect_asset_type(code)
print(f" {code:15s} -> {asset_type}")
# ============================================================
# 示例3: 批量获取多只标的
# ============================================================
def example_batch_fetch():
"""批量获取多只标的"""
print("\n" + "="*60)
print("示例3: 批量获取")
print("="*60)
# 定义要获取的标的列表
codes = [
"000300.SH", # 沪深300
"399006.SZ", # 创业板指
"510300.SH", # 沪深300ETF
"NDX", # 纳斯达克100
"HSTECH.HK", # 恒生科技
]
# 使用上下文管理器(推荐)
fetcher = UniversalDataFetcher()
with fetcher:
results = fetcher.fetch_multiple(codes, "2024-01-01", "2024-03-31")
# 处理结果
print("\n获取结果汇总:")
for code, df in results.items():
if df is not None:
print(f"{code:15s} {len(df):4d} 条, "
f"收盘价: {df['close'].iloc[-1]:.3f}")
else:
print(f"{code:15s} 无数据")
# ============================================================
# 示例4: 跨市场组合分析
# ============================================================
def example_cross_market():
"""跨市场数据分析示例"""
print("\n" + "="*60)
print("示例4: 跨市场组合分析")
print("="*60)
# 定义全球资产组合
portfolio = {
"000300.SH": "沪深300",
"NDX": "纳斯达克100",
"HSI": "恒生指数",
"N225": "日经225",
"GDAXI": "德国DAX",
}
# 获取数据
fetcher = UniversalDataFetcher()
with fetcher:
results = fetcher.fetch_multiple(
list(portfolio.keys()),
"2024-01-01",
"2024-12-31"
)
# 合并收盘价
close_prices = pd.DataFrame()
for code, name in portfolio.items():
if results[code] is not None:
close_prices[name] = results[code]['close']
# 计算收益率
if len(close_prices) > 0:
returns = close_prices.pct_change().dropna()
print("\n各市场收益率统计:")
print((returns.mean() * 252 * 100).round(2).to_string())
print("\n相关系数矩阵:")
print(returns.corr().round(3).to_string())
# ============================================================
# 示例5: 结合技术指标计算
# ============================================================
def example_with_indicators():
"""结合技术指标计算"""
print("\n" + "="*60)
print("示例5: 技术指标计算")
print("="*60)
# 获取数据
df = fetch_kline("000300.SH", "2024-01-01", "2024-06-30")
if df is None:
print("数据获取失败")
return
# 计算移动平均线
df['MA5'] = df['close'].rolling(window=5).mean()
df['MA20'] = df['close'].rolling(window=20).mean()
df['MA60'] = df['close'].rolling(window=60).mean()
# 计算RSI
delta = df['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss
df['RSI'] = 100 - (100 / (1 + rs))
print("\n技术指标计算结果最新10条:")
print(df[['close', 'MA5', 'MA20', 'MA60', 'RSI']].tail(10).round(2))
# ============================================================
# 示例6: 错误处理与重试
# ============================================================
def example_error_handling():
"""错误处理示例"""
print("\n" + "="*60)
print("示例6: 错误处理")
print("="*60)
# 测试无效代码
invalid_codes = ["INVALID", "999999.SH", ""]
for code in invalid_codes:
print(f"\n尝试获取: {code}")
df = fetch_kline(code, "2024-01-01", "2024-01-31")
if df is None:
print(f" ✓ 正确返回 None")
else:
print(f" ✗ 意外获取到数据")
# 测试重试机制
print("\n测试重试机制(网络错误):")
fetcher = UniversalDataFetcher()
df = fetcher.fetch("NDX", "2024-01-01", "2024-01-31", retry=3)
if df is not None:
print(f" ✓ 重试成功: {len(df)}")
else:
print(f" ✗ 重试后仍失败")
# ============================================================
# 示例7: 与现有轮动策略集成
# ============================================================
def example_integration_with_rotation():
"""与现有轮动策略集成示例"""
print("\n" + "="*60)
print("示例7: 与轮动策略集成")
print("="*60)
# 模拟轮动策略的代码配置
code_config = {
"399006.SZ": {"name": "创业板指", "etf": "159915.SZ", "market": "A"},
"NDX": {"name": "纳指100", "etf": "513100.SH", "market": "US"},
"HSI": {"name": "恒生指数", "etf": "159920.SZ", "market": "HK"},
}
# 使用 UniversalDataFetcher 获取数据
all_codes = list(code_config.keys())
etf_codes = [cfg['etf'] for cfg in code_config.values() if cfg.get('etf')]
fetcher = UniversalDataFetcher()
with fetcher:
# 获取指数数据(用于因子计算)
index_data = fetcher.fetch_multiple(
all_codes, "2024-01-01", "2024-03-31"
)
# 获取ETF数据用于收益计算
etf_data = fetcher.fetch_multiple(
etf_codes, "2024-01-01", "2024-03-31"
)
# 验证数据完整性
print("\n数据完整性检查:")
for code in all_codes:
idx_df = index_data.get(code)
etf_code = code_config[code].get('etf')
etf_df = etf_data.get(etf_code) if etf_code else None
print(f" {code}:")
print(f" 指数数据: {'' if idx_df is not None else ''} "
f"({len(idx_df) if idx_df is not None else 0} 条)")
if etf_code:
print(f" ETF数据 ({etf_code}): {'' if etf_df is not None else ''} "
f"({len(etf_df) if etf_df is not None else 0} 条)")
# ============================================================
# 运行所有示例
# ============================================================
if __name__ == "__main__":
print("\n" + "="*60)
print("统一数据获取接口 - 使用示例")
print("="*60)
# 选择要运行的示例
examples = [
("基础用法", example_basic),
("资产类型检测", example_detect_type),
("批量获取", example_batch_fetch),
("跨市场组合分析", example_cross_market),
("技术指标计算", example_with_indicators),
("错误处理", example_error_handling),
("与轮动策略集成", example_integration_with_rotation),
]
print("\n可用示例:")
for i, (name, _) in enumerate(examples, 1):
print(f" {i}. {name}")
# 运行特定示例或全部
run_all = True # 改为 False 可以选择性运行
if run_all:
for name, func in examples:
try:
func()
except Exception as e:
print(f"\n示例 '{name}' 运行失败: {e}")
import traceback
traceback.print_exc()
else:
# 运行单个示例
example_index = 0 # 修改为 1-7 运行特定示例
if 0 <= example_index < len(examples):
examples[example_index][1]()
print("\n" + "="*60)
print("示例运行完成")
print("="*60)