""" 动量因子(通用版本) 使用加权线性回归:得分 = 年化收益率 × R² 与现有 MomentumFactor 对比验证: - 输入相同 → 输出应该相同 """ import pandas as pd import numpy as np import math from framework_v2.core import FactorBase class MomentumFactor(FactorBase): """ 动量因子 计算加权线性回归动量得分: 得分 = 年化收益率 × R² 参数: - n_days: 动量窗口(默认25) - weighted: 是否加权(默认True) - crash_filter: 是否启用崩盘过滤(默认True) """ name = "momentum" category = "momentum" def __init__( self, n_days: int = 25, weighted: bool = True, crash_filter: bool = True ): super().__init__(n_days=n_days, weighted=weighted, crash_filter=crash_filter) self.n_days = n_days self.weighted = weighted self.crash_filter = crash_filter def compute(self, data: pd.DataFrame) -> pd.Series: """计算动量因子值""" if 'close' not in data.columns: raise ValueError("data must contain 'close' column") prices = data['close'] if self.weighted: factor_values = prices.rolling(self.n_days).apply( lambda x: self._weighted_momentum_score(x.values), raw=False ) else: factor_values = prices.pct_change(self.n_days) if self.crash_filter: factor_values = self._apply_crash_filter(prices, factor_values) return factor_values def _weighted_momentum_score(self, prices: np.ndarray) -> float: """计算加权动量得分(完全复制现有逻辑)""" if len(prices) < 5: return 0.0 # 价格下界 clip,防止 log(0) 或 log(负数) prices = np.clip(prices, 0.01, None) y = np.log(prices) # 异常值检测 if np.any(np.isnan(y)) or np.any(np.isinf(y)): return 0.0 x = np.arange(len(y)) weights = np.linspace(1, 2, len(y)) slope, intercept = np.polyfit(x, y, 1, w=weights) annualized_returns = math.exp(slope * 250) - 1 y_pred = slope * x + intercept ss_res = np.sum(weights * (y - y_pred) ** 2) ss_tot = np.sum(weights * (y - np.average(y, weights=weights)) ** 2) r2 = 1 - ss_res / ss_tot if ss_tot > 0 else 0 return annualized_returns * r2 def _apply_crash_filter(self, prices: pd.Series, factor_values: pd.Series) -> pd.Series: """崩盘过滤:连续3天跌>5%清零(完全复制现有逻辑)""" result = factor_values.copy() for i in range(3, len(prices)): r1 = prices.iloc[i] / prices.iloc[i-1] r2 = prices.iloc[i-1] / prices.iloc[i-2] r3 = prices.iloc[i-2] / prices.iloc[i-3] con1 = min(r1, r2, r3) < 0.95 con2 = (r1 < 1) and (r2 < 1) and (r3 < 1) and (prices.iloc[i] / prices.iloc[i-3] < 0.95) if con1 or con2: result.iloc[i] = 0.0 return result