## 文档体系(5 个文档,互相关联) - README.md - 框架总览 + 文档索引 - DATA_ARCHITECTURE.md - 数据架构方案(Schema、验证、性能优化) - ALIGNMENT_GUIDE.md - CrossMarketAligner 使用指南 - DATA_FLOW_DEMO.md - 从 OHLCV 到最终收益的 7 个阶段推演 - ALIGNMENT_SCHEMA_INTEGRATION.md - Aligner + Schema 整合方案 ## 文档特色 - 大量代码示例(✅ 正确 vs ❌ 错误对比) - 数据流可视化(ASCII 图) - 表格总结(问题、严重度、解决方案) - 实际场景推演(2024-01-01 ~ 2024-01-31) - 文档互链(形成知识网络) ## 修复 - .gitignore: 添加 !framework_v2/shared/data/ 例外 - 允许提交对齐器相关文件
14 KiB
14 KiB
Aligner + Schema 验证整合方案
📋 设计目标
将 CrossMarketAligner(数据对齐)与 Pydantic Schema(结构验证)结合,形成完整的数据质量保障体系。
🏗️ 架构设计
验证流程
原始 OHLCV 数据
↓
[1. 输入验证] ← OHLCVInputSchema(Pydantic)
│ - 检查 close 列存在
│ - 检查数值类型
│ - 检查价格 > 0
↓
[2. CrossMarketAligner] ← 对齐 + 内置验证
│ - reindex + ffill
│ - NaN 比例检查
│ - 异常值检测
↓
[3. 输出验证] ← AlignedFactorSchema / AlignedReturnsSchema(Pydantic)
│ - 检查列名(value, is_filled)
│ - 检查类型(float, bool)
│ - 检查无 NaN
↓
下游组件(因子、信号、回测)
📐 Schema 定义
1. 输入验证 Schema
"""framework_v2/shared/data/schemas.py"""
class OHLCVInputSchema(BaseModel):
"""OHLCV 输入数据验证"""
close: float = Field(..., description="收盘价(必需)", gt=0)
open: Optional[float] = Field(None, description="开盘价", gt=0)
high: Optional[float] = Field(None, description="最高价", gt=0)
low: Optional[float] = Field(None, description="最低价", gt=0)
volume: Optional[float] = Field(None, description="成交量", ge=0)
@field_validator('close', 'open', 'high', 'low')
def check_positive(cls, v):
if v is not None and v <= 0:
raise ValueError(f"价格必须为正数,当前值: {v}")
return v
使用场景:对齐前验证原始 OHLCV 数据
# 示例
df = pd.DataFrame({
'close': [100.0, 101.0, 102.0],
'open': [99.0, 100.0, 101.0]
})
# 验证每一行
for idx, row in df.head(10).iterrows():
OHLCVInputSchema(**row.to_dict()) # 失败会抛出异常
2. 输出验证 Schema
因子对齐输出
class AlignedFactorSchema(BaseModel):
"""对齐后的因子数据验证"""
value: float = Field(..., description="对齐后的因子值")
is_filled: bool = Field(False, description="是否为填充值")
class Config:
arbitrary_types_allowed = True # 允许 NaN
验证装饰器:
def validate_factor_after_align(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
result = func(self, *args, **kwargs)
# 1. 检查返回类型
if not isinstance(result, pd.DataFrame):
raise TypeError(f"必须返回 DataFrame")
# 2. 检查列
required_cols = ['value', 'is_filled']
missing_cols = [col for col in required_cols if col not in result.columns]
if missing_cols:
raise ValueError(f"缺少列: {missing_cols}")
# 3. 检查类型
if not pd.api.types.is_numeric_dtype(result['value']):
raise TypeError(f"value 列必须是数值类型")
if not pd.api.types.is_bool_dtype(result['is_filled']):
raise TypeError(f"is_filled 列必须是布尔类型")
return result
return wrapper
收益率对齐输出
class AlignedReturnsSchema(BaseModel):
"""对齐后的收益率数据验证"""
returns: float = Field(..., description="收益率")
@field_validator('returns')
def check_returns_range(cls, v):
"""收益率应在合理范围内(-50% ~ 50%)"""
if abs(v) > 0.5:
import warnings
warnings.warn(f"收益率异常: {v:.2%}")
return v
验证装饰器:
def validate_returns_after_align(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
result = func(self, *args, **kwargs)
# 1. 检查返回类型
if not isinstance(result, pd.Series):
raise TypeError(f"必须返回 Series")
# 2. 检查无 NaN
if result.isna().any():
nan_count = result.isna().sum()
raise ValueError(f"收益率包含 {nan_count} 个 NaN")
# 3. 检查收益率范围
max_return = result.abs().max()
if max_return > 0.5:
import warnings
warnings.warn(f"发现异常收益率: {max_return:.2%}")
return result
return wrapper
对齐验证结果
class AlignmentValidationResult(BaseModel):
"""对齐验证结果"""
signals_aligned: bool = Field(..., description="信号是否已对齐")
returns_aligned: bool = Field(..., description="收益率是否已对齐")
common_dates_count: int = Field(..., description="共同日期数量")
lost_signals: int = Field(0, description="丢失的信号数")
lost_returns: int = Field(0, description="丢失的收益数")
@field_validator('common_dates_count')
def check_min_dates(cls, v):
"""共同日期至少 10 天"""
if v < 10:
raise ValueError(f"共同日期太少: {v} 天")
return v
💻 在 CrossMarketAligner 中使用
完整示例
"""framework_v2/shared/data/alignment.py"""
from framework_v2.shared.data.schemas import (
OHLCVInputSchema,
AlignedFactorSchema,
AlignedReturnsSchema,
AlignmentValidationResult,
validate_factor_after_align,
validate_returns_after_align
)
class CrossMarketAligner:
"""跨市场数据对齐器"""
@validate_factor_after_align # ← Pydantic Schema 验证
def align_factor(
self,
factor_series: pd.Series,
source_calendar: pd.Index,
code: str = ''
) -> pd.DataFrame:
"""
对齐因子值到目标日历
验证流程:
1. 装饰器检查返回类型、列名、类型
2. 内置验证检查 NaN 比例、填充比例
"""
# 1. reindex + ffill
aligned = factor_series.reindex(self.target_calendar, method='ffill')
# 2. 标记填充值
is_filled = ~aligned.index.isin(source_calendar)
# 3. 内置验证(NaN 比例、填充比例)
self._validate_factor_alignment(aligned, is_filled, code)
return pd.DataFrame({
'value': aligned,
'is_filled': is_filled
}, index=self.target_calendar)
@validate_returns_after_align # ← Pydantic Schema 验证
def align_returns(
self,
close_series: pd.Series,
code: str
) -> pd.Series:
"""
对齐收益率到目标日历
验证流程:
1. 装饰器检查返回类型、无 NaN、收益率范围
2. 内置验证检查 NaN 比例、异常值、索引一致性
"""
# 1. 价格对齐
close_aligned = close_series.reindex(
self.target_calendar,
method='ffill'
)
# 2. 计算收益率
returns = close_aligned.pct_change(fill_method=None)
# 3. 填充首日 NaN
if len(returns) > 0:
returns.iloc[0] = 0.0
# 4. 填充剩余 NaN
nan_ratio = returns.isna().sum() / len(returns)
if nan_ratio > 0:
returns = returns.fillna(0.0)
warnings.warn(f"{code}: 收益率 NaN 比例 {nan_ratio:.1%},已填充为 0")
# 5. 内置验证
self._validate_returns(returns, code)
return returns
def validate_alignment(
self,
signals: pd.DataFrame,
returns_df: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""
验证信号与收益率对齐
验证流程:
1. 内置验证(共同日期、丢失日期)
2. Pydantic Schema 验证(AlignmentValidationResult)
"""
# 1. 找共同日期
common_dates = signals.index.intersection(returns_df.index)
# 2. 检查丢失的日期
lost_signals = len(signals) - len(common_dates)
lost_returns = len(returns_df) - len(common_dates)
if lost_signals > 0 or lost_returns > 0:
warnings.warn(...)
# 3. 检查对齐后日期是否太少
if len(common_dates) < 10:
raise ValueError(...)
# 4. 裁剪到共同日期
aligned_signals = signals.loc[common_dates]
aligned_returns = returns_df.loc[common_dates]
# 5. 使用 Pydantic Schema 验证结果
validation_result = AlignmentValidationResult(
signals_aligned=True,
returns_aligned=True,
common_dates_count=len(common_dates),
lost_signals=lost_signals,
lost_returns=lost_returns
)
# 6. 如果验证失败,Pydantic 会抛出异常
# (例如 common_dates_count < 10)
return aligned_signals, aligned_returns
📊 验证层级对比
| 层级 | 验证内容 | 工具 | 失败处理 |
|---|---|---|---|
| 输入验证 | 列名、类型、值范围 | Pydantic Schema | 立即报错 |
| 对齐验证 | NaN 比例、填充比例、异常值 | 内置方法 | 警告/报错 |
| 输出验证 | 返回类型、列名、类型、无 NaN | 装饰器 + Pydantic | 立即报错 |
| 最终验证 | 共同日期数、丢失日期 | Pydantic Schema | 立即报错 |
🎯 使用示例
完整流程
from framework_v2.shared.data.alignment import CrossMarketAligner
from framework_v2.core.alignment_schemas import OHLCVInputSchema
# 1. 获取原始数据
index_data = {
'^GSPC': df_sp500, # 美股日历
'^HSI': df_hsi, # 港股日历
'000300.SH': df_hs300 # A 股日历
}
# 2. 输入验证(可选,开发阶段使用)
for code, df in index_data.items():
for idx, row in df.head(10).iterrows():
OHLCVInputSchema(**row.to_dict()) # 验证前 10 行
# 3. 创建对齐器
aligner = CrossMarketAligner(target_calendar=a_share_dates)
# 4. 对齐因子(自动验证)
factor_aligned = {}
for code, factor_series in factor_raw.items():
# ✅ 自动触发 @validate_factor_after_align
result = aligner.align_factor(
factor_series,
source_calendar=index_data[code].index,
code=code
)
factor_aligned[code] = result['value']
# 5. 对齐收益率(自动验证)
returns_df = aligner.align_multi_asset({
code: df['close']
for code, df in index_data.items()
})
# ✅ 内部调用 align_returns,自动触发 @validate_returns_after_align
# 6. 验证信号与收益率对齐(自动验证)
aligned_signals, aligned_returns = aligner.validate_alignment(
signals,
returns_df
)
# ✅ 自动创建 AlignmentValidationResult 并验证
🔧 配置验证级别
开发环境:完整验证
# 开发环境:所有验证都启用
class CrossMarketAligner:
@validate_factor_after_align
def align_factor(self, ...):
...
@validate_returns_after_align
def align_returns(self, ...):
...
生产环境:轻量验证
# 生产环境:只保留关键验证
import os
VALIDATION_LEVEL = os.getenv('FRAMEWORK_VALIDATION', 'light')
if VALIDATION_LEVEL == 'full':
# 完整验证
def align_factor(self, ...):
...
elif VALIDATION_LEVEL == 'light':
# 轻量验证(只检查关键项)
def align_factor(self, ...):
# 跳过 Pydantic 验证,只保留内置验证
...
📈 错误信息示例
输入验证失败
# 错误:close 列缺失
df = pd.DataFrame({'open': [100.0, 101.0]})
OHLCVInputSchema(**df.iloc[0].to_dict())
# 错误信息:
ValidationError: 1 validation error for OHLCVInputSchema
close
Field required [type=missing, input_value={'open': 100.0}, input_type=dict]
输出验证失败
# 错误:返回值不是 DataFrame
def align_factor(self, ...):
return pd.Series(...) # ❌ 错误
# 错误信息:
TypeError: align_factor 必须返回 DataFrame,当前返回 <class 'pandas.core.series.Series'>
对齐验证失败
# 错误:共同日期太少
common_dates = pd.date_range('2024-01-01', periods=5) # 只有 5 天
AlignmentValidationResult(
signals_aligned=True,
returns_aligned=True,
common_dates_count=5, # ❌ < 10
lost_signals=0,
lost_returns=0
)
# 错误信息:
ValidationError: 1 validation error for AlignmentValidationResult
common_dates_count
共同日期太少: 5 天 [type=value_error, input_value=5, input_type=int]
🎯 优势总结
1. 早期失败
# ❌ 没有验证:错误在回测阶段才发现
returns = aligner.align_returns(close_series, code='^GSPC')
# ... 回测跑到一半才发现收益率有 NaN
# ✅ 有验证:立即报错
returns = aligner.align_returns(close_series, code='^GSPC')
# ValidationError: 收益率包含 3 个 NaN
2. 类型安全
# ✅ IDE 自动补全
result = aligner.align_factor(...)
result['value'] # IDE 知道这是 float
result['is_filled'] # IDE 知道这是 bool
# ✅ 类型检查
if result['is_filled']: # IDE 不会报警告
...
3. 文档化
# Schema 本身就是文档
class AlignedFactorSchema(BaseModel):
value: float = Field(..., description="对齐后的因子值")
is_filled: bool = Field(False, description="是否为填充值")
# 开发者一看就知道输出格式
4. 一致性保证
# ✅ 所有对齐结果都经过相同验证
result1 = aligner.align_factor(...) # 验证
result2 = aligner.align_factor(...) # 验证
result3 = aligner.align_factor(...) # 验证
# 保证:所有结果都有 value 和 is_filled 列,类型正确
🔗 相关文档
创建日期: 2026-05-06 版本: 1.0.0