diff --git a/framework_v2/strategies/rotation/rotation.py b/framework_v2/strategies/rotation/rotation.py index 974d8a7..77ebfdc 100644 --- a/framework_v2/strategies/rotation/rotation.py +++ b/framework_v2/strategies/rotation/rotation.py @@ -161,9 +161,10 @@ class GlobalRotationStrategy(StrategyBase): 逻辑: 1. 计算动态短债阈值(如果使用) - 2. 每个 group 内竞争,选 Top 1 - 3. 溢价过滤(如果启用) - 4. 组合所有 group 的选股结果 + 2. 因子对齐到 A 股日历(ffill 填充休市日) + 3. 每个 group 内竞争,选 Top 1 + 4. 溢价过滤(如果启用) + 5. 组合所有 group 的选股结果 Args: factors: 因子字典 {code: Series} @@ -174,13 +175,18 @@ class GlobalRotationStrategy(StrategyBase): if not factors: return pd.DataFrame() - # 对齐所有因子的日期 + # 获取 A 股交易日历 + trading_calendar = self._get_trading_calendar() + + # 对齐所有因子到 A 股日历(关键:ffill 填充休市日) factor_df = pd.DataFrame(factors) + factor_df = factor_df.reindex(trading_calendar).ffill() # 获取动态短债阈值(如果使用) bond_threshold = None if self.use_dynamic_threshold and self.bond_code and self.bond_code in factors: - bond_threshold = factors[self.bond_code] + # 也要对齐到 A 股日历 + bond_threshold = factors[self.bond_code].reindex(trading_calendar).ffill() print(f" [阈值] 使用动态短债阈值: {self.bond_code}") # 获取溢价率数据(如果启用溢价控制) @@ -190,14 +196,20 @@ class GlobalRotationStrategy(StrategyBase): print(f" [溢价] 启用溢价过滤,阈值: {self.premium_threshold:.1%}") # 按 group 分组选股 - signals = pd.DataFrame(index=factor_df.index, columns=factor_df.columns, data=0) + # 注意:signals 的索引现在是 A 股交易日历 + signals = pd.DataFrame(index=trading_calendar, columns=factor_df.columns, data=0) groups = self.config.asset_pools.by_group for date in factor_df.index: selected_codes = [] - # 对每个 group 独立选股 + # 获取 BOND 组的动量作为阈值 + bond_threshold_value = None + if bond_threshold is not None and date in bond_threshold.index: + bond_threshold_value = bond_threshold.loc[date] * self.bond_ratio + + # 对每个 group 独立选股(包括 BOND 组) for group_name, assets in groups.items(): # 获取该 group 的信号标的 group_signal_codes = [asset.signal_source for asset in assets.values()] @@ -208,10 +220,9 @@ class GlobalRotationStrategy(StrategyBase): if date_factors.empty: continue - # 应用动态阈值过滤 - if bond_threshold is not None and date in bond_threshold.index: - threshold_value = bond_threshold.loc[date] * self.bond_ratio - date_factors = date_factors[date_factors >= threshold_value] + # 应用动态阈值过滤(非 BOND 组需要超过 BOND 动量) + if bond_threshold_value is not None and group_name != 'BOND': + date_factors = date_factors[date_factors >= bond_threshold_value] if date_factors.empty: continue @@ -229,7 +240,7 @@ class GlobalRotationStrategy(StrategyBase): top_code = date_factors.idxmax() selected_codes.append(top_code) - # 第二步:从所有 group 的 Top 1 中,按动量再选 Top select_num 个 + # 第二步:从所有 group 的 Top 1 中(包括BOND),按动量再选 Top select_num 个 if selected_codes: # 获取这些标的的当日因子值 candidate_factors = factor_df.loc[date][selected_codes].dropna() @@ -240,6 +251,16 @@ class GlobalRotationStrategy(StrategyBase): final_selected = candidate_factors.nlargest(self.select_num).index.tolist() else: final_selected = candidate_factors.index.tolist() + + # 如果选中的不足 select_num,用 BOND 填充空余仓位 + if self.fill_bond and self.bond_code: + bond_has_data = (self.bond_code in factor_df.columns and + pd.notna(factor_df.loc[date].get(self.bond_code))) + + if bond_has_data and self.bond_code not in final_selected: + n_bond_slots = self.select_num - len(final_selected) + for _ in range(n_bond_slots): + final_selected.append(self.bond_code) # 标记信号 signals.loc[date, final_selected] = 1 @@ -411,23 +432,43 @@ class GlobalRotationStrategy(StrategyBase): def _get_premium_data(self) -> Optional[Dict]: """ - 获取溢价率数据 + 从已获取的数据中提取溢价率 Returns: - 溢价率数据字典 {trade_code: {date: premium_rate}} + 溢价率数据字典 {signal_code: premium_series} """ - # TODO: 从数据源获取溢价率数据 - # 当前返回 None,后续实现 - return None + if not hasattr(self, '_data') or self._data is None: + print(" [警告] 数据未加载,无法获取溢价率") + return None + + signal_to_trade = self.config.asset_pools.get_signal_to_trade_mapping() + + premium_dict = {} + for signal_code, trade_code in signal_to_trade.items(): + if trade_code in self._data: + etf_df = self._data[trade_code] + + # 从 attrs 中提取溢价率序列 + premium_series = etf_df.attrs.get('premium_series', {}) + + if premium_series: + # 转换为 Series 并确保 DatetimeIndex + premium_s = pd.Series(premium_series) + premium_s.index = pd.to_datetime(premium_s.index) + premium_dict[signal_code] = premium_s + + return premium_dict if premium_dict else None def _filter_by_premium(self, factors: pd.Series, date: pd.Timestamp, premium_data: Dict) -> pd.Series: """ 溢价过滤 + 逻辑:如果 ETF 溢价率 > 阈值,则从候选中排除 + Args: factors: 因子 Series date: 日期 - premium_data: 溢价率数据 + premium_data: 溢价率数据字典 Returns: 过滤后的因子 Series @@ -435,8 +476,24 @@ class GlobalRotationStrategy(StrategyBase): if premium_data is None: return factors - # TODO: 实现溢价过滤逻辑 - return factors + filtered_codes = [] + for code in factors.index: + if code in premium_data: + # 获取当前日期的溢价率(前向填充) + premium_s = premium_data[code] + premium_before = premium_s[premium_s.index <= date] + + if len(premium_before) > 0: + premium_rate = premium_before.iloc[-1] + + # 如果溢价率超过阈值,排除该标的 + if premium_rate > self.premium_threshold: + print(f" [溢价过滤] {code} 溢价率 {premium_rate:.2%} > 阈值 {self.premium_threshold:.2%},排除") + continue + + filtered_codes.append(code) + + return factors[filtered_codes] if filtered_codes else pd.Series(dtype=float) def _get_trading_calendar(self) -> pd.DatetimeIndex: """ @@ -472,3 +529,294 @@ class GlobalRotationStrategy(StrategyBase): start_dt = pd.Timestamp(start) end_dt = pd.Timestamp(end) return pd.date_range(start=start_dt, end=end_dt, freq='B') # 工作日 + + @staticmethod + def _safe_val(v, decimals=4): + """安全转换数值,处理 NaN/Inf""" + import math + + if v is None or (isinstance(v, float) and (math.isnan(v) or math.isinf(v))): + return None + if isinstance(v, (np.floating, float)): + return round(float(v), decimals) + if isinstance(v, (np.integer, int)): + return int(v) + return v + + def _export_backtest_detail( + self, + factors: Dict[str, pd.Series], + signals: pd.DataFrame, + positions: pd.DataFrame, + result: Dict, + output_path: str + ): + """ + 导出逐日明细到 JSON + + Args: + factors: 因子字典 + signals: 信号 DataFrame + positions: 仓位 DataFrame + result: 回测结果 + output_path: 输出文件路径 + """ + import json + from pathlib import Path + + # 准备数据 + equity_curve = result['equity_curve'] + strategy_returns = result['strategy_returns'] + trading_calendar = equity_curve.index + + # 提取溢价率 + premium_dict = self._get_premium_data() + + # 准备价格数据 + signal_to_trade = self.config.asset_pools.get_signal_to_trade_mapping() + index_close_dict = {} + etf_close_dict = {} + + for signal_code, trade_code in signal_to_trade.items(): + if signal_code in self._data: + index_close_dict[signal_code] = self._data[signal_code]['close'] + if trade_code in self._data: + etf_close_dict[signal_code] = self._data[trade_code]['close'] + + # 计算收益率(对齐到 A 股日历) + index_return_dict = {} + etf_return_dict = {} + + for signal_code, trade_code in signal_to_trade.items(): + # 指数收益率 + if signal_code in index_close_dict: + idx_close = index_close_dict[signal_code].reindex(trading_calendar, method='ffill') + idx_return = idx_close.pct_change(fill_method=None).fillna(0) + index_return_dict[signal_code] = idx_return + + # ETF 收益率 + if signal_code in etf_close_dict: + etf_close = etf_close_dict[signal_code].reindex(trading_calendar, method='ffill') + etf_return = etf_close.pct_change(fill_method=None).fillna(0) + etf_return_dict[signal_code] = etf_return + + # 对齐因子 + factor_df = pd.DataFrame(factors) + if not isinstance(factor_df.index, pd.DatetimeIndex): + factor_df.index = pd.to_datetime(factor_df.index) + + factor_df_aligned = factor_df.reindex(trading_calendar).ffill() + + # 对齐价格 + positions_aligned = positions.reindex(trading_calendar, method='ffill') + + # 持仓状态跟踪 + holdings_state = {} + prev_holdings = set() + days_list = [] + + # 配置信息 + bond_code = self.bond_code if self.use_dynamic_threshold else None + bond_ratio = self.bond_ratio + + # 逐日构建 + for date in trading_calendar: + # 当前持仓 + pos_row = positions_aligned.loc[date] + current_holdings = set(pos_row[pos_row > 0].index.tolist()) + + # 调仓检测 + added = list(current_holdings - prev_holdings) + removed = list(prev_holdings - current_holdings) + is_rebalance = len(added) > 0 or len(removed) > 0 + + # 更新持仓状态 + for code in removed: + holdings_state.pop(code, None) + for code in added: + entry_price = None + if code in etf_close_dict: + ep = etf_close_dict[code].reindex(trading_calendar, method='ffill').get(date) + if pd.notna(ep): + entry_price = float(ep) + + holdings_state[code] = { + 'entry_date': date.strftime('%Y-%m-%d'), + 'entry_price': entry_price, + } + + # 动量得分和阈值 + factor_scores = {} + if date in factor_df_aligned.index: + for code in factor_df_aligned.columns: + v = factor_df_aligned.loc[date, code] + if pd.notna(v): + factor_scores[code] = float(v) + + bond_score = factor_scores.get(bond_code) if bond_code else None + threshold = bond_score * bond_ratio if bond_score else 0.0 + + # 排名(所有标的都参与排名,包括 BOND) + groups = self.config.asset_pools.by_group + bond_codes = set(groups.get('BOND', {}).keys()) + + # 所有标的都参与排名 + sorted_codes = sorted(factor_scores.keys(), key=lambda c: factor_scores[c], reverse=True) + rank_map = {c: r + 1 for r, c in enumerate(sorted_codes) if c in factor_scores} + + # 构建每标的详情 + assets = {} + all_codes = factor_df.columns.tolist() + + for code in all_codes: + asset = {} + + # 动量相关 + mom = factor_scores.get(code) + asset['momentum'] = self._safe_val(mom, 4) + asset['rank'] = rank_map.get(code) + asset['threshold'] = self._safe_val(threshold, 4) + asset['above_threshold'] = mom >= threshold if mom is not None else False + + # 价格 + if code in index_close_dict: + idx_close = index_close_dict[code].reindex(trading_calendar, method='ffill').get(date) + asset['index_close'] = self._safe_val(idx_close, 2) if pd.notna(idx_close) else None + else: + asset['index_close'] = None + + if code in etf_close_dict: + etf_close = etf_close_dict[code].reindex(trading_calendar, method='ffill').get(date) + asset['etf_close'] = self._safe_val(etf_close, 3) if pd.notna(etf_close) else None + else: + asset['etf_close'] = None + + # 当日收益率 + if code in index_return_dict: + idx_ret = index_return_dict[code].loc[date] if date in index_return_dict[code].index else 0 + asset['index_return'] = self._safe_val(idx_ret, 6) if pd.notna(idx_ret) else 0.0 + else: + asset['index_return'] = 0.0 + + if code in etf_return_dict: + etf_ret = etf_return_dict[code].loc[date] if date in etf_return_dict[code].index else 0 + asset['etf_return_ctc'] = self._safe_val(etf_ret, 6) if pd.notna(etf_ret) else 0.0 + else: + asset['etf_return_ctc'] = 0.0 + + # 溢价率 + if code in premium_dict: + premium_s = premium_dict[code] + if date in premium_s.index: + premium_val = premium_s.loc[date] + asset['premium'] = round(float(premium_val), 4) if pd.notna(premium_val) else None + else: + premium_before = premium_s[premium_s.index <= date] + if len(premium_before) > 0: + asset['premium'] = round(float(premium_before.iloc[-1]), 4) + else: + asset['premium'] = None + else: + asset['premium'] = None + + # 持仓状态 + is_held = code in current_holdings + asset['is_held'] = is_held + + if is_held and code in holdings_state: + hs = holdings_state[code] + asset['entry_date'] = hs['entry_date'] + asset['entry_price_etf'] = self._safe_val(hs['entry_price'], 4) + asset['entry_price_idx'] = None + + entry_dt = pd.Timestamp(hs['entry_date']) + trading_days_held = len(trading_calendar[(trading_calendar >= entry_dt) & (trading_calendar <= date)]) + asset['holding_days'] = trading_days_held + + # 累计收益 + if hs['entry_price'] and hs['entry_price'] > 0: + if code in etf_close_dict: + cur = etf_close_dict[code].reindex(trading_calendar, method='ffill').get(date) + if cur and pd.notna(cur): + cum_ret = float(cur) / hs['entry_price'] - 1 + asset['cum_return_etf'] = self._safe_val(cum_ret, 4) + asset['cum_return_idx'] = self._safe_val(cum_ret, 4) + else: + asset['cum_return_etf'] = None + asset['cum_return_idx'] = None + else: + asset['cum_return_etf'] = None + asset['cum_return_idx'] = None + else: + asset['cum_return_etf'] = None + asset['cum_return_idx'] = None + else: + asset['entry_date'] = None + asset['entry_price_etf'] = None + asset['entry_price_idx'] = None + asset['holding_days'] = 0 + asset['cum_return_etf'] = None + asset['cum_return_idx'] = None + + assets[code] = asset + + # 信号 + signal_row = signals.loc[date] if date in signals.index else pd.Series(dtype=float) + active_signals = {code: int(val) for code, val in signal_row.items() if val > 0} + + # 构建日记录 + day_record = { + 'date': date.strftime('%Y-%m-%d'), + 'nav': self._safe_val(equity_curve.loc[date], 4), + 'daily_return': self._safe_val(strategy_returns.loc[date], 6), + 'is_rebalance': is_rebalance, + 'signals': active_signals, + 'holdings': sorted(list(current_holdings)), + 'added': sorted(added), + 'removed': sorted(removed), + 'assets': assets + } + days_list.append(day_record) + prev_holdings = current_holdings + + # 构建元数据 + codes_meta = {} + for code in all_codes: + asset_config = self.config.asset_pools.assets.get(code) + codes_meta[code] = { + 'name': asset_config.name if asset_config else code, + 'etf': asset_config.trade_source if asset_config else None, + 'market': asset_config.group if asset_config else None + } + + output = { + 'meta': { + 'mode': 'V2: 指数信号 + ETF收益', + 'start_date': trading_calendar[0].strftime('%Y-%m-%d'), + 'end_date': trading_calendar[-1].strftime('%Y-%m-%d'), + 'total_days': len(trading_calendar), + 'select_num': self.select_num, + 'n_days': self.config.factor.n_days, + 'trade_cost': self.trade_cost, + 'bond_threshold': { + 'enabled': self.use_dynamic_threshold, + 'bond_code': bond_code, + 'ratio': bond_ratio + }, + 'codes': codes_meta + }, + 'days': days_list + } + + # 输出 + output_path = Path(output_path) + output_path.parent.mkdir(parents=True, exist_ok=True) + + with open(output_path, 'w', encoding='utf-8') as f: + json.dump(output, f, ensure_ascii=False) + + file_size_mb = output_path.stat().st_size / 1024 / 1024 + print(f" 写入 {output_path}") + print(f" 大小: {file_size_mb:.1f} MB") + print(f" 天数: {len(days_list)}") + print(f" 标的: {len(all_codes)}")