"""DEAP-based factor mining (genetic programming).

Evolves expression trees over price/volume feature columns and scores each
tree by the mean rolling information coefficient (IC) between the factor it
produces and the forward return of the price column, minus a tree-size
penalty.  The best expressions are written to a text file and the top one is
run through a simple long/short sanity backtest.
"""
# Provenance: reconstructed from the git patch that created
# deap_factor_mining.py (commit dcfe2d84d54a523e9236f1aa8160aef96c0cc581,
# author aszerW, 2025-11-06, subject "Basic DEAP factor-mining code").

import argparse
import math
import operator
import random
from dataclasses import dataclass
from functools import partial
from typing import Callable, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
from deap import algorithms, base, creator, gp, tools


# ------------------------------
# Data & Config
# ------------------------------


@dataclass
class EvolutionConfig:
    """Hyper-parameters for one genetic-programming run."""

    population_size: int = 200
    generations: int = 30
    tournament_size: int = 5
    crossover_prob: float = 0.9
    mutation_prob: float = 0.05
    # NOTE: only sizes the hall of fame; algorithms.eaSimple does no elitism.
    elitism: int = 5
    max_depth_init: int = 4  # max depth of randomly generated initial trees
    max_depth: int = 8  # height limit enforced after crossover/mutation
    ic_window: int = 1000  # number of bars per IC evaluation window
    ret_horizon: int = 24  # forward-return horizon, in bars
    ic_method: str = "spearman"  # or "pearson"
    complexity_penalty: float = 0.001  # fitness penalty per tree node
    seed: Optional[int] = 42


# Fitness assigned to individuals that cannot be evaluated.
_INVALID_FITNESS: Tuple[float] = (-1e6,)

# Window/period lengths for which rolling primitives are registered.
_WINDOWS: Tuple[int, ...] = (3, 6, 12, 24, 48, 96)


# ------------------------------
# Safe operators for GP
# ------------------------------


def _safe_div(left: np.ndarray, right: np.ndarray) -> np.ndarray:
    """Divide ``left`` by ``right``, yielding NaN where ``|right| < 1e-12``."""
    denom = np.where(np.abs(right) < 1e-12, np.nan, right)
    return left / denom


def _safe_log(x: np.ndarray) -> np.ndarray:
    """Return ``log(|x|)`` with ``|x|`` floored at 1e-12."""
    return np.log(np.clip(np.abs(x), 1e-12, None))


def _safe_sqrt(x: np.ndarray) -> np.ndarray:
    """Return ``sqrt(x)`` with negative inputs clipped to zero."""
    return np.sqrt(np.clip(x, 0.0, None))


def _safe_pow(x: np.ndarray, y: np.ndarray) -> np.ndarray:
    """Return ``x ** y`` with clipped operands; non-finite results become NaN.

    The base is clipped to [-1e6, 1e6] and the exponent to [-3, 3] to avoid
    overflow.  Scalar operands are accepted as well as arrays.
    """
    base_clip = np.clip(np.asarray(x, dtype=float), -1e6, 1e6)
    exp_clip = np.clip(np.asarray(y, dtype=float), -3.0, 3.0)
    with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
        out = np.power(base_clip, exp_clip)
    # np.where instead of in-place assignment: np.power returns a scalar
    # (which does not support item assignment) when both operands are scalars.
    return np.where(np.isfinite(out), out, np.nan)


def _as_series(x: np.ndarray) -> pd.Series:
    """Wrap ``x`` in a Series, promoting scalars/0-d arrays to length one."""
    return pd.Series(np.atleast_1d(np.asarray(x, dtype=float)))


def _min_periods(window: int) -> int:
    """Minimum observations required before a rolling statistic is emitted."""
    return max(2, window // 2)


def _rolling_mean(x: np.ndarray, window: int) -> np.ndarray:
    """Rolling mean over ``window`` observations."""
    rolled = _as_series(x).rolling(window, min_periods=_min_periods(window))
    return rolled.mean().to_numpy()


def _rolling_std(x: np.ndarray, window: int) -> np.ndarray:
    """Rolling sample standard deviation over ``window`` observations."""
    rolled = _as_series(x).rolling(window, min_periods=_min_periods(window))
    return rolled.std().to_numpy()


def _ts_delta(x: np.ndarray, period: int) -> np.ndarray:
    """Difference between each value and the value ``period`` steps earlier."""
    return _as_series(x).diff(period).to_numpy()


def _last_value_pct_rank(window_values: np.ndarray) -> float:
    """Percentile rank (average-tie method) of the last value in the window.

    NaNs are ignored; the result is NaN when the last value itself is NaN.
    """
    last = window_values[-1]
    if np.isnan(last):
        return np.nan
    valid = window_values[~np.isnan(window_values)]
    below = np.count_nonzero(valid < last)
    ties = np.count_nonzero(valid == last)  # includes the last value itself
    # Tied values occupy ranks below+1 .. below+ties; take their average.
    return (below + (ties + 1) / 2.0) / valid.size


def _ts_rank(x: np.ndarray, window: int) -> np.ndarray:
    """Rolling percentile rank of the latest value within its window."""
    rolled = _as_series(x).rolling(window, min_periods=_min_periods(window))
    # raw=True hands each window over as an ndarray, avoiding the cost of
    # building a Series per window.
    return rolled.apply(_last_value_pct_rank, raw=True).to_numpy()


def _delay(x: np.ndarray, period: int) -> np.ndarray:
    """Shift the series forward in time by ``period`` steps (lag)."""
    return _as_series(x).shift(period).to_numpy()


def _random_constant() -> float:
    """Draw an ephemeral constant uniformly from [-2, 2].

    Returns a plain ``float`` on purpose: DEAP renders terminals with
    ``repr`` and ``gp.compile`` evaluates that text, so the value must
    round-trip through its repr (``array(1.23)`` would not resolve in the
    primitive-set context).  Kept at module level so repeated ``build_pset``
    calls register the same function object.
    """
    return random.uniform(-2.0, 2.0)


# ------------------------------
# Primitive set
# ------------------------------


def build_pset(feature_names: List[str]) -> gp.PrimitiveSetTyped:
    """Build the typed primitive set used to generate factor expressions.

    Args:
        feature_names: Names given to the tree arguments, one per feature.
            They must be valid Python identifiers because compiled
            expressions use them as lambda parameter names.

    Returns:
        A primitive set with arithmetic, unary, power and rolling-window
        primitives plus an ephemeral constant terminal.
    """
    # Each feature is a numpy array of floats; GP outputs numpy array
    pset = gp.PrimitiveSetTyped(
        "MAIN", [np.ndarray for _ in feature_names], np.ndarray
    )

    # Name the arguments for readability
    pset.renameArguments(
        **{f"ARG{i}": name for i, name in enumerate(feature_names)}
    )

    binary = [np.ndarray, np.ndarray]
    unary = [np.ndarray]

    # Binary arithmetic
    pset.addPrimitive(np.add, binary, np.ndarray, name="add")
    pset.addPrimitive(np.subtract, binary, np.ndarray, name="sub")
    pset.addPrimitive(np.multiply, binary, np.ndarray, name="mul")
    pset.addPrimitive(_safe_div, binary, np.ndarray, name="div")

    # Unary transforms
    pset.addPrimitive(np.negative, unary, np.ndarray, name="neg")
    pset.addPrimitive(np.abs, unary, np.ndarray, name="abs")
    pset.addPrimitive(_safe_log, unary, np.ndarray, name="log")
    pset.addPrimitive(_safe_sqrt, unary, np.ndarray, name="sqrt")

    # Power
    pset.addPrimitive(_safe_pow, binary, np.ndarray, name="pow")

    # Rolling ops with a fixed small set of windows, bound via partials so
    # the primitives stay picklable.
    for w in _WINDOWS:
        pset.addPrimitive(
            partial(_rolling_mean, window=w), unary, np.ndarray, name=f"sma{w}"
        )
        pset.addPrimitive(
            partial(_rolling_std, window=w), unary, np.ndarray, name=f"std{w}"
        )
        pset.addPrimitive(
            partial(_ts_rank, window=w), unary, np.ndarray, name=f"rank{w}"
        )
        pset.addPrimitive(
            partial(_ts_delta, period=w), unary, np.ndarray, name=f"delta{w}"
        )
        pset.addPrimitive(
            partial(_delay, period=w), unary, np.ndarray, name=f"delay{w}"
        )

    # Ephemeral constants: scalars that broadcast against feature arrays.
    # Declared as np.ndarray so they fit the single-type primitive signatures.
    pset.addEphemeralConstant("const", _random_constant, np.ndarray)

    return pset


# ------------------------------
# Fitness and evaluation
# ------------------------------


def compute_returns(price: pd.Series, horizon: int) -> pd.Series:
    """Forward simple return: ``price[t + horizon] / price[t] - 1``.

    The last ``horizon`` entries are NaN.  Computed explicitly instead of via
    ``pct_change`` so that gaps in ``price`` are not silently forward-filled.
    """
    return price.shift(-horizon) / price - 1.0


def rank_ic(a: pd.Series, b: pd.Series, method: str = "spearman") -> float:
    """Correlation between ``a`` and ``b`` over their jointly non-NaN rows.

    Args:
        a: First series.
        b: Second series, sharing ``a``'s index.
        method: ``"spearman"`` correlates percentile ranks; any other value
            yields the Pearson correlation of the raw values.

    Returns:
        The correlation, or NaN when fewer than 10 paired observations exist.
    """
    mask = a.notna() & b.notna()
    if mask.sum() < 10:
        return np.nan
    x = a[mask]
    y = b[mask]
    if method == "spearman":
        return x.rank(pct=True).corr(y.rank(pct=True))
    return x.corr(y)


def series_zscore(x: pd.Series) -> pd.Series:
    """Standardize ``x`` to zero mean and unit (population) std."""
    return (x - x.mean()) / (x.std(ddof=0) + 1e-12)


def evaluate_individual(
    individual,
    toolbox: base.Toolbox,
    features: List[pd.Series],
    target: pd.Series,
    config: EvolutionConfig,
) -> Tuple[float]:
    """Score one expression tree by mean rolling IC minus a size penalty.

    Args:
        individual: GP tree to evaluate.
        toolbox: Toolbox providing ``compile``.
        features: Input series, passed to the tree in order.
        target: Forward returns; its index defines the evaluation rows.
        config: Supplies ``ic_window``, ``ic_method``, ``complexity_penalty``.

    Returns:
        A one-element fitness tuple; ``(-1e6,)`` when the tree fails to
        evaluate, returns the wrong shape, or yields no finite IC.
    """
    func = toolbox.compile(expr=individual)

    # Build feature matrix aligned index
    idx = target.index
    inputs = [f.reindex(idx).to_numpy() for f in features]

    # Randomly generated expressions can fail in many ways; any failure
    # simply marks the individual as invalid.
    try:
        with np.errstate(all="ignore"):
            raw = np.asarray(func(*inputs), dtype=float)
    except Exception:
        return _INVALID_FITNESS

    # Constant-only trees evaluate to a scalar; reject anything that is not
    # a full-length 1-D array.
    if raw.ndim != 1 or raw.shape[0] != len(idx):
        return _INVALID_FITNESS

    factor = pd.Series(raw, index=idx)
    factor = factor.replace([np.inf, -np.inf], np.nan)
    # NOTE(review): bfill back-fills leading NaNs with later values, which
    # is a mild look-ahead; kept for parity with compile_factor.
    factor = factor.ffill().bfill()

    # Rolling IC over window segments
    window = config.ic_window
    if len(factor) < window + 10:
        return _INVALID_FITNESS

    ic_values: List[float] = []
    step = max(window // 5, 50)
    for start in range(0, len(factor) - window, step):
        end = start + window
        ic = rank_ic(
            series_zscore(factor.iloc[start:end]),
            target.iloc[start:end],
            method=config.ic_method,
        )
        if np.isfinite(ic):
            ic_values.append(ic)

    if not ic_values:
        return _INVALID_FITNESS

    mean_ic = float(np.mean(ic_values))

    # Complexity penalty (size of tree)
    fitness = mean_ic - config.complexity_penalty * len(individual)
    if not np.isfinite(fitness):
        return _INVALID_FITNESS
    return (fitness,)


# ------------------------------
# Evolution runner
# ------------------------------


def run_evolution(
    df: pd.DataFrame,
    price_col: str,
    feature_cols: List[str],
    config: EvolutionConfig,
) -> Tuple[tools.HallOfFame, base.Toolbox, gp.PrimitiveSetTyped, List[pd.Series]]:
    """Run the GP search and return the best individuals found.

    Args:
        df: Input data containing ``price_col`` and every feature column.
        price_col: Column used to compute the forward-return target.
        feature_cols: Columns fed to the expression trees, in order.
        config: Evolution hyper-parameters.

    Returns:
        ``(hall_of_fame, toolbox, pset, features)``.

    Raises:
        ValueError: If ``df`` has fewer than ``config.ic_window + 10`` rows,
            in which case no individual could receive a valid fitness.
    """
    min_rows = config.ic_window + 10
    if len(df) < min_rows:
        raise ValueError(
            f"Need at least {min_rows} rows for ic_window={config.ic_window}, "
            f"got {len(df)}"
        )

    if config.seed is not None:
        random.seed(config.seed)
        np.random.seed(config.seed)

    price = df[price_col].astype(float)
    target = compute_returns(price, config.ret_horizon)

    features = [df[c].astype(float) for c in feature_cols]

    pset = build_pset(feature_cols)

    # Fitness: maximize IC (single objective).  The creator classes are
    # process-global, so only create them once.
    if not hasattr(creator, "FitnessMax"):
        creator.create("FitnessMax", base.Fitness, weights=(1.0,))
    if not hasattr(creator, "Individual"):
        creator.create(
            "Individual", gp.PrimitiveTree, fitness=creator.FitnessMax
        )

    toolbox = base.Toolbox()
    toolbox.register(
        "expr",
        gp.genHalfAndHalf,
        pset=pset,
        min_=1,
        max_=config.max_depth_init,
    )
    toolbox.register(
        "individual", tools.initIterate, creator.Individual, toolbox.expr
    )
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)
    toolbox.register("compile", gp.compile, pset=pset)

    toolbox.register(
        "evaluate",
        evaluate_individual,
        toolbox=toolbox,
        features=features,
        target=target,
        config=config,
    )

    # Genetic operators
    toolbox.register(
        "select", tools.selTournament, tournsize=config.tournament_size
    )
    toolbox.register("mate", gp.cxOnePoint)
    toolbox.register("expr_mut", gp.genFull, min_=0, max_=2)
    toolbox.register(
        "mutate", gp.mutUniform, expr=toolbox.expr_mut, pset=pset
    )

    # bloat control
    height = operator.attrgetter("height")
    toolbox.decorate(
        "mate", gp.staticLimit(key=height, max_value=config.max_depth)
    )
    toolbox.decorate(
        "mutate", gp.staticLimit(key=height, max_value=config.max_depth)
    )

    pop = toolbox.population(n=config.population_size)
    hof = tools.HallOfFame(maxsize=max(5, config.elitism))

    stats_fit = tools.Statistics(lambda ind: ind.fitness.values[0])
    stats_size = tools.Statistics(len)
    mstats = tools.MultiStatistics(fitness=stats_fit, size=stats_size)
    mstats.register("avg", np.nanmean)
    mstats.register("std", np.nanstd)
    mstats.register("min", np.nanmin)
    mstats.register("max", np.nanmax)

    algorithms.eaSimple(
        pop,
        toolbox,
        cxpb=config.crossover_prob,
        mutpb=config.mutation_prob,
        ngen=config.generations,
        stats=mstats,
        halloffame=hof,
        verbose=True,
    )

    return hof, toolbox, pset, features


# ------------------------------
# Factor compilation & backtest
# ------------------------------


def compile_factor(
    individual,
    toolbox: base.Toolbox,
    index: pd.Index,
    features: List[pd.Series],
) -> pd.Series:
    """Evaluate ``individual`` on ``features`` and return the factor series.

    Infinite values are replaced by NaN and gaps are forward- then
    back-filled.  Unlike ``evaluate_individual``, evaluation errors propagate.
    """
    func = toolbox.compile(expr=individual)
    inputs = [f.reindex(index).to_numpy() for f in features]
    with np.errstate(all="ignore"):
        raw = np.asarray(func(*inputs), dtype=float)
    if raw.ndim == 0:
        # Constant-only expression: broadcast the scalar over the index.
        raw = np.full(len(index), float(raw))
    s = pd.Series(raw, index=index)
    return s.replace([np.inf, -np.inf], np.nan).ffill().bfill()


def simple_long_short_backtest(
    factor: pd.Series,
    price: pd.Series,
    ret_horizon: int,
    top_quantile: float = 0.2,
    bottom_quantile: float = 0.2,
) -> pd.Series:
    """Very rough long/short equity curve for sanity-checking a factor.

    Goes long when the factor's percentile rank is in the top quantile and
    short when it is in the bottom quantile, entering one bar after the
    signal and holding for ``ret_horizon`` bars.

    NOTE(review): ranks are computed over the full sample, so the quantile
    thresholds use future information; treat the result as a sanity check,
    not a performance estimate.

    Returns:
        Cumulative equity, starting from 1.0 before the first bar's P&L.
    """
    f = factor.align(price, join="right")[0]
    future_ret = compute_returns(price, ret_horizon)

    ranks = f.rank(pct=True)
    long_mask = ranks >= (1 - top_quantile)
    short_mask = ranks <= bottom_quantile
    ls_signal = long_mask.astype(float) - short_mask.astype(float)
    ls_signal = ls_signal.shift(1)  # trade on next bar

    # A position opened on every bar and held for ret_horizon bars means
    # ret_horizon positions overlap; give each 1/ret_horizon of capital so
    # the same price move is not compounded ret_horizon times.
    pnl = ls_signal * future_ret / max(ret_horizon, 1)
    pnl = pnl.replace([np.inf, -np.inf], np.nan).fillna(0.0)
    return (1.0 + pnl).cumprod()


# ------------------------------
# CLI
# ------------------------------


def parse_args() -> argparse.Namespace:
    """Parse command-line options for the factor-mining script."""
    p = argparse.ArgumentParser(
        description="DEAP-based factor mining (genetic programming)"
    )
    p.add_argument(
        "--data",
        type=str,
        default="ETH_USDT-1h.feather",
        help="Input feather/csv file",
    )
    p.add_argument(
        "--price_col", type=str, default="close", help="Price column name"
    )
    p.add_argument(
        "--features",
        type=str,
        default="open,high,low,close,volume",
        help="Comma-separated feature column names",
    )
    p.add_argument("--ret_horizon", type=int, default=24)
    p.add_argument("--population", type=int, default=200)
    p.add_argument("--generations", type=int, default=30)
    p.add_argument("--ic_window", type=int, default=1000)
    p.add_argument("--seed", type=int, default=42)
    p.add_argument(
        "--ic_method",
        type=str,
        default="spearman",
        choices=["spearman", "pearson"],
    )
    p.add_argument("--complexity_penalty", type=float, default=0.001)
    p.add_argument("--save_best", type=str, default="best_factors.txt")
    p.add_argument(
        "--max_rows",
        type=int,
        default=None,
        help="Use only the first N rows (must exceed ic_window + 10)",
    )
    return p.parse_args()


def load_dataframe(path: str) -> pd.DataFrame:
    """Load a ``.feather`` or ``.csv`` file.

    If a ``datetime``, ``time``, ``timestamp`` or ``date`` column exists
    (checked in that order), it is parsed and used as the sorted index.

    Raises:
        ValueError: If the file extension is neither ``.feather`` nor ``.csv``.
    """
    lowered = path.lower()
    if lowered.endswith(".feather"):
        df = pd.read_feather(path)
    elif lowered.endswith(".csv"):
        df = pd.read_csv(path)
    else:
        raise ValueError("Unsupported file format. Use .feather or .csv")

    # Try to parse datetime index if present
    for col in ["datetime", "time", "timestamp", "date"]:
        if col in df.columns:
            df[col] = pd.to_datetime(df[col])
            df = df.set_index(col).sort_index()
            break
    return df


def main():
    """Run the evolution from the command line and export the results."""
    args = parse_args()
    df = load_dataframe(args.data)
    # Optional truncation for quick experiments.  A fixed 1000-row cut here
    # would leave every individual with the invalid fitness at the default
    # ic_window of 1000.
    if args.max_rows is not None:
        df = df.head(args.max_rows)

    feature_cols = [c.strip() for c in args.features.split(",") if c.strip()]
    for c in [args.price_col] + feature_cols:
        if c not in df.columns:
            raise KeyError(f"Column '{c}' not found in data")

    config = EvolutionConfig(
        population_size=args.population,
        generations=args.generations,
        ic_window=args.ic_window,
        ret_horizon=args.ret_horizon,
        ic_method=args.ic_method,
        complexity_penalty=args.complexity_penalty,
        seed=args.seed,
    )

    hof, toolbox, pset, features = run_evolution(
        df, args.price_col, feature_cols, config
    )

    price = df[args.price_col].astype(float)
    best_expressions: List[str] = [str(ind) for ind in hof]

    # Save best expressions
    with open(args.save_best, "w", encoding="utf-8") as f:
        f.writelines(expr + "\n" for expr in best_expressions)
    print("Saved best expressions to", args.save_best)

    # Compile the top-1 and run a simple long/short backtest for sanity
    if len(hof) > 0:
        best = hof[0]
        factor_series = compile_factor(best, toolbox, df.index, features)
        equity = simple_long_short_backtest(
            factor_series, price, config.ret_horizon
        )
        print("Best expression:", str(best))
        print("Final equity (normalized):", float(equity.iloc[-1]))
        # Also export factor and equity
        out = pd.DataFrame({
            "factor": factor_series,
            "equity": equity,
        })
        out.to_csv("deap_factor_output.csv")
        print("Saved factor/equity to deap_factor_output.csv")


if __name__ == "__main__":
    main()