Files
etf/core/factors/momentum.py
aszerW 63a100cef0 feat(config): finalize 11-asset global pool with cross-market diversification
标的池优化与分散化配置更新:

1. 最终标的池确立 (11 只):
   - 精选 9 只原始核心标的 + 恒生科技 + 恒生指数。
   - 相比全市场 43 只池子,精简后的池子大幅减少了 A 股细分行业的噪声干扰。

2. 关键参数调整:
   - 开启 'diversified: true':强制跨大类(美股、港股、A股、商品、固收)选择 Top 1 标的。
   - 启用 'weighted_momentum' 因子与 'auto_day' 动态周期。
   - 放宽溢价率阈值至 10%,以适应跨境资产的高溢价常态。

回测影响分析:
- 引入恒生双指后,2022年回撤得到显著对冲(22.6% 正收益)。
- 跨大类分散化逻辑将最大回撤从 43 只池子时的 -33% 压缩至 -14.5%。
- 该配置在保持 20%+ 稳健年化的同时,提供了 1.5 以上的顶级夏普比率。
2026-04-30 00:14:55 +08:00

280 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
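Momentum factor computation module.

Three momentum factors are supported:
1. Simple N-day return momentum
2. Slope × R² trend score (linear regression on normalized prices)
3. Weighted log-linear regression momentum (annualized return × weighted R²)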
"""
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
import math
def calculate_momentum(price_series: pd.Series, n: int) -> pd.Series:
    """Return the simple N-day momentum (N-day percentage change).

    Args:
        price_series: Price series ordered from oldest to newest.
        n: Momentum window in rows (trading days).

    Returns:
        Series aligned with ``price_series`` holding
        ``price[t] / price[t - n] - 1``; the first ``n`` entries are NaN.
    """
    # An N-day return compares today's price with the price N rows earlier,
    # i.e. shift(n).  The previous shift(n + 1) measured an (N + 1)-day return.
    return price_series / price_series.shift(n) - 1.0
def _slope_r2_score(srs: pd.Series, n: int = 25) -> float:
    """Return the slope × R² trend score for one price window.

    The trailing ``n`` prices are normalized by the first price of that
    window and regressed (ordinary least squares) on ``x = 1..n``.

    Args:
        srs: Price window; at least ``n`` observations are required. When it
            is longer than ``n`` only the trailing ``n`` values are used.
        n: Window length.

    Returns:
        ``10000 * slope * R²``. NaN is returned when the window is shorter
        than ``n``, when ``n < 2``, when any price is non-finite, or when the
        first price is zero (normalization impossible). A perfectly flat
        window scores 0.0.
    """
    if n < 2 or srs.shape[0] < n:
        return np.nan

    window = np.asarray(srs, dtype=float)[-n:]
    base = window[0]
    if base == 0 or not np.isfinite(window).all():
        return np.nan

    y = window / base  # normalize so the slope is comparable across assets
    x = np.arange(1, n + 1, dtype=float)

    # Closed-form OLS: same fit as a single-feature linear regression with
    # intercept, without building an estimator object for every window.
    x_centered = x - x.mean()
    y_centered = y - y.mean()
    ss_tot = float(np.dot(y_centered, y_centered))
    if ss_tot == 0.0:
        # Flat window: the slope is zero, so the score is zero whatever R² is.
        return 0.0

    slope = float(np.dot(x_centered, y_centered) / np.dot(x_centered, x_centered))
    residuals = y_centered - slope * x_centered
    r_squared = 1.0 - float(np.dot(residuals, residuals)) / ss_tot
    return 10000 * slope * r_squared
def calculate_weighted_momentum_score(prices: np.ndarray) -> float:
    """Score momentum with a recency-weighted log-linear regression.

    Log prices are regressed on the observation index with weights rising
    linearly from 1 (oldest) to 2 (newest).

    Args:
        prices: Price array ordered from oldest to newest.

    Returns:
        ``(exp(slope * 250) - 1) * weighted_R²``; 0.0 when fewer than five
        prices are supplied.
    """
    n_obs = len(prices)
    if n_obs < 5:
        return 0.0

    log_prices = np.log(prices)
    t = np.arange(n_obs)
    recency_weights = np.linspace(1, 2, n_obs)  # newer observations weigh more

    # NOTE: np.polyfit applies ``w`` to the unsquared residuals, while the
    # R² below applies the same weights to squared deviations.
    coefficients = np.polyfit(t, log_prices, 1, w=recency_weights)
    slope, intercept = coefficients

    # 250 is the annualization factor (presumably trading days per year).
    annualized_return = math.exp(slope * 250) - 1

    # Weighted coefficient of determination.
    fitted = slope * t + intercept
    weighted_mean = np.average(log_prices, weights=recency_weights)
    residual_ss = np.sum(recency_weights * (log_prices - fitted) ** 2)
    total_ss = np.sum(recency_weights * (log_prices - weighted_mean) ** 2)
    if total_ss > 0:
        weighted_r2 = 1 - residual_ss / total_ss
    else:
        weighted_r2 = 0

    return annualized_return * weighted_r2
def calculate_atr(high: pd.Series, low: pd.Series, close: pd.Series, period: int) -> pd.Series:
    """Compute the Average True Range without depending on TA-Lib.

    The true range of each row is the largest of ``high - low``,
    ``|high - previous close|`` and ``|low - previous close|``; the ATR is
    its simple rolling mean.

    Args:
        high: High prices.
        low: Low prices.
        close: Close prices.
        period: Rolling window length; a full window is required.

    Returns:
        Series of ATR values, NaN until ``period`` true ranges are available.
    """
    previous_close = close.shift(1)
    intraday_range = high - low
    gap_to_high = (high - previous_close).abs()
    gap_to_low = (low - previous_close).abs()

    # The row-wise max skips NaN, so the first row (no previous close)
    # falls back to the plain high-low range.
    candidates = pd.concat([intraday_range, gap_to_high, gap_to_low], axis=1)
    true_range = candidates.max(axis=1)

    return true_range.rolling(window=period, min_periods=period).mean()
def apply_crash_filter(prices: np.ndarray, score: float) -> float:
    """Zero out a score when the last three days look like a crash.

    The score is replaced by 0.0 when, over the last three daily moves,
    either (1) any single day fell by more than 5%, or (2) all three days
    fell and the cumulative three-day drop exceeds 5%.

    Args:
        prices: Price array ordered from oldest to newest; only the last four
            values are read.
        score: Factor score to pass through or suppress.

    Returns:
        ``score`` unchanged when fewer than four prices are available or no
        crash condition holds, otherwise 0.0.
    """
    if len(prices) < 4:
        return score

    oldest, third, second, latest = prices[-4], prices[-3], prices[-2], prices[-1]
    # Day-over-day price ratios, most recent first.
    day_ratios = (latest / second, second / third, third / oldest)

    # Condition 1: at least one single-day drop of more than 5%.
    if min(day_ratios) < 0.95:
        return 0.0

    # Condition 2: three straight declines adding up to more than 5%.
    if all(ratio < 1 for ratio in day_ratios) and latest / oldest < 0.95:
        return 0.0

    return score
def calculate_slope_r2(price_series: pd.Series, n: int = 25) -> pd.Series:
    """Build the rolling slope × R² trend-score series.

    Args:
        price_series: Price series.
        n: Rolling window length in rows.

    Returns:
        Series of trend scores, one per full rolling window of ``n`` prices.
    """
    def _score_window(window: pd.Series) -> float:
        # Each window arrives as a Series because raw=False below.
        return _slope_r2_score(window, n)

    rolling_windows = price_series.rolling(n)
    return rolling_windows.apply(_score_window, raw=False)
def calculate_daily_return(price_series: pd.Series) -> pd.Series:
    """Compute the row-over-row simple return of a price series.

    Args:
        price_series: Price series.

    Returns:
        Series of ``price[t] / price[t - 1] - 1``; the first entry is NaN.
    """
    previous_price = price_series.shift(1)
    return price_series.div(previous_price).sub(1)
def _is_china_index(code: str) -> bool:
    """Tell whether ``code`` carries an A-share suffix (.SH, .SZ or .SS)."""
    a_share_suffixes = ('.SH', '.SZ', '.SS')
    return code.endswith(a_share_suffixes)
_SUPPORTED_FACTOR_TYPES = ("momentum", "slope_r2", "weighted_momentum")


def _validate_factor_type(factor_type: str) -> None:
    """Raise ValueError when ``factor_type`` is not a supported factor name."""
    if factor_type not in _SUPPORTED_FACTOR_TYPES:
        raise ValueError(f"不支持的因子类型: {factor_type}")


def _dynamic_window(long_atr: float, short_atr: float, default_n: int,
                    min_days: int, max_days: int) -> int:
    """Pick the look-back length for one row from the short/long ATR ratio."""
    if long_atr > 0 and not np.isnan(long_atr) and not np.isnan(short_atr):
        # A higher short-term ATR relative to the long-term ATR shortens the
        # window; the ratio is capped at 0.9.
        ratio = min(0.9, short_atr / long_atr)
        return int(min_days + (max_days - min_days) * (1 - ratio))
    return default_n


def _dynamic_factor_series(df: pd.DataFrame, n: int, factor_type: str,
                           min_days: int, max_days: int) -> pd.Series:
    """Score every row of ``df`` with an ATR-driven window and crash filter."""
    long_atr = calculate_atr(df['high'], df['low'], df['close'], max_days).to_numpy()
    short_atr = calculate_atr(df['high'], df['low'], df['close'], min_days).to_numpy()
    closes = df['close'].to_numpy()

    scores = []
    for i, (la, sa) in enumerate(zip(long_atr, short_atr)):
        d_n = _dynamic_window(la, sa, n, min_days, max_days)
        if i < d_n:
            # Not enough earlier rows for this window length yet.
            scores.append(np.nan)
            continue

        window_prices = closes[i - d_n + 1: i + 1]
        if factor_type == "weighted_momentum":
            s = calculate_weighted_momentum_score(window_prices)
        else:
            # NOTE: in dynamic mode "momentum" is scored with slope × R², as
            # it always has been; kept for backward compatibility.
            s = _slope_r2_score(pd.Series(window_prices), d_n)

        # The crash filter reads only the last four closes up to row i, so
        # pass just those instead of copying the whole history on every row.
        s = apply_crash_filter(closes[max(0, i - 3): i + 1], s)
        scores.append(s)

    return pd.Series(scores, index=df.index)


def _fixed_factor_series(close: pd.Series, n: int, factor_type: str) -> pd.Series:
    """Score ``close`` with a fixed window of ``n`` rows."""
    if factor_type == "momentum":
        return calculate_momentum(close, n)
    if factor_type == "slope_r2":
        return calculate_slope_r2(close, n)

    # weighted_momentum: the score needs at least five prices (it is 0.0
    # otherwise) and the crash filter reads at most the last four, so the
    # rolling window alone carries everything both functions look at.
    def _score_window(window: np.ndarray) -> float:
        return apply_crash_filter(window, calculate_weighted_momentum_score(window))

    return close.rolling(n).apply(_score_window, raw=True)


def compute_factors(
    index_data: pd.DataFrame,
    code_list: list,
    n: int = 25,
    factor_type: str = "slope_r2",
    etf_data: pd.DataFrame = None,
    code_config: dict = None,
    index_ohlcv_data: dict = None,
    auto_day: bool = False,
    min_days: int = 20,
    max_days: int = 60,
) -> tuple[pd.DataFrame, list]:
    """Compute factor scores and daily returns for every instrument.

    Each instrument is scored on its own rows (NaN rows dropped) and then
    forward-filled onto the index of ``index_data``.

    Args:
        index_data: Wide close-price frame, one column per code; its index is
            the target calendar of the result.
        code_list: Codes to process.
        n: Default window length in rows.
        factor_type: One of 'momentum', 'slope_r2', 'weighted_momentum'.
        etf_data: Optional wide close-price frame used for the daily return
            instead of the index close when it has a column for the code.
        code_config: Code configuration dict (not read by this function).
        index_ohlcv_data: Optional ``{code: DataFrame}`` of raw OHLCV data;
            preferred over ``index_data`` when it has the code.
        auto_day: Use the ATR-driven dynamic window when the instrument's
            frame has 'high' and 'low' columns.
        min_days: Lower bound of the dynamic window.
        max_days: Upper bound of the dynamic window.

    Returns:
        ``(result, valid_codes)``. For each kept code ``result`` has the
        columns ``code`` (aligned close), ``得分_<code>`` (factor score) and
        ``日收益率_<code>`` (daily return). Codes that are missing, have fewer
        than ``n + 1`` rows, or are more than 50% NaN after alignment are
        left out.

    Raises:
        ValueError: ``factor_type`` is unsupported. This is now also raised
            in dynamic mode, which used to fall back to slope × R² silently.
    """
    a_share_dates = index_data.index
    result = pd.DataFrame(index=a_share_dates)
    processed_codes = []

    for code in code_list:
        # Prefer the OHLCV frame when supplied, else use the close column.
        if index_ohlcv_data and code in index_ohlcv_data:
            df = index_ohlcv_data[code].dropna()
        elif code in index_data:
            df = pd.DataFrame({'close': index_data[code].dropna()})
        else:
            continue

        if len(df) < n + 1:
            print(f" ⚠ 剔除 {code}: 数据不足 ({len(df)} < {n+1})")
            continue

        # Validate before any scoring so both modes reject bad names alike.
        _validate_factor_type(factor_type)

        # Score on the instrument's own trading calendar.
        if auto_day and 'high' in df.columns and 'low' in df.columns:
            factor_series = _dynamic_factor_series(df, n, factor_type, min_days, max_days)
        else:
            factor_series = _fixed_factor_series(df['close'], n, factor_type)

        # Align onto the target calendar, carrying the last value forward.
        price_aligned = df['close'].reindex(a_share_dates, method='ffill')
        factor_aligned = factor_series.reindex(a_share_dates, method='ffill')

        # Returns come from the ETF close when available, else the index close.
        if etf_data is not None and code in etf_data:
            etf_aligned = etf_data[code].reindex(a_share_dates, method='ffill')
            return_aligned = calculate_daily_return(etf_aligned)
        else:
            return_aligned = calculate_daily_return(price_aligned)

        result[code] = price_aligned
        result[f"得分_{code}"] = factor_aligned
        result[f"日收益率_{code}"] = return_aligned
        processed_codes.append(code)

    # Drop instruments whose aligned close is mostly missing.
    total_rows = len(result)
    final_valid_codes = []
    for code in processed_codes:
        null_pct = result[code].isnull().sum() / total_rows
        if null_pct > 0.5:
            print(f" ⚠ 剔除 {code}: 对齐后缺失率 {null_pct:.1%} 过高")
            result = result.drop(
                columns=[code, f"得分_{code}", f"日收益率_{code}"], errors='ignore'
            )
        else:
            final_valid_codes.append(code)

    return result, final_valid_codes