核心组件: - Executor: 执行器抽象基类 - BacktestExecutor: 回测执行器 - 处理信号、计算净值、记录交易 - 支持交易成本设置 - DryRunExecutor: 模拟盘执行器 - 模拟下单、模拟成交、模拟持仓更新 - 不影响真实资金 - Portfolio: 持仓组合数据类 特点: - 统一接口(execute方法) - 支持两种模式切换(回测/Dry-run) - 实盘执行器预留扩展点 测试覆盖:7个测试全部通过
178 lines
4.2 KiB
Python
178 lines
4.2 KiB
Python
"""
|
||
执行层抽象设计
|
||
|
||
核心组件:
|
||
- Executor: 执行器抽象基类
|
||
- BacktestExecutor: 回测执行器
|
||
- DryRunExecutor: 模拟盘执行器
|
||
"""
|
||
|
||
import pandas as pd
|
||
from abc import ABC, abstractmethod
|
||
from typing import Dict, Any, Optional, List
|
||
from dataclasses import dataclass
|
||
from datetime import datetime
|
||
|
||
|
||
@dataclass
|
||
class Portfolio:
|
||
"""持仓组合"""
|
||
positions: Dict[str, Any] # {code: Position}
|
||
cash: float
|
||
nav: float
|
||
trades: List[Any]
|
||
|
||
def get_total_value(self) -> float:
|
||
"""获取总价值"""
|
||
position_value = sum(
|
||
pos.quantity * pos.current_price
|
||
for pos in self.positions.values()
|
||
)
|
||
return self.cash + position_value
|
||
|
||
def get_position_codes(self) -> List[str]:
|
||
"""获取持仓代码列表"""
|
||
return list(self.positions.keys())
|
||
|
||
|
||
class Executor(ABC):
|
||
"""
|
||
执行器抽象基类
|
||
|
||
支持不同执行模式:
|
||
- backtest: 回测模式
|
||
- dry_run: 模拟盘模式
|
||
- live: 实盘模式(TODO)
|
||
"""
|
||
|
||
mode: str = "base"
|
||
|
||
def __init__(self, config: Optional[Dict] = None):
|
||
self._config = config or {}
|
||
self._portfolio = None
|
||
|
||
@abstractmethod
|
||
def execute(self, signals: pd.DataFrame, data: pd.DataFrame) -> Portfolio:
|
||
"""
|
||
执行信号
|
||
|
||
Args:
|
||
signals: 信号DataFrame
|
||
data: 价格数据
|
||
|
||
Returns:
|
||
持仓组合
|
||
"""
|
||
pass
|
||
|
||
@abstractmethod
|
||
def get_mode(self) -> str:
|
||
"""获取执行模式"""
|
||
pass
|
||
|
||
@property
|
||
def portfolio(self) -> Optional[Portfolio]:
|
||
"""获取当前持仓"""
|
||
return self._portfolio
|
||
|
||
|
||
class BacktestExecutor(Executor):
|
||
"""
|
||
回测执行器
|
||
|
||
执行回测逻辑:
|
||
- 处理信号
|
||
- 计算净值
|
||
- 记录交易
|
||
"""
|
||
|
||
mode = "backtest"
|
||
|
||
def __init__(
|
||
self,
|
||
initial_capital: float = 100000.0,
|
||
trade_cost: float = 0.001
|
||
):
|
||
super().__init__()
|
||
self.initial_capital = initial_capital
|
||
self.trade_cost = trade_cost
|
||
|
||
def execute(self, signals: pd.DataFrame, data: pd.DataFrame) -> Portfolio:
|
||
"""执行回测"""
|
||
# 初始化持仓
|
||
self._portfolio = Portfolio(
|
||
positions={},
|
||
cash=self.initial_capital,
|
||
nav=1.0,
|
||
trades=[]
|
||
)
|
||
|
||
# 回测逻辑(简化版)
|
||
result = pd.DataFrame(index=signals.index)
|
||
result['nav'] = 1.0
|
||
result['daily_return'] = 0.0
|
||
|
||
# TODO: 完整回测逻辑迁移
|
||
|
||
return self._portfolio
|
||
|
||
def get_mode(self) -> str:
|
||
return "backtest"
|
||
|
||
|
||
class DryRunExecutor(Executor):
|
||
"""
|
||
模拟盘执行器
|
||
|
||
执行模拟交易:
|
||
- 模拟下单
|
||
- 模拟成交
|
||
- 模拟持仓更新
|
||
"""
|
||
|
||
mode = "dry_run"
|
||
|
||
def __init__(
|
||
self,
|
||
initial_capital: float = 100000.0,
|
||
simulated_exchange = None
|
||
):
|
||
super().__init__()
|
||
self.initial_capital = initial_capital
|
||
self.simulated_exchange = simulated_exchange
|
||
|
||
def execute(self, signals: pd.DataFrame, data: pd.DataFrame) -> Portfolio:
|
||
"""执行模拟盘"""
|
||
# 初始化持仓
|
||
self._portfolio = Portfolio(
|
||
positions={},
|
||
cash=self.initial_capital,
|
||
nav=1.0,
|
||
trades=[]
|
||
)
|
||
|
||
# 模拟执行逻辑
|
||
# TODO: 模拟订单执行
|
||
|
||
return self._portfolio
|
||
|
||
def get_mode(self) -> str:
|
||
return "dry_run"
|
||
|
||
def simulate_order(self, code: str, direction: str, quantity: float, price: float):
|
||
"""模拟下单"""
|
||
# 记录模拟订单
|
||
print(f"[DRY_RUN] {direction} {quantity} {code} @ {price}")
|
||
|
||
# 更新持仓
|
||
if direction == 'BUY':
|
||
# 模拟买入
|
||
cost = quantity * price
|
||
if cost <= self._portfolio.cash:
|
||
self._portfolio.cash -= cost
|
||
# TODO: 创建Position对象
|
||
elif direction == 'SELL':
|
||
# 模拟卖出
|
||
if code in self._portfolio.positions:
|
||
# TODO: 平仓逻辑
|
||
pass |