Files
etf/update_data.py

133 lines
4.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
from db_config import DatabaseManager, DatabaseConfig
from loguru import logger
from datetime import datetime
import akshare as ak
from index_downloader import get_all_stock_index
from em_index_sport import get_index_latest_data
import schedule
import time
import traceback
from dingtalk import DingTalkBot
import os
from signal_calc import main_calc_process
import requests
from retry import retry
# DingTalk robot credentials used for failure alerts.
# SECURITY: these values were committed to the repository in plain text. They
# are kept only as fallbacks so existing deployments keep working; set
# DINGTALK_WEBHOOK / DINGTALK_SECRET in the environment and rotate the
# committed token and secret.
webhook = os.getenv(
    "DINGTALK_WEBHOOK",
    "https://oapi.dingtalk.com/robot/send?access_token=21de667159edadd33172c6ec414a2addf9c6359189350ffd36819d2a20e8a0f4",
)
secret = os.getenv(
    "DINGTALK_SECRET",
    "SEC43a0fa0b29717f98637a119b92a0bd5f7b2b6da671bdd2bd1279ed8323454d5e",
)
dingtalk = DingTalkBot(webhook, secret)
def set_proxy():
    """Fetch one proxy address from the proxy API and export it for this process.

    The JSON response is logged, the first entry of ``data.proxy_list`` is
    taken, and the ``https_proxy`` / ``http_proxy`` environment variables are
    set to ``http://<proxy>``.

    Raises:
        requests.RequestException: the request failed, timed out, or returned
            an HTTP error status.
        RuntimeError: the response did not contain a usable proxy entry.
    """
    url = "http://v2.api.juliangip.com/company/postpay/getips?auto_white=1&num=1&pt=1&result_type=json&trade_no=6735165951220899&sign=bd763049a68b4b817608520dad35ea25"
    # Without a timeout a stalled connection would block the caller forever.
    r = requests.get(url, timeout=10)
    r.raise_for_status()
    data = r.json()
    logger.info(data)
    try:
        proxy = data["data"]["proxy_list"][0]
    except (KeyError, IndexError, TypeError) as exc:
        # Report the payload instead of a bare KeyError/IndexError.
        raise RuntimeError(f"proxy API returned no usable proxy: {data!r}") from exc
    proxy_url = f"http://{proxy}"
    os.environ["https_proxy"] = proxy_url
    os.environ["http_proxy"] = proxy_url
@retry(tries=3, delay=10)
def get_latest_index_kline_date():
    """Return the latest index snapshot with each row stamped with its trading date.

    Steps:
      1. Refresh the HTTP proxy and load the snapshot via
         ``get_all_stock_index()``.
      2. Rename the Chinese columns to ``code/open/high/low/close/volume``,
         keep only those columns plus ``date``, and drop duplicate codes.
      3. Fetch the last 30 days of daily history for index ``000001`` and find
         the most recent bar whose open/high/low/close equal the snapshot's
         values for that index; that bar's date is taken as the snapshot date.
      4. Write that date into every row and drop rows containing any NaN.

    Returns:
        pandas.DataFrame with columns
        ``date, code, open, high, low, close, volume``.

    Raises:
        ValueError: the reference index is missing from the snapshot, or no
            history bar matches the snapshot's OHLC values.

    The whole function is retried up to 3 times, 10 seconds apart, by the
    ``retry`` decorator; each attempt requests a fresh proxy.
    """
    set_proxy()
    df = get_all_stock_index()
    column_mapping = {
        "date": "date",
        "代码": "code",
        "今开": "open",
        "最高": "high",
        "最低": "low",
        "最新价": "close",
        "成交量": "volume",
    }
    df["date"] = None  # placeholder; filled in once the trading date is known
    df = df.rename(columns=column_mapping)
    # Select with a real list rather than a dict_values view.
    df = df[list(column_mapping.values())]
    df = df.drop_duplicates(subset=["code"])

    # Index 000001 is the reference used to determine which trading day the
    # snapshot belongs to.
    code = "000001"
    reference_rows = df[df["code"] == code].to_dict(orient="records")
    if not reference_rows:
        raise ValueError(f"reference index {code} not found in snapshot data")
    latest = reference_rows[0]
    h, l, o, c = latest["high"], latest["low"], latest["open"], latest["close"]

    # Fetch the last 30 days of daily history for the reference index.
    now = datetime.now()
    cur_date = now.strftime("%Y%m%d")
    window_start = (now - pd.Timedelta(days=30)).strftime("%Y%m%d")
    index_hist_df = ak.index_zh_a_hist(
        symbol=code,
        period="daily",
        start_date=window_start,
        end_date=cur_date,
    )
    # Newest first, so the first match below is the most recent bar.
    index_hist_df = index_hist_df.sort_values(by="日期", ascending=False)
    matching_bars = index_hist_df[
        (index_hist_df["最高"] == h)
        & (index_hist_df["最低"] == l)
        & (index_hist_df["开盘"] == o)
        & (index_hist_df["收盘"] == c)
    ]
    if matching_bars.empty:
        raise ValueError(
            f"no daily bar for {code} between {window_start} and {cur_date} "
            f"matches snapshot OHLC open={o} high={h} low={l} close={c}"
        )
    hist_date = matching_bars.to_dict(orient="records")[0]["日期"]

    df["date"] = hist_date
    df = df.dropna(how="any")
    return df
def main():
    """Run one update cycle: fetch the latest index klines and store them.

    Returns immediately on Saturday/Sunday. Otherwise it checks the database
    connection, fetches the latest snapshot, and inserts it into
    ``index_kline`` when no row exists yet for index 000001 on the snapshot
    date. Any exception is logged with its traceback and sent to DingTalk
    rather than propagated.
    """
    # weekday() is 5 for Saturday and 6 for Sunday.
    if datetime.today().weekday() >= 5:
        logger.info("非交易日")
        return

    try:
        db_config = DatabaseConfig()
        logger.info(f"数据库连接: {db_config.connection_string}")

        db_manager = DatabaseManager(db_config)
        if not db_manager.test_connection():
            logger.error("❌ 数据库连接测试失败")
            raise Exception("数据库连接测试失败")
        logger.info("✅ 数据库连接测试成功")

        kline_df = get_latest_index_kline_date()
        logger.info(kline_df.head())

        latest_date = kline_df["date"].values[0]
        # NOTE(review): the date is interpolated straight into the SQL text;
        # switch to bound parameters if execute_query supports them.
        query = f"SELECT date, code, open, high, low, close, volume FROM public.index_kline where code='000001' and date='{latest_date}' order by date desc limit 1;"
        existing_rows = db_manager.execute_query(query)
        logger.info(len(existing_rows))

        # NOTE(review): the reviewed copy of this file had lost its
        # indentation; the insert, its log line and main_calc_process() are
        # assumed to all belong to this "no row yet" branch - confirm.
        if len(existing_rows) == 0:
            insert_result = db_manager.insert_dataframe(kline_df, "index_kline")
            logger.info(insert_result)
            main_calc_process()
    except Exception as e:
        error_message = f"{e}\n{traceback.format_exc()}"
        logger.error(error_message)
        dingtalk.send_text(f"A股指数抓取失败: \n{error_message}")
if __name__ == "__main__":
    # Script entry point: register main() as a daily job and poll forever.
    logger.info(datetime.now())

    # Time of day for the daily run; override with the PULL_SCHEDULE env var.
    PULL_SCHEDULE: str = os.getenv("PULL_SCHEDULE", "16:00")
    logger.info(f"PULL_SCHEDULE: {PULL_SCHEDULE}")

    daily_job = schedule.every().day.at(PULL_SCHEDULE)
    daily_job.do(main)

    # Check once per second whether the job is due.
    while True:
        schedule.run_pending()
        time.sleep(1)