消除回测前视偏差(Look-Ahead Bias): - 新增 ETFDataCache 本地缓存系统,预下载全量ETF(含已退市)基础信息和日线数据 - 改造 ETFUniverseBuilder 支持纯历史模式,每个时间点只使用当时可获得的数据 - 动量.py 新增 dynamic 模式,回测中每60交易日动态重建ETF候选池 - momentum_experiment.py 同步支持动态重建 - 新增 ETF筛选引擎文档和动态池方案文档 无前视偏差实验结果(6组对比,2015-2026): A: 全仓1只 CAGR=3.32%, MaxDD=-63.19%, Sharpe=0.26 B: 等权3只 CAGR=3.40%, MaxDD=-49.72%, Sharpe=0.30 ← 最优 C: 反波动率3只 CAGR=1.73%, MaxDD=-38.59%, Sharpe=0.21 D: 等权5只 CAGR=2.77%, MaxDD=-42.39%, Sharpe=0.29 E: 反波动率5只 CAGR=-0.37%, MaxDD=-19.56%, Sharpe=-0.03 F: 动量>0全选等权 CAGR=2.02%, MaxDD=-43.27%, Sharpe=0.24 最优方案: B(等权3只)夏普、Calmar、CAGR三项均最高
400 lines
14 KiB
Python
400 lines
14 KiB
Python
"""
|
||
动量策略多持仓对比实验
|
||
对比 6 种配置: 全仓1只 / 等权3只 / 反波动率3只 / 等权5只 / 反波动率5只 / 动量>0全选等权
|
||
支持 dynamic 模式: 回测中定期重建ETF池,消除前视偏差
|
||
"""
|
||
|
||
import sys
|
||
import math
|
||
import warnings
|
||
from pathlib import Path
|
||
from datetime import datetime
|
||
|
||
import numpy as np
|
||
import pandas as pd
|
||
|
||
warnings.filterwarnings("ignore")
|
||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||
|
||
from dotenv import load_dotenv
|
||
load_dotenv()
|
||
|
||
|
||
# ==================== 复用动量.py的核心函数 ====================
|
||
from 动量 import (
|
||
fetch_all_etf_data,
|
||
fetch_etf_nav_data,
|
||
calc_atr,
|
||
calc_weighted_momentum_score,
|
||
apply_crash_filter,
|
||
calc_premium_rate,
|
||
resolve_etf_pool,
|
||
)
|
||
|
||
|
||
# ==================== 权重计算 ====================
|
||
def calc_equal_weights(codes: list) -> dict:
    """Give every code the same weight, ``1 / len(codes)``.

    Args:
        codes: ETF codes to hold.

    Returns:
        dict mapping each code to its weight; an empty dict when ``codes``
        is empty (there is nothing to allocate to).
    """
    if not codes:
        # Guard against ZeroDivisionError when no target was selected.
        return {}
    w = 1.0 / len(codes)
    return {c: w for c in codes}
|
||
|
||
|
||
def calc_inv_vol_weights(codes: list, all_data: dict, today, lookback: int = 20) -> dict:
    """Weight each code in proportion to the inverse of its recent volatility.

    Volatility is the standard deviation of daily close-to-close returns
    computed from the last ``lookback + 1`` rows dated on or before ``today``.

    Args:
        codes: Candidate ETF codes.
        all_data: Mapping of code -> DataFrame that has a ``close`` column
            and an index comparable with ``today``.
        today: Cut-off date; later rows are ignored.
        lookback: Number of daily returns used for the volatility estimate.

    Returns:
        dict mapping code -> weight, normalised so the weights sum to 1.
        Codes absent from ``all_data`` are left out. If none of the codes
        has data, equal weights over ``codes`` are returned; an empty
        ``codes`` yields an empty dict.
    """
    if not codes:
        return {}

    vols = {}
    for c in codes:
        if c not in all_data:
            continue
        df = all_data[c]
        hist = df[df.index <= today].tail(lookback + 1)
        if len(hist) < 10:
            # Too little history for a meaningful estimate: use a very large
            # placeholder volatility so the code receives a small weight.
            vols[c] = 1.0
            continue
        vol = hist['close'].pct_change().dropna().std()
        if not math.isfinite(vol):
            # NaN/inf volatility (e.g. missing or zero closes) must not be
            # read as "almost no risk"; treat it like insufficient history.
            vols[c] = 1.0
        else:
            # Floor a zero volatility to keep the inverse finite.
            vols[c] = vol if vol > 0 else 1e-6

    if not vols:
        return calc_equal_weights(codes)

    inv_vols = {c: 1.0 / v for c, v in vols.items()}
    total = sum(inv_vols.values())
    return {c: iv / total for c, iv in inv_vols.items()}
|
||
|
||
|
||
# ==================== 多持仓回测引擎 ====================
|
||
def run_multi_backtest(config: dict, all_data: dict, nav_data: dict,
                       trade_dates: list, etf_codes: list,
                       target_num: int = 1, weight_mode: str = 'equal',
                       label: str = '',
                       data_cache=None, rebuild_interval: int = 0) -> dict:
    """Back-test a multi-holding momentum rotation and return its metrics.

    For every trading day the function scores the active candidate pool,
    picks the targets, books the return of the positions held from the
    previous day, charges a turnover cost when the set of held codes
    changes, and then switches to the new target weights.

    Args:
        config: Strategy settings. Keys read here: ``max_days``,
            ``min_days``, ``fixed_days``, ``auto_day``,
            ``premium_threshold`` and ``trade_cost``.
        all_data: Mapping of code -> price DataFrame (``high``, ``low``,
            ``close`` columns). Mutated in place: data for ETFs admitted
            by a pool rebuild is added to it.
        nav_data: Mapping of code -> DataFrame with a ``nav`` column, used
            for the premium penalty; may be empty.
        trade_dates: Ordered trading days to simulate; must not be empty.
        etf_codes: Initial candidate pool.
        target_num: Number of simultaneous holdings, or the string
            ``'all_positive'`` to hold every code that passed the score
            filter.
        weight_mode: ``'inv_vol'`` for inverse-volatility weights; any
            other value means equal weights.
        label: Experiment label, echoed in the result.
        data_cache: Cache object providing ``load_cached_ohlcv(code)``;
            required for dynamic pool rebuilding.
        rebuild_interval: Trading days between pool rebuilds; 0 disables
            rebuilding.

    Returns:
        dict with ``label``, ``target_num``, ``weight_mode``,
        ``total_return``, ``cagr``, ``sharpe``, ``max_dd``, ``calmar``,
        ``win_rate``, ``n_trades``, ``trades_per_year``, ``win_years``
        (formatted ``"wins/total"``) and ``result_df`` (daily returns and
        cumulative nav indexed by date).

    Raises:
        ValueError: If ``trade_dates`` is empty.
    """
    if not trade_dates:
        # Without this guard the metrics section fails with an opaque
        # KeyError from set_index('date') on an empty frame.
        raise ValueError("trade_dates is empty: nothing to back-test")

    max_lookback = config['max_days'] + 10
    holdings = {}  # {code: weight} held coming into the current day
    daily_returns = []
    n_trades = 0
    # Start one full interval in the past so the first rebuild fires on the
    # first trading day.
    last_rebuild_i = -rebuild_interval if rebuild_interval > 0 else 0
    current_codes = list(etf_codes)  # currently active candidate pool
    rebuild_warned = False  # report each streak of rebuild failures once

    for i, today in enumerate(trade_dates):
        # Dynamic rebuild of the ETF pool as of `today`.
        if rebuild_interval > 0 and data_cache is not None and (i - last_rebuild_i >= rebuild_interval):
            ref_str = today.strftime('%Y%m%d')
            try:
                new_pool = resolve_etf_pool(config, ref_date=ref_str, data_cache=data_cache)
                current_codes = list(new_pool.keys())
                # Load price data for ETFs that were not seen before.
                for code in current_codes:
                    if code not in all_data:
                        ohlcv = data_cache.load_cached_ohlcv(code)
                        if not ohlcv.empty:
                            all_data[code] = ohlcv
                last_rebuild_i = i
                rebuild_warned = False
            except Exception as exc:
                # Best effort: keep simulating and retry on the next trading
                # day (last_rebuild_i is untouched), but do not hide the
                # failure.
                if not rebuild_warned:
                    print(f"  [警告] {ref_str} ETF池重建失败, 下一交易日重试: {exc!r}",
                          file=sys.stderr)
                    rebuild_warned = True

        # 1. Score every ETF in the active pool using data up to `today`.
        scores = {}
        for code in current_codes:
            if code not in all_data:
                continue
            df = all_data[code]
            hist = df[df.index <= today].tail(max_lookback + 1)
            if len(hist) < config['min_days']:
                continue

            close_arr = hist['close'].values

            if config['auto_day']:
                if len(hist) < max_lookback:
                    lookback = config['fixed_days']
                else:
                    long_atr = calc_atr(hist['high'], hist['low'], hist['close'],
                                        config['max_days'])
                    short_atr = calc_atr(hist['high'], hist['low'], hist['close'],
                                         config['min_days'])
                    la = long_atr.iloc[-1]
                    sa = short_atr.iloc[-1]
                    if la > 0 and not np.isnan(la) and not np.isnan(sa):
                        # A higher short/long ATR ratio (capped at 0.9)
                        # shortens the momentum window towards min_days.
                        ratio = min(0.9, sa / la)
                        lookback = int(config['min_days'] +
                                       (config['max_days'] - config['min_days']) * (1 - ratio))
                    else:
                        lookback = config['fixed_days']
                prices = close_arr[-lookback:]
            else:
                prices = close_arr[-config['fixed_days']:]

            if len(prices) < 5:
                continue

            result = calc_weighted_momentum_score(prices)
            score = result['score']
            score = apply_crash_filter(close_arr, score)

            if code in nav_data:
                nav_df = nav_data[code]
                nav_row = nav_df[nav_df.index <= today]
                if not nav_row.empty:
                    nav_val = nav_row.iloc[-1]['nav']
                    etf_price = close_arr[-1]
                    premium = calc_premium_rate(etf_price, nav_val)
                    # Penalise ETFs whose premium reaches the threshold.
                    if premium >= config['premium_threshold']:
                        score -= 1

            # Keep only scores strictly between 0 and 6.
            # NOTE(review): the upper bound presumably rejects abnormal
            # spikes - confirm against the scoring function.
            if 0 < score < 6:
                scores[code] = score

        # 2. Pick the top N (or every code that passed the filter).
        if scores:
            ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True)
            if target_num == 'all_positive':
                targets = [c for c, s in ranked]  # scores already filtered to > 0
            else:
                targets = [c for c, _ in ranked[:target_num]]
        else:
            targets = []

        # 3. Compute target weights.
        if targets:
            if weight_mode == 'inv_vol':
                new_weights = calc_inv_vol_weights(targets, all_data, today)
            else:
                new_weights = calc_equal_weights(targets)
        else:
            new_weights = {}

        # 4. Today's portfolio return comes from the positions chosen on the
        #    previous day; a holding with no bar today contributes nothing.
        port_ret = 0.0
        for code, weight in holdings.items():
            if code not in all_data:
                continue
            df_h = all_data[code]
            if today in df_h.index:
                prev_dates = df_h[df_h.index < today].index
                if len(prev_dates) > 0:
                    prev_price = df_h.loc[prev_dates[-1], 'close']
                    today_price = df_h.loc[today, 'close']
                    port_ret += weight * (today_price / prev_price - 1)

        # 5. Rebalance decision: cost is charged only when the set of held
        #    codes changes, not when only the weights move.
        old_set = set(holdings.keys())
        new_set = set(new_weights.keys())
        if old_set != new_set:
            # Turnover cost, proportional to the traded weight.
            turnover = 0.0
            for c in old_set - new_set:
                turnover += holdings[c]
            for c in new_set - old_set:
                turnover += new_weights[c]
            for c in old_set & new_set:
                turnover += abs(new_weights[c] - holdings[c])
            # turnover counts both the sold and the bought leg, hence the
            # division by 2 (original note: the one-way side is already
            # included in trade_cost).
            trade_cost = turnover * config['trade_cost'] / 2
            n_trades += 1
        else:
            trade_cost = 0.0

        holdings = new_weights

        daily_returns.append({
            'date': today,
            'daily_return': port_ret - trade_cost,
        })

    # Performance metrics.
    result_df = pd.DataFrame(daily_returns).set_index('date')
    result_df['nav'] = (1 + result_df['daily_return']).cumprod()

    nav = result_df['nav']
    # NOTE(review): measured from the first day's nav, so the first day's
    # own return (the initial entry cost) is excluded.
    total_return = nav.iloc[-1] / nav.iloc[0] - 1
    days = (result_df.index[-1] - result_df.index[0]).days
    cagr = (1 + total_return) ** (365 / days) - 1 if days > 0 else 0
    daily_rets = result_df['daily_return']
    sharpe = daily_rets.mean() / daily_rets.std() * np.sqrt(252) if daily_rets.std() > 0 else 0
    peak = nav.cummax()
    drawdown = (nav - peak) / peak
    max_dd = drawdown.min()
    calmar = cagr / abs(max_dd) if max_dd != 0 else 0
    # Share of positive days among days with a non-zero return.
    win_rate = (daily_rets > 0).sum() / (daily_rets != 0).sum() if (daily_rets != 0).sum() > 0 else 0
    years = days / 365

    # Per-calendar-year statistics.
    # NOTE(review): each year's return runs from its first to its last nav,
    # so the first trading day's return of the year is not counted.
    win_years = 0
    total_years = 0
    for year, group in result_df.groupby(result_df.index.year):
        yr = group['nav']
        yr_ret = yr.iloc[-1] / yr.iloc[0] - 1
        total_years += 1
        if yr_ret > 0:
            win_years += 1

    return {
        'label': label,
        'target_num': target_num,
        'weight_mode': weight_mode,
        'total_return': total_return,
        'cagr': cagr,
        'sharpe': sharpe,
        'max_dd': max_dd,
        'calmar': calmar,
        'win_rate': win_rate,
        'n_trades': n_trades,
        'trades_per_year': n_trades / years if years > 0 else 0,
        'win_years': f"{win_years}/{total_years}",
        'result_df': result_df,
    }
|
||
|
||
|
||
# ==================== 主函数 ====================
|
||
def main():
    """Run the six multi-holding experiments in dynamic-pool mode.

    Builds the initial ETF pool as of ``start_date``, loads cached prices,
    runs every experiment through ``run_multi_backtest``, prints a
    comparison table with the best configuration per metric, and saves a
    nav/drawdown chart under ``results/``. Chart failures are reported but
    do not abort the run.
    """
    from 动量 import CONFIG

    config = CONFIG.copy()
    # Force dynamic mode.
    config['etf_pool'] = 'dynamic'
    rebuild_interval = config.get('rebuild_interval', 60)

    # Initialise the local data cache.
    from scripts.etf_data_cache import ETFDataCache
    data_cache = ETFDataCache()

    # Use start_date as the reference date of the initial pool.
    init_ref_date = config['start_date'].replace('-', '')
    etf_pool = resolve_etf_pool(config, ref_date=init_ref_date, data_cache=data_cache)
    etf_codes = list(etf_pool.keys())
    end_date = datetime.now().strftime('%Y-%m-%d')

    print("=" * 70)
    print(" 动量策略多持仓对比实验 (动态重建模式, 无前视偏差)")
    print("=" * 70)
    print(f" 初始ETF池 ({init_ref_date}): {len(etf_codes)} 只")
    for code, name in etf_pool.items():
        print(f" {code} {name}")
    print(f" 回测区间: {config['start_date']} ~ {end_date}")
    print(f" 重建间隔: {rebuild_interval} 交易日")

    # Load data from the local cache.
    print(f"\n{'='*70}")
    print("从本地缓存加载数据...")
    all_data = {}
    # Only the initial pool is loaded here; ETFs that enter the pool later
    # are loaded on demand by run_multi_backtest when it rebuilds the pool.
    for code in etf_codes:
        ohlcv = data_cache.load_cached_ohlcv(code)
        if not ohlcv.empty:
            all_data[code] = ohlcv
    nav_data = {}  # NAV data is not used in dynamic mode
    print(f"价格数据: {len(all_data)} 只")

    # Build the trading calendar.
    # NOTE(review): it is derived from the initial pool's data only.
    all_dates = set()
    for df in all_data.values():
        all_dates.update(df.index.tolist())
    trade_dates = sorted(d for d in all_dates if d >= pd.Timestamp(config['start_date']))
    print(f"交易日: {len(trade_dates)}")

    if not trade_dates:
        # Nothing to simulate; the back-test cannot compute metrics on an
        # empty calendar.
        print("无可用交易日, 请检查缓存数据和 start_date 配置")
        return

    # The six experiments.
    experiments = [
        {'target_num': 1, 'weight_mode': 'equal', 'label': 'A: 全仓1只'},
        {'target_num': 3, 'weight_mode': 'equal', 'label': 'B: 等权3只'},
        {'target_num': 3, 'weight_mode': 'inv_vol', 'label': 'C: 反波动率3只'},
        {'target_num': 5, 'weight_mode': 'equal', 'label': 'D: 等权5只'},
        {'target_num': 5, 'weight_mode': 'inv_vol', 'label': 'E: 反波动率5只'},
        {'target_num': 'all_positive', 'weight_mode': 'equal', 'label': 'F: 动量>0全选等权'},
    ]

    results = []
    for exp in experiments:
        print(f"\n{'─'*70}")
        print(f" 运行: {exp['label']}...")
        r = run_multi_backtest(
            config, all_data, nav_data, trade_dates, etf_codes,
            target_num=exp['target_num'],
            weight_mode=exp['weight_mode'],
            label=exp['label'],
            data_cache=data_cache,
            rebuild_interval=rebuild_interval,
        )
        results.append(r)
        print(f" 完成: CAGR={r['cagr']:.2%}, MaxDD={r['max_dd']:.2%}, Sharpe={r['sharpe']:.2f}")

    # Comparison table.
    print(f"\n\n{'='*100}")
    print(f"{'':>20s} 动量策略多持仓对比实验结果")
    print(f"{'='*100}")
    print(f" {'实验':<18s} {'累计收益':>10s} {'CAGR':>8s} {'夏普':>6s} {'最大回撤':>8s} {'Calmar':>8s} {'日胜率':>7s} {'调仓次':>6s} {'年调仓':>6s} {'盈利年':>7s}")
    print(f"{'─'*100}")

    for r in results:
        print(f" {r['label']:<16s} {r['total_return']:>9.2%} {r['cagr']:>7.2%} {r['sharpe']:>6.2f} "
              f"{r['max_dd']:>8.2%} {r['calmar']:>7.2f} {r['win_rate']:>6.2%} "
              f"{r['n_trades']:>5d} {r['trades_per_year']:>6.1f} {r['win_years']:>7s}")

    print(f"{'='*100}")

    # Best configuration per metric.
    best_sharpe = max(results, key=lambda x: x['sharpe'])
    best_calmar = max(results, key=lambda x: x['calmar'])
    best_cagr = max(results, key=lambda x: x['cagr'])

    print(f"\n 最高夏普: {best_sharpe['label']} (Sharpe={best_sharpe['sharpe']:.2f})")
    print(f" 最高Calmar: {best_calmar['label']} (Calmar={best_calmar['calmar']:.2f})")
    print(f" 最高CAGR: {best_cagr['label']} (CAGR={best_cagr['cagr']:.2%})")

    # Save the chart.
    try:
        import matplotlib
        matplotlib.use('Agg')
        import matplotlib.pyplot as plt
        matplotlib.rcParams['font.sans-serif'] = ['Arial Unicode MS', 'SimHei', 'DejaVu Sans']
        matplotlib.rcParams['axes.unicode_minus'] = False

        # height_ratios goes through gridspec_kw so that matplotlib
        # releases older than 3.6 accept it as well.
        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(16, 10),
                                       gridspec_kw={'height_ratios': [3, 1], 'hspace': 0.3})

        # One colour per experiment. Indexing modulo the palette size makes
        # sure no series is dropped (zip() against a 5-colour palette used
        # to truncate away the sixth experiment).
        colors = ['#e74c3c', '#3498db', '#2ecc71', '#f39c12', '#9b59b6', '#1abc9c']
        for idx, r in enumerate(results):
            color = colors[idx % len(colors)]
            nav = r['result_df']['nav']
            ax1.plot(nav.index, nav, label=r['label'], linewidth=1.2, color=color)

        ax1.set_title('动量策略多持仓对比 - 净值曲线', fontsize=14, fontweight='bold')
        ax1.legend(loc='upper left', fontsize=10)
        ax1.grid(True, alpha=0.3)
        ax1.set_ylabel('净值')
        ax1.set_yscale('log')

        # Drawdown panel.
        for idx, r in enumerate(results):
            color = colors[idx % len(colors)]
            nav = r['result_df']['nav']
            peak = nav.cummax()
            dd = (nav - peak) / peak
            ax2.plot(dd.index, dd, label=r['label'], linewidth=0.8, color=color, alpha=0.7)

        ax2.set_title('回撤对比', fontsize=12)
        ax2.set_ylabel('回撤')
        ax2.grid(True, alpha=0.3)
        ax2.legend(loc='lower left', fontsize=8)

        chart_path = Path(__file__).parent.parent / 'results' / 'momentum_multi_experiment.png'
        chart_path.parent.mkdir(parents=True, exist_ok=True)
        fig.savefig(chart_path, dpi=150, bbox_inches='tight')
        plt.close(fig)
        print(f"\n 对比图表已保存: {chart_path}")
    except Exception as e:
        # Charting is optional; report the problem and keep the printed
        # results.
        print(f"\n 图表生成失败: {e}")
|
||
|
||
|
||
# Run the experiment only when this file is executed as a script, not when
# it is imported.
if __name__ == '__main__':
    main()
|