diff --git a/factor_mining/gp_miner.py b/factor_mining/gp_miner.py index 7302f01..e71ef3f 100644 --- a/factor_mining/gp_miner.py +++ b/factor_mining/gp_miner.py @@ -1,6 +1,7 @@ """ DEAP遗传编程挖掘器实现 """ + import random import operator from typing import List, Tuple, Optional @@ -17,6 +18,7 @@ from data import compute_forward_returns @dataclass class GPConfig(MiningConfig): """GP挖掘配置""" + population_size: int = 200 generations: int = 30 tournament_size: int = 5 @@ -30,144 +32,142 @@ class GPConfig(MiningConfig): class GPMiner(FactorMiner): """DEAP遗传编程挖掘器""" - + def __init__(self, config: GPConfig): super().__init__(config) self.config: GPConfig = config self.toolbox: Optional[base.Toolbox] = None self.pset: Optional[gp.PrimitiveSetTyped] = None self.features: Optional[List[pd.Series]] = None - + def get_name(self) -> str: return "gp" - + def _build_pset(self, feature_names: List[str]) -> gp.PrimitiveSetTyped: """构建GP原始集合""" registry = get_registry() - pset = gp.PrimitiveSetTyped("MAIN", [np.ndarray for _ in feature_names], np.ndarray) - + pset = gp.PrimitiveSetTyped( + "MAIN", [np.ndarray for _ in feature_names], np.ndarray + ) + # 命名参数 for i, name in enumerate(feature_names): pset.renameArguments(**{f"ARG{i}": name}) - + # 添加算子 for op_name in registry.list_all(): op = registry.get(op_name) if op: sig = op.get_signature() params = list(sig.parameters.values()) - + # 根据参数数量判断是一元还是二元算子 if len(params) == 1: # 一元算子 pset.addPrimitive(op.func, [np.ndarray], np.ndarray, name=op_name) elif len(params) == 2: # 二元算子 - pset.addPrimitive(op.func, [np.ndarray, np.ndarray], np.ndarray, name=op_name) - + pset.addPrimitive( + op.func, [np.ndarray, np.ndarray], np.ndarray, name=op_name + ) + # 添加常量 - def _const() -> np.ndarray: - return np.array(random.uniform(-2.0, 2.0)) - pset.addEphemeralConstant("const", _const, np.ndarray) - + # def _const() -> np.ndarray: + # return np.array(random.uniform(-2.0, 2.0)) + # pset.addEphemeralConstant("const", _const, np.ndarray) + return pset - - def _evaluate_individual( - self, - individual, - target: pd.Series - ) -> Tuple[float]: + + def _evaluate_individual(self, individual, target: pd.Series) -> Tuple[float]: """评估个体适应度""" func = self.toolbox.compile(expr=individual) - + # 构建特征矩阵 idx = target.index inputs = [f.reindex(idx).to_numpy() for f in self.features] - + try: raw = func(*inputs) except Exception: return (-1e6,) - + # 确保数组长度 if not isinstance(raw, np.ndarray): return (-1e6,) if raw.shape[0] != len(idx): return (-1e6,) - + # 转换为Series并清理 factor = pd.Series(raw, index=idx) factor = factor.replace([np.inf, -np.inf], np.nan) factor = factor.ffill().bfill() - + # 计算滚动IC window = self.config.ic_window if len(factor) < window + 10: return (-1e6,) - + from validation import compute_rolling_ic - ic_series = compute_rolling_ic(factor, target, window=window, method=self.config.ic_method) + + ic_series = compute_rolling_ic( + factor, target, window=window, method=self.config.ic_method + ) mean_ic = ic_series.mean() - + if not np.isfinite(mean_ic): return (-1e6,) - + # 复杂度惩罚 complexity = len(individual) fitness = mean_ic - self.config.complexity_penalty * complexity - + if not np.isfinite(fitness): fitness = -1e6 - + return (fitness,) - + def _individual_to_formula( - self, - individual, - feature_names: List[str] + self, individual, feature_names: List[str] ) -> FactorFormula: """将GP个体转换为因子公式""" # GP表达式是PrimitiveTree,转换为字符串后是函数调用形式 # 例如: "add(ARG0, ARG1)" 或 "mul(add(ARG0, ARG1), const)" expr_str = str(individual) - + # 替换ARG0, ARG1等为实际特征名 for i, name in enumerate(feature_names): expr_str = expr_str.replace(f"ARG{i}", name) - + # GP表达式已经是Python可执行的函数调用格式 # 例如: "add(close, open)" 可以直接eval # 但需要确保所有算子都在环境中可用 - + return FactorFormula(expr_str, feature_names) - + def mine( - self, - data: pd.DataFrame, - feature_cols: List[str], - price_col: str = "close" + self, data: pd.DataFrame, feature_cols: List[str], price_col: str = "close" ) -> List[FactorFormula]: """执行GP挖掘""" if self.config.seed is not None: random.seed(self.config.seed) np.random.seed(self.config.seed) - + # 准备数据 price = data[price_col].astype(float) forward_ret = compute_forward_returns(price, self.config.ret_horizon) target = forward_ret - + self.features = [data[c].astype(float) for c in feature_cols] - + # 构建原始集合 self.pset = self._build_pset(feature_cols) - + # 创建DEAP类型 if not hasattr(creator, "FitnessMax"): creator.create("FitnessMax", base.Fitness, weights=(1.0,)) if not hasattr(creator, "Individual"): creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMax) - + # 构建工具箱 self.toolbox = base.Toolbox() self.toolbox.register( @@ -175,38 +175,46 @@ class GPMiner(FactorMiner): gp.genHalfAndHalf, pset=self.pset, min_=1, - max_=self.config.max_depth_init + max_=self.config.max_depth_init, ) - self.toolbox.register("individual", tools.initIterate, creator.Individual, self.toolbox.expr) - self.toolbox.register("population", tools.initRepeat, list, self.toolbox.individual) - self.toolbox.register("compile", gp.compile, pset=self.pset) - self.toolbox.register( - "evaluate", - self._evaluate_individual, - target=target + "individual", tools.initIterate, creator.Individual, self.toolbox.expr ) - + self.toolbox.register( + "population", tools.initRepeat, list, self.toolbox.individual + ) + self.toolbox.register("compile", gp.compile, pset=self.pset) + + self.toolbox.register("evaluate", self._evaluate_individual, target=target) + # 遗传算子 - self.toolbox.register("select", tools.selTournament, tournsize=self.config.tournament_size) + self.toolbox.register( + "select", tools.selTournament, tournsize=self.config.tournament_size + ) self.toolbox.register("mate", gp.cxOnePoint) self.toolbox.register("expr_mut", gp.genFull, min_=0, max_=2) - self.toolbox.register("mutate", gp.mutUniform, expr=self.toolbox.expr_mut, pset=self.pset) - + self.toolbox.register( + "mutate", gp.mutUniform, expr=self.toolbox.expr_mut, pset=self.pset + ) + # 控制树深度 self.toolbox.decorate( "mate", - gp.staticLimit(key=operator.attrgetter("height"), max_value=self.config.max_depth) + gp.staticLimit( + key=operator.attrgetter("height"), max_value=self.config.max_depth + ), ) self.toolbox.decorate( "mutate", - gp.staticLimit(key=operator.attrgetter("height"), max_value=self.config.max_depth) + gp.staticLimit( + key=operator.attrgetter("height"), max_value=self.config.max_depth + ), ) - + # 运行进化 pop = self.toolbox.population(n=self.config.population_size) - hof = tools.HallOfFame(maxsize=max(5, self.config.elitism)) - + hof = tools.HallOfFame(maxsize=max(5000, self.config.elitism)) + stats_fit = tools.Statistics(lambda ind: ind.fitness.values[0]) stats_size = tools.Statistics(len) mstats = tools.MultiStatistics(fitness=stats_fit, size=stats_size) @@ -214,7 +222,7 @@ class GPMiner(FactorMiner): mstats.register("std", np.nanstd) mstats.register("min", np.nanmin) mstats.register("max", np.nanmax) - + pop, logbook = algorithms.eaSimple( pop, self.toolbox, @@ -225,12 +233,11 @@ class GPMiner(FactorMiner): halloffame=hof, verbose=True, ) - + # 转换为因子公式 formulas = [] for individual in hof: formula = self._individual_to_formula(individual, feature_cols) formulas.append(formula) - - return formulas + return formulas diff --git a/factor_mining/operators.py b/factor_mining/operators.py index 0707b17..2b31ada 100644 --- a/factor_mining/operators.py +++ b/factor_mining/operators.py @@ -9,6 +9,8 @@ from typing import Dict, Callable, List, Optional, Any from abc import ABC, abstractmethod import inspect +import talib + class Operator(ABC): """算子基类""" @@ -99,6 +101,9 @@ def get_registry() -> OperatorRegistry: return _registry +# 定义period参数的值范围 +PERIOD_RANGE = range(10, 100) # 10到99 + # ==================== 基础数学算子 ==================== @@ -153,8 +158,6 @@ def _pow(x: np.ndarray, y: np.ndarray) -> np.ndarray: # ==================== 时间序列算子 ==================== - - def _rolling_mean(x: np.ndarray, window: int) -> np.ndarray: s = pd.Series(x) return s.rolling(window, min_periods=max(2, window // 2)).mean().to_numpy() @@ -184,8 +187,20 @@ def _delay(x: np.ndarray, period: int) -> np.ndarray: return s.shift(period).to_numpy() +def _pct_change(x: np.ndarray, period: int = 1) -> np.ndarray: + """百分比变化""" + s = pd.Series(x) + return s.pct_change(periods=period, fill_method=None).to_numpy() + + +# 注册单参数百分比变化算子 +@register_operator("pct", "百分比变化: PCT(x, 1)") +def _pct(x: np.ndarray) -> np.ndarray: + return _pct_change(x, 1) + + # 注册时间序列算子(带不同窗口) -for w in (3, 6, 12, 24, 48, 96): +for w in PERIOD_RANGE: _registry.register_function( f"sma{w}", lambda x, w=w: _rolling_mean(x, w), f"简单移动平均: SMA(x, {w})" ) @@ -203,6 +218,347 @@ for w in (3, 6, 12, 24, 48, 96): ) +# ==================== 技术指标算子(含自定义与ta-lib)==================== + + +def _try_float(x): + try: + return float(x) + except Exception: + return x + + +def _convert_input(v): + # 如果是pd.Series,返回np.ndarray; 如果已经是np.ndarray则原样返回 + if isinstance(v, pd.Series): + return v.values + return v + + +# 注册 ta-lib 技术指标 +# 获取 TA-Lib 的所有函数名(常用financial indicators均为大写) +talib_func_list = [f for f in dir(talib) if f.isupper() and callable(getattr(talib, f))] + +# 定义需要生成多版本的参数名(period相关参数) +# 按优先级排序,优先匹配主要的period参数 +PERIOD_PARAM_NAMES = [ + "timeperiod", # 最常见的参数名 + "period", # 通用period参数 + "optintimeperiod", # TA-Lib内部参数名 +] + +# 多period参数的函数(需要特殊处理) +# 对于这些函数,明确指定主要period参数,避免自动检测错误 +MULTI_PERIOD_FUNCTIONS = { + # 函数名: (主要period参数名, 次要period参数列表,仅用于文档) + "MACD": ("fastperiod", ["slowperiod", "signalperiod"]), + "MACDEXT": ("fastperiod", ["slowperiod", "signalperiod"]), + "MACDFIX": ("signalperiod", []), + "STOCH": ("fastk_period", ["slowk_period", "slowd_period"]), + "STOCHF": ("fastk_period", ["fastd_period"]), + "STOCHRSI": ("timeperiod", ["fastk_period", "fastd_period"]), + "BBANDS": ("timeperiod", ["nbdevup", "nbdevdn"]), + "APO": ("fastperiod", ["slowperiod"]), + "PPO": ("fastperiod", ["slowperiod"]), + "ULTOSC": ("timeperiod1", ["timeperiod2", "timeperiod3"]), + "BOP": ("", []), # 无period参数,注册默认版本 +} + + +def build_talib_wrapper(func, func_name, fixed_params=None): + """构建talib函数包装器,支持固定某些参数""" + fixed_params = fixed_params or {} + + def _talib_wrap(*args, **kwargs): + # 合并固定参数和传入参数 + merged_kwargs = {**fixed_params, **kwargs} + # ta-lib 有些函数只支持关键字参数 + # 自动转换所有输入类型 + args = tuple(_convert_input(arg) for arg in args) + for k in merged_kwargs: + merged_kwargs[k] = _convert_input(merged_kwargs[k]) + result = func(*args, **merged_kwargs) + # TA-Lib有些输出是tuple(比如MACD),统一返回ndarray/tuple[ndarray] + if isinstance(result, tuple): + # 保持tuple结构 + return tuple( + np.asarray(item) if item is not None else None for item in result + ) + return np.asarray(result) + + _talib_wrap.__name__ = f"talib_{func_name.lower()}" + return _talib_wrap + + +for func_name in talib_func_list: + func = getattr(talib, func_name) + sig = inspect.signature(func) + params = sig.parameters + + # 检查是否在特殊配置字典中 + if func_name in MULTI_PERIOD_FUNCTIONS: + main_period_param, _ = MULTI_PERIOD_FUNCTIONS[func_name] + # 如果配置中指定了主要period参数,使用它 + if main_period_param and main_period_param in params: + for period_value in PERIOD_RANGE: + fixed_params = {main_period_param: period_value} + wrapper = build_talib_wrapper(func, func_name, fixed_params) + op_name = f"talib_{func_name.lower()}_{period_value}" + desc = f"ta-lib: {func_name}({main_period_param}={period_value})" + _registry.register_function(op_name, wrapper, desc) + else: + # 配置中指定无period参数,注册默认版本 + wrapper = build_talib_wrapper(func, func_name) + op_name = f"talib_{func_name.lower()}" + desc = f"ta-lib: {func_name}" + _registry.register_function(op_name, wrapper, desc) + else: + # 不在特殊配置中,自动检测period参数 + period_params = {} + for param_name, param in params.items(): + param_lower = param_name.lower() + # 检查是否是period相关参数 + if any( + period_keyword in param_lower for period_keyword in PERIOD_PARAM_NAMES + ): + period_params[param_name] = param + + if period_params: + # 如果有period参数,为每个period值生成一个版本 + # 优先选择timeperiod,否则选择第一个 + main_period_param = None + for preferred in ["timeperiod", "period", "optintimeperiod"]: + for param_name in period_params.keys(): + if preferred in param_name.lower(): + main_period_param = param_name + break + if main_period_param: + break + + if not main_period_param: + main_period_param = list(period_params.keys())[0] + + for period_value in PERIOD_RANGE: + fixed_params = {main_period_param: period_value} + wrapper = build_talib_wrapper(func, func_name, fixed_params) + op_name = f"talib_{func_name.lower()}_{period_value}" + desc = f"ta-lib: {func_name}({main_period_param}={period_value})" + _registry.register_function(op_name, wrapper, desc) + else: + # 如果没有period参数,注册默认版本 + wrapper = build_talib_wrapper(func, func_name) + op_name = f"talib_{func_name.lower()}" + desc = f"ta-lib: {func_name}" + _registry.register_function(op_name, wrapper, desc) + +# ==================== 自定义常见技术指标 ==================== + + +def _ewm_forward(x: np.ndarray, alpha: float) -> np.ndarray: + """指数加权移动平均(前向计算)""" + result = np.zeros_like(x) + if len(x) == 0: + return result + result[0] = x[0] + for i in range(1, len(x)): + result[i] = x[i] * alpha + (1 - alpha) * result[i - 1] + return result + + +def _rsv(x: np.ndarray, window: int) -> np.ndarray: + """相对强弱值: (当前值 - 最小值) / (最大值 - 最小值)""" + s = pd.Series(x) + rolling = s.rolling(window, min_periods=max(2, window // 2), closed="both") + min_val = rolling.min() + max_val = rolling.max() + diff = max_val - min_val + # 避免除零 + diff = np.where(np.abs(diff) < 1e-12, np.nan, diff) + result = (s - min_val) / diff + return result.to_numpy() + + +def _bband(x: np.ndarray, window: int) -> np.ndarray: + """布林带指标: (当前值 - 均值) / 标准差""" + s = pd.Series(x) + rolling = s.rolling(window, min_periods=max(2, window // 2), closed="both") + mean_val = rolling.mean() + std_val = rolling.std() + # 避免除零 + std_val = np.where(np.abs(std_val) < 1e-12, np.nan, std_val) + result = (s - mean_val) / std_val + return result.to_numpy() + + +def _rsi(x: np.ndarray, window: int, threshold: float = 0.00001) -> np.ndarray: + """相对强弱指标: 上涨和下跌的比例""" + s = pd.Series(x) + diff = s.diff() + rolling = diff.rolling(window, min_periods=max(2, window // 2), closed="both") + + def _rsi_calc(series): + up_sum = series[series > threshold].sum() + down_sum = abs(series[series < -threshold].sum()) + total = up_sum + down_sum + if total < 1e-12: + return np.nan + return up_sum / total + + result = rolling.apply(_rsi_calc, raw=False) + return result.to_numpy() + + +def _rolling_skew(x: np.ndarray, window: int) -> np.ndarray: + """滚动偏度""" + s = pd.Series(x) + return ( + s.rolling(window, min_periods=max(2, window // 2), closed="both") + .skew() + .to_numpy() + ) + + +def _rolling_kurtosis(x: np.ndarray, window: int) -> np.ndarray: + """滚动峰度""" + s = pd.Series(x) + return ( + s.rolling(window, min_periods=max(2, window // 2), closed="both") + .kurt() + .to_numpy() + ) + + +def _rolling_linear(x: np.ndarray, window: int) -> np.ndarray: + """滚动线性回归斜率""" + s = pd.Series(x) + + def _linear_slope(series): + valid = series.dropna() + if len(valid) < 2: + return np.nan + try: + coeffs = np.polyfit(np.arange(len(valid)), valid.values, 1) + return coeffs[0] + except: + return np.nan + + result = s.rolling(window, min_periods=max(2, window // 2), closed="both").apply( + _linear_slope, raw=False + ) + return result.to_numpy() + + +def _rolling_autocorr(x: np.ndarray, window: int, lag: int = 1) -> np.ndarray: + """滚动自相关""" + s = pd.Series(x) + result = s.rolling(window, min_periods=max(2, window // 2), closed="both").apply( + lambda series: ( + series.autocorr(lag=lag) if len(series.dropna()) >= 2 else np.nan + ), + raw=False, + ) + return result.to_numpy() + + +def _rolling_max(x: np.ndarray, window: int) -> np.ndarray: + """滚动最大值""" + s = pd.Series(x) + return ( + s.rolling(window, min_periods=max(2, window // 2), closed="both") + .max() + .to_numpy() + ) + + +def _rolling_min(x: np.ndarray, window: int) -> np.ndarray: + """滚动最小值""" + s = pd.Series(x) + return ( + s.rolling(window, min_periods=max(2, window // 2), closed="both") + .min() + .to_numpy() + ) + + +def _huanbi(x: np.ndarray, window: int) -> np.ndarray: + """环比: 当前值 / 窗口起始值""" + s = pd.Series(x) + + def _huanbi_calc(series): + if len(series) < 2: + return np.nan + start_val = series.iloc[0] + end_val = series.iloc[-1] + if abs(start_val) < 1e-12: + return np.nan + return end_val / start_val + + result = s.rolling(window, min_periods=max(2, window // 2), closed="both").apply( + _huanbi_calc, raw=False + ) + return result.to_numpy() + + +# 注册技术指标算子(带不同窗口) +for w in PERIOD_RANGE: + # EWM算子(使用固定alpha值) + alpha = 2.0 / (w + 1) + _registry.register_function( + f"ewm{w}", + lambda x, w=w, a=alpha: _ewm_forward(x, a), + f"指数加权移动平均: EWM(x, {w})", + ) + + # 百分比变化 + _registry.register_function( + f"pct{w}", lambda x, w=w: _pct_change(x, w), f"百分比变化: PCT(x, {w})" + ) + + # RSV(相对强弱值) + _registry.register_function( + f"rsv{w}", lambda x, w=w: _rsv(x, w), f"相对强弱值: RSV(x, {w})" + ) + + # 布林带 + _registry.register_function( + f"bband{w}", lambda x, w=w: _bband(x, w), f"布林带指标: BBAND(x, {w})" + ) + + # RSI + _registry.register_function( + f"rsi{w}", lambda x, w=w: _rsi(x, w), f"相对强弱指标: RSI(x, {w})" + ) + + # 统计量 + _registry.register_function( + f"skew{w}", lambda x, w=w: _rolling_skew(x, w), f"滚动偏度: SKEW(x, {w})" + ) + _registry.register_function( + f"kurt{w}", lambda x, w=w: _rolling_kurtosis(x, w), f"滚动峰度: KURT(x, {w})" + ) + _registry.register_function( + f"linear{w}", + lambda x, w=w: _rolling_linear(x, w), + f"滚动线性斜率: LINEAR(x, {w})", + ) + _registry.register_function( + f"autocorr{w}", + lambda x, w=w: _rolling_autocorr(x, w), + f"滚动自相关: AUTOCORR(x, {w})", + ) + _registry.register_function( + f"max{w}", lambda x, w=w: _rolling_max(x, w), f"滚动最大值: MAX(x, {w})" + ) + _registry.register_function( + f"min{w}", lambda x, w=w: _rolling_min(x, w), f"滚动最小值: MIN(x, {w})" + ) + + # 环比 + _registry.register_function( + f"huanbi{w}", lambda x, w=w: _huanbi(x, w), f"环比: HUANBI(x, {w})" + ) + + # ==================== 因子公式解析与计算 ==================== diff --git a/factors.py b/factors.py deleted file mode 100644 index 44abb5b..0000000 --- a/factors.py +++ /dev/null @@ -1,113 +0,0 @@ -""" -因子挖掘模块:支持规则因子和遗传编程因子 -""" -import numpy as np -import pandas as pd -from typing import Callable, Dict, List, Optional -from abc import ABC, abstractmethod - - -class BaseFactor(ABC): - """因子基类""" - - def __init__(self, name: str): - self.name = name - - @abstractmethod - def compute(self, data: pd.DataFrame) -> pd.Series: - """计算因子值""" - pass - - -class RuleFactor(BaseFactor): - """规则因子:基于固定规则""" - - def __init__(self, name: str, compute_func: Callable[[pd.DataFrame], pd.Series]): - super().__init__(name) - self.compute_func = compute_func - - def compute(self, data: pd.DataFrame) -> pd.Series: - return self.compute_func(data) - - -def create_trend_factor(data: pd.DataFrame) -> pd.Series: - """趋势因子:价格趋势方向""" - trend = pd.Series(0, index=data.index) - trend[data['close'] > data['ema16']] = 1 - trend[data['close'] < data['ema4']] = -1 - return trend - - -def create_volatility_factor(data: pd.DataFrame) -> pd.Series: - """波动率因子:滚动12期收益率标准差""" - return data['volatility'] - - -def create_volume_price_factor(data: pd.DataFrame) -> pd.Series: - """量价因子:成交量放大且价格上涨""" - volume_signal = (data['volume'] > data['volume_ma6']).astype(int) - return volume_signal * data['return'] - - -def create_reversal_factor(data: pd.DataFrame) -> pd.Series: - """反转因子:短期反转效应""" - return -data['return'].shift(1) - - -def create_momentum_factor(data: pd.DataFrame) -> pd.Series: - """动量因子:基于MACD""" - return data['macd'] - - -def create_rsi_factor(data: pd.DataFrame) -> pd.Series: - """RSI因子:相对强弱指数(标准化)""" - return (data['rsi'] - 50) / 50 # 归一化到[-1, 1] - - -class FactorMiner: - """因子挖掘器""" - - def __init__(self): - self.factors: Dict[str, BaseFactor] = {} - - def register_factor(self, factor: BaseFactor): - """注册因子""" - self.factors[factor.name] = factor - - def register_rule_factor(self, name: str, compute_func: Callable): - """注册规则因子""" - factor = RuleFactor(name, compute_func) - self.register_factor(factor) - - def compute_all_factors(self, data: pd.DataFrame) -> pd.DataFrame: - """计算所有因子""" - factor_df = pd.DataFrame(index=data.index) - - for name, factor in self.factors.items(): - try: - factor_df[name] = factor.compute(data) - except Exception as e: - print(f"计算因子 {name} 时出错: {e}") - factor_df[name] = np.nan - - return factor_df - - def get_factor(self, name: str) -> Optional[BaseFactor]: - """获取指定因子""" - return self.factors.get(name) - - -def create_default_factors() -> FactorMiner: - """创建默认因子集合""" - miner = FactorMiner() - - # 注册基础因子 - miner.register_rule_factor('TREND', create_trend_factor) - miner.register_rule_factor('VOL', create_volatility_factor) - miner.register_rule_factor('VOLP', create_volume_price_factor) - miner.register_rule_factor('REV', create_reversal_factor) - miner.register_rule_factor('MOM', create_momentum_factor) - miner.register_rule_factor('RSI', create_rsi_factor) - - return miner - diff --git a/signal.py b/signal.py deleted file mode 100644 index 5753ad8..0000000 --- a/signal.py +++ /dev/null @@ -1,109 +0,0 @@ -""" -信号生成模块 -""" -import numpy as np -import pandas as pd -from typing import Optional, TYPE_CHECKING - -if TYPE_CHECKING: - from pandas import Series - - -def generate_signals( - score: 'pd.Series', - buy_threshold: float = 0.8, - sell_threshold: float = -0.8, - window: int = 30, - use_rolling_std: bool = True -) -> 'pd.Series': - """ - 基于因子得分生成买卖信号 - - Parameters: - ----------- - score : Series - 因子综合得分 - buy_threshold : float - 买入阈值(标准差倍数) - sell_threshold : float - 卖出阈值(标准差倍数) - window : int - 滚动窗口(用于计算标准差) - use_rolling_std : bool - 是否使用滚动标准差 - - Returns: - -------- - Series: 交易信号(1=买入,-1=卖出,0=持有) - """ - signals = pd.Series(0, index=score.index) - - if use_rolling_std: - # 使用滚动标准差 - rolling_std = score.rolling(window).std() - buy_line = buy_threshold * rolling_std - sell_line = sell_threshold * rolling_std - else: - # 使用固定阈值 - std = score.std() - buy_line = buy_threshold * std - sell_line = sell_threshold * std - - # 生成原始信号 - raw_signals = pd.Series(0, index=score.index) - raw_signals[score > buy_line] = 1 # 买入信号 - raw_signals[score < sell_line] = -1 # 卖出信号 - - # 只在信号变化时产生交易信号,其他时候保持持仓状态 - signals = pd.Series(0, index=score.index) - position = 0 # 当前持仓状态:0=空仓,1=满仓 - - for i in range(len(raw_signals)): - current_signal = raw_signals.iloc[i] - - # 只在信号变化时产生交易 - if current_signal == 1 and position == 0: - signals.iloc[i] = 1 # 买入 - position = 1 - elif current_signal == -1 and position == 1: - signals.iloc[i] = -1 # 卖出 - position = 0 - # 其他情况保持当前持仓状态,不产生交易信号 - - return signals.astype(int) - - -def generate_signals_with_position( - score: 'pd.Series', - buy_threshold: float = 0.8, - sell_threshold: float = -0.8, - window: int = 30, - current_position: int = 0 -) -> 'pd.Series': - """ - 生成信号(考虑当前持仓状态) - - Parameters: - ----------- - current_position : int - 当前持仓:0=空仓,1=满仓 - """ - raw_signals = generate_signals(score, buy_threshold, sell_threshold, window) - signals = pd.Series(0, index=score.index) - - position = current_position - - for i in range(len(raw_signals)): - signal = raw_signals.iloc[i] - - if signal == 1 and position == 0: - signals.iloc[i] = 1 # 买入 - position = 1 - elif signal == -1 and position == 1: - signals.iloc[i] = -1 # 卖出 - position = 0 - else: - signals.iloc[i] = 0 # 持有 - - return signals - diff --git a/validation.py b/validation.py index adf7990..439adc4 100644 --- a/validation.py +++ b/validation.py @@ -1,63 +1,44 @@ """ -因子检验模块:IC检验、分组回测、因子跨度回归 +因子检验模块: IC检验、分组回测、因子跨度回归 """ + import numpy as np import pandas as pd from typing import Dict, List, Tuple from statsmodels.regression.linear_model import OLS -def compute_ic(factor: pd.Series, forward_return: pd.Series, method: str = 'spearman') -> pd.Series: - """ - 计算IC(信息系数) - - Parameters: - ----------- - factor : Series - 因子值 - forward_return : Series - 未来收益率 - method : str - 相关性计算方法:'spearman' 或 'pearson' - """ - aligned = pd.concat([factor, forward_return], axis=1).dropna() - if len(aligned) < 10: - return pd.Series(dtype=float) - - if method == 'spearman': - ic = aligned.iloc[:, 0].rank().corr(aligned.iloc[:, 1].rank()) - else: - ic = aligned.iloc[:, 0].corr(aligned.iloc[:, 1]) - - return pd.Series([ic], index=[aligned.index[-1]]) - - def compute_rolling_ic( factor: pd.Series, forward_return: pd.Series, window: int = 30, - method: str = 'spearman' + method: str = "spearman", ) -> pd.Series: - """计算滚动IC(向量化优化)""" + """计算滚动IC (向量化优化)""" # 对齐数据 aligned = pd.concat([factor, forward_return], axis=1).dropna() if len(aligned) < window: return pd.Series(dtype=float, index=factor.index[window:]) - - aligned.columns = ['factor', 'return'] - - if method == 'spearman': + + aligned.columns = ["factor", "return"] + + if method == "spearman": # 使用rank计算Spearman相关性 - factor_rank = aligned['factor'].rank() - return_rank = aligned['return'].rank() - # 使用DataFrame的rolling().corr()方法 - df_rank = pd.DataFrame({'factor': factor_rank, 'return': return_rank}) - ic_series = df_rank['factor'].rolling(window, min_periods=window).corr(df_rank['return']) + # 这里是全局的 rank,理论上应该是按照 window 滚动排序 + factor_rank = aligned["factor"].rank() + return_rank = aligned["return"].rank() + # 使用DataFrame的rolling().corr()方法, 该方法pandas优化过 + df_rank = pd.DataFrame({"factor": factor_rank, "return": return_rank}) + ic_series = ( + df_rank["factor"] + .rolling(window, min_periods=window) + .corr(df_rank["return"]) + ) else: # Pearson相关性 - df = pd.DataFrame({'factor': aligned['factor'], 'return': aligned['return']}) - ic_series = df['factor'].rolling(window, min_periods=window).corr(df['return']) - + df = pd.DataFrame({"factor": aligned["factor"], "return": aligned["return"]}) + ic_series = df["factor"].rolling(window, min_periods=window).corr(df["return"]) + return ic_series @@ -65,85 +46,75 @@ def group_backtest( factor: pd.Series, forward_return: pd.Series, n_groups: int = 3, - group_period: int = 180 + group_period: int = 180, ) -> Dict: """ 分组回测:将数据按因子值分组,计算各组收益 - + Returns: -------- dict: 包含各组收益、H-L收益差、t统计量等 """ aligned = pd.concat([factor, forward_return], axis=1).dropna() - aligned.columns = ['factor', 'return'] - - results = { - 'group_returns': [], - 'h_l_return': [], - 'h_l_tstat': [], - 'periods': [] - } - + aligned.columns = ["factor", "return"] + + results = {"group_returns": [], "h_l_return": [], "h_l_tstat": [], "periods": []} + # 按月分组(每180个4h周期)- 使用更高效的步长 step = max(group_period // 2, 90) # 减少重叠计算 for start in range(0, len(aligned) - group_period, step): end = start + group_period period_data = aligned.iloc[start:end] - + if len(period_data) < 30: continue - + # 按因子值分组(向量化) try: period_data = period_data.copy() - period_data['group'] = pd.qcut( - period_data['factor'], - q=n_groups, - labels=False, - duplicates='drop' + period_data["group"] = pd.qcut( + period_data["factor"], q=n_groups, labels=False, duplicates="drop" ) - + # 计算各组收益(向量化) - group_returns = period_data.groupby('group')['return'].mean() - results['group_returns'].append(group_returns) - + group_returns = period_data.groupby("group")["return"].mean() + results["group_returns"].append(group_returns) + # H-L收益差 if len(group_returns) >= 2: h_return = group_returns.iloc[-1] # 高因子组 - l_return = group_returns.iloc[0] # 低因子组 + l_return = group_returns.iloc[0] # 低因子组 h_l_diff = h_return - l_return - - results['h_l_return'].append(h_l_diff) - results['periods'].append(period_data.index[-1]) + + results["h_l_return"].append(h_l_diff) + results["periods"].append(period_data.index[-1]) except (ValueError, KeyError): # qcut失败时跳过 continue - + # 计算平均H-L收益和t统计量 - if results['h_l_return']: - h_l_series = pd.Series(results['h_l_return'], index=results['periods']) + if results["h_l_return"]: + h_l_series = pd.Series(results["h_l_return"], index=results["periods"]) mean_h_l = h_l_series.mean() std_h_l = h_l_series.std() t_stat = mean_h_l / (std_h_l / np.sqrt(len(h_l_series)) + 1e-8) - - results['mean_h_l_return'] = mean_h_l - results['mean_h_l_tstat'] = t_stat - results['h_l_series'] = h_l_series + + results["mean_h_l_return"] = mean_h_l + results["mean_h_l_tstat"] = t_stat + results["h_l_series"] = h_l_series else: - results['mean_h_l_return'] = 0 - results['mean_h_l_tstat'] = 0 - + results["mean_h_l_return"] = 0 + results["mean_h_l_tstat"] = 0 + return results def factor_span_regression( - factors: pd.DataFrame, - forward_return: pd.Series, - target_factor: str + factors: pd.DataFrame, forward_return: pd.Series, target_factor: str ) -> Dict: """ 因子跨度回归:检验因子的边际解释力 - + Parameters: ----------- factors : DataFrame @@ -152,7 +123,7 @@ def factor_span_regression( 未来收益率 target_factor : str 目标因子名称 - + Returns: -------- dict: 包含回归系数、t统计量、R²等 @@ -160,49 +131,46 @@ def factor_span_regression( # 对齐数据 data = pd.concat([factors, forward_return], axis=1).dropna() if len(data) < 30: - return {'beta': 0, 'tstat': 0, 'r2': 0, 'r2_change': 0} - + return {"beta": 0, "tstat": 0, "r2": 0, "r2_change": 0} + y = data.iloc[:, -1].values X_all = data.iloc[:, :-1].values - + # 全模型(包含目标因子) try: - model_all = OLS(y, X_all).fit(cov_type='HAC', cov_kwds={'maxlags': 6}) + model_all = OLS(y, X_all).fit(cov_type="HAC", cov_kwds={"maxlags": 6}) r2_all = model_all.rsquared - + # 目标因子的系数和t统计量 target_idx = factors.columns.get_loc(target_factor) beta = model_all.params[target_idx] tstat = model_all.tvalues[target_idx] - + # 不含目标因子的模型 X_without = np.delete(X_all, target_idx, axis=1) - model_without = OLS(y, X_without).fit(cov_type='HAC', cov_kwds={'maxlags': 6}) + model_without = OLS(y, X_without).fit(cov_type="HAC", cov_kwds={"maxlags": 6}) r2_without = model_without.rsquared - + r2_change = r2_all - r2_without - + return { - 'beta': beta, - 'tstat': tstat, - 'r2': r2_all, - 'r2_change': r2_change, - 'pvalue': model_all.pvalues[target_idx] + "beta": beta, + "tstat": tstat, + "r2": r2_all, + "r2_change": r2_change, + "pvalue": model_all.pvalues[target_idx], } except Exception as e: print(f"回归分析出错: {e}") - return {'beta': 0, 'tstat': 0, 'r2': 0, 'r2_change': 0} + return {"beta": 0, "tstat": 0, "r2": 0, "r2_change": 0} def validate_factor( - factor: pd.Series, - forward_return: pd.Series, - ic_window: int = 30, - n_groups: int = 3 + factor: pd.Series, forward_return: pd.Series, ic_window: int = 30, n_groups: int = 3 ) -> Dict: """ 综合因子检验 - + Returns: -------- dict: 包含IC、分组回测、显著性等指标 @@ -211,16 +179,15 @@ def validate_factor( rolling_ic = compute_rolling_ic(factor, forward_return, window=ic_window) mean_ic = rolling_ic.mean() ic_ir = mean_ic / (rolling_ic.std() + 1e-8) # IC信息比率 - + # 分组回测 group_result = group_backtest(factor, forward_return, n_groups=n_groups) - - return { - 'mean_ic': mean_ic, - 'ic_ir': ic_ir, - 'ic_series': rolling_ic, - 'mean_h_l_return': group_result['mean_h_l_return'], - 'mean_h_l_tstat': group_result['mean_h_l_tstat'], - 'group_returns': group_result['group_returns'] - } + return { + "mean_ic": mean_ic, + "ic_ir": ic_ir, + "ic_series": rolling_ic, + "mean_h_l_return": group_result["mean_h_l_return"], + "mean_h_l_tstat": group_result["mean_h_l_tstat"], + "group_returns": group_result["group_returns"], + }