"""
Momentum factor implementations.

Factors built around a weighted linear-regression momentum score, together
with the trend, reversal and volatility factors defined in this module.
"""

import math
from typing import Optional

import numpy as np
import pandas as pd

from framework.factors import FactorBase

# Number of periods used to annualize the per-bar regression slope.
_ANNUALIZATION_PERIODS = 250
# A price ratio below this value means a drop of more than 5%.
_CRASH_RATIO = 0.95
# Windows shorter than this are scored 0.0 instead of being regressed.
_MIN_REGRESSION_POINTS = 5


class MomentumFactor(FactorBase):
    """
    Momentum factor.

    With ``weighted=True`` the value is a weighted linear-regression
    momentum score: score = annualized return x R^2.  With
    ``weighted=False`` it is the plain ``n_days`` percentage change of the
    close price.

    Parameters:
    - n_days: momentum window (default 25)
    - weighted: use the weighted regression score (default True)
    - crash_filter: enable the crash filter (default True)
    """

    name = "momentum"
    category = "momentum"

    def __init__(
        self,
        n_days: int = 25,
        weighted: bool = True,
        crash_filter: bool = True
    ):
        super().__init__(n_days=n_days, weighted=weighted, crash_filter=crash_filter)
        self.n_days = n_days
        self.weighted = weighted
        self.crash_filter = crash_filter

    def compute(self, data: pd.DataFrame) -> pd.Series:
        """
        Compute the momentum factor values.

        Args:
            data: frame that must contain a 'close' column.

        Returns:
            Series of factor values indexed like ``data['close']``.

        Raises:
            ValueError: if the 'close' column is missing.
        """
        if 'close' not in data.columns:
            raise ValueError("data must contain 'close' column")

        prices = data['close']

        if self.weighted:
            # raw=True hands each window over as an ndarray, avoiding the
            # construction of a temporary Series per window.
            factor_values = prices.rolling(self.n_days).apply(
                self._weighted_momentum_score, raw=True
            )
        else:
            # Simple momentum: percentage change over n_days.
            factor_values = prices.pct_change(self.n_days)

        if self.crash_filter:
            factor_values = self._apply_crash_filter(prices, factor_values)

        return factor_values

    def _weighted_momentum_score(self, prices: np.ndarray) -> float:
        """
        Score one price window as annualized return x weighted R^2.

        A straight line is fitted to log prices with weights rising linearly
        from 1 (oldest point) to 2 (newest point).

        Args:
            prices: price window, oldest first.

        Returns:
            0.0 for windows shorter than 5 points, NaN when the window holds
            non-finite or non-positive prices (their log is undefined),
            otherwise the score.
        """
        prices = np.asarray(prices, dtype=float)
        if len(prices) < _MIN_REGRESSION_POINTS:
            return 0.0
        if not np.all(np.isfinite(prices)) or np.any(prices <= 0):
            return float('nan')

        y = np.log(prices)
        x = np.arange(len(y))
        weights = np.linspace(1, 2, len(y))

        # Weighted linear regression.
        # NOTE(review): np.polyfit applies `w` to the *unsquared* residuals,
        # so the fit minimizes sum((w * r) ** 2), while the R^2 below weights
        # squared residuals by `w` itself.  Left unchanged to keep existing
        # scores stable -- confirm whether w=np.sqrt(weights) was intended.
        slope, intercept = np.polyfit(x, y, 1, w=weights)
        annualized_returns = math.exp(slope * _ANNUALIZATION_PERIODS) - 1

        # Weighted R^2.
        y_pred = slope * x + intercept
        ss_res = np.sum(weights * (y - y_pred) ** 2)
        ss_tot = np.sum(weights * (y - np.average(y, weights=weights)) ** 2)
        r2 = 1 - ss_res / ss_tot if ss_tot > 0 else 0

        return annualized_returns * r2

    def _apply_crash_filter(
        self,
        prices: pd.Series,
        factor_values: pd.Series
    ) -> pd.Series:
        """
        Zero the factor on rows that follow a recent sharp decline.

        Row ``i`` (for ``i >= 3``) is set to 0.0 when either
        - any of the last three day-over-day price ratios is below 0.95, or
        - all three ratios are below 1 and the three-day ratio
          ``prices[i] / prices[i-3]`` is below 0.95.

        The first three rows are never modified.  Rows are addressed by
        position, so ``factor_values`` must be at least as long as ``prices``.

        Args:
            prices: close prices.
            factor_values: factor values to filter.

        Returns:
            A filtered copy of ``factor_values``; the input is not mutated.
        """
        result = factor_values.copy()
        p = prices.to_numpy(dtype=float)
        if len(p) <= 3:
            return result

        # Zero prices yield inf/NaN ratios; those simply compare False below.
        with np.errstate(divide='ignore', invalid='ignore'):
            daily = p[1:] / p[:-1]       # daily[k] == p[k + 1] / p[k]
            three_day = p[3:] / p[:-3]   # p[i] / p[i - 3] for i >= 3

        # Align the three most recent daily ratios for every row i >= 3.
        r1 = daily[2:]     # p[i]     / p[i - 1]
        r2 = daily[1:-1]   # p[i - 1] / p[i - 2]
        r3 = daily[:-2]    # p[i - 2] / p[i - 3]

        # Condition 1: any single day dropped more than 5%.  Element-wise
        # comparisons keep the result independent of where a NaN sits,
        # unlike builtin min() over the three ratios.
        any_sharp_drop = (
            (r1 < _CRASH_RATIO) | (r2 < _CRASH_RATIO) | (r3 < _CRASH_RATIO)
        )
        # Condition 2: three straight down days with cumulative drop > 5%.
        steady_slide = (
            (r1 < 1) & (r2 < 1) & (r3 < 1) & (three_day < _CRASH_RATIO)
        )

        hits = np.flatnonzero(any_sharp_drop | steady_slide) + 3
        if hits.size:
            result.iloc[hits] = 0.0

        return result


class TrendFactor(FactorBase):
    """
    Trend factor.

    Measures trend strength with one of:
    - 'ma_cross': relative gap between the fast and slow moving averages
    - 'macd': MACD line minus its signal line

    Parameters:
    - method: trend method ('ma_cross', 'macd')
    - fast: fast moving-average period (default 5)
    - slow: slow moving-average period (default 20)
    """

    name = "trend"
    category = "trend"

    def __init__(
        self,
        method: str = 'ma_cross',
        fast: int = 5,
        slow: int = 20
    ):
        super().__init__(method=method, fast=fast, slow=slow)
        self.method = method
        self.fast = fast
        self.slow = slow

    def compute(self, data: pd.DataFrame) -> pd.Series:
        """
        Compute the trend factor values.

        Args:
            data: frame that must contain a 'close' column.

        Returns:
            Series of trend values indexed like ``data['close']``.

        Raises:
            ValueError: if 'close' is missing or ``method`` is unknown.
        """
        if 'close' not in data.columns:
            raise ValueError("data must contain 'close' column")

        prices = data['close']

        if self.method == 'ma_cross':
            # Relative deviation of the fast MA from the slow MA.
            fast_ma = prices.rolling(self.fast).mean()
            slow_ma = prices.rolling(self.slow).mean()
            return (fast_ma - slow_ma) / slow_ma

        if self.method == 'macd':
            # NOTE(review): the spans 12/26/9 are fixed here; the `fast` and
            # `slow` parameters are not used by this branch.
            ema12 = prices.ewm(span=12).mean()
            ema26 = prices.ewm(span=26).mean()
            macd = ema12 - ema26
            signal = macd.ewm(span=9).mean()
            return macd - signal

        raise ValueError(f"Unknown method: {self.method}")


class ReversalFactor(FactorBase):
    """
    Reversal factor.

    Produces an overbought/oversold signal from one of:
    - 'rsi': deviation of RSI beyond the overbought/oversold thresholds
    - 'kdj': the J line of the KDJ indicator

    Parameters:
    - method: reversal method ('rsi', 'kdj')
    - period: look-back period (default 14)
    - overbought: overbought threshold (default 70)
    - oversold: oversold threshold (default 30)
    """

    name = "reversal"
    category = "reversal"

    def __init__(
        self,
        method: str = 'rsi',
        period: int = 14,
        overbought: float = 70,
        oversold: float = 30
    ):
        super().__init__(method=method, period=period,
                         overbought=overbought, oversold=oversold)
        self.method = method
        self.period = period
        self.overbought = overbought
        self.oversold = oversold

    def compute(self, data: pd.DataFrame) -> pd.Series:
        """
        Compute the reversal factor values.

        Args:
            data: frame that must contain a 'close' column; the 'kdj' method
                additionally reads 'high' and 'low'.

        Returns:
            Series of reversal values indexed like ``data['close']``.

        Raises:
            ValueError: if 'close' is missing or ``method`` is unknown.
        """
        if 'close' not in data.columns:
            raise ValueError("data must contain 'close' column")

        prices = data['close']

        if self.method == 'rsi':
            rsi = self._compute_rsi(prices, self.period)

            # Overbought -> negative value (downward-reversal signal),
            # scaled by the distance from the threshold up to 100.
            # Oversold   -> positive value (upward-reversal signal),
            # scaled by the distance from the threshold down to 0.
            # Anything else, including NaN RSI (comparisons with NaN are
            # False), maps to 0.
            reversal_signal = np.where(
                rsi > self.overbought,
                -(rsi - self.overbought) / (100 - self.overbought),
                np.where(
                    rsi < self.oversold,
                    (self.oversold - rsi) / self.oversold,
                    0
                )
            )
            return pd.Series(reversal_signal, index=prices.index)

        if self.method == 'kdj':
            return self._compute_kdj(data)

        raise ValueError(f"Unknown method: {self.method}")

    def _compute_rsi(self, prices: pd.Series, period: int) -> pd.Series:
        """
        Compute RSI from simple rolling means of gains and losses.

        Args:
            prices: price series.
            period: rolling window length.

        Returns:
            RSI series; NaN until ``period`` observations are available and
            wherever both the average gain and average loss are zero.
        """
        delta = prices.diff()
        gain = delta.where(delta > 0, 0)
        loss = (-delta).where(delta < 0, 0)

        avg_gain = gain.rolling(period).mean()
        avg_loss = loss.rolling(period).mean()

        rs = avg_gain / avg_loss
        rsi = 100 - (100 / (1 + rs))
        return rsi

    def _compute_kdj(self, data: pd.DataFrame) -> pd.Series:
        """
        Compute the J line of the KDJ indicator.

        Reads the 'low', 'high' and 'close' columns; their presence is not
        validated here, so a missing column surfaces as the pandas lookup
        error.

        NOTE(review): the raw J value is returned without the sign inversion
        or thresholding applied in the RSI branch -- confirm this is the
        intended convention.

        Args:
            data: frame with 'low', 'high' and 'close' columns.

        Returns:
            Series ``3 * K - 2 * D``.
        """
        low = data['low']
        high = data['high']
        close = data['close']

        # RSV: position of the close inside the rolling low/high range.
        low_min = low.rolling(self.period).min()
        high_max = high.rolling(self.period).max()
        rsv = (close - low_min) / (high_max - low_min) * 100

        # K and D are successive exponential smoothings with alpha = 1/3.
        k = rsv.ewm(alpha=1/3).mean()
        d = k.ewm(alpha=1/3).mean()
        j = 3 * k - 2 * d

        return j


class VolatilityFactor(FactorBase):
    """
    Volatility factor.

    Measures price volatility with one of:
    - 'std': rolling standard deviation of the close price
    - 'atr': rolling mean of the true range

    Parameters:
    - method: volatility method ('atr', 'std')
    - period: rolling window length (default 20)
    """

    name = "volatility"
    category = "volatility"

    def __init__(
        self,
        method: str = 'std',
        period: int = 20
    ):
        super().__init__(method=method, period=period)
        self.method = method
        self.period = period

    def compute(self, data: pd.DataFrame) -> pd.Series:
        """
        Compute the volatility factor values.

        Column presence is not validated here; a missing column surfaces as
        the pandas lookup error.

        Args:
            data: frame with a 'close' column; the 'atr' method additionally
                reads 'high' and 'low'.

        Returns:
            Series of volatility values.

        Raises:
            ValueError: if ``method`` is unknown.
        """
        if self.method == 'std':
            # Rolling standard deviation of the close price.
            return data['close'].rolling(self.period).std()

        if self.method == 'atr':
            return self._compute_atr(data)

        raise ValueError(f"Unknown method: {self.method}")

    def _compute_atr(self, data: pd.DataFrame) -> pd.Series:
        """
        Compute the average true range.

        The true range of a row is the largest of: high - low,
        |high - previous close| and |low - previous close|.

        Args:
            data: frame with 'high', 'low' and 'close' columns.

        Returns:
            Rolling mean of the true range over ``period`` rows.
        """
        high = data['high']
        low = data['low']
        close = data['close']
        prev_close = close.shift(1)

        tr = pd.concat([
            high - low,
            (high - prev_close).abs(),
            (low - prev_close).abs()
        ], axis=1).max(axis=1)

        return tr.rolling(self.period).mean()