docs(examples): 添加 Flask API 客户端示例

- 新增 flask_api_client.py:DataFetcherClient 类封装
- 提供健康检查、资产类型检测、单只/批量数据获取方法
- 包含命令行调用示例和 DataFrame 转换示例
- 新增 universal_fetcher_examples.py:7个完整使用示例
- 涵盖基础用法、资产检测、批量获取、跨市场分析、技术指标
This commit is contained in:
2026-05-07 21:19:52 +08:00
parent fbaa3f9d73
commit cc2f7cb6c8
2 changed files with 597 additions and 0 deletions

View File

@@ -0,0 +1,299 @@
"""
统一数据获取接口使用示例
========================
展示如何使用 UniversalDataFetcher 获取各种资产的K线数据
"""
import sys
from pathlib import Path
# 添加项目根目录到路径
sys.path.insert(0, str(Path(__file__).parent.parent))
from dotenv import load_dotenv
load_dotenv()
from core.datasource.universal_fetcher import (
UniversalDataFetcher,
detect_asset_type,
fetch_kline
)
import pandas as pd
# ============================================================
# 示例1: 快速上手 - 获取单只标的
# ============================================================
def example_basic():
"""基础用法获取单只标的的K线数据"""
print("\n" + "="*60)
print("示例1: 基础用法")
print("="*60)
# 获取A股指数
df = fetch_kline("000300.SH", "2024-01-01", "2024-03-31")
if df is not None:
print(f"\n沪深300指数数据:")
print(f" 数据量: {len(df)}")
print(f" 日期范围: {df.index.min()} ~ {df.index.max()}")
print(f" 列: {list(df.columns)}")
print(f"\n最新5条数据:")
print(df.tail())
# ============================================================
# 示例2: 检测资产类型
# ============================================================
def example_detect_type():
"""自动检测资产类型"""
print("\n" + "="*60)
print("示例2: 资产类型检测")
print("="*60)
codes = [
"000300.SH", # A股指数
"510300.SH", # A股ETF
"600000.SH", # A股股票
"NDX", # 美股指数
"AAPL", # 美股股票
"HSI", # 港股指数
"AU.SHF", # 期货
"BTC", # 加密货币
]
print("\n资产类型检测结果:")
for code in codes:
asset_type = detect_asset_type(code)
print(f" {code:15s} -> {asset_type}")
# ============================================================
# 示例3: 批量获取多只标的
# ============================================================
def example_batch_fetch():
"""批量获取多只标的"""
print("\n" + "="*60)
print("示例3: 批量获取")
print("="*60)
# 定义要获取的标的列表
codes = [
"000300.SH", # 沪深300
"399006.SZ", # 创业板指
"510300.SH", # 沪深300ETF
"NDX", # 纳斯达克100
"HSTECH.HK", # 恒生科技
]
# 使用上下文管理器(推荐)
fetcher = UniversalDataFetcher()
with fetcher:
results = fetcher.fetch_multiple(codes, "2024-01-01", "2024-03-31")
# 处理结果
print("\n获取结果汇总:")
for code, df in results.items():
if df is not None:
print(f"{code:15s} {len(df):4d} 条, "
f"收盘价: {df['close'].iloc[-1]:.3f}")
else:
print(f"{code:15s} 无数据")
# ============================================================
# 示例4: 跨市场组合分析
# ============================================================
def example_cross_market():
"""跨市场数据分析示例"""
print("\n" + "="*60)
print("示例4: 跨市场组合分析")
print("="*60)
# 定义全球资产组合
portfolio = {
"000300.SH": "沪深300",
"NDX": "纳斯达克100",
"HSI": "恒生指数",
"N225": "日经225",
"GDAXI": "德国DAX",
}
# 获取数据
fetcher = UniversalDataFetcher()
with fetcher:
results = fetcher.fetch_multiple(
list(portfolio.keys()),
"2024-01-01",
"2024-12-31"
)
# 合并收盘价
close_prices = pd.DataFrame()
for code, name in portfolio.items():
if results[code] is not None:
close_prices[name] = results[code]['close']
# 计算收益率
if len(close_prices) > 0:
returns = close_prices.pct_change().dropna()
print("\n各市场收益率统计:")
print((returns.mean() * 252 * 100).round(2).to_string())
print("\n相关系数矩阵:")
print(returns.corr().round(3).to_string())
# ============================================================
# 示例5: 结合技术指标计算
# ============================================================
def example_with_indicators():
"""结合技术指标计算"""
print("\n" + "="*60)
print("示例5: 技术指标计算")
print("="*60)
# 获取数据
df = fetch_kline("000300.SH", "2024-01-01", "2024-06-30")
if df is None:
print("数据获取失败")
return
# 计算移动平均线
df['MA5'] = df['close'].rolling(window=5).mean()
df['MA20'] = df['close'].rolling(window=20).mean()
df['MA60'] = df['close'].rolling(window=60).mean()
# 计算RSI
delta = df['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss
df['RSI'] = 100 - (100 / (1 + rs))
print("\n技术指标计算结果最新10条:")
print(df[['close', 'MA5', 'MA20', 'MA60', 'RSI']].tail(10).round(2))
# ============================================================
# 示例6: 错误处理与重试
# ============================================================
def example_error_handling():
"""错误处理示例"""
print("\n" + "="*60)
print("示例6: 错误处理")
print("="*60)
# 测试无效代码
invalid_codes = ["INVALID", "999999.SH", ""]
for code in invalid_codes:
print(f"\n尝试获取: {code}")
df = fetch_kline(code, "2024-01-01", "2024-01-31")
if df is None:
print(f" ✓ 正确返回 None")
else:
print(f" ✗ 意外获取到数据")
# 测试重试机制
print("\n测试重试机制(网络错误):")
fetcher = UniversalDataFetcher()
df = fetcher.fetch("NDX", "2024-01-01", "2024-01-31", retry=3)
if df is not None:
print(f" ✓ 重试成功: {len(df)}")
else:
print(f" ✗ 重试后仍失败")
# ============================================================
# 示例7: 与现有轮动策略集成
# ============================================================
def example_integration_with_rotation():
"""与现有轮动策略集成示例"""
print("\n" + "="*60)
print("示例7: 与轮动策略集成")
print("="*60)
# 模拟轮动策略的代码配置
code_config = {
"399006.SZ": {"name": "创业板指", "etf": "159915.SZ", "market": "A"},
"NDX": {"name": "纳指100", "etf": "513100.SH", "market": "US"},
"HSI": {"name": "恒生指数", "etf": "159920.SZ", "market": "HK"},
}
# 使用 UniversalDataFetcher 获取数据
all_codes = list(code_config.keys())
etf_codes = [cfg['etf'] for cfg in code_config.values() if cfg.get('etf')]
fetcher = UniversalDataFetcher()
with fetcher:
# 获取指数数据(用于因子计算)
index_data = fetcher.fetch_multiple(
all_codes, "2024-01-01", "2024-03-31"
)
# 获取ETF数据用于收益计算
etf_data = fetcher.fetch_multiple(
etf_codes, "2024-01-01", "2024-03-31"
)
# 验证数据完整性
print("\n数据完整性检查:")
for code in all_codes:
idx_df = index_data.get(code)
etf_code = code_config[code].get('etf')
etf_df = etf_data.get(etf_code) if etf_code else None
print(f" {code}:")
print(f" 指数数据: {'' if idx_df is not None else ''} "
f"({len(idx_df) if idx_df is not None else 0} 条)")
if etf_code:
print(f" ETF数据 ({etf_code}): {'' if etf_df is not None else ''} "
f"({len(etf_df) if etf_df is not None else 0} 条)")
# ============================================================
# 运行所有示例
# ============================================================
if __name__ == "__main__":
print("\n" + "="*60)
print("统一数据获取接口 - 使用示例")
print("="*60)
# 选择要运行的示例
examples = [
("基础用法", example_basic),
("资产类型检测", example_detect_type),
("批量获取", example_batch_fetch),
("跨市场组合分析", example_cross_market),
("技术指标计算", example_with_indicators),
("错误处理", example_error_handling),
("与轮动策略集成", example_integration_with_rotation),
]
print("\n可用示例:")
for i, (name, _) in enumerate(examples, 1):
print(f" {i}. {name}")
# 运行特定示例或全部
run_all = True # 改为 False 可以选择性运行
if run_all:
for name, func in examples:
try:
func()
except Exception as e:
print(f"\n示例 '{name}' 运行失败: {e}")
import traceback
traceback.print_exc()
else:
# 运行单个示例
example_index = 0 # 修改为 1-7 运行特定示例
if 0 <= example_index < len(examples):
examples[example_index][1]()
print("\n" + "="*60)
print("示例运行完成")
print("="*60)