Archive legacy framework and utility modules that are no longer referenced by the active core (datasource/ and rotation/): - framework/ -> archive/framework/ - framework_v2/ -> archive/framework_v2/ - strategies/ -> archive/strategies/ - config/ -> archive/config/ - visualization/ -> archive/visualization/ - scripts/ -> archive/scripts/ - tests/ -> archive/tests/ - run_rotation.py, run_us_rotation.py -> archive/single_files/ - compare_*.py, test_api_dates.py -> archive/single_files/
525 lines
14 KiB
Markdown
525 lines
14 KiB
Markdown
# Aligner + Schema 验证整合方案
|
||
|
||
## 📋 设计目标
|
||
|
||
将 **CrossMarketAligner**(数据对齐)与 **Pydantic Schema**(结构验证)结合,形成完整的数据质量保障体系。
|
||
|
||
---
|
||
|
||
## 🏗️ 架构设计
|
||
|
||
### 验证流程
|
||
|
||
```
|
||
原始 OHLCV 数据
|
||
↓
|
||
[1. 输入验证] ← OHLCVInputSchema(Pydantic)
|
||
│ - 检查 close 列存在
|
||
│ - 检查数值类型
|
||
│ - 检查价格 > 0
|
||
↓
|
||
[2. CrossMarketAligner] ← 对齐 + 内置验证
|
||
│ - reindex + ffill
|
||
│ - NaN 比例检查
|
||
│ - 异常值检测
|
||
↓
|
||
[3. 输出验证] ← AlignedFactorSchema / AlignedReturnsSchema(Pydantic)
|
||
│ - 检查列名(value, is_filled)
|
||
│ - 检查类型(float, bool)
|
||
│ - 检查无 NaN
|
||
↓
|
||
下游组件(因子、信号、回测)
|
||
```
|
||
|
||
---
|
||
|
||
## 📐 Schema 定义
|
||
|
||
### 1. 输入验证 Schema
|
||
|
||
```python
|
||
"""framework_v2/shared/data/schemas.py"""
|
||
|
||
class OHLCVInputSchema(BaseModel):
|
||
"""OHLCV 输入数据验证"""
|
||
close: float = Field(..., description="收盘价(必需)", gt=0)
|
||
open: Optional[float] = Field(None, description="开盘价", gt=0)
|
||
high: Optional[float] = Field(None, description="最高价", gt=0)
|
||
low: Optional[float] = Field(None, description="最低价", gt=0)
|
||
volume: Optional[float] = Field(None, description="成交量", ge=0)
|
||
|
||
@field_validator('close', 'open', 'high', 'low')
|
||
def check_positive(cls, v):
|
||
if v is not None and v <= 0:
|
||
raise ValueError(f"价格必须为正数,当前值: {v}")
|
||
return v
|
||
```
|
||
|
||
**使用场景**:对齐前验证原始 OHLCV 数据
|
||
|
||
```python
|
||
# 示例
|
||
df = pd.DataFrame({
|
||
'close': [100.0, 101.0, 102.0],
|
||
'open': [99.0, 100.0, 101.0]
|
||
})
|
||
|
||
# 验证每一行
|
||
for idx, row in df.head(10).iterrows():
|
||
OHLCVInputSchema(**row.to_dict()) # 失败会抛出异常
|
||
```
|
||
|
||
---
|
||
|
||
### 2. 输出验证 Schema
|
||
|
||
#### 因子对齐输出
|
||
|
||
```python
|
||
class AlignedFactorSchema(BaseModel):
|
||
"""对齐后的因子数据验证"""
|
||
value: float = Field(..., description="对齐后的因子值")
|
||
is_filled: bool = Field(False, description="是否为填充值")
|
||
|
||
class Config:
|
||
arbitrary_types_allowed = True # 允许非 Pydantic 原生类型的字段;与 NaN 无关——float 字段默认即接受 NaN
|
||
```
|
||
|
||
**验证装饰器**:
|
||
|
||
```python
|
||
def validate_factor_after_align(func):
|
||
@wraps(func)
|
||
def wrapper(self, *args, **kwargs):
|
||
result = func(self, *args, **kwargs)
|
||
|
||
# 1. 检查返回类型
|
||
if not isinstance(result, pd.DataFrame):
|
||
raise TypeError(f"{func.__name__} 必须返回 DataFrame,当前返回 {type(result)}")
|
||
|
||
# 2. 检查列
|
||
required_cols = ['value', 'is_filled']
|
||
missing_cols = [col for col in required_cols if col not in result.columns]
|
||
if missing_cols:
|
||
raise ValueError(f"缺少列: {missing_cols}")
|
||
|
||
# 3. 检查类型
|
||
if not pd.api.types.is_numeric_dtype(result['value']):
|
||
raise TypeError(f"value 列必须是数值类型")
|
||
|
||
if not pd.api.types.is_bool_dtype(result['is_filled']):
|
||
raise TypeError(f"is_filled 列必须是布尔类型")
|
||
|
||
return result
|
||
return wrapper
|
||
```
|
||
|
||
---
|
||
|
||
#### 收益率对齐输出
|
||
|
||
```python
|
||
class AlignedReturnsSchema(BaseModel):
|
||
"""对齐后的收益率数据验证"""
|
||
returns: float = Field(..., description="收益率")
|
||
|
||
@field_validator('returns')
|
||
def check_returns_range(cls, v):
|
||
"""收益率应在合理范围内(-50% ~ 50%)"""
|
||
if abs(v) > 0.5:
|
||
import warnings
|
||
warnings.warn(f"收益率异常: {v:.2%}")
|
||
return v
|
||
```
|
||
|
||
**验证装饰器**:
|
||
|
||
```python
|
||
def validate_returns_after_align(func):
|
||
@wraps(func)
|
||
def wrapper(self, *args, **kwargs):
|
||
result = func(self, *args, **kwargs)
|
||
|
||
# 1. 检查返回类型
|
||
if not isinstance(result, pd.Series):
|
||
raise TypeError(f"必须返回 Series")
|
||
|
||
# 2. 检查无 NaN
|
||
if result.isna().any():
|
||
nan_count = result.isna().sum()
|
||
raise ValueError(f"收益率包含 {nan_count} 个 NaN")
|
||
|
||
# 3. 检查收益率范围
|
||
max_return = result.abs().max()
|
||
if max_return > 0.5:
|
||
import warnings
|
||
warnings.warn(f"发现异常收益率: {max_return:.2%}")
|
||
|
||
return result
|
||
return wrapper
|
||
```
|
||
|
||
---
|
||
|
||
#### 对齐验证结果
|
||
|
||
```python
|
||
class AlignmentValidationResult(BaseModel):
|
||
"""对齐验证结果"""
|
||
signals_aligned: bool = Field(..., description="信号是否已对齐")
|
||
returns_aligned: bool = Field(..., description="收益率是否已对齐")
|
||
common_dates_count: int = Field(..., description="共同日期数量")
|
||
lost_signals: int = Field(0, description="丢失的信号数")
|
||
lost_returns: int = Field(0, description="丢失的收益数")
|
||
|
||
@field_validator('common_dates_count')
|
||
def check_min_dates(cls, v):
|
||
"""共同日期至少 10 天"""
|
||
if v < 10:
|
||
raise ValueError(f"共同日期太少: {v} 天")
|
||
return v
|
||
```
|
||
|
||
---
|
||
|
||
## 💻 在 CrossMarketAligner 中使用
|
||
|
||
### 完整示例
|
||
|
||
```python
|
||
"""framework_v2/shared/data/alignment.py"""
|
||
|
||
from framework_v2.shared.data.schemas import (
|
||
OHLCVInputSchema,
|
||
AlignedFactorSchema,
|
||
AlignedReturnsSchema,
|
||
AlignmentValidationResult,
|
||
validate_factor_after_align,
|
||
validate_returns_after_align
|
||
)
|
||
|
||
|
||
class CrossMarketAligner:
|
||
"""跨市场数据对齐器"""
|
||
|
||
@validate_factor_after_align # ← Pydantic Schema 验证
|
||
def align_factor(
|
||
self,
|
||
factor_series: pd.Series,
|
||
source_calendar: pd.Index,
|
||
code: str = ''
|
||
) -> pd.DataFrame:
|
||
"""
|
||
对齐因子值到目标日历
|
||
|
||
验证流程:
|
||
1. 装饰器检查返回类型、列名、列类型
|
||
2. 内置验证检查 NaN 比例、填充比例
|
||
"""
|
||
# 1. reindex + ffill
|
||
aligned = factor_series.reindex(self.target_calendar, method='ffill')
|
||
|
||
# 2. 标记填充值
|
||
is_filled = ~aligned.index.isin(source_calendar)
|
||
|
||
# 3. 内置验证(NaN 比例、填充比例)
|
||
self._validate_factor_alignment(aligned, is_filled, code)
|
||
|
||
return pd.DataFrame({
|
||
'value': aligned,
|
||
'is_filled': is_filled
|
||
}, index=self.target_calendar)
|
||
|
||
@validate_returns_after_align # ← Pydantic Schema 验证
|
||
def align_returns(
|
||
self,
|
||
close_series: pd.Series,
|
||
code: str
|
||
) -> pd.Series:
|
||
"""
|
||
对齐收益率到目标日历
|
||
|
||
验证流程:
|
||
1. 装饰器检查返回类型、无 NaN、收益率范围
|
||
2. 内置验证检查 NaN 比例、异常值、索引一致性
|
||
"""
|
||
# 1. 价格对齐
|
||
close_aligned = close_series.reindex(
|
||
self.target_calendar,
|
||
method='ffill'
|
||
)
|
||
|
||
# 2. 计算收益率
|
||
returns = close_aligned.pct_change(fill_method=None)
|
||
|
||
# 3. 填充首日 NaN
|
||
if len(returns) > 0:
|
||
returns.iloc[0] = 0.0
|
||
|
||
# 4. 填充剩余 NaN
|
||
nan_ratio = returns.isna().sum() / len(returns)
|
||
if nan_ratio > 0:
|
||
returns = returns.fillna(0.0)
|
||
warnings.warn(f"{code}: 收益率 NaN 比例 {nan_ratio:.1%},已填充为 0")
|
||
|
||
# 5. 内置验证
|
||
self._validate_returns(returns, code)
|
||
|
||
return returns
|
||
|
||
def validate_alignment(
|
||
self,
|
||
signals: pd.DataFrame,
|
||
returns_df: pd.DataFrame
|
||
) -> Tuple[pd.DataFrame, pd.DataFrame]:
|
||
"""
|
||
验证信号与收益率对齐
|
||
|
||
验证流程:
|
||
1. 内置验证(共同日期、丢失日期)
|
||
2. Pydantic Schema 验证(AlignmentValidationResult)
|
||
"""
|
||
# 1. 找共同日期
|
||
common_dates = signals.index.intersection(returns_df.index)
|
||
|
||
# 2. 检查丢失的日期
|
||
lost_signals = len(signals) - len(common_dates)
|
||
lost_returns = len(returns_df) - len(common_dates)
|
||
|
||
if lost_signals > 0 or lost_returns > 0:
|
||
warnings.warn(...)
|
||
|
||
# 3. 检查对齐后日期是否太少
|
||
if len(common_dates) < 10:
|
||
raise ValueError(...)
|
||
|
||
# 4. 裁剪到共同日期
|
||
aligned_signals = signals.loc[common_dates]
|
||
aligned_returns = returns_df.loc[common_dates]
|
||
|
||
# 5. 使用 Pydantic Schema 验证结果
|
||
validation_result = AlignmentValidationResult(
|
||
signals_aligned=True,
|
||
returns_aligned=True,
|
||
common_dates_count=len(common_dates),
|
||
lost_signals=lost_signals,
|
||
lost_returns=lost_returns
|
||
)
|
||
|
||
# 6. 如果验证失败,Pydantic 会抛出异常
|
||
# (例如 common_dates_count < 10)
|
||
|
||
return aligned_signals, aligned_returns
|
||
```
|
||
|
||
---
|
||
|
||
## 📊 验证层级对比
|
||
|
||
| 层级 | 验证内容 | 工具 | 失败处理 |
|------|----------|------|----------|
| **输入验证** | 列名、类型、值范围 | Pydantic Schema | 立即报错 |
| **对齐验证** | NaN 比例、填充比例、异常值 | 内置方法 | 警告/报错 |
| **输出验证** | 返回类型、列名、类型、无 NaN | 装饰器 + Pydantic | 立即报错 |
| **最终验证** | 共同日期数、丢失日期 | Pydantic Schema | 立即报错 |
|
||
|
||
---
|
||
|
||
## 🎯 使用示例
|
||
|
||
### 完整流程
|
||
|
||
```python
|
||
from framework_v2.shared.data.alignment import CrossMarketAligner
|
||
from framework_v2.shared.data.schemas import OHLCVInputSchema
|
||
|
||
# 1. 获取原始数据
|
||
index_data = {
|
||
'^GSPC': df_sp500, # 美股日历
|
||
'^HSI': df_hsi, # 港股日历
|
||
'000300.SH': df_hs300 # A 股日历
|
||
}
|
||
|
||
# 2. 输入验证(可选,开发阶段使用)
|
||
for code, df in index_data.items():
|
||
for idx, row in df.head(10).iterrows():
|
||
OHLCVInputSchema(**row.to_dict()) # 验证前 10 行
|
||
|
||
# 3. 创建对齐器
|
||
aligner = CrossMarketAligner(target_calendar=a_share_dates)
|
||
|
||
# 4. 对齐因子(自动验证)
|
||
factor_aligned = {}
|
||
for code, factor_series in factor_raw.items():
|
||
# ✅ 自动触发 @validate_factor_after_align
|
||
result = aligner.align_factor(
|
||
factor_series,
|
||
source_calendar=index_data[code].index,
|
||
code=code
|
||
)
|
||
factor_aligned[code] = result['value']
|
||
|
||
# 5. 对齐收益率(自动验证)
|
||
returns_df = aligner.align_multi_asset({
|
||
code: df['close']
|
||
for code, df in index_data.items()
|
||
})
|
||
# ✅ 内部调用 align_returns,自动触发 @validate_returns_after_align
|
||
|
||
# 6. 验证信号与收益率对齐(自动验证)
|
||
aligned_signals, aligned_returns = aligner.validate_alignment(
|
||
signals,
|
||
returns_df
|
||
)
|
||
# ✅ 自动创建 AlignmentValidationResult 并验证
|
||
```
|
||
|
||
---
|
||
|
||
## 🔧 配置验证级别
|
||
|
||
### 开发环境:完整验证
|
||
|
||
```python
|
||
# 开发环境:所有验证都启用
|
||
class CrossMarketAligner:
|
||
@validate_factor_after_align
|
||
def align_factor(self, ...):
|
||
...
|
||
|
||
@validate_returns_after_align
|
||
def align_returns(self, ...):
|
||
...
|
||
```
|
||
|
||
### 生产环境:轻量验证
|
||
|
||
```python
|
||
# 生产环境:只保留关键验证
|
||
import os
|
||
|
||
VALIDATION_LEVEL = os.getenv('FRAMEWORK_VALIDATION', 'light')
|
||
|
||
if VALIDATION_LEVEL == 'full':
|
||
# 完整验证
|
||
def align_factor(self, ...):
|
||
...
|
||
elif VALIDATION_LEVEL == 'light':
|
||
# 轻量验证(只检查关键项)
|
||
def align_factor(self, ...):
|
||
# 跳过 Pydantic 验证,只保留内置验证
|
||
...
|
||
```
|
||
|
||
---
|
||
|
||
## 📈 错误信息示例
|
||
|
||
### 输入验证失败
|
||
|
||
```python
|
||
# 错误:close 列缺失
|
||
df = pd.DataFrame({'open': [100.0, 101.0]})
|
||
OHLCVInputSchema(**df.iloc[0].to_dict())
|
||
|
||
# 错误信息:
|
||
ValidationError: 1 validation error for OHLCVInputSchema
|
||
close
|
||
Field required [type=missing, input_value={'open': 100.0}, input_type=dict]
|
||
```
|
||
|
||
### 输出验证失败
|
||
|
||
```python
|
||
# 错误:返回值不是 DataFrame
|
||
def align_factor(self, ...):
|
||
return pd.Series(...) # ❌ 错误
|
||
|
||
# 错误信息:
|
||
TypeError: align_factor 必须返回 DataFrame,当前返回 <class 'pandas.core.series.Series'>
|
||
```
|
||
|
||
### 对齐验证失败
|
||
|
||
```python
|
||
# 错误:共同日期太少
|
||
common_dates = pd.date_range('2024-01-01', periods=5) # 只有 5 天
|
||
|
||
AlignmentValidationResult(
|
||
signals_aligned=True,
|
||
returns_aligned=True,
|
||
common_dates_count=5, # ❌ < 10
|
||
lost_signals=0,
|
||
lost_returns=0
|
||
)
|
||
|
||
# 错误信息:
|
||
ValidationError: 1 validation error for AlignmentValidationResult
|
||
common_dates_count
|
||
Value error, 共同日期太少: 5 天 [type=value_error, input_value=5, input_type=int]
|
||
```
|
||
|
||
---
|
||
|
||
## 🎯 优势总结
|
||
|
||
### 1. 早期失败
|
||
|
||
```python
|
||
# ❌ 没有验证:错误在回测阶段才发现
|
||
returns = aligner.align_returns(close_series, code='^GSPC')
|
||
# ... 回测跑到一半才发现收益率有 NaN
|
||
|
||
# ✅ 有验证:立即报错
|
||
returns = aligner.align_returns(close_series, code='^GSPC')
|
||
# ValueError: 收益率包含 3 个 NaN
|
||
```
|
||
|
||
### 2. 类型安全
|
||
|
||
```python
|
||
# ✅ IDE 自动补全
|
||
result = aligner.align_factor(...)
|
||
result['value'] # IDE 知道这是 float
|
||
result['is_filled'] # IDE 知道这是 bool
|
||
|
||
# ✅ 类型检查
|
||
if result['is_filled'].any(): # Series 不能直接做布尔判断,需显式聚合(.any() / .all())
|
||
...
|
||
```
|
||
|
||
### 3. 文档化
|
||
|
||
```python
|
||
# Schema 本身就是文档
|
||
class AlignedFactorSchema(BaseModel):
|
||
value: float = Field(..., description="对齐后的因子值")
|
||
is_filled: bool = Field(False, description="是否为填充值")
|
||
|
||
# 开发者一看就知道输出格式
|
||
```
|
||
|
||
### 4. 一致性保证
|
||
|
||
```python
|
||
# ✅ 所有对齐结果都经过相同验证
|
||
result1 = aligner.align_factor(...) # 验证
|
||
result2 = aligner.align_factor(...) # 验证
|
||
result3 = aligner.align_factor(...) # 验证
|
||
|
||
# 保证:所有结果都有 value 和 is_filled 列,类型正确
|
||
```
|
||
|
||
---
|
||
|
||
## 🔗 相关文档
|
||
|
||
- **[数据架构方案](DATA_ARCHITECTURE.md)** - 完整的数据架构设计
|
||
- **[跨市场对齐方案](ALIGNMENT_GUIDE.md)** - CrossMarketAligner 使用指南
|
||
- **[数据流完整推演](DATA_FLOW_DEMO.md)** - 从 OHLCV 到最终收益的 7 个阶段推演
|
||
|
||
---
|
||
|
||
*创建日期: 2026-05-06*
|
||
*版本: 1.0.0*
|