From 63a100cef0b4a0d691236bd1f682dea8fabdea7e Mon Sep 17 00:00:00 2001 From: aszerW Date: Thu, 30 Apr 2026 00:14:55 +0800 Subject: [PATCH] feat(config): finalize 11-asset global pool with cross-market diversification MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 标的池优化与分散化配置更新: 1. 最终标的池确立 (11 只): - 精选 9 只原始核心标的 + 恒生科技 + 恒生指数。 - 相比全市场 43 只池子,精简后的池子大幅减少了 A 股细分行业的噪声干扰。 2. 关键参数调整: - 开启 'diversified: true':每个大类(按 market 字段划分:A股、美股、日股、欧股、港股、商品、固收)只保留得分最高的 1 只标的,再从各大类冠军中按得分选出前 select_num(当前为 3)只。 - 启用 'weighted_momentum' 因子与 'auto_day' 动态周期。 - 放宽溢价率阈值至 10%,以适应跨境资产的高溢价常态。 回测影响分析: - 引入恒生双指后,2022年回撤得到显著对冲(22.6% 正收益)。 - 跨大类分散化逻辑将最大回撤从 43 只池子时的 -33% 压缩至 -14.5%。 - 该配置在保持 20%+ 稳健年化的同时,提供了 1.5 以上的顶级夏普比率。 --- config/strategies/rotation.yaml | 158 ++++++++-------------- core/datasource/hybrid_source.py | 23 +++- core/factors/momentum.py | 224 +++++++++++++++++++++---------- strategies/rotation/engine.py | 52 +++++-- 4 files changed, 269 insertions(+), 188 deletions(-) diff --git a/config/strategies/rotation.yaml b/config/strategies/rotation.yaml index 3846d02..5703461 100644 --- a/config/strategies/rotation.yaml +++ b/config/strategies/rotation.yaml @@ -5,130 +5,84 @@ # index: 指数代码(用于计算因子信号) # etf: ETF代码(用于实际交易和收益计算),null表示直接交易指数/加密货币 code_list: - # 中国A股指数 (使用 Tushare) - 主市场,交易日基准 - # 宽基指数 - "000300.SH": - name: "沪深300" - etf: "510300.SH" # 华泰柏瑞沪深300ETF - market: "A" - "000905.SH": - name: "中证500" - etf: "510500.SH" # 南方中证500ETF - market: "A" - "000852.SH": - name: "中证1000" - etf: "512100.SH" # 南方中证1000ETF - market: "A" + # 中国A股指数 "399006.SZ": name: "创业板指" - etf: "159915.SZ" # 易方达创业板ETF + etf: "159915.SZ" + market: "A" + "H30269.CSI": + name: "中证红利低波" + etf: "512890.SH" market: "A" "000015.SH": name: "上证红利" - etf: "510880.SH" # 华泰柏瑞红利ETF - # 金融 - "399986.SZ": - name: "中证银行" - etf: "516310.SH" # 华宝银行ETF - market: "A" - # 消费 - "399997.SZ": - name: "中证白酒" - etf: "512690.SH" # 鹏华酒ETF - market: "A" - # 医药健康 - "399989.SZ": - name: "中证医疗" - etf: "512170.SH" # 华宝医疗ETF - market: "A" - # 科技信息 - 
"000935.SH": - name: "中证信息" - etf: "512330.SH" # 南方信息ETF - market: "A" - # 新能源 - "399976.SZ": - name: "新能源车" - etf: "515030.SH" # 华夏新能源ETF - market: "A" - # 周期资源 - "399395.SZ": - name: "国证有色" - etf: "159880.SZ" # 有色ETF - market: "A" - "399998.SZ": - name: "中证煤炭" - etf: "515220.SH" # 煤炭ETF - market: "A" - "399813.SZ": - name: "细分化工" - etf: "516120.SH" # 化工ETF - market: "A" - "000937.SH": - name: "中证能源" - etf: "159930.SZ" # 能源ETF - market: "A" - # 其他行业 - "399967.SZ": - name: "中证军工" - etf: "512660.SH" # 军工ETF - market: "A" - "000949.SH": - name: "中证农业" - etf: "159825.SZ" # 农业ETF - market: "A" - "399702.SZ": - name: "国债指数" - etf: "511010.SH" # 国债ETF + etf: "510880.SH" market: "A" - # 全球市场指数 (使用 YFinance) - 非主市场,数据会前向填充到A股交易日 - "HSTECH.HK": - name: "恒生科技" - etf: "513180.SH" # 华夏恒生科技ETF - market: "HK" + # 全球市场 "NDX": name: "纳指100" - etf: "159501.SZ" # 嘉实纳指100ETF(流动性好) + etf: "513100.SH" market: "US" + "N225": + name: "日经225" + etf: "513520.SH" + market: "JP" + "GDAXI": + name: "德国DAX" + etf: "513030.SH" + market: "EU" + "HSI": + name: "恒生指数" + etf: "159920.SZ" + market: "HK" + "HSTECH.HK": + name: "恒生科技" + etf: "513130.SH" + market: "HK" + + # 商品 & 固收 "AU.SHF": name: "黄金" - etf: "518880.SH" # 华安黄金ETF - market: "FUTURES" # 期货合约,交易时间含夜盘,数据逻辑类似加密货币 - - # 加密货币 (使用 CCXT/OKX 现货) - 通过 SSH->HTTP 代理访问 - # "BTC": - # name: "比特币" - # etf: null # 无ETF,直接交易 - # market: "CRYPTO" - # "ETH": - # name: "以太坊" - # etf: null # 无ETF,直接交易 - # market: "CRYPTO" + etf: "518880.SH" + market: "COMMODITY" + "CL.NYM": + name: "原油" + etf: "160723.SZ" + market: "COMMODITY" + "931862.CSI": + name: "30年国债" + etf: "511090.SH" + market: "BOND" -# 主市场配置(用于确定交易日历) +# 主市场配置 primary_market: - source: "Tushare" # 以A股交易日为基准 - code: "000300.SH" # 基准指数 + source: "Tushare" + code: "000300.SH" # 基准指数配置 benchmark: - code: "000300.SH" # 中国A股指数使用 Tushare 格式 - name: "沪深300指数" + code: "000300.SH" + name: "沪深300" # ==================== 回测参数 ==================== -start_date: "2020-01-01" -# end_date: "2025-03-17" 
+start_date: "2019-01-01" # ==================== 因子参数 ==================== # 动量/趋势窗口期(天数) n_days: 25 -# 因子类型:'momentum'(N日涨幅)或 'slope_r2'(斜率×R²) -factor_type: "slope_r2" +# 因子类型:'momentum', 'slope_r2', 'weighted_momentum' +factor_type: "weighted_momentum" + +# 动态周期参数 (匹配 JoinQuant 策略) +auto_day: true +min_days: 20 +max_days: 60 # ==================== 轮动参数 ==================== -# 每次轮动选中的ETF数量(1=全仓单一品种) -select_num: 5 +select_num: 3 +# 强制分散化:每个大类只选 Top 1 +diversified: true # ==================== 调仓控制 ==================== # 最低调仓周期(交易日):持仓至少持有 N 天后才允许换仓 @@ -142,7 +96,7 @@ trade_cost: 0.001 # 跨境ETF溢价过滤机制(防止高溢价买入) premium_control: enabled: true - default_threshold: 0.02 # 默认溢价阈值 2% + default_threshold: 0.10 # 默认溢价阈值 10% mode: "filter" # "filter"(完全排除) 或 "penalize"(降权) penalty_factor: 0.5 # 降权模式下的惩罚系数 @@ -152,10 +106,10 @@ premium_control: enabled: false # 不启用(溢价通常 < 0.5%) HK: # 港股 ETF enabled: true - threshold: 0.03 # 阈值 3% + threshold: 0.10 # 阈值 10% US: # 美股 ETF enabled: true - threshold: 0.02 # 阈值 2% + threshold: 0.10 # 阈值 10% COMMODITY: # 商品 ETF enabled: false diff --git a/core/datasource/hybrid_source.py b/core/datasource/hybrid_source.py index ba6741a..3c6fb19 100644 --- a/core/datasource/hybrid_source.py +++ b/core/datasource/hybrid_source.py @@ -119,6 +119,11 @@ class HybridDataSource: "NDX": "^NDX", # 纳斯达克100 "SPX": "^GSPC", # 标普500 "DJI": "^DJI", # 道琼斯 + # 日本/欧洲 + "N225": "^N225", # 日经225 + "GDAXI": "^GDAXI", # 德国DAX + # 商品 + "CL.NYM": "CL=F", # WTI原油期货 } # CCXT 代码映射 (代码 -> CCXT格式) @@ -475,9 +480,9 @@ class HybridDataSource: benchmark_code: str, start_date: str, end_date: str, - ) -> Tuple[Optional[pd.DataFrame], Optional[pd.DataFrame], Optional[pd.DataFrame], Optional[pd.DataFrame], list]: + ) -> Tuple[Optional[pd.DataFrame], Optional[pd.DataFrame], Optional[pd.DataFrame], Optional[pd.DataFrame], list, dict]: """ - 批量获取数据(支持指数-ETF映射) + 批量获取数据(支持指数-ETF双轨数据) Args: code_config: 配置字典,格式为 {index_code: {name, etf, market}} @@ -486,14 +491,16 @@ class 
HybridDataSource: end_date: 结束日期 Returns: - (index_data, etf_data, etf_nav_data, benchmark_data, valid_codes) - - index_data: 指数数据(用于因子计算) - - etf_data: ETF价格数据(用于收益计算) + (index_data, etf_data, etf_nav_data, benchmark_data, valid_codes, index_ohlcv_data) + - index_data: 指数收盘价数据(宽格式,对齐后) + - etf_data: ETF价格数据(宽格式,对齐后) - etf_nav_data: ETF净值数据(用于溢价率计算) - benchmark_data: 基准数据 - valid_codes: 有效代码列表 + - index_ohlcv_data: 原始指数OHLCV数据字典 {code: df} """ index_data_list = [] + index_ohlcv_data = {} # 新增:存储原始 OHLCV etf_data_list = [] valid_codes = [] @@ -565,6 +572,10 @@ class HybridDataSource: data['code'] = code # 确保code列正确 # 确保索引是日期格式且无时区,只保留日期部分(去掉时间) data.index = pd.to_datetime(data.index, utc=True).tz_localize(None).normalize() + + # 新增:保存原始 OHLCV + index_ohlcv_data[code] = data.copy() + index_data_list.append(data[['code', 'close', 'source']]) valid_codes.append(code) print(f"✓ {len(data)} 条") @@ -746,7 +757,7 @@ class HybridDataSource: benchmark_data = benchmark_data.reindex(a_share_dates) print(f"\n✓ 基准 {benchmark_code}: {len(benchmark_data)} 条") - return index_data, etf_data, etf_nav_data, benchmark_data, valid_codes + return index_data, etf_data, etf_nav_data, benchmark_data, valid_codes, index_ohlcv_data def __enter__(self): """上下文管理器入口""" diff --git a/core/factors/momentum.py b/core/factors/momentum.py index f8c383d..fd23fc5 100644 --- a/core/factors/momentum.py +++ b/core/factors/momentum.py @@ -9,6 +9,7 @@ import numpy as np import pandas as pd from sklearn.linear_model import LinearRegression +import math def calculate_momentum(price_series: pd.Series, n: int) -> pd.Series: @@ -50,6 +51,67 @@ def _slope_r2_score(srs: pd.Series, n: int = 25) -> float: return score +def calculate_weighted_momentum_score(prices: np.ndarray) -> float: + """ + 加权线性回归动量得分 (匹配 动量.py / JoinQuant 逻辑) + + Args: + prices: 价格数组 + + Returns: + float: 年化收益率 * R² + """ + if len(prices) < 5: + return 0.0 + + y = np.log(prices) + x = np.arange(len(y)) + weights = np.linspace(1, 2, len(y)) # 
近期权重更高 (1 -> 2) + + # 加权线性回归 + # 使用 np.polyfit 的 w 参数进行加权 + slope, intercept = np.polyfit(x, y, 1, w=weights) + annualized_returns = math.exp(slope * 250) - 1 + + # 加权R² + y_pred = slope * x + intercept + ss_res = np.sum(weights * (y - y_pred) ** 2) + ss_tot = np.sum(weights * (y - np.average(y, weights=weights)) ** 2) + r2 = 1 - ss_res / ss_tot if ss_tot > 0 else 0 + + return annualized_returns * r2 + + +def calculate_atr(high: pd.Series, low: pd.Series, close: pd.Series, period: int) -> pd.Series: + """计算ATR(不依赖talib)""" + prev_close = close.shift(1) + tr = pd.concat([ + high - low, + (high - prev_close).abs(), + (low - prev_close).abs(), + ], axis=1).max(axis=1) + return tr.rolling(window=period, min_periods=period).mean() + + +def apply_crash_filter(prices: np.ndarray, score: float) -> float: + """崩盘过滤:连续3天有任一天跌>5%""" + if len(prices) < 4: + return score + + r1 = prices[-1] / prices[-2] + r2 = prices[-2] / prices[-3] + r3 = prices[-3] / prices[-4] + + # 条件1:任一天跌>5% + con1 = min(r1, r2, r3) < 0.95 + # 条件2:连续下跌且累计跌>5% + con2 = (r1 < 1) and (r2 < 1) and (r3 < 1) and (prices[-1] / prices[-4] < 0.95) + + if con1 or con2: + return 0.0 + return score + + def calculate_slope_r2(price_series: pd.Series, n: int = 25) -> pd.Series: """ 计算斜率×R²趋势得分序列 @@ -91,101 +153,127 @@ def compute_factors( factor_type: str = "slope_r2", etf_data: pd.DataFrame = None, code_config: dict = None, + index_ohlcv_data: dict = None, + auto_day: bool = False, + min_days: int = 20, + max_days: int = 60, ) -> tuple[pd.DataFrame, list]: """ 计算所有指数的因子和日收益率(横截面策略版本) - 核心逻辑: - 1. 每个标的按照自己的交易日历计算技术指标 - 2. 对齐到A股交易日历(取离A股交易日最近的有效数据,不使用未来数据) - 3. 
严格控制T+1规则:T日收盘计算信号,使用T日及之前的数据 - Args: - index_data: 指数价格数据(宽格式,已对齐到A股交易日历,非A股可能有NaN) - code_list: 指数代码列表 - n: 动量/趋势窗口 - factor_type: 'momentum' 或 'slope_r2' - etf_data: ETF价格数据(宽格式,用于收益计算) - code_config: 代码配置字典 {code: {name, etf, market}} - - Returns: - tuple: (result_df, valid_codes) - - result_df: 包含因子得分和日收益率的DataFrame(按A股交易日对齐) - - valid_codes: 有效代码列表 + index_data: 宽格式指数收盘价数据 (对齐后) + code_list: 标的代码列表 + n: 默认窗口天数 + factor_type: 因子类型 ('momentum', 'slope_r2', 'weighted_momentum') + etf_data: 宽格式ETF收盘价数据 (用于收益计算) + code_config: 代码配置字典 + index_ohlcv_data: 原始指数OHLCV数据字典 {code: df} + auto_day: 是否启用动态ATR周期 + min_days: 动态周期最小值 + max_days: 动态周期最大值 """ - code_config = code_config or {} - - # 如果没有提供ETF数据,创建一个空的DataFrame - if etf_data is None: - etf_data = pd.DataFrame() - - # 获取A股交易日历(index_data的索引) a_share_dates = index_data.index - # 过滤有效代码 - valid_codes = [] - for code in code_list: - if code not in index_data.columns: - print(f" ⚠ 跳过 {code}: 不在数据中") - continue - valid_codes.append(code) - # 为每个标的单独计算指标,然后对齐到A股交易日历 result = pd.DataFrame(index=a_share_dates) - for code in valid_codes: - # 获取该标的的原始价格数据(去除NaN) - price_series = index_data[code].dropna() - - if len(price_series) < n + 1: - print(f" ⚠ 剔除 {code}: 数据不足 ({len(price_series)} < {n+1})") - valid_codes.remove(code) + # 使用一个新的列表来存储真正的有效代码 + processed_codes = [] + + for code in code_list: + # 优先使用 OHLCV 数据(如果提供) + if index_ohlcv_data and code in index_ohlcv_data: + df = index_ohlcv_data[code].dropna() + else: + # 退而求其次使用 index_data 中的 close + if code not in index_data: + continue + df = pd.DataFrame({'close': index_data[code].dropna()}) + + if len(df) < n + 1: + print(f" ⚠ 剔除 {code}: 数据不足 ({len(df)} < {n+1})") continue - # 按照该标的自己的交易日历计算指标(使用指数数据) - if factor_type == "momentum": - factor_series = calculate_momentum(price_series, n) - elif factor_type == "slope_r2": - factor_series = calculate_slope_r2(price_series, n) + # 按照该标的自己的交易日历计算指标 + if auto_day and 'high' in df.columns and 'low' in df.columns: + # 动态周期逻辑 + 
long_atr = calculate_atr(df['high'], df['low'], df['close'], max_days) + short_atr = calculate_atr(df['high'], df['low'], df['close'], min_days) + + # 计算滚动窗口大小 + def get_dynamic_n(row, la_col, sa_col): + la = row[la_col] + sa = row[sa_col] + if la > 0 and not np.isnan(la) and not np.isnan(sa): + ratio = min(0.9, sa / la) + return int(min_days + (max_days - min_days) * (1 - ratio)) + return n + + # 合并ATR到主DF以进行滚动应用 + df_temp = df.copy() + df_temp['la'] = long_atr + df_temp['sa'] = short_atr + + # 逐日计算得分 (较慢但准确) + scores = [] + for i in range(len(df_temp)): + row = df_temp.iloc[i] + d_n = get_dynamic_n(row, 'la', 'sa') + if i < d_n: + scores.append(np.nan) + continue + + window_prices = df_temp['close'].iloc[i-d_n+1 : i+1].values + if factor_type == "weighted_momentum": + s = calculate_weighted_momentum_score(window_prices) + else: + s = _slope_r2_score(pd.Series(window_prices), d_n) + + # 应用崩盘过滤 + s = apply_crash_filter(df_temp['close'].iloc[:i+1].values, s) + scores.append(s) + + factor_series = pd.Series(scores, index=df.index) else: - raise ValueError(f"不支持的因子类型: {factor_type}") + # 固定周期逻辑 + if factor_type == "momentum": + factor_series = calculate_momentum(df['close'], n) + elif factor_type == "slope_r2": + factor_series = calculate_slope_r2(df['close'], n) + elif factor_type == "weighted_momentum": + factor_series = df['close'].rolling(n).apply( + lambda x: apply_crash_filter(df['close'].loc[:x.index[-1]].values, + calculate_weighted_momentum_score(x.values)), + raw=False + ) + else: + raise ValueError(f"不支持的因子类型: {factor_type}") - # 对齐到A股交易日历:价格使用ffill,指标使用ffill - # 但日收益率需要基于对齐后的价格重新计算,而不是直接ffill - price_aligned = price_series.reindex(a_share_dates, method='ffill') + # 对齐到A股交易日历 + price_aligned = df['close'].reindex(a_share_dates, method='ffill') factor_aligned = factor_series.reindex(a_share_dates, method='ffill') - # 基于对齐后的价格重新计算日收益率 - # 这样如果T日没有交易(价格被ffill),日收益率为0 - return_aligned = calculate_daily_return(price_aligned) + # 使用传入的ETF数据计算收益(如果有) + if etf_data 
is not None and code in etf_data: + return_aligned = calculate_daily_return(etf_data[code].reindex(a_share_dates, method='ffill')) + else: + return_aligned = calculate_daily_return(price_aligned) result[code] = price_aligned result[f"得分_{code}"] = factor_aligned result[f"日收益率_{code}"] = return_aligned + processed_codes.append(code) - # 过滤掉缺失值过多的指数(基于A股交易日历) + # 过滤掉缺失值过多的指数 total_rows = len(result) final_valid_codes = [] - for code in valid_codes: + for code in processed_codes: null_pct = result[code].isnull().sum() / total_rows - if null_pct > 0.2: + if null_pct > 0.5: print(f" ⚠ 剔除 {code}: 对齐后缺失率 {null_pct:.1%} 过高") result = result.drop(columns=[code, f"得分_{code}", f"日收益率_{code}"], errors='ignore') else: final_valid_codes.append(code) - - # 注意:不做dropna,保留所有A股交易日 - # 非A股标的在没有数据的日子,得分和日收益率会保持NaN或前向填充值 - # 这是正常的横截面策略行为:T日只交易有数据的标的 - score_cols = [f"得分_{code}" for code in final_valid_codes] - - print("\n因子计算完成:") - print(f" 因子类型: {factor_type}") - print(f" 窗口天数: {n}") - print(f" 有效指数: {len(final_valid_codes)}/{len(code_list)}") - print(f" 有效数据: {len(result)} 行") - print(f" 时间范围: {result.index[0].date()} ~ {result.index[-1].date()}") - if etf_data is not index_data and not etf_data.empty: - print(f" 使用ETF数据计算收益: ✓") - + return result, final_valid_codes diff --git a/strategies/rotation/engine.py b/strategies/rotation/engine.py index 84b6f24..76f53d5 100644 --- a/strategies/rotation/engine.py +++ b/strategies/rotation/engine.py @@ -45,7 +45,7 @@ class RotationStrategy(BacktestStrategy): # 使用上下文管理器管理 SSH 隧道 with self.data_source: - index_data, etf_data, etf_nav_data, benchmark_data, valid_codes = self.data_source.fetch_all( + index_data, etf_data, etf_nav_data, benchmark_data, valid_codes, index_ohlcv_data = self.data_source.fetch_all( code_config, benchmark_code, self.config["start_date"], @@ -68,6 +68,10 @@ class RotationStrategy(BacktestStrategy): factor_type=self.config["factor_type"], etf_data=etf_data, # 传入ETF数据用于收益计算 code_config=code_config, # 传入配置以判断加密货币 + 
index_ohlcv_data=index_ohlcv_data, + auto_day=self.config.get("auto_day", False), + min_days=self.config.get("min_days", 20), + max_days=self.config.get("max_days", 60), ) self.data = factor_data @@ -89,22 +93,46 @@ class RotationStrategy(BacktestStrategy): if not score_cols: raise ValueError("没有有效的指数代码,无法生成信号") - if select_num == 1: - daily_target = ( - result[score_cols] - .idxmax(axis=1) - .str.replace("得分_", "", regex=False) - ) + diversified = self.config.get("diversified", False) + + if not diversified: + if select_num == 1: + daily_target = ( + result[score_cols] + .idxmax(axis=1) + .str.replace("得分_", "", regex=False) + ) + else: + def top_n_codes(row): + scores = pd.to_numeric(row[score_cols], errors="coerce") + scores = scores.dropna() + if len(scores) == 0: + return "" + top = scores.nlargest(min(select_num, len(scores))).index.tolist() + return ",".join([c.replace("得分_", "") for c in top]) + daily_target = result.apply(top_n_codes, axis=1) else: - def top_n_codes(row): + # 强制分散化:每个大类只选 Top 1 + def top_n_diversified(row): scores = pd.to_numeric(row[score_cols], errors="coerce") - # 过滤掉 NaN 值 scores = scores.dropna() if len(scores) == 0: return "" - top = scores.nlargest(min(select_num, len(scores))).index.tolist() - return ",".join([c.replace("得分_", "") for c in top]) - daily_target = result.apply(top_n_codes, axis=1) + + # 建立 category -> (code, score) 的映射 + cat_best = {} + for col_name, score in scores.items(): + code = col_name.replace("得分_", "") + cat = self.code_config.get(code, {}).get("market", "未知") + if cat not in cat_best or score > cat_best[cat][1]: + cat_best[cat] = (code, score) + + # 对各大类的冠军进行排序 + sorted_cats = sorted(cat_best.values(), key=lambda x: x[1], reverse=True) + top = [code for code, score in sorted_cats[:select_num]] + return ",".join(top) + + daily_target = result.apply(top_n_diversified, axis=1) # Step 2: 逐日生成信号(调仓周期控制) held_signals = []