@@ -113,14 +113,12 @@ class HybridDataSource:
# YFinance 代码映射 (代码 -> YFinance格式)
YF_CODE_MAP = {
# 港股
" HSTECH " : " 3033.HK " , # 恒生科技指数 ETF
" HSTECH.HK " : " 3033.HK " , # 恒生科技指数ETF (Yahoo Finance代码)
" HSI " : " ^HSI " , # 恒生指数
# 美股指数
" NDX " : " ^NDX " , # 纳斯达克100
" SPX " : " ^GSPC " , # 标普500
" DJI " : " ^DJI " , # 道琼斯
# 黄金
" GC=F " : " GC=F " , # 黄金期货
}
# CCXT 代码映射 (代码 -> CCXT格式)
@@ -129,6 +127,11 @@ class HybridDataSource:
" ETH " : " ETH/USDT " , # OKX 以太坊现货
}
# 期货代码映射 (代码 -> Tushare格式)
FUTURES_CODE_MAP = {
" AU.SHF " : " AU.SHF " , # 上海期货交易所黄金主力合约
}
def __init__ ( self , ssh_config : Optional [ dict ] = None , use_cache : bool = True ) :
self . ssh_config = ssh_config or { }
self . use_cache = use_cache
@@ -143,6 +146,10 @@ class HybridDataSource:
""" 判断是否为加密货币 """
return code in self . CCXT_CODE_MAP
def _is_futures ( self , code : str ) - > bool :
""" 判断是否为期货合约 """
return code in self . FUTURES_CODE_MAP
def _get_tushare_token ( self ) - > str :
""" 获取 Tushare Token """
if self . _tushare_token is None :
@@ -402,12 +409,59 @@ class HybridDataSource:
print ( f " CCXT 下载 { code } ( { ccxt_code } ) 失败: { e } " )
return None
def _fetch_futures ( self , code : str , start_date : str , end_date : str ) - > Optional [ pd . DataFrame ] :
""" 使用 Tushare 获取期货数据(含夜盘,数据逻辑类似加密货币) """
import tushare as ts
# 转换代码格式
futures_code = self . FUTURES_CODE_MAP . get ( code , code )
# 转换日期格式
start_str = start_date . replace ( ' - ' , ' ' )
end_str = end_date . replace ( ' - ' , ' ' )
try :
pro = ts . pro_api ( self . _get_tushare_token ( ) )
# 获取期货日线数据
df = pro . fut_daily ( ts_code = futures_code , start_date = start_str , end_date = end_str )
if df is None or df . empty :
return None
# 标准化列名
df = df . rename ( columns = {
' trade_date ' : ' date ' ,
' open ' : ' open ' ,
' high ' : ' high ' ,
' low ' : ' low ' ,
' close ' : ' close ' ,
' vol ' : ' volume ' ,
} )
# 转换日期格式
df [ ' date ' ] = pd . to_datetime ( df [ ' date ' ] )
df = df . set_index ( ' date ' )
df = df . sort_index ( )
# 选择需要的列
df = df [ [ ' open ' , ' high ' , ' low ' , ' close ' , ' volume ' ] ]
df [ ' code ' ] = code
return df
except Exception as e :
print ( f " Tushare 期货下载 { code } ( { futures_code } ) 失败: { e } " )
return None
def fetch_single ( self , code : str , start_date : str , end_date : str , http_proxy : str = None ) - > Optional [ pd . DataFrame ] :
""" 获取单个标的的数据 """
if self . _is_china_index ( code ) :
return self . _fetch_tushare ( code , start_date , end_date )
elif self . _is_crypto ( code ) :
return self . _fetch_ccxt ( code , start_date , end_date , http_proxy )
elif self . _is_futures ( code ) :
return self . _fetch_futures ( code , start_date , end_date )
else :
return self . _fetch_yfinance ( code , start_date , end_date )
@@ -451,9 +505,11 @@ class HybridDataSource:
print ( f " ETF映射: { len ( etf_codes ) } 只 " )
china_codes = [ c for c in index_codes if self . _is_china_index ( c ) ]
global _codes = [ c for c in index_codes if not self . _is_china_index ( c ) ]
futures _codes = [ c for c in index_codes if self . _is_futures ( c ) ]
yf_codes = [ c for c in index_codes if not self . _is_china_index ( c ) and not self . _is_futures ( c ) ]
print ( f " 中国A股指数: { len ( china_codes ) } 只 " )
print ( f " 港股/美股/加密货币 : { len ( global _codes) } 只 " )
print ( f " 期货合约 : { len ( futures _codes) } 只 " )
print ( f " 港股/美股/加密货币: { len ( yf_codes ) } 只 " )
# 检查是否需要启动 socks2http 代理(用于加密货币)
crypto_codes = [ c for c in index_codes if self . _is_crypto ( c ) ]
@@ -484,6 +540,8 @@ class HybridDataSource:
for code in index_codes :
if self . _is_china_index ( code ) :
source = " Tushare "
elif self . _is_futures ( code ) :
source = " Tushare/期货 "
elif self . _is_crypto ( code ) :
source = " CCXT/OKX "
else :
@@ -576,22 +634,41 @@ class HybridDataSource:
)
# 以A股交易日为基准, 对齐所有数据
tushare_codes = [ c for c in valid_codes if self . _is_china_index ( c ) ]
if tushare_codes :
primary_dates = index_data [ tushare_codes [ 0 ] ] . dropna ( ) . index
print ( f " 主市场交易日: { len ( primary_dates ) } 天 " )
# 强制从 Tushare 获取A股交易日历( 不管配置中是否有A股指数)
start_str = index_data . index . min ( ) . strftime ( ' % Y % m %d ' )
end_str = index_data . index . max ( ) . strftime ( ' % Y % m %d ' )
# 重新索引到主市场交易日
index_data = index_data . reindex ( primary_dates )
import tushare as ts
pro = ts . pro_api ( self . _get_tushare_token ( ) )
trade_cal = pro . trade_cal ( start_date = start_str , end_date = end_str , is_open = ' 1 ' )
a_share_dates = pd . to_datetime ( trade_cal [ ' cal_date ' ] ) . sort_values ( )
# 对所有非A股指数进行前向填充
# 所有市场( 港股、美股、黄金、加密货币) 在T+1日09:00前都已收盘
print ( f " A股交易日历: { len ( a_share_dates ) } 天 ( { start_str } ~ { end_str } ) " )
# 重新索引到A股交易日
index_data = index_data . reindex ( a_share_dates )
# 对非A股指数进行数据对齐
# 港股、美股: T日收盘, T+1日09:00前使用T日数据( 前向填充)
# 加密货币、期货( 含夜盘) : T+1日09:00前使用T+1日数据( 后向填充)
# - 加密货币UTC 00:00收盘( 北京时间08:00)
# - 期货AU.SHF夜盘02:30收盘, 数据标记为T+1日
non_a_codes = [ c for c in valid_codes if not self . _is_china_index ( c ) ]
for code in non_a_codes :
yf_codes = [ c for c in non_a_codes if not self . _is_crypto ( c ) and not self . _is_futures ( c ) ]
crypto_futures_codes = [ c for c in non_a_codes if self . _is_crypto ( c ) or self . _is_futures ( c ) ]
# 港股/美股: 前向填充( 使用T日数据)
for code in yf_codes :
if code in index_data . columns :
index_data [ code ] = index_data [ code ] . ffill ( ) . bfill ( )
print ( f " 非A股标的: { len ( non_a_codes ) } 只 (已前向填充) " )
# 加密货币/期货: 后向填充( 使用T+1日数据)
for code in crypto_futures_codes :
if code in index_data . columns :
index_data [ code ] = index_data [ code ] . bfill ( ) . ffill ( )
if non_a_codes :
print ( f " 非A股标的: { len ( non_a_codes ) } 只 (港股/美股:ffill, 加密货币/期货:bfill) " )
print ( f " 时间范围: { index_data . index [ 0 ] } ~ { index_data . index [ - 1 ] } " )
print ( f " 交易日数: { len ( index_data ) } " )
@@ -613,9 +690,8 @@ class HybridDataSource:
aggfunc = ' first '
)
# 对齐到主市场 交易日
if tu share_cod es:
etf_data = etf_data . reindex ( primary_dates )
# 对齐到A股 交易日
etf_data = etf_data . reindex ( a_ share_dat es)
print ( f " ETF价格数据: { len ( etf_data . columns ) } 只 " )
else :
@@ -641,9 +717,8 @@ class HybridDataSource:
aggfunc = ' first '
)
# 对齐到主市场 交易日, 并前向填充缺失值( 净值数据通常T+1更新)
if tu share_cod es:
etf_nav_data = etf_nav_data . reindex ( primary_dates )
# 对齐到A股 交易日, 并前向填充缺失值( 净值数据通常T+1更新)
etf_nav_data = etf_nav_data . reindex ( a_ share_dat es)
etf_nav_data = etf_nav_data . ffill ( ) # 前向填充缺失的净值数据
print ( f " ETF净值数据: { len ( etf_nav_data . columns ) } 只 " )
@@ -652,9 +727,8 @@ class HybridDataSource:
benchmark_data = self . fetch_single ( benchmark_code , start_date , end_date )
if benchmark_data is not None :
benchmark_data . index = pd . to_datetime ( benchmark_data . index , utc = True ) . tz_localize ( None ) . normalize ( )
# 对齐到主市场 交易日
if tu share_cod es:
benchmark_data = benchmark_data . reindex ( primary_dates )
# 对齐到A股 交易日
benchmark_data = benchmark_data . reindex ( a_ share_dat es)
print ( f " \n ✓ 基准 { benchmark_code } : { len ( benchmark_data ) } 条 " )
return index_data , etf_data , etf_nav_data , benchmark_data , valid_codes