Files
etf/framework_v2/scripts/export_backtest_detail.py
aszerW a62cfb4cd5 fix: 修复因子前向填充不生效的 bug(清理调试代码)
问题根因:
- pandas reindex(method='ffill') 只填充新增行的 NaN,不填充已存在的 NaN
- 当 factor_df 中已有境外市场放假日期的 NaN 值时,reindex 无法填充

修复方案:
- 改为两步操作:reindex() 然后 ffill()
- ffill() 会填充所有 NaN,包括已存在的

验证结果:
- 2026-04-30 HSI: None → 0.2388 
- 2026-04-30 GDAXI: None → 0.5647 
- 2026-05-08 HSI: None → 0.1144 
2026-05-25 19:16:14 +08:00

401 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
导出 V2 框架回测逐日明细到 JSON,供 HTML 回放器加载。
(适用于 GlobalRotationStrategy,V2 正式版)
- 指数信号 + ETF 收益
- 动态短债阈值
- 强制分散化
- 交易成本
- CrossMarketAligner 数据对齐
用法:
python framework_v2/scripts/export_backtest_detail.py
"""
import sys
import json
import math
from pathlib import Path
from datetime import datetime
import numpy as np
import pandas as pd
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))
from dotenv import load_dotenv
load_dotenv()
from framework_v2.config import load_config
from framework_v2.strategies.rotation.rotation import GlobalRotationStrategy
from framework_v2.shared.data.alignment import CrossMarketAligner
# ==================== 辅助函数 ====================
def safe_val(v, decimals=4):
    """Convert a value into something that is safe to serialise as JSON.

    Args:
        v: The value to convert. May be ``None``, a Python or NumPy float,
            a Python or NumPy integer, or any other object.
        decimals: Number of decimal places floats are rounded to.

    Returns:
        ``None`` when ``v`` is ``None`` or a non-finite float (NaN / +-Inf);
        a rounded built-in ``float`` for float inputs; a built-in ``int``
        for integer inputs; otherwise ``v`` unchanged.
    """
    if v is None:
        return None
    if isinstance(v, (np.floating, float)):
        # Convert first, then test finiteness: NumPy float types such as
        # np.float32 are not subclasses of Python float, so checking
        # isinstance(v, float) alone would let their NaN/Inf values through.
        as_float = float(v)
        if not math.isfinite(as_float):
            return None
        return round(as_float, decimals)
    if isinstance(v, (np.integer, int)):
        return int(v)
    return v
def _align_prices(price_by_code, codes, calendar):
    """Forward-fill each code's close series onto ``calendar`` and derive daily returns.

    Returns a ``(aligned, returns)`` pair of dicts keyed by code; codes that
    have no entry in ``price_by_code`` are absent from both.
    """
    aligned = {
        code: price_by_code[code].reindex(calendar, method='ffill')
        for code in codes
        if code in price_by_code
    }
    returns = {
        code: series.pct_change(fill_method=None)
        for code, series in aligned.items()
    }
    return aligned, returns


def _rounded_at(series_by_code, code, date, decimals):
    """Return ``series_by_code[code]`` at ``date`` rounded to ``decimals``, or None if missing/NaN."""
    series = series_by_code.get(code)
    if series is None:
        return None
    value = series.get(date)
    return safe_val(value, decimals) if pd.notna(value) else None


def _holding_fields(holding, code, date, close_dict, common_dates):
    """Build the entry / holding-period fields of one asset's daily record.

    ``holding`` is the tracked ``{'entry_date', 'entry_price'}`` dict when the
    asset is currently held, or ``None`` when it is not.
    """
    if holding is None:
        return {
            'entry_date': None,
            'entry_price_etf': None,
            'entry_price_idx': None,
            'holding_days': 0,
            'cum_return_etf': None,
            'cum_return_idx': None,
        }

    entry_price = holding['entry_price']
    entry_dt = pd.Timestamp(holding['entry_date'])
    # Holding period counts calendar rows from the entry date through today, inclusive.
    held_mask = (common_dates >= entry_dt) & (common_dates <= date)

    cum_ret = None
    if entry_price and entry_price > 0 and code in close_dict:
        cur = close_dict[code].get(date)
        if cur and pd.notna(cur):
            cum_ret = float(cur) / entry_price - 1
    cum_ret = safe_val(cum_ret, 4)

    return {
        'entry_date': holding['entry_date'],
        'entry_price_etf': safe_val(entry_price, 4),
        'entry_price_idx': None,  # V2 does not record the index entry price yet
        'holding_days': len(common_dates[held_mask]),
        'cum_return_etf': cum_ret,
        'cum_return_idx': cum_ret,  # V2 does not distinguish index from ETF yet
    }


def main():
    """Run the V2 rotation backtest and export a per-day detail JSON file.

    The function loads ``config_simple.yaml``, runs the strategy pipeline
    (data -> factors -> signals -> positions), computes cost-adjusted strategy
    returns and the equity curve, then builds one record per date of the
    equity-curve index containing NAV, daily return, rebalance info and
    per-asset details. The result is written to
    ``framework_v2/results/backtest_detail_v2.json`` under the project root,
    and a short summary is printed.
    """
    print("=" * 80)
    print(" V2 回测逐日明细导出GlobalRotationStrategy")
    print("=" * 80)

    # 1. Load configuration
    config_file = project_root / 'framework_v2' / 'strategies' / 'rotation' / 'config_simple.yaml'
    print(f"\n[1] 加载配置: {config_file}")
    config = load_config(str(config_file))

    # 2. Initialise the strategy
    print("[2] 初始化策略...")
    strategy = GlobalRotationStrategy(config)

    # 3. Fetch data
    print("[3] 获取数据...")
    data = strategy.get_data()
    print(f" 获取 {len(data)} 个标的")

    # 4. Compute factors
    print("[4] 计算因子...")
    factors = strategy.compute_factors(data)
    print(f" 计算 {len(factors)} 个因子")

    # 5. Generate signals
    print("[5] 生成信号...")
    signals = strategy.generate_signals(factors)
    print(f" 生成 {signals.shape[0]} 个信号")

    # 6. Position management
    print("[6] 仓位管理...")
    positions = strategy.manage_positions(signals)

    # 7. Prepare return data (via CrossMarketAligner)
    print("[7] 准备收益率数据...")
    signal_to_trade = config.asset_pools.get_signal_to_trade_mapping()
    trading_calendar = strategy._get_trading_calendar()
    print(f" A 股交易日: {len(trading_calendar)}")

    print("[7.5] 准备价格和溢价率数据...")
    index_close_dict = {}  # index close, keyed by signal code
    etf_close_dict = {}    # ETF close, deliberately keyed by signal code as well
    for signal_code, trade_code in signal_to_trade.items():
        if signal_code in data:
            index_close_dict[signal_code] = data[signal_code]['close']
        if trade_code in data:
            etf_close_dict[signal_code] = data[trade_code]['close']
    # TODO: plug in ETF NAV data to compute the premium; it is exported as None for now.

    aligner = CrossMarketAligner(target_calendar=trading_calendar)
    # Separate dict (same series) so the aligner input stays independent
    # of the mapping used for the per-day price export.
    close_dict = dict(etf_close_dict)
    returns_df = aligner.align_multi_asset(close_dict)
    print(f" 收益率数据: {len(returns_df)} 天, {len(returns_df.columns)} 个标的")

    # 8. Strategy returns and equity curve
    print("[8] 计算策略收益...")
    # NOTE(review): reindex(method='ffill') only fills rows created by the
    # reindex; NaNs already present in `positions` would stay NaN. Confirm
    # that `positions` never contains NaN on existing dates.
    positions_aligned = positions.reindex(trading_calendar, method='ffill')
    # Positions decided on day t earn the return of day t+1.
    positions_delayed = positions_aligned.shift(1).fillna(0)
    strategy_returns = (positions_delayed * returns_df).sum(axis=1)
    strategy_returns_clean, rebalance_count = strategy._apply_trade_cost(
        strategy_returns, positions_aligned
    )
    print(f" 调仓次数: {rebalance_count}")
    equity_curve = (1 + strategy_returns_clean).cumprod()
    final_nav = equity_curve.iloc[-1]
    print(f" 最终净值: {final_nav:.4f}")

    # 9. Build the per-day detail
    print("[9] 构建逐日明细...")
    common_dates = equity_curve.index

    factor_df = pd.DataFrame(factors)
    if not isinstance(factor_df.index, pd.DatetimeIndex):
        factor_df.index = pd.to_datetime(factor_df.index)
    # Align factors to the display calendar. This must be reindex() followed
    # by ffill(): reindex(method='ffill') does not fill NaNs that already
    # exist in the source rows (e.g. overseas market holidays).
    factor_df_aligned = factor_df.reindex(common_dates).ffill()

    # Everything below is identical for every date, so compute it once
    # instead of once per trading day.
    all_codes = factor_df.columns.tolist()
    index_close_aligned, index_returns = _align_prices(index_close_dict, all_codes, common_dates)
    etf_close_aligned, etf_returns = _align_prices(etf_close_dict, all_codes, common_dates)
    bond_code = strategy.bond_code if strategy.use_dynamic_threshold else None
    bond_ratio = strategy.bond_ratio
    bond_codes = set(config.asset_pools.by_group.get('BOND', {}).keys())

    holdings_state = {}  # {code: {'entry_date': str, 'entry_price': float | None}}
    prev_holdings = set()
    days_list = []

    for date in common_dates:
        date_str = date.strftime('%Y-%m-%d')

        # Current holdings and rebalance detection
        pos_row = positions_aligned.loc[date]
        current_holdings = set(pos_row[pos_row > 0].index.tolist())
        added = list(current_holdings - prev_holdings)
        removed = list(prev_holdings - current_holdings)
        is_rebalance = bool(added or removed)

        # Update tracked entry information
        for code in removed:
            holdings_state.pop(code, None)
        for code in added:
            entry_price = None
            if code in close_dict:
                ep = close_dict[code].get(date)
                if pd.notna(ep):
                    entry_price = float(ep)
            holdings_state[code] = {
                'entry_date': date_str,
                'entry_price': entry_price,
            }

        # Factor scores of the day (from the aligned factors)
        factor_scores = {}
        if date in factor_df_aligned.index:
            for code in factor_df_aligned.columns:
                v = factor_df_aligned.loc[date, code]
                if pd.notna(v):
                    factor_scores[code] = float(v)

        # Dynamic threshold derived from the bond score
        bond_score = factor_scores.get(bond_code) if bond_code else None
        threshold = bond_score * bond_ratio if bond_score is not None else 0.0

        # Ranking by score, descending; BOND assets get an explicit None rank
        non_bond_scores = {k: v for k, v in factor_scores.items() if k not in bond_codes}
        sorted_codes = sorted(non_bond_scores, key=non_bond_scores.get, reverse=True)
        rank_map = {c: r + 1 for r, c in enumerate(sorted_codes)}
        for code in bond_codes:
            if code in factor_scores:
                rank_map[code] = None

        # Per-asset detail (key order is part of the exported format)
        assets = {}
        for code in all_codes:
            mom = factor_scores.get(code)
            is_held = code in current_holdings
            asset = {
                'momentum': safe_val(mom, 4),
                'rank': rank_map.get(code),
                'threshold': safe_val(threshold, 4),
                'above_threshold': mom >= threshold if mom is not None else False,
                'index_close': _rounded_at(index_close_aligned, code, date, 2),
                'etf_close': _rounded_at(etf_close_aligned, code, date, 3),
                'index_return': _rounded_at(index_returns, code, date, 6),
                # V1-compatible name for the ETF close-to-close return
                'etf_return_ctc': _rounded_at(etf_returns, code, date, 6),
                'premium': None,  # not available yet
                'is_held': is_held,
            }
            holding = holdings_state.get(code) if is_held else None
            asset.update(_holding_fields(holding, code, date, close_dict, common_dates))
            assets[code] = asset

        # Day record
        nav_val = equity_curve.loc[date] if date in equity_curve.index else None
        ret_val = strategy_returns_clean.loc[date] if date in strategy_returns_clean.index else None
        days_list.append({
            'date': date_str,
            'nav': safe_val(nav_val, 4),
            'daily_return': safe_val(ret_val, 6),
            'is_rebalance': is_rebalance,
            'holdings': sorted(current_holdings),
            'added': sorted(added),
            'removed': sorted(removed),
            'assets': assets
        })
        prev_holdings = current_holdings

    # 10. Metadata (V1-compatible layout)
    codes_meta = {}
    for code in all_codes:
        asset_config = config.asset_pools.assets.get(code)
        codes_meta[code] = {
            'name': asset_config.name if asset_config else code,
            'etf': asset_config.trade_source if asset_config else None,
            'market': asset_config.group if asset_config else None  # V1 uses the "market" field
        }

    output = {
        'meta': {
            'mode': 'V2: 指数信号 + ETF收益',
            'start_date': common_dates[0].strftime('%Y-%m-%d'),
            'end_date': common_dates[-1].strftime('%Y-%m-%d'),
            'total_days': len(common_dates),
            'select_num': strategy.select_num,
            'n_days': config.factor.n_days,
            'trade_cost': strategy.trade_cost,
            'bond_threshold': {
                'enabled': strategy.use_dynamic_threshold,
                'bond_code': bond_code,
                'ratio': bond_ratio
            },
            'codes': codes_meta
        },
        'days': days_list
    }

    # 11. Write the output file
    output_path = project_root / 'framework_v2' / 'results' / 'backtest_detail_v2.json'
    print(f"\n[10] 写入 {output_path}...")
    # The results directory may not exist on a fresh checkout.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(output, f, ensure_ascii=False)
    file_size_mb = output_path.stat().st_size / 1024 / 1024
    print(f" 大小: {file_size_mb:.1f} MB")
    print(f" 天数: {len(days_list)}")
    print(f" 标的: {len(all_codes)}")
    print(" 完成!")

    # Summary statistics
    print("\n" + "=" * 80)
    print(" 回测汇总")
    print("=" * 80)
    print(f" 总收益: {(final_nav - 1) * 100:.2f}%")
    print(f" 年化收益: {((final_nav) ** (252 / len(common_dates)) - 1) * 100:.2f}%")
    print(f" 调仓次数: {rebalance_count}")
    print(f" 交易天数: {len(common_dates)}")
    print(f" 输出文件: {output_path}")
# Run the export only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()