diff --git a/framework_v2/strategies/rotation/rotation.py b/framework_v2/strategies/rotation/rotation.py index 5bae48a..80e580b 100644 --- a/framework_v2/strategies/rotation/rotation.py +++ b/framework_v2/strategies/rotation/rotation.py @@ -18,6 +18,7 @@ from datetime import datetime, timedelta from framework_v2.core.strategy import StrategyBase from framework_v2.config.schemas import StrategyConfig from framework_v2.shared.factors import MomentumFactor +from framework_v2.shared.data.alignment import CrossMarketAligner class GlobalRotationStrategy(StrategyBase): @@ -275,7 +276,7 @@ class GlobalRotationStrategy(StrategyBase): def _execute_backtest(self, positions: pd.DataFrame, data: Dict[str, pd.DataFrame]) -> Dict[str, any]: """ - 执行回测(包含交易成本和调仓控制) + 执行回测(使用 CrossMarketAligner 进行正确的数据对齐) Args: positions: 仓位 DataFrame @@ -287,33 +288,38 @@ class GlobalRotationStrategy(StrategyBase): # 获取信号→交易映射 signal_to_trade = self.config.asset_pools.get_signal_to_trade_mapping() - # 提取交易标的的收盘价 - close_prices = {} + # 获取 A 股交易日历 + print("\n [对齐] 获取 A 股交易日历...") + trading_calendar = self._get_trading_calendar() + print(f" [日历] A 股交易日: {len(trading_calendar)} 天 ({trading_calendar[0]} ~ {trading_calendar[-1]})") + + # 创建对齐器 + aligner = CrossMarketAligner(target_calendar=trading_calendar) + + # 提取交易标的的收盘价,并对齐到 A 股日历 + print(" [对齐] 对齐 ETF 价格到 A 股日历...") + close_dict = {} for signal_code, trade_code in signal_to_trade.items(): if trade_code in data: - close_prices[signal_code] = data[trade_code]['close'] + # 提取收盘价 + close_series = data[trade_code]['close'] + # 使用 signal_code 作为键(与 positions 列名一致) + close_dict[signal_code] = close_series else: print(f" 警告: {trade_code} 数据不存在,跳过") - close_df = pd.DataFrame(close_prices) + # 使用 CrossMarketAligner 对齐多标的收益率 + # 内部逻辑:先 ffill 价格到 A 股日历,再计算收益率 + print(" [对齐] 计算收益率(先对齐价格,再计算)...") + returns_df = aligner.align_multi_asset(close_dict) + print(f" [对齐] 收益率数据: {len(returns_df)} 天, {len(returns_df.columns)} 个标的") - # 计算收益率 - returns = close_df.pct_change() - - # 获取 A 股交易日历并过滤 - print("\n [过滤] 获取 A 股交易日历...") - trading_calendar = self._get_trading_calendar() - - # 过滤到 A 股交易日 - original_days = len(returns) - returns = returns[returns.index.isin(trading_calendar)] - positions = positions[positions.index.isin(trading_calendar)] - filtered_days = len(returns) - print(f" [过滤] 原始数据: {original_days} 天 -> A 股交易日: {filtered_days} 天 (过滤 {original_days - filtered_days} 天)") + # 对齐 positions 到 A 股日历 + positions = positions.reindex(trading_calendar, method='ffill') # 计算策略收益(仓位加权,T+1 执行) positions_delayed = positions.shift(1).fillna(0) - strategy_returns = (positions_delayed * returns).sum(axis=1) + strategy_returns = (positions_delayed * returns_df).sum(axis=1) # 扣除交易成本 strategy_returns, rebalance_count = self._apply_trade_cost(