""" Flask 数据服务 API ================== 提供 RESTful API 接口,支持获取各类资产的 K 线数据 特性: - 分层架构:各资产类型独立实现 - LRU + TTL 双缓存机制 - SSH隧道支持(港美股) - ETF净值获取(计算溢价率) 运行: python datasource/flask_server.py API 文档: GET / - 服务信息 GET /health - 健康检查 GET /api/v1/asset-type - 检测资产类型 GET /api/v1/ohlcv - 获取K线数据 POST /api/v1/ohlcv/batch - 批量获取K线数据 GET /api/v1/etf/nav - 获取ETF净值 POST /api/v1/cache/clear - 清理缓存 GET /api/v1/cache/stats - 缓存统计 """ import os import sys import json from pathlib import Path from datetime import datetime, timedelta from typing import Optional, Dict, Any, List, Tuple from functools import lru_cache # 添加项目根目录到路径 project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) from dotenv import load_dotenv load_dotenv() from flask import Flask, request, jsonify from flask_cors import CORS from flask_compress import Compress import pandas as pd from datasource.universal_fetcher import UniversalDataFetcher from datasource.asset_type_detector import AssetTypeDetector, AssetType # ============================================================ # Flask 应用配置 # ============================================================ app = Flask(__name__) CORS(app) # 启用跨域支持 Compress(app) # 启用 gzip 压缩 # 全局数据获取器实例 fetcher: Optional[UniversalDataFetcher] = None ssh_config: Optional[Dict] = None # 缓存配置 CACHE_MAXSIZE = int(os.getenv('CACHE_MAXSIZE', '128')) CACHE_TTL_SECONDS = int(os.getenv('CACHE_TTL_SECONDS', '7200')) # 默认2小时 # 默认数据起点(下载全量数据时使用) # 设置为1980年以支持最长历史数据(标普500/日经225等) DEFAULT_START_DATE = os.getenv('DEFAULT_START_DATE', '1980-01-01') class TimedCacheEntry: """带时间戳的缓存条目""" def __init__(self, data: Any): self.data = data self.timestamp = datetime.now() def is_expired(self) -> bool: return (datetime.now() - self.timestamp).total_seconds() > CACHE_TTL_SECONDS # TTL缓存存储 _ttl_cache: Dict[Tuple, TimedCacheEntry] = {} # ============================================================ # 初始化 # ============================================================ def get_ssh_config() -> Optional[Dict]: """从环境变量获取 SSH 配置""" enabled = os.getenv('SSH_ENABLED', 'false').lower() == 'true' if not enabled: return None return { "enabled": True, "host": os.getenv('SSH_HOST', ''), "port": int(os.getenv('SSH_PORT', '22')), "username": os.getenv('SSH_USERNAME', ''), "key_path": os.getenv('SSH_KEY_PATH', 'hk_ecs.pem'), "local_port": int(os.getenv('SSH_LOCAL_PORT', '1080')), } def get_fetcher() -> UniversalDataFetcher: """获取或创建数据获取器实例""" global fetcher, ssh_config if fetcher is None: ssh_config = get_ssh_config() fetcher = UniversalDataFetcher(ssh_config=ssh_config) return fetcher # ============================================================ # 缓存机制 # ============================================================ @lru_cache(maxsize=CACHE_MAXSIZE) def _fetch_full_data_cached(code: str, today: str) -> Optional[str]: """ 缓存全量数据(仅日级别数据) 缓存策略: - 日级别数据(股票/指数/ETF/期货): 从 DEFAULT_START_DATE 到 today - 加密货币: 不缓存,每次实时下载 缓存Key: (code, today_date) - today: 实际的今天日期,用于每日更新缓存 Returns: JSON 序列化的全量数据(仅日级别数据) """ f = get_fetcher() # 检查资产类型 asset_type = AssetTypeDetector.detect(code) # 加密货币不缓存 if asset_type == AssetType.CRYPTO: return None # 不缓存加密货币 try: with f: # 下载数据:从默认起点到今天 df = f.fetch(code, DEFAULT_START_DATE, today) if df is None or len(df) == 0: return None # 保存为 DataFrame 格式(方便后续切片) result = { 'df_json': dataframe_to_json(df), 'code': code, 'asset_type': asset_type.value, 'data_start': df.index.min().strftime('%Y-%m-%d') if len(df) > 0 else None, 'data_end': df.index.max().strftime('%Y-%m-%d') if len(df) > 0 else None, 'cache_strategy': 'full_history', } return json.dumps(result) except Exception as e: return json.dumps({"error": str(e)}) def _slice_data_from_cache(cached_data: Dict, start: str, end: str) -> Dict: """ 从缓存的全量数据中切片指定日期范围 Args: cached_data: 缓存的全量数据 start: 用户请求的开始日期 end: 用户请求的结束日期 Returns: 切片后的数据(JSON格式) """ if 'df_json' not in cached_data or 'data' not in cached_data['df_json']: return cached_data # 从缓存数据中重建 DataFrame records = cached_data['df_json']['data'] info_data = cached_data['df_json'].get('info', None) # 从缓存获取 info if not records: result = { 'data': [], 'count': 0, 'code': cached_data['code'], 'asset_type': cached_data['asset_type'], 'requested_range': {'start': start, 'end': end}, 'available_range': {'start': cached_data['data_start'], 'end': cached_data['data_end']}, } # 保留 info(如果有) if info_data: result['info'] = info_data return result # 转换为 DataFrame df = pd.DataFrame(records) if 'date' in df.columns: df['date'] = pd.to_datetime(df['date']) df = df.set_index('date') # 恢复 attrs(如果有 info) if info_data: df.attrs['info'] = info_data # 切片日期范围 start_dt = pd.to_datetime(start) end_dt = pd.to_datetime(end) # 确保索引已排序 df = df.sort_index() # 切片(使用 loc 进行日期范围选择) sliced_df = df.loc[start_dt:end_dt] # 转换为 JSON 格式(dataframe_to_json 会处理 df.attrs['info']) result = dataframe_to_json(sliced_df) result['code'] = cached_data['code'] result['asset_type'] = cached_data['asset_type'] result['requested_range'] = {'start': start, 'end': end} result['available_range'] = {'start': cached_data['data_start'], 'end': cached_data['data_end']} return result def fetch_data_with_ttl( code: str, start: str, end: str, nocache: bool = False, timeframe: str = '1d' ) -> Tuple[Optional[Dict], bool]: """ 获取数据,支持 TTL 缓存(加密货币不缓存) 缓存策略: - 日级别数据(股票/指数/ETF/期货): Key=(code, today), 缓存全量数据,切片返回 - 加密货币: 每次实时下载,不缓存,必须指定 timeframe Args: code: 标的代码 start: 用户请求的开始日期 end: 用户请求的结束日期 nocache: 是否跳过缓存 timeframe: K线周期(仅加密货币需要) Returns: (data, is_cached): 数据和是否命中缓存 """ # 获取今天的实际日期(用于缓存Key) today = datetime.now().strftime('%Y-%m-%d') # 检查资产类型 asset_type = AssetTypeDetector.detect(code) # 加密货币:直接下载,不缓存,必须指定 timeframe if asset_type == AssetType.CRYPTO: f = get_fetcher() try: with f: df = f.fetch(code, start, end, timeframe=timeframe) if df is None or len(df) == 0: return None, False result = dataframe_to_json(df, asset_type.value) result['code'] = code result['asset_type'] = asset_type.value result['cache_strategy'] = 'no_cache_crypto' result['requested_range'] = {'start': start, 'end': end} result['timeframe'] = timeframe return result, False except Exception as e: return {'error': str(e), 'code': code, 'asset_type': asset_type.value}, False # 日级别数据:使用缓存 full_cache_key = (code, today) # 跳过缓存:清理缓存后重新下载 if nocache: _fetch_full_data_cached.cache_clear() global _ttl_cache _ttl_cache.clear() result_json = _fetch_full_data_cached(code, today) if result_json is None: return None, False full_data = json.loads(result_json) return (_slice_data_from_cache(full_data, start, end), False) # 检查 TTL 缓存(全量数据缓存) if full_cache_key in _ttl_cache: entry = _ttl_cache[full_cache_key] if not entry.is_expired(): # 从缓存切片 sliced_data = _slice_data_from_cache(entry.data, start, end) return sliced_data, True # 过期,删除 del _ttl_cache[full_cache_key] # 从 LRU 缓存获取全量数据 result_json = _fetch_full_data_cached(code, today) if result_json is None: return None, False full_data = json.loads(result_json) # 检查是否有错误 if "error" in full_data: return full_data, False # 存入 TTL 缓存(存全量数据) _ttl_cache[full_cache_key] = TimedCacheEntry(full_data) # 从全量数据切片返回用户请求的范围 sliced_data = _slice_data_from_cache(full_data, start, end) return sliced_data, False def clear_cache(): """清理所有缓存""" global _ttl_cache _fetch_full_data_cached.cache_clear() _ttl_cache.clear() def get_cache_info() -> Dict: """获取缓存统计信息""" info = _fetch_full_data_cached.cache_info() return { "lru_cache": { "hits": info.hits, "misses": info.misses, "maxsize": info.maxsize, "currsize": info.currsize, }, "ttl_cache_size": len(_ttl_cache), "ttl_seconds": CACHE_TTL_SECONDS, "default_start_date": DEFAULT_START_DATE, "cache_strategy": "full_data_by_code_and_today", } # ============================================================ # DataFrame 转换 # ============================================================ class JSONEncoder(json.JSONEncoder): """自定义 JSON 编码器,处理特殊类型""" def default(self, obj): # 处理 pandas Timestamp if hasattr(obj, 'isoformat'): return obj.isoformat() # 处理 numpy 类型 if hasattr(obj, 'item'): return obj.item() # 处理 NaN/Infinity if isinstance(obj, float): if obj != obj: # NaN return None if obj == float('inf'): return None if obj == float('-inf'): return None return super().default(obj) def dataframe_to_json(df: pd.DataFrame, asset_type: Optional[str] = None) -> Dict: """将 DataFrame 转换为 JSON 可序列化的字典 Args: df: DataFrame 数据 asset_type: 资产类型,用于决定日期格式精度 - crypto: 使用分钟级格式 '%Y-%m-%d %H:%M:%S' - 其他: 使用天级格式 '%Y-%m-%d' 如果 df.attrs 中有 info 字段,会放到最外层返回 """ if df is None or len(df) == 0: result = {"data": [], "count": 0} # 即使空数据也返回 info(如果有) if hasattr(df, 'attrs') and 'info' in df.attrs: result['info'] = df.attrs['info'] return result # 重置索引 df_reset = df.reset_index() # 处理日期列 - 根据资产类型决定格式精度 date_columns = ['date', 'Date', 'index', 'trade_date', 'datetime'] # 加密货币使用分钟级格式,其他使用天级格式 date_format = '%Y-%m-%d %H:%M:%S' if asset_type == 'crypto' else '%Y-%m-%d' for col in date_columns: if col in df_reset.columns: try: df_reset[col] = pd.to_datetime(df_reset[col]).dt.strftime(date_format) if col != 'date': df_reset = df_reset.rename(columns={col: 'date'}) break except Exception: pass # 处理特殊值(NaN, Infinity) df_clean = df_reset.copy() for col in df_clean.columns: if df_clean[col].dtype in ['float64', 'float32']: df_clean[col] = df_clean[col].replace([float('inf'), float('-inf')], None) df_clean[col] = df_clean[col].where(df_clean[col].notna(), None) # 转换为字典列表 records = df_clean.to_dict(orient='records') # 构建返回结果 result = { "data": records, "count": len(records), "columns": list(df_clean.columns), "date_range": { "start": df.index.min().strftime(date_format) if len(df) > 0 else None, "end": df.index.max().strftime(date_format) if len(df) > 0 else None, } } # 将 info 从 df.attrs 放到最外层 if hasattr(df, 'attrs') and 'info' in df.attrs: result['info'] = df.attrs['info'] return result def validate_date(date_str: str) -> bool: """验证日期格式""" try: datetime.strptime(date_str, '%Y-%m-%d') return True except ValueError: return False def get_default_dates() -> Tuple[str, str]: """获取默认日期范围(最近3个月)""" end = datetime.now() start = end - timedelta(days=90) return start.strftime('%Y-%m-%d'), end.strftime('%Y-%m-%d') # ============================================================ # API 路由 # ============================================================ @app.route('/') def index(): """首页 - API 信息""" return jsonify({ "name": "Universal Data Fetcher API", "version": "2.0.0", "description": "统一数据获取服务(分层架构)", "architecture": "Unified entry + Asset-specific methods", "features": [ "分层架构(各资产类型独立实现)", "LRU + TTL 双缓存机制", "SSH隧道支持(港美股)", "ETF净值获取(计算溢价率)", ], "endpoints": { "info": "/", "health": "/health", "asset_type": "/api/v1/asset-type?code={code}", "ohlcv": "/api/v1/ohlcv?code={code}&start={YYYY-MM-DD}&end={YYYY-MM-DD}&asset_type={type}", "ohlcv_nocache": "/api/v1/ohlcv?code={code}&nocache=true", "ohlcv_crypto": "/api/v1/ohlcv?code=BTC&timeframe=1d (加密货币必须指定 timeframe)", "ohlcv_asset_type": "/api/v1/ohlcv?code={code}&asset_type=china_index (强制覆盖类型)", "batch": "POST /api/v1/ohlcv/batch", "etf_nav": "/api/v1/etf/nav?code={code}", "cache_clear": "POST /api/v1/cache/clear", "cache_stats": "/api/v1/cache/stats", }, "crypto_timeframes": { "1d": "日线", "1h": "小时线", "4h": "4小时线", "15m": "15分钟线", "1m": "分钟线", }, "asset_types": { "china_index": "中国指数 (000300.SH, 399006.SZ等)", "china_etf": "中国ETF (159915.SZ, 513100.SH等)", "us_index": "美股指数 (NDX, SPX, N225等)", "hk_index": "港股指数 (HSI, HSTECH.HK等)", "futures": "期货 (AU.SHF, CU.SHF等)", "crypto": "加密货币 (BTC, ETH - 不缓存)", }, "supported_assets": { "china_index": ["000300.SH", "399006.SZ", "H30269.CSI"], "china_etf": ["159915.SZ", "513100.SH", "518880.SH"], "hk_index": ["HSI", "HSTECH.HK"], "us_index": ["NDX", "SPX", "N225", "GDAXI"], "futures": ["AU.SHF", "CU.SHF", "CL.NYM"], "crypto": ["BTC", "ETH"], }, "cache_config": get_cache_info(), "ssh_status": "enabled" if ssh_config and ssh_config.get('enabled') else "disabled", }) @app.route('/health') def health(): """健康检查""" return jsonify({ "status": "healthy", "timestamp": datetime.now().isoformat(), "ssh_configured": ssh_config is not None and ssh_config.get('enabled', False), }) @app.route('/api/v1/asset-type') def detect_asset_type(): """检测资产类型""" code = request.args.get('code', '').strip() if not code: return jsonify({ "error": "Missing required parameter: code", "example": "/api/v1/asset-type?code=000300.SH" }), 400 asset_type = AssetTypeDetector.detect(code) description = AssetTypeDetector.get_description(asset_type) return jsonify({ "code": code, "asset_type": asset_type.value, "description": description, }) @app.route('/api/v1/ohlcv') def get_ohlcv(): """ 获取单只标的的 OHLCV 数据 Query Parameters: code: 标的代码 (required) start: 开始日期 YYYY-MM-DD (optional, 默认90天前) end: 结束日期 YYYY-MM-DD (optional, 默认今天) asset_type: 资产类型 (optional, 强制覆盖自动检测结果) - china_index: 中国指数 - china_etf: 中国ETF - us_index: 美股指数 - hk_index: 港股指数 - futures: 期货 - crypto: 加密货币 注:指定后会覆盖自动检测,用于修复检测逻辑问题 timeframe: K线周期 (optional, 仅加密货币需要) - 1d: 日线(默认) - 1h: 小时线 - 4h: 4小时线 - 15m: 15分钟线 - 1m: 分钟线 nocache: 是否跳过缓存 (optional, 默认false) """ code = request.args.get('code', '').strip() start = request.args.get('start', '').strip() end = request.args.get('end', '').strip() asset_type_param = request.args.get('asset_type', '').strip().lower() timeframe = request.args.get('timeframe', '1d').strip().lower() nocache = request.args.get('nocache', 'false').lower() == 'true' # 参数验证 if not code: return jsonify({ "error": "Missing required parameter: code", "example": "/api/v1/ohlcv?code=000300.SH&start=2024-01-01&end=2024-03-31", "asset_type_hint": "可选 asset_type 参数强制指定类型", }), 400 # 设置默认日期 if not start or not end: start, end = get_default_dates() # 日期格式验证 if not validate_date(start) or not validate_date(end): return jsonify({ "error": "Invalid date format. Use YYYY-MM-DD", "start": start, "end": end, }), 400 # 自动检测资产类型 detected_type = AssetTypeDetector.detect(code) # 最终使用的类型:优先使用用户指定的类型 final_type = detected_type if asset_type_param: try: # 将字符串转换为 AssetType(强制覆盖自动检测结果) final_type = AssetType(asset_type_param) except ValueError: return jsonify({ "error": f"Invalid asset_type: {asset_type_param}", "valid_types": [t.value for t in AssetType], }), 400 # 加密货币必须指定 timeframe(无论自动检测还是手动指定) if final_type == AssetType.CRYPTO: valid_timeframes = ['1d', '1h', '4h', '15m', '1m', 'daily', 'hourly'] if timeframe not in valid_timeframes: return jsonify({ "error": f"Invalid timeframe for crypto: {timeframe}", "valid_timeframes": valid_timeframes, "hint": "加密货币必须指定 timeframe 参数", }), 400 # 使用缓存获取数据(加密货币不缓存) result, is_cached = fetch_data_with_ttl(code, start, end, nocache, timeframe) if result is None: return jsonify({ "code": code, "asset_type": final_type.value, "detected_type": detected_type.value if asset_type_param else None, # 仅当用户指定时显示 "error": "No data available", "start": start, "end": end, }), 404 if "error" in result: return jsonify({ "code": code, "asset_type": final_type.value, "detected_type": detected_type.value if asset_type_param else None, "error": result["error"], }), 500 result['cached'] = is_cached result['asset_type'] = final_type.value # 使用最终类型 # 如果是中国 ETF,自动附加净值和溢价率数据 if final_type == AssetType.CHINA_ETF: try: f = get_fetcher() with f: price_df, nav_df, premium_series = f.fetch_etf_with_nav(code, start, end) # 添加净值数据 if nav_df is not None and len(nav_df) > 0: result['nav'] = dataframe_to_json(nav_df) # 添加溢价率序列 if premium_series is not None and len(premium_series) > 0: premium_data = [ {"date": date.strftime('%Y-%m-%d'), "premium": round(premium, 6)} for date, premium in premium_series.items() ] result['premium_series'] = premium_data # 最新溢价率 latest_premium = premium_series.iloc[-1] latest_date = premium_series.index[-1].strftime('%Y-%m-%d') result['latest_premium'] = round(latest_premium, 6) result['premium_date'] = latest_date # 溢价率统计 result['premium_stats'] = { "mean": round(premium_series.mean(), 6), "std": round(premium_series.std(), 6), "min": round(premium_series.min(), 6), "max": round(premium_series.max(), 6), "median": round(premium_series.median(), 6), } except Exception as e: # 净值获取失败不影响主数据返回 result['nav_error'] = str(e) # 如果用户指定了类型但与自动检测不同,显示提示 if asset_type_param and detected_type != final_type: result['type_override'] = { "detected": detected_type.value, "specified": final_type.value, "hint": "用户强制覆盖了自动检测结果", } return jsonify(result) @app.route('/api/v1/ohlcv/batch', methods=['POST']) def batch_ohlcv(): """批量获取多只标的的 OHLCV 数据""" data = request.get_json() if not data: return jsonify({ "error": "Missing request body", "example": { "codes": ["000300.SH", "NDX"], "start": "2024-01-01", "end": "2024-03-31", } }), 400 codes = data.get('codes', []) start = data.get('start', '').strip() end = data.get('end', '').strip() if not codes or not isinstance(codes, list): return jsonify({ "error": "Missing or invalid parameter: codes (must be a list)" }), 400 if not start or not end: start, end = get_default_dates() if not validate_date(start) or not validate_date(end): return jsonify({ "error": "Invalid date format. Use YYYY-MM-DD", }), 400 # 获取数据 f = get_fetcher() results = {} success_count = 0 failed_count = 0 try: with f: for code in codes: result, _ = fetch_data_with_ttl(code, start, end) if result is not None and "error" not in result: results[code] = result success_count += 1 else: results[code] = { "code": code, "asset_type": AssetTypeDetector.detect(code).value, "error": result.get("error", "No data") if result else "No data", "data": [], "count": 0, } failed_count += 1 except Exception as e: return jsonify({"error": f"Batch fetch failed: {str(e)}"}), 500 return jsonify({ "results": results, "success_count": success_count, "failed_count": failed_count, "total": len(codes), "start": start, "end": end, }) @app.route('/api/v1/etf/nav') def get_etf_nav(): """获取ETF净值数据(用于计算溢价率)""" code = request.args.get('code', '').strip() start = request.args.get('start', '').strip() end = request.args.get('end', '').strip() if not code: return jsonify({ "error": "Missing required parameter: code", "example": "/api/v1/etf/nav?code=513100.SH" }), 400 if not start or not end: start, end = get_default_dates() # 检查是否为ETF asset_type = AssetTypeDetector.detect(code) if asset_type != AssetType.CHINA_ETF: return jsonify({ "error": f"Not an ETF: {code} (type: {asset_type.value})", "hint": "Only A股ETF (codes starting with 51/52/15/16) supported", }), 400 # 获取净值和溢价率 f = get_fetcher() try: with f: price_df, nav_df, premium_series = f.fetch_etf_with_nav(code, start, end) result = { "code": code, "price": dataframe_to_json(price_df) if price_df else {"data": [], "count": 0}, "nav": dataframe_to_json(nav_df) if nav_df else {"data": [], "count": 0}, } # 添加历史溢价率序列 if premium_series is not None and len(premium_series) > 0: # 转换为日期-溢价率列表 premium_data = [ {"date": date.strftime('%Y-%m-%d'), "premium": round(premium, 6)} for date, premium in premium_series.items() ] result['premium_series'] = premium_data # 最新溢价率 latest_premium = premium_series.iloc[-1] latest_date = premium_series.index[-1].strftime('%Y-%m-%d') result['latest_premium'] = round(latest_premium, 6) result['premium_date'] = latest_date # 溢价率统计 result['premium_stats'] = { "mean": round(premium_series.mean(), 6), "std": round(premium_series.std(), 6), "min": round(premium_series.min(), 6), "max": round(premium_series.max(), 6), "median": round(premium_series.median(), 6), } return jsonify(result) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/v1/cache/clear', methods=['POST']) def clear_cache_endpoint(): """清理缓存""" info_before = get_cache_info() clear_cache() return jsonify({ "message": "Cache cleared successfully", "before": info_before, "after": get_cache_info() }) @app.route('/api/v1/cache/stats') def cache_stats(): """获取缓存统计""" return jsonify(get_cache_info()) # ============================================================ # 错误处理 # ============================================================ @app.errorhandler(404) def not_found(error): return jsonify({ "error": "Endpoint not found", "available_endpoints": [ "/", "/health", "/api/v1/asset-type", "/api/v1/ohlcv", "/api/v1/ohlcv/batch", "/api/v1/etf/nav", "/api/v1/cache/clear", "/api/v1/cache/stats", ] }), 404 @app.errorhandler(500) def internal_error(error): return jsonify({ "error": "Internal server error", "message": str(error) }), 500 # ============================================================ # 启动服务 # ============================================================ if __name__ == '__main__': import argparse parser = argparse.ArgumentParser(description='Universal Data Fetcher API Server') parser.add_argument('--host', default='0.0.0.0', help='Host to bind') parser.add_argument('--port', type=int, default=80, help='Port to bind') parser.add_argument('--debug', action='store_true', help='Enable debug mode') args = parser.parse_args() # 预加载 SSH 配置 ssh_config = get_ssh_config() if ssh_config and ssh_config.get('enabled'): print(f"✓ SSH 隧道已配置: {ssh_config['host']}:{ssh_config['port']}") else: print("✗ SSH 隧道未启用(仅支持A股数据)") print(f"\n🚀 Universal Data Fetcher API Server v2.0") print(f" Host: {args.host}") print(f" Port: {args.port}") print(f" Cache: LRU({CACHE_MAXSIZE}) + TTL({CACHE_TTL_SECONDS}s)") print(f"\n📖 API: http://{args.host}:{args.port}/") print(f" 健康检查: http://{args.host}:{args.port}/health") app.run(host=args.host, port=args.port, debug=args.debug)