From 4da8a6ad4ae7893897357a2c388bfabec17fef66 Mon Sep 17 00:00:00 2001 From: aszerW Date: Sun, 12 Oct 2025 15:43:52 +0800 Subject: [PATCH] =?UTF-8?q?=E8=AE=A1=E7=AE=97=E6=8C=87=E6=95=B0=E7=9A=84?= =?UTF-8?q?=E5=91=A8=E7=BA=BFcci?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- requirements.txt | 3 +- signal_calc.py | 101 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 103 insertions(+), 1 deletion(-) create mode 100644 signal_calc.py diff --git a/requirements.txt b/requirements.txt index 4904e69..95daf43 100644 --- a/requirements.txt +++ b/requirements.txt @@ -20,4 +20,5 @@ tqdm>=4.65.0 # 时间处理 python-dateutil>=2.8.0 schedule -akshare \ No newline at end of file +akshare +TA-Lib \ No newline at end of file diff --git a/signal_calc.py b/signal_calc.py new file mode 100644 index 0000000..a89c978 --- /dev/null +++ b/signal_calc.py @@ -0,0 +1,101 @@ +import pandas as pd +from db_config import DatabaseManager, DatabaseConfig +from loguru import logger +from datetime import datetime +import akshare as ak +from index_downloader import get_all_stock_index +import schedule +import time +import traceback +from dingtalk import DingTalkBot +import talib as ta + +db_config = DatabaseConfig(env="daily") +logger.info(f"数据库连接: {db_config.connection_string}") + +# 如果只是测试连接 +db_manager = DatabaseManager(db_config) + + +def get_all_index_code() -> list: + """ + 获取所有指数代码 + :return: + """ + sql = "SELECT distinct code FROM public.index_kline;" + res = db_manager.execute_query(sql) + code_list = [dict(item)["code"] for item in res] + return code_list + + +def get_index_recent_date(code: str, limit: int = None) -> pd.DataFrame: + """ + 获取指数的近期数据 + :param code: + :return: + """ + limit_clause = f" LIMIT {limit}" if limit else "" + sql = f"SELECT date, code, open, high, low, close, volume FROM public.index_kline WHERE code = '{code}' order by date desc {limit_clause};" + raw_data_list = db_manager.execute_query(sql) + data_list = [dict(item) for item in raw_data_list] + + for i, data in enumerate(data_list): + data_list[i]["date"] = data["date"].strftime("%Y-%m-%d") + data_list[i]["volume"] = int(data["volume"]) + data_list[i]["close"] = float(data["close"]) + data_list[i]["open"] = float(data["open"]) + data_list[i]["high"] = float(data["high"]) + data_list[i]["low"] = float(data["low"]) + df = pd.DataFrame(data_list) + return df + + +def main(): + webhook = "https://oapi.dingtalk.com/robot/send?access_token=fb70c1561d8beba94b4f11568f4bb15e3ae07ccbdc8ac19676434a9d1cd17546" # 填写你的webhook + secret = "SEC1ae7cd2f1a6f9da3611af37da3e7d954c1e8533fc073c6c8cc5e5af3b6e5926b" # 填写你的加签token(如果有),否则留空 + dingtalk = DingTalkBot(webhook, secret) + # code_list = get_all_index_code() + code_df = get_all_stock_index() + code_df = code_df.drop_duplicates(subset=["代码"]) + code_list = code_df.to_dict(orient="records") + signal_list = [] + for i, code_info in enumerate(code_list): + code = code_info["代码"] + df = get_index_recent_date(code, 100) + if len(df) < 100: + continue + # 将 'date' 列转换为 datetime 类型,并设置为索引 + df["date"] = pd.to_datetime(df["date"]) + df = df.sort_values("date") + df.set_index("date", inplace=True) + + # 按周重采样(以每周最后一天为sample),open为第一个、close为最后一个、high/low为最大/最小、volume为总和、code取第一个即可 + df_weekly = pd.DataFrame( + { + "code": df["code"].resample("W").first(), + "open": df["open"].resample("W").first(), + "high": df["high"].resample("W").max(), + "low": df["low"].resample("W").min(), + "close": df["close"].resample("W").last(), + "volume": df["volume"].resample("W").sum(), + } + ) + + # 计算CCI指标(以典型的20周期为例,如果有更具体周期可以调整) + df_weekly["cci"] = ta.CCI( + high=df_weekly["high"], + low=df_weekly["low"], + close=df_weekly["close"], + timeperiod=14, + ) + df_weekly = df_weekly.tail(1) + cci = df_weekly["cci"].values[0] + logger.info(f"{i}/{len(code_list)}: {code} cci: {cci}") + if cci < -100: + signal_list.append({"code": code, "name": code_info["名称"], "cci": cci}) + signal_df = pd.DataFrame(signal_list) + dingtalk.send_markdown(f"CCI信号", signal_df.to_markdown()) + + +if __name__ == "__main__": + main()