"""Scheduled job that stores the latest A-share index quotes and runs signal calculation.

Once per day (at the time given by the ``PULL_SCHEDULE`` environment variable,
default ``16:00``) the job fetches the latest index snapshot, stamps it with
the trading date of the matching daily k-line, inserts it into ``index_kline``
when that date is not stored yet, and then calls ``main_calc_process``.
Failures are logged and reported to DingTalk.
"""

import os
import time
import traceback
from datetime import datetime

import akshare as ak
import pandas as pd
import schedule
from loguru import logger

from db_config import DatabaseManager, DatabaseConfig
from dingtalk import DingTalkBot
from em_index_sport import get_index_latest_data
from signal_calc import main_calc_process

# SECURITY: credentials should not live in source control. They can be
# overridden with the DINGTALK_WEBHOOK / DINGTALK_SECRET environment
# variables; the literals are kept only as backward-compatible defaults and
# should be rotated and removed.
webhook = os.getenv(
    "DINGTALK_WEBHOOK",
    "https://oapi.dingtalk.com/robot/send?access_token=21de667159edadd33172c6ec414a2addf9c6359189350ffd36819d2a20e8a0f4",
)
secret = os.getenv(
    "DINGTALK_SECRET",
    "SEC43a0fa0b29717f98637a119b92a0bd5f7b2b6da671bdd2bd1279ed8323454d5e",
)
dingtalk = DingTalkBot(webhook, secret)


def get_latest_index_kline_date():
    """Return the latest index snapshot with a ``date`` column filled in.

    The snapshot returned by ``get_index_latest_data`` is reduced to the
    columns ``date, code, open, high, low, close, volume`` (one row per code).
    Its trading date is resolved by looking for the daily k-line of index
    ``000001`` from the last 30 days whose open/high/low/close equal the
    snapshot values for that index; that k-line's date is written to every row.

    Returns:
        The normalized snapshot DataFrame with ``date`` set on all rows.

    Raises:
        IndexError: if the reference index is missing from the snapshot, or
            no daily k-line in the lookup window matches its OHLC values.
    """
    column_mapping = {
        "date": "date",
        "代码": "code",
        "今开": "open",
        "最高": "high",
        "最低": "low",
        "最新价": "close",
        "成交量": "volume",
    }

    df = get_index_latest_data()
    # Placeholder so the column survives the selection below; the real value
    # is filled in once the trading date has been resolved.
    df["date"] = None
    df = df.rename(columns=column_mapping)
    df = df[list(column_mapping.values())]
    df = df.drop_duplicates(subset=["code"])

    # Index 000001 is used as the reference for determining the date of the
    # latest k-line.
    code = "000001"
    reference_rows = df[df["code"] == code].to_dict(orient="records")
    if not reference_rows:
        raise IndexError(f"index {code} not found in latest index snapshot")
    latest = reference_rows[0]
    h, l, o, c = latest["high"], latest["low"], latest["open"], latest["close"]

    # Fetch the reference index's daily k-lines for the last 30 days.
    now = datetime.now()
    end_date = now.strftime("%Y%m%d")
    start_date = (now - pd.Timedelta(days=30)).strftime("%Y%m%d")
    index_hist_df = ak.index_zh_a_hist(
        symbol=code,
        period="daily",
        start_date=start_date,
        end_date=end_date,
    )
    # Newest first, so the most recent matching bar wins.
    index_hist_df = index_hist_df.sort_values(by="日期", ascending=False)
    index_hist_df = index_hist_df[
        (index_hist_df["最高"] == h)
        & (index_hist_df["最低"] == l)
        & (index_hist_df["开盘"] == o)
        & (index_hist_df["收盘"] == c)
    ]
    matches = index_hist_df.to_dict(orient="records")
    if not matches:
        raise IndexError(
            f"no daily k-line of index {code} between {start_date} and "
            f"{end_date} matches the latest snapshot "
            f"(open={o}, high={h}, low={l}, close={c})"
        )

    df["date"] = matches[0]["日期"]
    return df


def _notify_failure(text):
    """Send ``text`` to DingTalk; log instead of raising if delivery fails."""
    try:
        dingtalk.send_text(text)
    except Exception:
        # A failed alert must not escape into the scheduler loop and stop it.
        logger.exception("Failed to deliver DingTalk alert")


def main():
    """Run one pull: store the latest index k-lines, then run signal calculation.

    Does nothing on Saturday and Sunday. Any exception raised while running
    is logged and reported to DingTalk rather than propagated.
    """
    if datetime.today().weekday() >= 5:
        logger.info("非交易日")
        return

    try:
        db_config = DatabaseConfig(env="daily")
        # NOTE(review): the connection string may contain credentials —
        # confirm it is safe to log.
        logger.info(f"数据库连接: {db_config.connection_string}")

        db_manager = DatabaseManager(db_config)
        if db_manager.test_connection():
            logger.info("✅ 数据库连接测试成功")
        else:
            logger.error("❌ 数据库连接测试失败")
            raise Exception("数据库连接测试失败")

        df = get_latest_index_kline_date()
        logger.info(df.head())

        latest_date = df["date"].values[0]
        # The date originates from a third-party API and is interpolated into
        # SQL below; normalizing it to YYYY-MM-DD rejects anything that is not
        # a date before it reaches the query text.
        query_date = pd.Timestamp(latest_date).strftime("%Y-%m-%d")
        res = db_manager.execute_query(
            f"SELECT date, code, open, high, low, close, volume FROM public.index_kline where code='000001' and date='{query_date}' order by date desc limit 1;"
        )
        logger.info(len(res))

        # Insert only when this trading date has not been stored yet.
        if len(res) == 0:
            res = db_manager.insert_dataframe(df, "index_kline")
            logger.info(res)

        main_calc_process()
    except Exception as e:
        error_message = f"{e}\n{traceback.format_exc()}"
        logger.error(error_message)
        _notify_failure(f"A股指数抓取失败: \n{error_message}")


if __name__ == "__main__":
    # Call main() directly here for a one-off run instead of the daily schedule.
    logger.info(datetime.now())
    PULL_SCHEDULE: str = os.getenv("PULL_SCHEDULE", "16:00")
    logger.info(f"PULL_SCHEDULE: {PULL_SCHEDULE}")
    schedule.every().day.at(PULL_SCHEDULE).do(main)
    while True:
        schedule.run_pending()
        time.sleep(1)