生成信号

This commit is contained in:
2025-11-18 00:14:34 +08:00
parent abcb185505
commit f28b225d42
9 changed files with 21497 additions and 1134 deletions

13910
IC.ipynb Normal file

File diff suppressed because one or more lines are too long

View File

@@ -1,121 +0,0 @@
# 基于BTC 4h数据的时间序列因子模型实操流程因子挖掘→检验→回测→信号生成
结合《ssrn.3255748.pdf》中时间序列TS因子模型的核心逻辑以及高维高频金融数据建模的前沿方法如投影主成分分析P-PCA以下为针对BTC 4h数据的完整实操流程包含每一步的理论原理、操作细节及论文引用依据。
## 一、数据准备与预处理(基础步骤,确保数据质量)
### 1. 数据来源与变量选择
- **核心数据**BTC的4h级原始数据至少包含“开盘价、收盘价、最高价、最低价、成交量、成交额”时间跨度建议≥5年如2018年1月-2023年12月共约11325个4h数据点来源可选择CoinGecko、Binance API等合规平台。
- **扩展变量**基于原始数据计算技术指标变量作为因子候选参考《ssrn.3255748.pdf》中“资产特征驱动因子”的逻辑具体包括
- 收益类4h收益率\(R_t = \ln(Close_t/Close_{t-1})\)、滚动12期48h收益率标准差波动率、滚动6期24h收益率偏度尾部风险
- 趋势类EMA指数移动平均如4期/8期/16期、MACD异同移动平均线、RSI相对强弱指数14期
- 量能类成交量滚动6期均值、成交额/成交量比值(量价配合度);
- 波动类ATR平均真实波幅14期、高低价差率\((High_t-Low_t)/Close_{t-1}\))。
### 2. 数据预处理(消除噪声与异常值)
- **异常值处理**采用“3σ法则”识别异常收益率如单日涨跌幅20%的4h数据用前后相邻数据的线性插值替换避免极端值对因子估计的干扰——这与中山大学研究中“抑制高频特异性波动”的思路一致摘要1、5该研究指出高频数据中的异常波动会扭曲因子估计需通过预处理降低噪声。
- **缺失值填补**:若存在数据缺失(如交易所维护导致的断更),采用“前向填充+滚动均值平滑”如用前3期均值填补确保时间序列的连续性。
- **标准化**对所有候选因子变量进行“Z-score标准化”\(X_{std}=(X-\mu)/\sigma\),其中\(\mu\)、\(\sigma\)为滚动30期120h的均值和标准差避免量纲差异影响因子权重——参考《ssrn.3255748.pdf》中CS因子“标准化匹配TS因子标准差”的操作逻辑
## 二、因子挖掘:基于时间序列逻辑构建候选因子
### 1. 因子构建原则贴合TS因子“预设规则、可解释”的核心特性
根据《ssrn.3255748.pdf》中TS因子的构建逻辑BTC 4h因子需满足“基于固定规则、反映特定风险/收益驱动逻辑”避免CS因子“月度优化、非可投资”的缺陷。具体分为“基础因子”和“合成因子”两类
#### 1基础因子单一逻辑驱动的因子
| 因子名称 | 因子逻辑 | 计算方式4h频率 | 理论依据(论文关联) |
|----------|----------|----------------------|------------------------|
| 趋势因子TREND | 价格趋势方向,趋势向上则预期收益高 | \(TREND_t = I(Close_t > EMA_{16,t}) \times 1 + I(Close_t < EMA_{4,t}) \times (-1)\)其中\(I(\cdot)\)为指示函数 | 类似ssrn.3255748.pdf特征驱动收益逻辑用EMA交叉反映趋势特征 |
| 波动率因子VOL | 波动率越高风险溢价越高 | \(VOL_t = \text{滚动12期收益率标准差}\) | 对应中山大学研究中高频波动捕捉风险的思路摘要15波动率是高频金融数据的核心风险特征 |
| 量价因子VOLP | 量价配合度成交量放大且价格上涨则动量强 | \(VOLP_t = I(Volume_t > \text{滚动6期Volume均值}) \times R_t\) | 参考《ssrn.3255748.pdf》中“动量因子UMD”的“收益+量能”逻辑UMD通过前期收益反映动量此处叠加成交量增强信号 |
| 反转因子REV | 短期反转效应,过度上涨/下跌后预期回调 | \(REV_t = -R_{t-1}\)前1期4h收益率的相反数 | 符合时间序列因子“单一特征驱动”的特性捕捉BTC短期4h级的反转风险 |
#### 2合成因子多变量降维得到的综合因子
采用中山大学研究提出的**投影主成分分析P-PCA** 构建合成因子摘要1、5该方法相比传统PCA能更有效利用特征变量信息抑制高频噪声具体步骤
1. **选择输入变量**将上述基础因子TREND、VOL、VOLP、REV及3个核心技术指标MACD差值、RSI、ATR作为P-PCA的输入矩阵\(X_{T \times K}\)T为时间维度K=7为变量维度
2. **投影步骤**根据P-PCA理论摘要1、5先将输入变量投影到“可观测特征空间”此处选择“滚动12期收益率”作为工具变量反映BTC收益的长期动态得到投影矩阵\(P\)
3. **提取主成分**:对投影后的矩阵\(P \times X\)进行PCA取前2个主成分累计方差解释率需≥80%)作为合成因子:
- 合成因子1PC1命名为“趋势-量能因子”权重集中在TREND、VOLP反映趋势与量能的协同效应
- 合成因子2PC2命名为“风险因子”权重集中在VOL、ATR反映4h级的风险暴露程度。
4. **因子方向校准**通过“因子与未来1期收益率的相关性”调整方向如PC1与\(R_{t+1}\)正相关则保留原方向负相关则取反确保因子值越高预期收益越高——贴合《ssrn.3255748.pdf》中“因子收益差为正”的TS因子设计逻辑如HML=高BM收益-低BM收益
## 三、因子检验:验证因子的有效性与稳健性
### 1. 因子收益检验(核心:因子能否区分未来收益)
参考《ssrn.3255748.pdf》中“因子平均收益t统计量”的检验逻辑对每个候选因子进行“分组回测”步骤如下
- **分组规则**每月按4h频率约180个数据点将BTC 4h数据按因子值分为3组低因子组L、中因子组M、高因子组H
- **计算组收益**每组的4h收益为该组内因子值对应的BTC收益率因仅单标的此处为“因子值分位数对应的收益”如高因子组H为因子值前30%的4h数据的平均收益
- **检验指标**
- 因子收益差:\(H-L\)收益(高因子组收益 - 低因子组收益),若显著为正,说明因子能区分收益;
- t统计量用Newey-West调整的t统计量滞后6期对应24h检验\(H-L\)收益的显著性避免自相关导致的虚假显著参考《ssrn.3255748.pdf》中“因子平均收益t统计量”的计算方式
**示例结果要求**如趋势因子TREND的\(H-L\)收益为0.35%/4h年化约84%t统计量=2.892显著说明该因子有效。
### 2. 因子跨度回归(检验因子的边际解释力)
根据《ssrn.3255748.pdf》中“因子跨度回归”的核心逻辑检验单个因子能否被其他因子替代步骤如下
- **回归模型**以BTC未来1期4h收益率\(R_{t+1}\)为因变量,以候选因子及控制变量为自变量,构建时间序列回归:
\(R_{t+1} = \alpha + \beta_1 F_1_t + \beta_2 F_2_t + ... + \beta_k F_k_t + e_{t+1}\)
其中\(F_1,F_2,...F_k\)为候选因子,\(\alpha\)为定价误差,\(\beta_i\)为因子载荷。
- **检验标准**:若某因子的\(\beta_i\)显著不为0t统计量2且加入该因子后模型\(R^2\)提升≥5%说明该因子具有“不可替代的边际解释力”未被其他因子吸收——类似《ssrn.3255748.pdf》中TS因子“市场、规模因子边际信息显著”的结论
**示例**若加入合成因子PC1后模型\(R^2\)从0.12提升至0.18PC1的\(\beta=0.25\)t=3.12说明PC1具有独立解释力。
### 3. 稳健性检验(排除偶然因素)
- **样本外检验**将数据分为“训练集2018-2021年”和“样本外集2022-2023年若因子在样本外的\(H-L\)收益t统计量仍1.8接近显著说明因子稳健——参考《ssrn.3255748.pdf》中“跨样本验证因子表现”的逻辑
- **频率敏感性检验**将4h频率调整为2h或8h若因子收益差的显著性变化≤20%说明因子不受频率小幅变动影响——符合中山大学研究中“高频因子需跨频率稳健”的要求摘要1、5
## 四、因子组合构建多因子模型贴合TS模型“常数载荷、可投资”特性
### 1. 因子权重确定避免CS模型“月度优化”的复杂性
根据《ssrn.3255748.pdf》中TS因子模型“预设因子权重、常数载荷”的逻辑采用“风险平价”或“回归系数加权”避免动态优化导致的过拟合
- **风险平价加权**:使每个因子的“风险贡献相等”(因子风险贡献=因子权重×因子波动率×因子与收益的相关性),公式为:
\(w_i = \frac{1/\sigma_i}{\sum_{j=1}^n 1/\sigma_j}\),其中\(\sigma_i\)为因子\(F_i\)的滚动30期波动率
该方法确保单一因子不会过度主导组合贴合《ssrn.3255748.pdf》中“多因子分散风险”的思路如FF五因子模型的等权重逻辑
- **回归系数加权**:用训练集的时间序列回归系数作为权重(如因子\(F_i\)的权重\(w_i = \beta_i / \sum_{j=1}^n |\beta_j|\)\(\beta_i\)为\(R_{t+1}\)对\(F_i\)的回归系数),确保权重与因子解释力正相关。
### 2. 多因子综合得分(最终信号输入)
将筛选后的有效因子如TREND、VOLP、PC1、PC2按权重合并得到4h级的“多因子综合得分”
\(Score_t = \sum_{i=1}^n w_i \times F_{i,t}\)
其中\(F_{i,t}\)为标准化后的因子值,\(Score_t\)越高代表未来1期4hBTC上涨概率越大——该得分对应《ssrn.3255748.pdf》中“因子组合预测收益”的逻辑通过多因子协同提升预测准确性。
## 五、回测:验证因子模型的实战有效性
### 1. 回测框架设计贴合TS因子“可投资”的核心优势
参考《ssrn.3255748.pdf》中“资产定价检验”的回测逻辑采用“等仓单边交易”仅做多/做空BTC无杠杆避免CS因子“高杠杆”的非可投资性具体参数
- **调仓频率**4h调仓与因子频率一致每个4h周期根据\(Score_t\)生成交易信号;
- **手续费**按0.1%/次现货交易手续费参考Binance等平台滑点按0.05%/次4h级BTC流动性充足滑点较低
- **回测区间**2019年1月-2023年12月共约4680个4h数据点包含牛熊周期检验模型适应性
### 2. 交易信号规则(基于多因子得分的阈值策略)
根据《ssrn.3255748.pdf》中“因子值与收益正相关”的结论设定阈值生成买卖信号
- **买入信号**:当\(Score_t > 0.8\sigma_{Score}\)\(\sigma_{Score}\)为Score的滚动30期标准差且前1期无持仓时买入BTC满仓
- **卖出信号**:当\(Score_t < -0.8\sigma_{Score}\)且前1期有持仓时卖出BTC空仓
- **观望信号**\(Score_t\)\([-0.8\sigma_{Score}, 0.8\sigma_{Score}]\)之间维持原有持仓避免频繁交易)。
### 3. 回测指标与评估(参考论文中的资产定价检验指标)
采用ssrn.3255748.pdf平均收益夏普比率最大回撤等核心指标同时加入高频数据特有的胜率盈亏比”,具体如下
| 回测指标 | 计算方式 | 合格标准BTC 4h策略 |
|----------|----------|--------------------------|
| 年化收益率 | \((1+\text{累计收益})^{252×6/24} - 1\)假设年交易252天每天6个4h周期 | 30%跑赢BTC现货年化收益 |
| 夏普比率 | 年化收益率 / 年化波动率 | 1.5风险调整收益优秀 |
| 最大回撤 | 回测期间最大亏损幅度 | 50%控制极端风险 |
| 胜率 | 盈利交易次数 / 总交易次数 | 55%信号准确性高 |
**示例结果**若回测得到年化收益45%、夏普比率1.8最大回撤42%、胜率58%说明模型有效——类似ssrn.3255748.pdfTS因子模型解释力达标的实证结论()。
## 六、信号优化与上线:动态适应市场变化
### 1. 因子载荷动态调整(参考“时变载荷”的改进思路)
虽然ssrn.3255748.pdf中TS模型默认常数载荷”,但中山大学研究指出因子载荷时变能提升预测准确性”(摘要15因此可引入滚动窗口调整权重”:
- 每30天180个4h周期重新估计因子权重如风险平价权重的波动率用最新30期数据避免因子失效如BTC在牛熊周期中波动率因子的重要性会变化
- 若某因子连续60个4h周期10天\(H-L\)收益t统计量1.0暂时剔除该因子待其恢复显著性后重新加入——贴合ssrn.3255748.pdf因子边际信息动态检验的逻辑()。
### 2. 实盘上线与监控
- **信号输出**每4h生成Score_t及对应买卖信号通过API对接交易所如Binance Spot API实现自动交易
- **风险监控**实时监控因子有效性指标”(如当前因子的\(H-L\)收益t统计量模型\(R^2\)若指标连续3天不达标如t统计量1.2暂停自动交易人工排查原因如市场结构变化导致因子失效
- **日志记录**保存每4h的因子值信号交易结果每月进行回测复盘对比实盘与回测的差异优化因子参数如调整EMA周期阈值系数)。
## 七、关键论文引用与理论支撑总结
1. ssrn.3255748.pdf》(Fama & French 2018核心支撑TS因子预设规则可投资常数斜率回归检验的逻辑指导因子构建检验回测的整体框架(、、);
2. 中山大学高维高频金融数据的因子建模》(摘要15提供P-PCA合成因子高频噪声处理时变载荷调整的方法解决BTC 4h高频数据的因子估计问题
3. 国家金融与发展实验室收益率曲线三因子模型》(摘要2借鉴因子解释度动态调整的思路用于合成因子的方差解释率检验和权重动态优化

View File

@@ -1,180 +0,0 @@
"""
回测模块
"""
import numpy as np
import pandas as pd
from typing import Dict, Optional, Tuple
class BacktestEngine:
"""回测引擎"""
def __init__(
self,
commission: float = 0.001, # 手续费率
slippage: float = 0.0005, # 滑点
initial_capital: float = 10000.0
):
self.commission = commission
self.slippage = slippage
self.initial_capital = initial_capital
def run(
self,
signals: pd.Series,
price: pd.Series,
score: Optional[pd.Series] = None
) -> Dict:
"""
运行回测
Parameters:
-----------
signals : Series
交易信号1=买入,-1=卖出0=持有
price : Series
价格序列
score : Series, optional
因子得分(用于记录)
Returns:
--------
dict: 回测结果
"""
# 对齐数据
aligned = pd.concat([signals, price], axis=1).dropna()
aligned.columns = ['signal', 'price']
if score is not None:
aligned = pd.concat([aligned, score], axis=1)
aligned.columns = ['signal', 'price', 'score']
# 向量化优化:先计算价格变化率
price_pct = aligned['price'].pct_change().fillna(0)
# 初始化
capital = self.initial_capital
position = 0 # 持仓0=空仓1=满仓
equity = np.zeros(len(aligned))
equity[0] = capital
trades = []
buy_price = None # 记录买入价格
# 检测信号变化点(向量化)
signal_changes = aligned['signal'].diff().fillna(0) != 0
# 遍历处理(优化:只在信号变化时处理)
for i in range(1, len(aligned)):
current_signal = aligned['signal'].iloc[i]
current_price = aligned['price'].iloc[i]
prev_signal = aligned['signal'].iloc[i-1]
# 计算收益率(基于价格变化)
if position == 1:
period_return = price_pct.iloc[i]
else:
period_return = 0
# 交易逻辑(只在信号变化时处理)
if signal_changes.iloc[i]:
if current_signal == 1 and position == 0: # 买入
# 扣除手续费和滑点
cost = self.commission + self.slippage
capital *= (1 - cost)
position = 1
buy_price = current_price
trades.append({
'date': aligned.index[i],
'action': 'buy',
'price': current_price,
'capital': capital
})
elif current_signal == -1 and position == 1: # 卖出
# 扣除手续费和滑点
cost = self.commission + self.slippage
capital *= (1 - cost)
position = 0
buy_price = None
trades.append({
'date': aligned.index[i],
'action': 'sell',
'price': current_price,
'capital': capital
})
# 更新权益
if position == 1 and buy_price is not None:
equity[i] = capital * (current_price / buy_price)
else:
equity[i] = capital
equity_series = pd.Series(equity, index=aligned.index)
returns_series = price_pct * (aligned['signal'].shift(1) == 1).astype(int)
# 计算回测指标
metrics = self._calculate_metrics(equity_series, returns_series, len(trades))
return {
'equity': equity_series,
'returns': returns_series,
'trades': trades,
'metrics': metrics,
'final_capital': equity_series.iloc[-1] if len(equity_series) > 0 else self.initial_capital
}
def _calculate_metrics(
self,
equity: pd.Series,
returns: pd.Series,
num_trades: int = 0
) -> Dict:
"""计算回测指标"""
if len(equity) == 0 or len(returns) == 0:
return {}
# 总收益率
total_return = (equity.iloc[-1] / equity.iloc[0] - 1) if len(equity) > 0 else 0
# 年化收益率假设每天6个4h周期一年252个交易日
periods_per_year = 252 * 6
n_periods = len(returns)
if n_periods > 0:
annual_return = (1 + total_return) ** (periods_per_year / n_periods) - 1
else:
annual_return = 0
# 年化波动率
annual_vol = returns.std() * np.sqrt(periods_per_year)
# 夏普比率
sharpe = annual_return / (annual_vol + 1e-8)
# 最大回撤
cummax = equity.cummax()
drawdown = (equity - cummax) / cummax
max_drawdown = drawdown.min()
# 胜率(基于实际交易)
# 只计算有持仓期间的收益率
position_returns = returns[returns != 0]
winning_trades = (position_returns > 0).sum()
win_rate = winning_trades / len(position_returns) if len(position_returns) > 0 else 0
# 盈亏比
positive_returns = position_returns[position_returns > 0]
negative_returns = position_returns[position_returns < 0]
avg_win = positive_returns.mean() if len(positive_returns) > 0 else 0
avg_loss = abs(negative_returns.mean()) if len(negative_returns) > 0 else 0
profit_loss_ratio = avg_win / (avg_loss + 1e-8)
return {
'total_return': total_return,
'annual_return': annual_return,
'annual_volatility': annual_vol,
'sharpe_ratio': sharpe,
'max_drawdown': max_drawdown,
'win_rate': win_rate,
'profit_loss_ratio': profit_loss_ratio,
'total_trades': num_trades # 实际交易次数
}

View File

@@ -1,436 +0,0 @@
import argparse
import math
import operator
import random
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
from deap import algorithms, base, creator, gp, tools
# ------------------------------
# Data & Config
# ------------------------------
@dataclass
class EvolutionConfig:
population_size: int = 200
generations: int = 30
tournament_size: int = 5
crossover_prob: float = 0.9
mutation_prob: float = 0.05
elitism: int = 5
max_depth_init: int = 4
max_depth: int = 8
ic_window: int = 1000
ret_horizon: int = 24
ic_method: str = "spearman" # or "pearson"
complexity_penalty: float = 0.001
seed: Optional[int] = 42
# ------------------------------
# Safe operators for GP
# ------------------------------
def _safe_div(left: np.ndarray, right: np.ndarray) -> np.ndarray:
denom = np.where(np.abs(right) < 1e-12, np.nan, right)
return left / denom
def _safe_log(x: np.ndarray) -> np.ndarray:
return np.log(np.clip(np.abs(x), 1e-12, None))
def _safe_sqrt(x: np.ndarray) -> np.ndarray:
return np.sqrt(np.clip(x, 0.0, None))
def _safe_pow(x: np.ndarray, y: np.ndarray) -> np.ndarray:
# Limit exponent to avoid overflow
y_clip = np.clip(y, -3.0, 3.0)
with np.errstate(over="ignore", invalid="ignore"):
out = np.power(np.clip(x, -1e6, 1e6), y_clip)
out[~np.isfinite(out)] = np.nan
return out
def _rolling_mean(x: np.ndarray, window: int) -> np.ndarray:
s = pd.Series(x)
return s.rolling(window, min_periods=max(2, window // 2)).mean().to_numpy()
def _rolling_std(x: np.ndarray, window: int) -> np.ndarray:
s = pd.Series(x)
return s.rolling(window, min_periods=max(2, window // 2)).std().to_numpy()
def _ts_delta(x: np.ndarray, period: int) -> np.ndarray:
s = pd.Series(x)
return s.diff(period).to_numpy()
def _ts_rank(x: np.ndarray, window: int) -> np.ndarray:
s = pd.Series(x)
return s.rolling(window, min_periods=max(2, window // 2)).apply(
lambda a: pd.Series(a).rank(pct=True).iloc[-1], raw=False
).to_numpy()
def _delay(x: np.ndarray, period: int) -> np.ndarray:
s = pd.Series(x)
return s.shift(period).to_numpy()
# ------------------------------
# Primitive set
# ------------------------------
def build_pset(feature_names: List[str]) -> gp.PrimitiveSetTyped:
# Each feature is a numpy array of floats; GP outputs numpy array
pset = gp.PrimitiveSetTyped("MAIN", [np.ndarray for _ in feature_names], np.ndarray)
# Name the arguments for readability
for i, name in enumerate(feature_names):
pset.renameArguments(**{f"ARG{i}": name})
# Binary arithmetic
pset.addPrimitive(lambda x, y: x + y, [np.ndarray, np.ndarray], np.ndarray, name="add")
pset.addPrimitive(lambda x, y: x - y, [np.ndarray, np.ndarray], np.ndarray, name="sub")
pset.addPrimitive(lambda x, y: x * y, [np.ndarray, np.ndarray], np.ndarray, name="mul")
pset.addPrimitive(_safe_div, [np.ndarray, np.ndarray], np.ndarray, name="div")
# Unary transforms
pset.addPrimitive(np.negative, [np.ndarray], np.ndarray, name="neg")
pset.addPrimitive(np.abs, [np.ndarray], np.ndarray, name="abs")
pset.addPrimitive(_safe_log, [np.ndarray], np.ndarray, name="log")
pset.addPrimitive(_safe_sqrt, [np.ndarray], np.ndarray, name="sqrt")
# Power
pset.addPrimitive(_safe_pow, [np.ndarray, np.ndarray], np.ndarray, name="pow")
# Rolling ops with fixed small set of windows via partials
for w in (3, 6, 12, 24, 48, 96):
pset.addPrimitive(lambda x, w=w: _rolling_mean(x, w), [np.ndarray], np.ndarray, name=f"sma{w}")
pset.addPrimitive(lambda x, w=w: _rolling_std(x, w), [np.ndarray], np.ndarray, name=f"std{w}")
pset.addPrimitive(lambda x, w=w: _ts_rank(x, w), [np.ndarray], np.ndarray, name=f"rank{w}")
pset.addPrimitive(lambda x, w=w: _ts_delta(x, w), [np.ndarray], np.ndarray, name=f"delta{w}")
pset.addPrimitive(lambda x, w=w: _delay(x, w), [np.ndarray], np.ndarray, name=f"delay{w}")
# Ephemeral constants: scalar to array via broadcasting
# 随机加一个常数 不一定合理
def _const() -> np.ndarray:
return np.array(random.uniform(-2.0, 2.0))
pset.addEphemeralConstant("const", _const, np.ndarray)
return pset
# ------------------------------
# Fitness and evaluation
# ------------------------------
def compute_returns(price: pd.Series, horizon: int) -> pd.Series:
return price.pct_change(horizon).shift(-horizon)
def rank_ic(a: pd.Series, b: pd.Series, method: str = "spearman") -> float:
mask = a.notna() & b.notna()
if mask.sum() < 10:
return np.nan
x = a[mask]
y = b[mask]
if method == "spearman":
return x.rank(pct=True).corr(y.rank(pct=True))
return x.corr(y)
def series_zscore(x: pd.Series) -> pd.Series:
return (x - x.mean()) / (x.std(ddof=0) + 1e-12)
def evaluate_individual(
individual,
toolbox: base.Toolbox,
features: List[pd.Series],
target: pd.Series,
config: EvolutionConfig,
) -> Tuple[float]:
func = toolbox.compile(expr=individual)
# Build feature matrix aligned index
idx = target.index
inputs = [f.reindex(idx).to_numpy() for f in features]
try:
raw = func(*inputs)
except Exception:
return (-1e6,)
# Ensure array length
if not isinstance(raw, np.ndarray):
return (-1e6,)
if raw.shape[0] != len(idx):
return (-1e6,)
# Convert to series and standardize per-window
factor = pd.Series(raw, index=idx)
factor = factor.replace([np.inf, -np.inf], np.nan)
factor = factor.ffill().bfill()
# Rolling IC over window segments
window = config.ic_window
if len(factor) < window + 10:
return (-1e6,)
ic_values: List[float] = []
step = max(window // 5, 50)
for start in range(0, len(factor) - window, step):
end = start + window
sub_factor = factor.iloc[start:end]
sub_target = target.iloc[start:end]
ic = rank_ic(series_zscore(sub_factor), sub_target, method=config.ic_method)
if np.isfinite(ic):
ic_values.append(ic)
if not ic_values:
return (-1e6,)
mean_ic = float(np.nanmean(ic_values))
# Complexity penalty (size of tree)
complexity = len(individual)
fitness = mean_ic - config.complexity_penalty * complexity
if not np.isfinite(fitness):
fitness = -1e6
return (fitness,)
# ------------------------------
# Evolution runner
# ------------------------------
def run_evolution(
df: pd.DataFrame,
price_col: str,
feature_cols: List[str],
config: EvolutionConfig,
) -> Tuple[tools.HallOfFame, base.Toolbox, gp.PrimitiveSetTyped, List[pd.Series]]:
if config.seed is not None:
random.seed(config.seed)
np.random.seed(config.seed)
price = df[price_col].astype(float)
forward_ret = compute_returns(price, config.ret_horizon)
target = forward_ret
features = [df[c].astype(float) for c in feature_cols]
pset = build_pset(feature_cols)
# Fitness: maximize IC (single objective)
if not hasattr(creator, "FitnessMax"):
creator.create("FitnessMax", base.Fitness, weights=(1.0,))
if not hasattr(creator, "Individual"):
creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMax)
toolbox = base.Toolbox()
toolbox.register("expr",
gp.genHalfAndHalf,
pset=pset,
min_=1,
max_=config.max_depth_init)
toolbox.register("individual", tools.initIterate, creator.Individual, toolbox.expr)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("compile", gp.compile, pset=pset)
toolbox.register(
"evaluate",
evaluate_individual,
toolbox=toolbox,
features=features,
target=target,
config=config,
)
# Genetic operators
toolbox.register("select", tools.selTournament, tournsize=config.tournament_size)
toolbox.register("mate", gp.cxOnePoint)
toolbox.register("expr_mut", gp.genFull, min_=0, max_=2)
toolbox.register("mutate", gp.mutUniform, expr=toolbox.expr_mut, pset=pset)
# bloat control
toolbox.decorate("mate", gp.staticLimit(key=operator.attrgetter("height"), max_value=config.max_depth))
toolbox.decorate("mutate", gp.staticLimit(key=operator.attrgetter("height"), max_value=config.max_depth))
pop = toolbox.population(n=config.population_size)
hof = tools.HallOfFame(maxsize=max(5, config.elitism))
stats_fit = tools.Statistics(lambda ind: ind.fitness.values[0])
stats_size = tools.Statistics(len)
mstats = tools.MultiStatistics(fitness=stats_fit, size=stats_size)
mstats.register("avg", np.nanmean)
mstats.register("std", np.nanstd)
mstats.register("min", np.nanmin)
mstats.register("max", np.nanmax)
pop, logbook = algorithms.eaSimple(
pop,
toolbox,
cxpb=config.crossover_prob,
mutpb=config.mutation_prob,
ngen=config.generations,
stats=mstats,
halloffame=hof,
verbose=True,
)
return hof, toolbox, pset, features
# ------------------------------
# Factor compilation & backtest
# ------------------------------
def compile_factor(
individual,
toolbox: base.Toolbox,
index: pd.Index,
features: List[pd.Series],
) -> pd.Series:
func = toolbox.compile(expr=individual)
inputs = [f.reindex(index).to_numpy() for f in features]
raw = func(*inputs)
s = pd.Series(raw, index=index)
s = s.replace([np.inf, -np.inf], np.nan).ffill().bfill()
return s
def simple_long_short_backtest(
factor: pd.Series,
price: pd.Series,
ret_horizon: int,
top_quantile: float = 0.2,
bottom_quantile: float = 0.2,
) -> pd.Series:
f = factor.align(price, join="right")[0]
future_ret = compute_returns(price, ret_horizon)
ranks = f.rank(pct=True)
long_mask = ranks >= (1 - top_quantile)
short_mask = ranks <= bottom_quantile
ls_signal = long_mask.astype(float) - short_mask.astype(float)
ls_signal = ls_signal.shift(1) # trade on next bar
pnl = ls_signal * future_ret
pnl = pnl.replace([np.inf, -np.inf], np.nan).fillna(0.0)
equity = (1.0 + pnl).cumprod()
return equity
# ------------------------------
# CLI
# ------------------------------
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description="DEAP-based factor mining (genetic programming)")
p.add_argument("--data", type=str, default="ETH_USDT-1h.feather", help="Input feather/csv file")
p.add_argument("--price_col", type=str, default="close", help="Price column name")
p.add_argument(
"--features",
type=str,
default="open,high,low,close,volume",
help="Comma-separated feature column names",
)
p.add_argument("--ret_horizon", type=int, default=24)
p.add_argument("--population", type=int, default=200)
p.add_argument("--generations", type=int, default=30)
p.add_argument("--ic_window", type=int, default=1000)
p.add_argument("--seed", type=int, default=42)
p.add_argument("--ic_method", type=str, default="spearman", choices=["spearman", "pearson"])
p.add_argument("--complexity_penalty", type=float, default=0.001)
p.add_argument("--save_best", type=str, default="best_factors.txt")
return p.parse_args()
def load_dataframe(path: str) -> pd.DataFrame:
if path.endswith(".feather"):
df = pd.read_feather(path)
elif path.endswith(".csv"):
df = pd.read_csv(path)
else:
raise ValueError("Unsupported file format. Use .feather or .csv")
# Try to parse datetime index if present
for col in ["datetime", "time", "timestamp", "date"]:
if col in df.columns:
df[col] = pd.to_datetime(df[col])
df = df.set_index(col).sort_index()
break
return df
def main():
args = parse_args()
df = load_dataframe(args.data)
df = df.head(1000)
feature_cols = [c.strip() for c in args.features.split(",") if c.strip()]
for c in [args.price_col] + feature_cols:
if c not in df.columns:
raise KeyError(f"Column '{c}' not found in data")
config = EvolutionConfig(
population_size=args.population,
generations=args.generations,
ic_window=args.ic_window,
ret_horizon=args.ret_horizon,
ic_method=args.ic_method,
complexity_penalty=args.complexity_penalty,
seed=args.seed,
)
hof, toolbox, pset, features = run_evolution(df, args.price_col, feature_cols, config)
price = df[args.price_col].astype(float)
best_expressions: List[str] = []
for i, ind in enumerate(hof):
expr_str = str(ind)
best_expressions.append(expr_str)
# Save best expressions
with open(args.save_best, "w", encoding="utf-8") as f:
for expr in best_expressions:
f.write(expr + "\n")
# Compile the top-1 and run a simple long/short backtest for sanity
if len(hof) > 0:
best = hof[0]
factor_series = compile_factor(best, toolbox, df.index, features)
equity = simple_long_short_backtest(factor_series, price, config.ret_horizon)
print("Best expression:", str(best))
print("Final equity (normalized):", float(equity.iloc[-1]))
# Also export factor and equity
out = pd.DataFrame({
"factor": factor_series,
"equity": equity,
})
out.to_csv("deap_factor_output.csv")
print("Saved best expressions to", args.save_best)
print("Saved factor/equity to deap_factor_output.csv")
if __name__ == "__main__":
main()

View File

@@ -1,110 +0,0 @@
"""
使用示例:时间序列因子挖掘流程
"""
from pipeline import FactorPipeline
from factors import FactorMiner, create_default_factors
# 方式1使用默认流程最简单
def example_simple():
"""简单示例"""
pipeline = FactorPipeline(
ret_horizon=1, # 未来1期收益率
ic_window=30, # IC计算窗口
commission=0.001, # 手续费0.1%
slippage=0.0005 # 滑点0.05%
)
# 运行完整流程
results = pipeline.run_full_pipeline(
file_path="ETH_USDT-1h.feather",
min_ic=0.01, # 最小IC阈值
min_tstat=1.5, # 最小t统计量
weight_method='risk_parity', # 权重方法risk_parity, regression, equal
buy_threshold=0.8, # 买入阈值(标准差倍数)
sell_threshold=-0.8 # 卖出阈值(标准差倍数)
)
return results
# 方式2分步骤执行更灵活
def example_step_by_step():
"""分步骤示例"""
pipeline = FactorPipeline(ret_horizon=1, ic_window=30)
# 步骤1加载和预处理数据
pipeline.load_and_preprocess("ETH_USDT-1h.feather")
# 步骤2因子挖掘可以使用自定义因子
custom_miner = create_default_factors()
# 可以在这里添加自定义因子
# custom_miner.register_rule_factor('CUSTOM', your_custom_function)
pipeline.mine_factors(custom_miner)
# 步骤3因子检验
pipeline.validate_factors(min_ic=0.01, min_tstat=1.5)
# 步骤4因子组合
pipeline.combine_factors(weight_method='risk_parity')
# 步骤5生成信号
signals = pipeline.generate_signals(buy_threshold=0.8, sell_threshold=-0.8)
# 步骤6回测
backtest_results = pipeline.backtest(signals)
return {
'factors': pipeline.factors,
'score': pipeline.score,
'signals': signals,
'backtest': backtest_results
}
# 方式3自定义因子
def example_custom_factors():
"""自定义因子示例"""
from factors import RuleFactor
import pandas as pd
import numpy as np
# 定义自定义因子函数
def my_custom_factor(data: pd.DataFrame) -> pd.Series:
"""自定义因子:价格与均线的距离"""
return (data['close'] - data['ema8']) / data['ema8']
# 创建因子挖掘器
miner = create_default_factors()
# 注册自定义因子
miner.register_rule_factor('CUSTOM_DISTANCE', my_custom_factor)
# 使用自定义因子挖掘器
pipeline = FactorPipeline()
pipeline.load_and_preprocess("ETH_USDT-1h.feather")
pipeline.mine_factors(custom_miner=miner)
pipeline.validate_factors()
pipeline.combine_factors()
pipeline.backtest()
return pipeline
if __name__ == "__main__":
# 运行简单示例
print("运行简单示例...")
results = example_simple()
# 保存结果
if results['factors'] is not None:
results['factors'].to_csv("factors_output.csv")
print("\n因子数据已保存到 factors_output.csv")
if results['score'] is not None:
results['score'].to_csv("score_output.csv")
print("综合得分已保存到 score_output.csv")
if results['backtest'] is not None and 'equity' in results['backtest']:
results['backtest']['equity'].to_csv("equity_curve.csv")
print("权益曲线已保存到 equity_curve.csv")

View File

@@ -1,287 +0,0 @@
"""
主流程:时间序列因子挖掘、检验、回测、信号生成
"""
import pandas as pd
from typing import Dict, List, Optional
import warnings
warnings.filterwarnings('ignore')
from data import load_data, compute_technical_indicators, preprocess_data, compute_forward_returns
from factors import FactorMiner, create_default_factors
from validation import validate_factor, factor_span_regression
from combination import MultiFactorModel
from backtest import BacktestEngine
from signal import generate_signals
class FactorPipeline:
"""因子挖掘流程"""
def __init__(
self,
ret_horizon: int = 1,
ic_window: int = 30,
commission: float = 0.001,
slippage: float = 0.0005
):
"""
Parameters:
-----------
ret_horizon : int
未来收益率周期
ic_window : int
IC计算窗口
commission : float
手续费率
slippage : float
滑点
"""
self.ret_horizon = ret_horizon
self.ic_window = ic_window
self.commission = commission
self.slippage = slippage
self.data: Optional[pd.DataFrame] = None
self.factors: Optional[pd.DataFrame] = None
self.forward_return: Optional[pd.Series] = None
self.factor_miner: Optional[FactorMiner] = None
self.validation_results: Dict = {}
self.model: Optional[MultiFactorModel] = None
self.score: Optional[pd.Series] = None
self.backtest_results: Optional[Dict] = None
def load_and_preprocess(self, file_path: str) -> 'FactorPipeline':
"""步骤1加载和预处理数据"""
print("=" * 50)
print("步骤1加载和预处理数据")
print("=" * 50)
# 加载数据
self.data = load_data(file_path)
print(f"加载数据: {len(self.data)} 条记录")
# 计算技术指标
self.data = compute_technical_indicators(self.data)
print("计算技术指标完成")
# 预处理
self.data = preprocess_data(self.data)
print("数据预处理完成")
# 计算未来收益率
self.forward_return = compute_forward_returns(
self.data['close'],
horizon=self.ret_horizon
)
print(f"计算未来收益率完成(周期={self.ret_horizon}")
return self
def mine_factors(self, custom_miner: Optional[FactorMiner] = None) -> 'FactorPipeline':
"""步骤2因子挖掘"""
print("\n" + "=" * 50)
print("步骤2因子挖掘")
print("=" * 50)
if self.data is None:
raise ValueError("请先加载数据")
# 使用自定义或默认因子挖掘器
if custom_miner is None:
self.factor_miner = create_default_factors()
else:
self.factor_miner = custom_miner
# 计算所有因子
self.factors = self.factor_miner.compute_all_factors(self.data)
print(f"计算因子完成: {list(self.factors.columns)}")
return self
def validate_factors(self, min_ic: float = 0.01, min_tstat: float = 1.5) -> 'FactorPipeline':
"""步骤3因子检验"""
print("\n" + "=" * 50)
print("步骤3因子检验")
print("=" * 50)
if self.factors is None or self.forward_return is None:
raise ValueError("请先完成因子挖掘")
valid_factors = []
self.validation_results = {}
for factor_name in self.factors.columns:
factor = self.factors[factor_name]
# 综合检验
result = validate_factor(factor, self.forward_return, ic_window=self.ic_window)
self.validation_results[factor_name] = result
# 筛选有效因子
if (abs(result['mean_ic']) >= min_ic and
abs(result['mean_h_l_tstat']) >= min_tstat):
valid_factors.append(factor_name)
print(f"\n因子 {factor_name}:")
print(f" 平均IC: {result['mean_ic']:.4f}")
print(f" IC信息比率: {result['ic_ir']:.4f}")
print(f" H-L收益差: {result['mean_h_l_return']:.4f}")
print(f" H-L t统计量: {result['mean_h_l_tstat']:.4f}")
else:
print(f"\n因子 {factor_name} 未通过检验 (IC={result['mean_ic']:.4f}, t={result['mean_h_l_tstat']:.4f})")
# 只保留有效因子
if valid_factors:
self.factors = self.factors[valid_factors]
print(f"\n有效因子: {valid_factors}")
else:
print("\n警告:没有因子通过检验!")
return self
def combine_factors(
self,
weight_method: str = 'risk_parity',
window: Optional[int] = None
) -> 'FactorPipeline':
"""步骤4因子组合"""
print("\n" + "=" * 50)
print("步骤4因子组合")
print("=" * 50)
if self.factors is None or len(self.factors.columns) == 0:
raise ValueError("没有有效因子可组合")
# 创建多因子模型
self.model = MultiFactorModel(weight_method=weight_method)
self.model.fit(
self.factors,
forward_return=self.forward_return,
window=window
)
# 计算综合得分
self.score = self.model.predict(self.factors)
# 显示权重
weights = self.model.get_weights()
print("因子权重:")
for name, weight in weights.items():
print(f" {name}: {weight:.4f}")
print(f"\n综合得分统计:")
print(f" 均值: {self.score.mean():.4f}")
print(f" 标准差: {self.score.std():.4f}")
return self
def generate_signals(
self,
buy_threshold: float = 0.8,
sell_threshold: float = -0.8,
window: int = 30
) -> pd.Series:
"""步骤5生成交易信号"""
if self.score is None:
raise ValueError("请先完成因子组合")
signals = generate_signals(
self.score,
buy_threshold=buy_threshold,
sell_threshold=sell_threshold,
window=window
)
return signals
def backtest(
self,
signals: Optional[pd.Series] = None,
buy_threshold: float = 0.8,
sell_threshold: float = -0.8,
window: int = 30
) -> Dict:
"""步骤6回测"""
print("\n" + "=" * 50)
print("步骤6回测")
print("=" * 50)
if self.data is None:
raise ValueError("请先加载数据")
if signals is None:
signals = self.generate_signals(buy_threshold, sell_threshold, window)
# 创建回测引擎
engine = BacktestEngine(
commission=self.commission,
slippage=self.slippage
)
# 运行回测
self.backtest_results = engine.run(
signals,
self.data['close'],
score=self.score
)
# 显示结果
metrics = self.backtest_results['metrics']
print("\n回测结果:")
print(f" 总收益率: {metrics.get('total_return', 0)*100:.2f}%")
print(f" 年化收益率: {metrics.get('annual_return', 0)*100:.2f}%")
print(f" 年化波动率: {metrics.get('annual_volatility', 0)*100:.2f}%")
print(f" 夏普比率: {metrics.get('sharpe_ratio', 0):.2f}")
print(f" 最大回撤: {metrics.get('max_drawdown', 0)*100:.2f}%")
print(f" 胜率: {metrics.get('win_rate', 0)*100:.2f}%")
print(f" 盈亏比: {metrics.get('profit_loss_ratio', 0):.2f}")
print(f" 交易次数: {metrics.get('total_trades', 0)}")
return self.backtest_results
def run_full_pipeline(
self,
file_path: str,
custom_miner: Optional[FactorMiner] = None,
min_ic: float = 0.01,
min_tstat: float = 1.5,
weight_method: str = 'risk_parity',
buy_threshold: float = 0.8,
sell_threshold: float = -0.8
) -> Dict:
"""运行完整流程"""
self.load_and_preprocess(file_path) \
.mine_factors(custom_miner) \
.validate_factors(min_ic, min_tstat) \
.combine_factors(weight_method) \
.backtest(buy_threshold=buy_threshold, sell_threshold=sell_threshold)
return {
'factors': self.factors,
'score': self.score,
'validation': self.validation_results,
'backtest': self.backtest_results
}
if __name__ == "__main__":
# 示例使用
pipeline = FactorPipeline(ret_horizon=1, ic_window=30)
results = pipeline.run_full_pipeline(
file_path="ETH_USDT-1h.feather",
min_ic=0.01,
min_tstat=1.5,
weight_method='risk_parity',
buy_threshold=0.8,
sell_threshold=-0.8
)
# 保存结果
if results['factors'] is not None:
results['factors'].to_csv("factors.csv")
print("\n因子数据已保存到 factors.csv")
if results['score'] is not None:
results['score'].to_csv("score.csv")
print("综合得分已保存到 score.csv")

6
requirements.txt Normal file
View File

@@ -0,0 +1,6 @@
pandas>=1.3.0
numpy>=1.20.0
scipy>=1.7.0
statsmodels>=0.13.0
deap>=1.3.0

58
test.py Normal file
View File

@@ -0,0 +1,58 @@
import warnings
import numpy as np
import pandas as pd
import json
# 抑制numpy的警告由于数据中包含NaN值这是正常的
warnings.filterwarnings("ignore", category=RuntimeWarning, module="numpy")
np.seterr(all="ignore") # 忽略numpy的浮点错误警告
from data import load_data
from factor_mining.gp_miner import GPMiner, GPConfig
if __name__ == "__main__":
df = load_data("/Users/aszer/Documents/vscode/factorhack/ETH_USDT-1h.feather")
# 以4小时为周期重采样K线数据假定有datetime索引常见ohlcv列
df = (
df.resample("4h")
.agg(
{
"open": "first",
"high": "max",
"low": "min",
"close": "last",
"volume": "sum",
}
)
.dropna()
)
df = df[df.index < '2023-01-01']
print("数据加载成功前5行")
print(df.head())
print(f"\n数据形状: {df.shape}")
print(f"数据列: {df.columns.tolist()}")
gp_config = GPConfig(
ret_horizon=48,
ic_window=120,
ic_method="spearman",
seed=None,
population_size=200,
generations=30,
tournament_size=5,
crossover_prob=0.9,
mutation_prob=0.05,
elitism=5,
max_depth_init=1,
max_depth=30,
complexity_penalty=0.001,
)
miner = GPMiner(config=gp_config)
res = miner.mine(df, ["open", "high", "low", "close", "volume"])
with open("gp_miner_result.txt", "w") as out_file:
for formula, ic_tuple in res:
# ic_tuple 是元组取第一个元素作为IC值
ic = ic_tuple[0] if isinstance(ic_tuple, tuple) else ic_tuple
print(f"{formula.expression}, IC: {ic:.4f}")
# 将因子公式转换为字典并写入文件
out_file.write(json.dumps(formula.to_dict(), ensure_ascii=False))
out_file.write("\n")

7523
收益测算.ipynb Normal file

File diff suppressed because one or more lines are too long