Files
etf/archive/framework_v2/ALIGNMENT_SCHEMA_INTEGRATION.md
aszerW c905230a40 refactor(archive): move unused modules to archive/
Archive legacy framework and utility modules that are no longer
referenced by the active core (datasource/ and rotation/):

- framework/ -> archive/framework/
- framework_v2/ -> archive/framework_v2/
- strategies/ -> archive/strategies/
- config/ -> archive/config/
- visualization/ -> archive/visualization/
- scripts/ -> archive/scripts/
- tests/ -> archive/tests/
- run_rotation.py, run_us_rotation.py -> archive/single_files/
- compare_*.py, test_api_dates.py -> archive/single_files/
2026-06-03 23:41:46 +08:00

525 lines
14 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Aligner + Schema 验证整合方案
## 📋 设计目标
**CrossMarketAligner**(数据对齐)与 **Pydantic Schema**(结构验证)结合,形成完整的数据质量保障体系。
---
## 🏗️ 架构设计
### 验证流程
```
原始 OHLCV 数据
[1. 输入验证] ← OHLCVInputSchemaPydantic
│ - 检查 close 列存在
│ - 检查数值类型
│ - 检查价格 > 0
[2. CrossMarketAligner] ← 对齐 + 内置验证
│ - reindex + ffill
│ - NaN 比例检查
│ - 异常值检测
[3. 输出验证] ← AlignedFactorSchema / AlignedReturnsSchemaPydantic
│ - 检查列名value, is_filled
│ - 检查类型float, bool
│ - 检查无 NaN
下游组件(因子、信号、回测)
```
---
## 📐 Schema 定义
### 1. 输入验证 Schema
```python
"""framework_v2/shared/data/schemas.py"""
class OHLCVInputSchema(BaseModel):
"""OHLCV 输入数据验证"""
close: float = Field(..., description="收盘价(必需)", gt=0)
open: Optional[float] = Field(None, description="开盘价", gt=0)
high: Optional[float] = Field(None, description="最高价", gt=0)
low: Optional[float] = Field(None, description="最低价", gt=0)
volume: Optional[float] = Field(None, description="成交量", ge=0)
@field_validator('close', 'open', 'high', 'low')
def check_positive(cls, v):
if v is not None and v <= 0:
raise ValueError(f"价格必须为正数,当前值: {v}")
return v
```
**使用场景**:对齐前验证原始 OHLCV 数据
```python
# 示例
df = pd.DataFrame({
'close': [100.0, 101.0, 102.0],
'open': [99.0, 100.0, 101.0]
})
# 验证每一行
for idx, row in df.head(10).iterrows():
OHLCVInputSchema(**row.to_dict()) # 失败会抛出异常
```
---
### 2. 输出验证 Schema
#### 因子对齐输出
```python
class AlignedFactorSchema(BaseModel):
"""对齐后的因子数据验证"""
value: float = Field(..., description="对齐后的因子值")
is_filled: bool = Field(False, description="是否为填充值")
class Config:
arbitrary_types_allowed = True # 允许 NaN
```
**验证装饰器**
```python
def validate_factor_after_align(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
result = func(self, *args, **kwargs)
# 1. 检查返回类型
if not isinstance(result, pd.DataFrame):
raise TypeError(f"必须返回 DataFrame")
# 2. 检查列
required_cols = ['value', 'is_filled']
missing_cols = [col for col in required_cols if col not in result.columns]
if missing_cols:
raise ValueError(f"缺少列: {missing_cols}")
# 3. 检查类型
if not pd.api.types.is_numeric_dtype(result['value']):
raise TypeError(f"value 列必须是数值类型")
if not pd.api.types.is_bool_dtype(result['is_filled']):
raise TypeError(f"is_filled 列必须是布尔类型")
return result
return wrapper
```
---
#### 收益率对齐输出
```python
class AlignedReturnsSchema(BaseModel):
"""对齐后的收益率数据验证"""
returns: float = Field(..., description="收益率")
@field_validator('returns')
def check_returns_range(cls, v):
"""收益率应在合理范围内(-50% ~ 50%"""
if abs(v) > 0.5:
import warnings
warnings.warn(f"收益率异常: {v:.2%}")
return v
```
**验证装饰器**
```python
def validate_returns_after_align(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
result = func(self, *args, **kwargs)
# 1. 检查返回类型
if not isinstance(result, pd.Series):
raise TypeError(f"必须返回 Series")
# 2. 检查无 NaN
if result.isna().any():
nan_count = result.isna().sum()
raise ValueError(f"收益率包含 {nan_count} 个 NaN")
# 3. 检查收益率范围
max_return = result.abs().max()
if max_return > 0.5:
import warnings
warnings.warn(f"发现异常收益率: {max_return:.2%}")
return result
return wrapper
```
---
#### 对齐验证结果
```python
class AlignmentValidationResult(BaseModel):
"""对齐验证结果"""
signals_aligned: bool = Field(..., description="信号是否已对齐")
returns_aligned: bool = Field(..., description="收益率是否已对齐")
common_dates_count: int = Field(..., description="共同日期数量")
lost_signals: int = Field(0, description="丢失的信号数")
lost_returns: int = Field(0, description="丢失的收益数")
@field_validator('common_dates_count')
def check_min_dates(cls, v):
"""共同日期至少 10 天"""
if v < 10:
raise ValueError(f"共同日期太少: {v} 天")
return v
```
---
## 💻 在 CrossMarketAligner 中使用
### 完整示例
```python
"""framework_v2/shared/data/alignment.py"""
from framework_v2.shared.data.schemas import (
OHLCVInputSchema,
AlignedFactorSchema,
AlignedReturnsSchema,
AlignmentValidationResult,
validate_factor_after_align,
validate_returns_after_align
)
class CrossMarketAligner:
"""跨市场数据对齐器"""
@validate_factor_after_align # ← Pydantic Schema 验证
def align_factor(
self,
factor_series: pd.Series,
source_calendar: pd.Index,
code: str = ''
) -> pd.DataFrame:
"""
对齐因子值到目标日历
验证流程:
1. 装饰器检查返回类型、列名、类型
2. 内置验证检查 NaN 比例、填充比例
"""
# 1. reindex + ffill
aligned = factor_series.reindex(self.target_calendar, method='ffill')
# 2. 标记填充值
is_filled = ~aligned.index.isin(source_calendar)
# 3. 内置验证NaN 比例、填充比例)
self._validate_factor_alignment(aligned, is_filled, code)
return pd.DataFrame({
'value': aligned,
'is_filled': is_filled
}, index=self.target_calendar)
@validate_returns_after_align # ← Pydantic Schema 验证
def align_returns(
self,
close_series: pd.Series,
code: str
) -> pd.Series:
"""
对齐收益率到目标日历
验证流程:
1. 装饰器检查返回类型、无 NaN、收益率范围
2. 内置验证检查 NaN 比例、异常值、索引一致性
"""
# 1. 价格对齐
close_aligned = close_series.reindex(
self.target_calendar,
method='ffill'
)
# 2. 计算收益率
returns = close_aligned.pct_change(fill_method=None)
# 3. 填充首日 NaN
if len(returns) > 0:
returns.iloc[0] = 0.0
# 4. 填充剩余 NaN
nan_ratio = returns.isna().sum() / len(returns)
if nan_ratio > 0:
returns = returns.fillna(0.0)
warnings.warn(f"{code}: 收益率 NaN 比例 {nan_ratio:.1%},已填充为 0")
# 5. 内置验证
self._validate_returns(returns, code)
return returns
def validate_alignment(
self,
signals: pd.DataFrame,
returns_df: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""
验证信号与收益率对齐
验证流程:
1. 内置验证(共同日期、丢失日期)
2. Pydantic Schema 验证AlignmentValidationResult
"""
# 1. 找共同日期
common_dates = signals.index.intersection(returns_df.index)
# 2. 检查丢失的日期
lost_signals = len(signals) - len(common_dates)
lost_returns = len(returns_df) - len(common_dates)
if lost_signals > 0 or lost_returns > 0:
warnings.warn(...)
# 3. 检查对齐后日期是否太少
if len(common_dates) < 10:
raise ValueError(...)
# 4. 裁剪到共同日期
aligned_signals = signals.loc[common_dates]
aligned_returns = returns_df.loc[common_dates]
# 5. 使用 Pydantic Schema 验证结果
validation_result = AlignmentValidationResult(
signals_aligned=True,
returns_aligned=True,
common_dates_count=len(common_dates),
lost_signals=lost_signals,
lost_returns=lost_returns
)
# 6. 如果验证失败Pydantic 会抛出异常
# (例如 common_dates_count < 10
return aligned_signals, aligned_returns
```
---
## 📊 验证层级对比
| 层级 | 验证内容 | 工具 | 失败处理 |
|------|----------|------|----------|
| **输入验证** | 列名、类型、值范围 | Pydantic Schema | 立即报错 |
| **对齐验证** | NaN 比例、填充比例、异常值 | 内置方法 | 警告/报错 |
| **输出验证** | 返回类型、列名、类型、无 NaN | 装饰器 + Pydantic | 立即报错 |
| **最终验证** | 共同日期数、丢失日期 | Pydantic Schema | 立即报错 |
---
## 🎯 使用示例
### 完整流程
```python
from framework_v2.shared.data.alignment import CrossMarketAligner
from framework_v2.core.alignment_schemas import OHLCVInputSchema
# 1. 获取原始数据
index_data = {
'^GSPC': df_sp500, # 美股日历
'^HSI': df_hsi, # 港股日历
'000300.SH': df_hs300 # A 股日历
}
# 2. 输入验证(可选,开发阶段使用)
for code, df in index_data.items():
for idx, row in df.head(10).iterrows():
OHLCVInputSchema(**row.to_dict()) # 验证前 10 行
# 3. 创建对齐器
aligner = CrossMarketAligner(target_calendar=a_share_dates)
# 4. 对齐因子(自动验证)
factor_aligned = {}
for code, factor_series in factor_raw.items():
# ✅ 自动触发 @validate_factor_after_align
result = aligner.align_factor(
factor_series,
source_calendar=index_data[code].index,
code=code
)
factor_aligned[code] = result['value']
# 5. 对齐收益率(自动验证)
returns_df = aligner.align_multi_asset({
code: df['close']
for code, df in index_data.items()
})
# ✅ 内部调用 align_returns自动触发 @validate_returns_after_align
# 6. 验证信号与收益率对齐(自动验证)
aligned_signals, aligned_returns = aligner.validate_alignment(
signals,
returns_df
)
# ✅ 自动创建 AlignmentValidationResult 并验证
```
---
## 🔧 配置验证级别
### 开发环境:完整验证
```python
# 开发环境:所有验证都启用
class CrossMarketAligner:
@validate_factor_after_align
def align_factor(self, ...):
...
@validate_returns_after_align
def align_returns(self, ...):
...
```
### 生产环境:轻量验证
```python
# 生产环境:只保留关键验证
import os
VALIDATION_LEVEL = os.getenv('FRAMEWORK_VALIDATION', 'light')
if VALIDATION_LEVEL == 'full':
# 完整验证
def align_factor(self, ...):
...
elif VALIDATION_LEVEL == 'light':
# 轻量验证(只检查关键项)
def align_factor(self, ...):
# 跳过 Pydantic 验证,只保留内置验证
...
```
---
## 📈 错误信息示例
### 输入验证失败
```python
# 错误close 列缺失
df = pd.DataFrame({'open': [100.0, 101.0]})
OHLCVInputSchema(**df.iloc[0].to_dict())
# 错误信息:
ValidationError: 1 validation error for OHLCVInputSchema
close
Field required [type=missing, input_value={'open': 100.0}, input_type=dict]
```
### 输出验证失败
```python
# 错误:返回值不是 DataFrame
def align_factor(self, ...):
return pd.Series(...) # ❌ 错误
# 错误信息:
TypeError: align_factor 必须返回 DataFrame当前返回 <class 'pandas.core.series.Series'>
```
### 对齐验证失败
```python
# 错误:共同日期太少
common_dates = pd.date_range('2024-01-01', periods=5) # 只有 5 天
AlignmentValidationResult(
signals_aligned=True,
returns_aligned=True,
common_dates_count=5, # ❌ < 10
lost_signals=0,
lost_returns=0
)
# 错误信息:
ValidationError: 1 validation error for AlignmentValidationResult
common_dates_count
共同日期太少: 5 [type=value_error, input_value=5, input_type=int]
```
---
## 🎯 优势总结
### 1. 早期失败
```python
# ❌ 没有验证:错误在回测阶段才发现
returns = aligner.align_returns(close_series, code='^GSPC')
# ... 回测跑到一半才发现收益率有 NaN
# ✅ 有验证:立即报错
returns = aligner.align_returns(close_series, code='^GSPC')
# ValidationError: 收益率包含 3 个 NaN
```
### 2. 类型安全
```python
# ✅ IDE 自动补全
result = aligner.align_factor(...)
result['value'] # IDE 知道这是 float
result['is_filled'] # IDE 知道这是 bool
# ✅ 类型检查
if result['is_filled']: # IDE 不会报警告
...
```
### 3. 文档化
```python
# Schema 本身就是文档
class AlignedFactorSchema(BaseModel):
value: float = Field(..., description="对齐后的因子值")
is_filled: bool = Field(False, description="是否为填充值")
# 开发者一看就知道输出格式
```
### 4. 一致性保证
```python
# ✅ 所有对齐结果都经过相同验证
result1 = aligner.align_factor(...) # 验证
result2 = aligner.align_factor(...) # 验证
result3 = aligner.align_factor(...) # 验证
# 保证:所有结果都有 value 和 is_filled 列,类型正确
```
---
## 🔗 相关文档
- **[数据架构方案](DATA_ARCHITECTURE.md)** - 完整的数据架构设计
- **[跨市场对齐方案](ALIGNMENT_GUIDE.md)** - CrossMarketAligner 使用指南
- **[数据流完整推演](DATA_FLOW_DEMO.md)** - 从 OHLCV 到最终收益的 7 个阶段推演
---
*创建日期: 2026-05-06*
*版本: 1.0.0*