docs: 将英文文件名重命名为中文

重命名13个英文文档为中文:
- etf_pool_selection.md → ETF候选池筛选报告.md
- etf_tracking_error_calculation.md → ETF跟踪误差计算方法.md
- FLASK_SERVICE_SUMMARY.md → Flask服务总结.md
- flask_api_README.md → Flask_API接口说明.md
- cross_market_effectiveness_survey.md → 跨市场有效性调研.md
- etf_rotation_deep_analysis.md → ETF轮动深度分析.md
- etf_rotation_framework.md → ETF轮动框架.md
- momentum_rotation_survey.md → 动量轮动调研.md
- strategy_evolution_report.md → 策略演进报告.md
- universal_fetcher_*.md → 通用数据源*.md

同时更新文档内部的交叉引用链接
This commit is contained in:
2026-06-20 23:24:04 +08:00
parent a600a71aa3
commit b698857e49
13 changed files with 6 additions and 6 deletions

View File

@@ -0,0 +1,430 @@
# 统一数据获取接口 - 架构设计
## 设计目标
创建一个简单易用的数据获取接口,能够:
1. **自动识别**资产类型A股、美股、港股、期货、加密货币等
2. **自动路由**到正确的数据源Tushare、YFinance、CCXT
3. **统一返回**格式化的 DataFrame
4. **容错处理**(重试机制、错误处理)
## 整体架构
```
┌─────────────────────────────────────────────────────────┐
│ UniversalDataFetcher │
│ │
│ 用户调用: fetch(code, start, end) │
└───────────────────┬─────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ AssetTypeDetector.detect(code) │
│ │
│ 输入: "000300.SH" │
│ 输出: "china_index" │
└───────────────────┬─────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ 数据源路由器 │
│ │
│ china_index → _fetch_china() → Tushare │
│ china_etf → _fetch_china() → Tushare │
│ china_stock → _fetch_china() → Tushare │
│ hk_* → _fetch_yfinance() → YFinance │
│ us_* → _fetch_yfinance() → YFinance │
│ futures → _fetch_futures() → Tushare │
│ crypto → _fetch_crypto() → CCXT/OKX │
└───────────────────┬─────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ 标准化 DataFrame 返回 │
│ │
│ Index: DatetimeIndex │
│ Columns: [open, high, low, close, volume, code] │
└─────────────────────────────────────────────────────────┘
```
## 核心组件
### 1. AssetTypeDetector资产类型检测器
**职责**: 根据代码格式自动识别资产类型
**检测规则**(按优先级):
```
1. 加密货币代码集合匹配 (BTC, ETH, ...)
└─ → crypto
2. 期货后缀匹配 (.SHF, .DCE, .CZC, ...)
└─ → futures
3. 港股后缀匹配 (.HK)
├─ 在 YF_CODE_MAP 中 → hk_index
└─ 否则 → hk_stock
4. A股后缀匹配 (.SH, .SZ, .SS, .CSI)
├─ 6位数字代码:
│ ├─ 前缀 000,001,002,399,930,931,932 → china_index
│ ├─ 前缀 51,52,56,58,15,16 → china_etf
│ └─ 其他 → china_stock
└─ 非6位数字 → china_stock
5. 美股指数映射表匹配 (NDX, SPX, ...)
└─ → us_index
6. 默认
└─ → us_stock
```
**代码示例**:
```python
class AssetTypeDetector:
@classmethod
def detect(cls, code: str) -> str:
# 加密货币优先
if code.upper() in cls.CRYPTO_CODES:
return 'crypto'
# 期货
if any(code.endswith(suffix) for suffix in cls.FUTURES_SUFFIXES):
return 'futures'
# ... 其他规则
```
### 2. UniversalDataFetcher统一数据获取器
**职责**: 封装所有数据源,提供统一接口
**主要方法**:
| 方法 | 功能 | 返回值 |
|------|------|--------|
| `fetch()` | 获取单只标的 | `DataFrame``None` |
| `fetch_multiple()` | 批量获取 | `Dict[str, DataFrame]` |
**数据获取流程**:
```python
def fetch(self, code, start_date, end_date, retry=3):
for attempt in range(retry):
try:
# 1. 检测资产类型
asset_type = AssetTypeDetector.detect(code)
# 2. 路由到对应的获取方法
if asset_type in ('china_index', 'china_etf', 'china_stock'):
return self._fetch_china(code, start_date, end_date, asset_type)
elif asset_type == 'futures':
return self._fetch_futures(code, start_date, end_date)
elif asset_type in ('hk_index', 'hk_stock', 'us_index', 'us_stock'):
return self._fetch_yfinance(code, start_date, end_date, asset_type)
elif asset_type == 'crypto':
return self._fetch_crypto(code, start_date, end_date)
except Exception as e:
# 3. 重试机制
if attempt < retry - 1:
time.sleep(2)
else:
return None
```
### 3. 数据源适配器
#### 3.1 Tushare 适配器 (_fetch_china)
**功能**: 获取A股数据指数、ETF、股票、期货
**接口选择**:
```python
if asset_type == 'china_index':
df = pro.index_daily(ts_code, ...) # 指数日线
elif asset_type == 'china_etf':
df = pro.fund_daily(ts_code, ...) # 基金日线
if 无数据:
df = pro.daily(ts_code, ...) # 股票日线
elif asset_type == 'futures':
df = pro.fut_daily(ts_code, ...) # 期货日线
```
**代理处理**:
```python
# Tushare 是国内服务,需要临时清除代理
original_proxy = {}
for key in ["HTTP_PROXY", "HTTPS_PROXY", ...]:
original_proxy[key] = os.environ.pop(key, None)
try:
# 调用 Tushare API
df = pro.index_daily(...)
finally:
# 恢复代理设置
for key, value in original_proxy.items():
if value is not None:
os.environ[key] = value
```
#### 3.2 YFinance 适配器 (_fetch_yfinance)
**功能**: 获取港股、美股数据
**代码转换**:
```python
# 使用映射表转换代码
yf_code = AssetTypeDetector.YF_CODE_MAP.get(code, code)
# 美股指数需要加 ^ 前缀
if asset_type == 'us_index' and not yf_code.startswith('^'):
yf_code = f'^{yf_code}'
```
**日期处理**:
```python
# yfinance 的 end 参数是排他的,需要加一天
end_date_obj = pd.Timestamp(end_date) + timedelta(days=1)
data = ticker.history(start=start_date, end=end_date_obj)
```
#### 3.3 CCXT 适配器 (_fetch_crypto)
**功能**: 获取加密货币数据(通过 OKX 交易所)
**实现**: 直接复用 `HybridDataSource._fetch_ccxt()`
### 4. 标准化输出
**所有数据源返回统一格式**:
```python
# DataFrame 结构
Index: DatetimeIndex (日期)
Columns:
- open: 开盘价 (float)
- high: 最高价 (float)
- low: 最低价 (float)
- close: 收盘价 (float)
- volume: 成交量 (float)
- code: 标的代码 (str)
```
**标准化代码**:
```python
# 统一列名
df = df.rename(columns={
"trade_date": "date",
"vol": "volume",
"Open": "open",
"High": "high",
"Low": "low",
"Close": "close",
})
# 设置日期索引
df["date"] = pd.to_datetime(df["date"])
df = df.set_index("date")
df = df.sort_index()
# 选择需要的列
cols = ['open', 'high', 'low', 'close', 'volume']
available = [c for c in cols if c in df.columns]
df = df[available]
df['code'] = code
```
## 设计模式
### 1. 策略模式 (Strategy Pattern)
不同的数据源实现了相同的接口(获取 OHLCV 数据),根据资产类型动态选择策略。
```
数据源策略:
├─ TushareStrategy (中国A股)
├─ YFinanceStrategy (港美股)
└─ CCXTStrategy (加密货币)
```
### 2. 外观模式 (Facade Pattern)
`UniversalDataFetcher` 作为外观,隐藏了底层多个数据源的复杂性,提供简化的接口。
```
用户 → UniversalDataFetcher → [Tushare, YFinance, CCXT]
```
### 3. 上下文管理器 (Context Manager)
支持 `with` 语句,自动管理 SSH 隧道的生命周期。
```python
with UniversalDataFetcher(ssh_config=ssh_config) as fetcher:
df = fetcher.fetch(...)
# SSH 隧道自动关闭
```
## 错误处理策略
### 1. 重试机制
```python
for attempt in range(retry):
try:
return self._fetch_xxx(...)
except Exception as e:
if attempt < retry - 1:
time.sleep(2) # 等待后重试
else:
return None # 重试失败
```
### 2. 优雅降级
- 数据获取失败时返回 `None`,不抛出异常
- 批量获取时单个失败不影响其他标的
### 3. 详细日志
```python
print(f"✓ {code}: {len(df)} 条") # 成功
print(f"✗ {code}: 无数据") # 失败
```
## 性能优化
### 1. 批量分组
```python
# 按资产类型分组,减少重复检测
grouped = {}
for code in codes:
asset_type = AssetTypeDetector.detect(code)
if asset_type not in grouped:
grouped[asset_type] = []
grouped[asset_type].append(code)
```
### 2. 延迟加载
```python
# Tushare token 延迟加载
def _get_tushare_token(self):
if self._tushare_token is None:
self._tushare_token = os.getenv("TUSHARE_TOKEN")
return self._tushare_token
```
### 3. 限流处理
```python
# YFinance 添加延迟避免限流
time.sleep(0.5)
data = ticker.history(...)
```
## 扩展性设计
### 添加新的资产类型
**步骤1**: 在 `AssetTypeDetector` 中添加检测规则
```python
class AssetTypeDetector:
NEW_ASSET_CODES = {'CODE1', 'CODE2'}
@classmethod
def detect(cls, code: str) -> str:
if code in cls.NEW_ASSET_CODES:
return 'new_asset'
# ... 其他规则
```
**步骤2**: 在 `UniversalDataFetcher` 中添加获取方法
```python
class UniversalDataFetcher:
def _fetch_new_asset(self, code, start_date, end_date):
# 实现数据获取逻辑
return df
```
**步骤3**: 在路由中添加分支
```python
def fetch(self, code, ...):
asset_type = AssetTypeDetector.detect(code)
if asset_type == 'new_asset':
return self._fetch_new_asset(code, start_date, end_date)
# ... 其他分支
```
## 测试策略
### 1. 单元测试
- 测试资产类型检测的准确性
- 测试各种代码格式的识别
### 2. 集成测试
- 测试真实数据获取(需要网络)
- 测试不同资产类型的数据获取
### 3. 边界测试
- 无效代码处理
- 空数据范围
- 网络异常处理
## 与现有代码的关系
```
现有代码:
HybridDataSource (轮动策略使用)
├─ _fetch_tushare()
├─ _fetch_yfinance()
├─ _fetch_ccxt()
└─ fetch_all()
新增代码:
UniversalDataFetcher (通用接口)
├─ 复用 HybridDataSource 的底层方法
├─ 增加资产类型检测
├─ 增加A股股票支持
├─ 简化API
└─ 提供便捷函数
```
**兼容性**:
- 不修改现有 `HybridDataSource`
- 新接口是对现有代码的封装和扩展
- 现有轮动策略代码无需修改
## 未来改进方向
1. **缓存机制**: 添加本地缓存,减少重复请求
2. **异步支持**: 使用 asyncio 提高批量获取性能
3. **分钟级数据**: 扩展支持分钟级K线
4. **实时数据**: 接入实时行情接口
5. **数据质量检查**: 自动检测异常数据(停牌、涨跌停等)
## 总结
`UniversalDataFetcher` 通过以下设计实现了"一个接口,所有资产"的目标:
**自动识别**: 智能检测资产类型
**自动路由**: 选择最优数据源
**统一格式**: 返回标准化 DataFrame
**容错处理**: 重试、降级、日志
**易于扩展**: 支持新资产类型
**向后兼容**: 不破坏现有代码