Files
etf/core/factors/momentum.py
aszerW 9ea84f0e57 feat(rotation): 支持混合数据源并优化因子计算和策略逻辑
- 删除旧的Tushare Token环境变量函数,简化配置
- 在配置文件中新增全市场指数及SSH隧道配置支持YFinance数据访问
- 更新compute_factors函数,支持长格式混合数据源,兼容旧宽格式数据
- 修改RotationStrategy使用HybridDataSource,支持Tushare与YFinance数据源混合
- 添加SSH隧道支持,实现安全访问非主市场数据
- 优化因子计算逻辑,提升缺失值处理和因子合并的鲁棒性
- 修正基准净值计算,兼容长宽格式基准数据处理
- 增强信号生成逻辑,处理因子得分中的NaN情况防止异常
2026-03-19 20:38:13 +08:00

190 lines
5.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
动量因子计算模块
支持两种动量因子:
1. N日涨幅简单动量
2. 斜率×R²趋势得分改进版
"""
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
def calculate_momentum(price_series: pd.Series, n: int) -> pd.Series:
    """Compute the simple N-day momentum (N-day percentage gain).

    Each value is ``price[t] / price[t - n] - 1``, i.e. the return measured
    against the price ``n`` rows earlier.

    Args:
        price_series: Price series; assumed to be ordered oldest to newest.
        n: Look-back length in rows.

    Returns:
        Series aligned with ``price_series``. The first ``n`` entries are NaN
        because there is no earlier price to compare against.
    """
    # Shift by exactly n rows. Shifting by n + 1 would measure an (n + 1)-day
    # return, which contradicts the documented N-day window.
    return price_series / price_series.shift(n) - 1.0
def _slope_r2_score(srs: pd.Series, n: int = 25) -> float:
    """Compute the slope × R² trend score for a single price window.

    The window is normalised by its first value, regressed by ordinary least
    squares against the day index ``1..n``, and the fitted slope is multiplied
    by the coefficient of determination and scaled by 10000.

    Args:
        srs: Price window holding exactly ``n`` observations (a Series, or
            anything ``np.asarray`` can turn into a 1-D float array).
        n: Window length.

    Returns:
        float: ``10000 * slope * R²``. NaN is returned when the window is
        shorter than ``n``, when ``n < 2`` (no line can be fitted), when the
        first price is zero, or when any price is NaN/inf. A perfectly flat
        window scores 0.0.

    Raises:
        ValueError: If the window holds more than ``n`` observations.
    """
    if n < 2 or srs.shape[0] < n:
        return np.nan

    prices = np.asarray(srs, dtype=float)
    if prices.shape[0] != n:
        raise ValueError(f"窗口长度 {prices.shape[0]} 与 n={n} 不一致")

    # A zero base price or a non-finite observation would turn the normalised
    # window into inf/NaN; report "no score" instead of failing mid-rolling.
    if prices[0] == 0 or not np.all(np.isfinite(prices)):
        return np.nan

    y = prices / prices[0]  # normalise so the slope is a relative daily change
    x = np.arange(1, n + 1, dtype=float)

    x_dev = x - x.mean()
    y_dev = y - y.mean()
    ss_yy = float(y_dev @ y_dev)
    if ss_yy == 0.0:
        # Flat window: the slope is zero, so the score is zero whatever R² is.
        return 0.0
    ss_xx = float(x_dev @ x_dev)
    s_xy = float(x_dev @ y_dev)

    # Closed-form simple linear regression with intercept: the slope is
    # Sxy / Sxx and R² equals the squared correlation Sxy² / (Sxx * Syy).
    slope = s_xy / ss_xx
    r_squared = (s_xy * s_xy) / (ss_xx * ss_yy)
    return 10000 * slope * r_squared
def calculate_slope_r2(price_series: pd.Series, n: int = 25) -> pd.Series:
    """Compute the rolling slope × R² trend score of a price series.

    Args:
        price_series: Price series.
        n: Rolling window length in rows; also forwarded to the per-window
            scorer as its expected window length.

    Returns:
        Series of trend scores, one per row of ``price_series``.
    """
    windows = price_series.rolling(window=n)
    # raw=False hands every window to the scorer as a Series, which is the
    # type its signature declares.
    return windows.apply(_slope_r2_score, raw=False, args=(n,))
def calculate_daily_return(price_series: pd.Series) -> pd.Series:
    """Compute the row-over-row simple return of a price series.

    Args:
        price_series: Price series.

    Returns:
        Series of ``price[t] / price[t - 1] - 1``; the first entry is NaN
        because it has no predecessor.
    """
    previous_price = price_series.shift(1)
    return price_series.div(previous_price).sub(1)
def _resolve_factor_func(factor_type: str):
    """Return the scoring function for ``factor_type`` or raise ``ValueError``."""
    # Resolved at call time so this helper only touches the module-level
    # factor functions when it is actually invoked.
    if factor_type == "momentum":
        return calculate_momentum
    if factor_type == "slope_r2":
        return calculate_slope_r2
    raise ValueError(f"不支持的因子类型: {factor_type}")


def _factors_from_long(etf_data: pd.DataFrame, code_list: list, n: int, score_func):
    """Build per-code return/score columns from long-format data and inner-join them."""
    frames = []
    valid_codes = []
    for code in code_list:
        rows = etf_data[etf_data["code"] == code]
        if len(rows) == 0:
            print(f" ⚠ 跳过 {code}: 不在数据中")
            continue

        # Reject codes whose close prices are more than 20% missing.
        null_pct = rows["close"].isnull().sum() / len(rows)
        if null_pct > 0.2:
            print(f" ⚠ 剔除 {code}: 缺失率 {null_pct:.1%} 过高")
            continue

        # Factors are order-dependent, so sort by index first
        # (presumably a date index - confirm against the data source).
        close = rows.sort_index()["close"]
        factors = calculate_daily_return(close).to_frame(name=f"日收益率_{code}")
        factors[f"得分_{code}"] = score_func(close, n)

        frames.append(factors)
        valid_codes.append(code)

    if not frames:
        raise ValueError("没有有效的指数数据")

    # Inner join on the index: keep only rows present for every valid code.
    result = frames[0]
    for frame in frames[1:]:
        result = result.join(frame, how="inner")

    # Drop only the rows where no code has a score at all (e.g. the warm-up
    # window); rows with some missing scores are kept.
    score_cols = [f"得分_{code}" for code in valid_codes]
    return result.dropna(subset=score_cols, how="all"), valid_codes


def _factors_from_wide(etf_data: pd.DataFrame, code_list: list, n: int, score_func):
    """Append return/score columns to a copy of wide-format data (one price column per code)."""
    result = etf_data.copy()
    total_rows = len(result)
    valid_codes = []
    for code in code_list:
        if code not in result.columns:
            print(f" ⚠ 跳过 {code}: 不在数据中")
            continue
        # Remove price columns that are more than 20% missing.
        null_pct = result[code].isnull().sum() / total_rows
        if null_pct > 0.2:
            print(f" ⚠ 剔除 {code}: 缺失率 {null_pct:.1%} 过高")
            result = result.drop(columns=[code])
        else:
            valid_codes.append(code)

    for code in valid_codes:
        result[f"日收益率_{code}"] = calculate_daily_return(result[code])
        result[f"得分_{code}"] = score_func(result[code], n)

    # Wide format keeps only rows where every valid code has a score.
    score_cols = [f"得分_{code}" for code in valid_codes]
    return result.dropna(subset=score_cols), valid_codes


def compute_factors(
    etf_data: pd.DataFrame,
    code_list: list,
    n: int = 25,
    factor_type: str = "slope_r2",
) -> tuple[pd.DataFrame, list]:
    """Compute the factor score and daily return for every requested code.

    Two input layouts are supported and detected by the presence of a
    ``code`` column:

    * Long format: one row per (index value, code) with at least the columns
      ``code`` and ``close``. Each code is processed separately and the
      per-code results are inner-joined on the index. Rows are dropped only
      when every score is NaN.
    * Wide format (backward compatible): one price column per code. Factor
      columns are appended to a copy of the input and rows with any NaN score
      are dropped.

    In both layouts a code is skipped when it is absent from the data and
    rejected when more than 20% of its prices are missing.

    Args:
        etf_data: Price data in long or wide format as described above.
        code_list: Codes to compute factors for.
        n: Momentum / trend window length.
        factor_type: ``'momentum'`` or ``'slope_r2'``.

    Returns:
        tuple: ``(result_df, valid_codes)`` where ``result_df`` holds the
        columns ``日收益率_{code}`` and ``得分_{code}`` for each valid code
        (plus the remaining input columns in wide format) and ``valid_codes``
        lists the codes that passed filtering, in ``code_list`` order.

    Raises:
        ValueError: If ``factor_type`` is not supported, or if long-format
            data yields no valid code.
    """
    # Fail fast on an unsupported factor type, before any filtering or
    # computation, instead of discovering it part-way through the loop.
    score_func = _resolve_factor_func(factor_type)

    if "code" in etf_data.columns:
        result, valid_codes = _factors_from_long(etf_data, code_list, n, score_func)
    else:
        result, valid_codes = _factors_from_wide(etf_data, code_list, n, score_func)

    print("\n因子计算完成:")
    print(f" 因子类型: {factor_type}")
    print(f" 窗口天数: {n}")
    print(f" 有效指数: {len(valid_codes)}/{len(code_list)}")
    print(f" 有效数据: {len(result)}")
    return result, valid_codes