"""
Unified data fetcher.

Layered design: one public interface on the outside, with an independent
implementation per asset type on the inside.

Supported: A-share indices/ETFs/stocks, Hong Kong indices/stocks, US
indices/stocks, futures and cryptocurrencies.

Usage:
    from datasource import UniversalDataFetcher

    fetcher = UniversalDataFetcher()

    # Single symbol (asset type is detected automatically)
    df = fetcher.fetch("000300.SH", "2024-01-01", "2024-12-31")

    # ETF prices; NAV and premium are read from df.attrs
    df = fetcher.fetch_etf("513100.SH", "2024-01-01", "2024-12-31")
    nav_df = df.attrs.get('nav')
    premium_series = df.attrs.get('premium')

    # Batch download
    results = fetcher.fetch_batch(["000300.SH", "NDX", "N225"],
                                  "2024-01-01", "2024-12-31")
"""
import os
import time
from typing import Optional, Dict, List, Tuple
from datetime import datetime

import pandas as pd

from .tushare_source import TushareSource
from .yfinance_source import YFinanceSource
from .ssh_tunnel import SSHTunnelManager
from .asset_type_detector import AssetTypeDetector, AssetType
from .ccxt_source import CCXTSource, get_crypto_source


class UniversalDataFetcher:
    """
    Unified data fetcher.

    Layered design:
    - Outside: a single ``fetch()`` entry point that routes automatically
      based on the detected asset type.
    - Inside: one method per asset type, each with a single responsibility.
    """

    # Adjustment modes accepted per asset type:
    # 'raw' (unadjusted), 'qfq' (forward-adjusted), 'hfq' (back-adjusted).
    # Types missing from this table fall back to ['raw'] in fetch().
    VALID_ADJ_BY_TYPE = {
        AssetType.CHINA_INDEX: ['raw'],  # indices have no adjustment
        AssetType.CHINA_ETF: ['raw', 'hfq'],  # ETFs support back-adjustment only
        AssetType.CHINA_STOCK: ['raw', 'qfq', 'hfq'],
        AssetType.US_INDEX: ['raw'],  # indices have no adjustment
        AssetType.US_STOCK: ['raw', 'qfq', 'hfq'],
        AssetType.HK_INDEX: ['raw'],  # indices have no adjustment
        AssetType.HK_STOCK: ['raw', 'qfq', 'hfq'],
        AssetType.FUTURES: ['raw'],  # futures have no adjustment
        AssetType.CRYPTO: ['raw'],  # crypto has no adjustment
    }

    # Asset types whose data is fetched through the SSH tunnel
    # (HK/US markets and crypto).
    SSH_REQUIRED_TYPES = {
        AssetType.US_INDEX,
        AssetType.US_STOCK,
        AssetType.HK_INDEX,
        AssetType.HK_STOCK,
        AssetType.CRYPTO,
    }

    # Asset types routed to each backend in _route_fetch().
    _TUSHARE_TYPES = (
        AssetType.CHINA_INDEX,
        AssetType.CHINA_ETF,
        AssetType.CHINA_STOCK,
    )
    _YFINANCE_TYPES = (
        AssetType.US_INDEX,
        AssetType.US_STOCK,
        AssetType.HK_INDEX,
        AssetType.HK_STOCK,
    )

    @staticmethod
    def get_ssh_config_from_env() -> Optional[Dict]:
        """
        Build an SSH configuration from environment variables.

        Reads SSH_ENABLED (must be 'true', case-insensitive, to enable),
        SSH_HOST, SSH_PORT (default 22), SSH_USERNAME, SSH_KEY_PATH
        (default 'hk_ecs.pem') and SSH_LOCAL_PORT (default 1080).

        Returns:
            The SSH configuration dict, or None when SSH is not enabled.

        Raises:
            ValueError: if SSH_PORT or SSH_LOCAL_PORT is not an integer.
        """
        enabled = os.getenv('SSH_ENABLED', 'false').lower() == 'true'
        if not enabled:
            return None

        return {
            "enabled": True,
            "host": os.getenv('SSH_HOST', ''),
            "port": int(os.getenv('SSH_PORT', '22')),
            "username": os.getenv('SSH_USERNAME', ''),
            "key_path": os.getenv('SSH_KEY_PATH', 'hk_ecs.pem'),
            "local_port": int(os.getenv('SSH_LOCAL_PORT', '1080')),
        }

    @classmethod
    def from_env(cls, **kwargs) -> 'UniversalDataFetcher':
        """
        Create an instance whose SSH configuration comes from the environment.

        Args:
            **kwargs: Other constructor arguments (use_cache, cache_dir, ...).

        Returns:
            A new UniversalDataFetcher.
        """
        ssh_config = cls.get_ssh_config_from_env()
        return cls(ssh_config=ssh_config, **kwargs)

    def __init__(
        self,
        ssh_config: Optional[Dict] = None,
        use_cache: bool = True,
        cache_dir: str = "data/etf_cache/daily"
    ):
        """
        Args:
            ssh_config: SSH tunnel configuration (used for HK/US markets and
                crypto). None is treated as "SSH disabled".
            use_cache: Whether to use the local cache.
            cache_dir: Cache directory.
        """
        self.ssh_config = ssh_config or {}
        # NOTE(review): use_cache / cache_dir are stored but not read anywhere
        # in this class; confirm whether a data source consumes them.
        self.use_cache = use_cache
        self.cache_dir = cache_dir

        # Data source backends
        self._tushare = TushareSource()
        self._yfinance = YFinanceSource()

        # SSH tunnel (created lazily on first need)
        self._tunnel: Optional[SSHTunnelManager] = None
        self._tunnel_started = False

        # Crypto data source (created lazily on first crypto fetch)
        self._crypto: Optional[CCXTSource] = None

    def __enter__(self):
        """Start the SSH tunnel (if configured) on context entry."""
        self._start_tunnel()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the SSH tunnel on context exit."""
        self._stop_tunnel()

    # ============================================================
    # SSH tunnel management
    # ============================================================

    def _start_tunnel(self) -> bool:
        """
        Start the SSH tunnel if SSH is enabled and it is not already running.

        Returns:
            True if the tunnel is running or SSH is disabled;
            False if SSH is enabled but the tunnel failed to start.
        """
        if self._tunnel_started:
            return True

        if self.ssh_config.get('enabled'):
            self._tunnel = SSHTunnelManager(self.ssh_config)
            if self._tunnel.start():
                self._tunnel_started = True
                return True
            return False
        return True

    def _stop_tunnel(self):
        """Stop and discard the SSH tunnel, if one was created."""
        if self._tunnel:
            self._tunnel.stop()
            self._tunnel = None
            self._tunnel_started = False

    def get_ssh_status(self) -> Dict:
        """
        Report the SSH tunnel status.

        Returns:
            A dict with the enabled/disabled status, host/port (only when
            enabled), whether the tunnel has been started, and the asset
            types that require it.
        """
        enabled = self.ssh_config.get('enabled', False)
        return {
            "status": "enabled" if enabled else "disabled",
            "host": self.ssh_config.get('host', '') if enabled else '',
            "port": self.ssh_config.get('port', 22) if enabled else None,
            "tunnel_started": self._tunnel_started,
            "required_types": [t.value for t in self.SSH_REQUIRED_TYPES],
            "description": "港美股/加密货币数据获取需要 SSH 隧道",
        }

    # ============================================================
    # Unified entry point (automatic routing)
    # ============================================================

    def fetch(
        self,
        code: str,
        start_date: str,
        end_date: str,
        adj: str = 'raw',
        retry: int = 3,
        timeframe: str = '1d'
    ) -> Optional[pd.DataFrame]:
        """
        Unified data entry point with adjustment support.

        Detects the asset type of ``code`` and routes to the matching backend.

        Args:
            code: Symbol code.
            start_date: Start date 'YYYY-MM-DD'.
            end_date: End date 'YYYY-MM-DD'.
            adj: 'raw' (unadjusted) / 'qfq' (forward-adjusted) /
                'hfq' (back-adjusted). Default 'raw'.
            retry: Number of attempts. A backend exception is retried after
                a 2-second pause; a None result is returned without retrying.
            timeframe: Candle period (used by crypto only; default '1d').

        Returns:
            DataFrame with columns date, open, high, low, close, volume,
            or None on failure. Per the data-source docs, A-share ETFs with
            adj='hfq' also carry adj_factor and close_hfq.

        Raises:
            ValueError: if ``adj`` is not a known mode or is not supported
                for the detected asset type.

        Examples:
            # Raw prices
            df = fetcher.fetch("000300.SH", "2020-01-01", "2024-12-31")
            # A-share stock, back-adjusted
            df = fetcher.fetch("000001.SZ", "2020-01-01", "2024-12-31", adj='hfq')
            # US stock, forward-adjusted
            df = fetcher.fetch("AAPL", "2020-01-01", "2024-12-31", adj='qfq')
        """
        # Validate the adj argument itself
        if adj not in ['raw', 'qfq', 'hfq']:
            raise ValueError(f"adj 参数必须是 'raw', 'qfq' 或 'hfq',当前: {adj}")

        asset_type = AssetTypeDetector.detect(code)

        # Validate that adj applies to this asset type
        valid_adj = self.VALID_ADJ_BY_TYPE.get(asset_type, ['raw'])
        if adj not in valid_adj:
            raise ValueError(
                f"adj='{adj}' 不适用于 {asset_type.value},支持的类型: {valid_adj}"
            )

        # Start the SSH tunnel once up front for HK/US/crypto
        if asset_type in self.SSH_REQUIRED_TYPES:
            self._start_tunnel()

        for attempt in range(retry):
            try:
                return self._route_fetch(
                    code, asset_type, start_date, end_date, adj, timeframe
                )
            except Exception as e:
                if attempt < retry - 1:
                    time.sleep(2)
                else:
                    print(f"✗ 获取 {code} adj={adj} 失败 (尝试 {attempt+1}/{retry}): {e}")
                    return None
        return None

    def _route_fetch(
        self,
        code: str,
        asset_type: AssetType,
        start_date: str,
        end_date: str,
        adj: str,
        timeframe: str
    ) -> Optional[pd.DataFrame]:
        """Dispatch a single fetch attempt to the backend for ``asset_type``."""
        if asset_type in self._TUSHARE_TYPES:
            return self._tushare.fetch(code, start_date, end_date, adj)
        if asset_type in self._YFINANCE_TYPES:
            return self._yfinance.fetch(code, start_date, end_date, adj)
        if asset_type == AssetType.FUTURES:
            return self._fetch_futures(code, start_date, end_date, adj)
        if asset_type == AssetType.CRYPTO:
            return self._fetch_crypto(code, start_date, end_date, adj, timeframe)
        print(f"⚠️ 未知资产类型: {code} -> {asset_type}")
        return None

    # ============================================================
    # Layered implementation: one method per asset type
    # ============================================================

    def _fetch_china_index(
        self,
        code: str,
        start_date: str,
        end_date: str
    ) -> Optional[pd.DataFrame]:
        """Fetch an A-share index via Tushare (no SSH tunnel needed)."""
        return self._tushare.fetch_index(code, start_date, end_date)

    def _fetch_china_etf(
        self,
        code: str,
        start_date: str,
        end_date: str
    ) -> Optional[pd.DataFrame]:
        """Fetch A-share ETF prices via Tushare (unadjusted, backend default)."""
        return self._tushare.fetch_etf(code, start_date, end_date)

    def fetch_etf(
        self,
        code: str,
        start_date: str,
        end_date: str,
        adj: str = 'raw'
    ) -> Optional[pd.DataFrame]:
        """
        Fetch A-share ETF prices, with NAV and premium attached to df.attrs.

        This is the interface recommended by the module docstring and by the
        deprecation notice of fetch_etf_with_nav().

        Args:
            code: ETF code, e.g. '159915.SZ', '513100.SH'.
            start_date: Start date 'YYYY-MM-DD'.
            end_date: End date 'YYYY-MM-DD'.
            adj: 'raw' or 'hfq' (ETFs support back-adjustment only).

        Returns:
            Price DataFrame or None. Callers read ``df.attrs.get('nav')`` and
            ``df.attrs.get('premium')`` for NAV and premium data.

        Raises:
            ValueError: if ``adj`` is not supported for ETFs.
        """
        valid_adj = self.VALID_ADJ_BY_TYPE[AssetType.CHINA_ETF]
        if adj not in valid_adj:
            raise ValueError(
                f"adj='{adj}' 不适用于 {AssetType.CHINA_ETF.value},支持的类型: {valid_adj}"
            )
        return self._tushare.fetch_etf(code, start_date, end_date, adj=adj)

    def fetch_etf_with_nav(
        self,
        code: str,
        start_date: str,
        end_date: str
    ) -> Tuple[Optional[pd.DataFrame], Optional[pd.DataFrame], Optional[pd.Series]]:
        """
        Fetch ETF prices, NAV and premium series (DEPRECATED, legacy interface).

        Prefer:
            df = fetcher.fetch_etf(code, start_date, end_date, adj='raw')
            nav_df = df.attrs.get('nav')
            premium_series = df.attrs.get('premium')

        Args:
            code: ETF code.
            start_date: Start date.
            end_date: End date.

        Returns:
            (price_df, nav_df, premium_series); all three are None when no
            price data is returned.
            - price_df: ETF OHLCV prices.
            - nav_df: NAV data, taken from df.attrs['nav'].
            - premium_series: premium series, taken from df.attrs['premium'].
        """
        import warnings
        warnings.warn(
            "fetch_etf_with_nav() is deprecated. "
            "Use fetch_etf(code, start, end, adj='raw') and access df.attrs['nav'] and df.attrs['premium'] instead.",
            DeprecationWarning,
            stacklevel=2
        )

        df = self.fetch_etf(code, start_date, end_date, adj='raw')
        if df is None:
            return None, None, None

        # Pull the attached metadata off the price frame
        nav_df = df.attrs.get('nav')
        premium_series = df.attrs.get('premium')

        return df, nav_df, premium_series

    # ============================================================
    # Internal methods: special asset types
    # ============================================================

    def _fetch_futures(
        self,
        code: str,
        start_date: str,
        end_date: str,
        adj: str = 'raw'
    ) -> Optional[pd.DataFrame]:
        """
        Fetch futures data.

        - NYMEX codes (suffix '.NYM'): YFinance, via the SSH tunnel.
        - All other codes (e.g. .SHF/.DCE/.CZC): Tushare.
        - Futures are never adjusted; only adj='raw' is accepted.

        Raises:
            ValueError: if ``adj`` is not 'raw'.
        """
        if adj != 'raw':
            raise ValueError(f"期货不支持复权,adj='{adj}' 仅适用于股票/ETF")

        if code.endswith('.NYM'):
            # NYMEX futures go through YFinance, which needs the tunnel
            self._start_tunnel()
            return self._yfinance.fetch(code, start_date, end_date, adj='raw')
        # Chinese futures go through Tushare
        return self._tushare.fetch_futures(code, start_date, end_date)

    def _fetch_crypto(
        self,
        code: str,
        start_date: str,
        end_date: str,
        adj: str = 'raw',
        timeframe: str = '1d'
    ) -> Optional[pd.DataFrame]:
        """
        Fetch cryptocurrency candles.

        Per the original design notes: uses CCXT + OKX, proxies through the
        SSH tunnel's SOCKS5 port (converted to HTTP via socks2http), requires
        a timeframe, is not cached, and supports only adj='raw'.

        Args:
            code: Crypto code (e.g. BTC, ETH).
            start_date: Start date.
            end_date: End date.
            adj: Adjustment type (only 'raw').
            timeframe: Candle period (1d, 1h, 4h, 15m, 1m).
        """
        # Create the crypto source lazily, bound to the tunnel's local SOCKS port
        if self._crypto is None:
            socks_port = self.ssh_config.get('local_port', 1080)
            self._crypto = get_crypto_source(socks_port=socks_port)

        return self._crypto.fetch(code, start_date, end_date, adj, timeframe)

    # ============================================================
    # Batch fetching
    # ============================================================

    def fetch_batch(
        self,
        codes: List[str],
        start_date: str,
        end_date: str,
        retry: int = 3
    ) -> Dict[str, Optional[pd.DataFrame]]:
        """
        Fetch raw (unadjusted) data for several symbols.

        Args:
            codes: Symbol codes.
            start_date: Start date.
            end_date: End date.
            retry: Attempts per symbol, forwarded to fetch().

        Returns:
            {code: DataFrame or None}
        """
        results = {}

        # Group by asset type for the progress summary only
        grouped = AssetTypeDetector.group_by_type(codes)

        print(f"开始获取 {len(codes)} 只标的...")
        for asset_type, code_list in grouped.items():
            print(f"  {asset_type.value}: {len(code_list)} 只")

        # fetch() starts the tunnel on demand. retry MUST be passed by
        # keyword: positionally it would land in the `adj` slot.
        for code in codes:
            results[code] = self.fetch(code, start_date, end_date, retry=retry)

        return results

    # ============================================================
    # Helpers
    # ============================================================

    def get_asset_type(self, code: str) -> AssetType:
        """Return the detected asset type of ``code``."""
        return AssetTypeDetector.detect(code)

    def is_supported(self, code: str) -> bool:
        """Return True unless ``code`` is detected as AssetType.UNKNOWN."""
        return AssetTypeDetector.detect(code) != AssetType.UNKNOWN

    # ============================================================
    # Extension layer: type-specific methods (adjustment / NAV / premium)
    # ============================================================

    def fetch_etf_adj(
        self,
        code: str,
        start_date: str,
        end_date: str
    ) -> Optional[pd.DataFrame]:
        """
        Fetch back-adjusted A-share ETF prices (legacy interface).

        Equivalent to ``fetch_etf(code, start_date, end_date, adj='hfq')``.

        Args:
            code: ETF code, e.g. '159915.SZ', '513100.SH'.
            start_date: Start date 'YYYY-MM-DD'.
            end_date: End date 'YYYY-MM-DD'.

        Returns:
            Per the data-source docs: DataFrame with date, open, close,
            adj_factor, close_hfq; df.attrs['nav'] holds NAV and
            df.attrs['premium'] the premium series (from raw prices).
        """
        return self.fetch_etf(code, start_date, end_date, adj='hfq')

    def fetch_us_adj(
        self,
        code: str,
        start_date: str,
        end_date: str,
        adj: str = 'qfq'
    ) -> Optional[pd.DataFrame]:
        """
        Fetch adjusted US prices via YFinance ('qfq' or 'hfq').

        Intended to remove split and dividend effects for US stocks/ETFs.

        Args:
            code: US symbol, e.g. 'AAPL', 'TSLA', 'QQQ'.
            start_date: Start date 'YYYY-MM-DD'.
            end_date: End date 'YYYY-MM-DD'.
            adj: 'qfq' (forward) or 'hfq' (back); default 'qfq'.

        Returns:
            Adjusted DataFrame with date, open, high, low, close, volume.

        Example:
            df = fetcher.fetch_us_adj("AAPL", "2020-01-01", "2024-12-31", adj='qfq')
        """
        # fetch() handles routing and the SSH tunnel
        return self.fetch(code, start_date, end_date, adj=adj)

    def fetch_hk_adj(
        self,
        code: str,
        start_date: str,
        end_date: str,
        adj: str = 'qfq'
    ) -> Optional[pd.DataFrame]:
        """
        Fetch adjusted Hong Kong stock prices via YFinance ('qfq' or 'hfq').

        Args:
            code: HK symbol, e.g. '00700.HK', '00941.HK'.
            start_date: Start date 'YYYY-MM-DD'.
            end_date: End date 'YYYY-MM-DD'.
            adj: 'qfq' (forward) or 'hfq' (back); default 'qfq'.

        Returns:
            Adjusted DataFrame with date, open, high, low, close, volume.
        """
        # fetch() handles routing and the SSH tunnel
        return self.fetch(code, start_date, end_date, adj=adj)

    def fetch_stock_adj(
        self,
        code: str,
        start_date: str,
        end_date: str,
        adj: str = 'hfq'
    ) -> Optional[pd.DataFrame]:
        """
        Fetch adjusted A-share stock prices via Tushare.

        Args:
            code: A-share code, e.g. '000001.SZ', '600000.SH'.
            start_date: Start date 'YYYY-MM-DD'.
            end_date: End date 'YYYY-MM-DD'.
            adj: 'qfq' (forward) or 'hfq' (back); default 'hfq'.

        Returns:
            Per the data-source docs: DataFrame with date, open, high, low,
            close, volume, adj_factor.
        """
        return self._tushare.fetch_stock_adj(code, start_date, end_date, adj)

    # ============================================================
    # Unified adjustment entry point (thin wrapper over fetch)
    # ============================================================

    def fetch_with_adj(
        self,
        code: str,
        start_date: str,
        end_date: str,
        adj: str = 'raw',
        retry: int = 3
    ) -> Optional[pd.DataFrame]:
        """
        Unified adjustment entry point; delegates to ``fetch``.

        Args:
            code: Symbol code.
            start_date: Start date 'YYYY-MM-DD'.
            end_date: End date 'YYYY-MM-DD'.
            adj: Adjustment type, default 'raw'.
            retry: Number of attempts.

        Returns:
            DataFrame whose columns vary slightly with asset type and adj.

        Examples:
            df = fetcher.fetch_with_adj("000001.SZ", "2020-01-01", "2024-12-31", adj='hfq')
            df = fetcher.fetch_with_adj("AAPL", "2020-01-01", "2024-12-31", adj='qfq')
        """
        return self.fetch(code, start_date, end_date, adj=adj, retry=retry)