Files
etf/datasource/universal_fetcher.py
aszerW 416f708d53 feat(datasource): 实现加密货币数据获取功能
- 新增 ccxt_source.py: CCXT + OKX 加密货币数据源
- 新增 socks2http.py: SOCKS5 转 HTTP 代理转换器
- 修改 universal_fetcher.py: 添加 _fetch_crypto 方法,支持 timeframe 参数
- 修改 flask_server.py: API 支持 timeframe 参数,加密货币不缓存

支持的 timeframe: 1d, 1h, 4h, 15m, 1m
测试验证: BTC 数据获取成功
2026-05-13 23:30:32 +08:00

417 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
统一数据获取器
分层架构:对外统一接口,对内按资产类型独立实现
支持A股指数/ETF、港股指数、美股指数、期货、加密货币
用法:
from datasource import UniversalDataFetcher
fetcher = UniversalDataFetcher()
# 单标的获取(自动识别类型)
df = fetcher.fetch("000300.SH", "2024-01-01", "2024-12-31")
# ETF获取含净值
price_df, nav_df = fetcher.fetch_etf_with_nav("513100.SH", "2024-01-01", "2024-12-31")
# 批量获取
results = fetcher.fetch_batch(["000300.SH", "NDX", "N225"], "2024-01-01", "2024-12-31")
"""
import os
import time
from typing import Optional, Dict, List, Tuple
from datetime import datetime
import pandas as pd
from .tushare_source import TushareSource
from .yfinance_source import YFinanceSource
from .ssh_tunnel import SSHTunnelManager
from .asset_type_detector import AssetTypeDetector, AssetType
from .ccxt_source import CCXTSource, get_crypto_source
class UniversalDataFetcher:
"""
统一数据获取器
分层架构:
- 对外:统一 fetch() 接口,自动路由
- 对内:各资产类型独立方法,职责单一
"""
def __init__(
self,
ssh_config: Optional[Dict] = None,
use_cache: bool = True,
cache_dir: str = "data/etf_cache/daily"
):
"""
初始化
Args:
ssh_config: SSH隧道配置用于港美股
use_cache: 是否使用本地缓存
cache_dir: 缓存目录
"""
self.ssh_config = ssh_config or {}
self.use_cache = use_cache
self.cache_dir = cache_dir
# 数据源实例
self._tushare = TushareSource()
self._yfinance = YFinanceSource()
# SSH隧道延迟初始化
self._tunnel: Optional[SSHTunnelManager] = None
self._tunnel_started = False
# 加密货币数据源(延迟初始化)
self._crypto: Optional[CCXTSource] = None
def __enter__(self):
"""上下文管理器入口"""
self._start_tunnel()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""上下文管理器退出"""
self._stop_tunnel()
# ============================================================
# SSH隧道管理
# ============================================================
def _start_tunnel(self) -> bool:
"""启动SSH隧道"""
if self._tunnel_started:
return True
if self.ssh_config.get('enabled'):
self._tunnel = SSHTunnelManager(self.ssh_config)
if self._tunnel.start():
self._tunnel_started = True
return True
return False
return True
def _stop_tunnel(self):
"""停止SSH隧道"""
if self._tunnel:
self._tunnel.stop()
self._tunnel = None
self._tunnel_started = False
# ============================================================
# 统一入口(自动路由)
# ============================================================
def fetch(
self,
code: str,
start_date: str,
end_date: str,
retry: int = 3,
timeframe: str = '1d'
) -> Optional[pd.DataFrame]:
"""
统一数据获取入口
自动识别资产类型并路由到对应方法
Args:
code: 标的代码
start_date: 开始日期 'YYYY-MM-DD'
end_date: 结束日期 'YYYY-MM-DD'
retry: 重试次数
timeframe: K线周期仅加密货币需要默认1d
Returns:
DataFrame with columns: date, open, high, low, close, volume
"""
asset_type = AssetTypeDetector.detect(code)
for attempt in range(retry):
try:
# 路由到具体方法
if asset_type == AssetType.CHINA_INDEX:
return self._fetch_china_index(code, start_date, end_date)
elif asset_type == AssetType.CHINA_ETF:
return self._fetch_china_etf(code, start_date, end_date)
elif asset_type == AssetType.US_INDEX:
return self._fetch_us_index(code, start_date, end_date)
elif asset_type == AssetType.US_STOCK:
return self._fetch_us_stock(code, start_date, end_date)
elif asset_type == AssetType.HK_INDEX:
return self._fetch_hk_index(code, start_date, end_date)
elif asset_type == AssetType.HK_STOCK:
return self._fetch_hk_stock(code, start_date, end_date)
elif asset_type == AssetType.FUTURES:
return self._fetch_futures(code, start_date, end_date)
elif asset_type == AssetType.CRYPTO:
return self._fetch_crypto(code, start_date, end_date, timeframe)
else:
print(f"⚠️ 未知资产类型: {code} -> {asset_type}")
return None
except Exception as e:
if attempt < retry - 1:
time.sleep(2)
else:
print(f"✗ 获取 {code} 失败 (尝试 {attempt+1}/{retry}): {e}")
return None
return None
# ============================================================
# 分层实现:各资产类型独立方法
# ============================================================
def _fetch_china_index(
self,
code: str,
start_date: str,
end_date: str
) -> Optional[pd.DataFrame]:
"""
获取A股指数
特点Tushare API无需SSH隧道
"""
return self._tushare.fetch_index(code, start_date, end_date)
def _fetch_china_etf(
self,
code: str,
start_date: str,
end_date: str
) -> Optional[pd.DataFrame]:
"""
获取A股ETF价格
特点Tushare fund_daily接口
"""
return self._tushare.fetch_etf(code, start_date, end_date)
def fetch_etf_with_nav(
self,
code: str,
start_date: str,
end_date: str
) -> Tuple[Optional[pd.DataFrame], Optional[pd.DataFrame], Optional[pd.Series]]:
"""
获取ETF价格 + 净值 + 溢价率序列
计算每一天的溢价率,用于分析溢价率走势
Args:
code: ETF代码
start_date: 开始日期
end_date: 结束日期
Returns:
(price_df, nav_df, premium_series)
- price_df: ETF价格数据 (OHLCV)
- nav_df: ETF净值数据
- premium_series: 溢价率序列 (每天计算)
"""
price_df = self._tushare.fetch_etf(code, start_date, end_date)
nav_df = self._tushare.fetch_etf_nav(code, start_date, end_date)
# 计算历史溢价率序列
premium_series = None
if price_df is not None and nav_df is not None and len(nav_df) > 0:
premium_series = self._calculate_premium_series(price_df, nav_df)
return price_df, nav_df, premium_series
def _calculate_premium_series(
self,
price_df: pd.DataFrame,
nav_df: pd.DataFrame
) -> Optional[pd.Series]:
"""
计算历史溢价率序列
溢价率 = (ETF收盘价 - ETF净值) / ETF净值
注意净值数据通常T+1公布需要处理日期对齐问题
Args:
price_df: ETF价格数据索引为日期
nav_df: ETF净值数据索引为日期
Returns:
溢价率Series索引为日期值为溢价率
"""
# 对齐日期净值用ffill填充因为T+1公布
# 价格日期可能比净值日期多一天
aligned_nav = nav_df['nav'].reindex(price_df.index, method='ffill')
# 计算溢价率
close_prices = price_df['close']
premium = (close_prices - aligned_nav) / aligned_nav
# 过滤掉无效值(净值缺失的日期)
premium = premium.dropna()
return premium
def _fetch_us_index(
self,
code: str,
start_date: str,
end_date: str
) -> Optional[pd.DataFrame]:
"""
获取美股指数
特点YFinance需要SSH隧道指数代码转换
"""
self._start_tunnel()
return self._yfinance.fetch(code, start_date, end_date)
def _fetch_us_stock(
self,
code: str,
start_date: str,
end_date: str
) -> Optional[pd.DataFrame]:
"""
获取美股股票
特点YFinance需要SSH隧道返回价格+股票信息
"""
self._start_tunnel()
return self._yfinance.fetch(code, start_date, end_date)
def _fetch_hk_index(
self,
code: str,
start_date: str,
end_date: str
) -> Optional[pd.DataFrame]:
"""
获取港股指数
特点YFinance需要SSH隧道
"""
self._start_tunnel()
return self._yfinance.fetch(code, start_date, end_date)
def _fetch_hk_stock(
self,
code: str,
start_date: str,
end_date: str
) -> Optional[pd.DataFrame]:
"""
获取港股股票
特点YFinance需要SSH隧道返回价格+股票信息
"""
self._start_tunnel()
return self._yfinance.fetch(code, start_date, end_date)
def _fetch_futures(
self,
code: str,
start_date: str,
end_date: str
) -> Optional[pd.DataFrame]:
"""
获取期货数据
特点:
- 中国期货(.SHF/.DCE/.CZC): Tushare
- NYMEX(.NYM): YFinance
"""
if code.endswith('.NYM'):
# NYMEX期货走YFinance
self._start_tunnel()
return self._yfinance.fetch(code, start_date, end_date)
else:
# 中国期货走Tushare
return self._tushare.fetch_futures(code, start_date, end_date)
def _fetch_crypto(
self,
code: str,
start_date: str,
end_date: str,
timeframe: str = '1d'
) -> Optional[pd.DataFrame]:
"""
获取加密货币
特点:
- 使用 CCXT + OKX
- 需要通过 socks2http 将 SOCKS5 转 HTTP 代理
- 必须指定 timeframe
- 不缓存(每次实时下载)
Args:
code: 加密货币代码BTC, ETH
start_date: 开始日期
end_date: 结束日期
timeframe: K线周期1d, 1h, 4h, 15m, 1m
"""
# 延迟初始化加密货币数据源
if self._crypto is None:
# 使用 SSH 隧道的 SOCKS5 端口
socks_port = self.ssh_config.get('local_port', 1080)
self._crypto = get_crypto_source(socks_port=socks_port)
return self._crypto.fetch(code, start_date, end_date, timeframe)
# ============================================================
# 批量获取
# ============================================================
def fetch_batch(
self,
codes: List[str],
start_date: str,
end_date: str,
retry: int = 3
) -> Dict[str, Optional[pd.DataFrame]]:
"""
批量获取多只标的数据
Args:
codes: 代码列表
start_date: 开始日期
end_date: 结束日期
Returns:
{code: DataFrame}
"""
results = {}
# 按资产类型分组
grouped = AssetTypeDetector.group_by_type(codes)
print(f"开始获取 {len(codes)} 只标的...")
for asset_type, code_list in grouped.items():
print(f" {asset_type.value}: {len(code_list)}")
# 启动隧道(港美股需要)
self._start_tunnel()
for code in codes:
results[code] = self.fetch(code, start_date, end_date, retry)
return results
# ============================================================
# 辅助方法
# ============================================================
def get_asset_type(self, code: str) -> AssetType:
"""获取资产类型"""
return AssetTypeDetector.detect(code)
def is_supported(self, code: str) -> bool:
"""判断是否支持该代码"""
return AssetTypeDetector.detect(code) != AssetType.UNKNOWN