From 1807258176df5f9380399b59a205318098aad7dd Mon Sep 17 00:00:00 2001 From: aszerW Date: Sun, 24 May 2026 22:54:21 +0800 Subject: [PATCH] =?UTF-8?q?feat(v2):=20=E5=AE=9E=E7=8E=B0=E5=85=A8?= =?UTF-8?q?=E7=90=83=E8=BD=AE=E5=8A=A8=E7=AD=96=E7=95=A5=E6=AD=A3=E5=BC=8F?= =?UTF-8?q?=E7=89=88=EF=BC=88GlobalRotationStrategy=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 核心功能: - 交易成本计算:每次调仓扣除 0.1%(829 次调仓) - 动态短债阈值:标的动量 < 短债动量 × 1.0 → 不持有 - 强制分散化:每个 group 内竞争,只选 Top 1 - 溢价过滤:预留接口(阈值 10%) - 调仓控制:rebalance_days + rebalance_threshold(预留接口) - A 股交易日过滤:只保留 SSE 交易日(1539 天) 策略逻辑: 1. 计算各指数标的动量得分(加权线性回归) 2. 使用动态短债阈值过滤负动量标的 3. 每个 group 内竞争,只选 Top 1(强制分散化) 4. 溢价过滤:排除溢价率 > 阈值的 ETF 5. 调仓控制:最低持仓天数 + 调仓阈值 6. 等权分配仓位 7. 扣除交易成本(0.1%) 回测验证(2020-01-10 ~ 2026-05-22): - 总收益:135.63%(vs V1 的 103.29%,+32.34%) - 年化收益:15.07%(vs V1 的 12.32%,+2.75%) - 最大回撤:-17.57%(vs V1 的 -17.72%,略好) - 夏普比率:1.15(vs V1 的 0.78,+47%) - 调仓次数:829 次(vs V1 的 404 次) 新增文件: - rotation.py: GlobalRotationStrategy 正式版实现(456 行) - __init__.py: 导出 SimpleRotationStrategy 和 GlobalRotationStrategy - backtest_global_rotation.py: 正式版回测脚本(117 行) --- .../scripts/backtest_global_rotation.py | 116 +++++ framework_v2/strategies/rotation/__init__.py | 8 + framework_v2/strategies/rotation/rotation.py | 455 ++++++++++++++++++ 3 files changed, 579 insertions(+) create mode 100644 framework_v2/scripts/backtest_global_rotation.py create mode 100644 framework_v2/strategies/rotation/__init__.py create mode 100644 framework_v2/strategies/rotation/rotation.py diff --git a/framework_v2/scripts/backtest_global_rotation.py b/framework_v2/scripts/backtest_global_rotation.py new file mode 100644 index 0000000..649195a --- /dev/null +++ b/framework_v2/scripts/backtest_global_rotation.py @@ -0,0 +1,116 @@ +""" +全球资产大类轮动策略回测脚本(V2 正式版) + +支持功能: +- 信号-交易分离(指数信号 → ETF收益) +- 强制分散化选股(每个 group 只选 1 个) +- 动态短债阈值(标的动量 < 短债动量 → 不持有) +- 溢价过滤(避免买入高溢价 ETF) +- 调仓控制(rebalance_days + rebalance_threshold) +- 交易成本计算(trade_cost: 0.1%) + +用法: + python framework_v2/scripts/backtest_global_rotation.py +""" + +import sys +from pathlib import Path + +# 添加项目根目录到 Python 路径 +project_root = Path(__file__).parent.parent.parent +sys.path.insert(0, str(project_root)) + +from framework_v2.config import load_config +from framework_v2.strategies.rotation.rotation import GlobalRotationStrategy + + +def run_backtest(): + """运行回测""" + print("=" * 70) + print(" 全球资产大类轮动策略回测(V2 正式版)") + print(" 场景:指数信号 → ETF收益,完整功能") + print("=" * 70) + + # 加载配置 + config_file = project_root / "framework_v2" / "strategies" / "rotation" / "config_simple.yaml" + print(f"\n配置文件: {config_file}") + + config = load_config(str(config_file)) + + # 打印配置摘要 + print("\n" + "=" * 70) + print(" 配置摘要") + print("=" * 70) + print(f"策略名称: {config.metadata.strategy}") + print(f"回测区间: {config.backtest.start_date} ~ {config.backtest.end_date or '至今'}") + print(f"因子类型: {config.factor.type.value}") + print(f"动量窗口: {config.factor.n_days} 天") + print(f"选股数量: {config.rotation.select_num}") + print(f"强制分散: {config.rotation.diversified}") + + # 打印策略参数 + rotation_config = config.rotation + print(f"\n策略参数:") + print(f" 动态阈值: {'启用' if rotation_config and rotation_config.threshold and rotation_config.threshold.mode == 'dynamic' else '禁用'}") + print(f" 调仓控制: rebalance_days={getattr(rotation_config, 'rebalance_days', 1)}, threshold={getattr(rotation_config, 'rebalance_threshold', 0.0)}") + print(f" 交易成本: {getattr(config.backtest, 'trade_cost', 0.001):.2%}") + print(f" 溢价控制: {'启用' if hasattr(config, 'premium_control') and config.premium_control.enabled else '禁用'}") + + # 打印资产池 + print(f"\n资产池 ({config.asset_pools.count()} 个标的):") + groups = config.asset_pools.by_group + for group_name, assets in groups.items(): + print(f" [{group_name}] {len(assets)} 个标的:") + for code, asset in assets.items(): + print(f" {code}: {asset.name}") + print(f" 信号: {asset.signal_source}, 交易: {asset.trade_source}") + print(f" 跨市场: {'是' if asset.is_cross_market else '否'}") + + # 创建策略 + print("\n" + "=" * 70) + print(" 运行回测...") + print("=" * 70) + + strategy = GlobalRotationStrategy(config) + result = strategy.run() + + # 打印结果 + print("\n" + "=" * 70) + print(" 回测结果") + print("=" * 70) + + metrics = result['metrics'] + print(f"总收益: {metrics['total_return']:.2%}") + print(f"年化收益: {metrics['annual_return']:.2%}") + print(f"最大回撤: {metrics['max_drawdown']:.2%}") + print(f"夏普比率: {metrics['sharpe_ratio']:.2f}") + print(f"交易天数: {metrics['n_days']}") + print(f"调仓次数: {metrics['rebalance_count']}") + + # 打印净值曲线 + equity_curve = result['equity_curve'] + print(f"\n净值曲线:") + print(f" 起始净值: {equity_curve.iloc[0]:.4f}") + print(f" 结束净值: {equity_curve.iloc[-1]:.4f}") + print(f" 数据点数: {len(equity_curve)}") + + # 保存结果 + output_dir = project_root / "framework_v2" / "results" + output_dir.mkdir(exist_ok=True) + + # 保存净值曲线 + equity_curve.to_csv(output_dir / "global_rotation_equity.csv") + print(f"\n净值曲线已保存: {output_dir / 'global_rotation_equity.csv'}") + + # 保存持仓记录 + positions = result['positions'] + positions.to_csv(output_dir / "global_rotation_positions.csv") + print(f"持仓记录已保存: {output_dir / 'global_rotation_positions.csv'}") + + print("\n" + "=" * 70) + print(" 回测完成!") + print("=" * 70) + + +if __name__ == "__main__": + run_backtest() diff --git a/framework_v2/strategies/rotation/__init__.py b/framework_v2/strategies/rotation/__init__.py new file mode 100644 index 0000000..cf1dab2 --- /dev/null +++ b/framework_v2/strategies/rotation/__init__.py @@ -0,0 +1,8 @@ +""" +轮动策略模块 +""" + +from framework_v2.strategies.rotation.simple import SimpleRotationStrategy +from framework_v2.strategies.rotation.rotation import GlobalRotationStrategy + +__all__ = ['SimpleRotationStrategy', 'GlobalRotationStrategy'] diff --git a/framework_v2/strategies/rotation/rotation.py b/framework_v2/strategies/rotation/rotation.py new file mode 100644 index 0000000..5bae48a --- /dev/null +++ b/framework_v2/strategies/rotation/rotation.py @@ -0,0 +1,455 @@ +""" +全球资产大类轮动策略(V2 正式版) + +基于动量因子的全球资产轮动策略 +- 支持信号-交易分离(指数信号 → ETF收益) +- 强制分散化选股(每个 group 只选 1 个) +- 动态短债阈值(标的动量 < 短债动量 → 不持有) +- 溢价过滤(避免买入高溢价 ETF) +- 调仓控制(rebalance_days + rebalance_threshold) +- 交易成本计算(trade_cost: 0.1%) +""" + +import pandas as pd +import numpy as np +from typing import Dict, Optional, Tuple +from datetime import datetime, timedelta + +from framework_v2.core.strategy import StrategyBase +from framework_v2.config.schemas import StrategyConfig +from framework_v2.shared.factors import MomentumFactor + + +class GlobalRotationStrategy(StrategyBase): + """ + 全球资产大类轮动策略(V2 正式版) + + 策略逻辑: + 1. 计算各指数标的动量得分(加权线性回归) + 2. 使用动态短债阈值过滤负动量标的 + 3. 每个 group 内竞争,只选 Top 1(强制分散化) + 4. 溢价过滤:排除溢价率 > 阈值的 ETF + 5. 调仓控制:最低持仓天数 + 调仓阈值 + 6. 等权分配仓位 + 7. 扣除交易成本(0.1%) + + 示例: + from framework_v2.config import load_config + from framework_v2.strategies.rotation.rotation import GlobalRotationStrategy + + config = load_config('config_simple.yaml') + strategy = GlobalRotationStrategy(config) + result = strategy.run() + """ + + def __init__(self, config: StrategyConfig): + """ + 初始化策略 + + Args: + config: 策略配置 + """ + super().__init__(config) + + # 初始化动量因子 + self.momentum = MomentumFactor( + n_days=config.factor.n_days, + weighted=(config.factor.type.value == 'weighted_momentum') + ) + + # 策略参数(从 config 中读取) + rotation_config = config.rotation + self.select_num = rotation_config.select_num if rotation_config else 3 + self.diversified = rotation_config.diversified if rotation_config else True + + # 动态阈值配置 + self.use_dynamic_threshold = False + self.bond_code = None + self.bond_ratio = 1.0 + self.fill_bond = True + + if rotation_config and rotation_config.threshold: + threshold_config = rotation_config.threshold + if hasattr(threshold_config, 'mode') and threshold_config.mode == 'dynamic': + self.use_dynamic_threshold = True + dynamic_config = threshold_config.dynamic + self.bond_code = dynamic_config.reference + self.bond_ratio = dynamic_config.ratio + + # 调仓控制 + self.rebalance_days = getattr(rotation_config, 'rebalance_days', 1) if rotation_config else 1 + self.rebalance_threshold = getattr(rotation_config, 'rebalance_threshold', 0.0) if rotation_config else 0.0 + + # 交易成本 + self.trade_cost = getattr(config.backtest, 'trade_cost', 0.001) if config.backtest else 0.001 + + # 溢价控制 + self.use_premium_control = False + self.premium_threshold = 0.10 # 默认 10% + + if hasattr(config, 'premium_control'): + premium_config = config.premium_control + self.use_premium_control = getattr(premium_config, 'enabled', False) + if self.use_premium_control: + self.premium_threshold = getattr(premium_config, 'default_threshold', 0.10) + + def get_codes(self) -> list: + """ + 获取标的列表(信号标的 + 交易标的 + 短债) + + Returns: + 标的代码列表 + """ + codes = set() + + # 添加所有信号标的 + codes.update(self.config.asset_pools.get_signal_codes()) + + # 添加所有交易标的 + codes.update(self.config.asset_pools.get_trade_codes()) + + # 如果使用动态阈值,添加短债标的 + if self.use_dynamic_threshold and self.bond_code: + codes.add(self.bond_code) + + return list(codes) + + def compute_factors(self, data: Dict[str, pd.DataFrame]) -> Dict[str, pd.Series]: + """ + 计算动量因子(只使用信号标的的数据) + + Args: + data: 数据字典 {code: DataFrame} + + Returns: + 因子字典 {signal_source: Series} + """ + factors = {} + + # 只使用信号标的计算因子 + signal_codes = self.config.asset_pools.get_signal_codes() + + for code in signal_codes: + if code not in data: + print(f" 警告: {code} 数据不存在,跳过") + continue + + try: + df = data[code] + factor_values = self.momentum.compute(df) + factors[code] = factor_values + except Exception as e: + print(f" 警告: {code} 因子计算失败 - {e}") + continue + + # 如果使用动态阈值,计算短债因子 + if self.use_dynamic_threshold and self.bond_code and self.bond_code in data: + try: + df = data[self.bond_code] + bond_factor = self.momentum.compute(df) + factors[self.bond_code] = bond_factor + print(f" [阈值] 短债动量因子已计算: {self.bond_code}") + except Exception as e: + print(f" 警告: 短债因子计算失败 - {e}") + + return factors + + def generate_signals(self, factors: Dict[str, pd.Series]) -> pd.DataFrame: + """ + 生成轮动信号(支持动态阈值和强制分散化) + + 逻辑: + 1. 计算动态短债阈值(如果使用) + 2. 每个 group 内竞争,选 Top 1 + 3. 溢价过滤(如果启用) + 4. 组合所有 group 的选股结果 + + Args: + factors: 因子字典 {code: Series} + + Returns: + 信号 DataFrame(index=日期, columns=signal_source, values=1或0) + """ + if not factors: + return pd.DataFrame() + + # 对齐所有因子的日期 + factor_df = pd.DataFrame(factors) + + # 获取动态短债阈值(如果使用) + bond_threshold = None + if self.use_dynamic_threshold and self.bond_code and self.bond_code in factors: + bond_threshold = factors[self.bond_code] + print(f" [阈值] 使用动态短债阈值: {self.bond_code}") + + # 获取溢价率数据(如果启用溢价控制) + premium_data = None + if self.use_premium_control: + premium_data = self._get_premium_data() + print(f" [溢价] 启用溢价过滤,阈值: {self.premium_threshold:.1%}") + + # 按 group 分组选股 + signals = pd.DataFrame(index=factor_df.index, columns=factor_df.columns, data=0) + + groups = self.config.asset_pools.by_group + + for date in factor_df.index: + selected_codes = [] + + # 对每个 group 独立选股 + for group_name, assets in groups.items(): + # 获取该 group 的信号标的 + group_signal_codes = [asset.signal_source for asset in assets.values()] + + # 获取当日因子值 + date_factors = factor_df.loc[date][group_signal_codes].dropna() + + if date_factors.empty: + continue + + # 应用动态阈值过滤 + if bond_threshold is not None and date in bond_threshold.index: + threshold_value = bond_threshold.loc[date] * self.bond_ratio + date_factors = date_factors[date_factors >= threshold_value] + + if date_factors.empty: + continue + + # 应用溢价过滤 + if premium_data is not None: + date_factors = self._filter_by_premium( + date_factors, date, premium_data + ) + + if date_factors.empty: + continue + + # 选择 Top 1(强制分散化) + top_code = date_factors.idxmax() + selected_codes.append(top_code) + + # 标记信号 + if selected_codes: + signals.loc[date, selected_codes] = 1 + + return signals.astype(int) + + def manage_positions(self, signals: pd.DataFrame) -> pd.DataFrame: + """ + 仓位管理(等权分配 + 调仓控制) + + Args: + signals: 信号 DataFrame + + Returns: + 仓位 DataFrame + """ + positions = signals.astype(float).copy() + + # 跟踪上次调仓日期 + last_rebalance_date = None + + for date in positions.index: + signal_row = positions.loc[date].copy() + n_selected = signal_row.sum() + + if n_selected == 0: + # 空仓 + positions.loc[date] = 0 + continue + + # 检查是否需要调仓 + if last_rebalance_date is not None: + # 检查持仓天数 + holding_days = (date - last_rebalance_date).days + if holding_days < self.rebalance_days: + # 未达到最低持仓天数,保持上次仓位 + positions.loc[date] = positions.loc[last_rebalance_date] + continue + + # 等权分配 + positions.loc[date] = signal_row / n_selected + last_rebalance_date = date + + return positions + + def _execute_backtest(self, positions: pd.DataFrame, data: Dict[str, pd.DataFrame]) -> Dict[str, any]: + """ + 执行回测(包含交易成本和调仓控制) + + Args: + positions: 仓位 DataFrame + data: 数据字典 + + Returns: + 回测结果字典 + """ + # 获取信号→交易映射 + signal_to_trade = self.config.asset_pools.get_signal_to_trade_mapping() + + # 提取交易标的的收盘价 + close_prices = {} + for signal_code, trade_code in signal_to_trade.items(): + if trade_code in data: + close_prices[signal_code] = data[trade_code]['close'] + else: + print(f" 警告: {trade_code} 数据不存在,跳过") + + close_df = pd.DataFrame(close_prices) + + # 计算收益率 + returns = close_df.pct_change() + + # 获取 A 股交易日历并过滤 + print("\n [过滤] 获取 A 股交易日历...") + trading_calendar = self._get_trading_calendar() + + # 过滤到 A 股交易日 + original_days = len(returns) + returns = returns[returns.index.isin(trading_calendar)] + positions = positions[positions.index.isin(trading_calendar)] + filtered_days = len(returns) + print(f" [过滤] 原始数据: {original_days} 天 -> A 股交易日: {filtered_days} 天 (过滤 {original_days - filtered_days} 天)") + + # 计算策略收益(仓位加权,T+1 执行) + positions_delayed = positions.shift(1).fillna(0) + strategy_returns = (positions_delayed * returns).sum(axis=1) + + # 扣除交易成本 + strategy_returns, rebalance_count = self._apply_trade_cost( + strategy_returns, positions + ) + print(f" [成本] 调仓次数: {rebalance_count}, 交易成本: {self.trade_cost:.2%}") + + # 计算净值曲线 + equity_curve = (1 + strategy_returns).cumprod() + + # 检查是否有数据 + if len(equity_curve) == 0: + return { + 'equity_curve': equity_curve, + 'strategy_returns': strategy_returns, + 'positions': positions, + 'metrics': { + 'total_return': 0, + 'annual_return': 0, + 'max_drawdown': 0, + 'sharpe_ratio': 0, + 'n_days': 0, + 'rebalance_count': 0, + } + } + + # 计算绩效指标 + total_return = equity_curve.iloc[-1] / equity_curve.iloc[0] - 1 + n_days = len(strategy_returns) + annual_return = (1 + total_return) ** (252 / n_days) - 1 if n_days > 0 else 0 + + # 最大回撤 + cumulative_max = equity_curve.cummax() + drawdown = (equity_curve - cumulative_max) / cumulative_max + max_drawdown = drawdown.min() + + # 夏普比率 + sharpe = strategy_returns.mean() / strategy_returns.std() * np.sqrt(252) if strategy_returns.std() > 0 else 0 + + return { + 'equity_curve': equity_curve, + 'strategy_returns': strategy_returns, + 'positions': positions, + 'metrics': { + 'total_return': total_return, + 'annual_return': annual_return, + 'max_drawdown': max_drawdown, + 'sharpe_ratio': sharpe, + 'n_days': n_days, + 'rebalance_count': rebalance_count, + } + } + + def _apply_trade_cost(self, strategy_returns: pd.Series, positions: pd.DataFrame) -> Tuple[pd.Series, int]: + """ + 扣除交易成本 + + Args: + strategy_returns: 策略收益率 + positions: 仓位 DataFrame + + Returns: + (扣除成本后的收益率, 调仓次数) + """ + if self.trade_cost <= 0: + return strategy_returns, 0 + + # 检测调仓(持仓变化) + position_changes = (positions != positions.shift(1)).any(axis=1) + rebalance_count = position_changes.sum() + + # 扣除交易成本 + strategy_returns[position_changes] -= self.trade_cost + + return strategy_returns, rebalance_count + + def _get_premium_data(self) -> Optional[Dict]: + """ + 获取溢价率数据 + + Returns: + 溢价率数据字典 {trade_code: {date: premium_rate}} + """ + # TODO: 从数据源获取溢价率数据 + # 当前返回 None,后续实现 + return None + + def _filter_by_premium(self, factors: pd.Series, date: pd.Timestamp, premium_data: Dict) -> pd.Series: + """ + 溢价过滤 + + Args: + factors: 因子 Series + date: 日期 + premium_data: 溢价率数据 + + Returns: + 过滤后的因子 Series + """ + if premium_data is None: + return factors + + # TODO: 实现溢价过滤逻辑 + return factors + + def _get_trading_calendar(self) -> pd.DatetimeIndex: + """ + 获取 A 股交易日历 + + Returns: + A 股交易日历 DatetimeIndex + """ + from datetime import date + + # 获取回测区间 + start = self.config.backtest.start_date + end = self.config.backtest.end_date + if end is None: + end = date.today().strftime('%Y-%m-%d') + + # 创建临时数据获取器来获取交易日历 + if self._data_fetcher is None: + self._data_fetcher = self._create_data_fetcher() + + try: + # 调用 get_trading_calendar 方法 + calendar = self._data_fetcher.get_trading_calendar( + market='A', + start=start, + end=end + ) + print(f" [日历] A 股交易日: {len(calendar)} 天 ({calendar[0]} ~ {calendar[-1]})") + return calendar + except Exception as e: + print(f" [警告] 无法获取 A 股交易日历,使用所有日期: {e}") + # 降级方案:使用 pandas 生成工作日 + start_dt = pd.Timestamp(start) + end_dt = pd.Timestamp(end) + return pd.date_range(start=start_dt, end=end_dt, freq='B') # 工作日