112 lines
3.7 KiB
Python
112 lines
3.7 KiB
Python
import pandas as pd
|
||
from db_config import DatabaseManager, DatabaseConfig
|
||
from loguru import logger
|
||
from datetime import datetime
|
||
import akshare as ak
|
||
from index_downloader import get_all_stock_index
|
||
import schedule
|
||
import time
|
||
import traceback
|
||
from dingtalk import DingTalkBot
|
||
import os
|
||
|
||
webhook = "https://oapi.dingtalk.com/robot/send?access_token=21de667159edadd33172c6ec414a2addf9c6359189350ffd36819d2a20e8a0f4"
|
||
secret = "SEC43a0fa0b29717f98637a119b92a0bd5f7b2b6da671bdd2bd1279ed8323454d5e"
|
||
dingtalk = DingTalkBot(webhook, secret)
|
||
|
||
|
||
def get_latest_index_kline_date():
|
||
# df = pd.read_csv(
|
||
# "/Users/aszer/Documents/vscode/etf/index_all_stock.csv", encoding="utf-8-sig"
|
||
# )
|
||
# index_hist_df = pd.read_csv(
|
||
# "/Users/aszer/Documents/vscode/etf/data/index_history_data/000001.csv",
|
||
# encoding="utf-8-sig",
|
||
# )
|
||
df = get_all_stock_index()
|
||
column_mapping = {
|
||
"date": "date",
|
||
"代码": "code",
|
||
"今开": "open",
|
||
"最高": "high",
|
||
"最低": "low",
|
||
"最新价": "close",
|
||
"成交量": "volume",
|
||
}
|
||
|
||
df["date"] = None
|
||
df = df.rename(columns=column_mapping)
|
||
df = df[column_mapping.values()]
|
||
df = df.drop_duplicates(subset=["code"])
|
||
|
||
# 获取上证指数最近30天的数据
|
||
code = "000001" # 用来获取当前最新k线数据的日期,参考 上证指数
|
||
latest = df[df["code"] == code].to_dict(orient="records")[0]
|
||
h, l, o, c = latest["high"], latest["low"], latest["open"], latest["close"]
|
||
cur_date = datetime.now().strftime("%Y%m%d")
|
||
seven_days_ago = (datetime.now() - pd.Timedelta(days=30)).strftime("%Y%m%d")
|
||
index_hist_df = ak.index_zh_a_hist(
|
||
symbol=code,
|
||
period="daily",
|
||
start_date=seven_days_ago,
|
||
end_date=cur_date,
|
||
)
|
||
|
||
index_hist_df = index_hist_df.sort_values(
|
||
by="日期", ascending=False
|
||
) # 按时间降序排列
|
||
index_hist_df = index_hist_df[
|
||
(index_hist_df["最高"] == h)
|
||
& (index_hist_df["最低"] == l)
|
||
& (index_hist_df["开盘"] == o)
|
||
& (index_hist_df["收盘"] == c)
|
||
]
|
||
index_hist = index_hist_df.to_dict(orient="records")[0]
|
||
hist_date = index_hist["日期"]
|
||
df["date"] = hist_date
|
||
return df
|
||
|
||
|
||
def main():
|
||
if datetime.today().weekday() >= 5:
|
||
logger.info(f"非交易日")
|
||
return
|
||
|
||
try:
|
||
db_config = DatabaseConfig()
|
||
logger.info(f"数据库连接: {db_config.connection_string}")
|
||
|
||
# 如果只是测试连接
|
||
db_manager = DatabaseManager(db_config)
|
||
if db_manager.test_connection():
|
||
logger.info("✅ 数据库连接测试成功")
|
||
else:
|
||
logger.error("❌ 数据库连接测试失败")
|
||
raise Exception("数据库连接测试失败")
|
||
|
||
df = get_latest_index_kline_date()
|
||
logger.info(df.head())
|
||
latest_date = df["date"].values[0]
|
||
res = db_manager.execute_query(
|
||
f"SELECT date, code, open, high, low, close, volume FROM public.index_kline where code='000001' and date='{latest_date}' order by date desc limit 1;"
|
||
)
|
||
# print(dict(res))
|
||
logger.info(len(res))
|
||
if len(res) == 0:
|
||
res = db_manager.insert_dataframe(df, "index_kline")
|
||
logger.info(res)
|
||
except Exception as e:
|
||
error_message = f"{e}\n{traceback.format_exc()}"
|
||
logger.error(error_message)
|
||
dingtalk.send_text(f"A股指数抓取失败: \n{error_message}")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
logger.info(datetime.now())
|
||
PULL_SCHEDULE: str = os.getenv("PULL_SCHEDULE", "16:00")
|
||
logger.info(f"PULL_SCHEDULE: {PULL_SCHEDULE}")
|
||
schedule.every().day.at(PULL_SCHEDULE).do(main)
|
||
while True:
|
||
schedule.run_pending()
|
||
time.sleep(1)
|