diff --git a/examples/flask_api_client.py b/examples/flask_api_client.py new file mode 100644 index 0000000..1dc2ce2 --- /dev/null +++ b/examples/flask_api_client.py @@ -0,0 +1,298 @@ +""" +Flask API 客户端示例 +==================== +演示如何调用 Universal Data Fetcher API +""" + +import requests +import json +from typing import Dict, List, Optional + + +class DataFetcherClient: + """ + Universal Data Fetcher API 客户端 + + 使用示例: + client = DataFetcherClient("http://localhost:5000") + + # 获取单只标的 + data = client.get_ohlcv("000300.SH", "2024-01-01", "2024-03-31") + + # 批量获取 + results = client.batch_ohlcv( + ["000300.SH", "NDX", "HSI"], + "2024-01-01", + "2024-03-31" + ) + """ + + def __init__(self, base_url: str = "http://localhost:5000"): + self.base_url = base_url.rstrip('/') + + def health_check(self) -> Dict: + """健康检查""" + response = requests.get(f"{self.base_url}/health") + return response.json() + + def detect_asset_type(self, code: str) -> Dict: + """检测资产类型""" + response = requests.get( + f"{self.base_url}/api/v1/asset-type", + params={"code": code} + ) + return response.json() + + def get_ohlcv( + self, + code: str, + start: Optional[str] = None, + end: Optional[str] = None, + retry: int = 3 + ) -> Dict: + """ + 获取单只标的的 OHLCV 数据 + + Args: + code: 标的代码 + start: 开始日期 (YYYY-MM-DD) + end: 结束日期 (YYYY-MM-DD) + retry: 重试次数 + + Returns: + { + "code": "000300.SH", + "asset_type": "china_index", + "data": [...], + "count": 58, + "date_range": {...} + } + """ + params = {"code": code, "retry": retry} + if start: + params["start"] = start + if end: + params["end"] = end + + response = requests.get( + f"{self.base_url}/api/v1/ohlcv", + params=params + ) + return response.json() + + def batch_ohlcv( + self, + codes: List[str], + start: Optional[str] = None, + end: Optional[str] = None, + retry: int = 3 + ) -> Dict: + """ + 批量获取多只标的的 OHLCV 数据 + + Args: + codes: 标的代码列表 + start: 开始日期 + end: 结束日期 + retry: 重试次数 + + Returns: + { + "results": {...}, + "success_count": 2, + "failed_count": 1, + "total": 3 + } + """ + payload = { + "codes": codes, + "retry": retry + } + if start: + payload["start"] = start + if end: + payload["end"] = end + + response = requests.post( + f"{self.base_url}/api/v1/ohlcv/batch", + json=payload + ) + return response.json() + + def get_supported_codes(self) -> Dict: + """获取支持的代码示例""" + response = requests.get(f"{self.base_url}/api/v1/supported-codes") + return response.json() + + +# ============================================================ +# 使用示例 +# ============================================================ + +def example_basic(): + """基础使用示例""" + print("\n" + "="*60) + print("示例1: 基础使用") + print("="*60) + + client = DataFetcherClient("http://localhost:5000") + + # 健康检查 + print("\n1. 健康检查:") + health = client.health_check() + print(f" 状态: {health.get('status')}") + print(f" SSH配置: {health.get('ssh_configured')}") + + # 检测资产类型 + print("\n2. 检测资产类型:") + for code in ["000300.SH", "NDX", "BTC", "AAPL"]: + result = client.detect_asset_type(code) + print(f" {code:15s} -> {result.get('asset_type'):15s} ({result.get('description')})") + + +def example_single_fetch(): + """获取单只标的""" + print("\n" + "="*60) + print("示例2: 获取单只标的") + print("="*60) + + client = DataFetcherClient("http://localhost:5000") + + # 获取A股指数 + print("\n1. 沪深300指数 (000300.SH):") + result = client.get_ohlcv("000300.SH", "2024-01-01", "2024-03-31") + + if "error" in result: + print(f" ✗ 错误: {result['error']}") + else: + print(f" ✓ 获取成功: {result['count']} 条") + print(f" 资产类型: {result['asset_type']}") + print(f" 日期范围: {result['date_range']['start']} ~ {result['date_range']['end']}") + if result['data']: + latest = result['data'][-1] + print(f" 最新数据: {latest['date']} 收盘 {latest['close']}") + + +def example_batch_fetch(): + """批量获取""" + print("\n" + "="*60) + print("示例3: 批量获取") + print("="*60) + + client = DataFetcherClient("http://localhost:5000") + + codes = [ + "000300.SH", # A股指数 + "510300.SH", # A股ETF + "NDX", # 美股指数 + "HSI", # 港股指数 + ] + + print(f"\n批量获取 {len(codes)} 只标的...") + result = client.batch_ohlcv(codes, "2024-01-01", "2024-03-31") + + print(f"\n结果统计:") + print(f" 成功: {result['success_count']}/{result['total']}") + print(f" 失败: {result['failed_count']}/{result['total']}") + + print(f"\n详细结果:") + for code, data in result['results'].items(): + if 'error' in data: + print(f" ✗ {code:15s} 错误: {data['error']}") + else: + print(f" ✓ {code:15s} {data['count']:4d} 条 " + f"({data['date_range']['start']} ~ {data['date_range']['end']})") + + +def example_cli(): + """命令行接口示例""" + print("\n" + "="*60) + print("示例4: 命令行调用") + print("="*60) + + import subprocess + + # 使用 curl 调用 API + commands = [ + ("健康检查", "curl -s http://localhost:5000/health | python -m json.tool"), + ("检测资产类型", "curl -s 'http://localhost:5000/api/v1/asset-type?code=000300.SH' | python -m json.tool"), + ("获取K线数据", "curl -s 'http://localhost:5000/api/v1/ohlcv?code=000300.SH&start=2024-01-01&end=2024-01-31' | python -m json.tool"), + ] + + for name, cmd in commands: + print(f"\n{name}:") + print(f" 命令: {cmd}") + print(" 输出:") + try: + result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=10) + if result.returncode == 0: + print(result.stdout[:500] + "..." if len(result.stdout) > 500 else result.stdout) + else: + print(f" 错误: {result.stderr}") + except Exception as e: + print(f" 执行失败: {e}") + + +def example_dataframe(): + """转换为 DataFrame""" + print("\n" + "="*60) + print("示例5: 转换为 DataFrame") + print("="*60) + + import pandas as pd + + client = DataFetcherClient("http://localhost:5000") + + # 获取数据 + result = client.get_ohlcv("000300.SH", "2024-01-01", "2024-01-31") + + if "error" not in result and result['data']: + # 转换为 DataFrame + df = pd.DataFrame(result['data']) + df['date'] = pd.to_datetime(df['date']) + df = df.set_index('date') + + print(f"\n转换为 DataFrame:") + print(f" 形状: {df.shape}") + print(f" 列: {list(df.columns)}") + print(f"\n数据预览:") + print(df.head()) + + # 计算收益率 + df['return'] = df['close'].pct_change() + print(f"\n收益率统计:") + print(df['return'].describe()) + + +# ============================================================ +# 运行示例 +# ============================================================ + +if __name__ == "__main__": + print("\n" + "="*60) + print("Universal Data Fetcher API 客户端示例") + print("="*60) + print("\n确保 API 服务器已启动:") + print(" python core/datasource/flask_server.py") + print("\n") + + # 运行示例 + examples = [ + ("基础使用", example_basic), + ("单只标的", example_single_fetch), + ("批量获取", example_batch_fetch), + ("命令行调用", example_cli), + ("DataFrame转换", example_dataframe), + ] + + for name, func in examples: + try: + func() + except Exception as e: + print(f"\n示例 '{name}' 失败: {e}") + import traceback + traceback.print_exc() + + print("\n" + "="*60) + print("示例完成") + print("="*60) diff --git a/examples/universal_fetcher_examples.py b/examples/universal_fetcher_examples.py new file mode 100644 index 0000000..a8127e8 --- /dev/null +++ b/examples/universal_fetcher_examples.py @@ -0,0 +1,299 @@ +""" +统一数据获取接口使用示例 +======================== +展示如何使用 UniversalDataFetcher 获取各种资产的K线数据 +""" + +import sys +from pathlib import Path + +# 添加项目根目录到路径 +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from dotenv import load_dotenv +load_dotenv() + +from core.datasource.universal_fetcher import ( + UniversalDataFetcher, + detect_asset_type, + fetch_kline +) +import pandas as pd + + +# ============================================================ +# 示例1: 快速上手 - 获取单只标的 +# ============================================================ +def example_basic(): + """基础用法:获取单只标的的K线数据""" + print("\n" + "="*60) + print("示例1: 基础用法") + print("="*60) + + # 获取A股指数 + df = fetch_kline("000300.SH", "2024-01-01", "2024-03-31") + + if df is not None: + print(f"\n沪深300指数数据:") + print(f" 数据量: {len(df)} 条") + print(f" 日期范围: {df.index.min()} ~ {df.index.max()}") + print(f" 列: {list(df.columns)}") + print(f"\n最新5条数据:") + print(df.tail()) + + +# ============================================================ +# 示例2: 检测资产类型 +# ============================================================ +def example_detect_type(): + """自动检测资产类型""" + print("\n" + "="*60) + print("示例2: 资产类型检测") + print("="*60) + + codes = [ + "000300.SH", # A股指数 + "510300.SH", # A股ETF + "600000.SH", # A股股票 + "NDX", # 美股指数 + "AAPL", # 美股股票 + "HSI", # 港股指数 + "AU.SHF", # 期货 + "BTC", # 加密货币 + ] + + print("\n资产类型检测结果:") + for code in codes: + asset_type = detect_asset_type(code) + print(f" {code:15s} -> {asset_type}") + + +# ============================================================ +# 示例3: 批量获取多只标的 +# ============================================================ +def example_batch_fetch(): + """批量获取多只标的""" + print("\n" + "="*60) + print("示例3: 批量获取") + print("="*60) + + # 定义要获取的标的列表 + codes = [ + "000300.SH", # 沪深300 + "399006.SZ", # 创业板指 + "510300.SH", # 沪深300ETF + "NDX", # 纳斯达克100 + "HSTECH.HK", # 恒生科技 + ] + + # 使用上下文管理器(推荐) + fetcher = UniversalDataFetcher() + with fetcher: + results = fetcher.fetch_multiple(codes, "2024-01-01", "2024-03-31") + + # 处理结果 + print("\n获取结果汇总:") + for code, df in results.items(): + if df is not None: + print(f" ✓ {code:15s} {len(df):4d} 条, " + f"收盘价: {df['close'].iloc[-1]:.3f}") + else: + print(f" ✗ {code:15s} 无数据") + + +# ============================================================ +# 示例4: 跨市场组合分析 +# ============================================================ +def example_cross_market(): + """跨市场数据分析示例""" + print("\n" + "="*60) + print("示例4: 跨市场组合分析") + print("="*60) + + # 定义全球资产组合 + portfolio = { + "000300.SH": "沪深300", + "NDX": "纳斯达克100", + "HSI": "恒生指数", + "N225": "日经225", + "GDAXI": "德国DAX", + } + + # 获取数据 + fetcher = UniversalDataFetcher() + with fetcher: + results = fetcher.fetch_multiple( + list(portfolio.keys()), + "2024-01-01", + "2024-12-31" + ) + + # 合并收盘价 + close_prices = pd.DataFrame() + for code, name in portfolio.items(): + if results[code] is not None: + close_prices[name] = results[code]['close'] + + # 计算收益率 + if len(close_prices) > 0: + returns = close_prices.pct_change().dropna() + + print("\n各市场收益率统计:") + print((returns.mean() * 252 * 100).round(2).to_string()) + + print("\n相关系数矩阵:") + print(returns.corr().round(3).to_string()) + + +# ============================================================ +# 示例5: 结合技术指标计算 +# ============================================================ +def example_with_indicators(): + """结合技术指标计算""" + print("\n" + "="*60) + print("示例5: 技术指标计算") + print("="*60) + + # 获取数据 + df = fetch_kline("000300.SH", "2024-01-01", "2024-06-30") + + if df is None: + print("数据获取失败") + return + + # 计算移动平均线 + df['MA5'] = df['close'].rolling(window=5).mean() + df['MA20'] = df['close'].rolling(window=20).mean() + df['MA60'] = df['close'].rolling(window=60).mean() + + # 计算RSI + delta = df['close'].diff() + gain = (delta.where(delta > 0, 0)).rolling(window=14).mean() + loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean() + rs = gain / loss + df['RSI'] = 100 - (100 / (1 + rs)) + + print("\n技术指标计算结果(最新10条):") + print(df[['close', 'MA5', 'MA20', 'MA60', 'RSI']].tail(10).round(2)) + + +# ============================================================ +# 示例6: 错误处理与重试 +# ============================================================ +def example_error_handling(): + """错误处理示例""" + print("\n" + "="*60) + print("示例6: 错误处理") + print("="*60) + + # 测试无效代码 + invalid_codes = ["INVALID", "999999.SH", ""] + + for code in invalid_codes: + print(f"\n尝试获取: {code}") + df = fetch_kline(code, "2024-01-01", "2024-01-31") + if df is None: + print(f" ✓ 正确返回 None") + else: + print(f" ✗ 意外获取到数据") + + # 测试重试机制 + print("\n测试重试机制(网络错误):") + fetcher = UniversalDataFetcher() + df = fetcher.fetch("NDX", "2024-01-01", "2024-01-31", retry=3) + if df is not None: + print(f" ✓ 重试成功: {len(df)} 条") + else: + print(f" ✗ 重试后仍失败") + + +# ============================================================ +# 示例7: 与现有轮动策略集成 +# ============================================================ +def example_integration_with_rotation(): + """与现有轮动策略集成示例""" + print("\n" + "="*60) + print("示例7: 与轮动策略集成") + print("="*60) + + # 模拟轮动策略的代码配置 + code_config = { + "399006.SZ": {"name": "创业板指", "etf": "159915.SZ", "market": "A"}, + "NDX": {"name": "纳指100", "etf": "513100.SH", "market": "US"}, + "HSI": {"name": "恒生指数", "etf": "159920.SZ", "market": "HK"}, + } + + # 使用 UniversalDataFetcher 获取数据 + all_codes = list(code_config.keys()) + etf_codes = [cfg['etf'] for cfg in code_config.values() if cfg.get('etf')] + + fetcher = UniversalDataFetcher() + with fetcher: + # 获取指数数据(用于因子计算) + index_data = fetcher.fetch_multiple( + all_codes, "2024-01-01", "2024-03-31" + ) + + # 获取ETF数据(用于收益计算) + etf_data = fetcher.fetch_multiple( + etf_codes, "2024-01-01", "2024-03-31" + ) + + # 验证数据完整性 + print("\n数据完整性检查:") + for code in all_codes: + idx_df = index_data.get(code) + etf_code = code_config[code].get('etf') + etf_df = etf_data.get(etf_code) if etf_code else None + + print(f" {code}:") + print(f" 指数数据: {'✓' if idx_df is not None else '✗'} " + f"({len(idx_df) if idx_df is not None else 0} 条)") + if etf_code: + print(f" ETF数据 ({etf_code}): {'✓' if etf_df is not None else '✗'} " + f"({len(etf_df) if etf_df is not None else 0} 条)") + + +# ============================================================ +# 运行所有示例 +# ============================================================ +if __name__ == "__main__": + print("\n" + "="*60) + print("统一数据获取接口 - 使用示例") + print("="*60) + + # 选择要运行的示例 + examples = [ + ("基础用法", example_basic), + ("资产类型检测", example_detect_type), + ("批量获取", example_batch_fetch), + ("跨市场组合分析", example_cross_market), + ("技术指标计算", example_with_indicators), + ("错误处理", example_error_handling), + ("与轮动策略集成", example_integration_with_rotation), + ] + + print("\n可用示例:") + for i, (name, _) in enumerate(examples, 1): + print(f" {i}. {name}") + + # 运行特定示例或全部 + run_all = True # 改为 False 可以选择性运行 + + if run_all: + for name, func in examples: + try: + func() + except Exception as e: + print(f"\n示例 '{name}' 运行失败: {e}") + import traceback + traceback.print_exc() + else: + # 运行单个示例 + example_index = 0 # 修改为 1-7 运行特定示例 + if 0 <= example_index < len(examples): + examples[example_index][1]() + + print("\n" + "="*60) + print("示例运行完成") + print("="*60)