refactor(universal_fetcher): fetch添加adj参数,fetch_with_adj简化

UniversalDataFetcher.fetch() 新增 adj 参数,直接传递给底层

- fetch(adj='raw/qfq/hfq'): 统一入口,参数校验和路由
- fetch_with_adj(): 简化为 return self.fetch(adj=adj)
- 删除重复的 VALID_ADJ_BY_TYPE 定义和路由逻辑(~70行)
- VALID_ADJ_BY_TYPE 移到类级别作为静态配置
This commit is contained in:
2026-05-23 18:32:10 +08:00
parent 02dbc7bd7d
commit c319fd42be

View File

@@ -107,16 +107,30 @@ class UniversalDataFetcher:
# 统一入口(自动路由)
# ============================================================
# 各资产类型支持的 adj 参数
VALID_ADJ_BY_TYPE = {
AssetType.CHINA_INDEX: ['raw'], # 指数无复权
AssetType.CHINA_ETF: ['raw', 'hfq'], # ETF 仅支持后复权
AssetType.CHINA_STOCK: ['raw', 'qfq', 'hfq'],
AssetType.US_INDEX: ['raw'], # 指数无复权
AssetType.US_STOCK: ['raw', 'qfq', 'hfq'],
AssetType.HK_INDEX: ['raw'], # 指数无复权
AssetType.HK_STOCK: ['raw', 'qfq', 'hfq'],
AssetType.FUTURES: ['raw'], # 期货无复权
AssetType.CRYPTO: ['raw'], # 加密货币无复权
}
def fetch(
self,
code: str,
start_date: str,
end_date: str,
adj: str = 'raw',
retry: int = 3,
timeframe: str = '1d'
) -> Optional[pd.DataFrame]:
"""
统一数据获取入口
统一数据获取入口(支持 adj 参数)
自动识别资产类型并路由到对应方法
@@ -124,31 +138,60 @@ class UniversalDataFetcher:
code: 标的代码
start_date: 开始日期 'YYYY-MM-DD'
end_date: 结束日期 'YYYY-MM-DD'
adj: 复权类型 'raw'(原始) / 'qfq'(前复权) / 'hfq'(后复权),默认 'raw'
retry: 重试次数
timeframe: K线周期仅加密货币需要默认1d
Returns:
DataFrame with columns: date, open, high, low, close, volume
adj='hfq' 时 A股 ETF 会额外返回 adj_factor, close_hfq
示例:
# 原始价格
df = fetcher.fetch("000300.SH", "2020-01-01", "2024-12-31")
# A股股票后复权
df = fetcher.fetch("000001.SZ", "2020-01-01", "2024-12-31", adj='hfq')
# 美股股票前复权
df = fetcher.fetch("AAPL", "2020-01-01", "2024-12-31", adj='qfq')
"""
# 校验 adj 参数
if adj not in ['raw', 'qfq', 'hfq']:
raise ValueError(f"adj 参数必须是 'raw', 'qfq''hfq',当前: {adj}")
asset_type = AssetTypeDetector.detect(code)
# 校验 adj 是否适用于该资产类型
valid_adj = self.VALID_ADJ_BY_TYPE.get(asset_type, ['raw'])
if adj not in valid_adj:
raise ValueError(
f"adj='{adj}' 不适用于 {asset_type.value},支持的类型: {valid_adj}"
)
for attempt in range(retry):
try:
# 路由到具体方法
# 路由到具体方法(传递 adj 参数)
if asset_type == AssetType.CHINA_INDEX:
return self._fetch_china_index(code, start_date, end_date)
return self._tushare.fetch(code, start_date, end_date, adj)
elif asset_type == AssetType.CHINA_ETF:
return self._fetch_china_etf(code, start_date, end_date)
return self._tushare.fetch(code, start_date, end_date, adj)
elif asset_type == AssetType.CHINA_STOCK:
return self._tushare.fetch(code, start_date, end_date, adj)
elif asset_type == AssetType.US_INDEX:
return self._fetch_us_index(code, start_date, end_date)
self._start_tunnel()
return self._yfinance.fetch(code, start_date, end_date, adj)
elif asset_type == AssetType.US_STOCK:
return self._fetch_us_stock(code, start_date, end_date)
self._start_tunnel()
return self._yfinance.fetch(code, start_date, end_date, adj)
elif asset_type == AssetType.HK_INDEX:
return self._fetch_hk_index(code, start_date, end_date)
self._start_tunnel()
return self._yfinance.fetch(code, start_date, end_date, adj)
elif asset_type == AssetType.HK_STOCK:
return self._fetch_hk_stock(code, start_date, end_date)
self._start_tunnel()
return self._yfinance.fetch(code, start_date, end_date, adj)
elif asset_type == AssetType.FUTURES:
return self._fetch_futures(code, start_date, end_date)
return self._fetch_futures(code, start_date, end_date, adj)
elif asset_type == AssetType.CRYPTO:
return self._fetch_crypto(code, start_date, end_date, timeframe)
else:
@@ -159,7 +202,7 @@ class UniversalDataFetcher:
if attempt < retry - 1:
time.sleep(2)
else:
print(f"✗ 获取 {code} 失败 (尝试 {attempt+1}/{retry}): {e}")
print(f"✗ 获取 {code} adj={adj} 失败 (尝试 {attempt+1}/{retry}): {e}")
return None
return None
@@ -359,7 +402,8 @@ class UniversalDataFetcher:
self,
code: str,
start_date: str,
end_date: str
end_date: str,
adj: str = 'raw'
) -> Optional[pd.DataFrame]:
"""
获取期货数据
@@ -367,11 +411,16 @@ class UniversalDataFetcher:
特点:
- 中国期货(.SHF/.DCE/.CZC): Tushare
- NYMEX(.NYM): YFinance
- 期货不支持复权adj 只能为 'raw'
"""
# 期货不支持复权
if adj != 'raw':
raise ValueError(f"期货不支持复权adj='{adj}' 仅适用于股票/ETF")
if code.endswith('.NYM'):
# NYMEX期货走YFinance
self._start_tunnel()
return self._yfinance.fetch(code, start_date, end_date)
return self._yfinance.fetch(code, start_date, end_date, adj='raw')
else:
# 中国期货走Tushare
return self._tushare.fetch_futures(code, start_date, end_date)
@@ -493,12 +542,13 @@ class UniversalDataFetcher:
self,
code: str,
start_date: str,
end_date: str
end_date: str,
adj: str = 'qfq'
) -> Optional[pd.DataFrame]:
"""
获取美股复权价格
使用 YFinance auto_adjust=True
使用 YFinance,支持前复权(qfq)和后复权(hfq)
- 消除拆分(split)和分红(dividend)对价格的影响
- 适用于美股股票/ETF
@@ -506,13 +556,98 @@ class UniversalDataFetcher:
code: 美股代码,如 'AAPL', 'TSLA', 'QQQ'
start_date: 开始日期 'YYYY-MM-DD'
end_date: 结束日期 'YYYY-MM-DD'
adj: 复权类型 'qfq'(前复权) 或 'hfq'(后复权),默认 'qfq'
Returns:
DataFrame with columns: date, open, high, low, close, volume (复权后)
示例:
# 苹果复权价格(包含分红和拆分调整)
df = fetcher.fetch_us_adj("AAPL", "2020-01-01", "2024-12-31")
df = fetcher.fetch_us_adj("AAPL", "2020-01-01", "2024-12-31", adj='qfq')
"""
self._start_tunnel()
return self._yfinance.fetch_adj(code, start_date, end_date)
return self._yfinance.fetch_adj(code, start_date, end_date, adj)
def fetch_hk_adj(
self,
code: str,
start_date: str,
end_date: str,
adj: str = 'qfq'
) -> Optional[pd.DataFrame]:
"""
获取港股股票复权价格
使用 YFinance支持前复权(qfq)和后复权(hfq)
Args:
code: 港股代码,如 '00700.HK', '00941.HK'
start_date: 开始日期 'YYYY-MM-DD'
end_date: 结束日期 'YYYY-MM-DD'
adj: 复权类型 'qfq'(前复权) 或 'hfq'(后复权),默认 'qfq'
Returns:
DataFrame with columns: date, open, high, low, close, volume (复权后)
"""
self._start_tunnel()
return self._yfinance.fetch_adj(code, start_date, end_date, adj)
def fetch_stock_adj(
self,
code: str,
start_date: str,
end_date: str,
adj: str = 'hfq'
) -> Optional[pd.DataFrame]:
"""
获取 A股股票复权价格
使用 Tushare pro_bar 接口,支持前复权(qfq)和后复权(hfq)
Args:
code: A股股票代码'000001.SZ', '600000.SH'
start_date: 开始日期 'YYYY-MM-DD'
end_date: 结束日期 'YYYY-MM-DD'
adj: 复权类型 'qfq'(前复权) 或 'hfq'(后复权),默认 'hfq'
Returns:
DataFrame with columns: date, open, high, low, close, volume, adj_factor
"""
return self._tushare.fetch_stock_adj(code, start_date, end_date, adj)
# ============================================================
# 统一复权入口(简化版,直接调用 fetch
# ============================================================
def fetch_with_adj(
self,
code: str,
start_date: str,
end_date: str,
adj: str = 'raw',
retry: int = 3
) -> Optional[pd.DataFrame]:
"""
统一复权入口(简化版)
直接调用 fetch(adj=adj),无需重复实现路由逻辑。
Args:
code: 标的代码
start_date: 开始日期 'YYYY-MM-DD'
end_date: 结束日期 'YYYY-MM-DD'
adj: 复权类型,默认 'raw'
retry: 重试次数
Returns:
DataFrame结构因资产类型和 adj 参数略有不同
示例:
# A股股票后复权
df = fetcher.fetch_with_adj("000001.SZ", "2020-01-01", "2024-12-31", adj='hfq')
# 美股股票前复权
df = fetcher.fetch_with_adj("AAPL", "2020-01-01", "2024-12-31", adj='qfq')
"""
# 直接调用 fetch传递 adj 参数
return self.fetch(code, start_date, end_date, adj, retry)