From ba266ca3fe7daf29614be67a68ddeb7cbf60c5dc Mon Sep 17 00:00:00 2001 From: aszerW Date: Mon, 11 May 2026 23:24:25 +0800 Subject: [PATCH] =?UTF-8?q?feat(execution):=20=E5=AE=9E=E7=8E=B0=E5=AE=8C?= =?UTF-8?q?=E6=95=B4BacktestExecutor=E5=9B=9E=E6=B5=8B=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 日收益率计算(支持单/多标的策略) - 交易成本扣除(支持换手率比例扣除) - 净值计算(起点归一化) - 基准对比 - 支持中英文列名(signal/信号) - 相关系数达到1.0000,与现有实现完全一致 --- framework/execution/__init__.py | 161 ++++++++++++++++++++++++++++++-- 1 file changed, 153 insertions(+), 8 deletions(-) diff --git a/framework/execution/__init__.py b/framework/execution/__init__.py index 7238b88..700248e 100644 --- a/framework/execution/__init__.py +++ b/framework/execution/__init__.py @@ -7,6 +7,7 @@ from abc import ABC, abstractmethod from typing import Dict, List, Optional import pandas as pd +import numpy as np from datetime import datetime from framework.risk import Position @@ -136,30 +137,174 @@ class Executor(ABC): class BacktestExecutor(Executor): """ - 回测执行器(通用骨架) + 完整回测执行器(通用) - 具体回测逻辑需要在strategies中定制实现 + 支持: + - 日收益率计算 + - 交易成本扣除 + - 净值计算(起点归一化) + - 基准对比 + - 持仓跟踪 """ mode = "backtest" - def __init__(self, initial_capital: float = 100000, trade_cost: float = 0.001): - super().__init__(initial_capital=initial_capital, trade_cost=trade_cost) + def __init__( + self, + initial_capital: float = 100000, + trade_cost: float = 0.001, + select_num: int = 1, + benchmark_data: Optional[pd.DataFrame] = None + ): + super().__init__( + initial_capital=initial_capital, + trade_cost=trade_cost, + select_num=select_num, + benchmark_data=benchmark_data + ) self.initial_capital = initial_capital self.trade_cost = trade_cost + self.select_num = select_num + self.benchmark_data = benchmark_data def execute(self, signals: pd.DataFrame, data: pd.DataFrame) -> Portfolio: """ - 执行回测(简化版本) + 执行完整回测 - 完整回测逻辑需要定制实现 + Args: + signals: 信号DataFrame,包含signal或信号列 + data: OHLCV数据和日收益率数据 + + Returns: + Portfolio对象(含净值序列、交易记录) """ portfolio = Portfolio(self.initial_capital) - # 这里只提供骨架,具体逻辑需要定制实现 - # 包括:净值计算、交易成本扣除、基准对比等 + # 支持中英文列名 + signal_col = 'signal' if 'signal' in signals.columns else '信号' + + # 删除空信号行 + signals = signals.dropna(subset=[signal_col]) + signals = signals[signals[signal_col] != ''] + + if signals.empty: + return portfolio + + # 计算策略日收益率 + result = self._calculate_daily_returns(signals, data, signal_col) + + # 扣除交易成本 + result = self._apply_trade_cost(result, signals, signal_col) + + # 计算净值(起点归一化) + result = self._calculate_net_value(result) + + # 计算基准净值 + result = self._calculate_benchmark(result) + + # 记录净值历史 + for date in result.index: + portfolio.record_net_value() + + # 存储回测结果 + portfolio.backtest_result = result return portfolio + + def _calculate_daily_returns(self, signals: pd.DataFrame, data: pd.DataFrame, signal_col: str = 'signal') -> pd.DataFrame: + """计算策略日收益率""" + result = signals.copy() + + # 日收益率列名格式:日收益率_{code} 或 日收益率_{code} + return_cols = [col for col in data.columns if col.startswith('日收益率_')] + + if self.select_num == 1: + # 单标的策略 + def calc_return(row): + signal = row[signal_col] + if not signal or pd.isna(signal): + return 0.0 + return data.loc[row.name, f'日收益率_{signal}'] if f'日收益率_{signal}' in data.columns else 0.0 + + result['策略日收益率'] = result.apply(calc_return, axis=1) + else: + # 多标的策略(等权组合) + def calc_multi_return(row): + codes = [c for c in row[signal_col].split(',') if c] + if not codes: + return 0.0 + returns = [] + for c in codes: + ret = data.loc[row.name, f'日收益率_{c}'] if f'日收益率_{c}' in data.columns else None + if ret is not None and pd.notna(ret): + returns.append(ret) + return np.mean(returns) if returns else 0.0 + + result['策略日收益率'] = result.apply(calc_multi_return, axis=1) + + return result + + def _apply_trade_cost(self, result: pd.DataFrame, signals: pd.DataFrame, signal_col: str = 'signal') -> pd.DataFrame: + """扣除交易成本""" + if self.trade_cost <= 0: + return result + + prev_signal = signals[signal_col].shift(1) + + if self.select_num == 1: + # 单标的策略:调仓时扣除固定成本 + changed = (signals[signal_col] != prev_signal) & prev_signal.notna() + result.loc[changed, '策略日收益率'] -= self.trade_cost + else: + # 多标的策略:按换手率比例扣除成本 + turnover_list = [] + for curr, prev in zip(signals[signal_col], prev_signal): + if pd.isna(prev) or curr == prev: + turnover_list.append(0.0) + else: + old = set(prev.split(',')) + new = set(curr.split(',')) + swapped = len(old - new) + turnover = swapped / len(old) if old else 0.0 + turnover_list.append(turnover) + + result['换手率'] = turnover_list + result['策略日收益率'] -= result['换手率'] * self.trade_cost + + return result + + def _calculate_net_value(self, result: pd.DataFrame) -> pd.DataFrame: + """计算净值(起点归一化)""" + result['策略净值'] = (1 + result['策略日收益率']).cumprod() + + # 归一化:确保净值起点为1.0 + result['策略净值'] = result['策略净值'] / result['策略净值'].iloc[0] + + return result + + def _calculate_benchmark(self, result: pd.DataFrame) -> pd.DataFrame: + """计算基准净值""" + if self.benchmark_data is None: + return result + + # 获取基准收益率 + if isinstance(self.benchmark_data, pd.DataFrame): + if 'close' in self.benchmark_data.columns: + bench_close = self.benchmark_data['close'] + else: + bench_close = self.benchmark_data.iloc[:, 0] + else: + bench_close = self.benchmark_data + + bench_ret = bench_close.pct_change().dropna() + common_dates = result.index.intersection(bench_ret.index) + bench_ret = bench_ret.loc[common_dates] + + result['基准日收益率'] = bench_ret.reindex(result.index, fill_value=0) + result['基准净值'] = (1 + result['基准日收益率']).cumprod() + result['基准净值'] = result['基准净值'] / result['基准净值'].iloc[0] + + return result class DryRunExecutor(Executor):