"""IC Calibration diagnostic analysis script""" import os, sys, numpy as np, pandas as pd from pathlib import Path from collections import Counter os.environ['FLASK_API_URL'] = 'https://k3s.tokenpluse.xyz' sys.path.insert(0, str(Path.cwd())) from rotation.simple_rotation import SimpleRotationStrategy from rotation.ic_calibration import ICCalibrator def run_and_analyze(select_num, ic_enabled): s = SimpleRotationStrategy() s.config.rotation.select_num = select_num s.select_num = select_num if ic_enabled: s.config.rotation.ic_calibration.enabled = True s.ic_calibrator = ICCalibrator(fwd_days=5, min_samples=120, ic_method='pearson') r = s.run() return s, s.daily_records print("=" * 90) print(" Running 4 experiments...") print("=" * 90) s1, r1 = run_and_analyze(3, False) s2, r2 = run_and_analyze(3, True) s3, r3 = run_and_analyze(1, False) s4, r4 = run_and_analyze(1, True) # === 1. 持仓频率分析 === print("\n\n" + "=" * 90) print(" 1. 持仓频率对比") print("=" * 90) for label, recs, s_obj in [ ("原始 select=3", r1, s1), ("IC校准 select=3", r2, s2), ("原始 select=1", r3, s3), ("IC校准 select=1", r4, s4), ]: hold_counter = Counter() for rec in recs: for h in rec['holdings']: nm = s_obj.config.asset_pools.assets.get(h) name = nm.name if nm else h hold_counter[name] += 1 print(f"\n [{label}] 持仓天数分布:") for name, cnt in hold_counter.most_common(): pct = cnt / len(recs) * 100 print(f" {name:<12}: {cnt:>5} 天 ({pct:>5.1f}%)") # === 2. 关键资产选中变化 === print("\n\n" + "=" * 90) print(" 2. IC校准 select=3: 关键资产被选中变化") print("=" * 90) targets = ['日经225', '德国DAX', '纳指100', '恒生指数', '黄金', '有色金属', '中证红利低波', '原油', '创业板指'] for name_key in targets: orig = sum(1 for r in r1 if any( name_key == (s1.config.asset_pools.assets.get(h).name if s1.config.asset_pools.assets.get(h) else h) for h in r['holdings'])) calib = sum(1 for r in r2 if any( name_key == (s2.config.asset_pools.assets.get(h).name if s2.config.asset_pools.assets.get(h) else h) for h in r['holdings'])) print(f" {name_key:<12}: 原始={orig:>5}天 -> IC校准={calib:>5}天 (差={calib-orig:+d})") # === 3. 收益质量分析 === print("\n\n" + "=" * 90) print(" 3. 持有期收益质量") print("=" * 90) for label, recs in [ ("原始 select=3", r1), ("IC校准 select=3", r2), ("原始 select=1", r3), ("IC校准 select=1", r4), ]: arr = np.array([r['daily_return'] for r in recs]) win = arr[arr > 0] lose = arr[arr < 0] print(f"\n [{label}]:") print(f" 日均: {arr.mean()*100:+.4f}% 胜率: {(arr>0).mean()*100:.1f}%") print(f" 盈利均: {win.mean()*100:+.4f}% 亏损均: {lose.mean()*100:+.4f}% 盈亏比: {abs(win.mean()/lose.mean()):.3f}") # Worst 5 days sorted_rets = sorted(recs, key=lambda x: x['daily_return']) worst5 = [r['daily_return'] * 100 for r in sorted_rets[:5]] best5 = [r['daily_return'] * 100 for r in sorted_rets[-5:]] print(f" 最差5天: {['%.2f%%' % v for v in worst5]}") print(f" 最好5天: {['%.2f%%' % v for v in best5]}") # === 4. 分段绩效 === print("\n\n" + "=" * 90) print(" 4. 分段绩效: 冷启动期(前170天) vs 校准生效期") print("=" * 90) for label, recs in [ ("原始 select=1", r3), ("IC校准 select=1", r4), ("原始 select=3", r1), ("IC校准 select=3", r2), ]: cold = recs[:170] warm = recs[170:] cold_rets = np.array([r['daily_return'] for r in cold]) warm_rets = np.array([r['daily_return'] for r in warm]) cold_cum = np.prod(1 + cold_rets) - 1 warm_cum = np.prod(1 + warm_rets) - 1 cold_ann = (1 + cold_cum) ** (252 / len(cold_rets)) - 1 warm_ann = (1 + warm_cum) ** (252 / len(warm_rets)) - 1 warm_nav = np.cumprod(1 + warm_rets) warm_peak = np.maximum.accumulate(warm_nav) warm_dd = (warm_nav - warm_peak) / warm_peak warm_maxdd = warm_dd.min() cold_nav = np.cumprod(1 + cold_rets) cold_peak = np.maximum.accumulate(cold_nav) cold_dd = (cold_nav - cold_peak) / cold_peak cold_maxdd = cold_dd.min() print(f"\n [{label}]:") print(f" 冷启动(1-170天): 累计={cold_cum*100:+.2f}% 年化={cold_ann*100:.2f}% 胜率={(cold_rets>0).mean()*100:.1f}% 最大回撤={cold_maxdd*100:.2f}%") print(f" 校准后(170天+): 累计={warm_cum*100:+.2f}% 年化={warm_ann*100:.2f}% 胜率={(warm_rets>0).mean()*100:.1f}% 最大回撤={warm_maxdd*100:.2f}%") # === 5. IC校准 select=3 退化的根因:新增持仓的收益质量 === print("\n\n" + "=" * 90) print(" 5. IC校准 select=3: 新增持仓收益分析") print("=" * 90) # For each day, check if holdings changed between original and calibrated changed_days = 0 new_asset_rets = [] removed_asset_rets = [] for orig_rec, calib_rec in zip(r1, r2): if orig_rec['date'] != calib_rec['date']: continue orig_set = set(orig_rec['holdings']) calib_set = set(calib_rec['holdings']) if orig_set != calib_set: changed_days += 1 # The return of the calibrated portfolio on that day new_asset_rets.append(calib_rec['daily_return']) removed_asset_rets.append(orig_rec['daily_return']) print(f"\n 持仓变化天数: {changed_days} / {len(r1)}") if new_asset_rets: new_arr = np.array(new_asset_rets) old_arr = np.array(removed_asset_rets) diff = new_arr - old_arr print(f" 变化日 - IC校准收益: {new_arr.mean()*100:+.4f}% 原始收益: {old_arr.mean()*100:+.4f}% 差: {diff.mean()*100:+.4f}%") print(f" 变化日胜率: IC校准={((new_arr>0).mean()*100):.1f}% 原始={((old_arr>0).mean()*100):.1f}%") print(f" 变化日累计: IC校准={new_arr.sum()*100:+.2f}% 原始={old_arr.sum()*100:+.2f}% 差={diff.sum()*100:+.2f}%") # === 6. IC stats === print("\n\n" + "=" * 90) print(" 6. IC校准最终状态") print("=" * 90) calibrator = s2.ic_calibrator for code in sorted(calibrator._history.keys()): stats = calibrator.get_stats(code) nm = s2.config.asset_pools.assets.get(code) name = nm.name if nm else code print(f" {name:<12} ({code:<14}): n={stats.n_samples:>5} IC={stats.ic_value:+.4f} sign={stats.ic_sign:+d}")