Files
etf/strategies/shared/signals/selectors.py
aszerW acec96539b fix(strategy): 添加负动量空仓机制避免持仓惯性亏损
当所有候选标的动量得分低于min_score时,策略自动清仓而非继续持有之前的负动量组合。

修复问题:
- 旧逻辑:target为空时继续持有current_held(负动量标的)
- 新逻辑:target为空时清仓(current_held='')

回测效果:
- 累计收益从11872%提升至14580%(+2708%)
- 最大回撤从-71.9%改善至-61.1%(+10.9%)
- 2000年互联网泡沫期间空仓77天(24.8%)
- 2001年空仓31天,收益从-48.7%改善至-41.1%
2026-05-16 11:42:36 +08:00

307 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
定制信号生成器实现
这些信号生成器继承framework.core.signals.SignalGenerator
"""
from framework.signals import SignalGenerator
import pandas as pd
import numpy as np
from typing import Dict, List, Optional, Any
class TopNSelector(SignalGenerator):
"""
Top N选股器定制实现
用于轮动策略:
- 按因子值排序选出Top N标的
- 支持分组选股(先类内竞争,再跨类排序)
- 支持调仓阈值检查(新组合得分需超过当前组合一定比例才调仓)
参数:
- select_num: 选中数量默认3
- group_by: 分组键名(可选,如'market'
- group_mapping: 分组映射字典(可选,{code: group}
- top_per_group: 每组选中数量默认1
- min_score: 最小得分阈值可选如0表示过滤负分
- rebalance_threshold: 调仓阈值可选新组合得分需超过当前组合X%才调仓)
- rebalance_days: 最低调仓周期可选持仓至少N天才能调仓
"""
mode = "top_n"
def __init__(
self,
select_num: int = 3,
group_by: Optional[str] = None,
group_mapping: Optional[Dict[str, str]] = None,
top_per_group: int = 1,
min_score: Optional[float] = None,
rebalance_threshold: float = 0.0,
rebalance_days: int = 1
):
super().__init__(
select_num=select_num,
group_by=group_by,
group_mapping=group_mapping,
top_per_group=top_per_group,
min_score=min_score,
rebalance_threshold=rebalance_threshold,
rebalance_days=rebalance_days
)
self.select_num = select_num
self.group_by = group_by
self.group_mapping = group_mapping or {}
self.top_per_group = top_per_group
self.min_score = min_score
self.rebalance_threshold = rebalance_threshold
self.rebalance_days = rebalance_days
def generate(self, factor_data: pd.DataFrame) -> pd.DataFrame:
"""生成Top N选股信号支持调仓周期控制"""
result = pd.DataFrame(index=factor_data.index)
factor_cols = self._get_factor_columns(factor_data)
if not factor_cols:
result['signal'] = ''
return result
# Step 1: 每日目标组合(不考虑调仓周期)
daily_target = []
for date in factor_data.index:
row = factor_data.loc[date]
# 提取得分
scores = {}
for col in factor_cols:
score = row[col]
if pd.notna(score):
scores[col] = score
# 最小得分过滤(如过滤负分)
if self.min_score is not None:
scores = {k: v for k, v in scores.items() if v >= self.min_score}
# 分组选股或全局选股
if self.group_mapping:
selected = self._grouped_selection(scores)
else:
selected = self._global_top_n(scores)
daily_target.append(','.join(selected) if selected else '')
# Step 2: 逐日生成信号(调仓周期控制)
signals = self._apply_rebalance_control(daily_target, factor_data)
result['signal_raw'] = daily_target # 每日目标组合
result['signal'] = signals
# T+1执行信号向后移位1天
result['signal'] = result['signal'].shift(1)
return result
def _get_factor_columns(self, data: pd.DataFrame) -> List[str]:
"""获取因子列名"""
exclude_cols = ['signal', 'signal_raw', 'group_info', 'combined', 'open', 'high', 'low', 'close', 'volume']
return [col for col in data.columns if col not in exclude_cols and not col.endswith('_weighted')]
def _global_top_n(self, scores: Dict[str, float]) -> List[str]:
"""全局Top N选股"""
if not scores:
return []
sorted_items = sorted(scores.items(), key=lambda x: x[1], reverse=True)
return [item[0] for item in sorted_items[:self.select_num]]
def _apply_rebalance_control(self, daily_target: List[str], factor_data: pd.DataFrame) -> List[str]:
"""应用调仓周期控制"""
signals = []
current_held = None
last_rebalance_idx = 0
for i, target in enumerate(daily_target):
# 初始持仓为空,等待第一个有效信号
if current_held is None:
if not target:
signals.append('')
continue
current_held = target
last_rebalance_idx = i
signals.append(current_held)
continue
# 检查调仓周期
days_since = i - last_rebalance_idx
if days_since < self.rebalance_days:
# 未达到最低调仓周期,保持当前持仓
signals.append(current_held)
continue
# 检查是否应该调仓
if target: # 目标信号有效
should = self._check_rebalance(
factor_data.iloc[i],
current_held,
target,
self._get_factor_columns(factor_data)
)
if should:
current_held = target
last_rebalance_idx = i
else:
# 目标信号为空所有标的动量得分低于min_score清仓
# 不继续持有负动量标的,转为空仓
current_held = ''
last_rebalance_idx = i
signals.append(current_held)
return signals
def _check_rebalance(
self,
row: pd.Series,
current_held: str,
target: str,
factor_cols: List[str]
) -> bool:
"""检查是否应该调仓(得分阈值检查)"""
# 提取当前持仓和目标持仓的代码
old_codes = [c for c in current_held.split(',') if c]
new_codes = [c for c in target.split(',') if c]
if not new_codes or not old_codes:
return True
if set(new_codes) == set(old_codes):
return False # 组合完全相同,不调仓
# 计算新旧组合的总得分
old_total = sum(float(row.get(col, 0)) for col in factor_cols if col in old_codes)
new_total = sum(float(row.get(col, 0)) for col in factor_cols if col in new_codes)
# 新组合得分需超过当前组合一定比例才调仓
# 即使 threshold=0也要确保 new_total >= old_total
if old_total > 0:
return (new_total / old_total - 1) >= self.rebalance_threshold
return new_total > 0
def _grouped_selection(self, scores: Dict[str, float]) -> List[str]:
"""分组选股先类内竞争每大类选Top1再跨类排序"""
if not scores:
return []
# 建立 group -> (code, score) 的映射
group_champions = {}
for code, score in scores.items():
# 从group_mapping获取分组
group = self.group_mapping.get(code, 'default')
if group not in group_champions or score > group_champions[group][1]:
group_champions[group] = (code, score)
# 对各大类的冠军进行排序选出Top N
sorted_champions = sorted(group_champions.values(), key=lambda x: x[1], reverse=True)
return [code for code, score in sorted_champions[:self.select_num]]
class TrendFollower(SignalGenerator):
"""趋势跟随器(定制实现)"""
mode = "trend"
def __init__(self, entry_threshold: float = 0.02, exit_threshold: float = -0.02, select_num: int = 1):
super().__init__(entry_threshold=entry_threshold, exit_threshold=exit_threshold, select_num=select_num)
self.entry_threshold = entry_threshold
self.exit_threshold = exit_threshold
self.select_num = select_num
def generate(self, factor_data: pd.DataFrame) -> pd.DataFrame:
"""生成趋势跟随信号"""
result = pd.DataFrame(index=factor_data.index)
factor_cols = self._get_factor_columns(factor_data)
for col in factor_cols:
trend_strength = factor_data[col]
result[f'{col}_entry'] = trend_strength > self.entry_threshold
result[f'{col}_exit'] = trend_strength < self.exit_threshold
signals = []
for date in result.index:
entry_signals = []
for col in factor_cols:
if result.loc[date, f'{col}_entry']:
score = factor_data.loc[date, col]
if pd.notna(score):
entry_signals.append((col, score))
entry_signals.sort(key=lambda x: x[1], reverse=True)
selected = [item[0] for item in entry_signals[:self.select_num]]
signals.append(','.join(selected) if selected else '')
result['signal'] = signals
result['signal'] = result['signal'].shift(1)
return result
def _get_factor_columns(self, data: pd.DataFrame) -> List[str]:
"""获取因子列名"""
exclude_cols = ['signal', 'signal_raw', 'combined', 'open', 'high', 'low', 'close', 'volume']
return [col for col in data.columns if col not in exclude_cols and not col.endswith('_weighted')]
class ReversalTrader(SignalGenerator):
"""反转交易器(定制实现)"""
mode = "reversal"
def __init__(self, overbought: float = 70, oversold: float = 30, reversal_threshold: float = 0.1):
super().__init__(overbought=overbought, oversold=oversold, reversal_threshold=reversal_threshold)
self.overbought = overbought
self.oversold = oversold
self.reversal_threshold = reversal_threshold
def generate(self, factor_data: pd.DataFrame) -> pd.DataFrame:
"""生成反转交易信号"""
result = pd.DataFrame(index=factor_data.index)
factor_cols = self._get_factor_columns(factor_data)
for col in factor_cols:
reversal_signal = factor_data[col]
result[f'{col}_buy'] = reversal_signal > self.reversal_threshold
result[f'{col}_sell'] = reversal_signal < -self.reversal_threshold
signals = []
for date in result.index:
buy_signals = []
sell_signals = []
for col in factor_cols:
if result.loc[date, f'{col}_buy']:
buy_signals.append(col)
if result.loc[date, f'{col}_sell']:
sell_signals.append(col)
if buy_signals:
signals.append(f"BUY:{','.join(buy_signals)}")
elif sell_signals:
signals.append(f"SELL:{','.join(sell_signals)}")
else:
signals.append('')
result['signal'] = signals
result['signal'] = result['signal'].shift(1)
return result
def _get_factor_columns(self, data: pd.DataFrame) -> List[str]:
"""获取因子列名"""
exclude_cols = ['signal', 'signal_raw', 'combined', 'open', 'high', 'low', 'close', 'volume']
return [col for col in data.columns if col not in exclude_cols and not col.endswith('_weighted')]