Files
etf/framework_v2/shared/factors/momentum.py
aszerW 908b28473f feat(framework_v2): 创建框架V2骨架 - 三层架构+因子验证通过
## 架构设计
- 三层架构:core(抽象接口) → shared(通用实现) → tests(验证测试)
- 5个核心抽象基类:StrategyBase, FactorBase, SignalGenerator, Executor, DataFetcher
- 零侵入:与现有框架并行开发,不修改生产代码

## 已完成
✓ 核心接口层(5个ABC类)
✓ 通用因子层(MomentumFactor完全复制现有逻辑)
✓ 对比验证测试(新旧因子输出差异=0,测试通过)

## 验证结果
- 最大差异: 0.000000e+00
- 平均差异: 0.000000e+00
- 容差: < 1e-10

## 下一步
- 阶段3: 信号层迁移(TopNSelector, DynamicThreshold, RebalanceController)
- 阶段4: 执行层迁移(BacktestRunner)
- 阶段5: 数据层迁移(DataFetcher实现)
- 阶段6: 完整策略对比验证

## 设计原则
- 按需抽象,不预先设计
- 职责分离,避免框架膨胀
- 测试驱动,每个组件必须有对比测试
- 渐进式迁移,验证通过再替换
2026-05-24 09:12:29 +08:00

105 lines
3.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
动量因子(通用版本)
使用加权线性回归:得分 = 年化收益率 ×
与现有 MomentumFactor 对比验证:
- 输入相同 → 输出应该相同
"""
import pandas as pd
import numpy as np
import math
from framework_v2.core import FactorBase
class MomentumFactor(FactorBase):
"""
动量因子
计算加权线性回归动量得分:
得分 = 年化收益率 ×
参数:
- n_days: 动量窗口默认25
- weighted: 是否加权默认True
- crash_filter: 是否启用崩盘过滤默认True
"""
name = "momentum"
category = "momentum"
def __init__(
self,
n_days: int = 25,
weighted: bool = True,
crash_filter: bool = True
):
super().__init__(n_days=n_days, weighted=weighted, crash_filter=crash_filter)
self.n_days = n_days
self.weighted = weighted
self.crash_filter = crash_filter
def compute(self, data: pd.DataFrame) -> pd.Series:
"""计算动量因子值"""
if 'close' not in data.columns:
raise ValueError("data must contain 'close' column")
prices = data['close']
if self.weighted:
factor_values = prices.rolling(self.n_days).apply(
lambda x: self._weighted_momentum_score(x.values),
raw=False
)
else:
factor_values = prices.pct_change(self.n_days)
if self.crash_filter:
factor_values = self._apply_crash_filter(prices, factor_values)
return factor_values
def _weighted_momentum_score(self, prices: np.ndarray) -> float:
"""计算加权动量得分(完全复制现有逻辑)"""
if len(prices) < 5:
return 0.0
# 价格下界 clip防止 log(0) 或 log(负数)
prices = np.clip(prices, 0.01, None)
y = np.log(prices)
# 异常值检测
if np.any(np.isnan(y)) or np.any(np.isinf(y)):
return 0.0
x = np.arange(len(y))
weights = np.linspace(1, 2, len(y))
slope, intercept = np.polyfit(x, y, 1, w=weights)
annualized_returns = math.exp(slope * 250) - 1
y_pred = slope * x + intercept
ss_res = np.sum(weights * (y - y_pred) ** 2)
ss_tot = np.sum(weights * (y - np.average(y, weights=weights)) ** 2)
r2 = 1 - ss_res / ss_tot if ss_tot > 0 else 0
return annualized_returns * r2
def _apply_crash_filter(self, prices: pd.Series, factor_values: pd.Series) -> pd.Series:
"""崩盘过滤连续3天跌>5%清零(完全复制现有逻辑)"""
result = factor_values.copy()
for i in range(3, len(prices)):
r1 = prices.iloc[i] / prices.iloc[i-1]
r2 = prices.iloc[i-1] / prices.iloc[i-2]
r3 = prices.iloc[i-2] / prices.iloc[i-3]
con1 = min(r1, r2, r3) < 0.95
con2 = (r1 < 1) and (r2 < 1) and (r3 < 1) and (prices.iloc[i] / prices.iloc[i-3] < 0.95)
if con1 or con2:
result.iloc[i] = 0.0
return result