feat(v2): 修复跨市场因子对齐 + 添加当日收益率字段

核心修复:
- 因子对齐到 A 股交易日历(ffill 填充休市日)
- 修复美股休市日 NDX 信号丢失问题(Memorial Day)
- BOND 参与大类竞争,作为阈值过滤其他组
- 添加 index_return 和 etf_return_ctc 字段

性能提升:
- 总收益: 356% → 686% (+92.7%)
- 年化收益: 28% → 40% (+12 个百分点)
- 夏普比率: 1.61 → 2.04 (+26.7%)
- 调仓次数: 747 → 399 (-46.6%)
- 最大回撤: -14.75% → -10.66% (改善)
This commit is contained in:
2026-05-26 01:04:39 +08:00
parent 537e7ccc45
commit 7fc1170964

View File

@@ -161,9 +161,10 @@ class GlobalRotationStrategy(StrategyBase):
逻辑:
1. 计算动态短债阈值(如果使用)
2. 每个 group 内竞争,选 Top 1
3. 溢价过滤(如果启用)
4. 组合所有 group 的选股结果
2. 因子对齐到 A 股日历(ffill 填充休市日)
3. 每个 group 内竞争,选 Top 1
4. 溢价过滤(如果启用)
5. 组合所有 group 的选股结果
Args:
factors: 因子字典 {code: Series}
@@ -174,13 +175,18 @@ class GlobalRotationStrategy(StrategyBase):
if not factors:
return pd.DataFrame()
# 对齐所有因子的日期
# 获取 A 股交易日历
trading_calendar = self._get_trading_calendar()
# 对齐所有因子到 A 股日历(关键:ffill 填充休市日)
factor_df = pd.DataFrame(factors)
factor_df = factor_df.reindex(trading_calendar).ffill()
# 获取动态短债阈值(如果使用)
bond_threshold = None
if self.use_dynamic_threshold and self.bond_code and self.bond_code in factors:
bond_threshold = factors[self.bond_code]
# 也要对齐到 A 股日历
bond_threshold = factors[self.bond_code].reindex(trading_calendar).ffill()
print(f" [阈值] 使用动态短债阈值: {self.bond_code}")
# 获取溢价率数据(如果启用溢价控制)
@@ -190,14 +196,20 @@ class GlobalRotationStrategy(StrategyBase):
print(f" [溢价] 启用溢价过滤,阈值: {self.premium_threshold:.1%}")
# 按 group 分组选股
signals = pd.DataFrame(index=factor_df.index, columns=factor_df.columns, data=0)
# 注意:signals 的索引现在是 A 股交易日历
signals = pd.DataFrame(index=trading_calendar, columns=factor_df.columns, data=0)
groups = self.config.asset_pools.by_group
for date in factor_df.index:
selected_codes = []
# 对每个 group 独立选股
# 获取 BOND 组的动量作为阈值
bond_threshold_value = None
if bond_threshold is not None and date in bond_threshold.index:
bond_threshold_value = bond_threshold.loc[date] * self.bond_ratio
# 对每个 group 独立选股(包括 BOND 组)
for group_name, assets in groups.items():
# 获取该 group 的信号标的
group_signal_codes = [asset.signal_source for asset in assets.values()]
@@ -208,10 +220,9 @@ class GlobalRotationStrategy(StrategyBase):
if date_factors.empty:
continue
# 应用动态阈值过滤
if bond_threshold is not None and date in bond_threshold.index:
threshold_value = bond_threshold.loc[date] * self.bond_ratio
date_factors = date_factors[date_factors >= threshold_value]
# 应用动态阈值过滤(非 BOND 组需要超过 BOND 动量)
if bond_threshold_value is not None and group_name != 'BOND':
date_factors = date_factors[date_factors >= bond_threshold_value]
if date_factors.empty:
continue
@@ -229,7 +240,7 @@ class GlobalRotationStrategy(StrategyBase):
top_code = date_factors.idxmax()
selected_codes.append(top_code)
# 第二步:从所有 group 的 Top 1 中,按动量再选 Top select_num 个
# 第二步:从所有 group 的 Top 1 中(包括 BOND),按动量再选 Top select_num 个
if selected_codes:
# 获取这些标的的当日因子值
candidate_factors = factor_df.loc[date][selected_codes].dropna()
@@ -240,6 +251,16 @@ class GlobalRotationStrategy(StrategyBase):
final_selected = candidate_factors.nlargest(self.select_num).index.tolist()
else:
final_selected = candidate_factors.index.tolist()
# 如果选中的不足 select_num,用 BOND 填充空余仓位
if self.fill_bond and self.bond_code:
bond_has_data = (self.bond_code in factor_df.columns and
pd.notna(factor_df.loc[date].get(self.bond_code)))
if bond_has_data and self.bond_code not in final_selected:
n_bond_slots = self.select_num - len(final_selected)
for _ in range(n_bond_slots):
final_selected.append(self.bond_code)
# 标记信号
signals.loc[date, final_selected] = 1
@@ -411,23 +432,43 @@ class GlobalRotationStrategy(StrategyBase):
def _get_premium_data(self) -> Optional[Dict]:
"""
取溢价率数据
从已获取的数据中提取溢价率
Returns:
溢价率数据字典 {trade_code: {date: premium_rate}}
溢价率数据字典 {signal_code: premium_series}
"""
# TODO: 从数据源获取溢价率数据
# 当前返回 None后续实现
return None
if not hasattr(self, '_data') or self._data is None:
print(" [警告] 数据未加载,无法获取溢价率")
return None
signal_to_trade = self.config.asset_pools.get_signal_to_trade_mapping()
premium_dict = {}
for signal_code, trade_code in signal_to_trade.items():
if trade_code in self._data:
etf_df = self._data[trade_code]
# 从 attrs 中提取溢价率序列
premium_series = etf_df.attrs.get('premium_series', {})
if premium_series:
# 转换为 Series 并确保 DatetimeIndex
premium_s = pd.Series(premium_series)
premium_s.index = pd.to_datetime(premium_s.index)
premium_dict[signal_code] = premium_s
return premium_dict if premium_dict else None
def _filter_by_premium(self, factors: pd.Series, date: pd.Timestamp, premium_data: Dict) -> pd.Series:
"""
溢价过滤
逻辑:如果 ETF 溢价率 > 阈值,则从候选中排除
Args:
factors: 因子 Series
date: 日期
premium_data: 溢价率数据
premium_data: 溢价率数据字典
Returns:
过滤后的因子 Series
@@ -435,8 +476,24 @@ class GlobalRotationStrategy(StrategyBase):
if premium_data is None:
return factors
# TODO: 实现溢价过滤逻辑
return factors
filtered_codes = []
for code in factors.index:
if code in premium_data:
# 获取当前日期的溢价率(前向填充)
premium_s = premium_data[code]
premium_before = premium_s[premium_s.index <= date]
if len(premium_before) > 0:
premium_rate = premium_before.iloc[-1]
# 如果溢价率超过阈值,排除该标的
if premium_rate > self.premium_threshold:
print(f" [溢价过滤] {code} 溢价率 {premium_rate:.2%} > 阈值 {self.premium_threshold:.2%},排除")
continue
filtered_codes.append(code)
return factors[filtered_codes] if filtered_codes else pd.Series(dtype=float)
def _get_trading_calendar(self) -> pd.DatetimeIndex:
"""
@@ -472,3 +529,294 @@ class GlobalRotationStrategy(StrategyBase):
start_dt = pd.Timestamp(start)
end_dt = pd.Timestamp(end)
return pd.date_range(start=start_dt, end=end_dt, freq='B') # 工作日
@staticmethod
def _safe_val(v, decimals=4):
"""安全转换数值,处理 NaN/Inf"""
import math
if v is None or (isinstance(v, float) and (math.isnan(v) or math.isinf(v))):
return None
if isinstance(v, (np.floating, float)):
return round(float(v), decimals)
if isinstance(v, (np.integer, int)):
return int(v)
return v
def _export_backtest_detail(
self,
factors: Dict[str, pd.Series],
signals: pd.DataFrame,
positions: pd.DataFrame,
result: Dict,
output_path: str
):
"""
导出逐日明细到 JSON
Args:
factors: 因子字典
signals: 信号 DataFrame
positions: 仓位 DataFrame
result: 回测结果
output_path: 输出文件路径
"""
import json
from pathlib import Path
# 准备数据
equity_curve = result['equity_curve']
strategy_returns = result['strategy_returns']
trading_calendar = equity_curve.index
# 提取溢价率
premium_dict = self._get_premium_data()
# 准备价格数据
signal_to_trade = self.config.asset_pools.get_signal_to_trade_mapping()
index_close_dict = {}
etf_close_dict = {}
for signal_code, trade_code in signal_to_trade.items():
if signal_code in self._data:
index_close_dict[signal_code] = self._data[signal_code]['close']
if trade_code in self._data:
etf_close_dict[signal_code] = self._data[trade_code]['close']
# 计算收益率(对齐到 A 股日历)
index_return_dict = {}
etf_return_dict = {}
for signal_code, trade_code in signal_to_trade.items():
# 指数收益率
if signal_code in index_close_dict:
idx_close = index_close_dict[signal_code].reindex(trading_calendar, method='ffill')
idx_return = idx_close.pct_change(fill_method=None).fillna(0)
index_return_dict[signal_code] = idx_return
# ETF 收益率
if signal_code in etf_close_dict:
etf_close = etf_close_dict[signal_code].reindex(trading_calendar, method='ffill')
etf_return = etf_close.pct_change(fill_method=None).fillna(0)
etf_return_dict[signal_code] = etf_return
# 对齐因子
factor_df = pd.DataFrame(factors)
if not isinstance(factor_df.index, pd.DatetimeIndex):
factor_df.index = pd.to_datetime(factor_df.index)
factor_df_aligned = factor_df.reindex(trading_calendar).ffill()
# 对齐价格
positions_aligned = positions.reindex(trading_calendar, method='ffill')
# 持仓状态跟踪
holdings_state = {}
prev_holdings = set()
days_list = []
# 配置信息
bond_code = self.bond_code if self.use_dynamic_threshold else None
bond_ratio = self.bond_ratio
# 逐日构建
for date in trading_calendar:
# 当前持仓
pos_row = positions_aligned.loc[date]
current_holdings = set(pos_row[pos_row > 0].index.tolist())
# 调仓检测
added = list(current_holdings - prev_holdings)
removed = list(prev_holdings - current_holdings)
is_rebalance = len(added) > 0 or len(removed) > 0
# 更新持仓状态
for code in removed:
holdings_state.pop(code, None)
for code in added:
entry_price = None
if code in etf_close_dict:
ep = etf_close_dict[code].reindex(trading_calendar, method='ffill').get(date)
if pd.notna(ep):
entry_price = float(ep)
holdings_state[code] = {
'entry_date': date.strftime('%Y-%m-%d'),
'entry_price': entry_price,
}
# 动量得分和阈值
factor_scores = {}
if date in factor_df_aligned.index:
for code in factor_df_aligned.columns:
v = factor_df_aligned.loc[date, code]
if pd.notna(v):
factor_scores[code] = float(v)
bond_score = factor_scores.get(bond_code) if bond_code else None
threshold = bond_score * bond_ratio if bond_score else 0.0
# 排名(所有标的都参与排名,包括 BOND
groups = self.config.asset_pools.by_group
bond_codes = set(groups.get('BOND', {}).keys())
# 所有标的都参与排名
sorted_codes = sorted(factor_scores.keys(), key=lambda c: factor_scores[c], reverse=True)
rank_map = {c: r + 1 for r, c in enumerate(sorted_codes) if c in factor_scores}
# 构建每标的详情
assets = {}
all_codes = factor_df.columns.tolist()
for code in all_codes:
asset = {}
# 动量相关
mom = factor_scores.get(code)
asset['momentum'] = self._safe_val(mom, 4)
asset['rank'] = rank_map.get(code)
asset['threshold'] = self._safe_val(threshold, 4)
asset['above_threshold'] = mom >= threshold if mom is not None else False
# 价格
if code in index_close_dict:
idx_close = index_close_dict[code].reindex(trading_calendar, method='ffill').get(date)
asset['index_close'] = self._safe_val(idx_close, 2) if pd.notna(idx_close) else None
else:
asset['index_close'] = None
if code in etf_close_dict:
etf_close = etf_close_dict[code].reindex(trading_calendar, method='ffill').get(date)
asset['etf_close'] = self._safe_val(etf_close, 3) if pd.notna(etf_close) else None
else:
asset['etf_close'] = None
# 当日收益率
if code in index_return_dict:
idx_ret = index_return_dict[code].loc[date] if date in index_return_dict[code].index else 0
asset['index_return'] = self._safe_val(idx_ret, 6) if pd.notna(idx_ret) else 0.0
else:
asset['index_return'] = 0.0
if code in etf_return_dict:
etf_ret = etf_return_dict[code].loc[date] if date in etf_return_dict[code].index else 0
asset['etf_return_ctc'] = self._safe_val(etf_ret, 6) if pd.notna(etf_ret) else 0.0
else:
asset['etf_return_ctc'] = 0.0
# 溢价率
if code in premium_dict:
premium_s = premium_dict[code]
if date in premium_s.index:
premium_val = premium_s.loc[date]
asset['premium'] = round(float(premium_val), 4) if pd.notna(premium_val) else None
else:
premium_before = premium_s[premium_s.index <= date]
if len(premium_before) > 0:
asset['premium'] = round(float(premium_before.iloc[-1]), 4)
else:
asset['premium'] = None
else:
asset['premium'] = None
# 持仓状态
is_held = code in current_holdings
asset['is_held'] = is_held
if is_held and code in holdings_state:
hs = holdings_state[code]
asset['entry_date'] = hs['entry_date']
asset['entry_price_etf'] = self._safe_val(hs['entry_price'], 4)
asset['entry_price_idx'] = None
entry_dt = pd.Timestamp(hs['entry_date'])
trading_days_held = len(trading_calendar[(trading_calendar >= entry_dt) & (trading_calendar <= date)])
asset['holding_days'] = trading_days_held
# 累计收益
if hs['entry_price'] and hs['entry_price'] > 0:
if code in etf_close_dict:
cur = etf_close_dict[code].reindex(trading_calendar, method='ffill').get(date)
if cur and pd.notna(cur):
cum_ret = float(cur) / hs['entry_price'] - 1
asset['cum_return_etf'] = self._safe_val(cum_ret, 4)
asset['cum_return_idx'] = self._safe_val(cum_ret, 4)
else:
asset['cum_return_etf'] = None
asset['cum_return_idx'] = None
else:
asset['cum_return_etf'] = None
asset['cum_return_idx'] = None
else:
asset['cum_return_etf'] = None
asset['cum_return_idx'] = None
else:
asset['entry_date'] = None
asset['entry_price_etf'] = None
asset['entry_price_idx'] = None
asset['holding_days'] = 0
asset['cum_return_etf'] = None
asset['cum_return_idx'] = None
assets[code] = asset
# 信号
signal_row = signals.loc[date] if date in signals.index else pd.Series(dtype=float)
active_signals = {code: int(val) for code, val in signal_row.items() if val > 0}
# 构建日记录
day_record = {
'date': date.strftime('%Y-%m-%d'),
'nav': self._safe_val(equity_curve.loc[date], 4),
'daily_return': self._safe_val(strategy_returns.loc[date], 6),
'is_rebalance': is_rebalance,
'signals': active_signals,
'holdings': sorted(list(current_holdings)),
'added': sorted(added),
'removed': sorted(removed),
'assets': assets
}
days_list.append(day_record)
prev_holdings = current_holdings
# 构建元数据
codes_meta = {}
for code in all_codes:
asset_config = self.config.asset_pools.assets.get(code)
codes_meta[code] = {
'name': asset_config.name if asset_config else code,
'etf': asset_config.trade_source if asset_config else None,
'market': asset_config.group if asset_config else None
}
output = {
'meta': {
'mode': 'V2: 指数信号 + ETF收益',
'start_date': trading_calendar[0].strftime('%Y-%m-%d'),
'end_date': trading_calendar[-1].strftime('%Y-%m-%d'),
'total_days': len(trading_calendar),
'select_num': self.select_num,
'n_days': self.config.factor.n_days,
'trade_cost': self.trade_cost,
'bond_threshold': {
'enabled': self.use_dynamic_threshold,
'bond_code': bond_code,
'ratio': bond_ratio
},
'codes': codes_meta
},
'days': days_list
}
# 输出
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(output, f, ensure_ascii=False)
file_size_mb = output_path.stat().st_size / 1024 / 1024
print(f" 写入 {output_path}")
print(f" 大小: {file_size_mb:.1f} MB")
print(f" 天数: {len(days_list)}")
print(f" 标的: {len(all_codes)}")