feat: 新增 slope_r2_idm 和 slope_r2_ensemble 动量因子

- slope_r2_idm: slope_r2 × IDM(信息离散动量),惩罚靠少数大涨日撑起来的假动量
- slope_r2_ensemble: 多窗口(63/126/252天) slope_r2 等权融合,捕捉不同周期趋势信号
- 新增 info_dispersal_momentum() 计算正收益天数占比
- 新增 slope_r2_idm_score() 和 slope_r2_ensemble_score() 因子函数
- ensemble 因子需要更长预加载窗口(504天)和计算窗口(252天)
- crash filter 仍使用原始 n_days 窗口
This commit is contained in:
2026-06-12 12:37:29 +08:00
parent 49b623931b
commit 8c3ae2269a
3 changed files with 77 additions and 8 deletions

View File

@@ -35,6 +35,8 @@ class FactorType(str, Enum):
WEIGHTED_MOMENTUM = "weighted_momentum"
VOL_ADJUSTED_MOMENTUM = "vol_adjusted_momentum"
STANDARDIZED_SLOPE = "standardized_slope"
SLOPE_R2_IDM = "slope_r2_idm" # slope_r2 * IDM (信息离散动量融合)
SLOPE_R2_ENSEMBLE = "slope_r2_ensemble" # 多窗口融合 (63/126/252天)
class PremiumMode(str, Enum):

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.7 MiB

View File

@@ -123,6 +123,59 @@ def slope_r2_score(prices: np.ndarray) -> float:
return 10000 * slope * r2
def info_dispersal_momentum(prices: np.ndarray) -> float:
"""Information Dispersal Momentum (IDM): 正收益天数占比
衡量上涨的持续性和广度,过滤靠少数极端日撑起来的假动量。
范围 [0, 1],值越高表示上涨天数越多。
"""
if len(prices) < 2:
return 0.0
returns = np.diff(prices)
positive_days = np.sum(returns > 0)
return positive_days / len(returns)
def slope_r2_idm_score(prices: np.ndarray) -> float:
"""Slope * R² * IDM: 趋势强度 × 拟合质量 × 上涨持续性
将 slope_r2 与信息离散动量 (IDM) 通过乘法融合:
- slope_r2 捕捉趋势方向和稳定性
- IDM 作为折扣系数,惩罚靠少数大涨日撑起来的动量
"""
sr2 = slope_r2_score(prices)
idm = info_dispersal_momentum(prices)
return sr2 * idm
def slope_r2_ensemble_score(prices: np.ndarray, windows: list = None) -> float:
"""多窗口 slope_r2 等权融合
使用多个时间窗口(默认 63/126/252 天)计算 slope_r2取等权平均。
金融语义:捕捉不同周期的趋势信号,降低单窗口风险。
对 slope_r2 偏好的影响:
- 削弱对"高波动+间歇性趋势"资产的偏好
- 增强对"低波动+持续性趋势"资产的识别
- 让因子更偏向"持续性趋势"而非"爆发性趋势"
"""
if windows is None:
windows = [63, 126, 252] # 3月、6月、12月
scores = []
for w in windows:
if len(prices) >= w:
# 取最后 w 天的价格计算 slope_r2
window_prices = prices[-w:]
score = slope_r2_score(window_prices)
scores.append(score)
if not scores:
return 0.0
return sum(scores) / len(scores)
def standardized_slope_score(prices: np.ndarray) -> float:
"""Standardized slope (t-statistic): slope / SE(slope)
@@ -459,7 +512,9 @@ class SimpleRotationStrategy:
"""Preload all historical data"""
start_date = self.config.backtest.start_date
end_date = self.config.backtest.end_date or datetime.now().strftime('%Y-%m-%d')
preload_start = (pd.Timestamp(start_date) - timedelta(days=self.n_days * 2)).strftime('%Y-%m-%d')
# ensemble 因子需要更长的历史252天窗口预加载 2 倍)
preload_days = 504 if self.config.factor.type == FactorType.SLOPE_R2_ENSEMBLE else self.n_days * 2
preload_start = (pd.Timestamp(start_date) - timedelta(days=preload_days)).strftime('%Y-%m-%d')
print("\n[1/4] Preloading signal sources (index raw)...")
for code in self.signal_codes:
@@ -498,13 +553,17 @@ class SimpleRotationStrategy:
df = self.index_data[signal_code]
mask = df.index <= date
recent = df.loc[mask]
if len(recent) < self.n_days:
# ensemble 因子需要更长的窗口252天
required_days = 252 if self.config.factor.type == FactorType.SLOPE_R2_ENSEMBLE else self.n_days
if len(recent) < required_days:
return None
prices = recent['close'].values[-self.n_days:]
prices = recent['close'].values[-required_days:]
base_momentum = self._compute_base_momentum(prices)
if is_crash(prices):
if is_crash(prices[-self.n_days:]): # crash filter 仍然用 n_days 窗口
return 0.0
return base_momentum
@@ -517,17 +576,21 @@ class SimpleRotationStrategy:
df = self.index_data[signal_code]
mask = df.index <= date
recent = df.loc[mask]
if len(recent) < self.n_days:
# ensemble 因子需要更长的窗口252天
required_days = 252 if self.config.factor.type == FactorType.SLOPE_R2_ENSEMBLE else self.n_days
if len(recent) < required_days:
return None
prices = recent['close'].values[-self.n_days:]
prices = recent['close'].values[-required_days:]
if self.config.factor.type == FactorType.VOL_ADJUSTED_MOMENTUM:
base_momentum = weighted_momentum_score(prices)
base_momentum = weighted_momentum_score(prices[-self.n_days:])
else:
# Use the same score function as ranking
base_momentum = self._compute_base_momentum(prices)
if is_crash(prices):
if is_crash(prices[-self.n_days:]):
return 0.0
return base_momentum
@@ -538,6 +601,10 @@ class SimpleRotationStrategy:
return vol_adjusted_momentum_score(prices)
elif ft == FactorType.SLOPE_R2:
return slope_r2_score(prices)
elif ft == FactorType.SLOPE_R2_IDM:
return slope_r2_idm_score(prices)
elif ft == FactorType.SLOPE_R2_ENSEMBLE:
return slope_r2_ensemble_score(prices)
elif ft == FactorType.STANDARDIZED_SLOPE:
return standardized_slope_score(prices)
elif ft == FactorType.MOMENTUM: