Files
etf/update_data.py

106 lines
3.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
from db_config import DatabaseManager, DatabaseConfig
from loguru import logger
from datetime import datetime
import akshare as ak
from index_downloader import get_all_stock_index
import schedule
import time
import traceback
from dingtalk import DingTalkBot
def get_latest_index_kline_date():
    """Return the latest index quotes stamped with their trading date.

    The snapshot returned by ``get_all_stock_index()`` is reduced to the
    columns ``date, code, open, high, low, close, volume`` (one row per
    code). Its trading date is not part of the snapshot, so it is recovered
    by looking up the reference index ``000001`` in the last 30 days of
    daily history from ``ak.index_zh_a_hist`` and picking the most recent
    bar whose open/high/low/close equal the snapshot values. That date is
    then written to every row.

    Returns:
        pandas.DataFrame: the renamed snapshot with ``date`` filled in.

    Raises:
        IndexError: if the reference index is missing from the snapshot, or
            if no historical bar in the lookback window matches its OHLC
            values (same exception type the bare ``[0]`` lookups raised
            before, now with an explanatory message).
    """
    lookback_days = 30

    df = get_all_stock_index()
    column_mapping = {
        "date": "date",
        "代码": "code",
        "今开": "open",
        "最高": "high",
        "最低": "low",
        "最新价": "close",
        "成交量": "volume",
    }
    # Placeholder so the column selection below finds a "date" column; the
    # real value is filled in once the matching historical bar is known.
    df["date"] = None
    df = df.rename(columns=column_mapping)
    df = df[list(column_mapping.values())]
    df = df.drop_duplicates(subset=["code"])

    # Reference index used to determine the date of the latest k-line
    # (Shanghai Composite Index).
    code = "000001"
    reference_rows = df[df["code"] == code].to_dict(orient="records")
    if not reference_rows:
        raise IndexError(f"最新行情中未找到参考指数 {code}")
    latest = reference_rows[0]
    high, low, open_, close = (
        latest["high"],
        latest["low"],
        latest["open"],
        latest["close"],
    )

    # Read the clock once so both ends of the window derive from one instant.
    now = datetime.now()
    cur_date = now.strftime("%Y%m%d")
    window_start = (now - pd.Timedelta(days=lookback_days)).strftime("%Y%m%d")
    index_hist_df = ak.index_zh_a_hist(
        symbol=code,
        period="daily",
        start_date=window_start,
        end_date=cur_date,
    )
    # Newest first, so the first match below is the most recent bar.
    index_hist_df = index_hist_df.sort_values(by="日期", ascending=False)

    # NOTE(review): exact float equality between the snapshot and the
    # historical bar; this assumes both sources report identically rounded
    # prices - confirm against the data providers.
    matching_bars = index_hist_df[
        (index_hist_df["最高"] == high)
        & (index_hist_df["最低"] == low)
        & (index_hist_df["开盘"] == open_)
        & (index_hist_df["收盘"] == close)
    ].to_dict(orient="records")
    if not matching_bars:
        raise IndexError(
            f"近 {lookback_days} 天日线中未找到与指数 {code} 最新行情 OHLC 一致的记录"
        )

    df["date"] = matching_bars[0]["日期"]
    return df
def main():
    """Insert the latest index k-line rows into ``index_kline`` if missing.

    Steps:
      1. Build the DingTalk bot used for failure alerts.
      2. Test the database connection; a failed test is raised as an error.
      3. Fetch the latest dated quotes via ``get_latest_index_kline_date()``.
      4. Query ``public.index_kline`` for code ``000001`` on that date and
         insert the whole DataFrame only when no such row exists.

    Any exception raised in steps 2-4 is logged and sent to DingTalk; it is
    not re-raised. A failure while sending the alert is logged as well, so
    it cannot propagate out of this function into the caller.
    """
    import os  # local import: only needed for the credential overrides below

    # NOTE(review): credentials are committed in source. They can now be
    # overridden via environment variables; the literals remain only as a
    # backward-compatible fallback and should be rotated and removed.
    webhook = os.environ.get("DINGTALK_WEBHOOK") or (
        "https://oapi.dingtalk.com/robot/send?access_token=fb70c1561d8beba94b4f11568f4bb15e3ae07ccbdc8ac19676434a9d1cd17546"
    )  # DingTalk robot webhook URL
    secret = os.environ.get("DINGTALK_SECRET") or (
        "SEC1ae7cd2f1a6f9da3611af37da3e7d954c1e8533fc073c6c8cc5e5af3b6e5926b"
    )  # signing secret, if the robot uses one; otherwise leave empty
    dingtalk = DingTalkBot(webhook, secret)
    try:
        db_config = DatabaseConfig()
        # NOTE(review): the connection string may embed a password - confirm
        # it is safe to write to the log.
        logger.info(f"数据库连接: {db_config.connection_string}")
        db_manager = DatabaseManager(db_config)
        if db_manager.test_connection():
            logger.info("✅ 数据库连接测试成功")
        else:
            logger.error("❌ 数据库连接测试失败")
            raise Exception("数据库连接测试失败")

        df = get_latest_index_kline_date()
        logger.info(df.head())
        latest_date = df["date"].values[0]
        # NOTE(review): the date is interpolated into the SQL text. It comes
        # from market data rather than user input, but a parameterized query
        # would be safer if execute_query supports one - confirm its API.
        res = db_manager.execute_query(
            f"SELECT date, code, open, high, low, close, volume FROM public.index_kline where code='000001' and date='{latest_date}' order by date desc limit 1;"
        )
        logger.info(len(res))
        # len() rather than truthiness: the result type is not visible here
        # and may not support bool() (e.g. a DataFrame).
        if len(res) == 0:
            res = db_manager.insert_dataframe(df, "index_kline")
            logger.info(res)
    except Exception as e:
        # Build the report once so the log and the alert are identical.
        message = f"main 函数执行异常: {e}\n{traceback.format_exc()}"
        logger.error(message)
        try:
            dingtalk.send_text(message)
        except Exception as notify_error:
            # A failed alert must not escape and stop the scheduler loop.
            logger.error(f"钉钉通知发送失败: {notify_error}")
if __name__ == "__main__":
    # Record the start-up time, then register main() to run daily at 16:00.
    logger.info(datetime.now())
    daily_job = schedule.every().day.at("16:00")
    daily_job.do(main)

    # Poll the scheduler once per second, forever.
    while True:
        schedule.run_pending()
        time.sleep(1)