Files
etf/signal_calc.py
2025-10-30 22:00:16 +08:00

141 lines
4.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
from db_config import DatabaseManager, DatabaseConfig
from loguru import logger
from datetime import datetime
import akshare as ak
from index_downloader import get_all_stock_index
import schedule
import time
import traceback
from dingtalk import DingTalkBot
import talib as ta
from tabulate import tabulate
# Module-level database setup: these statements run at import time.
db_config = DatabaseConfig()
logger.info(f"数据库连接: {db_config.connection_string}")
# Original note: "if only testing the connection".
# The manager built here is the shared handle used by the query helpers below.
db_manager = DatabaseManager(db_config)
def get_all_index_code() -> list:
    """
    Fetch every distinct index code present in ``public.index_kline``.

    :return: list holding the ``code`` value of each row the query returns.
    """
    query = "SELECT distinct code FROM public.index_kline;"
    codes = []
    for row in db_manager.execute_query(query):
        # Each result row is converted to a dict so the column can be read by name.
        record = dict(row)
        codes.append(record["code"])
    return codes
def get_index_recent_date(code: str, limit: int = None) -> pd.DataFrame:
    """
    Load the most recent K-line rows of one index, newest first.

    :param code: index code matched against ``public.index_kline.code``.
    :param limit: maximum number of rows to return; ``None`` (or ``0``) means
        no limit.
    :return: DataFrame with the columns ``date`` (``YYYY-MM-DD`` string),
        ``code``, ``open``, ``high``, ``low``, ``close`` (floats) and
        ``volume`` (int), ordered by date descending. When the query returns
        no rows the frame is empty but still carries these columns.
    :raises ValueError: if ``limit`` is truthy but not convertible to ``int``.
    """
    columns = ["date", "code", "open", "high", "low", "close", "volume"]

    # The SQL is assembled as text, so both interpolated values are neutralised
    # first: single quotes in the code are doubled (standard SQL escaping) and
    # the limit is forced through int() so only a number can reach the query.
    # NOTE(review): if execute_query supports bound parameters, prefer those.
    safe_code = str(code).replace("'", "''")
    limit_clause = f" LIMIT {int(limit)}" if limit else ""
    sql = (
        f"SELECT {', '.join(columns)} FROM public.index_kline "
        f"WHERE code = '{safe_code}' order by date desc{limit_clause};"
    )

    rows = []
    for item in db_manager.execute_query(sql):
        row = dict(item)
        # Normalise DB-side types into plain Python values.
        row["date"] = row["date"].strftime("%Y-%m-%d")
        row["volume"] = int(row["volume"])
        for price_field in ("open", "high", "low", "close"):
            row[price_field] = float(row[price_field])
        rows.append(row)

    # Passing `columns` keeps the schema stable even for an empty result.
    return pd.DataFrame(rows, columns=columns)
# Look-back window (in bars) used for both the daily and the weekly CCI.
_CCI_PERIOD = 14
# A CCI reading below this level on either timeframe is reported as a signal.
_CCI_SIGNAL_LEVEL = -100
# Number of most recent daily bars loaded per index; indices with fewer bars
# are skipped.
_DAILY_BAR_COUNT = 100


def _weekly_bars(daily: pd.DataFrame) -> pd.DataFrame:
    """Aggregate date-indexed daily bars into weekly OHLCV bars (pandas ``"W"`` bins)."""
    weekly = daily.resample("W").agg(
        {
            "code": "first",
            "open": "first",
            "high": "max",
            "low": "min",
            "close": "last",
            "volume": "sum",
        }
    )
    # A calendar week without any trading day (e.g. a long holiday) produces an
    # all-NaN bar; drop it so it does not put NaN into the CCI window.
    return weekly.dropna(subset=["open", "high", "low", "close"])


def _latest_cci(bars: pd.DataFrame):
    """Return the CCI value of the last bar in ``bars`` (NaN if the history is too short)."""
    cci = ta.CCI(
        high=bars["high"],
        low=bars["low"],
        close=bars["close"],
        timeperiod=_CCI_PERIOD,
    )
    # Wrapping in a Series gives positional access regardless of whether the
    # indicator comes back as an array or as a Series.
    return pd.Series(cci).iloc[-1]


def main_calc_process():
    """
    Scan every index listed in ``index_fund_info.csv`` for CCI signals and
    push the hits to DingTalk.

    For each distinct index code the latest daily bars are loaded, and the
    index is skipped unless it has a full window of bars whose newest one is
    dated today. A signal is recorded when the daily CCI or the weekly CCI of
    the latest bar is below ``_CCI_SIGNAL_LEVEL``. All signals are sent as one
    plain-text table; when there are none, only a log line is written.

    The function returns immediately on Saturday and Sunday.

    :return: None
    """
    import os  # local import: only this function needs it

    if datetime.today().weekday() >= 5:
        logger.info(f"非交易日")
        return

    # SECURITY NOTE(review): the webhook token and signing secret below are
    # committed in source. They are kept only as a backward-compatible
    # fallback; set DINGTALK_WEBHOOK / DINGTALK_SECRET instead, then rotate
    # and remove these literals.
    webhook = os.environ.get(
        "DINGTALK_WEBHOOK",
        "https://oapi.dingtalk.com/robot/send?access_token=fb70c1561d8beba94b4f11568f4bb15e3ae07ccbdc8ac19676434a9d1cd17546",
    )
    secret = os.environ.get(
        "DINGTALK_SECRET",
        "SEC1ae7cd2f1a6f9da3611af37da3e7d954c1e8533fc073c6c8cc5e5af3b6e5926b",
    )
    dingtalk = DingTalkBot(webhook, secret)

    # Read the code column as text so that all-numeric codes are not parsed as
    # integers (which would strip leading zeros before the DB lookup).
    index_fund_df = pd.read_csv(
        "index_fund_info.csv",
        encoding="utf-8-sig",
        dtype={"指数代码": str},
    )
    code_df = index_fund_df.drop_duplicates(subset=["指数代码"])
    code_list = code_df.to_dict(orient="records")

    today_str = datetime.now().strftime("%Y-%m-%d")
    signal_list = []
    for i, code_info in enumerate(code_list):
        code = code_info["指数代码"]
        df = get_index_recent_date(code, _DAILY_BAR_COUNT)
        if len(df) < _DAILY_BAR_COUNT:
            continue

        df["date"] = pd.to_datetime(df["date"])
        # Only evaluate indices whose newest bar is today's; otherwise the
        # data is stale or today is not a trading day.
        if df["date"].max().strftime("%Y-%m-%d") != today_str:
            continue

        # The query returns newest-first; indicators need chronological order
        # and resampling needs a datetime index.
        df = df.sort_values("date").set_index("date")

        week_cci = _latest_cci(_weekly_bars(df))
        cci = _latest_cci(df)
        logger.info(f"{i}/{len(code_list)}: {code} week_cci: {week_cci} day_cci: {cci}")

        # Comparisons with NaN are False, so an undefined CCI never signals.
        if cci < _CCI_SIGNAL_LEVEL or week_cci < _CCI_SIGNAL_LEVEL:
            signal_list.append(
                {
                    "code": code,
                    "name": code_info["指数名称"],
                    "天cci": cci,
                    "周cci": week_cci,
                }
            )

    if not signal_list:
        logger.info("无信号")
        return

    signal_df = pd.DataFrame(signal_list)
    dingtalk.send_text(
        tabulate(signal_df, tablefmt="plain", headers="keys", showindex=False)
    )
if __name__ == "__main__":
    # Running this file directly is currently a no-op: the one-shot call and
    # the daily scheduler loop below are both commented out.
    # main_calc_process()
    ...
    # Disabled scheduler: run the job every day at 19:00 and poll once a second.
    # NOTE(review): `main` is not defined in the visible part of this file;
    # presumably main_calc_process is meant. Confirm before re-enabling.
    # main()
    # logger.info(datetime.now())
    # schedule.every().day.at("19:00").do(main)
    # while True:
    #     schedule.run_pending()
    #     time.sleep(1)