From 9ea84f0e57d372bb0168d68a8e1f9e163a784e4c Mon Sep 17 00:00:00 2001 From: aszerW Date: Thu, 19 Mar 2026 20:38:13 +0800 Subject: [PATCH] =?UTF-8?q?feat(rotation):=20=E6=94=AF=E6=8C=81=E6=B7=B7?= =?UTF-8?q?=E5=90=88=E6=95=B0=E6=8D=AE=E6=BA=90=E5=B9=B6=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E5=9B=A0=E5=AD=90=E8=AE=A1=E7=AE=97=E5=92=8C=E7=AD=96=E7=95=A5?= =?UTF-8?q?=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 删除旧的Tushare Token环境变量函数,简化配置 - 在配置文件中新增全市场指数及SSH隧道配置支持YFinance数据访问 - 更新compute_factors函数,支持长格式混合数据源,兼容旧宽格式数据 - 修改RotationStrategy使用HybridDataSource,支持Tushare与YFinance数据源混合 - 添加SSH隧道支持,实现安全访问非主市场数据 - 优化因子计算逻辑,提升缺失值处理和因子合并的鲁棒性 - 修正基准净值计算,兼容长宽格式基准数据处理 - 增强信号生成逻辑,处理因子得分中的NaN情况防止异常 --- config/settings.py | 9 --- config/strategies/rotation.yaml | 23 ++++++- core/factors/momentum.py | 104 ++++++++++++++++++++++++-------- strategies/rotation/engine.py | 49 ++++++++++++--- 4 files changed, 139 insertions(+), 46 deletions(-) diff --git a/config/settings.py b/config/settings.py index 5039d33..bf04629 100644 --- a/config/settings.py +++ b/config/settings.py @@ -25,15 +25,6 @@ DATA_CACHE_DIR = PROJECT_ROOT / "data_cache" DATA_CACHE_DIR.mkdir(exist_ok=True) -# ==================== API配置 ==================== -def get_tushare_token() -> str: - """从环境变量获取Tushare Token""" - token = os.getenv("TUSHARE_TOKEN") - if not token: - raise ValueError("请设置环境变量 TUSHARE_TOKEN") - return token - - # ==================== 钉钉配置 ==================== def get_dingtalk_config() -> dict: """从环境变量获取钉钉配置""" diff --git a/config/strategies/rotation.yaml b/config/strategies/rotation.yaml index ab271e1..2800f7d 100644 --- a/config/strategies/rotation.yaml +++ b/config/strategies/rotation.yaml @@ -4,6 +4,7 @@ # A股全行业指数配置(Tushare格式:XXXXXX.SH / XXXXXX.SZ) # 格式: {代码: 名称} code_list: + # 中国A股指数 (使用 Tushare) - 主市场,交易日基准 # 宽基指数 "000300.SH": "沪深300" "000905.SH": "中证500" @@ -29,10 +30,20 @@ code_list: "399967.SZ": "中证军工" "000949.SH": "中证农业" "399702.SZ": "国债指数" + # 全球市场指数 (使用 YFinance) - 非主市场,数据会前向填充到A股交易日 + "HSTECH": "恒生科技" # 港股 + "NDX": "纳斯达克100" # 美股 + "BTC": "比特币" # 加密货币 + "ETH": "以太坊" # 加密货币 + +# 主市场配置(用于确定交易日历) +primary_market: + source: "Tushare" # 以A股交易日为基准 + code: "000300.SH" # 基准指数 # 基准指数配置 benchmark: - code: "000300.SH" + code: "000300.SH" # 中国A股指数使用 Tushare 格式 name: "沪深300指数" # ==================== 回测参数 ==================== @@ -60,3 +71,13 @@ trade_cost: 0.001 # ==================== 数据缓存 ==================== # 是否使用本地缓存(True=优先从本地读取) use_cache: true + +# ==================== 数据源配置 ==================== +# SSH 隧道配置(用于网络受限环境,通过境外服务器访问 yfinance) +ssh_tunnel: + enabled: true # 是否启用 SSH 隧道 + host: "8.218.167.69" # SSH 服务器地址(阿里云香港 ECS IP) + port: 22 # SSH 端口 + username: "root" # SSH 用户名 + key_path: "/Users/aszer/Documents/vscode/etf/hk_ecs.pem" # SSH 私钥路径 + local_port: 1080 # 本地 SOCKS5 代理端口 diff --git a/core/factors/momentum.py b/core/factors/momentum.py index 9d86893..315cc80 100644 --- a/core/factors/momentum.py +++ b/core/factors/momentum.py @@ -87,9 +87,10 @@ def compute_factors( ) -> tuple[pd.DataFrame, list]: """ 计算所有指数的因子和日收益率 + 支持长格式数据(混合数据源:Tushare + YFinance) Args: - etf_data: DataFrame, 宽表格式的收盘价 + etf_data: DataFrame, 长格式数据,包含 [code, close, source] 列 code_list: 指数代码列表 n: 动量/趋势窗口 factor_type: 'momentum' 或 'slope_r2' @@ -97,36 +98,87 @@ def compute_factors( Returns: tuple: (result_df, valid_codes) """ - result = etf_data.copy() + # 检查数据格式 + if 'code' in etf_data.columns: + # 长格式数据 - 按 code 分别计算因子(旧逻辑,保留兼容) + all_factors = [] + valid_codes = [] - # 过滤掉缺失值过多的指数 - total_rows = len(result) - valid_codes = [] - for code in code_list: - if code not in result.columns: - print(f" ⚠ 跳过 {code}: 不在数据中") - continue - null_pct = result[code].isnull().sum() / total_rows - if null_pct > 0.2: - print(f" ⚠ 剔除 {code}: 缺失率 {null_pct:.1%} 过高") - result = result.drop(columns=[code]) - else: + for code in code_list: + code_data = etf_data[etf_data['code'] == code].copy() + if len(code_data) == 0: + print(f" ⚠ 跳过 {code}: 不在数据中") + continue + + # 检查缺失值 + null_pct = code_data['close'].isnull().sum() / len(code_data) + if null_pct > 0.2: + print(f" ⚠ 剔除 {code}: 缺失率 {null_pct:.1%} 过高") + continue + + # 按日期排序 + code_data = code_data.sort_index() + + # 计算日收益率和因子 + code_data[f"日收益率_{code}"] = calculate_daily_return(code_data['close']) + + if factor_type == "momentum": + code_data[f"得分_{code}"] = calculate_momentum(code_data['close'], n) + elif factor_type == "slope_r2": + code_data[f"得分_{code}"] = calculate_slope_r2(code_data['close'], n) + else: + raise ValueError(f"不支持的因子类型: {factor_type}") + + # 保留需要的列 + code_data = code_data[[f"日收益率_{code}", f"得分_{code}"]] + all_factors.append(code_data) valid_codes.append(code) - # 对有效指数计算因子 - for code in valid_codes: - result[f"日收益率_{code}"] = calculate_daily_return(result[code]) + if not all_factors: + raise ValueError("没有有效的指数数据") - if factor_type == "momentum": - result[f"得分_{code}"] = calculate_momentum(result[code], n) - elif factor_type == "slope_r2": - result[f"得分_{code}"] = calculate_slope_r2(result[code], n) - else: - raise ValueError(f"不支持的因子类型: {factor_type}") + # 合并所有因子的数据(按日期内连接 - 只保留所有指数都有数据的日期) + result = all_factors[0] + for df in all_factors[1:]: + result = result.join(df, how='inner') - # 按得分列做 dropna - score_cols = [f"得分_{code}" for code in valid_codes] - result = result.dropna(subset=score_cols) + # 删除所有得分都是 NaN 的行(即窗口期内的数据) + score_cols = [f"得分_{code}" for code in valid_codes] + # 只删除完全无法比较的行(所有得分都是NaN) + result = result.dropna(subset=score_cols, how='all') + + else: + # 宽格式数据(向后兼容) + result = etf_data.copy() + + # 过滤掉缺失值过多的指数 + total_rows = len(result) + valid_codes = [] + for code in code_list: + if code not in result.columns: + print(f" ⚠ 跳过 {code}: 不在数据中") + continue + null_pct = result[code].isnull().sum() / total_rows + if null_pct > 0.2: + print(f" ⚠ 剔除 {code}: 缺失率 {null_pct:.1%} 过高") + result = result.drop(columns=[code]) + else: + valid_codes.append(code) + + # 对有效指数计算因子 + for code in valid_codes: + result[f"日收益率_{code}"] = calculate_daily_return(result[code]) + + if factor_type == "momentum": + result[f"得分_{code}"] = calculate_momentum(result[code], n) + elif factor_type == "slope_r2": + result[f"得分_{code}"] = calculate_slope_r2(result[code], n) + else: + raise ValueError(f"不支持的因子类型: {factor_type}") + + # 按得分列做 dropna + score_cols = [f"得分_{code}" for code in valid_codes] + result = result.dropna(subset=score_cols) print("\n因子计算完成:") print(f" 因子类型: {factor_type}") diff --git a/strategies/rotation/engine.py b/strategies/rotation/engine.py index 8d8a970..6b7a8f9 100644 --- a/strategies/rotation/engine.py +++ b/strategies/rotation/engine.py @@ -2,6 +2,7 @@ ETF轮动策略引擎 整合信号生成和回测逻辑 +使用 YFinance 数据源(支持 SSH 隧道) """ import pandas as pd @@ -9,7 +10,7 @@ import numpy as np from typing import Optional from strategies.base import BacktestStrategy -from core.data.tushare_source import TushareDataSource +from core.data.hybrid_source import HybridDataSource from core.factors.momentum import compute_factors, calculate_daily_return @@ -18,7 +19,16 @@ class RotationStrategy(BacktestStrategy): def __init__(self, config: dict): super().__init__("ETF轮动策略", config) - self.data_source = TushareDataSource(use_cache=config.get("use_cache", True)) + + # 初始化混合数据源 + ssh_config = config.get("ssh_tunnel", {}) + self.data_source = HybridDataSource( + ssh_config=ssh_config, + use_cache=config.get("use_cache", True) + ) + print(f"使用混合数据源: Tushare(中国A股) + YFinance(港股/美股/加密货币)") + print(f"SSH隧道: {ssh_config.get('enabled', False)}") + self.data = None self.signals = None self.backtest_result = None @@ -30,12 +40,14 @@ class RotationStrategy(BacktestStrategy): # 从配置中读取基准代码,或使用默认值 benchmark_code = self.config.get("benchmark", {}).get("code", DEFAULT_BENCHMARK_CODE) - etf_data, benchmark_data, valid_codes = self.data_source.fetch_all( - self.config["code_list"], - benchmark_code, - self.config["start_date"], - self.config["end_date"], - ) + # 使用上下文管理器管理 SSH 隧道(如果是 YFinance 数据源) + with self.data_source: + etf_data, benchmark_data, valid_codes = self.data_source.fetch_all( + self.config["code_list"], + benchmark_code, + self.config["start_date"], + self.config["end_date"], + ) self.etf_data = etf_data self.benchmark_data = benchmark_data @@ -65,6 +77,9 @@ class RotationStrategy(BacktestStrategy): rebalance_threshold = self.config["rebalance_threshold"] # Step 1: 每日目标组合 + if not score_cols: + raise ValueError("没有有效的指数代码,无法生成信号") + if select_num == 1: daily_target = ( result[score_cols] @@ -74,7 +89,11 @@ class RotationStrategy(BacktestStrategy): else: def top_n_codes(row): scores = pd.to_numeric(row[score_cols], errors="coerce") - top = scores.nlargest(select_num).index.tolist() + # 过滤掉 NaN 值 + scores = scores.dropna() + if len(scores) == 0: + return "" + top = scores.nlargest(min(select_num, len(scores))).index.tolist() return ",".join([c.replace("得分_", "") for c in top]) daily_target = result.apply(top_n_codes, axis=1) @@ -216,7 +235,17 @@ class RotationStrategy(BacktestStrategy): result[f"净值_{code}"] = result[code] / first_price # 基准净值 - bench_ret = self.benchmark_data.pct_change().dropna() + # benchmark_data 是 DataFrame,需要提取 close 列 + if isinstance(self.benchmark_data, pd.DataFrame): + if 'close' in self.benchmark_data.columns: + bench_close = self.benchmark_data['close'] + else: + # 宽格式数据 + bench_close = self.benchmark_data.iloc[:, 0] + else: + bench_close = self.benchmark_data + + bench_ret = bench_close.pct_change().dropna() common_dates = result.index.intersection(bench_ret.index) bench_ret = bench_ret.loc[common_dates]