Files
bet/OddsjamBetTrackerRefactored.py
2025-10-26 10:18:10 +08:00

557 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
OddsJam Bet Tracker - 重构版本
按照Python最佳实践重构, 支持环境变量配置, 模块化设计
核心功能:
1. 定时从数据库拉取数据上传到OddsJam
2. 定时从OddsJam获取结果数据更新到数据库
"""
import ast
import datetime
import json
import math
import os
import re
import sys
import time
import traceback
from abc import ABC, abstractmethod
from dataclasses import dataclass
from email import message  # NOTE(review): appears unused in this file; candidate for removal
from typing import Optional, List, Dict, Tuple

import pandas as pd
from loguru import logger
from playwright.sync_api import sync_playwright, Page, Browser, BrowserContext, expect
from retry import retry

# Project modules
from dao.Database import Database
from data_model import MysqlConfig, OddsjamOrderStatus
from common.dingtalk import DingTalkBot
from common.utils import ensure_directory_exists
@dataclass
class AppConfig:
    """Runtime settings for the bet tracker.

    Every field default is looked up with ``os.getenv`` when the class body is
    executed, so the environment must be prepared before this module is
    imported. ``__post_init__`` then derives the MySQL config, the DingTalk bot
    and the local file paths used by the tracker.
    """

    # OddsJam account; also used as the file name of the saved login state.
    email_account: str = os.getenv("ODDSJAM_EMAIL", "aszer27937@gmail.com")
    # Proxy for the Playwright browser; an empty HTTP_PROXY launches without one.
    http_proxy: str = os.getenv("HTTP_PROXY", "http://127.0.0.1:7890")
    # Orders table that is read for uploads and updated with bet statuses.
    table_name: str = os.getenv("BET_TABLE_NAME", "oddsjam_order_all")
    # Daily "HH:MM" run times handed to ``schedule.every().day.at``.
    upload_schedule: str = os.getenv("UPLOAD_SCHEDULE", "09:48")
    pull_schedule: str = os.getenv("PULL_SCHEDULE", "13:00")
    # Directory containing this file; every data/ path is built under it.
    root_dir: str = os.path.dirname(os.path.abspath(__file__))
    # NOTE(review): this default is resolved against the current working
    # directory, unlike the data/ paths which are anchored at root_dir.
    config_file_path: str = os.getenv("MYSQL_CONFIG_PATH", "./config/mysql_config.json")
    # NOTE(review): a webhook access token and signing secret are committed as
    # defaults here; consider requiring DINGTALK_WEBHOOK / DINGTALK_SECRET.
    dingtalk_webhook: str = os.getenv(
        "DINGTALK_WEBHOOK",
        "https://oapi.dingtalk.com/robot/send?access_token=21de667159edadd33172c6ec414a2addf9c6359189350ffd36819d2a20e8a0f4",
    )
    dingtalk_secret: str = os.getenv(
        "DINGTALK_SECRET",
        "SEC43a0fa0b29717f98637a119b92a0bd5f7b2b6da671bdd2bd1279ed8323454d5e",
    )
    # Any HEADLESS value other than "true" (case-insensitive) runs a headed browser.
    headless: bool = os.getenv("HEADLESS", "true").lower() == "true"
    # Playwright timeout in milliseconds (default 6000000 ms = 100 minutes).
    default_timeout: float = float(os.getenv("DEFAULT_TIMEOUT", "6000000"))

    def __post_init__(self):
        """Build the helper objects and file paths derived from the fields."""
        self.mysql_config = MysqlConfig.parse_file(self.config_file_path)
        self.dingtalk = DingTalkBot(self.dingtalk_webhook, self.dingtalk_secret)

        bet_data_dir = os.path.join(self.root_dir, "data", "bet_data")
        # Browser storage state saved per account by the login flow.
        self.login_state_path = os.path.join(
            bet_data_dir, "account_login_state", f"{self.email_account}.json"
        )
        # CSV written for the OddsJam importer.
        self.bet_file_path = os.path.join(bet_data_dir, "bet.csv")
        # JSON map used to rename sportsbooks before upload.
        self.bet_name_map_path = os.path.join(bet_data_dir, "betname_map.json")

    def __str__(self) -> str:
        # Only the account and table name are shown, never the credentials.
        return "AppConfig(email_account={}, table_name={})".format(
            self.email_account, self.table_name
        )
class DatabaseService:
    """Reads and updates rows of the configured orders table."""

    # Plain calendar dates only: YYYYMMDD, YYYY-MM-DD or YYYY/MM/DD.
    _DATE_PATTERN = re.compile(r"\d{4}[-/]?\d{2}[-/]?\d{2}")

    def __init__(self, config: AppConfig):
        self.config = config
        self.dao = Database(config.mysql_config)

    def query_orders_by_date(self, date_str: str) -> List[OddsjamOrderStatus]:
        """Return every order whose ``create_time`` falls on ``date_str``.

        Args:
            date_str: Day to query, e.g. ``"20251025"`` or ``"2025-10-25"``.
                It may come from the command line.

        Returns:
            One ``OddsjamOrderStatus`` per matching row, or an empty list.

        Raises:
            ValueError: If ``date_str`` is not a plain date. The value is
                embedded in the SQL text, so anything else (quotes, spaces,
                extra clauses) is rejected instead of being executed.
        """
        date_text = str(date_str)
        if not self._DATE_PATTERN.fullmatch(date_text):
            raise ValueError(f"日期格式不合法: {date_str!r}")
        # NOTE(review): prefer a bound parameter here if Database.fetchall
        # accepts ``args`` like Database.execute does. table_name comes from
        # configuration, not from user input.
        sql = f"select * from {self.config.table_name} where DATE(create_time) = '{date_text}'"
        rows = self.dao.fetchall(query=sql)
        return [OddsjamOrderStatus(row) for row in rows] if rows else []

    @retry(tries=6)
    def update_bet_status(self, bet_id: str, bet_status: str) -> None:
        """Set ``bet_status`` on the row whose ``id`` equals ``bet_id``.

        Both values are passed as bound query parameters. The ``retry``
        decorator makes up to 6 attempts in total if the update raises.
        """
        sql = f"update {self.config.table_name} set bet_status = %s where id = %s"
        self.dao.execute(query=sql, args=(bet_status, bet_id))
class OddsjamService:
    """Drives the OddsJam bet-tracker web UI through Playwright.

    There are three flows:
    - an interactive login that saves the browser storage state;
    - uploading a bets CSV through the "Import Bets" (Dromo) dialog;
    - paging through the tracker so that every bets "find" API response is
      appended to a local JSON-lines file.
    """

    TRACKER_URL = "https://oddsjam.com/bet-tracker"

    def __init__(self, config: AppConfig):
        self.config = config
        # Updated from the "totalCount" field of each bets "find" response.
        self.total_bet_cnt = 0

    def _get_browser_context(self, playwright) -> Tuple[Page, Browser]:
        """Launch Chromium with the saved login state and open one page.

        Returns:
            ``(page, browser)``. The browser, not the context, is returned, so
            the caller can tear everything down with one ``browser.close()``.
        """
        logger.info(f"加载cookies: {self.config.login_state_path}")
        browser = playwright.chromium.launch(
            args=["--start-maximized"],
            headless=self.config.headless,
            # An empty proxy setting launches Chromium without a proxy.
            proxy=(
                {"server": self.config.http_proxy} if self.config.http_proxy else None
            ),
        )
        context = browser.new_context(
            storage_state=self.config.login_state_path, no_viewport=True
        )
        page = context.new_page()
        return page, browser

    def login_to_site(self) -> None:
        """Open the OddsJam login page and save the resulting storage state.

        The sign-in is completed by hand. ``page.pause()`` suspends the script
        (Playwright requires a headed browser for this). After resuming, the
        storage state is written to ``config.login_state_path`` so the other
        flows can reuse the session.
        """
        logger.info(f"登录账户: {self.config.email_account}")
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=self.config.headless)
            page = browser.new_page()
            page.set_default_timeout(timeout=self.config.default_timeout)
            page.goto(self.TRACKER_URL)
            page.get_by_role("link", name="Login").click()
            expect(page.get_by_role("button", name="Sign in")).to_be_visible(
                timeout=self.config.default_timeout
            )
            page.pause()
            time.sleep(10)
            try:
                # Nothing else creates the per-account directory, so make sure
                # it exists before saving into it.
                ensure_directory_exists(
                    target_path=self.config.login_state_path, is_file=True
                )
                browser.contexts[0].storage_state(path=self.config.login_state_path)
                logger.info(f"登录成功: {page.url}")
            except Exception as ex:
                logger.error(f"保存登录状态失败: {ex}")
                traceback.print_exc()
                page.screenshot(path="error.png")
            finally:
                browser.close()

    @retry(tries=6)
    def upload_bets(self, bet_file_path: str) -> None:
        """Upload the bets CSV at ``bet_file_path`` through the Import Bets dialog.

        Up to 6 attempts in total via ``retry``.
        NOTE(review): a retry after a partially successful import may upload
        the same bets twice.
        """
        with sync_playwright() as p:
            page, browser = self._get_browser_context(p)
            page.set_default_timeout(timeout=self.config.default_timeout)

            def on_response(response):
                if "dromo-user-imports-production" in response.url:
                    logger.info(f"上传响应状态: {response.status}")
                    if response.status == 400:
                        # NOTE(review): exceptions raised inside Playwright
                        # event handlers may surface at the next Playwright
                        # call rather than here. Confirm this reaches @retry.
                        browser.close()
                        raise Exception(f"上传失败: {response.request.failure}")
                elif "oddsjam.com/api/backend/bets/import" in response.url:
                    logger.info(f"导入响应状态: {response.status}")

            page.on("response", on_response)
            page.goto(self.TRACKER_URL)
            self._execute_upload_flow(page, bet_file_path)
            browser.close()

    def _execute_upload_flow(self, page: Page, bet_file_path: str) -> None:
        """Click through the Dromo importer iframe to import ``bet_file_path``.

        The fixed sleeps presumably give the iframe time to render each step.
        The method returns once the "Processing..." text is hidden.
        """
        page.get_by_role("button", name="Import Bets").nth(1).click()
        time.sleep(5)
        iframe_locator = page.frame_locator('iframe[title="Dromo Importer\\: Bets"]')
        # Pick the file through the importer's file chooser.
        with page.expect_file_chooser() as fc_info:
            iframe_locator.get_by_role(
                "button", name="Choose a file", exact=True
            ).click()
        time.sleep(5)
        fc_info.value.set_files(bet_file_path)
        # Confirm the selected file.
        iframe_locator.get_by_role(
            "button", name="Confirm selection and continue"
        ).click()
        time.sleep(5)
        # Accept the column matching.
        iframe_locator.get_by_role(
            "button", name="Confirm matching and continue"
        ).click()
        time.sleep(5)
        iframe_locator.get_by_role("button", name="Continue").click()
        iframe_locator.get_by_role("button", name="Finish").click()
        # "Submit anyway" only appears when the importer reports row problems.
        # Its absence is expected, so the short timeout is a best-effort probe.
        try:
            iframe_locator.get_by_role("button", name="Submit anyway").click(
                timeout=5000
            )
        except Exception:
            pass
        iframe_locator.get_by_role("button", name="Yes").click()
        expect(iframe_locator.get_by_text("Processing...")).to_be_hidden(
            timeout=self.config.default_timeout
        )

    def get_all_bet_status(self, date_str: str, status_save_path: str) -> None:
        """Page through the bet tracker and append every "find" response to a file.

        Args:
            date_str: Only used for logging; the tracker is not filtered by it.
            status_save_path: JSON-lines file. Each successful response body
                is appended as one line, so reruns accumulate in the same file.
        """
        logger.info(f"获取投注状态,日期: {date_str}")
        with sync_playwright() as p:
            page, browser = self._get_browser_context(p)

            def on_response(response):
                if "oddsjam.com/api/backend/bets-and-parlays-V2/find" in response.url:
                    try:
                        if response.request.failure is None and response.status == 200:
                            data = response.json()
                            self.total_bet_cnt = data["totalCount"]
                            logger.info(f"总投注数量: {self.total_bet_cnt}")
                            with open(status_save_path, "a", encoding="utf-8") as f:
                                json.dump(data, f, ensure_ascii=False)
                                f.write("\n")
                    except Exception as e:
                        logger.error(f"处理响应数据失败: {e}")

            page.on("response", on_response)
            page.set_default_timeout(timeout=self.config.default_timeout)
            page.goto(url=self.TRACKER_URL)
            # Dismiss the widget popup if it shows up; it may not appear.
            try:
                page.locator("#cello-widget-app").get_by_role("button").click(
                    timeout=6000
                )
            except Exception:
                pass
            self._navigate_all_pages(page)
            browser.close()

    def _navigate_all_pages(
        self, page: Page, page_size: int = 50, settle_ms: int = 3000
    ) -> None:
        """Click "Next" until every page of the bet tracker has been shown.

        Each page change makes the site issue a bets "find" request, which is
        recorded by the response listener the caller registered.

        Args:
            page: Page showing the bet tracker.
            page_size: Rows per tracker page (the UI reads "Showing 1 to 50 of N").
            settle_ms: Pause before each click, and once after the last one,
                so the pending "find" response can arrive and be handled before
                the caller closes the browser.

        Raises:
            ValueError: If the result total cannot be parsed from the label.
        """
        # Match any range: with fewer than page_size bets the label reads e.g.
        # "Showing 1 to 23 of 23", which a literal "1 to 50" pattern never finds.
        inner_text = page.get_by_text(
            re.compile(r"Showing \d+ to \d+ of", re.IGNORECASE)
        ).first.inner_text()
        match = re.search(r"Showing \d+ to \d+ of ([\d,]+)", inner_text, re.IGNORECASE)
        if not match:
            raise ValueError("无法获取总结果数")
        # Accept thousands separators such as "1,234".
        total_bet_cnt = int(match[1].replace(",", ""))
        total_page_no = math.ceil(total_bet_cnt / page_size)
        logger.info(f"总共 {total_bet_cnt} 条结果,{total_page_no}")

        next_button = page.get_by_role("button", name="Next")
        for page_no in range(2, total_page_no + 1):
            expect(next_button).to_be_visible(timeout=self.config.default_timeout)
            page.wait_for_timeout(timeout=settle_ms)
            next_button.click()
            logger.info(f"当前页码: {page_no} / {total_page_no}")
        # The caller closes the browser right after this returns. Give the last
        # page's "find" response time to arrive and be written out.
        page.wait_for_timeout(timeout=settle_ms)
class BetDataProcessor:
    """Moves bet data between the orders table and OddsJam's file formats."""

    def __init__(self, config: AppConfig):
        self.config = config

    def prepare_bet_data_for_upload(self, date_str: str) -> str:
        """Export one day's orders to the CSV consumed by the OddsJam importer.

        Args:
            date_str: Day whose orders are exported.

        Returns:
            A three-line summary (earliest and latest game start, DataFrame
            shape). The same lines are also logged.

        Raises:
            ValueError: If no orders exist for ``date_str``.
        """
        logger.info(f"准备投注数据,日期: {date_str}")
        ensure_directory_exists(target_path=self.config.bet_file_path, is_file=True)

        data_list = DatabaseService(self.config).query_orders_by_date(date_str)
        if not data_list:
            # Named ``msg`` so the module-level ``message`` import is not shadowed.
            msg = f"日期 {date_str} 没有找到投注数据"
            logger.warning(msg)
            raise ValueError(msg)

        bet_df = pd.DataFrame([d.to_dict() for d in data_list])
        # Database column -> OddsJam import column. The values also fix the
        # column order of the CSV.
        col_map = {
            "sportsbooks": "Sportsbook",
            "bet_name": "Bet Name",
            "market": "Market Name",
            "price": "Odds",
            "stake": "Stake",
            "event_name": "Event Name",
            "sport": "Sport",
            "league": "League",
            "game_id": "Game ID",
            "bet_type": "Bet Type",
            "bet_id": "Notes",
            "start_timestamp": "Game Start Date",
        }
        bet_df = bet_df.rename(columns=col_map)

        # NOTE(review): presumably a list literal stored as text (e.g. "['X']").
        # literal_eval parses such literals without executing arbitrary code,
        # which the previous eval() would have done on database content.
        bet_df["Sportsbook"] = bet_df["Sportsbook"].apply(
            lambda x: ast.literal_eval(x)[0]
        )
        with open(self.config.bet_name_map_path, "r", encoding="utf-8") as f:
            bet_name_map = json.load(f)
        # Rename sportsbooks present in the map; unknown names pass through.
        bet_df["Sportsbook"] = bet_df["Sportsbook"].apply(
            lambda x: bet_name_map.get(x, x)
        )

        # Take an explicit copy so the column assignment below does not write
        # into a slice of the renamed frame (pandas SettingWithCopy).
        bet_df = bet_df[list(col_map.values())].copy()
        # Presumably epoch milliseconds (hence // 1000). fromtimestamp renders
        # the value in the host's local time zone.
        bet_df["Game Start Date"] = bet_df["Game Start Date"].apply(
            lambda x: datetime.datetime.fromtimestamp(x // 1000).strftime(
                "%Y/%m/%d %H:%M"
            )
        )
        bet_df.to_csv(self.config.bet_file_path, encoding="utf-8-sig", index=False)

        summary_lines = [
            f"投注数据最小开始时间: {bet_df['Game Start Date'].min()}",
            f"投注数据最大开始时间: {bet_df['Game Start Date'].max()}",
            f"投注数据形状: {bet_df.shape}",
        ]
        for line in summary_lines:
            logger.info(line)
        return "\n".join(summary_lines)

    def process_status_data(self, status_file_path: str) -> None:
        """Write settled bet statuses from a captured response file to the database.

        ``status_file_path`` holds one JSON object per line, each carrying an
        ``entities`` list.
        - A bet with ``autograder_errors`` is stored as ``"error: <errors>"``.
        - Pending bets are skipped.
        - The row id is taken from each entity's ``notes`` field, which is
          where the upload CSV puts ``bet_id``.

        Each update is attempted up to 3 times. A bet that still fails is
        logged and skipped, so one bad row does not stop the rest.
        """
        logger.info(f"处理状态数据: {status_file_path}")
        data_list = []
        with open(status_file_path, "r", encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue  # tolerate blank lines in the capture file
                data_list.extend(json.loads(line)["entities"])

        if not data_list:
            # Without entities the frame has no columns and the filtering
            # below would fail with a KeyError.
            logger.warning(f"没有可处理的状态数据: {status_file_path}")
            return
        status_df = pd.DataFrame(data_list)

        def get_error_status(row):
            # row.get tolerates captures in which no entity has the column.
            errors = row.get("autograder_errors")
            if not pd.isna(errors):
                return f"error: {errors}"
            return row["status"]

        status_df["status"] = status_df.apply(get_error_status, axis=1)
        status_df = status_df[status_df["status"] != "pending"]
        status_df = status_df[["status", "notes"]].drop_duplicates()

        db_service = DatabaseService(self.config)
        status_list = status_df.to_dict(orient="records")
        total = len(status_list)
        for i, data in enumerate(status_list, start=1):
            bet_id = data["notes"]
            bet_status = data["status"]
            logger.info(f"{i}/{total}, 状态->{bet_status}, 投注ID->{bet_id}")
            for _ in range(3):
                try:
                    db_service.update_bet_status(bet_id=bet_id, bet_status=bet_status)
                    break
                except Exception as e:
                    logger.error(f"更新投注状态失败: {e}")
            else:
                logger.error(f"放弃更新投注状态, 投注ID->{bet_id}")
class TaskScheduler:
    """Runs the upload and pull jobs and reports each outcome to DingTalk.

    Both task methods catch every exception and send a failure report instead
    of raising, so a failed run never propagates to the caller.
    """

    def __init__(self, config: AppConfig):
        self.config = config
        self.oddsjam_service = OddsjamService(config)
        self.data_processor = BetDataProcessor(config)

    def _notify(self, text: str) -> None:
        """Send ``text`` to DingTalk, logging instead of raising on failure.

        The tasks run inside the ``schedule`` polling loop, which has no
        exception handling. An error escaping a task, such as a network
        failure while reporting another error, would stop every future run.
        """
        try:
            self.config.dingtalk.send_text(text)
        except Exception:
            logger.exception("钉钉通知发送失败")

    def upload_bets_task(self, date_str: Optional[str] = None) -> None:
        """Export one day's orders to CSV and upload them to OddsJam.

        Args:
            date_str: ``YYYYMMDD`` day to upload. Defaults to yesterday.
        """
        config_base_info = str(self.config)
        try:
            if date_str is None:
                date_str = (
                    datetime.datetime.now() - datetime.timedelta(days=1)
                ).strftime("%Y%m%d")
            logger.info(f"执行上传任务,日期: {date_str}")
            data_info = self.data_processor.prepare_bet_data_for_upload(date_str)
            if not data_info:
                logger.warning(f"日期 {date_str} 没有数据需要上传")
                return
            self.oddsjam_service.upload_bets(self.config.bet_file_path)
        except Exception as e:
            error_info = traceback.format_exc()
            logger.error(f"上传任务失败: {error_info}")
            self._notify(
                f"{date_str}: \n {config_base_info} \n 上传比赛失败: {e}\n{error_info}"
            )
        else:
            # Sent outside the try so a notification problem is never
            # reported as an upload failure.
            self._notify(f"{date_str}: \n {config_base_info} \n {data_info}")

    def pull_status_task(self, date_str: Optional[str] = None) -> None:
        """Capture bet statuses from OddsJam and write them to the database.

        Args:
            date_str: ``YYYYMMDD`` tag for the capture file. Defaults to today.
        """
        config_base_info = str(self.config)
        try:
            if date_str is None:
                date_str = datetime.datetime.now().strftime("%Y%m%d")
            logger.info(f"执行拉取任务,日期: {date_str}")
            status_file_path = os.path.join(
                self.config.root_dir,
                "data",
                "bet_data",
                "bet_status",
                f"{self.config.email_account}_status_{date_str}.json",
            )
            ensure_directory_exists(target_path=status_file_path, is_file=True)
            self.oddsjam_service.get_all_bet_status(date_str, status_file_path)
            self.data_processor.process_status_data(status_file_path)
        except Exception:
            error_info = traceback.format_exc()
            logger.error(f"拉取任务失败: {error_info}")
            self._notify(f"{config_base_info}\n{error_info}")
        else:
            self._notify(f"{date_str}: 比赛状态更新完成\n{config_base_info}")
class BetTrackerApplication:
    """Top-level object that owns the configuration and the task scheduler."""

    def __init__(self):
        config = AppConfig()
        self.config = config
        self.scheduler = TaskScheduler(config)

    def run_scheduled_tasks(self) -> None:
        """Register both daily jobs, then block forever running them when due."""
        import schedule

        cfg = self.config
        jobs = self.scheduler
        logger.info(f"上传定时任务: {cfg.upload_schedule}")
        logger.info(f"拉取定时任务: {cfg.pull_schedule}")
        # The pull job is registered before the upload job.
        schedule.every().day.at(cfg.pull_schedule).do(jobs.pull_status_task)
        schedule.every().day.at(cfg.upload_schedule).do(jobs.upload_bets_task)
        # Poll the scheduler every 50 ms.
        while True:
            schedule.run_pending()
            time.sleep(0.05)

    def run_manual_task(self, task_type: str, date_str: Optional[str] = None) -> None:
        """Run one task immediately.

        Args:
            task_type: ``"upload"`` or ``"pull"``. Any other value is logged as
                an error and nothing runs.
            date_str: Forwarded to the task. ``None`` lets the task pick its
                default date.
        """
        handlers = {
            "upload": self.scheduler.upload_bets_task,
            "pull": self.scheduler.pull_status_task,
        }
        handler = handlers.get(task_type)
        if handler is None:
            logger.error(f"未知的任务类型: {task_type}")
            return
        handler(date_str)
def main():
    """Command-line entry point.

    ``<script> upload|pull [date]`` runs that task once. With no arguments the
    process runs the daily schedule until it is stopped.
    """
    app = BetTrackerApplication()
    cli_args = sys.argv[1:]
    if not cli_args:
        app.run_scheduled_tasks()
        return
    task_type, *rest = cli_args
    app.run_manual_task(task_type, rest[0] if rest else None)


if __name__ == "__main__":
    main()