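"""
Profit Simulation - refactored version.

Refactored to follow Python best practices: modular design with
configuration-driven behaviour.

Core features:
1. Fetch betting order data from the database
2. Compute betting profit, odds, win rate and related metrics
3. Simulate balance changes and analyse rates of return
4. Generate visualisation charts and analysis reports
"""
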
import json
import os
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional, Tuple

import pandas as pd
import plotly.graph_objects as go
from loguru import logger

# Project modules
from dao.Database import Database
from data_model import MysqlConfig, OddsJamOrder


@dataclass
class SimulationConfig:
    """Simulation settings, each overridable through an environment variable.

    Defaults are resolved when an instance is created rather than when the
    module is imported, so environment variables set before instantiation
    are honoured. Explicit constructor arguments still take precedence.

    After initialisation the instance also carries ``mysql_config`` (loaded
    from ``config_file_path``), and ``data_dir`` / ``output_dir`` are
    guaranteed to exist on disk.

    Raises:
        ValueError: If a numeric environment variable cannot be parsed as
            a float.
    """

    # Database settings
    config_file_path: str = field(
        default_factory=lambda: os.getenv(
            "MYSQL_CONFIG_PATH", "./config/mysql_config.json"
        )
    )
    table_name: str = field(
        default_factory=lambda: os.getenv("BET_TABLE_NAME", "oddsjam_order")
    )

    # Simulation parameters
    initial_balance: float = field(
        default_factory=lambda: float(os.getenv("INITIAL_BALANCE", "1000"))
    )
    market_width_min: float = field(
        default_factory=lambda: float(os.getenv("MARKET_WIDTH_MIN", "20"))
    )
    market_width_max: float = field(
        default_factory=lambda: float(os.getenv("MARKET_WIDTH_MAX", "25"))
    )

    # File locations
    data_dir: str = field(
        default_factory=lambda: os.getenv("DATA_DIR", "./data/bet_simulation/")
    )
    output_dir: str = field(
        default_factory=lambda: os.getenv("OUTPUT_DIR", "./data/bet_simulation/")
    )

    # Cache switch: only the (case-insensitive) value "true" enables it.
    enable_cache: bool = field(
        default_factory=lambda: os.getenv("ENABLE_CACHE", "true").lower() == "true"
    )

    def __post_init__(self):
        """Load the MySQL configuration and create the working directories."""
        # ``mysql_config`` is a plain instance attribute (not a dataclass
        # field), so it does not take part in __init__, repr or equality.
        # NOTE(review): ``parse_file`` looks like the pydantic v1-style
        # loader, while this file also calls ``model_dump`` (v2-style) on
        # OddsJamOrder - confirm which pydantic version is targeted.
        self.mysql_config = MysqlConfig.parse_file(self.config_file_path)

        # Make sure the directories exist before anything reads or writes.
        os.makedirs(self.data_dir, exist_ok=True)
        os.makedirs(self.output_dir, exist_ok=True)


class DataService:
    """Fetches order data from the database and manages the on-disk cache."""

    def __init__(self, config: SimulationConfig):
        """Keep the configuration and open the database access object."""
        self.config = config
        self.dao = Database(config.mysql_config)

    def get_oddsjam_order_data(
        self, load_from_cache: Optional[bool] = None
    ) -> pd.DataFrame:
        """Return settled ('won' / 'lost') orders as a DataFrame.

        Args:
            load_from_cache: Whether today's cache file may be used instead
                of querying the database. ``None`` falls back to
                ``config.enable_cache``.

        Returns:
            The order data, or an empty DataFrame when the query returns
            no rows.
        """
        if load_from_cache is None:
            load_from_cache = self.config.enable_cache

        # The cache file name carries today's date, so a cache written on
        # an earlier day is never picked up.
        current_date_str = datetime.now().strftime("%Y%m%d")
        cache_file_path = os.path.join(
            self.config.data_dir, f"oddsjam_order_data_{current_date_str}.feather"
        )

        # Try the cache first.
        if load_from_cache and os.path.exists(cache_file_path):
            logger.info(f"从缓存加载数据: {cache_file_path}")
            return pd.read_feather(cache_file_path)

        # Fall back to the database.
        logger.info("从数据库获取订单数据")
        # NOTE(review): the table name is interpolated into the SQL text
        # (identifiers cannot be bound as parameters); it must only ever
        # come from trusted configuration.
        select_query = f"SELECT * FROM bet.{self.config.table_name} where bet_status in ('won', 'lost')"
        raw_data_list = self.dao.fetchall(query=select_query)

        if not raw_data_list:
            logger.warning("未找到符合条件的订单数据")
            return pd.DataFrame()

        # Validate each row through the OddsJamOrder model before building
        # the DataFrame.
        order_data_list = [OddsJamOrder(**data).model_dump() for data in raw_data_list]
        order_df = pd.DataFrame(order_data_list)

        # Writing the cache is governed by the configuration switch, not by
        # ``load_from_cache``: a forced refresh still updates the cache.
        if self.config.enable_cache:
            order_df.to_feather(cache_file_path)
            logger.info(f"数据已缓存到: {cache_file_path}")

        return order_df

    def filter_data_by_market_width(self, data_df: pd.DataFrame) -> pd.DataFrame:
        """Keep rows whose ``market_width`` lies in the configured range.

        Both bounds are inclusive. The result is an independent copy, so
        callers can add columns to it without touching ``data_df`` and
        without triggering pandas chained-assignment warnings.
        """
        original_count = len(data_df)
        in_range = data_df["market_width"].between(
            self.config.market_width_min, self.config.market_width_max
        )
        filtered_df = data_df[in_range].copy()
        filtered_count = len(filtered_df)

        logger.info(f"市场宽度过滤: {original_count} -> {filtered_count} 条记录")
        return filtered_df


class ProfitCalculator:
    """Stateless helpers for per-bet profit and balance calculations."""

    @staticmethod
    def _profit_per_unit_stake(quoted_price: float) -> float:
        """Convert a quoted price into the profit earned per unit staked.

        The price is divided by 100; a non-negative result is returned as
        is, a negative one is turned into ``1 / abs(result)``.
        NOTE(review): this matches American (moneyline) odds, e.g. +150 ->
        1.5 and -200 -> 0.5 - confirm that is the stored format.
        """
        price = quoted_price / 100
        if price >= 0:
            return price
        return 1 / abs(price)

    @staticmethod
    def calculate_benefit_by_order(order_info: Dict) -> float:
        """Return the profit of one order for a stake of 1.

        Args:
            order_info: Mapping with ``home_or_away`` (the side that was
                backed), the matching ``<side>_price`` and ``outcome``
                (``-1`` marks a lost bet).

        Returns:
            ``-1`` for a lost bet, otherwise the profit per unit stake.
        """
        home_or_away = order_info["home_or_away"]
        # Looked up before the outcome check, so a missing price still
        # raises for lost bets exactly as it does for won ones.
        quoted_price = order_info[f"{home_or_away}_price"]
        profit = ProfitCalculator._profit_per_unit_stake(quoted_price)

        if order_info["outcome"] == -1:
            return -1
        return profit

    @staticmethod
    def calculate_odds(row: pd.Series) -> float:
        """Return the profit-per-unit odds of the side backed in ``row``."""
        home_or_away = row["home_or_away"]
        return ProfitCalculator._profit_per_unit_stake(row[f"{home_or_away}_price"])

    @staticmethod
    def calculate_closing_balance(
        day_benefit_list: List[float], pre_balance: float = 1000, pre_benefit: float = 0
    ) -> List[float]:
        """Return the closing balance for each day.

        Each day's balance is the previous balance plus one third of the
        previous day's profit and two thirds of the current day's profit.
        NOTE(review): the split presumably models a settlement delay -
        confirm with the owner of the model.

        Args:
            day_benefit_list: Profit per day, in chronological order.
            pre_balance: Balance before the first day.
            pre_benefit: Profit of the day before the first day.
        """
        closing_balance_list = []
        for benefit in day_benefit_list:
            closing_balance = pre_balance + pre_benefit / 3 + benefit * 2 / 3
            closing_balance_list.append(closing_balance)
            pre_balance = closing_balance
            pre_benefit = benefit
        return closing_balance_list

    @staticmethod
    def calculate_in_transit_funds_ratio(
        daily_investment_list: List[float],
        closing_balance_list: List[float],
        start_closing_balance: float = 1000,
    ) -> List[float]:
        """Return each day's investment relative to the prior closing balance.

        The first day is measured against ``start_closing_balance``; every
        later day against the closing balance of the day before.

        Raises:
            ValueError: If the two lists differ in length.
        """
        # An explicit check instead of ``assert``: assertions are stripped
        # when Python runs with -O.
        if len(daily_investment_list) != len(closing_balance_list):
            raise ValueError(
                "daily_investment_list and closing_balance_list must have the "
                f"same length, got {len(daily_investment_list)} and "
                f"{len(closing_balance_list)}"
            )

        # Shift the balances by one day so each investment lines up with
        # the balance that was available before it.
        previous_balances = [start_closing_balance, *closing_balance_list[:-1]]
        return [
            daily_investment / previous_balance
            for daily_investment, previous_balance in zip(
                daily_investment_list, previous_balances
            )
        ]


class SimulationEngine:
    """Runs the profit simulation and derives summary metrics."""

    def __init__(self, config: SimulationConfig):
        """Keep the configuration and create the calculator helper."""
        self.config = config
        self.calculator = ProfitCalculator()

    def simulate_profit(
        self, data_df: pd.DataFrame, init_balance: Optional[float] = None
    ) -> Tuple[pd.DataFrame, float, float]:
        """Aggregate bets per day and simulate how the balance evolves.

        Args:
            data_df: One row per bet with ``date``, ``investment`` and
                ``benefit`` columns.
            init_balance: Starting balance; ``None`` falls back to
                ``config.initial_balance``.

        Returns:
            ``(daily_df, annualized_sharpe_ratio, roi)`` where ``daily_df``
            has one row per date and ``roi`` is total profit divided by
            total investment.
        """
        if init_balance is None:
            init_balance = self.config.initial_balance

        logger.info(f"开始收益模拟,初始资金: {init_balance}")

        # Aggregate per date.
        res_df = (
            data_df.groupby("date")
            .agg({"investment": "sum", "benefit": "sum"})
            .reset_index()
        )
        res_df = res_df.rename(columns={"investment": "当日投入", "benefit": "日收益"})

        # Daily and cumulative return rates.
        res_df["日收益率"] = res_df["日收益"] / res_df["当日投入"]
        res_df["累计收益"] = res_df["日收益"].cumsum()
        res_df["累计投入"] = res_df["当日投入"].cumsum()
        res_df["累计收益率"] = res_df["累计收益"] / res_df["累计投入"]

        # Closing balance per day.
        day_benefit_list = res_df["日收益"].tolist()
        closing_balance_list = self.calculator.calculate_closing_balance(
            day_benefit_list=day_benefit_list, pre_balance=init_balance
        )
        res_df["日末余额(1.6天结算)"] = closing_balance_list

        # Share of the previous balance that is staked each day.
        daily_investment_list = res_df["当日投入"].tolist()
        res_df["在途资金比例"] = self.calculator.calculate_in_transit_funds_ratio(
            daily_investment_list=daily_investment_list,
            closing_balance_list=closing_balance_list,
            start_closing_balance=init_balance,
        )

        # Headline metrics.
        annualized_sharpe_ratio = self._calculate_annualized_sharpe_ratio(
            res_df, init_balance
        )
        roi = res_df["日收益"].sum() / res_df["当日投入"].sum()

        logger.info(
            f"模拟完成 - 年化夏普率: {annualized_sharpe_ratio:.4f}, ROI: {roi:.4f}"
        )

        return res_df, annualized_sharpe_ratio, roi

    def _calculate_annualized_sharpe_ratio(
        self, res_df: pd.DataFrame, init_balance: float
    ) -> float:
        """Return the annualised Sharpe-style ratio of the simulation.

        Computed as total profit / ``init_balance`` / std of the daily
        return rate, scaled by ``sqrt(365 / number_of_days)``.
        NOTE(review): the numerator is relative to the initial balance
        while the deviation is of the per-day return on investment -
        confirm this mix is intended.

        Returns:
            ``0.0`` when the deviation is zero or undefined (NaN, e.g. with
            fewer than two days of data), otherwise the ratio.
        """
        daily_return_std = res_df["日收益率"].std()
        # The sample std of fewer than two values is NaN; without this
        # guard NaN would flow into the report and the JSON summary.
        if pd.isna(daily_return_std) or daily_return_std == 0:
            return 0.0

        return (
            res_df["日收益"].sum()
            / init_balance
            / daily_return_std
            * ((365 / len(res_df)) ** 0.5)
        )

    def calculate_statistics(self, data_df: pd.DataFrame) -> Dict[str, float]:
        """Return mean odds, win rate and bet count for ``data_df``.

        Side effect: an ``odds`` column is written into ``data_df`` in
        place. ``ProfitAnalysisApp._merge_additional_data`` in this file
        reads that column afterwards, so the mutation is kept on purpose.

        An empty frame yields NaN mean odds, a win rate of 0.0 and zero
        bets instead of a division by zero (no column is added then).
        """
        total_bets = len(data_df)
        if total_bets == 0:
            return {
                "total_mean_odds": float("nan"),
                "won_rate": 0.0,
                "total_bets": 0,
            }

        data_df["odds"] = data_df.apply(self.calculator.calculate_odds, axis=1)

        total_mean_odds = data_df["odds"].mean()
        won_rate = int((data_df["outcome"] == 1).sum()) / total_bets

        logger.info(f"统计指标 - 平均赔率: {total_mean_odds:.4f}, 胜率: {won_rate:.4f}")

        return {
            "total_mean_odds": total_mean_odds,
            "won_rate": won_rate,
            "total_bets": total_bets,
        }


class VisualizationService:
    """Renders analysis results as interactive Plotly HTML charts."""

    def __init__(self, config: SimulationConfig):
        """Keep the configuration; it supplies the default output folder."""
        self.config = config

    def plot_won_lost_mean_odds(
        self, data_df: pd.DataFrame, output_path: str = None
    ) -> None:
        """Write a chart of daily won/lost counts with the mean odds overlaid.

        Bars are drawn for the ``won`` and ``lost`` columns and a line for
        the ``odds`` column; any of them that is absent is skipped. When
        ``output_path`` is ``None`` the file goes to
        ``<output_dir>/won_lost_mean_odds.html``.
        """
        target_path = (
            os.path.join(self.config.output_dir, "won_lost_mean_odds.html")
            if output_path is None
            else output_path
        )

        logger.info("生成胜负数量和平均赔率图表")

        ordered_df = data_df.sort_values(by="date")
        dates = ordered_df["date"].tolist()

        figure = go.Figure()

        # Count bars on the primary axis.
        for status in ("won", "lost"):
            if status not in ordered_df.columns:
                continue
            counts = ordered_df[status].tolist()
            figure.add_trace(go.Bar(x=dates, y=counts, name=status, yaxis="y1"))

        # Mean odds line on the secondary axis.
        if "odds" in ordered_df.columns:
            odds_line = go.Scatter(
                x=ordered_df["date"],
                y=ordered_df["odds"],
                mode="markers+lines",
                name="平均赔率",
                yaxis="y2",
            )
            figure.add_trace(odds_line)

        figure.update_layout(
            barmode="group",
            font={"family": "Times New Roman"},
            title="每天胜负数量以及平均赔率",
            xaxis={"title": "日期"},
            yaxis={"title": "数量"},
            yaxis2={"title": "赔率", "overlaying": "y", "side": "right"},
        )

        figure.write_html(target_path)
        logger.info(f"图表已保存到: {target_path}")

    def plot_profit_simulation(
        self, data_df: pd.DataFrame, title: str = None, output_path: str = None
    ) -> None:
        """Write a chart of the closing balance with the rate series overlaid.

        The closing balance is drawn as bars; the daily return rate, the
        cumulative return rate and the in-transit funds ratio are drawn as
        lines when their columns exist. Defaults: title ``收益模拟`` and
        file ``<output_dir>/profit_simulation.html``.
        """
        target_path = (
            os.path.join(self.config.output_dir, "profit_simulation.html")
            if output_path is None
            else output_path
        )
        chart_title = "收益模拟" if title is None else title

        logger.info("生成收益模拟图表")

        figure = go.Figure()

        # Closing balance bars on the primary axis.
        balance_bars = go.Bar(
            x=data_df["date"],
            y=data_df["日末余额(1.6天结算)"],
            name="日末余额",
            yaxis="y1",
        )
        figure.add_trace(balance_bars)

        # Rate lines on the secondary axis.
        rate_columns = ("日收益率", "累计收益率", "在途资金比例")
        for rate_column in rate_columns:
            if rate_column not in data_df.columns:
                continue
            rate_line = go.Scatter(
                x=data_df["date"],
                y=data_df[rate_column],
                mode="markers+lines",
                name=rate_column,
                yaxis="y2",
            )
            figure.add_trace(rate_line)

        figure.update_layout(
            title=chart_title,
            font={"family": "Times New Roman"},
            xaxis={"title": "日期"},
            yaxis={"title": "金额"},
            yaxis2={
                "title": "收益率",
                "overlaying": "y",
                "side": "right",
                "tickformat": ".1%",
            },
        )

        figure.write_html(target_path)
        logger.info(f"图表已保存到: {target_path}")


class ProfitAnalysisApp:
    """Application facade that wires the services into one analysis run."""

    def __init__(self, config: Optional[SimulationConfig] = None):
        """Create the services; a default configuration is built if omitted."""
        self.config = config or SimulationConfig()
        self.data_service = DataService(self.config)
        self.simulation_engine = SimulationEngine(self.config)
        self.visualization_service = VisualizationService(self.config)

    def run_analysis(self, load_from_cache: Optional[bool] = None) -> Dict:
        """Run the full pipeline: load, prepare, simulate, save and plot.

        Args:
            load_from_cache: Forwarded to
                ``DataService.get_oddsjam_order_data``.

        Returns:
            ``{"summary": {...}, "data": DataFrame}`` on success, or an
            empty dict when no usable data is left at any stage.
        """
        logger.info("开始收益分析")

        # Load the raw orders.
        order_df = self.data_service.get_oddsjam_order_data(load_from_cache)
        if order_df.empty:
            logger.error("未获取到数据,分析终止")
            return {}

        # Derive the columns the simulation needs.
        order_df = self._prepare_orders(order_df)
        if order_df.empty:
            # Every row lacked ``home_or_away``; nothing can be priced.
            logger.error("有效订单为空(缺少 home_or_away),分析终止")
            return {}

        # Restrict to the configured market width range.
        data_df = self.data_service.filter_data_by_market_width(order_df)
        if data_df.empty:
            logger.error("过滤后无数据,分析终止")
            return {}

        # Every bet is simulated with a stake of 1. ``assign`` returns a
        # new frame, so the filtered selection is never written to in place.
        data_df = data_df.assign(investment=1)

        # Simulate.
        res_df, annualized_sharpe_ratio, roi = self.simulation_engine.simulate_profit(
            data_df
        )

        # Summary statistics.
        stats = self.simulation_engine.calculate_statistics(data_df)

        # Enrich the daily frame with odds, win counts and market width.
        res_df = self._merge_additional_data(res_df, data_df)

        # Persist and plot.
        self._save_results(res_df, annualized_sharpe_ratio, roi, stats)
        self._generate_charts(res_df, annualized_sharpe_ratio, roi)

        result = {
            "summary": {
                "annualized_sharpe_ratio": annualized_sharpe_ratio,
                "roi": roi,
                "total_bets": stats["total_bets"],
                "won_rate": stats["won_rate"],
                "mean_odds": stats["total_mean_odds"],
            },
            "data": res_df,
        }

        logger.info("收益分析完成")
        return result

    @staticmethod
    def _prepare_orders(order_df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of the orders with outcome, benefit and date added.

        Rows without ``home_or_away`` are dropped. The input frame is left
        untouched; an empty result is returned as is, without the derived
        columns.
        """
        # Copy so the new columns are not assigned onto a filtered slice.
        prepared_df = order_df[order_df["home_or_away"].notna()].copy()
        if prepared_df.empty:
            return prepared_df

        # 1 for a won bet, -1 for anything else.
        prepared_df["outcome"] = (
            prepared_df["bet_status"].eq("won").map({True: 1, False: -1})
        )
        prepared_df["benefit"] = prepared_df.apply(
            lambda row: ProfitCalculator.calculate_benefit_by_order(row.to_dict()),
            axis=1,
        )
        # ``start_timestamp`` is divided by 1000 before conversion
        # (presumably milliseconds - confirm). ``fromtimestamp`` uses the
        # machine's local timezone, so the date bucket depends on where
        # the analysis runs.
        prepared_df["date"] = prepared_df["start_timestamp"].apply(
            lambda x: datetime.fromtimestamp(x // 1000).strftime("%Y-%m-%d")
        )
        return prepared_df

    def _merge_additional_data(
        self, res_df: pd.DataFrame, data_df: pd.DataFrame
    ) -> pd.DataFrame:
        """Join per-date mean odds, won/lost counts and market width."""
        # ``odds`` is normally added by SimulationEngine.calculate_statistics;
        # compute it here when it is missing so this step does not depend
        # on the call order.
        if "odds" not in data_df.columns:
            data_df = data_df.assign(
                odds=data_df.apply(ProfitCalculator.calculate_odds, axis=1)
            )

        # Mean odds per date.
        odds_df = data_df.groupby("date").agg({"odds": "mean"}).reset_index()
        res_df = pd.merge(res_df, odds_df, on="date", how="left")

        # Number of bets per date and status.
        bet_status_df = pd.pivot_table(
            data_df,
            index=["date"],
            columns=["bet_status"],
            aggfunc="size",
            fill_value=0,
        ).reset_index()
        res_df = pd.merge(res_df, bet_status_df, on="date", how="left")

        # Win rate per date; needs both status columns to be present.
        if "won" in res_df.columns and "lost" in res_df.columns:
            res_df["won rate"] = res_df["won"] / (res_df["won"] + res_df["lost"])

        # Mean market width per date.
        if "market_width" in data_df.columns:
            market_width_df = (
                data_df.groupby("date").agg({"market_width": "mean"}).reset_index()
            )
            res_df = pd.merge(res_df, market_width_df, on="date", how="left")

        return res_df

    def _save_results(
        self,
        res_df: pd.DataFrame,
        annualized_sharpe_ratio: float,
        roi: float,
        stats: Dict,
    ) -> None:
        """Write the daily table as CSV and the summary as JSON."""
        output_path = os.path.join(self.config.output_dir, "profit_simulation.csv")
        # utf-8-sig writes a BOM at the start of the file.
        res_df.to_csv(output_path, index=False, encoding="utf-8-sig")
        logger.info(f"分析结果已保存到: {output_path}")

        # Summary of the run.
        summary = {
            "analysis_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "annualized_sharpe_ratio": annualized_sharpe_ratio,
            "roi": roi,
            "total_bets": stats["total_bets"],
            "won_rate": stats["won_rate"],
            "mean_odds": stats["total_mean_odds"],
            "market_width_range": f"{self.config.market_width_min}-{self.config.market_width_max}",
            "initial_balance": self.config.initial_balance,
        }

        summary_path = os.path.join(self.config.output_dir, "analysis_summary.json")
        with open(summary_path, "w", encoding="utf-8") as f:
            json.dump(summary, f, ensure_ascii=False, indent=2)
        logger.info(f"分析摘要已保存到: {summary_path}")

    def _generate_charts(
        self, res_df: pd.DataFrame, annualized_sharpe_ratio: float, roi: float
    ) -> None:
        """Render both charts; the headline metrics go into the title."""
        title = f"收益模拟,年化夏普率: {annualized_sharpe_ratio:.4f}, ROI: {roi:.4f}"

        self.visualization_service.plot_profit_simulation(data_df=res_df, title=title)
        self.visualization_service.plot_won_lost_mean_odds(data_df=res_df)


def main():
    """Run the analysis with the default configuration and print a summary.

    Nothing is printed when the analysis returns no result.
    """
    app = ProfitAnalysisApp(SimulationConfig())
    result = app.run_analysis()
    if not result:
        return

    summary = result["summary"]
    report_lines = [
        "分析完成!",
        f"年化夏普率: {summary['annualized_sharpe_ratio']:.4f}",
        f"ROI: {summary['roi']:.4f}",
        f"总投注数: {summary['total_bets']}",
        f"胜率: {summary['won_rate']:.4f}",
        f"平均赔率: {summary['mean_odds']:.4f}",
    ]
    for line in report_lines:
        print(line)


if __name__ == "__main__":
    main()