diff --git a/framework_v2/scripts/measure_gap_impact.py b/framework_v2/scripts/measure_gap_impact.py new file mode 100644 index 0000000..7d3e948 --- /dev/null +++ b/framework_v2/scripts/measure_gap_impact.py @@ -0,0 +1,336 @@ +#!/usr/bin/env python3 +""" +测算 ETF 跳空收益(Gap Return)对策略的影响 + +测算目标: +1. 量化各 ETF 的跳空特征(幅度、频率、波动率) +2. 分析跳空对策略收益的实际影响 +3. 判断是否需要修改收益计算逻辑 + +用法: + python framework_v2/scripts/measure_gap_impact.py +""" + +import sys +from pathlib import Path +import numpy as np +import pandas as pd + +project_root = Path(__file__).parent.parent.parent +sys.path.insert(0, str(project_root)) + +from dotenv import load_dotenv +load_dotenv() + +from framework_v2.config import load_config +from framework_v2.strategies.rotation.rotation import GlobalRotationStrategy +from framework_v2.shared.data import FlaskAPIFetcher + + +def fetch_etf_data_with_ohlc(codes, start, end): + """获取 ETF 的 OHLC 数据(hfq)""" + fetcher = FlaskAPIFetcher() + + print(f"\n[数据获取] 获取 {len(codes)} 只 ETF 的 OHLC 数据(hfq)...") + data = {} + + for i, code in enumerate(codes, 1): + print(f" [{i}/{len(codes)}] {code}...") + df = fetcher._source.fetch( + code=code, + start_date=start, + end_date=end, + adj='hfq', + asset_type='china_etf' + ) + if df is not None: + data[code] = df + print(f" ✓ {len(df)} 条") + else: + print(f" ✗ 获取失败") + + return data + + +def calculate_gap_statistics(etf_data): + """计算各 ETF 的跳空统计""" + print("\n" + "=" * 80) + print(" 跳空收益统计分析") + print("=" * 80) + + stats_list = [] + + for code, df in etf_data.items(): + # 确保按日期排序 + df = df.sort_index() + + # 计算收益率 + prev_close = df['close'].shift(1) + + # 跳空收益率:(T_open - T-1_close) / T-1_close + gap_return = (df['open'] - prev_close) / prev_close + + # 日内收益率:(T_close - T_open) / T_open + intraday_return = (df['close'] - df['open']) / df['open'] + + # 验证:总收益率 ≈ 跳空 + 日内 + total_return = df['close'].pct_change() + + # 统计指标 + stats = { + 'ETF': code, + '数据天数': len(df), + '平均跳空(%)': gap_return.mean() * 100, + '跳空波动率(%)': gap_return.std() * 100, + '向上跳空比例(%)': (gap_return > 0.0001).sum() / len(gap_return) * 100, + '向下跳空比例(%)': (gap_return < -0.0001).sum() / len(gap_return) * 100, + '最大向上跳空(%)': gap_return.max() * 100, + '最大向下跳空(%)': gap_return.min() * 100, + '平均日内收益(%)': intraday_return.mean() * 100, + '日内波动率(%)': intraday_return.std() * 100, + '跳空>1%天数': (gap_return.abs() > 0.01).sum(), + '跳空>2%天数': (gap_return.abs() > 0.02).sum(), + } + + stats_list.append(stats) + + # 转换为 DataFrame + stats_df = pd.DataFrame(stats_list) + + # 打印统计表格 + print("\n各 ETF 跳空收益统计:") + print("-" * 80) + for _, row in stats_df.iterrows(): + print(f"\n{row['ETF']}:") + print(f" 数据天数: {row['数据天数']}") + print(f" 平均跳空: {row['平均跳空(%)']:+.3f}% (波动率: {row['跳空波动率(%)']:.2f}%)") + print(f" 向上跳空: {row['向上跳空比例(%)']:.1f}% 向下: {row['向下跳空比例(%)']:.1f}%") + print(f" 最大跳空: +{row['最大向上跳空(%)']:.2f}% / {row['最大向下跳空(%)']:.2f}%") + print(f" 跳空>1%: {row['跳空>1%天数']}天 >2%: {row['跳空>2%天数']}天") + print(f" 平均日内收益: {row['平均日内收益(%)']:+.3f}%") + + return stats_df + + +def analyze_strategy_gap_impact(strategy, etf_data): + """分析跳空对策略的实际影响""" + print("\n" + "=" * 80) + print(" 策略跳空影响分析") + print("=" * 80) + + # 1. 获取策略持仓数据 + print("\n[1] 获取策略持仓数据...") + + # 运行策略获取信号和仓位 + from datetime import date + config = strategy.config + start = config.backtest.start_date + end = config.backtest.end_date + if end is None: + end = date.today().strftime('%Y-%m-%d') + + # 运行策略(不导出 JSON) + result = strategy.run(export_detail=False) + + positions = result['positions'] + trading_calendar = positions.index + + # 2. 计算新旧两种收益 + print("\n[2] 计算两种收益方法...") + + signal_to_trade = config.asset_pools.get_signal_to_trade_mapping() + + # 准备数据 + close_dict = {} + open_dict = {} + + for signal_code, trade_code in signal_to_trade.items(): + if trade_code in etf_data: + df = etf_data[trade_code] + # 对齐到 A 股日历 + close_dict[signal_code] = df['close'].reindex(trading_calendar, method='ffill') + open_dict[signal_code] = df['open'].reindex(trading_calendar, method='ffill') + + close_df = pd.DataFrame(close_dict) + open_df = pd.DataFrame(open_dict) + + # 方法 1:旧方法(close-to-close) + positions_delayed = positions.shift(1).fillna(0) + old_returns_df = close_df.pct_change() + old_strategy_returns = (positions_delayed * old_returns_df).sum(axis=1) + + # 方法 2:新方法(分段计算) + prev_positions = positions_delayed.shift(1).fillna(0) + curr_positions = positions_delayed + + # 检测状态 + is_buying = (prev_positions == 0) & (curr_positions > 0) + is_holding = (prev_positions > 0) & (curr_positions > 0) + is_selling = (prev_positions > 0) & (curr_positions == 0) + + # 计算各类收益率 + buy_returns = (close_df - open_df) / open_df # open-to-close + hold_returns = close_df.pct_change() # close-to-close + sell_returns = (open_df - close_df.shift(1)) / close_df.shift(1) # close-to-open + + # 组合收益率 + new_returns_df = pd.DataFrame(0.0, index=close_df.index, columns=close_df.columns) + new_returns_df[is_buying] = buy_returns[is_buying] + new_returns_df[is_holding] = hold_returns[is_holding] + new_returns_df[is_selling] = sell_returns[is_selling] + + new_strategy_returns = (curr_positions * new_returns_df).sum(axis=1) + + # 3. 计算净值曲线和 KPI + print("\n[3] 计算净值曲线和 KPI 对比...") + + old_equity = (1 + old_strategy_returns).cumprod() + new_equity = (1 + new_strategy_returns).cumprod() + + def calc_kpi(returns, equity, name): + total_return = equity.iloc[-1] / equity.iloc[0] - 1 + n_days = len(returns) + annual_return = (1 + total_return) ** (252 / n_days) - 1 + + cummax = equity.cummax() + drawdown = (equity - cummax) / cummax + max_drawdown = drawdown.min() + + sharpe = returns.mean() / returns.std() * np.sqrt(252) if returns.std() > 0 else 0 + + print(f"\n {name}:") + print(f" 总收益: {total_return * 100:.2f}%") + print(f" 年化收益: {annual_return * 100:.2f}%") + print(f" 最大回撤: {max_drawdown * 100:.2f}%") + print(f" 夏普比率: {sharpe:.2f}") + print(f" 交易天数: {n_days}") + + return { + '总收益': total_return, + '年化收益': annual_return, + '最大回撤': max_drawdown, + '夏普比率': sharpe, + } + + old_kpi = calc_kpi(old_strategy_returns, old_equity, "旧方法(close-to-close)") + new_kpi = calc_kpi(new_strategy_returns, new_equity, "新方法(分段计算)") + + # 4. 差异分析 + print("\n" + "=" * 80) + print(" 差异对比") + print("=" * 80) + + print(f"\n {'指标':<12} {'旧方法':>12} {'新方法':>12} {'差异':>12}") + print(f" {'-'*12} {'-'*12} {'-'*12} {'-'*12}") + + for key in ['总收益', '年化收益', '最大回撤', '夏普比率']: + old_val = old_kpi[key] + new_val = new_kpi[key] + diff = new_val - old_val + + if key == '夏普比率': + print(f" {key:<12} {old_val:>12.2f} {new_val:>12.2f} {diff:>+12.2f}") + else: + print(f" {key:<12} {old_val*100:>11.2f}% {new_val*100:>11.2f}% {diff*100:>+11.2f}%") + + # 5. 调仓日分析 + print("\n" + "=" * 80) + print(" 调仓日跳空分析") + print("=" * 80) + + # 识别调仓日 + position_changes = (positions != positions.shift(1)).any(axis=1) + rebalance_dates = positions[position_changes].index + + print(f"\n 总调仓次数: {len(rebalance_dates)}") + + # 分析调仓日的跳空 + gap_returns_all = [] + for date in rebalance_dates: + if date in close_df.index: + # 计算该日的平均跳空(所有持仓 ETF) + pos = positions.loc[date] + held_codes = pos[pos > 0].index + + if len(held_codes) > 0: + # 过滤掉不在 open_df 中的代码(如指数) + held_codes = [c for c in held_codes if c in open_df.columns] + if len(held_codes) == 0: + continue + + day_gap = open_df.loc[date][held_codes] + prev_close = close_df.shift(1).loc[date][held_codes] + gap = (day_gap - prev_close) / prev_close + gap_returns_all.append(gap.mean()) + + if gap_returns_all: + gap_series = pd.Series(gap_returns_all) + print(f"\n 调仓日跳空统计:") + print(f" 平均跳空: {gap_series.mean() * 100:+.3f}%") + print(f" 跳空标准差: {gap_series.std() * 100:.2f}%") + print(f" 最大向上跳空: {gap_series.max() * 100:+.2f}%") + print(f" 最大向下跳空: {gap_series.min() * 100:+.2f}%") + print(f" 向上跳空天数: {(gap_series > 0).sum()} ({(gap_series > 0).sum() / len(gap_series) * 100:.1f}%)") + print(f" 向下跳空天数: {(gap_series < 0).sum()} ({(gap_series < 0).sum() / len(gap_series) * 100:.1f}%)") + else: + print(f"\n ⚠ 无法计算调仓日跳空(数据缺失)") + + return old_kpi, new_kpi + + +def main(): + print("=" * 80) + print(" ETF 跳空收益影响测算") + print("=" * 80) + + # 1. 加载配置 + config_file = project_root / 'framework_v2' / 'strategies' / 'rotation' / 'config_simple.yaml' + print(f"\n[1] 加载配置: {config_file}") + config = load_config(str(config_file)) + + # 2. 获取 ETF 列表 + signal_to_trade = config.asset_pools.get_signal_to_trade_mapping() + trade_codes = list(set(signal_to_trade.values())) + # 过滤掉不是 ETF 的代码(如 931862.CSI) + trade_codes = [c for c in trade_codes if not c.endswith('.CSI')] + + print(f" ETF 数量: {len(trade_codes)}") + + # 3. 获取数据 + from datetime import date + start = config.backtest.start_date + end = config.backtest.end_date + if end is None: + end = date.today().strftime('%Y-%m-%d') + + etf_data = fetch_etf_data_with_ohlc(trade_codes, start, end) + + # 4. 计算跳空统计 + stats_df = calculate_gap_statistics(etf_data) + + # 5. 分析策略影响 + strategy = GlobalRotationStrategy(config) + old_kpi, new_kpi = analyze_strategy_gap_impact(strategy, etf_data) + + # 6. 结论 + print("\n" + "=" * 80) + print(" 结论与建议") + print("=" * 80) + + annual_diff = new_kpi['年化收益'] - old_kpi['年化收益'] + + if abs(annual_diff) < 0.01: # 差异 < 1% + print("\n ✓ 跳空影响较小(< 1%),可以继续使用 close-to-close 简化计算") + elif abs(annual_diff) < 0.03: # 差异 1-3% + print("\n ⚠ 跳空影响中等(1-3%),建议考虑使用分段计算提高精度") + else: # 差异 > 3% + print("\n ✗ 跳空影响显著(> 3%),强烈建议使用分段计算") + + print(f"\n 当前年化: {old_kpi['年化收益'] * 100:.2f}%") + print(f" 修正后年化: {new_kpi['年化收益'] * 100:.2f}%") + print(f" 差异: {annual_diff * 100:+.2f}%") + print("=" * 80) + + +if __name__ == '__main__': + main() diff --git a/framework_v2/scripts/verify_cum_return_fix.py b/framework_v2/scripts/verify_cum_return_fix.py new file mode 100644 index 0000000..bb975e5 --- /dev/null +++ b/framework_v2/scripts/verify_cum_return_fix.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python3 +""" +验证 cum_return_idx 和 cum_return_etf 是否独立计算 +""" + +import sys +from pathlib import Path +import json + +project_root = Path(__file__).parent.parent +sys.path.insert(0, str(project_root)) + +# 读取已有的 backtest_detail_v2.json +detail_path = project_root / 'framework_v2' / 'results' / 'backtest_detail_v2.json' + +if not detail_path.exists(): + print(f"❌ 文件不存在: {detail_path}") + print("请先运行: python framework_v2/scripts/export_backtest_detail.py") + sys.exit(1) + +print("=" * 80) +print(" 验证指数和 ETF 累计收益是否独立计算") +print("=" * 80) + +with open(detail_path, 'r') as f: + data = json.load(f) + +# 检查每日数据 +days = data['days'] +print(f"\n总天数: {len(days)}") + +# 统计有差异的天数 +diff_count = 0 +same_count = 0 +total_checked = 0 + +for day in days[:100]: # 检查前 100 天 + date = day['date'] + assets = day.get('assets', {}) + + for code, asset in assets.items(): + if not asset.get('is_held'): + continue + + cum_etf = asset.get('cum_return_etf') + cum_idx = asset.get('cum_return_idx') + + if cum_etf is not None and cum_idx is not None: + total_checked += 1 + + if abs(cum_etf - cum_idx) > 0.0001: # 差异超过 0.01% + diff_count += 1 + if diff_count <= 5: # 只显示前 5 个示例 + print(f"\n✓ {date} - {code}:") + print(f" ETF 累计收益: {cum_etf:.4f} ({cum_etf*100:.2f}%)") + print(f" 指数累计收益: {cum_idx:.4f} ({cum_idx*100:.2f}%)") + print(f" 差异: {abs(cum_etf - cum_idx)*100:.2f}%") + else: + same_count += 1 + +print(f"\n{'=' * 80}") +print(f"统计结果(前 100 天,持仓标的):") +print(f" 总检查次数: {total_checked}") +print(f" 有差异: {diff_count} ({diff_count/total_checked*100:.1f}%)") +print(f" 相同: {same_count} ({same_count/total_checked*100:.1f}%)") + +if diff_count > 0: + print(f"\n✅ 修复成功!指数和 ETF 累计收益已独立计算") +else: + print(f"\n❌ 仍有问题:指数和 ETF 累计收益完全相同") + print(" 需要重新生成 backtest_detail_v2.json") diff --git a/framework_v2/strategies/rotation/rotation.py b/framework_v2/strategies/rotation/rotation.py index 56ba20c..695bd39 100644 --- a/framework_v2/strategies/rotation/rotation.py +++ b/framework_v2/strategies/rotation/rotation.py @@ -795,19 +795,29 @@ class GlobalRotationStrategy(StrategyBase): trading_days_held = len(trading_calendar[(trading_calendar >= entry_dt) & (trading_calendar <= date)]) asset['holding_days'] = trading_days_held - # 累计收益 + # 累计收益(分别使用 ETF 和指数价格计算) if hs['entry_price'] and hs['entry_price'] > 0: + # ETF 累计收益 if code in etf_close_dict: - cur = etf_close_dict[code].reindex(trading_calendar, method='ffill').get(date) - if cur and pd.notna(cur): - cum_ret = float(cur) / hs['entry_price'] - 1 - asset['cum_return_etf'] = self._safe_val(cum_ret, 4) - asset['cum_return_idx'] = self._safe_val(cum_ret, 4) + etf_cur = etf_close_dict[code].reindex(trading_calendar, method='ffill').get(date) + if etf_cur and pd.notna(etf_cur): + etf_cum_ret = float(etf_cur) / hs['entry_price'] - 1 + asset['cum_return_etf'] = self._safe_val(etf_cum_ret, 4) else: asset['cum_return_etf'] = None - asset['cum_return_idx'] = None else: asset['cum_return_etf'] = None + + # 指数累计收益(独立计算) + if code in index_close_dict: + idx_cur = index_close_dict[code].reindex(trading_calendar, method='ffill').get(date) + idx_entry = index_close_dict[code].reindex(trading_calendar, method='ffill').get(entry_dt) + if idx_cur and idx_entry and pd.notna(idx_entry) and float(idx_entry) > 0: + idx_cum_ret = float(idx_cur) / float(idx_entry) - 1 + asset['cum_return_idx'] = self._safe_val(idx_cum_ret, 4) + else: + asset['cum_return_idx'] = None + else: asset['cum_return_idx'] = None else: asset['cum_return_etf'] = None