From 2829f80427f7172239fe2e2f92f1af3c39ecb1a2 Mon Sep 17 00:00:00 2001 From: aszerW Date: Wed, 29 Apr 2026 22:15:01 +0800 Subject: [PATCH] =?UTF-8?q?feat(backtest):=20=E6=B6=88=E9=99=A4=E5=89=8D?= =?UTF-8?q?=E8=A7=86=E5=81=8F=E5=B7=AE=EF=BC=8C=E5=AE=9E=E7=8E=B0=E5=8A=A8?= =?UTF-8?q?=E6=80=81ETF=E6=B1=A0=E9=87=8D=E5=BB=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 消除回测前视偏差(Look-Ahead Bias): - 新增 ETFDataCache 本地缓存系统,预下载全量ETF(含已退市)基础信息和日线数据 - 改造 ETFUniverseBuilder 支持纯历史模式,每个时间点只使用当时可获得的数据 - 动量.py 新增 dynamic 模式,回测中每60交易日动态重建ETF候选池 - momentum_experiment.py 同步支持动态重建 - 新增 ETF筛选引擎文档和动态池方案文档 无前视偏差实验结果(6组对比,2015-2026): A: 全仓1只 CAGR=3.32%, MaxDD=-63.19%, Sharpe=0.26 B: 等权3只 CAGR=3.40%, MaxDD=-49.72%, Sharpe=0.30 ← 最优 C: 反波动率3只 CAGR=1.73%, MaxDD=-38.59%, Sharpe=0.21 D: 等权5只 CAGR=2.77%, MaxDD=-42.39%, Sharpe=0.29 E: 反波动率5只 CAGR=-0.37%, MaxDD=-19.56%, Sharpe=-0.03 F: 动量>0全选等权 CAGR=2.02%, MaxDD=-43.27%, Sharpe=0.24 最优方案: B(等权3只)夏普、Calmar、CAGR三项均最高 --- docs/etf_rotation_framework.md | 300 +++++++++++ docs/动态ETF池筛选引擎-调研与方案.md | 247 +++++++++ scripts/build_etf_universe.py | 744 +++++++++++++++++++++++++++ scripts/etf_data_cache.py | 280 ++++++++++ scripts/momentum_experiment.py | 399 ++++++++++++++ 动量.py | 627 ++++++++++++++++++++++ 6 files changed, 2597 insertions(+) create mode 100644 docs/etf_rotation_framework.md create mode 100644 docs/动态ETF池筛选引擎-调研与方案.md create mode 100644 scripts/build_etf_universe.py create mode 100644 scripts/etf_data_cache.py create mode 100644 scripts/momentum_experiment.py create mode 100644 动量.py diff --git a/docs/etf_rotation_framework.md b/docs/etf_rotation_framework.md new file mode 100644 index 0000000..554e66a --- /dev/null +++ b/docs/etf_rotation_framework.md @@ -0,0 +1,300 @@ +# ETF 动量轮动策略:筛选框架与多持仓实验分析 + +## 1. 系统概览 + +本系统从全市场 ETF 中,通过 **5 层漏斗** 筛选出低相关、高流动性、覆盖多资产类别的候选池,再以动量因子选出 Top-N 等权持仓。 + +``` +全市场 ETF (~900+) + │ Layer 0: 拉取全量基础信息 + ▼ + │ Layer 1: 上市≥1年 + 排除货币/杠杆 + 日均额≥5000万 + ▼ (~200+) + │ Layer 2: 同一跟踪指数去重 (保留流动性最优) + ▼ (~220) + │ Layer 3: 三级分类链 → 10 大类资产标签 + ▼ + │ Layer 4: 类内等比分配预筛 (ENB × 3 预算) + ▼ (~36) + │ Layer 5: 相关性矩阵 + ENB → 贪心最大分散选择 + ▼ (9 只, ENB 驱动) + 候选池 → 动量打分 → Top-3 等权持仓 +``` + +--- + +## 2. 五层漏斗详解 + +### 2.1 Layer 0 — 全量拉取 + +通过 Tushare `fund_basic` 获取全量上市 ETF,字段包括: + +| 字段 | 用途 | +|------|------| +| `ts_code`, `name` | 标识 | +| `fund_type` | 一级分类 (股票型/债券型/商品型/REITs/货币市场型) | +| `invest_type` | 二级分类 (黄金现货合约/白银期货型/被动指数型…) | +| `benchmark` | 跟踪指数名称 (用于地域+行业判断) | +| `list_date` | 上市日期 | + +### 2.2 Layer 1 — 基础过滤 + +| 条件 | 阈值 | 目的 | +|------|------|------| +| 上市时间 | ≥ 365 天 | 排除新基金,确保有足够历史数据 | +| 基金类型 | 排除货币型 | 不参与轮动 | +| 名称过滤 | 排除杠杆/反向/分级 | 避免衍生品风险 | +| 日均成交额 | ≥ 5000 万元 (60 日均值) | 保证流动性 | + +### 2.3 Layer 2 — 同指数去重 + +从 ETF 名称提取核心指数名(去除基金公司前缀和 ETF/LOF/联接等后缀),同一指数只保留日均成交额最大的一只。 + +### 2.4 Layer 3 — 三级分类链 + +**核心创新**:不依赖纯关键词匹配,而是利用官方字段构建优先级分类链,覆盖率 100%。 + +``` +classify(row): + ┌─ 第1级: fund_type 硬判断 + │ REITs → REITs + │ 货币市场型 → 货币/现金 + │ 商品型 → 商品 + │ + ├─ 第2级: invest_type 细分 + │ 黄金现货合约 / 白银期货型 / 有色金属期货型 + │ 能源化工期货型 / 豆粕期货型 / 原油主题基金 → 商品 + │ (fund_type=债券型) → 债券 + │ + ├─ 第3级: 商品名称优先 (防止 QDII 油气被误分到美股) + │ name/benchmark 含 油气/原油/石油/能源行业 → 商品 + │ + ├─ 第4级: 地域判断 (benchmark + name) + │ 恒生/港股/H股 → 港股 + │ 纳斯达克/标普500/道琼斯 → 美股 + │ 日经/德国/越南/印度/全球 → 全球/其他 + │ + ├─ 第5级: A股内部细分 + │ 沪深300/中证500/创业板… → A股宽基 + │ 红利/央企/ESG/AI… → A股主题 + │ 其余股票型/混合型 → A股行业 + │ + └─ 兜底: 货币/债券关键词 → 对应类别 + 其他 → 未分类 +``` + +**最终 10 大类**:A股宽基、A股行业、A股主题、港股、美股、全球/其他、商品、债券、REITs、货币/现金 + +### 2.5 Layer 4 — 类内等比分配 + +不使用固定的 `INTRA_CLASS_LIMITS`,改为数据驱动的等比分配: + +$$ +\text{budget} = \text{ENB}_{\text{fallback}} \times \text{candidate\_multiplier} +$$ + +$$ +\text{limit}_i = \min\bigl(n_i,\; \max(\text{min\_per\_class},\; \lfloor r_i \times \text{budget} \rceil)\bigr) +$$ + +其中 $r_i = n_i / \sum n_j$ 为第 $i$ 类在筛选后样本中的占比,$n_i$ 为该类可选 ETF 数量。 + +默认参数:`ENB_fallback=12, candidate_multiplier=3.0, min_per_class=2`,总预算约 36 只。 + +每类选取日均成交额最高的 `limit_i` 只。 + +### 2.6 Layer 5 — ENB 驱动 + 贪心选择 + +1. **计算相关性矩阵**:使用 120 个交易日收益率 +2. **确定目标池大小**: + +$$ +\text{ENB} = \exp\!\Bigl(-\sum_{i=1}^{d} p_i \ln p_i\Bigr), \quad p_i = \frac{\lambda_i}{\sum_j \lambda_j} +$$ + +其中 $\lambda_i$ 为相关性矩阵的特征值 (Meucci 2009)。ENB 衡量的是候选池中**独立风险因子**的有效数量。 + +3. **贪心选择**: + - Step A:每个大类先选入流动性最好的 1 只(确保覆盖) + - Step B:从剩余候选中贪心选取与已选集合最大相关系数最小的 ETF + - 约束:最大相关系数 ≤ 0.85;A股行业占比 ≤ 50% + +--- + +## 3. 多持仓对比实验 + +### 3.1 实验设计 + +从 Layer 5 输出的 9 只候选池(ENB 驱动版本)出发,使用动量策略打分,比较不同持仓数量和权重方案。 + +**动量打分**:自适应回看窗口 + 加权动量得分 + 崩溃过滤器 + 溢价率惩罚。得分在 (0, 6) 区间内视为有效正动量。 + +**6 组实验**: + +| 编号 | 持仓数 | 权重方案 | 说明 | +|------|--------|---------|------| +| A | 1 | — | 全仓 1 只(基准) | +| B | 3 | 等权 (1/N) | 每只 33.3% | +| C | 3 | 反波动率 | 权重 ∝ 1/σ | +| D | 5 | 等权 | 每只 20% | +| E | 5 | 反波动率 | 权重 ∝ 1/σ | +| F | 全部正动量 | 等权 | 所有得分>0 的 ETF 等权 | + +**反波动率权重公式**: + +$$ +w_i = \frac{1/\sigma_i}{\sum_{j=1}^{N} 1/\sigma_j} +$$ + +$\sigma_i$ 为过去 20 个交易日的日收益率标准差。 + +### 3.2 实验结果 + +| 实验 | CAGR | 夏普比率 | 最大回撤 | Calmar | 盈利年 | +|------|------|---------|---------|--------|-------| +| A: 全仓1只 | 20.41% | 1.01 | -29.65% | 0.69 | 8/12 | +| **B: 等权3只** | **15.11%** | **1.23** | **-17.96%** | **0.84** | **10/12** | +| C: 反波动率3只 | 10.03% | 1.09 | -12.37% | 0.81 | 9/12 | +| D: 等权5只 | 11.41% | 1.14 | -19.66% | 0.58 | 10/12 | +| E: 反波动率5只 | 2.68% | 0.53 | -8.68% | 0.31 | 8/12 | +| F: 动量>0全选等权 | 10.73% | 1.13 | -18.19% | 0.59 | 10/12 | + +### 3.3 关键发现 + +1. **等权 3 只 (B) 综合最优**:夏普 1.23(最高)、Calmar 0.84(最高)、盈利年 10/12(并列最高) +2. **全仓 1 只 (A) 收益最高但波动最大**:CAGR 20.41%,但最大回撤 -29.65% +3. **5 只持仓边际收益递减**:D 和 E 相比 B 和 C,CAGR 大幅下降但回撤未明显改善 +4. **全选正动量 (F) ≈ 等权 5 只 (D)**:说明选优比全选更重要,动量 alpha 被稀释 +5. **反波动率权重降低收益**:低波动资产权重更高,倾向持有债券/货币等低收益品种 + +--- + +## 4. 等权选 3 只的理论基础 + +### 4.1 从信息论角度:√N 经验法则 + +当从 $N$ 个候选中选取子集构建组合时,一个经典的经验法则是: + +$$ +k^* = \lfloor \sqrt{N} \rceil +$$ + +对于 $N = 9$ 的候选池: + +$$ +k^* = \sqrt{9} = 3 +$$ + +**直觉**:$\sqrt{N}$ 是分散化收益与集中度损失之间的平衡点。少于 $\sqrt{N}$ 时分散不足,多于 $\sqrt{N}$ 时每增加一只带来的边际方差下降不抵 alpha 稀释。 + +### 4.2 从动量文献角度:Top Decile / Top Tercile + +Jegadeesh & Titman (1993, 2001) 的经典动量策略将资产按动量排序后分为 3~10 组,持有**最强的一组**: + +$$ +\text{Top Group Size} = \frac{N}{D} +$$ + +其中 $D$ 为分组数。当 $D = 3$(三分位法)时: + +$$ +k = \frac{N}{3} = \frac{9}{3} = 3 +$$ + +Top Tercile (前 1/3) 是动量文献中最常用的分组方式之一,在回测中稳健地跑赢其他分位。 + +### 4.3 从等权理论角度:DeMiguel (2009) + +DeMiguel, Garlappi & Uppal (2009) 在 "Optimal Versus Naive Diversification: How Inefficient is the 1/N Portfolio?" 中证明: + +> 在估计误差存在的情况下,简单的 $1/N$ 等权组合在样本外表现优于大多数均值-方差优化模型,除非样本量 $T$ 满足: +> +> $$T > \frac{N(N+1)}{2} \cdot c$$ +> +> 其中 $c$ 依赖于资产的夏普比率差异。 + +对于 $N = 3$:$T > 6c$(很容易满足);对于 $N = 9$:$T > 45c$(需要更长数据)。 + +**结论**:在小池子(N=9 候选、k=3 持仓)的场景下,等权是理论最优的权重方案。 + +### 4.4 从方差分解角度:边际分散效应递减 + +等权组合的方差为: + +$$ +\sigma_p^2 = \frac{1}{k}\bar{\sigma}^2 + \frac{k-1}{k}\overline{\text{Cov}} +$$ + +其中 $\bar{\sigma}^2$ 为平均方差,$\overline{\text{Cov}}$ 为平均协方差。 + +对 $k$ 求导: + +$$ +\frac{\partial \sigma_p^2}{\partial k} = -\frac{1}{k^2}(\bar{\sigma}^2 - \overline{\text{Cov}}) +$$ + +边际方差下降 $\propto 1/k^2$,呈二次递减: + +| k | 边际方差下降 (相对于 k=1) | +|---|--------------------------| +| 1 → 2 | $-25.0\%$ | +| 2 → 3 | $-11.1\%$ | +| 3 → 4 | $-6.3\%$ | +| 4 → 5 | $-4.0\%$ | + +从 k=1 到 k=3,方差下降约 $\frac{2}{3}(\bar{\sigma}^2 - \overline{\text{Cov}})$,覆盖了可分散风险的大部分;k>3 后边际效益显著递减。 + +### 4.5 综合公式 + +给定候选池大小 $N$ 和目标风险调整收益最大化,推荐持仓数: + +$$ +\boxed{k^* = \max\!\Big(2,\; \min\!\big(\lfloor\sqrt{N}\rceil,\; \lfloor N/3 \rfloor\big)\Big)} +$$ + +| 候选池 N | √N | N/3 | k* | +|----------|-----|------|-----| +| 6 | 2.4 → 2 | 2 | 2 | +| 9 | 3.0 → 3 | 3 | **3** | +| 12 | 3.5 → 4 | 4 | 4 | +| 16 | 4.0 → 4 | 5 | 4 | +| 20 | 4.5 → 5 | 6 | 5 | + +--- + +## 5. 完整策略流程总结 + +``` +┌────────────────────────────────────────┐ +│ Layer 0-5: 五层漏斗筛选 (~月频重建) │ +│ 输出: N 只低相关候选池 (当前 N=9) │ +└──────────────┬─────────────────────────┘ + ▼ +┌────────────────────────────────────────┐ +│ 动量打分 (日频/周频) │ +│ 自适应回看 + 加权动量 + 崩溃过滤 │ +│ 输出: 各 ETF 动量得分 │ +└──────────────┬─────────────────────────┘ + ▼ +┌────────────────────────────────────────┐ +│ 选出 Top-k 持仓 │ +│ k = min(√N, N/3) = 3 │ +│ 仅选得分 > 0 的 ETF │ +└──────────────┬─────────────────────────┘ + ▼ +┌────────────────────────────────────────┐ +│ 等权配置 (1/k) │ +│ 每只 33.3%,换仓时计算换手成本 │ +└────────────────────────────────────────┘ +``` + +--- + +## 6. 参考文献 + +- **Meucci, A.** (2009). "Managing Diversification." *Risk*, 22(5). — ENB (Effective Number of Bets) 公式来源 +- **Jegadeesh, N. & Titman, S.** (1993, 2001). "Returns to Buying Winners and Selling Losers." — 动量策略与 Top Tercile 方法 +- **DeMiguel, V., Garlappi, L. & Uppal, R.** (2009). "Optimal Versus Naive Diversification." *Review of Financial Studies*. — 1/N 等权优于均值-方差优化 +- **Faber, M.** (2007). "A Quantitative Approach to Tactical Asset Allocation." *SSRN:962461*. — GTAA 风险因子覆盖设计 +- **Antonacci, G.** (2014). "Dual Momentum Investing." *SSRN:2042750*. — 跨资产动量分散化 +- **López de Prado, M.** (2016). "Building Diversified Portfolios that Outperform." *SSRN:2708678*. — HRP 层次聚类相关性优化 diff --git a/docs/动态ETF池筛选引擎-调研与方案.md b/docs/动态ETF池筛选引擎-调研与方案.md new file mode 100644 index 0000000..6c2dbfb --- /dev/null +++ b/docs/动态ETF池筛选引擎-调研与方案.md @@ -0,0 +1,247 @@ +# 动态ETF池自动化筛选引擎 — 调研与方案 + +## 1. 问题背景 + +当前ETF轮动策略的标的池是人工预选的,存在严重的**幸存者偏差**。回测对比实验显示: + +| 标的池 | 累计收益 | CAGR | 最大回撤 | +|--------|---------|------|---------| +| 9只精选ETF (全仓1只, 2015-2026) | 2733.60% | 34.38% | -32.79% | +| 20只行业ETF (全仓1只, 2015-2026) | 208.16% | 10.46% | -67.16% | +| 20只轮动ETF (等权5只, 2020-2026) | 171.36% | 17.48% | -30.85% | + +**结论**: 标的池选择是策略最大的 alpha 来源,需要构建**系统化、无偏差**的动态筛选能力。 + +--- + +## 2. 学术论文与权威机构调研 + +### 2.1 TrendFolios (UCLA, 2024) + +- **论文**: Lu et al. "TrendFolios: A Portfolio Construction Framework for Utilizing Momentum and Trend-Following In a Multi-Asset Portfolio" +- **来源**: arxiv:2506.09330 +- **核心方法**: + - 按资产类别的风险因子对ETF进行分类 + - Universe 随时间自然扩展 — 新ETF上市后才纳入,杜绝前视偏差 + - 结合趋势跟踪信号与动量因子构建多资产组合 +- **对本方案的启发**: Layer 3 资产标签化设计 + 滚动重建机制中的无前视偏差原则 + +### 2.2 AEGIS (2024) + +- **论文**: Chakraborty & Singh. "Taming the Black Swan: A Momentum-Gated Hierarchical Optimisation Framework" +- **来源**: arxiv:2604.09060 +- **核心方法**: + - 流动性硬门槛: FALR (Fraction of Available Liquidity Ratio) >= 0.75 + - 每年根据动量领先者重建 universe + - 分层优化框架: 先筛选 universe → 动量门控 → 层次化权重优化 +- **对本方案的启发**: Layer 1 流动性过滤的硬门槛设计 + 定期重建机制 + +### 2.3 HRP — Hierarchical Risk Parity (Lopez de Prado, 2016) + +- **论文**: Lopez de Prado. "Building Diversified Portfolios that Outperform Out-of-Sample" +- **来源**: SSRN:2708678 +- **核心方法**: + - 基于收益率相关性矩阵进行层次聚类(Hierarchical Clustering) + - 将资产分为互不相关的簇,同簇内取代表性资产 + - 相比传统均值-方差优化,HRP 在样本外表现更稳健,不依赖协方差矩阵求逆 +- **对本方案的启发**: Layer 5 相关性优化选择算法的理论基础 + +### 2.4 Faber GTAA — Global Tactical Asset Allocation (2006) + +- **论文**: Faber. "A Quantitative Approach to Tactical Asset Allocation" +- **来源**: SSRN:962461 +- **核心方法**: + - 选择 5-13 个大类资产ETF,每个代表一个独立的经济驱动因子 + - 用 10 个月移动平均线作为趋势信号(价格 > MA10 则持有,否则转现金) + - 关键洞察: **资产类别的覆盖度比选择数量更重要** +- **对本方案的启发**: Universe 设计原则 — 确保风险因子全覆盖(股票、债券、商品、REITs、外汇) + +### 2.5 Antonacci Dual Momentum (2012) + +- **论文**: Antonacci. "Risk Premia Harvesting Through Dual Momentum" +- **来源**: SSRN:2042750 +- **核心方法**: + - **绝对动量** (时间序列动量): 资产自身是否处于上升趋势 + - **相对动量** (横截面动量): 资产间的相对强弱排序 + - 资产对(pairs)作为构建模块,每对代表一个风险溢价 + - 当绝对动量为负时,转入债券避险 +- **对本方案的启发**: 跨资产分散化设计理念 — 每个资产类别用"对"来覆盖 + +### 2.6 Jegadeesh & Titman (1993) — 动量效应经典文献 + +- **论文**: "Returns to Buying Winners and Selling Losers: Implications for Stock Market Efficiency" +- **来源**: Journal of Finance, 48(1), 65-91 +- **核心发现**: + - 买入过去 3-12 个月的赢家、卖出输家,可获得显著超额收益 + - 动量效应在不同市场、不同时期持续存在 + - 这是所有动量策略的学术基石 + +### 2.7 华宝基金动量优选基金 (业界实践) + +- **来源**: 华宝基金动量优选混合型基金招募说明书 +- **实践方法**: + - 年度 ETF 池调整(非固定池) + - 行业、风格、主题多维度覆盖 + - 定量筛选 + 基金经理主观判断结合 +- **对本方案的启发**: 实业级重建周期参考(年度/季度) + +--- + +## 3. 方案设计:多层漏斗筛选 + +### 3.1 架构概览 + +``` +全量ETF (~1000只) + | + v [Layer 1] 基础过滤 (流动性/类型/上市时间) +约300只 + | + v [Layer 2] 同指数去重 (每个指数只留1只最优ETF) +约200只 + | + v [Layer 3] 大类资产标签化 (自动分类) +约200只 (含标签) + | + v [Layer 4] 类内预筛选 (每类留Top-N) +约30-50只 + | + v [Layer 5] 相关性优化选择 (贪心/HRP聚类) +10-15只 最终池 +``` + +### 3.2 Layer 1 — 基础过滤(硬性门槛) + +| 过滤条件 | 原因 | +|---------|------| +| 上市满1年 | 新基金数据不足,无法计算有效因子 | +| 日均成交额 > 5000万 | 流动性不足会导致冲击成本过大 (参考AEGIS的FALR门槛) | +| 非货币/非债券增强类 | 货币基金无轮动意义 | +| 非杠杆/非反向ETF | 杠杆ETF不适合持有 | +| 有明确跟踪指数 | 需要指数数据计算因子 | + +### 3.3 Layer 2 — 同指数去重 + +一个指数可能有 20+ 只 ETF 跟踪(如沪深300有30+只),按 `index_code` 分组,每组选1只: + +1. 优先选**日均成交额最大**的(流动性最好) +2. 同等条件下选**管理费最低**的 +3. 同等条件下选**上市时间最早**的(历史数据最长) + +### 3.4 Layer 3 — 大类资产标签化 + +根据指数名称和类别,自动打上资产大类标签(参考 Faber GTAA 的风险因子覆盖思想): + +```python +ASSET_CLASS_RULES = { + 'A股宽基': ['沪深300', '中证500', '中证1000', '创业板', '上证50', '科创50'], + 'A股行业': ['银行', '证券', '医疗', '白酒', '军工', '新能源', '芯片', '煤炭', ...], + 'A股主题': ['红利', '消费', '科技', '央企', '国企', ...], + '港股': ['恒生', '港股', 'H股'], + '美股': ['纳斯达克', '纳指', '标普', '美股'], + '全球/其他': ['日经', '德国', '法国', '越南', '印度', 'MSCI'], + '商品': ['黄金', '白银', '原油', '有色', '豆粕'], + '债券': ['国债', '利率债', '信用债', '可转债'], +} +``` + +### 3.5 Layer 4 — 类内预筛选 + +每个大类保留最具代表性的 Top-N 只,避免单一类别占满池子: + +| 大类 | 保留数量 | 选择依据 | +|------|---------|---------| +| A股宽基 | 3-5 | 按规模/流动性排序 | +| A股行业 | 8-12 | 按行业分散度,每个细分行业最多1只 | +| A股主题 | 3-5 | 按流动性 | +| 港股 | 2-3 | 按流动性 | +| 美股 | 2-3 | 按流动性 | +| 全球/其他 | 2-3 | 按流动性 | +| 商品 | 2-3 | 按流动性 | +| 债券 | 2-3 | 按流动性 | + +### 3.6 Layer 5 — 相关性优化选择(核心算法) + +基于 HRP (Lopez de Prado) 的层次聚类思想,用贪心最大分散化算法从30-50只候选中选出最终10-15只: + +```python +def greedy_max_diversification(candidates, n_select, lookback_days=120): + """ + 1. 计算所有候选的 lookback_days 日收益率相关系数矩阵 + 2. 先选入每个大类中流动性最好的1只(确保类别覆盖) + 3. 剩余名额贪心填充: + - 对每个未选候选,计算其与已选集合的最大相关系数 + - 选入 max_corr 最小的(即与已有持仓最不相关的) + 4. 重复直到选满 n_select 只 + """ +``` + +**约束条件**: +- 每个大类至少1只(确保资产类别覆盖) +- 任意两只的相关系数不超过 0.85(强制分散) +- A股行业类别不超过总数的 50%(避免A股过度集中) + +--- + +## 4. 定期重建机制 + +- **重建周期**: 每季度(90个交易日)重建一次 +- **平滑切换**: 新旧池差异超过 30% 时才执行切换,避免频繁调整 +- **反前视偏差** (TrendFolios/AEGIS 强调): 重建时只用截止到重建日的历史数据,Universe 随时间自然扩展 +- **退市ETF处理**: 回测中需包含已退市ETF的历史数据,避免幸存者偏差 + +--- + +## 5. 实现规划 + +### 5.1 独立脚本 `scripts/build_etf_universe.py` + +```python +class ETFUniverseBuilder: + def __init__(self, config): + self.min_trading_days = 250 # 上市满1年 + self.min_daily_amount = 5000 # 日均成交额万元 + self.n_select = 12 # 最终池大小 + self.max_corr = 0.85 # 最大相关系数 + self.lookback_days = 120 # 相关性计算窗口 + + def run(self): + raw = self.fetch_etf_universe() # Layer 0: 获取全量 + filtered = self.basic_filter(raw) # Layer 1: 基础过滤 + deduped = self.dedup_by_index(filtered) # Layer 2: 同指数去重 + labeled = self.label_asset_class(deduped) # Layer 3: 标签化 + shortlist = self.intra_class_select(labeled) # Layer 4: 类内筛选 + final = self.correlation_optimize(shortlist) # Layer 5: 相关性优化 + self.save_results(final) + return final +``` + +### 5.2 输出文件 + +- `data/etf_universe/universe_{date}.csv`: 最终筛选结果 +- `data/etf_universe/pipeline_log_{date}.txt`: 每层过滤日志 +- `data/etf_universe/corr_matrix_{date}.csv`: 相关性矩阵 + +### 5.3 集成到动量策略 + +修改 `动量.py`,支持从动态池加载: + +```python +CONFIG = { + 'etf_pool': 'auto', # 'auto' 表示使用动态池 + 'rebuild_interval': 90, # 每90个交易日重建 +} +``` + +回测时每隔90天调用一次 `ETFUniverseBuilder`,用截止到当前回测日期的数据重建池子,确保不使用未来数据。 + +--- + +## 6. 参考文献 + +1. Lu et al. (2024). "TrendFolios: A Portfolio Construction Framework for Utilizing Momentum and Trend-Following In a Multi-Asset Portfolio". *arxiv:2506.09330* +2. Chakraborty & Singh (2024). "Taming the Black Swan: A Momentum-Gated Hierarchical Optimisation Framework". *arxiv:2604.09060* +3. Lopez de Prado (2016). "Building Diversified Portfolios that Outperform Out-of-Sample" (HRP). *SSRN:2708678* +4. Faber (2006). "A Quantitative Approach to Tactical Asset Allocation". *SSRN:962461* +5. Antonacci (2012). "Risk Premia Harvesting Through Dual Momentum". *SSRN:2042750* +6. Jegadeesh & Titman (1993). "Returns to Buying Winners and Selling Losers". *Journal of Finance, 48(1), 65-91* diff --git a/scripts/build_etf_universe.py b/scripts/build_etf_universe.py new file mode 100644 index 0000000..11a51ed --- /dev/null +++ b/scripts/build_etf_universe.py @@ -0,0 +1,744 @@ +""" +动态ETF池自动化筛选引擎 +========================= +多层漏斗筛选,从全市场ETF中选出低相关、高流动性、覆盖多资产类别的最优轮动候选池。 + +参考文献: +- TrendFolios (arxiv:2506.09330): 资产标签化 + 无前视偏差 +- AEGIS (arxiv:2604.09060): 流动性硬门槛 + 定期重建 +- HRP (SSRN:2708678): 层次聚类相关性优化 +- Faber GTAA (SSRN:962461): 风险因子覆盖设计 +- Antonacci Dual Momentum (SSRN:2042750): 跨资产分散化 + +用法: + python scripts/build_etf_universe.py # 当前日期构建 + python scripts/build_etf_universe.py --date 20240101 # 指定日期构建 +""" + +import os +import sys +import time +import argparse +import logging +from pathlib import Path +from datetime import datetime, timedelta + +import numpy as np +import pandas as pd + +sys.path.insert(0, str(Path(__file__).parent.parent)) +from dotenv import load_dotenv +load_dotenv() + +import tushare as ts + +logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') +logger = logging.getLogger(__name__) + +# ============================================================ +# 配置 +# ============================================================ +DEFAULT_CONFIG = { + 'min_list_days': 365, # 上市满1年 + 'min_daily_amount': 5000, # 日均成交额(万元) + 'lookback_amount_days': 60, # 计算日均成交额的窗口 + 'n_select': 'auto', # 最终池大小: 'auto'=ENB驱动, 或整数固定 + 'candidate_multiplier': 3.0, # Layer4 候选池 = ENB估计 * 此倍数 + 'min_per_class': 2, # 每类最少保留数 + 'max_corr': 0.85, # 最大允许相关系数 + 'corr_lookback_days': 120, # 相关性计算窗口 + 'max_equity_ratio': 0.5, # A股行业占比上限 + 'enb_fallback': 12, # ENB计算失败时的回退值 +} + +# ============================================================ +# Layer 3: 大类资产分类配置 +# ============================================================ +# 分类优先级: fund_type/invest_type(官方字段) > benchmark(跟踪指数) > name(名称关键词兜底) + +# Layer 4: 大类资产类别列表 (保留数量由数据驱动计算) +ASSET_CLASSES = ['A股宽基', 'A股行业', 'A股主题', '港股', '美股', + '全球/其他', '商品', '债券', 'REITs', '货币/现金'] + +# --- 以下为分类规则(仅作名称兜底时使用) --- +_BROAD_KW = ['沪深300', '中证500', '中证1000', '创业板', '上证50', '科创50', + '上证180', '深证100', '中证100', 'A50', 'A500', '中证800', + '万得全A', '富时A50', 'MSCI中国A'] +_HK_KW = ['恒生', '港股', 'H股', '港股通'] +_US_KW = ['纳斯达克', '纳指', '标普500', '美股', 'S&P500', '道琼斯'] +_GLOBAL_KW = ['日经', '德国', '法国', '越南', '印度', '东南亚', + '沙特', '韩国', '英国', '全球', '亚太'] +_THEME_KW = ['红利', '央企', '国企', 'ESG', '碳中和', '数字经济', + '人工智能', 'AI', '机器人', '信创', '北证50', + '一带一路', '养老', '价值', '成长', '质量', + '现金流', '低波'] + + +class ETFUniverseBuilder: + """动态ETF池筛选引擎""" + + def __init__(self, config: dict = None, ref_date: str = None, data_cache=None): + """ + Args: + config: 配置字典,缺省用 DEFAULT_CONFIG + ref_date: 参考日期 YYYYMMDD,缺省为当天 + data_cache: ETFDataCache 实例,传入则使用本地缓存(无前视偏差模式) + """ + self.cfg = {**DEFAULT_CONFIG, **(config or {})} + self.ref_date = ref_date or datetime.now().strftime('%Y%m%d') + self.ref_dt = pd.Timestamp(self.ref_date) + self.data_cache = data_cache + + if data_cache is None: + token = os.getenv('TUSHARE_TOKEN') + if not token: + raise ValueError("请设置环境变量 TUSHARE_TOKEN") + self.pro = ts.pro_api(token) + else: + self.pro = None # 缓存模式不需要 API + + self.output_dir = Path(__file__).parent.parent / 'data' / 'etf_universe' + self.output_dir.mkdir(parents=True, exist_ok=True) + + # 管线日志 + self._log_lines = [] + + def _log(self, msg: str): + logger.info(msg) + self._log_lines.append(msg) + + def _api_call(self, func, **kwargs): + """带重试和限流的 API 调用""" + for attempt in range(3): + try: + result = func(**kwargs) + time.sleep(0.35) + return result + except Exception as e: + if attempt < 2: + time.sleep(2) + else: + raise e + + # ============================================================ + # Layer 0: 获取全量 ETF 基础数据 + # ============================================================ + def fetch_etf_universe(self) -> pd.DataFrame: + """获取全量上市ETF基础信息""" + self._log("=" * 60) + self._log("Layer 0: 获取全量ETF基础信息") + self._log("=" * 60) + + if self.data_cache is not None: + # 缓存模式: 从本地读取,只保留 ref_date 时已上市且未退市的 + df = self.data_cache.load_basic().copy() + df['list_date'] = pd.to_datetime(df['list_date']) + + # 只保留 ref_date 时已上市的 + mask = df['list_date'] <= self.ref_dt + + # 排除 ref_date 之前已退市的 + if 'delist_date' in df.columns: + delist = pd.to_datetime(df['delist_date'], errors='coerce') + mask = mask & (delist.isna() | (delist > self.ref_dt)) + + # 只保留 market='E' 的(缓存可能包含场外基金) + if 'type' in df.columns: + # fund_basic 的 type 字段区分 ETF 类型 + pass # 缓存已经是 market='E' 的 + + df = df[mask].copy() + self._log(f" 缓存模式: 截至 {self.ref_date} 已上市ETF: {len(df)} 只") + else: + # 在线模式: 调用 API + df = self._api_call( + self.pro.fund_basic, + market='E', + status='L', + fields='ts_code,name,management,list_date,fund_type,invest_type,benchmark,type,trustee' + ) + + if df is None or df.empty: + raise RuntimeError("获取ETF列表失败,请检查Tushare权限") + + self._log(f" 全量上市ETF: {len(df)} 只") + df['list_date'] = pd.to_datetime(df['list_date']) + + return df + + # ============================================================ + # Layer 1: 基础过滤 + # ============================================================ + def basic_filter(self, df: pd.DataFrame) -> pd.DataFrame: + """硬性门槛过滤""" + self._log("\n" + "=" * 60) + self._log("Layer 1: 基础过滤") + self._log("=" * 60) + + before = len(df) + + # 1. 上市时间过滤 + cutoff = self.ref_dt - timedelta(days=self.cfg['min_list_days']) + df = df[df['list_date'] <= cutoff].copy() + self._log(f" 上市满1年: {before} -> {len(df)}") + + # 2. 排除货币型、QDII中的债券型 + # fund_type: 股票型/混合型/债券型/货币型/其他 + if 'fund_type' in df.columns: + exclude_types = ['货币型'] + mask = ~df['fund_type'].str.contains('|'.join(exclude_types), na=False) + df = df[mask] + self._log(f" 排除货币型: -> {len(df)}") + + # 3. 排除杠杆/反向 ETF + leverage_kw = ['杠杆', '反向', '两倍', '三倍', '2X', '3X', '-1X', '分级'] + mask = ~df['name'].str.contains('|'.join(leverage_kw), na=False, case=False) + df = df[mask] + self._log(f" 排除杠杆/反向: -> {len(df)}") + + # 4. 获取流动性数据(日均成交额) + self._log(f"\n 获取近{self.cfg['lookback_amount_days']}日成交额数据...") + amount_start = (self.ref_dt - timedelta(days=self.cfg['lookback_amount_days'] * 2)).strftime('%Y%m%d') + + amounts = {} + total = len(df) + for idx, (_, row) in enumerate(df.iterrows()): + code = row['ts_code'] + if idx % 50 == 0: + self._log(f" 进度: {idx}/{total}") + try: + if self.data_cache is not None: + # 缓存模式 + daily_df = self.data_cache.load_cached_daily(code, self.ref_date) + if not daily_df.empty: + daily_df = daily_df[daily_df['trade_date'] >= amount_start] + if not daily_df.empty and 'amount' in daily_df.columns: + avg_amount = daily_df['amount'].astype(float).mean() / 10 + amounts[code] = avg_amount + else: + # 在线模式 + daily = self._api_call( + self.pro.fund_daily, + ts_code=code, + start_date=amount_start, + end_date=self.ref_date, + fields='ts_code,trade_date,amount' + ) + if daily is not None and not daily.empty: + # amount 单位是千元,转成万元 + avg_amount = daily['amount'].astype(float).mean() / 10 + amounts[code] = avg_amount + except Exception: + pass + + df['avg_daily_amount'] = df['ts_code'].map(amounts) + df = df.dropna(subset=['avg_daily_amount']) + df = df[df['avg_daily_amount'] >= self.cfg['min_daily_amount']] + self._log(f" 日均成交额>={self.cfg['min_daily_amount']}万: -> {len(df)}") + + self._log(f"\nLayer 1 结果: {before} -> {len(df)}") + return df + + # ============================================================ + # Layer 2: 同指数去重 + # ============================================================ + def dedup_by_index(self, df: pd.DataFrame) -> pd.DataFrame: + """同一跟踪指数只保留最优的一只ETF""" + self._log("\n" + "=" * 60) + self._log("Layer 2: 同指数去重") + self._log("=" * 60) + + before = len(df) + + # 尝试获取指数信息做去重 + # 先从 name 中提取隐含的指数信息 + # 用名称相似度进行分组: 去掉 ETF/联接/LOF 等后缀 + import re + + def extract_index_name(name: str) -> str: + """从ETF名称提取核心指数名""" + # 去掉常见后缀 + for suffix in ['ETF', 'LOF', '联接', '基金', 'A', 'C', '(', '(']: + name = name.split(suffix)[0] + # 去掉基金公司前缀 (通常是2-4个汉字 + 核心名) + # 常见基金公司 + companies = ['华夏', '易方达', '南方', '华安', '嘉实', '富国', '广发', + '博时', '工银', '招商', '华宝', '天弘', '中银', '建信', + '汇添富', '鹏华', '国泰', '银华', '大成', '景顺', '长城', + '中欧', '交银', '兴全', '平安', '万家', '泰康', '诺安', + '华泰柏瑞', '华泰', '浦银安盛', '国金', '长信', '东方', + '中证', '方正富邦', '前海开源', '申万菱信', '融通'] + for c in companies: + if name.startswith(c): + name = name[len(c):] + break + return name.strip() + + df = df.copy() + df['index_name'] = df['name'].apply(extract_index_name) + + # 按 index_name 分组,每组选日均成交额最大的 + df = df.sort_values('avg_daily_amount', ascending=False) + df = df.drop_duplicates(subset='index_name', keep='first') + + self._log(f" 同名去重: {before} -> {len(df)}") + return df + + # ============================================================ + # Layer 3: 大类资产标签化 + # ============================================================ + def label_asset_class(self, df: pd.DataFrame) -> pd.DataFrame: + """ + 三级分类链: + 1. fund_type / invest_type (官方字段,最可靠) + 2. benchmark (跟踪指数名称) + 3. name (关键词兜底) + """ + self._log("\n" + "=" * 60) + self._log("Layer 3: 大类资产标签化 (官方字段优先)") + self._log("=" * 60) + + def _name_has(text: str, keywords: list) -> bool: + """text 中是否包含任一 keyword""" + t = text.lower() + return any(kw.lower() in t for kw in keywords) + + def classify_row(row) -> str: + ft = str(row.get('fund_type', '') or '') + it = str(row.get('invest_type', '') or '') + bm = str(row.get('benchmark', '') or '') + name = str(row.get('name', '') or '') + combined = f"{name} {bm}" # 名称 + 跟踪指数拼接 + + # ---- 第1级: fund_type 硬判断 ---- + if ft == 'REITs': + return 'REITs' + if ft == '货币市场型': + return '货币/现金' + if ft == '商品型': + return '商品' + + # ---- 第2级: invest_type 细分 ---- + if it in ('黄金现货合约', '白银期货型', '有色金属期货型', + '能源化工期货型', '豆粕期货型', '原油主题基金'): + return '商品' + + # 债券型 + if ft == '债券型': + return '债券' + + # ---- 第3级: 商品类优先判断 (油气/石油/能源类本质是商品,即使QDII包装) ---- + if _name_has(combined, ['油气', '原油', '石油', '能源行业']): + return '商品' + + # ---- 第4级: 地域判断 (从 benchmark + name) ---- + # 港股 + if _name_has(combined, _HK_KW): + return '港股' + # 美股 + if _name_has(combined, _US_KW): + return '美股' + # 全球/其他 + if _name_has(combined, _GLOBAL_KW): + return '全球/其他' + + # ---- 第5级: A股内部细分 (fund_type=股票型/混合型) ---- + if ft in ('股票型', '混合型') or it in ('被动指数型', '增强指数型'): + # 宽基指数 + if _name_has(combined, _BROAD_KW): + return 'A股宽基' + # 主题策略 + if _name_has(combined, _THEME_KW): + return 'A股主题' + # 剩余股票型默认为行业 + return 'A股行业' + + # ---- 兜底 ---- + # 还有一些“另类投资型”等少数类别 + if _name_has(name, ['日利', '添益', '货币']): + return '货币/现金' + if _name_has(name, ['债', '短融', '利率']): + return '债券' + + return '未分类' + + df = df.copy() + df['asset_class'] = df.apply(classify_row, axis=1) + + # 统计每类数量 + class_counts = df['asset_class'].value_counts() + self._log("\n 分类结果:") + for cls, cnt in class_counts.items(): + self._log(f" {cls}: {cnt} 只") + + # 未分类检查 + n_unclassified = (df['asset_class'] == '未分类').sum() + total = len(df) + coverage = (total - n_unclassified) / total * 100 if total > 0 else 0 + self._log(f"\n 分类覆盖率: {coverage:.1f}% ({total - n_unclassified}/{total})") + + if n_unclassified > 0: + self._log(f" 未分类 {n_unclassified} 只:") + unclassified = df[df['asset_class'] == '未分类'].nlargest(10, 'avg_daily_amount') + for _, row in unclassified.iterrows(): + self._log(f" {row['ts_code']} {row['name']} " + f"[ft={row.get('fund_type','')}, it={row.get('invest_type','')}] " + f"(日均{row['avg_daily_amount']:.0f}万)") + + return df + + # ============================================================ + # Layer 4: 类内预筛选 + # ============================================================ + @staticmethod + def _compute_enb(corr_matrix) -> float: + """计算 Effective Number of Bets (Meucci 2009) + ENB = exp(- sum(p_i * ln(p_i))), p_i = λ_i / sum(λ) + """ + import numpy as np + eigenvalues = np.linalg.eigvalsh(corr_matrix.values) + eigenvalues = eigenvalues[eigenvalues > 1e-10] # 只取正特征值 + p = eigenvalues / eigenvalues.sum() + return float(np.exp(-np.sum(p * np.log(p)))) + + def _compute_class_limits(self, df: pd.DataFrame) -> dict: + """数据驱动的类内保留数量: max(min_per_class, round(class_ratio * budget)) + budget = candidate_multiplier * ENB估计 (首次用 enb_fallback) + """ + class_counts = df['asset_class'].value_counts().to_dict() + total = sum(class_counts.get(c, 0) for c in ASSET_CLASSES) + if total == 0: + return {c: self.cfg['min_per_class'] for c in ASSET_CLASSES} + + # 预估 budget + n_classes_present = sum(1 for c in ASSET_CLASSES if class_counts.get(c, 0) > 0) + enb_est = self.cfg.get('enb_fallback', 12) + budget = int(enb_est * self.cfg['candidate_multiplier']) + + limits = {} + for cls in ASSET_CLASSES: + cnt = class_counts.get(cls, 0) + if cnt == 0: + limits[cls] = 0 + continue + ratio = cnt / total + raw = ratio * budget + limits[cls] = min(cnt, max(self.cfg['min_per_class'], round(raw))) + + self._log(f" 候选预算: budget={budget} (ENB估计={enb_est}, 倍数={self.cfg['candidate_multiplier']})") + self._log(f" 等比分配: {limits}") + return limits + + def intra_class_select(self, df: pd.DataFrame) -> pd.DataFrame: + """数据驱动类内预筛选: 按各类占比等比分配名额""" + self._log("\n" + "=" * 60) + self._log("Layer 4: 类内预筛选 (等比分配)") + self._log("=" * 60) + + before = len(df) + limits = self._compute_class_limits(df) + selected = [] + + for cls_name in ASSET_CLASSES: + limit = limits.get(cls_name, 0) + if limit == 0: + continue + cls_df = df[df['asset_class'] == cls_name] + if cls_df.empty: + continue + top = cls_df.nlargest(limit, 'avg_daily_amount') + selected.append(top) + self._log(f" {cls_name}: {len(cls_df)} -> {len(top)} 只") + for _, row in top.iterrows(): + self._log(f" {row['ts_code']} {row['name']} (日均{row['avg_daily_amount']:.0f}万)") + + # 未分类中流动性特别好的保留少量 + unclassified = df[df['asset_class'] == '未分类'] + if not unclassified.empty: + top_unc = unclassified.nlargest(2, 'avg_daily_amount') + top_unc = top_unc[top_unc['avg_daily_amount'] >= self.cfg['min_daily_amount'] * 10] + if not top_unc.empty: + selected.append(top_unc) + self._log(f" 未分类(超高流动): {len(top_unc)} 只") + + result = pd.concat(selected, ignore_index=True) if selected else pd.DataFrame() + self._log(f"\nLayer 4 结果: {before} -> {len(result)}") + return result + + # ============================================================ + # Layer 5: 相关性优化选择 + # ============================================================ + def correlation_optimize(self, df: pd.DataFrame) -> pd.DataFrame: + """ENB驱动 + 贪心最大分散化选择""" + self._log("\n" + "=" * 60) + self._log("Layer 5: 相关性优化选择 (ENB驱动)") + self._log("=" * 60) + + # 1. 获取收益率数据计算相关性 + self._log(f" 获取{self.cfg['corr_lookback_days']}日收益率数据...") + corr_start = (self.ref_dt - timedelta(days=self.cfg['corr_lookback_days'] * 2)).strftime('%Y%m%d') + + returns_dict = {} + for _, row in df.iterrows(): + code = row['ts_code'] + try: + if self.data_cache is not None: + # 缓存模式 + daily = self.data_cache.load_cached_daily(code, self.ref_date) + if not daily.empty and len(daily) >= 60: + daily = daily[daily['trade_date'] >= corr_start] + daily = daily.sort_values('trade_date') + daily['ret'] = daily['close'].astype(float).pct_change() + returns_dict[code] = daily.set_index('trade_date')['ret'].tail(self.cfg['corr_lookback_days']) + else: + # 在线模式 + daily = self._api_call( + self.pro.fund_daily, + ts_code=code, + start_date=corr_start, + end_date=self.ref_date, + fields='ts_code,trade_date,close' + ) + if daily is not None and len(daily) >= 60: + daily = daily.sort_values('trade_date') + daily['ret'] = daily['close'].astype(float).pct_change() + returns_dict[code] = daily.set_index('trade_date')['ret'].tail(self.cfg['corr_lookback_days']) + except Exception: + pass + + if len(returns_dict) < 5: + self._log(" 收益率数据不足,跳过相关性优化") + df = df.copy() + df['selected'] = True + return df + + ret_df = pd.DataFrame(returns_dict).dropna(axis=1, thresh=60) + corr_matrix = ret_df.corr() + + self._log(f" 有效相关性矩阵: {len(corr_matrix)} x {len(corr_matrix)}") + + # 2. 确定目标池大小 + n_select_cfg = self.cfg['n_select'] + if n_select_cfg == 'auto': + # 用候选池相关性矩阵的 ENB 确定自然池大小 + enb = self._compute_enb(corr_matrix) + n_select = max(6, min(int(round(enb)), len(corr_matrix))) + self._log(f" 候选池 ENB = {enb:.2f} -> 目标池大小 = {n_select}") + else: + n_select = int(n_select_cfg) + self._log(f" 固定目标池大小 = {n_select}") + + if len(df) <= n_select: + self._log(f" 候选 {len(df)} <= 目标 {n_select},全部保留") + df = df.copy() + df['selected'] = True + return df + + # 3. 贪心选择 + available_codes = set(corr_matrix.columns) & set(df['ts_code'].values) + df_indexed = df.set_index('ts_code') + + # Step A: 每个大类先选入流动性最好的1只(确保覆盖) + selected = [] + for cls_name in ASSET_CLASSES: + cls_codes = df_indexed[df_indexed['asset_class'] == cls_name].index + cls_available = [c for c in cls_codes if c in available_codes] + if cls_available: + # 按流动性排序 + best = max(cls_available, key=lambda c: df_indexed.loc[c, 'avg_daily_amount']) + selected.append(best) + available_codes.discard(best) + + self._log(f" 类别覆盖: 已选 {len(selected)} 只") + + # Step B: 贪心填充剩余名额 + remaining = n_select - len(selected) + candidates = list(available_codes) + + for _ in range(remaining): + if not candidates: + break + + best_candidate = None + best_max_corr = 2.0 # 越小越好 + + for c in candidates: + if c not in corr_matrix.columns: + continue + # 计算与已选集合的最大相关系数 + if selected: + selected_in_corr = [s for s in selected if s in corr_matrix.columns] + if selected_in_corr: + max_corr = corr_matrix.loc[c, selected_in_corr].abs().max() + else: + max_corr = 0 + else: + max_corr = 0 + + if max_corr < best_max_corr: + best_max_corr = max_corr + best_candidate = c + + if best_candidate is None: + break + + # 检查相关系数阈值 + if best_max_corr > self.cfg['max_corr']: + self._log(f" 剩余候选相关性均>{self.cfg['max_corr']:.2f},停止选择") + break + + selected.append(best_candidate) + candidates.remove(best_candidate) + + # 检查 A股行业占比约束 + selected_df = df_indexed.loc[[s for s in selected if s in df_indexed.index]] + equity_count = (selected_df['asset_class'] == 'A股行业').sum() + total_count = len(selected_df) + if total_count > 0 and equity_count / total_count > self.cfg['max_equity_ratio']: + self._log(f" A股行业占比 {equity_count}/{total_count} 超限,需裁剪") + # 从A股行业中移除相关性最高的 + equity_codes = selected_df[selected_df['asset_class'] == 'A股行业'].index.tolist() + max_equity = int(total_count * self.cfg['max_equity_ratio']) + while len(equity_codes) > max_equity: + # 找出与其他A股行业相关性最高的 + worst = None + worst_avg_corr = -1 + for ec in equity_codes: + others = [c for c in equity_codes if c != ec and c in corr_matrix.columns] + if others and ec in corr_matrix.columns: + avg_corr = corr_matrix.loc[ec, others].abs().mean() + if avg_corr > worst_avg_corr: + worst_avg_corr = avg_corr + worst = ec + if worst: + selected.remove(worst) + equity_codes.remove(worst) + self._log(f" 移除高相关A股行业: {worst}") + else: + break + + # 3. 标记结果 + df = df.copy() + df['selected'] = df['ts_code'].isin(selected) + + self._log(f"\nLayer 5 最终选出: {df['selected'].sum()} 只") + final = df[df['selected']].copy() + for _, row in final.iterrows(): + self._log(f" {row['ts_code']} {row['name']} [{row['asset_class']}] 日均{row['avg_daily_amount']:.0f}万") + + # 保存相关性矩阵 + final_codes = [c for c in final['ts_code'] if c in corr_matrix.columns] + if final_codes: + final_corr = corr_matrix.loc[final_codes, final_codes] + corr_path = self.output_dir / f'corr_matrix_{self.ref_date}.csv' + final_corr.to_csv(corr_path, float_format='%.3f') + self._log(f"\n 相关性矩阵已保存: {corr_path}") + + return df + + # ============================================================ + # 保存结果 + # ============================================================ + def save_results(self, df: pd.DataFrame): + """保存筛选结果和日志""" + # 保存最终池 + final = df[df['selected'] == True].copy() + cols = ['ts_code', 'name', 'asset_class', 'avg_daily_amount'] + cols = [c for c in cols if c in final.columns] + universe_path = self.output_dir / f'universe_{self.ref_date}.csv' + final[cols].to_csv(universe_path, index=False, encoding='utf-8-sig') + self._log(f"\n最终ETF池已保存: {universe_path}") + + # 保存 latest 软链接/副本 + latest_path = self.output_dir / 'universe_latest.csv' + final[cols].to_csv(latest_path, index=False, encoding='utf-8-sig') + + # 保存管线日志 + log_path = self.output_dir / f'pipeline_log_{self.ref_date}.txt' + with open(log_path, 'w', encoding='utf-8') as f: + f.write('\n'.join(self._log_lines)) + self._log(f"管线日志已保存: {log_path}") + + # 打印最终汇总 + self._log("\n" + "=" * 60) + self._log("筛选完成!") + self._log("=" * 60) + self._log(f"最终池: {len(final)} 只ETF") + class_dist = final['asset_class'].value_counts() + for cls, cnt in class_dist.items(): + self._log(f" {cls}: {cnt}") + + # ============================================================ + # 主运行入口 + # ============================================================ + def run(self) -> pd.DataFrame: + """执行完整筛选管线""" + self._log(f"参考日期: {self.ref_date}") + self._log(f"配置: {self.cfg}") + + raw = self.fetch_etf_universe() # Layer 0 + filtered = self.basic_filter(raw) # Layer 1 + deduped = self.dedup_by_index(filtered) # Layer 2 + labeled = self.label_asset_class(deduped) # Layer 3 + shortlist = self.intra_class_select(labeled) # Layer 4 + final = self.correlation_optimize(shortlist) # Layer 5 + self.save_results(final) + + return final + + +# ============================================================ +# 便捷函数:供动量策略回测调用 +# ============================================================ +def build_universe(ref_date: str = None, config: dict = None, data_cache=None) -> dict: + """ + 构建ETF池并返回 {ts_code: name} 字典,可直接用于动量策略 CONFIG['etf_pool'] + + Args: + ref_date: 参考日期 YYYYMMDD + config: 覆盖默认配置 + data_cache: ETFDataCache 实例(缓存模式,无前视偏差) + + Returns: + dict: {ts_code: name} + """ + builder = ETFUniverseBuilder(config=config, ref_date=ref_date, data_cache=data_cache) + result = builder.run() + final = result[result['selected'] == True] + return dict(zip(final['ts_code'], final['name'])) + + +def load_latest_universe() -> dict: + """ + 加载最近一次构建的ETF池 + + Returns: + dict: {ts_code: name} + """ + latest_path = Path(__file__).parent.parent / 'data' / 'etf_universe' / 'universe_latest.csv' + if not latest_path.exists(): + raise FileNotFoundError(f"未找到ETF池文件: {latest_path}\n请先运行 build_etf_universe.py") + df = pd.read_csv(latest_path) + return dict(zip(df['ts_code'], df['name'])) + + +# ============================================================ +# CLI 入口 +# ============================================================ +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='动态ETF池筛选引擎') + parser.add_argument('--date', type=str, default=None, + help='参考日期 YYYYMMDD (默认: 当天)') + parser.add_argument('--n-select', type=str, default='auto', + help='最终池大小: auto=ENB驱动, 或整数 (默认: auto)') + parser.add_argument('--min-amount', type=float, default=5000, + help='最低日均成交额(万) (默认: 5000)') + args = parser.parse_args() + + cfg = { + 'n_select': args.n_select if args.n_select == 'auto' else int(args.n_select), + 'min_daily_amount': args.min_amount, + } + + builder = ETFUniverseBuilder(config=cfg, ref_date=args.date) + builder.run() diff --git a/scripts/etf_data_cache.py b/scripts/etf_data_cache.py new file mode 100644 index 0000000..4c5af09 --- /dev/null +++ b/scripts/etf_data_cache.py @@ -0,0 +1,280 @@ +""" +ETF 全量历史数据本地缓存 +======================== +一次性下载全市场 ETF(含已退市)的基础信息和日线数据到本地, +供回测中按 ref_date 截取历史数据,消除前视偏差。 + +用法: + # 首次下载(约 30-60 分钟,取决于 API 限流) + python scripts/etf_data_cache.py + + # 增量更新(只下载缺失的新数据) + python scripts/etf_data_cache.py --update +""" + +import os +import sys +import time +import logging +from pathlib import Path +from datetime import datetime + +import pandas as pd + +sys.path.insert(0, str(Path(__file__).parent.parent)) +from dotenv import load_dotenv +load_dotenv() + +import tushare as ts + +logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') +logger = logging.getLogger(__name__) + +# 缓存目录 +CACHE_DIR = Path(__file__).parent.parent / 'data' / 'etf_cache' +DAILY_DIR = CACHE_DIR / 'daily' +BASIC_PATH = CACHE_DIR / 'fund_basic.csv' + + +class ETFDataCache: + """ETF 全量历史数据缓存管理器""" + + def __init__(self): + self.pro = ts.pro_api(os.getenv('TUSHARE_TOKEN')) + CACHE_DIR.mkdir(parents=True, exist_ok=True) + DAILY_DIR.mkdir(parents=True, exist_ok=True) + self._basic_df = None # 懒加载 + + # ---------------------------------------------------------- + # API 调用(带重试 + 限流) + # ---------------------------------------------------------- + def _api_call(self, func, **kwargs): + for attempt in range(3): + try: + result = func(**kwargs) + time.sleep(0.35) + return result + except Exception as e: + if attempt < 2: + wait = 2 * (attempt + 1) + logger.warning(f" API 重试 ({attempt+1}/3): {e}, 等待 {wait}s") + time.sleep(wait) + else: + raise + + # ---------------------------------------------------------- + # 1. 下载并缓存 fund_basic + # ---------------------------------------------------------- + def download_basic(self, force: bool = False): + """下载全量 ETF 基础信息(含已退市)""" + if BASIC_PATH.exists() and not force: + logger.info(f"fund_basic 缓存已存在: {BASIC_PATH}") + return + + logger.info("下载全量 ETF 基础信息...") + fields = 'ts_code,name,management,list_date,delist_date,fund_type,invest_type,benchmark,type,trustee,status' + + dfs = [] + for status in ['L', 'D']: # L=上市, D=已退市 + df = self._api_call(self.pro.fund_basic, market='E', status=status, fields=fields) + if df is not None and not df.empty: + dfs.append(df) + logger.info(f" status={status}: {len(df)} 只") + + if not dfs: + raise RuntimeError("获取 ETF 列表失败") + + basic = pd.concat(dfs, ignore_index=True).drop_duplicates(subset='ts_code') + basic.to_csv(BASIC_PATH, index=False, encoding='utf-8-sig') + logger.info(f"fund_basic 已保存: {len(basic)} 只 -> {BASIC_PATH}") + + # ---------------------------------------------------------- + # 2. 批量下载日线数据 + # ---------------------------------------------------------- + def download_daily(self, force: bool = False): + """批量下载所有 ETF 的全历史日线数据""" + basic = self.load_basic() + codes = basic['ts_code'].tolist() + total = len(codes) + logger.info(f"准备下载 {total} 只 ETF 的日线数据...") + + downloaded = 0 + skipped = 0 + failed = 0 + + for i, code in enumerate(codes): + csv_path = DAILY_DIR / f"{code}.csv" + + if csv_path.exists() and not force: + # 增量更新: 读取已有数据的最后日期 + try: + existing = pd.read_csv(csv_path, nrows=1) # 只读首行检查 + if not existing.empty: + skipped += 1 + continue + except Exception: + pass + + if (i - skipped) % 20 == 0: + logger.info(f" 进度: {i}/{total} (下载={downloaded}, 跳过={skipped}, 失败={failed})") + + try: + df = self._api_call( + self.pro.fund_daily, + ts_code=code, + fields='ts_code,trade_date,open,high,low,close,vol,amount' + ) + if df is not None and not df.empty: + df = df.sort_values('trade_date') + df.to_csv(csv_path, index=False) + downloaded += 1 + else: + failed += 1 + except Exception as e: + logger.warning(f" {code} 下载失败: {e}") + failed += 1 + + logger.info(f"日线数据下载完成: 下载={downloaded}, 跳过={skipped}, 失败={failed}") + + def update_daily(self): + """增量更新: 只为已有缓存文件追加新数据""" + basic = self.load_basic() + codes = basic['ts_code'].tolist() + today_str = datetime.now().strftime('%Y%m%d') + + updated = 0 + for code in codes: + csv_path = DAILY_DIR / f"{code}.csv" + if not csv_path.exists(): + continue + + try: + existing = pd.read_csv(csv_path) + if existing.empty: + continue + last_date = str(existing['trade_date'].max()) + if last_date >= today_str: + continue + + # 下载 last_date 之后的数据 + new_df = self._api_call( + self.pro.fund_daily, + ts_code=code, + start_date=str(int(last_date) + 1), + end_date=today_str, + fields='ts_code,trade_date,open,high,low,close,vol,amount' + ) + if new_df is not None and not new_df.empty: + combined = pd.concat([existing, new_df], ignore_index=True) + combined = combined.drop_duplicates(subset='trade_date').sort_values('trade_date') + combined.to_csv(csv_path, index=False) + updated += 1 + except Exception: + pass + + logger.info(f"增量更新完成: {updated} 只有新数据") + + # ---------------------------------------------------------- + # 3. 数据读取接口(回测用) + # ---------------------------------------------------------- + def load_basic(self) -> pd.DataFrame: + """加载 fund_basic 缓存""" + if self._basic_df is not None: + return self._basic_df + + if not BASIC_PATH.exists(): + raise FileNotFoundError(f"fund_basic 缓存不存在,请先运行: python scripts/etf_data_cache.py") + + self._basic_df = pd.read_csv(BASIC_PATH) + return self._basic_df + + def load_cached_daily(self, ts_code: str, end_date: str = None) -> pd.DataFrame: + """ + 加载某只 ETF 的日线数据,截至 end_date(含)。 + + Args: + ts_code: ETF 代码 + end_date: 截止日期 YYYYMMDD,None 表示全部 + + Returns: + DataFrame with columns [trade_date, open, high, low, close, vol, amount] + 按 trade_date 升序排列 + """ + csv_path = DAILY_DIR / f"{ts_code}.csv" + if not csv_path.exists(): + return pd.DataFrame() + + df = pd.read_csv(csv_path) + if df.empty: + return df + + df['trade_date'] = df['trade_date'].astype(str) + df = df.sort_values('trade_date') + + if end_date: + end_str = str(end_date).replace('-', '') + df = df[df['trade_date'] <= end_str] + + return df + + def load_cached_daily_as_series(self, ts_code: str, end_date: str = None, + column: str = 'close') -> pd.Series: + """加载某只 ETF 的单列数据,index 为 datetime""" + df = self.load_cached_daily(ts_code, end_date) + if df.empty: + return pd.Series(dtype=float) + df['date'] = pd.to_datetime(df['trade_date']) + return df.set_index('date')[column].astype(float) + + def load_cached_ohlcv(self, ts_code: str, end_date: str = None) -> pd.DataFrame: + """加载 OHLCV 数据,index 为 datetime(与 动量.py 的 all_data 格式兼容)""" + df = self.load_cached_daily(ts_code, end_date) + if df.empty: + return pd.DataFrame() + df['date'] = pd.to_datetime(df['trade_date']) + df = df.set_index('date').sort_index() + df = df.rename(columns={'vol': 'volume'}) + return df[['open', 'high', 'low', 'close', 'volume']].astype(float) + + def ensure_downloaded(self): + """确保基础信息和日线数据都已下载""" + self.download_basic() + self.download_daily() + + def get_available_codes_at(self, ref_date: str) -> list: + """获取在 ref_date 时已上市且未退市的 ETF 代码列表""" + basic = self.load_basic() + basic['list_date'] = basic['list_date'].astype(str) + mask = basic['list_date'] <= ref_date + + # 排除在 ref_date 之前已退市的 + if 'delist_date' in basic.columns: + delist = basic['delist_date'].astype(str).fillna('99991231') + mask = mask & (delist > ref_date) + + return basic[mask]['ts_code'].tolist() + + +# ---------------------------------------------------------- +# CLI +# ---------------------------------------------------------- +if __name__ == '__main__': + import argparse + parser = argparse.ArgumentParser(description='ETF 全量历史数据缓存下载') + parser.add_argument('--update', action='store_true', help='增量更新已有缓存') + parser.add_argument('--force', action='store_true', help='强制重新下载全部') + args = parser.parse_args() + + cache = ETFDataCache() + + if args.update: + cache.download_basic(force=True) + cache.update_daily() + else: + cache.download_basic(force=args.force) + cache.download_daily(force=args.force) + + # 统计 + basic = cache.load_basic() + n_daily = len(list(DAILY_DIR.glob('*.csv'))) + logger.info(f"\n缓存统计: fund_basic={len(basic)} 只, 日线文件={n_daily} 个") diff --git a/scripts/momentum_experiment.py b/scripts/momentum_experiment.py new file mode 100644 index 0000000..fbc45e2 --- /dev/null +++ b/scripts/momentum_experiment.py @@ -0,0 +1,399 @@ +""" +动量策略多持仓对比实验 +对比 6 种配置: 全仓1只 / 等权3只 / 反波动率3只 / 等权5只 / 反波动率5只 / 动量>0全选等权 +支持 dynamic 模式: 回测中定期重建ETF池,消除前视偏差 +""" + +import sys +import math +import warnings +from pathlib import Path +from datetime import datetime + +import numpy as np +import pandas as pd + +warnings.filterwarnings("ignore") +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from dotenv import load_dotenv +load_dotenv() + + +# ==================== 复用动量.py的核心函数 ==================== +from 动量 import ( + fetch_all_etf_data, + fetch_etf_nav_data, + calc_atr, + calc_weighted_momentum_score, + apply_crash_filter, + calc_premium_rate, + resolve_etf_pool, +) + + +# ==================== 权重计算 ==================== +def calc_equal_weights(codes: list) -> dict: + """等权""" + w = 1.0 / len(codes) + return {c: w for c in codes} + + +def calc_inv_vol_weights(codes: list, all_data: dict, today, lookback: int = 20) -> dict: + """反波动率加权: 权重 ∝ 1/σ""" + vols = {} + for c in codes: + if c not in all_data: + continue + df = all_data[c] + hist = df[df.index <= today].tail(lookback + 1) + if len(hist) < 10: + vols[c] = 1.0 # fallback + continue + ret = hist['close'].pct_change().dropna() + vol = ret.std() + vols[c] = vol if vol > 0 else 1e-6 + + if not vols: + return calc_equal_weights(codes) + + inv_vols = {c: 1.0 / v for c, v in vols.items()} + total = sum(inv_vols.values()) + return {c: iv / total for c, iv in inv_vols.items()} + + +# ==================== 多持仓回测引擎 ==================== +def run_multi_backtest(config: dict, all_data: dict, nav_data: dict, + trade_dates: list, etf_codes: list, + target_num: int = 1, weight_mode: str = 'equal', + label: str = '', + data_cache=None, rebuild_interval: int = 0) -> dict: + """ + 多持仓回测 + + Args: + target_num: 同时持有数量 + weight_mode: 'equal' 等权 | 'inv_vol' 反波动率 + label: 实验标签 + data_cache: ETFDataCache 实例(动态重建模式) + rebuild_interval: 重建间隔(交易日),0=不重建 + + Returns: + dict: 绩效指标 + """ + max_lookback = config['max_days'] + 10 + holdings = {} # {code: weight} + daily_returns = [] + n_trades = 0 + last_rebuild_i = -rebuild_interval if rebuild_interval > 0 else 0 + current_codes = list(etf_codes) # 当前活跃的候选池 + + for i, today in enumerate(trade_dates): + # 动态重建 ETF 池 + if rebuild_interval > 0 and data_cache is not None and (i - last_rebuild_i >= rebuild_interval): + ref_str = today.strftime('%Y%m%d') + try: + new_pool = resolve_etf_pool(config, ref_date=ref_str, data_cache=data_cache) + current_codes = list(new_pool.keys()) + # 加载新增 ETF 数据 + for code in current_codes: + if code not in all_data: + ohlcv = data_cache.load_cached_ohlcv(code) + if not ohlcv.empty: + all_data[code] = ohlcv + last_rebuild_i = i + except Exception: + pass + + # 1. 计算每只 ETF 的得分 (使用当前活跃池) + scores = {} + for code in current_codes: + if code not in all_data: + continue + df = all_data[code] + hist = df[df.index <= today].tail(max_lookback + 1) + if len(hist) < config['min_days']: + continue + + close_arr = hist['close'].values + + if config['auto_day']: + if len(hist) < max_lookback: + lookback = config['fixed_days'] + else: + long_atr = calc_atr(hist['high'], hist['low'], hist['close'], + config['max_days']) + short_atr = calc_atr(hist['high'], hist['low'], hist['close'], + config['min_days']) + la = long_atr.iloc[-1] + sa = short_atr.iloc[-1] + if la > 0 and not np.isnan(la) and not np.isnan(sa): + ratio = min(0.9, sa / la) + lookback = int(config['min_days'] + + (config['max_days'] - config['min_days']) * (1 - ratio)) + else: + lookback = config['fixed_days'] + prices = close_arr[-lookback:] + else: + prices = close_arr[-config['fixed_days']:] + + if len(prices) < 5: + continue + + result = calc_weighted_momentum_score(prices) + score = result['score'] + score = apply_crash_filter(close_arr, score) + + if code in nav_data: + nav_df = nav_data[code] + nav_row = nav_df[nav_df.index <= today] + if not nav_row.empty: + nav_val = nav_row.iloc[-1]['nav'] + etf_price = close_arr[-1] + premium = calc_premium_rate(etf_price, nav_val) + if premium >= config['premium_threshold']: + score -= 1 + + if 0 < score < 6: + scores[code] = score + + # 2. 选出 top N (或全部正动量) + if scores: + ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True) + if target_num == 'all_positive': + targets = [c for c, s in ranked] # scores 已过滤 >0 + else: + targets = [c for c, _ in ranked[:target_num]] + else: + targets = [] + + # 3. 计算权重 + if targets: + if weight_mode == 'inv_vol': + new_weights = calc_inv_vol_weights(targets, all_data, today) + else: + new_weights = calc_equal_weights(targets) + else: + new_weights = {} + + # 4. 计算当日组合收益 + port_ret = 0.0 + for code, weight in holdings.items(): + if code not in all_data: + continue + df_h = all_data[code] + if today in df_h.index: + prev_dates = df_h[df_h.index < today].index + if len(prev_dates) > 0: + prev_price = df_h.loc[prev_dates[-1], 'close'] + today_price = df_h.loc[today, 'close'] + port_ret += weight * (today_price / prev_price - 1) + + # 5. 调仓判断 + old_set = set(holdings.keys()) + new_set = set(new_weights.keys()) + if old_set != new_set: + # 换手成本: 按换手比例收取 + turnover = 0.0 + for c in old_set - new_set: + turnover += holdings[c] + for c in new_set - old_set: + turnover += new_weights[c] + for c in old_set & new_set: + turnover += abs(new_weights[c] - holdings[c]) + trade_cost = turnover * config['trade_cost'] / 2 # 单边已含在trade_cost中 + n_trades += 1 + else: + trade_cost = 0.0 + + holdings = new_weights + + daily_returns.append({ + 'date': today, + 'daily_return': port_ret - trade_cost, + }) + + # 计算绩效 + result_df = pd.DataFrame(daily_returns).set_index('date') + result_df['nav'] = (1 + result_df['daily_return']).cumprod() + + nav = result_df['nav'] + total_return = nav.iloc[-1] / nav.iloc[0] - 1 + days = (result_df.index[-1] - result_df.index[0]).days + cagr = (1 + total_return) ** (365 / days) - 1 if days > 0 else 0 + daily_rets = result_df['daily_return'] + sharpe = daily_rets.mean() / daily_rets.std() * np.sqrt(252) if daily_rets.std() > 0 else 0 + peak = nav.cummax() + drawdown = (nav - peak) / peak + max_dd = drawdown.min() + calmar = cagr / abs(max_dd) if max_dd != 0 else 0 + win_rate = (daily_rets > 0).sum() / (daily_rets != 0).sum() if (daily_rets != 0).sum() > 0 else 0 + years = days / 365 + + # 年度统计 + win_years = 0 + total_years = 0 + for year, group in result_df.groupby(result_df.index.year): + yr = group['nav'] + yr_ret = yr.iloc[-1] / yr.iloc[0] - 1 + total_years += 1 + if yr_ret > 0: + win_years += 1 + + return { + 'label': label, + 'target_num': target_num, + 'weight_mode': weight_mode, + 'total_return': total_return, + 'cagr': cagr, + 'sharpe': sharpe, + 'max_dd': max_dd, + 'calmar': calmar, + 'win_rate': win_rate, + 'n_trades': n_trades, + 'trades_per_year': n_trades / years if years > 0 else 0, + 'win_years': f"{win_years}/{total_years}", + 'result_df': result_df, + } + + +# ==================== 主函数 ==================== +def main(): + from 动量 import CONFIG + + config = CONFIG.copy() + # 强制使用 dynamic 模式 + config['etf_pool'] = 'dynamic' + rebuild_interval = config.get('rebuild_interval', 60) + + # 初始化缓存 + from scripts.etf_data_cache import ETFDataCache + data_cache = ETFDataCache() + + # 用 start_date 作为初始重建日期 + init_ref_date = config['start_date'].replace('-', '') + etf_pool = resolve_etf_pool(config, ref_date=init_ref_date, data_cache=data_cache) + etf_codes = list(etf_pool.keys()) + end_date = datetime.now().strftime('%Y-%m-%d') + + print("=" * 70) + print(" 动量策略多持仓对比实验 (动态重建模式, 无前视偏差)") + print("=" * 70) + print(f" 初始ETF池 ({init_ref_date}): {len(etf_codes)} 只") + for code, name in etf_pool.items(): + print(f" {code} {name}") + print(f" 回测区间: {config['start_date']} ~ {end_date}") + print(f" 重建间隔: {rebuild_interval} 交易日") + + # 从缓存加载数据 + print(f"\n{'='*70}") + print("从本地缓存加载数据...") + all_data = {} + # 加载所有可能用到的 ETF 数据 (初始池 + 后续可能加入的) + for code in etf_codes: + ohlcv = data_cache.load_cached_ohlcv(code) + if not ohlcv.empty: + all_data[code] = ohlcv + nav_data = {} # 动态模式下不使用净值数据 + print(f"价格数据: {len(all_data)} 只") + + # 构建交易日历 + all_dates = set() + for df in all_data.values(): + all_dates.update(df.index.tolist()) + trade_dates = sorted(d for d in all_dates if d >= pd.Timestamp(config['start_date'])) + print(f"交易日: {len(trade_dates)}") + + # 6 组实验 + experiments = [ + {'target_num': 1, 'weight_mode': 'equal', 'label': 'A: 全仓1只'}, + {'target_num': 3, 'weight_mode': 'equal', 'label': 'B: 等权3只'}, + {'target_num': 3, 'weight_mode': 'inv_vol', 'label': 'C: 反波动率3只'}, + {'target_num': 5, 'weight_mode': 'equal', 'label': 'D: 等权5只'}, + {'target_num': 5, 'weight_mode': 'inv_vol', 'label': 'E: 反波动率5只'}, + {'target_num': 'all_positive', 'weight_mode': 'equal', 'label': 'F: 动量>0全选等权'}, + ] + + results = [] + for exp in experiments: + print(f"\n{'─'*70}") + print(f" 运行: {exp['label']}...") + r = run_multi_backtest( + config, all_data, nav_data, trade_dates, etf_codes, + target_num=exp['target_num'], + weight_mode=exp['weight_mode'], + label=exp['label'], + data_cache=data_cache, + rebuild_interval=rebuild_interval, + ) + results.append(r) + print(f" 完成: CAGR={r['cagr']:.2%}, MaxDD={r['max_dd']:.2%}, Sharpe={r['sharpe']:.2f}") + + # 输出对比表 + print(f"\n\n{'='*100}") + print(f"{'':>20s} 动量策略多持仓对比实验结果") + print(f"{'='*100}") + print(f" {'实验':<18s} {'累计收益':>10s} {'CAGR':>8s} {'夏普':>6s} {'最大回撤':>8s} {'Calmar':>8s} {'日胜率':>7s} {'调仓次':>6s} {'年调仓':>6s} {'盈利年':>7s}") + print(f"{'─'*100}") + + for r in results: + print(f" {r['label']:<16s} {r['total_return']:>9.2%} {r['cagr']:>7.2%} {r['sharpe']:>6.2f} " + f"{r['max_dd']:>8.2%} {r['calmar']:>7.2f} {r['win_rate']:>6.2%} " + f"{r['n_trades']:>5d} {r['trades_per_year']:>6.1f} {r['win_years']:>7s}") + + print(f"{'='*100}") + + # 找出最优 + best_sharpe = max(results, key=lambda x: x['sharpe']) + best_calmar = max(results, key=lambda x: x['calmar']) + best_cagr = max(results, key=lambda x: x['cagr']) + + print(f"\n 最高夏普: {best_sharpe['label']} (Sharpe={best_sharpe['sharpe']:.2f})") + print(f" 最高Calmar: {best_calmar['label']} (Calmar={best_calmar['calmar']:.2f})") + print(f" 最高CAGR: {best_cagr['label']} (CAGR={best_cagr['cagr']:.2%})") + + # 保存图表 + try: + import matplotlib + matplotlib.use('Agg') + import matplotlib.pyplot as plt + matplotlib.rcParams['font.sans-serif'] = ['Arial Unicode MS', 'SimHei', 'DejaVu Sans'] + matplotlib.rcParams['axes.unicode_minus'] = False + + fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(16, 10), height_ratios=[3, 1], + gridspec_kw={'hspace': 0.3}) + + colors = ['#e74c3c', '#3498db', '#2ecc71', '#f39c12', '#9b59b6'] + for r, color in zip(results, colors): + nav = r['result_df']['nav'] + ax1.plot(nav.index, nav, label=r['label'], linewidth=1.2, color=color) + + ax1.set_title('动量策略多持仓对比 - 净值曲线', fontsize=14, fontweight='bold') + ax1.legend(loc='upper left', fontsize=10) + ax1.grid(True, alpha=0.3) + ax1.set_ylabel('净值') + ax1.set_yscale('log') + + # 回撤 + for r, color in zip(results, colors): + nav = r['result_df']['nav'] + peak = nav.cummax() + dd = (nav - peak) / peak + ax2.plot(dd.index, dd, label=r['label'], linewidth=0.8, color=color, alpha=0.7) + + ax2.set_title('回撤对比', fontsize=12) + ax2.set_ylabel('回撤') + ax2.grid(True, alpha=0.3) + ax2.legend(loc='lower left', fontsize=8) + + chart_path = Path(__file__).parent.parent / 'results' / 'momentum_multi_experiment.png' + chart_path.parent.mkdir(exist_ok=True) + fig.savefig(chart_path, dpi=150, bbox_inches='tight') + plt.close(fig) + print(f"\n 对比图表已保存: {chart_path}") + except Exception as e: + print(f"\n 图表生成失败: {e}") + + +if __name__ == '__main__': + main() diff --git a/动量.py b/动量.py new file mode 100644 index 0000000..0a8b4f9 --- /dev/null +++ b/动量.py @@ -0,0 +1,627 @@ +""" +ETF动量轮动策略 - 本地回测版本 +原始策略来源:聚宽 https://www.joinquant.com/post/1399 + +核心逻辑: +1. 加权线性回归(权重1→2递增)计算趋势得分 +2. score = 年化收益率 × R² +3. ATR动态调整回看窗口(20~60天) +4. 崩盘过滤:连续3天任一天跌>5%则得分归零 +5. 溢价过滤:溢价率≥5%则降权 +6. 全仓单一品种轮动 +""" + +import sys +import math +import warnings +from pathlib import Path +from datetime import datetime + +import numpy as np +import pandas as pd + +warnings.filterwarnings("ignore") + +# 添加项目根目录 +sys.path.insert(0, str(Path(__file__).parent)) + +from dotenv import load_dotenv +load_dotenv() + +# ==================== 策略配置 ==================== +CONFIG = { + # 候选ETF池: + # - dict: 手动指定 {ts_code: name} + # - 'auto': 使用动态筛选引擎自动构建 + # - 'latest': 加载最近一次构建结果 + # - 'dynamic': 回测中定期重建,无前视偏差 + 'etf_pool': 'dynamic', + 'rebuild_interval': 60, # 动态池重建间隔(交易日) + 'target_num': 1, # 持仓数量 + 'auto_day': True, # 是否启用动态周期 + 'fixed_days': 25, # 固定回看天数 + 'min_days': 20, # 动态周期最小值 + 'max_days': 60, # 动态周期最大值 + 'premium_threshold': 5.0, # 溢价率阈值(%) + 'trade_cost': 0.001, # 单次交易成本(双边) + 'start_date': '2015-01-01', + 'benchmark': '000300.SH', # 基准:沪深300 +} + + +# ==================== 数据获取 ==================== +def fetch_all_etf_data(etf_codes: list, start_date: str, end_date: str, etf_pool: dict = None) -> dict: + """使用Tushare获取所有ETF的OHLCV数据""" + import os + import tushare as ts + + token = os.getenv("TUSHARE_TOKEN") + if not token: + raise ValueError("请设置环境变量 TUSHARE_TOKEN") + pro = ts.pro_api(token) + + # 需要额外前置数据用于ATR计算 + pre_start = (pd.Timestamp(start_date) - pd.Timedelta(days=120)).strftime('%Y%m%d') + end_str = end_date.replace('-', '') + + pool_names = etf_pool or {} + all_data = {} + for code in etf_codes: + print(f" 下载 {code} ({pool_names.get(code, '')})...", end=" ") + try: + df = pro.fund_daily( + ts_code=code, + start_date=pre_start, + end_date=end_str, + ) + if df is None or df.empty: + print("✗ 无数据") + continue + + df = df.rename(columns={'trade_date': 'date', 'vol': 'volume'}) + df['date'] = pd.to_datetime(df['date']) + df = df.set_index('date').sort_index() + df = df[['open', 'high', 'low', 'close', 'volume']].astype(float) + all_data[code] = df + print(f"✓ {len(df)} 条") + except Exception as e: + print(f"✗ {e}") + + return all_data + + +def fetch_etf_nav_data(etf_codes: list, start_date: str, end_date: str) -> dict: + """获取ETF净值数据(用于溢价率计算)""" + import os + import tushare as ts + + token = os.getenv("TUSHARE_TOKEN") + pro = ts.pro_api(token) + + pre_start = (pd.Timestamp(start_date) - pd.Timedelta(days=120)).strftime('%Y%m%d') + end_str = end_date.replace('-', '') + + nav_data = {} + for code in etf_codes: + try: + df = pro.fund_nav( + ts_code=code, + start_date=pre_start, + end_date=end_str, + ) + if df is not None and not df.empty: + df = df.rename(columns={'nav_date': 'date', 'unit_nav': 'nav'}) + df['date'] = pd.to_datetime(df['date']) + df = df.set_index('date').sort_index() + nav_data[code] = df[['nav']].astype(float) + except Exception: + pass + + return nav_data + + +# ==================== ATR计算 ==================== +def calc_atr(high: pd.Series, low: pd.Series, close: pd.Series, period: int) -> pd.Series: + """计算ATR(不依赖talib)""" + prev_close = close.shift(1) + tr = pd.concat([ + high - low, + (high - prev_close).abs(), + (low - prev_close).abs(), + ], axis=1).max(axis=1) + return tr.rolling(window=period, min_periods=period).mean() + + +# ==================== 核心得分计算 ==================== +def calc_weighted_momentum_score(prices: np.ndarray) -> dict: + """ + 加权线性回归动量得分 + + Args: + prices: 价格数组(含当日价格) + + Returns: + {'annualized_returns': float, 'r2': float, 'score': float} + """ + if len(prices) < 5: + return {'annualized_returns': 0, 'r2': 0, 'score': 0} + + y = np.log(prices) + x = np.arange(len(y)) + weights = np.linspace(1, 2, len(y)) # 近期权重更高 + + # 加权线性回归 + slope, intercept = np.polyfit(x, y, 1, w=weights) + annualized_returns = math.exp(slope * 250) - 1 + + # 加权R² + y_pred = slope * x + intercept + ss_res = np.sum(weights * (y - y_pred) ** 2) + ss_tot = np.sum(weights * (y - np.average(y, weights=weights)) ** 2) + r2 = 1 - ss_res / ss_tot if ss_tot > 0 else 0 + + score = annualized_returns * r2 + + return {'annualized_returns': annualized_returns, 'r2': r2, 'score': score} + + +def apply_crash_filter(prices: np.ndarray, score: float) -> float: + """崩盘过滤:连续3天有任一天跌>5%""" + if len(prices) < 4: + return score + + r1 = prices[-1] / prices[-2] + r2 = prices[-2] / prices[-3] + r3 = prices[-3] / prices[-4] + + # 条件1:任一天跌>5% + con1 = min(r1, r2, r3) < 0.95 + # 条件2:连续下跌且累计跌>5% + con2 = (r1 < 1) and (r2 < 1) and (r3 < 1) and (prices[-1] / prices[-4] < 0.95) + + if con1 or con2: + return 0.0 + return score + + +def calc_premium_rate(etf_price: float, nav: float) -> float: + """计算溢价率(%)""" + if nav is None or nav == 0 or np.isnan(nav): + return 0.0 + return (etf_price - nav) / nav * 100 + + +# ==================== 回测引擎 ==================== +def resolve_etf_pool(config: dict, ref_date: str = None, data_cache=None) -> dict: + """ + 解析ETF池配置: + - dict: 直接返回 + - 'auto': 调用筛选引擎构建 + - 'latest': 加载最近一次构建结果 + - 'dynamic': 用缓存数据在指定日期重建(无前视偏差) + """ + pool = config['etf_pool'] + if isinstance(pool, dict): + return pool + + from scripts.build_etf_universe import build_universe, load_latest_universe + + if pool == 'latest': + print("加载最近一次构建的动态ETF池...") + return load_latest_universe() + elif pool == 'auto': + print("使用筛选引擎构建动态ETF池...") + return build_universe() + elif pool == 'dynamic': + if data_cache is None: + from scripts.etf_data_cache import ETFDataCache + data_cache = ETFDataCache() + date_str = ref_date or datetime.now().strftime('%Y%m%d') + return build_universe(ref_date=date_str, data_cache=data_cache) + else: + raise ValueError(f"不支持的 etf_pool 配置: {pool}") + + +def run_backtest(config: dict): + """执行回测""" + end_date = datetime.now().strftime('%Y-%m-%d') + pool_mode = config['etf_pool'] if isinstance(config['etf_pool'], str) else '手动指定' + is_dynamic = (pool_mode == 'dynamic') + + # 动态模式: 初始化缓存 + data_cache = None + if is_dynamic: + from scripts.etf_data_cache import ETFDataCache + data_cache = ETFDataCache() + print("动态重建模式: 使用本地缓存数据,无前视偏差") + print(f" 重建间隔: {config['rebuild_interval']} 交易日") + + # 解析初始 ETF 池 + # 动态模式下用 start_date 作为初始重建日期 + init_ref_date = config['start_date'].replace('-', '') if is_dynamic else None + etf_pool = resolve_etf_pool(config, ref_date=init_ref_date, data_cache=data_cache) + etf_codes = list(etf_pool.keys()) + + print("=" * 60) + print(" ETF动量轮动策略 - 本地回测") + print("=" * 60) + print(f" ETF池模式: {pool_mode}") + print(f" 候选ETF: {len(etf_codes)} 只") + for code, name in etf_pool.items(): + print(f" {code} {name}") + print(f" 持仓数量: {config['target_num']}") + print(f" 动态周期: {'开启' if config['auto_day'] else '关闭'}") + if config['auto_day']: + print(f" 回看范围: {config['min_days']}~{config['max_days']} 天") + else: + print(f" 固定回看: {config['fixed_days']} 天") + print(f" 回测区间: {config['start_date']} ~ {end_date}") + + # 1. 获取数据 + print(f"\n{'='*60}") + if data_cache is not None: + print("从本地缓存加载ETF价格数据...") + all_data = {} + for code in etf_codes: + ohlcv = data_cache.load_cached_ohlcv(code) + if not ohlcv.empty: + all_data[code] = ohlcv + print(f" 加载完成: {len(all_data)} 只") + nav_data = {} # 动态模式下暂不用净值数据 + else: + print("下载ETF价格数据...") + all_data = fetch_all_etf_data(etf_codes, config['start_date'], end_date, etf_pool) + print("\n下载ETF净值数据...") + nav_data = fetch_etf_nav_data(etf_codes, config['start_date'], end_date) + print(f" 净值数据: {len(nav_data)} 只") + + if not all_data: + print("无数据,退出") + return + + # 2. 构建交易日历(以A股交易日为准) + all_dates = set() + for df in all_data.values(): + all_dates.update(df.index.tolist()) + trade_dates = sorted(all_dates) + trade_dates = [d for d in trade_dates if d >= pd.Timestamp(config['start_date'])] + + print(f"\n交易日数: {len(trade_dates)}") + print(f"区间: {trade_dates[0].strftime('%Y-%m-%d')} ~ {trade_dates[-1].strftime('%Y-%m-%d')}") + + # 3. 逐日回测 + print(f"\n{'='*60}") + print("开始回测...") + print("=" * 60) + + max_lookback = config['max_days'] + 10 + holding = None # 当前持仓ETF代码 + daily_returns = [] # 每日收益率 + signals = [] # 信号记录 + last_rebuild_i = -config['rebuild_interval'] # 确保第一天就重建 + + for i, today in enumerate(trade_dates): + # 动态重建 ETF 池 + if is_dynamic and (i - last_rebuild_i >= config['rebuild_interval']): + ref_str = today.strftime('%Y%m%d') + print(f"\n [重建] {ref_str}: 重新构建ETF池...") + try: + new_pool = resolve_etf_pool(config, ref_date=ref_str, data_cache=data_cache) + etf_codes = list(new_pool.keys()) + # 加载新增 ETF 的数据 + for code in etf_codes: + if code not in all_data and data_cache is not None: + ohlcv = data_cache.load_cached_ohlcv(code) + if not ohlcv.empty: + all_data[code] = ohlcv + print(f" [重建] 新池子: {len(etf_codes)} 只") + last_rebuild_i = i + except Exception as e: + print(f" [重建] 失败: {e},继续使用旧池") + + # 计算每只ETF的得分 + scores = {} + score_details = {} + + for code in etf_codes: + if code not in all_data: + continue + df = all_data[code] + + # 获取截至今日的历史数据 + hist = df[df.index <= today].tail(max_lookback + 1) + if len(hist) < config['min_days']: + continue + + close_arr = hist['close'].values + + if config['auto_day']: + # 动态周期:基于ATR波动率调整 + if len(hist) < max_lookback: + lookback = config['fixed_days'] + else: + long_atr = calc_atr(hist['high'], hist['low'], hist['close'], + config['max_days']) + short_atr = calc_atr(hist['high'], hist['low'], hist['close'], + config['min_days']) + la = long_atr.iloc[-1] + sa = short_atr.iloc[-1] + if la > 0 and not np.isnan(la) and not np.isnan(sa): + ratio = min(0.9, sa / la) + lookback = int(config['min_days'] + + (config['max_days'] - config['min_days']) * (1 - ratio)) + else: + lookback = config['fixed_days'] + + prices = close_arr[-lookback:] + else: + prices = close_arr[-config['fixed_days']:] + + if len(prices) < 5: + continue + + # 计算得分 + result = calc_weighted_momentum_score(prices) + score = result['score'] + + # 崩盘过滤 + score = apply_crash_filter(close_arr, score) + + # 溢价过滤 + if code in nav_data: + nav_df = nav_data[code] + nav_row = nav_df[nav_df.index <= today] + if not nav_row.empty: + nav_val = nav_row.iloc[-1]['nav'] + etf_price = close_arr[-1] + premium = calc_premium_rate(etf_price, nav_val) + if premium >= config['premium_threshold']: + score -= 1 + + # 只保留有效得分 (0 < score < 6) + if 0 < score < 6: + scores[code] = score + score_details[code] = result + + # 选出排名最高的标的 + if scores: + ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True) + target = ranked[0][0] # target_num=1 + else: + target = None + + # 计算当日收益 + if holding is not None and holding in all_data: + df_h = all_data[holding] + if today in df_h.index: + prev_dates = df_h[df_h.index < today].index + if len(prev_dates) > 0: + prev_date = prev_dates[-1] + prev_price = df_h.loc[prev_date, 'close'] + today_price = df_h.loc[today, 'close'] + daily_ret = today_price / prev_price - 1 + else: + daily_ret = 0.0 + else: + daily_ret = 0.0 + else: + daily_ret = 0.0 + + # 调仓成本 + trade_cost = 0.0 + if target != holding: + trade_cost = config['trade_cost'] + if holding is not None: + signals.append({ + 'date': today, 'action': '调仓', + 'from': holding, 'to': target or '空仓', + 'score': scores.get(target, 0) if target else 0, + }) + holding = target + + daily_returns.append({ + 'date': today, + 'daily_return': daily_ret - trade_cost if trade_cost > 0 else daily_ret, + 'holding': holding or '空仓', + }) + + # 4. 计算绩效 + result_df = pd.DataFrame(daily_returns).set_index('date') + result_df['nav'] = (1 + result_df['daily_return']).cumprod() + + # 基准数据 + benchmark_code = config['benchmark'] + print(f"\n获取基准数据 {benchmark_code}...") + import os, tushare as ts + pro = ts.pro_api(os.getenv("TUSHARE_TOKEN")) + bench_df = pro.index_daily( + ts_code=benchmark_code, + start_date=config['start_date'].replace('-', ''), + end_date=end_date.replace('-', ''), + ) + if bench_df is not None and not bench_df.empty: + bench_df['date'] = pd.to_datetime(bench_df['trade_date']) + bench_df = bench_df.set_index('date').sort_index() + bench_close = bench_df['close'].reindex(result_df.index, method='ffill') + result_df['bench_return'] = bench_close / bench_close.iloc[0] + else: + result_df['bench_return'] = 1.0 + + # 5. 输出绩效报告 + print_performance(result_df, signals, config) + + # 6. 年度收益统计 + print_yearly_returns(result_df) + + # 7. 生成图表 + save_chart(result_df, config) + + return result_df + + +# ==================== 绩效报告 ==================== +def print_performance(result_df: pd.DataFrame, signals: list, config: dict): + """打印绩效报告""" + nav = result_df['nav'] + total_return = nav.iloc[-1] / nav.iloc[0] - 1 + + # 年化收益 + days = (result_df.index[-1] - result_df.index[0]).days + cagr = (1 + total_return) ** (365 / days) - 1 if days > 0 else 0 + + # 夏普比率 + daily_rets = result_df['daily_return'] + sharpe = daily_rets.mean() / daily_rets.std() * np.sqrt(252) if daily_rets.std() > 0 else 0 + + # 最大回撤 + peak = nav.cummax() + drawdown = (nav - peak) / peak + max_dd = drawdown.min() + dd_end = drawdown.idxmin() + dd_start = nav[:dd_end].idxmax() + + # 日胜率 + win_rate = (daily_rets > 0).sum() / (daily_rets != 0).sum() if (daily_rets != 0).sum() > 0 else 0 + + # 基准收益 + bench_return = result_df['bench_return'].iloc[-1] - 1 + bench_cagr = (1 + bench_return) ** (365 / days) - 1 if days > 0 else 0 + + # 调仓次数 + n_trades = len(signals) + years = days / 365 + + # Calmar比率 + calmar = cagr / abs(max_dd) if max_dd != 0 else 0 + + print(f"\n{'='*70}") + print(f" 绩效评估报告") + print(f"{'='*70}") + print(f" 回测区间: {result_df.index[0].strftime('%Y-%m-%d')} ~ {result_df.index[-1].strftime('%Y-%m-%d')}") + print(f" 交易天数: {len(result_df)}") + print(f"{'─'*70}") + print(f" {'指标':<30s} {'动量策略':>12s} {'基准(沪深300)':>14s}") + print(f"{'─'*70}") + print(f" {'累计收益':<28s} {total_return:>11.2%} {bench_return:>13.2%}") + print(f" {'CAGR(年化)':<27s} {cagr:>11.2%} {bench_cagr:>13.2%}") + print(f" {'年化夏普比率':<26s} {sharpe:>11.2f} {'--':>13s}") + print(f" {'最大回撤':<28s} {max_dd:>11.2%} {'--':>13s}") + print(f" {'Calmar比率':<27s} {calmar:>11.2f} {'--':>13s}") + print(f" {'日胜率':<28s} {win_rate:>11.2%} {'--':>13s}") + print(f" {'调仓次数':<28s} {n_trades:>9d}次 {'--':>13s}") + if years > 0: + print(f" {'年均调仓':<28s} {n_trades/years:>9.1f}次 {'--':>13s}") + print(f" {'最大回撤区间':<26s} {dd_start.strftime('%Y-%m-%d')} ~ {dd_end.strftime('%Y-%m-%d')}") + print(f"{'='*70}") + + # 最新持仓信号 + last_row = result_df.iloc[-1] + print(f"\n 最新持仓: {last_row['holding']}", end="") + if last_row['holding'] != '空仓': + pool = config['etf_pool'] if isinstance(config['etf_pool'], dict) else {} + name = pool.get(last_row['holding'], '') + print(f" ({name})", end="") + print(f"\n 最新净值: {last_row['nav']:.4f}") + + +# ==================== 年度收益统计 ==================== +def print_yearly_returns(result_df: pd.DataFrame): + """按年统计收益""" + nav = result_df['nav'] + bench = result_df['bench_return'] + + # 按年分组 + yearly_data = [] + for year, group in result_df.groupby(result_df.index.year): + year_nav = group['nav'] + year_ret = year_nav.iloc[-1] / year_nav.iloc[0] - 1 + + year_bench = group['bench_return'] + bench_ret = year_bench.iloc[-1] / year_bench.iloc[0] - 1 + + # 年内最大回撤 + peak = year_nav.cummax() + dd = (year_nav - peak) / peak + max_dd = dd.min() + + # 年内夏普 + daily_rets = group['daily_return'] + sharpe = daily_rets.mean() / daily_rets.std() * np.sqrt(252) if daily_rets.std() > 0 else 0 + + # 超额收益 + excess = year_ret - bench_ret + + yearly_data.append({ + 'year': year, + 'return': year_ret, + 'bench_return': bench_ret, + 'excess': excess, + 'max_dd': max_dd, + 'sharpe': sharpe, + 'trade_days': len(group), + }) + + print(f"\n{'='*90}") + print(f" 年度收益统计") + print(f"{'='*90}") + print(f" {'年份':<6s} {'策略收益':>10s} {'基准收益':>10s} {'超额收益':>10s} {'最大回撤':>10s} {'夏普比率':>10s} {'交易天数':>10s}") + print(f"{'─'*90}") + + for d in yearly_data: + print(f" {d['year']:<6d} {d['return']:>9.2%} {d['bench_return']:>9.2%} {d['excess']:>9.2%} {d['max_dd']:>9.2%} {d['sharpe']:>9.2f} {d['trade_days']:>8d}") + + print(f"{'─'*90}") + + # 汇总 + total_ret = nav.iloc[-1] / nav.iloc[0] - 1 + total_bench = bench.iloc[-1] / bench.iloc[0] - 1 + win_years = sum(1 for d in yearly_data if d['return'] > 0) + beat_years = sum(1 for d in yearly_data if d['excess'] > 0) + total_years = len(yearly_data) + + print(f" {'合计':<6s} {total_ret:>9.2%} {total_bench:>9.2%} {total_ret - total_bench:>9.2%}") + print(f" 盈利年份: {win_years}/{total_years} | 跑赢基准年份: {beat_years}/{total_years}") + print(f"{'='*90}") + + +# ==================== 图表生成 ==================== +def save_chart(result_df: pd.DataFrame, config: dict): + """生成净值曲线图""" + try: + import matplotlib + matplotlib.use('Agg') + import matplotlib.pyplot as plt + matplotlib.rcParams['font.sans-serif'] = ['Arial Unicode MS', 'SimHei', 'DejaVu Sans'] + matplotlib.rcParams['axes.unicode_minus'] = False + + fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 8), height_ratios=[3, 1], + gridspec_kw={'hspace': 0.3}) + + # 净值曲线 + ax1.plot(result_df.index, result_df['nav'], label='动量策略', linewidth=1.5, color='#e74c3c') + ax1.plot(result_df.index, result_df['bench_return'], label='沪深300', linewidth=1, color='#95a5a6') + ax1.set_title('ETF动量轮动策略 净值曲线', fontsize=14, fontweight='bold') + ax1.legend(loc='upper left') + ax1.grid(True, alpha=0.3) + ax1.set_ylabel('净值') + + # 回撤曲线 + peak = result_df['nav'].cummax() + drawdown = (result_df['nav'] - peak) / peak + ax2.fill_between(result_df.index, drawdown, 0, alpha=0.4, color='#e74c3c') + ax2.set_title('回撤', fontsize=12) + ax2.set_ylabel('回撤') + ax2.grid(True, alpha=0.3) + + chart_path = Path(__file__).parent / 'results' / 'momentum_chart.png' + chart_path.parent.mkdir(exist_ok=True) + fig.savefig(chart_path, dpi=150, bbox_inches='tight') + plt.close(fig) + print(f"\n报告图表已保存: {chart_path}") + except Exception as e: + print(f"\n图表生成失败: {e}") + + +# ==================== 主入口 ==================== +if __name__ == "__main__": + run_backtest(CONFIG)