- universal_fetcher_README.md:统一数据获取接口完整文档 - universal_fetcher_QUICKSTART.md:5分钟快速上手指南 - universal_fetcher_ARCHITECTURE.md:架构设计说明 - universal_fetcher_TEST_REPORT.md:测试报告与修复记录 - flask_api_README.md:Flask API 完整文档 - FLASK_SERVICE_SUMMARY.md:项目实现总结 总计 2000+ 行文档,涵盖 API 说明、使用示例、架构设计
9.0 KiB
9.0 KiB
统一数据获取接口 (UniversalDataFetcher)
概述
UniversalDataFetcher 是一个封装了 Tushare、YFinance、CCXT 等多个数据源的统一接口,能够自动识别资产类型并路由到对应的数据源获取K线数据。
支持的资产类型
| 资产类型 | 代码格式示例 | 数据源 | 说明 |
|---|---|---|---|
| A股指数 | 000300.SH, 399006.SZ, H30269.CSI |
Tushare (index_daily) | 沪深交易所指数、中证指数 |
| A股ETF | 510300.SH, 159915.SZ |
Tushare (fund_daily) | 场内交易型开放式指数基金 |
| A股股票 | 600000.SH, 000001.SZ |
Tushare (daily) | 沪深A股 |
| 港股指数 | HSI, HSTECH.HK |
YFinance | 恒生指数、恒生科技等 |
| 美股指数 | NDX, SPX, DJI |
YFinance | 纳斯达克、标普500、道琼斯 |
| 美股股票 | AAPL, MSFT, GOOGL |
YFinance | 美股个股 |
| 期货合约 | AU.SHF, CU.SHF |
Tushare (fut_daily) | 上期所、大商所等期货 |
| 加密货币 | BTC, ETH |
CCXT/OKX | 比特币、以太坊等 |
快速开始
1. 基础用法
from core.datasource.universal_fetcher import fetch_kline
# 获取A股指数
df = fetch_kline("000300.SH", "2024-01-01", "2024-03-31")
print(df.tail())
输出:
open high low close volume code
date
2024-03-25 3564.488 3574.77 3547.43 3559.70 2.8576e+08 000300.SH
2024-03-26 3561.748 3592.50 3561.75 3591.53 2.9412e+08 000300.SH
2024-03-27 3587.960 3595.71 3561.65 3564.16 3.0183e+08 000300.SH
2024-03-28 3562.771 3575.31 3546.99 3570.56 2.7411e+08 000300.SH
2024-03-29 3573.374 3586.97 3568.63 3586.97 2.5862e+08 000300.SH
2. 检测资产类型
from core.datasource.universal_fetcher import detect_asset_type
codes = ["000300.SH", "510300.SH", "NDX", "AAPL", "BTC", "AU.SHF"]
for code in codes:
asset_type = detect_asset_type(code)
print(f"{code:15s} -> {asset_type}")
输出:
000300.SH -> china_index
510300.SH -> china_etf
NDX -> us_index
AAPL -> us_stock
BTC -> crypto
AU.SHF -> futures
3. 批量获取
from core.datasource.universal_fetcher import UniversalDataFetcher
codes = ["000300.SH", "NDX", "HSI", "AU.SHF"]
fetcher = UniversalDataFetcher()
with fetcher:
results = fetcher.fetch_multiple(codes, "2024-01-01", "2024-03-31")
for code, df in results.items():
if df is not None:
print(f"✓ {code}: {len(df)} 条, 最新收盘价 {df['close'].iloc[-1]:.3f}")
4. 配置SSH隧道(访问受限数据源)
ssh_config = {
"enabled": True,
"host": "your-server.com",
"port": 22,
"username": "your-username",
"key_path": "/path/to/private/key.pem",
"local_port": 1080,
}
fetcher = UniversalDataFetcher(ssh_config=ssh_config)
with fetcher:
df = fetcher.fetch("NDX", "2024-01-01", "2024-03-31")
API 参考
UniversalDataFetcher 类
初始化
UniversalDataFetcher(ssh_config: Optional[dict] = None, use_cache: bool = True)
参数:
ssh_config: SSH隧道配置字典(可选)use_cache: 是否使用缓存(可选,默认 True)
fetch() - 获取单只标的
fetcher.fetch(
code: str,
start_date: str,
end_date: str,
retry: int = 3
) -> Optional[pd.DataFrame]
参数:
code: 标的代码start_date: 开始日期,格式 'YYYY-MM-DD'end_date: 结束日期,格式 'YYYY-MM-DD'retry: 重试次数(默认 3)
返回:
- DataFrame,包含列:
[open, high, low, close, volume, code] - 索引为日期(DatetimeIndex)
- 失败时返回 None
fetch_multiple() - 批量获取
fetcher.fetch_multiple(
codes: List[str],
start_date: str,
end_date: str,
retry: int = 3
) -> Dict[str, Optional[pd.DataFrame]]
返回:
- 字典
{code: DataFrame}
便捷函数
fetch_kline()
from core.datasource.universal_fetcher import fetch_kline
df = fetch_kline("000300.SH", "2024-01-01", "2024-03-31")
detect_asset_type()
from core.datasource.universal_fetcher import detect_asset_type
asset_type = detect_asset_type("000300.SH") # 返回 'china_index'
使用示例
示例1: 跨市场组合分析
from core.datasource.universal_fetcher import UniversalDataFetcher
import pandas as pd
portfolio = {
"000300.SH": "沪深300",
"NDX": "纳斯达克100",
"HSI": "恒生指数",
"N225": "日经225",
}
fetcher = UniversalDataFetcher()
with fetcher:
results = fetcher.fetch_multiple(
list(portfolio.keys()),
"2024-01-01",
"2024-12-31"
)
# 合并收盘价
close_prices = pd.DataFrame()
for code, name in portfolio.items():
if results[code] is not None:
close_prices[name] = results[code]['close']
# 计算年化收益率
returns = close_prices.pct_change().dropna()
annual_returns = (returns.mean() * 252 * 100).round(2)
print(annual_returns)
示例2: 结合技术指标
from core.datasource.universal_fetcher import fetch_kline
df = fetch_kline("000300.SH", "2024-01-01", "2024-06-30")
# 计算移动平均线
df['MA5'] = df['close'].rolling(5).mean()
df['MA20'] = df['close'].rolling(20).mean()
# 计算RSI
delta = df['close'].diff()
gain = delta.where(delta > 0, 0).rolling(14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
df['RSI'] = 100 - (100 / (1 + gain/loss))
print(df[['close', 'MA5', 'MA20', 'RSI']].tail())
示例3: 与轮动策略集成
from core.datasource.universal_fetcher import UniversalDataFetcher
code_config = {
"399006.SZ": {"name": "创业板指", "etf": "159915.SZ", "market": "A"},
"NDX": {"name": "纳指100", "etf": "513100.SH", "market": "US"},
}
all_codes = list(code_config.keys())
etf_codes = [cfg['etf'] for cfg in code_config.values()]
fetcher = UniversalDataFetcher()
with fetcher:
# 获取指数数据(用于因子计算)
index_data = fetcher.fetch_multiple(all_codes, "2024-01-01", "2024-03-31")
# 获取ETF数据(用于收益计算)
etf_data = fetcher.fetch_multiple(etf_codes, "2024-01-01", "2024-03-31")
数据源路由逻辑
输入代码
↓
资产类型检测器 (AssetTypeDetector)
↓
┌─────────────────────────────────┐
│ 检测规则(按优先级): │
│ 1. 加密货币代码集合匹配 │
│ 2. 期货后缀匹配 (.SHF等) │
│ 3. 港股后缀匹配 (.HK) │
│ 4. A股后缀匹配 (.SH/.SZ等) │
│ 5. 美股指数映射表匹配 │
│ 6. 默认: 美股股票 │
└─────────────────────────────────┘
↓
数据源路由:
├─ china_index/etf/stock → Tushare
├─ hk/us_index/stock → YFinance
├─ futures → Tushare (fut_daily)
└─ crypto → CCXT/OKX
A股代码分类规则
6位数字代码:
├─ 指数前缀: 000, 001, 002, 399, 930, 931, 932
│ └─ → china_index
├─ ETF前缀: 51, 52, 56, 58, 15, 16
│ └─ → china_etf
└─ 其他
└─ → china_stock
环境要求
必需
- Python 3.8+
- pandas
- yfinance
按数据源
- Tushare: 需要设置环境变量
TUSHARE_TOKEN - YFinance: 网络可达(中国地区可能需要SSH隧道)
- CCXT: 需要安装
ccxt库
安装依赖
pip install pandas yfinance tushare ccxt
配置 Tushare Token
在 .env 文件中添加:
TUSHARE_TOKEN=your_tushare_api_token
测试
运行测试脚本:
python tests/test_universal_fetcher.py
示例代码
查看更多示例:
python examples/universal_fetcher_examples.py
与现有代码的关系
UniversalDataFetcher 是对现有 HybridDataSource 的封装和扩展:
- 复用: 内部使用
HybridDataSource的底层数据获取逻辑 - 扩展: 增加了对A股股票、自动资产类型检测、便捷函数等功能
- 简化: 提供了更简单的API,无需手动判断数据源
常见问题
Q: 为什么获取美股数据失败?
A: YFinance 在中国大陆可能被限制,需要配置SSH隧道或使用代理。
Q: 如何获取ETF净值数据?
A: 当前接口返回的是交易价格(收盘价)。如需净值数据,请直接使用 HybridDataSource._fetch_etf_nav() 方法。
Q: 支持分钟级数据吗?
A: 当前仅支持日线数据。如需分钟级数据,需要扩展接口。
Q: 如何添加新的资产类型?
A: 在 AssetTypeDetector 中添加检测规则,在 UniversalDataFetcher 中添加对应的获取方法即可。
更新日志
v1.0.0 (2024-XX-XX)
- 初始版本
- 支持8种资产类型
- 自动资产类型检测
- 批量获取功能
- SSH隧道支持
- 完善的测试和示例
许可证
MIT License