From a2b4289080b98d8674ad0efe3a474ae1cf8b4998 Mon Sep 17 00:00:00 2001 From: aszerW Date: Wed, 3 Jun 2026 00:09:29 +0800 Subject: [PATCH] =?UTF-8?q?revert(http):=20=E6=94=B9=E5=9B=9E=E4=B8=B2?= =?UTF-8?q?=E8=A1=8C=E6=95=B0=E6=8D=AE=E8=8E=B7=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 回退并行获取逻辑,恢复简单的串行循环: - 移除 ThreadPoolExecutor 并行代码 - 移除 concurrent.futures 导入 - 保持简单的 for 循环串行获取 --- rotation/simple_rotation.py | 59 +++++++++++-------------------------- 1 file changed, 18 insertions(+), 41 deletions(-) diff --git a/rotation/simple_rotation.py b/rotation/simple_rotation.py index fd1f9ca..a0ceff3 100644 --- a/rotation/simple_rotation.py +++ b/rotation/simple_rotation.py @@ -21,7 +21,6 @@ import pandas as pd from pathlib import Path from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple -from concurrent.futures import ThreadPoolExecutor, as_completed PROJECT_ROOT = Path(__file__).parent.parent sys.path.insert(0, str(PROJECT_ROOT)) @@ -368,54 +367,32 @@ class SimpleRotationStrategy: self.trading_calendar: Optional[pd.DatetimeIndex] = None def _preload_data(self): - """Preload all historical data (parallel fetching)""" + """Preload all historical data""" start_date = self.config.backtest.start_date end_date = self.config.backtest.end_date or datetime.now().strftime('%Y-%m-%d') preload_start = (pd.Timestamp(start_date) - timedelta(days=self.n_days * 2)).strftime('%Y-%m-%d') - print(f"\n[1/4] Preloading signal sources (index raw) [{len(self.signal_codes)} codes, parallel]...") - # Parallel fetch signal sources - with ThreadPoolExecutor(max_workers=8) as executor: - futures = {executor.submit(self.data_cache.preload, code, preload_start, end_date, 'raw'): code for code in self.signal_codes} - for future in as_completed(futures): - code = futures[future] - try: - df = future.result() - if df is not None: - self.index_data[code] = df - except Exception as e: - print(f" x {code}: {e}") + print("\n[1/4] Preloading signal sources (index raw)...") + for code in self.signal_codes: + df = self.data_cache.preload(code, preload_start, end_date, adj='raw') + if df is not None: + self.index_data[code] = df print(f"\n Signal: {len(self.index_data)}/{len(self.signal_codes)} OK") - print(f"\n[2/4] Preloading trade sources (ETF hfq) [{len(set(self.signal_to_trade.values()))} codes, parallel]...") + print("\n[2/4] Preloading trade sources (ETF hfq)...") trade_codes = set(self.signal_to_trade.values()) - # Determine adj for each trade code - trade_adj_map = {} for code in trade_codes: - is_bond = any(a.trade_source == code and a.group == 'BOND' for a in self.config.asset_pools.assets.values()) - trade_adj_map[code] = 'raw' if is_bond else 'hfq' - - # Parallel fetch trade sources - with ThreadPoolExecutor(max_workers=8) as executor: - futures = {executor.submit(self.data_cache.preload, code, preload_start, end_date, trade_adj_map[code]): code for code in trade_codes} - for future in as_completed(futures): - code = futures[future] - try: - df = future.result() - if df is not None: - self.etf_data[code] = df - except Exception as e: - print(f" x {code}: {e}") - - # Parallel fetch premium data - with ThreadPoolExecutor(max_workers=8) as executor: - futures = {executor.submit(self.data_cache.preload_premium, code, end_date): code for code in trade_codes} - for future in as_completed(futures): - code = futures[future] - try: - future.result() - except Exception: - pass + is_bond = any( + a.trade_source == code and a.group == 'BOND' + for a in self.config.asset_pools.assets.values() + ) + adj = 'raw' if is_bond else 'hfq' + df = self.data_cache.preload(code, preload_start, end_date, adj=adj) + if df is not None: + self.etf_data[code] = df + # Load premium data cache for all ETF trade codes + for code in trade_codes: + self.data_cache.preload_premium(code, end_date=end_date) print(f"\n Trade: {len(self.etf_data)}/{len(trade_codes)} OK, premium: {len(self.data_cache.premium_data)} loaded") # Load benchmark