Files
etf/动量.py
aszerW 2829f80427 feat(backtest): 消除前视偏差,实现动态ETF池重建
消除回测前视偏差(Look-Ahead Bias):
- 新增 ETFDataCache 本地缓存系统,预下载全量ETF(含已退市)基础信息和日线数据
- 改造 ETFUniverseBuilder 支持纯历史模式,每个时间点只使用当时可获得的数据
- 动量.py 新增 dynamic 模式,回测中每60交易日动态重建ETF候选池
- momentum_experiment.py 同步支持动态重建
- 新增 ETF筛选引擎文档和动态池方案文档

无前视偏差实验结果(6组对比,2015-2026):
  A: 全仓1只       CAGR=3.32%, MaxDD=-63.19%, Sharpe=0.26
  B: 等权3只       CAGR=3.40%, MaxDD=-49.72%, Sharpe=0.30 ← 最优
  C: 反波动率3只   CAGR=1.73%, MaxDD=-38.59%, Sharpe=0.21
  D: 等权5只       CAGR=2.77%, MaxDD=-42.39%, Sharpe=0.29
  E: 反波动率5只   CAGR=-0.37%, MaxDD=-19.56%, Sharpe=-0.03
  F: 动量>0全选等权 CAGR=2.02%, MaxDD=-43.27%, Sharpe=0.24

最优方案: B(等权3只)夏普、Calmar、CAGR三项均最高
2026-04-29 22:15:01 +08:00

628 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
ETF动量轮动策略 - 本地回测版本
原始策略来源:聚宽 https://www.joinquant.com/post/1399
核心逻辑:
1. 加权线性回归(权重1→2递增)计算趋势得分
2. score = 年化收益率 × R²
3. ATR动态调整回看窗口(20~60天)
4. 崩盘过滤: 连续3天任一天跌>5%则得分归零
5. 溢价过滤: 溢价率≥5%则降权
6. 全仓单一品种轮动
"""
import sys
import math
import warnings
from pathlib import Path
from datetime import datetime
import numpy as np
import pandas as pd
warnings.filterwarnings("ignore")
# 添加项目根目录
sys.path.insert(0, str(Path(__file__).parent))
from dotenv import load_dotenv
load_dotenv()
# ==================== 策略配置 ====================
# Strategy parameters; passed to run_backtest by the script entry point.
CONFIG = {
    # Candidate ETF pool (interpreted by resolve_etf_pool):
    # - dict: manually specified {ts_code: name}
    # - 'auto': build the pool with the screening engine
    # - 'latest': load the most recently built pool
    # - 'dynamic': rebuild periodically during the backtest, no look-ahead bias
    'etf_pool': 'dynamic',
    'rebuild_interval': 60,  # dynamic-pool rebuild interval (trading days)
    'target_num': 1,  # number of holdings; run_backtest only prints this and always holds the single top-ranked ETF
    'auto_day': True,  # whether to use the ATR-driven dynamic lookback window
    'fixed_days': 25,  # fixed lookback length; also the fallback when the dynamic window cannot be computed
    'min_days': 20,  # lower bound of the dynamic lookback window
    'max_days': 60,  # upper bound of the dynamic lookback window
    'premium_threshold': 5.0,  # premium-rate threshold (%); at or above it the score is reduced by 1
    'trade_cost': 0.001,  # cost deducted on each position change (covers both sides)
    'start_date': '2015-01-01',
    'benchmark': '000300.SH',  # benchmark index: CSI 300
}
# ==================== 数据获取 ====================
def fetch_all_etf_data(etf_codes: list, start_date: str, end_date: str, etf_pool: dict = None) -> dict:
    """Download daily OHLCV bars for every requested ETF through Tushare.

    Args:
        etf_codes: Tushare codes to download.
        start_date: Backtest start, 'YYYY-MM-DD'. The query begins 120
            calendar days earlier so lookback/ATR windows have history.
        end_date: Last date to request, 'YYYY-MM-DD'.
        etf_pool: Optional {code: name} mapping, used only for progress output.

    Returns:
        {code: DataFrame} indexed by ascending date with float columns
        open/high/low/close/volume. Codes that return no rows or fail are
        reported on stdout and omitted.

    Raises:
        ValueError: If the TUSHARE_TOKEN environment variable is not set.
    """
    import os
    import tushare as ts

    api_token = os.getenv("TUSHARE_TOKEN")
    if not api_token:
        raise ValueError("请设置环境变量 TUSHARE_TOKEN")
    client = ts.pro_api(api_token)

    # Pull extra leading history for the ATR / lookback calculations.
    warmup_start = pd.Timestamp(start_date) - pd.Timedelta(days=120)
    query_start = warmup_start.strftime('%Y%m%d')
    query_end = end_date.replace('-', '')
    names = etf_pool or {}
    ohlcv_columns = ['open', 'high', 'low', 'close', 'volume']

    bars_by_code = {}
    for code in etf_codes:
        print(f" 下载 {code} ({names.get(code, '')})...", end=" ")
        try:
            raw = client.fund_daily(
                ts_code=code,
                start_date=query_start,
                end_date=query_end,
            )
            if raw is None or raw.empty:
                print("✗ 无数据")
                continue
            bars = raw.rename(columns={'trade_date': 'date', 'vol': 'volume'})
            bars['date'] = pd.to_datetime(bars['date'])
            bars = bars.set_index('date').sort_index()
            bars = bars[ohlcv_columns].astype(float)
            bars_by_code[code] = bars
            print(f"{len(bars)}")
        except Exception as e:
            # Best effort: report the failure and move on to the next code.
            print(f"{e}")
    return bars_by_code
def fetch_etf_nav_data(etf_codes: list, start_date: str, end_date: str) -> dict:
    """Download unit-NAV history per ETF, used for the premium-rate filter.

    Args:
        etf_codes: Tushare codes to download.
        start_date: Backtest start, 'YYYY-MM-DD'; the query begins 120
            calendar days earlier.
        end_date: Last date to request, 'YYYY-MM-DD'.

    Returns:
        {code: DataFrame} indexed by ascending date with a single float
        column 'nav'. Codes with no rows, or whose download/parsing fails,
        are omitted; failures are reported on stdout instead of being
        silently discarded.
    """
    import os
    import tushare as ts

    # NOTE(review): unlike fetch_all_etf_data, the token is not validated here.
    token = os.getenv("TUSHARE_TOKEN")
    pro = ts.pro_api(token)
    pre_start = (pd.Timestamp(start_date) - pd.Timedelta(days=120)).strftime('%Y%m%d')
    end_str = end_date.replace('-', '')

    nav_data = {}
    for code in etf_codes:
        try:
            df = pro.fund_nav(
                ts_code=code,
                start_date=pre_start,
                end_date=end_str,
            )
            if df is not None and not df.empty:
                df = df.rename(columns={'nav_date': 'date', 'unit_nav': 'nav'})
                df['date'] = pd.to_datetime(df['date'])
                df = df.set_index('date').sort_index()
                nav_data[code] = df[['nav']].astype(float)
        except Exception as e:
            # Still best effort (the premium filter simply skips this code),
            # but surface the reason so missing NAV data is not invisible.
            print(f" 净值获取失败 {code}: {e}")
    return nav_data
# ==================== ATR计算 ====================
def calc_atr(high: pd.Series, low: pd.Series, close: pd.Series, period: int) -> pd.Series:
    """Return the average true range as a simple rolling mean (no talib needed).

    The true range of a bar is the largest of: high - low, |high - previous
    close| and |low - previous close|. The first bar has no previous close,
    so only high - low contributes there. The result is NaN until `period`
    true-range values are available.
    """
    previous_close = close.shift(1)
    range_candidates = [
        high - low,
        (high - previous_close).abs(),
        (low - previous_close).abs(),
    ]
    # Row-wise maximum; NaN candidates are skipped.
    true_range = pd.concat(range_candidates, axis=1).max(axis=1)
    return true_range.rolling(window=period, min_periods=period).mean()
# ==================== 核心得分计算 ====================
def calc_weighted_momentum_score(prices: np.ndarray) -> dict:
    """Score a price window by weighted log-linear trend strength.

    A straight line is fitted to log(price) against the bar index, with
    weights rising linearly from 1 (oldest bar) to 2 (newest bar). The slope
    is annualised over 250 trading days and multiplied by a weighted R² so
    that noisy trends score lower.

    Args:
        prices: Price window in chronological order, including today's price.

    Returns:
        {'annualized_returns': float, 'r2': float, 'score': float}.
        All three are 0 when the window has fewer than 5 prices or contains a
        non-finite or non-positive price (log is undefined there and the
        regression could otherwise raise or yield NaN).
    """
    prices = np.asarray(prices, dtype=float)
    if len(prices) < 5:
        return {'annualized_returns': 0, 'r2': 0, 'score': 0}
    # Guard against bad bars (NaN, zero or negative prices) before taking logs.
    if not np.all(np.isfinite(prices)) or np.any(prices <= 0):
        return {'annualized_returns': 0, 'r2': 0, 'score': 0}

    y = np.log(prices)
    x = np.arange(len(y))
    weights = np.linspace(1, 2, len(y))  # more recent bars weigh more

    # Weighted linear regression. NOTE: np.polyfit applies `w` to the
    # unsquared residuals, whereas the R² below weights squared residuals by
    # `w` directly; this mirrors the original strategy and is kept as is.
    slope, intercept = np.polyfit(x, y, 1, w=weights)
    annualized_returns = math.exp(slope * 250) - 1

    # Weighted R²
    y_pred = slope * x + intercept
    ss_res = np.sum(weights * (y - y_pred) ** 2)
    ss_tot = np.sum(weights * (y - np.average(y, weights=weights)) ** 2)
    r2 = 1 - ss_res / ss_tot if ss_tot > 0 else 0

    score = annualized_returns * r2
    return {'annualized_returns': annualized_returns, 'r2': r2, 'score': score}
def apply_crash_filter(prices: np.ndarray, score: float) -> float:
    """Zero the score when the last three sessions look like a crash.

    Two triggers, either of which returns 0.0:
      1. any of the last three day-over-day moves is a drop of more than 5%;
      2. all three moves are declines and the cumulative move over the three
         sessions is a drop of more than 5%.

    With fewer than 4 prices the score is returned unchanged.
    """
    if len(prices) < 4:
        return score

    # Day-over-day price ratios for the three latest sessions, newest first.
    ratios = tuple(prices[-k] / prices[-k - 1] for k in (1, 2, 3))

    # Trigger 1: a single-day drop of more than 5%.
    if min(ratios) < 0.95:
        return 0.0

    # Trigger 2: three straight declines adding up to more than 5%.
    steady_slide = all(r < 1 for r in ratios) and (prices[-1] / prices[-4] < 0.95)
    return 0.0 if steady_slide else score
def calc_premium_rate(etf_price: float, nav: float) -> float:
    """Return the ETF's premium over its NAV in percent.

    A missing, zero or NaN NAV yields 0.0 (treated as no premium).
    """
    if nav is None:
        return 0.0
    if nav == 0 or np.isnan(nav):
        return 0.0
    return (etf_price - nav) / nav * 100
# ==================== 回测引擎 ====================
def resolve_etf_pool(config: dict, ref_date: str = None, data_cache=None) -> dict:
    """Turn config['etf_pool'] into a concrete ETF pool.

    Supported settings:
      - dict: returned unchanged.
      - 'latest': load the most recently built universe.
      - 'auto': build a universe with the screening engine.
      - 'dynamic': rebuild from cached data as of `ref_date` (no look-ahead
        bias); `ref_date` ('YYYYMMDD') defaults to today and `data_cache`
        defaults to a new ETFDataCache.

    Raises:
        ValueError: For any other setting.
    """
    pool_spec = config['etf_pool']
    if isinstance(pool_spec, dict):
        return pool_spec

    from scripts.build_etf_universe import build_universe, load_latest_universe

    if pool_spec == 'latest':
        print("加载最近一次构建的动态ETF池...")
        return load_latest_universe()

    if pool_spec == 'auto':
        print("使用筛选引擎构建动态ETF池...")
        return build_universe()

    if pool_spec != 'dynamic':
        raise ValueError(f"不支持的 etf_pool 配置: {pool_spec}")

    cache = data_cache
    if cache is None:
        from scripts.etf_data_cache import ETFDataCache
        cache = ETFDataCache()
    as_of = ref_date or datetime.now().strftime('%Y%m%d')
    return build_universe(ref_date=as_of, data_cache=cache)
def run_backtest(config: dict):
    """Run the daily momentum-rotation backtest, then print and plot results.

    For each trading day the candidate ETFs are scored on data up to and
    including that day; the top-scoring ETF becomes the holding. The return
    booked on a day is the close-to-close move of the ETF held coming into
    that day, so a position chosen today starts earning on the next trading
    day. A trade cost is deducted on every day the holding changes.

    In 'dynamic' pool mode, prices come from the local cache, the pool is
    rebuilt every config['rebuild_interval'] trading days, and no NAV data is
    loaded (so the premium filter is inactive).

    Args:
        config: Strategy settings with the keys used in CONFIG.

    Returns:
        DataFrame indexed by date with columns daily_return, holding, nav and
        bench_return, or None when no price data or no trading day on or
        after config['start_date'] is available.
    """
    end_date = datetime.now().strftime('%Y-%m-%d')
    pool_mode = config['etf_pool'] if isinstance(config['etf_pool'], str) else '手动指定'
    is_dynamic = (pool_mode == 'dynamic')

    # Dynamic mode: initialise the local data cache.
    data_cache = None
    if is_dynamic:
        from scripts.etf_data_cache import ETFDataCache
        data_cache = ETFDataCache()
        print("动态重建模式: 使用本地缓存数据,无前视偏差")
        print(f" 重建间隔: {config['rebuild_interval']} 交易日")

    # Resolve the initial pool; dynamic mode builds it as of start_date.
    init_ref_date = config['start_date'].replace('-', '') if is_dynamic else None
    etf_pool = resolve_etf_pool(config, ref_date=init_ref_date, data_cache=data_cache)
    etf_codes = list(etf_pool.keys())

    print("=" * 60)
    print(" ETF动量轮动策略 - 本地回测")
    print("=" * 60)
    print(f" ETF池模式: {pool_mode}")
    print(f" 候选ETF: {len(etf_codes)}")
    for code, name in etf_pool.items():
        print(f" {code} {name}")
    print(f" 持仓数量: {config['target_num']}")
    print(f" 动态周期: {'开启' if config['auto_day'] else '关闭'}")
    if config['auto_day']:
        print(f" 回看范围: {config['min_days']}~{config['max_days']}")
    else:
        print(f" 固定回看: {config['fixed_days']}")
    print(f" 回测区间: {config['start_date']} ~ {end_date}")

    # 1. Load price (and, outside dynamic mode, NAV) data.
    print(f"\n{'='*60}")
    if data_cache is not None:
        print("从本地缓存加载ETF价格数据...")
        all_data = {}
        for code in etf_codes:
            ohlcv = data_cache.load_cached_ohlcv(code)
            if not ohlcv.empty:
                all_data[code] = ohlcv
        print(f" 加载完成: {len(all_data)}")
        nav_data = {}  # NAV data is not used in dynamic mode for now
    else:
        print("下载ETF价格数据...")
        all_data = fetch_all_etf_data(etf_codes, config['start_date'], end_date, etf_pool)
        print("\n下载ETF净值数据...")
        nav_data = fetch_etf_nav_data(etf_codes, config['start_date'], end_date)
        print(f" 净值数据: {len(nav_data)}")
    if not all_data:
        print("无数据,退出")
        return

    # 2. Build the trading calendar from the union of all loaded bar dates.
    all_dates = set()
    for df in all_data.values():
        all_dates.update(df.index.tolist())
    trade_dates = sorted(all_dates)
    trade_dates = [d for d in trade_dates if d >= pd.Timestamp(config['start_date'])]
    if not trade_dates:
        # Every loaded bar predates start_date; indexing trade_dates[0]
        # below would otherwise raise IndexError.
        print("无数据,退出")
        return
    print(f"\n交易日数: {len(trade_dates)}")
    print(f"区间: {trade_dates[0].strftime('%Y-%m-%d')} ~ {trade_dates[-1].strftime('%Y-%m-%d')}")

    # 3. Day-by-day simulation.
    print(f"\n{'='*60}")
    print("开始回测...")
    print("=" * 60)
    max_lookback = config['max_days'] + 10
    holding = None  # code of the ETF currently held
    daily_returns = []  # one record per trading day
    signals = []  # rebalance log
    last_rebuild_i = -config['rebuild_interval']  # forces a rebuild on day 0

    for i, today in enumerate(trade_dates):
        # Periodically rebuild the ETF pool (dynamic mode only).
        if is_dynamic and (i - last_rebuild_i >= config['rebuild_interval']):
            ref_str = today.strftime('%Y%m%d')
            print(f"\n [重建] {ref_str}: 重新构建ETF池...")
            try:
                new_pool = resolve_etf_pool(config, ref_date=ref_str, data_cache=data_cache)
                etf_codes = list(new_pool.keys())
                # Load bars for ETFs that were not in any earlier pool.
                for code in etf_codes:
                    if code not in all_data and data_cache is not None:
                        ohlcv = data_cache.load_cached_ohlcv(code)
                        if not ohlcv.empty:
                            all_data[code] = ohlcv
                print(f" [重建] 新池子: {len(etf_codes)}")
                # Only advance the marker on success, so a failed rebuild is
                # retried on the next trading day.
                last_rebuild_i = i
            except Exception as e:
                print(f" [重建] 失败: {e},继续使用旧池")

        # Score every candidate ETF.
        scores = {}
        for code in etf_codes:
            if code not in all_data:
                continue
            df = all_data[code]
            # History up to and including today.
            hist = df[df.index <= today].tail(max_lookback + 1)
            if len(hist) < config['min_days']:
                continue
            close_arr = hist['close'].values

            if config['auto_day']:
                # Dynamic window: shorter lookback when short-term ATR is
                # high relative to long-term ATR.
                if len(hist) < max_lookback:
                    lookback = config['fixed_days']
                else:
                    long_atr = calc_atr(hist['high'], hist['low'], hist['close'],
                                        config['max_days'])
                    short_atr = calc_atr(hist['high'], hist['low'], hist['close'],
                                         config['min_days'])
                    la = long_atr.iloc[-1]
                    sa = short_atr.iloc[-1]
                    if la > 0 and not np.isnan(la) and not np.isnan(sa):
                        ratio = min(0.9, sa / la)
                        lookback = int(config['min_days'] +
                                       (config['max_days'] - config['min_days']) * (1 - ratio))
                    else:
                        lookback = config['fixed_days']
                prices = close_arr[-lookback:]
            else:
                prices = close_arr[-config['fixed_days']:]
            if len(prices) < 5:
                continue

            # Momentum score.
            result = calc_weighted_momentum_score(prices)
            score = result['score']

            # Crash filter (uses the full history window, not just `prices`).
            score = apply_crash_filter(close_arr, score)

            # Premium filter: penalise ETFs trading well above NAV.
            if code in nav_data:
                nav_df = nav_data[code]
                nav_row = nav_df[nav_df.index <= today]
                if not nav_row.empty:
                    nav_val = nav_row.iloc[-1]['nav']
                    etf_price = close_arr[-1]
                    premium = calc_premium_rate(etf_price, nav_val)
                    if premium >= config['premium_threshold']:
                        score -= 1

            # Keep only valid scores (0 < score < 6).
            if 0 < score < 6:
                scores[code] = score

        # Pick the top-ranked ETF (target_num is effectively 1).
        if scores:
            ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True)
            target = ranked[0][0]
        else:
            target = None

        # Today's return comes from the ETF held coming into today.
        if holding is not None and holding in all_data:
            df_h = all_data[holding]
            if today in df_h.index:
                prev_dates = df_h[df_h.index < today].index
                if len(prev_dates) > 0:
                    prev_date = prev_dates[-1]
                    prev_price = df_h.loc[prev_date, 'close']
                    today_price = df_h.loc[today, 'close']
                    daily_ret = today_price / prev_price - 1
                else:
                    daily_ret = 0.0
            else:
                daily_ret = 0.0
        else:
            daily_ret = 0.0

        # Rebalance cost; the first purchase is charged but not logged as a signal.
        trade_cost = 0.0
        if target != holding:
            trade_cost = config['trade_cost']
            if holding is not None:
                signals.append({
                    'date': today, 'action': '调仓',
                    'from': holding, 'to': target or '空仓',
                    'score': scores.get(target, 0) if target else 0,
                })
            holding = target

        daily_returns.append({
            'date': today,
            'daily_return': daily_ret - trade_cost if trade_cost > 0 else daily_ret,
            'holding': holding or '空仓',
        })

    # 4. Performance series.
    result_df = pd.DataFrame(daily_returns).set_index('date')
    result_df['nav'] = (1 + result_df['daily_return']).cumprod()

    # Benchmark, normalised to 1.0 on the first backtest day.
    benchmark_code = config['benchmark']
    print(f"\n获取基准数据 {benchmark_code}...")
    import os
    import tushare as ts
    pro = ts.pro_api(os.getenv("TUSHARE_TOKEN"))
    bench_df = pro.index_daily(
        ts_code=benchmark_code,
        start_date=config['start_date'].replace('-', ''),
        end_date=end_date.replace('-', ''),
    )
    if bench_df is not None and not bench_df.empty:
        bench_df['date'] = pd.to_datetime(bench_df['trade_date'])
        bench_df = bench_df.set_index('date').sort_index()
        bench_close = bench_df['close'].reindex(result_df.index, method='ffill')
        result_df['bench_return'] = bench_close / bench_close.iloc[0]
    else:
        result_df['bench_return'] = 1.0

    # 5. Performance report.
    print_performance(result_df, signals, config)
    # 6. Per-year statistics.
    print_yearly_returns(result_df)
    # 7. Chart.
    save_chart(result_df, config)
    return result_df
# ==================== 绩效报告 ====================
def print_performance(result_df: pd.DataFrame, signals: list, config: dict):
    """Print the headline performance report for a finished backtest.

    Args:
        result_df: Date-indexed frame with columns 'nav', 'daily_return',
            'bench_return' (benchmark normalised to its first value) and
            'holding'.
        signals: Rebalance records; only their count is used.
        config: Strategy settings; a dict-valued 'etf_pool' supplies the
            display name of the latest holding.

    Notes:
        - Total return is measured from the first NAV value to the last.
        - CAGR uses calendar days / 365; Sharpe uses sqrt(252) and no
          risk-free rate.
        - The daily win rate ignores days with a zero return.
    """
    nav = result_df['nav']
    total_return = nav.iloc[-1] / nav.iloc[0] - 1

    # Annualised return over calendar days.
    days = (result_df.index[-1] - result_df.index[0]).days
    cagr = (1 + total_return) ** (365 / days) - 1 if days > 0 else 0

    # Sharpe ratio.
    daily_rets = result_df['daily_return']
    sharpe = daily_rets.mean() / daily_rets.std() * np.sqrt(252) if daily_rets.std() > 0 else 0

    # Maximum drawdown and the peak-to-trough dates that produced it.
    peak = nav.cummax()
    drawdown = (nav - peak) / peak
    max_dd = drawdown.min()
    dd_end = drawdown.idxmin()
    dd_start = nav[:dd_end].idxmax()

    # Daily win rate over days with a non-zero return.
    active_days = (daily_rets != 0).sum()
    win_rate = (daily_rets > 0).sum() / active_days if active_days > 0 else 0

    # Benchmark return.
    bench_return = result_df['bench_return'].iloc[-1] - 1
    bench_cagr = (1 + bench_return) ** (365 / days) - 1 if days > 0 else 0

    # Rebalance count.
    n_trades = len(signals)
    years = days / 365

    # Calmar ratio.
    calmar = cagr / abs(max_dd) if max_dd != 0 else 0

    print(f"\n{'='*70}")
    print(f" 绩效评估报告")
    print(f"{'='*70}")
    print(f" 回测区间: {result_df.index[0].strftime('%Y-%m-%d')} ~ {result_df.index[-1].strftime('%Y-%m-%d')}")
    print(f" 交易天数: {len(result_df)}")
    # The divider character was empty, so these lines printed nothing.
    print(f"{'-'*70}")
    print(f" {'指标':<30s} {'动量策略':>12s} {'基准(沪深300)':>14s}")
    print(f"{'-'*70}")
    print(f" {'累计收益':<28s} {total_return:>11.2%} {bench_return:>13.2%}")
    print(f" {'CAGR(年化)':<27s} {cagr:>11.2%} {bench_cagr:>13.2%}")
    print(f" {'年化夏普比率':<26s} {sharpe:>11.2f} {'--':>13s}")
    print(f" {'最大回撤':<28s} {max_dd:>11.2%} {'--':>13s}")
    print(f" {'Calmar比率':<27s} {calmar:>11.2f} {'--':>13s}")
    print(f" {'日胜率':<28s} {win_rate:>11.2%} {'--':>13s}")
    print(f" {'调仓次数':<28s} {n_trades:>9d}{'--':>13s}")
    if years > 0:
        print(f" {'年均调仓':<28s} {n_trades/years:>9.1f}{'--':>13s}")
    print(f" {'最大回撤区间':<26s} {dd_start.strftime('%Y-%m-%d')} ~ {dd_end.strftime('%Y-%m-%d')}")
    print(f"{'='*70}")

    # Latest holding.
    last_row = result_df.iloc[-1]
    print(f"\n 最新持仓: {last_row['holding']}", end="")
    if last_row['holding'] != '空仓':
        pool = config['etf_pool'] if isinstance(config['etf_pool'], dict) else {}
        name = pool.get(last_row['holding'], '')
        # Names are only known for a manually specified pool; skip the
        # parentheses instead of printing an empty "()".
        if name:
            print(f" ({name})", end="")
    print(f"\n 最新净值: {last_row['nav']:.4f}")
# ==================== 年度收益统计 ====================
def print_yearly_returns(result_df: pd.DataFrame):
    """Print per-calendar-year return, drawdown and Sharpe statistics.

    Args:
        result_df: Date-indexed (ascending) frame with columns 'nav',
            'bench_return' and 'daily_return'.

    Notes:
        A year's return is measured from the previous year's closing value,
        so the first trading day of the year is included and the yearly
        figures compound to the printed total. The first year has no prior
        close and is measured from its own first value, matching the total
        row. Drawdown and Sharpe are computed within the year only.
    """
    nav = result_df['nav']
    bench = result_df['bench_return']

    yearly_data = []
    prev_nav_close = None    # strategy NAV at the end of the previous year
    prev_bench_close = None  # benchmark level at the end of the previous year
    for year, group in result_df.groupby(result_df.index.year):
        year_nav = group['nav']
        year_bench = group['bench_return']

        nav_base = year_nav.iloc[0] if prev_nav_close is None else prev_nav_close
        bench_base = year_bench.iloc[0] if prev_bench_close is None else prev_bench_close
        year_ret = year_nav.iloc[-1] / nav_base - 1
        bench_ret = year_bench.iloc[-1] / bench_base - 1

        # Maximum drawdown within the year.
        peak = year_nav.cummax()
        dd = (year_nav - peak) / peak
        max_dd = dd.min()

        # Sharpe ratio within the year.
        daily_rets = group['daily_return']
        sharpe = daily_rets.mean() / daily_rets.std() * np.sqrt(252) if daily_rets.std() > 0 else 0

        yearly_data.append({
            'year': year,
            'return': year_ret,
            'bench_return': bench_ret,
            'excess': year_ret - bench_ret,
            'max_dd': max_dd,
            'sharpe': sharpe,
            'trade_days': len(group),
        })
        prev_nav_close = year_nav.iloc[-1]
        prev_bench_close = year_bench.iloc[-1]

    print(f"\n{'='*90}")
    print(f" 年度收益统计")
    print(f"{'='*90}")
    print(f" {'年份':<6s} {'策略收益':>10s} {'基准收益':>10s} {'超额收益':>10s} {'最大回撤':>10s} {'夏普比率':>10s} {'交易天数':>10s}")
    # The divider character was empty, so these lines printed nothing.
    print(f"{'-'*90}")
    for d in yearly_data:
        print(f" {d['year']:<6d} {d['return']:>9.2%} {d['bench_return']:>9.2%} {d['excess']:>9.2%} {d['max_dd']:>9.2%} {d['sharpe']:>9.2f} {d['trade_days']:>8d}")
    print(f"{'-'*90}")

    # Totals.
    total_ret = nav.iloc[-1] / nav.iloc[0] - 1
    total_bench = bench.iloc[-1] / bench.iloc[0] - 1
    win_years = sum(1 for d in yearly_data if d['return'] > 0)
    beat_years = sum(1 for d in yearly_data if d['excess'] > 0)
    total_years = len(yearly_data)
    print(f" {'合计':<6s} {total_ret:>9.2%} {total_bench:>9.2%} {total_ret - total_bench:>9.2%}")
    print(f" 盈利年份: {win_years}/{total_years} | 跑赢基准年份: {beat_years}/{total_years}")
    print(f"{'='*90}")
# ==================== 图表生成 ====================
def save_chart(result_df: pd.DataFrame, config: dict):
    """Render the NAV and drawdown chart to results/momentum_chart.png.

    The image is written next to this file, under a 'results' directory that
    is created if missing. Any failure (including matplotlib being
    unavailable) is reported on stdout rather than raised. `config` is
    accepted but not used.
    """
    try:
        import matplotlib
        matplotlib.use('Agg')  # select the Agg backend before importing pyplot
        import matplotlib.pyplot as plt

        matplotlib.rcParams['font.sans-serif'] = ['Arial Unicode MS', 'SimHei', 'DejaVu Sans']
        matplotlib.rcParams['axes.unicode_minus'] = False

        figure, (nav_ax, dd_ax) = plt.subplots(
            2, 1,
            figsize=(14, 8),
            height_ratios=[3, 1],
            gridspec_kw={'hspace': 0.3},
        )
        dates = result_df.index

        # Upper panel: strategy NAV against the benchmark.
        nav_ax.plot(dates, result_df['nav'], label='动量策略', linewidth=1.5, color='#e74c3c')
        nav_ax.plot(dates, result_df['bench_return'], label='沪深300', linewidth=1, color='#95a5a6')
        nav_ax.set_title('ETF动量轮动策略 净值曲线', fontsize=14, fontweight='bold')
        nav_ax.legend(loc='upper left')
        nav_ax.grid(True, alpha=0.3)
        nav_ax.set_ylabel('净值')

        # Lower panel: drawdown from the running NAV peak.
        running_peak = result_df['nav'].cummax()
        underwater = (result_df['nav'] - running_peak) / running_peak
        dd_ax.fill_between(dates, underwater, 0, alpha=0.4, color='#e74c3c')
        dd_ax.set_title('回撤', fontsize=12)
        dd_ax.set_ylabel('回撤')
        dd_ax.grid(True, alpha=0.3)

        chart_path = Path(__file__).parent / 'results' / 'momentum_chart.png'
        chart_path.parent.mkdir(exist_ok=True)
        figure.savefig(chart_path, dpi=150, bbox_inches='tight')
        plt.close(figure)
        print(f"\n报告图表已保存: {chart_path}")
    except Exception as e:
        print(f"\n图表生成失败: {e}")
# ==================== 主入口 ====================
# When executed as a script, run the backtest with the module-level CONFIG.
if __name__ == "__main__":
    run_backtest(CONFIG)