@@ -17,8 +17,6 @@ API 文档:
GET /health - 健康检查
GET /api/v1/asset-type - 检测资产类型
GET /api/v1/ohlcv - 获取K线数据
POST /api/v1/ohlcv/batch - 批量获取K线数据
GET /api/v1/etf/nav - 获取ETF净值
POST /api/v1/cache/clear - 清理缓存
GET /api/v1/cache/stats - 缓存统计
"""
@@ -26,6 +24,7 @@ API 文档:
import os
import sys
import json
import pickle
from pathlib import Path
from datetime import datetime , timedelta
from typing import Optional , Dict , Any , List , Tuple
@@ -100,9 +99,9 @@ def get_fetcher() -> UniversalDataFetcher:
# ============================================================
@lru_cache ( maxsize = CACHE_MAXSIZE )
def _fetch_full_data_cached ( code : str , today : str , adj : str = ' raw ' ) - > Optional [ str ] :
def _fetch_full_data_cached ( code : str , today : str , adj : str = ' raw ' ) - > Optional [ bytes ] :
"""
缓存全量数据(仅日级别数据 )
缓存全量数据(pickle 格式,保留完整 DataFrame 包括 attrs )
缓存策略:
- 日级别数据(股票/指数/ETF/期货): 从 DEFAULT_START_DATE 到 today
@@ -114,7 +113,7 @@ def _fetch_full_data_cached(code: str, today: str, adj: str = 'raw') -> Optional
- adj: 复权参数,不同复权类型独立缓存
Returns:
JSON 序列化的全量数据(仅日级别数据 )
pickle 序列化的 DataFrame( 包括 df.attrs )
"""
f = get_fetcher ( )
@@ -125,10 +124,8 @@ def _fetch_full_data_cached(code: str, today: str, adj: str = 'raw') -> Optional
if asset_type == AssetType . CRYPTO :
return None # 不缓存加密货币
# 校验 adj 参数是否适用于该 资产类型
valid_adj = UniversalDataFetcher . VALID_ADJ_BY_TYPE . get ( asset_type , [ ' raw ' ] )
if adj not in valid_adj :
return json . dumps ( { " error " : f " adj= ' { adj } ' 不适用于 { asset_type . value } " } )
# adj 参数资产类型兼容性校验由 f.fetch() 内部处理
# 如果不兼容会抛出 ValueError, 被 except 捕获
try :
with f :
@@ -138,65 +135,37 @@ def _fetch_full_data_cached(code: str, today: str, adj: str = 'raw') -> Optional
if df is None or len ( df ) == 0 :
return None
# 保存为 DataFrame 格式(方便后续切片 )
result = {
' df_json ' : dataframe_to_json ( df , asset_type . value ) ,
' code ' : code ,
' asset_type ' : asset_type . value ,
' adj ' : adj ,
' data_start ' : df . index . min ( ) . strftime ( ' % Y- % m- %d ' ) if len ( df ) > 0 else None ,
' data_end ' : df . index . max ( ) . strftime ( ' % Y- % m- %d ' ) if len ( df ) > 0 else None ,
' cache_strategy ' : ' full_history ' ,
}
# 保存额外元数据到 attrs( 用于切片后重建 result )
df . attrs [ ' _cache_code ' ] = code
df . attrs [ ' _cache_asset_type ' ] = asset_type . value
df . attrs [ ' _cache_adj ' ] = adj
# ✅ 一行代码序列化整个 DataFrame( 包括 attrs)
return pickle . dumps ( df )
return json . dumps ( result )
except Exception as e :
return json . dumps ( { " error " : str ( e ) } )
return None
def _slice_data_from_cache ( cached_data : Dict , start : str , end : str ) - > Dict :
def _slice_data_from_cache ( cached_bytes : bytes , start : str , end : str ) - > Dict :
"""
从缓存的全量 数据中切片指定日期范围
从缓存的 pickle 数据中切片指定日期范围
Args:
cached_data: 缓存的全量数据
cached_bytes: pickle 序列化的 DataFrame
start: 用户请求的开始日期
end: 用户请求的结束日期
Returns:
切片后的数据( JSON格式)
"""
if ' df_json ' not in cached_data or ' data ' not in cached_data [ ' df_json ' ] :
return cached_data
# ✅ 一行代码反序列化(包括 attrs)
df = pickle . loads ( cached_bytes )
# 从缓存数据中重建 DataFrame
records = cached_data [ ' df_json ' ] [ ' data ' ]
info_data = cached_data [ ' df_json ' ] . get ( ' info ' , None ) # 从缓存获取 info
if not records :
result = {
' data ' : [ ] ,
' count ' : 0 ,
' code ' : cached_data [ ' code ' ] ,
' asset_type ' : cached_data [ ' asset_type ' ] ,
' adj ' : cached_data . get ( ' adj ' , ' raw ' ) ,
' requested_range ' : { ' start ' : start , ' end ' : end } ,
' available_range ' : { ' start ' : cached_data [ ' data_start ' ] , ' end ' : cached_data [ ' data_end ' ] } ,
}
# 保留 info( 如果有)
if info_data :
result [ ' info ' ] = info_data
return result
# 转换为 DataFrame
df = pd . DataFrame ( records )
if ' date ' in df . columns :
df [ ' date ' ] = pd . to_datetime ( df [ ' date ' ] )
df = df . set_index ( ' date ' )
# 恢复 attrs( 如果有 info)
if info_data :
df . attrs [ ' info ' ] = info_data
# 从 attrs 获取 缓存元 数据
code = df . attrs . get ( ' _cache_code ' , ' ' )
asset_type = df . attrs . get ( ' _cache_asset_type ' , ' ' )
adj = df . attrs . get ( ' _cache_adj ' , ' raw ' )
# 切片日期范围
start_dt = pd . to_datetime ( start )
@@ -208,13 +177,24 @@ def _slice_data_from_cache(cached_data: Dict, start: str, end: str) -> Dict:
# 切片(使用 loc 进行日期范围选择)
sliced_df = df . loc [ start_dt : end_dt ]
# 转换为 JSON 格式( dataframe_to_json 会处理 df.attrs['info'])
# 转换为 JSON 格式
result = dataframe_to_json ( sliced_df )
result [ ' code ' ] = cached_data [ ' code ' ]
result [ ' asset_type ' ] = cached_data [ ' asset_type' ]
result [ ' adj ' ] = cached_data . get ( ' adj ' , ' raw ' )
result [ ' code ' ] = code
result [ ' asset_type ' ] = asset_type
result [ ' adj ' ] = adj
result [ ' requested_range ' ] = { ' start ' : start , ' end ' : end }
result [ ' available_range ' ] = { ' start ' : cached_data [ ' data_start ' ] , ' end ' : cached_data [ ' data_end ' ] }
result [ ' available_range ' ] = {
' start ' : df . index . min ( ) . strftime ( ' % Y- % m- %d ' ) if len ( df ) > 0 else None ,
' end ' : df . index . max ( ) . strftime ( ' % Y- % m- %d ' ) if len ( df ) > 0 else None ,
}
# 缓存层职责:只保存和恢复原始 attrs, 不关心业务含义
# attrs 中的 nav、premium 等业务数据由 API 层处理
if sliced_df . attrs :
# 过滤掉内部缓存元数据( _cache_*)
public_attrs = { k : v for k , v in sliced_df . attrs . items ( ) if not k . startswith ( ' _cache_ ' ) }
if public_attrs :
result [ ' attrs ' ] = public_attrs
return result
@@ -225,7 +205,8 @@ def fetch_data_with_ttl(
end : str ,
nocache : bool = False ,
timeframe : str = ' 1d ' ,
adj : str = ' raw '
adj : str = ' raw ' ,
asset_type : Optional [ AssetType ] = None # 新增:可选的资产类型参数
) - > Tuple [ Optional [ Dict ] , bool ] :
"""
获取数据,支持 TTL 缓存(加密货币不缓存)
@@ -242,6 +223,7 @@ def fetch_data_with_ttl(
nocache: 是否跳过缓存
timeframe: K线周期( 仅加密货币需要)
adj: 复权参数( raw/qfq/hfq)
asset_type: 资产类型(可选,如果不提供则自动检测)
Returns:
(data, is_cached): 数据和是否命中缓存
@@ -249,8 +231,9 @@ def fetch_data_with_ttl(
# 获取今天的实际日期( 用于缓存Key)
today = datetime . now ( ) . strftime ( ' % Y- % m- %d ' )
# 检查资产类型
asset_type = AssetTypeDetector . detect ( code )
# 使用传入的 asset_type 或自动检测
if asset_type is None :
asset_type = AssetTypeDetector . detect ( code )
# 加密货币:直接下载,不缓存,必须指定 timeframe
if asset_type == AssetType . CRYPTO :
@@ -272,12 +255,8 @@ def fetch_data_with_ttl(
except Exception as e :
return { ' error ' : str ( e ) , ' code ' : code , ' asset_type ' : asset_type . value } , False
# 校验 adj 参数
valid_adj = UniversalDataFetcher . VALID_ADJ_BY_TYPE . get ( asset_type , [ ' raw ' ] )
if adj not in valid_adj :
return { ' error ' : f " adj= ' { adj } ' 不适用于 { asset_type . value } ,支持: { valid_adj } " , ' code ' : code , ' asset_type ' : asset_type . value } , False
# 日级别数据:使用缓存(缓存 Key 包含 adj)
# adj 参数资产类型兼容性校验在 _fetch_full_data_cached() 中执行
full_cache_key = ( code , today , adj )
# 跳过缓存:清理缓存后重新下载
@@ -285,11 +264,10 @@ def fetch_data_with_ttl(
_fetch_full_data_cached . cache_clear ( )
global _ttl_cache
_ttl_cache . clear ( )
result_json = _fetch_full_data_cached ( code , today , adj )
if result_json is None :
cached_bytes = _fetch_full_data_cached ( code , today , adj )
if cached_bytes is None :
return None , False
full_data = json . loads ( result_json )
return ( _slice_data_from_cache ( full_data , start , end ) , False )
return ( _slice_data_from_cache ( cached_bytes , start , end ) , False )
# 检查 TTL 缓存(全量数据缓存)
if full_cache_key in _ttl_cache :
@@ -301,23 +279,17 @@ def fetch_data_with_ttl(
# 过期,删除
del _ttl_cache [ full_cache_key ]
# 从 LRU 缓存获取全量数据
result_json = _fetch_full_data_cached ( code , today , adj )
# 从 LRU 缓存获取全量数据( pickle bytes)
cached_bytes = _fetch_full_data_cached ( code , today , adj )
if result_json is None :
if cached_bytes is None :
return None , False
full_data = json . loads ( result_json )
# 检查是否有错误
if " error " in full_data :
return full_data , False
# 存入 TTL 缓存(存全量数据)
_ttl_cache [ full_cache_key ] = TimedCacheEntry ( full_data )
# 存入 TTL 缓存(存 pickle bytes)
_ttl_cache [ full_cache_key ] = TimedCacheEntry ( cached_bytes )
# 从全量数据切片返回用户请求的范围
sliced_data = _slice_data_from_cache ( full_data , start , end )
sliced_data = _slice_data_from_cache ( cached_bytes , start , end )
return sliced_data , False
@@ -488,6 +460,57 @@ def build_premium_result(premium_series: pd.Series) -> Dict:
}
def build_premium_result_from_attrs ( premium_data : Dict ) - > Dict :
"""
从 attrs 格式构建溢价率返回结果
Args:
premium_data: attrs 中的溢价率数据,格式为:
{
' type ' : ' series ' ,
' data ' : { date_str: premium_value, ...},
' name ' : ' premium '
}
Returns:
包含 premium_series, latest_premium, premium_date, premium_stats 的字典
"""
if not premium_data or premium_data . get ( ' type ' ) != ' series ' :
return { }
# 从 dict 恢复为 Series
premium_dict = premium_data . get ( ' data ' , { } )
if not premium_dict :
return { }
premium_series = pd . Series ( premium_dict )
premium_series . index = pd . to_datetime ( premium_series . index )
premium_series . index . name = ' date '
# 转换为日期-溢价率列表
premium_list = [
{ " date " : date . strftime ( ' % Y- % m- %d ' ) , " premium " : round ( float ( premium ) , 6 ) }
for date , premium in premium_series . items ( )
]
# 最新溢价率
latest_premium = float ( premium_series . iloc [ - 1 ] )
latest_date = premium_series . index [ - 1 ] . strftime ( ' % Y- % m- %d ' )
return {
" premium_series " : premium_list ,
" latest_premium " : round ( latest_premium , 6 ) ,
" premium_date " : latest_date ,
" premium_stats " : {
" mean " : round ( float ( premium_series . mean ( ) ) , 6 ) ,
" std " : round ( float ( premium_series . std ( ) ) , 6 ) ,
" min " : round ( float ( premium_series . min ( ) ) , 6 ) ,
" max " : round ( float ( premium_series . max ( ) ) , 6 ) ,
" median " : round ( float ( premium_series . median ( ) ) , 6 ) ,
} ,
}
# ============================================================
# API 路由
# ============================================================
@@ -514,8 +537,6 @@ def index():
" ohlcv_nocache " : " /api/v1/ohlcv?code= {code} &nocache=true " ,
" ohlcv_crypto " : " /api/v1/ohlcv?code=BTC&timeframe=1d (加密货币必须指定 timeframe) " ,
" ohlcv_asset_type " : " /api/v1/ohlcv?code= {code} &asset_type=china_index (强制覆盖类型) " ,
" batch " : " POST /api/v1/ohlcv/batch " ,
" etf_nav " : " /api/v1/etf/nav?code= {code} " ,
" cache_clear " : " POST /api/v1/cache/clear " ,
" cache_stats " : " /api/v1/cache/stats " ,
} ,
@@ -632,7 +653,7 @@ def get_ohlcv():
" adj_hint " : " 可选 adj 参数获取复权数据( raw/qfq/hfq) " ,
} ) , 400
# adj 参数验证
# adj 参数基础格式验证(详细的资产类型兼容性校验在 fetch() 中)
if adj not in [ ' raw ' , ' qfq ' , ' hfq ' ] :
return jsonify ( {
" error " : f " Invalid adj parameter: { adj } " ,
@@ -667,14 +688,8 @@ def get_ohlcv():
" valid_types " : [ t . value for t in AssetType ] ,
} ) , 400
# 校验 adj 是否适用于该资产类型
valid_adj = UniversalDataFetcher . VALID_ADJ_BY_TYPE . get ( final_type , [ ' raw ' ] )
if adj not in valid_adj :
return jsonify ( {
" error " : f " adj= ' { adj } ' 不适用于 { final_type . value } " ,
" valid_adj " : valid_adj ,
" hint " : f " { final_type . value } 仅支持复权类型: { valid_adj } " ,
} ) , 400
# adj 参数资产类型兼容性校验(委托给 fetch_data_with_ttl, 内部会调用 UniversalDataFetcher.fetch)
# 如果 adj 不兼容, fetch() 会抛出 ValueError, 由 try-except 处理
# 加密货币必须指定 timeframe( 无论自动检测还是手动指定)
if final_type == AssetType . CRYPTO :
@@ -687,7 +702,8 @@ def get_ohlcv():
} ) , 400
# 使用缓存获取数据(加密货币不缓存)
result , is_cached = fetch_data_with_ttl ( code , start , end , nocache , timeframe , adj )
# 传递 final_type 避免重复检测
result , is_cached = fetch_data_with_ttl ( code , start , end , nocache , timeframe , adj , final_type )
if result is None :
return jsonify ( {
@@ -713,25 +729,19 @@ def get_ohlcv():
result [ ' asset_type ' ] = final_type . value # 使用最终类型
result [ ' adj ' ] = adj # 返回使用的 adj 参数
# 如果是中国 ETF, 附加净值和溢价率数据( 数据层已处理, 通过 df.attrs 传递)
if final_type == AssetType . CHINA_ETF :
try :
f = get_fetcher ( )
with f :
# 调用统一接口,数据通过 DataFrame.attrs 传递
price_df , nav_df , premium_series = f . fetch_etf_with_nav ( code , start , end )
# API 层职责:决定如何使用 attrs 中的业务数据
if ' attrs ' in result :
attrs = result [ ' attrs ' ]
# 添加净值数据
if nav_df is not None and len ( nav_df ) > 0 :
result [ ' nav ' ] = d ataframe_to_json ( nav_df )
# 提取净值到顶层(方便调用方使用)
if ' nav ' in attrs :
result [ ' nav ' ] = attrs [ ' nav ' ]
# 添加 溢价率数据
premium_result = build_premium_result ( premium_serie s )
# 提取 溢价率到顶层(调用业务函数处理格式)
if ' premium ' in attr s:
premium_result = build_premium_result_from_attrs ( attrs [ ' premium ' ] )
if premium_result :
result . update ( premium_result )
except Exception as e :
# 净值获取失败不影响主数据返回
result [ ' nav_error ' ] = str ( e )
# 如果用户指定了类型但与自动检测不同,显示提示
if asset_type_param and detected_type != final_type :
@@ -743,120 +753,6 @@ def get_ohlcv():
return jsonify ( result )
@app.route ( ' /api/v1/ohlcv/batch ' , methods = [ ' POST ' ] )
def batch_ohlcv ( ) :
""" 批量获取多只标的的 OHLCV 数据 """
data = request . get_json ( )
if not data :
return jsonify ( {
" error " : " Missing request body " ,
" example " : {
" codes " : [ " 000300.SH " , " NDX " ] ,
" start " : " 2024-01-01 " ,
" end " : " 2024-03-31 " ,
}
} ) , 400
codes = data . get ( ' codes ' , [ ] )
start = data . get ( ' start ' , ' ' ) . strip ( )
end = data . get ( ' end ' , ' ' ) . strip ( )
if not codes or not isinstance ( codes , list ) :
return jsonify ( {
" error " : " Missing or invalid parameter: codes (must be a list) "
} ) , 400
if not start or not end :
start , end = get_default_dates ( )
if not validate_date ( start ) or not validate_date ( end ) :
return jsonify ( {
" error " : " Invalid date format. Use YYYY-MM-DD " ,
} ) , 400
# 获取数据
f = get_fetcher ( )
results = { }
success_count = 0
failed_count = 0
try :
with f :
for code in codes :
result , _ = fetch_data_with_ttl ( code , start , end )
if result is not None and " error " not in result :
results [ code ] = result
success_count + = 1
else :
results [ code ] = {
" code " : code ,
" asset_type " : AssetTypeDetector . detect ( code ) . value ,
" error " : result . get ( " error " , " No data " ) if result else " No data " ,
" data " : [ ] ,
" count " : 0 ,
}
failed_count + = 1
except Exception as e :
return jsonify ( { " error " : f " Batch fetch failed: { str ( e ) } " } ) , 500
return jsonify ( {
" results " : results ,
" success_count " : success_count ,
" failed_count " : failed_count ,
" total " : len ( codes ) ,
" start " : start ,
" end " : end ,
} )
@app.route ( ' /api/v1/etf/nav ' )
def get_etf_nav ( ) :
""" 获取ETF净值数据( 用于计算溢价率) """
code = request . args . get ( ' code ' , ' ' ) . strip ( )
start = request . args . get ( ' start ' , ' ' ) . strip ( )
end = request . args . get ( ' end ' , ' ' ) . strip ( )
if not code :
return jsonify ( {
" error " : " Missing required parameter: code " ,
" example " : " /api/v1/etf/nav?code=513100.SH "
} ) , 400
if not start or not end :
start , end = get_default_dates ( )
# 检查是否为ETF
asset_type = AssetTypeDetector . detect ( code )
if asset_type != AssetType . CHINA_ETF :
return jsonify ( {
" error " : f " Not an ETF: { code } (type: { asset_type . value } ) " ,
" hint " : " Only A股ETF (codes starting with 51/52/15/16) supported " ,
} ) , 400
# 获取净值和溢价率
f = get_fetcher ( )
try :
with f :
price_df , nav_df , premium_series = f . fetch_etf_with_nav ( code , start , end )
result = {
" code " : code ,
" price " : dataframe_to_json ( price_df ) if price_df else { " data " : [ ] , " count " : 0 } ,
" nav " : dataframe_to_json ( nav_df ) if nav_df else { " data " : [ ] , " count " : 0 } ,
}
# 添加溢价率数据(使用抽取的函数)
premium_result = build_premium_result ( premium_series )
if premium_result :
result . update ( premium_result )
return jsonify ( result )
except Exception as e :
return jsonify ( { " error " : str ( e ) } ) , 500
@app.route ( ' /api/v1/cache/clear ' , methods = [ ' POST ' ] )
def clear_cache_endpoint ( ) :
@@ -888,8 +784,6 @@ def not_found(error):
" / " , " /health " ,
" /api/v1/asset-type " ,
" /api/v1/ohlcv " ,
" /api/v1/ohlcv/batch " ,
" /api/v1/etf/nav " ,
" /api/v1/cache/clear " ,
" /api/v1/cache/stats " ,
]