Compare commits

..

4 Commits

Author SHA1 Message Date
f28b225d42 生成信号 2025-11-18 00:14:34 +08:00
abcb185505 时间序列算子拆出来; 2025-11-09 23:07:20 +08:00
e5beada25e 添加talib算子 2025-11-09 20:19:08 +08:00
dc3d41d6e5 factor minner 2025-11-09 14:00:58 +08:00
19 changed files with 22693 additions and 1467 deletions

179
.gitignore vendored Normal file
View File

@@ -0,0 +1,179 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# Data files (keep structure but ignore large data)
data/
# IDE files
.vscode/
.idea/
*.swp
*.swo
*~
# OS generated files
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db
# Docker
.dockerignore
# Logs
*.log
logs/
# Temporary files
tmp/
temp/
*.tmp
*.temp
# API keys and secrets
.env
config.ini
secrets.json
api_keys.txt
# Database files
*.db
*.sqlite
*.sqlite3
# Backup files
*.bak
*.backup
*.csv
*.feather

13910
IC.ipynb Normal file

File diff suppressed because one or more lines are too long

View File

@@ -1,121 +0,0 @@
# 基于BTC 4h数据的时间序列因子模型实操流程因子挖掘→检验→回测→信号生成
结合《ssrn.3255748.pdf》中时间序列TS因子模型的核心逻辑以及高维高频金融数据建模的前沿方法如投影主成分分析P-PCA以下为针对BTC 4h数据的完整实操流程包含每一步的理论原理、操作细节及论文引用依据。
## 一、数据准备与预处理(基础步骤,确保数据质量)
### 1. 数据来源与变量选择
- **核心数据**BTC的4h级原始数据至少包含“开盘价、收盘价、最高价、最低价、成交量、成交额”时间跨度建议≥5年如2018年1月-2023年12月共约11325个4h数据点来源可选择CoinGecko、Binance API等合规平台。
- **扩展变量**基于原始数据计算技术指标变量作为因子候选参考《ssrn.3255748.pdf》中“资产特征驱动因子”的逻辑具体包括
- 收益类4h收益率\(R_t = \ln(Close_t/Close_{t-1})\)、滚动12期48h收益率标准差波动率、滚动6期24h收益率偏度尾部风险
- 趋势类EMA指数移动平均如4期/8期/16期、MACD异同移动平均线、RSI相对强弱指数14期
- 量能类成交量滚动6期均值、成交额/成交量比值(量价配合度);
- 波动类ATR平均真实波幅14期、高低价差率\((High_t-Low_t)/Close_{t-1}\))。
### 2. 数据预处理(消除噪声与异常值)
- **异常值处理**采用“3σ法则”识别异常收益率如单日涨跌幅20%的4h数据用前后相邻数据的线性插值替换避免极端值对因子估计的干扰——这与中山大学研究中“抑制高频特异性波动”的思路一致摘要1、5该研究指出高频数据中的异常波动会扭曲因子估计需通过预处理降低噪声。
- **缺失值填补**:若存在数据缺失(如交易所维护导致的断更),采用“前向填充+滚动均值平滑”如用前3期均值填补确保时间序列的连续性。
- **标准化**对所有候选因子变量进行“Z-score标准化”\(X_{std}=(X-\mu)/\sigma\),其中\(\mu\)、\(\sigma\)为滚动30期120h的均值和标准差避免量纲差异影响因子权重——参考《ssrn.3255748.pdf》中CS因子“标准化匹配TS因子标准差”的操作逻辑
## 二、因子挖掘:基于时间序列逻辑构建候选因子
### 1. 因子构建原则贴合TS因子“预设规则、可解释”的核心特性
根据《ssrn.3255748.pdf》中TS因子的构建逻辑BTC 4h因子需满足“基于固定规则、反映特定风险/收益驱动逻辑”避免CS因子“月度优化、非可投资”的缺陷。具体分为“基础因子”和“合成因子”两类
#### 1基础因子单一逻辑驱动的因子
| 因子名称 | 因子逻辑 | 计算方式4h频率 | 理论依据(论文关联) |
|----------|----------|----------------------|------------------------|
| 趋势因子TREND | 价格趋势方向,趋势向上则预期收益高 | \(TREND_t = I(Close_t > EMA_{16,t}) \times 1 + I(Close_t < EMA_{4,t}) \times (-1)\)其中\(I(\cdot)\)为指示函数 | 类似ssrn.3255748.pdf特征驱动收益逻辑用EMA交叉反映趋势特征 |
| 波动率因子VOL | 波动率越高风险溢价越高 | \(VOL_t = \text{滚动12期收益率标准差}\) | 对应中山大学研究中高频波动捕捉风险的思路摘要15波动率是高频金融数据的核心风险特征 |
| 量价因子VOLP | 量价配合度成交量放大且价格上涨则动量强 | \(VOLP_t = I(Volume_t > \text{滚动6期Volume均值}) \times R_t\) | 参考《ssrn.3255748.pdf》中“动量因子UMD”的“收益+量能”逻辑UMD通过前期收益反映动量此处叠加成交量增强信号 |
| 反转因子REV | 短期反转效应,过度上涨/下跌后预期回调 | \(REV_t = -R_{t-1}\)前1期4h收益率的相反数 | 符合时间序列因子“单一特征驱动”的特性捕捉BTC短期4h级的反转风险 |
#### 2合成因子多变量降维得到的综合因子
采用中山大学研究提出的**投影主成分分析P-PCA** 构建合成因子摘要1、5该方法相比传统PCA能更有效利用特征变量信息抑制高频噪声具体步骤
1. **选择输入变量**将上述基础因子TREND、VOL、VOLP、REV及3个核心技术指标MACD差值、RSI、ATR作为P-PCA的输入矩阵\(X_{T \times K}\)T为时间维度K=7为变量维度
2. **投影步骤**根据P-PCA理论摘要1、5先将输入变量投影到“可观测特征空间”此处选择“滚动12期收益率”作为工具变量反映BTC收益的长期动态得到投影矩阵\(P\)
3. **提取主成分**:对投影后的矩阵\(P \times X\)进行PCA取前2个主成分累计方差解释率需≥80%)作为合成因子:
- 合成因子1PC1命名为“趋势-量能因子”权重集中在TREND、VOLP反映趋势与量能的协同效应
- 合成因子2PC2命名为“风险因子”权重集中在VOL、ATR反映4h级的风险暴露程度。
4. **因子方向校准**通过“因子与未来1期收益率的相关性”调整方向如PC1与\(R_{t+1}\)正相关则保留原方向负相关则取反确保因子值越高预期收益越高——贴合《ssrn.3255748.pdf》中“因子收益差为正”的TS因子设计逻辑如HML=高BM收益-低BM收益
## 三、因子检验:验证因子的有效性与稳健性
### 1. 因子收益检验(核心:因子能否区分未来收益)
参考《ssrn.3255748.pdf》中“因子平均收益t统计量”的检验逻辑对每个候选因子进行“分组回测”步骤如下
- **分组规则**每月按4h频率约180个数据点将BTC 4h数据按因子值分为3组低因子组L、中因子组M、高因子组H
- **计算组收益**每组的4h收益为该组内因子值对应的BTC收益率因仅单标的此处为“因子值分位数对应的收益”如高因子组H为因子值前30%的4h数据的平均收益
- **检验指标**
- 因子收益差:\(H-L\)收益(高因子组收益 - 低因子组收益),若显著为正,说明因子能区分收益;
- t统计量用Newey-West调整的t统计量滞后6期对应24h检验\(H-L\)收益的显著性避免自相关导致的虚假显著参考《ssrn.3255748.pdf》中“因子平均收益t统计量”的计算方式
**示例结果要求**如趋势因子TREND的\(H-L\)收益为0.35%/4h年化约84%t统计量=2.892显著说明该因子有效。
### 2. 因子跨度回归(检验因子的边际解释力)
根据《ssrn.3255748.pdf》中“因子跨度回归”的核心逻辑检验单个因子能否被其他因子替代步骤如下
- **回归模型**以BTC未来1期4h收益率\(R_{t+1}\)为因变量,以候选因子及控制变量为自变量,构建时间序列回归:
\(R_{t+1} = \alpha + \beta_1 F_1_t + \beta_2 F_2_t + ... + \beta_k F_k_t + e_{t+1}\)
其中\(F_1,F_2,...F_k\)为候选因子,\(\alpha\)为定价误差,\(\beta_i\)为因子载荷。
- **检验标准**:若某因子的\(\beta_i\)显著不为0t统计量2且加入该因子后模型\(R^2\)提升≥5%说明该因子具有“不可替代的边际解释力”未被其他因子吸收——类似《ssrn.3255748.pdf》中TS因子“市场、规模因子边际信息显著”的结论
**示例**若加入合成因子PC1后模型\(R^2\)从0.12提升至0.18PC1的\(\beta=0.25\)t=3.12说明PC1具有独立解释力。
### 3. 稳健性检验(排除偶然因素)
- **样本外检验**将数据分为“训练集2018-2021年”和“样本外集2022-2023年若因子在样本外的\(H-L\)收益t统计量仍1.8接近显著说明因子稳健——参考《ssrn.3255748.pdf》中“跨样本验证因子表现”的逻辑
- **频率敏感性检验**将4h频率调整为2h或8h若因子收益差的显著性变化≤20%说明因子不受频率小幅变动影响——符合中山大学研究中“高频因子需跨频率稳健”的要求摘要1、5
## 四、因子组合构建多因子模型贴合TS模型“常数载荷、可投资”特性
### 1. 因子权重确定避免CS模型“月度优化”的复杂性
根据《ssrn.3255748.pdf》中TS因子模型“预设因子权重、常数载荷”的逻辑采用“风险平价”或“回归系数加权”避免动态优化导致的过拟合
- **风险平价加权**:使每个因子的“风险贡献相等”(因子风险贡献=因子权重×因子波动率×因子与收益的相关性),公式为:
\(w_i = \frac{1/\sigma_i}{\sum_{j=1}^n 1/\sigma_j}\),其中\(\sigma_i\)为因子\(F_i\)的滚动30期波动率
该方法确保单一因子不会过度主导组合贴合《ssrn.3255748.pdf》中“多因子分散风险”的思路如FF五因子模型的等权重逻辑
- **回归系数加权**:用训练集的时间序列回归系数作为权重(如因子\(F_i\)的权重\(w_i = \beta_i / \sum_{j=1}^n |\beta_j|\)\(\beta_i\)为\(R_{t+1}\)对\(F_i\)的回归系数),确保权重与因子解释力正相关。
### 2. 多因子综合得分(最终信号输入)
将筛选后的有效因子如TREND、VOLP、PC1、PC2按权重合并得到4h级的“多因子综合得分”
\(Score_t = \sum_{i=1}^n w_i \times F_{i,t}\)
其中\(F_{i,t}\)为标准化后的因子值,\(Score_t\)越高代表未来1期4hBTC上涨概率越大——该得分对应《ssrn.3255748.pdf》中“因子组合预测收益”的逻辑通过多因子协同提升预测准确性。
## 五、回测:验证因子模型的实战有效性
### 1. 回测框架设计贴合TS因子“可投资”的核心优势
参考《ssrn.3255748.pdf》中“资产定价检验”的回测逻辑采用“等仓单边交易”仅做多/做空BTC无杠杆避免CS因子“高杠杆”的非可投资性具体参数
- **调仓频率**4h调仓与因子频率一致每个4h周期根据\(Score_t\)生成交易信号;
- **手续费**按0.1%/次现货交易手续费参考Binance等平台滑点按0.05%/次4h级BTC流动性充足滑点较低
- **回测区间**2019年1月-2023年12月共约4680个4h数据点包含牛熊周期检验模型适应性
### 2. 交易信号规则(基于多因子得分的阈值策略)
根据《ssrn.3255748.pdf》中“因子值与收益正相关”的结论设定阈值生成买卖信号
- **买入信号**:当\(Score_t > 0.8\sigma_{Score}\)\(\sigma_{Score}\)为Score的滚动30期标准差且前1期无持仓时买入BTC满仓
- **卖出信号**:当\(Score_t < -0.8\sigma_{Score}\)且前1期有持仓时卖出BTC空仓
- **观望信号**\(Score_t\)\([-0.8\sigma_{Score}, 0.8\sigma_{Score}]\)之间维持原有持仓避免频繁交易)。
### 3. 回测指标与评估(参考论文中的资产定价检验指标)
采用ssrn.3255748.pdf平均收益夏普比率最大回撤等核心指标同时加入高频数据特有的胜率盈亏比”,具体如下
| 回测指标 | 计算方式 | 合格标准BTC 4h策略 |
|----------|----------|--------------------------|
| 年化收益率 | \((1+\text{累计收益})^{252×6/24} - 1\)假设年交易252天每天6个4h周期 | 30%跑赢BTC现货年化收益 |
| 夏普比率 | 年化收益率 / 年化波动率 | 1.5风险调整收益优秀 |
| 最大回撤 | 回测期间最大亏损幅度 | 50%控制极端风险 |
| 胜率 | 盈利交易次数 / 总交易次数 | 55%信号准确性高 |
**示例结果**若回测得到年化收益45%、夏普比率1.8最大回撤42%、胜率58%说明模型有效——类似ssrn.3255748.pdfTS因子模型解释力达标的实证结论()。
## 六、信号优化与上线:动态适应市场变化
### 1. 因子载荷动态调整(参考“时变载荷”的改进思路)
虽然ssrn.3255748.pdf中TS模型默认常数载荷”,但中山大学研究指出因子载荷时变能提升预测准确性”(摘要15因此可引入滚动窗口调整权重”:
- 每30天180个4h周期重新估计因子权重如风险平价权重的波动率用最新30期数据避免因子失效如BTC在牛熊周期中波动率因子的重要性会变化
- 若某因子连续60个4h周期10天\(H-L\)收益t统计量1.0暂时剔除该因子待其恢复显著性后重新加入——贴合ssrn.3255748.pdf因子边际信息动态检验的逻辑()。
### 2. 实盘上线与监控
- **信号输出**每4h生成Score_t及对应买卖信号通过API对接交易所如Binance Spot API实现自动交易
- **风险监控**实时监控因子有效性指标”(如当前因子的\(H-L\)收益t统计量模型\(R^2\)若指标连续3天不达标如t统计量1.2暂停自动交易人工排查原因如市场结构变化导致因子失效
- **日志记录**保存每4h的因子值信号交易结果每月进行回测复盘对比实盘与回测的差异优化因子参数如调整EMA周期阈值系数)。
## 七、关键论文引用与理论支撑总结
1. ssrn.3255748.pdf》(Fama & French 2018核心支撑TS因子预设规则可投资常数斜率回归检验的逻辑指导因子构建检验回测的整体框架(、、);
2. 中山大学高维高频金融数据的因子建模》(摘要15提供P-PCA合成因子高频噪声处理时变载荷调整的方法解决BTC 4h高频数据的因子估计问题
3. 国家金融与发展实验室收益率曲线三因子模型》(摘要2借鉴因子解释度动态调整的思路用于合成因子的方差解释率检验和权重动态优化

View File

@@ -1,180 +0,0 @@
"""
回测模块
"""
import numpy as np
import pandas as pd
from typing import Dict, Optional, Tuple
class BacktestEngine:
"""回测引擎"""
def __init__(
self,
commission: float = 0.001, # 手续费率
slippage: float = 0.0005, # 滑点
initial_capital: float = 10000.0
):
self.commission = commission
self.slippage = slippage
self.initial_capital = initial_capital
def run(
self,
signals: pd.Series,
price: pd.Series,
score: Optional[pd.Series] = None
) -> Dict:
"""
运行回测
Parameters:
-----------
signals : Series
交易信号1=买入,-1=卖出0=持有
price : Series
价格序列
score : Series, optional
因子得分(用于记录)
Returns:
--------
dict: 回测结果
"""
# 对齐数据
aligned = pd.concat([signals, price], axis=1).dropna()
aligned.columns = ['signal', 'price']
if score is not None:
aligned = pd.concat([aligned, score], axis=1)
aligned.columns = ['signal', 'price', 'score']
# 向量化优化:先计算价格变化率
price_pct = aligned['price'].pct_change().fillna(0)
# 初始化
capital = self.initial_capital
position = 0 # 持仓0=空仓1=满仓
equity = np.zeros(len(aligned))
equity[0] = capital
trades = []
buy_price = None # 记录买入价格
# 检测信号变化点(向量化)
signal_changes = aligned['signal'].diff().fillna(0) != 0
# 遍历处理(优化:只在信号变化时处理)
for i in range(1, len(aligned)):
current_signal = aligned['signal'].iloc[i]
current_price = aligned['price'].iloc[i]
prev_signal = aligned['signal'].iloc[i-1]
# 计算收益率(基于价格变化)
if position == 1:
period_return = price_pct.iloc[i]
else:
period_return = 0
# 交易逻辑(只在信号变化时处理)
if signal_changes.iloc[i]:
if current_signal == 1 and position == 0: # 买入
# 扣除手续费和滑点
cost = self.commission + self.slippage
capital *= (1 - cost)
position = 1
buy_price = current_price
trades.append({
'date': aligned.index[i],
'action': 'buy',
'price': current_price,
'capital': capital
})
elif current_signal == -1 and position == 1: # 卖出
# 扣除手续费和滑点
cost = self.commission + self.slippage
capital *= (1 - cost)
position = 0
buy_price = None
trades.append({
'date': aligned.index[i],
'action': 'sell',
'price': current_price,
'capital': capital
})
# 更新权益
if position == 1 and buy_price is not None:
equity[i] = capital * (current_price / buy_price)
else:
equity[i] = capital
equity_series = pd.Series(equity, index=aligned.index)
returns_series = price_pct * (aligned['signal'].shift(1) == 1).astype(int)
# 计算回测指标
metrics = self._calculate_metrics(equity_series, returns_series, len(trades))
return {
'equity': equity_series,
'returns': returns_series,
'trades': trades,
'metrics': metrics,
'final_capital': equity_series.iloc[-1] if len(equity_series) > 0 else self.initial_capital
}
def _calculate_metrics(
self,
equity: pd.Series,
returns: pd.Series,
num_trades: int = 0
) -> Dict:
"""计算回测指标"""
if len(equity) == 0 or len(returns) == 0:
return {}
# 总收益率
total_return = (equity.iloc[-1] / equity.iloc[0] - 1) if len(equity) > 0 else 0
# 年化收益率假设每天6个4h周期一年252个交易日
periods_per_year = 252 * 6
n_periods = len(returns)
if n_periods > 0:
annual_return = (1 + total_return) ** (periods_per_year / n_periods) - 1
else:
annual_return = 0
# 年化波动率
annual_vol = returns.std() * np.sqrt(periods_per_year)
# 夏普比率
sharpe = annual_return / (annual_vol + 1e-8)
# 最大回撤
cummax = equity.cummax()
drawdown = (equity - cummax) / cummax
max_drawdown = drawdown.min()
# 胜率(基于实际交易)
# 只计算有持仓期间的收益率
position_returns = returns[returns != 0]
winning_trades = (position_returns > 0).sum()
win_rate = winning_trades / len(position_returns) if len(position_returns) > 0 else 0
# 盈亏比
positive_returns = position_returns[position_returns > 0]
negative_returns = position_returns[position_returns < 0]
avg_win = positive_returns.mean() if len(positive_returns) > 0 else 0
avg_loss = abs(negative_returns.mean()) if len(negative_returns) > 0 else 0
profit_loss_ratio = avg_win / (avg_loss + 1e-8)
return {
'total_return': total_return,
'annual_return': annual_return,
'annual_volatility': annual_vol,
'sharpe_ratio': sharpe,
'max_drawdown': max_drawdown,
'win_rate': win_rate,
'profit_loss_ratio': profit_loss_ratio,
'total_trades': num_trades # 实际交易次数
}

View File

@@ -1,436 +0,0 @@
import argparse
import math
import operator
import random
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
from deap import algorithms, base, creator, gp, tools
# ------------------------------
# Data & Config
# ------------------------------
@dataclass
class EvolutionConfig:
population_size: int = 200
generations: int = 30
tournament_size: int = 5
crossover_prob: float = 0.9
mutation_prob: float = 0.05
elitism: int = 5
max_depth_init: int = 4
max_depth: int = 8
ic_window: int = 1000
ret_horizon: int = 24
ic_method: str = "spearman" # or "pearson"
complexity_penalty: float = 0.001
seed: Optional[int] = 42
# ------------------------------
# Safe operators for GP
# ------------------------------
def _safe_div(left: np.ndarray, right: np.ndarray) -> np.ndarray:
denom = np.where(np.abs(right) < 1e-12, np.nan, right)
return left / denom
def _safe_log(x: np.ndarray) -> np.ndarray:
return np.log(np.clip(np.abs(x), 1e-12, None))
def _safe_sqrt(x: np.ndarray) -> np.ndarray:
return np.sqrt(np.clip(x, 0.0, None))
def _safe_pow(x: np.ndarray, y: np.ndarray) -> np.ndarray:
# Limit exponent to avoid overflow
y_clip = np.clip(y, -3.0, 3.0)
with np.errstate(over="ignore", invalid="ignore"):
out = np.power(np.clip(x, -1e6, 1e6), y_clip)
out[~np.isfinite(out)] = np.nan
return out
def _rolling_mean(x: np.ndarray, window: int) -> np.ndarray:
s = pd.Series(x)
return s.rolling(window, min_periods=max(2, window // 2)).mean().to_numpy()
def _rolling_std(x: np.ndarray, window: int) -> np.ndarray:
s = pd.Series(x)
return s.rolling(window, min_periods=max(2, window // 2)).std().to_numpy()
def _ts_delta(x: np.ndarray, period: int) -> np.ndarray:
s = pd.Series(x)
return s.diff(period).to_numpy()
def _ts_rank(x: np.ndarray, window: int) -> np.ndarray:
s = pd.Series(x)
return s.rolling(window, min_periods=max(2, window // 2)).apply(
lambda a: pd.Series(a).rank(pct=True).iloc[-1], raw=False
).to_numpy()
def _delay(x: np.ndarray, period: int) -> np.ndarray:
s = pd.Series(x)
return s.shift(period).to_numpy()
# ------------------------------
# Primitive set
# ------------------------------
def build_pset(feature_names: List[str]) -> gp.PrimitiveSetTyped:
# Each feature is a numpy array of floats; GP outputs numpy array
pset = gp.PrimitiveSetTyped("MAIN", [np.ndarray for _ in feature_names], np.ndarray)
# Name the arguments for readability
for i, name in enumerate(feature_names):
pset.renameArguments(**{f"ARG{i}": name})
# Binary arithmetic
pset.addPrimitive(lambda x, y: x + y, [np.ndarray, np.ndarray], np.ndarray, name="add")
pset.addPrimitive(lambda x, y: x - y, [np.ndarray, np.ndarray], np.ndarray, name="sub")
pset.addPrimitive(lambda x, y: x * y, [np.ndarray, np.ndarray], np.ndarray, name="mul")
pset.addPrimitive(_safe_div, [np.ndarray, np.ndarray], np.ndarray, name="div")
# Unary transforms
pset.addPrimitive(np.negative, [np.ndarray], np.ndarray, name="neg")
pset.addPrimitive(np.abs, [np.ndarray], np.ndarray, name="abs")
pset.addPrimitive(_safe_log, [np.ndarray], np.ndarray, name="log")
pset.addPrimitive(_safe_sqrt, [np.ndarray], np.ndarray, name="sqrt")
# Power
pset.addPrimitive(_safe_pow, [np.ndarray, np.ndarray], np.ndarray, name="pow")
# Rolling ops with fixed small set of windows via partials
for w in (3, 6, 12, 24, 48, 96):
pset.addPrimitive(lambda x, w=w: _rolling_mean(x, w), [np.ndarray], np.ndarray, name=f"sma{w}")
pset.addPrimitive(lambda x, w=w: _rolling_std(x, w), [np.ndarray], np.ndarray, name=f"std{w}")
pset.addPrimitive(lambda x, w=w: _ts_rank(x, w), [np.ndarray], np.ndarray, name=f"rank{w}")
pset.addPrimitive(lambda x, w=w: _ts_delta(x, w), [np.ndarray], np.ndarray, name=f"delta{w}")
pset.addPrimitive(lambda x, w=w: _delay(x, w), [np.ndarray], np.ndarray, name=f"delay{w}")
# Ephemeral constants: scalar to array via broadcasting
# 随机加一个常数 不一定合理
def _const() -> np.ndarray:
return np.array(random.uniform(-2.0, 2.0))
pset.addEphemeralConstant("const", _const, np.ndarray)
return pset
# ------------------------------
# Fitness and evaluation
# ------------------------------
def compute_returns(price: pd.Series, horizon: int) -> pd.Series:
return price.pct_change(horizon).shift(-horizon)
def rank_ic(a: pd.Series, b: pd.Series, method: str = "spearman") -> float:
mask = a.notna() & b.notna()
if mask.sum() < 10:
return np.nan
x = a[mask]
y = b[mask]
if method == "spearman":
return x.rank(pct=True).corr(y.rank(pct=True))
return x.corr(y)
def series_zscore(x: pd.Series) -> pd.Series:
return (x - x.mean()) / (x.std(ddof=0) + 1e-12)
def evaluate_individual(
individual,
toolbox: base.Toolbox,
features: List[pd.Series],
target: pd.Series,
config: EvolutionConfig,
) -> Tuple[float]:
func = toolbox.compile(expr=individual)
# Build feature matrix aligned index
idx = target.index
inputs = [f.reindex(idx).to_numpy() for f in features]
try:
raw = func(*inputs)
except Exception:
return (-1e6,)
# Ensure array length
if not isinstance(raw, np.ndarray):
return (-1e6,)
if raw.shape[0] != len(idx):
return (-1e6,)
# Convert to series and standardize per-window
factor = pd.Series(raw, index=idx)
factor = factor.replace([np.inf, -np.inf], np.nan)
factor = factor.ffill().bfill()
# Rolling IC over window segments
window = config.ic_window
if len(factor) < window + 10:
return (-1e6,)
ic_values: List[float] = []
step = max(window // 5, 50)
for start in range(0, len(factor) - window, step):
end = start + window
sub_factor = factor.iloc[start:end]
sub_target = target.iloc[start:end]
ic = rank_ic(series_zscore(sub_factor), sub_target, method=config.ic_method)
if np.isfinite(ic):
ic_values.append(ic)
if not ic_values:
return (-1e6,)
mean_ic = float(np.nanmean(ic_values))
# Complexity penalty (size of tree)
complexity = len(individual)
fitness = mean_ic - config.complexity_penalty * complexity
if not np.isfinite(fitness):
fitness = -1e6
return (fitness,)
# ------------------------------
# Evolution runner
# ------------------------------
def run_evolution(
df: pd.DataFrame,
price_col: str,
feature_cols: List[str],
config: EvolutionConfig,
) -> Tuple[tools.HallOfFame, base.Toolbox, gp.PrimitiveSetTyped, List[pd.Series]]:
if config.seed is not None:
random.seed(config.seed)
np.random.seed(config.seed)
price = df[price_col].astype(float)
forward_ret = compute_returns(price, config.ret_horizon)
target = forward_ret
features = [df[c].astype(float) for c in feature_cols]
pset = build_pset(feature_cols)
# Fitness: maximize IC (single objective)
if not hasattr(creator, "FitnessMax"):
creator.create("FitnessMax", base.Fitness, weights=(1.0,))
if not hasattr(creator, "Individual"):
creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMax)
toolbox = base.Toolbox()
toolbox.register("expr",
gp.genHalfAndHalf,
pset=pset,
min_=1,
max_=config.max_depth_init)
toolbox.register("individual", tools.initIterate, creator.Individual, toolbox.expr)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("compile", gp.compile, pset=pset)
toolbox.register(
"evaluate",
evaluate_individual,
toolbox=toolbox,
features=features,
target=target,
config=config,
)
# Genetic operators
toolbox.register("select", tools.selTournament, tournsize=config.tournament_size)
toolbox.register("mate", gp.cxOnePoint)
toolbox.register("expr_mut", gp.genFull, min_=0, max_=2)
toolbox.register("mutate", gp.mutUniform, expr=toolbox.expr_mut, pset=pset)
# bloat control
toolbox.decorate("mate", gp.staticLimit(key=operator.attrgetter("height"), max_value=config.max_depth))
toolbox.decorate("mutate", gp.staticLimit(key=operator.attrgetter("height"), max_value=config.max_depth))
pop = toolbox.population(n=config.population_size)
hof = tools.HallOfFame(maxsize=max(5, config.elitism))
stats_fit = tools.Statistics(lambda ind: ind.fitness.values[0])
stats_size = tools.Statistics(len)
mstats = tools.MultiStatistics(fitness=stats_fit, size=stats_size)
mstats.register("avg", np.nanmean)
mstats.register("std", np.nanstd)
mstats.register("min", np.nanmin)
mstats.register("max", np.nanmax)
pop, logbook = algorithms.eaSimple(
pop,
toolbox,
cxpb=config.crossover_prob,
mutpb=config.mutation_prob,
ngen=config.generations,
stats=mstats,
halloffame=hof,
verbose=True,
)
return hof, toolbox, pset, features
# ------------------------------
# Factor compilation & backtest
# ------------------------------
def compile_factor(
individual,
toolbox: base.Toolbox,
index: pd.Index,
features: List[pd.Series],
) -> pd.Series:
func = toolbox.compile(expr=individual)
inputs = [f.reindex(index).to_numpy() for f in features]
raw = func(*inputs)
s = pd.Series(raw, index=index)
s = s.replace([np.inf, -np.inf], np.nan).ffill().bfill()
return s
def simple_long_short_backtest(
factor: pd.Series,
price: pd.Series,
ret_horizon: int,
top_quantile: float = 0.2,
bottom_quantile: float = 0.2,
) -> pd.Series:
f = factor.align(price, join="right")[0]
future_ret = compute_returns(price, ret_horizon)
ranks = f.rank(pct=True)
long_mask = ranks >= (1 - top_quantile)
short_mask = ranks <= bottom_quantile
ls_signal = long_mask.astype(float) - short_mask.astype(float)
ls_signal = ls_signal.shift(1) # trade on next bar
pnl = ls_signal * future_ret
pnl = pnl.replace([np.inf, -np.inf], np.nan).fillna(0.0)
equity = (1.0 + pnl).cumprod()
return equity
# ------------------------------
# CLI
# ------------------------------
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description="DEAP-based factor mining (genetic programming)")
p.add_argument("--data", type=str, default="ETH_USDT-1h.feather", help="Input feather/csv file")
p.add_argument("--price_col", type=str, default="close", help="Price column name")
p.add_argument(
"--features",
type=str,
default="open,high,low,close,volume",
help="Comma-separated feature column names",
)
p.add_argument("--ret_horizon", type=int, default=24)
p.add_argument("--population", type=int, default=200)
p.add_argument("--generations", type=int, default=30)
p.add_argument("--ic_window", type=int, default=1000)
p.add_argument("--seed", type=int, default=42)
p.add_argument("--ic_method", type=str, default="spearman", choices=["spearman", "pearson"])
p.add_argument("--complexity_penalty", type=float, default=0.001)
p.add_argument("--save_best", type=str, default="best_factors.txt")
return p.parse_args()
def load_dataframe(path: str) -> pd.DataFrame:
if path.endswith(".feather"):
df = pd.read_feather(path)
elif path.endswith(".csv"):
df = pd.read_csv(path)
else:
raise ValueError("Unsupported file format. Use .feather or .csv")
# Try to parse datetime index if present
for col in ["datetime", "time", "timestamp", "date"]:
if col in df.columns:
df[col] = pd.to_datetime(df[col])
df = df.set_index(col).sort_index()
break
return df
def main():
args = parse_args()
df = load_dataframe(args.data)
df = df.head(1000)
feature_cols = [c.strip() for c in args.features.split(",") if c.strip()]
for c in [args.price_col] + feature_cols:
if c not in df.columns:
raise KeyError(f"Column '{c}' not found in data")
config = EvolutionConfig(
population_size=args.population,
generations=args.generations,
ic_window=args.ic_window,
ret_horizon=args.ret_horizon,
ic_method=args.ic_method,
complexity_penalty=args.complexity_penalty,
seed=args.seed,
)
hof, toolbox, pset, features = run_evolution(df, args.price_col, feature_cols, config)
price = df[args.price_col].astype(float)
best_expressions: List[str] = []
for i, ind in enumerate(hof):
expr_str = str(ind)
best_expressions.append(expr_str)
# Save best expressions
with open(args.save_best, "w", encoding="utf-8") as f:
for expr in best_expressions:
f.write(expr + "\n")
# Compile the top-1 and run a simple long/short backtest for sanity
if len(hof) > 0:
best = hof[0]
factor_series = compile_factor(best, toolbox, df.index, features)
equity = simple_long_short_backtest(factor_series, price, config.ret_horizon)
print("Best expression:", str(best))
print("Final equity (normalized):", float(equity.iloc[-1]))
# Also export factor and equity
out = pd.DataFrame({
"factor": factor_series,
"equity": equity,
})
out.to_csv("deap_factor_output.csv")
print("Saved best expressions to", args.save_best)
print("Saved factor/equity to deap_factor_output.csv")
if __name__ == "__main__":
main()

View File

@@ -1,110 +0,0 @@
"""
使用示例:时间序列因子挖掘流程
"""
from pipeline import FactorPipeline
from factors import FactorMiner, create_default_factors
# 方式1使用默认流程最简单
def example_simple():
"""简单示例"""
pipeline = FactorPipeline(
ret_horizon=1, # 未来1期收益率
ic_window=30, # IC计算窗口
commission=0.001, # 手续费0.1%
slippage=0.0005 # 滑点0.05%
)
# 运行完整流程
results = pipeline.run_full_pipeline(
file_path="ETH_USDT-1h.feather",
min_ic=0.01, # 最小IC阈值
min_tstat=1.5, # 最小t统计量
weight_method='risk_parity', # 权重方法risk_parity, regression, equal
buy_threshold=0.8, # 买入阈值(标准差倍数)
sell_threshold=-0.8 # 卖出阈值(标准差倍数)
)
return results
# 方式2分步骤执行更灵活
def example_step_by_step():
"""分步骤示例"""
pipeline = FactorPipeline(ret_horizon=1, ic_window=30)
# 步骤1加载和预处理数据
pipeline.load_and_preprocess("ETH_USDT-1h.feather")
# 步骤2因子挖掘可以使用自定义因子
custom_miner = create_default_factors()
# 可以在这里添加自定义因子
# custom_miner.register_rule_factor('CUSTOM', your_custom_function)
pipeline.mine_factors(custom_miner)
# 步骤3因子检验
pipeline.validate_factors(min_ic=0.01, min_tstat=1.5)
# 步骤4因子组合
pipeline.combine_factors(weight_method='risk_parity')
# 步骤5生成信号
signals = pipeline.generate_signals(buy_threshold=0.8, sell_threshold=-0.8)
# 步骤6回测
backtest_results = pipeline.backtest(signals)
return {
'factors': pipeline.factors,
'score': pipeline.score,
'signals': signals,
'backtest': backtest_results
}
# 方式3自定义因子
def example_custom_factors():
"""自定义因子示例"""
from factors import RuleFactor
import pandas as pd
import numpy as np
# 定义自定义因子函数
def my_custom_factor(data: pd.DataFrame) -> pd.Series:
"""自定义因子:价格与均线的距离"""
return (data['close'] - data['ema8']) / data['ema8']
# 创建因子挖掘器
miner = create_default_factors()
# 注册自定义因子
miner.register_rule_factor('CUSTOM_DISTANCE', my_custom_factor)
# 使用自定义因子挖掘器
pipeline = FactorPipeline()
pipeline.load_and_preprocess("ETH_USDT-1h.feather")
pipeline.mine_factors(custom_miner=miner)
pipeline.validate_factors()
pipeline.combine_factors()
pipeline.backtest()
return pipeline
if __name__ == "__main__":
# 运行简单示例
print("运行简单示例...")
results = example_simple()
# 保存结果
if results['factors'] is not None:
results['factors'].to_csv("factors_output.csv")
print("\n因子数据已保存到 factors_output.csv")
if results['score'] is not None:
results['score'].to_csv("score_output.csv")
print("综合得分已保存到 score_output.csv")
if results['backtest'] is not None and 'equity' in results['backtest']:
results['backtest']['equity'].to_csv("equity_curve.csv")
print("权益曲线已保存到 equity_curve.csv")

View File

@@ -0,0 +1,102 @@
import numpy as np
import pandas as pd
from typing import Dict, Callable, List, Optional, Any
from abc import ABC, abstractmethod
import inspect
import talib
from factor_mining.time_series_op import register_time_series_operator
from factor_mining.operators import _registry
# ==================== 因子公式解析与计算 ====================
class FactorFormula:
"""因子公式:支持序列化和反序列化"""
def __init__(self, expression: str, feature_names: List[str]):
"""
Parameters:
-----------
expression : str
因子表达式(使用算子名称)
feature_names : List[str]
特征名称列表
"""
self.expression = expression
self.feature_names = feature_names
def compute(self, features: Dict[str, np.ndarray]) -> np.ndarray:
"""
计算因子值
Parameters:
-----------
features : Dict[str, np.ndarray]
特征字典key为特征名称
Returns:
--------
np.ndarray: 因子值
"""
# 构建计算环境
env = {}
# 添加特征
for name in self.feature_names:
if name not in features:
raise KeyError(f"特征 '{name}' 不存在")
env[name] = features[name]
# 添加算子
for op_name in _registry.list_all():
op = _registry.get(op_name)
if op:
env[op_name] = op.func
# 添加numpy和pandas用于某些表达式
env["np"] = np
env["pd"] = pd
# 执行表达式
try:
# 限制可用的内置函数
safe_builtins = {
"abs": abs,
"min": min,
"max": max,
"sum": sum,
"len": len,
}
result = eval(self.expression, {"__builtins__": safe_builtins}, env)
# 确保结果是numpy数组
if not isinstance(result, np.ndarray):
if isinstance(result, (int, float)):
# 标量转换为数组(广播)
result = np.full(len(features[self.feature_names[0]]), result)
else:
result = np.array(result)
# 确保长度一致
expected_len = len(features[self.feature_names[0]])
if len(result) != expected_len:
raise ValueError(
f"表达式结果长度 {len(result)} 与特征长度 {expected_len} 不匹配"
)
return result
except Exception as e:
raise RuntimeError(f"计算因子表达式失败: {e}\n表达式: {self.expression}")
def to_dict(self) -> Dict:
"""序列化为字典"""
return {"expression": self.expression, "feature_names": self.feature_names}
@classmethod
def from_dict(cls, data: Dict) -> "FactorFormula":
"""从字典反序列化"""
return cls(data["expression"], data["feature_names"])
def __repr__(self):
return f"FactorFormula(expression='{self.expression}', features={self.feature_names})"

243
factor_mining/gp_miner.py Normal file
View File

@@ -0,0 +1,243 @@
"""
DEAP遗传编程挖掘器实现
"""
import random
import operator
from typing import List, Tuple, Optional
from dataclasses import dataclass
import numpy as np
import pandas as pd
from deap import algorithms, base, creator, gp, tools
from factor_mining.operators import FactorFormula, get_registry, get_operator
from factor_mining.mining import FactorMiner, MiningConfig
from data import compute_forward_returns
@dataclass
class GPConfig(MiningConfig):
"""GP挖掘配置"""
population_size: int = 200
generations: int = 30
tournament_size: int = 5
crossover_prob: float = 0.9
mutation_prob: float = 0.05
elitism: int = 5
max_depth_init: int = 1
max_depth: int = 8
complexity_penalty: float = 0.001
class GPMiner(FactorMiner):
"""DEAP遗传编程挖掘器"""
def __init__(self, config: GPConfig):
super().__init__(config)
self.config: GPConfig = config
self.toolbox: Optional[base.Toolbox] = None
self.pset: Optional[gp.PrimitiveSetTyped] = None
self.features: Optional[List[pd.Series]] = None
def get_name(self) -> str:
return "gp"
def _build_pset(self, feature_names: List[str]) -> gp.PrimitiveSetTyped:
"""构建GP原始集合"""
registry = get_registry()
pset = gp.PrimitiveSetTyped(
"MAIN", [np.ndarray for _ in feature_names], np.ndarray
)
# 命名参数
for i, name in enumerate(feature_names):
pset.renameArguments(**{f"ARG{i}": name})
# 添加算子
for op_name in registry.list_all():
op = registry.get(op_name)
if op:
sig = op.get_signature()
params = list(sig.parameters.values())
# 根据参数数量判断是一元还是二元算子
if len(params) == 1:
# 一元算子
pset.addPrimitive(op.func, [np.ndarray], np.ndarray, name=op_name)
elif len(params) == 2:
# 二元算子
pset.addPrimitive(
op.func, [np.ndarray, np.ndarray], np.ndarray, name=op_name
)
# 添加常量
# def _const() -> np.ndarray:
# return np.array(random.uniform(-2.0, 2.0))
# pset.addEphemeralConstant("const", _const, np.ndarray)
return pset
def _evaluate_individual(self, individual, target: pd.Series) -> Tuple[float]:
"""评估个体适应度"""
func = self.toolbox.compile(expr=individual)
# 构建特征矩阵
idx = target.index
inputs = [f.reindex(idx).to_numpy() for f in self.features]
try:
raw = func(*inputs)
except Exception:
return (-1e6,)
# 确保数组长度
if not isinstance(raw, np.ndarray):
return (-1e6,)
if raw.shape[0] != len(idx):
return (-1e6,)
# 转换为Series并清理
factor = pd.Series(raw, index=idx)
factor = factor.replace([np.inf, -np.inf], np.nan)
factor = factor.ffill().bfill()
# 计算滚动IC
window = self.config.ic_window
if len(factor) < window + 10:
return (-1e6,)
from validation import compute_rolling_ic
ic_series = compute_rolling_ic(
factor, target, window=window, method=self.config.ic_method
)
mean_ic = ic_series.mean()
if not np.isfinite(mean_ic):
return (-1e6,)
# 复杂度惩罚
complexity = len(individual)
fitness = mean_ic - self.config.complexity_penalty * complexity
if not np.isfinite(fitness):
fitness = -1e6
return (fitness,)
def _individual_to_formula(
self, individual, feature_names: List[str]
) -> FactorFormula:
"""将GP个体转换为因子公式"""
# GP表达式是PrimitiveTree转换为字符串后是函数调用形式
# 例如: "add(ARG0, ARG1)" 或 "mul(add(ARG0, ARG1), const)"
expr_str = str(individual)
# 替换ARG0, ARG1等为实际特征名
for i, name in enumerate(feature_names):
expr_str = expr_str.replace(f"ARG{i}", name)
# GP表达式已经是Python可执行的函数调用格式
# 例如: "add(close, open)" 可以直接eval
# 但需要确保所有算子都在环境中可用
return FactorFormula(expr_str, feature_names)
def mine(
self, data: pd.DataFrame, feature_cols: List[str], price_col: str = "close"
) -> List[FactorFormula]:
"""执行GP挖掘"""
if self.config.seed is not None:
random.seed(self.config.seed)
np.random.seed(self.config.seed)
# 准备数据
price = data[price_col].astype(float)
forward_ret = compute_forward_returns(price, self.config.ret_horizon)
target = forward_ret
self.features = [data[c].astype(float) for c in feature_cols]
# 构建原始集合
self.pset = self._build_pset(feature_cols)
# 创建DEAP类型
if not hasattr(creator, "FitnessMax"):
creator.create("FitnessMax", base.Fitness, weights=(1.0,))
if not hasattr(creator, "Individual"):
creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMax)
# 构建工具箱
self.toolbox = base.Toolbox()
self.toolbox.register(
"expr",
gp.genHalfAndHalf,
pset=self.pset,
min_=1,
max_=self.config.max_depth_init,
)
self.toolbox.register(
"individual", tools.initIterate, creator.Individual, self.toolbox.expr
)
self.toolbox.register(
"population", tools.initRepeat, list, self.toolbox.individual
)
self.toolbox.register("compile", gp.compile, pset=self.pset)
self.toolbox.register("evaluate", self._evaluate_individual, target=target)
# 遗传算子
self.toolbox.register(
"select", tools.selTournament, tournsize=self.config.tournament_size
)
self.toolbox.register("mate", gp.cxOnePoint)
self.toolbox.register("expr_mut", gp.genFull, min_=0, max_=2)
self.toolbox.register(
"mutate", gp.mutUniform, expr=self.toolbox.expr_mut, pset=self.pset
)
# 控制树深度
self.toolbox.decorate(
"mate",
gp.staticLimit(
key=operator.attrgetter("height"), max_value=self.config.max_depth
),
)
self.toolbox.decorate(
"mutate",
gp.staticLimit(
key=operator.attrgetter("height"), max_value=self.config.max_depth
),
)
# 运行进化
pop = self.toolbox.population(n=self.config.population_size)
hof = tools.HallOfFame(maxsize=max(5000, self.config.elitism))
stats_fit = tools.Statistics(lambda ind: ind.fitness.values[0])
stats_size = tools.Statistics(len)
mstats = tools.MultiStatistics(fitness=stats_fit, size=stats_size)
mstats.register("avg", np.nanmean)
mstats.register("std", np.nanstd)
mstats.register("min", np.nanmin)
mstats.register("max", np.nanmax)
pop, logbook = algorithms.eaSimple(
pop,
self.toolbox,
cxpb=self.config.crossover_prob,
mutpb=self.config.mutation_prob,
ngen=self.config.generations,
stats=mstats,
halloffame=hof,
verbose=True,
)
# 转换为因子公式
formulas = []
for individual in hof:
formula = self._individual_to_formula(individual, feature_cols)
formulas.append(formula)
return formulas

123
factor_mining/mining.py Normal file
View File

@@ -0,0 +1,123 @@
"""
因子挖掘抽象层支持多种挖掘算法DEAP、DL、RL等
"""
from abc import ABC, abstractmethod
from typing import List, Dict, Optional, Any
import pandas as pd
from dataclasses import dataclass
from factor_mining.FactorFormula import FactorFormula
@dataclass
class MiningConfig:
"""挖掘配置基类"""
ret_horizon: int = 1
ic_window: int = 30
ic_method: str = "spearman" # "spearman" or "pearson"
seed: Optional[int] = None
class FactorMiner(ABC):
"""因子挖掘器抽象基类"""
def __init__(self, config: MiningConfig):
self.config = config
@abstractmethod
def mine(
self,
data: pd.DataFrame,
feature_cols: List[str],
price_col: str = "close"
) -> List[FactorFormula]:
"""
挖掘因子
Parameters:
-----------
data : DataFrame
数据
feature_cols : List[str]
特征列名列表
price_col : str
价格列名
Returns:
--------
List[FactorFormula]: 挖掘出的因子公式列表
"""
pass
@abstractmethod
def get_name(self) -> str:
"""获取挖掘器名称"""
pass
class MiningPipeline:
"""挖掘流程管理器"""
def __init__(self):
self.miners: Dict[str, FactorMiner] = {}
def register_miner(self, miner: FactorMiner):
"""注册挖掘器"""
name = miner.get_name()
if name in self.miners:
raise ValueError(f"挖掘器 '{name}' 已存在")
self.miners[name] = miner
def get_miner(self, name: str) -> Optional[FactorMiner]:
"""获取挖掘器"""
return self.miners.get(name)
def list_miners(self) -> List[str]:
"""列出所有挖掘器"""
return list(self.miners.keys())
def mine(
self,
miner_name: str,
data: pd.DataFrame,
feature_cols: List[str],
price_col: str = "close"
) -> List[FactorFormula]:
"""
使用指定挖掘器进行挖掘
Parameters:
-----------
miner_name : str
挖掘器名称
data : DataFrame
数据
feature_cols : List[str]
特征列名列表
price_col : str
价格列名
Returns:
--------
List[FactorFormula]: 挖掘出的因子公式列表
"""
miner = self.get_miner(miner_name)
if miner is None:
raise ValueError(f"挖掘器 '{miner_name}' 不存在")
return miner.mine(data, feature_cols, price_col)
# 全局挖掘流程管理器
_pipeline = MiningPipeline()
def register_miner(miner: FactorMiner):
"""注册挖掘器到全局管理器"""
_pipeline.register_miner(miner)
def get_pipeline() -> MiningPipeline:
"""获取全局挖掘流程管理器"""
return _pipeline

159
factor_mining/operators.py Normal file
View File

@@ -0,0 +1,159 @@
"""
算子系统:基础数学算子和技术指标算子的注册与管理
支持算子的注册、查询、反射调用
"""
import numpy as np
import pandas as pd
from typing import Dict, Callable, List, Optional, Any
from abc import ABC, abstractmethod
import inspect
import talib
from factor_mining.time_series_op import register_time_series_operator
class Operator(ABC):
"""算子基类"""
def __init__(self, name: str, func: Callable, description: str = ""):
"""
Parameters:
-----------
name : str
算子名称(唯一标识)
func : Callable
算子函数
description : str
算子描述
"""
self.name = name
self.func = func
self.description = description
self._signature = inspect.signature(func)
def __call__(self, *args, **kwargs):
"""调用算子函数"""
return self.func(*args, **kwargs)
def get_signature(self):
"""获取函数签名"""
return self._signature
def __repr__(self):
return f"Operator(name='{self.name}', description='{self.description}')"
class OperatorRegistry:
"""算子注册表"""
def __init__(self):
self._operators: Dict[str, Operator] = {}
def register(self, operator: Operator):
"""注册算子"""
if operator.name in self._operators:
raise ValueError(f"算子 '{operator.name}' 已存在")
self._operators[operator.name] = operator
def register_function(self, name: str, func: Callable, description: str = ""):
"""直接注册函数为算子"""
operator = Operator(name, func, description)
self.register(operator)
def get(self, name: str) -> Optional[Operator]:
"""获取算子"""
return self._operators.get(name)
def has(self, name: str) -> bool:
"""检查算子是否存在"""
return name in self._operators
def list_all(self) -> List[str]:
"""列出所有算子名称"""
return list(self._operators.keys())
def get_all(self) -> Dict[str, Operator]:
"""获取所有算子"""
return self._operators.copy()
# 全局算子注册表
_registry = OperatorRegistry()
def register_operator(name: str, description: str = ""):
"""装饰器:注册算子"""
def decorator(func: Callable):
_registry.register_function(name, func, description)
return func
return decorator
def get_operator(name: str) -> Optional[Operator]:
"""获取算子"""
return _registry.get(name)
def get_registry() -> OperatorRegistry:
"""获取全局注册表"""
return _registry
# ==================== 基础数学算子 ====================
@register_operator("add", "加法: x + y")
def _add(x: np.ndarray, y: np.ndarray) -> np.ndarray:
return x + y
@register_operator("sub", "减法: x - y")
def _sub(x: np.ndarray, y: np.ndarray) -> np.ndarray:
return x - y
@register_operator("mul", "乘法: x * y")
def _mul(x: np.ndarray, y: np.ndarray) -> np.ndarray:
return x * y
@register_operator("div", "除法: x / y (安全除法)")
def _div(x: np.ndarray, y: np.ndarray) -> np.ndarray:
denom = np.where(np.abs(y) < 1e-12, np.nan, y)
return x / denom
@register_operator("neg", "取负: -x")
def _neg(x: np.ndarray) -> np.ndarray:
return np.negative(x)
@register_operator("abs", "绝对值: |x|")
def _abs(x: np.ndarray) -> np.ndarray:
return np.abs(x)
@register_operator("log", "对数: log(|x|)")
def _log(x: np.ndarray) -> np.ndarray:
return np.log(np.clip(np.abs(x), 1e-12, None))
@register_operator("sqrt", "平方根: sqrt(x)")
def _sqrt(x: np.ndarray) -> np.ndarray:
return np.sqrt(np.clip(x, 0.0, None))
@register_operator("pow", "幂运算: x^y (限制范围)")
def _pow(x: np.ndarray, y: np.ndarray) -> np.ndarray:
y_clip = np.clip(y, -3.0, 3.0)
with np.errstate(over="ignore", invalid="ignore"):
out = np.power(np.clip(x, -1e6, 1e6), y_clip)
out[~np.isfinite(out)] = np.nan
return out
# ==================== 时间序列算子 ====================
register_time_series_operator(_registry)

View File

@@ -0,0 +1,75 @@
import numpy as np
import pandas as pd
# ==================== 时间序列算子 ====================
def _rolling_mean(x: np.ndarray, window: int) -> np.ndarray:
s = pd.Series(x)
return s.rolling(window, min_periods=max(2, window // 2)).mean().to_numpy()
def _rolling_std(x: np.ndarray, window: int) -> np.ndarray:
s = pd.Series(x)
return s.rolling(window, min_periods=max(2, window // 2)).std().to_numpy()
def _ts_delta(x: np.ndarray, period: int) -> np.ndarray:
s = pd.Series(x)
return s.diff(period).to_numpy()
def _ts_rank(x: np.ndarray, window: int) -> np.ndarray:
s = pd.Series(x)
return (
s.rolling(window, min_periods=max(2, window // 2))
.apply(lambda a: pd.Series(a).rank(pct=True).iloc[-1], raw=False)
.to_numpy()
)
def _delay(x: np.ndarray, period: int) -> np.ndarray:
s = pd.Series(x)
return s.shift(period).to_numpy()
def _pct_change(x: np.ndarray, period: int = 1) -> np.ndarray:
"""百分比变化"""
s = pd.Series(x)
return s.pct_change(periods=period, fill_method=None).to_numpy()
def register_time_series_operator(registry) -> None:
"""注册算子"""
# 注册时间序列算子(带不同窗口)
for w in range(5, 50, 5):
registry.register_function(
f"sma{w}",
(lambda win: lambda x: _rolling_mean(x, win))(w),
f"简单移动平均: SMA(x, {w})",
)
registry.register_function(
f"std{w}",
(lambda win: lambda x: _rolling_std(x, win))(w),
f"滚动标准差: STD(x, {w})",
)
registry.register_function(
f"rank{w}",
(lambda win: lambda x: _ts_rank(x, win))(w),
f"滚动排名: RANK(x, {w})",
)
registry.register_function(
f"delta{w}",
(lambda win: lambda x: _ts_delta(x, win))(w),
f"差分: DELTA(x, {w})",
)
registry.register_function(
f"delay{w}",
(lambda win: lambda x: _delay(x, win))(w),
f"延迟: DELAY(x, {w})",
)
registry.register_function(
f"pct_change{w}",
(lambda win: lambda x: _pct_change(x, win))(w),
f"百分比变化: PCT_CHANGE(x, {w})",
)

237
factor_mining/validator.py Normal file
View File

@@ -0,0 +1,237 @@
"""
因子有效性检验模块:整合所有检验方案
"""
import numpy as np
import pandas as pd
from typing import Dict, List, Optional
from dataclasses import dataclass
from statsmodels.regression.linear_model import OLS
from validation import (
compute_ic,
compute_rolling_ic,
group_backtest,
factor_span_regression
)
@dataclass
class ValidationConfig:
"""验证配置"""
ic_window: int = 30
ic_method: str = "spearman" # "spearman" or "pearson"
n_groups: int = 3
group_period: int = 180
min_ic: float = 0.01
min_tstat: float = 1.5
min_r2_change: float = 0.05
class FactorValidator:
"""因子有效性检验器"""
def __init__(self, config: ValidationConfig):
self.config = config
def validate_ic(
self,
factor: pd.Series,
forward_return: pd.Series
) -> Dict:
"""
IC检验
Returns:
--------
dict: 包含mean_ic, ic_ir, ic_series等
"""
rolling_ic = compute_rolling_ic(
factor,
forward_return,
window=self.config.ic_window,
method=self.config.ic_method
)
mean_ic = rolling_ic.mean()
ic_std = rolling_ic.std()
ic_ir = mean_ic / (ic_std + 1e-8) # IC信息比率
return {
"mean_ic": mean_ic,
"ic_std": ic_std,
"ic_ir": ic_ir,
"ic_series": rolling_ic,
"is_valid": abs(mean_ic) >= self.config.min_ic
}
def validate_group_backtest(
self,
factor: pd.Series,
forward_return: pd.Series
) -> Dict:
"""
分组回测检验
Returns:
--------
dict: 包含mean_h_l_return, mean_h_l_tstat等
"""
result = group_backtest(
factor,
forward_return,
n_groups=self.config.n_groups,
group_period=self.config.group_period
)
is_valid = abs(result.get('mean_h_l_tstat', 0)) >= self.config.min_tstat
return {
**result,
"is_valid": is_valid
}
def validate_regression(
self,
factor: pd.Series,
forward_return: pd.Series,
other_factors: Optional[pd.DataFrame] = None
) -> Dict:
"""
因子跨度回归检验
Parameters:
-----------
factor : Series
待检验因子
forward_return : Series
未来收益率
other_factors : DataFrame, optional
其他因子(用于控制变量)
Returns:
--------
dict: 包含beta, tstat, r2_change等
"""
if other_factors is None:
other_factors = pd.DataFrame()
# 合并因子
factors_df = pd.concat([other_factors, factor.to_frame(name='target')], axis=1)
result = factor_span_regression(
factors_df,
forward_return,
target_factor='target'
)
is_valid = (
abs(result.get('tstat', 0)) >= self.config.min_tstat and
result.get('r2_change', 0) >= self.config.min_r2_change
)
return {
**result,
"is_valid": is_valid
}
def validate_all(
self,
factor: pd.Series,
forward_return: pd.Series,
other_factors: Optional[pd.DataFrame] = None
) -> Dict:
"""
综合检验:执行所有检验方法
Returns:
--------
dict: 包含所有检验结果和综合判断
"""
results = {}
# IC检验
ic_result = self.validate_ic(factor, forward_return)
results['ic'] = ic_result
# 分组回测
group_result = self.validate_group_backtest(factor, forward_return)
results['group_backtest'] = group_result
# 回归检验
reg_result = self.validate_regression(factor, forward_return, other_factors)
results['regression'] = reg_result
# 综合判断
is_valid = (
ic_result['is_valid'] and
group_result['is_valid'] and
reg_result['is_valid']
)
results['is_valid'] = is_valid
results['score'] = self._calculate_score(ic_result, group_result, reg_result)
return results
def _calculate_score(
self,
ic_result: Dict,
group_result: Dict,
reg_result: Dict
) -> float:
"""计算综合得分"""
score = 0.0
# IC得分权重0.3
ic_score = abs(ic_result.get('mean_ic', 0)) * 10
score += ic_score * 0.3
# 分组回测得分权重0.4
tstat = abs(group_result.get('mean_h_l_tstat', 0))
tstat_score = min(tstat / 3.0, 1.0) # 归一化到[0, 1]
score += tstat_score * 0.4
# 回归得分权重0.3
r2_change = reg_result.get('r2_change', 0)
r2_score = min(r2_change / 0.1, 1.0) # 归一化到[0, 1]
score += r2_score * 0.3
return score
def filter_factors(
self,
factors: pd.DataFrame,
forward_return: pd.Series
) -> pd.DataFrame:
"""
批量过滤因子:只保留有效因子
Returns:
--------
DataFrame: 有效因子
"""
valid_factors = []
for col in factors.columns:
factor = factors[col]
result = self.validate_all(factor, forward_return, factors.drop(columns=[col]))
if result['is_valid']:
valid_factors.append(col)
return factors[valid_factors] if valid_factors else pd.DataFrame()
def create_validator(
ic_window: int = 30,
min_ic: float = 0.01,
min_tstat: float = 1.5
) -> FactorValidator:
"""创建验证器(便捷函数)"""
config = ValidationConfig(
ic_window=ic_window,
min_ic=min_ic,
min_tstat=min_tstat
)
return FactorValidator(config)

View File

@@ -1,113 +0,0 @@
"""
因子挖掘模块:支持规则因子和遗传编程因子
"""
import numpy as np
import pandas as pd
from typing import Callable, Dict, List, Optional
from abc import ABC, abstractmethod
class BaseFactor(ABC):
"""因子基类"""
def __init__(self, name: str):
self.name = name
@abstractmethod
def compute(self, data: pd.DataFrame) -> pd.Series:
"""计算因子值"""
pass
class RuleFactor(BaseFactor):
"""规则因子:基于固定规则"""
def __init__(self, name: str, compute_func: Callable[[pd.DataFrame], pd.Series]):
super().__init__(name)
self.compute_func = compute_func
def compute(self, data: pd.DataFrame) -> pd.Series:
return self.compute_func(data)
def create_trend_factor(data: pd.DataFrame) -> pd.Series:
"""趋势因子:价格趋势方向"""
trend = pd.Series(0, index=data.index)
trend[data['close'] > data['ema16']] = 1
trend[data['close'] < data['ema4']] = -1
return trend
def create_volatility_factor(data: pd.DataFrame) -> pd.Series:
"""波动率因子滚动12期收益率标准差"""
return data['volatility']
def create_volume_price_factor(data: pd.DataFrame) -> pd.Series:
"""量价因子:成交量放大且价格上涨"""
volume_signal = (data['volume'] > data['volume_ma6']).astype(int)
return volume_signal * data['return']
def create_reversal_factor(data: pd.DataFrame) -> pd.Series:
"""反转因子:短期反转效应"""
return -data['return'].shift(1)
def create_momentum_factor(data: pd.DataFrame) -> pd.Series:
"""动量因子基于MACD"""
return data['macd']
def create_rsi_factor(data: pd.DataFrame) -> pd.Series:
"""RSI因子相对强弱指数标准化"""
return (data['rsi'] - 50) / 50 # 归一化到[-1, 1]
class FactorMiner:
"""因子挖掘器"""
def __init__(self):
self.factors: Dict[str, BaseFactor] = {}
def register_factor(self, factor: BaseFactor):
"""注册因子"""
self.factors[factor.name] = factor
def register_rule_factor(self, name: str, compute_func: Callable):
"""注册规则因子"""
factor = RuleFactor(name, compute_func)
self.register_factor(factor)
def compute_all_factors(self, data: pd.DataFrame) -> pd.DataFrame:
"""计算所有因子"""
factor_df = pd.DataFrame(index=data.index)
for name, factor in self.factors.items():
try:
factor_df[name] = factor.compute(data)
except Exception as e:
print(f"计算因子 {name} 时出错: {e}")
factor_df[name] = np.nan
return factor_df
def get_factor(self, name: str) -> Optional[BaseFactor]:
"""获取指定因子"""
return self.factors.get(name)
def create_default_factors() -> FactorMiner:
"""创建默认因子集合"""
miner = FactorMiner()
# 注册基础因子
miner.register_rule_factor('TREND', create_trend_factor)
miner.register_rule_factor('VOL', create_volatility_factor)
miner.register_rule_factor('VOLP', create_volume_price_factor)
miner.register_rule_factor('REV', create_reversal_factor)
miner.register_rule_factor('MOM', create_momentum_factor)
miner.register_rule_factor('RSI', create_rsi_factor)
return miner

View File

@@ -1,287 +0,0 @@
"""
主流程:时间序列因子挖掘、检验、回测、信号生成
"""
import pandas as pd
from typing import Dict, List, Optional
import warnings
warnings.filterwarnings('ignore')
from data import load_data, compute_technical_indicators, preprocess_data, compute_forward_returns
from factors import FactorMiner, create_default_factors
from validation import validate_factor, factor_span_regression
from combination import MultiFactorModel
from backtest import BacktestEngine
from signal import generate_signals
class FactorPipeline:
"""因子挖掘流程"""
def __init__(
self,
ret_horizon: int = 1,
ic_window: int = 30,
commission: float = 0.001,
slippage: float = 0.0005
):
"""
Parameters:
-----------
ret_horizon : int
未来收益率周期
ic_window : int
IC计算窗口
commission : float
手续费率
slippage : float
滑点
"""
self.ret_horizon = ret_horizon
self.ic_window = ic_window
self.commission = commission
self.slippage = slippage
self.data: Optional[pd.DataFrame] = None
self.factors: Optional[pd.DataFrame] = None
self.forward_return: Optional[pd.Series] = None
self.factor_miner: Optional[FactorMiner] = None
self.validation_results: Dict = {}
self.model: Optional[MultiFactorModel] = None
self.score: Optional[pd.Series] = None
self.backtest_results: Optional[Dict] = None
def load_and_preprocess(self, file_path: str) -> 'FactorPipeline':
"""步骤1加载和预处理数据"""
print("=" * 50)
print("步骤1加载和预处理数据")
print("=" * 50)
# 加载数据
self.data = load_data(file_path)
print(f"加载数据: {len(self.data)} 条记录")
# 计算技术指标
self.data = compute_technical_indicators(self.data)
print("计算技术指标完成")
# 预处理
self.data = preprocess_data(self.data)
print("数据预处理完成")
# 计算未来收益率
self.forward_return = compute_forward_returns(
self.data['close'],
horizon=self.ret_horizon
)
print(f"计算未来收益率完成(周期={self.ret_horizon}")
return self
def mine_factors(self, custom_miner: Optional[FactorMiner] = None) -> 'FactorPipeline':
"""步骤2因子挖掘"""
print("\n" + "=" * 50)
print("步骤2因子挖掘")
print("=" * 50)
if self.data is None:
raise ValueError("请先加载数据")
# 使用自定义或默认因子挖掘器
if custom_miner is None:
self.factor_miner = create_default_factors()
else:
self.factor_miner = custom_miner
# 计算所有因子
self.factors = self.factor_miner.compute_all_factors(self.data)
print(f"计算因子完成: {list(self.factors.columns)}")
return self
def validate_factors(self, min_ic: float = 0.01, min_tstat: float = 1.5) -> 'FactorPipeline':
"""步骤3因子检验"""
print("\n" + "=" * 50)
print("步骤3因子检验")
print("=" * 50)
if self.factors is None or self.forward_return is None:
raise ValueError("请先完成因子挖掘")
valid_factors = []
self.validation_results = {}
for factor_name in self.factors.columns:
factor = self.factors[factor_name]
# 综合检验
result = validate_factor(factor, self.forward_return, ic_window=self.ic_window)
self.validation_results[factor_name] = result
# 筛选有效因子
if (abs(result['mean_ic']) >= min_ic and
abs(result['mean_h_l_tstat']) >= min_tstat):
valid_factors.append(factor_name)
print(f"\n因子 {factor_name}:")
print(f" 平均IC: {result['mean_ic']:.4f}")
print(f" IC信息比率: {result['ic_ir']:.4f}")
print(f" H-L收益差: {result['mean_h_l_return']:.4f}")
print(f" H-L t统计量: {result['mean_h_l_tstat']:.4f}")
else:
print(f"\n因子 {factor_name} 未通过检验 (IC={result['mean_ic']:.4f}, t={result['mean_h_l_tstat']:.4f})")
# 只保留有效因子
if valid_factors:
self.factors = self.factors[valid_factors]
print(f"\n有效因子: {valid_factors}")
else:
print("\n警告:没有因子通过检验!")
return self
def combine_factors(
self,
weight_method: str = 'risk_parity',
window: Optional[int] = None
) -> 'FactorPipeline':
"""步骤4因子组合"""
print("\n" + "=" * 50)
print("步骤4因子组合")
print("=" * 50)
if self.factors is None or len(self.factors.columns) == 0:
raise ValueError("没有有效因子可组合")
# 创建多因子模型
self.model = MultiFactorModel(weight_method=weight_method)
self.model.fit(
self.factors,
forward_return=self.forward_return,
window=window
)
# 计算综合得分
self.score = self.model.predict(self.factors)
# 显示权重
weights = self.model.get_weights()
print("因子权重:")
for name, weight in weights.items():
print(f" {name}: {weight:.4f}")
print(f"\n综合得分统计:")
print(f" 均值: {self.score.mean():.4f}")
print(f" 标准差: {self.score.std():.4f}")
return self
def generate_signals(
self,
buy_threshold: float = 0.8,
sell_threshold: float = -0.8,
window: int = 30
) -> pd.Series:
"""步骤5生成交易信号"""
if self.score is None:
raise ValueError("请先完成因子组合")
signals = generate_signals(
self.score,
buy_threshold=buy_threshold,
sell_threshold=sell_threshold,
window=window
)
return signals
def backtest(
self,
signals: Optional[pd.Series] = None,
buy_threshold: float = 0.8,
sell_threshold: float = -0.8,
window: int = 30
) -> Dict:
"""步骤6回测"""
print("\n" + "=" * 50)
print("步骤6回测")
print("=" * 50)
if self.data is None:
raise ValueError("请先加载数据")
if signals is None:
signals = self.generate_signals(buy_threshold, sell_threshold, window)
# 创建回测引擎
engine = BacktestEngine(
commission=self.commission,
slippage=self.slippage
)
# 运行回测
self.backtest_results = engine.run(
signals,
self.data['close'],
score=self.score
)
# 显示结果
metrics = self.backtest_results['metrics']
print("\n回测结果:")
print(f" 总收益率: {metrics.get('total_return', 0)*100:.2f}%")
print(f" 年化收益率: {metrics.get('annual_return', 0)*100:.2f}%")
print(f" 年化波动率: {metrics.get('annual_volatility', 0)*100:.2f}%")
print(f" 夏普比率: {metrics.get('sharpe_ratio', 0):.2f}")
print(f" 最大回撤: {metrics.get('max_drawdown', 0)*100:.2f}%")
print(f" 胜率: {metrics.get('win_rate', 0)*100:.2f}%")
print(f" 盈亏比: {metrics.get('profit_loss_ratio', 0):.2f}")
print(f" 交易次数: {metrics.get('total_trades', 0)}")
return self.backtest_results
def run_full_pipeline(
self,
file_path: str,
custom_miner: Optional[FactorMiner] = None,
min_ic: float = 0.01,
min_tstat: float = 1.5,
weight_method: str = 'risk_parity',
buy_threshold: float = 0.8,
sell_threshold: float = -0.8
) -> Dict:
"""运行完整流程"""
self.load_and_preprocess(file_path) \
.mine_factors(custom_miner) \
.validate_factors(min_ic, min_tstat) \
.combine_factors(weight_method) \
.backtest(buy_threshold=buy_threshold, sell_threshold=sell_threshold)
return {
'factors': self.factors,
'score': self.score,
'validation': self.validation_results,
'backtest': self.backtest_results
}
if __name__ == "__main__":
# 示例使用
pipeline = FactorPipeline(ret_horizon=1, ic_window=30)
results = pipeline.run_full_pipeline(
file_path="ETH_USDT-1h.feather",
min_ic=0.01,
min_tstat=1.5,
weight_method='risk_parity',
buy_threshold=0.8,
sell_threshold=-0.8
)
# 保存结果
if results['factors'] is not None:
results['factors'].to_csv("factors.csv")
print("\n因子数据已保存到 factors.csv")
if results['score'] is not None:
results['score'].to_csv("score.csv")
print("综合得分已保存到 score.csv")

6
requirements.txt Normal file
View File

@@ -0,0 +1,6 @@
pandas>=1.3.0
numpy>=1.20.0
scipy>=1.7.0
statsmodels>=0.13.0
deap>=1.3.0

109
signal.py
View File

@@ -1,109 +0,0 @@
"""
信号生成模块
"""
import numpy as np
import pandas as pd
from typing import Optional, TYPE_CHECKING
if TYPE_CHECKING:
from pandas import Series
def generate_signals(
score: 'pd.Series',
buy_threshold: float = 0.8,
sell_threshold: float = -0.8,
window: int = 30,
use_rolling_std: bool = True
) -> 'pd.Series':
"""
基于因子得分生成买卖信号
Parameters:
-----------
score : Series
因子综合得分
buy_threshold : float
买入阈值(标准差倍数)
sell_threshold : float
卖出阈值(标准差倍数)
window : int
滚动窗口(用于计算标准差)
use_rolling_std : bool
是否使用滚动标准差
Returns:
--------
Series: 交易信号1=买入,-1=卖出0=持有)
"""
signals = pd.Series(0, index=score.index)
if use_rolling_std:
# 使用滚动标准差
rolling_std = score.rolling(window).std()
buy_line = buy_threshold * rolling_std
sell_line = sell_threshold * rolling_std
else:
# 使用固定阈值
std = score.std()
buy_line = buy_threshold * std
sell_line = sell_threshold * std
# 生成原始信号
raw_signals = pd.Series(0, index=score.index)
raw_signals[score > buy_line] = 1 # 买入信号
raw_signals[score < sell_line] = -1 # 卖出信号
# 只在信号变化时产生交易信号,其他时候保持持仓状态
signals = pd.Series(0, index=score.index)
position = 0 # 当前持仓状态0=空仓1=满仓
for i in range(len(raw_signals)):
current_signal = raw_signals.iloc[i]
# 只在信号变化时产生交易
if current_signal == 1 and position == 0:
signals.iloc[i] = 1 # 买入
position = 1
elif current_signal == -1 and position == 1:
signals.iloc[i] = -1 # 卖出
position = 0
# 其他情况保持当前持仓状态,不产生交易信号
return signals.astype(int)
def generate_signals_with_position(
score: 'pd.Series',
buy_threshold: float = 0.8,
sell_threshold: float = -0.8,
window: int = 30,
current_position: int = 0
) -> 'pd.Series':
"""
生成信号(考虑当前持仓状态)
Parameters:
-----------
current_position : int
当前持仓0=空仓1=满仓
"""
raw_signals = generate_signals(score, buy_threshold, sell_threshold, window)
signals = pd.Series(0, index=score.index)
position = current_position
for i in range(len(raw_signals)):
signal = raw_signals.iloc[i]
if signal == 1 and position == 0:
signals.iloc[i] = 1 # 买入
position = 1
elif signal == -1 and position == 1:
signals.iloc[i] = -1 # 卖出
position = 0
else:
signals.iloc[i] = 0 # 持有
return signals

58
test.py Normal file
View File

@@ -0,0 +1,58 @@
import warnings
import numpy as np
import pandas as pd
import json
# 抑制numpy的警告由于数据中包含NaN值这是正常的
warnings.filterwarnings("ignore", category=RuntimeWarning, module="numpy")
np.seterr(all="ignore") # 忽略numpy的浮点错误警告
from data import load_data
from factor_mining.gp_miner import GPMiner, GPConfig
if __name__ == "__main__":
df = load_data("/Users/aszer/Documents/vscode/factorhack/ETH_USDT-1h.feather")
# 以4小时为周期重采样K线数据假定有datetime索引常见ohlcv列
df = (
df.resample("4h")
.agg(
{
"open": "first",
"high": "max",
"low": "min",
"close": "last",
"volume": "sum",
}
)
.dropna()
)
df = df[df.index < '2023-01-01']
print("数据加载成功前5行")
print(df.head())
print(f"\n数据形状: {df.shape}")
print(f"数据列: {df.columns.tolist()}")
gp_config = GPConfig(
ret_horizon=48,
ic_window=120,
ic_method="spearman",
seed=None,
population_size=200,
generations=30,
tournament_size=5,
crossover_prob=0.9,
mutation_prob=0.05,
elitism=5,
max_depth_init=1,
max_depth=30,
complexity_penalty=0.001,
)
miner = GPMiner(config=gp_config)
res = miner.mine(df, ["open", "high", "low", "close", "volume"])
with open("gp_miner_result.txt", "w") as out_file:
for formula, ic_tuple in res:
# ic_tuple 是元组取第一个元素作为IC值
ic = ic_tuple[0] if isinstance(ic_tuple, tuple) else ic_tuple
print(f"{formula.expression}, IC: {ic:.4f}")
# 将因子公式转换为字典并写入文件
out_file.write(json.dumps(formula.to_dict(), ensure_ascii=False))
out_file.write("\n")

View File

@@ -1,62 +1,43 @@
"""
因子检验模块IC检验、分组回测、因子跨度回归
因子检验模块: IC检验、分组回测、因子跨度回归
"""
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple
from statsmodels.regression.linear_model import OLS
def compute_ic(factor: pd.Series, forward_return: pd.Series, method: str = 'spearman') -> pd.Series:
"""
计算IC信息系数
Parameters:
-----------
factor : Series
因子值
forward_return : Series
未来收益率
method : str
相关性计算方法:'spearman''pearson'
"""
aligned = pd.concat([factor, forward_return], axis=1).dropna()
if len(aligned) < 10:
return pd.Series(dtype=float)
if method == 'spearman':
ic = aligned.iloc[:, 0].rank().corr(aligned.iloc[:, 1].rank())
else:
ic = aligned.iloc[:, 0].corr(aligned.iloc[:, 1])
return pd.Series([ic], index=[aligned.index[-1]])
def compute_rolling_ic(
factor: pd.Series,
forward_return: pd.Series,
window: int = 30,
method: str = 'spearman'
method: str = "spearman",
) -> pd.Series:
"""计算滚动IC向量化优化"""
"""计算滚动IC (向量化优化)"""
# 对齐数据
aligned = pd.concat([factor, forward_return], axis=1).dropna()
if len(aligned) < window:
return pd.Series(dtype=float, index=factor.index[window:])
aligned.columns = ['factor', 'return']
aligned.columns = ["factor", "return"]
if method == 'spearman':
if method == "spearman":
# 使用rank计算Spearman相关性
factor_rank = aligned['factor'].rank()
return_rank = aligned['return'].rank()
# 使用DataFrame的rolling().corr()方法
df_rank = pd.DataFrame({'factor': factor_rank, 'return': return_rank})
ic_series = df_rank['factor'].rolling(window, min_periods=window).corr(df_rank['return'])
# 这里是全局的 rank理论上应该是按照 window 滚动排序
factor_rank = aligned["factor"].rank()
return_rank = aligned["return"].rank()
# 使用DataFrame的rolling().corr()方法, 该方法pandas优化过
df_rank = pd.DataFrame({"factor": factor_rank, "return": return_rank})
ic_series = (
df_rank["factor"]
.rolling(window, min_periods=window)
.corr(df_rank["return"])
)
else:
# Pearson相关性
df = pd.DataFrame({'factor': aligned['factor'], 'return': aligned['return']})
ic_series = df['factor'].rolling(window, min_periods=window).corr(df['return'])
df = pd.DataFrame({"factor": aligned["factor"], "return": aligned["return"]})
ic_series = df["factor"].rolling(window, min_periods=window).corr(df["return"])
return ic_series
@@ -65,7 +46,7 @@ def group_backtest(
factor: pd.Series,
forward_return: pd.Series,
n_groups: int = 3,
group_period: int = 180
group_period: int = 180,
) -> Dict:
"""
分组回测:将数据按因子值分组,计算各组收益
@@ -75,14 +56,9 @@ def group_backtest(
dict: 包含各组收益、H-L收益差、t统计量等
"""
aligned = pd.concat([factor, forward_return], axis=1).dropna()
aligned.columns = ['factor', 'return']
aligned.columns = ["factor", "return"]
results = {
'group_returns': [],
'h_l_return': [],
'h_l_tstat': [],
'periods': []
}
results = {"group_returns": [], "h_l_return": [], "h_l_tstat": [], "periods": []}
# 按月分组每180个4h周期- 使用更高效的步长
step = max(group_period // 2, 90) # 减少重叠计算
@@ -96,16 +72,13 @@ def group_backtest(
# 按因子值分组(向量化)
try:
period_data = period_data.copy()
period_data['group'] = pd.qcut(
period_data['factor'],
q=n_groups,
labels=False,
duplicates='drop'
period_data["group"] = pd.qcut(
period_data["factor"], q=n_groups, labels=False, duplicates="drop"
)
# 计算各组收益(向量化)
group_returns = period_data.groupby('group')['return'].mean()
results['group_returns'].append(group_returns)
group_returns = period_data.groupby("group")["return"].mean()
results["group_returns"].append(group_returns)
# H-L收益差
if len(group_returns) >= 2:
@@ -113,33 +86,31 @@ def group_backtest(
l_return = group_returns.iloc[0] # 低因子组
h_l_diff = h_return - l_return
results['h_l_return'].append(h_l_diff)
results['periods'].append(period_data.index[-1])
results["h_l_return"].append(h_l_diff)
results["periods"].append(period_data.index[-1])
except (ValueError, KeyError):
# qcut失败时跳过
continue
# 计算平均H-L收益和t统计量
if results['h_l_return']:
h_l_series = pd.Series(results['h_l_return'], index=results['periods'])
if results["h_l_return"]:
h_l_series = pd.Series(results["h_l_return"], index=results["periods"])
mean_h_l = h_l_series.mean()
std_h_l = h_l_series.std()
t_stat = mean_h_l / (std_h_l / np.sqrt(len(h_l_series)) + 1e-8)
results['mean_h_l_return'] = mean_h_l
results['mean_h_l_tstat'] = t_stat
results['h_l_series'] = h_l_series
results["mean_h_l_return"] = mean_h_l
results["mean_h_l_tstat"] = t_stat
results["h_l_series"] = h_l_series
else:
results['mean_h_l_return'] = 0
results['mean_h_l_tstat'] = 0
results["mean_h_l_return"] = 0
results["mean_h_l_tstat"] = 0
return results
def factor_span_regression(
factors: pd.DataFrame,
forward_return: pd.Series,
target_factor: str
factors: pd.DataFrame, forward_return: pd.Series, target_factor: str
) -> Dict:
"""
因子跨度回归:检验因子的边际解释力
@@ -160,14 +131,14 @@ def factor_span_regression(
# 对齐数据
data = pd.concat([factors, forward_return], axis=1).dropna()
if len(data) < 30:
return {'beta': 0, 'tstat': 0, 'r2': 0, 'r2_change': 0}
return {"beta": 0, "tstat": 0, "r2": 0, "r2_change": 0}
y = data.iloc[:, -1].values
X_all = data.iloc[:, :-1].values
# 全模型(包含目标因子)
try:
model_all = OLS(y, X_all).fit(cov_type='HAC', cov_kwds={'maxlags': 6})
model_all = OLS(y, X_all).fit(cov_type="HAC", cov_kwds={"maxlags": 6})
r2_all = model_all.rsquared
# 目标因子的系数和t统计量
@@ -177,28 +148,25 @@ def factor_span_regression(
# 不含目标因子的模型
X_without = np.delete(X_all, target_idx, axis=1)
model_without = OLS(y, X_without).fit(cov_type='HAC', cov_kwds={'maxlags': 6})
model_without = OLS(y, X_without).fit(cov_type="HAC", cov_kwds={"maxlags": 6})
r2_without = model_without.rsquared
r2_change = r2_all - r2_without
return {
'beta': beta,
'tstat': tstat,
'r2': r2_all,
'r2_change': r2_change,
'pvalue': model_all.pvalues[target_idx]
"beta": beta,
"tstat": tstat,
"r2": r2_all,
"r2_change": r2_change,
"pvalue": model_all.pvalues[target_idx],
}
except Exception as e:
print(f"回归分析出错: {e}")
return {'beta': 0, 'tstat': 0, 'r2': 0, 'r2_change': 0}
return {"beta": 0, "tstat": 0, "r2": 0, "r2_change": 0}
def validate_factor(
factor: pd.Series,
forward_return: pd.Series,
ic_window: int = 30,
n_groups: int = 3
factor: pd.Series, forward_return: pd.Series, ic_window: int = 30, n_groups: int = 3
) -> Dict:
"""
综合因子检验
@@ -216,11 +184,10 @@ def validate_factor(
group_result = group_backtest(factor, forward_return, n_groups=n_groups)
return {
'mean_ic': mean_ic,
'ic_ir': ic_ir,
'ic_series': rolling_ic,
'mean_h_l_return': group_result['mean_h_l_return'],
'mean_h_l_tstat': group_result['mean_h_l_tstat'],
'group_returns': group_result['group_returns']
"mean_ic": mean_ic,
"ic_ir": ic_ir,
"ic_series": rolling_ic,
"mean_h_l_return": group_result["mean_h_l_return"],
"mean_h_l_tstat": group_result["mean_h_l_tstat"],
"group_returns": group_result["group_returns"],
}

7523
收益测算.ipynb Normal file

File diff suppressed because one or more lines are too long