import http import sys import os import re import math import time import json import datetime import traceback import pandas as pd from retry import retry from common.utils import ensure_directory_exists from dao.Database import Database from playwright.sync_api import sync_playwright, Page, BrowserContext, expect from loguru import logger from data_model import MysqlConfig, OddsjamOrderStatus from dingtalk import DingTalkBot webhook = "https://oapi.dingtalk.com/robot/send?access_token=21de667159edadd33172c6ec414a2addf9c6359189350ffd36819d2a20e8a0f4" secret = "SEC43a0fa0b29717f98637a119b92a0bd5f7b2b6da671bdd2bd1279ed8323454d5e" dingtalk = DingTalkBot(webhook, secret) root_dir = os.path.dirname(os.path.abspath(__file__)) config_file_path = "./config/mysql_config.json" mysql_config = MysqlConfig.parse_file(config_file_path) dao = Database(mysql_config) def query_by_create_time(ds: str): sql = f"select * from oddsjam_order_all where DATE(create_time) = '{ds}'" rows = dao.fetchall(query=sql) if rows: return [OddsjamOrderStatus(row) for row in rows] else: return [] @retry(tries=6) def update_bet_status(id: str, bet_status: str): sql = "update oddsjam_order_all set bet_status = %s where id = %s" dao.execute(query=sql, args=(bet_status, id)) class SyncOddsjamBetTracker: def __init__( self, login_state_path: str, intercept_response_res_save_path: str, headless: bool = False, default_time_out: float = 6000000, ): self.login_state_path = login_state_path self.headless = headless self.default_time_out = default_time_out self.intercept_response_res_save_path = intercept_response_res_save_path self.total_bet_cnt = 0 def login_oddsjam_cookies(self, p, headless=False) -> tuple: # 获取HTTP_PROXY环境变量,默认为None import os http_proxy = os.environ.get("HTTP_PROXY", None) if http_proxy is None: http_proxy = "http://127.0.0.1:7890" logger.info("加载cookies {}", self.login_state_path) browser = p.chromium.launch( args=["--start-maximized"], headless=headless, proxy={ "server": http_proxy, }, ) context = browser.new_context( storage_state=self.login_state_path, no_viewport=True ) page = context.new_page() return page, browser def login_to_site(self): logger.info(f"login account: {self.email_account}") with sync_playwright() as p: browser = p.chromium.launch(headless=self.headless) page = browser.new_page() page.set_default_timeout(timeout=self.default_time_out) url = "https://oddsjam.com/bet-tracker" page.goto(url) # page.get_by_label("Close Modal").click() page.get_by_role("link", name="Login").click() expect(page.get_by_role("button", name="Sign in")).to_be_visible( timeout=self.default_time_out ) page.pause() time.sleep(10) try: browser.contexts[0].storage_state(path=self.login_state_path) except Exception as ex: traceback.print_exc() page.screenshot(path="error.png") logger.info(page.url, "login_success") browser.close() @retry(tries=6) def intercept_import_response(self, route, request): if "oddsjam.com/api/backend/bets/import" in request.url: response = route.fetch(timeout=50000) logger.info(response.status) if response.status == 500: route.fulfill(response=response, json={}) return route.continue_() @retry(tries=6) def upload_new_bets(self, bet_file_path: str): url = "https://oddsjam.com/bet-tracker" with sync_playwright() as p: page: Page browser: BrowserContext page, browser = self.login_oddsjam_cookies(p, headless=self.headless) page.set_default_timeout(timeout=self.default_time_out) def on_response(response): if "dromo-user-imports-production" in response.url: print(response.status, response.request.failure) if response.status == 400: browser.close() raise Exception(response.request.failure) elif "oddsjam.com/api/backend/bets/import" in response.url: logger.info(response.status) page.on("response", on_response) page.goto(url) # page.pause() page.get_by_role("button", name="Import Bets").nth(1).click() time.sleep(5) iframe_locator = page.frame_locator( 'iframe[title="Dromo Importer\\: Bets"]' ) with page.expect_file_chooser() as fc_info: iframe_locator.get_by_role( "button", name="Choose a file", exact=True ).click() time.sleep(5) file_chooser = fc_info.value file_chooser.set_files(bet_file_path) iframe_locator.get_by_role( "button", name="Confirm selection and continue" ).click() time.sleep(5) iframe_locator.get_by_role( "button", name="Confirm matching and continue" ).click() time.sleep(5) iframe_locator.get_by_role("button", name="Continue").click() iframe_locator.get_by_role("button", name="Finish").click() try: iframe_locator.get_by_role("button", name="Submit anyway").click( timeout=5000 ) except Exception as ex: ... # print(ex) iframe_locator.get_by_role("button", name="Yes").click() expect(iframe_locator.get_by_text("Processing...")).to_be_hidden( timeout=self.default_time_out ) # os.remove(bet_file_path) # page.pause() browser.close() def get_all_bet_status(self, ds: str = None): if not ds: ds = datetime.datetime.now().strftime("%Y%m%d") day = datetime.datetime.strptime(ds, "%Y%m%d").day logger.info(f"current date: {ds}") logger.info(f"current day: {day}") url = "https://oddsjam.com/bet-tracker" with sync_playwright() as p: page: Page browser: BrowserContext page, browser = self.login_oddsjam_cookies(p, headless=self.headless) def on_response(response): if "oddsjam.com/api/backend/bets-and-parlays-V2/find" in response.url: try: # 确保响应已完成 if response.request.failure is None and response.status == 200: data = response.json() # 注意:sync_api 中 .json() 是同步的 self.total_bet_cnt = data["totalCount"] logger.info(f"total bet count: {self.total_bet_cnt}") # 保存数据 with open( self.intercept_response_res_save_path, "a", encoding="utf-8", ) as f: json.dump(data, f, ensure_ascii=False) f.write("\n") except Exception as e: logger.error(f"Error processing response: {e}") # page.route("**", self.intercept_response) page.on("response", on_response) page.set_default_timeout(timeout=self.default_time_out) page.goto(url=url) try: page.locator("#cello-widget-app").get_by_role("button").click( timeout=6000 ) except: ... # page.locator(".mt-4 > div > .inline-flex").first.click() # time.sleep(5) # page.get_by_label("Clear").click() # time.sleep(5) # page.get_by_role("button", name="Date Range").click() # time.sleep(5) # page.get_by_text("Custom", exact=True).click() # time.sleep(5) # page.get_by_role("button", name=f"{day}").click() # time.sleep(5) page.get_by_role( "button", name=re.compile("Show (\d+) Results", re.IGNORECASE) ).click() inner_text = page.get_by_text( re.compile("Showing 1 to 50 of", re.IGNORECASE) ).inner_text() match = re.search(r"Showing 1 to 50 of (\d+) results", inner_text) total_bet_cnt = int(match[1]) total_page_no = math.ceil(total_bet_cnt / 50) logger.info(f"total {total_bet_cnt} results, {total_page_no} pages") page.pause() for page_no in range(2, total_page_no): expect(page.get_by_role("button", name="Next")).to_be_visible( timeout=self.default_time_out ) page.wait_for_timeout(timeout=3000) page.get_by_role("button", name="Next").click() logger.info(f"current page number: {page_no} / {total_page_no}") page.get_by_role("button", name=f"{total_page_no}", exact=True).click() expect(page.get_by_role("button", name="Next")).to_be_visible( timeout=self.default_time_out ) # page.pause() browser.close() def get_tomorrow_over_bet_order_data(save_file_path: str, ds): logger.info(f"load bet data from db: {ds}") data_list = query_by_create_time(ds=ds) data_list = [d.to_dict() for d in data_list] bet_df = pd.DataFrame(data_list) bet_df.to_csv(save_file_path, encoding="utf-8-sig", index=False) col_map = { "sportsbooks": "Sportsbook", "bet_name": "Bet Name", "market": "Market Name", "price": "Odds", "stake": "Stake", "event_name": "Event Name", "sport": "Sport", "league": "League", "game_id": "Game ID", "bet_type": "Bet Type", "bet_id": "Notes", "start_timestamp": "Game Start Date", } bet_df = bet_df.rename(columns=col_map) bet_df["Sportsbook"] = bet_df["Sportsbook"].apply(lambda x: eval(x)[0]) with open("./data/bet_data/betname_map.json", "r") as f: bet_name_map = json.load(f) bet_df["Sportsbook"] = bet_df["Sportsbook"].apply( lambda x: bet_name_map.get(x, x) ) # bet_df = bet_df.explode("Sportsbook") bet_df = bet_df[col_map.values()] bet_df["Game Start Date"] = bet_df["Game Start Date"].apply( lambda x: datetime.datetime.fromtimestamp(x // 1000).strftime("%Y/%m/%d %H:%M") ) logger.info(f"bet order min start date: {bet_df['Game Start Date'].min()}") logger.info(f"bet order max start date: {bet_df['Game Start Date'].max()}") logger.info(bet_df.shape) bet_df.to_csv(save_file_path, encoding="utf-8-sig", index=False) info_message = f"bet order min start date: {bet_df['Game Start Date'].min()}\n" info_message += f"bet order max start date: {bet_df['Game Start Date'].max()}\n" info_message += f"bet order shape: {bet_df.shape}" return info_message def update_db_order_status(status_file_path: str): logger.info(status_file_path) data_list = [] with open(status_file_path, "r", encoding="utf-8") as f: for line in f: data = json.loads(line) data_list.extend(data["entities"]) status_df = pd.DataFrame(data_list) def get_error_status(row): if not pd.isna(row["autograder_errors"]): return f"error: {row['autograder_errors']}" return row["status"] status_df["status"] = status_df.apply(lambda row: get_error_status(row), axis=1) status_df = status_df[status_df["status"] != "pending"] status_df = status_df[["status", "notes"]].drop_duplicates() status_list = status_df.to_dict(orient="records") for i, data in enumerate(status_list): bet_id = data["notes"] bet_status = data["status"] logger.info(f"{i+1}/{len(status_list)}, status->{bet_status}, bet_id->{bet_id}") for _ in range(3): try: update_bet_status(id=bet_id, bet_status=bet_status) break except Exception as e: logger.error(f"update bet status error: {e}") def pull_data_from_oddsjam_update(ds: str = None): try: if ds is None: ds = datetime.datetime.now().strftime("%Y%m%d") logger.info(f"current date: {ds}") ds_date = datetime.datetime.strptime(ds, "%Y%m%d") pre_date = ds_date - datetime.timedelta(days=0) pre_ds = pre_date.strftime("%Y%m%d") oddsjam_bet_tracker = get_oddsjam_bet_tracker(ds=pre_ds) # oddsjam_bet_tracker.login_to_site() oddsjam_bet_tracker.get_all_bet_status(ds) intercept_response_res_save_path = ( oddsjam_bet_tracker.intercept_response_res_save_path ) update_db_order_status(status_file_path=intercept_response_res_save_path) dingtalk.send_text(f"{ds}: 比赛状态更新完成") except Exception as e: error_info = traceback.print_exc() logger.error(error_info) dingtalk.send_text(error_info) def upload_new_bets_data2oddsjam(ds: str = None): try: if ds is None: ds = datetime.datetime.now() - datetime.timedelta(days=1) ds = ds.strftime("%Y%m%d") logger.info(f"current date: {ds}") oddsjam_bet_tracker = get_oddsjam_bet_tracker(ds=ds) # oddsjam_bet_tracker.login_to_site() bet_file_path = os.path.join(root_dir, "data", "bet_data", "bet.csv") ensure_directory_exists(target_path=bet_file_path, is_file=True) data_info = get_tomorrow_over_bet_order_data( save_file_path=bet_file_path, ds=ds ) oddsjam_bet_tracker.upload_new_bets(bet_file_path=bet_file_path) dingtalk.send_text(f"{ds}: \n {data_info}") except Exception as e: error_info = traceback.format_exc() dingtalk.send_text(f"{ds}: 上传比赛失败: {e}\n{error_info}") def get_oddsjam_bet_tracker(ds: str) -> SyncOddsjamBetTracker: email_account = "aszer27937@gmail.com" login_state_save_path = os.path.join( root_dir, "data", "bet_data", "account_login_state", f"{email_account}.json" ) ensure_directory_exists(target_path=login_state_save_path, is_file=True) intercept_response_res_save_path = os.path.join( root_dir, "data", "bet_data", "bet_status", f"{email_account}_status_{ds}.json", ) ensure_directory_exists(target_path=intercept_response_res_save_path, is_file=True) oddsjam_bet_tracker = SyncOddsjamBetTracker( login_state_path=login_state_save_path, intercept_response_res_save_path=intercept_response_res_save_path, # headless=True ) return oddsjam_bet_tracker def main(): ds = datetime.datetime.now() - datetime.timedelta(days=1) ds = ds.strftime("%Y%m%d") logger.info(f"current date: {ds}") pull_data_from_oddsjam_update(ds=ds) # upload_new_bets_data2oddsjam(ds=ds) if __name__ == "__main__": import schedule # schedule.every().day.at("07:00").do(clear_order_from_oddsjam) UPLOAD_SCHEDULE = os.environ.get("UPLOAD_SCHEDULE", "11:20") PULL_SCHEDULE = os.environ.get("PULL_SCHEDULE", "13:00") schedule.every().day.at(PULL_SCHEDULE).do(pull_data_from_oddsjam_update) schedule.every().day.at(UPLOAD_SCHEDULE).do(upload_new_bets_data2oddsjam) while True: schedule.run_pending() time.sleep(0.05) # main()