""" Flask API 客户端示例 ==================== 演示如何调用 Universal Data Fetcher API """ import requests import json from typing import Dict, List, Optional class DataFetcherClient: """ Universal Data Fetcher API 客户端 使用示例: client = DataFetcherClient("http://localhost:5000") # 获取单只标的 data = client.get_ohlcv("000300.SH", "2024-01-01", "2024-03-31") # 批量获取 results = client.batch_ohlcv( ["000300.SH", "NDX", "HSI"], "2024-01-01", "2024-03-31" ) """ def __init__(self, base_url: str = "http://localhost:5000"): self.base_url = base_url.rstrip('/') def health_check(self) -> Dict: """健康检查""" response = requests.get(f"{self.base_url}/health") return response.json() def detect_asset_type(self, code: str) -> Dict: """检测资产类型""" response = requests.get( f"{self.base_url}/api/v1/asset-type", params={"code": code} ) return response.json() def get_ohlcv( self, code: str, start: Optional[str] = None, end: Optional[str] = None, retry: int = 3 ) -> Dict: """ 获取单只标的的 OHLCV 数据 Args: code: 标的代码 start: 开始日期 (YYYY-MM-DD) end: 结束日期 (YYYY-MM-DD) retry: 重试次数 Returns: { "code": "000300.SH", "asset_type": "china_index", "data": [...], "count": 58, "date_range": {...} } """ params = {"code": code, "retry": retry} if start: params["start"] = start if end: params["end"] = end response = requests.get( f"{self.base_url}/api/v1/ohlcv", params=params ) return response.json() def batch_ohlcv( self, codes: List[str], start: Optional[str] = None, end: Optional[str] = None, retry: int = 3 ) -> Dict: """ 批量获取多只标的的 OHLCV 数据 Args: codes: 标的代码列表 start: 开始日期 end: 结束日期 retry: 重试次数 Returns: { "results": {...}, "success_count": 2, "failed_count": 1, "total": 3 } """ payload = { "codes": codes, "retry": retry } if start: payload["start"] = start if end: payload["end"] = end response = requests.post( f"{self.base_url}/api/v1/ohlcv/batch", json=payload ) return response.json() def get_supported_codes(self) -> Dict: """获取支持的代码示例""" response = requests.get(f"{self.base_url}/api/v1/supported-codes") return response.json() # ============================================================ # 使用示例 # ============================================================ def example_basic(): """基础使用示例""" print("\n" + "="*60) print("示例1: 基础使用") print("="*60) client = DataFetcherClient("http://localhost:5000") # 健康检查 print("\n1. 健康检查:") health = client.health_check() print(f" 状态: {health.get('status')}") print(f" SSH配置: {health.get('ssh_configured')}") # 检测资产类型 print("\n2. 检测资产类型:") for code in ["000300.SH", "NDX", "BTC", "AAPL"]: result = client.detect_asset_type(code) print(f" {code:15s} -> {result.get('asset_type'):15s} ({result.get('description')})") def example_single_fetch(): """获取单只标的""" print("\n" + "="*60) print("示例2: 获取单只标的") print("="*60) client = DataFetcherClient("http://localhost:5000") # 获取A股指数 print("\n1. 沪深300指数 (000300.SH):") result = client.get_ohlcv("000300.SH", "2024-01-01", "2024-03-31") if "error" in result: print(f" ✗ 错误: {result['error']}") else: print(f" ✓ 获取成功: {result['count']} 条") print(f" 资产类型: {result['asset_type']}") print(f" 日期范围: {result['date_range']['start']} ~ {result['date_range']['end']}") if result['data']: latest = result['data'][-1] print(f" 最新数据: {latest['date']} 收盘 {latest['close']}") def example_batch_fetch(): """批量获取""" print("\n" + "="*60) print("示例3: 批量获取") print("="*60) client = DataFetcherClient("http://localhost:5000") codes = [ "000300.SH", # A股指数 "510300.SH", # A股ETF "NDX", # 美股指数 "HSI", # 港股指数 ] print(f"\n批量获取 {len(codes)} 只标的...") result = client.batch_ohlcv(codes, "2024-01-01", "2024-03-31") print(f"\n结果统计:") print(f" 成功: {result['success_count']}/{result['total']}") print(f" 失败: {result['failed_count']}/{result['total']}") print(f"\n详细结果:") for code, data in result['results'].items(): if 'error' in data: print(f" ✗ {code:15s} 错误: {data['error']}") else: print(f" ✓ {code:15s} {data['count']:4d} 条 " f"({data['date_range']['start']} ~ {data['date_range']['end']})") def example_cli(): """命令行接口示例""" print("\n" + "="*60) print("示例4: 命令行调用") print("="*60) import subprocess # 使用 curl 调用 API commands = [ ("健康检查", "curl -s http://localhost:5000/health | python -m json.tool"), ("检测资产类型", "curl -s 'http://localhost:5000/api/v1/asset-type?code=000300.SH' | python -m json.tool"), ("获取K线数据", "curl -s 'http://localhost:5000/api/v1/ohlcv?code=000300.SH&start=2024-01-01&end=2024-01-31' | python -m json.tool"), ] for name, cmd in commands: print(f"\n{name}:") print(f" 命令: {cmd}") print(" 输出:") try: result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=10) if result.returncode == 0: print(result.stdout[:500] + "..." if len(result.stdout) > 500 else result.stdout) else: print(f" 错误: {result.stderr}") except Exception as e: print(f" 执行失败: {e}") def example_dataframe(): """转换为 DataFrame""" print("\n" + "="*60) print("示例5: 转换为 DataFrame") print("="*60) import pandas as pd client = DataFetcherClient("http://localhost:5000") # 获取数据 result = client.get_ohlcv("000300.SH", "2024-01-01", "2024-01-31") if "error" not in result and result['data']: # 转换为 DataFrame df = pd.DataFrame(result['data']) df['date'] = pd.to_datetime(df['date']) df = df.set_index('date') print(f"\n转换为 DataFrame:") print(f" 形状: {df.shape}") print(f" 列: {list(df.columns)}") print(f"\n数据预览:") print(df.head()) # 计算收益率 df['return'] = df['close'].pct_change() print(f"\n收益率统计:") print(df['return'].describe()) # ============================================================ # 运行示例 # ============================================================ if __name__ == "__main__": print("\n" + "="*60) print("Universal Data Fetcher API 客户端示例") print("="*60) print("\n确保 API 服务器已启动:") print(" python core/datasource/flask_server.py") print("\n") # 运行示例 examples = [ ("基础使用", example_basic), ("单只标的", example_single_fetch), ("批量获取", example_batch_fetch), ("命令行调用", example_cli), ("DataFrame转换", example_dataframe), ] for name, func in examples: try: func() except Exception as e: print(f"\n示例 '{name}' 失败: {e}") import traceback traceback.print_exc() print("\n" + "="*60) print("示例完成") print("="*60)