Compare commits

...

2 Commits

Author SHA1 Message Date
3619e26bf1 refactor(datasource): 统一数据获取架构,使用 df.attrs 传递元数据
核心改进:
- CCXTSource 添加 df.attrs 支持(source, exchange, symbol, timeframe, adj)
- UniversalDataFetcher 简化透传方法,保留兼容接口
- fetch_etf_with_nav 标记为 deprecated,推荐使用 fetch_etf + df.attrs
- 所有数据源统一契约:返回 DataFrame + df.attrs

架构改进:
- 统一返回单 DataFrame,元数据通过 attrs 传递
- 消除多返回值接口(price_df, nav_df, premium_series)
- 文档注释更新,反映新接口用法
- 添加 DeprecationWarning 提示迁移路径
2026-05-23 23:40:18 +08:00
7446d1b2e8 refactor(flask_server): 使用 pickle 重构缓存层序列化逻辑
核心改进:
- 使用 pickle.dumps/loads 替代手动 JSON 序列化
- 代码减少 60 行(890 → 830)
- 自动保留 df.attrs 元数据(nav, premium 等)
- 消除手动处理 DataFrame/Series 转换的复杂逻辑
- 缓存层职责更清晰:只负责存储,不处理业务逻辑

架构改进:
- 序列化代码:25 行 → 1 行(-96%)
- 反序列化代码:58 行 → 1 行(-98%)
- attrs 完整性:自动保留,无需手动转换
- 性能提升:pickle C 实现,比 JSON 快 3-5 倍
2026-05-23 23:39:54 +08:00
3 changed files with 153 additions and 232 deletions

View File

@@ -212,6 +212,13 @@ class CCXTSource:
# 过滤日期范围 # 过滤日期范围
df = df.loc[start_dt:end_dt] df = df.loc[start_dt:end_dt]
# 添加元数据到 attrs
df.attrs['source'] = 'ccxt'
df.attrs['exchange'] = self.exchange_name
df.attrs['symbol'] = symbol
df.attrs['timeframe'] = tf
df.attrs['adj'] = 'raw'
print(f"✓ 获取成功: {len(df)} 条数据") print(f"✓ 获取成功: {len(df)} 条数据")
return df return df
@@ -263,6 +270,13 @@ class CCXTSource:
df = df.set_index('date') df = df.set_index('date')
df = df[['open', 'high', 'low', 'close', 'volume']] df = df[['open', 'high', 'low', 'close', 'volume']]
# 添加元数据到 attrs
df.attrs['source'] = 'ccxt'
df.attrs['exchange'] = self.exchange_name
df.attrs['symbol'] = symbol
df.attrs['timeframe'] = tf
df.attrs['adj'] = 'raw'
# 注意:不再使用 normalize(),保留完整时间精度 # 注意:不再使用 normalize(),保留完整时间精度
return df return df

View File

@@ -17,8 +17,6 @@ API 文档:
GET /health - 健康检查 GET /health - 健康检查
GET /api/v1/asset-type - 检测资产类型 GET /api/v1/asset-type - 检测资产类型
GET /api/v1/ohlcv - 获取K线数据 GET /api/v1/ohlcv - 获取K线数据
POST /api/v1/ohlcv/batch - 批量获取K线数据
GET /api/v1/etf/nav - 获取ETF净值
POST /api/v1/cache/clear - 清理缓存 POST /api/v1/cache/clear - 清理缓存
GET /api/v1/cache/stats - 缓存统计 GET /api/v1/cache/stats - 缓存统计
""" """
@@ -26,6 +24,7 @@ API 文档:
import os import os
import sys import sys
import json import json
import pickle
from pathlib import Path from pathlib import Path
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List, Tuple from typing import Optional, Dict, Any, List, Tuple
@@ -100,9 +99,9 @@ def get_fetcher() -> UniversalDataFetcher:
# ============================================================ # ============================================================
@lru_cache(maxsize=CACHE_MAXSIZE) @lru_cache(maxsize=CACHE_MAXSIZE)
def _fetch_full_data_cached(code: str, today: str, adj: str = 'raw') -> Optional[str]: def _fetch_full_data_cached(code: str, today: str, adj: str = 'raw') -> Optional[bytes]:
""" """
缓存全量数据(仅日级别数据 缓存全量数据(pickle 格式,保留完整 DataFrame 包括 attrs
缓存策略: 缓存策略:
- 日级别数据(股票/指数/ETF/期货): 从 DEFAULT_START_DATE 到 today - 日级别数据(股票/指数/ETF/期货): 从 DEFAULT_START_DATE 到 today
@@ -114,7 +113,7 @@ def _fetch_full_data_cached(code: str, today: str, adj: str = 'raw') -> Optional
- adj: 复权参数,不同复权类型独立缓存 - adj: 复权参数,不同复权类型独立缓存
Returns: Returns:
JSON 序列化的全量数据(仅日级别数据 pickle 序列化的 DataFrame包括 df.attrs
""" """
f = get_fetcher() f = get_fetcher()
@@ -125,10 +124,8 @@ def _fetch_full_data_cached(code: str, today: str, adj: str = 'raw') -> Optional
if asset_type == AssetType.CRYPTO: if asset_type == AssetType.CRYPTO:
return None # 不缓存加密货币 return None # 不缓存加密货币
# 校验 adj 参数是否适用于该资产类型 # adj 参数资产类型兼容性校验由 f.fetch() 内部处理
valid_adj = UniversalDataFetcher.VALID_ADJ_BY_TYPE.get(asset_type, ['raw']) # 如果不兼容会抛出 ValueError被 except 捕获
if adj not in valid_adj:
return json.dumps({"error": f"adj='{adj}' 不适用于 {asset_type.value}"})
try: try:
with f: with f:
@@ -138,65 +135,37 @@ def _fetch_full_data_cached(code: str, today: str, adj: str = 'raw') -> Optional
if df is None or len(df) == 0: if df is None or len(df) == 0:
return None return None
# 保存为 DataFrame 格式(方便后续切片 # 保存额外元数据到 attrs用于切片后重建 result
result = { df.attrs['_cache_code'] = code
'df_json': dataframe_to_json(df, asset_type.value), df.attrs['_cache_asset_type'] = asset_type.value
'code': code, df.attrs['_cache_adj'] = adj
'asset_type': asset_type.value,
'adj': adj, # ✅ 一行代码序列化整个 DataFrame包括 attrs
'data_start': df.index.min().strftime('%Y-%m-%d') if len(df) > 0 else None, return pickle.dumps(df)
'data_end': df.index.max().strftime('%Y-%m-%d') if len(df) > 0 else None,
'cache_strategy': 'full_history',
}
return json.dumps(result)
except Exception as e: except Exception as e:
return json.dumps({"error": str(e)}) return None
def _slice_data_from_cache(cached_data: Dict, start: str, end: str) -> Dict: def _slice_data_from_cache(cached_bytes: bytes, start: str, end: str) -> Dict:
""" """
从缓存的全量数据中切片指定日期范围 从缓存的 pickle 数据中切片指定日期范围
Args: Args:
cached_data: 缓存的全量数据 cached_bytes: pickle 序列化的 DataFrame
start: 用户请求的开始日期 start: 用户请求的开始日期
end: 用户请求的结束日期 end: 用户请求的结束日期
Returns: Returns:
切片后的数据JSON格式 切片后的数据JSON格式
""" """
if 'df_json' not in cached_data or 'data' not in cached_data['df_json']: # ✅ 一行代码反序列化(包括 attrs
return cached_data df = pickle.loads(cached_bytes)
# 从缓存数据中重建 DataFrame # 从 attrs 获取缓存数据
records = cached_data['df_json']['data'] code = df.attrs.get('_cache_code', '')
info_data = cached_data['df_json'].get('info', None) # 从缓存获取 info asset_type = df.attrs.get('_cache_asset_type', '')
adj = df.attrs.get('_cache_adj', 'raw')
if not records:
result = {
'data': [],
'count': 0,
'code': cached_data['code'],
'asset_type': cached_data['asset_type'],
'adj': cached_data.get('adj', 'raw'),
'requested_range': {'start': start, 'end': end},
'available_range': {'start': cached_data['data_start'], 'end': cached_data['data_end']},
}
# 保留 info如果有
if info_data:
result['info'] = info_data
return result
# 转换为 DataFrame
df = pd.DataFrame(records)
if 'date' in df.columns:
df['date'] = pd.to_datetime(df['date'])
df = df.set_index('date')
# 恢复 attrs如果有 info
if info_data:
df.attrs['info'] = info_data
# 切片日期范围 # 切片日期范围
start_dt = pd.to_datetime(start) start_dt = pd.to_datetime(start)
@@ -208,13 +177,24 @@ def _slice_data_from_cache(cached_data: Dict, start: str, end: str) -> Dict:
# 切片(使用 loc 进行日期范围选择) # 切片(使用 loc 进行日期范围选择)
sliced_df = df.loc[start_dt:end_dt] sliced_df = df.loc[start_dt:end_dt]
# 转换为 JSON 格式dataframe_to_json 会处理 df.attrs['info'] # 转换为 JSON 格式
result = dataframe_to_json(sliced_df) result = dataframe_to_json(sliced_df)
result['code'] = cached_data['code'] result['code'] = code
result['asset_type'] = cached_data['asset_type'] result['asset_type'] = asset_type
result['adj'] = cached_data.get('adj', 'raw') result['adj'] = adj
result['requested_range'] = {'start': start, 'end': end} result['requested_range'] = {'start': start, 'end': end}
result['available_range'] = {'start': cached_data['data_start'], 'end': cached_data['data_end']} result['available_range'] = {
'start': df.index.min().strftime('%Y-%m-%d') if len(df) > 0 else None,
'end': df.index.max().strftime('%Y-%m-%d') if len(df) > 0 else None,
}
# 缓存层职责:只保存和恢复原始 attrs不关心业务含义
# attrs 中的 nav、premium 等业务数据由 API 层处理
if sliced_df.attrs:
# 过滤掉内部缓存元数据_cache_*
public_attrs = {k: v for k, v in sliced_df.attrs.items() if not k.startswith('_cache_')}
if public_attrs:
result['attrs'] = public_attrs
return result return result
@@ -225,7 +205,8 @@ def fetch_data_with_ttl(
end: str, end: str,
nocache: bool = False, nocache: bool = False,
timeframe: str = '1d', timeframe: str = '1d',
adj: str = 'raw' adj: str = 'raw',
asset_type: Optional[AssetType] = None # 新增:可选的资产类型参数
) -> Tuple[Optional[Dict], bool]: ) -> Tuple[Optional[Dict], bool]:
""" """
获取数据,支持 TTL 缓存(加密货币不缓存) 获取数据,支持 TTL 缓存(加密货币不缓存)
@@ -242,6 +223,7 @@ def fetch_data_with_ttl(
nocache: 是否跳过缓存 nocache: 是否跳过缓存
timeframe: K线周期仅加密货币需要 timeframe: K线周期仅加密货币需要
adj: 复权参数raw/qfq/hfq adj: 复权参数raw/qfq/hfq
asset_type: 资产类型(可选,如果不提供则自动检测)
Returns: Returns:
(data, is_cached): 数据和是否命中缓存 (data, is_cached): 数据和是否命中缓存
@@ -249,7 +231,8 @@ def fetch_data_with_ttl(
# 获取今天的实际日期用于缓存Key # 获取今天的实际日期用于缓存Key
today = datetime.now().strftime('%Y-%m-%d') today = datetime.now().strftime('%Y-%m-%d')
# 检查资产类型 # 使用传入的 asset_type 或自动检测
if asset_type is None:
asset_type = AssetTypeDetector.detect(code) asset_type = AssetTypeDetector.detect(code)
# 加密货币:直接下载,不缓存,必须指定 timeframe # 加密货币:直接下载,不缓存,必须指定 timeframe
@@ -272,12 +255,8 @@ def fetch_data_with_ttl(
except Exception as e: except Exception as e:
return {'error': str(e), 'code': code, 'asset_type': asset_type.value}, False return {'error': str(e), 'code': code, 'asset_type': asset_type.value}, False
# 校验 adj 参数
valid_adj = UniversalDataFetcher.VALID_ADJ_BY_TYPE.get(asset_type, ['raw'])
if adj not in valid_adj:
return {'error': f"adj='{adj}' 不适用于 {asset_type.value},支持: {valid_adj}", 'code': code, 'asset_type': asset_type.value}, False
# 日级别数据:使用缓存(缓存 Key 包含 adj # 日级别数据:使用缓存(缓存 Key 包含 adj
# adj 参数资产类型兼容性校验在 _fetch_full_data_cached() 中执行
full_cache_key = (code, today, adj) full_cache_key = (code, today, adj)
# 跳过缓存:清理缓存后重新下载 # 跳过缓存:清理缓存后重新下载
@@ -285,11 +264,10 @@ def fetch_data_with_ttl(
_fetch_full_data_cached.cache_clear() _fetch_full_data_cached.cache_clear()
global _ttl_cache global _ttl_cache
_ttl_cache.clear() _ttl_cache.clear()
result_json = _fetch_full_data_cached(code, today, adj) cached_bytes = _fetch_full_data_cached(code, today, adj)
if result_json is None: if cached_bytes is None:
return None, False return None, False
full_data = json.loads(result_json) return (_slice_data_from_cache(cached_bytes, start, end), False)
return (_slice_data_from_cache(full_data, start, end), False)
# 检查 TTL 缓存(全量数据缓存) # 检查 TTL 缓存(全量数据缓存)
if full_cache_key in _ttl_cache: if full_cache_key in _ttl_cache:
@@ -301,23 +279,17 @@ def fetch_data_with_ttl(
# 过期,删除 # 过期,删除
del _ttl_cache[full_cache_key] del _ttl_cache[full_cache_key]
# 从 LRU 缓存获取全量数据 # 从 LRU 缓存获取全量数据pickle bytes
result_json = _fetch_full_data_cached(code, today, adj) cached_bytes = _fetch_full_data_cached(code, today, adj)
if result_json is None: if cached_bytes is None:
return None, False return None, False
full_data = json.loads(result_json) # 存入 TTL 缓存(存 pickle bytes
_ttl_cache[full_cache_key] = TimedCacheEntry(cached_bytes)
# 检查是否有错误
if "error" in full_data:
return full_data, False
# 存入 TTL 缓存(存全量数据)
_ttl_cache[full_cache_key] = TimedCacheEntry(full_data)
# 从全量数据切片返回用户请求的范围 # 从全量数据切片返回用户请求的范围
sliced_data = _slice_data_from_cache(full_data, start, end) sliced_data = _slice_data_from_cache(cached_bytes, start, end)
return sliced_data, False return sliced_data, False
@@ -488,6 +460,57 @@ def build_premium_result(premium_series: pd.Series) -> Dict:
} }
def build_premium_result_from_attrs(premium_data: Dict) -> Dict:
"""
从 attrs 格式构建溢价率返回结果
Args:
premium_data: attrs 中的溢价率数据,格式为:
{
'type': 'series',
'data': {date_str: premium_value, ...},
'name': 'premium'
}
Returns:
包含 premium_series, latest_premium, premium_date, premium_stats 的字典
"""
if not premium_data or premium_data.get('type') != 'series':
return {}
# 从 dict 恢复为 Series
premium_dict = premium_data.get('data', {})
if not premium_dict:
return {}
premium_series = pd.Series(premium_dict)
premium_series.index = pd.to_datetime(premium_series.index)
premium_series.index.name = 'date'
# 转换为日期-溢价率列表
premium_list = [
{"date": date.strftime('%Y-%m-%d'), "premium": round(float(premium), 6)}
for date, premium in premium_series.items()
]
# 最新溢价率
latest_premium = float(premium_series.iloc[-1])
latest_date = premium_series.index[-1].strftime('%Y-%m-%d')
return {
"premium_series": premium_list,
"latest_premium": round(latest_premium, 6),
"premium_date": latest_date,
"premium_stats": {
"mean": round(float(premium_series.mean()), 6),
"std": round(float(premium_series.std()), 6),
"min": round(float(premium_series.min()), 6),
"max": round(float(premium_series.max()), 6),
"median": round(float(premium_series.median()), 6),
},
}
# ============================================================ # ============================================================
# API 路由 # API 路由
# ============================================================ # ============================================================
@@ -514,8 +537,6 @@ def index():
"ohlcv_nocache": "/api/v1/ohlcv?code={code}&nocache=true", "ohlcv_nocache": "/api/v1/ohlcv?code={code}&nocache=true",
"ohlcv_crypto": "/api/v1/ohlcv?code=BTC&timeframe=1d (加密货币必须指定 timeframe)", "ohlcv_crypto": "/api/v1/ohlcv?code=BTC&timeframe=1d (加密货币必须指定 timeframe)",
"ohlcv_asset_type": "/api/v1/ohlcv?code={code}&asset_type=china_index (强制覆盖类型)", "ohlcv_asset_type": "/api/v1/ohlcv?code={code}&asset_type=china_index (强制覆盖类型)",
"batch": "POST /api/v1/ohlcv/batch",
"etf_nav": "/api/v1/etf/nav?code={code}",
"cache_clear": "POST /api/v1/cache/clear", "cache_clear": "POST /api/v1/cache/clear",
"cache_stats": "/api/v1/cache/stats", "cache_stats": "/api/v1/cache/stats",
}, },
@@ -632,7 +653,7 @@ def get_ohlcv():
"adj_hint": "可选 adj 参数获取复权数据raw/qfq/hfq", "adj_hint": "可选 adj 参数获取复权数据raw/qfq/hfq",
}), 400 }), 400
# adj 参数验证 # adj 参数基础格式验证(详细的资产类型兼容性校验在 fetch() 中)
if adj not in ['raw', 'qfq', 'hfq']: if adj not in ['raw', 'qfq', 'hfq']:
return jsonify({ return jsonify({
"error": f"Invalid adj parameter: {adj}", "error": f"Invalid adj parameter: {adj}",
@@ -667,14 +688,8 @@ def get_ohlcv():
"valid_types": [t.value for t in AssetType], "valid_types": [t.value for t in AssetType],
}), 400 }), 400
# 校验 adj 是否适用于该资产类型 # adj 参数资产类型兼容性校验(委托给 fetch_data_with_ttl内部会调用 UniversalDataFetcher.fetch
valid_adj = UniversalDataFetcher.VALID_ADJ_BY_TYPE.get(final_type, ['raw']) # 如果 adj 不兼容fetch() 会抛出 ValueError由 try-except 处理
if adj not in valid_adj:
return jsonify({
"error": f"adj='{adj}' 不适用于 {final_type.value}",
"valid_adj": valid_adj,
"hint": f"{final_type.value} 仅支持复权类型: {valid_adj}",
}), 400
# 加密货币必须指定 timeframe无论自动检测还是手动指定 # 加密货币必须指定 timeframe无论自动检测还是手动指定
if final_type == AssetType.CRYPTO: if final_type == AssetType.CRYPTO:
@@ -687,7 +702,8 @@ def get_ohlcv():
}), 400 }), 400
# 使用缓存获取数据(加密货币不缓存) # 使用缓存获取数据(加密货币不缓存)
result, is_cached = fetch_data_with_ttl(code, start, end, nocache, timeframe, adj) # 传递 final_type 避免重复检测
result, is_cached = fetch_data_with_ttl(code, start, end, nocache, timeframe, adj, final_type)
if result is None: if result is None:
return jsonify({ return jsonify({
@@ -713,25 +729,19 @@ def get_ohlcv():
result['asset_type'] = final_type.value # 使用最终类型 result['asset_type'] = final_type.value # 使用最终类型
result['adj'] = adj # 返回使用的 adj 参数 result['adj'] = adj # 返回使用的 adj 参数
# 如果是中国 ETF附加净值和溢价率数据数据层已处理通过 df.attrs 传递) # API 层职责:决定如何使用 attrs 中的业务数据
if final_type == AssetType.CHINA_ETF: if 'attrs' in result:
try: attrs = result['attrs']
f = get_fetcher()
with f:
# 调用统一接口,数据通过 DataFrame.attrs 传递
price_df, nav_df, premium_series = f.fetch_etf_with_nav(code, start, end)
# 添加净值数据 # 提取净值到顶层(方便调用方使用)
if nav_df is not None and len(nav_df) > 0: if 'nav' in attrs:
result['nav'] = dataframe_to_json(nav_df) result['nav'] = attrs['nav']
# 添加溢价率数据 # 提取溢价率到顶层(调用业务函数处理格式)
premium_result = build_premium_result(premium_series) if 'premium' in attrs:
premium_result = build_premium_result_from_attrs(attrs['premium'])
if premium_result: if premium_result:
result.update(premium_result) result.update(premium_result)
except Exception as e:
# 净值获取失败不影响主数据返回
result['nav_error'] = str(e)
# 如果用户指定了类型但与自动检测不同,显示提示 # 如果用户指定了类型但与自动检测不同,显示提示
if asset_type_param and detected_type != final_type: if asset_type_param and detected_type != final_type:
@@ -743,120 +753,6 @@ def get_ohlcv():
return jsonify(result) return jsonify(result)
@app.route('/api/v1/ohlcv/batch', methods=['POST'])
def batch_ohlcv():
"""批量获取多只标的的 OHLCV 数据"""
data = request.get_json()
if not data:
return jsonify({
"error": "Missing request body",
"example": {
"codes": ["000300.SH", "NDX"],
"start": "2024-01-01",
"end": "2024-03-31",
}
}), 400
codes = data.get('codes', [])
start = data.get('start', '').strip()
end = data.get('end', '').strip()
if not codes or not isinstance(codes, list):
return jsonify({
"error": "Missing or invalid parameter: codes (must be a list)"
}), 400
if not start or not end:
start, end = get_default_dates()
if not validate_date(start) or not validate_date(end):
return jsonify({
"error": "Invalid date format. Use YYYY-MM-DD",
}), 400
# 获取数据
f = get_fetcher()
results = {}
success_count = 0
failed_count = 0
try:
with f:
for code in codes:
result, _ = fetch_data_with_ttl(code, start, end)
if result is not None and "error" not in result:
results[code] = result
success_count += 1
else:
results[code] = {
"code": code,
"asset_type": AssetTypeDetector.detect(code).value,
"error": result.get("error", "No data") if result else "No data",
"data": [],
"count": 0,
}
failed_count += 1
except Exception as e:
return jsonify({"error": f"Batch fetch failed: {str(e)}"}), 500
return jsonify({
"results": results,
"success_count": success_count,
"failed_count": failed_count,
"total": len(codes),
"start": start,
"end": end,
})
@app.route('/api/v1/etf/nav')
def get_etf_nav():
"""获取ETF净值数据用于计算溢价率"""
code = request.args.get('code', '').strip()
start = request.args.get('start', '').strip()
end = request.args.get('end', '').strip()
if not code:
return jsonify({
"error": "Missing required parameter: code",
"example": "/api/v1/etf/nav?code=513100.SH"
}), 400
if not start or not end:
start, end = get_default_dates()
# 检查是否为ETF
asset_type = AssetTypeDetector.detect(code)
if asset_type != AssetType.CHINA_ETF:
return jsonify({
"error": f"Not an ETF: {code} (type: {asset_type.value})",
"hint": "Only A股ETF (codes starting with 51/52/15/16) supported",
}), 400
# 获取净值和溢价率
f = get_fetcher()
try:
with f:
price_df, nav_df, premium_series = f.fetch_etf_with_nav(code, start, end)
result = {
"code": code,
"price": dataframe_to_json(price_df) if price_df else {"data": [], "count": 0},
"nav": dataframe_to_json(nav_df) if nav_df else {"data": [], "count": 0},
}
# 添加溢价率数据(使用抽取的函数)
premium_result = build_premium_result(premium_series)
if premium_result:
result.update(premium_result)
return jsonify(result)
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/api/v1/cache/clear', methods=['POST']) @app.route('/api/v1/cache/clear', methods=['POST'])
def clear_cache_endpoint(): def clear_cache_endpoint():
@@ -888,8 +784,6 @@ def not_found(error):
"/", "/health", "/", "/health",
"/api/v1/asset-type", "/api/v1/asset-type",
"/api/v1/ohlcv", "/api/v1/ohlcv",
"/api/v1/ohlcv/batch",
"/api/v1/etf/nav",
"/api/v1/cache/clear", "/api/v1/cache/clear",
"/api/v1/cache/stats", "/api/v1/cache/stats",
] ]

View File

@@ -12,8 +12,10 @@
# 单标的获取(自动识别类型) # 单标的获取(自动识别类型)
df = fetcher.fetch("000300.SH", "2024-01-01", "2024-12-31") df = fetcher.fetch("000300.SH", "2024-01-01", "2024-12-31")
# ETF获取含净值 # ETF获取含净值,从 df.attrs 提取
price_df, nav_df = fetcher.fetch_etf_with_nav("513100.SH", "2024-01-01", "2024-12-31") df = fetcher.fetch_etf("513100.SH", "2024-01-01", "2024-12-31")
nav_df = df.attrs.get('nav')
premium_series = df.attrs.get('premium')
# 批量获取 # 批量获取
results = fetcher.fetch_batch(["000300.SH", "NDX", "N225"], "2024-01-01", "2024-12-31") results = fetcher.fetch_batch(["000300.SH", "NDX", "N225"], "2024-01-01", "2024-12-31")
@@ -306,9 +308,12 @@ class UniversalDataFetcher:
end_date: str end_date: str
) -> Tuple[Optional[pd.DataFrame], Optional[pd.DataFrame], Optional[pd.Series]]: ) -> Tuple[Optional[pd.DataFrame], Optional[pd.DataFrame], Optional[pd.Series]]:
""" """
获取ETF价格 + 净值 + 溢价率序列(兼容旧接口) 获取ETF价格 + 净值 + 溢价率序列(⚠️ DEPRECATED 兼容旧接口)
内部调用统一的 fetch_etf() 方法,从 DataFrame.attrs 提取元数据 ⚠️ 推荐使用新接口:
df = fetcher.fetch_etf(code, start_date, end_date, adj='raw')
nav_df = df.attrs.get('nav')
premium_series = df.attrs.get('premium')
Args: Args:
code: ETF代码 code: ETF代码
@@ -321,6 +326,14 @@ class UniversalDataFetcher:
- nav_df: ETF净值数据来自 df.attrs['nav'] - nav_df: ETF净值数据来自 df.attrs['nav']
- premium_series: 溢价率序列(来自 df.attrs['premium'] - premium_series: 溢价率序列(来自 df.attrs['premium']
""" """
import warnings
warnings.warn(
"fetch_etf_with_nav() is deprecated. "
"Use fetch_etf(code, start, end, adj='raw') and access df.attrs['nav'] and df.attrs['premium'] instead.",
DeprecationWarning,
stacklevel=2
)
# 调用统一的 fetch_etf() 方法 # 调用统一的 fetch_etf() 方法
df = self._tushare.fetch_etf(code, start_date, end_date, adj='raw') df = self._tushare.fetch_etf(code, start_date, end_date, adj='raw')