Previously hardcoded equal weight (1/select_num), ignoring config weight type. Now reads position_weights from last daily_record, correctly showing rank-based weights.
176 lines
6.1 KiB
Python
176 lines
6.1 KiB
Python
"""分析短债(931862.CSI)和CL=F的R²分布"""
|
|
import os, sys
|
|
from pathlib import Path
|
|
import numpy as np
|
|
import pandas as pd
|
|
|
|
PROJECT_ROOT = Path(__file__).parent.parent.parent
|
|
sys.path.insert(0, str(PROJECT_ROOT))
|
|
|
|
from rotation.simple_rotation import SimpleRotationStrategy
|
|
|
|
if 'FLASK_API_URL' not in os.environ:
|
|
os.environ['FLASK_API_URL'] = 'https://k3s.tokenpluse.xyz'
|
|
|
|
strategy = SimpleRotationStrategy()
|
|
strategy._preload_data()
|
|
|
|
n_days = strategy.config.factor.n_days # 25
|
|
|
|
def compute_r2(prices):
|
|
prices = np.clip(prices, 0.01, None)
|
|
y = prices / prices[0]
|
|
x = np.arange(len(y))
|
|
slope, intercept = np.polyfit(x, y, 1)
|
|
y_pred = slope * x + intercept
|
|
ss_res = np.sum((y - y_pred) ** 2)
|
|
ss_tot = np.sum((y - np.mean(y)) ** 2)
|
|
r2 = 1 - ss_res / ss_tot if ss_tot > 0 else 0
|
|
return r2, slope
|
|
|
|
# 分析所有 signal_codes
|
|
print("=" * 80)
|
|
print(f" 各资产 R² 分布统计 (2020-01-10 ~ 2026-06-05, 窗口={n_days}天)")
|
|
print("=" * 80)
|
|
|
|
all_r2_data = {}
|
|
|
|
for code in strategy.signal_codes:
|
|
if code not in strategy.index_data:
|
|
continue
|
|
df = strategy.index_data[code]
|
|
r2_list = []
|
|
slope_list = []
|
|
for i in range(n_days, len(df)):
|
|
prices = df['close'].values[i-n_days:i]
|
|
r2, slope = compute_r2(prices)
|
|
r2_list.append(r2)
|
|
slope_list.append(slope)
|
|
|
|
r2_arr = np.array(r2_list)
|
|
slope_arr = np.array(slope_list)
|
|
all_r2_data[code] = (r2_arr, slope_arr)
|
|
|
|
# 区分正slope和负slope时的R²
|
|
pos_mask = slope_arr > 0
|
|
neg_mask = slope_arr <= 0
|
|
|
|
print(f"\n {code} (n={len(r2_arr)})")
|
|
print(f" 全部 R²: mean={r2_arr.mean():.4f} median={np.median(r2_arr):.4f} "
|
|
f"p10={np.percentile(r2_arr, 10):.4f} p25={np.percentile(r2_arr, 25):.4f} "
|
|
f"p75={np.percentile(r2_arr, 75):.4f} p90={np.percentile(r2_arr, 90):.4f}")
|
|
print(f" slope>0: mean={r2_arr[pos_mask].mean():.4f} "
|
|
f"p10={np.percentile(r2_arr[pos_mask], 10):.4f} "
|
|
f"p25={np.percentile(r2_arr[pos_mask], 25):.4f} "
|
|
f"count={pos_mask.sum()}")
|
|
print(f" slope<=0: mean={r2_arr[neg_mask].mean():.4f} "
|
|
f"p10={np.percentile(r2_arr[neg_mask], 10):.4f} "
|
|
f"p25={np.percentile(r2_arr[neg_mask], 25):.4f} "
|
|
f"count={neg_mask.sum()}")
|
|
|
|
# 重点对比:短债 vs CL=F
|
|
print(f"\n{'=' * 80}")
|
|
print(f" 重点对比: 931862.CSI (短债) vs CL=F (原油)")
|
|
print(f"{'=' * 80}")
|
|
|
|
for code in ['931862.CSI', 'CL=F']:
|
|
r2_arr, slope_arr = all_r2_data[code]
|
|
pos_mask = slope_arr > 0
|
|
r2_pos = r2_arr[pos_mask]
|
|
|
|
print(f"\n {code} (正趋势 R², n={len(r2_pos)})")
|
|
# 分位数
|
|
for p in [1, 5, 10, 15, 20, 25, 30, 50, 75, 90, 95, 99]:
|
|
val = np.percentile(r2_pos, p)
|
|
print(f" p{p:>2}: R²={val:.4f}")
|
|
|
|
# CL=F 入选时的 R² 统计
|
|
print(f"\n{'=' * 80}")
|
|
print(f" CL=F 入选持仓时的 R² (从 detail JSON 中提取)")
|
|
print(f"{'=' * 80}")
|
|
|
|
import json
|
|
detail_path = PROJECT_ROOT / "rotation" / "results" / "simple_rotation_detail.json"
|
|
with open(detail_path) as f:
|
|
detail = json.load(f)
|
|
|
|
clf_held_r2 = []
|
|
for day in detail['days']:
|
|
assets = day.get('assets', {})
|
|
clf = assets.get('CL=F', {})
|
|
if clf.get('is_held'):
|
|
date = day['date']
|
|
mom = clf.get('momentum', 0)
|
|
rank = clf.get('rank')
|
|
# 需要重新算这个日期的R² (用T-1数据)
|
|
clf_held_r2.append((date, mom, rank))
|
|
|
|
# 取前20个和后20个持仓日
|
|
print(f" CL=F 总持仓天数: {len(clf_held_r2)}")
|
|
print(f"\n 入场日(added)的动量和R²:")
|
|
|
|
entry_days = []
|
|
for day in detail['days']:
|
|
if 'CL=F' in day.get('added', []):
|
|
date = day['date']
|
|
clf = day['assets']['CL=F']
|
|
entry_days.append((date, clf['momentum'], clf.get('rank')))
|
|
# 计算信号日(T-1)的R²
|
|
sig_date = pd.Timestamp(date) - pd.Timedelta(days=1)
|
|
df = strategy.index_data['CL=F']
|
|
mask = df.index <= sig_date
|
|
recent = df.loc[mask]
|
|
if len(recent) >= n_days:
|
|
prices = recent['close'].values[-n_days:]
|
|
r2, slope = compute_r2(prices)
|
|
print(f" {date}: momentum={clf['momentum']:>10.4f} R²={r2:.4f} slope={slope:.5f} "
|
|
f"entry_price={clf.get('entry_price_idx', '?')}")
|
|
|
|
print(f"\n 离场日(removed)的前一天动量和R²:")
|
|
exit_days = []
|
|
for day in detail['days']:
|
|
if 'CL=F' in day.get('removed', []):
|
|
date = day['date']
|
|
# 信号日是T-1
|
|
sig_date = pd.Timestamp(date) - pd.Timedelta(days=1)
|
|
df = strategy.index_data['CL=F']
|
|
mask = df.index <= sig_date
|
|
recent = df.loc[mask]
|
|
if len(recent) >= n_days:
|
|
prices = recent['close'].values[-n_days:]
|
|
r2, slope = compute_r2(prices)
|
|
else:
|
|
r2, slope = 0, 0
|
|
# 找前一天的momentum
|
|
clf = day['assets'].get('CL=F', {})
|
|
print(f" {date}: 信号日R²={r2:.4f} slope={slope:.5f} "
|
|
f"momentum(当日)={clf.get('momentum', '?')} "
|
|
f"price={clf.get('index_close', '?')}")
|
|
|
|
# 不同R²阈值下的回测影响估算
|
|
print(f"\n{'=' * 80}")
|
|
print(f" R² 阈值对 CL=F 信号的过滤效果")
|
|
print(f"{'=' * 80}")
|
|
|
|
for threshold in [0.05, 0.10, 0.15, 0.20, 0.25, 0.30]:
|
|
filtered_entries = []
|
|
for day in detail['days']:
|
|
if 'CL=F' in day.get('added', []):
|
|
date = day['date']
|
|
sig_date = pd.Timestamp(date) - pd.Timedelta(days=1)
|
|
df = strategy.index_data['CL=F']
|
|
mask = df.index <= sig_date
|
|
recent = df.loc[mask]
|
|
if len(recent) >= n_days:
|
|
prices = recent['close'].values[-n_days:]
|
|
r2, slope = compute_r2(prices)
|
|
if r2 < threshold:
|
|
clf = day['assets']['CL=F']
|
|
# 查这个入场最终盈亏
|
|
entry_price = clf.get('entry_price_etf')
|
|
filtered_entries.append((date, r2, slope, clf['momentum']))
|
|
|
|
print(f" R²<{threshold:.2f} 过滤掉的入场: {len(filtered_entries)} 次")
|
|
for date, r2, slope, mom in filtered_entries:
|
|
print(f" {date}: R²={r2:.4f} slope={slope:.5f} momentum={mom:.4f}")
|