Files
etf/datasource/universal_fetcher.py
aszerW b7f7a756b6 refactor: SSH配置完全封装到UniversalDataFetcher
变更内容:

1. UniversalDataFetcher 新增方法:
   - get_ssh_config_from_env(): 从环境变量读取 SSH 配置
   - from_env(): 工厂方法,自动读取环境变量创建实例
   - get_ssh_status(): 返回 SSH 状态信息字典

2. flask_server.py 简化:
   - 移除 get_ssh_config() 函数(18行)
   - 移除 ssh_config 全局变量
   - get_fetcher() 使用 from_env()
   - / 和 /health 路由使用 get_ssh_status()

架构改进:
- SSH 配置逻辑完全封装在 UniversalDataFetcher
- flask_server.py 只依赖 fetcher 接口
- 减少 24 行重复代码
2026-05-23 21:20:43 +08:00

662 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

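"""
Unified data fetcher.

Layered architecture: one public interface on the outside, an independent
implementation per asset type on the inside.
Supports A-share indices/ETFs, Hong Kong indices, US indices, futures and
cryptocurrencies.

Usage:
    from datasource import UniversalDataFetcher
    fetcher = UniversalDataFetcher()
    # Single symbol (the asset type is detected automatically)
    df = fetcher.fetch("000300.SH", "2024-01-01", "2024-12-31")
    # ETF prices together with NAV and the premium series (3-tuple)
    price_df, nav_df, premium = fetcher.fetch_etf_with_nav("513100.SH", "2024-01-01", "2024-12-31")
    # Batch fetch
    results = fetcher.fetch_batch(["000300.SH", "NDX", "N225"], "2024-01-01", "2024-12-31")
"""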
import os
import time
from typing import Optional, Dict, List, Tuple
from datetime import datetime
import pandas as pd
from .tushare_source import TushareSource
from .yfinance_source import YFinanceSource
from .ssh_tunnel import SSHTunnelManager
from .asset_type_detector import AssetTypeDetector, AssetType
from .ccxt_source import CCXTSource, get_crypto_source
class UniversalDataFetcher:
    """
    Unified data fetcher.

    Layered architecture:
    - Outside: a single fetch() entry point that routes automatically.
    - Inside: one independent method per asset type, each with a single
      responsibility.
    """
@staticmethod
def get_ssh_config_from_env() -> Optional[Dict]:
    """
    Build the SSH tunnel configuration from environment variables.

    Variables read:
        SSH_ENABLED     tunnel switch; "true", "1", "yes" or "on" enable it
                        (case-insensitive, surrounding whitespace ignored)
        SSH_HOST        default ''
        SSH_PORT        default 22
        SSH_USERNAME    default ''
        SSH_KEY_PATH    default 'hk_ecs.pem'
        SSH_LOCAL_PORT  default 1080

    Returns:
        The SSH configuration dict, or None when the tunnel is disabled.

    Raises:
        ValueError: if a port variable is set to a non-integer value.
    """
    def _port_from_env(name: str, default: int) -> int:
        # A variable that is present but empty (e.g. "SSH_PORT=" in an env
        # file) falls back to the default instead of crashing in int('').
        raw = os.getenv(name, '').strip()
        if not raw:
            return default
        try:
            return int(raw)
        except ValueError:
            raise ValueError(f"{name} must be an integer, got {raw!r}") from None

    flag = os.getenv('SSH_ENABLED', 'false').strip().lower()
    if flag not in ('true', '1', 'yes', 'on'):
        return None

    return {
        "enabled": True,
        "host": os.getenv('SSH_HOST', ''),
        "port": _port_from_env('SSH_PORT', 22),
        "username": os.getenv('SSH_USERNAME', ''),
        "key_path": os.getenv('SSH_KEY_PATH', 'hk_ecs.pem'),
        "local_port": _port_from_env('SSH_LOCAL_PORT', 1080),
    }
@classmethod
def from_env(cls, **kwargs) -> 'UniversalDataFetcher':
    """
    Alternate constructor: create an instance configured from the environment.

    Args:
        **kwargs: extra constructor arguments (use_cache, cache_dir, ...),
            forwarded unchanged.

    Returns:
        A new instance whose ssh_config is whatever
        get_ssh_config_from_env() returned.
    """
    return cls(ssh_config=cls.get_ssh_config_from_env(), **kwargs)
def __init__(
    self,
    ssh_config: Optional[Dict] = None,
    use_cache: bool = True,
    cache_dir: str = "data/etf_cache/daily"
):
    """
    Set up the fetcher and its data sources.

    Args:
        ssh_config: SSH tunnel settings (used for HK/US markets); a falsy
            value is stored as an empty dict.
        use_cache: whether to use the local cache.
        cache_dir: cache directory.
    """
    # Plain settings
    self.cache_dir = cache_dir
    self.use_cache = use_cache
    self.ssh_config = ssh_config if ssh_config else {}

    # Data sources that are created eagerly
    self._tushare = TushareSource()
    self._yfinance = YFinanceSource()

    # Created on demand: the SSH tunnel ...
    self._tunnel_started = False
    self._tunnel: Optional[SSHTunnelManager] = None

    # ... and the cryptocurrency source
    self._crypto: Optional[CCXTSource] = None
def __enter__(self):
    """Enter the `with` block: try to bring the tunnel up, then hand back self."""
    # The result of _start_tunnel() is deliberately not inspected here.
    self._start_tunnel()
    fetcher = self
    return fetcher
def __exit__(self, exc_type, exc_val, exc_tb):
    """Leave the `with` block: shut the tunnel down; exceptions are not suppressed."""
    self._stop_tunnel()
    return None
# ============================================================
# SSH隧道管理
# ============================================================
def _start_tunnel(self) -> bool:
    """
    Make sure the SSH tunnel is running.

    Returns:
        True when the tunnel is already up, when no tunnel is enabled in
        ssh_config, or when it was started successfully; False when the
        start attempt failed.
    """
    # Nothing to do: already running
    if self._tunnel_started:
        return True

    # Nothing to do: tunnel not enabled
    if not self.ssh_config.get('enabled'):
        return True

    self._tunnel = SSHTunnelManager(self.ssh_config)
    if not self._tunnel.start():
        return False

    self._tunnel_started = True
    return True
def _stop_tunnel(self):
    """Stop the SSH tunnel, if one exists, and reset the tunnel state."""
    tunnel = self._tunnel
    if not tunnel:
        # No tunnel object: leave the state untouched
        return
    tunnel.stop()
    self._tunnel = None
    self._tunnel_started = False
def get_ssh_status(self) -> Dict:
    """
    Describe the SSH tunnel configuration and state.

    Returns:
        Dict with keys status ("enabled"/"disabled"), host, port,
        tunnel_started, required_types (the values of SSH_REQUIRED_TYPES)
        and description. host is '' and port is None when disabled.
    """
    config = self.ssh_config
    enabled = config.get('enabled', False)

    if enabled:
        status = "enabled"
        host = config.get('host', '')
        port = config.get('port', 22)
    else:
        status = "disabled"
        host = ''
        port = None

    required = [asset_type.value for asset_type in self.SSH_REQUIRED_TYPES]

    return {
        "status": status,
        "host": host,
        "port": port,
        "tunnel_started": self._tunnel_started,
        "required_types": required,
        "description": "港美股/加密货币数据获取需要 SSH 隧道",
    }
# ============================================================
# 统一入口(自动路由)
# ============================================================
# adj (price-adjustment) modes accepted for each asset type; fetch() rejects
# any adj value not listed for the detected type.
VALID_ADJ_BY_TYPE = {
    AssetType.CHINA_INDEX: ['raw'],  # indices have no adjustment
    AssetType.CHINA_ETF: ['raw', 'hfq'],  # ETFs: backward adjustment only
    AssetType.CHINA_STOCK: ['raw', 'qfq', 'hfq'],
    AssetType.US_INDEX: ['raw'],  # indices have no adjustment
    AssetType.US_STOCK: ['raw', 'qfq', 'hfq'],
    AssetType.HK_INDEX: ['raw'],  # indices have no adjustment
    AssetType.HK_STOCK: ['raw', 'qfq', 'hfq'],
    AssetType.FUTURES: ['raw'],  # futures have no adjustment
    AssetType.CRYPTO: ['raw'],  # cryptocurrencies have no adjustment
}
# Asset types for which fetch() starts the SSH tunnel first
# (HK/US markets and cryptocurrencies).
SSH_REQUIRED_TYPES = {
    AssetType.US_INDEX,  # US indices
    AssetType.US_STOCK,  # US stocks
    AssetType.HK_INDEX,  # Hong Kong indices
    AssetType.HK_STOCK,  # Hong Kong stocks
    AssetType.CRYPTO,  # cryptocurrencies
}
def fetch(
    self,
    code: str,
    start_date: str,
    end_date: str,
    adj: str = 'raw',
    retry: int = 3,
    timeframe: str = '1d'
) -> Optional[pd.DataFrame]:
    """
    Unified entry point: detect the asset type of `code` and route the
    request to the matching data source.

    Args:
        code: symbol code.
        start_date: start date, 'YYYY-MM-DD'.
        end_date: end date, 'YYYY-MM-DD'.
        adj: price adjustment, 'raw' (unadjusted) / 'qfq' (forward) /
            'hfq' (backward); default 'raw'. Must also be listed for the
            detected asset type in VALID_ADJ_BY_TYPE.
        retry: total number of attempts; values below 1 are treated as 1
            so that at least one attempt is always made.
        timeframe: candle period, only forwarded for cryptocurrencies;
            default '1d'.

    Returns:
        Whatever the selected data source returns (expected: a DataFrame
        with date, open, high, low, close, volume; the original notes say
        A-share ETFs with adj='hfq' additionally carry adj_factor and
        close_hfq). None when the asset type is unknown or every attempt
        raised.

    Raises:
        ValueError: if adj is not a known mode, or is not supported by the
            detected asset type.

    Examples:
        # Unadjusted prices
        df = fetcher.fetch("000300.SH", "2020-01-01", "2024-12-31")
        # A-share stock, backward-adjusted
        df = fetcher.fetch("000001.SZ", "2020-01-01", "2024-12-31", adj='hfq')
        # US stock, forward-adjusted
        df = fetcher.fetch("AAPL", "2020-01-01", "2024-12-31", adj='qfq')
    """
    # Validate adj before doing any lookup
    if adj not in ('raw', 'qfq', 'hfq'):
        raise ValueError(f"adj 参数必须是 'raw', 'qfq' 或 'hfq',当前: {adj}")

    asset_type = AssetTypeDetector.detect(code)

    # Validate adj against what this asset type supports
    valid_adj = self.VALID_ADJ_BY_TYPE.get(asset_type, ['raw'])
    if adj not in valid_adj:
        raise ValueError(
            f"adj='{adj}' 不适用于 {asset_type.value},支持的类型: {valid_adj}"
        )

    # Start the tunnel once up front; a failed start is not fatal here,
    # the fetch below is still attempted.
    if asset_type in self.SSH_REQUIRED_TYPES:
        self._start_tunnel()

    # Asset types grouped by the source that serves them
    tushare_types = (
        AssetType.CHINA_INDEX,
        AssetType.CHINA_ETF,
        AssetType.CHINA_STOCK,
    )
    yfinance_types = (
        AssetType.US_INDEX,
        AssetType.US_STOCK,
        AssetType.HK_INDEX,
        AssetType.HK_STOCK,
    )

    attempts = max(1, retry)
    for attempt in range(attempts):
        try:
            if asset_type in tushare_types:
                return self._tushare.fetch(code, start_date, end_date, adj)
            if asset_type in yfinance_types:
                return self._yfinance.fetch(code, start_date, end_date, adj)
            if asset_type == AssetType.FUTURES:
                return self._fetch_futures(code, start_date, end_date, adj)
            if asset_type == AssetType.CRYPTO:
                return self._fetch_crypto(code, start_date, end_date, timeframe)
            print(f"⚠️ 未知资产类型: {code} -> {asset_type}")
            return None
        except Exception as e:
            if attempt < attempts - 1:
                # Brief pause before the next attempt
                time.sleep(2)
            else:
                print(f"✗ 获取 {code} adj={adj} 失败 (尝试 {attempt+1}/{attempts}): {e}")
                return None
    return None
# ============================================================
# 分层实现:各资产类型独立方法
# ============================================================
def _fetch_china_index(
    self,
    code: str,
    start_date: str,
    end_date: str
) -> Optional[pd.DataFrame]:
    """
    Fetch an A-share index through the Tushare source.

    Delegates to self._tushare.fetch_index(); this method itself never
    touches the SSH tunnel.
    """
    source = self._tushare
    return source.fetch_index(code, start_date, end_date)
def _fetch_china_etf(
    self,
    code: str,
    start_date: str,
    end_date: str
) -> Optional[pd.DataFrame]:
    """
    Fetch A-share ETF prices through the Tushare source.

    Delegates to self._tushare.fetch_etf() (per the original notes this is
    backed by Tushare's fund_daily interface).
    """
    source = self._tushare
    return source.fetch_etf(code, start_date, end_date)
def fetch_etf_with_nav(
    self,
    code: str,
    start_date: str,
    end_date: str
) -> Tuple[Optional[pd.DataFrame], Optional[pd.DataFrame], Optional[pd.Series]]:
    """
    Fetch ETF prices, NAV and the per-day premium series in one call.

    Args:
        code: ETF code.
        start_date: start date.
        end_date: end date.

    Returns:
        (price_df, nav_df, premium_series)
        - price_df: result of self._tushare.fetch_etf()
        - nav_df: result of self._tushare.fetch_etf_nav()
        - premium_series: result of _calculate_premium_series(), or None
          when prices or NAV are missing or the NAV data has no rows.
    """
    source = self._tushare
    price_df = source.fetch_etf(code, start_date, end_date)
    nav_df = source.fetch_etf_nav(code, start_date, end_date)

    # The premium can only be computed with both inputs and a non-empty NAV
    if price_df is None or nav_df is None or len(nav_df) == 0:
        return price_df, nav_df, None

    return price_df, nav_df, self._calculate_premium_series(price_df, nav_df)
def _calculate_premium_series(
    self,
    price_df: pd.DataFrame,
    nav_df: pd.DataFrame,
    max_nav_lag_days: int = 15
) -> Optional[pd.Series]:
    """
    Compute the historical premium series.

        premium = (ETF close - ETF NAV) / ETF NAV

    NAV matching: QDII funds disclose NAV on different schedules, so for
    each price date the NAV published on that same date is preferred;
    when there is none, the most recent earlier NAV is used ("T-1").
    "T-1" means the previous date that actually has a NAV, not the
    previous calendar day, so sessions following a weekend or holiday are
    matched as well. A NAV older than `max_nav_lag_days` is considered
    stale and the price date is skipped.

    Args:
        price_df: ETF prices with a 'close' column, indexed by date.
        nav_df: ETF NAV with a 'nav' column, indexed by date.
        max_nav_lag_days: maximum age, in calendar days, of a fallback NAV.

    Returns:
        Series indexed by price date (ascending) holding the premium, or
        None when no price date could be matched. Assumes both indexes are
        datetime-like and mutually comparable.
    """
    if len(price_df) == 0 or len(nav_df) == 0:
        return None

    # Keep the last row of any duplicated date, then drop missing values
    close = price_df['close']
    close = close[~close.index.duplicated(keep='last')].dropna()
    nav = nav_df['nav']
    nav = nav[~nav.index.duplicated(keep='last')].dropna().sort_index()
    if close.empty or nav.empty:
        return None

    # Backward as-of match: an exact same-day NAV wins, otherwise the
    # latest earlier NAV within the allowed lag is taken.
    matched_nav = nav.reindex(
        close.index,
        method='ffill',
        tolerance=pd.Timedelta(days=max_nav_lag_days),
    )

    premium = ((close - matched_nav) / matched_nav).dropna().sort_index()
    if premium.empty:
        return None
    # Return an unnamed index, like a Series built from a plain dict
    return premium.rename_axis(None)
# ============================================================
# 内部方法:特殊资产类型(保留)
# ============================================================
def _fetch_futures(
    self,
    code: str,
    start_date: str,
    end_date: str,
    adj: str = 'raw'
) -> Optional[pd.DataFrame]:
    """
    Fetch futures data.

    Routing:
    - codes ending in '.NYM' (NYMEX): YFinance, after starting the tunnel
    - everything else (Chinese futures such as .SHF/.DCE/.CZC per the
      original notes): Tushare

    Raises:
        ValueError: if adj is anything other than 'raw'; futures have no
            price adjustment.
    """
    # Futures cannot be adjusted
    if adj != 'raw':
        raise ValueError(f"期货不支持复权adj='{adj}' 仅适用于股票/ETF")

    is_nymex = code.endswith('.NYM')
    if not is_nymex:
        # Chinese futures go through Tushare
        return self._tushare.fetch_futures(code, start_date, end_date)

    # NYMEX futures go through YFinance
    self._start_tunnel()
    return self._yfinance.fetch(code, start_date, end_date, adj='raw')
def _fetch_crypto(
    self,
    code: str,
    start_date: str,
    end_date: str,
    timeframe: str = '1d'
) -> Optional[pd.DataFrame]:
    """
    Fetch cryptocurrency candles.

    The crypto source is created on first use via get_crypto_source(),
    given the SSH tunnel's local port (ssh_config['local_port'], default
    1080), and then reused. Per the original notes it is CCXT + OKX behind
    a SOCKS5-to-HTTP proxy and results are not cached.

    Args:
        code: cryptocurrency code, e.g. BTC, ETH.
        start_date: start date.
        end_date: end date.
        timeframe: candle period, e.g. 1d, 1h, 4h, 15m, 1m.
    """
    source = self._crypto
    if source is None:
        # First use: build the source on top of the tunnel's SOCKS5 port
        port = self.ssh_config.get('local_port', 1080)
        source = get_crypto_source(socks_port=port)
        self._crypto = source
    return source.fetch(code, start_date, end_date, timeframe)
# ============================================================
# 批量获取
# ============================================================
def fetch_batch(
    self,
    codes: List[str],
    start_date: str,
    end_date: str,
    retry: int = 3
) -> Dict[str, Optional[pd.DataFrame]]:
    """
    Fetch several symbols, one fetch() call per code.

    Args:
        codes: list of symbol codes.
        start_date: start date.
        end_date: end date.
        retry: number of attempts per symbol, forwarded to fetch().

    Returns:
        {code: result of fetch() for that code} (None for a symbol that
        could not be fetched). Prices are unadjusted (fetch()'s default
        adj).
    """
    # Grouping by asset type is only used for the progress summary
    grouped = AssetTypeDetector.group_by_type(codes)
    print(f"开始获取 {len(codes)} 只标的...")
    for asset_type, code_list in grouped.items():
        print(f" {asset_type.value}: {len(code_list)}")

    # retry must go by keyword: fetch()'s fourth positional parameter is
    # adj, so passing it positionally makes fetch() reject the call.
    # fetch() takes care of the SSH tunnel for each code itself.
    return {
        code: self.fetch(code, start_date, end_date, retry=retry)
        for code in codes
    }
# ============================================================
# 辅助方法
# ============================================================
def get_asset_type(self, code: str) -> AssetType:
    """Return the asset type that AssetTypeDetector assigns to `code`."""
    detected = AssetTypeDetector.detect(code)
    return detected
def is_supported(self, code: str) -> bool:
    """Tell whether `code` is detected as anything other than AssetType.UNKNOWN."""
    detected = AssetTypeDetector.detect(code)
    unsupported = detected == AssetType.UNKNOWN
    return not unsupported
# ============================================================
# 扩展层:资产类型特有方法(复权/净值/溢价率)
# ============================================================
def fetch_etf_adj(
    self,
    code: str,
    start_date: str,
    end_date: str
) -> Optional[pd.DataFrame]:
    """
    Fetch backward-adjusted prices for an A-share ETF.

    Delegates to self._tushare.fetch_etf_adj(). Per the original notes
    the adjusted price is derived from fund_daily + fund_adj, which
    removes the effect of share splits so that real returns can be
    computed.

    Args:
        code: ETF code, e.g. '159915.SZ', '513100.SH'.
        start_date: start date, 'YYYY-MM-DD'.
        end_date: end date, 'YYYY-MM-DD'.

    Returns:
        Expected: DataFrame with date, open, close, adj_factor, close_hfq.

    Example:
        # Use close_hfq, not close, when computing returns
        df = fetcher.fetch_etf_adj("513100.SH", "2020-01-01", "2024-12-31")
    """
    source = self._tushare
    return source.fetch_etf_adj(code, start_date, end_date)
def fetch_us_adj(
    self,
    code: str,
    start_date: str,
    end_date: str,
    adj: str = 'qfq'
) -> Optional[pd.DataFrame]:
    """
    Fetch adjusted prices for a US symbol.

    Thin wrapper around fetch(), which also takes care of the SSH tunnel;
    only the default adj differs ('qfq').

    Args:
        code: US code, e.g. 'AAPL', 'TSLA', 'QQQ'.
        start_date: start date, 'YYYY-MM-DD'.
        end_date: end date, 'YYYY-MM-DD'.
        adj: 'qfq' (forward) or 'hfq' (backward); default 'qfq'.

    Returns:
        Whatever fetch() returns (expected: adjusted date, open, high,
        low, close, volume).

    Example:
        df = fetcher.fetch_us_adj("AAPL", "2020-01-01", "2024-12-31", adj='qfq')
    """
    return self.fetch(code, start_date, end_date, adj=adj)
def fetch_hk_adj(
    self,
    code: str,
    start_date: str,
    end_date: str,
    adj: str = 'qfq'
) -> Optional[pd.DataFrame]:
    """
    Fetch adjusted prices for a Hong Kong stock.

    Thin wrapper around fetch(), which also takes care of the SSH tunnel;
    only the default adj differs ('qfq').

    Args:
        code: Hong Kong code, e.g. '00700.HK', '00941.HK'.
        start_date: start date, 'YYYY-MM-DD'.
        end_date: end date, 'YYYY-MM-DD'.
        adj: 'qfq' (forward) or 'hfq' (backward); default 'qfq'.

    Returns:
        Whatever fetch() returns (expected: adjusted date, open, high,
        low, close, volume).
    """
    return self.fetch(code, start_date, end_date, adj=adj)
def fetch_stock_adj(
    self,
    code: str,
    start_date: str,
    end_date: str,
    adj: str = 'hfq'
) -> Optional[pd.DataFrame]:
    """
    Fetch adjusted prices for an A-share stock.

    Delegates to self._tushare.fetch_stock_adj() (per the original notes
    this uses Tushare's pro_bar interface).

    Args:
        code: A-share code, e.g. '000001.SZ', '600000.SH'.
        start_date: start date, 'YYYY-MM-DD'.
        end_date: end date, 'YYYY-MM-DD'.
        adj: 'qfq' (forward) or 'hfq' (backward); default 'hfq'.

    Returns:
        Expected: DataFrame with date, open, high, low, close, volume,
        adj_factor.
    """
    source = self._tushare
    return source.fetch_stock_adj(code, start_date, end_date, adj)
# ============================================================
# 统一复权入口(简化版,直接调用 fetch
# ============================================================
def fetch_with_adj(
    self,
    code: str,
    start_date: str,
    end_date: str,
    adj: str = 'raw',
    retry: int = 3
) -> Optional[pd.DataFrame]:
    """
    Unified adjusted-price entry point.

    Kept as an alias of fetch(): adj and retry are forwarded as-is, so
    the routing logic lives in one place only.

    Args:
        code: symbol code.
        start_date: start date, 'YYYY-MM-DD'.
        end_date: end date, 'YYYY-MM-DD'.
        adj: adjustment mode; default 'raw'.
        retry: number of attempts.

    Returns:
        Whatever fetch() returns; the original notes say the columns vary
        slightly with asset type and adj.

    Examples:
        # A-share stock, backward-adjusted
        df = fetcher.fetch_with_adj("000001.SZ", "2020-01-01", "2024-12-31", adj='hfq')
        # US stock, forward-adjusted
        df = fetcher.fetch_with_adj("AAPL", "2020-01-01", "2024-12-31", adj='qfq')
    """
    return self.fetch(code, start_date, end_date, adj=adj, retry=retry)