""" DEAP遗传编程挖掘器实现 """ import random import operator from typing import List, Tuple, Optional from dataclasses import dataclass import numpy as np import pandas as pd from deap import algorithms, base, creator, gp, tools from factor_mining.operators import FactorFormula, get_registry, get_operator from factor_mining.mining import FactorMiner, MiningConfig from data import compute_forward_returns @dataclass class GPConfig(MiningConfig): """GP挖掘配置""" population_size: int = 200 generations: int = 30 tournament_size: int = 5 crossover_prob: float = 0.9 mutation_prob: float = 0.05 elitism: int = 5 max_depth_init: int = 1 max_depth: int = 8 complexity_penalty: float = 0.001 class GPMiner(FactorMiner): """DEAP遗传编程挖掘器""" def __init__(self, config: GPConfig): super().__init__(config) self.config: GPConfig = config self.toolbox: Optional[base.Toolbox] = None self.pset: Optional[gp.PrimitiveSetTyped] = None self.features: Optional[List[pd.Series]] = None def get_name(self) -> str: return "gp" def _build_pset(self, feature_names: List[str]) -> gp.PrimitiveSetTyped: """构建GP原始集合""" registry = get_registry() pset = gp.PrimitiveSetTyped( "MAIN", [np.ndarray for _ in feature_names], np.ndarray ) # 命名参数 for i, name in enumerate(feature_names): pset.renameArguments(**{f"ARG{i}": name}) # 添加算子 for op_name in registry.list_all(): op = registry.get(op_name) if op: sig = op.get_signature() params = list(sig.parameters.values()) # 根据参数数量判断是一元还是二元算子 if len(params) == 1: # 一元算子 pset.addPrimitive(op.func, [np.ndarray], np.ndarray, name=op_name) elif len(params) == 2: # 二元算子 pset.addPrimitive( op.func, [np.ndarray, np.ndarray], np.ndarray, name=op_name ) # 添加常量 # def _const() -> np.ndarray: # return np.array(random.uniform(-2.0, 2.0)) # pset.addEphemeralConstant("const", _const, np.ndarray) return pset def _evaluate_individual(self, individual, target: pd.Series) -> Tuple[float]: """评估个体适应度""" func = self.toolbox.compile(expr=individual) # 构建特征矩阵 idx = target.index inputs = [f.reindex(idx).to_numpy() for f in self.features] try: raw = func(*inputs) except Exception: return (-1e6,) # 确保数组长度 if not isinstance(raw, np.ndarray): return (-1e6,) if raw.shape[0] != len(idx): return (-1e6,) # 转换为Series并清理 factor = pd.Series(raw, index=idx) factor = factor.replace([np.inf, -np.inf], np.nan) factor = factor.ffill().bfill() # 计算滚动IC window = self.config.ic_window if len(factor) < window + 10: return (-1e6,) from validation import compute_rolling_ic ic_series = compute_rolling_ic( factor, target, window=window, method=self.config.ic_method ) mean_ic = ic_series.mean() if not np.isfinite(mean_ic): return (-1e6,) # 复杂度惩罚 complexity = len(individual) fitness = mean_ic - self.config.complexity_penalty * complexity if not np.isfinite(fitness): fitness = -1e6 return (fitness,) def _individual_to_formula( self, individual, feature_names: List[str] ) -> FactorFormula: """将GP个体转换为因子公式""" # GP表达式是PrimitiveTree,转换为字符串后是函数调用形式 # 例如: "add(ARG0, ARG1)" 或 "mul(add(ARG0, ARG1), const)" expr_str = str(individual) # 替换ARG0, ARG1等为实际特征名 for i, name in enumerate(feature_names): expr_str = expr_str.replace(f"ARG{i}", name) # GP表达式已经是Python可执行的函数调用格式 # 例如: "add(close, open)" 可以直接eval # 但需要确保所有算子都在环境中可用 return FactorFormula(expr_str, feature_names) def mine( self, data: pd.DataFrame, feature_cols: List[str], price_col: str = "close" ) -> List[FactorFormula]: """执行GP挖掘""" if self.config.seed is not None: random.seed(self.config.seed) np.random.seed(self.config.seed) # 准备数据 price = data[price_col].astype(float) forward_ret = compute_forward_returns(price, self.config.ret_horizon) target = forward_ret self.features = [data[c].astype(float) for c in feature_cols] # 构建原始集合 self.pset = self._build_pset(feature_cols) # 创建DEAP类型 if not hasattr(creator, "FitnessMax"): creator.create("FitnessMax", base.Fitness, weights=(1.0,)) if not hasattr(creator, "Individual"): creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMax) # 构建工具箱 self.toolbox = base.Toolbox() self.toolbox.register( "expr", gp.genHalfAndHalf, pset=self.pset, min_=1, max_=self.config.max_depth_init, ) self.toolbox.register( "individual", tools.initIterate, creator.Individual, self.toolbox.expr ) self.toolbox.register( "population", tools.initRepeat, list, self.toolbox.individual ) self.toolbox.register("compile", gp.compile, pset=self.pset) self.toolbox.register("evaluate", self._evaluate_individual, target=target) # 遗传算子 self.toolbox.register( "select", tools.selTournament, tournsize=self.config.tournament_size ) self.toolbox.register("mate", gp.cxOnePoint) self.toolbox.register("expr_mut", gp.genFull, min_=0, max_=2) self.toolbox.register( "mutate", gp.mutUniform, expr=self.toolbox.expr_mut, pset=self.pset ) # 控制树深度 self.toolbox.decorate( "mate", gp.staticLimit( key=operator.attrgetter("height"), max_value=self.config.max_depth ), ) self.toolbox.decorate( "mutate", gp.staticLimit( key=operator.attrgetter("height"), max_value=self.config.max_depth ), ) # 运行进化 pop = self.toolbox.population(n=self.config.population_size) hof = tools.HallOfFame(maxsize=max(5000, self.config.elitism)) stats_fit = tools.Statistics(lambda ind: ind.fitness.values[0]) stats_size = tools.Statistics(len) mstats = tools.MultiStatistics(fitness=stats_fit, size=stats_size) mstats.register("avg", np.nanmean) mstats.register("std", np.nanstd) mstats.register("min", np.nanmin) mstats.register("max", np.nanmax) pop, logbook = algorithms.eaSimple( pop, self.toolbox, cxpb=self.config.crossover_prob, mutpb=self.config.mutation_prob, ngen=self.config.generations, stats=mstats, halloffame=hof, verbose=True, ) # 转换为因子公式 formulas = [] for individual in hof: formula = self._individual_to_formula(individual, feature_cols) formulas.append(formula) return formulas