Compare commits
2 Commits
a66e42a8ae
...
e5beada25e
| Author | SHA1 | Date | |
|---|---|---|---|
| e5beada25e | |||
| dc3d41d6e5 |
179
.gitignore
vendored
Normal file
179
.gitignore
vendored
Normal file
@@ -0,0 +1,179 @@
|
||||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
||||
|
||||
# C extensions
|
||||
*.so
|
||||
|
||||
# Distribution / packaging
|
||||
.Python
|
||||
build/
|
||||
develop-eggs/
|
||||
dist/
|
||||
downloads/
|
||||
eggs/
|
||||
.eggs/
|
||||
lib/
|
||||
lib64/
|
||||
parts/
|
||||
sdist/
|
||||
var/
|
||||
wheels/
|
||||
pip-wheel-metadata/
|
||||
share/python-wheels/
|
||||
*.egg-info/
|
||||
.installed.cfg
|
||||
*.egg
|
||||
MANIFEST
|
||||
|
||||
# PyInstaller
|
||||
# Usually these files are written by a python script from a template
|
||||
# before PyInstaller builds the exe, so as to inject date/other infos into it.
|
||||
*.manifest
|
||||
*.spec
|
||||
|
||||
# Installer logs
|
||||
pip-log.txt
|
||||
pip-delete-this-directory.txt
|
||||
|
||||
# Unit test / coverage reports
|
||||
htmlcov/
|
||||
.tox/
|
||||
.nox/
|
||||
.coverage
|
||||
.coverage.*
|
||||
.cache
|
||||
nosetests.xml
|
||||
coverage.xml
|
||||
*.cover
|
||||
*.py,cover
|
||||
.hypothesis/
|
||||
.pytest_cache/
|
||||
|
||||
# Translations
|
||||
*.mo
|
||||
*.pot
|
||||
|
||||
# Django stuff:
|
||||
*.log
|
||||
local_settings.py
|
||||
db.sqlite3
|
||||
db.sqlite3-journal
|
||||
|
||||
# Flask stuff:
|
||||
instance/
|
||||
.webassets-cache
|
||||
|
||||
# Scrapy stuff:
|
||||
.scrapy
|
||||
|
||||
# Sphinx documentation
|
||||
docs/_build/
|
||||
|
||||
# PyBuilder
|
||||
target/
|
||||
|
||||
# Jupyter Notebook
|
||||
.ipynb_checkpoints
|
||||
|
||||
# IPython
|
||||
profile_default/
|
||||
ipython_config.py
|
||||
|
||||
# pyenv
|
||||
.python-version
|
||||
|
||||
# pipenv
|
||||
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
|
||||
# However, in case of collaboration, if having platform-specific dependencies or dependencies
|
||||
# having no cross-platform support, pipenv may install dependencies that don't work, or not
|
||||
# install all needed dependencies.
|
||||
#Pipfile.lock
|
||||
|
||||
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
|
||||
__pypackages__/
|
||||
|
||||
# Celery stuff
|
||||
celerybeat-schedule
|
||||
celerybeat.pid
|
||||
|
||||
# SageMath parsed files
|
||||
*.sage.py
|
||||
|
||||
# Environments
|
||||
.env
|
||||
.venv
|
||||
env/
|
||||
venv/
|
||||
ENV/
|
||||
env.bak/
|
||||
venv.bak/
|
||||
|
||||
# Spyder project settings
|
||||
.spyderproject
|
||||
.spyproject
|
||||
|
||||
# Rope project settings
|
||||
.ropeproject
|
||||
|
||||
# mkdocs documentation
|
||||
/site
|
||||
|
||||
# mypy
|
||||
.mypy_cache/
|
||||
.dmypy.json
|
||||
dmypy.json
|
||||
|
||||
# Pyre type checker
|
||||
.pyre/
|
||||
|
||||
# Data files (keep structure but ignore large data)
|
||||
data/
|
||||
|
||||
# IDE files
|
||||
.vscode/
|
||||
.idea/
|
||||
*.swp
|
||||
*.swo
|
||||
*~
|
||||
|
||||
# OS generated files
|
||||
.DS_Store
|
||||
.DS_Store?
|
||||
._*
|
||||
.Spotlight-V100
|
||||
.Trashes
|
||||
ehthumbs.db
|
||||
Thumbs.db
|
||||
|
||||
# Docker
|
||||
.dockerignore
|
||||
|
||||
# Logs
|
||||
*.log
|
||||
logs/
|
||||
|
||||
# Temporary files
|
||||
tmp/
|
||||
temp/
|
||||
*.tmp
|
||||
*.temp
|
||||
|
||||
# API keys and secrets
|
||||
.env
|
||||
config.ini
|
||||
secrets.json
|
||||
api_keys.txt
|
||||
|
||||
# Database files
|
||||
*.db
|
||||
*.sqlite
|
||||
*.sqlite3
|
||||
|
||||
# Backup files
|
||||
*.bak
|
||||
*.backup
|
||||
|
||||
*.csv
|
||||
*.feather
|
||||
243
factor_mining/gp_miner.py
Normal file
243
factor_mining/gp_miner.py
Normal file
@@ -0,0 +1,243 @@
|
||||
"""
|
||||
DEAP遗传编程挖掘器实现
|
||||
"""
|
||||
|
||||
import random
|
||||
import operator
|
||||
from typing import List, Tuple, Optional
|
||||
from dataclasses import dataclass
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from deap import algorithms, base, creator, gp, tools
|
||||
|
||||
from factor_mining.operators import FactorFormula, get_registry, get_operator
|
||||
from factor_mining.mining import FactorMiner, MiningConfig
|
||||
from data import compute_forward_returns
|
||||
|
||||
|
||||
@dataclass
|
||||
class GPConfig(MiningConfig):
|
||||
"""GP挖掘配置"""
|
||||
|
||||
population_size: int = 200
|
||||
generations: int = 30
|
||||
tournament_size: int = 5
|
||||
crossover_prob: float = 0.9
|
||||
mutation_prob: float = 0.05
|
||||
elitism: int = 5
|
||||
max_depth_init: int = 1
|
||||
max_depth: int = 8
|
||||
complexity_penalty: float = 0.001
|
||||
|
||||
|
||||
class GPMiner(FactorMiner):
|
||||
"""DEAP遗传编程挖掘器"""
|
||||
|
||||
def __init__(self, config: GPConfig):
|
||||
super().__init__(config)
|
||||
self.config: GPConfig = config
|
||||
self.toolbox: Optional[base.Toolbox] = None
|
||||
self.pset: Optional[gp.PrimitiveSetTyped] = None
|
||||
self.features: Optional[List[pd.Series]] = None
|
||||
|
||||
def get_name(self) -> str:
|
||||
return "gp"
|
||||
|
||||
def _build_pset(self, feature_names: List[str]) -> gp.PrimitiveSetTyped:
|
||||
"""构建GP原始集合"""
|
||||
registry = get_registry()
|
||||
pset = gp.PrimitiveSetTyped(
|
||||
"MAIN", [np.ndarray for _ in feature_names], np.ndarray
|
||||
)
|
||||
|
||||
# 命名参数
|
||||
for i, name in enumerate(feature_names):
|
||||
pset.renameArguments(**{f"ARG{i}": name})
|
||||
|
||||
# 添加算子
|
||||
for op_name in registry.list_all():
|
||||
op = registry.get(op_name)
|
||||
if op:
|
||||
sig = op.get_signature()
|
||||
params = list(sig.parameters.values())
|
||||
|
||||
# 根据参数数量判断是一元还是二元算子
|
||||
if len(params) == 1:
|
||||
# 一元算子
|
||||
pset.addPrimitive(op.func, [np.ndarray], np.ndarray, name=op_name)
|
||||
elif len(params) == 2:
|
||||
# 二元算子
|
||||
pset.addPrimitive(
|
||||
op.func, [np.ndarray, np.ndarray], np.ndarray, name=op_name
|
||||
)
|
||||
|
||||
# 添加常量
|
||||
# def _const() -> np.ndarray:
|
||||
# return np.array(random.uniform(-2.0, 2.0))
|
||||
# pset.addEphemeralConstant("const", _const, np.ndarray)
|
||||
|
||||
return pset
|
||||
|
||||
def _evaluate_individual(self, individual, target: pd.Series) -> Tuple[float]:
|
||||
"""评估个体适应度"""
|
||||
func = self.toolbox.compile(expr=individual)
|
||||
|
||||
# 构建特征矩阵
|
||||
idx = target.index
|
||||
inputs = [f.reindex(idx).to_numpy() for f in self.features]
|
||||
|
||||
try:
|
||||
raw = func(*inputs)
|
||||
except Exception:
|
||||
return (-1e6,)
|
||||
|
||||
# 确保数组长度
|
||||
if not isinstance(raw, np.ndarray):
|
||||
return (-1e6,)
|
||||
if raw.shape[0] != len(idx):
|
||||
return (-1e6,)
|
||||
|
||||
# 转换为Series并清理
|
||||
factor = pd.Series(raw, index=idx)
|
||||
factor = factor.replace([np.inf, -np.inf], np.nan)
|
||||
factor = factor.ffill().bfill()
|
||||
|
||||
# 计算滚动IC
|
||||
window = self.config.ic_window
|
||||
if len(factor) < window + 10:
|
||||
return (-1e6,)
|
||||
|
||||
from validation import compute_rolling_ic
|
||||
|
||||
ic_series = compute_rolling_ic(
|
||||
factor, target, window=window, method=self.config.ic_method
|
||||
)
|
||||
mean_ic = ic_series.mean()
|
||||
|
||||
if not np.isfinite(mean_ic):
|
||||
return (-1e6,)
|
||||
|
||||
# 复杂度惩罚
|
||||
complexity = len(individual)
|
||||
fitness = mean_ic - self.config.complexity_penalty * complexity
|
||||
|
||||
if not np.isfinite(fitness):
|
||||
fitness = -1e6
|
||||
|
||||
return (fitness,)
|
||||
|
||||
def _individual_to_formula(
|
||||
self, individual, feature_names: List[str]
|
||||
) -> FactorFormula:
|
||||
"""将GP个体转换为因子公式"""
|
||||
# GP表达式是PrimitiveTree,转换为字符串后是函数调用形式
|
||||
# 例如: "add(ARG0, ARG1)" 或 "mul(add(ARG0, ARG1), const)"
|
||||
expr_str = str(individual)
|
||||
|
||||
# 替换ARG0, ARG1等为实际特征名
|
||||
for i, name in enumerate(feature_names):
|
||||
expr_str = expr_str.replace(f"ARG{i}", name)
|
||||
|
||||
# GP表达式已经是Python可执行的函数调用格式
|
||||
# 例如: "add(close, open)" 可以直接eval
|
||||
# 但需要确保所有算子都在环境中可用
|
||||
|
||||
return FactorFormula(expr_str, feature_names)
|
||||
|
||||
def mine(
|
||||
self, data: pd.DataFrame, feature_cols: List[str], price_col: str = "close"
|
||||
) -> List[FactorFormula]:
|
||||
"""执行GP挖掘"""
|
||||
if self.config.seed is not None:
|
||||
random.seed(self.config.seed)
|
||||
np.random.seed(self.config.seed)
|
||||
|
||||
# 准备数据
|
||||
price = data[price_col].astype(float)
|
||||
forward_ret = compute_forward_returns(price, self.config.ret_horizon)
|
||||
target = forward_ret
|
||||
|
||||
self.features = [data[c].astype(float) for c in feature_cols]
|
||||
|
||||
# 构建原始集合
|
||||
self.pset = self._build_pset(feature_cols)
|
||||
|
||||
# 创建DEAP类型
|
||||
if not hasattr(creator, "FitnessMax"):
|
||||
creator.create("FitnessMax", base.Fitness, weights=(1.0,))
|
||||
if not hasattr(creator, "Individual"):
|
||||
creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMax)
|
||||
|
||||
# 构建工具箱
|
||||
self.toolbox = base.Toolbox()
|
||||
self.toolbox.register(
|
||||
"expr",
|
||||
gp.genHalfAndHalf,
|
||||
pset=self.pset,
|
||||
min_=1,
|
||||
max_=self.config.max_depth_init,
|
||||
)
|
||||
self.toolbox.register(
|
||||
"individual", tools.initIterate, creator.Individual, self.toolbox.expr
|
||||
)
|
||||
self.toolbox.register(
|
||||
"population", tools.initRepeat, list, self.toolbox.individual
|
||||
)
|
||||
self.toolbox.register("compile", gp.compile, pset=self.pset)
|
||||
|
||||
self.toolbox.register("evaluate", self._evaluate_individual, target=target)
|
||||
|
||||
# 遗传算子
|
||||
self.toolbox.register(
|
||||
"select", tools.selTournament, tournsize=self.config.tournament_size
|
||||
)
|
||||
self.toolbox.register("mate", gp.cxOnePoint)
|
||||
self.toolbox.register("expr_mut", gp.genFull, min_=0, max_=2)
|
||||
self.toolbox.register(
|
||||
"mutate", gp.mutUniform, expr=self.toolbox.expr_mut, pset=self.pset
|
||||
)
|
||||
|
||||
# 控制树深度
|
||||
self.toolbox.decorate(
|
||||
"mate",
|
||||
gp.staticLimit(
|
||||
key=operator.attrgetter("height"), max_value=self.config.max_depth
|
||||
),
|
||||
)
|
||||
self.toolbox.decorate(
|
||||
"mutate",
|
||||
gp.staticLimit(
|
||||
key=operator.attrgetter("height"), max_value=self.config.max_depth
|
||||
),
|
||||
)
|
||||
|
||||
# 运行进化
|
||||
pop = self.toolbox.population(n=self.config.population_size)
|
||||
hof = tools.HallOfFame(maxsize=max(5000, self.config.elitism))
|
||||
|
||||
stats_fit = tools.Statistics(lambda ind: ind.fitness.values[0])
|
||||
stats_size = tools.Statistics(len)
|
||||
mstats = tools.MultiStatistics(fitness=stats_fit, size=stats_size)
|
||||
mstats.register("avg", np.nanmean)
|
||||
mstats.register("std", np.nanstd)
|
||||
mstats.register("min", np.nanmin)
|
||||
mstats.register("max", np.nanmax)
|
||||
|
||||
pop, logbook = algorithms.eaSimple(
|
||||
pop,
|
||||
self.toolbox,
|
||||
cxpb=self.config.crossover_prob,
|
||||
mutpb=self.config.mutation_prob,
|
||||
ngen=self.config.generations,
|
||||
stats=mstats,
|
||||
halloffame=hof,
|
||||
verbose=True,
|
||||
)
|
||||
|
||||
# 转换为因子公式
|
||||
formulas = []
|
||||
for individual in hof:
|
||||
formula = self._individual_to_formula(individual, feature_cols)
|
||||
formulas.append(formula)
|
||||
|
||||
return formulas
|
||||
123
factor_mining/mining.py
Normal file
123
factor_mining/mining.py
Normal file
@@ -0,0 +1,123 @@
|
||||
"""
|
||||
因子挖掘抽象层:支持多种挖掘算法(DEAP、DL、RL等)
|
||||
"""
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import List, Dict, Optional, Any
|
||||
import pandas as pd
|
||||
from dataclasses import dataclass
|
||||
|
||||
from factor_mining.operators import FactorFormula
|
||||
|
||||
|
||||
@dataclass
|
||||
class MiningConfig:
|
||||
"""挖掘配置基类"""
|
||||
ret_horizon: int = 1
|
||||
ic_window: int = 30
|
||||
ic_method: str = "spearman" # "spearman" or "pearson"
|
||||
seed: Optional[int] = None
|
||||
|
||||
|
||||
class FactorMiner(ABC):
|
||||
"""因子挖掘器抽象基类"""
|
||||
|
||||
def __init__(self, config: MiningConfig):
|
||||
self.config = config
|
||||
|
||||
@abstractmethod
|
||||
def mine(
|
||||
self,
|
||||
data: pd.DataFrame,
|
||||
feature_cols: List[str],
|
||||
price_col: str = "close"
|
||||
) -> List[FactorFormula]:
|
||||
"""
|
||||
挖掘因子
|
||||
|
||||
Parameters:
|
||||
-----------
|
||||
data : DataFrame
|
||||
数据
|
||||
feature_cols : List[str]
|
||||
特征列名列表
|
||||
price_col : str
|
||||
价格列名
|
||||
|
||||
Returns:
|
||||
--------
|
||||
List[FactorFormula]: 挖掘出的因子公式列表
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_name(self) -> str:
|
||||
"""获取挖掘器名称"""
|
||||
pass
|
||||
|
||||
|
||||
class MiningPipeline:
|
||||
"""挖掘流程管理器"""
|
||||
|
||||
def __init__(self):
|
||||
self.miners: Dict[str, FactorMiner] = {}
|
||||
|
||||
def register_miner(self, miner: FactorMiner):
|
||||
"""注册挖掘器"""
|
||||
name = miner.get_name()
|
||||
if name in self.miners:
|
||||
raise ValueError(f"挖掘器 '{name}' 已存在")
|
||||
self.miners[name] = miner
|
||||
|
||||
def get_miner(self, name: str) -> Optional[FactorMiner]:
|
||||
"""获取挖掘器"""
|
||||
return self.miners.get(name)
|
||||
|
||||
def list_miners(self) -> List[str]:
|
||||
"""列出所有挖掘器"""
|
||||
return list(self.miners.keys())
|
||||
|
||||
def mine(
|
||||
self,
|
||||
miner_name: str,
|
||||
data: pd.DataFrame,
|
||||
feature_cols: List[str],
|
||||
price_col: str = "close"
|
||||
) -> List[FactorFormula]:
|
||||
"""
|
||||
使用指定挖掘器进行挖掘
|
||||
|
||||
Parameters:
|
||||
-----------
|
||||
miner_name : str
|
||||
挖掘器名称
|
||||
data : DataFrame
|
||||
数据
|
||||
feature_cols : List[str]
|
||||
特征列名列表
|
||||
price_col : str
|
||||
价格列名
|
||||
|
||||
Returns:
|
||||
--------
|
||||
List[FactorFormula]: 挖掘出的因子公式列表
|
||||
"""
|
||||
miner = self.get_miner(miner_name)
|
||||
if miner is None:
|
||||
raise ValueError(f"挖掘器 '{miner_name}' 不存在")
|
||||
|
||||
return miner.mine(data, feature_cols, price_col)
|
||||
|
||||
|
||||
# 全局挖掘流程管理器
|
||||
_pipeline = MiningPipeline()
|
||||
|
||||
|
||||
def register_miner(miner: FactorMiner):
|
||||
"""注册挖掘器到全局管理器"""
|
||||
_pipeline.register_miner(miner)
|
||||
|
||||
|
||||
def get_pipeline() -> MiningPipeline:
|
||||
"""获取全局挖掘流程管理器"""
|
||||
return _pipeline
|
||||
|
||||
653
factor_mining/operators.py
Normal file
653
factor_mining/operators.py
Normal file
@@ -0,0 +1,653 @@
|
||||
"""
|
||||
算子系统:基础数学算子和技术指标算子的注册与管理
|
||||
支持算子的注册、查询、反射调用
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from typing import Dict, Callable, List, Optional, Any
|
||||
from abc import ABC, abstractmethod
|
||||
import inspect
|
||||
|
||||
import talib
|
||||
|
||||
|
||||
class Operator(ABC):
|
||||
"""算子基类"""
|
||||
|
||||
def __init__(self, name: str, func: Callable, description: str = ""):
|
||||
"""
|
||||
Parameters:
|
||||
-----------
|
||||
name : str
|
||||
算子名称(唯一标识)
|
||||
func : Callable
|
||||
算子函数
|
||||
description : str
|
||||
算子描述
|
||||
"""
|
||||
self.name = name
|
||||
self.func = func
|
||||
self.description = description
|
||||
self._signature = inspect.signature(func)
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
"""调用算子函数"""
|
||||
return self.func(*args, **kwargs)
|
||||
|
||||
def get_signature(self):
|
||||
"""获取函数签名"""
|
||||
return self._signature
|
||||
|
||||
def __repr__(self):
|
||||
return f"Operator(name='{self.name}', description='{self.description}')"
|
||||
|
||||
|
||||
class OperatorRegistry:
|
||||
"""算子注册表"""
|
||||
|
||||
def __init__(self):
|
||||
self._operators: Dict[str, Operator] = {}
|
||||
|
||||
def register(self, operator: Operator):
|
||||
"""注册算子"""
|
||||
if operator.name in self._operators:
|
||||
raise ValueError(f"算子 '{operator.name}' 已存在")
|
||||
self._operators[operator.name] = operator
|
||||
|
||||
def register_function(self, name: str, func: Callable, description: str = ""):
|
||||
"""直接注册函数为算子"""
|
||||
operator = Operator(name, func, description)
|
||||
self.register(operator)
|
||||
|
||||
def get(self, name: str) -> Optional[Operator]:
|
||||
"""获取算子"""
|
||||
return self._operators.get(name)
|
||||
|
||||
def has(self, name: str) -> bool:
|
||||
"""检查算子是否存在"""
|
||||
return name in self._operators
|
||||
|
||||
def list_all(self) -> List[str]:
|
||||
"""列出所有算子名称"""
|
||||
return list(self._operators.keys())
|
||||
|
||||
def get_all(self) -> Dict[str, Operator]:
|
||||
"""获取所有算子"""
|
||||
return self._operators.copy()
|
||||
|
||||
|
||||
# 全局算子注册表
|
||||
_registry = OperatorRegistry()
|
||||
|
||||
|
||||
def register_operator(name: str, description: str = ""):
|
||||
"""装饰器:注册算子"""
|
||||
|
||||
def decorator(func: Callable):
|
||||
_registry.register_function(name, func, description)
|
||||
return func
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
def get_operator(name: str) -> Optional[Operator]:
|
||||
"""获取算子"""
|
||||
return _registry.get(name)
|
||||
|
||||
|
||||
def get_registry() -> OperatorRegistry:
|
||||
"""获取全局注册表"""
|
||||
return _registry
|
||||
|
||||
|
||||
# 定义period参数的值范围
|
||||
PERIOD_RANGE = range(10, 100) # 10到99
|
||||
|
||||
# ==================== 基础数学算子 ====================
|
||||
|
||||
|
||||
@register_operator("add", "加法: x + y")
|
||||
def _add(x: np.ndarray, y: np.ndarray) -> np.ndarray:
|
||||
return x + y
|
||||
|
||||
|
||||
@register_operator("sub", "减法: x - y")
|
||||
def _sub(x: np.ndarray, y: np.ndarray) -> np.ndarray:
|
||||
return x - y
|
||||
|
||||
|
||||
@register_operator("mul", "乘法: x * y")
|
||||
def _mul(x: np.ndarray, y: np.ndarray) -> np.ndarray:
|
||||
return x * y
|
||||
|
||||
|
||||
@register_operator("div", "除法: x / y (安全除法)")
|
||||
def _div(x: np.ndarray, y: np.ndarray) -> np.ndarray:
|
||||
denom = np.where(np.abs(y) < 1e-12, np.nan, y)
|
||||
return x / denom
|
||||
|
||||
|
||||
@register_operator("neg", "取负: -x")
|
||||
def _neg(x: np.ndarray) -> np.ndarray:
|
||||
return np.negative(x)
|
||||
|
||||
|
||||
@register_operator("abs", "绝对值: |x|")
|
||||
def _abs(x: np.ndarray) -> np.ndarray:
|
||||
return np.abs(x)
|
||||
|
||||
|
||||
@register_operator("log", "对数: log(|x|)")
|
||||
def _log(x: np.ndarray) -> np.ndarray:
|
||||
return np.log(np.clip(np.abs(x), 1e-12, None))
|
||||
|
||||
|
||||
@register_operator("sqrt", "平方根: sqrt(x)")
|
||||
def _sqrt(x: np.ndarray) -> np.ndarray:
|
||||
return np.sqrt(np.clip(x, 0.0, None))
|
||||
|
||||
|
||||
@register_operator("pow", "幂运算: x^y (限制范围)")
|
||||
def _pow(x: np.ndarray, y: np.ndarray) -> np.ndarray:
|
||||
y_clip = np.clip(y, -3.0, 3.0)
|
||||
with np.errstate(over="ignore", invalid="ignore"):
|
||||
out = np.power(np.clip(x, -1e6, 1e6), y_clip)
|
||||
out[~np.isfinite(out)] = np.nan
|
||||
return out
|
||||
|
||||
|
||||
# ==================== 时间序列算子 ====================
|
||||
def _rolling_mean(x: np.ndarray, window: int) -> np.ndarray:
|
||||
s = pd.Series(x)
|
||||
return s.rolling(window, min_periods=max(2, window // 2)).mean().to_numpy()
|
||||
|
||||
|
||||
def _rolling_std(x: np.ndarray, window: int) -> np.ndarray:
|
||||
s = pd.Series(x)
|
||||
return s.rolling(window, min_periods=max(2, window // 2)).std().to_numpy()
|
||||
|
||||
|
||||
def _ts_delta(x: np.ndarray, period: int) -> np.ndarray:
|
||||
s = pd.Series(x)
|
||||
return s.diff(period).to_numpy()
|
||||
|
||||
|
||||
def _ts_rank(x: np.ndarray, window: int) -> np.ndarray:
|
||||
s = pd.Series(x)
|
||||
return (
|
||||
s.rolling(window, min_periods=max(2, window // 2))
|
||||
.apply(lambda a: pd.Series(a).rank(pct=True).iloc[-1], raw=False)
|
||||
.to_numpy()
|
||||
)
|
||||
|
||||
|
||||
def _delay(x: np.ndarray, period: int) -> np.ndarray:
|
||||
s = pd.Series(x)
|
||||
return s.shift(period).to_numpy()
|
||||
|
||||
|
||||
def _pct_change(x: np.ndarray, period: int = 1) -> np.ndarray:
|
||||
"""百分比变化"""
|
||||
s = pd.Series(x)
|
||||
return s.pct_change(periods=period, fill_method=None).to_numpy()
|
||||
|
||||
|
||||
# 注册单参数百分比变化算子
|
||||
@register_operator("pct", "百分比变化: PCT(x, 1)")
|
||||
def _pct(x: np.ndarray) -> np.ndarray:
|
||||
return _pct_change(x, 1)
|
||||
|
||||
|
||||
# 注册时间序列算子(带不同窗口)
|
||||
for w in PERIOD_RANGE:
|
||||
_registry.register_function(
|
||||
f"sma{w}", lambda x, w=w: _rolling_mean(x, w), f"简单移动平均: SMA(x, {w})"
|
||||
)
|
||||
_registry.register_function(
|
||||
f"std{w}", lambda x, w=w: _rolling_std(x, w), f"滚动标准差: STD(x, {w})"
|
||||
)
|
||||
_registry.register_function(
|
||||
f"rank{w}", lambda x, w=w: _ts_rank(x, w), f"滚动排名: RANK(x, {w})"
|
||||
)
|
||||
_registry.register_function(
|
||||
f"delta{w}", lambda x, w=w: _ts_delta(x, w), f"差分: DELTA(x, {w})"
|
||||
)
|
||||
_registry.register_function(
|
||||
f"delay{w}", lambda x, w=w: _delay(x, w), f"延迟: DELAY(x, {w})"
|
||||
)
|
||||
|
||||
|
||||
# ==================== 技术指标算子(含自定义与ta-lib)====================
|
||||
|
||||
|
||||
def _try_float(x):
|
||||
try:
|
||||
return float(x)
|
||||
except Exception:
|
||||
return x
|
||||
|
||||
|
||||
def _convert_input(v):
|
||||
# 如果是pd.Series,返回np.ndarray; 如果已经是np.ndarray则原样返回
|
||||
if isinstance(v, pd.Series):
|
||||
return v.values
|
||||
return v
|
||||
|
||||
|
||||
# 注册 ta-lib 技术指标
|
||||
# 获取 TA-Lib 的所有函数名(常用financial indicators均为大写)
|
||||
talib_func_list = [f for f in dir(talib) if f.isupper() and callable(getattr(talib, f))]
|
||||
|
||||
# 定义需要生成多版本的参数名(period相关参数)
|
||||
# 按优先级排序,优先匹配主要的period参数
|
||||
PERIOD_PARAM_NAMES = [
|
||||
"timeperiod", # 最常见的参数名
|
||||
"period", # 通用period参数
|
||||
"optintimeperiod", # TA-Lib内部参数名
|
||||
]
|
||||
|
||||
# 多period参数的函数(需要特殊处理)
|
||||
# 对于这些函数,明确指定主要period参数,避免自动检测错误
|
||||
MULTI_PERIOD_FUNCTIONS = {
|
||||
# 函数名: (主要period参数名, 次要period参数列表,仅用于文档)
|
||||
"MACD": ("fastperiod", ["slowperiod", "signalperiod"]),
|
||||
"MACDEXT": ("fastperiod", ["slowperiod", "signalperiod"]),
|
||||
"MACDFIX": ("signalperiod", []),
|
||||
"STOCH": ("fastk_period", ["slowk_period", "slowd_period"]),
|
||||
"STOCHF": ("fastk_period", ["fastd_period"]),
|
||||
"STOCHRSI": ("timeperiod", ["fastk_period", "fastd_period"]),
|
||||
"BBANDS": ("timeperiod", ["nbdevup", "nbdevdn"]),
|
||||
"APO": ("fastperiod", ["slowperiod"]),
|
||||
"PPO": ("fastperiod", ["slowperiod"]),
|
||||
"ULTOSC": ("timeperiod1", ["timeperiod2", "timeperiod3"]),
|
||||
"BOP": ("", []), # 无period参数,注册默认版本
|
||||
}
|
||||
|
||||
|
||||
def build_talib_wrapper(func, func_name, fixed_params=None):
|
||||
"""构建talib函数包装器,支持固定某些参数"""
|
||||
fixed_params = fixed_params or {}
|
||||
|
||||
def _talib_wrap(*args, **kwargs):
|
||||
# 合并固定参数和传入参数
|
||||
merged_kwargs = {**fixed_params, **kwargs}
|
||||
# ta-lib 有些函数只支持关键字参数
|
||||
# 自动转换所有输入类型
|
||||
args = tuple(_convert_input(arg) for arg in args)
|
||||
for k in merged_kwargs:
|
||||
merged_kwargs[k] = _convert_input(merged_kwargs[k])
|
||||
result = func(*args, **merged_kwargs)
|
||||
# TA-Lib有些输出是tuple(比如MACD),统一返回ndarray/tuple[ndarray]
|
||||
if isinstance(result, tuple):
|
||||
# 保持tuple结构
|
||||
return tuple(
|
||||
np.asarray(item) if item is not None else None for item in result
|
||||
)
|
||||
return np.asarray(result)
|
||||
|
||||
_talib_wrap.__name__ = f"talib_{func_name.lower()}"
|
||||
return _talib_wrap
|
||||
|
||||
|
||||
for func_name in talib_func_list:
|
||||
func = getattr(talib, func_name)
|
||||
sig = inspect.signature(func)
|
||||
params = sig.parameters
|
||||
|
||||
# 检查是否在特殊配置字典中
|
||||
if func_name in MULTI_PERIOD_FUNCTIONS:
|
||||
main_period_param, _ = MULTI_PERIOD_FUNCTIONS[func_name]
|
||||
# 如果配置中指定了主要period参数,使用它
|
||||
if main_period_param and main_period_param in params:
|
||||
for period_value in PERIOD_RANGE:
|
||||
fixed_params = {main_period_param: period_value}
|
||||
wrapper = build_talib_wrapper(func, func_name, fixed_params)
|
||||
op_name = f"talib_{func_name.lower()}_{period_value}"
|
||||
desc = f"ta-lib: {func_name}({main_period_param}={period_value})"
|
||||
_registry.register_function(op_name, wrapper, desc)
|
||||
else:
|
||||
# 配置中指定无period参数,注册默认版本
|
||||
wrapper = build_talib_wrapper(func, func_name)
|
||||
op_name = f"talib_{func_name.lower()}"
|
||||
desc = f"ta-lib: {func_name}"
|
||||
_registry.register_function(op_name, wrapper, desc)
|
||||
else:
|
||||
# 不在特殊配置中,自动检测period参数
|
||||
period_params = {}
|
||||
for param_name, param in params.items():
|
||||
param_lower = param_name.lower()
|
||||
# 检查是否是period相关参数
|
||||
if any(
|
||||
period_keyword in param_lower for period_keyword in PERIOD_PARAM_NAMES
|
||||
):
|
||||
period_params[param_name] = param
|
||||
|
||||
if period_params:
|
||||
# 如果有period参数,为每个period值生成一个版本
|
||||
# 优先选择timeperiod,否则选择第一个
|
||||
main_period_param = None
|
||||
for preferred in ["timeperiod", "period", "optintimeperiod"]:
|
||||
for param_name in period_params.keys():
|
||||
if preferred in param_name.lower():
|
||||
main_period_param = param_name
|
||||
break
|
||||
if main_period_param:
|
||||
break
|
||||
|
||||
if not main_period_param:
|
||||
main_period_param = list(period_params.keys())[0]
|
||||
|
||||
for period_value in PERIOD_RANGE:
|
||||
fixed_params = {main_period_param: period_value}
|
||||
wrapper = build_talib_wrapper(func, func_name, fixed_params)
|
||||
op_name = f"talib_{func_name.lower()}_{period_value}"
|
||||
desc = f"ta-lib: {func_name}({main_period_param}={period_value})"
|
||||
_registry.register_function(op_name, wrapper, desc)
|
||||
else:
|
||||
# 如果没有period参数,注册默认版本
|
||||
wrapper = build_talib_wrapper(func, func_name)
|
||||
op_name = f"talib_{func_name.lower()}"
|
||||
desc = f"ta-lib: {func_name}"
|
||||
_registry.register_function(op_name, wrapper, desc)
|
||||
|
||||
# ==================== 自定义常见技术指标 ====================
|
||||
|
||||
|
||||
def _ewm_forward(x: np.ndarray, alpha: float) -> np.ndarray:
|
||||
"""指数加权移动平均(前向计算)"""
|
||||
result = np.zeros_like(x)
|
||||
if len(x) == 0:
|
||||
return result
|
||||
result[0] = x[0]
|
||||
for i in range(1, len(x)):
|
||||
result[i] = x[i] * alpha + (1 - alpha) * result[i - 1]
|
||||
return result
|
||||
|
||||
|
||||
def _rsv(x: np.ndarray, window: int) -> np.ndarray:
|
||||
"""相对强弱值: (当前值 - 最小值) / (最大值 - 最小值)"""
|
||||
s = pd.Series(x)
|
||||
rolling = s.rolling(window, min_periods=max(2, window // 2), closed="both")
|
||||
min_val = rolling.min()
|
||||
max_val = rolling.max()
|
||||
diff = max_val - min_val
|
||||
# 避免除零
|
||||
diff = np.where(np.abs(diff) < 1e-12, np.nan, diff)
|
||||
result = (s - min_val) / diff
|
||||
return result.to_numpy()
|
||||
|
||||
|
||||
def _bband(x: np.ndarray, window: int) -> np.ndarray:
|
||||
"""布林带指标: (当前值 - 均值) / 标准差"""
|
||||
s = pd.Series(x)
|
||||
rolling = s.rolling(window, min_periods=max(2, window // 2), closed="both")
|
||||
mean_val = rolling.mean()
|
||||
std_val = rolling.std()
|
||||
# 避免除零
|
||||
std_val = np.where(np.abs(std_val) < 1e-12, np.nan, std_val)
|
||||
result = (s - mean_val) / std_val
|
||||
return result.to_numpy()
|
||||
|
||||
|
||||
def _rsi(x: np.ndarray, window: int, threshold: float = 0.00001) -> np.ndarray:
|
||||
"""相对强弱指标: 上涨和下跌的比例"""
|
||||
s = pd.Series(x)
|
||||
diff = s.diff()
|
||||
rolling = diff.rolling(window, min_periods=max(2, window // 2), closed="both")
|
||||
|
||||
def _rsi_calc(series):
|
||||
up_sum = series[series > threshold].sum()
|
||||
down_sum = abs(series[series < -threshold].sum())
|
||||
total = up_sum + down_sum
|
||||
if total < 1e-12:
|
||||
return np.nan
|
||||
return up_sum / total
|
||||
|
||||
result = rolling.apply(_rsi_calc, raw=False)
|
||||
return result.to_numpy()
|
||||
|
||||
|
||||
def _rolling_skew(x: np.ndarray, window: int) -> np.ndarray:
|
||||
"""滚动偏度"""
|
||||
s = pd.Series(x)
|
||||
return (
|
||||
s.rolling(window, min_periods=max(2, window // 2), closed="both")
|
||||
.skew()
|
||||
.to_numpy()
|
||||
)
|
||||
|
||||
|
||||
def _rolling_kurtosis(x: np.ndarray, window: int) -> np.ndarray:
|
||||
"""滚动峰度"""
|
||||
s = pd.Series(x)
|
||||
return (
|
||||
s.rolling(window, min_periods=max(2, window // 2), closed="both")
|
||||
.kurt()
|
||||
.to_numpy()
|
||||
)
|
||||
|
||||
|
||||
def _rolling_linear(x: np.ndarray, window: int) -> np.ndarray:
|
||||
"""滚动线性回归斜率"""
|
||||
s = pd.Series(x)
|
||||
|
||||
def _linear_slope(series):
|
||||
valid = series.dropna()
|
||||
if len(valid) < 2:
|
||||
return np.nan
|
||||
try:
|
||||
coeffs = np.polyfit(np.arange(len(valid)), valid.values, 1)
|
||||
return coeffs[0]
|
||||
except:
|
||||
return np.nan
|
||||
|
||||
result = s.rolling(window, min_periods=max(2, window // 2), closed="both").apply(
|
||||
_linear_slope, raw=False
|
||||
)
|
||||
return result.to_numpy()
|
||||
|
||||
|
||||
def _rolling_autocorr(x: np.ndarray, window: int, lag: int = 1) -> np.ndarray:
|
||||
"""滚动自相关"""
|
||||
s = pd.Series(x)
|
||||
result = s.rolling(window, min_periods=max(2, window // 2), closed="both").apply(
|
||||
lambda series: (
|
||||
series.autocorr(lag=lag) if len(series.dropna()) >= 2 else np.nan
|
||||
),
|
||||
raw=False,
|
||||
)
|
||||
return result.to_numpy()
|
||||
|
||||
|
||||
def _rolling_max(x: np.ndarray, window: int) -> np.ndarray:
|
||||
"""滚动最大值"""
|
||||
s = pd.Series(x)
|
||||
return (
|
||||
s.rolling(window, min_periods=max(2, window // 2), closed="both")
|
||||
.max()
|
||||
.to_numpy()
|
||||
)
|
||||
|
||||
|
||||
def _rolling_min(x: np.ndarray, window: int) -> np.ndarray:
|
||||
"""滚动最小值"""
|
||||
s = pd.Series(x)
|
||||
return (
|
||||
s.rolling(window, min_periods=max(2, window // 2), closed="both")
|
||||
.min()
|
||||
.to_numpy()
|
||||
)
|
||||
|
||||
|
||||
def _huanbi(x: np.ndarray, window: int) -> np.ndarray:
|
||||
"""环比: 当前值 / 窗口起始值"""
|
||||
s = pd.Series(x)
|
||||
|
||||
def _huanbi_calc(series):
|
||||
if len(series) < 2:
|
||||
return np.nan
|
||||
start_val = series.iloc[0]
|
||||
end_val = series.iloc[-1]
|
||||
if abs(start_val) < 1e-12:
|
||||
return np.nan
|
||||
return end_val / start_val
|
||||
|
||||
result = s.rolling(window, min_periods=max(2, window // 2), closed="both").apply(
|
||||
_huanbi_calc, raw=False
|
||||
)
|
||||
return result.to_numpy()
|
||||
|
||||
|
||||
# 注册技术指标算子(带不同窗口)
|
||||
for w in PERIOD_RANGE:
|
||||
# EWM算子(使用固定alpha值)
|
||||
alpha = 2.0 / (w + 1)
|
||||
_registry.register_function(
|
||||
f"ewm{w}",
|
||||
lambda x, w=w, a=alpha: _ewm_forward(x, a),
|
||||
f"指数加权移动平均: EWM(x, {w})",
|
||||
)
|
||||
|
||||
# 百分比变化
|
||||
_registry.register_function(
|
||||
f"pct{w}", lambda x, w=w: _pct_change(x, w), f"百分比变化: PCT(x, {w})"
|
||||
)
|
||||
|
||||
# RSV(相对强弱值)
|
||||
_registry.register_function(
|
||||
f"rsv{w}", lambda x, w=w: _rsv(x, w), f"相对强弱值: RSV(x, {w})"
|
||||
)
|
||||
|
||||
# 布林带
|
||||
_registry.register_function(
|
||||
f"bband{w}", lambda x, w=w: _bband(x, w), f"布林带指标: BBAND(x, {w})"
|
||||
)
|
||||
|
||||
# RSI
|
||||
_registry.register_function(
|
||||
f"rsi{w}", lambda x, w=w: _rsi(x, w), f"相对强弱指标: RSI(x, {w})"
|
||||
)
|
||||
|
||||
# 统计量
|
||||
_registry.register_function(
|
||||
f"skew{w}", lambda x, w=w: _rolling_skew(x, w), f"滚动偏度: SKEW(x, {w})"
|
||||
)
|
||||
_registry.register_function(
|
||||
f"kurt{w}", lambda x, w=w: _rolling_kurtosis(x, w), f"滚动峰度: KURT(x, {w})"
|
||||
)
|
||||
_registry.register_function(
|
||||
f"linear{w}",
|
||||
lambda x, w=w: _rolling_linear(x, w),
|
||||
f"滚动线性斜率: LINEAR(x, {w})",
|
||||
)
|
||||
_registry.register_function(
|
||||
f"autocorr{w}",
|
||||
lambda x, w=w: _rolling_autocorr(x, w),
|
||||
f"滚动自相关: AUTOCORR(x, {w})",
|
||||
)
|
||||
_registry.register_function(
|
||||
f"max{w}", lambda x, w=w: _rolling_max(x, w), f"滚动最大值: MAX(x, {w})"
|
||||
)
|
||||
_registry.register_function(
|
||||
f"min{w}", lambda x, w=w: _rolling_min(x, w), f"滚动最小值: MIN(x, {w})"
|
||||
)
|
||||
|
||||
# 环比
|
||||
_registry.register_function(
|
||||
f"huanbi{w}", lambda x, w=w: _huanbi(x, w), f"环比: HUANBI(x, {w})"
|
||||
)
|
||||
|
||||
|
||||
# ==================== 因子公式解析与计算 ====================
|
||||
|
||||
|
||||
class FactorFormula:
|
||||
"""因子公式:支持序列化和反序列化"""
|
||||
|
||||
def __init__(self, expression: str, feature_names: List[str]):
|
||||
"""
|
||||
Parameters:
|
||||
-----------
|
||||
expression : str
|
||||
因子表达式(使用算子名称)
|
||||
feature_names : List[str]
|
||||
特征名称列表
|
||||
"""
|
||||
self.expression = expression
|
||||
self.feature_names = feature_names
|
||||
|
||||
def compute(self, features: Dict[str, np.ndarray]) -> np.ndarray:
|
||||
"""
|
||||
计算因子值
|
||||
|
||||
Parameters:
|
||||
-----------
|
||||
features : Dict[str, np.ndarray]
|
||||
特征字典,key为特征名称
|
||||
|
||||
Returns:
|
||||
--------
|
||||
np.ndarray: 因子值
|
||||
"""
|
||||
# 构建计算环境
|
||||
env = {}
|
||||
|
||||
# 添加特征
|
||||
for name in self.feature_names:
|
||||
if name not in features:
|
||||
raise KeyError(f"特征 '{name}' 不存在")
|
||||
env[name] = features[name]
|
||||
|
||||
# 添加算子
|
||||
for op_name in _registry.list_all():
|
||||
op = _registry.get(op_name)
|
||||
if op:
|
||||
env[op_name] = op.func
|
||||
|
||||
# 添加numpy和pandas(用于某些表达式)
|
||||
env["np"] = np
|
||||
env["pd"] = pd
|
||||
|
||||
# 执行表达式
|
||||
try:
|
||||
# 限制可用的内置函数
|
||||
safe_builtins = {
|
||||
"abs": abs,
|
||||
"min": min,
|
||||
"max": max,
|
||||
"sum": sum,
|
||||
"len": len,
|
||||
}
|
||||
result = eval(self.expression, {"__builtins__": safe_builtins}, env)
|
||||
|
||||
# 确保结果是numpy数组
|
||||
if not isinstance(result, np.ndarray):
|
||||
if isinstance(result, (int, float)):
|
||||
# 标量转换为数组(广播)
|
||||
result = np.full(len(features[self.feature_names[0]]), result)
|
||||
else:
|
||||
result = np.array(result)
|
||||
|
||||
# 确保长度一致
|
||||
expected_len = len(features[self.feature_names[0]])
|
||||
if len(result) != expected_len:
|
||||
raise ValueError(
|
||||
f"表达式结果长度 {len(result)} 与特征长度 {expected_len} 不匹配"
|
||||
)
|
||||
|
||||
return result
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"计算因子表达式失败: {e}\n表达式: {self.expression}")
|
||||
|
||||
def to_dict(self) -> Dict:
|
||||
"""序列化为字典"""
|
||||
return {"expression": self.expression, "feature_names": self.feature_names}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict) -> "FactorFormula":
|
||||
"""从字典反序列化"""
|
||||
return cls(data["expression"], data["feature_names"])
|
||||
|
||||
def __repr__(self):
|
||||
return f"FactorFormula(expression='{self.expression}', features={self.feature_names})"
|
||||
237
factor_mining/validator.py
Normal file
237
factor_mining/validator.py
Normal file
@@ -0,0 +1,237 @@
|
||||
"""
|
||||
因子有效性检验模块:整合所有检验方案
|
||||
"""
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from typing import Dict, List, Optional
|
||||
from dataclasses import dataclass
|
||||
from statsmodels.regression.linear_model import OLS
|
||||
|
||||
from validation import (
|
||||
compute_ic,
|
||||
compute_rolling_ic,
|
||||
group_backtest,
|
||||
factor_span_regression
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ValidationConfig:
|
||||
"""验证配置"""
|
||||
ic_window: int = 30
|
||||
ic_method: str = "spearman" # "spearman" or "pearson"
|
||||
n_groups: int = 3
|
||||
group_period: int = 180
|
||||
min_ic: float = 0.01
|
||||
min_tstat: float = 1.5
|
||||
min_r2_change: float = 0.05
|
||||
|
||||
|
||||
class FactorValidator:
|
||||
"""因子有效性检验器"""
|
||||
|
||||
def __init__(self, config: ValidationConfig):
|
||||
self.config = config
|
||||
|
||||
def validate_ic(
|
||||
self,
|
||||
factor: pd.Series,
|
||||
forward_return: pd.Series
|
||||
) -> Dict:
|
||||
"""
|
||||
IC检验
|
||||
|
||||
Returns:
|
||||
--------
|
||||
dict: 包含mean_ic, ic_ir, ic_series等
|
||||
"""
|
||||
rolling_ic = compute_rolling_ic(
|
||||
factor,
|
||||
forward_return,
|
||||
window=self.config.ic_window,
|
||||
method=self.config.ic_method
|
||||
)
|
||||
|
||||
mean_ic = rolling_ic.mean()
|
||||
ic_std = rolling_ic.std()
|
||||
ic_ir = mean_ic / (ic_std + 1e-8) # IC信息比率
|
||||
|
||||
return {
|
||||
"mean_ic": mean_ic,
|
||||
"ic_std": ic_std,
|
||||
"ic_ir": ic_ir,
|
||||
"ic_series": rolling_ic,
|
||||
"is_valid": abs(mean_ic) >= self.config.min_ic
|
||||
}
|
||||
|
||||
def validate_group_backtest(
|
||||
self,
|
||||
factor: pd.Series,
|
||||
forward_return: pd.Series
|
||||
) -> Dict:
|
||||
"""
|
||||
分组回测检验
|
||||
|
||||
Returns:
|
||||
--------
|
||||
dict: 包含mean_h_l_return, mean_h_l_tstat等
|
||||
"""
|
||||
result = group_backtest(
|
||||
factor,
|
||||
forward_return,
|
||||
n_groups=self.config.n_groups,
|
||||
group_period=self.config.group_period
|
||||
)
|
||||
|
||||
is_valid = abs(result.get('mean_h_l_tstat', 0)) >= self.config.min_tstat
|
||||
|
||||
return {
|
||||
**result,
|
||||
"is_valid": is_valid
|
||||
}
|
||||
|
||||
def validate_regression(
|
||||
self,
|
||||
factor: pd.Series,
|
||||
forward_return: pd.Series,
|
||||
other_factors: Optional[pd.DataFrame] = None
|
||||
) -> Dict:
|
||||
"""
|
||||
因子跨度回归检验
|
||||
|
||||
Parameters:
|
||||
-----------
|
||||
factor : Series
|
||||
待检验因子
|
||||
forward_return : Series
|
||||
未来收益率
|
||||
other_factors : DataFrame, optional
|
||||
其他因子(用于控制变量)
|
||||
|
||||
Returns:
|
||||
--------
|
||||
dict: 包含beta, tstat, r2_change等
|
||||
"""
|
||||
if other_factors is None:
|
||||
other_factors = pd.DataFrame()
|
||||
|
||||
# 合并因子
|
||||
factors_df = pd.concat([other_factors, factor.to_frame(name='target')], axis=1)
|
||||
|
||||
result = factor_span_regression(
|
||||
factors_df,
|
||||
forward_return,
|
||||
target_factor='target'
|
||||
)
|
||||
|
||||
is_valid = (
|
||||
abs(result.get('tstat', 0)) >= self.config.min_tstat and
|
||||
result.get('r2_change', 0) >= self.config.min_r2_change
|
||||
)
|
||||
|
||||
return {
|
||||
**result,
|
||||
"is_valid": is_valid
|
||||
}
|
||||
|
||||
def validate_all(
|
||||
self,
|
||||
factor: pd.Series,
|
||||
forward_return: pd.Series,
|
||||
other_factors: Optional[pd.DataFrame] = None
|
||||
) -> Dict:
|
||||
"""
|
||||
综合检验:执行所有检验方法
|
||||
|
||||
Returns:
|
||||
--------
|
||||
dict: 包含所有检验结果和综合判断
|
||||
"""
|
||||
results = {}
|
||||
|
||||
# IC检验
|
||||
ic_result = self.validate_ic(factor, forward_return)
|
||||
results['ic'] = ic_result
|
||||
|
||||
# 分组回测
|
||||
group_result = self.validate_group_backtest(factor, forward_return)
|
||||
results['group_backtest'] = group_result
|
||||
|
||||
# 回归检验
|
||||
reg_result = self.validate_regression(factor, forward_return, other_factors)
|
||||
results['regression'] = reg_result
|
||||
|
||||
# 综合判断
|
||||
is_valid = (
|
||||
ic_result['is_valid'] and
|
||||
group_result['is_valid'] and
|
||||
reg_result['is_valid']
|
||||
)
|
||||
|
||||
results['is_valid'] = is_valid
|
||||
results['score'] = self._calculate_score(ic_result, group_result, reg_result)
|
||||
|
||||
return results
|
||||
|
||||
def _calculate_score(
|
||||
self,
|
||||
ic_result: Dict,
|
||||
group_result: Dict,
|
||||
reg_result: Dict
|
||||
) -> float:
|
||||
"""计算综合得分"""
|
||||
score = 0.0
|
||||
|
||||
# IC得分(权重0.3)
|
||||
ic_score = abs(ic_result.get('mean_ic', 0)) * 10
|
||||
score += ic_score * 0.3
|
||||
|
||||
# 分组回测得分(权重0.4)
|
||||
tstat = abs(group_result.get('mean_h_l_tstat', 0))
|
||||
tstat_score = min(tstat / 3.0, 1.0) # 归一化到[0, 1]
|
||||
score += tstat_score * 0.4
|
||||
|
||||
# 回归得分(权重0.3)
|
||||
r2_change = reg_result.get('r2_change', 0)
|
||||
r2_score = min(r2_change / 0.1, 1.0) # 归一化到[0, 1]
|
||||
score += r2_score * 0.3
|
||||
|
||||
return score
|
||||
|
||||
def filter_factors(
|
||||
self,
|
||||
factors: pd.DataFrame,
|
||||
forward_return: pd.Series
|
||||
) -> pd.DataFrame:
|
||||
"""
|
||||
批量过滤因子:只保留有效因子
|
||||
|
||||
Returns:
|
||||
--------
|
||||
DataFrame: 有效因子
|
||||
"""
|
||||
valid_factors = []
|
||||
|
||||
for col in factors.columns:
|
||||
factor = factors[col]
|
||||
result = self.validate_all(factor, forward_return, factors.drop(columns=[col]))
|
||||
|
||||
if result['is_valid']:
|
||||
valid_factors.append(col)
|
||||
|
||||
return factors[valid_factors] if valid_factors else pd.DataFrame()
|
||||
|
||||
|
||||
def create_validator(
|
||||
ic_window: int = 30,
|
||||
min_ic: float = 0.01,
|
||||
min_tstat: float = 1.5
|
||||
) -> FactorValidator:
|
||||
"""创建验证器(便捷函数)"""
|
||||
config = ValidationConfig(
|
||||
ic_window=ic_window,
|
||||
min_ic=min_ic,
|
||||
min_tstat=min_tstat
|
||||
)
|
||||
return FactorValidator(config)
|
||||
|
||||
113
factors.py
113
factors.py
@@ -1,113 +0,0 @@
|
||||
"""
|
||||
因子挖掘模块:支持规则因子和遗传编程因子
|
||||
"""
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from typing import Callable, Dict, List, Optional
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class BaseFactor(ABC):
|
||||
"""因子基类"""
|
||||
|
||||
def __init__(self, name: str):
|
||||
self.name = name
|
||||
|
||||
@abstractmethod
|
||||
def compute(self, data: pd.DataFrame) -> pd.Series:
|
||||
"""计算因子值"""
|
||||
pass
|
||||
|
||||
|
||||
class RuleFactor(BaseFactor):
|
||||
"""规则因子:基于固定规则"""
|
||||
|
||||
def __init__(self, name: str, compute_func: Callable[[pd.DataFrame], pd.Series]):
|
||||
super().__init__(name)
|
||||
self.compute_func = compute_func
|
||||
|
||||
def compute(self, data: pd.DataFrame) -> pd.Series:
|
||||
return self.compute_func(data)
|
||||
|
||||
|
||||
def create_trend_factor(data: pd.DataFrame) -> pd.Series:
|
||||
"""趋势因子:价格趋势方向"""
|
||||
trend = pd.Series(0, index=data.index)
|
||||
trend[data['close'] > data['ema16']] = 1
|
||||
trend[data['close'] < data['ema4']] = -1
|
||||
return trend
|
||||
|
||||
|
||||
def create_volatility_factor(data: pd.DataFrame) -> pd.Series:
|
||||
"""波动率因子:滚动12期收益率标准差"""
|
||||
return data['volatility']
|
||||
|
||||
|
||||
def create_volume_price_factor(data: pd.DataFrame) -> pd.Series:
|
||||
"""量价因子:成交量放大且价格上涨"""
|
||||
volume_signal = (data['volume'] > data['volume_ma6']).astype(int)
|
||||
return volume_signal * data['return']
|
||||
|
||||
|
||||
def create_reversal_factor(data: pd.DataFrame) -> pd.Series:
|
||||
"""反转因子:短期反转效应"""
|
||||
return -data['return'].shift(1)
|
||||
|
||||
|
||||
def create_momentum_factor(data: pd.DataFrame) -> pd.Series:
|
||||
"""动量因子:基于MACD"""
|
||||
return data['macd']
|
||||
|
||||
|
||||
def create_rsi_factor(data: pd.DataFrame) -> pd.Series:
|
||||
"""RSI因子:相对强弱指数(标准化)"""
|
||||
return (data['rsi'] - 50) / 50 # 归一化到[-1, 1]
|
||||
|
||||
|
||||
class FactorMiner:
|
||||
"""因子挖掘器"""
|
||||
|
||||
def __init__(self):
|
||||
self.factors: Dict[str, BaseFactor] = {}
|
||||
|
||||
def register_factor(self, factor: BaseFactor):
|
||||
"""注册因子"""
|
||||
self.factors[factor.name] = factor
|
||||
|
||||
def register_rule_factor(self, name: str, compute_func: Callable):
|
||||
"""注册规则因子"""
|
||||
factor = RuleFactor(name, compute_func)
|
||||
self.register_factor(factor)
|
||||
|
||||
def compute_all_factors(self, data: pd.DataFrame) -> pd.DataFrame:
|
||||
"""计算所有因子"""
|
||||
factor_df = pd.DataFrame(index=data.index)
|
||||
|
||||
for name, factor in self.factors.items():
|
||||
try:
|
||||
factor_df[name] = factor.compute(data)
|
||||
except Exception as e:
|
||||
print(f"计算因子 {name} 时出错: {e}")
|
||||
factor_df[name] = np.nan
|
||||
|
||||
return factor_df
|
||||
|
||||
def get_factor(self, name: str) -> Optional[BaseFactor]:
|
||||
"""获取指定因子"""
|
||||
return self.factors.get(name)
|
||||
|
||||
|
||||
def create_default_factors() -> FactorMiner:
|
||||
"""创建默认因子集合"""
|
||||
miner = FactorMiner()
|
||||
|
||||
# 注册基础因子
|
||||
miner.register_rule_factor('TREND', create_trend_factor)
|
||||
miner.register_rule_factor('VOL', create_volatility_factor)
|
||||
miner.register_rule_factor('VOLP', create_volume_price_factor)
|
||||
miner.register_rule_factor('REV', create_reversal_factor)
|
||||
miner.register_rule_factor('MOM', create_momentum_factor)
|
||||
miner.register_rule_factor('RSI', create_rsi_factor)
|
||||
|
||||
return miner
|
||||
|
||||
109
signal.py
109
signal.py
@@ -1,109 +0,0 @@
|
||||
"""
|
||||
信号生成模块
|
||||
"""
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from typing import Optional, TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pandas import Series
|
||||
|
||||
|
||||
def generate_signals(
|
||||
score: 'pd.Series',
|
||||
buy_threshold: float = 0.8,
|
||||
sell_threshold: float = -0.8,
|
||||
window: int = 30,
|
||||
use_rolling_std: bool = True
|
||||
) -> 'pd.Series':
|
||||
"""
|
||||
基于因子得分生成买卖信号
|
||||
|
||||
Parameters:
|
||||
-----------
|
||||
score : Series
|
||||
因子综合得分
|
||||
buy_threshold : float
|
||||
买入阈值(标准差倍数)
|
||||
sell_threshold : float
|
||||
卖出阈值(标准差倍数)
|
||||
window : int
|
||||
滚动窗口(用于计算标准差)
|
||||
use_rolling_std : bool
|
||||
是否使用滚动标准差
|
||||
|
||||
Returns:
|
||||
--------
|
||||
Series: 交易信号(1=买入,-1=卖出,0=持有)
|
||||
"""
|
||||
signals = pd.Series(0, index=score.index)
|
||||
|
||||
if use_rolling_std:
|
||||
# 使用滚动标准差
|
||||
rolling_std = score.rolling(window).std()
|
||||
buy_line = buy_threshold * rolling_std
|
||||
sell_line = sell_threshold * rolling_std
|
||||
else:
|
||||
# 使用固定阈值
|
||||
std = score.std()
|
||||
buy_line = buy_threshold * std
|
||||
sell_line = sell_threshold * std
|
||||
|
||||
# 生成原始信号
|
||||
raw_signals = pd.Series(0, index=score.index)
|
||||
raw_signals[score > buy_line] = 1 # 买入信号
|
||||
raw_signals[score < sell_line] = -1 # 卖出信号
|
||||
|
||||
# 只在信号变化时产生交易信号,其他时候保持持仓状态
|
||||
signals = pd.Series(0, index=score.index)
|
||||
position = 0 # 当前持仓状态:0=空仓,1=满仓
|
||||
|
||||
for i in range(len(raw_signals)):
|
||||
current_signal = raw_signals.iloc[i]
|
||||
|
||||
# 只在信号变化时产生交易
|
||||
if current_signal == 1 and position == 0:
|
||||
signals.iloc[i] = 1 # 买入
|
||||
position = 1
|
||||
elif current_signal == -1 and position == 1:
|
||||
signals.iloc[i] = -1 # 卖出
|
||||
position = 0
|
||||
# 其他情况保持当前持仓状态,不产生交易信号
|
||||
|
||||
return signals.astype(int)
|
||||
|
||||
|
||||
def generate_signals_with_position(
|
||||
score: 'pd.Series',
|
||||
buy_threshold: float = 0.8,
|
||||
sell_threshold: float = -0.8,
|
||||
window: int = 30,
|
||||
current_position: int = 0
|
||||
) -> 'pd.Series':
|
||||
"""
|
||||
生成信号(考虑当前持仓状态)
|
||||
|
||||
Parameters:
|
||||
-----------
|
||||
current_position : int
|
||||
当前持仓:0=空仓,1=满仓
|
||||
"""
|
||||
raw_signals = generate_signals(score, buy_threshold, sell_threshold, window)
|
||||
signals = pd.Series(0, index=score.index)
|
||||
|
||||
position = current_position
|
||||
|
||||
for i in range(len(raw_signals)):
|
||||
signal = raw_signals.iloc[i]
|
||||
|
||||
if signal == 1 and position == 0:
|
||||
signals.iloc[i] = 1 # 买入
|
||||
position = 1
|
||||
elif signal == -1 and position == 1:
|
||||
signals.iloc[i] = -1 # 卖出
|
||||
position = 0
|
||||
else:
|
||||
signals.iloc[i] = 0 # 持有
|
||||
|
||||
return signals
|
||||
|
||||
189
validation.py
189
validation.py
@@ -1,63 +1,44 @@
|
||||
"""
|
||||
因子检验模块:IC检验、分组回测、因子跨度回归
|
||||
因子检验模块: IC检验、分组回测、因子跨度回归
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from typing import Dict, List, Tuple
|
||||
from statsmodels.regression.linear_model import OLS
|
||||
|
||||
|
||||
def compute_ic(factor: pd.Series, forward_return: pd.Series, method: str = 'spearman') -> pd.Series:
|
||||
"""
|
||||
计算IC(信息系数)
|
||||
|
||||
Parameters:
|
||||
-----------
|
||||
factor : Series
|
||||
因子值
|
||||
forward_return : Series
|
||||
未来收益率
|
||||
method : str
|
||||
相关性计算方法:'spearman' 或 'pearson'
|
||||
"""
|
||||
aligned = pd.concat([factor, forward_return], axis=1).dropna()
|
||||
if len(aligned) < 10:
|
||||
return pd.Series(dtype=float)
|
||||
|
||||
if method == 'spearman':
|
||||
ic = aligned.iloc[:, 0].rank().corr(aligned.iloc[:, 1].rank())
|
||||
else:
|
||||
ic = aligned.iloc[:, 0].corr(aligned.iloc[:, 1])
|
||||
|
||||
return pd.Series([ic], index=[aligned.index[-1]])
|
||||
|
||||
|
||||
def compute_rolling_ic(
|
||||
factor: pd.Series,
|
||||
forward_return: pd.Series,
|
||||
window: int = 30,
|
||||
method: str = 'spearman'
|
||||
method: str = "spearman",
|
||||
) -> pd.Series:
|
||||
"""计算滚动IC(向量化优化)"""
|
||||
"""计算滚动IC (向量化优化)"""
|
||||
# 对齐数据
|
||||
aligned = pd.concat([factor, forward_return], axis=1).dropna()
|
||||
if len(aligned) < window:
|
||||
return pd.Series(dtype=float, index=factor.index[window:])
|
||||
|
||||
aligned.columns = ['factor', 'return']
|
||||
|
||||
if method == 'spearman':
|
||||
|
||||
aligned.columns = ["factor", "return"]
|
||||
|
||||
if method == "spearman":
|
||||
# 使用rank计算Spearman相关性
|
||||
factor_rank = aligned['factor'].rank()
|
||||
return_rank = aligned['return'].rank()
|
||||
# 使用DataFrame的rolling().corr()方法
|
||||
df_rank = pd.DataFrame({'factor': factor_rank, 'return': return_rank})
|
||||
ic_series = df_rank['factor'].rolling(window, min_periods=window).corr(df_rank['return'])
|
||||
# 这里是全局的 rank,理论上应该是按照 window 滚动排序
|
||||
factor_rank = aligned["factor"].rank()
|
||||
return_rank = aligned["return"].rank()
|
||||
# 使用DataFrame的rolling().corr()方法, 该方法pandas优化过
|
||||
df_rank = pd.DataFrame({"factor": factor_rank, "return": return_rank})
|
||||
ic_series = (
|
||||
df_rank["factor"]
|
||||
.rolling(window, min_periods=window)
|
||||
.corr(df_rank["return"])
|
||||
)
|
||||
else:
|
||||
# Pearson相关性
|
||||
df = pd.DataFrame({'factor': aligned['factor'], 'return': aligned['return']})
|
||||
ic_series = df['factor'].rolling(window, min_periods=window).corr(df['return'])
|
||||
|
||||
df = pd.DataFrame({"factor": aligned["factor"], "return": aligned["return"]})
|
||||
ic_series = df["factor"].rolling(window, min_periods=window).corr(df["return"])
|
||||
|
||||
return ic_series
|
||||
|
||||
|
||||
@@ -65,85 +46,75 @@ def group_backtest(
|
||||
factor: pd.Series,
|
||||
forward_return: pd.Series,
|
||||
n_groups: int = 3,
|
||||
group_period: int = 180
|
||||
group_period: int = 180,
|
||||
) -> Dict:
|
||||
"""
|
||||
分组回测:将数据按因子值分组,计算各组收益
|
||||
|
||||
|
||||
Returns:
|
||||
--------
|
||||
dict: 包含各组收益、H-L收益差、t统计量等
|
||||
"""
|
||||
aligned = pd.concat([factor, forward_return], axis=1).dropna()
|
||||
aligned.columns = ['factor', 'return']
|
||||
|
||||
results = {
|
||||
'group_returns': [],
|
||||
'h_l_return': [],
|
||||
'h_l_tstat': [],
|
||||
'periods': []
|
||||
}
|
||||
|
||||
aligned.columns = ["factor", "return"]
|
||||
|
||||
results = {"group_returns": [], "h_l_return": [], "h_l_tstat": [], "periods": []}
|
||||
|
||||
# 按月分组(每180个4h周期)- 使用更高效的步长
|
||||
step = max(group_period // 2, 90) # 减少重叠计算
|
||||
for start in range(0, len(aligned) - group_period, step):
|
||||
end = start + group_period
|
||||
period_data = aligned.iloc[start:end]
|
||||
|
||||
|
||||
if len(period_data) < 30:
|
||||
continue
|
||||
|
||||
|
||||
# 按因子值分组(向量化)
|
||||
try:
|
||||
period_data = period_data.copy()
|
||||
period_data['group'] = pd.qcut(
|
||||
period_data['factor'],
|
||||
q=n_groups,
|
||||
labels=False,
|
||||
duplicates='drop'
|
||||
period_data["group"] = pd.qcut(
|
||||
period_data["factor"], q=n_groups, labels=False, duplicates="drop"
|
||||
)
|
||||
|
||||
|
||||
# 计算各组收益(向量化)
|
||||
group_returns = period_data.groupby('group')['return'].mean()
|
||||
results['group_returns'].append(group_returns)
|
||||
|
||||
group_returns = period_data.groupby("group")["return"].mean()
|
||||
results["group_returns"].append(group_returns)
|
||||
|
||||
# H-L收益差
|
||||
if len(group_returns) >= 2:
|
||||
h_return = group_returns.iloc[-1] # 高因子组
|
||||
l_return = group_returns.iloc[0] # 低因子组
|
||||
l_return = group_returns.iloc[0] # 低因子组
|
||||
h_l_diff = h_return - l_return
|
||||
|
||||
results['h_l_return'].append(h_l_diff)
|
||||
results['periods'].append(period_data.index[-1])
|
||||
|
||||
results["h_l_return"].append(h_l_diff)
|
||||
results["periods"].append(period_data.index[-1])
|
||||
except (ValueError, KeyError):
|
||||
# qcut失败时跳过
|
||||
continue
|
||||
|
||||
|
||||
# 计算平均H-L收益和t统计量
|
||||
if results['h_l_return']:
|
||||
h_l_series = pd.Series(results['h_l_return'], index=results['periods'])
|
||||
if results["h_l_return"]:
|
||||
h_l_series = pd.Series(results["h_l_return"], index=results["periods"])
|
||||
mean_h_l = h_l_series.mean()
|
||||
std_h_l = h_l_series.std()
|
||||
t_stat = mean_h_l / (std_h_l / np.sqrt(len(h_l_series)) + 1e-8)
|
||||
|
||||
results['mean_h_l_return'] = mean_h_l
|
||||
results['mean_h_l_tstat'] = t_stat
|
||||
results['h_l_series'] = h_l_series
|
||||
|
||||
results["mean_h_l_return"] = mean_h_l
|
||||
results["mean_h_l_tstat"] = t_stat
|
||||
results["h_l_series"] = h_l_series
|
||||
else:
|
||||
results['mean_h_l_return'] = 0
|
||||
results['mean_h_l_tstat'] = 0
|
||||
|
||||
results["mean_h_l_return"] = 0
|
||||
results["mean_h_l_tstat"] = 0
|
||||
|
||||
return results
|
||||
|
||||
|
||||
def factor_span_regression(
|
||||
factors: pd.DataFrame,
|
||||
forward_return: pd.Series,
|
||||
target_factor: str
|
||||
factors: pd.DataFrame, forward_return: pd.Series, target_factor: str
|
||||
) -> Dict:
|
||||
"""
|
||||
因子跨度回归:检验因子的边际解释力
|
||||
|
||||
|
||||
Parameters:
|
||||
-----------
|
||||
factors : DataFrame
|
||||
@@ -152,7 +123,7 @@ def factor_span_regression(
|
||||
未来收益率
|
||||
target_factor : str
|
||||
目标因子名称
|
||||
|
||||
|
||||
Returns:
|
||||
--------
|
||||
dict: 包含回归系数、t统计量、R²等
|
||||
@@ -160,49 +131,46 @@ def factor_span_regression(
|
||||
# 对齐数据
|
||||
data = pd.concat([factors, forward_return], axis=1).dropna()
|
||||
if len(data) < 30:
|
||||
return {'beta': 0, 'tstat': 0, 'r2': 0, 'r2_change': 0}
|
||||
|
||||
return {"beta": 0, "tstat": 0, "r2": 0, "r2_change": 0}
|
||||
|
||||
y = data.iloc[:, -1].values
|
||||
X_all = data.iloc[:, :-1].values
|
||||
|
||||
|
||||
# 全模型(包含目标因子)
|
||||
try:
|
||||
model_all = OLS(y, X_all).fit(cov_type='HAC', cov_kwds={'maxlags': 6})
|
||||
model_all = OLS(y, X_all).fit(cov_type="HAC", cov_kwds={"maxlags": 6})
|
||||
r2_all = model_all.rsquared
|
||||
|
||||
|
||||
# 目标因子的系数和t统计量
|
||||
target_idx = factors.columns.get_loc(target_factor)
|
||||
beta = model_all.params[target_idx]
|
||||
tstat = model_all.tvalues[target_idx]
|
||||
|
||||
|
||||
# 不含目标因子的模型
|
||||
X_without = np.delete(X_all, target_idx, axis=1)
|
||||
model_without = OLS(y, X_without).fit(cov_type='HAC', cov_kwds={'maxlags': 6})
|
||||
model_without = OLS(y, X_without).fit(cov_type="HAC", cov_kwds={"maxlags": 6})
|
||||
r2_without = model_without.rsquared
|
||||
|
||||
|
||||
r2_change = r2_all - r2_without
|
||||
|
||||
|
||||
return {
|
||||
'beta': beta,
|
||||
'tstat': tstat,
|
||||
'r2': r2_all,
|
||||
'r2_change': r2_change,
|
||||
'pvalue': model_all.pvalues[target_idx]
|
||||
"beta": beta,
|
||||
"tstat": tstat,
|
||||
"r2": r2_all,
|
||||
"r2_change": r2_change,
|
||||
"pvalue": model_all.pvalues[target_idx],
|
||||
}
|
||||
except Exception as e:
|
||||
print(f"回归分析出错: {e}")
|
||||
return {'beta': 0, 'tstat': 0, 'r2': 0, 'r2_change': 0}
|
||||
return {"beta": 0, "tstat": 0, "r2": 0, "r2_change": 0}
|
||||
|
||||
|
||||
def validate_factor(
|
||||
factor: pd.Series,
|
||||
forward_return: pd.Series,
|
||||
ic_window: int = 30,
|
||||
n_groups: int = 3
|
||||
factor: pd.Series, forward_return: pd.Series, ic_window: int = 30, n_groups: int = 3
|
||||
) -> Dict:
|
||||
"""
|
||||
综合因子检验
|
||||
|
||||
|
||||
Returns:
|
||||
--------
|
||||
dict: 包含IC、分组回测、显著性等指标
|
||||
@@ -211,16 +179,15 @@ def validate_factor(
|
||||
rolling_ic = compute_rolling_ic(factor, forward_return, window=ic_window)
|
||||
mean_ic = rolling_ic.mean()
|
||||
ic_ir = mean_ic / (rolling_ic.std() + 1e-8) # IC信息比率
|
||||
|
||||
|
||||
# 分组回测
|
||||
group_result = group_backtest(factor, forward_return, n_groups=n_groups)
|
||||
|
||||
return {
|
||||
'mean_ic': mean_ic,
|
||||
'ic_ir': ic_ir,
|
||||
'ic_series': rolling_ic,
|
||||
'mean_h_l_return': group_result['mean_h_l_return'],
|
||||
'mean_h_l_tstat': group_result['mean_h_l_tstat'],
|
||||
'group_returns': group_result['group_returns']
|
||||
}
|
||||
|
||||
return {
|
||||
"mean_ic": mean_ic,
|
||||
"ic_ir": ic_ir,
|
||||
"ic_series": rolling_ic,
|
||||
"mean_h_l_return": group_result["mean_h_l_return"],
|
||||
"mean_h_l_tstat": group_result["mean_h_l_tstat"],
|
||||
"group_returns": group_result["group_returns"],
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user