From ec9c808e6cbf91739256a9e840c2bf647efaac6c Mon Sep 17 00:00:00 2001 From: aszerW Date: Thu, 26 Mar 2026 01:26:14 +0800 Subject: [PATCH] =?UTF-8?q?refactor(momentum):=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E5=9B=A0=E5=AD=90=E8=AE=A1=E7=AE=97=E6=B5=81=E7=A8=8B=E5=B9=B6?= =?UTF-8?q?=E5=AF=B9=E9=BD=90A=E8=82=A1=E4=BA=A4=E6=98=93=E6=97=A5?= =?UTF-8?q?=E5=8E=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 添加辅助函数判断是否为A股指数 - 调整compute_factors函数结构,分别计算每个标的技术指标 - 严格实现T+1规则,确保信号只用T日及以前数据 - 对齐所有数据到A股交易日历,使用前向填充避免未来数据泄漏 - 增加有效代码有效性检查,剔除数据不足或缺失率过高的标的 - 完善函数注释,明确输入输出及核心逻辑说明 - 优化打印信息,清晰展示因子类型、窗口、有效标的及时间范围 --- core/factors/momentum.py | 85 ++++++++++++++++++++++++++++------------ 1 file changed, 59 insertions(+), 26 deletions(-) diff --git a/core/factors/momentum.py b/core/factors/momentum.py index 018a37b..be5a4e0 100644 --- a/core/factors/momentum.py +++ b/core/factors/momentum.py @@ -79,6 +79,11 @@ def calculate_daily_return(price_series: pd.Series) -> pd.Series: return price_series / price_series.shift(1) - 1 +def _is_china_index(code: str) -> bool: + """判断是否为A股指数""" + return code.endswith('.SH') or code.endswith('.SZ') or code.endswith('.SS') + + def compute_factors( index_data: pd.DataFrame, code_list: list, @@ -88,19 +93,24 @@ def compute_factors( code_config: dict = None, ) -> tuple[pd.DataFrame, list]: """ - 计算所有指数的因子和日收益率(支持指数-ETF双轨数据) + 计算所有指数的因子和日收益率(横截面策略版本) + + 核心逻辑: + 1. 每个标的按照自己的交易日历计算技术指标 + 2. 对齐到A股交易日历(取离A股交易日最近的有效数据,不使用未来数据) + 3. 严格控制T+1规则:T日收盘计算信号,使用T日及之前的数据 Args: - index_data: 指数价格数据(宽格式,用于因子计算) + index_data: 指数价格数据(宽格式,已对齐到A股交易日历,非A股可能有NaN) code_list: 指数代码列表 n: 动量/趋势窗口 factor_type: 'momentum' 或 'slope_r2' etf_data: ETF价格数据(宽格式,用于收益计算) - code_config: 代码配置字典 {code: {name, etf, market}},用于判断是否为加密货币 + code_config: 代码配置字典 {code: {name, etf, market}} Returns: tuple: (result_df, valid_codes) - - result_df: 包含因子得分和日收益率的DataFrame + - result_df: 包含因子得分和日收益率的DataFrame(按A股交易日对齐) - valid_codes: 有效代码列表 """ code_config = code_config or {} @@ -109,45 +119,68 @@ def compute_factors( if etf_data is None: etf_data = pd.DataFrame() - result = index_data.copy() + # 获取A股交易日历(index_data的索引) + a_share_dates = index_data.index - # 过滤掉缺失值过多的指数 - total_rows = len(result) + # 过滤有效代码 valid_codes = [] for code in code_list: - if code not in result.columns: + if code not in index_data.columns: print(f" ⚠ 跳过 {code}: 不在数据中") continue - null_pct = result[code].isnull().sum() / total_rows - if null_pct > 0.2: - print(f" ⚠ 剔除 {code}: 缺失率 {null_pct:.1%} 过高") - result = result.drop(columns=[code]) - else: - valid_codes.append(code) - - # 对有效指数计算因子和收益率 + valid_codes.append(code) + + # 为每个标的单独计算指标,然后对齐到A股交易日历 + result = pd.DataFrame(index=a_share_dates) + for code in valid_codes: - # 因子基于指数价格计算 + # 获取该标的的原始价格数据(去除NaN) + price_series = index_data[code].dropna() + + if len(price_series) < n + 1: + print(f" ⚠ 剔除 {code}: 数据不足 ({len(price_series)} < {n+1})") + valid_codes.remove(code) + continue + + # 按照该标的自己的交易日历计算指标 if factor_type == "momentum": - result[f"得分_{code}"] = calculate_momentum(result[code], n) + factor_series = calculate_momentum(price_series, n) elif factor_type == "slope_r2": - result[f"得分_{code}"] = calculate_slope_r2(result[code], n) + factor_series = calculate_slope_r2(price_series, n) else: raise ValueError(f"不支持的因子类型: {factor_type}") - # 日收益率基于指数价格计算(回测使用指数价格) - result[f"日收益率_{code}"] = calculate_daily_return(result[code]) + # 计算日收益率 + return_series = calculate_daily_return(price_series) + + # 对齐到A股交易日历:取离A股交易日最近的有效数据(不使用未来数据) + # 使用reindex + method='ffill',确保T日使用T日或之前的数据 + result[code] = price_series.reindex(a_share_dates, method='ffill') + result[f"得分_{code}"] = factor_series.reindex(a_share_dates, method='ffill') + result[f"日收益率_{code}"] = return_series.reindex(a_share_dates, method='ffill') + + # 过滤掉缺失值过多的指数(基于A股交易日历) + total_rows = len(result) + final_valid_codes = [] + for code in valid_codes: + null_pct = result[code].isnull().sum() / total_rows + if null_pct > 0.2: + print(f" ⚠ 剔除 {code}: 对齐后缺失率 {null_pct:.1%} 过高") + result = result.drop(columns=[code, f"得分_{code}", f"日收益率_{code}"], errors='ignore') + else: + final_valid_codes.append(code) - # 按得分列做 dropna - score_cols = [f"得分_{code}" for code in valid_codes] + # 按得分列做 dropna(确保所有标的同时有数据) + score_cols = [f"得分_{code}" for code in final_valid_codes] result = result.dropna(subset=score_cols) print("\n因子计算完成:") print(f" 因子类型: {factor_type}") print(f" 窗口天数: {n}") - print(f" 有效指数: {len(valid_codes)}/{len(code_list)}") + print(f" 有效指数: {len(final_valid_codes)}/{len(code_list)}") print(f" 有效数据: {len(result)} 行") - if etf_data is not index_data: + print(f" 时间范围: {result.index[0].date()} ~ {result.index[-1].date()}") + if etf_data is not index_data and not etf_data.empty: print(f" 使用ETF数据计算收益: ✓") - return result, valid_codes + return result, final_valid_codes