Compare commits

..

6 Commits

Author SHA1 Message Date
be9d9a23d7 代码重构 2025-10-26 13:13:30 +08:00
c6f40b218e 重命名 2025-10-26 12:19:51 +08:00
428cdb1051 pull数据翻页最大页数不能+1; 2025-10-26 11:50:52 +08:00
da80381dd8 添加config str函数 2025-10-26 10:18:10 +08:00
aaf51c96ac 代码重构 2025-10-26 09:55:40 +08:00
9a5fcc8511 正则匹配元素文本没有加r; traceback.print_exc()改为traceback.format_exc() 2025-10-26 09:27:00 +08:00
4 changed files with 980 additions and 496 deletions

View File

@@ -1,423 +1,560 @@
import http """
import sys OddsJam Bet Tracker - 重构版本
按照Python最佳实践重构支持环境变量配置模块化设计
核心功能:
1. 定时从数据库拉取数据上传到OddsJam
2. 定时从OddsJam获取结果数据更新到数据库
"""
from email import message
import os import os
import sys
import re import re
import math import math
import time import time
import json import json
import datetime import datetime
import traceback import traceback
from typing import Optional, List, Dict, Tuple
from dataclasses import dataclass
from abc import ABC, abstractmethod
import pandas as pd import pandas as pd
from retry import retry from retry import retry
from common.utils import ensure_directory_exists
from dao.Database import Database
from playwright.sync_api import sync_playwright, Page, BrowserContext, expect from playwright.sync_api import sync_playwright, Page, BrowserContext, expect
from loguru import logger from loguru import logger
# 导入项目模块
from dao.Database import Database
from data_model import MysqlConfig, OddsjamOrderStatus from data_model import MysqlConfig, OddsjamOrderStatus
from common.dingtalk import DingTalkBot from common.dingtalk import DingTalkBot
from common.utils import ensure_directory_exists
webhook = "https://oapi.dingtalk.com/robot/send?access_token=21de667159edadd33172c6ec414a2addf9c6359189350ffd36819d2a20e8a0f4"
secret = "SEC43a0fa0b29717f98637a119b92a0bd5f7b2b6da671bdd2bd1279ed8323454d5e"
dingtalk = DingTalkBot(webhook, secret)
root_dir = os.path.dirname(os.path.abspath(__file__)) @dataclass
class AppConfig:
"""应用配置类 - 支持环境变量配置"""
config_file_path = "./config/mysql_config.json" # 邮箱配置
mysql_config = MysqlConfig.parse_file(config_file_path) email_account: str = os.getenv("ODDSJAM_EMAIL", "ojbbbb21@proton.me")
dao = Database(mysql_config)
# 代理配置
http_proxy: str = os.getenv("HTTP_PROXY", "http://127.0.0.1:7890")
# 数据库表名
table_name: str = os.getenv("BET_TABLE_NAME", "oddsjam_order")
# 定时任务配置
upload_schedule: str = os.getenv("UPLOAD_SCHEDULE", "09:48")
pull_schedule: str = os.getenv("PULL_SCHEDULE", "13:00")
# 文件路径配置
root_dir: str = os.path.dirname(os.path.abspath(__file__))
config_file_path: str = os.getenv("MYSQL_CONFIG_PATH", "./config/mysql_config.json")
# 钉钉配置
dingtalk_webhook: str = os.getenv(
"DINGTALK_WEBHOOK",
"https://oapi.dingtalk.com/robot/send?access_token=21de667159edadd33172c6ec414a2addf9c6359189350ffd36819d2a20e8a0f4",
)
dingtalk_secret: str = os.getenv(
"DINGTALK_SECRET",
"SEC43a0fa0b29717f98637a119b92a0bd5f7b2b6da671bdd2bd1279ed8323454d5e",
)
# 浏览器配置
headless: bool = os.getenv("HEADLESS", "true").lower() == "true"
default_timeout: float = float(os.getenv("DEFAULT_TIMEOUT", "6000000"))
def __str__(self) -> str:
# 返回邮箱和数据表名的信息
return f"AppConfig(email_account={self.email_account}, table_name={self.table_name})"
def __post_init__(self):
"""初始化后处理"""
self.mysql_config = MysqlConfig.parse_file(self.config_file_path)
self.dingtalk = DingTalkBot(self.dingtalk_webhook, self.dingtalk_secret)
# 构建文件路径
self.login_state_path = os.path.join(
self.root_dir,
"data",
"bet_data",
"account_login_state",
f"{self.email_account}.json",
)
self.bet_file_path = os.path.join(self.root_dir, "data", "bet_data", "bet.csv")
self.bet_name_map_path = os.path.join(
self.root_dir, "data", "bet_data", "betname_map.json"
)
def query_by_create_time(ds: str): class DatabaseService:
sql = f"select * from oddsjam_order_all where DATE(create_time) = '{ds}'" """数据库服务类 - 负责所有数据库操作"""
rows = dao.fetchall(query=sql)
if rows:
return [OddsjamOrderStatus(row) for row in rows]
else:
return []
@retry(tries=6) def __init__(self, config: AppConfig):
def update_bet_status(id: str, bet_status: str): self.config = config
sql = "update oddsjam_order_all set bet_status = %s where id = %s" self.dao = Database(config.mysql_config)
dao.execute(query=sql, args=(bet_status, id))
def query_orders_by_date(self, date_str: str) -> List[OddsjamOrderStatus]:
"""根据日期查询订单数据"""
sql = f"select * from {self.config.table_name} where DATE(create_time) = '{date_str}'"
rows = self.dao.fetchall(query=sql)
return [OddsjamOrderStatus(row) for row in rows] if rows else []
@retry(tries=6)
def update_bet_status(self, bet_id: str, bet_status: str) -> None:
"""更新投注状态"""
sql = f"update {self.config.table_name} set bet_status = %s where id = %s"
self.dao.execute(query=sql, args=(bet_status, bet_id))
class SyncOddsjamBetTracker: class OddsjamService:
def __init__( """OddsJam业务服务类 - 负责与OddsJam网站的交互"""
self,
login_state_path: str, def __init__(self, config: AppConfig):
intercept_response_res_save_path: str, self.config = config
headless: bool = False,
default_time_out: float = 6000000,
):
self.login_state_path = login_state_path
self.headless = headless
self.default_time_out = default_time_out
self.intercept_response_res_save_path = intercept_response_res_save_path
self.total_bet_cnt = 0 self.total_bet_cnt = 0
def login_oddsjam_cookies(self, p, headless=False) -> tuple: def _get_browser_context(self, playwright) -> Tuple[Page, BrowserContext]:
# 获取HTTP_PROXY环境变量默认为None """获取浏览器上下文"""
import os logger.info(f"加载cookies: {self.config.login_state_path}")
http_proxy = os.environ.get("HTTP_PROXY", None) browser = playwright.chromium.launch(
if http_proxy is None:
http_proxy = "http://127.0.0.1:7890"
logger.info("加载cookies {}", self.login_state_path)
browser = p.chromium.launch(
args=["--start-maximized"], args=["--start-maximized"],
headless=headless, headless=self.config.headless,
proxy={ proxy=(
"server": http_proxy, {"server": self.config.http_proxy} if self.config.http_proxy else None
}, ),
) )
context = browser.new_context( context = browser.new_context(
storage_state=self.login_state_path, no_viewport=True storage_state=self.config.login_state_path, no_viewport=True
) )
page = context.new_page() page = context.new_page()
return page, browser return page, browser
def login_to_site(self): def login_to_site(self) -> None:
logger.info(f"login account: {self.email_account}") """登录到OddsJam网站"""
logger.info(f"登录账户: {self.config.email_account}")
with sync_playwright() as p: with sync_playwright() as p:
browser = p.chromium.launch(headless=self.headless) browser = p.chromium.launch(headless=self.config.headless)
page = browser.new_page() page = browser.new_page()
page.set_default_timeout(timeout=self.default_time_out) page.set_default_timeout(timeout=self.config.default_timeout)
url = "https://oddsjam.com/bet-tracker" url = "https://oddsjam.com/bet-tracker"
page.goto(url) page.goto(url)
# page.get_by_label("Close Modal").click()
page.get_by_role("link", name="Login").click() page.get_by_role("link", name="Login").click()
expect(page.get_by_role("button", name="Sign in")).to_be_visible( expect(page.get_by_role("button", name="Sign in")).to_be_visible(
timeout=self.default_time_out timeout=self.config.default_timeout
) )
page.pause() page.pause()
time.sleep(10) time.sleep(10)
try: try:
browser.contexts[0].storage_state(path=self.login_state_path) browser.contexts[0].storage_state(path=self.config.login_state_path)
logger.info(f"登录成功: {page.url}")
except Exception as ex: except Exception as ex:
logger.error(f"保存登录状态失败: {ex}")
traceback.print_exc() traceback.print_exc()
page.screenshot(path="error.png") page.screenshot(path="error.png")
logger.info(page.url, "login_success")
browser.close() browser.close()
@retry(tries=6) @retry(tries=6)
def intercept_import_response(self, route, request): def upload_bets(self, bet_file_path: str) -> None:
if "oddsjam.com/api/backend/bets/import" in request.url: """上传投注数据到OddsJam"""
response = route.fetch(timeout=50000)
logger.info(response.status)
if response.status == 500:
route.fulfill(response=response, json={})
return
route.continue_()
@retry(tries=6)
def upload_new_bets(self, bet_file_path: str):
url = "https://oddsjam.com/bet-tracker" url = "https://oddsjam.com/bet-tracker"
with sync_playwright() as p: with sync_playwright() as p:
page: Page page, browser = self._get_browser_context(p)
browser: BrowserContext page.set_default_timeout(timeout=self.config.default_timeout)
page, browser = self.login_oddsjam_cookies(p, headless=self.headless)
page.set_default_timeout(timeout=self.default_time_out)
def on_response(response): def on_response(response):
if "dromo-user-imports-production" in response.url: if "dromo-user-imports-production" in response.url:
print(response.status, response.request.failure) logger.info(f"上传响应状态: {response.status}")
if response.status == 400: if response.status == 400:
browser.close() browser.close()
raise Exception(response.request.failure) raise Exception(f"上传失败: {response.request.failure}")
elif "oddsjam.com/api/backend/bets/import" in response.url: elif "oddsjam.com/api/backend/bets/import" in response.url:
logger.info(response.status) logger.info(f"导入响应状态: {response.status}")
page.on("response", on_response) page.on("response", on_response)
page.goto(url) page.goto(url)
# page.pause()
page.get_by_role("button", name="Import Bets").nth(1).click()
time.sleep(5)
iframe_locator = page.frame_locator(
'iframe[title="Dromo Importer\\: Bets"]'
)
with page.expect_file_chooser() as fc_info:
iframe_locator.get_by_role(
"button", name="Choose a file", exact=True
).click()
time.sleep(5)
file_chooser = fc_info.value
file_chooser.set_files(bet_file_path)
iframe_locator.get_by_role( # 执行上传流程
"button", name="Confirm selection and continue" self._execute_upload_flow(page, bet_file_path)
).click()
time.sleep(5)
iframe_locator.get_by_role(
"button", name="Confirm matching and continue"
).click()
time.sleep(5)
iframe_locator.get_by_role("button", name="Continue").click()
iframe_locator.get_by_role("button", name="Finish").click()
try:
iframe_locator.get_by_role("button", name="Submit anyway").click(
timeout=5000
)
except Exception as ex:
...
# print(ex)
iframe_locator.get_by_role("button", name="Yes").click()
expect(iframe_locator.get_by_text("Processing...")).to_be_hidden(
timeout=self.default_time_out
)
# os.remove(bet_file_path)
# page.pause()
browser.close() browser.close()
def get_all_bet_status(self, ds: str = None): def _execute_upload_flow(self, page: Page, bet_file_path: str) -> None:
if not ds: """执行上传流程"""
ds = datetime.datetime.now().strftime("%Y%m%d") page.get_by_role("button", name="Import Bets").nth(1).click()
day = datetime.datetime.strptime(ds, "%Y%m%d").day time.sleep(5)
logger.info(f"current date: {ds}")
logger.info(f"current day: {day}") iframe_locator = page.frame_locator('iframe[title="Dromo Importer\\: Bets"]')
# 选择文件
with page.expect_file_chooser() as fc_info:
iframe_locator.get_by_role(
"button", name="Choose a file", exact=True
).click()
time.sleep(5)
file_chooser = fc_info.value
file_chooser.set_files(bet_file_path)
# 确认选择
iframe_locator.get_by_role(
"button", name="Confirm selection and continue"
).click()
time.sleep(5)
# 确认匹配
iframe_locator.get_by_role(
"button", name="Confirm matching and continue"
).click()
time.sleep(5)
# 继续
iframe_locator.get_by_role("button", name="Continue").click()
# 完成
iframe_locator.get_by_role("button", name="Finish").click()
# 处理可能的错误
try:
iframe_locator.get_by_role("button", name="Submit anyway").click(
timeout=5000
)
except Exception:
pass
iframe_locator.get_by_role("button", name="Yes").click()
# 等待处理完成
expect(iframe_locator.get_by_text("Processing...")).to_be_hidden(
timeout=self.config.default_timeout
)
def get_all_bet_status(self, date_str: str, status_save_path: str) -> None:
"""获取所有投注状态"""
logger.info(f"获取投注状态,日期: {date_str}")
url = "https://oddsjam.com/bet-tracker" url = "https://oddsjam.com/bet-tracker"
with sync_playwright() as p: with sync_playwright() as p:
page: Page page, browser = self._get_browser_context(p)
browser: BrowserContext
page, browser = self.login_oddsjam_cookies(p, headless=self.headless)
def on_response(response): def on_response(response):
if "oddsjam.com/api/backend/bets-and-parlays-V2/find" in response.url: if "oddsjam.com/api/backend/bets-and-parlays-V2/find" in response.url:
try: try:
# 确保响应已完成
if response.request.failure is None and response.status == 200: if response.request.failure is None and response.status == 200:
data = response.json() # 注意: sync_api 中 .json() 是同步的 data = response.json()
self.total_bet_cnt = data["totalCount"] self.total_bet_cnt = data["totalCount"]
logger.info(f"total bet count: {self.total_bet_cnt}") logger.info(f"总投注数量: {self.total_bet_cnt}")
# 保存数据 # 保存响应数据
with open( with open(status_save_path, "a", encoding="utf-8") as f:
self.intercept_response_res_save_path,
"a",
encoding="utf-8",
) as f:
json.dump(data, f, ensure_ascii=False) json.dump(data, f, ensure_ascii=False)
f.write("\n") f.write("\n")
except Exception as e: except Exception as e:
logger.error(f"Error processing response: {e}") logger.error(f"处理响应数据失败: {e}")
# page.route("**", self.intercept_response)
page.on("response", on_response) page.on("response", on_response)
page.set_default_timeout(timeout=self.default_time_out) page.set_default_timeout(timeout=self.config.default_timeout)
page.goto(url=url) page.goto(url=url)
# 处理弹窗
try: try:
page.locator("#cello-widget-app").get_by_role("button").click( page.locator("#cello-widget-app").get_by_role("button").click(
timeout=6000 timeout=6000
) )
except: except Exception:
... pass
# page.locator(".mt-4 > div > .inline-flex").first.click() # 获取总页数并翻页
# time.sleep(5) self._navigate_all_pages(page)
# page.get_by_label("Clear").click()
# time.sleep(5)
# page.get_by_role("button", name="Date Range").click()
# time.sleep(5)
# page.get_by_text("Custom", exact=True).click()
# time.sleep(5)
# page.get_by_role("button", name=f"{day}").click()
# time.sleep(5)
page.get_by_role(
"button", name=re.compile("Show (\d+) Results", re.IGNORECASE)
).click()
inner_text = page.get_by_text( browser.close()
re.compile("Showing 1 to 50 of", re.IGNORECASE)
).inner_text() def _navigate_all_pages(self, page: Page) -> None:
match = re.search(r"Showing 1 to 50 of (\d+) results", inner_text) """导航所有页面"""
total_bet_cnt = int(match[1]) inner_text = page.get_by_text(
total_page_no = math.ceil(total_bet_cnt / 50) re.compile("Showing 1 to 50 of", re.IGNORECASE)
logger.info(f"total {total_bet_cnt} results, {total_page_no} pages") ).inner_text()
page.pause()
for page_no in range(2, total_page_no): match = re.search(r"Showing 1 to 50 of (\d+) results", inner_text)
if not match:
raise ValueError("无法获取总结果数")
total_bet_cnt = int(match[1])
total_page_no = math.ceil(total_bet_cnt / 50)
logger.info(f"总共 {total_bet_cnt} 条结果,{total_page_no}")
# 翻页到最后一页
for page_no in range(2, total_page_no):
if page_no <= total_page_no:
expect(page.get_by_role("button", name="Next")).to_be_visible( expect(page.get_by_role("button", name="Next")).to_be_visible(
timeout=self.default_time_out timeout=self.config.default_timeout
) )
page.wait_for_timeout(timeout=3000) page.wait_for_timeout(timeout=3000)
page.get_by_role("button", name="Next").click() page.get_by_role("button", name="Next").click()
logger.info(f"current page number: {page_no} / {total_page_no}") logger.info(f"当前页码: {page_no} / {total_page_no}")
page.get_by_role("button", name=f"{total_page_no}", exact=True).click()
expect(page.get_by_role("button", name="Next")).to_be_visible( class BetDataProcessor:
timeout=self.default_time_out """投注数据处理服务类"""
def __init__(self, config: AppConfig):
self.config = config
def prepare_bet_data_for_upload(self, date_str: str) -> str:
"""准备上传的投注数据"""
logger.info(f"准备投注数据,日期: {date_str}")
# 确保目录存在
ensure_directory_exists(target_path=self.config.bet_file_path, is_file=True)
# 从数据库获取数据
db_service = DatabaseService(self.config)
data_list = db_service.query_orders_by_date(date_str)
if not data_list:
message = f"日期 {date_str} 没有找到投注数据"
logger.warning(message)
raise ValueError(message)
# 转换为DataFrame
bet_df = pd.DataFrame([d.to_dict() for d in data_list])
# 列名映射
col_map = {
"sportsbooks": "Sportsbook",
"bet_name": "Bet Name",
"market": "Market Name",
"price": "Odds",
"stake": "Stake",
"event_name": "Event Name",
"sport": "Sport",
"league": "League",
"game_id": "Game ID",
"bet_type": "Bet Type",
"bet_id": "Notes",
"start_timestamp": "Game Start Date",
}
bet_df = bet_df.rename(columns=col_map)
# 处理Sportsbook字段
bet_df["Sportsbook"] = bet_df["Sportsbook"].apply(lambda x: eval(x)[0])
# 加载博彩公司名称映射
with open(self.config.bet_name_map_path, "r") as f:
bet_name_map = json.load(f)
bet_df["Sportsbook"] = bet_df["Sportsbook"].apply(
lambda x: bet_name_map.get(x, x)
) )
# page.pause()
browser.close()
# 选择需要的列
bet_df = bet_df[col_map.values()]
def get_tomorrow_over_bet_order_data(save_file_path: str, ds): # 格式化时间
logger.info(f"load bet data from db: {ds}") bet_df["Game Start Date"] = bet_df["Game Start Date"].apply(
data_list = query_by_create_time(ds=ds) lambda x: datetime.datetime.fromtimestamp(x // 1000).strftime(
data_list = [d.to_dict() for d in data_list] "%Y/%m/%d %H:%M"
bet_df = pd.DataFrame(data_list) )
bet_df.to_csv(save_file_path, encoding="utf-8-sig", index=False)
col_map = {
"sportsbooks": "Sportsbook",
"bet_name": "Bet Name",
"market": "Market Name",
"price": "Odds",
"stake": "Stake",
"event_name": "Event Name",
"sport": "Sport",
"league": "League",
"game_id": "Game ID",
"bet_type": "Bet Type",
"bet_id": "Notes",
"start_timestamp": "Game Start Date",
}
bet_df = bet_df.rename(columns=col_map)
bet_df["Sportsbook"] = bet_df["Sportsbook"].apply(lambda x: eval(x)[0])
with open("./data/bet_data/betname_map.json", "r") as f:
bet_name_map = json.load(f)
bet_df["Sportsbook"] = bet_df["Sportsbook"].apply(
lambda x: bet_name_map.get(x, x)
) )
# bet_df = bet_df.explode("Sportsbook")
bet_df = bet_df[col_map.values()]
bet_df["Game Start Date"] = bet_df["Game Start Date"].apply(
lambda x: datetime.datetime.fromtimestamp(x // 1000).strftime("%Y/%m/%d %H:%M")
)
logger.info(f"bet order min start date: {bet_df['Game Start Date'].min()}")
logger.info(f"bet order max start date: {bet_df['Game Start Date'].max()}")
logger.info(bet_df.shape) # 保存到文件
bet_df.to_csv(save_file_path, encoding="utf-8-sig", index=False) bet_df.to_csv(self.config.bet_file_path, encoding="utf-8-sig", index=False)
info_message = f"bet order min start date: {bet_df['Game Start Date'].min()}\n"
info_message += f"bet order max start date: {bet_df['Game Start Date'].max()}\n"
info_message += f"bet order shape: {bet_df.shape}"
return info_message
logger.info(f"投注数据最小开始时间: {bet_df['Game Start Date'].min()}")
logger.info(f"投注数据最大开始时间: {bet_df['Game Start Date'].max()}")
logger.info(f"投注数据形状: {bet_df.shape}")
def update_db_order_status(status_file_path: str): return (
logger.info(status_file_path) f"投注数据最小开始时间: {bet_df['Game Start Date'].min()}\n"
data_list = [] f"投注数据最大开始时间: {bet_df['Game Start Date'].max()}\n"
with open(status_file_path, "r", encoding="utf-8") as f: f"投注数据形状: {bet_df.shape}"
for line in f:
data = json.loads(line)
data_list.extend(data["entities"])
status_df = pd.DataFrame(data_list)
def get_error_status(row):
if not pd.isna(row["autograder_errors"]):
return f"error: {row['autograder_errors']}"
return row["status"]
status_df["status"] = status_df.apply(lambda row: get_error_status(row), axis=1)
status_df = status_df[status_df["status"] != "pending"]
status_df = status_df[["status", "notes"]].drop_duplicates()
status_list = status_df.to_dict(orient="records")
for i, data in enumerate(status_list):
bet_id = data["notes"]
bet_status = data["status"]
logger.info(f"{i+1}/{len(status_list)}, status->{bet_status}, bet_id->{bet_id}")
for _ in range(3):
try:
update_bet_status(id=bet_id, bet_status=bet_status)
break
except Exception as e:
logger.error(f"update bet status error: {e}")
def pull_data_from_oddsjam_update(ds: str = None):
try:
if ds is None:
ds = datetime.datetime.now().strftime("%Y%m%d")
logger.info(f"current date: {ds}")
ds_date = datetime.datetime.strptime(ds, "%Y%m%d")
pre_date = ds_date - datetime.timedelta(days=0)
pre_ds = pre_date.strftime("%Y%m%d")
oddsjam_bet_tracker = get_oddsjam_bet_tracker(ds=pre_ds)
# oddsjam_bet_tracker.login_to_site()
oddsjam_bet_tracker.get_all_bet_status(ds)
intercept_response_res_save_path = (
oddsjam_bet_tracker.intercept_response_res_save_path
) )
update_db_order_status(status_file_path=intercept_response_res_save_path)
dingtalk.send_text(f"{ds}: 比赛状态更新完成") def process_status_data(self, status_file_path: str) -> None:
except Exception as e: """处理状态数据并更新数据库"""
error_info = traceback.print_exc() logger.info(f"处理状态数据: {status_file_path}")
logger.error(error_info)
dingtalk.send_text(error_info) data_list = []
with open(status_file_path, "r", encoding="utf-8") as f:
for line in f:
data = json.loads(line)
data_list.extend(data["entities"])
status_df = pd.DataFrame(data_list)
# 处理错误状态
def get_error_status(row):
if not pd.isna(row["autograder_errors"]):
return f"error: {row['autograder_errors']}"
return row["status"]
status_df["status"] = status_df.apply(lambda row: get_error_status(row), axis=1)
status_df = status_df[status_df["status"] != "pending"]
status_df = status_df[["status", "notes"]].drop_duplicates()
# 更新数据库
db_service = DatabaseService(self.config)
status_list = status_df.to_dict(orient="records")
for i, data in enumerate(status_list):
bet_id = data["notes"]
bet_status = data["status"]
logger.info(
f"{i+1}/{len(status_list)}, 状态->{bet_status}, 投注ID->{bet_id}"
)
for _ in range(3):
try:
db_service.update_bet_status(bet_id=bet_id, bet_status=bet_status)
break
except Exception as e:
logger.error(f"更新投注状态失败: {e}")
def upload_new_bets_data2oddsjam(ds: str = None): class TaskScheduler:
try: """定时任务调度器"""
if ds is None:
ds = datetime.datetime.now() - datetime.timedelta(days=1)
ds = ds.strftime("%Y%m%d")
logger.info(f"current date: {ds}")
oddsjam_bet_tracker = get_oddsjam_bet_tracker(ds=ds)
# oddsjam_bet_tracker.login_to_site() def __init__(self, config: AppConfig):
bet_file_path = os.path.join(root_dir, "data", "bet_data", "bet.csv") self.config = config
ensure_directory_exists(target_path=bet_file_path, is_file=True) self.oddsjam_service = OddsjamService(config)
data_info = get_tomorrow_over_bet_order_data( self.data_processor = BetDataProcessor(config)
save_file_path=bet_file_path, ds=ds
def upload_bets_task(self, date_str: Optional[str] = None) -> None:
"""上传 T-1 时间的投注数据任务"""
config_base_info = str(self.config)
try:
if date_str is None:
date_str = (
datetime.datetime.now() - datetime.timedelta(days=1)
).strftime("%Y%m%d")
logger.info(f"执行上传任务,日期: {date_str}")
# 准备数据
data_info = self.data_processor.prepare_bet_data_for_upload(date_str)
if not data_info:
logger.warning(f"日期 {date_str} 没有数据需要上传")
return
# 上传到OddsJam
self.oddsjam_service.upload_bets(self.config.bet_file_path)
# 发送通知
self.config.dingtalk.send_text(
f"{date_str}:比赛数据上传完成\n{config_base_info}\n{data_info}"
)
except Exception as e:
error_info = traceback.format_exc()
logger.error(f"上传任务失败: {error_info}")
self.config.dingtalk.send_text(
f"{date_str}:\n{config_base_info}\n上传比赛失败: {e}\n{error_info}"
)
def pull_status_task(self, date_str: Optional[str] = None) -> None:
"""拉取状态数据任务"""
config_base_info = str(self.config)
try:
if date_str is None:
date_str = datetime.datetime.now().strftime("%Y%m%d")
logger.info(f"执行拉取任务,日期: {date_str}")
# 构建状态文件路径
status_file_path = os.path.join(
self.config.root_dir,
"data",
"bet_data",
"bet_status",
f"{self.config.email_account}_status_{date_str}.json",
)
ensure_directory_exists(target_path=status_file_path, is_file=True)
# 获取状态数据
self.oddsjam_service.get_all_bet_status(date_str, status_file_path)
# 处理状态数据
self.data_processor.process_status_data(status_file_path)
# 发送通知
self.config.dingtalk.send_text(
f"{date_str}: 比赛状态更新完成\n{config_base_info}"
)
except Exception as e:
error_info = traceback.format_exc()
logger.error(f"拉取任务失败: {error_info}")
self.config.dingtalk.send_text(f"{config_base_info}\n{error_info}")
class BetTrackerApplication:
"""主应用类"""
def __init__(self):
self.config = AppConfig()
self.scheduler = TaskScheduler(self.config)
def run_scheduled_tasks(self) -> None:
"""运行定时任务"""
import schedule
logger.info(f"上传定时任务: {self.config.upload_schedule}")
logger.info(f"拉取定时任务: {self.config.pull_schedule}")
# 设置定时任务
schedule.every().day.at(self.config.pull_schedule).do(
self.scheduler.pull_status_task
)
schedule.every().day.at(self.config.upload_schedule).do(
self.scheduler.upload_bets_task
) )
oddsjam_bet_tracker.upload_new_bets(bet_file_path=bet_file_path)
dingtalk.send_text(f"{ds}: \n {data_info}")
except Exception as e:
error_info = traceback.format_exc()
dingtalk.send_text(f"{ds}: 上传比赛失败: {e}\n{error_info}")
# 运行定时任务
while True:
schedule.run_pending()
time.sleep(0.05)
def get_oddsjam_bet_tracker(ds: str) -> SyncOddsjamBetTracker: def run_manual_task(self, task_type: str, date_str: Optional[str] = None) -> None:
email_account = "aszer27937@gmail.com" """运行手动任务"""
login_state_save_path = os.path.join( if task_type == "upload":
root_dir, "data", "bet_data", "account_login_state", f"{email_account}.json" self.scheduler.upload_bets_task(date_str)
) elif task_type == "pull":
ensure_directory_exists(target_path=login_state_save_path, is_file=True) self.scheduler.pull_status_task(date_str)
intercept_response_res_save_path = os.path.join( else:
root_dir, logger.error(f"未知的任务类型: {task_type}")
"data",
"bet_data",
"bet_status",
f"{email_account}_status_{ds}.json",
)
ensure_directory_exists(target_path=intercept_response_res_save_path, is_file=True)
oddsjam_bet_tracker = SyncOddsjamBetTracker(
login_state_path=login_state_save_path,
intercept_response_res_save_path=intercept_response_res_save_path,
headless=True
)
return oddsjam_bet_tracker
def main(): def main():
ds = datetime.datetime.now() - datetime.timedelta(days=1) """主入口函数"""
ds = ds.strftime("%Y%m%d") app = BetTrackerApplication()
logger.info(f"current date: {ds}")
pull_data_from_oddsjam_update(ds=ds) # 检查命令行参数
# upload_new_bets_data2oddsjam(ds=ds) if len(sys.argv) > 1:
task_type = sys.argv[1]
date_str = sys.argv[2] if len(sys.argv) > 2 else None
app.run_manual_task(task_type, date_str)
else:
# 运行定时任务
app.run_scheduled_tasks()
if __name__ == "__main__": if __name__ == "__main__":
import schedule main()
# schedule.every().day.at("07:00").do(clear_order_from_oddsjam) # app = BetTrackerApplication()
UPLOAD_SCHEDULE = os.environ.get("UPLOAD_SCHEDULE", "10:00") # logger.info(app.config)
PULL_SCHEDULE = os.environ.get("PULL_SCHEDULE", "13:00") # app.run_manual_task(task_type="upload", date_str="20251026")
logger.info(f"upload schedule: {UPLOAD_SCHEDULE}")
logger.info(f"pull schedule: {PULL_SCHEDULE}")
schedule.every().day.at(PULL_SCHEDULE).do(pull_data_from_oddsjam_update)
schedule.every().day.at(UPLOAD_SCHEDULE).do(upload_new_bets_data2oddsjam)
while True:
schedule.run_pending()
time.sleep(0.05)
# main()

529
ProfitSimulation.py Normal file
View File

@@ -0,0 +1,529 @@
"""
Profit Simulation - 重构版本
按照Python最佳实践重构模块化设计支持配置化
核心功能:
1. 从数据库获取投注订单数据
2. 计算投注收益、赔率、胜率等指标
3. 模拟资金变化和收益率分析
4. 生成可视化图表和分析报告
"""
import os
import json
from datetime import datetime
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass
from abc import ABC, abstractmethod
import pandas as pd
import plotly.graph_objects as go
from loguru import logger
# 导入项目模块
from dao.Database import Database
from data_model import MysqlConfig, OddsJamOrder
@dataclass
class SimulationConfig:
"""模拟配置类 - 支持环境变量配置"""
# 数据库配置
config_file_path: str = os.getenv("MYSQL_CONFIG_PATH", "./config/mysql_config.json")
table_name: str = os.getenv("BET_TABLE_NAME", "oddsjam_order")
# 模拟参数
initial_balance: float = float(os.getenv("INITIAL_BALANCE", "1000"))
market_width_min: float = float(os.getenv("MARKET_WIDTH_MIN", "20"))
market_width_max: float = float(os.getenv("MARKET_WIDTH_MAX", "25"))
# 文件路径
data_dir: str = os.getenv("DATA_DIR", "./data/bet_simulation/")
output_dir: str = os.getenv("OUTPUT_DIR", "./data/bet_simulation/")
# 缓存配置
enable_cache: bool = os.getenv("ENABLE_CACHE", "true").lower() == "true"
def __post_init__(self):
"""初始化后处理"""
self.mysql_config = MysqlConfig.parse_file(self.config_file_path)
# 确保目录存在
os.makedirs(self.data_dir, exist_ok=True)
os.makedirs(self.output_dir, exist_ok=True)
class DataService:
"""数据服务类 - 负责数据获取和缓存"""
def __init__(self, config: SimulationConfig):
self.config = config
self.dao = Database(config.mysql_config)
def get_oddsjam_order_data(self, load_from_cache: bool = None) -> pd.DataFrame:
"""获取OddsJam订单数据"""
if load_from_cache is None:
load_from_cache = self.config.enable_cache
current_date_str = datetime.now().strftime("%Y%m%d")
cache_file_path = os.path.join(
self.config.data_dir, f"oddsjam_order_data_{current_date_str}.feather"
)
# 尝试从缓存加载
if load_from_cache and os.path.exists(cache_file_path):
logger.info(f"从缓存加载数据: {cache_file_path}")
return pd.read_feather(cache_file_path)
# 从数据库获取数据
logger.info("从数据库获取订单数据")
select_query = f"SELECT * FROM bet.{self.config.table_name} where bet_status in ('won', 'lost')"
raw_data_list = self.dao.fetchall(query=select_query)
if not raw_data_list:
logger.warning("未找到符合条件的订单数据")
return pd.DataFrame()
# 转换为DataFrame
order_data_list = [OddsJamOrder(**data).model_dump() for data in raw_data_list]
order_df = pd.DataFrame(order_data_list)
# 保存到缓存
if self.config.enable_cache:
order_df.to_feather(cache_file_path)
logger.info(f"数据已缓存到: {cache_file_path}")
return order_df
def filter_data_by_market_width(self, data_df: pd.DataFrame) -> pd.DataFrame:
"""根据市场宽度过滤数据"""
original_count = len(data_df)
filtered_df = data_df[
(data_df["market_width"] >= self.config.market_width_min)
& (data_df["market_width"] <= self.config.market_width_max)
]
filtered_count = len(filtered_df)
logger.info(f"市场宽度过滤: {original_count} -> {filtered_count} 条记录")
return filtered_df
class ProfitCalculator:
"""收益计算服务类"""
@staticmethod
def calculate_benefit_by_order(order_info: Dict) -> float:
"""根据订单信息计算收益"""
home_or_away = order_info["home_or_away"]
price = order_info[f"{home_or_away}_price"] / 100
if order_info["outcome"] == -1:
return -1
if price >= 0:
return price
else:
return 1 / abs(price)
@staticmethod
def calculate_odds(row: pd.Series) -> float:
"""计算赔率"""
home_or_away = row["home_or_away"]
price = row[f"{home_or_away}_price"] / 100
if price >= 0:
return price
else:
return 1 / abs(price)
@staticmethod
def calculate_closing_balance(
day_benefit_list: List[float], pre_balance: float = 1000, pre_benefit: float = 0
) -> List[float]:
"""计算日末余额"""
closing_balance_list = []
for benefit in day_benefit_list:
closing_balance = pre_balance + pre_benefit / 3 + benefit * 2 / 3
closing_balance_list.append(closing_balance)
pre_balance = closing_balance
pre_benefit = benefit
return closing_balance_list
@staticmethod
def calculate_in_transit_funds_ratio(
daily_investment_list: List[float],
closing_balance_list: List[float],
start_closing_balance: float = 1000,
) -> List[float]:
"""计算在途资金比例"""
assert len(daily_investment_list) == len(closing_balance_list)
ratio_list = []
for i, daily_investment in enumerate(daily_investment_list):
if i == 0:
ratio = daily_investment / start_closing_balance
else:
ratio = daily_investment / closing_balance_list[i - 1]
ratio_list.append(ratio)
return ratio_list
class SimulationEngine:
"""模拟引擎类 - 负责收益模拟分析"""
def __init__(self, config: SimulationConfig):
self.config = config
self.calculator = ProfitCalculator()
def simulate_profit(
self, data_df: pd.DataFrame, init_balance: float = None
) -> Tuple[pd.DataFrame, float, float]:
"""执行收益模拟"""
if init_balance is None:
init_balance = self.config.initial_balance
logger.info(f"开始收益模拟,初始资金: {init_balance}")
# 按日期聚合数据
res_df = (
data_df.groupby("date")
.agg({"investment": "sum", "benefit": "sum"})
.reset_index()
)
res_df = res_df.rename(columns={"investment": "当日投入", "benefit": "日收益"})
# 计算收益率指标
res_df["日收益率"] = res_df["日收益"] / res_df["当日投入"]
res_df["累计收益"] = res_df["日收益"].cumsum()
res_df["累计投入"] = res_df["当日投入"].cumsum()
res_df["累计收益率"] = res_df["累计收益"] / res_df["累计投入"]
# 计算日末余额
day_benefit_list = res_df["日收益"].tolist()
closing_balance_list = self.calculator.calculate_closing_balance(
day_benefit_list=day_benefit_list, pre_balance=init_balance
)
res_df["日末余额(1.6天结算)"] = closing_balance_list
# 计算在途资金比例
daily_investment_list = res_df["当日投入"].tolist()
res_df["在途资金比例"] = self.calculator.calculate_in_transit_funds_ratio(
daily_investment_list=daily_investment_list,
closing_balance_list=closing_balance_list,
start_closing_balance=init_balance,
)
# 计算关键指标
annualized_sharpe_ratio = self._calculate_annualized_sharpe_ratio(
res_df, init_balance
)
roi = res_df["日收益"].sum() / res_df["当日投入"].sum()
logger.info(
f"模拟完成 - 年化夏普率: {annualized_sharpe_ratio:.4f}, ROI: {roi:.4f}"
)
return res_df, annualized_sharpe_ratio, roi
def _calculate_annualized_sharpe_ratio(
self, res_df: pd.DataFrame, init_balance: float
) -> float:
"""计算年化夏普率"""
if res_df["日收益率"].std() == 0:
return 0
return (
res_df["日收益"].sum()
/ init_balance
/ res_df["日收益率"].std()
* ((365 / len(res_df)) ** 0.5)
)
def calculate_statistics(self, data_df: pd.DataFrame) -> Dict[str, float]:
"""计算统计指标"""
data_df["odds"] = data_df.apply(self.calculator.calculate_odds, axis=1)
total_mean_odds = data_df["odds"].mean()
won_rate = len(data_df[data_df["outcome"] == 1]) / len(data_df)
logger.info(f"统计指标 - 平均赔率: {total_mean_odds:.4f}, 胜率: {won_rate:.4f}")
return {
"total_mean_odds": total_mean_odds,
"won_rate": won_rate,
"total_bets": len(data_df),
}
class VisualizationService:
"""可视化服务类 - 负责图表生成"""
def __init__(self, config: SimulationConfig):
self.config = config
def plot_won_lost_mean_odds(
self, data_df: pd.DataFrame, output_path: str = None
) -> None:
"""绘制胜负数量和平均赔率图表"""
if output_path is None:
output_path = os.path.join(
self.config.output_dir, "won_lost_mean_odds.html"
)
logger.info("生成胜负数量和平均赔率图表")
data_df = data_df.sort_values(by="date")
date_x = data_df["date"].tolist()
fig = go.Figure()
# 添加胜负柱状图
cols = ["won", "lost"]
for col in cols:
if col in data_df.columns:
y_data = data_df[col].tolist()
fig.add_trace(go.Bar(x=date_x, y=y_data, name=col, yaxis="y1"))
# 添加平均赔率折线图
if "odds" in data_df.columns:
fig.add_trace(
go.Scatter(
x=data_df["date"],
y=data_df["odds"],
mode="markers+lines",
name="平均赔率",
yaxis="y2",
)
)
fig.update_layout(
barmode="group",
font=dict(family="Times New Roman"),
title="每天胜负数量以及平均赔率",
xaxis=dict(title="日期"),
yaxis=dict(title="数量"),
yaxis2=dict(title="赔率", overlaying="y", side="right"),
)
fig.write_html(output_path)
logger.info(f"图表已保存到: {output_path}")
def plot_profit_simulation(
self, data_df: pd.DataFrame, title: str = None, output_path: str = None
) -> None:
"""绘制收益模拟图表"""
if output_path is None:
output_path = os.path.join(self.config.output_dir, "profit_simulation.html")
if title is None:
title = "收益模拟"
logger.info("生成收益模拟图表")
fig = go.Figure()
# 添加日末余额柱状图
fig.add_trace(
go.Bar(
x=data_df["date"],
y=data_df["日末余额(1.6天结算)"],
name="日末余额",
yaxis="y1",
)
)
# 添加收益率折线图
for col in ["日收益率", "累计收益率", "在途资金比例"]:
if col in data_df.columns:
fig.add_trace(
go.Scatter(
x=data_df["date"],
y=data_df[col],
mode="markers+lines",
name=col,
yaxis="y2",
)
)
fig.update_layout(
title=title,
font=dict(family="Times New Roman"),
xaxis=dict(title="日期"),
yaxis=dict(title="金额"),
yaxis2=dict(title="收益率", overlaying="y", side="right", tickformat=".1%"),
)
fig.write_html(output_path)
logger.info(f"图表已保存到: {output_path}")
class ProfitAnalysisApp:
"""主应用类 - 协调各个服务完成分析"""
def __init__(self, config: SimulationConfig = None):
self.config = config or SimulationConfig()
self.data_service = DataService(self.config)
self.simulation_engine = SimulationEngine(self.config)
self.visualization_service = VisualizationService(self.config)
def run_analysis(self, load_from_cache: bool = None) -> Dict:
"""运行完整的收益分析"""
logger.info("开始收益分析")
# 获取数据
order_df = self.data_service.get_oddsjam_order_data(load_from_cache)
if order_df.empty:
logger.error("未获取到数据,分析终止")
return {}
# 数据预处理
order_df = order_df[~order_df["home_or_away"].isna()]
order_df["outcome"] = order_df["bet_status"].apply(
lambda x: 1 if x == "won" else -1
)
order_df["benefit"] = order_df.apply(
lambda row: ProfitCalculator.calculate_benefit_by_order(row.to_dict()),
axis=1,
)
order_df["date"] = order_df["start_timestamp"].apply(
lambda x: datetime.fromtimestamp(x // 1000).strftime("%Y-%m-%d")
)
# 过滤数据
data_df = self.data_service.filter_data_by_market_width(order_df)
if data_df.empty:
logger.error("过滤后无数据,分析终止")
return {}
# 设置投资金额
data_df["investment"] = 1
# 执行模拟
res_df, annualized_sharpe_ratio, roi = self.simulation_engine.simulate_profit(
data_df
)
# 计算统计指标
stats = self.simulation_engine.calculate_statistics(data_df)
# 合并数据
res_df = self._merge_additional_data(res_df, data_df)
# 保存结果
self._save_results(res_df, annualized_sharpe_ratio, roi, stats)
# 生成图表
self._generate_charts(res_df, annualized_sharpe_ratio, roi)
# 返回分析结果
result = {
"summary": {
"annualized_sharpe_ratio": annualized_sharpe_ratio,
"roi": roi,
"total_bets": stats["total_bets"],
"won_rate": stats["won_rate"],
"mean_odds": stats["total_mean_odds"],
},
"data": res_df,
}
logger.info("收益分析完成")
return result
def _merge_additional_data(
self, res_df: pd.DataFrame, data_df: pd.DataFrame
) -> pd.DataFrame:
"""合并额外数据"""
# 合并平均赔率
odds_df = data_df.groupby("date").agg({"odds": "mean"}).reset_index()
res_df = pd.merge(res_df, odds_df, on="date", how="left")
# 合并胜负统计
bet_status_df = pd.pivot_table(
data_df,
index=["date"],
columns=["bet_status"],
aggfunc="size",
fill_value=0,
).reset_index()
res_df = pd.merge(res_df, bet_status_df, on="date", how="left")
# 计算胜率
if "won" in res_df.columns and "lost" in res_df.columns:
res_df["won rate"] = res_df["won"] / (res_df["won"] + res_df["lost"])
# 合并市场宽度
if "market_width" in data_df.columns:
market_width_df = (
data_df.groupby("date").agg({"market_width": "mean"}).reset_index()
)
res_df = pd.merge(res_df, market_width_df, on="date", how="left")
return res_df
def _save_results(
self,
res_df: pd.DataFrame,
annualized_sharpe_ratio: float,
roi: float,
stats: Dict,
) -> None:
"""保存分析结果"""
output_path = os.path.join(self.config.output_dir, "profit_simulation.csv")
res_df.to_csv(output_path, index=False, encoding="utf-8-sig")
logger.info(f"分析结果已保存到: {output_path}")
# 保存摘要信息
summary = {
"analysis_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"annualized_sharpe_ratio": annualized_sharpe_ratio,
"roi": roi,
"total_bets": stats["total_bets"],
"won_rate": stats["won_rate"],
"mean_odds": stats["total_mean_odds"],
"market_width_range": f"{self.config.market_width_min}-{self.config.market_width_max}",
"initial_balance": self.config.initial_balance,
}
summary_path = os.path.join(self.config.output_dir, "analysis_summary.json")
with open(summary_path, "w", encoding="utf-8") as f:
json.dump(summary, f, ensure_ascii=False, indent=2)
logger.info(f"分析摘要已保存到: {summary_path}")
def _generate_charts(
self, res_df: pd.DataFrame, annualized_sharpe_ratio: float, roi: float
) -> None:
"""生成图表"""
title = f"收益模拟,年化夏普率: {annualized_sharpe_ratio:.4f}, ROI: {roi:.4f}"
self.visualization_service.plot_profit_simulation(data_df=res_df, title=title)
self.visualization_service.plot_won_lost_mean_odds(data_df=res_df)
def main():
"""主入口函数"""
# 创建配置
config = SimulationConfig()
# 创建应用实例
app = ProfitAnalysisApp(config)
# 运行分析
result = app.run_analysis()
if result:
summary = result["summary"]
print(f"分析完成!")
print(f"年化夏普率: {summary['annualized_sharpe_ratio']:.4f}")
print(f"ROI: {summary['roi']:.4f}")
print(f"总投注数: {summary['total_bets']}")
print(f"胜率: {summary['won_rate']:.4f}")
print(f"平均赔率: {summary['mean_odds']:.4f}")
if __name__ == "__main__":
main()

1
config/__init__.py Normal file
View File

@@ -0,0 +1 @@
# 配置模块

View File

@@ -1,183 +0,0 @@
import pandas as pd
from datetime import datetime
from dao.Database import Database
from data_model import MysqlConfig, OddsJamOrder, OddsjamBet
from typing import List
import json
import os
import plotly.graph_objects as go
def get_oddsjam_order_data_from_db(load_from_local: bool = False) -> pd.DataFrame:
current_date_str = datetime.now().strftime('%Y%m%d')
file_path = os.path.join('data', f'oddsjam_order_data_{current_date_str}.csv')
if load_from_local and os.path.exists(file_path):
return pd.read_csv(file_path, low_memory=False)
config_file_path = 'config\mysql_config.json'
mysql_config = MysqlConfig.parse_file(config_file_path)
dao = Database(mysql_config)
select_query = "SELECT * FROM bet.oddsjam_order where bet_status in ('won', 'lost');"
raw_data_list = dao.fetchall(query=select_query)
order_data_list = [OddsJamOrder(**data).model_dump()
for data in raw_data_list]
order_df = pd.DataFrame(order_data_list)
order_df.to_csv(file_path, index=False, encoding='utf-8-sig')
return order_df
def calc_benefit_by_order_info(order_info: dict) -> float:
home_or_away = order_info['home_or_away']
price = order_info[f'{home_or_away}_price'] / 100
if order_info['outcome'] == -1:
return -1
if price >= 0:
return price
else:
return 1 / abs(price)
def calc_odds(row):
home_or_away = row['home_or_away']
price = row[f'{home_or_away}_price'] / 100
if price >= 0:
return price
else:
return 1 / abs(price)
def clac_closing_balance(day_benefit_list: List, pre_balance: float = 1000, pre_benefit: float = 0) -> List:
closing_balance_list = []
for i, benefit in enumerate(day_benefit_list):
closing_balance = pre_balance + pre_benefit / 3 + benefit * 2 / 3
closing_balance_list.append(closing_balance)
pre_balance = closing_balance
pre_benefit = benefit
return closing_balance_list
def calc_in_transit_funds_ratio(daily_investment_list: List, closing_balance_list: List, start_closing_balance: float = 1000) -> List:
assert len(daily_investment_list) == len(closing_balance_list)
ratio_list = []
for i, daily_investment in enumerate(daily_investment_list):
if i == 0:
ratio = daily_investment / start_closing_balance
else:
ratio = daily_investment / closing_balance_list[i-1]
ratio_list.append(ratio)
return ratio_list
def simulate_profit(data_df: pd.DataFrame, init_balance=1000) -> pd.DataFrame:
res_df = data_df.groupby('date').agg({'investment': 'sum', 'benefit': 'sum'}).reset_index()
# res_df['日期'] = pd.to_datetime(res_df['date'])
res_df = res_df.rename(columns={'investment': '当日投入', 'benefit': '日收益'})
res_df['日收益率'] = res_df['日收益'] / res_df['当日投入']
res_df['累计收益'] = res_df['日收益'].cumsum()
res_df['累计投入'] = res_df['当日投入'].cumsum()
res_df['累计收益率'] = res_df['累计收益'] / res_df['累计投入']
day_benefit_list = res_df['日收益'].tolist()
closing_balance_list = clac_closing_balance(day_benefit_list=day_benefit_list,
pre_balance=init_balance)
res_df['日末余额(1.6天结算)'] = closing_balance_list
daily_investment_list = res_df['当日投入'].tolist()
res_df['在途资金比例'] = calc_in_transit_funds_ratio(daily_investment_list=daily_investment_list,
closing_balance_list=closing_balance_list,
start_closing_balance=init_balance)
annualized_sharpe_ratio = res_df['日收益'].sum() / init_balance / res_df['日收益率'].std() * ((365 / len(res_df))**0.5)
roi = res_df['日收益'].sum() / res_df['当日投入'].sum()
return res_df, annualized_sharpe_ratio, roi
def plot_won_lost_mean_odds(data_df: pd.DataFrame):
data_df = data_df.sort_values(by='date')
date_x = data_df['date'].tolist()
fig = go.Figure()
cols = ['won', 'lost']
for col in cols:
y_data = data_df[col].tolist()
fig.add_trace(go.Bar(x=date_x, y=y_data, name=col, yaxis='y1'))
fig.add_trace(go.Scatter(x=data_df['date'], y=data_df['odds'], mode='markers+lines', name='平均赔率', yaxis='y2'))
fig.update_layout(
barmode='group',
font=dict(family="Times New Roman"),
title='每天胜负数量以及平均赔率',
xaxis=dict(title='日期'),
yaxis=dict(title='数量'),
yaxis2=dict(title='赔率', overlaying='y', side='right'),
)
fig.write_html('data/won_lost_mean_odds.html')
def plot_profit_simulation(data_df: pd.DataFrame, title: str = None):
fig = go.Figure()
fig.add_trace(go.Bar(x=data_df['date'], y=data_df['日末余额(1.6天结算)'], name='日末余额', yaxis='y1'))
for col in ['日收益率', '累计收益率', '在途资金比例']:
fig.add_trace(go.Scatter(
x=data_df['date'],
y=data_df[col],
mode='markers+lines',
name=col,
yaxis='y2'))
if title is None:
title = '收益模拟'
fig.update_layout(
title=title,
font=dict(family="Times New Roman"),
xaxis=dict(title='日期'),
yaxis=dict(title='金额'),
yaxis2=dict(title='收益率', overlaying='y', side='right', tickformat='.1%'))
fig.write_html('data/profit_simulation.html')
if __name__ == '__main__':
order_df = get_oddsjam_order_data_from_db(load_from_local=True)
# order_df = pd.read_excel('data/PEV 3.11-10.26.xlsx', sheet_name='原始数据')
order_df['outcome'] = order_df['bet_status'].apply(lambda x: 1 if x == 'won' else -1)
order_df['benefit'] = order_df.apply(lambda row: calc_benefit_by_order_info(row.to_dict()), axis=1)
order_df['date'] = order_df['start_timestamp'].apply(
lambda x: datetime.fromtimestamp(x // 1000).strftime("%Y-%m-%d"))
data_df = order_df.copy()
data_df = data_df[data_df['market_width'] <= 25]
data_df = data_df[data_df['market_width'] >= 20]
market_width_df = data_df.groupby('date').agg({'market_width': 'mean'}).reset_index()
data_df['investment'] = 1
res_df, annualized_sharpe_ratio, roi = simulate_profit(data_df)
print(f'年化夏普率: {annualized_sharpe_ratio}')
print(f'ROI: {roi}')
data_df['odds'] = data_df.apply(calc_odds, axis=1)
total_mean_odds = data_df['odds'].mean()
print(f'{len(data_df)} 场比赛平均赔率: {total_mean_odds}')
won_rate = len(data_df[data_df['outcome'] == 1]) / len(data_df)
print(f'{len(data_df)} 场比赛胜率: {won_rate}')
odds_df = data_df.groupby('date').agg({'odds': 'mean'}).reset_index()
res_df = pd.merge(res_df, odds_df, on='date', how='left')
bet_status_df = pd.pivot_table(data_df, index=['date'], columns=[
'bet_status'], aggfunc='size', fill_value=0).reset_index()
res_df = pd.merge(res_df, bet_status_df, on='date', how='left')
res_df['won rate'] = res_df['won'] / (res_df['won'] + res_df['lost'])
if 'market_width' in data_df.columns:
res_df = pd.merge(res_df, market_width_df, on='date', how='left')
res_df.to_csv('data/profit_simulation.csv', index=False, encoding='utf-8-sig')
title = f'收益模拟,年化夏普率: {annualized_sharpe_ratio}, ROI: {roi}'
plot_profit_simulation(data_df=res_df, title=title)
plot_won_lost_mean_odds(data_df=res_df)