# 统一数据获取接口 - 架构设计 ## 设计目标 创建一个简单易用的数据获取接口,能够: 1. **自动识别**资产类型(A股、美股、港股、期货、加密货币等) 2. **自动路由**到正确的数据源(Tushare、YFinance、CCXT) 3. **统一返回**格式化的 DataFrame 4. **容错处理**(重试机制、错误处理) ## 整体架构 ``` ┌─────────────────────────────────────────────────────────┐ │ UniversalDataFetcher │ │ │ │ 用户调用: fetch(code, start, end) │ └───────────────────┬─────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────┐ │ AssetTypeDetector.detect(code) │ │ │ │ 输入: "000300.SH" │ │ 输出: "china_index" │ └───────────────────┬─────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────┐ │ 数据源路由器 │ │ │ │ china_index → _fetch_china() → Tushare │ │ china_etf → _fetch_china() → Tushare │ │ china_stock → _fetch_china() → Tushare │ │ hk_* → _fetch_yfinance() → YFinance │ │ us_* → _fetch_yfinance() → YFinance │ │ futures → _fetch_futures() → Tushare │ │ crypto → _fetch_crypto() → CCXT/OKX │ └───────────────────┬─────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────┐ │ 标准化 DataFrame 返回 │ │ │ │ Index: DatetimeIndex │ │ Columns: [open, high, low, close, volume, code] │ └─────────────────────────────────────────────────────────┘ ``` ## 核心组件 ### 1. AssetTypeDetector(资产类型检测器) **职责**: 根据代码格式自动识别资产类型 **检测规则**(按优先级): ``` 1. 加密货币代码集合匹配 (BTC, ETH, ...) └─ → crypto 2. 期货后缀匹配 (.SHF, .DCE, .CZC, ...) └─ → futures 3. 港股后缀匹配 (.HK) ├─ 在 YF_CODE_MAP 中 → hk_index └─ 否则 → hk_stock 4. A股后缀匹配 (.SH, .SZ, .SS, .CSI) ├─ 6位数字代码: │ ├─ 前缀 000,001,002,399,930,931,932 → china_index │ ├─ 前缀 51,52,56,58,15,16 → china_etf │ └─ 其他 → china_stock └─ 非6位数字 → china_stock 5. 美股指数映射表匹配 (NDX, SPX, ...) └─ → us_index 6. 默认 └─ → us_stock ``` **代码示例**: ```python class AssetTypeDetector: @classmethod def detect(cls, code: str) -> str: # 加密货币优先 if code.upper() in cls.CRYPTO_CODES: return 'crypto' # 期货 if any(code.endswith(suffix) for suffix in cls.FUTURES_SUFFIXES): return 'futures' # ... 其他规则 ``` ### 2. UniversalDataFetcher(统一数据获取器) **职责**: 封装所有数据源,提供统一接口 **主要方法**: | 方法 | 功能 | 返回值 | |------|------|--------| | `fetch()` | 获取单只标的 | `DataFrame` 或 `None` | | `fetch_multiple()` | 批量获取 | `Dict[str, DataFrame]` | **数据获取流程**: ```python def fetch(self, code, start_date, end_date, retry=3): for attempt in range(retry): try: # 1. 检测资产类型 asset_type = AssetTypeDetector.detect(code) # 2. 路由到对应的获取方法 if asset_type in ('china_index', 'china_etf', 'china_stock'): return self._fetch_china(code, start_date, end_date, asset_type) elif asset_type == 'futures': return self._fetch_futures(code, start_date, end_date) elif asset_type in ('hk_index', 'hk_stock', 'us_index', 'us_stock'): return self._fetch_yfinance(code, start_date, end_date, asset_type) elif asset_type == 'crypto': return self._fetch_crypto(code, start_date, end_date) except Exception as e: # 3. 重试机制 if attempt < retry - 1: time.sleep(2) else: return None ``` ### 3. 数据源适配器 #### 3.1 Tushare 适配器 (_fetch_china) **功能**: 获取A股数据(指数、ETF、股票、期货) **接口选择**: ```python if asset_type == 'china_index': df = pro.index_daily(ts_code, ...) # 指数日线 elif asset_type == 'china_etf': df = pro.fund_daily(ts_code, ...) # 基金日线 if 无数据: df = pro.daily(ts_code, ...) # 股票日线 elif asset_type == 'futures': df = pro.fut_daily(ts_code, ...) # 期货日线 ``` **代理处理**: ```python # Tushare 是国内服务,需要临时清除代理 original_proxy = {} for key in ["HTTP_PROXY", "HTTPS_PROXY", ...]: original_proxy[key] = os.environ.pop(key, None) try: # 调用 Tushare API df = pro.index_daily(...) finally: # 恢复代理设置 for key, value in original_proxy.items(): if value is not None: os.environ[key] = value ``` #### 3.2 YFinance 适配器 (_fetch_yfinance) **功能**: 获取港股、美股数据 **代码转换**: ```python # 使用映射表转换代码 yf_code = AssetTypeDetector.YF_CODE_MAP.get(code, code) # 美股指数需要加 ^ 前缀 if asset_type == 'us_index' and not yf_code.startswith('^'): yf_code = f'^{yf_code}' ``` **日期处理**: ```python # yfinance 的 end 参数是排他的,需要加一天 end_date_obj = pd.Timestamp(end_date) + timedelta(days=1) data = ticker.history(start=start_date, end=end_date_obj) ``` #### 3.3 CCXT 适配器 (_fetch_crypto) **功能**: 获取加密货币数据(通过 OKX 交易所) **实现**: 直接复用 `HybridDataSource._fetch_ccxt()` ### 4. 标准化输出 **所有数据源返回统一格式**: ```python # DataFrame 结构 Index: DatetimeIndex (日期) Columns: - open: 开盘价 (float) - high: 最高价 (float) - low: 最低价 (float) - close: 收盘价 (float) - volume: 成交量 (float) - code: 标的代码 (str) ``` **标准化代码**: ```python # 统一列名 df = df.rename(columns={ "trade_date": "date", "vol": "volume", "Open": "open", "High": "high", "Low": "low", "Close": "close", }) # 设置日期索引 df["date"] = pd.to_datetime(df["date"]) df = df.set_index("date") df = df.sort_index() # 选择需要的列 cols = ['open', 'high', 'low', 'close', 'volume'] available = [c for c in cols if c in df.columns] df = df[available] df['code'] = code ``` ## 设计模式 ### 1. 策略模式 (Strategy Pattern) 不同的数据源实现了相同的接口(获取 OHLCV 数据),根据资产类型动态选择策略。 ``` 数据源策略: ├─ TushareStrategy (中国A股) ├─ YFinanceStrategy (港美股) └─ CCXTStrategy (加密货币) ``` ### 2. 外观模式 (Facade Pattern) `UniversalDataFetcher` 作为外观,隐藏了底层多个数据源的复杂性,提供简化的接口。 ``` 用户 → UniversalDataFetcher → [Tushare, YFinance, CCXT] ``` ### 3. 上下文管理器 (Context Manager) 支持 `with` 语句,自动管理 SSH 隧道的生命周期。 ```python with UniversalDataFetcher(ssh_config=ssh_config) as fetcher: df = fetcher.fetch(...) # SSH 隧道自动关闭 ``` ## 错误处理策略 ### 1. 重试机制 ```python for attempt in range(retry): try: return self._fetch_xxx(...) except Exception as e: if attempt < retry - 1: time.sleep(2) # 等待后重试 else: return None # 重试失败 ``` ### 2. 优雅降级 - 数据获取失败时返回 `None`,不抛出异常 - 批量获取时单个失败不影响其他标的 ### 3. 详细日志 ```python print(f"✓ {code}: {len(df)} 条") # 成功 print(f"✗ {code}: 无数据") # 失败 ``` ## 性能优化 ### 1. 批量分组 ```python # 按资产类型分组,减少重复检测 grouped = {} for code in codes: asset_type = AssetTypeDetector.detect(code) if asset_type not in grouped: grouped[asset_type] = [] grouped[asset_type].append(code) ``` ### 2. 延迟加载 ```python # Tushare token 延迟加载 def _get_tushare_token(self): if self._tushare_token is None: self._tushare_token = os.getenv("TUSHARE_TOKEN") return self._tushare_token ``` ### 3. 限流处理 ```python # YFinance 添加延迟避免限流 time.sleep(0.5) data = ticker.history(...) ``` ## 扩展性设计 ### 添加新的资产类型 **步骤1**: 在 `AssetTypeDetector` 中添加检测规则 ```python class AssetTypeDetector: NEW_ASSET_CODES = {'CODE1', 'CODE2'} @classmethod def detect(cls, code: str) -> str: if code in cls.NEW_ASSET_CODES: return 'new_asset' # ... 其他规则 ``` **步骤2**: 在 `UniversalDataFetcher` 中添加获取方法 ```python class UniversalDataFetcher: def _fetch_new_asset(self, code, start_date, end_date): # 实现数据获取逻辑 return df ``` **步骤3**: 在路由中添加分支 ```python def fetch(self, code, ...): asset_type = AssetTypeDetector.detect(code) if asset_type == 'new_asset': return self._fetch_new_asset(code, start_date, end_date) # ... 其他分支 ``` ## 测试策略 ### 1. 单元测试 - 测试资产类型检测的准确性 - 测试各种代码格式的识别 ### 2. 集成测试 - 测试真实数据获取(需要网络) - 测试不同资产类型的数据获取 ### 3. 边界测试 - 无效代码处理 - 空数据范围 - 网络异常处理 ## 与现有代码的关系 ``` 现有代码: HybridDataSource (轮动策略使用) ├─ _fetch_tushare() ├─ _fetch_yfinance() ├─ _fetch_ccxt() └─ fetch_all() 新增代码: UniversalDataFetcher (通用接口) ├─ 复用 HybridDataSource 的底层方法 ├─ 增加资产类型检测 ├─ 增加A股股票支持 ├─ 简化API └─ 提供便捷函数 ``` **兼容性**: - 不修改现有 `HybridDataSource` - 新接口是对现有代码的封装和扩展 - 现有轮动策略代码无需修改 ## 未来改进方向 1. **缓存机制**: 添加本地缓存,减少重复请求 2. **异步支持**: 使用 asyncio 提高批量获取性能 3. **分钟级数据**: 扩展支持分钟级K线 4. **实时数据**: 接入实时行情接口 5. **数据质量检查**: 自动检测异常数据(停牌、涨跌停等) ## 总结 `UniversalDataFetcher` 通过以下设计实现了"一个接口,所有资产"的目标: ✅ **自动识别**: 智能检测资产类型 ✅ **自动路由**: 选择最优数据源 ✅ **统一格式**: 返回标准化 DataFrame ✅ **容错处理**: 重试、降级、日志 ✅ **易于扩展**: 支持新资产类型 ✅ **向后兼容**: 不破坏现有代码