# Aligner + Schema 验证整合方案 ## 📋 设计目标 将 **CrossMarketAligner**(数据对齐)与 **Pydantic Schema**(结构验证)结合,形成完整的数据质量保障体系。 --- ## 🏗️ 架构设计 ### 验证流程 ``` 原始 OHLCV 数据 ↓ [1. 输入验证] ← OHLCVInputSchema(Pydantic) │ - 检查 close 列存在 │ - 检查数值类型 │ - 检查价格 > 0 ↓ [2. CrossMarketAligner] ← 对齐 + 内置验证 │ - reindex + ffill │ - NaN 比例检查 │ - 异常值检测 ↓ [3. 输出验证] ← AlignedFactorSchema / AlignedReturnsSchema(Pydantic) │ - 检查列名(value, is_filled) │ - 检查类型(float, bool) │ - 检查无 NaN ↓ 下游组件(因子、信号、回测) ``` --- ## 📐 Schema 定义 ### 1. 输入验证 Schema ```python """framework_v2/shared/data/schemas.py""" class OHLCVInputSchema(BaseModel): """OHLCV 输入数据验证""" close: float = Field(..., description="收盘价(必需)", gt=0) open: Optional[float] = Field(None, description="开盘价", gt=0) high: Optional[float] = Field(None, description="最高价", gt=0) low: Optional[float] = Field(None, description="最低价", gt=0) volume: Optional[float] = Field(None, description="成交量", ge=0) @field_validator('close', 'open', 'high', 'low') def check_positive(cls, v): if v is not None and v <= 0: raise ValueError(f"价格必须为正数,当前值: {v}") return v ``` **使用场景**:对齐前验证原始 OHLCV 数据 ```python # 示例 df = pd.DataFrame({ 'close': [100.0, 101.0, 102.0], 'open': [99.0, 100.0, 101.0] }) # 验证每一行 for idx, row in df.head(10).iterrows(): OHLCVInputSchema(**row.to_dict()) # 失败会抛出异常 ``` --- ### 2. 输出验证 Schema #### 因子对齐输出 ```python class AlignedFactorSchema(BaseModel): """对齐后的因子数据验证""" value: float = Field(..., description="对齐后的因子值") is_filled: bool = Field(False, description="是否为填充值") class Config: arbitrary_types_allowed = True # 允许 NaN ``` **验证装饰器**: ```python def validate_factor_after_align(func): @wraps(func) def wrapper(self, *args, **kwargs): result = func(self, *args, **kwargs) # 1. 检查返回类型 if not isinstance(result, pd.DataFrame): raise TypeError(f"必须返回 DataFrame") # 2. 检查列 required_cols = ['value', 'is_filled'] missing_cols = [col for col in required_cols if col not in result.columns] if missing_cols: raise ValueError(f"缺少列: {missing_cols}") # 3. 检查类型 if not pd.api.types.is_numeric_dtype(result['value']): raise TypeError(f"value 列必须是数值类型") if not pd.api.types.is_bool_dtype(result['is_filled']): raise TypeError(f"is_filled 列必须是布尔类型") return result return wrapper ``` --- #### 收益率对齐输出 ```python class AlignedReturnsSchema(BaseModel): """对齐后的收益率数据验证""" returns: float = Field(..., description="收益率") @field_validator('returns') def check_returns_range(cls, v): """收益率应在合理范围内(-50% ~ 50%)""" if abs(v) > 0.5: import warnings warnings.warn(f"收益率异常: {v:.2%}") return v ``` **验证装饰器**: ```python def validate_returns_after_align(func): @wraps(func) def wrapper(self, *args, **kwargs): result = func(self, *args, **kwargs) # 1. 检查返回类型 if not isinstance(result, pd.Series): raise TypeError(f"必须返回 Series") # 2. 检查无 NaN if result.isna().any(): nan_count = result.isna().sum() raise ValueError(f"收益率包含 {nan_count} 个 NaN") # 3. 检查收益率范围 max_return = result.abs().max() if max_return > 0.5: import warnings warnings.warn(f"发现异常收益率: {max_return:.2%}") return result return wrapper ``` --- #### 对齐验证结果 ```python class AlignmentValidationResult(BaseModel): """对齐验证结果""" signals_aligned: bool = Field(..., description="信号是否已对齐") returns_aligned: bool = Field(..., description="收益率是否已对齐") common_dates_count: int = Field(..., description="共同日期数量") lost_signals: int = Field(0, description="丢失的信号数") lost_returns: int = Field(0, description="丢失的收益数") @field_validator('common_dates_count') def check_min_dates(cls, v): """共同日期至少 10 天""" if v < 10: raise ValueError(f"共同日期太少: {v} 天") return v ``` --- ## 💻 在 CrossMarketAligner 中使用 ### 完整示例 ```python """framework_v2/shared/data/alignment.py""" from framework_v2.shared.data.schemas import ( OHLCVInputSchema, AlignedFactorSchema, AlignedReturnsSchema, AlignmentValidationResult, validate_factor_after_align, validate_returns_after_align ) class CrossMarketAligner: """跨市场数据对齐器""" @validate_factor_after_align # ← Pydantic Schema 验证 def align_factor( self, factor_series: pd.Series, source_calendar: pd.Index, code: str = '' ) -> pd.DataFrame: """ 对齐因子值到目标日历 验证流程: 1. 装饰器检查返回类型、列名、类型 2. 内置验证检查 NaN 比例、填充比例 """ # 1. reindex + ffill aligned = factor_series.reindex(self.target_calendar, method='ffill') # 2. 标记填充值 is_filled = ~aligned.index.isin(source_calendar) # 3. 内置验证(NaN 比例、填充比例) self._validate_factor_alignment(aligned, is_filled, code) return pd.DataFrame({ 'value': aligned, 'is_filled': is_filled }, index=self.target_calendar) @validate_returns_after_align # ← Pydantic Schema 验证 def align_returns( self, close_series: pd.Series, code: str ) -> pd.Series: """ 对齐收益率到目标日历 验证流程: 1. 装饰器检查返回类型、无 NaN、收益率范围 2. 内置验证检查 NaN 比例、异常值、索引一致性 """ # 1. 价格对齐 close_aligned = close_series.reindex( self.target_calendar, method='ffill' ) # 2. 计算收益率 returns = close_aligned.pct_change(fill_method=None) # 3. 填充首日 NaN if len(returns) > 0: returns.iloc[0] = 0.0 # 4. 填充剩余 NaN nan_ratio = returns.isna().sum() / len(returns) if nan_ratio > 0: returns = returns.fillna(0.0) warnings.warn(f"{code}: 收益率 NaN 比例 {nan_ratio:.1%},已填充为 0") # 5. 内置验证 self._validate_returns(returns, code) return returns def validate_alignment( self, signals: pd.DataFrame, returns_df: pd.DataFrame ) -> Tuple[pd.DataFrame, pd.DataFrame]: """ 验证信号与收益率对齐 验证流程: 1. 内置验证(共同日期、丢失日期) 2. Pydantic Schema 验证(AlignmentValidationResult) """ # 1. 找共同日期 common_dates = signals.index.intersection(returns_df.index) # 2. 检查丢失的日期 lost_signals = len(signals) - len(common_dates) lost_returns = len(returns_df) - len(common_dates) if lost_signals > 0 or lost_returns > 0: warnings.warn(...) # 3. 检查对齐后日期是否太少 if len(common_dates) < 10: raise ValueError(...) # 4. 裁剪到共同日期 aligned_signals = signals.loc[common_dates] aligned_returns = returns_df.loc[common_dates] # 5. 使用 Pydantic Schema 验证结果 validation_result = AlignmentValidationResult( signals_aligned=True, returns_aligned=True, common_dates_count=len(common_dates), lost_signals=lost_signals, lost_returns=lost_returns ) # 6. 如果验证失败,Pydantic 会抛出异常 # (例如 common_dates_count < 10) return aligned_signals, aligned_returns ``` --- ## 📊 验证层级对比 | 层级 | 验证内容 | 工具 | 失败处理 | |------|----------|------|----------| | **输入验证** | 列名、类型、值范围 | Pydantic Schema | 立即报错 | | **对齐验证** | NaN 比例、填充比例、异常值 | 内置方法 | 警告/报错 | | **输出验证** | 返回类型、列名、类型、无 NaN | 装饰器 + Pydantic | 立即报错 | | **最终验证** | 共同日期数、丢失日期 | Pydantic Schema | 立即报错 | --- ## 🎯 使用示例 ### 完整流程 ```python from framework_v2.shared.data.alignment import CrossMarketAligner from framework_v2.core.alignment_schemas import OHLCVInputSchema # 1. 获取原始数据 index_data = { '^GSPC': df_sp500, # 美股日历 '^HSI': df_hsi, # 港股日历 '000300.SH': df_hs300 # A 股日历 } # 2. 输入验证(可选,开发阶段使用) for code, df in index_data.items(): for idx, row in df.head(10).iterrows(): OHLCVInputSchema(**row.to_dict()) # 验证前 10 行 # 3. 创建对齐器 aligner = CrossMarketAligner(target_calendar=a_share_dates) # 4. 对齐因子(自动验证) factor_aligned = {} for code, factor_series in factor_raw.items(): # ✅ 自动触发 @validate_factor_after_align result = aligner.align_factor( factor_series, source_calendar=index_data[code].index, code=code ) factor_aligned[code] = result['value'] # 5. 对齐收益率(自动验证) returns_df = aligner.align_multi_asset({ code: df['close'] for code, df in index_data.items() }) # ✅ 内部调用 align_returns,自动触发 @validate_returns_after_align # 6. 验证信号与收益率对齐(自动验证) aligned_signals, aligned_returns = aligner.validate_alignment( signals, returns_df ) # ✅ 自动创建 AlignmentValidationResult 并验证 ``` --- ## 🔧 配置验证级别 ### 开发环境:完整验证 ```python # 开发环境:所有验证都启用 class CrossMarketAligner: @validate_factor_after_align def align_factor(self, ...): ... @validate_returns_after_align def align_returns(self, ...): ... ``` ### 生产环境:轻量验证 ```python # 生产环境:只保留关键验证 import os VALIDATION_LEVEL = os.getenv('FRAMEWORK_VALIDATION', 'light') if VALIDATION_LEVEL == 'full': # 完整验证 def align_factor(self, ...): ... elif VALIDATION_LEVEL == 'light': # 轻量验证(只检查关键项) def align_factor(self, ...): # 跳过 Pydantic 验证,只保留内置验证 ... ``` --- ## 📈 错误信息示例 ### 输入验证失败 ```python # 错误:close 列缺失 df = pd.DataFrame({'open': [100.0, 101.0]}) OHLCVInputSchema(**df.iloc[0].to_dict()) # 错误信息: ValidationError: 1 validation error for OHLCVInputSchema close Field required [type=missing, input_value={'open': 100.0}, input_type=dict] ``` ### 输出验证失败 ```python # 错误:返回值不是 DataFrame def align_factor(self, ...): return pd.Series(...) # ❌ 错误 # 错误信息: TypeError: align_factor 必须返回 DataFrame,当前返回 ``` ### 对齐验证失败 ```python # 错误:共同日期太少 common_dates = pd.date_range('2024-01-01', periods=5) # 只有 5 天 AlignmentValidationResult( signals_aligned=True, returns_aligned=True, common_dates_count=5, # ❌ < 10 lost_signals=0, lost_returns=0 ) # 错误信息: ValidationError: 1 validation error for AlignmentValidationResult common_dates_count 共同日期太少: 5 天 [type=value_error, input_value=5, input_type=int] ``` --- ## 🎯 优势总结 ### 1. 早期失败 ```python # ❌ 没有验证:错误在回测阶段才发现 returns = aligner.align_returns(close_series, code='^GSPC') # ... 回测跑到一半才发现收益率有 NaN # ✅ 有验证:立即报错 returns = aligner.align_returns(close_series, code='^GSPC') # ValidationError: 收益率包含 3 个 NaN ``` ### 2. 类型安全 ```python # ✅ IDE 自动补全 result = aligner.align_factor(...) result['value'] # IDE 知道这是 float result['is_filled'] # IDE 知道这是 bool # ✅ 类型检查 if result['is_filled']: # IDE 不会报警告 ... ``` ### 3. 文档化 ```python # Schema 本身就是文档 class AlignedFactorSchema(BaseModel): value: float = Field(..., description="对齐后的因子值") is_filled: bool = Field(False, description="是否为填充值") # 开发者一看就知道输出格式 ``` ### 4. 一致性保证 ```python # ✅ 所有对齐结果都经过相同验证 result1 = aligner.align_factor(...) # 验证 result2 = aligner.align_factor(...) # 验证 result3 = aligner.align_factor(...) # 验证 # 保证:所有结果都有 value 和 is_filled 列,类型正确 ``` --- ## 🔗 相关文档 - **[数据架构方案](DATA_ARCHITECTURE.md)** - 完整的数据架构设计 - **[跨市场对齐方案](ALIGNMENT_GUIDE.md)** - CrossMarketAligner 使用指南 - **[数据流完整推演](DATA_FLOW_DEMO.md)** - 从 OHLCV 到最终收益的 7 个阶段推演 --- *创建日期: 2026-05-06* *版本: 1.0.0*