""" 公共数据加载与工具函数 为各 Task 分析脚本提供统一的数据加载入口和常用统计/绘图函数。 """ import json import sys from pathlib import Path from typing import Dict, List, Tuple, Optional import numpy as np import pandas as pd # 路径常量 EXPERIMENT_DIR = Path(__file__).parent RESULTS_DIR = EXPERIMENT_DIR.parent / 'results' OUTPUT_DIR = EXPERIMENT_DIR / 'output' DETAIL_JSON = RESULTS_DIR / 'simple_rotation_detail.json' NAV_CSV = RESULTS_DIR / 'simple_rotation_nav.csv' SIGNALS_CSV = RESULTS_DIR / 'simple_rotation_signals.csv' METRICS_JSON = RESULTS_DIR / 'simple_rotation_metrics.json' # ============================================================ # 数据加载 # ============================================================ def load_nav() -> pd.DataFrame: """加载 NAV 曲线,返回 DataFrame(date, nav, daily_return)""" df = pd.read_csv(NAV_CSV, parse_dates=['date']) return df def load_signals() -> pd.DataFrame: """加载信号 CSV,返回 DataFrame(date, holdings, is_rebalance, added, removed)""" df = pd.read_csv(SIGNALS_CSV, parse_dates=['date']) return df def load_detail() -> dict: """加载 detail JSON(约 338K 行),返回 dict{meta, days}""" with open(DETAIL_JSON, 'r') as f: return json.load(f) def load_detail_days() -> List[dict]: """仅加载 detail JSON 的 days 部分""" data = load_detail() return data['days'] def load_detail_meta() -> dict: """仅加载 detail JSON 的 meta 部分""" data = load_detail() return data['meta'] def load_metrics() -> dict: """加载绩效指标 JSON""" with open(METRICS_JSON, 'r') as f: return json.load(f) def days_to_dataframe(days: List[dict]) -> pd.DataFrame: """将 detail JSON 的 days 列表转换为宽表 DataFrame。 列: date, nav, daily_return, is_rebalance, holdings, added, removed 以及每个资产 code 的 momentum_{code}, rank_{code}, threshold_{code}, ... """ rows = [] for day in days: row = { 'date': pd.Timestamp(day['date']), 'nav': day['nav'], 'daily_return': day['daily_return'], 'is_rebalance': day['is_rebalance'], 'holdings': day['holdings'], 'added': day.get('added', []), 'removed': day.get('removed', []), } for code, asset in day.get('assets', {}).items(): safe_code = code.replace('.', '_').replace('=', '_') row[f'momentum_{safe_code}'] = asset.get('momentum') row[f'rank_{safe_code}'] = asset.get('rank') row[f'threshold_{safe_code}'] = asset.get('threshold') row[f'above_threshold_{safe_code}'] = asset.get('above_threshold') row[f'premium_{safe_code}'] = asset.get('premium') row[f'is_held_{safe_code}'] = asset.get('is_held') row[f'index_return_{safe_code}'] = asset.get('index_return') row[f'etf_return_ctc_{safe_code}'] = asset.get('etf_return_ctc') row[f'holding_days_{safe_code}'] = asset.get('holding_days') row[f'cum_return_etf_{safe_code}'] = asset.get('cum_return_etf') rows.append(row) return pd.DataFrame(rows) def days_to_assets_long(days: List[dict]) -> pd.DataFrame: """将 detail JSON 转换为长表(每日每资产一行)。 列: date, code, momentum, rank, threshold, above_threshold, premium, is_held, index_return, etf_return_ctc, holding_days, cum_return_etf """ rows = [] for day in days: date = pd.Timestamp(day['date']) for code, asset in day.get('assets', {}).items(): rows.append({ 'date': date, 'code': code, 'momentum': asset.get('momentum'), 'rank': asset.get('rank'), 'threshold': asset.get('threshold'), 'above_threshold': asset.get('above_threshold'), 'premium': asset.get('premium'), 'is_held': asset.get('is_held'), 'index_return': asset.get('index_return'), 'etf_return_ctc': asset.get('etf_return_ctc'), 'holding_days': asset.get('holding_days'), 'cum_return_etf': asset.get('cum_return_etf'), 'cum_return_idx': asset.get('cum_return_idx'), }) return pd.DataFrame(rows) # ============================================================ # 统计工具 # ============================================================ def compute_drawdown(nav: pd.Series) -> pd.Series: """计算回撤序列""" peak = nav.cummax() return (nav - peak) / peak def compute_sharpe(returns: pd.Series, rf: float = 0.0) -> float: """计算年化夏普比率""" excess = returns - rf / 252 if excess.std() == 0: return 0.0 return float(excess.mean() / excess.std() * np.sqrt(252)) def compute_calmar(returns: pd.Series, nav: pd.Series) -> float: """计算 Calmar 比率""" n = len(returns) total = nav.iloc[-1] / nav.iloc[0] - 1 annual = (1 + total) ** (252 / n) - 1 dd = compute_drawdown(nav).min() if dd == 0: return 0.0 return float(annual / abs(dd)) def compute_annual_return(nav: pd.Series) -> float: """计算年化收益率""" n = len(nav) total = nav.iloc[-1] / nav.iloc[0] - 1 return (1 + total) ** (252 / n) - 1 def yearly_stats(nav_df: pd.DataFrame) -> pd.DataFrame: """按年份计算收益、回撤、夏普等统计""" nav_df = nav_df.copy() nav_df['year'] = nav_df['date'].dt.year rows = [] for year, grp in nav_df.groupby('year'): n = len(grp) total_ret = grp['nav'].iloc[-1] / grp['nav'].iloc[0] - 1 dd = compute_drawdown(grp['nav']).min() sharpe = compute_sharpe(grp['daily_return']) rows.append({ 'year': year, 'total_return': total_ret, 'max_drawdown': dd, 'sharpe': sharpe, 'n_days': n, }) return pd.DataFrame(rows) # ============================================================ # 输出工具 # ============================================================ def ensure_output_dir(): """确保输出目录存在""" OUTPUT_DIR.mkdir(parents=True, exist_ok=True) def print_section(title: str): """打印分隔符 + 标题""" print(f"\n{'='*60}") print(f" {title}") print(f"{'='*60}") def save_figure(fig, name: str): """保存 matplotlib 图表到 output 目录""" ensure_output_dir() path = OUTPUT_DIR / name fig.savefig(path, dpi=150, bbox_inches='tight') print(f" + 图表已保存: {path}") return path