""" 全市场44只ETF Top 3 等权轮动回测 标的池来源:etf_rotation_deep_analysis.md """ import sys import math import warnings from pathlib import Path from datetime import datetime import numpy as np import pandas as pd warnings.filterwarnings("ignore") sys.path.insert(0, str(Path(__file__).parent.parent)) from dotenv import load_dotenv load_dotenv() from 动量 import ( fetch_all_etf_data, fetch_etf_nav_data, calc_atr, calc_weighted_momentum_score, apply_crash_filter, calc_premium_rate, print_performance, print_yearly_returns, ) # ==================== 资产配置池 (9个精选 + 恒生科技 + 恒生指数) ==================== FULL_POOL = { '513100.SH': '纳指100ETF', '513520.SH': '日经225ETF', '513030.SH': '德国DAX ETF', '518880.SH': '黄金ETF', '159980.SZ': '有色金属ETF', '160723.SZ': '嘉实原油LOF', '511090.SH': '30年国债ETF', '512890.SH': '红利低波ETF', '159915.SZ': '创业板ETF', '513130.SH': '恒生科技ETF', '159920.SZ': '恒生ETF', } # ==================== 资产大类映射 ==================== ETF_CATEGORIES = { '513100.SH': '美股', '513520.SH': '日本', '513030.SH': '欧洲', '518880.SH': '商品', '159980.SZ': '商品', '160723.SZ': '商品', '511090.SH': '固收', '512890.SH': 'A股主题', '159915.SZ': 'A股宽基', '513130.SH': '港股', '159920.SZ': '港股', } CONFIG = { 'etf_pool': FULL_POOL, 'target_num': 3, # 持仓数量 'auto_day': True, # 是否启用动态周期 'fixed_days': 25, # 固定回看天数 'min_days': 20, # 动态周期最小值 'max_days': 60, # 动态周期最大值 'premium_threshold': 5.0, # 溢价率阈值(%) 'trade_cost': 0.001, # 单次交易成本(双边) 'start_date': '2019-01-01', 'benchmark': '000300.SH', # 基准:沪深300 } def run_full_backtest(config: dict): """执行全市场回测""" end_date = datetime.now().strftime('%Y-%m-%d') etf_pool = config['etf_pool'] etf_codes = list(etf_pool.keys()) print("=" * 60) print(" 全市场ETF轮动策略 - Top 3 等权回测") print("=" * 60) print(f" 候选ETF: {len(etf_codes)} 只") print(f" 持仓数量: {config['target_num']}") print(f" 回测区间: {config['start_date']} ~ {end_date}") # 1. 获取数据 (使用缓存加速) from scripts.etf_data_cache import ETFDataCache data_cache = ETFDataCache() print(f"\n{'='*60}") print("加载数据...") all_data = {} for code in etf_codes: df = data_cache.load_cached_ohlcv(code) if not df.empty: all_data[code] = df print(f" 加载完成: {len(all_data)} 只价格数据") # 2. 构建交易日历 all_dates = set() for df in all_data.values(): all_dates.update(df.index.tolist()) trade_dates = sorted(d for d in all_dates if d >= pd.Timestamp(config['start_date'])) print(f" 交易日数: {len(trade_dates)}") # 3. 逐日回测 print(f"\n{'='*60}") print("开始回测...") max_lookback = config['max_days'] + 10 holdings = {} # {code: weight} daily_returns = [] signals = [] for i, today in enumerate(trade_dates): # 计算得分 scores = {} for code in etf_codes: if code not in all_data: continue df = all_data[code] hist = df[df.index <= today].tail(max_lookback + 1) if len(hist) < config['min_days']: continue close_arr = hist['close'].values # 动态周期 if config['auto_day'] and len(hist) >= max_lookback: long_atr = calc_atr(hist['high'], hist['low'], hist['close'], config['max_days']).iloc[-1] short_atr = calc_atr(hist['high'], hist['low'], hist['close'], config['min_days']).iloc[-1] if long_atr > 0: ratio = min(0.9, short_atr / long_atr) lookback = int(config['min_days'] + (config['max_days'] - config['min_days']) * (1 - ratio)) else: lookback = config['fixed_days'] else: lookback = config['fixed_days'] prices = close_arr[-lookback:] if len(prices) < 5: continue result = calc_weighted_momentum_score(prices) score = result['score'] score = apply_crash_filter(close_arr, score) if 0 < score < 6: scores[code] = score # 选出排名最高的 3 只 (跨大类 Top 1 逻辑) if scores: ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True) # 1. 每个大类只保留最高分的那一个 category_best = {} # {category: (code, score)} for code, score in ranked: cat = ETF_CATEGORIES.get(code, '未知') if cat not in category_best: category_best[cat] = (code, score) # 2. 对所有大类的 Top 1 进行排序,选前 3 个大类 sorted_categories = sorted(category_best.values(), key=lambda x: x[1], reverse=True) targets = [code for code, score in sorted_categories[:config['target_num']]] new_holdings = {c: 1.0/len(targets) for c in targets} else: new_holdings = {} # 计算收益 port_ret = 0.0 for code, weight in holdings.items(): df_h = all_data[code] if today in df_h.index: prev_dates = df_h[df_h.index < today].index if len(prev_dates) > 0: prev_price = df_h.loc[prev_dates[-1], 'close'] port_ret += weight * (df_h.loc[today, 'close'] / prev_price - 1) # 调仓成本 old_set, new_set = set(holdings.keys()), set(new_holdings.keys()) trade_cost = 0.0 if old_set != new_set: turnover = sum(holdings[c] for c in old_set - new_set) + sum(new_holdings[c] for c in new_set - old_set) trade_cost = turnover * config['trade_cost'] / 2 signals.append({'date': today, 'holdings': list(new_holdings.keys())}) holdings = new_holdings daily_returns.append({ 'date': today, 'daily_return': port_ret - trade_cost, 'holding': ", ".join(holdings.keys()) if holdings else "空仓" }) # 4. 计算绩效 result_df = pd.DataFrame(daily_returns).set_index('date') result_df['nav'] = (1 + result_df['daily_return']).cumprod() # 基准 import os, tushare as ts pro = ts.pro_api(os.getenv("TUSHARE_TOKEN")) bench_df = pro.index_daily(ts_code=config['benchmark'], start_date=config['start_date'].replace('-', ''), end_date=end_date.replace('-', '')) if bench_df is not None and not bench_df.empty: bench_df['date'] = pd.to_datetime(bench_df['trade_date']) bench_df = bench_df.set_index('date').sort_index() result_df['bench_return'] = bench_df['close'].reindex(result_df.index, method='ffill') / bench_df['close'].iloc[0] else: result_df['bench_return'] = 1.0 print_performance(result_df, signals, config) print_yearly_returns(result_df) # 保存图表 save_chart(result_df) def save_chart(result_df): try: import matplotlib matplotlib.use('Agg') import matplotlib.pyplot as plt matplotlib.rcParams['font.sans-serif'] = ['Arial Unicode MS', 'SimHei', 'DejaVu Sans'] matplotlib.rcParams['axes.unicode_minus'] = False fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 8), height_ratios=[3, 1], gridspec_kw={'hspace': 0.3}) ax1.plot(result_df.index, result_df['nav'], label='全市场Top3等权', color='#2ecc71') ax1.plot(result_df.index, result_df['bench_return'], label='沪深300', color='#95a5a6') ax1.set_yscale('log') ax1.legend() ax1.grid(True, alpha=0.3) peak = result_df['nav'].cummax() ax2.fill_between(result_df.index, (result_df['nav'] - peak) / peak, 0, color='#e74c3c', alpha=0.4) plt.savefig(Path(__file__).parent.parent / 'results' / 'full_pool_top3_chart.png') print(f"图表已保存到 results/full_pool_top3_chart.png") except Exception as e: print(f"图表生成失败: {e}") if __name__ == "__main__": run_full_backtest(CONFIG)