import inspect
from abc import ABC, abstractmethod
from typing import Dict, Callable, List, Optional, Any

import numpy as np
import pandas as pd
import talib

from factor_mining.time_series_op import register_time_series_operator
from factor_mining.operators import _registry


# ============== Factor formula parsing and evaluation ==============


class FactorFormula:
    """A factor expression that can be evaluated and (de)serialized.

    The expression is a Python expression string that may reference the
    declared feature names, every operator currently listed by
    ``_registry``, and the ``np`` / ``pd`` modules.
    """

    def __init__(self, expression: str, feature_names: List[str]):
        """Store the expression and the features it depends on.

        Parameters
        ----------
        expression : str
            Factor expression (written in terms of operator names).
        feature_names : List[str]
            Names of the features the expression needs. The first name is
            the one whose length defines the expected result length in
            :meth:`compute`.
        """
        self.expression = expression
        self.feature_names = feature_names

    def compute(self, features: Dict[str, np.ndarray]) -> np.ndarray:
        """Evaluate the expression against ``features``.

        Parameters
        ----------
        features : Dict[str, np.ndarray]
            Feature mapping keyed by feature name. Every name in
            ``self.feature_names`` must be present.

        Returns
        -------
        np.ndarray
            The factor values. A scalar result (Python or NumPy numeric
            scalar, or a 0-d array) is broadcast to the length of the
            first declared feature.

        Raises
        ------
        KeyError
            If a declared feature is missing from ``features``.
        RuntimeError
            If evaluation fails for any reason, including a result whose
            length differs from the first declared feature. The original
            exception is chained as ``__cause__``.
        """
        # Build the evaluation namespace. Insertion order matters: operators
        # override same-named features, and "np"/"pd" override both.
        env: Dict[str, Any] = {}

        for name in self.feature_names:
            if name not in features:
                raise KeyError(f"特征 '{name}' 不存在")
            env[name] = features[name]

        for op_name in _registry.list_all():
            op = _registry.get(op_name)
            if op:
                env[op_name] = op.func

        # numpy and pandas are exposed because some expressions use them.
        env["np"] = np
        env["pd"] = pd

        try:
            # Only a small whitelist of builtins is exposed to the expression.
            # SECURITY NOTE: restricting ``__builtins__`` does NOT make
            # ``eval`` a sandbox (``np``/``pd`` and object introspection
            # remain reachable). Expressions must come from trusted sources.
            safe_builtins = {
                "abs": abs,
                "min": min,
                "max": max,
                "sum": sum,
                "len": len,
            }
            result = eval(self.expression, {"__builtins__": safe_builtins}, env)

            if not self.feature_names:
                # Without any declared feature the target length is unknown.
                raise ValueError("feature_names 为空,无法确定结果长度")
            expected_len = len(features[self.feature_names[0]])

            # A 0-d array is a scalar in disguise (e.g. ``np.array(3.0)``);
            # unwrap it so it takes the broadcast path below.
            if isinstance(result, np.ndarray) and result.ndim == 0:
                result = result[()]

            if isinstance(result, (int, float, np.number, np.bool_)):
                # Scalars, including NumPy scalars such as the np.int64 or
                # np.float32 produced by reductions, are broadcast.
                result = np.full(expected_len, result)
            elif not isinstance(result, np.ndarray):
                result = np.array(result)

            # The result must line up with the first declared feature.
            if len(result) != expected_len:
                raise ValueError(
                    f"表达式结果长度 {len(result)} 与特征长度 {expected_len} 不匹配"
                )

            return result

        except Exception as e:
            # Wrap every failure in one exception type that carries the
            # offending expression, keeping the root cause chained.
            raise RuntimeError(
                f"计算因子表达式失败: {e}\n表达式: {self.expression}"
            ) from e

    def to_dict(self) -> Dict:
        """Serialize to a plain dict.

        The returned dict references the same ``feature_names`` list object
        held by this instance (no copy is made).
        """
        return {"expression": self.expression, "feature_names": self.feature_names}

    @classmethod
    def from_dict(cls, data: Dict) -> "FactorFormula":
        """Rebuild a formula from a dict with the keys written by ``to_dict``."""
        return cls(data["expression"], data["feature_names"])

    def __repr__(self):
        return (
            f"FactorFormula(expression='{self.expression}', "
            f"features={self.feature_names})"
        )