"""DEAP-based factor mining: evolve expressions over price/volume features.

Each individual is a typed GP tree over the feature columns. Its fitness is
the mean information coefficient (IC) between the factor it produces and the
forward return, measured on sliding windows, minus a size penalty.
"""

import argparse
import functools
import math
import operator
import random
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
from deap import algorithms, base, creator, gp, tools

# ------------------------------
# Data & Config
# ------------------------------


@dataclass
class EvolutionConfig:
    """Hyper-parameters of the genetic-programming search.

    Attributes:
        population_size: Number of individuals in the population.
        generations: Number of generations run by ``algorithms.eaSimple``.
        tournament_size: Tournament size used by the selection operator.
        crossover_prob: Probability of mating two individuals.
        mutation_prob: Probability of mutating an individual.
        elitism: Lower bound for the hall-of-fame size
            (``max(5, elitism)``); it is not used anywhere else.
        max_depth_init: Maximum depth of the initial random trees.
        max_depth: Height limit enforced on offspring (bloat control).
        ic_window: Number of rows in each IC evaluation window.
        ret_horizon: Number of bars ahead used for the forward return.
        ic_method: ``"spearman"`` or ``"pearson"``.
        complexity_penalty: Fitness deduction per tree node.
        seed: Seed for ``random`` and ``numpy.random``; ``None`` skips seeding.
    """

    population_size: int = 200
    generations: int = 30
    tournament_size: int = 5
    crossover_prob: float = 0.9
    mutation_prob: float = 0.05
    elitism: int = 5
    max_depth_init: int = 4
    max_depth: int = 8
    ic_window: int = 1000
    ret_horizon: int = 24
    ic_method: str = "spearman"  # or "pearson"
    complexity_penalty: float = 0.001
    seed: Optional[int] = 42


# Fitness assigned to individuals that cannot be evaluated.
_FAILED_FITNESS: Tuple[float] = (-1e6,)


# ------------------------------
# Safe operators for GP
# ------------------------------


def _safe_div(left: np.ndarray, right: np.ndarray) -> np.ndarray:
    """Divide ``left`` by ``right``, yielding NaN where ``|right| < 1e-12``."""
    denom = np.where(np.abs(right) < 1e-12, np.nan, right)
    return left / denom


def _safe_log(x: np.ndarray) -> np.ndarray:
    """Return ``log(|x|)`` with ``|x|`` floored at 1e-12."""
    return np.log(np.clip(np.abs(x), 1e-12, None))


def _safe_sqrt(x: np.ndarray) -> np.ndarray:
    """Return ``sqrt(x)`` with negative inputs clipped to zero."""
    return np.sqrt(np.clip(x, 0.0, None))


def _safe_pow(x: np.ndarray, y: np.ndarray) -> np.ndarray:
    """Return ``x ** y`` with bounded operands and NaN for non-finite results.

    The base is clipped to ``[-1e6, 1e6]`` and the exponent to ``[-3, 3]`` to
    avoid overflow.
    """
    exponent = np.clip(y, -3.0, 3.0)
    with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
        out = np.power(np.clip(x, -1e6, 1e6), exponent)
    # np.where (rather than in-place masking) also works when np.power returns
    # a NumPy scalar, which does not support item assignment.
    return np.where(np.isfinite(out), out, np.nan)


def _min_periods(window: int) -> int:
    """Minimum number of observations required by the rolling operators."""
    return max(2, window // 2)


def _rolling_mean(x: np.ndarray, window: int) -> np.ndarray:
    """Rolling mean of ``x`` over ``window`` observations."""
    rolled = pd.Series(x).rolling(window, min_periods=_min_periods(window))
    return rolled.mean().to_numpy()


def _rolling_std(x: np.ndarray, window: int) -> np.ndarray:
    """Rolling standard deviation of ``x`` over ``window`` observations."""
    rolled = pd.Series(x).rolling(window, min_periods=_min_periods(window))
    return rolled.std().to_numpy()


def _ts_delta(x: np.ndarray, period: int) -> np.ndarray:
    """Difference between ``x`` and its value ``period`` rows earlier."""
    return pd.Series(x).diff(period).to_numpy()


def _pct_rank_of_last(values: np.ndarray) -> float:
    """Percentile rank (ties averaged) of the last value among non-NaN values."""
    last = values[-1]
    if np.isnan(last):
        return np.nan
    valid = values[~np.isnan(values)]
    below = np.count_nonzero(valid < last)
    ties = np.count_nonzero(valid == last)  # counts the last value itself
    # Average rank of the tie group is below + (ties + 1) / 2.
    return (below + (ties + 1) / 2.0) / valid.size


def _ts_rank(x: np.ndarray, window: int) -> np.ndarray:
    """Rolling percentile rank of the latest value within its window."""
    rolled = pd.Series(x).rolling(window, min_periods=_min_periods(window))
    # raw=True hands plain ndarrays to the callback, avoiding the cost of
    # building a Series for every window.
    return rolled.apply(_pct_rank_of_last, raw=True).to_numpy()


def _delay(x: np.ndarray, period: int) -> np.ndarray:
    """Return ``x`` shifted by ``period`` rows."""
    return pd.Series(x).shift(period).to_numpy()


def _random_const() -> float:
    """Draw an ephemeral constant uniformly from ``[-2, 2]``.

    A plain float is returned on purpose: DEAP writes the repr of a constant
    into the expression it compiles, and the repr of a NumPy array
    (``array(...)``) is not a name available in the primitive-set context.
    A float broadcasts against array operands just as well.
    """
    return random.uniform(-2.0, 2.0)


# ------------------------------
# Primitive set
# ------------------------------


def build_pset(feature_names: List[str]) -> gp.PrimitiveSetTyped:
    """Build the typed primitive set used to generate factor expressions.

    Args:
        feature_names: Names given to the tree arguments, one per feature.

    Returns:
        A primitive set whose inputs and output are all typed ``np.ndarray``.
    """
    pset = gp.PrimitiveSetTyped(
        "MAIN", [np.ndarray for _ in feature_names], np.ndarray
    )

    # Name the arguments for readability
    pset.renameArguments(
        **{f"ARG{i}": name for i, name in enumerate(feature_names)}
    )

    unary = [np.ndarray]
    binary = [np.ndarray, np.ndarray]

    # Binary arithmetic
    pset.addPrimitive(operator.add, binary, np.ndarray, name="add")
    pset.addPrimitive(operator.sub, binary, np.ndarray, name="sub")
    pset.addPrimitive(operator.mul, binary, np.ndarray, name="mul")
    pset.addPrimitive(_safe_div, binary, np.ndarray, name="div")

    # Unary transforms
    pset.addPrimitive(np.negative, unary, np.ndarray, name="neg")
    pset.addPrimitive(np.abs, unary, np.ndarray, name="abs")
    pset.addPrimitive(_safe_log, unary, np.ndarray, name="log")
    pset.addPrimitive(_safe_sqrt, unary, np.ndarray, name="sqrt")

    # Power
    pset.addPrimitive(_safe_pow, binary, np.ndarray, name="pow")

    # Rolling ops with a fixed small set of windows via partials
    for w in (3, 6, 12, 24, 48, 96):
        pset.addPrimitive(
            functools.partial(_rolling_mean, window=w),
            unary, np.ndarray, name=f"sma{w}",
        )
        pset.addPrimitive(
            functools.partial(_rolling_std, window=w),
            unary, np.ndarray, name=f"std{w}",
        )
        pset.addPrimitive(
            functools.partial(_ts_rank, window=w),
            unary, np.ndarray, name=f"rank{w}",
        )
        pset.addPrimitive(
            functools.partial(_ts_delta, period=w),
            unary, np.ndarray, name=f"delta{w}",
        )
        pset.addPrimitive(
            functools.partial(_delay, period=w),
            unary, np.ndarray, name=f"delay{w}",
        )

    # Ephemeral constants. The generator lives at module level so that every
    # build_pset call registers the same function object under "const".
    pset.addEphemeralConstant("const", _random_const, np.ndarray)

    return pset


# ------------------------------
# Fitness and evaluation
# ------------------------------


def compute_returns(price: pd.Series, horizon: int) -> pd.Series:
    """Forward return: the change from each row to ``horizon`` rows later."""
    return price.pct_change(horizon).shift(-horizon)


def rank_ic(a: pd.Series, b: pd.Series, method: str = "spearman") -> float:
    """Correlation between ``a`` and ``b`` over rows where both are present.

    Args:
        a: First series.
        b: Second series, indexed like ``a``.
        method: ``"spearman"`` correlates percentile ranks; any other value
            falls back to the plain (Pearson) correlation.

    Returns:
        The correlation, or NaN when fewer than 10 rows are jointly valid.
    """
    mask = a.notna() & b.notna()
    if mask.sum() < 10:
        return np.nan
    x = a[mask]
    y = b[mask]
    if method == "spearman":
        return x.rank(pct=True).corr(y.rank(pct=True))
    return x.corr(y)


def series_zscore(x: pd.Series) -> pd.Series:
    """Standardize ``x`` to zero mean and unit (population) std."""
    return (x - x.mean()) / (x.std(ddof=0) + 1e-12)


def evaluate_individual(
    individual,
    toolbox: base.Toolbox,
    features: List[pd.Series],
    target: pd.Series,
    config: EvolutionConfig,
) -> Tuple[float]:
    """Score one GP individual by its mean windowed IC minus a size penalty.

    Args:
        individual: Expression tree to evaluate.
        toolbox: Toolbox providing ``compile``.
        features: Input series, passed to the expression in order.
        target: Forward-return series the factor is correlated with.
        config: Supplies ``ic_window``, ``ic_method`` and
            ``complexity_penalty``.

    Returns:
        A one-element fitness tuple; ``(-1e6,)`` when the expression fails,
        does not return a 1-D array of the right length, the data is shorter
        than ``ic_window + 10`` rows, or no window yields a finite IC.
    """
    func = toolbox.compile(expr=individual)

    # Build feature matrix on the target's index
    idx = target.index
    inputs = [f.reindex(idx).to_numpy() for f in features]

    # Randomly generated expressions can fail in arbitrary ways; any failure
    # simply marks the individual as unfit instead of aborting the run.
    try:
        raw = func(*inputs)
    except Exception:
        return _FAILED_FITNESS

    # Require a 1-D array with one value per row. The ndim check keeps 0-d
    # results (constant-only expressions) from raising on shape[0].
    if (
        not isinstance(raw, np.ndarray)
        or raw.ndim != 1
        or raw.shape[0] != len(idx)
    ):
        return _FAILED_FITNESS

    factor = pd.Series(raw, index=idx)
    factor = factor.replace([np.inf, -np.inf], np.nan)
    # NOTE(review): bfill fills leading gaps with later values, which is a
    # mild look-ahead; kept as-is to preserve scoring behaviour.
    factor = factor.ffill().bfill()

    # Rolling IC over window segments
    window = config.ic_window
    if len(factor) < window + 10:
        return _FAILED_FITNESS

    ic_values: List[float] = []
    step = max(window // 5, 50)
    # "+ 1" so the last full window (start == len - window) is not dropped.
    for start in range(0, len(factor) - window + 1, step):
        end = start + window
        sub_factor = factor.iloc[start:end]
        sub_target = target.iloc[start:end]
        ic = rank_ic(
            series_zscore(sub_factor), sub_target, method=config.ic_method
        )
        if np.isfinite(ic):
            ic_values.append(ic)

    if not ic_values:
        return _FAILED_FITNESS

    mean_ic = float(np.mean(ic_values))

    # Complexity penalty (size of tree)
    fitness = mean_ic - config.complexity_penalty * len(individual)
    if not np.isfinite(fitness):
        return _FAILED_FITNESS
    return (fitness,)


# ------------------------------
# Evolution runner
# ------------------------------


def run_evolution(
    df: pd.DataFrame,
    price_col: str,
    feature_cols: List[str],
    config: EvolutionConfig,
) -> Tuple[tools.HallOfFame, base.Toolbox, gp.PrimitiveSetTyped, List[pd.Series]]:
    """Run the GP search and return the best individuals found.

    Args:
        df: Input data containing ``price_col`` and ``feature_cols``.
        price_col: Column used to compute the forward-return target.
        feature_cols: Columns exposed to the expressions as arguments.
        config: Search hyper-parameters.

    Returns:
        ``(hall_of_fame, toolbox, pset, features)`` where ``features`` are
        the float-cast feature series in ``feature_cols`` order.
    """
    if config.seed is not None:
        random.seed(config.seed)
        np.random.seed(config.seed)

    price = df[price_col].astype(float)
    target = compute_returns(price, config.ret_horizon)

    features = [df[c].astype(float) for c in feature_cols]
    pset = build_pset(feature_cols)

    # Fitness: maximize IC (single objective). The creator classes are
    # module-global, so only create them once per process.
    if not hasattr(creator, "FitnessMax"):
        creator.create("FitnessMax", base.Fitness, weights=(1.0,))
    if not hasattr(creator, "Individual"):
        creator.create(
            "Individual", gp.PrimitiveTree, fitness=creator.FitnessMax
        )

    toolbox = base.Toolbox()
    toolbox.register(
        "expr", gp.genHalfAndHalf, pset=pset, min_=1, max_=config.max_depth_init
    )
    toolbox.register(
        "individual", tools.initIterate, creator.Individual, toolbox.expr
    )
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)
    toolbox.register("compile", gp.compile, pset=pset)

    toolbox.register(
        "evaluate",
        evaluate_individual,
        toolbox=toolbox,
        features=features,
        target=target,
        config=config,
    )

    # Genetic operators
    toolbox.register(
        "select", tools.selTournament, tournsize=config.tournament_size
    )
    toolbox.register("mate", gp.cxOnePoint)
    toolbox.register("expr_mut", gp.genFull, min_=0, max_=2)
    toolbox.register(
        "mutate", gp.mutUniform, expr=toolbox.expr_mut, pset=pset
    )

    # Bloat control: limit offspring height
    height_limit = gp.staticLimit(
        key=operator.attrgetter("height"), max_value=config.max_depth
    )
    toolbox.decorate("mate", height_limit)
    toolbox.decorate("mutate", height_limit)

    pop = toolbox.population(n=config.population_size)
    hof = tools.HallOfFame(maxsize=max(5, config.elitism))

    stats_fit = tools.Statistics(lambda ind: ind.fitness.values[0])
    stats_size = tools.Statistics(len)
    mstats = tools.MultiStatistics(fitness=stats_fit, size=stats_size)
    mstats.register("avg", np.nanmean)
    mstats.register("std", np.nanstd)
    mstats.register("min", np.nanmin)
    mstats.register("max", np.nanmax)

    pop, logbook = algorithms.eaSimple(
        pop,
        toolbox,
        cxpb=config.crossover_prob,
        mutpb=config.mutation_prob,
        ngen=config.generations,
        stats=mstats,
        halloffame=hof,
        verbose=True,
    )

    return hof, toolbox, pset, features


# ------------------------------
# Factor compilation & backtest
# ------------------------------


def compile_factor(
    individual,
    toolbox: base.Toolbox,
    index: pd.Index,
    features: List[pd.Series],
) -> pd.Series:
    """Evaluate an individual on ``features`` and return the factor series.

    Infinities are replaced by NaN and gaps are forward- then back-filled.
    Unlike ``evaluate_individual``, exceptions raised by the expression are
    not caught here.
    """
    func = toolbox.compile(expr=individual)
    inputs = [f.reindex(index).to_numpy() for f in features]
    raw = func(*inputs)
    s = pd.Series(raw, index=index)
    return s.replace([np.inf, -np.inf], np.nan).ffill().bfill()


def simple_long_short_backtest(
    factor: pd.Series,
    price: pd.Series,
    ret_horizon: int,
    top_quantile: float = 0.2,
    bottom_quantile: float = 0.2,
) -> pd.Series:
    """Compound a naive long/short signal into an equity curve.

    Rows whose factor percentile rank is in the top ``top_quantile`` are
    long (+1), those in the bottom ``bottom_quantile`` are short (-1). The
    signal is lagged one bar and multiplied by the ``ret_horizon``-bar
    forward return.

    This is a sanity check only: ranks are computed over the whole sample,
    and with ``ret_horizon > 1`` consecutive rows compound overlapping
    return periods.

    Returns:
        The cumulative product of ``1 + pnl``, indexed like ``price``.
    """
    f = factor.align(price, join="right")[0]
    future_ret = compute_returns(price, ret_horizon)

    ranks = f.rank(pct=True)
    long_mask = ranks >= (1 - top_quantile)
    short_mask = ranks <= bottom_quantile

    ls_signal = long_mask.astype(float) - short_mask.astype(float)
    ls_signal = ls_signal.shift(1)  # trade on next bar

    pnl = ls_signal * future_ret
    pnl = pnl.replace([np.inf, -np.inf], np.nan).fillna(0.0)
    return (1.0 + pnl).cumprod()


# ------------------------------
# CLI
# ------------------------------


def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the factor-mining script."""
    p = argparse.ArgumentParser(
        description="DEAP-based factor mining (genetic programming)"
    )
    p.add_argument(
        "--data", type=str, default="ETH_USDT-1h.feather",
        help="Input feather/csv file",
    )
    p.add_argument(
        "--price_col", type=str, default="close", help="Price column name"
    )
    p.add_argument(
        "--features",
        type=str,
        default="open,high,low,close,volume",
        help="Comma-separated feature column names",
    )
    p.add_argument("--ret_horizon", type=int, default=24)
    p.add_argument("--population", type=int, default=200)
    p.add_argument("--generations", type=int, default=30)
    p.add_argument("--ic_window", type=int, default=1000)
    p.add_argument("--seed", type=int, default=42)
    p.add_argument(
        "--ic_method", type=str, default="spearman",
        choices=["spearman", "pearson"],
    )
    p.add_argument("--complexity_penalty", type=float, default=0.001)
    p.add_argument("--save_best", type=str, default="best_factors.txt")
    p.add_argument(
        "--max_rows", type=int, default=None,
        help="Use only the first N rows of the data (default: all rows)",
    )
    return p.parse_args()


def load_dataframe(path: str) -> pd.DataFrame:
    """Load a ``.feather`` or ``.csv`` file into a DataFrame.

    If a ``datetime``, ``time``, ``timestamp`` or ``date`` column exists
    (checked in that order), the first match is parsed as datetimes and
    becomes the sorted index.

    Raises:
        ValueError: If the file extension is neither ``.feather`` nor ``.csv``.
    """
    lowered = path.lower()  # accept upper-case extensions as well
    if lowered.endswith(".feather"):
        df = pd.read_feather(path)
    elif lowered.endswith(".csv"):
        df = pd.read_csv(path)
    else:
        raise ValueError("Unsupported file format. Use .feather or .csv")

    # Try to parse datetime index if present
    for col in ["datetime", "time", "timestamp", "date"]:
        if col in df.columns:
            df[col] = pd.to_datetime(df[col])
            df = df.set_index(col).sort_index()
            break
    return df


def main():
    """Run the search, save the best expressions and a sanity backtest."""
    args = parse_args()
    df = load_dataframe(args.data)
    if args.max_rows is not None:
        df = df.head(args.max_rows)

    feature_cols = [c.strip() for c in args.features.split(",") if c.strip()]
    for c in [args.price_col] + feature_cols:
        if c not in df.columns:
            raise KeyError(f"Column '{c}' not found in data")

    config = EvolutionConfig(
        population_size=args.population,
        generations=args.generations,
        ic_window=args.ic_window,
        ret_horizon=args.ret_horizon,
        ic_method=args.ic_method,
        complexity_penalty=args.complexity_penalty,
        seed=args.seed,
    )

    # evaluate_individual rejects every individual when the data is shorter
    # than ic_window + 10 rows, so fail fast instead of evolving noise.
    min_rows = config.ic_window + 10
    if len(df) < min_rows:
        raise ValueError(
            f"Need at least {min_rows} rows (ic_window + 10) but got "
            f"{len(df)}; lower --ic_window or provide more data"
        )

    hof, toolbox, _pset, features = run_evolution(
        df, args.price_col, feature_cols, config
    )

    price = df[args.price_col].astype(float)

    # Save best expressions
    best_expressions: List[str] = [str(ind) for ind in hof]
    with open(args.save_best, "w", encoding="utf-8") as f:
        for expr in best_expressions:
            f.write(expr + "\n")

    # Compile the top-1 and run a simple long/short backtest for sanity
    if len(hof) > 0:
        best = hof[0]
        factor_series = compile_factor(best, toolbox, df.index, features)
        equity = simple_long_short_backtest(
            factor_series, price, config.ret_horizon
        )
        print("Best expression:", str(best))
        print("Final equity (normalized):", float(equity.iloc[-1]))

        # Also export factor and equity
        out = pd.DataFrame({
            "factor": factor_series,
            "equity": equity,
        })
        out.to_csv("deap_factor_output.csv")
        print("Saved best expressions to", args.save_best)
        print("Saved factor/equity to deap_factor_output.csv")


if __name__ == "__main__":
    main()