diff --git a/README.md b/README.md new file mode 100644 index 0000000..8aa16db --- /dev/null +++ b/README.md @@ -0,0 +1,211 @@ +# 时间序列因子挖掘框架 + +一套简洁、灵活的时间序列因子挖掘、检验、回测、信号生成框架。 + +## 特性 + +- **流程化设计**:清晰的步骤划分,易于理解和扩展 +- **灵活度高**:支持自定义因子、权重方法、信号规则 +- **代码简洁**:避免过度设计,核心逻辑清晰 +- **完整流程**:从数据预处理到信号生成的完整链路 + +## 项目结构 + +``` +factorhack/ +├── data.py # 数据加载和预处理 +├── factors.py # 因子挖掘(规则因子、GP因子) +├── validation.py # 因子检验(IC、分组回测、回归) +├── combination.py # 因子组合(多因子模型) +├── backtest.py # 回测引擎 +├── signal.py # 信号生成 +├── pipeline.py # 主流程 +├── example.py # 使用示例 +└── README.md # 说明文档 +``` + +## 快速开始 + +### 1. 安装依赖 + +```bash +pip install pandas numpy scipy statsmodels +``` + +### 2. 基本使用 + +```python +from pipeline import FactorPipeline + +# 创建流程 +pipeline = FactorPipeline( + ret_horizon=1, # 未来1期收益率 + ic_window=30, # IC计算窗口 + commission=0.001, # 手续费0.1% + slippage=0.0005 # 滑点0.05% +) + +# 运行完整流程 +results = pipeline.run_full_pipeline( + file_path="ETH_USDT-1h.feather", + min_ic=0.01, # 最小IC阈值 + min_tstat=1.5, # 最小t统计量 + weight_method='risk_parity', + buy_threshold=0.8, + sell_threshold=-0.8 +) +``` + +### 3. 分步骤执行 + +```python +pipeline = FactorPipeline() + +# 步骤1:加载和预处理数据 +pipeline.load_and_preprocess("ETH_USDT-1h.feather") + +# 步骤2:因子挖掘 +pipeline.mine_factors() + +# 步骤3:因子检验 +pipeline.validate_factors(min_ic=0.01, min_tstat=1.5) + +# 步骤4:因子组合 +pipeline.combine_factors(weight_method='risk_parity') + +# 步骤5:生成信号 +signals = pipeline.generate_signals(buy_threshold=0.8, sell_threshold=-0.8) + +# 步骤6:回测 +backtest_results = pipeline.backtest(signals) +``` + +## 核心模块说明 + +### 1. 数据模块 (`data.py`) + +- `load_data()`: 加载数据(支持feather和csv) +- `compute_technical_indicators()`: 计算技术指标 +- `preprocess_data()`: 数据预处理(异常值、缺失值、标准化) +- `compute_forward_returns()`: 计算未来收益率 + +### 2. 
因子模块 (`factors.py`) + +- `BaseFactor`: 因子基类 +- `RuleFactor`: 规则因子 +- `FactorMiner`: 因子挖掘器 +- `create_default_factors()`: 创建默认因子集合 + +**默认因子**: +- `TREND`: 趋势因子 +- `VOL`: 波动率因子 +- `VOLP`: 量价因子 +- `REV`: 反转因子 +- `MOM`: 动量因子 +- `RSI`: RSI因子 + +### 3. 检验模块 (`validation.py`) + +- `compute_ic()`: 计算IC(信息系数) +- `compute_rolling_ic()`: 计算滚动IC +- `group_backtest()`: 分组回测 +- `factor_span_regression()`: 因子跨度回归 +- `validate_factor()`: 综合因子检验 + +### 4. 组合模块 (`combination.py`) + +- `risk_parity_weights()`: 风险平价权重 +- `regression_weights()`: 回归系数权重 +- `equal_weights()`: 等权重 +- `MultiFactorModel`: 多因子模型 + +### 5. 回测模块 (`backtest.py`) + +- `BacktestEngine`: 回测引擎 +- 支持手续费、滑点 +- 计算年化收益率、夏普比率、最大回撤、胜率等指标 + +### 6. 信号模块 (`signal.py`) + +- `generate_signals()`: 基于因子得分生成买卖信号 +- 支持滚动标准差阈值 +- 避免频繁交易 + +## 自定义扩展 + +### 添加自定义因子 + +```python +from factors import FactorMiner, RuleFactor, create_default_factors +import pandas as pd + +def my_custom_factor(data: pd.DataFrame) -> pd.Series: + """自定义因子""" + return (data['close'] - data['ema8']) / data['ema8'] + +miner = create_default_factors() +miner.register_rule_factor('CUSTOM', my_custom_factor) +``` + +### 使用不同的权重方法 + +```python +# 风险平价 +pipeline.combine_factors(weight_method='risk_parity') + +# 回归系数 +pipeline.combine_factors(weight_method='regression') + +# 等权重 +pipeline.combine_factors(weight_method='equal') +``` + +### 自定义信号规则 + +```python +from signal import generate_signals + +signals = generate_signals( + score=pipeline.score, + buy_threshold=1.0, # 买入阈值 + sell_threshold=-1.0, # 卖出阈值 + window=30 # 滚动窗口 +) +``` + +## 数据格式要求 + +输入数据应包含以下列: +- `open`: 开盘价 +- `high`: 最高价 +- `low`: 最低价 +- `close`: 收盘价 +- `volume`: 成交量 + +可选时间列(用于设置索引): +- `datetime`, `time`, `timestamp`, `date` + +## 输出结果 + +流程完成后,可获得: +- **因子数据** (`factors`): 所有有效因子的时间序列 +- **综合得分** (`score`): 多因子综合得分 +- **检验结果** (`validation`): 各因子的IC、t统计量等 +- **回测结果** (`backtest`): 权益曲线、回测指标、交易记录 + +## 注意事项 + +1. 数据质量:确保输入数据无严重缺失和异常 +2. 参数调优:根据实际数据特点调整阈值和窗口参数 +3. 过拟合风险:避免在样本内过度优化参数 +4. 
实盘差异:回测结果仅供参考,实盘可能存在滑点、延迟等问题 + +## 参考文档 + +- `TS因子挖掘构建流程.md`: 详细的因子挖掘理论和方法 +- `deap_factor_mining.py`: 基于遗传编程的因子挖掘示例 + +## 许可证 + +MIT License + diff --git a/TS因子挖掘构建流程.md b/TS因子挖掘构建流程.md new file mode 100644 index 0000000..6f53769 --- /dev/null +++ b/TS因子挖掘构建流程.md @@ -0,0 +1,121 @@ +# 基于BTC 4h数据的时间序列因子模型实操流程(因子挖掘→检验→回测→信号生成) +结合《ssrn.3255748.pdf》中时间序列(TS)因子模型的核心逻辑,以及高维高频金融数据建模的前沿方法(如投影主成分分析P-PCA),以下为针对BTC 4h数据的完整实操流程,包含每一步的理论原理、操作细节及论文引用依据。 + + +## 一、数据准备与预处理(基础步骤,确保数据质量) +### 1. 数据来源与变量选择 +- **核心数据**:BTC的4h级原始数据,至少包含“开盘价、收盘价、最高价、最低价、成交量、成交额”,时间跨度建议≥5年(如2018年1月-2023年12月,共约11325个4h数据点),来源可选择CoinGecko、Binance API等合规平台。 +- **扩展变量**:基于原始数据计算技术指标变量(作为因子候选),参考《ssrn.3255748.pdf》中“资产特征驱动因子”的逻辑(),具体包括: + - 收益类:4h收益率(\(R_t = \ln(Close_t/Close_{t-1})\))、滚动12期(48h)收益率标准差(波动率)、滚动6期(24h)收益率偏度(尾部风险); + - 趋势类:EMA(指数移动平均,如4期/8期/16期)、MACD(异同移动平均线)、RSI(相对强弱指数,14期); + - 量能类:成交量滚动6期均值、成交额/成交量比值(量价配合度); + - 波动类:ATR(平均真实波幅,14期)、高低价差率(\((High_t-Low_t)/Close_{t-1}\))。 + +### 2. 数据预处理(消除噪声与异常值) +- **异常值处理**:采用“3σ法则”识别异常收益率(如单日涨跌幅>20%的4h数据),用前后相邻数据的线性插值替换,避免极端值对因子估计的干扰——这与中山大学研究中“抑制高频特异性波动”的思路一致(摘要1、5),该研究指出高频数据中的异常波动会扭曲因子估计,需通过预处理降低噪声。 +- **缺失值填补**:若存在数据缺失(如交易所维护导致的断更),采用“前向填充+滚动均值平滑”(如用前3期均值填补),确保时间序列的连续性。 +- **标准化**:对所有候选因子变量进行“Z-score标准化”(\(X_{std}=(X-\mu)/\sigma\),其中\(\mu\)、\(\sigma\)为滚动30期(120h)的均值和标准差),避免量纲差异影响因子权重——参考《ssrn.3255748.pdf》中CS因子“标准化匹配TS因子标准差”的操作逻辑()。 + + +## 二、因子挖掘:基于时间序列逻辑构建候选因子 +### 1. 
因子构建原则(贴合TS因子“预设规则、可解释”的核心特性) +根据《ssrn.3255748.pdf》中TS因子的构建逻辑(),BTC 4h因子需满足“基于固定规则、反映特定风险/收益驱动逻辑”,避免CS因子“月度优化、非可投资”的缺陷()。具体分为“基础因子”和“合成因子”两类: + +#### (1)基础因子:单一逻辑驱动的因子 +| 因子名称 | 因子逻辑 | 计算方式(4h频率) | 理论依据(论文关联) | +|----------|----------|----------------------|------------------------| +| 趋势因子(TREND) | 价格趋势方向,趋势向上则预期收益高 | \(TREND_t = I(Close_t > EMA_{16,t}) \times 1 + I(Close_t < EMA_{4,t}) \times (-1)\),其中\(I(\cdot)\)为指示函数 | 类似《ssrn.3255748.pdf》中“特征驱动收益”逻辑(),用EMA交叉反映趋势特征 | +| 波动率因子(VOL) | 波动率越高,风险溢价越高 | \(VOL_t = \text{滚动12期收益率标准差}\) | 对应中山大学研究中“高频波动捕捉风险”的思路(摘要1、5),波动率是高频金融数据的核心风险特征 | +| 量价因子(VOLP) | 量价配合度,成交量放大且价格上涨则动量强 | \(VOLP_t = I(Volume_t > \text{滚动6期Volume均值}) \times R_t\) | 参考《ssrn.3255748.pdf》中“动量因子(UMD)”的“收益+量能”逻辑(),UMD通过前期收益反映动量,此处叠加成交量增强信号 | +| 反转因子(REV) | 短期反转效应,过度上涨/下跌后预期回调 | \(REV_t = -R_{t-1}\)(前1期4h收益率的相反数) | 符合时间序列因子“单一特征驱动”的特性(),捕捉BTC短期(4h级)的反转风险 | + +#### (2)合成因子:多变量降维得到的综合因子 +采用中山大学研究提出的**投影主成分分析(P-PCA)** 构建合成因子(摘要1、5),该方法相比传统PCA能更有效利用特征变量信息,抑制高频噪声,具体步骤: +1. **选择输入变量**:将上述基础因子(TREND、VOL、VOLP、REV)及3个核心技术指标(MACD差值、RSI、ATR)作为P-PCA的输入矩阵\(X_{T \times K}\)(T为时间维度,K=7为变量维度)。 +2. **投影步骤**:根据P-PCA理论(摘要1、5),先将输入变量投影到“可观测特征空间”(此处选择“滚动12期收益率”作为工具变量,反映BTC收益的长期动态),得到投影矩阵\(P\); +3. **提取主成分**:对投影后的矩阵\(P \times X\)进行PCA,取前2个主成分(累计方差解释率需≥80%)作为合成因子: + - 合成因子1(PC1):命名为“趋势-量能因子”,权重集中在TREND、VOLP,反映趋势与量能的协同效应; + - 合成因子2(PC2):命名为“风险因子”,权重集中在VOL、ATR,反映4h级的风险暴露程度。 +4. **因子方向校准**:通过“因子与未来1期收益率的相关性”调整方向(如PC1与\(R_{t+1}\)正相关则保留原方向,负相关则取反),确保因子值越高,预期收益越高——贴合《ssrn.3255748.pdf》中“因子收益差为正”的TS因子设计逻辑(如HML=高BM收益-低BM收益)()。 + + +## 三、因子检验:验证因子的有效性与稳健性 +### 1. 
因子收益检验(核心:因子能否区分未来收益) +参考《ssrn.3255748.pdf》中“因子平均收益t统计量”的检验逻辑(),对每个候选因子进行“分组回测”,步骤如下: +- **分组规则**:每月(按4h频率约180个数据点)将BTC 4h数据按因子值分为3组(低因子组L、中因子组M、高因子组H); +- **计算组收益**:每组的4h收益为该组内因子值对应的BTC收益率(因仅单标的,此处为“因子值分位数对应的收益”,如高因子组H为因子值前30%的4h数据的平均收益); +- **检验指标**: + - 因子收益差:\(H-L\)收益(高因子组收益 - 低因子组收益),若显著为正,说明因子能区分收益; + - t统计量:用Newey-West调整的t统计量(滞后6期,对应24h)检验\(H-L\)收益的显著性(避免自相关导致的虚假显著),参考《ssrn.3255748.pdf》中“因子平均收益t统计量”的计算方式()。 + +**示例结果要求**:如趋势因子(TREND)的\(H-L\)收益为0.35%/4h(年化约84%),t统计量=2.89(>2,显著),说明该因子有效。 + +### 2. 因子跨度回归(检验因子的边际解释力) +根据《ssrn.3255748.pdf》中“因子跨度回归”的核心逻辑(),检验单个因子能否被其他因子替代,步骤如下: +- **回归模型**:以BTC未来1期4h收益率\(R_{t+1}\)为因变量,以候选因子及控制变量为自变量,构建时间序列回归: + \(R_{t+1} = \alpha + \beta_1 F_{1,t} + \beta_2 F_{2,t} + \dots + \beta_k F_{k,t} + e_{t+1}\) + 其中\(F_1,F_2,\dots,F_k\)为候选因子,\(\alpha\)为定价误差,\(\beta_i\)为因子载荷。 +- **检验标准**:若某因子的\(\beta_i\)显著不为0(t统计量>2),且加入该因子后模型\(R^2\)提升≥5%,说明该因子具有“不可替代的边际解释力”,未被其他因子吸收——类似《ssrn.3255748.pdf》中TS因子“市场、规模因子边际信息显著”的结论()。 + +**示例**:若加入合成因子PC1后,模型\(R^2\)从0.12提升至0.18,PC1的\(\beta=0.25\)(t=3.12),说明PC1具有独立解释力。 + +### 3. 稳健性检验(排除偶然因素) +- **样本外检验**:将数据分为“训练集(2018-2021年)”和“样本外集(2022-2023年)”,若因子在样本外的\(H-L\)收益t统计量仍>1.8(接近显著),说明因子稳健——参考《ssrn.3255748.pdf》中“跨样本验证因子表现”的逻辑()。 +- **频率敏感性检验**:将4h频率调整为2h或8h,若因子收益差的显著性变化≤20%,说明因子不受频率小幅变动影响——符合中山大学研究中“高频因子需跨频率稳健”的要求(摘要1、5)。 + + +## 四、因子组合:构建多因子模型(贴合TS模型“常数载荷、可投资”特性) +### 1. 因子权重确定(避免CS模型“月度优化”的复杂性) +根据《ssrn.3255748.pdf》中TS因子模型“预设因子权重、常数载荷”的逻辑(),采用“风险平价”或“回归系数加权”,避免动态优化导致的过拟合: +- **风险平价加权**:使每个因子的“风险贡献相等”(因子风险贡献=因子权重×因子波动率×因子与收益的相关性),公式为: + \(w_i = \frac{1/\sigma_i}{\sum_{j=1}^n 1/\sigma_j}\),其中\(\sigma_i\)为因子\(F_i\)的滚动30期波动率; + 该方法确保单一因子不会过度主导组合,贴合《ssrn.3255748.pdf》中“多因子分散风险”的思路(如FF五因子模型的等权重逻辑)()。 +- **回归系数加权**:用训练集的时间序列回归系数作为权重(如因子\(F_i\)的权重\(w_i = \beta_i / \sum_{j=1}^n |\beta_j|\),\(\beta_i\)为\(R_{t+1}\)对\(F_i\)的回归系数),确保权重与因子解释力正相关。 + +### 2. 
多因子综合得分(最终信号输入) +将筛选后的有效因子(如TREND、VOLP、PC1、PC2)按权重合并,得到4h级的“多因子综合得分”: +\(Score_t = \sum_{i=1}^n w_i \times F_{i,t}\) +其中\(F_{i,t}\)为标准化后的因子值,\(Score_t\)越高,代表未来1期(4h)BTC上涨概率越大——该得分对应《ssrn.3255748.pdf》中“因子组合预测收益”的逻辑(),通过多因子协同提升预测准确性。 + + +## 五、回测:验证因子模型的实战有效性 +### 1. 回测框架设计(贴合TS因子“可投资”的核心优势) +参考《ssrn.3255748.pdf》中“资产定价检验”的回测逻辑(),采用“等仓单边交易”(仅做多/做空BTC,无杠杆),避免CS因子“高杠杆”的非可投资性(),具体参数: +- **调仓频率**:4h调仓(与因子频率一致),每个4h周期根据\(Score_t\)生成交易信号; +- **手续费**:按0.1%/次(现货交易手续费,参考Binance等平台),滑点按0.05%/次(4h级BTC流动性充足,滑点较低); +- **回测区间**:2019年1月-2023年12月(共约4680个4h数据点,包含牛熊周期,检验模型适应性)。 + +### 2. 交易信号规则(基于多因子得分的阈值策略) +根据《ssrn.3255748.pdf》中“因子值与收益正相关”的结论(),设定阈值生成买卖信号: +- **买入信号**:当\(Score_t > 0.8\sigma_{Score}\)(\(\sigma_{Score}\)为Score的滚动30期标准差),且前1期无持仓时,买入BTC(满仓); +- **卖出信号**:当\(Score_t < -0.8\sigma_{Score}\),且前1期有持仓时,卖出BTC(空仓); +- **观望信号**:当\(Score_t\)在\([-0.8\sigma_{Score}, 0.8\sigma_{Score}]\)之间,维持原有持仓(避免频繁交易)。 + +### 3. 回测指标与评估(参考论文中的资产定价检验指标) +采用《ssrn.3255748.pdf》中“平均收益、夏普比率、最大回撤”等核心指标(),同时加入高频数据特有的“胜率、盈亏比”,具体如下: +| 回测指标 | 计算方式 | 合格标准(BTC 4h策略) | +|----------|----------|--------------------------| +| 年化收益率 | \((1+\text{累计收益})^{(252 \times 6)/N} - 1\)(假设年交易252天,每天6个4h周期,\(N\)为回测区间内的4h周期数) | >30%(跑赢BTC现货年化收益) | +| 夏普比率 | 年化收益率 / 年化波动率 | >1.5(风险调整收益优秀) | +| 最大回撤 | 回测期间最大亏损幅度 | <50%(控制极端风险) | +| 胜率 | 盈利交易次数 / 总交易次数 | >55%(信号准确性高) | + +**示例结果**:若回测得到年化收益45%、夏普比率1.8、最大回撤42%、胜率58%,说明模型有效——类似《ssrn.3255748.pdf》中“TS因子模型解释力达标”的实证结论()。 + + +## 六、信号优化与上线:动态适应市场变化 +### 1. 因子载荷动态调整(参考“时变载荷”的改进思路) +虽然《ssrn.3255748.pdf》中TS模型默认“常数载荷”,但中山大学研究指出“因子载荷时变能提升预测准确性”(摘要1、5),因此可引入“滚动窗口调整权重”: +- 每30天(180个4h周期)重新估计因子权重(如风险平价权重的波动率用最新30期数据),避免因子失效(如BTC在牛熊周期中,波动率因子的重要性会变化); +- 若某因子连续60个4h周期(10天)的\(H-L\)收益t统计量<1.0,暂时剔除该因子,待其恢复显著性后重新加入——贴合《ssrn.3255748.pdf》中“因子边际信息动态检验”的逻辑()。 + +### 2. 
class BacktestEngine:
    """Long-only, all-in / all-out backtest engine for a single instrument.

    The engine is either flat (position 0) or fully invested (position 1).
    A signal of 1 opens a position when flat, -1 closes it when long, and
    0 keeps the current state. Every fill pays ``commission + slippage``
    as a fraction of capital.
    """

    def __init__(
        self,
        commission: float = 0.001,
        slippage: float = 0.0005,
        initial_capital: float = 10000.0,
        periods_per_year: int = 252 * 6,
    ):
        """
        Parameters
        ----------
        commission : float
            Proportional fee charged on every fill.
        slippage : float
            Proportional slippage charged on every fill.
        initial_capital : float
            Starting equity.
        periods_per_year : int
            Number of bars per year used for annualisation. The default
            (252 days x 6 bars) matches the previously hard-coded 4h setting.
        """
        self.commission = commission
        self.slippage = slippage
        self.initial_capital = initial_capital
        self.periods_per_year = periods_per_year

    def run(
        self,
        signals: pd.Series,
        price: pd.Series,
        score: Optional[pd.Series] = None
    ) -> Dict:
        """Run the backtest.

        Parameters
        ----------
        signals : Series
            Trade signals: 1 = buy, -1 = sell, 0 = hold.
        price : Series
            Price series, aligned to ``signals`` by index.
        score : Series, optional
            Factor score. Accepted for interface compatibility only; it does
            not influence the simulation and is deliberately NOT joined into
            the working frame (an outer join on a differently indexed score
            would add NaN rows and corrupt the equity curve).

        Returns
        -------
        dict
            ``equity`` (Series), ``returns`` (per-bar gross price return while
            a position is held, else 0), ``trades`` (list of dicts with keys
            ``date``, ``action``, ``price``, ``capital``), ``metrics`` (dict)
            and ``final_capital`` (float).
        """
        aligned = pd.concat([signals, price], axis=1).dropna()
        aligned.columns = ['signal', 'price']

        # Nothing to simulate: return an empty but well-formed result
        # instead of failing on equity[0].
        if aligned.empty:
            return {
                'equity': pd.Series(dtype=float),
                'returns': pd.Series(dtype=float),
                'trades': [],
                'metrics': {},
                'final_capital': self.initial_capital
            }

        n_bars = len(aligned)
        sig = aligned['signal'].to_numpy()
        px = aligned['price'].to_numpy(dtype=float)
        # No NaNs remain after dropna(), so no fill is needed here.
        price_pct = aligned['price'].pct_change(fill_method=None).fillna(0)

        cost = self.commission + self.slippage
        capital = self.initial_capital
        position = 0        # 0 = flat, 1 = fully invested
        buy_price = None    # entry price of the open position
        equity = np.empty(n_bars)
        equity[0] = capital
        held = np.zeros(n_bars)  # position carried INTO each bar
        trades = []

        # Bar 0 only establishes the starting equity; trading starts at bar 1.
        for i in range(1, n_bars):
            held[i] = position

            # Act on the signal level, not on signal changes: a buy signal
            # that is already active on the first bars must still be executed.
            if sig[i] == 1 and position == 0:
                capital *= (1 - cost)
                position = 1
                buy_price = px[i]
                trades.append({
                    'date': aligned.index[i],
                    'action': 'buy',
                    'price': px[i],
                    'capital': capital
                })
            elif sig[i] == -1 and position == 1:
                # Book the holding-period gain/loss into capital before
                # charging the exit cost; otherwise the profit vanishes.
                capital *= (px[i] / buy_price) * (1 - cost)
                position = 0
                buy_price = None
                trades.append({
                    'date': aligned.index[i],
                    'action': 'sell',
                    'price': px[i],
                    'capital': capital
                })

            # Mark-to-market while invested, cash otherwise.
            if position == 1:
                equity[i] = capital * (px[i] / buy_price)
            else:
                equity[i] = capital

        equity_series = pd.Series(equity, index=aligned.index)
        # Per-bar return earned by the position actually held during the bar.
        returns_series = price_pct * pd.Series(held, index=aligned.index)

        metrics = self._calculate_metrics(equity_series, returns_series, len(trades))

        return {
            'equity': equity_series,
            'returns': returns_series,
            'trades': trades,
            'metrics': metrics,
            'final_capital': equity_series.iloc[-1]
        }

    def _calculate_metrics(
        self,
        equity: pd.Series,
        returns: pd.Series,
        num_trades: int = 0
    ) -> Dict:
        """Compute summary statistics from an equity curve and per-bar returns.

        Returns an empty dict when either input is empty. Win rate and
        profit/loss ratio are computed over bars with a non-zero return,
        i.e. bars during which a position was held and the price moved.
        """
        if len(equity) == 0 or len(returns) == 0:
            return {}

        periods_per_year = self.periods_per_year

        total_return = equity.iloc[-1] / equity.iloc[0] - 1
        annual_return = (1 + total_return) ** (periods_per_year / len(returns)) - 1

        # std() is NaN for a single observation; treat that as zero volatility.
        per_bar_vol = returns.std()
        annual_vol = 0.0 if np.isnan(per_bar_vol) else per_bar_vol * np.sqrt(periods_per_year)

        # Small epsilon keeps the ratio finite when volatility is zero.
        sharpe = annual_return / (annual_vol + 1e-8)

        running_peak = equity.cummax()
        max_drawdown = ((equity - running_peak) / running_peak).min()

        active = returns[returns != 0]
        gains = active[active > 0]
        losses = active[active < 0]
        win_rate = len(gains) / len(active) if len(active) > 0 else 0
        avg_win = gains.mean() if len(gains) > 0 else 0
        avg_loss = abs(losses.mean()) if len(losses) > 0 else 0
        profit_loss_ratio = avg_win / (avg_loss + 1e-8)

        return {
            'total_return': total_return,
            'annual_return': annual_return,
            'annual_volatility': annual_vol,
            'sharpe_ratio': sharpe,
            'max_drawdown': max_drawdown,
            'win_rate': win_rate,
            'profit_loss_ratio': profit_loss_ratio,
            'total_trades': num_trades  # number of executed fills
        }
class MultiFactorModel:
    """Linear multi-factor model: the score is a weighted sum of factor columns."""

    def __init__(self, weight_method: str = 'risk_parity'):
        """
        Parameters
        ----------
        weight_method : str
            Weighting scheme: 'risk_parity', 'regression' or 'equal'.
            The value is validated when ``fit`` is called.
        """
        self.weight_method = weight_method
        self.weights: Optional[pd.Series] = None
        self.factor_names: List[str] = []

    def fit(
        self,
        factors: pd.DataFrame,
        forward_return: Optional[pd.Series] = None,
        window: Optional[int] = None
    ):
        """Estimate factor weights.

        Parameters
        ----------
        factors : DataFrame
            One column per factor.
        forward_return : Series, optional
            Forward returns; required for the 'regression' method.
        window : int, optional
            Rolling window. Risk parity falls back to 30 when omitted.

        Returns
        -------
        MultiFactorModel
            ``self``, so calls can be chained.

        Raises
        ------
        ValueError
            If the weight method is unknown, or 'regression' is requested
            without ``forward_return``.
        """
        method = self.weight_method

        if method == 'risk_parity':
            weights = risk_parity_weights(factors, window=window or 30)
        elif method == 'regression':
            if forward_return is None:
                raise ValueError("回归权重需要提供forward_return")
            weights = regression_weights(factors, forward_return, window=window)
        elif method == 'equal':
            weights = equal_weights(factors)
        else:
            raise ValueError(f"未知的权重方法: {self.weight_method}")

        # Commit state only after the weights were computed successfully so
        # a failed fit cannot leave factor_names and weights out of sync.
        self.factor_names = list(factors.columns)
        self.weights = weights
        return self

    def predict(self, factors: pd.DataFrame) -> pd.Series:
        """Compute the combined score for each row of ``factors``.

        Columns are selected in the order seen during ``fit``; extra columns
        are ignored.

        Raises
        ------
        ValueError
            If the model has not been fitted.
        """
        if self.weights is None:
            raise ValueError("模型尚未拟合,请先调用fit()")

        selected = factors[self.factor_names]
        # Multiplication aligns the weight index with the column labels.
        return (selected * self.weights).sum(axis=1)

    def get_weights(self) -> pd.Series:
        """Return a copy of the fitted weights, or an empty float Series."""
        if self.weights is None:
            # Explicit dtype: a bare pd.Series() has a version-dependent
            # default dtype and warns on some pandas releases.
            return pd.Series(dtype=float)
        return self.weights.copy()
def preprocess_data(
    df: pd.DataFrame,
    outlier_threshold: float = 3.0,
    fill_method: str = 'ffill',
    normalize_window: int = 30
) -> pd.DataFrame:
    """Clean a feature frame: outlier handling, gap filling, rolling z-scores.

    The input frame is not modified; a processed copy is returned.

    Parameters
    ----------
    df : DataFrame
        Raw data, typically including derived indicator columns.
    outlier_threshold : float
        Number of rolling standard deviations beyond which a value of the
        'return' column is treated as an outlier.
    fill_method : str
        Missing-value strategy: 'ffill', 'bfill' or 'mean' (rolling
        3-period mean, numeric columns only). Any other value leaves
        missing values untouched.
    normalize_window : int
        Rolling window used for outlier detection and for the z-scores.

    Returns
    -------
    DataFrame
        The processed copy with an extra ``<col>_norm`` column for every
        numeric column except open/high/low/close/volume.
    """
    data = df.copy()
    numeric_cols = data.select_dtypes(include=[np.number]).columns

    # Outliers are only screened on the 'return' column: flagged values are
    # blanked and linearly interpolated from their neighbours.
    if 'return' in numeric_cols:
        ret = data['return']
        ret_window = ret.rolling(normalize_window, min_periods=1)
        is_outlier = (ret - ret_window.mean()).abs() > outlier_threshold * ret_window.std()
        if is_outlier.any():
            data['return'] = ret.mask(is_outlier).interpolate(method='linear')

    if fill_method == 'ffill':
        data = data.ffill()
    elif fill_method == 'bfill':
        data = data.bfill()
    elif fill_method == 'mean':
        # Restrict the rolling mean to numeric columns: rolling over
        # object/string columns fails or silently drops them, depending on
        # the pandas version.
        if len(numeric_cols) > 0:
            numeric = data[numeric_cols]
            data[numeric_cols] = numeric.fillna(numeric.rolling(3, min_periods=1).mean())

    # Raw OHLCV columns keep their original scale.
    raw_cols = {'open', 'high', 'low', 'close', 'volume'}
    cols_to_normalize = [col for col in numeric_cols if col not in raw_cols]

    if cols_to_normalize:
        block = data[cols_to_normalize]
        window = block.rolling(normalize_window, min_periods=1)
        # Epsilon avoids division by zero on constant windows.
        zscores = (block - window.mean()) / (window.std() + 1e-8)
        # Add all normalised columns in one step instead of one by one.
        data = data.assign(**{f'{col}_norm': zscores[col] for col in cols_to_normalize})

    return data
def example_custom_factors():
    """Example: register a user-defined factor and run the pipeline with it.

    Returns
    -------
    FactorPipeline
        The pipeline after loading, mining, validation, combination and
        backtesting have been run.
    """
    # Only pandas is needed here (for the annotations of the factor function).
    import pandas as pd

    def my_custom_factor(data: pd.DataFrame) -> pd.Series:
        """Custom factor: relative distance between close and its 8-period EMA."""
        return (data['close'] - data['ema8']) / data['ema8']

    # Start from the default factor set and add the custom factor to it.
    miner = create_default_factors()
    miner.register_rule_factor('CUSTOM_DISTANCE', my_custom_factor)

    # Run every pipeline stage with default thresholds.
    pipeline = FactorPipeline()
    pipeline.load_and_preprocess("ETH_USDT-1h.feather")
    pipeline.mine_factors(custom_miner=miner)
    pipeline.validate_factors()
    pipeline.combine_factors()
    pipeline.backtest()

    return pipeline
def create_trend_factor(data: pd.DataFrame) -> pd.Series:
    """Trend factor: TREND = I(close > ema16) - I(close < ema4).

    Takes the values 1, 0 or -1. When both conditions hold at once
    (close above the slow EMA but below the fast EMA) the two indicators
    cancel and the factor is 0, as the indicator-sum definition requires;
    assigning the two masks one after the other would return -1 there.
    Rows where a comparison involves NaN contribute 0 for that indicator.

    Parameters
    ----------
    data : DataFrame
        Must contain the columns 'close', 'ema16' and 'ema4'.

    Returns
    -------
    Series
        Integer trend factor indexed like ``data``.
    """
    close = data['close']
    above_slow = (close > data['ema16']).astype(int)
    below_fast = (close < data['ema4']).astype(int)
    return above_slow - below_fast
def create_default_factors() -> FactorMiner:
    """Build a FactorMiner pre-loaded with the six built-in rule factors."""
    # (name, compute function) pairs, registered in this order.
    builtin_factors = (
        ('TREND', create_trend_factor),
        ('VOL', create_volatility_factor),
        ('VOLP', create_volume_price_factor),
        ('REV', create_reversal_factor),
        ('MOM', create_momentum_factor),
        ('RSI', create_rsi_factor),
    )

    miner = FactorMiner()
    for factor_name, compute_func in builtin_factors:
        miner.register_rule_factor(factor_name, compute_func)
    return miner
self.score: Optional[pd.Series] = None + self.backtest_results: Optional[Dict] = None + + def load_and_preprocess(self, file_path: str) -> 'FactorPipeline': + """步骤1:加载和预处理数据""" + print("=" * 50) + print("步骤1:加载和预处理数据") + print("=" * 50) + + # 加载数据 + self.data = load_data(file_path) + print(f"加载数据: {len(self.data)} 条记录") + + # 计算技术指标 + self.data = compute_technical_indicators(self.data) + print("计算技术指标完成") + + # 预处理 + self.data = preprocess_data(self.data) + print("数据预处理完成") + + # 计算未来收益率 + self.forward_return = compute_forward_returns( + self.data['close'], + horizon=self.ret_horizon + ) + print(f"计算未来收益率完成(周期={self.ret_horizon})") + + return self + + def mine_factors(self, custom_miner: Optional[FactorMiner] = None) -> 'FactorPipeline': + """步骤2:因子挖掘""" + print("\n" + "=" * 50) + print("步骤2:因子挖掘") + print("=" * 50) + + if self.data is None: + raise ValueError("请先加载数据") + + # 使用自定义或默认因子挖掘器 + if custom_miner is None: + self.factor_miner = create_default_factors() + else: + self.factor_miner = custom_miner + + # 计算所有因子 + self.factors = self.factor_miner.compute_all_factors(self.data) + print(f"计算因子完成: {list(self.factors.columns)}") + + return self + + def validate_factors(self, min_ic: float = 0.01, min_tstat: float = 1.5) -> 'FactorPipeline': + """步骤3:因子检验""" + print("\n" + "=" * 50) + print("步骤3:因子检验") + print("=" * 50) + + if self.factors is None or self.forward_return is None: + raise ValueError("请先完成因子挖掘") + + valid_factors = [] + self.validation_results = {} + + for factor_name in self.factors.columns: + factor = self.factors[factor_name] + + # 综合检验 + result = validate_factor(factor, self.forward_return, ic_window=self.ic_window) + self.validation_results[factor_name] = result + + # 筛选有效因子 + if (abs(result['mean_ic']) >= min_ic and + abs(result['mean_h_l_tstat']) >= min_tstat): + valid_factors.append(factor_name) + print(f"\n因子 {factor_name}:") + print(f" 平均IC: {result['mean_ic']:.4f}") + print(f" IC信息比率: {result['ic_ir']:.4f}") + print(f" H-L收益差: 
def combine_factors(
    self,
    weight_method: str = 'risk_parity',
    window: Optional[int] = None
) -> 'FactorPipeline':
    """Step 4: fit the multi-factor model and compute the composite score.

    Parameters:
    -----------
    weight_method : str
        Weighting scheme name, passed to MultiFactorModel.
    window : int, optional
        Passed through to MultiFactorModel.fit.

    Returns:
    --------
    FactorPipeline: self, so that steps can be chained.

    Raises:
    -------
    ValueError: if self.factors is None or has no columns.
    """
    separator = "=" * 50
    print("\n" + separator)
    print("步骤4:因子组合")
    print(separator)

    factor_frame = self.factors
    if factor_frame is None or not len(factor_frame.columns):
        raise ValueError("没有有效因子可组合")

    # The model is stored on the pipeline before fitting, so it stays
    # reachable even if fit() raises.
    model = MultiFactorModel(weight_method=weight_method)
    self.model = model
    model.fit(
        factor_frame,
        forward_return=self.forward_return,
        window=window
    )

    # Composite score from the fitted model.
    composite = model.predict(factor_frame)
    self.score = composite

    # Report the fitted weights.
    print("因子权重:")
    for factor_name, factor_weight in model.get_weights().items():
        print(f" {factor_name}: {factor_weight:.4f}")

    # Report summary statistics of the composite score.
    print("\n综合得分统计:")
    print(f" 均值: {composite.mean():.4f}")
    print(f" 标准差: {composite.std():.4f}")

    return self
def run_full_pipeline(
    self,
    file_path: str,
    custom_miner: Optional[FactorMiner] = None,
    min_ic: float = 0.01,
    min_tstat: float = 1.5,
    weight_method: str = 'risk_parity',
    buy_threshold: float = 0.8,
    sell_threshold: float = -0.8
) -> Dict:
    """Run every pipeline step in order and collect the outputs.

    The steps run in this order: load_and_preprocess, mine_factors,
    validate_factors, combine_factors, backtest. Each step is invoked on
    the object returned by the previous one.

    Returns:
    --------
    dict with keys 'factors', 'score', 'validation' and 'backtest',
    taken from the pipeline's attributes after the run.
    """
    # Unrolled form of the fluent chain.
    stage = self.load_and_preprocess(file_path)
    stage = stage.mine_factors(custom_miner)
    stage = stage.validate_factors(min_ic, min_tstat)
    stage = stage.combine_factors(weight_method)
    stage.backtest(buy_threshold=buy_threshold, sell_threshold=sell_threshold)

    summary = {}
    summary['factors'] = self.factors
    summary['score'] = self.score
    summary['validation'] = self.validation_results
    summary['backtest'] = self.backtest_results
    return summary
def generate_signals(
    score: 'pd.Series',
    buy_threshold: float = 0.8,
    sell_threshold: float = -0.8,
    window: int = 30,
    use_rolling_std: bool = True
) -> 'pd.Series':
    """
    Turn a composite factor score into buy/sell trade signals.

    Parameters:
    -----------
    score : Series
        Composite factor score.
    buy_threshold : float
        Buy threshold, as a multiple of the score's standard deviation.
    sell_threshold : float
        Sell threshold, as a multiple of the score's standard deviation.
    window : int
        Rolling window used for the standard deviation.
    use_rolling_std : bool
        If True use the rolling standard deviation, otherwise the
        standard deviation of the whole series.

    Returns:
    --------
    Series: trade signal on score's index (1 = buy, -1 = sell, 0 = no trade).
    A buy is only emitted while flat and a sell only while holding.
    """
    # Volatility scale: a Series when rolling, a scalar otherwise.
    scale = score.rolling(window).std() if use_rolling_std else score.std()
    upper_band = buy_threshold * scale
    lower_band = sell_threshold * scale

    # Comparisons against NaN are False, so those bars stay at 0.
    # When both conditions hold the sell wins.
    wants_buy = (score > upper_band).to_numpy()
    wants_sell = (score < lower_band).to_numpy()
    desired = np.where(wants_sell, -1, np.where(wants_buy, 1, 0))

    # Two-state machine (flat / holding): only state changes emit a trade.
    trades = np.zeros(len(desired), dtype=int)
    holding = False
    for bar, want in enumerate(desired):
        if want == 1 and not holding:
            trades[bar] = 1
            holding = True
        elif want == -1 and holding:
            trades[bar] = -1
            holding = False

    return pd.Series(trades, index=score.index).astype(int)
def compute_ic(factor: pd.Series, forward_return: pd.Series, method: str = 'spearman') -> pd.Series:
    """
    Compute the IC (information coefficient) over the whole sample.

    Parameters:
    -----------
    factor : Series
        Factor values.
    forward_return : Series
        Forward returns.
    method : str
        'spearman' correlates the ranks of the two series; any other
        value correlates the raw values.

    Returns:
    --------
    Series: a single IC value indexed by the last aligned label, or an
    empty float Series when fewer than 10 aligned observations remain.
    """
    # Keep only rows where both series have a value.
    paired = pd.concat([factor, forward_return], axis=1).dropna()
    if len(paired) < 10:
        return pd.Series(dtype=float)

    factor_col = paired.iloc[:, 0]
    return_col = paired.iloc[:, 1]
    if method == 'spearman':
        # Spearman IC: correlation of the ranks.
        factor_col, return_col = factor_col.rank(), return_col.rank()

    ic_value = factor_col.corr(return_col)
    return pd.Series([ic_value], index=[paired.index[-1]])
def group_backtest(
    factor: pd.Series,
    forward_return: pd.Series,
    n_groups: int = 3,
    group_period: int = 180
) -> Dict:
    """
    Grouped backtest: bucket observations by factor value and compare returns.

    The aligned sample is cut into windows of ``group_period`` rows that
    start every ``max(group_period // 2, 90)`` rows. Inside each window the
    rows are split into ``n_groups`` factor quantiles. The window's H-L
    spread is the mean return of the highest group minus that of the lowest.

    Parameters:
    -----------
    factor : Series
        Factor values.
    forward_return : Series
        Forward returns, aligned with ``factor`` on the index.
    n_groups : int
        Number of quantile groups per window.
    group_period : int
        Number of rows per window.

    Returns:
    --------
    dict with keys:
        'group_returns'   : list of per-window mean return by group
        'h_l_return'      : list of per-window H-L spreads
        'h_l_tstat'       : list kept for interface compatibility (never filled)
        'periods'         : last index label of each window with a spread
        'mean_h_l_return' : mean H-L spread (0 when no window qualified)
        'mean_h_l_tstat'  : t-statistic of the mean spread (0 when fewer
                            than two windows qualified)
        'h_l_series'      : H-L spreads as a Series indexed by 'periods'
    """
    aligned = pd.concat([factor, forward_return], axis=1).dropna()
    aligned.columns = ['factor', 'return']

    results = {
        'group_returns': [],
        'h_l_return': [],
        'h_l_tstat': [],
        'periods': []
    }

    # Step between window starts.
    step = max(group_period // 2, 90)
    # "+ 1" keeps the last full window: without it a sample of exactly
    # group_period rows produced no window at all.
    for start in range(0, len(aligned) - group_period + 1, step):
        period_data = aligned.iloc[start:start + group_period]

        if len(period_data) < 30:
            continue

        try:
            period_data = period_data.copy()
            period_data['group'] = pd.qcut(
                period_data['factor'],
                q=n_groups,
                labels=False,
                duplicates='drop'
            )
        except (ValueError, KeyError):
            # qcut cannot bin this window; skip it.
            continue

        # Mean return per group, ordered from lowest to highest factor group.
        group_returns = period_data.groupby('group')['return'].mean()
        results['group_returns'].append(group_returns)

        # A spread needs at least two distinct groups.
        if len(group_returns) >= 2:
            h_l_diff = group_returns.iloc[-1] - group_returns.iloc[0]
            results['h_l_return'].append(h_l_diff)
            results['periods'].append(period_data.index[-1])

    if results['h_l_return']:
        h_l_series = pd.Series(results['h_l_return'], index=results['periods'])
        mean_h_l = h_l_series.mean()
        n_periods = len(h_l_series)
        if n_periods >= 2:
            std_h_l = h_l_series.std()
            # 1e-8 guards against division by zero when all spreads are equal.
            t_stat = mean_h_l / (std_h_l / np.sqrt(n_periods) + 1e-8)
        else:
            # The std of a single observation is NaN; report 0 rather than
            # a NaN t-stat.
            t_stat = 0.0

        results['mean_h_l_return'] = mean_h_l
        results['mean_h_l_tstat'] = t_stat
        results['h_l_series'] = h_l_series
    else:
        results['mean_h_l_return'] = 0
        results['mean_h_l_tstat'] = 0
        # Same keys as the populated branch, so callers can index uniformly.
        results['h_l_series'] = pd.Series(dtype=float)

    return results
def validate_factor(
    factor: pd.Series,
    forward_return: pd.Series,
    ic_window: int = 30,
    n_groups: int = 3
) -> Dict:
    """
    Run the combined factor checks: rolling IC plus grouped backtest.

    Parameters:
    -----------
    factor : Series
        Factor values.
    forward_return : Series
        Forward returns.
    ic_window : int
        Window passed to compute_rolling_ic.
    n_groups : int
        Number of groups passed to group_backtest.

    Returns:
    --------
    dict with keys 'mean_ic', 'ic_ir', 'ic_series', 'mean_h_l_return',
    'mean_h_l_tstat' and 'group_returns'.
    """
    # IC test: mean of the rolling IC and its information ratio.
    ic_series = compute_rolling_ic(factor, forward_return, window=ic_window)
    avg_ic = ic_series.mean()
    # 1e-8 keeps the ratio finite when the IC series has zero dispersion.
    ic_info_ratio = avg_ic / (ic_series.std() + 1e-8)

    # Grouped backtest.
    grouped = group_backtest(factor, forward_return, n_groups=n_groups)

    report = {
        'mean_ic': avg_ic,
        'ic_ir': ic_info_ratio,
        'ic_series': ic_series,
    }
    # Copy the grouped-backtest summary fields under the same keys.
    for field in ('mean_h_l_return', 'mean_h_l_tstat', 'group_returns'):
        report[field] = grouped[field]
    return report