From 54e9d057789f875341f7f15961b6fe9838680ca6 Mon Sep 17 00:00:00 2001 From: aszerW Date: Sat, 25 Oct 2025 13:24:03 +0800 Subject: [PATCH] =?UTF-8?q?=E6=AF=94=E8=B5=9B=E7=8A=B6=E6=80=81=E7=88=AC?= =?UTF-8?q?=E8=99=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- OddsjamBetTracker.py | 418 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 418 insertions(+) create mode 100644 OddsjamBetTracker.py diff --git a/OddsjamBetTracker.py b/OddsjamBetTracker.py new file mode 100644 index 0000000..1403279 --- /dev/null +++ b/OddsjamBetTracker.py @@ -0,0 +1,418 @@ +import http +import sys +import os +import re +import math +import time +import json +import datetime +import traceback +import pandas as pd +from retry import retry +from common.utils import ensure_directory_exists + +from dao.Database import Database +from playwright.sync_api import sync_playwright, Page, BrowserContext, expect +from loguru import logger +from data_model import MysqlConfig, OddsjamOrderStatus +from dingtalk import DingTalkBot + +webhook = "https://oapi.dingtalk.com/robot/send?access_token=21de667159edadd33172c6ec414a2addf9c6359189350ffd36819d2a20e8a0f4" +secret = "SEC43a0fa0b29717f98637a119b92a0bd5f7b2b6da671bdd2bd1279ed8323454d5e" + +dingtalk = DingTalkBot(webhook, secret) + + +root_dir = os.path.dirname(os.path.abspath(__file__)) + +config_file_path = "./config/mysql_config.json" +mysql_config = MysqlConfig.parse_file(config_file_path) +dao = Database(mysql_config) + + +def query_by_create_time(ds: str): + sql = f"select * from oddsjam_order_all where DATE(create_time) = '{ds}'" + rows = dao.fetchall(query=sql) + if rows: + return [OddsjamOrderStatus(row) for row in rows] + else: + return [] + +@retry(tries=6) +def update_bet_status(id: str, bet_status: str): + sql = "update oddsjam_order_all set bet_status = %s where id = %s" + dao.execute(query=sql, args=(bet_status, id)) + + +class SyncOddsjamBetTracker: + def __init__( + self, + login_state_path: str, + intercept_response_res_save_path: str, + headless: bool = False, + default_time_out: float = 6000000, + ): + self.login_state_path = login_state_path + self.headless = headless + self.default_time_out = default_time_out + self.intercept_response_res_save_path = intercept_response_res_save_path + self.total_bet_cnt = 0 + + def login_oddsjam_cookies(self, p, headless=False) -> tuple: + # 获取HTTP_PROXY环境变量,默认为None + import os + + http_proxy = os.environ.get("HTTP_PROXY", None) + if http_proxy is None: + http_proxy = "http://127.0.0.1:7890" + logger.info("加载cookies {}", self.login_state_path) + browser = p.chromium.launch( + args=["--start-maximized"], + headless=headless, + proxy={ + "server": http_proxy, + }, + ) + context = browser.new_context( + storage_state=self.login_state_path, no_viewport=True + ) + page = context.new_page() + return page, browser + + def login_to_site(self): + logger.info(f"login account: {self.email_account}") + with sync_playwright() as p: + browser = p.chromium.launch(headless=self.headless) + page = browser.new_page() + page.set_default_timeout(timeout=self.default_time_out) + url = "https://oddsjam.com/bet-tracker" + page.goto(url) + + # page.get_by_label("Close Modal").click() + + page.get_by_role("link", name="Login").click() + expect(page.get_by_role("button", name="Sign in")).to_be_visible( + timeout=self.default_time_out + ) + page.pause() + time.sleep(10) + try: + browser.contexts[0].storage_state(path=self.login_state_path) + except Exception as ex: + traceback.print_exc() + page.screenshot(path="error.png") + logger.info(page.url, "login_success") + browser.close() + + @retry(tries=6) + def intercept_import_response(self, route, request): + if "oddsjam.com/api/backend/bets/import" in request.url: + response = route.fetch(timeout=50000) + logger.info(response.status) + if response.status == 500: + route.fulfill(response=response, json={}) + return + route.continue_() + + @retry(tries=6) + def upload_new_bets(self, bet_file_path: str): + url = "https://oddsjam.com/bet-tracker" + with sync_playwright() as p: + page: Page + browser: BrowserContext + page, browser = self.login_oddsjam_cookies(p, headless=self.headless) + page.set_default_timeout(timeout=self.default_time_out) + + def on_response(response): + if "dromo-user-imports-production" in response.url: + print(response.status, response.request.failure) + if response.status == 400: + browser.close() + raise Exception(response.request.failure) + elif "oddsjam.com/api/backend/bets/import" in response.url: + logger.info(response.status) + + page.on("response", on_response) + page.goto(url) + # page.pause() + page.get_by_role("button", name="Import Bets").nth(1).click() + time.sleep(5) + iframe_locator = page.frame_locator( + 'iframe[title="Dromo Importer\\: Bets"]' + ) + with page.expect_file_chooser() as fc_info: + iframe_locator.get_by_role( + "button", name="Choose a file", exact=True + ).click() + time.sleep(5) + file_chooser = fc_info.value + file_chooser.set_files(bet_file_path) + + iframe_locator.get_by_role( + "button", name="Confirm selection and continue" + ).click() + time.sleep(5) + iframe_locator.get_by_role( + "button", name="Confirm matching and continue" + ).click() + time.sleep(5) + iframe_locator.get_by_role("button", name="Continue").click() + iframe_locator.get_by_role("button", name="Finish").click() + try: + iframe_locator.get_by_role("button", name="Submit anyway").click( + timeout=5000 + ) + except Exception as ex: + ... + # print(ex) + iframe_locator.get_by_role("button", name="Yes").click() + expect(iframe_locator.get_by_text("Processing...")).to_be_hidden( + timeout=self.default_time_out + ) + # os.remove(bet_file_path) + # page.pause() + browser.close() + + def get_all_bet_status(self, ds: str = None): + if not ds: + ds = datetime.datetime.now().strftime("%Y%m%d") + day = datetime.datetime.strptime(ds, "%Y%m%d").day + logger.info(f"current date: {ds}") + logger.info(f"current day: {day}") + url = "https://oddsjam.com/bet-tracker" + with sync_playwright() as p: + page: Page + browser: BrowserContext + page, browser = self.login_oddsjam_cookies(p, headless=self.headless) + + def on_response(response): + if "oddsjam.com/api/backend/bets-and-parlays-V2/find" in response.url: + try: + # 确保响应已完成 + if response.request.failure is None and response.status == 200: + data = response.json() # 注意:sync_api 中 .json() 是同步的 + self.total_bet_cnt = data["totalCount"] + logger.info(f"total bet count: {self.total_bet_cnt}") + + # 保存数据 + with open( + self.intercept_response_res_save_path, + "a", + encoding="utf-8", + ) as f: + json.dump(data, f, ensure_ascii=False) + f.write("\n") + except Exception as e: + logger.error(f"Error processing response: {e}") + + # page.route("**", self.intercept_response) + page.on("response", on_response) + page.set_default_timeout(timeout=self.default_time_out) + page.goto(url=url) + try: + page.locator("#cello-widget-app").get_by_role("button").click( + timeout=6000 + ) + except: + ... + + # page.locator(".mt-4 > div > .inline-flex").first.click() + # time.sleep(5) + # page.get_by_label("Clear").click() + # time.sleep(5) + # page.get_by_role("button", name="Date Range").click() + # time.sleep(5) + # page.get_by_text("Custom", exact=True).click() + # time.sleep(5) + # page.get_by_role("button", name=f"{day}").click() + # time.sleep(5) + page.get_by_role( + "button", name=re.compile("Show (\d+) Results", re.IGNORECASE) + ).click() + + inner_text = page.get_by_text( + re.compile("Showing 1 to 50 of", re.IGNORECASE) + ).inner_text() + match = re.search(r"Showing 1 to 50 of (\d+) results", inner_text) + total_bet_cnt = int(match[1]) + total_page_no = math.ceil(total_bet_cnt / 50) + logger.info(f"total {total_bet_cnt} results, {total_page_no} pages") + page.pause() + for page_no in range(2, total_page_no): + expect(page.get_by_role("button", name="Next")).to_be_visible( + timeout=self.default_time_out + ) + page.wait_for_timeout(timeout=3000) + page.get_by_role("button", name="Next").click() + logger.info(f"current page number: {page_no} / {total_page_no}") + + page.get_by_role("button", name=f"{total_page_no}", exact=True).click() + expect(page.get_by_role("button", name="Next")).to_be_visible( + timeout=self.default_time_out + ) + # page.pause() + browser.close() + + +def get_tomorrow_over_bet_order_data(save_file_path: str, ds): + logger.info(f"load bet data from db: {ds}") + data_list = query_by_create_time(ds=ds) + data_list = [d.to_dict() for d in data_list] + bet_df = pd.DataFrame(data_list) + bet_df.to_csv(save_file_path, encoding="utf-8-sig", index=False) + + col_map = { + "sportsbooks": "Sportsbook", + "bet_name": "Bet Name", + "market": "Market Name", + "price": "Odds", + "stake": "Stake", + "event_name": "Event Name", + "sport": "Sport", + "league": "League", + "game_id": "Game ID", + "bet_type": "Bet Type", + "bet_id": "Notes", + "start_timestamp": "Game Start Date", + } + + bet_df = bet_df.rename(columns=col_map) + bet_df["Sportsbook"] = bet_df["Sportsbook"].apply(lambda x: eval(x)[0]) + with open("./data/bet_data/betname_map.json", "r") as f: + bet_name_map = json.load(f) + bet_df["Sportsbook"] = bet_df["Sportsbook"].apply( + lambda x: bet_name_map.get(x, x) + ) + # bet_df = bet_df.explode("Sportsbook") + bet_df = bet_df[col_map.values()] + bet_df["Game Start Date"] = bet_df["Game Start Date"].apply( + lambda x: datetime.datetime.fromtimestamp(x // 1000).strftime("%Y/%m/%d %H:%M") + ) + logger.info(f"bet order min start date: {bet_df['Game Start Date'].min()}") + logger.info(f"bet order max start date: {bet_df['Game Start Date'].max()}") + + logger.info(bet_df.shape) + bet_df.to_csv(save_file_path, encoding="utf-8-sig", index=False) + info_message = f"bet order min start date: {bet_df['Game Start Date'].min()}\n" + info_message += f"bet order max start date: {bet_df['Game Start Date'].max()}\n" + info_message += f"bet order shape: {bet_df.shape}" + return info_message + + +def update_db_order_status(status_file_path: str): + logger.info(status_file_path) + data_list = [] + with open(status_file_path, "r", encoding="utf-8") as f: + for line in f: + data = json.loads(line) + data_list.extend(data["entities"]) + status_df = pd.DataFrame(data_list) + + def get_error_status(row): + if not pd.isna(row["autograder_errors"]): + return f"error: {row['autograder_errors']}" + return row["status"] + + status_df["status"] = status_df.apply(lambda row: get_error_status(row), axis=1) + status_df = status_df[status_df["status"] != "pending"] + status_df = status_df[["status", "notes"]].drop_duplicates() + status_list = status_df.to_dict(orient="records") + for i, data in enumerate(status_list): + bet_id = data["notes"] + bet_status = data["status"] + logger.info(f"{i+1}/{len(status_list)}, status->{bet_status}, bet_id->{bet_id}") + for _ in range(3): + try: + update_bet_status(id=bet_id, bet_status=bet_status) + break + except Exception as e: + logger.error(f"update bet status error: {e}") + + +def pull_data_from_oddsjam_update(ds: str = None): + try: + if ds is None: + ds = datetime.datetime.now().strftime("%Y%m%d") + logger.info(f"current date: {ds}") + ds_date = datetime.datetime.strptime(ds, "%Y%m%d") + pre_date = ds_date - datetime.timedelta(days=0) + pre_ds = pre_date.strftime("%Y%m%d") + + oddsjam_bet_tracker = get_oddsjam_bet_tracker(ds=pre_ds) + # oddsjam_bet_tracker.login_to_site() + oddsjam_bet_tracker.get_all_bet_status(ds) + intercept_response_res_save_path = ( + oddsjam_bet_tracker.intercept_response_res_save_path + ) + update_db_order_status(status_file_path=intercept_response_res_save_path) + dingtalk.send_text(f"{ds}: 比赛状态更新完成") + except Exception as e: + error_info = traceback.print_exc() + logger.error(error_info) + dingtalk.send_text(error_info) + + +def upload_new_bets_data2oddsjam(ds: str = None): + try: + if ds is None: + ds = datetime.datetime.now() - datetime.timedelta(days=1) + ds = ds.strftime("%Y%m%d") + logger.info(f"current date: {ds}") + oddsjam_bet_tracker = get_oddsjam_bet_tracker(ds=ds) + + # oddsjam_bet_tracker.login_to_site() + bet_file_path = os.path.join(root_dir, "data", "bet_data", "bet.csv") + ensure_directory_exists(target_path=bet_file_path, is_file=True) + data_info = get_tomorrow_over_bet_order_data( + save_file_path=bet_file_path, ds=ds + ) + oddsjam_bet_tracker.upload_new_bets(bet_file_path=bet_file_path) + dingtalk.send_text(f"{ds}: \n {data_info}") + except Exception as e: + error_info = traceback.format_exc() + dingtalk.send_text(f"{ds}: 上传比赛失败: {e}\n{error_info}") + + +def get_oddsjam_bet_tracker(ds: str) -> SyncOddsjamBetTracker: + email_account = "aszer27937@gmail.com" + login_state_save_path = os.path.join( + root_dir, "data", "bet_data", "account_login_state", f"{email_account}.json" + ) + ensure_directory_exists(target_path=login_state_save_path, is_file=True) + intercept_response_res_save_path = os.path.join( + root_dir, + "data", + "bet_data", + "bet_status", + f"{email_account}_status_{ds}.json", + ) + ensure_directory_exists(target_path=intercept_response_res_save_path, is_file=True) + oddsjam_bet_tracker = SyncOddsjamBetTracker( + login_state_path=login_state_save_path, + intercept_response_res_save_path=intercept_response_res_save_path, + # headless=True + ) + return oddsjam_bet_tracker + + +def main(): + ds = datetime.datetime.now() - datetime.timedelta(days=1) + ds = ds.strftime("%Y%m%d") + logger.info(f"current date: {ds}") + + pull_data_from_oddsjam_update(ds=ds) + # upload_new_bets_data2oddsjam(ds=ds) + + +if __name__ == "__main__": + import schedule + + # schedule.every().day.at("07:00").do(clear_order_from_oddsjam) + + schedule.every().day.at("11:20").do(pull_data_from_oddsjam_update) + schedule.every().day.at("14:00").do(upload_new_bets_data2oddsjam) + + while True: + schedule.run_pending() + time.sleep(0.05) + # main()