Files
etf/framework_v2/strategies/rotation/rotation.py
aszerW 6749f8cf61 feat(v2): GlobalRotationStrategy 使用 CrossMarketAligner 进行数据对齐
核心改进:
- 替换手动对齐逻辑为框架标准的 CrossMarketAligner
- 修复收益率计算顺序:先对齐价格 → 再计算收益率
- 修复首日收益率 NaN 问题(填充为 0%)
- 添加 Pydantic Schema 验证(数据质量保证)

对齐逻辑变更:
修复前(错误):
  1. returns = close_df.pct_change()  # 在原始日历计算
  2. returns = returns[returns.index.isin(trading_calendar)]  # 过滤

修复后(正确):
  1. aligner = CrossMarketAligner(target_calendar)
  2. returns_df = aligner.align_multi_asset(close_dict)
     - 内部:先 ffill 价格到 A 股日历
     - 内部:再计算收益率(休市日 = 0%)
     - 内部:填充首日 NaN 为 0%
     - 内部:Pydantic Schema 验证

回测验证(2020-01-10 ~ 2026-05-22):
- 修复前:总收益 135.63%,年化 15.07%,夏普 1.15
- 修复后:总收益 137.88%,年化 15.25%,夏普 1.16
- 收益提升:+2.25%(修复首日 NaN 和跨日收益率问题)

关键修复:
1. 首日收益率从 NaN 改为 0%(避免收益丢失)
2. 休市日收益率正确 = 0%(价格 ffill 后不变)
3. 消除跨多日收益率被当作单日收益率的 bug
4. 统一使用框架标准组件(CrossMarketAligner)
2026-05-25 00:29:49 +08:00

462 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
全球资产大类轮动策略V2 正式版)
基于动量因子的全球资产轮动策略
- 支持信号-交易分离(指数信号 → ETF收益
- 强制分散化选股(每个 group 只选 1 个)
- 动态短债阈值(标的动量 < 短债动量 → 不持有)
- 溢价过滤(避免买入高溢价 ETF
- 调仓控制rebalance_days + rebalance_threshold
- 交易成本计算trade_cost: 0.1%
"""
import pandas as pd
import numpy as np
from typing import Dict, Optional, Tuple
from datetime import datetime, timedelta
from framework_v2.core.strategy import StrategyBase
from framework_v2.config.schemas import StrategyConfig
from framework_v2.shared.factors import MomentumFactor
from framework_v2.shared.data.alignment import CrossMarketAligner
class GlobalRotationStrategy(StrategyBase):
"""
全球资产大类轮动策略V2 正式版)
策略逻辑:
1. 计算各指数标的动量得分(加权线性回归)
2. 使用动态短债阈值过滤负动量标的
3. 每个 group 内竞争,只选 Top 1强制分散化
4. 溢价过滤:排除溢价率 > 阈值的 ETF
5. 调仓控制:最低持仓天数 + 调仓阈值
6. 等权分配仓位
7. 扣除交易成本0.1%
示例:
from framework_v2.config import load_config
from framework_v2.strategies.rotation.rotation import GlobalRotationStrategy
config = load_config('config_simple.yaml')
strategy = GlobalRotationStrategy(config)
result = strategy.run()
"""
def __init__(self, config: StrategyConfig):
"""
初始化策略
Args:
config: 策略配置
"""
super().__init__(config)
# 初始化动量因子
self.momentum = MomentumFactor(
n_days=config.factor.n_days,
weighted=(config.factor.type.value == 'weighted_momentum')
)
# 策略参数(从 config 中读取)
rotation_config = config.rotation
self.select_num = rotation_config.select_num if rotation_config else 3
self.diversified = rotation_config.diversified if rotation_config else True
# 动态阈值配置
self.use_dynamic_threshold = False
self.bond_code = None
self.bond_ratio = 1.0
self.fill_bond = True
if rotation_config and rotation_config.threshold:
threshold_config = rotation_config.threshold
if hasattr(threshold_config, 'mode') and threshold_config.mode == 'dynamic':
self.use_dynamic_threshold = True
dynamic_config = threshold_config.dynamic
self.bond_code = dynamic_config.reference
self.bond_ratio = dynamic_config.ratio
# 调仓控制
self.rebalance_days = getattr(rotation_config, 'rebalance_days', 1) if rotation_config else 1
self.rebalance_threshold = getattr(rotation_config, 'rebalance_threshold', 0.0) if rotation_config else 0.0
# 交易成本
self.trade_cost = getattr(config.backtest, 'trade_cost', 0.001) if config.backtest else 0.001
# 溢价控制
self.use_premium_control = False
self.premium_threshold = 0.10 # 默认 10%
if hasattr(config, 'premium_control'):
premium_config = config.premium_control
self.use_premium_control = getattr(premium_config, 'enabled', False)
if self.use_premium_control:
self.premium_threshold = getattr(premium_config, 'default_threshold', 0.10)
def get_codes(self) -> list:
"""
获取标的列表(信号标的 + 交易标的 + 短债)
Returns:
标的代码列表
"""
codes = set()
# 添加所有信号标的
codes.update(self.config.asset_pools.get_signal_codes())
# 添加所有交易标的
codes.update(self.config.asset_pools.get_trade_codes())
# 如果使用动态阈值,添加短债标的
if self.use_dynamic_threshold and self.bond_code:
codes.add(self.bond_code)
return list(codes)
def compute_factors(self, data: Dict[str, pd.DataFrame]) -> Dict[str, pd.Series]:
"""
计算动量因子(只使用信号标的的数据)
Args:
data: 数据字典 {code: DataFrame}
Returns:
因子字典 {signal_source: Series}
"""
factors = {}
# 只使用信号标的计算因子
signal_codes = self.config.asset_pools.get_signal_codes()
for code in signal_codes:
if code not in data:
print(f" 警告: {code} 数据不存在,跳过")
continue
try:
df = data[code]
factor_values = self.momentum.compute(df)
factors[code] = factor_values
except Exception as e:
print(f" 警告: {code} 因子计算失败 - {e}")
continue
# 如果使用动态阈值,计算短债因子
if self.use_dynamic_threshold and self.bond_code and self.bond_code in data:
try:
df = data[self.bond_code]
bond_factor = self.momentum.compute(df)
factors[self.bond_code] = bond_factor
print(f" [阈值] 短债动量因子已计算: {self.bond_code}")
except Exception as e:
print(f" 警告: 短债因子计算失败 - {e}")
return factors
def generate_signals(self, factors: Dict[str, pd.Series]) -> pd.DataFrame:
"""
生成轮动信号(支持动态阈值和强制分散化)
逻辑:
1. 计算动态短债阈值(如果使用)
2. 每个 group 内竞争,选 Top 1
3. 溢价过滤(如果启用)
4. 组合所有 group 的选股结果
Args:
factors: 因子字典 {code: Series}
Returns:
信号 DataFrameindex=日期, columns=signal_source, values=1或0
"""
if not factors:
return pd.DataFrame()
# 对齐所有因子的日期
factor_df = pd.DataFrame(factors)
# 获取动态短债阈值(如果使用)
bond_threshold = None
if self.use_dynamic_threshold and self.bond_code and self.bond_code in factors:
bond_threshold = factors[self.bond_code]
print(f" [阈值] 使用动态短债阈值: {self.bond_code}")
# 获取溢价率数据(如果启用溢价控制)
premium_data = None
if self.use_premium_control:
premium_data = self._get_premium_data()
print(f" [溢价] 启用溢价过滤,阈值: {self.premium_threshold:.1%}")
# 按 group 分组选股
signals = pd.DataFrame(index=factor_df.index, columns=factor_df.columns, data=0)
groups = self.config.asset_pools.by_group
for date in factor_df.index:
selected_codes = []
# 对每个 group 独立选股
for group_name, assets in groups.items():
# 获取该 group 的信号标的
group_signal_codes = [asset.signal_source for asset in assets.values()]
# 获取当日因子值
date_factors = factor_df.loc[date][group_signal_codes].dropna()
if date_factors.empty:
continue
# 应用动态阈值过滤
if bond_threshold is not None and date in bond_threshold.index:
threshold_value = bond_threshold.loc[date] * self.bond_ratio
date_factors = date_factors[date_factors >= threshold_value]
if date_factors.empty:
continue
# 应用溢价过滤
if premium_data is not None:
date_factors = self._filter_by_premium(
date_factors, date, premium_data
)
if date_factors.empty:
continue
# 选择 Top 1强制分散化
top_code = date_factors.idxmax()
selected_codes.append(top_code)
# 标记信号
if selected_codes:
signals.loc[date, selected_codes] = 1
return signals.astype(int)
def manage_positions(self, signals: pd.DataFrame) -> pd.DataFrame:
"""
仓位管理(等权分配 + 调仓控制)
Args:
signals: 信号 DataFrame
Returns:
仓位 DataFrame
"""
positions = signals.astype(float).copy()
# 跟踪上次调仓日期
last_rebalance_date = None
for date in positions.index:
signal_row = positions.loc[date].copy()
n_selected = signal_row.sum()
if n_selected == 0:
# 空仓
positions.loc[date] = 0
continue
# 检查是否需要调仓
if last_rebalance_date is not None:
# 检查持仓天数
holding_days = (date - last_rebalance_date).days
if holding_days < self.rebalance_days:
# 未达到最低持仓天数,保持上次仓位
positions.loc[date] = positions.loc[last_rebalance_date]
continue
# 等权分配
positions.loc[date] = signal_row / n_selected
last_rebalance_date = date
return positions
def _execute_backtest(self, positions: pd.DataFrame, data: Dict[str, pd.DataFrame]) -> Dict[str, any]:
"""
执行回测(使用 CrossMarketAligner 进行正确的数据对齐)
Args:
positions: 仓位 DataFrame
data: 数据字典
Returns:
回测结果字典
"""
# 获取信号→交易映射
signal_to_trade = self.config.asset_pools.get_signal_to_trade_mapping()
# 获取 A 股交易日历
print("\n [对齐] 获取 A 股交易日历...")
trading_calendar = self._get_trading_calendar()
print(f" [日历] A 股交易日: {len(trading_calendar)} 天 ({trading_calendar[0]} ~ {trading_calendar[-1]})")
# 创建对齐器
aligner = CrossMarketAligner(target_calendar=trading_calendar)
# 提取交易标的的收盘价,并对齐到 A 股日历
print(" [对齐] 对齐 ETF 价格到 A 股日历...")
close_dict = {}
for signal_code, trade_code in signal_to_trade.items():
if trade_code in data:
# 提取收盘价
close_series = data[trade_code]['close']
# 使用 signal_code 作为键(与 positions 列名一致)
close_dict[signal_code] = close_series
else:
print(f" 警告: {trade_code} 数据不存在,跳过")
# 使用 CrossMarketAligner 对齐多标的收益率
# 内部逻辑:先 ffill 价格到 A 股日历,再计算收益率
print(" [对齐] 计算收益率(先对齐价格,再计算)...")
returns_df = aligner.align_multi_asset(close_dict)
print(f" [对齐] 收益率数据: {len(returns_df)} 天, {len(returns_df.columns)} 个标的")
# 对齐 positions 到 A 股日历
positions = positions.reindex(trading_calendar, method='ffill')
# 计算策略收益仓位加权T+1 执行)
positions_delayed = positions.shift(1).fillna(0)
strategy_returns = (positions_delayed * returns_df).sum(axis=1)
# 扣除交易成本
strategy_returns, rebalance_count = self._apply_trade_cost(
strategy_returns, positions
)
print(f" [成本] 调仓次数: {rebalance_count}, 交易成本: {self.trade_cost:.2%}")
# 计算净值曲线
equity_curve = (1 + strategy_returns).cumprod()
# 检查是否有数据
if len(equity_curve) == 0:
return {
'equity_curve': equity_curve,
'strategy_returns': strategy_returns,
'positions': positions,
'metrics': {
'total_return': 0,
'annual_return': 0,
'max_drawdown': 0,
'sharpe_ratio': 0,
'n_days': 0,
'rebalance_count': 0,
}
}
# 计算绩效指标
total_return = equity_curve.iloc[-1] / equity_curve.iloc[0] - 1
n_days = len(strategy_returns)
annual_return = (1 + total_return) ** (252 / n_days) - 1 if n_days > 0 else 0
# 最大回撤
cumulative_max = equity_curve.cummax()
drawdown = (equity_curve - cumulative_max) / cumulative_max
max_drawdown = drawdown.min()
# 夏普比率
sharpe = strategy_returns.mean() / strategy_returns.std() * np.sqrt(252) if strategy_returns.std() > 0 else 0
return {
'equity_curve': equity_curve,
'strategy_returns': strategy_returns,
'positions': positions,
'metrics': {
'total_return': total_return,
'annual_return': annual_return,
'max_drawdown': max_drawdown,
'sharpe_ratio': sharpe,
'n_days': n_days,
'rebalance_count': rebalance_count,
}
}
def _apply_trade_cost(self, strategy_returns: pd.Series, positions: pd.DataFrame) -> Tuple[pd.Series, int]:
"""
扣除交易成本
Args:
strategy_returns: 策略收益率
positions: 仓位 DataFrame
Returns:
(扣除成本后的收益率, 调仓次数)
"""
if self.trade_cost <= 0:
return strategy_returns, 0
# 检测调仓(持仓变化)
position_changes = (positions != positions.shift(1)).any(axis=1)
rebalance_count = position_changes.sum()
# 扣除交易成本
strategy_returns[position_changes] -= self.trade_cost
return strategy_returns, rebalance_count
def _get_premium_data(self) -> Optional[Dict]:
"""
获取溢价率数据
Returns:
溢价率数据字典 {trade_code: {date: premium_rate}}
"""
# TODO: 从数据源获取溢价率数据
# 当前返回 None后续实现
return None
def _filter_by_premium(self, factors: pd.Series, date: pd.Timestamp, premium_data: Dict) -> pd.Series:
"""
溢价过滤
Args:
factors: 因子 Series
date: 日期
premium_data: 溢价率数据
Returns:
过滤后的因子 Series
"""
if premium_data is None:
return factors
# TODO: 实现溢价过滤逻辑
return factors
def _get_trading_calendar(self) -> pd.DatetimeIndex:
"""
获取 A 股交易日历
Returns:
A 股交易日历 DatetimeIndex
"""
from datetime import date
# 获取回测区间
start = self.config.backtest.start_date
end = self.config.backtest.end_date
if end is None:
end = date.today().strftime('%Y-%m-%d')
# 创建临时数据获取器来获取交易日历
if self._data_fetcher is None:
self._data_fetcher = self._create_data_fetcher()
try:
# 调用 get_trading_calendar 方法
calendar = self._data_fetcher.get_trading_calendar(
market='A',
start=start,
end=end
)
print(f" [日历] A 股交易日: {len(calendar)} 天 ({calendar[0]} ~ {calendar[-1]})")
return calendar
except Exception as e:
print(f" [警告] 无法获取 A 股交易日历,使用所有日期: {e}")
# 降级方案:使用 pandas 生成工作日
start_dt = pd.Timestamp(start)
end_dt = pd.Timestamp(end)
return pd.date_range(start=start_dt, end=end_dt, freq='B') # 工作日