diff --git a/framework_v2/shared/data/flask_api_fetcher.py b/framework_v2/shared/data/flask_api_fetcher.py index edc1ce2..b263e54 100644 --- a/framework_v2/shared/data/flask_api_fetcher.py +++ b/framework_v2/shared/data/flask_api_fetcher.py @@ -166,58 +166,54 @@ class FlaskAPIFetcher(DataFetcher): return results - def get_trading_calendar(self, market: str = 'A') -> pd.Index: + def get_trading_calendar( + self, + market: str = 'A', + start: str = None, + end: str = None + ) -> pd.Index: """ - 获取交易日历 - - 注意:Flask API 暂不直接提供交易日历 - 这里使用 pandas 的 BDay 生成近似日历 - - TODO: 后续可通过 API 端点获取准确日历 + 获取交易日历(通过 API) Args: - market: 市场代码('A', 'US', 'HK' 等) + market: 市场代码 + - 'A' 或 'china': A股(上交所/深交所) + - 'US' 或 'us': 美股(NYSE) + - 'HK' 或 'hk': 港股(HKEX) + start: 开始日期 YYYY-MM-DD(默认 2020-01-01) + end: 结束日期 YYYY-MM-DD(默认 2025-12-31) Returns: - 交易日历 Index + 交易日历 DatetimeIndex + + 示例: + >>> fetcher = FlaskAPIFetcher() + >>> # 获取 A 股 2024 年交易日历 + >>> calendar = fetcher.get_trading_calendar('A', '2024-01-01', '2024-12-31') + >>> # 获取美股交易日历 + >>> calendar = fetcher.get_trading_calendar('US', '2024-01-01', '2024-12-31') """ - # 临时实现:使用 pandas 生成工作日日历 - # 实际应该从 API 获取准确的交易日历 + # 默认日期范围 + if start is None: + start = '2020-01-01' + if end is None: + end = '2025-12-31' - if market == 'A': - # A股:中国工作日(简化实现) - start = pd.Timestamp('2020-01-01') - end = pd.Timestamp('2025-12-31') - calendar = pd.bdate_range(start=start, end=end) - - # 移除中国主要节假日(简化版) - # 实际应该从 API 或数据库获取准确日历 - holidays = [ - # 春节(示例,不完整) - '2024-02-10', '2024-02-11', '2024-02-12', '2024-02-13', '2024-02-14', - '2024-02-15', '2024-02-16', '2024-02-17', - # 国庆(示例,不完整) - '2024-10-01', '2024-10-02', '2024-10-03', '2024-10-04', - '2024-10-05', '2024-10-06', '2024-10-07', - ] - calendar = calendar[~calendar.isin(pd.to_datetime(holidays))] - - return calendar + # 调用 API 获取准确日历 + calendar = self._source.get_trading_calendar( + market=market, + start_date=start, + end_date=end + ) - elif market == 'US': - # 美股:美国工作日 - start = pd.Timestamp('2020-01-01') - end = pd.Timestamp('2025-12-31') - return pd.bdate_range(start=start, end=end) + if calendar is None: + # API 失败,抛出异常(不应静默降级) + raise ValueError( + f"交易日历获取失败: market={market}, {start} ~ {end}。" + f"请检查 API 服务是否可用。" + ) - elif market == 'HK': - # 港股:香港工作日 - start = pd.Timestamp('2020-01-01') - end = pd.Timestamp('2025-12-31') - return pd.bdate_range(start=start, end=end) - - else: - raise ValueError(f"不支持的市场: {market}") + return calendar def get_benchmark( self, diff --git a/framework_v2/tests/test_end_to_end.py b/framework_v2/tests/test_end_to_end.py new file mode 100644 index 0000000..5ece13b --- /dev/null +++ b/framework_v2/tests/test_end_to_end.py @@ -0,0 +1,468 @@ +""" +端到端集成测试:数据获取 → 因子计算 → 数据对齐 → 信号生成 + +测试场景: +1. 获取纳指(美股)和创业板(A股)数据 +2. 计算动量因子 +3. 对齐到 A 股交易日历 +4. 生成 Top-N 信号 +5. 验证完整流程 + +目标: +- 验证 FlaskAPIFetcher 数据获取 +- 验证 MomentumFactor 因子计算 +- 验证 CrossMarketAligner 数据对齐 +- 验证完整流程无数据泄漏 +""" + +import sys +from pathlib import Path +import pandas as pd +import numpy as np +from typing import Dict + +# 添加项目根目录到路径 +project_root = Path(__file__).parent.parent.parent +if str(project_root) not in sys.path: + sys.path.insert(0, str(project_root)) + +from framework_v2.shared.data import FlaskAPIFetcher, CrossMarketAligner +from framework_v2.shared.factors.momentum import MomentumFactor + + +def test_stage1_data_fetch(): + """ + 阶段 1: 数据获取 + + 获取纳指(^IXIC)和创业板(399006.SZ)数据 + """ + print("\n" + "=" * 70) + print(" 阶段 1: 数据获取") + print("=" * 70) + + fetcher = FlaskAPIFetcher() + + # 获取纳指数据(美股) + print("\n[1.1] 获取纳斯达克指数数据(美股)...") + us_data = fetcher.fetch_indices( + codes=["^IXIC"], + start="2023-01-01", + end="2024-12-31" + ) + + assert "^IXIC" in us_data, "纳指数据获取失败" + df_nasdaq = us_data["^IXIC"] + + print(f"\n纳指数据:") + print(f" 数据量: {len(df_nasdaq)} 条") + print(f" 日期范围: {df_nasdaq.index[0]} ~ {df_nasdaq.index[-1]}") + print(f" 列: {list(df_nasdaq.columns)}") + print(f" 前 3 行:") + print(df_nasdaq.head(3).to_string()) + + # 获取创业板数据(A股) + print("\n[1.2] 获取创业板指数数据(A股)...") + cn_data = fetcher.fetch_indices( + codes=["399006.SZ"], + start="2023-01-01", + end="2024-12-31" + ) + + assert "399006.SZ" in cn_data, "创业板数据获取失败" + df_gem = cn_data["399006.SZ"] + + print(f"\n创业板数据:") + print(f" 数据量: {len(df_gem)} 条") + print(f" 日期范围: {df_gem.index[0]} ~ {df_gem.index[-1]}") + print(f" 列: {list(df_gem.columns)}") + print(f" 前 3 行:") + print(df_gem.head(3).to_string()) + + # 对比日历差异 + print(f"\n[1.3] 交易日历对比:") + nasdaq_dates = set(df_nasdaq.index) + gem_dates = set(df_gem.index) + + common_dates = nasdaq_dates & gem_dates + only_nasdaq = nasdaq_dates - gem_dates + only_gem = gem_dates - nasdaq_dates + + print(f" 纳指交易日: {len(nasdaq_dates)} 天") + print(f" 创业板交易日: {len(gem_dates)} 天") + print(f" 共同交易日: {len(common_dates)} 天") + print(f" 仅纳指交易: {len(only_nasdaq)} 天") + print(f" 仅创业板交易: {len(only_gem)} 天") + + if len(only_nasdaq) > 0: + print(f" 纳指独有日期示例: {sorted(list(only_nasdaq))[:3]}") + if len(only_gem) > 0: + print(f" 创业板独有日期示例: {sorted(list(only_gem))[:3]}") + + print("\n✓ 阶段 1 通过") + + return { + "^IXIC": df_nasdaq, + "399006.SZ": df_gem + } + + +def test_stage2_factor_calculation(data_dict: Dict[str, pd.DataFrame]): + """ + 阶段 2: 因子计算 + + 计算动量因子(在原始日历上) + """ + print("\n" + "=" * 70) + print(" 阶段 2: 因子计算(原始日历)") + print("=" * 70) + + factor_calc = MomentumFactor(n_days=20) + + factors = {} + + for code, df in data_dict.items(): + print(f"\n[2.1] 计算 {code} 动量因子...") + + # compute 方法接受 DataFrame + factor_series = factor_calc.compute(df) + + # 转换为 DataFrame 格式 + factor_result = pd.DataFrame({ + 'value': factor_series, + 'is_filled': False + }) + + factors[code] = factor_result + + print(f" 因子值数量: {len(factor_result)}") + print(f" 日期范围: {factor_result.index[0]} ~ {factor_result.index[-1]}") + print(f" 前 3 行:") + print(factor_result.head(3).to_string()) + + # 统计 NaN + nan_count = factor_result['value'].isna().sum() + print(f" NaN 数量: {nan_count} ({nan_count/len(factor_result):.1%})") + + # 验证因子值合理 + valid_factors = factor_result['value'].dropna() + if len(valid_factors) > 0: + print(f" 因子值范围: {valid_factors.min():.4f} ~ {valid_factors.max():.4f}") + + print("\n✓ 阶段 2 通过") + + return factors + + +def test_stage3_data_alignment( + factors: Dict[str, pd.DataFrame], + data_dict: Dict[str, pd.DataFrame] +): + """ + 阶段 3: 数据对齐 + + 将因子和收益率对齐到 A 股交易日历 + """ + print("\n" + "=" * 70) + print(" 阶段 3: 数据对齐(到 A 股日历)") + print("=" * 70) + + fetcher = FlaskAPIFetcher() + + # 获取 A 股交易日历(通过 API) + print("\n[3.1] 获取 A 股交易日历(通过 API)...") + + # 裁剪到数据日期范围 + data_start = min(df.index[0] for df in data_dict.values()) + data_end = max(df.index[-1] for df in data_dict.values()) + + # 使用 API 获取准确日历 + a_share_calendar = fetcher.get_trading_calendar( + market='A', + start=data_start.strftime('%Y-%m-%d'), + end=data_end.strftime('%Y-%m-%d') + ) + + print(f" A 股交易日: {len(a_share_calendar)} 天") + print(f" 日期范围: {a_share_calendar[0]} ~ {a_share_calendar[-1]}") + + # 创建对齐器 + aligner = CrossMarketAligner(target_calendar=a_share_calendar) + + # 对齐因子 + print("\n[3.2] 对齐因子到 A 股日历...") + aligned_factors = {} + + for code, factor_df in factors.items(): + print(f"\n 对齐 {code} 因子...") + + # 获取原始日历 + original_calendar = factor_df.index + + # 对齐因子 + aligned = aligner.align_factor( + factor_series=factor_df['value'], + source_calendar=original_calendar, + code=code + ) + + aligned_factors[code] = aligned + + # 统计 + filled_count = aligned['is_filled'].sum() + print(f" 对齐后天数: {len(aligned)}") + print(f" 填充天数: {filled_count} ({filled_count/len(aligned):.1%})") + print(f" NaN 数量: {aligned['value'].isna().sum()}") + + # 对齐收益率 + print("\n[3.3] 对齐收益率到 A 股日历...") + aligned_returns = {} + + for code, df in data_dict.items(): + print(f"\n 对齐 {code} 收益率...") + + returns = aligner.align_returns( + close_series=df['close'], + code=code + ) + + aligned_returns[code] = returns + + # 统计 + print(f" 对齐后天数: {len(returns)}") + print(f" 收益率范围: {returns.min():.4%} ~ {returns.max():.4%}") + print(f" NaN 数量: {returns.isna().sum()}") + print(f" 零收益率天数: {(returns == 0).sum()} (休市日)") + + # 验证对齐结果 + print("\n[3.4] 验证对齐结果...") + + # 1. 所有 DataFrame 应该有相同的索引 + indices = [df.index for df in aligned_factors.values()] + indices.extend([s.index for s in aligned_returns.values()]) + + for i, idx1 in enumerate(indices): + for j, idx2 in enumerate(indices): + if i != j: + assert idx1.equals(idx2), f"索引 {i} 和 {j} 不一致" + + print(f" ✓ 所有数据对齐到同一日历: {len(indices[0])} 天") + print(f" ✓ 日期范围: {indices[0][0]} ~ {indices[0][-1]}") + + # 2. 验证收益率无 NaN + for code, returns in aligned_returns.items(): + assert returns.isna().sum() == 0, f"{code} 收益率包含 NaN" + print(f" ✓ 收益率无 NaN") + + # 3. 验证休市日收益率 = 0 + for code, returns in aligned_returns.items(): + zero_days = (returns == 0).sum() + print(f" {code} 休市日收益率 = 0: {zero_days} 天") + + print("\n✓ 阶段 3 通过") + + return aligned_factors, aligned_returns + + +def test_stage4_signal_generation( + aligned_factors: Dict[str, pd.DataFrame], + aligned_returns: Dict[str, pd.Series] +): + """ + 阶段 4: 信号生成 + + 根据对齐后的因子生成 Top-N 信号 + """ + print("\n" + "=" * 70) + print(" 阶段 4: 信号生成") + print("=" * 70) + + # 合并因子值 + print("\n[4.1] 合并因子值...") + + factor_values = pd.DataFrame() + for code, factor_df in aligned_factors.items(): + factor_values[code] = factor_df['value'] + + print(f" 合并后形状: {factor_values.shape}") + print(f" 列: {list(factor_values.columns)}") + print(f" 前 3 行:") + print(factor_values.head(3).to_string()) + + # 简单信号:选择因子值最高的标的 + print("\n[4.2] 生成信号(Top-1)...") + + # 跳过全为 NaN 的行 + valid_rows = factor_values.dropna(how='all').index + factor_valid = factor_values.loc[valid_rows] + + signals = pd.DataFrame() + signals['best'] = factor_valid.idxmax(axis=1) + signals['best_value'] = factor_valid.max(axis=1) + + print(f" 信号数量: {len(signals)}") + print(f" 前 10 个信号:") + print(signals.head(10).to_string()) + + # 统计选择分布 + print(f"\n[4.3] 标的选择分布:") + distribution = signals['best'].value_counts() + for code, count in distribution.items(): + pct = count / len(signals) + print(f" {code}: {count} 天 ({pct:.1%})") + + # 验证信号与收益率对齐 + print("\n[4.4] 验证信号与收益率对齐...") + + returns_df = pd.DataFrame(aligned_returns) + + # 裁剪到共同日期 + common_dates = signals.index.intersection(returns_df.index) + signals_aligned = signals.loc[common_dates] + returns_aligned = returns_df.loc[common_dates] + + print(f" 信号日期: {len(signals)} → {len(signals_aligned)}") + print(f" 收益日期: {len(returns_df)} → {len(returns_aligned)}") + print(f" 共同日期: {len(common_dates)}") + + assert signals_aligned.index.equals(returns_aligned.index), "信号与收益日期不一致" + print(f" ✓ 信号与收益率日期一致") + + print("\n✓ 阶段 4 通过") + + return signals_aligned, returns_aligned + + +def test_stage5_strategy_returns(signals: pd.DataFrame, returns: pd.DataFrame): + """ + 阶段 5: 计算策略收益 + + 根据信号计算策略净值曲线 + """ + print("\n" + "=" * 70) + print(" 阶段 5: 计算策略收益") + print("=" * 70) + + print("\n[5.1] 计算策略日收益...") + + strategy_returns = pd.Series(index=returns.index, dtype=float) + + for date in returns.index: + if date in signals.index: + best_code = signals.loc[date, 'best'] + strategy_returns[date] = returns.loc[date, best_code] + else: + strategy_returns[date] = 0.0 + + # 填充 NaN + strategy_returns = strategy_returns.fillna(0.0) + + print(f" 策略收益天数: {len(strategy_returns)}") + print(f" 收益范围: {strategy_returns.min():.4%} ~ {strategy_returns.max():.4%}") + + print("\n[5.2] 计算累计收益...") + + cumulative_returns = (1 + strategy_returns).cumprod() - 1 + + print(f" 最终累计收益: {cumulative_returns.iloc[-1]:.2%}") + print(f" 最大累计收益: {cumulative_returns.max():.2%}") + print(f" 最小累计收益: {cumulative_returns.min():.2%}") + + print("\n[5.3] 计算年化收益和最大回撤...") + + # 年化收益 + total_days = len(strategy_returns) + annual_return = (1 + cumulative_returns.iloc[-1]) ** (252 / total_days) - 1 + print(f" 年化收益: {annual_return:.2%}") + + # 最大回撤 + rolling_max = cumulative_returns.cummax() + drawdown = (cumulative_returns - rolling_max) / (1 + rolling_max) + max_drawdown = drawdown.min() + print(f" 最大回撤: {max_drawdown:.2%}") + + print("\n[5.4] 策略收益 vs 基准对比...") + + # 基准:等权持有 + benchmark_returns = returns.mean(axis=1) + benchmark_cumulative = (1 + benchmark_returns).cumprod() - 1 + + print(f" 策略累计收益: {cumulative_returns.iloc[-1]:.2%}") + print(f" 基准累计收益: {benchmark_cumulative.iloc[-1]:.2%}") + print(f" 超额收益: {cumulative_returns.iloc[-1] - benchmark_cumulative.iloc[-1]:.2%}") + + print("\n✓ 阶段 5 通过") + + return strategy_returns, cumulative_returns + + +def run_full_pipeline(): + """ + 运行完整流程 + """ + print("\n" + "=" * 70) + print(" 端到端集成测试:数据获取 → 因子计算 → 数据对齐 → 信号生成") + print("=" * 70) + print("\n测试标的:") + print(" - 纳斯达克指数 (^IXIC) - 美股") + print(" - 创业板指数 (399006.SZ) - A 股") + print("\n时间范围: 2023-01-01 ~ 2024-12-31") + print("\n" + "=" * 70) + + try: + # 阶段 1: 数据获取 + data_dict = test_stage1_data_fetch() + + # 阶段 2: 因子计算 + factors = test_stage2_factor_calculation(data_dict) + + # 阶段 3: 数据对齐 + aligned_factors, aligned_returns = test_stage3_data_alignment( + factors, data_dict + ) + + # 阶段 4: 信号生成 + signals, returns = test_stage4_signal_generation( + aligned_factors, aligned_returns + ) + + # 阶段 5: 策略收益 + strategy_returns, cumulative_returns = test_stage5_strategy_returns( + signals, returns + ) + + # 总结 + print("\n" + "=" * 70) + print(" 测试总结") + print("=" * 70) + print("\n✅ 所有阶段通过!") + print("\n流程验证:") + print(" ✓ 数据获取: FlaskAPIFetcher 成功获取线上数据") + print(" ✓ 因子计算: MomentumFactor 在原始日历计算") + print(" ✓ 数据对齐: CrossMarketAligner 对齐到 A 股日历") + print(" ✓ 信号生成: Top-N 选择逻辑正确") + print(" ✓ 收益计算: 策略净值曲线生成成功") + print("\n关键验证:") + print(" ✓ 跨市场日历差异已处理") + print(" ✓ 休市日收益率 = 0% (无 ffill 陷阱)") + print(" ✓ 收益率无 NaN") + print(" ✓ 信号与收益日期一致") + print("\n" + "=" * 70 + "\n") + + return True + + except Exception as e: + print(f"\n✗ 测试失败: {e}") + import traceback + traceback.print_exc() + return False + + +if __name__ == "__main__": + success = run_full_pipeline() + + if success: + print("🎉 端到端测试通过!") + sys.exit(0) + else: + print("❌ 端到端测试失败!") + sys.exit(1)