Compare commits

...

15 Commits

Author SHA1 Message Date
be9d9a23d7 代码重构 2025-10-26 13:13:30 +08:00
c6f40b218e 重命名 2025-10-26 12:19:51 +08:00
428cdb1051 pull数据翻页最大页数不能+1; 2025-10-26 11:50:52 +08:00
da80381dd8 添加config str函数 2025-10-26 10:18:10 +08:00
aaf51c96ac 代码重构 2025-10-26 09:55:40 +08:00
9a5fcc8511 正则匹配元素文本没有加r; traceback.print_exc()改为traceback.format_exc() 2025-10-26 09:27:00 +08:00
d6bab518a9 移除main 2025-10-26 00:51:50 +08:00
982d0fda11 添加耗时统计装饰器工具 2025-10-26 00:51:22 +08:00
332284293f metrics函数移动到bet tools 2025-10-25 22:17:28 +08:00
22bede007e 计算brier score 2025-10-25 22:08:51 +08:00
4a8b1945e7 添加乘法no vig 2025-10-25 19:49:26 +08:00
79a94925cf 去掉中文字符 2025-10-25 19:41:17 +08:00
da9a846d01 变成package 2025-10-25 19:20:28 +08:00
cae26f8b88 添加 power no vig方法 2025-10-25 19:18:16 +08:00
eb057c236e 格式化代码 2025-10-25 16:39:27 +08:00
12 changed files with 1865 additions and 5375 deletions

66
BetMetices.py Normal file
View File

@@ -0,0 +1,66 @@
import pandas as pd
from common.bet_tools import compute_metrics
from common.bet_tools import calculate_no_vig_moneyline_power, moneyline_to_prob
from loguru import logger
from common.utils import timeit
def get_no_vig_prob(row) -> pd.Series:
odds = [row["first_price"], row["second_price"]]
no_vig_odds_power = calculate_no_vig_moneyline_power(odds)
novig_probs_power = [moneyline_to_prob(o) for o in no_vig_odds_power]
# 返回两个无水概率
return pd.Series(
{
"first_no_vig_prob": novig_probs_power[0],
"second_no_vig_prob": novig_probs_power[1],
}
)
@timeit
def calc_metrics(df: pd.DataFrame, cols: list) -> pd.DataFrame:
data_list = []
for cs in df[cols].drop_duplicates().values:
tmp_df = df[cols + ["win_prob", "res"]].copy()
for i, col in enumerate(cols):
tmp_df = tmp_df[tmp_df[col] == cs[i]]
# if len(tmp_df) < 10000:
# continue
res = compute_metrics(df=tmp_df, include_draws=False)
res["filter_cols"] = ",".join(cs)
data_list.append(res)
res_df = pd.DataFrame(data_list)
res_df["reg_alpha"] = abs(res_df["reg_alpha"])
res_df = res_df.sort_values(by=["brier", "logloss", "ece", "reg_alpha"])
return res_df
if __name__ == "__main__":
df = pd.read_csv(
"/Users/aszer/Documents/vscode/bet/data/pinnical_1xbet_all_api.csv",
encoding="utf-8-sig",
)
df = df[
[
"sportsbook",
"sport",
"league",
"fixture_id",
"game_id",
"market",
"first_price",
"second_price",
"market_width",
"result",
]
]
# 防止 SettingWithCopyWarning推荐使用 .loc 显式分配
df.loc[:, ["first_no_vig_prob", "second_no_vig_prob"]] = df[
["first_price", "second_price"]
].apply(get_no_vig_prob, axis=1)
df["win_prob"] = df["first_no_vig_prob"]
df["res"] = df["result"]
cols = ["sportsbook", "sport"]
res_df = calc_metrics(df, cols)
logger.info(f"\n{res_df}")

View File

@@ -1,423 +1,560 @@
import http """
import sys OddsJam Bet Tracker - 重构版本
按照Python最佳实践重构支持环境变量配置模块化设计
核心功能:
1. 定时从数据库拉取数据上传到OddsJam
2. 定时从OddsJam获取结果数据更新到数据库
"""
from email import message
import os import os
import sys
import re import re
import math import math
import time import time
import json import json
import datetime import datetime
import traceback import traceback
from typing import Optional, List, Dict, Tuple
from dataclasses import dataclass
from abc import ABC, abstractmethod
import pandas as pd import pandas as pd
from retry import retry from retry import retry
from common.utils import ensure_directory_exists
from dao.Database import Database
from playwright.sync_api import sync_playwright, Page, BrowserContext, expect from playwright.sync_api import sync_playwright, Page, BrowserContext, expect
from loguru import logger from loguru import logger
# 导入项目模块
from dao.Database import Database
from data_model import MysqlConfig, OddsjamOrderStatus from data_model import MysqlConfig, OddsjamOrderStatus
from common.dingtalk import DingTalkBot from common.dingtalk import DingTalkBot
from common.utils import ensure_directory_exists
webhook = "https://oapi.dingtalk.com/robot/send?access_token=21de667159edadd33172c6ec414a2addf9c6359189350ffd36819d2a20e8a0f4"
secret = "SEC43a0fa0b29717f98637a119b92a0bd5f7b2b6da671bdd2bd1279ed8323454d5e"
dingtalk = DingTalkBot(webhook, secret)
root_dir = os.path.dirname(os.path.abspath(__file__)) @dataclass
class AppConfig:
"""应用配置类 - 支持环境变量配置"""
config_file_path = "./config/mysql_config.json" # 邮箱配置
mysql_config = MysqlConfig.parse_file(config_file_path) email_account: str = os.getenv("ODDSJAM_EMAIL", "ojbbbb21@proton.me")
dao = Database(mysql_config)
# 代理配置
http_proxy: str = os.getenv("HTTP_PROXY", "http://127.0.0.1:7890")
# 数据库表名
table_name: str = os.getenv("BET_TABLE_NAME", "oddsjam_order")
# 定时任务配置
upload_schedule: str = os.getenv("UPLOAD_SCHEDULE", "09:48")
pull_schedule: str = os.getenv("PULL_SCHEDULE", "13:00")
# 文件路径配置
root_dir: str = os.path.dirname(os.path.abspath(__file__))
config_file_path: str = os.getenv("MYSQL_CONFIG_PATH", "./config/mysql_config.json")
# 钉钉配置
dingtalk_webhook: str = os.getenv(
"DINGTALK_WEBHOOK",
"https://oapi.dingtalk.com/robot/send?access_token=21de667159edadd33172c6ec414a2addf9c6359189350ffd36819d2a20e8a0f4",
)
dingtalk_secret: str = os.getenv(
"DINGTALK_SECRET",
"SEC43a0fa0b29717f98637a119b92a0bd5f7b2b6da671bdd2bd1279ed8323454d5e",
)
# 浏览器配置
headless: bool = os.getenv("HEADLESS", "true").lower() == "true"
default_timeout: float = float(os.getenv("DEFAULT_TIMEOUT", "6000000"))
def __str__(self) -> str:
# 返回邮箱和数据表名的信息
return f"AppConfig(email_account={self.email_account}, table_name={self.table_name})"
def __post_init__(self):
"""初始化后处理"""
self.mysql_config = MysqlConfig.parse_file(self.config_file_path)
self.dingtalk = DingTalkBot(self.dingtalk_webhook, self.dingtalk_secret)
# 构建文件路径
self.login_state_path = os.path.join(
self.root_dir,
"data",
"bet_data",
"account_login_state",
f"{self.email_account}.json",
)
self.bet_file_path = os.path.join(self.root_dir, "data", "bet_data", "bet.csv")
self.bet_name_map_path = os.path.join(
self.root_dir, "data", "bet_data", "betname_map.json"
)
def query_by_create_time(ds: str): class DatabaseService:
sql = f"select * from oddsjam_order_all where DATE(create_time) = '{ds}'" """数据库服务类 - 负责所有数据库操作"""
rows = dao.fetchall(query=sql)
if rows:
return [OddsjamOrderStatus(row) for row in rows]
else:
return []
@retry(tries=6) def __init__(self, config: AppConfig):
def update_bet_status(id: str, bet_status: str): self.config = config
sql = "update oddsjam_order_all set bet_status = %s where id = %s" self.dao = Database(config.mysql_config)
dao.execute(query=sql, args=(bet_status, id))
def query_orders_by_date(self, date_str: str) -> List[OddsjamOrderStatus]:
"""根据日期查询订单数据"""
sql = f"select * from {self.config.table_name} where DATE(create_time) = '{date_str}'"
rows = self.dao.fetchall(query=sql)
return [OddsjamOrderStatus(row) for row in rows] if rows else []
@retry(tries=6)
def update_bet_status(self, bet_id: str, bet_status: str) -> None:
"""更新投注状态"""
sql = f"update {self.config.table_name} set bet_status = %s where id = %s"
self.dao.execute(query=sql, args=(bet_status, bet_id))
class SyncOddsjamBetTracker: class OddsjamService:
def __init__( """OddsJam业务服务类 - 负责与OddsJam网站的交互"""
self,
login_state_path: str, def __init__(self, config: AppConfig):
intercept_response_res_save_path: str, self.config = config
headless: bool = False,
default_time_out: float = 6000000,
):
self.login_state_path = login_state_path
self.headless = headless
self.default_time_out = default_time_out
self.intercept_response_res_save_path = intercept_response_res_save_path
self.total_bet_cnt = 0 self.total_bet_cnt = 0
def login_oddsjam_cookies(self, p, headless=False) -> tuple: def _get_browser_context(self, playwright) -> Tuple[Page, BrowserContext]:
# 获取HTTP_PROXY环境变量默认为None """获取浏览器上下文"""
import os logger.info(f"加载cookies: {self.config.login_state_path}")
http_proxy = os.environ.get("HTTP_PROXY", None) browser = playwright.chromium.launch(
if http_proxy is None:
http_proxy = "http://127.0.0.1:7890"
logger.info("加载cookies {}", self.login_state_path)
browser = p.chromium.launch(
args=["--start-maximized"], args=["--start-maximized"],
headless=headless, headless=self.config.headless,
proxy={ proxy=(
"server": http_proxy, {"server": self.config.http_proxy} if self.config.http_proxy else None
}, ),
) )
context = browser.new_context( context = browser.new_context(
storage_state=self.login_state_path, no_viewport=True storage_state=self.config.login_state_path, no_viewport=True
) )
page = context.new_page() page = context.new_page()
return page, browser return page, browser
def login_to_site(self): def login_to_site(self) -> None:
logger.info(f"login account: {self.email_account}") """登录到OddsJam网站"""
logger.info(f"登录账户: {self.config.email_account}")
with sync_playwright() as p: with sync_playwright() as p:
browser = p.chromium.launch(headless=self.headless) browser = p.chromium.launch(headless=self.config.headless)
page = browser.new_page() page = browser.new_page()
page.set_default_timeout(timeout=self.default_time_out) page.set_default_timeout(timeout=self.config.default_timeout)
url = "https://oddsjam.com/bet-tracker" url = "https://oddsjam.com/bet-tracker"
page.goto(url) page.goto(url)
# page.get_by_label("Close Modal").click()
page.get_by_role("link", name="Login").click() page.get_by_role("link", name="Login").click()
expect(page.get_by_role("button", name="Sign in")).to_be_visible( expect(page.get_by_role("button", name="Sign in")).to_be_visible(
timeout=self.default_time_out timeout=self.config.default_timeout
) )
page.pause() page.pause()
time.sleep(10) time.sleep(10)
try: try:
browser.contexts[0].storage_state(path=self.login_state_path) browser.contexts[0].storage_state(path=self.config.login_state_path)
logger.info(f"登录成功: {page.url}")
except Exception as ex: except Exception as ex:
logger.error(f"保存登录状态失败: {ex}")
traceback.print_exc() traceback.print_exc()
page.screenshot(path="error.png") page.screenshot(path="error.png")
logger.info(page.url, "login_success")
browser.close() browser.close()
@retry(tries=6) @retry(tries=6)
def intercept_import_response(self, route, request): def upload_bets(self, bet_file_path: str) -> None:
if "oddsjam.com/api/backend/bets/import" in request.url: """上传投注数据到OddsJam"""
response = route.fetch(timeout=50000)
logger.info(response.status)
if response.status == 500:
route.fulfill(response=response, json={})
return
route.continue_()
@retry(tries=6)
def upload_new_bets(self, bet_file_path: str):
url = "https://oddsjam.com/bet-tracker" url = "https://oddsjam.com/bet-tracker"
with sync_playwright() as p: with sync_playwright() as p:
page: Page page, browser = self._get_browser_context(p)
browser: BrowserContext page.set_default_timeout(timeout=self.config.default_timeout)
page, browser = self.login_oddsjam_cookies(p, headless=self.headless)
page.set_default_timeout(timeout=self.default_time_out)
def on_response(response): def on_response(response):
if "dromo-user-imports-production" in response.url: if "dromo-user-imports-production" in response.url:
print(response.status, response.request.failure) logger.info(f"上传响应状态: {response.status}")
if response.status == 400: if response.status == 400:
browser.close() browser.close()
raise Exception(response.request.failure) raise Exception(f"上传失败: {response.request.failure}")
elif "oddsjam.com/api/backend/bets/import" in response.url: elif "oddsjam.com/api/backend/bets/import" in response.url:
logger.info(response.status) logger.info(f"导入响应状态: {response.status}")
page.on("response", on_response) page.on("response", on_response)
page.goto(url) page.goto(url)
# page.pause()
page.get_by_role("button", name="Import Bets").nth(1).click()
time.sleep(5)
iframe_locator = page.frame_locator(
'iframe[title="Dromo Importer\\: Bets"]'
)
with page.expect_file_chooser() as fc_info:
iframe_locator.get_by_role(
"button", name="Choose a file", exact=True
).click()
time.sleep(5)
file_chooser = fc_info.value
file_chooser.set_files(bet_file_path)
iframe_locator.get_by_role( # 执行上传流程
"button", name="Confirm selection and continue" self._execute_upload_flow(page, bet_file_path)
).click()
time.sleep(5)
iframe_locator.get_by_role(
"button", name="Confirm matching and continue"
).click()
time.sleep(5)
iframe_locator.get_by_role("button", name="Continue").click()
iframe_locator.get_by_role("button", name="Finish").click()
try:
iframe_locator.get_by_role("button", name="Submit anyway").click(
timeout=5000
)
except Exception as ex:
...
# print(ex)
iframe_locator.get_by_role("button", name="Yes").click()
expect(iframe_locator.get_by_text("Processing...")).to_be_hidden(
timeout=self.default_time_out
)
# os.remove(bet_file_path)
# page.pause()
browser.close() browser.close()
def get_all_bet_status(self, ds: str = None): def _execute_upload_flow(self, page: Page, bet_file_path: str) -> None:
if not ds: """执行上传流程"""
ds = datetime.datetime.now().strftime("%Y%m%d") page.get_by_role("button", name="Import Bets").nth(1).click()
day = datetime.datetime.strptime(ds, "%Y%m%d").day time.sleep(5)
logger.info(f"current date: {ds}")
logger.info(f"current day: {day}") iframe_locator = page.frame_locator('iframe[title="Dromo Importer\\: Bets"]')
# 选择文件
with page.expect_file_chooser() as fc_info:
iframe_locator.get_by_role(
"button", name="Choose a file", exact=True
).click()
time.sleep(5)
file_chooser = fc_info.value
file_chooser.set_files(bet_file_path)
# 确认选择
iframe_locator.get_by_role(
"button", name="Confirm selection and continue"
).click()
time.sleep(5)
# 确认匹配
iframe_locator.get_by_role(
"button", name="Confirm matching and continue"
).click()
time.sleep(5)
# 继续
iframe_locator.get_by_role("button", name="Continue").click()
# 完成
iframe_locator.get_by_role("button", name="Finish").click()
# 处理可能的错误
try:
iframe_locator.get_by_role("button", name="Submit anyway").click(
timeout=5000
)
except Exception:
pass
iframe_locator.get_by_role("button", name="Yes").click()
# 等待处理完成
expect(iframe_locator.get_by_text("Processing...")).to_be_hidden(
timeout=self.config.default_timeout
)
def get_all_bet_status(self, date_str: str, status_save_path: str) -> None:
"""获取所有投注状态"""
logger.info(f"获取投注状态,日期: {date_str}")
url = "https://oddsjam.com/bet-tracker" url = "https://oddsjam.com/bet-tracker"
with sync_playwright() as p: with sync_playwright() as p:
page: Page page, browser = self._get_browser_context(p)
browser: BrowserContext
page, browser = self.login_oddsjam_cookies(p, headless=self.headless)
def on_response(response): def on_response(response):
if "oddsjam.com/api/backend/bets-and-parlays-V2/find" in response.url: if "oddsjam.com/api/backend/bets-and-parlays-V2/find" in response.url:
try: try:
# 确保响应已完成
if response.request.failure is None and response.status == 200: if response.request.failure is None and response.status == 200:
data = response.json() # 注意sync_api 中 .json() 是同步的 data = response.json()
self.total_bet_cnt = data["totalCount"] self.total_bet_cnt = data["totalCount"]
logger.info(f"total bet count: {self.total_bet_cnt}") logger.info(f"总投注数量: {self.total_bet_cnt}")
# 保存数据 # 保存响应数据
with open( with open(status_save_path, "a", encoding="utf-8") as f:
self.intercept_response_res_save_path,
"a",
encoding="utf-8",
) as f:
json.dump(data, f, ensure_ascii=False) json.dump(data, f, ensure_ascii=False)
f.write("\n") f.write("\n")
except Exception as e: except Exception as e:
logger.error(f"Error processing response: {e}") logger.error(f"处理响应数据失败: {e}")
# page.route("**", self.intercept_response)
page.on("response", on_response) page.on("response", on_response)
page.set_default_timeout(timeout=self.default_time_out) page.set_default_timeout(timeout=self.config.default_timeout)
page.goto(url=url) page.goto(url=url)
# 处理弹窗
try: try:
page.locator("#cello-widget-app").get_by_role("button").click( page.locator("#cello-widget-app").get_by_role("button").click(
timeout=6000 timeout=6000
) )
except: except Exception:
... pass
# page.locator(".mt-4 > div > .inline-flex").first.click() # 获取总页数并翻页
# time.sleep(5) self._navigate_all_pages(page)
# page.get_by_label("Clear").click()
# time.sleep(5)
# page.get_by_role("button", name="Date Range").click()
# time.sleep(5)
# page.get_by_text("Custom", exact=True).click()
# time.sleep(5)
# page.get_by_role("button", name=f"{day}").click()
# time.sleep(5)
page.get_by_role(
"button", name=re.compile("Show (\d+) Results", re.IGNORECASE)
).click()
inner_text = page.get_by_text( browser.close()
re.compile("Showing 1 to 50 of", re.IGNORECASE)
).inner_text() def _navigate_all_pages(self, page: Page) -> None:
match = re.search(r"Showing 1 to 50 of (\d+) results", inner_text) """导航所有页面"""
total_bet_cnt = int(match[1]) inner_text = page.get_by_text(
total_page_no = math.ceil(total_bet_cnt / 50) re.compile("Showing 1 to 50 of", re.IGNORECASE)
logger.info(f"total {total_bet_cnt} results, {total_page_no} pages") ).inner_text()
page.pause()
for page_no in range(2, total_page_no): match = re.search(r"Showing 1 to 50 of (\d+) results", inner_text)
if not match:
raise ValueError("无法获取总结果数")
total_bet_cnt = int(match[1])
total_page_no = math.ceil(total_bet_cnt / 50)
logger.info(f"总共 {total_bet_cnt} 条结果,{total_page_no}")
# 翻页到最后一页
for page_no in range(2, total_page_no):
if page_no <= total_page_no:
expect(page.get_by_role("button", name="Next")).to_be_visible( expect(page.get_by_role("button", name="Next")).to_be_visible(
timeout=self.default_time_out timeout=self.config.default_timeout
) )
page.wait_for_timeout(timeout=3000) page.wait_for_timeout(timeout=3000)
page.get_by_role("button", name="Next").click() page.get_by_role("button", name="Next").click()
logger.info(f"current page number: {page_no} / {total_page_no}") logger.info(f"当前页码: {page_no} / {total_page_no}")
page.get_by_role("button", name=f"{total_page_no}", exact=True).click()
expect(page.get_by_role("button", name="Next")).to_be_visible( class BetDataProcessor:
timeout=self.default_time_out """投注数据处理服务类"""
def __init__(self, config: AppConfig):
self.config = config
def prepare_bet_data_for_upload(self, date_str: str) -> str:
"""准备上传的投注数据"""
logger.info(f"准备投注数据,日期: {date_str}")
# 确保目录存在
ensure_directory_exists(target_path=self.config.bet_file_path, is_file=True)
# 从数据库获取数据
db_service = DatabaseService(self.config)
data_list = db_service.query_orders_by_date(date_str)
if not data_list:
message = f"日期 {date_str} 没有找到投注数据"
logger.warning(message)
raise ValueError(message)
# 转换为DataFrame
bet_df = pd.DataFrame([d.to_dict() for d in data_list])
# 列名映射
col_map = {
"sportsbooks": "Sportsbook",
"bet_name": "Bet Name",
"market": "Market Name",
"price": "Odds",
"stake": "Stake",
"event_name": "Event Name",
"sport": "Sport",
"league": "League",
"game_id": "Game ID",
"bet_type": "Bet Type",
"bet_id": "Notes",
"start_timestamp": "Game Start Date",
}
bet_df = bet_df.rename(columns=col_map)
# 处理Sportsbook字段
bet_df["Sportsbook"] = bet_df["Sportsbook"].apply(lambda x: eval(x)[0])
# 加载博彩公司名称映射
with open(self.config.bet_name_map_path, "r") as f:
bet_name_map = json.load(f)
bet_df["Sportsbook"] = bet_df["Sportsbook"].apply(
lambda x: bet_name_map.get(x, x)
) )
# page.pause()
browser.close()
# 选择需要的列
bet_df = bet_df[col_map.values()]
def get_tomorrow_over_bet_order_data(save_file_path: str, ds): # 格式化时间
logger.info(f"load bet data from db: {ds}") bet_df["Game Start Date"] = bet_df["Game Start Date"].apply(
data_list = query_by_create_time(ds=ds) lambda x: datetime.datetime.fromtimestamp(x // 1000).strftime(
data_list = [d.to_dict() for d in data_list] "%Y/%m/%d %H:%M"
bet_df = pd.DataFrame(data_list) )
bet_df.to_csv(save_file_path, encoding="utf-8-sig", index=False)
col_map = {
"sportsbooks": "Sportsbook",
"bet_name": "Bet Name",
"market": "Market Name",
"price": "Odds",
"stake": "Stake",
"event_name": "Event Name",
"sport": "Sport",
"league": "League",
"game_id": "Game ID",
"bet_type": "Bet Type",
"bet_id": "Notes",
"start_timestamp": "Game Start Date",
}
bet_df = bet_df.rename(columns=col_map)
bet_df["Sportsbook"] = bet_df["Sportsbook"].apply(lambda x: eval(x)[0])
with open("./data/bet_data/betname_map.json", "r") as f:
bet_name_map = json.load(f)
bet_df["Sportsbook"] = bet_df["Sportsbook"].apply(
lambda x: bet_name_map.get(x, x)
) )
# bet_df = bet_df.explode("Sportsbook")
bet_df = bet_df[col_map.values()]
bet_df["Game Start Date"] = bet_df["Game Start Date"].apply(
lambda x: datetime.datetime.fromtimestamp(x // 1000).strftime("%Y/%m/%d %H:%M")
)
logger.info(f"bet order min start date: {bet_df['Game Start Date'].min()}")
logger.info(f"bet order max start date: {bet_df['Game Start Date'].max()}")
logger.info(bet_df.shape) # 保存到文件
bet_df.to_csv(save_file_path, encoding="utf-8-sig", index=False) bet_df.to_csv(self.config.bet_file_path, encoding="utf-8-sig", index=False)
info_message = f"bet order min start date: {bet_df['Game Start Date'].min()}\n"
info_message += f"bet order max start date: {bet_df['Game Start Date'].max()}\n"
info_message += f"bet order shape: {bet_df.shape}"
return info_message
logger.info(f"投注数据最小开始时间: {bet_df['Game Start Date'].min()}")
logger.info(f"投注数据最大开始时间: {bet_df['Game Start Date'].max()}")
logger.info(f"投注数据形状: {bet_df.shape}")
def update_db_order_status(status_file_path: str): return (
logger.info(status_file_path) f"投注数据最小开始时间: {bet_df['Game Start Date'].min()}\n"
data_list = [] f"投注数据最大开始时间: {bet_df['Game Start Date'].max()}\n"
with open(status_file_path, "r", encoding="utf-8") as f: f"投注数据形状: {bet_df.shape}"
for line in f:
data = json.loads(line)
data_list.extend(data["entities"])
status_df = pd.DataFrame(data_list)
def get_error_status(row):
if not pd.isna(row["autograder_errors"]):
return f"error: {row['autograder_errors']}"
return row["status"]
status_df["status"] = status_df.apply(lambda row: get_error_status(row), axis=1)
status_df = status_df[status_df["status"] != "pending"]
status_df = status_df[["status", "notes"]].drop_duplicates()
status_list = status_df.to_dict(orient="records")
for i, data in enumerate(status_list):
bet_id = data["notes"]
bet_status = data["status"]
logger.info(f"{i+1}/{len(status_list)}, status->{bet_status}, bet_id->{bet_id}")
for _ in range(3):
try:
update_bet_status(id=bet_id, bet_status=bet_status)
break
except Exception as e:
logger.error(f"update bet status error: {e}")
def pull_data_from_oddsjam_update(ds: str = None):
try:
if ds is None:
ds = datetime.datetime.now().strftime("%Y%m%d")
logger.info(f"current date: {ds}")
ds_date = datetime.datetime.strptime(ds, "%Y%m%d")
pre_date = ds_date - datetime.timedelta(days=0)
pre_ds = pre_date.strftime("%Y%m%d")
oddsjam_bet_tracker = get_oddsjam_bet_tracker(ds=pre_ds)
# oddsjam_bet_tracker.login_to_site()
oddsjam_bet_tracker.get_all_bet_status(ds)
intercept_response_res_save_path = (
oddsjam_bet_tracker.intercept_response_res_save_path
) )
update_db_order_status(status_file_path=intercept_response_res_save_path)
dingtalk.send_text(f"{ds}: 比赛状态更新完成") def process_status_data(self, status_file_path: str) -> None:
except Exception as e: """处理状态数据并更新数据库"""
error_info = traceback.print_exc() logger.info(f"处理状态数据: {status_file_path}")
logger.error(error_info)
dingtalk.send_text(error_info) data_list = []
with open(status_file_path, "r", encoding="utf-8") as f:
for line in f:
data = json.loads(line)
data_list.extend(data["entities"])
status_df = pd.DataFrame(data_list)
# 处理错误状态
def get_error_status(row):
if not pd.isna(row["autograder_errors"]):
return f"error: {row['autograder_errors']}"
return row["status"]
status_df["status"] = status_df.apply(lambda row: get_error_status(row), axis=1)
status_df = status_df[status_df["status"] != "pending"]
status_df = status_df[["status", "notes"]].drop_duplicates()
# 更新数据库
db_service = DatabaseService(self.config)
status_list = status_df.to_dict(orient="records")
for i, data in enumerate(status_list):
bet_id = data["notes"]
bet_status = data["status"]
logger.info(
f"{i+1}/{len(status_list)}, 状态->{bet_status}, 投注ID->{bet_id}"
)
for _ in range(3):
try:
db_service.update_bet_status(bet_id=bet_id, bet_status=bet_status)
break
except Exception as e:
logger.error(f"更新投注状态失败: {e}")
def upload_new_bets_data2oddsjam(ds: str = None): class TaskScheduler:
try: """定时任务调度器"""
if ds is None:
ds = datetime.datetime.now() - datetime.timedelta(days=1)
ds = ds.strftime("%Y%m%d")
logger.info(f"current date: {ds}")
oddsjam_bet_tracker = get_oddsjam_bet_tracker(ds=ds)
# oddsjam_bet_tracker.login_to_site() def __init__(self, config: AppConfig):
bet_file_path = os.path.join(root_dir, "data", "bet_data", "bet.csv") self.config = config
ensure_directory_exists(target_path=bet_file_path, is_file=True) self.oddsjam_service = OddsjamService(config)
data_info = get_tomorrow_over_bet_order_data( self.data_processor = BetDataProcessor(config)
save_file_path=bet_file_path, ds=ds
def upload_bets_task(self, date_str: Optional[str] = None) -> None:
"""上传 T-1 时间的投注数据任务"""
config_base_info = str(self.config)
try:
if date_str is None:
date_str = (
datetime.datetime.now() - datetime.timedelta(days=1)
).strftime("%Y%m%d")
logger.info(f"执行上传任务,日期: {date_str}")
# 准备数据
data_info = self.data_processor.prepare_bet_data_for_upload(date_str)
if not data_info:
logger.warning(f"日期 {date_str} 没有数据需要上传")
return
# 上传到OddsJam
self.oddsjam_service.upload_bets(self.config.bet_file_path)
# 发送通知
self.config.dingtalk.send_text(
f"{date_str}:比赛数据上传完成\n{config_base_info}\n{data_info}"
)
except Exception as e:
error_info = traceback.format_exc()
logger.error(f"上传任务失败: {error_info}")
self.config.dingtalk.send_text(
f"{date_str}:\n{config_base_info}\n上传比赛失败: {e}\n{error_info}"
)
def pull_status_task(self, date_str: Optional[str] = None) -> None:
"""拉取状态数据任务"""
config_base_info = str(self.config)
try:
if date_str is None:
date_str = datetime.datetime.now().strftime("%Y%m%d")
logger.info(f"执行拉取任务,日期: {date_str}")
# 构建状态文件路径
status_file_path = os.path.join(
self.config.root_dir,
"data",
"bet_data",
"bet_status",
f"{self.config.email_account}_status_{date_str}.json",
)
ensure_directory_exists(target_path=status_file_path, is_file=True)
# 获取状态数据
self.oddsjam_service.get_all_bet_status(date_str, status_file_path)
# 处理状态数据
self.data_processor.process_status_data(status_file_path)
# 发送通知
self.config.dingtalk.send_text(
f"{date_str}: 比赛状态更新完成\n{config_base_info}"
)
except Exception as e:
error_info = traceback.format_exc()
logger.error(f"拉取任务失败: {error_info}")
self.config.dingtalk.send_text(f"{config_base_info}\n{error_info}")
class BetTrackerApplication:
"""主应用类"""
def __init__(self):
self.config = AppConfig()
self.scheduler = TaskScheduler(self.config)
def run_scheduled_tasks(self) -> None:
"""运行定时任务"""
import schedule
logger.info(f"上传定时任务: {self.config.upload_schedule}")
logger.info(f"拉取定时任务: {self.config.pull_schedule}")
# 设置定时任务
schedule.every().day.at(self.config.pull_schedule).do(
self.scheduler.pull_status_task
)
schedule.every().day.at(self.config.upload_schedule).do(
self.scheduler.upload_bets_task
) )
oddsjam_bet_tracker.upload_new_bets(bet_file_path=bet_file_path)
dingtalk.send_text(f"{ds}: \n {data_info}")
except Exception as e:
error_info = traceback.format_exc()
dingtalk.send_text(f"{ds}: 上传比赛失败: {e}\n{error_info}")
# 运行定时任务
while True:
schedule.run_pending()
time.sleep(0.05)
def get_oddsjam_bet_tracker(ds: str) -> SyncOddsjamBetTracker: def run_manual_task(self, task_type: str, date_str: Optional[str] = None) -> None:
email_account = "aszer27937@gmail.com" """运行手动任务"""
login_state_save_path = os.path.join( if task_type == "upload":
root_dir, "data", "bet_data", "account_login_state", f"{email_account}.json" self.scheduler.upload_bets_task(date_str)
) elif task_type == "pull":
ensure_directory_exists(target_path=login_state_save_path, is_file=True) self.scheduler.pull_status_task(date_str)
intercept_response_res_save_path = os.path.join( else:
root_dir, logger.error(f"未知的任务类型: {task_type}")
"data",
"bet_data",
"bet_status",
f"{email_account}_status_{ds}.json",
)
ensure_directory_exists(target_path=intercept_response_res_save_path, is_file=True)
oddsjam_bet_tracker = SyncOddsjamBetTracker(
login_state_path=login_state_save_path,
intercept_response_res_save_path=intercept_response_res_save_path,
headless=True
)
return oddsjam_bet_tracker
def main(): def main():
ds = datetime.datetime.now() - datetime.timedelta(days=1) """主入口函数"""
ds = ds.strftime("%Y%m%d") app = BetTrackerApplication()
logger.info(f"current date: {ds}")
pull_data_from_oddsjam_update(ds=ds) # 检查命令行参数
# upload_new_bets_data2oddsjam(ds=ds) if len(sys.argv) > 1:
task_type = sys.argv[1]
date_str = sys.argv[2] if len(sys.argv) > 2 else None
app.run_manual_task(task_type, date_str)
else:
# 运行定时任务
app.run_scheduled_tasks()
if __name__ == "__main__": if __name__ == "__main__":
import schedule main()
# schedule.every().day.at("07:00").do(clear_order_from_oddsjam) # app = BetTrackerApplication()
UPLOAD_SCHEDULE = os.environ.get("UPLOAD_SCHEDULE", "10:00") # logger.info(app.config)
PULL_SCHEDULE = os.environ.get("PULL_SCHEDULE", "13:00") # app.run_manual_task(task_type="upload", date_str="20251026")
logger.info(f"upload schedule: {UPLOAD_SCHEDULE}")
logger.info(f"pull schedule: {PULL_SCHEDULE}")
schedule.every().day.at(PULL_SCHEDULE).do(pull_data_from_oddsjam_update)
schedule.every().day.at(UPLOAD_SCHEDULE).do(upload_new_bets_data2oddsjam)
while True:
schedule.run_pending()
time.sleep(0.05)
# main()

529
ProfitSimulation.py Normal file
View File

@@ -0,0 +1,529 @@
"""
Profit Simulation - 重构版本
按照Python最佳实践重构模块化设计支持配置化
核心功能:
1. 从数据库获取投注订单数据
2. 计算投注收益、赔率、胜率等指标
3. 模拟资金变化和收益率分析
4. 生成可视化图表和分析报告
"""
import os
import json
from datetime import datetime
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass
from abc import ABC, abstractmethod
import pandas as pd
import plotly.graph_objects as go
from loguru import logger
# 导入项目模块
from dao.Database import Database
from data_model import MysqlConfig, OddsJamOrder
@dataclass
class SimulationConfig:
"""模拟配置类 - 支持环境变量配置"""
# 数据库配置
config_file_path: str = os.getenv("MYSQL_CONFIG_PATH", "./config/mysql_config.json")
table_name: str = os.getenv("BET_TABLE_NAME", "oddsjam_order")
# 模拟参数
initial_balance: float = float(os.getenv("INITIAL_BALANCE", "1000"))
market_width_min: float = float(os.getenv("MARKET_WIDTH_MIN", "20"))
market_width_max: float = float(os.getenv("MARKET_WIDTH_MAX", "25"))
# 文件路径
data_dir: str = os.getenv("DATA_DIR", "./data/bet_simulation/")
output_dir: str = os.getenv("OUTPUT_DIR", "./data/bet_simulation/")
# 缓存配置
enable_cache: bool = os.getenv("ENABLE_CACHE", "true").lower() == "true"
def __post_init__(self):
"""初始化后处理"""
self.mysql_config = MysqlConfig.parse_file(self.config_file_path)
# 确保目录存在
os.makedirs(self.data_dir, exist_ok=True)
os.makedirs(self.output_dir, exist_ok=True)
class DataService:
"""数据服务类 - 负责数据获取和缓存"""
def __init__(self, config: SimulationConfig):
self.config = config
self.dao = Database(config.mysql_config)
def get_oddsjam_order_data(self, load_from_cache: bool = None) -> pd.DataFrame:
"""获取OddsJam订单数据"""
if load_from_cache is None:
load_from_cache = self.config.enable_cache
current_date_str = datetime.now().strftime("%Y%m%d")
cache_file_path = os.path.join(
self.config.data_dir, f"oddsjam_order_data_{current_date_str}.feather"
)
# 尝试从缓存加载
if load_from_cache and os.path.exists(cache_file_path):
logger.info(f"从缓存加载数据: {cache_file_path}")
return pd.read_feather(cache_file_path)
# 从数据库获取数据
logger.info("从数据库获取订单数据")
select_query = f"SELECT * FROM bet.{self.config.table_name} where bet_status in ('won', 'lost')"
raw_data_list = self.dao.fetchall(query=select_query)
if not raw_data_list:
logger.warning("未找到符合条件的订单数据")
return pd.DataFrame()
# 转换为DataFrame
order_data_list = [OddsJamOrder(**data).model_dump() for data in raw_data_list]
order_df = pd.DataFrame(order_data_list)
# 保存到缓存
if self.config.enable_cache:
order_df.to_feather(cache_file_path)
logger.info(f"数据已缓存到: {cache_file_path}")
return order_df
def filter_data_by_market_width(self, data_df: pd.DataFrame) -> pd.DataFrame:
"""根据市场宽度过滤数据"""
original_count = len(data_df)
filtered_df = data_df[
(data_df["market_width"] >= self.config.market_width_min)
& (data_df["market_width"] <= self.config.market_width_max)
]
filtered_count = len(filtered_df)
logger.info(f"市场宽度过滤: {original_count} -> {filtered_count} 条记录")
return filtered_df
class ProfitCalculator:
"""收益计算服务类"""
@staticmethod
def calculate_benefit_by_order(order_info: Dict) -> float:
"""根据订单信息计算收益"""
home_or_away = order_info["home_or_away"]
price = order_info[f"{home_or_away}_price"] / 100
if order_info["outcome"] == -1:
return -1
if price >= 0:
return price
else:
return 1 / abs(price)
@staticmethod
def calculate_odds(row: pd.Series) -> float:
"""计算赔率"""
home_or_away = row["home_or_away"]
price = row[f"{home_or_away}_price"] / 100
if price >= 0:
return price
else:
return 1 / abs(price)
@staticmethod
def calculate_closing_balance(
day_benefit_list: List[float], pre_balance: float = 1000, pre_benefit: float = 0
) -> List[float]:
"""计算日末余额"""
closing_balance_list = []
for benefit in day_benefit_list:
closing_balance = pre_balance + pre_benefit / 3 + benefit * 2 / 3
closing_balance_list.append(closing_balance)
pre_balance = closing_balance
pre_benefit = benefit
return closing_balance_list
@staticmethod
def calculate_in_transit_funds_ratio(
daily_investment_list: List[float],
closing_balance_list: List[float],
start_closing_balance: float = 1000,
) -> List[float]:
"""计算在途资金比例"""
assert len(daily_investment_list) == len(closing_balance_list)
ratio_list = []
for i, daily_investment in enumerate(daily_investment_list):
if i == 0:
ratio = daily_investment / start_closing_balance
else:
ratio = daily_investment / closing_balance_list[i - 1]
ratio_list.append(ratio)
return ratio_list
class SimulationEngine:
"""模拟引擎类 - 负责收益模拟分析"""
def __init__(self, config: SimulationConfig):
self.config = config
self.calculator = ProfitCalculator()
def simulate_profit(
self, data_df: pd.DataFrame, init_balance: float = None
) -> Tuple[pd.DataFrame, float, float]:
"""执行收益模拟"""
if init_balance is None:
init_balance = self.config.initial_balance
logger.info(f"开始收益模拟,初始资金: {init_balance}")
# 按日期聚合数据
res_df = (
data_df.groupby("date")
.agg({"investment": "sum", "benefit": "sum"})
.reset_index()
)
res_df = res_df.rename(columns={"investment": "当日投入", "benefit": "日收益"})
# 计算收益率指标
res_df["日收益率"] = res_df["日收益"] / res_df["当日投入"]
res_df["累计收益"] = res_df["日收益"].cumsum()
res_df["累计投入"] = res_df["当日投入"].cumsum()
res_df["累计收益率"] = res_df["累计收益"] / res_df["累计投入"]
# 计算日末余额
day_benefit_list = res_df["日收益"].tolist()
closing_balance_list = self.calculator.calculate_closing_balance(
day_benefit_list=day_benefit_list, pre_balance=init_balance
)
res_df["日末余额(1.6天结算)"] = closing_balance_list
# 计算在途资金比例
daily_investment_list = res_df["当日投入"].tolist()
res_df["在途资金比例"] = self.calculator.calculate_in_transit_funds_ratio(
daily_investment_list=daily_investment_list,
closing_balance_list=closing_balance_list,
start_closing_balance=init_balance,
)
# 计算关键指标
annualized_sharpe_ratio = self._calculate_annualized_sharpe_ratio(
res_df, init_balance
)
roi = res_df["日收益"].sum() / res_df["当日投入"].sum()
logger.info(
f"模拟完成 - 年化夏普率: {annualized_sharpe_ratio:.4f}, ROI: {roi:.4f}"
)
return res_df, annualized_sharpe_ratio, roi
def _calculate_annualized_sharpe_ratio(
self, res_df: pd.DataFrame, init_balance: float
) -> float:
"""计算年化夏普率"""
if res_df["日收益率"].std() == 0:
return 0
return (
res_df["日收益"].sum()
/ init_balance
/ res_df["日收益率"].std()
* ((365 / len(res_df)) ** 0.5)
)
def calculate_statistics(self, data_df: pd.DataFrame) -> Dict[str, float]:
"""计算统计指标"""
data_df["odds"] = data_df.apply(self.calculator.calculate_odds, axis=1)
total_mean_odds = data_df["odds"].mean()
won_rate = len(data_df[data_df["outcome"] == 1]) / len(data_df)
logger.info(f"统计指标 - 平均赔率: {total_mean_odds:.4f}, 胜率: {won_rate:.4f}")
return {
"total_mean_odds": total_mean_odds,
"won_rate": won_rate,
"total_bets": len(data_df),
}
class VisualizationService:
"""可视化服务类 - 负责图表生成"""
def __init__(self, config: SimulationConfig):
self.config = config
def plot_won_lost_mean_odds(
self, data_df: pd.DataFrame, output_path: str = None
) -> None:
"""绘制胜负数量和平均赔率图表"""
if output_path is None:
output_path = os.path.join(
self.config.output_dir, "won_lost_mean_odds.html"
)
logger.info("生成胜负数量和平均赔率图表")
data_df = data_df.sort_values(by="date")
date_x = data_df["date"].tolist()
fig = go.Figure()
# 添加胜负柱状图
cols = ["won", "lost"]
for col in cols:
if col in data_df.columns:
y_data = data_df[col].tolist()
fig.add_trace(go.Bar(x=date_x, y=y_data, name=col, yaxis="y1"))
# 添加平均赔率折线图
if "odds" in data_df.columns:
fig.add_trace(
go.Scatter(
x=data_df["date"],
y=data_df["odds"],
mode="markers+lines",
name="平均赔率",
yaxis="y2",
)
)
fig.update_layout(
barmode="group",
font=dict(family="Times New Roman"),
title="每天胜负数量以及平均赔率",
xaxis=dict(title="日期"),
yaxis=dict(title="数量"),
yaxis2=dict(title="赔率", overlaying="y", side="right"),
)
fig.write_html(output_path)
logger.info(f"图表已保存到: {output_path}")
def plot_profit_simulation(
self, data_df: pd.DataFrame, title: str = None, output_path: str = None
) -> None:
"""绘制收益模拟图表"""
if output_path is None:
output_path = os.path.join(self.config.output_dir, "profit_simulation.html")
if title is None:
title = "收益模拟"
logger.info("生成收益模拟图表")
fig = go.Figure()
# 添加日末余额柱状图
fig.add_trace(
go.Bar(
x=data_df["date"],
y=data_df["日末余额(1.6天结算)"],
name="日末余额",
yaxis="y1",
)
)
# 添加收益率折线图
for col in ["日收益率", "累计收益率", "在途资金比例"]:
if col in data_df.columns:
fig.add_trace(
go.Scatter(
x=data_df["date"],
y=data_df[col],
mode="markers+lines",
name=col,
yaxis="y2",
)
)
fig.update_layout(
title=title,
font=dict(family="Times New Roman"),
xaxis=dict(title="日期"),
yaxis=dict(title="金额"),
yaxis2=dict(title="收益率", overlaying="y", side="right", tickformat=".1%"),
)
fig.write_html(output_path)
logger.info(f"图表已保存到: {output_path}")
class ProfitAnalysisApp:
"""主应用类 - 协调各个服务完成分析"""
def __init__(self, config: SimulationConfig = None):
self.config = config or SimulationConfig()
self.data_service = DataService(self.config)
self.simulation_engine = SimulationEngine(self.config)
self.visualization_service = VisualizationService(self.config)
def run_analysis(self, load_from_cache: bool = None) -> Dict:
"""运行完整的收益分析"""
logger.info("开始收益分析")
# 获取数据
order_df = self.data_service.get_oddsjam_order_data(load_from_cache)
if order_df.empty:
logger.error("未获取到数据,分析终止")
return {}
# 数据预处理
order_df = order_df[~order_df["home_or_away"].isna()]
order_df["outcome"] = order_df["bet_status"].apply(
lambda x: 1 if x == "won" else -1
)
order_df["benefit"] = order_df.apply(
lambda row: ProfitCalculator.calculate_benefit_by_order(row.to_dict()),
axis=1,
)
order_df["date"] = order_df["start_timestamp"].apply(
lambda x: datetime.fromtimestamp(x // 1000).strftime("%Y-%m-%d")
)
# 过滤数据
data_df = self.data_service.filter_data_by_market_width(order_df)
if data_df.empty:
logger.error("过滤后无数据,分析终止")
return {}
# 设置投资金额
data_df["investment"] = 1
# 执行模拟
res_df, annualized_sharpe_ratio, roi = self.simulation_engine.simulate_profit(
data_df
)
# 计算统计指标
stats = self.simulation_engine.calculate_statistics(data_df)
# 合并数据
res_df = self._merge_additional_data(res_df, data_df)
# 保存结果
self._save_results(res_df, annualized_sharpe_ratio, roi, stats)
# 生成图表
self._generate_charts(res_df, annualized_sharpe_ratio, roi)
# 返回分析结果
result = {
"summary": {
"annualized_sharpe_ratio": annualized_sharpe_ratio,
"roi": roi,
"total_bets": stats["total_bets"],
"won_rate": stats["won_rate"],
"mean_odds": stats["total_mean_odds"],
},
"data": res_df,
}
logger.info("收益分析完成")
return result
def _merge_additional_data(
self, res_df: pd.DataFrame, data_df: pd.DataFrame
) -> pd.DataFrame:
"""合并额外数据"""
# 合并平均赔率
odds_df = data_df.groupby("date").agg({"odds": "mean"}).reset_index()
res_df = pd.merge(res_df, odds_df, on="date", how="left")
# 合并胜负统计
bet_status_df = pd.pivot_table(
data_df,
index=["date"],
columns=["bet_status"],
aggfunc="size",
fill_value=0,
).reset_index()
res_df = pd.merge(res_df, bet_status_df, on="date", how="left")
# 计算胜率
if "won" in res_df.columns and "lost" in res_df.columns:
res_df["won rate"] = res_df["won"] / (res_df["won"] + res_df["lost"])
# 合并市场宽度
if "market_width" in data_df.columns:
market_width_df = (
data_df.groupby("date").agg({"market_width": "mean"}).reset_index()
)
res_df = pd.merge(res_df, market_width_df, on="date", how="left")
return res_df
def _save_results(
self,
res_df: pd.DataFrame,
annualized_sharpe_ratio: float,
roi: float,
stats: Dict,
) -> None:
"""保存分析结果"""
output_path = os.path.join(self.config.output_dir, "profit_simulation.csv")
res_df.to_csv(output_path, index=False, encoding="utf-8-sig")
logger.info(f"分析结果已保存到: {output_path}")
# 保存摘要信息
summary = {
"analysis_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"annualized_sharpe_ratio": annualized_sharpe_ratio,
"roi": roi,
"total_bets": stats["total_bets"],
"won_rate": stats["won_rate"],
"mean_odds": stats["total_mean_odds"],
"market_width_range": f"{self.config.market_width_min}-{self.config.market_width_max}",
"initial_balance": self.config.initial_balance,
}
summary_path = os.path.join(self.config.output_dir, "analysis_summary.json")
with open(summary_path, "w", encoding="utf-8") as f:
json.dump(summary, f, ensure_ascii=False, indent=2)
logger.info(f"分析摘要已保存到: {summary_path}")
def _generate_charts(
self, res_df: pd.DataFrame, annualized_sharpe_ratio: float, roi: float
) -> None:
"""生成图表"""
title = f"收益模拟,年化夏普率: {annualized_sharpe_ratio:.4f}, ROI: {roi:.4f}"
self.visualization_service.plot_profit_simulation(data_df=res_df, title=title)
self.visualization_service.plot_won_lost_mean_odds(data_df=res_df)
def main():
"""主入口函数"""
# 创建配置
config = SimulationConfig()
# 创建应用实例
app = ProfitAnalysisApp(config)
# 运行分析
result = app.run_analysis()
if result:
summary = result["summary"]
print(f"分析完成!")
print(f"年化夏普率: {summary['annualized_sharpe_ratio']:.4f}")
print(f"ROI: {summary['roi']:.4f}")
print(f"总投注数: {summary['total_bets']}")
print(f"胜率: {summary['won_rate']:.4f}")
print(f"平均赔率: {summary['mean_odds']:.4f}")
if __name__ == "__main__":
main()

0
common/__init__.py Normal file
View File

View File

@@ -1,22 +1,281 @@
import math
import numpy as np
import pandas as pd
from scipy.optimize import fsolve # 导入 fsolve 函数用于数值求解
from scipy.special import logit as sp_logit
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import log_loss
def moneyline_to_prob(moneyline_odds: int) -> float:
"""将 Moneyline 赔率转换为隐含概率."""
if moneyline_odds == 0:
raise ValueError("Moneyline odds cannot be 0")
elif moneyline_odds > 0:
# 正赔率 +X -> 隐含概率 = 100 / (100 + X)
return 100 / (moneyline_odds + 100)
else: # moneyline_odds <= 0
# 负赔率 -X -> 隐含概率 = X / (X + 100)
return abs(moneyline_odds) / (abs(moneyline_odds) + 100)
def american_odds_to_probability(odds: int) -> float: def prob_to_moneyline(probability: float) -> int:
""" """将概率转换为 Moneyline 赔率 (四舍五入到最接近的整数)."""
根据美式赔率计算概率(以小数形式返回) if not 0 < probability < 1:
:param odds: 美式赔率(正数或负数) # 概率为 0 或 1 对应无限或 -100 的 Moneyline 赔率,这里简化处理,实际中极少遇到精确的 0 或 1
:return: 概率0~1之间的小数 if math.isclose(probability, 0):
""" return float("inf")
if odds > 0: if math.isclose(probability, 1):
probability = 100 / (odds + 100) return (
-100
) # 或者 raise ValueError("Probability must be between 0 and 1 (exclusive)")
raise ValueError("Probability must be between 0 and 1 (exclusive)")
if probability <= 0.5:
# 概率 <= 0.5 对应正 Moneyline 赔率 (Decimal >= 2.0)
# Decimal Odds = 1 / probability
# Moneyline = (Decimal Odds - 1) * 100
return round((1 / probability - 1) * 100, 2)
else: else:
probability = abs(odds) / (abs(odds) + 100) # 概率 > 0.5 对应负 Moneyline 赔率 (Decimal < 2.0)
return probability # Decimal Odds = 1 / probability
# Moneyline = -100 / (Decimal Odds - 1)
return round(-100 / (1 / probability - 1), 2)
def calculate_no_vig_moneyline_multir(moneyline_odds_list: list[int]) -> list[int]:
"""
通过乘法法(乘法归一法,Multiplicative Rescaling)对任意赔率组计算去除vig(取消庄家水钱)后的moneyline赔率。
具体步骤:
1. 将各moneyline赔率转换为隐含概率(带vig)。
2. 将所有隐含概率加总,得到带vig的总和sum_p,通常 >1。
3. 对每个概率除以总和,得到去vig的无水概率。
4. 将该去vig概率再换算回moneyline赔率。
示例:
输入: [+120, -150]
步骤:
implied_probs = [100/220, 150/250] = [0.4545, 0.6]
sum_p = 1.0545
novig_probs = [0.4545/1.0545, 0.6/1.0545]
回转moneyline
输出: 去vig后的moneyline列表
参数:
moneyline_odds_list (list[int]): 原始moneyline赔率列表
返回:
list[int]: 对应的去vig后moneyline赔率列表
"""
if not moneyline_odds_list:
return []
# 步骤1: 计算带vig的隐含概率
implied_probabilities = [moneyline_to_prob(odds) for odds in moneyline_odds_list]
# 步骤2: 计算总概率,理论上>1表示有vig
prob_total = sum(implied_probabilities)
# 步骤3: 每个概率除以总和,得到去vig的概率(归一化)
no_vig_probabilities = [prob / prob_total for prob in implied_probabilities]
# 步骤4: 概率转回moneyline赔率
no_vig_moneyline_odds = [
prob_to_moneyline(p_novig) for p_novig in no_vig_probabilities
]
return no_vig_moneyline_odds
def calculate_no_vig_moneyline_power(moneyline_odds_list: list[int]) -> list[int]:
"""
使用 Power Method (根据提供的文献描述) 计算无 vigorish 的 Moneyline 赔率。
该方法通过寻找 k 使得 sum(implied_prob^k) = 1 来调整概率。
参数:
moneyline_odds_list (list): 包含所有可能结果的 Moneyline 整数赔率列表 (例如, [+116, -156])。
返回:
list: 包含所有可能结果的无 vigorish Moneyline 整数赔率列表。
"""
if not moneyline_odds_list:
return []
# 1. 将 Moneyline 赔率转换为隐含概率 (pi)
implied_probabilities = [moneyline_to_prob(odds) for odds in moneyline_odds_list]
# 确保所有隐含概率都大于 0,否则无法进行幂运算或取对数 (数值求解时可能涉及)
if any(p <= 0 for p in implied_probabilities):
raise ValueError("All implied probabilities must be positive.")
total_implied_probability = sum(implied_probabilities)
# 如果总概率 <= 1,说明没有 vig 或 vig 极少,直接返回原始赔率
if total_implied_probability <= 1:
print(
"Warning: Input odds already have little or no vig. Returning original odds."
)
return moneyline_odds_list
# 2. 定义需要找到根的函数 f(k) = sum(pi^k) - 1
# 我们要找到 k 使得 sum(pi^k) = 1
# 由于 sum(pi) > 1 且 pi < 1, 我们需要 k > 1 才能让 pi^k < pi, 从而降低总和至 1。
def sum_pi_pow_k_minus_1(k):
# fsolve 传入的 k 是一个数组,我们需要取其第一个元素
k_val = k[0] if isinstance(k, (list, tuple)) else k
# 计算 sum(pi^k)
sum_val = sum(p**k_val for p in implied_probabilities)
return sum_val - 1 # 我们的目标是让这个函数等于 0
# 3. 寻找 k 使得 f(k) = 0
# 我们知道当 k=1 时,总和是 total_implied_probability (>1)。
# 当 k 增大时,sum(pi^k) 会减小。所以根 k 应该大于 1。
# 提供一个合理的初始猜测值给 fsolve,例如 1.1 或 1.5
initial_k_guess = [1.1] # fsolve 期望一个数组作为初始猜测
# 使用 fsolve 寻找 k
# fsolve 返回一个数组,即使只有一个解
k_solution = fsolve(sum_pi_pow_k_minus_1, initial_k_guess)
# 提取求解到的 k 值
k = k_solution[0]
# 4. 计算无 Vig 概率 pi_novig = pi^k
no_vig_probabilities = [p**k for p in implied_probabilities]
# 由于浮点数精度和数值求解的限制,最终的概率之和可能不严格等于 1。
# 虽然理论上由 k 的定义保证总和为 1,但实践中检查一下是有益的。
final_sum_check = sum(no_vig_probabilities)
if not math.isclose(final_sum_check, 1.0, abs_tol=1e-9):
print(
f"Warning: Final no-vig probabilities sum to {final_sum_check:.6f}, expected 1.0. Sum may need slight re-normalization."
)
# 理论上 Power Method 的定义保证了总和为 1,但如果因为数值误差偏离较多,
# 可以选择在这里进行最后的比例调整,但严格遵循方法定义是不需要的。
# 5. 将无 Vig 概率转换回 Moneyline 赔率
no_vig_moneyline_odds = [
prob_to_moneyline(p_novig) for p_novig in no_vig_probabilities
]
return no_vig_moneyline_odds
def compute_metrics(
df: pd.DataFrame,
n_bins: int = 10,
bin_strategy: str = "uniform", # 'uniform' or 'quantile'
include_draws: bool = True,
eps: float = 1e-6,
) -> dict:
"""
计算预测评估指标并拟合校准关系。
参数:
- df: 包含至少两列: 'win_prob' (预测主胜概率), 'res' (取 'won','refunded','lost')
- n_bins: ECE 分箱数
- bin_strategy: 'uniform' (等宽) 或 'quantile' (等频)
- include_draws: 若 True, 将 'draw' 视为非胜 (y=0)。若 False, 丢弃 'draw' 行。
- eps: 概率裁剪下限,用于数值稳定
返回:
dict 包含 logloss, brier, ece, accuracy, reg_alpha, reg_beta, ece_bins, n_samples
"""
# 处理 refunded
if include_draws:
mask = df["res"].isin(["won", "refunded", "lost"])
else:
mask = df["res"].isin(["won", "lost"])
df = df[mask].copy()
# 标签: won=1, others=0 (包括 refunded)
y = df["res"].map({"won": 1, "refunded": 0, "lost": 0}).astype(int).values
p = df["win_prob"].astype(float).values
# 裁剪概率以保证数值稳定
p_clip = np.clip(p, eps, 1 - eps)
# logloss: 使用 sklearn 实现以获得更稳健的数值行为
try:
logloss = float(log_loss(y, p_clip, labels=[0, 1]))
except Exception:
# 备用实现
logloss = float(-np.mean(y * np.log(p_clip) + (1 - y) * np.log(1 - p_clip)))
# brier score
brier = float(np.mean((p_clip - y) ** 2))
# ECE 计算 (支持 uniform 或 quantile)
if bin_strategy == "quantile":
# quantile bin edges
try:
edges = np.unique(np.percentile(p_clip, np.linspace(0, 100, n_bins + 1)))
if len(edges) - 1 <= 0:
# fallback to uniform
bin_idxs = np.minimum((p_clip * n_bins).astype(int), n_bins - 1)
else:
# searchsorted to assign bins
bin_idxs = np.clip(
np.searchsorted(edges, p_clip, side="right") - 1, 0, len(edges) - 2
)
except Exception:
bin_idxs = np.minimum((p_clip * n_bins).astype(int), n_bins - 1)
else:
bin_idxs = np.minimum((p_clip * n_bins).astype(int), n_bins - 1)
ece = 0.0
total = len(y)
bin_stats = []
for b in range(n_bins):
idx = bin_idxs == b
count = int(idx.sum())
if count == 0:
bin_stats.append(
{"count": 0, "mean_pred": float("nan"), "emp_freq": float("nan")}
)
continue
mean_pred = float(p_clip[idx].mean())
emp_freq = float(y[idx].mean())
ece += abs(mean_pred - emp_freq) * count
bin_stats.append({"count": count, "mean_pred": mean_pred, "emp_freq": emp_freq})
ece = float(ece / total) if total > 0 else float("nan")
# accuracy
acc = float(np.mean((p_clip >= 0.5) == (y == 1)))
# 校准拟合: 使用 LogisticRegression 拟合 logit(E[y]) = alpha + beta * logit(p)
X = sp_logit(p_clip).reshape(-1, 1)
clf = LogisticRegression(C=1e6, solver="lbfgs", max_iter=200)
clf.fit(X, y)
alpha = float(clf.intercept_[0])
beta = float(clf.coef_[0][0])
return {
"n_samples": len(df),
"brier": brier,
"logloss": logloss,
"accuracy": acc,
"reg_alpha": alpha,
"reg_beta": beta,
"ece": ece,
# 'ece_bins': bin_stats,
}
# 示例 # 示例
if __name__ == "__main__": if __name__ == "__main__":
odds_list = [+150, -200, +300, -120] odds_list = [+150, -200, +300, -120]
for odds in odds_list: for odds in odds_list:
prob = american_odds_to_probability(odds) prob = moneyline_to_prob(odds)
print(f"赔率 {odds}: 概率 {prob:.4f}") print(f"赔率 {odds}: 概率 {prob:.4f}")
odds = [+116, -156]
# 计算无 Vig 赔率使用 Power Method
no_vig_odds_power = calculate_no_vig_moneyline_power(odds)
print(f"原始 Moneyline 赔率: {odds}")
print(f"无 Vig Moneyline 赔率 (Power Method): {no_vig_odds_power}")
# 可选: 验证无 vig 赔率对应的概率之和是否接近 1
if no_vig_odds_power:
novig_probs_power = [moneyline_to_prob(o) for o in no_vig_odds_power]
print(f"无 Vig 概率之和 (基于计算出的赔率): {sum(novig_probs_power):.6f}")

View File

@@ -97,8 +97,8 @@ class DingTalkBot:
if __name__ == "__main__": if __name__ == "__main__":
webhook = "https://oapi.dingtalk.com/robot/send?access_token=fb70c1561d8beba94b4f11568f4bb15e3ae07ccbdc8ac19676434a9d1cd17546" # 填写你的webhook webhook = "https://oapi.dingtalk.com/robot/send?access_token=fb70c1561d8beba94b4f11568f4bb15e3ae07ccbdc8ac19676434a9d1cd17546"
secret = "SEC1ae7cd2f1a6f9da3611af37da3e7d954c1e8533fc073c6c8cc5e5af3b6e5926b" # 填写你的加签token如果有否则留空 secret = "SEC1ae7cd2f1a6f9da3611af37da3e7d954c1e8533fc073c6c8cc5e5af3b6e5926b"
# CTA 群机器人 # CTA 群机器人
# webhook = "https://oapi.dingtalk.com/robot/send?access_token=87c7abfcdd69b699c32da4e4f5981cd2ca6b0445474fc6ffb36f2ed0f6262fbb" # webhook = "https://oapi.dingtalk.com/robot/send?access_token=87c7abfcdd69b699c32da4e4f5981cd2ca6b0445474fc6ffb36f2ed0f6262fbb"

View File

@@ -1,4 +1,25 @@
import os import os
import time
from functools import wraps
from loguru import logger
def timeit(func):
"""
装饰器:计算函数执行时间
"""
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
logger.info(
f"Function {func.__name__} executed in {end_time - start_time:.4f} seconds"
)
return result
return wrapper
def ensure_directory_exists(target_path: str, is_file: bool = False): def ensure_directory_exists(target_path: str, is_file: bool = False):

1
config/__init__.py Normal file
View File

@@ -0,0 +1 @@
# 配置模块

59
main.py
View File

@@ -1,59 +0,0 @@
import pandas as pd
from typing import Generator
from pytz import timezone
from dao.Database import Database
from data_model import MysqlConfig, OddsJamOrder, OddsjamBet
def generate_oddjams_bet(data: OddsJamOrder) -> Generator[OddsjamBet]:
# 根据 start_timestamp 是否小于当前时间两个小时判断要不要去看结果
# 怎么判断一个 oddjam order 是不是下单成功?
bet_name = data.home_bet_name
sport_books = data.home_sportsbooks
if data.away_price > data.away_no_vig_price:
bet_name = data.away_bet_name
sport_books = data.away_sportsbooks # 选 selected_sportsbook
# 时区转换, mysql 中的 game start date 是 utc 时间? 这个不需要
game_start_date = data.start_date
my_datetime_with_tz = game_start_date.replace(tzinfo=timezone('UTC'))
eastern = timezone('US/Eastern')
eastern_datetime = my_datetime_with_tz.astimezone(eastern)
game_start_date = eastern_datetime.strftime('%m/%d/%Y, %H:%M EDT')
for sport_book in sport_books:
yield OddsjamBet(
Sportsbook=sport_book,
BetName=bet_name,
MarketName=data.market,
Odds=110, # away price or home price
Stake=1,
EventName='', # home team vs away team 用这种格式, 需要带vs和两边的空格
Sport=data.sport,
League=data.league,
GameID=data.game_id,
GameStartDate=game_start_date,
BetType='Positive EV',
Notes=data.bet_id
)
if __name__ == '__main__':
config_file_path = 'config\mysql_config.json'
mysql_config = MysqlConfig.parse_file(config_file_path)
dao = Database(mysql_config)
# query
select_query = "SELECT * FROM bet.oddsjam_order limit 10;"
raw_data_list = dao.fetchall(query=select_query)
order_data_list = [OddsJamOrder(**data) for data in raw_data_list]
bet_list = []
for order in order_data_list:
bet_list.extend(list(generate_oddjams_bet(order)))
bet_list = [data.model_dump() for data in bet_list]
bet_df = pd.DataFrame(bet_list)
bet_df.to_csv('bet.csv', index=False, encoding='utf-8-sig')

View File

@@ -1,108 +0,0 @@
import numpy as np
import pandas as pd
from scipy.special import logit as sp_logit
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import log_loss
def compute_metrics(df: pd.DataFrame,
n_bins: int = 10,
bin_strategy: str = 'uniform', # 'uniform' or 'quantile'
include_draws: bool = True,
eps: float = 1e-6) -> dict:
"""
计算预测评估指标并拟合校准关系。
参数:
- df: 包含至少两列: 'win_prob' (预测主胜概率), 'res' (取 'won','refunded','lost')
- n_bins: ECE 分箱数
- bin_strategy: 'uniform' (等宽) 或 'quantile' (等频)
- include_draws: 若 True, 将 'draw' 视为非胜 (y=0)。若 False, 丢弃 'draw' 行。
- eps: 概率裁剪下限,用于数值稳定
返回:
dict 包含 logloss, brier, ece, accuracy, reg_alpha, reg_beta, ece_bins, n_samples
"""
# 处理 refunded
if include_draws:
mask = df['res'].isin(['won', 'refunded', 'lost'])
else:
mask = df['res'].isin(['won', 'lost'])
df = df[mask].copy()
# 标签: won=1, others=0 (包括 refunded)
y = df['res'].map({'won': 1, 'refunded': 0, 'lost': 0}).astype(int).values
p = df['win_prob'].astype(float).values
# 裁剪概率以保证数值稳定
p_clip = np.clip(p, eps, 1 - eps)
# logloss: 使用 sklearn 实现以获得更稳健的数值行为
try:
logloss = float(log_loss(y, p_clip, labels=[0, 1]))
except Exception:
# 备用实现
logloss = float(-np.mean(y * np.log(p_clip) + (1 - y) * np.log(1 - p_clip)))
# brier score
brier = float(np.mean((p_clip - y) ** 2))
# ECE 计算(支持 uniform 或 quantile
if bin_strategy == 'quantile':
# quantile bin edges
try:
edges = np.unique(np.percentile(p_clip, np.linspace(0, 100, n_bins + 1)))
if len(edges) - 1 <= 0:
# fallback to uniform
bin_idxs = np.minimum((p_clip * n_bins).astype(int), n_bins - 1)
else:
# searchsorted to assign bins
bin_idxs = np.clip(np.searchsorted(edges, p_clip, side='right') - 1, 0, len(edges) - 2)
except Exception:
bin_idxs = np.minimum((p_clip * n_bins).astype(int), n_bins - 1)
else:
bin_idxs = np.minimum((p_clip * n_bins).astype(int), n_bins - 1)
ece = 0.0
total = len(y)
bin_stats = []
for b in range(n_bins):
idx = bin_idxs == b
count = int(idx.sum())
if count == 0:
bin_stats.append({'count': 0, 'mean_pred': float('nan'), 'emp_freq': float('nan')})
continue
mean_pred = float(p_clip[idx].mean())
emp_freq = float(y[idx].mean())
ece += abs(mean_pred - emp_freq) * count
bin_stats.append({'count': count, 'mean_pred': mean_pred, 'emp_freq': emp_freq})
ece = float(ece / total) if total > 0 else float('nan')
# accuracy
acc = float(np.mean((p_clip >= 0.5) == (y == 1)))
# 校准拟合: 使用 LogisticRegression 拟合 logit(E[y]) = alpha + beta * logit(p)
X = sp_logit(p_clip).reshape(-1, 1)
clf = LogisticRegression(C=1e6, solver='lbfgs', max_iter=200)
clf.fit(X, y)
alpha = float(clf.intercept_[0])
beta = float(clf.coef_[0][0])
return {
'logloss': logloss,
'brier': brier,
'ece': ece,
'accuracy': acc,
'reg_alpha': alpha,
'reg_beta': beta,
# 'ece_bins': bin_stats,
'n_samples': int(total)
}
if __name__ == '__main__':
df = pd.read_feather("data/p_res.feather")
df['win_prob'] = df['power_p']
res = compute_metrics(df)
print(res)

View File

@@ -1,183 +0,0 @@
import pandas as pd
from datetime import datetime
from dao.Database import Database
from data_model import MysqlConfig, OddsJamOrder, OddsjamBet
from typing import List
import json
import os
import plotly.graph_objects as go
def get_oddsjam_order_data_from_db(load_from_local: bool = False) -> pd.DataFrame:
current_date_str = datetime.now().strftime('%Y%m%d')
file_path = os.path.join('data', f'oddsjam_order_data_{current_date_str}.csv')
if load_from_local and os.path.exists(file_path):
return pd.read_csv(file_path, low_memory=False)
config_file_path = 'config\mysql_config.json'
mysql_config = MysqlConfig.parse_file(config_file_path)
dao = Database(mysql_config)
select_query = "SELECT * FROM bet.oddsjam_order where bet_status in ('won', 'lost');"
raw_data_list = dao.fetchall(query=select_query)
order_data_list = [OddsJamOrder(**data).model_dump()
for data in raw_data_list]
order_df = pd.DataFrame(order_data_list)
order_df.to_csv(file_path, index=False, encoding='utf-8-sig')
return order_df
def calc_benefit_by_order_info(order_info: dict) -> float:
home_or_away = order_info['home_or_away']
price = order_info[f'{home_or_away}_price'] / 100
if order_info['outcome'] == -1:
return -1
if price >= 0:
return price
else:
return 1 / abs(price)
def calc_odds(row):
home_or_away = row['home_or_away']
price = row[f'{home_or_away}_price'] / 100
if price >= 0:
return price
else:
return 1 / abs(price)
def clac_closing_balance(day_benefit_list: List, pre_balance: float = 1000, pre_benefit: float = 0) -> List:
closing_balance_list = []
for i, benefit in enumerate(day_benefit_list):
closing_balance = pre_balance + pre_benefit / 3 + benefit * 2 / 3
closing_balance_list.append(closing_balance)
pre_balance = closing_balance
pre_benefit = benefit
return closing_balance_list
def calc_in_transit_funds_ratio(daily_investment_list: List, closing_balance_list: List, start_closing_balance: float = 1000) -> List:
assert len(daily_investment_list) == len(closing_balance_list)
ratio_list = []
for i, daily_investment in enumerate(daily_investment_list):
if i == 0:
ratio = daily_investment / start_closing_balance
else:
ratio = daily_investment / closing_balance_list[i-1]
ratio_list.append(ratio)
return ratio_list
def simulate_profit(data_df: pd.DataFrame, init_balance=1000) -> pd.DataFrame:
res_df = data_df.groupby('date').agg({'investment': 'sum', 'benefit': 'sum'}).reset_index()
# res_df['日期'] = pd.to_datetime(res_df['date'])
res_df = res_df.rename(columns={'investment': '当日投入', 'benefit': '日收益'})
res_df['日收益率'] = res_df['日收益'] / res_df['当日投入']
res_df['累计收益'] = res_df['日收益'].cumsum()
res_df['累计投入'] = res_df['当日投入'].cumsum()
res_df['累计收益率'] = res_df['累计收益'] / res_df['累计投入']
day_benefit_list = res_df['日收益'].tolist()
closing_balance_list = clac_closing_balance(day_benefit_list=day_benefit_list,
pre_balance=init_balance)
res_df['日末余额(1.6天结算)'] = closing_balance_list
daily_investment_list = res_df['当日投入'].tolist()
res_df['在途资金比例'] = calc_in_transit_funds_ratio(daily_investment_list=daily_investment_list,
closing_balance_list=closing_balance_list,
start_closing_balance=init_balance)
annualized_sharpe_ratio = res_df['日收益'].sum() / init_balance / res_df['日收益率'].std() * ((365 / len(res_df))**0.5)
roi = res_df['日收益'].sum() / res_df['当日投入'].sum()
return res_df, annualized_sharpe_ratio, roi
def plot_won_lost_mean_odds(data_df: pd.DataFrame):
data_df = data_df.sort_values(by='date')
date_x = data_df['date'].tolist()
fig = go.Figure()
cols = ['won', 'lost']
for col in cols:
y_data = data_df[col].tolist()
fig.add_trace(go.Bar(x=date_x, y=y_data, name=col, yaxis='y1'))
fig.add_trace(go.Scatter(x=data_df['date'], y=data_df['odds'], mode='markers+lines', name='平均赔率', yaxis='y2'))
fig.update_layout(
barmode='group',
font=dict(family="Times New Roman"),
title='每天胜负数量以及平均赔率',
xaxis=dict(title='日期'),
yaxis=dict(title='数量'),
yaxis2=dict(title='赔率', overlaying='y', side='right'),
)
fig.write_html('data/won_lost_mean_odds.html')
def plot_profit_simulation(data_df: pd.DataFrame, title: str = None):
fig = go.Figure()
fig.add_trace(go.Bar(x=data_df['date'], y=data_df['日末余额(1.6天结算)'], name='日末余额', yaxis='y1'))
for col in ['日收益率', '累计收益率', '在途资金比例']:
fig.add_trace(go.Scatter(
x=data_df['date'],
y=data_df[col],
mode='markers+lines',
name=col,
yaxis='y2'))
if title is None:
title = '收益模拟'
fig.update_layout(
title=title,
font=dict(family="Times New Roman"),
xaxis=dict(title='日期'),
yaxis=dict(title='金额'),
yaxis2=dict(title='收益率', overlaying='y', side='right', tickformat='.1%'))
fig.write_html('data/profit_simulation.html')
if __name__ == '__main__':
order_df = get_oddsjam_order_data_from_db(load_from_local=True)
# order_df = pd.read_excel('data/PEV 3.11-10.26.xlsx', sheet_name='原始数据')
order_df['outcome'] = order_df['bet_status'].apply(lambda x: 1 if x == 'won' else -1)
order_df['benefit'] = order_df.apply(lambda row: calc_benefit_by_order_info(row.to_dict()), axis=1)
order_df['date'] = order_df['start_timestamp'].apply(
lambda x: datetime.fromtimestamp(x // 1000).strftime("%Y-%m-%d"))
data_df = order_df.copy()
data_df = data_df[data_df['market_width'] <= 25]
data_df = data_df[data_df['market_width'] >= 20]
market_width_df = data_df.groupby('date').agg({'market_width': 'mean'}).reset_index()
data_df['investment'] = 1
res_df, annualized_sharpe_ratio, roi = simulate_profit(data_df)
print(f'年化夏普率: {annualized_sharpe_ratio}')
print(f'ROI: {roi}')
data_df['odds'] = data_df.apply(calc_odds, axis=1)
total_mean_odds = data_df['odds'].mean()
print(f'{len(data_df)} 场比赛平均赔率: {total_mean_odds}')
won_rate = len(data_df[data_df['outcome'] == 1]) / len(data_df)
print(f'{len(data_df)} 场比赛胜率: {won_rate}')
odds_df = data_df.groupby('date').agg({'odds': 'mean'}).reset_index()
res_df = pd.merge(res_df, odds_df, on='date', how='left')
bet_status_df = pd.pivot_table(data_df, index=['date'], columns=[
'bet_status'], aggfunc='size', fill_value=0).reset_index()
res_df = pd.merge(res_df, bet_status_df, on='date', how='left')
res_df['won rate'] = res_df['won'] / (res_df['won'] + res_df['lost'])
if 'market_width' in data_df.columns:
res_df = pd.merge(res_df, market_width_df, on='date', how='left')
res_df.to_csv('data/profit_simulation.csv', index=False, encoding='utf-8-sig')
title = f'收益模拟,年化夏普率: {annualized_sharpe_ratio}, ROI: {roi}'
plot_profit_simulation(data_df=res_df, title=title)
plot_won_lost_mean_odds(data_df=res_df)

5223
test.ipynb

File diff suppressed because it is too large Load Diff