import numpy as np
import pandas as pd
from scipy.special import logit as sp_logit
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import log_loss


def _binary_log_loss(y: np.ndarray, p: np.ndarray) -> float:
    """Return the mean binary cross-entropy of probabilities ``p`` vs labels ``y``."""
    try:
        return float(log_loss(y, p, labels=[0, 1]))
    except (ValueError, TypeError):
        # Best-effort fallback: same formula written directly in numpy, used
        # when sklearn's input validation rejects the arrays.
        return float(-np.mean(y * np.log(p) + (1 - y) * np.log(1 - p)))


def _assign_bins(p: np.ndarray, n_bins: int, bin_strategy: str) -> np.ndarray:
    """Return, for each probability in ``p``, its bin index in ``[0, n_bins - 1]``."""
    # Equal-width bins over [0, 1]; np.minimum folds p == 1 into the last bin.
    uniform_idx = np.minimum((p * n_bins).astype(int), n_bins - 1)
    if bin_strategy != "quantile":
        return uniform_idx

    # Equal-frequency edges. np.unique collapses duplicated quantiles, so
    # heavily tied predictions may yield fewer than n_bins distinct bins.
    edges = np.unique(np.percentile(p, np.linspace(0, 100, n_bins + 1)))
    if len(edges) < 2:
        # All predictions identical: no usable quantile edges, use equal width.
        return uniform_idx
    # side="right" puts a value equal to an edge into the bin starting at that
    # edge; the clip keeps the maximum value inside the last bin.
    return np.clip(np.searchsorted(edges, p, side="right") - 1, 0, len(edges) - 2)


def _expected_calibration_error(
    p: np.ndarray, y: np.ndarray, bin_idxs: np.ndarray, n_bins: int
) -> float:
    """Return the ECE: count-weighted mean of |mean prediction - hit rate| per bin."""
    # |mean_pred - emp_freq| * count == |sum_pred - sum_true| for every bin, so
    # per-bin sums are enough and empty bins contribute exactly zero.
    sum_pred = np.bincount(bin_idxs, weights=p, minlength=n_bins)
    sum_true = np.bincount(bin_idxs, weights=y, minlength=n_bins)
    return float(np.abs(sum_pred - sum_true).sum() / len(y))


def _fit_calibration(p: np.ndarray, y: np.ndarray) -> tuple:
    """Fit ``logit(E[y]) = alpha + beta * logit(p)`` and return ``(alpha, beta)``."""
    if np.unique(y).size < 2:
        # The regression cannot be fitted without both classes present
        # (this also covers empty input); report NaN instead of raising.
        return float("nan"), float("nan")
    features = sp_logit(p).reshape(-1, 1)
    # Very large C makes the L2 penalty negligible (near-unregularized fit).
    model = LogisticRegression(C=1e6, solver="lbfgs", max_iter=200)
    model.fit(features, y)
    return float(model.intercept_[0]), float(model.coef_[0][0])


def compute_metrics(
    df: pd.DataFrame,
    n_bins: int = 10,
    bin_strategy: str = "uniform",  # 'uniform' or 'quantile'
    include_draws: bool = True,
    eps: float = 1e-6,
) -> dict:
    """Compute evaluation metrics for win-probability predictions.

    Args:
        df: Frame with at least the columns ``'win_prob'`` (predicted win
            probability) and ``'res'`` (one of ``'won'``, ``'refunded'``,
            ``'lost'``). Rows with any other ``'res'`` value are ignored.
        n_bins: Number of bins used for the expected calibration error.
        bin_strategy: ``'quantile'`` for equal-frequency bins; any other
            value selects equal-width bins over [0, 1].
        include_draws: If True, ``'refunded'`` rows are kept and labelled as
            non-wins (y=0). If False, ``'refunded'`` rows are dropped.
        eps: Probabilities are clipped to ``[eps, 1 - eps]`` before use so
            that the logarithm and logit stay finite.

    Returns:
        A dict with the keys ``logloss``, ``brier``, ``ece``, ``accuracy``,
        ``reg_alpha``, ``reg_beta`` (floats) and ``n_samples`` (int).
        ``reg_alpha`` / ``reg_beta`` are the intercept and slope of the
        calibration fit ``logit(E[y]) = alpha + beta * logit(p)``; they are
        NaN when the labels contain fewer than two classes. When no rows
        remain after filtering, every metric is NaN and ``n_samples`` is 0.

    Raises:
        ValueError: If ``n_bins`` is smaller than 1.
    """
    if n_bins < 1:
        raise ValueError("n_bins must be a positive integer")

    kept_results = ["won", "refunded", "lost"] if include_draws else ["won", "lost"]
    rows = df[df["res"].isin(kept_results)]

    # Label: won=1, everything else that was kept (lost / refunded) = 0.
    y = (rows["res"] == "won").astype(int).to_numpy()
    p = rows["win_prob"].astype(float).to_numpy()
    total = len(y)

    if total == 0:
        nan = float("nan")
        return {
            "logloss": nan,
            "brier": nan,
            "ece": nan,
            "accuracy": nan,
            "reg_alpha": nan,
            "reg_beta": nan,
            "n_samples": 0,
        }

    # Clip for numerical stability; all metrics below use the clipped values.
    p_clip = np.clip(p, eps, 1 - eps)

    logloss = _binary_log_loss(y, p_clip)
    brier = float(np.mean((p_clip - y) ** 2))
    bin_idxs = _assign_bins(p_clip, n_bins, bin_strategy)
    ece = _expected_calibration_error(p_clip, y, bin_idxs, n_bins)
    # A prediction counts as "win" when the probability is at least 0.5.
    acc = float(np.mean((p_clip >= 0.5) == (y == 1)))
    alpha, beta = _fit_calibration(p_clip, y)

    return {
        "logloss": logloss,
        "brier": brier,
        "ece": ece,
        "accuracy": acc,
        "reg_alpha": alpha,
        "reg_beta": beta,
        "n_samples": int(total),
    }


if __name__ == "__main__":
    df = pd.read_feather("data/p_res.feather")
    # Evaluate the 'power_p' column as the predicted win probability.
    df["win_prob"] = df["power_p"]
    res = compute_metrics(df)
    print(res)