每天收盘后定时获取最新价格更新到数据库

This commit is contained in:
2025-10-12 13:36:17 +08:00
parent fc9a002d2b
commit 0640355d8f

93
update_data.py Normal file
View File

@@ -0,0 +1,93 @@
import pandas as pd
from db_config import DatabaseManager, DatabaseConfig
from loguru import logger
from datetime import datetime
import akshare as ak
from index_downloader import get_all_stock_index
import schedule
import time
def get_latest_index_kline_date():
# df = pd.read_csv(
# "/Users/aszer/Documents/vscode/etf/index_all_stock.csv", encoding="utf-8-sig"
# )
# index_hist_df = pd.read_csv(
# "/Users/aszer/Documents/vscode/etf/data/index_history_data/000001.csv",
# encoding="utf-8-sig",
# )
df = get_all_stock_index()
column_mapping = {
"date": "date",
"代码": "code",
"今开": "open",
"最高": "high",
"最低": "low",
"最新价": "close",
"成交量": "volume",
}
df["date"] = None
df = df.rename(columns=column_mapping)
df = df[column_mapping.values()]
df = df.drop_duplicates(subset=["code"])
# 获取上证指数最近30天的数据
code = "000001" # 用来获取当前最新k线数据的日期参考 上证指数
latest = df[df["code"] == code].to_dict(orient="records")[0]
h, l, o, c = latest["high"], latest["low"], latest["open"], latest["close"]
cur_date = datetime.now().strftime("%Y%m%d")
seven_days_ago = (datetime.now() - pd.Timedelta(days=30)).strftime("%Y%m%d")
index_hist_df = ak.index_zh_a_hist(
symbol=code,
period="daily",
start_date=seven_days_ago,
end_date=cur_date,
)
index_hist_df = index_hist_df.sort_values(
by="日期", ascending=False
) # 按时间降序排列
index_hist_df = index_hist_df[
(index_hist_df["最高"] == h)
& (index_hist_df["最低"] == l)
& (index_hist_df["开盘"] == o)
& (index_hist_df["收盘"] == c)
]
index_hist = index_hist_df.to_dict(orient="records")[0]
hist_date = index_hist["日期"]
df["date"] = hist_date
return df
def main():
db_config = DatabaseConfig()
logger.info(f"数据库连接: {db_config.connection_string}")
# 如果只是测试连接
db_manager = DatabaseManager(db_config)
if db_manager.test_connection():
logger.info("✅ 数据库连接测试成功")
else:
logger.error("❌ 数据库连接测试失败")
df = get_latest_index_kline_date()
logger.info(df.head())
latest_date = df["date"].values[0]
res = db_manager.execute_query(
f"SELECT date, code, open, high, low, close, volume FROM public.index_kline where code='000001' and date='{latest_date}' order by date desc limit 1;"
)
# print(dict(res))
logger.info(len(res))
if len(res) == 0:
res = db_manager.insert_dataframe(df, "index_kline")
logger.info(res)
if __name__ == "__main__":
logger.info(datetime.now())
schedule.every().day.at("16:00").do(main)
while True:
schedule.run_pending()
time.sleep(1)