""" 技术指标计算模块 包含CCI、EMA、MACD等常用技术指标 """ import pandas as pd import numpy as np import talib as ta def calculate_cci( df: pd.DataFrame, period: int = 14, high_col: str = "high", low_col: str = "low", close_col: str = "close", ) -> pd.Series: """ 计算CCI指标(商品通道指数) Args: df: DataFrame with OHLC data period: CCI周期 high_col: 最高价列名 low_col: 最低价列名 close_col: 收盘价列名 Returns: Series: CCI值 """ return ta.CCI( high=df[high_col], low=df[low_col], close=df[close_col], timeperiod=period, ) def calculate_ema( price_series: pd.Series, period: int = 20, ) -> pd.Series: """ 计算指数移动平均线 Args: price_series: 价格序列 period: EMA周期 Returns: Series: EMA值 """ return ta.EMA(price_series, timeperiod=period) def calculate_macd( price_series: pd.Series, fastperiod: int = 12, slowperiod: int = 26, signalperiod: int = 9, ) -> tuple[pd.Series, pd.Series, pd.Series]: """ 计算MACD指标 Args: price_series: 价格序列 fastperiod: 快线周期 slowperiod: 慢线周期 signalperiod: 信号线周期 Returns: tuple: (macd, signal, hist) """ macd, signal, hist = ta.MACD( price_series, fastperiod=fastperiod, slowperiod=slowperiod, signalperiod=signalperiod, ) return macd, signal, hist def calculate_td_sequence(close_series: pd.Series) -> pd.Series: """ 计算TD序列(Tom DeMark Sequential) Args: close_series: 收盘价序列 Returns: Series: TD序列值(正数为上涨计数,负数为下跌计数) """ close = close_series.to_list() td = [0, 0, 0, 0] up = 0 down = 0 for i in range(4, len(close)): if close[i] > close[i - 4]: up += 1 down = 0 td.append(up) else: down -= 1 up = 0 td.append(down) return pd.Series(td, index=close_series.index) def resample_to_weekly(df: pd.DataFrame) -> pd.DataFrame: """ 将日线数据重采样为周线数据 Args: df: DataFrame with columns: date, open, high, low, close, volume Returns: DataFrame: 周线数据 """ df = df.copy() if "date" in df.columns: df["date"] = pd.to_datetime(df["date"]) df.set_index("date", inplace=True) weekly = pd.DataFrame( { "code": df["code"].resample("W").first() if "code" in df.columns else None, "open": df["open"].resample("W").first(), "high": df["high"].resample("W").max(), "low": df["low"].resample("W").min(), "close": df["close"].resample("W").last(), "volume": df["volume"].resample("W").sum(), } ) return weekly.dropna() class TechnicalScreener: """技术指标筛选器基类""" def __init__(self, name: str): self.name = name def screen(self, df: pd.DataFrame) -> bool: """ 判断数据是否符合筛选条件 Args: df: DataFrame with OHLCV data Returns: bool: 是否符合条件 """ raise NotImplementedError class CCIScreener(TechnicalScreener): """CCI超卖筛选器""" def __init__( self, day_period: int = 14, week_period: int = 14, threshold: float = -100, use_weekly: bool = True, ): super().__init__("CCI超卖筛选") self.day_period = day_period self.week_period = week_period self.threshold = threshold self.use_weekly = use_weekly def screen(self, df: pd.DataFrame) -> dict: """ 筛选CCI超卖信号 Returns: dict: { 'triggered': bool, # 是否触发信号 'day_cci': float, # 日线CCI值 'week_cci': float, # 周线CCI值(如启用) } """ # 计算日线CCI day_cci = calculate_cci(df, period=self.day_period).iloc[-1] result = { "triggered": day_cci < self.threshold, "day_cci": day_cci, "week_cci": None, } # 计算周线CCI(如果启用) if self.use_weekly: weekly_df = resample_to_weekly(df) if len(weekly_df) >= self.week_period: week_cci = calculate_cci(weekly_df, period=self.week_period).iloc[-1] result["week_cci"] = week_cci # 日线或周线任一超卖即触发 result["triggered"] = ( day_cci < self.threshold or week_cci < self.threshold ) return result