添加talib算子

This commit is contained in:
2025-11-09 20:19:08 +08:00
parent dc3d41d6e5
commit e5beada25e
5 changed files with 512 additions and 404 deletions

View File

@@ -1,63 +1,44 @@
"""
因子检验模块IC检验、分组回测、因子跨度回归
因子检验模块: IC检验、分组回测、因子跨度回归
"""
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple
from statsmodels.regression.linear_model import OLS
def compute_ic(factor: pd.Series, forward_return: pd.Series, method: str = 'spearman') -> pd.Series:
"""
计算IC信息系数
Parameters:
-----------
factor : Series
因子值
forward_return : Series
未来收益率
method : str
相关性计算方法:'spearman''pearson'
"""
aligned = pd.concat([factor, forward_return], axis=1).dropna()
if len(aligned) < 10:
return pd.Series(dtype=float)
if method == 'spearman':
ic = aligned.iloc[:, 0].rank().corr(aligned.iloc[:, 1].rank())
else:
ic = aligned.iloc[:, 0].corr(aligned.iloc[:, 1])
return pd.Series([ic], index=[aligned.index[-1]])
def compute_rolling_ic(
factor: pd.Series,
forward_return: pd.Series,
window: int = 30,
method: str = 'spearman'
method: str = "spearman",
) -> pd.Series:
"""计算滚动IC向量化优化"""
"""计算滚动IC (向量化优化)"""
# 对齐数据
aligned = pd.concat([factor, forward_return], axis=1).dropna()
if len(aligned) < window:
return pd.Series(dtype=float, index=factor.index[window:])
aligned.columns = ['factor', 'return']
if method == 'spearman':
aligned.columns = ["factor", "return"]
if method == "spearman":
# 使用rank计算Spearman相关性
factor_rank = aligned['factor'].rank()
return_rank = aligned['return'].rank()
# 使用DataFrame的rolling().corr()方法
df_rank = pd.DataFrame({'factor': factor_rank, 'return': return_rank})
ic_series = df_rank['factor'].rolling(window, min_periods=window).corr(df_rank['return'])
# 这里是全局的 rank理论上应该是按照 window 滚动排序
factor_rank = aligned["factor"].rank()
return_rank = aligned["return"].rank()
# 使用DataFrame的rolling().corr()方法, 该方法pandas优化过
df_rank = pd.DataFrame({"factor": factor_rank, "return": return_rank})
ic_series = (
df_rank["factor"]
.rolling(window, min_periods=window)
.corr(df_rank["return"])
)
else:
# Pearson相关性
df = pd.DataFrame({'factor': aligned['factor'], 'return': aligned['return']})
ic_series = df['factor'].rolling(window, min_periods=window).corr(df['return'])
df = pd.DataFrame({"factor": aligned["factor"], "return": aligned["return"]})
ic_series = df["factor"].rolling(window, min_periods=window).corr(df["return"])
return ic_series
@@ -65,85 +46,75 @@ def group_backtest(
factor: pd.Series,
forward_return: pd.Series,
n_groups: int = 3,
group_period: int = 180
group_period: int = 180,
) -> Dict:
"""
分组回测:将数据按因子值分组,计算各组收益
Returns:
--------
dict: 包含各组收益、H-L收益差、t统计量等
"""
aligned = pd.concat([factor, forward_return], axis=1).dropna()
aligned.columns = ['factor', 'return']
results = {
'group_returns': [],
'h_l_return': [],
'h_l_tstat': [],
'periods': []
}
aligned.columns = ["factor", "return"]
results = {"group_returns": [], "h_l_return": [], "h_l_tstat": [], "periods": []}
# 按月分组每180个4h周期- 使用更高效的步长
step = max(group_period // 2, 90) # 减少重叠计算
for start in range(0, len(aligned) - group_period, step):
end = start + group_period
period_data = aligned.iloc[start:end]
if len(period_data) < 30:
continue
# 按因子值分组(向量化)
try:
period_data = period_data.copy()
period_data['group'] = pd.qcut(
period_data['factor'],
q=n_groups,
labels=False,
duplicates='drop'
period_data["group"] = pd.qcut(
period_data["factor"], q=n_groups, labels=False, duplicates="drop"
)
# 计算各组收益(向量化)
group_returns = period_data.groupby('group')['return'].mean()
results['group_returns'].append(group_returns)
group_returns = period_data.groupby("group")["return"].mean()
results["group_returns"].append(group_returns)
# H-L收益差
if len(group_returns) >= 2:
h_return = group_returns.iloc[-1] # 高因子组
l_return = group_returns.iloc[0] # 低因子组
l_return = group_returns.iloc[0] # 低因子组
h_l_diff = h_return - l_return
results['h_l_return'].append(h_l_diff)
results['periods'].append(period_data.index[-1])
results["h_l_return"].append(h_l_diff)
results["periods"].append(period_data.index[-1])
except (ValueError, KeyError):
# qcut失败时跳过
continue
# 计算平均H-L收益和t统计量
if results['h_l_return']:
h_l_series = pd.Series(results['h_l_return'], index=results['periods'])
if results["h_l_return"]:
h_l_series = pd.Series(results["h_l_return"], index=results["periods"])
mean_h_l = h_l_series.mean()
std_h_l = h_l_series.std()
t_stat = mean_h_l / (std_h_l / np.sqrt(len(h_l_series)) + 1e-8)
results['mean_h_l_return'] = mean_h_l
results['mean_h_l_tstat'] = t_stat
results['h_l_series'] = h_l_series
results["mean_h_l_return"] = mean_h_l
results["mean_h_l_tstat"] = t_stat
results["h_l_series"] = h_l_series
else:
results['mean_h_l_return'] = 0
results['mean_h_l_tstat'] = 0
results["mean_h_l_return"] = 0
results["mean_h_l_tstat"] = 0
return results
def factor_span_regression(
factors: pd.DataFrame,
forward_return: pd.Series,
target_factor: str
factors: pd.DataFrame, forward_return: pd.Series, target_factor: str
) -> Dict:
"""
因子跨度回归:检验因子的边际解释力
Parameters:
-----------
factors : DataFrame
@@ -152,7 +123,7 @@ def factor_span_regression(
未来收益率
target_factor : str
目标因子名称
Returns:
--------
dict: 包含回归系数、t统计量、R²等
@@ -160,49 +131,46 @@ def factor_span_regression(
# 对齐数据
data = pd.concat([factors, forward_return], axis=1).dropna()
if len(data) < 30:
return {'beta': 0, 'tstat': 0, 'r2': 0, 'r2_change': 0}
return {"beta": 0, "tstat": 0, "r2": 0, "r2_change": 0}
y = data.iloc[:, -1].values
X_all = data.iloc[:, :-1].values
# 全模型(包含目标因子)
try:
model_all = OLS(y, X_all).fit(cov_type='HAC', cov_kwds={'maxlags': 6})
model_all = OLS(y, X_all).fit(cov_type="HAC", cov_kwds={"maxlags": 6})
r2_all = model_all.rsquared
# 目标因子的系数和t统计量
target_idx = factors.columns.get_loc(target_factor)
beta = model_all.params[target_idx]
tstat = model_all.tvalues[target_idx]
# 不含目标因子的模型
X_without = np.delete(X_all, target_idx, axis=1)
model_without = OLS(y, X_without).fit(cov_type='HAC', cov_kwds={'maxlags': 6})
model_without = OLS(y, X_without).fit(cov_type="HAC", cov_kwds={"maxlags": 6})
r2_without = model_without.rsquared
r2_change = r2_all - r2_without
return {
'beta': beta,
'tstat': tstat,
'r2': r2_all,
'r2_change': r2_change,
'pvalue': model_all.pvalues[target_idx]
"beta": beta,
"tstat": tstat,
"r2": r2_all,
"r2_change": r2_change,
"pvalue": model_all.pvalues[target_idx],
}
except Exception as e:
print(f"回归分析出错: {e}")
return {'beta': 0, 'tstat': 0, 'r2': 0, 'r2_change': 0}
return {"beta": 0, "tstat": 0, "r2": 0, "r2_change": 0}
def validate_factor(
factor: pd.Series,
forward_return: pd.Series,
ic_window: int = 30,
n_groups: int = 3
factor: pd.Series, forward_return: pd.Series, ic_window: int = 30, n_groups: int = 3
) -> Dict:
"""
综合因子检验
Returns:
--------
dict: 包含IC、分组回测、显著性等指标
@@ -211,16 +179,15 @@ def validate_factor(
rolling_ic = compute_rolling_ic(factor, forward_return, window=ic_window)
mean_ic = rolling_ic.mean()
ic_ir = mean_ic / (rolling_ic.std() + 1e-8) # IC信息比率
# 分组回测
group_result = group_backtest(factor, forward_return, n_groups=n_groups)
return {
'mean_ic': mean_ic,
'ic_ir': ic_ir,
'ic_series': rolling_ic,
'mean_h_l_return': group_result['mean_h_l_return'],
'mean_h_l_tstat': group_result['mean_h_l_tstat'],
'group_returns': group_result['group_returns']
}
return {
"mean_ic": mean_ic,
"ic_ir": ic_ir,
"ic_series": rolling_ic,
"mean_h_l_return": group_result["mean_h_l_return"],
"mean_h_l_tstat": group_result["mean_h_l_tstat"],
"group_returns": group_result["group_returns"],
}