experiment(rotation): 同大类扩充与纳指vs标普替换对比实验

技术修复:
- SOCKS5代理IPv6问题:socks5:// → socks5h:// (hybrid_source.py, yfinance_source.py)

目录整理:
- scripts/ → 仅保留策略入口(daily_scheduler, run_rotation, run_cci_screener)
- 实验脚本移至 tests/experiments/
- 工具脚本移至 tests/utils/
- 实验记录新增 docs/experiments/
- results/ 添加到 gitignore

实验结果:

实验001 - 同大类扩充(添加标普500):
├─ 累计收益: 1467.35% → 1176.26% (-291%)
├─ CAGR: 48.10% → 43.82% (-4.28%)
├─ 调仓次数: 459 → 501 (+42次)
└─ 结论: 添加同大类标的不增加跨类分散,反而侵蚀收益

实验002 - 纳指vs标普替换对比:
├─ 累计收益: 1467.35% → 1118.77% (-348%)
├─ CAGR: 48.10% → 42.87% (-5.22%)
├─ Sharpe: 2.21 → 2.08 (-0.13)
├─ MaxDD: -17.33% → -15.14% (+2.18%)
└─ 结论: 纳指100优于标普500,成长风格更适合动量策略

策略建议:
- 保持纳指100作为美股大类代表
- 不添加同大类新标的(避免类内切换成本)
- 新增标的应优先考虑新大类(增加跨类分散)
This commit is contained in:
2026-05-06 20:43:38 +08:00
parent a4e8a6050e
commit 6b59855c28
20 changed files with 1086 additions and 2 deletions

View File

@@ -0,0 +1,2 @@
# 实验脚本目录
# 存放策略研究相关的A/B测试、对比实验等脚本

View File

@@ -0,0 +1,182 @@
"""
策略迭代 A/B 对比实验脚本
量化三个维度的改进贡献度:
1. 标的池: 原始全市场池 vs. 精选11只核心池
2. 评分公式: 简单斜率(slope_r2) vs. 年化收益率*R2 (weighted_momentum)
3. 观察窗口: 固定25日窗口 vs. 动态ATR窗口 (20-60天)
"""
import sys
import pandas as pd
import numpy as np
from pathlib import Path
from datetime import datetime
# 添加项目根目录
sys.path.insert(0, str(Path(__file__).parent.parent))
from strategies.rotation.engine import RotationStrategy
import matplotlib.pyplot as plt
# ==================== 标的池定义 ====================
ORIGINAL_POOL = {
"000300.SH": {"name": "沪深300", "market": "A", "etf": "510300.SH"},
"000905.SH": {"name": "中证500", "market": "A", "etf": "510500.SH"},
"000852.SH": {"name": "中证1000", "market": "A", "etf": "512100.SH"},
"399006.SZ": {"name": "创业板指", "market": "A", "etf": "159915.SZ"},
"000015.SH": {"name": "上证红利", "market": "A", "etf": "510880.SH"},
"399986.SZ": {"name": "中证银行", "market": "A", "etf": "516310.SH"},
"399997.SZ": {"name": "中证白酒", "market": "A", "etf": "512690.SH"},
"399989.SZ": {"name": "中证医疗", "market": "A", "etf": "512170.SH"},
"399395.SZ": {"name": "国证有色", "market": "COMMODITY", "etf": "159880.SZ"},
"399998.SZ": {"name": "中证煤炭", "market": "A", "etf": "515220.SH"},
"399967.SZ": {"name": "中证军工", "market": "A", "etf": "512660.SH"},
"HSTECH.HK": {"name": "恒生科技", "market": "HK", "etf": "513180.SH"},
"NDX": {"name": "纳指100", "market": "US", "etf": "513100.SH"},
"AU.SHF": {"name": "黄金", "market": "COMMODITY", "etf": "518880.SH"}
}
FINAL_POOL = {
"399006.SZ": {"name": "创业板指", "market": "A", "etf": "159915.SZ"},
"H30269.CSI": {"name": "中证红利低波", "market": "A", "etf": "512890.SH"},
"000015.SH": {"name": "上证红利", "market": "A", "etf": "510880.SH"},
"NDX": {"name": "纳指100", "market": "US", "etf": "513100.SH"},
"N225": {"name": "日经225", "market": "JP", "etf": "513520.SH"},
"GDAXI": {"name": "德国DAX", "market": "EU", "etf": "513030.SH"},
"HSI": {"name": "恒生指数", "market": "HK", "etf": "159920.SZ"},
"HSTECH.HK": {"name": "恒生科技", "market": "HK", "etf": "513130.SH"},
"AU.SHF": {"name": "黄金", "market": "COMMODITY", "etf": "518880.SH"},
"CL.NYM": {"name": "原油", "market": "COMMODITY", "etf": "160723.SZ"},
"931862.CSI": {"name": "30年国债", "market": "BOND", "etf": "511090.SH"}
}
# ==================== 实验配置 ====================
ITERATIONS = [
{
"label": "1. 原始基准 (原始池+简单评分+固定窗口)",
"config": {
"code_list": ORIGINAL_POOL,
"factor_type": "slope_r2",
"auto_day": False,
"n_days": 25,
"diversified": False
}
},
{
"label": "2. 标的池优化 (精选池+简单评分+固定窗口)",
"config": {
"code_list": FINAL_POOL,
"factor_type": "slope_r2",
"auto_day": False,
"n_days": 25,
"diversified": True # 开启跨大类分散
}
},
{
"label": "3. 评分公式优化 (精选池+加权评分+固定窗口)",
"config": {
"code_list": FINAL_POOL,
"factor_type": "weighted_momentum",
"auto_day": False,
"n_days": 25,
"diversified": True
}
},
{
"label": "4. 终极版本 (精选池+加权评分+动态窗口)",
"config": {
"code_list": FINAL_POOL,
"factor_type": "weighted_momentum",
"auto_day": True,
"n_days": 25, # 提供默认窗口作为 fallback
"min_days": 20,
"max_days": 60,
"diversified": True
}
}
]
COMMON_CONFIG = {
"start_date": "2019-01-01",
"end_date": datetime.now().strftime('%Y-%m-%d'),
"select_num": 3,
"rebalance_days": 1,
"rebalance_threshold": 0.0,
"trade_cost": 0.001,
"premium_control": {"enabled": True, "default_threshold": 0.10},
"use_cache": True,
"ssh_tunnel": {"enabled": True, "host": "8.218.167.69", "port": 22, "username": "root", "key_path": "hk_ecs.pem", "local_port": 1080}
}
def run_experiment():
results = []
for i, item in enumerate(ITERATIONS):
print(f"\n{'='*80}")
print(f"运行实验 {item['label']}")
print(f"{'='*80}")
cfg = COMMON_CONFIG.copy()
cfg.update(item['config'])
strategy = RotationStrategy(cfg)
try:
res_df = strategy.run()
# 计算指标
nav = res_df['轮动策略净值']
total_ret = nav.iloc[-1] - 1
days = (nav.index[-1] - nav.index[0]).days
cagr = (1 + total_ret)**(365.25/days) - 1
daily_ret = res_df['轮动策略日收益率']
sharpe = daily_ret.mean() / daily_ret.std() * np.sqrt(252) if daily_ret.std() > 0 else 0
peak = nav.cummax()
dd = (nav - peak) / peak
max_dd = dd.min()
results.append({
"label": item['label'],
"total_ret": total_ret,
"cagr": cagr,
"max_dd": max_dd,
"sharpe": sharpe,
"nav": nav
})
print(f"完成: CAGR={cagr:.2%}, MaxDD={max_dd:.2%}, Sharpe={sharpe:.2f}")
except Exception as e:
print(f"实验失败: {e}")
import traceback
traceback.print_exc()
# ==================== 汇总报告 ====================
print(f"\n\n{'='*100}")
print(f"{'策略迭代对比报告':^100}")
print(f"{'='*100}")
print(f"{'版本':<40} | {'累计收益':>10} | {'年化(CAGR)':>10} | {'最大回撤':>10} | {'夏普比率':>8} | {'贡献增量':>10}")
print(f"{'-'*100}")
prev_cagr = 0
for i, r in enumerate(results):
delta = f"+{(r['cagr'] - prev_cagr)*100:>.2f}%" if i > 0 else "-"
print(f"{r['label']:<40} | {r['total_ret']:>10.2%} | {r['cagr']:>10.2%} | {r['max_dd']:>10.2%} | {r['sharpe']:>8.2f} | {delta:>10}")
prev_cagr = r['cagr']
print(f"{'='*100}")
# ==================== 绘图 ====================
plt.figure(figsize=(15, 8))
for r in results:
plt.plot(r['nav'].index, r['nav'], label=r['label'], linewidth=1.5)
plt.yscale('log')
plt.title("策略迭代 A/B 对比 - 净值曲线 (对数坐标)", fontsize=14)
plt.legend()
plt.grid(True, alpha=0.3)
output_path = Path(__file__).parent.parent / "results" / "ab_test_iterations.png"
plt.savefig(output_path)
print(f"\n对比图表已保存至: {output_path}")
if __name__ == "__main__":
run_experiment()

View File

@@ -0,0 +1,187 @@
"""
A/B测试纳指100 vs 标普500 替换对比
对比:
- A组对照组纳指100作为美股大类代表
- B组实验组标普500替换纳指100作为美股大类代表
核心问题:替换后对策略绩效的影响(无类内竞争)
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from strategies.rotation.engine import RotationStrategy
import pandas as pd
import yaml
def create_config_replace_ndx_with_spx(base_config: dict) -> dict:
"""将纳指100替换为标普500"""
config = base_config.copy()
config['code_list'] = base_config['code_list'].copy()
# 移除纳指100
if 'NDX' in config['code_list']:
del config['code_list']['NDX']
# 添加标普500替换纳指100
config['code_list']['SPX'] = {
'name': '标普500',
'etf': '513500.SH',
'market': 'US'
}
return config
def run_backtest(config: dict, label: str) -> dict:
"""运行回测并返回关键指标"""
print(f"\n{'='*60}")
print(f" {label}")
print(f"{'='*60}")
strategy = RotationStrategy(config)
result = strategy.run()
if result is None or len(result) == 0:
return None
# 计算指标
strategy_nav = result['轮动策略净值']
strategy_ret = result['轮动策略日收益率']
total_return = strategy_nav.iloc[-1] - 1
days = len(result)
years = days / 250
cagr = (strategy_nav.iloc[-1] ** (1/years)) - 1
excess_ret = strategy_ret.mean() * 250
vol = strategy_ret.std() * (250 ** 0.5)
sharpe = excess_ret / vol if vol > 0 else 0
rolling_max = strategy_nav.cummax()
drawdown = (strategy_nav - rolling_max) / rolling_max
max_dd = drawdown.min()
calmar = cagr / abs(max_dd) if max_dd < 0 else 0
win_rate = (strategy_ret > 0).sum() / len(strategy_ret)
metrics = {
'label': label,
'美股标的': '纳指100' if 'NDX' in config['code_list'] else '标普500',
'累计收益': total_return,
'CAGR': cagr,
'Sharpe': sharpe,
'MaxDD': max_dd,
'Calmar': calmar,
'日胜率': win_rate,
}
print(f"\n美股代表: {metrics['美股标的']}")
print(f"累计收益: {metrics['累计收益']:.2%}")
print(f"CAGR: {metrics['CAGR']:.2%}")
print(f"Sharpe: {metrics['Sharpe']:.2f}")
print(f"MaxDD: {metrics['MaxDD']:.2%}")
print(f"Calmar: {metrics['Calmar']:.2f}")
print(f"日胜率: {metrics['日胜率']:.2%}")
return metrics
def compare_results(a_metrics: dict, b_metrics: dict):
"""对比两组结果"""
print(f"\n{'='*60}")
print(f" 对比结果")
print(f"{'='*60}")
print(f"\n{'指标':<15} {'A组(纳指100)':<15} {'B组(标普500)':<15} {'差异':<15}")
print("-" * 60)
metrics_keys = ['美股标的', '累计收益', 'CAGR', 'Sharpe', 'MaxDD', 'Calmar', '日胜率']
for key in metrics_keys:
a_val = a_metrics.get(key, 0)
b_val = b_metrics.get(key, 0)
if key == '美股标的':
print(f"{key:<15} {a_val:<15} {b_val:<15} {'替换':<15}")
continue
diff = b_val - a_val
if key in ['累计收益', 'CAGR', 'MaxDD', '日胜率']:
a_str = f"{a_val:.2%}"
b_str = f"{b_val:.2%}"
diff_str = f"{diff*100:+.2f}%"
else:
a_str = f"{a_val:.2f}"
b_str = f"{b_val:.2f}"
diff_str = f"{diff:+.2f}"
print(f"{key:<15} {a_str:<15} {b_str:<15} {diff_str:<15}")
print("-" * 60)
print(f"\n【关键发现】")
print(f"纳指100 → 标普500 替换效果:")
if b_metrics['CAGR'] < a_metrics['CAGR']:
print(f" - CAGR下降 {a_metrics['CAGR'] - b_metrics['CAGR']:.2%}")
print(f" → 标普500动量信号可能不如纳指强")
if b_metrics['MaxDD'] > a_metrics['MaxDD']: # 注意MaxDD是负数
print(f" - MaxDD改善 {b_metrics['MaxDD'] - a_metrics['MaxDD']:.2%}")
print(f" → 标普500更稳定回撤更小")
if b_metrics['Sharpe'] > a_metrics['Sharpe']:
print(f" - Sharpe改善 {b_metrics['Sharpe'] - a_metrics['Sharpe']:.2f}")
print(f" → 标普500风险调整后收益更优")
elif b_metrics['Sharpe'] < a_metrics['Sharpe']:
print(f" - Sharpe下降 {b_metrics['Sharpe'] - a_metrics['Sharpe']:.2f}")
print(f" → 纳指100风险调整后收益更优")
print(f"\n【策略建议】")
if b_metrics['累计收益'] < a_metrics['累计收益'] * 0.9:
print(f" 建议保持纳指100成长风格更适合动量策略")
elif b_metrics['Sharpe'] > a_metrics['Sharpe']:
print(f" 建议考虑标普500更稳定、风险调整收益更优")
else:
print(f" 建议保持纳指100累计收益更高")
def main():
"""主函数"""
# 加载基础配置
config_path = Path(__file__).parent.parent / 'config' / 'strategies' / 'rotation.yaml'
with open(config_path, 'r') as f:
base_config = yaml.safe_load(f)
# 添加 end_date
from datetime import datetime
base_config['end_date'] = datetime.now().strftime('%Y-%m-%d')
print(f"\n{'='*60}")
print(f" A/B测试纳指100 vs 标普500 替换对比")
print(f"{'='*60}")
print(f"\n研究问题:")
print(f" - 将美股大类代表从纳指100替换为标普500")
print(f" - 无类内竞争每大类还是1只")
print(f" - 评估标的特性变化对绩效的影响")
# A组纳指100当前配置
a_metrics = run_backtest(base_config, "A组: 纳指100作为美股代表")
# B组标普500替换纳指100
config_replace = create_config_replace_ndx_with_spx(base_config)
b_metrics = run_backtest(config_replace, "B组: 标普500替换纳指100")
# 对比
if a_metrics and b_metrics:
compare_results(a_metrics, b_metrics)
# 保存结果
results_df = pd.DataFrame([a_metrics, b_metrics])
results_path = Path(__file__).parent.parent / 'results' / 'ab_test_ndx_vs_spx.csv'
results_df.to_csv(results_path, index=False)
print(f"\n对比结果已保存: {results_path}")
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,183 @@
"""
A/B测试添加标普500对轮动策略的影响
对比:
- A组对照组当前11只标的配置
- B组实验组添加标普500后的12只标的配置
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from strategies.rotation.engine import RotationStrategy
import pandas as pd
def create_config_with_spx(base_config: dict) -> dict:
"""在基础配置上添加标普500"""
config = base_config.copy()
config['code_list'] = base_config['code_list'].copy()
# 添加标普500美股大类内
config['code_list']['SPX'] = {
'name': '标普500',
'etf': '513500.SH',
'market': 'US' # 与纳指100同属美股大类
}
return config
def run_backtest(config: dict, label: str) -> dict:
"""运行回测并返回关键指标"""
print(f"\n{'='*60}")
print(f" {label}")
print(f"{'='*60}")
strategy = RotationStrategy(config)
result = strategy.run() # result 是 DataFrame
if result is None or len(result) == 0:
return None
# 从 DataFrame 中直接计算指标
strategy_nav = result['轮动策略净值']
strategy_ret = result['轮动策略日收益率']
benchmark_nav = result['基准净值']
benchmark_ret = result['基准日收益率']
# 累计收益
total_return = strategy_nav.iloc[-1] - 1
# CAGR (交易日口径)
days = len(result)
years = days / 250
cagr = (strategy_nav.iloc[-1] ** (1/years)) - 1
# Sharpe
excess_ret = strategy_ret.mean() * 250 # 年化收益
vol = strategy_ret.std() * (250 ** 0.5) # 年化波动
sharpe = excess_ret / vol if vol > 0 else 0
# 最大回撤
rolling_max = strategy_nav.cummax()
drawdown = (strategy_nav - rolling_max) / rolling_max
max_dd = drawdown.min()
# Calmar
calmar = cagr / abs(max_dd) if max_dd < 0 else 0
# 日胜率
win_rate = (strategy_ret > 0).sum() / len(strategy_ret)
# 提取关键指标
metrics = {
'label': label,
'标的数': len(config['code_list']),
'累计收益': total_return,
'CAGR': cagr,
'Sharpe': sharpe,
'MaxDD': max_dd,
'Calmar': calmar,
'日胜率': win_rate,
}
print(f"\n标的池: {len(config['code_list'])}")
print(f"累计收益: {metrics['累计收益']:.2%}")
print(f"CAGR: {metrics['CAGR']:.2%}")
print(f"Sharpe: {metrics['Sharpe']:.2f}")
print(f"MaxDD: {metrics['MaxDD']:.2%}")
print(f"Calmar: {metrics['Calmar']:.2f}")
print(f"日胜率: {metrics['日胜率']:.2%}")
return metrics
def compare_results(a_metrics: dict, b_metrics: dict):
"""对比两组结果"""
print(f"\n{'='*60}")
print(f" 对比结果")
print(f"{'='*60}")
print(f"\n{'指标':<12} {'A组(无SPX)':<15} {'B组(有SPX)':<15} {'差异':<15}")
print("-" * 60)
metrics_keys = ['标的数', '累计收益', 'CAGR', 'Sharpe', 'MaxDD', 'Calmar', '日胜率']
for key in metrics_keys:
a_val = a_metrics.get(key, 0)
b_val = b_metrics.get(key, 0)
if key == '标的数':
diff = b_val - a_val
diff_str = f"+{diff}" if diff > 0 else str(diff)
else:
diff = b_val - a_val
if key in ['累计收益', 'CAGR', 'MaxDD', '日胜率']:
diff_str = f"{diff*100:+.2f}%"
else:
diff_str = f"{diff:+.2f}"
if key in ['累计收益', 'CAGR', 'MaxDD', '日胜率']:
a_str = f"{a_val:.2%}"
b_str = f"{b_val:.2%}"
else:
a_str = str(a_val)
b_str = str(b_val)
print(f"{key:<12} {a_str:<15} {b_str:<15} {diff_str:<15}")
print("-" * 60)
# 分析美股大类内部切换情况
print(f"\n【关键发现】")
print(f"添加标普500后")
print(f" - 美股大类从1只→2只纳指100 + 标普500")
print(f" - 类内竞争纳指100 vs 标普500得分高者代表美股大类")
print(f" - 跨类分散不变美股大类还是只输出1只冠军进入Top3")
if b_metrics['累计收益'] != a_metrics['累计收益']:
print(f" - 累计收益变化:{a_metrics['累计收益']:.2%}{b_metrics['累计收益']:.2%}")
def main():
"""主函数"""
import yaml
# 加载基础配置
config_path = Path(__file__).parent.parent / 'config' / 'strategies' / 'rotation.yaml'
with open(config_path, 'r') as f:
base_config = yaml.safe_load(f)
# 添加缺失的 end_date使用今天日期
from datetime import datetime
base_config['end_date'] = datetime.now().strftime('%Y-%m-%d')
print(f"\n{'='*60}")
print(f" A/B测试添加标普500对diversified模式的影响")
print(f"{'='*60}")
print(f"\n测试假设:")
print(f" - diversified=true 模式下每大类只选1只冠军")
print(f" - 添加标普500同属美股大类不会增加跨类分散")
print(f" - 但可能增加类内切换频率和换手率")
# A组当前配置11只无标普500
a_metrics = run_backtest(base_config, "A组: 当前配置11只无标普500")
# B组添加标普500后的配置12只
config_with_spx = create_config_with_spx(base_config)
b_metrics = run_backtest(config_with_spx, "B组: 添加标普50012只")
# 对比结果
if a_metrics and b_metrics:
compare_results(a_metrics, b_metrics)
# 保存对比结果
results_df = pd.DataFrame([a_metrics, b_metrics])
results_path = Path(__file__).parent.parent / 'results' / 'ab_test_spx.csv'
results_df.to_csv(results_path, index=False)
print(f"\n对比结果已保存: {results_path}")
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,115 @@
"""
分析历史 Top 3 标的中存在负分的情况 (正式版)
"""
import sys
import yaml
import pandas as pd
import numpy as np
from pathlib import Path
from datetime import datetime
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
# 添加项目根目录
sys.path.insert(0, str(Path(__file__).parent.parent))
from strategies.rotation.engine import RotationStrategy
from core.factors.momentum import compute_factors
def load_config(config_path: str) -> dict:
with open(config_path, "r", encoding="utf-8") as f:
return yaml.safe_load(f)
def analyze_negative_scores():
config_path = "config/strategies/rotation.yaml"
config = load_config(config_path)
# 强制不使用过滤,以获取完整数据
config['diversified'] = True
config['select_num'] = 3
strategy = RotationStrategy(config)
# 使用策略内部方法获取数据
with strategy.data_source:
index_data, etf_data, etf_nav_data, benchmark_data, valid_codes, index_ohlcv_data = strategy.data_source.fetch_all(
config['code_list'],
config['benchmark']['code'],
config["start_date"],
datetime.now().strftime('%Y-%m-%d')
)
# 手动计算因子 (不带过滤)
# 注意:为了分析原始得分,我们将 compute_factors 内部调用的过滤函数暂时跳过或分析结果
factor_data, valid_codes = compute_factors(
index_data,
valid_codes,
n=config["n_days"],
factor_type=config["factor_type"],
auto_day=config.get("auto_day", False),
index_ohlcv_data=index_ohlcv_data
)
score_cols = [c for c in factor_data.columns if c.startswith("得分_")]
code_config = config['code_list']
total_days = len(factor_data)
results = []
last_top_3 = set()
rebalance_count = 0
for date, row in factor_data.iterrows():
scores = row[score_cols].dropna()
if scores.empty: continue
# 模拟 diversified 逻辑下的 Top 3 (不带 >0 过滤)
cat_best = {}
for col_name, s in scores.items():
code = col_name.replace("得分_", "")
cat = code_config.get(code, {}).get("market", "未知")
if cat not in cat_best or s > cat_best[cat][1]:
cat_best[cat] = (code, s)
sorted_cats = sorted(cat_best.values(), key=lambda x: x[1], reverse=True)
top_3_raw = sorted_cats[:3]
current_top_3_codes = set(code for code, s in top_3_raw)
# 判断是否发生调仓(目标持仓集合发生变化)
if current_top_3_codes != last_top_3:
rebalance_count += 1
# 统计调仓日这 3 只中得分 <= 0 的数量
neg_count = sum(1 for code, s in top_3_raw if s <= 0)
results.append({
"date": date,
"neg_count": neg_count,
"top_1_score": top_3_raw[0][1],
"top_2_score": top_3_raw[1][1] if len(top_3_raw)>1 else np.nan,
"top_3_score": top_3_raw[2][1] if len(top_3_raw)>2 else np.nan,
"top_1_name": code_config.get(top_3_raw[0][0], {}).get('name')
})
last_top_3 = current_top_3_codes
neg_df = pd.DataFrame(results)
print(f"\n{'='*60}")
print(f"调仓日 (Rebalance Day) Top 3 标的出现负分情况分析")
print(f"{'='*60}")
print(f"总调仓次数: {rebalance_count}")
print(f"涉及负分(<=0)的调仓次数: {len(neg_df[neg_df['neg_count']>0])} ({len(neg_df[neg_df['neg_count']>0])/rebalance_count:.1%})")
if not neg_df.empty:
print(f"\n调仓日负分详细分布:")
print(f" - 只有 1 只标的为负: {len(neg_df[neg_df['neg_count']==1])}")
print(f" - 有 2 只标的为负: {len(neg_df[neg_df['neg_count']==2])}")
print(f" - 全部 3 只标的均为负: {len(neg_df[neg_df['neg_count']==3])}")
print(f"\n最近 10 次涉及负分的调仓详情:")
neg_df['date'] = pd.to_datetime(neg_df['date'])
print(neg_df[neg_df['neg_count']>0][['date', 'neg_count', 'top_1_score', 'top_1_name']].tail(10))
if __name__ == "__main__":
analyze_negative_scores()

View File

@@ -0,0 +1,235 @@
"""
全市场44只ETF Top 3 等权轮动回测
标的池来源etf_rotation_deep_analysis.md
"""
import sys
import math
import warnings
from pathlib import Path
from datetime import datetime
import numpy as np
import pandas as pd
warnings.filterwarnings("ignore")
sys.path.insert(0, str(Path(__file__).parent.parent))
from dotenv import load_dotenv
load_dotenv()
from 动量 import (
fetch_all_etf_data,
fetch_etf_nav_data,
calc_atr,
calc_weighted_momentum_score,
apply_crash_filter,
calc_premium_rate,
print_performance,
print_yearly_returns,
)
# ==================== 资产配置池 (9个精选 + 恒生科技 + 恒生指数) ====================
FULL_POOL = {
'513100.SH': '纳指100ETF',
'513520.SH': '日经225ETF',
'513030.SH': '德国DAX ETF',
'518880.SH': '黄金ETF',
'159980.SZ': '有色金属ETF',
'160723.SZ': '嘉实原油LOF',
'511090.SH': '30年国债ETF',
'512890.SH': '红利低波ETF',
'159915.SZ': '创业板ETF',
'513130.SH': '恒生科技ETF',
'159920.SZ': '恒生ETF',
}
# ==================== 资产大类映射 ====================
ETF_CATEGORIES = {
'513100.SH': '美股',
'513520.SH': '日本',
'513030.SH': '欧洲',
'518880.SH': '商品',
'159980.SZ': '商品',
'160723.SZ': '商品',
'511090.SH': '固收',
'512890.SH': 'A股主题',
'159915.SZ': 'A股宽基',
'513130.SH': '港股',
'159920.SZ': '港股',
}
CONFIG = {
'etf_pool': FULL_POOL,
'target_num': 3, # 持仓数量
'auto_day': True, # 是否启用动态周期
'fixed_days': 25, # 固定回看天数
'min_days': 20, # 动态周期最小值
'max_days': 60, # 动态周期最大值
'premium_threshold': 5.0, # 溢价率阈值(%)
'trade_cost': 0.001, # 单次交易成本(双边)
'start_date': '2019-01-01',
'benchmark': '000300.SH', # 基准沪深300
}
def run_full_backtest(config: dict):
"""执行全市场回测"""
end_date = datetime.now().strftime('%Y-%m-%d')
etf_pool = config['etf_pool']
etf_codes = list(etf_pool.keys())
print("=" * 60)
print(" 全市场ETF轮动策略 - Top 3 等权回测")
print("=" * 60)
print(f" 候选ETF: {len(etf_codes)}")
print(f" 持仓数量: {config['target_num']}")
print(f" 回测区间: {config['start_date']} ~ {end_date}")
# 1. 获取数据 (使用缓存加速)
from scripts.etf_data_cache import ETFDataCache
data_cache = ETFDataCache()
print(f"\n{'='*60}")
print("加载数据...")
all_data = {}
for code in etf_codes:
df = data_cache.load_cached_ohlcv(code)
if not df.empty:
all_data[code] = df
print(f" 加载完成: {len(all_data)} 只价格数据")
# 2. 构建交易日历
all_dates = set()
for df in all_data.values():
all_dates.update(df.index.tolist())
trade_dates = sorted(d for d in all_dates if d >= pd.Timestamp(config['start_date']))
print(f" 交易日数: {len(trade_dates)}")
# 3. 逐日回测
print(f"\n{'='*60}")
print("开始回测...")
max_lookback = config['max_days'] + 10
holdings = {} # {code: weight}
daily_returns = []
signals = []
for i, today in enumerate(trade_dates):
# 计算得分
scores = {}
for code in etf_codes:
if code not in all_data: continue
df = all_data[code]
hist = df[df.index <= today].tail(max_lookback + 1)
if len(hist) < config['min_days']: continue
close_arr = hist['close'].values
# 动态周期
if config['auto_day'] and len(hist) >= max_lookback:
long_atr = calc_atr(hist['high'], hist['low'], hist['close'], config['max_days']).iloc[-1]
short_atr = calc_atr(hist['high'], hist['low'], hist['close'], config['min_days']).iloc[-1]
if long_atr > 0:
ratio = min(0.9, short_atr / long_atr)
lookback = int(config['min_days'] + (config['max_days'] - config['min_days']) * (1 - ratio))
else:
lookback = config['fixed_days']
else:
lookback = config['fixed_days']
prices = close_arr[-lookback:]
if len(prices) < 5: continue
result = calc_weighted_momentum_score(prices)
score = result['score']
score = apply_crash_filter(close_arr, score)
if 0 < score < 6:
scores[code] = score
# 选出排名最高的 3 只 (跨大类 Top 1 逻辑)
if scores:
ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True)
# 1. 每个大类只保留最高分的那一个
category_best = {} # {category: (code, score)}
for code, score in ranked:
cat = ETF_CATEGORIES.get(code, '未知')
if cat not in category_best:
category_best[cat] = (code, score)
# 2. 对所有大类的 Top 1 进行排序,选前 3 个大类
sorted_categories = sorted(category_best.values(), key=lambda x: x[1], reverse=True)
targets = [code for code, score in sorted_categories[:config['target_num']]]
new_holdings = {c: 1.0/len(targets) for c in targets}
else:
new_holdings = {}
# 计算收益
port_ret = 0.0
for code, weight in holdings.items():
df_h = all_data[code]
if today in df_h.index:
prev_dates = df_h[df_h.index < today].index
if len(prev_dates) > 0:
prev_price = df_h.loc[prev_dates[-1], 'close']
port_ret += weight * (df_h.loc[today, 'close'] / prev_price - 1)
# 调仓成本
old_set, new_set = set(holdings.keys()), set(new_holdings.keys())
trade_cost = 0.0
if old_set != new_set:
turnover = sum(holdings[c] for c in old_set - new_set) + sum(new_holdings[c] for c in new_set - old_set)
trade_cost = turnover * config['trade_cost'] / 2
signals.append({'date': today, 'holdings': list(new_holdings.keys())})
holdings = new_holdings
daily_returns.append({
'date': today,
'daily_return': port_ret - trade_cost,
'holding': ", ".join(holdings.keys()) if holdings else "空仓"
})
# 4. 计算绩效
result_df = pd.DataFrame(daily_returns).set_index('date')
result_df['nav'] = (1 + result_df['daily_return']).cumprod()
# 基准
import os, tushare as ts
pro = ts.pro_api(os.getenv("TUSHARE_TOKEN"))
bench_df = pro.index_daily(ts_code=config['benchmark'], start_date=config['start_date'].replace('-', ''), end_date=end_date.replace('-', ''))
if bench_df is not None and not bench_df.empty:
bench_df['date'] = pd.to_datetime(bench_df['trade_date'])
bench_df = bench_df.set_index('date').sort_index()
result_df['bench_return'] = bench_df['close'].reindex(result_df.index, method='ffill') / bench_df['close'].iloc[0]
else:
result_df['bench_return'] = 1.0
print_performance(result_df, signals, config)
print_yearly_returns(result_df)
# 保存图表
save_chart(result_df)
def save_chart(result_df):
try:
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
matplotlib.rcParams['font.sans-serif'] = ['Arial Unicode MS', 'SimHei', 'DejaVu Sans']
matplotlib.rcParams['axes.unicode_minus'] = False
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 8), height_ratios=[3, 1], gridspec_kw={'hspace': 0.3})
ax1.plot(result_df.index, result_df['nav'], label='全市场Top3等权', color='#2ecc71')
ax1.plot(result_df.index, result_df['bench_return'], label='沪深300', color='#95a5a6')
ax1.set_yscale('log')
ax1.legend()
ax1.grid(True, alpha=0.3)
peak = result_df['nav'].cummax()
ax2.fill_between(result_df.index, (result_df['nav'] - peak) / peak, 0, color='#e74c3c', alpha=0.4)
plt.savefig(Path(__file__).parent.parent / 'results' / 'full_pool_top3_chart.png')
print(f"图表已保存到 results/full_pool_top3_chart.png")
except Exception as e: print(f"图表生成失败: {e}")
if __name__ == "__main__":
run_full_backtest(CONFIG)

View File

@@ -0,0 +1,399 @@
"""
动量策略多持仓对比实验
对比 6 种配置: 全仓1只 / 等权3只 / 反波动率3只 / 等权5只 / 反波动率5只 / 动量>0全选等权
支持 dynamic 模式: 回测中定期重建ETF池消除前视偏差
"""
import sys
import math
import warnings
from pathlib import Path
from datetime import datetime
import numpy as np
import pandas as pd
warnings.filterwarnings("ignore")
sys.path.insert(0, str(Path(__file__).parent.parent))
from dotenv import load_dotenv
load_dotenv()
# ==================== 复用动量.py的核心函数 ====================
from 动量 import (
fetch_all_etf_data,
fetch_etf_nav_data,
calc_atr,
calc_weighted_momentum_score,
apply_crash_filter,
calc_premium_rate,
resolve_etf_pool,
)
# ==================== 权重计算 ====================
def calc_equal_weights(codes: list) -> dict:
"""等权"""
w = 1.0 / len(codes)
return {c: w for c in codes}
def calc_inv_vol_weights(codes: list, all_data: dict, today, lookback: int = 20) -> dict:
"""反波动率加权: 权重 ∝ 1/σ"""
vols = {}
for c in codes:
if c not in all_data:
continue
df = all_data[c]
hist = df[df.index <= today].tail(lookback + 1)
if len(hist) < 10:
vols[c] = 1.0 # fallback
continue
ret = hist['close'].pct_change().dropna()
vol = ret.std()
vols[c] = vol if vol > 0 else 1e-6
if not vols:
return calc_equal_weights(codes)
inv_vols = {c: 1.0 / v for c, v in vols.items()}
total = sum(inv_vols.values())
return {c: iv / total for c, iv in inv_vols.items()}
# ==================== 多持仓回测引擎 ====================
def run_multi_backtest(config: dict, all_data: dict, nav_data: dict,
trade_dates: list, etf_codes: list,
target_num: int = 1, weight_mode: str = 'equal',
label: str = '',
data_cache=None, rebuild_interval: int = 0) -> dict:
"""
多持仓回测
Args:
target_num: 同时持有数量
weight_mode: 'equal' 等权 | 'inv_vol' 反波动率
label: 实验标签
data_cache: ETFDataCache 实例(动态重建模式)
rebuild_interval: 重建间隔(交易日)0=不重建
Returns:
dict: 绩效指标
"""
max_lookback = config['max_days'] + 10
holdings = {} # {code: weight}
daily_returns = []
n_trades = 0
last_rebuild_i = -rebuild_interval if rebuild_interval > 0 else 0
current_codes = list(etf_codes) # 当前活跃的候选池
for i, today in enumerate(trade_dates):
# 动态重建 ETF 池
if rebuild_interval > 0 and data_cache is not None and (i - last_rebuild_i >= rebuild_interval):
ref_str = today.strftime('%Y%m%d')
try:
new_pool = resolve_etf_pool(config, ref_date=ref_str, data_cache=data_cache)
current_codes = list(new_pool.keys())
# 加载新增 ETF 数据
for code in current_codes:
if code not in all_data:
ohlcv = data_cache.load_cached_ohlcv(code)
if not ohlcv.empty:
all_data[code] = ohlcv
last_rebuild_i = i
except Exception:
pass
# 1. 计算每只 ETF 的得分 (使用当前活跃池)
scores = {}
for code in current_codes:
if code not in all_data:
continue
df = all_data[code]
hist = df[df.index <= today].tail(max_lookback + 1)
if len(hist) < config['min_days']:
continue
close_arr = hist['close'].values
if config['auto_day']:
if len(hist) < max_lookback:
lookback = config['fixed_days']
else:
long_atr = calc_atr(hist['high'], hist['low'], hist['close'],
config['max_days'])
short_atr = calc_atr(hist['high'], hist['low'], hist['close'],
config['min_days'])
la = long_atr.iloc[-1]
sa = short_atr.iloc[-1]
if la > 0 and not np.isnan(la) and not np.isnan(sa):
ratio = min(0.9, sa / la)
lookback = int(config['min_days'] +
(config['max_days'] - config['min_days']) * (1 - ratio))
else:
lookback = config['fixed_days']
prices = close_arr[-lookback:]
else:
prices = close_arr[-config['fixed_days']:]
if len(prices) < 5:
continue
result = calc_weighted_momentum_score(prices)
score = result['score']
score = apply_crash_filter(close_arr, score)
if code in nav_data:
nav_df = nav_data[code]
nav_row = nav_df[nav_df.index <= today]
if not nav_row.empty:
nav_val = nav_row.iloc[-1]['nav']
etf_price = close_arr[-1]
premium = calc_premium_rate(etf_price, nav_val)
if premium >= config['premium_threshold']:
score -= 1
if 0 < score < 6:
scores[code] = score
# 2. 选出 top N (或全部正动量)
if scores:
ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True)
if target_num == 'all_positive':
targets = [c for c, s in ranked] # scores 已过滤 >0
else:
targets = [c for c, _ in ranked[:target_num]]
else:
targets = []
# 3. 计算权重
if targets:
if weight_mode == 'inv_vol':
new_weights = calc_inv_vol_weights(targets, all_data, today)
else:
new_weights = calc_equal_weights(targets)
else:
new_weights = {}
# 4. 计算当日组合收益
port_ret = 0.0
for code, weight in holdings.items():
if code not in all_data:
continue
df_h = all_data[code]
if today in df_h.index:
prev_dates = df_h[df_h.index < today].index
if len(prev_dates) > 0:
prev_price = df_h.loc[prev_dates[-1], 'close']
today_price = df_h.loc[today, 'close']
port_ret += weight * (today_price / prev_price - 1)
# 5. 调仓判断
old_set = set(holdings.keys())
new_set = set(new_weights.keys())
if old_set != new_set:
# 换手成本: 按换手比例收取
turnover = 0.0
for c in old_set - new_set:
turnover += holdings[c]
for c in new_set - old_set:
turnover += new_weights[c]
for c in old_set & new_set:
turnover += abs(new_weights[c] - holdings[c])
trade_cost = turnover * config['trade_cost'] / 2 # 单边已含在trade_cost中
n_trades += 1
else:
trade_cost = 0.0
holdings = new_weights
daily_returns.append({
'date': today,
'daily_return': port_ret - trade_cost,
})
# 计算绩效
result_df = pd.DataFrame(daily_returns).set_index('date')
result_df['nav'] = (1 + result_df['daily_return']).cumprod()
nav = result_df['nav']
total_return = nav.iloc[-1] / nav.iloc[0] - 1
days = (result_df.index[-1] - result_df.index[0]).days
cagr = (1 + total_return) ** (365 / days) - 1 if days > 0 else 0
daily_rets = result_df['daily_return']
sharpe = daily_rets.mean() / daily_rets.std() * np.sqrt(252) if daily_rets.std() > 0 else 0
peak = nav.cummax()
drawdown = (nav - peak) / peak
max_dd = drawdown.min()
calmar = cagr / abs(max_dd) if max_dd != 0 else 0
win_rate = (daily_rets > 0).sum() / (daily_rets != 0).sum() if (daily_rets != 0).sum() > 0 else 0
years = days / 365
# 年度统计
win_years = 0
total_years = 0
for year, group in result_df.groupby(result_df.index.year):
yr = group['nav']
yr_ret = yr.iloc[-1] / yr.iloc[0] - 1
total_years += 1
if yr_ret > 0:
win_years += 1
return {
'label': label,
'target_num': target_num,
'weight_mode': weight_mode,
'total_return': total_return,
'cagr': cagr,
'sharpe': sharpe,
'max_dd': max_dd,
'calmar': calmar,
'win_rate': win_rate,
'n_trades': n_trades,
'trades_per_year': n_trades / years if years > 0 else 0,
'win_years': f"{win_years}/{total_years}",
'result_df': result_df,
}
# ==================== 主函数 ====================
def main():
from 动量 import CONFIG
config = CONFIG.copy()
# 强制使用 dynamic 模式
config['etf_pool'] = 'dynamic'
rebuild_interval = config.get('rebuild_interval', 60)
# 初始化缓存
from scripts.etf_data_cache import ETFDataCache
data_cache = ETFDataCache()
# 用 start_date 作为初始重建日期
init_ref_date = config['start_date'].replace('-', '')
etf_pool = resolve_etf_pool(config, ref_date=init_ref_date, data_cache=data_cache)
etf_codes = list(etf_pool.keys())
end_date = datetime.now().strftime('%Y-%m-%d')
print("=" * 70)
print(" 动量策略多持仓对比实验 (动态重建模式, 无前视偏差)")
print("=" * 70)
print(f" 初始ETF池 ({init_ref_date}): {len(etf_codes)}")
for code, name in etf_pool.items():
print(f" {code} {name}")
print(f" 回测区间: {config['start_date']} ~ {end_date}")
print(f" 重建间隔: {rebuild_interval} 交易日")
# 从缓存加载数据
print(f"\n{'='*70}")
print("从本地缓存加载数据...")
all_data = {}
# 加载所有可能用到的 ETF 数据 (初始池 + 后续可能加入的)
for code in etf_codes:
ohlcv = data_cache.load_cached_ohlcv(code)
if not ohlcv.empty:
all_data[code] = ohlcv
nav_data = {} # 动态模式下不使用净值数据
print(f"价格数据: {len(all_data)}")
# 构建交易日历
all_dates = set()
for df in all_data.values():
all_dates.update(df.index.tolist())
trade_dates = sorted(d for d in all_dates if d >= pd.Timestamp(config['start_date']))
print(f"交易日: {len(trade_dates)}")
# 6 组实验
experiments = [
{'target_num': 1, 'weight_mode': 'equal', 'label': 'A: 全仓1只'},
{'target_num': 3, 'weight_mode': 'equal', 'label': 'B: 等权3只'},
{'target_num': 3, 'weight_mode': 'inv_vol', 'label': 'C: 反波动率3只'},
{'target_num': 5, 'weight_mode': 'equal', 'label': 'D: 等权5只'},
{'target_num': 5, 'weight_mode': 'inv_vol', 'label': 'E: 反波动率5只'},
{'target_num': 'all_positive', 'weight_mode': 'equal', 'label': 'F: 动量>0全选等权'},
]
results = []
for exp in experiments:
print(f"\n{''*70}")
print(f" 运行: {exp['label']}...")
r = run_multi_backtest(
config, all_data, nav_data, trade_dates, etf_codes,
target_num=exp['target_num'],
weight_mode=exp['weight_mode'],
label=exp['label'],
data_cache=data_cache,
rebuild_interval=rebuild_interval,
)
results.append(r)
print(f" 完成: CAGR={r['cagr']:.2%}, MaxDD={r['max_dd']:.2%}, Sharpe={r['sharpe']:.2f}")
# 输出对比表
print(f"\n\n{'='*100}")
print(f"{'':>20s} 动量策略多持仓对比实验结果")
print(f"{'='*100}")
print(f" {'实验':<18s} {'累计收益':>10s} {'CAGR':>8s} {'夏普':>6s} {'最大回撤':>8s} {'Calmar':>8s} {'日胜率':>7s} {'调仓次':>6s} {'年调仓':>6s} {'盈利年':>7s}")
print(f"{''*100}")
for r in results:
print(f" {r['label']:<16s} {r['total_return']:>9.2%} {r['cagr']:>7.2%} {r['sharpe']:>6.2f} "
f"{r['max_dd']:>8.2%} {r['calmar']:>7.2f} {r['win_rate']:>6.2%} "
f"{r['n_trades']:>5d} {r['trades_per_year']:>6.1f} {r['win_years']:>7s}")
print(f"{'='*100}")
# 找出最优
best_sharpe = max(results, key=lambda x: x['sharpe'])
best_calmar = max(results, key=lambda x: x['calmar'])
best_cagr = max(results, key=lambda x: x['cagr'])
print(f"\n 最高夏普: {best_sharpe['label']} (Sharpe={best_sharpe['sharpe']:.2f})")
print(f" 最高Calmar: {best_calmar['label']} (Calmar={best_calmar['calmar']:.2f})")
print(f" 最高CAGR: {best_cagr['label']} (CAGR={best_cagr['cagr']:.2%})")
# 保存图表
try:
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
matplotlib.rcParams['font.sans-serif'] = ['Arial Unicode MS', 'SimHei', 'DejaVu Sans']
matplotlib.rcParams['axes.unicode_minus'] = False
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(16, 10), height_ratios=[3, 1],
gridspec_kw={'hspace': 0.3})
colors = ['#e74c3c', '#3498db', '#2ecc71', '#f39c12', '#9b59b6']
for r, color in zip(results, colors):
nav = r['result_df']['nav']
ax1.plot(nav.index, nav, label=r['label'], linewidth=1.2, color=color)
ax1.set_title('动量策略多持仓对比 - 净值曲线', fontsize=14, fontweight='bold')
ax1.legend(loc='upper left', fontsize=10)
ax1.grid(True, alpha=0.3)
ax1.set_ylabel('净值')
ax1.set_yscale('log')
# 回撤
for r, color in zip(results, colors):
nav = r['result_df']['nav']
peak = nav.cummax()
dd = (nav - peak) / peak
ax2.plot(dd.index, dd, label=r['label'], linewidth=0.8, color=color, alpha=0.7)
ax2.set_title('回撤对比', fontsize=12)
ax2.set_ylabel('回撤')
ax2.grid(True, alpha=0.3)
ax2.legend(loc='lower left', fontsize=8)
chart_path = Path(__file__).parent.parent / 'results' / 'momentum_multi_experiment.png'
chart_path.parent.mkdir(exist_ok=True)
fig.savefig(chart_path, dpi=150, bbox_inches='tight')
plt.close(fig)
print(f"\n 对比图表已保存: {chart_path}")
except Exception as e:
print(f"\n 图表生成失败: {e}")
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,112 @@
"""
持仓数量 (select_num) 敏感度测试
测试 select_num 分别为 1, 2, 3, 4, 5 时的策略表现
基于最终精选的 11 只标的池
"""
import sys
import pandas as pd
import numpy as np
from pathlib import Path
from datetime import datetime
import matplotlib.pyplot as plt
# 添加项目根目录
sys.path.insert(0, str(Path(__file__).parent.parent))
from strategies.rotation.engine import RotationStrategy
# ==================== 基础配置 ====================
FINAL_POOL = {
"399006.SZ": {"name": "创业板指", "market": "A", "etf": "159915.SZ"},
"H30269.CSI": {"name": "中证红利低波", "market": "A", "etf": "512890.SH"},
"000015.SH": {"name": "上证红利", "market": "A", "etf": "510880.SH"},
"NDX": {"name": "纳指100", "market": "US", "etf": "513100.SH"},
"N225": {"name": "日经225", "market": "JP", "etf": "513520.SH"},
"GDAXI": {"name": "德国DAX", "market": "EU", "etf": "513030.SH"},
"HSI": {"name": "恒生指数", "market": "HK", "etf": "159920.SZ"},
"HSTECH.HK": {"name": "恒生科技", "market": "HK", "etf": "513130.SH"},
"AU.SHF": {"name": "黄金", "market": "COMMODITY", "etf": "518880.SH"},
"CL.NYM": {"name": "原油", "market": "COMMODITY", "etf": "160723.SZ"},
"931862.CSI": {"name": "30年国债", "market": "BOND", "etf": "511090.SH"}
}
BASE_CONFIG = {
"start_date": "2019-01-01",
"end_date": datetime.now().strftime('%Y-%m-%d'),
"code_list": FINAL_POOL,
"factor_type": "weighted_momentum",
"auto_day": False, # 使用当前设定的固定窗口
"n_days": 25,
"diversified": True,
"rebalance_days": 1,
"rebalance_threshold": 0.0,
"trade_cost": 0.001,
"premium_control": {"enabled": True, "default_threshold": 0.10},
"use_cache": True,
"ssh_tunnel": {"enabled": True, "host": "8.218.167.69", "port": 22, "username": "root", "key_path": "hk_ecs.pem", "local_port": 1080}
}
def run_sensitivity_test():
test_values = [1, 2, 3, 4, 5]
results = []
for val in test_values:
print(f"\n测试 select_num = {val} ...")
cfg = BASE_CONFIG.copy()
cfg["select_num"] = val
strategy = RotationStrategy(cfg)
try:
res_df = strategy.run()
nav = res_df['轮动策略净值']
total_ret = nav.iloc[-1] - 1
days = (nav.index[-1] - nav.index[0]).days
cagr = (1 + total_ret)**(365.25/days) - 1
daily_ret = res_df['轮动策略日收益率']
sharpe = daily_ret.mean() / daily_ret.std() * np.sqrt(252) if daily_ret.std() > 0 else 0
peak = nav.cummax()
dd = (nav - peak) / peak
max_dd = dd.min()
results.append({
"select_num": val,
"total_ret": total_ret,
"cagr": cagr,
"max_dd": max_dd,
"sharpe": sharpe,
"nav": nav
})
except Exception as e:
print(f"测试失败 (select_num={val}): {e}")
# ==================== 汇总报告 ====================
print(f"\n\n{'='*90}")
print(f"{'持仓数量 (select_num) 敏感度测试报告':^90}")
print(f"{'='*90}")
print(f"{'持仓数':<10} | {'累计收益':>12} | {'年化(CAGR)':>12} | {'最大回撤':>12} | {'夏普比率':>10}")
print(f"{'-'*90}")
for r in results:
print(f"{r['select_num']:<10} | {r['total_ret']:>12.2%} | {r['cagr']:>12.2%} | {r['max_dd']:>12.2%} | {r['sharpe']:>10.2f}")
print(f"{'='*90}")
# ==================== 绘图 ====================
plt.figure(figsize=(14, 7))
for r in results:
plt.plot(r['nav'].index, r['nav'], label=f"select_num = {r['select_num']}")
plt.yscale('log')
plt.title("持仓数量对净值的影响 (select_num 1-5)", fontsize=14)
plt.legend()
plt.grid(True, alpha=0.3)
output_path = Path(__file__).parent.parent / "results" / "select_num_test.png"
plt.savefig(output_path)
print(f"\n对比图表已保存至: {output_path}")
if __name__ == "__main__":
run_sensitivity_test()