- universal_fetcher_README.md:统一数据获取接口完整文档 - universal_fetcher_QUICKSTART.md:5分钟快速上手指南 - universal_fetcher_ARCHITECTURE.md:架构设计说明 - universal_fetcher_TEST_REPORT.md:测试报告与修复记录 - flask_api_README.md:Flask API 完整文档 - FLASK_SERVICE_SUMMARY.md:项目实现总结 总计 2000+ 行文档,涵盖 API 说明、使用示例、架构设计
431 lines
12 KiB
Markdown
431 lines
12 KiB
Markdown
# 统一数据获取接口 - 架构设计
|
||
|
||
## 设计目标
|
||
|
||
创建一个简单易用的数据获取接口,能够:
|
||
1. **自动识别**资产类型(A股、美股、港股、期货、加密货币等)
|
||
2. **自动路由**到正确的数据源(Tushare、YFinance、CCXT)
|
||
3. **统一返回**格式化的 DataFrame
|
||
4. **容错处理**(重试机制、错误处理)
|
||
|
||
## 整体架构
|
||
|
||
```
|
||
┌─────────────────────────────────────────────────────────┐
|
||
│ UniversalDataFetcher │
|
||
│ │
|
||
│ 用户调用: fetch(code, start, end) │
|
||
└───────────────────┬─────────────────────────────────────┘
|
||
│
|
||
▼
|
||
┌─────────────────────────────────────────────────────────┐
|
||
│ AssetTypeDetector.detect(code) │
|
||
│ │
|
||
│ 输入: "000300.SH" │
|
||
│ 输出: "china_index" │
|
||
└───────────────────┬─────────────────────────────────────┘
|
||
│
|
||
▼
|
||
┌─────────────────────────────────────────────────────────┐
|
||
│ 数据源路由器 │
|
||
│ │
|
||
│ china_index → _fetch_china() → Tushare │
|
||
│ china_etf → _fetch_china() → Tushare │
|
||
│ china_stock → _fetch_china() → Tushare │
|
||
│ hk_* → _fetch_yfinance() → YFinance │
|
||
│ us_* → _fetch_yfinance() → YFinance │
|
||
│ futures → _fetch_futures() → Tushare │
|
||
│ crypto → _fetch_crypto() → CCXT/OKX │
|
||
└───────────────────┬─────────────────────────────────────┘
|
||
│
|
||
▼
|
||
┌─────────────────────────────────────────────────────────┐
|
||
│ 标准化 DataFrame 返回 │
|
||
│ │
|
||
│ Index: DatetimeIndex │
|
||
│ Columns: [open, high, low, close, volume, code] │
|
||
└─────────────────────────────────────────────────────────┘
|
||
```
|
||
|
||
## 核心组件
|
||
|
||
### 1. AssetTypeDetector(资产类型检测器)
|
||
|
||
**职责**: 根据代码格式自动识别资产类型
|
||
|
||
**检测规则**(按优先级):
|
||
|
||
```
|
||
1. 加密货币代码集合匹配 (BTC, ETH, ...)
|
||
└─ → crypto
|
||
|
||
2. 期货后缀匹配 (.SHF, .DCE, .CZC, ...)
|
||
└─ → futures
|
||
|
||
3. 港股后缀匹配 (.HK)
|
||
├─ 在 YF_CODE_MAP 中 → hk_index
|
||
└─ 否则 → hk_stock
|
||
|
||
4. A股后缀匹配 (.SH, .SZ, .SS, .CSI)
|
||
├─ 6位数字代码:
|
||
│ ├─ 前缀 000,001,002,399,930,931,932 → china_index
|
||
│ ├─ 前缀 51,52,56,58,15,16 → china_etf
|
||
│ └─ 其他 → china_stock
|
||
└─ 非6位数字 → china_stock
|
||
|
||
5. 美股指数映射表匹配 (NDX, SPX, ...)
|
||
└─ → us_index
|
||
|
||
6. 默认
|
||
└─ → us_stock
|
||
```
|
||
|
||
**代码示例**:
|
||
|
||
```python
|
||
class AssetTypeDetector:
|
||
@classmethod
|
||
def detect(cls, code: str) -> str:
|
||
# 加密货币优先
|
||
if code.upper() in cls.CRYPTO_CODES:
|
||
return 'crypto'
|
||
|
||
# 期货
|
||
if any(code.endswith(suffix) for suffix in cls.FUTURES_SUFFIXES):
|
||
return 'futures'
|
||
|
||
# ... 其他规则
|
||
```
|
||
|
||
### 2. UniversalDataFetcher(统一数据获取器)
|
||
|
||
**职责**: 封装所有数据源,提供统一接口
|
||
|
||
**主要方法**:
|
||
|
||
| 方法 | 功能 | 返回值 |
|
||
|------|------|--------|
|
||
| `fetch()` | 获取单只标的 | `DataFrame` 或 `None` |
|
||
| `fetch_multiple()` | 批量获取 | `Dict[str, DataFrame]` |
|
||
|
||
**数据获取流程**:
|
||
|
||
```python
|
||
def fetch(self, code, start_date, end_date, retry=3):
|
||
for attempt in range(retry):
|
||
try:
|
||
# 1. 检测资产类型
|
||
asset_type = AssetTypeDetector.detect(code)
|
||
|
||
# 2. 路由到对应的获取方法
|
||
if asset_type in ('china_index', 'china_etf', 'china_stock'):
|
||
return self._fetch_china(code, start_date, end_date, asset_type)
|
||
elif asset_type == 'futures':
|
||
return self._fetch_futures(code, start_date, end_date)
|
||
elif asset_type in ('hk_index', 'hk_stock', 'us_index', 'us_stock'):
|
||
return self._fetch_yfinance(code, start_date, end_date, asset_type)
|
||
elif asset_type == 'crypto':
|
||
return self._fetch_crypto(code, start_date, end_date)
|
||
except Exception as e:
|
||
# 3. 重试机制
|
||
if attempt < retry - 1:
|
||
time.sleep(2)
|
||
else:
|
||
return None
|
||
```
|
||
|
||
### 3. 数据源适配器
|
||
|
||
#### 3.1 Tushare 适配器 (_fetch_china)
|
||
|
||
**功能**: 获取A股数据(指数、ETF、股票、期货)
|
||
|
||
**接口选择**:
|
||
|
||
```python
|
||
if asset_type == 'china_index':
|
||
df = pro.index_daily(ts_code, ...) # 指数日线
|
||
elif asset_type == 'china_etf':
|
||
df = pro.fund_daily(ts_code, ...) # 基金日线
|
||
if 无数据:
|
||
df = pro.daily(ts_code, ...) # 股票日线
|
||
elif asset_type == 'futures':
|
||
df = pro.fut_daily(ts_code, ...) # 期货日线
|
||
```
|
||
|
||
**代理处理**:
|
||
|
||
```python
|
||
# Tushare 是国内服务,需要临时清除代理
|
||
original_proxy = {}
|
||
for key in ["HTTP_PROXY", "HTTPS_PROXY", ...]:
|
||
original_proxy[key] = os.environ.pop(key, None)
|
||
|
||
try:
|
||
# 调用 Tushare API
|
||
df = pro.index_daily(...)
|
||
finally:
|
||
# 恢复代理设置
|
||
for key, value in original_proxy.items():
|
||
if value is not None:
|
||
os.environ[key] = value
|
||
```
|
||
|
||
#### 3.2 YFinance 适配器 (_fetch_yfinance)
|
||
|
||
**功能**: 获取港股、美股数据
|
||
|
||
**代码转换**:
|
||
|
||
```python
|
||
# 使用映射表转换代码
|
||
yf_code = AssetTypeDetector.YF_CODE_MAP.get(code, code)
|
||
|
||
# 美股指数需要加 ^ 前缀
|
||
if asset_type == 'us_index' and not yf_code.startswith('^'):
|
||
yf_code = f'^{yf_code}'
|
||
```
|
||
|
||
**日期处理**:
|
||
|
||
```python
|
||
# yfinance 的 end 参数是排他的,需要加一天
|
||
end_date_obj = pd.Timestamp(end_date) + timedelta(days=1)
|
||
data = ticker.history(start=start_date, end=end_date_obj)
|
||
```
|
||
|
||
#### 3.3 CCXT 适配器 (_fetch_crypto)
|
||
|
||
**功能**: 获取加密货币数据(通过 OKX 交易所)
|
||
|
||
**实现**: 直接复用 `HybridDataSource._fetch_ccxt()`
|
||
|
||
### 4. 标准化输出
|
||
|
||
**所有数据源返回统一格式**:
|
||
|
||
```python
|
||
# DataFrame 结构
|
||
Index: DatetimeIndex (日期)
|
||
Columns:
|
||
- open: 开盘价 (float)
|
||
- high: 最高价 (float)
|
||
- low: 最低价 (float)
|
||
- close: 收盘价 (float)
|
||
- volume: 成交量 (float)
|
||
- code: 标的代码 (str)
|
||
```
|
||
|
||
**标准化代码**:
|
||
|
||
```python
|
||
# 统一列名
|
||
df = df.rename(columns={
|
||
"trade_date": "date",
|
||
"vol": "volume",
|
||
"Open": "open",
|
||
"High": "high",
|
||
"Low": "low",
|
||
"Close": "close",
|
||
})
|
||
|
||
# 设置日期索引
|
||
df["date"] = pd.to_datetime(df["date"])
|
||
df = df.set_index("date")
|
||
df = df.sort_index()
|
||
|
||
# 选择需要的列
|
||
cols = ['open', 'high', 'low', 'close', 'volume']
|
||
available = [c for c in cols if c in df.columns]
|
||
df = df[available]
|
||
df['code'] = code
|
||
```
|
||
|
||
## 设计模式
|
||
|
||
### 1. 策略模式 (Strategy Pattern)
|
||
|
||
不同的数据源实现了相同的接口(获取 OHLCV 数据),根据资产类型动态选择策略。
|
||
|
||
```
|
||
数据源策略:
|
||
├─ TushareStrategy (中国A股)
|
||
├─ YFinanceStrategy (港美股)
|
||
└─ CCXTStrategy (加密货币)
|
||
```
|
||
|
||
### 2. 外观模式 (Facade Pattern)
|
||
|
||
`UniversalDataFetcher` 作为外观,隐藏了底层多个数据源的复杂性,提供简化的接口。
|
||
|
||
```
|
||
用户 → UniversalDataFetcher → [Tushare, YFinance, CCXT]
|
||
```
|
||
|
||
### 3. 上下文管理器 (Context Manager)
|
||
|
||
支持 `with` 语句,自动管理 SSH 隧道的生命周期。
|
||
|
||
```python
|
||
with UniversalDataFetcher(ssh_config=ssh_config) as fetcher:
|
||
df = fetcher.fetch(...)
|
||
# SSH 隧道自动关闭
|
||
```
|
||
|
||
## 错误处理策略
|
||
|
||
### 1. 重试机制
|
||
|
||
```python
|
||
for attempt in range(retry):
|
||
try:
|
||
return self._fetch_xxx(...)
|
||
except Exception as e:
|
||
if attempt < retry - 1:
|
||
time.sleep(2) # 等待后重试
|
||
else:
|
||
return None # 重试失败
|
||
```
|
||
|
||
### 2. 优雅降级
|
||
|
||
- 数据获取失败时返回 `None`,不抛出异常
|
||
- 批量获取时单个失败不影响其他标的
|
||
|
||
### 3. 详细日志
|
||
|
||
```python
|
||
print(f"✓ {code}: {len(df)} 条") # 成功
|
||
print(f"✗ {code}: 无数据") # 失败
|
||
```
|
||
|
||
## 性能优化
|
||
|
||
### 1. 批量分组
|
||
|
||
```python
|
||
# 按资产类型分组,减少重复检测
|
||
grouped = {}
|
||
for code in codes:
|
||
asset_type = AssetTypeDetector.detect(code)
|
||
if asset_type not in grouped:
|
||
grouped[asset_type] = []
|
||
grouped[asset_type].append(code)
|
||
```
|
||
|
||
### 2. 延迟加载
|
||
|
||
```python
|
||
# Tushare token 延迟加载
|
||
def _get_tushare_token(self):
|
||
if self._tushare_token is None:
|
||
self._tushare_token = os.getenv("TUSHARE_TOKEN")
|
||
return self._tushare_token
|
||
```
|
||
|
||
### 3. 限流处理
|
||
|
||
```python
|
||
# YFinance 添加延迟避免限流
|
||
time.sleep(0.5)
|
||
data = ticker.history(...)
|
||
```
|
||
|
||
## 扩展性设计
|
||
|
||
### 添加新的资产类型
|
||
|
||
**步骤1**: 在 `AssetTypeDetector` 中添加检测规则
|
||
|
||
```python
|
||
class AssetTypeDetector:
|
||
NEW_ASSET_CODES = {'CODE1', 'CODE2'}
|
||
|
||
@classmethod
|
||
def detect(cls, code: str) -> str:
|
||
if code in cls.NEW_ASSET_CODES:
|
||
return 'new_asset'
|
||
# ... 其他规则
|
||
```
|
||
|
||
**步骤2**: 在 `UniversalDataFetcher` 中添加获取方法
|
||
|
||
```python
|
||
class UniversalDataFetcher:
|
||
def _fetch_new_asset(self, code, start_date, end_date):
|
||
# 实现数据获取逻辑
|
||
return df
|
||
```
|
||
|
||
**步骤3**: 在路由中添加分支
|
||
|
||
```python
|
||
def fetch(self, code, ...):
|
||
asset_type = AssetTypeDetector.detect(code)
|
||
|
||
if asset_type == 'new_asset':
|
||
return self._fetch_new_asset(code, start_date, end_date)
|
||
# ... 其他分支
|
||
```
|
||
|
||
## 测试策略
|
||
|
||
### 1. 单元测试
|
||
|
||
- 测试资产类型检测的准确性
|
||
- 测试各种代码格式的识别
|
||
|
||
### 2. 集成测试
|
||
|
||
- 测试真实数据获取(需要网络)
|
||
- 测试不同资产类型的数据获取
|
||
|
||
### 3. 边界测试
|
||
|
||
- 无效代码处理
|
||
- 空数据范围
|
||
- 网络异常处理
|
||
|
||
## 与现有代码的关系
|
||
|
||
```
|
||
现有代码:
|
||
HybridDataSource (轮动策略使用)
|
||
├─ _fetch_tushare()
|
||
├─ _fetch_yfinance()
|
||
├─ _fetch_ccxt()
|
||
└─ fetch_all()
|
||
|
||
新增代码:
|
||
UniversalDataFetcher (通用接口)
|
||
├─ 复用 HybridDataSource 的底层方法
|
||
├─ 增加资产类型检测
|
||
├─ 增加A股股票支持
|
||
├─ 简化API
|
||
└─ 提供便捷函数
|
||
```
|
||
|
||
**兼容性**:
|
||
- 不修改现有 `HybridDataSource`
|
||
- 新接口是对现有代码的封装和扩展
|
||
- 现有轮动策略代码无需修改
|
||
|
||
## 未来改进方向
|
||
|
||
1. **缓存机制**: 添加本地缓存,减少重复请求
|
||
2. **异步支持**: 使用 asyncio 提高批量获取性能
|
||
3. **分钟级数据**: 扩展支持分钟级K线
|
||
4. **实时数据**: 接入实时行情接口
|
||
5. **数据质量检查**: 自动检测异常数据(停牌、涨跌停等)
|
||
|
||
## 总结
|
||
|
||
`UniversalDataFetcher` 通过以下设计实现了"一个接口,所有资产"的目标:
|
||
|
||
✅ **自动识别**: 智能检测资产类型
|
||
✅ **自动路由**: 选择最优数据源
|
||
✅ **统一格式**: 返回标准化 DataFrame
|
||
✅ **容错处理**: 重试、降级、日志
|
||
✅ **易于扩展**: 支持新资产类型
|
||
✅ **向后兼容**: 不破坏现有代码
|