revert(http): 改回串行数据获取

回退并行获取逻辑,恢复简单的串行循环:
- 移除 ThreadPoolExecutor 并行代码
- 移除 concurrent.futures 导入
- 保持简单的 for 循环串行获取
This commit is contained in:
2026-06-03 00:09:29 +08:00
parent e29f57749d
commit a2b4289080

View File

@@ -21,7 +21,6 @@ import pandas as pd
from pathlib import Path from pathlib import Path
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple from typing import Dict, List, Optional, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
PROJECT_ROOT = Path(__file__).parent.parent PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT)) sys.path.insert(0, str(PROJECT_ROOT))
@@ -368,54 +367,32 @@ class SimpleRotationStrategy:
self.trading_calendar: Optional[pd.DatetimeIndex] = None self.trading_calendar: Optional[pd.DatetimeIndex] = None
def _preload_data(self): def _preload_data(self):
"""Preload all historical data (parallel fetching)""" """Preload all historical data"""
start_date = self.config.backtest.start_date start_date = self.config.backtest.start_date
end_date = self.config.backtest.end_date or datetime.now().strftime('%Y-%m-%d') end_date = self.config.backtest.end_date or datetime.now().strftime('%Y-%m-%d')
preload_start = (pd.Timestamp(start_date) - timedelta(days=self.n_days * 2)).strftime('%Y-%m-%d') preload_start = (pd.Timestamp(start_date) - timedelta(days=self.n_days * 2)).strftime('%Y-%m-%d')
print(f"\n[1/4] Preloading signal sources (index raw) [{len(self.signal_codes)} codes, parallel]...") print("\n[1/4] Preloading signal sources (index raw)...")
# Parallel fetch signal sources for code in self.signal_codes:
with ThreadPoolExecutor(max_workers=8) as executor: df = self.data_cache.preload(code, preload_start, end_date, adj='raw')
futures = {executor.submit(self.data_cache.preload, code, preload_start, end_date, 'raw'): code for code in self.signal_codes} if df is not None:
for future in as_completed(futures): self.index_data[code] = df
code = futures[future]
try:
df = future.result()
if df is not None:
self.index_data[code] = df
except Exception as e:
print(f" x {code}: {e}")
print(f"\n Signal: {len(self.index_data)}/{len(self.signal_codes)} OK") print(f"\n Signal: {len(self.index_data)}/{len(self.signal_codes)} OK")
print(f"\n[2/4] Preloading trade sources (ETF hfq) [{len(set(self.signal_to_trade.values()))} codes, parallel]...") print("\n[2/4] Preloading trade sources (ETF hfq)...")
trade_codes = set(self.signal_to_trade.values()) trade_codes = set(self.signal_to_trade.values())
# Determine adj for each trade code
trade_adj_map = {}
for code in trade_codes: for code in trade_codes:
is_bond = any(a.trade_source == code and a.group == 'BOND' for a in self.config.asset_pools.assets.values()) is_bond = any(
trade_adj_map[code] = 'raw' if is_bond else 'hfq' a.trade_source == code and a.group == 'BOND'
for a in self.config.asset_pools.assets.values()
# Parallel fetch trade sources )
with ThreadPoolExecutor(max_workers=8) as executor: adj = 'raw' if is_bond else 'hfq'
futures = {executor.submit(self.data_cache.preload, code, preload_start, end_date, trade_adj_map[code]): code for code in trade_codes} df = self.data_cache.preload(code, preload_start, end_date, adj=adj)
for future in as_completed(futures): if df is not None:
code = futures[future] self.etf_data[code] = df
try: # Load premium data cache for all ETF trade codes
df = future.result() for code in trade_codes:
if df is not None: self.data_cache.preload_premium(code, end_date=end_date)
self.etf_data[code] = df
except Exception as e:
print(f" x {code}: {e}")
# Parallel fetch premium data
with ThreadPoolExecutor(max_workers=8) as executor:
futures = {executor.submit(self.data_cache.preload_premium, code, end_date): code for code in trade_codes}
for future in as_completed(futures):
code = futures[future]
try:
future.result()
except Exception:
pass
print(f"\n Trade: {len(self.etf_data)}/{len(trade_codes)} OK, premium: {len(self.data_cache.premium_data)} loaded") print(f"\n Trade: {len(self.etf_data)}/{len(trade_codes)} OK, premium: {len(self.data_cache.premium_data)} loaded")
# Load benchmark # Load benchmark