""" 因子检验模块:IC检验、分组回测、因子跨度回归 """ import numpy as np import pandas as pd from typing import Dict, List, Tuple from statsmodels.regression.linear_model import OLS def compute_ic(factor: pd.Series, forward_return: pd.Series, method: str = 'spearman') -> pd.Series: """ 计算IC(信息系数) Parameters: ----------- factor : Series 因子值 forward_return : Series 未来收益率 method : str 相关性计算方法:'spearman' 或 'pearson' """ aligned = pd.concat([factor, forward_return], axis=1).dropna() if len(aligned) < 10: return pd.Series(dtype=float) if method == 'spearman': ic = aligned.iloc[:, 0].rank().corr(aligned.iloc[:, 1].rank()) else: ic = aligned.iloc[:, 0].corr(aligned.iloc[:, 1]) return pd.Series([ic], index=[aligned.index[-1]]) def compute_rolling_ic( factor: pd.Series, forward_return: pd.Series, window: int = 30, method: str = 'spearman' ) -> pd.Series: """计算滚动IC(向量化优化)""" # 对齐数据 aligned = pd.concat([factor, forward_return], axis=1).dropna() if len(aligned) < window: return pd.Series(dtype=float, index=factor.index[window:]) aligned.columns = ['factor', 'return'] if method == 'spearman': # 使用rank计算Spearman相关性 factor_rank = aligned['factor'].rank() return_rank = aligned['return'].rank() # 使用DataFrame的rolling().corr()方法 df_rank = pd.DataFrame({'factor': factor_rank, 'return': return_rank}) ic_series = df_rank['factor'].rolling(window, min_periods=window).corr(df_rank['return']) else: # Pearson相关性 df = pd.DataFrame({'factor': aligned['factor'], 'return': aligned['return']}) ic_series = df['factor'].rolling(window, min_periods=window).corr(df['return']) return ic_series def group_backtest( factor: pd.Series, forward_return: pd.Series, n_groups: int = 3, group_period: int = 180 ) -> Dict: """ 分组回测:将数据按因子值分组,计算各组收益 Returns: -------- dict: 包含各组收益、H-L收益差、t统计量等 """ aligned = pd.concat([factor, forward_return], axis=1).dropna() aligned.columns = ['factor', 'return'] results = { 'group_returns': [], 'h_l_return': [], 'h_l_tstat': [], 'periods': [] } # 按月分组(每180个4h周期)- 使用更高效的步长 step = max(group_period // 2, 90) # 减少重叠计算 for start in range(0, len(aligned) - group_period, step): end = start + group_period period_data = aligned.iloc[start:end] if len(period_data) < 30: continue # 按因子值分组(向量化) try: period_data = period_data.copy() period_data['group'] = pd.qcut( period_data['factor'], q=n_groups, labels=False, duplicates='drop' ) # 计算各组收益(向量化) group_returns = period_data.groupby('group')['return'].mean() results['group_returns'].append(group_returns) # H-L收益差 if len(group_returns) >= 2: h_return = group_returns.iloc[-1] # 高因子组 l_return = group_returns.iloc[0] # 低因子组 h_l_diff = h_return - l_return results['h_l_return'].append(h_l_diff) results['periods'].append(period_data.index[-1]) except (ValueError, KeyError): # qcut失败时跳过 continue # 计算平均H-L收益和t统计量 if results['h_l_return']: h_l_series = pd.Series(results['h_l_return'], index=results['periods']) mean_h_l = h_l_series.mean() std_h_l = h_l_series.std() t_stat = mean_h_l / (std_h_l / np.sqrt(len(h_l_series)) + 1e-8) results['mean_h_l_return'] = mean_h_l results['mean_h_l_tstat'] = t_stat results['h_l_series'] = h_l_series else: results['mean_h_l_return'] = 0 results['mean_h_l_tstat'] = 0 return results def factor_span_regression( factors: pd.DataFrame, forward_return: pd.Series, target_factor: str ) -> Dict: """ 因子跨度回归:检验因子的边际解释力 Parameters: ----------- factors : DataFrame 所有因子数据框 forward_return : Series 未来收益率 target_factor : str 目标因子名称 Returns: -------- dict: 包含回归系数、t统计量、R²等 """ # 对齐数据 data = pd.concat([factors, forward_return], axis=1).dropna() if len(data) < 30: return {'beta': 0, 'tstat': 0, 'r2': 0, 'r2_change': 0} y = data.iloc[:, -1].values X_all = data.iloc[:, :-1].values # 全模型(包含目标因子) try: model_all = OLS(y, X_all).fit(cov_type='HAC', cov_kwds={'maxlags': 6}) r2_all = model_all.rsquared # 目标因子的系数和t统计量 target_idx = factors.columns.get_loc(target_factor) beta = model_all.params[target_idx] tstat = model_all.tvalues[target_idx] # 不含目标因子的模型 X_without = np.delete(X_all, target_idx, axis=1) model_without = OLS(y, X_without).fit(cov_type='HAC', cov_kwds={'maxlags': 6}) r2_without = model_without.rsquared r2_change = r2_all - r2_without return { 'beta': beta, 'tstat': tstat, 'r2': r2_all, 'r2_change': r2_change, 'pvalue': model_all.pvalues[target_idx] } except Exception as e: print(f"回归分析出错: {e}") return {'beta': 0, 'tstat': 0, 'r2': 0, 'r2_change': 0} def validate_factor( factor: pd.Series, forward_return: pd.Series, ic_window: int = 30, n_groups: int = 3 ) -> Dict: """ 综合因子检验 Returns: -------- dict: 包含IC、分组回测、显著性等指标 """ # IC检验 rolling_ic = compute_rolling_ic(factor, forward_return, window=ic_window) mean_ic = rolling_ic.mean() ic_ir = mean_ic / (rolling_ic.std() + 1e-8) # IC信息比率 # 分组回测 group_result = group_backtest(factor, forward_return, n_groups=n_groups) return { 'mean_ic': mean_ic, 'ic_ir': ic_ir, 'ic_series': rolling_ic, 'mean_h_l_return': group_result['mean_h_l_return'], 'mean_h_l_tstat': group_result['mean_h_l_tstat'], 'group_returns': group_result['group_returns'] }