""" 动量策略多持仓对比实验 对比 6 种配置: 全仓1只 / 等权3只 / 反波动率3只 / 等权5只 / 反波动率5只 / 动量>0全选等权 支持 dynamic 模式: 回测中定期重建ETF池,消除前视偏差 """ import sys import math import warnings from pathlib import Path from datetime import datetime import numpy as np import pandas as pd warnings.filterwarnings("ignore") sys.path.insert(0, str(Path(__file__).parent.parent)) from dotenv import load_dotenv load_dotenv() # ==================== 复用动量.py的核心函数 ==================== from 动量 import ( fetch_all_etf_data, fetch_etf_nav_data, calc_atr, calc_weighted_momentum_score, apply_crash_filter, calc_premium_rate, resolve_etf_pool, ) # ==================== 权重计算 ==================== def calc_equal_weights(codes: list) -> dict: """等权""" w = 1.0 / len(codes) return {c: w for c in codes} def calc_inv_vol_weights(codes: list, all_data: dict, today, lookback: int = 20) -> dict: """反波动率加权: 权重 ∝ 1/σ""" vols = {} for c in codes: if c not in all_data: continue df = all_data[c] hist = df[df.index <= today].tail(lookback + 1) if len(hist) < 10: vols[c] = 1.0 # fallback continue ret = hist['close'].pct_change().dropna() vol = ret.std() vols[c] = vol if vol > 0 else 1e-6 if not vols: return calc_equal_weights(codes) inv_vols = {c: 1.0 / v for c, v in vols.items()} total = sum(inv_vols.values()) return {c: iv / total for c, iv in inv_vols.items()} # ==================== 多持仓回测引擎 ==================== def run_multi_backtest(config: dict, all_data: dict, nav_data: dict, trade_dates: list, etf_codes: list, target_num: int = 1, weight_mode: str = 'equal', label: str = '', data_cache=None, rebuild_interval: int = 0) -> dict: """ 多持仓回测 Args: target_num: 同时持有数量 weight_mode: 'equal' 等权 | 'inv_vol' 反波动率 label: 实验标签 data_cache: ETFDataCache 实例(动态重建模式) rebuild_interval: 重建间隔(交易日),0=不重建 Returns: dict: 绩效指标 """ max_lookback = config['max_days'] + 10 holdings = {} # {code: weight} daily_returns = [] n_trades = 0 last_rebuild_i = -rebuild_interval if rebuild_interval > 0 else 0 current_codes = list(etf_codes) # 当前活跃的候选池 for i, today in enumerate(trade_dates): # 动态重建 ETF 池 if rebuild_interval > 0 and data_cache is not None and (i - last_rebuild_i >= rebuild_interval): ref_str = today.strftime('%Y%m%d') try: new_pool = resolve_etf_pool(config, ref_date=ref_str, data_cache=data_cache) current_codes = list(new_pool.keys()) # 加载新增 ETF 数据 for code in current_codes: if code not in all_data: ohlcv = data_cache.load_cached_ohlcv(code) if not ohlcv.empty: all_data[code] = ohlcv last_rebuild_i = i except Exception: pass # 1. 计算每只 ETF 的得分 (使用当前活跃池) scores = {} for code in current_codes: if code not in all_data: continue df = all_data[code] hist = df[df.index <= today].tail(max_lookback + 1) if len(hist) < config['min_days']: continue close_arr = hist['close'].values if config['auto_day']: if len(hist) < max_lookback: lookback = config['fixed_days'] else: long_atr = calc_atr(hist['high'], hist['low'], hist['close'], config['max_days']) short_atr = calc_atr(hist['high'], hist['low'], hist['close'], config['min_days']) la = long_atr.iloc[-1] sa = short_atr.iloc[-1] if la > 0 and not np.isnan(la) and not np.isnan(sa): ratio = min(0.9, sa / la) lookback = int(config['min_days'] + (config['max_days'] - config['min_days']) * (1 - ratio)) else: lookback = config['fixed_days'] prices = close_arr[-lookback:] else: prices = close_arr[-config['fixed_days']:] if len(prices) < 5: continue result = calc_weighted_momentum_score(prices) score = result['score'] score = apply_crash_filter(close_arr, score) if code in nav_data: nav_df = nav_data[code] nav_row = nav_df[nav_df.index <= today] if not nav_row.empty: nav_val = nav_row.iloc[-1]['nav'] etf_price = close_arr[-1] premium = calc_premium_rate(etf_price, nav_val) if premium >= config['premium_threshold']: score -= 1 if 0 < score < 6: scores[code] = score # 2. 选出 top N (或全部正动量) if scores: ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True) if target_num == 'all_positive': targets = [c for c, s in ranked] # scores 已过滤 >0 else: targets = [c for c, _ in ranked[:target_num]] else: targets = [] # 3. 计算权重 if targets: if weight_mode == 'inv_vol': new_weights = calc_inv_vol_weights(targets, all_data, today) else: new_weights = calc_equal_weights(targets) else: new_weights = {} # 4. 计算当日组合收益 port_ret = 0.0 for code, weight in holdings.items(): if code not in all_data: continue df_h = all_data[code] if today in df_h.index: prev_dates = df_h[df_h.index < today].index if len(prev_dates) > 0: prev_price = df_h.loc[prev_dates[-1], 'close'] today_price = df_h.loc[today, 'close'] port_ret += weight * (today_price / prev_price - 1) # 5. 调仓判断 old_set = set(holdings.keys()) new_set = set(new_weights.keys()) if old_set != new_set: # 换手成本: 按换手比例收取 turnover = 0.0 for c in old_set - new_set: turnover += holdings[c] for c in new_set - old_set: turnover += new_weights[c] for c in old_set & new_set: turnover += abs(new_weights[c] - holdings[c]) trade_cost = turnover * config['trade_cost'] / 2 # 单边已含在trade_cost中 n_trades += 1 else: trade_cost = 0.0 holdings = new_weights daily_returns.append({ 'date': today, 'daily_return': port_ret - trade_cost, }) # 计算绩效 result_df = pd.DataFrame(daily_returns).set_index('date') result_df['nav'] = (1 + result_df['daily_return']).cumprod() nav = result_df['nav'] total_return = nav.iloc[-1] / nav.iloc[0] - 1 days = (result_df.index[-1] - result_df.index[0]).days cagr = (1 + total_return) ** (365 / days) - 1 if days > 0 else 0 daily_rets = result_df['daily_return'] sharpe = daily_rets.mean() / daily_rets.std() * np.sqrt(252) if daily_rets.std() > 0 else 0 peak = nav.cummax() drawdown = (nav - peak) / peak max_dd = drawdown.min() calmar = cagr / abs(max_dd) if max_dd != 0 else 0 win_rate = (daily_rets > 0).sum() / (daily_rets != 0).sum() if (daily_rets != 0).sum() > 0 else 0 years = days / 365 # 年度统计 win_years = 0 total_years = 0 for year, group in result_df.groupby(result_df.index.year): yr = group['nav'] yr_ret = yr.iloc[-1] / yr.iloc[0] - 1 total_years += 1 if yr_ret > 0: win_years += 1 return { 'label': label, 'target_num': target_num, 'weight_mode': weight_mode, 'total_return': total_return, 'cagr': cagr, 'sharpe': sharpe, 'max_dd': max_dd, 'calmar': calmar, 'win_rate': win_rate, 'n_trades': n_trades, 'trades_per_year': n_trades / years if years > 0 else 0, 'win_years': f"{win_years}/{total_years}", 'result_df': result_df, } # ==================== 主函数 ==================== def main(): from 动量 import CONFIG config = CONFIG.copy() # 强制使用 dynamic 模式 config['etf_pool'] = 'dynamic' rebuild_interval = config.get('rebuild_interval', 60) # 初始化缓存 from scripts.etf_data_cache import ETFDataCache data_cache = ETFDataCache() # 用 start_date 作为初始重建日期 init_ref_date = config['start_date'].replace('-', '') etf_pool = resolve_etf_pool(config, ref_date=init_ref_date, data_cache=data_cache) etf_codes = list(etf_pool.keys()) end_date = datetime.now().strftime('%Y-%m-%d') print("=" * 70) print(" 动量策略多持仓对比实验 (动态重建模式, 无前视偏差)") print("=" * 70) print(f" 初始ETF池 ({init_ref_date}): {len(etf_codes)} 只") for code, name in etf_pool.items(): print(f" {code} {name}") print(f" 回测区间: {config['start_date']} ~ {end_date}") print(f" 重建间隔: {rebuild_interval} 交易日") # 从缓存加载数据 print(f"\n{'='*70}") print("从本地缓存加载数据...") all_data = {} # 加载所有可能用到的 ETF 数据 (初始池 + 后续可能加入的) for code in etf_codes: ohlcv = data_cache.load_cached_ohlcv(code) if not ohlcv.empty: all_data[code] = ohlcv nav_data = {} # 动态模式下不使用净值数据 print(f"价格数据: {len(all_data)} 只") # 构建交易日历 all_dates = set() for df in all_data.values(): all_dates.update(df.index.tolist()) trade_dates = sorted(d for d in all_dates if d >= pd.Timestamp(config['start_date'])) print(f"交易日: {len(trade_dates)}") # 6 组实验 experiments = [ {'target_num': 1, 'weight_mode': 'equal', 'label': 'A: 全仓1只'}, {'target_num': 3, 'weight_mode': 'equal', 'label': 'B: 等权3只'}, {'target_num': 3, 'weight_mode': 'inv_vol', 'label': 'C: 反波动率3只'}, {'target_num': 5, 'weight_mode': 'equal', 'label': 'D: 等权5只'}, {'target_num': 5, 'weight_mode': 'inv_vol', 'label': 'E: 反波动率5只'}, {'target_num': 'all_positive', 'weight_mode': 'equal', 'label': 'F: 动量>0全选等权'}, ] results = [] for exp in experiments: print(f"\n{'─'*70}") print(f" 运行: {exp['label']}...") r = run_multi_backtest( config, all_data, nav_data, trade_dates, etf_codes, target_num=exp['target_num'], weight_mode=exp['weight_mode'], label=exp['label'], data_cache=data_cache, rebuild_interval=rebuild_interval, ) results.append(r) print(f" 完成: CAGR={r['cagr']:.2%}, MaxDD={r['max_dd']:.2%}, Sharpe={r['sharpe']:.2f}") # 输出对比表 print(f"\n\n{'='*100}") print(f"{'':>20s} 动量策略多持仓对比实验结果") print(f"{'='*100}") print(f" {'实验':<18s} {'累计收益':>10s} {'CAGR':>8s} {'夏普':>6s} {'最大回撤':>8s} {'Calmar':>8s} {'日胜率':>7s} {'调仓次':>6s} {'年调仓':>6s} {'盈利年':>7s}") print(f"{'─'*100}") for r in results: print(f" {r['label']:<16s} {r['total_return']:>9.2%} {r['cagr']:>7.2%} {r['sharpe']:>6.2f} " f"{r['max_dd']:>8.2%} {r['calmar']:>7.2f} {r['win_rate']:>6.2%} " f"{r['n_trades']:>5d} {r['trades_per_year']:>6.1f} {r['win_years']:>7s}") print(f"{'='*100}") # 找出最优 best_sharpe = max(results, key=lambda x: x['sharpe']) best_calmar = max(results, key=lambda x: x['calmar']) best_cagr = max(results, key=lambda x: x['cagr']) print(f"\n 最高夏普: {best_sharpe['label']} (Sharpe={best_sharpe['sharpe']:.2f})") print(f" 最高Calmar: {best_calmar['label']} (Calmar={best_calmar['calmar']:.2f})") print(f" 最高CAGR: {best_cagr['label']} (CAGR={best_cagr['cagr']:.2%})") # 保存图表 try: import matplotlib matplotlib.use('Agg') import matplotlib.pyplot as plt matplotlib.rcParams['font.sans-serif'] = ['Arial Unicode MS', 'SimHei', 'DejaVu Sans'] matplotlib.rcParams['axes.unicode_minus'] = False fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(16, 10), height_ratios=[3, 1], gridspec_kw={'hspace': 0.3}) colors = ['#e74c3c', '#3498db', '#2ecc71', '#f39c12', '#9b59b6'] for r, color in zip(results, colors): nav = r['result_df']['nav'] ax1.plot(nav.index, nav, label=r['label'], linewidth=1.2, color=color) ax1.set_title('动量策略多持仓对比 - 净值曲线', fontsize=14, fontweight='bold') ax1.legend(loc='upper left', fontsize=10) ax1.grid(True, alpha=0.3) ax1.set_ylabel('净值') ax1.set_yscale('log') # 回撤 for r, color in zip(results, colors): nav = r['result_df']['nav'] peak = nav.cummax() dd = (nav - peak) / peak ax2.plot(dd.index, dd, label=r['label'], linewidth=0.8, color=color, alpha=0.7) ax2.set_title('回撤对比', fontsize=12) ax2.set_ylabel('回撤') ax2.grid(True, alpha=0.3) ax2.legend(loc='lower left', fontsize=8) chart_path = Path(__file__).parent.parent / 'results' / 'momentum_multi_experiment.png' chart_path.parent.mkdir(exist_ok=True) fig.savefig(chart_path, dpi=150, bbox_inches='tight') plt.close(fig) print(f"\n 对比图表已保存: {chart_path}") except Exception as e: print(f"\n 图表生成失败: {e}") if __name__ == '__main__': main()