From c5a41b71ae6dacd497eef68170c7a943acdeb96d Mon Sep 17 00:00:00 2001 From: aszerW Date: Mon, 11 May 2026 23:23:37 +0800 Subject: [PATCH] =?UTF-8?q?feat(signals):=20=E5=AE=8C=E5=96=84TopNSelector?= =?UTF-8?q?=E5=88=86=E6=95=A3=E5=8C=96=E9=80=89=E8=82=A1=E5=92=8C=E8=B0=83?= =?UTF-8?q?=E4=BB=93=E6=8E=A7=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 支持group_mapping分组映射(替代group_info列) - 每大类选Top1,然后跨类排序选Top3 - 添加调仓周期控制(rebalance_days) - 添加调仓阈值检查(rebalance_threshold) - 支持最小得分过滤(min_score过滤负分) --- strategies/shared/signals/selectors.py | 134 +++++++++++++++++++++---- 1 file changed, 112 insertions(+), 22 deletions(-) diff --git a/strategies/shared/signals/selectors.py b/strategies/shared/signals/selectors.py index 97c8bc7..c74dd90 100644 --- a/strategies/shared/signals/selectors.py +++ b/strategies/shared/signals/selectors.py @@ -17,12 +17,16 @@ class TopNSelector(SignalGenerator): 用于轮动策略: - 按因子值排序,选出Top N标的 - 支持分组选股(先类内竞争,再跨类排序) + - 支持调仓阈值检查(新组合得分需超过当前组合一定比例才调仓) 参数: - select_num: 选中数量(默认3) - - group_by: 分组列名(可选,如'market') + - group_by: 分组键名(可选,如'market') + - group_mapping: 分组映射字典(可选,{code: group}) - top_per_group: 每组选中数量(默认1) - - min_score: 最小得分阈值(可选) + - min_score: 最小得分阈值(可选,如0表示过滤负分) + - rebalance_threshold: 调仓阈值(可选,新组合得分需超过当前组合X%才调仓) + - rebalance_days: 最低调仓周期(可选,持仓至少N天才能调仓) """ mode = "top_n" @@ -31,22 +35,31 @@ class TopNSelector(SignalGenerator): self, select_num: int = 3, group_by: Optional[str] = None, + group_mapping: Optional[Dict[str, str]] = None, top_per_group: int = 1, - min_score: Optional[float] = None + min_score: Optional[float] = None, + rebalance_threshold: float = 0.0, + rebalance_days: int = 1 ): super().__init__( select_num=select_num, group_by=group_by, + group_mapping=group_mapping, top_per_group=top_per_group, - min_score=min_score + min_score=min_score, + rebalance_threshold=rebalance_threshold, + rebalance_days=rebalance_days ) self.select_num = select_num self.group_by = group_by + self.group_mapping = group_mapping or {} self.top_per_group = top_per_group self.min_score = min_score + self.rebalance_threshold = rebalance_threshold + self.rebalance_days = rebalance_days def generate(self, factor_data: pd.DataFrame) -> pd.DataFrame: - """生成Top N选股信号""" + """生成Top N选股信号(支持调仓周期控制)""" result = pd.DataFrame(index=factor_data.index) factor_cols = self._get_factor_columns(factor_data) @@ -55,29 +68,37 @@ class TopNSelector(SignalGenerator): result['signal'] = '' return result - signals = [] + # Step 1: 每日目标组合(不考虑调仓周期) + daily_target = [] for date in factor_data.index: row = factor_data.loc[date] + # 提取得分 scores = {} for col in factor_cols: score = row[col] if pd.notna(score): scores[col] = score - if self.min_score: + # 最小得分过滤(如过滤负分) + if self.min_score is not None: scores = {k: v for k, v in scores.items() if v >= self.min_score} - if self.group_by and 'group_info' in factor_data.columns: - selected = self._grouped_selection(scores, factor_data.loc[date]) + # 分组选股或全局选股 + if self.group_mapping: + selected = self._grouped_selection(scores) else: selected = self._global_top_n(scores) - signals.append(','.join(selected) if selected else '') + daily_target.append(','.join(selected) if selected else '') + # Step 2: 逐日生成信号(调仓周期控制) + signals = self._apply_rebalance_control(daily_target, factor_data) + + result['signal_raw'] = daily_target # 每日目标组合 result['signal'] = signals - result['signal_raw'] = signals + # T+1执行:信号向后移位1天 result['signal'] = result['signal'].shift(1) return result @@ -95,25 +116,94 @@ class TopNSelector(SignalGenerator): sorted_items = sorted(scores.items(), key=lambda x: x[1], reverse=True) return [item[0] for item in sorted_items[:self.select_num]] - def _grouped_selection(self, scores: Dict[str, float], row: pd.Series) -> List[str]: - """分组选股:先类内竞争,再跨类排序""" - if 'group_info' not in row.index: - return self._global_top_n(scores) + def _apply_rebalance_control(self, daily_target: List[str], factor_data: pd.DataFrame) -> List[str]: + """应用调仓周期控制""" + signals = [] + current_held = None + last_rebalance_idx = 0 - group_info = row['group_info'] - if pd.isna(group_info): - return self._global_top_n(scores) + for i, target in enumerate(daily_target): + # 初始持仓为空,等待第一个有效信号 + if current_held is None: + if not target: + signals.append('') + continue + current_held = target + last_rebalance_idx = i + signals.append(current_held) + continue + + # 检查调仓周期 + days_since = i - last_rebalance_idx + if days_since < self.rebalance_days: + # 未达到最低调仓周期,保持当前持仓 + signals.append(current_held) + continue + + # 检查是否应该调仓 + if target: # 目标信号有效 + should = self._check_rebalance( + factor_data.iloc[i], + current_held, + target, + self._get_factor_columns(factor_data) + ) + if should: + current_held = target + last_rebalance_idx = i + + signals.append(current_held) - groups = group_info if isinstance(group_info, dict) else {} + return signals + + def _check_rebalance( + self, + row: pd.Series, + current_held: str, + target: str, + factor_cols: List[str] + ) -> bool: + """检查是否应该调仓(得分阈值检查)""" + if self.rebalance_threshold <= 0: + # 无阈值,直接调仓 + return target != current_held + # 提取当前持仓和目标持仓的代码 + old_codes = [c for c in current_held.split(',') if c] + new_codes = [c for c in target.split(',') if c] + + if not new_codes or not old_codes: + return True + + if set(new_codes) == set(old_codes): + return False + + # 计算新旧组合的总得分 + old_total = sum(float(row.get(col, 0)) for col in factor_cols if col in old_codes) + new_total = sum(float(row.get(col, 0)) for col in factor_cols if col in new_codes) + + # 新组合得分需超过当前组合一定比例才调仓 + if old_total > 0: + return (new_total / old_total - 1) >= self.rebalance_threshold + + return new_total > 0 + + def _grouped_selection(self, scores: Dict[str, float]) -> List[str]: + """分组选股:先类内竞争(每大类选Top1),再跨类排序""" + if not scores: + return [] + + # 建立 group -> (code, score) 的映射 group_champions = {} for code, score in scores.items(): - group = groups.get(code, 'default') + # 从group_mapping获取分组 + group = self.group_mapping.get(code, 'default') if group not in group_champions or score > group_champions[group][1]: group_champions[group] = (code, score) - champions_scores = {code: score for code, score in group_champions.values()} - return self._global_top_n(champions_scores) + # 对各大类的冠军进行排序,选出Top N + sorted_champions = sorted(group_champions.values(), key=lambda x: x[1], reverse=True) + return [code for code, score in sorted_champions[:self.select_num]] class TrendFollower(SignalGenerator):