Files
etf/datasource/flask_server.py
aszerW d62763b0bd feat(flask): OHLCV 端点自动附加 ETF 净值和溢价率
flask_server.py:
- 当 asset_type 为 china_etf 时,自动调用 fetch_etf_with_nav
- 响应中添加 nav、premium_series、latest_premium、premium_stats

flask_api_source.py:
- 解析 ETF 数据中的净值和溢价率信息
- 将 nav_df、premium_series、premium_stats 存入 DataFrame.attrs
2026-05-14 00:57:37 +08:00

890 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Flask 数据服务 API
==================
提供 RESTful API 接口,支持获取各类资产的 K 线数据
特性:
- 分层架构:各资产类型独立实现
- LRU + TTL 双缓存机制
- SSH隧道支持港美股
- ETF净值获取计算溢价率
运行:
python datasource/flask_server.py
API 文档:
GET / - 服务信息
GET /health - 健康检查
GET /api/v1/asset-type - 检测资产类型
GET /api/v1/ohlcv - 获取K线数据
POST /api/v1/ohlcv/batch - 批量获取K线数据
GET /api/v1/etf/nav - 获取ETF净值
POST /api/v1/cache/clear - 清理缓存
GET /api/v1/cache/stats - 缓存统计
"""
import os
import sys
import json
from pathlib import Path
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List, Tuple
from functools import lru_cache
# 添加项目根目录到路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from dotenv import load_dotenv
load_dotenv()
from flask import Flask, request, jsonify
from flask_cors import CORS
from flask_compress import Compress
import pandas as pd
from datasource.universal_fetcher import UniversalDataFetcher
from datasource.asset_type_detector import AssetTypeDetector, AssetType
# ============================================================
# Flask 应用配置
# ============================================================
app = Flask(__name__)
CORS(app) # enable cross-origin requests
Compress(app) # enable gzip response compression
# Process-wide data fetcher instance; created on first use by get_fetcher()
fetcher: Optional[UniversalDataFetcher] = None
# SSH settings; assigned by get_fetcher() or by the __main__ startup code
ssh_config: Optional[Dict] = None
# Cache configuration (both values can be overridden via environment variables)
CACHE_MAXSIZE = int(os.getenv('CACHE_MAXSIZE', '128'))
CACHE_TTL_SECONDS = int(os.getenv('CACHE_TTL_SECONDS', '7200')) # default: 2 hours
# Default start date used when downloading the full history.
# Set to 1980 to cover the longest histories (S&P 500, Nikkei 225, ...).
DEFAULT_START_DATE = os.getenv('DEFAULT_START_DATE', '1980-01-01')
class TimedCacheEntry:
    """Cached payload stamped with the moment it was stored."""

    def __init__(self, data: Any):
        # Record the creation time so is_expired() can measure the entry's age.
        self.timestamp = datetime.now()
        self.data = data

    def is_expired(self) -> bool:
        """Return True once the entry is older than CACHE_TTL_SECONDS."""
        age = datetime.now() - self.timestamp
        return age.total_seconds() > CACHE_TTL_SECONDS
# TTL cache storage: maps a (code, today) key to its timestamped full-history payload
_ttl_cache: Dict[Tuple, TimedCacheEntry] = {}
# ============================================================
# 初始化
# ============================================================
def get_ssh_config() -> Optional[Dict]:
    """Build the SSH tunnel settings from environment variables.

    Returns:
        None unless SSH_ENABLED equals "true" (case-insensitive); otherwise a
        dict with the keys enabled, host, port, username, key_path and
        local_port.
    """
    if os.getenv('SSH_ENABLED', 'false').lower() != 'true':
        return None

    env = os.getenv
    settings: Dict = {"enabled": True}
    settings["host"] = env('SSH_HOST', '')
    settings["port"] = int(env('SSH_PORT', '22'))
    settings["username"] = env('SSH_USERNAME', '')
    settings["key_path"] = env('SSH_KEY_PATH', 'hk_ecs.pem')
    settings["local_port"] = int(env('SSH_LOCAL_PORT', '1080'))
    return settings
def get_fetcher() -> UniversalDataFetcher:
    """Return the shared UniversalDataFetcher, creating it on the first call."""
    global fetcher, ssh_config
    if fetcher is not None:
        return fetcher
    # First call: read the SSH settings and build the shared instance.
    ssh_config = get_ssh_config()
    fetcher = UniversalDataFetcher(ssh_config=ssh_config)
    return fetcher
# ============================================================
# 缓存机制
# ============================================================
@lru_cache(maxsize=CACHE_MAXSIZE)
def _download_full_data(code: str, today: str) -> Optional[str]:
    """Download the full daily history for ``code`` and serialize it to JSON.

    Results are memoized by lru_cache under the key (code, today). Exceptions
    propagate to the caller; lru_cache does not store a result for a call that
    raises, so a failed download is retried on the next request.

    Args:
        code: Instrument code.
        today: Today's date (YYYY-MM-DD); part of the cache key so that the
            cache rolls over once a day, and also the end of the download range.

    Returns:
        JSON string with the full history, or None for crypto assets (never
        cached as data) and for empty downloads.
    """
    f = get_fetcher()
    asset_type = AssetTypeDetector.detect(code)
    # Crypto is always fetched live by the caller, so there is nothing to cache.
    if asset_type == AssetType.CRYPTO:
        return None
    with f:
        # Download from the default start date up to today.
        df = f.fetch(code, DEFAULT_START_DATE, today)
    if df is None or len(df) == 0:
        return None
    # Keep the serialized frame so callers can slice any date range later.
    result = {
        'df_json': dataframe_to_json(df),
        'code': code,
        'asset_type': asset_type.value,
        'data_start': df.index.min().strftime('%Y-%m-%d'),
        'data_end': df.index.max().strftime('%Y-%m-%d'),
        'cache_strategy': 'full_history',
    }
    return json.dumps(result)


def _fetch_full_data_cached(code: str, today: str) -> Optional[str]:
    """Return the cached full-history JSON for ``code`` (daily data only).

    Cache strategy:
    - Daily data (stocks / indices / ETFs / futures): DEFAULT_START_DATE to today
    - Crypto: not cached, the function returns None

    Failures are reported as a JSON string of the form {"error": "..."} but,
    unlike successful results, they are NOT memoized, so a transient error does
    not stick for the rest of the day.

    Args:
        code: Instrument code.
        today: Today's date (YYYY-MM-DD), used as part of the cache key.

    Returns:
        JSON string with the full history, an error JSON string, or None.
    """
    try:
        return _download_full_data(code, today)
    except Exception as e:
        return json.dumps({"error": str(e)})


# Callers manage the cache through this name, so expose the lru_cache controls on it.
_fetch_full_data_cached.cache_clear = _download_full_data.cache_clear
_fetch_full_data_cached.cache_info = _download_full_data.cache_info
def _slice_data_from_cache(cached_data: Dict, start: str, end: str) -> Dict:
"""
从缓存的全量数据中切片指定日期范围
Args:
cached_data: 缓存的全量数据
start: 用户请求的开始日期
end: 用户请求的结束日期
Returns:
切片后的数据JSON格式
"""
if 'df_json' not in cached_data or 'data' not in cached_data['df_json']:
return cached_data
# 从缓存数据中重建 DataFrame
records = cached_data['df_json']['data']
info_data = cached_data['df_json'].get('info', None) # 从缓存获取 info
if not records:
result = {
'data': [],
'count': 0,
'code': cached_data['code'],
'asset_type': cached_data['asset_type'],
'requested_range': {'start': start, 'end': end},
'available_range': {'start': cached_data['data_start'], 'end': cached_data['data_end']},
}
# 保留 info如果有
if info_data:
result['info'] = info_data
return result
# 转换为 DataFrame
df = pd.DataFrame(records)
if 'date' in df.columns:
df['date'] = pd.to_datetime(df['date'])
df = df.set_index('date')
# 恢复 attrs如果有 info
if info_data:
df.attrs['info'] = info_data
# 切片日期范围
start_dt = pd.to_datetime(start)
end_dt = pd.to_datetime(end)
# 确保索引已排序
df = df.sort_index()
# 切片(使用 loc 进行日期范围选择)
sliced_df = df.loc[start_dt:end_dt]
# 转换为 JSON 格式dataframe_to_json 会处理 df.attrs['info']
result = dataframe_to_json(sliced_df)
result['code'] = cached_data['code']
result['asset_type'] = cached_data['asset_type']
result['requested_range'] = {'start': start, 'end': end}
result['available_range'] = {'start': cached_data['data_start'], 'end': cached_data['data_end']}
return result
def fetch_data_with_ttl(
    code: str,
    start: str,
    end: str,
    nocache: bool = False,
    timeframe: str = '1d'
) -> Tuple[Optional[Dict], bool]:
    """Fetch data for ``code``, served from the TTL cache when possible.

    Cache strategy:
    - Daily data (stocks / indices / ETFs / futures): key = (code, today); the
      full history is cached and the requested window is sliced out of it.
    - Crypto: downloaded live on every call, never cached; uses ``timeframe``.

    Args:
        code: Instrument code.
        start: First date requested by the user.
        end: Last date requested by the user.
        nocache: When True, drop all cached data and download again.
        timeframe: K-line interval (only used for crypto).

    Returns:
        (data, is_cached): the payload (None when no data is available, or a
        dict containing "error" on failure) and whether it came from the TTL
        cache.
    """
    # Today's actual date is part of the cache key.
    today = datetime.now().strftime('%Y-%m-%d')
    asset_type = AssetTypeDetector.detect(code)

    # Crypto: download directly, never cached.
    if asset_type == AssetType.CRYPTO:
        f = get_fetcher()
        try:
            with f:
                df = f.fetch(code, start, end, timeframe=timeframe)
            if df is None or len(df) == 0:
                return None, False
            result = dataframe_to_json(df)
            result['code'] = code
            result['asset_type'] = asset_type.value
            result['cache_strategy'] = 'no_cache_crypto'
            result['requested_range'] = {'start': start, 'end': end}
            result['timeframe'] = timeframe
            return result, False
        except Exception as e:
            return {'error': str(e), 'code': code, 'asset_type': asset_type.value}, False

    # Daily data: use the caches.
    full_cache_key = (code, today)

    if nocache:
        # Forced refresh: drop everything, then fall through to a fresh download.
        _fetch_full_data_cached.cache_clear()
        _ttl_cache.clear()
    else:
        entry = _ttl_cache.get(full_cache_key)
        if entry is not None:
            if not entry.is_expired():
                return _slice_data_from_cache(entry.data, start, end), True
            # Expired: the LRU layer still holds the same stale payload under
            # this key, so it has to be cleared too or the "refresh" below
            # would just return the old data with a new timestamp.
            del _ttl_cache[full_cache_key]
            _fetch_full_data_cached.cache_clear()

    result_json = _fetch_full_data_cached(code, today)
    if result_json is None:
        return None, False
    full_data = json.loads(result_json)

    # Errors are returned as-is and never stored in the TTL cache.
    if "error" in full_data:
        return full_data, False

    # Evict expired entries (including keys from previous days, which are
    # never looked up again) so the store cannot grow without bound.
    for stale_key in [key for key, cached in _ttl_cache.items() if cached.is_expired()]:
        del _ttl_cache[stale_key]

    # Store the full history, then slice out the window the user asked for.
    _ttl_cache[full_cache_key] = TimedCacheEntry(full_data)
    return _slice_data_from_cache(full_data, start, end), False
def clear_cache():
    """Empty both cache layers: the TTL store and the LRU cache."""
    _ttl_cache.clear()
    _fetch_full_data_cached.cache_clear()
def get_cache_info() -> Dict:
    """Collect statistics and settings for both cache layers."""
    stats = _fetch_full_data_cached.cache_info()
    lru_fields = ("hits", "misses", "maxsize", "currsize")
    lru_section = {name: getattr(stats, name) for name in lru_fields}
    return {
        "lru_cache": lru_section,
        "ttl_cache_size": len(_ttl_cache),
        "ttl_seconds": CACHE_TTL_SECONDS,
        "default_start_date": DEFAULT_START_DATE,
        "cache_strategy": "full_data_by_code_and_today",
    }
# ============================================================
# DataFrame 转换
# ============================================================
class JSONEncoder(json.JSONEncoder):
    """JSON encoder that handles dates, numpy scalars and non-finite floats.

    ``default()`` is only invoked for objects the json module cannot serialize
    natively, so plain float NaN/Infinity values never reach it. They are
    therefore replaced with None in ``iterencode()`` before encoding starts.
    """

    _NON_FINITE = (float('inf'), float('-inf'))

    @classmethod
    def _sanitize(cls, obj):
        """Return ``obj`` with every NaN/Infinity float replaced by None."""
        if isinstance(obj, float):
            # NaN is the only float that is not equal to itself.
            if obj != obj or obj in cls._NON_FINITE:
                return None
            return obj
        if isinstance(obj, dict):
            return {key: cls._sanitize(value) for key, value in obj.items()}
        if isinstance(obj, (list, tuple)):
            return [cls._sanitize(item) for item in obj]
        return obj

    def default(self, obj):
        # Date-like objects (datetime, date, pandas Timestamp, ...)
        if hasattr(obj, 'isoformat'):
            return obj.isoformat()
        # numpy-style scalars: unwrap, then clean the resulting Python value
        if hasattr(obj, 'item'):
            return self._sanitize(obj.item())
        return super().default(obj)

    def iterencode(self, o, _one_shot=False):
        # encode() delegates here, so json.dumps(..., cls=JSONEncoder) is covered.
        return super().iterencode(self._sanitize(o), _one_shot)
def _format_index_bound(value) -> Optional[str]:
    """Format an index bound as YYYY-MM-DD, or None when it is not a valid date."""
    try:
        return value.strftime('%Y-%m-%d')
    except (AttributeError, ValueError):
        # Non-datetime index (e.g. RangeIndex) or NaT.
        return None


def dataframe_to_json(df: pd.DataFrame) -> Dict:
    """Convert a DataFrame into a JSON-serializable dict.

    If ``df.attrs`` has an 'info' entry it is copied to the top level of the
    result. NaN and +/-Infinity in float columns are emitted as None so the
    output is valid JSON.

    Args:
        df: Frame to convert; None or an empty frame yields an empty result.

    Returns:
        Dict with 'data' (list of records) and 'count'; for non-empty frames
        also 'columns' and 'date_range', plus 'info' when available.
        'date_range' bounds are None when the index is not date-like.
    """
    if df is None or len(df) == 0:
        result = {"data": [], "count": 0}
        # Return info even when there are no rows.
        if hasattr(df, 'attrs') and 'info' in df.attrs:
            result['info'] = df.attrs['info']
        return result

    # Turn the index into a regular column.
    df_reset = df.reset_index()

    # Normalize the first recognised date column to 'date' as YYYY-MM-DD text.
    for col in ('date', 'Date', 'index', 'trade_date', 'datetime'):
        if col not in df_reset.columns:
            continue
        try:
            df_reset[col] = pd.to_datetime(df_reset[col]).dt.strftime('%Y-%m-%d')
            if col != 'date':
                df_reset = df_reset.rename(columns={col: 'date'})
            break
        except Exception:
            # Column is not date-like; try the next candidate.
            continue

    # Replace NaN / Infinity with None. The column is rebuilt as object dtype
    # because a float column cannot hold None (pandas coerces it back to NaN).
    df_clean = df_reset.copy()
    non_finite = (float('inf'), float('-inf'))
    for col in df_clean.columns:
        if df_clean[col].dtype in ['float64', 'float32']:
            df_clean[col] = pd.Series(
                [None if (pd.isna(v) or v in non_finite) else v for v in df_clean[col]],
                index=df_clean.index,
                dtype=object,
            )

    records = df_clean.to_dict(orient='records')

    result = {
        "data": records,
        "count": len(records),
        "columns": list(df_clean.columns),
        "date_range": {
            "start": _format_index_bound(df.index.min()),
            "end": _format_index_bound(df.index.max()),
        }
    }
    # Copy info from df.attrs to the top level.
    if hasattr(df, 'attrs') and 'info' in df.attrs:
        result['info'] = df.attrs['info']
    return result
def validate_date(date_str: str) -> bool:
    """Return True when ``date_str`` parses with the format %Y-%m-%d."""
    try:
        datetime.strptime(date_str, '%Y-%m-%d')
    except ValueError:
        return False
    else:
        return True
def get_default_dates() -> Tuple[str, str]:
    """Return the default (start, end) range: the 90 days ending now."""
    fmt = '%Y-%m-%d'
    now = datetime.now()
    ninety_days_ago = now - timedelta(days=90)
    return ninety_days_ago.strftime(fmt), now.strftime(fmt)
# ============================================================
# API 路由
# ============================================================
@app.route('/')
def index():
    """Landing page: describe the service, its endpoints and current settings."""
    features = [
        "分层架构(各资产类型独立实现)",
        "LRU + TTL 双缓存机制",
        "SSH隧道支持港美股",
        "ETF净值获取计算溢价率",
    ]
    endpoints = {
        "info": "/",
        "health": "/health",
        "asset_type": "/api/v1/asset-type?code={code}",
        "ohlcv": "/api/v1/ohlcv?code={code}&start={YYYY-MM-DD}&end={YYYY-MM-DD}&asset_type={type}",
        "ohlcv_nocache": "/api/v1/ohlcv?code={code}&nocache=true",
        "ohlcv_crypto": "/api/v1/ohlcv?code=BTC&timeframe=1d (加密货币必须指定 timeframe)",
        "ohlcv_asset_type": "/api/v1/ohlcv?code={code}&asset_type=china_index (强制覆盖类型)",
        "batch": "POST /api/v1/ohlcv/batch",
        "etf_nav": "/api/v1/etf/nav?code={code}",
        "cache_clear": "POST /api/v1/cache/clear",
        "cache_stats": "/api/v1/cache/stats",
    }
    crypto_timeframes = {
        "1d": "日线",
        "1h": "小时线",
        "4h": "4小时线",
        "15m": "15分钟线",
        "1m": "分钟线",
    }
    asset_types = {
        "china_index": "中国指数 (000300.SH, 399006.SZ等)",
        "china_etf": "中国ETF (159915.SZ, 513100.SH等)",
        "us_index": "美股指数 (NDX, SPX, N225等)",
        "hk_index": "港股指数 (HSI, HSTECH.HK等)",
        "futures": "期货 (AU.SHF, CU.SHF等)",
        "crypto": "加密货币 (BTC, ETH - 不缓存)",
    }
    supported_assets = {
        "china_index": ["000300.SH", "399006.SZ", "H30269.CSI"],
        "china_etf": ["159915.SZ", "513100.SH", "518880.SH"],
        "hk_index": ["HSI", "HSTECH.HK"],
        "us_index": ["NDX", "SPX", "N225", "GDAXI"],
        "futures": ["AU.SHF", "CU.SHF", "CL.NYM"],
        "crypto": ["BTC", "ETH"],
    }
    # Key order below mirrors the order in which the fields are documented.
    payload = {
        "name": "Universal Data Fetcher API",
        "version": "2.0.0",
        "description": "统一数据获取服务(分层架构)",
        "architecture": "Unified entry + Asset-specific methods",
        "features": features,
        "endpoints": endpoints,
        "crypto_timeframes": crypto_timeframes,
        "asset_types": asset_types,
        "supported_assets": supported_assets,
        "cache_config": get_cache_info(),
        "ssh_status": "enabled" if ssh_config and ssh_config.get('enabled') else "disabled",
    }
    return jsonify(payload)
@app.route('/health')
def health():
    """Health check: report status, server time and whether SSH is configured."""
    now_iso = datetime.now().isoformat()
    ssh_ready = ssh_config is not None and ssh_config.get('enabled', False)
    return jsonify({
        "status": "healthy",
        "timestamp": now_iso,
        "ssh_configured": ssh_ready,
    })
@app.route('/api/v1/asset-type')
def detect_asset_type():
    """Detect and describe the asset type of the ``code`` query parameter."""
    code = request.args.get('code', '').strip()
    if code:
        kind = AssetTypeDetector.detect(code)
        summary = AssetTypeDetector.get_description(kind)
        return jsonify({
            "code": code,
            "asset_type": kind.value,
            "description": summary,
        })
    # No code supplied: explain what is expected.
    missing = {
        "error": "Missing required parameter: code",
        "example": "/api/v1/asset-type?code=000300.SH"
    }
    return jsonify(missing), 400
def _ohlcv_json_number(value, digits=6):
    """Round ``value`` for JSON output; NaN, Infinity and non-numbers become None."""
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    if number != number or number in (float('inf'), float('-inf')):
        return None
    return round(number, digits)


def _ohlcv_premium_fields(premium_series):
    """Build the premium-rate response fields from a non-empty date-indexed series."""
    return {
        'premium_series': [
            {"date": date.strftime('%Y-%m-%d'), "premium": _ohlcv_json_number(premium)}
            for date, premium in premium_series.items()
        ],
        # Most recent premium and the date it belongs to
        'latest_premium': _ohlcv_json_number(premium_series.iloc[-1]),
        'premium_date': premium_series.index[-1].strftime('%Y-%m-%d'),
        # Summary statistics (std is NaN for a single point, hence the cleaning)
        'premium_stats': {
            "mean": _ohlcv_json_number(premium_series.mean()),
            "std": _ohlcv_json_number(premium_series.std()),
            "min": _ohlcv_json_number(premium_series.min()),
            "max": _ohlcv_json_number(premium_series.max()),
            "median": _ohlcv_json_number(premium_series.median()),
        },
    }


@app.route('/api/v1/ohlcv')
def get_ohlcv():
    """
    Fetch OHLCV data for a single instrument.

    Query Parameters:
        code: instrument code (required)
        start: start date YYYY-MM-DD (optional, defaults to 90 days ago)
        end: end date YYYY-MM-DD (optional, defaults to today)
        asset_type: asset type (optional, overrides the auto-detected type)
            - china_index, china_etf, us_index, hk_index, futures, crypto
        timeframe: K-line interval (optional, crypto only)
            - 1d (default), 1h, 4h, 15m, 1m
        nocache: skip the cache (optional, default false)

    ``start`` and ``end`` are defaulted independently, so supplying only one
    of them keeps the supplied value.

    Returns:
        200 with the data, 400 on invalid parameters, 404 when no data is
        available, 500 when the fetch reported an error.
    """
    code = request.args.get('code', '').strip()
    start = request.args.get('start', '').strip()
    end = request.args.get('end', '').strip()
    asset_type_param = request.args.get('asset_type', '').strip().lower()
    timeframe = request.args.get('timeframe', '1d').strip().lower()
    nocache = request.args.get('nocache', 'false').lower() == 'true'

    # Parameter validation
    if not code:
        return jsonify({
            "error": "Missing required parameter: code",
            "example": "/api/v1/ohlcv?code=000300.SH&start=2024-01-01&end=2024-03-31",
            "asset_type_hint": "可选 asset_type 参数强制指定类型",
        }), 400

    # Fill in only the missing bound(s); a supplied date is never discarded.
    default_start, default_end = get_default_dates()
    start = start or default_start
    end = end or default_end

    if not validate_date(start) or not validate_date(end):
        return jsonify({
            "error": "Invalid date format. Use YYYY-MM-DD",
            "start": start,
            "end": end,
        }), 400

    # Auto-detect the type; a user-specified type takes precedence.
    detected_type = AssetTypeDetector.detect(code)
    final_type = detected_type
    if asset_type_param:
        try:
            final_type = AssetType(asset_type_param)
        except ValueError:
            return jsonify({
                "error": f"Invalid asset_type: {asset_type_param}",
                "valid_types": [t.value for t in AssetType],
            }), 400

    # Crypto requests must carry a valid timeframe (detected or overridden).
    if final_type == AssetType.CRYPTO:
        valid_timeframes = ['1d', '1h', '4h', '15m', '1m', 'daily', 'hourly']
        if timeframe not in valid_timeframes:
            return jsonify({
                "error": f"Invalid timeframe for crypto: {timeframe}",
                "valid_timeframes": valid_timeframes,
                "hint": "加密货币必须指定 timeframe 参数",
            }), 400

    # NOTE(review): fetch_data_with_ttl detects the asset type from the code
    # itself; the override above only affects labelling, timeframe validation
    # and the ETF enrichment below.
    result, is_cached = fetch_data_with_ttl(code, start, end, nocache, timeframe)

    if result is None:
        return jsonify({
            "code": code,
            "asset_type": final_type.value,
            # Only reported when the user overrode the type
            "detected_type": detected_type.value if asset_type_param else None,
            "error": "No data available",
            "start": start,
            "end": end,
        }), 404

    if "error" in result:
        return jsonify({
            "code": code,
            "asset_type": final_type.value,
            "detected_type": detected_type.value if asset_type_param else None,
            "error": result["error"],
        }), 500

    result['cached'] = is_cached
    result['asset_type'] = final_type.value

    # China ETFs: attach NAV and premium-rate data on a best-effort basis.
    if final_type == AssetType.CHINA_ETF:
        try:
            f = get_fetcher()
            with f:
                price_df, nav_df, premium_series = f.fetch_etf_with_nav(code, start, end)
            if nav_df is not None and len(nav_df) > 0:
                result['nav'] = dataframe_to_json(nav_df)
            if premium_series is not None and len(premium_series) > 0:
                result.update(_ohlcv_premium_fields(premium_series))
        except Exception as e:
            # A NAV failure must not prevent the main data from being returned.
            result['nav_error'] = str(e)

    # Tell the caller when the override differs from the detected type.
    if asset_type_param and detected_type != final_type:
        result['type_override'] = {
            "detected": detected_type.value,
            "specified": final_type.value,
            "hint": "用户强制覆盖了自动检测结果",
        }
    return jsonify(result)
@app.route('/api/v1/ohlcv/batch', methods=['POST'])
def batch_ohlcv():
    """Fetch OHLCV data for several instruments in one request.

    JSON body:
        codes: list of instrument codes (required, strings)
        start: start date YYYY-MM-DD (optional, defaults to 90 days ago)
        end: end date YYYY-MM-DD (optional, defaults to today)

    Returns:
        200 with per-code results and success/failure counts, 400 on an
        invalid body, 500 when the batch as a whole fails.
    """
    data = request.get_json()
    # The body must be a JSON object; a list or scalar has no .get().
    if not data or not isinstance(data, dict):
        return jsonify({
            "error": "Missing request body",
            "example": {
                "codes": ["000300.SH", "NDX"],
                "start": "2024-01-01",
                "end": "2024-03-31",
            }
        }), 400

    codes = data.get('codes', [])
    if not codes or not isinstance(codes, list):
        return jsonify({
            "error": "Missing or invalid parameter: codes (must be a list)"
        }), 400
    # Codes are used as dict keys and passed to the detector, so require strings.
    if not all(isinstance(code, str) and code.strip() for code in codes):
        return jsonify({
            "error": "Invalid parameter: codes must contain only non-empty strings"
        }), 400

    start = data.get('start') or ''
    end = data.get('end') or ''
    # Non-string dates (e.g. numbers) are a client error, not a server crash.
    if not isinstance(start, str) or not isinstance(end, str):
        return jsonify({
            "error": "Invalid date format. Use YYYY-MM-DD",
        }), 400

    # Fill in only the missing bound(s); a supplied date is never discarded.
    default_start, default_end = get_default_dates()
    start = start.strip() or default_start
    end = end.strip() or default_end

    if not validate_date(start) or not validate_date(end):
        return jsonify({
            "error": "Invalid date format. Use YYYY-MM-DD",
        }), 400

    f = get_fetcher()
    results = {}
    success_count = 0
    failed_count = 0
    try:
        with f:
            for code in codes:
                result, _ = fetch_data_with_ttl(code, start, end)
                if result is not None and "error" not in result:
                    results[code] = result
                    success_count += 1
                else:
                    # Report the failure for this code and keep going.
                    results[code] = {
                        "code": code,
                        "asset_type": AssetTypeDetector.detect(code).value,
                        "error": result.get("error", "No data") if result else "No data",
                        "data": [],
                        "count": 0,
                    }
                    failed_count += 1
    except Exception as e:
        return jsonify({"error": f"Batch fetch failed: {str(e)}"}), 500

    return jsonify({
        "results": results,
        "success_count": success_count,
        "failed_count": failed_count,
        "total": len(codes),
        "start": start,
        "end": end,
    })
def _nav_json_number(value, digits=6):
    """Round ``value`` for JSON output; NaN, Infinity and non-numbers become None."""
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    if number != number or number in (float('inf'), float('-inf')):
        return None
    return round(number, digits)


@app.route('/api/v1/etf/nav')
def get_etf_nav():
    """Return ETF price, NAV and premium-rate data.

    Query Parameters:
        code: ETF code (required)
        start: start date YYYY-MM-DD (optional, defaults to 90 days ago)
        end: end date YYYY-MM-DD (optional, defaults to today)

    Returns:
        200 with price / nav and, when available, the premium series, latest
        premium and premium statistics; 400 on invalid parameters or a
        non-ETF code; 500 when the fetch fails.
    """
    code = request.args.get('code', '').strip()
    start = request.args.get('start', '').strip()
    end = request.args.get('end', '').strip()
    if not code:
        return jsonify({
            "error": "Missing required parameter: code",
            "example": "/api/v1/etf/nav?code=513100.SH"
        }), 400

    # Fill in only the missing bound(s); a supplied date is never discarded.
    default_start, default_end = get_default_dates()
    start = start or default_start
    end = end or default_end

    # Same date validation as the OHLCV endpoints.
    if not validate_date(start) or not validate_date(end):
        return jsonify({
            "error": "Invalid date format. Use YYYY-MM-DD",
            "start": start,
            "end": end,
        }), 400

    # Only China ETFs are supported here.
    asset_type = AssetTypeDetector.detect(code)
    if asset_type != AssetType.CHINA_ETF:
        return jsonify({
            "error": f"Not an ETF: {code} (type: {asset_type.value})",
            "hint": "Only A股ETF (codes starting with 51/52/15/16) supported",
        }), 400

    f = get_fetcher()
    try:
        with f:
            price_df, nav_df, premium_series = f.fetch_etf_with_nav(code, start, end)

        empty = {"data": [], "count": 0}
        # Compare against None explicitly: the truth value of a DataFrame is
        # ambiguous and raises ValueError.
        result = {
            "code": code,
            "price": dataframe_to_json(price_df) if price_df is not None else dict(empty),
            "nav": dataframe_to_json(nav_df) if nav_df is not None else dict(empty),
        }

        if premium_series is not None and len(premium_series) > 0:
            # Historical premium rate as a list of {date, premium} items
            result['premium_series'] = [
                {"date": date.strftime('%Y-%m-%d'), "premium": _nav_json_number(premium)}
                for date, premium in premium_series.items()
            ]
            # Most recent premium and the date it belongs to
            result['latest_premium'] = _nav_json_number(premium_series.iloc[-1])
            result['premium_date'] = premium_series.index[-1].strftime('%Y-%m-%d')
            # Summary statistics (std is NaN for a single point, hence the cleaning)
            result['premium_stats'] = {
                "mean": _nav_json_number(premium_series.mean()),
                "std": _nav_json_number(premium_series.std()),
                "min": _nav_json_number(premium_series.min()),
                "max": _nav_json_number(premium_series.max()),
                "median": _nav_json_number(premium_series.median()),
            }
        return jsonify(result)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/api/v1/cache/clear', methods=['POST'])
def clear_cache_endpoint():
    """Clear every cache and report the statistics before and after."""
    snapshot = get_cache_info()
    clear_cache()
    payload = {
        "message": "Cache cleared successfully",
        "before": snapshot,
    }
    payload["after"] = get_cache_info()
    return jsonify(payload)
@app.route('/api/v1/cache/stats')
def cache_stats():
    """Return the current cache statistics as JSON."""
    stats = get_cache_info()
    return jsonify(stats)
# ============================================================
# 错误处理
# ============================================================
@app.errorhandler(404)
def not_found(error):
    """JSON 404 handler that lists the endpoints the service exposes."""
    known_endpoints = [
        "/", "/health",
        "/api/v1/asset-type",
        "/api/v1/ohlcv",
        "/api/v1/ohlcv/batch",
        "/api/v1/etf/nav",
        "/api/v1/cache/clear",
        "/api/v1/cache/stats",
    ]
    body = jsonify({
        "error": "Endpoint not found",
        "available_endpoints": known_endpoints,
    })
    return body, 404
@app.errorhandler(500)
def internal_error(error):
    """JSON 500 handler that echoes the error text in the response."""
    details = str(error)
    body = jsonify({
        "error": "Internal server error",
        "message": details,
    })
    return body, 500
# ============================================================
# 启动服务
# ============================================================
if __name__ == '__main__':
    import argparse

    # Command-line options for the development server.
    cli = argparse.ArgumentParser(description='Universal Data Fetcher API Server')
    cli.add_argument('--host', default='0.0.0.0', help='Host to bind')
    cli.add_argument('--port', type=int, default=80, help='Port to bind')
    cli.add_argument('--debug', action='store_true', help='Enable debug mode')
    args = cli.parse_args()

    # Load the SSH settings up front so the banner can report them.
    ssh_config = get_ssh_config()
    if ssh_config and ssh_config.get('enabled'):
        ssh_banner = f"✓ SSH 隧道已配置: {ssh_config['host']}:{ssh_config['port']}"
    else:
        ssh_banner = "✗ SSH 隧道未启用仅支持A股数据"
    print(ssh_banner)

    # Startup banner, one print per line.
    banner_lines = (
        "\n🚀 Universal Data Fetcher API Server v2.0",
        f" Host: {args.host}",
        f" Port: {args.port}",
        f" Cache: LRU({CACHE_MAXSIZE}) + TTL({CACHE_TTL_SECONDS}s)",
        f"\n📖 API: http://{args.host}:{args.port}/",
        f" 健康检查: http://{args.host}:{args.port}/health",
    )
    for banner_line in banner_lines:
        print(banner_line)

    app.run(host=args.host, port=args.port, debug=args.debug)