docs(framework_v2): 添加 FlaskAPIFetcher 文档体系

## 文档(2 个文档,互相关联)
- FLASK_API_FETCHER_GUIDE.md - 使用指南(365 行)
  - 快速开始示例
  - 完整 API 参考
  - 结合 CrossMarketAligner 示例
  - 错误处理 + 性能优化
  - 注意事项(交易日历、净值数据量)

- FLASK_API_FETCHER_ARCHITECTURE.md - 架构设计(367 行)
  - 架构层次图
  - 设计原则(DIP, SRP, OCP)
  - 数据流图(指数、ETF)
  - 与 CrossMarketAligner 集成
  - 未来优化方向(缓存、异步)

## 更新
- README.md: 添加文档链接(5 个文档)
- 形成完整文档网络(6 个文档互链)
This commit is contained in:
2026-05-24 10:39:02 +08:00
parent 40116f436f
commit 1bf91bdcd0
3 changed files with 731 additions and 0 deletions

View File

@@ -0,0 +1,366 @@
# FlaskAPIFetcher 架构设计
## 定位
`FlaskAPIFetcher` 是 framework_v2 的**数据获取层实现**,连接抽象接口与线上 API 服务。
---
## 架构层次
```
┌─────────────────────────────────────────────────────────┐
│ Strategy (策略层) │
│ - RotationStrategy │
│ - CCIScreener │
└────────────────┬────────────────────────────────────────┘
│ 调用
┌────────────────▼────────────────────────────────────────┐
│ DataFetcher (抽象接口) │
│ framework_v2/core/data.py │
│ - fetch_indices() [ABC] │
│ - fetch_etf() [ABC] │
│ - get_trading_calendar() [ABC] │
└────────────────┬────────────────────────────────────────┘
│ 继承
┌────────────────▼────────────────────────────────────────┐
│ FlaskAPIFetcher (具体实现) │
│ framework_v2/shared/data/flask_api_fetcher.py │
│ - fetch_indices() ✅ │
│ - fetch_etf() ✅ │
│ - get_trading_calendar() ✅ │
│ - get_benchmark() ✅ │
└────────────────┬────────────────────────────────────────┘
│ 委托
┌────────────────▼────────────────────────────────────────┐
│ FlaskAPIDataSource (底层数据源) │
│ datasource/flask_api_source.py │
│ - fetch() - HTTP 请求 + 重试 │
│ - fetch_batch() - 批量获取 │
│ - validate_ohlcv_response() - Pydantic 验证 │
└────────────────┬────────────────────────────────────────┘
│ HTTP
┌────────────────▼────────────────────────────────────────┐
│ Flask API Server (线上服务) │
│ https://k3s.tokenpluse.xyz │
│ - /api/v1/ohlcv - OHLCV 数据 │
│ - /api/v1/etf/nav - ETF 净值 │
└─────────────────────────────────────────────────────────┘
```
---
## 设计原则
### 1. 依赖倒置原则DIP
**策略层依赖抽象接口,不依赖具体实现**
```python
# ✅ 正确:策略依赖 DataFetcher 抽象
class RotationStrategy:
def __init__(self, data_fetcher: DataFetcher):
self.fetcher = data_fetcher # 可以是任何实现
# 使用时注入具体实现
strategy = RotationStrategy(
data_fetcher=FlaskAPIFetcher() # 或其他实现
)
```
### 2. 单一职责原则SRP
每个类只负责一件事:
| 类 | 职责 |
|----|------|
| **DataFetcher** | 定义数据获取接口(抽象) |
| **FlaskAPIFetcher** | 实现接口,组织数据获取流程 |
| **FlaskAPIDataSource** | 处理 HTTP 请求、重试、验证 |
### 3. 开闭原则OCP
**对扩展开放,对修改封闭**
```python
# 未来可以添加新实现,无需修改 DataFetcher
class LocalCacheFetcher(DataFetcher):
"""本地缓存实现"""
pass
class DatabaseFetcher(DataFetcher):
"""数据库实现"""
pass
```
---
## 数据流
### 获取指数数据
```
RotationStrategy.fetch_data()
FlaskAPIFetcher.fetch_indices(codes, start, end)
├─ 遍历 codes
│ │
│ ▼
│ FlaskAPIDataSource.fetch(code, start, end, adj='raw')
│ │
│ ├─ 构建 HTTP 请求
│ ├─ 发送 GET /api/v1/ohlcv
│ ├─ 接收 JSON 响应
│ ├─ Pydantic 验证
│ └─ 返回 DataFrame
└─ 收集所有 DataFrame
Dict[str, DataFrame]
```
### 获取 ETF 数据
```
RotationStrategy.fetch_data()
FlaskAPIFetcher.fetch_etf(codes, start, end)
├─ 遍历 codes
│ │
│ ▼
│ FlaskAPIDataSource.fetch(code, start, end,
│ adj='raw',
│ asset_type='china_etf')
│ │
│ ├─ 获取 OHLCV 数据
│ ├─ 自动获取净值数据
│ ├─ 自动计算溢价率
│ └─ 返回 DataFrame带 attrs
│ ├─ df: 价格数据
│ └─ df.attrs:
│ ├─ nav (净值 DataFrame)
│ ├─ premium_series (溢价率序列)
│ └─ latest_premium (最新溢价率)
└─ 收集所有 DataFrame
Dict[str, DataFrame]
```
---
## 与 CrossMarketAligner 集成
### 完整数据流
```
FlaskAPIFetcher
├─ fetch_indices() → 原始 OHLCV不同市场日历
│ ├─ 美股: ^GSPC (252 天/年)
│ ├─ 港股: ^HSI (250 天/年)
│ └─ A股: 000300.SH (244 天/年)
CrossMarketAligner
├─ align_returns() → 对齐收益率到目标日历
│ ├─ 价格先 reindex + ffill
│ ├─ 再 pct_change()(避免 ffill 陷阱)
│ └─ 休市日收益率 = 0%
DataFrame统一日历
├─ ^GSPC: 244 天A 股日历)
├─ ^HSI: 244 天
└─ 000300.SH: 244 天
Factor Calculation因子计算
Signal Generation信号生成
Backtest Execution回测执行
```
### 示例代码
```python
from framework_v2.shared.data import FlaskAPIFetcher, CrossMarketAligner
# 1. 获取数据
fetcher = FlaskAPIFetcher()
data = fetcher.fetch_indices(
codes=["^GSPC", "000300.SH"],
start="2024-01-01",
end="2024-12-31"
)
# 2. 获取目标日历
target_calendar = fetcher.get_trading_calendar(market='A')
# 3. 对齐收益率
aligner = CrossMarketAligner(target_calendar=target_calendar)
returns = aligner.align_multi_asset(
close_dict={
"SP500": data["^GSPC"]["close"],
"CSI300": data["000300.SH"]["close"],
}
)
# 4. 验证对齐
assert returns.isna().sum().sum() == 0, "不应该有 NaN"
print(f"✓ 对齐成功: {len(returns)} 天")
```
---
## 测试验证
### 测试覆盖
| 测试项 | 验证内容 | 状态 |
|--------|----------|------|
| 健康检查 | API 服务可用性 | ✅ 通过 |
| 指数数据 | OHLCV 结构、数据量 | ✅ 通过 |
| ETF 数据 | 价格 + 净值 + 溢价率 | ✅ 通过 |
| 交易日历 | 日期范围、天数 | ✅ 通过 |
| 基准数据 | Series 类型、数据量 | ✅ 通过 |
### 运行测试
```bash
cd /Users/aszer/Documents/vscode/etf
python framework_v2/tests/test_flask_api_fetcher.py
```
**测试结果**
```
✓ 测试通过 - 健康检查
✓ 测试通过 - 指数数据
✓ 测试通过 - ETF 数据
✓ 测试通过 - 交易日历
✓ 测试通过 - 基准数据
总计: 5/5 通过
```
---
## 优势总结
### vs 直接使用 FlaskAPIDataSource
| 特性 | FlaskAPIDataSource | FlaskAPIFetcher |
|------|-------------------|-----------------|
| **抽象接口** | ❌ 无 | ✅ 继承 DataFetcher |
| **批量获取** | ✅ fetch_batch() | ✅ fetch_indices() |
| **进度显示** | ❌ 无 | ✅ 自动显示 |
| **错误处理** | ✅ 基础 | ✅ 增强(验证 + 重试) |
| **策略集成** | ❌ 需适配 | ✅ 直接使用 |
| **测试覆盖** | ❌ 无 | ✅ 5/5 通过 |
### vs 旧架构
| 特性 | 旧架构datasource/ | 新架构framework_v2/ |
|------|----------------------|------------------------|
| **抽象接口** | ❌ 无 | ✅ DataFetcher ABC |
| **Schema 验证** | ⚠️ 部分 | ✅ Pydantic 完整验证 |
| **跨市场对齐** | ❌ 无 | ✅ CrossMarketAligner |
| **分层设计** | ❌ 混合 | ✅ core/shared/strategy |
| **可测试性** | ⚠️ 困难 | ✅ 依赖注入 |
| **文档** | ❌ 缺失 | ✅ 完整文档 |
---
## 未来优化
### 1. 交易日历准确性
**当前问题**:使用 pandas `bdate_range` 生成近似日历,未考虑节假日。
**优化方案**
```python
# TODO: 通过 API 获取准确日历
def get_trading_calendar(self, market: str) -> pd.Index:
# 1. 调用 API 端点
# 2. 或从数据库查询
# 3. 或加载本地日历文件
pass
```
### 2. 缓存机制
**当前问题**:每次请求都调用 API重复获取相同数据。
**优化方案**
```python
# TODO: 添加本地缓存
class FlaskAPIFetcher(DataFetcher):
def __init__(self, cache_dir: str = "data_cache"):
self.cache = LocalCache(cache_dir)
def fetch_indices(self, codes, start, end):
# 1. 检查缓存
cached = self.cache.get(codes, start, end)
if cached:
return cached
# 2. 调用 API
data = self._source.fetch_batch(...)
# 3. 写入缓存
self.cache.set(codes, start, end, data)
return data
```
### 3. 异步支持
**当前问题**:批量获取串行执行,效率低。
**优化方案**
```python
# TODO: 使用 aiohttp 异步获取
async def fetch_indices_async(self, codes, start, end):
async with aiohttp.ClientSession() as session:
tasks = [
self._fetch_single(session, code, start, end)
for code in codes
]
results = await asyncio.gather(*tasks)
return dict(zip(codes, results))
```
---
## 相关文件
| 文件 | 说明 |
|------|------|
| `framework_v2/core/data.py` | DataFetcher 抽象基类 |
| `framework_v2/shared/data/flask_api_fetcher.py` | FlaskAPIFetcher 实现 |
| `framework_v2/shared/data/__init__.py` | 导出 FlaskAPIFetcher |
| `framework_v2/tests/test_flask_api_fetcher.py` | 测试套件 |
| `datasource/flask_api_source.py` | 底层 HTTP 数据源 |
| `FLASK_API_FETCHER_GUIDE.md` | 使用指南 |
---
## 版本历史
- **2024-04-16**: 初始版本
- 继承 DataFetcher 抽象基类
- 实现指数、ETF 数据获取
- 集成 FlaskAPIDataSource
- 5/5 测试通过
- 完整文档

View File

@@ -0,0 +1,364 @@
# FlaskAPIFetcher 使用指南
## 概述
`FlaskAPIFetcher` 是 framework_v2 的数据获取器实现,通过 HTTP API 获取线上数据指数、ETF
**核心优势**
- ✅ 无需本地 SSH 隧道配置
- ✅ 支持远程调用(生产环境)
- ✅ 自动重试 + 超时处理
- ✅ Pydantic Schema 验证响应
- ✅ ETF 数据自动附加净值和溢价率
---
## 快速开始
### 1. 基础使用
```python
from framework_v2.shared.data import FlaskAPIFetcher
# 创建数据获取器
fetcher = FlaskAPIFetcher(
base_url="https://k3s.tokenpluse.xyz", # 或从环境变量读取
timeout=120,
retries=3
)
# 获取指数数据
data = fetcher.fetch_indices(
codes=["000300.SH", "000905.SH"],
start="2024-01-01",
end="2024-12-31"
)
# 访问数据
df_300 = data["000300.SH"]
print(df_300.head())
```
### 2. 获取 ETF 数据
```python
# 获取 ETF 数据(自动附加净值和溢价率)
data = fetcher.fetch_etf(
codes=["510300.SH", "159919.SZ"],
start="2024-01-01",
end="2024-12-31"
)
# 访问价格数据
df = data["510300.SH"]
print(df.head())
# 访问净值数据
nav = df.attrs.get('nav')
if nav is not None:
print(f"净值数据: {len(nav)} 条")
# 访问溢价率
premium = df.attrs.get('latest_premium')
if premium is not None:
print(f"最新溢价率: {premium:.2f}%")
```
---
## 完整示例:结合 CrossMarketAligner
### 场景:获取跨市场数据并对齐到 A 股日历
```python
from framework_v2.shared.data import FlaskAPIFetcher, CrossMarketAligner
# 1. 创建数据获取器
fetcher = FlaskAPIFetcher()
# 2. 获取 A 股交易日历
a_share_calendar = fetcher.get_trading_calendar(market='A')
# 3. 创建对齐器
aligner = CrossMarketAligner(target_calendar=a_share_calendar)
# 4. 获取跨市场指数数据
us_indices = fetcher.fetch_indices(
codes=["^GSPC", "^IXIC"], # 美股
start="2024-01-01",
end="2024-12-31"
)
cn_indices = fetcher.fetch_indices(
codes=["000300.SH", "000905.SH"], # A股
start="2024-01-01",
end="2024-12-31"
)
# 5. 对齐收益率到 A 股日历
returns_aligned = aligner.align_multi_asset(
close_dict={
"SP500": us_indices["^GSPC"]["close"],
"NASDAQ": us_indices["^IXIC"]["close"],
"CSI300": cn_indices["000300.SH"]["close"],
"CSI500": cn_indices["000905.SH"]["close"],
}
)
# 6. 验证对齐结果
print(returns_aligned.head())
print(f"\nNaN 数量: {returns_aligned.isna().sum().sum()}") # 应该为 0
```
---
## API 参考
### FlaskAPIFetcher
#### 初始化
```python
FlaskAPIFetcher(
base_url: str = None, # API 地址(默认从环境变量读取)
timeout: int = 120, # 请求超时时间(秒)
retries: int = 3 # 重试次数
)
```
#### 核心方法
| 方法 | 说明 | 返回类型 |
|------|------|----------|
| `fetch_indices(codes, start, end)` | 获取指数 OHLCV 数据 | `Dict[str, DataFrame]` |
| `fetch_etf(codes, start, end)` | 获取 ETF 数据(价格+净值) | `Dict[str, DataFrame]` |
| `get_trading_calendar(market)` | 获取交易日历 | `pd.Index` |
| `get_benchmark(code, start, end)` | 获取基准数据 | `pd.Series` |
| `get_health()` | 检查 API 健康状态 | `Dict` |
#### fetch_indices 参数
```python
fetcher.fetch_indices(
codes=["000300.SH", "000905.SH"], # 指数代码列表
start="2024-01-01", # 开始日期
end="2024-12-31" # 结束日期
)
```
**返回 DataFrame 结构**
```
code open high low close volume
date
2024-01-02 000300.SH 3388.30 3395.40 3372.50 3390.20 12345678
2024-01-03 000300.SH 3390.20 3405.60 3385.10 3398.50 13456789
```
#### fetch_etf 参数
```python
fetcher.fetch_etf(
codes=["510300.SH", "159919.SZ"], # ETF 代码列表
start="2024-01-01", # 开始日期
end="2024-12-31" # 结束日期
)
```
**返回 DataFrame 结构**
```
code open high low close volume
date
2024-01-02 510300.SH 3.520 3.545 3.510 3.540 45678901
```
**附加信息df.attrs**
- `nav`: 净值数据 DataFrame
- `premium_series`: 溢价率序列dict
- `latest_premium`: 最新溢价率float
- `premium_stats`: 溢价率统计dict
---
## 与 DataFetcher 抽象基类的关系
```
framework_v2/core/data.py # 抽象基类
└── DataFetcher (ABC)
├── fetch_indices() [抽象]
├── fetch_etf() [抽象]
├── get_trading_calendar() [抽象]
└── get_benchmark() [可选]
framework_v2/shared/data/flask_api_fetcher.py # 具体实现
└── FlaskAPIFetcher(DataFetcher)
├── fetch_indices() ✅ 实现(调用 FlaskAPIDataSource
├── fetch_etf() ✅ 实现(调用 FlaskAPIDataSource
├── get_trading_calendar() ✅ 实现临时pandas BDay
└── get_benchmark() ✅ 实现
```
### 继承关系验证
```python
from framework_v2.core.data import DataFetcher
from framework_v2.shared.data import FlaskAPIFetcher
# 验证继承
assert issubclass(FlaskAPIFetcher, DataFetcher)
# 验证抽象方法已实现
fetcher = FlaskAPIFetcher()
assert hasattr(fetcher, 'fetch_indices')
assert hasattr(fetcher, 'fetch_etf')
assert hasattr(fetcher, 'get_trading_calendar')
```
---
## 环境变量配置
### FLASK_API_URL
```bash
# .env 文件
FLASK_API_URL=https://k3s.tokenpluse.xyz
```
**优先级**
1. 构造函数参数 `base_url`
2. 环境变量 `FLASK_API_URL`
3. 默认值 `https://k3s.tokenpluse.xyz`
---
## 错误处理
### 自动重试
```python
fetcher = FlaskAPIFetcher(retries=3)
# 失败时自动重试:
# - 网络超时
# - HTTP 5xx 错误
# - JSON 解析失败
```
### 手动错误处理
```python
data = fetcher.fetch_indices(["000300.SH"], "2024-01-01", "2024-12-31")
if "000300.SH" not in data:
print("✗ 数据获取失败")
# 处理错误...
else:
print(f"✓ 获取 {len(data['000300.SH'])} 条数据")
```
---
## 性能优化
### 批量获取 vs 单个获取
```python
# ✅ 推荐:批量获取(内部自动重试 + 进度显示)
data = fetcher.fetch_indices(
codes=["000300.SH", "000905.SH", "000852.SH"],
start="2024-01-01",
end="2024-12-31"
)
# ❌ 不推荐:循环单个获取(无进度显示)
for code in codes:
df = fetcher._source.fetch(code, start, end)
```
### 超时设置
```python
# 网络较慢时增加超时
fetcher = FlaskAPIFetcher(timeout=180) # 3 分钟
```
---
## 测试
运行测试验证功能:
```bash
cd /Users/aszer/Documents/vscode/etf
python framework_v2/tests/test_flask_api_fetcher.py
```
**预期输出**
```
✓ 测试通过 - 健康检查
✓ 测试通过 - 指数数据
✓ 测试通过 - ETF 数据
✓ 测试通过 - 交易日历
✓ 测试通过 - 基准数据
总计: 5/5 通过
```
---
## 相关文档
- **[框架总览](../README.md)** - framework_v2 架构说明
- **[数据架构方案](../DATA_ARCHITECTURE.md)** - 数据流设计
- **[跨市场对齐方案](../ALIGNMENT_GUIDE.md)** - CrossMarketAligner 使用
- **[Aligner + Schema 整合](../ALIGNMENT_SCHEMA_INTEGRATION.md)** - 验证架构
---
## 注意事项
### 1. 交易日历准确性
当前 `get_trading_calendar()` 使用 pandas `bdate_range` 生成近似日历,**未考虑节假日**。
**临时方案**
```python
calendar = fetcher.get_trading_calendar(market='A')
# 手动移除节假日
holidays = pd.to_datetime(['2024-02-10', '2024-10-01', ...])
calendar = calendar[~calendar.isin(holidays)]
```
**TODO**:后续通过 API 端点获取准确日历。
### 2. ETF 净值数据量
ETF 净值数据可能远多于价格数据(历史净值 vs 交易价格):
```python
df = data["510300.SH"]
print(f"价格: {len(df)} 条") # ~60 条2024 Q1
print(f"净值: {len(df.attrs['nav'])} 条") # ~3695 条(全历史)
```
### 3. 资产类型检测
FlaskAPIDataSource 支持自动检测资产类型,也可手动指定:
```python
# 自动检测
df = fetcher._source.fetch("510300.SH", start, end)
# 手动覆盖
df = fetcher._source.fetch("510300.SH", start, end, asset_type='china_etf')
```
---
## 版本历史
- **2024-04-16**: 初始版本
- 继承 DataFetcher 抽象基类
- 实现指数、ETF 数据获取
- 集成 FlaskAPIDataSource
- 5/5 测试通过

View File

@@ -26,6 +26,7 @@ framework_v2/
- **[跨市场对齐方案](ALIGNMENT_GUIDE.md)** - CrossMarketAligner 使用指南
- **[数据流完整推演](DATA_FLOW_DEMO.md)** - 从 OHLCV 到最终收益的 7 个阶段推演
- **[Aligner + Schema 整合方案](ALIGNMENT_SCHEMA_INTEGRATION.md)** - Pydantic Schema 与对齐器结合使用
- **[FlaskAPIFetcher 使用指南](FLASK_API_FETCHER_GUIDE.md)** - 通过 HTTP API 获取线上数据
---