""" ETF动量轮动策略 - 本地回测版本 原始策略来源:聚宽 https://www.joinquant.com/post/1399 核心逻辑: 1. 加权线性回归(权重1→2递增)计算趋势得分 2. score = 年化收益率 × R² 3. ATR动态调整回看窗口(20~60天) 4. 崩盘过滤:连续3天任一天跌>5%则得分归零 5. 溢价过滤:溢价率≥5%则降权 6. 全仓单一品种轮动 """ import sys import math import warnings from pathlib import Path from datetime import datetime import numpy as np import pandas as pd warnings.filterwarnings("ignore") # 添加项目根目录 sys.path.insert(0, str(Path(__file__).parent)) from dotenv import load_dotenv load_dotenv() # ==================== 策略配置 ==================== CONFIG = { # 候选ETF池: # - dict: 手动指定 {ts_code: name} # - 'auto': 使用动态筛选引擎自动构建 # - 'latest': 加载最近一次构建结果 # - 'dynamic': 回测中定期重建,无前视偏差 'etf_pool': 'dynamic', 'rebuild_interval': 60, # 动态池重建间隔(交易日) 'target_num': 1, # 持仓数量 'auto_day': True, # 是否启用动态周期 'fixed_days': 25, # 固定回看天数 'min_days': 20, # 动态周期最小值 'max_days': 60, # 动态周期最大值 'premium_threshold': 5.0, # 溢价率阈值(%) 'trade_cost': 0.001, # 单次交易成本(双边) 'start_date': '2015-01-01', 'benchmark': '000300.SH', # 基准:沪深300 } # ==================== 数据获取 ==================== def fetch_all_etf_data(etf_codes: list, start_date: str, end_date: str, etf_pool: dict = None) -> dict: """使用Tushare获取所有ETF的OHLCV数据""" import os import tushare as ts token = os.getenv("TUSHARE_TOKEN") if not token: raise ValueError("请设置环境变量 TUSHARE_TOKEN") pro = ts.pro_api(token) # 需要额外前置数据用于ATR计算 pre_start = (pd.Timestamp(start_date) - pd.Timedelta(days=120)).strftime('%Y%m%d') end_str = end_date.replace('-', '') pool_names = etf_pool or {} all_data = {} for code in etf_codes: print(f" 下载 {code} ({pool_names.get(code, '')})...", end=" ") try: df = pro.fund_daily( ts_code=code, start_date=pre_start, end_date=end_str, ) if df is None or df.empty: print("✗ 无数据") continue df = df.rename(columns={'trade_date': 'date', 'vol': 'volume'}) df['date'] = pd.to_datetime(df['date']) df = df.set_index('date').sort_index() df = df[['open', 'high', 'low', 'close', 'volume']].astype(float) all_data[code] = df print(f"✓ {len(df)} 条") except Exception as e: print(f"✗ {e}") return all_data def fetch_etf_nav_data(etf_codes: list, start_date: str, end_date: str) -> dict: """获取ETF净值数据(用于溢价率计算)""" import os import tushare as ts token = os.getenv("TUSHARE_TOKEN") pro = ts.pro_api(token) pre_start = (pd.Timestamp(start_date) - pd.Timedelta(days=120)).strftime('%Y%m%d') end_str = end_date.replace('-', '') nav_data = {} for code in etf_codes: try: df = pro.fund_nav( ts_code=code, start_date=pre_start, end_date=end_str, ) if df is not None and not df.empty: df = df.rename(columns={'nav_date': 'date', 'unit_nav': 'nav'}) df['date'] = pd.to_datetime(df['date']) df = df.set_index('date').sort_index() nav_data[code] = df[['nav']].astype(float) except Exception: pass return nav_data # ==================== ATR计算 ==================== def calc_atr(high: pd.Series, low: pd.Series, close: pd.Series, period: int) -> pd.Series: """计算ATR(不依赖talib)""" prev_close = close.shift(1) tr = pd.concat([ high - low, (high - prev_close).abs(), (low - prev_close).abs(), ], axis=1).max(axis=1) return tr.rolling(window=period, min_periods=period).mean() # ==================== 核心得分计算 ==================== def calc_weighted_momentum_score(prices: np.ndarray) -> dict: """ 加权线性回归动量得分 Args: prices: 价格数组(含当日价格) Returns: {'annualized_returns': float, 'r2': float, 'score': float} """ if len(prices) < 5: return {'annualized_returns': 0, 'r2': 0, 'score': 0} y = np.log(prices) x = np.arange(len(y)) weights = np.linspace(1, 2, len(y)) # 近期权重更高 # 加权线性回归 slope, intercept = np.polyfit(x, y, 1, w=weights) annualized_returns = math.exp(slope * 250) - 1 # 加权R² y_pred = slope * x + intercept ss_res = np.sum(weights * (y - y_pred) ** 2) ss_tot = np.sum(weights * (y - np.average(y, weights=weights)) ** 2) r2 = 1 - ss_res / ss_tot if ss_tot > 0 else 0 score = annualized_returns * r2 return {'annualized_returns': annualized_returns, 'r2': r2, 'score': score} def apply_crash_filter(prices: np.ndarray, score: float) -> float: """崩盘过滤:连续3天有任一天跌>5%""" if len(prices) < 4: return score r1 = prices[-1] / prices[-2] r2 = prices[-2] / prices[-3] r3 = prices[-3] / prices[-4] # 条件1:任一天跌>5% con1 = min(r1, r2, r3) < 0.95 # 条件2:连续下跌且累计跌>5% con2 = (r1 < 1) and (r2 < 1) and (r3 < 1) and (prices[-1] / prices[-4] < 0.95) if con1 or con2: return 0.0 return score def calc_premium_rate(etf_price: float, nav: float) -> float: """计算溢价率(%)""" if nav is None or nav == 0 or np.isnan(nav): return 0.0 return (etf_price - nav) / nav * 100 # ==================== 回测引擎 ==================== def resolve_etf_pool(config: dict, ref_date: str = None, data_cache=None) -> dict: """ 解析ETF池配置: - dict: 直接返回 - 'auto': 调用筛选引擎构建 - 'latest': 加载最近一次构建结果 - 'dynamic': 用缓存数据在指定日期重建(无前视偏差) """ pool = config['etf_pool'] if isinstance(pool, dict): return pool from scripts.build_etf_universe import build_universe, load_latest_universe if pool == 'latest': print("加载最近一次构建的动态ETF池...") return load_latest_universe() elif pool == 'auto': print("使用筛选引擎构建动态ETF池...") return build_universe() elif pool == 'dynamic': if data_cache is None: from scripts.etf_data_cache import ETFDataCache data_cache = ETFDataCache() date_str = ref_date or datetime.now().strftime('%Y%m%d') return build_universe(ref_date=date_str, data_cache=data_cache) else: raise ValueError(f"不支持的 etf_pool 配置: {pool}") def run_backtest(config: dict): """执行回测""" end_date = datetime.now().strftime('%Y-%m-%d') pool_mode = config['etf_pool'] if isinstance(config['etf_pool'], str) else '手动指定' is_dynamic = (pool_mode == 'dynamic') # 动态模式: 初始化缓存 data_cache = None if is_dynamic: from scripts.etf_data_cache import ETFDataCache data_cache = ETFDataCache() print("动态重建模式: 使用本地缓存数据,无前视偏差") print(f" 重建间隔: {config['rebuild_interval']} 交易日") # 解析初始 ETF 池 # 动态模式下用 start_date 作为初始重建日期 init_ref_date = config['start_date'].replace('-', '') if is_dynamic else None etf_pool = resolve_etf_pool(config, ref_date=init_ref_date, data_cache=data_cache) etf_codes = list(etf_pool.keys()) print("=" * 60) print(" ETF动量轮动策略 - 本地回测") print("=" * 60) print(f" ETF池模式: {pool_mode}") print(f" 候选ETF: {len(etf_codes)} 只") for code, name in etf_pool.items(): print(f" {code} {name}") print(f" 持仓数量: {config['target_num']}") print(f" 动态周期: {'开启' if config['auto_day'] else '关闭'}") if config['auto_day']: print(f" 回看范围: {config['min_days']}~{config['max_days']} 天") else: print(f" 固定回看: {config['fixed_days']} 天") print(f" 回测区间: {config['start_date']} ~ {end_date}") # 1. 获取数据 print(f"\n{'='*60}") if data_cache is not None: print("从本地缓存加载ETF价格数据...") all_data = {} for code in etf_codes: ohlcv = data_cache.load_cached_ohlcv(code) if not ohlcv.empty: all_data[code] = ohlcv print(f" 加载完成: {len(all_data)} 只") nav_data = {} # 动态模式下暂不用净值数据 else: print("下载ETF价格数据...") all_data = fetch_all_etf_data(etf_codes, config['start_date'], end_date, etf_pool) print("\n下载ETF净值数据...") nav_data = fetch_etf_nav_data(etf_codes, config['start_date'], end_date) print(f" 净值数据: {len(nav_data)} 只") if not all_data: print("无数据,退出") return # 2. 构建交易日历(以A股交易日为准) all_dates = set() for df in all_data.values(): all_dates.update(df.index.tolist()) trade_dates = sorted(all_dates) trade_dates = [d for d in trade_dates if d >= pd.Timestamp(config['start_date'])] print(f"\n交易日数: {len(trade_dates)}") print(f"区间: {trade_dates[0].strftime('%Y-%m-%d')} ~ {trade_dates[-1].strftime('%Y-%m-%d')}") # 3. 逐日回测 print(f"\n{'='*60}") print("开始回测...") print("=" * 60) max_lookback = config['max_days'] + 10 holding = None # 当前持仓ETF代码 daily_returns = [] # 每日收益率 signals = [] # 信号记录 last_rebuild_i = -config['rebuild_interval'] # 确保第一天就重建 for i, today in enumerate(trade_dates): # 动态重建 ETF 池 if is_dynamic and (i - last_rebuild_i >= config['rebuild_interval']): ref_str = today.strftime('%Y%m%d') print(f"\n [重建] {ref_str}: 重新构建ETF池...") try: new_pool = resolve_etf_pool(config, ref_date=ref_str, data_cache=data_cache) etf_codes = list(new_pool.keys()) # 加载新增 ETF 的数据 for code in etf_codes: if code not in all_data and data_cache is not None: ohlcv = data_cache.load_cached_ohlcv(code) if not ohlcv.empty: all_data[code] = ohlcv print(f" [重建] 新池子: {len(etf_codes)} 只") last_rebuild_i = i except Exception as e: print(f" [重建] 失败: {e},继续使用旧池") # 计算每只ETF的得分 scores = {} score_details = {} for code in etf_codes: if code not in all_data: continue df = all_data[code] # 获取截至今日的历史数据 hist = df[df.index <= today].tail(max_lookback + 1) if len(hist) < config['min_days']: continue close_arr = hist['close'].values if config['auto_day']: # 动态周期:基于ATR波动率调整 if len(hist) < max_lookback: lookback = config['fixed_days'] else: long_atr = calc_atr(hist['high'], hist['low'], hist['close'], config['max_days']) short_atr = calc_atr(hist['high'], hist['low'], hist['close'], config['min_days']) la = long_atr.iloc[-1] sa = short_atr.iloc[-1] if la > 0 and not np.isnan(la) and not np.isnan(sa): ratio = min(0.9, sa / la) lookback = int(config['min_days'] + (config['max_days'] - config['min_days']) * (1 - ratio)) else: lookback = config['fixed_days'] prices = close_arr[-lookback:] else: prices = close_arr[-config['fixed_days']:] if len(prices) < 5: continue # 计算得分 result = calc_weighted_momentum_score(prices) score = result['score'] # 崩盘过滤 score = apply_crash_filter(close_arr, score) # 溢价过滤 if code in nav_data: nav_df = nav_data[code] nav_row = nav_df[nav_df.index <= today] if not nav_row.empty: nav_val = nav_row.iloc[-1]['nav'] etf_price = close_arr[-1] premium = calc_premium_rate(etf_price, nav_val) if premium >= config['premium_threshold']: score -= 1 # 只保留有效得分 (0 < score < 6) if 0 < score < 6: scores[code] = score score_details[code] = result # 选出排名最高的标的 if scores: ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True) target = ranked[0][0] # target_num=1 else: target = None # 计算当日收益 if holding is not None and holding in all_data: df_h = all_data[holding] if today in df_h.index: prev_dates = df_h[df_h.index < today].index if len(prev_dates) > 0: prev_date = prev_dates[-1] prev_price = df_h.loc[prev_date, 'close'] today_price = df_h.loc[today, 'close'] daily_ret = today_price / prev_price - 1 else: daily_ret = 0.0 else: daily_ret = 0.0 else: daily_ret = 0.0 # 调仓成本 trade_cost = 0.0 if target != holding: trade_cost = config['trade_cost'] if holding is not None: signals.append({ 'date': today, 'action': '调仓', 'from': holding, 'to': target or '空仓', 'score': scores.get(target, 0) if target else 0, }) holding = target daily_returns.append({ 'date': today, 'daily_return': daily_ret - trade_cost if trade_cost > 0 else daily_ret, 'holding': holding or '空仓', }) # 4. 计算绩效 result_df = pd.DataFrame(daily_returns).set_index('date') result_df['nav'] = (1 + result_df['daily_return']).cumprod() # 基准数据 benchmark_code = config['benchmark'] print(f"\n获取基准数据 {benchmark_code}...") import os, tushare as ts pro = ts.pro_api(os.getenv("TUSHARE_TOKEN")) bench_df = pro.index_daily( ts_code=benchmark_code, start_date=config['start_date'].replace('-', ''), end_date=end_date.replace('-', ''), ) if bench_df is not None and not bench_df.empty: bench_df['date'] = pd.to_datetime(bench_df['trade_date']) bench_df = bench_df.set_index('date').sort_index() bench_close = bench_df['close'].reindex(result_df.index, method='ffill') result_df['bench_return'] = bench_close / bench_close.iloc[0] else: result_df['bench_return'] = 1.0 # 5. 输出绩效报告 print_performance(result_df, signals, config) # 6. 年度收益统计 print_yearly_returns(result_df) # 7. 生成图表 save_chart(result_df, config) return result_df # ==================== 绩效报告 ==================== def print_performance(result_df: pd.DataFrame, signals: list, config: dict): """打印绩效报告""" nav = result_df['nav'] total_return = nav.iloc[-1] / nav.iloc[0] - 1 # 年化收益 days = (result_df.index[-1] - result_df.index[0]).days cagr = (1 + total_return) ** (365 / days) - 1 if days > 0 else 0 # 夏普比率 daily_rets = result_df['daily_return'] sharpe = daily_rets.mean() / daily_rets.std() * np.sqrt(252) if daily_rets.std() > 0 else 0 # 最大回撤 peak = nav.cummax() drawdown = (nav - peak) / peak max_dd = drawdown.min() dd_end = drawdown.idxmin() dd_start = nav[:dd_end].idxmax() # 日胜率 win_rate = (daily_rets > 0).sum() / (daily_rets != 0).sum() if (daily_rets != 0).sum() > 0 else 0 # 基准收益 bench_return = result_df['bench_return'].iloc[-1] - 1 bench_cagr = (1 + bench_return) ** (365 / days) - 1 if days > 0 else 0 # 调仓次数 n_trades = len(signals) years = days / 365 # Calmar比率 calmar = cagr / abs(max_dd) if max_dd != 0 else 0 print(f"\n{'='*70}") print(f" 绩效评估报告") print(f"{'='*70}") print(f" 回测区间: {result_df.index[0].strftime('%Y-%m-%d')} ~ {result_df.index[-1].strftime('%Y-%m-%d')}") print(f" 交易天数: {len(result_df)}") print(f"{'─'*70}") print(f" {'指标':<30s} {'动量策略':>12s} {'基准(沪深300)':>14s}") print(f"{'─'*70}") print(f" {'累计收益':<28s} {total_return:>11.2%} {bench_return:>13.2%}") print(f" {'CAGR(年化)':<27s} {cagr:>11.2%} {bench_cagr:>13.2%}") print(f" {'年化夏普比率':<26s} {sharpe:>11.2f} {'--':>13s}") print(f" {'最大回撤':<28s} {max_dd:>11.2%} {'--':>13s}") print(f" {'Calmar比率':<27s} {calmar:>11.2f} {'--':>13s}") print(f" {'日胜率':<28s} {win_rate:>11.2%} {'--':>13s}") print(f" {'调仓次数':<28s} {n_trades:>9d}次 {'--':>13s}") if years > 0: print(f" {'年均调仓':<28s} {n_trades/years:>9.1f}次 {'--':>13s}") print(f" {'最大回撤区间':<26s} {dd_start.strftime('%Y-%m-%d')} ~ {dd_end.strftime('%Y-%m-%d')}") print(f"{'='*70}") # 最新持仓信号 last_row = result_df.iloc[-1] print(f"\n 最新持仓: {last_row['holding']}", end="") if last_row['holding'] != '空仓': pool = config['etf_pool'] if isinstance(config['etf_pool'], dict) else {} name = pool.get(last_row['holding'], '') print(f" ({name})", end="") print(f"\n 最新净值: {last_row['nav']:.4f}") # ==================== 年度收益统计 ==================== def print_yearly_returns(result_df: pd.DataFrame): """按年统计收益""" nav = result_df['nav'] bench = result_df['bench_return'] # 按年分组 yearly_data = [] for year, group in result_df.groupby(result_df.index.year): year_nav = group['nav'] year_ret = year_nav.iloc[-1] / year_nav.iloc[0] - 1 year_bench = group['bench_return'] bench_ret = year_bench.iloc[-1] / year_bench.iloc[0] - 1 # 年内最大回撤 peak = year_nav.cummax() dd = (year_nav - peak) / peak max_dd = dd.min() # 年内夏普 daily_rets = group['daily_return'] sharpe = daily_rets.mean() / daily_rets.std() * np.sqrt(252) if daily_rets.std() > 0 else 0 # 超额收益 excess = year_ret - bench_ret yearly_data.append({ 'year': year, 'return': year_ret, 'bench_return': bench_ret, 'excess': excess, 'max_dd': max_dd, 'sharpe': sharpe, 'trade_days': len(group), }) print(f"\n{'='*90}") print(f" 年度收益统计") print(f"{'='*90}") print(f" {'年份':<6s} {'策略收益':>10s} {'基准收益':>10s} {'超额收益':>10s} {'最大回撤':>10s} {'夏普比率':>10s} {'交易天数':>10s}") print(f"{'─'*90}") for d in yearly_data: print(f" {d['year']:<6d} {d['return']:>9.2%} {d['bench_return']:>9.2%} {d['excess']:>9.2%} {d['max_dd']:>9.2%} {d['sharpe']:>9.2f} {d['trade_days']:>8d}") print(f"{'─'*90}") # 汇总 total_ret = nav.iloc[-1] / nav.iloc[0] - 1 total_bench = bench.iloc[-1] / bench.iloc[0] - 1 win_years = sum(1 for d in yearly_data if d['return'] > 0) beat_years = sum(1 for d in yearly_data if d['excess'] > 0) total_years = len(yearly_data) print(f" {'合计':<6s} {total_ret:>9.2%} {total_bench:>9.2%} {total_ret - total_bench:>9.2%}") print(f" 盈利年份: {win_years}/{total_years} | 跑赢基准年份: {beat_years}/{total_years}") print(f"{'='*90}") # ==================== 图表生成 ==================== def save_chart(result_df: pd.DataFrame, config: dict): """生成净值曲线图""" try: import matplotlib matplotlib.use('Agg') import matplotlib.pyplot as plt matplotlib.rcParams['font.sans-serif'] = ['Arial Unicode MS', 'SimHei', 'DejaVu Sans'] matplotlib.rcParams['axes.unicode_minus'] = False fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 8), height_ratios=[3, 1], gridspec_kw={'hspace': 0.3}) # 净值曲线 ax1.plot(result_df.index, result_df['nav'], label='动量策略', linewidth=1.5, color='#e74c3c') ax1.plot(result_df.index, result_df['bench_return'], label='沪深300', linewidth=1, color='#95a5a6') ax1.set_title('ETF动量轮动策略 净值曲线', fontsize=14, fontweight='bold') ax1.legend(loc='upper left') ax1.grid(True, alpha=0.3) ax1.set_ylabel('净值') # 回撤曲线 peak = result_df['nav'].cummax() drawdown = (result_df['nav'] - peak) / peak ax2.fill_between(result_df.index, drawdown, 0, alpha=0.4, color='#e74c3c') ax2.set_title('回撤', fontsize=12) ax2.set_ylabel('回撤') ax2.grid(True, alpha=0.3) chart_path = Path(__file__).parent / 'results' / 'momentum_chart.png' chart_path.parent.mkdir(exist_ok=True) fig.savefig(chart_path, dpi=150, bbox_inches='tight') plt.close(fig) print(f"\n报告图表已保存: {chart_path}") except Exception as e: print(f"\n图表生成失败: {e}") # ==================== 主入口 ==================== if __name__ == "__main__": run_backtest(CONFIG)