From 07d6f1451c12ad83f17aa13daa9191326650c012 Mon Sep 17 00:00:00 2001 From: aszerW Date: Tue, 2 Jun 2026 01:16:44 +0800 Subject: [PATCH] fix(rotation): raise RuntimeError on held asset data failure - Add data integrity check: if any currently held asset is missing from factors, raise RuntimeError immediately to prevent false rebalance - Previously missing data would silently cause incorrect sell signals - Now fails fast with clear error message identifying the missing assets and the date of failure --- rotation/simple_rotation.py | 339 ++++++++++++++++++++++++++++++++++++ 1 file changed, 339 insertions(+) diff --git a/rotation/simple_rotation.py b/rotation/simple_rotation.py index d50a520..56ef46e 100644 --- a/rotation/simple_rotation.py +++ b/rotation/simple_rotation.py @@ -317,6 +317,11 @@ class SimpleRotationStrategy: # Preloaded data self.index_data: Dict[str, pd.DataFrame] = {} self.etf_data: Dict[str, pd.DataFrame] = {} + self.benchmark_data: Optional[pd.DataFrame] = None + + # Benchmark config + self.benchmark_code = self.config.benchmark.code + self.benchmark_name = self.config.benchmark.name # Results self.daily_records: List[dict] = [] @@ -351,6 +356,13 @@ class SimpleRotationStrategy: self.data_cache.preload_premium(code, end_date=end_date) print(f"\n Trade: {len(self.etf_data)}/{len(trade_codes)} OK, premium: {len(self.data_cache.premium_data)} loaded") + # Load benchmark + print(f"\n Loading benchmark ({self.benchmark_code})...") + bm_df = self.data_cache.preload(self.benchmark_code, preload_start, end_date, adj='raw') + if bm_df is not None: + self.benchmark_data = bm_df + print(f" Benchmark: {len(bm_df)} rows") + def _compute_momentum(self, signal_code: str, date: pd.Timestamp) -> Optional[float]: """Compute momentum for a single code on a given date""" if signal_code not in self.index_data: @@ -538,6 +550,17 @@ class SimpleRotationStrategy: signal_date = date # First day: no prior trading day available new_holdings, factors, bond_momentum = self._generate_signals(signal_date) + + # Data integrity check: if any currently held asset is missing from + # today's factors, abort immediately to prevent false rebalancing. + if current_holdings: + missing = [c for c in current_holdings if c not in factors] + if missing: + raise RuntimeError( + f"Data failure: held assets {missing} missing from factors on " + f"{date.strftime('%Y-%m-%d')}. Aborting to prevent false rebalance." + ) + is_rebalance = (sorted(new_holdings) != sorted(current_holdings)) and len(current_holdings) > 0 # Return uses T's ETF prices (open for buy/sell, close for hold) @@ -724,6 +747,23 @@ class SimpleRotationStrategy: return None return round(float(val), 6) + def _get_latest_premium(self, trade_code: str, date: pd.Timestamp) -> Optional[float]: + """Get premium for trade_code, looking back up to 5 days if exact date not found.""" + group = self.trade_code_to_group.get(trade_code, '') + if group == 'BOND': + return None + premium_dict = self.data_cache.premium_data.get(trade_code) + if not premium_dict: + return None + # Try exact date first, then look back up to 5 calendar days + for offset in range(6): + check_date = date - timedelta(days=offset) + date_str = check_date.strftime('%Y-%m-%d') + val = premium_dict.get(date_str) + if val is not None: + return round(float(val), 6) + return None + def _build_day_assets(self, record: dict, date: pd.Timestamp, entry_info: Dict[str, dict]) -> dict: """Build V2-compatible per-asset detail dict for one day.""" @@ -900,6 +940,304 @@ class SimpleRotationStrategy: json.dump(metrics, f, ensure_ascii=False, indent=2) print(f" + Metrics: {metrics_path}") + # ============================================================ + # Report Generation (PNG chart with tables) + # ============================================================ + + def generate_report(self, output_dir: str = None): + """Generate performance report chart (PNG) with signal table, metrics, NAV, drawdown, holdings""" + import matplotlib + matplotlib.use("Agg") + import matplotlib.pyplot as plt + + if not self.daily_records: + print(" x No results for report") + return + + if output_dir is None: + output_dir = Path(__file__).parent / 'results' + output_dir = Path(output_dir) + output_dir.mkdir(parents=True, exist_ok=True) + + # Build DataFrames + df = pd.DataFrame(self.daily_records) + df['date'] = pd.to_datetime(df['date']) + df = df.set_index('date') + strategy_nav = df['nav'] + strategy_ret = df['daily_return'] + + # Compute benchmark NAV + benchmark_nav = None + if self.benchmark_data is not None: + bm_close = self.benchmark_data['close'].reindex(df.index, method='ffill') + if bm_close is not None and not bm_close.isna().all(): + benchmark_nav = (1 + bm_close.pct_change()).cumprod() + first_valid = benchmark_nav.dropna().iloc[0] if len(benchmark_nav.dropna()) > 0 else 1 + benchmark_nav = benchmark_nav / first_valid + + # Compute individual asset NAVs + asset_navs = {} + for code in self.signal_codes: + if code in self.index_data: + price = self.index_data[code]['close'].reindex(df.index, method='ffill') + if price is not None and not price.isna().all(): + nav_s = (1 + price.pct_change()).cumprod() + fv = nav_s.dropna().iloc[0] if len(nav_s.dropna()) > 0 else 1 + asset_navs[code] = nav_s / fv + + # Compute metrics + s_total_return = strategy_nav.iloc[-1] / strategy_nav.iloc[0] - 1 + n_days = len(df) + s_annual = (1 + s_total_return) ** (252 / n_days) - 1 if n_days > 0 else 0 + s_sharpe = strategy_ret.mean() / strategy_ret.std() * np.sqrt(252) if strategy_ret.std() > 0 else 0 + s_peak = strategy_nav.cummax() + s_dd = ((strategy_nav - s_peak) / s_peak).min() + s_calmar = s_annual / abs(s_dd) if s_dd != 0 else 0 + non_zero = strategy_ret[strategy_ret != 0] + s_win_rate = (non_zero > 0).sum() / len(non_zero) if len(non_zero) > 0 else 0 + + # Benchmark metrics + b_total_return = b_annual = b_sharpe = b_dd = 0 + if benchmark_nav is not None: + bm_ret = benchmark_nav.pct_change() + b_total_return = benchmark_nav.iloc[-1] - 1 + b_annual = (1 + b_total_return) ** (252 / n_days) - 1 if n_days > 0 else 0 + b_sharpe = bm_ret.mean() / bm_ret.std() * np.sqrt(252) if bm_ret.std() > 0 else 0 + b_peak = benchmark_nav.cummax() + b_dd = ((benchmark_nav - b_peak) / b_peak).min() + + # Get latest holdings info + last_rec = self.daily_records[-1] + last_date = pd.Timestamp(last_rec['date']) + holdings = last_rec['holdings'] + factors = last_rec.get('factors', {}) + + # Build code config dict for display + code_config = {} + for name, asset in self.config.asset_pools.assets.items(): + code_config[asset.signal_source] = { + 'name': asset.name if hasattr(asset, 'name') else name, + 'etf': asset.trade_source, + 'market': asset.group, + } + + # Build positions info for table + # Sort holdings by momentum score descending + weight = 1.0 / self.select_num if self.select_num > 0 else 1.0 + sorted_holdings = sorted(holdings, key=lambda c: factors.get(c, 0) or 0, reverse=True) + + # Determine previous holdings to distinguish "调入" vs "维持" + last_idx = len(self.daily_records) - 1 + prev_holdings = set() + if last_idx > 0: + prev_holdings = set(self.daily_records[last_idx - 1]['holdings']) + is_rebalance_day = last_rec.get('is_rebalance', False) + + positions_info = [] + for code in sorted_holdings: + cfg = code_config.get(code, {}) + name = cfg.get('name', code) + etf_code = cfg.get('etf', '—') + score = factors.get(code) + idx_close = self._get_index_close(code, last_date) + trade_code = self.signal_to_trade.get(code, code) + etf_close = self._get_etf_close(trade_code, last_date) + premium = self._get_latest_premium(trade_code, last_date) + + # Determine action: 调入 (new on rebalance day) vs 维持 (already holding) + if is_rebalance_day and code not in prev_holdings: + action = '调入' + else: + action = '维持' + + # Find entry info: scan backwards for continuous holding start + entry_date = None + entry_price = None + holding_days = 0 + pnl = None + for rec in reversed(self.daily_records): + if code in rec['holdings']: + entry_date = pd.Timestamp(rec['date']) + p = self._get_etf_prices(trade_code, entry_date) + entry_price = p['open'] if p else None + else: + break + if entry_date is not None: + holding_days = (last_date - entry_date).days + if entry_price and entry_price > 0 and etf_close and etf_close > 0: + pnl = etf_close / entry_price - 1 + + positions_info.append({ + 'name': name, 'code': code, 'etf': etf_code, + 'weight': weight, 'score': score, + 'idx_close': idx_close, 'etf_close': etf_close, + 'premium': premium, 'action': action, + 'entry_date': entry_date, 'entry_price': entry_price, + 'holding_days': holding_days, 'pnl': pnl, + }) + + # Find exit positions: ONLY show when the last day is a rebalance day + # If last day is NOT rebalance, don't show any "调出" info (already past) + exit_positions = [] + if last_rec.get('is_rebalance', False): + # Find the previous day's holdings to compare + last_idx = len(self.daily_records) - 1 + if last_idx > 0: + old_holdings = set(self.daily_records[last_idx - 1]['holdings']) + new_holdings = set(holdings) + removed = old_holdings - new_holdings + for code in sorted(removed): + cfg = code_config.get(code, {}) + name = cfg.get('name', code) + etf_code = cfg.get('etf', '—') + idx_close = self._get_index_close(code, last_date) + trade_code = self.signal_to_trade.get(code, code) + etf_close = self._get_etf_close(trade_code, last_date) + premium = self._get_latest_premium(trade_code, last_date) + exit_positions.append({ + 'name': name, 'code': code, 'etf': etf_code, + 'weight': weight, 'score': None, + 'idx_close': idx_close, 'etf_close': etf_close, + 'premium': premium, 'action': '调出', + 'entry_date': None, 'entry_price': None, + 'holding_days': 0, 'pnl': None, + }) + + # ==================== Plot ==================== + plt.rcParams["font.sans-serif"] = ["Arial Unicode MS", "WenQuanYi Zen Hei", "DejaVu Sans"] + plt.rcParams["axes.unicode_minus"] = False + + n_rows = len(positions_info) + len(exit_positions) + signal_h = max(1.5, 0.5 + n_rows * 0.35) + fig = plt.figure(figsize=(16, 10 + signal_h + 1.2 + 8)) + gs = fig.add_gridspec(5, 1, height_ratios=[signal_h, 1.2, 3, 1, 1.2], hspace=0.35) + + # Panel 0: Signal table + ax0 = fig.add_subplot(gs[0]) + ax0.axis("off") + ax0.set_title(f"最新调仓信号 (信号日期: {last_date.strftime('%Y-%m-%d')},下一交易日执行)", + fontsize=14, fontweight="bold", loc="left", pad=15) + + col_labels = ["标的名称", "指数代码", "ETF代码", "仓位", "得分", "指数最新价", + "ETF收盘价", "溢价率", "操作", "持有天数", "盈亏"] + table_data = [] + for p in positions_info: + score_s = f"{p['score']:.2f}" if p['score'] is not None else "—" + idx_s = f"{p['idx_close']:.2f}" if p['idx_close'] is not None else "—" + etf_s = f"{p['etf_close']:.3f}" if p['etf_close'] is not None else "—" + prem_s = f"{p['premium']:+.2%}" if p['premium'] is not None else "—" + days_s = str(p['holding_days']) if p['holding_days'] > 0 else "—" + pnl_s = f"{p['pnl']:+.2%}" if p['pnl'] is not None else "—" + table_data.append([ + p['name'], p['code'], p['etf'], f"{p['weight']:.0%}", + score_s, idx_s, etf_s, prem_s, p['action'], days_s, pnl_s + ]) + for p in exit_positions: + idx_s = f"{p['idx_close']:.2f}" if p['idx_close'] is not None else "—" + etf_s = f"{p['etf_close']:.3f}" if p['etf_close'] is not None else "—" + prem_s = f"{p['premium']:+.2%}" if p['premium'] is not None else "—" + table_data.append([ + p['name'], p['code'], p['etf'], f"{p['weight']:.0%}", + "—", idx_s, etf_s, prem_s, "调出", "—", "—" + ]) + + if table_data: + tbl = ax0.table(cellText=table_data, colLabels=col_labels, loc="center", + cellLoc="center", bbox=[0, 0, 1, 1]) + tbl.auto_set_font_size(False) + tbl.set_fontsize(9) + tbl.scale(1, 1.8) + for j in range(len(col_labels)): + tbl[0, j].set_facecolor("#2C3E50") + tbl[0, j].set_text_props(color="white", fontweight="bold") + for i in range(len(table_data)): + action = table_data[i][8] + # Legacy color scheme: 调入=green, 维持=yellow, 调出=red + if action == "调入": + color = "#d4edda" # 浅绿色 + elif action == "调出": + color = "#f8d7da" # 浅红色 + else: + color = "#fff3cd" # 浅黄色(维持) + for j in range(len(col_labels)): + tbl[i + 1, j].set_facecolor(color) + + # Panel 1: Metrics table + ax1 = fig.add_subplot(gs[1]) + ax1.axis("off") + ax1.set_title("策略绩效对比", fontsize=14, fontweight="bold", loc="left", pad=10) + + start_s = df.index.min().strftime('%Y-%m-%d') + end_s = df.index.max().strftime('%Y-%m-%d') + perf_cols = ["策略", "开始时间", "结束时间", "累计收益", "年化收益", "最大回撤", "夏普比率", "Calmar比率", "日胜率"] + strat_row = ["轮动策略", start_s, end_s, + f"{s_total_return:.2%}", f"{s_annual:.2%}", f"{s_dd:.2%}", + f"{s_sharpe:.2f}", f"{s_calmar:.2f}", f"{s_win_rate:.2%}"] + bench_row = [f"基准({self.benchmark_name})", start_s, end_s, + f"{b_total_return:.2%}", f"{b_annual:.2%}", f"{b_dd:.2%}", + f"{b_sharpe:.2f}", "—", "—"] + ptbl = ax1.table(cellText=[strat_row, bench_row], colLabels=perf_cols, + loc="center", cellLoc="center", bbox=[0, 0, 1, 1]) + ptbl.auto_set_font_size(False) + ptbl.set_fontsize(10) + ptbl.scale(1, 1.8) + for j in range(len(perf_cols)): + ptbl[0, j].set_facecolor("#2C3E50") + ptbl[0, j].set_text_props(color="white", fontweight="bold") + ptbl[1, j].set_facecolor("#d4edda") + ptbl[2, j].set_facecolor("#cce5ff") + + # Panel 2: NAV curves + ax2 = fig.add_subplot(gs[2]) + ax2.plot(strategy_nav.index, strategy_nav.values, + label="轮动策略", linewidth=2, color="#E74C3C") + if benchmark_nav is not None: + ax2.plot(benchmark_nav.index, benchmark_nav.values, + label=self.benchmark_name, linewidth=1.5, color="#3498DB", alpha=0.8) + colors = plt.cm.tab20.colors + for i, code in enumerate(self.signal_codes): + if code in asset_navs: + cfg = code_config.get(code, {}) + lbl = cfg.get('name', code) if i < 10 else None + ax2.plot(asset_navs[code].index, asset_navs[code].values, + label=lbl, linewidth=0.8, alpha=0.4, + color=colors[i % len(colors)]) + ax2.set_title("ETF轮动策略 - 净值曲线", fontsize=16, fontweight="bold") + ax2.set_ylabel("净值") + ax2.legend(loc="upper left", fontsize=8, ncol=2) + ax2.grid(True, alpha=0.3) + ax2.set_yscale("log") + + # Panel 3: Drawdown + ax3 = fig.add_subplot(gs[3]) + drawdown = (strategy_nav - s_peak) / s_peak + ax3.fill_between(drawdown.index, drawdown.values, 0, alpha=0.5, color="#E74C3C") + ax3.set_title("策略回撤", fontsize=12) + ax3.set_ylabel("回撤") + ax3.grid(True, alpha=0.3) + + # Panel 4: Holdings distribution + ax4 = fig.add_subplot(gs[4]) + holdings_series = df['holdings'] + for i, code in enumerate(self.signal_codes): + cfg = code_config.get(code, {}) + name = cfg.get('name', code) + mask = holdings_series.apply(lambda h: code in h) + if mask.any(): + ax4.fill_between(mask.index, i, i + 0.8, + where=mask, alpha=0.7, + color=colors[i % len(colors)], label=name) + ylabels = [code_config.get(c, {}).get('name', c) for c in self.signal_codes] + ax4.set_title("每日持仓分布", fontsize=12) + ax4.set_yticks(range(len(ylabels))) + ax4.set_yticklabels(ylabels, fontsize=7) + ax4.grid(True, alpha=0.3) + + chart_path = output_dir / 'simple_rotation_report.png' + plt.savefig(str(chart_path), dpi=150, bbox_inches="tight") + plt.close() + print(f" + Report: {chart_path}") + # ============================================================ # Entry point @@ -914,3 +1252,4 @@ if __name__ == "__main__": if result: strategy.export_results() + strategy.generate_report()