feat: 短债动态阈值仓位分配机制

设计理念:
- 每份仓位 = 1/select_num
- 每个选中标的持有基础份额 1/select_num
- 被排除标的的份额归短债(BOND)继承

信号生成:
- generate()返回signal和signal_ranks
- _grouped_selection_with_ranks()返回标的和排名

仓位分配:
- DynamicThresholdAllocator.allocate()计算权重
- 短债继承被排除标的的份额

示例(短债排名2,select_num=3):
- NDX排名1 → 1/3(基础)
- 短债排名2 → 1/3(基础)+ 1/3(继承)= 2/3
- 排名3的份额被短债继承
This commit is contained in:
2026-05-18 23:07:21 +08:00
parent 2c69c6136a
commit c15e418ead
3 changed files with 200 additions and 12 deletions

View File

@@ -0,0 +1,10 @@
"""
仓位分配模块
提供多种仓位分配策略:
- DynamicThresholdAllocator: 短债动态阈值仓位分配
"""
from .dynamic_threshold import DynamicThresholdAllocator
__all__ = ['DynamicThresholdAllocator']

View File

@@ -0,0 +1,112 @@
"""
短债动态阈值仓位分配
设计理念:
1. 每份仓位 = 1/select_num
2. 每个选中标的持有基础份额 1/select_num
3. 被排除标的的份额归短债(BOND)继承
示例select_num=3, 短债排名2
- NDX排名1 → 1/3基础份额
- 短债排名2 → 1/3基础+ 1/3继承排名3的份额= 2/3
"""
from typing import Dict, List, Tuple
class DynamicThresholdAllocator:
"""短债动态阈值仓位分配器"""
def __init__(self, select_num: int, group_mapping: Dict[str, str] = None):
self.select_num = select_num
self.group_mapping = group_mapping or {}
def allocate(self, selected: List[str], ranks: List[int]) -> Dict[str, float]:
"""
计算仓位分配
Args:
selected: 选中标的列表
ranks: 对应排名列表
Returns:
{code: weight} 权重字典
"""
if not selected:
return {}
# 每份仓位
unit_weight = 1.0 / self.select_num
# 基础仓位:每个选中标的持有 1/select_num
weights = {code: unit_weight for code in selected}
# 找出短债(BOND大类)
bond_code = None
for code in selected:
if self.group_mapping.get(code) == 'BOND':
bond_code = code
break
# 计算被排除的份额数量
# effective_threshold = min(短债排名, select_num)
# 被排除份额 = select_num - effective_threshold
if bond_code and ranks:
bond_rank = ranks[selected.index(bond_code)]
effective_threshold = min(bond_rank, self.select_num)
excluded_slots = self.select_num - effective_threshold
# 被排除的份额归短债继承
if excluded_slots > 0:
weights[bond_code] += excluded_slots * unit_weight
# 确保权重总和为100%
total_weight = sum(weights.values())
if total_weight > 1.0:
# 如果超过100%,需要调整(极端情况)
for code in weights:
weights[code] /= total_weight
return weights
def get_position_info(self, selected: List[str], ranks: List[int]) -> Dict:
"""
获取详细仓位信息(用于报告)
Returns:
{
'weights': {code: weight},
'unit_weight': float,
'effective_threshold': int,
'excluded_slots': int,
'bond_inherited': float,
}
"""
weights = self.allocate(selected, ranks)
unit_weight = 1.0 / self.select_num
# 找短债
bond_code = None
bond_rank = None
for code in selected:
if self.group_mapping.get(code) == 'BOND':
bond_code = code
bond_rank = ranks[selected.index(code)]
break
info = {
'weights': weights,
'unit_weight': unit_weight,
'effective_threshold': self.select_num,
'excluded_slots': 0,
'bond_inherited': 0,
}
if bond_code and bond_rank:
effective_threshold = min(bond_rank, self.select_num)
excluded_slots = self.select_num - effective_threshold
info['effective_threshold'] = effective_threshold
info['excluded_slots'] = excluded_slots
info['bond_inherited'] = excluded_slots * unit_weight
return info

View File

@@ -7,7 +7,7 @@
from framework.signals import SignalGenerator from framework.signals import SignalGenerator
import pandas as pd import pandas as pd
import numpy as np import numpy as np
from typing import Dict, List, Optional, Any from typing import Dict, List, Optional, Any, Tuple
class TopNSelector(SignalGenerator): class TopNSelector(SignalGenerator):
@@ -59,17 +59,24 @@ class TopNSelector(SignalGenerator):
self.rebalance_days = rebalance_days self.rebalance_days = rebalance_days
def generate(self, factor_data: pd.DataFrame) -> pd.DataFrame: def generate(self, factor_data: pd.DataFrame) -> pd.DataFrame:
"""生成Top N选股信号支持调仓周期控制""" """生成Top N选股信号支持调仓周期控制
返回DataFrame包含:
- signal: 选中标的列表(逗号分隔)
- signal_ranks: 选中标的的排名列表(逗号分隔)
"""
result = pd.DataFrame(index=factor_data.index) result = pd.DataFrame(index=factor_data.index)
factor_cols = self._get_factor_columns(factor_data) factor_cols = self._get_factor_columns(factor_data)
if not factor_cols: if not factor_cols:
result['signal'] = '' result['signal'] = ''
result['signal_ranks'] = ''
return result return result
# Step 1: 每日目标组合(不考虑调仓周期) # Step 1: 每日目标组合(不考虑调仓周期)
daily_target = [] daily_target = []
daily_ranks = []
for date in factor_data.index: for date in factor_data.index:
row = factor_data.loc[date] row = factor_data.loc[date]
@@ -86,16 +93,17 @@ class TopNSelector(SignalGenerator):
# 分组选股或全局选股 # 分组选股或全局选股
if self.group_mapping: if self.group_mapping:
selected = self._grouped_selection(scores) selected, ranks = self._grouped_selection_with_ranks(scores)
else: else:
selected = self._global_top_n(scores) selected, ranks = self._global_top_n_with_ranks(scores)
daily_target.append(','.join(selected) if selected else '') daily_target.append(','.join(selected) if selected else '')
daily_ranks.append(','.join(str(r) for r in ranks) if ranks else '')
# Step 2: 逐日生成信号(调仓周期控制) result['signal_raw'] = daily_target
result['ranks_raw'] = daily_ranks
signals = self._apply_rebalance_control(daily_target, factor_data) signals = self._apply_rebalance_control(daily_target, factor_data)
result['signal_raw'] = daily_target # 每日目标组合
result['signal'] = signals result['signal'] = signals
# T+1执行信号向后移位1天 # T+1执行信号向后移位1天
@@ -105,16 +113,74 @@ class TopNSelector(SignalGenerator):
def _get_factor_columns(self, data: pd.DataFrame) -> List[str]: def _get_factor_columns(self, data: pd.DataFrame) -> List[str]:
"""获取因子列名""" """获取因子列名"""
exclude_cols = ['signal', 'signal_raw', 'group_info', 'combined', 'open', 'high', 'low', 'close', 'volume'] exclude_cols = ['signal', 'signal_raw', 'ranks_raw', 'group_info', 'combined', 'open', 'high', 'low', 'close', 'volume']
return [col for col in data.columns if col not in exclude_cols and not col.endswith('_weighted')] return [col for col in data.columns if col not in exclude_cols and not col.endswith('_weighted')]
def _global_top_n(self, scores: Dict[str, float]) -> List[str]: def _global_top_n_with_ranks(self, scores: Dict[str, float]) -> Tuple[List[str], List[int]]:
"""全局Top N选股""" """全局Top N选股(返回标的和排名)"""
if not scores: if not scores:
return [] return [], []
sorted_items = sorted(scores.items(), key=lambda x: x[1], reverse=True) # 计算全局排名
return [item[0] for item in sorted_items[:self.select_num]] all_sorted = sorted(scores.items(), key=lambda x: x[1], reverse=True)
rank_map = {code: rank + 1 for rank, (code, _) in enumerate(all_sorted)}
# 选出Top N
selected = [item[0] for item in all_sorted[:self.select_num]]
ranks = [rank_map[code] for code in selected]
return selected, ranks
def _grouped_selection_with_ranks(self, scores: Dict[str, float]) -> Tuple[List[str], List[int]]:
"""分组选股:返回标的和排名
设计理念:短债作为"动态过滤阈值"
- 短债正常参与动量排序,没有任何特殊处理
- 短债排名 <= select_num → 短债被选中,比短债弱的标的被排除
- 短债排名 > select_num → 短债被排除(有更好的选择)
- effective_threshold = min(短债排名, select_num)
"""
if not scores:
return [], []
# 建立 group -> (code, score) 的映射
group_champions = {}
for code, score in scores.items():
group = self.group_mapping.get(code, 'default')
if group not in group_champions or score > group_champions[group][1]:
group_champions[group] = (code, score)
# 计算全局动量排名
all_sorted = sorted(scores.items(), key=lambda x: x[1], reverse=True)
rank_map = {code: rank + 1 for rank, (code, _) in enumerate(all_sorted)}
# ⭐ 找出短债(BOND大类)的排名位置
bond_rank = None
for group, (code, score) in group_champions.items():
if group == 'BOND':
bond_rank = rank_map.get(code, len(all_sorted) + 1)
break
# ⭐ 确定有效排名阈值
if bond_rank is not None and bond_rank <= self.select_num:
effective_threshold = bond_rank
else:
effective_threshold = self.select_num
# ⭐ 大类冠军过滤
valid_champions = []
for group, (code, score) in group_champions.items():
rank = rank_map.get(code, len(all_sorted) + 1)
if score >= self.min_score and rank <= effective_threshold:
valid_champions.append((code, score, rank))
# 对有效冠军按得分排序选出Top N
sorted_champions = sorted(valid_champions, key=lambda x: x[1], reverse=True)
selected = [code for code, score, rank in sorted_champions[:self.select_num]]
ranks = [rank for code, score, rank in sorted_champions[:self.select_num]]
return selected, ranks
def _apply_rebalance_control(self, daily_target: List[str], factor_data: pd.DataFrame) -> List[str]: def _apply_rebalance_control(self, daily_target: List[str], factor_data: pd.DataFrame) -> List[str]:
"""应用调仓周期控制""" """应用调仓周期控制"""