- 新增 ccxt_source.py: CCXT + OKX 加密货币数据源 - 新增 socks2http.py: SOCKS5 转 HTTP 代理转换器 - 修改 universal_fetcher.py: 添加 _fetch_crypto 方法,支持 timeframe 参数 - 修改 flask_server.py: API 支持 timeframe 参数,加密货币不缓存 支持的 timeframe: 1d, 1h, 4h, 15m, 1m 测试验证: BTC 数据获取成功
284 lines
8.2 KiB
Python
284 lines
8.2 KiB
Python
"""
|
||
CCXT 加密货币数据源
|
||
|
||
通过 CCXT 库从 OKX 获取加密货币 K 线数据
|
||
特点:
|
||
- 支持 BTC/ETH 等主流加密货币
|
||
- 需要通过 socks2http 将 SOCKS5 转 HTTP 代理
|
||
- 支持多种 timeframe(1d, 1h, 15m 等)
|
||
"""
|
||
|
||
import os
|
||
import time
|
||
import threading
|
||
from typing import Optional, Dict
|
||
from datetime import datetime, timedelta
|
||
import pandas as pd
|
||
|
||
try:
|
||
import ccxt
|
||
CCXT_AVAILABLE = True
|
||
except ImportError:
|
||
CCXT_AVAILABLE = False
|
||
print("⚠️ ccxt 未安装,加密货币获取不可用")
|
||
|
||
from .socks2http import Socks2Http
|
||
|
||
|
||
class CCXTSource:
|
||
"""
|
||
CCXT 加密货币数据源
|
||
|
||
使用 OKX 交易所获取加密货币数据
|
||
需要通过 socks2http 服务将 SSH SOCKS5 代理转为 HTTP 代理
|
||
"""
|
||
|
||
# 支持的 timeframe 映射
|
||
TIMEFRAME_MAP = {
|
||
'1d': '1d',
|
||
'1h': '1h',
|
||
'4h': '4h',
|
||
'15m': '15m',
|
||
'1m': '1m',
|
||
'daily': '1d',
|
||
'hourly': '1h',
|
||
}
|
||
|
||
# 默认 HTTP 代理端口
|
||
DEFAULT_HTTP_PORT = 8080
|
||
|
||
# socks2http 服务实例
|
||
_proxy_server: Optional[Socks2Http] = None
|
||
_proxy_thread: Optional[threading.Thread] = None
|
||
_proxy_started = False
|
||
|
||
def __init__(
|
||
self,
|
||
socks_host: str = '127.0.0.1',
|
||
socks_port: int = 1080,
|
||
http_port: int = 8080,
|
||
exchange: str = 'okx'
|
||
):
|
||
"""
|
||
初始化
|
||
|
||
Args:
|
||
socks_host: SOCKS5 代理地址
|
||
socks_port: SOCKS5 代理端口
|
||
http_port: HTTP 代理端口(socks2http 转换后)
|
||
exchange: 交易所名称(默认 okx)
|
||
"""
|
||
self.socks_host = socks_host
|
||
self.socks_port = socks_port
|
||
self.http_port = http_port
|
||
self.exchange_name = exchange
|
||
self._exchange = None
|
||
|
||
def start_proxy(self) -> bool:
|
||
"""启动 socks2http 代理服务"""
|
||
if self._proxy_started:
|
||
return True
|
||
|
||
if not CCXT_AVAILABLE:
|
||
print("⚠️ ccxt 未安装,无法启动加密货币数据源")
|
||
return False
|
||
|
||
try:
|
||
# 启动 socks2http 服务
|
||
self._proxy_server = Socks2Http(
|
||
socks_host=self.socks_host,
|
||
socks_port=self.socks_port,
|
||
http_port=self.http_port
|
||
)
|
||
|
||
# 在后台线程启动
|
||
self._proxy_thread = threading.Thread(
|
||
target=self._proxy_server.start,
|
||
daemon=True
|
||
)
|
||
self._proxy_thread.start()
|
||
|
||
# 等待服务启动
|
||
time.sleep(1)
|
||
|
||
self._proxy_started = True
|
||
print(f"✓ socks2http 代理已启动: http://127.0.0.1:{self.http_port}")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"✗ 启动 socks2http 代理失败: {e}")
|
||
return False
|
||
|
||
def stop_proxy(self):
|
||
"""停止 socks2http 代理服务"""
|
||
if self._proxy_server:
|
||
self._proxy_server.stop()
|
||
self._proxy_server = None
|
||
self._proxy_started = False
|
||
print("socks2http 代理已停止")
|
||
|
||
def _create_exchange(self) -> Optional[ccxt.Exchange]:
|
||
"""创建交易所实例"""
|
||
if not CCXT_AVAILABLE:
|
||
return None
|
||
|
||
# 确保 proxy 已启动
|
||
if not self._proxy_started:
|
||
if not self.start_proxy():
|
||
return None
|
||
|
||
# 配置 CCXT(使用 HTTP 代理)
|
||
config = {
|
||
'enableRateLimit': True,
|
||
'proxies': {
|
||
'http': f'http://127.0.0.1:{self.http_port}',
|
||
'https': f'http://127.0.0.1:{self.http_port}'
|
||
},
|
||
'timeout': 30000,
|
||
}
|
||
|
||
# 创建交易所实例
|
||
exchange_class = getattr(ccxt, self.exchange_name)
|
||
return exchange_class(config)
|
||
|
||
def fetch(
|
||
self,
|
||
code: str,
|
||
start_date: str,
|
||
end_date: str,
|
||
timeframe: str = '1d'
|
||
) -> Optional[pd.DataFrame]:
|
||
"""
|
||
获取加密货币 K 线数据
|
||
|
||
Args:
|
||
code: 加密货币代码(BTC, ETH 等)
|
||
start_date: 开始日期 YYYY-MM-DD
|
||
end_date: 结束日期 YYYY-MM-DD
|
||
timeframe: K 线周期(1d, 1h, 4h, 15m, 1m)
|
||
|
||
Returns:
|
||
DataFrame with columns: date, open, high, low, close, volume
|
||
"""
|
||
if not CCXT_AVAILABLE:
|
||
print(f"⚠️ ccxt 未安装,无法获取 {code}")
|
||
return None
|
||
|
||
# 创建交易所实例
|
||
if self._exchange is None:
|
||
self._exchange = self._create_exchange()
|
||
|
||
if self._exchange is None:
|
||
return None
|
||
|
||
# 转换代码格式:BTC -> BTC/USDT
|
||
symbol = f"{code.upper()}/USDT"
|
||
|
||
# 转换 timeframe
|
||
tf = self.TIMEFRAME_MAP.get(timeframe.lower(), '1d')
|
||
|
||
try:
|
||
# 计算时间范围
|
||
start_dt = pd.Timestamp(start_date).tz_localize('Asia/Shanghai')
|
||
end_dt = pd.Timestamp(end_date).tz_localize('Asia/Shanghai')
|
||
|
||
# 转换为毫秒时间戳
|
||
since = int(start_dt.timestamp() * 1000)
|
||
|
||
print(f"从 {self.exchange_name} 获取 {symbol} {tf} 数据...")
|
||
print(f"时间范围: {start_date} ~ {end_date}")
|
||
|
||
# 获取 K 线数据
|
||
ohlcv = self._exchange.fetch_ohlcv(symbol, tf, since, limit=1000)
|
||
|
||
if not ohlcv:
|
||
print(f"未获取到 {symbol} 数据")
|
||
return None
|
||
|
||
# 转换为 DataFrame
|
||
df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
|
||
|
||
# 转换时间戳为日期索引(UTC -> 北京时间)
|
||
df['date'] = pd.to_datetime(df['timestamp'], unit='ms', utc=True).dt.tz_convert('Asia/Shanghai')
|
||
df = df.set_index('date')
|
||
df = df[['open', 'high', 'low', 'close', 'volume']]
|
||
|
||
# 过滤日期范围
|
||
df = df.loc[start_dt:end_dt]
|
||
|
||
print(f"✓ 获取成功: {len(df)} 条数据")
|
||
|
||
return df
|
||
|
||
except Exception as e:
|
||
print(f"✗ 获取 {symbol} 失败: {e}")
|
||
return None
|
||
|
||
def fetch_latest(
|
||
self,
|
||
code: str,
|
||
timeframe: str = '1d',
|
||
limit: int = 100
|
||
) -> Optional[pd.DataFrame]:
|
||
"""
|
||
获取最近 N 条数据
|
||
|
||
Args:
|
||
code: 加密货币代码
|
||
timeframe: K 线周期
|
||
limit: 数据条数
|
||
|
||
Returns:
|
||
DataFrame
|
||
"""
|
||
if not CCXT_AVAILABLE:
|
||
return None
|
||
|
||
if self._exchange is None:
|
||
self._exchange = self._create_exchange()
|
||
|
||
if self._exchange is None:
|
||
return None
|
||
|
||
symbol = f"{code.upper()}/USDT"
|
||
tf = self.TIMEFRAME_MAP.get(timeframe.lower(), '1d')
|
||
|
||
try:
|
||
ohlcv = self._exchange.fetch_ohlcv(symbol, tf, limit=limit)
|
||
|
||
if not ohlcv:
|
||
return None
|
||
|
||
df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
|
||
df['date'] = pd.to_datetime(df['timestamp'], unit='ms', utc=True).dt.tz_convert('Asia/Shanghai')
|
||
df = df.set_index('date')
|
||
df = df[['open', 'high', 'low', 'close', 'volume']]
|
||
df.index = df.index.normalize()
|
||
|
||
return df
|
||
|
||
except Exception as e:
|
||
print(f"✗ 获取 {symbol} 失败: {e}")
|
||
return None
|
||
|
||
|
||
# 全局实例(延迟初始化)
|
||
_crypto_source: Optional[CCXTSource] = None
|
||
|
||
|
||
def get_crypto_source(
|
||
socks_host: str = '127.0.0.1',
|
||
socks_port: int = 1080,
|
||
http_port: int = 8080
|
||
) -> CCXTSource:
|
||
"""获取加密货币数据源实例"""
|
||
global _crypto_source
|
||
|
||
if _crypto_source is None:
|
||
_crypto_source = CCXTSource(
|
||
socks_host=socks_host,
|
||
socks_port=socks_port,
|
||
http_port=http_port
|
||
)
|
||
|
||
return _crypto_source |