Files
etf/core/factors/momentum.py
aszerW ec9c808e6c refactor(momentum): 优化因子计算流程并对齐A股交易日历
- 添加辅助函数判断是否为A股指数
- 调整compute_factors函数结构,分别计算每个标的技术指标
- 严格实现T+1规则,确保信号只用T日及以前数据
- 对齐所有数据到A股交易日历,使用前向填充避免未来数据泄漏
- 增加有效代码有效性检查,剔除数据不足或缺失率过高的标的
- 完善函数注释,明确输入输出及核心逻辑说明
- 优化打印信息,清晰展示因子类型、窗口、有效标的及时间范围
2026-03-26 01:26:14 +08:00

187 lines
5.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
动量因子计算模块
支持两种动量因子:
1. N日涨幅简单动量
2. 斜率×R²趋势得分改进版
"""
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
def calculate_momentum(price_series: pd.Series, n: int) -> pd.Series:
"""
计算 N 日涨幅(简单动量)
Args:
price_series: 价格序列
n: 动量窗口天数
Returns:
Series: N日涨幅
"""
return price_series / price_series.shift(n + 1) - 1.0
def _slope_r2_score(srs: pd.Series, n: int = 25) -> float:
"""
单次计算斜率×R²趋势得分
Args:
srs: 价格窗口序列(长度为 n
n: 窗口长度
Returns:
float: 斜率 ×× 10000
"""
if srs.shape[0] < n:
return np.nan
x = np.arange(1, n + 1).reshape(-1, 1)
y = srs.values / srs.values[0] # 归一化
lr = LinearRegression().fit(x, y)
slope = lr.coef_[0]
r_squared = lr.score(x, y)
score = 10000 * slope * r_squared
return score
def calculate_slope_r2(price_series: pd.Series, n: int = 25) -> pd.Series:
"""
计算斜率×R²趋势得分序列
Args:
price_series: 价格序列
n: 滚动窗口天数
Returns:
Series: 趋势得分序列
"""
return price_series.rolling(n).apply(
lambda x: _slope_r2_score(x, n), raw=False
)
def calculate_daily_return(price_series: pd.Series) -> pd.Series:
"""
计算日收益率
Args:
price_series: 价格序列
Returns:
Series: 日收益率
"""
return price_series / price_series.shift(1) - 1
def _is_china_index(code: str) -> bool:
"""判断是否为A股指数"""
return code.endswith('.SH') or code.endswith('.SZ') or code.endswith('.SS')
def compute_factors(
index_data: pd.DataFrame,
code_list: list,
n: int = 25,
factor_type: str = "slope_r2",
etf_data: pd.DataFrame = None,
code_config: dict = None,
) -> tuple[pd.DataFrame, list]:
"""
计算所有指数的因子和日收益率(横截面策略版本)
核心逻辑:
1. 每个标的按照自己的交易日历计算技术指标
2. 对齐到A股交易日历取离A股交易日最近的有效数据不使用未来数据
3. 严格控制T+1规则T日收盘计算信号使用T日及之前的数据
Args:
index_data: 指数价格数据宽格式已对齐到A股交易日历非A股可能有NaN
code_list: 指数代码列表
n: 动量/趋势窗口
factor_type: 'momentum''slope_r2'
etf_data: ETF价格数据宽格式用于收益计算
code_config: 代码配置字典 {code: {name, etf, market}}
Returns:
tuple: (result_df, valid_codes)
- result_df: 包含因子得分和日收益率的DataFrame按A股交易日对齐
- valid_codes: 有效代码列表
"""
code_config = code_config or {}
# 如果没有提供ETF数据创建一个空的DataFrame
if etf_data is None:
etf_data = pd.DataFrame()
# 获取A股交易日历index_data的索引
a_share_dates = index_data.index
# 过滤有效代码
valid_codes = []
for code in code_list:
if code not in index_data.columns:
print(f" ⚠ 跳过 {code}: 不在数据中")
continue
valid_codes.append(code)
# 为每个标的单独计算指标然后对齐到A股交易日历
result = pd.DataFrame(index=a_share_dates)
for code in valid_codes:
# 获取该标的的原始价格数据去除NaN
price_series = index_data[code].dropna()
if len(price_series) < n + 1:
print(f" ⚠ 剔除 {code}: 数据不足 ({len(price_series)} < {n+1})")
valid_codes.remove(code)
continue
# 按照该标的自己的交易日历计算指标
if factor_type == "momentum":
factor_series = calculate_momentum(price_series, n)
elif factor_type == "slope_r2":
factor_series = calculate_slope_r2(price_series, n)
else:
raise ValueError(f"不支持的因子类型: {factor_type}")
# 计算日收益率
return_series = calculate_daily_return(price_series)
# 对齐到A股交易日历取离A股交易日最近的有效数据不使用未来数据
# 使用reindex + method='ffill'确保T日使用T日或之前的数据
result[code] = price_series.reindex(a_share_dates, method='ffill')
result[f"得分_{code}"] = factor_series.reindex(a_share_dates, method='ffill')
result[f"日收益率_{code}"] = return_series.reindex(a_share_dates, method='ffill')
# 过滤掉缺失值过多的指数基于A股交易日历
total_rows = len(result)
final_valid_codes = []
for code in valid_codes:
null_pct = result[code].isnull().sum() / total_rows
if null_pct > 0.2:
print(f" ⚠ 剔除 {code}: 对齐后缺失率 {null_pct:.1%} 过高")
result = result.drop(columns=[code, f"得分_{code}", f"日收益率_{code}"], errors='ignore')
else:
final_valid_codes.append(code)
# 按得分列做 dropna确保所有标的同时有数据
score_cols = [f"得分_{code}" for code in final_valid_codes]
result = result.dropna(subset=score_cols)
print("\n因子计算完成:")
print(f" 因子类型: {factor_type}")
print(f" 窗口天数: {n}")
print(f" 有效指数: {len(final_valid_codes)}/{len(code_list)}")
print(f" 有效数据: {len(result)}")
print(f" 时间范围: {result.index[0].date()} ~ {result.index[-1].date()}")
if etf_data is not index_data and not etf_data.empty:
print(f" 使用ETF数据计算收益: ✓")
return result, final_valid_codes