Files
etf/docs/universal_fetcher_ARCHITECTURE.md
aszerW 0e531a1876 docs: 添加完整项目文档
- universal_fetcher_README.md:统一数据获取接口完整文档
- universal_fetcher_QUICKSTART.md:5分钟快速上手指南
- universal_fetcher_ARCHITECTURE.md:架构设计说明
- universal_fetcher_TEST_REPORT.md:测试报告与修复记录
- flask_api_README.md:Flask API 完整文档
- FLASK_SERVICE_SUMMARY.md:项目实现总结

总计 2000+ 行文档,涵盖 API 说明、使用示例、架构设计
2026-05-07 21:20:03 +08:00

431 lines
12 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 统一数据获取接口 - 架构设计
## 设计目标
创建一个简单易用的数据获取接口,能够:
1. **自动识别**资产类型A股、美股、港股、期货、加密货币等
2. **自动路由**到正确的数据源Tushare、YFinance、CCXT
3. **统一返回**格式化的 DataFrame
4. **容错处理**(重试机制、错误处理)
## 整体架构
```
┌─────────────────────────────────────────────────────────┐
│ UniversalDataFetcher │
│ │
│ 用户调用: fetch(code, start, end) │
└───────────────────┬─────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ AssetTypeDetector.detect(code) │
│ │
│ 输入: "000300.SH" │
│ 输出: "china_index" │
└───────────────────┬─────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ 数据源路由器 │
│ │
│ china_index → _fetch_china() → Tushare │
│ china_etf → _fetch_china() → Tushare │
│ china_stock → _fetch_china() → Tushare │
│ hk_* → _fetch_yfinance() → YFinance │
│ us_* → _fetch_yfinance() → YFinance │
│ futures → _fetch_futures() → Tushare │
│ crypto → _fetch_crypto() → CCXT/OKX │
└───────────────────┬─────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ 标准化 DataFrame 返回 │
│ │
│ Index: DatetimeIndex │
│ Columns: [open, high, low, close, volume, code] │
└─────────────────────────────────────────────────────────┘
```
## 核心组件
### 1. AssetTypeDetector资产类型检测器
**职责**: 根据代码格式自动识别资产类型
**检测规则**(按优先级):
```
1. 加密货币代码集合匹配 (BTC, ETH, ...)
└─ → crypto
2. 期货后缀匹配 (.SHF, .DCE, .CZC, ...)
└─ → futures
3. 港股后缀匹配 (.HK)
├─ 在 YF_CODE_MAP 中 → hk_index
└─ 否则 → hk_stock
4. A股后缀匹配 (.SH, .SZ, .SS, .CSI)
├─ 6位数字代码:
│ ├─ 前缀 000,001,002,399,930,931,932 → china_index
│ ├─ 前缀 51,52,56,58,15,16 → china_etf
│ └─ 其他 → china_stock
└─ 非6位数字 → china_stock
5. 美股指数映射表匹配 (NDX, SPX, ...)
└─ → us_index
6. 默认
└─ → us_stock
```
**代码示例**:
```python
class AssetTypeDetector:
@classmethod
def detect(cls, code: str) -> str:
# 加密货币优先
if code.upper() in cls.CRYPTO_CODES:
return 'crypto'
# 期货
if any(code.endswith(suffix) for suffix in cls.FUTURES_SUFFIXES):
return 'futures'
# ... 其他规则
```
### 2. UniversalDataFetcher统一数据获取器
**职责**: 封装所有数据源,提供统一接口
**主要方法**:
| 方法 | 功能 | 返回值 |
|------|------|--------|
| `fetch()` | 获取单只标的 | `DataFrame``None` |
| `fetch_multiple()` | 批量获取 | `Dict[str, DataFrame]` |
**数据获取流程**:
```python
def fetch(self, code, start_date, end_date, retry=3):
for attempt in range(retry):
try:
# 1. 检测资产类型
asset_type = AssetTypeDetector.detect(code)
# 2. 路由到对应的获取方法
if asset_type in ('china_index', 'china_etf', 'china_stock'):
return self._fetch_china(code, start_date, end_date, asset_type)
elif asset_type == 'futures':
return self._fetch_futures(code, start_date, end_date)
elif asset_type in ('hk_index', 'hk_stock', 'us_index', 'us_stock'):
return self._fetch_yfinance(code, start_date, end_date, asset_type)
elif asset_type == 'crypto':
return self._fetch_crypto(code, start_date, end_date)
except Exception as e:
# 3. 重试机制
if attempt < retry - 1:
time.sleep(2)
else:
return None
```
### 3. 数据源适配器
#### 3.1 Tushare 适配器 (_fetch_china)
**功能**: 获取A股数据指数、ETF、股票、期货
**接口选择**:
```python
if asset_type == 'china_index':
df = pro.index_daily(ts_code, ...) # 指数日线
elif asset_type == 'china_etf':
df = pro.fund_daily(ts_code, ...) # 基金日线
if 无数据:
df = pro.daily(ts_code, ...) # 股票日线
elif asset_type == 'futures':
df = pro.fut_daily(ts_code, ...) # 期货日线
```
**代理处理**:
```python
# Tushare 是国内服务,需要临时清除代理
original_proxy = {}
for key in ["HTTP_PROXY", "HTTPS_PROXY", ...]:
original_proxy[key] = os.environ.pop(key, None)
try:
# 调用 Tushare API
df = pro.index_daily(...)
finally:
# 恢复代理设置
for key, value in original_proxy.items():
if value is not None:
os.environ[key] = value
```
#### 3.2 YFinance 适配器 (_fetch_yfinance)
**功能**: 获取港股、美股数据
**代码转换**:
```python
# 使用映射表转换代码
yf_code = AssetTypeDetector.YF_CODE_MAP.get(code, code)
# 美股指数需要加 ^ 前缀
if asset_type == 'us_index' and not yf_code.startswith('^'):
yf_code = f'^{yf_code}'
```
**日期处理**:
```python
# yfinance 的 end 参数是排他的,需要加一天
end_date_obj = pd.Timestamp(end_date) + timedelta(days=1)
data = ticker.history(start=start_date, end=end_date_obj)
```
#### 3.3 CCXT 适配器 (_fetch_crypto)
**功能**: 获取加密货币数据(通过 OKX 交易所)
**实现**: 直接复用 `HybridDataSource._fetch_ccxt()`
### 4. 标准化输出
**所有数据源返回统一格式**:
```python
# DataFrame 结构
Index: DatetimeIndex (日期)
Columns:
- open: 开盘价 (float)
- high: 最高价 (float)
- low: 最低价 (float)
- close: 收盘价 (float)
- volume: 成交量 (float)
- code: 标的代码 (str)
```
**标准化代码**:
```python
# 统一列名
df = df.rename(columns={
"trade_date": "date",
"vol": "volume",
"Open": "open",
"High": "high",
"Low": "low",
"Close": "close",
})
# 设置日期索引
df["date"] = pd.to_datetime(df["date"])
df = df.set_index("date")
df = df.sort_index()
# 选择需要的列
cols = ['open', 'high', 'low', 'close', 'volume']
available = [c for c in cols if c in df.columns]
df = df[available]
df['code'] = code
```
## 设计模式
### 1. 策略模式 (Strategy Pattern)
不同的数据源实现了相同的接口(获取 OHLCV 数据),根据资产类型动态选择策略。
```
数据源策略:
├─ TushareStrategy (中国A股)
├─ YFinanceStrategy (港美股)
└─ CCXTStrategy (加密货币)
```
### 2. 外观模式 (Facade Pattern)
`UniversalDataFetcher` 作为外观,隐藏了底层多个数据源的复杂性,提供简化的接口。
```
用户 → UniversalDataFetcher → [Tushare, YFinance, CCXT]
```
### 3. 上下文管理器 (Context Manager)
支持 `with` 语句,自动管理 SSH 隧道的生命周期。
```python
with UniversalDataFetcher(ssh_config=ssh_config) as fetcher:
df = fetcher.fetch(...)
# SSH 隧道自动关闭
```
## 错误处理策略
### 1. 重试机制
```python
for attempt in range(retry):
try:
return self._fetch_xxx(...)
except Exception as e:
if attempt < retry - 1:
time.sleep(2) # 等待后重试
else:
return None # 重试失败
```
### 2. 优雅降级
- 数据获取失败时返回 `None`,不抛出异常
- 批量获取时单个失败不影响其他标的
### 3. 详细日志
```python
print(f"✓ {code}: {len(df)} 条") # 成功
print(f"✗ {code}: 无数据") # 失败
```
## 性能优化
### 1. 批量分组
```python
# 按资产类型分组,减少重复检测
grouped = {}
for code in codes:
asset_type = AssetTypeDetector.detect(code)
if asset_type not in grouped:
grouped[asset_type] = []
grouped[asset_type].append(code)
```
### 2. 延迟加载
```python
# Tushare token 延迟加载
def _get_tushare_token(self):
if self._tushare_token is None:
self._tushare_token = os.getenv("TUSHARE_TOKEN")
return self._tushare_token
```
### 3. 限流处理
```python
# YFinance 添加延迟避免限流
time.sleep(0.5)
data = ticker.history(...)
```
## 扩展性设计
### 添加新的资产类型
**步骤1**: 在 `AssetTypeDetector` 中添加检测规则
```python
class AssetTypeDetector:
NEW_ASSET_CODES = {'CODE1', 'CODE2'}
@classmethod
def detect(cls, code: str) -> str:
if code in cls.NEW_ASSET_CODES:
return 'new_asset'
# ... 其他规则
```
**步骤2**: 在 `UniversalDataFetcher` 中添加获取方法
```python
class UniversalDataFetcher:
def _fetch_new_asset(self, code, start_date, end_date):
# 实现数据获取逻辑
return df
```
**步骤3**: 在路由中添加分支
```python
def fetch(self, code, ...):
asset_type = AssetTypeDetector.detect(code)
if asset_type == 'new_asset':
return self._fetch_new_asset(code, start_date, end_date)
# ... 其他分支
```
## 测试策略
### 1. 单元测试
- 测试资产类型检测的准确性
- 测试各种代码格式的识别
### 2. 集成测试
- 测试真实数据获取(需要网络)
- 测试不同资产类型的数据获取
### 3. 边界测试
- 无效代码处理
- 空数据范围
- 网络异常处理
## 与现有代码的关系
```
现有代码:
HybridDataSource (轮动策略使用)
├─ _fetch_tushare()
├─ _fetch_yfinance()
├─ _fetch_ccxt()
└─ fetch_all()
新增代码:
UniversalDataFetcher (通用接口)
├─ 复用 HybridDataSource 的底层方法
├─ 增加资产类型检测
├─ 增加A股股票支持
├─ 简化API
└─ 提供便捷函数
```
**兼容性**:
- 不修改现有 `HybridDataSource`
- 新接口是对现有代码的封装和扩展
- 现有轮动策略代码无需修改
## 未来改进方向
1. **缓存机制**: 添加本地缓存,减少重复请求
2. **异步支持**: 使用 asyncio 提高批量获取性能
3. **分钟级数据**: 扩展支持分钟级K线
4. **实时数据**: 接入实时行情接口
5. **数据质量检查**: 自动检测异常数据(停牌、涨跌停等)
## 总结
`UniversalDataFetcher` 通过以下设计实现了"一个接口,所有资产"的目标:
**自动识别**: 智能检测资产类型
**自动路由**: 选择最优数据源
**统一格式**: 返回标准化 DataFrame
**容错处理**: 重试、降级、日志
**易于扩展**: 支持新资产类型
**向后兼容**: 不破坏现有代码