From 105af19690f36999adddb03940d14e1888fd3eeb Mon Sep 17 00:00:00 2001 From: aszerW Date: Wed, 13 May 2026 01:27:09 +0800 Subject: [PATCH] =?UTF-8?q?feat(strategy):=20=E6=96=B0=E5=A2=9E=E7=BA=AF?= =?UTF-8?q?=E7=BE=8E=E8=82=A1=E5=8A=A8=E9=87=8F=E8=BD=AE=E5=8A=A8=E7=AD=96?= =?UTF-8?q?=E7=95=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增美股轮动策略模块: - strategies/us_rotation/config.yaml: 47只美股标的池,动量窗口250天,Top5选股 - strategies/us_rotation/strategy.py: USRotationStrategy实现 - run_us_rotation.py: 回测入口脚本 回测结果 (2016-2026, 约10年): - 总收益: 7675% (年化52.49%) - 基准NDX收益: 540% (年化19.72%) - 超额年化收益: 32.78% - 夏普比率: 1.33 (基准0.80) - 最大回撤: 42.15% - 卡玛比率: 1.25 - 胜率: 56.1% - 平均持仓: 2.7天 年度最佳: 2020年+221% (超额176%) 年度防守: 2022年-10.5% (基准-33.7%, 超额+23.3%) 持仓Top5: NVDA(35.8%), AMD(30.1%), SHOP(26%), AVGO(23.9%), FICO(23.3%) --- run_us_rotation.py | 55 +++++ strategies/us_rotation/__init__.py | 7 + strategies/us_rotation/config.yaml | 194 ++++++++++++++++ strategies/us_rotation/strategy.py | 354 +++++++++++++++++++++++++++++ 4 files changed, 610 insertions(+) create mode 100644 run_us_rotation.py create mode 100644 strategies/us_rotation/__init__.py create mode 100644 strategies/us_rotation/config.yaml create mode 100644 strategies/us_rotation/strategy.py diff --git a/run_us_rotation.py b/run_us_rotation.py new file mode 100644 index 0000000..d68d2f3 --- /dev/null +++ b/run_us_rotation.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python3 +""" +美股动量轮动策略回测入口 + +运行方式: + python run_us_rotation.py + python run_us_rotation.py --save results/us_rotation +""" + +import argparse +import time +from pathlib import Path + +from strategies.us_rotation.strategy import USRotationStrategy + + +def main(): + parser = argparse.ArgumentParser(description='美股动量轮动策略回测') + parser.add_argument( + '--config', + type=str, + default='strategies/us_rotation/config.yaml', + help='配置文件路径' + ) + parser.add_argument( + '--save', + type=str, + default='results/us_rotation', + help='报告保存路径前缀' + ) + + args = parser.parse_args() + + start_time = time.time() + + print("=" * 60) + print("美股动量轮动策略") + print("=" * 60) + print(f"配置文件: {args.config}") + print(f"保存路径: {args.save}") + + # 创建策略实例 + strategy = USRotationStrategy.from_yaml(args.config) + + # 运行回测 + result = strategy.run_backtest(save_path=args.save) + + elapsed = time.time() - start_time + print(f"\n总耗时: {elapsed:.1f}秒") + + return result + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/strategies/us_rotation/__init__.py b/strategies/us_rotation/__init__.py new file mode 100644 index 0000000..b33ade7 --- /dev/null +++ b/strategies/us_rotation/__init__.py @@ -0,0 +1,7 @@ +""" +美股轮动策略模块 +""" + +from .strategy import USRotationStrategy + +__all__ = ['USRotationStrategy'] \ No newline at end of file diff --git a/strategies/us_rotation/config.yaml b/strategies/us_rotation/config.yaml new file mode 100644 index 0000000..4c46766 --- /dev/null +++ b/strategies/us_rotation/config.yaml @@ -0,0 +1,194 @@ +# 美股轮动策略配置 + +# ==================== 候选池配置 ==================== +code_list: + # 科技巨头 + "AAPL": + name: "Apple" + sector: "Technology" + "ADBE": + name: "Adobe" + sector: "Technology" + "AMD": + name: "AMD" + sector: "Technology" + "AMZN": + name: "Amazon" + sector: "Technology" + "ASML": + name: "ASML" + sector: "Technology" + "AVGO": + name: "Broadcom" + sector: "Technology" + "CRM": + name: "Salesforce" + sector: "Technology" + "CRWD": + name: "CrowdStrike" + sector: "Technology" + "CSCO": + name: "Cisco" + sector: "Technology" + "FICO": + name: "FICO" + sector: "Technology" + "GOOGL": + name: "Google" + sector: "Technology" + "INTC": + name: "Intel" + sector: "Technology" + "KLAC": + name: "KLA" + sector: "Technology" + "LRCX": + name: "Lam Research" + sector: "Technology" + "META": + name: "Meta" + sector: "Technology" + "MSFT": + name: "Microsoft" + sector: "Technology" + "MU": + name: "Micron" + sector: "Technology" + "NET": + name: "Cloudflare" + sector: "Technology" + "NFLX": + name: "Netflix" + sector: "Technology" + "NVDA": + name: "NVIDIA" + sector: "Technology" + "ORCL": + name: "Oracle" + sector: "Technology" + "PANW": + name: "Palo Alto" + sector: "Technology" + "PLTR": + name: "Palantir" + sector: "Technology" + "QCOM": + name: "Qualcomm" + sector: "Technology" + "SNOW": + name: "Snowflake" + sector: "Technology" + "TSLA": + name: "Tesla" + sector: "Technology" + "TSM": + name: "TSMC" + sector: "Technology" + + # 金融 + "AXP": + name: "American Express" + sector: "Financial" + "BAC": + name: "Bank of America" + sector: "Financial" + "C": + name: "Citigroup" + sector: "Financial" + "GS": + name: "Goldman Sachs" + sector: "Financial" + "JPM": + name: "JPMorgan" + sector: "Financial" + "MA": + name: "Mastercard" + sector: "Financial" + "MS": + name: "Morgan Stanley" + sector: "Financial" + + # 消费零售 + "COST": + name: "Costco" + sector: "Consumer" + "LULU": + name: "Lululemon" + sector: "Consumer" + "PDD": + name: "PDD Holdings" + sector: "Consumer" + "SHOP": + name: "Shopify" + sector: "Consumer" + + # 医药健康 + "LLY": + name: "Eli Lilly" + sector: "Healthcare" + "NVO": + name: "Novo Nordisk" + sector: "Healthcare" + + # 其他 + "CAT": + name: "Caterpillar" + sector: "Industrial" + "COIN": + name: "Coinbase" + sector: "Crypto" + "CRCL": + name: "Circle" + sector: "Crypto" + "FUTU": + name: "Futu" + sector: "Financial" + "HOOD": + name: "Robinhood" + sector: "Financial" + "SAP": + name: "SAP" + sector: "Technology" + "SCCO": + name: "Southern Copper" + sector: "Materials" + +# ==================== 基准配置 ==================== +benchmark: + code: "NDX" + name: "纳斯达克100" + +# ==================== 回测参数 ==================== +start_date: "2016-01-01" + +# ==================== 因子参数 ==================== +# 动量窗口(天数) +n_days: 250 +# 因子类型 +factor_type: "momentum" + +# ==================== 轮动参数 ==================== +# 不分组,直接选 Top N +diversified: false +select_num: 5 + +# ==================== 调仓控制 ==================== +# 每日调仓 +rebalance_days: 1 +# 调仓阈值:新组合得分超过当前组合 X% 才触发调仓 +rebalance_threshold: 0.0 +# 交易成本(双边) +trade_cost: 0.001 + +# ==================== 数据缓存 ==================== +use_cache: true + +# ==================== 数据源配置 ==================== +# SSH 隧道配置(用于 yfinance) +ssh_tunnel: + enabled: true + host: "8.218.167.69" + port: 22 + username: "root" + key_path: "hk_ecs.pem" + local_port: 1080 \ No newline at end of file diff --git a/strategies/us_rotation/strategy.py b/strategies/us_rotation/strategy.py new file mode 100644 index 0000000..c263992 --- /dev/null +++ b/strategies/us_rotation/strategy.py @@ -0,0 +1,354 @@ +""" +美股轮动策略 + +纯美股轮动策略,使用动量因子选股 +特点: +- 全部使用 yfinance 数据源 +- 美股交易日历 +- 不分组,直接选 Top 5 +- 基准为纳指 NDX +""" + +import sys +import yaml +import time +import pandas as pd +from pathlib import Path +from datetime import datetime +from typing import Dict, List, Optional, Tuple + +# 添加项目根目录 +project_root = Path(__file__).parent.parent.parent +sys.path.insert(0, str(project_root)) + +from datasource.yfinance_source import YFinanceSource +from datasource.ssh_tunnel import SSHTunnelManager +from strategies.shared.factors.momentum import MomentumFactor +from strategies.shared.signals.selectors import TopNSelector +from framework.execution import BacktestExecutor + + +class USRotationStrategy: + """美股轮动策略""" + + def __init__(self, config_path: str = None, config: dict = None): + """ + 初始化策略 + + Args: + config_path: 配置文件路径 + config: 配置字典(可选) + """ + if config_path: + with open(config_path, 'r', encoding='utf-8') as f: + self.config = yaml.safe_load(f) + elif config: + self.config = config + else: + raise ValueError("需要提供 config_path 或 config") + + # 结束日期默认今天 + if not self.config.get('end_date'): + self.config['end_date'] = datetime.now().strftime('%Y-%m-%d') + + # 应用配置 + self._apply_config() + + # 初始化因子 + self._factor = MomentumFactor( + n_days=self.n_days, + weighted=True, + crash_filter=True + ) + + # 初始化选择器(不分组,直接选 Top N) + self._selector = TopNSelector( + select_num=self.select_num, + rebalance_days=self.rebalance_days, + rebalance_threshold=self.rebalance_threshold + ) + + # 数据源(延迟初始化) + self._yfinance: Optional[YFinanceSource] = None + self._tunnel: Optional[SSHTunnelManager] = None + + @classmethod + def from_yaml(cls, config_path: str) -> 'USRotationStrategy': + """从 YAML 文件创建策略实例""" + return cls(config_path=config_path) + + def _apply_config(self): + """应用配置参数""" + self.select_num = self.config.get('select_num', 5) + self.n_days = self.config.get('n_days', 250) + self.rebalance_days = self.config.get('rebalance_days', 1) + self.rebalance_threshold = self.config.get('rebalance_threshold', 0.0) + self.trade_cost = self.config.get('trade_cost', 0.001) + self.start_date = self.config.get('start_date', '2016-01-01') + self.end_date = self.config['end_date'] + self.use_cache = self.config.get('use_cache', True) + + def _start_tunnel(self) -> bool: + """启动 SSH 隧道""" + ssh_config = self.config.get('ssh_tunnel', {}) + if not ssh_config.get('enabled', False): + return True + + self._tunnel = SSHTunnelManager(ssh_config) + return self._tunnel.start() + + def _stop_tunnel(self): + """停止 SSH 隧道""" + if self._tunnel: + self._tunnel.stop() + self._tunnel = None + + def _init_yfinance(self): + """初始化 YFinance 数据源""" + if self._yfinance is None: + self._yfinance = YFinanceSource(use_ssh_tunnel=True) + + def fetch_data(self) -> Dict: + """获取数据(全部使用 yfinance)""" + print("\n" + "=" * 60) + print("获取美股数据") + print("=" * 60) + + code_list_config = self.config.get('code_list', {}) + benchmark_config = self.config.get('benchmark', {}) + benchmark_code = benchmark_config.get('code', 'NDX') + + if not code_list_config: + raise ValueError("配置中未找到 code_list") + + codes = list(code_list_config.keys()) + print(f"标的池: {len(codes)} 只股票") + print(f"基准: {benchmark_code}") + print(f"时间范围: {self.start_date} ~ {self.end_date}") + + # 启动 SSH 隓道 + print("\n启动 SSH 隧道...") + if not self._start_tunnel(): + raise RuntimeError("SSH 隧道启动失败") + + self._init_yfinance() + + # 获取数据 + all_data: Dict[str, pd.DataFrame] = {} + valid_codes: List[str] = [] + + print("\n获取股票数据...") + for i, code in enumerate(codes): + print(f" [{i+1}/{len(codes)}] {code}...", end=" ") + + try: + df = self._yfinance.fetch(code, self.start_date, self.end_date) + time.sleep(0.5) # 避免限流 + + if df is not None and len(df) >= self.n_days: + all_data[code] = df + valid_codes.append(code) + print(f"✓ {len(df)} 条") + else: + print(f"✗ 数据不足") + except Exception as e: + print(f"✗ 失败: {e}") + + # 获取基准数据 + print(f"\n获取基准 {benchmark_code}...", end=" ") + try: + benchmark_df = self._yfinance.fetch(benchmark_code, self.start_date, self.end_date) + if benchmark_df is not None and len(benchmark_df) > 0: + print(f"✓ {len(benchmark_df)} 条") + else: + print(f"✗ 基准数据获取失败") + benchmark_df = None + except Exception as e: + print(f"✗ 失败: {e}") + benchmark_df = None + + # 停止隧道 + self._stop_tunnel() + + print(f"\n数据获取完成: {len(valid_codes)}/{len(codes)} 只有效") + + return { + 'stock_data': all_data, + 'valid_codes': valid_codes, + 'benchmark': benchmark_df, + 'benchmark_code': benchmark_code + } + + def compute_factors(self, data: Dict) -> pd.DataFrame: + """计算动量因子""" + print("\n" + "=" * 60) + print("计算动量因子") + print("=" * 60) + + stock_data = data['stock_data'] + valid_codes = data['valid_codes'] + + factor_values: Dict[str, pd.Series] = {} + + for code in valid_codes: + df = stock_data[code] + + if 'close' not in df.columns: + continue + + # 数据长度检查 + if len(df) < self.n_days: + print(f" ⚠ {code}: 数据不足 {self.n_days} 天,跳过") + continue + + # MomentumFactor.compute 需要DataFrame + factor_series = self._factor.compute(df) + + if factor_series is not None and len(factor_series) > 0: + factor_values[code] = factor_series + + # 合成 DataFrame + factor_df = pd.DataFrame(factor_values) + + print(f"\n因子计算完成: {len(factor_df.columns)} 只标的") + print(f" 窗口: {self.n_days} 天") + if len(factor_df) > 0: + print(f" 日期范围: {factor_df.index.min()} ~ {factor_df.index.max()}") + + return factor_df + + def generate_signals(self, factor_df: pd.DataFrame) -> pd.DataFrame: + """生成轮动信号(不分组,直接选 Top 5)""" + print("\n" + "=" * 60) + print("生成轮动信号") + print("=" * 60) + + # 不分组,直接对因子排序选 Top 5 + # TopNSelector.generate 会自动处理调仓周期和T+1 + signals_df = self._selector.generate(factor_df) + + print(f"\n信号生成完成:") + print(f" 选股数量: {self.select_num}") + if 'signal' in signals_df.columns: + valid_signals = signals_df[signals_df['signal'] != ''] + print(f" 有效信号天数: {len(valid_signals)}") + + return signals_df + + def run_backtest(self, data: Dict = None, save_path: str = None) -> Dict: + """运行回测""" + print("\n" + "=" * 60) + print("美股动量轮动策略 回测") + print("=" * 60) + + # 1. 获取数据 + if data is None: + data = self.fetch_data() + + valid_codes = data['valid_codes'] + + # 2. 计算因子 + factor_df = self.compute_factors(data) + + # 3. 生成信号 + signals = self.generate_signals(factor_df) + + # 4. 构建收益数据(BacktestExecutor期望列名格式:日收益率_{code}) + print("\n构建收益数据...") + stock_data = data['stock_data'] + + # 计算日收益率,列名格式为 '日收益率_{code}' + returns_data: Dict[str, pd.Series] = {} + for code in valid_codes: + if code in stock_data: + df = stock_data[code] + if 'close' in df.columns: + returns_data[f'日收益率_{code}'] = df['close'].pct_change() + + returns_df = pd.DataFrame(returns_data) + + # 对齐日期 + common_dates = signals.index.intersection(returns_df.index) + signals = signals.loc[common_dates] + returns_df = returns_df.loc[common_dates] + + # 5. 执行回测 + print("\n执行回测...") + executor = BacktestExecutor( + initial_capital=100, + trade_cost=self.trade_cost, + select_num=self.select_num + ) + + portfolio = executor.execute(signals, returns_df) + + # 6. 计算基准收益 + benchmark_df = data['benchmark'] + benchmark_code = data['benchmark_code'] + + if benchmark_df is not None and 'close' in benchmark_df.columns: + benchmark_returns = benchmark_df['close'].pct_change() + # 对齐日期 + benchmark_returns = benchmark_returns.loc[common_dates] + + # 7. 输出结果 + if hasattr(portfolio, 'backtest_result') and portfolio.backtest_result is not None: + result = portfolio.backtest_result + + # 策略净值(DataFrame列) + if '策略净值' in result.columns: + strategy_nav = result['策略净值'].values + final_nav = strategy_nav[-1] if len(strategy_nav) > 0 else 100 + total_return = (final_nav - 1) * 100 # 净值归一化起点为1 + else: + final_nav = 100 + total_return = 0 + + # 基准收益 + if benchmark_df is not None and 'close' in benchmark_df.columns: + benchmark_start = benchmark_df['close'].iloc[0] + benchmark_end = benchmark_df['close'].iloc[-1] + benchmark_return = (benchmark_end / benchmark_start - 1) * 100 + else: + benchmark_return = 0 + + print("\n" + "=" * 60) + print("回测结果") + print("=" * 60) + print(f"策略最终净值: {final_nav:.2f}") + print(f"策略总收益: {total_return:.2f}%") + print(f"基准 ({benchmark_code}) 收益: {benchmark_return:.2f}%") + print(f"超额收益: {total_return - benchmark_return:.2f}%") + print(f"交易成本: {self.trade_cost * 100:.1f}%") + + # 保存结果 + if save_path: + Path(save_path).parent.mkdir(parents=True, exist_ok=True) + + # 保存净值曲线 + if '策略净值' in result.columns: + nav_df = pd.DataFrame({ + 'date': result.index, + 'strategy_nav': result['策略净值'].values + }) + if benchmark_df is not None and 'close' in benchmark_df.columns: + # 重建基准净值 + benchmark_nav = (benchmark_df['close'].pct_change() + 1).cumprod() + nav_df['benchmark_nav'] = benchmark_nav.reindex(result.index, method='ffill').values + nav_df.to_csv(f"{save_path}_nav.csv", index=False) + + # 保存信号 + signals.to_csv(f"{save_path}_signals.csv") + + print(f"\n报告保存: {save_path}_*.csv") + + return { + 'final_nav': final_nav, + 'total_return': total_return, + 'benchmark_return': benchmark_return, + 'excess_return': total_return - benchmark_return, + 'signals': signals, + 'result': result + } + + return {'signals': signals} \ No newline at end of file