Files
bet/ProfitSimulation.py
2025-10-26 13:13:30 +08:00

530 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

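"""
Profit Simulation - refactored version.

Refactored following Python best practices: modular design with
configuration support.

Core features:
1. Fetch betting order data from the database
2. Compute betting profit, odds, win rate and related metrics
3. Simulate balance changes and analyse rates of return
4. Generate visualisation charts and an analysis report
"""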
import os
import json
from datetime import datetime
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass
from abc import ABC, abstractmethod
import pandas as pd
import plotly.graph_objects as go
from loguru import logger
# 导入项目模块
from dao.Database import Database
from data_model import MysqlConfig, OddsJamOrder
@dataclass
class SimulationConfig:
    """Settings for one simulation run, with environment-variable overrides.

    Every field default is read from an environment variable (falling back to
    a literal). The ``os.getenv`` calls sit in the class body, so they are
    evaluated once when the class is defined, not on each instantiation.

    After construction the instance also carries ``mysql_config``, parsed
    from ``config_file_path`` in ``__post_init__``.
    """

    # --- Database ---
    # Path handed to MysqlConfig.parse_file (env: MYSQL_CONFIG_PATH).
    config_file_path: str = os.getenv("MYSQL_CONFIG_PATH", "./config/mysql_config.json")
    # Order table name (env: BET_TABLE_NAME).
    table_name: str = os.getenv("BET_TABLE_NAME", "oddsjam_order")

    # --- Simulation parameters ---
    # Starting balance (env: INITIAL_BALANCE).
    initial_balance: float = float(os.getenv("INITIAL_BALANCE", "1000"))
    # Inclusive market-width bounds (env: MARKET_WIDTH_MIN / MARKET_WIDTH_MAX).
    market_width_min: float = float(os.getenv("MARKET_WIDTH_MIN", "20"))
    market_width_max: float = float(os.getenv("MARKET_WIDTH_MAX", "25"))

    # --- File locations ---
    # Directory for cached order data (env: DATA_DIR).
    data_dir: str = os.getenv("DATA_DIR", "./data/bet_simulation/")
    # Directory for generated results (env: OUTPUT_DIR).
    output_dir: str = os.getenv("OUTPUT_DIR", "./data/bet_simulation/")

    # --- Caching ---
    # True only when ENABLE_CACHE equals "true" (case-insensitive).
    enable_cache: bool = os.getenv("ENABLE_CACHE", "true").lower() == "true"

    def __post_init__(self):
        """Parse the MySQL settings file, then create the working directories."""
        self.mysql_config = MysqlConfig.parse_file(self.config_file_path)
        # Create both directories up front so later writes do not fail.
        for directory in (self.data_dir, self.output_dir):
            os.makedirs(directory, exist_ok=True)
class DataService:
    """Load settled OddsJam orders (daily cache or database) and filter them.

    Attributes:
        config: Settings supplying the table name, cache directory, cache
            switch and market-width bounds.
        dao: ``Database`` handle built from ``config.mysql_config``.
    """

    def __init__(self, config: SimulationConfig):
        self.config = config
        self.dao = Database(config.mysql_config)

    def get_oddsjam_order_data(
        self, load_from_cache: Optional[bool] = None
    ) -> pd.DataFrame:
        """Return the orders whose ``bet_status`` is ``'won'`` or ``'lost'``.

        Args:
            load_from_cache: Whether an existing cache file for today may be
                used. ``None`` falls back to ``config.enable_cache``.

        Returns:
            The orders as a DataFrame; an empty DataFrame when the query
            returns no rows.
        """
        if load_from_cache is None:
            load_from_cache = self.config.enable_cache

        # The cache file name embeds today's date, so a new file is used each day.
        current_date_str = datetime.now().strftime("%Y%m%d")
        cache_file_path = os.path.join(
            self.config.data_dir, f"oddsjam_order_data_{current_date_str}.feather"
        )

        # Try the cache first.
        if load_from_cache and os.path.exists(cache_file_path):
            logger.info(f"从缓存加载数据: {cache_file_path}")
            return pd.read_feather(cache_file_path)

        # Otherwise query the database.
        logger.info("从数据库获取订单数据")
        # NOTE(review): the table name is interpolated into the SQL text
        # (identifiers cannot be bound as query parameters); it comes from
        # configuration, so it must never be fed from untrusted input.
        select_query = f"SELECT * FROM bet.{self.config.table_name} where bet_status in ('won', 'lost')"
        raw_data_list = self.dao.fetchall(query=select_query)
        if not raw_data_list:
            logger.warning("未找到符合条件的订单数据")
            return pd.DataFrame()

        # Pass each row through the OddsJamOrder model before building the frame.
        order_data_list = [OddsJamOrder(**data).model_dump() for data in raw_data_list]
        order_df = pd.DataFrame(order_data_list)

        # Writing the cache depends on the config switch, not on
        # ``load_from_cache``: a forced refresh still updates the cache file.
        if self.config.enable_cache:
            order_df.to_feather(cache_file_path)
            logger.info(f"数据已缓存到: {cache_file_path}")
        return order_df

    def filter_data_by_market_width(self, data_df: pd.DataFrame) -> pd.DataFrame:
        """Keep rows whose ``market_width`` lies within the configured bounds.

        Both bounds are inclusive.

        Args:
            data_df: Orders with a ``market_width`` column.

        Returns:
            A new, independent DataFrame containing only the matching rows.
        """
        original_count = len(data_df)
        in_range = (data_df["market_width"] >= self.config.market_width_min) & (
            data_df["market_width"] <= self.config.market_width_max
        )
        # Copy so that callers can add columns to the result without pandas
        # raising SettingWithCopyWarning about writing to a slice.
        filtered_df = data_df[in_range].copy()
        filtered_count = len(filtered_df)
        logger.info(f"市场宽度过滤: {original_count} -> {filtered_count} 条记录")
        return filtered_df
class ProfitCalculator:
    """Stateless helpers for per-order profit and daily balance bookkeeping."""

    @staticmethod
    def _price_to_net_odds(price: float) -> float:
        """Convert a quoted price into the net profit per unit staked.

        The price is divided by 100; a non-negative result is returned as is,
        a negative one is turned into ``1 / abs(result)``.
        NOTE(review): this matches American moneyline quoting (+150 -> 1.5,
        -200 -> 0.5) -- confirm that is the format stored in the price columns.
        """
        scaled = price / 100
        if scaled >= 0:
            return scaled
        return 1 / abs(scaled)

    @staticmethod
    def calculate_benefit_by_order(order_info: Dict) -> float:
        """Return the profit of one order for a stake of 1.

        Args:
            order_info: Mapping with ``home_or_away``, the matching
                ``<home_or_away>_price`` entry and ``outcome``.

        Returns:
            ``-1.0`` when ``outcome`` is ``-1`` (the stake is lost), otherwise
            the net odds of the selected side.
        """
        side = order_info["home_or_away"]
        # The price is read before the outcome check, so a missing price
        # fails loudly for losing orders as well.
        net_odds = ProfitCalculator._price_to_net_odds(order_info[f"{side}_price"])
        if order_info["outcome"] == -1:
            return -1.0
        return net_odds

    @staticmethod
    def calculate_odds(row: pd.Series) -> float:
        """Return the net odds of the side selected in ``row``.

        Args:
            row: Row (or mapping) with ``home_or_away`` and the matching
                ``<home_or_away>_price`` entry.
        """
        side = row["home_or_away"]
        return ProfitCalculator._price_to_net_odds(row[f"{side}_price"])

    @staticmethod
    def calculate_closing_balance(
        day_benefit_list: List[float], pre_balance: float = 1000, pre_benefit: float = 0
    ) -> List[float]:
        """Return the end-of-day balance for each day.

        Each day's balance is the previous balance plus one third of the
        previous day's profit plus two thirds of the current day's profit.
        The final third of the last day's profit is therefore not included.

        Args:
            day_benefit_list: Profit per day, in chronological order.
            pre_balance: Balance before the first day.
            pre_benefit: Profit of the day before the first day.

        Returns:
            One closing balance per entry of ``day_benefit_list``.
        """
        closing_balance_list = []
        for benefit in day_benefit_list:
            closing_balance = pre_balance + pre_benefit / 3 + benefit * 2 / 3
            closing_balance_list.append(closing_balance)
            # Carry state forward to the next day.
            pre_balance = closing_balance
            pre_benefit = benefit
        return closing_balance_list

    @staticmethod
    def calculate_in_transit_funds_ratio(
        daily_investment_list: List[float],
        closing_balance_list: List[float],
        start_closing_balance: float = 1000,
    ) -> List[float]:
        """Return each day's investment divided by the previous closing balance.

        The first day is divided by ``start_closing_balance``.

        Args:
            daily_investment_list: Amount invested per day.
            closing_balance_list: Closing balance per day, same length.
            start_closing_balance: Balance before the first day.

        Returns:
            One ratio per day.

        Raises:
            ValueError: If the two lists differ in length.
        """
        # Explicit check instead of ``assert``, which is removed under ``python -O``.
        if len(daily_investment_list) != len(closing_balance_list):
            raise ValueError(
                "daily_investment_list and closing_balance_list must have the same length"
            )
        # Day i is measured against the balance at the close of day i - 1.
        previous_balances = [start_closing_balance, *closing_balance_list[:-1]]
        return [
            investment / balance
            for investment, balance in zip(daily_investment_list, previous_balances)
        ]
class SimulationEngine:
    """Aggregate per-order results into daily figures and summary metrics.

    Attributes:
        config: Settings; ``initial_balance`` is the default starting balance.
        calculator: ``ProfitCalculator`` used for balance and odds maths.
    """

    def __init__(self, config: "SimulationConfig"):
        self.config = config
        self.calculator = ProfitCalculator()

    def simulate_profit(
        self, data_df: pd.DataFrame, init_balance: Optional[float] = None
    ) -> Tuple[pd.DataFrame, float, float]:
        """Build the per-day simulation table and its headline metrics.

        Args:
            data_df: Orders with ``date``, ``investment`` and ``benefit`` columns.
            init_balance: Starting balance; ``None`` uses
                ``config.initial_balance``.

        Returns:
            ``(res_df, annualized_sharpe_ratio, roi)`` where ``res_df`` has one
            row per date and ``roi`` is total profit over total investment.
        """
        if init_balance is None:
            init_balance = self.config.initial_balance
        logger.info(f"开始收益模拟,初始资金: {init_balance}")

        # Aggregate per date (groupby returns the dates in sorted order).
        res_df = (
            data_df.groupby("date")
            .agg({"investment": "sum", "benefit": "sum"})
            .reset_index()
        )
        res_df = res_df.rename(columns={"investment": "当日投入", "benefit": "日收益"})

        # Daily and cumulative return rates.
        res_df["日收益率"] = res_df["日收益"] / res_df["当日投入"]
        res_df["累计收益"] = res_df["日收益"].cumsum()
        res_df["累计投入"] = res_df["当日投入"].cumsum()
        res_df["累计收益率"] = res_df["累计收益"] / res_df["累计投入"]

        # End-of-day balance.
        day_benefit_list = res_df["日收益"].tolist()
        closing_balance_list = self.calculator.calculate_closing_balance(
            day_benefit_list=day_benefit_list, pre_balance=init_balance
        )
        res_df["日末余额(1.6天结算)"] = closing_balance_list

        # Share of the previous closing balance that is staked each day.
        daily_investment_list = res_df["当日投入"].tolist()
        res_df["在途资金比例"] = self.calculator.calculate_in_transit_funds_ratio(
            daily_investment_list=daily_investment_list,
            closing_balance_list=closing_balance_list,
            start_closing_balance=init_balance,
        )

        # Headline metrics.
        annualized_sharpe_ratio = self._calculate_annualized_sharpe_ratio(
            res_df, init_balance
        )
        roi = res_df["日收益"].sum() / res_df["当日投入"].sum()
        logger.info(
            f"模拟完成 - 年化夏普率: {annualized_sharpe_ratio:.4f}, ROI: {roi:.4f}"
        )
        return res_df, annualized_sharpe_ratio, roi

    def _calculate_annualized_sharpe_ratio(
        self, res_df: pd.DataFrame, init_balance: float
    ) -> float:
        """Return the annualised Sharpe-style ratio used by this simulation.

        Computed as total profit / ``init_balance`` / std of the daily return
        rate, scaled by ``sqrt(365 / number_of_days)``.

        Args:
            res_df: Daily table with ``日收益`` and ``日收益率`` columns.
            init_balance: Starting balance used as the return base.

        Returns:
            The ratio, or ``0.0`` when the standard deviation is zero or
            undefined.
        """
        daily_return_std = res_df["日收益率"].std()
        # The sample std is NaN with fewer than two days of data. NaN == 0 is
        # False, so without this check NaN would propagate into the result.
        if pd.isna(daily_return_std) or daily_return_std == 0:
            return 0.0
        return (
            res_df["日收益"].sum()
            / init_balance
            / daily_return_std
            * ((365 / len(res_df)) ** 0.5)
        )

    def calculate_statistics(self, data_df: pd.DataFrame) -> Dict[str, float]:
        """Return mean odds, win rate and bet count for ``data_df``.

        Side effect: adds an ``odds`` column to ``data_df`` in place.

        Args:
            data_df: Non-empty orders with ``home_or_away``, the price columns
                and ``outcome``. An empty frame raises ``ZeroDivisionError``.

        Returns:
            Dict with ``total_mean_odds``, ``won_rate`` and ``total_bets``.
        """
        data_df["odds"] = data_df.apply(self.calculator.calculate_odds, axis=1)
        total_mean_odds = data_df["odds"].mean()
        total_bets = len(data_df)
        won_rate = len(data_df[data_df["outcome"] == 1]) / total_bets
        logger.info(f"统计指标 - 平均赔率: {total_mean_odds:.4f}, 胜率: {won_rate:.4f}")
        return {
            "total_mean_odds": total_mean_odds,
            "won_rate": won_rate,
            "total_bets": total_bets,
        }
class VisualizationService:
    """Render the analysis results as interactive Plotly HTML charts."""

    def __init__(self, config: SimulationConfig):
        self.config = config

    def plot_won_lost_mean_odds(
        self, data_df: pd.DataFrame, output_path: str = None
    ) -> None:
        """Write a chart of daily won/lost counts (bars) and mean odds (line).

        Args:
            data_df: Per-day table with a ``date`` column; the ``won``,
                ``lost`` and ``odds`` columns are each plotted only if present.
            output_path: Target HTML file; defaults to
                ``won_lost_mean_odds.html`` inside ``config.output_dir``.
        """
        if output_path is None:
            output_path = os.path.join(
                self.config.output_dir, "won_lost_mean_odds.html"
            )
        logger.info("生成胜负数量和平均赔率图表")

        ordered_df = data_df.sort_values(by="date")
        dates = ordered_df["date"].tolist()
        figure = go.Figure()

        # Grouped bars on the left axis, one series per bet status.
        for status in ("won", "lost"):
            if status not in ordered_df.columns:
                continue
            counts = ordered_df[status].tolist()
            figure.add_trace(go.Bar(x=dates, y=counts, name=status, yaxis="y1"))

        # Mean odds as a line on the right axis.
        if "odds" in ordered_df.columns:
            odds_trace = go.Scatter(
                x=ordered_df["date"],
                y=ordered_df["odds"],
                mode="markers+lines",
                name="平均赔率",
                yaxis="y2",
            )
            figure.add_trace(odds_trace)

        figure.update_layout(
            barmode="group",
            font=dict(family="Times New Roman"),
            title="每天胜负数量以及平均赔率",
            xaxis=dict(title="日期"),
            yaxis=dict(title="数量"),
            yaxis2=dict(title="赔率", overlaying="y", side="right"),
        )
        figure.write_html(output_path)
        logger.info(f"图表已保存到: {output_path}")

    def plot_profit_simulation(
        self, data_df: pd.DataFrame, title: str = None, output_path: str = None
    ) -> None:
        """Write a chart of the closing balance (bars) and rate series (lines).

        Args:
            data_df: Per-day table with ``date`` and ``日末余额(1.6天结算)``
                columns; each of the three rate columns is plotted only if
                present.
            title: Chart title; defaults to ``收益模拟``.
            output_path: Target HTML file; defaults to
                ``profit_simulation.html`` inside ``config.output_dir``.
        """
        if output_path is None:
            output_path = os.path.join(self.config.output_dir, "profit_simulation.html")
        if title is None:
            title = "收益模拟"
        logger.info("生成收益模拟图表")

        figure = go.Figure()

        # Closing balance as bars on the left axis.
        balance_trace = go.Bar(
            x=data_df["date"],
            y=data_df["日末余额(1.6天结算)"],
            name="日末余额",
            yaxis="y1",
        )
        figure.add_trace(balance_trace)

        # Rate series as lines on the right (percentage) axis.
        rate_columns = ["日收益率", "累计收益率", "在途资金比例"]
        for column in rate_columns:
            if column not in data_df.columns:
                continue
            rate_trace = go.Scatter(
                x=data_df["date"],
                y=data_df[column],
                mode="markers+lines",
                name=column,
                yaxis="y2",
            )
            figure.add_trace(rate_trace)

        figure.update_layout(
            title=title,
            font=dict(family="Times New Roman"),
            xaxis=dict(title="日期"),
            yaxis=dict(title="金额"),
            yaxis2=dict(title="收益率", overlaying="y", side="right", tickformat=".1%"),
        )
        figure.write_html(output_path)
        logger.info(f"图表已保存到: {output_path}")
class ProfitAnalysisApp:
    """Application entry point that wires the services into one analysis run.

    Attributes:
        config: Settings shared by all services.
        data_service: Loads and filters the orders.
        simulation_engine: Computes the daily simulation and statistics.
        visualization_service: Writes the HTML charts.
    """

    def __init__(self, config: Optional[SimulationConfig] = None):
        self.config = config or SimulationConfig()
        self.data_service = DataService(self.config)
        self.simulation_engine = SimulationEngine(self.config)
        self.visualization_service = VisualizationService(self.config)

    def run_analysis(self, load_from_cache: Optional[bool] = None) -> Dict:
        """Run the complete profit analysis.

        Args:
            load_from_cache: Forwarded to
                ``DataService.get_oddsjam_order_data``.

        Returns:
            ``{"summary": {...}, "data": DataFrame}`` on success, or an empty
            dict when no usable orders remain.
        """
        logger.info("开始收益分析")

        # Load the orders.
        order_df = self.data_service.get_oddsjam_order_data(load_from_cache)
        if order_df.empty:
            logger.error("未获取到数据,分析终止")
            return {}

        # Derive outcome / benefit / date columns.
        order_df = self._preprocess_orders(order_df)
        if order_df.empty:
            # Every row lacked ``home_or_away``; nothing left to analyse.
            logger.error("过滤后无数据,分析终止")
            return {}

        # Restrict to the configured market-width range.
        filtered_df = self.data_service.filter_data_by_market_width(order_df)
        if filtered_df.empty:
            logger.error("过滤后无数据,分析终止")
            return {}

        # Every bet stakes one unit. ``assign`` returns a new frame, so no
        # column is written onto a filtered slice.
        data_df = filtered_df.assign(investment=1)

        # Run the simulation.
        res_df, annualized_sharpe_ratio, roi = self.simulation_engine.simulate_profit(
            data_df
        )

        # Statistics; this call also adds the ``odds`` column to ``data_df``,
        # which the merge step below relies on.
        stats = self.simulation_engine.calculate_statistics(data_df)

        # Attach per-day extras, persist, and draw charts.
        res_df = self._merge_additional_data(res_df, data_df)
        self._save_results(res_df, annualized_sharpe_ratio, roi, stats)
        self._generate_charts(res_df, annualized_sharpe_ratio, roi)

        result = {
            "summary": {
                "annualized_sharpe_ratio": annualized_sharpe_ratio,
                "roi": roi,
                "total_bets": stats["total_bets"],
                "won_rate": stats["won_rate"],
                "mean_odds": stats["total_mean_odds"],
            },
            "data": res_df,
        }
        logger.info("收益分析完成")
        return result

    @staticmethod
    def _preprocess_orders(order_df: pd.DataFrame) -> pd.DataFrame:
        """Drop rows without ``home_or_away`` and add derived columns.

        Adds ``outcome`` (1 for ``won``, otherwise -1), ``benefit`` (profit
        for a unit stake) and ``date`` (``YYYY-MM-DD``).

        Returns:
            A new DataFrame; the input is not modified. If no rows remain the
            empty frame is returned without the derived columns.
        """
        # Copy so the column assignments below do not write onto a slice of
        # the caller's frame (avoids SettingWithCopyWarning).
        orders = order_df[order_df["home_or_away"].notna()].copy()
        if orders.empty:
            # A row-wise apply on an empty frame does not yield a Series.
            return orders

        orders["outcome"] = orders["bet_status"].apply(
            lambda status: 1 if status == "won" else -1
        )
        orders["benefit"] = orders.apply(
            lambda row: ProfitCalculator.calculate_benefit_by_order(row.to_dict()),
            axis=1,
        )
        # NOTE(review): start_timestamp is treated as epoch milliseconds and
        # converted with the machine's local timezone -- confirm both.
        orders["date"] = orders["start_timestamp"].apply(
            lambda ts: datetime.fromtimestamp(ts // 1000).strftime("%Y-%m-%d")
        )
        return orders

    def _merge_additional_data(
        self, res_df: pd.DataFrame, data_df: pd.DataFrame
    ) -> pd.DataFrame:
        """Left-join per-day mean odds, won/lost counts and mean market width.

        Args:
            res_df: Daily simulation table keyed by ``date``.
            data_df: Order-level data; must already contain ``odds``.

        Returns:
            ``res_df`` extended with the extra per-day columns.
        """
        # Mean odds per day.
        odds_df = data_df.groupby("date").agg({"odds": "mean"}).reset_index()
        res_df = pd.merge(res_df, odds_df, on="date", how="left")

        # Number of orders per bet status per day.
        bet_status_df = pd.pivot_table(
            data_df,
            index=["date"],
            columns=["bet_status"],
            aggfunc="size",
            fill_value=0,
        ).reset_index()
        res_df = pd.merge(res_df, bet_status_df, on="date", how="left")

        # The win rate is only computed when both status columns exist.
        if "won" in res_df.columns and "lost" in res_df.columns:
            res_df["won rate"] = res_df["won"] / (res_df["won"] + res_df["lost"])

        # Mean market width per day.
        if "market_width" in data_df.columns:
            market_width_df = (
                data_df.groupby("date").agg({"market_width": "mean"}).reset_index()
            )
            res_df = pd.merge(res_df, market_width_df, on="date", how="left")
        return res_df

    def _save_results(
        self,
        res_df: pd.DataFrame,
        annualized_sharpe_ratio: float,
        roi: float,
        stats: Dict,
    ) -> None:
        """Write the daily table to CSV and the summary to JSON.

        Both files are written into ``config.output_dir``.
        """
        output_path = os.path.join(self.config.output_dir, "profit_simulation.csv")
        # utf-8-sig adds a BOM to the CSV file.
        res_df.to_csv(output_path, index=False, encoding="utf-8-sig")
        logger.info(f"分析结果已保存到: {output_path}")

        # Summary file.
        summary = {
            "analysis_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "annualized_sharpe_ratio": annualized_sharpe_ratio,
            "roi": roi,
            "total_bets": stats["total_bets"],
            "won_rate": stats["won_rate"],
            "mean_odds": stats["total_mean_odds"],
            "market_width_range": f"{self.config.market_width_min}-{self.config.market_width_max}",
            "initial_balance": self.config.initial_balance,
        }
        summary_path = os.path.join(self.config.output_dir, "analysis_summary.json")
        with open(summary_path, "w", encoding="utf-8") as f:
            json.dump(summary, f, ensure_ascii=False, indent=2)
        logger.info(f"分析摘要已保存到: {summary_path}")

    def _generate_charts(
        self, res_df: pd.DataFrame, annualized_sharpe_ratio: float, roi: float
    ) -> None:
        """Render both HTML charts from the merged daily table."""
        title = f"收益模拟,年化夏普率: {annualized_sharpe_ratio:.4f}, ROI: {roi:.4f}"
        self.visualization_service.plot_profit_simulation(data_df=res_df, title=title)
        self.visualization_service.plot_won_lost_mean_odds(data_df=res_df)
def main():
    """Run the analysis with the default configuration and print a summary."""
    app = ProfitAnalysisApp(SimulationConfig())
    result = app.run_analysis()

    # run_analysis returns an empty dict when it aborts; nothing to report.
    if not result:
        return

    summary = result["summary"]
    print("分析完成!")
    print(f"年化夏普率: {summary['annualized_sharpe_ratio']:.4f}")
    print(f"ROI: {summary['roi']:.4f}")
    print(f"总投注数: {summary['total_bets']}")
    print(f"胜率: {summary['won_rate']:.4f}")
    print(f"平均赔率: {summary['mean_odds']:.4f}")
# Run the analysis only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()