From a66e42a8ae04dbc3ead4d9574e8d01e089b29116 Mon Sep 17 00:00:00 2001 From: aszerW Date: Sat, 8 Nov 2025 13:39:02 +0800 Subject: [PATCH] =?UTF-8?q?=E7=AC=AC=E4=B8=80=E7=89=88=E6=B5=81=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 211 +++++++++++++++++++++++++++++++ TS因子挖掘构建流程.md | 121 ++++++++++++++++++ backtest.py | 180 ++++++++++++++++++++++++++ combination.py | 152 ++++++++++++++++++++++ data.py | 138 ++++++++++++++++++++ deap_factor_mining.py | 1 + example.py | 110 ++++++++++++++++ factors.py | 113 +++++++++++++++++ pipeline.py | 287 ++++++++++++++++++++++++++++++++++++++++++ signal.py | 109 ++++++++++++++++ validation.py | 226 +++++++++++++++++++++++++++++++++ 11 files changed, 1648 insertions(+) create mode 100644 README.md create mode 100644 TS因子挖掘构建流程.md create mode 100644 backtest.py create mode 100644 combination.py create mode 100644 data.py create mode 100644 example.py create mode 100644 factors.py create mode 100644 pipeline.py create mode 100644 signal.py create mode 100644 validation.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..8aa16db --- /dev/null +++ b/README.md @@ -0,0 +1,211 @@ +# 时间序列因子挖掘框架 + +一套简洁、灵活的时间序列因子挖掘、检验、回测、信号生成框架。 + +## 特性 + +- **流程化设计**:清晰的步骤划分,易于理解和扩展 +- **灵活度高**:支持自定义因子、权重方法、信号规则 +- **代码简洁**:避免过度设计,核心逻辑清晰 +- **完整流程**:从数据预处理到信号生成的完整链路 + +## 项目结构 + +``` +factorhack/ +├── data.py # 数据加载和预处理 +├── factors.py # 因子挖掘(规则因子、GP因子) +├── validation.py # 因子检验(IC、分组回测、回归) +├── combination.py # 因子组合(多因子模型) +├── backtest.py # 回测引擎 +├── signal.py # 信号生成 +├── pipeline.py # 主流程 +├── example.py # 使用示例 +└── README.md # 说明文档 +``` + +## 快速开始 + +### 1. 安装依赖 + +```bash +pip install pandas numpy scipy statsmodels +``` + +### 2. 
基本使用 + +```python +from pipeline import FactorPipeline + +# 创建流程 +pipeline = FactorPipeline( + ret_horizon=1, # 未来1期收益率 + ic_window=30, # IC计算窗口 + commission=0.001, # 手续费0.1% + slippage=0.0005 # 滑点0.05% +) + +# 运行完整流程 +results = pipeline.run_full_pipeline( + file_path="ETH_USDT-1h.feather", + min_ic=0.01, # 最小IC阈值 + min_tstat=1.5, # 最小t统计量 + weight_method='risk_parity', + buy_threshold=0.8, + sell_threshold=-0.8 +) +``` + +### 3. 分步骤执行 + +```python +pipeline = FactorPipeline() + +# 步骤1:加载和预处理数据 +pipeline.load_and_preprocess("ETH_USDT-1h.feather") + +# 步骤2:因子挖掘 +pipeline.mine_factors() + +# 步骤3:因子检验 +pipeline.validate_factors(min_ic=0.01, min_tstat=1.5) + +# 步骤4:因子组合 +pipeline.combine_factors(weight_method='risk_parity') + +# 步骤5:生成信号 +signals = pipeline.generate_signals(buy_threshold=0.8, sell_threshold=-0.8) + +# 步骤6:回测 +backtest_results = pipeline.backtest(signals) +``` + +## 核心模块说明 + +### 1. 数据模块 (`data.py`) + +- `load_data()`: 加载数据(支持feather和csv) +- `compute_technical_indicators()`: 计算技术指标 +- `preprocess_data()`: 数据预处理(异常值、缺失值、标准化) +- `compute_forward_returns()`: 计算未来收益率 + +### 2. 因子模块 (`factors.py`) + +- `BaseFactor`: 因子基类 +- `RuleFactor`: 规则因子 +- `FactorMiner`: 因子挖掘器 +- `create_default_factors()`: 创建默认因子集合 + +**默认因子**: +- `TREND`: 趋势因子 +- `VOL`: 波动率因子 +- `VOLP`: 量价因子 +- `REV`: 反转因子 +- `MOM`: 动量因子 +- `RSI`: RSI因子 + +### 3. 检验模块 (`validation.py`) + +- `compute_ic()`: 计算IC(信息系数) +- `compute_rolling_ic()`: 计算滚动IC +- `group_backtest()`: 分组回测 +- `factor_span_regression()`: 因子跨度回归 +- `validate_factor()`: 综合因子检验 + +### 4. 组合模块 (`combination.py`) + +- `risk_parity_weights()`: 风险平价权重 +- `regression_weights()`: 回归系数权重 +- `equal_weights()`: 等权重 +- `MultiFactorModel`: 多因子模型 + +### 5. 回测模块 (`backtest.py`) + +- `BacktestEngine`: 回测引擎 +- 支持手续费、滑点 +- 计算年化收益率、夏普比率、最大回撤、胜率等指标 + +### 6. 
信号模块 (`signal.py`) + +- `generate_signals()`: 基于因子得分生成买卖信号 +- 支持滚动标准差阈值 +- 避免频繁交易 + +## 自定义扩展 + +### 添加自定义因子 + +```python +from factors import FactorMiner, RuleFactor, create_default_factors +import pandas as pd + +def my_custom_factor(data: pd.DataFrame) -> pd.Series: + """自定义因子""" + return (data['close'] - data['ema8']) / data['ema8'] + +miner = create_default_factors() +miner.register_rule_factor('CUSTOM', my_custom_factor) +``` + +### 使用不同的权重方法 + +```python +# 风险平价 +pipeline.combine_factors(weight_method='risk_parity') + +# 回归系数 +pipeline.combine_factors(weight_method='regression') + +# 等权重 +pipeline.combine_factors(weight_method='equal') +``` + +### 自定义信号规则 + +```python +from signal import generate_signals + +signals = generate_signals( + score=pipeline.score, + buy_threshold=1.0, # 买入阈值 + sell_threshold=-1.0, # 卖出阈值 + window=30 # 滚动窗口 +) +``` + +## 数据格式要求 + +输入数据应包含以下列: +- `open`: 开盘价 +- `high`: 最高价 +- `low`: 最低价 +- `close`: 收盘价 +- `volume`: 成交量 + +可选时间列(用于设置索引): +- `datetime`, `time`, `timestamp`, `date` + +## 输出结果 + +流程完成后,可获得: +- **因子数据** (`factors`): 所有有效因子的时间序列 +- **综合得分** (`score`): 多因子综合得分 +- **检验结果** (`validation`): 各因子的IC、t统计量等 +- **回测结果** (`backtest`): 权益曲线、回测指标、交易记录 + +## 注意事项 + +1. 数据质量:确保输入数据无严重缺失和异常 +2. 参数调优:根据实际数据特点调整阈值和窗口参数 +3. 过拟合风险:避免在样本内过度优化参数 +4. 实盘差异:回测结果仅供参考,实盘可能存在滑点、延迟等问题 + +## 参考文档 + +- `TS因子挖掘构建流程.md`: 详细的因子挖掘理论和方法 +- `deap_factor_mining.py`: 基于遗传编程的因子挖掘示例 + +## 许可证 + +MIT License + diff --git a/TS因子挖掘构建流程.md b/TS因子挖掘构建流程.md new file mode 100644 index 0000000..6f53769 --- /dev/null +++ b/TS因子挖掘构建流程.md @@ -0,0 +1,121 @@ +# 基于BTC 4h数据的时间序列因子模型实操流程(因子挖掘→检验→回测→信号生成) +结合《ssrn.3255748.pdf》中时间序列(TS)因子模型的核心逻辑,以及高维高频金融数据建模的前沿方法(如投影主成分分析P-PCA),以下为针对BTC 4h数据的完整实操流程,包含每一步的理论原理、操作细节及论文引用依据。 + + +## 一、数据准备与预处理(基础步骤,确保数据质量) +### 1. 
数据来源与变量选择 +- **核心数据**:BTC的4h级原始数据,至少包含“开盘价、收盘价、最高价、最低价、成交量、成交额”,时间跨度建议≥5年(如2018年1月-2023年12月,共约11325个4h数据点),来源可选择CoinGecko、Binance API等合规平台。 +- **扩展变量**:基于原始数据计算技术指标变量(作为因子候选),参考《ssrn.3255748.pdf》中“资产特征驱动因子”的逻辑(),具体包括: + - 收益类:4h收益率(\(R_t = \ln(Close_t/Close_{t-1})\))、滚动12期(48h)收益率标准差(波动率)、滚动6期(24h)收益率偏度(尾部风险); + - 趋势类:EMA(指数移动平均,如4期/8期/16期)、MACD(异同移动平均线)、RSI(相对强弱指数,14期); + - 量能类:成交量滚动6期均值、成交额/成交量比值(量价配合度); + - 波动类:ATR(平均真实波幅,14期)、高低价差率(\((High_t-Low_t)/Close_{t-1}\))。 + +### 2. 数据预处理(消除噪声与异常值) +- **异常值处理**:采用“3σ法则”识别异常收益率(如单日涨跌幅>20%的4h数据),用前后相邻数据的线性插值替换,避免极端值对因子估计的干扰——这与中山大学研究中“抑制高频特异性波动”的思路一致(摘要1、5),该研究指出高频数据中的异常波动会扭曲因子估计,需通过预处理降低噪声。 +- **缺失值填补**:若存在数据缺失(如交易所维护导致的断更),采用“前向填充+滚动均值平滑”(如用前3期均值填补),确保时间序列的连续性。 +- **标准化**:对所有候选因子变量进行“Z-score标准化”(\(X_{std}=(X-\mu)/\sigma\),其中\(\mu\)、\(\sigma\)为滚动30期(120h)的均值和标准差),避免量纲差异影响因子权重——参考《ssrn.3255748.pdf》中CS因子“标准化匹配TS因子标准差”的操作逻辑()。 + + +## 二、因子挖掘:基于时间序列逻辑构建候选因子 +### 1. 因子构建原则(贴合TS因子“预设规则、可解释”的核心特性) +根据《ssrn.3255748.pdf》中TS因子的构建逻辑(),BTC 4h因子需满足“基于固定规则、反映特定风险/收益驱动逻辑”,避免CS因子“月度优化、非可投资”的缺陷()。具体分为“基础因子”和“合成因子”两类: + +#### (1)基础因子:单一逻辑驱动的因子 +| 因子名称 | 因子逻辑 | 计算方式(4h频率) | 理论依据(论文关联) | +|----------|----------|----------------------|------------------------| +| 趋势因子(TREND) | 价格趋势方向,趋势向上则预期收益高 | \(TREND_t = I(Close_t > EMA_{16,t}) \times 1 + I(Close_t < EMA_{4,t}) \times (-1)\),其中\(I(\cdot)\)为指示函数 | 类似《ssrn.3255748.pdf》中“特征驱动收益”逻辑(),用EMA交叉反映趋势特征 | +| 波动率因子(VOL) | 波动率越高,风险溢价越高 | \(VOL_t = \text{滚动12期收益率标准差}\) | 对应中山大学研究中“高频波动捕捉风险”的思路(摘要1、5),波动率是高频金融数据的核心风险特征 | +| 量价因子(VOLP) | 量价配合度,成交量放大且价格上涨则动量强 | \(VOLP_t = I(Volume_t > \text{滚动6期Volume均值}) \times R_t\) | 参考《ssrn.3255748.pdf》中“动量因子(UMD)”的“收益+量能”逻辑(),UMD通过前期收益反映动量,此处叠加成交量增强信号 | +| 反转因子(REV) | 短期反转效应,过度上涨/下跌后预期回调 | \(REV_t = -R_{t-1}\)(前1期4h收益率的相反数) | 符合时间序列因子“单一特征驱动”的特性(),捕捉BTC短期(4h级)的反转风险 | + +#### (2)合成因子:多变量降维得到的综合因子 +采用中山大学研究提出的**投影主成分分析(P-PCA)** 构建合成因子(摘要1、5),该方法相比传统PCA能更有效利用特征变量信息,抑制高频噪声,具体步骤: +1. 
**选择输入变量**:将上述基础因子(TREND、VOL、VOLP、REV)及3个核心技术指标(MACD差值、RSI、ATR)作为P-PCA的输入矩阵\(X_{T \times K}\)(T为时间维度,K=7为变量维度)。 +2. **投影步骤**:根据P-PCA理论(摘要1、5),先将输入变量投影到“可观测特征空间”(此处选择“滚动12期收益率”作为工具变量,反映BTC收益的长期动态),得到投影矩阵\(P\); +3. **提取主成分**:对投影后的矩阵\(P \times X\)进行PCA,取前2个主成分(累计方差解释率需≥80%)作为合成因子: + - 合成因子1(PC1):命名为“趋势-量能因子”,权重集中在TREND、VOLP,反映趋势与量能的协同效应; + - 合成因子2(PC2):命名为“风险因子”,权重集中在VOL、ATR,反映4h级的风险暴露程度。 +4. **因子方向校准**:通过“因子与未来1期收益率的相关性”调整方向(如PC1与\(R_{t+1}\)正相关则保留原方向,负相关则取反),确保因子值越高,预期收益越高——贴合《ssrn.3255748.pdf》中“因子收益差为正”的TS因子设计逻辑(如HML=高BM收益-低BM收益)()。 + + +## 三、因子检验:验证因子的有效性与稳健性 +### 1. 因子收益检验(核心:因子能否区分未来收益) +参考《ssrn.3255748.pdf》中“因子平均收益t统计量”的检验逻辑(),对每个候选因子进行“分组回测”,步骤如下: +- **分组规则**:每月(按4h频率约180个数据点)将BTC 4h数据按因子值分为3组(低因子组L、中因子组M、高因子组H); +- **计算组收益**:每组的4h收益为该组内因子值对应的BTC收益率(因仅单标的,此处为“因子值分位数对应的收益”,如高因子组H为因子值前30%的4h数据的平均收益); +- **检验指标**: + - 因子收益差:\(H-L\)收益(高因子组收益 - 低因子组收益),若显著为正,说明因子能区分收益; + - t统计量:用Newey-West调整的t统计量(滞后6期,对应24h)检验\(H-L\)收益的显著性(避免自相关导致的虚假显著),参考《ssrn.3255748.pdf》中“因子平均收益t统计量”的计算方式()。 + +**示例结果要求**:如趋势因子(TREND)的\(H-L\)收益为0.35%/4h(年化约84%),t统计量=2.89(>2,显著),说明该因子有效。 + +### 2. 因子跨度回归(检验因子的边际解释力) +根据《ssrn.3255748.pdf》中“因子跨度回归”的核心逻辑(),检验单个因子能否被其他因子替代,步骤如下: +- **回归模型**:以BTC未来1期4h收益率\(R_{t+1}\)为因变量,以候选因子及控制变量为自变量,构建时间序列回归: + \(R_{t+1} = \alpha + \beta_1 F_{1,t} + \beta_2 F_{2,t} + \dots + \beta_k F_{k,t} + e_{t+1}\) + 其中\(F_1,F_2,...F_k\)为候选因子,\(\alpha\)为定价误差,\(\beta_i\)为因子载荷。 +- **检验标准**:若某因子的\(\beta_i\)显著不为0(t统计量>2),且加入该因子后模型\(R^2\)提升≥5%,说明该因子具有“不可替代的边际解释力”,未被其他因子吸收——类似《ssrn.3255748.pdf》中TS因子“市场、规模因子边际信息显著”的结论()。 + +**示例**:若加入合成因子PC1后,模型\(R^2\)从0.12提升至0.18,PC1的\(\beta=0.25\)(t=3.12),说明PC1具有独立解释力。 + +### 3. 稳健性检验(排除偶然因素) +- **样本外检验**:将数据分为“训练集(2018-2021年)”和“样本外集(2022-2023年)”,若因子在样本外的\(H-L\)收益t统计量仍>1.8(接近显著),说明因子稳健——参考《ssrn.3255748.pdf》中“跨样本验证因子表现”的逻辑()。 +- **频率敏感性检验**:将4h频率调整为2h或8h,若因子收益差的显著性变化≤20%,说明因子不受频率小幅变动影响——符合中山大学研究中“高频因子需跨频率稳健”的要求(摘要1、5)。 + + +## 四、因子组合:构建多因子模型(贴合TS模型“常数载荷、可投资”特性) +### 1. 
因子权重确定(避免CS模型“月度优化”的复杂性) +根据《ssrn.3255748.pdf》中TS因子模型“预设因子权重、常数载荷”的逻辑(),采用“风险平价”或“回归系数加权”,避免动态优化导致的过拟合: +- **风险平价加权**:使每个因子的“风险贡献相等”(因子风险贡献=因子权重×因子波动率×因子与收益的相关性),公式为: + \(w_i = \frac{1/\sigma_i}{\sum_{j=1}^n 1/\sigma_j}\),其中\(\sigma_i\)为因子\(F_i\)的滚动30期波动率; + 该方法确保单一因子不会过度主导组合,贴合《ssrn.3255748.pdf》中“多因子分散风险”的思路(如FF五因子模型的等权重逻辑)()。 +- **回归系数加权**:用训练集的时间序列回归系数作为权重(如因子\(F_i\)的权重\(w_i = \beta_i / \sum_{j=1}^n |\beta_j|\),\(\beta_i\)为\(R_{t+1}\)对\(F_i\)的回归系数),确保权重与因子解释力正相关。 + +### 2. 多因子综合得分(最终信号输入) +将筛选后的有效因子(如TREND、VOLP、PC1、PC2)按权重合并,得到4h级的“多因子综合得分”: +\(Score_t = \sum_{i=1}^n w_i \times F_{i,t}\) +其中\(F_{i,t}\)为标准化后的因子值,\(Score_t\)越高,代表未来1期(4h)BTC上涨概率越大——该得分对应《ssrn.3255748.pdf》中“因子组合预测收益”的逻辑(),通过多因子协同提升预测准确性。 + + +## 五、回测:验证因子模型的实战有效性 +### 1. 回测框架设计(贴合TS因子“可投资”的核心优势) +参考《ssrn.3255748.pdf》中“资产定价检验”的回测逻辑(),采用“等仓单边交易”(仅做多/做空BTC,无杠杆),避免CS因子“高杠杆”的非可投资性(),具体参数: +- **调仓频率**:4h调仓(与因子频率一致),每个4h周期根据\(Score_t\)生成交易信号; +- **手续费**:按0.1%/次(现货交易手续费,参考Binance等平台),滑点按0.05%/次(4h级BTC流动性充足,滑点较低); +- **回测区间**:2019年1月-2023年12月(共约4680个4h数据点,包含牛熊周期,检验模型适应性)。 + +### 2. 交易信号规则(基于多因子得分的阈值策略) +根据《ssrn.3255748.pdf》中“因子值与收益正相关”的结论(),设定阈值生成买卖信号: +- **买入信号**:当\(Score_t > 0.8\sigma_{Score}\)(\(\sigma_{Score}\)为Score的滚动30期标准差),且前1期无持仓时,买入BTC(满仓); +- **卖出信号**:当\(Score_t < -0.8\sigma_{Score}\),且前1期有持仓时,卖出BTC(空仓); +- **观望信号**:当\(Score_t\)在\([-0.8\sigma_{Score}, 0.8\sigma_{Score}]\)之间,维持原有持仓(避免频繁交易)。 + +### 3. 回测指标与评估(参考论文中的资产定价检验指标) +采用《ssrn.3255748.pdf》中“平均收益、夏普比率、最大回撤”等核心指标(),同时加入高频数据特有的“胜率、盈亏比”,具体如下: +| 回测指标 | 计算方式 | 合格标准(BTC 4h策略) | +|----------|----------|--------------------------| +| 年化收益率 | \((1+\text{累计收益})^{252×6/24} - 1\)(假设年交易252天,每天6个4h周期) | >30%(跑赢BTC现货年化收益) | +| 夏普比率 | 年化收益率 / 年化波动率 | >1.5(风险调整收益优秀) | +| 最大回撤 | 回测期间最大亏损幅度 | <50%(控制极端风险) | +| 胜率 | 盈利交易次数 / 总交易次数 | >55%(信号准确性高) | + +**示例结果**:若回测得到年化收益45%、夏普比率1.8、最大回撤42%、胜率58%,说明模型有效——类似《ssrn.3255748.pdf》中“TS因子模型解释力达标”的实证结论()。 + + +## 六、信号优化与上线:动态适应市场变化 +### 1. 
class BacktestEngine:
    """Long/flat, all-in/all-out backtester driven by discrete trade signals.

    Parameters
    ----------
    commission : float
        Proportional fee charged on every fill.
    slippage : float
        Proportional slippage charged on every fill.
    initial_capital : float
        Equity at the first bar.
    periods_per_year : int
        Number of bars per year used to annualise return and volatility.
        The default (252 * 6) keeps the previous hard-coded assumption of
        six 4h bars on 252 trading days; pass e.g. ``365 * 24`` for 1h bars.
    """

    def __init__(
        self,
        commission: float = 0.001,
        slippage: float = 0.0005,
        initial_capital: float = 10000.0,
        periods_per_year: int = 252 * 6,
    ):
        self.commission = commission
        self.slippage = slippage
        self.initial_capital = initial_capital
        self.periods_per_year = periods_per_year

    def run(
        self,
        signals: pd.Series,
        price: pd.Series,
        score: Optional[pd.Series] = None
    ) -> Dict:
        """Simulate the strategy and return equity, returns, trades and metrics.

        Parameters
        ----------
        signals : Series
            Trade signal per bar: 1 = buy, -1 = sell, anything else = hold.
        price : Series
            Execution price per bar; fills happen at the bar's own price.
        score : Series, optional
            Accepted for interface compatibility only; it does not take part
            in the simulation.

        Returns
        -------
        dict
            ``equity`` (Series), ``returns`` (per-bar equity returns, net of
            costs), ``trades`` (list of dicts), ``metrics`` (dict, empty when
            there is no data) and ``final_capital``.
        """
        # Keep only bars where both a signal and a price exist.
        aligned = pd.concat([signals, price], axis=1).dropna()
        aligned.columns = ['signal', 'price']

        if aligned.empty:
            # Nothing to simulate: report the untouched starting capital.
            return {
                'equity': pd.Series(dtype=float),
                'returns': pd.Series(dtype=float),
                'trades': [],
                'metrics': {},
                'final_capital': self.initial_capital
            }

        signal_values = aligned['signal'].to_numpy()
        price_values = aligned['price'].to_numpy(dtype=float)
        cost = self.commission + self.slippage

        capital = self.initial_capital  # realised capital (cash when flat)
        in_position = False
        entry_price = None
        equity = np.empty(len(aligned))
        equity[0] = capital
        trades = []

        # The first bar only seeds the equity curve; trading starts at bar 1.
        for i in range(1, len(aligned)):
            current_signal = signal_values[i]
            current_price = price_values[i]

            # A signal is acted on whenever it is actionable for the current
            # position; the position checks already prevent duplicate fills.
            if current_signal == 1 and not in_position:
                capital *= (1 - cost)
                in_position = True
                entry_price = current_price
                trades.append({
                    'date': aligned.index[i],
                    'action': 'buy',
                    'price': current_price,
                    'capital': capital
                })
            elif current_signal == -1 and in_position:
                # Realise the holding-period P&L before charging exit costs;
                # without this the gain/loss of the trade would be discarded.
                capital *= (current_price / entry_price) * (1 - cost)
                in_position = False
                entry_price = None
                trades.append({
                    'date': aligned.index[i],
                    'action': 'sell',
                    'price': current_price,
                    'capital': capital
                })

            # Mark open positions to market.
            if in_position:
                equity[i] = capital * (current_price / entry_price)
            else:
                equity[i] = capital

        equity_series = pd.Series(equity, index=aligned.index)
        # Derive returns from the equity curve so they reflect the position
        # actually held (and trading costs), not just the raw signal value.
        returns_series = equity_series.pct_change().fillna(0.0)

        metrics = self._calculate_metrics(equity_series, returns_series, len(trades))

        return {
            'equity': equity_series,
            'returns': returns_series,
            'trades': trades,
            'metrics': metrics,
            'final_capital': equity_series.iloc[-1] if len(equity_series) > 0 else self.initial_capital
        }

    def _calculate_metrics(
        self,
        equity: pd.Series,
        returns: pd.Series,
        num_trades: int = 0
    ) -> Dict:
        """Compute summary statistics from an equity curve and its returns.

        Returns an empty dict when either input is empty. Win rate and
        profit/loss ratio are computed per bar over bars with a non-zero
        return, not per round-trip trade.
        """
        if len(equity) == 0 or len(returns) == 0:
            return {}

        periods_per_year = self.periods_per_year
        n_periods = len(returns)

        total_return = equity.iloc[-1] / equity.iloc[0] - 1
        annual_return = (1 + total_return) ** (periods_per_year / n_periods) - 1

        # A single observation has no sample standard deviation.
        period_vol = returns.std() if n_periods > 1 else 0.0
        annual_vol = period_vol * np.sqrt(periods_per_year)

        # The epsilon keeps the ratio finite for a flat equity curve.
        sharpe = annual_return / (annual_vol + 1e-8)

        running_peak = equity.cummax()
        max_drawdown = ((equity - running_peak) / running_peak).min()

        active_returns = returns[returns != 0]
        if len(active_returns) > 0:
            win_rate = (active_returns > 0).sum() / len(active_returns)
        else:
            win_rate = 0

        gains = active_returns[active_returns > 0]
        losses = active_returns[active_returns < 0]
        avg_win = gains.mean() if len(gains) > 0 else 0
        avg_loss = abs(losses.mean()) if len(losses) > 0 else 0
        profit_loss_ratio = avg_win / (avg_loss + 1e-8)

        return {
            'total_return': total_return,
            'annual_return': annual_return,
            'annual_volatility': annual_vol,
            'sharpe_ratio': sharpe,
            'max_drawdown': max_drawdown,
            'win_rate': win_rate,
            'profit_loss_ratio': profit_loss_ratio,
            'total_trades': num_trades  # number of fills (buys + sells)
        }
class MultiFactorModel:
    """Weighted linear combination of factor columns into a single score."""

    def __init__(self, weight_method: str = 'risk_parity'):
        """
        Parameters:
        -----------
        weight_method : str
            Weighting scheme: 'risk_parity', 'regression' or 'equal'.
            The value is only checked when fit() is called.
        """
        self.weight_method = weight_method
        self.weights: Optional[pd.Series] = None
        self.factor_names: List[str] = []

    def fit(
        self,
        factors: pd.DataFrame,
        forward_return: Optional[pd.Series] = None,
        window: Optional[int] = None
    ):
        """
        Estimate factor weights with the configured scheme.

        Parameters:
        -----------
        factors : DataFrame
            One column per factor.
        forward_return : Series, optional
            Forward returns; required by the 'regression' scheme.
        window : int, optional
            Rolling window handed to the weighting function
            ('risk_parity' falls back to 30 when it is None or 0).

        Raises:
        -------
        ValueError
            For an unknown scheme, or 'regression' without forward_return.
        """
        # Column order is recorded first, even if the scheme turns out invalid.
        self.factor_names = list(factors.columns)
        scheme = self.weight_method

        if scheme == 'risk_parity':
            self.weights = risk_parity_weights(factors, window=window or 30)
            return
        if scheme == 'equal':
            self.weights = equal_weights(factors)
            return
        if scheme != 'regression':
            raise ValueError(f"未知的权重方法: {self.weight_method}")

        if forward_return is None:
            raise ValueError("回归权重需要提供forward_return")
        self.weights = regression_weights(factors, forward_return, window=window)

    def predict(self, factors: pd.DataFrame) -> pd.Series:
        """
        Compute the composite score for every row of ``factors``.

        Parameters:
        -----------
        factors : DataFrame
            Must contain every column seen by fit(); extra columns are ignored.

        Returns:
        --------
        Series: row-wise weighted sum of the fitted factor columns.

        Raises:
        -------
        ValueError
            If the model has not been fitted.
        """
        if self.weights is None:
            raise ValueError("模型尚未拟合,请先调用fit()")

        # Select the fitted columns in their fitted order, then weight and sum.
        selected = factors.loc[:, self.factor_names]
        weighted = selected.mul(self.weights)
        return weighted.sum(axis=1)

    def get_weights(self) -> pd.Series:
        """Return a copy of the fitted weights, or an empty Series if unfitted."""
        if self.weights is None:
            return pd.Series()
        return self.weights.copy()
def preprocess_data(
    df: pd.DataFrame,
    outlier_threshold: float = 3.0,
    fill_method: str = 'ffill',
    normalize_window: int = 30
) -> pd.DataFrame:
    """
    Clean a feature frame: outlier repair, gap filling, rolling z-scores.

    The input is not modified; a processed copy is returned.

    Parameters:
    -----------
    df : DataFrame
        Raw data.
    outlier_threshold : float
        Number of rolling standard deviations beyond which a 'return'
        value is treated as an outlier.
    fill_method : str
        Gap filling: 'ffill', 'bfill' or 'mean' (3-period rolling mean).
        Any other value leaves missing values untouched.
    normalize_window : int
        Rolling window for outlier detection and z-score normalisation.

    Returns:
    --------
    DataFrame: the cleaned data plus one '<col>_norm' column for every
    numeric column other than open/high/low/close/volume.
    """
    result = df.copy()
    numeric_cols = result.select_dtypes(include=[np.number]).columns

    # Step 1: outliers are only looked for in the 'return' column.
    if 'return' in numeric_cols:
        ret = result['return']
        ret_window = ret.rolling(normalize_window, min_periods=1)
        deviation = (ret - ret_window.mean()).abs()
        is_outlier = deviation > (outlier_threshold * ret_window.std())
        if is_outlier.any():
            # Blank the outliers, then bridge them by linear interpolation.
            result.loc[is_outlier, 'return'] = np.nan
            result['return'] = result['return'].interpolate(method='linear')

    # Step 2: gap filling, dispatched on the requested method.
    fillers = {
        'ffill': lambda frame: frame.ffill(),
        'bfill': lambda frame: frame.bfill(),
        'mean': lambda frame: frame.fillna(frame.rolling(3, min_periods=1).mean()),
    }
    filler = fillers.get(fill_method)
    if filler is not None:
        result = filler(result)

    # Step 3: rolling z-score for every numeric column except raw OHLCV.
    raw_ohlcv = {'open', 'high', 'low', 'close', 'volume'}
    targets = [name for name in numeric_cols if name not in raw_ohlcv]

    if targets:
        features = result[targets]
        feature_window = features.rolling(normalize_window, min_periods=1)
        # The epsilon avoids division by zero on constant windows.
        zscores = (features - feature_window.mean()) / (feature_window.std() + 1e-8)
        for name in targets:
            result[f'{name}_norm'] = zscores[name]

    return result
def example_custom_factors():
    """Example: register a custom factor and run the pipeline step by step.

    Returns the FactorPipeline instance after the backtest step, so the
    caller can inspect its attributes.
    """
    # pandas is only needed for the annotations of the factor function below.
    import pandas as pd

    def my_custom_factor(data: pd.DataFrame) -> pd.Series:
        """Custom factor: relative distance between close and the 8-period EMA."""
        return (data['close'] - data['ema8']) / data['ema8']

    # Start from the default factor set and add the custom one.
    miner = create_default_factors()
    miner.register_rule_factor('CUSTOM_DISTANCE', my_custom_factor)

    # Run the pipeline with the extended miner.
    pipeline = FactorPipeline()
    pipeline.load_and_preprocess("ETH_USDT-1h.feather")
    pipeline.mine_factors(custom_miner=miner)
    pipeline.validate_factors()
    pipeline.combine_factors()
    pipeline.backtest()

    return pipeline
def create_trend_factor(data: pd.DataFrame) -> pd.Series:
    """Trend factor: I(close > ema16) - I(close < ema4).

    Reads the 'close', 'ema16' and 'ema4' columns of ``data`` and returns an
    integer Series on the same index with values in {-1, 0, 1}:

    * +1 when close is above ema16 and not below ema4,
    * -1 when close is below ema4 and not above ema16,
    *  0 otherwise, including when both conditions hold at once (the two
       indicators cancel) and when a comparison involves NaN.
    """
    above_slow = (data['close'] > data['ema16']).astype(int)
    below_fast = (data['close'] < data['ema4']).astype(int)
    # Summing the signed indicators lets conflicting signals cancel to 0
    # instead of letting the bearish condition silently override the bullish.
    return above_slow - below_fast
class FactorPipeline:
    """Orchestrates the workflow: load, mine, validate, combine, signal, backtest.

    Each step stores its result on the instance so the steps can be run one
    by one or chained; steps 1-4 return ``self``.
    """

    # Horizontal rule printed above and below each step title.
    _RULE = "=" * 50

    def __init__(
        self,
        ret_horizon: int = 1,
        ic_window: int = 30,
        commission: float = 0.001,
        slippage: float = 0.0005
    ):
        """
        Parameters:
        -----------
        ret_horizon : int
            Horizon (in bars) of the forward return.
        ic_window : int
            Window passed to the factor validation.
        commission : float
            Commission rate passed to the backtest engine.
        slippage : float
            Slippage passed to the backtest engine.
        """
        self.ret_horizon = ret_horizon
        self.ic_window = ic_window
        self.commission = commission
        self.slippage = slippage

        # Populated progressively by the pipeline steps.
        self.data: Optional[pd.DataFrame] = None
        self.factors: Optional[pd.DataFrame] = None
        self.forward_return: Optional[pd.Series] = None
        self.factor_miner: Optional[FactorMiner] = None
        self.validation_results: Dict = {}
        self.model: Optional[MultiFactorModel] = None
        self.score: Optional[pd.Series] = None
        self.backtest_results: Optional[Dict] = None

    @classmethod
    def _banner(cls, title, leading_blank=True):
        """Print a step title framed by two rules, optionally after a blank line."""
        prefix = "\n" if leading_blank else ""
        print(prefix + cls._RULE)
        print(title)
        print(cls._RULE)

    def load_and_preprocess(self, file_path: str) -> 'FactorPipeline':
        """Step 1: load the file, add indicators, preprocess, compute forward returns."""
        self._banner("步骤1:加载和预处理数据", leading_blank=False)

        self.data = load_data(file_path)
        print(f"加载数据: {len(self.data)} 条记录")

        self.data = compute_technical_indicators(self.data)
        print("计算技术指标完成")

        self.data = preprocess_data(self.data)
        print("数据预处理完成")

        close = self.data['close']
        self.forward_return = compute_forward_returns(close, horizon=self.ret_horizon)
        print(f"计算未来收益率完成(周期={self.ret_horizon})")

        return self

    def mine_factors(self, custom_miner: Optional[FactorMiner] = None) -> 'FactorPipeline':
        """Step 2: compute all factors with the given miner (default set if None)."""
        self._banner("步骤2:因子挖掘")

        if self.data is None:
            raise ValueError("请先加载数据")

        self.factor_miner = create_default_factors() if custom_miner is None else custom_miner

        self.factors = self.factor_miner.compute_all_factors(self.data)
        print(f"计算因子完成: {list(self.factors.columns)}")

        return self

    def validate_factors(self, min_ic: float = 0.01, min_tstat: float = 1.5) -> 'FactorPipeline':
        """Step 3: validate every factor and keep those passing both thresholds.

        A factor passes when |mean_ic| >= min_ic and |mean_h_l_tstat| >= min_tstat.
        If no factor passes, the factor set is left unchanged and a warning
        is printed.
        """
        self._banner("步骤3:因子检验")

        if self.factors is None or self.forward_return is None:
            raise ValueError("请先完成因子挖掘")

        passed = []
        self.validation_results = {}

        for name in self.factors.columns:
            outcome = validate_factor(
                self.factors[name],
                self.forward_return,
                ic_window=self.ic_window
            )
            self.validation_results[name] = outcome

            # Written as a positive test so NaN statistics count as a failure.
            is_valid = (abs(outcome['mean_ic']) >= min_ic and
                        abs(outcome['mean_h_l_tstat']) >= min_tstat)
            if not is_valid:
                print(f"\n因子 {name} 未通过检验 (IC={outcome['mean_ic']:.4f}, t={outcome['mean_h_l_tstat']:.4f})")
                continue

            passed.append(name)
            print(f"\n因子 {name}:")
            print(f" 平均IC: {outcome['mean_ic']:.4f}")
            print(f" IC信息比率: {outcome['ic_ir']:.4f}")
            print(f" H-L收益差: {outcome['mean_h_l_return']:.4f}")
            print(f" H-L t统计量: {outcome['mean_h_l_tstat']:.4f}")

        if not passed:
            print("\n警告:没有因子通过检验!")
        else:
            self.factors = self.factors[passed]
            print(f"\n有效因子: {passed}")

        return self

    def combine_factors(
        self,
        weight_method: str = 'risk_parity',
        window: Optional[int] = None
    ) -> 'FactorPipeline':
        """Step 4: fit a multi-factor model and compute the composite score."""
        self._banner("步骤4:因子组合")

        if self.factors is None or len(self.factors.columns) == 0:
            raise ValueError("没有有效因子可组合")

        model = MultiFactorModel(weight_method=weight_method)
        self.model = model
        model.fit(self.factors, forward_return=self.forward_return, window=window)

        self.score = model.predict(self.factors)

        print("因子权重:")
        for factor_name, weight in model.get_weights().items():
            print(f" {factor_name}: {weight:.4f}")

        print("\n综合得分统计:")
        print(f" 均值: {self.score.mean():.4f}")
        print(f" 标准差: {self.score.std():.4f}")

        return self

    def generate_signals(
        self,
        buy_threshold: float = 0.8,
        sell_threshold: float = -0.8,
        window: int = 30
    ) -> pd.Series:
        """Step 5: turn the composite score into trade signals."""
        if self.score is None:
            raise ValueError("请先完成因子组合")

        # Resolves to the module-level generate_signals function, not this method.
        return generate_signals(
            self.score,
            buy_threshold=buy_threshold,
            sell_threshold=sell_threshold,
            window=window
        )

    def backtest(
        self,
        signals: Optional[pd.Series] = None,
        buy_threshold: float = 0.8,
        sell_threshold: float = -0.8,
        window: int = 30
    ) -> Dict:
        """Step 6: backtest the signals on the close price and print the metrics.

        When ``signals`` is None they are generated from the composite score
        with the given thresholds and window.
        """
        self._banner("步骤6:回测")

        if self.data is None:
            raise ValueError("请先加载数据")

        if signals is None:
            signals = self.generate_signals(buy_threshold, sell_threshold, window)

        engine = BacktestEngine(commission=self.commission, slippage=self.slippage)
        self.backtest_results = engine.run(signals, self.data['close'], score=self.score)

        metrics = self.backtest_results['metrics']

        def as_percent(value):
            return f"{value*100:.2f}%"

        def as_ratio(value):
            return f"{value:.2f}"

        def as_plain(value):
            return f"{value}"

        # (label, metrics key, formatter); missing keys are reported as 0.
        report_rows = (
            ("总收益率", 'total_return', as_percent),
            ("年化收益率", 'annual_return', as_percent),
            ("年化波动率", 'annual_volatility', as_percent),
            ("夏普比率", 'sharpe_ratio', as_ratio),
            ("最大回撤", 'max_drawdown', as_percent),
            ("胜率", 'win_rate', as_percent),
            ("盈亏比", 'profit_loss_ratio', as_ratio),
            ("交易次数", 'total_trades', as_plain),
        )
        print("\n回测结果:")
        for label, key, render in report_rows:
            print(f" {label}: {render(metrics.get(key, 0))}")

        return self.backtest_results

    def run_full_pipeline(
        self,
        file_path: str,
        custom_miner: Optional[FactorMiner] = None,
        min_ic: float = 0.01,
        min_tstat: float = 1.5,
        weight_method: str = 'risk_parity',
        buy_threshold: float = 0.8,
        sell_threshold: float = -0.8
    ) -> Dict:
        """Run steps 1-6 in order and return factors, score, validation and backtest."""
        self.load_and_preprocess(file_path)
        self.mine_factors(custom_miner)
        self.validate_factors(min_ic, min_tstat)
        self.combine_factors(weight_method)
        self.backtest(buy_threshold=buy_threshold, sell_threshold=sell_threshold)

        return {
            'factors': self.factors,
            'score': self.score,
            'validation': self.validation_results,
            'backtest': self.backtest_results
        }
def generate_signals_with_position(
    score: 'pd.Series',
    buy_threshold: float = 0.8,
    sell_threshold: float = -0.8,
    window: int = 30,
    current_position: int = 0
) -> 'pd.Series':
    """
    Generate trade signals starting from a known position.

    The raw threshold hits are computed directly from ``score`` and a single
    position state machine is run from ``current_position``.  Running the
    state machine on output that was already filtered for a flat start would
    drop any sell trigger that precedes the first buy trigger, so a caller
    that starts long would miss its exit.

    Parameters:
    -----------
    score : Series
        Combined factor score.
    buy_threshold : float
        Buy threshold, as a multiple of the rolling standard deviation.
    sell_threshold : float
        Sell threshold, as a multiple of the rolling standard deviation.
    window : int
        Rolling window used for the standard deviation.
    current_position : int
        Position at the start of the series: 0 = flat, 1 = long.

    Returns:
    --------
    Series: trade signals (1 = buy, -1 = sell, 0 = no trade), indexed like
    ``score``.
    """
    rolling_std = score.rolling(window).std()

    # Comparisons against NaN (warm-up bars) are False, so no trade is
    # triggered before the rolling window is full.
    buy_hits = (score > buy_threshold * rolling_std).to_numpy()
    sell_hits = (score < sell_threshold * rolling_std).to_numpy()

    trades = np.zeros(len(score), dtype=int)
    position = current_position

    for i, (wants_buy, wants_sell) in enumerate(zip(buy_hits, sell_hits)):
        # If both thresholds are hit on the same bar the sell wins, which is
        # the precedence generate_signals uses for its raw signal.
        if wants_sell:
            if position == 1:
                trades[i] = -1
                position = 0
        elif wants_buy and position == 0:
            trades[i] = 1
            position = 1

    return pd.Series(trades, index=score.index)
def group_backtest(
    factor: pd.Series,
    forward_return: pd.Series,
    n_groups: int = 3,
    group_period: int = 180
) -> Dict:
    """
    Grouped backtest: bucket observations by factor value inside sliding
    windows and compare the mean forward return of the buckets.

    Parameters:
    -----------
    factor : Series
        Factor values.
    forward_return : Series
        Forward returns, aligned with ``factor`` on the index.
    n_groups : int
        Number of quantile buckets per window.
    group_period : int
        Number of observations per window.

    Returns:
    --------
    dict with keys:
        'group_returns'   : list of per-window mean return by bucket
        'h_l_return'      : list of per-window (top bucket - bottom bucket)
        'h_l_tstat'       : always an empty list (kept for compatibility)
        'periods'         : last index label of each window in 'h_l_return'
        'mean_h_l_return' : mean of 'h_l_return' (0 if there is none)
        'mean_h_l_tstat'  : t-statistic of the mean (0 if undefined)
        'h_l_series'      : 'h_l_return' as a Series indexed by 'periods'
    """
    aligned = pd.concat([factor, forward_return], axis=1).dropna()
    aligned.columns = ['factor', 'return']

    results = {
        'group_returns': [],
        'h_l_return': [],
        'h_l_tstat': [],
        'periods': []
    }

    # Windows advance by half a period, but never by fewer than 90 rows.
    step = max(group_period // 2, 90)

    # "+ 1" so the last full window is included; without it a sample of
    # exactly group_period rows would produce no window at all.
    for start in range(0, len(aligned) - group_period + 1, step):
        period_data = aligned.iloc[start:start + group_period]

        if len(period_data) < 30:
            continue

        try:
            period_data = period_data.copy()
            period_data['group'] = pd.qcut(
                period_data['factor'],
                q=n_groups,
                labels=False,
                duplicates='drop'
            )

            group_returns = period_data.groupby('group')['return'].mean()
            results['group_returns'].append(group_returns)

            # High-minus-low spread needs at least two distinct buckets
            if len(group_returns) >= 2:
                h_return = group_returns.iloc[-1]  # highest-factor bucket
                l_return = group_returns.iloc[0]   # lowest-factor bucket
                results['h_l_return'].append(h_return - l_return)
                results['periods'].append(period_data.index[-1])
        except (ValueError, KeyError):
            # Skip windows where quantile bucketing fails
            continue

    if results['h_l_return']:
        h_l_series = pd.Series(results['h_l_return'], index=results['periods'])
        mean_h_l = h_l_series.mean()
        n_periods = len(h_l_series)

        if n_periods > 1:
            std_h_l = h_l_series.std()
            # 1e-8 guards against a zero standard error
            t_stat = mean_h_l / (std_h_l / np.sqrt(n_periods) + 1e-8)
        else:
            # The sample std of a single window is undefined (NaN); report 0
            # like the no-window case instead of propagating NaN.
            t_stat = 0.0

        results['mean_h_l_return'] = mean_h_l
        results['mean_h_l_tstat'] = t_stat
        results['h_l_series'] = h_l_series
    else:
        results['mean_h_l_return'] = 0
        results['mean_h_l_tstat'] = 0
        # Same keys as the non-empty case so callers need no special-casing
        results['h_l_series'] = pd.Series(dtype=float)

    return results
'mean_h_l_return': group_result['mean_h_l_return'], + 'mean_h_l_tstat': group_result['mean_h_l_tstat'], + 'group_returns': group_result['group_returns'] + } +