# 跨市场数据流完整推演 ## 📋 场景设定 ### 标的池 | 标的 | 市场 | 年交易日 | 特点 | |------|------|----------|------| | ^GSPC(标普500) | 美股 | 252 天 | 美国假日(马丁路德金日、感恩节) | | ^HSI(恒生指数) | 港股 | 250 天 | 香港假日(佛诞日、圣诞节) | | 000300.SH(沪深300) | A 股 | 244 天 | 中国假日(春节、国庆) | **目标日历**:A 股交易日(244 天) ### 假日差异示例(2024 年 1 月) | 日期 | 美股 | 港股 | A 股 | 说明 | |------|------|------|------|------| | 2024-01-01 | 休市 | 休市 | 休市 | 元旦(共同假日) | | 2024-01-02 | 交易 | 交易 | 交易 | - | | 2024-01-15 | 休市 | 交易 | 交易 | 马丁路德金日(仅美股休市) | | 2024-02-10 | 交易 | 交易 | 休市 | 春节(仅 A 股休市) | --- ## 🎬 完整数据流推演(7 个阶段) ### 阶段 0:原始数据(不同日历) ```python # 从数据源获取原始 OHLCV 数据 index_data = { '^GSPC': DataFrame(252 rows, US calendar), # 美股 '^HSI': DataFrame(250 rows, HK calendar), # 港股 '000300.SH': DataFrame(244 rows, CN calendar) # A 股 } # 示例:2024 年 1 月第一周 ^GSPC (美股): 日期 close 2024-01-01 4770.0 ← 元旦(美股休市,无数据或从 yfinance 获取) 2024-01-02 4780.0 2024-01-03 4790.0 2024-01-04 4785.0 2024-01-05 4800.0 ... ^HSI (港股): 日期 close 2024-01-01 17050.0 ← 元旦(港股休市) 2024-01-02 17100.0 2024-01-03 17150.0 2024-01-04 17120.0 2024-01-05 17200.0 ... 000300.SH (A 股): 日期 close 2024-01-01 3500.0 ← 元旦(A 股休市) 2024-01-02 3510.0 2024-01-03 3520.0 2024-01-04 3515.0 2024-01-05 3530.0 ... ``` **关键问题**: - ❌ 三个市场的交易日历不同 - ❌ 直接合并会导致大量 NaN - ❌ 无法直接计算因子和收益 --- ### 阶段 1:因子计算(在原始日历) ```python from framework_v2.shared.factors import MomentumFactor factor = MomentumFactor(n_days=25, weighted=True, crash_filter=True) # ✅ 关键:在原始交易日历计算因子(不 ffill) factor_raw = {} for code, df in index_data.items(): factor_raw[code] = factor.compute(df) # 结果:每个标的在各自的交易日历上有因子值 ^GSPC 因子(美股日历 252 天): 日期 factor 2024-01-01 NaN ← 前 25 天数据不足 ... 2024-01-26 0.15 ← 第 26 天开始有因子值 2024-01-29 0.16 2024-01-30 0.17 ... ^HSI 因子(港股日历 250 天): 日期 factor 2024-01-02 NaN ... 2024-01-26 0.14 2024-01-29 0.15 2024-01-30 0.16 ... 000300.SH 因子(A 股日历 244 天): 日期 factor 2024-01-02 NaN ... 2024-01-29 0.13 2024-01-30 0.14 ... ``` **为什么在原始日历计算?** 1. ✅ rolling window 使用**真实交易日**(25 天) 2. ✅ 线性回归权重基于真实数据分布 3. ❌ 如果先 ffill,窗口会包含重复值,影响因子精度 --- ### 阶段 2:对齐因子到 A 股日历 ```python from framework_v2.shared.data.alignment import CrossMarketAligner # 创建对齐器(目标日历 = A 股) aligner = CrossMarketAligner(target_calendar=a_share_dates) # 244 天 # 对齐每个标的的因子值 factor_aligned = {} for code, factor_series in factor_raw.items(): aligned = aligner.align_factor( factor_series, source_calendar=index_data[code].index, # 原始日历 code=code ) factor_aligned[code] = aligned['value'] # 提取因子值列 # 结果:所有因子值对齐到 A 股日历(244 天) ^GSPC 因子(A 股日历 244 天): 日期 factor is_filled 2024-01-01 NaN False ← A 股休市(元旦) 2024-01-02 NaN False ← 数据不足(前 25 天) ... 2024-01-26 0.15 False ← 真实值 2024-01-29 0.16 False ← 真实值 2024-01-30 0.16 True ← ffill(美股休市,填充前一天的因子值) 2024-01-31 0.17 False ← 真实值 ... ^HSI 因子(A 股日历 244 天): 日期 factor is_filled 2024-01-01 NaN False ← A 股休市 2024-01-02 0.14 False ← 真实值 ... 2024-01-29 0.15 False ← 真实值 2024-01-30 0.15 True ← ffill(港股休市) ... 000300.SH 因子(A 股日历 244 天): 日期 factor is_filled 2024-01-01 NaN False ← A 股休市 2024-01-02 0.13 False ← 真实值 ... 2024-01-29 0.14 False ← 真实值 2024-01-30 0.14 False ← 真实值(A 股正常交易) ... ``` **CrossMarketAligner 的作用**: ```python def align_factor(self, factor_series, source_calendar, code): # 1. reindex + ffill aligned = factor_series.reindex(self.target_calendar, method='ffill') # 2. 标记填充值(不在 source_calendar 中的日期) is_filled = ~aligned.index.isin(source_calendar) # 3. 验证 self._validate_factor_alignment(aligned, is_filled, code) return pd.DataFrame({ 'value': aligned, 'is_filled': is_filled }) ``` **验证逻辑**: ```python def _validate_factor_alignment(self, aligned, is_filled, code): # 1. 检查 NaN 比例 nan_ratio = aligned.isna().sum() / len(aligned) if nan_ratio > 0.1: # > 10% warnings.warn(f"{code}: 因子 NaN 比例过高 ({nan_ratio:.1%})") # 2. 检查填充比例 fill_ratio = is_filled.sum() / len(is_filled) if fill_ratio > 0.3: # > 30% warnings.warn(f"{code}: 因子填充比例过高 ({fill_ratio:.1%})") ``` --- ### 阶段 3:生成信号 ```python from framework_v2.shared.signals import TopNSelector selector = TopNSelector(select_num=3, min_score=0.0) # 合并所有因子为 DataFrame factor_df = pd.DataFrame(factor_aligned) # 索引:A 股日历(244 天) # 列:['^GSPC', '^HSI', '000300.SH'] # 生成信号 signals = selector.generate(factor_df) # 结果: signals DataFrame(A 股日历 244 天): 日期 signal 2024-01-01 '' ← 因子全 NaN,空信号 2024-01-02 '' ← 因子全 NaN ... 2024-01-29 '^GSPC,^HSI,000300.SH' ← Top 3 2024-01-30 '^GSPC,^HSI,000300.SH' ← 调仓控制(保持上次信号) 2024-01-31 '^GSPC,000300.SH' ← ^HSI 动量下降,被替换 ... ``` --- ### 阶段 4:对齐收益率到 A 股日历(⭐ 关键步骤) ```python # 对每个标的计算收益率 returns_aligned = {} for code, df in index_data.items(): close_series = df['close'] # ✅ 使用 aligner 对齐收益率 returns = aligner.align_returns( close_series, code=code ) returns_aligned[code] = returns # 结果: ^GSPC 收益率(A 股日历 244 天): 日期 close(ffill) returns 2024-01-01 4770.0 0.0000 ← 首日 2024-01-02 4780.0 0.0021 ← +0.21% 2024-01-03 4790.0 0.0021 ← +0.21% 2024-01-04 4785.0 -0.0010 ← -0.10% ... 2024-01-30 4810.0 ← ffill(美股休市) 0.0000 ← 0%(价格不变)✓ 2024-01-31 4820.0 0.0021 ← +0.21% ^HSI 收益率(A 股日历 244 天): 日期 close(ffill) returns 2024-01-01 17050.0 0.0000 ← 首日(ffill) 2024-01-02 17100.0 0.0029 ← +0.29% ... 2024-01-30 17200.0 ← ffill(港股休市) 0.0000 ← 0% ✓ 000300.SH 收益率(A 股日历 244 天): 日期 close returns 2024-01-01 3500.0 0.0000 ← 首日 2024-01-02 3510.0 0.0029 ← +0.29% ... 2024-01-30 3540.0 0.0028 ← +0.28%(A 股正常交易) ``` **CrossMarketAligner 的核心逻辑**: ```python def align_returns(self, close_series, code): # ✅ 步骤 1:价格先对齐到 A 股日历 close_aligned = close_series.reindex( self.target_calendar, method='ffill' ) # 休市日价格不变(ffill 填充前一天的价格) # 例:2024-01-30 美股休市,close_aligned['2024-01-30'] = 4800(前一日价格) # ✅ 步骤 2:计算收益率 returns = close_aligned.pct_change(fill_method=None) # 休市日:(今日价格 - 昨日价格) / 昨日价格 # = (4800 - 4800) / 4800 = 0% # 因为 ffill 后,今日价格 = 昨日价格 # ✅ 步骤 3:填充首日 NaN returns.iloc[0] = 0.0 # 首日无前一日,收益率 = 0 # ✅ 步骤 4:填充剩余 NaN(如果有) returns = returns.fillna(0.0) # 用 0 填充(表示"无数据,收益率为 0") # ✅ 步骤 5:验证 self._validate_returns(returns, code) return returns ``` **为什么这样正确?** | 日期 | 美股状态 | 价格(ffill) | 收益率计算 | 结果 | |------|----------|---------------|------------|------| | 2024-01-29 | 正常交易 | 4800 | (4800 - 4790) / 4790 | +0.21% | | 2024-01-30 | **休市** | 4800 (ffill) | (4800 - 4800) / 4800 | **0%** ✓ | | 2024-01-31 | 正常交易 | 4820 | (4820 - 4800) / 4800 | +0.42% | **对比错误做法**: ```python # ❌ 错误:先计算收益率,再 ffill returns_wrong = close.pct_change() # 美股日历 returns_aligned = returns_wrong.reindex(a_share_dates, method='ffill') 日期 收益率(美股) A股日历 对齐后收益率 2024-01-29 +0.21% 2024-01-29 +0.21% 2024-01-30 无(休市) 2024-01-30 +0.21% ← 错误!复制了前一天的收益率 2024-01-31 +0.42% 2024-01-31 +0.42% # 问题:A 股交易日"继承"了美股休市前一天的收益率 # 结果:净值被高估(多算了 0.21%) ``` **验证逻辑**: ```python def _validate_returns(self, returns, code): # 1. 检查 NaN 比例 nan_ratio = returns.isna().sum() / len(returns) if nan_ratio > 0.1: # > 10% raise ValueError(f"{code}: 收益率 NaN 比例过高 ({nan_ratio:.1%})") # 2. 检查异常值 max_return = returns.abs().max() if max_return > 0.5: # 单日涨跌 > 50% warnings.warn(f"{code}: 发现异常收益率 ({max_return:.1%})") # 3. 检查索引是否匹配目标日历 if not returns.index.equals(self.target_calendar): raise ValueError(f"{code}: 收益率索引与目标日历不匹配") ``` --- ### 阶段 5:合并多标的收益率 ```python # 合并为 DataFrame returns_df = pd.DataFrame(returns_aligned) # 结果: returns_df(A 股日历 244 天 × 3 列): 日期 ^GSPC ^HSI 000300.SH 2024-01-01 0.0000 0.0000 0.0000 2024-01-02 0.0021 0.0029 0.0029 2024-01-03 0.0021 0.0029 0.0028 2024-01-04 -0.0010 -0.0018 -0.0014 ... 2024-01-30 0.0000 0.0000 0.0028 ← 美股/港股休市(0%),A 股正常 2024-01-31 0.0021 0.0025 0.0030 # ✅ 验证:无 NaN assert not returns_df.isna().any().any() ``` **CrossMarketAligner 的作用**: ```python def align_multi_asset(self, close_dict): returns_dict = {} for code, close_series in close_dict.items(): try: # 对每个标的调用 align_returns returns_dict[code] = self.align_returns(close_series, code) except Exception as e: # 如果失败,填充全 0 warnings.warn(f"{code}: 收益率对齐失败 - {e}") returns_dict[code] = pd.Series(0.0, index=self.target_calendar) returns_df = pd.DataFrame(returns_dict) # 最终验证:不能有 NaN if returns_df.isna().any().any(): raise ValueError("收益率 DataFrame 包含 NaN,这不应该发生") return returns_df ``` --- ### 阶段 6:验证信号与收益率对齐 ```python # 验证 aligned_signals, aligned_returns = aligner.validate_alignment( signals, returns_df ) # 内部逻辑: def validate_alignment(self, signals, returns_df): # 1. 找共同日期 common_dates = signals.index.intersection(returns_df.index) # 2. 检查丢失的日期 lost_signals = len(signals) - len(common_dates) lost_returns = len(returns_df) - len(common_dates) if lost_signals > 0 or lost_returns > 0: warnings.warn( f"信号与收益率对齐丢失日期\n" f"信号: {len(signals)} → {len(common_dates)} (丢失 {lost_signals})\n" f"收益: {len(returns_df)} → {len(common_dates)} (丢失 {lost_returns})" ) # 3. 检查对齐后日期是否太少 if len(common_dates) < 10: raise ValueError(f"对齐后日期太少: {len(common_dates)} 天") # 4. 裁剪到共同日期 aligned_signals = signals.loc[common_dates] aligned_returns = returns_df.loc[common_dates] return aligned_signals, aligned_returns ``` --- ### 阶段 7:计算组合收益 ```python from framework_v2.execution import BacktestExecutor executor = BacktestExecutor( initial_capital=100000, trade_cost=0.001, # 0.1% 交易成本 select_num=3 ) # 执行回测 portfolio = executor.execute(aligned_signals, aligned_returns) # 内部逻辑: def execute(self, signals, returns_df): nav = [self.initial_capital] # 初始净值 for date, row in signals.iterrows(): signal = row['signal'] # '^GSPC,^HSI,000300.SH' if not signal or signal == '': # 空信号,持有现金 daily_return = 0.0 else: # 解析信号 codes = signal.split(',') # 获取当日收益率 daily_returns = [ returns_df.loc[date, code] for code in codes ] # 等权平均 daily_return = np.mean(daily_returns) # 扣除交易成本 daily_return -= self.trade_cost # 更新净值 new_nav = nav[-1] * (1 + daily_return) nav.append(new_nav) return pd.DataFrame({ 'nav': nav[1:], 'daily_return': ... }) ``` **示例计算**: ``` 日期 signal 收益率计算 净值 2024-01-29 ^GSPC,^HSI,000300.SH (0.16+0.15+0.14)/3-0.001 100,000 → 100,048 2024-01-30 ^GSPC,^HSI,000300.SH (0.00+0.00+0.28)/3-0.001 100,048 → 100,047 ↑ 美股/港股休市(0%) 2024-01-31 ^GSPC,000300.SH (0.21+0.30)/2-0.001 100,047 → 100,072 ↑ 只持有 2 只标的 ``` --- ## 📊 完整数据流图 ``` 原始 OHLCV(不同日历) │ ├─ ^GSPC (252 天,美股日历) ├─ ^HSI (250 天,港股日历) └─ 000300.SH (244 天,A 股日历) │ ├──────────────────────────────────────────────┐ │ 阶段 1:因子计算(原始日历) │ │ factor.compute(df) │ │ → 在各自日历计算 rolling(25) │ │ → 使用真实交易日(25 天) │ └──────────────────────────────────────────────┘ │ ├─ ^GSPC 因子 (252 天,美股日历) ├─ ^HSI 因子 (250 天,港股日历) └─ 000300.SH 因子 (244 天,A 股日历) │ ├──────────────────────────────────────────────┐ │ 阶段 2:CrossMarketAligner.align_factor() │ │ → reindex(a_share_dates, method='ffill') │ │ → 标记 is_filled(哪些是填充值) │ │ → 验证 NaN 比例(> 10% 警告) │ │ → 验证填充比例(> 30% 警告) │ └──────────────────────────────────────────────┘ │ ├─ ^GSPC 因子 (244 天,A 股日历) ├─ ^HSI 因子 (244 天,A 股日历) └─ 000300.SH 因子 (244 天,A 股日历) │ ┌─ 合并为 factor_df (244 天 × 3 列) │ ├──────────────────────────────────────────────┐ │ 阶段 3:信号生成 │ │ selector.generate(factor_df) │ │ → Top 3 选股(跳过 NaN) │ │ → 调仓控制(每 N 天调仓) │ └──────────────────────────────────────────────┘ │ └─ signals DataFrame (244 天) 列:signal(逗号分隔的标的代码) │ ├──────────────────────────────────────────────┐ │ 阶段 4:CrossMarketAligner.align_returns() │ │ → close.reindex(a_share_dates, ffill) │ │ → pct_change(fill_method=None) │ │ → 休市日收益率 = 0%(价格不变) │ │ → 验证 NaN 比例(> 10% 报错) │ │ → 验证异常值(> 50% 警告) │ │ → 验证索引一致性 │ └──────────────────────────────────────────────┘ │ ├─ ^GSPC 收益率 (244 天,A 股日历) ├─ ^HSI 收益率 (244 天,A 股日历) └─ 000300.SH 收益率 (244 天,A 股日历) │ ┌─ 合并为 returns_df (244 天 × 3 列) │ ├──────────────────────────────────────────────┐ │ 阶段 5:CrossMarketAligner.validate_alignment()│ │ → intersection(共同日期) │ │ → 裁剪到共同日期 │ │ → 验证日期一致性(丢失 > 0 警告) │ │ → 验证最小日期数(< 10 报错) │ └──────────────────────────────────────────────┘ │ ├─ aligned_signals (N 天,N ≤ 244) └─ aligned_returns (N 天) │ ├──────────────────────────────────────────────┐ │ 阶段 6:执行回测 │ │ executor.execute(signals, returns) │ │ → 解析信号(逗号分隔 → 列表) │ │ → 等权组合(np.mean) │ │ → 扣除交易成本 │ │ → 计算净值曲线 │ └──────────────────────────────────────────────┘ │ └─ 最终结果 ├─ 净值曲线(DataFrame) ├─ 日收益率(Series) └─ 绩效指标(年化收益、夏普、最大回撤) ``` --- ## 🎯 CrossMarketAligner 的核心价值 ### 解决的问题 | 问题 | 严重度 | 表现 | Aligner 的解决方案 | |------|--------|------|-------------------| | **跨市场日历不同** | 🔴 严重 | 因子/收益无法直接合并 | align_factor() reindex + ffill | | **ffill 收益率陷阱** | 🔴 严重 | 休市日复制非零收益率 | align_returns() 先对齐价格 | | **NaN 传播** | 🔴 严重 | 组合收益变 NaN | fillna(0.0) + 严格验证 | | **信号与收益不对齐** | 🟡 中等 | 回测丢失日期 | validate_alignment() 裁剪 | | **异常值未检测** | 🟡 中等 | 单日涨跌 > 50% | max_return 验证 | | **填充值未知** | 🟢 轻微 | 无法评估数据质量 | is_filled 标记 | ### 关键设计决策 #### 1. 为什么因子在原始日历计算? ```python # ✅ 正确:在原始日历计算 factor = close.rolling(25).apply(weighted_momentum) # 25 个真实交易日 aligned = factor.reindex(a_share_dates, method='ffill') # ❌ 错误:先对齐再计算 close_aligned = close.reindex(a_share_dates, method='ffill') factor = close_aligned.rolling(25).apply(...) # 包含 2-3 个重复值! ``` **原因**: - rolling window 需要**真实交易日**(25 天) - ffill 会引入重复值,影响线性回归权重 - 对齐的是**因子值**,不是价格 #### 2. 为什么收益率要先对齐价格? ```python # ✅ 正确:先对齐价格,再计算收益率 close_aligned = close.reindex(a_share_dates, method='ffill') returns = close_aligned.pct_change() # 休市日 = 0% # ❌ 错误:先计算收益率,再对齐 returns = close.pct_change() returns_aligned = returns.reindex(a_share_dates, method='ffill') # 复制非零收益率! ``` **原因**: - 休市日价格不变 → 收益率 = 0% - 如果先计算收益率,ffill 会复制前一天的非零收益率 - 导致净值高估/低估 #### 3. 为什么标记 is_filled? ```python aligned = aligner.align_factor(...) # aligned['is_filled'] = True/False ``` **用途**: - 分析哪些因子值是"真实计算"的 - 哪些是"ffill 填充"的 - 可以统计填充比例,评估数据质量 - 后续可用于加权(真实值权重更高) --- ## 📈 验证测试 ### 测试覆盖 ``` ✓ 测试 1: 因子对齐 - 填充值正确标记 ✓ 测试 2: 收益率对齐 - 休市日收益率 = 0% ✓ 测试 3: 多标的对齐 - 无 NaN,索引一致 ✓ 测试 4: 信号与收益对齐 - 日期裁剪验证 ✓ 测试 5: ffill 陷阱对比 - 错误 vs 正确做法 总计: 5/5 通过 ``` ### 关键验证点 ```python # 1. 休市日收益率 = 0% assert returns['2024-01-30'] == 0.0 # 美股休市 # 2. 无 NaN assert not returns_df.isna().any().any() # 3. 索引一致 assert aligned_signals.index.equals(aligned_returns.index) # 4. 填充值标记 assert aligned.loc['2024-01-30', 'is_filled'] == True # 美股休市 # 5. NaN 比例 < 10% nan_ratio = factor_df.isna().sum() / len(factor_df) assert (nan_ratio < 0.1).all() ``` --- ## 🔧 使用示例 ### 完整代码 ```python from framework_v2.shared.data.alignment import CrossMarketAligner from framework_v2.shared.factors import MomentumFactor from framework_v2.shared.signals import TopNSelector from framework_v2.execution import BacktestExecutor # 1. 获取数据 index_data = { '^GSPC': df_sp500, # 美股日历 '^HSI': df_hsi, # 港股日历 '000300.SH': df_hs300 # A 股日历 } a_share_dates = pd.date_range('2020-01-01', '2024-01-01', freq='B') # 示例 # 2. 创建对齐器 aligner = CrossMarketAligner(target_calendar=a_share_dates) # 3. 计算因子(原始日历) factor = MomentumFactor(n_days=25, weighted=True, crash_filter=True) factor_raw = { code: factor.compute(df) for code, df in index_data.items() } # 4. 对齐因子到 A 股日历 factor_aligned = { code: aligner.align_factor( factor_raw[code], source_calendar=index_data[code].index, code=code )['value'] for code in index_data.keys() } # 5. 生成信号 factor_df = pd.DataFrame(factor_aligned) selector = TopNSelector(select_num=3, min_score=0.0) signals = selector.generate(factor_df) # 6. 对齐收益率到 A 股日历 returns_df = aligner.align_multi_asset({ code: df['close'] for code, df in index_data.items() }) # 7. 验证信号与收益率对齐 aligned_signals, aligned_returns = aligner.validate_alignment( signals, returns_df ) # 8. 执行回测 executor = BacktestExecutor( initial_capital=100000, trade_cost=0.001, select_num=3 ) portfolio = executor.execute(aligned_signals, aligned_returns) # 9. 查看结果 print(f"最终净值: {portfolio['nav'].iloc[-1]:,.2f}") print(f"年化收益: {portfolio['annual_return']:.2%}") print(f"夏普比率: {portfolio['sharpe_ratio']:.2f}") print(f"最大回撤: {portfolio['max_drawdown']:.2%}") ``` --- ## 📝 注意事项 1. **填充值标记**:`is_filled` 列标记哪些是 ffill 填充的,可用于后续分析 2. **NaN 处理**:收益率对齐后自动填充为 0(表示"无数据,收益率为 0") 3. **异常检测**:单日收益率 > 50% 会发出警告 4. **索引验证**:对齐后严格验证索引是否匹配目标日历 5. **统计信息**:通过 `aligner.get_stats()` 获取对齐统计 6. **性能优化**:避免在循环中多次 reindex,批量处理更高效 --- ## 🔗 相关文档 - **[数据架构方案](DATA_ARCHITECTURE.md)** - 完整的数据架构设计(Schema、验证、性能优化) - **[跨市场对齐方案](ALIGNMENT_GUIDE.md)** - CrossMarketAligner 使用指南 - **[框架 V2 README](README.md)** - 框架总览 --- *创建日期: 2026-05-06* *版本: 1.0.0*