refactor: 归档旧代码,保留新框架结构

归档内容:
- core/ (数据源、因子计算、通用工具) → archive/legacy_core/
- strategies/rotation/engine.py, portfolio.py, report.py → archive/legacy_core/
- scripts/ (run_rotation, daily_scheduler) → archive/legacy_scripts/
- examples/ → archive/legacy_examples/
- tests/ (实验、对比测试) → archive/legacy_tests/
- 单独文件 (fetch_*.py, 动量.py, 全球市场.py等) → archive/single_files/

保留新结构:
- framework/ (抽象接口)
- strategies/shared/ (定制组件)
- strategies/rotation/strategy.py (新策略)
- 外层配置: .env, .dockerignore, build-and-push.sh, hk_ecs.pem, README.md, requirements.txt
- Docker相关: Dockerfile, Dockerfile_base, docker-compose.yml

更新README反映新框架架构
This commit is contained in:
2026-05-11 23:34:23 +08:00
parent f663d51b87
commit 1fca536c95
61 changed files with 221 additions and 159 deletions

View File

@@ -1,351 +0,0 @@
"""
ETF轮动策略引擎
整合信号生成和回测逻辑
使用 YFinance 数据源(支持 SSH 隧道)
"""
import pandas as pd
import numpy as np
from typing import Optional
from strategies.base import BacktestStrategy
from core.datasource.hybrid_source import HybridDataSource
from core.factors.momentum import compute_factors, calculate_daily_return
class RotationStrategy(BacktestStrategy):
"""ETF轮动策略"""
def __init__(self, config: dict):
super().__init__("ETF轮动策略", config)
# 初始化混合数据源
ssh_config = config.get("ssh_tunnel", {})
self.data_source = HybridDataSource(
ssh_config=ssh_config,
use_cache=config.get("use_cache", True)
)
print(f"使用混合数据源: Tushare(中国A股) + YFinance(港股/美股/加密货币)")
print(f"SSH隧道: {ssh_config.get('enabled', False)}")
self.data = None
self.signals = None
self.backtest_result = None
def fetch_data(self) -> pd.DataFrame:
"""获取数据(支持指数-ETF双轨数据"""
from config.settings import DEFAULT_BENCHMARK_CODE
# 从配置中读取基准代码,或使用默认值
benchmark_code = self.config.get("benchmark", {}).get("code", DEFAULT_BENCHMARK_CODE)
# 获取代码配置(包含 name, etf, market
code_config = self.config.get("code_list", {})
# 使用上下文管理器管理 SSH 隧道
with self.data_source:
index_data, etf_data, etf_nav_data, benchmark_data, valid_codes, index_ohlcv_data = self.data_source.fetch_all(
code_config,
benchmark_code,
self.config["start_date"],
self.config["end_date"],
)
# 存储数据和配置
self.index_data = index_data # 指数数据(用于因子计算)
self.etf_data = etf_data # ETF价格数据用于收益计算
self.etf_nav_data = etf_nav_data # ETF净值数据用于溢价率计算
self.benchmark_data = benchmark_data
self.valid_codes = valid_codes
self.code_config = code_config # 代码配置(用于判断市场类型)
# 计算因子传入两套数据指数数据用于因子ETF数据用于收益
factor_data, valid_codes = compute_factors(
index_data,
valid_codes,
n=self.config["n_days"],
factor_type=self.config["factor_type"],
etf_data=etf_data, # 传入ETF数据用于收益计算
code_config=code_config, # 传入配置以判断加密货币
index_ohlcv_data=index_ohlcv_data,
auto_day=self.config.get("auto_day", False),
min_days=self.config.get("min_days", 20),
max_days=self.config.get("max_days", 60),
)
self.data = factor_data
self.valid_codes = valid_codes
return factor_data
def generate_signals(self) -> pd.DataFrame:
"""生成轮动信号"""
if self.data is None:
self.fetch_data()
result = self.data.copy()
score_cols = [f"得分_{code}" for code in self.valid_codes]
select_num = self.config["select_num"]
rebalance_days = self.config["rebalance_days"]
rebalance_threshold = self.config["rebalance_threshold"]
# Step 1: 每日目标组合
if not score_cols:
raise ValueError("没有有效的指数代码,无法生成信号")
diversified = self.config.get("diversified", False)
if not diversified:
if select_num == 1:
def top_1_filter(row):
scores = pd.to_numeric(row[score_cols], errors="coerce").dropna()
if scores.empty: return ""
best_code = scores.idxmax()
if scores[best_code] <= 0: return "" # 强制过滤负分
return best_code.replace("得分_", "")
daily_target = result.apply(top_1_filter, axis=1)
else:
def top_n_codes(row):
scores = pd.to_numeric(row[score_cols], errors="coerce").dropna()
scores = scores[scores > 0] # 强制只保留正分标的
if scores.empty: return ""
top = scores.nlargest(min(select_num, len(scores))).index.tolist()
return ",".join([c.replace("得分_", "") for c in top])
daily_target = result.apply(top_n_codes, axis=1)
else:
# 强制分散化:每个大类只选 Top 1
def top_n_diversified(row):
scores = pd.to_numeric(row[score_cols], errors="coerce").dropna()
scores = scores[scores > 0] # 强制只保留正分标的
if scores.empty: return ""
# 建立 category -> (code, score) 的映射
cat_best = {}
for col_name, score in scores.items():
code = col_name.replace("得分_", "")
cat = self.code_config.get(code, {}).get("market", "未知")
if cat not in cat_best or score > cat_best[cat][1]:
cat_best[cat] = (code, score)
# 对各大类的冠军进行排序
sorted_cats = sorted(cat_best.values(), key=lambda x: x[1], reverse=True)
top = [code for code, score in sorted_cats[:select_num]]
return ",".join(top)
daily_target = result.apply(top_n_diversified, axis=1)
# Step 2: 逐日生成信号(调仓周期控制)
held_signals = []
current_held = None
last_rebalance_idx = 0
for i in range(len(result)):
target = daily_target.iloc[i]
if current_held is None:
# 跳过空信号,直到找到第一个有效信号
if not target:
held_signals.append(None) # 添加None占位保持长度一致
continue
current_held = target
last_rebalance_idx = i
held_signals.append(current_held)
continue
days_since = i - last_rebalance_idx
if days_since >= rebalance_days:
# 目标信号为空时不调仓
if target: # 只在目标有效时才检查是否调仓
should = self._check_rebalance(
result.iloc[i], current_held, target,
select_num, rebalance_threshold
)
if should:
current_held = target
last_rebalance_idx = i
held_signals.append(current_held)
result["信号_raw"] = held_signals
result["信号"] = result["信号_raw"].shift(1)
result = result.drop(columns=["信号_raw"])
# 删除信号为 NaN 或空字符串的行
result = result.dropna(subset=["信号"])
result = result[result["信号"] != ""]
self.signals = result
self._print_signal_stats(result, select_num, rebalance_days, rebalance_threshold)
return result
def _check_rebalance(self, row, current_held, target, select_num, threshold):
"""检查是否应该调仓"""
if select_num == 1:
if target == current_held:
return False
new_score = float(row[f"得分_{target}"])
old_score = float(row[f"得分_{current_held}"])
if old_score > 0:
return (new_score / old_score - 1) >= threshold
return new_score > 0
else:
new_codes = [c for c in target.split(",") if c] # 过滤空字符串
old_codes = [c for c in current_held.split(",") if c] # 过滤空字符串
if not new_codes or not old_codes:
return True # 有空持仓,需要调仓
if set(new_codes) == set(old_codes):
return False
new_total = sum(float(row.get(f"得分_{c}", 0)) for c in new_codes)
old_total = sum(float(row.get(f"得分_{c}", 0)) for c in old_codes)
if old_total > 0:
return (new_total / old_total - 1) >= threshold
return new_total > 0
def _print_signal_stats(self, result, select_num, rebalance_days, rebalance_threshold):
"""打印信号统计"""
total_days = len(result)
if select_num == 1:
rebalance_count = (result["信号"] != result["信号"].shift(1)).sum() - 1
else:
prev = None
rebalance_count = 0
for s in result["信号"]:
if prev is not None and s != prev:
if set(s.split(",")) != set(prev.split(",")):
rebalance_count += 1
prev = s
rebalance_count = max(rebalance_count, 0)
avg_hold = total_days / max(rebalance_count, 1)
years = total_days / 252
annual_rebalances = rebalance_count / max(years, 0.1)
print(f"\n信号生成完成:")
print(f" 调仓周期: {rebalance_days} 天 | 阈值: {rebalance_threshold:.1%}")
print(f" 交易天数: {total_days}")
print(f" 调仓次数: {rebalance_count} | 平均持仓: {avg_hold:.1f} 天 | 年均调仓: {annual_rebalances:.1f}")
if select_num == 1:
signal_counts = result["信号"].value_counts()
print(f" 品种持仓分布 (前10):")
for code, count in signal_counts.head(10).items():
pct = count / total_days * 100
print(f" {code}: {count}天 ({pct:.1f}%)")
def run_backtest(self) -> pd.DataFrame:
"""执行回测"""
if self.signals is None:
self.generate_signals()
result = self.signals.copy()
select_num = self.config["select_num"]
trade_cost = self.config["trade_cost"]
# 计算策略日收益率 - 处理NaN值
if select_num == 1:
def calc_return(row):
signal = row['信号']
if not signal or pd.isna(signal):
return 0.0
ret = row.get(f"日收益率_{signal}", 0.0)
# 如果日收益率是NaN返回0.0
return ret if pd.notna(ret) else 0.0
result["轮动策略日收益率"] = result.apply(calc_return, axis=1)
else:
def calc_multi_return(row):
codes = [c for c in row["信号"].split(",") if c] # 过滤空字符串
if not codes:
return 0.0
# 获取各品种日收益率忽略NaN值
returns = []
for c in codes:
ret = row.get(f"日收益率_{c}", None)
if ret is not None and pd.notna(ret):
returns.append(ret)
# 如果所有品种日收益率都缺失返回0.0
return np.mean(returns) if returns else 0.0
result["轮动策略日收益率"] = result.apply(calc_multi_return, axis=1)
# 扣除交易成本
if trade_cost > 0:
prev_signal = result["信号"].shift(1)
if select_num == 1:
changed = (result["信号"] != prev_signal) & prev_signal.notna()
result.loc[changed, "轮动策略日收益率"] -= trade_cost
else:
turnover_list = []
for curr, prev in zip(result["信号"], prev_signal):
if pd.isna(prev) or curr == prev:
turnover_list.append(0.0)
else:
old = set(prev.split(","))
new = set(curr.split(","))
swapped = len(old - new)
turnover_list.append(swapped / len(old))
result["换手率"] = turnover_list
result["轮动策略日收益率"] -= result["换手率"] * trade_cost
# 计算净值 - 强制起点为1.0
result["轮动策略净值"] = (1 + result["轮动策略日收益率"]).cumprod()
# 归一化确保净值起点为1.0(消除第一行日收益率包含的前一天收益)
result["轮动策略净值"] = result["轮动策略净值"] / result["轮动策略净值"].iloc[0]
# 各ETF单独净值 - 使用第一个有效价格作为基准
for code in self.valid_codes:
# 获取第一个有效价格非NaN
valid_prices = result[code][result[code].notna()]
if len(valid_prices) > 0:
first_valid_price = valid_prices.iloc[0]
result[f"净值_{code}"] = result[code] / first_valid_price
else:
# 如果没有有效数据净值列全部为NaN
result[f"净值_{code}"] = np.nan
# 基准净值
# benchmark_data 是 DataFrame需要提取 close 列
if isinstance(self.benchmark_data, pd.DataFrame):
if 'close' in self.benchmark_data.columns:
bench_close = self.benchmark_data['close']
else:
# 宽格式数据
bench_close = self.benchmark_data.iloc[:, 0]
else:
bench_close = self.benchmark_data
bench_ret = bench_close.pct_change().dropna()
common_dates = result.index.intersection(bench_ret.index)
bench_ret = bench_ret.loc[common_dates]
result["基准日收益率"] = bench_ret.reindex(result.index, fill_value=0)
result["基准净值"] = (1 + result["基准日收益率"]).cumprod()
# 归一化确保基准净值起点为1.0
result["基准净值"] = result["基准净值"] / result["基准净值"].iloc[0]
self.backtest_result = result
# 打印摘要
total_days = len(result)
strategy_total_return = result["轮动策略净值"].iloc[-1] - 1
benchmark_total_return = result["基准净值"].iloc[-1] - 1
print(f"\n回测完成:")
print(f" 回测区间: {result.index.min().date()} ~ {result.index.max().date()}")
print(f" 交易天数: {total_days}")
print(f" 策略累计收益: {strategy_total_return:.2%}")
print(f" 基准累计收益: {benchmark_total_return:.2%}")
return result
def run(self) -> dict:
"""运行完整流程"""
self.fetch_data()
self.generate_signals()
self.run_backtest()
return self.backtest_result
def get_signals(self) -> pd.DataFrame:
"""获取当前信号"""
if self.signals is None:
self.generate_signals()
return self.signals

View File

@@ -1,252 +0,0 @@
"""
ETF轮动策略 - 持仓跟踪模块
"""
import pandas as pd
from typing import Optional
def track_positions(
backtest_result: pd.DataFrame,
code_name_map: dict = None,
select_num: int = 1,
) -> tuple[pd.DataFrame, pd.DataFrame]:
"""
从回测结果中提取每笔持仓记录
Args:
backtest_result: 回测结果(含 '信号' 列)
code_name_map: 代码→名称映射
select_num: 每次选中的品种数量
Returns:
tuple: (trades_df, summary_df)
"""
code_name_map = code_name_map or {}
data = backtest_result.copy()
dates = data.index.tolist()
signals = data["信号"].tolist()
trades = []
if select_num == 1:
# 单品种轮动
current_code = signals[0]
entry_date = dates[0]
entry_price = data.loc[entry_date, current_code]
entry_nav = data.loc[entry_date, "轮动策略净值"]
for i in range(1, len(dates)):
today_code = signals[i]
if today_code != current_code:
exit_date = dates[i - 1]
exit_price = data.loc[exit_date, current_code]
exit_nav = data.loc[exit_date, "轮动策略净值"]
holding_days = (i - 1) - dates.index(entry_date) + 1
trade_return = exit_price / entry_price - 1 if entry_price != 0 else 0
nav_contrib = exit_nav - entry_nav
trades.append({
"序号": len(trades) + 1,
"品种代码": current_code,
"品种名称": code_name_map.get(current_code, current_code),
"进场日期": entry_date,
"出场日期": exit_date,
"持仓天数": holding_days,
"仓位占比": "100%",
"进场价格": round(entry_price, 2),
"出场价格": round(exit_price, 2),
"持仓收益": trade_return,
"进场净值": round(entry_nav, 4),
"出场净值": round(exit_nav, 4),
"净值贡献": round(nav_contrib, 4),
})
current_code = today_code
entry_date = dates[i]
entry_price = data.loc[entry_date, current_code]
entry_nav = data.loc[entry_date, "轮动策略净值"]
# 最后一笔
exit_date = dates[-1]
exit_price = data.loc[exit_date, current_code]
exit_nav = data.loc[exit_date, "轮动策略净值"]
holding_days = len(dates) - dates.index(entry_date)
trade_return = exit_price / entry_price - 1 if entry_price != 0 else 0
nav_contrib = exit_nav - entry_nav
trades.append({
"序号": len(trades) + 1,
"品种代码": current_code,
"品种名称": code_name_map.get(current_code, current_code),
"进场日期": entry_date,
"出场日期": exit_date,
"持仓天数": holding_days,
"仓位占比": "100%",
"进场价格": round(entry_price, 2),
"出场价格": round(exit_price, 2),
"持仓收益": trade_return,
"进场净值": round(entry_nav, 4),
"出场净值": round(exit_nav, 4),
"净值贡献": round(nav_contrib, 4),
})
else:
# 多品种等权轮动
current_signal = signals[0]
entry_date = dates[0]
codes = [c for c in current_signal.split(",") if c] # 过滤空字符串
if not codes:
# 空信号,返回空结果
return pd.DataFrame(trades), pd.DataFrame()
weight = 1.0 / len(codes)
entry_prices = {c: data.loc[entry_date, c] for c in codes}
entry_nav = data.loc[entry_date, "轮动策略净值"]
for i in range(1, len(dates)):
today_signal = signals[i]
if today_signal != current_signal:
exit_date = dates[i - 1]
exit_nav = data.loc[exit_date, "轮动策略净值"]
holding_days = (i - 1) - dates.index(entry_date) + 1
for c in codes:
exit_price = data.loc[exit_date, c]
ep = entry_prices[c]
trade_return = exit_price / ep - 1 if ep != 0 else 0
trades.append({
"序号": len(trades) + 1,
"品种代码": c,
"品种名称": code_name_map.get(c, c),
"进场日期": entry_date,
"出场日期": exit_date,
"持仓天数": holding_days,
"仓位占比": f"{weight:.0%}",
"进场价格": round(ep, 2),
"出场价格": round(exit_price, 2),
"持仓收益": trade_return,
"进场净值": round(entry_nav, 4),
"出场净值": round(exit_nav, 4),
"净值贡献": round((exit_nav - entry_nav) * weight, 4),
})
current_signal = today_signal
entry_date = dates[i]
codes = [c for c in current_signal.split(",") if c] # 过滤空字符串
if not codes:
break # 空信号,结束循环
weight = 1.0 / len(codes)
entry_prices = {c: data.loc[entry_date, c] for c in codes}
entry_nav = data.loc[entry_date, "轮动策略净值"]
# 最后一笔
exit_date = dates[-1]
exit_nav = data.loc[exit_date, "轮动策略净值"]
holding_days = len(dates) - dates.index(entry_date)
for c in codes:
exit_price = data.loc[exit_date, c]
ep = entry_prices[c]
trade_return = exit_price / ep - 1 if ep != 0 else 0
trades.append({
"序号": len(trades) + 1,
"品种代码": c,
"品种名称": code_name_map.get(c, c),
"进场日期": entry_date,
"出场日期": exit_date,
"持仓天数": holding_days,
"仓位占比": f"{weight:.0%}",
"进场价格": round(ep, 2),
"出场价格": round(exit_price, 2),
"持仓收益": trade_return,
"进场净值": round(entry_nav, 4),
"出场净值": round(exit_nav, 4),
"净值贡献": round((exit_nav - entry_nav) * weight, 4),
})
trades_df = pd.DataFrame(trades)
summary = _summarize_by_code(trades_df, code_name_map)
return trades_df, summary
def _summarize_by_code(trades_df: pd.DataFrame, code_name_map: dict) -> pd.DataFrame:
"""按品种汇总持仓统计"""
if trades_df.empty:
return pd.DataFrame()
groups = trades_df.groupby("品种代码")
rows = []
for code, grp in groups:
total_trades = len(grp)
total_days = grp["持仓天数"].sum()
avg_days = grp["持仓天数"].mean()
win_trades = (grp["持仓收益"] > 0).sum()
win_rate = win_trades / total_trades if total_trades > 0 else 0
avg_return = grp["持仓收益"].mean()
total_return = (1 + grp["持仓收益"]).prod() - 1
max_return = grp["持仓收益"].max()
min_return = grp["持仓收益"].min()
rows.append({
"品种代码": code,
"品种名称": code_name_map.get(code, code),
"调仓次数": total_trades,
"总持仓天数": total_days,
"平均持仓天数": round(avg_days, 1),
"胜率": win_rate,
"平均收益": avg_return,
"累计收益": total_return,
"最大单次收益": max_return,
"最大单次亏损": min_return,
})
summary = pd.DataFrame(rows)
summary = summary.sort_values("总持仓天数", ascending=False).reset_index(drop=True)
return summary
def save_trades(
trades_df: pd.DataFrame,
summary_df: pd.DataFrame,
save_path: str = "report",
) -> None:
"""保存调仓明细和汇总到CSV"""
import os
os.makedirs(os.path.dirname(save_path) if os.path.dirname(save_path) else ".", exist_ok=True)
# 保存调仓明细
trades_path = f"{save_path}_trades.csv"
if not trades_df.empty:
trades_out = trades_df.copy()
if "持仓收益" in trades_out.columns:
trades_out["持仓收益"] = trades_out["持仓收益"].apply(lambda x: f"{x:.2%}")
if "进场日期" in trades_out.columns:
trades_out["进场日期"] = trades_out["进场日期"].apply(
lambda x: x.strftime("%Y-%m-%d") if hasattr(x, "strftime") else str(x)[:10]
)
if "出场日期" in trades_out.columns:
trades_out["出场日期"] = trades_out["出场日期"].apply(
lambda x: x.strftime("%Y-%m-%d") if hasattr(x, "strftime") else str(x)[:10]
)
trades_out.to_csv(trades_path, index=False, encoding="utf-8-sig")
print(f"\n调仓明细已保存: {trades_path}")
else:
# 创建空文件
pd.DataFrame().to_csv(trades_path, index=False, encoding="utf-8-sig")
print(f"\n调仓明细为空: {trades_path}")
# 保存品种汇总
summary_path = f"{save_path}_summary.csv"
if not summary_df.empty:
summary_out = summary_df.copy()
for col in ["胜率", "平均收益", "累计收益", "最大单次收益", "最大单次亏损"]:
if col in summary_out.columns:
summary_out[col] = summary_out[col].apply(lambda x: f"{x:.2%}")
summary_out.to_csv(summary_path, index=False, encoding="utf-8-sig")
print(f"品种汇总已保存: {summary_path}")
else:
# 创建空文件
pd.DataFrame().to_csv(summary_path, index=False, encoding="utf-8-sig")
print(f"品种汇总为空: {summary_path}")

View File

@@ -1,776 +0,0 @@
"""
ETF轮动策略 - 绩效报告模块
"""
import numpy as np
import pandas as pd
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from typing import Optional
from core.common.utils import calculate_cagr, calculate_max_drawdown, calculate_sharpe
def generate_performance_report(
backtest_result: pd.DataFrame,
code_list: list,
code_name_map: dict = None,
benchmark_name: str = "沪深300指数",
save_path: str = "report",
select_num: int = 1,
code_config: dict = None,
index_data: pd.DataFrame = None,
etf_price_data: pd.DataFrame = None,
etf_nav_data_raw: pd.DataFrame = None,
) -> dict:
"""
生成完整的绩效报告
Args:
backtest_result: 回测结果
code_list: ETF代码列表
code_name_map: 代码到名称映射
benchmark_name: 基准名称
save_path: 报告保存路径前缀
select_num: 选中数量
code_config: 代码配置(包含 name, etf, market用于显示ETF映射
index_data: 指数价格数据
etf_price_data: ETF价格数据用于计算溢价率
etf_nav_data_raw: ETF净值数据用于计算溢价率
Returns:
dict: 绩效指标字典
"""
import os
os.makedirs(os.path.dirname(save_path) if os.path.dirname(save_path) else ".", exist_ok=True)
code_name_map = code_name_map or {}
code_config = code_config or {}
strategy_nav = backtest_result["轮动策略净值"]
strategy_ret = backtest_result["轮动策略日收益率"]
benchmark_nav = backtest_result["基准净值"]
benchmark_ret = backtest_result["基准日收益率"]
# 计算绩效指标
s_cagr_nat = calculate_cagr(strategy_nav, "natural_days")
s_cagr_trd = calculate_cagr(strategy_nav, "trading_days")
s_total_return = strategy_nav.iloc[-1] - 1
s_sharpe = calculate_sharpe(strategy_ret)
s_max_dd, s_dd_start, s_dd_end = calculate_max_drawdown(strategy_nav)
s_win_rate = (strategy_ret > 0).sum() / len(strategy_ret)
s_calmar = s_cagr_nat / abs(s_max_dd) if s_max_dd != 0 else np.inf
b_cagr_nat = calculate_cagr(benchmark_nav, "natural_days")
b_cagr_trd = calculate_cagr(benchmark_nav, "trading_days")
b_total_return = benchmark_nav.iloc[-1] - 1
b_sharpe = calculate_sharpe(benchmark_ret)
b_max_dd, _, _ = calculate_max_drawdown(benchmark_nav)
# 打印绩效表格
print("\n" + "=" * 70)
print(" 绩效评估报告")
print("=" * 70)
print(f" 回测区间: {strategy_nav.index.min().date()} ~ {strategy_nav.index.max().date()}")
print(f" 交易天数: {len(strategy_nav)}")
print("-" * 70)
print(f' {"指标":<25} {"轮动策略":>15} {"基准(" + benchmark_name + ")":>18}')
print("-" * 70)
print(f' {"累计收益":<25} {s_total_return:>14.2%} {b_total_return:>17.2%}')
print(f' {"CAGR(自然日口径)":<25} {s_cagr_nat:>14.2%} {b_cagr_nat:>17.2%}')
print(f' {"CAGR(交易日口径)":<25} {s_cagr_trd:>14.2%} {b_cagr_trd:>17.2%}')
print(f' {"年化夏普比率":<25} {s_sharpe:>14.2f} {b_sharpe:>17.2f}')
print(f' {"最大回撤":<25} {s_max_dd:>14.2%} {b_max_dd:>17.2%}')
print(f' {"Calmar比率":<23} {s_calmar:>14.2f} {"--":>17}')
print(f' {"日胜率":<25} {s_win_rate:>14.2%} {"--":>17}')
print(f' {"最大回撤区间":<22} {str(s_dd_start.date()):>10} ~ {str(s_dd_end.date())}')
print("=" * 70)
# 计算溢价率需要ETF价格和ETF净值
# 溢价率 = (ETF价格 - ETF净值) / ETF净值
# 使用信号日期的ETF收盘价但只有当天有净值数据时才计算溢价率
etf_close_data = {} # ETF收盘价
premium_data = {} # 溢价率(仅当当天有净值时计算)
if etf_price_data is not None:
signal_date = backtest_result.index[-1]
# 获取信号日期的ETF收盘价
if signal_date in etf_price_data.index:
for code in code_list:
if code in etf_price_data.columns:
etf_close = etf_price_data.loc[signal_date, code]
if pd.notna(etf_close):
etf_close_data[code] = etf_close
# 计算溢价率:只有当天有净值数据时才计算
if etf_nav_data_raw is not None and signal_date in etf_nav_data_raw.index:
for code in code_list:
if code in etf_close_data and code in etf_nav_data_raw.columns:
etf_nav = etf_nav_data_raw.loc[signal_date, code]
if pd.notna(etf_nav) and etf_nav > 0:
etf_close = etf_close_data[code]
premium = (etf_close - etf_nav) / etf_nav
premium_data[code] = premium
# 打印最新调仓信号
_print_latest_signal(backtest_result, code_list, code_name_map, select_num, code_config, etf_close_data, premium_data)
# 绘制图表
_plot_report_chart(
backtest_result, code_list, code_name_map,
benchmark_name, save_path, select_num,
metrics={
"累计收益": s_total_return,
"年化收益": s_cagr_nat,
"夏普比率": s_sharpe,
"最大回撤": s_max_dd,
"Calmar比率": s_calmar,
"日胜率": s_win_rate,
},
code_config=code_config,
etf_price_data=etf_price_data,
etf_nav_data_raw=etf_nav_data_raw,
)
# 保存整体策略KPI到JSON文件
import json
metrics_dict = {
"策略": {
"累计收益": float(s_total_return),
"年化收益(自然日)": float(s_cagr_nat),
"年化收益(交易日)": float(s_cagr_trd),
"夏普比率": float(s_sharpe),
"最大回撤": float(s_max_dd),
"Calmar比率": float(s_calmar),
"日胜率": float(s_win_rate),
"回测区间": {
"开始": strategy_nav.index.min().strftime("%Y-%m-%d"),
"结束": strategy_nav.index.max().strftime("%Y-%m-%d"),
"交易天数": len(strategy_nav)
}
},
"基准": {
"累计收益": float(b_total_return),
"年化收益(自然日)": float(b_cagr_nat),
"夏普比率": float(b_sharpe),
"最大回撤": float(b_max_dd),
"名称": benchmark_name
}
}
metrics_path = f"{save_path}_metrics.json"
with open(metrics_path, 'w', encoding='utf-8') as f:
json.dump(metrics_dict, f, indent=2, ensure_ascii=False)
print(f"策略指标已保存: {metrics_path}")
# 保存净值曲线数据到CSV文件
nav_df = pd.DataFrame({
'日期': strategy_nav.index.strftime('%Y-%m-%d'),
'策略净值': strategy_nav.values,
'基准净值': benchmark_nav.values,
})
# 添加各品种净值
for code in code_list:
if f"净值_{code}" in backtest_result.columns:
nav_df[f"净值_{code}"] = backtest_result[f"净值_{code}"].values
nav_path = f"{save_path}_nav.csv"
nav_df.to_csv(nav_path, index=False)
print(f"净值曲线已保存: {nav_path}")
# 返回指标字典
return {
"累计收益": s_total_return,
"CAGR_自然日": s_cagr_nat,
"CAGR_交易日": s_cagr_trd,
"夏普比率": s_sharpe,
"最大回撤": s_max_dd,
"Calmar比率": s_calmar,
"日胜率": s_win_rate,
"基准累计收益": b_total_return,
"基准CAGR_自然日": b_cagr_nat,
"基准夏普比率": b_sharpe,
"基准最大回撤": b_max_dd,
}
def _print_latest_signal(backtest_result: pd.DataFrame, code_list: list, code_name_map: dict, select_num: int, code_config: dict = None, etf_close_data: dict = None, premium_data: dict = None):
"""打印最新调仓信号支持ETF映射、ETF收盘价和溢价率显示"""
code_config = code_config or {}
etf_close_data = etf_close_data or {}
premium_data = premium_data or {}
latest = _extract_latest_positions(backtest_result, code_list, code_name_map, select_num)
signal_date = latest["signal_date"]
signal_date_str = signal_date.strftime("%Y-%m-%d")
# 数据基准日期:使用信号日期的数据
# 如果信号日期没有数据,则使用前一天
if signal_date in backtest_result.index:
data_base_date = signal_date
else:
data_base_date = signal_date - pd.Timedelta(days=1)
data_base_date_str = data_base_date.strftime("%Y-%m-%d")
print("\n")
print("=" * 135)
print(" 最新调仓信号 (下一交易日执行)")
print("=" * 135)
print(f" 信号日期: {signal_date_str} (基于 {data_base_date_str} 收盘数据)")
print()
# 表头 - 添加ETF收盘价和溢价率列
print(f' {"标的名称":<10} {"指数代码":>12} {"ETF代码":>12} {"仓位":>6} {"得分":>8} {"进场日期":>12} {"指数进场价":>10} {"指数最新价":>10} {"ETF收盘价":>10} {"溢价率":>8} {"操作":>6} {"持有天数":>8} {"盈亏":>10}')
print(" " + "-" * 155)
# 下期持仓(调入/维持)
for pos in latest["positions"]:
pnl_str = f'{pos["pnl"]:>+9.2%}' if pos["pnl"] is not None else ''
days_str = f'{pos["holding_days"]:>7}' if pos["holding_days"] is not None else ''
entry_str = f'{pos["entry_price"]:>10.2f}' if pos["entry_price"] is not None else ''
entry_date_str = pos["entry_date"].strftime("%Y-%m-%d") if pos.get("entry_date") else ''
score_str = f'{pos["score"]:>8.2f}' if pos["score"] is not None else ''
flag = '' if pos["action"] == "调入" else ' '
# 获取ETF代码、ETF净值和溢价率
idx_code = pos["code"]
cfg = code_config.get(idx_code, {})
etf_code = cfg.get('etf', '')
market = cfg.get('market', 'A')
if etf_code is None:
etf_code = '直接交易'
# 获取ETF收盘价和溢价率
if market == 'CRYPTO':
etf_close_str = ''
premium_str = ''
else:
# ETF收盘价
etf_close = etf_close_data.get(idx_code)
if etf_close is not None:
etf_close_str = f'{etf_close:>10.3f}'
else:
etf_close_str = ''
# 溢价率(只有当天有净值数据时才显示)
premium = premium_data.get(idx_code)
if premium is not None:
# 高溢价警告标记
warning = '⚠️' if premium > 0.02 else ''
premium_str = f'{premium:>+7.2%}{warning}'
else:
premium_str = ''
print(f' {pos["name"]:<10} {idx_code:>12} {etf_code:>12} {pos["weight"]:>6.0%} {score_str} {entry_date_str:>12} {entry_str} {pos["current_price"]:>10.2f} {etf_close_str} {premium_str} {flag}{pos["action"]:>4} {days_str} {pnl_str}')
# 需调出的品种
if latest["exit_positions"]:
print()
print(" 需调出:")
for pos in latest["exit_positions"]:
pnl_str = f'{pos["pnl"]:>+9.2%}' if pos["pnl"] is not None else ''
days_str = f'{pos["holding_days"]:>7}' if pos["holding_days"] is not None else ''
entry_str = f'{pos["entry_price"]:>10.2f}' if pos["entry_price"] is not None else ''
entry_date_str = pos["entry_date"].strftime("%Y-%m-%d") if pos.get("entry_date") else ''
score_str = '' # 调出品种无得分
# 获取ETF代码、ETF收盘价和溢价率
idx_code = pos["code"]
cfg = code_config.get(idx_code, {})
etf_code = cfg.get('etf', '')
market = cfg.get('market', 'A')
if etf_code is None:
etf_code = '直接交易'
# 获取ETF收盘价和溢价率
if market == 'CRYPTO':
etf_close_str = ''
premium_str = ''
else:
# ETF收盘价
etf_close = etf_close_data.get(idx_code)
if etf_close is not None:
etf_close_str = f'{etf_close:>10.3f}'
else:
etf_close_str = ''
# 溢价率(只有当天有净值数据时才显示)
premium = premium_data.get(idx_code)
if premium is not None:
warning = '⚠️' if premium > 0.02 else ''
premium_str = f'{premium:>+7.2%}{warning}'
else:
premium_str = ''
print(f' {pos["name"]:<10} {idx_code:>12} {etf_code:>12} {pos["weight"]:>6.0%} {score_str} {entry_date_str:>12} {entry_str} {pos["current_price"]:>10.2f} {etf_close_str} {premium_str} ▼调出 {days_str} {pnl_str}')
print("=" * 160)
def _extract_latest_positions(backtest_result: pd.DataFrame, code_list: list, code_name_map: dict, select_num: int) -> dict:
"""提取最新持仓和下期调仓建议"""
last_date = backtest_result.index[-1]
last_row = backtest_result.iloc[-1]
# 当前持仓
current_signal = last_row["信号"]
if select_num == 1:
current_codes = [current_signal]
else:
current_codes = current_signal.split(",")
# 下期建议
score_cols = [f"得分_{code}" for code in code_list if f"得分_{code}" in backtest_result.columns]
scores = pd.to_numeric(last_row[score_cols], errors="coerce")
top_n = scores.nlargest(select_num)
next_codes = [c.replace("得分_", "") for c in top_n.index]
# 计算持仓信息
positions_info = []
weight = 1.0 / select_num
for code in next_codes:
name = code_name_map.get(code, code)
action = "维持" if code in current_codes else "调入"
# 获取当前价格和得分
current_price = last_row.get(code, 0)
score = scores.get(f"得分_{code}", None)
# 计算持仓信息(如果是维持的仓位)
entry_date = None
entry_price = None
holding_days = None
pnl = None
if action == "维持":
# 找到该标的最近一次连续持仓的起始日期
signal_series = backtest_result["信号"]
mask = signal_series == code if select_num == 1 else signal_series.str.contains(code, regex=False, na=False)
# 找到连续持仓段(从后往前找)
is_holding = mask.values
dates = backtest_result.index
# 从最后一天往前遍历,找到连续持仓的起始点
entry_date = None
for i in range(len(is_holding) - 1, -1, -1):
if is_holding[i]:
entry_date = dates[i]
else:
break
if entry_date is not None:
entry_price = backtest_result.loc[entry_date, code]
holding_days = (last_date - entry_date).days
if entry_price and entry_price != 0:
pnl = current_price / entry_price - 1
positions_info.append({
"code": code,
"name": name,
"weight": weight,
"score": score,
"action": action,
"current_price": current_price,
"entry_date": entry_date,
"entry_price": entry_price,
"holding_days": holding_days,
"pnl": pnl,
})
# 需调出的品种信息
exit_positions = []
for code in current_codes:
if code not in next_codes:
name = code_name_map.get(code, code)
current_price = last_row.get(code, 0)
# 计算调出品种的持仓信息(最近一次连续持仓)
signal_series = backtest_result["信号"]
mask = signal_series == code if select_num == 1 else signal_series.str.contains(code, regex=False, na=False)
# 找到连续持仓段(从后往前找)
is_holding = mask.values
dates = backtest_result.index
entry_price = None
holding_days = None
pnl = None
# 从最后一天往前遍历,找到连续持仓的起始点
entry_date = None
for i in range(len(is_holding) - 1, -1, -1):
if is_holding[i]:
entry_date = dates[i]
else:
break
if entry_date is not None:
entry_price = backtest_result.loc[entry_date, code]
holding_days = (last_date - entry_date).days
if entry_price and entry_price != 0:
pnl = current_price / entry_price - 1
exit_positions.append({
"code": code,
"name": name,
"weight": weight,
"score": None, # 调出品种无得分
"action": "调出",
"current_price": current_price,
"entry_date": entry_date,
"entry_price": entry_price,
"holding_days": holding_days,
"pnl": pnl,
})
return {
"signal_date": last_date,
"current_codes": current_codes,
"next_codes": next_codes,
"positions": positions_info,
"exit_positions": exit_positions,
}
def _plot_report_chart(
backtest_result: pd.DataFrame,
code_list: list,
code_name_map: dict,
benchmark_name: str,
save_path: str,
select_num: int,
metrics: dict = None,
code_config: dict = None,
etf_price_data: pd.DataFrame = None,
etf_nav_data_raw: pd.DataFrame = None,
):
"""绘制报告图表支持ETF净值和溢价率显示"""
# 设置中文字体macOS: Arial Unicode MS, Linux: WenQuanYi Zen Hei
plt.rcParams["font.sans-serif"] = ["Arial Unicode MS", "WenQuanYi Zen Hei", "DejaVu Sans"]
plt.rcParams["axes.unicode_minus"] = False
strategy_nav = backtest_result["轮动策略净值"]
benchmark_nav = backtest_result["基准净值"]
strategy_ret = backtest_result["轮动策略日收益率"]
# 计算绩效指标(如果没有传入)
if metrics is None:
from core.common.utils import calculate_cagr, calculate_max_drawdown, calculate_sharpe
s_cagr_nat = calculate_cagr(strategy_nav, "natural_days")
s_total_return = strategy_nav.iloc[-1] - 1
s_sharpe = calculate_sharpe(strategy_ret)
s_max_dd, s_dd_start, s_dd_end = calculate_max_drawdown(strategy_nav)
s_win_rate = (strategy_ret > 0).sum() / len(strategy_ret)
s_calmar = s_cagr_nat / abs(s_max_dd) if s_max_dd != 0 else np.inf
metrics = {
"累计收益": s_total_return,
"年化收益": s_cagr_nat,
"夏普比率": s_sharpe,
"最大回撤": s_max_dd,
"Calmar比率": s_calmar,
"日胜率": s_win_rate,
}
# 提取最新调仓信息
latest = _extract_latest_positions(backtest_result, code_list, code_name_map, select_num)
# 准备配置数据
code_config = code_config or {}
signal_date = backtest_result.index[-1]
# 数据基准日期:使用信号日期的数据,如果没有则使用前一天
if signal_date in backtest_result.index:
data_base_date = signal_date
else:
data_base_date = signal_date - pd.Timedelta(days=1)
# 计算ETF收盘价和溢价率使用信号日期的数据
etf_close_dict = {} # ETF收盘价
premium_dict = {} # 溢价率(仅当当天有净值时计算)
if etf_price_data is not None:
# 获取信号日期的ETF收盘价
if signal_date in etf_price_data.index:
for code in code_list:
if code in etf_price_data.columns:
etf_close = etf_price_data.loc[signal_date, code]
if pd.notna(etf_close):
etf_close_dict[code] = etf_close
# 计算溢价率:只有当天有净值数据时才计算
if etf_nav_data_raw is not None and signal_date in etf_nav_data_raw.index:
for code in code_list:
if code in etf_close_dict and code in etf_nav_data_raw.columns:
etf_nav = etf_nav_data_raw.loc[signal_date, code]
if pd.notna(etf_nav) and etf_nav > 0:
etf_close = etf_close_dict[code]
premium = (etf_close - etf_nav) / etf_nav
premium_dict[code] = premium
# 计算表格行数
n_table_rows = len(latest["positions"]) + len(latest["exit_positions"])
signal_table_height = max(2.0, 0.6 + n_table_rows * 0.35)
metrics_table_height = 1.2
fig = plt.figure(figsize=(14, 10 + signal_table_height + metrics_table_height + 8))
gs = fig.add_gridspec(5, 1, height_ratios=[signal_table_height, metrics_table_height, 3, 1, 1.2], hspace=0.35)
# 面板0: 最新调仓信号表
ax0 = fig.add_subplot(gs[0])
ax0.axis("off")
signal_date = latest["signal_date"]
signal_date_str = signal_date.strftime("%Y-%m-%d")
# 数据基准日期:使用信号日期的数据,如果没有则使用前一天
if signal_date in backtest_result.index:
data_base_date = signal_date
else:
data_base_date = signal_date - pd.Timedelta(days=1)
data_base_date_str = data_base_date.strftime("%Y-%m-%d")
ax0.set_title(f"最新调仓信号 (信号日期: {signal_date_str},基于 {data_base_date_str} 数据,下一交易日执行)", fontsize=14, fontweight="bold", loc="left", pad=15)
# 构建表格数据添加ETF代码、ETF收盘价和溢价率列
table_data = []
col_labels = ["标的名称", "指数代码", "ETF代码", "仓位", "得分", "进场日期", "进场价", "最新价", "ETF收盘价", "溢价率", "操作", "持有天数", "盈亏"]
# 下期持仓(调入/维持)
for pos in latest["positions"]:
pnl_str = f'{pos["pnl"]:+.2%}' if pos["pnl"] is not None else ""
days_str = f'{pos["holding_days"]}' if pos["holding_days"] is not None else ""
entry_str = f'{pos["entry_price"]:.2f}' if pos["entry_price"] is not None else ""
entry_date_str = pos["entry_date"].strftime("%m-%d") if pos.get("entry_date") else ""
score_str = f'{pos["score"]:.2f}' if pos["score"] is not None else ""
# 获取ETF代码、ETF收盘价和溢价率
idx_code = pos["code"]
cfg = code_config.get(idx_code, {})
market = cfg.get('market', 'A')
etf_code = cfg.get('etf', '')
if etf_code is None:
etf_code = '直接交易'
if market == 'CRYPTO':
etf_close_str = ""
premium_str = ""
else:
etf_close = etf_close_dict.get(idx_code)
premium = premium_dict.get(idx_code)
etf_close_str = f"{etf_close:.3f}" if etf_close is not None else ""
if premium is not None:
warning = "⚠️" if premium > 0.02 else ""
premium_str = f"{premium:+.2%}{warning}"
else:
premium_str = ""
table_data.append([
pos["name"], pos["code"], etf_code, f'{pos["weight"]:.0%}',
score_str, entry_date_str, entry_str, f'{pos["current_price"]:.2f}',
etf_close_str, premium_str, pos["action"], days_str, pnl_str
])
# 需调出的品种
for pos in latest["exit_positions"]:
pnl_str = f'{pos["pnl"]:+.2%}' if pos["pnl"] is not None else ""
days_str = f'{pos["holding_days"]}' if pos["holding_days"] is not None else ""
entry_str = f'{pos["entry_price"]:.2f}' if pos["entry_price"] is not None else ""
entry_date_str = pos["entry_date"].strftime("%m-%d") if pos.get("entry_date") else ""
score_str = "" # 调出品种无得分
# 获取ETF代码、ETF收盘价和溢价率
idx_code = pos["code"]
cfg = code_config.get(idx_code, {})
market = cfg.get('market', 'A')
etf_code = cfg.get('etf', '')
if etf_code is None:
etf_code = '直接交易'
if market == 'CRYPTO':
etf_close_str = ""
premium_str = ""
else:
etf_close = etf_close_dict.get(idx_code)
premium = premium_dict.get(idx_code)
etf_close_str = f"{etf_close:.3f}" if etf_close is not None else ""
if premium is not None:
warning = "⚠️" if premium > 0.02 else ""
premium_str = f"{premium:+.2%}{warning}"
else:
premium_str = ""
table_data.append([
pos["name"], pos["code"], etf_code, f'{pos["weight"]:.0%}',
score_str, entry_date_str, entry_str, f'{pos["current_price"]:.2f}',
etf_close_str, premium_str, "调出", days_str, pnl_str
])
if table_data:
table = ax0.table(
cellText=table_data,
colLabels=col_labels,
loc="center",
cellLoc="center",
colWidths=[0.08, 0.08, 0.08, 0.05, 0.06, 0.06, 0.06, 0.06, 0.06, 0.07, 0.05, 0.06, 0.06],
bbox=[0, 0, 1, 1], # 使用完整宽度
)
table.auto_set_font_size(False)
table.set_fontsize(10)
table.scale(1, 2.0) # 行高与绩效表格一致
# 表头深色
for j in range(len(col_labels)):
table[0, j].set_facecolor("#2C3E50")
table[0, j].set_text_props(color="white", fontweight="bold")
# 数据行按操作着色
for i in range(len(table_data)):
action = table_data[i][10] # 操作列在第11列索引10
if action == "调入":
color = "#d4edda" # 绿色
elif action == "调出":
color = "#f8d7da" # 红色
else:
color = "#fff3cd" # 黄色(维持)
for j in range(len(col_labels)):
table[i + 1, j].set_facecolor(color)
# 面板1: 策略绩效指标对比表(转置:行为策略/基准,列为指标)
ax1 = fig.add_subplot(gs[1])
ax1.axis("off")
ax1.set_title("策略绩效对比", fontsize=14, fontweight="bold", loc="left", pad=10)
# 计算基准指标
from core.common.utils import calculate_cagr, calculate_max_drawdown, calculate_sharpe
benchmark_ret = backtest_result["基准日收益率"]
b_cagr_nat = calculate_cagr(benchmark_nav, "natural_days")
b_total_return = benchmark_nav.iloc[-1] - 1
b_sharpe = calculate_sharpe(benchmark_ret)
b_max_dd, _, _ = calculate_max_drawdown(benchmark_nav)
# 构建绩效对比表格(转置)
start_date = strategy_nav.index.min().strftime("%Y-%m-%d")
end_date = strategy_nav.index.max().strftime("%Y-%m-%d")
# 列标题(指标),第一列添加"策略"
perf_col_labels = ["策略", "开始时间", "结束时间", "累计收益", "年化收益", "最大回撤", "夏普比率", "Calmar比率", "日胜率"]
# 策略行数据(包含行标题)
strategy_row = [
"轮动策略",
start_date,
end_date,
f"{metrics.get('累计收益', 0):.2%}",
f"{metrics.get('年化收益', 0):.2%}",
f"{metrics.get('最大回撤', 0):.2%}",
f"{metrics.get('夏普比率', 0):.2f}",
f"{metrics.get('Calmar比率', 0):.2f}",
f"{metrics.get('日胜率', 0):.2%}",
]
# 基准行数据(包含行标题)
benchmark_row = [
f"基准({benchmark_name})",
start_date,
end_date,
f"{b_total_return:.2%}",
f"{b_cagr_nat:.2%}",
f"{b_max_dd:.2%}",
f"{b_sharpe:.2f}",
"",
"",
]
# 表格数据2行策略、基准
perf_table_data = [strategy_row, benchmark_row]
# 使用与调仓表格相同的列宽计算方式,确保总宽度一致
# 调仓表格有10列这里也有9列使用相似的宽度分配
perf_col_widths = [0.10, 0.10, 0.10, 0.10, 0.10, 0.10, 0.10, 0.10, 0.10]
perf_table = ax1.table(
cellText=perf_table_data,
colLabels=perf_col_labels,
loc="center",
cellLoc="center",
colWidths=perf_col_widths,
bbox=[0, 0, 1, 1], # 使用完整宽度,与调仓表格一致
)
perf_table.auto_set_font_size(False)
perf_table.set_fontsize(10) # 字体大小与调仓表格一致
perf_table.scale(1, 2.0) # 行高与调仓表格一致
# 表头样式(第一行)
for j in range(len(perf_col_labels)):
perf_table[0, j].set_facecolor("#2C3E50")
perf_table[0, j].set_text_props(color="white", fontweight="bold")
# 数据行样式
# 策略行浅绿背景
for j in range(len(perf_col_labels)):
perf_table[1, j].set_facecolor("#d4edda")
# 基准行浅蓝背景
for j in range(len(perf_col_labels)):
perf_table[2, j].set_facecolor("#cce5ff")
# 第一列(策略名称)加粗
for i in range(2):
perf_table[i + 1, 0].set_text_props(fontweight="bold")
# 面板2: 净值曲线
ax2 = fig.add_subplot(gs[2])
ax2.plot(strategy_nav.index, strategy_nav.values,
label="轮动策略", linewidth=2, color="#E74C3C")
ax2.plot(benchmark_nav.index, benchmark_nav.values,
label=benchmark_name, linewidth=1.5, color="#3498DB", alpha=0.8)
chart_colors = plt.cm.tab20.colors
show_legend_n = min(len(code_list), 10)
for i, code in enumerate(code_list):
name = code_name_map.get(code, code)
lbl = name if i < show_legend_n else None
ax2.plot(backtest_result.index, backtest_result[f"净值_{code}"].values,
label=lbl, linewidth=0.8, alpha=0.4,
color=chart_colors[i % len(chart_colors)])
ax2.set_title("ETF轮动策略 - 净值曲线", fontsize=16, fontweight="bold")
ax2.set_ylabel("净值")
ax2.legend(loc="upper left", fontsize=8, ncol=2)
ax2.grid(True, alpha=0.3)
ax2.set_yscale("log")
# 面板3: 回撤曲线
ax3 = fig.add_subplot(gs[3])
cummax = strategy_nav.cummax()
drawdown = (strategy_nav - cummax) / cummax
ax3.fill_between(drawdown.index, drawdown.values, 0, alpha=0.5, color="#E74C3C")
ax3.set_title("策略回撤", fontsize=12)
ax3.set_ylabel("回撤")
ax3.grid(True, alpha=0.3)
# 面板4: 持仓分布
ax4 = fig.add_subplot(gs[4])
signal_series = backtest_result["信号"]
for i, code in enumerate(code_list):
name = code_name_map.get(code, code)
if select_num > 1:
mask = signal_series.str.contains(code, regex=False, na=False)
else:
mask = signal_series == code
if mask.any():
ax4.fill_between(signal_series.index, i, i + 0.8,
where=mask, alpha=0.7,
color=chart_colors[i % len(chart_colors)],
label=name)
ylabels = [code_name_map.get(c, c) for c in code_list]
ax4.set_title("每日持仓分布", fontsize=12)
ax4.set_yticks(range(len(ylabels)))
ax4.set_yticklabels(ylabels, fontsize=7)
ax4.grid(True, alpha=0.3)
chart_path = f"{save_path}_chart.png"
plt.savefig(chart_path, dpi=150, bbox_inches="tight")
plt.close()
print(f"\n报告图表已保存: {chart_path}")