Files
factorhack/factor_mining/gp_miner.py
2025-11-09 14:00:58 +08:00

237 lines
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
DEAP遗传编程挖掘器实现
"""
import operator
import random
import re
from dataclasses import dataclass
from typing import List, Tuple, Optional

import numpy as np
import pandas as pd
from deap import algorithms, base, creator, gp, tools

from factor_mining.operators import FactorFormula, get_registry, get_operator
from factor_mining.mining import FactorMiner, MiningConfig
from data import compute_forward_returns
@dataclass
class GPConfig(MiningConfig):
    """Settings for the genetic-programming miner, layered on ``MiningConfig``.

    The comments below describe how ``GPMiner`` consumes each field.
    """

    # Number of individuals requested from ``toolbox.population``.
    population_size: int = 200
    # Passed to ``algorithms.eaSimple`` as ``ngen``.
    generations: int = 30
    # ``tournsize`` for ``tools.selTournament``.
    tournament_size: int = 5
    # Passed to ``algorithms.eaSimple`` as ``cxpb``.
    crossover_prob: float = 0.9
    # Passed to ``algorithms.eaSimple`` as ``mutpb``.
    mutation_prob: float = 0.05
    # Only used as a lower bound for the hall-of-fame size: ``max(5, elitism)``.
    elitism: int = 5
    # Upper depth bound for the initial ``gp.genHalfAndHalf`` trees (lower bound is 1).
    max_depth_init: int = 1
    # Height limit enforced on crossover and mutation results via ``gp.staticLimit``.
    max_depth: int = 8
    # Multiplied by the node count of an individual and subtracted from its mean IC.
    complexity_penalty: float = 0.001
class GPMiner(FactorMiner):
    """Factor miner that evolves expression trees with DEAP genetic programming.

    ``mine`` builds a typed primitive set from the operator registry, evolves
    a population with ``algorithms.eaSimple`` and returns the hall-of-fame
    individuals converted to ``FactorFormula`` objects.
    """

    # Fitness handed to individuals that cannot be evaluated or scored.
    _INVALID_FITNESS: Tuple[float] = (-1e6,)

    def __init__(self, config: GPConfig):
        """Store the configuration; the DEAP objects are created by ``mine``."""
        super().__init__(config)
        self.config: GPConfig = config
        self.toolbox: Optional[base.Toolbox] = None
        self.pset: Optional[gp.PrimitiveSetTyped] = None
        self.features: Optional[List[pd.Series]] = None

    def get_name(self) -> str:
        """Return the identifier of this miner, ``"gp"``."""
        return "gp"

    @staticmethod
    def _random_const() -> float:
        """Draw one ephemeral constant uniformly from [-2.0, 2.0].

        A plain float is returned on purpose: DEAP renders ephemeral values
        with ``repr``, and the repr of a NumPy array (``array(...)``) is not
        a name available when the compiled expression is evaluated.  Being a
        static method also keeps the function object identical across
        ``_build_pset`` calls, which DEAP releases that register ephemerals
        globally by name require.
        """
        return random.uniform(-2.0, 2.0)

    def _build_pset(self, feature_names: List[str]) -> gp.PrimitiveSetTyped:
        """Build the typed primitive set used to generate expression trees.

        Args:
            feature_names: Names given to the tree arguments, in order.

        Returns:
            A primitive set with one ``np.ndarray`` argument per feature,
            every registry operator whose signature has one or two
            parameters, and an ephemeral random constant named ``"const"``.
        """
        registry = get_registry()
        array_t = np.ndarray
        pset = gp.PrimitiveSetTyped("MAIN", [array_t] * len(feature_names), array_t)

        # Replace the default ARG0, ARG1, ... argument names with feature names.
        pset.renameArguments(
            **{f"ARG{i}": name for i, name in enumerate(feature_names)}
        )

        # Register operators; arity is taken from the signature's parameter
        # count, and anything that is not unary or binary is skipped.
        for op_name in registry.list_all():
            op = registry.get(op_name)
            if not op:
                continue
            arity = len(op.get_signature().parameters)
            if arity in (1, 2):
                pset.addPrimitive(op.func, [array_t] * arity, array_t, name=op_name)

        # The constant is declared as np.ndarray only so that it fits the
        # typed slots; the runtime value is a float (see _random_const).
        pset.addEphemeralConstant("const", self._random_const, array_t)
        return pset

    def _evaluate_individual(
        self,
        individual,
        target: pd.Series,
    ) -> Tuple[float]:
        """Score one individual as mean rolling IC minus a complexity penalty.

        Args:
            individual: GP tree to compile and evaluate.
            target: Series the factor is scored against; its index defines
                the evaluation sample.

        Returns:
            A one-element fitness tuple.  ``(-1e6,)`` is returned when the
            expression raises, does not yield a 1-D array aligned with
            ``target``, the sample is shorter than ``ic_window + 10``, or the
            resulting score is not finite.

        Raises:
            RuntimeError: If called before ``mine`` has built the toolbox
                and the feature list.
        """
        if self.toolbox is None or self.features is None:
            raise RuntimeError("GPMiner.mine() must set up the toolbox before evaluation")

        invalid = self._INVALID_FITNESS
        func = self.toolbox.compile(expr=individual)

        # Align every feature with the target index before evaluation.
        idx = target.index
        inputs = [series.reindex(idx).to_numpy() for series in self.features]

        # Evolved expressions may fail in arbitrary ways; a failing tree is
        # deliberately given the worst fitness instead of aborting the run.
        try:
            raw = func(*inputs)
        except Exception:
            return invalid

        # Only a 1-D array with one value per target row is usable; scalars
        # and 0-d arrays (e.g. constant-only trees) are rejected here.
        if (
            not isinstance(raw, np.ndarray)
            or raw.ndim != 1
            or raw.shape[0] != len(idx)
        ):
            return invalid

        # Convert to a Series and clean infinities and gaps.
        # NOTE(review): bfill fills leading gaps with later values - confirm
        # this look-ahead is acceptable for the IC estimate.
        factor = pd.Series(raw, index=idx)
        factor = factor.replace([np.inf, -np.inf], np.nan).ffill().bfill()

        window = self.config.ic_window
        if len(factor) < window + 10:
            return invalid

        # Imported at call time rather than at module load - presumably to
        # avoid an import cycle; confirm before hoisting.
        from validation import compute_rolling_ic

        ic_series = compute_rolling_ic(
            factor, target, window=window, method=self.config.ic_method
        )
        mean_ic = ic_series.mean()
        if not np.isfinite(mean_ic):
            return invalid

        # Complexity penalty: proportional to the number of nodes in the tree.
        fitness = mean_ic - self.config.complexity_penalty * len(individual)
        if not np.isfinite(fitness):
            return invalid
        return (float(fitness),)

    def _individual_to_formula(
        self,
        individual,
        feature_names: List[str],
    ) -> FactorFormula:
        """Convert a GP individual into a ``FactorFormula``.

        The string form of a tree is a nested call expression such as
        ``add(close, open)``.  Any remaining default argument token
        ``ARG<i>`` is replaced by ``feature_names[i]``; tokens whose index is
        out of range are left untouched.

        Args:
            individual: GP tree to convert.
            feature_names: Feature names in argument order.

        Returns:
            A ``FactorFormula`` built from the expression string and
            ``feature_names``.
        """
        def _resolve(match):
            position = int(match.group(1))
            if position < len(feature_names):
                return feature_names[position]
            return match.group(0)

        # Single pass with word boundaries: ARG1 never matches inside ARG10,
        # and text that was just substituted is not scanned again.
        expr_str = re.sub(r"\bARG(\d+)\b", _resolve, str(individual))
        return FactorFormula(expr_str, feature_names)

    def mine(
        self,
        data: pd.DataFrame,
        feature_cols: List[str],
        price_col: str = "close",
    ) -> List[FactorFormula]:
        """Run the GP search and return the best expressions found.

        Args:
            data: Frame holding the price column and all feature columns.
            feature_cols: Columns exposed to the expression trees as arguments.
            price_col: Column from which forward returns are computed.

        Returns:
            One ``FactorFormula`` per hall-of-fame individual.

        Raises:
            ValueError: If ``feature_cols`` is empty.
        """
        if len(feature_cols) == 0:
            raise ValueError("feature_cols must contain at least one feature column")

        cfg = self.config
        if cfg.seed is not None:
            random.seed(cfg.seed)
            np.random.seed(cfg.seed)

        # Prepare the target and the feature series.
        price = data[price_col].astype(float)
        target = compute_forward_returns(price, cfg.ret_horizon)
        self.features = [data[col].astype(float) for col in feature_cols]

        self.pset = self._build_pset(feature_cols)

        # DEAP's creator registers classes globally; create them only once.
        if not hasattr(creator, "FitnessMax"):
            creator.create("FitnessMax", base.Fitness, weights=(1.0,))
        if not hasattr(creator, "Individual"):
            creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMax)

        toolbox = base.Toolbox()
        self.toolbox = toolbox
        toolbox.register(
            "expr",
            gp.genHalfAndHalf,
            pset=self.pset,
            min_=1,
            max_=cfg.max_depth_init,
        )
        toolbox.register("individual", tools.initIterate, creator.Individual, toolbox.expr)
        toolbox.register("population", tools.initRepeat, list, toolbox.individual)
        toolbox.register("compile", gp.compile, pset=self.pset)
        toolbox.register("evaluate", self._evaluate_individual, target=target)

        # Genetic operators.
        toolbox.register("select", tools.selTournament, tournsize=cfg.tournament_size)
        toolbox.register("mate", gp.cxOnePoint)
        toolbox.register("expr_mut", gp.genFull, min_=0, max_=2)
        toolbox.register("mutate", gp.mutUniform, expr=toolbox.expr_mut, pset=self.pset)

        # Cap tree height for offspring of both variation operators.
        for variation in ("mate", "mutate"):
            toolbox.decorate(
                variation,
                gp.staticLimit(key=operator.attrgetter("height"), max_value=cfg.max_depth),
            )

        # Run the evolution.
        # NOTE: eaSimple itself performs no elitism; config.elitism only
        # influences the hall-of-fame size.
        pop = toolbox.population(n=cfg.population_size)
        hof = tools.HallOfFame(maxsize=max(5, cfg.elitism))

        stats_fit = tools.Statistics(lambda ind: ind.fitness.values[0])
        stats_size = tools.Statistics(len)
        mstats = tools.MultiStatistics(fitness=stats_fit, size=stats_size)
        mstats.register("avg", np.nanmean)
        mstats.register("std", np.nanstd)
        mstats.register("min", np.nanmin)
        mstats.register("max", np.nanmax)

        pop, logbook = algorithms.eaSimple(
            pop,
            toolbox,
            cxpb=cfg.crossover_prob,
            mutpb=cfg.mutation_prob,
            ngen=cfg.generations,
            stats=mstats,
            halloffame=hof,
            verbose=True,
        )

        # Convert the best individuals into factor formulas.
        return [self._individual_to_formula(best, feature_cols) for best in hof]