feat(datasource): 添加期货数据支持及优化数据对齐逻辑

- 新增期货代码映射及判断函数,支持上海期货交易所黄金主力合约
- 实现通过Tushare接口获取期货日线数据,包含夜盘数据处理
- fetch_single方法新增期货数据的调用逻辑
- 细化标的分类,将期货单独列出用于数据处理和日志输出
- 强制从Tushare获取A股交易日历,确保数据对齐的准确性
- 优化各类标的对齐逻辑,区别处理港股美股、加密货币与期货的前后向填充
- 统一ETF和基准数据对齐到A股交易日,改进数据一致性和完整性
This commit is contained in:
2026-03-26 00:06:26 +08:00
parent 2dde3c89c5
commit e4a5845916

View File

@@ -113,14 +113,12 @@ class HybridDataSource:
# YFinance 代码映射 (代码 -> YFinance格式)
YF_CODE_MAP = {
# 港股
"HSTECH": "3033.HK", # 恒生科技指数 ETF
"HSTECH.HK": "3033.HK", # 恒生科技指数ETF (Yahoo Finance代码)
"HSI": "^HSI", # 恒生指数
# 美股指数
"NDX": "^NDX", # 纳斯达克100
"SPX": "^GSPC", # 标普500
"DJI": "^DJI", # 道琼斯
# 黄金
"GC=F": "GC=F", # 黄金期货
}
# CCXT 代码映射 (代码 -> CCXT格式)
@@ -128,6 +126,11 @@ class HybridDataSource:
"BTC": "BTC/USDT", # OKX 比特币现货
"ETH": "ETH/USDT", # OKX 以太坊现货
}
# 期货代码映射 (代码 -> Tushare格式)
FUTURES_CODE_MAP = {
"AU.SHF": "AU.SHF", # 上海期货交易所黄金主力合约
}
def __init__(self, ssh_config: Optional[dict] = None, use_cache: bool = True):
self.ssh_config = ssh_config or {}
@@ -142,6 +145,10 @@ class HybridDataSource:
def _is_crypto(self, code: str) -> bool:
"""判断是否为加密货币"""
return code in self.CCXT_CODE_MAP
def _is_futures(self, code: str) -> bool:
"""判断是否为期货合约"""
return code in self.FUTURES_CODE_MAP
def _get_tushare_token(self) -> str:
"""获取 Tushare Token"""
@@ -402,12 +409,59 @@ class HybridDataSource:
print(f"CCXT 下载 {code} ({ccxt_code}) 失败: {e}")
return None
def _fetch_futures(self, code: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
"""使用 Tushare 获取期货数据(含夜盘,数据逻辑类似加密货币)"""
import tushare as ts
# 转换代码格式
futures_code = self.FUTURES_CODE_MAP.get(code, code)
# 转换日期格式
start_str = start_date.replace('-', '')
end_str = end_date.replace('-', '')
try:
pro = ts.pro_api(self._get_tushare_token())
# 获取期货日线数据
df = pro.fut_daily(ts_code=futures_code, start_date=start_str, end_date=end_str)
if df is None or df.empty:
return None
# 标准化列名
df = df.rename(columns={
'trade_date': 'date',
'open': 'open',
'high': 'high',
'low': 'low',
'close': 'close',
'vol': 'volume',
})
# 转换日期格式
df['date'] = pd.to_datetime(df['date'])
df = df.set_index('date')
df = df.sort_index()
# 选择需要的列
df = df[['open', 'high', 'low', 'close', 'volume']]
df['code'] = code
return df
except Exception as e:
print(f"Tushare 期货下载 {code} ({futures_code}) 失败: {e}")
return None
def fetch_single(self, code: str, start_date: str, end_date: str, http_proxy: str = None) -> Optional[pd.DataFrame]:
"""获取单个标的的数据"""
if self._is_china_index(code):
return self._fetch_tushare(code, start_date, end_date)
elif self._is_crypto(code):
return self._fetch_ccxt(code, start_date, end_date, http_proxy)
elif self._is_futures(code):
return self._fetch_futures(code, start_date, end_date)
else:
return self._fetch_yfinance(code, start_date, end_date)
@@ -451,9 +505,11 @@ class HybridDataSource:
print(f" ETF映射: {len(etf_codes)}")
china_codes = [c for c in index_codes if self._is_china_index(c)]
global_codes = [c for c in index_codes if not self._is_china_index(c)]
futures_codes = [c for c in index_codes if self._is_futures(c)]
yf_codes = [c for c in index_codes if not self._is_china_index(c) and not self._is_futures(c)]
print(f" 中国A股指数: {len(china_codes)}")
print(f" 港股/美股/加密货币: {len(global_codes)}")
print(f" 期货合约: {len(futures_codes)}")
print(f" 港股/美股/加密货币: {len(yf_codes)}")
# 检查是否需要启动 socks2http 代理(用于加密货币)
crypto_codes = [c for c in index_codes if self._is_crypto(c)]
@@ -484,6 +540,8 @@ class HybridDataSource:
for code in index_codes:
if self._is_china_index(code):
source = "Tushare"
elif self._is_futures(code):
source = "Tushare/期货"
elif self._is_crypto(code):
source = "CCXT/OKX"
else:
@@ -576,22 +634,41 @@ class HybridDataSource:
)
# 以A股交易日为基准对齐所有数据
tushare_codes = [c for c in valid_codes if self._is_china_index(c)]
if tushare_codes:
primary_dates = index_data[tushare_codes[0]].dropna().index
print(f" 主市场交易日: {len(primary_dates)}")
# 重新索引到主市场交易日
index_data = index_data.reindex(primary_dates)
# 对所有非A股指数进行前向填充
# 所有市场港股、美股、黄金、加密货币在T+1日09:00前都已收盘
non_a_codes = [c for c in valid_codes if not self._is_china_index(c)]
for code in non_a_codes:
if code in index_data.columns:
index_data[code] = index_data[code].ffill().bfill()
print(f" 非A股标的: {len(non_a_codes)} 只 (已前向填充)")
# 强制从 Tushare 获取A股交易日历不管配置中是否有A股指数
start_str = index_data.index.min().strftime('%Y%m%d')
end_str = index_data.index.max().strftime('%Y%m%d')
import tushare as ts
pro = ts.pro_api(self._get_tushare_token())
trade_cal = pro.trade_cal(start_date=start_str, end_date=end_str, is_open='1')
a_share_dates = pd.to_datetime(trade_cal['cal_date']).sort_values()
print(f" A股交易日历: {len(a_share_dates)} 天 ({start_str} ~ {end_str})")
# 重新索引到A股交易日
index_data = index_data.reindex(a_share_dates)
# 对非A股指数进行数据对齐
# 港股、美股T日收盘T+1日09:00前使用T日数据前向填充
# 加密货币、期货含夜盘T+1日09:00前使用T+1日数据后向填充
# - 加密货币UTC 00:00收盘北京时间08:00
# - 期货AU.SHF夜盘02:30收盘数据标记为T+1日
non_a_codes = [c for c in valid_codes if not self._is_china_index(c)]
yf_codes = [c for c in non_a_codes if not self._is_crypto(c) and not self._is_futures(c)]
crypto_futures_codes = [c for c in non_a_codes if self._is_crypto(c) or self._is_futures(c)]
# 港股/美股前向填充使用T日数据
for code in yf_codes:
if code in index_data.columns:
index_data[code] = index_data[code].ffill().bfill()
# 加密货币/期货后向填充使用T+1日数据
for code in crypto_futures_codes:
if code in index_data.columns:
index_data[code] = index_data[code].bfill().ffill()
if non_a_codes:
print(f" 非A股标的: {len(non_a_codes)} 只 (港股/美股:ffill, 加密货币/期货:bfill)")
print(f" 时间范围: {index_data.index[0]} ~ {index_data.index[-1]}")
print(f" 交易日数: {len(index_data)}")
@@ -613,9 +690,8 @@ class HybridDataSource:
aggfunc='first'
)
# 对齐到主市场交易日
if tushare_codes:
etf_data = etf_data.reindex(primary_dates)
# 对齐到A股交易日
etf_data = etf_data.reindex(a_share_dates)
print(f" ETF价格数据: {len(etf_data.columns)}")
else:
@@ -641,10 +717,9 @@ class HybridDataSource:
aggfunc='first'
)
# 对齐到主市场交易日并前向填充缺失值净值数据通常T+1更新
if tushare_codes:
etf_nav_data = etf_nav_data.reindex(primary_dates)
etf_nav_data = etf_nav_data.ffill() # 前向填充缺失的净值数据
# 对齐到A股交易日并前向填充缺失值净值数据通常T+1更新
etf_nav_data = etf_nav_data.reindex(a_share_dates)
etf_nav_data = etf_nav_data.ffill() # 前向填充缺失的净值数据
print(f" ETF净值数据: {len(etf_nav_data.columns)}")
@@ -652,9 +727,8 @@ class HybridDataSource:
benchmark_data = self.fetch_single(benchmark_code, start_date, end_date)
if benchmark_data is not None:
benchmark_data.index = pd.to_datetime(benchmark_data.index, utc=True).tz_localize(None).normalize()
# 对齐到主市场交易日
if tushare_codes:
benchmark_data = benchmark_data.reindex(primary_dates)
# 对齐到A股交易日
benchmark_data = benchmark_data.reindex(a_share_dates)
print(f"\n✓ 基准 {benchmark_code}: {len(benchmark_data)}")
return index_data, etf_data, etf_nav_data, benchmark_data, valid_codes