Files
etf/datasource/universal_fetcher.py
aszerW 798a316ad5 feat: ETF复权功能扩展至支持前复权qfq
核心变更:
- TushareSource: _fetch_etf_adj() 支持 qfq 和 hfq 双模式
  * 后复权(hfq): close × adj_factor
  * 前复权(qfq): close × adj_factor / latest_factor
- UniversalDataFetcher: VALID_ADJ_BY_TYPE 更新
  * CHINA_ETF: ['raw', 'hfq'] → ['raw', 'qfq', 'hfq']

复权公式验证:
- 纳指ETF(513100.SH): HFQ / QFQ = latest_factor (5.0020) 
- 5/5 个交易日全部通过验证

技术实现:
- fetch_etf_adj(): 公共接口支持 adj='qfq' 或 'hfq'
- _fetch_etf_adj(): 内部实现根据 adj 参数分支计算
- 前复权使用全量最新复权因子确保准确性
2026-05-25 00:15:59 +08:00

698 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
统一数据获取器
分层架构:对外统一接口,对内按资产类型独立实现
支持A股指数/ETF、港股指数、美股指数、期货、加密货币
用法:
from datasource import UniversalDataFetcher
fetcher = UniversalDataFetcher()
# 单标的获取(自动识别类型)
df = fetcher.fetch("000300.SH", "2024-01-01", "2024-12-31")
# ETF获取含净值从 df.attrs 提取)
df = fetcher.fetch_etf("513100.SH", "2024-01-01", "2024-12-31")
nav_df = df.attrs.get('nav')
premium_series = df.attrs.get('premium')
# 批量获取
results = fetcher.fetch_batch(["000300.SH", "NDX", "N225"], "2024-01-01", "2024-12-31")
"""
import os
import time
from typing import Optional, Dict, List, Tuple, Union
from datetime import datetime
import pandas as pd
from .tushare_source import TushareSource
from .yfinance_source import YFinanceSource
from .ssh_tunnel import SSHTunnelManager
from .asset_type_detector import AssetTypeDetector, AssetType
from .ccxt_source import CCXTSource, get_crypto_source
try:
import pandas_market_calendars as mcal
HAS_PANDAS_MARKET_CALENDARS = True
except ImportError:
HAS_PANDAS_MARKET_CALENDARS = False
class UniversalDataFetcher:
    """
    Unified data fetcher.

    Layered design:
    - Outward: a single fetch() entry point that routes automatically.
    - Inward: an independent method per asset type, each with one responsibility.
    """
@staticmethod
def get_ssh_config_from_env() -> Optional[Dict]:
    """
    Build the SSH tunnel configuration from environment variables.

    The tunnel is considered enabled only when ``SSH_ENABLED`` equals
    ``true`` (case-insensitive); an unset variable or any other value
    disables it.

    Returns:
        ``None`` when disabled, otherwise a dict with the keys ``enabled``,
        ``host``, ``port``, ``username``, ``key_path`` and ``local_port``.
        Unset variables fall back to ``''``, ``22``, ``''``, ``'hk_ecs.pem'``
        and ``1080`` respectively.

    Raises:
        ValueError: if ``SSH_PORT`` or ``SSH_LOCAL_PORT`` is not an integer.
    """
    env = os.environ
    if env.get('SSH_ENABLED', 'false').lower() != 'true':
        return None

    # Ports arrive as strings and are converted here so callers get ints.
    ssh_port = int(env.get('SSH_PORT', '22'))
    socks_port = int(env.get('SSH_LOCAL_PORT', '1080'))
    return {
        "enabled": True,
        "host": env.get('SSH_HOST', ''),
        "port": ssh_port,
        "username": env.get('SSH_USERNAME', ''),
        "key_path": env.get('SSH_KEY_PATH', 'hk_ecs.pem'),
        "local_port": socks_port,
    }
@classmethod
def from_env(cls, **kwargs) -> 'UniversalDataFetcher':
    """
    Alternate constructor that takes the SSH settings from the environment.

    Args:
        **kwargs: forwarded unchanged to the constructor (for example
            ``use_cache`` or ``cache_dir``). ``ssh_config`` must not be
            passed here because it is supplied by this method.

    Returns:
        A new instance configured with ``cls.get_ssh_config_from_env()``.
    """
    return cls(ssh_config=cls.get_ssh_config_from_env(), **kwargs)
def __init__(
    self,
    ssh_config: Optional[Dict] = None,
    use_cache: bool = True,
    cache_dir: str = "data/etf_cache/daily"
):
    """
    Set up the data sources and the lazily created helpers.

    Args:
        ssh_config: SSH tunnel settings; ``None`` (or any falsy value) is
            stored as an empty dict so later ``.get()`` calls are safe.
        use_cache: stored on the instance as ``use_cache``.
        cache_dir: stored on the instance as ``cache_dir``.
    """
    # Plain settings supplied by the caller.
    self.ssh_config = ssh_config if ssh_config else {}
    self.use_cache, self.cache_dir = use_cache, cache_dir

    # Data source instances, created eagerly.
    self._tushare = TushareSource()
    self._yfinance = YFinanceSource()

    # The SSH tunnel is created on demand by _start_tunnel().
    self._tunnel_started = False
    self._tunnel: Optional[SSHTunnelManager] = None

    # The crypto source is created on demand by _fetch_crypto().
    self._crypto: Optional[CCXTSource] = None
def __enter__(self):
    """Enter the context: try to start the SSH tunnel and return this fetcher."""
    # The boolean result of _start_tunnel() is not inspected here.
    self._start_tunnel()
    return self
def __exit__(self, exc_type, exc_val, exc_tb):
    """Leave the context: stop the SSH tunnel; exceptions are not suppressed."""
    # Returning None (implicitly) lets any in-flight exception propagate.
    self._stop_tunnel()
# ============================================================
# SSH隧道管理
# ============================================================
def _start_tunnel(self) -> bool:
    """
    Start the SSH tunnel if it is configured and not already running.

    Returns:
        True when the tunnel is already started, when tunnelling is not
        enabled in ``ssh_config``, or when the tunnel starts successfully;
        False when the tunnel manager reports a failed start.
    """
    if self._tunnel_started:
        return True

    if not self.ssh_config.get('enabled'):
        # Tunnelling is switched off, so there is nothing to start.
        return True

    self._tunnel = SSHTunnelManager(self.ssh_config)
    started = self._tunnel.start()
    if started:
        self._tunnel_started = True
        return True
    return False
def _stop_tunnel(self):
    """Stop the SSH tunnel, if one exists, and reset the tunnel state."""
    tunnel = self._tunnel
    if not tunnel:
        # No tunnel object: state is left exactly as it is.
        return
    tunnel.stop()
    self._tunnel = None
    self._tunnel_started = False
def get_ssh_status(self) -> Dict:
    """
    Describe the current SSH tunnel configuration and state.

    Returns:
        A dict with the keys ``status`` ("enabled"/"disabled"), ``host``,
        ``port``, ``tunnel_started``, ``required_types`` (the ``value`` of
        each entry in ``SSH_REQUIRED_TYPES``) and ``description``.
        When the tunnel is disabled, ``host`` is ``''`` and ``port`` is None.
    """
    config = self.ssh_config
    enabled = config.get('enabled', False)
    if enabled:
        status, host, port = "enabled", config.get('host', ''), config.get('port', 22)
    else:
        status, host, port = "disabled", '', None

    required = [asset_type.value for asset_type in self.SSH_REQUIRED_TYPES]
    return {
        "status": status,
        "host": host,
        "port": port,
        "tunnel_started": self._tunnel_started,
        "required_types": required,
        "description": "港美股/加密货币数据获取需要 SSH 隧道",
    }
# ============================================================
# 统一入口(自动路由)
# ============================================================
# adj values accepted for each asset type; fetch() rejects any other value.
VALID_ADJ_BY_TYPE = {
    AssetType.CHINA_INDEX: ['raw'],  # indices have no price adjustment
    AssetType.CHINA_ETF: ['raw', 'qfq', 'hfq'],  # ETFs support forward/backward adjustment
    AssetType.CHINA_STOCK: ['raw', 'qfq', 'hfq'],
    AssetType.US_INDEX: ['raw'],  # indices have no price adjustment
    AssetType.US_STOCK: ['raw', 'qfq', 'hfq'],
    AssetType.HK_INDEX: ['raw'],  # indices have no price adjustment
    AssetType.HK_STOCK: ['raw', 'qfq', 'hfq'],
    AssetType.FUTURES: ['raw'],  # futures have no price adjustment
    AssetType.CRYPTO: ['raw'],  # cryptocurrencies have no price adjustment
}
# Asset types for which fetch() starts the SSH tunnel first (HK/US markets and crypto).
SSH_REQUIRED_TYPES = {
    AssetType.US_INDEX,  # US indices
    AssetType.US_STOCK,  # US stocks
    AssetType.HK_INDEX,  # Hong Kong indices
    AssetType.HK_STOCK,  # Hong Kong stocks
    AssetType.CRYPTO,  # cryptocurrencies
}
def fetch(
    self,
    code: str,
    start_date: str,
    end_date: str,
    adj: str = 'raw',
    retry: int = 3,
    timeframe: str = '1d'
) -> Optional[pd.DataFrame]:
    """
    Unified entry point: detect the asset type of ``code`` and route the request.

    Args:
        code: symbol to fetch.
        start_date: start date, 'YYYY-MM-DD'.
        end_date: end date, 'YYYY-MM-DD'.
        adj: price adjustment, 'raw' (unadjusted), 'qfq' (forward-adjusted)
            or 'hfq' (backward-adjusted). Must also be allowed for the
            detected asset type by ``VALID_ADJ_BY_TYPE``.
        retry: number of attempts. A failed attempt is followed by a
            2-second sleep, except the last one.
        timeframe: candle interval; only forwarded on the crypto path.

    Returns:
        Whatever the selected data source returns, or None when the asset
        type is not routable, when every attempt raised, or when ``retry``
        is not positive. The original documentation describes the frame as
        having columns date, open, high, low, close, volume (with extra
        adj_factor / close_hfq columns for A-share ETFs under adj='hfq');
        that is not verifiable from this method.

    Raises:
        ValueError: if ``adj`` is not one of the three known values or is
            not supported for the detected asset type. Validation happens
            before the retry loop, so these errors are never swallowed.

    Examples:
        # Unadjusted prices
        df = fetcher.fetch("000300.SH", "2020-01-01", "2024-12-31")
        # A-share stock, backward-adjusted
        df = fetcher.fetch("000001.SZ", "2020-01-01", "2024-12-31", adj='hfq')
        # US stock, forward-adjusted
        df = fetcher.fetch("AAPL", "2020-01-01", "2024-12-31", adj='qfq')
    """
    # Global adj validation.
    if adj not in ('raw', 'qfq', 'hfq'):
        raise ValueError(f"adj 参数必须是 'raw', 'qfq''hfq',当前: {adj}")

    asset_type = AssetTypeDetector.detect(code)

    # Per-asset-type adj validation; unlisted types only accept 'raw'.
    allowed = self.VALID_ADJ_BY_TYPE.get(asset_type, ['raw'])
    if adj not in allowed:
        raise ValueError(
            f"adj='{adj}' 不适用于 {asset_type.value},支持的类型: {allowed}"
        )

    # Start the tunnel once up front so the routed calls need not do it again.
    if asset_type in self.SSH_REQUIRED_TYPES:
        self._start_tunnel()

    via_tushare = (
        AssetType.CHINA_INDEX,
        AssetType.CHINA_ETF,
        AssetType.CHINA_STOCK,
    )
    via_yfinance = (
        AssetType.US_INDEX,
        AssetType.US_STOCK,
        AssetType.HK_INDEX,
        AssetType.HK_STOCK,
    )

    for attempt in range(retry):
        try:
            if asset_type in via_tushare:
                return self._tushare.fetch(code, start_date, end_date, adj)
            if asset_type in via_yfinance:
                return self._yfinance.fetch(code, start_date, end_date, adj)
            if asset_type == AssetType.FUTURES:
                return self._fetch_futures(code, start_date, end_date, adj)
            if asset_type == AssetType.CRYPTO:
                return self._fetch_crypto(code, start_date, end_date, adj, timeframe)
            print(f"⚠️ 未知资产类型: {code} -> {asset_type}")
            return None
        except Exception as exc:
            if attempt >= retry - 1:
                # Last attempt failed: report and give up.
                print(f"✗ 获取 {code} adj={adj} 失败 (尝试 {attempt+1}/{retry}): {exc}")
                return None
            time.sleep(2)
    return None
# ============================================================
# 分层实现:各资产类型独立方法
# ============================================================
def _fetch_china_index(
    self,
    code: str,
    start_date: str,
    end_date: str
) -> Optional[pd.DataFrame]:
    """
    Fetch an A-share index through the Tushare source.

    Delegates to ``self._tushare.fetch_index``; this path does not start
    the SSH tunnel.
    """
    source = self._tushare
    return source.fetch_index(code, start_date, end_date)
def _fetch_china_etf(
    self,
    code: str,
    start_date: str,
    end_date: str
) -> Optional[pd.DataFrame]:
    """
    Fetch A-share ETF prices through the Tushare source.

    Delegates to ``self._tushare.fetch_etf`` without an ``adj`` argument,
    so the source's own default applies. The original note says this is
    backed by Tushare's fund_daily endpoint (not visible here).
    """
    source = self._tushare
    return source.fetch_etf(code, start_date, end_date)
def fetch_etf_with_nav(
    self,
    code: str,
    start_date: str,
    end_date: str
) -> Tuple[Optional[pd.DataFrame], Optional[pd.DataFrame], Optional[pd.Series]]:
    """
    DEPRECATED: return ETF prices, NAV and the premium series as a tuple.

    Emits a DeprecationWarning on every call. The warning recommends
    ``fetch_etf(code, start, end, adj='raw')`` and reading ``df.attrs['nav']``
    / ``df.attrs['premium']`` directly.
    NOTE(review): no ``fetch_etf`` method of this class is visible in this
    part of the file; confirm the recommended replacement exists.

    Args:
        code: ETF code.
        start_date: start date.
        end_date: end date.

    Returns:
        ``(price_df, nav_df, premium_series)`` where ``price_df`` is the
        unadjusted frame from the Tushare source and the other two come
        from ``price_df.attrs`` ('nav' and 'premium', None when absent).
        ``(None, None, None)`` when the source returns None.
    """
    import warnings

    message = (
        "fetch_etf_with_nav() is deprecated. "
        "Use fetch_etf(code, start, end, adj='raw') and access df.attrs['nav'] and df.attrs['premium'] instead."
    )
    # stacklevel=2 attributes the warning to the caller of this method.
    warnings.warn(message, DeprecationWarning, stacklevel=2)

    price_df = self._tushare.fetch_etf(code, start_date, end_date, adj='raw')
    if price_df is None:
        return None, None, None

    # NAV and premium travel as metadata on the price frame.
    metadata = price_df.attrs
    return price_df, metadata.get('nav'), metadata.get('premium')
# ============================================================
# 内部方法:特殊资产类型(保留)
# ============================================================
def _fetch_futures(
    self,
    code: str,
    start_date: str,
    end_date: str,
    adj: str = 'raw'
) -> Optional[pd.DataFrame]:
    """
    Fetch futures data.

    Routing:
    - codes ending in ``.NYM`` (NYMEX) go through YFinance, after trying
      to start the SSH tunnel;
    - every other code goes through ``self._tushare.fetch_futures``
      (the original note lists the .SHF/.DCE/.CZC suffixes).

    Raises:
        ValueError: if ``adj`` is anything other than 'raw'; futures have
            no price adjustment.
    """
    if adj != 'raw':
        raise ValueError(f"期货不支持复权adj='{adj}' 仅适用于股票/ETF")

    if not code.endswith('.NYM'):
        # Domestic (China) futures.
        return self._tushare.fetch_futures(code, start_date, end_date)

    # NYMEX futures.
    self._start_tunnel()
    return self._yfinance.fetch(code, start_date, end_date, adj='raw')
def _fetch_crypto(
    self,
    code: str,
    start_date: str,
    end_date: str,
    adj: str = 'raw',
    timeframe: str = '1d'
) -> Optional[pd.DataFrame]:
    """
    Fetch cryptocurrency candles through the crypto source.

    The source is created on first use via ``get_crypto_source`` with the
    SSH tunnel's local port (``ssh_config['local_port']``, default 1080)
    and then reused. Notes carried over from the original documentation,
    not verifiable from this method: the source uses CCXT + OKX, needs a
    SOCKS5-to-HTTP proxy (socks2http), and results are not cached.

    Args:
        code: crypto symbol, e.g. BTC, ETH.
        start_date: start date.
        end_date: end date.
        adj: forwarded as-is; this method does not validate it (fetch()
            only allows 'raw' for crypto).
        timeframe: candle interval, e.g. 1d, 1h, 4h, 15m, 1m.
    """
    crypto = self._crypto
    if crypto is None:
        # Lazy initialisation, pointing the source at the tunnel's SOCKS5 port.
        crypto = get_crypto_source(socks_port=self.ssh_config.get('local_port', 1080))
        self._crypto = crypto
    return crypto.fetch(code, start_date, end_date, adj, timeframe)
# ============================================================
# 批量获取
# ============================================================
def fetch_batch(
    self,
    codes: List[str],
    start_date: str,
    end_date: str,
    retry: int = 3
) -> Dict[str, Optional[pd.DataFrame]]:
    """
    Fetch several symbols one after another with unadjusted prices.

    Args:
        codes: symbols to fetch.
        start_date: start date.
        end_date: end date.
        retry: number of attempts per symbol, forwarded to fetch().

    Returns:
        ``{code: DataFrame-or-None}`` with one entry per distinct code.
    """
    results = {}

    # Grouping is used only for the progress summary printed below.
    grouped = AssetTypeDetector.group_by_type(codes)
    print(f"开始获取 {len(codes)} 只标的...")
    for asset_type, code_list in grouped.items():
        print(f" {asset_type.value}: {len(code_list)}")

    # fetch() starts the SSH tunnel itself where required.
    for code in codes:
        # retry must be passed by keyword: the fourth positional parameter
        # of fetch() is adj, so a positional retry would be read as adj=3
        # and rejected with ValueError.
        results[code] = self.fetch(code, start_date, end_date, retry=retry)
    return results
# ============================================================
# 辅助方法
# ============================================================
def get_asset_type(self, code: str) -> AssetType:
    """Return the asset type that AssetTypeDetector assigns to ``code``."""
    detected = AssetTypeDetector.detect(code)
    return detected
def is_supported(self, code: str) -> bool:
    """Return True unless ``code`` is detected as ``AssetType.UNKNOWN``."""
    detected = AssetTypeDetector.detect(code)
    return detected != AssetType.UNKNOWN
# ============================================================
# 扩展层:资产类型特有方法(复权/净值/溢价率)
# ============================================================
def fetch_etf_adj(
    self,
    code: str,
    start_date: str,
    end_date: str
) -> Optional[pd.DataFrame]:
    """
    Fetch backward-adjusted (hfq) A-share ETF prices (legacy interface).

    Delegates to ``self._tushare.fetch_etf(..., adj='hfq')``.

    Args:
        code: ETF code, e.g. '159915.SZ', '513100.SH'.
        start_date: start date, 'YYYY-MM-DD'.
        end_date: end date, 'YYYY-MM-DD'.

    Returns:
        The source's result unchanged. Per the original documentation
        (not verifiable here) the frame has columns date, open, close,
        adj_factor, close_hfq, with ``attrs['nav']`` holding the NAV frame
        and ``attrs['premium']`` the premium series computed from raw prices.
    """
    source = self._tushare
    return source.fetch_etf(code, start_date, end_date, adj='hfq')
def fetch_us_adj(
    self,
    code: str,
    start_date: str,
    end_date: str,
    adj: str = 'qfq'
) -> Optional[pd.DataFrame]:
    """
    Fetch adjusted prices for a US symbol.

    A thin wrapper over fetch() whose only difference is the default
    ``adj='qfq'``. It does not itself check that ``code`` is a US symbol;
    routing, adj validation and SSH tunnel start-up are all done by fetch().

    Args:
        code: US symbol, e.g. 'AAPL', 'TSLA', 'QQQ'.
        start_date: start date, 'YYYY-MM-DD'.
        end_date: end date, 'YYYY-MM-DD'.
        adj: 'qfq' (forward-adjusted) or 'hfq' (backward-adjusted).

    Returns:
        The result of fetch(); per the original documentation, adjusted
        date/open/high/low/close/volume columns.

    Example:
        df = fetcher.fetch_us_adj("AAPL", "2020-01-01", "2024-12-31", adj='qfq')
    """
    return self.fetch(code, start_date, end_date, adj=adj)
def fetch_hk_adj(
    self,
    code: str,
    start_date: str,
    end_date: str,
    adj: str = 'qfq'
) -> Optional[pd.DataFrame]:
    """
    Fetch adjusted prices for a Hong Kong stock.

    A thin wrapper over fetch() whose only difference is the default
    ``adj='qfq'``; routing, adj validation and SSH tunnel start-up are
    all done by fetch().

    Args:
        code: Hong Kong symbol, e.g. '00700.HK', '00941.HK'.
        start_date: start date, 'YYYY-MM-DD'.
        end_date: end date, 'YYYY-MM-DD'.
        adj: 'qfq' (forward-adjusted) or 'hfq' (backward-adjusted).

    Returns:
        The result of fetch(); per the original documentation, adjusted
        date/open/high/low/close/volume columns.
    """
    return self.fetch(code, start_date, end_date, adj=adj)
def fetch_stock_adj(
    self,
    code: str,
    start_date: str,
    end_date: str,
    adj: str = 'hfq'
) -> Optional[pd.DataFrame]:
    """
    Fetch adjusted prices for an A-share stock.

    Delegates to ``self._tushare.fetch_stock_adj``; the original note says
    this is backed by Tushare's pro_bar endpoint (not visible here).

    Args:
        code: A-share stock code, e.g. '000001.SZ', '600000.SH'.
        start_date: start date, 'YYYY-MM-DD'.
        end_date: end date, 'YYYY-MM-DD'.
        adj: 'qfq' (forward-adjusted) or 'hfq' (backward-adjusted).

    Returns:
        The source's result unchanged; per the original documentation,
        columns date, open, high, low, close, volume, adj_factor.
    """
    source = self._tushare
    return source.fetch_stock_adj(code, start_date, end_date, adj)
# ============================================================
# 交易日历获取
# ============================================================
def get_trading_calendar(
    self,
    market: str,
    start_date: str,
    end_date: str
) -> pd.DatetimeIndex:
    """
    Return the trading days of a market between two dates.

    Args:
        market: market name, matched case-insensitively:
            - 'A' / 'china': A-shares, using the SSE calendar;
            - 'US' / 'usa' / 'america': US market, using the NYSE calendar;
            - 'HK' / 'hongkong': Hong Kong, using the HKEX calendar.
        start_date: start date, 'YYYY-MM-DD'.
        end_date: end date, 'YYYY-MM-DD'.

    Returns:
        DatetimeIndex built from the index of the calendar's schedule.

    Raises:
        ImportError: if pandas_market_calendars is not installed (checked
            before the market name is validated).
        ValueError: if ``market`` is not one of the names above.

    Examples:
        cal = fetcher.get_trading_calendar('A', '2024-01-01', '2024-12-31')
        cal = fetcher.get_trading_calendar('US', '2024-01-01', '2024-12-31')
        cal = fetcher.get_trading_calendar('HK', '2024-01-01', '2024-12-31')
    """
    if not HAS_PANDAS_MARKET_CALENDARS:
        raise ImportError(
            "需要安装 pandas_market_calendars: pip install pandas_market_calendars"
        )

    # Lower-cased market alias -> pandas_market_calendars calendar name.
    exchange_by_alias = {
        'a': 'SSE',
        'china': 'SSE',
        'us': 'NYSE',
        'usa': 'NYSE',
        'america': 'NYSE',
        'hk': 'HKEX',
        'hongkong': 'HKEX',
    }
    exchange = exchange_by_alias.get(market.lower())
    if exchange is None:
        raise ValueError(f"不支持的市场: {market},支持: A/US/HK")

    calendar = mcal.get_calendar(exchange)
    schedule = calendar.schedule(start_date=start_date, end_date=end_date)
    return pd.DatetimeIndex(schedule.index)
def get_calendar_info(self) -> Dict:
    """
    Describe which markets the trading-calendar lookup supports.

    Returns:
        A dict with ``supported_markets`` (entries for "A", "US" and "HK",
        each with name, method, exchanges and default_exchange),
        ``pandas_market_calendars_installed`` (whether the optional import
        succeeded) and ``tushare_available`` (always False).
    """
    provider = "pandas_market_calendars"

    markets = {}
    markets["A"] = {
        "name": "A股",
        "method": provider,
        "exchanges": ["SSE"],
        "default_exchange": "SSE",
        "note": "上交所和深交所交易日历完全一致统一使用SSE",
    }
    markets["US"] = {
        "name": "美股",
        "method": provider,
        "exchanges": ["NYSE", "NASDAQ"],
        "default_exchange": "NYSE",
    }
    markets["HK"] = {
        "name": "港股",
        "method": provider,
        "exchanges": ["HKEX"],
        "default_exchange": "HKEX",
    }

    return {
        "supported_markets": markets,
        "pandas_market_calendars_installed": HAS_PANDAS_MARKET_CALENDARS,
        "tushare_available": False,  # Tushare is no longer used for calendars
    }
# ============================================================
# 统一复权入口(简化版,直接调用 fetch
# ============================================================
def fetch_with_adj(
    self,
    code: str,
    start_date: str,
    end_date: str,
    adj: str = 'raw',
    retry: int = 3
) -> Optional[pd.DataFrame]:
    """
    Unified adjusted-price entry point (simplified).

    A pass-through to fetch() with the same ``adj`` and ``retry``; no
    routing logic is duplicated here.

    Args:
        code: symbol to fetch.
        start_date: start date, 'YYYY-MM-DD'.
        end_date: end date, 'YYYY-MM-DD'.
        adj: price adjustment, default 'raw'.
        retry: number of attempts.

    Returns:
        The result of fetch(); the original documentation notes the frame
        layout varies slightly with asset type and ``adj``.

    Examples:
        # A-share stock, backward-adjusted
        df = fetcher.fetch_with_adj("000001.SZ", "2020-01-01", "2024-12-31", adj='hfq')
        # US stock, forward-adjusted
        df = fetcher.fetch_with_adj("AAPL", "2020-01-01", "2024-12-31", adj='qfq')
    """
    return self.fetch(code, start_date, end_date, adj=adj, retry=retry)