Files
etf/framework/factors/momentum.py
aszerW 796a695eef feat(factors): 实现因子层抽象
核心组件:
- FactorBase: 因子抽象基类(compute方法 + 数据验证)
- FactorRegistry: 因子注册器(注册/获取/按类别筛选)
- FactorCombiner: 因子组合器(加权组合4种方法)

已实现因子:
- MomentumFactor: 加权动量因子(含崩盘过滤)
- TrendFactor: 趋势因子(MA交叉/MACD)
- ReversalFactor: 反转因子(RSI/KDJ)
- VolatilityFactor: 波动率因子(ATR/标准差)

测试覆盖:18个测试全部通过
2026-05-11 22:17:53 +08:00

312 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
动量因子实现
基于加权线性回归动量的因子
"""
import pandas as pd
import numpy as np
import math
from typing import Optional
from framework.factors import FactorBase
class MomentumFactor(FactorBase):
    """
    Momentum factor based on a weighted linear-regression fit of log prices.

    For each rolling window of ``n_days`` closes the score is::

        score = annualised_return * weighted_R2

    where ``annualised_return = exp(slope * 250) - 1`` and ``slope`` is the
    weighted least-squares slope of ``log(close)`` against the day index.

    Parameters:
    - n_days: momentum window length, default 25
    - weighted: True (default) uses the weighted regression score above;
      False uses the plain ``n_days``-period percentage change
    - crash_filter: zero the score on crash days, default True
    """
    name = "momentum"
    category = "momentum"

    def __init__(
        self,
        n_days: int = 25,
        weighted: bool = True,
        crash_filter: bool = True
    ):
        super().__init__(n_days=n_days, weighted=weighted, crash_filter=crash_filter)
        self.n_days = n_days
        self.weighted = weighted
        self.crash_filter = crash_filter

    def compute(self, data: pd.DataFrame) -> pd.Series:
        """Compute the momentum factor series from ``data['close']``.

        Raises:
            ValueError: if ``data`` has no 'close' column.
        """
        if 'close' not in data.columns:
            raise ValueError("data must contain 'close' column")
        prices = data['close']
        if self.weighted:
            # raw=True hands each window to the scorer as a plain ndarray,
            # avoiding the construction of a Series per window.
            factor_values = prices.rolling(self.n_days).apply(
                self._weighted_momentum_score,
                raw=True
            )
        else:
            # Simple momentum: percentage change over n_days rows.
            factor_values = prices.pct_change(self.n_days)
        if self.crash_filter:
            factor_values = self._apply_crash_filter(prices, factor_values)
        return factor_values

    def _weighted_momentum_score(self, prices: np.ndarray) -> float:
        """Score one window: annualised regression return times weighted R².

        Windows with fewer than 5 points score 0.0. Log prices are used, so
        the window is assumed to contain strictly positive prices.
        """
        if len(prices) < 5:
            return 0.0
        y = np.log(prices)
        x = np.arange(len(y))
        # Linearly increasing weights (1 -> 2) favour the most recent days.
        weights = np.linspace(1, 2, len(y))
        # NOTE(review): np.polyfit minimises sum((w * residual) ** 2), so the
        # fit effectively weights by w**2, while the R² below weights by w.
        # Kept as-is to preserve existing scores; confirm this is intended.
        slope, intercept = np.polyfit(x, y, 1, w=weights)
        # 250 is used as the number of trading days per year.
        annualized_returns = math.exp(slope * 250) - 1
        # Weighted coefficient of determination of the fitted line.
        y_pred = slope * x + intercept
        ss_res = np.sum(weights * (y - y_pred) ** 2)
        ss_tot = np.sum(weights * (y - np.average(y, weights=weights)) ** 2)
        r2 = 1 - ss_res / ss_tot if ss_tot > 0 else 0
        return annualized_returns * r2

    def _apply_crash_filter(
        self,
        prices: pd.Series,
        factor_values: pd.Series
    ) -> pd.Series:
        """Zero the factor on days that match a crash pattern.

        Position ``i`` (only ``i >= 3``) is flagged when either:
        - any of the three daily ratios p[i]/p[i-1], p[i-1]/p[i-2],
          p[i-2]/p[i-3] is below 0.95 (a single-day drop of more than 5%), or
        - all three ratios are below 1 and p[i]/p[i-3] < 0.95 (three
          consecutive down days with a cumulative drop of more than 5%).

        Flagged positions are set to 0.0 only where the factor already has a
        value; warm-up NaNs stay NaN so no score is fabricated. Ratios that
        are NaN never trigger a flag.

        Returns:
            A new Series; ``factor_values`` is not modified.
        """
        # Positional shifts, mirroring the original iloc-based comparison.
        r1 = prices / prices.shift(1)
        r2 = r1.shift(1)
        r3 = r1.shift(2)
        # Condition 1: any single day dropped by more than 5%.
        big_drop = (r1 < 0.95) | (r2 < 0.95) | (r3 < 0.95)
        # Condition 2: three consecutive down days, cumulative drop > 5%.
        slide = (r1 < 1) & (r2 < 1) & (r3 < 1) & (prices / prices.shift(3) < 0.95)
        crash = (big_drop | slide).to_numpy(copy=True)
        # Only positions with three full prior days are evaluated.
        crash[:3] = False
        keep_nan = factor_values.isna().to_numpy()
        return factor_values.mask(crash & ~keep_nan, 0.0)
class TrendFactor(FactorBase):
    """
    Trend-strength factor computed from closing prices.

    Methods:
    - 'ma_cross': relative gap between the fast and slow simple moving
      averages, ``(fast_ma - slow_ma) / slow_ma``
    - 'macd': MACD histogram, i.e. ``EMA12 - EMA26`` minus its 9-span EMA.
      These spans are fixed; ``fast`` and ``slow`` are not used here.

    Parameters:
    - method: 'ma_cross' (default) or 'macd'
    - fast: fast moving-average window for 'ma_cross', default 5
    - slow: slow moving-average window for 'ma_cross', default 20
    """
    name = "trend"
    category = "trend"

    def __init__(
        self,
        method: str = 'ma_cross',
        fast: int = 5,
        slow: int = 20
    ):
        super().__init__(method=method, fast=fast, slow=slow)
        self.method = method
        self.fast = fast
        self.slow = slow

    def compute(self, data: pd.DataFrame) -> pd.Series:
        """Return the trend-strength series for ``data['close']``.

        Raises:
            ValueError: if 'close' is missing or ``method`` is not recognised.
        """
        if 'close' not in data.columns:
            raise ValueError("data must contain 'close' column")
        close = data['close']
        if self.method == 'ma_cross':
            return self._ma_gap(close)
        if self.method == 'macd':
            return self._macd_histogram(close)
        raise ValueError(f"Unknown method: {self.method}")

    def _ma_gap(self, close: pd.Series) -> pd.Series:
        """Distance of the fast SMA from the slow SMA, relative to the slow SMA."""
        slow_avg = close.rolling(self.slow).mean()
        fast_avg = close.rolling(self.fast).mean()
        return (fast_avg - slow_avg) / slow_avg

    @staticmethod
    def _macd_histogram(close: pd.Series) -> pd.Series:
        """MACD line (EMA12 - EMA26) minus its 9-span EMA signal line."""
        macd_line = close.ewm(span=12).mean() - close.ewm(span=26).mean()
        signal_line = macd_line.ewm(span=9).mean()
        return macd_line - signal_line
class ReversalFactor(FactorBase):
    """
    Reversal (overbought / oversold) factor.

    Methods:
    - 'rsi': signed distance of RSI beyond the thresholds.
      RSI > overbought gives a negative value in [-1, 0) (expected move down);
      RSI < oversold gives a positive value in (0, 1] (expected move up);
      values in between give 0. NaN where RSI is undefined.
    - 'kdj': the raw J line of the KDJ indicator (3K - 2D); needs 'high',
      'low' and 'close' columns.

    Parameters:
    - method: 'rsi' (default) or 'kdj'
    - period: look-back window for RSI and for the KDJ high/low range, default 14
    - overbought: overbought RSI threshold, default 70
    - oversold: oversold RSI threshold, default 30
    """
    name = "reversal"
    category = "reversal"

    def __init__(
        self,
        method: str = 'rsi',
        period: int = 14,
        overbought: float = 70,
        oversold: float = 30
    ):
        super().__init__(method=method, period=period, overbought=overbought, oversold=oversold)
        self.method = method
        self.period = period
        self.overbought = overbought
        self.oversold = oversold

    def compute(self, data: pd.DataFrame) -> pd.Series:
        """Compute the reversal signal series.

        Raises:
            ValueError: if a required column is missing or ``method`` is unknown.
        """
        if 'close' not in data.columns:
            raise ValueError("data must contain 'close' column")
        prices = data['close']
        if self.method == 'rsi':
            rsi = self._compute_rsi(prices, self.period)
            overbought_signal = -(rsi - self.overbought) / (100 - self.overbought)
            oversold_signal = (self.oversold - rsi) / self.oversold
            signal = np.where(
                rsi > self.overbought,
                overbought_signal,
                np.where(rsi < self.oversold, oversold_signal, 0.0)
            )
            # Keep NaN where RSI is undefined (warm-up rows, or a window with
            # no price change) rather than reporting a neutral 0.
            signal[rsi.isna().to_numpy()] = np.nan
            return pd.Series(signal, index=prices.index)
        elif self.method == 'kdj':
            return self._compute_kdj(data)
        else:
            raise ValueError(f"Unknown method: {self.method}")

    def _compute_rsi(self, prices: pd.Series, period: int) -> pd.Series:
        """Compute RSI from a simple rolling mean of gains and losses.

        This uses ``rolling(period).mean()``, not Wilder's exponential
        smoothing. The first price change is undefined, so it is left as NaN
        and the first RSI value needs ``period`` real changes. RSI is 100 when
        a window has gains but no losses, and NaN when it has neither.
        """
        delta = prices.diff()
        # clip keeps the leading NaN from diff(); treating it as 0 would let
        # the first window count only period-1 real changes.
        gain = delta.clip(lower=0)
        loss = (-delta).clip(lower=0)
        avg_gain = gain.rolling(period).mean()
        avg_loss = loss.rolling(period).mean()
        rs = avg_gain / avg_loss
        return 100 - (100 / (1 + rs))

    def _compute_kdj(self, data: pd.DataFrame) -> pd.Series:
        """Return the KDJ J line computed over ``self.period`` bars.

        Raises:
            ValueError: if 'high' or 'low' is missing from ``data``.
        """
        missing = [col for col in ('high', 'low') if col not in data.columns]
        if missing:
            names = ", ".join(f"'{col}'" for col in missing)
            raise ValueError(f"data must contain {names} column for method 'kdj'")
        low = data['low']
        high = data['high']
        close = data['close']
        low_min = low.rolling(self.period).min()
        high_max = high.rolling(self.period).max()
        # NOTE(review): a flat window (high_max == low_min) divides by zero and
        # yields NaN/inf here.
        rsv = (close - low_min) / (high_max - low_min) * 100
        # ewm(alpha=1/3) with pandas' default adjust=True; early values are
        # bias-corrected averages rather than being seeded at 50.
        k = rsv.ewm(alpha=1/3).mean()
        d = k.ewm(alpha=1/3).mean()
        j = 3 * k - 2 * d
        # NOTE(review): J is high when overbought, which is the opposite sign
        # of the 'rsi' signal (overbought -> negative); confirm callers expect
        # raw J here.
        return j
class VolatilityFactor(FactorBase):
    """
    Volatility factor.

    Methods:
    - 'std': rolling standard deviation of closing prices over ``period`` rows
    - 'atr': rolling mean of the true range over ``period`` rows; needs
      'high', 'low' and 'close' columns

    Parameters:
    - method: 'std' (default) or 'atr'
    - period: rolling window length, default 20
    """
    name = "volatility"
    category = "volatility"

    def __init__(
        self,
        method: str = 'std',
        period: int = 20
    ):
        super().__init__(method=method, period=period)
        self.method = method
        self.period = period

    def compute(self, data: pd.DataFrame) -> pd.Series:
        """Compute the volatility series.

        Raises:
            ValueError: if a required column is missing or ``method`` is unknown.
        """
        if self.method == 'std':
            self._require_columns(data, ('close',))
            # NOTE(review): this is the std of price levels, not of returns,
            # so it scales with the price level and is not directly comparable
            # across instruments; confirm this is intended.
            return data['close'].rolling(self.period).std()
        elif self.method == 'atr':
            return self._compute_atr(data)
        else:
            raise ValueError(f"Unknown method: {self.method}")

    @staticmethod
    def _require_columns(data: pd.DataFrame, columns: tuple) -> None:
        """Raise ValueError naming any of ``columns`` absent from ``data``."""
        missing = [col for col in columns if col not in data.columns]
        if missing:
            names = ", ".join(f"'{col}'" for col in missing)
            noun = "column" if len(missing) == 1 else "columns"
            raise ValueError(f"data must contain {names} {noun}")

    def _compute_atr(self, data: pd.DataFrame) -> pd.Series:
        """Average true range as a simple rolling mean of the true range.

        True range is max(high - low, |high - prev_close|, |low - prev_close|).
        On the first row prev_close is NaN, and max() skips it, so that row's
        true range is high - low.
        """
        self._require_columns(data, ('high', 'low', 'close'))
        high = data['high']
        low = data['low']
        close = data['close']
        prev_close = close.shift(1)
        tr = pd.concat([
            high - low,
            (high - prev_close).abs(),
            (low - prev_close).abs()
        ], axis=1).max(axis=1)
        return tr.rolling(self.period).mean()