import numpy as np
import pandas as pd


# ==================== Time-series operators ====================


def _min_periods(window: int) -> int:
    """Return the ``min_periods`` used by the rolling operators.

    The rule is ``max(2, window // 2)``, capped at ``window``: pandas raises
    ``ValueError`` when ``min_periods`` is larger than the window, which the
    uncapped rule produces for ``window == 1``.
    """
    return min(window, max(2, window // 2))


def _rolling_mean(x: np.ndarray, window: int) -> np.ndarray:
    """Return the rolling mean of ``x`` over ``window`` observations.

    Positions whose window holds fewer than ``_min_periods(window)``
    non-NaN values are NaN.
    """
    series = pd.Series(x)
    rolled = series.rolling(window, min_periods=_min_periods(window))
    return rolled.mean().to_numpy()


def _rolling_std(x: np.ndarray, window: int) -> np.ndarray:
    """Return the rolling standard deviation of ``x`` over ``window``.

    Uses pandas' default ``Rolling.std`` settings. Positions whose window
    holds fewer than ``_min_periods(window)`` non-NaN values are NaN.
    """
    series = pd.Series(x)
    rolled = series.rolling(window, min_periods=_min_periods(window))
    return rolled.std().to_numpy()


def _ts_delta(x: np.ndarray, period: int) -> np.ndarray:
    """Return ``x[t] - x[t - period]``; positions without a lagged value are NaN."""
    return pd.Series(x).diff(period).to_numpy()


def _ts_rank(x: np.ndarray, window: int) -> np.ndarray:
    """Return the percentile rank of the latest value inside each window.

    For every position the trailing ``window`` values are ranked with
    ``Series.rank(pct=True)`` and the rank of the last (most recent) value
    is returned. Positions whose window holds fewer than
    ``_min_periods(window)`` non-NaN values are NaN.
    """
    series = pd.Series(x)
    rolled = series.rolling(window, min_periods=_min_periods(window))
    # raw=False makes pandas pass each window as a Series already, so it can
    # be ranked directly without wrapping it in another Series.
    ranked = rolled.apply(lambda win: win.rank(pct=True).iloc[-1], raw=False)
    return ranked.to_numpy()


def _delay(x: np.ndarray, period: int) -> np.ndarray:
    """Return ``x`` shifted by ``period`` positions; vacated slots are NaN."""
    return pd.Series(x).shift(period).to_numpy()


def _pct_change(x: np.ndarray, period: int = 1) -> np.ndarray:
    """Return the fractional change of ``x`` relative to ``period`` steps earlier.

    ``fill_method=None`` is passed so missing values are not forward-filled
    before the change is computed.
    """
    return pd.Series(x).pct_change(periods=period, fill_method=None).to_numpy()


def register_time_series_operator(registry) -> None:
    """Register the windowed time-series operators on ``registry``.

    For every window ``w`` in ``range(5, 50, 5)`` (5, 10, ..., 45) six
    operators are registered, in this order: ``sma{w}``, ``std{w}``,
    ``rank{w}``, ``delta{w}``, ``delay{w}`` and ``pct_change{w}``. Each one is
    registered through
    ``registry.register_function(name, func, description)``, where ``func``
    takes a single array argument ``x``.
    """
    # (name prefix, implementation, human-readable label). The description
    # strings are runtime text and are kept exactly as they were.
    operators = (
        ("sma", _rolling_mean, "简单移动平均"),
        ("std", _rolling_std, "滚动标准差"),
        ("rank", _ts_rank, "滚动排名"),
        ("delta", _ts_delta, "差分"),
        ("delay", _delay, "延迟"),
        ("pct_change", _pct_change, "百分比变化"),
    )

    def bind(op, win):
        # The factory call freezes ``op`` and ``win`` for this one callable;
        # a lambda written directly in the loop would late-bind and see only
        # the final loop values.
        return lambda x: op(x, win)

    # Register each operator once per window size.
    for w in range(5, 50, 5):
        for prefix, op, label in operators:
            registry.register_function(
                f"{prefix}{w}",
                bind(op, w),
                # The displayed operator tag is the upper-cased name prefix.
                f"{label}: {prefix.upper()}(x, {w})",
            )