"""Task 1: diagnose problems in signal generation.

Analysis dimensions:
    1.1 Excessive rebalancing - distribution of rebalance intervals and the
        share of ineffective rebalances.
    1.2 Jitter detection - the same asset repeatedly entering and leaving
        the portfolio within a few days.
    1.3 Momentum factor evaluation - momentum score distribution and the
        crash-filter trigger rate.
    1.4 Dynamic threshold effectiveness - how often the bond fills the
        portfolio and how returns look while the bond is held.
"""
import ast
import sys
from pathlib import Path
from collections import Counter, defaultdict
from itertools import groupby
from typing import Dict, List

import numpy as np
import pandas as pd

sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from rotation.experiments.common import (
    load_nav, load_signals, load_detail_days, load_detail_meta,
    print_section, ensure_output_dir, compute_drawdown,
)

# Cost charged per rebalance, used for every cost figure in the report.
# NOTE(review): the printed labels say "万1" (0.0001) while this value is
# 0.001 - confirm which one is intended before trusting the cost numbers.
TRADE_COST = 0.001

# Code of the bond asset that is excluded from the momentum ranking and
# inspected by the dynamic-threshold analysis.
BOND_CODE = '931862.CSI'


def _parse_codes(value):
    """Return the collection of codes stored in a raw ``signals`` cell.

    String cells are parsed with ``ast.literal_eval`` (safe for literals
    only); ``None``/NaN cells are treated as an empty list; anything else is
    returned unchanged.
    """
    if isinstance(value, str):
        return ast.literal_eval(value)
    if value is None or (isinstance(value, float) and np.isnan(value)):
        return []
    return value


def _pct(numerator, denominator):
    """Return ``numerator / denominator`` as a percentage, 0.0 if empty."""
    return numerator / denominator * 100 if denominator else 0.0


def analyze_rebalance_frequency(signals: pd.DataFrame, nav: pd.DataFrame,
                                trade_cost: float = TRADE_COST) -> dict:
    """1.1 Rebalance frequency analysis.

    Args:
        signals: One row per trading day with a boolean ``is_rebalance``
            column and a datetime ``date`` column.
        nav: Rows with ``daily_return`` and ``nav`` columns; row ``k`` is
            matched to row ``k`` of ``signals`` by position.
        trade_cost: Cost charged per rebalance.

    Returns:
        Dict with ``n_rebal``, ``avg_interval``, ``invalid_rate`` (percent)
        and ``cost_drag`` (``n_rebal * trade_cost``).
    """
    print_section("1.1 调仓频率分析")

    # Positional offsets of rebalance days, so the gaps and the nav lookups
    # below stay correct even if the frame does not carry a default index.
    rebal_pos = np.flatnonzero(
        signals['is_rebalance'].to_numpy(dtype=bool)
    ).tolist()
    n_rebal = len(rebal_pos)
    n_days = len(signals)
    avg_interval = n_days / n_rebal if n_rebal > 0 else float('inf')

    print(f" 总交易日: {n_days}")
    print(f" 调仓次数: {n_rebal}")
    print(f" 平均调仓间隔: {avg_interval:.1f} 天")

    # Distribution of the gaps between consecutive rebalances.
    if n_rebal > 1:
        gaps = [later - earlier
                for earlier, later in zip(rebal_pos, rebal_pos[1:])]
        print(f" 最短间隔: {min(gaps)} 天")
        print(f" 最长间隔: {max(gaps)} 天")
        for p in (25, 50, 75, 90):
            print(f" P{p} 间隔: {np.percentile(gaps, p):.0f} 天")

    # "Ineffective" rebalance: the T+1 return after the rebalance is negative.
    daily_ret = nav['daily_return'].to_numpy()
    invalid_count = sum(
        1 for pos in rebal_pos
        if pos + 1 < len(daily_ret) and daily_ret[pos + 1] < 0
    )
    invalid_rate = _pct(invalid_count, n_rebal)

    # Every rebalance is charged the same cost.
    total_cost_drag = n_rebal * trade_cost

    # Share of the total NAV change; undefined for an empty or flat series.
    total_gain = nav['nav'].iloc[-1] - nav['nav'].iloc[0] if len(nav) else 0.0
    cost_share = (total_cost_drag / total_gain * 100
                  if total_gain != 0 else float('nan'))

    print(f"\n 无效调仓(T+1收益<0): {invalid_count}/{n_rebal} = {invalid_rate:.1f}%")
    print(f" 累计交易成本: {n_rebal} 次 x 万1 = {total_cost_drag:.4f} NAV 单位 "
          f"(约占总收益 {cost_share:.1f}%)")

    # Rebalance frequency per calendar year.
    print("\n 分年度调仓频率:")
    for year, grp in signals.groupby(signals['date'].dt.year):
        yr_rebal = grp['is_rebalance'].sum()
        yr_days = len(grp)
        if yr_rebal > 0:
            print(f" {year}: {yr_rebal} 次 / {yr_days} 天 = 每 {yr_days/yr_rebal:.1f} 天")
        else:
            print(f" {year}: 0 次")

    return {
        'n_rebal': n_rebal,
        'avg_interval': avg_interval,
        'invalid_rate': invalid_rate,
        'cost_drag': total_cost_drag,
    }


def analyze_jitter(signals: pd.DataFrame) -> dict:
    """1.2 Jitter detection: an asset entering and leaving within days.

    Args:
        signals: Rows with a datetime ``date`` column plus ``added`` and
            ``removed`` columns holding lists of codes (or their string
            representation).

    Returns:
        Dict with ``jitter_events``, the number of opposite-direction events
        (IN followed by OUT or vice versa) on the same asset that are at most
        3 calendar days apart.
    """
    print_section("1.2 抖动检测")

    # code -> chronological list of (date, action); relies on ``signals``
    # being ordered by date.
    asset_entries = defaultdict(list)
    for _, row in signals.iterrows():
        date = row['date']
        for code in _parse_codes(row['added']):
            asset_entries[code].append((date, 'IN'))
        for code in _parse_codes(row['removed']):
            asset_entries[code].append((date, 'OUT'))

    print(" 各资产进出统计:")
    jitter_events = 0
    for code in sorted(asset_entries):
        events = asset_entries[code]
        n_in = sum(1 for _, action in events if action == 'IN')
        n_out = sum(1 for _, action in events if action == 'OUT')

        # Short-term jitter: consecutive events of opposite direction that
        # are no more than 3 days apart.
        short_switches = sum(
            1
            for (prev_date, prev_action), (curr_date, curr_action)
            in zip(events, events[1:])
            if (curr_date - prev_date).days <= 3 and curr_action != prev_action
        )
        jitter_events += short_switches

        print(f" {code}: 进入 {n_in} 次, 退出 {n_out} 次, 短期抖动(<=3天) {short_switches} 次")

    print(f"\n 总短期抖动事件: {jitter_events}")
    return {'jitter_events': jitter_events}


def analyze_momentum_distribution(days: List[dict],
                                  bond_code: str = BOND_CODE) -> dict:
    """1.3 Momentum factor distribution analysis.

    Args:
        days: Per-day dicts; each may carry ``assets`` (code -> dict with an
            optional ``momentum`` value) and ``holdings`` (list of codes).
        bond_code: Asset excluded from the "top-1 momentum" ranking.

    Returns:
        Dict with ``crash_filter_rate``: the fraction of non-null momentum
        values that are exactly 0 (0 when there are no values).
    """
    print_section("1.3 动量因子分布")

    # Collect every non-null momentum score per asset; a score of exactly 0
    # is counted as a crash-filter trigger.
    momentum_by_code = defaultdict(list)
    crash_filter_count = 0
    for day in days:
        for code, asset in day.get('assets', {}).items():
            score = asset.get('momentum')
            if score is None:
                continue
            momentum_by_code[code].append(score)
            if score == 0.0:
                crash_filter_count += 1

    print(" 各资产动量得分统计:")
    for code in sorted(momentum_by_code):
        arr = np.array(momentum_by_code[code])
        print(f" {code}: 均值={arr.mean():.4f}, 中位数={np.median(arr):.4f}, "
              f"std={arr.std():.4f}, min={arr.min():.4f}, max={arr.max():.4f}")

    total_momentum_values = sum(len(v) for v in momentum_by_code.values())
    print(f"\n 崩盘过滤器(momentum=0)触发次数: {crash_filter_count}/{total_momentum_values} "
          f"= {_pct(crash_filter_count, total_momentum_values):.1f}%")

    # Days on which the asset with the highest momentum was not held.
    top1_not_selected = 0
    total_days_with_factors = 0
    for day in days:
        holdings = set(day.get('holdings', []))
        valid_assets = {
            code: asset
            for code, asset in day.get('assets', {}).items()
            if asset.get('momentum') is not None and code != bond_code
        }
        if not valid_assets:
            continue
        total_days_with_factors += 1
        top1_code = max(valid_assets, key=lambda c: valid_assets[c]['momentum'])
        if top1_code not in holdings:
            top1_not_selected += 1

    print(f" 动量 Top1 但未被选中的天数: {top1_not_selected}/{total_days_with_factors} "
          f"= {_pct(top1_not_selected, total_days_with_factors):.1f}%")

    return {
        'crash_filter_rate': (crash_filter_count / total_momentum_values
                              if total_momentum_values > 0 else 0),
    }


def analyze_dynamic_threshold(days: List[dict], signals: pd.DataFrame,
                              nav: pd.DataFrame,
                              bond_code: str = BOND_CODE) -> dict:
    """1.4 Dynamic threshold effectiveness analysis.

    Args:
        days: Per-day dicts with ``holdings`` and ``assets``; the bond's
            asset dict may carry ``momentum`` and ``threshold``.
        signals: Rows with a ``holdings`` column (list or its string form).
        nav: Rows with a ``daily_return`` column; row ``k`` is matched to
            row ``k`` of ``signals`` by position.
        bond_code: Code of the bond asset being analysed.

    Returns:
        Dict with ``bond_hold_pct`` (fraction of days holding the bond, 0.0
        for empty ``days``) and ``bond_fills`` (days where the bond was held
        although its momentum was below its threshold).
    """
    print_section("1.4 动态阈值有效性分析")

    total_days = len(days)
    bond_held_flags = [bond_code in day.get('holdings', []) for day in days]
    bond_holding_days = sum(bond_held_flags)

    # A "fill" is a day where the bond is held even though its momentum is
    # below the threshold, i.e. it was not selected on its own merit.
    bond_fills = 0
    for day, held in zip(days, bond_held_flags):
        if not held:
            continue
        bond_asset = day.get('assets', {}).get(bond_code, {})
        momentum = bond_asset.get('momentum')
        if momentum is None:
            continue
        threshold = bond_asset.get('threshold', 0)
        if threshold is None:
            # An explicit null threshold is treated like a missing one.
            threshold = 0
        if momentum < threshold:
            bond_fills += 1

    print(f" 债券({bond_code})持有天数: {bond_holding_days}/{total_days} "
          f"= {_pct(bond_holding_days, total_days):.1f}%")
    print(f" 债券填充(动量<阈值)次数: {bond_fills}")

    # Daily returns split by whether the bond is among the holdings; days
    # beyond the end of ``nav`` contribute a return of 0.
    daily_ret = nav['daily_return'].to_numpy()
    bond_hold_rets = []
    no_bond_rets = []
    for pos, raw_holdings in enumerate(signals['holdings']):
        ret = daily_ret[pos] if pos < len(daily_ret) else 0
        if bond_code in _parse_codes(raw_holdings):
            bond_hold_rets.append(ret)
        else:
            no_bond_rets.append(ret)

    if bond_hold_rets:
        print(f"\n 持有债券期间日收益: 均值={np.mean(bond_hold_rets):.6f}, "
              f"std={np.std(bond_hold_rets):.6f}, 天数={len(bond_hold_rets)}")
    if no_bond_rets:
        print(f" 不持债券期间日收益: 均值={np.mean(no_bond_rets):.6f}, "
              f"std={np.std(no_bond_rets):.6f}, 天数={len(no_bond_rets)}")

    # Only the header is printed here: the original per-row loop computed
    # nothing, so it was removed. Streak statistics follow instead.
    print("\n 债券填充后 T+N 收益分析:")

    # Lengths of the consecutive runs of days on which the bond is held.
    bond_streaks = [
        sum(1 for _ in run)
        for held, run in groupby(bond_held_flags)
        if held
    ]
    if bond_streaks:
        print(f" 连续持有债券段数: {len(bond_streaks)}")
        print(f" 平均连续持有天数: {np.mean(bond_streaks):.1f}")
        print(f" 最长连续持有天数: {max(bond_streaks)}")
        print(f" 最短连续持有天数: {min(bond_streaks)}")

    # Distribution of the strictly positive, non-null thresholds.
    thresholds = [
        day.get('assets', {}).get(bond_code, {}).get('threshold', 0)
        for day in days
    ]
    thresholds = [t for t in thresholds if t is not None and t > 0]
    if thresholds:
        print("\n 动态阈值(短债动量)分布:")
        print(f" 均值: {np.mean(thresholds):.6f}")
        print(f" 中位数: {np.median(thresholds):.6f}")
        print(f" 最小: {np.min(thresholds):.6f}")
        print(f" 最大: {np.max(thresholds):.6f}")

    return {
        'bond_hold_pct': bond_holding_days / total_days if total_days else 0.0,
        'bond_fills': bond_fills,
    }


def main():
    """Run all four diagnostics, print a summary and return the results."""
    print_section("Task 1: 信号产生问题诊断")

    nav = load_nav()
    signals = load_signals()
    days = load_detail_days()
    meta = load_detail_meta()

    print(f" 数据期间: {meta['start_date']} ~ {meta['end_date']}")
    print(f" 动量窗口: {meta['n_days']} 天")
    print(f" 选择数量: {meta['select_num']}")

    results = {
        # 1.1 rebalance frequency
        'frequency': analyze_rebalance_frequency(signals, nav),
        # 1.2 jitter detection
        'jitter': analyze_jitter(signals),
        # 1.3 momentum factor
        'momentum': analyze_momentum_distribution(days),
        # 1.4 dynamic threshold
        'threshold': analyze_dynamic_threshold(days, signals, nav),
    }

    frequency = results['frequency']
    print_section("Task 1 总结")
    print(f" 1. 调仓频率: 每 {frequency['avg_interval']:.1f} 天调仓一次,")
    print(f" 无效调仓率 {frequency['invalid_rate']:.1f}%,交易成本侵蚀约 "
          f"{frequency['cost_drag'] * 100:.1f}%")
    print(f" 2. 短期抖动事件: {results['jitter']['jitter_events']} 次")
    print(f" 3. 崩盘过滤器触发率: {results['momentum']['crash_filter_rate']*100:.1f}%")
    print(f" 4. 债券持有占比: {results['threshold']['bond_hold_pct']*100:.1f}%")

    return results


if __name__ == '__main__':
    main()