Files
etf/datasource/yfinance_source.py
aszerW 4f9e0231bd fix(datasource): yfinance时区标准化与NaN过滤修复
- yfinance_source.py: 用 tz_localize(None) 替代 pd.to_datetime(utc=True),
  避免亚洲/欧洲市场因UTC转换导致日期回退一天(如日经225 5/25→5/24)
- yfinance_source.py: 新增 _normalize_index() 静态方法统一处理时区剥除
- yfinance_source.py: fetch() 增加 close=NaN 行过滤(yfinance未收盘日返回不完整数据)
- flask_api_source.py: 客户端同步增加 close=NaN 过滤防御

验证结果:N225 5/25-6/3 返回7个交易日数据,日期无偏移
2026-06-03 09:14:39 +08:00

240 lines
8.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
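YFinance data source.
Fetches Hong Kong, US, Japanese and European index data plus commodity futures from Yahoo Finance via yfinance, optionally through an SSH tunnel.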
"""
import os
import time
from typing import Optional
from datetime import datetime, timedelta
import pandas as pd
import urllib3
# Suppress urllib3's InsecureRequestWarning at import time.
# NOTE(review): presumably needed because requests are routed through a tunnel
# without certificate verification - confirm; this hides the warning only.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class YFinanceSource:
    """Market-data source backed by yfinance.

    Translates project-level instrument codes into Yahoo Finance tickers and
    returns daily OHLCV bars as pandas DataFrames indexed by the
    exchange-local trading date.
    """

    # Project code -> yfinance ticker. Codes that are not listed here are
    # passed to yfinance unchanged.
    CODE_MAP = {
        # Hong Kong
        "HSTECH.HK": "3033.HK",  # Hang Seng TECH index
        "HSI": "^HSI",  # Hang Seng index
        # US indices
        "NDX": "^NDX",  # Nasdaq 100
        "SPX": "^GSPC",  # S&P 500
        "DJI": "^DJI",  # Dow Jones
        # Japan / Europe
        "N225": "^N225",  # Nikkei 225
        "GDAXI": "^GDAXI",  # German DAX
        # Commodities
        "CL.NYM": "CL=F",  # WTI crude oil futures
    }

    # yfinance column name -> lower-case name used by this project.
    _COLUMN_RENAMES = {
        "Open": "open",
        "High": "high",
        "Low": "low",
        "Close": "close",
        "Volume": "volume",
    }

    # Columns (in order) of every frame returned by fetch() / fetch_adj().
    _OUTPUT_COLUMNS = ['code', 'open', 'high', 'low', 'close', 'volume']

    # adj value -> keyword arguments passed to yfinance's Ticker.history().
    # NOTE(review): the qfq/hfq mapping is kept exactly as in the original
    # code; confirm against the yfinance docs that back_adjust=True matches
    # the intended "hfq" semantics.
    _ADJUST_PARAMS = {
        'qfq': {'auto_adjust': True, 'back_adjust': False},
        'hfq': {'auto_adjust': False, 'back_adjust': True},
    }

    # Suffixes of codes that are NOT served by yfinance (China A-share
    # suffixes first, then futures suffixes).
    _NON_YFINANCE_SUFFIXES = (
        '.SH', '.SZ', '.SS', '.CSI',
        '.SHF', '.NYM', '.DCE', '.CZC',
    )

    def __init__(self, use_ssh_tunnel: bool = False):
        """
        Initialise the yfinance data source.

        Args:
            use_ssh_tunnel: Whether an SSH tunnel should be used (the
                SSHTunnelManager has to be started beforehand). The flag is
                only stored on the instance; no method of this class reads it.
        """
        self.use_ssh_tunnel = use_ssh_tunnel
        self._delay = 0.5  # seconds slept before each request (rate limiting)

    @staticmethod
    def _normalize_index(index: pd.DatetimeIndex) -> pd.DatetimeIndex:
        """
        Return a tz-naive, midnight-normalised index that keeps the
        exchange-local calendar date.

        yfinance timestamps carry the exchange's local timezone, e.g.
        00:00-04:00 (US/Eastern), 00:00+09:00 (Asia/Tokyo) or 00:00+02:00
        (CET). The timezone is stripped with tz_localize(None), which keeps
        the local wall-clock time, so the date is unchanged.

        Converting to UTC first (e.g. pd.to_datetime(idx, utc=True)) would be
        wrong: Tokyo's 00:00+09:00 becomes 15:00 UTC of the previous day and
        the date would move back by one day.

        Args:
            index: The DatetimeIndex returned by yfinance (tz-aware or naive).

        Returns:
            A tz-naive DatetimeIndex with every timestamp set to midnight.
        """
        if index.tz is not None:
            # Drop the tz information without converting to UTC.
            index = index.tz_localize(None)
        return index.normalize()

    @staticmethod
    def _exclusive_end(end_date: str) -> str:
        """Return the day after end_date ('YYYY-MM-DD'); yfinance's end is exclusive."""
        next_day = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)
        return next_day.strftime("%Y-%m-%d")

    @classmethod
    def _standardize_frame(
        cls,
        df: Optional[pd.DataFrame],
        code: str,
        adj: str,
        info: Optional[dict] = None,
    ) -> Optional[pd.DataFrame]:
        """
        Convert a raw Ticker.history() frame into the project's layout.

        Renames the OHLCV columns, normalises the index to exchange-local
        dates, drops rows whose close is NaN, adds the 'code' column and
        attaches the metadata to the attrs of the frame that is returned.

        Args:
            df: Frame returned by yfinance (may be None or empty).
            code: Project-level code written to the 'code' column and attrs.
            adj: Adjustment label stored in attrs['adj'].
            info: Optional ticker info stored in attrs['info'] when given.

        Returns:
            The standardised frame, or None when no usable row remains.
        """
        if df is None or df.empty:
            return None

        frame = df.rename(columns=cls._COLUMN_RENAMES)
        frame.index = cls._normalize_index(frame.index)
        frame.index.name = "date"

        # yfinance returns incomplete rows (close=NaN) for sessions that have
        # not closed yet. Take an explicit copy so that adding the 'code'
        # column below never writes into a slice of the filtered frame.
        frame = frame.loc[frame['close'].notna()].copy()
        if frame.empty:
            return None

        frame["code"] = code
        result = frame[cls._OUTPUT_COLUMNS]

        # Set the metadata on the object actually handed to the caller rather
        # than relying on attrs being propagated through column selection.
        if info is not None:
            result.attrs['info'] = info
        result.attrs['code'] = code
        result.attrs['adj'] = adj
        return result

    def fetch(self, code: str, start_date: str, end_date: str, adj: str = 'raw') -> Optional[pd.DataFrame]:
        """
        Fetch daily bars for one instrument.

        Args:
            code: Project code, e.g. 'NDX', 'N225', 'HSI', 'AAPL'.
            start_date: First day, 'YYYY-MM-DD'.
            end_date: Last day (inclusive), 'YYYY-MM-DD'.
            adj: 'raw' (unadjusted, default), 'qfq' or 'hfq'. The adjusted
                variants are delegated to fetch_adj().

        Returns:
            DataFrame indexed by 'date' with columns code, open, high, low,
            close, volume. attrs holds 'info' (ticker info, possibly empty),
            'code' and 'adj'. None when no data is available or when the
            download fails (the error is printed).

        Raises:
            ValueError: If adj is not one of 'raw', 'qfq', 'hfq'.
        """
        if adj not in ('raw', 'qfq', 'hfq'):
            raise ValueError(f"adj 参数必须是 'raw', 'qfq' 或 'hfq',当前: {adj}")

        # Adjusted prices are handled by fetch_adj().
        if adj != 'raw':
            return self.fetch_adj(code, start_date, end_date, adj)

        import yfinance as yf

        # Throttle to avoid being rate limited.
        time.sleep(self._delay)

        yf_code = self.CODE_MAP.get(code, code)
        try:
            ticker = yf.Ticker(yf_code)

            # Ticker info is best effort: it may be unavailable (the original
            # code notes this for indices), so any failure yields an empty dict.
            try:
                stock_info = ticker.info or {}
            except Exception:
                stock_info = {}

            # auto_adjust=False requests unadjusted prices.
            df = ticker.history(
                start=start_date,
                end=self._exclusive_end(end_date),
                auto_adjust=False,
            )
            return self._standardize_frame(df, code, 'raw', info=stock_info)
        except Exception as e:
            print(f"YFinance下载 {code} ({yf_code}) 失败: {e}")
            return None

    def fetch_adj(self, code: str, start_date: str, end_date: str, adj: str = 'qfq') -> Optional[pd.DataFrame]:
        """
        Fetch adjusted daily bars for one instrument.

        The adj value selects the yfinance arguments:
        - 'qfq' -> auto_adjust=True, back_adjust=False
        - 'hfq' -> auto_adjust=False, back_adjust=True

        Args:
            code: Project code, e.g. 'AAPL', 'TSLA', 'QQQ', '00700.HK'.
            start_date: First day, 'YYYY-MM-DD'.
            end_date: Last day (inclusive), 'YYYY-MM-DD'.
            adj: 'qfq' (default) or 'hfq'.

        Returns:
            DataFrame indexed by 'date' with columns code, open, high, low,
            close, volume (adjusted); attrs holds 'code' and 'adj'. Rows whose
            close is NaN are dropped, as in fetch(). None when no data is
            available or when the download fails (the error is printed).

        Raises:
            ValueError: If adj is not 'qfq' or 'hfq'.
        """
        # Validate before importing yfinance or sleeping so that bad
        # arguments fail fast.
        if adj not in ('qfq', 'hfq'):
            raise ValueError(f"adj 参数必须是 'qfq' 或 'hfq',当前: {adj}")

        import yfinance as yf

        # Throttle to avoid being rate limited.
        time.sleep(self._delay)

        yf_code = self.CODE_MAP.get(code, code)
        try:
            ticker = yf.Ticker(yf_code)
            df = ticker.history(
                start=start_date,
                end=self._exclusive_end(end_date),
                **self._ADJUST_PARAMS[adj],
            )
            return self._standardize_frame(df, code, adj)
        except Exception as e:
            print(f"YFinance下载复权数据 {code} ({yf_code}) adj={adj} 失败: {e}")
            return None

    def is_yfinance_code(self, code: str) -> bool:
        """
        Tell whether a code should be fetched through yfinance.

        Codes ending in a China A-share suffix or a futures suffix are not
        served by yfinance (the original comment says they go to Tushare);
        everything else is.

        NOTE(review): 'CL.NYM' is present in CODE_MAP but ends in '.NYM', so
        this method returns False for it - confirm which of the two is
        intended.
        """
        return not code.endswith(self._NON_YFINANCE_SUFFIXES)