计算指数的周线cci

This commit is contained in:
2025-10-12 15:43:52 +08:00
parent 30a237829b
commit 4da8a6ad4a
2 changed files with 103 additions and 1 deletions

View File

@@ -20,4 +20,5 @@ tqdm>=4.65.0
# 时间处理
python-dateutil>=2.8.0
schedule
akshare
akshare
TA-Lib

101
signal_calc.py Normal file
View File

@@ -0,0 +1,101 @@
import pandas as pd
from db_config import DatabaseManager, DatabaseConfig
from loguru import logger
from datetime import datetime
import akshare as ak
from index_downloader import get_all_stock_index
import schedule
import time
import traceback
from dingtalk import DingTalkBot
import talib as ta
db_config = DatabaseConfig(env="daily")
logger.info(f"数据库连接: {db_config.connection_string}")
# 如果只是测试连接
db_manager = DatabaseManager(db_config)
def get_all_index_code() -> list:
"""
获取所有指数代码
:return:
"""
sql = "SELECT distinct code FROM public.index_kline;"
res = db_manager.execute_query(sql)
code_list = [dict(item)["code"] for item in res]
return code_list
def get_index_recent_date(code: str, limit: int = None) -> pd.DataFrame:
"""
获取指数的近期数据
:param code:
:return:
"""
limit_clause = f" LIMIT {limit}" if limit else ""
sql = f"SELECT date, code, open, high, low, close, volume FROM public.index_kline WHERE code = '{code}' order by date desc {limit_clause};"
raw_data_list = db_manager.execute_query(sql)
data_list = [dict(item) for item in raw_data_list]
for i, data in enumerate(data_list):
data_list[i]["date"] = data["date"].strftime("%Y-%m-%d")
data_list[i]["volume"] = int(data["volume"])
data_list[i]["close"] = float(data["close"])
data_list[i]["open"] = float(data["open"])
data_list[i]["high"] = float(data["high"])
data_list[i]["low"] = float(data["low"])
df = pd.DataFrame(data_list)
return df
def main():
webhook = "https://oapi.dingtalk.com/robot/send?access_token=fb70c1561d8beba94b4f11568f4bb15e3ae07ccbdc8ac19676434a9d1cd17546" # 填写你的webhook
secret = "SEC1ae7cd2f1a6f9da3611af37da3e7d954c1e8533fc073c6c8cc5e5af3b6e5926b" # 填写你的加签token如果有否则留空
dingtalk = DingTalkBot(webhook, secret)
# code_list = get_all_index_code()
code_df = get_all_stock_index()
code_df = code_df.drop_duplicates(subset=["代码"])
code_list = code_df.to_dict(orient="records")
signal_list = []
for i, code_info in enumerate(code_list):
code = code_info["代码"]
df = get_index_recent_date(code, 100)
if len(df) < 100:
continue
# 将 'date' 列转换为 datetime 类型,并设置为索引
df["date"] = pd.to_datetime(df["date"])
df = df.sort_values("date")
df.set_index("date", inplace=True)
# 按周重采样以每周最后一天为sampleopen为第一个、close为最后一个、high/low为最大/最小、volume为总和、code取第一个即可
df_weekly = pd.DataFrame(
{
"code": df["code"].resample("W").first(),
"open": df["open"].resample("W").first(),
"high": df["high"].resample("W").max(),
"low": df["low"].resample("W").min(),
"close": df["close"].resample("W").last(),
"volume": df["volume"].resample("W").sum(),
}
)
# 计算CCI指标以典型的20周期为例如果有更具体周期可以调整
df_weekly["cci"] = ta.CCI(
high=df_weekly["high"],
low=df_weekly["low"],
close=df_weekly["close"],
timeperiod=14,
)
df_weekly = df_weekly.tail(1)
cci = df_weekly["cci"].values[0]
logger.info(f"{i}/{len(code_list)}: {code} cci: {cci}")
if cci < -100:
signal_list.append({"code": code, "name": code_info["名称"], "cci": cci})
signal_df = pd.DataFrame(signal_list)
dingtalk.send_markdown(f"CCI信号", signal_df.to_markdown())
if __name__ == "__main__":
main()