Files
etf/datasource/ccxt_source.py
aszerW 3619e26bf1 refactor(datasource): 统一数据获取架构,使用 df.attrs 传递元数据
核心改进:
- CCXTSource 添加 df.attrs 支持(source, exchange, symbol, timeframe, adj)
- UniversalDataFetcher 简化透传方法,保留兼容接口
- fetch_etf_with_nav 标记为 deprecated,推荐使用 fetch_etf + df.attrs
- 所有数据源统一契约:返回 DataFrame + df.attrs

架构改进:
- 统一返回单 DataFrame,元数据通过 attrs 传递
- 消除多返回值接口(price_df, nav_df, premium_series)
- 文档注释更新,反映新接口用法
- 添加 DeprecationWarning 提示迁移路径
2026-05-23 23:40:18 +08:00

308 lines
9.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
CCXT 加密货币数据源
通过 CCXT 库从 OKX 获取加密货币 K 线数据
特点:
- 支持 BTC/ETH 等主流加密货币
- 需要通过 socks2http 将 SOCKS5 转 HTTP 代理
- 支持多种 timeframe(1d, 1h, 15m 等)
"""
import os
import time
import threading
from typing import Optional, Dict
from datetime import datetime, timedelta
import pandas as pd
# ccxt is an optional dependency. CCXT_AVAILABLE records whether it could be
# imported; CCXTSource methods check this flag before touching ccxt.
try:
    import ccxt
except ImportError:
    CCXT_AVAILABLE = False
    print("⚠️ ccxt 未安装,加密货币获取不可用")
else:
    CCXT_AVAILABLE = True
from .socks2http import Socks2Http
class CCXTSource:
    """
    Cryptocurrency OHLCV data source built on CCXT (OKX by default).

    Exchange traffic is routed through a local HTTP proxy (``Socks2Http``)
    that forwards to an SSH SOCKS5 proxy. Returned DataFrames are indexed by
    Asia/Shanghai timestamps, hold the columns open/high/low/close/volume,
    and carry provenance in ``df.attrs`` (source, exchange, symbol,
    timeframe, adj).
    """

    # Accepted timeframe aliases -> CCXT timeframe strings. Lookups are
    # case-insensitive; unknown values fall back to '1d'.
    TIMEFRAME_MAP = {
        '1d': '1d',
        '1h': '1h',
        '4h': '4h',
        '15m': '15m',
        '1m': '1m',
        'daily': '1d',
        'hourly': '1h',
    }

    # Default local HTTP proxy port.
    DEFAULT_HTTP_PORT = 8080

    # Timezone used to interpret naive date arguments and to label the index.
    _TZ = 'Asia/Shanghai'

    # Candles requested per fetch_ohlcv call. An exchange may return fewer
    # (its own per-request cap), which is why fetch() paginates.
    _PAGE_LIMIT = 1000

    # Proxy state. These class-level values are only defaults: start_proxy()
    # and stop_proxy() assign instance attributes that shadow them. The
    # annotation is a string so the name is not resolved at class creation.
    _proxy_server: Optional["Socks2Http"] = None
    _proxy_thread: Optional[threading.Thread] = None
    _proxy_started = False

    def __init__(
        self,
        socks_host: str = '127.0.0.1',
        socks_port: int = 1080,
        http_port: int = 8080,
        exchange: str = 'okx'
    ):
        """
        Store proxy and exchange settings; no network activity happens here.

        Args:
            socks_host: SOCKS5 proxy host.
            socks_port: SOCKS5 proxy port.
            http_port: local HTTP proxy port exposed by socks2http.
            exchange: CCXT exchange id (default 'okx').
        """
        self.socks_host = socks_host
        self.socks_port = socks_port
        self.http_port = http_port
        self.exchange_name = exchange
        # Created lazily by _ensure_exchange() on the first fetch.
        self._exchange = None

    def start_proxy(self) -> bool:
        """
        Start the socks2http proxy in a background daemon thread.

        Returns:
            True if the proxy was already started or starts now; False if
            ccxt is missing or constructing/starting the proxy raised.
        """
        if self._proxy_started:
            return True
        if not CCXT_AVAILABLE:
            print("⚠️ ccxt 未安装,无法启动加密货币数据源")
            return False
        try:
            self._proxy_server = Socks2Http(
                socks_host=self.socks_host,
                socks_port=self.socks_port,
                http_port=self.http_port
            )
            # Daemon thread: a proxy that is never stopped won't block exit.
            self._proxy_thread = threading.Thread(
                target=self._proxy_server.start,
                daemon=True
            )
            self._proxy_thread.start()
            # Fixed grace period for the server to come up; readiness is
            # not actually verified.
            time.sleep(1)
            self._proxy_started = True
            print(f"✓ socks2http 代理已启动: http://127.0.0.1:{self.http_port}")
            return True
        except Exception as e:
            print(f"✗ 启动 socks2http 代理失败: {e}")
            return False

    def stop_proxy(self):
        """
        Stop the socks2http proxy if this instance started one.

        The cached exchange client is discarded as well. It was configured to
        use this proxy, so the next fetch must rebuild it, which restarts the
        proxy via _create_exchange().
        """
        if self._proxy_server:
            self._proxy_server.stop()
            self._proxy_server = None
            self._proxy_thread = None
            self._proxy_started = False
            self._exchange = None
            print("socks2http 代理已停止")

    # String annotation: evaluating ``ccxt.Exchange`` at definition time
    # would raise NameError when ccxt is not installed.
    def _create_exchange(self) -> Optional["ccxt.Exchange"]:
        """
        Build a CCXT exchange client routed through the local HTTP proxy.

        Starts the proxy first if needed. Returns None when ccxt is missing
        or the proxy fails to start. An unknown ``exchange_name`` raises
        AttributeError from ``getattr(ccxt, ...)``.
        """
        if not CCXT_AVAILABLE:
            return None
        if not self._proxy_started and not self.start_proxy():
            return None
        proxy_url = f'http://127.0.0.1:{self.http_port}'
        config = {
            'enableRateLimit': True,
            # Both schemes go through the same plain-HTTP proxy endpoint.
            'proxies': {
                'http': proxy_url,
                'https': proxy_url,
            },
            'timeout': 30000,  # milliseconds
        }
        exchange_class = getattr(ccxt, self.exchange_name)
        return exchange_class(config)

    def _ensure_exchange(self) -> bool:
        """Create the exchange client on first use; return True if one exists."""
        if self._exchange is None:
            self._exchange = self._create_exchange()
        return self._exchange is not None

    @staticmethod
    def _symbol(code: str) -> str:
        """Map 'btc' -> 'BTC/USDT'; a code that already contains '/' is kept (uppercased)."""
        code = code.upper()
        return code if '/' in code else f"{code}/USDT"

    @classmethod
    def _timeframe(cls, timeframe: str) -> str:
        """Resolve a timeframe alias via TIMEFRAME_MAP, defaulting to '1d'."""
        return cls.TIMEFRAME_MAP.get(timeframe.lower(), '1d')

    @classmethod
    def _localize(cls, value) -> pd.Timestamp:
        """Parse ``value`` as a Timestamp in ``_TZ``: naive values are localized, aware ones converted."""
        ts = pd.Timestamp(value)
        if ts.tzinfo is None:
            return ts.tz_localize(cls._TZ)
        return ts.tz_convert(cls._TZ)

    @classmethod
    def _window(cls, start_date, end_date):
        """
        Return ``(start, end_exclusive)`` as tz-aware Timestamps in ``_TZ``.

        An end value at local midnight (e.g. a bare 'YYYY-MM-DD') is extended
        to cover that whole day. Without this, candles stamped later on the
        end date (intraday bars, or daily bars aligned to UTC midnight) would
        be dropped. Any other end value is treated as an inclusive instant.
        """
        start_dt = cls._localize(start_date)
        end_dt = cls._localize(end_date)
        if end_dt == end_dt.normalize():
            end_excl = end_dt + pd.Timedelta(days=1)
        else:
            end_excl = end_dt + pd.Timedelta(1, unit='ns')
        return start_dt, end_excl

    @staticmethod
    def _collect_ohlcv(fetch_page, since_ms: int, until_ms: int, limit: int) -> list:
        """
        Page through ``fetch_page(since, limit)`` until ``until_ms`` is passed.

        Each page's cursor is one millisecond after the newest timestamp
        already received. Paging stops on an empty page, once the cursor
        reaches ``until_ms``, or if a page makes no forward progress (guards
        against an endless loop). Rows may overlap or extend past ``until_ms``;
        the caller deduplicates and filters.
        """
        rows = []
        cursor = since_ms
        while cursor < until_ms:
            batch = fetch_page(cursor, limit)
            if not batch:
                break
            rows.extend(batch)
            next_cursor = max(int(row[0]) for row in batch) + 1
            if next_cursor <= cursor:
                break
            cursor = next_cursor
        return rows

    @classmethod
    def _to_frame(cls, ohlcv) -> pd.DataFrame:
        """Convert CCXT OHLCV rows into an OHLCV frame indexed by ``_TZ`` timestamps."""
        df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
        # Exchange timestamps are epoch milliseconds in UTC.
        df['date'] = pd.to_datetime(df['timestamp'], unit='ms', utc=True).dt.tz_convert(cls._TZ)
        # Pages can overlap at their edges; keep one row per candle, in time order.
        df = df.drop_duplicates(subset='timestamp', keep='last').sort_values('timestamp')
        # Full time precision is kept (no normalize()) so intraday bars stay distinct.
        return df.set_index('date')[['open', 'high', 'low', 'close', 'volume']]

    @staticmethod
    def _slice(df: pd.DataFrame, start_dt: pd.Timestamp, end_excl: pd.Timestamp) -> pd.DataFrame:
        """Keep rows with ``start_dt <= index < end_excl``."""
        return df[(df.index >= start_dt) & (df.index < end_excl)]

    def _tag(self, df: pd.DataFrame, symbol: str, tf: str) -> pd.DataFrame:
        """Attach provenance metadata to ``df.attrs`` (in place) and return ``df``."""
        df.attrs['source'] = 'ccxt'
        df.attrs['exchange'] = self.exchange_name
        df.attrs['symbol'] = symbol
        df.attrs['timeframe'] = tf
        df.attrs['adj'] = 'raw'
        return df

    def fetch(
        self,
        code: str,
        start_date: str,
        end_date: str,
        adj: str = 'raw',
        timeframe: str = '1d'
    ) -> Optional[pd.DataFrame]:
        """
        Fetch OHLCV candles for ``code`` from ``start_date`` through ``end_date``.

        Pages through ``fetch_ohlcv`` until the window is covered, because one
        call returns at most ``_PAGE_LIMIT`` candles (possibly fewer if the
        exchange caps requests lower).

        Args:
            code: base currency such as 'BTC' (quoted in USDT), or a full
                CCXT symbol such as 'ETH/BTC'.
            start_date: start date 'YYYY-MM-DD'; naive values are taken as
                Asia/Shanghai time.
            end_date: end date 'YYYY-MM-DD'; a bare date includes that whole day.
            adj: adjustment type; crypto only supports 'raw'.
            timeframe: candle period (1d, 1h, 4h, 15m, 1m, daily, hourly).

        Returns:
            DataFrame indexed by Asia/Shanghai timestamps ('date') with
            columns open/high/low/close/volume and metadata in ``df.attrs``.
            Returns None if ccxt is unavailable, the exchange could not be
            created, nothing was returned, or the request failed.

        Raises:
            ValueError: if ``adj`` is not 'raw'.
        """
        if adj != 'raw':
            raise ValueError(f"加密货币不支持复权adj='{adj}' 无效,仅支持 'raw'")
        if not CCXT_AVAILABLE:
            print(f"⚠️ ccxt 未安装,无法获取 {code}")
            return None
        if not self._ensure_exchange():
            return None

        symbol = self._symbol(code)
        tf = self._timeframe(timeframe)
        try:
            start_dt, end_excl = self._window(start_date, end_date)
            since = int(start_dt.timestamp() * 1000)
            until = int(end_excl.timestamp() * 1000)
            print(f"{self.exchange_name} 获取 {symbol} {tf} 数据...")
            print(f"时间范围: {start_date} ~ {end_date}")

            exchange = self._exchange
            ohlcv = self._collect_ohlcv(
                lambda cursor, limit: exchange.fetch_ohlcv(symbol, tf, cursor, limit=limit),
                since,
                until,
                self._PAGE_LIMIT,
            )
            if not ohlcv:
                print(f"未获取到 {symbol} 数据")
                return None

            df = self._slice(self._to_frame(ohlcv), start_dt, end_excl)
            self._tag(df, symbol, tf)
            print(f"✓ 获取成功: {len(df)} 条数据")
            return df
        except Exception as e:
            # Best-effort source: report and signal failure with None.
            print(f"✗ 获取 {symbol} 失败: {e}")
            return None

    def fetch_latest(
        self,
        code: str,
        timeframe: str = '1d',
        limit: int = 100
    ) -> Optional[pd.DataFrame]:
        """
        Fetch the most recent ``limit`` candles in a single request.

        Args:
            code: base currency such as 'BTC' (quoted in USDT), or a full
                CCXT symbol.
            timeframe: candle period (see TIMEFRAME_MAP).
            limit: number of candles requested. An exchange may return fewer
                if it caps the per-request count.

        Returns:
            DataFrame in the same format as ``fetch`` (full timestamp
            precision, metadata in ``df.attrs``), or None on failure or when
            nothing was returned.
        """
        if not CCXT_AVAILABLE:
            return None
        if not self._ensure_exchange():
            return None

        symbol = self._symbol(code)
        tf = self._timeframe(timeframe)
        try:
            ohlcv = self._exchange.fetch_ohlcv(symbol, tf, limit=limit)
            if not ohlcv:
                return None
            return self._tag(self._to_frame(ohlcv), symbol, tf)
        except Exception as e:
            print(f"✗ 获取 {symbol} 失败: {e}")
            return None
# Module-wide shared instance, created lazily by get_crypto_source().
_crypto_source: Optional[CCXTSource] = None


def get_crypto_source(
    socks_host: str = '127.0.0.1',
    socks_port: int = 1080,
    http_port: int = 8080
) -> CCXTSource:
    """
    Return the shared CCXTSource, creating it on the first call.

    The arguments only take effect on that first call; later calls return
    the already-created instance regardless of the arguments passed.
    """
    global _crypto_source
    if _crypto_source is not None:
        return _crypto_source
    _crypto_source = CCXTSource(
        socks_host=socks_host,
        socks_port=socks_port,
        http_port=http_port,
    )
    return _crypto_source