""" OddsJam Bet Tracker - 重构版本 按照Python最佳实践重构,支持环境变量配置,模块化设计 核心功能: 1. 定时从数据库拉取数据上传到OddsJam 2. 定时从OddsJam获取结果数据更新到数据库 """ from email import message import os import sys import re import math import time import json import datetime import traceback from typing import Optional, List, Dict, Tuple from dataclasses import dataclass from abc import ABC, abstractmethod import pandas as pd from retry import retry from playwright.sync_api import sync_playwright, Page, BrowserContext, expect from loguru import logger # 导入项目模块 from dao.Database import Database from data_model import MysqlConfig, OddsjamOrderStatus from common.dingtalk import DingTalkBot from common.utils import ensure_directory_exists @dataclass class AppConfig: """应用配置类 - 支持环境变量配置""" # 邮箱配置 email_account: str = os.getenv("ODDSJAM_EMAIL", "ojbbbb21@proton.me") # 代理配置 http_proxy: str = os.getenv("HTTP_PROXY", "http://127.0.0.1:7890") # 数据库表名 table_name: str = os.getenv("BET_TABLE_NAME", "oddsjam_order") # 定时任务配置 upload_schedule: str = os.getenv("UPLOAD_SCHEDULE", "09:48") pull_schedule: str = os.getenv("PULL_SCHEDULE", "13:00") # 文件路径配置 root_dir: str = os.path.dirname(os.path.abspath(__file__)) config_file_path: str = os.getenv("MYSQL_CONFIG_PATH", "./config/mysql_config.json") # 钉钉配置 dingtalk_webhook: str = os.getenv( "DINGTALK_WEBHOOK", "https://oapi.dingtalk.com/robot/send?access_token=21de667159edadd33172c6ec414a2addf9c6359189350ffd36819d2a20e8a0f4", ) dingtalk_secret: str = os.getenv( "DINGTALK_SECRET", "SEC43a0fa0b29717f98637a119b92a0bd5f7b2b6da671bdd2bd1279ed8323454d5e", ) # 浏览器配置 headless: bool = os.getenv("HEADLESS", "true").lower() == "true" default_timeout: float = float(os.getenv("DEFAULT_TIMEOUT", "6000000")) def __str__(self) -> str: # 返回邮箱和数据表名的信息 return f"AppConfig(email_account={self.email_account}, table_name={self.table_name})" def __post_init__(self): """初始化后处理""" self.mysql_config = MysqlConfig.parse_file(self.config_file_path) self.dingtalk = DingTalkBot(self.dingtalk_webhook, self.dingtalk_secret) # 构建文件路径 self.login_state_path = os.path.join( self.root_dir, "data", "bet_data", "account_login_state", f"{self.email_account}.json", ) self.bet_file_path = os.path.join(self.root_dir, "data", "bet_data", "bet.csv") self.bet_name_map_path = os.path.join( self.root_dir, "data", "bet_data", "betname_map.json" ) class DatabaseService: """数据库服务类 - 负责所有数据库操作""" def __init__(self, config: AppConfig): self.config = config self.dao = Database(config.mysql_config) def query_orders_by_date(self, date_str: str) -> List[OddsjamOrderStatus]: """根据日期查询订单数据""" sql = f"select * from {self.config.table_name} where DATE(create_time) = '{date_str}'" rows = self.dao.fetchall(query=sql) return [OddsjamOrderStatus(row) for row in rows] if rows else [] @retry(tries=6) def update_bet_status(self, bet_id: str, bet_status: str) -> None: """更新投注状态""" sql = f"update {self.config.table_name} set bet_status = %s where id = %s" self.dao.execute(query=sql, args=(bet_status, bet_id)) class OddsjamService: """OddsJam业务服务类 - 负责与OddsJam网站的交互""" def __init__(self, config: AppConfig): self.config = config self.total_bet_cnt = 0 def _get_browser_context(self, playwright) -> Tuple[Page, BrowserContext]: """获取浏览器上下文""" logger.info(f"加载cookies: {self.config.login_state_path}") browser = playwright.chromium.launch( args=["--start-maximized"], headless=self.config.headless, proxy=( {"server": self.config.http_proxy} if self.config.http_proxy else None ), ) context = browser.new_context( storage_state=self.config.login_state_path, no_viewport=True ) page = context.new_page() return page, browser def login_to_site(self) -> None: """登录到OddsJam网站""" logger.info(f"登录账户: {self.config.email_account}") with sync_playwright() as p: browser = p.chromium.launch(headless=self.config.headless) page = browser.new_page() page.set_default_timeout(timeout=self.config.default_timeout) url = "https://oddsjam.com/bet-tracker" page.goto(url) page.get_by_role("link", name="Login").click() expect(page.get_by_role("button", name="Sign in")).to_be_visible( timeout=self.config.default_timeout ) page.pause() time.sleep(10) try: browser.contexts[0].storage_state(path=self.config.login_state_path) logger.info(f"登录成功: {page.url}") except Exception as ex: logger.error(f"保存登录状态失败: {ex}") traceback.print_exc() page.screenshot(path="error.png") browser.close() @retry(tries=6) def upload_bets(self, bet_file_path: str) -> None: """上传投注数据到OddsJam""" url = "https://oddsjam.com/bet-tracker" with sync_playwright() as p: page, browser = self._get_browser_context(p) page.set_default_timeout(timeout=self.config.default_timeout) def on_response(response): if "dromo-user-imports-production" in response.url: logger.info(f"上传响应状态: {response.status}") if response.status == 400: browser.close() raise Exception(f"上传失败: {response.request.failure}") elif "oddsjam.com/api/backend/bets/import" in response.url: logger.info(f"导入响应状态: {response.status}") page.on("response", on_response) page.goto(url) # 执行上传流程 self._execute_upload_flow(page, bet_file_path) browser.close() def _execute_upload_flow(self, page: Page, bet_file_path: str) -> None: """执行上传流程""" page.get_by_role("button", name="Import Bets").nth(1).click() time.sleep(5) iframe_locator = page.frame_locator('iframe[title="Dromo Importer\\: Bets"]') # 选择文件 with page.expect_file_chooser() as fc_info: iframe_locator.get_by_role( "button", name="Choose a file", exact=True ).click() time.sleep(5) file_chooser = fc_info.value file_chooser.set_files(bet_file_path) # 确认选择 iframe_locator.get_by_role( "button", name="Confirm selection and continue" ).click() time.sleep(5) # 确认匹配 iframe_locator.get_by_role( "button", name="Confirm matching and continue" ).click() time.sleep(5) # 继续 iframe_locator.get_by_role("button", name="Continue").click() # 完成 iframe_locator.get_by_role("button", name="Finish").click() # 处理可能的错误 try: iframe_locator.get_by_role("button", name="Submit anyway").click( timeout=5000 ) except Exception: pass iframe_locator.get_by_role("button", name="Yes").click() # 等待处理完成 expect(iframe_locator.get_by_text("Processing...")).to_be_hidden( timeout=self.config.default_timeout ) def get_all_bet_status(self, date_str: str, status_save_path: str) -> None: """获取所有投注状态""" logger.info(f"获取投注状态,日期: {date_str}") url = "https://oddsjam.com/bet-tracker" with sync_playwright() as p: page, browser = self._get_browser_context(p) def on_response(response): if "oddsjam.com/api/backend/bets-and-parlays-V2/find" in response.url: try: if response.request.failure is None and response.status == 200: data = response.json() self.total_bet_cnt = data["totalCount"] logger.info(f"总投注数量: {self.total_bet_cnt}") # 保存响应数据 with open(status_save_path, "a", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False) f.write("\n") except Exception as e: logger.error(f"处理响应数据失败: {e}") page.on("response", on_response) page.set_default_timeout(timeout=self.config.default_timeout) page.goto(url=url) # 处理弹窗 try: page.locator("#cello-widget-app").get_by_role("button").click( timeout=6000 ) except Exception: pass # 获取总页数并翻页 self._navigate_all_pages(page) browser.close() def _navigate_all_pages(self, page: Page) -> None: """导航所有页面""" inner_text = page.get_by_text( re.compile("Showing 1 to 50 of", re.IGNORECASE) ).inner_text() match = re.search(r"Showing 1 to 50 of (\d+) results", inner_text) if not match: raise ValueError("无法获取总结果数") total_bet_cnt = int(match[1]) total_page_no = math.ceil(total_bet_cnt / 50) logger.info(f"总共 {total_bet_cnt} 条结果,{total_page_no} 页") # 翻页到最后一页 for page_no in range(2, total_page_no): if page_no <= total_page_no: expect(page.get_by_role("button", name="Next")).to_be_visible( timeout=self.config.default_timeout ) page.wait_for_timeout(timeout=3000) page.get_by_role("button", name="Next").click() logger.info(f"当前页码: {page_no} / {total_page_no}") class BetDataProcessor: """投注数据处理服务类""" def __init__(self, config: AppConfig): self.config = config def prepare_bet_data_for_upload(self, date_str: str) -> str: """准备上传的投注数据""" logger.info(f"准备投注数据,日期: {date_str}") # 确保目录存在 ensure_directory_exists(target_path=self.config.bet_file_path, is_file=True) # 从数据库获取数据 db_service = DatabaseService(self.config) data_list = db_service.query_orders_by_date(date_str) if not data_list: message = f"日期 {date_str} 没有找到投注数据" logger.warning(message) raise ValueError(message) # 转换为DataFrame bet_df = pd.DataFrame([d.to_dict() for d in data_list]) # 列名映射 col_map = { "sportsbooks": "Sportsbook", "bet_name": "Bet Name", "market": "Market Name", "price": "Odds", "stake": "Stake", "event_name": "Event Name", "sport": "Sport", "league": "League", "game_id": "Game ID", "bet_type": "Bet Type", "bet_id": "Notes", "start_timestamp": "Game Start Date", } bet_df = bet_df.rename(columns=col_map) # 处理Sportsbook字段 bet_df["Sportsbook"] = bet_df["Sportsbook"].apply(lambda x: eval(x)[0]) # 加载博彩公司名称映射 with open(self.config.bet_name_map_path, "r") as f: bet_name_map = json.load(f) bet_df["Sportsbook"] = bet_df["Sportsbook"].apply( lambda x: bet_name_map.get(x, x) ) # 选择需要的列 bet_df = bet_df[col_map.values()] # 格式化时间 bet_df["Game Start Date"] = bet_df["Game Start Date"].apply( lambda x: datetime.datetime.fromtimestamp(x // 1000).strftime( "%Y/%m/%d %H:%M" ) ) # 保存到文件 bet_df.to_csv(self.config.bet_file_path, encoding="utf-8-sig", index=False) logger.info(f"投注数据最小开始时间: {bet_df['Game Start Date'].min()}") logger.info(f"投注数据最大开始时间: {bet_df['Game Start Date'].max()}") logger.info(f"投注数据形状: {bet_df.shape}") return ( f"投注数据最小开始时间: {bet_df['Game Start Date'].min()}\n" f"投注数据最大开始时间: {bet_df['Game Start Date'].max()}\n" f"投注数据形状: {bet_df.shape}" ) def process_status_data(self, status_file_path: str) -> None: """处理状态数据并更新数据库""" logger.info(f"处理状态数据: {status_file_path}") data_list = [] with open(status_file_path, "r", encoding="utf-8") as f: for line in f: data = json.loads(line) data_list.extend(data["entities"]) status_df = pd.DataFrame(data_list) # 处理错误状态 def get_error_status(row): if not pd.isna(row["autograder_errors"]): return f"error: {row['autograder_errors']}" return row["status"] status_df["status"] = status_df.apply(lambda row: get_error_status(row), axis=1) status_df = status_df[status_df["status"] != "pending"] status_df = status_df[["status", "notes"]].drop_duplicates() # 更新数据库 db_service = DatabaseService(self.config) status_list = status_df.to_dict(orient="records") for i, data in enumerate(status_list): bet_id = data["notes"] bet_status = data["status"] logger.info( f"{i+1}/{len(status_list)}, 状态->{bet_status}, 投注ID->{bet_id}" ) for _ in range(3): try: db_service.update_bet_status(bet_id=bet_id, bet_status=bet_status) break except Exception as e: logger.error(f"更新投注状态失败: {e}") class TaskScheduler: """定时任务调度器""" def __init__(self, config: AppConfig): self.config = config self.oddsjam_service = OddsjamService(config) self.data_processor = BetDataProcessor(config) def upload_bets_task(self, date_str: Optional[str] = None) -> None: """上传 T-1 时间的投注数据任务""" config_base_info = str(self.config) try: if date_str is None: date_str = ( datetime.datetime.now() - datetime.timedelta(days=1) ).strftime("%Y%m%d") logger.info(f"执行上传任务,日期: {date_str}") # 准备数据 data_info = self.data_processor.prepare_bet_data_for_upload(date_str) if not data_info: logger.warning(f"日期 {date_str} 没有数据需要上传") return # 上传到OddsJam self.oddsjam_service.upload_bets(self.config.bet_file_path) # 发送通知 self.config.dingtalk.send_text( f"{date_str}:比赛数据上传完成\n{config_base_info}\n{data_info}" ) except Exception as e: error_info = traceback.format_exc() logger.error(f"上传任务失败: {error_info}") self.config.dingtalk.send_text( f"{date_str}:\n{config_base_info}\n上传比赛失败: {e}\n{error_info}" ) def pull_status_task(self, date_str: Optional[str] = None) -> None: """拉取状态数据任务""" config_base_info = str(self.config) try: if date_str is None: date_str = datetime.datetime.now().strftime("%Y%m%d") logger.info(f"执行拉取任务,日期: {date_str}") # 构建状态文件路径 status_file_path = os.path.join( self.config.root_dir, "data", "bet_data", "bet_status", f"{self.config.email_account}_status_{date_str}.json", ) ensure_directory_exists(target_path=status_file_path, is_file=True) # 获取状态数据 self.oddsjam_service.get_all_bet_status(date_str, status_file_path) # 处理状态数据 self.data_processor.process_status_data(status_file_path) # 发送通知 self.config.dingtalk.send_text( f"{date_str}: 比赛状态更新完成\n{config_base_info}" ) except Exception as e: error_info = traceback.format_exc() logger.error(f"拉取任务失败: {error_info}") self.config.dingtalk.send_text(f"{config_base_info}\n{error_info}") class BetTrackerApplication: """主应用类""" def __init__(self): self.config = AppConfig() self.scheduler = TaskScheduler(self.config) def run_scheduled_tasks(self) -> None: """运行定时任务""" import schedule logger.info(f"上传定时任务: {self.config.upload_schedule}") logger.info(f"拉取定时任务: {self.config.pull_schedule}") # 设置定时任务 schedule.every().day.at(self.config.pull_schedule).do( self.scheduler.pull_status_task ) schedule.every().day.at(self.config.upload_schedule).do( self.scheduler.upload_bets_task ) # 运行定时任务 while True: schedule.run_pending() time.sleep(0.05) def run_manual_task(self, task_type: str, date_str: Optional[str] = None) -> None: """运行手动任务""" if task_type == "upload": self.scheduler.upload_bets_task(date_str) elif task_type == "pull": self.scheduler.pull_status_task(date_str) else: logger.error(f"未知的任务类型: {task_type}") def main(): """主入口函数""" app = BetTrackerApplication() # 检查命令行参数 if len(sys.argv) > 1: task_type = sys.argv[1] date_str = sys.argv[2] if len(sys.argv) > 2 else None app.run_manual_task(task_type, date_str) else: # 运行定时任务 app.run_scheduled_tasks() if __name__ == "__main__": main() # app = BetTrackerApplication() # logger.info(app.config) # app.run_manual_task(task_type="upload", date_str="20251026")