Files
etf/core/factors/momentum.py
aszerW 988c2335fb chore(config): 添加环境变量示例及.gitignore更新
- 新增 .env.example,包含 Tushare API、钉钉机器人和PostgreSQL数据库配置模板
- 更新.gitignore,忽略本地配置文件如 .env.local 和 config_local.py
- 添加对报表文件命名规则的支持,保留示例文件不忽略
- 删除废弃的 chart.py 及相关图表模块代码
- 新增 config/settings.py,实现从环境变量读取配置的统一接口
- 设置数据目录及缓存目录,确保目录存在,提高配置管理规范性
2026-03-18 23:33:40 +08:00

138 lines
3.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
动量因子计算模块
支持两种动量因子:
1. N日涨幅简单动量
2. 斜率×R²趋势得分改进版
"""
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
def calculate_momentum(price_series: pd.Series, n: int) -> pd.Series:
"""
计算 N 日涨幅(简单动量)
Args:
price_series: 价格序列
n: 动量窗口天数
Returns:
Series: N日涨幅
"""
return price_series / price_series.shift(n + 1) - 1.0
def _slope_r2_score(srs: pd.Series, n: int = 25) -> float:
"""
单次计算斜率×R²趋势得分
Args:
srs: 价格窗口序列(长度为 n
n: 窗口长度
Returns:
float: 斜率 ×× 10000
"""
if srs.shape[0] < n:
return np.nan
x = np.arange(1, n + 1).reshape(-1, 1)
y = srs.values / srs.values[0] # 归一化
lr = LinearRegression().fit(x, y)
slope = lr.coef_[0]
r_squared = lr.score(x, y)
score = 10000 * slope * r_squared
return score
def calculate_slope_r2(price_series: pd.Series, n: int = 25) -> pd.Series:
"""
计算斜率×R²趋势得分序列
Args:
price_series: 价格序列
n: 滚动窗口天数
Returns:
Series: 趋势得分序列
"""
return price_series.rolling(n).apply(
lambda x: _slope_r2_score(x, n), raw=False
)
def calculate_daily_return(price_series: pd.Series) -> pd.Series:
"""
计算日收益率
Args:
price_series: 价格序列
Returns:
Series: 日收益率
"""
return price_series / price_series.shift(1) - 1
def compute_factors(
etf_data: pd.DataFrame,
code_list: list,
n: int = 25,
factor_type: str = "slope_r2",
) -> tuple[pd.DataFrame, list]:
"""
计算所有指数的因子和日收益率
Args:
etf_data: DataFrame, 宽表格式的收盘价
code_list: 指数代码列表
n: 动量/趋势窗口
factor_type: 'momentum''slope_r2'
Returns:
tuple: (result_df, valid_codes)
"""
result = etf_data.copy()
# 过滤掉缺失值过多的指数
total_rows = len(result)
valid_codes = []
for code in code_list:
if code not in result.columns:
print(f" ⚠ 跳过 {code}: 不在数据中")
continue
null_pct = result[code].isnull().sum() / total_rows
if null_pct > 0.2:
print(f" ⚠ 剔除 {code}: 缺失率 {null_pct:.1%} 过高")
result = result.drop(columns=[code])
else:
valid_codes.append(code)
# 对有效指数计算因子
for code in valid_codes:
result[f"日收益率_{code}"] = calculate_daily_return(result[code])
if factor_type == "momentum":
result[f"得分_{code}"] = calculate_momentum(result[code], n)
elif factor_type == "slope_r2":
result[f"得分_{code}"] = calculate_slope_r2(result[code], n)
else:
raise ValueError(f"不支持的因子类型: {factor_type}")
# 按得分列做 dropna
score_cols = [f"得分_{code}" for code in valid_codes]
result = result.dropna(subset=score_cols)
print("\n因子计算完成:")
print(f" 因子类型: {factor_type}")
print(f" 窗口天数: {n}")
print(f" 有效指数: {len(valid_codes)}/{len(code_list)}")
print(f" 有效数据: {len(result)}")
return result, valid_codes