"""Scrape index quote lists from quote.eastmoney.com and parse them with pandas.

The scraper drives a headless Chromium page, records every successful
response of the list API to a text file (one response body per line), and the
parser turns that file into a single DataFrame.
"""

import datetime
import json
import re
import time

import pandas as pd
from loguru import logger
from playwright.sync_api import sync_playwright

# Substring identifying the list-API responses that should be recorded.
_CLIST_API_MARKER = "push2.eastmoney.com/api/qt/clist/get"

# Saved browser storage state that is loaded into every new browser context.
_BROWSER_STATE_FILE = "./em_browser_state.json"

# Extracts the JSON object out of a JSONP body such as ``callback({...});``.
_JSONP_PAYLOAD_RE = re.compile(r"\((\{.*\})\);?$")

# Raw API field name -> column label used in the returned DataFrame.
_COLUMN_NAME_MAP = {
    "f12": "代码",
    "f14": "名称",
    "f2": "最新价",
    "f3": "涨跌幅",
    "f4": "涨跌额",
    "f5": "成交量",
    "f6": "成交额",
    "f7": "振幅",
    "f15": "最高",
    "f16": "最低",
    "f17": "今开",
    "f18": "昨收",
    "f10": "量比",
}

# Columns that stay as text; every other output column is coerced to numeric.
_TEXT_COLUMNS = ("代码", "名称")


def index_data_scraper(index_code: str, data_file_path: str) -> None:
    """Page through one quote list and append the raw API responses to a file.

    Opens ``https://quote.eastmoney.com/center/gridlist.html#<index_code>`` in
    headless Chromium, then clicks the ">" link up to 499 times, pausing 10
    seconds after each click. Every response whose URL contains the list-API
    marker and that completed with HTTP 200 is appended to ``data_file_path``
    as one line.

    Args:
        index_code: URL fragment selecting the list to open (e.g. "index_sh").
        data_file_path: Text file the response bodies are appended to.
    """

    def on_response(response) -> None:
        """Append a matching, successful response body to the data file."""
        if _CLIST_API_MARKER not in response.url:
            return
        try:
            if response.request.failure is None and response.status == 200:
                body = response.text()
                with open(data_file_path, "a", encoding="utf-8") as f:
                    f.write(body)
                    f.write("\n")
        except Exception as e:
            # Best effort: one unreadable response must not abort the crawl.
            logger.error(f"处理响应数据失败: {e}")

    with sync_playwright() as p:
        browser = p.chromium.launch(args=["--start-maximized"], headless=True)
        try:
            context = browser.new_context(
                storage_state=_BROWSER_STATE_FILE, no_viewport=True
            )
            page = context.new_page()
            page.on("response", on_response)
            page.goto(f"https://quote.eastmoney.com/center/gridlist.html#{index_code}")

            for i in range(1, 500):
                logger.info(f"第{i}次点击")
                try:
                    page.get_by_role("link", name=">", exact=True).click(timeout=30000)
                except Exception as e:
                    # A failed click (typically a timeout because the link is
                    # no longer clickable) is treated as the end of pagination.
                    logger.info(f"停止翻页({index_code}): {e}")
                    break
                # Use Playwright's own wait rather than time.sleep so that
                # response events keep being dispatched while we pause.
                page.wait_for_timeout(10000)
        finally:
            # Release the browser even when navigation or paging raised.
            browser.close()


def parse_data(data_file_path: str) -> pd.DataFrame:
    """Parse a file of recorded list-API responses into one DataFrame.

    Each non-empty line is expected to hold either a JSONP body
    (``callback({...});``) or a bare JSON object. The rows found under
    ``payload["data"]["diff"]`` of every line are concatenated. Lines that are
    blank, cannot be decoded, or carry no rows are skipped with a warning
    instead of aborting the whole parse.

    Args:
        data_file_path: File written by ``index_data_scraper``.

    Returns:
        A DataFrame whose columns are the labels of ``_COLUMN_NAME_MAP`` (in
        that order), sorted by "涨跌幅" descending with a fresh 0..n-1 index.
        All columns except "代码" and "名称" are numeric; values that cannot
        be converted become NaN. An empty DataFrame with the same columns is
        returned when the file contains no usable rows.
    """
    columns = list(_COLUMN_NAME_MAP.values())
    frames = []

    with open(data_file_path, "r", encoding="utf-8") as f:
        for line_no, raw_line in enumerate(f, start=1):
            line = raw_line.strip()
            if not line:
                continue

            match = _JSONP_PAYLOAD_RE.search(line)
            # Fall back to the whole line so un-wrapped JSON is accepted too.
            json_str = match.group(1) if match else line
            try:
                payload = json.loads(json_str)
            except json.JSONDecodeError as e:
                logger.warning(f"第{line_no}行无法解析, 已跳过: {e}")
                continue

            data = payload.get("data") if isinstance(payload, dict) else None
            rows = data.get("diff") if isinstance(data, dict) else None
            if isinstance(rows, dict):
                # NOTE(review): presumably some responses key the rows by
                # position instead of sending a list - confirm against the API.
                rows = list(rows.values())
            if not rows:
                logger.warning(f"第{line_no}行没有数据, 已跳过")
                continue

            frames.append(pd.DataFrame(rows))

    if not frames:
        return pd.DataFrame(columns=columns)

    combined = pd.concat(frames, ignore_index=True)
    # reindex (rather than plain selection) keeps the column set stable even
    # if a response omitted one of the expected fields.
    combined = combined.rename(columns=_COLUMN_NAME_MAP).reindex(columns=columns)

    numeric_cols = [col for col in columns if col not in _TEXT_COLUMNS]
    combined[numeric_cols] = combined[numeric_cols].apply(
        pd.to_numeric, errors="coerce"
    )

    return combined.sort_values(by="涨跌幅", ascending=False, ignore_index=True)


def get_index_latest_data() -> pd.DataFrame:
    """Scrape all configured index lists and return them as one DataFrame.

    The raw responses are stored in ``./<YYYYMMDD>.txt`` (local date). The
    file is emptied at the start of every run: the scraper only ever appends,
    so without this reset a second run on the same day would return the rows
    of both runs.

    Returns:
        The DataFrame produced by ``parse_data`` for the freshly scraped file.
    """
    today = datetime.datetime.now().strftime("%Y%m%d")
    data_file_path = f"./{today}.txt"

    # Start from an empty capture file (also guarantees the file exists for
    # parse_data even if no response was recorded).
    with open(data_file_path, "w", encoding="utf-8"):
        pass

    index_code_list = ["index_sh", "index_sz", "index_components", "index_zzzs"]
    for index_code in index_code_list:
        logger.info(f"开始更新指数数据: {index_code}")
        index_data_scraper(index_code=index_code, data_file_path=data_file_path)

    return parse_data(data_file_path)


if __name__ == "__main__":
    df = get_index_latest_data()
    code = "000001"
    df = df[df["代码"] == code]
    print(df)