""" 动量因子计算模块 支持两种动量因子: 1. N日涨幅(简单动量) 2. 斜率×R²趋势得分(改进版) """ import numpy as np import pandas as pd from sklearn.linear_model import LinearRegression def calculate_momentum(price_series: pd.Series, n: int) -> pd.Series: """ 计算 N 日涨幅(简单动量) Args: price_series: 价格序列 n: 动量窗口天数 Returns: Series: N日涨幅 """ return price_series / price_series.shift(n + 1) - 1.0 def _slope_r2_score(srs: pd.Series, n: int = 25) -> float: """ 单次计算斜率×R²趋势得分 Args: srs: 价格窗口序列(长度为 n) n: 窗口长度 Returns: float: 斜率 × R² × 10000 """ if srs.shape[0] < n: return np.nan x = np.arange(1, n + 1).reshape(-1, 1) y = srs.values / srs.values[0] # 归一化 lr = LinearRegression().fit(x, y) slope = lr.coef_[0] r_squared = lr.score(x, y) score = 10000 * slope * r_squared return score def calculate_slope_r2(price_series: pd.Series, n: int = 25) -> pd.Series: """ 计算斜率×R²趋势得分序列 Args: price_series: 价格序列 n: 滚动窗口天数 Returns: Series: 趋势得分序列 """ return price_series.rolling(n).apply( lambda x: _slope_r2_score(x, n), raw=False ) def calculate_daily_return(price_series: pd.Series) -> pd.Series: """ 计算日收益率 Args: price_series: 价格序列 Returns: Series: 日收益率 """ return price_series / price_series.shift(1) - 1 def _is_china_index(code: str) -> bool: """判断是否为A股指数""" return code.endswith('.SH') or code.endswith('.SZ') or code.endswith('.SS') def compute_factors( index_data: pd.DataFrame, code_list: list, n: int = 25, factor_type: str = "slope_r2", etf_data: pd.DataFrame = None, code_config: dict = None, ) -> tuple[pd.DataFrame, list]: """ 计算所有指数的因子和日收益率(横截面策略版本) 核心逻辑: 1. 每个标的按照自己的交易日历计算技术指标 2. 对齐到A股交易日历(取离A股交易日最近的有效数据,不使用未来数据) 3. 严格控制T+1规则:T日收盘计算信号,使用T日及之前的数据 Args: index_data: 指数价格数据(宽格式,已对齐到A股交易日历,非A股可能有NaN) code_list: 指数代码列表 n: 动量/趋势窗口 factor_type: 'momentum' 或 'slope_r2' etf_data: ETF价格数据(宽格式,用于收益计算) code_config: 代码配置字典 {code: {name, etf, market}} Returns: tuple: (result_df, valid_codes) - result_df: 包含因子得分和日收益率的DataFrame(按A股交易日对齐) - valid_codes: 有效代码列表 """ code_config = code_config or {} # 如果没有提供ETF数据,创建一个空的DataFrame if etf_data is None: etf_data = pd.DataFrame() # 获取A股交易日历(index_data的索引) a_share_dates = index_data.index # 过滤有效代码 valid_codes = [] for code in code_list: if code not in index_data.columns: print(f" ⚠ 跳过 {code}: 不在数据中") continue valid_codes.append(code) # 为每个标的单独计算指标,然后对齐到A股交易日历 result = pd.DataFrame(index=a_share_dates) for code in valid_codes: # 获取该标的的原始价格数据(去除NaN) price_series = index_data[code].dropna() if len(price_series) < n + 1: print(f" ⚠ 剔除 {code}: 数据不足 ({len(price_series)} < {n+1})") valid_codes.remove(code) continue # 按照该标的自己的交易日历计算指标 if factor_type == "momentum": factor_series = calculate_momentum(price_series, n) elif factor_type == "slope_r2": factor_series = calculate_slope_r2(price_series, n) else: raise ValueError(f"不支持的因子类型: {factor_type}") # 计算日收益率 return_series = calculate_daily_return(price_series) # 对齐到A股交易日历:取离A股交易日最近的有效数据(不使用未来数据) # 使用reindex + method='ffill',确保T日使用T日或之前的数据 result[code] = price_series.reindex(a_share_dates, method='ffill') result[f"得分_{code}"] = factor_series.reindex(a_share_dates, method='ffill') result[f"日收益率_{code}"] = return_series.reindex(a_share_dates, method='ffill') # 过滤掉缺失值过多的指数(基于A股交易日历) total_rows = len(result) final_valid_codes = [] for code in valid_codes: null_pct = result[code].isnull().sum() / total_rows if null_pct > 0.2: print(f" ⚠ 剔除 {code}: 对齐后缺失率 {null_pct:.1%} 过高") result = result.drop(columns=[code, f"得分_{code}", f"日收益率_{code}"], errors='ignore') else: final_valid_codes.append(code) # 按得分列做 dropna(确保所有标的同时有数据) score_cols = [f"得分_{code}" for code in final_valid_codes] result = result.dropna(subset=score_cols) print("\n因子计算完成:") print(f" 因子类型: {factor_type}") print(f" 窗口天数: {n}") print(f" 有效指数: {len(final_valid_codes)}/{len(code_list)}") print(f" 有效数据: {len(result)} 行") print(f" 时间范围: {result.index[0].date()} ~ {result.index[-1].date()}") if etf_data is not index_data and not etf_data.empty: print(f" 使用ETF数据计算收益: ✓") return result, final_valid_codes