""" 定制信号生成器实现 这些信号生成器继承framework.core.signals.SignalGenerator """ from framework.signals import SignalGenerator import pandas as pd import numpy as np from typing import Dict, List, Optional, Any class TopNSelector(SignalGenerator): """ Top N选股器(定制实现) 用于轮动策略: - 按因子值排序,选出Top N标的 - 支持分组选股(先类内竞争,再跨类排序) - 支持调仓阈值检查(新组合得分需超过当前组合一定比例才调仓) - V3: 支持动态阈值(短债动量作为过滤阈值) 参数: - select_num: 选中数量(默认3) - group_by: 分组键名(可选,如'market') - group_mapping: 分组映射字典(可选,{code: group}) - top_per_group: 每组选中数量(默认1) - min_score: 最小得分阈值(可选,如0表示过滤负分) - rebalance_threshold: 调仓阈值(可选,新组合得分需超过当前组合X%才调仓) - rebalance_days: 最低调仓周期(可选,持仓至少N天才能调仓) - bond_threshold_config: V3动态阈值配置 """ mode = "top_n" def __init__( self, select_num: int = 3, group_by: Optional[str] = None, group_mapping: Optional[Dict[str, str]] = None, top_per_group: int = 1, min_score: Optional[float] = None, rebalance_threshold: float = 0.0, rebalance_days: int = 1, bond_threshold_config: Optional[Dict] = None ): super().__init__( select_num=select_num, group_by=group_by, group_mapping=group_mapping, top_per_group=top_per_group, min_score=min_score, rebalance_threshold=rebalance_threshold, rebalance_days=rebalance_days ) self.select_num = select_num self.group_by = group_by self.group_mapping = group_mapping or {} self.top_per_group = top_per_group self.min_score = min_score self.rebalance_threshold = rebalance_threshold self.rebalance_days = rebalance_days self.bond_threshold_config = bond_threshold_config or {} def _get_dynamic_threshold(self, scores: Dict[str, float]) -> float: """获取动态阈值:短债动量 × ratio,无数据时退化为 min_score V3动态阈值逻辑: - 若bond_threshold.enabled=true,阈值 = 短债动量 × ratio - 若短债无数据或动量<0,退化为固定min_score - 若enabled=false,退化为固定min_score """ cfg = self.bond_threshold_config if not cfg.get('enabled', False): return self.min_score if self.min_score is not None else 0.0 bond_code = cfg.get('bond_code', '931862.CSI') ratio = cfg.get('ratio', 1.0) bond_score = scores.get(bond_code, None) if bond_score is None or bond_score < 0: return self.min_score if self.min_score is not None else 0.0 return bond_score * ratio def generate(self, factor_data: pd.DataFrame) -> pd.DataFrame: """生成Top N选股信号(支持调仓周期控制)""" result = pd.DataFrame(index=factor_data.index) factor_cols = self._get_factor_columns(factor_data) if not factor_cols: result['signal'] = '' return result # Step 1: 每日目标组合(不考虑调仓周期) daily_target = [] for date in factor_data.index: row = factor_data.loc[date] # 提取得分 scores = {} for col in factor_cols: score = row[col] if pd.notna(score): scores[col] = score # V3: 动态阈值过滤(替代固定 min_score) threshold = self._get_dynamic_threshold(scores) scores = {k: v for k, v in scores.items() if v >= threshold} # 分组选股或全局选股 if self.group_mapping: selected = self._grouped_selection(scores) else: selected = self._global_top_n(scores) daily_target.append(','.join(selected) if selected else '') # Step 2: 逐日生成信号(调仓周期控制) signals = self._apply_rebalance_control(daily_target, factor_data) result['signal_raw'] = daily_target # 每日目标组合 result['signal'] = signals # T+1执行:信号向后移位1天 result['signal'] = result['signal'].shift(1) return result def _get_factor_columns(self, data: pd.DataFrame) -> List[str]: """获取因子列名""" exclude_cols = ['signal', 'signal_raw', 'group_info', 'combined', 'open', 'high', 'low', 'close', 'volume'] return [col for col in data.columns if col not in exclude_cols and not col.endswith('_weighted')] def _global_top_n(self, scores: Dict[str, float]) -> List[str]: """全局Top N选股""" if not scores: return [] sorted_items = sorted(scores.items(), key=lambda x: x[1], reverse=True) return [item[0] for item in sorted_items[:self.select_num]] def _apply_rebalance_control(self, daily_target: List[str], factor_data: pd.DataFrame) -> List[str]: """应用调仓周期控制""" signals = [] current_held = None last_rebalance_idx = 0 for i, target in enumerate(daily_target): # 初始持仓为空,等待第一个有效信号 if current_held is None: if not target: signals.append('') continue current_held = target last_rebalance_idx = i signals.append(current_held) continue # 检查调仓周期 days_since = i - last_rebalance_idx if days_since < self.rebalance_days: # 未达到最低调仓周期,保持当前持仓 signals.append(current_held) continue # 检查是否应该调仓 if target: # 目标信号有效 should = self._check_rebalance( factor_data.iloc[i], current_held, target, self._get_factor_columns(factor_data) ) if should: current_held = target last_rebalance_idx = i else: # 目标信号为空(所有标的动量得分低于min_score),清仓 # 不继续持有负动量标的,转为空仓 current_held = '' last_rebalance_idx = i signals.append(current_held) return signals def _check_rebalance( self, row: pd.Series, current_held: str, target: str, factor_cols: List[str] ) -> bool: """检查是否应该调仓(得分阈值检查)""" # 提取当前持仓和目标持仓的代码 old_codes = [c for c in current_held.split(',') if c] new_codes = [c for c in target.split(',') if c] if not new_codes or not old_codes: return True if set(new_codes) == set(old_codes): return False # 组合完全相同,不调仓 # 计算新旧组合的总得分 old_total = sum(float(row.get(col, 0)) for col in factor_cols if col in old_codes) new_total = sum(float(row.get(col, 0)) for col in factor_cols if col in new_codes) # 新组合得分需超过当前组合一定比例才调仓 # 即使 threshold=0,也要确保 new_total >= old_total if old_total > 0: return (new_total / old_total - 1) >= self.rebalance_threshold return new_total > 0 def _grouped_selection(self, scores: Dict[str, float]) -> List[str]: """V3分组选股:BOND不参与竞争,空余仓位填充短债 V3逻辑: 1. BOND大类标的不参与冠军竞争(它是阈值,不是候选) 2. 选出不足 select_num 只时,用短债填充 3. V2退化:若bond_threshold.enabled=false,BOND正常参与竞争 """ if not scores: return [] cfg = self.bond_threshold_config bond_code = cfg.get('bond_code', '931862.CSI') if cfg.get('enabled') else None # 建立 group -> (code, score) 映射 # V3: 排除 BOND 大类(它不参与竞争) group_champions = {} for code, score in scores.items(): group = self.group_mapping.get(code, 'default') # V3: BOND大类不参与竞争 if cfg.get('enabled') and group == 'BOND': continue if group not in group_champions or score > group_champions[group][1]: group_champions[group] = (code, score) # 跨类排序取 Top N sorted_champions = sorted(group_champions.values(), key=lambda x: x[1], reverse=True) selected = [code for code, score in sorted_champions[:self.select_num]] # V3: 空余仓位填充短债 if cfg.get('fill_bond', False) and bond_code: n_bond_slots = self.select_num - len(selected) if n_bond_slots > 0: # 检查短债是否满足阈值条件(需在原始scores中存在) bond_score = scores.get(bond_code, None) if bond_score is not None: # 用短债代码填充空余仓位(可能重复填充多次) for _ in range(n_bond_slots): selected.append(bond_code) return selected class TrendFollower(SignalGenerator): """趋势跟随器(定制实现)""" mode = "trend" def __init__(self, entry_threshold: float = 0.02, exit_threshold: float = -0.02, select_num: int = 1): super().__init__(entry_threshold=entry_threshold, exit_threshold=exit_threshold, select_num=select_num) self.entry_threshold = entry_threshold self.exit_threshold = exit_threshold self.select_num = select_num def generate(self, factor_data: pd.DataFrame) -> pd.DataFrame: """生成趋势跟随信号""" result = pd.DataFrame(index=factor_data.index) factor_cols = self._get_factor_columns(factor_data) for col in factor_cols: trend_strength = factor_data[col] result[f'{col}_entry'] = trend_strength > self.entry_threshold result[f'{col}_exit'] = trend_strength < self.exit_threshold signals = [] for date in result.index: entry_signals = [] for col in factor_cols: if result.loc[date, f'{col}_entry']: score = factor_data.loc[date, col] if pd.notna(score): entry_signals.append((col, score)) entry_signals.sort(key=lambda x: x[1], reverse=True) selected = [item[0] for item in entry_signals[:self.select_num]] signals.append(','.join(selected) if selected else '') result['signal'] = signals result['signal'] = result['signal'].shift(1) return result def _get_factor_columns(self, data: pd.DataFrame) -> List[str]: """获取因子列名""" exclude_cols = ['signal', 'signal_raw', 'combined', 'open', 'high', 'low', 'close', 'volume'] return [col for col in data.columns if col not in exclude_cols and not col.endswith('_weighted')] class ReversalTrader(SignalGenerator): """反转交易器(定制实现)""" mode = "reversal" def __init__(self, overbought: float = 70, oversold: float = 30, reversal_threshold: float = 0.1): super().__init__(overbought=overbought, oversold=oversold, reversal_threshold=reversal_threshold) self.overbought = overbought self.oversold = oversold self.reversal_threshold = reversal_threshold def generate(self, factor_data: pd.DataFrame) -> pd.DataFrame: """生成反转交易信号""" result = pd.DataFrame(index=factor_data.index) factor_cols = self._get_factor_columns(factor_data) for col in factor_cols: reversal_signal = factor_data[col] result[f'{col}_buy'] = reversal_signal > self.reversal_threshold result[f'{col}_sell'] = reversal_signal < -self.reversal_threshold signals = [] for date in result.index: buy_signals = [] sell_signals = [] for col in factor_cols: if result.loc[date, f'{col}_buy']: buy_signals.append(col) if result.loc[date, f'{col}_sell']: sell_signals.append(col) if buy_signals: signals.append(f"BUY:{','.join(buy_signals)}") elif sell_signals: signals.append(f"SELL:{','.join(sell_signals)}") else: signals.append('') result['signal'] = signals result['signal'] = result['signal'].shift(1) return result def _get_factor_columns(self, data: pd.DataFrame) -> List[str]: """获取因子列名""" exclude_cols = ['signal', 'signal_raw', 'combined', 'open', 'high', 'low', 'close', 'volume'] return [col for col in data.columns if col not in exclude_cols and not col.endswith('_weighted')]