""" 统一入口:依次运行 6 个 Task,收集输出,合并生成最终诊断报告。 Usage: python -m rotation.experiments.run_all # 或 python rotation/experiments/run_all.py """ import io import sys import time import json from pathlib import Path from contextlib import redirect_stdout from datetime import datetime sys.path.insert(0, str(Path(__file__).parent.parent.parent)) from rotation.experiments.common import ( OUTPUT_DIR, ensure_output_dir, load_metrics, print_section, ) def capture_task(task_func, task_name: str) -> dict: """运行单个 Task 并捕获输出和返回值""" print(f"\n{'#'*60}") print(f"# 运行 {task_name}") print(f"{'#'*60}") buf = io.StringIO() start = time.time() try: with redirect_stdout(buf): result = task_func() elapsed = time.time() - start output = buf.getvalue() print(output) # 同时打印到终端 return { 'name': task_name, 'status': 'OK', 'elapsed': elapsed, 'output': output, 'result': result, } except Exception as e: elapsed = time.time() - start output = buf.getvalue() print(output) print(f" [ERROR] {task_name} 执行失败: {e}") import traceback traceback.print_exc() return { 'name': task_name, 'status': 'ERROR', 'elapsed': elapsed, 'output': output, 'error': str(e), 'result': {}, } def generate_report(task_outputs: list, metrics: dict) -> str: """生成合并诊断报告 (Markdown)""" report_lines = [] report_lines.append("# ETF 轮动策略深度诊断报告") report_lines.append(f"\n生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") report_lines.append(f"\n回测期间: {metrics.get('start_date', 'N/A')} ~ {metrics.get('end_date', 'N/A')}") # 策略表现快照 report_lines.append("\n## 策略表现快照") report_lines.append(f"\n| 指标 | 数值 |") report_lines.append(f"|---|---|") report_lines.append(f"| 累计收益 | {metrics.get('total_return', 0):+.2%} |") report_lines.append(f"| 年化收益 | {metrics.get('annual_return', 0):+.2%} |") report_lines.append(f"| 最大回撤 | {metrics.get('max_drawdown', 0):.2%} |") report_lines.append(f"| 夏普比率 | {metrics.get('sharpe_ratio', 0):.2f} |") report_lines.append(f"| Calmar 比率 | {metrics.get('calmar_ratio', 0):.2f} |") report_lines.append(f"| 日胜率 | {metrics.get('win_rate', 0):.1%} |") report_lines.append(f"| 调仓次数 | {metrics.get('rebalance_count', 0)} |") # 各 Task 输出 task_titles = { 'Task 1': '信号产生问题诊断', 'Task 2': '收益计算问题诊断', 'Task 3': '调仓逻辑问题诊断', 'Task 4': '资金管理问题诊断', 'Task 5': '整体收益归因分析', 'Task 6': '回撤诊断', } for i, task_out in enumerate(task_outputs, 1): title = task_titles.get(f'Task {i}', f'Task {i}') report_lines.append(f"\n## Task {i}: {title}") report_lines.append(f"\n状态: {task_out['status']} | 耗时: {task_out['elapsed']:.1f}s") if task_out['status'] == 'ERROR': report_lines.append(f"\n**执行失败**: {task_out.get('error', 'Unknown')}") continue # 提取关键结论 output = task_out['output'] result = task_out.get('result', {}) # 添加诊断结论 report_lines.append(f"\n### 诊断结论") if f'Task {i}' in task_titles: conclusions = extract_conclusions(i, result, output) report_lines.extend(conclusions) # 综合建议 report_lines.append("\n## 综合优化建议") report_lines.append(generate_recommendations(task_outputs)) # 执行统计 report_lines.append("\n## 执行统计") report_lines.append(f"\n| Task | 状态 | 耗时 |") report_lines.append(f"|---|---|---|") for i, task_out in enumerate(task_outputs, 1): title = task_titles.get(f'Task {i}', f'Task {i}') report_lines.append(f"| Task {i}: {title} | {task_out['status']} | {task_out['elapsed']:.1f}s |") total_time = sum(t['elapsed'] for t in task_outputs) report_lines.append(f"| **总计** | | **{total_time:.1f}s** |") return '\n'.join(report_lines) def extract_conclusions(task_num: int, result: dict, output: str) -> list: """从 Task 结果中提取关键结论""" lines = [] if task_num == 1: freq = result.get('frequency', {}) jitter = result.get('jitter', {}) threshold = result.get('threshold', {}) lines.append(f"- 调仓频率: 每 {freq.get('avg_interval', 0):.1f} 天一次," f"无效调仓率 {freq.get('invalid_rate', 0):.1f}%") lines.append(f"- 短期抖动事件: {jitter.get('jitter_events', 0)} 次") lines.append(f"- 债券持有占比: {threshold.get('bond_hold_pct', 0)*100:.1f}%") elif task_num == 2: first = result.get('first_day', {}) t1 = result.get('t1_bias', {}) lines.append(f"- 首日 NAV = {first.get('first_nav', 0):.6f},存在轻微逻辑瑕疵") lines.append(f"- 极端日共 {t1.get('extreme_days', 0)} 次") elif task_num == 3: min_hold = result.get('min_hold', []) if min_hold: lines.append(f"- 最小持仓期模拟结果:") for r in min_hold: lines.append(f" - {r['min_hold']}天: 年化={r['annual_return']:+.2%}, " f"回撤={r['max_drawdown']:.2%}, 夏普={r['sharpe']:.2f}") elif task_num == 4: lines.append("- 止损机制可减少极端回撤,但频繁止损可能拖累长期收益") lines.append("- 高波动期减仓有助于控制回撤") elif task_num == 5: conc = result.get('concentration', {}) lines.append(f"- 收益依赖度: 最好5天贡献了 {conc.get('dependency_pct', 0):.1f}% 的最终净值") elif task_num == 6: max_dd = result.get('max_dd', {}) tail = result.get('tail', {}) lines.append(f"- 最大回撤 {max_dd.get('max_dd', 0):.2%} 发生在 {max_dd.get('trough_date', 'N/A')}") lines.append(f"- CVaR(5%): {tail.get('cvar_5pct', 0):+.4%}") lines.append(f"- 最大连续亏损: {tail.get('max_streak', 0)} 天") return lines def generate_recommendations(task_outputs: list) -> str: """生成综合优化建议""" recs = """ ### 优先级 P0(预期影响最大) 1. **降低调仓频率** - 引入最小持仓期约束(建议 5 天起步) - 在 `_generate_signals` 中加入 `min_hold_days` 检查 2. **启用溢价控制** - 对 QDII ETF(NDX/N225/GDAXI/HSI/HSTECH)启用溢价过滤 - 建议 threshold=5%,避免高溢价买入 ### 优先级 P1(显著改善回撤) 3. **组合级止损机制** - 建议回撤 > 8% 时触发止损,转债券持有 10 天 4. **修复首日 NAV 逻辑** - 首日 `current_holdings` 为空时不应计算收益 ### 优先级 P2(提升风险调整收益) 5. **波动率加权配置** - 替代等权,使用波动率倒数加权平衡风险贡献 6. **波动率适配仓位** - 高波动期(滚动20日波动率 > 20%)减仓至 2/3 ### 优先级 P3(进一步优化) 7. **评估分组机制** - 对比取消分组 vs 当前分组的收益差异 8. **优化动态阈值** - 调整 bond_ratio,测试不同阈值对防御效果的影响 """ return recs def main(): print("=" * 60) print(" ETF 轮动策略深度诊断 - 统一入口") print("=" * 60) ensure_output_dir() # 加载原始指标 metrics = load_metrics() # 从 detail JSON 补充日期 from rotation.experiments.common import load_detail_meta meta = load_detail_meta() metrics['start_date'] = meta['start_date'] metrics['end_date'] = meta['end_date'] print(f"\n策略期间: {meta['start_date']} ~ {meta['end_date']}") print(f"累计收益: {metrics['total_return']:+.2%}") print(f"年化收益: {metrics['annual_return']:+.2%}") print(f"最大回撤: {metrics['max_drawdown']:.2%}") # 导入并运行各 Task from rotation.experiments.task1_signal_analysis import main as task1_main from rotation.experiments.task2_return_calc_analysis import main as task2_main from rotation.experiments.task3_rebalance_analysis import main as task3_main from rotation.experiments.task4_capital_mgmt_analysis import main as task4_main from rotation.experiments.task5_return_attribution import main as task5_main from rotation.experiments.task6_drawdown_analysis import main as task6_main tasks = [ (task1_main, 'Task 1: 信号产生问题诊断'), (task2_main, 'Task 2: 收益计算问题诊断'), (task3_main, 'Task 3: 调仓逻辑问题诊断'), (task4_main, 'Task 4: 资金管理问题诊断'), (task5_main, 'Task 5: 整体收益归因分析'), (task6_main, 'Task 6: 回撤诊断'), ] task_outputs = [] for func, name in tasks: result = capture_task(func, name) task_outputs.append(result) # 生成合并报告 print_section("生成诊断报告") report = generate_report(task_outputs, metrics) report_path = OUTPUT_DIR / 'diagnosis_report.md' with open(report_path, 'w', encoding='utf-8') as f: f.write(report) print(f" + 诊断报告已保存: {report_path}") # 输出摘要 print_section("执行完成") ok_count = sum(1 for t in task_outputs if t['status'] == 'OK') err_count = sum(1 for t in task_outputs if t['status'] == 'ERROR') total_time = sum(t['elapsed'] for t in task_outputs) print(f" 成功: {ok_count}/6, 失败: {err_count}/6") print(f" 总耗时: {total_time:.1f}s") print(f" 报告路径: {report_path}") if __name__ == '__main__': main()