Files
etf/framework_v2/strategies/rotation/simple.py
aszerW e6657bd2cc feat(framework_v2): 对齐 V1 配置,实现指数信号→ETF收益回测
配置对齐:
- config_simple.yaml 严格对齐 V1 config.yaml
  * 11 个标的覆盖 7 个策略分组
  * 回测区间: 2020-01-01 ~ 至今
  * 选股数量: Top-3,强制分散化
  * V3 动态阈值(短债动量参考)
  * 溢价控制启用(HK/US 10%阈值)

策略实现:
- SimpleRotationStrategy 支持 signal_source/trade_source 分离
  * get_codes() 同时获取信号和交易标的
  * compute_factors() 只使用 signal_source 计算因子
  * _execute_backtest() 使用 trade_source 计算收益
  * 支持跨市场场景(指数信号 → ETF收益)

回测验证:
- 成功运行端到端回测
- 获取 21 个标的(11 signal + 10 trade)
- 平均仓位 84.42%
- ⚠️ 已知问题: Flask API 只返回缓存数据(2026年),需修复

修复项:
- StrategyBase.run() 兼容信号矩阵(移除 'weight' 列假设)
2026-05-24 14:58:41 +08:00

257 lines
8.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
简单轮动策略
基于动量因子的 ETF 轮动策略
- 计算各标的动量得分
- 选择 Top-N 标的
- 等权分配仓位
"""
import pandas as pd
import numpy as np
from typing import Dict
from framework_v2.core.strategy import StrategyBase
from framework_v2.config.schemas import StrategyConfig
from framework_v2.shared.factors import MomentumFactor
class SimpleRotationStrategy(StrategyBase):
"""
简单轮动策略
策略逻辑:
1. 计算各标的动量得分(加权线性回归)
2. 选择得分最高的 Top-N 标的
3. 等权分配仓位
示例:
from framework_v2.config import load_config
from framework_v2.strategies.rotation.simple import SimpleRotationStrategy
config = load_config('rotation_simple.yaml')
strategy = SimpleRotationStrategy(config)
result = strategy.run()
"""
def __init__(self, config: StrategyConfig):
"""
初始化策略
Args:
config: 策略配置
"""
super().__init__(config)
# 初始化动量因子
self.momentum = MomentumFactor(
n_days=config.factor.n_days,
weighted=(config.factor.type.value == 'weighted_momentum')
)
# 策略参数
self.select_num = config.rotation.select_num if config.rotation else 3
self.min_score = config.rotation.threshold.fixed_value if config.rotation else 0.0
def get_codes(self) -> list:
"""
获取标的列表(信号标的 + 交易标的)
返回所有需要的数据标的:
- signal_source: 用于计算因子和信号
- trade_source: 用于计算收益
"""
codes = set()
# 添加所有信号标的
codes.update(self.config.asset_pools.get_signal_codes())
# 添加所有交易标的
codes.update(self.config.asset_pools.get_trade_codes())
return list(codes)
def compute_factors(self, data: Dict[str, pd.DataFrame]) -> Dict[str, pd.Series]:
"""
计算动量因子(只使用信号标的的数据)
Args:
data: 数据字典 {code: DataFrame}(包含 signal_source 和 trade_source
Returns:
因子字典 {signal_source: Series}
"""
factors = {}
# 只使用信号标的计算因子
signal_codes = self.config.asset_pools.get_signal_codes()
for code in signal_codes:
if code not in data:
print(f" 警告: {code} 数据不存在,跳过")
continue
try:
df = data[code]
# 计算动量得分(使用信号标的的数据)
factor_values = self.momentum.compute(df)
factors[code] = factor_values
except Exception as e:
print(f" 警告: {code} 因子计算失败 - {e}")
continue
return factors
def generate_signals(self, factors: Dict[str, pd.Series]) -> pd.DataFrame:
"""
生成轮动信号
逻辑:
1. 每个交易日选择动量得分最高的 Top-N 标的
2. 过滤得分低于阈值的标的
Args:
factors: 因子字典 {code: Series}
Returns:
信号 DataFrameindex=日期, columns=标的, values=1或0
"""
if not factors:
return pd.DataFrame()
# 对齐所有因子的日期
factor_df = pd.DataFrame(factors)
# 生成信号
signals = pd.DataFrame(index=factor_df.index, columns=factor_df.columns, data=0)
for date in factor_df.index:
# 获取当日因子值
scores = factor_df.loc[date].dropna()
if scores.empty:
continue
# 过滤低分标的
if self.min_score > 0:
scores = scores[scores >= self.min_score]
# 选择 Top-N
if len(scores) > self.select_num:
top_codes = scores.nlargest(self.select_num).index
else:
top_codes = scores.index
# 标记信号
signals.loc[date, top_codes] = 1
return signals.astype(int)
def manage_positions(self, signals: pd.DataFrame) -> pd.DataFrame:
"""
仓位管理(等权分配)
Args:
signals: 信号 DataFrame
Returns:
仓位 DataFrame包含 'weight' 列)
"""
positions = signals.astype(float).copy()
# 计算每个日期的权重
for date in positions.index:
signal_row = positions.loc[date]
n_selected = signal_row.sum()
if n_selected > 0:
# 等权分配
positions.loc[date] = signal_row / n_selected
else:
# 空仓
positions.loc[date] = 0
return positions
def _execute_backtest(self, positions: pd.DataFrame, data: Dict[str, pd.DataFrame]) -> Dict[str, any]:
"""
执行回测
核心逻辑:
1. 使用 signal_source 计算信号positions 的 columns 是 signal_source
2. 使用 trade_source 计算收益(通过 signal→trade 映射)
3. T+1 执行:今天的信号明天生效
Args:
positions: 仓位 DataFramecolumns=signal_source
data: 数据字典 {code: DataFrame}(包含 signal_source 和 trade_source
Returns:
回测结果字典
"""
# 获取信号→交易映射
signal_to_trade = self.config.asset_pools.get_signal_to_trade_mapping()
# 提取交易标的的收盘价
close_prices = {}
for signal_code, trade_code in signal_to_trade.items():
if trade_code in data:
# 使用交易标的的数据计算收益
close_prices[signal_code] = data[trade_code]['close']
else:
print(f" 警告: {trade_code} 数据不存在,跳过")
close_df = pd.DataFrame(close_prices)
# 计算收益率
returns = close_df.pct_change()
# 计算策略收益(仓位加权)
# 注意T+1 执行,今天的信号明天生效
positions_delayed = positions.shift(1).fillna(0)
strategy_returns = (positions_delayed * returns).sum(axis=1)
# 计算净值曲线
equity_curve = (1 + strategy_returns).cumprod()
# 检查是否有数据
if len(equity_curve) == 0:
return {
'equity_curve': equity_curve,
'strategy_returns': strategy_returns,
'positions': positions,
'metrics': {
'total_return': 0,
'annual_return': 0,
'max_drawdown': 0,
'sharpe_ratio': 0,
'n_days': 0,
}
}
# 计算绩效指标
total_return = equity_curve.iloc[-1] / equity_curve.iloc[0] - 1
n_days = len(strategy_returns)
annual_return = (1 + total_return) ** (252 / n_days) - 1 if n_days > 0 else 0
# 最大回撤
cumulative_max = equity_curve.cummax()
drawdown = (equity_curve - cumulative_max) / cumulative_max
max_drawdown = drawdown.min()
# 夏普比率
sharpe = strategy_returns.mean() / strategy_returns.std() * np.sqrt(252) if strategy_returns.std() > 0 else 0
return {
'equity_curve': equity_curve,
'strategy_returns': strategy_returns,
'positions': positions,
'metrics': {
'total_return': total_return,
'annual_return': annual_return,
'max_drawdown': max_drawdown,
'sharpe_ratio': sharpe,
'n_days': n_days,
}
}