#!/usr/bin/env python3
"""
Export the day-by-day backtest detail to JSON for the HTML replay viewer.

Mode B: index signals + ETF returns (from 2020-01-01 up to the run date).

Usage:
    python scripts/export_backtest_detail.py
"""

import sys
import json
import math
from datetime import datetime
from pathlib import Path

import numpy as np
import pandas as pd
import yaml

project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

from dotenv import load_dotenv
load_dotenv()

from datasource.tushare_source import TushareSource
from datasource.flask_api_source import FlaskAPIDataSource
from strategies.shared.factors.momentum import MomentumFactor
from strategies.shared.signals.selectors import TopNSelector
from framework.execution import BacktestExecutor

# ==================== Load configuration ====================
config_path = project_root / 'strategies' / 'rotation' / 'config.yaml'
with open(config_path, 'r', encoding='utf-8') as f:
    config = yaml.safe_load(f)

CODE_LIST = config['code_list']
SELECT_NUM = config['select_num']
N_DAYS = config['n_days']
TRADE_COST = config['trade_cost']
BOND_THRESHOLD = config.get('bond_threshold', {})
BOND_CODE = BOND_THRESHOLD.get('bond_code', '931862.CSI')
BOND_RATIO = BOND_THRESHOLD.get('ratio', 1.0)


def fetch_all_data(start_date='2018-01-01', end_date='2026-05-19'):
    """Fetch index, ETF price and ETF NAV data for every configured code.

    Args:
        start_date: First date to request, as 'YYYY-MM-DD'.
        end_date: Last date to request, as 'YYYY-MM-DD'.

    Returns:
        A 4-tuple ``(index_data, etf_data, etf_code_map, etf_nav_data)``:

        * ``index_data``: index code -> DataFrame returned by the Flask API
          batch fetch (only non-empty frames that have a ``close`` column).
        * ``etf_data``: index code -> ETF DataFrame, with an ``open_hfq``
          column added (``open`` scaled by ``close_hfq / close``).
        * ``etf_code_map``: index code -> ETF code, for every config entry
          that declares an ``etf`` (even when the price fetch failed).
        * ``etf_nav_data``: index code -> the ``nav`` column of the ETF NAV
          frame, only for ETFs whose price fetch succeeded.
    """
    ts = TushareSource()
    # Per the original note, this defaults to the k3s.tokenpluse.xyz service.
    api = FlaskAPIDataSource()

    index_data = {}
    etf_data = {}
    etf_code_map = {}

    # All index data comes from the Flask API (the original notes this is to
    # stay consistent with strategy.py).
    print("[指数数据] - 通过 Flask API (k3s服务) 获取")
    index_codes = list(CODE_LIST.keys())
    index_ohlcv_data = api.fetch_batch(index_codes, start_date, end_date)
    for code, df in index_ohlcv_data.items():
        name = CODE_LIST.get(code, {}).get('name', code)
        if df is not None and 'close' in df.columns and len(df) > 0:
            index_data[code] = df
            print(f" {code} ({name})... {len(df)}天")
        else:
            print(f" {code} ({name})... 失败")

    print("\n[ETF数据]")
    etf_nav_data = {}
    for code, cfg in CODE_LIST.items():
        etf_code = cfg.get('etf')
        if etf_code is None:
            continue
        etf_code_map[code] = etf_code
        name = cfg['name']
        print(f" {etf_code} ({name})...", end=' ')

        df = ts.fetch_etf_adj(etf_code, start_date, end_date)
        if df is None or 'close_hfq' not in df.columns or len(df) == 0:
            print("失败")
            continue
        # Scale the open price by the same ratio that maps close -> close_hfq.
        adj_ratio = df['close_hfq'] / df['close']
        df['open_hfq'] = df['open'] * adj_ratio
        etf_data[code] = df
        print(f"{len(df)}天", end='')

        # ETF NAV, used later to compute the premium rate.
        nav_df = ts.fetch_etf_nav(etf_code, start_date, end_date)
        if nav_df is not None and 'nav' in nav_df.columns and len(nav_df) > 0:
            etf_nav_data[code] = nav_df['nav']
            print(f" nav={len(nav_df)}天")
        else:
            print(" nav=无")

    return index_data, etf_data, etf_code_map, etf_nav_data


def compute_factors(price_data, n_days, trade_dates):
    """Compute the momentum factor per code, then align it to ``trade_dates``.

    The factor is computed on each instrument's own (NaN-dropped) close series
    first and only afterwards forward-filled onto ``trade_dates`` (the original
    notes this matches strategy.py).

    Args:
        price_data: code -> DataFrame; frames without a ``close`` column or
            with no non-NaN closes are skipped.
        n_days: Look-back window passed to ``MomentumFactor``.
        trade_dates: Target index the factor values are reindexed onto.

    Returns:
        DataFrame with one column per code that produced factor values.
    """
    factor = MomentumFactor(n_days=n_days, weighted=True, crash_filter=True)
    factor_values = {}
    for code, df in price_data.items():
        if 'close' not in df.columns:
            continue
        close_series = df['close'].dropna()
        if len(close_series) == 0:
            continue
        values = factor.compute(pd.DataFrame({'close': close_series}))
        factor_values[code] = values.reindex(trade_dates, method='ffill')
    return pd.DataFrame(factor_values)


def generate_signals(factor_df, group_mapping):
    """Run ``TopNSelector`` over the factor table with the configured settings.

    Args:
        factor_df: Factor values, one column per code.
        group_mapping: code -> group label passed through to the selector.

    Returns:
        Whatever ``TopNSelector.generate`` returns for ``factor_df``.
    """
    selector = TopNSelector(
        select_num=SELECT_NUM,
        group_mapping=group_mapping,
        min_score=0.0,
        rebalance_days=1,
        rebalance_threshold=0.0,
        bond_threshold_config=BOND_THRESHOLD,
    )
    return selector.generate(factor_df)


def safe_val(v, decimals=4):
    """Convert a scalar into a JSON-safe Python value.

    Args:
        v: Value to convert (Python or NumPy scalar, or anything else).
        decimals: Number of decimals floats are rounded to.

    Returns:
        ``None`` for ``None`` and for any non-finite float (NaN / +-inf),
        a rounded ``float`` for finite floats, an ``int`` for integers, and
        ``v`` unchanged for every other type.
    """
    if v is None:
        return None
    if isinstance(v, (np.floating, float)):
        # Check finiteness for every float type: NumPy floats other than
        # float64 are not ``float`` subclasses, and a NaN slipping through
        # would be written as the bare token NaN, which is not valid JSON.
        as_float = float(v)
        if not math.isfinite(as_float):
            return None
        return round(as_float, decimals)
    if isinstance(v, (np.integer, int)):
        return int(v)
    return v


def _prepare_market_series(valid_codes, index_data, etf_data, etf_code_map,
                           etf_nav_data, trade_dates):
    """Align all price series to ``trade_dates`` and derive daily returns."""
    index_close = {}
    index_return = {}
    etf_close = {}
    etf_open = {}
    etf_close_hfq = {}
    returns_etf = {}

    for code in valid_codes:
        has_index = code in index_data and 'close' in index_data[code].columns
        if has_index:
            aligned = index_data[code]['close'].dropna().reindex(
                trade_dates, method='ffill')
            index_close[code] = aligned
            index_return[code] = aligned.pct_change(fill_method=None)

        ret_col = f'日收益率_{code}'
        if etf_code_map.get(code) and code in etf_data:
            df = etf_data[code]
            hfq = df['close_hfq'].dropna().reindex(trade_dates, method='ffill')
            etf_close_hfq[code] = hfq
            returns_etf[ret_col] = hfq.pct_change(fill_method=None)
            etf_close[code] = df['close'].reindex(trade_dates, method='ffill')
            etf_open[code] = df['open'].reindex(trade_dates, method='ffill')
        elif has_index:
            # No ETF data for this code: fall back to the index return.
            returns_etf[ret_col] = index_return[code]

    # Premium rate: (ETF close - unit NAV) / unit NAV.
    premium = {}
    for code in valid_codes:
        if code in etf_nav_data and code in etf_close:
            nav_raw = etf_nav_data[code]
            # Drop duplicated dates (keep the last) before reindexing.
            nav_raw = nav_raw[~nav_raw.index.duplicated(keep='last')]
            nav = nav_raw.reindex(trade_dates, method='ffill')
            premium[code] = (etf_close[code] - nav) / nav

    return {
        'index_close': index_close,
        'index_return': index_return,
        'etf_close': etf_close,
        'etf_open': etf_open,
        'etf_close_hfq': etf_close_hfq,
        'premium': premium,
        'etf_returns': pd.DataFrame(returns_etf),
    }


def _run_backtest(signals, returns_etf_df):
    """Execute the backtest on the dates shared by signals and returns."""
    common_dates = signals.index.intersection(returns_etf_df.index)
    if len(common_dates) == 0:
        # Fail early with a clear message instead of an IndexError later on.
        raise RuntimeError("信号与ETF收益率没有共同交易日,无法回测")

    signals_aligned = signals.loc[common_dates]
    returns_aligned = returns_etf_df.loc[common_dates]

    executor = BacktestExecutor(
        initial_capital=100000,
        trade_cost=TRADE_COST,
        select_num=SELECT_NUM,
    )
    portfolio = executor.execute(signals_aligned, returns_aligned)
    result = portfolio.backtest_result
    nav_series_raw = result['策略净值']
    daily_ret_raw = result['策略日收益率']

    # Extend to every common date: before the first valid NAV, nav=1.0 and
    # daily return=0.0.
    nav_series = nav_series_raw.reindex(common_dates)
    daily_ret_series = daily_ret_raw.reindex(common_dates, fill_value=0.0)
    first_valid = nav_series.first_valid_index()
    if first_valid is not None:
        nav_series.loc[:first_valid] = nav_series.loc[:first_valid].fillna(1.0)
    nav_series = nav_series.ffill()

    return common_dates, signals_aligned, nav_series, daily_ret_series


def _parse_holdings(sig_val):
    """Split a comma-separated signal value into a set of held codes."""
    if not (pd.notna(sig_val) and sig_val):
        return set()
    holdings = set(str(sig_val).split(','))
    holdings.discard('')
    return holdings


def _entry_price(series_by_code, code, date):
    """Return the price of ``code`` on ``date`` as float, or None if missing."""
    series = series_by_code.get(code)
    if series is None:
        return None
    price = series.get(date)
    return float(price) if pd.notna(price) else None


def _series_value(series_by_code, code, date, decimals):
    """Return the JSON-safe value of ``code`` on ``date``, or None if absent."""
    series = series_by_code.get(code)
    if series is None:
        return None
    return safe_val(series.get(date), decimals)


def _cum_return(entry_price, current):
    """Return current / entry_price - 1 (4 decimals), or None if not computable."""
    if not entry_price or entry_price <= 0:
        return None
    if not current or pd.isna(current):
        return None
    return safe_val(float(current) / entry_price - 1, 4)


def _rank_and_threshold(factor_scores, group_mapping, bond_enabled):
    """Compute the day's momentum threshold and the rank of each code."""
    bond_score = factor_scores.get(BOND_CODE)
    if bond_enabled and bond_score is not None and bond_score >= 0:
        threshold = bond_score * BOND_RATIO
    else:
        threshold = 0.0

    # Rank by momentum, descending; codes in the 'BOND' group are excluded.
    non_bond_scores = {
        code: score for code, score in factor_scores.items()
        if group_mapping.get(code) != 'BOND'
    }
    ordered = sorted(non_bond_scores, key=non_bond_scores.get, reverse=True)
    rank_map = {code: rank for rank, code in enumerate(ordered, start=1)}
    # The bond code never carries a rank.
    if BOND_CODE in factor_scores:
        rank_map[BOND_CODE] = None
    return threshold, rank_map


def _build_asset_detail(code, date, market, mom, rank, threshold, is_held,
                        entry, common_dates):
    """Build the per-asset record for one date (key order is preserved)."""
    asset = {
        'index_close': _series_value(market['index_close'], code, date, 2),
        'momentum': safe_val(mom, 4),
        'rank': rank,
        'threshold': safe_val(threshold, 4),
        'above_threshold': mom is not None and mom >= threshold,
        'etf_close': _series_value(market['etf_close'], code, date, 3),
        'etf_open': _series_value(market['etf_open'], code, date, 3),
        'etf_close_hfq': _series_value(market['etf_close_hfq'], code, date, 4),
        'premium': _series_value(market['premium'], code, date, 4),
    }

    # ETF daily return (close to close).
    returns_etf_df = market['etf_returns']
    ret_col = f'日收益率_{code}'
    if ret_col in returns_etf_df.columns:
        asset['etf_return_ctc'] = safe_val(returns_etf_df.loc[date, ret_col], 6)
    else:
        asset['etf_return_ctc'] = None

    # Index daily return.
    asset['index_return'] = _series_value(market['index_return'], code, date, 6)

    # Position state.
    asset['is_held'] = is_held
    if is_held and entry is not None:
        asset['entry_date'] = entry['entry_date']
        asset['entry_price_etf'] = safe_val(entry['entry_price_etf'], 4)
        asset['entry_price_idx'] = safe_val(entry['entry_price_idx'], 4)
        # Trading days from the entry date through today, both inclusive.
        entry_dt = pd.Timestamp(entry['entry_date'])
        in_window = (common_dates >= entry_dt) & (common_dates <= date)
        asset['holding_days'] = int(in_window.sum())
        hfq_series = market['etf_close_hfq'].get(code)
        idx_series = market['index_close'].get(code)
        asset['cum_return_etf'] = _cum_return(
            entry['entry_price_etf'],
            hfq_series.get(date) if hfq_series is not None else None)
        asset['cum_return_idx'] = _cum_return(
            entry['entry_price_idx'],
            idx_series.get(date) if idx_series is not None else None)
    else:
        asset['entry_date'] = None
        asset['entry_price_etf'] = None
        asset['entry_price_idx'] = None
        asset['holding_days'] = 0
        asset['cum_return_etf'] = None
        asset['cum_return_idx'] = None
    return asset


def _build_day_records(common_dates, signals_aligned, factor_df, group_mapping,
                       valid_codes, market, nav_series, daily_ret_series):
    """Walk the backtest dates and build one JSON-ready record per day."""
    signal_col = 'signal'
    # Loop invariants, hoisted out of the per-day loop.
    has_signal_col = signal_col in signals_aligned.columns
    bond_enabled = BOND_THRESHOLD.get('enabled')
    factor_codes = [c for c in valid_codes if c in factor_df.columns]

    # code -> {'entry_date': str, 'entry_price_etf': float, 'entry_price_idx': float}
    holdings_state = {}
    prev_holdings = set()
    days_list = []

    for date in common_dates:
        sig_val = signals_aligned.loc[date, signal_col] if has_signal_col else ''
        current_holdings = _parse_holdings(sig_val)

        # Rebalance detection.
        added = list(current_holdings - prev_holdings)
        removed = list(prev_holdings - current_holdings)
        is_rebalance = bool(added or removed)

        # Update position state: entries are priced at the day's close.
        for code in removed:
            holdings_state.pop(code, None)
        for code in added:
            holdings_state[code] = {
                'entry_date': date.strftime('%Y-%m-%d'),
                'entry_price_etf': _entry_price(market['etf_close_hfq'], code, date),
                'entry_price_idx': _entry_price(market['index_close'], code, date),
            }

        # Today's non-NaN factor scores.
        factor_scores = {}
        if date in factor_df.index:
            for code in factor_codes:
                score = factor_df.loc[date, code]
                if pd.notna(score):
                    factor_scores[code] = float(score)

        threshold, rank_map = _rank_and_threshold(
            factor_scores, group_mapping, bond_enabled)

        assets = {
            code: _build_asset_detail(
                code, date, market,
                mom=factor_scores.get(code),
                rank=rank_map.get(code),
                threshold=threshold,
                is_held=code in current_holdings,
                entry=holdings_state.get(code),
                common_dates=common_dates,
            )
            for code in valid_codes
        }

        nav_val = nav_series.loc[date] if date in nav_series.index else None
        ret_val = daily_ret_series.loc[date] if date in daily_ret_series.index else None
        days_list.append({
            'date': date.strftime('%Y-%m-%d'),
            'nav': safe_val(nav_val, 4),
            'daily_return': safe_val(ret_val, 6),
            'is_rebalance': is_rebalance,
            'holdings': sorted(current_holdings),
            'added': sorted(added),
            'removed': sorted(removed),
            'assets': assets,
        })
        prev_holdings = current_holdings

    return days_list


def main():
    """Run the mode-B backtest and write the daily detail JSON file.

    Output goes to ``<project_root>/results/backtest_detail.json``.

    Raises:
        RuntimeError: If signals and ETF returns share no trading date.
    """
    backtest_start = '2020-01-01'
    backtest_end = datetime.now().strftime('%Y-%m-%d')  # today, resolved at run time

    print("=" * 60)
    print(" 导出回测逐日明细 (模式B: 指数信号 + ETF收益)")
    print("=" * 60)

    # 1. Fetch data. Use the same end date as the trading calendar so prices
    # are not forward-filled from a stale, hard-coded cut-off.
    print("\n[1] 获取数据...")
    index_data, etf_data, etf_code_map, etf_nav_data = fetch_all_data(
        end_date=backtest_end)

    # 2. A-share trading calendar.
    print("\n[2] 获取A股交易日历...")
    ts = TushareSource()
    a_share_dates = ts.fetch_trade_cal(backtest_start, backtest_end)
    print(f" {len(a_share_dates)} 天")

    # 3. Group mapping.
    group_mapping = {
        code: cfg.get('market', 'default')
        for code, cfg in CODE_LIST.items()
        if isinstance(cfg, dict)
    }
    valid_codes = [c for c in CODE_LIST if c in index_data]

    # 4. Compute factors (index signals).
    print("\n[3] 计算指数动量因子...")
    idx_price_data = {
        code: index_data[code]
        for code in valid_codes
        if 'close' in index_data[code].columns
    }
    factor_df = compute_factors(idx_price_data, N_DAYS, a_share_dates)
    print(f" {len(factor_df.columns)} 只, {len(factor_df)} 天")

    # 5. Generate signals.
    print("\n[4] 生成信号...")
    signals = generate_signals(factor_df, group_mapping)
    print(f" {len(signals)} 天")

    # 6. Prepare ETF returns (mode B) and the premium rate.
    print("\n[5] 准备ETF收益率...")
    market = _prepare_market_series(
        valid_codes, index_data, etf_data, etf_code_map, etf_nav_data,
        a_share_dates)

    # 7. Run the backtest to obtain the NAV curve.
    print("\n[6] 执行回测...")
    common_dates, signals_aligned, nav_series, daily_ret_series = _run_backtest(
        signals, market['etf_returns'])
    print(f" 终值: {nav_series.iloc[-1]:.4f}")

    # 8. Build the day-by-day detail.
    print("\n[7] 构建逐日明细...")
    days_list = _build_day_records(
        common_dates, signals_aligned, factor_df, group_mapping, valid_codes,
        market, nav_series, daily_ret_series)

    # 9. Build metadata.
    codes_meta = {
        code: {
            'name': cfg['name'],
            'etf': cfg.get('etf'),
            'market': cfg.get('market'),
        }
        for code, cfg in CODE_LIST.items()
    }
    output = {
        'meta': {
            'mode': 'B: 指数信号 + ETF收益',
            'start_date': common_dates[0].strftime('%Y-%m-%d'),
            'end_date': common_dates[-1].strftime('%Y-%m-%d'),
            'total_days': len(common_dates),
            'select_num': SELECT_NUM,
            'n_days': N_DAYS,
            'trade_cost': TRADE_COST,
            'bond_threshold': {
                'enabled': BOND_THRESHOLD.get('enabled', False),
                'bond_code': BOND_CODE,
                'ratio': BOND_RATIO,
            },
            'codes': codes_meta,
        },
        'days': days_list,
    }

    # 10. Write the file.
    output_path = project_root / 'results' / 'backtest_detail.json'
    print(f"\n[8] 写入 {output_path}...")
    # The results directory may not exist on a fresh checkout.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(output, f, ensure_ascii=False)

    file_size_mb = output_path.stat().st_size / 1024 / 1024
    print(f" 大小: {file_size_mb:.1f} MB")
    print(f" 天数: {len(days_list)}")
    print(f" 标的: {len(valid_codes)}")
    print(" 完成!")


if __name__ == '__main__':
    main()