""" 统一数据获取接口使用示例 ======================== 展示如何使用 UniversalDataFetcher 获取各种资产的K线数据 """ import sys from pathlib import Path # 添加项目根目录到路径 sys.path.insert(0, str(Path(__file__).parent.parent)) from dotenv import load_dotenv load_dotenv() from core.datasource.universal_fetcher import ( UniversalDataFetcher, detect_asset_type, fetch_kline ) import pandas as pd # ============================================================ # 示例1: 快速上手 - 获取单只标的 # ============================================================ def example_basic(): """基础用法:获取单只标的的K线数据""" print("\n" + "="*60) print("示例1: 基础用法") print("="*60) # 获取A股指数 df = fetch_kline("000300.SH", "2024-01-01", "2024-03-31") if df is not None: print(f"\n沪深300指数数据:") print(f" 数据量: {len(df)} 条") print(f" 日期范围: {df.index.min()} ~ {df.index.max()}") print(f" 列: {list(df.columns)}") print(f"\n最新5条数据:") print(df.tail()) # ============================================================ # 示例2: 检测资产类型 # ============================================================ def example_detect_type(): """自动检测资产类型""" print("\n" + "="*60) print("示例2: 资产类型检测") print("="*60) codes = [ "000300.SH", # A股指数 "510300.SH", # A股ETF "600000.SH", # A股股票 "NDX", # 美股指数 "AAPL", # 美股股票 "HSI", # 港股指数 "AU.SHF", # 期货 "BTC", # 加密货币 ] print("\n资产类型检测结果:") for code in codes: asset_type = detect_asset_type(code) print(f" {code:15s} -> {asset_type}") # ============================================================ # 示例3: 批量获取多只标的 # ============================================================ def example_batch_fetch(): """批量获取多只标的""" print("\n" + "="*60) print("示例3: 批量获取") print("="*60) # 定义要获取的标的列表 codes = [ "000300.SH", # 沪深300 "399006.SZ", # 创业板指 "510300.SH", # 沪深300ETF "NDX", # 纳斯达克100 "HSTECH.HK", # 恒生科技 ] # 使用上下文管理器(推荐) fetcher = UniversalDataFetcher() with fetcher: results = fetcher.fetch_multiple(codes, "2024-01-01", "2024-03-31") # 处理结果 print("\n获取结果汇总:") for code, df in results.items(): if df is not None: print(f" ✓ {code:15s} {len(df):4d} 条, " f"收盘价: {df['close'].iloc[-1]:.3f}") else: print(f" ✗ {code:15s} 无数据") # ============================================================ # 示例4: 跨市场组合分析 # ============================================================ def example_cross_market(): """跨市场数据分析示例""" print("\n" + "="*60) print("示例4: 跨市场组合分析") print("="*60) # 定义全球资产组合 portfolio = { "000300.SH": "沪深300", "NDX": "纳斯达克100", "HSI": "恒生指数", "N225": "日经225", "GDAXI": "德国DAX", } # 获取数据 fetcher = UniversalDataFetcher() with fetcher: results = fetcher.fetch_multiple( list(portfolio.keys()), "2024-01-01", "2024-12-31" ) # 合并收盘价 close_prices = pd.DataFrame() for code, name in portfolio.items(): if results[code] is not None: close_prices[name] = results[code]['close'] # 计算收益率 if len(close_prices) > 0: returns = close_prices.pct_change().dropna() print("\n各市场收益率统计:") print((returns.mean() * 252 * 100).round(2).to_string()) print("\n相关系数矩阵:") print(returns.corr().round(3).to_string()) # ============================================================ # 示例5: 结合技术指标计算 # ============================================================ def example_with_indicators(): """结合技术指标计算""" print("\n" + "="*60) print("示例5: 技术指标计算") print("="*60) # 获取数据 df = fetch_kline("000300.SH", "2024-01-01", "2024-06-30") if df is None: print("数据获取失败") return # 计算移动平均线 df['MA5'] = df['close'].rolling(window=5).mean() df['MA20'] = df['close'].rolling(window=20).mean() df['MA60'] = df['close'].rolling(window=60).mean() # 计算RSI delta = df['close'].diff() gain = (delta.where(delta > 0, 0)).rolling(window=14).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean() rs = gain / loss df['RSI'] = 100 - (100 / (1 + rs)) print("\n技术指标计算结果(最新10条):") print(df[['close', 'MA5', 'MA20', 'MA60', 'RSI']].tail(10).round(2)) # ============================================================ # 示例6: 错误处理与重试 # ============================================================ def example_error_handling(): """错误处理示例""" print("\n" + "="*60) print("示例6: 错误处理") print("="*60) # 测试无效代码 invalid_codes = ["INVALID", "999999.SH", ""] for code in invalid_codes: print(f"\n尝试获取: {code}") df = fetch_kline(code, "2024-01-01", "2024-01-31") if df is None: print(f" ✓ 正确返回 None") else: print(f" ✗ 意外获取到数据") # 测试重试机制 print("\n测试重试机制(网络错误):") fetcher = UniversalDataFetcher() df = fetcher.fetch("NDX", "2024-01-01", "2024-01-31", retry=3) if df is not None: print(f" ✓ 重试成功: {len(df)} 条") else: print(f" ✗ 重试后仍失败") # ============================================================ # 示例7: 与现有轮动策略集成 # ============================================================ def example_integration_with_rotation(): """与现有轮动策略集成示例""" print("\n" + "="*60) print("示例7: 与轮动策略集成") print("="*60) # 模拟轮动策略的代码配置 code_config = { "399006.SZ": {"name": "创业板指", "etf": "159915.SZ", "market": "A"}, "NDX": {"name": "纳指100", "etf": "513100.SH", "market": "US"}, "HSI": {"name": "恒生指数", "etf": "159920.SZ", "market": "HK"}, } # 使用 UniversalDataFetcher 获取数据 all_codes = list(code_config.keys()) etf_codes = [cfg['etf'] for cfg in code_config.values() if cfg.get('etf')] fetcher = UniversalDataFetcher() with fetcher: # 获取指数数据(用于因子计算) index_data = fetcher.fetch_multiple( all_codes, "2024-01-01", "2024-03-31" ) # 获取ETF数据(用于收益计算) etf_data = fetcher.fetch_multiple( etf_codes, "2024-01-01", "2024-03-31" ) # 验证数据完整性 print("\n数据完整性检查:") for code in all_codes: idx_df = index_data.get(code) etf_code = code_config[code].get('etf') etf_df = etf_data.get(etf_code) if etf_code else None print(f" {code}:") print(f" 指数数据: {'✓' if idx_df is not None else '✗'} " f"({len(idx_df) if idx_df is not None else 0} 条)") if etf_code: print(f" ETF数据 ({etf_code}): {'✓' if etf_df is not None else '✗'} " f"({len(etf_df) if etf_df is not None else 0} 条)") # ============================================================ # 运行所有示例 # ============================================================ if __name__ == "__main__": print("\n" + "="*60) print("统一数据获取接口 - 使用示例") print("="*60) # 选择要运行的示例 examples = [ ("基础用法", example_basic), ("资产类型检测", example_detect_type), ("批量获取", example_batch_fetch), ("跨市场组合分析", example_cross_market), ("技术指标计算", example_with_indicators), ("错误处理", example_error_handling), ("与轮动策略集成", example_integration_with_rotation), ] print("\n可用示例:") for i, (name, _) in enumerate(examples, 1): print(f" {i}. {name}") # 运行特定示例或全部 run_all = True # 改为 False 可以选择性运行 if run_all: for name, func in examples: try: func() except Exception as e: print(f"\n示例 '{name}' 运行失败: {e}") import traceback traceback.print_exc() else: # 运行单个示例 example_index = 0 # 修改为 1-7 运行特定示例 if 0 <= example_index < len(examples): examples[example_index][1]() print("\n" + "="*60) print("示例运行完成") print("="*60)