"""Momentum factor calculation module.

Two factor flavours are supported:

1. N-day price change (simple momentum)
2. slope x R^2 trend score (improved variant)
"""

import numpy as np
import pandas as pd

try:
    # The trend score below is computed in closed form with NumPy, so
    # scikit-learn is no longer required by this module. The name is still
    # re-exported when the package is installed so that any code relying on
    # it being importable from here keeps working.
    from sklearn.linear_model import LinearRegression  # noqa: F401
except ImportError:  # pragma: no cover - optional dependency
    LinearRegression = None


def calculate_momentum(price_series: pd.Series, n: int) -> pd.Series:
    """Return the simple momentum (relative price change) of a series.

    NOTE(review): the lookback is ``n + 1`` rows, not ``n`` -- each value is
    ``price[t] / price[t - (n + 1)] - 1``. Behaviour is kept unchanged because
    existing results depend on it; confirm whether the extra row is intended.

    Args:
        price_series: Price series.
        n: Momentum window parameter.

    Returns:
        Series of relative changes; the first ``n + 1`` entries are NaN.
    """
    return price_series / price_series.shift(n + 1) - 1.0


def _slope_r2_score(srs, n: int = 25) -> float:
    """Compute the slope x R^2 trend score for a single price window.

    The window is normalised by its first value, regressed by ordinary least
    squares against ``x = 1..n``, and the score is
    ``10000 * slope * R^2``. The regression is evaluated in closed form, which
    yields the same slope and R^2 as fitting a linear model with an intercept.

    Args:
        srs: Price window (Series or array-like) of length ``n``.
        n: Window length.

    Returns:
        The trend score, or NaN when the window is shorter than ``n``,
        ``n < 2``, the first price is zero, or the window contains
        non-finite values.

    Raises:
        ValueError: If the window holds more than ``n`` observations.
    """
    window = np.asarray(srs, dtype=float)
    size = window.shape[0]
    if size < n or n < 2:
        return np.nan
    if size > n:
        raise ValueError(f"窗口长度应为 {n},实际为 {size}")
    # A zero base price cannot be normalised and NaN/inf cannot be regressed;
    # report "no score" instead of aborting the whole rolling computation.
    if window[0] == 0 or not np.isfinite(window).all():
        return np.nan

    y = window / window[0]  # normalise so scores are comparable across assets
    x = np.arange(1, n + 1, dtype=float)
    x_dev = x - x.mean()
    y_dev = y - y.mean()

    sxx = float(x_dev @ x_dev)
    syy = float(y_dev @ y_dev)
    if syy == 0.0:
        # Perfectly flat window: the slope is zero, so the score is zero
        # regardless of how R^2 is defined for a constant target.
        return 0.0
    sxy = float(x_dev @ y_dev)

    slope = sxy / sxx
    r_squared = (sxy * sxy) / (sxx * syy)
    return 10000 * slope * r_squared


def calculate_slope_r2(price_series: pd.Series, n: int = 25) -> pd.Series:
    """Compute the rolling slope x R^2 trend score of a price series.

    Args:
        price_series: Price series.
        n: Rolling window length in rows.

    Returns:
        Series of trend scores; entries without a full window are NaN.
    """
    # raw=True hands each window over as a NumPy array, avoiding the cost of
    # building a Series object for every window.
    return price_series.rolling(n).apply(
        lambda window: _slope_r2_score(window, n), raw=True
    )


def calculate_daily_return(price_series: pd.Series) -> pd.Series:
    """Compute the row-over-row return of a price series.

    Args:
        price_series: Price series.

    Returns:
        Series of returns ``price[t] / price[t - 1] - 1``; the first is NaN.
    """
    previous = price_series.shift(1)
    return price_series / previous - 1


# Maps the public ``factor_type`` names to their scoring functions.
_FACTOR_FUNCS = {
    "momentum": calculate_momentum,
    "slope_r2": calculate_slope_r2,
}


def _factors_from_long(etf_data, code_list, n, score_func, max_null_ratio):
    """Build the factor table from long-format data (one row per code/date)."""
    frames = []
    valid_codes = []

    for code in code_list:
        close = etf_data.loc[etf_data["code"] == code, "close"]
        if close.empty:
            print(f" ⚠ 跳过 {code}: 不在数据中")
            continue

        null_pct = close.isna().mean()
        if null_pct > max_null_ratio:
            print(f" ⚠ 剔除 {code}: 缺失率 {null_pct:.1%} 过高")
            continue

        # Rolling/shift based factors need chronological order.
        close = close.sort_index()
        frames.append(
            pd.DataFrame(
                {
                    f"日收益率_{code}": calculate_daily_return(close),
                    f"得分_{code}": score_func(close, n),
                }
            )
        )
        valid_codes.append(code)

    if not frames:
        raise ValueError("没有有效的指数数据")

    # Inner join on the index: keep only dates present for every code.
    result = frames[0]
    for frame in frames[1:]:
        result = result.join(frame, how="inner")

    # Only drop rows where no code has a score at all (warm-up period).
    score_cols = [f"得分_{code}" for code in valid_codes]
    result = result.dropna(subset=score_cols, how="all")
    return result, valid_codes


def _factors_from_wide(etf_data, code_list, n, score_func, max_null_ratio):
    """Build the factor table from wide-format data (one column per code)."""
    result = etf_data.copy()
    valid_codes = []

    for code in code_list:
        if code not in result.columns:
            print(f" ⚠ 跳过 {code}: 不在数据中")
            continue

        # mean() of the null mask is the missing ratio and, unlike a manual
        # division by the row count, does not divide by zero on empty input.
        null_pct = result[code].isna().mean()
        if null_pct > max_null_ratio:
            print(f" ⚠ 剔除 {code}: 缺失率 {null_pct:.1%} 过高")
            result = result.drop(columns=[code])
        else:
            valid_codes.append(code)

    for code in valid_codes:
        result[f"日收益率_{code}"] = calculate_daily_return(result[code])
        result[f"得分_{code}"] = score_func(result[code], n)

    # Wide format keeps only rows where every code has a score.
    score_cols = [f"得分_{code}" for code in valid_codes]
    result = result.dropna(subset=score_cols)
    return result, valid_codes


def compute_factors(
    etf_data: pd.DataFrame,
    code_list: list,
    n: int = 25,
    factor_type: str = "slope_r2",
    max_null_ratio: float = 0.2,
) -> tuple[pd.DataFrame, list]:
    """Compute the factor score and daily return for every requested code.

    Two input layouts are handled:

    * long format -- ``etf_data`` has a ``code`` column and a ``close``
      column; rows are selected per code and the per-code results are
      inner-joined on the index. Rows where all scores are NaN are dropped.
    * wide format -- one price column per code. Rows where any score is NaN
      are dropped.

    Codes that are absent from the data are skipped, and codes whose share of
    missing prices exceeds ``max_null_ratio`` are excluded; both cases are
    reported on stdout.

    Args:
        etf_data: Price data in long or wide format.
        code_list: Codes to compute factors for.
        n: Momentum / trend window.
        factor_type: ``'momentum'`` or ``'slope_r2'``.
        max_null_ratio: Maximum tolerated fraction of missing prices per code.

    Returns:
        ``(result_df, valid_codes)`` where ``result_df`` holds the columns
        ``日收益率_<code>`` and ``得分_<code>`` for each valid code (plus the
        original price columns in wide format).

    Raises:
        ValueError: If ``factor_type`` is not supported, or if long-format
            input yields no valid code.
    """
    # Fail fast: an unknown factor type is an error even if no code is valid.
    if factor_type not in _FACTOR_FUNCS:
        raise ValueError(f"不支持的因子类型: {factor_type}")
    score_func = _FACTOR_FUNCS[factor_type]

    if "code" in etf_data.columns:
        result, valid_codes = _factors_from_long(
            etf_data, code_list, n, score_func, max_null_ratio
        )
    else:
        result, valid_codes = _factors_from_wide(
            etf_data, code_list, n, score_func, max_null_ratio
        )

    print("\n因子计算完成:")
    print(f" 因子类型: {factor_type}")
    print(f" 窗口天数: {n}")
    print(f" 有效指数: {len(valid_codes)}/{len(code_list)}")
    print(f" 有效数据: {len(result)} 行")

    return result, valid_codes