""" 统一数据获取器 分层架构:对外统一接口,对内按资产类型独立实现 支持:A股指数/ETF、港股指数、美股指数、期货、加密货币 用法: from datasource import UniversalDataFetcher fetcher = UniversalDataFetcher() # 单标的获取(自动识别类型) df = fetcher.fetch("000300.SH", "2024-01-01", "2024-12-31") # ETF获取(含净值) price_df, nav_df = fetcher.fetch_etf_with_nav("513100.SH", "2024-01-01", "2024-12-31") # 批量获取 results = fetcher.fetch_batch(["000300.SH", "NDX", "N225"], "2024-01-01", "2024-12-31") """ import os import time from typing import Optional, Dict, List, Tuple from datetime import datetime import pandas as pd from .tushare_source import TushareSource from .yfinance_source import YFinanceSource from .ssh_tunnel import SSHTunnelManager from .asset_type_detector import AssetTypeDetector, AssetType class UniversalDataFetcher: """ 统一数据获取器 分层架构: - 对外:统一 fetch() 接口,自动路由 - 对内:各资产类型独立方法,职责单一 """ def __init__( self, ssh_config: Optional[Dict] = None, use_cache: bool = True, cache_dir: str = "data/etf_cache/daily" ): """ 初始化 Args: ssh_config: SSH隧道配置(用于港美股) use_cache: 是否使用本地缓存 cache_dir: 缓存目录 """ self.ssh_config = ssh_config or {} self.use_cache = use_cache self.cache_dir = cache_dir # 数据源实例 self._tushare = TushareSource() self._yfinance = YFinanceSource() # SSH隧道(延迟初始化) self._tunnel: Optional[SSHTunnelManager] = None self._tunnel_started = False def __enter__(self): """上下文管理器入口""" self._start_tunnel() return self def __exit__(self, exc_type, exc_val, exc_tb): """上下文管理器退出""" self._stop_tunnel() # ============================================================ # SSH隧道管理 # ============================================================ def _start_tunnel(self) -> bool: """启动SSH隧道""" if self._tunnel_started: return True if self.ssh_config.get('enabled'): self._tunnel = SSHTunnelManager(self.ssh_config) if self._tunnel.start(): self._tunnel_started = True return True return False return True def _stop_tunnel(self): """停止SSH隧道""" if self._tunnel: self._tunnel.stop() self._tunnel = None self._tunnel_started = False # ============================================================ # 统一入口(自动路由) # ============================================================ def fetch( self, code: str, start_date: str, end_date: str, retry: int = 3 ) -> Optional[pd.DataFrame]: """ 统一数据获取入口 自动识别资产类型并路由到对应方法 Args: code: 标的代码 start_date: 开始日期 'YYYY-MM-DD' end_date: 结束日期 'YYYY-MM-DD' retry: 重试次数 Returns: DataFrame with columns: date, open, high, low, close, volume """ asset_type = AssetTypeDetector.detect(code) for attempt in range(retry): try: # 路由到具体方法 if asset_type == AssetType.CHINA_INDEX: return self._fetch_china_index(code, start_date, end_date) elif asset_type == AssetType.CHINA_ETF: return self._fetch_china_etf(code, start_date, end_date) elif asset_type == AssetType.US_INDEX: return self._fetch_us_index(code, start_date, end_date) elif asset_type == AssetType.HK_INDEX: return self._fetch_hk_index(code, start_date, end_date) elif asset_type == AssetType.FUTURES: return self._fetch_futures(code, start_date, end_date) elif asset_type == AssetType.CRYPTO: return self._fetch_crypto(code, start_date, end_date) else: print(f"⚠️ 未知资产类型: {code} -> {asset_type}") return None except Exception as e: if attempt < retry - 1: time.sleep(2) else: print(f"✗ 获取 {code} 失败 (尝试 {attempt+1}/{retry}): {e}") return None return None # ============================================================ # 分层实现:各资产类型独立方法 # ============================================================ def _fetch_china_index( self, code: str, start_date: str, end_date: str ) -> Optional[pd.DataFrame]: """ 获取A股指数 特点:Tushare API,无需SSH隧道 """ return self._tushare.fetch_index(code, start_date, end_date) def _fetch_china_etf( self, code: str, start_date: str, end_date: str ) -> Optional[pd.DataFrame]: """ 获取A股ETF价格 特点:Tushare fund_daily接口 """ return self._tushare.fetch_etf(code, start_date, end_date) def fetch_etf_with_nav( self, code: str, start_date: str, end_date: str ) -> Tuple[Optional[pd.DataFrame], Optional[pd.DataFrame], Optional[pd.Series]]: """ 获取ETF价格 + 净值 + 溢价率序列 计算每一天的溢价率,用于分析溢价率走势 Args: code: ETF代码 start_date: 开始日期 end_date: 结束日期 Returns: (price_df, nav_df, premium_series) - price_df: ETF价格数据 (OHLCV) - nav_df: ETF净值数据 - premium_series: 溢价率序列 (每天计算) """ price_df = self._tushare.fetch_etf(code, start_date, end_date) nav_df = self._tushare.fetch_etf_nav(code, start_date, end_date) # 计算历史溢价率序列 premium_series = None if price_df is not None and nav_df is not None and len(nav_df) > 0: premium_series = self._calculate_premium_series(price_df, nav_df) return price_df, nav_df, premium_series def _calculate_premium_series( self, price_df: pd.DataFrame, nav_df: pd.DataFrame ) -> Optional[pd.Series]: """ 计算历史溢价率序列 溢价率 = (ETF收盘价 - ETF净值) / ETF净值 注意:净值数据通常T+1公布,需要处理日期对齐问题 Args: price_df: ETF价格数据(索引为日期) nav_df: ETF净值数据(索引为日期) Returns: 溢价率Series(索引为日期,值为溢价率) """ # 对齐日期:净值用ffill填充(因为T+1公布) # 价格日期可能比净值日期多一天 aligned_nav = nav_df['nav'].reindex(price_df.index, method='ffill') # 计算溢价率 close_prices = price_df['close'] premium = (close_prices - aligned_nav) / aligned_nav # 过滤掉无效值(净值缺失的日期) premium = premium.dropna() return premium def _fetch_us_index( self, code: str, start_date: str, end_date: str ) -> Optional[pd.DataFrame]: """ 获取美股指数 特点:YFinance,需要SSH隧道,指数代码转换 """ self._start_tunnel() return self._yfinance.fetch(code, start_date, end_date) def _fetch_hk_index( self, code: str, start_date: str, end_date: str ) -> Optional[pd.DataFrame]: """ 获取港股指数 特点:YFinance,需要SSH隧道 """ self._start_tunnel() return self._yfinance.fetch(code, start_date, end_date) def _fetch_futures( self, code: str, start_date: str, end_date: str ) -> Optional[pd.DataFrame]: """ 获取期货数据 特点: - 中国期货(.SHF/.DCE/.CZC): Tushare - NYMEX(.NYM): YFinance """ if code.endswith('.NYM'): # NYMEX期货走YFinance self._start_tunnel() return self._yfinance.fetch(code, start_date, end_date) else: # 中国期货走Tushare return self._tushare.fetch_futures(code, start_date, end_date) def _fetch_crypto( self, code: str, start_date: str, end_date: str ) -> Optional[pd.DataFrame]: """ 获取加密货币 特点:CCXT,不支持SOCKS5代理 TODO: 实现加密货币获取 """ print(f"⚠️ 加密货币数据获取尚未实现: {code}") return None # ============================================================ # 批量获取 # ============================================================ def fetch_batch( self, codes: List[str], start_date: str, end_date: str, retry: int = 3 ) -> Dict[str, Optional[pd.DataFrame]]: """ 批量获取多只标的数据 Args: codes: 代码列表 start_date: 开始日期 end_date: 结束日期 Returns: {code: DataFrame} """ results = {} # 按资产类型分组 grouped = AssetTypeDetector.group_by_type(codes) print(f"开始获取 {len(codes)} 只标的...") for asset_type, code_list in grouped.items(): print(f" {asset_type.value}: {len(code_list)} 只") # 启动隧道(港美股需要) self._start_tunnel() for code in codes: results[code] = self.fetch(code, start_date, end_date, retry) return results # ============================================================ # 辅助方法 # ============================================================ def get_asset_type(self, code: str) -> AssetType: """获取资产类型""" return AssetTypeDetector.detect(code) def is_supported(self, code: str) -> bool: """判断是否支持该代码""" return AssetTypeDetector.detect(code) != AssetType.UNKNOWN