- 新增 flask_api_client.py:DataFetcherClient 类封装 - 提供健康检查、资产类型检测、单只/批量数据获取方法 - 包含命令行调用示例和 DataFrame 转换示例 - 新增 universal_fetcher_examples.py:7个完整使用示例 - 涵盖基础用法、资产检测、批量获取、跨市场分析、技术指标
299 lines
8.6 KiB
Python
299 lines
8.6 KiB
Python
"""
|
|
Flask API 客户端示例
|
|
====================
|
|
演示如何调用 Universal Data Fetcher API
|
|
"""
|
|
|
|
import requests
|
|
import json
|
|
from typing import Dict, List, Optional
|
|
|
|
|
|
class DataFetcherClient:
|
|
"""
|
|
Universal Data Fetcher API 客户端
|
|
|
|
使用示例:
|
|
client = DataFetcherClient("http://localhost:5000")
|
|
|
|
# 获取单只标的
|
|
data = client.get_ohlcv("000300.SH", "2024-01-01", "2024-03-31")
|
|
|
|
# 批量获取
|
|
results = client.batch_ohlcv(
|
|
["000300.SH", "NDX", "HSI"],
|
|
"2024-01-01",
|
|
"2024-03-31"
|
|
)
|
|
"""
|
|
|
|
def __init__(self, base_url: str = "http://localhost:5000"):
|
|
self.base_url = base_url.rstrip('/')
|
|
|
|
def health_check(self) -> Dict:
|
|
"""健康检查"""
|
|
response = requests.get(f"{self.base_url}/health")
|
|
return response.json()
|
|
|
|
def detect_asset_type(self, code: str) -> Dict:
|
|
"""检测资产类型"""
|
|
response = requests.get(
|
|
f"{self.base_url}/api/v1/asset-type",
|
|
params={"code": code}
|
|
)
|
|
return response.json()
|
|
|
|
def get_ohlcv(
|
|
self,
|
|
code: str,
|
|
start: Optional[str] = None,
|
|
end: Optional[str] = None,
|
|
retry: int = 3
|
|
) -> Dict:
|
|
"""
|
|
获取单只标的的 OHLCV 数据
|
|
|
|
Args:
|
|
code: 标的代码
|
|
start: 开始日期 (YYYY-MM-DD)
|
|
end: 结束日期 (YYYY-MM-DD)
|
|
retry: 重试次数
|
|
|
|
Returns:
|
|
{
|
|
"code": "000300.SH",
|
|
"asset_type": "china_index",
|
|
"data": [...],
|
|
"count": 58,
|
|
"date_range": {...}
|
|
}
|
|
"""
|
|
params = {"code": code, "retry": retry}
|
|
if start:
|
|
params["start"] = start
|
|
if end:
|
|
params["end"] = end
|
|
|
|
response = requests.get(
|
|
f"{self.base_url}/api/v1/ohlcv",
|
|
params=params
|
|
)
|
|
return response.json()
|
|
|
|
def batch_ohlcv(
|
|
self,
|
|
codes: List[str],
|
|
start: Optional[str] = None,
|
|
end: Optional[str] = None,
|
|
retry: int = 3
|
|
) -> Dict:
|
|
"""
|
|
批量获取多只标的的 OHLCV 数据
|
|
|
|
Args:
|
|
codes: 标的代码列表
|
|
start: 开始日期
|
|
end: 结束日期
|
|
retry: 重试次数
|
|
|
|
Returns:
|
|
{
|
|
"results": {...},
|
|
"success_count": 2,
|
|
"failed_count": 1,
|
|
"total": 3
|
|
}
|
|
"""
|
|
payload = {
|
|
"codes": codes,
|
|
"retry": retry
|
|
}
|
|
if start:
|
|
payload["start"] = start
|
|
if end:
|
|
payload["end"] = end
|
|
|
|
response = requests.post(
|
|
f"{self.base_url}/api/v1/ohlcv/batch",
|
|
json=payload
|
|
)
|
|
return response.json()
|
|
|
|
def get_supported_codes(self) -> Dict:
|
|
"""获取支持的代码示例"""
|
|
response = requests.get(f"{self.base_url}/api/v1/supported-codes")
|
|
return response.json()
|
|
|
|
|
|
# ============================================================
|
|
# 使用示例
|
|
# ============================================================
|
|
|
|
def example_basic():
|
|
"""基础使用示例"""
|
|
print("\n" + "="*60)
|
|
print("示例1: 基础使用")
|
|
print("="*60)
|
|
|
|
client = DataFetcherClient("http://localhost:5000")
|
|
|
|
# 健康检查
|
|
print("\n1. 健康检查:")
|
|
health = client.health_check()
|
|
print(f" 状态: {health.get('status')}")
|
|
print(f" SSH配置: {health.get('ssh_configured')}")
|
|
|
|
# 检测资产类型
|
|
print("\n2. 检测资产类型:")
|
|
for code in ["000300.SH", "NDX", "BTC", "AAPL"]:
|
|
result = client.detect_asset_type(code)
|
|
print(f" {code:15s} -> {result.get('asset_type'):15s} ({result.get('description')})")
|
|
|
|
|
|
def example_single_fetch():
|
|
"""获取单只标的"""
|
|
print("\n" + "="*60)
|
|
print("示例2: 获取单只标的")
|
|
print("="*60)
|
|
|
|
client = DataFetcherClient("http://localhost:5000")
|
|
|
|
# 获取A股指数
|
|
print("\n1. 沪深300指数 (000300.SH):")
|
|
result = client.get_ohlcv("000300.SH", "2024-01-01", "2024-03-31")
|
|
|
|
if "error" in result:
|
|
print(f" ✗ 错误: {result['error']}")
|
|
else:
|
|
print(f" ✓ 获取成功: {result['count']} 条")
|
|
print(f" 资产类型: {result['asset_type']}")
|
|
print(f" 日期范围: {result['date_range']['start']} ~ {result['date_range']['end']}")
|
|
if result['data']:
|
|
latest = result['data'][-1]
|
|
print(f" 最新数据: {latest['date']} 收盘 {latest['close']}")
|
|
|
|
|
|
def example_batch_fetch():
|
|
"""批量获取"""
|
|
print("\n" + "="*60)
|
|
print("示例3: 批量获取")
|
|
print("="*60)
|
|
|
|
client = DataFetcherClient("http://localhost:5000")
|
|
|
|
codes = [
|
|
"000300.SH", # A股指数
|
|
"510300.SH", # A股ETF
|
|
"NDX", # 美股指数
|
|
"HSI", # 港股指数
|
|
]
|
|
|
|
print(f"\n批量获取 {len(codes)} 只标的...")
|
|
result = client.batch_ohlcv(codes, "2024-01-01", "2024-03-31")
|
|
|
|
print(f"\n结果统计:")
|
|
print(f" 成功: {result['success_count']}/{result['total']}")
|
|
print(f" 失败: {result['failed_count']}/{result['total']}")
|
|
|
|
print(f"\n详细结果:")
|
|
for code, data in result['results'].items():
|
|
if 'error' in data:
|
|
print(f" ✗ {code:15s} 错误: {data['error']}")
|
|
else:
|
|
print(f" ✓ {code:15s} {data['count']:4d} 条 "
|
|
f"({data['date_range']['start']} ~ {data['date_range']['end']})")
|
|
|
|
|
|
def example_cli():
|
|
"""命令行接口示例"""
|
|
print("\n" + "="*60)
|
|
print("示例4: 命令行调用")
|
|
print("="*60)
|
|
|
|
import subprocess
|
|
|
|
# 使用 curl 调用 API
|
|
commands = [
|
|
("健康检查", "curl -s http://localhost:5000/health | python -m json.tool"),
|
|
("检测资产类型", "curl -s 'http://localhost:5000/api/v1/asset-type?code=000300.SH' | python -m json.tool"),
|
|
("获取K线数据", "curl -s 'http://localhost:5000/api/v1/ohlcv?code=000300.SH&start=2024-01-01&end=2024-01-31' | python -m json.tool"),
|
|
]
|
|
|
|
for name, cmd in commands:
|
|
print(f"\n{name}:")
|
|
print(f" 命令: {cmd}")
|
|
print(" 输出:")
|
|
try:
|
|
result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=10)
|
|
if result.returncode == 0:
|
|
print(result.stdout[:500] + "..." if len(result.stdout) > 500 else result.stdout)
|
|
else:
|
|
print(f" 错误: {result.stderr}")
|
|
except Exception as e:
|
|
print(f" 执行失败: {e}")
|
|
|
|
|
|
def example_dataframe():
|
|
"""转换为 DataFrame"""
|
|
print("\n" + "="*60)
|
|
print("示例5: 转换为 DataFrame")
|
|
print("="*60)
|
|
|
|
import pandas as pd
|
|
|
|
client = DataFetcherClient("http://localhost:5000")
|
|
|
|
# 获取数据
|
|
result = client.get_ohlcv("000300.SH", "2024-01-01", "2024-01-31")
|
|
|
|
if "error" not in result and result['data']:
|
|
# 转换为 DataFrame
|
|
df = pd.DataFrame(result['data'])
|
|
df['date'] = pd.to_datetime(df['date'])
|
|
df = df.set_index('date')
|
|
|
|
print(f"\n转换为 DataFrame:")
|
|
print(f" 形状: {df.shape}")
|
|
print(f" 列: {list(df.columns)}")
|
|
print(f"\n数据预览:")
|
|
print(df.head())
|
|
|
|
# 计算收益率
|
|
df['return'] = df['close'].pct_change()
|
|
print(f"\n收益率统计:")
|
|
print(df['return'].describe())
|
|
|
|
|
|
# ============================================================
|
|
# 运行示例
|
|
# ============================================================
|
|
|
|
if __name__ == "__main__":
|
|
print("\n" + "="*60)
|
|
print("Universal Data Fetcher API 客户端示例")
|
|
print("="*60)
|
|
print("\n确保 API 服务器已启动:")
|
|
print(" python core/datasource/flask_server.py")
|
|
print("\n")
|
|
|
|
# 运行示例
|
|
examples = [
|
|
("基础使用", example_basic),
|
|
("单只标的", example_single_fetch),
|
|
("批量获取", example_batch_fetch),
|
|
("命令行调用", example_cli),
|
|
("DataFrame转换", example_dataframe),
|
|
]
|
|
|
|
for name, func in examples:
|
|
try:
|
|
func()
|
|
except Exception as e:
|
|
print(f"\n示例 '{name}' 失败: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
|
|
print("\n" + "="*60)
|
|
print("示例完成")
|
|
print("="*60)
|