feat: 实现贪心分配模式(greedy)

- config_loader.py: 添加 etf_pool 字段和 GREEDY 枚举
- config_simple.yaml: 每个资产添加 etf_pool 列表
- simple_rotation.py:
  - 添加 _compute_greedy_weights 方法
  - _calculate_daily_return 支持 greedy 模式
  - 向后兼容原有 rank/equal 模式

贪心算法:按 ETF 池容量分配仓位,装不下的顺延给下一名
- 有色金属(1 ETF): 吸收25%,顺延75%
- 原油(3 ETF): 吸收75%
- 黄金(4 ETF): 吸收100%

回测对比 (select_num=3):
- rank: 326.60% 累计收益, 1.24 夏普
- greedy: 421.35% 累计收益, 1.03 夏普
This commit is contained in:
2026-06-21 12:40:40 +08:00
parent b698857e49
commit adb83d8cd7
7 changed files with 1135 additions and 2 deletions

View File

@@ -56,6 +56,7 @@ class WeightType(str, Enum):
EQUAL = "equal" # 等权 EQUAL = "equal" # 等权
RANK = "rank" # 按排名加权 (slot i gets (N-i)/triangular(N)) RANK = "rank" # 按排名加权 (slot i gets (N-i)/triangular(N))
KELLY = "kelly" # Kelly准则近似 (score-proportional weighting) KELLY = "kelly" # Kelly准则近似 (score-proportional weighting)
GREEDY = "greedy" # 贪心分配按ETF池容量吸收仓位
class DataSourceType(str, Enum): class DataSourceType(str, Enum):
@@ -82,6 +83,7 @@ class AssetConfig(BaseModel):
signal_source: str = Field(..., description="信号来源代码") signal_source: str = Field(..., description="信号来源代码")
trade_source: str = Field(..., description="交易来源代码") trade_source: str = Field(..., description="交易来源代码")
etf_pool: List[str] = Field(default_factory=list, description="可交易ETF池按规模排序")
description: Optional[str] = Field(None, description="标的描述") description: Optional[str] = Field(None, description="标的描述")
etf: Optional[str] = Field(None, description="ETF代码兼容旧配置") etf: Optional[str] = Field(None, description="ETF代码兼容旧配置")
@@ -179,6 +181,7 @@ class RotationConfig(BaseModel):
diversified: bool = Field(default=True) diversified: bool = Field(default=True)
threshold: ThresholdConfig = Field(default_factory=ThresholdConfig) threshold: ThresholdConfig = Field(default_factory=ThresholdConfig)
weight: WeightType = Field(default=WeightType.EQUAL) weight: WeightType = Field(default=WeightType.EQUAL)
etf_max_weight: float = Field(default=0.25, ge=0.01, le=1.0)
class RebalanceConfig(BaseModel): class RebalanceConfig(BaseModel):

View File

@@ -6,66 +6,77 @@ asset_pools:
name: 创业板指 name: 创业板指
signal_source: 399006.SZ signal_source: 399006.SZ
trade_source: 159915.SZ trade_source: 159915.SZ
etf_pool: [159915.SZ, 159952.SZ, 159957.SZ, 159948.SZ]
931862.CSI: 931862.CSI:
description: 中证0-9个月国债指数久期<1年防御配置 description: 中证0-9个月国债指数久期<1年防御配置
group: BOND group: BOND
name: 短债指数 name: 短债指数
signal_source: 931862.CSI signal_source: 931862.CSI
trade_source: 931862.CSI trade_source: 931862.CSI
etf_pool: [931862.CSI]
CL=F: CL=F:
description: WTI原油期货2000年至今 description: WTI原油期货2000年至今
group: COMMODITY group: COMMODITY
name: 原油 name: 原油
signal_source: CL=F signal_source: CL=F
trade_source: 160723.SZ trade_source: 160723.SZ
etf_pool: [160723.SZ, 501018.SH, 161129.SZ]
GC=F: GC=F:
description: COMEX黄金期货2000年至今 description: COMEX黄金期货2000年至今
group: COMMODITY group: COMMODITY
name: 黄金 name: 黄金
signal_source: GC=F signal_source: GC=F
trade_source: 518880.SH trade_source: 518880.SH
etf_pool: [518880.SH, 159937.SZ, 159934.SZ, 518800.SH]
GDAXI: GDAXI:
description: 德国DAX指数 description: 德国DAX指数
group: EU group: EU
name: 德国DAX name: 德国DAX
signal_source: GDAXI signal_source: GDAXI
trade_source: 513030.SH trade_source: 513030.SH
etf_pool: [159561.SZ, 513030.SH]
H30269.CSI: H30269.CSI:
description: 红利低波指数 description: 红利低波指数
group: A group: A
name: 中证红利低波 name: 中证红利低波
signal_source: H30269.CSI signal_source: H30269.CSI
trade_source: 512890.SH trade_source: 512890.SH
etf_pool: [512890.SH, 563020.SH, 159547.SZ, 560150.SH]
HG=F: HG=F:
description: COMEX铜期货2000年至今 description: COMEX铜期货2000年至今
group: COMMODITY group: COMMODITY
name: 有色金属 name: 有色金属
signal_source: HG=F signal_source: HG=F
trade_source: 159980.SZ trade_source: 159980.SZ
etf_pool: [159980.SZ]
HSI: HSI:
description: 恒生指数 description: 恒生指数
group: HK group: HK
name: 恒生指数 name: 恒生指数
signal_source: HSI signal_source: HSI
trade_source: 159920.SZ trade_source: 159920.SZ
etf_pool: [159920.SZ, 513600.SH, 159271.SZ, 513210.SH]
HSTECH.HK: HSTECH.HK:
description: 恒生科技指数 description: 恒生科技指数
group: HK group: HK
name: 恒生科技 name: 恒生科技
signal_source: HSTECH.HK signal_source: HSTECH.HK
trade_source: 513130.SH trade_source: 513130.SH
etf_pool: [513180.SH, 513130.SH, 513010.SH, 159740.SZ]
N225: N225:
description: 日经225指数 description: 日经225指数
group: JP group: JP
name: 日经225 name: 日经225
signal_source: N225 signal_source: N225
trade_source: 513520.SH trade_source: 513520.SH
etf_pool: [513880.SH, 513520.SH, 513000.SH, 159866.SZ]
NDX: NDX:
description: 纳斯达克100指数 description: 纳斯达克100指数
group: US group: US
name: 纳指100 name: 纳指100
signal_source: NDX signal_source: NDX
trade_source: 513100.SH trade_source: 513100.SH
etf_pool: [159941.SZ, 513100.SH, 513300.SH, 159501.SZ]
backtest: backtest:
start_date: '2020-01-10' start_date: '2020-01-10'
benchmark: benchmark:
@@ -109,6 +120,7 @@ rotation:
diversified: true diversified: true
select_num: 3 select_num: 3
weight: rank weight: rank
etf_max_weight: 0.25
threshold: threshold:
dynamic: dynamic:
fallback_enabled: true fallback_enabled: true

173
rotation/enrich_etf_data.py Normal file
View File

@@ -0,0 +1,173 @@
"""
补充 ETF 丰富信息到 etf_basic_full.csv批量版
- fund_share: 最新基金份额
- fund_daily: 近20日日均成交额
- fund_nav: 最新累计净值
"""
import os
import sys
import time
import pandas as pd
import numpy as np
from pathlib import Path
from datetime import datetime, timedelta
PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
from dotenv import load_dotenv
load_dotenv(PROJECT_ROOT / '.env')
import tushare as ts
pro = ts.pro_api(os.getenv('TUSHARE_TOKEN'))
csv_path = PROJECT_ROOT / 'rotation' / 'results' / 'etf_basic_full.csv'
df = pd.read_csv(csv_path)
print(f"加载 {len(df)} 只ETF")
# 初始化新列
df['fund_scale_yi'] = np.nan
df['avg_amount_wan'] = np.nan
df['avg_vol_wan'] = np.nan
df['latest_nav'] = np.nan
df['nav_date'] = ''
end_date = datetime.now().strftime('%Y%m%d')
start_date = (datetime.now() - timedelta(days=60)).strftime('%Y%m%d')
daily_start = (datetime.now() - timedelta(days=40)).strftime('%Y%m%d')
all_codes = df['ts_code'].tolist()
# ============================================================
# 1. fund_share - 批量获取每次最多100只
# ============================================================
print(f"\n[1/3] 获取基金份额...")
share_records = []
batch_size = 100
for i in range(0, len(all_codes), batch_size):
batch = all_codes[i:i+batch_size]
ts_codes_str = ','.join(batch)
try:
share_df = pro.fund_share(ts_code=ts_codes_str, start_date=start_date, end_date=end_date)
if share_df is not None and len(share_df) > 0:
share_records.append(share_df)
except Exception as e:
pass
if (i + batch_size) % 500 == 0:
print(f" 进度: {min(i+batch_size, len(all_codes))}/{len(all_codes)}")
time.sleep(0.3)
if share_records:
all_shares = pd.concat(share_records, ignore_index=True)
# 每只取最新一条
latest_shares = all_shares.sort_values('trade_date', ascending=False).drop_duplicates(subset=['ts_code'], keep='first')
share_map = dict(zip(latest_shares['ts_code'], latest_shares['fd_share']))
df['fund_scale_yi'] = df['ts_code'].map(share_map) / 10000 # 万份→亿份
print(f" ✓ 份额数据: {len(share_map)}")
else:
print(f" ✗ 份额数据获取失败")
# ============================================================
# 2. fund_daily - 批量获取
# ============================================================
print(f"\n[2/3] 获取日线行情...")
daily_records = []
for i in range(0, len(all_codes), batch_size):
batch = all_codes[i:i+batch_size]
ts_codes_str = ','.join(batch)
try:
daily_df = pro.fund_daily(ts_code=ts_codes_str, start_date=daily_start, end_date=end_date)
if daily_df is not None and len(daily_df) > 0:
daily_records.append(daily_df)
except Exception as e:
pass
if (i + batch_size) % 500 == 0:
print(f" 进度: {min(i+batch_size, len(all_codes))}/{len(all_codes)}")
time.sleep(0.3)
if daily_records:
all_daily = pd.concat(daily_records, ignore_index=True)
# 每只取最近20个交易日算均值
daily_stats = all_daily.groupby('ts_code').agg(
avg_amount=('amount', 'mean'),
avg_vol=('vol', 'mean')
).reset_index()
daily_map_amt = dict(zip(daily_stats['ts_code'], daily_stats['avg_amount']))
daily_map_vol = dict(zip(daily_stats['ts_code'], daily_stats['avg_vol']))
df['avg_amount_wan'] = df['ts_code'].map(daily_map_amt) / 10 # 千元→万元
df['avg_vol_wan'] = df['ts_code'].map(daily_map_vol) / 10000 # 手→万手
print(f" ✓ 行情数据: {len(daily_stats)}")
else:
print(f" ✗ 行情数据获取失败")
# ============================================================
# 3. fund_nav - 批量获取
# ============================================================
print(f"\n[3/3] 获取基金净值...")
nav_records = []
for i in range(0, len(all_codes), batch_size):
batch = all_codes[i:i+batch_size]
ts_codes_str = ','.join(batch)
try:
nav_df = pro.fund_nav(ts_code=ts_codes_str, start_date=start_date, end_date=end_date)
if nav_df is not None and len(nav_df) > 0:
nav_records.append(nav_df)
except Exception as e:
pass
if (i + batch_size) % 500 == 0:
print(f" 进度: {min(i+batch_size, len(all_codes))}/{len(all_codes)}")
time.sleep(0.3)
if nav_records:
all_nav = pd.concat(nav_records, ignore_index=True)
latest_nav = all_nav.sort_values('nav_date', ascending=False).drop_duplicates(subset=['ts_code'], keep='first')
nav_map = dict(zip(latest_nav['ts_code'], latest_nav['accum_nav']))
nav_date_map = dict(zip(latest_nav['ts_code'], latest_nav['nav_date'].astype(str)))
df['latest_nav'] = df['ts_code'].map(nav_map)
df['nav_date'] = df['ts_code'].map(nav_date_map)
print(f" ✓ 净值数据: {len(nav_map)}")
else:
print(f" ✗ 净值数据获取失败")
# 用 NAV 精确计算规模
mask = df['fund_scale_yi'].notna() & df['latest_nav'].notna()
df.loc[mask, 'fund_scale_yi'] = (df.loc[mask, 'fund_scale_yi'] * df.loc[mask, 'latest_nav']).round(2)
# 保存
df.to_csv(csv_path, index=False, encoding='utf-8-sig')
print(f"\n已保存: {csv_path}")
print(f"新增字段: fund_scale_yi, avg_amount_wan, avg_vol_wan, latest_nav, nav_date")
# 统计
print(f"\n{'='*80}")
print("数据质量")
print(f"{'='*80}")
print(f"基金规模: {df['fund_scale_yi'].notna().sum()}/{len(df)} ({df['fund_scale_yi'].notna().mean()*100:.1f}%)")
print(f"日均成交额: {df['avg_amount_wan'].notna().sum()}/{len(df)} ({df['avg_amount_wan'].notna().mean()*100:.1f}%)")
print(f"最新净值: {df['latest_nav'].notna().sum()}/{len(df)} ({df['latest_nav'].notna().mean()*100:.1f}%)")
# 轮动策略标的池
print(f"\n{'='*80}")
print("轮动策略标的池")
print(f"{'='*80}")
pool_etfs = {
'159915.SZ': '创业板指', '512890.SH': '红利低波',
'159920.SZ': '恒生指数', '513130.SH': '恒生科技',
'513100.SH': '纳指100', '513520.SH': '日经225',
'513030.SH': '德国DAX', '518880.SH': '黄金',
'159980.SZ': '有色金属', '160723.SZ': '原油',
}
print(f"{'代码':<12} {'名称':<8} {'规模(亿)':>10} {'日成交额(万)':>14} {'净值':>8} {'费率':>6} {'类型':<6}")
print('-' * 75)
for code, name in pool_etfs.items():
row = df[df['ts_code'] == code]
if len(row) > 0:
r = row.iloc[0]
scale = f"{r['fund_scale_yi']:.1f}" if pd.notna(r['fund_scale_yi']) else 'N/A'
amt = f"{r['avg_amount_wan']:,.0f}" if pd.notna(r['avg_amount_wan']) else 'N/A'
nav = f"{r['latest_nav']:.4f}" if pd.notna(r['latest_nav']) else 'N/A'
fee = f"{r['mgt_fee']}%" if pd.notna(r['mgt_fee']) else 'N/A'
etype = str(r['etf_type'])
print(f"{code:<12} {name:<8} {scale:>10} {amt:>14} {nav:>8} {fee:>6} {etype:<6}")

129
rotation/oil_tracking.py Normal file
View File

@@ -0,0 +1,129 @@
"""计算3只原油ETF跟踪WTI原油价格的准确率使用Flask API数据源"""
import os, sys
sys.path.insert(0, '/app')
from dotenv import load_dotenv
load_dotenv('/app/.env')
import pandas as pd
import numpy as np
import requests
import time
# 绕过系统代理避免SSL EOF
_session = requests.Session()
_session.trust_env = False
BASE_URL = os.getenv('FLASK_API_URL', 'https://k3s.tokenpluse.xyz')
def fetch_ohlcv(code, start='2020-01-01', end='2026-06-20'):
"""通过Flask API获取OHLCV数据"""
url = f"{BASE_URL}/api/v1/ohlcv"
params = {'code': code, 'start': start, 'end': end}
for attempt in range(3):
try:
resp = _session.get(url, params=params, timeout=120)
if resp.status_code == 200:
data = resp.json()
if 'error' in data or not data.get('data'):
print(f" {code}: 无数据 - {data.get('error', 'empty')}")
return None
df = pd.DataFrame(data['data'])
df['date'] = pd.to_datetime(df['date'])
df = df.set_index('date').sort_index()
df['ret'] = df['close'].pct_change()
cnt = data.get('count', len(df))
dr = data.get('date_range', {})
print(f" {code}: {cnt} 条 ({dr.get('start','?')} ~ {dr.get('end','?')})")
return df
else:
print(f" {code}: HTTP {resp.status_code}")
return None
except Exception as e:
if attempt < 2:
time.sleep(2)
continue
print(f" {code}: 失败 - {e}")
return None
return None
# 1. 获取WTI原油价格
print("=" * 90)
print("获取WTI原油价格 (CL=F)")
print("=" * 90)
cl_df = fetch_ohlcv('CL=F')
if cl_df is None or len(cl_df) < 10:
print("WTI数据获取失败退出")
sys.exit(1)
# 2. 获取3只原油ETF价格
etf_codes = {
'160723.SZ': '嘉实原油(WTI*100%)',
'161129.SZ': '易方达原油(标普高盛原油*100%)',
'501018.SH': '南方原油(WTI*60%+BRENT*40%)',
}
print("\n" + "=" * 90)
print("获取原油ETF价格")
print("=" * 90)
etf_dfs = {}
for code, name in etf_codes.items():
df = fetch_ohlcv(code)
if df is not None:
etf_dfs[code] = name
# 3. 计算跟踪准确率
print("\n" + "=" * 90)
print("跟踪准确率基于ETF收盘价收益率 vs WTI收盘价日收益率")
print("=" * 90)
for code, name in etf_dfs.items():
etf = etf_dfs[code][['ret']].copy()
etf.columns = ['etf_ret']
etf.index = etf.index.normalize()
cl = cl_df[['ret']].copy()
cl.columns = ['cl_ret']
cl.index = cl.index.normalize()
m = pd.merge(etf.reset_index(), cl.reset_index(), on='date', how='inner', suffixes=('', '_cl'))
if 'ret' in m.columns and 'ret_cl' in m.columns:
m = m.rename(columns={'ret': 'etf_ret', 'ret_cl': 'cl_ret'})
m = m[['date', 'etf_ret', 'cl_ret']].dropna()
if len(m) < 10:
print(f"\n{code} {name}: 数据不足 ({len(m)} 天)")
continue
corr = m['etf_ret'].corr(m['cl_ret'])
r2 = corr ** 2
diff = m['etf_ret'] - m['cl_ret']
te_annual = diff.std() * np.sqrt(252)
cum_etf = (1 + m['etf_ret']).prod() - 1
cum_cl = (1 + m['cl_ret']).prod() - 1
bias = diff.mean()
# 分段: 2024至今
recent = m[m['date'] >= '2024-01-01']
if len(recent) > 20:
r2_recent = recent['etf_ret'].corr(recent['cl_ret']) ** 2
else:
r2_recent = np.nan
print(f"\n{code} {name}")
print(f" 重叠交易日: {len(m)}")
print(f" 全区间 R²: {r2:.4f} ({r2*100:.1f}%)")
if not np.isnan(r2_recent):
print(f" 2024至今 R²: {r2_recent:.4f} ({r2_recent*100:.1f}%)")
else:
print(f" 2024至今 R²: 数据不足")
print(f" 年化跟踪误差: {te_annual*100:.2f}%")
print(f" 日均偏差: {bias*100:.4f}%")
print(f" ETF累计收益: {cum_etf*100:.1f}%")
print(f" WTI累计收益: {cum_cl*100:.1f}%")
print(f" 累计收益差: {(cum_etf-cum_cl)*100:.1f}%")
print("\n" + "=" * 90)
print("注: 原油ETF为QDII-LOF净值披露有T+1~T+2延迟")
print(" 且需通过期货合约展期与WTI现货价格存在结构性偏差。")
print("=" * 90)

View File

@@ -469,6 +469,10 @@ class SimpleRotationStrategy:
self.min_hold_days = self.config.rebalance.min_hold_days self.min_hold_days = self.config.rebalance.min_hold_days
self.weight_type = self.config.rotation.weight.value # 'equal' or 'rank' self.weight_type = self.config.rotation.weight.value # 'equal' or 'rank'
# ETF expansion: max weight per individual ETF
self.etf_max_weight = getattr(self.config.rotation, 'etf_max_weight', 0.25)
self.min_slots = math.ceil(1.0 / self.etf_max_weight) # e.g. 4 for 25%
# Dynamic threshold # Dynamic threshold
threshold = self.config.rotation.threshold threshold = self.config.rotation.threshold
self.use_dynamic_threshold = (threshold.mode.value == 'dynamic') self.use_dynamic_threshold = (threshold.mode.value == 'dynamic')
@@ -487,6 +491,21 @@ class SimpleRotationStrategy:
self.code_to_group[asset.signal_source] = asset.group self.code_to_group[asset.signal_source] = asset.group
self.trade_code_to_group[asset.trade_source] = asset.group self.trade_code_to_group[asset.trade_source] = asset.group
# ETF expansion pool: signal_code -> [etf_code, ...] from config
self.signal_to_etfs: Dict[str, List[str]] = {}
for code, asset in self.config.asset_pools.assets.items():
if asset.etf_pool:
self.signal_to_etfs[asset.signal_source] = asset.etf_pool
else:
# Fallback: use trade_source as single ETF
self.signal_to_etfs[asset.signal_source] = [asset.trade_source]
# Log ETF pool summary
for signal_code, etfs in self.signal_to_etfs.items():
asset = self.config.asset_pools.assets.get(signal_code)
name = asset.name if asset else signal_code
print(f" ETF pool [{name}]: {len(etfs)} ETFs {etfs}")
# Data source # Data source
data_source = self.config.data.sources[0] data_source = self.config.data.sources[0]
base_url = data_source.url or 'https://k3s.tokenpluse.xyz' base_url = data_source.url or 'https://k3s.tokenpluse.xyz'
@@ -508,6 +527,55 @@ class SimpleRotationStrategy:
# Position weights: code -> weight (updated each day by _generate_signals) # Position weights: code -> weight (updated each day by _generate_signals)
self._position_weights: Dict[str, float] = {} self._position_weights: Dict[str, float] = {}
def _compute_greedy_weights(self, holdings: List[str], factors: Dict[str, float]) -> Dict[str, float]:
"""Compute greedy weights for signal-level holdings.
Greedy Algorithm:
1. Each index has absorption capacity = min(n_etfs, ceil(1/max_weight)) × max_weight
2. Iterate through holdings in order (sorted by momentum)
3. Each index absorbs up to its capacity
4. Remaining weight flows to next index
Example (select_num=1, max_weight=0.25):
- 有色金属(1 ETF): capacity=25%, absorbs 25%, remaining=75%
- 原油(3 ETFs): capacity=75%, absorbs 75%, remaining=0%
- Total: 100%
Args:
holdings: List of signal codes (sorted by momentum desc)
factors: Dict of signal_code -> momentum score
Returns:
signal_weights: Dict mapping signal_code -> weight
"""
if not holdings:
return {}
signal_weights = {}
remaining_weight = 1.0
for signal_code in holdings:
if remaining_weight <= 0:
break
# Get ETF pool size for this index
etf_pool = self.signal_to_etfs.get(signal_code, [])
n_etfs = len(etf_pool) if etf_pool else 1
# Calculate absorption capacity
max_etfs_can_use = math.ceil(1.0 / self.etf_max_weight) # e.g. 4 for 25%
n_to_use = min(n_etfs, max_etfs_can_use)
capacity = n_to_use * self.etf_max_weight
# Absorb up to capacity (but not more than remaining)
absorb = min(capacity, remaining_weight)
remaining_weight -= absorb
# Assign weight to this signal
signal_weights[signal_code] = absorb
return signal_weights
def _preload_data(self): def _preload_data(self):
"""Preload all historical data""" """Preload all historical data"""
start_date = self.config.backtest.start_date start_date = self.config.backtest.start_date
@@ -728,13 +796,69 @@ class SimpleRotationStrategy:
return self._position_weights[code] return self._position_weights[code]
return 1.0 / n_unique if n_unique > 0 else 0.0 return 1.0 / n_unique if n_unique > 0 else 0.0
def _calculate_daily_return(self, old_holdings, new_holdings, date, is_rebalance): def _calculate_daily_return(self, old_holdings, new_holdings, date, is_rebalance, factors=None):
""" """
Compute daily return (T+1 execution) with configurable position weighting: Compute daily return (T+1 execution) with configurable position weighting:
- Hold: close-to-close, weighted by today's position weight - Hold: close-to-close, weighted by today's position weight
- Sell: close-to-open (sold at open), weighted by today's position weight - Sell: close-to-open (sold at open), weighted by today's position weight
- Buy: open-to-close (intraday), weighted by today's position weight - Buy: open-to-close (intraday), weighted by today's position weight
When weight=greedy, computes signal-level weights based on ETF pool capacity.
Otherwise, uses signal_to_trade mapping with position_weights.
""" """
# Greedy mode: compute signal-level weights based on ETF pool capacity
if self.weight_type == 'greedy':
factors = factors or {}
old_weights = self._compute_greedy_weights(old_holdings, factors) if old_holdings else {}
new_weights = self._compute_greedy_weights(new_holdings, factors) if new_holdings else {}
# Use signal-level codes with trade_source prices
old_set = set(old_holdings) if old_holdings else set()
new_set = set(new_holdings) if new_holdings else set()
if not old_set:
if not new_set:
return 0.0
ret = 0.0
for code in new_set:
tc = self.signal_to_trade.get(code, code)
p = self._get_etf_prices(tc, date)
w = new_weights.get(code, 0.0)
if p and p['open'] > 0:
ret += w * (p['close'] - p['open']) / p['open']
if is_rebalance:
ret -= self.trade_cost
return ret
daily_return = 0.0
for code in old_set:
tc = self.signal_to_trade.get(code, code)
p = self._get_etf_prices(tc, date)
if p is None or p['prev_close'] == 0:
continue
w = old_weights.get(code, 0.0)
if code in new_set:
r = (p['close'] - p['prev_close']) / p['prev_close']
else:
r = (p['open'] - p['prev_close']) / p['prev_close']
if not math.isnan(r):
daily_return += w * r
for code in new_set - old_set:
tc = self.signal_to_trade.get(code, code)
p = self._get_etf_prices(tc, date)
w = new_weights.get(code, 0.0)
if p and p['open'] > 0 and not math.isnan(p['close']):
r = (p['close'] - p['open']) / p['open']
if not math.isnan(r):
daily_return += w * r
if is_rebalance:
daily_return -= self.trade_cost
return daily_return
# Original mode: use signal_to_trade mapping
if not old_holdings: if not old_holdings:
if not new_holdings: if not new_holdings:
return 0.0 return 0.0
@@ -862,7 +986,7 @@ class SimpleRotationStrategy:
# Return uses T's ETF prices (open for buy/sell, close for hold) # Return uses T's ETF prices (open for buy/sell, close for hold)
daily_return = self._calculate_daily_return( daily_return = self._calculate_daily_return(
current_holdings, new_holdings, date, is_rebalance current_holdings, new_holdings, date, is_rebalance, factors=factors
) )
nav *= (1 + daily_return) nav *= (1 + daily_return)
@@ -887,6 +1011,9 @@ class SimpleRotationStrategy:
for code in removed: for code in removed:
entry_info.pop(code, None) entry_info.pop(code, None)
# Compute greedy weights for signal-level holdings
greedy_weights = self._compute_greedy_weights(new_holdings, factors) if self.weight_type == 'greedy' else {}
# Compute bond threshold value for detail record # Compute bond threshold value for detail record
threshold_val = 0.0 threshold_val = 0.0
if self.use_dynamic_threshold and bond_momentum is not None: if self.use_dynamic_threshold and bond_momentum is not None:
@@ -898,6 +1025,7 @@ class SimpleRotationStrategy:
'daily_return': round(daily_return, 6), 'daily_return': round(daily_return, 6),
'is_rebalance': is_rebalance, 'is_rebalance': is_rebalance,
'holdings': sorted(new_holdings), 'holdings': sorted(new_holdings),
'greedy_weights': {k: round(v, 6) for k, v in greedy_weights.items()} if greedy_weights else None,
'added': sorted(added), 'added': sorted(added),
'removed': sorted(removed), 'removed': sorted(removed),
'factors': {k: round(v, 6) for k, v in factors.items()}, 'factors': {k: round(v, 6) for k, v in factors.items()},

View File

@@ -0,0 +1,399 @@
"""
ETF跟踪误差全量计算
- 覆盖轮动策略标的池全部10个标的
- 数据源分层:
- A股指数 → Tushare index_daily
- 商品 → Tushare fut_daily主力合约
- 海外指数 → Flask API (yfinance)
- 与天天基金数据对比校验
"""
import os
import sys
import time
import json
import pandas as pd
import numpy as np
from pathlib import Path
from datetime import datetime, timedelta
PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
from dotenv import load_dotenv
load_dotenv(PROJECT_ROOT / '.env')
import tushare as ts
from datasource.flask_api_source import FlaskAPIDataSource
# ============================================================
# 轮动策略标的池全部10个标的
# ============================================================
POOL_CONFIG = {
# --- A股指数Tushare index_daily---
'399006.SZ': {
'name': '创业板指', 'current_etf': '159915.SZ', 'group': 'A',
'benchmark_type': 'tushare_index', 'benchmark_code': '399006.SZ',
},
'H30269.CSI': {
'name': '红利低波', 'current_etf': '512890.SH', 'group': 'A',
'benchmark_type': 'tushare_index', 'benchmark_code': 'H30269.CSI',
},
# --- 商品Tushare fut_daily 主力合约)---
'GC=F': {
'name': '黄金', 'current_etf': '518880.SH', 'group': 'COMMODITY',
'benchmark_type': 'tushare_futures', 'benchmark_code': 'AU.SHF',
},
'HG=F': {
'name': '有色金属', 'current_etf': '159980.SZ', 'group': 'COMMODITY',
'benchmark_type': 'tushare_futures', 'benchmark_code': 'CU.SHF',
},
# --- 海外指数Flask API / yfinance---
'HSI': {
'name': '恒生指数', 'current_etf': '159920.SZ', 'group': 'HK',
'benchmark_type': 'flask_api', 'benchmark_code': '^HSI',
},
'HSTECH.HK': {
'name': '恒生科技', 'current_etf': '513130.SH', 'group': 'HK',
'benchmark_type': 'flask_api', 'benchmark_code': 'HSTECH.HK',
},
'NDX': {
'name': '纳指100', 'current_etf': '513100.SH', 'group': 'US',
'benchmark_type': 'flask_api', 'benchmark_code': '^NDX',
},
'N225': {
'name': '日经225', 'current_etf': '513520.SH', 'group': 'JP',
'benchmark_type': 'flask_api', 'benchmark_code': '^N225',
},
'GDAXI': {
'name': '德国DAX', 'current_etf': '513030.SH', 'group': 'EU',
'benchmark_type': 'flask_api', 'benchmark_code': '^GDAXI',
},
# --- 原油用最早ETF做基准无可靠数据源---
'CL=F': {
'name': '原油', 'current_etf': '160723.SZ', 'group': 'COMMODITY',
'benchmark_type': 'earliest_etf', 'benchmark_code': '159518.SZ',
},
}
# ============================================================
# 数据获取函数
# ============================================================
def get_etf_nav_tushare(pro, etf_code, start_date, end_date):
"""获取ETF累计净值Tushare fund_nav"""
try:
df = pro.fund_nav(
ts_code=etf_code,
start_date=start_date.replace('-', ''),
end_date=end_date.replace('-', '')
)
if df is not None and len(df) > 0:
df['date'] = pd.to_datetime(df['nav_date'])
df = df.set_index('date').sort_index()
return df['accum_nav'].astype(float)
except Exception as e:
pass
return None
def get_benchmark_tushare_index(pro, index_code, start_date, end_date):
"""获取A股指数收盘价Tushare index_daily"""
try:
df = pro.index_daily(
ts_code=index_code,
start_date=start_date.replace('-', ''),
end_date=end_date.replace('-', '')
)
if df is not None and len(df) > 0:
df['date'] = pd.to_datetime(df['trade_date'])
df = df.set_index('date').sort_index()
return df['close'].astype(float)
except Exception as e:
pass
return None
def get_benchmark_tushare_futures(pro, fut_code, start_date, end_date):
"""获取期货主力合约收盘价Tushare fut_daily"""
try:
df = pro.fut_daily(
ts_code=fut_code,
start_date=start_date.replace('-', ''),
end_date=end_date.replace('-', '')
)
if df is not None and len(df) > 0:
df['date'] = pd.to_datetime(df['trade_date'])
df = df.set_index('date').sort_index()
return df['close'].astype(float)
except Exception as e:
pass
return None
def get_benchmark_flask_api(flask_source, yf_code, start_date, end_date):
"""获取海外指数数据Flask API / yfinance"""
try:
df = flask_source.fetch(yf_code, start_date, end_date)
if df is not None and len(df) > 0:
return df['close'].astype(float)
except Exception as e:
pass
return None
def get_etf_close_tushare(pro, etf_code, start_date, end_date):
"""获取ETF收盘价用于原油等无基准数据的情况"""
try:
df = pro.fund_daily(
ts_code=etf_code,
start_date=start_date.replace('-', ''),
end_date=end_date.replace('-', '')
)
if df is not None and len(df) > 0:
df['date'] = pd.to_datetime(df['trade_date'])
df = df.set_index('date').sort_index()
return df['close'].astype(float)
except Exception as e:
pass
return None
# ============================================================
# 跟踪误差计算
# ============================================================
def calculate_tracking_error(etf_nav, benchmark_close):
"""
计算跟踪误差
公式STDEV(每日偏离度) × √252
每日偏离度 = ETF净值收益率 - 基准收益率
"""
if etf_nav is None or benchmark_close is None:
return None
etf_ret = etf_nav.pct_change().dropna()
bench_ret = benchmark_close.pct_change().dropna()
common = etf_ret.index.intersection(bench_ret.index)
if len(common) < 20:
return None
e = etf_ret.loc[common]
b = bench_ret.loc[common]
daily_deviation = e - b
tracking_error = daily_deviation.std() * np.sqrt(252)
correlation = e.corr(b)
r_squared = correlation ** 2
etf_cum = (1 + e).prod() - 1
bench_cum = (1 + b).prod() - 1
excess = etf_cum - bench_cum
return {
'annual_tracking_error': round(tracking_error * 100, 4),
'correlation': round(correlation, 6),
'r_squared': round(r_squared, 6),
'etf_cum_return': round(etf_cum * 100, 2),
'benchmark_cum_return': round(bench_cum * 100, 2),
'excess_return': round(excess * 100, 2),
'common_days': len(common),
}
# ============================================================
# 主流程
# ============================================================
def main():
print("=" * 80)
print("ETF跟踪误差全量计算10个标的")
print(f"分析日期: {datetime.now().strftime('%Y-%m-%d')}")
print("=" * 80)
# 初始化数据源
pro = ts.pro_api(os.getenv('TUSHARE_TOKEN'))
flask_source = FlaskAPIDataSource()
# 分析区间最近1年
end_date = datetime.now().strftime('%Y-%m-%d')
start_date = (datetime.now() - timedelta(days=365)).strftime('%Y-%m-%d')
print(f"计算区间: {start_date} ~ {end_date}")
# 加载天天基金数据(用于校验 + 获取ETF列表
eastmoney_path = PROJECT_ROOT / 'rotation' / 'results' / 'etf_competitor_analysis.json'
eastmoney_data = {}
if eastmoney_path.exists():
with open(eastmoney_path, 'r', encoding='utf-8') as f:
eastmoney_data = json.load(f)
print(f"已加载天天基金数据: {len(eastmoney_data)} 个标的")
# 按基准类型分组获取(减少重复请求)
benchmark_cache = {} # benchmark_key -> Series
results = {}
for key, info in POOL_CONFIG.items():
index_name = info['name']
current_etf = info['current_etf']
btype = info['benchmark_type']
bcode = info['benchmark_code']
print(f"\n{'='*60}")
print(f"=== {index_name} ({key}) | 基准类型: {btype} ===")
print(f"{'='*60}")
# Step 1: 获取基准数据(带缓存)
bench_key = f"{btype}:{bcode}"
if bench_key not in benchmark_cache:
print(f" 获取基准数据: {bcode} ({btype})")
if btype == 'tushare_index':
benchmark = get_benchmark_tushare_index(pro, bcode, start_date, end_date)
elif btype == 'tushare_futures':
benchmark = get_benchmark_tushare_futures(pro, bcode, start_date, end_date)
elif btype == 'flask_api':
benchmark = get_benchmark_flask_api(flask_source, bcode, start_date, end_date)
elif btype == 'earliest_etf':
benchmark = get_etf_close_tushare(pro, bcode, start_date, end_date)
else:
benchmark = None
if benchmark is not None:
benchmark_cache[bench_key] = benchmark
print(f" ✓ 基准数据: {len(benchmark)}")
else:
print(f" ✗ 基准数据获取失败")
benchmark_cache[bench_key] = None
else:
benchmark = benchmark_cache[bench_key]
print(f" (缓存) 基准数据: {len(benchmark)}")
if benchmark is None:
print(f" 跳过(无基准数据)")
continue
# Step 2: 获取该标的下所有ETF
etf_list = []
if key in eastmoney_data:
for etf in eastmoney_data[key]['etfs']:
etf_list.append({
'code': etf['ts_code'],
'name': etf['name'],
'eastmoney_te': etf.get('annual_tracking_error', 'N/A'),
})
print(f"{len(etf_list)} 只ETF需要计算")
# Step 3: 逐只计算跟踪误差
etf_results = []
for etf_info in etf_list:
etf_code = etf_info['code']
etf_name = etf_info['name']
# 获取ETF NAV或收盘价
if btype == 'earliest_etf':
# 原油:用收盘价对比收盘价
etf_data = get_etf_close_tushare(pro, etf_code, start_date, end_date)
else:
etf_data = get_etf_nav_tushare(pro, etf_code, start_date, end_date)
if etf_data is None or len(etf_data) < 20:
continue
tracking = calculate_tracking_error(etf_data, benchmark)
if tracking is None:
continue
result = {
'ts_code': etf_code,
'name': etf_name,
'tushare_te': tracking['annual_tracking_error'],
'tushare_r2': tracking['r_squared'],
'tushare_correlation': tracking['correlation'],
'tushare_excess_return': tracking['excess_return'],
'tushare_common_days': tracking['common_days'],
'eastmoney_te': etf_info['eastmoney_te'],
'is_current': etf_code == current_etf,
}
etf_results.append(result)
time.sleep(0.05)
# 按跟踪误差排序
etf_results.sort(key=lambda x: x['tushare_te'])
results[key] = {
'index_name': index_name,
'current_etf': current_etf,
'benchmark_type': btype,
'benchmark_code': bcode,
'group': info['group'],
'etf_count': len(etf_results),
'etfs': etf_results,
}
# 打印结果
print(f"\n 计算完成: {len(etf_results)} 只ETF")
print(f" {'代码':<12} {'名称':<20} {'TE':<10} {'天天基金TE':<12} {'':<8}")
print(f" {'-'*70}")
for etf in etf_results[:10]:
te_str = f"{etf['tushare_te']:.4f}%"
em_te = etf['eastmoney_te']
marker = "" if etf['is_current'] else ""
print(f" {etf['ts_code']:<12} {etf['name'][:20]:<20} {te_str:<10} {em_te:<12} {etf['tushare_r2']:<8}{marker}")
if len(etf_results) > 10:
print(f" ... 还有 {len(etf_results) - 10}")
# ============================================================
# 保存结果
# ============================================================
output_dir = PROJECT_ROOT / 'rotation' / 'results'
output_dir.mkdir(exist_ok=True)
output_path = output_dir / 'tracking_error_full.json'
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(results, f, ensure_ascii=False, indent=2, default=str)
print(f"\n{'='*80}")
print(f"结果已保存: {output_path}")
print(f"{'='*80}")
# ============================================================
# 汇总校验
# ============================================================
print(f"\n{'='*80}")
print("全量校验汇总")
print(f"{'='*80}")
for key, data in results.items():
matched = [e for e in data['etfs']
if e['eastmoney_te'] and e['eastmoney_te'] not in ['N/A', '--']]
print(f"\n--- {data['index_name']} ({data['benchmark_type']}) ---")
print(f" ETF总数: {data['etf_count']} | 天天基金有数据: {len(matched)}")
if matched:
diffs = []
for etf in matched:
try:
em_te = float(etf['eastmoney_te'].replace('%', ''))
diffs.append(etf['tushare_te'] - em_te)
except:
pass
if diffs:
print(f" 平均差异: {np.mean(diffs):+.4f}% | 最大差异: {max(diffs, key=abs):+.4f}%")
# 打印前3名
top3 = data['etfs'][:3]
print(f" Top3 (TE最低):")
for i, etf in enumerate(top3, 1):
marker = " ★当前" if etf['is_current'] else ""
print(f" {i}. {etf['ts_code']} {etf['name']} TE={etf['tushare_te']:.4f}%{marker}")
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,289 @@
"""
ETF跟踪误差计算与校验
- 使用Tushare数据计算ETF跟踪误差基于NAV
- 与天天基金数据对比校验
"""
import os
import sys
import time
import json
import pandas as pd
import numpy as np
from pathlib import Path
from datetime import datetime, timedelta
PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
from dotenv import load_dotenv
load_dotenv(PROJECT_ROOT / '.env')
import tushare as ts
# 轮动策略标的池
POOL_INDEX_MAP = {
'399006.SZ': {
'name': '创业板指', 'current_etf': '159915.SZ', 'group': 'A',
'index_code': '399006.SZ',
},
'H30269.CSI': {
'name': '红利低波', 'current_etf': '512890.SH', 'group': 'A',
'index_code': 'H30269.CSI',
},
}
def get_etf_nav_data(pro, etf_code, start_date, end_date):
"""
获取ETF净值数据使用fund_nav接口
注意ETF应使用accum_nav累计净值而非unit_nav单位净值
"""
try:
df = pro.fund_nav(
ts_code=etf_code,
start_date=start_date.replace('-', ''),
end_date=end_date.replace('-', '')
)
if df is not None and len(df) > 0:
df['date'] = pd.to_datetime(df['nav_date'])
df = df.set_index('date').sort_index()
# 使用累计净值
return df['accum_nav'].astype(float)
except Exception as e:
print(f" 获取 {etf_code} NAV失败: {e}")
return None
def get_index_data(pro, index_code, start_date, end_date):
"""获取指数日线数据"""
try:
df = pro.index_daily(
ts_code=index_code,
start_date=start_date.replace('-', ''),
end_date=end_date.replace('-', '')
)
if df is not None and len(df) > 0:
df['date'] = pd.to_datetime(df['trade_date'])
df = df.set_index('date').sort_index()
return df['close'].astype(float)
except Exception as e:
print(f" 获取指数 {index_code} 失败: {e}")
return None
def calculate_tracking_error(etf_nav, index_close):
"""
计算跟踪误差
公式STDEV(每日偏离度) × √252
每日偏离度 = ETF净值收益率 - 指数收益率
"""
if etf_nav is None or index_close is None:
return None
# 计算收益率
etf_ret = etf_nav.pct_change().dropna()
idx_ret = index_close.pct_change().dropna()
# 对齐日期
common = etf_ret.index.intersection(idx_ret.index)
if len(common) < 20:
return None
e = etf_ret.loc[common]
i = idx_ret.loc[common]
# 每日偏离度
daily_deviation = e - i
# 跟踪误差 = 标准差 × √252
tracking_error = daily_deviation.std() * np.sqrt(252)
# 其他指标
correlation = e.corr(i)
r_squared = correlation ** 2
# 累计收益
etf_cum = (1 + e).prod() - 1
idx_cum = (1 + i).prod() - 1
excess = etf_cum - idx_cum
return {
'annual_tracking_error': round(tracking_error * 100, 4), # %
'correlation': round(correlation, 6),
'r_squared': round(r_squared, 6),
'etf_cum_return': round(etf_cum * 100, 2), # %
'index_cum_return': round(idx_cum * 100, 2), # %
'excess_return': round(excess * 100, 2), # %
'common_days': len(common),
}
def main():
print("=" * 80)
print("ETF跟踪误差计算与校验")
print(f"分析日期: {datetime.now().strftime('%Y-%m-%d')}")
print("=" * 80)
# 初始化
pro = ts.pro_api(os.getenv('TUSHARE_TOKEN'))
# 分析时间范围最近1年
end_date = datetime.now().strftime('%Y-%m-%d')
start_date = (datetime.now() - timedelta(days=365)).strftime('%Y-%m-%d')
print(f"计算区间: {start_date} ~ {end_date}")
# 加载天天基金数据
eastmoney_path = PROJECT_ROOT / 'rotation' / 'results' / 'etf_competitor_analysis.json'
eastmoney_data = {}
if eastmoney_path.exists():
with open(eastmoney_path, 'r', encoding='utf-8') as f:
eastmoney_data = json.load(f)
print(f"已加载天天基金数据: {len(eastmoney_data)} 个指数")
# 对每个指数计算跟踪误差
print(f"\n开始计算跟踪误差...")
results = {}
for key, info in POOL_INDEX_MAP.items():
index_name = info['name']
index_code = info['index_code']
current_etf = info['current_etf']
print(f"\n{'='*60}")
print(f"=== {index_name} ({key}) ===")
print(f"{'='*60}")
# 获取指数数据
print(f" 获取指数数据: {index_code}")
index_data = get_index_data(pro, index_code, start_date, end_date)
if index_data is None:
print(f" ✗ 指数数据获取失败")
continue
print(f" ✓ 指数数据: {len(index_data)}")
# 获取该指数下所有ETF的NAV
etf_list = []
if key in eastmoney_data:
for etf in eastmoney_data[key]['etfs']:
etf_list.append({
'code': etf['ts_code'],
'name': etf['name'],
'eastmoney_te': etf.get('annual_tracking_error', 'N/A'),
})
print(f"{len(etf_list)} 只ETF需要计算")
etf_results = []
for etf_info in etf_list:
etf_code = etf_info['code']
etf_name = etf_info['name']
# 获取ETF NAV
etf_nav = get_etf_nav_data(pro, etf_code, start_date, end_date)
if etf_nav is None or len(etf_nav) < 20:
continue
# 计算跟踪误差
tracking = calculate_tracking_error(etf_nav, index_data)
if tracking is None:
continue
result = {
'ts_code': etf_code,
'name': etf_name,
'tushare_te': tracking['annual_tracking_error'],
'tushare_r2': tracking['r_squared'],
'tushare_correlation': tracking['correlation'],
'tushare_excess_return': tracking['excess_return'],
'tushare_common_days': tracking['common_days'],
'eastmoney_te': etf_info['eastmoney_te'],
'is_current': etf_code == current_etf,
}
etf_results.append(result)
time.sleep(0.1)
# 按跟踪误差排序
etf_results.sort(key=lambda x: x['tushare_te'])
results[key] = {
'index_name': index_name,
'index_code': index_code,
'current_etf': current_etf,
'etf_count': len(etf_results),
'etfs': etf_results,
}
# 打印结果
print(f"\n 计算完成: {len(etf_results)} 只ETF")
print(f" {'代码':<12} {'名称':<20} {'Tushare TE':<12} {'天天基金 TE':<12} {'差异':<10} {'':<8}")
print(f" {'-'*80}")
for etf in etf_results[:10]:
tushare_te = f"{etf['tushare_te']:.4f}%"
eastmoney_te = etf['eastmoney_te']
# 计算差异
diff = 'N/A'
if eastmoney_te and eastmoney_te != 'N/A' and eastmoney_te != '--':
try:
em_te = float(eastmoney_te.replace('%', ''))
diff_val = etf['tushare_te'] - em_te
diff = f"{diff_val:+.4f}%"
except:
pass
marker = "" if etf['is_current'] else ""
print(f" {etf['ts_code']:<12} {etf['name'][:20]:<20} {tushare_te:<12} {eastmoney_te:<12} {diff:<10} {etf['tushare_r2']:<8}{marker}")
if len(etf_results) > 10:
print(f" ... 还有 {len(etf_results) - 10}")
# 保存结果
output_dir = PROJECT_ROOT / 'rotation' / 'results'
output_dir.mkdir(exist_ok=True)
output_path = output_dir / 'tracking_error_validation.json'
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(results, f, ensure_ascii=False, indent=2, default=str)
print(f"\n{'='*80}")
print(f"结果已保存: {output_path}")
print(f"{'='*80}")
# 汇总统计
print(f"\n{'='*80}")
print("校验汇总")
print(f"{'='*80}")
for key, data in results.items():
print(f"\n--- {data['index_name']} ---")
print(f" 指数代码: {data['index_code']}")
print(f" 计算ETF数: {data['etf_count']}")
# 统计有天天基金数据的ETF
matched = [e for e in data['etfs'] if e['eastmoney_te'] and e['eastmoney_te'] not in ['N/A', '--']]
print(f" 天天基金有数据: {len(matched)}")
if matched:
# 计算平均差异
diffs = []
for etf in matched:
try:
em_te = float(etf['eastmoney_te'].replace('%', ''))
diff = etf['tushare_te'] - em_te
diffs.append(diff)
except:
pass
if diffs:
avg_diff = np.mean(diffs)
max_diff = max(diffs, key=abs)
print(f" 平均差异: {avg_diff:+.4f}%")
print(f" 最大差异: {max_diff:+.4f}%")
if __name__ == '__main__':
main()