diff --git a/framework_v2/core/strategy.py b/framework_v2/core/strategy.py index 6c2a5ec..9a3c5e7 100644 --- a/framework_v2/core/strategy.py +++ b/framework_v2/core/strategy.py @@ -161,12 +161,15 @@ class StrategyBase(ABC): return self._signal_generator.generate(factor_df) - def run(self, data: Optional[Dict[str, pd.DataFrame]] = None) -> Dict[str, Any]: + def run(self, data: Optional[Dict[str, pd.DataFrame]] = None, + export_detail: bool = False, detail_path: str = None) -> Dict[str, Any]: """ 运行完整回测流程(框架标准流程) Args: data: 可选,如不提供则自动获取 + export_detail: 是否导出逐日明细(默认 False) + detail_path: 明细 JSON 文件路径(export_detail=True 时必需) Returns: 回测结果字典,包含: @@ -174,11 +177,14 @@ class StrategyBase(ABC): - trades: 交易记录 - metrics: 绩效指标 """ - # 1. 获取数据 + # 1. 获取数据并保存 if data is None: print("[1/5] 获取数据...") data = self.get_data() + self._data = data # 保存数据供导出使用 print(f" 获取 {len(data)} 个标的") + else: + self._data = data # 2. 计算因子 print("[2/5] 计算因子...") @@ -205,6 +211,20 @@ class StrategyBase(ABC): result = self._execute_backtest(positions, data) print(f" 回测完成") + # 6. 可选:导出逐日明细 + if export_detail: + if not detail_path: + raise ValueError("export_detail=True 时需要指定 detail_path") + + print("\n[额外] 导出逐日明细...") + self._export_backtest_detail( + factors=factors, + signals=signals, + positions=positions, + result=result, + output_path=detail_path + ) + return result def _execute_backtest(self, signals: pd.DataFrame, data: Dict[str, Any]) -> Dict[str, Any]: diff --git a/framework_v2/scripts/export_backtest_detail.py b/framework_v2/scripts/export_backtest_detail.py index f9050aa..afefed3 100644 --- a/framework_v2/scripts/export_backtest_detail.py +++ b/framework_v2/scripts/export_backtest_detail.py @@ -1,26 +1,16 @@ #!/usr/bin/env python3 """ -导出 V2 框架回测逐日明细到 JSON,供 HTML 回放器加载。 +导出 V2 框架回测逐日明细到 JSON(简化版) -适用于 GlobalRotationStrategy(V2 正式版) -- 指数信号 + ETF 收益 -- 动态短债阈值 -- 强制分散化 -- 交易成本 -- CrossMarketAligner 数据对齐 +现在直接调用 strategy.run(export_detail=True) +不再重复执行策略逻辑 用法: python framework_v2/scripts/export_backtest_detail.py """ import sys -import json -import math from pathlib import Path -from datetime import datetime - -import numpy as np -import pandas as pd project_root = Path(__file__).parent.parent.parent sys.path.insert(0, str(project_root)) @@ -30,20 +20,6 @@ load_dotenv() from framework_v2.config import load_config from framework_v2.strategies.rotation.rotation import GlobalRotationStrategy -from framework_v2.shared.data.alignment import CrossMarketAligner - - -# ==================== 辅助函数 ==================== - -def safe_val(v, decimals=4): - """安全转换数值,处理 NaN/Inf""" - if v is None or (isinstance(v, float) and (math.isnan(v) or math.isinf(v))): - return None - if isinstance(v, (np.floating, float)): - return round(float(v), decimals) - if isinstance(v, (np.integer, int)): - return int(v) - return v def main(): @@ -60,339 +36,25 @@ def main(): print("[2] 初始化策略...") strategy = GlobalRotationStrategy(config) - # 3. 获取数据 - print("[3] 获取数据...") - data = strategy.get_data() - print(f" 获取 {len(data)} 个标的") - - # 4. 计算因子 - print("[4] 计算因子...") - factors = strategy.compute_factors(data) - print(f" 计算 {len(factors)} 个因子") - - # 5. 生成信号 - print("[5] 生成信号...") - signals = strategy.generate_signals(factors) - print(f" 生成 {signals.shape[0]} 个信号") - - # 6. 仓位管理 - print("[6] 仓位管理...") - positions = strategy.manage_positions(signals) - - # 7. 准备收益率数据(使用 CrossMarketAligner) - print("[7] 准备收益率数据...") - signal_to_trade = config.asset_pools.get_signal_to_trade_mapping() - - # 获取 A 股交易日历 - trading_calendar = strategy._get_trading_calendar() - print(f" A 股交易日: {len(trading_calendar)} 天") - - # 准备收盘价和溢价率数据 - print("[7.5] 准备价格和溢价率数据...") - index_close_dict = {} # 指数收盘价 - etf_close_dict = {} # ETF 收盘价 - etf_premium_dict = {} # ETF 溢价率(需要从 API 获取) - - for signal_code, trade_code in signal_to_trade.items(): - # 指数收盘价 - if signal_code in data: - index_close_dict[signal_code] = data[signal_code]['close'] - - # ETF 收盘价 - if trade_code in data: - etf_close_dict[signal_code] = data[trade_code]['close'] # 注意:用 signal_code 作为键 - - # 溢价率暂时设为 None(需要额外 API 支持) - # TODO: 接入 ETF 净值数据计算溢价率 - - # 创建对齐器 - aligner = CrossMarketAligner(target_calendar=trading_calendar) - - # 提取收盘价 - close_dict = {} - for signal_code, trade_code in signal_to_trade.items(): - if trade_code in data: - close_dict[signal_code] = data[trade_code]['close'] - - # 对齐收益率 - returns_df = aligner.align_multi_asset(close_dict) - print(f" 收益率数据: {len(returns_df)} 天, {len(returns_df.columns)} 个标的") - - # 8. 计算策略收益和净值 - print("[8] 计算策略收益...") - positions_aligned = positions.reindex(trading_calendar, method='ffill') - positions_delayed = positions_aligned.shift(1).fillna(0) - strategy_returns = (positions_delayed * returns_df).sum(axis=1) - - # 扣除交易成本 - strategy_returns_clean, rebalance_count = strategy._apply_trade_cost( - strategy_returns, positions_aligned - ) - print(f" 调仓次数: {rebalance_count}") - - # 计算净值 - equity_curve = (1 + strategy_returns_clean).cumprod() - print(f" 最终净值: {equity_curve.iloc[-1]:.4f}") - - # 9. 构建逐日明细 - print("[9] 构建逐日明细...") - - # 获取展示日历 - common_dates = equity_curve.index - - # 因子数据(DataFrame 格式) - factor_df = pd.DataFrame(factors) - - # 确保索引是 DatetimeIndex - if not isinstance(factor_df.index, pd.DatetimeIndex): - factor_df.index = pd.to_datetime(factor_df.index) - - # 将因子对齐到实际展示日历(前向填充) - # 因子已经在原始数据上计算完成,这里只是将结果对齐到展示日历 - # 注意:必须先 reindex 再 ffill,因为 reindex(method='ffill') 不会填充已有的 NaN - factor_df_aligned = factor_df.reindex(common_dates) - factor_df_aligned = factor_df_aligned.ffill() - - # 持仓状态跟踪 - holdings_state = {} # {code: {'entry_date': str, 'entry_price': float}} - prev_holdings = set() - - days_list = [] - - # 获取配置信息 - bond_code = strategy.bond_code if strategy.use_dynamic_threshold else None - bond_ratio = strategy.bond_ratio - - for i, date in enumerate(common_dates): - # 当前持仓 - pos_row = positions_aligned.loc[date] - current_holdings = set(pos_row[pos_row > 0].index.tolist()) - - # 调仓检测 - added = list(current_holdings - prev_holdings) - removed = list(prev_holdings - current_holdings) - is_rebalance = len(added) > 0 or len(removed) > 0 - - # 更新持仓状态 - for code in removed: - holdings_state.pop(code, None) - for code in added: - # 获取入场价格 - entry_price = None - if code in close_dict: - ep = close_dict[code].get(date) - if pd.notna(ep): - entry_price = float(ep) - - holdings_state[code] = { - 'entry_date': date.strftime('%Y-%m-%d'), - 'entry_price': entry_price, - } - - # 动态阈值(使用对齐后的因子) - factor_scores = {} - if date in factor_df_aligned.index: - for code in factor_df_aligned.columns: - v = factor_df_aligned.loc[date, code] - if pd.notna(v): - factor_scores[code] = float(v) - - bond_score = factor_scores.get(bond_code) if bond_code else None - if bond_score is not None: - threshold = bond_score * bond_ratio - else: - threshold = 0.0 - - # 排名(按动量降序,排除 BOND) - groups = config.asset_pools.by_group - bond_assets = groups.get('BOND', {}) - bond_codes = set(bond_assets.keys()) - - non_bond_scores = {k: v for k, v in factor_scores.items() if k not in bond_codes} - sorted_codes = sorted(non_bond_scores.keys(), - key=lambda c: non_bond_scores[c], reverse=True) - rank_map = {c: r + 1 for r, c in enumerate(sorted_codes)} - - # BOND 不参与排名 - for code in bond_codes: - if code in factor_scores: - rank_map[code] = None - - # 每标的详情 - assets = {} - all_codes = factor_df.columns.tolist() - - # 对齐价格到 A 股日历 - index_close_aligned = {} - etf_close_aligned = {} - - for code in all_codes: - if code in index_close_dict: - index_close_aligned[code] = index_close_dict[code].reindex(common_dates, method='ffill') - if code in etf_close_dict: - etf_close_aligned[code] = etf_close_dict[code].reindex(common_dates, method='ffill') - - # 计算指数和 ETF 收益率 - index_returns = {} - etf_returns = {} - for code in all_codes: - if code in index_close_aligned: - index_returns[code] = index_close_aligned[code].pct_change(fill_method=None) - if code in etf_close_aligned: - etf_returns[code] = etf_close_aligned[code].pct_change(fill_method=None) - - for code in all_codes: - asset = {} - - # 动量得分 - mom = factor_scores.get(code) - asset['momentum'] = safe_val(mom, 4) - - # 排名 - asset['rank'] = rank_map.get(code) - - # 阈值 - asset['threshold'] = safe_val(threshold, 4) - asset['above_threshold'] = mom >= threshold if mom is not None else False - - # 指数价格 - if code in index_close_aligned: - idx_close = index_close_aligned[code].get(date) - asset['index_close'] = safe_val(idx_close, 2) if pd.notna(idx_close) else None - else: - asset['index_close'] = None - - # ETF 价格 - if code in etf_close_aligned: - etf_close = etf_close_aligned[code].get(date) - asset['etf_close'] = safe_val(etf_close, 3) if pd.notna(etf_close) else None - else: - asset['etf_close'] = None - - # 指数收益率 - if code in index_returns: - idx_ret = index_returns[code].get(date) - asset['index_return'] = safe_val(idx_ret, 6) if pd.notna(idx_ret) else None - else: - asset['index_return'] = None - - # ETF 收益率(兼容 V1 命名:etf_return_ctc) - if code in etf_returns: - etf_ret = etf_returns[code].get(date) - asset['etf_return_ctc'] = safe_val(etf_ret, 6) if pd.notna(etf_ret) else None - else: - asset['etf_return_ctc'] = None - - # 溢价率(暂时为 None) - asset['premium'] = None - - # 持仓状态 - is_held = code in current_holdings - asset['is_held'] = is_held - - if is_held and code in holdings_state: - hs = holdings_state[code] - asset['entry_date'] = hs['entry_date'] - asset['entry_price_etf'] = safe_val(hs['entry_price'], 4) - asset['entry_price_idx'] = None # V2 暂不记录指数进场价 - - entry_dt = pd.Timestamp(hs['entry_date']) - trading_days_held = len(common_dates[(common_dates >= entry_dt) & (common_dates <= date)]) - asset['holding_days'] = trading_days_held - - # 累计收益(区分 ETF 和指数,兼容 V1) - if hs['entry_price'] and hs['entry_price'] > 0: - if code in close_dict: - cur = close_dict[code].get(date) - if cur and pd.notna(cur): - cum_ret = float(cur) / hs['entry_price'] - 1 - asset['cum_return_etf'] = safe_val(cum_ret, 4) - asset['cum_return_idx'] = safe_val(cum_ret, 4) # V2 暂不区分 - else: - asset['cum_return_etf'] = None - asset['cum_return_idx'] = None - else: - asset['cum_return_etf'] = None - asset['cum_return_idx'] = None - else: - asset['cum_return_etf'] = None - asset['cum_return_idx'] = None - else: - asset['entry_date'] = None - asset['entry_price_etf'] = None - asset['entry_price_idx'] = None - asset['holding_days'] = 0 - asset['cum_return_etf'] = None - asset['cum_return_idx'] = None - - assets[code] = asset - - # 构建当天记录 - nav_val = equity_curve.loc[date] if date in equity_curve.index else None - ret_val = strategy_returns_clean.loc[date] if date in strategy_returns_clean.index else None - - day_record = { - 'date': date.strftime('%Y-%m-%d'), - 'nav': safe_val(nav_val, 4), - 'daily_return': safe_val(ret_val, 6), - 'is_rebalance': is_rebalance, - 'holdings': sorted(list(current_holdings)), - 'added': sorted(added), - 'removed': sorted(removed), - 'assets': assets - } - days_list.append(day_record) - prev_holdings = current_holdings - - # 10. 构建元数据(兼容 V1 格式) - codes_meta = {} - for code in all_codes: - asset_config = config.asset_pools.assets.get(code) - codes_meta[code] = { - 'name': asset_config.name if asset_config else code, - 'etf': asset_config.trade_source if asset_config else None, - 'market': asset_config.group if asset_config else None # V1 使用 market 字段 - } - - output = { - 'meta': { - 'mode': 'V2: 指数信号 + ETF收益', - 'start_date': common_dates[0].strftime('%Y-%m-%d'), - 'end_date': common_dates[-1].strftime('%Y-%m-%d'), - 'total_days': len(common_dates), - 'select_num': strategy.select_num, - 'n_days': config.factor.n_days, - 'trade_cost': strategy.trade_cost, - 'bond_threshold': { - 'enabled': strategy.use_dynamic_threshold, - 'bond_code': bond_code, - 'ratio': bond_ratio - }, - 'codes': codes_meta - }, - 'days': days_list - } - - # 11. 输出 + # 3. 运行策略并导出明细 output_path = project_root / 'framework_v2' / 'results' / 'backtest_detail_v2.json' - print(f"\n[10] 写入 {output_path}...") - with open(output_path, 'w', encoding='utf-8') as f: - json.dump(output, f, ensure_ascii=False) - file_size_mb = output_path.stat().st_size / 1024 / 1024 - print(f" 大小: {file_size_mb:.1f} MB") - print(f" 天数: {len(days_list)}") - print(f" 标的: {len(all_codes)}") - print(" 完成!") + print("[3] 运行策略并导出明细...") + result = strategy.run( + export_detail=True, + detail_path=str(output_path) + ) - # 打印汇总统计 + # 4. 打印汇总 print("\n" + "=" * 80) print(" 回测汇总") print("=" * 80) - print(f" 总收益: {(equity_curve.iloc[-1] - 1) * 100:.2f}%") - print(f" 年化收益: {((equity_curve.iloc[-1]) ** (252 / len(common_dates)) - 1) * 100:.2f}%") - print(f" 调仓次数: {rebalance_count}") - print(f" 交易天数: {len(common_dates)}") + print(f" 总收益: {result['metrics']['total_return'] * 100:.2f}%") + print(f" 年化收益: {result['metrics']['annual_return'] * 100:.2f}%") + print(f" 最大回撤: {result['metrics']['max_drawdown'] * 100:.2f}%") + print(f" 夏普比率: {result['metrics']['sharpe_ratio']:.2f}") + print(f" 调仓次数: {result['metrics']['rebalance_count']}") + print(f" 交易天数: {result['metrics']['n_days']}") print(f" 输出文件: {output_path}")