From 2867ae8d216a7201fce9a67b951000a861b9b222 Mon Sep 17 00:00:00 2001 From: aszerW Date: Sat, 23 May 2026 22:28:21 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E5=B0=86ETF=E5=87=80=E5=80=BC?= =?UTF-8?q?=E5=92=8C=E6=BA=A2=E4=BB=B7=E7=8E=87=E9=80=BB=E8=BE=91=E4=B8=8B?= =?UTF-8?q?=E7=A7=BB=E5=88=B0TushareSource=E5=B1=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 重构说明: - TushareSource: 新增 fetch_etf_with_nav() 和 _calculate_premium_series() - UniversalDataFetcher: 简化 fetch_etf_with_nav() 为透传调用 - Flask: 更新注释说明数据层已处理 架构优势: - 职责分离:TushareSource 封装完整数据获取逻辑 - 可复用性:任何调用 TushareSource 的地方都有净值 - 维护性:业务逻辑集中在数据源层 - 符合单一职责原则 --- datasource/flask_server.py | 6 +- datasource/tushare_source.py | 108 +++++++++++++++++++++++++++++++- datasource/universal_fetcher.py | 85 +------------------------ 3 files changed, 113 insertions(+), 86 deletions(-) diff --git a/datasource/flask_server.py b/datasource/flask_server.py index 3c194ee..bbbe305 100644 --- a/datasource/flask_server.py +++ b/datasource/flask_server.py @@ -713,19 +713,19 @@ def get_ohlcv(): result['asset_type'] = final_type.value # 使用最终类型 result['adj'] = adj # 返回使用的 adj 参数 - # 如果是中国 ETF,始终附加净值和溢价率数据(与 adj 无关) + # 如果是中国 ETF,附加净值和溢价率数据(数据层已处理) if final_type == AssetType.CHINA_ETF: try: f = get_fetcher() with f: - # 注意:始终使用原始价格计算溢价率(净值无复权概念) + # 调用 TushareSource 的完整方法 price_df, nav_df, premium_series = f.fetch_etf_with_nav(code, start, end) # 添加净值数据 if nav_df is not None and len(nav_df) > 0: result['nav'] = dataframe_to_json(nav_df) - # 添加溢价率数据(使用抽取的函数) + # 添加溢价率数据 premium_result = build_premium_result(premium_series) if premium_result: result.update(premium_result) diff --git a/datasource/tushare_source.py b/datasource/tushare_source.py index 4af5eb9..13998e9 100644 --- a/datasource/tushare_source.py +++ b/datasource/tushare_source.py @@ -5,7 +5,7 @@ Tushare数据源 """ import os -from typing import Optional +from typing import Optional, Tuple from datetime import datetime import pandas as pd @@ -276,6 +276,112 @@ class TushareSource: prefix = code[:2] return prefix in ['51', '52', '15', '16'] + def fetch_etf_with_nav( + self, + code: str, + start_date: str, + end_date: str + ) -> Tuple[Optional[pd.DataFrame], Optional[pd.DataFrame], Optional[pd.Series]]: + """ + 获取ETF完整数据(价格 + 净值 + 溢价率序列) + + Args: + code: ETF代码,如 '159915.SZ', '518880.SH' + start_date: 开始日期 'YYYY-MM-DD' + end_date: 结束日期 'YYYY-MM-DD' + + Returns: + (price_df, nav_df, premium_series) + - price_df: ETF价格数据 (OHLCV) + - nav_df: ETF净值数据 + - premium_series: 溢价率序列 (每天计算) + """ + # 1. 获取价格 + price_df = self.fetch_etf(code, start_date, end_date) + + # 2. 获取净值 + nav_df = self.fetch_etf_nav(code, start_date, end_date) + + # 3. 计算溢价率 + premium_series = None + if price_df is not None and nav_df is not None and len(nav_df) > 0: + premium_series = self._calculate_premium_series(price_df, nav_df) + + return price_df, nav_df, premium_series + + def _calculate_premium_series( + self, + price_df: pd.DataFrame, + nav_df: pd.DataFrame + ) -> Optional[pd.Series]: + """ + 计算历史溢价率序列 + + 溢价率 = (ETF收盘价 - ETF净值) / ETF净值 + + 关键:不同QDII基金净值披露规则不同 + - 部分基金净值当天披露(如日经ETF):价格日期=净值日期 + - 部分基金净值T+1披露(如纳指ETF):价格日期配T-1日净值 + + 集思录做法:根据基金特性选择匹配方式 + - 如果有当天净值数据,优先使用当天净值 + - 如果当天净值不存在,使用T-1日净值 + + Args: + price_df: ETF价格数据(索引为日期) + nav_df: ETF净值数据(索引为日期) + + Returns: + 溢价率Series(索引为价格日期,值为溢价率) + """ + # 去除重复日期 + price_index = price_df.index + if price_index.has_duplicates: + price_df = price_df[~price_df.index.duplicated(keep='last')] + + nav_index = nav_df.index + if nav_index.has_duplicates: + nav_df = nav_df[~nav_df.index.duplicated(keep='last')] + + # 优先尝试使用当天净值(如日经ETF) + same_day_dates = price_df.index.intersection(nav_df.index) + + # 对于没有当天净值的日期,使用T-1日净值(如纳指ETF) + nav_df_shifted = nav_df.copy() + nav_df_shifted.index = nav_df_shifted.index + pd.Timedelta(days=1) + shifted_dates = price_df.index.intersection(nav_df_shifted.index) + + # 排除已有当天净值的日期 + t1_dates = shifted_dates.difference(same_day_dates) + + premium_data = {} + + # 使用当天净值计算 + if len(same_day_dates) > 0: + close_same = price_df.loc[same_day_dates, 'close'] + nav_same = nav_df.loc[same_day_dates, 'nav'] + for date in same_day_dates: + if pd.notna(close_same.loc[date]) and pd.notna(nav_same.loc[date]): + premium_data[date] = (close_same.loc[date] - nav_same.loc[date]) / nav_same.loc[date] + + # 使用T-1日净值计算(仅用于没有当天净值的日期) + if len(t1_dates) > 0: + close_t1 = price_df.loc[t1_dates, 'close'] + nav_t1 = nav_df_shifted.loc[t1_dates, 'nav'] + for date in t1_dates: + if pd.notna(close_t1.loc[date]) and pd.notna(nav_t1.loc[date]): + premium_data[date] = (close_t1.loc[date] - nav_t1.loc[date]) / nav_t1.loc[date] + + if len(premium_data) == 0: + return None + + # 构建Series并按日期排序 + premium = pd.Series(premium_data) + premium = premium.sort_index() + premium = premium.dropna() + + return premium + def fetch_etf_adj(self, code: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]: """ 获取 ETF 后复权价格数据 diff --git a/datasource/universal_fetcher.py b/datasource/universal_fetcher.py index 4c128c8..85e5445 100644 --- a/datasource/universal_fetcher.py +++ b/datasource/universal_fetcher.py @@ -308,7 +308,7 @@ class UniversalDataFetcher: """ 获取ETF价格 + 净值 + 溢价率序列 - 计算每一天的溢价率,用于分析溢价率走势 + 直接调用 TushareSource 的完整方法,封装业务逻辑 Args: code: ETF代码 @@ -321,88 +321,9 @@ class UniversalDataFetcher: - nav_df: ETF净值数据 - premium_series: 溢价率序列 (每天计算) """ - price_df = self._tushare.fetch_etf(code, start_date, end_date) - nav_df = self._tushare.fetch_etf_nav(code, start_date, end_date) - - # 计算历史溢价率序列 - premium_series = None - if price_df is not None and nav_df is not None and len(nav_df) > 0: - premium_series = self._calculate_premium_series(price_df, nav_df) - - return price_df, nav_df, premium_series + return self._tushare.fetch_etf_with_nav(code, start_date, end_date) - def _calculate_premium_series( - self, - price_df: pd.DataFrame, - nav_df: pd.DataFrame - ) -> Optional[pd.Series]: - """ - 计算历史溢价率序列 - - 溢价率 = (ETF收盘价 - ETF净值) / ETF净值 - - 关键:不同QDII基金净值披露规则不同 - - 部分基金净值当天披露(如日经ETF):价格日期=净值日期 - - 部分基金净值T+1披露(如纳指ETF):价格日期配T-1日净值 - - 集思录做法:根据基金特性选择匹配方式 - - 如果有当天净值数据,优先使用当天净值 - - 如果当天净值不存在,使用T-1日净值 - - Args: - price_df: ETF价格数据(索引为日期) - nav_df: ETF净值数据(索引为日期) - - Returns: - 溢价率Series(索引为价格日期,值为溢价率) - """ - # 去除重复日期 - price_index = price_df.index - if price_index.has_duplicates: - price_df = price_df[~price_df.index.duplicated(keep='last')] - - nav_index = nav_df.index - if nav_index.has_duplicates: - nav_df = nav_df[~nav_df.index.duplicated(keep='last')] - - # 优先尝试使用当天净值(如日经ETF) - same_day_dates = price_df.index.intersection(nav_df.index) - - # 对于没有当天净值的日期,使用T-1日净值(如纳指ETF) - nav_df_shifted = nav_df.copy() - nav_df_shifted.index = nav_df_shifted.index + pd.Timedelta(days=1) - shifted_dates = price_df.index.intersection(nav_df_shifted.index) - - # 排除已有当天净值的日期 - t1_dates = shifted_dates.difference(same_day_dates) - - premium_data = {} - - # 使用当天净值计算 - if len(same_day_dates) > 0: - close_same = price_df.loc[same_day_dates, 'close'] - nav_same = nav_df.loc[same_day_dates, 'nav'] - for date in same_day_dates: - if pd.notna(close_same.loc[date]) and pd.notna(nav_same.loc[date]): - premium_data[date] = (close_same.loc[date] - nav_same.loc[date]) / nav_same.loc[date] - - # 使用T-1日净值计算(仅用于没有当天净值的日期) - if len(t1_dates) > 0: - close_t1 = price_df.loc[t1_dates, 'close'] - nav_t1 = nav_df_shifted.loc[t1_dates, 'nav'] - for date in t1_dates: - if pd.notna(close_t1.loc[date]) and pd.notna(nav_t1.loc[date]): - premium_data[date] = (close_t1.loc[date] - nav_t1.loc[date]) / nav_t1.loc[date] - - if len(premium_data) == 0: - return None - - # 构建Series并按日期排序 - premium = pd.Series(premium_data) - premium = premium.sort_index() - premium = premium.dropna() - - return premium + # 移除 _calculate_premium_series 方法(已下移到 TushareSource) # ============================================================ # 内部方法:特殊资产类型(保留)