Files
aszerW 9e1ba2db03 refactor: 移除screener目录到archive
- strategies/screener/ → archive/legacy_screener/
- base.py (Screener抽象基类) 功能已被 framework/signals/SignalGenerator 覆盖
- cci.py (CCI策略) 依赖已归档的 core.factors.technical 和 core.common

保留新框架结构,使用 SignalGenerator 替代 Screener 抽象
2026-05-11 23:36:29 +08:00

187 lines
5.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
CCI技术指标筛选器
基于商品通道指数(CCI)筛选超卖标的
"""
import pandas as pd
from datetime import datetime
from .base import DataFrameScreener
from core.factors.technical import calculate_cci, resample_to_weekly
from core.common.db import DatabaseManager
from core.common.notify import NotificationManager
class CCIScreener(DataFrameScreener):
"""CCI超卖筛选器"""
def __init__(self, config: dict = None):
super().__init__("CCI超卖筛选", config)
self.day_period = config.get("day_period", 14)
self.week_period = config.get("week_period", 14)
self.threshold = config.get("threshold", -100)
self.use_weekly = config.get("use_weekly", True)
self.db_manager = DatabaseManager()
self.notifier = NotificationManager()
def screen(self, df: pd.DataFrame) -> dict:
"""
对单只标的进行CCI筛选
Args:
df: DataFrame with OHLCV data
Returns:
dict: {
'triggered': bool,
'day_cci': float,
'week_cci': float or None,
'current_price': float,
}
"""
if not self.validate_data(df):
return {"triggered": False, "error": "数据格式错误"}
# 计算日线CCI
day_cci = calculate_cci(df, period=self.day_period).iloc[-1]
current_price = df["close"].iloc[-1]
result = {
"triggered": day_cci < self.threshold,
"day_cci": round(day_cci, 2),
"week_cci": None,
"current_price": round(current_price, 2),
}
# 计算周线CCI
if self.use_weekly:
weekly_df = resample_to_weekly(df)
if len(weekly_df) >= self.week_period:
week_cci = calculate_cci(weekly_df, period=self.week_period).iloc[-1]
result["week_cci"] = round(week_cci, 2)
# 日线或周线任一超卖即触发
result["triggered"] = (
day_cci < self.threshold or week_cci < self.threshold
)
return result
def get_data_from_db(self, code: str, limit: int = 100) -> pd.DataFrame:
"""从数据库获取数据"""
sql = f"""
SELECT date, open, high, low, close, volume
FROM public.index_kline
WHERE code = '{code}'
ORDER BY date DESC
LIMIT {limit}
"""
result = self.db_manager.execute_query(sql)
if not result:
return pd.DataFrame()
df = pd.DataFrame(result)
df["date"] = pd.to_datetime(df["date"])
# 转换数值类型
for col in ["open", "high", "low", "close", "volume"]:
df[col] = pd.to_numeric(df[col], errors="coerce")
df = df.sort_values("date").reset_index(drop=True)
return df
def run_screening(self, code_list: list = None) -> list:
"""
执行批量筛选
Args:
code_list: 标的代码列表None则从配置文件读取
Returns:
list: 符合条件的标的列表
"""
if code_list is None:
# 从CSV文件读取指数列表
import os
csv_path = self.config.get("index_fund_info_file", "index_fund_info.csv")
if os.path.exists(csv_path):
df = pd.read_csv(csv_path, encoding="utf-8-sig")
code_list = df.drop_duplicates(subset=["指数代码"]).to_dict("records")
else:
raise ValueError(f"找不到标的列表文件: {csv_path}")
signals = []
today_str = datetime.now().strftime("%Y-%m-%d")
print(f"开始CCI筛选{len(code_list)} 个标的...")
for i, code_info in enumerate(code_list):
if isinstance(code_info, dict):
code = code_info.get("指数代码")
name = code_info.get("指数名称", code)
else:
code = code_info
name = code
try:
df = self.get_data_from_db(code, limit=self.config.get("lookback_days", 100))
if len(df) < self.day_period:
continue
# 检查最新日期
if df["date"].max().strftime("%Y-%m-%d") != today_str:
continue
result = self.screen(df)
if result["triggered"]:
signals.append({
"code": code,
"name": name,
"day_cci": result["day_cci"],
"week_cci": result["week_cci"],
"price": result["current_price"],
})
print(f"{code} ({name}): 日线CCI={result['day_cci']:.2f}")
except Exception as e:
print(f"{code}: {e}")
continue
print(f"\n筛选完成,{len(signals)} 个标的符合CCI超卖条件")
# 发送通知
if signals:
self.notifier.notify_signal(signals, signal_type="CCI超卖")
return signals
def run_daily(self):
"""每日定时运行"""
from datetime import datetime
# 检查是否为交易日
if datetime.today().weekday() >= 5 and self.config.get("skip_weekend", True):
print("非交易日,跳过")
return
self.run_screening()
def create_cci_screener_from_config(config_path: str = None) -> CCIScreener:
"""从配置文件创建CCI筛选器"""
import yaml
import os
if config_path is None:
config_path = os.path.join(
os.path.dirname(__file__), "..", "..", "config", "strategies", "cci.yaml"
)
with open(config_path, "r", encoding="utf-8") as f:
config = yaml.safe_load(f)
return CCIScreener(config)