""" CCXT 加密货币数据源 通过 CCXT 库从 OKX 获取加密货币 K 线数据 特点: - 支持 BTC/ETH 等主流加密货币 - 需要通过 socks2http 将 SOCKS5 转 HTTP 代理 - 支持多种 timeframe(1d, 1h, 15m 等) """ import os import time import threading from typing import Optional, Dict from datetime import datetime, timedelta import pandas as pd try: import ccxt CCXT_AVAILABLE = True except ImportError: CCXT_AVAILABLE = False print("⚠️ ccxt 未安装,加密货币获取不可用") from .socks2http import Socks2Http class CCXTSource: """ CCXT 加密货币数据源 使用 OKX 交易所获取加密货币数据 需要通过 socks2http 服务将 SSH SOCKS5 代理转为 HTTP 代理 """ # 支持的 timeframe 映射 TIMEFRAME_MAP = { '1d': '1d', '1h': '1h', '4h': '4h', '15m': '15m', '1m': '1m', 'daily': '1d', 'hourly': '1h', } # 默认 HTTP 代理端口 DEFAULT_HTTP_PORT = 8080 # socks2http 服务实例 _proxy_server: Optional[Socks2Http] = None _proxy_thread: Optional[threading.Thread] = None _proxy_started = False def __init__( self, socks_host: str = '127.0.0.1', socks_port: int = 1080, http_port: int = 8080, exchange: str = 'okx' ): """ 初始化 Args: socks_host: SOCKS5 代理地址 socks_port: SOCKS5 代理端口 http_port: HTTP 代理端口(socks2http 转换后) exchange: 交易所名称(默认 okx) """ self.socks_host = socks_host self.socks_port = socks_port self.http_port = http_port self.exchange_name = exchange self._exchange = None def start_proxy(self) -> bool: """启动 socks2http 代理服务""" if self._proxy_started: return True if not CCXT_AVAILABLE: print("⚠️ ccxt 未安装,无法启动加密货币数据源") return False try: # 启动 socks2http 服务 self._proxy_server = Socks2Http( socks_host=self.socks_host, socks_port=self.socks_port, http_port=self.http_port ) # 在后台线程启动 self._proxy_thread = threading.Thread( target=self._proxy_server.start, daemon=True ) self._proxy_thread.start() # 等待服务启动 time.sleep(1) self._proxy_started = True print(f"✓ socks2http 代理已启动: http://127.0.0.1:{self.http_port}") return True except Exception as e: print(f"✗ 启动 socks2http 代理失败: {e}") return False def stop_proxy(self): """停止 socks2http 代理服务""" if self._proxy_server: self._proxy_server.stop() self._proxy_server = None self._proxy_started = False print("socks2http 代理已停止") def _create_exchange(self) -> Optional[ccxt.Exchange]: """创建交易所实例""" if not CCXT_AVAILABLE: return None # 确保 proxy 已启动 if not self._proxy_started: if not self.start_proxy(): return None # 配置 CCXT(使用 HTTP 代理) config = { 'enableRateLimit': True, 'proxies': { 'http': f'http://127.0.0.1:{self.http_port}', 'https': f'http://127.0.0.1:{self.http_port}' }, 'timeout': 30000, } # 创建交易所实例 exchange_class = getattr(ccxt, self.exchange_name) return exchange_class(config) def fetch( self, code: str, start_date: str, end_date: str, timeframe: str = '1d' ) -> Optional[pd.DataFrame]: """ 获取加密货币 K 线数据 Args: code: 加密货币代码(BTC, ETH 等) start_date: 开始日期 YYYY-MM-DD end_date: 结束日期 YYYY-MM-DD timeframe: K 线周期(1d, 1h, 4h, 15m, 1m) Returns: DataFrame with columns: date, open, high, low, close, volume """ if not CCXT_AVAILABLE: print(f"⚠️ ccxt 未安装,无法获取 {code}") return None # 创建交易所实例 if self._exchange is None: self._exchange = self._create_exchange() if self._exchange is None: return None # 转换代码格式:BTC -> BTC/USDT symbol = f"{code.upper()}/USDT" # 转换 timeframe tf = self.TIMEFRAME_MAP.get(timeframe.lower(), '1d') try: # 计算时间范围 start_dt = pd.Timestamp(start_date).tz_localize('Asia/Shanghai') end_dt = pd.Timestamp(end_date).tz_localize('Asia/Shanghai') # 转换为毫秒时间戳 since = int(start_dt.timestamp() * 1000) print(f"从 {self.exchange_name} 获取 {symbol} {tf} 数据...") print(f"时间范围: {start_date} ~ {end_date}") # 获取 K 线数据 ohlcv = self._exchange.fetch_ohlcv(symbol, tf, since, limit=1000) if not ohlcv: print(f"未获取到 {symbol} 数据") return None # 转换为 DataFrame df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) # 转换时间戳为日期索引(UTC -> 北京时间) df['date'] = pd.to_datetime(df['timestamp'], unit='ms', utc=True).dt.tz_convert('Asia/Shanghai') df = df.set_index('date') df = df[['open', 'high', 'low', 'close', 'volume']] # 过滤日期范围 df = df.loc[start_dt:end_dt] print(f"✓ 获取成功: {len(df)} 条数据") return df except Exception as e: print(f"✗ 获取 {symbol} 失败: {e}") return None def fetch_latest( self, code: str, timeframe: str = '1d', limit: int = 100 ) -> Optional[pd.DataFrame]: """ 获取最近 N 条数据 Args: code: 加密货币代码 timeframe: K 线周期 limit: 数据条数 Returns: DataFrame """ if not CCXT_AVAILABLE: return None if self._exchange is None: self._exchange = self._create_exchange() if self._exchange is None: return None symbol = f"{code.upper()}/USDT" tf = self.TIMEFRAME_MAP.get(timeframe.lower(), '1d') try: ohlcv = self._exchange.fetch_ohlcv(symbol, tf, limit=limit) if not ohlcv: return None df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) df['date'] = pd.to_datetime(df['timestamp'], unit='ms', utc=True).dt.tz_convert('Asia/Shanghai') df = df.set_index('date') df = df[['open', 'high', 'low', 'close', 'volume']] df.index = df.index.normalize() return df except Exception as e: print(f"✗ 获取 {symbol} 失败: {e}") return None # 全局实例(延迟初始化) _crypto_source: Optional[CCXTSource] = None def get_crypto_source( socks_host: str = '127.0.0.1', socks_port: int = 1080, http_port: int = 8080 ) -> CCXTSource: """获取加密货币数据源实例""" global _crypto_source if _crypto_source is None: _crypto_source = CCXTSource( socks_host=socks_host, socks_port=socks_port, http_port=http_port ) return _crypto_source