From f5e6202eee6038d80a8ad3e5cf8a411834907916 Mon Sep 17 00:00:00 2001
From: aszerW
Date: Mon, 11 May 2026 22:18:20 +0800
Subject: [PATCH] =?UTF-8?q?feat(signals):=20=E5=AE=9E=E7=8E=B0=E4=BF=A1?=
 =?UTF-8?q?=E5=8F=B7=E7=94=9F=E6=88=90=E5=B1=82=E6=8A=BD=E8=B1=A1?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

核心组件:
- SignalGenerator: 信号生成器抽象基类
- TopNSelector: Top N选股器(轮动策略)
  - 支持分组选股(先类内竞争,再跨类排序)
  - 支持最小得分阈值过滤
- TrendFollower: 趋势跟随器(趋势策略)
  - 入场阈值/出场阈值控制
- ReversalTrader: 反转交易器(反转策略)
  - 超买超卖信号生成

特点:
- T+1执行机制(信号shift向后移位)
- 向量化计算,避免前视偏差

测试覆盖:15个测试全部通过
---
 framework/signals/__init__.py   | 386 ++++++++++++++++++++++++++++++++
 framework/tests/test_signals.py | 250 +++++++++++++++++++++
 2 files changed, 636 insertions(+)
 create mode 100644 framework/signals/__init__.py
 create mode 100644 framework/tests/test_signals.py

diff --git a/framework/signals/__init__.py b/framework/signals/__init__.py
new file mode 100644
--- /dev/null
+++ b/framework/signals/__init__.py
@@ -0,0 +1,386 @@
+"""
+Signal layer abstractions.
+
+Core components:
+- SignalGenerator: abstract base class for signal generators
+- TopNSelector: Top-N selector (rotation strategies)
+- TrendFollower: trend follower (trend strategies)
+- ReversalTrader: reversal trader (reversal strategies)
+
+Every generator delays its ``signal`` column by one row (T+1 execution), so
+a signal computed from row ``t`` first appears on row ``t + 1``.
+"""
+
+import logging
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from typing import Any, Dict, List, Optional
+
+import numpy as np
+import pandas as pd
+
+logger = logging.getLogger(__name__)
+
+# Bookkeeping and price columns that never hold factor values.
+_NON_FACTOR_COLUMNS = frozenset({
+    'signal', 'signal_raw', 'group_info', 'combined',
+    'open', 'high', 'low', 'close', 'volume',
+})
+
+
+@dataclass
+class SignalMeta:
+    """Descriptive metadata attached to a signal generator."""
+
+    mode: str  # 'top_n', 'trend', 'reversal'
+    select_num: int
+    description: str = ""
+
+
+class SignalGenerator(ABC):
+    """
+    Abstract base class for signal generators.
+
+    Subclasses set the ``mode`` class attribute and implement ``generate``.
+    """
+
+    # Strategy identifier; overridden by every subclass.
+    mode: str = "base"
+
+    def __init__(self, **params):
+        """
+        Store the generator parameters and build its metadata.
+
+        Args:
+            **params: Signal parameters. ``select_num`` (default 1) is
+                copied into the metadata.
+        """
+        self._params = params
+        self._meta = SignalMeta(
+            mode=self.mode,
+            select_num=params.get('select_num', 1),
+            description=self.__doc__ or ""
+        )
+
+    @abstractmethod
+    def generate(self, factor_data: pd.DataFrame) -> pd.DataFrame:
+        """
+        Generate trading signals.
+
+        Args:
+            factor_data: DataFrame holding the factor values.
+
+        Returns:
+            DataFrame containing the signal column(s).
+        """
+
+    @property
+    def params(self) -> Dict[str, Any]:
+        """Return the parameters the generator was created with."""
+        return self._params
+
+    @property
+    def meta(self) -> SignalMeta:
+        """Return the generator metadata."""
+        return self._meta
+
+    def _get_factor_columns(self, data: pd.DataFrame) -> List[str]:
+        """
+        Return the columns of ``data`` that hold factor values.
+
+        Bookkeeping/price columns and columns whose name ends in
+        ``_weighted`` are skipped.
+        """
+        return [
+            col for col in data.columns
+            if col not in _NON_FACTOR_COLUMNS
+            and not col.endswith('_weighted')
+        ]
+
+    def __repr__(self) -> str:
+        return f"{self.__class__.__name__}(mode={self.mode}, params={self._params})"
+
+
+class TopNSelector(SignalGenerator):
+    """
+    Top-N selector for rotation strategies.
+
+    Ranks the factor columns on every row and selects the best ``select_num``
+    codes. With grouping enabled, the best ``top_per_group`` codes of each
+    group compete for the final ``select_num`` slots.
+
+    Parameters:
+    - select_num: number of codes to select (default 3)
+    - group_by: any truthy value enables grouped selection; the grouping
+      itself is read from the ``group_info`` column ({code: group} per row)
+    - top_per_group: finalists kept per group (default 1)
+    - min_score: minimum score required to be eligible (optional)
+    """
+
+    mode = "top_n"
+
+    def __init__(
+        self,
+        select_num: int = 3,
+        group_by: Optional[str] = None,
+        top_per_group: int = 1,
+        min_score: Optional[float] = None
+    ):
+        super().__init__(
+            select_num=select_num,
+            group_by=group_by,
+            top_per_group=top_per_group,
+            min_score=min_score
+        )
+        self.select_num = select_num
+        self.group_by = group_by
+        self.top_per_group = top_per_group
+        self.min_score = min_score
+
+    def generate(self, factor_data: pd.DataFrame) -> pd.DataFrame:
+        """
+        Generate Top-N selection signals.
+
+        Args:
+            factor_data: One row per date and one column per code. An
+                optional ``group_info`` column holds a {code: group} dict.
+
+        Returns:
+            DataFrame with ``signal`` (comma-separated codes shifted one row
+            for T+1 execution, so the first row is NaN) and ``signal_raw``
+            (the unshifted selection). Without any factor column only an
+            all-empty ``signal`` column is returned.
+        """
+        result = pd.DataFrame(index=factor_data.index)
+
+        factor_cols = self._get_factor_columns(factor_data)
+        if not factor_cols:
+            logger.warning("⚠ 未找到因子列")
+            result['signal'] = ''
+            return result
+
+        use_groups = bool(self.group_by) and 'group_info' in factor_data.columns
+
+        signals = []
+        # iterrows() walks rows by position, so duplicated index labels
+        # cannot turn a single-row lookup into a multi-row frame.
+        for _, row in factor_data.iterrows():
+            scores = {
+                col: row[col] for col in factor_cols if pd.notna(row[col])
+            }
+
+            # Compare with None: a threshold of 0 must still filter.
+            if self.min_score is not None:
+                scores = {
+                    code: score for code, score in scores.items()
+                    if score >= self.min_score
+                }
+
+            if use_groups:
+                # Compete inside each group first, then across groups.
+                selected = self._grouped_selection(scores, row)
+            else:
+                selected = self._global_top_n(scores)
+
+            # Signal format: comma-separated list of codes.
+            signals.append(','.join(selected))
+
+        result['signal'] = signals
+        result['signal_raw'] = signals  # unshifted selection
+
+        # T+1 execution: move every signal one row later.
+        result['signal'] = result['signal'].shift(1)
+
+        return result
+
+    def _global_top_n(self, scores: Dict[str, float]) -> List[str]:
+        """Return the ``select_num`` best codes, highest score first."""
+        ranked = sorted(scores.items(), key=lambda item: item[1], reverse=True)
+        return [code for code, _ in ranked[:self.select_num]]
+
+    def _grouped_selection(
+        self,
+        scores: Dict[str, float],
+        row: pd.Series
+    ) -> List[str]:
+        """
+        Select within each group first, then rank the finalists globally.
+
+        Args:
+            scores: Eligible {code: score} pairs of the current row.
+            row: Current row; its ``group_info`` entry maps code -> group.
+
+        Returns:
+            Up to ``select_num`` codes, highest score first.
+        """
+        if 'group_info' not in row.index:
+            return self._global_top_n(scores)
+
+        group_info = row['group_info']
+        if isinstance(group_info, dict):
+            groups = group_info
+        elif pd.api.types.is_scalar(group_info) and pd.isna(group_info):
+            # No grouping on this row: fall back to the global ranking.
+            return self._global_top_n(scores)
+        else:
+            # Unsupported payload: every code lands in the default group.
+            groups = {}
+
+        members: Dict[Any, list] = {}
+        for code, score in scores.items():
+            group = groups.get(code, 'default')
+            members.setdefault(group, []).append((code, score))
+
+        # Keep the best ``top_per_group`` codes of every group.
+        finalists = {}
+        for ranked in members.values():
+            ranked.sort(key=lambda item: item[1], reverse=True)
+            finalists.update(ranked[:self.top_per_group])
+
+        return self._global_top_n(finalists)
+
+
+class TrendFollower(SignalGenerator):
+    """
+    Trend follower for trend-tracking strategies.
+
+    - trend strength > entry threshold -> entry flag
+    - trend strength < exit threshold -> exit flag
+
+    Parameters:
+    - entry_threshold: entry threshold (default 0.02)
+    - exit_threshold: exit threshold (default -0.02)
+    - select_num: maximum number of codes in the signal (default 1)
+    """
+
+    mode = "trend"
+
+    def __init__(
+        self,
+        entry_threshold: float = 0.02,
+        exit_threshold: float = -0.02,
+        select_num: int = 1
+    ):
+        super().__init__(
+            entry_threshold=entry_threshold,
+            exit_threshold=exit_threshold,
+            select_num=select_num
+        )
+        self.entry_threshold = entry_threshold
+        self.exit_threshold = exit_threshold
+        self.select_num = select_num
+
+    def generate(self, factor_data: pd.DataFrame) -> pd.DataFrame:
+        """
+        Generate trend-following signals.
+
+        Args:
+            factor_data: One column of trend strength per code.
+
+        Returns:
+            DataFrame with unshifted boolean ``<code>_entry`` and
+            ``<code>_exit`` columns plus ``signal``: up to ``select_num``
+            entering codes, strongest first, comma-separated and shifted
+            one row for T+1 execution.
+        """
+        result = pd.DataFrame(index=factor_data.index)
+
+        factor_cols = self._get_factor_columns(factor_data)
+
+        for col in factor_cols:
+            strength = factor_data[col]
+            result[f'{col}_entry'] = strength > self.entry_threshold
+            result[f'{col}_exit'] = strength < self.exit_threshold
+
+        signals = []
+        for _, row in factor_data[factor_cols].iterrows():
+            # NaN compares False, so missing values never enter.
+            entering = [
+                (code, strength) for code, strength in row.items()
+                if strength > self.entry_threshold
+            ]
+            entering.sort(key=lambda item: item[1], reverse=True)
+            signals.append(
+                ','.join(code for code, _ in entering[:self.select_num])
+            )
+
+        result['signal'] = signals
+        result['signal'] = result['signal'].shift(1)  # T+1 execution
+
+        return result
+
+
+class ReversalTrader(SignalGenerator):
+    """
+    Reversal trader for reversal strategies.
+
+    - factor > reversal_threshold -> buy flag (oversold reversal)
+    - factor < -reversal_threshold -> sell flag (overbought reversal)
+
+    Parameters:
+    - overbought: overbought level (default 70); stored, unused by generate
+    - oversold: oversold level (default 30); stored, unused by generate
+    - reversal_threshold: reversal strength threshold (default 0.1)
+    """
+
+    mode = "reversal"
+
+    def __init__(
+        self,
+        overbought: float = 70,
+        oversold: float = 30,
+        reversal_threshold: float = 0.1
+    ):
+        super().__init__(
+            overbought=overbought,
+            oversold=oversold,
+            reversal_threshold=reversal_threshold
+        )
+        self.overbought = overbought
+        self.oversold = oversold
+        self.reversal_threshold = reversal_threshold
+
+    def generate(self, factor_data: pd.DataFrame) -> pd.DataFrame:
+        """
+        Generate reversal signals.
+
+        Args:
+            factor_data: One column of reversal strength per code.
+
+        Returns:
+            DataFrame with unshifted boolean ``<code>_buy`` and
+            ``<code>_sell`` columns plus ``signal``: 'BUY:code1,code2',
+            'SELL:code1' or '', shifted one row for T+1 execution. Buys
+            take precedence when a row has both buys and sells.
+        """
+        result = pd.DataFrame(index=factor_data.index)
+
+        factor_cols = self._get_factor_columns(factor_data)
+
+        for col in factor_cols:
+            reversal = factor_data[col]
+            result[f'{col}_buy'] = reversal > self.reversal_threshold
+            result[f'{col}_sell'] = reversal < -self.reversal_threshold
+
+        signals = []
+        for _, row in factor_data[factor_cols].iterrows():
+            buys = [
+                code for code, value in row.items()
+                if value > self.reversal_threshold
+            ]
+            sells = [
+                code for code, value in row.items()
+                if value < -self.reversal_threshold
+            ]
+
+            if buys:
+                signals.append(f"BUY:{','.join(buys)}")
+            elif sells:
+                signals.append(f"SELL:{','.join(sells)}")
+            else:
+                signals.append('')
+
+        result['signal'] = signals
+        result['signal'] = result['signal'].shift(1)  # T+1 execution
+
+        return result
diff --git a/framework/tests/test_signals.py b/framework/tests/test_signals.py
new file mode 100644
--- /dev/null
+++ b/framework/tests/test_signals.py
@@ -0,0 +1,250 @@
+"""
+Signal layer tests.
+
+Covers SignalGenerator, TopNSelector, TrendFollower and ReversalTrader.
+"""
+
+import pandas as pd
+import numpy as np
+import pytest
+
+from framework.signals import SignalGenerator, TopNSelector, TrendFollower, ReversalTrader
+
+
+class TestSignalGenerator:
+    """Tests for the signal generator base class."""
+
+    def test_signal_meta(self):
+        """Mode, parameters and metadata are exposed."""
+        selector = TopNSelector(select_num=3)
+        assert selector.mode == "top_n"
+        assert selector.params == {'select_num': 3, 'group_by': None, 'top_per_group': 1, 'min_score': None}
+        assert selector.meta.mode == "top_n"
+        assert selector.meta.select_num == 3
+
+    def test_signal_repr(self):
+        """repr() names the class and the mode."""
+        selector = TopNSelector(select_num=5)
+        repr_str = repr(selector)
+        assert "TopNSelector" in repr_str
+        assert "top_n" in repr_str
+
+    def test_base_class_is_abstract(self):
+        """The base class cannot be instantiated directly."""
+        with pytest.raises(TypeError):
+            SignalGenerator()
+
+
+class TestTopNSelector:
+    """Tests for the Top-N selector."""
+
+    def test_global_top_n(self):
+        """The two best codes are selected, one row late."""
+        dates = pd.date_range('2020-01-01', periods=10)
+
+        # Three codes with strictly decreasing scores.
+        factor_data = pd.DataFrame({
+            'code1': [5.0] * 10,
+            'code2': [3.0] * 10,
+            'code3': [1.0] * 10,
+        }, index=dates)
+
+        selector = TopNSelector(select_num=2)
+        result = selector.generate(factor_data)
+
+        assert 'signal' in result.columns
+
+        # The shift leaves the first row without a signal.
+        assert pd.isna(result['signal'].iloc[0])
+        assert result['signal_raw'].iloc[0] == 'code1,code2'
+
+        for signal in result['signal'].iloc[1:]:
+            assert signal == 'code1,code2'
+
+    def test_top_n_with_min_score(self):
+        """Codes below the minimum score are dropped."""
+        dates = pd.date_range('2020-01-01', periods=10)
+
+        factor_data = pd.DataFrame({
+            'code1': [5.0] * 10,
+            'code2': [2.0] * 10,  # below the threshold
+            'code3': [1.0] * 10,  # below the threshold
+        }, index=dates)
+
+        selector = TopNSelector(select_num=3, min_score=3.0)
+        result = selector.generate(factor_data)
+
+        for signal in result['signal'].iloc[1:]:
+            assert signal == 'code1'
+
+    def test_min_score_zero_still_filters(self):
+        """A threshold of 0 is a real threshold, not 'no filter'."""
+        dates = pd.date_range('2020-01-01', periods=3)
+
+        factor_data = pd.DataFrame({
+            'code1': [1.0] * 3,
+            'code2': [-1.0] * 3,  # negative, must be filtered out
+        }, index=dates)
+
+        selector = TopNSelector(select_num=2, min_score=0.0)
+        result = selector.generate(factor_data)
+
+        for signal in result['signal'].iloc[1:]:
+            assert signal == 'code1'
+
+    def _grouped_factor_data(self):
+        """Five codes in three groups, with a per-row {code: group} dict."""
+        dates = pd.date_range('2020-01-01', periods=5)
+        factor_data = pd.DataFrame({
+            'code1': [5.0] * 5,  # group A, best
+            'code2': [4.0] * 5,  # group A, second
+            'code3': [3.0] * 5,  # group B, best
+            'code4': [2.0] * 5,  # group B, second
+            'code5': [1.0] * 5,  # group C
+        }, index=dates)
+        group_info = {
+            'code1': 'A', 'code2': 'A',
+            'code3': 'B', 'code4': 'B',
+            'code5': 'C'
+        }
+        factor_data['group_info'] = [group_info] * 5
+        return factor_data
+
+    def test_grouped_selection(self):
+        """Group champions compete for the final slots."""
+        factor_data = self._grouped_factor_data()
+
+        selector = TopNSelector(select_num=2, group_by='market', top_per_group=1)
+        result = selector.generate(factor_data)
+
+        # code1 (champion of A) and code3 (champion of B) win.
+        for signal in result['signal'].iloc[1:]:
+            assert signal.split(',') == ['code1', 'code3']
+
+    def test_top_per_group(self):
+        """top_per_group controls how many codes each group sends on."""
+        factor_data = self._grouped_factor_data()
+
+        selector = TopNSelector(select_num=3, group_by='market', top_per_group=2)
+        result = selector.generate(factor_data)
+
+        for signal in result['signal'].iloc[1:]:
+            assert signal.split(',') == ['code1', 'code2', 'code3']
+
+    def test_empty_scores(self):
+        """All-NaN scores yield empty signals."""
+        dates = pd.date_range('2020-01-01', periods=5)
+
+        factor_data = pd.DataFrame({
+            'code1': [np.nan] * 5,
+            'code2': [np.nan] * 5,
+        }, index=dates)
+
+        selector = TopNSelector(select_num=2)
+        result = selector.generate(factor_data)
+
+        for signal in result['signal']:
+            assert pd.isna(signal) or signal == ''
+
+
+class TestTrendFollower:
+    """Tests for the trend follower."""
+
+    def test_trend_entry_signal(self):
+        """Only strengths above the entry threshold raise the entry flag."""
+        dates = pd.date_range('2020-01-01', periods=10)
+
+        factor_data = pd.DataFrame({
+            'code1': [0.03] * 10,  # > 0.02: enters
+            'code2': [0.01] * 10,  # < 0.02: stays out
+        }, index=dates)
+
+        follower = TrendFollower(entry_threshold=0.02, exit_threshold=-0.02)
+        result = follower.generate(factor_data)
+
+        assert result['code1_entry'].iloc[0]
+        assert not result['code2_entry'].iloc[0]
+
+    def test_trend_exit_signal(self):
+        """Strengths below the exit threshold raise the exit flag."""
+        dates = pd.date_range('2020-01-01', periods=10)
+
+        factor_data = pd.DataFrame({
+            'code1': [-0.03] * 10,  # < -0.02: exits
+            'code2': [0.01] * 10,
+        }, index=dates)
+
+        follower = TrendFollower(entry_threshold=0.02, exit_threshold=-0.02)
+        result = follower.generate(factor_data)
+
+        assert result['code1_exit'].iloc[0]
+        assert not result['code2_exit'].iloc[0]
+
+    def test_trend_signal_format(self):
+        """The signal lists the strongest entering codes first."""
+        dates = pd.date_range('2020-01-01', periods=5)
+
+        factor_data = pd.DataFrame({
+            'code1': [0.05] * 5,  # strong trend, enters
+            'code2': [0.03] * 5,  # medium trend, enters
+            'code3': [0.01] * 5,  # weak trend, stays out
+        }, index=dates)
+
+        follower = TrendFollower(entry_threshold=0.02, select_num=2)
+        result = follower.generate(factor_data)
+
+        for signal in result['signal'].iloc[1:]:
+            assert signal == 'code1,code2'
+
+
+class TestReversalTrader:
+    """Tests for the reversal trader."""
+
+    def test_reversal_buy_signal(self):
+        """Values above the threshold raise the buy flag."""
+        dates = pd.date_range('2020-01-01', periods=10)
+
+        factor_data = pd.DataFrame({
+            'code1': [0.2] * 10,  # > 0.1: buy
+            'code2': [0.05] * 10,  # inside the band: no flag
+        }, index=dates)
+
+        trader = ReversalTrader(reversal_threshold=0.1)
+        result = trader.generate(factor_data)
+
+        assert result['code1_buy'].iloc[0]
+        assert not result['code2_buy'].iloc[0]
+
+    def test_reversal_sell_signal(self):
+        """Values below the negated threshold raise the sell flag."""
+        dates = pd.date_range('2020-01-01', periods=10)
+
+        factor_data = pd.DataFrame({
+            'code1': [-0.2] * 10,  # < -0.1: sell
+            'code2': [0.05] * 10,
+        }, index=dates)
+
+        trader = ReversalTrader(reversal_threshold=0.1)
+        result = trader.generate(factor_data)
+
+        assert result['code1_sell'].iloc[0]
+        assert not result['code2_sell'].iloc[0]
+
+    def test_reversal_signal_format(self):
+        """Buys win when a row has both buy and sell candidates."""
+        dates = pd.date_range('2020-01-01', periods=5)
+
+        factor_data = pd.DataFrame({
+            'code1': [0.15] * 5,  # buy candidate
+            'code2': [-0.15] * 5,  # sell candidate
+        }, index=dates)
+
+        trader = ReversalTrader(reversal_threshold=0.1)
+        result = trader.generate(factor_data)
+
+        for signal in result['signal'].iloc[1:]:
+            assert signal == 'BUY:code1'
+
+
+if __name__ == '__main__':
+    pytest.main([__file__, '-v'])