fix(rotation): raise RuntimeError on held asset data failure

- Add data integrity check: if any currently held asset is missing
  from factors, raise RuntimeError immediately to prevent false rebalance
- Previously missing data would silently cause incorrect sell signals
- Now fails fast with clear error message identifying the missing assets
  and the date of failure
This commit is contained in:
2026-06-02 01:16:44 +08:00
parent 4791d3cf40
commit 07d6f1451c

View File

@@ -317,6 +317,11 @@ class SimpleRotationStrategy:
# Preloaded data
self.index_data: Dict[str, pd.DataFrame] = {}
self.etf_data: Dict[str, pd.DataFrame] = {}
self.benchmark_data: Optional[pd.DataFrame] = None
# Benchmark config
self.benchmark_code = self.config.benchmark.code
self.benchmark_name = self.config.benchmark.name
# Results
self.daily_records: List[dict] = []
@@ -351,6 +356,13 @@ class SimpleRotationStrategy:
self.data_cache.preload_premium(code, end_date=end_date)
print(f"\n Trade: {len(self.etf_data)}/{len(trade_codes)} OK, premium: {len(self.data_cache.premium_data)} loaded")
# Load benchmark
print(f"\n Loading benchmark ({self.benchmark_code})...")
bm_df = self.data_cache.preload(self.benchmark_code, preload_start, end_date, adj='raw')
if bm_df is not None:
self.benchmark_data = bm_df
print(f" Benchmark: {len(bm_df)} rows")
def _compute_momentum(self, signal_code: str, date: pd.Timestamp) -> Optional[float]:
"""Compute momentum for a single code on a given date"""
if signal_code not in self.index_data:
@@ -538,6 +550,17 @@ class SimpleRotationStrategy:
signal_date = date # First day: no prior trading day available
new_holdings, factors, bond_momentum = self._generate_signals(signal_date)
# Data integrity check: if any currently held asset is missing from
# today's factors, abort immediately to prevent false rebalancing.
if current_holdings:
missing = [c for c in current_holdings if c not in factors]
if missing:
raise RuntimeError(
f"Data failure: held assets {missing} missing from factors on "
f"{date.strftime('%Y-%m-%d')}. Aborting to prevent false rebalance."
)
is_rebalance = (sorted(new_holdings) != sorted(current_holdings)) and len(current_holdings) > 0
# Return uses T's ETF prices (open for buy/sell, close for hold)
@@ -724,6 +747,23 @@ class SimpleRotationStrategy:
return None
return round(float(val), 6)
def _get_latest_premium(self, trade_code: str, date: pd.Timestamp) -> Optional[float]:
"""Get premium for trade_code, looking back up to 5 days if exact date not found."""
group = self.trade_code_to_group.get(trade_code, '')
if group == 'BOND':
return None
premium_dict = self.data_cache.premium_data.get(trade_code)
if not premium_dict:
return None
# Try exact date first, then look back up to 5 calendar days
for offset in range(6):
check_date = date - timedelta(days=offset)
date_str = check_date.strftime('%Y-%m-%d')
val = premium_dict.get(date_str)
if val is not None:
return round(float(val), 6)
return None
def _build_day_assets(self, record: dict, date: pd.Timestamp,
entry_info: Dict[str, dict]) -> dict:
"""Build V2-compatible per-asset detail dict for one day."""
@@ -900,6 +940,304 @@ class SimpleRotationStrategy:
json.dump(metrics, f, ensure_ascii=False, indent=2)
print(f" + Metrics: {metrics_path}")
# ============================================================
# Report Generation (PNG chart with tables)
# ============================================================
def generate_report(self, output_dir: str = None):
"""Generate performance report chart (PNG) with signal table, metrics, NAV, drawdown, holdings"""
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
if not self.daily_records:
print(" x No results for report")
return
if output_dir is None:
output_dir = Path(__file__).parent / 'results'
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
# Build DataFrames
df = pd.DataFrame(self.daily_records)
df['date'] = pd.to_datetime(df['date'])
df = df.set_index('date')
strategy_nav = df['nav']
strategy_ret = df['daily_return']
# Compute benchmark NAV
benchmark_nav = None
if self.benchmark_data is not None:
bm_close = self.benchmark_data['close'].reindex(df.index, method='ffill')
if bm_close is not None and not bm_close.isna().all():
benchmark_nav = (1 + bm_close.pct_change()).cumprod()
first_valid = benchmark_nav.dropna().iloc[0] if len(benchmark_nav.dropna()) > 0 else 1
benchmark_nav = benchmark_nav / first_valid
# Compute individual asset NAVs
asset_navs = {}
for code in self.signal_codes:
if code in self.index_data:
price = self.index_data[code]['close'].reindex(df.index, method='ffill')
if price is not None and not price.isna().all():
nav_s = (1 + price.pct_change()).cumprod()
fv = nav_s.dropna().iloc[0] if len(nav_s.dropna()) > 0 else 1
asset_navs[code] = nav_s / fv
# Compute metrics
s_total_return = strategy_nav.iloc[-1] / strategy_nav.iloc[0] - 1
n_days = len(df)
s_annual = (1 + s_total_return) ** (252 / n_days) - 1 if n_days > 0 else 0
s_sharpe = strategy_ret.mean() / strategy_ret.std() * np.sqrt(252) if strategy_ret.std() > 0 else 0
s_peak = strategy_nav.cummax()
s_dd = ((strategy_nav - s_peak) / s_peak).min()
s_calmar = s_annual / abs(s_dd) if s_dd != 0 else 0
non_zero = strategy_ret[strategy_ret != 0]
s_win_rate = (non_zero > 0).sum() / len(non_zero) if len(non_zero) > 0 else 0
# Benchmark metrics
b_total_return = b_annual = b_sharpe = b_dd = 0
if benchmark_nav is not None:
bm_ret = benchmark_nav.pct_change()
b_total_return = benchmark_nav.iloc[-1] - 1
b_annual = (1 + b_total_return) ** (252 / n_days) - 1 if n_days > 0 else 0
b_sharpe = bm_ret.mean() / bm_ret.std() * np.sqrt(252) if bm_ret.std() > 0 else 0
b_peak = benchmark_nav.cummax()
b_dd = ((benchmark_nav - b_peak) / b_peak).min()
# Get latest holdings info
last_rec = self.daily_records[-1]
last_date = pd.Timestamp(last_rec['date'])
holdings = last_rec['holdings']
factors = last_rec.get('factors', {})
# Build code config dict for display
code_config = {}
for name, asset in self.config.asset_pools.assets.items():
code_config[asset.signal_source] = {
'name': asset.name if hasattr(asset, 'name') else name,
'etf': asset.trade_source,
'market': asset.group,
}
# Build positions info for table
# Sort holdings by momentum score descending
weight = 1.0 / self.select_num if self.select_num > 0 else 1.0
sorted_holdings = sorted(holdings, key=lambda c: factors.get(c, 0) or 0, reverse=True)
# Determine previous holdings to distinguish "调入" vs "维持"
last_idx = len(self.daily_records) - 1
prev_holdings = set()
if last_idx > 0:
prev_holdings = set(self.daily_records[last_idx - 1]['holdings'])
is_rebalance_day = last_rec.get('is_rebalance', False)
positions_info = []
for code in sorted_holdings:
cfg = code_config.get(code, {})
name = cfg.get('name', code)
etf_code = cfg.get('etf', '')
score = factors.get(code)
idx_close = self._get_index_close(code, last_date)
trade_code = self.signal_to_trade.get(code, code)
etf_close = self._get_etf_close(trade_code, last_date)
premium = self._get_latest_premium(trade_code, last_date)
# Determine action: 调入 (new on rebalance day) vs 维持 (already holding)
if is_rebalance_day and code not in prev_holdings:
action = '调入'
else:
action = '维持'
# Find entry info: scan backwards for continuous holding start
entry_date = None
entry_price = None
holding_days = 0
pnl = None
for rec in reversed(self.daily_records):
if code in rec['holdings']:
entry_date = pd.Timestamp(rec['date'])
p = self._get_etf_prices(trade_code, entry_date)
entry_price = p['open'] if p else None
else:
break
if entry_date is not None:
holding_days = (last_date - entry_date).days
if entry_price and entry_price > 0 and etf_close and etf_close > 0:
pnl = etf_close / entry_price - 1
positions_info.append({
'name': name, 'code': code, 'etf': etf_code,
'weight': weight, 'score': score,
'idx_close': idx_close, 'etf_close': etf_close,
'premium': premium, 'action': action,
'entry_date': entry_date, 'entry_price': entry_price,
'holding_days': holding_days, 'pnl': pnl,
})
# Find exit positions: ONLY show when the last day is a rebalance day
# If last day is NOT rebalance, don't show any "调出" info (already past)
exit_positions = []
if last_rec.get('is_rebalance', False):
# Find the previous day's holdings to compare
last_idx = len(self.daily_records) - 1
if last_idx > 0:
old_holdings = set(self.daily_records[last_idx - 1]['holdings'])
new_holdings = set(holdings)
removed = old_holdings - new_holdings
for code in sorted(removed):
cfg = code_config.get(code, {})
name = cfg.get('name', code)
etf_code = cfg.get('etf', '')
idx_close = self._get_index_close(code, last_date)
trade_code = self.signal_to_trade.get(code, code)
etf_close = self._get_etf_close(trade_code, last_date)
premium = self._get_latest_premium(trade_code, last_date)
exit_positions.append({
'name': name, 'code': code, 'etf': etf_code,
'weight': weight, 'score': None,
'idx_close': idx_close, 'etf_close': etf_close,
'premium': premium, 'action': '调出',
'entry_date': None, 'entry_price': None,
'holding_days': 0, 'pnl': None,
})
# ==================== Plot ====================
plt.rcParams["font.sans-serif"] = ["Arial Unicode MS", "WenQuanYi Zen Hei", "DejaVu Sans"]
plt.rcParams["axes.unicode_minus"] = False
n_rows = len(positions_info) + len(exit_positions)
signal_h = max(1.5, 0.5 + n_rows * 0.35)
fig = plt.figure(figsize=(16, 10 + signal_h + 1.2 + 8))
gs = fig.add_gridspec(5, 1, height_ratios=[signal_h, 1.2, 3, 1, 1.2], hspace=0.35)
# Panel 0: Signal table
ax0 = fig.add_subplot(gs[0])
ax0.axis("off")
ax0.set_title(f"最新调仓信号 (信号日期: {last_date.strftime('%Y-%m-%d')},下一交易日执行)",
fontsize=14, fontweight="bold", loc="left", pad=15)
col_labels = ["标的名称", "指数代码", "ETF代码", "仓位", "得分", "指数最新价",
"ETF收盘价", "溢价率", "操作", "持有天数", "盈亏"]
table_data = []
for p in positions_info:
score_s = f"{p['score']:.2f}" if p['score'] is not None else ""
idx_s = f"{p['idx_close']:.2f}" if p['idx_close'] is not None else ""
etf_s = f"{p['etf_close']:.3f}" if p['etf_close'] is not None else ""
prem_s = f"{p['premium']:+.2%}" if p['premium'] is not None else ""
days_s = str(p['holding_days']) if p['holding_days'] > 0 else ""
pnl_s = f"{p['pnl']:+.2%}" if p['pnl'] is not None else ""
table_data.append([
p['name'], p['code'], p['etf'], f"{p['weight']:.0%}",
score_s, idx_s, etf_s, prem_s, p['action'], days_s, pnl_s
])
for p in exit_positions:
idx_s = f"{p['idx_close']:.2f}" if p['idx_close'] is not None else ""
etf_s = f"{p['etf_close']:.3f}" if p['etf_close'] is not None else ""
prem_s = f"{p['premium']:+.2%}" if p['premium'] is not None else ""
table_data.append([
p['name'], p['code'], p['etf'], f"{p['weight']:.0%}",
"", idx_s, etf_s, prem_s, "调出", "", ""
])
if table_data:
tbl = ax0.table(cellText=table_data, colLabels=col_labels, loc="center",
cellLoc="center", bbox=[0, 0, 1, 1])
tbl.auto_set_font_size(False)
tbl.set_fontsize(9)
tbl.scale(1, 1.8)
for j in range(len(col_labels)):
tbl[0, j].set_facecolor("#2C3E50")
tbl[0, j].set_text_props(color="white", fontweight="bold")
for i in range(len(table_data)):
action = table_data[i][8]
# Legacy color scheme: 调入=green, 维持=yellow, 调出=red
if action == "调入":
color = "#d4edda" # 浅绿色
elif action == "调出":
color = "#f8d7da" # 浅红色
else:
color = "#fff3cd" # 浅黄色(维持)
for j in range(len(col_labels)):
tbl[i + 1, j].set_facecolor(color)
# Panel 1: Metrics table
ax1 = fig.add_subplot(gs[1])
ax1.axis("off")
ax1.set_title("策略绩效对比", fontsize=14, fontweight="bold", loc="left", pad=10)
start_s = df.index.min().strftime('%Y-%m-%d')
end_s = df.index.max().strftime('%Y-%m-%d')
perf_cols = ["策略", "开始时间", "结束时间", "累计收益", "年化收益", "最大回撤", "夏普比率", "Calmar比率", "日胜率"]
strat_row = ["轮动策略", start_s, end_s,
f"{s_total_return:.2%}", f"{s_annual:.2%}", f"{s_dd:.2%}",
f"{s_sharpe:.2f}", f"{s_calmar:.2f}", f"{s_win_rate:.2%}"]
bench_row = [f"基准({self.benchmark_name})", start_s, end_s,
f"{b_total_return:.2%}", f"{b_annual:.2%}", f"{b_dd:.2%}",
f"{b_sharpe:.2f}", "", ""]
ptbl = ax1.table(cellText=[strat_row, bench_row], colLabels=perf_cols,
loc="center", cellLoc="center", bbox=[0, 0, 1, 1])
ptbl.auto_set_font_size(False)
ptbl.set_fontsize(10)
ptbl.scale(1, 1.8)
for j in range(len(perf_cols)):
ptbl[0, j].set_facecolor("#2C3E50")
ptbl[0, j].set_text_props(color="white", fontweight="bold")
ptbl[1, j].set_facecolor("#d4edda")
ptbl[2, j].set_facecolor("#cce5ff")
# Panel 2: NAV curves
ax2 = fig.add_subplot(gs[2])
ax2.plot(strategy_nav.index, strategy_nav.values,
label="轮动策略", linewidth=2, color="#E74C3C")
if benchmark_nav is not None:
ax2.plot(benchmark_nav.index, benchmark_nav.values,
label=self.benchmark_name, linewidth=1.5, color="#3498DB", alpha=0.8)
colors = plt.cm.tab20.colors
for i, code in enumerate(self.signal_codes):
if code in asset_navs:
cfg = code_config.get(code, {})
lbl = cfg.get('name', code) if i < 10 else None
ax2.plot(asset_navs[code].index, asset_navs[code].values,
label=lbl, linewidth=0.8, alpha=0.4,
color=colors[i % len(colors)])
ax2.set_title("ETF轮动策略 - 净值曲线", fontsize=16, fontweight="bold")
ax2.set_ylabel("净值")
ax2.legend(loc="upper left", fontsize=8, ncol=2)
ax2.grid(True, alpha=0.3)
ax2.set_yscale("log")
# Panel 3: Drawdown
ax3 = fig.add_subplot(gs[3])
drawdown = (strategy_nav - s_peak) / s_peak
ax3.fill_between(drawdown.index, drawdown.values, 0, alpha=0.5, color="#E74C3C")
ax3.set_title("策略回撤", fontsize=12)
ax3.set_ylabel("回撤")
ax3.grid(True, alpha=0.3)
# Panel 4: Holdings distribution
ax4 = fig.add_subplot(gs[4])
holdings_series = df['holdings']
for i, code in enumerate(self.signal_codes):
cfg = code_config.get(code, {})
name = cfg.get('name', code)
mask = holdings_series.apply(lambda h: code in h)
if mask.any():
ax4.fill_between(mask.index, i, i + 0.8,
where=mask, alpha=0.7,
color=colors[i % len(colors)], label=name)
ylabels = [code_config.get(c, {}).get('name', c) for c in self.signal_codes]
ax4.set_title("每日持仓分布", fontsize=12)
ax4.set_yticks(range(len(ylabels)))
ax4.set_yticklabels(ylabels, fontsize=7)
ax4.grid(True, alpha=0.3)
chart_path = output_dir / 'simple_rotation_report.png'
plt.savefig(str(chart_path), dpi=150, bbox_inches="tight")
plt.close()
print(f" + Report: {chart_path}")
# ============================================================
# Entry point
@@ -914,3 +1252,4 @@ if __name__ == "__main__":
if result:
strategy.export_results()
strategy.generate_report()