diff --git a/core/datasource/hybrid_source.py b/core/datasource/hybrid_source.py
index 304a788..30dbcc7 100644
--- a/core/datasource/hybrid_source.py
+++ b/core/datasource/hybrid_source.py
@@ -113,14 +113,12 @@ class HybridDataSource:
     # YFinance 代码映射 (代码 -> YFinance格式)
     YF_CODE_MAP = {
         # 港股
-        "HSTECH": "3033.HK",  # 恒生科技指数 ETF
+        "HSTECH.HK": "3033.HK",  # Hang Seng TECH index ETF (Yahoo Finance symbol)
         "HSI": "^HSI",  # 恒生指数
         # 美股指数
         "NDX": "^NDX",  # 纳斯达克100
         "SPX": "^GSPC",  # 标普500
         "DJI": "^DJI",  # 道琼斯
-        # 黄金
-        "GC=F": "GC=F",  # 黄金期货
     }
 
     # CCXT 代码映射 (代码 -> CCXT格式)
@@ -128,6 +126,11 @@ class HybridDataSource:
         "BTC": "BTC/USDT",  # OKX 比特币现货
         "ETH": "ETH/USDT",  # OKX 以太坊现货
     }
+
+    # Futures code mapping (code -> Tushare format)
+    FUTURES_CODE_MAP = {
+        "AU.SHF": "AU.SHF",  # Shanghai Futures Exchange gold main contract
+    }
 
     def __init__(self, ssh_config: Optional[dict] = None, use_cache: bool = True):
         self.ssh_config = ssh_config or {}
@@ -142,6 +145,10 @@ class HybridDataSource:
     def _is_crypto(self, code: str) -> bool:
         """判断是否为加密货币"""
         return code in self.CCXT_CODE_MAP
+
+    def _is_futures(self, code: str) -> bool:
+        """Return True if *code* is a known futures contract."""
+        return code in self.FUTURES_CODE_MAP
 
     def _get_tushare_token(self) -> str:
         """获取 Tushare Token"""
@@ -402,12 +409,54 @@ class HybridDataSource:
             print(f"CCXT 下载 {code} ({ccxt_code}) 失败: {e}")
             return None
 
+    def _fetch_futures(self, code: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
+        """Fetch daily futures bars from Tushare.
+
+        Args:
+            code: Internal futures code, translated via FUTURES_CODE_MAP.
+            start_date: Start date; dashes are stripped to get YYYYMMDD.
+            end_date: End date; dashes are stripped to get YYYYMMDD.
+
+        Returns:
+            DataFrame indexed by date with open/high/low/close/volume/code
+            columns, or None when nothing is returned or the download fails.
+        """
+        import tushare as ts
+
+        futures_code = self.FUTURES_CODE_MAP.get(code, code)
+        start_str = start_date.replace('-', '')
+        end_str = end_date.replace('-', '')
+
+        try:
+            pro = ts.pro_api(self._get_tushare_token())
+            df = pro.fut_daily(ts_code=futures_code, start_date=start_str, end_date=end_str)
+
+            if df is None or df.empty:
+                return None
+
+            # Only these two columns differ from the standard schema
+            df = df.rename(columns={'trade_date': 'date', 'vol': 'volume'})
+            df['date'] = pd.to_datetime(df['date'])
+            df = df.set_index('date').sort_index()
+
+            # copy() so adding 'code' never writes into a slice of the raw frame
+            df = df[['open', 'high', 'low', 'close', 'volume']].copy()
+            df['code'] = code
+
+            return df
+
+        except Exception as e:
+            print(f"Tushare 期货下载 {code} ({futures_code}) 失败: {e}")
+            return None
+
     def fetch_single(self, code: str, start_date: str, end_date: str, http_proxy: str = None) -> Optional[pd.DataFrame]:
         """获取单个标的的数据"""
         if self._is_china_index(code):
             return self._fetch_tushare(code, start_date, end_date)
         elif self._is_crypto(code):
             return self._fetch_ccxt(code, start_date, end_date, http_proxy)
+        elif self._is_futures(code):
+            return self._fetch_futures(code, start_date, end_date)
         else:
             return self._fetch_yfinance(code, start_date, end_date)
 
@@ -451,9 +500,11 @@ class HybridDataSource:
             print(f" ETF映射: {len(etf_codes)} 只")
 
         china_codes = [c for c in index_codes if self._is_china_index(c)]
-        global_codes = [c for c in index_codes if not self._is_china_index(c)]
+        futures_codes = [c for c in index_codes if self._is_futures(c)]
+        yf_codes = [c for c in index_codes if not self._is_china_index(c) and not self._is_futures(c)]
         print(f" 中国A股指数: {len(china_codes)} 只")
-        print(f" 港股/美股/加密货币: {len(global_codes)} 只")
+        print(f" 期货合约: {len(futures_codes)} 只")
+        print(f" 港股/美股/加密货币: {len(yf_codes)} 只")
 
         # 检查是否需要启动 socks2http 代理(用于加密货币)
         crypto_codes = [c for c in index_codes if self._is_crypto(c)]
@@ -484,6 +535,8 @@ class HybridDataSource:
         for code in index_codes:
             if self._is_china_index(code):
                 source = "Tushare"
+            elif self._is_futures(code):
+                source = "Tushare/期货"
             elif self._is_crypto(code):
                 source = "CCXT/OKX"
             else:
@@ -576,22 +629,50 @@ class HybridDataSource:
         )
 
         # 以A股交易日为基准,对齐所有数据
-        tushare_codes = [c for c in valid_codes if self._is_china_index(c)]
-        if tushare_codes:
-            primary_dates = index_data[tushare_codes[0]].dropna().index
-            print(f" 主市场交易日: {len(primary_dates)} 天")
-
-            # 重新索引到主市场交易日
-            index_data = index_data.reindex(primary_dates)
-
-            # 对所有非A股指数进行前向填充
-            # 所有市场(港股、美股、黄金、加密货币)在T+1日09:00前都已收盘
-            non_a_codes = [c for c in valid_codes if not self._is_china_index(c)]
-            for code in non_a_codes:
-                if code in index_data.columns:
-                    index_data[code] = index_data[code].ffill().bfill()
-
-            print(f" 非A股标的: {len(non_a_codes)} 只 (已前向填充)")
+        # Always align to the A-share trading calendar from Tushare,
+        # whether or not the configuration contains an A-share index
+        start_str = index_data.index.min().strftime('%Y%m%d')
+        end_str = index_data.index.max().strftime('%Y%m%d')
+
+        import tushare as ts
+        pro = ts.pro_api(self._get_tushare_token())
+        trade_cal = pro.trade_cal(start_date=start_str, end_date=end_str, is_open='1')
+        if trade_cal is None or trade_cal.empty:
+            raise RuntimeError(f"Tushare 交易日历获取失败: {start_str} ~ {end_str} 无数据")
+
+        # Build a real DatetimeIndex that keeps the pivot's index name, so the
+        # reindexed frames do not silently take the name 'cal_date'
+        a_share_dates = pd.DatetimeIndex(
+            pd.to_datetime(trade_cal['cal_date']).sort_values(),
+            name=index_data.index.name,
+        )
+
+        print(f" A股交易日历: {len(a_share_dates)} 天 ({start_str} ~ {end_str})")
+
+        # Reindex onto A-share trading days
+        index_data = index_data.reindex(a_share_dates)
+
+        # Align non-A-share series:
+        #   HK/US equities close on day T; before 09:00 on T+1 use day-T data (ffill)
+        #   Crypto/futures (night session): before 09:00 on T+1 use T+1 data (bfill)
+        #   - crypto daily bars close at UTC 00:00 (08:00 Beijing time)
+        #   - the AU.SHF night session closes at 02:30 and is stamped as T+1
+        non_a_codes = [c for c in valid_codes if not self._is_china_index(c)]
+        yf_codes = [c for c in non_a_codes if not self._is_crypto(c) and not self._is_futures(c)]
+        crypto_futures_codes = [c for c in non_a_codes if self._is_crypto(c) or self._is_futures(c)]
+
+        # HK/US equities: forward fill (use day-T data)
+        for code in yf_codes:
+            if code in index_data.columns:
+                index_data[code] = index_data[code].ffill().bfill()
+
+        # Crypto/futures: backward fill (use day-T+1 data)
+        for code in crypto_futures_codes:
+            if code in index_data.columns:
+                index_data[code] = index_data[code].bfill().ffill()
+
+        if non_a_codes:
+            print(f" 非A股标的: {len(non_a_codes)} 只 (港股/美股:ffill, 加密货币/期货:bfill)")
 
         print(f" 时间范围: {index_data.index[0]} ~ {index_data.index[-1]}")
         print(f" 交易日数: {len(index_data)}")
@@ -613,9 +694,8 @@ class HybridDataSource:
                 aggfunc='first'
             )
 
-            # 对齐到主市场交易日
-            if tushare_codes:
-                etf_data = etf_data.reindex(primary_dates)
+            # Align to A-share trading days
+            etf_data = etf_data.reindex(a_share_dates)
 
             print(f" ETF价格数据: {len(etf_data.columns)} 只")
         else:
@@ -641,10 +721,9 @@ class HybridDataSource:
                 aggfunc='first'
             )
 
-            # 对齐到主市场交易日,并前向填充缺失值(净值数据通常T+1更新)
-            if tushare_codes:
-                etf_nav_data = etf_nav_data.reindex(primary_dates)
-                etf_nav_data = etf_nav_data.ffill()  # 前向填充缺失的净值数据
+            # Align to A-share trading days and forward-fill gaps (NAV is usually published on T+1)
+            etf_nav_data = etf_nav_data.reindex(a_share_dates)
+            etf_nav_data = etf_nav_data.ffill()  # forward-fill missing NAV values
 
             print(f" ETF净值数据: {len(etf_nav_data.columns)} 只")
 
@@ -652,9 +731,8 @@ class HybridDataSource:
         benchmark_data = self.fetch_single(benchmark_code, start_date, end_date)
         if benchmark_data is not None:
             benchmark_data.index = pd.to_datetime(benchmark_data.index, utc=True).tz_localize(None).normalize()
-            # 对齐到主市场交易日
-            if tushare_codes:
-                benchmark_data = benchmark_data.reindex(primary_dates)
+            # Align to A-share trading days
+            benchmark_data = benchmark_data.reindex(a_share_dates)
             print(f"\n✓ 基准 {benchmark_code}: {len(benchmark_data)} 条")
 
         return index_data, etf_data, etf_nav_data, benchmark_data, valid_codes