"""
Execution-layer abstract interfaces (generic).

Provides only the abstract executor base class and the Portfolio data
structure; concrete executors can be added by extension.
"""

from abc import ABC, abstractmethod
from datetime import datetime
from typing import Dict, List, Optional

import pandas as pd

from framework.risk import Position


class Portfolio:
    """
    Portfolio data structure (generic).

    Tracks cash, the currently held positions keyed by instrument code,
    a log of executed trades and a history of recorded net values.
    """

    def __init__(self, initial_capital: float = 100000):
        """Initialise the portfolio with ``initial_capital`` held as cash."""
        self.initial_capital = initial_capital
        self.cash = initial_capital
        self.positions: Dict[str, Position] = {}
        self.trades: List[Dict] = []
        self._net_value_history: List[float] = []

    def add_position(self, code: str, price: float, quantity: float,
                     time: datetime) -> None:
        """
        Buy ``quantity`` of ``code`` at ``price`` and record the trade.

        Cash is reduced by ``price * quantity``. If ``code`` is already
        held, the purchase is merged into the existing holding: quantities
        are summed and the entry price becomes the quantity-weighted
        average cost, so the earlier holding is not lost.

        Args:
            code: Instrument identifier used as the key in ``positions``.
            price: Execution price of this purchase.
            quantity: Amount purchased.
            time: Timestamp of the purchase.
        """
        held = self.positions.get(code)
        if held is None:
            entry_price = price
            entry_time = time
            total_quantity = quantity
        else:
            total_quantity = held.quantity + quantity
            total_cost = held.entry_price * held.quantity + price * quantity
            # Guard the average against a zero combined quantity.
            entry_price = total_cost / total_quantity if total_quantity else price
            # Keep the original entry time when the Position exposes it;
            # otherwise fall back to the time of this purchase.
            entry_time = getattr(held, 'entry_time', time)

        self.positions[code] = Position(
            code=code,
            entry_price=entry_price,
            current_price=price,
            entry_time=entry_time,
            quantity=total_quantity
        )
        self.cash -= price * quantity
        self.trades.append({
            'action': 'BUY',
            'code': code,
            'price': price,
            'quantity': quantity,
            'time': time
        })

    def remove_position(self, code: str, price: float, time: datetime) -> float:
        """
        Sell the whole holding of ``code`` at ``price`` and record the trade.

        Args:
            code: Instrument identifier of the position to close.
            price: Execution price of the sale.
            time: Timestamp of the sale.

        Returns:
            The realised profit ``(price - entry_price) * quantity``, or
            ``0.0`` when ``code`` is not currently held.
        """
        if code not in self.positions:
            return 0.0

        position = self.positions.pop(code)
        profit = (price - position.entry_price) * position.quantity
        self.cash += price * position.quantity
        self.trades.append({
            'action': 'SELL',
            'code': code,
            'price': price,
            'quantity': position.quantity,
            'time': time,
            'profit': profit
        })
        return profit

    def update_prices(self, prices: Dict[str, float]) -> None:
        """
        Update ``current_price`` of held positions.

        Codes in ``prices`` that are not currently held are ignored.
        """
        for code, price in prices.items():
            if code in self.positions:
                self.positions[code].current_price = price

    def get_net_value(self) -> float:
        """Return cash plus the market value of all held positions."""
        positions_value = sum(
            pos.current_price * pos.quantity
            for pos in self.positions.values()
        )
        return self.cash + positions_value

    def record_net_value(self) -> None:
        """Append the current net value to the net-value history."""
        self._net_value_history.append(self.get_net_value())

    def get_net_value_series(self) -> pd.Series:
        """Return the recorded net-value history as a ``pd.Series``."""
        return pd.Series(self._net_value_history)

    def get_weight(self, code: str) -> float:
        """
        Return the share of net value held in ``code``.

        Returns ``0.0`` when ``code`` is not held, and also when the
        portfolio's net value is zero (the weight is undefined there and
        dividing would raise ``ZeroDivisionError``).
        """
        if code not in self.positions:
            return 0.0

        net_value = self.get_net_value()
        if net_value == 0:
            return 0.0

        position = self.positions[code]
        return position.current_price * position.quantity / net_value

    def __repr__(self) -> str:
        return f"Portfolio(capital={self.cash:.2f}, positions={len(self.positions)})"


class Executor(ABC):
    """
    Abstract base class for executors.

    Every concrete executor must implement ``execute``.
    """

    mode: str = "base"

    def __init__(self, **params):
        """Store the executor's keyword parameters (used by ``__repr__``)."""
        self._params = params

    @abstractmethod
    def execute(self, signals: pd.DataFrame, data: pd.DataFrame) -> Portfolio:
        """
        Execute the given signals.

        Args:
            signals: Signal DataFrame.
            data: OHLCV data.

        Returns:
            A Portfolio object.
        """
        pass

    def __repr__(self) -> str:
        params_str = ', '.join([f"{k}={v}" for k, v in self._params.items()])
        return f"{self.__class__.__name__}({params_str})"


class BacktestExecutor(Executor):
    """
    Backtest executor (generic skeleton).

    The concrete backtest logic is expected to be implemented by
    strategy-specific customisation.
    """

    mode = "backtest"

    def __init__(self, initial_capital: float = 100000, trade_cost: float = 0.001):
        """Store the starting capital and the trade cost parameter."""
        super().__init__(initial_capital=initial_capital, trade_cost=trade_cost)
        self.initial_capital = initial_capital
        self.trade_cost = trade_cost

    def execute(self, signals: pd.DataFrame, data: pd.DataFrame) -> Portfolio:
        """
        Run the backtest (simplified version).

        This skeleton only creates a fresh Portfolio funded with
        ``initial_capital`` and returns it; ``signals``, ``data`` and
        ``trade_cost`` are not used here.
        """
        portfolio = Portfolio(self.initial_capital)

        # Only the skeleton is provided here; the actual logic must be
        # customised, including net-value calculation, trade-cost
        # deduction, benchmark comparison, etc.

        return portfolio


class DryRunExecutor(Executor):
    """
    Dry-run executor (generic).

    Simulates a run without performing any trades.
    """

    mode = "dry_run"

    def __init__(self, verbose: bool = True):
        """Store whether signals should be printed during ``execute``."""
        super().__init__(verbose=verbose)
        self.verbose = verbose

    def execute(self, signals: pd.DataFrame, data: pd.DataFrame) -> Portfolio:
        """
        Simulate execution by printing each truthy signal.

        Reads the ``'signal'`` column of ``signals`` row by row, so
        duplicate index labels are handled (a label lookup would return
        a Series and make the truth test ambiguous). Missing scalar
        values (NaN/None/NaT) are treated as "no signal". No positions
        are opened; ``data`` is not used.

        Returns:
            An untouched Portfolio funded with 100000.
        """
        portfolio = Portfolio(100000)

        # Nothing to iterate over: return before touching the column.
        if len(signals.index) == 0:
            return portfolio

        for date, signal in signals['signal'].items():
            if not self.verbose:
                continue
            # NaN is truthy in Python, so filter missing values explicitly.
            if pd.api.types.is_scalar(signal) and pd.isna(signal):
                continue
            if signal:
                print(f"[{date}] Signal: {signal}")

        return portfolio


# Export the abstract interfaces
__all__ = ['Portfolio', 'Executor', 'BacktestExecutor', 'DryRunExecutor']