比赛状态爬虫

This commit is contained in:
2025-10-25 13:24:03 +08:00
parent aa19628ce7
commit 54e9d05778

418
OddsjamBetTracker.py Normal file
View File

@@ -0,0 +1,418 @@
import http
import sys
import os
import re
import math
import time
import json
import datetime
import traceback
import pandas as pd
from retry import retry
from common.utils import ensure_directory_exists
from dao.Database import Database
from playwright.sync_api import sync_playwright, Page, BrowserContext, expect
from loguru import logger
from data_model import MysqlConfig, OddsjamOrderStatus
from dingtalk import DingTalkBot
# --- Module-level configuration; everything below runs at import time. ---
# NOTE(review): the DingTalk webhook token and signing secret are hard-coded
# in source control; consider loading them from the environment or a config
# file and rotating these values.
webhook = "https://oapi.dingtalk.com/robot/send?access_token=21de667159edadd33172c6ec414a2addf9c6359189350ffd36819d2a20e8a0f4"
secret = "SEC43a0fa0b29717f98637a119b92a0bd5f7b2b6da671bdd2bd1279ed8323454d5e"
# Shared notifier used by the job entry points further down in this file.
dingtalk = DingTalkBot(webhook, secret)
# Directory containing this file; used to build absolute data paths.
root_dir = os.path.dirname(os.path.abspath(__file__))
# NOTE(review): this path is relative to the current working directory, not
# to root_dir, so the module has to be imported from the project root.
config_file_path = "./config/mysql_config.json"
mysql_config = MysqlConfig.parse_file(config_file_path)
# Shared database handle used by the query/update helpers below.
dao = Database(mysql_config)
def query_by_create_time(ds: str):
    """Return the orders in ``oddsjam_order_all`` created on the date ``ds``.

    Args:
        ds: Date value compared against ``DATE(create_time)``.

    Returns:
        A list of ``OddsjamOrderStatus`` objects, one per fetched row; an
        empty list when the query yields nothing.
    """
    # NOTE(review): ``ds`` is interpolated straight into the SQL text. The
    # callers in this file pass strftime output, but a parameterised query
    # would be safer if the DAO's fetchall supports one - confirm.
    sql = f"select * from oddsjam_order_all where DATE(create_time) = '{ds}'"
    rows = dao.fetchall(query=sql)
    if not rows:
        return []
    return [OddsjamOrderStatus(row) for row in rows]
@retry(tries=6)
def update_bet_status(id: str, bet_status: str):
    """Set ``bet_status`` on the ``oddsjam_order_all`` row whose ``id`` matches.

    The ``retry`` decorator re-invokes the call (up to 6 tries) when the
    underlying execute raises.

    Args:
        id: Value matched against the ``id`` column.
        bet_status: New value for the ``bet_status`` column.
    """
    dao.execute(
        query="update oddsjam_order_all set bet_status = %s where id = %s",
        args=(bet_status, id),
    )
class SyncOddsjamBetTracker:
    """Drive the OddsJam bet-tracker page with Playwright's sync API.

    The class can perform an interactive login that saves the browser storage
    state, upload a CSV of bets through the import dialog, and page through
    the bet table while appending the intercepted status responses to a file.
    """

    def __init__(
        self,
        login_state_path: str,
        intercept_response_res_save_path: str,
        headless: bool = False,
        default_time_out: float = 6000000,
        email_account: str = None,
    ):
        """Store the paths and browser options used by the other methods.

        Args:
            login_state_path: File holding the Playwright storage state.
            intercept_response_res_save_path: File to which intercepted
                bet-status responses are appended, one JSON document per line.
            headless: Whether browsers are launched headless.
            default_time_out: Default Playwright timeout in milliseconds.
            email_account: Optional account label; only logged by
                ``login_to_site``.
        """
        self.login_state_path = login_state_path
        self.headless = headless
        self.default_time_out = default_time_out
        self.intercept_response_res_save_path = intercept_response_res_save_path
        # Previously this attribute was read in login_to_site without ever
        # being assigned, which raised AttributeError.
        self.email_account = email_account
        self.total_bet_cnt = 0

    def login_oddsjam_cookies(self, p, headless=False) -> tuple:
        """Launch Chromium behind a proxy and open a page with the saved state.

        Args:
            p: The object yielded by ``sync_playwright()``.
            headless: Whether to launch the browser headless.

        Returns:
            A ``(page, browser)`` tuple.
        """
        # Use HTTP_PROXY from the environment, else the local default proxy.
        http_proxy = os.environ.get("HTTP_PROXY", "http://127.0.0.1:7890")
        logger.info("加载cookies {}", self.login_state_path)
        browser = p.chromium.launch(
            args=["--start-maximized"],
            headless=headless,
            proxy={
                "server": http_proxy,
            },
        )
        context = browser.new_context(
            storage_state=self.login_state_path, no_viewport=True
        )
        page = context.new_page()
        return page, browser

    def login_to_site(self):
        """Open the login page, pause for a manual login, then save the state.

        The storage state is written to ``self.login_state_path``. If saving
        fails, the traceback is printed and a screenshot is written to
        ``error.png``.
        """
        logger.info(f"login account: {self.email_account}")
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=self.headless)
            page = browser.new_page()
            page.set_default_timeout(timeout=self.default_time_out)
            url = "https://oddsjam.com/bet-tracker"
            page.goto(url)
            # page.get_by_label("Close Modal").click()
            page.get_by_role("link", name="Login").click()
            expect(page.get_by_role("button", name="Sign in")).to_be_visible(
                timeout=self.default_time_out
            )
            # Hand control to the operator so they can sign in manually.
            page.pause()
            time.sleep(10)
            try:
                browser.contexts[0].storage_state(path=self.login_state_path)
            except Exception:
                traceback.print_exc()
                page.screenshot(path="error.png")
            else:
                # loguru treats the first argument as the format string, so
                # the URL has to be passed as a format argument.
                logger.info("{} login_success", page.url)
            browser.close()

    @retry(tries=6)
    def intercept_import_response(self, route, request):
        """Route handler that replaces HTTP 500 import responses with ``{}``.

        Requests to other URLs are passed through untouched. Import requests
        are fetched once and always fulfilled from that fetched response;
        continuing the route after ``route.fetch()`` would send the import
        request to the server a second time.
        """
        if "oddsjam.com/api/backend/bets/import" not in request.url:
            route.continue_()
            return
        response = route.fetch(timeout=50000)
        logger.info(response.status)
        if response.status == 500:
            route.fulfill(response=response, json={})
        else:
            route.fulfill(response=response)

    @retry(tries=6)
    def upload_new_bets(self, bet_file_path: str):
        """Upload the CSV at ``bet_file_path`` through the import dialog.

        The whole flow is retried (up to 6 tries) by the ``retry`` decorator
        when any step raises.

        Args:
            bet_file_path: Path of the CSV file handed to the file chooser.
        """
        url = "https://oddsjam.com/bet-tracker"
        with sync_playwright() as p:
            page, browser = self.login_oddsjam_cookies(p, headless=self.headless)
            page.set_default_timeout(timeout=self.default_time_out)

            def on_response(response):
                # Abort on a 400 from the importer's upload endpoint; log the
                # status of the import API call.
                if "dromo-user-imports-production" in response.url:
                    print(response.status, response.request.failure)
                    if response.status == 400:
                        browser.close()
                        raise Exception(response.request.failure)
                elif "oddsjam.com/api/backend/bets/import" in response.url:
                    logger.info(response.status)

            page.on("response", on_response)
            page.goto(url)
            # page.pause()
            page.get_by_role("button", name="Import Bets").nth(1).click()
            time.sleep(5)
            iframe_locator = page.frame_locator(
                'iframe[title="Dromo Importer\\: Bets"]'
            )
            with page.expect_file_chooser() as fc_info:
                iframe_locator.get_by_role(
                    "button", name="Choose a file", exact=True
                ).click()
            time.sleep(5)
            file_chooser = fc_info.value
            file_chooser.set_files(bet_file_path)
            iframe_locator.get_by_role(
                "button", name="Confirm selection and continue"
            ).click()
            time.sleep(5)
            iframe_locator.get_by_role(
                "button", name="Confirm matching and continue"
            ).click()
            time.sleep(5)
            iframe_locator.get_by_role("button", name="Continue").click()
            iframe_locator.get_by_role("button", name="Finish").click()
            try:
                # Best effort: the original code tolerates this click failing
                # within its short timeout and simply carries on.
                iframe_locator.get_by_role("button", name="Submit anyway").click(
                    timeout=5000
                )
            except Exception as ex:
                logger.debug("'Submit anyway' click skipped: {}", ex)
            iframe_locator.get_by_role("button", name="Yes").click()
            expect(iframe_locator.get_by_text("Processing...")).to_be_hidden(
                timeout=self.default_time_out
            )
            # os.remove(bet_file_path)
            # page.pause()
            browser.close()

    def get_all_bet_status(self, ds: str = None):
        """Page through the bet table and record the intercepted responses.

        Every successful response from the ``bets-and-parlays-V2/find``
        endpoint is appended as one JSON line to
        ``self.intercept_response_res_save_path``, and ``self.total_bet_cnt``
        is updated from its ``totalCount`` field.

        Args:
            ds: Date string in ``%Y%m%d`` format; defaults to today. It is
                currently only logged, because the date-range filter steps
                below are commented out.

        Raises:
            ValueError: If the pagination summary text cannot be parsed.
        """
        if not ds:
            ds = datetime.datetime.now().strftime("%Y%m%d")
        day = datetime.datetime.strptime(ds, "%Y%m%d").day
        logger.info(f"current date: {ds}")
        logger.info(f"current day: {day}")
        url = "https://oddsjam.com/bet-tracker"
        with sync_playwright() as p:
            page, browser = self.login_oddsjam_cookies(p, headless=self.headless)

            def on_response(response):
                if (
                    "oddsjam.com/api/backend/bets-and-parlays-V2/find"
                    not in response.url
                ):
                    return
                try:
                    # Only handle requests that completed successfully.
                    if response.request.failure is None and response.status == 200:
                        # In the sync API, .json() returns the parsed body directly.
                        data = response.json()
                        self.total_bet_cnt = data["totalCount"]
                        logger.info(f"total bet count: {self.total_bet_cnt}")
                        # Persist the payload as one JSON document per line.
                        with open(
                            self.intercept_response_res_save_path,
                            "a",
                            encoding="utf-8",
                        ) as f:
                            json.dump(data, f, ensure_ascii=False)
                            f.write("\n")
                except Exception as e:
                    logger.error(f"Error processing response: {e}")

            # page.route("**", self.intercept_response)
            page.on("response", on_response)
            page.set_default_timeout(timeout=self.default_time_out)
            page.goto(url=url)
            try:
                # Best effort: click the button inside "#cello-widget-app" if
                # it shows up within 6 seconds.
                page.locator("#cello-widget-app").get_by_role("button").click(
                    timeout=6000
                )
            except Exception:
                # A missing widget is fine; unlike a bare ``except`` this
                # still lets KeyboardInterrupt/SystemExit propagate.
                pass
            # Disabled custom date-range filter steps (they used ``day``):
            # page.locator(".mt-4 > div > .inline-flex").first.click()
            # time.sleep(5)
            # page.get_by_label("Clear").click()
            # time.sleep(5)
            # page.get_by_role("button", name="Date Range").click()
            # time.sleep(5)
            # page.get_by_text("Custom", exact=True).click()
            # time.sleep(5)
            # page.get_by_role("button", name=f"{day}").click()
            # time.sleep(5)
            page.get_by_role(
                "button", name=re.compile(r"Show (\d+) Results", re.IGNORECASE)
            ).click()
            inner_text = page.get_by_text(
                re.compile("Showing 1 to 50 of", re.IGNORECASE)
            ).inner_text()
            match = re.search(r"Showing 1 to 50 of (\d+) results", inner_text)
            if match is None:
                raise ValueError(f"unexpected pagination text: {inner_text!r}")
            total_bet_cnt = int(match[1])
            total_page_no = math.ceil(total_bet_cnt / 50)
            logger.info(f"total {total_bet_cnt} results, {total_page_no} pages")
            # NOTE(review): in headed mode this blocks until someone resumes
            # the Playwright inspector; confirm it is intended for the
            # scheduled job (the other page.pause() calls are commented out).
            page.pause()
            # Step through pages 2 .. last-1 with "Next"; the last page is
            # reached below by clicking its page-number button.
            for page_no in range(2, total_page_no):
                expect(page.get_by_role("button", name="Next")).to_be_visible(
                    timeout=self.default_time_out
                )
                page.wait_for_timeout(timeout=3000)
                page.get_by_role("button", name="Next").click()
                logger.info(f"current page number: {page_no} / {total_page_no}")
            page.get_by_role("button", name=f"{total_page_no}", exact=True).click()
            expect(page.get_by_role("button", name="Next")).to_be_visible(
                timeout=self.default_time_out
            )
            # page.pause()
            browser.close()
def get_tomorrow_over_bet_order_data(save_file_path: str, ds):
    """Export the orders created on ``ds`` as an import-ready CSV.

    Orders are loaded through ``query_by_create_time``, their columns are
    renamed to the import headers, the first sportsbook of each order is
    mapped through ``./data/bet_data/betname_map.json``, and the start
    timestamp (milliseconds) is formatted as ``%Y/%m/%d %H:%M`` local time.

    Args:
        save_file_path: Destination CSV path (written as UTF-8 with BOM).
        ds: Date passed to ``query_by_create_time``.

    Returns:
        A multi-line summary with the min/max game start date and the shape
        of the exported frame.

    Raises:
        ValueError: If no orders exist for ``ds`` (previously this surfaced
            as an opaque ``KeyError('Sportsbook')``).
    """
    logger.info(f"load bet data from db: {ds}")
    orders = [order.to_dict() for order in query_by_create_time(ds=ds)]
    bet_df = pd.DataFrame(orders)
    # Raw dump first; it is overwritten by the formatted export below and
    # only survives if a later step fails.
    bet_df.to_csv(save_file_path, encoding="utf-8-sig", index=False)
    if bet_df.empty:
        raise ValueError(f"no bet orders found for {ds}")
    col_map = {
        "sportsbooks": "Sportsbook",
        "bet_name": "Bet Name",
        "market": "Market Name",
        "price": "Odds",
        "stake": "Stake",
        "event_name": "Event Name",
        "sport": "Sport",
        "league": "League",
        "game_id": "Game ID",
        "bet_type": "Bet Type",
        "bet_id": "Notes",
        "start_timestamp": "Game Start Date",
    }
    bet_df = bet_df.rename(columns=col_map)
    # NOTE(review): eval() runs arbitrary code stored in the database column.
    # If the stored value is always a plain list literal, ast.literal_eval or
    # json.loads would be a safe replacement - confirm the stored format.
    bet_df["Sportsbook"] = bet_df["Sportsbook"].apply(lambda x: eval(x)[0])
    # Explicit encoding, consistent with the other file reads in this module.
    with open("./data/bet_data/betname_map.json", "r", encoding="utf-8") as f:
        bet_name_map = json.load(f)
    # Names missing from the map are kept unchanged.
    bet_df["Sportsbook"] = bet_df["Sportsbook"].apply(
        lambda x: bet_name_map.get(x, x)
    )
    # bet_df = bet_df.explode("Sportsbook")
    bet_df = bet_df[list(col_map.values())]
    bet_df["Game Start Date"] = bet_df["Game Start Date"].apply(
        lambda x: datetime.datetime.fromtimestamp(x // 1000).strftime("%Y/%m/%d %H:%M")
    )
    min_start = bet_df["Game Start Date"].min()
    max_start = bet_df["Game Start Date"].max()
    logger.info(f"bet order min start date: {min_start}")
    logger.info(f"bet order max start date: {max_start}")
    logger.info(bet_df.shape)
    bet_df.to_csv(save_file_path, encoding="utf-8-sig", index=False)
    return (
        f"bet order min start date: {min_start}\n"
        f"bet order max start date: {max_start}\n"
        f"bet order shape: {bet_df.shape}"
    )
def update_db_order_status(status_file_path: str):
    """Write the bet statuses captured in ``status_file_path`` to the database.

    The file holds one JSON document per line, each with an ``entities``
    list. Entities whose ``autograder_errors`` is set get the status
    ``"error: <errors>"``; entities still ``pending`` are skipped. Each
    remaining distinct ``(status, notes)`` pair is passed to
    ``update_bet_status`` with ``notes`` as the id.

    Args:
        status_file_path: Path of the JSON-lines capture file.
    """
    logger.info(status_file_path)
    entities = []
    with open(status_file_path, "r", encoding="utf-8") as f:
        for line in f:
            # Tolerate blank lines instead of failing in json.loads.
            if not line.strip():
                continue
            entities.extend(json.loads(line)["entities"])
    if not entities:
        # An empty frame has no "status" column, so the steps below would
        # fail; there is nothing to update anyway.
        logger.warning(f"no bet entities found in {status_file_path}")
        return
    status_df = pd.DataFrame(entities)

    def get_error_status(row):
        # An autograder error takes precedence over the reported status.
        if not pd.isna(row["autograder_errors"]):
            return f"error: {row['autograder_errors']}"
        return row["status"]

    status_df["status"] = status_df.apply(get_error_status, axis=1)
    status_df = status_df[status_df["status"] != "pending"]
    status_df = status_df[["status", "notes"]].drop_duplicates()
    status_list = status_df.to_dict(orient="records")
    for i, data in enumerate(status_list):
        bet_id = data["notes"]
        bet_status = data["status"]
        logger.info(f"{i+1}/{len(status_list)}, status->{bet_status}, bet_id->{bet_id}")
        # Extra attempts on top of update_bet_status's own retry decorator;
        # a row that still fails is logged and skipped.
        for _ in range(3):
            try:
                update_bet_status(id=bet_id, bet_status=bet_status)
                break
            except Exception as e:
                logger.error(f"update bet status error: {e}")
def pull_data_from_oddsjam_update(ds: str = None):
    """Scrape bet statuses and sync them into the database.

    Captures the bet-status responses via
    ``SyncOddsjamBetTracker.get_all_bet_status``, applies them with
    ``update_db_order_status`` and reports the outcome to DingTalk. Any
    exception is caught, logged and sent to DingTalk rather than propagated.

    Args:
        ds: Date string in ``%Y%m%d`` format; defaults to today.
    """
    try:
        if ds is None:
            ds = datetime.datetime.now().strftime("%Y%m%d")
        logger.info(f"current date: {ds}")
        ds_date = datetime.datetime.strptime(ds, "%Y%m%d")
        # The offset is currently zero days, so pre_ds equals ds.
        pre_date = ds_date - datetime.timedelta(days=0)
        pre_ds = pre_date.strftime("%Y%m%d")
        oddsjam_bet_tracker = get_oddsjam_bet_tracker(ds=pre_ds)
        # oddsjam_bet_tracker.login_to_site()
        oddsjam_bet_tracker.get_all_bet_status(ds)
        intercept_response_res_save_path = (
            oddsjam_bet_tracker.intercept_response_res_save_path
        )
        update_db_order_status(status_file_path=intercept_response_res_save_path)
        dingtalk.send_text(f"{ds}: 比赛状态更新完成")
    except Exception:
        # format_exc() returns the traceback text; print_exc() returns None,
        # which previously caused ``None`` to be logged and sent.
        error_info = traceback.format_exc()
        logger.error(error_info)
        dingtalk.send_text(error_info)
def upload_new_bets_data2oddsjam(ds: str = None):
    """Export the orders for ``ds`` to CSV and upload them to the bet tracker.

    A summary of the exported data is sent to DingTalk on success; on any
    exception the error and its traceback are sent instead.

    Args:
        ds: Date string in ``%Y%m%d`` format; defaults to yesterday.
    """
    try:
        if ds is None:
            yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
            ds = yesterday.strftime("%Y%m%d")
        logger.info(f"current date: {ds}")
        tracker = get_oddsjam_bet_tracker(ds=ds)
        # tracker.login_to_site()
        csv_path = os.path.join(root_dir, "data", "bet_data", "bet.csv")
        ensure_directory_exists(target_path=csv_path, is_file=True)
        summary = get_tomorrow_over_bet_order_data(save_file_path=csv_path, ds=ds)
        tracker.upload_new_bets(bet_file_path=csv_path)
        dingtalk.send_text(f"{ds}: \n {summary}")
    except Exception as e:
        dingtalk.send_text(f"{ds}: 上传比赛失败: {e}\n{traceback.format_exc()}")
def get_oddsjam_bet_tracker(ds: str) -> SyncOddsjamBetTracker:
    """Build a ``SyncOddsjamBetTracker`` with its data paths for ``ds``.

    Both the login-state file and the status-capture file live under
    ``<root_dir>/data/bet_data``; ``ensure_directory_exists`` is called for
    each path before the tracker is constructed.

    Args:
        ds: Date string embedded in the status-capture file name.

    Returns:
        The configured tracker (default, non-headless browser mode).
    """
    email_account = "aszer27937@gmail.com"
    bet_data_dir = os.path.join(root_dir, "data", "bet_data")

    login_state_save_path = os.path.join(
        bet_data_dir, "account_login_state", f"{email_account}.json"
    )
    ensure_directory_exists(target_path=login_state_save_path, is_file=True)

    intercept_response_res_save_path = os.path.join(
        bet_data_dir, "bet_status", f"{email_account}_status_{ds}.json"
    )
    ensure_directory_exists(target_path=intercept_response_res_save_path, is_file=True)

    return SyncOddsjamBetTracker(
        login_state_path=login_state_save_path,
        intercept_response_res_save_path=intercept_response_res_save_path,
        # headless=True
    )
def main():
    """Run the status-sync job once for yesterday's date."""
    yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
    ds = yesterday.strftime("%Y%m%d")
    logger.info(f"current date: {ds}")
    pull_data_from_oddsjam_update(ds=ds)
    # upload_new_bets_data2oddsjam(ds=ds)
if __name__ == "__main__":
    import schedule

    # Daily jobs as (time of day, callable); each is called with no
    # arguments, so its own default date applies.
    # schedule.every().day.at("07:00").do(clear_order_from_oddsjam)
    daily_jobs = (
        ("11:20", pull_data_from_oddsjam_update),
        ("14:00", upload_new_bets_data2oddsjam),
    )
    for run_at, job in daily_jobs:
        schedule.every().day.at(run_at).do(job)

    # Poll the scheduler forever.
    while True:
        schedule.run_pending()
        time.sleep(0.05)
    # main()