Files
etf/datasource/yfinance_source.py
aszerW 02dbc7bd7d refactor(datasource): 底层fetch方法添加adj参数
TushareSource.fetch() 和 YFinanceSource.fetch() 新增 adj 参数支持 raw/qfq/hfq

- TushareSource.fetch(adj='raw'): 内部路由到 fetch_index/fetch_stock_adj/fetch_etf_adj
- YFinanceSource.fetch(adj='raw'): 内部路由到 fetch_adj() 或原始逻辑
- 添加 is_china_stock() 和 _is_etf_code() 方法用于资产类型判断
2026-05-23 18:32:00 +08:00

214 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
YFinance data source.
Fetches Hong Kong and US market data (through an SSH tunnel).
"""
import os
import time
from typing import Optional
from datetime import datetime, timedelta
import pandas as pd
import urllib3
# 禁用SSL警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class YFinanceSource:
    """Price-history data source backed by the ``yfinance`` package.

    Project-level codes are translated to yfinance symbols through
    ``CODE_MAP``; codes that are not in the map are passed through unchanged.
    Every successful download is returned as a DataFrame indexed by a
    timezone-naive ``date`` index with the columns
    ``code, open, high, low, close, volume``.
    """

    # Project code -> yfinance symbol.
    CODE_MAP = {
        # Hong Kong
        "HSTECH.HK": "3033.HK",  # Hang Seng TECH index
        "HSI": "^HSI",           # Hang Seng index
        # US indices
        "NDX": "^NDX",           # Nasdaq 100
        "SPX": "^GSPC",          # S&P 500
        "DJI": "^DJI",           # Dow Jones
        # Japan / Europe
        "N225": "^N225",         # Nikkei 225
        "GDAXI": "^GDAXI",       # German DAX
        # Commodities
        "CL.NYM": "CL=F",        # WTI crude oil futures
    }

    # Keyword arguments handed to ``Ticker.history`` for each ``adj`` mode.
    #   raw: unadjusted prices
    #   qfq: forward-adjusted -> auto_adjust=True
    #   hfq: backward-adjusted -> back_adjust=True
    # NOTE(review): confirm that yfinance's ``back_adjust`` output matches the
    # project's definition of hfq; this mapping is kept from the original code.
    _HISTORY_FLAGS = {
        'raw': {'auto_adjust': False},
        'qfq': {'auto_adjust': True, 'back_adjust': False},
        'hfq': {'auto_adjust': False, 'back_adjust': True},
    }

    # yfinance column name -> normalized column name.
    _COLUMN_RENAMES = {
        "Open": "open",
        "High": "high",
        "Low": "low",
        "Close": "close",
        "Volume": "volume",
    }

    # Columns (in order) of every frame returned by this class.
    _OUTPUT_COLUMNS = ['code', 'open', 'high', 'low', 'close', 'volume']

    # Suffixes of codes that are NOT served by yfinance (see is_yfinance_code).
    _CHINA_SUFFIXES = ('.SH', '.SZ', '.SS', '.CSI')
    _FUTURES_SUFFIXES = ('.SHF', '.NYM', '.DCE', '.CZC')

    def __init__(self, use_ssh_tunnel: bool = False):
        """Initialize the data source.

        Args:
            use_ssh_tunnel: Whether an SSH tunnel is used (SSHTunnelManager
                must be started first). The flag is only stored here; none of
                the methods in this class read it.
        """
        self.use_ssh_tunnel = use_ssh_tunnel
        self._delay = 0.5  # seconds slept before each request (rate limiting)

    @staticmethod
    def _exclusive_end(end_date: str) -> str:
        """Return the day after ``end_date`` ('YYYY-MM-DD').

        yfinance treats ``end`` as exclusive, so one day is added to make the
        caller's ``end_date`` inclusive. Raises ValueError on a bad format.
        """
        next_day = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)
        return next_day.strftime("%Y-%m-%d")

    @staticmethod
    def _to_naive_dates(index) -> pd.DatetimeIndex:
        """Convert ``index`` to timezone-naive, midnight-normalized dates.

        A timezone-aware index has its timezone dropped while keeping the
        local wall time. Converting to UTC first (as the previous
        implementation did) moves midnight of any exchange east of UTC
        (e.g. Hong Kong, Tokyo, Frankfurt) to the previous calendar day, so
        every bar would be labelled one day early.
        """
        if isinstance(index, pd.DatetimeIndex):
            dates = index
        else:
            dates = pd.DatetimeIndex(pd.to_datetime(index))
        if dates.tz is not None:
            # tz_localize(None) strips the timezone but preserves local time.
            dates = dates.tz_localize(None)
        return dates.normalize()

    @staticmethod
    def _standardize(df: pd.DataFrame, code: str, adj: str,
                     info: Optional[dict] = None) -> pd.DataFrame:
        """Normalize a raw ``Ticker.history`` frame into the project layout.

        Args:
            df: Frame as returned by yfinance (Open/High/Low/Close/Volume).
            code: Project code written to the ``code`` column and attrs.
            adj: Adjustment mode recorded in ``attrs['adj']``.
            info: Optional ticker metadata; stored in ``attrs['info']`` only
                when it is not None.

        Returns:
            Frame with a ``date`` index and the columns
            ``code, open, high, low, close, volume``.

        Raises:
            KeyError: If one of the expected price columns is missing.
        """
        frame = df.rename(columns=YFinanceSource._COLUMN_RENAMES)
        frame.index = YFinanceSource._to_naive_dates(frame.index)
        frame.index.name = "date"
        frame["code"] = code
        result = frame[YFinanceSource._OUTPUT_COLUMNS]
        # Attach metadata to the frame that is actually returned instead of
        # relying on pandas propagating attrs through the column selection.
        if info is not None:
            result.attrs['info'] = info
        result.attrs['code'] = code
        result.attrs['adj'] = adj
        return result

    def fetch(self, code: str, start_date: str, end_date: str, adj: str = 'raw') -> Optional[pd.DataFrame]:
        """Download daily price history for ``code``.

        Args:
            code: Project code (e.g. 'NDX', 'N225', 'HSI', 'AAPL').
            start_date: Start date 'YYYY-MM-DD'.
            end_date: End date 'YYYY-MM-DD' (inclusive).
            adj: 'raw' (unadjusted), 'qfq' (forward-adjusted) or
                'hfq' (backward-adjusted). Defaults to 'raw'.

        Returns:
            DataFrame indexed by ``date`` with columns
            ``code, open, high, low, close, volume``, or None when nothing
            was returned or the download failed (the error is printed).
            ``attrs['code']`` and ``attrs['adj']`` are always set;
            ``attrs['info']`` (ticker metadata) is set only for adj='raw'.

        Raises:
            ValueError: If ``adj`` is not one of the supported values.
        """
        if adj not in self._HISTORY_FLAGS:
            raise ValueError(f"adj 参数必须是 'raw', 'qfq' 或 'hfq',当前: {adj}")

        # Adjusted prices are handled by fetch_adj.
        if adj != 'raw':
            return self.fetch_adj(code, start_date, end_date, adj)

        import yfinance as yf

        # Throttle requests to avoid rate limiting.
        time.sleep(self._delay)

        yf_code = self.CODE_MAP.get(code, code)

        try:
            ticker = yf.Ticker(yf_code)

            # Ticker metadata is best-effort: it is meaningful for stocks and
            # ETFs, and indices may not provide it at all.
            stock_info = {}
            try:
                stock_info = ticker.info or {}
            except Exception:
                pass  # deliberately ignored; an index may have no info

            df = ticker.history(
                start=start_date,
                end=self._exclusive_end(end_date),
                **self._HISTORY_FLAGS['raw'],
            )
            if df is None or len(df) == 0:
                return None

            return self._standardize(df, code, 'raw', stock_info)
        except Exception as e:
            print(f"YFinance下载 {code} ({yf_code}) 失败: {e}")
            return None

    def fetch_adj(self, code: str, start_date: str, end_date: str, adj: str = 'qfq') -> Optional[pd.DataFrame]:
        """Download adjusted daily price history for ``code``.

        The ``adj`` value is mapped to yfinance flags:
            - 'qfq' (forward-adjusted)  -> auto_adjust=True
            - 'hfq' (backward-adjusted) -> back_adjust=True

        Args:
            code: Project code (e.g. 'AAPL', 'TSLA', 'QQQ', '00700.HK').
            start_date: Start date 'YYYY-MM-DD'.
            end_date: End date 'YYYY-MM-DD' (inclusive).
            adj: 'qfq' or 'hfq'. Defaults to 'qfq'.

        Returns:
            DataFrame indexed by ``date`` with adjusted columns
            ``code, open, high, low, close, volume``, or None when nothing
            was returned or the download failed (the error is printed).
            ``attrs['code']`` and ``attrs['adj']`` are set; no ticker
            metadata is fetched on this path.

        Raises:
            ValueError: If ``adj`` is neither 'qfq' nor 'hfq'.
        """
        # Validate before importing yfinance so a bad argument is reported
        # as ValueError regardless of the environment.
        if adj not in ('qfq', 'hfq'):
            raise ValueError(f"adj 参数必须是 'qfq' 或 'hfq',当前: {adj}")

        import yfinance as yf

        # Throttle requests to avoid rate limiting.
        time.sleep(self._delay)

        yf_code = self.CODE_MAP.get(code, code)

        try:
            ticker = yf.Ticker(yf_code)
            df = ticker.history(
                start=start_date,
                end=self._exclusive_end(end_date),
                **self._HISTORY_FLAGS[adj],
            )
            if df is None or len(df) == 0:
                return None

            return self._standardize(df, code, adj)
        except Exception as e:
            print(f"YFinance下载复权数据 {code} ({yf_code}) adj={adj} 失败: {e}")
            return None

    def is_yfinance_code(self, code: str) -> bool:
        """Return True when ``code`` should be fetched through yfinance.

        Codes ending in a China A-share suffix or a futures suffix are served
        by Tushare; everything else goes to yfinance.

        NOTE(review): 'CL.NYM' is present in CODE_MAP but ends with the
        futures suffix '.NYM', so this method returns False for it. Confirm
        which routing is intended.
        """
        return not code.endswith(self._CHINA_SUFFIXES + self._FUTURES_SUFFIXES)