Compare commits

...

3 Commits

Author SHA1 Message Date
ec749314bc feat(data-source): 支持指数-ETF双轨数据获取及因子计算
- 新增使用Tushare获取A股ETF价格及净值数据的私有方法
- fetch_all方法支持接收完整代码配置,区分指数与ETF及市场类别
- 指数数据和ETF数据分别下载,ETF净值数据用于溢价率计算
- 采用A股交易日为主交易日历,非A股数据前向填充对齐
- 调整因子计算,支持指数价格计算因子,ETF价格计算收益率
- run_rotation脚本和RotationStrategy引擎适配指数-ETF配置格式
- 代码结构优化,增强多市场及加密货币处理能力
2026-03-25 22:01:44 +08:00
e6898a851c feat(config): 优化ETF轮动策略配置
- 将候选池指数列表升级为包含名称、对应ETF代码及市场的详细映射结构
- 支持多市场ETF映射,包括A股、港股、美股、商品及加密货币市场
- 新增主市场配置及跨境ETF溢价控制机制,防止高溢价买入
- 溢价控制支持启用开关、不同市场阈值及降权模式
- 明确交易成本、缓存使用及交易日历设置,增强策略灵活性和稳定性
2026-03-25 22:01:22 +08:00
61362b274b feat(rotation): 实现跨市场ETF映射与溢价控制方案
- 重新设计配置文件结构,支持指数到ETF的映射关系及市场类型区分
- 新增ETF数据获取模块,从Tushare获取A股ETF行情及净值
- 修改因子计算逻辑,基于指数数据计算因子,使用ETF数据计算收益率
- 重构轮动引擎,支持同时处理指数与ETF数据,动态检查ETF可用性
- 增加跨境ETF溢价控制机制,基于实时及历史溢价率过滤或降权持仓标的
- 持仓和报告模块显示ETF代码及跨境溢价率信息,提升实际操作参考价值
- 更新配置解析,构建代码-名称、代码-ETF和代码-市场映射关系
- 采用A股交易日历对齐多市场数据,确保因子计算和信号生成时序准确
- 详细设计溢价率计算与信号调整策略,解决跨境市场时差及数据可用性问题
- 明确加密货币数据处理方案及休市期间数据填充策略,保证逻辑一致性与安全性
2026-03-25 22:01:07 +08:00
6 changed files with 1076 additions and 207 deletions

View File

@@ -1,43 +1,111 @@
# ETF轮动策略配置
# ==================== 候选池配置 ====================
# A股全行业指数配置Tushare格式XXXXXX.SH / XXXXXX.SZ
# 格式: {代码: 名称}
# 指数-ETF映射配置
# index: 指数代码(用于计算因子信号)
# etf: ETF代码用于实际交易和收益计算null表示直接交易指数/加密货币
code_list:
# 中国A股指数 (使用 Tushare) - 主市场,交易日基准
# 宽基指数
"000300.SH": "沪深300"
"000905.SH": "中证500"
"000852.SH": "中证1000"
"399006.SZ": "创业板指"
"000015.SH": "上证红利"
"000300.SH":
name: "沪深300"
etf: "510300.SH" # 华泰柏瑞沪深300ETF
market: "A"
"000905.SH":
name: "中证500"
etf: "510500.SH" # 南方中证500ETF
market: "A"
"000852.SH":
name: "中证1000"
etf: "512100.SH" # 南方中证1000ETF
market: "A"
"399006.SZ":
name: "创业板指"
etf: "159915.SZ" # 易方达创业板ETF
market: "A"
"000015.SH":
name: "上证红利"
etf: "510880.SH" # 华泰柏瑞红利ETF
market: "A"
# 金融
"399986.SZ": "中证银行"
"399986.SZ":
name: "中证银行"
etf: "512800.SH" # 华宝银行ETF
market: "A"
# 消费
"399997.SZ": "中证白酒"
"399997.SZ":
name: "中证白酒"
etf: "512690.SH" # 鹏华酒ETF
market: "A"
# 医药健康
"399989.SZ": "中证医疗"
"399989.SZ":
name: "中证医疗"
etf: "512170.SH" # 华宝医疗ETF
market: "A"
# 科技信息
"000935.SH": "中证信息"
"000935.SH":
name: "中证信息"
etf: "512330.SH" # 南方信息ETF
market: "A"
# 新能源
"399976.SZ": "新能源车"
"399976.SZ":
name: "新能源车"
etf: "515030.SH" # 华夏新能源ETF
market: "A"
# 周期资源
"399395.SZ": "国证有色"
"399998.SZ": "中证煤炭"
"399813.SZ": "细分化工"
"000937.SH": "中证能源"
"399395.SZ":
name: "国证有色"
etf: "159880.SZ" # 有色ETF
market: "A"
"399998.SZ":
name: "中证煤炭"
etf: "515220.SH" # 煤炭ETF
market: "A"
"399813.SZ":
name: "细分化工"
etf: "516120.SH" # 化工ETF
market: "A"
"000937.SH":
name: "中证能源"
etf: "159930.SZ" # 能源ETF
market: "A"
# 其他行业
"399967.SZ": "中证军工"
"000949.SH": "中证农业"
"399702.SZ": "国债指数"
"399967.SZ":
name: "中证军工"
etf: "512660.SH" # 军工ETF
market: "A"
"000949.SH":
name: "中证农业"
etf: "159825.SZ" # 农业ETF
market: "A"
"399702.SZ":
name: "国债指数"
etf: "511010.SH" # 国债ETF
market: "A"
# 全球市场指数 (使用 YFinance) - 非主市场数据会前向填充到A股交易日
"HSTECH": "恒生科技" # 港股
"NDX": "纳指100" # 美股
"GC=F": "黄金" # 黄金期货 (COMEX)
"HSTECH":
name: "恒生科技"
etf: "513180.SH" # 华夏恒生科技ETF
market: "HK"
"NDX":
name: "纳指100"
etf: "159501.SZ" # 嘉实纳指100ETF流动性好
market: "US"
"GC=F":
name: "黄金"
etf: "518880.SH" # 华安黄金ETF
market: "COMMODITY"
# 加密货币 (使用 CCXT/OKX 现货) - 通过 SSH->HTTP 代理访问
"BTC": "比特币" # OKX 现货
"ETH": "以太坊" # OKX 现货
"BTC":
name: "比特币"
etf: null # 无ETF直接交易
market: "CRYPTO"
"ETH":
name: "以太坊"
etf: null # 无ETF直接交易
market: "CRYPTO"
# 主市场配置(用于确定交易日历)
primary_market:
@@ -71,6 +139,27 @@ rebalance_threshold: 0.0
# 单次换仓成本(双边,含佣金+滑点)
trade_cost: 0.001
# ==================== 溢价控制配置 ====================
# 跨境ETF溢价过滤机制防止高溢价买入
premium_control:
enabled: true
default_threshold: 0.02 # 默认溢价阈值 2%
mode: "filter" # "filter"(完全排除) 或 "penalize"(降权)
penalty_factor: 0.5 # 降权模式下的惩罚系数
# 按市场类型覆盖配置
market_overrides:
A: # A股 ETF
enabled: false # 不启用(溢价通常 < 0.5%
HK: # 港股 ETF
enabled: true
threshold: 0.03 # 阈值 3%
US: # 美股 ETF
enabled: true
threshold: 0.02 # 阈值 2%
COMMODITY: # 商品 ETF
enabled: false
# ==================== 数据缓存 ====================
# 是否使用本地缓存True=优先从本地读取)
use_cache: true

View File

@@ -209,6 +209,116 @@ class HybridDataSource:
if value is not None:
os.environ[key] = value
def _fetch_etf(self, code: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
"""使用 Tushare 获取A股ETF数据fund_daily接口"""
import os
# 临时清除代理环境变量
original_proxy = {}
for key in ["HTTP_PROXY", "HTTPS_PROXY", "ALL_PROXY", "http_proxy", "https_proxy", "all_proxy"]:
original_proxy[key] = os.environ.pop(key, None)
try:
import tushare as ts
pro = ts.pro_api(self._get_tushare_token())
# 转换代码格式 (510300.SH -> 510300.SH)
ts_code = code.replace(".SS", ".SH")
# 获取ETF日线数据
df = pro.fund_daily(
ts_code=ts_code,
start_date=start_date.replace("-", ""),
end_date=end_date.replace("-", "")
)
if df is None or len(df) == 0:
return None
# 标准化列名
df = df.rename(columns={
"trade_date": "date",
"open": "open",
"high": "high",
"low": "low",
"close": "close",
"vol": "volume",
})
# 转换日期格式
df["date"] = pd.to_datetime(df["date"])
df = df.set_index("date")
df = df.sort_index()
# 添加代码列
df["code"] = code
return df
except Exception as e:
print(f"Tushare 下载ETF {code} 失败: {e}")
return None
finally:
# 恢复代理环境变量
for key, value in original_proxy.items():
if value is not None:
os.environ[key] = value
def _fetch_etf_nav(self, code: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
"""使用 Tushare 获取ETF净值数据fund_nav接口"""
import os
# 临时清除代理环境变量
original_proxy = {}
for key in ["HTTP_PROXY", "HTTPS_PROXY", "ALL_PROXY", "http_proxy", "https_proxy", "all_proxy"]:
original_proxy[key] = os.environ.pop(key, None)
try:
import tushare as ts
pro = ts.pro_api(self._get_tushare_token())
# 转换代码格式
ts_code = code.replace(".SS", ".SH")
# 获取ETF净值数据
df = pro.fund_nav(
ts_code=ts_code,
start_date=start_date.replace("-", ""),
end_date=end_date.replace("-", "")
)
if df is None or len(df) == 0:
return None
# 标准化列名
df = df.rename(columns={
"nav_date": "date",
"unit_nav": "nav",
})
# 转换日期格式
df["date"] = pd.to_datetime(df["date"])
df = df.set_index("date")
df = df.sort_index()
# 添加代码列
df["code"] = code
return df
except Exception as e:
print(f"Tushare 下载ETF净值 {code} 失败: {e}")
return None
finally:
# 恢复代理环境变量
for key, value in original_proxy.items():
if value is not None:
os.environ[key] = value
def _fetch_yfinance(self, code: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
"""使用 YFinance 获取数据"""
import time
@@ -303,39 +413,50 @@ class HybridDataSource:
def fetch_all(
self,
code_list, # list[代码] 或 dict{代码: 名称}
code_config: dict, # {代码: {name, etf, market}}
benchmark_code: str,
start_date: str,
end_date: str,
) -> Tuple[Optional[pd.DataFrame], Optional[pd.DataFrame], list]:
) -> Tuple[Optional[pd.DataFrame], Optional[pd.DataFrame], Optional[pd.DataFrame], Optional[pd.DataFrame], list]:
"""
批量获取数据
注意:由于 Tushare(中国A股) 和 YFinance(美股/加密货币) 的交易日历不同,
这里返回的是长格式数据,由调用方分别处理各市场的数据
批量获取数据(支持指数-ETF映射
Args:
code_config: 配置字典,格式为 {index_code: {name, etf, market}}
benchmark_code: 基准指数代码
start_date: 开始日期
end_date: 结束日期
Returns:
(etf_data, benchmark_data, valid_codes)
etf_data: DataFrame with columns [code, close, source], index=date
(index_data, etf_data, etf_nav_data, benchmark_data, valid_codes)
- index_data: 指数数据(用于因子计算)
- etf_data: ETF价格数据用于收益计算
- etf_nav_data: ETF净值数据用于溢价率计算
- benchmark_data: 基准数据
- valid_codes: 有效代码列表
"""
all_data = []
index_data_list = []
etf_data_list = []
valid_codes = []
# 兼容列表和字典格式
if isinstance(code_list, dict):
codes = list(code_list.keys())
code_name_map = code_list
else:
codes = code_list
code_name_map = {c: c for c in codes}
# 提取指数代码和ETF代码
index_codes = list(code_config.keys())
etf_codes = {}
for idx_code, cfg in code_config.items():
if cfg.get('etf'):
etf_codes[idx_code] = cfg['etf']
print(f"开始下载 {len(codes)} 只标的的数据...")
china_codes = [c for c in codes if self._is_china_index(c)]
global_codes = [c for c in codes if not self._is_china_index(c)]
print(f"开始下载 {len(index_codes)} 只标的的数据...")
print(f" 指数代码: {len(index_codes)}")
print(f" ETF映射: {len(etf_codes)}")
china_codes = [c for c in index_codes if self._is_china_index(c)]
global_codes = [c for c in index_codes if not self._is_china_index(c)]
print(f" 中国A股指数: {len(china_codes)}")
print(f" 港股/美股/加密货币: {len(global_codes)}")
# 检查是否需要启动 socks2http 代理(用于加密货币)
crypto_codes = [c for c in codes if self._is_crypto(c)]
crypto_codes = [c for c in index_codes if self._is_crypto(c)]
http_proxy = None
socks2http_proc = None
@@ -358,8 +479,9 @@ class HybridDataSource:
except Exception as e:
print(f" ✗ 启动代理失败: {e}")
# 分别下载数据
for code in codes:
# 下载指数数据
print("\n [1/2] 下载指数数据(用于因子计算)...")
for code in index_codes:
if self._is_china_index(code):
source = "Tushare"
elif self._is_crypto(code):
@@ -367,8 +489,8 @@ class HybridDataSource:
else:
source = "YFinance"
name = code_name_map.get(code, code)
print(f" 下载 {code} ({name}) - {source}...", end=" ")
name = code_config[code].get('name', code)
print(f" 下载 {code} ({name}) - {source}...", end=" ")
# 加密货币使用 HTTP 代理
proxy = http_proxy if self._is_crypto(code) else None
@@ -378,95 +500,163 @@ class HybridDataSource:
# 标准化数据格式
data = data.copy()
data['source'] = source
data['code'] = code # 确保code列正确
# 确保索引是日期格式且无时区,只保留日期部分(去掉时间)
data.index = pd.to_datetime(data.index, utc=True).tz_localize(None).normalize()
all_data.append(data[['code', 'close', 'source']])
index_data_list.append(data[['code', 'close', 'source']])
valid_codes.append(code)
print(f"{len(data)}")
else:
print("✗ 无数据")
# 下载ETF数据价格+净值,用于溢价率计算)
etf_nav_data_list = [] # ETF净值数据
if etf_codes:
print("\n [2/2] 下载ETF数据价格+净值,用于溢价率计算)...")
for idx_code, etf_code in etf_codes.items():
name = code_config[idx_code].get('name', idx_code)
market = code_config[idx_code].get('market', 'A')
# 加密货币跳过ETF下载
if market == 'CRYPTO':
continue
print(f" 下载 ETF {etf_code} (对应指数 {idx_code})...", end=" ")
# 获取ETF价格数据
price_data = self._fetch_etf(etf_code, start_date, end_date)
# 获取ETF净值数据
nav_data = self._fetch_etf_nav(etf_code, start_date, end_date)
if price_data is not None and len(price_data) > 0:
# 使用指数代码作为列名,保持与指数数据一致
price_data = price_data.copy()
price_data['source'] = 'Tushare-ETF'
price_data['code'] = idx_code
price_data.index = pd.to_datetime(price_data.index, utc=True).tz_localize(None).normalize()
etf_data_list.append(price_data[['code', 'close', 'source']])
# 处理净值数据
if nav_data is not None and len(nav_data) > 0:
nav_data = nav_data.copy()
nav_data['code'] = idx_code
nav_data.index = pd.to_datetime(nav_data.index, utc=True).tz_localize(None).normalize()
etf_nav_data_list.append(nav_data[['code', 'nav']])
print(f"✓ 价格{len(price_data)}条 净值{len(nav_data)}")
else:
print(f"✓ 价格{len(price_data)}条 (无净值数据)")
else:
print(f"✗ 无数据")
# 关闭 socks2http 代理
if socks2http_proc:
socks2http_proc.terminate()
socks2http_proc.wait()
print(f"\n socks2http 代理已关闭")
if not all_data:
return None, None, []
if not index_data_list:
return None, None, None, None, []
# 检查数据源类型
sources = set(d['source'].iloc[0] for d in all_data)
# 处理指数数据
print(f"\n整理指数数据(用于因子计算)...")
index_df = pd.concat(index_data_list, ignore_index=False)
index_df = index_df.reset_index()
if 'index' in index_df.columns:
index_df = index_df.rename(columns={'index': 'date'})
index_df['date'] = pd.to_datetime(index_df['date']).dt.normalize()
if len(sources) == 1:
# 单一数据源:转换为宽格式(向后兼容)
all_df = pd.concat(all_data, ignore_index=False)
all_df = all_df.reset_index()
all_df['date'] = pd.to_datetime(all_df['date'], utc=True).dt.tz_localize(None)
etf_data = all_df.pivot_table(
index='date',
columns='code',
values='close',
aggfunc='first'
)
print(f"\n数据整理完成 (单一数据源 {list(sources)[0]}):")
print(f" 时间范围: {etf_data.index[0]} ~ {etf_data.index[-1]}")
print(f" 交易日数: {len(etf_data)}")
print(f" 有效标的: {len(etf_data.columns)}")
else:
# 多数据源以主市场Tushare/A股为基准其他市场数据前向填充
print(f"\n数据整理完成 (多数据源 - 以A股交易日为基准):")
# 透视为宽格式
index_data = index_df.pivot_table(
index='date',
columns='code',
values='close',
aggfunc='first'
)
# 合并所有数据(索引已经是标准化后的日期)
all_df = pd.concat(all_data, ignore_index=False)
all_df = all_df.reset_index()
# 重命名索引列为 date
if 'index' in all_df.columns:
all_df = all_df.rename(columns={'index': 'date'})
# 确保 date 列是日期格式(不含时间)
all_df['date'] = pd.to_datetime(all_df['date']).dt.normalize()
# 以A股交易日为基准对齐所有数据
tushare_codes = [c for c in valid_codes if self._is_china_index(c)]
if tushare_codes:
primary_dates = index_data[tushare_codes[0]].dropna().index
print(f" 主市场交易日: {len(primary_dates)}")
# 重新索引到主市场交易日
index_data = index_data.reindex(primary_dates)
# 对非A股指数进行前向填充
non_a_codes = [c for c in valid_codes if not self._is_china_index(c)]
for code in non_a_codes:
if code in index_data.columns:
index_data[code] = index_data[code].ffill().bfill()
print(f" 非A股标的: {len(non_a_codes)} 只 (已前向填充)")
print(f" 时间范围: {index_data.index[0]} ~ {index_data.index[-1]}")
print(f" 交易日数: {len(index_data)}")
# 处理ETF数据
if etf_data_list:
print(f"\n整理ETF数据用于收益计算...")
etf_df = pd.concat(etf_data_list, ignore_index=False)
etf_df = etf_df.reset_index()
if 'index' in etf_df.columns:
etf_df = etf_df.rename(columns={'index': 'date'})
etf_df['date'] = pd.to_datetime(etf_df['date']).dt.normalize()
# 透视为宽格式
etf_data = all_df.pivot_table(
etf_data = etf_df.pivot_table(
index='date',
columns='code',
values='close',
aggfunc='first'
)
# 获取主市场Tushare交易日
tushare_codes = [c for c in valid_codes if self._is_china_index(c)]
# 对齐到主市场交易日
if tushare_codes:
# 使用第一个A股代码的日期作为主市场交易日
primary_dates = etf_data[tushare_codes[0]].dropna().index
print(f" 主市场交易日: {len(primary_dates)}")
# 重新索引到主市场交易日,使用前向填充
etf_data = etf_data.reindex(primary_dates)
# 对每个非主市场代码进行前向填充
yfinance_codes = [c for c in valid_codes if not self._is_china_index(c)]
for code in yfinance_codes:
if code in etf_data.columns:
# 前向填充:用最近的有效价格填充休市日的数据
etf_data[code] = etf_data[code].ffill()
# 对于开头的NaN用后向填充
etf_data[code] = etf_data[code].bfill()
print(f" ETF价格数据: {len(etf_data.columns)}")
else:
# 如果没有ETF数据使用指数数据代替
etf_data = index_data.copy()
print(f"\n无ETF映射使用指数数据代替")
print(f" 非主市场标的: {len(yfinance_codes)} 只 (已前向填充)")
# 处理ETF净值数据
etf_nav_data = None
if etf_nav_data_list:
print(f"\n整理ETF净值数据用于溢价率计算...")
nav_df = pd.concat(etf_nav_data_list, ignore_index=False)
nav_df = nav_df.reset_index()
if 'index' in nav_df.columns:
nav_df = nav_df.rename(columns={'index': 'date'})
nav_df['date'] = pd.to_datetime(nav_df['date']).dt.normalize()
print(f" 时间范围: {etf_data.index[0]} ~ {etf_data.index[-1]}")
print(f" 交易日数: {len(etf_data)}")
print(f" 有效标的: {len(etf_data.columns)}")
# 透视为宽格式
etf_nav_data = nav_df.pivot_table(
index='date',
columns='code',
values='nav',
aggfunc='first'
)
# 对齐到主市场交易日并前向填充缺失值净值数据通常T+1更新
if tushare_codes:
etf_nav_data = etf_nav_data.reindex(primary_dates)
etf_nav_data = etf_nav_data.ffill() # 前向填充缺失的净值数据
print(f" ETF净值数据: {len(etf_nav_data.columns)}")
# 获取基准数据
benchmark_data = self.fetch_single(benchmark_code, start_date, end_date)
if benchmark_data is not None:
# 标准化日期索引(无时区,只保留日期部分)
benchmark_data.index = pd.to_datetime(benchmark_data.index, utc=True).tz_localize(None).normalize()
print(f" ✓ 基准 {benchmark_code}: {len(benchmark_data)}")
# 对齐到主市场交易日
if tushare_codes:
benchmark_data = benchmark_data.reindex(primary_dates)
print(f"\n✓ 基准 {benchmark_code}: {len(benchmark_data)}")
return etf_data, benchmark_data, valid_codes
return index_data, etf_data, etf_nav_data, benchmark_data, valid_codes
def __enter__(self):
"""上下文管理器入口"""

View File

@@ -80,110 +80,74 @@ def calculate_daily_return(price_series: pd.Series) -> pd.Series:
def compute_factors(
etf_data: pd.DataFrame,
index_data: pd.DataFrame,
code_list: list,
n: int = 25,
factor_type: str = "slope_r2",
etf_data: pd.DataFrame = None,
code_config: dict = None,
) -> tuple[pd.DataFrame, list]:
"""
计算所有指数的因子和日收益率
支持长格式数据混合数据源Tushare + YFinance
计算所有指数的因子和日收益率(支持指数-ETF双轨数据
Args:
etf_data: DataFrame, 长格式数据,包含 [code, close, source] 列
index_data: 指数价格数据(宽格式,用于因子计算)
code_list: 指数代码列表
n: 动量/趋势窗口
factor_type: 'momentum''slope_r2'
etf_data: ETF价格数据宽格式用于收益计算
code_config: 代码配置字典 {code: {name, etf, market}},用于判断是否为加密货币
Returns:
tuple: (result_df, valid_codes)
- result_df: 包含因子得分和日收益率的DataFrame
- valid_codes: 有效代码列表
"""
# 检查数据格式
if 'code' in etf_data.columns:
# 长格式数据 - 按 code 分别计算因子(旧逻辑,保留兼容)
all_factors = []
valid_codes = []
code_config = code_config or {}
for code in code_list:
code_data = etf_data[etf_data['code'] == code].copy()
if len(code_data) == 0:
print(f" ⚠ 跳过 {code}: 不在数据中")
continue
# 如果没有提供ETF数据创建一个空的DataFrame
if etf_data is None:
etf_data = pd.DataFrame()
# 检查缺失值
null_pct = code_data['close'].isnull().sum() / len(code_data)
if null_pct > 0.2:
print(f" ⚠ 剔除 {code}: 缺失率 {null_pct:.1%} 过高")
continue
result = index_data.copy()
# 按日期排序
code_data = code_data.sort_index()
# 计算日收益率和因子
code_data[f"日收益率_{code}"] = calculate_daily_return(code_data['close'])
if factor_type == "momentum":
code_data[f"得分_{code}"] = calculate_momentum(code_data['close'], n)
elif factor_type == "slope_r2":
code_data[f"得分_{code}"] = calculate_slope_r2(code_data['close'], n)
else:
raise ValueError(f"不支持的因子类型: {factor_type}")
# 保留需要的列
code_data = code_data[[f"日收益率_{code}", f"得分_{code}"]]
all_factors.append(code_data)
# 过滤掉缺失值过多的指数
total_rows = len(result)
valid_codes = []
for code in code_list:
if code not in result.columns:
print(f" ⚠ 跳过 {code}: 不在数据中")
continue
null_pct = result[code].isnull().sum() / total_rows
if null_pct > 0.2:
print(f" ⚠ 剔除 {code}: 缺失率 {null_pct:.1%} 过高")
result = result.drop(columns=[code])
else:
valid_codes.append(code)
if not all_factors:
raise ValueError("没有有效的指数数据")
# 对有效指数计算因子和收益率
for code in valid_codes:
# 因子基于指数价格计算
if factor_type == "momentum":
result[f"得分_{code}"] = calculate_momentum(result[code], n)
elif factor_type == "slope_r2":
result[f"得分_{code}"] = calculate_slope_r2(result[code], n)
else:
raise ValueError(f"不支持的因子类型: {factor_type}")
# 合并所有因子的数据(按日期内连接 - 只保留所有指数都有数据的日期
result = all_factors[0]
for df in all_factors[1:]:
result = result.join(df, how='inner')
# 日收益率基于指数价格计算(回测使用指数价格
result[f"日收益率_{code}"] = calculate_daily_return(result[code])
# 删除所有得分都是 NaN 的行(即窗口期内的数据)
score_cols = [f"得分_{code}" for code in valid_codes]
# 只删除完全无法比较的行所有得分都是NaN
result = result.dropna(subset=score_cols, how='all')
else:
# 宽格式数据(向后兼容)
result = etf_data.copy()
# 过滤掉缺失值过多的指数
total_rows = len(result)
valid_codes = []
for code in code_list:
if code not in result.columns:
print(f" ⚠ 跳过 {code}: 不在数据中")
continue
null_pct = result[code].isnull().sum() / total_rows
if null_pct > 0.2:
print(f" ⚠ 剔除 {code}: 缺失率 {null_pct:.1%} 过高")
result = result.drop(columns=[code])
else:
valid_codes.append(code)
# 对有效指数计算因子
for code in valid_codes:
result[f"日收益率_{code}"] = calculate_daily_return(result[code])
if factor_type == "momentum":
result[f"得分_{code}"] = calculate_momentum(result[code], n)
elif factor_type == "slope_r2":
result[f"得分_{code}"] = calculate_slope_r2(result[code], n)
else:
raise ValueError(f"不支持的因子类型: {factor_type}")
# 按得分列做 dropna
score_cols = [f"得分_{code}" for code in valid_codes]
result = result.dropna(subset=score_cols)
# 按得分列做 dropna
score_cols = [f"得分_{code}" for code in valid_codes]
result = result.dropna(subset=score_cols)
print("\n因子计算完成:")
print(f" 因子类型: {factor_type}")
print(f" 窗口天数: {n}")
print(f" 有效指数: {len(valid_codes)}/{len(code_list)}")
print(f" 有效数据: {len(result)}")
if etf_data is not index_data:
print(f" 使用ETF数据计算收益: ✓")
return result, valid_codes

View File

@@ -59,22 +59,38 @@ def main():
from datetime import datetime
config['end_date'] = datetime.now().strftime('%Y-%m-%d')
# 从配置中读取 code_list 和 code_name_map
# code_list 现在是一个字典 {代码: 名称}
# 从配置中读取 code_list(新的配置格式:{代码: {name, etf, market}}
code_list_config = config.get('code_list', {})
# 提取代码列表和名称映射
if isinstance(code_list_config, dict):
code_list = list(code_list_config.keys())
code_name_map = code_list_config
# 构建 code_name_map: {代码: 名称}
code_name_map = {}
for code, cfg in code_list_config.items():
if isinstance(cfg, dict):
code_name_map[code] = cfg.get('name', code)
else:
# 兼容旧格式
code_name_map[code] = cfg
else:
# 兼容旧格式(列表)
code_list = code_list_config
code_name_map = DEFAULT_CODE_NAME_MAP
code_list_config = {}
benchmark_config = config.get('benchmark', {})
benchmark_name = benchmark_config.get('name', DEFAULT_BENCHMARK_NAME)
print(f"\n配置文件: {args.config}")
print(f"候选标的: {len(code_list)}")
# 统计ETF映射情况
etf_count = sum(1 for cfg in code_list_config.values() if isinstance(cfg, dict) and cfg.get('etf'))
crypto_count = sum(1 for cfg in code_list_config.values() if isinstance(cfg, dict) and cfg.get('market') == 'CRYPTO')
print(f" - ETF映射: {etf_count}")
print(f" - 直接交易: {crypto_count} 只(加密货币)")
print(f"回测区间: {config['start_date']} ~ {config['end_date']}")
print(f"因子类型: {config['factor_type']}")
print(f"窗口天数: {config['n_days']}")
@@ -82,8 +98,8 @@ def main():
print(f"调仓周期: {config['rebalance_days']}")
print(f"交易成本: {config['trade_cost']:.2%}")
# 更新 config 中的 code_list 为列表格式
config['code_list'] = code_list
# 保持 config 中的 code_list 为完整配置格式(用于引擎内部解析)
# 不需要修改 config['code_list'],引擎会直接使用原始配置
# 创建策略实例
strategy = RotationStrategy(config)
@@ -119,6 +135,10 @@ def main():
benchmark_name=benchmark_name,
save_path=args.save_path,
select_num=config["select_num"],
code_config=code_list_config, # 传入完整配置以显示ETF映射
index_data=strategy.index_data, # 传入指数数据
etf_price_data=strategy.etf_data, # 传入ETF价格数据
etf_nav_data_raw=strategy.etf_nav_data, # 传入ETF净值数据
)
elapsed = time.time() - start_time

View File

@@ -34,31 +34,40 @@ class RotationStrategy(BacktestStrategy):
self.backtest_result = None
def fetch_data(self) -> pd.DataFrame:
"""获取数据"""
"""获取数据(支持指数-ETF双轨数据"""
from config.settings import DEFAULT_BENCHMARK_CODE
# 从配置中读取基准代码,或使用默认值
benchmark_code = self.config.get("benchmark", {}).get("code", DEFAULT_BENCHMARK_CODE)
# 使用上下文管理器管理 SSH 隧道(如果是 YFinance 数据源
# 获取代码配置(包含 name, etf, market
code_config = self.config.get("code_list", {})
# 使用上下文管理器管理 SSH 隧道
with self.data_source:
etf_data, benchmark_data, valid_codes = self.data_source.fetch_all(
self.config["code_list"],
index_data, etf_data, etf_nav_data, benchmark_data, valid_codes = self.data_source.fetch_all(
code_config,
benchmark_code,
self.config["start_date"],
self.config["end_date"],
)
self.etf_data = etf_data
# 存储数据和配置
self.index_data = index_data # 指数数据(用于因子计算)
self.etf_data = etf_data # ETF价格数据用于收益计算
self.etf_nav_data = etf_nav_data # ETF净值数据用于溢价率计算
self.benchmark_data = benchmark_data
self.valid_codes = valid_codes
self.code_config = code_config # 代码配置(用于判断市场类型)
# 计算因子
# 计算因子传入两套数据指数数据用于因子ETF数据用于收益
factor_data, valid_codes = compute_factors(
etf_data,
index_data,
valid_codes,
n=self.config["n_days"],
factor_type=self.config["factor_type"],
etf_data=etf_data, # 传入ETF数据用于收益计算
code_config=code_config, # 传入配置以判断加密货币
)
self.data = factor_data

View File

@@ -0,0 +1,597 @@
# 跨市场ETF映射方案
## 背景问题
当前系统存在以下问题:
1. 配置中使用指数代码但实际交易的是ETF
2. 跨境ETF恒生科技ETF、纳指ETF在A股交易交易时间与标的指数不同
3. 回测收益使用指数价格与实际ETF收益存在跟踪误差
## 方案设计
### 核心思路
- **信号层**:使用指数数据计算因子,生成交易信号
- **执行层**使用ETF数据计算收益反映实际交易成本和跟踪误差
- **加密货币**:保持原样,直接在交易所买卖
### 数据流程
```
指数数据 → 因子计算 → 信号生成 → 映射到ETF → ETF收益计算
```
### Task 执行顺序
```
Task 1 (配置结构) ─────┬─→ Task 6 (配置解析)
Task 2 (ETF数据获取) ──┼─→ Task 3 (因子计算) ─→ Task 4 (轮动引擎)
│ │
Task 7 (溢价控制) ─────┴───────────────────────────→─┴─→ Task 5 (报告)
```
**推荐执行顺序**Task 1 → Task 6 → Task 2 → Task 7 → Task 3 → Task 4 → Task 5
---
## Task 1: 修改配置文件结构
修改 `config/strategies/rotation.yaml`,将简单的代码列表改为支持指数-ETF映射的结构
> **ETF 代码格式说明**Tushare 使用 `.SH`(上交所)和 `.SZ`(深交所)后缀
```yaml
code_list:
# A股指数 - index为指数代码(信号), etf为场内ETF代码(交易)
"000300.SH":
name: "沪深300"
etf: "510300.SH" # 华泰柏瑞沪深300ETF上交所
market: "A"
"000905.SH":
name: "中证500"
etf: "510500.SH" # 南方中证500ETF上交所
market: "A"
# 跨境ETF - 使用境外指数计算信号但交易A股ETF
"HSTECH":
name: "恒生科技"
etf: "513180.SH" # 华夏恒生科技ETF上交所
market: "HK"
"NDX":
name: "纳指100"
etf: "159501.SZ" # 嘉实纳指100ETF深交所- 流动性好
market: "US"
# 黄金 - A股黄金ETF
"GC=F":
name: "黄金"
etf: "518880.SH" # 华安黄金ETF上交所
market: "COMMODITY"
# 加密货币 - 无ETF映射直接交易
"BTC":
name: "比特币"
etf: null # 无ETF直接交易
market: "CRYPTO"
```
---
## Task 2: 创建ETF数据获取模块
`core/datasource/hybrid_source.py` 中新增ETF数据获取能力
1. 新增方法 `fetch_etf_data()` 获取A股ETF数据通过Tushare
2. 修改 `fetch_all()` 返回值同时返回指数数据和ETF数据
关键逻辑:
- A股ETF如510300.SH使用 Tushare 的 `fund_daily` 接口
- 所有ETF都在A股交易统一使用A股交易日历
```python
def fetch_all(...) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, list]:
"""
Returns:
(index_data, etf_data, benchmark_data, valid_codes)
- index_data: 指数数据,用于因子计算
- etf_data: ETF数据用于收益计算
"""
```
---
## Task 3: 修改因子计算逻辑
修改 `core/factors/momentum.py``compute_factors()` 函数:
1. 接收两套数据:指数数据(用于因子计算)+ ETF数据用于收益计算
2. 因子基于指数价格计算
3. 日收益率基于ETF价格计算或指数价格如果没有ETF映射
```python
def compute_factors(
index_data: pd.DataFrame, # 指数数据 - 用于因子
etf_data: pd.DataFrame, # ETF数据 - 用于收益
code_list: list,
...
) -> tuple[pd.DataFrame, list]:
```
---
## Task 4: 修改轮动引擎
修改 `strategies/rotation/engine.py`
1. `fetch_data()`: 同时获取指数和ETF数据
2. `generate_signals()`: 基于指数因子生成信号,**动态检查 ETF 可用性**
3. `run_backtest()`: 基于ETF价格计算收益
关键变更:
- 存储 `self.index_data``self.etf_data` 两套数据
- 信号基于 `index_data` 的因子得分
- 收益率基于 `etf_data` 的价格变动
- **新增**ETF 上市日期检查,未上市的 ETF 不参与当日排名
```python
def generate_signals(self) -> pd.DataFrame:
# 对每个交易日,检查 ETF 数据可用性
for date in trading_dates:
# 获取当日有 ETF 数据的标的
available_codes = [
code for code in self.valid_codes
if not pd.isna(self.etf_data.loc[date, code_etf_map[code]])
]
# 只在 available_codes 中选择信号
...
```
---
## Task 5: 修改报告和持仓跟踪
修改 `strategies/rotation/report.py``portfolio.py`
1. 报告中同时显示指数名称和ETF代码
2. 持仓记录使用ETF代码方便实际操作
3. 新增字段:实际交易标的、交易市场
4. **新增**:跨境 ETF 溢价率显示(使用 Task 7 的真实净值计算)
输出示例:
```
当前持仓建议:
1. 纳指100 (NDX) → 买入 513100.SH (国泰纳指100ETF) [溢价率: +3.2%] ⚠️
2. 沪深300 (000300.SH) → 买入 510300.SH (华泰柏瑞沪深300ETF) [溢价率: +0.1%]
3. 比特币 (BTC) → 买入 BTC/USDT (OKX交易所)
注: 溢价率 > 2% 显示警告标记
```
---
## Task 6: 更新配置解析逻辑
修改 `scripts/run_rotation.py``config/settings.py`
1. 解析新的配置结构提取指数代码列表和ETF映射
2. 构建 `code_name_map``code_etf_map``code_market_map`
3. 向下传递映射关系
---
## Task 7: 跨境 ETF 溢价控制(新增)
`strategies/rotation/engine.py` 中增加溢价过滤机制:
### 问题背景
当美股/港股大涨时A股跨境 ETF 往往高开甚至涨停,溢价率可达 3%-5%。如果在高溢价时买入,即使境外指数不跌,溢价回落也会造成亏损。
### 溢价率数据来源
**需要合并两个 Tushare 接口**(各需要 2000 积分):
1. `fund_daily`: 获取 ETF 交易价格 (close)
2. `fund_nav`: 获取 ETF 单位净值 (unit_nav)
```python
import tushare as ts
pro = ts.pro_api()
# 1. 获取 ETF 价格
price_df = pro.fund_daily(
ts_code='159501.SZ', # 纳指ETF
start_date='20200101',
end_date='20250101',
)
# 2. 获取 ETF 净值
nav_df = pro.fund_nav(
ts_code='159501.SZ',
start_date='20200101',
end_date='20250101',
)
# 3. 合并计算溢价率
price_df = price_df[['trade_date', 'close']].rename(columns={'trade_date': 'date'})
nav_df = nav_df[['nav_date', 'unit_nav']].rename(columns={'nav_date': 'date', 'unit_nav': 'nav'})
merged = price_df.merge(nav_df, on='date', how='inner')
# 溢价率 = 收盘价 / 净值 - 1
merged['premium'] = merged['close'] / merged['nav'] - 1
```
**实测结果**159501.SZ 纳指 ETF60 天数据):
- 平均溢价率: +4.34%
- 最大溢价率: +7.83%
- 溢价率 > 2% 的天数: 93.9%
**结论**:跨境 ETF 的高溢价是真实存在的风险,必须在策略中增加溢价过滤!
### Tushare 积分说明
| 接口 | 所需积分 | 用途 |
|------|---------|------|
| `fund_daily` | 2000 | ETF 日线行情(价格) |
| `fund_nav` | 2000 | ETF 单位净值 |
| `index_daily` | 2000 | A股指数日线 |
当前 2000 积分满足所有需求,限制:每分钟 200 次,每天 10 万次/API。
### 不同市场类型的溢价率计算方式
**核心逻辑**:在 T+1 日 09:00 计算信号时T 日净值是否已公布?
| ETF 类型 | 跟踪标的收盘时间 | 净值公布时间 | T+1日09:00可用净值 | 溢价率公式 | 溢价控制 |
|---------|-----------------|-------------|-------------------|-----------|---------|
| **A股 ETF** | T日 15:00 | T日 ~18:00 | T日净值 | `T日价格 / T日净值` | 可选(溢价通常 < 0.5% |
| **港股 ETF** | T日 16:00 | T日 ~18:00 | T日净值 | `T日价格 / T日净值` | **必须**溢价可达 3%+ |
| **黄金 ETF** | T日 15:00 | T日 ~18:00 | T日净值 | `T日价格 / T日净值` | 可选流动性好溢价小 |
| **美股 ETF** | T+1日 05:00 | T+1日 晚间 | **T-1日净值** | `T日价格 / T-1日净值` | **必须**溢价可达 5%+ |
> **注意**A股 ETF如沪深300 ETF流动性好、套利机制完善溢价率通常在 ±0.5% 以内,可不启用溢价过滤。跨境 ETF港股/美股)因套利限制,溢价率波动大,**必须启用过滤**。
**时间线对比**
```
A股/港股/黄金 ETF
T日 T+1日
├──────────────────────┼─────────────
15:00 ~18:00 09:00
收盘 净值公布 计算信号 → T日净值已可用 ✓
美股 ETF跨境
T日 T+1日
├────────────────────────────────┼────────────────────────────
15:00 22:30 05:00 09:00 ~晚间
ETF收盘 美股开盘 美股收盘 计算信号 T日净值公布
T日净值未公布只能用 T-1日净值
```
**这与集思录的计算方式一致**集思录对跨境 ETF 使用 `价格日期` `净值日期` 两列反映了实际数据可用性
### 解决方案
1. **计算实盘溢价率**按市场类型
```python
def calculate_realtime_premium(
etf_code: str,
market_type: str,
price_date: str,
) -> float:
"""
计算实盘可用的溢价率(在 T+1 日 09:00 决策时点)
Args:
etf_code: ETF 代码
market_type: 市场类型 ('A', 'HK', 'US', 'COMMODITY')
price_date: 价格日期T日即昨日
Returns:
溢价率(小数形式)
"""
pro = ts.pro_api()
# 获取 T 日 ETF 收盘价
price_df = pro.fund_daily(ts_code=etf_code, trade_date=price_date)
t_close = price_df['close'].iloc[0]
# 根据市场类型确定净值日期
if market_type in ['A', 'HK', 'COMMODITY']:
# A股/港股/黄金T日净值在决策时点已公布
nav_date = price_date # 同日
else: # US 美股
# 美股T日净值未公布使用 T-1 日净值
nav_date = get_previous_trade_date(price_date) # 前一日
# 获取净值
nav_df = pro.fund_nav(ts_code=etf_code, nav_date=nav_date)
nav = nav_df['unit_nav'].iloc[0]
# 计算溢价率
premium = t_close / nav - 1
return premium
```
2. **计算历史溢价率**回测阶段同日对齐
```python
def calculate_historical_premium(etf_code: str, start_date: str, end_date: str) -> pd.DataFrame:
"""
计算历史溢价率(回测用,同日对齐)
"""
pro = ts.pro_api()
# 获取价格和净值
price_df = pro.fund_daily(ts_code=etf_code, start_date=start_date, end_date=end_date)
nav_df = pro.fund_nav(ts_code=etf_code, start_date=start_date, end_date=end_date)
# 合并(同日对齐)
price_df = price_df[['trade_date', 'close']].rename(columns={'trade_date': 'date'})
nav_df = nav_df[['nav_date', 'unit_nav']].rename(columns={'nav_date': 'date', 'unit_nav': 'nav'})
merged = price_df.merge(nav_df, on='date', how='inner')
# 溢价率 = 收盘价 / 净值 - 1
merged['premium'] = merged['close'] / merged['nav'] - 1
return merged
```
3. **信号过滤逻辑**
```python
# 配置参数
premium_threshold = 0.02 # 溢价超过 2% 不买入
def filter_by_premium(scores, premium_rates, threshold):
"""
过滤高溢价标的
- 溢价率超过阈值的标的,得分置为 -inf不参与排名
- 或者:得分乘以惩罚因子 (1 - premium_rate)
"""
for code in scores.index:
if premium_rates.get(code, 0) > threshold:
scores[code] = float('-inf') # 或降权处理
return scores
```
4. **配置文件新增参数**
```yaml
# rotation.yaml
premium_control:
enabled: true
default_threshold: 0.02 # 默认溢价阈值 2%
mode: "filter" # "filter"(完全排除) 或 "penalize"(降权)
penalty_factor: 0.5 # 降权模式下的惩罚系数
# 按市场类型覆盖配置
market_overrides:
A: # A股 ETF
enabled: false # 不启用(溢价通常 < 0.5%
HK: # 港股 ETF
enabled: true
threshold: 0.03 # 阈值 3%
US: # 美股 ETF
enabled: true
threshold: 0.02 # 阈值 2%
COMMODITY: # 商品 ETF
enabled: false
```
---
## 数据对齐策略
### 信号计算基准时点
策略信号在 **A股 T+1 日早上 09:00** 计算此时各市场数据可获取性
| 市场 | 收盘时间北京时间| 09:00时可获取数据 |
|------|------------------|------------------|
| A股 | T日 15:00 | T日收盘价 |
| 港股 | T日 16:00 | T日收盘价 |
| 美股 | T+1日 05:00 | T日收盘价 |
| 黄金期货 | T+1日 ~06:00 | T日结算价 |
| 加密货币 | 24小时交易 | T+1日 08:00价格UTC 00:00 |
**结论** A股 T+1 09:00 信号计算时所有市场的 T 日数据均已可获取无需 shift 处理
```
时间线(北京时间):
T日 T+1日
├─────────────────────────────────────┼─────────────────────────
15:00 16:00 05:00 08:00 09:00 09:30
A股收盘 港股收盘 美股收盘 UTC0 计算信号 A股开盘执行
│ │ │ │ │ │
└────────┴──────────────────┴──────┴──────┴─────────┘
所有T日数据在T+1日09:00前均已可获取
```
### 各市场数据对齐规则
```python
# A股 T+1 日 09:00 计算信号时,使用的数据
data_alignment = {
"A股指数": {
"price_date": "T日", # T日收盘价
"source_time": "T日 15:00",
},
"港股指数": {
"price_date": "T日", # T日收盘价
"source_time": "T日 16:00",
},
"美股指数": {
"price_date": "T日", # T日收盘价美东时间
"source_time": "T+1日 05:00", # 北京时间
},
"黄金期货": {
"price_date": "T日", # T日结算价
"source_time": "T+1日 ~06:00",
},
"加密货币": {
"price_date": "T日", # UTC T+1日 00:00 = 北京 T+1日 08:00
"source_time": "T+1日 08:00", # 比决策时点早1小时
},
}
```
### 加密货币特殊处理
加密货币 24 小时交易使用 **UTC 00:00北京时间 08:00** 作为"日收盘价"
- CCXT/OKX 返回的日线数据默认就是 UTC 00:00 切换
- 北京时间 08:00 距离 09:00 信号计算只有 1 小时数据足够新鲜
- 与其他数据源保持一致的数据结构
**A股休市期间的处理**
```
场景A股春节休市 5 天
1/20(五) 1/21-22(周末) 1/23-27(春节) 1/28(六) 1/29(日) 1/30(一)
A股 交易 休市 休市 休市 休市 交易
BTC 交易 交易 交易 交易 交易 交易
│ │
使用1/20价格 ────────────── ffill ──────────────────────→ 使用1/29价格
```
- 因子计算使用前向填充休市期间认为价格不变
- 实际交易加密货币可在A股休市期间随时买卖但策略信号只在A股交易日生成
### 数据对齐代码实现
```python
def align_to_a_share_calendar(
market_data: dict[str, pd.DataFrame], # {code: df}
market_types: dict[str, str], # {code: market_type}
a_share_dates: pd.DatetimeIndex,
) -> pd.DataFrame:
"""
将所有市场数据对齐到A股交易日历
由于信号在 T+1 日 09:00 计算,所有市场 T 日数据均已可获取,
因此直接 reindex 到 A股交易日即可无需 shift。
"""
aligned_data = {}
for code, df in market_data.items():
market = market_types.get(code, "A")
# 统一处理reindex 到 A股交易日前向填充休市日
aligned = df['close'].reindex(a_share_dates)
if market in ["HK", "US", "COMMODITY", "CRYPTO"]:
# 非A股市场休市日用前向填充
aligned = aligned.ffill().bfill()
aligned_data[code] = aligned
return pd.DataFrame(aligned_data)
```
### 跨境ETF信号与收益分离
以恒生科技为例
- **信号计算**使用 HSTECH 港股指数YFinance反映真实市场走势
- **收益计算**使用 513180.SH A股ETFTushare反映实际交易成本和跟踪误差
### 特殊情况处理
| 情况 | 处理方式 |
|------|---------|
| A股休市境外交易 | 指数使用 ffillETF无数据不计收益 |
| 境外休市A股交易 | 指数使用 ffillETF正常交易可能跳空 |
| 两边都休市 | 该日不在回测范围内 |
| ETF上市日期晚于指数 | 该标的从ETF上市日开始参与回测 |
---
## 涉及文件清单
| 文件 | 修改内容 |
|------|---------|
| `config/strategies/rotation.yaml` | 配置结构重构新增溢价控制参数 |
| `core/datasource/hybrid_source.py` | 新增ETF数据获取 |
| `core/factors/momentum.py` | 支持双轨数据输入 |
| `strategies/rotation/engine.py` | 引擎逻辑重构ETF可用性检查溢价过滤 |
| `strategies/rotation/portfolio.py` | 持仓显示ETF |
| `strategies/rotation/report.py` | 报告显示ETF和溢价率 |
| `scripts/run_rotation.py` | 配置解析适配 |
| `config/settings.py` | 新增默认ETF映射和汇率配置 |
---
## 风险与注意事项
1. **跟踪误差**ETF净值与指数存在偏差这是预期行为
2. **停牌风险**ETF可能停牌需要处理缺失数据
3. **新ETF上市**部分ETF成立时间晚于回测起始日Task 4 已处理
4. **QDII额度**跨境ETF可能有申购限制不影响回测
5. **跨境ETF溢价**美股/港股大涨时ETF可能高溢价Task 7 已增加过滤机制
6. **Tushare 积分**当前 2000 积分满足 fund_dailyfund_nav index_daily 需求建议缓存历史数据减少 API 调用
7. **ETF 选择建议**
- 同一指数可能有多只 ETF如纳指有 513100/159501/159632
- 优先选择规模大> 10 亿)、流动性好(日成交 > 1000 万)、跟踪误差小的 ETF
- 本方案使用 159501.SZ嘉实纳指100是因为其流动性较好
---
## 关于加密货币数据精度的说明
建议文档中提到"BTC 应使用分钟级数据切片到 09:00",经评估:
| 方案 | 优点 | 缺点 |
|------|------|------|
| 日线数据08:00 UTC | 数据量小,易获取,与其他市场一致 | 与决策时点有 1 小时差距 |
| 分钟级切片09:00 | 数据最新 | 5年历史需 260 万条数据,获取/存储成本高 |
**决策**
- **回测阶段**使用日线数据UTC 00:0008:00-09:00 的波动(平均 ~1%)对 25 天趋势因子影响极小,可通过交易成本参数覆盖
- **实盘阶段**(未来增强):可在执行时获取实时价格,信号计算仍基于日线
---
## ffill 时机说明
建议文档担心"ffill 后立即计算因子会导致休市期间价格不变影响动量"。实际上:
- 我们使用**交易日对齐**reindex 到 A股交易日
- 动量计算的是过去 N 个**交易日**的涨幅,非 N 个自然日
- ffill 只填充非交易日,这些日期不进入因子计算序列
- 当前逻辑正确,无需调整
### 详细说明:为什么 ffill 是安全的
**核心保障**:主循环 `for date in a_share_trading_dates` 严格只遍历 A 股交易日。
**实现方式**`reindex(a_share_dates)` 直接索引到 A 股交易日历,结果只包含交易日行,不会产生中间填充行。
**示例:春节休市场景**
```
A股交易日历: [1/20(周五), 1/30(周一)] (春节休市 1/21-1/29)
美股原始数据: 1/20→200, 1/21→205, ..., 1/29→220
Step 1: reindex(a_share_dates)
1/20: 200 (原始值)
1/30: NaN (1/30 美股尚未收盘,无数据)
注意:结果只有 2 行,不会出现 [1/21, 1/22, ...] 这些行!
Step 2: ffill()
1/20: 200
1/30: 220 (向前找到 1/29 的值填充)
Step 3: 动量计算 pct_change(1)
1/20: NaN (首行无前值)
1/30: (220 - 200) / 200 = 10% ← 正确反映假期全部涨幅
```
**关键区别**
| 错误做法 | 正确做法(当前方案) |
|---------|-------------------|
| reindex 到自然日历 → ffill → 产生 `[D1,D1,D1,D1,D1,D2]` | reindex 到 A股交易日历 → ffill → 只有 `[D1, D2]` |
| 中间填充日会干扰动量计算 | 根本不存在中间填充日 |
**结论**:当前方案直接 reindex 到 A 股交易日历,是最安全的实现方式。