- 修复分散化选股逻辑:每个 group 选 Top 1 后,需要再按动量排序选 Top select_num - 之前:所有 group 的 Top 1 都标记为信号(忽略 select_num) - 现在:先从每个 group 选 Top 1,再从中按动量选 Top select_num 个 - 影响:配置 select_num=3 时,实际持仓 3 只而不是 4 只(group 数量)
473 lines
17 KiB
Python
473 lines
17 KiB
Python
"""
|
||
全球资产大类轮动策略(V2 正式版)
|
||
|
||
基于动量因子的全球资产轮动策略
|
||
- 支持信号-交易分离(指数信号 → ETF收益)
|
||
- 强制分散化选股(每个 group 只选 1 个)
|
||
- 动态短债阈值(标的动量 < 短债动量 → 不持有)
|
||
- 溢价过滤(避免买入高溢价 ETF)
|
||
- 调仓控制(rebalance_days + rebalance_threshold)
|
||
- 交易成本计算(trade_cost: 0.1%)
|
||
"""
|
||
|
||
import pandas as pd
|
||
import numpy as np
|
||
from typing import Dict, Optional, Tuple
|
||
from datetime import datetime, timedelta
|
||
|
||
from framework_v2.core.strategy import StrategyBase
|
||
from framework_v2.config.schemas import StrategyConfig
|
||
from framework_v2.shared.factors import MomentumFactor
|
||
from framework_v2.shared.data.alignment import CrossMarketAligner
|
||
|
||
|
||
class GlobalRotationStrategy(StrategyBase):
|
||
"""
|
||
全球资产大类轮动策略(V2 正式版)
|
||
|
||
策略逻辑:
|
||
1. 计算各指数标的动量得分(加权线性回归)
|
||
2. 使用动态短债阈值过滤负动量标的
|
||
3. 每个 group 内竞争,只选 Top 1(强制分散化)
|
||
4. 溢价过滤:排除溢价率 > 阈值的 ETF
|
||
5. 调仓控制:最低持仓天数 + 调仓阈值
|
||
6. 等权分配仓位
|
||
7. 扣除交易成本(0.1%)
|
||
|
||
示例:
|
||
from framework_v2.config import load_config
|
||
from framework_v2.strategies.rotation.rotation import GlobalRotationStrategy
|
||
|
||
config = load_config('config_simple.yaml')
|
||
strategy = GlobalRotationStrategy(config)
|
||
result = strategy.run()
|
||
"""
|
||
|
||
def __init__(self, config: StrategyConfig):
|
||
"""
|
||
初始化策略
|
||
|
||
Args:
|
||
config: 策略配置
|
||
"""
|
||
super().__init__(config)
|
||
|
||
# 初始化动量因子
|
||
self.momentum = MomentumFactor(
|
||
n_days=config.factor.n_days,
|
||
weighted=(config.factor.type.value == 'weighted_momentum')
|
||
)
|
||
|
||
# 策略参数(从 config 中读取)
|
||
rotation_config = config.rotation
|
||
self.select_num = rotation_config.select_num if rotation_config else 3
|
||
self.diversified = rotation_config.diversified if rotation_config else True
|
||
|
||
# 动态阈值配置
|
||
self.use_dynamic_threshold = False
|
||
self.bond_code = None
|
||
self.bond_ratio = 1.0
|
||
self.fill_bond = True
|
||
|
||
if rotation_config and rotation_config.threshold:
|
||
threshold_config = rotation_config.threshold
|
||
if hasattr(threshold_config, 'mode') and threshold_config.mode == 'dynamic':
|
||
self.use_dynamic_threshold = True
|
||
dynamic_config = threshold_config.dynamic
|
||
self.bond_code = dynamic_config.reference
|
||
self.bond_ratio = dynamic_config.ratio
|
||
|
||
# 调仓控制
|
||
self.rebalance_days = getattr(rotation_config, 'rebalance_days', 1) if rotation_config else 1
|
||
self.rebalance_threshold = getattr(rotation_config, 'rebalance_threshold', 0.0) if rotation_config else 0.0
|
||
|
||
# 交易成本
|
||
self.trade_cost = getattr(config.backtest, 'trade_cost', 0.001) if config.backtest else 0.001
|
||
|
||
# 溢价控制
|
||
self.use_premium_control = False
|
||
self.premium_threshold = 0.10 # 默认 10%
|
||
|
||
if hasattr(config, 'premium_control'):
|
||
premium_config = config.premium_control
|
||
self.use_premium_control = getattr(premium_config, 'enabled', False)
|
||
if self.use_premium_control:
|
||
self.premium_threshold = getattr(premium_config, 'default_threshold', 0.10)
|
||
|
||
def get_codes(self) -> list:
|
||
"""
|
||
获取标的列表(信号标的 + 交易标的 + 短债)
|
||
|
||
Returns:
|
||
标的代码列表
|
||
"""
|
||
codes = set()
|
||
|
||
# 添加所有信号标的
|
||
codes.update(self.config.asset_pools.get_signal_codes())
|
||
|
||
# 添加所有交易标的
|
||
codes.update(self.config.asset_pools.get_trade_codes())
|
||
|
||
# 如果使用动态阈值,添加短债标的
|
||
if self.use_dynamic_threshold and self.bond_code:
|
||
codes.add(self.bond_code)
|
||
|
||
return list(codes)
|
||
|
||
def compute_factors(self, data: Dict[str, pd.DataFrame]) -> Dict[str, pd.Series]:
|
||
"""
|
||
计算动量因子(只使用信号标的的数据)
|
||
|
||
Args:
|
||
data: 数据字典 {code: DataFrame}
|
||
|
||
Returns:
|
||
因子字典 {signal_source: Series}
|
||
"""
|
||
factors = {}
|
||
|
||
# 只使用信号标的计算因子
|
||
signal_codes = self.config.asset_pools.get_signal_codes()
|
||
|
||
for code in signal_codes:
|
||
if code not in data:
|
||
print(f" 警告: {code} 数据不存在,跳过")
|
||
continue
|
||
|
||
try:
|
||
df = data[code]
|
||
factor_values = self.momentum.compute(df)
|
||
factors[code] = factor_values
|
||
except Exception as e:
|
||
print(f" 警告: {code} 因子计算失败 - {e}")
|
||
continue
|
||
|
||
# 如果使用动态阈值,计算短债因子
|
||
if self.use_dynamic_threshold and self.bond_code and self.bond_code in data:
|
||
try:
|
||
df = data[self.bond_code]
|
||
bond_factor = self.momentum.compute(df)
|
||
factors[self.bond_code] = bond_factor
|
||
print(f" [阈值] 短债动量因子已计算: {self.bond_code}")
|
||
except Exception as e:
|
||
print(f" 警告: 短债因子计算失败 - {e}")
|
||
|
||
return factors
|
||
|
||
def generate_signals(self, factors: Dict[str, pd.Series]) -> pd.DataFrame:
|
||
"""
|
||
生成轮动信号(支持动态阈值和强制分散化)
|
||
|
||
逻辑:
|
||
1. 计算动态短债阈值(如果使用)
|
||
2. 每个 group 内竞争,选 Top 1
|
||
3. 溢价过滤(如果启用)
|
||
4. 组合所有 group 的选股结果
|
||
|
||
Args:
|
||
factors: 因子字典 {code: Series}
|
||
|
||
Returns:
|
||
信号 DataFrame(index=日期, columns=signal_source, values=1或0)
|
||
"""
|
||
if not factors:
|
||
return pd.DataFrame()
|
||
|
||
# 对齐所有因子的日期
|
||
factor_df = pd.DataFrame(factors)
|
||
|
||
# 获取动态短债阈值(如果使用)
|
||
bond_threshold = None
|
||
if self.use_dynamic_threshold and self.bond_code and self.bond_code in factors:
|
||
bond_threshold = factors[self.bond_code]
|
||
print(f" [阈值] 使用动态短债阈值: {self.bond_code}")
|
||
|
||
# 获取溢价率数据(如果启用溢价控制)
|
||
premium_data = None
|
||
if self.use_premium_control:
|
||
premium_data = self._get_premium_data()
|
||
print(f" [溢价] 启用溢价过滤,阈值: {self.premium_threshold:.1%}")
|
||
|
||
# 按 group 分组选股
|
||
signals = pd.DataFrame(index=factor_df.index, columns=factor_df.columns, data=0)
|
||
|
||
groups = self.config.asset_pools.by_group
|
||
|
||
for date in factor_df.index:
|
||
selected_codes = []
|
||
|
||
# 对每个 group 独立选股
|
||
for group_name, assets in groups.items():
|
||
# 获取该 group 的信号标的
|
||
group_signal_codes = [asset.signal_source for asset in assets.values()]
|
||
|
||
# 获取当日因子值
|
||
date_factors = factor_df.loc[date][group_signal_codes].dropna()
|
||
|
||
if date_factors.empty:
|
||
continue
|
||
|
||
# 应用动态阈值过滤
|
||
if bond_threshold is not None and date in bond_threshold.index:
|
||
threshold_value = bond_threshold.loc[date] * self.bond_ratio
|
||
date_factors = date_factors[date_factors >= threshold_value]
|
||
|
||
if date_factors.empty:
|
||
continue
|
||
|
||
# 应用溢价过滤
|
||
if premium_data is not None:
|
||
date_factors = self._filter_by_premium(
|
||
date_factors, date, premium_data
|
||
)
|
||
|
||
if date_factors.empty:
|
||
continue
|
||
|
||
# 选择 Top 1(强制分散化)
|
||
top_code = date_factors.idxmax()
|
||
selected_codes.append(top_code)
|
||
|
||
# 第二步:从所有 group 的 Top 1 中,按动量再选 Top select_num 个
|
||
if selected_codes:
|
||
# 获取这些标的的当日因子值
|
||
candidate_factors = factor_df.loc[date][selected_codes].dropna()
|
||
|
||
if not candidate_factors.empty:
|
||
# 按动量排序,选 Top select_num
|
||
if len(candidate_factors) > self.select_num:
|
||
final_selected = candidate_factors.nlargest(self.select_num).index.tolist()
|
||
else:
|
||
final_selected = candidate_factors.index.tolist()
|
||
|
||
# 标记信号
|
||
signals.loc[date, final_selected] = 1
|
||
|
||
return signals.astype(int)
|
||
|
||
def manage_positions(self, signals: pd.DataFrame) -> pd.DataFrame:
|
||
"""
|
||
仓位管理(等权分配 + 调仓控制)
|
||
|
||
Args:
|
||
signals: 信号 DataFrame
|
||
|
||
Returns:
|
||
仓位 DataFrame
|
||
"""
|
||
positions = signals.astype(float).copy()
|
||
|
||
# 跟踪上次调仓日期
|
||
last_rebalance_date = None
|
||
|
||
for date in positions.index:
|
||
signal_row = positions.loc[date].copy()
|
||
n_selected = signal_row.sum()
|
||
|
||
if n_selected == 0:
|
||
# 空仓
|
||
positions.loc[date] = 0
|
||
continue
|
||
|
||
# 检查是否需要调仓
|
||
if last_rebalance_date is not None:
|
||
# 检查持仓天数
|
||
holding_days = (date - last_rebalance_date).days
|
||
if holding_days < self.rebalance_days:
|
||
# 未达到最低持仓天数,保持上次仓位
|
||
positions.loc[date] = positions.loc[last_rebalance_date]
|
||
continue
|
||
|
||
# 等权分配
|
||
positions.loc[date] = signal_row / n_selected
|
||
last_rebalance_date = date
|
||
|
||
return positions
|
||
|
||
def _execute_backtest(self, positions: pd.DataFrame, data: Dict[str, pd.DataFrame]) -> Dict[str, any]:
|
||
"""
|
||
执行回测(使用 CrossMarketAligner 进行正确的数据对齐)
|
||
|
||
Args:
|
||
positions: 仓位 DataFrame
|
||
data: 数据字典
|
||
|
||
Returns:
|
||
回测结果字典
|
||
"""
|
||
# 获取信号→交易映射
|
||
signal_to_trade = self.config.asset_pools.get_signal_to_trade_mapping()
|
||
|
||
# 获取 A 股交易日历
|
||
print("\n [对齐] 获取 A 股交易日历...")
|
||
trading_calendar = self._get_trading_calendar()
|
||
print(f" [日历] A 股交易日: {len(trading_calendar)} 天 ({trading_calendar[0]} ~ {trading_calendar[-1]})")
|
||
|
||
# 创建对齐器
|
||
aligner = CrossMarketAligner(target_calendar=trading_calendar)
|
||
|
||
# 提取交易标的的收盘价,并对齐到 A 股日历
|
||
print(" [对齐] 对齐 ETF 价格到 A 股日历...")
|
||
close_dict = {}
|
||
for signal_code, trade_code in signal_to_trade.items():
|
||
if trade_code in data:
|
||
# 提取收盘价
|
||
close_series = data[trade_code]['close']
|
||
# 使用 signal_code 作为键(与 positions 列名一致)
|
||
close_dict[signal_code] = close_series
|
||
else:
|
||
print(f" 警告: {trade_code} 数据不存在,跳过")
|
||
|
||
# 使用 CrossMarketAligner 对齐多标的收益率
|
||
# 内部逻辑:先 ffill 价格到 A 股日历,再计算收益率
|
||
print(" [对齐] 计算收益率(先对齐价格,再计算)...")
|
||
returns_df = aligner.align_multi_asset(close_dict)
|
||
print(f" [对齐] 收益率数据: {len(returns_df)} 天, {len(returns_df.columns)} 个标的")
|
||
|
||
# 对齐 positions 到 A 股日历
|
||
positions = positions.reindex(trading_calendar, method='ffill')
|
||
|
||
# 计算策略收益(仓位加权,T+1 执行)
|
||
positions_delayed = positions.shift(1).fillna(0)
|
||
strategy_returns = (positions_delayed * returns_df).sum(axis=1)
|
||
|
||
# 扣除交易成本
|
||
strategy_returns, rebalance_count = self._apply_trade_cost(
|
||
strategy_returns, positions
|
||
)
|
||
print(f" [成本] 调仓次数: {rebalance_count}, 交易成本: {self.trade_cost:.2%}")
|
||
|
||
# 计算净值曲线
|
||
equity_curve = (1 + strategy_returns).cumprod()
|
||
|
||
# 检查是否有数据
|
||
if len(equity_curve) == 0:
|
||
return {
|
||
'equity_curve': equity_curve,
|
||
'strategy_returns': strategy_returns,
|
||
'positions': positions,
|
||
'metrics': {
|
||
'total_return': 0,
|
||
'annual_return': 0,
|
||
'max_drawdown': 0,
|
||
'sharpe_ratio': 0,
|
||
'n_days': 0,
|
||
'rebalance_count': 0,
|
||
}
|
||
}
|
||
|
||
# 计算绩效指标
|
||
total_return = equity_curve.iloc[-1] / equity_curve.iloc[0] - 1
|
||
n_days = len(strategy_returns)
|
||
annual_return = (1 + total_return) ** (252 / n_days) - 1 if n_days > 0 else 0
|
||
|
||
# 最大回撤
|
||
cumulative_max = equity_curve.cummax()
|
||
drawdown = (equity_curve - cumulative_max) / cumulative_max
|
||
max_drawdown = drawdown.min()
|
||
|
||
# 夏普比率
|
||
sharpe = strategy_returns.mean() / strategy_returns.std() * np.sqrt(252) if strategy_returns.std() > 0 else 0
|
||
|
||
return {
|
||
'equity_curve': equity_curve,
|
||
'strategy_returns': strategy_returns,
|
||
'positions': positions,
|
||
'metrics': {
|
||
'total_return': total_return,
|
||
'annual_return': annual_return,
|
||
'max_drawdown': max_drawdown,
|
||
'sharpe_ratio': sharpe,
|
||
'n_days': n_days,
|
||
'rebalance_count': rebalance_count,
|
||
}
|
||
}
|
||
|
||
def _apply_trade_cost(self, strategy_returns: pd.Series, positions: pd.DataFrame) -> Tuple[pd.Series, int]:
|
||
"""
|
||
扣除交易成本
|
||
|
||
Args:
|
||
strategy_returns: 策略收益率
|
||
positions: 仓位 DataFrame
|
||
|
||
Returns:
|
||
(扣除成本后的收益率, 调仓次数)
|
||
"""
|
||
if self.trade_cost <= 0:
|
||
return strategy_returns, 0
|
||
|
||
# 检测调仓(持仓变化)
|
||
position_changes = (positions != positions.shift(1)).any(axis=1)
|
||
rebalance_count = position_changes.sum()
|
||
|
||
# 扣除交易成本
|
||
strategy_returns[position_changes] -= self.trade_cost
|
||
|
||
return strategy_returns, rebalance_count
|
||
|
||
def _get_premium_data(self) -> Optional[Dict]:
|
||
"""
|
||
获取溢价率数据
|
||
|
||
Returns:
|
||
溢价率数据字典 {trade_code: {date: premium_rate}}
|
||
"""
|
||
# TODO: 从数据源获取溢价率数据
|
||
# 当前返回 None,后续实现
|
||
return None
|
||
|
||
def _filter_by_premium(self, factors: pd.Series, date: pd.Timestamp, premium_data: Dict) -> pd.Series:
|
||
"""
|
||
溢价过滤
|
||
|
||
Args:
|
||
factors: 因子 Series
|
||
date: 日期
|
||
premium_data: 溢价率数据
|
||
|
||
Returns:
|
||
过滤后的因子 Series
|
||
"""
|
||
if premium_data is None:
|
||
return factors
|
||
|
||
# TODO: 实现溢价过滤逻辑
|
||
return factors
|
||
|
||
def _get_trading_calendar(self) -> pd.DatetimeIndex:
|
||
"""
|
||
获取 A 股交易日历
|
||
|
||
Returns:
|
||
A 股交易日历 DatetimeIndex
|
||
"""
|
||
from datetime import date
|
||
|
||
# 获取回测区间
|
||
start = self.config.backtest.start_date
|
||
end = self.config.backtest.end_date
|
||
if end is None:
|
||
end = date.today().strftime('%Y-%m-%d')
|
||
|
||
# 创建临时数据获取器来获取交易日历
|
||
if self._data_fetcher is None:
|
||
self._data_fetcher = self._create_data_fetcher()
|
||
|
||
try:
|
||
# 调用 get_trading_calendar 方法
|
||
calendar = self._data_fetcher.get_trading_calendar(
|
||
market='A',
|
||
start=start,
|
||
end=end
|
||
)
|
||
print(f" [日历] A 股交易日: {len(calendar)} 天 ({calendar[0]} ~ {calendar[-1]})")
|
||
return calendar
|
||
except Exception as e:
|
||
print(f" [警告] 无法获取 A 股交易日历,使用所有日期: {e}")
|
||
# 降级方案:使用 pandas 生成工作日
|
||
start_dt = pd.Timestamp(start)
|
||
end_dt = pd.Timestamp(end)
|
||
return pd.date_range(start=start_dt, end=end_dt, freq='B') # 工作日
|