diff --git a/core/common/utils.py b/core/common/utils.py index b742927..3a73f24 100644 --- a/core/common/utils.py +++ b/core/common/utils.py @@ -72,7 +72,20 @@ def calculate_cagr( Returns: float: CAGR值 """ - total_return = nav_series.iloc[-1] / nav_series.iloc[0] + # 去除NaN值 + nav_series = nav_series.dropna() + + if len(nav_series) < 2: + return 0.0 + + start_val = nav_series.iloc[0] + end_val = nav_series.iloc[-1] + + # 检查起始值是否有效 + if pd.isna(start_val) or pd.isna(end_val) or start_val <= 0: + return 0.0 + + total_return = end_val / start_val if method == "natural_days": days = (nav_series.index[-1] - nav_series.index[0]).days diff --git a/core/datasource/hybrid_source.py b/core/datasource/hybrid_source.py index f955487..4d298f8 100644 --- a/core/datasource/hybrid_source.py +++ b/core/datasource/hybrid_source.py @@ -639,18 +639,31 @@ class HybridDataSource: aggfunc='first' ) - # 数据对齐策略:使用各标的能获取到的最新数据 - # 以A股最新数据日期为基准,其他市场数据对齐到该日期 + # 数据对齐策略:使用配置的日期范围,确保回测区间与配置一致 + # 以A股最新数据日期或配置的end_date(取较早者)为基准 # 获取A股最新数据日期 china_codes = [c for c in valid_codes if self._is_china_index(c)] if china_codes: - a_share_latest = index_data[china_codes].dropna().index.max() + a_share_data_latest = index_data[china_codes].dropna().index.max() else: # 如果没有A股,使用所有数据的最早最新日期 - a_share_latest = index_data.dropna().index.max() + a_share_data_latest = index_data.dropna().index.max() - print(f" A股最新数据日期: {a_share_latest.strftime('%Y-%m-%d')}") + # 使用配置的end_date,但不超过数据的最新日期 + config_end = pd.Timestamp(end_date) + a_share_latest = min(a_share_data_latest, config_end) + + print(f" A股数据最新日期: {a_share_data_latest.strftime('%Y-%m-%d')}") + print(f" 配置结束日期: {config_end.strftime('%Y-%m-%d')}") + print(f" 实际使用日期: {a_share_latest.strftime('%Y-%m-%d')}") + + # 打印各标的的数据时间范围(用于调试) + print(f"\n 各标的数据时间范围:") + for code in valid_codes: + code_data = index_data[code].dropna() + if len(code_data) > 0: + print(f" {code}: {code_data.index.min().strftime('%Y-%m-%d')} ~ {code_data.index.max().strftime('%Y-%m-%d')} ({len(code_data)}条)") # 获取A股交易日历(从start_date到a_share_latest) start_str = pd.Timestamp(start_date).strftime('%Y%m%d') diff --git a/core/factors/momentum.py b/core/factors/momentum.py index be5a4e0..f8c383d 100644 --- a/core/factors/momentum.py +++ b/core/factors/momentum.py @@ -142,7 +142,7 @@ def compute_factors( valid_codes.remove(code) continue - # 按照该标的自己的交易日历计算指标 + # 按照该标的自己的交易日历计算指标(使用指数数据) if factor_type == "momentum": factor_series = calculate_momentum(price_series, n) elif factor_type == "slope_r2": @@ -150,14 +150,18 @@ def compute_factors( else: raise ValueError(f"不支持的因子类型: {factor_type}") - # 计算日收益率 - return_series = calculate_daily_return(price_series) + # 对齐到A股交易日历:价格使用ffill,指标使用ffill + # 但日收益率需要基于对齐后的价格重新计算,而不是直接ffill + price_aligned = price_series.reindex(a_share_dates, method='ffill') + factor_aligned = factor_series.reindex(a_share_dates, method='ffill') - # 对齐到A股交易日历:取离A股交易日最近的有效数据(不使用未来数据) - # 使用reindex + method='ffill',确保T日使用T日或之前的数据 - result[code] = price_series.reindex(a_share_dates, method='ffill') - result[f"得分_{code}"] = factor_series.reindex(a_share_dates, method='ffill') - result[f"日收益率_{code}"] = return_series.reindex(a_share_dates, method='ffill') + # 基于对齐后的价格重新计算日收益率 + # 这样如果T日没有交易(价格被ffill),日收益率为0 + return_aligned = calculate_daily_return(price_aligned) + + result[code] = price_aligned + result[f"得分_{code}"] = factor_aligned + result[f"日收益率_{code}"] = return_aligned # 过滤掉缺失值过多的指数(基于A股交易日历) total_rows = len(result) @@ -170,9 +174,10 @@ def compute_factors( else: final_valid_codes.append(code) - # 按得分列做 dropna(确保所有标的同时有数据) + # 注意:不做dropna,保留所有A股交易日 + # 非A股标的在没有数据的日子,得分和日收益率会保持NaN或前向填充值 + # 这是正常的横截面策略行为:T日只交易有数据的标的 score_cols = [f"得分_{code}" for code in final_valid_codes] - result = result.dropna(subset=score_cols) print("\n因子计算完成:") print(f" 因子类型: {factor_type}") diff --git a/strategies/rotation/engine.py b/strategies/rotation/engine.py index 3b02fb3..84b6f24 100644 --- a/strategies/rotation/engine.py +++ b/strategies/rotation/engine.py @@ -115,6 +115,10 @@ class RotationStrategy(BacktestStrategy): target = daily_target.iloc[i] if current_held is None: + # 跳过空信号,直到找到第一个有效信号 + if not target: + held_signals.append(None) # 添加None占位,保持长度一致 + continue current_held = target last_rebalance_idx = i held_signals.append(current_held) @@ -122,20 +126,24 @@ class RotationStrategy(BacktestStrategy): days_since = i - last_rebalance_idx if days_since >= rebalance_days: - should = self._check_rebalance( - result.iloc[i], current_held, target, - select_num, rebalance_threshold - ) - if should: - current_held = target - last_rebalance_idx = i + # 目标信号为空时不调仓 + if target: # 只在目标有效时才检查是否调仓 + should = self._check_rebalance( + result.iloc[i], current_held, target, + select_num, rebalance_threshold + ) + if should: + current_held = target + last_rebalance_idx = i held_signals.append(current_held) result["信号_raw"] = held_signals result["信号"] = result["信号_raw"].shift(1) result = result.drop(columns=["信号_raw"]) + # 删除信号为 NaN 或空字符串的行 result = result.dropna(subset=["信号"]) + result = result[result["信号"] != ""] self.signals = result self._print_signal_stats(result, select_num, rebalance_days, rebalance_threshold) @@ -152,12 +160,14 @@ class RotationStrategy(BacktestStrategy): return (new_score / old_score - 1) >= threshold return new_score > 0 else: - new_codes = target.split(",") - old_codes = current_held.split(",") + new_codes = [c for c in target.split(",") if c] # 过滤空字符串 + old_codes = [c for c in current_held.split(",") if c] # 过滤空字符串 + if not new_codes or not old_codes: + return True # 有空持仓,需要调仓 if set(new_codes) == set(old_codes): return False - new_total = sum(float(row[f"得分_{c}"]) for c in new_codes) - old_total = sum(float(row[f"得分_{c}"]) for c in old_codes) + new_total = sum(float(row.get(f"得分_{c}", 0)) for c in new_codes) + old_total = sum(float(row.get(f"得分_{c}", 0)) for c in old_codes) if old_total > 0: return (new_total / old_total - 1) >= threshold return new_total > 0 @@ -206,12 +216,17 @@ class RotationStrategy(BacktestStrategy): # 计算策略日收益率 if select_num == 1: def calc_return(row): - return row[f"日收益率_{row['信号']}"] + signal = row['信号'] + if not signal or pd.isna(signal): + return 0.0 + return row.get(f"日收益率_{signal}", 0.0) result["轮动策略日收益率"] = result.apply(calc_return, axis=1) else: def calc_multi_return(row): - codes = row["信号"].split(",") - returns = [row[f"日收益率_{c}"] for c in codes] + codes = [c for c in row["信号"].split(",") if c] # 过滤空字符串 + if not codes: + return 0.0 + returns = [row.get(f"日收益率_{c}", 0.0) for c in codes] return np.mean(returns) result["轮动策略日收益率"] = result.apply(calc_multi_return, axis=1) diff --git a/strategies/rotation/portfolio.py b/strategies/rotation/portfolio.py index dbd6e32..93b6417 100644 --- a/strategies/rotation/portfolio.py +++ b/strategies/rotation/portfolio.py @@ -95,7 +95,10 @@ def track_positions( # 多品种等权轮动 current_signal = signals[0] entry_date = dates[0] - codes = current_signal.split(",") + codes = [c for c in current_signal.split(",") if c] # 过滤空字符串 + if not codes: + # 空信号,返回空结果 + return pd.DataFrame(trades), pd.DataFrame() weight = 1.0 / len(codes) entry_prices = {c: data.loc[entry_date, c] for c in codes} entry_nav = data.loc[entry_date, "轮动策略净值"] @@ -131,7 +134,9 @@ def track_positions( current_signal = today_signal entry_date = dates[i] - codes = current_signal.split(",") + codes = [c for c in current_signal.split(",") if c] # 过滤空字符串 + if not codes: + break # 空信号,结束循环 weight = 1.0 / len(codes) entry_prices = {c: data.loc[entry_date, c] for c in codes} entry_nav = data.loc[entry_date, "轮动策略净值"] @@ -211,23 +216,37 @@ def save_trades( import os os.makedirs(os.path.dirname(save_path) if os.path.dirname(save_path) else ".", exist_ok=True) - trades_out = trades_df.copy() - trades_out["持仓收益"] = trades_out["持仓收益"].apply(lambda x: f"{x:.2%}") - trades_out["进场日期"] = trades_out["进场日期"].apply( - lambda x: x.strftime("%Y-%m-%d") if hasattr(x, "strftime") else str(x)[:10] - ) - trades_out["出场日期"] = trades_out["出场日期"].apply( - lambda x: x.strftime("%Y-%m-%d") if hasattr(x, "strftime") else str(x)[:10] - ) - + # 保存调仓明细 trades_path = f"{save_path}_trades.csv" - trades_out.to_csv(trades_path, index=False, encoding="utf-8-sig") - print(f"\n调仓明细已保存: {trades_path}") - - summary_out = summary_df.copy() - for col in ["胜率", "平均收益", "累计收益", "最大单次收益", "最大单次亏损"]: - summary_out[col] = summary_out[col].apply(lambda x: f"{x:.2%}") + if not trades_df.empty: + trades_out = trades_df.copy() + if "持仓收益" in trades_out.columns: + trades_out["持仓收益"] = trades_out["持仓收益"].apply(lambda x: f"{x:.2%}") + if "进场日期" in trades_out.columns: + trades_out["进场日期"] = trades_out["进场日期"].apply( + lambda x: x.strftime("%Y-%m-%d") if hasattr(x, "strftime") else str(x)[:10] + ) + if "出场日期" in trades_out.columns: + trades_out["出场日期"] = trades_out["出场日期"].apply( + lambda x: x.strftime("%Y-%m-%d") if hasattr(x, "strftime") else str(x)[:10] + ) + trades_out.to_csv(trades_path, index=False, encoding="utf-8-sig") + print(f"\n调仓明细已保存: {trades_path}") + else: + # 创建空文件 + pd.DataFrame().to_csv(trades_path, index=False, encoding="utf-8-sig") + print(f"\n调仓明细为空: {trades_path}") + # 保存品种汇总 summary_path = f"{save_path}_summary.csv" - summary_out.to_csv(summary_path, index=False, encoding="utf-8-sig") - print(f"品种汇总已保存: {summary_path}") + if not summary_df.empty: + summary_out = summary_df.copy() + for col in ["胜率", "平均收益", "累计收益", "最大单次收益", "最大单次亏损"]: + if col in summary_out.columns: + summary_out[col] = summary_out[col].apply(lambda x: f"{x:.2%}") + summary_out.to_csv(summary_path, index=False, encoding="utf-8-sig") + print(f"品种汇总已保存: {summary_path}") + else: + # 创建空文件 + pd.DataFrame().to_csv(summary_path, index=False, encoding="utf-8-sig") + print(f"品种汇总为空: {summary_path}")