Files
etf/strategies/shared/signals/selectors.py
aszerW 98597aa369 feat(selector): 短债作为动态现金底仓
设计理念变更:
- 短债(BOND大类)永远参与持仓,不受排名过滤影响
- 其他大类冠军需要排名 <= select_num才能入选
- 当其他标动量都不强时,仓位自动集中到短债(现金)
- 永远不会真正空仓,短债作为现金底仓始终存在

测试验证:
- 正常情况:[NDX, 399006.SZ, 931862.CSI](2动量强+1现金)
- 极端情况:[931862.CSI](仓位集中到现金)
2026-05-18 22:42:08 +08:00

349 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
定制信号生成器实现
这些信号生成器继承framework.core.signals.SignalGenerator
"""
from framework.signals import SignalGenerator
import pandas as pd
import numpy as np
from typing import Dict, List, Optional, Any
class TopNSelector(SignalGenerator):
"""
Top N选股器定制实现
用于轮动策略:
- 按因子值排序选出Top N标的
- 支持分组选股(先类内竞争,再跨类排序)
- 支持调仓阈值检查(新组合得分需超过当前组合一定比例才调仓)
参数:
- select_num: 选中数量默认3
- group_by: 分组键名(可选,如'market'
- group_mapping: 分组映射字典(可选,{code: group}
- top_per_group: 每组选中数量默认1
- min_score: 最小得分阈值可选如0表示过滤负分
- rebalance_threshold: 调仓阈值可选新组合得分需超过当前组合X%才调仓)
- rebalance_days: 最低调仓周期可选持仓至少N天才能调仓
"""
mode = "top_n"
def __init__(
self,
select_num: int = 3,
group_by: Optional[str] = None,
group_mapping: Optional[Dict[str, str]] = None,
top_per_group: int = 1,
min_score: Optional[float] = None,
rebalance_threshold: float = 0.0,
rebalance_days: int = 1
):
super().__init__(
select_num=select_num,
group_by=group_by,
group_mapping=group_mapping,
top_per_group=top_per_group,
min_score=min_score,
rebalance_threshold=rebalance_threshold,
rebalance_days=rebalance_days
)
self.select_num = select_num
self.group_by = group_by
self.group_mapping = group_mapping or {}
self.top_per_group = top_per_group
self.min_score = min_score
self.rebalance_threshold = rebalance_threshold
self.rebalance_days = rebalance_days
def generate(self, factor_data: pd.DataFrame) -> pd.DataFrame:
"""生成Top N选股信号支持调仓周期控制"""
result = pd.DataFrame(index=factor_data.index)
factor_cols = self._get_factor_columns(factor_data)
if not factor_cols:
result['signal'] = ''
return result
# Step 1: 每日目标组合(不考虑调仓周期)
daily_target = []
for date in factor_data.index:
row = factor_data.loc[date]
# 提取得分
scores = {}
for col in factor_cols:
score = row[col]
if pd.notna(score):
scores[col] = score
# 最小得分过滤(如过滤负分)
if self.min_score is not None:
scores = {k: v for k, v in scores.items() if v >= self.min_score}
# 分组选股或全局选股
if self.group_mapping:
selected = self._grouped_selection(scores)
else:
selected = self._global_top_n(scores)
daily_target.append(','.join(selected) if selected else '')
# Step 2: 逐日生成信号(调仓周期控制)
signals = self._apply_rebalance_control(daily_target, factor_data)
result['signal_raw'] = daily_target # 每日目标组合
result['signal'] = signals
# T+1执行信号向后移位1天
result['signal'] = result['signal'].shift(1)
return result
def _get_factor_columns(self, data: pd.DataFrame) -> List[str]:
"""获取因子列名"""
exclude_cols = ['signal', 'signal_raw', 'group_info', 'combined', 'open', 'high', 'low', 'close', 'volume']
return [col for col in data.columns if col not in exclude_cols and not col.endswith('_weighted')]
def _global_top_n(self, scores: Dict[str, float]) -> List[str]:
"""全局Top N选股"""
if not scores:
return []
sorted_items = sorted(scores.items(), key=lambda x: x[1], reverse=True)
return [item[0] for item in sorted_items[:self.select_num]]
def _apply_rebalance_control(self, daily_target: List[str], factor_data: pd.DataFrame) -> List[str]:
"""应用调仓周期控制"""
signals = []
current_held = None
last_rebalance_idx = 0
for i, target in enumerate(daily_target):
# 初始持仓为空,等待第一个有效信号
if current_held is None:
if not target:
signals.append('')
continue
current_held = target
last_rebalance_idx = i
signals.append(current_held)
continue
# 检查调仓周期
days_since = i - last_rebalance_idx
if days_since < self.rebalance_days:
# 未达到最低调仓周期,保持当前持仓
signals.append(current_held)
continue
# 检查是否应该调仓
if target: # 目标信号有效
should = self._check_rebalance(
factor_data.iloc[i],
current_held,
target,
self._get_factor_columns(factor_data)
)
if should:
current_held = target
last_rebalance_idx = i
else:
# 目标信号为空所有标的动量得分低于min_score清仓
# 不继续持有负动量标的,转为空仓
current_held = ''
last_rebalance_idx = i
signals.append(current_held)
return signals
def _check_rebalance(
self,
row: pd.Series,
current_held: str,
target: str,
factor_cols: List[str]
) -> bool:
"""检查是否应该调仓(得分阈值检查)"""
# 提取当前持仓和目标持仓的代码
old_codes = [c for c in current_held.split(',') if c]
new_codes = [c for c in target.split(',') if c]
if not new_codes or not old_codes:
return True
if set(new_codes) == set(old_codes):
return False # 组合完全相同,不调仓
# 计算新旧组合的总得分
old_total = sum(float(row.get(col, 0)) for col in factor_cols if col in old_codes)
new_total = sum(float(row.get(col, 0)) for col in factor_cols if col in new_codes)
# 新组合得分需超过当前组合一定比例才调仓
# 即使 threshold=0也要确保 new_total >= old_total
if old_total > 0:
return (new_total / old_total - 1) >= self.rebalance_threshold
return new_total > 0
def _grouped_selection(self, scores: Dict[str, float]) -> List[str]:
"""分组选股先类内竞争每大类选Top1再跨类排序
设计理念:短债作为"动态现金"
1. 短债(BOND大类)永远参与持仓,不受排名过滤影响
2. 其他大类冠军需要排名 <= select_num才能入选
3. 当其他标的动量都不强时,仓位自动集中到短债(现金)
4. 这样永远不会真正空仓,短债作为现金底仓始终存在
"""
if not scores:
return []
# 建立 group -> (code, score) 的映射
group_champions = {}
for code, score in scores.items():
# 从group_mapping获取分组
group = self.group_mapping.get(code, 'default')
if group not in group_champions or score > group_champions[group][1]:
group_champions[group] = (code, score)
# ⭐ 计算全局动量排名(用于二次过滤)
all_sorted = sorted(scores.items(), key=lambda x: x[1], reverse=True)
rank_map = {code: rank + 1 for rank, (code, _) in enumerate(all_sorted)}
# ⭐ 大类冠军二次过滤特殊处理BOND大类
valid_champions = []
bond_champion = None # 保存短债作为现金底仓
for group, (code, score) in group_champions.items():
rank = rank_map.get(code, len(all_sorted) + 1)
# ⭐ BOND大类短债特殊处理永远保留作为现金底仓
if group == 'BOND':
if score >= self.min_score: # 只需满足min_score不受排名限制
bond_champion = (code, score, rank)
continue
# 其他大类:需要排名 <= select_num 才有效
if score >= self.min_score and rank <= self.select_num:
valid_champions.append((code, score, rank))
# ⭐ 组合构建:动量强的标的 + 短债(现金)
# 对有效冠军按得分排序
sorted_champions = sorted(valid_champions, key=lambda x: x[1], reverse=True)
# 先选动量强的标的最多select_num-1个留1个位置给短债
top_codes = [code for code, score, rank in sorted_champions[:self.select_num - 1]]
# 再加入短债作为现金底仓(如果有)
if bond_champion:
top_codes.append(bond_champion[0])
# 如果没有短债min_score未满足则从其他冠军补满
if not bond_champion and len(top_codes) < self.select_num:
remaining = [code for code, score, rank in sorted_champions[self.select_num - 1:self.select_num]]
top_codes.extend(remaining)
return top_codes
class TrendFollower(SignalGenerator):
"""趋势跟随器(定制实现)"""
mode = "trend"
def __init__(self, entry_threshold: float = 0.02, exit_threshold: float = -0.02, select_num: int = 1):
super().__init__(entry_threshold=entry_threshold, exit_threshold=exit_threshold, select_num=select_num)
self.entry_threshold = entry_threshold
self.exit_threshold = exit_threshold
self.select_num = select_num
def generate(self, factor_data: pd.DataFrame) -> pd.DataFrame:
"""生成趋势跟随信号"""
result = pd.DataFrame(index=factor_data.index)
factor_cols = self._get_factor_columns(factor_data)
for col in factor_cols:
trend_strength = factor_data[col]
result[f'{col}_entry'] = trend_strength > self.entry_threshold
result[f'{col}_exit'] = trend_strength < self.exit_threshold
signals = []
for date in result.index:
entry_signals = []
for col in factor_cols:
if result.loc[date, f'{col}_entry']:
score = factor_data.loc[date, col]
if pd.notna(score):
entry_signals.append((col, score))
entry_signals.sort(key=lambda x: x[1], reverse=True)
selected = [item[0] for item in entry_signals[:self.select_num]]
signals.append(','.join(selected) if selected else '')
result['signal'] = signals
result['signal'] = result['signal'].shift(1)
return result
def _get_factor_columns(self, data: pd.DataFrame) -> List[str]:
"""获取因子列名"""
exclude_cols = ['signal', 'signal_raw', 'combined', 'open', 'high', 'low', 'close', 'volume']
return [col for col in data.columns if col not in exclude_cols and not col.endswith('_weighted')]
class ReversalTrader(SignalGenerator):
"""反转交易器(定制实现)"""
mode = "reversal"
def __init__(self, overbought: float = 70, oversold: float = 30, reversal_threshold: float = 0.1):
super().__init__(overbought=overbought, oversold=oversold, reversal_threshold=reversal_threshold)
self.overbought = overbought
self.oversold = oversold
self.reversal_threshold = reversal_threshold
def generate(self, factor_data: pd.DataFrame) -> pd.DataFrame:
"""生成反转交易信号"""
result = pd.DataFrame(index=factor_data.index)
factor_cols = self._get_factor_columns(factor_data)
for col in factor_cols:
reversal_signal = factor_data[col]
result[f'{col}_buy'] = reversal_signal > self.reversal_threshold
result[f'{col}_sell'] = reversal_signal < -self.reversal_threshold
signals = []
for date in result.index:
buy_signals = []
sell_signals = []
for col in factor_cols:
if result.loc[date, f'{col}_buy']:
buy_signals.append(col)
if result.loc[date, f'{col}_sell']:
sell_signals.append(col)
if buy_signals:
signals.append(f"BUY:{','.join(buy_signals)}")
elif sell_signals:
signals.append(f"SELL:{','.join(sell_signals)}")
else:
signals.append('')
result['signal'] = signals
result['signal'] = result['signal'].shift(1)
return result
def _get_factor_columns(self, data: pd.DataFrame) -> List[str]:
"""获取因子列名"""
exclude_cols = ['signal', 'signal_raw', 'combined', 'open', 'high', 'low', 'close', 'volume']
return [col for col in data.columns if col not in exclude_cols and not col.endswith('_weighted')]