Files
etf/rotation/experiments/task4_capital_mgmt_analysis.py
aszerW 04b858ff09 feat: 添加ETF轮动策略诊断分析实验
新增6维度策略诊断实验脚本和报告:
- task1: 信号产生分析 (调仓频率、无效调仓率)
- task2: 收益计算分析 (T+1执行偏差、溢价问题)
- task3: 调仓逻辑分析 (最小持仓期模拟)
- task4: 资金管理分析 (止损、波动率适配)
- task5: 收益归因分析 (集中度、静态vs轮动)
- task6: 回撤诊断分析 (最大回撤复盘、尾部风险)

输出报告:
- diagnosis_report.md: 完整策略诊断报告
- rebalancing_optimization_experiment.md: 调仓频率优化实验报告

实验结论:
- 发现调仓过于频繁 (405次/1549天)
- No-Trade Region方案可提升年化3%、夏普0.11
- 但改善幅度有限,信号质量是根本瓶颈
2026-06-06 15:00:28 +08:00

322 lines
11 KiB
Python

"""
Task 4: 资金管理问题诊断
分析维度:
4.1 止损机制模拟 - 组合级止损 vs 单资产止损
4.2 波动率适配 - 基于组合波动率的动态仓位
4.3 现金管理评估 - 全仓 vs 债券填充 vs 空仓
"""
import sys
from pathlib import Path
from collections import defaultdict
from typing import Dict, List
import numpy as np
import pandas as pd
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from rotation.experiments.common import (
load_nav, load_signals, load_detail_days, load_detail_meta,
print_section, compute_drawdown, compute_sharpe,
compute_annual_return,
)
def simulate_portfolio_stoploss(nav_df: pd.DataFrame, stop_threshold: float) -> dict:
"""模拟组合级止损:当组合从峰值回撤超过 stop_threshold 时,
假设转为持有债券(短债收益近似 0.02/252 每天)。
简化模型:止损触发后持有债券 10 天,然后恢复正常交易。
"""
nav = nav_df['nav'].values.copy()
returns = nav_df['daily_return'].values.copy()
bond_daily_ret = 0.02 / 252 # 短债近似日收益
simulated_nav = 1.0
sim_returns = []
peak = 1.0
in_stoploss = False
stoploss_days_remaining = 0
stoploss_triggers = 0
for i in range(len(nav)):
if in_stoploss:
# 止损期间持有债券
ret = bond_daily_ret
stoploss_days_remaining -= 1
if stoploss_days_remaining <= 0:
in_stoploss = False
else:
ret = returns[i]
simulated_nav *= (1 + ret)
peak = max(peak, simulated_nav)
dd = (simulated_nav - peak) / peak
if dd < -stop_threshold:
in_stoploss = True
stoploss_days_remaining = 10 # 止损后持有债券 10 天
stoploss_triggers += 1
ret = bond_daily_ret # 当天就转债券
simulated_nav *= (1 + ret) if not in_stoploss or stoploss_days_remaining < 10 else 1
# 修正:简化处理
if in_stoploss:
sim_returns.append(bond_daily_ret)
else:
sim_returns.append(returns[i])
# 重新计算 NAV
sim_nav = pd.Series(sim_returns).add(1).cumprod()
total_ret = sim_nav.iloc[-1] - 1
n = len(sim_nav)
annual_ret = (1 + total_ret) ** (252 / n) - 1
max_dd = compute_drawdown(sim_nav).min()
sharpe = compute_sharpe(pd.Series(sim_returns))
return {
'stop_threshold': stop_threshold,
'total_return': total_ret,
'annual_return': annual_ret,
'max_drawdown': max_dd,
'sharpe': sharpe,
'trigger_count': stoploss_triggers,
}
def simulate_asset_stoploss(days: List[dict], stop_pct: float) -> dict:
"""模拟单资产止损:当持仓资产从入场价回撤超过 stop_pct 时,强制卖出。
简化模型:卖出后该仓位转为债券 5 天。
"""
trade_cost = 0.001
bond_daily_ret = 0.02 / 252
# 追踪每个持仓的入场价格
entry_prices = {} # code -> entry_cum_return_etf at entry
nav = 1.0
stoploss_events = 0
returns = []
for day in days:
daily_return = day.get('daily_return', 0)
holdings = day.get('holdings', [])
# 检查是否有资产触发止损
triggered = []
for code, asset in day.get('assets', {}).items():
if asset.get('is_held') and asset.get('cum_return_etf') is not None:
cum_ret = asset['cum_return_etf']
if cum_ret < -stop_pct:
triggered.append(code)
if triggered:
stoploss_events += len(triggered)
# 简化:被止损的仓位按债券收益计算,其余按原收益
n_held = len(holdings) if holdings else 1
weight = 1.0 / n_held
adjusted_return = daily_return + weight * (bond_daily_ret - daily_return * weight)
nav *= (1 + adjusted_return)
returns.append(adjusted_return)
else:
nav *= (1 + daily_return)
returns.append(daily_return)
sim_nav = pd.Series(returns).add(1).cumprod()
total_ret = sim_nav.iloc[-1] - 1
n = len(returns)
annual_ret = (1 + total_ret) ** (252 / n) - 1 if n > 0 else 0
max_dd = compute_drawdown(sim_nav).min()
sharpe = compute_sharpe(pd.Series(returns))
return {
'stop_pct': stop_pct,
'total_return': total_ret,
'annual_return': annual_ret,
'max_drawdown': max_dd,
'sharpe': sharpe,
'stoploss_events': stoploss_events,
}
def analyze_stoploss(nav: pd.DataFrame, days: List[dict]):
"""4.1 止损机制模拟"""
print_section("4.1 组合级止损模拟")
# 原始策略作为基准
orig_nav = nav['nav']
orig_total = orig_nav.iloc[-1] / orig_nav.iloc[0] - 1
orig_dd = compute_drawdown(orig_nav).min()
orig_sharpe = compute_sharpe(nav['daily_return'])
print(f" 原始策略: 累计={orig_total:+.2%}, 最大回撤={orig_dd:.2%}, 夏普={orig_sharpe:.2f}")
for threshold in [0.05, 0.08, 0.10, 0.12]:
r = simulate_portfolio_stoploss(nav, threshold)
print(f" 组合止损线={threshold:.0%}: 累计={r['total_return']:+.2%}, "
f"回撤={r['max_drawdown']:.2%}, 夏普={r['sharpe']:.2f}, "
f"触发{r['trigger_count']}")
print_section("4.1a 单资产止损模拟")
for stop_pct in [0.05, 0.08, 0.10, 0.15]:
r = simulate_asset_stoploss(days, stop_pct)
print(f" 单资产止损线={stop_pct:.0%}: 累计={r['total_return']:+.2%}, "
f"回撤={r['max_drawdown']:.2%}, 夏普={r['sharpe']:.2f}, "
f"触发{r['stoploss_events']}")
return {}
def analyze_volatility_sizing(days: List[dict], nav: pd.DataFrame):
"""4.2 波动率适配分析"""
print_section("4.2 波动率适配分析")
# 计算滚动 20 日波动率
nav_df = nav.copy()
nav_df['rolling_vol'] = nav_df['daily_return'].rolling(20).std() * np.sqrt(252)
# 按波动率分桶统计收益
print(" 组合波动率分桶统计:")
valid = nav_df.dropna(subset=['rolling_vol'])
buckets = [(0, 0.10), (0.10, 0.15), (0.15, 0.20), (0.20, 0.30), (0.30, 1.0)]
for lo, hi in buckets:
mask = (valid['rolling_vol'] >= lo) & (valid['rolling_vol'] < hi)
subset = valid[mask]
if len(subset) == 0:
continue
avg_ret = subset['daily_return'].mean() * 252
avg_vol = subset['rolling_vol'].mean()
win_rate = (subset['daily_return'] > 0).mean()
print(f" 波动率 [{lo:.0%}, {hi:.0%}): {len(subset)} 天, "
f"年化收益={avg_ret:+.2%}, 胜率={win_rate:.1%}")
# 模拟:高波动期减仓
print(f"\n 模拟: 波动率 > 20% 时仓位减至 2/3:")
sim_returns = []
for _, row in valid.iterrows():
ret = row['daily_return']
vol = row['rolling_vol']
if vol > 0.20:
ret = ret * 2 / 3 # 减仓至 2/3
sim_returns.append(ret)
sim_nav = pd.Series(sim_returns).add(1).cumprod()
total_ret = sim_nav.iloc[-1] - 1
n = len(sim_returns)
annual_ret = (1 + total_ret) ** (252 / n) - 1
max_dd = compute_drawdown(sim_nav).min()
sharpe = compute_sharpe(pd.Series(sim_returns))
orig_total = nav_df['nav'].iloc[-1] / nav_df['nav'].iloc[0] - 1
print(f" 原始: 累计={orig_total:+.2%}")
print(f" 波动率适配: 累计={total_ret:+.2%}, 回撤={max_dd:.2%}, 夏普={sharpe:.2f}")
# 高波动期出现频率
high_vol_days = (valid['rolling_vol'] > 0.20).sum()
print(f"\n 高波动期(>20%): {high_vol_days}/{len(valid)} 天 ({high_vol_days/len(valid)*100:.1f}%)")
return {}
def analyze_cash_management(days: List[dict]):
"""4.3 现金管理评估"""
print_section("4.3 现金管理评估")
# 统计所有资产动量都低于阈值的天数(全部防御)
all_below = 0
partial_below = 0
total_days = 0
for day in days:
total_days += 1
assets = day.get('assets', {})
holdings = day.get('holdings', [])
non_bond_assets = {c: a for c, a in assets.items()
if c != '931862.CSI' and a.get('momentum') is not None}
if not non_bond_assets:
continue
below_count = sum(1 for a in non_bond_assets.values()
if not a.get('above_threshold', False))
if below_count == len(non_bond_assets):
all_below += 1
elif below_count > 0:
partial_below += 1
print(f" 全部非债券资产动量低于阈值: {all_below} 天 ({all_below/total_days*100:.1f}%)")
print(f" 部分非债券资产动量低于阈值: {partial_below} 天 ({partial_below/total_days*100:.1f}%)")
print(f" 所有资产动量都高于阈值: {total_days - all_below - partial_below}")
# 全部低于阈值时的后续收益
print(f"\n 全部低于阈值后的 T+N 收益:")
all_below_dates = []
for day in days:
assets = day.get('assets', {})
non_bond = {c: a for c, a in assets.items()
if c != '931862.CSI' and a.get('momentum') is not None}
if non_bond and all(not a.get('above_threshold', False) for a in non_bond.values()):
all_below_dates.append(day['date'])
# 计算全部低于阈值后 5/10/20 天的策略收益
nav_df = pd.DataFrame({'date': [d['date'] for d in days],
'daily_return': [d['daily_return'] for d in days]})
nav_df['date'] = pd.to_datetime(nav_df['date'])
for forward in [5, 10, 20]:
rets_after = []
for d in all_below_dates:
d_ts = pd.Timestamp(d)
mask = (nav_df['date'] > d_ts)
future = nav_df[mask].head(forward)
if len(future) == forward:
cum_ret = (1 + future['daily_return']).prod() - 1
rets_after.append(cum_ret)
if rets_after:
avg = np.mean(rets_after)
pos_rate = sum(1 for r in rets_after if r > 0) / len(rets_after)
print(f" T+{forward}: 均值={avg:+.4%}, 正收益占比={pos_rate:.1%}, 样本={len(rets_after)}")
# 持仓数量分布
print(f"\n 持仓数量分布:")
holding_counts = defaultdict(int)
for day in days:
n = len(day.get('holdings', []))
holding_counts[n] += 1
for n in sorted(holding_counts.keys()):
print(f" 持有 {n} 只资产: {holding_counts[n]} 天 ({holding_counts[n]/total_days*100:.1f}%)")
return {}
def main():
print_section("Task 4: 资金管理问题诊断")
nav = load_nav()
days = load_detail_days()
meta = load_detail_meta()
print(f" 数据期间: {meta['start_date']} ~ {meta['end_date']}")
results = {}
# 4.1 止损
results['stoploss'] = analyze_stoploss(nav, days)
# 4.2 波动率适配
results['vol_sizing'] = analyze_volatility_sizing(days, nav)
# 4.3 现金管理
results['cash'] = analyze_cash_management(days)
print_section("Task 4 总结")
print(" 1. 止损机制可减少极端回撤,但频繁止损可能拖累收益")
print(" 2. 高波动期减仓有助于控制回撤,但需要平衡收益损失")
print(" 3. 全部资产低于阈值时强制防御,后续短期收益偏弱")
return results
if __name__ == '__main__':
main()