Files
etf/visualization/report_generator/generate_report.py
aszerW c95ec9bfdb fix(report): 修复持仓收益百分号格式转换
- 使用apply+lambda统一处理百分号格式
- 添加列存在性检查,避免KeyError
- 正确计算盈亏次数
2026-05-11 23:10:31 +08:00

421 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
ETF轮动策略报告生成器
=======================
从回测数据生成精美的 HTML 策略报告
使用方法:
python generate_report.py
python generate_report.py --start 2024-01-01 --end 2024-12-31
"""
import pandas as pd
import numpy as np
from jinja2 import Template
from datetime import datetime
import argparse
import os
import sys
import json
class ReportGenerator:
    """Build an HTML strategy report from ETF-rotation backtest exports.

    The generator reads the files found in ``results_dir``:

    * ``report_summary.csv`` - per-symbol summary (required)
    * ``report_trades.csv``  - individual trade records (required)
    * ``report_metrics.json`` - strategy KPIs exported by the backtest (optional)
    * ``report_nav.csv``     - strategy / benchmark NAV curve (optional)

    When the optional files are missing, KPIs and the NAV curve are
    re-derived from the trade records instead.
    """

    # Summary columns that may be exported as percent strings such as "12.5%".
    _SUMMARY_PERCENT_COLUMNS = ('胜率', '平均收益', '累计收益', '最大单次收益', '最大单次亏损')

    def __init__(self, results_dir='results'):
        """Remember the results directory; nothing is read until ``load_data``."""
        self.results_dir = results_dir
        self.summary_df = None
        self.trades_df = None
        self.metrics = None  # strategy KPIs loaded from JSON
        self.nav_df = None  # NAV curve loaded from CSV

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _percent_to_float(values, na_value=np.nan):
        """Convert a Series of percent strings and/or plain numbers to floats.

        ``"12.5%"``, ``"12.5"`` and ``12.5`` all become ``12.5``. Missing
        values become ``na_value``. Going through ``str`` makes the
        conversion independent of the dtype pandas inferred for the column.
        """
        def convert(value):
            if pd.isna(value):
                return na_value
            return float(str(value).strip().rstrip('%'))

        return values.apply(convert).astype(float)

    def _trade_returns(self, trades):
        """Return the per-trade return column as floats (missing -> 0.0).

        An empty Series is returned when the column is absent, so callers
        never hit a ``KeyError``.
        """
        if '持仓收益' not in trades.columns:
            return pd.Series(dtype=float)
        return self._percent_to_float(trades['持仓收益'], na_value=0.0)

    def _weighted_exit_nav(self, trades, keys):
        """Sum ``exit NAV * position weight / 100`` per group of ``keys``.

        ``keys`` is a Series aligned with ``trades`` (exit dates or exit
        months). The result is a Series indexed by the sorted group keys.
        """
        weighted = (
            trades['出场净值'].astype(float)
            * self._percent_to_float(trades['仓位占比'])
            / 100
        )
        return weighted.groupby(keys).sum()

    @staticmethod
    def _monthly_changes(month_nav):
        """Turn a month-indexed NAV Series into (labels, pct changes) lists.

        The first month has no predecessor, so its change is reported as 0.
        """
        month_nav = month_nav.sort_index()
        change = (month_nav.pct_change() * 100).fillna(0)
        return month_nav.index.astype(str).tolist(), change.round(2).tolist()

    def _best_symbol(self):
        """Return the symbol code with the highest cumulative return, or 'N/A'."""
        summary = self.summary_df
        if summary is None or summary.empty or '累计收益' not in summary.columns:
            return 'N/A'
        cumulative = self._percent_to_float(summary['累计收益'])
        if cumulative.isna().all():
            return 'N/A'
        return summary.loc[cumulative.idxmax(), '品种代码']

    def _symbol_ranking(self):
        """Return (names, returns) of all symbols sorted by ascending return."""
        summary = self.summary_df
        if summary is None or summary.empty:
            return [], []
        ranked = self._percent_to_float(
            summary.set_index('品种代码')['累计收益']
        ).sort_values()
        # One dict lookup per symbol instead of re-filtering the frame each time.
        first_names = (
            summary.drop_duplicates('品种代码')
            .set_index('品种代码')['品种名称']
            .to_dict()
        )
        names = [first_names.get(code, code) for code in ranked.index]
        return names, ranked.round(2).tolist()

    @staticmethod
    def _format_date(value):
        """Format a timestamp as YYYY-MM-DD, or 'N/A' when it is missing."""
        if value is None or pd.isna(value):
            return 'N/A'
        return value.strftime('%Y-%m-%d')

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def load_data(self):
        """Load the backtest exports from ``self.results_dir``.

        Raises:
            FileNotFoundError: if the summary or trades CSV is missing.
        """
        # Per-symbol summary
        summary_path = os.path.join(self.results_dir, 'report_summary.csv')
        if not os.path.exists(summary_path):
            raise FileNotFoundError(f"找不到汇总数据文件: {summary_path}")
        self.summary_df = pd.read_csv(summary_path)
        # Percent columns may arrive as "12.5%" strings or as plain numbers.
        for col in self._SUMMARY_PERCENT_COLUMNS:
            if col in self.summary_df.columns:
                self.summary_df[col] = self._percent_to_float(self.summary_df[col])

        # Trade records
        trades_path = os.path.join(self.results_dir, 'report_trades.csv')
        if not os.path.exists(trades_path):
            raise FileNotFoundError(f"找不到交易记录文件: {trades_path}")
        self.trades_df = pd.read_csv(trades_path)
        self.trades_df['进场日期'] = pd.to_datetime(self.trades_df['进场日期'])
        self.trades_df['出场日期'] = pd.to_datetime(self.trades_df['出场日期'])

        # Optional: strategy KPIs exported as JSON
        metrics_path = os.path.join(self.results_dir, 'report_metrics.json')
        if os.path.exists(metrics_path):
            with open(metrics_path, 'r', encoding='utf-8') as f:
                self.metrics = json.load(f)
            print(f"✅ 加载策略指标: {metrics_path}")
        else:
            print(f"⚠️ 未找到策略指标文件: {metrics_path}")

        # Optional: NAV curve exported as CSV
        nav_path = os.path.join(self.results_dir, 'report_nav.csv')
        if os.path.exists(nav_path):
            self.nav_df = pd.read_csv(nav_path)
            self.nav_df['日期'] = pd.to_datetime(self.nav_df['日期'])
            print(f"✅ 加载净值曲线: {nav_path} ({len(self.nav_df)} 条记录)")
        else:
            print(f"⚠️ 未找到净值曲线文件: {nav_path}")

        print(f"✅ 数据加载成功: {len(self.trades_df)} 条交易记录")

    def calculate_kpis(self, trades_filtered=None):
        """Compute the headline KPIs shown at the top of the report.

        KPIs exported by the strategy (``self.metrics['策略']``) are preferred;
        otherwise they are re-derived from the trade records.

        Args:
            trades_filtered: trades to use; defaults to ``self.trades_df``.

        Returns:
            dict with the same keys on both paths: ``total_return``,
            ``annual_return``, ``win_rate``, ``max_drawdown``,
            ``sharpe_ratio``, ``total_trades``, ``best_symbol``,
            ``avg_holding_days`` (formatted strings) and ``win_count``,
            ``loss_count`` (ints).
        """
        df = trades_filtered if trades_filtered is not None else self.trades_df
        total_trades = len(df)

        if self.metrics is not None and '策略' in self.metrics:
            strategy_metrics = self.metrics['策略']
            print("✅ 使用轮动策略输出的KPI指标")
            best_symbol = self._best_symbol()
            avg_holding_days = df['持仓天数'].mean() if total_trades > 0 else 0
            # Win/loss counts come from the individual trade returns.
            returns_series = self._trade_returns(df)
            win_count = (returns_series > 0).sum()
            loss_count = (returns_series < 0).sum()
            return {
                'total_return': f"{strategy_metrics['累计收益'] * 100:.2f}",
                'annual_return': f"{strategy_metrics['年化收益(自然日)'] * 100:.2f}",
                'win_rate': f"{strategy_metrics['日胜率'] * 100:.2f}",
                'max_drawdown': f"{strategy_metrics['最大回撤'] * 100:.2f}",
                'sharpe_ratio': f"{strategy_metrics['夏普比率']:.2f}",
                'total_trades': str(total_trades),
                'best_symbol': best_symbol,
                'avg_holding_days': f"{avg_holding_days:.1f}",
                'win_count': int(win_count),
                'loss_count': int(loss_count),
            }

        # Fallback: derive everything from the trade records.
        print("⚠️ 未找到策略KPI重新计算...")
        total_return = annual_return = win_rate = 0.0
        sharpe_ratio = max_drawdown = 0.0
        win_count = loss_count = 0

        # Portfolio NAV per exit date (position-weighted exit NAVs).
        nav = self._weighted_exit_nav(df, df['出场日期'])
        if not nav.empty:  # an empty selection keeps the zero defaults
            initial_nav = nav.iloc[0]
            final_nav = nav.iloc[-1]
            total_return = (final_nav - initial_nav) / initial_nav * 100

            # Linear annualisation over the calendar days covered.
            days = (nav.index[-1] - nav.index[0]).days
            if days > 0:
                annual_return = total_return / (days / 365.0)

            # Win rate over NAV changes between consecutive exit dates.
            nav_change = nav.pct_change()
            win_count = int((nav_change > 0).sum())
            total_count = len(nav) - 1  # the first point has no change
            if total_count > 0:
                win_rate = win_count / total_count * 100
            loss_count = total_count - win_count

            volatility = nav_change.std()
            if volatility > 0:
                sharpe_ratio = nav_change.mean() / volatility * np.sqrt(252)

            running_max = nav.cummax()
            max_drawdown = ((nav - running_max) / running_max * 100).min()

        best_symbol = self._best_symbol()
        avg_holding_days = df['持仓天数'].mean() if total_trades > 0 else 0
        return {
            'total_return': f"{total_return:.2f}",
            'annual_return': f"{annual_return:.2f}",
            'win_rate': f"{win_rate:.2f}",
            'max_drawdown': f"{max_drawdown:.2f}",
            'sharpe_ratio': f"{sharpe_ratio:.2f}",
            'total_trades': str(total_trades),
            'best_symbol': best_symbol,
            'avg_holding_days': f"{avg_holding_days:.1f}",
            'win_count': int(win_count),
            'loss_count': int(loss_count),
        }

    def prepare_chart_data(self, trades_filtered=None):
        """Assemble the series consumed by the report's charts.

        The NAV curve exported by the strategy (``self.nav_df``) is preferred;
        otherwise a curve is rebuilt from the trade records (no benchmark).
        ``self.nav_df`` is never modified.

        Args:
            trades_filtered: trades to use; defaults to ``self.trades_df``.

        Returns:
            dict with the same keys on both paths: ``nav_dates``,
            ``nav_values``, ``benchmark_values``, ``monthly_dates``,
            ``monthly_values``, ``positive_returns``, ``negative_returns``,
            ``symbol_names``, ``symbol_returns`` and ``symbols``.
        """
        df = trades_filtered if trades_filtered is not None else self.trades_df

        if self.nav_df is not None:
            print("✅ 使用轮动策略输出的净值曲线")
            nav = self.nav_df
            nav_dates = nav['日期'].dt.strftime('%Y-%m-%d').tolist()
            nav_values = nav['策略净值'].round(4).tolist()
            benchmark_values = nav['基准净值'].round(4).tolist()
            # Month-end NAV; grouping by a derived key leaves nav_df untouched.
            month_nav = nav['策略净值'].groupby(nav['日期'].dt.to_period('M')).last()
        else:
            print("⚠️ 未找到净值曲线,重新计算...")
            daily_nav = self._weighted_exit_nav(df, df['出场日期'])
            # Kept at the original precision: round to 4, then to 2 decimals.
            nav_values = [round(v, 2) for v in daily_nav.round(4).tolist()]
            nav_dates = [day.strftime('%Y-%m-%d') for day in daily_nav.index]
            benchmark_values = []  # the trade records carry no benchmark
            month_nav = self._weighted_exit_nav(
                df, df['出场日期'].dt.to_period('M')
            )

        monthly_dates, monthly_values = self._monthly_changes(month_nav)

        # Profit / loss distribution of the individual trades.
        trade_returns = self._trade_returns(df)
        positive_returns = trade_returns[trade_returns > 0].tolist()
        negative_returns = trade_returns[trade_returns <= 0].tolist()

        symbol_names, symbol_returns_list = self._symbol_ranking()

        return {
            'nav_dates': nav_dates,
            'nav_values': nav_values,
            'benchmark_values': benchmark_values,
            'monthly_dates': monthly_dates,
            'monthly_values': monthly_values,
            'positive_returns': positive_returns,
            'negative_returns': negative_returns,
            'symbol_names': symbol_names,
            'symbol_returns': symbol_returns_list,
            'symbols': df['品种代码'].unique().tolist(),
        }

    def generate(self, start_date=None, end_date=None, output_dir='reports'):
        """Render the HTML report and write it to ``output_dir``.

        NOTE: the date range filters the trade records only. KPIs loaded from
        ``report_metrics.json`` and the NAV curve from ``report_nav.csv`` are
        used exactly as exported.

        Args:
            start_date: optional inclusive lower bound on the exit date.
            end_date: optional inclusive upper bound on the exit date.
            output_dir: directory that receives ``strategy_report.html``.

        Returns:
            Path of the generated HTML file.
        """
        print("🚀 开始生成策略报告...")
        self.load_data()

        # Filter trades by exit date.
        if start_date:
            start_date = pd.to_datetime(start_date)
        if end_date:
            end_date = pd.to_datetime(end_date)
        trades_filtered = self.trades_df.copy()
        if start_date:
            trades_filtered = trades_filtered[trades_filtered['出场日期'] >= start_date]
        if end_date:
            trades_filtered = trades_filtered[trades_filtered['出场日期'] <= end_date]
        print(f"📊 筛选后数据: {len(trades_filtered)} 条记录")

        kpis = self.calculate_kpis(trades_filtered)
        chart_data = self.prepare_chart_data(trades_filtered)

        # Trade table, newest exit first.
        trades_display = trades_filtered.sort_values('出场日期', ascending=False).copy()
        trades_display['进场日期'] = trades_display['进场日期'].dt.strftime('%Y-%m-%d')
        trades_display['出场日期'] = trades_display['出场日期'].dt.strftime('%Y-%m-%d')
        trades_list = trades_display.to_dict('records')

        # Pagination
        page_size = 50
        total_trades = len(trades_list)
        total_pages = -(-total_trades // page_size)  # ceiling division

        # Fall back to the data's own date range; 'N/A' when nothing matched.
        exit_dates = trades_filtered['出场日期']
        start_label = self._format_date(start_date if start_date else exit_dates.min())
        end_label = self._format_date(end_date if end_date else exit_dates.max())

        template_path = os.path.join(os.path.dirname(__file__), 'template.html')
        with open(template_path, 'r', encoding='utf-8') as f:
            template = Template(f.read())

        # Merge into one dict first: passing total_trades both explicitly and
        # via **kpis would raise "got multiple values for keyword argument".
        context = {**kpis, **chart_data}
        context.update(
            report_date=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            start_date=start_label,
            end_date=end_label,
            trades=trades_list,
            page_size=page_size,
            total_trades=total_trades,
            total_pages=total_pages,
        )
        html = template.render(**context)

        # Fixed output file name inside the output directory.
        os.makedirs(output_dir, exist_ok=True)
        output_file = os.path.join(output_dir, 'strategy_report.html')
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(html)
        print(f"✅ 报告已生成: {output_file}")
        print(f"📁 文件大小: {os.path.getsize(output_file) / 1024:.1f} KB")
        print(f"🌐 在浏览器中打开: file://{os.path.abspath(output_file)}")
        return output_file
def main():
    """Command-line entry point.

    Parses ``--start``, ``--end`` and ``--output``, then runs the report
    generator. Any failure is printed together with its traceback and the
    process exits with status 1.
    """
    cli = argparse.ArgumentParser(description='生成ETF轮动策略报告')
    option_specs = (
        ('--start', {'help': '开始日期 (YYYY-MM-DD)'}),
        ('--end', {'help': '结束日期 (YYYY-MM-DD)'}),
        ('--output', {'default': 'reports', 'help': '输出目录'}),
    )
    for flag, extra in option_specs:
        cli.add_argument(flag, type=str, **extra)
    options = cli.parse_args()

    try:
        ReportGenerator().generate(
            start_date=options.start,
            end_date=options.end,
            output_dir=options.output,
        )
    except Exception as e:
        # Top-level boundary: report the failure and signal it via exit code.
        print(f"❌ 生成失败: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)


if __name__ == '__main__':
    main()