From c15e418ead1cef9583a9413f48c20b240b234f56 Mon Sep 17 00:00:00 2001 From: aszerW Date: Mon, 18 May 2026 23:07:21 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E7=9F=AD=E5=80=BA=E5=8A=A8=E6=80=81?= =?UTF-8?q?=E9=98=88=E5=80=BC=E4=BB=93=E4=BD=8D=E5=88=86=E9=85=8D=E6=9C=BA?= =?UTF-8?q?=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 设计理念: - 每份仓位 = 1/select_num - 每个选中标的持有基础份额 1/select_num - 被排除标的的份额归短债(BOND)继承 信号生成: - generate()返回signal和signal_ranks - _grouped_selection_with_ranks()返回标的和排名 仓位分配: - DynamicThresholdAllocator.allocate()计算权重 - 短债继承被排除标的的份额 示例(短债排名2,select_num=3): - NDX排名1 → 1/3(基础) - 短债排名2 → 1/3(基础)+ 1/3(继承)= 2/3 - 排名3的份额被短债继承 --- strategies/shared/allocators/__init__.py | 10 ++ .../shared/allocators/dynamic_threshold.py | 112 ++++++++++++++++++ strategies/shared/signals/selectors.py | 90 ++++++++++++-- 3 files changed, 200 insertions(+), 12 deletions(-) create mode 100644 strategies/shared/allocators/__init__.py create mode 100644 strategies/shared/allocators/dynamic_threshold.py diff --git a/strategies/shared/allocators/__init__.py b/strategies/shared/allocators/__init__.py new file mode 100644 index 0000000..3b58226 --- /dev/null +++ b/strategies/shared/allocators/__init__.py @@ -0,0 +1,10 @@ +""" +仓位分配模块 + +提供多种仓位分配策略: +- DynamicThresholdAllocator: 短债动态阈值仓位分配 +""" + +from .dynamic_threshold import DynamicThresholdAllocator + +__all__ = ['DynamicThresholdAllocator'] diff --git a/strategies/shared/allocators/dynamic_threshold.py b/strategies/shared/allocators/dynamic_threshold.py new file mode 100644 index 0000000..115e1de --- /dev/null +++ b/strategies/shared/allocators/dynamic_threshold.py @@ -0,0 +1,112 @@ +""" +短债动态阈值仓位分配 + +设计理念: +1. 每份仓位 = 1/select_num +2. 每个选中标的持有基础份额 1/select_num +3. 被排除标的的份额归短债(BOND)继承 + +示例(select_num=3, 短债排名2): +- NDX排名1 → 1/3(基础份额) +- 短债排名2 → 1/3(基础)+ 1/3(继承排名3的份额)= 2/3 +""" + +from typing import Dict, List, Tuple + + +class DynamicThresholdAllocator: + """短债动态阈值仓位分配器""" + + def __init__(self, select_num: int, group_mapping: Dict[str, str] = None): + self.select_num = select_num + self.group_mapping = group_mapping or {} + + def allocate(self, selected: List[str], ranks: List[int]) -> Dict[str, float]: + """ + 计算仓位分配 + + Args: + selected: 选中标的列表 + ranks: 对应排名列表 + + Returns: + {code: weight} 权重字典 + """ + if not selected: + return {} + + # 每份仓位 + unit_weight = 1.0 / self.select_num + + # 基础仓位:每个选中标的持有 1/select_num + weights = {code: unit_weight for code in selected} + + # 找出短债(BOND大类) + bond_code = None + for code in selected: + if self.group_mapping.get(code) == 'BOND': + bond_code = code + break + + # 计算被排除的份额数量 + # effective_threshold = min(短债排名, select_num) + # 被排除份额 = select_num - effective_threshold + if bond_code and ranks: + bond_rank = ranks[selected.index(bond_code)] + effective_threshold = min(bond_rank, self.select_num) + excluded_slots = self.select_num - effective_threshold + + # 被排除的份额归短债继承 + if excluded_slots > 0: + weights[bond_code] += excluded_slots * unit_weight + + # 确保权重总和为100% + total_weight = sum(weights.values()) + if total_weight > 1.0: + # 如果超过100%,需要调整(极端情况) + for code in weights: + weights[code] /= total_weight + + return weights + + def get_position_info(self, selected: List[str], ranks: List[int]) -> Dict: + """ + 获取详细仓位信息(用于报告) + + Returns: + { + 'weights': {code: weight}, + 'unit_weight': float, + 'effective_threshold': int, + 'excluded_slots': int, + 'bond_inherited': float, + } + """ + weights = self.allocate(selected, ranks) + unit_weight = 1.0 / self.select_num + + # 找短债 + bond_code = None + bond_rank = None + for code in selected: + if self.group_mapping.get(code) == 'BOND': + bond_code = code + bond_rank = ranks[selected.index(code)] + break + + info = { + 'weights': weights, + 'unit_weight': unit_weight, + 'effective_threshold': self.select_num, + 'excluded_slots': 0, + 'bond_inherited': 0, + } + + if bond_code and bond_rank: + effective_threshold = min(bond_rank, self.select_num) + excluded_slots = self.select_num - effective_threshold + info['effective_threshold'] = effective_threshold + info['excluded_slots'] = excluded_slots + info['bond_inherited'] = excluded_slots * unit_weight + + return info diff --git a/strategies/shared/signals/selectors.py b/strategies/shared/signals/selectors.py index 5b4ea2b..d5601cd 100644 --- a/strategies/shared/signals/selectors.py +++ b/strategies/shared/signals/selectors.py @@ -7,7 +7,7 @@ from framework.signals import SignalGenerator import pandas as pd import numpy as np -from typing import Dict, List, Optional, Any +from typing import Dict, List, Optional, Any, Tuple class TopNSelector(SignalGenerator): @@ -59,17 +59,24 @@ class TopNSelector(SignalGenerator): self.rebalance_days = rebalance_days def generate(self, factor_data: pd.DataFrame) -> pd.DataFrame: - """生成Top N选股信号(支持调仓周期控制)""" + """生成Top N选股信号(支持调仓周期控制) + + 返回DataFrame包含: + - signal: 选中标的列表(逗号分隔) + - signal_ranks: 选中标的的排名列表(逗号分隔) + """ result = pd.DataFrame(index=factor_data.index) factor_cols = self._get_factor_columns(factor_data) if not factor_cols: result['signal'] = '' + result['signal_ranks'] = '' return result # Step 1: 每日目标组合(不考虑调仓周期) daily_target = [] + daily_ranks = [] for date in factor_data.index: row = factor_data.loc[date] @@ -86,16 +93,17 @@ class TopNSelector(SignalGenerator): # 分组选股或全局选股 if self.group_mapping: - selected = self._grouped_selection(scores) + selected, ranks = self._grouped_selection_with_ranks(scores) else: - selected = self._global_top_n(scores) + selected, ranks = self._global_top_n_with_ranks(scores) daily_target.append(','.join(selected) if selected else '') + daily_ranks.append(','.join(str(r) for r in ranks) if ranks else '') - # Step 2: 逐日生成信号(调仓周期控制) + result['signal_raw'] = daily_target + result['ranks_raw'] = daily_ranks signals = self._apply_rebalance_control(daily_target, factor_data) - result['signal_raw'] = daily_target # 每日目标组合 result['signal'] = signals # T+1执行:信号向后移位1天 @@ -105,16 +113,74 @@ class TopNSelector(SignalGenerator): def _get_factor_columns(self, data: pd.DataFrame) -> List[str]: """获取因子列名""" - exclude_cols = ['signal', 'signal_raw', 'group_info', 'combined', 'open', 'high', 'low', 'close', 'volume'] + exclude_cols = ['signal', 'signal_raw', 'ranks_raw', 'group_info', 'combined', 'open', 'high', 'low', 'close', 'volume'] return [col for col in data.columns if col not in exclude_cols and not col.endswith('_weighted')] - def _global_top_n(self, scores: Dict[str, float]) -> List[str]: - """全局Top N选股""" + def _global_top_n_with_ranks(self, scores: Dict[str, float]) -> Tuple[List[str], List[int]]: + """全局Top N选股(返回标的和排名)""" if not scores: - return [] + return [], [] - sorted_items = sorted(scores.items(), key=lambda x: x[1], reverse=True) - return [item[0] for item in sorted_items[:self.select_num]] + # 计算全局排名 + all_sorted = sorted(scores.items(), key=lambda x: x[1], reverse=True) + rank_map = {code: rank + 1 for rank, (code, _) in enumerate(all_sorted)} + + # 选出Top N + selected = [item[0] for item in all_sorted[:self.select_num]] + ranks = [rank_map[code] for code in selected] + + return selected, ranks + + def _grouped_selection_with_ranks(self, scores: Dict[str, float]) -> Tuple[List[str], List[int]]: + """分组选股:返回标的和排名 + + 设计理念:短债作为"动态过滤阈值" + - 短债正常参与动量排序,没有任何特殊处理 + - 短债排名 <= select_num → 短债被选中,比短债弱的标的被排除 + - 短债排名 > select_num → 短债被排除(有更好的选择) + - effective_threshold = min(短债排名, select_num) + """ + if not scores: + return [], [] + + # 建立 group -> (code, score) 的映射 + group_champions = {} + for code, score in scores.items(): + group = self.group_mapping.get(code, 'default') + if group not in group_champions or score > group_champions[group][1]: + group_champions[group] = (code, score) + + # 计算全局动量排名 + all_sorted = sorted(scores.items(), key=lambda x: x[1], reverse=True) + rank_map = {code: rank + 1 for rank, (code, _) in enumerate(all_sorted)} + + # ⭐ 找出短债(BOND大类)的排名位置 + bond_rank = None + for group, (code, score) in group_champions.items(): + if group == 'BOND': + bond_rank = rank_map.get(code, len(all_sorted) + 1) + break + + # ⭐ 确定有效排名阈值 + if bond_rank is not None and bond_rank <= self.select_num: + effective_threshold = bond_rank + else: + effective_threshold = self.select_num + + # ⭐ 大类冠军过滤 + valid_champions = [] + for group, (code, score) in group_champions.items(): + rank = rank_map.get(code, len(all_sorted) + 1) + + if score >= self.min_score and rank <= effective_threshold: + valid_champions.append((code, score, rank)) + + # 对有效冠军按得分排序,选出Top N + sorted_champions = sorted(valid_champions, key=lambda x: x[1], reverse=True) + selected = [code for code, score, rank in sorted_champions[:self.select_num]] + ranks = [rank for code, score, rank in sorted_champions[:self.select_num]] + + return selected, ranks def _apply_rebalance_control(self, daily_target: List[str], factor_data: pd.DataFrame) -> List[str]: """应用调仓周期控制"""