Compare commits

...

8 Commits

Author SHA1 Message Date
c0195c5bca refactor(tushare): 合并ETF复权方法,消除冗余设计
- 合并 fetch_etf_adj 和 _fetch_etf_adj 为单一方法
- 删除 _fetch_etf_qfq 转发方法
- 减少~26行代码,优化代码结构(从3个方法→1个方法)
- 保持公共接口签名不变,完全向后兼容
- 全面测试通过:raw/qfq/hfq三种模式数据正确
- 更新 VALID_ADJ_BY_TYPE 配置,ETF支持前复权/后复权
2026-05-25 19:59:49 +08:00
a62cfb4cd5 fix: 修复因子前向填充不生效的 bug(清理调试代码)
问题根因:
- pandas reindex(method='ffill') 只填充新增行的 NaN,不填充已存在的 NaN
- 当 factor_df 中已有境外市场放假日期的 NaN 值时,reindex 无法填充

修复方案:
- 改为两步操作:reindex() 然后 ffill()
- ffill() 会填充所有 NaN,包括已存在的

验证结果:
- 2026-04-30 HSI: None → 0.2388 
- 2026-04-30 GDAXI: None → 0.5647 
- 2026-05-08 HSI: None → 0.1144 
2026-05-25 19:16:14 +08:00
b8d433d519 fix: 修复因子前向填充不生效的 bug
问题根因:
- reindex(method='ffill') 不会填充已存在的 NaN 值
- 当 factor_df 中已有 NaN(境外市场放假),reindex 无法填充

修复方案:
- 改为两步操作:reindex() 然后 ffill()
- ffill() 会填充所有 NaN,包括已存在的

影响范围:
- rotation.py: positions 对齐到 A 股日历
- export_backtest_detail.py: 因子对齐到展示日历

验证结果:
- 2026-04-30 HSI: nan → 0.2388 
- 2026-05-08 HSI: nan → 0.1144 
2026-05-25 08:53:42 +08:00
2f6b031361 feat: 添加因子对齐调试输出
- 检查 factor_df 索引类型
- 检查 reindex 后 2026-04-30 的 HSI 动量值
- 待进一步分析为何 ffill 未生效
2026-05-25 02:45:27 +08:00
0ef0623538 fix: 导出脚本因子前向填充对齐到展示日历
- 先定义 common_dates = equity_curve.index
- 再执行 factor_df.reindex(common_dates, method='ffill')
- 修复境外市场放假时动量显示为 None 的问题
2026-05-25 02:30:58 +08:00
959a863b5e fix: 导出脚本因子对齐到A股日历
- 因子在原始数据上计算(正确)
- 导出时将因子前向填充到A股交易日历
- 修复境外市场放假时动量显示为None的问题
2026-05-25 02:07:18 +08:00
b89e975aed refactor: 删除 SimpleRotationStrategy 简化版
- 删除 simple.py(已被 GlobalRotationStrategy 替代)
- 删除 backtest_simple_rotation.py 回测脚本
- 删除 test_simple_rotation.py 测试脚本
- 更新 __init__.py 移除 SimpleRotationStrategy 导出
- 现在只保留 GlobalRotationStrategy 正式版
2026-05-25 01:33:23 +08:00
e8e4e9c3ac fix: GlobalRotationStrategy select_num 未生效
- 修复分散化选股逻辑:每个 group 选 Top 1 后,需要再按动量排序选 Top select_num
- 之前:所有 group 的 Top 1 都标记为信号(忽略 select_num)
- 现在:先从每个 group 选 Top 1,再从中按动量选 Top select_num 个
- 影响:配置 select_num=3 时,实际持仓 3 只而不是 4 只(group 数量)
2026-05-25 01:23:00 +08:00
8 changed files with 302 additions and 604 deletions

View File

@@ -121,23 +121,23 @@ class TushareSource:
code: ETF代码'159915.SZ', '518880.SH'
start_date: 开始日期 'YYYY-MM-DD'
end_date: 结束日期 'YYYY-MM-DD'
adj: 复权类型 'raw'(原始) / 'hfq'(后复权),默认 'raw'
adj: 复权类型 'raw'(原始) / 'qfq'(前复权) / 'hfq'(后复权),默认 'raw'
Returns:
DataFrame with columns: date, open, high, low, close, volume
adj='hfq' 时额外返回 adj_factor, close_hfq
adj='qfq''hfq' 时额外返回复权价格
DataFrame.attrs 附加元数据:
- attrs['nav']: 净值 DataFrame
- attrs['premium']: 溢价率 Series始终基于原始价格计算
"""
# 校验 adj 参数
if adj not in ['raw', 'hfq']:
raise ValueError(f"ETF 仅支持 adj='raw''hfq',当前: {adj}")
if adj not in ['raw', 'qfq', 'hfq']:
raise ValueError(f"ETF 仅支持 adj='raw', 'qfq''hfq',当前: {adj}")
# 1. 获取价格数据
if adj == 'hfq':
price_df = self._fetch_etf_hfq(code, start_date, end_date)
if adj in ['qfq', 'hfq']:
price_df = self.fetch_etf_adj(code, start_date, end_date, adj)
else:
price_df = self._fetch_etf_raw(code, start_date, end_date)
@@ -150,8 +150,12 @@ class TushareSource:
# 3. 计算溢价率(始终使用原始价格)
if nav_df is not None and len(nav_df) > 0:
# hfq 时需要获取原始价格来计算溢价率
price_for_premium = price_df if adj == 'raw' else self._fetch_etf_raw(code, start_date, end_date)
# qfq/hfq 时需要获取原始价格来计算溢价率
if adj == 'raw':
price_for_premium = price_df
else:
price_for_premium = self._fetch_etf_raw(code, start_date, end_date)
if price_for_premium is not None:
premium_series = self._calculate_premium_series(price_for_premium, nav_df)
price_df.attrs['premium'] = premium_series
@@ -390,7 +394,7 @@ class TushareSource:
def fetch_etf_adj(self, code: str, start_date: str, end_date: str, adj: str = 'hfq') -> Optional[pd.DataFrame]:
"""
获取 ETF 复权价格数据(公共接口)
获取 ETF 复权价格数据
自己实现复权计算(不使用 pro_bar避免 pandas 兼容性问题):
1. 使用 fund_daily() 获取原始价格
@@ -401,6 +405,8 @@ class TushareSource:
- 后复权 (hfq): close_hfq = close × adj_factor
- 前复权 (qfq): close_qfq = close × adj_factor / latest_factor
fund_adj 单次限 2000 条,按 5 年分段请求再拼接。
Args:
code: ETF代码'159915.SZ', '518880.SH'
start_date: 开始日期 'YYYY-MM-DD'
@@ -413,27 +419,6 @@ class TushareSource:
if adj not in ['qfq', 'hfq']:
raise ValueError(f"ETF adj 参数必须是 'qfq''hfq',当前: {adj}")
return self._fetch_etf_adj(code, start_date, end_date, adj)
def _fetch_etf_adj(self, code: str, start_date: str, end_date: str, adj: str = 'hfq') -> Optional[pd.DataFrame]:
"""
获取 ETF 复权价格数据(内部方法)
自己实现复权计算(不使用 pro_bar
1. 使用 fund_daily() 获取原始价格
2. 使用 fund_adj() 获取复权因子
3. 根据 adj 参数计算复权价格
fund_adj 单次限 2000 条,按 5 年分段请求再拼接。
Args:
code: ETF代码'159915.SZ', '518880.SH'
start_date: 开始日期 'YYYY-MM-DD'
end_date: 结束日期 'YYYY-MM-DD'
Returns:
DataFrame with columns: date, code, open, high, low, close, volume, adj_factor
"""
try:
pro = self._get_pro_api()
ts_code = code.replace('.SS', '.SH')

View File

@@ -1,98 +0,0 @@
"""
简单轮动策略回测脚本
测试场景:指数信号 → ETF收益
- 使用指数计算动量信号
- 使用 ETF 计算收益
"""
import sys
from pathlib import Path
# 添加项目根目录到 Python 路径
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))
from framework_v2.config import load_config
from framework_v2.strategies.rotation.simple import SimpleRotationStrategy
def run_backtest():
"""运行回测"""
print("=" * 70)
print(" ETF轮动策略回测V2 框架)")
print(" 场景:指数信号 → ETF收益复现 V1 结果")
print("=" * 70)
# 加载配置
config_file = project_root / "framework_v2" / "strategies" / "rotation" / "config_simple.yaml"
print(f"\n配置文件: {config_file}")
config = load_config(str(config_file))
# 打印配置摘要
print("\n" + "=" * 70)
print(" 配置摘要")
print("=" * 70)
print(f"策略名称: {config.metadata.strategy}")
print(f"回测区间: {config.backtest.start_date} ~ {config.backtest.end_date or '至今'}")
print(f"因子类型: {config.factor.type.value}")
print(f"动量窗口: {config.factor.n_days}")
print(f"选股数量: {config.rotation.select_num}")
# 打印资产池
print(f"\n资产池 ({config.asset_pools.count()} 个标的):")
for code, asset in config.asset_pools.assets.items():
print(f" {code}: {asset.name}")
print(f" 分组: {asset.group}")
print(f" 信号: {asset.signal_source}")
print(f" 交易: {asset.trade_source}")
print(f" 跨市场: {'' if asset.is_cross_market else ''}")
# 创建策略
print("\n" + "=" * 70)
print(" 运行回测...")
print("=" * 70)
strategy = SimpleRotationStrategy(config)
result = strategy.run()
# 打印结果
print("\n" + "=" * 70)
print(" 回测结果")
print("=" * 70)
metrics = result['metrics']
print(f"总收益: {metrics['total_return']:.2%}")
print(f"年化收益: {metrics['annual_return']:.2%}")
print(f"最大回撤: {metrics['max_drawdown']:.2%}")
print(f"夏普比率: {metrics['sharpe_ratio']:.2f}")
print(f"交易天数: {metrics['n_days']}")
# 打印净值曲线
equity_curve = result['equity_curve']
print(f"\n净值曲线:")
print(f" 起始净值: {equity_curve.iloc[0]:.4f}")
print(f" 结束净值: {equity_curve.iloc[-1]:.4f}")
print(f" 数据点数: {len(equity_curve)}")
# 保存结果
output_dir = project_root / "framework_v2" / "results"
output_dir.mkdir(exist_ok=True)
# 保存净值曲线
equity_curve.to_csv(output_dir / "simple_rotation_equity.csv")
print(f"\n净值曲线已保存: {output_dir / 'simple_rotation_equity.csv'}")
# 保存持仓记录
positions = result['positions']
positions.to_csv(output_dir / "simple_rotation_positions.csv")
print(f"持仓记录已保存: {output_dir / 'simple_rotation_positions.csv'}")
print("\n" + "=" * 70)
print(" 回测完成!")
print("=" * 70)
if __name__ == "__main__":
run_backtest()

View File

@@ -0,0 +1,158 @@
#!/usr/bin/env python3
"""
将 V2 回测细节 JSON 转换为 HTML viewer 需要的 CSV 格式。
用途:
- 适配 backtest_viewer.html 的 CSV 格式要求
- 生成 _detail.csv、_close.csv、_holdings.csv、_code_info.csv
用法:
python framework_v2/scripts/convert_to_viewer_csv.py
"""
import sys
import json
from pathlib import Path
import pandas as pd
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))
def safe_float(val, decimals=4):
"""安全转换浮点数"""
if val is None:
return None
try:
return round(float(val), decimals)
except (ValueError, TypeError):
return None
def convert_v2_to_viewer_csv(json_path=None):
"""将 V2 JSON 转换为 V1 CSV 格式"""
# 1. 加载 JSON
if json_path is None:
json_path = project_root / 'framework_v2' / 'results' / 'backtest_detail_v2.json'
print(f"加载 V2 JSON: {json_path}")
with open(json_path, 'r', encoding='utf-8') as f:
data = json.load(f)
meta = data['meta']
days = data['days']
print(f" 天数: {len(days)}")
print(f" 标的: {len(meta['codes'])}")
# 2. 生成 _detail.csv
print("\n生成 _detail.csv...")
detail_rows = []
for day in days:
row = {
'日期': day['date'],
'净值': day['nav'],
'日收益': day['daily_return'],
'持仓标的': ','.join(day['holdings']) if day['holdings'] else ''
}
# 添加每个标的的动量
for code, asset in day['assets'].items():
row[f'动量_{code}'] = asset.get('momentum')
detail_rows.append(row)
detail_df = pd.DataFrame(detail_rows)
detail_path = json_path.parent / 'backtest_detail_v2_detail.csv'
detail_df.to_csv(detail_path, index=False, encoding='utf-8-sig')
print(f"{detail_path.name} ({len(detail_df)} 行)")
# 3. 生成 _close.csv
print("\n生成 _close.csv...")
close_rows = []
for day in days:
row = {'日期': day['date']}
for code, asset in day['assets'].items():
# ETF 收盘价
etf_close = asset.get('etf_close')
row[f'收盘价_{code}'] = etf_close
close_rows.append(row)
close_df = pd.DataFrame(close_rows)
close_path = json_path.parent / 'backtest_detail_v2_close.csv'
close_df.to_csv(close_path, index=False, encoding='utf-8-sig')
print(f"{close_path.name} ({len(close_df)} 行)")
# 4. 生成 _holdings.csv
print("\n生成 _holdings.csv...")
holdings_rows = []
for day in days:
if not day['holdings']:
continue
for code in day['holdings']:
asset = day['assets'].get(code, {})
code_meta = meta['codes'].get(code, {})
row = {
'日期': day['date'],
'标的代码': code,
'标的名称': code_meta.get('name', code),
'大类': code_meta.get('group', ''),
'进场日期': asset.get('entry_date'),
'进场价': asset.get('entry_price'),
'当前价': asset.get('etf_close'),
'持仓天数': asset.get('holding_days', 0),
'持仓比例': asset.get('weight', 0),
'累计收益': asset.get('cum_return')
}
holdings_rows.append(row)
holdings_df = pd.DataFrame(holdings_rows)
holdings_path = json_path.parent / 'backtest_detail_v2_holdings.csv'
holdings_df.to_csv(holdings_path, index=False, encoding='utf-8-sig')
print(f"{holdings_path.name} ({len(holdings_df)} 行)")
# 5. 生成 _code_info.csv
print("\n生成 _code_info.csv...")
code_info_rows = []
for code, meta_info in meta['codes'].items():
row = {
'代码': code,
'名称': meta_info.get('name', code),
'大类': meta_info.get('group', '')
}
code_info_rows.append(row)
code_info_df = pd.DataFrame(code_info_rows)
code_info_path = json_path.parent / 'backtest_detail_v2_code_info.csv'
code_info_df.to_csv(code_info_path, index=False, encoding='utf-8-sig')
print(f"{code_info_path.name} ({len(code_info_df)} 行)")
# 6. 汇总
print("\n" + "=" * 80)
print("转换完成!")
print("=" * 80)
print(f"\n输出文件:")
print(f" {detail_path.name}")
print(f" {close_path.name}")
print(f" {holdings_path.name}")
print(f" {code_info_path.name}")
print(f"\n使用方法:")
print(f" 1. 打开 visualization/backtest_viewer.html")
print(f" 2. 选择 'V2 GlobalRotationStrategy (最新)'")
print(f" 3. 自动加载 CSV 文件")
if __name__ == '__main__':
convert_v2_to_viewer_csv()

View File

@@ -87,6 +87,24 @@ def main():
trading_calendar = strategy._get_trading_calendar()
print(f" A 股交易日: {len(trading_calendar)}")
# 准备收盘价和溢价率数据
print("[7.5] 准备价格和溢价率数据...")
index_close_dict = {} # 指数收盘价
etf_close_dict = {} # ETF 收盘价
etf_premium_dict = {} # ETF 溢价率(需要从 API 获取)
for signal_code, trade_code in signal_to_trade.items():
# 指数收盘价
if signal_code in data:
index_close_dict[signal_code] = data[signal_code]['close']
# ETF 收盘价
if trade_code in data:
etf_close_dict[signal_code] = data[trade_code]['close'] # 注意:用 signal_code 作为键
# 溢价率暂时设为 None需要额外 API 支持)
# TODO: 接入 ETF 净值数据计算溢价率
# 创建对齐器
aligner = CrossMarketAligner(target_calendar=trading_calendar)
@@ -119,15 +137,27 @@ def main():
# 9. 构建逐日明细
print("[9] 构建逐日明细...")
# 获取展示日历
common_dates = equity_curve.index
# 因子数据DataFrame 格式)
factor_df = pd.DataFrame(factors)
# 确保索引是 DatetimeIndex
if not isinstance(factor_df.index, pd.DatetimeIndex):
factor_df.index = pd.to_datetime(factor_df.index)
# 将因子对齐到实际展示日历(前向填充)
# 因子已经在原始数据上计算完成,这里只是将结果对齐到展示日历
# 注意:必须先 reindex 再 ffill因为 reindex(method='ffill') 不会填充已有的 NaN
factor_df_aligned = factor_df.reindex(common_dates)
factor_df_aligned = factor_df_aligned.ffill()
# 持仓状态跟踪
holdings_state = {} # {code: {'entry_date': str, 'entry_price': float}}
prev_holdings = set()
days_list = []
common_dates = equity_curve.index
# 获取配置信息
bond_code = strategy.bond_code if strategy.use_dynamic_threshold else None
@@ -159,11 +189,11 @@ def main():
'entry_price': entry_price,
}
# 动态阈值
# 动态阈值(使用对齐后的因子)
factor_scores = {}
if date in factor_df.index:
for code in factor_df.columns:
v = factor_df.loc[date, code]
if date in factor_df_aligned.index:
for code in factor_df_aligned.columns:
v = factor_df_aligned.loc[date, code]
if pd.notna(v):
factor_scores[code] = float(v)
@@ -192,6 +222,25 @@ def main():
assets = {}
all_codes = factor_df.columns.tolist()
# 对齐价格到 A 股日历
index_close_aligned = {}
etf_close_aligned = {}
for code in all_codes:
if code in index_close_dict:
index_close_aligned[code] = index_close_dict[code].reindex(common_dates, method='ffill')
if code in etf_close_dict:
etf_close_aligned[code] = etf_close_dict[code].reindex(common_dates, method='ffill')
# 计算指数和 ETF 收益率
index_returns = {}
etf_returns = {}
for code in all_codes:
if code in index_close_aligned:
index_returns[code] = index_close_aligned[code].pct_change(fill_method=None)
if code in etf_close_aligned:
etf_returns[code] = etf_close_aligned[code].pct_change(fill_method=None)
for code in all_codes:
asset = {}
@@ -206,49 +255,75 @@ def main():
asset['threshold'] = safe_val(threshold, 4)
asset['above_threshold'] = mom >= threshold if mom is not None else False
# 指数价格
if code in index_close_aligned:
idx_close = index_close_aligned[code].get(date)
asset['index_close'] = safe_val(idx_close, 2) if pd.notna(idx_close) else None
else:
asset['index_close'] = None
# ETF 价格
if code in etf_close_aligned:
etf_close = etf_close_aligned[code].get(date)
asset['etf_close'] = safe_val(etf_close, 3) if pd.notna(etf_close) else None
else:
asset['etf_close'] = None
# 指数收益率
if code in index_returns:
idx_ret = index_returns[code].get(date)
asset['index_return'] = safe_val(idx_ret, 6) if pd.notna(idx_ret) else None
else:
asset['index_return'] = None
# ETF 收益率(兼容 V1 命名etf_return_ctc
if code in etf_returns:
etf_ret = etf_returns[code].get(date)
asset['etf_return_ctc'] = safe_val(etf_ret, 6) if pd.notna(etf_ret) else None
else:
asset['etf_return_ctc'] = None
# 溢价率(暂时为 None
asset['premium'] = None
# 持仓状态
is_held = code in current_holdings
asset['is_held'] = is_held
asset['weight'] = safe_val(pos_row.get(code, 0), 4) if is_held else 0.0
if is_held and code in holdings_state:
hs = holdings_state[code]
asset['entry_date'] = hs['entry_date']
asset['entry_price'] = safe_val(hs['entry_price'], 4)
asset['entry_price_etf'] = safe_val(hs['entry_price'], 4)
asset['entry_price_idx'] = None # V2 暂不记录指数进场价
entry_dt = pd.Timestamp(hs['entry_date'])
trading_days_held = len(common_dates[(common_dates >= entry_dt) & (common_dates <= date)])
asset['holding_days'] = trading_days_held
# 累计收益
# 累计收益(区分 ETF 和指数,兼容 V1
if hs['entry_price'] and hs['entry_price'] > 0:
if code in close_dict:
cur = close_dict[code].get(date)
if cur and pd.notna(cur):
asset['cum_return'] = safe_val(float(cur) / hs['entry_price'] - 1, 4)
cum_ret = float(cur) / hs['entry_price'] - 1
asset['cum_return_etf'] = safe_val(cum_ret, 4)
asset['cum_return_idx'] = safe_val(cum_ret, 4) # V2 暂不区分
else:
asset['cum_return'] = None
asset['cum_return_etf'] = None
asset['cum_return_idx'] = None
else:
asset['cum_return'] = None
asset['cum_return_etf'] = None
asset['cum_return_idx'] = None
else:
asset['cum_return'] = None
# 当日收益贡献
if code in returns_df.columns:
asset['daily_return'] = safe_val(returns_df.loc[date, code], 6)
asset['return_contribution'] = safe_val(
pos_row.get(code, 0) * returns_df.loc[date, code], 6
)
else:
asset['daily_return'] = None
asset['return_contribution'] = None
asset['cum_return_etf'] = None
asset['cum_return_idx'] = None
else:
asset['entry_date'] = None
asset['entry_price'] = None
asset['entry_price_etf'] = None
asset['entry_price_idx'] = None
asset['holding_days'] = 0
asset['cum_return'] = None
asset['daily_return'] = None
asset['return_contribution'] = None
asset['cum_return_etf'] = None
asset['cum_return_idx'] = None
assets[code] = asset
@@ -269,32 +344,30 @@ def main():
days_list.append(day_record)
prev_holdings = current_holdings
# 10. 构建元数据
# 10. 构建元数据(兼容 V1 格式)
codes_meta = {}
for signal_code, asset in config.asset_pools.assets.items():
codes_meta[signal_code] = {
'name': asset.name,
'etf': asset.trade_source,
'group': asset.group
for code in all_codes:
asset_config = config.asset_pools.assets.get(code)
codes_meta[code] = {
'name': asset_config.name if asset_config else code,
'etf': asset_config.trade_source if asset_config else None,
'market': asset_config.group if asset_config else None # V1 使用 market 字段
}
output = {
'meta': {
'version': 'V2',
'strategy': 'GlobalRotationStrategy',
'mode': '指数信号 + ETF收益',
'mode': 'V2: 指数信号 + ETF收益',
'start_date': common_dates[0].strftime('%Y-%m-%d'),
'end_date': common_dates[-1].strftime('%Y-%m-%d'),
'total_days': len(common_dates),
'select_num': strategy.select_num,
'n_days': config.factor.n_days,
'trade_cost': strategy.trade_cost,
'dynamic_threshold': {
'bond_threshold': {
'enabled': strategy.use_dynamic_threshold,
'bond_code': bond_code,
'ratio': bond_ratio
},
'diversified': strategy.diversified,
'rebalance_count': int(rebalance_count),
'final_nav': safe_val(equity_curve.iloc[-1], 4),
'codes': codes_meta
},
'days': days_list

View File

@@ -2,7 +2,6 @@
轮动策略模块
"""
from framework_v2.strategies.rotation.simple import SimpleRotationStrategy
from framework_v2.strategies.rotation.rotation import GlobalRotationStrategy
__all__ = ['SimpleRotationStrategy', 'GlobalRotationStrategy']
__all__ = ['GlobalRotationStrategy']

View File

@@ -229,9 +229,20 @@ class GlobalRotationStrategy(StrategyBase):
top_code = date_factors.idxmax()
selected_codes.append(top_code)
# 标记信号
# 第二步:从所有 group 的 Top 1 中,按动量再选 Top select_num 个
if selected_codes:
signals.loc[date, selected_codes] = 1
# 获取这些标的的当日因子值
candidate_factors = factor_df.loc[date][selected_codes].dropna()
if not candidate_factors.empty:
# 按动量排序,选 Top select_num
if len(candidate_factors) > self.select_num:
final_selected = candidate_factors.nlargest(self.select_num).index.tolist()
else:
final_selected = candidate_factors.index.tolist()
# 标记信号
signals.loc[date, final_selected] = 1
return signals.astype(int)
@@ -315,7 +326,9 @@ class GlobalRotationStrategy(StrategyBase):
print(f" [对齐] 收益率数据: {len(returns_df)} 天, {len(returns_df.columns)} 个标的")
# 对齐 positions 到 A 股日历
positions = positions.reindex(trading_calendar, method='ffill')
# 注意:必须先 reindex 再 ffill因为 reindex(method='ffill') 不会填充已有的 NaN
positions = positions.reindex(trading_calendar)
positions = positions.ffill()
# 计算策略收益仓位加权T+1 执行)
positions_delayed = positions.shift(1).fillna(0)

View File

@@ -1,304 +0,0 @@
"""
简单轮动策略
基于动量因子的 ETF 轮动策略
- 计算各标的动量得分
- 选择 Top-N 标的
- 等权分配仓位
"""
import pandas as pd
import numpy as np
from typing import Dict
from framework_v2.core.strategy import StrategyBase
from framework_v2.config.schemas import StrategyConfig
from framework_v2.shared.factors import MomentumFactor
class SimpleRotationStrategy(StrategyBase):
"""
简单轮动策略
策略逻辑:
1. 计算各标的动量得分(加权线性回归)
2. 选择得分最高的 Top-N 标的
3. 等权分配仓位
示例:
from framework_v2.config import load_config
from framework_v2.strategies.rotation.simple import SimpleRotationStrategy
config = load_config('rotation_simple.yaml')
strategy = SimpleRotationStrategy(config)
result = strategy.run()
"""
def __init__(self, config: StrategyConfig):
"""
初始化策略
Args:
config: 策略配置
"""
super().__init__(config)
# 初始化动量因子
self.momentum = MomentumFactor(
n_days=config.factor.n_days,
weighted=(config.factor.type.value == 'weighted_momentum')
)
# 策略参数
self.select_num = config.rotation.select_num if config.rotation else 3
self.min_score = config.rotation.threshold.fixed_value if config.rotation else 0.0
def get_codes(self) -> list:
"""
获取标的列表(信号标的 + 交易标的)
返回所有需要的数据标的:
- signal_source: 用于计算因子和信号
- trade_source: 用于计算收益
"""
codes = set()
# 添加所有信号标的
codes.update(self.config.asset_pools.get_signal_codes())
# 添加所有交易标的
codes.update(self.config.asset_pools.get_trade_codes())
return list(codes)
def compute_factors(self, data: Dict[str, pd.DataFrame]) -> Dict[str, pd.Series]:
"""
计算动量因子(只使用信号标的的数据)
Args:
data: 数据字典 {code: DataFrame}(包含 signal_source 和 trade_source
Returns:
因子字典 {signal_source: Series}
"""
factors = {}
# 只使用信号标的计算因子
signal_codes = self.config.asset_pools.get_signal_codes()
for code in signal_codes:
if code not in data:
print(f" 警告: {code} 数据不存在,跳过")
continue
try:
df = data[code]
# 计算动量得分(使用信号标的的数据)
factor_values = self.momentum.compute(df)
factors[code] = factor_values
except Exception as e:
print(f" 警告: {code} 因子计算失败 - {e}")
continue
return factors
def generate_signals(self, factors: Dict[str, pd.Series]) -> pd.DataFrame:
"""
生成轮动信号
逻辑:
1. 每个交易日选择动量得分最高的 Top-N 标的
2. 过滤得分低于阈值的标的
Args:
factors: 因子字典 {code: Series}
Returns:
信号 DataFrameindex=日期, columns=标的, values=1或0
"""
if not factors:
return pd.DataFrame()
# 对齐所有因子的日期
factor_df = pd.DataFrame(factors)
# 生成信号
signals = pd.DataFrame(index=factor_df.index, columns=factor_df.columns, data=0)
for date in factor_df.index:
# 获取当日因子值
scores = factor_df.loc[date].dropna()
if scores.empty:
continue
# 过滤低分标的
if self.min_score > 0:
scores = scores[scores >= self.min_score]
# 选择 Top-N
if len(scores) > self.select_num:
top_codes = scores.nlargest(self.select_num).index
else:
top_codes = scores.index
# 标记信号
signals.loc[date, top_codes] = 1
return signals.astype(int)
def manage_positions(self, signals: pd.DataFrame) -> pd.DataFrame:
"""
仓位管理(等权分配)
Args:
signals: 信号 DataFrame
Returns:
仓位 DataFrame包含 'weight' 列)
"""
positions = signals.astype(float).copy()
# 计算每个日期的权重
for date in positions.index:
signal_row = positions.loc[date]
n_selected = signal_row.sum()
if n_selected > 0:
# 等权分配
positions.loc[date] = signal_row / n_selected
else:
# 空仓
positions.loc[date] = 0
return positions
def _get_trading_calendar(self) -> pd.DatetimeIndex:
"""
获取 A 股交易日历
Returns:
A 股交易日历 DatetimeIndex
"""
from datetime import date
# 获取回测区间
start = self.config.backtest.start_date
end = self.config.backtest.end_date
if end is None:
end = date.today().strftime('%Y-%m-%d')
# 创建临时数据获取器来获取交易日历
if self._data_fetcher is None:
self._data_fetcher = self._create_data_fetcher()
try:
# 调用 get_trading_calendar 方法
calendar = self._data_fetcher.get_trading_calendar(
market='A',
start=start,
end=end
)
print(f" [日历] A 股交易日: {len(calendar)} 天 ({calendar[0]} ~ {calendar[-1]})")
return calendar
except Exception as e:
print(f" [警告] 无法获取 A 股交易日历,使用所有日期: {e}")
# 降级方案:使用 pandas 生成工作日
from pandas.tseries.offsets import BDay
start_dt = pd.Timestamp(start)
end_dt = pd.Timestamp(end)
return pd.date_range(start=start_dt, end=end_dt, freq='B') # 工作日
def _execute_backtest(self, positions: pd.DataFrame, data: Dict[str, pd.DataFrame]) -> Dict[str, any]:
"""
执行回测
核心逻辑:
1. 使用 signal_source 计算信号positions 的 columns 是 signal_source
2. 使用 trade_source 计算收益(通过 signal→trade 映射)
3. T+1 执行:今天的信号明天生效
4. 过滤非交易日:只保留 A 股交易日
Args:
positions: 仓位 DataFramecolumns=signal_source
data: 数据字典 {code: DataFrame}(包含 signal_source 和 trade_source
Returns:
回测结果字典
"""
# 获取信号→交易映射
signal_to_trade = self.config.asset_pools.get_signal_to_trade_mapping()
# 提取交易标的的收盘价
close_prices = {}
for signal_code, trade_code in signal_to_trade.items():
if trade_code in data:
# 使用交易标的的数据计算收益
close_prices[signal_code] = data[trade_code]['close']
else:
print(f" 警告: {trade_code} 数据不存在,跳过")
close_df = pd.DataFrame(close_prices)
# 计算收益率
returns = close_df.pct_change()
# 获取 A 股交易日历并过滤
print("\n [过滤] 获取 A 股交易日历...")
trading_calendar = self._get_trading_calendar()
# 过滤到 A 股交易日
original_days = len(returns)
returns = returns[returns.index.isin(trading_calendar)]
positions = positions[positions.index.isin(trading_calendar)]
filtered_days = len(returns)
print(f" [过滤] 原始数据: {original_days} 天 -> A 股交易日: {filtered_days} 天 (过滤 {original_days - filtered_days} 天)")
# 计算策略收益(仓位加权)
# 注意T+1 执行,今天的信号明天生效
positions_delayed = positions.shift(1).fillna(0)
strategy_returns = (positions_delayed * returns).sum(axis=1)
# 计算净值曲线
equity_curve = (1 + strategy_returns).cumprod()
# 检查是否有数据
if len(equity_curve) == 0:
return {
'equity_curve': equity_curve,
'strategy_returns': strategy_returns,
'positions': positions,
'metrics': {
'total_return': 0,
'annual_return': 0,
'max_drawdown': 0,
'sharpe_ratio': 0,
'n_days': 0,
}
}
# 计算绩效指标
total_return = equity_curve.iloc[-1] / equity_curve.iloc[0] - 1
n_days = len(strategy_returns)
annual_return = (1 + total_return) ** (252 / n_days) - 1 if n_days > 0 else 0
# 最大回撤
cumulative_max = equity_curve.cummax()
drawdown = (equity_curve - cumulative_max) / cumulative_max
max_drawdown = drawdown.min()
# 夏普比率
sharpe = strategy_returns.mean() / strategy_returns.std() * np.sqrt(252) if strategy_returns.std() > 0 else 0
return {
'equity_curve': equity_curve,
'strategy_returns': strategy_returns,
'positions': positions,
'metrics': {
'total_return': total_return,
'annual_return': annual_return,
'max_drawdown': max_drawdown,
'sharpe_ratio': sharpe,
'n_days': n_days,
}
}

View File

@@ -1,128 +0,0 @@
"""
测试简单轮动策略
验证完整流程:
1. 配置加载
2. 策略初始化
3. 数据获取
4. 因子计算
5. 信号生成
6. 回测执行
"""
import sys
from pathlib import Path
import os
# 添加项目根目录到路径
project_root = Path(__file__).parent.parent
if str(project_root) not in sys.path:
sys.path.insert(0, str(project_root))
from framework_v2.config import load_config
from framework_v2.strategies.rotation.simple import SimpleRotationStrategy
def test_simple_rotation():
"""测试简单轮动策略完整流程"""
print("\n" + "=" * 70)
print(" 简单轮动策略端到端测试")
print("=" * 70)
# 设置环境变量
os.environ['FLASK_API_URL'] = 'https://k3s.tokenpluse.xyz'
# 1. 加载配置
print("\n[1/6] 加载配置...")
config_path = Path(__file__).parent.parent / 'strategies' / 'rotation' / 'config_simple.yaml'
config = load_config(str(config_path))
print(f" ✓ 配置加载成功")
print(f" 策略: {config.metadata.strategy}")
print(f" 标的: {list(config.asset_pools.equity.keys())}")
print(f" 回测: {config.backtest.start_date} ~ {config.backtest.end_date}")
# 2. 初始化策略
print("\n[2/6] 初始化策略...")
strategy = SimpleRotationStrategy(config)
print(f" ✓ 策略初始化成功")
print(f" 名称: {strategy.name}")
print(f" 动量窗口: {config.factor.n_days}")
print(f" 选股数量: {strategy.select_num}")
# 3. 获取数据
print("\n[3/6] 获取数据...")
codes = strategy.get_codes()
print(f" 标的列表: {codes}")
data = strategy.get_data()
print(f" ✓ 获取 {len(data)} 个标的")
for code, df in data.items():
print(f" {code}: {len(df)} 天 ({df.index[0].date()} ~ {df.index[-1].date()})")
# 4. 计算因子
print("\n[4/6] 计算因子...")
factors = strategy.compute_factors(data)
print(f" ✓ 计算 {len(factors)} 个因子")
for code, factor in factors.items():
print(f" {code}: {len(factor)} 值, 范围 [{factor.min():.4f}, {factor.max():.4f}]")
# 5. 生成信号
print("\n[5/6] 生成信号...")
signals = strategy.generate_signals(factors)
n_signals = signals.sum().sum()
print(f" ✓ 生成 {signals.shape[0]} 个交易日信号")
print(f" 总信号数: {n_signals}")
print(f" 平均每日持仓: {signals.mean().mean():.2%}")
# 6. 仓位管理
print("\n[6/6] 仓位管理...")
positions = strategy.manage_positions(signals)
print(f" ✓ 仓位分配完成")
print(f" 权重和: {positions.sum(axis=1).mean():.2%}")
# 7. 执行回测
print("\n执行回测...")
result = strategy._execute_backtest(positions, data)
# 打印结果
print("\n" + "=" * 70)
print(" 回测结果")
print("=" * 70)
metrics = result['metrics']
print(f"\n 总收益率: {metrics['total_return']:.2%}")
print(f" 年化收益: {metrics['annual_return']:.2%}")
print(f" 最大回撤: {metrics['max_drawdown']:.2%}")
print(f" 夏普比率: {metrics['sharpe_ratio']:.2f}")
print(f" 交易天数: {metrics['n_days']}")
# 验证结果
print("\n" + "=" * 70)
print(" 验证")
print("=" * 70)
assert metrics['total_return'] != 0, "总收益率不应为 0"
print(" ✓ 总收益率有效")
assert len(result['equity_curve']) > 0, "净值曲线不应为空"
print(" ✓ 净值曲线有效")
assert positions.sum(axis=1).max() <= 1.01, "权重和不应超过 100%"
print(" ✓ 仓位权重有效")
print("\n" + "=" * 70)
print(" ✓ 所有测试通过")
print("=" * 70 + "\n")
return result
if __name__ == "__main__":
try:
result = test_simple_rotation()
except Exception as e:
print(f"\n✗ 测试失败: {e}")
import traceback
traceback.print_exc()
sys.exit(1)