- universal_fetcher_README.md:统一数据获取接口完整文档 - universal_fetcher_QUICKSTART.md:5分钟快速上手指南 - universal_fetcher_ARCHITECTURE.md:架构设计说明 - universal_fetcher_TEST_REPORT.md:测试报告与修复记录 - flask_api_README.md:Flask API 完整文档 - FLASK_SERVICE_SUMMARY.md:项目实现总结 总计 2000+ 行文档,涵盖 API 说明、使用示例、架构设计
12 KiB
12 KiB
统一数据获取接口 - 架构设计
设计目标
创建一个简单易用的数据获取接口,能够:
- 自动识别资产类型(A股、美股、港股、期货、加密货币等)
- 自动路由到正确的数据源(Tushare、YFinance、CCXT)
- 统一返回格式化的 DataFrame
- 容错处理(重试机制、错误处理)
整体架构
┌─────────────────────────────────────────────────────────┐
│ UniversalDataFetcher │
│ │
│ 用户调用: fetch(code, start, end) │
└───────────────────┬─────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ AssetTypeDetector.detect(code) │
│ │
│ 输入: "000300.SH" │
│ 输出: "china_index" │
└───────────────────┬─────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ 数据源路由器 │
│ │
│ china_index → _fetch_china() → Tushare │
│ china_etf → _fetch_china() → Tushare │
│ china_stock → _fetch_china() → Tushare │
│ hk_* → _fetch_yfinance() → YFinance │
│ us_* → _fetch_yfinance() → YFinance │
│ futures → _fetch_futures() → Tushare │
│ crypto → _fetch_crypto() → CCXT/OKX │
└───────────────────┬─────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ 标准化 DataFrame 返回 │
│ │
│ Index: DatetimeIndex │
│ Columns: [open, high, low, close, volume, code] │
└─────────────────────────────────────────────────────────┘
核心组件
1. AssetTypeDetector(资产类型检测器)
职责: 根据代码格式自动识别资产类型
检测规则(按优先级):
1. 加密货币代码集合匹配 (BTC, ETH, ...)
└─ → crypto
2. 期货后缀匹配 (.SHF, .DCE, .CZC, ...)
└─ → futures
3. 港股后缀匹配 (.HK)
├─ 在 YF_CODE_MAP 中 → hk_index
└─ 否则 → hk_stock
4. A股后缀匹配 (.SH, .SZ, .SS, .CSI)
├─ 6位数字代码:
│ ├─ 前缀 000,001,002,399,930,931,932 → china_index
│ ├─ 前缀 51,52,56,58,15,16 → china_etf
│ └─ 其他 → china_stock
└─ 非6位数字 → china_stock
5. 美股指数映射表匹配 (NDX, SPX, ...)
└─ → us_index
6. 默认
└─ → us_stock
代码示例:
class AssetTypeDetector:
@classmethod
def detect(cls, code: str) -> str:
# 加密货币优先
if code.upper() in cls.CRYPTO_CODES:
return 'crypto'
# 期货
if any(code.endswith(suffix) for suffix in cls.FUTURES_SUFFIXES):
return 'futures'
# ... 其他规则
2. UniversalDataFetcher(统一数据获取器)
职责: 封装所有数据源,提供统一接口
主要方法:
| 方法 | 功能 | 返回值 |
|---|---|---|
fetch() |
获取单只标的 | DataFrame 或 None |
fetch_multiple() |
批量获取 | Dict[str, DataFrame] |
数据获取流程:
def fetch(self, code, start_date, end_date, retry=3):
for attempt in range(retry):
try:
# 1. 检测资产类型
asset_type = AssetTypeDetector.detect(code)
# 2. 路由到对应的获取方法
if asset_type in ('china_index', 'china_etf', 'china_stock'):
return self._fetch_china(code, start_date, end_date, asset_type)
elif asset_type == 'futures':
return self._fetch_futures(code, start_date, end_date)
elif asset_type in ('hk_index', 'hk_stock', 'us_index', 'us_stock'):
return self._fetch_yfinance(code, start_date, end_date, asset_type)
elif asset_type == 'crypto':
return self._fetch_crypto(code, start_date, end_date)
except Exception as e:
# 3. 重试机制
if attempt < retry - 1:
time.sleep(2)
else:
return None
3. 数据源适配器
3.1 Tushare 适配器 (_fetch_china)
功能: 获取A股数据(指数、ETF、股票、期货)
接口选择:
if asset_type == 'china_index':
df = pro.index_daily(ts_code, ...) # 指数日线
elif asset_type == 'china_etf':
df = pro.fund_daily(ts_code, ...) # 基金日线
if 无数据:
df = pro.daily(ts_code, ...) # 股票日线
elif asset_type == 'futures':
df = pro.fut_daily(ts_code, ...) # 期货日线
代理处理:
# Tushare 是国内服务,需要临时清除代理
original_proxy = {}
for key in ["HTTP_PROXY", "HTTPS_PROXY", ...]:
original_proxy[key] = os.environ.pop(key, None)
try:
# 调用 Tushare API
df = pro.index_daily(...)
finally:
# 恢复代理设置
for key, value in original_proxy.items():
if value is not None:
os.environ[key] = value
3.2 YFinance 适配器 (_fetch_yfinance)
功能: 获取港股、美股数据
代码转换:
# 使用映射表转换代码
yf_code = AssetTypeDetector.YF_CODE_MAP.get(code, code)
# 美股指数需要加 ^ 前缀
if asset_type == 'us_index' and not yf_code.startswith('^'):
yf_code = f'^{yf_code}'
日期处理:
# yfinance 的 end 参数是排他的,需要加一天
end_date_obj = pd.Timestamp(end_date) + timedelta(days=1)
data = ticker.history(start=start_date, end=end_date_obj)
3.3 CCXT 适配器 (_fetch_crypto)
功能: 获取加密货币数据(通过 OKX 交易所)
实现: 直接复用 HybridDataSource._fetch_ccxt()
4. 标准化输出
所有数据源返回统一格式:
# DataFrame 结构
Index: DatetimeIndex (日期)
Columns:
- open: 开盘价 (float)
- high: 最高价 (float)
- low: 最低价 (float)
- close: 收盘价 (float)
- volume: 成交量 (float)
- code: 标的代码 (str)
标准化代码:
# 统一列名
df = df.rename(columns={
"trade_date": "date",
"vol": "volume",
"Open": "open",
"High": "high",
"Low": "low",
"Close": "close",
})
# 设置日期索引
df["date"] = pd.to_datetime(df["date"])
df = df.set_index("date")
df = df.sort_index()
# 选择需要的列
cols = ['open', 'high', 'low', 'close', 'volume']
available = [c for c in cols if c in df.columns]
df = df[available]
df['code'] = code
设计模式
1. 策略模式 (Strategy Pattern)
不同的数据源实现了相同的接口(获取 OHLCV 数据),根据资产类型动态选择策略。
数据源策略:
├─ TushareStrategy (中国A股)
├─ YFinanceStrategy (港美股)
└─ CCXTStrategy (加密货币)
2. 外观模式 (Facade Pattern)
UniversalDataFetcher 作为外观,隐藏了底层多个数据源的复杂性,提供简化的接口。
用户 → UniversalDataFetcher → [Tushare, YFinance, CCXT]
3. 上下文管理器 (Context Manager)
支持 with 语句,自动管理 SSH 隧道的生命周期。
with UniversalDataFetcher(ssh_config=ssh_config) as fetcher:
df = fetcher.fetch(...)
# SSH 隧道自动关闭
错误处理策略
1. 重试机制
for attempt in range(retry):
try:
return self._fetch_xxx(...)
except Exception as e:
if attempt < retry - 1:
time.sleep(2) # 等待后重试
else:
return None # 重试失败
2. 优雅降级
- 数据获取失败时返回
None,不抛出异常 - 批量获取时单个失败不影响其他标的
3. 详细日志
print(f"✓ {code}: {len(df)} 条") # 成功
print(f"✗ {code}: 无数据") # 失败
性能优化
1. 批量分组
# 按资产类型分组,减少重复检测
grouped = {}
for code in codes:
asset_type = AssetTypeDetector.detect(code)
if asset_type not in grouped:
grouped[asset_type] = []
grouped[asset_type].append(code)
2. 延迟加载
# Tushare token 延迟加载
def _get_tushare_token(self):
if self._tushare_token is None:
self._tushare_token = os.getenv("TUSHARE_TOKEN")
return self._tushare_token
3. 限流处理
# YFinance 添加延迟避免限流
time.sleep(0.5)
data = ticker.history(...)
扩展性设计
添加新的资产类型
步骤1: 在 AssetTypeDetector 中添加检测规则
class AssetTypeDetector:
NEW_ASSET_CODES = {'CODE1', 'CODE2'}
@classmethod
def detect(cls, code: str) -> str:
if code in cls.NEW_ASSET_CODES:
return 'new_asset'
# ... 其他规则
步骤2: 在 UniversalDataFetcher 中添加获取方法
class UniversalDataFetcher:
def _fetch_new_asset(self, code, start_date, end_date):
# 实现数据获取逻辑
return df
步骤3: 在路由中添加分支
def fetch(self, code, ...):
asset_type = AssetTypeDetector.detect(code)
if asset_type == 'new_asset':
return self._fetch_new_asset(code, start_date, end_date)
# ... 其他分支
测试策略
1. 单元测试
- 测试资产类型检测的准确性
- 测试各种代码格式的识别
2. 集成测试
- 测试真实数据获取(需要网络)
- 测试不同资产类型的数据获取
3. 边界测试
- 无效代码处理
- 空数据范围
- 网络异常处理
与现有代码的关系
现有代码:
HybridDataSource (轮动策略使用)
├─ _fetch_tushare()
├─ _fetch_yfinance()
├─ _fetch_ccxt()
└─ fetch_all()
新增代码:
UniversalDataFetcher (通用接口)
├─ 复用 HybridDataSource 的底层方法
├─ 增加资产类型检测
├─ 增加A股股票支持
├─ 简化API
└─ 提供便捷函数
兼容性:
- 不修改现有
HybridDataSource - 新接口是对现有代码的封装和扩展
- 现有轮动策略代码无需修改
未来改进方向
- 缓存机制: 添加本地缓存,减少重复请求
- 异步支持: 使用 asyncio 提高批量获取性能
- 分钟级数据: 扩展支持分钟级K线
- 实时数据: 接入实时行情接口
- 数据质量检查: 自动检测异常数据(停牌、涨跌停等)
总结
UniversalDataFetcher 通过以下设计实现了"一个接口,所有资产"的目标:
✅ 自动识别: 智能检测资产类型
✅ 自动路由: 选择最优数据源
✅ 统一格式: 返回标准化 DataFrame
✅ 容错处理: 重试、降级、日志
✅ 易于扩展: 支持新资产类型
✅ 向后兼容: 不破坏现有代码