""" 全球资产大类轮动策略(V2 正式版) 基于动量因子的全球资产轮动策略 - 支持信号-交易分离(指数信号 → ETF收益) - 强制分散化选股(每个 group 只选 1 个) - 动态短债阈值(标的动量 < 短债动量 → 不持有) - 溢价过滤(避免买入高溢价 ETF) - 调仓控制(rebalance_days + rebalance_threshold) - 交易成本计算(trade_cost: 0.1%) """ import pandas as pd import numpy as np from typing import Dict, Optional, Tuple from datetime import datetime, timedelta from framework_v2.core.strategy import StrategyBase from framework_v2.config.schemas import StrategyConfig from framework_v2.shared.factors import MomentumFactor from framework_v2.shared.data.alignment import CrossMarketAligner class GlobalRotationStrategy(StrategyBase): """ 全球资产大类轮动策略(V2 正式版) 策略逻辑: 1. 计算各指数标的动量得分(加权线性回归) 2. 使用动态短债阈值过滤负动量标的 3. 每个 group 内竞争,只选 Top 1(强制分散化) 4. 溢价过滤:排除溢价率 > 阈值的 ETF 5. 调仓控制:最低持仓天数 + 调仓阈值 6. 等权分配仓位 7. 扣除交易成本(0.1%) 示例: from framework_v2.config import load_config from framework_v2.strategies.rotation.rotation import GlobalRotationStrategy config = load_config('config_simple.yaml') strategy = GlobalRotationStrategy(config) result = strategy.run() """ def __init__(self, config: StrategyConfig): """ 初始化策略 Args: config: 策略配置 """ super().__init__(config) # 初始化动量因子 self.momentum = MomentumFactor( n_days=config.factor.n_days, weighted=(config.factor.type.value == 'weighted_momentum') ) # 策略参数(从 config 中读取) rotation_config = config.rotation self.select_num = rotation_config.select_num if rotation_config else 3 self.diversified = rotation_config.diversified if rotation_config else True # 动态阈值配置 self.use_dynamic_threshold = False self.bond_code = None self.bond_ratio = 1.0 self.fill_bond = True if rotation_config and rotation_config.threshold: threshold_config = rotation_config.threshold if hasattr(threshold_config, 'mode') and threshold_config.mode == 'dynamic': self.use_dynamic_threshold = True dynamic_config = threshold_config.dynamic self.bond_code = dynamic_config.reference self.bond_ratio = dynamic_config.ratio # 调仓控制 self.rebalance_days = getattr(rotation_config, 'rebalance_days', 1) if rotation_config else 1 self.rebalance_threshold = getattr(rotation_config, 'rebalance_threshold', 0.0) if rotation_config else 0.0 # 交易成本 self.trade_cost = getattr(config.backtest, 'trade_cost', 0.001) if config.backtest else 0.001 # 溢价控制 self.use_premium_control = False self.premium_threshold = 0.10 # 默认 10% if hasattr(config, 'premium_control'): premium_config = config.premium_control self.use_premium_control = getattr(premium_config, 'enabled', False) if self.use_premium_control: self.premium_threshold = getattr(premium_config, 'default_threshold', 0.10) def get_codes(self) -> list: """ 获取标的列表(信号标的 + 交易标的 + 短债) Returns: 标的代码列表 """ codes = set() # 添加所有信号标的 codes.update(self.config.asset_pools.get_signal_codes()) # 添加所有交易标的 codes.update(self.config.asset_pools.get_trade_codes()) # 如果使用动态阈值,添加短债标的 if self.use_dynamic_threshold and self.bond_code: codes.add(self.bond_code) return list(codes) def get_data(self) -> Dict[str, pd.DataFrame]: """ 获取数据(分别获取指数和 ETF,使用不同的复权方式) 指数数据:使用 raw(原始价格)用于信号计算 ETF 数据:使用 hfq(后复权价格)用于收益计算 Returns: 数据字典 {code: DataFrame} """ if self._data_fetcher is None: self._data_fetcher = self._create_data_fetcher() # 获取信号→交易映射 signal_to_trade = self.config.asset_pools.get_signal_to_trade_mapping() # 处理 end_date 为 None 的情况(使用今天) from datetime import date start = self.config.backtest.start_date end = self.config.backtest.end_date if end is None: end = date.today().strftime('%Y-%m-%d') data = {} # 1. 获取指数数据(信号标的,使用 raw) signal_codes = set(self.config.asset_pools.get_signal_codes()) if self.use_dynamic_threshold and self.bond_code: signal_codes.add(self.bond_code) if signal_codes: print(f"\n[数据] 获取 {len(signal_codes)} 只指数数据(adj='raw')...") try: index_data = self._data_fetcher.fetch_indices( codes=list(signal_codes), start=start, end=end, adj='raw' # 指数使用原始价格 ) data.update(index_data) print(f" ✓ 指数数据: {len(index_data)} 只") except Exception as e: print(f" ✗ 指数数据获取失败: {e}") # 2. 获取 ETF 数据(交易标的,使用 hfq) trade_codes = list(set(signal_to_trade.values())) if trade_codes: print(f"\n[数据] 获取 {len(trade_codes)} 只 ETF 数据(adj='hfq')...") try: etf_data = self._data_fetcher.fetch_etf( codes=trade_codes, start=start, end=end, adj='hfq' # ETF 使用后复权价格 ) data.update(etf_data) print(f" ✓ ETF 数据: {len(etf_data)} 只") except Exception as e: print(f" ✗ ETF 数据获取失败: {e}") return data def compute_factors(self, data: Dict[str, pd.DataFrame]) -> Dict[str, pd.Series]: """ 计算动量因子(只使用信号标的的数据) Args: data: 数据字典 {code: DataFrame} Returns: 因子字典 {signal_source: Series} """ factors = {} # 只使用信号标的计算因子 signal_codes = self.config.asset_pools.get_signal_codes() for code in signal_codes: if code not in data: print(f" 警告: {code} 数据不存在,跳过") continue try: df = data[code] factor_values = self.momentum.compute(df) factors[code] = factor_values except Exception as e: print(f" 警告: {code} 因子计算失败 - {e}") continue # 如果使用动态阈值,计算短债因子 if self.use_dynamic_threshold and self.bond_code and self.bond_code in data: try: df = data[self.bond_code] bond_factor = self.momentum.compute(df) factors[self.bond_code] = bond_factor print(f" [阈值] 短债动量因子已计算: {self.bond_code}") except Exception as e: print(f" 警告: 短债因子计算失败 - {e}") return factors def generate_signals(self, factors: Dict[str, pd.Series]) -> pd.DataFrame: """ 生成轮动信号(支持动态阈值和强制分散化) 逻辑: 1. 计算动态短债阈值(如果使用) 2. 因子对齐到 A 股日历(ffill 填充休市日) 3. 每个 group 内竞争,选 Top 1 4. 溢价过滤(如果启用) 5. 组合所有 group 的选股结果 Args: factors: 因子字典 {code: Series} Returns: 信号 DataFrame(index=日期, columns=signal_source, values=1或0) """ if not factors: return pd.DataFrame() # 获取 A 股交易日历 trading_calendar = self._get_trading_calendar() # 对齐所有因子到 A 股日历(关键:ffill 填充休市日) factor_df = pd.DataFrame(factors) factor_df = factor_df.reindex(trading_calendar).ffill() # 获取动态短债阈值(如果使用) bond_threshold = None if self.use_dynamic_threshold and self.bond_code and self.bond_code in factors: # 也要对齐到 A 股日历 bond_threshold = factors[self.bond_code].reindex(trading_calendar).ffill() print(f" [阈值] 使用动态短债阈值: {self.bond_code}") # 获取溢价率数据(如果启用溢价控制) premium_data = None if self.use_premium_control: premium_data = self._get_premium_data() print(f" [溢价] 启用溢价过滤,阈值: {self.premium_threshold:.1%}") # 按 group 分组选股 # 注意:signals 的索引现在是 A 股交易日历 signals = pd.DataFrame(index=trading_calendar, columns=factor_df.columns, data=0) groups = self.config.asset_pools.by_group for date in factor_df.index: selected_codes = [] # 获取 BOND 组的动量作为阈值 bond_threshold_value = None if bond_threshold is not None and date in bond_threshold.index: bond_threshold_value = bond_threshold.loc[date] * self.bond_ratio # 对每个 group 独立选股(包括 BOND 组) for group_name, assets in groups.items(): # 获取该 group 的信号标的 group_signal_codes = [asset.signal_source for asset in assets.values()] # 获取当日因子值 date_factors = factor_df.loc[date][group_signal_codes].dropna() if date_factors.empty: continue # 应用动态阈值过滤(非 BOND 组需要超过 BOND 动量) if bond_threshold_value is not None and group_name != 'BOND': date_factors = date_factors[date_factors >= bond_threshold_value] if date_factors.empty: continue # 应用溢价过滤 if premium_data is not None: date_factors = self._filter_by_premium( date_factors, date, premium_data ) if date_factors.empty: continue # 选择 Top 1(强制分散化) top_code = date_factors.idxmax() selected_codes.append(top_code) # 第二步:从所有 group 的 Top 1 中(包括BOND),按动量再选 Top select_num 个 if selected_codes: # 获取这些标的的当日因子值 candidate_factors = factor_df.loc[date][selected_codes].dropna() if not candidate_factors.empty: # 按动量排序,选 Top select_num if len(candidate_factors) > self.select_num: final_selected = candidate_factors.nlargest(self.select_num).index.tolist() else: final_selected = candidate_factors.index.tolist() # 如果选中的不足 select_num,用 BOND 填充空余仓位 if self.fill_bond and self.bond_code: bond_has_data = (self.bond_code in factor_df.columns and pd.notna(factor_df.loc[date].get(self.bond_code))) if bond_has_data and self.bond_code not in final_selected: n_bond_slots = self.select_num - len(final_selected) for _ in range(n_bond_slots): final_selected.append(self.bond_code) # 标记信号 signals.loc[date, final_selected] = 1 return signals.astype(int) def manage_positions(self, signals: pd.DataFrame) -> pd.DataFrame: """ 仓位管理(等权分配 + 调仓控制) Args: signals: 信号 DataFrame Returns: 仓位 DataFrame """ positions = signals.astype(float).copy() # 跟踪上次调仓日期 last_rebalance_date = None for date in positions.index: signal_row = positions.loc[date].copy() n_selected = signal_row.sum() if n_selected == 0: # 空仓 positions.loc[date] = 0 continue # 检查是否需要调仓 if last_rebalance_date is not None: # 检查持仓天数 holding_days = (date - last_rebalance_date).days if holding_days < self.rebalance_days: # 未达到最低持仓天数,保持上次仓位 positions.loc[date] = positions.loc[last_rebalance_date] continue # 等权分配 positions.loc[date] = signal_row / n_selected last_rebalance_date = date return positions def _execute_backtest(self, positions: pd.DataFrame, data: Dict[str, pd.DataFrame]) -> Dict[str, any]: """ 执行回测(使用 CrossMarketAligner 进行正确的数据对齐) Args: positions: 仓位 DataFrame data: 数据字典 Returns: 回测结果字典 """ # 获取信号→交易映射 signal_to_trade = self.config.asset_pools.get_signal_to_trade_mapping() # 获取 A 股交易日历 print("\n [对齐] 获取 A 股交易日历...") trading_calendar = self._get_trading_calendar() print(f" [日历] A 股交易日: {len(trading_calendar)} 天 ({trading_calendar[0]} ~ {trading_calendar[-1]})") # 创建对齐器 aligner = CrossMarketAligner(target_calendar=trading_calendar) # 提取交易标的的收盘价,并对齐到 A 股日历 print(" [对齐] 构建可实现价格序列(模拟真实交易)...") executable_close_dict = {} for signal_code, trade_code in signal_to_trade.items(): if trade_code in data: # 提取开盘价和收盘价 etf_df = data[trade_code] open_series = etf_df['open'].reindex(trading_calendar, method='ffill') close_series = etf_df['close'].reindex(trading_calendar, method='ffill') # 默认使用收盘价 exec_close = close_series.copy() # 检测调仓日,调整价格以反映真实交易 for i in range(1, len(trading_calendar)): date = trading_calendar[i] prev_date = trading_calendar[i-1] # 获取仓位变化 prev_pos = positions.loc[prev_date, signal_code] if signal_code in positions.columns else 0 curr_pos = positions.loc[date, signal_code] if signal_code in positions.columns else 0 # 买入日:修改前一天价格为当日开盘价 # 这样收益率 = (close[t] - open[t]) / open[t] = 日内收益 if pd.isna(prev_pos) or prev_pos == 0: if pd.notna(curr_pos) and curr_pos > 0: exec_close.loc[prev_date] = open_series.loc[date] # 卖出日:不需要修改(因为 positions[t]=0,不会计算收益) executable_close_dict[signal_code] = exec_close else: print(f" 警告: {trade_code} 数据不存在,跳过") # 使用 CrossMarketAligner 对齐多标的收益率 # 内部逻辑:先 ffill 价格到 A 股日历,再计算收益率 print(" [对齐] 计算收益率(使用可实现价格)...") returns_df = aligner.align_multi_asset(executable_close_dict) print(f" [对齐] 收益率数据: {len(returns_df)} 天, {len(returns_df.columns)} 个标的") # 对齐 positions 到 A 股日历 # 注意:必须先 reindex 再 ffill,因为 reindex(method='ffill') 不会填充已有的 NaN positions = positions.reindex(trading_calendar) # 卖出日不向前填充(保持 0) positions = positions.ffill().fillna(0) # 计算策略收益(仓位加权,无需延迟) # 因为 positions[t] 已表示 t 日的实际持仓,且价格已调整为可实现价格 strategy_returns = (positions * returns_df).sum(axis=1) # 扣除交易成本 strategy_returns, rebalance_count = self._apply_trade_cost( strategy_returns, positions ) print(f" [成本] 调仓次数: {rebalance_count}, 交易成本: {self.trade_cost:.2%}") # 计算净值曲线 equity_curve = (1 + strategy_returns).cumprod() # 检查是否有数据 if len(equity_curve) == 0: return { 'equity_curve': equity_curve, 'strategy_returns': strategy_returns, 'positions': positions, 'metrics': { 'total_return': 0, 'annual_return': 0, 'max_drawdown': 0, 'sharpe_ratio': 0, 'n_days': 0, 'rebalance_count': 0, } } # 计算绩效指标 total_return = equity_curve.iloc[-1] / equity_curve.iloc[0] - 1 n_days = len(strategy_returns) annual_return = (1 + total_return) ** (252 / n_days) - 1 if n_days > 0 else 0 # 最大回撤 cumulative_max = equity_curve.cummax() drawdown = (equity_curve - cumulative_max) / cumulative_max max_drawdown = drawdown.min() # 夏普比率 sharpe = strategy_returns.mean() / strategy_returns.std() * np.sqrt(252) if strategy_returns.std() > 0 else 0 return { 'equity_curve': equity_curve, 'strategy_returns': strategy_returns, 'positions': positions, 'metrics': { 'total_return': total_return, 'annual_return': annual_return, 'max_drawdown': max_drawdown, 'sharpe_ratio': sharpe, 'n_days': n_days, 'rebalance_count': rebalance_count, } } def _apply_trade_cost(self, strategy_returns: pd.Series, positions: pd.DataFrame) -> Tuple[pd.Series, int]: """ 扣除交易成本 Args: strategy_returns: 策略收益率 positions: 仓位 DataFrame Returns: (扣除成本后的收益率, 调仓次数) """ if self.trade_cost <= 0: return strategy_returns, 0 # 检测调仓(持仓变化) position_changes = (positions != positions.shift(1)).any(axis=1) rebalance_count = position_changes.sum() # 扣除交易成本 strategy_returns[position_changes] -= self.trade_cost return strategy_returns, rebalance_count def _get_premium_data(self) -> Optional[Dict]: """ 从已获取的数据中提取溢价率 Returns: 溢价率数据字典 {signal_code: premium_series} """ if not hasattr(self, '_data') or self._data is None: print(" [警告] 数据未加载,无法获取溢价率") return None signal_to_trade = self.config.asset_pools.get_signal_to_trade_mapping() premium_dict = {} for signal_code, trade_code in signal_to_trade.items(): if trade_code in self._data: etf_df = self._data[trade_code] # 从 attrs 中提取溢价率序列 premium_series = etf_df.attrs.get('premium_series', {}) if premium_series: # 转换为 Series 并确保 DatetimeIndex premium_s = pd.Series(premium_series) premium_s.index = pd.to_datetime(premium_s.index) premium_dict[signal_code] = premium_s return premium_dict if premium_dict else None def _filter_by_premium(self, factors: pd.Series, date: pd.Timestamp, premium_data: Dict) -> pd.Series: """ 溢价过滤 逻辑:如果 ETF 溢价率 > 阈值,则从候选中排除 Args: factors: 因子 Series date: 日期 premium_data: 溢价率数据字典 Returns: 过滤后的因子 Series """ if premium_data is None: return factors filtered_codes = [] for code in factors.index: if code in premium_data: # 获取当前日期的溢价率(前向填充) premium_s = premium_data[code] premium_before = premium_s[premium_s.index <= date] if len(premium_before) > 0: premium_rate = premium_before.iloc[-1] # 如果溢价率超过阈值,排除该标的 if premium_rate > self.premium_threshold: print(f" [溢价过滤] {code} 溢价率 {premium_rate:.2%} > 阈值 {self.premium_threshold:.2%},排除") continue filtered_codes.append(code) return factors[filtered_codes] if filtered_codes else pd.Series(dtype=float) def _get_trading_calendar(self) -> pd.DatetimeIndex: """ 获取 A 股交易日历 Returns: A 股交易日历 DatetimeIndex """ from datetime import date # 获取回测区间 start = self.config.backtest.start_date end = self.config.backtest.end_date if end is None: end = date.today().strftime('%Y-%m-%d') # 创建临时数据获取器来获取交易日历 if self._data_fetcher is None: self._data_fetcher = self._create_data_fetcher() try: # 调用 get_trading_calendar 方法 calendar = self._data_fetcher.get_trading_calendar( market='A', start=start, end=end ) print(f" [日历] A 股交易日: {len(calendar)} 天 ({calendar[0]} ~ {calendar[-1]})") return calendar except Exception as e: print(f" [警告] 无法获取 A 股交易日历,使用所有日期: {e}") # 降级方案:使用 pandas 生成工作日 start_dt = pd.Timestamp(start) end_dt = pd.Timestamp(end) return pd.date_range(start=start_dt, end=end_dt, freq='B') # 工作日 @staticmethod def _safe_val(v, decimals=4): """安全转换数值,处理 NaN/Inf""" import math if v is None or (isinstance(v, float) and (math.isnan(v) or math.isinf(v))): return None if isinstance(v, (np.floating, float)): return round(float(v), decimals) if isinstance(v, (np.integer, int)): return int(v) return v def _export_backtest_detail( self, factors: Dict[str, pd.Series], signals: pd.DataFrame, positions: pd.DataFrame, result: Dict, output_path: str ): """ 导出逐日明细到 JSON Args: factors: 因子字典 signals: 信号 DataFrame positions: 仓位 DataFrame result: 回测结果 output_path: 输出文件路径 """ import json from pathlib import Path # 准备数据 equity_curve = result['equity_curve'] strategy_returns = result['strategy_returns'] trading_calendar = equity_curve.index # 提取溢价率 premium_dict = self._get_premium_data() # 准备价格数据 signal_to_trade = self.config.asset_pools.get_signal_to_trade_mapping() index_close_dict = {} etf_close_dict = {} for signal_code, trade_code in signal_to_trade.items(): if signal_code in self._data: index_close_dict[signal_code] = self._data[signal_code]['close'] if trade_code in self._data: etf_close_dict[signal_code] = self._data[trade_code]['close'] # 计算收益率(对齐到 A 股日历) index_return_dict = {} etf_return_dict = {} # 构建 ETF 可实现价格序列(与回测一致) executable_etf_close = {} for signal_code, trade_code in signal_to_trade.items(): if trade_code in self._data: etf_df = self._data[trade_code] open_series = etf_df['open'].reindex(trading_calendar, method='ffill') close_series = etf_df['close'].reindex(trading_calendar, method='ffill') # 默认使用 close exec_close = close_series.copy() # 检测调仓日,调整价格 for i in range(1, len(trading_calendar)): date = trading_calendar[i] prev_date = trading_calendar[i-1] # 获取仓位变化 prev_pos = positions.loc[prev_date, signal_code] if signal_code in positions.columns else 0 curr_pos = positions.loc[date, signal_code] if signal_code in positions.columns else 0 # 买入日:修改前一天价格为 open if pd.isna(prev_pos) or prev_pos == 0: if pd.notna(curr_pos) and curr_pos > 0: exec_close.loc[prev_date] = open_series.loc[date] executable_etf_close[signal_code] = exec_close for signal_code, trade_code in signal_to_trade.items(): # 指数收益率 if signal_code in index_close_dict: idx_close = index_close_dict[signal_code].reindex(trading_calendar, method='ffill') idx_return = idx_close.pct_change(fill_method=None).fillna(0) index_return_dict[signal_code] = idx_return # ETF 收益率(使用可实现价格) if signal_code in executable_etf_close: etf_exec = executable_etf_close[signal_code] etf_return = etf_exec.pct_change(fill_method=None).fillna(0) etf_return_dict[signal_code] = etf_return # 对齐因子 factor_df = pd.DataFrame(factors) if not isinstance(factor_df.index, pd.DatetimeIndex): factor_df.index = pd.to_datetime(factor_df.index) factor_df_aligned = factor_df.reindex(trading_calendar).ffill() # 对齐价格 positions_aligned = positions.reindex(trading_calendar, method='ffill') # 持仓状态跟踪 holdings_state = {} prev_holdings = set() days_list = [] # 配置信息 bond_code = self.bond_code if self.use_dynamic_threshold else None bond_ratio = self.bond_ratio # 逐日构建 for date in trading_calendar: # 当前持仓 pos_row = positions_aligned.loc[date] current_holdings = set(pos_row[pos_row > 0].index.tolist()) # 调仓检测 added = list(current_holdings - prev_holdings) removed = list(prev_holdings - current_holdings) is_rebalance = len(added) > 0 or len(removed) > 0 # 更新持仓状态 for code in removed: holdings_state.pop(code, None) for code in added: entry_price = None if code in etf_close_dict: ep = etf_close_dict[code].reindex(trading_calendar, method='ffill').get(date) if pd.notna(ep): entry_price = float(ep) holdings_state[code] = { 'entry_date': date.strftime('%Y-%m-%d'), 'entry_price': entry_price, } # 动量得分和阈值 factor_scores = {} if date in factor_df_aligned.index: for code in factor_df_aligned.columns: v = factor_df_aligned.loc[date, code] if pd.notna(v): factor_scores[code] = float(v) bond_score = factor_scores.get(bond_code) if bond_code else None threshold = bond_score * bond_ratio if bond_score else 0.0 # 排名(所有标的都参与排名,包括 BOND) groups = self.config.asset_pools.by_group bond_codes = set(groups.get('BOND', {}).keys()) # 所有标的都参与排名 sorted_codes = sorted(factor_scores.keys(), key=lambda c: factor_scores[c], reverse=True) rank_map = {c: r + 1 for r, c in enumerate(sorted_codes) if c in factor_scores} # 构建每标的详情 assets = {} all_codes = factor_df.columns.tolist() for code in all_codes: asset = {} # 动量相关 mom = factor_scores.get(code) asset['momentum'] = self._safe_val(mom, 4) asset['rank'] = rank_map.get(code) asset['threshold'] = self._safe_val(threshold, 4) asset['above_threshold'] = mom >= threshold if mom is not None else False # 价格 if code in index_close_dict: idx_close = index_close_dict[code].reindex(trading_calendar, method='ffill').get(date) asset['index_close'] = self._safe_val(idx_close, 2) if pd.notna(idx_close) else None else: asset['index_close'] = None if code in etf_close_dict: etf_close = etf_close_dict[code].reindex(trading_calendar, method='ffill').get(date) asset['etf_close'] = self._safe_val(etf_close, 3) if pd.notna(etf_close) else None else: asset['etf_close'] = None # 当日收益率 if code in index_return_dict: idx_ret = index_return_dict[code].loc[date] if date in index_return_dict[code].index else 0 asset['index_return'] = self._safe_val(idx_ret, 6) if pd.notna(idx_ret) else 0.0 else: asset['index_return'] = 0.0 if code in etf_return_dict: etf_ret = etf_return_dict[code].loc[date] if date in etf_return_dict[code].index else 0 asset['etf_return_ctc'] = self._safe_val(etf_ret, 6) if pd.notna(etf_ret) else 0.0 else: asset['etf_return_ctc'] = 0.0 # 溢价率 if code in premium_dict: premium_s = premium_dict[code] if date in premium_s.index: premium_val = premium_s.loc[date] asset['premium'] = round(float(premium_val), 4) if pd.notna(premium_val) else None else: premium_before = premium_s[premium_s.index <= date] if len(premium_before) > 0: asset['premium'] = round(float(premium_before.iloc[-1]), 4) else: asset['premium'] = None else: asset['premium'] = None # 持仓状态 is_held = code in current_holdings asset['is_held'] = is_held if is_held and code in holdings_state: hs = holdings_state[code] asset['entry_date'] = hs['entry_date'] asset['entry_price_etf'] = self._safe_val(hs['entry_price'], 4) asset['entry_price_idx'] = None entry_dt = pd.Timestamp(hs['entry_date']) trading_days_held = len(trading_calendar[(trading_calendar >= entry_dt) & (trading_calendar <= date)]) asset['holding_days'] = trading_days_held # 累计收益(分别使用 ETF 和指数价格计算) if hs['entry_price'] and hs['entry_price'] > 0: # ETF 累计收益 if code in etf_close_dict: etf_cur = etf_close_dict[code].reindex(trading_calendar, method='ffill').get(date) if etf_cur and pd.notna(etf_cur): etf_cum_ret = float(etf_cur) / hs['entry_price'] - 1 asset['cum_return_etf'] = self._safe_val(etf_cum_ret, 4) else: asset['cum_return_etf'] = None else: asset['cum_return_etf'] = None # 指数累计收益(独立计算) if code in index_close_dict: idx_cur = index_close_dict[code].reindex(trading_calendar, method='ffill').get(date) idx_entry = index_close_dict[code].reindex(trading_calendar, method='ffill').get(entry_dt) if idx_cur and idx_entry and pd.notna(idx_entry) and float(idx_entry) > 0: idx_cum_ret = float(idx_cur) / float(idx_entry) - 1 asset['cum_return_idx'] = self._safe_val(idx_cum_ret, 4) else: asset['cum_return_idx'] = None else: asset['cum_return_idx'] = None else: asset['cum_return_etf'] = None asset['cum_return_idx'] = None else: asset['entry_date'] = None asset['entry_price_etf'] = None asset['entry_price_idx'] = None asset['holding_days'] = 0 asset['cum_return_etf'] = None asset['cum_return_idx'] = None assets[code] = asset # 信号 signal_row = signals.loc[date] if date in signals.index else pd.Series(dtype=float) active_signals = {code: int(val) for code, val in signal_row.items() if val > 0} # 构建日记录 day_record = { 'date': date.strftime('%Y-%m-%d'), 'nav': self._safe_val(equity_curve.loc[date], 4), 'daily_return': self._safe_val(strategy_returns.loc[date], 6), 'is_rebalance': is_rebalance, 'signals': active_signals, 'holdings': sorted(list(current_holdings)), 'added': sorted(added), 'removed': sorted(removed), 'assets': assets } days_list.append(day_record) prev_holdings = current_holdings # 构建元数据 codes_meta = {} for code in all_codes: asset_config = self.config.asset_pools.assets.get(code) codes_meta[code] = { 'name': asset_config.name if asset_config else code, 'etf': asset_config.trade_source if asset_config else None, 'market': asset_config.group if asset_config else None } output = { 'meta': { 'mode': 'V2: 指数信号 + ETF收益', 'start_date': trading_calendar[0].strftime('%Y-%m-%d'), 'end_date': trading_calendar[-1].strftime('%Y-%m-%d'), 'total_days': len(trading_calendar), 'select_num': self.select_num, 'n_days': self.config.factor.n_days, 'trade_cost': self.trade_cost, 'bond_threshold': { 'enabled': self.use_dynamic_threshold, 'bond_code': bond_code, 'ratio': bond_ratio }, 'codes': codes_meta }, 'days': days_list } # 输出 output_path = Path(output_path) output_path.parent.mkdir(parents=True, exist_ok=True) with open(output_path, 'w', encoding='utf-8') as f: json.dump(output, f, ensure_ascii=False) file_size_mb = output_path.stat().st_size / 1024 / 1024 print(f" 写入 {output_path}") print(f" 大小: {file_size_mb:.1f} MB") print(f" 天数: {len(days_list)}") print(f" 标的: {len(all_codes)}")