Files
etf/docs/universal_fetcher_README.md
aszerW 0e531a1876 docs: 添加完整项目文档
- universal_fetcher_README.md:统一数据获取接口完整文档
- universal_fetcher_QUICKSTART.md:5分钟快速上手指南
- universal_fetcher_ARCHITECTURE.md:架构设计说明
- universal_fetcher_TEST_REPORT.md:测试报告与修复记录
- flask_api_README.md:Flask API 完整文档
- FLASK_SERVICE_SUMMARY.md:项目实现总结

总计 2000+ 行文档,涵盖 API 说明、使用示例、架构设计
2026-05-07 21:20:03 +08:00

9.0 KiB
Raw Permalink Blame History

统一数据获取接口 (UniversalDataFetcher)

概述

UniversalDataFetcher 是一个封装了 Tushare、YFinance、CCXT 等多个数据源的统一接口能够自动识别资产类型并路由到对应的数据源获取K线数据。

支持的资产类型

资产类型 代码格式示例 数据源 说明
A股指数 000300.SH, 399006.SZ, H30269.CSI Tushare (index_daily) 沪深交易所指数、中证指数
A股ETF 510300.SH, 159915.SZ Tushare (fund_daily) 场内交易型开放式指数基金
A股股票 600000.SH, 000001.SZ Tushare (daily) 沪深A股
港股指数 HSI, HSTECH.HK YFinance 恒生指数、恒生科技等
美股指数 NDX, SPX, DJI YFinance 纳斯达克、标普500、道琼斯
美股股票 AAPL, MSFT, GOOGL YFinance 美股个股
期货合约 AU.SHF, CU.SHF Tushare (fut_daily) 上期所、大商所等期货
加密货币 BTC, ETH CCXT/OKX 比特币、以太坊等

快速开始

1. 基础用法

from core.datasource.universal_fetcher import fetch_kline

# 获取A股指数
df = fetch_kline("000300.SH", "2024-01-01", "2024-03-31")
print(df.tail())

输出:

                 open    high     low   close    volume      code
date                                                            
2024-03-25  3564.488  3574.77  3547.43  3559.70  2.8576e+08  000300.SH
2024-03-26  3561.748  3592.50  3561.75  3591.53  2.9412e+08  000300.SH
2024-03-27  3587.960  3595.71  3561.65  3564.16  3.0183e+08  000300.SH
2024-03-28  3562.771  3575.31  3546.99  3570.56  2.7411e+08  000300.SH
2024-03-29  3573.374  3586.97  3568.63  3586.97  2.5862e+08  000300.SH

2. 检测资产类型

from core.datasource.universal_fetcher import detect_asset_type

codes = ["000300.SH", "510300.SH", "NDX", "AAPL", "BTC", "AU.SHF"]

for code in codes:
    asset_type = detect_asset_type(code)
    print(f"{code:15s} -> {asset_type}")

输出:

000300.SH       -> china_index
510300.SH       -> china_etf
NDX             -> us_index
AAPL            -> us_stock
BTC             -> crypto
AU.SHF          -> futures

3. 批量获取

from core.datasource.universal_fetcher import UniversalDataFetcher

codes = ["000300.SH", "NDX", "HSI", "AU.SHF"]

fetcher = UniversalDataFetcher()
with fetcher:
    results = fetcher.fetch_multiple(codes, "2024-01-01", "2024-03-31")

for code, df in results.items():
    if df is not None:
        print(f"✓ {code}: {len(df)} 条, 最新收盘价 {df['close'].iloc[-1]:.3f}")

4. 配置SSH隧道访问受限数据源

ssh_config = {
    "enabled": True,
    "host": "your-server.com",
    "port": 22,
    "username": "your-username",
    "key_path": "/path/to/private/key.pem",
    "local_port": 1080,
}

fetcher = UniversalDataFetcher(ssh_config=ssh_config)
with fetcher:
    df = fetcher.fetch("NDX", "2024-01-01", "2024-03-31")

API 参考

UniversalDataFetcher 类

初始化

UniversalDataFetcher(ssh_config: Optional[dict] = None, use_cache: bool = True)

参数:

  • ssh_config: SSH隧道配置字典可选
  • use_cache: 是否使用缓存(可选,默认 True

fetch() - 获取单只标的

fetcher.fetch(
    code: str,
    start_date: str,
    end_date: str,
    retry: int = 3
) -> Optional[pd.DataFrame]

参数:

  • code: 标的代码
  • start_date: 开始日期,格式 'YYYY-MM-DD'
  • end_date: 结束日期,格式 'YYYY-MM-DD'
  • retry: 重试次数(默认 3

返回:

  • DataFrame包含列: [open, high, low, close, volume, code]
  • 索引为日期DatetimeIndex
  • 失败时返回 None

fetch_multiple() - 批量获取

fetcher.fetch_multiple(
    codes: List[str],
    start_date: str,
    end_date: str,
    retry: int = 3
) -> Dict[str, Optional[pd.DataFrame]]

返回:

  • 字典 {code: DataFrame}

便捷函数

fetch_kline()

from core.datasource.universal_fetcher import fetch_kline

df = fetch_kline("000300.SH", "2024-01-01", "2024-03-31")

detect_asset_type()

from core.datasource.universal_fetcher import detect_asset_type

asset_type = detect_asset_type("000300.SH")  # 返回 'china_index'

使用示例

示例1: 跨市场组合分析

from core.datasource.universal_fetcher import UniversalDataFetcher
import pandas as pd

portfolio = {
    "000300.SH": "沪深300",
    "NDX": "纳斯达克100",
    "HSI": "恒生指数",
    "N225": "日经225",
}

fetcher = UniversalDataFetcher()
with fetcher:
    results = fetcher.fetch_multiple(
        list(portfolio.keys()),
        "2024-01-01",
        "2024-12-31"
    )

# 合并收盘价
close_prices = pd.DataFrame()
for code, name in portfolio.items():
    if results[code] is not None:
        close_prices[name] = results[code]['close']

# 计算年化收益率
returns = close_prices.pct_change().dropna()
annual_returns = (returns.mean() * 252 * 100).round(2)
print(annual_returns)

示例2: 结合技术指标

from core.datasource.universal_fetcher import fetch_kline

df = fetch_kline("000300.SH", "2024-01-01", "2024-06-30")

# 计算移动平均线
df['MA5'] = df['close'].rolling(5).mean()
df['MA20'] = df['close'].rolling(20).mean()

# 计算RSI
delta = df['close'].diff()
gain = delta.where(delta > 0, 0).rolling(14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
df['RSI'] = 100 - (100 / (1 + gain/loss))

print(df[['close', 'MA5', 'MA20', 'RSI']].tail())

示例3: 与轮动策略集成

from core.datasource.universal_fetcher import UniversalDataFetcher

code_config = {
    "399006.SZ": {"name": "创业板指", "etf": "159915.SZ", "market": "A"},
    "NDX": {"name": "纳指100", "etf": "513100.SH", "market": "US"},
}

all_codes = list(code_config.keys())
etf_codes = [cfg['etf'] for cfg in code_config.values()]

fetcher = UniversalDataFetcher()
with fetcher:
    # 获取指数数据(用于因子计算)
    index_data = fetcher.fetch_multiple(all_codes, "2024-01-01", "2024-03-31")
    
    # 获取ETF数据用于收益计算
    etf_data = fetcher.fetch_multiple(etf_codes, "2024-01-01", "2024-03-31")

数据源路由逻辑

输入代码
  ↓
资产类型检测器 (AssetTypeDetector)
  ↓
┌─────────────────────────────────┐
│  检测规则(按优先级):            │
│  1. 加密货币代码集合匹配          │
│  2. 期货后缀匹配 (.SHF等)        │
│  3. 港股后缀匹配 (.HK)           │
│  4. A股后缀匹配 (.SH/.SZ等)      │
│  5. 美股指数映射表匹配            │
│  6. 默认: 美股股票               │
└─────────────────────────────────┘
  ↓
数据源路由:
  ├─ china_index/etf/stock → Tushare
  ├─ hk/us_index/stock     → YFinance
  ├─ futures               → Tushare (fut_daily)
  └─ crypto                → CCXT/OKX

A股代码分类规则

6位数字代码:
  ├─ 指数前缀: 000, 001, 002, 399, 930, 931, 932
  │   └─ → china_index
  ├─ ETF前缀: 51, 52, 56, 58, 15, 16
  │   └─ → china_etf
  └─ 其他
      └─ → china_stock

环境要求

必需

  • Python 3.8+
  • pandas
  • yfinance

按数据源

  • Tushare: 需要设置环境变量 TUSHARE_TOKEN
  • YFinance: 网络可达中国地区可能需要SSH隧道
  • CCXT: 需要安装 ccxt

安装依赖

pip install pandas yfinance tushare ccxt

配置 Tushare Token

.env 文件中添加:

TUSHARE_TOKEN=your_tushare_api_token

测试

运行测试脚本:

python tests/test_universal_fetcher.py

示例代码

查看更多示例:

python examples/universal_fetcher_examples.py

与现有代码的关系

UniversalDataFetcher 是对现有 HybridDataSource 的封装和扩展:

  • 复用: 内部使用 HybridDataSource 的底层数据获取逻辑
  • 扩展: 增加了对A股股票、自动资产类型检测、便捷函数等功能
  • 简化: 提供了更简单的API无需手动判断数据源

常见问题

Q: 为什么获取美股数据失败?

A: YFinance 在中国大陆可能被限制需要配置SSH隧道或使用代理。

Q: 如何获取ETF净值数据

A: 当前接口返回的是交易价格(收盘价)。如需净值数据,请直接使用 HybridDataSource._fetch_etf_nav() 方法。

Q: 支持分钟级数据吗?

A: 当前仅支持日线数据。如需分钟级数据,需要扩展接口。

Q: 如何添加新的资产类型?

A: 在 AssetTypeDetector 中添加检测规则,在 UniversalDataFetcher 中添加对应的获取方法即可。

更新日志

v1.0.0 (2024-XX-XX)

  • 初始版本
  • 支持8种资产类型
  • 自动资产类型检测
  • 批量获取功能
  • SSH隧道支持
  • 完善的测试和示例

许可证

MIT License