419 lines
16 KiB
Python
419 lines
16 KiB
Python
import http
|
||
import sys
|
||
import os
|
||
import re
|
||
import math
|
||
import time
|
||
import json
|
||
import datetime
|
||
import traceback
|
||
import pandas as pd
|
||
from retry import retry
|
||
from common.utils import ensure_directory_exists
|
||
|
||
from dao.Database import Database
|
||
from playwright.sync_api import sync_playwright, Page, BrowserContext, expect
|
||
from loguru import logger
|
||
from data_model import MysqlConfig, OddsjamOrderStatus
|
||
from dingtalk import DingTalkBot
|
||
|
||
webhook = "https://oapi.dingtalk.com/robot/send?access_token=21de667159edadd33172c6ec414a2addf9c6359189350ffd36819d2a20e8a0f4"
|
||
secret = "SEC43a0fa0b29717f98637a119b92a0bd5f7b2b6da671bdd2bd1279ed8323454d5e"
|
||
|
||
dingtalk = DingTalkBot(webhook, secret)
|
||
|
||
|
||
root_dir = os.path.dirname(os.path.abspath(__file__))
|
||
|
||
config_file_path = "./config/mysql_config.json"
|
||
mysql_config = MysqlConfig.parse_file(config_file_path)
|
||
dao = Database(mysql_config)
|
||
|
||
|
||
def query_by_create_time(ds: str):
|
||
sql = f"select * from oddsjam_order_all where DATE(create_time) = '{ds}'"
|
||
rows = dao.fetchall(query=sql)
|
||
if rows:
|
||
return [OddsjamOrderStatus(row) for row in rows]
|
||
else:
|
||
return []
|
||
|
||
@retry(tries=6)
|
||
def update_bet_status(id: str, bet_status: str):
|
||
sql = "update oddsjam_order_all set bet_status = %s where id = %s"
|
||
dao.execute(query=sql, args=(bet_status, id))
|
||
|
||
|
||
class SyncOddsjamBetTracker:
|
||
def __init__(
|
||
self,
|
||
login_state_path: str,
|
||
intercept_response_res_save_path: str,
|
||
headless: bool = False,
|
||
default_time_out: float = 6000000,
|
||
):
|
||
self.login_state_path = login_state_path
|
||
self.headless = headless
|
||
self.default_time_out = default_time_out
|
||
self.intercept_response_res_save_path = intercept_response_res_save_path
|
||
self.total_bet_cnt = 0
|
||
|
||
def login_oddsjam_cookies(self, p, headless=False) -> tuple:
|
||
# 获取HTTP_PROXY环境变量,默认为None
|
||
import os
|
||
|
||
http_proxy = os.environ.get("HTTP_PROXY", None)
|
||
if http_proxy is None:
|
||
http_proxy = "http://127.0.0.1:7890"
|
||
logger.info("加载cookies {}", self.login_state_path)
|
||
browser = p.chromium.launch(
|
||
args=["--start-maximized"],
|
||
headless=headless,
|
||
proxy={
|
||
"server": http_proxy,
|
||
},
|
||
)
|
||
context = browser.new_context(
|
||
storage_state=self.login_state_path, no_viewport=True
|
||
)
|
||
page = context.new_page()
|
||
return page, browser
|
||
|
||
def login_to_site(self):
|
||
logger.info(f"login account: {self.email_account}")
|
||
with sync_playwright() as p:
|
||
browser = p.chromium.launch(headless=self.headless)
|
||
page = browser.new_page()
|
||
page.set_default_timeout(timeout=self.default_time_out)
|
||
url = "https://oddsjam.com/bet-tracker"
|
||
page.goto(url)
|
||
|
||
# page.get_by_label("Close Modal").click()
|
||
|
||
page.get_by_role("link", name="Login").click()
|
||
expect(page.get_by_role("button", name="Sign in")).to_be_visible(
|
||
timeout=self.default_time_out
|
||
)
|
||
page.pause()
|
||
time.sleep(10)
|
||
try:
|
||
browser.contexts[0].storage_state(path=self.login_state_path)
|
||
except Exception as ex:
|
||
traceback.print_exc()
|
||
page.screenshot(path="error.png")
|
||
logger.info(page.url, "login_success")
|
||
browser.close()
|
||
|
||
@retry(tries=6)
|
||
def intercept_import_response(self, route, request):
|
||
if "oddsjam.com/api/backend/bets/import" in request.url:
|
||
response = route.fetch(timeout=50000)
|
||
logger.info(response.status)
|
||
if response.status == 500:
|
||
route.fulfill(response=response, json={})
|
||
return
|
||
route.continue_()
|
||
|
||
@retry(tries=6)
|
||
def upload_new_bets(self, bet_file_path: str):
|
||
url = "https://oddsjam.com/bet-tracker"
|
||
with sync_playwright() as p:
|
||
page: Page
|
||
browser: BrowserContext
|
||
page, browser = self.login_oddsjam_cookies(p, headless=self.headless)
|
||
page.set_default_timeout(timeout=self.default_time_out)
|
||
|
||
def on_response(response):
|
||
if "dromo-user-imports-production" in response.url:
|
||
print(response.status, response.request.failure)
|
||
if response.status == 400:
|
||
browser.close()
|
||
raise Exception(response.request.failure)
|
||
elif "oddsjam.com/api/backend/bets/import" in response.url:
|
||
logger.info(response.status)
|
||
|
||
page.on("response", on_response)
|
||
page.goto(url)
|
||
# page.pause()
|
||
page.get_by_role("button", name="Import Bets").nth(1).click()
|
||
time.sleep(5)
|
||
iframe_locator = page.frame_locator(
|
||
'iframe[title="Dromo Importer\\: Bets"]'
|
||
)
|
||
with page.expect_file_chooser() as fc_info:
|
||
iframe_locator.get_by_role(
|
||
"button", name="Choose a file", exact=True
|
||
).click()
|
||
time.sleep(5)
|
||
file_chooser = fc_info.value
|
||
file_chooser.set_files(bet_file_path)
|
||
|
||
iframe_locator.get_by_role(
|
||
"button", name="Confirm selection and continue"
|
||
).click()
|
||
time.sleep(5)
|
||
iframe_locator.get_by_role(
|
||
"button", name="Confirm matching and continue"
|
||
).click()
|
||
time.sleep(5)
|
||
iframe_locator.get_by_role("button", name="Continue").click()
|
||
iframe_locator.get_by_role("button", name="Finish").click()
|
||
try:
|
||
iframe_locator.get_by_role("button", name="Submit anyway").click(
|
||
timeout=5000
|
||
)
|
||
except Exception as ex:
|
||
...
|
||
# print(ex)
|
||
iframe_locator.get_by_role("button", name="Yes").click()
|
||
expect(iframe_locator.get_by_text("Processing...")).to_be_hidden(
|
||
timeout=self.default_time_out
|
||
)
|
||
# os.remove(bet_file_path)
|
||
# page.pause()
|
||
browser.close()
|
||
|
||
def get_all_bet_status(self, ds: str = None):
|
||
if not ds:
|
||
ds = datetime.datetime.now().strftime("%Y%m%d")
|
||
day = datetime.datetime.strptime(ds, "%Y%m%d").day
|
||
logger.info(f"current date: {ds}")
|
||
logger.info(f"current day: {day}")
|
||
url = "https://oddsjam.com/bet-tracker"
|
||
with sync_playwright() as p:
|
||
page: Page
|
||
browser: BrowserContext
|
||
page, browser = self.login_oddsjam_cookies(p, headless=self.headless)
|
||
|
||
def on_response(response):
|
||
if "oddsjam.com/api/backend/bets-and-parlays-V2/find" in response.url:
|
||
try:
|
||
# 确保响应已完成
|
||
if response.request.failure is None and response.status == 200:
|
||
data = response.json() # 注意:sync_api 中 .json() 是同步的
|
||
self.total_bet_cnt = data["totalCount"]
|
||
logger.info(f"total bet count: {self.total_bet_cnt}")
|
||
|
||
# 保存数据
|
||
with open(
|
||
self.intercept_response_res_save_path,
|
||
"a",
|
||
encoding="utf-8",
|
||
) as f:
|
||
json.dump(data, f, ensure_ascii=False)
|
||
f.write("\n")
|
||
except Exception as e:
|
||
logger.error(f"Error processing response: {e}")
|
||
|
||
# page.route("**", self.intercept_response)
|
||
page.on("response", on_response)
|
||
page.set_default_timeout(timeout=self.default_time_out)
|
||
page.goto(url=url)
|
||
try:
|
||
page.locator("#cello-widget-app").get_by_role("button").click(
|
||
timeout=6000
|
||
)
|
||
except:
|
||
...
|
||
|
||
# page.locator(".mt-4 > div > .inline-flex").first.click()
|
||
# time.sleep(5)
|
||
# page.get_by_label("Clear").click()
|
||
# time.sleep(5)
|
||
# page.get_by_role("button", name="Date Range").click()
|
||
# time.sleep(5)
|
||
# page.get_by_text("Custom", exact=True).click()
|
||
# time.sleep(5)
|
||
# page.get_by_role("button", name=f"{day}").click()
|
||
# time.sleep(5)
|
||
page.get_by_role(
|
||
"button", name=re.compile("Show (\d+) Results", re.IGNORECASE)
|
||
).click()
|
||
|
||
inner_text = page.get_by_text(
|
||
re.compile("Showing 1 to 50 of", re.IGNORECASE)
|
||
).inner_text()
|
||
match = re.search(r"Showing 1 to 50 of (\d+) results", inner_text)
|
||
total_bet_cnt = int(match[1])
|
||
total_page_no = math.ceil(total_bet_cnt / 50)
|
||
logger.info(f"total {total_bet_cnt} results, {total_page_no} pages")
|
||
page.pause()
|
||
for page_no in range(2, total_page_no):
|
||
expect(page.get_by_role("button", name="Next")).to_be_visible(
|
||
timeout=self.default_time_out
|
||
)
|
||
page.wait_for_timeout(timeout=3000)
|
||
page.get_by_role("button", name="Next").click()
|
||
logger.info(f"current page number: {page_no} / {total_page_no}")
|
||
|
||
page.get_by_role("button", name=f"{total_page_no}", exact=True).click()
|
||
expect(page.get_by_role("button", name="Next")).to_be_visible(
|
||
timeout=self.default_time_out
|
||
)
|
||
# page.pause()
|
||
browser.close()
|
||
|
||
|
||
def get_tomorrow_over_bet_order_data(save_file_path: str, ds):
|
||
logger.info(f"load bet data from db: {ds}")
|
||
data_list = query_by_create_time(ds=ds)
|
||
data_list = [d.to_dict() for d in data_list]
|
||
bet_df = pd.DataFrame(data_list)
|
||
bet_df.to_csv(save_file_path, encoding="utf-8-sig", index=False)
|
||
|
||
col_map = {
|
||
"sportsbooks": "Sportsbook",
|
||
"bet_name": "Bet Name",
|
||
"market": "Market Name",
|
||
"price": "Odds",
|
||
"stake": "Stake",
|
||
"event_name": "Event Name",
|
||
"sport": "Sport",
|
||
"league": "League",
|
||
"game_id": "Game ID",
|
||
"bet_type": "Bet Type",
|
||
"bet_id": "Notes",
|
||
"start_timestamp": "Game Start Date",
|
||
}
|
||
|
||
bet_df = bet_df.rename(columns=col_map)
|
||
bet_df["Sportsbook"] = bet_df["Sportsbook"].apply(lambda x: eval(x)[0])
|
||
with open("./data/bet_data/betname_map.json", "r") as f:
|
||
bet_name_map = json.load(f)
|
||
bet_df["Sportsbook"] = bet_df["Sportsbook"].apply(
|
||
lambda x: bet_name_map.get(x, x)
|
||
)
|
||
# bet_df = bet_df.explode("Sportsbook")
|
||
bet_df = bet_df[col_map.values()]
|
||
bet_df["Game Start Date"] = bet_df["Game Start Date"].apply(
|
||
lambda x: datetime.datetime.fromtimestamp(x // 1000).strftime("%Y/%m/%d %H:%M")
|
||
)
|
||
logger.info(f"bet order min start date: {bet_df['Game Start Date'].min()}")
|
||
logger.info(f"bet order max start date: {bet_df['Game Start Date'].max()}")
|
||
|
||
logger.info(bet_df.shape)
|
||
bet_df.to_csv(save_file_path, encoding="utf-8-sig", index=False)
|
||
info_message = f"bet order min start date: {bet_df['Game Start Date'].min()}\n"
|
||
info_message += f"bet order max start date: {bet_df['Game Start Date'].max()}\n"
|
||
info_message += f"bet order shape: {bet_df.shape}"
|
||
return info_message
|
||
|
||
|
||
def update_db_order_status(status_file_path: str):
|
||
logger.info(status_file_path)
|
||
data_list = []
|
||
with open(status_file_path, "r", encoding="utf-8") as f:
|
||
for line in f:
|
||
data = json.loads(line)
|
||
data_list.extend(data["entities"])
|
||
status_df = pd.DataFrame(data_list)
|
||
|
||
def get_error_status(row):
|
||
if not pd.isna(row["autograder_errors"]):
|
||
return f"error: {row['autograder_errors']}"
|
||
return row["status"]
|
||
|
||
status_df["status"] = status_df.apply(lambda row: get_error_status(row), axis=1)
|
||
status_df = status_df[status_df["status"] != "pending"]
|
||
status_df = status_df[["status", "notes"]].drop_duplicates()
|
||
status_list = status_df.to_dict(orient="records")
|
||
for i, data in enumerate(status_list):
|
||
bet_id = data["notes"]
|
||
bet_status = data["status"]
|
||
logger.info(f"{i+1}/{len(status_list)}, status->{bet_status}, bet_id->{bet_id}")
|
||
for _ in range(3):
|
||
try:
|
||
update_bet_status(id=bet_id, bet_status=bet_status)
|
||
break
|
||
except Exception as e:
|
||
logger.error(f"update bet status error: {e}")
|
||
|
||
|
||
def pull_data_from_oddsjam_update(ds: str = None):
|
||
try:
|
||
if ds is None:
|
||
ds = datetime.datetime.now().strftime("%Y%m%d")
|
||
logger.info(f"current date: {ds}")
|
||
ds_date = datetime.datetime.strptime(ds, "%Y%m%d")
|
||
pre_date = ds_date - datetime.timedelta(days=0)
|
||
pre_ds = pre_date.strftime("%Y%m%d")
|
||
|
||
oddsjam_bet_tracker = get_oddsjam_bet_tracker(ds=pre_ds)
|
||
# oddsjam_bet_tracker.login_to_site()
|
||
oddsjam_bet_tracker.get_all_bet_status(ds)
|
||
intercept_response_res_save_path = (
|
||
oddsjam_bet_tracker.intercept_response_res_save_path
|
||
)
|
||
update_db_order_status(status_file_path=intercept_response_res_save_path)
|
||
dingtalk.send_text(f"{ds}: 比赛状态更新完成")
|
||
except Exception as e:
|
||
error_info = traceback.print_exc()
|
||
logger.error(error_info)
|
||
dingtalk.send_text(error_info)
|
||
|
||
|
||
def upload_new_bets_data2oddsjam(ds: str = None):
|
||
try:
|
||
if ds is None:
|
||
ds = datetime.datetime.now() - datetime.timedelta(days=1)
|
||
ds = ds.strftime("%Y%m%d")
|
||
logger.info(f"current date: {ds}")
|
||
oddsjam_bet_tracker = get_oddsjam_bet_tracker(ds=ds)
|
||
|
||
# oddsjam_bet_tracker.login_to_site()
|
||
bet_file_path = os.path.join(root_dir, "data", "bet_data", "bet.csv")
|
||
ensure_directory_exists(target_path=bet_file_path, is_file=True)
|
||
data_info = get_tomorrow_over_bet_order_data(
|
||
save_file_path=bet_file_path, ds=ds
|
||
)
|
||
oddsjam_bet_tracker.upload_new_bets(bet_file_path=bet_file_path)
|
||
dingtalk.send_text(f"{ds}: \n {data_info}")
|
||
except Exception as e:
|
||
error_info = traceback.format_exc()
|
||
dingtalk.send_text(f"{ds}: 上传比赛失败: {e}\n{error_info}")
|
||
|
||
|
||
def get_oddsjam_bet_tracker(ds: str) -> SyncOddsjamBetTracker:
|
||
email_account = "aszer27937@gmail.com"
|
||
login_state_save_path = os.path.join(
|
||
root_dir, "data", "bet_data", "account_login_state", f"{email_account}.json"
|
||
)
|
||
ensure_directory_exists(target_path=login_state_save_path, is_file=True)
|
||
intercept_response_res_save_path = os.path.join(
|
||
root_dir,
|
||
"data",
|
||
"bet_data",
|
||
"bet_status",
|
||
f"{email_account}_status_{ds}.json",
|
||
)
|
||
ensure_directory_exists(target_path=intercept_response_res_save_path, is_file=True)
|
||
oddsjam_bet_tracker = SyncOddsjamBetTracker(
|
||
login_state_path=login_state_save_path,
|
||
intercept_response_res_save_path=intercept_response_res_save_path,
|
||
# headless=True
|
||
)
|
||
return oddsjam_bet_tracker
|
||
|
||
|
||
def main():
|
||
ds = datetime.datetime.now() - datetime.timedelta(days=1)
|
||
ds = ds.strftime("%Y%m%d")
|
||
logger.info(f"current date: {ds}")
|
||
|
||
pull_data_from_oddsjam_update(ds=ds)
|
||
# upload_new_bets_data2oddsjam(ds=ds)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
import schedule
|
||
|
||
# schedule.every().day.at("07:00").do(clear_order_from_oddsjam)
|
||
|
||
schedule.every().day.at("11:20").do(pull_data_from_oddsjam_update)
|
||
schedule.every().day.at("14:00").do(upload_new_bets_data2oddsjam)
|
||
|
||
while True:
|
||
schedule.run_pending()
|
||
time.sleep(0.05)
|
||
# main()
|