"""
Factor testing module: IC test, grouped backtest, factor spanning regression.
"""

from typing import Dict, List, Tuple

import numpy as np
import pandas as pd

try:
    from statsmodels.regression.linear_model import OLS
except ImportError:
    # Only factor_span_regression needs statsmodels; the IC and grouped
    # backtest helpers stay usable without it.
    OLS = None

# Minimum number of aligned observations required by the grouped backtest
# (per window) and by the spanning regression.
_MIN_OBS = 30

# Number of rolling windows ranked per batch in the Spearman IC; bounds the
# size of the temporary (windows x window) rank matrices.
_RANK_CHUNK = 50_000


def _rolling_spearman(x: pd.Series, y: pd.Series, window: int) -> pd.Series:
    """Rolling Spearman correlation with ranks computed inside each window."""
    n_obs = len(x)
    ic = np.full(n_obs, np.nan)
    if window >= 2 and n_obs >= window:
        # Row i of each view holds observations i .. i + window - 1.
        x_win = np.lib.stride_tricks.sliding_window_view(
            x.to_numpy(dtype=float), window
        )
        y_win = np.lib.stride_tricks.sliding_window_view(
            y.to_numpy(dtype=float), window
        )
        for lo in range(0, len(x_win), _RANK_CHUNK):
            hi = lo + _RANK_CHUNK
            # np.array(...) copies the read-only view into a writable block.
            x_rank = pd.DataFrame(np.array(x_win[lo:hi])).rank(axis=1).to_numpy()
            y_rank = pd.DataFrame(np.array(y_win[lo:hi])).rank(axis=1).to_numpy()
            x_rank -= x_rank.mean(axis=1, keepdims=True)
            y_rank -= y_rank.mean(axis=1, keepdims=True)
            cov = (x_rank * y_rank).sum(axis=1)
            denom = np.sqrt(
                (x_rank * x_rank).sum(axis=1) * (y_rank * y_rank).sum(axis=1)
            )
            # A window where either side is constant has zero rank variance;
            # 0 / 0 yields NaN there, which is the intended "undefined" value.
            with np.errstate(divide="ignore", invalid="ignore"):
                ic[window - 1 + lo : window - 1 + lo + len(cov)] = cov / denom
    return pd.Series(ic, index=x.index, name=x.name)


def compute_rolling_ic(
    factor: pd.Series,
    forward_return: pd.Series,
    window: int = 30,
    method: str = "spearman",
) -> pd.Series:
    """Compute the rolling information coefficient (IC).

    Parameters
    ----------
    factor : Series
        Factor values.
    forward_return : Series
        Forward returns, aligned to ``factor`` by index.
    window : int
        Number of observations per rolling window.
    method : str
        ``"spearman"`` for rank IC; any other value selects Pearson.

    Returns
    -------
    Series
        Rolling IC indexed by the rows where both inputs are non-NaN. The
        first ``window - 1`` entries are NaN; if fewer than ``window``
        aligned rows exist, every entry is NaN.
    """
    # Align the two inputs and drop rows where either one is missing.
    aligned = pd.concat([factor, forward_return], axis=1).dropna()
    aligned.columns = ["factor", "return"]

    if len(aligned) < window:
        return pd.Series(np.nan, index=aligned.index, name="factor", dtype=float)

    if method == "spearman":
        # Ranks are taken within each window. Ranking the whole sample once
        # and correlating those global ranks is not a rolling Spearman and
        # lets future observations influence earlier windows.
        return _rolling_spearman(aligned["factor"], aligned["return"], window)

    # Pearson correlation.
    return (
        aligned["factor"]
        .rolling(window, min_periods=window)
        .corr(aligned["return"])
    )


def group_backtest(
    factor: pd.Series,
    forward_return: pd.Series,
    n_groups: int = 3,
    group_period: int = 180,
) -> Dict:
    """Grouped backtest: bucket rows by factor value and compare group returns.

    The aligned sample is cut into windows of ``group_period`` rows (the
    original author's note describes the default 180 as one month of 4h
    bars). Within each window rows are split into ``n_groups`` factor
    quantiles and the mean return of each group is computed.

    Returns
    -------
    dict
        ``group_returns``  list of per-window Series of mean return by group
        ``h_l_return``     list of high-minus-low return per window
        ``h_l_tstat``      always an empty list (kept for compatibility)
        ``periods``        index label of the last row of each H-L window
        ``mean_h_l_return``, ``mean_h_l_tstat``  summary statistics (0 when
        no window produced an H-L value; the t-stat is NaN when only one
        window did, because the sample std is undefined)
        ``h_l_series``     H-L returns as a Series indexed by ``periods``
    """
    aligned = pd.concat([factor, forward_return], axis=1).dropna()
    aligned.columns = ["factor", "return"]

    results = {"group_returns": [], "h_l_return": [], "h_l_tstat": [], "periods": []}

    # Half-window stride (never below 90 rows) to limit overlapping work.
    step = max(group_period // 2, 90)

    # "+ 1" so that the last full window is included; without it a sample of
    # exactly ``group_period`` rows would produce no window at all.
    for start in range(0, len(aligned) - group_period + 1, step):
        period_data = aligned.iloc[start : start + group_period]
        if len(period_data) < _MIN_OBS:
            continue

        # Bucket by factor quantile; skip the window when qcut cannot bin it.
        try:
            buckets = pd.qcut(
                period_data["factor"], q=n_groups, labels=False, duplicates="drop"
            )
        except (ValueError, KeyError):
            continue

        period_data = period_data.copy()
        period_data["group"] = buckets

        # Mean return of each factor group.
        group_returns = period_data.groupby("group")["return"].mean()
        results["group_returns"].append(group_returns)

        # High-minus-low spread needs at least two distinct groups.
        if len(group_returns) >= 2:
            h_return = group_returns.iloc[-1]  # highest-factor group
            l_return = group_returns.iloc[0]  # lowest-factor group
            results["h_l_return"].append(h_return - l_return)
            results["periods"].append(period_data.index[-1])

    # Average H-L return and its t-statistic across windows.
    if results["h_l_return"]:
        h_l_series = pd.Series(results["h_l_return"], index=results["periods"])
        mean_h_l = h_l_series.mean()
        std_h_l = h_l_series.std()
        # The small epsilon avoids division by zero when all spreads are equal.
        t_stat = mean_h_l / (std_h_l / np.sqrt(len(h_l_series)) + 1e-8)

        results["mean_h_l_return"] = mean_h_l
        results["mean_h_l_tstat"] = t_stat
        results["h_l_series"] = h_l_series
    else:
        results["mean_h_l_return"] = 0
        results["mean_h_l_tstat"] = 0
        # Keep the result schema identical to the success branch.
        results["h_l_series"] = pd.Series(dtype=float)

    return results


def _empty_regression_result() -> Dict:
    """Placeholder returned when the spanning regression cannot be run."""
    return {"beta": 0, "tstat": 0, "r2": 0, "r2_change": 0, "pvalue": 1.0}


def factor_span_regression(
    factors: pd.DataFrame, forward_return: pd.Series, target_factor: str
) -> Dict:
    """Factor spanning regression: marginal explanatory power of one factor.

    Regresses ``forward_return`` on all columns of ``factors`` and again on
    all columns except ``target_factor``, both with HAC standard errors
    (``maxlags=6``), and reports the difference in R².

    Parameters
    ----------
    factors : DataFrame
        All factor columns.
    forward_return : Series
        Forward returns.
    target_factor : str
        Name of the column in ``factors`` to evaluate.

    Returns
    -------
    dict
        ``beta``, ``tstat``, ``pvalue`` of the target factor in the full
        model, ``r2`` of the full model and ``r2_change`` versus the model
        without the target. A zero-valued placeholder (``pvalue`` 1.0) is
        returned when fewer than 30 aligned rows exist or the fit fails.

    Raises
    ------
    ImportError
        If statsmodels is not installed and a regression is required.
    """
    # Align the inputs and drop incomplete rows.
    data = pd.concat([factors, forward_return], axis=1).dropna()

    if len(data) < _MIN_OBS:
        return _empty_regression_result()

    if OLS is None:
        raise ImportError("statsmodels is required for factor_span_regression")

    y = data.iloc[:, -1].values
    X_all = data.iloc[:, :-1].values

    # NOTE(review): no constant column is added, so the fit has no intercept
    # and statsmodels reports the uncentered R² - confirm this is intended.
    try:
        # Full model (includes the target factor).
        model_all = OLS(y, X_all).fit(cov_type="HAC", cov_kwds={"maxlags": 6})
        r2_all = model_all.rsquared

        # Coefficient and t-statistic of the target factor.
        target_idx = factors.columns.get_loc(target_factor)
        beta = model_all.params[target_idx]
        tstat = model_all.tvalues[target_idx]

        # Restricted model (target factor removed).
        X_without = np.delete(X_all, target_idx, axis=1)
        if X_without.shape[1] == 0:
            # The target is the only factor: the restricted model has no
            # regressors and, without an intercept, explains nothing.
            r2_without = 0.0
        else:
            model_without = OLS(y, X_without).fit(
                cov_type="HAC", cov_kwds={"maxlags": 6}
            )
            r2_without = model_without.rsquared

        return {
            "beta": beta,
            "tstat": tstat,
            "r2": r2_all,
            "r2_change": r2_all - r2_without,
            "pvalue": model_all.pvalues[target_idx],
        }
    except Exception as e:
        # Best-effort: report the failure and fall back to the placeholder.
        print(f"回归分析出错: {e}")
        return _empty_regression_result()


def validate_factor(
    factor: pd.Series, forward_return: pd.Series, ic_window: int = 30, n_groups: int = 3
) -> Dict:
    """Run the combined factor checks: rolling IC and grouped backtest.

    Returns
    -------
    dict
        ``mean_ic``, ``ic_ir`` (mean IC divided by IC std), ``ic_series``,
        plus ``mean_h_l_return``, ``mean_h_l_tstat`` and ``group_returns``
        taken from ``group_backtest``.
    """
    # IC test.
    rolling_ic = compute_rolling_ic(factor, forward_return, window=ic_window)
    mean_ic = rolling_ic.mean()
    ic_ir = mean_ic / (rolling_ic.std() + 1e-8)  # IC information ratio

    # Grouped backtest.
    group_result = group_backtest(factor, forward_return, n_groups=n_groups)

    return {
        "mean_ic": mean_ic,
        "ic_ir": ic_ir,
        "ic_series": rolling_ic,
        "mean_h_l_return": group_result["mean_h_l_return"],
        "mean_h_l_tstat": group_result["mean_h_l_tstat"],
        "group_returns": group_result["group_returns"],
    }