Files
etf/rotation/experiments/run_all.py
aszerW 04b858ff09 feat: 添加ETF轮动策略诊断分析实验
新增6维度策略诊断实验脚本和报告:
- task1: 信号产生分析 (调仓频率、无效调仓率)
- task2: 收益计算分析 (T+1执行偏差、溢价问题)
- task3: 调仓逻辑分析 (最小持仓期模拟)
- task4: 资金管理分析 (止损、波动率适配)
- task5: 收益归因分析 (集中度、静态vs轮动)
- task6: 回撤诊断分析 (最大回撤复盘、尾部风险)

输出报告:
- diagnosis_report.md: 完整策略诊断报告
- rebalancing_optimization_experiment.md: 调仓频率优化实验报告

实验结论:
- 发现调仓过于频繁 (405次/1549天)
- No-Trade Region方案可提升年化3%、夏普0.11
- 但改善幅度有限,信号质量是根本瓶颈
2026-06-06 15:00:28 +08:00

279 lines
9.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
统一入口:依次运行 6 个 Task收集输出合并生成最终诊断报告。
Usage:
python -m rotation.experiments.run_all
# 或
python rotation/experiments/run_all.py
"""
import io
import sys
import time
import json
from pathlib import Path
from contextlib import redirect_stdout
from datetime import datetime
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from rotation.experiments.common import (
OUTPUT_DIR, ensure_output_dir, load_metrics,
print_section,
)
def capture_task(task_func, task_name: str) -> dict:
"""运行单个 Task 并捕获输出和返回值"""
print(f"\n{'#'*60}")
print(f"# 运行 {task_name}")
print(f"{'#'*60}")
buf = io.StringIO()
start = time.time()
try:
with redirect_stdout(buf):
result = task_func()
elapsed = time.time() - start
output = buf.getvalue()
print(output) # 同时打印到终端
return {
'name': task_name,
'status': 'OK',
'elapsed': elapsed,
'output': output,
'result': result,
}
except Exception as e:
elapsed = time.time() - start
output = buf.getvalue()
print(output)
print(f" [ERROR] {task_name} 执行失败: {e}")
import traceback
traceback.print_exc()
return {
'name': task_name,
'status': 'ERROR',
'elapsed': elapsed,
'output': output,
'error': str(e),
'result': {},
}
def generate_report(task_outputs: list, metrics: dict) -> str:
"""生成合并诊断报告 (Markdown)"""
report_lines = []
report_lines.append("# ETF 轮动策略深度诊断报告")
report_lines.append(f"\n生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
report_lines.append(f"\n回测期间: {metrics.get('start_date', 'N/A')} ~ {metrics.get('end_date', 'N/A')}")
# 策略表现快照
report_lines.append("\n## 策略表现快照")
report_lines.append(f"\n| 指标 | 数值 |")
report_lines.append(f"|---|---|")
report_lines.append(f"| 累计收益 | {metrics.get('total_return', 0):+.2%} |")
report_lines.append(f"| 年化收益 | {metrics.get('annual_return', 0):+.2%} |")
report_lines.append(f"| 最大回撤 | {metrics.get('max_drawdown', 0):.2%} |")
report_lines.append(f"| 夏普比率 | {metrics.get('sharpe_ratio', 0):.2f} |")
report_lines.append(f"| Calmar 比率 | {metrics.get('calmar_ratio', 0):.2f} |")
report_lines.append(f"| 日胜率 | {metrics.get('win_rate', 0):.1%} |")
report_lines.append(f"| 调仓次数 | {metrics.get('rebalance_count', 0)} |")
# 各 Task 输出
task_titles = {
'Task 1': '信号产生问题诊断',
'Task 2': '收益计算问题诊断',
'Task 3': '调仓逻辑问题诊断',
'Task 4': '资金管理问题诊断',
'Task 5': '整体收益归因分析',
'Task 6': '回撤诊断',
}
for i, task_out in enumerate(task_outputs, 1):
title = task_titles.get(f'Task {i}', f'Task {i}')
report_lines.append(f"\n## Task {i}: {title}")
report_lines.append(f"\n状态: {task_out['status']} | 耗时: {task_out['elapsed']:.1f}s")
if task_out['status'] == 'ERROR':
report_lines.append(f"\n**执行失败**: {task_out.get('error', 'Unknown')}")
continue
# 提取关键结论
output = task_out['output']
result = task_out.get('result', {})
# 添加诊断结论
report_lines.append(f"\n### 诊断结论")
if f'Task {i}' in task_titles:
conclusions = extract_conclusions(i, result, output)
report_lines.extend(conclusions)
# 综合建议
report_lines.append("\n## 综合优化建议")
report_lines.append(generate_recommendations(task_outputs))
# 执行统计
report_lines.append("\n## 执行统计")
report_lines.append(f"\n| Task | 状态 | 耗时 |")
report_lines.append(f"|---|---|---|")
for i, task_out in enumerate(task_outputs, 1):
title = task_titles.get(f'Task {i}', f'Task {i}')
report_lines.append(f"| Task {i}: {title} | {task_out['status']} | {task_out['elapsed']:.1f}s |")
total_time = sum(t['elapsed'] for t in task_outputs)
report_lines.append(f"| **总计** | | **{total_time:.1f}s** |")
return '\n'.join(report_lines)
def extract_conclusions(task_num: int, result: dict, output: str) -> list:
"""从 Task 结果中提取关键结论"""
lines = []
if task_num == 1:
freq = result.get('frequency', {})
jitter = result.get('jitter', {})
threshold = result.get('threshold', {})
lines.append(f"- 调仓频率: 每 {freq.get('avg_interval', 0):.1f} 天一次,"
f"无效调仓率 {freq.get('invalid_rate', 0):.1f}%")
lines.append(f"- 短期抖动事件: {jitter.get('jitter_events', 0)}")
lines.append(f"- 债券持有占比: {threshold.get('bond_hold_pct', 0)*100:.1f}%")
elif task_num == 2:
first = result.get('first_day', {})
t1 = result.get('t1_bias', {})
lines.append(f"- 首日 NAV = {first.get('first_nav', 0):.6f},存在轻微逻辑瑕疵")
lines.append(f"- 极端日共 {t1.get('extreme_days', 0)}")
elif task_num == 3:
min_hold = result.get('min_hold', [])
if min_hold:
lines.append(f"- 最小持仓期模拟结果:")
for r in min_hold:
lines.append(f" - {r['min_hold']}天: 年化={r['annual_return']:+.2%}, "
f"回撤={r['max_drawdown']:.2%}, 夏普={r['sharpe']:.2f}")
elif task_num == 4:
lines.append("- 止损机制可减少极端回撤,但频繁止损可能拖累长期收益")
lines.append("- 高波动期减仓有助于控制回撤")
elif task_num == 5:
conc = result.get('concentration', {})
lines.append(f"- 收益依赖度: 最好5天贡献了 {conc.get('dependency_pct', 0):.1f}% 的最终净值")
elif task_num == 6:
max_dd = result.get('max_dd', {})
tail = result.get('tail', {})
lines.append(f"- 最大回撤 {max_dd.get('max_dd', 0):.2%} 发生在 {max_dd.get('trough_date', 'N/A')}")
lines.append(f"- CVaR(5%): {tail.get('cvar_5pct', 0):+.4%}")
lines.append(f"- 最大连续亏损: {tail.get('max_streak', 0)}")
return lines
def generate_recommendations(task_outputs: list) -> str:
"""生成综合优化建议"""
recs = """
### 优先级 P0预期影响最大
1. **降低调仓频率**
- 引入最小持仓期约束(建议 5 天起步)
- 在 `_generate_signals` 中加入 `min_hold_days` 检查
2. **启用溢价控制**
- 对 QDII ETFNDX/N225/GDAXI/HSI/HSTECH启用溢价过滤
- 建议 threshold=5%,避免高溢价买入
### 优先级 P1显著改善回撤
3. **组合级止损机制**
- 建议回撤 > 8% 时触发止损,转债券持有 10 天
4. **修复首日 NAV 逻辑**
- 首日 `current_holdings` 为空时不应计算收益
### 优先级 P2提升风险调整收益
5. **波动率加权配置**
- 替代等权,使用波动率倒数加权平衡风险贡献
6. **波动率适配仓位**
- 高波动期滚动20日波动率 > 20%)减仓至 2/3
### 优先级 P3进一步优化
7. **评估分组机制**
- 对比取消分组 vs 当前分组的收益差异
8. **优化动态阈值**
- 调整 bond_ratio测试不同阈值对防御效果的影响
"""
return recs
def main():
print("=" * 60)
print(" ETF 轮动策略深度诊断 - 统一入口")
print("=" * 60)
ensure_output_dir()
# 加载原始指标
metrics = load_metrics()
# 从 detail JSON 补充日期
from rotation.experiments.common import load_detail_meta
meta = load_detail_meta()
metrics['start_date'] = meta['start_date']
metrics['end_date'] = meta['end_date']
print(f"\n策略期间: {meta['start_date']} ~ {meta['end_date']}")
print(f"累计收益: {metrics['total_return']:+.2%}")
print(f"年化收益: {metrics['annual_return']:+.2%}")
print(f"最大回撤: {metrics['max_drawdown']:.2%}")
# 导入并运行各 Task
from rotation.experiments.task1_signal_analysis import main as task1_main
from rotation.experiments.task2_return_calc_analysis import main as task2_main
from rotation.experiments.task3_rebalance_analysis import main as task3_main
from rotation.experiments.task4_capital_mgmt_analysis import main as task4_main
from rotation.experiments.task5_return_attribution import main as task5_main
from rotation.experiments.task6_drawdown_analysis import main as task6_main
tasks = [
(task1_main, 'Task 1: 信号产生问题诊断'),
(task2_main, 'Task 2: 收益计算问题诊断'),
(task3_main, 'Task 3: 调仓逻辑问题诊断'),
(task4_main, 'Task 4: 资金管理问题诊断'),
(task5_main, 'Task 5: 整体收益归因分析'),
(task6_main, 'Task 6: 回撤诊断'),
]
task_outputs = []
for func, name in tasks:
result = capture_task(func, name)
task_outputs.append(result)
# 生成合并报告
print_section("生成诊断报告")
report = generate_report(task_outputs, metrics)
report_path = OUTPUT_DIR / 'diagnosis_report.md'
with open(report_path, 'w', encoding='utf-8') as f:
f.write(report)
print(f" + 诊断报告已保存: {report_path}")
# 输出摘要
print_section("执行完成")
ok_count = sum(1 for t in task_outputs if t['status'] == 'OK')
err_count = sum(1 for t in task_outputs if t['status'] == 'ERROR')
total_time = sum(t['elapsed'] for t in task_outputs)
print(f" 成功: {ok_count}/6, 失败: {err_count}/6")
print(f" 总耗时: {total_time:.1f}s")
print(f" 报告路径: {report_path}")
if __name__ == '__main__':
main()