diff --git a/rotation/config_loader.py b/rotation/config_loader.py index 4f74304..a1cba5f 100644 --- a/rotation/config_loader.py +++ b/rotation/config_loader.py @@ -35,6 +35,8 @@ class FactorType(str, Enum): WEIGHTED_MOMENTUM = "weighted_momentum" VOL_ADJUSTED_MOMENTUM = "vol_adjusted_momentum" STANDARDIZED_SLOPE = "standardized_slope" + SLOPE_R2_IDM = "slope_r2_idm" # slope_r2 * IDM (信息离散动量融合) + SLOPE_R2_ENSEMBLE = "slope_r2_ensemble" # 多窗口融合 (63/126/252天) class PremiumMode(str, Enum): diff --git a/rotation/results/simple_rotation_report.png b/rotation/results/simple_rotation_report.png deleted file mode 100644 index 7355307..0000000 Binary files a/rotation/results/simple_rotation_report.png and /dev/null differ diff --git a/rotation/simple_rotation.py b/rotation/simple_rotation.py index b572781..4cd2959 100644 --- a/rotation/simple_rotation.py +++ b/rotation/simple_rotation.py @@ -123,6 +123,59 @@ def slope_r2_score(prices: np.ndarray) -> float: return 10000 * slope * r2 +def info_dispersal_momentum(prices: np.ndarray) -> float: + """Information Dispersal Momentum (IDM): 正收益天数占比 + + 衡量上涨的持续性和广度,过滤靠少数极端日撑起来的假动量。 + 范围 [0, 1],值越高表示上涨天数越多。 + """ + if len(prices) < 2: + return 0.0 + returns = np.diff(prices) + positive_days = np.sum(returns > 0) + return positive_days / len(returns) + + +def slope_r2_idm_score(prices: np.ndarray) -> float: + """Slope * R² * IDM: 趋势强度 × 拟合质量 × 上涨持续性 + + 将 slope_r2 与信息离散动量 (IDM) 通过乘法融合: + - slope_r2 捕捉趋势方向和稳定性 + - IDM 作为折扣系数,惩罚靠少数大涨日撑起来的动量 + """ + sr2 = slope_r2_score(prices) + idm = info_dispersal_momentum(prices) + return sr2 * idm + + +def slope_r2_ensemble_score(prices: np.ndarray, windows: list = None) -> float: + """多窗口 slope_r2 等权融合 + + 使用多个时间窗口(默认 63/126/252 天)计算 slope_r2,取等权平均。 + 金融语义:捕捉不同周期的趋势信号,降低单窗口风险。 + + 对 slope_r2 偏好的影响: + - 削弱对"高波动+间歇性趋势"资产的偏好 + - 增强对"低波动+持续性趋势"资产的识别 + - 让因子更偏向"持续性趋势"而非"爆发性趋势" + """ + if windows is None: + windows = [63, 126, 252] # 3月、6月、12月 + + scores = [] + for w in windows: + if len(prices) >= w: + # 取最后 w 天的价格计算 slope_r2 + window_prices = prices[-w:] + score = slope_r2_score(window_prices) + scores.append(score) + + if not scores: + return 0.0 + + return sum(scores) / len(scores) + + def standardized_slope_score(prices: np.ndarray) -> float: """Standardized slope (t-statistic): slope / SE(slope) @@ -459,7 +512,9 @@ class SimpleRotationStrategy: """Preload all historical data""" start_date = self.config.backtest.start_date end_date = self.config.backtest.end_date or datetime.now().strftime('%Y-%m-%d') - preload_start = (pd.Timestamp(start_date) - timedelta(days=self.n_days * 2)).strftime('%Y-%m-%d') + # ensemble 因子需要更长的历史(252天窗口,预加载 2 倍) + preload_days = 504 if self.config.factor.type == FactorType.SLOPE_R2_ENSEMBLE else self.n_days * 2 + preload_start = (pd.Timestamp(start_date) - timedelta(days=preload_days)).strftime('%Y-%m-%d') print("\n[1/4] Preloading signal sources (index raw)...") for code in self.signal_codes: @@ -498,13 +553,17 @@ class SimpleRotationStrategy: df = self.index_data[signal_code] mask = df.index <= date recent = df.loc[mask] - if len(recent) < self.n_days: + + # ensemble 因子需要更长的窗口(252天) + required_days = 252 if self.config.factor.type == FactorType.SLOPE_R2_ENSEMBLE else self.n_days + + if len(recent) < required_days: return None - prices = recent['close'].values[-self.n_days:] + prices = recent['close'].values[-required_days:] base_momentum = self._compute_base_momentum(prices) - if is_crash(prices): + if is_crash(prices[-self.n_days:]): # crash filter 仍然用 n_days 窗口 return 0.0 return base_momentum @@ -517,17 +576,21 @@ class SimpleRotationStrategy: df = self.index_data[signal_code] mask = df.index <= date recent = df.loc[mask] - if len(recent) < self.n_days: + + # ensemble 因子需要更长的窗口(252天) + required_days = 252 if self.config.factor.type == FactorType.SLOPE_R2_ENSEMBLE else self.n_days + + if len(recent) < required_days: return None - prices = recent['close'].values[-self.n_days:] + prices = recent['close'].values[-required_days:] if self.config.factor.type == FactorType.VOL_ADJUSTED_MOMENTUM: - base_momentum = weighted_momentum_score(prices) + base_momentum = weighted_momentum_score(prices[-self.n_days:]) else: # Use the same score function as ranking base_momentum = self._compute_base_momentum(prices) - if is_crash(prices): + if is_crash(prices[-self.n_days:]): return 0.0 return base_momentum @@ -538,6 +601,10 @@ class SimpleRotationStrategy: return vol_adjusted_momentum_score(prices) elif ft == FactorType.SLOPE_R2: return slope_r2_score(prices) + elif ft == FactorType.SLOPE_R2_IDM: + return slope_r2_idm_score(prices) + elif ft == FactorType.SLOPE_R2_ENSEMBLE: + return slope_r2_ensemble_score(prices) elif ft == FactorType.STANDARDIZED_SLOPE: return standardized_slope_score(prices) elif ft == FactorType.MOMENTUM: