Files
bet/OddsjamBetTracker.py
2025-10-25 15:32:03 +08:00

424 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import http
import sys
import os
import re
import math
import time
import json
import datetime
import traceback
import pandas as pd
from retry import retry
from common.utils import ensure_directory_exists
from dao.Database import Database
from playwright.sync_api import sync_playwright, Page, BrowserContext, expect
from loguru import logger
from data_model import MysqlConfig, OddsjamOrderStatus
from common.dingtalk import DingTalkBot
# DingTalk robot credentials.
# NOTE(security): the webhook token and signing secret were hard-coded here.
# They can now be overridden with the DINGTALK_WEBHOOK / DINGTALK_SECRET
# environment variables; the literals remain only as backward-compatible
# defaults and should be rotated and removed from source control.
webhook = os.environ.get(
    "DINGTALK_WEBHOOK",
    "https://oapi.dingtalk.com/robot/send?access_token=21de667159edadd33172c6ec414a2addf9c6359189350ffd36819d2a20e8a0f4",
)
secret = os.environ.get(
    "DINGTALK_SECRET",
    "SEC43a0fa0b29717f98637a119b92a0bd5f7b2b6da671bdd2bd1279ed8323454d5e",
)
dingtalk = DingTalkBot(webhook, secret)

# Directory containing this file; data paths built by the functions below are
# anchored here.
root_dir = os.path.dirname(os.path.abspath(__file__))

# NOTE: this path is relative to the current working directory, not root_dir.
config_file_path = "./config/mysql_config.json"
mysql_config = MysqlConfig.parse_file(config_file_path)

# Shared database handle used by the query/update helpers below.
dao = Database(mysql_config)
def query_by_create_time(ds: str):
    """Return the orders in ``oddsjam_order_all`` created on date ``ds``.

    Each fetched row is wrapped in ``OddsjamOrderStatus``; an empty list is
    returned when the query yields nothing.

    NOTE(review): ``ds`` is interpolated straight into the SQL text. Confirm
    whether ``Database.fetchall`` accepts bound parameters and switch to them
    if it does.
    """
    fetched = dao.fetchall(
        query=f"select * from oddsjam_order_all where DATE(create_time) = '{ds}'"
    )
    return [OddsjamOrderStatus(record) for record in fetched or []]
@retry(tries=6)
def update_bet_status(id: str, bet_status: str):
    """Set ``bet_status`` on the ``oddsjam_order_all`` row whose id is ``id``.

    The statement uses bound parameters and is retried up to 6 times by the
    ``retry`` decorator when it raises.
    """
    dao.execute(
        query="update oddsjam_order_all set bet_status = %s where id = %s",
        args=(bet_status, id),
    )
class SyncOddsjamBetTracker:
    """Automate the OddsJam bet-tracker page with Playwright's sync API.

    The tracker reuses a saved browser storage state to open
    https://oddsjam.com/bet-tracker, upload a CSV of bets through the import
    dialog, and page through the bet list while appending the JSON bodies of
    the ``bets-and-parlays-V2/find`` responses to a file.
    """

    def __init__(
        self,
        login_state_path: str,
        intercept_response_res_save_path: str,
        headless: bool = False,
        default_time_out: float = 6000000,
        email_account: str = "",
    ):
        """Store configuration; no browser is started here.

        Args:
            login_state_path: File the Playwright storage state is loaded
                from (and written to by ``login_to_site``).
            intercept_response_res_save_path: File that intercepted bet-list
                responses are appended to, one JSON document per line.
            headless: Whether browsers are launched headless.
            default_time_out: Timeout handed to Playwright's
                ``set_default_timeout`` / ``expect`` calls.
            email_account: Label used only in log output. When empty, the
                file name of ``login_state_path`` without its extension is
                used instead.
        """
        self.login_state_path = login_state_path
        self.headless = headless
        self.default_time_out = default_time_out
        self.intercept_response_res_save_path = intercept_response_res_save_path
        self.total_bet_cnt = 0
        # login_to_site() logs this attribute; it was previously never set,
        # which made that method fail with AttributeError.
        self.email_account = (
            email_account
            or os.path.splitext(os.path.basename(login_state_path))[0]
        )

    def login_oddsjam_cookies(self, p, headless=False) -> tuple:
        """Launch Chromium with the saved storage state and open a blank page.

        Args:
            p: The object yielded by ``sync_playwright()``.
            headless: Whether to launch the browser headless.

        Returns:
            ``(page, browser)`` - the new page and the launched browser.
        """
        # Use HTTP_PROXY when set, otherwise fall back to a local proxy.
        http_proxy = os.environ.get("HTTP_PROXY", "http://127.0.0.1:7890")
        logger.info("加载cookies {}", self.login_state_path)
        browser = p.chromium.launch(
            args=["--start-maximized"],
            headless=headless,
            proxy={
                "server": http_proxy,
            },
        )
        context = browser.new_context(
            storage_state=self.login_state_path, no_viewport=True
        )
        page = context.new_page()
        return page, browser

    def login_to_site(self):
        """Open the login page for a manual sign-in and save the storage state.

        The script pauses so a person can log in, then writes the browser
        context's storage state to ``self.login_state_path``.
        """
        logger.info(f"login account: {self.email_account}")
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=self.headless)
            page = browser.new_page()
            page.set_default_timeout(timeout=self.default_time_out)
            url = "https://oddsjam.com/bet-tracker"
            page.goto(url)
            page.get_by_role("link", name="Login").click()
            expect(page.get_by_role("button", name="Sign in")).to_be_visible(
                timeout=self.default_time_out
            )
            # Hand control to the operator to complete the login.
            page.pause()
            time.sleep(10)
            try:
                browser.contexts[0].storage_state(path=self.login_state_path)
            except Exception:
                traceback.print_exc()
                page.screenshot(path="error.png")
            # The URL is passed as a format argument so that the
            # "login_success" marker actually appears in the log line.
            logger.info("{} login_success", page.url)
            browser.close()

    @retry(tries=6)
    def intercept_import_response(self, route, request):
        """Route handler that replaces a 500 from the import API with ``{}``.

        Requests to ``oddsjam.com/api/backend/bets/import`` are fetched; when
        the status is 500 the route is fulfilled with an empty JSON body.
        Every other case continues the request unchanged.
        """
        if "oddsjam.com/api/backend/bets/import" in request.url:
            response = route.fetch(timeout=50000)
            logger.info(response.status)
            if response.status == 500:
                route.fulfill(response=response, json={})
                return
        route.continue_()

    @retry(tries=6)
    def upload_new_bets(self, bet_file_path: str):
        """Upload ``bet_file_path`` through the "Import Bets" dialog.

        The whole flow is retried up to 6 times by the ``retry`` decorator
        when any step raises.
        """
        url = "https://oddsjam.com/bet-tracker"
        with sync_playwright() as p:
            page, browser = self.login_oddsjam_cookies(p, headless=self.headless)
            page.set_default_timeout(timeout=self.default_time_out)

            def on_response(response):
                # A 400 from the file-upload endpoint aborts the attempt.
                if "dromo-user-imports-production" in response.url:
                    print(response.status, response.request.failure)
                    if response.status == 400:
                        browser.close()
                        raise Exception(response.request.failure)
                elif "oddsjam.com/api/backend/bets/import" in response.url:
                    logger.info(response.status)

            page.on("response", on_response)
            page.goto(url)
            page.get_by_role("button", name="Import Bets").nth(1).click()
            time.sleep(5)
            iframe_locator = page.frame_locator(
                'iframe[title="Dromo Importer\\: Bets"]'
            )
            with page.expect_file_chooser() as fc_info:
                iframe_locator.get_by_role(
                    "button", name="Choose a file", exact=True
                ).click()
            time.sleep(5)
            file_chooser = fc_info.value
            file_chooser.set_files(bet_file_path)
            iframe_locator.get_by_role(
                "button", name="Confirm selection and continue"
            ).click()
            time.sleep(5)
            iframe_locator.get_by_role(
                "button", name="Confirm matching and continue"
            ).click()
            time.sleep(5)
            iframe_locator.get_by_role("button", name="Continue").click()
            iframe_locator.get_by_role("button", name="Finish").click()
            try:
                # Best effort: the "Submit anyway" button is clicked only if
                # it shows up within 5 seconds.
                iframe_locator.get_by_role("button", name="Submit anyway").click(
                    timeout=5000
                )
            except Exception:
                pass
            iframe_locator.get_by_role("button", name="Yes").click()
            expect(iframe_locator.get_by_text("Processing...")).to_be_hidden(
                timeout=self.default_time_out
            )
            browser.close()

    def get_all_bet_status(self, ds: str = None):
        """Page through the bet list and record every bet-list API response.

        Args:
            ds: Date as ``YYYYMMDD``; defaults to today. It is only logged
                here (the date-range filter steps are currently disabled).

        Raises:
            ValueError: If the "Showing 1 to 50 of N results" text cannot be
                parsed.
        """
        if not ds:
            ds = datetime.datetime.now().strftime("%Y%m%d")
        day = datetime.datetime.strptime(ds, "%Y%m%d").day
        logger.info(f"current date: {ds}")
        logger.info(f"current day: {day}")
        url = "https://oddsjam.com/bet-tracker"
        with sync_playwright() as p:
            page, browser = self.login_oddsjam_cookies(p, headless=self.headless)

            def on_response(response):
                if "oddsjam.com/api/backend/bets-and-parlays-V2/find" in response.url:
                    try:
                        # Only handle responses that completed successfully.
                        if response.request.failure is None and response.status == 200:
                            # In sync_api, .json() is a synchronous call.
                            data = response.json()
                            self.total_bet_cnt = data["totalCount"]
                            logger.info(f"total bet count: {self.total_bet_cnt}")
                            # Append the payload as one JSON document per line.
                            with open(
                                self.intercept_response_res_save_path,
                                "a",
                                encoding="utf-8",
                            ) as f:
                                json.dump(data, f, ensure_ascii=False)
                                f.write("\n")
                    except Exception as e:
                        logger.error(f"Error processing response: {e}")

            page.on("response", on_response)
            page.set_default_timeout(timeout=self.default_time_out)
            page.goto(url=url)
            try:
                # Best effort: click the button inside #cello-widget-app if it
                # is there within 6 seconds.
                page.locator("#cello-widget-app").get_by_role("button").click(
                    timeout=6000
                )
            except Exception:
                # Narrowed from a bare ``except:`` so that KeyboardInterrupt
                # and SystemExit are no longer swallowed.
                pass
            # A date-range filter flow (Clear -> Date Range -> Custom -> day
            # button) used to be scripted here and is currently disabled.
            page.get_by_role(
                "button", name=re.compile(r"Show (\d+) Results", re.IGNORECASE)
            ).click()
            inner_text = page.get_by_text(
                re.compile("Showing 1 to 50 of", re.IGNORECASE)
            ).inner_text()
            match = re.search(
                r"Showing 1 to 50 of (\d+) results", inner_text, re.IGNORECASE
            )
            if match is None:
                raise ValueError(f"unexpected pagination text: {inner_text!r}")
            total_bet_cnt = int(match[1])
            total_page_no = math.ceil(total_bet_cnt / 50)
            logger.info(f"total {total_bet_cnt} results, {total_page_no} pages")
            # NOTE(review): page.pause() is a debugging aid; confirm it is
            # intended in scheduled headless runs.
            page.pause()
            # Click "Next" through the intermediate pages; the last page is
            # opened by its numbered button after the loop.
            for page_no in range(2, total_page_no):
                expect(page.get_by_role("button", name="Next")).to_be_visible(
                    timeout=self.default_time_out
                )
                page.wait_for_timeout(timeout=3000)
                page.get_by_role("button", name="Next").click()
                logger.info(f"current page number: {page_no} / {total_page_no}")
            page.get_by_role("button", name=f"{total_page_no}", exact=True).click()
            expect(page.get_by_role("button", name="Next")).to_be_visible(
                timeout=self.default_time_out
            )
            browser.close()
def get_tomorrow_over_bet_order_data(save_file_path: str, ds):
    """Export the orders created on ``ds`` as an import CSV.

    Rows are loaded with ``query_by_create_time``, renamed to the import
    column headers, reduced to those columns, and written to
    ``save_file_path`` (UTF-8 with BOM, no index).

    Args:
        save_file_path: Destination CSV path.
        ds: Creation date forwarded to ``query_by_create_time``.

    Returns:
        A multi-line summary with the min/max game start date and the shape
        of the exported frame.

    Raises:
        ValueError: If no orders exist for ``ds``. Previously this surfaced
            as an opaque ``KeyError('Sportsbook')``.
    """
    import ast  # local import: only this function needs it

    logger.info(f"load bet data from db: {ds}")
    data_list = [d.to_dict() for d in query_by_create_time(ds=ds)]
    if not data_list:
        raise ValueError(f"no bet orders found for create date {ds}")

    bet_df = pd.DataFrame(data_list)
    # First written with the raw columns; overwritten below with the final
    # layout.
    bet_df.to_csv(save_file_path, encoding="utf-8-sig", index=False)

    # Database column -> import column header.
    col_map = {
        "sportsbooks": "Sportsbook",
        "bet_name": "Bet Name",
        "market": "Market Name",
        "price": "Odds",
        "stake": "Stake",
        "event_name": "Event Name",
        "sport": "Sport",
        "league": "League",
        "game_id": "Game ID",
        "bet_type": "Bet Type",
        "bet_id": "Notes",
        "start_timestamp": "Game Start Date",
    }
    bet_df = bet_df.rename(columns=col_map)

    # The column holds the text form of a sequence; keep its first element.
    # ast.literal_eval replaces eval() so stored text can never execute code.
    bet_df["Sportsbook"] = bet_df["Sportsbook"].apply(
        lambda x: ast.literal_eval(x)[0]
    )
    with open("./data/bet_data/betname_map.json", "r") as f:
        bet_name_map = json.load(f)
    # Translate sportsbook names through the map; unknown names pass through.
    bet_df["Sportsbook"] = bet_df["Sportsbook"].apply(
        lambda x: bet_name_map.get(x, x)
    )

    # Copy the selection so the column assignment below acts on an
    # independent frame rather than a slice of the original.
    bet_df = bet_df[list(col_map.values())].copy()
    # start_timestamp is divided by 1000 before conversion
    # (presumably epoch milliseconds - TODO confirm).
    bet_df["Game Start Date"] = bet_df["Game Start Date"].apply(
        lambda x: datetime.datetime.fromtimestamp(x // 1000).strftime("%Y/%m/%d %H:%M")
    )

    min_start = bet_df["Game Start Date"].min()
    max_start = bet_df["Game Start Date"].max()
    logger.info(f"bet order min start date: {min_start}")
    logger.info(f"bet order max start date: {max_start}")
    logger.info(bet_df.shape)
    bet_df.to_csv(save_file_path, encoding="utf-8-sig", index=False)

    info_message = f"bet order min start date: {min_start}\n"
    info_message += f"bet order max start date: {max_start}\n"
    info_message += f"bet order shape: {bet_df.shape}"
    return info_message
def update_db_order_status(status_file_path: str):
    """Push settled bet statuses from a captured response file to the database.

    ``status_file_path`` holds one JSON document per line, each with an
    ``entities`` list. Rows with a non-NA ``autograder_errors`` value get the
    status ``"error: <value>"``; rows still ``"pending"`` are dropped. Each
    remaining unique (status, notes) pair is written with
    ``update_bet_status``, using ``notes`` as the row id, with up to three
    attempts per pair.
    """
    logger.info(status_file_path)

    entities = []
    with open(status_file_path, "r", encoding="utf-8") as fh:
        for raw_line in fh:
            entities += json.loads(raw_line)["entities"]
    frame = pd.DataFrame(entities)

    def resolve_status(record):
        # An autograder error takes precedence over the reported status.
        if pd.isna(record["autograder_errors"]):
            return record["status"]
        return f"error: {record['autograder_errors']}"

    frame["status"] = frame.apply(resolve_status, axis=1)
    settled = frame[frame["status"] != "pending"]
    pairs = settled[["status", "notes"]].drop_duplicates().to_dict(orient="records")

    total = len(pairs)
    for position, pair in enumerate(pairs, start=1):
        bet_id, bet_status = pair["notes"], pair["status"]
        logger.info(f"{position}/{total}, status->{bet_status}, bet_id->{bet_id}")
        attempts_left = 3
        while attempts_left:
            attempts_left -= 1
            try:
                update_bet_status(id=bet_id, bet_status=bet_status)
            except Exception as e:
                logger.error(f"update bet status error: {e}")
            else:
                break
def pull_data_from_oddsjam_update(ds: str = None):
    """Scrape bet statuses and write them back to the database.

    Args:
        ds: Date as ``YYYYMMDD``; defaults to today.

    Any exception is caught, logged with its full traceback, and reported
    through DingTalk; nothing is re-raised.
    """
    try:
        if ds is None:
            ds = datetime.datetime.now().strftime("%Y%m%d")
        logger.info(f"current date: {ds}")
        ds_date = datetime.datetime.strptime(ds, "%Y%m%d")
        # The offset is currently 0 days, so pre_ds is the same date as ds.
        pre_date = ds_date - datetime.timedelta(days=0)
        pre_ds = pre_date.strftime("%Y%m%d")
        oddsjam_bet_tracker = get_oddsjam_bet_tracker(ds=pre_ds)
        # oddsjam_bet_tracker.login_to_site()
        oddsjam_bet_tracker.get_all_bet_status(ds)
        intercept_response_res_save_path = (
            oddsjam_bet_tracker.intercept_response_res_save_path
        )
        update_db_order_status(status_file_path=intercept_response_res_save_path)
        dingtalk.send_text(f"{ds}: 比赛状态更新完成")
    except Exception:
        # traceback.print_exc() returns None, so the log entry and the alert
        # used to carry no information; format_exc() returns the text.
        error_info = traceback.format_exc()
        logger.error(error_info)
        dingtalk.send_text(error_info)
def upload_new_bets_data2oddsjam(ds: str = None):
    """Export the orders for ``ds`` to CSV and upload them to the bet tracker.

    Args:
        ds: Date as ``YYYYMMDD``; defaults to yesterday.

    A DingTalk message with the export summary is sent on success; on any
    exception a failure message with the traceback is sent instead and
    nothing is re-raised.
    """
    try:
        if ds is None:
            yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
            ds = yesterday.strftime("%Y%m%d")
        logger.info(f"current date: {ds}")

        tracker = get_oddsjam_bet_tracker(ds=ds)
        # tracker.login_to_site()
        csv_path = os.path.join(root_dir, "data", "bet_data", "bet.csv")
        ensure_directory_exists(target_path=csv_path, is_file=True)

        summary = get_tomorrow_over_bet_order_data(save_file_path=csv_path, ds=ds)
        tracker.upload_new_bets(bet_file_path=csv_path)
        dingtalk.send_text(f"{ds}: \n {summary}")
    except Exception as e:
        trace_text = traceback.format_exc()
        dingtalk.send_text(f"{ds}: 上传比赛失败: {e}\n{trace_text}")
def get_oddsjam_bet_tracker(ds: str) -> SyncOddsjamBetTracker:
    """Build a headless ``SyncOddsjamBetTracker`` for the fixed account.

    Both the login-state file and the per-date status file live under
    ``<root_dir>/data/bet_data``; their parent directories are ensured via
    ``ensure_directory_exists`` before the tracker is constructed.

    Args:
        ds: Date string embedded in the status file name.
    """
    email_account = "aszer27937@gmail.com"
    bet_data_dir = os.path.join(root_dir, "data", "bet_data")

    state_file = os.path.join(
        bet_data_dir, "account_login_state", f"{email_account}.json"
    )
    ensure_directory_exists(target_path=state_file, is_file=True)

    status_file = os.path.join(
        bet_data_dir, "bet_status", f"{email_account}_status_{ds}.json"
    )
    ensure_directory_exists(target_path=status_file, is_file=True)

    return SyncOddsjamBetTracker(
        login_state_path=state_file,
        intercept_response_res_save_path=status_file,
        headless=True,
    )
def main():
    """Run a single status pull for yesterday's date."""
    yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
    ds = yesterday.strftime("%Y%m%d")
    logger.info(f"current date: {ds}")
    pull_data_from_oddsjam_update(ds=ds)
    # The upload step (upload_new_bets_data2oddsjam) is not run from here.
if __name__ == "__main__":
    import schedule

    # Daily run times; each can be overridden through the environment.
    UPLOAD_SCHEDULE = os.environ.get("UPLOAD_SCHEDULE", "10:00")
    PULL_SCHEDULE = os.environ.get("PULL_SCHEDULE", "13:00")
    logger.info(f"upload schedule: {UPLOAD_SCHEDULE}")
    logger.info(f"pull schedule: {PULL_SCHEDULE}")

    # Register the pull job first, then the upload job. Both are invoked
    # without arguments, so each falls back to its own default date.
    daily = schedule.every().day
    daily.at(PULL_SCHEDULE).do(pull_data_from_oddsjam_update)
    daily = schedule.every().day
    daily.at(UPLOAD_SCHEDULE).do(upload_new_bets_data2oddsjam)

    # Poll the scheduler forever; main() is not called in this mode.
    while True:
        schedule.run_pending()
        time.sleep(0.05)