diff --git a/datasource/ccxt_source.py b/datasource/ccxt_source.py new file mode 100644 index 0000000..87b413b --- /dev/null +++ b/datasource/ccxt_source.py @@ -0,0 +1,284 @@ +""" +CCXT 加密货币数据源 + +通过 CCXT 库从 OKX 获取加密货币 K 线数据 +特点: +- 支持 BTC/ETH 等主流加密货币 +- 需要通过 socks2http 将 SOCKS5 转 HTTP 代理 +- 支持多种 timeframe(1d, 1h, 15m 等) +""" + +import os +import time +import threading +from typing import Optional, Dict +from datetime import datetime, timedelta +import pandas as pd + +try: + import ccxt + CCXT_AVAILABLE = True +except ImportError: + CCXT_AVAILABLE = False + print("⚠️ ccxt 未安装,加密货币获取不可用") + +from .socks2http import Socks2Http + + +class CCXTSource: + """ + CCXT 加密货币数据源 + + 使用 OKX 交易所获取加密货币数据 + 需要通过 socks2http 服务将 SSH SOCKS5 代理转为 HTTP 代理 + """ + + # 支持的 timeframe 映射 + TIMEFRAME_MAP = { + '1d': '1d', + '1h': '1h', + '4h': '4h', + '15m': '15m', + '1m': '1m', + 'daily': '1d', + 'hourly': '1h', + } + + # 默认 HTTP 代理端口 + DEFAULT_HTTP_PORT = 8080 + + # socks2http 服务实例 + _proxy_server: Optional[Socks2Http] = None + _proxy_thread: Optional[threading.Thread] = None + _proxy_started = False + + def __init__( + self, + socks_host: str = '127.0.0.1', + socks_port: int = 1080, + http_port: int = 8080, + exchange: str = 'okx' + ): + """ + 初始化 + + Args: + socks_host: SOCKS5 代理地址 + socks_port: SOCKS5 代理端口 + http_port: HTTP 代理端口(socks2http 转换后) + exchange: 交易所名称(默认 okx) + """ + self.socks_host = socks_host + self.socks_port = socks_port + self.http_port = http_port + self.exchange_name = exchange + self._exchange = None + + def start_proxy(self) -> bool: + """启动 socks2http 代理服务""" + if self._proxy_started: + return True + + if not CCXT_AVAILABLE: + print("⚠️ ccxt 未安装,无法启动加密货币数据源") + return False + + try: + # 启动 socks2http 服务 + self._proxy_server = Socks2Http( + socks_host=self.socks_host, + socks_port=self.socks_port, + http_port=self.http_port + ) + + # 在后台线程启动 + self._proxy_thread = threading.Thread( + target=self._proxy_server.start, + daemon=True + ) + self._proxy_thread.start() + + # 等待服务启动 + time.sleep(1) + + self._proxy_started = True + print(f"✓ socks2http 代理已启动: http://127.0.0.1:{self.http_port}") + return True + + except Exception as e: + print(f"✗ 启动 socks2http 代理失败: {e}") + return False + + def stop_proxy(self): + """停止 socks2http 代理服务""" + if self._proxy_server: + self._proxy_server.stop() + self._proxy_server = None + self._proxy_started = False + print("socks2http 代理已停止") + + def _create_exchange(self) -> Optional[ccxt.Exchange]: + """创建交易所实例""" + if not CCXT_AVAILABLE: + return None + + # 确保 proxy 已启动 + if not self._proxy_started: + if not self.start_proxy(): + return None + + # 配置 CCXT(使用 HTTP 代理) + config = { + 'enableRateLimit': True, + 'proxies': { + 'http': f'http://127.0.0.1:{self.http_port}', + 'https': f'http://127.0.0.1:{self.http_port}' + }, + 'timeout': 30000, + } + + # 创建交易所实例 + exchange_class = getattr(ccxt, self.exchange_name) + return exchange_class(config) + + def fetch( + self, + code: str, + start_date: str, + end_date: str, + timeframe: str = '1d' + ) -> Optional[pd.DataFrame]: + """ + 获取加密货币 K 线数据 + + Args: + code: 加密货币代码(BTC, ETH 等) + start_date: 开始日期 YYYY-MM-DD + end_date: 结束日期 YYYY-MM-DD + timeframe: K 线周期(1d, 1h, 4h, 15m, 1m) + + Returns: + DataFrame with columns: date, open, high, low, close, volume + """ + if not CCXT_AVAILABLE: + print(f"⚠️ ccxt 未安装,无法获取 {code}") + return None + + # 创建交易所实例 + if self._exchange is None: + self._exchange = self._create_exchange() + + if self._exchange is None: + return None + + # 转换代码格式:BTC -> BTC/USDT + symbol = f"{code.upper()}/USDT" + + # 转换 timeframe + tf = self.TIMEFRAME_MAP.get(timeframe.lower(), '1d') + + try: + # 计算时间范围 + start_dt = pd.Timestamp(start_date).tz_localize('Asia/Shanghai') + end_dt = pd.Timestamp(end_date).tz_localize('Asia/Shanghai') + + # 转换为毫秒时间戳 + since = int(start_dt.timestamp() * 1000) + + print(f"从 {self.exchange_name} 获取 {symbol} {tf} 数据...") + print(f"时间范围: {start_date} ~ {end_date}") + + # 获取 K 线数据 + ohlcv = self._exchange.fetch_ohlcv(symbol, tf, since, limit=1000) + + if not ohlcv: + print(f"未获取到 {symbol} 数据") + return None + + # 转换为 DataFrame + df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) + + # 转换时间戳为日期索引(UTC -> 北京时间) + df['date'] = pd.to_datetime(df['timestamp'], unit='ms', utc=True).dt.tz_convert('Asia/Shanghai') + df = df.set_index('date') + df = df[['open', 'high', 'low', 'close', 'volume']] + + # 过滤日期范围 + df = df.loc[start_dt:end_dt] + + print(f"✓ 获取成功: {len(df)} 条数据") + + return df + + except Exception as e: + print(f"✗ 获取 {symbol} 失败: {e}") + return None + + def fetch_latest( + self, + code: str, + timeframe: str = '1d', + limit: int = 100 + ) -> Optional[pd.DataFrame]: + """ + 获取最近 N 条数据 + + Args: + code: 加密货币代码 + timeframe: K 线周期 + limit: 数据条数 + + Returns: + DataFrame + """ + if not CCXT_AVAILABLE: + return None + + if self._exchange is None: + self._exchange = self._create_exchange() + + if self._exchange is None: + return None + + symbol = f"{code.upper()}/USDT" + tf = self.TIMEFRAME_MAP.get(timeframe.lower(), '1d') + + try: + ohlcv = self._exchange.fetch_ohlcv(symbol, tf, limit=limit) + + if not ohlcv: + return None + + df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) + df['date'] = pd.to_datetime(df['timestamp'], unit='ms', utc=True).dt.tz_convert('Asia/Shanghai') + df = df.set_index('date') + df = df[['open', 'high', 'low', 'close', 'volume']] + df.index = df.index.normalize() + + return df + + except Exception as e: + print(f"✗ 获取 {symbol} 失败: {e}") + return None + + +# 全局实例(延迟初始化) +_crypto_source: Optional[CCXTSource] = None + + +def get_crypto_source( + socks_host: str = '127.0.0.1', + socks_port: int = 1080, + http_port: int = 8080 +) -> CCXTSource: + """获取加密货币数据源实例""" + global _crypto_source + + if _crypto_source is None: + _crypto_source = CCXTSource( + socks_host=socks_host, + socks_port=socks_port, + http_port=http_port + ) + + return _crypto_source \ No newline at end of file diff --git a/datasource/flask_server.py b/datasource/flask_server.py index 85a1f27..7c6a94b 100644 --- a/datasource/flask_server.py +++ b/datasource/flask_server.py @@ -230,20 +230,22 @@ def fetch_data_with_ttl( code: str, start: str, end: str, - nocache: bool = False + nocache: bool = False, + timeframe: str = '1d' ) -> Tuple[Optional[Dict], bool]: """ 获取数据,支持 TTL 缓存(加密货币不缓存) 缓存策略: - 日级别数据(股票/指数/ETF/期货): Key=(code, today), 缓存全量数据,切片返回 - - 加密货币: 每次实时下载,不缓存 + - 加密货币: 每次实时下载,不缓存,必须指定 timeframe Args: code: 标的代码 start: 用户请求的开始日期 end: 用户请求的结束日期 nocache: 是否跳过缓存 + timeframe: K线周期(仅加密货币需要) Returns: (data, is_cached): 数据和是否命中缓存 @@ -254,12 +256,12 @@ def fetch_data_with_ttl( # 检查资产类型 asset_type = AssetTypeDetector.detect(code) - # 加密货币:直接下载,不缓存 + # 加密货币:直接下载,不缓存,必须指定 timeframe if asset_type == AssetType.CRYPTO: f = get_fetcher() try: with f: - df = f.fetch(code, start, end) + df = f.fetch(code, start, end, timeframe=timeframe) if df is None or len(df) == 0: return None, False result = dataframe_to_json(df) @@ -267,6 +269,7 @@ def fetch_data_with_ttl( result['asset_type'] = asset_type.value result['cache_strategy'] = 'no_cache_crypto' result['requested_range'] = {'start': start, 'end': end} + result['timeframe'] = timeframe return result, False except Exception as e: return {'error': str(e), 'code': code, 'asset_type': asset_type.value}, False @@ -459,12 +462,20 @@ def index(): "asset_type": "/api/v1/asset-type?code={code}", "ohlcv": "/api/v1/ohlcv?code={code}&start={YYYY-MM-DD}&end={YYYY-MM-DD}&asset_type={type}", "ohlcv_nocache": "/api/v1/ohlcv?code={code}&nocache=true", + "ohlcv_crypto": "/api/v1/ohlcv?code=BTC&timeframe=1d (加密货币必须指定 timeframe)", "ohlcv_asset_type": "/api/v1/ohlcv?code={code}&asset_type=china_index (强制覆盖类型)", "batch": "POST /api/v1/ohlcv/batch", "etf_nav": "/api/v1/etf/nav?code={code}", "cache_clear": "POST /api/v1/cache/clear", "cache_stats": "/api/v1/cache/stats", }, + "crypto_timeframes": { + "1d": "日线", + "1h": "小时线", + "4h": "4小时线", + "15m": "15分钟线", + "1m": "分钟线", + }, "asset_types": { "china_index": "中国指数 (000300.SH, 399006.SZ等)", "china_etf": "中国ETF (159915.SZ, 513100.SH等)", @@ -534,12 +545,19 @@ def get_ohlcv(): - futures: 期货 - crypto: 加密货币 注:指定后会覆盖自动检测,用于修复检测逻辑问题 + timeframe: K线周期 (optional, 仅加密货币需要) + - 1d: 日线(默认) + - 1h: 小时线 + - 4h: 4小时线 + - 15m: 15分钟线 + - 1m: 分钟线 nocache: 是否跳过缓存 (optional, 默认false) """ code = request.args.get('code', '').strip() start = request.args.get('start', '').strip() end = request.args.get('end', '').strip() asset_type_param = request.args.get('asset_type', '').strip().lower() + timeframe = request.args.get('timeframe', '1d').strip().lower() nocache = request.args.get('nocache', 'false').lower() == 'true' # 参数验证 @@ -577,8 +595,18 @@ def get_ohlcv(): "valid_types": [t.value for t in AssetType], }), 400 - # 使用缓存获取数据 - result, is_cached = fetch_data_with_ttl(code, start, end, nocache) + # 加密货币必须指定 timeframe(无论自动检测还是手动指定) + if final_type == AssetType.CRYPTO: + valid_timeframes = ['1d', '1h', '4h', '15m', '1m', 'daily', 'hourly'] + if timeframe not in valid_timeframes: + return jsonify({ + "error": f"Invalid timeframe for crypto: {timeframe}", + "valid_timeframes": valid_timeframes, + "hint": "加密货币必须指定 timeframe 参数", + }), 400 + + # 使用缓存获取数据(加密货币不缓存) + result, is_cached = fetch_data_with_ttl(code, start, end, nocache, timeframe) if result is None: return jsonify({ diff --git a/datasource/socks2http.py b/datasource/socks2http.py new file mode 100644 index 0000000..37ba687 --- /dev/null +++ b/datasource/socks2http.py @@ -0,0 +1,168 @@ +#!/usr/bin/env python3 +""" +SOCKS5 转 HTTP 代理转发工具 +将 SSH 隧道的 SOCKS5 代理 (1080) 转为 HTTP 代理 (8080) +供 CCXT 等只支持 HTTP 代理的库使用 +""" + +import socket +import threading +import select +import sys +from urllib.parse import urlparse + + +class Socks2Http: + def __init__(self, socks_host='127.0.0.1', socks_port=1080, http_port=8080): + self.socks_host = socks_host + self.socks_port = socks_port + self.http_port = http_port + self.server = None + self.running = False + + def start(self): + """启动 HTTP 代理服务器""" + self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.server.bind(('127.0.0.1', self.http_port)) + self.server.listen(5) + self.running = True + + print(f"HTTP 代理已启动: http://127.0.0.1:{self.http_port}") + print(f"转发到 SOCKS5: {self.socks_host}:{self.socks_port}") + + while self.running: + try: + client, addr = self.server.accept() + thread = threading.Thread(target=self._handle_client, args=(client,)) + thread.daemon = True + thread.start() + except Exception as e: + if self.running: + print(f"接受连接错误: {e}") + + def stop(self): + """停止代理服务器""" + self.running = False + if self.server: + self.server.close() + print("HTTP 代理已停止") + + def _handle_client(self, client): + """处理客户端连接""" + try: + # 读取 HTTP 请求 + request = client.recv(4096) + if not request: + client.close() + return + + # 解析 CONNECT 请求 + first_line = request.split(b'\r\n')[0].decode('utf-8', errors='ignore') + + if first_line.startswith('CONNECT'): + # HTTPS 代理 + parts = first_line.split() + if len(parts) >= 2: + target = parts[1] + host, port = target.rsplit(':', 1) + port = int(port) + + # 连接到 SOCKS5 代理 + remote = self._connect_via_socks5(host, port) + if remote: + client.send(b'HTTP/1.1 200 Connection established\r\n\r\n') + self._relay(client, remote) + else: + client.send(b'HTTP/1.1 502 Bad Gateway\r\n\r\n') + else: + # HTTP 代理 + lines = first_line.split() + if len(lines) >= 2: + url = lines[1] + parsed = urlparse(url) + host = parsed.hostname + port = parsed.port or 80 + + # 连接到 SOCKS5 代理 + remote = self._connect_via_socks5(host, port) + if remote: + # 修改请求,去掉完整 URL + new_request = request.replace( + f'{lines[0]} {url} '.encode(), + f'{lines[0]} {parsed.path or "/"}{"?" + parsed.query if parsed.query else ""} '.encode() + ) + remote.send(new_request) + self._relay(client, remote) + + except Exception as e: + print(f"处理客户端错误: {e}") + finally: + client.close() + + def _connect_via_socks5(self, host, port): + """通过 SOCKS5 代理连接目标服务器""" + try: + # 连接到 SOCKS5 代理 + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(30) + sock.connect((self.socks_host, self.socks_port)) + + # SOCKS5 握手 + # 1. 发送认证方法 + sock.send(b'\x05\x01\x00') # VER=5, NMETHODS=1, METHOD=0 (无认证) + resp = sock.recv(2) + if resp[0] != 0x05 or resp[1] != 0x00: + sock.close() + return None + + # 2. 发送连接请求 + req = b'\x05\x01\x00\x03' # VER=5, CMD=CONNECT, RSV=0, ATYP=DOMAIN + req += bytes([len(host)]) + host.encode() + req += bytes([(port >> 8) & 0xFF, port & 0xFF]) + sock.send(req) + + # 3. 读取响应 + resp = sock.recv(10) + if len(resp) < 4 or resp[1] != 0x00: + sock.close() + return None + + return sock + + except Exception as e: + print(f"SOCKS5 连接错误: {e}") + return None + + def _relay(self, client, remote): + """双向转发数据""" + try: + while True: + readable, _, _ = select.select([client, remote], [], [], 60) + if not readable: + break + + if client in readable: + data = client.recv(4096) + if not data: + break + remote.send(data) + + if remote in readable: + data = remote.recv(4096) + if not data: + break + client.send(data) + except: + pass + finally: + client.close() + remote.close() + + +if __name__ == '__main__': + proxy = Socks2Http(socks_port=1080, http_port=8080) + try: + proxy.start() + except KeyboardInterrupt: + proxy.stop() diff --git a/datasource/universal_fetcher.py b/datasource/universal_fetcher.py index 9550cb0..2dae549 100644 --- a/datasource/universal_fetcher.py +++ b/datasource/universal_fetcher.py @@ -29,6 +29,7 @@ from .tushare_source import TushareSource from .yfinance_source import YFinanceSource from .ssh_tunnel import SSHTunnelManager from .asset_type_detector import AssetTypeDetector, AssetType +from .ccxt_source import CCXTSource, get_crypto_source class UniversalDataFetcher: @@ -65,6 +66,9 @@ class UniversalDataFetcher: # SSH隧道(延迟初始化) self._tunnel: Optional[SSHTunnelManager] = None self._tunnel_started = False + + # 加密货币数据源(延迟初始化) + self._crypto: Optional[CCXTSource] = None def __enter__(self): """上下文管理器入口""" @@ -108,7 +112,8 @@ class UniversalDataFetcher: code: str, start_date: str, end_date: str, - retry: int = 3 + retry: int = 3, + timeframe: str = '1d' ) -> Optional[pd.DataFrame]: """ 统一数据获取入口 @@ -120,6 +125,7 @@ class UniversalDataFetcher: start_date: 开始日期 'YYYY-MM-DD' end_date: 结束日期 'YYYY-MM-DD' retry: 重试次数 + timeframe: K线周期(仅加密货币需要,默认1d) Returns: DataFrame with columns: date, open, high, low, close, volume @@ -144,7 +150,7 @@ class UniversalDataFetcher: elif asset_type == AssetType.FUTURES: return self._fetch_futures(code, start_date, end_date) elif asset_type == AssetType.CRYPTO: - return self._fetch_crypto(code, start_date, end_date) + return self._fetch_crypto(code, start_date, end_date, timeframe) else: print(f"⚠️ 未知资产类型: {code} -> {asset_type}") return None @@ -333,17 +339,31 @@ class UniversalDataFetcher: self, code: str, start_date: str, - end_date: str + end_date: str, + timeframe: str = '1d' ) -> Optional[pd.DataFrame]: """ 获取加密货币 - 特点:CCXT,不支持SOCKS5代理 + 特点: + - 使用 CCXT + OKX + - 需要通过 socks2http 将 SOCKS5 转 HTTP 代理 + - 必须指定 timeframe + - 不缓存(每次实时下载) - TODO: 实现加密货币获取 + Args: + code: 加密货币代码(BTC, ETH) + start_date: 开始日期 + end_date: 结束日期 + timeframe: K线周期(1d, 1h, 4h, 15m, 1m) """ - print(f"⚠️ 加密货币数据获取尚未实现: {code}") - return None + # 延迟初始化加密货币数据源 + if self._crypto is None: + # 使用 SSH 隧道的 SOCKS5 端口 + socks_port = self.ssh_config.get('local_port', 1080) + self._crypto = get_crypto_source(socks_port=socks_port) + + return self._crypto.fetch(code, start_date, end_date, timeframe) # ============================================================ # 批量获取