import numpy as np import pandas as pd from scipy.special import logit as sp_logit from sklearn.linear_model import LogisticRegression from sklearn.metrics import log_loss def compute_metrics(df: pd.DataFrame, n_bins: int = 10, bin_strategy: str = 'uniform', # 'uniform' or 'quantile' include_draws: bool = True, eps: float = 1e-6) -> dict: """ 计算预测评估指标并拟合校准关系。 参数: - df: 包含至少两列: 'win_prob' (预测主胜概率), 'res' (取 'won','refunded','lost') - n_bins: ECE 分箱数 - bin_strategy: 'uniform' (等宽) 或 'quantile' (等频) - include_draws: 若 True, 将 'draw' 视为非胜 (y=0)。若 False, 丢弃 'draw' 行。 - eps: 概率裁剪下限,用于数值稳定 返回: dict 包含 logloss, brier, ece, accuracy, reg_alpha, reg_beta, ece_bins, n_samples """ # 处理 refunded if include_draws: mask = df['res'].isin(['won', 'refunded', 'lost']) else: mask = df['res'].isin(['won', 'lost']) df = df[mask].copy() # 标签: won=1, others=0 (包括 refunded) y = df['res'].map({'won': 1, 'refunded': 0, 'lost': 0}).astype(int).values p = df['win_prob'].astype(float).values # 裁剪概率以保证数值稳定 p_clip = np.clip(p, eps, 1 - eps) # logloss: 使用 sklearn 实现以获得更稳健的数值行为 try: logloss = float(log_loss(y, p_clip, labels=[0, 1])) except Exception: # 备用实现 logloss = float(-np.mean(y * np.log(p_clip) + (1 - y) * np.log(1 - p_clip))) # brier score brier = float(np.mean((p_clip - y) ** 2)) # ECE 计算(支持 uniform 或 quantile) if bin_strategy == 'quantile': # quantile bin edges try: edges = np.unique(np.percentile(p_clip, np.linspace(0, 100, n_bins + 1))) if len(edges) - 1 <= 0: # fallback to uniform bin_idxs = np.minimum((p_clip * n_bins).astype(int), n_bins - 1) else: # searchsorted to assign bins bin_idxs = np.clip(np.searchsorted(edges, p_clip, side='right') - 1, 0, len(edges) - 2) except Exception: bin_idxs = np.minimum((p_clip * n_bins).astype(int), n_bins - 1) else: bin_idxs = np.minimum((p_clip * n_bins).astype(int), n_bins - 1) ece = 0.0 total = len(y) bin_stats = [] for b in range(n_bins): idx = bin_idxs == b count = int(idx.sum()) if count == 0: bin_stats.append({'count': 0, 'mean_pred': float('nan'), 'emp_freq': float('nan')}) continue mean_pred = float(p_clip[idx].mean()) emp_freq = float(y[idx].mean()) ece += abs(mean_pred - emp_freq) * count bin_stats.append({'count': count, 'mean_pred': mean_pred, 'emp_freq': emp_freq}) ece = float(ece / total) if total > 0 else float('nan') # accuracy acc = float(np.mean((p_clip >= 0.5) == (y == 1))) # 校准拟合: 使用 LogisticRegression 拟合 logit(E[y]) = alpha + beta * logit(p) X = sp_logit(p_clip).reshape(-1, 1) clf = LogisticRegression(C=1e6, solver='lbfgs', max_iter=200) clf.fit(X, y) alpha = float(clf.intercept_[0]) beta = float(clf.coef_[0][0]) return { 'logloss': logloss, 'brier': brier, 'ece': ece, 'accuracy': acc, 'reg_alpha': alpha, 'reg_beta': beta, # 'ece_bins': bin_stats, 'n_samples': int(total) } if __name__ == '__main__': df = pd.read_feather("data/p_res.feather") df['win_prob'] = df['power_p'] res = compute_metrics(df) print(res)