feat(strategy): 新增纯美股动量轮动策略

新增美股轮动策略模块:
- strategies/us_rotation/config.yaml: 47只美股标的池,动量窗口250天,Top5选股
- strategies/us_rotation/strategy.py: USRotationStrategy实现
- run_us_rotation.py: 回测入口脚本

回测结果 (2016-2026, 约10年):
- 总收益: 7675% (年化52.49%)
- 基准NDX收益: 540% (年化19.72%)
- 超额年化收益: 32.78%
- 夏普比率: 1.33 (基准0.80)
- 最大回撤: 42.15%
- 卡玛比率: 1.25
- 胜率: 56.1%
- 平均持仓: 2.7天

年度最佳: 2020年+221% (超额176%)
年度防守: 2022年-10.5% (基准-33.7%, 超额+23.3%)

持仓Top5: NVDA(35.8%), AMD(30.1%), SHOP(26%), AVGO(23.9%), FICO(23.3%)
This commit is contained in:
2026-05-13 01:27:09 +08:00
parent a712bc0f03
commit 105af19690
4 changed files with 610 additions and 0 deletions

55
run_us_rotation.py Normal file
View File

@@ -0,0 +1,55 @@
#!/usr/bin/env python3
"""
美股动量轮动策略回测入口
运行方式:
python run_us_rotation.py
python run_us_rotation.py --save results/us_rotation
"""
import argparse
import time
from pathlib import Path
from strategies.us_rotation.strategy import USRotationStrategy
def main():
parser = argparse.ArgumentParser(description='美股动量轮动策略回测')
parser.add_argument(
'--config',
type=str,
default='strategies/us_rotation/config.yaml',
help='配置文件路径'
)
parser.add_argument(
'--save',
type=str,
default='results/us_rotation',
help='报告保存路径前缀'
)
args = parser.parse_args()
start_time = time.time()
print("=" * 60)
print("美股动量轮动策略")
print("=" * 60)
print(f"配置文件: {args.config}")
print(f"保存路径: {args.save}")
# 创建策略实例
strategy = USRotationStrategy.from_yaml(args.config)
# 运行回测
result = strategy.run_backtest(save_path=args.save)
elapsed = time.time() - start_time
print(f"\n总耗时: {elapsed:.1f}")
return result
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,7 @@
"""
美股轮动策略模块
"""
from .strategy import USRotationStrategy
__all__ = ['USRotationStrategy']

View File

@@ -0,0 +1,194 @@
# 美股轮动策略配置
# ==================== 候选池配置 ====================
code_list:
# 科技巨头
"AAPL":
name: "Apple"
sector: "Technology"
"ADBE":
name: "Adobe"
sector: "Technology"
"AMD":
name: "AMD"
sector: "Technology"
"AMZN":
name: "Amazon"
sector: "Technology"
"ASML":
name: "ASML"
sector: "Technology"
"AVGO":
name: "Broadcom"
sector: "Technology"
"CRM":
name: "Salesforce"
sector: "Technology"
"CRWD":
name: "CrowdStrike"
sector: "Technology"
"CSCO":
name: "Cisco"
sector: "Technology"
"FICO":
name: "FICO"
sector: "Technology"
"GOOGL":
name: "Google"
sector: "Technology"
"INTC":
name: "Intel"
sector: "Technology"
"KLAC":
name: "KLA"
sector: "Technology"
"LRCX":
name: "Lam Research"
sector: "Technology"
"META":
name: "Meta"
sector: "Technology"
"MSFT":
name: "Microsoft"
sector: "Technology"
"MU":
name: "Micron"
sector: "Technology"
"NET":
name: "Cloudflare"
sector: "Technology"
"NFLX":
name: "Netflix"
sector: "Technology"
"NVDA":
name: "NVIDIA"
sector: "Technology"
"ORCL":
name: "Oracle"
sector: "Technology"
"PANW":
name: "Palo Alto"
sector: "Technology"
"PLTR":
name: "Palantir"
sector: "Technology"
"QCOM":
name: "Qualcomm"
sector: "Technology"
"SNOW":
name: "Snowflake"
sector: "Technology"
"TSLA":
name: "Tesla"
sector: "Technology"
"TSM":
name: "TSMC"
sector: "Technology"
# 金融
"AXP":
name: "American Express"
sector: "Financial"
"BAC":
name: "Bank of America"
sector: "Financial"
"C":
name: "Citigroup"
sector: "Financial"
"GS":
name: "Goldman Sachs"
sector: "Financial"
"JPM":
name: "JPMorgan"
sector: "Financial"
"MA":
name: "Mastercard"
sector: "Financial"
"MS":
name: "Morgan Stanley"
sector: "Financial"
# 消费零售
"COST":
name: "Costco"
sector: "Consumer"
"LULU":
name: "Lululemon"
sector: "Consumer"
"PDD":
name: "PDD Holdings"
sector: "Consumer"
"SHOP":
name: "Shopify"
sector: "Consumer"
# 医药健康
"LLY":
name: "Eli Lilly"
sector: "Healthcare"
"NVO":
name: "Novo Nordisk"
sector: "Healthcare"
# 其他
"CAT":
name: "Caterpillar"
sector: "Industrial"
"COIN":
name: "Coinbase"
sector: "Crypto"
"CRCL":
name: "Circle"
sector: "Crypto"
"FUTU":
name: "Futu"
sector: "Financial"
"HOOD":
name: "Robinhood"
sector: "Financial"
"SAP":
name: "SAP"
sector: "Technology"
"SCCO":
name: "Southern Copper"
sector: "Materials"
# ==================== 基准配置 ====================
benchmark:
code: "NDX"
name: "纳斯达克100"
# ==================== 回测参数 ====================
start_date: "2016-01-01"
# ==================== 因子参数 ====================
# 动量窗口(天数)
n_days: 250
# 因子类型
factor_type: "momentum"
# ==================== 轮动参数 ====================
# 不分组,直接选 Top N
diversified: false
select_num: 5
# ==================== 调仓控制 ====================
# 每日调仓
rebalance_days: 1
# 调仓阈值:新组合得分超过当前组合 X% 才触发调仓
rebalance_threshold: 0.0
# 交易成本(双边)
trade_cost: 0.001
# ==================== 数据缓存 ====================
use_cache: true
# ==================== 数据源配置 ====================
# SSH 隧道配置(用于 yfinance
ssh_tunnel:
enabled: true
host: "8.218.167.69"
port: 22
username: "root"
key_path: "hk_ecs.pem"
local_port: 1080

View File

@@ -0,0 +1,354 @@
"""
美股轮动策略
纯美股轮动策略,使用动量因子选股
特点:
- 全部使用 yfinance 数据源
- 美股交易日历
- 不分组,直接选 Top 5
- 基准为纳指 NDX
"""
import sys
import yaml
import time
import pandas as pd
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Tuple
# 添加项目根目录
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))
from datasource.yfinance_source import YFinanceSource
from datasource.ssh_tunnel import SSHTunnelManager
from strategies.shared.factors.momentum import MomentumFactor
from strategies.shared.signals.selectors import TopNSelector
from framework.execution import BacktestExecutor
class USRotationStrategy:
"""美股轮动策略"""
def __init__(self, config_path: str = None, config: dict = None):
"""
初始化策略
Args:
config_path: 配置文件路径
config: 配置字典(可选)
"""
if config_path:
with open(config_path, 'r', encoding='utf-8') as f:
self.config = yaml.safe_load(f)
elif config:
self.config = config
else:
raise ValueError("需要提供 config_path 或 config")
# 结束日期默认今天
if not self.config.get('end_date'):
self.config['end_date'] = datetime.now().strftime('%Y-%m-%d')
# 应用配置
self._apply_config()
# 初始化因子
self._factor = MomentumFactor(
n_days=self.n_days,
weighted=True,
crash_filter=True
)
# 初始化选择器(不分组,直接选 Top N
self._selector = TopNSelector(
select_num=self.select_num,
rebalance_days=self.rebalance_days,
rebalance_threshold=self.rebalance_threshold
)
# 数据源(延迟初始化)
self._yfinance: Optional[YFinanceSource] = None
self._tunnel: Optional[SSHTunnelManager] = None
@classmethod
def from_yaml(cls, config_path: str) -> 'USRotationStrategy':
"""从 YAML 文件创建策略实例"""
return cls(config_path=config_path)
def _apply_config(self):
"""应用配置参数"""
self.select_num = self.config.get('select_num', 5)
self.n_days = self.config.get('n_days', 250)
self.rebalance_days = self.config.get('rebalance_days', 1)
self.rebalance_threshold = self.config.get('rebalance_threshold', 0.0)
self.trade_cost = self.config.get('trade_cost', 0.001)
self.start_date = self.config.get('start_date', '2016-01-01')
self.end_date = self.config['end_date']
self.use_cache = self.config.get('use_cache', True)
def _start_tunnel(self) -> bool:
"""启动 SSH 隧道"""
ssh_config = self.config.get('ssh_tunnel', {})
if not ssh_config.get('enabled', False):
return True
self._tunnel = SSHTunnelManager(ssh_config)
return self._tunnel.start()
def _stop_tunnel(self):
"""停止 SSH 隧道"""
if self._tunnel:
self._tunnel.stop()
self._tunnel = None
def _init_yfinance(self):
"""初始化 YFinance 数据源"""
if self._yfinance is None:
self._yfinance = YFinanceSource(use_ssh_tunnel=True)
def fetch_data(self) -> Dict:
"""获取数据(全部使用 yfinance"""
print("\n" + "=" * 60)
print("获取美股数据")
print("=" * 60)
code_list_config = self.config.get('code_list', {})
benchmark_config = self.config.get('benchmark', {})
benchmark_code = benchmark_config.get('code', 'NDX')
if not code_list_config:
raise ValueError("配置中未找到 code_list")
codes = list(code_list_config.keys())
print(f"标的池: {len(codes)} 只股票")
print(f"基准: {benchmark_code}")
print(f"时间范围: {self.start_date} ~ {self.end_date}")
# 启动 SSH 隓道
print("\n启动 SSH 隧道...")
if not self._start_tunnel():
raise RuntimeError("SSH 隧道启动失败")
self._init_yfinance()
# 获取数据
all_data: Dict[str, pd.DataFrame] = {}
valid_codes: List[str] = []
print("\n获取股票数据...")
for i, code in enumerate(codes):
print(f" [{i+1}/{len(codes)}] {code}...", end=" ")
try:
df = self._yfinance.fetch(code, self.start_date, self.end_date)
time.sleep(0.5) # 避免限流
if df is not None and len(df) >= self.n_days:
all_data[code] = df
valid_codes.append(code)
print(f"{len(df)}")
else:
print(f"✗ 数据不足")
except Exception as e:
print(f"✗ 失败: {e}")
# 获取基准数据
print(f"\n获取基准 {benchmark_code}...", end=" ")
try:
benchmark_df = self._yfinance.fetch(benchmark_code, self.start_date, self.end_date)
if benchmark_df is not None and len(benchmark_df) > 0:
print(f"{len(benchmark_df)}")
else:
print(f"✗ 基准数据获取失败")
benchmark_df = None
except Exception as e:
print(f"✗ 失败: {e}")
benchmark_df = None
# 停止隧道
self._stop_tunnel()
print(f"\n数据获取完成: {len(valid_codes)}/{len(codes)} 只有效")
return {
'stock_data': all_data,
'valid_codes': valid_codes,
'benchmark': benchmark_df,
'benchmark_code': benchmark_code
}
def compute_factors(self, data: Dict) -> pd.DataFrame:
"""计算动量因子"""
print("\n" + "=" * 60)
print("计算动量因子")
print("=" * 60)
stock_data = data['stock_data']
valid_codes = data['valid_codes']
factor_values: Dict[str, pd.Series] = {}
for code in valid_codes:
df = stock_data[code]
if 'close' not in df.columns:
continue
# 数据长度检查
if len(df) < self.n_days:
print(f"{code}: 数据不足 {self.n_days} 天,跳过")
continue
# MomentumFactor.compute 需要DataFrame
factor_series = self._factor.compute(df)
if factor_series is not None and len(factor_series) > 0:
factor_values[code] = factor_series
# 合成 DataFrame
factor_df = pd.DataFrame(factor_values)
print(f"\n因子计算完成: {len(factor_df.columns)} 只标的")
print(f" 窗口: {self.n_days}")
if len(factor_df) > 0:
print(f" 日期范围: {factor_df.index.min()} ~ {factor_df.index.max()}")
return factor_df
def generate_signals(self, factor_df: pd.DataFrame) -> pd.DataFrame:
"""生成轮动信号(不分组,直接选 Top 5"""
print("\n" + "=" * 60)
print("生成轮动信号")
print("=" * 60)
# 不分组,直接对因子排序选 Top 5
# TopNSelector.generate 会自动处理调仓周期和T+1
signals_df = self._selector.generate(factor_df)
print(f"\n信号生成完成:")
print(f" 选股数量: {self.select_num}")
if 'signal' in signals_df.columns:
valid_signals = signals_df[signals_df['signal'] != '']
print(f" 有效信号天数: {len(valid_signals)}")
return signals_df
def run_backtest(self, data: Dict = None, save_path: str = None) -> Dict:
"""运行回测"""
print("\n" + "=" * 60)
print("美股动量轮动策略 回测")
print("=" * 60)
# 1. 获取数据
if data is None:
data = self.fetch_data()
valid_codes = data['valid_codes']
# 2. 计算因子
factor_df = self.compute_factors(data)
# 3. 生成信号
signals = self.generate_signals(factor_df)
# 4. 构建收益数据BacktestExecutor期望列名格式日收益率_{code}
print("\n构建收益数据...")
stock_data = data['stock_data']
# 计算日收益率,列名格式为 '日收益率_{code}'
returns_data: Dict[str, pd.Series] = {}
for code in valid_codes:
if code in stock_data:
df = stock_data[code]
if 'close' in df.columns:
returns_data[f'日收益率_{code}'] = df['close'].pct_change()
returns_df = pd.DataFrame(returns_data)
# 对齐日期
common_dates = signals.index.intersection(returns_df.index)
signals = signals.loc[common_dates]
returns_df = returns_df.loc[common_dates]
# 5. 执行回测
print("\n执行回测...")
executor = BacktestExecutor(
initial_capital=100,
trade_cost=self.trade_cost,
select_num=self.select_num
)
portfolio = executor.execute(signals, returns_df)
# 6. 计算基准收益
benchmark_df = data['benchmark']
benchmark_code = data['benchmark_code']
if benchmark_df is not None and 'close' in benchmark_df.columns:
benchmark_returns = benchmark_df['close'].pct_change()
# 对齐日期
benchmark_returns = benchmark_returns.loc[common_dates]
# 7. 输出结果
if hasattr(portfolio, 'backtest_result') and portfolio.backtest_result is not None:
result = portfolio.backtest_result
# 策略净值DataFrame列
if '策略净值' in result.columns:
strategy_nav = result['策略净值'].values
final_nav = strategy_nav[-1] if len(strategy_nav) > 0 else 100
total_return = (final_nav - 1) * 100 # 净值归一化起点为1
else:
final_nav = 100
total_return = 0
# 基准收益
if benchmark_df is not None and 'close' in benchmark_df.columns:
benchmark_start = benchmark_df['close'].iloc[0]
benchmark_end = benchmark_df['close'].iloc[-1]
benchmark_return = (benchmark_end / benchmark_start - 1) * 100
else:
benchmark_return = 0
print("\n" + "=" * 60)
print("回测结果")
print("=" * 60)
print(f"策略最终净值: {final_nav:.2f}")
print(f"策略总收益: {total_return:.2f}%")
print(f"基准 ({benchmark_code}) 收益: {benchmark_return:.2f}%")
print(f"超额收益: {total_return - benchmark_return:.2f}%")
print(f"交易成本: {self.trade_cost * 100:.1f}%")
# 保存结果
if save_path:
Path(save_path).parent.mkdir(parents=True, exist_ok=True)
# 保存净值曲线
if '策略净值' in result.columns:
nav_df = pd.DataFrame({
'date': result.index,
'strategy_nav': result['策略净值'].values
})
if benchmark_df is not None and 'close' in benchmark_df.columns:
# 重建基准净值
benchmark_nav = (benchmark_df['close'].pct_change() + 1).cumprod()
nav_df['benchmark_nav'] = benchmark_nav.reindex(result.index, method='ffill').values
nav_df.to_csv(f"{save_path}_nav.csv", index=False)
# 保存信号
signals.to_csv(f"{save_path}_signals.csv")
print(f"\n报告保存: {save_path}_*.csv")
return {
'final_nav': final_nav,
'total_return': total_return,
'benchmark_return': benchmark_return,
'excess_return': total_return - benchmark_return,
'signals': signals,
'result': result
}
return {'signals': signals}