"""
Momentum factor computation module.

Supported factors:
1. N-day return (simple momentum)
2. Slope x R^2 trend score (improved version)
3. Weighted log-linear regression momentum score
"""
import math
from typing import Optional

import numpy as np
import pandas as pd

try:
    # No longer used by this module (the trend score is computed with numpy);
    # kept only so the name stays importable from here when scikit-learn is
    # installed.
    from sklearn.linear_model import LinearRegression  # noqa: F401
except ImportError:
    pass


def calculate_momentum(price_series: pd.Series, n: int) -> pd.Series:
    """
    Compute the N-day return (simple momentum).

    Args:
        price_series: Price series.
        n: Momentum window in rows (trading days).

    Returns:
        Series: ``price[t] / price[t - n] - 1``; the first ``n`` entries are NaN.
    """
    # An N-day return compares today with the price N rows back, so the
    # shift is exactly ``n`` (``n + 1`` would give an (N+1)-day return).
    return price_series / price_series.shift(n) - 1.0


def _slope_r2_score(srs: pd.Series, n: int = 25) -> float:
    """
    Compute one slope x R^2 trend score for a single price window.

    The window is normalised by its first price and regressed (ordinary
    least squares with intercept) on x = 1..n.

    Args:
        srs: Price window (Series or 1-D array) of length ``n``.
        n: Window length.

    Returns:
        float: slope * R^2 * 10000. NaN when the window is shorter than
        ``n``, when ``n < 2``, when the first price is 0, or when the window
        contains non-finite values.

    Raises:
        ValueError: If the window is longer than ``n``.
    """
    values = np.asarray(srs, dtype=float)
    if values.shape[0] < n or n < 2:
        return np.nan
    if values.shape[0] != n:
        raise ValueError(
            f"window length {values.shape[0]} does not match n={n}"
        )
    # Normalising by a zero first price (or regressing NaN/inf) has no
    # meaningful result; report "no score" instead of failing the whole run.
    if values[0] == 0 or not np.all(np.isfinite(values)):
        return np.nan

    y = values / values[0]  # normalise so the slope is a relative change
    x = np.arange(1, n + 1, dtype=float)
    x_dev = x - x.mean()
    y_dev = y - y.mean()

    slope = float(np.dot(x_dev, y_dev) / np.dot(x_dev, x_dev))
    ss_tot = float(np.dot(y_dev, y_dev))
    if ss_tot == 0.0:
        # Perfectly flat window: the slope is 0, so the score is 0 whatever
        # value R^2 is assigned.
        return 0.0
    residuals = y_dev - slope * x_dev
    r_squared = 1.0 - float(np.dot(residuals, residuals)) / ss_tot
    return 10000 * slope * r_squared


def calculate_weighted_momentum_score(prices: np.ndarray) -> float:
    """
    Weighted linear-regression momentum score on log prices.

    Args:
        prices: Price array (log is taken, so prices are expected to be
            positive).

    Returns:
        float: annualised return * weighted R^2, or 0.0 when fewer than
        5 prices are supplied.
    """
    if len(prices) < 5:
        return 0.0

    y = np.log(prices)
    x = np.arange(len(y))
    weights = np.linspace(1, 2, len(y))  # recent points weigh more (1 -> 2)

    # Weighted fit through np.polyfit's ``w`` argument.
    slope, intercept = np.polyfit(x, y, 1, w=weights)
    # The slope is a per-row log return; annualise with 250 rows per year.
    annualized_returns = math.exp(slope * 250) - 1

    # Weighted R^2.
    y_pred = slope * x + intercept
    ss_res = np.sum(weights * (y - y_pred) ** 2)
    ss_tot = np.sum(weights * (y - np.average(y, weights=weights)) ** 2)
    r2 = 1 - ss_res / ss_tot if ss_tot > 0 else 0

    return annualized_returns * r2


def calculate_atr(high: pd.Series, low: pd.Series, close: pd.Series,
                  period: int) -> pd.Series:
    """
    Compute the Average True Range as a simple rolling mean (no talib).

    Args:
        high: High prices.
        low: Low prices.
        close: Close prices.
        period: Rolling window length; a full window is required.

    Returns:
        Series: Rolling mean of the true range.
    """
    prev_close = close.shift(1)
    # True range = max(high-low, |high-prev_close|, |low-prev_close|).
    true_range = pd.concat(
        [
            high - low,
            (high - prev_close).abs(),
            (low - prev_close).abs(),
        ],
        axis=1,
    ).max(axis=1)
    return true_range.rolling(window=period, min_periods=period).mean()


def apply_crash_filter(prices: np.ndarray, score: float) -> float:
    """
    Zero the score when the last three daily moves look like a crash.

    Only the last four prices are read.

    Args:
        prices: Price history ending at the evaluation day.
        score: Score to filter.

    Returns:
        float: 0.0 if any of the last three daily returns is below -5%, or
        if all three are negative and their cumulative return is below -5%;
        otherwise ``score`` unchanged (also when fewer than 4 prices exist).
    """
    if len(prices) < 4:
        return score

    r1 = prices[-1] / prices[-2]
    r2 = prices[-2] / prices[-3]
    r3 = prices[-3] / prices[-4]

    # Condition 1: any single day dropped more than 5%.
    single_day_crash = min(r1, r2, r3) < 0.95
    # Condition 2: three straight down days with a cumulative drop over 5%.
    sustained_decline = (
        (r1 < 1) and (r2 < 1) and (r3 < 1)
        and (prices[-1] / prices[-4] < 0.95)
    )

    if single_day_crash or sustained_decline:
        return 0.0
    return score


def calculate_slope_r2(price_series: pd.Series, n: int = 25) -> pd.Series:
    """
    Compute the rolling slope x R^2 trend score.

    Args:
        price_series: Price series.
        n: Rolling window length in rows.

    Returns:
        Series: Trend score per row (NaN until a full window is available).
    """
    # raw=True hands plain ndarrays to the scorer, avoiding the cost of
    # building a Series per window.
    return price_series.rolling(n).apply(
        lambda window: _slope_r2_score(window, n), raw=True
    )


def calculate_daily_return(price_series: pd.Series) -> pd.Series:
    """
    Compute the one-row (daily) simple return.

    Args:
        price_series: Price series.

    Returns:
        Series: ``price[t] / price[t - 1] - 1``.
    """
    return price_series / price_series.shift(1) - 1


def _is_china_index(code: str) -> bool:
    """Return True when the code carries a .SH / .SZ / .SS suffix."""
    return code.endswith(('.SH', '.SZ', '.SS'))


def _dynamic_window(long_atr: float, short_atr: float, default_n: int,
                    min_days: int, max_days: int) -> int:
    """
    Map the short/long ATR ratio to a look-back length.

    The ratio is capped at 0.9; a larger ratio gives a shorter window.
    Falls back to ``default_n`` while either ATR is NaN or the long ATR is
    not positive.
    """
    if long_atr > 0 and not np.isnan(long_atr) and not np.isnan(short_atr):
        ratio = min(0.9, short_atr / long_atr)
        return int(min_days + (max_days - min_days) * (1 - ratio))
    return default_n


def _dynamic_factor_series(df: pd.DataFrame, n: int, factor_type: str,
                           min_days: int, max_days: int) -> pd.Series:
    """
    Score every row of ``df`` using an ATR-driven look-back window.

    ``weighted_momentum`` uses the weighted regression score; every other
    ``factor_type`` uses the slope x R^2 score. The crash filter is applied
    to every score.
    """
    close_values = df['close'].to_numpy()
    long_atr = calculate_atr(
        df['high'], df['low'], df['close'], max_days
    ).to_numpy()
    short_atr = calculate_atr(
        df['high'], df['low'], df['close'], min_days
    ).to_numpy()

    scores = []
    for i, (la, sa) in enumerate(zip(long_atr, short_atr)):
        window = _dynamic_window(la, sa, n, min_days, max_days)
        # A full window of ``window`` rows ends at row i once i >= window - 1.
        if i < window - 1:
            scores.append(np.nan)
            continue

        window_prices = close_values[i - window + 1:i + 1]
        if factor_type == "weighted_momentum":
            score = calculate_weighted_momentum_score(window_prices)
        else:
            score = _slope_r2_score(window_prices, window)

        # The crash filter only reads the last four prices, so pass just
        # those instead of copying the whole history for every row.
        recent_prices = close_values[max(0, i - 3):i + 1]
        scores.append(apply_crash_filter(recent_prices, score))

    return pd.Series(scores, index=df.index)


def _fixed_factor_series(close: pd.Series, n: int,
                         factor_type: str) -> pd.Series:
    """Score ``close`` with a fixed ``n``-row window for the given factor."""
    if factor_type == "momentum":
        return calculate_momentum(close, n)
    if factor_type == "slope_r2":
        return calculate_slope_r2(close, n)
    if factor_type == "weighted_momentum":
        # Windows of fewer than 5 rows always score 0.0 and longer windows
        # contain the four prices the crash filter reads, so filtering on
        # the window itself gives the same result as on the full history.
        return close.rolling(n).apply(
            lambda window: apply_crash_filter(
                window, calculate_weighted_momentum_score(window)
            ),
            raw=True,
        )
    raise ValueError(f"不支持的因子类型: {factor_type}")


def compute_factors(
    index_data: pd.DataFrame,
    code_list: list,
    n: int = 25,
    factor_type: str = "slope_r2",
    etf_data: Optional[pd.DataFrame] = None,
    code_config: Optional[dict] = None,
    index_ohlcv_data: Optional[dict] = None,
    auto_day: bool = False,
    min_days: int = 20,
    max_days: int = 60,
) -> tuple[pd.DataFrame, list]:
    """
    Compute factor scores and daily returns for every code (cross-section).

    Each code is scored on its own rows, then forward-filled onto the index
    of ``index_data``.

    Args:
        index_data: Wide close-price frame (aligned); its index is the
            target calendar.
        code_list: Codes to process.
        n: Default window length.
        factor_type: 'momentum', 'slope_r2' or 'weighted_momentum'.
        etf_data: Optional wide ETF close-price frame; when it has a column
            for a code, daily returns come from it instead of the index.
        code_config: Code configuration dict (currently not used here).
        index_ohlcv_data: Optional raw OHLCV frames ``{code: df}``; preferred
            over ``index_data`` when a code is present.
        auto_day: Enable the ATR-driven dynamic window (needs 'high' and
            'low' columns, otherwise the fixed window is used).
        min_days: Lower bound of the dynamic window.
        max_days: Upper bound of the dynamic window.

    Returns:
        tuple: ``(result, valid_codes)``. ``result`` holds, per kept code,
        the columns ``code``, ``得分_<code>`` and ``日收益率_<code>``.
        ``valid_codes`` lists the codes that were kept.

    Raises:
        ValueError: In fixed-window mode, for an unsupported ``factor_type``.

    Notes:
        In dynamic mode any ``factor_type`` other than 'weighted_momentum'
        is scored with slope x R^2, and the crash filter is applied to every
        score. In fixed mode only 'weighted_momentum' is crash-filtered.
    """
    a_share_dates = index_data.index

    # Each code is scored separately and then aligned to the target calendar.
    result = pd.DataFrame(index=a_share_dates)
    processed_codes = []

    for code in code_list:
        # Prefer OHLCV data when supplied; otherwise fall back to the close
        # column of ``index_data``.
        if index_ohlcv_data and code in index_ohlcv_data:
            df = index_ohlcv_data[code].dropna()
        elif code in index_data:
            df = pd.DataFrame({'close': index_data[code].dropna()})
        else:
            continue

        if len(df) < n + 1:
            print(f" ⚠ 剔除 {code}: 数据不足 ({len(df)} < {n+1})")
            continue

        # Score on the code's own rows.
        if auto_day and 'high' in df.columns and 'low' in df.columns:
            factor_series = _dynamic_factor_series(
                df, n, factor_type, min_days, max_days
            )
        else:
            factor_series = _fixed_factor_series(df['close'], n, factor_type)

        # Align to the target calendar.
        price_aligned = df['close'].reindex(a_share_dates, method='ffill')
        factor_aligned = factor_series.reindex(a_share_dates, method='ffill')

        # Use ETF prices for returns when available.
        if etf_data is not None and code in etf_data:
            return_aligned = calculate_daily_return(
                etf_data[code].reindex(a_share_dates, method='ffill')
            )
        else:
            return_aligned = calculate_daily_return(price_aligned)

        result[code] = price_aligned
        result[f"得分_{code}"] = factor_aligned
        result[f"日收益率_{code}"] = return_aligned
        processed_codes.append(code)

    # Drop codes whose aligned price column is more than half missing.
    total_rows = len(result)
    final_valid_codes = []
    for code in processed_codes:
        null_pct = result[code].isnull().sum() / total_rows
        if null_pct > 0.5:
            print(f" ⚠ 剔除 {code}: 对齐后缺失率 {null_pct:.1%} 过高")
            result = result.drop(
                columns=[code, f"得分_{code}", f"日收益率_{code}"],
                errors='ignore',
            )
        else:
            final_valid_codes.append(code)

    return result, final_valid_codes