From 5e11b6b690f9cccc7b44665ffbcf275f4c3dd76f Mon Sep 17 00:00:00 2001
From: aszerW
Date: Mon, 1 Jun 2026 23:56:18 +0800
Subject: [PATCH] =?UTF-8?q?fix(rotation):=20=E6=BA=A2=E4=BB=B7=E7=8E=87?=
 =?UTF-8?q?=E7=BC=93=E5=AD=98=E5=A2=9E=E5=8A=A0=E5=A2=9E=E9=87=8F=E6=9B=B4?=
 =?UTF-8?q?=E6=96=B0=E9=80=BB=E8=BE=91?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- preload_premium: 检查缓存日期范围,不足时增量拉取
- 新增 _fetch_premium_api: 拉取并合并新溢价率数据
- 调用时传入 end_date 触发增量检查
修复前: premium CSV存在即返回旧数据,明天9点运行时拿不到最新
修复后: 检测 latest_cached < end_date 时自动拉取增量
---
 rotation/simple_rotation.py | 54 ++++++++++++++++++++++++-------------
 1 file changed, 36 insertions(+), 18 deletions(-)

diff --git a/rotation/simple_rotation.py b/rotation/simple_rotation.py
index 11d11ef..d50a520 100644
--- a/rotation/simple_rotation.py
+++ b/rotation/simple_rotation.py
@@ -169,22 +169,40 @@ class DataCache:
         except Exception:
             pass
 
-    def preload_premium(self, code: str, start_date: str = '2000-01-01', end_date: str = None) -> Optional[Dict[str, float]]:
-        """Load premium data for an ETF code from cache, or fetch from API if not available"""
+    def preload_premium(self, code: str, end_date: str = None) -> Optional[Dict[str, float]]:
+        """Load premium data for an ETF code from cache, with incremental update.
+        If cache exists but doesn't cover end_date, fetches the gap."""
         if code in self.premium_data:
-            return self.premium_data[code]
+            # Already in memory - check if up-to-date
+            if end_date:
+                dates = sorted(self.premium_data[code].keys())
+                if dates and dates[-1] >= end_date:
+                    return self.premium_data[code]
+            else:
+                return self.premium_data[code]
         cache_path = self._premium_cache_path(code)
         if cache_path.exists():
             try:
                 df = pd.read_csv(cache_path)
                 if len(df) > 0 and 'date' in df.columns and 'premium' in df.columns:
                     self.premium_data[code] = dict(zip(df['date'].astype(str), df['premium']))
+                    # Check if cache covers end_date
+                    if end_date:
+                        latest_cached = max(self.premium_data[code].keys())
+                        if latest_cached >= end_date:
+                            return self.premium_data[code]
+                        # Cache is stale - fetch gap from latest_cached+1 to end_date
+                        fetch_start = (pd.Timestamp(latest_cached) + timedelta(days=1)).strftime('%Y-%m-%d')
+                        self._fetch_premium_api(code, fetch_start, end_date)
                     return self.premium_data[code]
             except Exception:
                 pass
-        # No cache: fetch premium_series directly from API (returns full history)
-        if end_date is None:
-            end_date = datetime.now().strftime('%Y-%m-%d')
+        # No cache: fetch full history from API
+        self._fetch_premium_api(code, '2000-01-01', end_date or datetime.now().strftime('%Y-%m-%d'))
+        return self.premium_data.get(code)
+
+    def _fetch_premium_api(self, code: str, start_date: str, end_date: str):
+        """Fetch premium_series from API and merge into cache"""
         url = f"{self.base_url}{self.api_path}"
         params = {'code': code, 'start': start_date, 'end': end_date, 'adj': 'raw'}
         for attempt in range(3):
@@ -194,25 +212,25 @@ class DataCache:
                     if attempt < 2:
                         time.sleep(1)
                         continue
-                    return None
+                    return
                 data = resp.json()
                 if 'error' in data:
-                    return None
+                    return
                 premium_series = data.get('premium_series', [])
                 if premium_series:
-                    premium_dict = {item['date']: item['premium'] for item in premium_series}
-                    self.premium_data[code] = premium_dict
-                    self._save_premium_cache(code, premium_dict)
-                    print(f" + premium {code}: {len(premium_dict)} days")
-                    return premium_dict
-                return None
+                    new_data = {item['date']: item['premium'] for item in premium_series}
+                    if code not in self.premium_data:
+                        self.premium_data[code] = {}
+                    self.premium_data[code].update(new_data)
+                    self._save_premium_cache(code, self.premium_data[code])
+                    print(f" + premium {code}: +{len(new_data)} days (total {len(self.premium_data[code])})")
+                return
             except requests.exceptions.Timeout:
                 if attempt < 2:
                     continue
-                return None
+                return
             except Exception:
-                return None
-        return None
+                return
 
     def get_trading_calendar(self, market: str, start_date: str, end_date: str) -> Optional[pd.DatetimeIndex]:
         """Fetch trading calendar from API"""
@@ -330,7 +348,7 @@ class SimpleRotationStrategy:
                 self.etf_data[code] = df
         # Load premium data cache for all ETF trade codes
         for code in trade_codes:
-            self.data_cache.preload_premium(code)
+            self.data_cache.preload_premium(code, end_date=end_date)
         print(f"\n Trade: {len(self.etf_data)}/{len(trade_codes)} OK, premium: {len(self.data_cache.premium_data)} loaded")
 
     def _compute_momentum(self, signal_code: str, date: pd.Timestamp) -> Optional[float]: