feat(backtest): 消除前视偏差,实现动态ETF池重建

消除回测前视偏差(Look-Ahead Bias):
- 新增 ETFDataCache 本地缓存系统,预下载全量ETF(含已退市)基础信息和日线数据
- 改造 ETFUniverseBuilder 支持纯历史模式,每个时间点只使用当时可获得的数据
- 动量.py 新增 dynamic 模式,回测中每60交易日动态重建ETF候选池
- momentum_experiment.py 同步支持动态重建
- 新增 ETF筛选引擎文档和动态池方案文档

无前视偏差实验结果(6组对比,2015-2026):
  A: 全仓1只       CAGR=3.32%, MaxDD=-63.19%, Sharpe=0.26
  B: 等权3只       CAGR=3.40%, MaxDD=-49.72%, Sharpe=0.30 ← 最优
  C: 反波动率3只   CAGR=1.73%, MaxDD=-38.59%, Sharpe=0.21
  D: 等权5只       CAGR=2.77%, MaxDD=-42.39%, Sharpe=0.29
  E: 反波动率5只   CAGR=-0.37%, MaxDD=-19.56%, Sharpe=-0.03
  F: 动量>0全选等权 CAGR=2.02%, MaxDD=-43.27%, Sharpe=0.24

最优方案: B(等权3只)夏普、Calmar、CAGR三项均最高
This commit is contained in:
2026-04-29 22:15:01 +08:00
parent e301a08724
commit 2829f80427
6 changed files with 2597 additions and 0 deletions

View File

@@ -0,0 +1,744 @@
"""
动态ETF池自动化筛选引擎
=========================
多层漏斗筛选:从全市场ETF中选出低相关、高流动性、覆盖多资产类别的最优轮动候选池。
参考文献:
- TrendFolios (arxiv:2506.09330): 资产标签化 + 无前视偏差
- AEGIS (arxiv:2604.09060): 流动性硬门槛 + 定期重建
- HRP (SSRN:2708678): 层次聚类相关性优化
- Faber GTAA (SSRN:962461): 风险因子覆盖设计
- Antonacci Dual Momentum (SSRN:2042750): 跨资产分散化
用法:
python scripts/build_etf_universe.py # 当前日期构建
python scripts/build_etf_universe.py --date 20240101 # 指定日期构建
"""
import os
import sys
import time
import argparse
import logging
from pathlib import Path
from datetime import datetime, timedelta
import numpy as np
import pandas as pd
sys.path.insert(0, str(Path(__file__).parent.parent))
from dotenv import load_dotenv
load_dotenv()
import tushare as ts
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger(__name__)
# ============================================================
# 配置
# ============================================================
DEFAULT_CONFIG = {
    'min_list_days': 365,  # minimum days since listing (one year)
    'min_daily_amount': 5000,  # average daily turnover floor, in units of 10k CNY
    'lookback_amount_days': 60,  # window used to compute the average daily turnover
    'n_select': 'auto',  # final pool size: 'auto' = ENB-driven, or a fixed integer
    'candidate_multiplier': 3.0,  # Layer 4 candidate budget = ENB estimate * this multiplier
    'min_per_class': 2,  # minimum number of ETFs kept per asset class
    'max_corr': 0.85,  # maximum allowed correlation coefficient
    'corr_lookback_days': 120,  # window used for the correlation computation
    'max_equity_ratio': 0.5,  # cap on the share of the 'A股行业' class in the final pool
    'enb_fallback': 12,  # value used when ENB cannot be computed
}
# ============================================================
# Layer 3: 大类资产分类配置
# ============================================================
# Classification priority: fund_type/invest_type (official fields) > benchmark (tracked index) > name (keyword fallback)
# Layer 4: broad asset classes (how many ETFs to keep per class is computed from the data)
ASSET_CLASSES = ['A股宽基', 'A股行业', 'A股主题', '港股', '美股',
                 '全球/其他', '商品', '债券', 'REITs', '货币/现金']
# --- Keyword tables used by the classification rules (matched against name / benchmark text) ---
_BROAD_KW = ['沪深300', '中证500', '中证1000', '创业板', '上证50', '科创50',
             '上证180', '深证100', '中证100', 'A50', 'A500', '中证800',
             '万得全A', '富时A50', 'MSCI中国A']
_HK_KW = ['恒生', '港股', 'H股', '港股通']
_US_KW = ['纳斯达克', '纳指', '标普500', '美股', 'S&P500', '道琼斯']
_GLOBAL_KW = ['日经', '德国', '法国', '越南', '印度', '东南亚',
              '沙特', '韩国', '英国', '全球', '亚太']
_THEME_KW = ['红利', '央企', '国企', 'ESG', '碳中和', '数字经济',
             '人工智能', 'AI', '机器人', '信创', '北证50',
             '一带一路', '养老', '价值', '成长', '质量',
             '现金流', '低波']
class ETFUniverseBuilder:
    """Dynamic ETF universe screening engine."""
def __init__(self, config: dict = None, ref_date: str = None, data_cache=None):
    """Configure the builder for one reference date.

    Args:
        config: Overrides merged on top of DEFAULT_CONFIG; None keeps the defaults.
        ref_date: Reference date, YYYYMMDD (YYYY-MM-DD is accepted and
            normalised). Defaults to today.
        data_cache: ETFDataCache instance. When given, data is read from the
            local cache (look-ahead-free mode) and no API client is created.

    Raises:
        ValueError: In online mode, when TUSHARE_TOKEN is not set.
    """
    self.cfg = {**DEFAULT_CONFIG, **(config or {})}
    raw_ref_date = ref_date or datetime.now().strftime('%Y%m%d')
    self.ref_dt = pd.Timestamp(raw_ref_date)
    # ref_date is compared as a string against trade_date, sent to the API and
    # embedded in file names, so keep it in canonical YYYYMMDD form.
    self.ref_date = self.ref_dt.strftime('%Y%m%d')
    self.data_cache = data_cache
    if data_cache is None:
        token = os.getenv('TUSHARE_TOKEN')
        if not token:
            raise ValueError("请设置环境变量 TUSHARE_TOKEN")
        self.pro = ts.pro_api(token)
    else:
        self.pro = None  # cache mode never touches the API
    self.output_dir = Path(__file__).parent.parent / 'data' / 'etf_universe'
    self.output_dir.mkdir(parents=True, exist_ok=True)
    # Pipeline log lines, written to disk by save_results().
    self._log_lines = []
def _log(self, msg: str):
    """Record *msg* in the pipeline log and emit it through the module logger."""
    self._log_lines.append(msg)
    logger.info(msg)
def _api_call(self, func, **kwargs):
"""带重试和限流的 API 调用"""
for attempt in range(3):
try:
result = func(**kwargs)
time.sleep(0.35)
return result
except Exception as e:
if attempt < 2:
time.sleep(2)
else:
raise e
# ============================================================
# Layer 0: 获取全量 ETF 基础数据
# ============================================================
def fetch_etf_universe(self) -> pd.DataFrame:
    """Layer 0: load the basic-info table of the ETFs available at ref_date.

    Cache mode keeps only funds listed on or before ``ref_dt`` and not yet
    delisted at ``ref_dt``. Online mode asks the API for currently listed
    funds (``status='L'``).

    Returns:
        DataFrame of fund basics with ``list_date`` converted to datetime.

    Raises:
        RuntimeError: In online mode, when the API returns nothing.
    """
    def _parse_yyyymmdd(col: pd.Series) -> pd.Series:
        # A CSV round-trip turns YYYYMMDD into int (20150105) or float
        # (20150105.0, when the column has gaps). Passing those straight to
        # pd.to_datetime reads them as nanoseconds since 1970, so go through
        # text with an explicit format instead.
        if pd.api.types.is_datetime64_any_dtype(col):
            return col
        text = col.astype(str).str.strip().str.replace(r'\.0$', '', regex=True)
        return pd.to_datetime(text, format='%Y%m%d', errors='coerce')

    self._log("=" * 60)
    self._log("Layer 0: 获取全量ETF基础信息")
    self._log("=" * 60)
    if self.data_cache is not None:
        df = self.data_cache.load_basic().copy()
        df['list_date'] = _parse_yyyymmdd(df['list_date'])
        # Already listed at ref_date (unknown list dates become NaT and drop out).
        mask = df['list_date'] <= self.ref_dt
        if 'delist_date' in df.columns:
            # Still alive at ref_date: never delisted, or delisted later.
            delist = _parse_yyyymmdd(df['delist_date'])
            mask = mask & (delist.isna() | (delist > self.ref_dt))
        df = df[mask].copy()
        self._log(f" 缓存模式: 截至 {self.ref_date} 已上市ETF: {len(df)}")
        return df

    df = self._api_call(
        self.pro.fund_basic,
        market='E',
        status='L',
        fields='ts_code,name,management,list_date,fund_type,invest_type,benchmark,type,trustee'
    )
    if df is None or df.empty:
        raise RuntimeError("获取ETF列表失败请检查Tushare权限")
    self._log(f" 全量上市ETF: {len(df)}")
    df['list_date'] = _parse_yyyymmdd(df['list_date'])
    return df
# ============================================================
# Layer 1: 基础过滤
# ============================================================
def basic_filter(self, df: pd.DataFrame) -> pd.DataFrame:
    """Layer 1: apply the hard eligibility thresholds.

    Steps: minimum listing age, drop money-market funds, drop leveraged /
    inverse / structured products by name, then require an average daily
    turnover of at least ``min_daily_amount`` (10k CNY) over the most recent
    ``lookback_amount_days`` trading rows up to ``ref_date``.

    Args:
        df: Output of fetch_etf_universe (needs ts_code, name, list_date).

    Returns:
        Filtered copy with an added ``avg_daily_amount`` column.
    """
    self._log("\n" + "=" * 60)
    self._log("Layer 1: 基础过滤")
    self._log("=" * 60)
    before = len(df)
    # 1. Listing age.
    cutoff = self.ref_dt - timedelta(days=self.cfg['min_list_days'])
    df = df[df['list_date'] <= cutoff].copy()
    self._log(f" 上市满1年: {before} -> {len(df)}")
    # 2. Money-market funds.
    # NOTE(review): label_asset_class tests fund_type == '货币市场型', which this
    # pattern does not match — confirm which spelling the data source uses.
    if 'fund_type' in df.columns:
        exclude_types = ['货币型']
        keep = ~df['fund_type'].str.contains('|'.join(exclude_types), na=False)
        df = df[keep]
        self._log(f" 排除货币型: -> {len(df)}")
    # 3. Leveraged / inverse / structured products, detected by name.
    leverage_kw = ['杠杆', '反向', '两倍', '三倍', '2X', '3X', '-1X', '分级']
    keep = ~df['name'].str.contains('|'.join(leverage_kw), na=False, case=False)
    df = df[keep].copy()  # own the data before adding a column below
    self._log(f" 排除杠杆/反向: -> {len(df)}")
    # 4. Liquidity: average daily turnover.
    window = self.cfg['lookback_amount_days']
    self._log(f"\n 获取近{window}日成交额数据...")
    # Fetch twice the window in calendar days so it covers `window` trading days.
    amount_start = (self.ref_dt - timedelta(days=window * 2)).strftime('%Y%m%d')
    amounts = {}
    total = len(df)
    for idx, code in enumerate(df['ts_code']):
        if idx % 50 == 0:
            self._log(f" 进度: {idx}/{total}")
        try:
            if self.data_cache is not None:
                daily = self.data_cache.load_cached_daily(code, self.ref_date)
                if not daily.empty:
                    daily = daily[daily['trade_date'] >= amount_start]
            else:
                daily = self._api_call(
                    self.pro.fund_daily,
                    ts_code=code,
                    start_date=amount_start,
                    end_date=self.ref_date,
                    fields='ts_code,trade_date,amount'
                )
            if daily is None or daily.empty or 'amount' not in daily.columns:
                continue
            # Average only the latest `window` rows (the fetch span is wider).
            recent = daily.sort_values('trade_date').tail(window)
            # amount is reported in thousand CNY; /10 converts to 10k CNY.
            amounts[code] = recent['amount'].astype(float).mean() / 10
        except Exception as exc:
            # Best effort: a fund without usable data is dropped below, but
            # the reason is reported instead of being swallowed.
            logger.warning(" %s 成交额数据获取失败,已跳过: %s", code, exc)
    df['avg_daily_amount'] = df['ts_code'].map(amounts)
    df = df.dropna(subset=['avg_daily_amount'])
    df = df[df['avg_daily_amount'] >= self.cfg['min_daily_amount']]
    self._log(f" 日均成交额>={self.cfg['min_daily_amount']}万: -> {len(df)}")
    self._log(f"\nLayer 1 结果: {before} -> {len(df)}")
    return df
# ============================================================
# Layer 2: 同指数去重
# ============================================================
def dedup_by_index(self, df: pd.DataFrame) -> pd.DataFrame:
    """Layer 2: keep one ETF per tracked index (the most liquid one).

    The tracked index is approximated from the fund name: a trailing
    share-class letter, everything from the first product marker
    (ETF/LOF/联接/基金/parenthesis) onwards, and a leading fund-company name
    are stripped. Funds sharing the remaining core name are duplicates.

    Args:
        df: Frame with ``name`` and ``avg_daily_amount`` columns.

    Returns:
        Copy sorted by turnover (descending), one row per ``index_name``.
    """
    import re

    self._log("\n" + "=" * 60)
    self._log("Layer 2: 同指数去重")
    self._log("=" * 60)
    before = len(df)

    # Text from the first of these markers onwards is product decoration.
    # Both ASCII and full-width opening parentheses are covered; an empty
    # marker must never appear here because str.split('') raises ValueError.
    cut_markers = ('ETF', 'LOF', '联接', '基金', '(', '(')
    # Common fund-company prefixes; longer names precede their own prefixes.
    companies = ('华夏', '易方达', '南方', '华安', '嘉实', '富国', '广发',
                 '博时', '工银', '招商', '华宝', '天弘', '中银', '建信',
                 '汇添富', '鹏华', '国泰', '银华', '大成', '景顺', '长城',
                 '中欧', '交银', '兴全', '平安', '万家', '泰康', '诺安',
                 '华泰柏瑞', '华泰', '浦银安盛', '国金', '长信', '东方',
                 '中证', '方正富邦', '前海开源', '申万菱信', '融通')

    def extract_index_name(name: str) -> str:
        """Reduce a fund name to its core index name."""
        full = str(name).strip()
        # A/C is a share-class suffix only at the very end of the name.
        # Cutting at the first 'A' anywhere would reduce 'A500ETF' to ''.
        core = re.sub(r'[AC]$', '', full)
        for marker in cut_markers:
            core = core.split(marker)[0]
        for company in companies:
            if core.startswith(company):
                core = core[len(company):]
                break
        core = core.strip()
        # Never collapse unrelated funds onto an empty key.
        return core or full

    df = df.copy()
    df['index_name'] = df['name'].apply(extract_index_name)
    # Within each index_name group, the highest-turnover fund wins.
    df = df.sort_values('avg_daily_amount', ascending=False)
    df = df.drop_duplicates(subset='index_name', keep='first')
    self._log(f" 同名去重: {before} -> {len(df)}")
    return df
# ============================================================
# Layer 3: 大类资产标签化
# ============================================================
def label_asset_class(self, df: pd.DataFrame) -> pd.DataFrame:
    """Layer 3: tag every ETF with a broad asset class.

    Rules are tried in order: official ``fund_type`` / ``invest_type``
    values, then keywords in the fund name plus benchmark text, then a
    name-only fallback. Rows matching nothing are labelled '未分类'.

    Args:
        df: Frame with name and avg_daily_amount; fund_type, invest_type and
            benchmark are used when present.

    Returns:
        Copy of *df* with an added ``asset_class`` column.
    """
    self._log("\n" + "=" * 60)
    self._log("Layer 3: 大类资产标签化 (官方字段优先)")
    self._log("=" * 60)

    def _name_has(text: str, keywords: list) -> bool:
        """Case-insensitive test for any non-empty keyword inside *text*."""
        t = text.lower()
        # An empty keyword is a substring of everything, so skip it.
        return any(kw and kw.lower() in t for kw in keywords)

    def classify_row(row) -> str:
        ft = str(row.get('fund_type', '') or '')
        it = str(row.get('invest_type', '') or '')
        bm = str(row.get('benchmark', '') or '')
        name = str(row.get('name', '') or '')
        combined = f"{name} {bm}"  # fund name + tracked index text
        # ---- Level 1: fund_type decides outright ----
        if ft == 'REITs':
            return 'REITs'
        if ft == '货币市场型':
            return '货币/现金'
        if ft == '商品型':
            return '商品'
        # ---- Level 2: invest_type refinement ----
        if it in ('黄金现货合约', '白银期货型', '有色金属期货型',
                  '能源化工期货型', '豆粕期货型', '原油主题基金'):
            return '商品'
        if ft == '债券型':
            return '债券'
        # ---- Level 3: oil/energy funds count as commodities ----
        if _name_has(combined, ['油气', '原油', '石油', '能源行业']):
            return '商品'
        # ---- Level 4: region, from name + benchmark ----
        if _name_has(combined, _HK_KW):
            return '港股'
        if _name_has(combined, _US_KW):
            return '美股'
        if _name_has(combined, _GLOBAL_KW):
            return '全球/其他'
        # ---- Level 5: A-share breakdown ----
        if ft in ('股票型', '混合型') or it in ('被动指数型', '增强指数型'):
            if _name_has(combined, _BROAD_KW):
                return 'A股宽基'
            if _name_has(combined, _THEME_KW):
                return 'A股主题'
            # Remaining equity funds default to sector.
            return 'A股行业'
        # ---- Fallback for the remaining fund types ----
        if _name_has(name, ['日利', '添益', '货币']):
            return '货币/现金'
        # NOTE(review): the first keyword here was an empty string, which
        # matched every name and made '未分类' unreachable. '债' is the
        # presumed intended keyword — confirm against the original file.
        if _name_has(name, ['债', '短融', '利率']):
            return '债券'
        return '未分类'

    df = df.copy()
    # A list build works for an empty frame too; apply(axis=1) on an empty
    # frame returns a DataFrame that cannot be assigned to one column.
    df['asset_class'] = [classify_row(row) for _, row in df.iterrows()]
    class_counts = df['asset_class'].value_counts()
    self._log("\n 分类结果:")
    for cls, cnt in class_counts.items():
        self._log(f" {cls}: {cnt}")
    n_unclassified = (df['asset_class'] == '未分类').sum()
    total = len(df)
    coverage = (total - n_unclassified) / total * 100 if total > 0 else 0
    self._log(f"\n 分类覆盖率: {coverage:.1f}% ({total - n_unclassified}/{total})")
    if n_unclassified > 0:
        self._log(f" 未分类 {n_unclassified} 只:")
        # List the ten most liquid unclassified funds for manual review.
        unclassified = df[df['asset_class'] == '未分类'].nlargest(10, 'avg_daily_amount')
        for _, row in unclassified.iterrows():
            self._log(f" {row['ts_code']} {row['name']} "
                      f"[ft={row.get('fund_type','')}, it={row.get('invest_type','')}] "
                      f"(日均{row['avg_daily_amount']:.0f}万)")
    return df
# ============================================================
# Layer 4: 类内预筛选
# ============================================================
@staticmethod
def _compute_enb(corr_matrix) -> float:
"""计算 Effective Number of Bets (Meucci 2009)
ENB = exp(- sum(p_i * ln(p_i))), p_i = λ_i / sum(λ)
"""
import numpy as np
eigenvalues = np.linalg.eigvalsh(corr_matrix.values)
eigenvalues = eigenvalues[eigenvalues > 1e-10] # 只取正特征值
p = eigenvalues / eigenvalues.sum()
return float(np.exp(-np.sum(p * np.log(p))))
def _compute_class_limits(self, df: pd.DataFrame) -> dict:
    """Allocate the Layer 4 candidate budget across asset classes.

    budget = int(enb_fallback * candidate_multiplier). Each class present in
    *df* receives round(share * budget), raised to ``min_per_class`` and
    capped at the class size; absent classes receive 0. When no row belongs
    to a known class, every class gets ``min_per_class``.
    """
    per_class = df['asset_class'].value_counts().to_dict()
    floor = self.cfg['min_per_class']
    total = sum(per_class.get(name, 0) for name in ASSET_CLASSES)
    if total == 0:
        return {name: floor for name in ASSET_CLASSES}
    enb_est = self.cfg.get('enb_fallback', 12)
    multiplier = self.cfg['candidate_multiplier']
    budget = int(enb_est * multiplier)
    limits = {}
    for name in ASSET_CLASSES:
        size = per_class.get(name, 0)
        if size == 0:
            limits[name] = 0
        else:
            proportional = round(size / total * budget)
            limits[name] = min(size, max(floor, proportional))
    self._log(f" 候选预算: budget={budget} (ENB估计={enb_est}, 倍数={multiplier})")
    self._log(f" 等比分配: {limits}")
    return limits
def intra_class_select(self, df: pd.DataFrame) -> pd.DataFrame:
    """Layer 4: shortlist the most liquid ETFs inside each asset class.

    Per-class quotas come from _compute_class_limits. Up to two unclassified
    funds are kept as well, but only when their turnover is at least ten
    times ``min_daily_amount``.

    Returns:
        Shortlist with a fresh index. When nothing qualifies the result is
        empty but keeps the columns of *df*, so later layers can still
        address them by name.
    """
    self._log("\n" + "=" * 60)
    self._log("Layer 4: 类内预筛选 (等比分配)")
    self._log("=" * 60)
    before = len(df)
    limits = self._compute_class_limits(df)
    selected = []
    for cls_name in ASSET_CLASSES:
        limit = limits.get(cls_name, 0)
        if limit == 0:
            continue
        cls_df = df[df['asset_class'] == cls_name]
        if cls_df.empty:
            continue
        top = cls_df.nlargest(limit, 'avg_daily_amount')
        selected.append(top)
        self._log(f" {cls_name}: {len(cls_df)} -> {len(top)}")
        for _, row in top.iterrows():
            self._log(f" {row['ts_code']} {row['name']} (日均{row['avg_daily_amount']:.0f}万)")
    # Keep a few unclassified funds when they are exceptionally liquid.
    unclassified = df[df['asset_class'] == '未分类']
    if not unclassified.empty:
        top_unc = unclassified.nlargest(2, 'avg_daily_amount')
        top_unc = top_unc[top_unc['avg_daily_amount'] >= self.cfg['min_daily_amount'] * 10]
        if not top_unc.empty:
            selected.append(top_unc)
            self._log(f" 未分类(超高流动): {len(top_unc)}")
    if selected:
        result = pd.concat(selected, ignore_index=True)
    else:
        # A bare pd.DataFrame() has no columns and breaks downstream lookups.
        result = df.iloc[0:0].copy()
    self._log(f"\nLayer 4 结果: {before} -> {len(result)}")
    return result
# ============================================================
# Layer 5: 相关性优化选择
# ============================================================
def correlation_optimize(self, df: pd.DataFrame) -> pd.DataFrame:
    """Layer 5: pick a low-correlation final pool from the shortlist.

    1. Build daily-return series over the trailing ``corr_lookback_days``
       rows up to ``ref_date`` and compute their correlation matrix.
    2. Pool size: ENB of that matrix when ``n_select == 'auto'`` (at least
       6; ``enb_fallback`` when ENB cannot be computed), else the integer.
    3. Seed with the most liquid ETF of every asset class, then greedily add
       the candidate whose highest absolute correlation with the pool is
       lowest, stopping once that value exceeds ``max_corr``.
    4. Trim 'A股行业' members beyond ``max_equity_ratio``.

    Args:
        df: Shortlist with ts_code, name, asset_class and avg_daily_amount.

    Returns:
        Copy of *df* with a boolean ``selected`` column. Every row is
        selected when there is too little return data or the shortlist is
        already no larger than the target size.
    """
    self._log("\n" + "=" * 60)
    self._log("Layer 5: 相关性优化选择 (ENB驱动)")
    self._log("=" * 60)

    def _keep_all(frame: pd.DataFrame) -> pd.DataFrame:
        frame = frame.copy()
        frame['selected'] = True
        return frame

    # 1. Return series per ETF.
    window = self.cfg['corr_lookback_days']
    self._log(f" 获取{window}日收益率数据...")
    # Twice the window in calendar days, to cover `window` trading days.
    corr_start = (self.ref_dt - timedelta(days=window * 2)).strftime('%Y%m%d')
    codes = df['ts_code'].tolist() if 'ts_code' in df.columns else []
    returns_dict = {}
    for code in codes:
        try:
            if self.data_cache is not None:
                daily = self.data_cache.load_cached_daily(code, self.ref_date)
                if not daily.empty:
                    daily = daily[daily['trade_date'] >= corr_start]
            else:
                daily = self._api_call(
                    self.pro.fund_daily,
                    ts_code=code,
                    start_date=corr_start,
                    end_date=self.ref_date,
                    fields='ts_code,trade_date,close'
                )
            # The 60-row minimum applies to the window in both modes, not to
            # the full cached history.
            if daily is None or len(daily) < 60:
                continue
            daily = daily.sort_values('trade_date')
            daily = daily.assign(ret=daily['close'].astype(float).pct_change())
            returns_dict[code] = daily.set_index('trade_date')['ret'].tail(window)
        except Exception as exc:
            # Best effort: skip the fund, but report why.
            logger.warning(" %s 收益率数据获取失败,已跳过: %s", code, exc)

    if returns_dict:
        ret_df = pd.DataFrame(returns_dict).dropna(axis=1, thresh=60)
    else:
        ret_df = pd.DataFrame()
    # Checked after dropping sparse columns, so the matrix really has >= 5 assets.
    if ret_df.shape[1] < 5:
        self._log(" 收益率数据不足,跳过相关性优化")
        return _keep_all(df)
    corr_matrix = ret_df.corr()
    self._log(f" 有效相关性矩阵: {len(corr_matrix)} x {len(corr_matrix)}")

    # 2. Target pool size.
    n_select_cfg = self.cfg['n_select']
    if n_select_cfg == 'auto':
        enb = float('nan')
        # NaN cells (no overlapping dates, constant prices) make the spectrum
        # meaningless, so only attempt ENB on a complete matrix.
        if not corr_matrix.isna().values.any():
            try:
                enb = self._compute_enb(corr_matrix)
            except np.linalg.LinAlgError:
                pass
        if not np.isfinite(enb):
            enb = float(self.cfg.get('enb_fallback', 12))
            self._log(f" ENB 计算失败,使用回退值 {enb:.0f}")
        n_select = max(6, min(int(round(enb)), len(corr_matrix)))
        self._log(f" 候选池 ENB = {enb:.2f} -> 目标池大小 = {n_select}")
    else:
        n_select = int(n_select_cfg)
        self._log(f" 固定目标池大小 = {n_select}")
    if len(df) <= n_select:
        self._log(f" 候选 {len(df)} <= 目标 {n_select},全部保留")
        return _keep_all(df)

    # 3. Greedy selection; only codes present in the matrix are eligible.
    available_codes = set(corr_matrix.columns) & set(df['ts_code'].values)
    df_indexed = df.set_index('ts_code')
    # Step A: the most liquid ETF of each class guarantees class coverage.
    selected = []
    for cls_name in ASSET_CLASSES:
        cls_codes = df_indexed.index[df_indexed['asset_class'] == cls_name]
        cls_available = [c for c in cls_codes if c in available_codes]
        if cls_available:
            best = max(cls_available, key=lambda c: df_indexed.loc[c, 'avg_daily_amount'])
            selected.append(best)
            available_codes.discard(best)
    self._log(f" 类别覆盖: 已选 {len(selected)}")
    # Step B: fill the remaining slots. Sorting makes tie-breaks reproducible
    # (set iteration order varies between interpreter runs).
    candidates = sorted(available_codes)
    for _ in range(n_select - len(selected)):
        if not candidates:
            break
        if selected:
            # Highest absolute correlation of each candidate with the pool.
            max_corr = corr_matrix.loc[candidates, selected].abs().max(axis=1)
        else:
            max_corr = pd.Series(0.0, index=candidates)
        max_corr = max_corr.dropna()
        if max_corr.empty:
            break
        best_candidate = max_corr.idxmin()
        best_max_corr = max_corr.loc[best_candidate]
        if best_max_corr > self.cfg['max_corr']:
            self._log(f" 剩余候选相关性均>{self.cfg['max_corr']:.2f},停止选择")
            break
        selected.append(best_candidate)
        candidates.remove(best_candidate)

    # 4. Cap the share of 'A股行业'.
    selected_df = df_indexed.loc[[s for s in selected if s in df_indexed.index]]
    equity_count = (selected_df['asset_class'] == 'A股行业').sum()
    total_count = len(selected_df)
    if total_count > 0 and equity_count / total_count > self.cfg['max_equity_ratio']:
        self._log(f" A股行业占比 {equity_count}/{total_count} 超限,需裁剪")
        equity_codes = selected_df[selected_df['asset_class'] == 'A股行业'].index.tolist()
        max_equity = int(total_count * self.cfg['max_equity_ratio'])
        while len(equity_codes) > max_equity:
            # Drop the member most correlated, on average, with its peers.
            worst = None
            worst_avg_corr = -1
            for ec in equity_codes:
                others = [c for c in equity_codes if c != ec and c in corr_matrix.columns]
                if others and ec in corr_matrix.columns:
                    avg_corr = corr_matrix.loc[ec, others].abs().mean()
                    if avg_corr > worst_avg_corr:
                        worst_avg_corr = avg_corr
                        worst = ec
            if worst is None:
                break
            selected.remove(worst)
            equity_codes.remove(worst)
            self._log(f" 移除高相关A股行业: {worst}")

    # 5. Mark and report the result.
    df = df.copy()
    df['selected'] = df['ts_code'].isin(selected)
    self._log(f"\nLayer 5 最终选出: {df['selected'].sum()}")
    final = df[df['selected']].copy()
    for _, row in final.iterrows():
        self._log(f" {row['ts_code']} {row['name']} [{row['asset_class']}] 日均{row['avg_daily_amount']:.0f}")
    # Persist the correlation matrix of the final pool.
    final_codes = [c for c in final['ts_code'] if c in corr_matrix.columns]
    if final_codes:
        final_corr = corr_matrix.loc[final_codes, final_codes]
        corr_path = self.output_dir / f'corr_matrix_{self.ref_date}.csv'
        final_corr.to_csv(corr_path, float_format='%.3f')
        self._log(f"\n 相关性矩阵已保存: {corr_path}")
    return df
# ============================================================
# 保存结果
# ============================================================
def save_results(self, df: pd.DataFrame):
    """Persist the final pool and the pipeline log under ``output_dir``.

    Writes ``universe_<ref_date>.csv``, a copy named ``universe_latest.csv``
    and ``pipeline_log_<ref_date>.txt``. The log file is written last so
    that it also contains the closing summary.

    Args:
        df: Output of correlation_optimize (needs a ``selected`` column).
    """
    final = df[df['selected'].eq(True)].copy()
    cols = ['ts_code', 'name', 'asset_class', 'avg_daily_amount']
    cols = [c for c in cols if c in final.columns]
    universe_path = self.output_dir / f'universe_{self.ref_date}.csv'
    final[cols].to_csv(universe_path, index=False, encoding='utf-8-sig')
    self._log(f"\n最终ETF池已保存: {universe_path}")
    # "latest" is a plain copy rather than a symlink.
    latest_path = self.output_dir / 'universe_latest.csv'
    final[cols].to_csv(latest_path, index=False, encoding='utf-8-sig')
    # Closing summary — logged before the log file is written.
    self._log("\n" + "=" * 60)
    self._log("筛选完成!")
    self._log("=" * 60)
    self._log(f"最终池: {len(final)} 只ETF")
    if 'asset_class' in final.columns:
        for cls, cnt in final['asset_class'].value_counts().items():
            self._log(f" {cls}: {cnt}")
    log_path = self.output_dir / f'pipeline_log_{self.ref_date}.txt'
    with open(log_path, 'w', encoding='utf-8') as f:
        f.write('\n'.join(self._log_lines))
    self._log(f"管线日志已保存: {log_path}")
# ============================================================
# 主运行入口
# ============================================================
def run(self) -> pd.DataFrame:
    """Run Layers 0-5 in order, save the outcome and return the Layer 5 frame."""
    self._log(f"参考日期: {self.ref_date}")
    self._log(f"配置: {self.cfg}")
    stages = (
        self.basic_filter,          # Layer 1
        self.dedup_by_index,        # Layer 2
        self.label_asset_class,     # Layer 3
        self.intra_class_select,    # Layer 4
        self.correlation_optimize,  # Layer 5
    )
    frame = self.fetch_etf_universe()  # Layer 0
    for stage in stages:
        frame = stage(frame)
    self.save_results(frame)
    return frame
# ============================================================
# 便捷函数:供动量策略回测调用
# ============================================================
def build_universe(ref_date: str = None, config: dict = None, data_cache=None) -> dict:
    """Run the screening pipeline and return the pool as ``{ts_code: name}``.

    Args:
        ref_date: Reference date, YYYYMMDD.
        config: Overrides for the default configuration.
        data_cache: ETFDataCache instance (cache mode, no look-ahead).

    Returns:
        Mapping of the selected ETF codes to their names.
    """
    engine = ETFUniverseBuilder(config=config, ref_date=ref_date, data_cache=data_cache)
    table = engine.run()
    chosen = table[table['selected'] == True]
    return {code: name for code, name in zip(chosen['ts_code'], chosen['name'])}
def load_latest_universe() -> dict:
    """Load the most recently built ETF pool as ``{ts_code: name}``.

    Raises:
        FileNotFoundError: When ``universe_latest.csv`` has not been created.
    """
    project_root = Path(__file__).parent.parent
    latest_path = project_root / 'data' / 'etf_universe' / 'universe_latest.csv'
    if latest_path.exists():
        pool = pd.read_csv(latest_path)
        return dict(zip(pool['ts_code'], pool['name']))
    raise FileNotFoundError(f"未找到ETF池文件: {latest_path}\n请先运行 build_etf_universe.py")
# ============================================================
# CLI 入口
# ============================================================
if __name__ == '__main__':
    # Command-line entry point: build the universe for one reference date.
    cli = argparse.ArgumentParser(description='动态ETF池筛选引擎')
    cli.add_argument('--date', type=str, default=None,
                     help='参考日期 YYYYMMDD (默认: 当天)')
    cli.add_argument('--n-select', type=str, default='auto',
                     help='最终池大小: auto=ENB驱动, 或整数 (默认: auto)')
    cli.add_argument('--min-amount', type=float, default=5000,
                     help='最低日均成交额(万) (默认: 5000)')
    opts = cli.parse_args()
    # 'auto' is passed through; anything else must be an integer pool size.
    pool_size = opts.n_select
    if pool_size != 'auto':
        pool_size = int(pool_size)
    overrides = {'n_select': pool_size, 'min_daily_amount': opts.min_amount}
    ETFUniverseBuilder(config=overrides, ref_date=opts.date).run()

280
scripts/etf_data_cache.py Normal file
View File

@@ -0,0 +1,280 @@
"""
ETF 全量历史数据本地缓存
========================
一次性下载全市场 ETF(含已退市)的基础信息和日线数据到本地,
供回测中按 ref_date 截取历史数据,消除前视偏差。
用法:
# 首次下载(约 30-60 分钟,取决于 API 限流)
python scripts/etf_data_cache.py
# 增量更新(只下载缺失的新数据)
python scripts/etf_data_cache.py --update
"""
import os
import sys
import time
import logging
from pathlib import Path
from datetime import datetime
import pandas as pd
sys.path.insert(0, str(Path(__file__).parent.parent))
from dotenv import load_dotenv
load_dotenv()
import tushare as ts
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger(__name__)
# 缓存目录
CACHE_DIR = Path(__file__).parent.parent / 'data' / 'etf_cache'
DAILY_DIR = CACHE_DIR / 'daily'
BASIC_PATH = CACHE_DIR / 'fund_basic.csv'
class ETFDataCache:
    """Manager for the local cache of full-history ETF data."""
def __init__(self):
    """Create the API client and make sure the cache directories exist."""
    token = os.getenv('TUSHARE_TOKEN')
    self.pro = ts.pro_api(token)
    for folder in (CACHE_DIR, DAILY_DIR):
        folder.mkdir(parents=True, exist_ok=True)
    self._basic_df = None  # fund_basic table, loaded lazily by load_basic()
# ----------------------------------------------------------
# API 调用(带重试 + 限流)
# ----------------------------------------------------------
def _api_call(self, func, **kwargs):
for attempt in range(3):
try:
result = func(**kwargs)
time.sleep(0.35)
return result
except Exception as e:
if attempt < 2:
wait = 2 * (attempt + 1)
logger.warning(f" API 重试 ({attempt+1}/3): {e}, 等待 {wait}s")
time.sleep(wait)
else:
raise
# ----------------------------------------------------------
# 1. 下载并缓存 fund_basic
# ----------------------------------------------------------
def download_basic(self, force: bool = False):
    """Download basic info for all ETFs, listed and delisted, into the cache.

    Args:
        force: Re-download even when the cache file already exists.

    Raises:
        RuntimeError: When neither status query returns any rows.
    """
    if BASIC_PATH.exists() and not force:
        logger.info(f"fund_basic 缓存已存在: {BASIC_PATH}")
        return
    logger.info("下载全量 ETF 基础信息...")
    fields = 'ts_code,name,management,list_date,delist_date,fund_type,invest_type,benchmark,type,trustee,status'
    dfs = []
    for status in ['L', 'D']:  # L = listed, D = delisted
        df = self._api_call(self.pro.fund_basic, market='E', status=status, fields=fields)
        if df is not None and not df.empty:
            dfs.append(df)
            logger.info(f" status={status}: {len(df)}")
    if not dfs:
        raise RuntimeError("获取 ETF 列表失败")
    basic = pd.concat(dfs, ignore_index=True).drop_duplicates(subset='ts_code')
    basic.to_csv(BASIC_PATH, index=False, encoding='utf-8-sig')
    # Drop the in-memory copy so load_basic() re-reads the refreshed file.
    self._basic_df = None
    logger.info(f"fund_basic 已保存: {len(basic)} 只 -> {BASIC_PATH}")
# ----------------------------------------------------------
# 2. 批量下载日线数据
# ----------------------------------------------------------
def download_daily(self, force: bool = False):
    """Download the full daily history of every ETF in the basic table.

    A code whose CSV already exists and is non-empty is skipped unless
    *force* is set. Failures are counted and logged; they do not stop the run.

    Args:
        force: Re-download files that already exist.
    """
    basic = self.load_basic()
    codes = basic['ts_code'].tolist()
    total = len(codes)
    logger.info(f"准备下载 {total} 只 ETF 的日线数据...")
    downloaded = 0
    skipped = 0
    failed = 0
    for i, code in enumerate(codes):
        csv_path = DAILY_DIR / f"{code}.csv"
        if csv_path.exists() and not force:
            try:
                # Reading one row is enough to tell whether the file has data.
                if not pd.read_csv(csv_path, nrows=1).empty:
                    skipped += 1
                    continue
            except Exception:
                pass  # unreadable or empty file: fall through and re-download
        if (i - skipped) % 20 == 0:
            logger.info(f" 进度: {i}/{total} (下载={downloaded}, 跳过={skipped}, 失败={failed})")
        try:
            df = self._fetch_full_history(code)
            if not df.empty:
                df.to_csv(csv_path, index=False)
                downloaded += 1
            else:
                failed += 1
        except Exception as e:
            logger.warning(f" {code} 下载失败: {e}")
            failed += 1
    logger.info(f"日线数据下载完成: 下载={downloaded}, 跳过={skipped}, 失败={failed}")

def _fetch_full_history(self, ts_code: str) -> pd.DataFrame:
    """Fetch one ETF's whole daily history, paging backwards in time.

    The data API caps the rows returned per request (see the fund_daily
    documentation), so a single call silently truncates long histories.
    Requests are repeated with ``end_date`` set to the day before the oldest
    row seen so far, until a request returns nothing. This costs one extra,
    empty request per ETF.

    Returns:
        Rows sorted by ``trade_date`` ascending, or an empty frame.
    """
    fields = 'ts_code,trade_date,open,high,low,close,vol,amount'
    pages = []
    end_date = None
    oldest_seen = None
    while True:
        params = {'ts_code': ts_code, 'fields': fields}
        if end_date is not None:
            params['end_date'] = end_date
        page = self._api_call(self.pro.fund_daily, **params)
        if page is None or page.empty:
            break
        oldest = str(page['trade_date'].astype(str).min())
        if oldest_seen is not None and oldest >= oldest_seen:
            break  # no progress: stop rather than loop forever
        oldest_seen = oldest
        pages.append(page)
        end_date = (pd.Timestamp(oldest) - pd.Timedelta(days=1)).strftime('%Y%m%d')
    if not pages:
        return pd.DataFrame()
    full = pd.concat(pages, ignore_index=True)
    full['trade_date'] = full['trade_date'].astype(str)
    return full.drop_duplicates(subset='trade_date').sort_values('trade_date')
def update_daily(self):
    """Append new rows to every daily CSV that already exists.

    Codes without a cache file are not downloaded here. A failure for one
    code is logged and does not stop the others.
    """
    basic = self.load_basic()
    today_str = datetime.now().strftime('%Y%m%d')
    updated = 0
    failed = 0
    for code in basic['ts_code'].tolist():
        csv_path = DAILY_DIR / f"{code}.csv"
        if not csv_path.exists():
            continue
        try:
            # Read trade_date as text: the API returns strings, and mixing
            # int and str dates makes sort_values raise TypeError, which
            # used to be swallowed so no update was ever written.
            existing = pd.read_csv(csv_path, dtype={'trade_date': str})
            if existing.empty:
                continue
            last_date = str(existing['trade_date'].max())
            if last_date >= today_str:
                continue
            # Real calendar arithmetic: 20231231 + 1 is not a valid date.
            next_day = (pd.Timestamp(last_date) + pd.Timedelta(days=1)).strftime('%Y%m%d')
            new_df = self._api_call(
                self.pro.fund_daily,
                ts_code=code,
                start_date=next_day,
                end_date=today_str,
                fields='ts_code,trade_date,open,high,low,close,vol,amount'
            )
            if new_df is None or new_df.empty:
                continue
            new_df = new_df.assign(trade_date=new_df['trade_date'].astype(str))
            combined = pd.concat([existing, new_df], ignore_index=True)
            combined = combined.drop_duplicates(subset='trade_date').sort_values('trade_date')
            combined.to_csv(csv_path, index=False)
            updated += 1
        except Exception as exc:
            failed += 1
            logger.warning(f" {code} 增量更新失败: {exc}")
    if failed:
        logger.warning(f"增量更新失败: {failed} 只")
    logger.info(f"增量更新完成: {updated} 只有新数据")
# ----------------------------------------------------------
# 3. 数据读取接口(回测用)
# ----------------------------------------------------------
def load_basic(self) -> pd.DataFrame:
    """Return the cached fund_basic table, reading it once per instance.

    ``list_date`` and ``delist_date`` are read as text (YYYYMMDD). Left to
    type inference they become integers or floats, which date parsers then
    misread as epoch offsets.

    Raises:
        FileNotFoundError: When the cache file has not been downloaded yet.
    """
    if self._basic_df is not None:
        return self._basic_df
    if not BASIC_PATH.exists():
        raise FileNotFoundError("fund_basic 缓存不存在,请先运行: python scripts/etf_data_cache.py")
    self._basic_df = pd.read_csv(
        BASIC_PATH,
        dtype={'list_date': str, 'delist_date': str},
    )
    return self._basic_df
def load_cached_daily(self, ts_code: str, end_date: str = None) -> pd.DataFrame:
    """Load one ETF's cached daily rows, optionally truncated at *end_date*.

    Args:
        ts_code: ETF code.
        end_date: Last date to include, YYYYMMDD (dashes are stripped);
            None returns everything.

    Returns:
        Rows sorted by ``trade_date`` (as strings) ascending; an empty frame
        when the code has no cache file.
    """
    csv_path = DAILY_DIR / f"{ts_code}.csv"
    if not csv_path.exists():
        return pd.DataFrame()
    frame = pd.read_csv(csv_path)
    if frame.empty:
        return frame
    frame['trade_date'] = frame['trade_date'].astype(str)
    frame = frame.sort_values('trade_date')
    if not end_date:
        return frame
    cutoff = str(end_date).replace('-', '')
    return frame[frame['trade_date'] <= cutoff]
def load_cached_daily_as_series(self, ts_code: str, end_date: str = None,
                                column: str = 'close') -> pd.Series:
    """Return one column of an ETF's cached daily data, indexed by datetime."""
    rows = self.load_cached_daily(ts_code, end_date)
    if rows.empty:
        return pd.Series(dtype=float)
    rows['date'] = pd.to_datetime(rows['trade_date'])
    values = rows.set_index('date')[column]
    return values.astype(float)
def load_cached_ohlcv(self, ts_code: str, end_date: str = None) -> pd.DataFrame:
    """Return open/high/low/close/volume as floats, indexed by datetime.

    ``vol`` is renamed to ``volume``. An empty frame is returned when the
    code has no cached rows.
    """
    raw = self.load_cached_daily(ts_code, end_date)
    if raw.empty:
        return pd.DataFrame()
    raw['date'] = pd.to_datetime(raw['trade_date'])
    ohlcv = raw.set_index('date').sort_index().rename(columns={'vol': 'volume'})
    wanted = ['open', 'high', 'low', 'close', 'volume']
    return ohlcv[wanted].astype(float)
def ensure_downloaded(self):
    """Run download_basic() and then download_daily() with their defaults."""
    for step in (self.download_basic, self.download_daily):
        step()
def get_available_codes_at(self, ref_date: str) -> list:
    """List the ETF codes listed on or before *ref_date* and not yet delisted.

    A fund whose delist date is on or before *ref_date* is excluded. The
    cached basic table is not modified.

    Args:
        ref_date: Reference date, YYYYMMDD (YYYY-MM-DD is also accepted).
    """
    def _as_yyyymmdd(col: pd.Series, missing: str) -> pd.Series:
        # Dates may arrive as int, float ('20150105.0') or text. Reduce them
        # to 8-digit strings and replace anything else with *missing*.
        text = col.astype(str).str.strip().str.replace(r'\.0$', '', regex=True)
        return text.where(text.str.match(r'^\d{8}$'), missing)

    basic = self.load_basic()
    ref = str(ref_date).replace('-', '')
    # An unknown list date sorts after every real date, so it is never listed.
    listed = _as_yyyymmdd(basic['list_date'], '99991231')
    mask = listed <= ref
    if 'delist_date' in basic.columns:
        # No delist date means the fund is still listed.
        delisted = _as_yyyymmdd(basic['delist_date'], '99991231')
        mask = mask & (delisted > ref)
    return basic.loc[mask, 'ts_code'].tolist()
# ----------------------------------------------------------
# CLI
# ----------------------------------------------------------
if __name__ == '__main__':
    # Command-line entry point: full download, or incremental update.
    import argparse
    cli = argparse.ArgumentParser(description='ETF 全量历史数据缓存下载')
    cli.add_argument('--update', action='store_true', help='增量更新已有缓存')
    cli.add_argument('--force', action='store_true', help='强制重新下载全部')
    opts = cli.parse_args()
    cache = ETFDataCache()
    if not opts.update:
        cache.download_basic(force=opts.force)
        cache.download_daily(force=opts.force)
    else:
        # An update always refreshes the basic table first.
        cache.download_basic(force=True)
        cache.update_daily()
    # Summary of what the cache now holds.
    basic = cache.load_basic()
    daily_files = list(DAILY_DIR.glob('*.csv'))
    logger.info(f"\n缓存统计: fund_basic={len(basic)} 只, 日线文件={len(daily_files)}")

View File

@@ -0,0 +1,399 @@
"""
动量策略多持仓对比实验
对比 6 种配置: 全仓1只 / 等权3只 / 反波动率3只 / 等权5只 / 反波动率5只 / 动量>0全选等权
支持 dynamic 模式: 回测中定期重建ETF池,消除前视偏差
"""
import sys
import math
import warnings
from pathlib import Path
from datetime import datetime
import numpy as np
import pandas as pd
warnings.filterwarnings("ignore")
sys.path.insert(0, str(Path(__file__).parent.parent))
from dotenv import load_dotenv
load_dotenv()
# ==================== 复用动量.py的核心函数 ====================
from 动量 import (
fetch_all_etf_data,
fetch_etf_nav_data,
calc_atr,
calc_weighted_momentum_score,
apply_crash_filter,
calc_premium_rate,
resolve_etf_pool,
)
# ==================== 权重计算 ====================
def calc_equal_weights(codes: list) -> dict:
    """Return equal weights ``{code: 1/len(codes)}``; ``{}`` for an empty list."""
    if not codes:
        # Nothing to hold: avoid dividing by zero.
        return {}
    w = 1.0 / len(codes)
    return {c: w for c in codes}
def calc_inv_vol_weights(codes: list, all_data: dict, today, lookback: int = 20) -> dict:
    """Return inverse-volatility weights (weight proportional to 1/sigma).

    Sigma is the standard deviation of daily close-to-close returns over the
    last ``lookback + 1`` rows up to *today*. Codes missing from *all_data*
    get no weight. A code with fewer than 10 rows, or whose volatility cannot
    be computed, uses a neutral fallback sigma of 1.0. A zero sigma is
    clamped to 1e-6.

    Args:
        codes: Codes to weight.
        all_data: Mapping code -> price frame with a ``close`` column.
        today: Last index value (inclusive) that may be used.
        lookback: Number of returns in the volatility window.

    Returns:
        ``{code: weight}`` summing to 1, equal weights when no code has
        data, or ``{}`` when *codes* is empty.
    """
    fallback_vol = 1.0
    vols = {}
    for c in codes:
        if c not in all_data:
            continue
        df = all_data[c]
        hist = df[df.index <= today].tail(lookback + 1)
        if len(hist) < 10:
            vols[c] = fallback_vol
            continue
        vol = hist['close'].pct_change().dropna().std()
        if not np.isfinite(vol):
            # NaN fails `vol > 0` and would fall into the 1e-6 clamp, giving
            # an asset with unknown risk almost the whole portfolio.
            vols[c] = fallback_vol
        else:
            vols[c] = vol if vol > 0 else 1e-6
    if not vols:
        return calc_equal_weights(codes) if codes else {}
    inv_vols = {c: 1.0 / v for c, v in vols.items()}
    total = sum(inv_vols.values())
    return {c: iv / total for c, iv in inv_vols.items()}
# ==================== 多持仓回测引擎 ====================
def run_multi_backtest(config: dict, all_data: dict, nav_data: dict,
                       trade_dates: list, etf_codes: list,
                       target_num: int = 1, weight_mode: str = 'equal',
                       label: str = '',
                       data_cache=None, rebuild_interval: int = 0) -> dict:
    """Run a multi-holding momentum rotation backtest.

    For every trading day the candidate ETFs are scored, the top ones are
    selected and weighted, the return of the positions carried over from the
    previous day is booked, and the holdings are replaced by the new targets.

    Args:
        config: Strategy settings. Keys read here: ``max_days``, ``min_days``,
            ``fixed_days``, ``auto_day``, ``premium_threshold``, ``trade_cost``.
        all_data: {code: OHLCV DataFrame}. Mutated in place: data for ETFs
            that enter the pool during a rebuild is added to it.
        nav_data: {code: DataFrame with a ``'nav'`` column}; used for the
            premium penalty, may be empty.
        trade_dates: Ordered trading dates to simulate.
        etf_codes: Initial candidate pool.
        target_num: Number of ETFs held at once, or the string
            ``'all_positive'`` to hold every ETF that passes the score filter.
        weight_mode: ``'inv_vol'`` for inverse-volatility weights; any other
            value gives equal weights.
        label: Experiment label copied into the result.
        data_cache: Cache object used for dynamic pool rebuilds (must provide
            ``load_cached_ohlcv``); ``None`` disables rebuilding.
        rebuild_interval: Rebuild the pool every this many trading days;
            0 disables rebuilding.

    Returns:
        dict with the performance metrics (``total_return``, ``cagr``,
        ``sharpe``, ``max_dd``, ``calmar``, ``win_rate``, ``n_trades``,
        ``trades_per_year``, ``win_years``), the echoed ``label`` /
        ``target_num`` / ``weight_mode`` and ``result_df`` (daily return and
        NAV indexed by date). With no trading dates all metrics are zero and
        ``result_df`` is empty.
    """
    max_lookback = config['max_days'] + 10
    holdings = {}  # {code: weight} carried into the next trading day
    daily_returns = []
    n_trades = 0
    # Start one full interval "in the past" so the pool is rebuilt on day 0.
    last_rebuild_i = -rebuild_interval if rebuild_interval > 0 else 0
    current_codes = list(etf_codes)  # currently active candidate pool
    rebuild_failing = False  # True while consecutive rebuild attempts fail

    for i, today in enumerate(trade_dates):
        # Dynamic rebuild of the ETF pool.
        rebuild_due = (rebuild_interval > 0 and data_cache is not None
                       and i - last_rebuild_i >= rebuild_interval)
        if rebuild_due:
            ref_str = today.strftime('%Y%m%d')
            try:
                new_pool = resolve_etf_pool(config, ref_date=ref_str, data_cache=data_cache)
                new_codes = list(new_pool.keys())
                # Load price data for ETFs that were not in the pool before.
                for code in new_codes:
                    if code not in all_data:
                        ohlcv = data_cache.load_cached_ohlcv(code)
                        if not ohlcv.empty:
                            all_data[code] = ohlcv
            except Exception as exc:
                # Best effort: keep the previous pool and retry on the next
                # trading day, but report the failure (once per streak)
                # instead of hiding it.
                if not rebuild_failing:
                    print(f"  [警告] {ref_str} ETF池重建失败, 沿用上一期候选池: {exc}")
                    rebuild_failing = True
            else:
                # Swap the pool only after the whole rebuild succeeded.
                current_codes = new_codes
                last_rebuild_i = i
                rebuild_failing = False

        # 1. Score every ETF of the active pool.
        # NOTE(review): an ETF whose data ends before `today` is still scored
        # on its last available prices — confirm this is intended.
        scores = {}
        for code in current_codes:
            if code not in all_data:
                continue
            df = all_data[code]
            hist = df[df.index <= today].tail(max_lookback + 1)
            if len(hist) < config['min_days']:
                continue
            close_arr = hist['close'].values
            if config['auto_day']:
                lookback = _adaptive_lookback(hist, config, max_lookback)
            else:
                lookback = config['fixed_days']
            prices = close_arr[-lookback:]
            if len(prices) < 5:
                continue
            score = calc_weighted_momentum_score(prices)['score']
            score = apply_crash_filter(close_arr, score)
            # Penalise ETFs trading at a premium to NAV at or above the threshold.
            if code in nav_data:
                nav_df = nav_data[code]
                nav_row = nav_df[nav_df.index <= today]
                if not nav_row.empty:
                    nav_val = nav_row.iloc[-1]['nav']
                    premium = calc_premium_rate(close_arr[-1], nav_val)
                    if premium >= config['premium_threshold']:
                        score -= 1
            if 0 < score < 6:
                scores[code] = score

        # 2. Pick the top N (or every ETF that passed the score filter).
        if scores:
            ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True)
            if target_num == 'all_positive':
                targets = [c for c, _ in ranked]
            else:
                targets = [c for c, _ in ranked[:target_num]]
        else:
            targets = []

        # 3. Target weights.
        if not targets:
            new_weights = {}
        elif weight_mode == 'inv_vol':
            new_weights = calc_inv_vol_weights(targets, all_data, today)
        else:
            new_weights = calc_equal_weights(targets)

        # 4. Today's portfolio return from the positions held since yesterday.
        port_ret = 0.0
        for code, weight in holdings.items():
            if code not in all_data:
                continue
            df_h = all_data[code]
            if today in df_h.index:
                prev_dates = df_h[df_h.index < today].index
                if len(prev_dates) > 0:
                    prev_price = df_h.loc[prev_dates[-1], 'close']
                    today_price = df_h.loc[today, 'close']
                    port_ret += weight * (today_price / prev_price - 1)

        # 5. Rebalance. Cost is charged only when the set of holdings changes.
        # NOTE(review): weight drift inside an unchanged set is rebalanced
        # without cost and not counted as a trade — confirm this is intended.
        old_set = set(holdings.keys())
        new_set = set(new_weights.keys())
        if old_set != new_set:
            turnover = 0.0
            for c in old_set - new_set:
                turnover += holdings[c]
            for c in new_set - old_set:
                turnover += new_weights[c]
            for c in old_set & new_set:
                turnover += abs(new_weights[c] - holdings[c])
            # Sells and buys are both summed above, hence the division by 2.
            trade_cost = turnover * config['trade_cost'] / 2
            n_trades += 1
        else:
            trade_cost = 0.0
        holdings = new_weights

        daily_returns.append({
            'date': today,
            'daily_return': port_ret - trade_cost,
        })

    return _summarize_backtest(daily_returns, n_trades, label, target_num, weight_mode)


def _adaptive_lookback(hist, config: dict, max_lookback: int) -> int:
    """Pick the momentum window from the ratio of short-term to long-term ATR."""
    if len(hist) < max_lookback:
        return config['fixed_days']
    long_atr = calc_atr(hist['high'], hist['low'], hist['close'], config['max_days'])
    short_atr = calc_atr(hist['high'], hist['low'], hist['close'], config['min_days'])
    la = long_atr.iloc[-1]
    sa = short_atr.iloc[-1]
    if la > 0 and not np.isnan(la) and not np.isnan(sa):
        # A higher short/long ATR ratio (capped at 0.9) gives a shorter window.
        ratio = min(0.9, sa / la)
        return int(config['min_days'] +
                   (config['max_days'] - config['min_days']) * (1 - ratio))
    return config['fixed_days']


def _summarize_backtest(daily_returns: list, n_trades: int, label: str,
                        target_num, weight_mode: str) -> dict:
    """Turn the per-day return records into the performance-metric dict."""
    if not daily_returns:
        # No trading days: return a well-formed, all-zero result instead of
        # failing on the missing 'date' column.
        empty_df = pd.DataFrame(columns=['daily_return', 'nav'],
                                index=pd.DatetimeIndex([], name='date'),
                                dtype=float)
        return {
            'label': label,
            'target_num': target_num,
            'weight_mode': weight_mode,
            'total_return': 0.0,
            'cagr': 0,
            'sharpe': 0,
            'max_dd': 0.0,
            'calmar': 0,
            'win_rate': 0,
            'n_trades': n_trades,
            'trades_per_year': 0,
            'win_years': "0/0",
            'result_df': empty_df,
        }

    result_df = pd.DataFrame(daily_returns).set_index('date')
    result_df['nav'] = (1 + result_df['daily_return']).cumprod()
    nav = result_df['nav']
    total_return = nav.iloc[-1] / nav.iloc[0] - 1
    days = (result_df.index[-1] - result_df.index[0]).days
    cagr = (1 + total_return) ** (365 / days) - 1 if days > 0 else 0

    daily_rets = result_df['daily_return']
    ret_std = daily_rets.std()
    sharpe = daily_rets.mean() / ret_std * np.sqrt(252) if ret_std > 0 else 0

    peak = nav.cummax()
    max_dd = ((nav - peak) / peak).min()
    calmar = cagr / abs(max_dd) if max_dd != 0 else 0

    # Daily win rate over the days with a non-zero return only.
    active_days = (daily_rets != 0).sum()
    win_rate = (daily_rets > 0).sum() / active_days if active_days > 0 else 0
    years = days / 365

    # Calendar-year statistics: a year counts as a win when its NAV rose.
    win_years = 0
    total_years = 0
    for _, group in result_df.groupby(result_df.index.year):
        yr = group['nav']
        total_years += 1
        if yr.iloc[-1] / yr.iloc[0] - 1 > 0:
            win_years += 1

    return {
        'label': label,
        'target_num': target_num,
        'weight_mode': weight_mode,
        'total_return': total_return,
        'cagr': cagr,
        'sharpe': sharpe,
        'max_dd': max_dd,
        'calmar': calmar,
        'win_rate': win_rate,
        'n_trades': n_trades,
        'trades_per_year': n_trades / years if years > 0 else 0,
        'win_years': f"{win_years}/{total_years}",
        'result_df': result_df,
    }
# ==================== 主函数 ====================
def main():
    """Run the six multi-holding experiments in dynamic-pool mode and report them.

    Builds the initial ETF pool as of ``config['start_date']``, loads prices
    from the local cache, runs every experiment through
    ``run_multi_backtest``, prints a comparison table and saves a NAV /
    drawdown chart under ``results/``.
    """
    from 动量 import CONFIG
    config = CONFIG.copy()
    # Force dynamic mode.
    config['etf_pool'] = 'dynamic'
    rebuild_interval = config.get('rebuild_interval', 60)

    # Initialise the local cache.
    from scripts.etf_data_cache import ETFDataCache
    data_cache = ETFDataCache()

    # The backtest start date is the reference date of the initial pool.
    init_ref_date = config['start_date'].replace('-', '')
    etf_pool = resolve_etf_pool(config, ref_date=init_ref_date, data_cache=data_cache)
    etf_codes = list(etf_pool.keys())
    end_date = datetime.now().strftime('%Y-%m-%d')

    print("=" * 70)
    print(" 动量策略多持仓对比实验 (动态重建模式, 无前视偏差)")
    print("=" * 70)
    print(f" 初始ETF池 ({init_ref_date}): {len(etf_codes)}")
    for code, name in etf_pool.items():
        print(f" {code} {name}")
    print(f" 回测区间: {config['start_date']} ~ {end_date}")
    print(f" 重建间隔: {rebuild_interval} 交易日")

    # Load prices from the cache. Only the initial pool is loaded here; ETFs
    # that join later are loaded by run_multi_backtest during a rebuild.
    print(f"\n{'='*70}")
    print("从本地缓存加载数据...")
    all_data = {}
    for code in etf_codes:
        ohlcv = data_cache.load_cached_ohlcv(code)
        if not ohlcv.empty:
            all_data[code] = ohlcv
    nav_data = {}  # NAV data is not used in dynamic mode
    print(f"价格数据: {len(all_data)}")

    # Trading calendar: union of all dates on or after start_date.
    start_ts = pd.Timestamp(config['start_date'])
    all_dates = set()
    for df in all_data.values():
        all_dates.update(df.index.tolist())
    trade_dates = sorted(d for d in all_dates if d >= start_ts)
    print(f"交易日: {len(trade_dates)}")
    if not trade_dates:
        # Nothing to simulate (empty cache or empty initial pool).
        print("缓存中没有可用的交易日数据, 无法回测")
        return

    # The six experiments.
    experiments = [
        {'target_num': 1, 'weight_mode': 'equal', 'label': 'A: 全仓1只'},
        {'target_num': 3, 'weight_mode': 'equal', 'label': 'B: 等权3只'},
        {'target_num': 3, 'weight_mode': 'inv_vol', 'label': 'C: 反波动率3只'},
        {'target_num': 5, 'weight_mode': 'equal', 'label': 'D: 等权5只'},
        {'target_num': 5, 'weight_mode': 'inv_vol', 'label': 'E: 反波动率5只'},
        {'target_num': 'all_positive', 'weight_mode': 'equal', 'label': 'F: 动量>0全选等权'},
    ]
    results = []
    for exp in experiments:
        print(f"\n{'-'*70}")
        print(f" 运行: {exp['label']}...")
        r = run_multi_backtest(
            config, all_data, nav_data, trade_dates, etf_codes,
            target_num=exp['target_num'],
            weight_mode=exp['weight_mode'],
            label=exp['label'],
            data_cache=data_cache,
            rebuild_interval=rebuild_interval,
        )
        results.append(r)
        print(f" 完成: CAGR={r['cagr']:.2%}, MaxDD={r['max_dd']:.2%}, Sharpe={r['sharpe']:.2f}")

    # Comparison table.
    print(f"\n\n{'='*100}")
    print(f"{'':>20s} 动量策略多持仓对比实验结果")
    print(f"{'='*100}")
    print(f" {'实验':<18s} {'累计收益':>10s} {'CAGR':>8s} {'夏普':>6s} {'最大回撤':>8s} {'Calmar':>8s} {'日胜率':>7s} {'调仓次':>6s} {'年调仓':>6s} {'盈利年':>7s}")
    print(f"{'-'*100}")
    for r in results:
        print(f" {r['label']:<16s} {r['total_return']:>9.2%} {r['cagr']:>7.2%} {r['sharpe']:>6.2f} "
              f"{r['max_dd']:>8.2%} {r['calmar']:>7.2f} {r['win_rate']:>6.2%} "
              f"{r['n_trades']:>5d} {r['trades_per_year']:>6.1f} {r['win_years']:>7s}")
    print(f"{'='*100}")

    # Best experiment per metric.
    best_sharpe = max(results, key=lambda x: x['sharpe'])
    best_calmar = max(results, key=lambda x: x['calmar'])
    best_cagr = max(results, key=lambda x: x['cagr'])
    print(f"\n 最高夏普: {best_sharpe['label']} (Sharpe={best_sharpe['sharpe']:.2f})")
    print(f" 最高Calmar: {best_calmar['label']} (Calmar={best_calmar['calmar']:.2f})")
    print(f" 最高CAGR: {best_cagr['label']} (CAGR={best_cagr['cagr']:.2%})")

    # Save the chart.
    try:
        import matplotlib
        matplotlib.use('Agg')
        import matplotlib.pyplot as plt
        matplotlib.rcParams['font.sans-serif'] = ['Arial Unicode MS', 'SimHei', 'DejaVu Sans']
        matplotlib.rcParams['axes.unicode_minus'] = False
        # height_ratios goes through gridspec_kw so older matplotlib versions,
        # whose subplots() lacks the direct keyword, work as well.
        fig, (ax1, ax2) = plt.subplots(
            2, 1, figsize=(16, 10),
            gridspec_kw={'hspace': 0.3, 'height_ratios': [3, 1]},
        )
        # One colour per experiment. Indexing modulo the palette size keeps
        # every experiment on the chart; zipping against a shorter palette
        # silently dropped the last one.
        colors = ['#e74c3c', '#3498db', '#2ecc71', '#f39c12', '#9b59b6', '#1abc9c']

        # NAV curves.
        for idx, r in enumerate(results):
            nav = r['result_df']['nav']
            ax1.plot(nav.index, nav, label=r['label'], linewidth=1.2,
                     color=colors[idx % len(colors)])
        ax1.set_title('动量策略多持仓对比 - 净值曲线', fontsize=14, fontweight='bold')
        ax1.legend(loc='upper left', fontsize=10)
        ax1.grid(True, alpha=0.3)
        ax1.set_ylabel('净值')
        ax1.set_yscale('log')

        # Drawdowns.
        for idx, r in enumerate(results):
            nav = r['result_df']['nav']
            peak = nav.cummax()
            dd = (nav - peak) / peak
            ax2.plot(dd.index, dd, label=r['label'], linewidth=0.8,
                     color=colors[idx % len(colors)], alpha=0.7)
        ax2.set_title('回撤对比', fontsize=12)
        ax2.set_ylabel('回撤')
        ax2.grid(True, alpha=0.3)
        ax2.legend(loc='lower left', fontsize=8)

        chart_path = Path(__file__).parent.parent / 'results' / 'momentum_multi_experiment.png'
        chart_path.parent.mkdir(parents=True, exist_ok=True)
        fig.savefig(chart_path, dpi=150, bbox_inches='tight')
        plt.close(fig)
        print(f"\n 对比图表已保存: {chart_path}")
    except Exception as e:
        # Charting is optional (e.g. matplotlib missing); report and carry on.
        print(f"\n 图表生成失败: {e}")


if __name__ == '__main__':
    main()