From c6f40b218e1f622ea7ec44802fc3a60f550f958d Mon Sep 17 00:00:00 2001 From: aszerW Date: Sun, 26 Oct 2025 12:19:51 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E5=91=BD=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- OddsjamBetTracker.py | 763 +++++++++++++++++++-------------- OddsjamBetTrackerRefactored.py | 560 ------------------------ 2 files changed, 450 insertions(+), 873 deletions(-) delete mode 100644 OddsjamBetTrackerRefactored.py diff --git a/OddsjamBetTracker.py b/OddsjamBetTracker.py index ef579af..c14aa2b 100644 --- a/OddsjamBetTracker.py +++ b/OddsjamBetTracker.py @@ -1,423 +1,560 @@ -import http -import sys +""" +OddsJam Bet Tracker - 重构版本 +按照Python最佳实践重构,支持环境变量配置,模块化设计 + +核心功能: +1. 定时从数据库拉取数据上传到OddsJam +2. 定时从OddsJam获取结果数据更新到数据库 +""" + +from email import message import os +import sys import re import math import time import json import datetime import traceback +from typing import Optional, List, Dict, Tuple +from dataclasses import dataclass +from abc import ABC, abstractmethod + import pandas as pd from retry import retry -from common.utils import ensure_directory_exists - -from dao.Database import Database from playwright.sync_api import sync_playwright, Page, BrowserContext, expect from loguru import logger + +# 导入项目模块 +from dao.Database import Database from data_model import MysqlConfig, OddsjamOrderStatus from common.dingtalk import DingTalkBot - -webhook = "https://oapi.dingtalk.com/robot/send?access_token=21de667159edadd33172c6ec414a2addf9c6359189350ffd36819d2a20e8a0f4" -secret = "SEC43a0fa0b29717f98637a119b92a0bd5f7b2b6da671bdd2bd1279ed8323454d5e" - -dingtalk = DingTalkBot(webhook, secret) +from common.utils import ensure_directory_exists -root_dir = os.path.dirname(os.path.abspath(__file__)) +@dataclass +class AppConfig: + """应用配置类 - 支持环境变量配置""" -config_file_path = "./config/mysql_config.json" -mysql_config = MysqlConfig.parse_file(config_file_path) -dao = Database(mysql_config) + # 邮箱配置 + email_account: str = os.getenv("ODDSJAM_EMAIL", "ojbbbb21@proton.me") + + # 代理配置 + http_proxy: str = os.getenv("HTTP_PROXY", "http://127.0.0.1:7890") + + # 数据库表名 + table_name: str = os.getenv("BET_TABLE_NAME", "oddsjam_order") + + # 定时任务配置 + upload_schedule: str = os.getenv("UPLOAD_SCHEDULE", "09:48") + pull_schedule: str = os.getenv("PULL_SCHEDULE", "13:00") + + # 文件路径配置 + root_dir: str = os.path.dirname(os.path.abspath(__file__)) + config_file_path: str = os.getenv("MYSQL_CONFIG_PATH", "./config/mysql_config.json") + + # 钉钉配置 + dingtalk_webhook: str = os.getenv( + "DINGTALK_WEBHOOK", + "https://oapi.dingtalk.com/robot/send?access_token=21de667159edadd33172c6ec414a2addf9c6359189350ffd36819d2a20e8a0f4", + ) + dingtalk_secret: str = os.getenv( + "DINGTALK_SECRET", + "SEC43a0fa0b29717f98637a119b92a0bd5f7b2b6da671bdd2bd1279ed8323454d5e", + ) + + # 浏览器配置 + headless: bool = os.getenv("HEADLESS", "true").lower() == "true" + default_timeout: float = float(os.getenv("DEFAULT_TIMEOUT", "6000000")) + + def __str__(self) -> str: + # 返回邮箱和数据表名的信息 + return f"AppConfig(email_account={self.email_account}, table_name={self.table_name})" + + def __post_init__(self): + """初始化后处理""" + self.mysql_config = MysqlConfig.parse_file(self.config_file_path) + self.dingtalk = DingTalkBot(self.dingtalk_webhook, self.dingtalk_secret) + + # 构建文件路径 + self.login_state_path = os.path.join( + self.root_dir, + "data", + "bet_data", + "account_login_state", + f"{self.email_account}.json", + ) + self.bet_file_path = os.path.join(self.root_dir, "data", "bet_data", "bet.csv") + self.bet_name_map_path = os.path.join( + self.root_dir, "data", "bet_data", "betname_map.json" + ) -def query_by_create_time(ds: str): - sql = f"select * from oddsjam_order_all where DATE(create_time) = '{ds}'" - rows = dao.fetchall(query=sql) - if rows: - return [OddsjamOrderStatus(row) for row in rows] - else: - return [] +class DatabaseService: + """数据库服务类 - 负责所有数据库操作""" -@retry(tries=6) -def update_bet_status(id: str, bet_status: str): - sql = "update oddsjam_order_all set bet_status = %s where id = %s" - dao.execute(query=sql, args=(bet_status, id)) + def __init__(self, config: AppConfig): + self.config = config + self.dao = Database(config.mysql_config) + + def query_orders_by_date(self, date_str: str) -> List[OddsjamOrderStatus]: + """根据日期查询订单数据""" + sql = f"select * from {self.config.table_name} where DATE(create_time) = '{date_str}'" + rows = self.dao.fetchall(query=sql) + return [OddsjamOrderStatus(row) for row in rows] if rows else [] + + @retry(tries=6) + def update_bet_status(self, bet_id: str, bet_status: str) -> None: + """更新投注状态""" + sql = f"update {self.config.table_name} set bet_status = %s where id = %s" + self.dao.execute(query=sql, args=(bet_status, bet_id)) -class SyncOddsjamBetTracker: - def __init__( - self, - login_state_path: str, - intercept_response_res_save_path: str, - headless: bool = False, - default_time_out: float = 6000000, - ): - self.login_state_path = login_state_path - self.headless = headless - self.default_time_out = default_time_out - self.intercept_response_res_save_path = intercept_response_res_save_path +class OddsjamService: + """OddsJam业务服务类 - 负责与OddsJam网站的交互""" + + def __init__(self, config: AppConfig): + self.config = config self.total_bet_cnt = 0 - def login_oddsjam_cookies(self, p, headless=False) -> tuple: - # 获取HTTP_PROXY环境变量,默认为None - import os + def _get_browser_context(self, playwright) -> Tuple[Page, BrowserContext]: + """获取浏览器上下文""" + logger.info(f"加载cookies: {self.config.login_state_path}") - http_proxy = os.environ.get("HTTP_PROXY", None) - if http_proxy is None: - http_proxy = "http://127.0.0.1:7890" - logger.info("加载cookies {}", self.login_state_path) - browser = p.chromium.launch( + browser = playwright.chromium.launch( args=["--start-maximized"], - headless=headless, - proxy={ - "server": http_proxy, - }, + headless=self.config.headless, + proxy=( + {"server": self.config.http_proxy} if self.config.http_proxy else None + ), ) + context = browser.new_context( - storage_state=self.login_state_path, no_viewport=True + storage_state=self.config.login_state_path, no_viewport=True ) page = context.new_page() return page, browser - def login_to_site(self): - logger.info(f"login account: {self.email_account}") + def login_to_site(self) -> None: + """登录到OddsJam网站""" + logger.info(f"登录账户: {self.config.email_account}") + with sync_playwright() as p: - browser = p.chromium.launch(headless=self.headless) + browser = p.chromium.launch(headless=self.config.headless) page = browser.new_page() - page.set_default_timeout(timeout=self.default_time_out) + page.set_default_timeout(timeout=self.config.default_timeout) + url = "https://oddsjam.com/bet-tracker" page.goto(url) - # page.get_by_label("Close Modal").click() - page.get_by_role("link", name="Login").click() expect(page.get_by_role("button", name="Sign in")).to_be_visible( - timeout=self.default_time_out + timeout=self.config.default_timeout ) + page.pause() time.sleep(10) + try: - browser.contexts[0].storage_state(path=self.login_state_path) + browser.contexts[0].storage_state(path=self.config.login_state_path) + logger.info(f"登录成功: {page.url}") except Exception as ex: + logger.error(f"保存登录状态失败: {ex}") traceback.print_exc() page.screenshot(path="error.png") - logger.info(page.url, "login_success") + browser.close() @retry(tries=6) - def intercept_import_response(self, route, request): - if "oddsjam.com/api/backend/bets/import" in request.url: - response = route.fetch(timeout=50000) - logger.info(response.status) - if response.status == 500: - route.fulfill(response=response, json={}) - return - route.continue_() - - @retry(tries=6) - def upload_new_bets(self, bet_file_path: str): + def upload_bets(self, bet_file_path: str) -> None: + """上传投注数据到OddsJam""" url = "https://oddsjam.com/bet-tracker" + with sync_playwright() as p: - page: Page - browser: BrowserContext - page, browser = self.login_oddsjam_cookies(p, headless=self.headless) - page.set_default_timeout(timeout=self.default_time_out) + page, browser = self._get_browser_context(p) + page.set_default_timeout(timeout=self.config.default_timeout) def on_response(response): if "dromo-user-imports-production" in response.url: - print(response.status, response.request.failure) + logger.info(f"上传响应状态: {response.status}") if response.status == 400: browser.close() - raise Exception(response.request.failure) + raise Exception(f"上传失败: {response.request.failure}") elif "oddsjam.com/api/backend/bets/import" in response.url: - logger.info(response.status) + logger.info(f"导入响应状态: {response.status}") page.on("response", on_response) page.goto(url) - # page.pause() - page.get_by_role("button", name="Import Bets").nth(1).click() - time.sleep(5) - iframe_locator = page.frame_locator( - 'iframe[title="Dromo Importer\\: Bets"]' - ) - with page.expect_file_chooser() as fc_info: - iframe_locator.get_by_role( - "button", name="Choose a file", exact=True - ).click() - time.sleep(5) - file_chooser = fc_info.value - file_chooser.set_files(bet_file_path) - iframe_locator.get_by_role( - "button", name="Confirm selection and continue" - ).click() - time.sleep(5) - iframe_locator.get_by_role( - "button", name="Confirm matching and continue" - ).click() - time.sleep(5) - iframe_locator.get_by_role("button", name="Continue").click() - iframe_locator.get_by_role("button", name="Finish").click() - try: - iframe_locator.get_by_role("button", name="Submit anyway").click( - timeout=5000 - ) - except Exception as ex: - ... - # print(ex) - iframe_locator.get_by_role("button", name="Yes").click() - expect(iframe_locator.get_by_text("Processing...")).to_be_hidden( - timeout=self.default_time_out - ) - # os.remove(bet_file_path) - # page.pause() + # 执行上传流程 + self._execute_upload_flow(page, bet_file_path) + browser.close() - def get_all_bet_status(self, ds: str = None): - if not ds: - ds = datetime.datetime.now().strftime("%Y%m%d") - day = datetime.datetime.strptime(ds, "%Y%m%d").day - logger.info(f"current date: {ds}") - logger.info(f"current day: {day}") + def _execute_upload_flow(self, page: Page, bet_file_path: str) -> None: + """执行上传流程""" + page.get_by_role("button", name="Import Bets").nth(1).click() + time.sleep(5) + + iframe_locator = page.frame_locator('iframe[title="Dromo Importer\\: Bets"]') + + # 选择文件 + with page.expect_file_chooser() as fc_info: + iframe_locator.get_by_role( + "button", name="Choose a file", exact=True + ).click() + time.sleep(5) + + file_chooser = fc_info.value + file_chooser.set_files(bet_file_path) + + # 确认选择 + iframe_locator.get_by_role( + "button", name="Confirm selection and continue" + ).click() + time.sleep(5) + + # 确认匹配 + iframe_locator.get_by_role( + "button", name="Confirm matching and continue" + ).click() + time.sleep(5) + + # 继续 + iframe_locator.get_by_role("button", name="Continue").click() + + # 完成 + iframe_locator.get_by_role("button", name="Finish").click() + + # 处理可能的错误 + try: + iframe_locator.get_by_role("button", name="Submit anyway").click( + timeout=5000 + ) + except Exception: + pass + + iframe_locator.get_by_role("button", name="Yes").click() + + # 等待处理完成 + expect(iframe_locator.get_by_text("Processing...")).to_be_hidden( + timeout=self.config.default_timeout + ) + + def get_all_bet_status(self, date_str: str, status_save_path: str) -> None: + """获取所有投注状态""" + logger.info(f"获取投注状态,日期: {date_str}") + url = "https://oddsjam.com/bet-tracker" + with sync_playwright() as p: - page: Page - browser: BrowserContext - page, browser = self.login_oddsjam_cookies(p, headless=self.headless) + page, browser = self._get_browser_context(p) def on_response(response): if "oddsjam.com/api/backend/bets-and-parlays-V2/find" in response.url: try: - # 确保响应已完成 if response.request.failure is None and response.status == 200: - data = response.json() # 注意: sync_api 中 .json() 是同步的 + data = response.json() self.total_bet_cnt = data["totalCount"] - logger.info(f"total bet count: {self.total_bet_cnt}") + logger.info(f"总投注数量: {self.total_bet_cnt}") - # 保存数据 - with open( - self.intercept_response_res_save_path, - "a", - encoding="utf-8", - ) as f: + # 保存响应数据 + with open(status_save_path, "a", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False) f.write("\n") except Exception as e: - logger.error(f"Error processing response: {e}") + logger.error(f"处理响应数据失败: {e}") - # page.route("**", self.intercept_response) page.on("response", on_response) - page.set_default_timeout(timeout=self.default_time_out) + page.set_default_timeout(timeout=self.config.default_timeout) page.goto(url=url) + + # 处理弹窗 try: page.locator("#cello-widget-app").get_by_role("button").click( timeout=6000 ) - except: - ... + except Exception: + pass - # page.locator(".mt-4 > div > .inline-flex").first.click() - # time.sleep(5) - # page.get_by_label("Clear").click() - # time.sleep(5) - # page.get_by_role("button", name="Date Range").click() - # time.sleep(5) - # page.get_by_text("Custom", exact=True).click() - # time.sleep(5) - # page.get_by_role("button", name=f"{day}").click() - # time.sleep(5) - # page.get_by_role( - # "button", name=re.compile(r"Show (\d+) Results", re.IGNORECASE) - # ).click() + # 获取总页数并翻页 + self._navigate_all_pages(page) - inner_text = page.get_by_text( - re.compile("Showing 1 to 50 of", re.IGNORECASE) - ).inner_text() - match = re.search(r"Showing 1 to 50 of (\d+) results", inner_text) - total_bet_cnt = int(match[1]) - total_page_no = math.ceil(total_bet_cnt / 50) - logger.info(f"total {total_bet_cnt} results, {total_page_no} pages") - page.pause() - for page_no in range(2, total_page_no): + browser.close() + + def _navigate_all_pages(self, page: Page) -> None: + """导航所有页面""" + inner_text = page.get_by_text( + re.compile("Showing 1 to 50 of", re.IGNORECASE) + ).inner_text() + + match = re.search(r"Showing 1 to 50 of (\d+) results", inner_text) + if not match: + raise ValueError("无法获取总结果数") + + total_bet_cnt = int(match[1]) + total_page_no = math.ceil(total_bet_cnt / 50) + logger.info(f"总共 {total_bet_cnt} 条结果,{total_page_no} 页") + + # 翻页到最后一页 + for page_no in range(2, total_page_no): + if page_no <= total_page_no: expect(page.get_by_role("button", name="Next")).to_be_visible( - timeout=self.default_time_out + timeout=self.config.default_timeout ) page.wait_for_timeout(timeout=3000) page.get_by_role("button", name="Next").click() - logger.info(f"current page number: {page_no} / {total_page_no}") + logger.info(f"当前页码: {page_no} / {total_page_no}") - page.get_by_role("button", name=f"{total_page_no}", exact=True).click() - expect(page.get_by_role("button", name="Next")).to_be_visible( - timeout=self.default_time_out + +class BetDataProcessor: + """投注数据处理服务类""" + + def __init__(self, config: AppConfig): + self.config = config + + def prepare_bet_data_for_upload(self, date_str: str) -> str: + """准备上传的投注数据""" + logger.info(f"准备投注数据,日期: {date_str}") + + # 确保目录存在 + ensure_directory_exists(target_path=self.config.bet_file_path, is_file=True) + + # 从数据库获取数据 + db_service = DatabaseService(self.config) + data_list = db_service.query_orders_by_date(date_str) + + if not data_list: + message = f"日期 {date_str} 没有找到投注数据" + logger.warning(message) + raise ValueError(message) + + # 转换为DataFrame + bet_df = pd.DataFrame([d.to_dict() for d in data_list]) + + # 列名映射 + col_map = { + "sportsbooks": "Sportsbook", + "bet_name": "Bet Name", + "market": "Market Name", + "price": "Odds", + "stake": "Stake", + "event_name": "Event Name", + "sport": "Sport", + "league": "League", + "game_id": "Game ID", + "bet_type": "Bet Type", + "bet_id": "Notes", + "start_timestamp": "Game Start Date", + } + + bet_df = bet_df.rename(columns=col_map) + + # 处理Sportsbook字段 + bet_df["Sportsbook"] = bet_df["Sportsbook"].apply(lambda x: eval(x)[0]) + + # 加载博彩公司名称映射 + with open(self.config.bet_name_map_path, "r") as f: + bet_name_map = json.load(f) + bet_df["Sportsbook"] = bet_df["Sportsbook"].apply( + lambda x: bet_name_map.get(x, x) ) - # page.pause() - browser.close() + # 选择需要的列 + bet_df = bet_df[col_map.values()] -def get_tomorrow_over_bet_order_data(save_file_path: str, ds): - logger.info(f"load bet data from db: {ds}") - data_list = query_by_create_time(ds=ds) - data_list = [d.to_dict() for d in data_list] - bet_df = pd.DataFrame(data_list) - bet_df.to_csv(save_file_path, encoding="utf-8-sig", index=False) - - col_map = { - "sportsbooks": "Sportsbook", - "bet_name": "Bet Name", - "market": "Market Name", - "price": "Odds", - "stake": "Stake", - "event_name": "Event Name", - "sport": "Sport", - "league": "League", - "game_id": "Game ID", - "bet_type": "Bet Type", - "bet_id": "Notes", - "start_timestamp": "Game Start Date", - } - - bet_df = bet_df.rename(columns=col_map) - bet_df["Sportsbook"] = bet_df["Sportsbook"].apply(lambda x: eval(x)[0]) - with open("./data/bet_data/betname_map.json", "r") as f: - bet_name_map = json.load(f) - bet_df["Sportsbook"] = bet_df["Sportsbook"].apply( - lambda x: bet_name_map.get(x, x) + # 格式化时间 + bet_df["Game Start Date"] = bet_df["Game Start Date"].apply( + lambda x: datetime.datetime.fromtimestamp(x // 1000).strftime( + "%Y/%m/%d %H:%M" + ) ) - # bet_df = bet_df.explode("Sportsbook") - bet_df = bet_df[col_map.values()] - bet_df["Game Start Date"] = bet_df["Game Start Date"].apply( - lambda x: datetime.datetime.fromtimestamp(x // 1000).strftime("%Y/%m/%d %H:%M") - ) - logger.info(f"bet order min start date: {bet_df['Game Start Date'].min()}") - logger.info(f"bet order max start date: {bet_df['Game Start Date'].max()}") - logger.info(bet_df.shape) - bet_df.to_csv(save_file_path, encoding="utf-8-sig", index=False) - info_message = f"bet order min start date: {bet_df['Game Start Date'].min()}\n" - info_message += f"bet order max start date: {bet_df['Game Start Date'].max()}\n" - info_message += f"bet order shape: {bet_df.shape}" - return info_message + # 保存到文件 + bet_df.to_csv(self.config.bet_file_path, encoding="utf-8-sig", index=False) + logger.info(f"投注数据最小开始时间: {bet_df['Game Start Date'].min()}") + logger.info(f"投注数据最大开始时间: {bet_df['Game Start Date'].max()}") + logger.info(f"投注数据形状: {bet_df.shape}") -def update_db_order_status(status_file_path: str): - logger.info(status_file_path) - data_list = [] - with open(status_file_path, "r", encoding="utf-8") as f: - for line in f: - data = json.loads(line) - data_list.extend(data["entities"]) - status_df = pd.DataFrame(data_list) - - def get_error_status(row): - if not pd.isna(row["autograder_errors"]): - return f"error: {row['autograder_errors']}" - return row["status"] - - status_df["status"] = status_df.apply(lambda row: get_error_status(row), axis=1) - status_df = status_df[status_df["status"] != "pending"] - status_df = status_df[["status", "notes"]].drop_duplicates() - status_list = status_df.to_dict(orient="records") - for i, data in enumerate(status_list): - bet_id = data["notes"] - bet_status = data["status"] - logger.info(f"{i+1}/{len(status_list)}, status->{bet_status}, bet_id->{bet_id}") - for _ in range(3): - try: - update_bet_status(id=bet_id, bet_status=bet_status) - break - except Exception as e: - logger.error(f"update bet status error: {e}") - - -def pull_data_from_oddsjam_update(ds: str = None): - try: - if ds is None: - ds = datetime.datetime.now().strftime("%Y%m%d") - logger.info(f"current date: {ds}") - ds_date = datetime.datetime.strptime(ds, "%Y%m%d") - pre_date = ds_date - datetime.timedelta(days=0) - pre_ds = pre_date.strftime("%Y%m%d") - - oddsjam_bet_tracker = get_oddsjam_bet_tracker(ds=pre_ds) - # oddsjam_bet_tracker.login_to_site() - oddsjam_bet_tracker.get_all_bet_status(ds) - intercept_response_res_save_path = ( - oddsjam_bet_tracker.intercept_response_res_save_path + return ( + f"投注数据最小开始时间: {bet_df['Game Start Date'].min()}\n" + f"投注数据最大开始时间: {bet_df['Game Start Date'].max()}\n" + f"投注数据形状: {bet_df.shape}" ) - update_db_order_status(status_file_path=intercept_response_res_save_path) - dingtalk.send_text(f"{ds}: 比赛状态更新完成") - except Exception as e: - error_info = traceback.format_exc() - logger.error(error_info) - dingtalk.send_text(error_info) + + def process_status_data(self, status_file_path: str) -> None: + """处理状态数据并更新数据库""" + logger.info(f"处理状态数据: {status_file_path}") + + data_list = [] + with open(status_file_path, "r", encoding="utf-8") as f: + for line in f: + data = json.loads(line) + data_list.extend(data["entities"]) + + status_df = pd.DataFrame(data_list) + + # 处理错误状态 + def get_error_status(row): + if not pd.isna(row["autograder_errors"]): + return f"error: {row['autograder_errors']}" + return row["status"] + + status_df["status"] = status_df.apply(lambda row: get_error_status(row), axis=1) + status_df = status_df[status_df["status"] != "pending"] + status_df = status_df[["status", "notes"]].drop_duplicates() + + # 更新数据库 + db_service = DatabaseService(self.config) + status_list = status_df.to_dict(orient="records") + + for i, data in enumerate(status_list): + bet_id = data["notes"] + bet_status = data["status"] + logger.info( + f"{i+1}/{len(status_list)}, 状态->{bet_status}, 投注ID->{bet_id}" + ) + + for _ in range(3): + try: + db_service.update_bet_status(bet_id=bet_id, bet_status=bet_status) + break + except Exception as e: + logger.error(f"更新投注状态失败: {e}") -def upload_new_bets_data2oddsjam(ds: str = None): - try: - if ds is None: - ds = datetime.datetime.now() - datetime.timedelta(days=1) - ds = ds.strftime("%Y%m%d") - logger.info(f"current date: {ds}") - oddsjam_bet_tracker = get_oddsjam_bet_tracker(ds=ds) +class TaskScheduler: + """定时任务调度器""" - # oddsjam_bet_tracker.login_to_site() - bet_file_path = os.path.join(root_dir, "data", "bet_data", "bet.csv") - ensure_directory_exists(target_path=bet_file_path, is_file=True) - data_info = get_tomorrow_over_bet_order_data( - save_file_path=bet_file_path, ds=ds + def __init__(self, config: AppConfig): + self.config = config + self.oddsjam_service = OddsjamService(config) + self.data_processor = BetDataProcessor(config) + + def upload_bets_task(self, date_str: Optional[str] = None) -> None: + """上传 T-1 时间的投注数据任务""" + config_base_info = str(self.config) + try: + if date_str is None: + date_str = ( + datetime.datetime.now() - datetime.timedelta(days=1) + ).strftime("%Y%m%d") + + logger.info(f"执行上传任务,日期: {date_str}") + + # 准备数据 + data_info = self.data_processor.prepare_bet_data_for_upload(date_str) + + if not data_info: + logger.warning(f"日期 {date_str} 没有数据需要上传") + return + + # 上传到OddsJam + self.oddsjam_service.upload_bets(self.config.bet_file_path) + + # 发送通知 + self.config.dingtalk.send_text( + f"{date_str}:比赛数据上传完成\n{config_base_info}\n{data_info}" + ) + + except Exception as e: + error_info = traceback.format_exc() + logger.error(f"上传任务失败: {error_info}") + self.config.dingtalk.send_text( + f"{date_str}:\n{config_base_info}\n上传比赛失败: {e}\n{error_info}" + ) + + def pull_status_task(self, date_str: Optional[str] = None) -> None: + """拉取状态数据任务""" + config_base_info = str(self.config) + try: + if date_str is None: + date_str = datetime.datetime.now().strftime("%Y%m%d") + + logger.info(f"执行拉取任务,日期: {date_str}") + + # 构建状态文件路径 + status_file_path = os.path.join( + self.config.root_dir, + "data", + "bet_data", + "bet_status", + f"{self.config.email_account}_status_{date_str}.json", + ) + ensure_directory_exists(target_path=status_file_path, is_file=True) + + # 获取状态数据 + self.oddsjam_service.get_all_bet_status(date_str, status_file_path) + + # 处理状态数据 + self.data_processor.process_status_data(status_file_path) + + # 发送通知 + self.config.dingtalk.send_text( + f"{date_str}: 比赛状态更新完成\n{config_base_info}" + ) + + except Exception as e: + error_info = traceback.format_exc() + logger.error(f"拉取任务失败: {error_info}") + self.config.dingtalk.send_text(f"{config_base_info}\n{error_info}") + + +class BetTrackerApplication: + """主应用类""" + + def __init__(self): + self.config = AppConfig() + self.scheduler = TaskScheduler(self.config) + + def run_scheduled_tasks(self) -> None: + """运行定时任务""" + import schedule + + logger.info(f"上传定时任务: {self.config.upload_schedule}") + logger.info(f"拉取定时任务: {self.config.pull_schedule}") + + # 设置定时任务 + schedule.every().day.at(self.config.pull_schedule).do( + self.scheduler.pull_status_task + ) + schedule.every().day.at(self.config.upload_schedule).do( + self.scheduler.upload_bets_task ) - oddsjam_bet_tracker.upload_new_bets(bet_file_path=bet_file_path) - dingtalk.send_text(f"{ds}: \n {data_info}") - except Exception as e: - error_info = traceback.format_exc() - dingtalk.send_text(f"{ds}: 上传比赛失败: {e}\n{error_info}") + # 运行定时任务 + while True: + schedule.run_pending() + time.sleep(0.05) -def get_oddsjam_bet_tracker(ds: str) -> SyncOddsjamBetTracker: - email_account = "aszer27937@gmail.com" - login_state_save_path = os.path.join( - root_dir, "data", "bet_data", "account_login_state", f"{email_account}.json" - ) - ensure_directory_exists(target_path=login_state_save_path, is_file=True) - intercept_response_res_save_path = os.path.join( - root_dir, - "data", - "bet_data", - "bet_status", - f"{email_account}_status_{ds}.json", - ) - ensure_directory_exists(target_path=intercept_response_res_save_path, is_file=True) - oddsjam_bet_tracker = SyncOddsjamBetTracker( - login_state_path=login_state_save_path, - intercept_response_res_save_path=intercept_response_res_save_path, - headless=True - ) - return oddsjam_bet_tracker + def run_manual_task(self, task_type: str, date_str: Optional[str] = None) -> None: + """运行手动任务""" + if task_type == "upload": + self.scheduler.upload_bets_task(date_str) + elif task_type == "pull": + self.scheduler.pull_status_task(date_str) + else: + logger.error(f"未知的任务类型: {task_type}") def main(): - ds = datetime.datetime.now() - datetime.timedelta(days=1) - ds = ds.strftime("%Y%m%d") - logger.info(f"current date: {ds}") + """主入口函数""" + app = BetTrackerApplication() - pull_data_from_oddsjam_update(ds=ds) - # upload_new_bets_data2oddsjam(ds=ds) + # 检查命令行参数 + if len(sys.argv) > 1: + task_type = sys.argv[1] + date_str = sys.argv[2] if len(sys.argv) > 2 else None + app.run_manual_task(task_type, date_str) + else: + # 运行定时任务 + app.run_scheduled_tasks() if __name__ == "__main__": - import schedule + main() - # schedule.every().day.at("07:00").do(clear_order_from_oddsjam) - UPLOAD_SCHEDULE = os.environ.get("UPLOAD_SCHEDULE", "10:00") - PULL_SCHEDULE = os.environ.get("PULL_SCHEDULE", "13:00") - logger.info(f"upload schedule: {UPLOAD_SCHEDULE}") - logger.info(f"pull schedule: {PULL_SCHEDULE}") - - - schedule.every().day.at(PULL_SCHEDULE).do(pull_data_from_oddsjam_update) - schedule.every().day.at(UPLOAD_SCHEDULE).do(upload_new_bets_data2oddsjam) - - while True: - schedule.run_pending() - time.sleep(0.05) - # main() + # app = BetTrackerApplication() + # logger.info(app.config) + # app.run_manual_task(task_type="upload", date_str="20251026") diff --git a/OddsjamBetTrackerRefactored.py b/OddsjamBetTrackerRefactored.py deleted file mode 100644 index c14aa2b..0000000 --- a/OddsjamBetTrackerRefactored.py +++ /dev/null @@ -1,560 +0,0 @@ -""" -OddsJam Bet Tracker - 重构版本 -按照Python最佳实践重构,支持环境变量配置,模块化设计 - -核心功能: -1. 定时从数据库拉取数据上传到OddsJam -2. 定时从OddsJam获取结果数据更新到数据库 -""" - -from email import message -import os -import sys -import re -import math -import time -import json -import datetime -import traceback -from typing import Optional, List, Dict, Tuple -from dataclasses import dataclass -from abc import ABC, abstractmethod - -import pandas as pd -from retry import retry -from playwright.sync_api import sync_playwright, Page, BrowserContext, expect -from loguru import logger - -# 导入项目模块 -from dao.Database import Database -from data_model import MysqlConfig, OddsjamOrderStatus -from common.dingtalk import DingTalkBot -from common.utils import ensure_directory_exists - - -@dataclass -class AppConfig: - """应用配置类 - 支持环境变量配置""" - - # 邮箱配置 - email_account: str = os.getenv("ODDSJAM_EMAIL", "ojbbbb21@proton.me") - - # 代理配置 - http_proxy: str = os.getenv("HTTP_PROXY", "http://127.0.0.1:7890") - - # 数据库表名 - table_name: str = os.getenv("BET_TABLE_NAME", "oddsjam_order") - - # 定时任务配置 - upload_schedule: str = os.getenv("UPLOAD_SCHEDULE", "09:48") - pull_schedule: str = os.getenv("PULL_SCHEDULE", "13:00") - - # 文件路径配置 - root_dir: str = os.path.dirname(os.path.abspath(__file__)) - config_file_path: str = os.getenv("MYSQL_CONFIG_PATH", "./config/mysql_config.json") - - # 钉钉配置 - dingtalk_webhook: str = os.getenv( - "DINGTALK_WEBHOOK", - "https://oapi.dingtalk.com/robot/send?access_token=21de667159edadd33172c6ec414a2addf9c6359189350ffd36819d2a20e8a0f4", - ) - dingtalk_secret: str = os.getenv( - "DINGTALK_SECRET", - "SEC43a0fa0b29717f98637a119b92a0bd5f7b2b6da671bdd2bd1279ed8323454d5e", - ) - - # 浏览器配置 - headless: bool = os.getenv("HEADLESS", "true").lower() == "true" - default_timeout: float = float(os.getenv("DEFAULT_TIMEOUT", "6000000")) - - def __str__(self) -> str: - # 返回邮箱和数据表名的信息 - return f"AppConfig(email_account={self.email_account}, table_name={self.table_name})" - - def __post_init__(self): - """初始化后处理""" - self.mysql_config = MysqlConfig.parse_file(self.config_file_path) - self.dingtalk = DingTalkBot(self.dingtalk_webhook, self.dingtalk_secret) - - # 构建文件路径 - self.login_state_path = os.path.join( - self.root_dir, - "data", - "bet_data", - "account_login_state", - f"{self.email_account}.json", - ) - self.bet_file_path = os.path.join(self.root_dir, "data", "bet_data", "bet.csv") - self.bet_name_map_path = os.path.join( - self.root_dir, "data", "bet_data", "betname_map.json" - ) - - -class DatabaseService: - """数据库服务类 - 负责所有数据库操作""" - - def __init__(self, config: AppConfig): - self.config = config - self.dao = Database(config.mysql_config) - - def query_orders_by_date(self, date_str: str) -> List[OddsjamOrderStatus]: - """根据日期查询订单数据""" - sql = f"select * from {self.config.table_name} where DATE(create_time) = '{date_str}'" - rows = self.dao.fetchall(query=sql) - return [OddsjamOrderStatus(row) for row in rows] if rows else [] - - @retry(tries=6) - def update_bet_status(self, bet_id: str, bet_status: str) -> None: - """更新投注状态""" - sql = f"update {self.config.table_name} set bet_status = %s where id = %s" - self.dao.execute(query=sql, args=(bet_status, bet_id)) - - -class OddsjamService: - """OddsJam业务服务类 - 负责与OddsJam网站的交互""" - - def __init__(self, config: AppConfig): - self.config = config - self.total_bet_cnt = 0 - - def _get_browser_context(self, playwright) -> Tuple[Page, BrowserContext]: - """获取浏览器上下文""" - logger.info(f"加载cookies: {self.config.login_state_path}") - - browser = playwright.chromium.launch( - args=["--start-maximized"], - headless=self.config.headless, - proxy=( - {"server": self.config.http_proxy} if self.config.http_proxy else None - ), - ) - - context = browser.new_context( - storage_state=self.config.login_state_path, no_viewport=True - ) - page = context.new_page() - return page, browser - - def login_to_site(self) -> None: - """登录到OddsJam网站""" - logger.info(f"登录账户: {self.config.email_account}") - - with sync_playwright() as p: - browser = p.chromium.launch(headless=self.config.headless) - page = browser.new_page() - page.set_default_timeout(timeout=self.config.default_timeout) - - url = "https://oddsjam.com/bet-tracker" - page.goto(url) - - page.get_by_role("link", name="Login").click() - expect(page.get_by_role("button", name="Sign in")).to_be_visible( - timeout=self.config.default_timeout - ) - - page.pause() - time.sleep(10) - - try: - browser.contexts[0].storage_state(path=self.config.login_state_path) - logger.info(f"登录成功: {page.url}") - except Exception as ex: - logger.error(f"保存登录状态失败: {ex}") - traceback.print_exc() - page.screenshot(path="error.png") - - browser.close() - - @retry(tries=6) - def upload_bets(self, bet_file_path: str) -> None: - """上传投注数据到OddsJam""" - url = "https://oddsjam.com/bet-tracker" - - with sync_playwright() as p: - page, browser = self._get_browser_context(p) - page.set_default_timeout(timeout=self.config.default_timeout) - - def on_response(response): - if "dromo-user-imports-production" in response.url: - logger.info(f"上传响应状态: {response.status}") - if response.status == 400: - browser.close() - raise Exception(f"上传失败: {response.request.failure}") - elif "oddsjam.com/api/backend/bets/import" in response.url: - logger.info(f"导入响应状态: {response.status}") - - page.on("response", on_response) - page.goto(url) - - # 执行上传流程 - self._execute_upload_flow(page, bet_file_path) - - browser.close() - - def _execute_upload_flow(self, page: Page, bet_file_path: str) -> None: - """执行上传流程""" - page.get_by_role("button", name="Import Bets").nth(1).click() - time.sleep(5) - - iframe_locator = page.frame_locator('iframe[title="Dromo Importer\\: Bets"]') - - # 选择文件 - with page.expect_file_chooser() as fc_info: - iframe_locator.get_by_role( - "button", name="Choose a file", exact=True - ).click() - time.sleep(5) - - file_chooser = fc_info.value - file_chooser.set_files(bet_file_path) - - # 确认选择 - iframe_locator.get_by_role( - "button", name="Confirm selection and continue" - ).click() - time.sleep(5) - - # 确认匹配 - iframe_locator.get_by_role( - "button", name="Confirm matching and continue" - ).click() - time.sleep(5) - - # 继续 - iframe_locator.get_by_role("button", name="Continue").click() - - # 完成 - iframe_locator.get_by_role("button", name="Finish").click() - - # 处理可能的错误 - try: - iframe_locator.get_by_role("button", name="Submit anyway").click( - timeout=5000 - ) - except Exception: - pass - - iframe_locator.get_by_role("button", name="Yes").click() - - # 等待处理完成 - expect(iframe_locator.get_by_text("Processing...")).to_be_hidden( - timeout=self.config.default_timeout - ) - - def get_all_bet_status(self, date_str: str, status_save_path: str) -> None: - """获取所有投注状态""" - logger.info(f"获取投注状态,日期: {date_str}") - - url = "https://oddsjam.com/bet-tracker" - - with sync_playwright() as p: - page, browser = self._get_browser_context(p) - - def on_response(response): - if "oddsjam.com/api/backend/bets-and-parlays-V2/find" in response.url: - try: - if response.request.failure is None and response.status == 200: - data = response.json() - self.total_bet_cnt = data["totalCount"] - logger.info(f"总投注数量: {self.total_bet_cnt}") - - # 保存响应数据 - with open(status_save_path, "a", encoding="utf-8") as f: - json.dump(data, f, ensure_ascii=False) - f.write("\n") - except Exception as e: - logger.error(f"处理响应数据失败: {e}") - - page.on("response", on_response) - page.set_default_timeout(timeout=self.config.default_timeout) - page.goto(url=url) - - # 处理弹窗 - try: - page.locator("#cello-widget-app").get_by_role("button").click( - timeout=6000 - ) - except Exception: - pass - - # 获取总页数并翻页 - self._navigate_all_pages(page) - - browser.close() - - def _navigate_all_pages(self, page: Page) -> None: - """导航所有页面""" - inner_text = page.get_by_text( - re.compile("Showing 1 to 50 of", re.IGNORECASE) - ).inner_text() - - match = re.search(r"Showing 1 to 50 of (\d+) results", inner_text) - if not match: - raise ValueError("无法获取总结果数") - - total_bet_cnt = int(match[1]) - total_page_no = math.ceil(total_bet_cnt / 50) - logger.info(f"总共 {total_bet_cnt} 条结果,{total_page_no} 页") - - # 翻页到最后一页 - for page_no in range(2, total_page_no): - if page_no <= total_page_no: - expect(page.get_by_role("button", name="Next")).to_be_visible( - timeout=self.config.default_timeout - ) - page.wait_for_timeout(timeout=3000) - page.get_by_role("button", name="Next").click() - logger.info(f"当前页码: {page_no} / {total_page_no}") - - -class BetDataProcessor: - """投注数据处理服务类""" - - def __init__(self, config: AppConfig): - self.config = config - - def prepare_bet_data_for_upload(self, date_str: str) -> str: - """准备上传的投注数据""" - logger.info(f"准备投注数据,日期: {date_str}") - - # 确保目录存在 - ensure_directory_exists(target_path=self.config.bet_file_path, is_file=True) - - # 从数据库获取数据 - db_service = DatabaseService(self.config) - data_list = db_service.query_orders_by_date(date_str) - - if not data_list: - message = f"日期 {date_str} 没有找到投注数据" - logger.warning(message) - raise ValueError(message) - - # 转换为DataFrame - bet_df = pd.DataFrame([d.to_dict() for d in data_list]) - - # 列名映射 - col_map = { - "sportsbooks": "Sportsbook", - "bet_name": "Bet Name", - "market": "Market Name", - "price": "Odds", - "stake": "Stake", - "event_name": "Event Name", - "sport": "Sport", - "league": "League", - "game_id": "Game ID", - "bet_type": "Bet Type", - "bet_id": "Notes", - "start_timestamp": "Game Start Date", - } - - bet_df = bet_df.rename(columns=col_map) - - # 处理Sportsbook字段 - bet_df["Sportsbook"] = bet_df["Sportsbook"].apply(lambda x: eval(x)[0]) - - # 加载博彩公司名称映射 - with open(self.config.bet_name_map_path, "r") as f: - bet_name_map = json.load(f) - bet_df["Sportsbook"] = bet_df["Sportsbook"].apply( - lambda x: bet_name_map.get(x, x) - ) - - # 选择需要的列 - bet_df = bet_df[col_map.values()] - - # 格式化时间 - bet_df["Game Start Date"] = bet_df["Game Start Date"].apply( - lambda x: datetime.datetime.fromtimestamp(x // 1000).strftime( - "%Y/%m/%d %H:%M" - ) - ) - - # 保存到文件 - bet_df.to_csv(self.config.bet_file_path, encoding="utf-8-sig", index=False) - - logger.info(f"投注数据最小开始时间: {bet_df['Game Start Date'].min()}") - logger.info(f"投注数据最大开始时间: {bet_df['Game Start Date'].max()}") - logger.info(f"投注数据形状: {bet_df.shape}") - - return ( - f"投注数据最小开始时间: {bet_df['Game Start Date'].min()}\n" - f"投注数据最大开始时间: {bet_df['Game Start Date'].max()}\n" - f"投注数据形状: {bet_df.shape}" - ) - - def process_status_data(self, status_file_path: str) -> None: - """处理状态数据并更新数据库""" - logger.info(f"处理状态数据: {status_file_path}") - - data_list = [] - with open(status_file_path, "r", encoding="utf-8") as f: - for line in f: - data = json.loads(line) - data_list.extend(data["entities"]) - - status_df = pd.DataFrame(data_list) - - # 处理错误状态 - def get_error_status(row): - if not pd.isna(row["autograder_errors"]): - return f"error: {row['autograder_errors']}" - return row["status"] - - status_df["status"] = status_df.apply(lambda row: get_error_status(row), axis=1) - status_df = status_df[status_df["status"] != "pending"] - status_df = status_df[["status", "notes"]].drop_duplicates() - - # 更新数据库 - db_service = DatabaseService(self.config) - status_list = status_df.to_dict(orient="records") - - for i, data in enumerate(status_list): - bet_id = data["notes"] - bet_status = data["status"] - logger.info( - f"{i+1}/{len(status_list)}, 状态->{bet_status}, 投注ID->{bet_id}" - ) - - for _ in range(3): - try: - db_service.update_bet_status(bet_id=bet_id, bet_status=bet_status) - break - except Exception as e: - logger.error(f"更新投注状态失败: {e}") - - -class TaskScheduler: - """定时任务调度器""" - - def __init__(self, config: AppConfig): - self.config = config - self.oddsjam_service = OddsjamService(config) - self.data_processor = BetDataProcessor(config) - - def upload_bets_task(self, date_str: Optional[str] = None) -> None: - """上传 T-1 时间的投注数据任务""" - config_base_info = str(self.config) - try: - if date_str is None: - date_str = ( - datetime.datetime.now() - datetime.timedelta(days=1) - ).strftime("%Y%m%d") - - logger.info(f"执行上传任务,日期: {date_str}") - - # 准备数据 - data_info = self.data_processor.prepare_bet_data_for_upload(date_str) - - if not data_info: - logger.warning(f"日期 {date_str} 没有数据需要上传") - return - - # 上传到OddsJam - self.oddsjam_service.upload_bets(self.config.bet_file_path) - - # 发送通知 - self.config.dingtalk.send_text( - f"{date_str}:比赛数据上传完成\n{config_base_info}\n{data_info}" - ) - - except Exception as e: - error_info = traceback.format_exc() - logger.error(f"上传任务失败: {error_info}") - self.config.dingtalk.send_text( - f"{date_str}:\n{config_base_info}\n上传比赛失败: {e}\n{error_info}" - ) - - def pull_status_task(self, date_str: Optional[str] = None) -> None: - """拉取状态数据任务""" - config_base_info = str(self.config) - try: - if date_str is None: - date_str = datetime.datetime.now().strftime("%Y%m%d") - - logger.info(f"执行拉取任务,日期: {date_str}") - - # 构建状态文件路径 - status_file_path = os.path.join( - self.config.root_dir, - "data", - "bet_data", - "bet_status", - f"{self.config.email_account}_status_{date_str}.json", - ) - ensure_directory_exists(target_path=status_file_path, is_file=True) - - # 获取状态数据 - self.oddsjam_service.get_all_bet_status(date_str, status_file_path) - - # 处理状态数据 - self.data_processor.process_status_data(status_file_path) - - # 发送通知 - self.config.dingtalk.send_text( - f"{date_str}: 比赛状态更新完成\n{config_base_info}" - ) - - except Exception as e: - error_info = traceback.format_exc() - logger.error(f"拉取任务失败: {error_info}") - self.config.dingtalk.send_text(f"{config_base_info}\n{error_info}") - - -class BetTrackerApplication: - """主应用类""" - - def __init__(self): - self.config = AppConfig() - self.scheduler = TaskScheduler(self.config) - - def run_scheduled_tasks(self) -> None: - """运行定时任务""" - import schedule - - logger.info(f"上传定时任务: {self.config.upload_schedule}") - logger.info(f"拉取定时任务: {self.config.pull_schedule}") - - # 设置定时任务 - schedule.every().day.at(self.config.pull_schedule).do( - self.scheduler.pull_status_task - ) - schedule.every().day.at(self.config.upload_schedule).do( - self.scheduler.upload_bets_task - ) - - # 运行定时任务 - while True: - schedule.run_pending() - time.sleep(0.05) - - def run_manual_task(self, task_type: str, date_str: Optional[str] = None) -> None: - """运行手动任务""" - if task_type == "upload": - self.scheduler.upload_bets_task(date_str) - elif task_type == "pull": - self.scheduler.pull_status_task(date_str) - else: - logger.error(f"未知的任务类型: {task_type}") - - -def main(): - """主入口函数""" - app = BetTrackerApplication() - - # 检查命令行参数 - if len(sys.argv) > 1: - task_type = sys.argv[1] - date_str = sys.argv[2] if len(sys.argv) > 2 else None - app.run_manual_task(task_type, date_str) - else: - # 运行定时任务 - app.run_scheduled_tasks() - - -if __name__ == "__main__": - main() - - # app = BetTrackerApplication() - # logger.info(app.config) - # app.run_manual_task(task_type="upload", date_str="20251026")