Files
etf/em_index_sport.py
2025-10-29 23:32:54 +08:00

122 lines
3.9 KiB
Python

import time
from playwright.sync_api import sync_playwright
from loguru import logger
import datetime
import re
import json
import pandas as pd
def index_data_scraper(index_code: str, data_file_path: str):
    """Page through an Eastmoney quote grid and record every list-API response.

    A headless Chromium session, created with the storage state saved in
    ``./em_browser_state.json``, opens
    ``https://quote.eastmoney.com/center/gridlist.html#<index_code>`` and
    keeps clicking the ``>`` pagination link.  Each successful response whose
    URL contains ``push2.eastmoney.com/api/qt/clist/get`` is appended verbatim
    to ``data_file_path``, one response body per line.

    Args:
        index_code: URL fragment selecting the grid to open, e.g. ``"index_sh"``.
        data_file_path: Text file the raw response bodies are appended to.
    """
    api_marker = "push2.eastmoney.com/api/qt/clist/get"
    browser_state_file_path = "./em_browser_state.json"

    def on_response(response):
        """Append the body of a successful list-API response to the data file."""
        if api_marker not in response.url:
            return
        try:
            if response.request.failure is None and response.status == 200:
                data = response.text()
                with open(data_file_path, "a", encoding="utf-8") as f:
                    f.write(data)
                    f.write("\n")
        except Exception as e:
            # Best effort: one unreadable response must not abort the crawl.
            logger.error(f"处理响应数据失败: {e}")

    with sync_playwright() as p:
        browser = p.chromium.launch(args=["--start-maximized"], headless=True)
        try:
            context = browser.new_context(
                storage_state=browser_state_file_path, no_viewport=True
            )
            page = context.new_page()
            page.on("response", on_response)
            page.goto(f"https://quote.eastmoney.com/center/gridlist.html#{index_code}")
            # 499 clicks is only an upper bound; the loop normally ends when the
            # ">" link can no longer be clicked within the timeout.
            for i in range(1, 500):
                logger.info(f"{i}次点击")
                try:
                    page.get_by_role("link", name=">", exact=True).click(timeout=30000)
                except Exception as e:
                    logger.info(f"无法继续翻页, 停止点击: {e}")
                    break
                # Wait through Playwright instead of time.sleep(): the sync API
                # only dispatches events (our response handler) while a
                # Playwright call is running, so a plain sleep would stall them.
                # NOTE(review): the pause is assumed to belong inside the paging
                # loop (one pause per page turn) - confirm.
                page.wait_for_timeout(30_000)
        finally:
            browser.close()
def get_state():
    """Open a headed browser for manual interaction and save its storage state.

    Launches a visible Chromium window on the ``index_sh`` grid page and calls
    ``page.pause()`` so the operator can interact with the site.  Once
    execution is resumed, the storage state of the page's browser context is
    written to ``./em_browser_state.json`` (the file ``index_data_scraper``
    passes as ``storage_state``).
    """
    browser_state_file_path = "./em_browser_state.json"
    url = "https://quote.eastmoney.com/center/gridlist.html#index_sh"
    with sync_playwright() as p:
        browser = p.chromium.launch(args=["--start-maximized"], headless=False)
        try:
            page = browser.new_page()
            page.goto(url)
            # Blocks until the operator resumes from the Playwright inspector.
            page.pause()
            # browser.new_page() creates its own context; save that one.
            page.context.storage_state(path=browser_state_file_path)
        finally:
            browser.close()
def parse_data(data_file_path: str):
    """Parse a file of captured JSONP responses into one quote table.

    Each non-blank line is expected to look like ``callback({...});``.  The
    JSON object inside the parentheses is decoded and its ``data.diff`` rows
    are collected.  All rows are concatenated, sorted by ``f3`` in descending
    order (non-numeric values sort last), reduced to the columns listed in
    the mapping below and renamed to their Chinese labels.  Every column
    except ``代码`` and ``名称`` is converted to a numeric dtype, with
    unparsable values becoming NaN.

    Blank lines and responses without rows are skipped silently; lines that
    are not valid JSONP are logged and skipped.

    Args:
        data_file_path: UTF-8 text file with one captured response per line.

    Returns:
        A ``pandas.DataFrame`` with the renamed columns; it is empty (but has
        the same columns) when the file yields no rows.
    """
    col_name_map = {
        "f12": "代码",
        "f14": "名称",
        "f2": "最新价",
        "f3": "涨跌幅",
        "f4": "涨跌额",
        "f5": "成交量",
        "f6": "成交额",
        "f7": "振幅",
        "f15": "最高",
        "f16": "最低",
        "f17": "今开",
        "f18": "昨收",
        "f10": "量比",
    }
    new_cols = list(col_name_map.values())
    text_cols = {"代码", "名称"}
    jsonp_pattern = re.compile(r"\((\{.*\})\);?$")

    frames = []
    with open(data_file_path, "r", encoding="utf-8") as f:
        for line_no, line in enumerate(f, start=1):
            text = line.strip()
            if not text:
                continue
            match = jsonp_pattern.search(text)
            if match is None:
                logger.warning(f"第{line_no}行不是JSONP响应, 已跳过")
                continue
            try:
                payload = json.loads(match.group(1))
            except json.JSONDecodeError as e:
                logger.warning(f"第{line_no}行JSON解析失败, 已跳过: {e}")
                continue
            # "data" may be null when a response carries no rows.
            rows = (payload.get("data") or {}).get("diff")
            if not rows:
                continue
            if isinstance(rows, dict):
                # Some responses key the rows by position instead of listing them.
                rows = list(rows.values())
            frames.append(pd.DataFrame(rows))

    if not frames:
        return pd.DataFrame(columns=new_cols)

    temp_df = pd.concat(frames, ignore_index=True)
    temp_df["f3"] = pd.to_numeric(temp_df["f3"], errors="coerce")
    temp_df = temp_df.sort_values(by=["f3"], ascending=False, ignore_index=True)
    temp_df = temp_df.rename(columns=col_name_map)
    # Copy so the numeric conversion below writes to an independent frame
    # rather than to a slice of the renamed one.
    temp_df = temp_df[new_cols].copy()
    for col in new_cols:
        if col not in text_cols:
            temp_df[col] = pd.to_numeric(temp_df[col], errors="coerce")
    return temp_df
def get_index_latest_data():
    """Scrape every configured index grid and return the parsed quote table.

    The raw responses of all grids go into one file named after today's date
    (``./YYYYMMDD.txt``), which is then parsed with ``parse_data``.

    NOTE: ``index_data_scraper`` appends to that file, so running this more
    than once on the same day accumulates the earlier responses as well.

    Returns:
        The DataFrame produced by ``parse_data`` for today's data file.
    """
    today = datetime.datetime.now().strftime("%Y%m%d")
    data_file_path = f"./{today}.txt"
    index_code_list = ["index_sh", "index_sz", "index_components", "index_zzzs"]
    for index_code in index_code_list:
        logger.info(f"开始更新指数数据: {index_code}")
        index_data_scraper(index_code=index_code, data_file_path=data_file_path)
    # Parse once, after every grid has been written to the shared file.
    return parse_data(data_file_path)
if __name__ == "__main__":
    # Running the file directly only refreshes the saved browser state.
    get_state()

    # Disabled example: scrape today's data and print the row for one code.
    # df = get_index_latest_data()
    # code = "000001"
    # df = df[df["代码"] == code]
    # print(df)