- 问题:cum_return_idx 和 cum_return_etf 使用相同的 ETF 价格计算 - 修复:分别使用指数价格(raw)和 ETF 价格(hfq)独立计算 - 验证:72.6% 的持仓记录显示差异(0.06%~0.48%),符合预期 - 新增验证脚本:verify_cum_return_fix.py
337 lines
12 KiB
Python
337 lines
12 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
测算 ETF 跳空收益(Gap Return)对策略的影响
|
||
|
||
测算目标:
|
||
1. 量化各 ETF 的跳空特征(幅度、频率、波动率)
|
||
2. 分析跳空对策略收益的实际影响
|
||
3. 判断是否需要修改收益计算逻辑
|
||
|
||
用法:
|
||
python framework_v2/scripts/measure_gap_impact.py
|
||
"""
|
||
|
||
import sys
|
||
from pathlib import Path
|
||
import numpy as np
|
||
import pandas as pd
|
||
|
||
project_root = Path(__file__).parent.parent.parent
|
||
sys.path.insert(0, str(project_root))
|
||
|
||
from dotenv import load_dotenv
|
||
load_dotenv()
|
||
|
||
from framework_v2.config import load_config
|
||
from framework_v2.strategies.rotation.rotation import GlobalRotationStrategy
|
||
from framework_v2.shared.data import FlaskAPIFetcher
|
||
|
||
|
||
def fetch_etf_data_with_ohlc(codes, start, end):
|
||
"""获取 ETF 的 OHLC 数据(hfq)"""
|
||
fetcher = FlaskAPIFetcher()
|
||
|
||
print(f"\n[数据获取] 获取 {len(codes)} 只 ETF 的 OHLC 数据(hfq)...")
|
||
data = {}
|
||
|
||
for i, code in enumerate(codes, 1):
|
||
print(f" [{i}/{len(codes)}] {code}...")
|
||
df = fetcher._source.fetch(
|
||
code=code,
|
||
start_date=start,
|
||
end_date=end,
|
||
adj='hfq',
|
||
asset_type='china_etf'
|
||
)
|
||
if df is not None:
|
||
data[code] = df
|
||
print(f" ✓ {len(df)} 条")
|
||
else:
|
||
print(f" ✗ 获取失败")
|
||
|
||
return data
|
||
|
||
|
||
def calculate_gap_statistics(etf_data):
|
||
"""计算各 ETF 的跳空统计"""
|
||
print("\n" + "=" * 80)
|
||
print(" 跳空收益统计分析")
|
||
print("=" * 80)
|
||
|
||
stats_list = []
|
||
|
||
for code, df in etf_data.items():
|
||
# 确保按日期排序
|
||
df = df.sort_index()
|
||
|
||
# 计算收益率
|
||
prev_close = df['close'].shift(1)
|
||
|
||
# 跳空收益率:(T_open - T-1_close) / T-1_close
|
||
gap_return = (df['open'] - prev_close) / prev_close
|
||
|
||
# 日内收益率:(T_close - T_open) / T_open
|
||
intraday_return = (df['close'] - df['open']) / df['open']
|
||
|
||
# 验证:总收益率 ≈ 跳空 + 日内
|
||
total_return = df['close'].pct_change()
|
||
|
||
# 统计指标
|
||
stats = {
|
||
'ETF': code,
|
||
'数据天数': len(df),
|
||
'平均跳空(%)': gap_return.mean() * 100,
|
||
'跳空波动率(%)': gap_return.std() * 100,
|
||
'向上跳空比例(%)': (gap_return > 0.0001).sum() / len(gap_return) * 100,
|
||
'向下跳空比例(%)': (gap_return < -0.0001).sum() / len(gap_return) * 100,
|
||
'最大向上跳空(%)': gap_return.max() * 100,
|
||
'最大向下跳空(%)': gap_return.min() * 100,
|
||
'平均日内收益(%)': intraday_return.mean() * 100,
|
||
'日内波动率(%)': intraday_return.std() * 100,
|
||
'跳空>1%天数': (gap_return.abs() > 0.01).sum(),
|
||
'跳空>2%天数': (gap_return.abs() > 0.02).sum(),
|
||
}
|
||
|
||
stats_list.append(stats)
|
||
|
||
# 转换为 DataFrame
|
||
stats_df = pd.DataFrame(stats_list)
|
||
|
||
# 打印统计表格
|
||
print("\n各 ETF 跳空收益统计:")
|
||
print("-" * 80)
|
||
for _, row in stats_df.iterrows():
|
||
print(f"\n{row['ETF']}:")
|
||
print(f" 数据天数: {row['数据天数']}")
|
||
print(f" 平均跳空: {row['平均跳空(%)']:+.3f}% (波动率: {row['跳空波动率(%)']:.2f}%)")
|
||
print(f" 向上跳空: {row['向上跳空比例(%)']:.1f}% 向下: {row['向下跳空比例(%)']:.1f}%")
|
||
print(f" 最大跳空: +{row['最大向上跳空(%)']:.2f}% / {row['最大向下跳空(%)']:.2f}%")
|
||
print(f" 跳空>1%: {row['跳空>1%天数']}天 >2%: {row['跳空>2%天数']}天")
|
||
print(f" 平均日内收益: {row['平均日内收益(%)']:+.3f}%")
|
||
|
||
return stats_df
|
||
|
||
|
||
def analyze_strategy_gap_impact(strategy, etf_data):
|
||
"""分析跳空对策略的实际影响"""
|
||
print("\n" + "=" * 80)
|
||
print(" 策略跳空影响分析")
|
||
print("=" * 80)
|
||
|
||
# 1. 获取策略持仓数据
|
||
print("\n[1] 获取策略持仓数据...")
|
||
|
||
# 运行策略获取信号和仓位
|
||
from datetime import date
|
||
config = strategy.config
|
||
start = config.backtest.start_date
|
||
end = config.backtest.end_date
|
||
if end is None:
|
||
end = date.today().strftime('%Y-%m-%d')
|
||
|
||
# 运行策略(不导出 JSON)
|
||
result = strategy.run(export_detail=False)
|
||
|
||
positions = result['positions']
|
||
trading_calendar = positions.index
|
||
|
||
# 2. 计算新旧两种收益
|
||
print("\n[2] 计算两种收益方法...")
|
||
|
||
signal_to_trade = config.asset_pools.get_signal_to_trade_mapping()
|
||
|
||
# 准备数据
|
||
close_dict = {}
|
||
open_dict = {}
|
||
|
||
for signal_code, trade_code in signal_to_trade.items():
|
||
if trade_code in etf_data:
|
||
df = etf_data[trade_code]
|
||
# 对齐到 A 股日历
|
||
close_dict[signal_code] = df['close'].reindex(trading_calendar, method='ffill')
|
||
open_dict[signal_code] = df['open'].reindex(trading_calendar, method='ffill')
|
||
|
||
close_df = pd.DataFrame(close_dict)
|
||
open_df = pd.DataFrame(open_dict)
|
||
|
||
# 方法 1:旧方法(close-to-close)
|
||
positions_delayed = positions.shift(1).fillna(0)
|
||
old_returns_df = close_df.pct_change()
|
||
old_strategy_returns = (positions_delayed * old_returns_df).sum(axis=1)
|
||
|
||
# 方法 2:新方法(分段计算)
|
||
prev_positions = positions_delayed.shift(1).fillna(0)
|
||
curr_positions = positions_delayed
|
||
|
||
# 检测状态
|
||
is_buying = (prev_positions == 0) & (curr_positions > 0)
|
||
is_holding = (prev_positions > 0) & (curr_positions > 0)
|
||
is_selling = (prev_positions > 0) & (curr_positions == 0)
|
||
|
||
# 计算各类收益率
|
||
buy_returns = (close_df - open_df) / open_df # open-to-close
|
||
hold_returns = close_df.pct_change() # close-to-close
|
||
sell_returns = (open_df - close_df.shift(1)) / close_df.shift(1) # close-to-open
|
||
|
||
# 组合收益率
|
||
new_returns_df = pd.DataFrame(0.0, index=close_df.index, columns=close_df.columns)
|
||
new_returns_df[is_buying] = buy_returns[is_buying]
|
||
new_returns_df[is_holding] = hold_returns[is_holding]
|
||
new_returns_df[is_selling] = sell_returns[is_selling]
|
||
|
||
new_strategy_returns = (curr_positions * new_returns_df).sum(axis=1)
|
||
|
||
# 3. 计算净值曲线和 KPI
|
||
print("\n[3] 计算净值曲线和 KPI 对比...")
|
||
|
||
old_equity = (1 + old_strategy_returns).cumprod()
|
||
new_equity = (1 + new_strategy_returns).cumprod()
|
||
|
||
def calc_kpi(returns, equity, name):
|
||
total_return = equity.iloc[-1] / equity.iloc[0] - 1
|
||
n_days = len(returns)
|
||
annual_return = (1 + total_return) ** (252 / n_days) - 1
|
||
|
||
cummax = equity.cummax()
|
||
drawdown = (equity - cummax) / cummax
|
||
max_drawdown = drawdown.min()
|
||
|
||
sharpe = returns.mean() / returns.std() * np.sqrt(252) if returns.std() > 0 else 0
|
||
|
||
print(f"\n {name}:")
|
||
print(f" 总收益: {total_return * 100:.2f}%")
|
||
print(f" 年化收益: {annual_return * 100:.2f}%")
|
||
print(f" 最大回撤: {max_drawdown * 100:.2f}%")
|
||
print(f" 夏普比率: {sharpe:.2f}")
|
||
print(f" 交易天数: {n_days}")
|
||
|
||
return {
|
||
'总收益': total_return,
|
||
'年化收益': annual_return,
|
||
'最大回撤': max_drawdown,
|
||
'夏普比率': sharpe,
|
||
}
|
||
|
||
old_kpi = calc_kpi(old_strategy_returns, old_equity, "旧方法(close-to-close)")
|
||
new_kpi = calc_kpi(new_strategy_returns, new_equity, "新方法(分段计算)")
|
||
|
||
# 4. 差异分析
|
||
print("\n" + "=" * 80)
|
||
print(" 差异对比")
|
||
print("=" * 80)
|
||
|
||
print(f"\n {'指标':<12} {'旧方法':>12} {'新方法':>12} {'差异':>12}")
|
||
print(f" {'-'*12} {'-'*12} {'-'*12} {'-'*12}")
|
||
|
||
for key in ['总收益', '年化收益', '最大回撤', '夏普比率']:
|
||
old_val = old_kpi[key]
|
||
new_val = new_kpi[key]
|
||
diff = new_val - old_val
|
||
|
||
if key == '夏普比率':
|
||
print(f" {key:<12} {old_val:>12.2f} {new_val:>12.2f} {diff:>+12.2f}")
|
||
else:
|
||
print(f" {key:<12} {old_val*100:>11.2f}% {new_val*100:>11.2f}% {diff*100:>+11.2f}%")
|
||
|
||
# 5. 调仓日分析
|
||
print("\n" + "=" * 80)
|
||
print(" 调仓日跳空分析")
|
||
print("=" * 80)
|
||
|
||
# 识别调仓日
|
||
position_changes = (positions != positions.shift(1)).any(axis=1)
|
||
rebalance_dates = positions[position_changes].index
|
||
|
||
print(f"\n 总调仓次数: {len(rebalance_dates)}")
|
||
|
||
# 分析调仓日的跳空
|
||
gap_returns_all = []
|
||
for date in rebalance_dates:
|
||
if date in close_df.index:
|
||
# 计算该日的平均跳空(所有持仓 ETF)
|
||
pos = positions.loc[date]
|
||
held_codes = pos[pos > 0].index
|
||
|
||
if len(held_codes) > 0:
|
||
# 过滤掉不在 open_df 中的代码(如指数)
|
||
held_codes = [c for c in held_codes if c in open_df.columns]
|
||
if len(held_codes) == 0:
|
||
continue
|
||
|
||
day_gap = open_df.loc[date][held_codes]
|
||
prev_close = close_df.shift(1).loc[date][held_codes]
|
||
gap = (day_gap - prev_close) / prev_close
|
||
gap_returns_all.append(gap.mean())
|
||
|
||
if gap_returns_all:
|
||
gap_series = pd.Series(gap_returns_all)
|
||
print(f"\n 调仓日跳空统计:")
|
||
print(f" 平均跳空: {gap_series.mean() * 100:+.3f}%")
|
||
print(f" 跳空标准差: {gap_series.std() * 100:.2f}%")
|
||
print(f" 最大向上跳空: {gap_series.max() * 100:+.2f}%")
|
||
print(f" 最大向下跳空: {gap_series.min() * 100:+.2f}%")
|
||
print(f" 向上跳空天数: {(gap_series > 0).sum()} ({(gap_series > 0).sum() / len(gap_series) * 100:.1f}%)")
|
||
print(f" 向下跳空天数: {(gap_series < 0).sum()} ({(gap_series < 0).sum() / len(gap_series) * 100:.1f}%)")
|
||
else:
|
||
print(f"\n ⚠ 无法计算调仓日跳空(数据缺失)")
|
||
|
||
return old_kpi, new_kpi
|
||
|
||
|
||
def main():
|
||
print("=" * 80)
|
||
print(" ETF 跳空收益影响测算")
|
||
print("=" * 80)
|
||
|
||
# 1. 加载配置
|
||
config_file = project_root / 'framework_v2' / 'strategies' / 'rotation' / 'config_simple.yaml'
|
||
print(f"\n[1] 加载配置: {config_file}")
|
||
config = load_config(str(config_file))
|
||
|
||
# 2. 获取 ETF 列表
|
||
signal_to_trade = config.asset_pools.get_signal_to_trade_mapping()
|
||
trade_codes = list(set(signal_to_trade.values()))
|
||
# 过滤掉不是 ETF 的代码(如 931862.CSI)
|
||
trade_codes = [c for c in trade_codes if not c.endswith('.CSI')]
|
||
|
||
print(f" ETF 数量: {len(trade_codes)}")
|
||
|
||
# 3. 获取数据
|
||
from datetime import date
|
||
start = config.backtest.start_date
|
||
end = config.backtest.end_date
|
||
if end is None:
|
||
end = date.today().strftime('%Y-%m-%d')
|
||
|
||
etf_data = fetch_etf_data_with_ohlc(trade_codes, start, end)
|
||
|
||
# 4. 计算跳空统计
|
||
stats_df = calculate_gap_statistics(etf_data)
|
||
|
||
# 5. 分析策略影响
|
||
strategy = GlobalRotationStrategy(config)
|
||
old_kpi, new_kpi = analyze_strategy_gap_impact(strategy, etf_data)
|
||
|
||
# 6. 结论
|
||
print("\n" + "=" * 80)
|
||
print(" 结论与建议")
|
||
print("=" * 80)
|
||
|
||
annual_diff = new_kpi['年化收益'] - old_kpi['年化收益']
|
||
|
||
if abs(annual_diff) < 0.01: # 差异 < 1%
|
||
print("\n ✓ 跳空影响较小(< 1%),可以继续使用 close-to-close 简化计算")
|
||
elif abs(annual_diff) < 0.03: # 差异 1-3%
|
||
print("\n ⚠ 跳空影响中等(1-3%),建议考虑使用分段计算提高精度")
|
||
else: # 差异 > 3%
|
||
print("\n ✗ 跳空影响显著(> 3%),强烈建议使用分段计算")
|
||
|
||
print(f"\n 当前年化: {old_kpi['年化收益'] * 100:.2f}%")
|
||
print(f" 修正后年化: {new_kpi['年化收益'] * 100:.2f}%")
|
||
print(f" 差异: {annual_diff * 100:+.2f}%")
|
||
print("=" * 80)
|
||
|
||
|
||
if __name__ == '__main__':
|
||
main()
|