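# Commit summary: add six-dimension strategy-diagnosis experiment scripts and reports.
#   - task1: signal generation analysis (rebalance frequency, ineffective-rebalance rate)
#   - task2: return calculation analysis (T+1 execution deviation, premium issues)
#   - task3: rebalancing logic analysis (minimum holding period simulation)
#   - task4: capital management analysis (stop-loss, volatility adaptation)
#   - task5: return attribution analysis (concentration, static vs. rotation)
#   - task6: drawdown diagnosis (maximum drawdown review, tail risk)
# Output reports:
#   - diagnosis_report.md: full strategy diagnosis report
#   - rebalancing_optimization_experiment.md: rebalance-frequency optimization experiment report
# Experiment conclusions:
#   - Rebalancing is too frequent (405 rebalances over 1549 days)
#   - A No-Trade Region scheme can improve annualized return by 3% and Sharpe by 0.11
#   - The improvement is limited, however; signal quality is the fundamental bottleneck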
295 lines
11 KiB
Python
295 lines
11 KiB
Python
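"""
Task 1: Signal-generation diagnostics.

Analysis dimensions:
1.1 Excessive rebalancing frequency - distribution of rebalance intervals and share of ineffective rebalances
1.2 Jitter detection - the same asset repeatedly entering and leaving around the threshold
1.3 Momentum factor evaluation - momentum score distribution and crash-filter trigger rate
1.4 Dynamic threshold effectiveness - bond fill frequency and returns while bonds are held
"""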
|
|
|
|
import ast
|
|
import sys
|
|
from pathlib import Path
|
|
from collections import Counter, defaultdict
|
|
from typing import Dict, List
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
|
|
|
from rotation.experiments.common import (
|
|
load_nav, load_signals, load_detail_days, load_detail_meta,
|
|
print_section, ensure_output_dir, compute_drawdown,
|
|
)
|
|
|
|
|
|
def analyze_rebalance_frequency(signals: pd.DataFrame, nav: pd.DataFrame,
                                trade_cost: float = 0.001):
    """Section 1.1: report how often the strategy rebalances and what it costs.

    Prints the number of rebalances, the distribution of gaps between them,
    the share of rebalances followed by a negative next-row return, the
    accumulated trading cost and a per-year breakdown.

    Args:
        signals: One row per trading day. Reads the boolean ``is_rebalance``
            column and the ``date`` column (accessed through ``.dt``, so it
            must be datetime-like).
        nav: Reads the ``daily_return`` and ``nav`` columns. Row ``i`` is
            treated as the same day as row ``i`` of ``signals``
            (assumption carried over from the original code - confirm
            against the loaders).
        trade_cost: Cost charged per rebalance, in NAV units.

    Returns:
        dict with ``n_rebal`` (int), ``avg_interval`` (rows per rebalance,
        ``inf`` when there is none) and ``invalid_rate`` (percent).
    """
    print_section("1.1 调仓频率分析")

    # Use row positions, not index labels: the positions are later used to
    # look up rows of `nav`, which is only valid positionally.
    rebalance_flags = signals['is_rebalance'].to_numpy(dtype=bool)
    rebal_pos = np.flatnonzero(rebalance_flags)
    n_rebal = int(rebal_pos.size)
    n_days = len(signals)
    avg_interval = n_days / n_rebal if n_rebal > 0 else float('inf')

    print(f" 总交易日: {n_days}")
    print(f" 调仓次数: {n_rebal}")
    print(f" 平均调仓间隔: {avg_interval:.1f} 天")

    # Distribution of the gaps (in rows) between consecutive rebalances.
    if n_rebal > 1:
        gaps = np.diff(rebal_pos)
        print(f" 最短间隔: {gaps.min()} 天")
        print(f" 最长间隔: {gaps.max()} 天")
        for p in (25, 50, 75, 90):
            print(f" P{p} 间隔: {np.percentile(gaps, p):.0f} 天")

    # "Ineffective" rebalance: the return on the row right after the
    # rebalance is negative. Rebalances with no following NAV row are
    # simply not counted as ineffective.
    daily_returns = nav['daily_return'].to_numpy(dtype=float)
    next_pos = rebal_pos + 1
    next_pos = next_pos[next_pos < len(daily_returns)]
    invalid_count = int(np.count_nonzero(daily_returns[next_pos] < 0))

    # Every rebalance pays the cost, including one on the final row.
    total_cost_drag = n_rebal * trade_cost

    invalid_rate = invalid_count / n_rebal * 100 if n_rebal > 0 else 0.0

    # Cost as a share of the total NAV change; undefined when the NAV did
    # not move (or there is no NAV data at all).
    total_gain = nav['nav'].iloc[-1] - nav['nav'].iloc[0] if len(nav) > 0 else 0
    cost_share = f"{total_cost_drag / total_gain * 100:.1f}%" if total_gain else "N/A"

    print(f"\n 无效调仓(T+1收益<0): {invalid_count}/{n_rebal} = {invalid_rate:.1f}%")
    # The per-trade rate is printed from `trade_cost` (in units of 1/10000)
    # so the label cannot drift from the value actually applied.
    print(f" 累计交易成本: {n_rebal} 次 x 万{trade_cost * 1e4:g} = {total_cost_drag:.4f} NAV 单位 "
          f"(约占总收益 {cost_share})")

    # Rebalance frequency per calendar year.
    print(f"\n 分年度调仓频率:")
    for year, flags in signals['is_rebalance'].groupby(signals['date'].dt.year):
        yr_rebal = flags.sum()
        yr_days = len(flags)
        if yr_rebal > 0:
            print(f" {year}: {yr_rebal} 次 / {yr_days} 天 = 每 {yr_days/yr_rebal:.1f} 天")
        else:
            print(f" {year}: 0 次")

    return {'n_rebal': n_rebal, 'avg_interval': avg_interval, 'invalid_rate': invalid_rate}
|
|
|
|
|
|
def analyze_jitter(signals: pd.DataFrame):
    """Section 1.2: detect assets that enter and leave the portfolio in quick succession.

    For every asset code found in the ``added`` / ``removed`` columns, prints
    how many times it entered, how many times it left, and how many times two
    consecutive events of opposite direction happened within 3 days.

    Args:
        signals: Reads the ``date``, ``added`` and ``removed`` columns.
            ``added`` / ``removed`` cells may be lists of codes or their
            string representation. Dates are subtracted and ``.days`` is
            read, so the gap is in calendar days (assumes datetime-like
            values - confirm against the loader).

    Returns:
        dict with ``jitter_events``: total number of short switches across
        all assets.
    """
    print_section("1.2 抖动检测")

    def _codes(cell):
        """Turn one added/removed cell into an iterable of asset codes."""
        if isinstance(cell, str):
            # Lists are stored as their literal text; a blank cell means "none".
            return ast.literal_eval(cell) if cell.strip() else []
        # Missing cells arrive as None or float NaN; treat them as "no change"
        # instead of failing when iterated.
        if cell is None or (isinstance(cell, float) and np.isnan(cell)):
            return []
        return cell

    # code -> chronological list of (date, 'IN' | 'OUT')
    asset_entries = defaultdict(list)
    for date, added, removed in zip(signals['date'], signals['added'], signals['removed']):
        for code in _codes(added):
            asset_entries[code].append((date, 'IN'))
        for code in _codes(removed):
            asset_entries[code].append((date, 'OUT'))

    print(" 各资产进出统计:")
    jitter_events = 0
    for code in sorted(asset_entries):
        events = asset_entries[code]
        action_counts = Counter(action for _, action in events)
        # A short switch is a pair of consecutive events with opposite
        # direction that are at most 3 days apart.
        short_switches = sum(
            1
            for (prev_date, prev_action), (cur_date, cur_action) in zip(events, events[1:])
            if (cur_date - prev_date).days <= 3 and cur_action != prev_action
        )
        jitter_events += short_switches
        print(f" {code}: 进入 {action_counts['IN']} 次, 退出 {action_counts['OUT']} 次, "
              f"短期抖动(<=3天) {short_switches} 次")

    print(f"\n 总短期抖动事件: {jitter_events}")
    return {'jitter_events': jitter_events}
|
|
|
|
|
|
def analyze_momentum_distribution(days: List[dict], bond_code: str = '931862.CSI'):
    """Section 1.3: summarise momentum scores and how often the top score is not held.

    Prints per-asset statistics of the ``momentum`` values, the share of
    values that are exactly 0 (reported as crash-filter triggers), and the
    share of days on which the non-bond asset with the highest momentum is
    not among that day's holdings.

    Args:
        days: Per-day records. Each record may carry ``assets`` (mapping of
            code -> dict with an optional ``momentum``) and ``holdings``
            (collection of codes).
        bond_code: Code excluded from the "top momentum" ranking.

    Returns:
        dict with ``crash_filter_rate``: fraction of momentum values equal
        to 0, or 0 when there are no momentum values.
    """
    print_section("1.3 动量因子分布")

    # Collect every non-missing momentum score per asset and count exact zeros.
    momentum_by_code = defaultdict(list)
    crash_filter_count = 0
    for day in days:
        for code, asset in day.get('assets', {}).items():
            score = asset.get('momentum')
            if score is None:
                continue
            momentum_by_code[code].append(score)
            if score == 0.0:
                crash_filter_count += 1

    print(" 各资产动量得分统计:")
    for code in sorted(momentum_by_code):
        arr = np.array(momentum_by_code[code])
        print(f" {code}: 均值={arr.mean():.4f}, 中位数={np.median(arr):.4f}, "
              f"std={arr.std():.4f}, min={arr.min():.4f}, max={arr.max():.4f}")

    total_momentum_values = sum(len(v) for v in momentum_by_code.values())
    # Guard the ratio once so both the printed figure and the returned value
    # stay defined when no momentum data is available.
    crash_filter_rate = (
        crash_filter_count / total_momentum_values if total_momentum_values > 0 else 0
    )
    print(f"\n 崩盘过滤器(momentum=0)触发次数: {crash_filter_count}/{total_momentum_values} "
          f"= {crash_filter_rate*100:.1f}%")

    # Days on which the highest-momentum non-bond asset is not in the holdings.
    top1_not_selected = 0
    total_days_with_factors = 0
    for day in days:
        holdings = set(day.get('holdings') or [])
        scores = {
            code: asset['momentum']
            for code, asset in day.get('assets', {}).items()
            if code != bond_code and asset.get('momentum') is not None
        }
        if not scores:
            continue
        total_days_with_factors += 1
        top1_code = max(scores, key=scores.get)
        if top1_code not in holdings:
            top1_not_selected += 1

    top1_miss_pct = (
        top1_not_selected / total_days_with_factors * 100
        if total_days_with_factors > 0 else 0.0
    )
    print(f" 动量 Top1 但未被选中的天数: {top1_not_selected}/{total_days_with_factors} "
          f"= {top1_miss_pct:.1f}%")

    return {'crash_filter_rate': crash_filter_rate}
|
|
|
|
|
|
def analyze_dynamic_threshold(days: List[dict], signals: pd.DataFrame, nav: pd.DataFrame,
                              bond_code: str = '931862.CSI'):
    """Section 1.4: report how often the bond asset is held and how returns look then.

    Prints the share of days the bond is held, the number of days it is held
    while its momentum is below its threshold, daily-return statistics split
    by whether the bond is held, the lengths of consecutive bond-holding
    streaks, and the distribution of positive threshold values.

    Args:
        days: Per-day records with optional ``holdings`` (collection of
            codes) and ``assets`` (code -> dict with optional ``momentum``
            and ``threshold``).
        signals: Reads the ``holdings`` column (list of codes or its string
            representation).
        nav: Reads the ``daily_return`` column. Row ``i`` is paired with row
            ``i`` of ``signals`` (assumption carried over from the original
            code - confirm against the loaders).
        bond_code: Code of the bond asset being analysed.

    Returns:
        dict with ``bond_hold_pct`` (fraction of days holding the bond, 0
        when ``days`` is empty) and ``bond_fills`` (count of days the bond
        is held with momentum below threshold).
    """
    print_section("1.4 动态阈值有效性分析")

    def _holdings(cell):
        """Turn one holdings cell into an iterable of asset codes."""
        if isinstance(cell, str):
            return ast.literal_eval(cell) if cell.strip() else []
        # Missing cells arrive as None or float NaN; treat them as empty.
        if cell is None or (isinstance(cell, float) and np.isnan(cell)):
            return []
        return cell

    # One pass over the day records gathers holding days, fills and streaks.
    total_days = len(days)
    bond_holding_days = 0
    bond_fills = 0  # bond held although its momentum is below its threshold
    bond_streaks = []
    current_streak = 0
    for day in days:
        if bond_code in (day.get('holdings') or []):
            bond_holding_days += 1
            current_streak += 1
            bond_asset = day.get('assets', {}).get(bond_code, {})
            momentum = bond_asset.get('momentum')
            if momentum is not None:
                # A threshold that is absent or None is compared as 0.
                threshold = bond_asset.get('threshold')
                if momentum < (0 if threshold is None else threshold):
                    bond_fills += 1
        elif current_streak > 0:
            bond_streaks.append(current_streak)
            current_streak = 0
    if current_streak > 0:
        bond_streaks.append(current_streak)

    bond_hold_pct = bond_holding_days / total_days if total_days > 0 else 0.0
    print(f" 债券({bond_code})持有天数: {bond_holding_days}/{total_days} "
          f"= {bond_hold_pct*100:.1f}%")
    print(f" 债券填充(动量<阈值)次数: {bond_fills}")

    # Split daily returns by whether the bond is in that row's holdings.
    # zip() pairs rows by position and stops at the shorter input, so a
    # signal row without a NAV row is skipped rather than counted as 0.
    bond_hold_rets = []
    no_bond_rets = []
    for cell, ret in zip(signals['holdings'], nav['daily_return']):
        if bond_code in _holdings(cell):
            bond_hold_rets.append(ret)
        else:
            no_bond_rets.append(ret)

    if bond_hold_rets:
        print(f"\n 持有债券期间日收益: 均值={np.mean(bond_hold_rets):.6f}, "
              f"std={np.std(bond_hold_rets):.6f}, 天数={len(bond_hold_rets)}")
    if no_bond_rets:
        print(f" 不持债券期间日收益: 均值={np.mean(no_bond_rets):.6f}, "
              f"std={np.std(no_bond_rets):.6f}, 天数={len(no_bond_rets)}")

    # NOTE(review): this heading announces a T+N return analysis, but no such
    # computation exists; what follows are the consecutive-holding streaks.
    print(f"\n 债券填充后 T+N 收益分析:")

    if bond_streaks:
        print(f" 连续持有债券段数: {len(bond_streaks)}")
        print(f" 平均连续持有天数: {np.mean(bond_streaks):.1f}")
        print(f" 最长连续持有天数: {max(bond_streaks)}")
        print(f" 最短连续持有天数: {min(bond_streaks)}")

    # Distribution of the bond's threshold, keeping only positive values.
    raw_thresholds = (
        day.get('assets', {}).get(bond_code, {}).get('threshold', 0) for day in days
    )
    thresholds = [t for t in raw_thresholds if t is not None and t > 0]
    if thresholds:
        print(f"\n 动态阈值(短债动量)分布:")
        print(f" 均值: {np.mean(thresholds):.6f}")
        print(f" 中位数: {np.median(thresholds):.6f}")
        print(f" 最小: {np.min(thresholds):.6f}")
        print(f" 最大: {np.max(thresholds):.6f}")

    return {
        'bond_hold_pct': bond_hold_pct,
        'bond_fills': bond_fills,
    }
|
|
|
|
|
|
def main():
    """Run the four Task 1 diagnostics in order and print a summary.

    Loads NAV, signals, per-day detail records and run metadata through the
    project loaders, runs sections 1.1-1.4, prints the headline figures and
    returns the per-section result dicts keyed by ``frequency``, ``jitter``,
    ``momentum`` and ``threshold``.
    """
    print_section("Task 1: 信号产生问题诊断")

    # Loader calls keep their original order.
    nav_df = load_nav()
    signal_df = load_signals()
    day_records = load_detail_days()
    run_meta = load_detail_meta()

    print(f" 数据期间: {run_meta['start_date']} ~ {run_meta['end_date']}")
    print(f" 动量窗口: {run_meta['n_days']} 天")
    print(f" 选择数量: {run_meta['select_num']}")

    # Dict entries are evaluated top to bottom, so the sections run (and
    # print) in the order 1.1, 1.2, 1.3, 1.4.
    results = {
        'frequency': analyze_rebalance_frequency(signal_df, nav_df),
        'jitter': analyze_jitter(signal_df),
        'momentum': analyze_momentum_distribution(day_records),
        'threshold': analyze_dynamic_threshold(day_records, signal_df, nav_df),
    }

    freq = results['frequency']
    # 0.001 is the same per-rebalance cost used in section 1.1.
    cost_erosion_pct = freq['n_rebal'] * 0.001 * 100

    print_section("Task 1 总结")
    print(f" 1. 调仓频率: 每 {freq['avg_interval']:.1f} 天调仓一次,")
    print(f" 无效调仓率 {freq['invalid_rate']:.1f}%,交易成本侵蚀约 "
          f"{cost_erosion_pct:.1f}%")
    print(f" 2. 短期抖动事件: {results['jitter']['jitter_events']} 次")
    print(f" 3. 崩盘过滤器触发率: {results['momentum']['crash_filter_rate']*100:.1f}%")
    print(f" 4. 债券持有占比: {results['threshold']['bond_hold_pct']*100:.1f}%")

    return results
|
|
|
|
|
|
# Script entry point: run the Task 1 diagnosis only when executed directly.
if __name__ == "__main__":
    main()
|