"""Daily job: pull A-share index spot quotes and store them in ``index_kline``.

When run as a script, ``main`` is scheduled once per day at the time given by
the ``PULL_SCHEDULE`` environment variable (default ``16:00``). Failures are
logged and reported through a DingTalk robot.
"""

import os
import time
import traceback
from datetime import datetime

import akshare as ak
import pandas as pd
import requests
import schedule
from loguru import logger
from retry import retry

from db_config import DatabaseManager, DatabaseConfig
from dingtalk import DingTalkBot
from signal_calc import main_calc_process

# SECURITY(review): these robot credentials are committed in source. They are
# kept as fallbacks so existing deployments keep working, but they should be
# rotated and supplied only through the environment.
webhook = os.getenv(
    "DINGTALK_WEBHOOK",
    "https://oapi.dingtalk.com/robot/send?access_token=21de667159edadd33172c6ec414a2addf9c6359189350ffd36819d2a20e8a0f4",
)
secret = os.getenv(
    "DINGTALK_SECRET",
    "SEC43a0fa0b29717f98637a119b92a0bd5f7b2b6da671bdd2bd1279ed8323454d5e",
)
dingtalk = DingTalkBot(webhook, secret)


def get_latest_index_kline_date() -> pd.DataFrame:
    """Fetch the current index spot quotes as k-line rows for today.

    Returns:
        A DataFrame with the columns ``date, code, open, high, low, close,
        volume`` (in that order). ``date`` is the local current date formatted
        as ``YYYYMMDD``; ``code`` is reduced to its first run of digits;
        duplicate codes and rows containing any missing value are dropped.
    """
    # Source column -> target column. The identity "date" entry exists only so
    # that "date" is the first column of the selected output.
    column_mapping = {
        "date": "date",
        "代码": "code",
        "今开": "open",
        "最高": "high",
        "最低": "low",
        "最新价": "close",
        "成交量": "volume",
    }

    df = ak.stock_zh_index_spot_sina()
    df = df.rename(columns=column_mapping)
    # Assign the date before selecting columns so that no value is ever
    # written onto a filtered slice of another frame.
    df["date"] = datetime.now().strftime("%Y%m%d")
    # Keep only the first run of digits of the code.
    df["code"] = df["code"].str.extract(r"(\d+)")[0]
    # A plain list is an unambiguous column selector (dict views are not).
    df = df[list(column_mapping.values())]
    df = df.drop_duplicates(subset=["code"])
    return df.dropna(how="any")


def main() -> None:
    """Pull today's index quotes, insert them if missing, then run the calc.

    Does nothing on Saturday and Sunday (local time). Any exception raised
    during the run is logged and reported to DingTalk instead of propagating,
    so the scheduler loop that calls this function keeps running.
    """
    if datetime.today().weekday() >= 5:
        logger.info("非交易日")
        return

    try:
        db_config = DatabaseConfig()
        # NOTE(review): the connection string may embed credentials - confirm
        # that writing it to the log is acceptable.
        logger.info(f"数据库连接: {db_config.connection_string}")

        # Fail fast if the database is unreachable.
        db_manager = DatabaseManager(db_config)
        if db_manager.test_connection():
            logger.info("✅ 数据库连接测试成功")
        else:
            logger.error("❌ 数据库连接测试失败")
            raise Exception("数据库连接测试失败")

        df = get_latest_index_kline_date()
        logger.info(df.head())
        if df.empty:
            # Without this guard the lookup below fails with an opaque
            # IndexError, which makes for an unhelpful alert.
            raise ValueError("指数行情数据为空")

        latest_date = df["date"].values[0]
        # latest_date is generated locally as YYYYMMDD, not user input.
        # NOTE(review): switch to bound parameters if execute_query supports
        # them; its signature is not visible from this file.
        res = db_manager.execute_query(
            f"SELECT date, code, open, high, low, close, volume FROM public.index_kline where code='000001' and date='{latest_date}' order by date desc limit 1;"
        )
        logger.info(len(res))

        # Code 000001 is used as the marker for "today is already stored".
        if len(res) == 0:
            res = db_manager.insert_dataframe(df, "index_kline")
            logger.info(res)
            # NOTE(review): the original formatting was lost, so it is unclear
            # whether the calculation ran only after a fresh insert (as here)
            # or on every run - confirm the intended placement.
            main_calc_process()
    except Exception as e:
        error_message = f"{e}\n{traceback.format_exc()}"
        logger.error(error_message)
        try:
            dingtalk.send_text(f"A股指数抓取失败: \n{error_message}")
        except Exception:
            # A failed alert must not take down the scheduler loop.
            logger.exception("钉钉告警发送失败")


if __name__ == "__main__":
    logger.info(datetime.now())
    PULL_SCHEDULE: str = os.getenv("PULL_SCHEDULE", "16:00")
    logger.info(f"PULL_SCHEDULE: {PULL_SCHEDULE}")
    schedule.every().day.at(PULL_SCHEDULE).do(main)
    # Poll once per second for the scheduled run.
    while True:
        schedule.run_pending()
        time.sleep(1)