重命名
This commit is contained in:
@@ -1,423 +1,560 @@
|
||||
import http
|
||||
import sys
|
||||
"""
|
||||
OddsJam Bet Tracker - 重构版本
|
||||
按照Python最佳实践重构,支持环境变量配置,模块化设计
|
||||
|
||||
核心功能:
|
||||
1. 定时从数据库拉取数据上传到OddsJam
|
||||
2. 定时从OddsJam获取结果数据更新到数据库
|
||||
"""
|
||||
|
||||
from email import message
|
||||
import os
|
||||
import sys
|
||||
import re
|
||||
import math
|
||||
import time
|
||||
import json
|
||||
import datetime
|
||||
import traceback
|
||||
from typing import Optional, List, Dict, Tuple
|
||||
from dataclasses import dataclass
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
import pandas as pd
|
||||
from retry import retry
|
||||
from common.utils import ensure_directory_exists
|
||||
|
||||
from dao.Database import Database
|
||||
from playwright.sync_api import sync_playwright, Page, BrowserContext, expect
|
||||
from loguru import logger
|
||||
|
||||
# 导入项目模块
|
||||
from dao.Database import Database
|
||||
from data_model import MysqlConfig, OddsjamOrderStatus
|
||||
from common.dingtalk import DingTalkBot
|
||||
|
||||
webhook = "https://oapi.dingtalk.com/robot/send?access_token=21de667159edadd33172c6ec414a2addf9c6359189350ffd36819d2a20e8a0f4"
|
||||
secret = "SEC43a0fa0b29717f98637a119b92a0bd5f7b2b6da671bdd2bd1279ed8323454d5e"
|
||||
|
||||
dingtalk = DingTalkBot(webhook, secret)
|
||||
from common.utils import ensure_directory_exists
|
||||
|
||||
|
||||
root_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
@dataclass
|
||||
class AppConfig:
|
||||
"""应用配置类 - 支持环境变量配置"""
|
||||
|
||||
config_file_path = "./config/mysql_config.json"
|
||||
mysql_config = MysqlConfig.parse_file(config_file_path)
|
||||
dao = Database(mysql_config)
|
||||
# 邮箱配置
|
||||
email_account: str = os.getenv("ODDSJAM_EMAIL", "ojbbbb21@proton.me")
|
||||
|
||||
# 代理配置
|
||||
http_proxy: str = os.getenv("HTTP_PROXY", "http://127.0.0.1:7890")
|
||||
|
||||
# 数据库表名
|
||||
table_name: str = os.getenv("BET_TABLE_NAME", "oddsjam_order")
|
||||
|
||||
# 定时任务配置
|
||||
upload_schedule: str = os.getenv("UPLOAD_SCHEDULE", "09:48")
|
||||
pull_schedule: str = os.getenv("PULL_SCHEDULE", "13:00")
|
||||
|
||||
# 文件路径配置
|
||||
root_dir: str = os.path.dirname(os.path.abspath(__file__))
|
||||
config_file_path: str = os.getenv("MYSQL_CONFIG_PATH", "./config/mysql_config.json")
|
||||
|
||||
# 钉钉配置
|
||||
dingtalk_webhook: str = os.getenv(
|
||||
"DINGTALK_WEBHOOK",
|
||||
"https://oapi.dingtalk.com/robot/send?access_token=21de667159edadd33172c6ec414a2addf9c6359189350ffd36819d2a20e8a0f4",
|
||||
)
|
||||
dingtalk_secret: str = os.getenv(
|
||||
"DINGTALK_SECRET",
|
||||
"SEC43a0fa0b29717f98637a119b92a0bd5f7b2b6da671bdd2bd1279ed8323454d5e",
|
||||
)
|
||||
|
||||
# 浏览器配置
|
||||
headless: bool = os.getenv("HEADLESS", "true").lower() == "true"
|
||||
default_timeout: float = float(os.getenv("DEFAULT_TIMEOUT", "6000000"))
|
||||
|
||||
def __str__(self) -> str:
|
||||
# 返回邮箱和数据表名的信息
|
||||
return f"AppConfig(email_account={self.email_account}, table_name={self.table_name})"
|
||||
|
||||
def __post_init__(self):
|
||||
"""初始化后处理"""
|
||||
self.mysql_config = MysqlConfig.parse_file(self.config_file_path)
|
||||
self.dingtalk = DingTalkBot(self.dingtalk_webhook, self.dingtalk_secret)
|
||||
|
||||
# 构建文件路径
|
||||
self.login_state_path = os.path.join(
|
||||
self.root_dir,
|
||||
"data",
|
||||
"bet_data",
|
||||
"account_login_state",
|
||||
f"{self.email_account}.json",
|
||||
)
|
||||
self.bet_file_path = os.path.join(self.root_dir, "data", "bet_data", "bet.csv")
|
||||
self.bet_name_map_path = os.path.join(
|
||||
self.root_dir, "data", "bet_data", "betname_map.json"
|
||||
)
|
||||
|
||||
|
||||
def query_by_create_time(ds: str):
|
||||
sql = f"select * from oddsjam_order_all where DATE(create_time) = '{ds}'"
|
||||
rows = dao.fetchall(query=sql)
|
||||
if rows:
|
||||
return [OddsjamOrderStatus(row) for row in rows]
|
||||
else:
|
||||
return []
|
||||
class DatabaseService:
|
||||
"""数据库服务类 - 负责所有数据库操作"""
|
||||
|
||||
@retry(tries=6)
|
||||
def update_bet_status(id: str, bet_status: str):
|
||||
sql = "update oddsjam_order_all set bet_status = %s where id = %s"
|
||||
dao.execute(query=sql, args=(bet_status, id))
|
||||
def __init__(self, config: AppConfig):
|
||||
self.config = config
|
||||
self.dao = Database(config.mysql_config)
|
||||
|
||||
def query_orders_by_date(self, date_str: str) -> List[OddsjamOrderStatus]:
|
||||
"""根据日期查询订单数据"""
|
||||
sql = f"select * from {self.config.table_name} where DATE(create_time) = '{date_str}'"
|
||||
rows = self.dao.fetchall(query=sql)
|
||||
return [OddsjamOrderStatus(row) for row in rows] if rows else []
|
||||
|
||||
@retry(tries=6)
|
||||
def update_bet_status(self, bet_id: str, bet_status: str) -> None:
|
||||
"""更新投注状态"""
|
||||
sql = f"update {self.config.table_name} set bet_status = %s where id = %s"
|
||||
self.dao.execute(query=sql, args=(bet_status, bet_id))
|
||||
|
||||
|
||||
class SyncOddsjamBetTracker:
|
||||
def __init__(
|
||||
self,
|
||||
login_state_path: str,
|
||||
intercept_response_res_save_path: str,
|
||||
headless: bool = False,
|
||||
default_time_out: float = 6000000,
|
||||
):
|
||||
self.login_state_path = login_state_path
|
||||
self.headless = headless
|
||||
self.default_time_out = default_time_out
|
||||
self.intercept_response_res_save_path = intercept_response_res_save_path
|
||||
class OddsjamService:
|
||||
"""OddsJam业务服务类 - 负责与OddsJam网站的交互"""
|
||||
|
||||
def __init__(self, config: AppConfig):
|
||||
self.config = config
|
||||
self.total_bet_cnt = 0
|
||||
|
||||
def login_oddsjam_cookies(self, p, headless=False) -> tuple:
|
||||
# 获取HTTP_PROXY环境变量,默认为None
|
||||
import os
|
||||
def _get_browser_context(self, playwright) -> Tuple[Page, BrowserContext]:
|
||||
"""获取浏览器上下文"""
|
||||
logger.info(f"加载cookies: {self.config.login_state_path}")
|
||||
|
||||
http_proxy = os.environ.get("HTTP_PROXY", None)
|
||||
if http_proxy is None:
|
||||
http_proxy = "http://127.0.0.1:7890"
|
||||
logger.info("加载cookies {}", self.login_state_path)
|
||||
browser = p.chromium.launch(
|
||||
browser = playwright.chromium.launch(
|
||||
args=["--start-maximized"],
|
||||
headless=headless,
|
||||
proxy={
|
||||
"server": http_proxy,
|
||||
},
|
||||
headless=self.config.headless,
|
||||
proxy=(
|
||||
{"server": self.config.http_proxy} if self.config.http_proxy else None
|
||||
),
|
||||
)
|
||||
|
||||
context = browser.new_context(
|
||||
storage_state=self.login_state_path, no_viewport=True
|
||||
storage_state=self.config.login_state_path, no_viewport=True
|
||||
)
|
||||
page = context.new_page()
|
||||
return page, browser
|
||||
|
||||
def login_to_site(self):
|
||||
logger.info(f"login account: {self.email_account}")
|
||||
def login_to_site(self) -> None:
|
||||
"""登录到OddsJam网站"""
|
||||
logger.info(f"登录账户: {self.config.email_account}")
|
||||
|
||||
with sync_playwright() as p:
|
||||
browser = p.chromium.launch(headless=self.headless)
|
||||
browser = p.chromium.launch(headless=self.config.headless)
|
||||
page = browser.new_page()
|
||||
page.set_default_timeout(timeout=self.default_time_out)
|
||||
page.set_default_timeout(timeout=self.config.default_timeout)
|
||||
|
||||
url = "https://oddsjam.com/bet-tracker"
|
||||
page.goto(url)
|
||||
|
||||
# page.get_by_label("Close Modal").click()
|
||||
|
||||
page.get_by_role("link", name="Login").click()
|
||||
expect(page.get_by_role("button", name="Sign in")).to_be_visible(
|
||||
timeout=self.default_time_out
|
||||
timeout=self.config.default_timeout
|
||||
)
|
||||
|
||||
page.pause()
|
||||
time.sleep(10)
|
||||
|
||||
try:
|
||||
browser.contexts[0].storage_state(path=self.login_state_path)
|
||||
browser.contexts[0].storage_state(path=self.config.login_state_path)
|
||||
logger.info(f"登录成功: {page.url}")
|
||||
except Exception as ex:
|
||||
logger.error(f"保存登录状态失败: {ex}")
|
||||
traceback.print_exc()
|
||||
page.screenshot(path="error.png")
|
||||
logger.info(page.url, "login_success")
|
||||
|
||||
browser.close()
|
||||
|
||||
@retry(tries=6)
|
||||
def intercept_import_response(self, route, request):
|
||||
if "oddsjam.com/api/backend/bets/import" in request.url:
|
||||
response = route.fetch(timeout=50000)
|
||||
logger.info(response.status)
|
||||
if response.status == 500:
|
||||
route.fulfill(response=response, json={})
|
||||
return
|
||||
route.continue_()
|
||||
|
||||
@retry(tries=6)
|
||||
def upload_new_bets(self, bet_file_path: str):
|
||||
def upload_bets(self, bet_file_path: str) -> None:
|
||||
"""上传投注数据到OddsJam"""
|
||||
url = "https://oddsjam.com/bet-tracker"
|
||||
|
||||
with sync_playwright() as p:
|
||||
page: Page
|
||||
browser: BrowserContext
|
||||
page, browser = self.login_oddsjam_cookies(p, headless=self.headless)
|
||||
page.set_default_timeout(timeout=self.default_time_out)
|
||||
page, browser = self._get_browser_context(p)
|
||||
page.set_default_timeout(timeout=self.config.default_timeout)
|
||||
|
||||
def on_response(response):
|
||||
if "dromo-user-imports-production" in response.url:
|
||||
print(response.status, response.request.failure)
|
||||
logger.info(f"上传响应状态: {response.status}")
|
||||
if response.status == 400:
|
||||
browser.close()
|
||||
raise Exception(response.request.failure)
|
||||
raise Exception(f"上传失败: {response.request.failure}")
|
||||
elif "oddsjam.com/api/backend/bets/import" in response.url:
|
||||
logger.info(response.status)
|
||||
logger.info(f"导入响应状态: {response.status}")
|
||||
|
||||
page.on("response", on_response)
|
||||
page.goto(url)
|
||||
# page.pause()
|
||||
page.get_by_role("button", name="Import Bets").nth(1).click()
|
||||
time.sleep(5)
|
||||
iframe_locator = page.frame_locator(
|
||||
'iframe[title="Dromo Importer\\: Bets"]'
|
||||
)
|
||||
with page.expect_file_chooser() as fc_info:
|
||||
iframe_locator.get_by_role(
|
||||
"button", name="Choose a file", exact=True
|
||||
).click()
|
||||
time.sleep(5)
|
||||
file_chooser = fc_info.value
|
||||
file_chooser.set_files(bet_file_path)
|
||||
|
||||
iframe_locator.get_by_role(
|
||||
"button", name="Confirm selection and continue"
|
||||
).click()
|
||||
time.sleep(5)
|
||||
iframe_locator.get_by_role(
|
||||
"button", name="Confirm matching and continue"
|
||||
).click()
|
||||
time.sleep(5)
|
||||
iframe_locator.get_by_role("button", name="Continue").click()
|
||||
iframe_locator.get_by_role("button", name="Finish").click()
|
||||
try:
|
||||
iframe_locator.get_by_role("button", name="Submit anyway").click(
|
||||
timeout=5000
|
||||
)
|
||||
except Exception as ex:
|
||||
...
|
||||
# print(ex)
|
||||
iframe_locator.get_by_role("button", name="Yes").click()
|
||||
expect(iframe_locator.get_by_text("Processing...")).to_be_hidden(
|
||||
timeout=self.default_time_out
|
||||
)
|
||||
# os.remove(bet_file_path)
|
||||
# page.pause()
|
||||
# 执行上传流程
|
||||
self._execute_upload_flow(page, bet_file_path)
|
||||
|
||||
browser.close()
|
||||
|
||||
def get_all_bet_status(self, ds: str = None):
|
||||
if not ds:
|
||||
ds = datetime.datetime.now().strftime("%Y%m%d")
|
||||
day = datetime.datetime.strptime(ds, "%Y%m%d").day
|
||||
logger.info(f"current date: {ds}")
|
||||
logger.info(f"current day: {day}")
|
||||
def _execute_upload_flow(self, page: Page, bet_file_path: str) -> None:
|
||||
"""执行上传流程"""
|
||||
page.get_by_role("button", name="Import Bets").nth(1).click()
|
||||
time.sleep(5)
|
||||
|
||||
iframe_locator = page.frame_locator('iframe[title="Dromo Importer\\: Bets"]')
|
||||
|
||||
# 选择文件
|
||||
with page.expect_file_chooser() as fc_info:
|
||||
iframe_locator.get_by_role(
|
||||
"button", name="Choose a file", exact=True
|
||||
).click()
|
||||
time.sleep(5)
|
||||
|
||||
file_chooser = fc_info.value
|
||||
file_chooser.set_files(bet_file_path)
|
||||
|
||||
# 确认选择
|
||||
iframe_locator.get_by_role(
|
||||
"button", name="Confirm selection and continue"
|
||||
).click()
|
||||
time.sleep(5)
|
||||
|
||||
# 确认匹配
|
||||
iframe_locator.get_by_role(
|
||||
"button", name="Confirm matching and continue"
|
||||
).click()
|
||||
time.sleep(5)
|
||||
|
||||
# 继续
|
||||
iframe_locator.get_by_role("button", name="Continue").click()
|
||||
|
||||
# 完成
|
||||
iframe_locator.get_by_role("button", name="Finish").click()
|
||||
|
||||
# 处理可能的错误
|
||||
try:
|
||||
iframe_locator.get_by_role("button", name="Submit anyway").click(
|
||||
timeout=5000
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
iframe_locator.get_by_role("button", name="Yes").click()
|
||||
|
||||
# 等待处理完成
|
||||
expect(iframe_locator.get_by_text("Processing...")).to_be_hidden(
|
||||
timeout=self.config.default_timeout
|
||||
)
|
||||
|
||||
def get_all_bet_status(self, date_str: str, status_save_path: str) -> None:
|
||||
"""获取所有投注状态"""
|
||||
logger.info(f"获取投注状态,日期: {date_str}")
|
||||
|
||||
url = "https://oddsjam.com/bet-tracker"
|
||||
|
||||
with sync_playwright() as p:
|
||||
page: Page
|
||||
browser: BrowserContext
|
||||
page, browser = self.login_oddsjam_cookies(p, headless=self.headless)
|
||||
page, browser = self._get_browser_context(p)
|
||||
|
||||
def on_response(response):
|
||||
if "oddsjam.com/api/backend/bets-and-parlays-V2/find" in response.url:
|
||||
try:
|
||||
# 确保响应已完成
|
||||
if response.request.failure is None and response.status == 200:
|
||||
data = response.json() # 注意: sync_api 中 .json() 是同步的
|
||||
data = response.json()
|
||||
self.total_bet_cnt = data["totalCount"]
|
||||
logger.info(f"total bet count: {self.total_bet_cnt}")
|
||||
logger.info(f"总投注数量: {self.total_bet_cnt}")
|
||||
|
||||
# 保存数据
|
||||
with open(
|
||||
self.intercept_response_res_save_path,
|
||||
"a",
|
||||
encoding="utf-8",
|
||||
) as f:
|
||||
# 保存响应数据
|
||||
with open(status_save_path, "a", encoding="utf-8") as f:
|
||||
json.dump(data, f, ensure_ascii=False)
|
||||
f.write("\n")
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing response: {e}")
|
||||
logger.error(f"处理响应数据失败: {e}")
|
||||
|
||||
# page.route("**", self.intercept_response)
|
||||
page.on("response", on_response)
|
||||
page.set_default_timeout(timeout=self.default_time_out)
|
||||
page.set_default_timeout(timeout=self.config.default_timeout)
|
||||
page.goto(url=url)
|
||||
|
||||
# 处理弹窗
|
||||
try:
|
||||
page.locator("#cello-widget-app").get_by_role("button").click(
|
||||
timeout=6000
|
||||
)
|
||||
except:
|
||||
...
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# page.locator(".mt-4 > div > .inline-flex").first.click()
|
||||
# time.sleep(5)
|
||||
# page.get_by_label("Clear").click()
|
||||
# time.sleep(5)
|
||||
# page.get_by_role("button", name="Date Range").click()
|
||||
# time.sleep(5)
|
||||
# page.get_by_text("Custom", exact=True).click()
|
||||
# time.sleep(5)
|
||||
# page.get_by_role("button", name=f"{day}").click()
|
||||
# time.sleep(5)
|
||||
# page.get_by_role(
|
||||
# "button", name=re.compile(r"Show (\d+) Results", re.IGNORECASE)
|
||||
# ).click()
|
||||
# 获取总页数并翻页
|
||||
self._navigate_all_pages(page)
|
||||
|
||||
inner_text = page.get_by_text(
|
||||
re.compile("Showing 1 to 50 of", re.IGNORECASE)
|
||||
).inner_text()
|
||||
match = re.search(r"Showing 1 to 50 of (\d+) results", inner_text)
|
||||
total_bet_cnt = int(match[1])
|
||||
total_page_no = math.ceil(total_bet_cnt / 50)
|
||||
logger.info(f"total {total_bet_cnt} results, {total_page_no} pages")
|
||||
page.pause()
|
||||
for page_no in range(2, total_page_no):
|
||||
browser.close()
|
||||
|
||||
def _navigate_all_pages(self, page: Page) -> None:
|
||||
"""导航所有页面"""
|
||||
inner_text = page.get_by_text(
|
||||
re.compile("Showing 1 to 50 of", re.IGNORECASE)
|
||||
).inner_text()
|
||||
|
||||
match = re.search(r"Showing 1 to 50 of (\d+) results", inner_text)
|
||||
if not match:
|
||||
raise ValueError("无法获取总结果数")
|
||||
|
||||
total_bet_cnt = int(match[1])
|
||||
total_page_no = math.ceil(total_bet_cnt / 50)
|
||||
logger.info(f"总共 {total_bet_cnt} 条结果,{total_page_no} 页")
|
||||
|
||||
# 翻页到最后一页
|
||||
for page_no in range(2, total_page_no):
|
||||
if page_no <= total_page_no:
|
||||
expect(page.get_by_role("button", name="Next")).to_be_visible(
|
||||
timeout=self.default_time_out
|
||||
timeout=self.config.default_timeout
|
||||
)
|
||||
page.wait_for_timeout(timeout=3000)
|
||||
page.get_by_role("button", name="Next").click()
|
||||
logger.info(f"current page number: {page_no} / {total_page_no}")
|
||||
logger.info(f"当前页码: {page_no} / {total_page_no}")
|
||||
|
||||
page.get_by_role("button", name=f"{total_page_no}", exact=True).click()
|
||||
expect(page.get_by_role("button", name="Next")).to_be_visible(
|
||||
timeout=self.default_time_out
|
||||
|
||||
class BetDataProcessor:
|
||||
"""投注数据处理服务类"""
|
||||
|
||||
def __init__(self, config: AppConfig):
|
||||
self.config = config
|
||||
|
||||
def prepare_bet_data_for_upload(self, date_str: str) -> str:
|
||||
"""准备上传的投注数据"""
|
||||
logger.info(f"准备投注数据,日期: {date_str}")
|
||||
|
||||
# 确保目录存在
|
||||
ensure_directory_exists(target_path=self.config.bet_file_path, is_file=True)
|
||||
|
||||
# 从数据库获取数据
|
||||
db_service = DatabaseService(self.config)
|
||||
data_list = db_service.query_orders_by_date(date_str)
|
||||
|
||||
if not data_list:
|
||||
message = f"日期 {date_str} 没有找到投注数据"
|
||||
logger.warning(message)
|
||||
raise ValueError(message)
|
||||
|
||||
# 转换为DataFrame
|
||||
bet_df = pd.DataFrame([d.to_dict() for d in data_list])
|
||||
|
||||
# 列名映射
|
||||
col_map = {
|
||||
"sportsbooks": "Sportsbook",
|
||||
"bet_name": "Bet Name",
|
||||
"market": "Market Name",
|
||||
"price": "Odds",
|
||||
"stake": "Stake",
|
||||
"event_name": "Event Name",
|
||||
"sport": "Sport",
|
||||
"league": "League",
|
||||
"game_id": "Game ID",
|
||||
"bet_type": "Bet Type",
|
||||
"bet_id": "Notes",
|
||||
"start_timestamp": "Game Start Date",
|
||||
}
|
||||
|
||||
bet_df = bet_df.rename(columns=col_map)
|
||||
|
||||
# 处理Sportsbook字段
|
||||
bet_df["Sportsbook"] = bet_df["Sportsbook"].apply(lambda x: eval(x)[0])
|
||||
|
||||
# 加载博彩公司名称映射
|
||||
with open(self.config.bet_name_map_path, "r") as f:
|
||||
bet_name_map = json.load(f)
|
||||
bet_df["Sportsbook"] = bet_df["Sportsbook"].apply(
|
||||
lambda x: bet_name_map.get(x, x)
|
||||
)
|
||||
# page.pause()
|
||||
browser.close()
|
||||
|
||||
# 选择需要的列
|
||||
bet_df = bet_df[col_map.values()]
|
||||
|
||||
def get_tomorrow_over_bet_order_data(save_file_path: str, ds):
|
||||
logger.info(f"load bet data from db: {ds}")
|
||||
data_list = query_by_create_time(ds=ds)
|
||||
data_list = [d.to_dict() for d in data_list]
|
||||
bet_df = pd.DataFrame(data_list)
|
||||
bet_df.to_csv(save_file_path, encoding="utf-8-sig", index=False)
|
||||
|
||||
col_map = {
|
||||
"sportsbooks": "Sportsbook",
|
||||
"bet_name": "Bet Name",
|
||||
"market": "Market Name",
|
||||
"price": "Odds",
|
||||
"stake": "Stake",
|
||||
"event_name": "Event Name",
|
||||
"sport": "Sport",
|
||||
"league": "League",
|
||||
"game_id": "Game ID",
|
||||
"bet_type": "Bet Type",
|
||||
"bet_id": "Notes",
|
||||
"start_timestamp": "Game Start Date",
|
||||
}
|
||||
|
||||
bet_df = bet_df.rename(columns=col_map)
|
||||
bet_df["Sportsbook"] = bet_df["Sportsbook"].apply(lambda x: eval(x)[0])
|
||||
with open("./data/bet_data/betname_map.json", "r") as f:
|
||||
bet_name_map = json.load(f)
|
||||
bet_df["Sportsbook"] = bet_df["Sportsbook"].apply(
|
||||
lambda x: bet_name_map.get(x, x)
|
||||
# 格式化时间
|
||||
bet_df["Game Start Date"] = bet_df["Game Start Date"].apply(
|
||||
lambda x: datetime.datetime.fromtimestamp(x // 1000).strftime(
|
||||
"%Y/%m/%d %H:%M"
|
||||
)
|
||||
)
|
||||
# bet_df = bet_df.explode("Sportsbook")
|
||||
bet_df = bet_df[col_map.values()]
|
||||
bet_df["Game Start Date"] = bet_df["Game Start Date"].apply(
|
||||
lambda x: datetime.datetime.fromtimestamp(x // 1000).strftime("%Y/%m/%d %H:%M")
|
||||
)
|
||||
logger.info(f"bet order min start date: {bet_df['Game Start Date'].min()}")
|
||||
logger.info(f"bet order max start date: {bet_df['Game Start Date'].max()}")
|
||||
|
||||
logger.info(bet_df.shape)
|
||||
bet_df.to_csv(save_file_path, encoding="utf-8-sig", index=False)
|
||||
info_message = f"bet order min start date: {bet_df['Game Start Date'].min()}\n"
|
||||
info_message += f"bet order max start date: {bet_df['Game Start Date'].max()}\n"
|
||||
info_message += f"bet order shape: {bet_df.shape}"
|
||||
return info_message
|
||||
# 保存到文件
|
||||
bet_df.to_csv(self.config.bet_file_path, encoding="utf-8-sig", index=False)
|
||||
|
||||
logger.info(f"投注数据最小开始时间: {bet_df['Game Start Date'].min()}")
|
||||
logger.info(f"投注数据最大开始时间: {bet_df['Game Start Date'].max()}")
|
||||
logger.info(f"投注数据形状: {bet_df.shape}")
|
||||
|
||||
def update_db_order_status(status_file_path: str):
|
||||
logger.info(status_file_path)
|
||||
data_list = []
|
||||
with open(status_file_path, "r", encoding="utf-8") as f:
|
||||
for line in f:
|
||||
data = json.loads(line)
|
||||
data_list.extend(data["entities"])
|
||||
status_df = pd.DataFrame(data_list)
|
||||
|
||||
def get_error_status(row):
|
||||
if not pd.isna(row["autograder_errors"]):
|
||||
return f"error: {row['autograder_errors']}"
|
||||
return row["status"]
|
||||
|
||||
status_df["status"] = status_df.apply(lambda row: get_error_status(row), axis=1)
|
||||
status_df = status_df[status_df["status"] != "pending"]
|
||||
status_df = status_df[["status", "notes"]].drop_duplicates()
|
||||
status_list = status_df.to_dict(orient="records")
|
||||
for i, data in enumerate(status_list):
|
||||
bet_id = data["notes"]
|
||||
bet_status = data["status"]
|
||||
logger.info(f"{i+1}/{len(status_list)}, status->{bet_status}, bet_id->{bet_id}")
|
||||
for _ in range(3):
|
||||
try:
|
||||
update_bet_status(id=bet_id, bet_status=bet_status)
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"update bet status error: {e}")
|
||||
|
||||
|
||||
def pull_data_from_oddsjam_update(ds: str = None):
|
||||
try:
|
||||
if ds is None:
|
||||
ds = datetime.datetime.now().strftime("%Y%m%d")
|
||||
logger.info(f"current date: {ds}")
|
||||
ds_date = datetime.datetime.strptime(ds, "%Y%m%d")
|
||||
pre_date = ds_date - datetime.timedelta(days=0)
|
||||
pre_ds = pre_date.strftime("%Y%m%d")
|
||||
|
||||
oddsjam_bet_tracker = get_oddsjam_bet_tracker(ds=pre_ds)
|
||||
# oddsjam_bet_tracker.login_to_site()
|
||||
oddsjam_bet_tracker.get_all_bet_status(ds)
|
||||
intercept_response_res_save_path = (
|
||||
oddsjam_bet_tracker.intercept_response_res_save_path
|
||||
return (
|
||||
f"投注数据最小开始时间: {bet_df['Game Start Date'].min()}\n"
|
||||
f"投注数据最大开始时间: {bet_df['Game Start Date'].max()}\n"
|
||||
f"投注数据形状: {bet_df.shape}"
|
||||
)
|
||||
update_db_order_status(status_file_path=intercept_response_res_save_path)
|
||||
dingtalk.send_text(f"{ds}: 比赛状态更新完成")
|
||||
except Exception as e:
|
||||
error_info = traceback.format_exc()
|
||||
logger.error(error_info)
|
||||
dingtalk.send_text(error_info)
|
||||
|
||||
def process_status_data(self, status_file_path: str) -> None:
|
||||
"""处理状态数据并更新数据库"""
|
||||
logger.info(f"处理状态数据: {status_file_path}")
|
||||
|
||||
data_list = []
|
||||
with open(status_file_path, "r", encoding="utf-8") as f:
|
||||
for line in f:
|
||||
data = json.loads(line)
|
||||
data_list.extend(data["entities"])
|
||||
|
||||
status_df = pd.DataFrame(data_list)
|
||||
|
||||
# 处理错误状态
|
||||
def get_error_status(row):
|
||||
if not pd.isna(row["autograder_errors"]):
|
||||
return f"error: {row['autograder_errors']}"
|
||||
return row["status"]
|
||||
|
||||
status_df["status"] = status_df.apply(lambda row: get_error_status(row), axis=1)
|
||||
status_df = status_df[status_df["status"] != "pending"]
|
||||
status_df = status_df[["status", "notes"]].drop_duplicates()
|
||||
|
||||
# 更新数据库
|
||||
db_service = DatabaseService(self.config)
|
||||
status_list = status_df.to_dict(orient="records")
|
||||
|
||||
for i, data in enumerate(status_list):
|
||||
bet_id = data["notes"]
|
||||
bet_status = data["status"]
|
||||
logger.info(
|
||||
f"{i+1}/{len(status_list)}, 状态->{bet_status}, 投注ID->{bet_id}"
|
||||
)
|
||||
|
||||
for _ in range(3):
|
||||
try:
|
||||
db_service.update_bet_status(bet_id=bet_id, bet_status=bet_status)
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"更新投注状态失败: {e}")
|
||||
|
||||
|
||||
def upload_new_bets_data2oddsjam(ds: str = None):
|
||||
try:
|
||||
if ds is None:
|
||||
ds = datetime.datetime.now() - datetime.timedelta(days=1)
|
||||
ds = ds.strftime("%Y%m%d")
|
||||
logger.info(f"current date: {ds}")
|
||||
oddsjam_bet_tracker = get_oddsjam_bet_tracker(ds=ds)
|
||||
class TaskScheduler:
|
||||
"""定时任务调度器"""
|
||||
|
||||
# oddsjam_bet_tracker.login_to_site()
|
||||
bet_file_path = os.path.join(root_dir, "data", "bet_data", "bet.csv")
|
||||
ensure_directory_exists(target_path=bet_file_path, is_file=True)
|
||||
data_info = get_tomorrow_over_bet_order_data(
|
||||
save_file_path=bet_file_path, ds=ds
|
||||
def __init__(self, config: AppConfig):
|
||||
self.config = config
|
||||
self.oddsjam_service = OddsjamService(config)
|
||||
self.data_processor = BetDataProcessor(config)
|
||||
|
||||
def upload_bets_task(self, date_str: Optional[str] = None) -> None:
|
||||
"""上传 T-1 时间的投注数据任务"""
|
||||
config_base_info = str(self.config)
|
||||
try:
|
||||
if date_str is None:
|
||||
date_str = (
|
||||
datetime.datetime.now() - datetime.timedelta(days=1)
|
||||
).strftime("%Y%m%d")
|
||||
|
||||
logger.info(f"执行上传任务,日期: {date_str}")
|
||||
|
||||
# 准备数据
|
||||
data_info = self.data_processor.prepare_bet_data_for_upload(date_str)
|
||||
|
||||
if not data_info:
|
||||
logger.warning(f"日期 {date_str} 没有数据需要上传")
|
||||
return
|
||||
|
||||
# 上传到OddsJam
|
||||
self.oddsjam_service.upload_bets(self.config.bet_file_path)
|
||||
|
||||
# 发送通知
|
||||
self.config.dingtalk.send_text(
|
||||
f"{date_str}:比赛数据上传完成\n{config_base_info}\n{data_info}"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
error_info = traceback.format_exc()
|
||||
logger.error(f"上传任务失败: {error_info}")
|
||||
self.config.dingtalk.send_text(
|
||||
f"{date_str}:\n{config_base_info}\n上传比赛失败: {e}\n{error_info}"
|
||||
)
|
||||
|
||||
def pull_status_task(self, date_str: Optional[str] = None) -> None:
|
||||
"""拉取状态数据任务"""
|
||||
config_base_info = str(self.config)
|
||||
try:
|
||||
if date_str is None:
|
||||
date_str = datetime.datetime.now().strftime("%Y%m%d")
|
||||
|
||||
logger.info(f"执行拉取任务,日期: {date_str}")
|
||||
|
||||
# 构建状态文件路径
|
||||
status_file_path = os.path.join(
|
||||
self.config.root_dir,
|
||||
"data",
|
||||
"bet_data",
|
||||
"bet_status",
|
||||
f"{self.config.email_account}_status_{date_str}.json",
|
||||
)
|
||||
ensure_directory_exists(target_path=status_file_path, is_file=True)
|
||||
|
||||
# 获取状态数据
|
||||
self.oddsjam_service.get_all_bet_status(date_str, status_file_path)
|
||||
|
||||
# 处理状态数据
|
||||
self.data_processor.process_status_data(status_file_path)
|
||||
|
||||
# 发送通知
|
||||
self.config.dingtalk.send_text(
|
||||
f"{date_str}: 比赛状态更新完成\n{config_base_info}"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
error_info = traceback.format_exc()
|
||||
logger.error(f"拉取任务失败: {error_info}")
|
||||
self.config.dingtalk.send_text(f"{config_base_info}\n{error_info}")
|
||||
|
||||
|
||||
class BetTrackerApplication:
|
||||
"""主应用类"""
|
||||
|
||||
def __init__(self):
|
||||
self.config = AppConfig()
|
||||
self.scheduler = TaskScheduler(self.config)
|
||||
|
||||
def run_scheduled_tasks(self) -> None:
|
||||
"""运行定时任务"""
|
||||
import schedule
|
||||
|
||||
logger.info(f"上传定时任务: {self.config.upload_schedule}")
|
||||
logger.info(f"拉取定时任务: {self.config.pull_schedule}")
|
||||
|
||||
# 设置定时任务
|
||||
schedule.every().day.at(self.config.pull_schedule).do(
|
||||
self.scheduler.pull_status_task
|
||||
)
|
||||
schedule.every().day.at(self.config.upload_schedule).do(
|
||||
self.scheduler.upload_bets_task
|
||||
)
|
||||
oddsjam_bet_tracker.upload_new_bets(bet_file_path=bet_file_path)
|
||||
dingtalk.send_text(f"{ds}: \n {data_info}")
|
||||
except Exception as e:
|
||||
error_info = traceback.format_exc()
|
||||
dingtalk.send_text(f"{ds}: 上传比赛失败: {e}\n{error_info}")
|
||||
|
||||
# 运行定时任务
|
||||
while True:
|
||||
schedule.run_pending()
|
||||
time.sleep(0.05)
|
||||
|
||||
def get_oddsjam_bet_tracker(ds: str) -> SyncOddsjamBetTracker:
|
||||
email_account = "aszer27937@gmail.com"
|
||||
login_state_save_path = os.path.join(
|
||||
root_dir, "data", "bet_data", "account_login_state", f"{email_account}.json"
|
||||
)
|
||||
ensure_directory_exists(target_path=login_state_save_path, is_file=True)
|
||||
intercept_response_res_save_path = os.path.join(
|
||||
root_dir,
|
||||
"data",
|
||||
"bet_data",
|
||||
"bet_status",
|
||||
f"{email_account}_status_{ds}.json",
|
||||
)
|
||||
ensure_directory_exists(target_path=intercept_response_res_save_path, is_file=True)
|
||||
oddsjam_bet_tracker = SyncOddsjamBetTracker(
|
||||
login_state_path=login_state_save_path,
|
||||
intercept_response_res_save_path=intercept_response_res_save_path,
|
||||
headless=True
|
||||
)
|
||||
return oddsjam_bet_tracker
|
||||
def run_manual_task(self, task_type: str, date_str: Optional[str] = None) -> None:
|
||||
"""运行手动任务"""
|
||||
if task_type == "upload":
|
||||
self.scheduler.upload_bets_task(date_str)
|
||||
elif task_type == "pull":
|
||||
self.scheduler.pull_status_task(date_str)
|
||||
else:
|
||||
logger.error(f"未知的任务类型: {task_type}")
|
||||
|
||||
|
||||
def main():
|
||||
ds = datetime.datetime.now() - datetime.timedelta(days=1)
|
||||
ds = ds.strftime("%Y%m%d")
|
||||
logger.info(f"current date: {ds}")
|
||||
"""主入口函数"""
|
||||
app = BetTrackerApplication()
|
||||
|
||||
pull_data_from_oddsjam_update(ds=ds)
|
||||
# upload_new_bets_data2oddsjam(ds=ds)
|
||||
# 检查命令行参数
|
||||
if len(sys.argv) > 1:
|
||||
task_type = sys.argv[1]
|
||||
date_str = sys.argv[2] if len(sys.argv) > 2 else None
|
||||
app.run_manual_task(task_type, date_str)
|
||||
else:
|
||||
# 运行定时任务
|
||||
app.run_scheduled_tasks()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import schedule
|
||||
main()
|
||||
|
||||
# schedule.every().day.at("07:00").do(clear_order_from_oddsjam)
|
||||
UPLOAD_SCHEDULE = os.environ.get("UPLOAD_SCHEDULE", "10:00")
|
||||
PULL_SCHEDULE = os.environ.get("PULL_SCHEDULE", "13:00")
|
||||
logger.info(f"upload schedule: {UPLOAD_SCHEDULE}")
|
||||
logger.info(f"pull schedule: {PULL_SCHEDULE}")
|
||||
|
||||
|
||||
schedule.every().day.at(PULL_SCHEDULE).do(pull_data_from_oddsjam_update)
|
||||
schedule.every().day.at(UPLOAD_SCHEDULE).do(upload_new_bets_data2oddsjam)
|
||||
|
||||
while True:
|
||||
schedule.run_pending()
|
||||
time.sleep(0.05)
|
||||
# main()
|
||||
# app = BetTrackerApplication()
|
||||
# logger.info(app.config)
|
||||
# app.run_manual_task(task_type="upload", date_str="20251026")
|
||||
|
||||
@@ -1,560 +0,0 @@
|
||||
"""
|
||||
OddsJam Bet Tracker - 重构版本
|
||||
按照Python最佳实践重构,支持环境变量配置,模块化设计
|
||||
|
||||
核心功能:
|
||||
1. 定时从数据库拉取数据上传到OddsJam
|
||||
2. 定时从OddsJam获取结果数据更新到数据库
|
||||
"""
|
||||
|
||||
from email import message
|
||||
import os
|
||||
import sys
|
||||
import re
|
||||
import math
|
||||
import time
|
||||
import json
|
||||
import datetime
|
||||
import traceback
|
||||
from typing import Optional, List, Dict, Tuple
|
||||
from dataclasses import dataclass
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
import pandas as pd
|
||||
from retry import retry
|
||||
from playwright.sync_api import sync_playwright, Page, BrowserContext, expect
|
||||
from loguru import logger
|
||||
|
||||
# 导入项目模块
|
||||
from dao.Database import Database
|
||||
from data_model import MysqlConfig, OddsjamOrderStatus
|
||||
from common.dingtalk import DingTalkBot
|
||||
from common.utils import ensure_directory_exists
|
||||
|
||||
|
||||
@dataclass
|
||||
class AppConfig:
|
||||
"""应用配置类 - 支持环境变量配置"""
|
||||
|
||||
# 邮箱配置
|
||||
email_account: str = os.getenv("ODDSJAM_EMAIL", "ojbbbb21@proton.me")
|
||||
|
||||
# 代理配置
|
||||
http_proxy: str = os.getenv("HTTP_PROXY", "http://127.0.0.1:7890")
|
||||
|
||||
# 数据库表名
|
||||
table_name: str = os.getenv("BET_TABLE_NAME", "oddsjam_order")
|
||||
|
||||
# 定时任务配置
|
||||
upload_schedule: str = os.getenv("UPLOAD_SCHEDULE", "09:48")
|
||||
pull_schedule: str = os.getenv("PULL_SCHEDULE", "13:00")
|
||||
|
||||
# 文件路径配置
|
||||
root_dir: str = os.path.dirname(os.path.abspath(__file__))
|
||||
config_file_path: str = os.getenv("MYSQL_CONFIG_PATH", "./config/mysql_config.json")
|
||||
|
||||
# 钉钉配置
|
||||
dingtalk_webhook: str = os.getenv(
|
||||
"DINGTALK_WEBHOOK",
|
||||
"https://oapi.dingtalk.com/robot/send?access_token=21de667159edadd33172c6ec414a2addf9c6359189350ffd36819d2a20e8a0f4",
|
||||
)
|
||||
dingtalk_secret: str = os.getenv(
|
||||
"DINGTALK_SECRET",
|
||||
"SEC43a0fa0b29717f98637a119b92a0bd5f7b2b6da671bdd2bd1279ed8323454d5e",
|
||||
)
|
||||
|
||||
# 浏览器配置
|
||||
headless: bool = os.getenv("HEADLESS", "true").lower() == "true"
|
||||
default_timeout: float = float(os.getenv("DEFAULT_TIMEOUT", "6000000"))
|
||||
|
||||
def __str__(self) -> str:
|
||||
# 返回邮箱和数据表名的信息
|
||||
return f"AppConfig(email_account={self.email_account}, table_name={self.table_name})"
|
||||
|
||||
def __post_init__(self):
|
||||
"""初始化后处理"""
|
||||
self.mysql_config = MysqlConfig.parse_file(self.config_file_path)
|
||||
self.dingtalk = DingTalkBot(self.dingtalk_webhook, self.dingtalk_secret)
|
||||
|
||||
# 构建文件路径
|
||||
self.login_state_path = os.path.join(
|
||||
self.root_dir,
|
||||
"data",
|
||||
"bet_data",
|
||||
"account_login_state",
|
||||
f"{self.email_account}.json",
|
||||
)
|
||||
self.bet_file_path = os.path.join(self.root_dir, "data", "bet_data", "bet.csv")
|
||||
self.bet_name_map_path = os.path.join(
|
||||
self.root_dir, "data", "bet_data", "betname_map.json"
|
||||
)
|
||||
|
||||
|
||||
class DatabaseService:
|
||||
"""数据库服务类 - 负责所有数据库操作"""
|
||||
|
||||
def __init__(self, config: AppConfig):
|
||||
self.config = config
|
||||
self.dao = Database(config.mysql_config)
|
||||
|
||||
def query_orders_by_date(self, date_str: str) -> List[OddsjamOrderStatus]:
|
||||
"""根据日期查询订单数据"""
|
||||
sql = f"select * from {self.config.table_name} where DATE(create_time) = '{date_str}'"
|
||||
rows = self.dao.fetchall(query=sql)
|
||||
return [OddsjamOrderStatus(row) for row in rows] if rows else []
|
||||
|
||||
@retry(tries=6)
|
||||
def update_bet_status(self, bet_id: str, bet_status: str) -> None:
|
||||
"""更新投注状态"""
|
||||
sql = f"update {self.config.table_name} set bet_status = %s where id = %s"
|
||||
self.dao.execute(query=sql, args=(bet_status, bet_id))
|
||||
|
||||
|
||||
class OddsjamService:
|
||||
"""OddsJam业务服务类 - 负责与OddsJam网站的交互"""
|
||||
|
||||
def __init__(self, config: AppConfig):
|
||||
self.config = config
|
||||
self.total_bet_cnt = 0
|
||||
|
||||
def _get_browser_context(self, playwright) -> Tuple[Page, BrowserContext]:
|
||||
"""获取浏览器上下文"""
|
||||
logger.info(f"加载cookies: {self.config.login_state_path}")
|
||||
|
||||
browser = playwright.chromium.launch(
|
||||
args=["--start-maximized"],
|
||||
headless=self.config.headless,
|
||||
proxy=(
|
||||
{"server": self.config.http_proxy} if self.config.http_proxy else None
|
||||
),
|
||||
)
|
||||
|
||||
context = browser.new_context(
|
||||
storage_state=self.config.login_state_path, no_viewport=True
|
||||
)
|
||||
page = context.new_page()
|
||||
return page, browser
|
||||
|
||||
def login_to_site(self) -> None:
|
||||
"""登录到OddsJam网站"""
|
||||
logger.info(f"登录账户: {self.config.email_account}")
|
||||
|
||||
with sync_playwright() as p:
|
||||
browser = p.chromium.launch(headless=self.config.headless)
|
||||
page = browser.new_page()
|
||||
page.set_default_timeout(timeout=self.config.default_timeout)
|
||||
|
||||
url = "https://oddsjam.com/bet-tracker"
|
||||
page.goto(url)
|
||||
|
||||
page.get_by_role("link", name="Login").click()
|
||||
expect(page.get_by_role("button", name="Sign in")).to_be_visible(
|
||||
timeout=self.config.default_timeout
|
||||
)
|
||||
|
||||
page.pause()
|
||||
time.sleep(10)
|
||||
|
||||
try:
|
||||
browser.contexts[0].storage_state(path=self.config.login_state_path)
|
||||
logger.info(f"登录成功: {page.url}")
|
||||
except Exception as ex:
|
||||
logger.error(f"保存登录状态失败: {ex}")
|
||||
traceback.print_exc()
|
||||
page.screenshot(path="error.png")
|
||||
|
||||
browser.close()
|
||||
|
||||
@retry(tries=6)
|
||||
def upload_bets(self, bet_file_path: str) -> None:
|
||||
"""上传投注数据到OddsJam"""
|
||||
url = "https://oddsjam.com/bet-tracker"
|
||||
|
||||
with sync_playwright() as p:
|
||||
page, browser = self._get_browser_context(p)
|
||||
page.set_default_timeout(timeout=self.config.default_timeout)
|
||||
|
||||
def on_response(response):
|
||||
if "dromo-user-imports-production" in response.url:
|
||||
logger.info(f"上传响应状态: {response.status}")
|
||||
if response.status == 400:
|
||||
browser.close()
|
||||
raise Exception(f"上传失败: {response.request.failure}")
|
||||
elif "oddsjam.com/api/backend/bets/import" in response.url:
|
||||
logger.info(f"导入响应状态: {response.status}")
|
||||
|
||||
page.on("response", on_response)
|
||||
page.goto(url)
|
||||
|
||||
# 执行上传流程
|
||||
self._execute_upload_flow(page, bet_file_path)
|
||||
|
||||
browser.close()
|
||||
|
||||
def _execute_upload_flow(self, page: Page, bet_file_path: str) -> None:
|
||||
"""执行上传流程"""
|
||||
page.get_by_role("button", name="Import Bets").nth(1).click()
|
||||
time.sleep(5)
|
||||
|
||||
iframe_locator = page.frame_locator('iframe[title="Dromo Importer\\: Bets"]')
|
||||
|
||||
# 选择文件
|
||||
with page.expect_file_chooser() as fc_info:
|
||||
iframe_locator.get_by_role(
|
||||
"button", name="Choose a file", exact=True
|
||||
).click()
|
||||
time.sleep(5)
|
||||
|
||||
file_chooser = fc_info.value
|
||||
file_chooser.set_files(bet_file_path)
|
||||
|
||||
# 确认选择
|
||||
iframe_locator.get_by_role(
|
||||
"button", name="Confirm selection and continue"
|
||||
).click()
|
||||
time.sleep(5)
|
||||
|
||||
# 确认匹配
|
||||
iframe_locator.get_by_role(
|
||||
"button", name="Confirm matching and continue"
|
||||
).click()
|
||||
time.sleep(5)
|
||||
|
||||
# 继续
|
||||
iframe_locator.get_by_role("button", name="Continue").click()
|
||||
|
||||
# 完成
|
||||
iframe_locator.get_by_role("button", name="Finish").click()
|
||||
|
||||
# 处理可能的错误
|
||||
try:
|
||||
iframe_locator.get_by_role("button", name="Submit anyway").click(
|
||||
timeout=5000
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
iframe_locator.get_by_role("button", name="Yes").click()
|
||||
|
||||
# 等待处理完成
|
||||
expect(iframe_locator.get_by_text("Processing...")).to_be_hidden(
|
||||
timeout=self.config.default_timeout
|
||||
)
|
||||
|
||||
def get_all_bet_status(self, date_str: str, status_save_path: str) -> None:
|
||||
"""获取所有投注状态"""
|
||||
logger.info(f"获取投注状态,日期: {date_str}")
|
||||
|
||||
url = "https://oddsjam.com/bet-tracker"
|
||||
|
||||
with sync_playwright() as p:
|
||||
page, browser = self._get_browser_context(p)
|
||||
|
||||
def on_response(response):
|
||||
if "oddsjam.com/api/backend/bets-and-parlays-V2/find" in response.url:
|
||||
try:
|
||||
if response.request.failure is None and response.status == 200:
|
||||
data = response.json()
|
||||
self.total_bet_cnt = data["totalCount"]
|
||||
logger.info(f"总投注数量: {self.total_bet_cnt}")
|
||||
|
||||
# 保存响应数据
|
||||
with open(status_save_path, "a", encoding="utf-8") as f:
|
||||
json.dump(data, f, ensure_ascii=False)
|
||||
f.write("\n")
|
||||
except Exception as e:
|
||||
logger.error(f"处理响应数据失败: {e}")
|
||||
|
||||
page.on("response", on_response)
|
||||
page.set_default_timeout(timeout=self.config.default_timeout)
|
||||
page.goto(url=url)
|
||||
|
||||
# 处理弹窗
|
||||
try:
|
||||
page.locator("#cello-widget-app").get_by_role("button").click(
|
||||
timeout=6000
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# 获取总页数并翻页
|
||||
self._navigate_all_pages(page)
|
||||
|
||||
browser.close()
|
||||
|
||||
def _navigate_all_pages(self, page: Page) -> None:
|
||||
"""导航所有页面"""
|
||||
inner_text = page.get_by_text(
|
||||
re.compile("Showing 1 to 50 of", re.IGNORECASE)
|
||||
).inner_text()
|
||||
|
||||
match = re.search(r"Showing 1 to 50 of (\d+) results", inner_text)
|
||||
if not match:
|
||||
raise ValueError("无法获取总结果数")
|
||||
|
||||
total_bet_cnt = int(match[1])
|
||||
total_page_no = math.ceil(total_bet_cnt / 50)
|
||||
logger.info(f"总共 {total_bet_cnt} 条结果,{total_page_no} 页")
|
||||
|
||||
# 翻页到最后一页
|
||||
for page_no in range(2, total_page_no):
|
||||
if page_no <= total_page_no:
|
||||
expect(page.get_by_role("button", name="Next")).to_be_visible(
|
||||
timeout=self.config.default_timeout
|
||||
)
|
||||
page.wait_for_timeout(timeout=3000)
|
||||
page.get_by_role("button", name="Next").click()
|
||||
logger.info(f"当前页码: {page_no} / {total_page_no}")
|
||||
|
||||
|
||||
class BetDataProcessor:
|
||||
"""投注数据处理服务类"""
|
||||
|
||||
def __init__(self, config: AppConfig):
|
||||
self.config = config
|
||||
|
||||
def prepare_bet_data_for_upload(self, date_str: str) -> str:
|
||||
"""准备上传的投注数据"""
|
||||
logger.info(f"准备投注数据,日期: {date_str}")
|
||||
|
||||
# 确保目录存在
|
||||
ensure_directory_exists(target_path=self.config.bet_file_path, is_file=True)
|
||||
|
||||
# 从数据库获取数据
|
||||
db_service = DatabaseService(self.config)
|
||||
data_list = db_service.query_orders_by_date(date_str)
|
||||
|
||||
if not data_list:
|
||||
message = f"日期 {date_str} 没有找到投注数据"
|
||||
logger.warning(message)
|
||||
raise ValueError(message)
|
||||
|
||||
# 转换为DataFrame
|
||||
bet_df = pd.DataFrame([d.to_dict() for d in data_list])
|
||||
|
||||
# 列名映射
|
||||
col_map = {
|
||||
"sportsbooks": "Sportsbook",
|
||||
"bet_name": "Bet Name",
|
||||
"market": "Market Name",
|
||||
"price": "Odds",
|
||||
"stake": "Stake",
|
||||
"event_name": "Event Name",
|
||||
"sport": "Sport",
|
||||
"league": "League",
|
||||
"game_id": "Game ID",
|
||||
"bet_type": "Bet Type",
|
||||
"bet_id": "Notes",
|
||||
"start_timestamp": "Game Start Date",
|
||||
}
|
||||
|
||||
bet_df = bet_df.rename(columns=col_map)
|
||||
|
||||
# 处理Sportsbook字段
|
||||
bet_df["Sportsbook"] = bet_df["Sportsbook"].apply(lambda x: eval(x)[0])
|
||||
|
||||
# 加载博彩公司名称映射
|
||||
with open(self.config.bet_name_map_path, "r") as f:
|
||||
bet_name_map = json.load(f)
|
||||
bet_df["Sportsbook"] = bet_df["Sportsbook"].apply(
|
||||
lambda x: bet_name_map.get(x, x)
|
||||
)
|
||||
|
||||
# 选择需要的列
|
||||
bet_df = bet_df[col_map.values()]
|
||||
|
||||
# 格式化时间
|
||||
bet_df["Game Start Date"] = bet_df["Game Start Date"].apply(
|
||||
lambda x: datetime.datetime.fromtimestamp(x // 1000).strftime(
|
||||
"%Y/%m/%d %H:%M"
|
||||
)
|
||||
)
|
||||
|
||||
# 保存到文件
|
||||
bet_df.to_csv(self.config.bet_file_path, encoding="utf-8-sig", index=False)
|
||||
|
||||
logger.info(f"投注数据最小开始时间: {bet_df['Game Start Date'].min()}")
|
||||
logger.info(f"投注数据最大开始时间: {bet_df['Game Start Date'].max()}")
|
||||
logger.info(f"投注数据形状: {bet_df.shape}")
|
||||
|
||||
return (
|
||||
f"投注数据最小开始时间: {bet_df['Game Start Date'].min()}\n"
|
||||
f"投注数据最大开始时间: {bet_df['Game Start Date'].max()}\n"
|
||||
f"投注数据形状: {bet_df.shape}"
|
||||
)
|
||||
|
||||
def process_status_data(self, status_file_path: str) -> None:
|
||||
"""处理状态数据并更新数据库"""
|
||||
logger.info(f"处理状态数据: {status_file_path}")
|
||||
|
||||
data_list = []
|
||||
with open(status_file_path, "r", encoding="utf-8") as f:
|
||||
for line in f:
|
||||
data = json.loads(line)
|
||||
data_list.extend(data["entities"])
|
||||
|
||||
status_df = pd.DataFrame(data_list)
|
||||
|
||||
# 处理错误状态
|
||||
def get_error_status(row):
|
||||
if not pd.isna(row["autograder_errors"]):
|
||||
return f"error: {row['autograder_errors']}"
|
||||
return row["status"]
|
||||
|
||||
status_df["status"] = status_df.apply(lambda row: get_error_status(row), axis=1)
|
||||
status_df = status_df[status_df["status"] != "pending"]
|
||||
status_df = status_df[["status", "notes"]].drop_duplicates()
|
||||
|
||||
# 更新数据库
|
||||
db_service = DatabaseService(self.config)
|
||||
status_list = status_df.to_dict(orient="records")
|
||||
|
||||
for i, data in enumerate(status_list):
|
||||
bet_id = data["notes"]
|
||||
bet_status = data["status"]
|
||||
logger.info(
|
||||
f"{i+1}/{len(status_list)}, 状态->{bet_status}, 投注ID->{bet_id}"
|
||||
)
|
||||
|
||||
for _ in range(3):
|
||||
try:
|
||||
db_service.update_bet_status(bet_id=bet_id, bet_status=bet_status)
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"更新投注状态失败: {e}")
|
||||
|
||||
|
||||
class TaskScheduler:
|
||||
"""定时任务调度器"""
|
||||
|
||||
def __init__(self, config: AppConfig):
|
||||
self.config = config
|
||||
self.oddsjam_service = OddsjamService(config)
|
||||
self.data_processor = BetDataProcessor(config)
|
||||
|
||||
def upload_bets_task(self, date_str: Optional[str] = None) -> None:
|
||||
"""上传 T-1 时间的投注数据任务"""
|
||||
config_base_info = str(self.config)
|
||||
try:
|
||||
if date_str is None:
|
||||
date_str = (
|
||||
datetime.datetime.now() - datetime.timedelta(days=1)
|
||||
).strftime("%Y%m%d")
|
||||
|
||||
logger.info(f"执行上传任务,日期: {date_str}")
|
||||
|
||||
# 准备数据
|
||||
data_info = self.data_processor.prepare_bet_data_for_upload(date_str)
|
||||
|
||||
if not data_info:
|
||||
logger.warning(f"日期 {date_str} 没有数据需要上传")
|
||||
return
|
||||
|
||||
# 上传到OddsJam
|
||||
self.oddsjam_service.upload_bets(self.config.bet_file_path)
|
||||
|
||||
# 发送通知
|
||||
self.config.dingtalk.send_text(
|
||||
f"{date_str}:比赛数据上传完成\n{config_base_info}\n{data_info}"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
error_info = traceback.format_exc()
|
||||
logger.error(f"上传任务失败: {error_info}")
|
||||
self.config.dingtalk.send_text(
|
||||
f"{date_str}:\n{config_base_info}\n上传比赛失败: {e}\n{error_info}"
|
||||
)
|
||||
|
||||
def pull_status_task(self, date_str: Optional[str] = None) -> None:
|
||||
"""拉取状态数据任务"""
|
||||
config_base_info = str(self.config)
|
||||
try:
|
||||
if date_str is None:
|
||||
date_str = datetime.datetime.now().strftime("%Y%m%d")
|
||||
|
||||
logger.info(f"执行拉取任务,日期: {date_str}")
|
||||
|
||||
# 构建状态文件路径
|
||||
status_file_path = os.path.join(
|
||||
self.config.root_dir,
|
||||
"data",
|
||||
"bet_data",
|
||||
"bet_status",
|
||||
f"{self.config.email_account}_status_{date_str}.json",
|
||||
)
|
||||
ensure_directory_exists(target_path=status_file_path, is_file=True)
|
||||
|
||||
# 获取状态数据
|
||||
self.oddsjam_service.get_all_bet_status(date_str, status_file_path)
|
||||
|
||||
# 处理状态数据
|
||||
self.data_processor.process_status_data(status_file_path)
|
||||
|
||||
# 发送通知
|
||||
self.config.dingtalk.send_text(
|
||||
f"{date_str}: 比赛状态更新完成\n{config_base_info}"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
error_info = traceback.format_exc()
|
||||
logger.error(f"拉取任务失败: {error_info}")
|
||||
self.config.dingtalk.send_text(f"{config_base_info}\n{error_info}")
|
||||
|
||||
|
||||
class BetTrackerApplication:
|
||||
"""主应用类"""
|
||||
|
||||
def __init__(self):
|
||||
self.config = AppConfig()
|
||||
self.scheduler = TaskScheduler(self.config)
|
||||
|
||||
def run_scheduled_tasks(self) -> None:
|
||||
"""运行定时任务"""
|
||||
import schedule
|
||||
|
||||
logger.info(f"上传定时任务: {self.config.upload_schedule}")
|
||||
logger.info(f"拉取定时任务: {self.config.pull_schedule}")
|
||||
|
||||
# 设置定时任务
|
||||
schedule.every().day.at(self.config.pull_schedule).do(
|
||||
self.scheduler.pull_status_task
|
||||
)
|
||||
schedule.every().day.at(self.config.upload_schedule).do(
|
||||
self.scheduler.upload_bets_task
|
||||
)
|
||||
|
||||
# 运行定时任务
|
||||
while True:
|
||||
schedule.run_pending()
|
||||
time.sleep(0.05)
|
||||
|
||||
def run_manual_task(self, task_type: str, date_str: Optional[str] = None) -> None:
|
||||
"""运行手动任务"""
|
||||
if task_type == "upload":
|
||||
self.scheduler.upload_bets_task(date_str)
|
||||
elif task_type == "pull":
|
||||
self.scheduler.pull_status_task(date_str)
|
||||
else:
|
||||
logger.error(f"未知的任务类型: {task_type}")
|
||||
|
||||
|
||||
def main():
|
||||
"""主入口函数"""
|
||||
app = BetTrackerApplication()
|
||||
|
||||
# 检查命令行参数
|
||||
if len(sys.argv) > 1:
|
||||
task_type = sys.argv[1]
|
||||
date_str = sys.argv[2] if len(sys.argv) > 2 else None
|
||||
app.run_manual_task(task_type, date_str)
|
||||
else:
|
||||
# 运行定时任务
|
||||
app.run_scheduled_tasks()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
# app = BetTrackerApplication()
|
||||
# logger.info(app.config)
|
||||
# app.run_manual_task(task_type="upload", date_str="20251026")
|
||||
Reference in New Issue
Block a user