diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f9d0ebf --- /dev/null +++ b/.gitignore @@ -0,0 +1,179 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. 
github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# Data files (keep structure but ignore large data) +data/ + +# IDE files +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# OS generated files +.DS_Store +.DS_Store? +._* +.Spotlight-V100 +.Trashes +ehthumbs.db +Thumbs.db + +# Docker +.dockerignore + +# Logs +*.log +logs/ + +# Temporary files +tmp/ +temp/ +*.tmp +*.temp + +# API keys and secrets +.env +config.ini +secrets.json +api_keys.txt + +# Database files +*.db +*.sqlite +*.sqlite3 + +# Backup files +*.bak +*.backup + +*.csv +*.feather \ No newline at end of file diff --git a/factor_mining/gp_miner.py b/factor_mining/gp_miner.py new file mode 100644 index 0000000..7302f01 --- /dev/null +++ b/factor_mining/gp_miner.py @@ -0,0 +1,236 @@ +""" +DEAP遗传编程挖掘器实现 +""" +import random +import operator +from typing import List, Tuple, Optional +from dataclasses import dataclass +import numpy as np +import pandas as pd +from deap import algorithms, base, creator, gp, tools + +from factor_mining.operators import FactorFormula, get_registry, get_operator +from factor_mining.mining import FactorMiner, MiningConfig +from data import compute_forward_returns + + +@dataclass +class GPConfig(MiningConfig): + """GP挖掘配置""" + population_size: int = 200 + generations: int = 30 + tournament_size: int = 5 + crossover_prob: float = 0.9 + mutation_prob: float = 0.05 + elitism: int = 5 + max_depth_init: int = 1 + max_depth: int = 8 + complexity_penalty: float = 0.001 + + +class GPMiner(FactorMiner): + """DEAP遗传编程挖掘器""" + + def __init__(self, config: GPConfig): + super().__init__(config) + self.config: GPConfig = config + 
self.toolbox: Optional[base.Toolbox] = None + self.pset: Optional[gp.PrimitiveSetTyped] = None + self.features: Optional[List[pd.Series]] = None + + def get_name(self) -> str: + return "gp" + + def _build_pset(self, feature_names: List[str]) -> gp.PrimitiveSetTyped: + """构建GP原始集合""" + registry = get_registry() + pset = gp.PrimitiveSetTyped("MAIN", [np.ndarray for _ in feature_names], np.ndarray) + + # 命名参数 + for i, name in enumerate(feature_names): + pset.renameArguments(**{f"ARG{i}": name}) + + # 添加算子 + for op_name in registry.list_all(): + op = registry.get(op_name) + if op: + sig = op.get_signature() + params = list(sig.parameters.values()) + + # 根据参数数量判断是一元还是二元算子 + if len(params) == 1: + # 一元算子 + pset.addPrimitive(op.func, [np.ndarray], np.ndarray, name=op_name) + elif len(params) == 2: + # 二元算子 + pset.addPrimitive(op.func, [np.ndarray, np.ndarray], np.ndarray, name=op_name) + + # 添加常量 + def _const() -> np.ndarray: + return np.array(random.uniform(-2.0, 2.0)) + pset.addEphemeralConstant("const", _const, np.ndarray) + + return pset + + def _evaluate_individual( + self, + individual, + target: pd.Series + ) -> Tuple[float]: + """评估个体适应度""" + func = self.toolbox.compile(expr=individual) + + # 构建特征矩阵 + idx = target.index + inputs = [f.reindex(idx).to_numpy() for f in self.features] + + try: + raw = func(*inputs) + except Exception: + return (-1e6,) + + # 确保数组长度 + if not isinstance(raw, np.ndarray): + return (-1e6,) + if raw.shape[0] != len(idx): + return (-1e6,) + + # 转换为Series并清理 + factor = pd.Series(raw, index=idx) + factor = factor.replace([np.inf, -np.inf], np.nan) + factor = factor.ffill().bfill() + + # 计算滚动IC + window = self.config.ic_window + if len(factor) < window + 10: + return (-1e6,) + + from validation import compute_rolling_ic + ic_series = compute_rolling_ic(factor, target, window=window, method=self.config.ic_method) + mean_ic = ic_series.mean() + + if not np.isfinite(mean_ic): + return (-1e6,) + + # 复杂度惩罚 + complexity = len(individual) + fitness = 
mean_ic - self.config.complexity_penalty * complexity + + if not np.isfinite(fitness): + fitness = -1e6 + + return (fitness,) + + def _individual_to_formula( + self, + individual, + feature_names: List[str] + ) -> FactorFormula: + """将GP个体转换为因子公式""" + # GP表达式是PrimitiveTree,转换为字符串后是函数调用形式 + # 例如: "add(ARG0, ARG1)" 或 "mul(add(ARG0, ARG1), const)" + expr_str = str(individual) + + # 替换ARG0, ARG1等为实际特征名 + for i, name in enumerate(feature_names): + expr_str = expr_str.replace(f"ARG{i}", name) + + # GP表达式已经是Python可执行的函数调用格式 + # 例如: "add(close, open)" 可以直接eval + # 但需要确保所有算子都在环境中可用 + + return FactorFormula(expr_str, feature_names) + + def mine( + self, + data: pd.DataFrame, + feature_cols: List[str], + price_col: str = "close" + ) -> List[FactorFormula]: + """执行GP挖掘""" + if self.config.seed is not None: + random.seed(self.config.seed) + np.random.seed(self.config.seed) + + # 准备数据 + price = data[price_col].astype(float) + forward_ret = compute_forward_returns(price, self.config.ret_horizon) + target = forward_ret + + self.features = [data[c].astype(float) for c in feature_cols] + + # 构建原始集合 + self.pset = self._build_pset(feature_cols) + + # 创建DEAP类型 + if not hasattr(creator, "FitnessMax"): + creator.create("FitnessMax", base.Fitness, weights=(1.0,)) + if not hasattr(creator, "Individual"): + creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMax) + + # 构建工具箱 + self.toolbox = base.Toolbox() + self.toolbox.register( + "expr", + gp.genHalfAndHalf, + pset=self.pset, + min_=1, + max_=self.config.max_depth_init + ) + self.toolbox.register("individual", tools.initIterate, creator.Individual, self.toolbox.expr) + self.toolbox.register("population", tools.initRepeat, list, self.toolbox.individual) + self.toolbox.register("compile", gp.compile, pset=self.pset) + + self.toolbox.register( + "evaluate", + self._evaluate_individual, + target=target + ) + + # 遗传算子 + self.toolbox.register("select", tools.selTournament, tournsize=self.config.tournament_size) + 
self.toolbox.register("mate", gp.cxOnePoint) + self.toolbox.register("expr_mut", gp.genFull, min_=0, max_=2) + self.toolbox.register("mutate", gp.mutUniform, expr=self.toolbox.expr_mut, pset=self.pset) + + # 控制树深度 + self.toolbox.decorate( + "mate", + gp.staticLimit(key=operator.attrgetter("height"), max_value=self.config.max_depth) + ) + self.toolbox.decorate( + "mutate", + gp.staticLimit(key=operator.attrgetter("height"), max_value=self.config.max_depth) + ) + + # 运行进化 + pop = self.toolbox.population(n=self.config.population_size) + hof = tools.HallOfFame(maxsize=max(5, self.config.elitism)) + + stats_fit = tools.Statistics(lambda ind: ind.fitness.values[0]) + stats_size = tools.Statistics(len) + mstats = tools.MultiStatistics(fitness=stats_fit, size=stats_size) + mstats.register("avg", np.nanmean) + mstats.register("std", np.nanstd) + mstats.register("min", np.nanmin) + mstats.register("max", np.nanmax) + + pop, logbook = algorithms.eaSimple( + pop, + self.toolbox, + cxpb=self.config.crossover_prob, + mutpb=self.config.mutation_prob, + ngen=self.config.generations, + stats=mstats, + halloffame=hof, + verbose=True, + ) + + # 转换为因子公式 + formulas = [] + for individual in hof: + formula = self._individual_to_formula(individual, feature_cols) + formulas.append(formula) + + return formulas + diff --git a/factor_mining/mining.py b/factor_mining/mining.py new file mode 100644 index 0000000..99486eb --- /dev/null +++ b/factor_mining/mining.py @@ -0,0 +1,123 @@ +""" +因子挖掘抽象层:支持多种挖掘算法(DEAP、DL、RL等) +""" +from abc import ABC, abstractmethod +from typing import List, Dict, Optional, Any +import pandas as pd +from dataclasses import dataclass + +from factor_mining.operators import FactorFormula + + +@dataclass +class MiningConfig: + """挖掘配置基类""" + ret_horizon: int = 1 + ic_window: int = 30 + ic_method: str = "spearman" # "spearman" or "pearson" + seed: Optional[int] = None + + +class FactorMiner(ABC): + """因子挖掘器抽象基类""" + + def __init__(self, config: MiningConfig): + self.config 
= config + + @abstractmethod + def mine( + self, + data: pd.DataFrame, + feature_cols: List[str], + price_col: str = "close" + ) -> List[FactorFormula]: + """ + 挖掘因子 + + Parameters: + ----------- + data : DataFrame + 数据 + feature_cols : List[str] + 特征列名列表 + price_col : str + 价格列名 + + Returns: + -------- + List[FactorFormula]: 挖掘出的因子公式列表 + """ + pass + + @abstractmethod + def get_name(self) -> str: + """获取挖掘器名称""" + pass + + +class MiningPipeline: + """挖掘流程管理器""" + + def __init__(self): + self.miners: Dict[str, FactorMiner] = {} + + def register_miner(self, miner: FactorMiner): + """注册挖掘器""" + name = miner.get_name() + if name in self.miners: + raise ValueError(f"挖掘器 '{name}' 已存在") + self.miners[name] = miner + + def get_miner(self, name: str) -> Optional[FactorMiner]: + """获取挖掘器""" + return self.miners.get(name) + + def list_miners(self) -> List[str]: + """列出所有挖掘器""" + return list(self.miners.keys()) + + def mine( + self, + miner_name: str, + data: pd.DataFrame, + feature_cols: List[str], + price_col: str = "close" + ) -> List[FactorFormula]: + """ + 使用指定挖掘器进行挖掘 + + Parameters: + ----------- + miner_name : str + 挖掘器名称 + data : DataFrame + 数据 + feature_cols : List[str] + 特征列名列表 + price_col : str + 价格列名 + + Returns: + -------- + List[FactorFormula]: 挖掘出的因子公式列表 + """ + miner = self.get_miner(miner_name) + if miner is None: + raise ValueError(f"挖掘器 '{miner_name}' 不存在") + + return miner.mine(data, feature_cols, price_col) + + +# 全局挖掘流程管理器 +_pipeline = MiningPipeline() + + +def register_miner(miner: FactorMiner): + """注册挖掘器到全局管理器""" + _pipeline.register_miner(miner) + + +def get_pipeline() -> MiningPipeline: + """获取全局挖掘流程管理器""" + return _pipeline + diff --git a/factor_mining/operators.py b/factor_mining/operators.py new file mode 100644 index 0000000..0707b17 --- /dev/null +++ b/factor_mining/operators.py @@ -0,0 +1,297 @@ +""" +算子系统:基础数学算子和技术指标算子的注册与管理 +支持算子的注册、查询、反射调用 +""" + +import numpy as np +import pandas as pd +from typing import Dict, Callable, List, Optional, 
Any +from abc import ABC, abstractmethod +import inspect + + +class Operator(ABC): + """算子基类""" + + def __init__(self, name: str, func: Callable, description: str = ""): + """ + Parameters: + ----------- + name : str + 算子名称(唯一标识) + func : Callable + 算子函数 + description : str + 算子描述 + """ + self.name = name + self.func = func + self.description = description + self._signature = inspect.signature(func) + + def __call__(self, *args, **kwargs): + """调用算子函数""" + return self.func(*args, **kwargs) + + def get_signature(self): + """获取函数签名""" + return self._signature + + def __repr__(self): + return f"Operator(name='{self.name}', description='{self.description}')" + + +class OperatorRegistry: + """算子注册表""" + + def __init__(self): + self._operators: Dict[str, Operator] = {} + + def register(self, operator: Operator): + """注册算子""" + if operator.name in self._operators: + raise ValueError(f"算子 '{operator.name}' 已存在") + self._operators[operator.name] = operator + + def register_function(self, name: str, func: Callable, description: str = ""): + """直接注册函数为算子""" + operator = Operator(name, func, description) + self.register(operator) + + def get(self, name: str) -> Optional[Operator]: + """获取算子""" + return self._operators.get(name) + + def has(self, name: str) -> bool: + """检查算子是否存在""" + return name in self._operators + + def list_all(self) -> List[str]: + """列出所有算子名称""" + return list(self._operators.keys()) + + def get_all(self) -> Dict[str, Operator]: + """获取所有算子""" + return self._operators.copy() + + +# 全局算子注册表 +_registry = OperatorRegistry() + + +def register_operator(name: str, description: str = ""): + """装饰器:注册算子""" + + def decorator(func: Callable): + _registry.register_function(name, func, description) + return func + + return decorator + + +def get_operator(name: str) -> Optional[Operator]: + """获取算子""" + return _registry.get(name) + + +def get_registry() -> OperatorRegistry: + """获取全局注册表""" + return _registry + + +# ==================== 基础数学算子 ==================== + + 
@register_operator("add", "加法: x + y")
def _add(x: np.ndarray, y: np.ndarray) -> np.ndarray:
    """Return the element-wise sum ``x + y``."""
    return x + y


@register_operator("sub", "减法: x - y")
def _sub(x: np.ndarray, y: np.ndarray) -> np.ndarray:
    """Return the element-wise difference ``x - y``."""
    return x - y


@register_operator("mul", "乘法: x * y")
def _mul(x: np.ndarray, y: np.ndarray) -> np.ndarray:
    """Return the element-wise product ``x * y``."""
    return x * y


@register_operator("div", "除法: x / y (安全除法)")
def _div(x: np.ndarray, y: np.ndarray) -> np.ndarray:
    """Return ``x / y`` with near-zero denominators mapped to NaN.

    Any denominator with ``|y| < 1e-12`` is replaced by NaN before dividing,
    so the result at that position is NaN instead of +/-inf.
    """
    denom = np.where(np.abs(y) < 1e-12, np.nan, y)
    return x / denom


@register_operator("neg", "取负: -x")
def _neg(x: np.ndarray) -> np.ndarray:
    """Return the element-wise negation ``-x``."""
    return np.negative(x)


@register_operator("abs", "绝对值: |x|")
def _abs(x: np.ndarray) -> np.ndarray:
    """Return the element-wise absolute value ``|x|``."""
    return np.abs(x)


@register_operator("log", "对数: log(|x|)")
def _log(x: np.ndarray) -> np.ndarray:
    """Return ``log(|x|)``, flooring ``|x|`` at 1e-12 so zeros stay finite."""
    return np.log(np.clip(np.abs(x), 1e-12, None))


@register_operator("sqrt", "平方根: sqrt(x)")
def _sqrt(x: np.ndarray) -> np.ndarray:
    """Return ``sqrt(x)`` after clipping negative inputs up to 0."""
    return np.sqrt(np.clip(x, 0.0, None))


@register_operator("pow", "幂运算: x^y (限制范围)")
def _pow(x: np.ndarray, y: np.ndarray) -> np.ndarray:
    """Return ``x ** y`` with bounded operands and non-finite results as NaN.

    The base is clipped to ``[-1e6, 1e6]`` and the exponent to ``[-3, 3]``.
    Overflow, a negative base with a fractional exponent, and zero raised to
    a negative power all end up as NaN in the output.

    Parameters
    ----------
    x : np.ndarray
        Base values (scalars and 0-d arrays are accepted as well).
    y : np.ndarray
        Exponent values, broadcast against ``x``.

    Returns
    -------
    np.ndarray
        Array of the broadcast shape; a 0-d array when both inputs are scalar.
    """
    base = np.clip(x, -1e6, 1e6)
    exponent = np.clip(y, -3.0, 3.0)
    # "divide" covers 0 ** negative, which yields inf and is masked below.
    with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
        out = np.power(base, exponent)
    # np.where instead of in-place masked assignment: for scalar / 0-d inputs
    # np.power returns a NumPy scalar, which does not support item assignment.
    return np.where(np.isfinite(out), out, np.nan)


# ==================== Time-series operators ====================


def _rolling_mean(x: np.ndarray, window: int) -> np.ndarray:
    """Return the trailing rolling mean of ``x`` over ``window`` observations.

    A value is produced once at least ``max(2, window // 2)`` non-NaN
    observations are available in the window; earlier positions are NaN.
    """
    s = pd.Series(x)
    return s.rolling(window, min_periods=max(2, window // 2)).mean().to_numpy()


def _rolling_std(x: np.ndarray, window: int) -> np.ndarray:
    """Return the trailing rolling standard deviation of ``x``.

    Uses the same ``max(2, window // 2)`` minimum-observation rule as
    ``_rolling_mean``; positions with fewer observations are NaN.
    """
    s = pd.Series(x)
    return s.rolling(window, min_periods=max(2, window // 2)).std().to_numpy()


def _ts_delta(x: np.ndarray, period: int) -> np.ndarray:
    """Return ``x[t] - x[t - period]``; the first ``period`` entries are NaN."""
    s = pd.Series(x)
    return s.diff(period).to_numpy()


def _ts_rank(x: np.ndarray, window: int) -> np.ndarray:
    """Return the percentile rank of the latest value within a trailing window.

    For each position the values in the window are ranked (``pct=True``) and
    the rank of the last, i.e. most recent, element is kept. Positions with
    fewer than ``max(2, window // 2)`` observations are NaN.
    """
    s = pd.Series(x)
    return (
        s.rolling(window, min_periods=max(2, window // 2))
        # NOTE: builds a Series per window, so this is the slowest operator.
        .apply(lambda a: pd.Series(a).rank(pct=True).iloc[-1], raw=False)
        .to_numpy()
    )
+def _delay(x: np.ndarray, period: int) -> np.ndarray: + s = pd.Series(x) + return s.shift(period).to_numpy() + + +# 注册时间序列算子(带不同窗口) +for w in (3, 6, 12, 24, 48, 96): + _registry.register_function( + f"sma{w}", lambda x, w=w: _rolling_mean(x, w), f"简单移动平均: SMA(x, {w})" + ) + _registry.register_function( + f"std{w}", lambda x, w=w: _rolling_std(x, w), f"滚动标准差: STD(x, {w})" + ) + _registry.register_function( + f"rank{w}", lambda x, w=w: _ts_rank(x, w), f"滚动排名: RANK(x, {w})" + ) + _registry.register_function( + f"delta{w}", lambda x, w=w: _ts_delta(x, w), f"差分: DELTA(x, {w})" + ) + _registry.register_function( + f"delay{w}", lambda x, w=w: _delay(x, w), f"延迟: DELAY(x, {w})" + ) + + +# ==================== 因子公式解析与计算 ==================== + + +class FactorFormula: + """因子公式:支持序列化和反序列化""" + + def __init__(self, expression: str, feature_names: List[str]): + """ + Parameters: + ----------- + expression : str + 因子表达式(使用算子名称) + feature_names : List[str] + 特征名称列表 + """ + self.expression = expression + self.feature_names = feature_names + + def compute(self, features: Dict[str, np.ndarray]) -> np.ndarray: + """ + 计算因子值 + + Parameters: + ----------- + features : Dict[str, np.ndarray] + 特征字典,key为特征名称 + + Returns: + -------- + np.ndarray: 因子值 + """ + # 构建计算环境 + env = {} + + # 添加特征 + for name in self.feature_names: + if name not in features: + raise KeyError(f"特征 '{name}' 不存在") + env[name] = features[name] + + # 添加算子 + for op_name in _registry.list_all(): + op = _registry.get(op_name) + if op: + env[op_name] = op.func + + # 添加numpy和pandas(用于某些表达式) + env["np"] = np + env["pd"] = pd + + # 执行表达式 + try: + # 限制可用的内置函数 + safe_builtins = { + "abs": abs, + "min": min, + "max": max, + "sum": sum, + "len": len, + } + result = eval(self.expression, {"__builtins__": safe_builtins}, env) + + # 确保结果是numpy数组 + if not isinstance(result, np.ndarray): + if isinstance(result, (int, float)): + # 标量转换为数组(广播) + result = np.full(len(features[self.feature_names[0]]), result) + else: + result = 
np.array(result) + + # 确保长度一致 + expected_len = len(features[self.feature_names[0]]) + if len(result) != expected_len: + raise ValueError( + f"表达式结果长度 {len(result)} 与特征长度 {expected_len} 不匹配" + ) + + return result + except Exception as e: + raise RuntimeError(f"计算因子表达式失败: {e}\n表达式: {self.expression}") + + def to_dict(self) -> Dict: + """序列化为字典""" + return {"expression": self.expression, "feature_names": self.feature_names} + + @classmethod + def from_dict(cls, data: Dict) -> "FactorFormula": + """从字典反序列化""" + return cls(data["expression"], data["feature_names"]) + + def __repr__(self): + return f"FactorFormula(expression='{self.expression}', features={self.feature_names})" diff --git a/factor_mining/validator.py b/factor_mining/validator.py new file mode 100644 index 0000000..725770e --- /dev/null +++ b/factor_mining/validator.py @@ -0,0 +1,237 @@ +""" +因子有效性检验模块:整合所有检验方案 +""" +import numpy as np +import pandas as pd +from typing import Dict, List, Optional +from dataclasses import dataclass +from statsmodels.regression.linear_model import OLS + +from validation import ( + compute_ic, + compute_rolling_ic, + group_backtest, + factor_span_regression +) + + +@dataclass +class ValidationConfig: + """验证配置""" + ic_window: int = 30 + ic_method: str = "spearman" # "spearman" or "pearson" + n_groups: int = 3 + group_period: int = 180 + min_ic: float = 0.01 + min_tstat: float = 1.5 + min_r2_change: float = 0.05 + + +class FactorValidator: + """因子有效性检验器""" + + def __init__(self, config: ValidationConfig): + self.config = config + + def validate_ic( + self, + factor: pd.Series, + forward_return: pd.Series + ) -> Dict: + """ + IC检验 + + Returns: + -------- + dict: 包含mean_ic, ic_ir, ic_series等 + """ + rolling_ic = compute_rolling_ic( + factor, + forward_return, + window=self.config.ic_window, + method=self.config.ic_method + ) + + mean_ic = rolling_ic.mean() + ic_std = rolling_ic.std() + ic_ir = mean_ic / (ic_std + 1e-8) # IC信息比率 + + return { + "mean_ic": mean_ic, + "ic_std": 
ic_std, + "ic_ir": ic_ir, + "ic_series": rolling_ic, + "is_valid": abs(mean_ic) >= self.config.min_ic + } + + def validate_group_backtest( + self, + factor: pd.Series, + forward_return: pd.Series + ) -> Dict: + """ + 分组回测检验 + + Returns: + -------- + dict: 包含mean_h_l_return, mean_h_l_tstat等 + """ + result = group_backtest( + factor, + forward_return, + n_groups=self.config.n_groups, + group_period=self.config.group_period + ) + + is_valid = abs(result.get('mean_h_l_tstat', 0)) >= self.config.min_tstat + + return { + **result, + "is_valid": is_valid + } + + def validate_regression( + self, + factor: pd.Series, + forward_return: pd.Series, + other_factors: Optional[pd.DataFrame] = None + ) -> Dict: + """ + 因子跨度回归检验 + + Parameters: + ----------- + factor : Series + 待检验因子 + forward_return : Series + 未来收益率 + other_factors : DataFrame, optional + 其他因子(用于控制变量) + + Returns: + -------- + dict: 包含beta, tstat, r2_change等 + """ + if other_factors is None: + other_factors = pd.DataFrame() + + # 合并因子 + factors_df = pd.concat([other_factors, factor.to_frame(name='target')], axis=1) + + result = factor_span_regression( + factors_df, + forward_return, + target_factor='target' + ) + + is_valid = ( + abs(result.get('tstat', 0)) >= self.config.min_tstat and + result.get('r2_change', 0) >= self.config.min_r2_change + ) + + return { + **result, + "is_valid": is_valid + } + + def validate_all( + self, + factor: pd.Series, + forward_return: pd.Series, + other_factors: Optional[pd.DataFrame] = None + ) -> Dict: + """ + 综合检验:执行所有检验方法 + + Returns: + -------- + dict: 包含所有检验结果和综合判断 + """ + results = {} + + # IC检验 + ic_result = self.validate_ic(factor, forward_return) + results['ic'] = ic_result + + # 分组回测 + group_result = self.validate_group_backtest(factor, forward_return) + results['group_backtest'] = group_result + + # 回归检验 + reg_result = self.validate_regression(factor, forward_return, other_factors) + results['regression'] = reg_result + + # 综合判断 + is_valid = ( + ic_result['is_valid'] and 
+ group_result['is_valid'] and + reg_result['is_valid'] + ) + + results['is_valid'] = is_valid + results['score'] = self._calculate_score(ic_result, group_result, reg_result) + + return results + + def _calculate_score( + self, + ic_result: Dict, + group_result: Dict, + reg_result: Dict + ) -> float: + """计算综合得分""" + score = 0.0 + + # IC得分(权重0.3) + ic_score = abs(ic_result.get('mean_ic', 0)) * 10 + score += ic_score * 0.3 + + # 分组回测得分(权重0.4) + tstat = abs(group_result.get('mean_h_l_tstat', 0)) + tstat_score = min(tstat / 3.0, 1.0) # 归一化到[0, 1] + score += tstat_score * 0.4 + + # 回归得分(权重0.3) + r2_change = reg_result.get('r2_change', 0) + r2_score = min(r2_change / 0.1, 1.0) # 归一化到[0, 1] + score += r2_score * 0.3 + + return score + + def filter_factors( + self, + factors: pd.DataFrame, + forward_return: pd.Series + ) -> pd.DataFrame: + """ + 批量过滤因子:只保留有效因子 + + Returns: + -------- + DataFrame: 有效因子 + """ + valid_factors = [] + + for col in factors.columns: + factor = factors[col] + result = self.validate_all(factor, forward_return, factors.drop(columns=[col])) + + if result['is_valid']: + valid_factors.append(col) + + return factors[valid_factors] if valid_factors else pd.DataFrame() + + +def create_validator( + ic_window: int = 30, + min_ic: float = 0.01, + min_tstat: float = 1.5 +) -> FactorValidator: + """创建验证器(便捷函数)""" + config = ValidationConfig( + ic_window=ic_window, + min_ic=min_ic, + min_tstat=min_tstat + ) + return FactorValidator(config) +