diff --git a/scripts/export_backtest_detail.py b/scripts/export_backtest_detail.py new file mode 100644 index 0000000..b3fca91 --- /dev/null +++ b/scripts/export_backtest_detail.py @@ -0,0 +1,477 @@ +#!/usr/bin/env python3 +""" +导出回测逐日明细到 JSON,供 HTML 回放器加载。 + +模式 B:指数信号 + ETF 收益(2020-01-01 ~ 2026-05-19) + +用法: + python scripts/export_backtest_detail.py +""" + +import sys +import json +import math +from pathlib import Path + +import numpy as np +import pandas as pd +import yaml + +project_root = Path(__file__).parent.parent +sys.path.insert(0, str(project_root)) + +from dotenv import load_dotenv +load_dotenv() + +from datasource.tushare_source import TushareSource +from datasource.flask_api_source import FlaskAPIDataSource +from strategies.shared.factors.momentum import MomentumFactor +from strategies.shared.signals.selectors import TopNSelector +from framework.execution import BacktestExecutor + +# ==================== 加载配置 ==================== +config_path = project_root / 'strategies' / 'rotation' / 'config.yaml' +with open(config_path, 'r', encoding='utf-8') as f: + config = yaml.safe_load(f) + +CODE_LIST = config['code_list'] +SELECT_NUM = config['select_num'] +N_DAYS = config['n_days'] +TRADE_COST = config['trade_cost'] +BOND_THRESHOLD = config.get('bond_threshold', {}) +BOND_CODE = BOND_THRESHOLD.get('bond_code', '931862.CSI') +BOND_RATIO = BOND_THRESHOLD.get('ratio', 1.0) + + +def fetch_all_data(start_date='2018-01-01', end_date='2026-05-19'): + ts = TushareSource() + api = FlaskAPIDataSource() # 默认使用 k3s.tokenpluse.xyz + + index_data = {} + etf_data = {} + etf_code_map = {} + + # 统一使用 Flask API 获取所有指数数据(与 strategy.py 保持一致) + print("[指数数据] - 通过 Flask API (k3s服务) 获取") + index_codes = list(CODE_LIST.keys()) + index_ohlcv_data = api.fetch_batch(index_codes, start_date, end_date) + + for code, df in index_ohlcv_data.items(): + if df is not None and 'close' in df.columns and len(df) > 0: + index_data[code] = df + name = CODE_LIST.get(code, {}).get('name', code) + print(f" {code} ({name})... {len(df)}天") + else: + name = CODE_LIST.get(code, {}).get('name', code) + print(f" {code} ({name})... 失败") + + print("\n[ETF数据]") + etf_nav_data = {} + for code, cfg in CODE_LIST.items(): + etf_code = cfg.get('etf') + if etf_code is None: + continue + etf_code_map[code] = etf_code + name = cfg['name'] + print(f" {etf_code} ({name})...", end=' ') + + df = ts.fetch_etf_adj(etf_code, start_date, end_date) + if df is not None and 'close_hfq' in df.columns and len(df) > 0: + adj_ratio = df['close_hfq'] / df['close'] + df['open_hfq'] = df['open'] * adj_ratio + etf_data[code] = df + print(f"{len(df)}天", end='') + else: + print("失败") + continue + + # 获取ETF净值(用于计算溢价率) + nav_df = ts.fetch_etf_nav(etf_code, start_date, end_date) + if nav_df is not None and 'nav' in nav_df.columns and len(nav_df) > 0: + etf_nav_data[code] = nav_df['nav'] + print(f" nav={len(nav_df)}天") + else: + print(" nav=无") + + return index_data, etf_data, etf_code_map, etf_nav_data + + +def compute_factors(price_data, n_days, trade_dates): + """先在原始交易日历上计算因子,再 ffill 对齐到 A 股日历(与 strategy.py 一致)""" + factor = MomentumFactor(n_days=n_days, weighted=True, crash_filter=True) + factor_values = {} + for code, df in price_data.items(): + if 'close' not in df.columns: + continue + close_series = df['close'].dropna() + if len(close_series) == 0: + continue + values = factor.compute(pd.DataFrame({'close': close_series})) + factor_values[code] = values.reindex(trade_dates, method='ffill') + return pd.DataFrame(factor_values) + + +def generate_signals(factor_df, group_mapping): + selector = TopNSelector( + select_num=SELECT_NUM, + group_mapping=group_mapping, + min_score=0.0, + rebalance_days=1, + rebalance_threshold=0.0, + bond_threshold_config=BOND_THRESHOLD + ) + return selector.generate(factor_df) + + +def safe_val(v, decimals=4): + if v is None or (isinstance(v, float) and (math.isnan(v) or math.isinf(v))): + return None + if isinstance(v, (np.floating, float)): + return round(float(v), decimals) + if isinstance(v, (np.integer, int)): + return int(v) + return v + + +def main(): + from datetime import datetime + backtest_start = '2020-01-01' + backtest_end = datetime.now().strftime('%Y-%m-%d') # 动态获取当前日期 + + print("=" * 60) + print(" 导出回测逐日明细 (模式B: 指数信号 + ETF收益)") + print("=" * 60) + + # 1. 获取数据 + print("\n[1] 获取数据...") + index_data, etf_data, etf_code_map, etf_nav_data = fetch_all_data() + + # 2. A股交易日历 + print("\n[2] 获取A股交易日历...") + ts = TushareSource() + a_share_dates = ts.fetch_trade_cal(backtest_start, backtest_end) + print(f" {len(a_share_dates)} 天") + + # 3. 分组映射 + group_mapping = {} + for code, cfg in CODE_LIST.items(): + if isinstance(cfg, dict): + group_mapping[code] = cfg.get('market', 'default') + + valid_codes = [c for c in CODE_LIST if c in index_data] + + # 4. 计算因子(指数信号) + print("\n[3] 计算指数动量因子...") + idx_price_data = {} + for code in valid_codes: + if code in index_data and 'close' in index_data[code].columns: + idx_price_data[code] = index_data[code] + factor_df = compute_factors(idx_price_data, N_DAYS, a_share_dates) + print(f" {len(factor_df.columns)} 只, {len(factor_df)} 天") + + # 5. 生成信号 + print("\n[4] 生成信号...") + signals = generate_signals(factor_df, group_mapping) + print(f" {len(signals)} 天") + + # 6. 准备ETF收益率(模式B) + print("\n[5] 准备ETF收益率...") + etf_close_hfq_aligned = {} + etf_close_aligned = {} + etf_open_aligned = {} + etf_close_hfq_raw = {} + index_close_aligned = {} + returns_etf = {} + returns_idx = {} + + for code in valid_codes: + # 指数收盘价和收益率 + if code in index_data and 'close' in index_data[code].columns: + ic = index_data[code]['close'].dropna() + ic_a = ic.reindex(a_share_dates, method='ffill') + index_close_aligned[code] = ic_a + returns_idx[code] = ic_a.pct_change(fill_method=None) + + # ETF价格和收益率 + etf_code = etf_code_map.get(code) + if etf_code and code in etf_data: + df = etf_data[code] + chfq = df['close_hfq'].dropna() + chfq_a = chfq.reindex(a_share_dates, method='ffill') + etf_close_hfq_aligned[code] = chfq_a + etf_close_hfq_raw[code] = chfq + returns_etf[f'日收益率_{code}'] = chfq_a.pct_change(fill_method=None) + + ec = df['close'].reindex(a_share_dates, method='ffill') + etf_close_aligned[code] = ec + eo = df['open'].reindex(a_share_dates, method='ffill') + etf_open_aligned[code] = eo + elif code in index_data and 'close' in index_data[code].columns: + ic = index_data[code]['close'].dropna() + ic_a = ic.reindex(a_share_dates, method='ffill') + returns_etf[f'日收益率_{code}'] = ic_a.pct_change(fill_method=None) + + returns_etf_df = pd.DataFrame(returns_etf) + + # 6.5 溢价率:(ETF收盘价 - 单位净值) / 单位净值 + etf_premium_aligned = {} + for code in valid_codes: + if code in etf_nav_data and code in etf_close_aligned: + nav_raw = etf_nav_data[code] + nav_raw = nav_raw[~nav_raw.index.duplicated(keep='last')] + nav = nav_raw.reindex(a_share_dates, method='ffill') + close = etf_close_aligned[code] + premium = (close - nav) / nav + etf_premium_aligned[code] = premium + + # 7. 执行回测获取净值 + print("\n[6] 执行回测...") + common_dates = signals.index.intersection(returns_etf_df.index) + signals_aligned = signals.loc[common_dates] + returns_aligned = returns_etf_df.loc[common_dates] + + executor = BacktestExecutor( + initial_capital=100000, + trade_cost=TRADE_COST, + select_num=SELECT_NUM + ) + portfolio = executor.execute(signals_aligned, returns_aligned) + result = portfolio.backtest_result + nav_series_raw = result['策略净值'] + daily_ret_raw = result['策略日收益率'] + + # 扩展到所有common_dates,信号前的日期 nav=1.0, return=0.0 + nav_series = nav_series_raw.reindex(common_dates) + daily_ret_series = daily_ret_raw.reindex(common_dates, fill_value=0.0) + first_valid = nav_series.first_valid_index() + if first_valid is not None: + nav_series.loc[:first_valid] = nav_series.loc[:first_valid].fillna(1.0) + nav_series = nav_series.ffill() + + print(f" 终值: {nav_series.iloc[-1]:.4f}") + + # 8. 构建逐日明细 + print("\n[7] 构建逐日明细...") + + # 持仓跟踪状态 + holdings_state = {} # {code: {'entry_date': str, 'entry_price': float}} + prev_holdings = set() + + days_list = [] + signal_col = 'signal' + + for i, date in enumerate(common_dates): + sig_val = signals_aligned.loc[date, signal_col] if signal_col in signals_aligned.columns else '' + current_holdings = set(str(sig_val).split(',')) if pd.notna(sig_val) and sig_val else set() + current_holdings.discard('') + + # 调仓检测 + added = list(current_holdings - prev_holdings) + removed = list(prev_holdings - current_holdings) + is_rebalance = len(added) > 0 or len(removed) > 0 + + # 更新持仓状态 + for code in removed: + holdings_state.pop(code, None) + for code in added: + entry_price_etf = None + entry_price_idx = None + if code in etf_close_hfq_aligned: + ep = etf_close_hfq_aligned[code].get(date) + if pd.notna(ep): + entry_price_etf = float(ep) + if code in index_close_aligned: + ep = index_close_aligned[code].get(date) + if pd.notna(ep): + entry_price_idx = float(ep) + holdings_state[code] = { + 'entry_date': date.strftime('%Y-%m-%d'), + 'entry_price_etf': entry_price_etf, + 'entry_price_idx': entry_price_idx, + } + + # 动态阈值 + factor_scores = {} + for code in valid_codes: + if code in factor_df.columns: + v = factor_df.loc[date, code] if date in factor_df.index else np.nan + if pd.notna(v): + factor_scores[code] = float(v) + + bond_score = factor_scores.get(BOND_CODE) + if BOND_THRESHOLD.get('enabled') and bond_score is not None and bond_score >= 0: + threshold = bond_score * BOND_RATIO + else: + threshold = 0.0 + + # 排名(按动量降序,排除BOND) + non_bond_scores = {k: v for k, v in factor_scores.items() + if group_mapping.get(k) != 'BOND'} + sorted_codes = sorted(non_bond_scores.keys(), + key=lambda c: non_bond_scores[c], reverse=True) + rank_map = {c: r + 1 for r, c in enumerate(sorted_codes)} + # BOND不参与排名 + if BOND_CODE in factor_scores: + rank_map[BOND_CODE] = None + + # 每标的详情 + assets = {} + for code in valid_codes: + asset = {} + + # 指数收盘价 + if code in index_close_aligned: + v = index_close_aligned[code].get(date) + asset['index_close'] = safe_val(v, 2) + else: + asset['index_close'] = None + + # 动量 + mom = factor_scores.get(code) + asset['momentum'] = safe_val(mom, 4) + + # 排名 + asset['rank'] = rank_map.get(code) + + # 阈值 + asset['threshold'] = safe_val(threshold, 4) + asset['above_threshold'] = mom >= threshold if mom is not None else False + + # ETF价格 + if code in etf_close_aligned: + asset['etf_close'] = safe_val(etf_close_aligned[code].get(date), 3) + else: + asset['etf_close'] = None + + if code in etf_open_aligned: + asset['etf_open'] = safe_val(etf_open_aligned[code].get(date), 3) + else: + asset['etf_open'] = None + + if code in etf_close_hfq_aligned: + asset['etf_close_hfq'] = safe_val(etf_close_hfq_aligned[code].get(date), 4) + else: + asset['etf_close_hfq'] = None + + # 溢价率 + if code in etf_premium_aligned: + asset['premium'] = safe_val(etf_premium_aligned[code].get(date), 4) + else: + asset['premium'] = None + + # ETF日收益率 + ret_col = f'日收益率_{code}' + if ret_col in returns_etf_df.columns: + asset['etf_return_ctc'] = safe_val(returns_etf_df.loc[date, ret_col], 6) + else: + asset['etf_return_ctc'] = None + + # 指数日收益率 + if code in returns_idx: + asset['index_return'] = safe_val(returns_idx[code].get(date), 6) + else: + asset['index_return'] = None + + # 持仓状态 + is_held = code in current_holdings + asset['is_held'] = is_held + + if is_held and code in holdings_state: + hs = holdings_state[code] + asset['entry_date'] = hs['entry_date'] + asset['entry_price_etf'] = safe_val(hs['entry_price_etf'], 4) + asset['entry_price_idx'] = safe_val(hs['entry_price_idx'], 4) + + entry_dt = pd.Timestamp(hs['entry_date']) + trading_days_held = len(common_dates[(common_dates >= entry_dt) & (common_dates <= date)]) + asset['holding_days'] = trading_days_held + + # ETF累计收益 + if hs['entry_price_etf'] and hs['entry_price_etf'] > 0: + cur = etf_close_hfq_aligned[code].get(date) if code in etf_close_hfq_aligned else None + if cur and pd.notna(cur): + asset['cum_return_etf'] = safe_val(float(cur) / hs['entry_price_etf'] - 1, 4) + else: + asset['cum_return_etf'] = None + else: + asset['cum_return_etf'] = None + + # 指数累计收益 + if hs['entry_price_idx'] and hs['entry_price_idx'] > 0: + cur = index_close_aligned[code].get(date) if code in index_close_aligned else None + if cur and pd.notna(cur): + asset['cum_return_idx'] = safe_val(float(cur) / hs['entry_price_idx'] - 1, 4) + else: + asset['cum_return_idx'] = None + else: + asset['cum_return_idx'] = None + else: + asset['entry_date'] = None + asset['entry_price_etf'] = None + asset['entry_price_idx'] = None + asset['holding_days'] = 0 + asset['cum_return_etf'] = None + asset['cum_return_idx'] = None + + assets[code] = asset + + # 构建当天记录 + nav_val = nav_series.loc[date] if date in nav_series.index else None + ret_val = daily_ret_series.loc[date] if date in daily_ret_series.index else None + + day_record = { + 'date': date.strftime('%Y-%m-%d'), + 'nav': safe_val(nav_val, 4), + 'daily_return': safe_val(ret_val, 6), + 'is_rebalance': is_rebalance, + 'holdings': sorted(list(current_holdings)), + 'added': sorted(added), + 'removed': sorted(removed), + 'assets': assets + } + days_list.append(day_record) + prev_holdings = current_holdings + + # 9. 构建元数据 + codes_meta = {} + for code, cfg in CODE_LIST.items(): + codes_meta[code] = { + 'name': cfg['name'], + 'etf': cfg.get('etf'), + 'market': cfg.get('market') + } + + output = { + 'meta': { + 'mode': 'B: 指数信号 + ETF收益', + 'start_date': common_dates[0].strftime('%Y-%m-%d'), + 'end_date': common_dates[-1].strftime('%Y-%m-%d'), + 'total_days': len(common_dates), + 'select_num': SELECT_NUM, + 'n_days': N_DAYS, + 'trade_cost': TRADE_COST, + 'bond_threshold': { + 'enabled': BOND_THRESHOLD.get('enabled', False), + 'bond_code': BOND_CODE, + 'ratio': BOND_RATIO + }, + 'codes': codes_meta + }, + 'days': days_list + } + + # 10. 输出 + output_path = project_root / 'results' / 'backtest_detail.json' + print(f"\n[8] 写入 {output_path}...") + with open(output_path, 'w', encoding='utf-8') as f: + json.dump(output, f, ensure_ascii=False) + + file_size_mb = output_path.stat().st_size / 1024 / 1024 + print(f" 大小: {file_size_mb:.1f} MB") + print(f" 天数: {len(days_list)}") + print(f" 标的: {len(valid_codes)}") + print(" 完成!") + + +if __name__ == '__main__': + main() diff --git a/strategies/shared/factors/momentum.py b/strategies/shared/factors/momentum.py index 02fae44..f83e293 100644 --- a/strategies/shared/factors/momentum.py +++ b/strategies/shared/factors/momentum.py @@ -62,7 +62,14 @@ class MomentumFactor(FactorBase): if len(prices) < 5: return 0.0 + # 价格下界 clip,防止 log(0) 或 log(负数) + prices = np.clip(prices, 0.01, None) y = np.log(prices) + + # 异常值检测 + if np.any(np.isnan(y)) or np.any(np.isinf(y)): + return 0.0 + x = np.arange(len(y)) weights = np.linspace(1, 2, len(y)) diff --git a/tests/test_trading_calendar.py b/tests/test_trading_calendar.py new file mode 100644 index 0000000..5dfbf13 --- /dev/null +++ b/tests/test_trading_calendar.py @@ -0,0 +1,164 @@ +#!/usr/bin/env python3 +""" +测试交易日历 API +""" + +import sys +from pathlib import Path +import requests + +# Flask 服务地址 +FLASK_API_URL = "http://localhost:80" + +def test_calendar_api(): + """测试交易日历 API""" + print("\n" + "="*80) + print("📅 交易日历 API 测试") + print("="*80) + + # 测试 1: A 股 + print("\n[1] 测试 A 股交易日历...") + url = f"{FLASK_API_URL}/api/v1/trading-calendar" + params = {"market": "A", "start": "2024-01-01", "end": "2024-01-31"} + + try: + response = requests.get(url, params=params, timeout=10) + if response.status_code == 200: + data = response.json() + print(f" ✅ 成功: {data['count']} 个交易日") + print(f" 市场: {data['market']}") + print(f" 交易所: {data['exchange']}") + print(f" 日期范围: {data['start']} ~ {data['end']}") + print(f" 前5个交易日: {data['trading_dates'][:5]}") + else: + print(f" ❌ 失败: {response.status_code}") + print(f" 响应: {response.json()}") + except Exception as e: + print(f" ❌ 异常: {e}") + + # 测试 2: 美股 + print("\n[2] 测试美股交易日历...") + params = {"market": "US", "start": "2024-01-01", "end": "2024-01-31"} + + try: + response = requests.get(url, params=params, timeout=10) + if response.status_code == 200: + data = response.json() + print(f" ✅ 成功: {data['count']} 个交易日") + print(f" 市场: {data['market']}") + print(f" 交易所: {data['exchange']}") + print(f" 前5个交易日: {data['trading_dates'][:5]}") + else: + print(f" ❌ 失败: {response.status_code}") + print(f" 响应: {response.json()}") + except Exception as e: + print(f" ❌ 异常: {e}") + + # 测试 3: 港股 + print("\n[3] 测试港股交易日历...") + params = {"market": "HK", "start": "2024-01-01", "end": "2024-01-31"} + + try: + response = requests.get(url, params=params, timeout=10) + if response.status_code == 200: + data = response.json() + print(f" ✅ 成功: {data['count']} 个交易日") + print(f" 市场: {data['market']}") + print(f" 交易所: {data['exchange']}") + print(f" 前5个交易日: {data['trading_dates'][:5]}") + else: + print(f" ❌ 失败: {response.status_code}") + print(f" 响应: {response.json()}") + except Exception as e: + print(f" ❌ 异常: {e}") + + # 测试 4: 日历信息 + print("\n[4] 测试日历信息...") + url_info = f"{FLASK_API_URL}/api/v1/calendar/info" + + try: + response = requests.get(url_info, timeout=10) + if response.status_code == 200: + data = response.json() + print(f" ✅ 成功") + print(f" 支持的市场:") + for market, info in data.get('supported_markets', {}).items(): + print(f" {market}: {info['name']} ({info['method']})") + print(f" pandas_market_calendars: {'✅ 已安装' if data.get('pandas_market_calendars_installed') else '❌ 未安装'}") + else: + print(f" ❌ 失败: {response.status_code}") + except Exception as e: + print(f" ❌ 异常: {e}") + +def test_local_fetcher(): + """测试本地 UniversalDataFetcher""" + print("\n" + "="*80) + print("🧪 本地 UniversalDataFetcher 测试") + print("="*80) + + sys.path.insert(0, str(Path(__file__).parent.parent)) + + try: + from datasource.universal_fetcher import UniversalDataFetcher + + fetcher = UniversalDataFetcher() + + # 测试 A 股 + print("\n[1] A 股交易日历 (2024年)...") + cal_a = fetcher.get_trading_calendar('A', '2024-01-01', '2024-12-31') + print(f" ✅ {len(cal_a)} 个交易日") + print(f" 前5天: {list(cal_a[:5])}") + + # 测试美股 + print("\n[2] 美股交易日历 (2024年)...") + cal_us = fetcher.get_trading_calendar('US', '2024-01-01', '2024-12-31') + print(f" ✅ {len(cal_us)} 个交易日") + print(f" 前5天: {list(cal_us[:5])}") + + # 测试港股 + print("\n[3] 港股交易日历 (2024年)...") + cal_hk = fetcher.get_trading_calendar('HK', '2024-01-01', '2024-12-31') + print(f" ✅ {len(cal_hk)} 个交易日") + print(f" 前5天: {list(cal_hk[:5])}") + + # 日历信息 + print("\n[4] 日历支持信息...") + info = fetcher.get_calendar_info() + print(f" ✅ 支持 {len(info['supported_markets'])} 个市场") + + except Exception as e: + print(f" ❌ 失败: {e}") + import traceback + traceback.print_exc() + +def main(): + print("\n" + "="*80) + print("📅 交易日历功能测试") + print("="*80) + + # 测试 1: 本地 fetcher + test_local_fetcher() + + # 测试 2: Flask API(如果服务在运行) + print("\n" + "="*80) + print("🌐 测试 Flask API 端点") + print("="*80) + print(f"\nAPI 地址: {FLASK_API_URL}") + print("注意: 需要 Flask 服务正在运行") + + try: + response = requests.get(f"{FLASK_API_URL}/health", timeout=3) + if response.status_code == 200: + print("✅ Flask 服务可访问") + test_calendar_api() + else: + print(f"⚠️ Flask 服务返回 {response.status_code},跳过 API 测试") + except: + print("⚠️ Flask 服务未运行,跳过 API 测试") + + print("\n" + "="*80) + print("✅ 测试完成") + print("="*80) + +if __name__ == "__main__": + main() diff --git a/tests/verify_fix_result.py b/tests/verify_fix_result.py new file mode 100644 index 0000000..a620556 --- /dev/null +++ b/tests/verify_fix_result.py @@ -0,0 +1,143 @@ +#!/usr/bin/env python3 +""" +验证修复后的回测结果是否与文档一致 + +文档预期结果 (Mode A - 指数信号+指数收益): + CAGR: 11.80%, 最大回撤: -29.49%, 夏普: 0.818, Calmar: 0.400 + +文档预期结果 (Mode B - 指数信号+ETF收益): + CAGR: 28.07%, 最大回撤: -13.34%, 夏普: 1.685, Calmar: 2.104 +""" + +import sys +from pathlib import Path +project_root = Path(__file__).parent.parent +sys.path.insert(0, str(project_root)) + +from dotenv import load_dotenv +load_dotenv() + +import pandas as pd +import numpy as np +import yaml +from datetime import datetime +from strategies.rotation.strategy import RotationStrategy + + +def calculate_metrics(nav: pd.Series) -> dict: + """计算绩效指标""" + start_date = nav.index[0] + end_date = nav.index[-1] + days = (end_date - start_date).days + years = days / 365 + + total_return = nav.iloc[-1] - 1 + cagr = (nav.iloc[-1] / nav.iloc[0]) ** (1/years) - 1 + + daily_ret = nav.pct_change().dropna() + sharpe = daily_ret.mean() / daily_ret.std() * np.sqrt(252) if daily_ret.std() > 0 else 0 + + peak = nav.cummax() + drawdown = (nav - peak) / peak + max_dd = drawdown.min() + + calmar = cagr / abs(max_dd) if max_dd != 0 else 0 + win_rate = (daily_ret > 0).sum() / len(daily_ret) + + return { + 'start_date': start_date.strftime('%Y-%m-%d'), + 'end_date': end_date.strftime('%Y-%m-%d'), + 'years': years, + 'days': len(nav), + 'total_return': total_return, + 'cagr': cagr, + 'max_dd': max_dd, + 'sharpe': sharpe, + 'calmar': calmar, + 'win_rate': win_rate + } + + +def main(): + # 加载配置 + config_path = project_root / 'strategies/rotation/config.yaml' + with open(config_path, 'r') as f: + config = yaml.safe_load(f) + + # 设置回测区间(文档中的测试区间) + config['start_date'] = '2020-01-02' + config['end_date'] = '2026-05-19' + + print('='*70) + print('修复后回测结果验证') + print('='*70) + print(f'回测区间: {config["start_date"]} ~ {config["end_date"]}') + + # 初始化策略 + strategy = RotationStrategy(config) + + # 获取数据并执行回测 + print('\n获取数据...') + data = strategy.get_data(use_flask_api=False) + + print('\n执行回测...') + result = strategy.run_backtest(data=data) + + if result.get('result') is None: + print('❌ 回测未生成结果') + return + + # 计算指标 + nav = result['result']['策略净值'] + metrics = calculate_metrics(nav) + + # 输出结果 + print('\n' + '='*70) + print('修复后回测结果') + print('='*70) + print(f"回测区间: {metrics['start_date']} ~ {metrics['end_date']}") + print(f"回测年数: {metrics['years']:.2f} 年") + print(f"交易天数: {metrics['days']} 天") + print('-'*70) + print(f"CAGR: {metrics['cagr']:.2%}") + print(f"最大回撤: {metrics['max_dd']:.2%}") + print(f"夏普比率: {metrics['sharpe']:.3f}") + print(f"Calmar比率: {metrics['calmar']:.3f}") + print(f"日胜率: {metrics['win_rate']:.2%}") + print(f"累计收益: {metrics['total_return']:.2%}") + print(f"调仓次数: {len(result.get('rebalance_events', []))} 次") + print('='*70) + + # 文档预期结果对比 + print('\n' + '='*70) + print('文档预期结果对比') + print('='*70) + print("\nMode A (指数信号 → 指数收益):") + print(" 预期: CAGR 11.80%, MaxDD -29.49%, Sharpe 0.818, Calmar 0.400") + + print("\nMode B (指数信号 → ETF收益):") + print(" 预期: CAGR 28.07%, MaxDD -13.34%, Sharpe 1.685, Calmar 2.104") + + # 判断当前模式 + print('\n' + '-'*70) + cagr_diff_a = abs(metrics['cagr'] - 0.1180) + cagr_diff_b = abs(metrics['cagr'] - 0.2807) + + if cagr_diff_a < 0.03: + print(f"✓ 当前结果接近 Mode A (CAGR差异: {cagr_diff_a:.2%})") + print(" 说明: 当前回测使用指数收盘价计算收益") + elif cagr_diff_b < 0.03: + print(f"✓ 当前结果接近 Mode B (CAGR差异: {cagr_diff_b:.2%})") + print(" 说明: 当前回测使用ETF价格计算收益") + else: + print(f"⚠ 当前结果与文档预期有差异") + print(f" Mode A CAGR差异: {cagr_diff_a:.2%}") + print(f" Mode B CAGR差异: {cagr_diff_b:.2%}") + + print('='*70) + + return metrics + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/tests/verify_mode_b.py b/tests/verify_mode_b.py new file mode 100644 index 0000000..184b983 --- /dev/null +++ b/tests/verify_mode_b.py @@ -0,0 +1,208 @@ +#!/usr/bin/env python3 +""" +验证 Mode B: 指数信号 → ETF收益 + +文档预期结果: + CAGR: 28.07%, 最大回撤: -13.34%, 夏普: 1.685, Calmar: 2.104 +""" + +import sys +from pathlib import Path +project_root = Path(__file__).parent.parent +sys.path.insert(0, str(project_root)) + +from dotenv import load_dotenv +load_dotenv() + +import pandas as pd +import numpy as np +import yaml +from datetime import datetime +from strategies.rotation.strategy import RotationStrategy + + +def calculate_metrics(nav: pd.Series) -> dict: + """计算绩效指标""" + start_date = nav.index[0] + end_date = nav.index[-1] + days = (end_date - start_date).days + years = days / 365 + + total_return = nav.iloc[-1] - 1 + cagr = (nav.iloc[-1] / nav.iloc[0]) ** (1/years) - 1 + + daily_ret = nav.pct_change().dropna() + sharpe = daily_ret.mean() / daily_ret.std() * np.sqrt(252) if daily_ret.std() > 0 else 0 + + peak = nav.cummax() + drawdown = (nav - peak) / peak + max_dd = drawdown.min() + + calmar = cagr / abs(max_dd) if max_dd != 0 else 0 + win_rate = (daily_ret > 0).sum() / len(daily_ret) + + return { + 'start_date': start_date.strftime('%Y-%m-%d'), + 'end_date': end_date.strftime('%Y-%m-%d'), + 'years': years, + 'days': len(nav), + 'total_return': total_return, + 'cagr': cagr, + 'max_dd': max_dd, + 'sharpe': sharpe, + 'calmar': calmar, + 'win_rate': win_rate + } + + +def run_mode_b_backtest(data: dict, signals: pd.DataFrame, valid_codes: list, + etf_code_map: dict, a_share_dates: pd.DatetimeIndex, + trade_cost: float, select_num: int) -> dict: + """ + Mode B: 使用ETF价格计算收益 + + Args: + data: 包含 etf_data 的数据字典 + signals: 指数生成的信号 + valid_codes: 指数代码列表 + etf_code_map: {指数代码: ETF代码} 映射 + a_share_dates: A股交易日历 + trade_cost: 交易成本 + select_num: 选股数量 + """ + from framework.execution import BacktestExecutor + + etf_data = data.get('etf_data') + if etf_data is None: + print("❌ ETF数据不可用") + return {'result': None} + + # 将信号对齐到 A 股日历 + if a_share_dates is not signals.index: + signals = signals.reindex(a_share_dates, method='ffill').dropna(subset=[signals.columns[0]]) + + # 使用ETF收盘价计算收益率 + returns_data = {} + for code in valid_codes: + etf_code = etf_code_map.get(code) + if etf_code and etf_code in etf_data.columns: + etf_close = etf_data[etf_code].dropna() + # 对齐到A股日历 + etf_aligned = etf_close.reindex(a_share_dates, method='ffill') + returns_aligned = etf_aligned.pct_change(fill_method=None) + # 使用指数代码作为列名(与信号匹配) + returns_data[f'日收益率_{code}'] = returns_aligned + else: + # 没有ETF映射的标的,回退使用指数数据 + index_data = data.get('index_data', {}) + if code in index_data and 'close' in index_data[code].columns: + close_series = index_data[code]['close'].dropna() + close_aligned = close_series.reindex(a_share_dates, method='ffill') + returns_data[f'日收益率_{code}'] = close_aligned.pct_change(fill_method=None) + + returns_df = pd.DataFrame(returns_data) + + # 对齐日期 + common_dates = signals.index.intersection(returns_df.index) + signals = signals.loc[common_dates] + returns_df = returns_df.loc[common_dates] + + print(f" Mode B 对齐后日期: {len(common_dates)} 天") + print(f" 使用ETF计算收益: {len([c for c in valid_codes if etf_code_map.get(c)])} 只") + + executor = BacktestExecutor( + initial_capital=100000, + trade_cost=trade_cost, + select_num=select_num + ) + + portfolio = executor.execute(signals, returns_df) + + if hasattr(portfolio, 'backtest_result'): + return {'result': portfolio.backtest_result, 'portfolio': portfolio} + + return {'result': None} + + +def main(): + # 加载配置 + config_path = project_root / 'strategies/rotation/config.yaml' + with open(config_path, 'r') as f: + config = yaml.safe_load(f) + + # 设置回测区间 + config['start_date'] = '2020-01-02' + config['end_date'] = '2026-05-19' + + print('='*70) + print('Mode B 验证: 指数信号 → ETF收益') + print('='*70) + + # 初始化策略 + strategy = RotationStrategy(config) + + # 获取数据 + print('\n获取数据...') + data = strategy.get_data(use_flask_api=False) + + # 计算因子(使用指数数据) + print('\n计算因子(指数信号)...') + factor_df = strategy.compute_factors(data) + + # 生成信号 + print('\n生成信号...') + signals = strategy.generate_signals(factor_df) + + # 执行 Mode B 回测 + print('\n执行 Mode B 回测(ETF收益)...') + result_b = run_mode_b_backtest( + data=data, + signals=signals, + valid_codes=data['valid_codes'], + etf_code_map=data['etf_code_map'], + a_share_dates=data.get('a_share_dates'), + trade_cost=config.get('trade_cost', 0.001), + select_num=config.get('select_num', 3) + ) + + if result_b.get('result') is None: + print('❌ Mode B 回测未生成结果') + return + + # 计算指标 + nav_b = result_b['result']['策略净值'] + metrics_b = calculate_metrics(nav_b) + + # 输出结果 + print('\n' + '='*70) + print('Mode B 回测结果') + print('='*70) + print(f"回测区间: {metrics_b['start_date']} ~ {metrics_b['end_date']}") + print(f"回测年数: {metrics_b['years']:.2f} 年") + print(f"交易天数: {metrics_b['days']} 天") + print('-'*70) + print(f"CAGR: {metrics_b['cagr']:.2%}") + print(f"最大回撤: {metrics_b['max_dd']:.2%}") + print(f"夏普比率: {metrics_b['sharpe']:.3f}") + print(f"Calmar比率: {metrics_b['calmar']:.3f}") + print(f"日胜率: {metrics_b['win_rate']:.2%}") + print(f"累计收益: {metrics_b['total_return']:.2%}") + print('='*70) + + # 文档预期对比 + print('\n文档预期 (Mode B):') + print(' CAGR: 28.07%, MaxDD -13.34%, Sharpe 1.685, Calmar 2.104') + + cagr_diff = abs(metrics_b['cagr'] - 0.2807) + print(f'\nCAGR差异: {cagr_diff:.2%}') + + if cagr_diff < 0.05: + print('✓ 结果与文档预期基本一致') + else: + print('⚠ 结果与文档预期有差异') + + return metrics_b + + +if __name__ == '__main__': + main() \ No newline at end of file