Compare commits

...

2 Commits

Author SHA1 Message Date
e5beada25e 添加talib算子 2025-11-09 20:19:08 +08:00
dc3d41d6e5 factor minner 2025-11-09 14:00:58 +08:00
8 changed files with 1513 additions and 333 deletions

179
.gitignore vendored Normal file
View File

@@ -0,0 +1,179 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# Data files (keep structure but ignore large data)
data/
# IDE files
.vscode/
.idea/
*.swp
*.swo
*~
# OS generated files
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db
# Docker
.dockerignore
# Logs
*.log
logs/
# Temporary files
tmp/
temp/
*.tmp
*.temp
# API keys and secrets
.env
config.ini
secrets.json
api_keys.txt
# Database files
*.db
*.sqlite
*.sqlite3
# Backup files
*.bak
*.backup
*.csv
*.feather

243
factor_mining/gp_miner.py Normal file
View File

@@ -0,0 +1,243 @@
"""
DEAP遗传编程挖掘器实现
"""
import random
import operator
from typing import List, Tuple, Optional
from dataclasses import dataclass
import numpy as np
import pandas as pd
from deap import algorithms, base, creator, gp, tools
from factor_mining.operators import FactorFormula, get_registry, get_operator
from factor_mining.mining import FactorMiner, MiningConfig
from data import compute_forward_returns
@dataclass
class GPConfig(MiningConfig):
    """Settings for the genetic-programming miner (extends ``MiningConfig``)."""

    # Number of individuals created for the initial population.
    population_size: int = 200
    # Number of generations passed to the evolutionary loop.
    generations: int = 30
    # Number of contestants per tournament selection.
    tournament_size: int = 5
    # Probability of mating two individuals.
    crossover_prob: float = 0.9
    # Probability of mutating an individual.
    mutation_prob: float = 0.05
    # Lower bound used when sizing the hall of fame.
    elitism: int = 5
    # Maximum depth of randomly generated initial trees.
    max_depth_init: int = 1
    # Hard limit on tree height after crossover/mutation.
    max_depth: int = 8
    # Fitness deduction applied per node of an individual.
    complexity_penalty: float = 0.001
class GPMiner(FactorMiner):
    """Factor miner that evolves operator expressions with DEAP genetic programming.

    Individuals are typed primitive trees whose terminals are the feature
    columns and whose primitives come from the global operator registry.
    Fitness is the mean rolling IC against forward returns minus a per-node
    complexity penalty.
    """

    def __init__(self, config: GPConfig):
        super().__init__(config)
        self.config: GPConfig = config
        self.toolbox: Optional[base.Toolbox] = None
        self.pset: Optional[gp.PrimitiveSetTyped] = None
        self.features: Optional[List[pd.Series]] = None

    def get_name(self) -> str:
        """Return the registry name of this miner."""
        return "gp"

    @staticmethod
    def _required_arity(op) -> Optional[int]:
        """Return how many array inputs ``op`` needs, or ``None`` if unknown.

        Only positional parameters *without* a default count as inputs.
        Registered lambdas such as ``lambda x, w=w: ...`` bind their window
        through a default argument, so counting every parameter would report
        them as binary. Signatures made only of ``*args``/``**kwargs`` give no
        arity information and yield ``None``.
        """
        required = 0
        variadic = False
        for param in op.get_signature().parameters.values():
            if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
                variadic = True
            elif param.kind is param.KEYWORD_ONLY:
                continue
            elif param.default is param.empty:
                required += 1
        if required == 0 and variadic:
            return None
        return required

    def _build_pset(self, feature_names: List[str]) -> gp.PrimitiveSetTyped:
        """Build the typed primitive set: one ndarray argument per feature,
        plus every registered operator that takes one or two array inputs."""
        registry = get_registry()
        pset = gp.PrimitiveSetTyped(
            "MAIN", [np.ndarray for _ in feature_names], np.ndarray
        )
        # Expose the feature names instead of DEAP's default ARG0, ARG1, ...
        for i, name in enumerate(feature_names):
            pset.renameArguments(**{f"ARG{i}": name})
        # Add operators; anything that is not unary or binary is skipped.
        for op_name in registry.list_all():
            op = registry.get(op_name)
            if not op:
                continue
            arity = self._required_arity(op)
            if arity in (1, 2):
                pset.addPrimitive(
                    op.func, [np.ndarray] * arity, np.ndarray, name=op_name
                )
        # No constant terminals are registered.
        return pset

    def _evaluate_individual(self, individual, target: pd.Series) -> Tuple[float]:
        """Return the one-element fitness tuple of ``individual``.

        Fitness is ``mean(rolling IC) - complexity_penalty * len(individual)``.
        Any individual that raises, returns something other than a 1-D array
        of the target's length, or yields a non-finite score gets ``-1e6``.
        """
        invalid = (-1e6,)
        func = self.toolbox.compile(expr=individual)
        # Align every feature to the target index before evaluation.
        idx = target.index
        inputs = [f.reindex(idx).to_numpy() for f in self.features]
        try:
            raw = func(*inputs)
        except Exception:
            # Best effort: a failing expression is simply a bad individual.
            return invalid
        # Reject scalars, 0-d / multi-dimensional arrays and wrong lengths
        # before they reach the Series constructor.
        if (
            not isinstance(raw, np.ndarray)
            or raw.ndim != 1
            or raw.shape[0] != len(idx)
        ):
            return invalid
        factor = pd.Series(raw, index=idx)
        factor = factor.replace([np.inf, -np.inf], np.nan)
        # NOTE(review): bfill() fills leading gaps with *later* values, which
        # looks like look-ahead into the factor series — confirm it is intended.
        factor = factor.ffill().bfill()
        window = self.config.ic_window
        if len(factor) < window + 10:
            return invalid
        from validation import compute_rolling_ic

        ic_series = compute_rolling_ic(
            factor, target, window=window, method=self.config.ic_method
        )
        mean_ic = ic_series.mean()
        if not np.isfinite(mean_ic):
            return invalid
        # Penalise larger trees: len(individual) is the node count.
        complexity = len(individual)
        fitness = mean_ic - self.config.complexity_penalty * complexity
        if not np.isfinite(fitness):
            return invalid
        return (fitness,)

    def _individual_to_formula(
        self, individual, feature_names: List[str]
    ) -> FactorFormula:
        """Convert a GP individual into a ``FactorFormula``.

        ``str(individual)`` is a nested call expression such as
        ``add(close, open)``. Any remaining ``ARG<i>`` placeholders are
        replaced by the matching feature name. Whole tokens are matched so
        that ``ARG1`` never rewrites the prefix of ``ARG10``.
        """
        import re

        def _name_for(match):
            position = int(match.group(1))
            if position < len(feature_names):
                return feature_names[position]
            # Unknown placeholder: leave it untouched.
            return match.group(0)

        expr_str = re.sub(r"\bARG(\d+)\b", _name_for, str(individual))
        return FactorFormula(expr_str, feature_names)

    def mine(
        self, data: pd.DataFrame, feature_cols: List[str], price_col: str = "close"
    ) -> List[FactorFormula]:
        """Run the evolutionary search and return the hall-of-fame formulas.

        Parameters
        ----------
        data : DataFrame
            Input data holding ``feature_cols`` and ``price_col``.
        feature_cols : List[str]
            Columns exposed to the GP as terminals.
        price_col : str
            Column used to compute forward returns (the IC target).

        Returns
        -------
        List[FactorFormula]
            One formula per individual kept in the hall of fame.
        """
        if self.config.seed is not None:
            random.seed(self.config.seed)
            np.random.seed(self.config.seed)
        # Prepare target and features.
        price = data[price_col].astype(float)
        target = compute_forward_returns(price, self.config.ret_horizon)
        self.features = [data[c].astype(float) for c in feature_cols]
        self.pset = self._build_pset(feature_cols)
        # DEAP's creator is global; only create the types once per process.
        if not hasattr(creator, "FitnessMax"):
            creator.create("FitnessMax", base.Fitness, weights=(1.0,))
        if not hasattr(creator, "Individual"):
            creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMax)
        # Toolbox: generation, compilation and evaluation.
        self.toolbox = base.Toolbox()
        self.toolbox.register(
            "expr",
            gp.genHalfAndHalf,
            pset=self.pset,
            min_=1,
            max_=self.config.max_depth_init,
        )
        self.toolbox.register(
            "individual", tools.initIterate, creator.Individual, self.toolbox.expr
        )
        self.toolbox.register(
            "population", tools.initRepeat, list, self.toolbox.individual
        )
        self.toolbox.register("compile", gp.compile, pset=self.pset)
        self.toolbox.register("evaluate", self._evaluate_individual, target=target)
        # Genetic operators.
        self.toolbox.register(
            "select", tools.selTournament, tournsize=self.config.tournament_size
        )
        self.toolbox.register("mate", gp.cxOnePoint)
        self.toolbox.register("expr_mut", gp.genFull, min_=0, max_=2)
        self.toolbox.register(
            "mutate", gp.mutUniform, expr=self.toolbox.expr_mut, pset=self.pset
        )
        # Cap tree height after crossover and mutation.
        for operation in ("mate", "mutate"):
            self.toolbox.decorate(
                operation,
                gp.staticLimit(
                    key=operator.attrgetter("height"),
                    max_value=self.config.max_depth,
                ),
            )
        # Run the evolution.
        pop = self.toolbox.population(n=self.config.population_size)
        hof = tools.HallOfFame(maxsize=max(5000, self.config.elitism))
        stats_fit = tools.Statistics(lambda ind: ind.fitness.values[0])
        stats_size = tools.Statistics(len)
        mstats = tools.MultiStatistics(fitness=stats_fit, size=stats_size)
        mstats.register("avg", np.nanmean)
        mstats.register("std", np.nanstd)
        mstats.register("min", np.nanmin)
        mstats.register("max", np.nanmax)
        pop, logbook = algorithms.eaSimple(
            pop,
            self.toolbox,
            cxpb=self.config.crossover_prob,
            mutpb=self.config.mutation_prob,
            ngen=self.config.generations,
            stats=mstats,
            halloffame=hof,
            verbose=True,
        )
        # Convert the best individuals into serialisable formulas.
        return [
            self._individual_to_formula(individual, feature_cols)
            for individual in hof
        ]

123
factor_mining/mining.py Normal file
View File

@@ -0,0 +1,123 @@
"""
因子挖掘抽象层支持多种挖掘算法DEAP、DL、RL等
"""
from abc import ABC, abstractmethod
from typing import List, Dict, Optional, Any
import pandas as pd
from dataclasses import dataclass
from factor_mining.operators import FactorFormula
@dataclass
class MiningConfig:
    """Base settings shared by every factor miner."""

    # Horizon used when computing forward returns.
    ret_horizon: int = 1
    # Window length of the rolling IC.
    ic_window: int = 30
    # Correlation method for the IC: "spearman" or "pearson".
    ic_method: str = "spearman"
    # Optional random seed; ``None`` leaves the generators unseeded.
    seed: Optional[int] = None
class FactorMiner(ABC):
    """Abstract base class for factor miners."""

    def __init__(self, config: MiningConfig):
        self.config = config

    @abstractmethod
    def mine(
        self,
        data: pd.DataFrame,
        feature_cols: List[str],
        price_col: str = "close"
    ) -> List[FactorFormula]:
        """Mine factors from ``data``.

        Parameters
        ----------
        data : DataFrame
            Input data.
        feature_cols : List[str]
            Names of the feature columns.
        price_col : str
            Name of the price column.

        Returns
        -------
        List[FactorFormula]
            The mined factor formulas.
        """

    @abstractmethod
    def get_name(self) -> str:
        """Return the miner's name."""
class MiningPipeline:
    """Registry of miners, keyed by name, that can dispatch a mining run."""

    def __init__(self):
        self.miners: Dict[str, FactorMiner] = {}

    def register_miner(self, miner: FactorMiner):
        """Add ``miner`` under its own name; a duplicate name raises ``ValueError``."""
        name = miner.get_name()
        if name not in self.miners:
            self.miners[name] = miner
            return
        raise ValueError(f"挖掘器 '{name}' 已存在")

    def get_miner(self, name: str) -> Optional[FactorMiner]:
        """Return the miner registered as ``name``, or ``None``."""
        return self.miners.get(name)

    def list_miners(self) -> List[str]:
        """Return the names of all registered miners."""
        return [*self.miners]

    def mine(
        self,
        miner_name: str,
        data: pd.DataFrame,
        feature_cols: List[str],
        price_col: str = "close"
    ) -> List[FactorFormula]:
        """Run the miner called ``miner_name``.

        Parameters
        ----------
        miner_name : str
            Name of a registered miner.
        data : DataFrame
            Input data.
        feature_cols : List[str]
            Names of the feature columns.
        price_col : str
            Name of the price column.

        Returns
        -------
        List[FactorFormula]
            The mined factor formulas.

        Raises
        ------
        ValueError
            If no miner is registered under ``miner_name``.
        """
        selected = self.get_miner(miner_name)
        if selected is not None:
            return selected.mine(data, feature_cols, price_col)
        raise ValueError(f"挖掘器 '{miner_name}' 不存在")
# Module-level pipeline instance shared by register_miner() and get_pipeline().
_pipeline = MiningPipeline()
def register_miner(miner: FactorMiner):
    """Register ``miner`` with the module-level pipeline."""
    pipeline = _pipeline
    pipeline.register_miner(miner)
def get_pipeline() -> MiningPipeline:
    """Return the module-level pipeline instance."""
    pipeline = _pipeline
    return pipeline

653
factor_mining/operators.py Normal file
View File

@@ -0,0 +1,653 @@
"""
算子系统:基础数学算子和技术指标算子的注册与管理
支持算子的注册、查询、反射调用
"""
import numpy as np
import pandas as pd
from typing import Dict, Callable, List, Optional, Any
from abc import ABC, abstractmethod
import inspect
import talib
class Operator(ABC):
    """A named callable together with its description and cached signature."""

    def __init__(self, name: str, func: Callable, description: str = ""):
        """
        Parameters
        ----------
        name : str
            Operator name (unique identifier).
        func : Callable
            The function implementing the operator.
        description : str
            Human-readable description.
        """
        self.name, self.func, self.description = name, func, description
        # Inspected once at construction; get_signature() returns this object.
        self._signature = inspect.signature(func)

    def __call__(self, *args, **kwargs):
        """Forward the call to the wrapped function."""
        target = self.func
        return target(*args, **kwargs)

    def get_signature(self):
        """Return the signature captured at construction time."""
        return self._signature

    def __repr__(self):
        return f"Operator(name='{self.name}', description='{self.description}')"
class OperatorRegistry:
    """Name-to-operator mapping that rejects duplicate names."""

    def __init__(self):
        self._operators: Dict[str, Operator] = {}

    def register(self, operator: Operator):
        """Store ``operator`` under its name; a duplicate raises ``ValueError``."""
        key = operator.name
        if key in self._operators:
            raise ValueError(f"算子 '{operator.name}' 已存在")
        self._operators[key] = operator

    def register_function(self, name: str, func: Callable, description: str = ""):
        """Wrap ``func`` in an ``Operator`` and register it."""
        self.register(Operator(name, func, description))

    def get(self, name: str) -> Optional[Operator]:
        """Return the operator called ``name``, or ``None``."""
        return self._operators.get(name)

    def has(self, name: str) -> bool:
        """Return whether an operator called ``name`` is registered."""
        return name in self._operators

    def list_all(self) -> List[str]:
        """Return all operator names in registration order."""
        return [*self._operators]

    def get_all(self) -> Dict[str, Operator]:
        """Return a shallow copy of the name-to-operator mapping."""
        return dict(self._operators)
# Module-level operator registry used by the register/get helpers in this module.
_registry = OperatorRegistry()
def register_operator(name: str, description: str = ""):
    """Return a decorator that registers the function as operator ``name``.

    The decorated function itself is returned unchanged.
    """

    def _register(func: Callable):
        _registry.register_function(name, func, description)
        return func

    return _register
def get_operator(name: str) -> Optional[Operator]:
    """Look up ``name`` in the module-level registry; ``None`` if absent."""
    found = _registry.get(name)
    return found
def get_registry() -> OperatorRegistry:
    """Return the module-level operator registry."""
    registry = _registry
    return registry
# Candidate values for period/window parameters.
PERIOD_RANGE = range(10, 100) # 10 through 99
# ==================== 基础数学算子 ====================
@register_operator("add", "加法: x + y")
def _add(x: np.ndarray, y: np.ndarray) -> np.ndarray:
    """Return ``x + y``."""
    total = x + y
    return total
@register_operator("sub", "减法: x - y")
def _sub(x: np.ndarray, y: np.ndarray) -> np.ndarray:
    """Return ``x - y``."""
    difference = x - y
    return difference
@register_operator("mul", "乘法: x * y")
def _mul(x: np.ndarray, y: np.ndarray) -> np.ndarray:
    """Return ``x * y``."""
    product = x * y
    return product
@register_operator("div", "除法: x / y (安全除法)")
def _div(x: np.ndarray, y: np.ndarray) -> np.ndarray:
    """Return ``x / y``, with NaN wherever ``|y| < 1e-12``."""
    near_zero = np.abs(y) < 1e-12
    # Replace (near-)zero divisors by NaN so the quotient is NaN, not inf.
    safe_y = np.where(near_zero, np.nan, y)
    return x / safe_y
@register_operator("neg", "取负: -x")
def _neg(x: np.ndarray) -> np.ndarray:
    """Return the element-wise negation of ``x``."""
    negated = np.negative(x)
    return negated
@register_operator("abs", "绝对值: |x|")
def _abs(x: np.ndarray) -> np.ndarray:
    """Return the element-wise absolute value of ``x``."""
    magnitude = np.abs(x)
    return magnitude
@register_operator("log", "对数: log(|x|)")
def _log(x: np.ndarray) -> np.ndarray:
    """Return ``log(|x|)`` with ``|x|`` floored at 1e-12."""
    magnitude = np.abs(x)
    # Floor keeps log() away from zero.
    floored = np.clip(magnitude, 1e-12, None)
    return np.log(floored)
@register_operator("sqrt", "平方根: sqrt(x)")
def _sqrt(x: np.ndarray) -> np.ndarray:
    """Return ``sqrt(x)`` with negative inputs clipped to zero first."""
    non_negative = np.clip(x, 0.0, None)
    return np.sqrt(non_negative)
@register_operator("pow", "幂运算: x^y (限制范围)")
def _pow(x: np.ndarray, y: np.ndarray) -> np.ndarray:
    """Return ``x ** y`` with bounded inputs and NaN for non-finite results.

    The base is clipped to ``[-1e6, 1e6]`` and the exponent to ``[-3, 3]``.
    Both are converted to float first, so integer inputs with negative
    exponents do not raise and the result can always hold NaN.
    """
    base_vals = np.clip(np.asarray(x, dtype=float), -1e6, 1e6)
    exponent = np.clip(np.asarray(y, dtype=float), -3.0, 3.0)
    # 0 ** negative emits a divide warning; silence it like overflow/invalid.
    with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
        out = np.power(base_vals, exponent)
    # np.where (rather than masked assignment) also works for 0-d results.
    return np.where(np.isfinite(out), out, np.nan)
# ==================== 时间序列算子 ====================
def _rolling_mean(x: np.ndarray, window: int) -> np.ndarray:
s = pd.Series(x)
return s.rolling(window, min_periods=max(2, window // 2)).mean().to_numpy()
def _rolling_std(x: np.ndarray, window: int) -> np.ndarray:
s = pd.Series(x)
return s.rolling(window, min_periods=max(2, window // 2)).std().to_numpy()
def _ts_delta(x: np.ndarray, period: int) -> np.ndarray:
s = pd.Series(x)
return s.diff(period).to_numpy()
def _ts_rank(x: np.ndarray, window: int) -> np.ndarray:
s = pd.Series(x)
return (
s.rolling(window, min_periods=max(2, window // 2))
.apply(lambda a: pd.Series(a).rank(pct=True).iloc[-1], raw=False)
.to_numpy()
)
def _delay(x: np.ndarray, period: int) -> np.ndarray:
s = pd.Series(x)
return s.shift(period).to_numpy()
def _pct_change(x: np.ndarray, period: int = 1) -> np.ndarray:
"""百分比变化"""
s = pd.Series(x)
return s.pct_change(periods=period, fill_method=None).to_numpy()
# 注册单参数百分比变化算子
@register_operator("pct", "百分比变化: PCT(x, 1)")
def _pct(x: np.ndarray) -> np.ndarray:
    """One-step fractional change of ``x``."""
    one_step = _pct_change(x, 1)
    return one_step
# Register the time-series operators, one per window value in PERIOD_RANGE.
# ``w=w`` freezes the current window inside each lambda (avoids late binding).
# NOTE(review): because ``w`` is a defaulted parameter, inspect.signature()
# reports two parameters (x, w) for these lambdas — confirm that any arity
# detection based on the parameter count treats them as intended.
for w in PERIOD_RANGE:
    _registry.register_function(
        f"sma{w}", lambda x, w=w: _rolling_mean(x, w), f"简单移动平均: SMA(x, {w})"
    )
    _registry.register_function(
        f"std{w}", lambda x, w=w: _rolling_std(x, w), f"滚动标准差: STD(x, {w})"
    )
    _registry.register_function(
        f"rank{w}", lambda x, w=w: _ts_rank(x, w), f"滚动排名: RANK(x, {w})"
    )
    _registry.register_function(
        f"delta{w}", lambda x, w=w: _ts_delta(x, w), f"差分: DELTA(x, {w})"
    )
    _registry.register_function(
        f"delay{w}", lambda x, w=w: _delay(x, w), f"延迟: DELAY(x, {w})"
    )
# ==================== 技术指标算子含自定义与ta-lib====================
def _try_float(x):
try:
return float(x)
except Exception:
return x
def _convert_input(v):
# 如果是pd.Series,返回np.ndarray; 如果已经是np.ndarray则原样返回
if isinstance(v, pd.Series):
return v.values
return v
# Register TA-Lib technical indicators.
# Collect every callable attribute of the talib module whose name is all upper case.
talib_func_list = [f for f in dir(talib) if f.isupper() and callable(getattr(talib, f))]
# Substrings that mark a parameter as period-like; one operator version is
# generated per period value for such parameters.
# Listed in priority order so the main period parameter is matched first.
PERIOD_PARAM_NAMES = [
    "timeperiod", # most common parameter name
    "period", # generic period parameter
    "optintimeperiod", # TA-Lib internal parameter name
]
# Functions with several period parameters need special handling:
# the main period parameter is named explicitly to avoid wrong auto-detection.
MULTI_PERIOD_FUNCTIONS = {
    # function name: (main period parameter, secondary parameters — documentation only)
    "MACD": ("fastperiod", ["slowperiod", "signalperiod"]),
    "MACDEXT": ("fastperiod", ["slowperiod", "signalperiod"]),
    "MACDFIX": ("signalperiod", []),
    "STOCH": ("fastk_period", ["slowk_period", "slowd_period"]),
    "STOCHF": ("fastk_period", ["fastd_period"]),
    "STOCHRSI": ("timeperiod", ["fastk_period", "fastd_period"]),
    "BBANDS": ("timeperiod", ["nbdevup", "nbdevdn"]),
    "APO": ("fastperiod", ["slowperiod"]),
    "PPO": ("fastperiod", ["slowperiod"]),
    "ULTOSC": ("timeperiod1", ["timeperiod2", "timeperiod3"]),
    "BOP": ("", []), # no period parameter: only the default version is registered
}
def build_talib_wrapper(func, func_name, fixed_params=None):
    """Wrap a TA-Lib style function so it can be used as a registered operator.

    Parameters
    ----------
    func : Callable
        The function to wrap.
    func_name : str
        Its name; the wrapper is named ``talib_<func_name.lower()>``.
    fixed_params : dict, optional
        Keyword arguments bound on every call. Keywords passed at call time
        take precedence.

    Returns
    -------
    Callable
        A wrapper that converts pandas Series inputs to arrays and returns an
        ndarray, or a tuple of ndarrays when ``func`` returns a tuple.

    Notes
    -----
    The wrapper body accepts ``*args, **kwargs``, which by itself tells
    ``inspect.signature`` nothing about the number of array inputs. When
    ``func`` can be introspected, the wrapper therefore advertises a signature
    made of ``func``'s required positional parameters (those without a default
    and not already fixed), so signature-based arity detection sees the true
    input count.
    """
    # Copy so later mutation of the caller's dict cannot leak into the wrapper.
    fixed_params = dict(fixed_params or {})

    def _talib_wrap(*args, **kwargs):
        # Call-time keywords override the fixed ones.
        merged_kwargs = {**fixed_params, **kwargs}
        args = tuple(_convert_input(arg) for arg in args)
        merged_kwargs = {k: _convert_input(v) for k, v in merged_kwargs.items()}
        result = func(*args, **merged_kwargs)
        # Multi-output functions return a tuple; keep that structure.
        if isinstance(result, tuple):
            return tuple(
                np.asarray(item) if item is not None else None for item in result
            )
        return np.asarray(result)

    _talib_wrap.__name__ = f"talib_{func_name.lower()}"
    try:
        signature = inspect.signature(func)
    except (TypeError, ValueError):
        # Not introspectable: keep the generic (*args, **kwargs) signature.
        return _talib_wrap
    positional_kinds = (
        inspect.Parameter.POSITIONAL_ONLY,
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
    )
    required = [
        param
        for param in signature.parameters.values()
        if param.kind in positional_kinds
        and param.default is inspect.Parameter.empty
        and param.name not in fixed_params
    ]
    _talib_wrap.__signature__ = signature.replace(parameters=required)
    return _talib_wrap
# Register every TA-Lib function collected in talib_func_list.
# Functions with a usable period parameter get one operator per value in
# PERIOD_RANGE, named ``talib_<name>_<period>``; all others get a single
# default operator named ``talib_<name>``.
for func_name in talib_func_list:
    func = getattr(talib, func_name)
    sig = inspect.signature(func)
    params = sig.parameters
    # Is the function listed in the special-case table?
    if func_name in MULTI_PERIOD_FUNCTIONS:
        main_period_param, _ = MULTI_PERIOD_FUNCTIONS[func_name]
        # Use the configured main period parameter when the function has it.
        if main_period_param and main_period_param in params:
            for period_value in PERIOD_RANGE:
                fixed_params = {main_period_param: period_value}
                wrapper = build_talib_wrapper(func, func_name, fixed_params)
                op_name = f"talib_{func_name.lower()}_{period_value}"
                desc = f"ta-lib: {func_name}({main_period_param}={period_value})"
                _registry.register_function(op_name, wrapper, desc)
        else:
            # Configured as having no period parameter: register the default version.
            wrapper = build_talib_wrapper(func, func_name)
            op_name = f"talib_{func_name.lower()}"
            desc = f"ta-lib: {func_name}"
            _registry.register_function(op_name, wrapper, desc)
    else:
        # Not special-cased: detect period parameters automatically.
        period_params = {}
        for param_name, param in params.items():
            param_lower = param_name.lower()
            # A parameter is period-like when its name contains one of the keywords.
            if any(
                period_keyword in param_lower for period_keyword in PERIOD_PARAM_NAMES
            ):
                period_params[param_name] = param
        if period_params:
            # Generate one version per period value.
            # Prefer "timeperiod", then the other keywords, else the first match.
            main_period_param = None
            for preferred in ["timeperiod", "period", "optintimeperiod"]:
                for param_name in period_params.keys():
                    if preferred in param_name.lower():
                        main_period_param = param_name
                        break
                if main_period_param:
                    break
            if not main_period_param:
                main_period_param = list(period_params.keys())[0]
            for period_value in PERIOD_RANGE:
                fixed_params = {main_period_param: period_value}
                wrapper = build_talib_wrapper(func, func_name, fixed_params)
                op_name = f"talib_{func_name.lower()}_{period_value}"
                desc = f"ta-lib: {func_name}({main_period_param}={period_value})"
                _registry.register_function(op_name, wrapper, desc)
        else:
            # No period parameter: register the default version.
            wrapper = build_talib_wrapper(func, func_name)
            op_name = f"talib_{func_name.lower()}"
            desc = f"ta-lib: {func_name}"
            _registry.register_function(op_name, wrapper, desc)
# ==================== 自定义常见技术指标 ====================
def _ewm_forward(x: np.ndarray, alpha: float) -> np.ndarray:
"""指数加权移动平均(前向计算)"""
result = np.zeros_like(x)
if len(x) == 0:
return result
result[0] = x[0]
for i in range(1, len(x)):
result[i] = x[i] * alpha + (1 - alpha) * result[i - 1]
return result
def _rsv(x: np.ndarray, window: int) -> np.ndarray:
"""相对强弱值: (当前值 - 最小值) / (最大值 - 最小值)"""
s = pd.Series(x)
rolling = s.rolling(window, min_periods=max(2, window // 2), closed="both")
min_val = rolling.min()
max_val = rolling.max()
diff = max_val - min_val
# 避免除零
diff = np.where(np.abs(diff) < 1e-12, np.nan, diff)
result = (s - min_val) / diff
return result.to_numpy()
def _bband(x: np.ndarray, window: int) -> np.ndarray:
"""布林带指标: (当前值 - 均值) / 标准差"""
s = pd.Series(x)
rolling = s.rolling(window, min_periods=max(2, window // 2), closed="both")
mean_val = rolling.mean()
std_val = rolling.std()
# 避免除零
std_val = np.where(np.abs(std_val) < 1e-12, np.nan, std_val)
result = (s - mean_val) / std_val
return result.to_numpy()
def _rsi(x: np.ndarray, window: int, threshold: float = 0.00001) -> np.ndarray:
"""相对强弱指标: 上涨和下跌的比例"""
s = pd.Series(x)
diff = s.diff()
rolling = diff.rolling(window, min_periods=max(2, window // 2), closed="both")
def _rsi_calc(series):
up_sum = series[series > threshold].sum()
down_sum = abs(series[series < -threshold].sum())
total = up_sum + down_sum
if total < 1e-12:
return np.nan
return up_sum / total
result = rolling.apply(_rsi_calc, raw=False)
return result.to_numpy()
def _rolling_skew(x: np.ndarray, window: int) -> np.ndarray:
"""滚动偏度"""
s = pd.Series(x)
return (
s.rolling(window, min_periods=max(2, window // 2), closed="both")
.skew()
.to_numpy()
)
def _rolling_kurtosis(x: np.ndarray, window: int) -> np.ndarray:
"""滚动峰度"""
s = pd.Series(x)
return (
s.rolling(window, min_periods=max(2, window // 2), closed="both")
.kurt()
.to_numpy()
)
def _rolling_linear(x: np.ndarray, window: int) -> np.ndarray:
"""滚动线性回归斜率"""
s = pd.Series(x)
def _linear_slope(series):
valid = series.dropna()
if len(valid) < 2:
return np.nan
try:
coeffs = np.polyfit(np.arange(len(valid)), valid.values, 1)
return coeffs[0]
except:
return np.nan
result = s.rolling(window, min_periods=max(2, window // 2), closed="both").apply(
_linear_slope, raw=False
)
return result.to_numpy()
def _rolling_autocorr(x: np.ndarray, window: int, lag: int = 1) -> np.ndarray:
"""滚动自相关"""
s = pd.Series(x)
result = s.rolling(window, min_periods=max(2, window // 2), closed="both").apply(
lambda series: (
series.autocorr(lag=lag) if len(series.dropna()) >= 2 else np.nan
),
raw=False,
)
return result.to_numpy()
def _rolling_max(x: np.ndarray, window: int) -> np.ndarray:
"""滚动最大值"""
s = pd.Series(x)
return (
s.rolling(window, min_periods=max(2, window // 2), closed="both")
.max()
.to_numpy()
)
def _rolling_min(x: np.ndarray, window: int) -> np.ndarray:
"""滚动最小值"""
s = pd.Series(x)
return (
s.rolling(window, min_periods=max(2, window // 2), closed="both")
.min()
.to_numpy()
)
def _huanbi(x: np.ndarray, window: int) -> np.ndarray:
"""环比: 当前值 / 窗口起始值"""
s = pd.Series(x)
def _huanbi_calc(series):
if len(series) < 2:
return np.nan
start_val = series.iloc[0]
end_val = series.iloc[-1]
if abs(start_val) < 1e-12:
return np.nan
return end_val / start_val
result = s.rolling(window, min_periods=max(2, window // 2), closed="both").apply(
_huanbi_calc, raw=False
)
return result.to_numpy()
# Register the custom indicator operators, one per window value in PERIOD_RANGE.
# ``w=w`` / ``a=alpha`` freeze the loop values inside each lambda.
# NOTE(review): the defaulted parameters are visible to inspect.signature()
# (two parameters for most lambdas, three for ewm) — confirm that arity
# detection based on the parameter count treats them as intended.
for w in PERIOD_RANGE:
    # EWM operator with a fixed alpha derived from the window.
    alpha = 2.0 / (w + 1)
    _registry.register_function(
        f"ewm{w}",
        lambda x, w=w, a=alpha: _ewm_forward(x, a),
        f"指数加权移动平均: EWM(x, {w})",
    )
    # Percentage change.
    _registry.register_function(
        f"pct{w}", lambda x, w=w: _pct_change(x, w), f"百分比变化: PCT(x, {w})"
    )
    # RSV: position within the rolling range.
    _registry.register_function(
        f"rsv{w}", lambda x, w=w: _rsv(x, w), f"相对强弱值: RSV(x, {w})"
    )
    # Bollinger-style z-score.
    _registry.register_function(
        f"bband{w}", lambda x, w=w: _bband(x, w), f"布林带指标: BBAND(x, {w})"
    )
    # RSI
    _registry.register_function(
        f"rsi{w}", lambda x, w=w: _rsi(x, w), f"相对强弱指标: RSI(x, {w})"
    )
    # Rolling statistics.
    _registry.register_function(
        f"skew{w}", lambda x, w=w: _rolling_skew(x, w), f"滚动偏度: SKEW(x, {w})"
    )
    _registry.register_function(
        f"kurt{w}", lambda x, w=w: _rolling_kurtosis(x, w), f"滚动峰度: KURT(x, {w})"
    )
    _registry.register_function(
        f"linear{w}",
        lambda x, w=w: _rolling_linear(x, w),
        f"滚动线性斜率: LINEAR(x, {w})",
    )
    _registry.register_function(
        f"autocorr{w}",
        lambda x, w=w: _rolling_autocorr(x, w),
        f"滚动自相关: AUTOCORR(x, {w})",
    )
    _registry.register_function(
        f"max{w}", lambda x, w=w: _rolling_max(x, w), f"滚动最大值: MAX(x, {w})"
    )
    _registry.register_function(
        f"min{w}", lambda x, w=w: _rolling_min(x, w), f"滚动最小值: MIN(x, {w})"
    )
    # Ratio of window end to window start.
    _registry.register_function(
        f"huanbi{w}", lambda x, w=w: _huanbi(x, w), f"环比: HUANBI(x, {w})"
    )
# ==================== 因子公式解析与计算 ====================
class FactorFormula:
    """A factor expression over named features that can be serialised and evaluated."""

    def __init__(self, expression: str, feature_names: List[str]):
        """
        Parameters
        ----------
        expression : str
            Factor expression written with registered operator names.
        feature_names : List[str]
            Names of the features the expression may reference.
        """
        self.expression = expression
        self.feature_names = feature_names

    def compute(self, features: Dict[str, np.ndarray]) -> np.ndarray:
        """Evaluate the expression and return the factor values.

        Parameters
        ----------
        features : Dict[str, np.ndarray]
            Feature arrays keyed by feature name.

        Returns
        -------
        np.ndarray
            Factor values with the same length as the first feature. A scalar
            result (Python or NumPy scalar, or 0-d array) is broadcast to
            that length.

        Raises
        ------
        KeyError
            If a name in ``feature_names`` is missing from ``features``.
        RuntimeError
            If evaluation fails or the result length does not match; the
            original exception is attached as ``__cause__``.
        """
        # Evaluation namespace: features first, then every registered operator.
        env = {}
        for name in self.feature_names:
            if name not in features:
                raise KeyError(f"特征 '{name}' 不存在")
            env[name] = features[name]
        for op_name, op in _registry.get_all().items():
            env[op_name] = op.func
        # numpy and pandas are exposed for expressions that use them directly.
        env["np"] = np
        env["pd"] = pd
        try:
            # Only a handful of builtins are made available.
            safe_builtins = {
                "abs": abs,
                "min": min,
                "max": max,
                "sum": sum,
                "len": len,
            }
            # SECURITY: eval() runs the expression as Python code. The reduced
            # builtins are not a sandbox (np/pd are reachable), so only
            # evaluate formulas from trusted sources.
            result = eval(self.expression, {"__builtins__": safe_builtins}, env)
            expected_len = len(features[self.feature_names[0]])
            if not isinstance(result, np.ndarray):
                result = np.array(result)
            if result.ndim == 0:
                # Covers Python scalars, NumPy scalars and 0-d arrays alike;
                # calling len() on any of them would fail.
                result = np.full(expected_len, result)
            if len(result) != expected_len:
                raise ValueError(
                    f"表达式结果长度 {len(result)} 与特征长度 {expected_len} 不匹配"
                )
            return result
        except Exception as e:
            raise RuntimeError(
                f"计算因子表达式失败: {e}\n表达式: {self.expression}"
            ) from e

    def to_dict(self) -> Dict:
        """Serialise to a plain dictionary."""
        return {"expression": self.expression, "feature_names": self.feature_names}

    @classmethod
    def from_dict(cls, data: Dict) -> "FactorFormula":
        """Rebuild a formula from the dictionary produced by ``to_dict``."""
        return cls(data["expression"], data["feature_names"])

    def __repr__(self):
        return f"FactorFormula(expression='{self.expression}', features={self.feature_names})"

237
factor_mining/validator.py Normal file
View File

@@ -0,0 +1,237 @@
"""
因子有效性检验模块:整合所有检验方案
"""
import numpy as np
import pandas as pd
from typing import Dict, List, Optional
from dataclasses import dataclass
from statsmodels.regression.linear_model import OLS
from validation import (
compute_ic,
compute_rolling_ic,
group_backtest,
factor_span_regression
)
@dataclass
class ValidationConfig:
    """Thresholds and window sizes used by the factor validator."""

    # Window length of the rolling IC.
    ic_window: int = 30
    # Correlation method for the IC: "spearman" or "pearson".
    ic_method: str = "spearman"
    # Number of groups in the group backtest.
    n_groups: int = 3
    # Period length passed to the group backtest.
    group_period: int = 180
    # Minimum |mean IC| for the IC check to pass.
    min_ic: float = 0.01
    # Minimum |t-statistic| for the backtest and regression checks.
    min_tstat: float = 1.5
    # Minimum R² change for the regression check.
    min_r2_change: float = 0.05
class FactorValidator:
    """Runs IC, group-backtest and regression checks on a factor and combines them."""

    def __init__(self, config: ValidationConfig):
        self.config = config

    def validate_ic(
        self,
        factor: pd.Series,
        forward_return: pd.Series
    ) -> Dict:
        """Rolling-IC check.

        Returns
        -------
        dict
            ``mean_ic``, ``ic_std``, ``ic_ir`` (mean / std), ``ic_series`` and
            ``is_valid`` (``|mean_ic| >= config.min_ic``).
        """
        rolling_ic = compute_rolling_ic(
            factor,
            forward_return,
            window=self.config.ic_window,
            method=self.config.ic_method
        )
        mean_ic = rolling_ic.mean()
        ic_std = rolling_ic.std()
        # Information ratio of the IC; the epsilon avoids division by zero.
        ic_ir = mean_ic / (ic_std + 1e-8)
        return {
            "mean_ic": mean_ic,
            "ic_std": ic_std,
            "ic_ir": ic_ir,
            "ic_series": rolling_ic,
            "is_valid": abs(mean_ic) >= self.config.min_ic
        }

    def validate_group_backtest(
        self,
        factor: pd.Series,
        forward_return: pd.Series
    ) -> Dict:
        """Group-backtest check.

        Returns
        -------
        dict
            Everything returned by ``group_backtest`` plus ``is_valid``
            (``|mean_h_l_tstat| >= config.min_tstat``).
        """
        result = group_backtest(
            factor,
            forward_return,
            n_groups=self.config.n_groups,
            group_period=self.config.group_period
        )
        is_valid = abs(result.get('mean_h_l_tstat', 0)) >= self.config.min_tstat
        return {
            **result,
            "is_valid": is_valid
        }

    def validate_regression(
        self,
        factor: pd.Series,
        forward_return: pd.Series,
        other_factors: Optional[pd.DataFrame] = None
    ) -> Dict:
        """Factor-spanning regression check.

        Parameters
        ----------
        factor : Series
            The factor under test.
        forward_return : Series
            Forward returns.
        other_factors : DataFrame, optional
            Other factors used as control variables.

        Returns
        -------
        dict
            Everything returned by ``factor_span_regression`` plus
            ``is_valid`` (both the t-stat and the R² change thresholds met).
        """
        if other_factors is None:
            other_factors = pd.DataFrame()
        # The factor under test joins the controls as column 'target'.
        factors_df = pd.concat([other_factors, factor.to_frame(name='target')], axis=1)
        result = factor_span_regression(
            factors_df,
            forward_return,
            target_factor='target'
        )
        is_valid = (
            abs(result.get('tstat', 0)) >= self.config.min_tstat and
            result.get('r2_change', 0) >= self.config.min_r2_change
        )
        return {
            **result,
            "is_valid": is_valid
        }

    def validate_all(
        self,
        factor: pd.Series,
        forward_return: pd.Series,
        other_factors: Optional[pd.DataFrame] = None
    ) -> Dict:
        """Run every check.

        Returns
        -------
        dict
            Keys ``ic``, ``group_backtest``, ``regression`` (the individual
            results), ``is_valid`` (all three passed) and ``score``.
        """
        ic_result = self.validate_ic(factor, forward_return)
        group_result = self.validate_group_backtest(factor, forward_return)
        reg_result = self.validate_regression(factor, forward_return, other_factors)
        results = {
            'ic': ic_result,
            'group_backtest': group_result,
            'regression': reg_result,
        }
        results['is_valid'] = (
            ic_result['is_valid'] and
            group_result['is_valid'] and
            reg_result['is_valid']
        )
        results['score'] = self._calculate_score(ic_result, group_result, reg_result)
        return results

    @staticmethod
    def _finite_or_zero(value) -> float:
        """Return ``value`` as a float, or 0.0 when it is NaN or infinite."""
        return float(value) if np.isfinite(value) else 0.0

    def _calculate_score(
        self,
        ic_result: Dict,
        group_result: Dict,
        reg_result: Dict
    ) -> float:
        """Combine the three checks into one score.

        Weights: IC 0.3, group backtest 0.4, regression 0.3. A missing or
        non-finite metric contributes 0 instead of turning the whole score
        into NaN. The t-stat and R² components are confined to [0, 1], so a
        negative R² change cannot pull the score below zero.
        """
        # IC component: |mean IC| * 10 (left uncapped).
        ic_score = abs(self._finite_or_zero(ic_result.get('mean_ic', 0))) * 10
        # Backtest component: |t| / 3, capped at 1.
        tstat = abs(self._finite_or_zero(group_result.get('mean_h_l_tstat', 0)))
        tstat_score = min(tstat / 3.0, 1.0)
        # Regression component: R² change / 0.1, confined to [0, 1].
        r2_change = self._finite_or_zero(reg_result.get('r2_change', 0))
        r2_score = min(max(r2_change / 0.1, 0.0), 1.0)
        return ic_score * 0.3 + tstat_score * 0.4 + r2_score * 0.3

    def filter_factors(
        self,
        factors: pd.DataFrame,
        forward_return: pd.Series
    ) -> pd.DataFrame:
        """Keep only the columns of ``factors`` that pass every check.

        Each column is validated with all remaining columns as controls.

        Returns
        -------
        DataFrame
            The valid factor columns, or an empty DataFrame when none pass.
        """
        valid_factors = [
            col
            for col in factors.columns
            if self.validate_all(
                factors[col], forward_return, factors.drop(columns=[col])
            )['is_valid']
        ]
        return factors[valid_factors] if valid_factors else pd.DataFrame()
def create_validator(
    ic_window: int = 30,
    min_ic: float = 0.01,
    min_tstat: float = 1.5
) -> FactorValidator:
    """Convenience constructor: build a ``FactorValidator`` from three settings.

    All other ``ValidationConfig`` fields keep their defaults.
    """
    overrides = {"ic_window": ic_window, "min_ic": min_ic, "min_tstat": min_tstat}
    return FactorValidator(ValidationConfig(**overrides))

View File

@@ -1,113 +0,0 @@
"""
因子挖掘模块:支持规则因子和遗传编程因子
"""
import numpy as np
import pandas as pd
from typing import Callable, Dict, List, Optional
from abc import ABC, abstractmethod
class BaseFactor(ABC):
    """Abstract base class of a named factor."""

    def __init__(self, name: str):
        self.name = name

    @abstractmethod
    def compute(self, data: pd.DataFrame) -> pd.Series:
        """Compute the factor values for ``data``."""
class RuleFactor(BaseFactor):
    """Factor defined by a fixed rule, supplied as a function of the data."""

    def __init__(self, name: str, compute_func: Callable[[pd.DataFrame], pd.Series]):
        super().__init__(name)
        self.compute_func = compute_func

    def compute(self, data: pd.DataFrame) -> pd.Series:
        """Apply the stored rule function to ``data``."""
        rule = self.compute_func
        return rule(data)
def create_trend_factor(data: pd.DataFrame) -> pd.Series:
    """Trend factor: 1 when close > ema16, -1 when close < ema4, otherwise 0.

    When both conditions hold, -1 wins.
    """
    above_slow = (data['close'] > data['ema16']).to_numpy()
    below_fast = (data['close'] < data['ema4']).to_numpy()
    # np.select takes the first matching condition, so -1 is listed first.
    direction = np.select([below_fast, above_slow], [-1, 1], default=0)
    return pd.Series(direction, index=data.index, dtype="int64")
def create_volatility_factor(data: pd.DataFrame) -> pd.Series:
    """Volatility factor: the ``volatility`` column of ``data``, unchanged."""
    volatility = data['volatility']
    return volatility
def create_volume_price_factor(data: pd.DataFrame) -> pd.Series:
    """Volume-price factor: the return on bars where volume exceeds volume_ma6, else 0."""
    volume_expanding = data['volume'] > data['volume_ma6']
    flag = volume_expanding.astype(int)
    return flag * data['return']
def create_reversal_factor(data: pd.DataFrame) -> pd.Series:
    """Short-term reversal factor: the negated previous-bar return."""
    previous_return = data['return'].shift(1)
    return -previous_return
def create_momentum_factor(data: pd.DataFrame) -> pd.Series:
    """Momentum factor: the ``macd`` column of ``data``, unchanged."""
    macd = data['macd']
    return macd
def create_rsi_factor(data: pd.DataFrame) -> pd.Series:
    """RSI factor: ``(rsi - 50) / 50``, which maps 0..100 onto -1..1."""
    centred = data['rsi'] - 50
    return centred / 50
class FactorMiner:
    """Collection of named factors that can be computed together."""

    def __init__(self):
        self.factors: Dict[str, BaseFactor] = {}

    def register_factor(self, factor: BaseFactor):
        """Store ``factor`` under its name, replacing any earlier entry."""
        self.factors[factor.name] = factor

    def register_rule_factor(self, name: str, compute_func: Callable):
        """Wrap ``compute_func`` in a ``RuleFactor`` and register it."""
        self.register_factor(RuleFactor(name, compute_func))

    def compute_all_factors(self, data: pd.DataFrame) -> pd.DataFrame:
        """Compute every registered factor into one DataFrame indexed like ``data``.

        A factor that raises is reported on stdout and its column is set to NaN.
        """
        factor_df = pd.DataFrame(index=data.index)
        for name in self.factors:
            factor = self.factors[name]
            try:
                factor_df[name] = factor.compute(data)
            except Exception as e:
                # Best effort: one failing factor must not abort the others.
                print(f"计算因子 {name} 时出错: {e}")
                factor_df[name] = np.nan
        return factor_df

    def get_factor(self, name: str) -> Optional[BaseFactor]:
        """Return the factor called ``name``, or ``None``."""
        return self.factors.get(name)
def create_default_factors() -> FactorMiner:
    """Build a ``FactorMiner`` pre-loaded with the default rule factors.

    The factors are registered in this order: TREND, VOL, VOLP, REV, MOM, RSI.
    """
    default_rules = (
        ('TREND', create_trend_factor),
        ('VOL', create_volatility_factor),
        ('VOLP', create_volume_price_factor),
        ('REV', create_reversal_factor),
        ('MOM', create_momentum_factor),
        ('RSI', create_rsi_factor),
    )
    miner = FactorMiner()
    for factor_name, rule in default_rules:
        miner.register_rule_factor(factor_name, rule)
    return miner

109
signal.py
View File

@@ -1,109 +0,0 @@
"""
信号生成模块
"""
import numpy as np
import pandas as pd
from typing import Optional, TYPE_CHECKING
if TYPE_CHECKING:
from pandas import Series
def generate_signals(
    score: 'pd.Series',
    buy_threshold: float = 0.8,
    sell_threshold: float = -0.8,
    window: int = 30,
    use_rolling_std: bool = True
) -> 'pd.Series':
    """Generate buy/sell signals from a composite factor score.

    Parameters
    ----------
    score : Series
        Composite factor score.
    buy_threshold : float
        Buy threshold, as a multiple of the score's standard deviation.
    sell_threshold : float
        Sell threshold, as a multiple of the score's standard deviation.
    window : int
        Rolling window used for the standard deviation.
    use_rolling_std : bool
        If True the thresholds scale with the rolling standard deviation;
        otherwise with the standard deviation of the whole series.

    Returns
    -------
    Series
        Integer signals on ``score.index``: 1 = buy, -1 = sell, 0 = hold.
        A signal is only emitted when it changes the position, starting flat:
        a buy needs a flat position, a sell needs an open position.
    """
    if use_rolling_std:
        dispersion = score.rolling(window).std()
    else:
        dispersion = score.std()
    buy_line = buy_threshold * dispersion
    sell_line = sell_threshold * dispersion

    # Raw threshold crossings. Comparisons against NaN thresholds (warm-up
    # rows of the rolling window) are False, so those rows stay at 0.
    buy_hits = (score > buy_line).to_numpy()
    sell_hits = (score < sell_line).to_numpy()
    # A sell crossing wins if both conditions hold on the same row.
    raw = np.where(sell_hits, -1, np.where(buy_hits, 1, 0))

    # Emit a trade only when the raw signal flips the position.
    trades = np.zeros(len(raw), dtype=int)
    holding = False
    for i, flag in enumerate(raw):
        if flag == 1 and not holding:
            trades[i] = 1
            holding = True
        elif flag == -1 and holding:
            trades[i] = -1
            holding = False

    return pd.Series(trades, index=score.index).astype(int)
def generate_signals_with_position(
    score: 'pd.Series',
    buy_threshold: float = 0.8,
    sell_threshold: float = -0.8,
    window: int = 30,
    current_position: int = 0
) -> 'pd.Series':
    """Generate signals, taking the current position into account.

    Thresholds are ``buy_threshold`` / ``sell_threshold`` times the rolling
    standard deviation of ``score`` over ``window`` rows. The raw threshold
    crossings are then filtered by a position state machine that starts at
    ``current_position``.

    The crossings are derived here directly rather than from an output that
    was already filtered starting from a flat position; otherwise a sell
    crossing that occurs before the first buy crossing would be lost when
    ``current_position`` is 1.

    Parameters
    ----------
    score : Series
        Composite factor score.
    buy_threshold : float
        Buy threshold, as a multiple of the rolling standard deviation.
    sell_threshold : float
        Sell threshold, as a multiple of the rolling standard deviation.
    window : int
        Rolling window used for the standard deviation.
    current_position : int
        Current position: 0 = flat, 1 = fully invested.

    Returns
    -------
    Series
        Integer signals on ``score.index``: 1 = buy, -1 = sell, 0 = hold.
    """
    rolling_std = score.rolling(window).std()
    # Comparisons against NaN (window warm-up) are False, i.e. no crossing.
    buy_hits = (score > buy_threshold * rolling_std).to_numpy()
    sell_hits = (score < sell_threshold * rolling_std).to_numpy()

    trades = np.zeros(len(score), dtype=int)
    position = current_position
    for i, (is_buy, is_sell) in enumerate(zip(buy_hits, sell_hits)):
        if is_sell:
            # A sell crossing takes precedence over a simultaneous buy crossing.
            if position == 1:
                trades[i] = -1
                position = 0
        elif is_buy and position == 0:
            trades[i] = 1
            position = 1

    return pd.Series(trades, index=score.index)

View File

@@ -1,62 +1,43 @@
"""
因子检验模块IC检验、分组回测、因子跨度回归
因子检验模块: IC检验、分组回测、因子跨度回归
"""
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple
from statsmodels.regression.linear_model import OLS
def compute_ic(factor: pd.Series, forward_return: pd.Series, method: str = 'spearman') -> pd.Series:
    """Compute the IC (information coefficient) of a factor.

    Parameters
    ----------
    factor : Series
        Factor values.
    forward_return : Series
        Forward returns.
    method : str
        ``'spearman'`` for a rank correlation; any other value gives the
        Pearson correlation.

    Returns
    -------
    Series
        A one-element series holding the correlation, labelled with the last
        index of the paired, NaN-free data. Empty when fewer than 10 paired
        observations remain.
    """
    paired = pd.concat([factor, forward_return], axis=1).dropna()
    if len(paired) < 10:
        return pd.Series(dtype=float)

    x = paired.iloc[:, 0]
    y = paired.iloc[:, 1]
    if method == 'spearman':
        # Spearman is the Pearson correlation of the ranks.
        x, y = x.rank(), y.rank()

    return pd.Series([x.corr(y)], index=[paired.index[-1]])
def compute_rolling_ic(
    factor: pd.Series,
    forward_return: pd.Series,
    window: int = 30,
    method: str = "spearman",
) -> pd.Series:
    """Compute the rolling IC (vectorised).

    Parameters
    ----------
    factor : Series
        Factor values.
    forward_return : Series
        Forward returns.
    window : int
        Rolling window length; a full window is required for a value.
    method : str
        ``"spearman"`` correlates ranks; any other value gives the Pearson
        correlation.

    Returns
    -------
    Series
        Rolling correlation on the index of the paired, NaN-free data. When
        fewer than ``window`` paired rows exist, an all-NaN series indexed by
        ``factor.index[window:]`` is returned.
    """
    # Align both inputs and drop rows where either one is missing.
    aligned = pd.concat([factor, forward_return], axis=1).dropna()

    if len(aligned) < window:
        return pd.Series(dtype=float, index=factor.index[window:])

    aligned.columns = ["factor", "return"]

    if method == "spearman":
        # NOTE: ranks are taken over the whole sample; strictly speaking a
        # rolling Spearman should re-rank inside each window.
        aligned = aligned.rank()

    # Series.rolling(...).corr is pandas' optimised rolling correlation.
    ic_series = (
        aligned["factor"]
        .rolling(window, min_periods=window)
        .corr(aligned["return"])
    )
    return ic_series
@@ -65,7 +46,7 @@ def group_backtest(
factor: pd.Series,
forward_return: pd.Series,
n_groups: int = 3,
group_period: int = 180
group_period: int = 180,
) -> Dict:
"""
分组回测:将数据按因子值分组,计算各组收益
@@ -75,14 +56,9 @@ def group_backtest(
dict: 包含各组收益、H-L收益差、t统计量等
"""
aligned = pd.concat([factor, forward_return], axis=1).dropna()
aligned.columns = ['factor', 'return']
aligned.columns = ["factor", "return"]
results = {
'group_returns': [],
'h_l_return': [],
'h_l_tstat': [],
'periods': []
}
results = {"group_returns": [], "h_l_return": [], "h_l_tstat": [], "periods": []}
# 按月分组每180个4h周期- 使用更高效的步长
step = max(group_period // 2, 90) # 减少重叠计算
@@ -96,50 +72,45 @@ def group_backtest(
# 按因子值分组(向量化)
try:
period_data = period_data.copy()
period_data['group'] = pd.qcut(
period_data['factor'],
q=n_groups,
labels=False,
duplicates='drop'
period_data["group"] = pd.qcut(
period_data["factor"], q=n_groups, labels=False, duplicates="drop"
)
# 计算各组收益(向量化)
group_returns = period_data.groupby('group')['return'].mean()
results['group_returns'].append(group_returns)
group_returns = period_data.groupby("group")["return"].mean()
results["group_returns"].append(group_returns)
# H-L收益差
if len(group_returns) >= 2:
h_return = group_returns.iloc[-1] # 高因子组
l_return = group_returns.iloc[0] # 低因子组
l_return = group_returns.iloc[0] # 低因子组
h_l_diff = h_return - l_return
results['h_l_return'].append(h_l_diff)
results['periods'].append(period_data.index[-1])
results["h_l_return"].append(h_l_diff)
results["periods"].append(period_data.index[-1])
except (ValueError, KeyError):
# qcut失败时跳过
continue
# 计算平均H-L收益和t统计量
if results['h_l_return']:
h_l_series = pd.Series(results['h_l_return'], index=results['periods'])
if results["h_l_return"]:
h_l_series = pd.Series(results["h_l_return"], index=results["periods"])
mean_h_l = h_l_series.mean()
std_h_l = h_l_series.std()
t_stat = mean_h_l / (std_h_l / np.sqrt(len(h_l_series)) + 1e-8)
results['mean_h_l_return'] = mean_h_l
results['mean_h_l_tstat'] = t_stat
results['h_l_series'] = h_l_series
results["mean_h_l_return"] = mean_h_l
results["mean_h_l_tstat"] = t_stat
results["h_l_series"] = h_l_series
else:
results['mean_h_l_return'] = 0
results['mean_h_l_tstat'] = 0
results["mean_h_l_return"] = 0
results["mean_h_l_tstat"] = 0
return results
def factor_span_regression(
factors: pd.DataFrame,
forward_return: pd.Series,
target_factor: str
factors: pd.DataFrame, forward_return: pd.Series, target_factor: str
) -> Dict:
"""
因子跨度回归:检验因子的边际解释力
@@ -160,14 +131,14 @@ def factor_span_regression(
# 对齐数据
data = pd.concat([factors, forward_return], axis=1).dropna()
if len(data) < 30:
return {'beta': 0, 'tstat': 0, 'r2': 0, 'r2_change': 0}
return {"beta": 0, "tstat": 0, "r2": 0, "r2_change": 0}
y = data.iloc[:, -1].values
X_all = data.iloc[:, :-1].values
# 全模型(包含目标因子)
try:
model_all = OLS(y, X_all).fit(cov_type='HAC', cov_kwds={'maxlags': 6})
model_all = OLS(y, X_all).fit(cov_type="HAC", cov_kwds={"maxlags": 6})
r2_all = model_all.rsquared
# 目标因子的系数和t统计量
@@ -177,28 +148,25 @@ def factor_span_regression(
# 不含目标因子的模型
X_without = np.delete(X_all, target_idx, axis=1)
model_without = OLS(y, X_without).fit(cov_type='HAC', cov_kwds={'maxlags': 6})
model_without = OLS(y, X_without).fit(cov_type="HAC", cov_kwds={"maxlags": 6})
r2_without = model_without.rsquared
r2_change = r2_all - r2_without
return {
'beta': beta,
'tstat': tstat,
'r2': r2_all,
'r2_change': r2_change,
'pvalue': model_all.pvalues[target_idx]
"beta": beta,
"tstat": tstat,
"r2": r2_all,
"r2_change": r2_change,
"pvalue": model_all.pvalues[target_idx],
}
except Exception as e:
print(f"回归分析出错: {e}")
return {'beta': 0, 'tstat': 0, 'r2': 0, 'r2_change': 0}
return {"beta": 0, "tstat": 0, "r2": 0, "r2_change": 0}
def validate_factor(
factor: pd.Series,
forward_return: pd.Series,
ic_window: int = 30,
n_groups: int = 3
factor: pd.Series, forward_return: pd.Series, ic_window: int = 30, n_groups: int = 3
) -> Dict:
"""
综合因子检验
@@ -216,11 +184,10 @@ def validate_factor(
group_result = group_backtest(factor, forward_return, n_groups=n_groups)
return {
'mean_ic': mean_ic,
'ic_ir': ic_ir,
'ic_series': rolling_ic,
'mean_h_l_return': group_result['mean_h_l_return'],
'mean_h_l_tstat': group_result['mean_h_l_tstat'],
'group_returns': group_result['group_returns']
"mean_ic": mean_ic,
"ic_ir": ic_ir,
"ic_series": rolling_ic,
"mean_h_l_return": group_result["mean_h_l_return"],
"mean_h_l_tstat": group_result["mean_h_l_tstat"],
"group_returns": group_result["group_returns"],
}