Files
etf/core/factors/momentum.py
aszerW 70bb69fd98 fix(core): 修复计算与数据对齐等多处逻辑问题
- 修正CAGR计算,去除NaN并检查起始值有效性以避免异常结果
- 优化混合数据源的数据对齐逻辑,使用配置结束日期与A股最新数据日期的较早者
- 计算因子时对齐A股交易日历,重新基于对齐价格计算日收益率,改进因子对齐准确度
- 轮动策略中跳过空信号,避免空信号影响持仓和调仓逻辑
- 调整信号处理,过滤空字符串和NaN,保证轮动信号数据有效性
- 多品种轮动持仓中加入空信号判断,避免无效信号导致错误
- 调整调仓明细和品种汇总保存逻辑,增加空文件创建以保证输出路径文件稳定生成
- 完善多处打印信息和注释,增强代码可读性与调试便利性
2026-03-26 22:21:38 +08:00

192 lines
6.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
动量因子计算模块
支持两种动量因子:
1. N日涨幅简单动量
2. 斜率×R²趋势得分改进版
"""
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
def calculate_momentum(price_series: pd.Series, n: int) -> pd.Series:
"""
计算 N 日涨幅(简单动量)
Args:
price_series: 价格序列
n: 动量窗口天数
Returns:
Series: N日涨幅
"""
return price_series / price_series.shift(n + 1) - 1.0
def _slope_r2_score(srs: pd.Series, n: int = 25) -> float:
"""
单次计算斜率×R²趋势得分
Args:
srs: 价格窗口序列(长度为 n
n: 窗口长度
Returns:
float: 斜率 ×× 10000
"""
if srs.shape[0] < n:
return np.nan
x = np.arange(1, n + 1).reshape(-1, 1)
y = srs.values / srs.values[0] # 归一化
lr = LinearRegression().fit(x, y)
slope = lr.coef_[0]
r_squared = lr.score(x, y)
score = 10000 * slope * r_squared
return score
def calculate_slope_r2(price_series: pd.Series, n: int = 25) -> pd.Series:
"""
计算斜率×R²趋势得分序列
Args:
price_series: 价格序列
n: 滚动窗口天数
Returns:
Series: 趋势得分序列
"""
return price_series.rolling(n).apply(
lambda x: _slope_r2_score(x, n), raw=False
)
def calculate_daily_return(price_series: pd.Series) -> pd.Series:
"""
计算日收益率
Args:
price_series: 价格序列
Returns:
Series: 日收益率
"""
return price_series / price_series.shift(1) - 1
def _is_china_index(code: str) -> bool:
"""判断是否为A股指数"""
return code.endswith('.SH') or code.endswith('.SZ') or code.endswith('.SS')
def compute_factors(
index_data: pd.DataFrame,
code_list: list,
n: int = 25,
factor_type: str = "slope_r2",
etf_data: pd.DataFrame = None,
code_config: dict = None,
) -> tuple[pd.DataFrame, list]:
"""
计算所有指数的因子和日收益率(横截面策略版本)
核心逻辑:
1. 每个标的按照自己的交易日历计算技术指标
2. 对齐到A股交易日历取离A股交易日最近的有效数据不使用未来数据
3. 严格控制T+1规则T日收盘计算信号使用T日及之前的数据
Args:
index_data: 指数价格数据宽格式已对齐到A股交易日历非A股可能有NaN
code_list: 指数代码列表
n: 动量/趋势窗口
factor_type: 'momentum''slope_r2'
etf_data: ETF价格数据宽格式用于收益计算
code_config: 代码配置字典 {code: {name, etf, market}}
Returns:
tuple: (result_df, valid_codes)
- result_df: 包含因子得分和日收益率的DataFrame按A股交易日对齐
- valid_codes: 有效代码列表
"""
code_config = code_config or {}
# 如果没有提供ETF数据创建一个空的DataFrame
if etf_data is None:
etf_data = pd.DataFrame()
# 获取A股交易日历index_data的索引
a_share_dates = index_data.index
# 过滤有效代码
valid_codes = []
for code in code_list:
if code not in index_data.columns:
print(f" ⚠ 跳过 {code}: 不在数据中")
continue
valid_codes.append(code)
# 为每个标的单独计算指标然后对齐到A股交易日历
result = pd.DataFrame(index=a_share_dates)
for code in valid_codes:
# 获取该标的的原始价格数据去除NaN
price_series = index_data[code].dropna()
if len(price_series) < n + 1:
print(f" ⚠ 剔除 {code}: 数据不足 ({len(price_series)} < {n+1})")
valid_codes.remove(code)
continue
# 按照该标的自己的交易日历计算指标(使用指数数据)
if factor_type == "momentum":
factor_series = calculate_momentum(price_series, n)
elif factor_type == "slope_r2":
factor_series = calculate_slope_r2(price_series, n)
else:
raise ValueError(f"不支持的因子类型: {factor_type}")
# 对齐到A股交易日历价格使用ffill指标使用ffill
# 但日收益率需要基于对齐后的价格重新计算而不是直接ffill
price_aligned = price_series.reindex(a_share_dates, method='ffill')
factor_aligned = factor_series.reindex(a_share_dates, method='ffill')
# 基于对齐后的价格重新计算日收益率
# 这样如果T日没有交易价格被ffill日收益率为0
return_aligned = calculate_daily_return(price_aligned)
result[code] = price_aligned
result[f"得分_{code}"] = factor_aligned
result[f"日收益率_{code}"] = return_aligned
# 过滤掉缺失值过多的指数基于A股交易日历
total_rows = len(result)
final_valid_codes = []
for code in valid_codes:
null_pct = result[code].isnull().sum() / total_rows
if null_pct > 0.2:
print(f" ⚠ 剔除 {code}: 对齐后缺失率 {null_pct:.1%} 过高")
result = result.drop(columns=[code, f"得分_{code}", f"日收益率_{code}"], errors='ignore')
else:
final_valid_codes.append(code)
# 注意不做dropna保留所有A股交易日
# 非A股标的在没有数据的日子得分和日收益率会保持NaN或前向填充值
# 这是正常的横截面策略行为T日只交易有数据的标的
score_cols = [f"得分_{code}" for code in final_valid_codes]
print("\n因子计算完成:")
print(f" 因子类型: {factor_type}")
print(f" 窗口天数: {n}")
print(f" 有效指数: {len(final_valid_codes)}/{len(code_list)}")
print(f" 有效数据: {len(result)}")
print(f" 时间范围: {result.index[0].date()} ~ {result.index[-1].date()}")
if etf_data is not index_data and not etf_data.empty:
print(f" 使用ETF数据计算收益: ✓")
return result, final_valid_codes