From aaf51c96ac0e8ed47bd1e578988f1eba94ea92ee Mon Sep 17 00:00:00 2001 From: aszerW Date: Sun, 26 Oct 2025 09:55:40 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E9=87=8D=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- OddsjamBetTrackerRefactored.py | 547 +++++++++++++++++++++++++++++++++ config/__init__.py | 1 + 2 files changed, 548 insertions(+) create mode 100644 OddsjamBetTrackerRefactored.py create mode 100644 config/__init__.py diff --git a/OddsjamBetTrackerRefactored.py b/OddsjamBetTrackerRefactored.py new file mode 100644 index 0000000..fbff803 --- /dev/null +++ b/OddsjamBetTrackerRefactored.py @@ -0,0 +1,547 @@ +""" +OddsJam Bet Tracker - 重构版本 +按照Python最佳实践重构,支持环境变量配置,模块化设计 + +核心功能: +1. 定时从数据库拉取数据上传到OddsJam +2. 定时从OddsJam获取结果数据更新到数据库 +""" + +from email import message +import os +import sys +import re +import math +import time +import json +import datetime +import traceback +from typing import Optional, List, Dict, Tuple +from dataclasses import dataclass +from abc import ABC, abstractmethod + +import pandas as pd +from retry import retry +from playwright.sync_api import sync_playwright, Page, BrowserContext, expect +from loguru import logger + +# 导入项目模块 +from dao.Database import Database +from data_model import MysqlConfig, OddsjamOrderStatus +from common.dingtalk import DingTalkBot +from common.utils import ensure_directory_exists + + +@dataclass +class AppConfig: + """应用配置类 - 支持环境变量配置""" + + # 邮箱配置 + email_account: str = os.getenv("ODDSJAM_EMAIL", "aszer27937@gmail.com") + + # 代理配置 + http_proxy: str = os.getenv("HTTP_PROXY", "http://127.0.0.1:7890") + + # 数据库表名 + table_name: str = os.getenv("BET_TABLE_NAME", "oddsjam_order_all") + + # 定时任务配置 + upload_schedule: str = os.getenv("UPLOAD_SCHEDULE", "09:48") + pull_schedule: str = os.getenv("PULL_SCHEDULE", "13:00") + + # 文件路径配置 + root_dir: str = os.path.dirname(os.path.abspath(__file__)) + config_file_path: str = os.getenv("MYSQL_CONFIG_PATH", "./config/mysql_config.json") + + # 钉钉配置 + dingtalk_webhook: str = os.getenv( + "DINGTALK_WEBHOOK", + "https://oapi.dingtalk.com/robot/send?access_token=21de667159edadd33172c6ec414a2addf9c6359189350ffd36819d2a20e8a0f4", + ) + dingtalk_secret: str = os.getenv( + "DINGTALK_SECRET", + "SEC43a0fa0b29717f98637a119b92a0bd5f7b2b6da671bdd2bd1279ed8323454d5e", + ) + + # 浏览器配置 + headless: bool = os.getenv("HEADLESS", "true").lower() == "true" + headless = False + default_timeout: float = float(os.getenv("DEFAULT_TIMEOUT", "6000000")) + + def __post_init__(self): + """初始化后处理""" + self.mysql_config = MysqlConfig.parse_file(self.config_file_path) + self.dingtalk = DingTalkBot(self.dingtalk_webhook, self.dingtalk_secret) + + # 构建文件路径 + self.login_state_path = os.path.join( + self.root_dir, + "data", + "bet_data", + "account_login_state", + f"{self.email_account}.json", + ) + self.bet_file_path = os.path.join(self.root_dir, "data", "bet_data", "bet.csv") + self.bet_name_map_path = os.path.join( + self.root_dir, "data", "bet_data", "betname_map.json" + ) + + +class DatabaseService: + """数据库服务类 - 负责所有数据库操作""" + + def __init__(self, config: AppConfig): + self.config = config + self.dao = Database(config.mysql_config) + + def query_orders_by_date(self, date_str: str) -> List[OddsjamOrderStatus]: + """根据日期查询订单数据""" + sql = f"select * from {self.config.table_name} where DATE(create_time) = '{date_str}'" + rows = self.dao.fetchall(query=sql) + return [OddsjamOrderStatus(row) for row in rows] if rows else [] + + @retry(tries=6) + def update_bet_status(self, bet_id: str, bet_status: str) -> None: + """更新投注状态""" + sql = f"update {self.config.table_name} set bet_status = %s where id = %s" + self.dao.execute(query=sql, args=(bet_status, bet_id)) + + +class OddsjamService: + """OddsJam业务服务类 - 负责与OddsJam网站的交互""" + + def __init__(self, config: AppConfig): + self.config = config + self.total_bet_cnt = 0 + + def _get_browser_context(self, playwright) -> Tuple[Page, BrowserContext]: + """获取浏览器上下文""" + logger.info(f"加载cookies: {self.config.login_state_path}") + + browser = playwright.chromium.launch( + args=["--start-maximized"], + headless=self.config.headless, + proxy=( + {"server": self.config.http_proxy} if self.config.http_proxy else None + ), + ) + + context = browser.new_context( + storage_state=self.config.login_state_path, no_viewport=True + ) + page = context.new_page() + return page, browser + + def login_to_site(self) -> None: + """登录到OddsJam网站""" + logger.info(f"登录账户: {self.config.email_account}") + + with sync_playwright() as p: + browser = p.chromium.launch(headless=self.config.headless) + page = browser.new_page() + page.set_default_timeout(timeout=self.config.default_timeout) + + url = "https://oddsjam.com/bet-tracker" + page.goto(url) + + page.get_by_role("link", name="Login").click() + expect(page.get_by_role("button", name="Sign in")).to_be_visible( + timeout=self.config.default_timeout + ) + + page.pause() + time.sleep(10) + + try: + browser.contexts[0].storage_state(path=self.config.login_state_path) + logger.info(f"登录成功: {page.url}") + except Exception as ex: + logger.error(f"保存登录状态失败: {ex}") + traceback.print_exc() + page.screenshot(path="error.png") + + browser.close() + + @retry(tries=6) + def upload_bets(self, bet_file_path: str) -> None: + """上传投注数据到OddsJam""" + url = "https://oddsjam.com/bet-tracker" + + with sync_playwright() as p: + page, browser = self._get_browser_context(p) + page.set_default_timeout(timeout=self.config.default_timeout) + + def on_response(response): + if "dromo-user-imports-production" in response.url: + logger.info(f"上传响应状态: {response.status}") + if response.status == 400: + browser.close() + raise Exception(f"上传失败: {response.request.failure}") + elif "oddsjam.com/api/backend/bets/import" in response.url: + logger.info(f"导入响应状态: {response.status}") + + page.on("response", on_response) + page.goto(url) + + # 执行上传流程 + self._execute_upload_flow(page, bet_file_path) + + browser.close() + + def _execute_upload_flow(self, page: Page, bet_file_path: str) -> None: + """执行上传流程""" + page.get_by_role("button", name="Import Bets").nth(1).click() + time.sleep(5) + + iframe_locator = page.frame_locator('iframe[title="Dromo Importer\\: Bets"]') + + # 选择文件 + with page.expect_file_chooser() as fc_info: + iframe_locator.get_by_role( + "button", name="Choose a file", exact=True + ).click() + time.sleep(5) + + file_chooser = fc_info.value + file_chooser.set_files(bet_file_path) + + # 确认选择 + iframe_locator.get_by_role( + "button", name="Confirm selection and continue" + ).click() + time.sleep(5) + + # 确认匹配 + iframe_locator.get_by_role( + "button", name="Confirm matching and continue" + ).click() + time.sleep(5) + + # 继续 + iframe_locator.get_by_role("button", name="Continue").click() + + # 完成 + iframe_locator.get_by_role("button", name="Finish").click() + + # 处理可能的错误 + try: + iframe_locator.get_by_role("button", name="Submit anyway").click( + timeout=5000 + ) + except Exception: + pass + + iframe_locator.get_by_role("button", name="Yes").click() + + # 等待处理完成 + expect(iframe_locator.get_by_text("Processing...")).to_be_hidden( + timeout=self.config.default_timeout + ) + + def get_all_bet_status(self, date_str: str, status_save_path: str) -> None: + """获取所有投注状态""" + logger.info(f"获取投注状态,日期: {date_str}") + + url = "https://oddsjam.com/bet-tracker" + + with sync_playwright() as p: + page, browser = self._get_browser_context(p) + + def on_response(response): + if "oddsjam.com/api/backend/bets-and-parlays-V2/find" in response.url: + try: + if response.request.failure is None and response.status == 200: + data = response.json() + self.total_bet_cnt = data["totalCount"] + logger.info(f"总投注数量: {self.total_bet_cnt}") + + # 保存响应数据 + with open(status_save_path, "a", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False) + f.write("\n") + except Exception as e: + logger.error(f"处理响应数据失败: {e}") + + page.on("response", on_response) + page.set_default_timeout(timeout=self.config.default_timeout) + page.goto(url=url) + + # 处理弹窗 + try: + page.locator("#cello-widget-app").get_by_role("button").click( + timeout=6000 + ) + except Exception: + pass + + # 获取总页数并翻页 + self._navigate_all_pages(page) + + browser.close() + + def _navigate_all_pages(self, page: Page) -> None: + """导航所有页面""" + inner_text = page.get_by_text( + re.compile("Showing 1 to 50 of", re.IGNORECASE) + ).inner_text() + + match = re.search(r"Showing 1 to 50 of (\d+) results", inner_text) + if not match: + raise ValueError("无法获取总结果数") + + total_bet_cnt = int(match[1]) + total_page_no = math.ceil(total_bet_cnt / 50) + logger.info(f"总共 {total_bet_cnt} 条结果,{total_page_no} 页") + + # 翻页到最后一页 + for page_no in range(2, total_page_no + 1): + if page_no <= total_page_no: + expect(page.get_by_role("button", name="Next")).to_be_visible( + timeout=self.config.default_timeout + ) + page.wait_for_timeout(timeout=3000) + page.get_by_role("button", name="Next").click() + logger.info(f"当前页码: {page_no} / {total_page_no}") + + +class BetDataProcessor: + """投注数据处理服务类""" + + def __init__(self, config: AppConfig): + self.config = config + + def prepare_bet_data_for_upload(self, date_str: str) -> str: + """准备上传的投注数据""" + logger.info(f"准备投注数据,日期: {date_str}") + + # 确保目录存在 + ensure_directory_exists(target_path=self.config.bet_file_path, is_file=True) + + # 从数据库获取数据 + db_service = DatabaseService(self.config) + data_list = db_service.query_orders_by_date(date_str) + + if not data_list: + message = f"日期 {date_str} 没有找到投注数据" + logger.warning(message) + raise ValueError(message) + + # 转换为DataFrame + bet_df = pd.DataFrame([d.to_dict() for d in data_list]) + + # 列名映射 + col_map = { + "sportsbooks": "Sportsbook", + "bet_name": "Bet Name", + "market": "Market Name", + "price": "Odds", + "stake": "Stake", + "event_name": "Event Name", + "sport": "Sport", + "league": "League", + "game_id": "Game ID", + "bet_type": "Bet Type", + "bet_id": "Notes", + "start_timestamp": "Game Start Date", + } + + bet_df = bet_df.rename(columns=col_map) + + # 处理Sportsbook字段 + bet_df["Sportsbook"] = bet_df["Sportsbook"].apply(lambda x: eval(x)[0]) + + # 加载博彩公司名称映射 + with open(self.config.bet_name_map_path, "r") as f: + bet_name_map = json.load(f) + bet_df["Sportsbook"] = bet_df["Sportsbook"].apply( + lambda x: bet_name_map.get(x, x) + ) + + # 选择需要的列 + bet_df = bet_df[col_map.values()] + + # 格式化时间 + bet_df["Game Start Date"] = bet_df["Game Start Date"].apply( + lambda x: datetime.datetime.fromtimestamp(x // 1000).strftime( + "%Y/%m/%d %H:%M" + ) + ) + + # 保存到文件 + bet_df.to_csv(self.config.bet_file_path, encoding="utf-8-sig", index=False) + + logger.info(f"投注数据最小开始时间: {bet_df['Game Start Date'].min()}") + logger.info(f"投注数据最大开始时间: {bet_df['Game Start Date'].max()}") + logger.info(f"投注数据形状: {bet_df.shape}") + + return ( + f"投注数据最小开始时间: {bet_df['Game Start Date'].min()}\n" + f"投注数据最大开始时间: {bet_df['Game Start Date'].max()}\n" + f"投注数据形状: {bet_df.shape}" + ) + + def process_status_data(self, status_file_path: str) -> None: + """处理状态数据并更新数据库""" + logger.info(f"处理状态数据: {status_file_path}") + + data_list = [] + with open(status_file_path, "r", encoding="utf-8") as f: + for line in f: + data = json.loads(line) + data_list.extend(data["entities"]) + + status_df = pd.DataFrame(data_list) + + # 处理错误状态 + def get_error_status(row): + if not pd.isna(row["autograder_errors"]): + return f"error: {row['autograder_errors']}" + return row["status"] + + status_df["status"] = status_df.apply(lambda row: get_error_status(row), axis=1) + status_df = status_df[status_df["status"] != "pending"] + status_df = status_df[["status", "notes"]].drop_duplicates() + + # 更新数据库 + db_service = DatabaseService(self.config) + status_list = status_df.to_dict(orient="records") + + for i, data in enumerate(status_list): + bet_id = data["notes"] + bet_status = data["status"] + logger.info( + f"{i+1}/{len(status_list)}, 状态->{bet_status}, 投注ID->{bet_id}" + ) + + for _ in range(3): + try: + db_service.update_bet_status(bet_id=bet_id, bet_status=bet_status) + break + except Exception as e: + logger.error(f"更新投注状态失败: {e}") + + +class TaskScheduler: + """定时任务调度器""" + + def __init__(self, config: AppConfig): + self.config = config + self.oddsjam_service = OddsjamService(config) + self.data_processor = BetDataProcessor(config) + + def upload_bets_task(self, date_str: Optional[str] = None) -> None: + """上传投注数据任务""" + try: + if date_str is None: + date_str = ( + datetime.datetime.now() - datetime.timedelta(days=1) + ).strftime("%Y%m%d") + + logger.info(f"执行上传任务,日期: {date_str}") + + # 准备数据 + data_info = self.data_processor.prepare_bet_data_for_upload(date_str) + + if not data_info: + logger.warning(f"日期 {date_str} 没有数据需要上传") + return + + # 上传到OddsJam + self.oddsjam_service.upload_bets(self.config.bet_file_path) + + # 发送通知 + self.config.dingtalk.send_text(f"{date_str}: \n {data_info}") + + except Exception as e: + error_info = traceback.format_exc() + logger.error(f"上传任务失败: {error_info}") + self.config.dingtalk.send_text( + f"{date_str}: 上传比赛失败: {e}\n{error_info}" + ) + + def pull_status_task(self, date_str: Optional[str] = None) -> None: + """拉取状态数据任务""" + try: + if date_str is None: + date_str = datetime.datetime.now().strftime("%Y%m%d") + + logger.info(f"执行拉取任务,日期: {date_str}") + + # 构建状态文件路径 + status_file_path = os.path.join( + self.config.root_dir, + "data", + "bet_data", + "bet_status", + f"{self.config.email_account}_status_{date_str}.json", + ) + ensure_directory_exists(target_path=status_file_path, is_file=True) + + # 获取状态数据 + self.oddsjam_service.get_all_bet_status(date_str, status_file_path) + + # 处理状态数据 + self.data_processor.process_status_data(status_file_path) + + # 发送通知 + self.config.dingtalk.send_text(f"{date_str}: 比赛状态更新完成") + + except Exception as e: + error_info = traceback.format_exc() + logger.error(f"拉取任务失败: {error_info}") + self.config.dingtalk.send_text(error_info) + + +class BetTrackerApplication: + """主应用类""" + + def __init__(self): + self.config = AppConfig() + self.scheduler = TaskScheduler(self.config) + + def run_scheduled_tasks(self) -> None: + """运行定时任务""" + import schedule + + logger.info(f"上传定时任务: {self.config.upload_schedule}") + logger.info(f"拉取定时任务: {self.config.pull_schedule}") + + # 设置定时任务 + schedule.every().day.at(self.config.pull_schedule).do( + self.scheduler.pull_status_task + ) + schedule.every().day.at(self.config.upload_schedule).do( + self.scheduler.upload_bets_task + ) + + # 运行定时任务 + while True: + schedule.run_pending() + time.sleep(0.05) + + def run_manual_task(self, task_type: str, date_str: Optional[str] = None) -> None: + """运行手动任务""" + if task_type == "upload": + self.scheduler.upload_bets_task(date_str) + elif task_type == "pull": + self.scheduler.pull_status_task(date_str) + else: + logger.error(f"未知的任务类型: {task_type}") + + +def main(): + """主入口函数""" + app = BetTrackerApplication() + + # 检查命令行参数 + if len(sys.argv) > 1: + task_type = sys.argv[1] + date_str = sys.argv[2] if len(sys.argv) > 2 else None + app.run_manual_task(task_type, date_str) + else: + # 运行定时任务 + app.run_scheduled_tasks() + + +if __name__ == "__main__": + main() diff --git a/config/__init__.py b/config/__init__.py new file mode 100644 index 0000000..bf3e2e8 --- /dev/null +++ b/config/__init__.py @@ -0,0 +1 @@ +# 配置模块