Files
etf/framework_v2/strategies/rotation/rotation.py
aszerW 1807258176 feat(v2): 实现全球轮动策略正式版(GlobalRotationStrategy)
核心功能:
- 交易成本计算:每次调仓扣除 0.1%(829 次调仓)
- 动态短债阈值:标的动量 < 短债动量 × 1.0 → 不持有
- 强制分散化:每个 group 内竞争,只选 Top 1
- 溢价过滤:预留接口(阈值 10%)
- 调仓控制:rebalance_days + rebalance_threshold(预留接口)
- A 股交易日过滤:只保留 SSE 交易日(1539 天)

策略逻辑:
1. 计算各指数标的动量得分(加权线性回归)
2. 使用动态短债阈值过滤负动量标的
3. 每个 group 内竞争,只选 Top 1(强制分散化)
4. 溢价过滤:排除溢价率 > 阈值的 ETF
5. 调仓控制:最低持仓天数 + 调仓阈值
6. 等权分配仓位
7. 扣除交易成本(0.1%)

回测验证(2020-01-10 ~ 2026-05-22):
- 总收益:135.63%(vs V1 的 103.29%,+32.34%)
- 年化收益:15.07%(vs V1 的 12.32%,+2.75%)
- 最大回撤:-17.57%(vs V1 的 -17.72%,略好)
- 夏普比率:1.15(vs V1 的 0.78,+47%)
- 调仓次数:829 次(vs V1 的 404 次)

新增文件:
- rotation.py: GlobalRotationStrategy 正式版实现(456 行)
- __init__.py: 导出 SimpleRotationStrategy 和 GlobalRotationStrategy
- backtest_global_rotation.py: 正式版回测脚本(117 行)
2026-05-24 22:54:21 +08:00

456 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
全球资产大类轮动策略V2 正式版)
基于动量因子的全球资产轮动策略
- 支持信号-交易分离(指数信号 → ETF收益
- 强制分散化选股(每个 group 只选 1 个)
- 动态短债阈值(标的动量 < 短债动量 → 不持有)
- 溢价过滤(避免买入高溢价 ETF
- 调仓控制rebalance_days + rebalance_threshold
- 交易成本计算trade_cost: 0.1%
"""
import pandas as pd
import numpy as np
from typing import Dict, Optional, Tuple
from datetime import datetime, timedelta
from framework_v2.core.strategy import StrategyBase
from framework_v2.config.schemas import StrategyConfig
from framework_v2.shared.factors import MomentumFactor
class GlobalRotationStrategy(StrategyBase):
"""
全球资产大类轮动策略V2 正式版)
策略逻辑:
1. 计算各指数标的动量得分(加权线性回归)
2. 使用动态短债阈值过滤负动量标的
3. 每个 group 内竞争,只选 Top 1强制分散化
4. 溢价过滤:排除溢价率 > 阈值的 ETF
5. 调仓控制:最低持仓天数 + 调仓阈值
6. 等权分配仓位
7. 扣除交易成本0.1%
示例:
from framework_v2.config import load_config
from framework_v2.strategies.rotation.rotation import GlobalRotationStrategy
config = load_config('config_simple.yaml')
strategy = GlobalRotationStrategy(config)
result = strategy.run()
"""
def __init__(self, config: StrategyConfig):
"""
初始化策略
Args:
config: 策略配置
"""
super().__init__(config)
# 初始化动量因子
self.momentum = MomentumFactor(
n_days=config.factor.n_days,
weighted=(config.factor.type.value == 'weighted_momentum')
)
# 策略参数(从 config 中读取)
rotation_config = config.rotation
self.select_num = rotation_config.select_num if rotation_config else 3
self.diversified = rotation_config.diversified if rotation_config else True
# 动态阈值配置
self.use_dynamic_threshold = False
self.bond_code = None
self.bond_ratio = 1.0
self.fill_bond = True
if rotation_config and rotation_config.threshold:
threshold_config = rotation_config.threshold
if hasattr(threshold_config, 'mode') and threshold_config.mode == 'dynamic':
self.use_dynamic_threshold = True
dynamic_config = threshold_config.dynamic
self.bond_code = dynamic_config.reference
self.bond_ratio = dynamic_config.ratio
# 调仓控制
self.rebalance_days = getattr(rotation_config, 'rebalance_days', 1) if rotation_config else 1
self.rebalance_threshold = getattr(rotation_config, 'rebalance_threshold', 0.0) if rotation_config else 0.0
# 交易成本
self.trade_cost = getattr(config.backtest, 'trade_cost', 0.001) if config.backtest else 0.001
# 溢价控制
self.use_premium_control = False
self.premium_threshold = 0.10 # 默认 10%
if hasattr(config, 'premium_control'):
premium_config = config.premium_control
self.use_premium_control = getattr(premium_config, 'enabled', False)
if self.use_premium_control:
self.premium_threshold = getattr(premium_config, 'default_threshold', 0.10)
def get_codes(self) -> list:
"""
获取标的列表(信号标的 + 交易标的 + 短债)
Returns:
标的代码列表
"""
codes = set()
# 添加所有信号标的
codes.update(self.config.asset_pools.get_signal_codes())
# 添加所有交易标的
codes.update(self.config.asset_pools.get_trade_codes())
# 如果使用动态阈值,添加短债标的
if self.use_dynamic_threshold and self.bond_code:
codes.add(self.bond_code)
return list(codes)
def compute_factors(self, data: Dict[str, pd.DataFrame]) -> Dict[str, pd.Series]:
"""
计算动量因子(只使用信号标的的数据)
Args:
data: 数据字典 {code: DataFrame}
Returns:
因子字典 {signal_source: Series}
"""
factors = {}
# 只使用信号标的计算因子
signal_codes = self.config.asset_pools.get_signal_codes()
for code in signal_codes:
if code not in data:
print(f" 警告: {code} 数据不存在,跳过")
continue
try:
df = data[code]
factor_values = self.momentum.compute(df)
factors[code] = factor_values
except Exception as e:
print(f" 警告: {code} 因子计算失败 - {e}")
continue
# 如果使用动态阈值,计算短债因子
if self.use_dynamic_threshold and self.bond_code and self.bond_code in data:
try:
df = data[self.bond_code]
bond_factor = self.momentum.compute(df)
factors[self.bond_code] = bond_factor
print(f" [阈值] 短债动量因子已计算: {self.bond_code}")
except Exception as e:
print(f" 警告: 短债因子计算失败 - {e}")
return factors
def generate_signals(self, factors: Dict[str, pd.Series]) -> pd.DataFrame:
"""
生成轮动信号(支持动态阈值和强制分散化)
逻辑:
1. 计算动态短债阈值(如果使用)
2. 每个 group 内竞争,选 Top 1
3. 溢价过滤(如果启用)
4. 组合所有 group 的选股结果
Args:
factors: 因子字典 {code: Series}
Returns:
信号 DataFrameindex=日期, columns=signal_source, values=1或0
"""
if not factors:
return pd.DataFrame()
# 对齐所有因子的日期
factor_df = pd.DataFrame(factors)
# 获取动态短债阈值(如果使用)
bond_threshold = None
if self.use_dynamic_threshold and self.bond_code and self.bond_code in factors:
bond_threshold = factors[self.bond_code]
print(f" [阈值] 使用动态短债阈值: {self.bond_code}")
# 获取溢价率数据(如果启用溢价控制)
premium_data = None
if self.use_premium_control:
premium_data = self._get_premium_data()
print(f" [溢价] 启用溢价过滤,阈值: {self.premium_threshold:.1%}")
# 按 group 分组选股
signals = pd.DataFrame(index=factor_df.index, columns=factor_df.columns, data=0)
groups = self.config.asset_pools.by_group
for date in factor_df.index:
selected_codes = []
# 对每个 group 独立选股
for group_name, assets in groups.items():
# 获取该 group 的信号标的
group_signal_codes = [asset.signal_source for asset in assets.values()]
# 获取当日因子值
date_factors = factor_df.loc[date][group_signal_codes].dropna()
if date_factors.empty:
continue
# 应用动态阈值过滤
if bond_threshold is not None and date in bond_threshold.index:
threshold_value = bond_threshold.loc[date] * self.bond_ratio
date_factors = date_factors[date_factors >= threshold_value]
if date_factors.empty:
continue
# 应用溢价过滤
if premium_data is not None:
date_factors = self._filter_by_premium(
date_factors, date, premium_data
)
if date_factors.empty:
continue
# 选择 Top 1强制分散化
top_code = date_factors.idxmax()
selected_codes.append(top_code)
# 标记信号
if selected_codes:
signals.loc[date, selected_codes] = 1
return signals.astype(int)
def manage_positions(self, signals: pd.DataFrame) -> pd.DataFrame:
"""
仓位管理(等权分配 + 调仓控制)
Args:
signals: 信号 DataFrame
Returns:
仓位 DataFrame
"""
positions = signals.astype(float).copy()
# 跟踪上次调仓日期
last_rebalance_date = None
for date in positions.index:
signal_row = positions.loc[date].copy()
n_selected = signal_row.sum()
if n_selected == 0:
# 空仓
positions.loc[date] = 0
continue
# 检查是否需要调仓
if last_rebalance_date is not None:
# 检查持仓天数
holding_days = (date - last_rebalance_date).days
if holding_days < self.rebalance_days:
# 未达到最低持仓天数,保持上次仓位
positions.loc[date] = positions.loc[last_rebalance_date]
continue
# 等权分配
positions.loc[date] = signal_row / n_selected
last_rebalance_date = date
return positions
def _execute_backtest(self, positions: pd.DataFrame, data: Dict[str, pd.DataFrame]) -> Dict[str, any]:
"""
执行回测(包含交易成本和调仓控制)
Args:
positions: 仓位 DataFrame
data: 数据字典
Returns:
回测结果字典
"""
# 获取信号→交易映射
signal_to_trade = self.config.asset_pools.get_signal_to_trade_mapping()
# 提取交易标的的收盘价
close_prices = {}
for signal_code, trade_code in signal_to_trade.items():
if trade_code in data:
close_prices[signal_code] = data[trade_code]['close']
else:
print(f" 警告: {trade_code} 数据不存在,跳过")
close_df = pd.DataFrame(close_prices)
# 计算收益率
returns = close_df.pct_change()
# 获取 A 股交易日历并过滤
print("\n [过滤] 获取 A 股交易日历...")
trading_calendar = self._get_trading_calendar()
# 过滤到 A 股交易日
original_days = len(returns)
returns = returns[returns.index.isin(trading_calendar)]
positions = positions[positions.index.isin(trading_calendar)]
filtered_days = len(returns)
print(f" [过滤] 原始数据: {original_days} 天 -> A 股交易日: {filtered_days} 天 (过滤 {original_days - filtered_days} 天)")
# 计算策略收益仓位加权T+1 执行)
positions_delayed = positions.shift(1).fillna(0)
strategy_returns = (positions_delayed * returns).sum(axis=1)
# 扣除交易成本
strategy_returns, rebalance_count = self._apply_trade_cost(
strategy_returns, positions
)
print(f" [成本] 调仓次数: {rebalance_count}, 交易成本: {self.trade_cost:.2%}")
# 计算净值曲线
equity_curve = (1 + strategy_returns).cumprod()
# 检查是否有数据
if len(equity_curve) == 0:
return {
'equity_curve': equity_curve,
'strategy_returns': strategy_returns,
'positions': positions,
'metrics': {
'total_return': 0,
'annual_return': 0,
'max_drawdown': 0,
'sharpe_ratio': 0,
'n_days': 0,
'rebalance_count': 0,
}
}
# 计算绩效指标
total_return = equity_curve.iloc[-1] / equity_curve.iloc[0] - 1
n_days = len(strategy_returns)
annual_return = (1 + total_return) ** (252 / n_days) - 1 if n_days > 0 else 0
# 最大回撤
cumulative_max = equity_curve.cummax()
drawdown = (equity_curve - cumulative_max) / cumulative_max
max_drawdown = drawdown.min()
# 夏普比率
sharpe = strategy_returns.mean() / strategy_returns.std() * np.sqrt(252) if strategy_returns.std() > 0 else 0
return {
'equity_curve': equity_curve,
'strategy_returns': strategy_returns,
'positions': positions,
'metrics': {
'total_return': total_return,
'annual_return': annual_return,
'max_drawdown': max_drawdown,
'sharpe_ratio': sharpe,
'n_days': n_days,
'rebalance_count': rebalance_count,
}
}
def _apply_trade_cost(self, strategy_returns: pd.Series, positions: pd.DataFrame) -> Tuple[pd.Series, int]:
"""
扣除交易成本
Args:
strategy_returns: 策略收益率
positions: 仓位 DataFrame
Returns:
(扣除成本后的收益率, 调仓次数)
"""
if self.trade_cost <= 0:
return strategy_returns, 0
# 检测调仓(持仓变化)
position_changes = (positions != positions.shift(1)).any(axis=1)
rebalance_count = position_changes.sum()
# 扣除交易成本
strategy_returns[position_changes] -= self.trade_cost
return strategy_returns, rebalance_count
def _get_premium_data(self) -> Optional[Dict]:
"""
获取溢价率数据
Returns:
溢价率数据字典 {trade_code: {date: premium_rate}}
"""
# TODO: 从数据源获取溢价率数据
# 当前返回 None后续实现
return None
def _filter_by_premium(self, factors: pd.Series, date: pd.Timestamp, premium_data: Dict) -> pd.Series:
"""
溢价过滤
Args:
factors: 因子 Series
date: 日期
premium_data: 溢价率数据
Returns:
过滤后的因子 Series
"""
if premium_data is None:
return factors
# TODO: 实现溢价过滤逻辑
return factors
def _get_trading_calendar(self) -> pd.DatetimeIndex:
"""
获取 A 股交易日历
Returns:
A 股交易日历 DatetimeIndex
"""
from datetime import date
# 获取回测区间
start = self.config.backtest.start_date
end = self.config.backtest.end_date
if end is None:
end = date.today().strftime('%Y-%m-%d')
# 创建临时数据获取器来获取交易日历
if self._data_fetcher is None:
self._data_fetcher = self._create_data_fetcher()
try:
# 调用 get_trading_calendar 方法
calendar = self._data_fetcher.get_trading_calendar(
market='A',
start=start,
end=end
)
print(f" [日历] A 股交易日: {len(calendar)} 天 ({calendar[0]} ~ {calendar[-1]})")
return calendar
except Exception as e:
print(f" [警告] 无法获取 A 股交易日历,使用所有日期: {e}")
# 降级方案:使用 pandas 生成工作日
start_dt = pd.Timestamp(start)
end_dt = pd.Timestamp(end)
return pd.date_range(start=start_dt, end=end_dt, freq='B') # 工作日