diff --git a/config/strategies/rotation.yaml b/config/strategies/rotation.yaml index ce7fc8a..b4349ba 100644 --- a/config/strategies/rotation.yaml +++ b/config/strategies/rotation.yaml @@ -32,9 +32,9 @@ code_list: "399702.SZ": "国债指数" # 全球市场指数 (使用 YFinance) - 非主市场,数据会前向填充到A股交易日 "HSTECH": "恒生科技" # 港股 - "NDX": "纳斯达克100" # 美股 - "BTC": "比特币" # 加密货币 - "ETH": "以太坊" # 加密货币 + "NDX": "纳指100" # 美股 + "BTC": "BTC" # 加密货币 + "ETH": "ETH" # 加密货币 # 主市场配置(用于确定交易日历) primary_market: @@ -47,7 +47,7 @@ benchmark: name: "沪深300指数" # ==================== 回测参数 ==================== -start_date: "2022-01-01" +start_date: "2020-01-01" # end_date: "2025-03-17" # ==================== 因子参数 ==================== @@ -79,5 +79,5 @@ ssh_tunnel: host: "8.218.167.69" # SSH 服务器地址(阿里云香港 ECS IP) port: 22 # SSH 端口 username: "root" # SSH 用户名 - key_path: "/Users/aszer/Documents/vscode/etf/hk_ecs.pem" # SSH 私钥路径 + key_path: "hk_ecs.pem" # SSH 私钥路径(相对于项目根目录) local_port: 1080 # 本地 SOCKS5 代理端口 diff --git a/scripts/daily_scheduler.py b/scripts/daily_scheduler.py index 2bbfa5f..4b054a6 100644 --- a/scripts/daily_scheduler.py +++ b/scripts/daily_scheduler.py @@ -18,7 +18,7 @@ import os import time import argparse import subprocess -from datetime import datetime, timedelta +from datetime import datetime from pathlib import Path # 添加项目根目录到路径 @@ -29,6 +29,7 @@ from dotenv import load_dotenv load_dotenv(project_root / ".env") from loguru import logger +import schedule import tushare as ts from core.common.notify import DingTalkBot from core.common.oss_utils import upload_image_to_oss @@ -182,36 +183,23 @@ def send_report_to_dingtalk(chart_path: str, summary_text: str = "") -> bool: return False -def wait_until_target_time(target_time: str = "15:00"): +def setup_schedule(target_time: str = "15:30", config_path: str = "config/strategies/rotation.yaml"): """ - 等待直到目标时间 + 设置定时任务 Args: - target_time: 目标时间 (HH:MM) + target_time: 执行时间 (HH:MM) + config_path: 配置文件路径 """ - while True: - now = datetime.now() - target = now.replace( - hour=int(target_time.split(":")[0]), - minute=int(target_time.split(":")[1]), - second=0, - microsecond=0 - ) + logger.info(f"设置定时任务: 每天 {target_time} 执行") - # 如果目标时间已过,等到明天 - if target < now: - target += timedelta(days=1) - logger.info(f"目标时间已过,等到明天 {target_time}") + # 清除所有现有任务 + schedule.clear() - wait_seconds = (target - now).total_seconds() + # 添加每日任务 + schedule.every().day.at(target_time).do(daily_task, config_path) - if wait_seconds > 60: - logger.info(f"等待 {target_time},还需 {wait_seconds/60:.0f} 分钟...") - time.sleep(60) # 每分钟检查一次 - else: - logger.info(f"即将到达目标时间,等待 {wait_seconds:.0f} 秒...") - time.sleep(wait_seconds) - break + logger.info("定时任务设置完成,等待执行...") def daily_task(config_path: str = "config/strategies/rotation.yaml"): @@ -252,13 +240,24 @@ def daily_task(config_path: str = "config/strategies/rotation.yaml"): logger.info("每日任务完成") +def run_scheduler_loop(): + """运行调度器循环""" + logger.info("启动调度器循环,按 Ctrl+C 停止") + try: + while True: + schedule.run_pending() + time.sleep(1) + except KeyboardInterrupt: + logger.info("调度器已停止") + + def main(): parser = argparse.ArgumentParser(description="ETF策略每日定时任务") parser.add_argument( "--time", type=str, default="15:30", - help="执行时间 (HH:MM),默认15:00", + help="执行时间 (HH:MM),默认15:30", ) parser.add_argument( "--config", @@ -269,12 +268,12 @@ def main(): parser.add_argument( "--run-now", action="store_true", - help="立即执行一次(不等待指定时间)", + help="立即执行一次(不启动定时任务)", ) parser.add_argument( - "--loop", + "--daemon", action="store_true", - help="循环运行(每天执行)", + help="后台运行(持续执行定时任务)", ) args = parser.parse_args() @@ -284,18 +283,17 @@ def main(): if args.run_now: # 立即执行一次 daily_task(args.config) - elif args.loop: - # 循环运行 - logger.info(f"启动定时任务,每天 {args.time} 执行") - while True: - wait_until_target_time(args.time) - daily_task(args.config) - # 等待一段时间避免重复执行 - time.sleep(60) + elif args.daemon: + # 后台运行模式 + setup_schedule(args.time, args.config) + run_scheduler_loop() else: - # 等待到目标时间执行一次 - wait_until_target_time(args.time) + # 默认:设置定时任务并执行一次(用于测试) + setup_schedule(args.time, args.config) + logger.info("执行一次任务用于测试...") daily_task(args.config) + logger.info("测试完成,启动定时任务循环(按 Ctrl+C 停止)...") + run_scheduler_loop() if __name__ == "__main__": diff --git a/strategies/rotation/report.py b/strategies/rotation/report.py index 8714393..4a4dd13 100644 --- a/strategies/rotation/report.py +++ b/strategies/rotation/report.py @@ -83,7 +83,15 @@ def generate_performance_report( # 绘制图表 _plot_report_chart( backtest_result, code_list, code_name_map, - benchmark_name, save_path, select_num + benchmark_name, save_path, select_num, + metrics={ + "累计收益": s_total_return, + "年化收益": s_cagr_nat, + "夏普比率": s_sharpe, + "最大回撤": s_max_dd, + "Calmar比率": s_calmar, + "日胜率": s_win_rate, + } ) # 返回指标字典 @@ -281,6 +289,7 @@ def _plot_report_chart( benchmark_name: str, save_path: str, select_num: int, + metrics: dict = None, ): """绘制报告图表""" plt.rcParams["font.sans-serif"] = ["Arial Unicode MS", "SimHei", "DejaVu Sans"] @@ -288,16 +297,36 @@ def _plot_report_chart( strategy_nav = backtest_result["轮动策略净值"] benchmark_nav = backtest_result["基准净值"] + strategy_ret = backtest_result["轮动策略日收益率"] + + # 计算绩效指标(如果没有传入) + if metrics is None: + from core.common.utils import calculate_cagr, calculate_max_drawdown, calculate_sharpe + s_cagr_nat = calculate_cagr(strategy_nav, "natural_days") + s_total_return = strategy_nav.iloc[-1] - 1 + s_sharpe = calculate_sharpe(strategy_ret) + s_max_dd, s_dd_start, s_dd_end = calculate_max_drawdown(strategy_nav) + s_win_rate = (strategy_ret > 0).sum() / len(strategy_ret) + s_calmar = s_cagr_nat / abs(s_max_dd) if s_max_dd != 0 else np.inf + metrics = { + "累计收益": s_total_return, + "年化收益": s_cagr_nat, + "夏普比率": s_sharpe, + "最大回撤": s_max_dd, + "Calmar比率": s_calmar, + "日胜率": s_win_rate, + } # 提取最新调仓信息 latest = _extract_latest_positions(backtest_result, code_list, code_name_map, select_num) # 计算表格行数 n_table_rows = len(latest["positions"]) + len(latest["exit_positions"]) - table_height = max(1.5, 0.5 + n_table_rows * 0.28) + signal_table_height = max(2.0, 0.6 + n_table_rows * 0.35) + metrics_table_height = 1.2 - fig = plt.figure(figsize=(14, 10 + table_height + 8)) - gs = fig.add_gridspec(4, 1, height_ratios=[table_height, 3, 1, 1.2], hspace=0.35) + fig = plt.figure(figsize=(14, 10 + signal_table_height + metrics_table_height + 8)) + gs = fig.add_gridspec(5, 1, height_ratios=[signal_table_height, metrics_table_height, 3, 1, 1.2], hspace=0.35) # 面板0: 最新调仓信号表 ax0 = fig.add_subplot(gs[0]) @@ -342,13 +371,14 @@ def _plot_report_chart( table = ax0.table( cellText=table_data, colLabels=col_labels, - loc="upper center", + loc="center", cellLoc="center", - colWidths=[0.08, 0.08, 0.05, 0.06, 0.06, 0.07, 0.07, 0.05, 0.06, 0.07], + colWidths=[0.10, 0.10, 0.07, 0.08, 0.08, 0.08, 0.08, 0.07, 0.08, 0.08], + bbox=[0, 0, 1, 1], # 使用完整宽度 ) table.auto_set_font_size(False) table.set_fontsize(10) - table.scale(1, 2.0) + table.scale(1, 2.0) # 行高与绩效表格一致 # 表头深色 for j in range(len(col_labels)): @@ -357,7 +387,7 @@ def _plot_report_chart( # 数据行按操作着色 for i in range(len(table_data)): - action = table_data[i][3] + action = table_data[i][7] # 操作列在第8列 if action == "调入": color = "#d4edda" # 绿色 elif action == "调出": @@ -367,11 +397,92 @@ def _plot_report_chart( for j in range(len(col_labels)): table[i + 1, j].set_facecolor(color) - # 面板1: 净值曲线 + # 面板1: 策略绩效指标对比表(转置:行为策略/基准,列为指标) ax1 = fig.add_subplot(gs[1]) - ax1.plot(strategy_nav.index, strategy_nav.values, + ax1.axis("off") + ax1.set_title("策略绩效对比", fontsize=14, fontweight="bold", loc="left", pad=10) + + # 计算基准指标 + from core.common.utils import calculate_cagr, calculate_max_drawdown, calculate_sharpe + benchmark_ret = backtest_result["基准日收益率"] + b_cagr_nat = calculate_cagr(benchmark_nav, "natural_days") + b_total_return = benchmark_nav.iloc[-1] - 1 + b_sharpe = calculate_sharpe(benchmark_ret) + b_max_dd, _, _ = calculate_max_drawdown(benchmark_nav) + + # 构建绩效对比表格(转置) + start_date = strategy_nav.index.min().strftime("%Y-%m-%d") + end_date = strategy_nav.index.max().strftime("%Y-%m-%d") + + # 列标题(指标),第一列添加"策略" + perf_col_labels = ["策略", "开始时间", "结束时间", "累计收益", "年化收益", "最大回撤", "夏普比率", "Calmar比率", "日胜率"] + + # 策略行数据(包含行标题) + strategy_row = [ + "轮动策略", + start_date, + end_date, + f"{metrics.get('累计收益', 0):.2%}", + f"{metrics.get('年化收益', 0):.2%}", + f"{metrics.get('最大回撤', 0):.2%}", + f"{metrics.get('夏普比率', 0):.2f}", + f"{metrics.get('Calmar比率', 0):.2f}", + f"{metrics.get('日胜率', 0):.2%}", + ] + + # 基准行数据(包含行标题) + benchmark_row = [ + f"基准({benchmark_name})", + start_date, + end_date, + f"{b_total_return:.2%}", + f"{b_cagr_nat:.2%}", + f"{b_max_dd:.2%}", + f"{b_sharpe:.2f}", + "—", + "—", + ] + + # 表格数据:2行(策略、基准) + perf_table_data = [strategy_row, benchmark_row] + + # 使用与调仓表格相同的列宽计算方式,确保总宽度一致 + # 调仓表格有10列,这里也有9列,使用相似的宽度分配 + perf_col_widths = [0.10, 0.10, 0.10, 0.10, 0.10, 0.10, 0.10, 0.10, 0.10] + + perf_table = ax1.table( + cellText=perf_table_data, + colLabels=perf_col_labels, + loc="center", + cellLoc="center", + colWidths=perf_col_widths, + bbox=[0, 0, 1, 1], # 使用完整宽度,与调仓表格一致 + ) + perf_table.auto_set_font_size(False) + perf_table.set_fontsize(10) # 字体大小与调仓表格一致 + perf_table.scale(1, 2.0) # 行高与调仓表格一致 + + # 表头样式(第一行) + for j in range(len(perf_col_labels)): + perf_table[0, j].set_facecolor("#2C3E50") + perf_table[0, j].set_text_props(color="white", fontweight="bold") + + # 数据行样式 + # 策略行浅绿背景 + for j in range(len(perf_col_labels)): + perf_table[1, j].set_facecolor("#d4edda") + # 基准行浅蓝背景 + for j in range(len(perf_col_labels)): + perf_table[2, j].set_facecolor("#cce5ff") + # 第一列(策略名称)加粗 + for i in range(2): + perf_table[i + 1, 0].set_text_props(fontweight="bold") + + # 面板2: 净值曲线 + ax2 = fig.add_subplot(gs[2]) + ax2.plot(strategy_nav.index, strategy_nav.values, label="轮动策略", linewidth=2, color="#E74C3C") - ax1.plot(benchmark_nav.index, benchmark_nav.values, + ax2.plot(benchmark_nav.index, benchmark_nav.values, label=benchmark_name, linewidth=1.5, color="#3498DB", alpha=0.8) chart_colors = plt.cm.tab20.colors @@ -379,27 +490,27 @@ def _plot_report_chart( for i, code in enumerate(code_list): name = code_name_map.get(code, code) lbl = name if i < show_legend_n else None - ax1.plot(backtest_result.index, backtest_result[f"净值_{code}"].values, + ax2.plot(backtest_result.index, backtest_result[f"净值_{code}"].values, label=lbl, linewidth=0.8, alpha=0.4, color=chart_colors[i % len(chart_colors)]) - ax1.set_title("ETF轮动策略 - 净值曲线", fontsize=16, fontweight="bold") - ax1.set_ylabel("净值") - ax1.legend(loc="upper left", fontsize=8, ncol=2) - ax1.grid(True, alpha=0.3) - ax1.set_yscale("log") + ax2.set_title("ETF轮动策略 - 净值曲线", fontsize=16, fontweight="bold") + ax2.set_ylabel("净值") + ax2.legend(loc="upper left", fontsize=8, ncol=2) + ax2.grid(True, alpha=0.3) + ax2.set_yscale("log") - # 面板2: 回撤曲线 - ax2 = fig.add_subplot(gs[2]) + # 面板3: 回撤曲线 + ax3 = fig.add_subplot(gs[3]) cummax = strategy_nav.cummax() drawdown = (strategy_nav - cummax) / cummax - ax2.fill_between(drawdown.index, drawdown.values, 0, alpha=0.5, color="#E74C3C") - ax2.set_title("策略回撤", fontsize=12) - ax2.set_ylabel("回撤") - ax2.grid(True, alpha=0.3) + ax3.fill_between(drawdown.index, drawdown.values, 0, alpha=0.5, color="#E74C3C") + ax3.set_title("策略回撤", fontsize=12) + ax3.set_ylabel("回撤") + ax3.grid(True, alpha=0.3) - # 面板3: 持仓分布 - ax3 = fig.add_subplot(gs[3]) + # 面板4: 持仓分布 + ax4 = fig.add_subplot(gs[4]) signal_series = backtest_result["信号"] for i, code in enumerate(code_list): name = code_name_map.get(code, code) @@ -408,16 +519,16 @@ def _plot_report_chart( else: mask = signal_series == code if mask.any(): - ax3.fill_between(signal_series.index, i, i + 0.8, + ax4.fill_between(signal_series.index, i, i + 0.8, where=mask, alpha=0.7, color=chart_colors[i % len(chart_colors)], label=name) ylabels = [code_name_map.get(c, c) for c in code_list] - ax3.set_title("每日持仓分布", fontsize=12) - ax3.set_yticks(range(len(ylabels))) - ax3.set_yticklabels(ylabels, fontsize=7) - ax3.grid(True, alpha=0.3) + ax4.set_title("每日持仓分布", fontsize=12) + ax4.set_yticks(range(len(ylabels))) + ax4.set_yticklabels(ylabels, fontsize=7) + ax4.grid(True, alpha=0.3) chart_path = f"{save_path}_chart.png" plt.savefig(chart_path, dpi=150, bbox_inches="tight")