From 7b41bb8c6d18b4ba7718f8265c53d3bd19b0801e Mon Sep 17 00:00:00 2001 From: aszerW Date: Mon, 18 May 2026 00:57:59 +0800 Subject: [PATCH] =?UTF-8?q?feat(scripts):=20=E8=BF=81=E7=A7=BB=E8=BD=AE?= =?UTF-8?q?=E5=8A=A8=E7=AD=96=E7=95=A5=E5=AE=9A=E6=97=B6=E8=B0=83=E5=BA=A6?= =?UTF-8?q?=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增文件: - scripts/daily_scheduler.py: 定时调度器,支持交易日判断、回测执行、OSS上传、钉钉推送 - scripts/run_rotation.py: 回测入口脚本,支持Flask API和本地数据源切换 - config/settings.py: 配置管理模块,支持钉钉多群配置 功能: 1. 每天15:30自动检查交易日 2. 交易日执行策略回测生成报告 3. 上传报告图片到OSS 4. 发送图片链接到钉钉群 修复: - 添加oss2库SyntaxWarning过滤(Python 3.12兼容) - 钉钉消息精简为标题+图片格式 --- config/settings.py | 93 +++++++++++ scripts/daily_scheduler.py | 314 +++++++++++++++++++++++++++++++++++++ scripts/run_rotation.py | 115 ++++++++++++++ 3 files changed, 522 insertions(+) create mode 100644 config/settings.py create mode 100644 scripts/daily_scheduler.py create mode 100644 scripts/run_rotation.py diff --git a/config/settings.py b/config/settings.py new file mode 100644 index 0000000..4f9fd9b --- /dev/null +++ b/config/settings.py @@ -0,0 +1,93 @@ +""" +配置管理模块 + +从环境变量读取配置信息 +""" + +import os +from typing import Dict, List, Optional + +# 默认基准 +DEFAULT_BENCHMARK_CODE = "000300.SH" +DEFAULT_BENCHMARK_NAME = "沪深300指数" + +# 默认代码名称映射 +DEFAULT_CODE_NAME_MAP = { + "000300.SH": "沪深300", + "000905.SH": "中证500", + "000852.SH": "中证1000", +} + + +def get_dingtalk_config() -> Dict[str, str]: + """ + 获取钉钉机器人配置(第一个群) + + Returns: + dict: 包含 webhook 和 secret + """ + webhook = os.getenv("DINGTALK_WEBHOOK", "") + secret = os.getenv("DINGTALK_SECRET", "") + + return { + "webhook": webhook, + "secret": secret, + } + + +def get_all_dingtalk_configs() -> List[Dict[str, str]]: + """ + 获取所有钉钉机器人配置(支持多群) + + 环境变量格式: + DINGTALK_WEBHOOK_1=xxx + DINGTALK_SECRET_1=xxx + DINGTALK_WEBHOOK_2=xxx + DINGTALK_SECRET_2=xxx + ... + + 如果没有编号配置,则使用 DINGTALK_WEBHOOK 和 DINGTALK_SECRET + + Returns: + list: 配置列表,每项包含 webhook 和 secret + """ + configs = [] + + # 查找编号配置 + i = 1 + while True: + webhook = os.getenv(f"DINGTALK_WEBHOOK_{i}", "") + secret = os.getenv(f"DINGTALK_SECRET_{i}", "") + + if not webhook: + break + + configs.append({ + "webhook": webhook, + "secret": secret, + }) + i += 1 + + # 如果没有编号配置,使用默认配置 + if not configs: + default_config = get_dingtalk_config() + if default_config["webhook"]: + configs.append(default_config) + + return configs + + +def get_db_config() -> Dict[str, str]: + """ + 获取数据库配置 + + Returns: + dict: 数据库配置 + """ + return { + "host": os.getenv("DB_HOST", "localhost"), + "port": os.getenv("DB_PORT", "5432"), + "user": os.getenv("DB_USER", ""), + "password": os.getenv("DB_PASSWORD", ""), + "database": os.getenv("DB_NAME", ""), + } \ No newline at end of file diff --git a/scripts/daily_scheduler.py b/scripts/daily_scheduler.py new file mode 100644 index 0000000..6682eff --- /dev/null +++ b/scripts/daily_scheduler.py @@ -0,0 +1,314 @@ +#!/usr/bin/env python3 +""" +ETF轮动策略定时调度器 + +功能: +1. 每天15:30检查是否为交易日 +2. 如果是交易日,执行策略回测生成报告 +3. 上传报告图片到OSS +4. 发送图片链接到钉钉群 + +用法: + python scripts/daily_scheduler.py --time 15:30 # 后台定时模式 + python scripts/daily_scheduler.py --now # 立即执行一次 + python scripts/daily_scheduler.py --no-daemon # 非后台模式 +""" + +import warnings +# 抑制oss2库的SyntaxWarning(无效转义序列'\&') +warnings.filterwarnings('ignore', category=SyntaxWarning, module='oss2') + +import os +import sys +import time +import subprocess +import argparse +from datetime import datetime +from pathlib import Path + +# 添加项目根目录 +project_root = Path(__file__).parent.parent +sys.path.insert(0, str(project_root)) + +# 添加归档模块路径(使用原有的 notify 和 oss_utils) +archive_path = project_root / 'archive' / 'legacy_core' +sys.path.insert(0, str(archive_path)) + +# 加载环境变量 +from dotenv import load_dotenv +load_dotenv() + +from loguru import logger +import schedule +import tushare as ts + +# 导入原有的通知和OSS模块 +try: + from core.common.notify import DingTalkBot, send_to_all_groups + from core.common.oss_utils import upload_image_to_oss + HAS_NOTIFY_MODULE = True +except ImportError as e: + logger.warning(f"无法导入通知模块: {e}") + HAS_NOTIFY_MODULE = False + +# 配置日志 +log_path = project_root / "logs" / "scheduler_{time}.log" +logger.add( + str(log_path), + rotation="1 day", + retention="7 days", + level="INFO" +) + + +def is_trade_day(date_str: str = None) -> bool: + """ + 判断指定日期是否为交易日 + + Args: + date_str: 日期字符串 (YYYYMMDD),默认今天 + + Returns: + bool: 是否为交易日 + """ + if date_str is None: + date_str = datetime.now().strftime("%Y%m%d") + + try: + token = os.getenv('TUSHARE_TOKEN') + if not token: + logger.error("TUSHARE_TOKEN 未设置") + return True # 失败时默认执行(避免错过交易日) + + pro = ts.pro_api(token) + df = pro.trade_cal( + exchange='SSE', + start_date=date_str, + end_date=date_str, + is_open='1' + ) + + is_open = len(df) > 0 and df.iloc[0]["is_open"] == 1 + logger.info(f"日期 {date_str} 是否为交易日: {is_open}") + return is_open + + except Exception as e: + logger.error(f"检查交易日失败: {e}") + return True # 失败时默认执行 + + +def run_strategy(config_path: str = "strategies/rotation/config.yaml") -> dict: + """ + 执行策略回测并生成报告 + + 使用 generate_legacy_report.py 生成原引擎格式报告 + + Args: + config_path: 配置文件路径 + + Returns: + dict: 执行结果,包含报告路径等信息 + """ + logger.info("开始执行策略回测...") + + try: + # 使用 generate_legacy_report.py 生成报告 + # 输出文件: results/rotation_legacy_chart.png + cmd = [ + sys.executable, + str(project_root / "scripts" / "generate_legacy_report.py") + ] + + logger.info(f"执行命令: {' '.join(cmd)}") + + # 执行策略 + result = subprocess.run( + cmd, + capture_output=True, + text=True, + cwd=project_root, + timeout=300 # 5分钟超时 + ) + + if result.returncode != 0: + logger.error(f"策略执行失败:\n{result.stderr}") + return {"success": False, "error": result.stderr} + + logger.info("策略执行成功") + logger.debug(result.stdout) + + # 查找生成的报告图片(固定路径) + chart_path = project_root / "results" / "rotation_legacy_chart.png" + + return { + "success": True, + "stdout": result.stdout, + "chart_path": str(chart_path) if chart_path.exists() else None, + } + + except subprocess.TimeoutExpired: + logger.error("策略执行超时") + return {"success": False, "error": "timeout"} + except Exception as e: + logger.error(f"策略执行异常: {e}") + return {"success": False, "error": str(e)} + + +def send_report_to_dingtalk(chart_path: str, summary_text: str = "") -> bool: + """ + 上传报告到OSS并发送图片链接到钉钉 + + Args: + chart_path: 图片文件路径 + summary_text: 摘要文本 + + Returns: + bool: 是否发送成功 + """ + logger.info("开始发送报告到钉钉...") + + if not HAS_NOTIFY_MODULE: + logger.warning("通知模块未加载,无法发送") + return False + + try: + today_str = datetime.now().strftime("%Y-%m-%d") + title = f"ETF轮动策略调仓日报 ({today_str})" + + # 使用原有的 send_to_all_groups 发送图片 + # 该方法会自动:上传到OSS → 发送Markdown消息带图片链接 + success = send_to_all_groups( + "send_image_via_oss", + image_path=chart_path, + title=title, + text=summary_text, + expire_days=7 + ) + + if success: + logger.info("报告发送成功") + else: + logger.error("报告发送失败") + + return success + + except Exception as e: + logger.error(f"发送报告异常: {e}") + return False + + +def setup_schedule(target_time: str = "15:30", config_path: str = "strategies/rotation/config.yaml"): + """ + 设置定时任务 + + Args: + target_time: 执行时间 (HH:MM) + config_path: 配置文件路径 + """ + logger.info(f"设置定时任务: 每天 {target_time} 执行") + + # 清除已有任务 + schedule.clear() + + # 添加每日任务 + schedule.every().day.at(target_time).do(daily_task, config_path=config_path) + + logger.info("定时任务设置完成,等待执行...") + + +def daily_task(config_path: str = "strategies/rotation/config.yaml"): + """ + 每日任务主流程 + + Args: + config_path: 配置文件路径 + """ + today_str = datetime.now().strftime("%Y-%m-%d") + logger.info("=" * 60) + logger.info(f"开始执行每日任务: {today_str}") + logger.info("=" * 60) + + # 1. 检查是否为交易日 + if not is_trade_day(): + logger.info("今天不是交易日,跳过执行") + return + + # 2. 执行策略 + result = run_strategy(config_path) + + if not result["success"]: + logger.error(f"策略执行失败: {result.get('error', '未知错误')}") + return + + # 3. 发送报告 + if result.get("chart_path"): + # 只发送标题和图片,不附带文字摘要 + send_report_to_dingtalk( + chart_path=result["chart_path"], + summary_text="" # 空字符串,只显示标题和图片 + ) + else: + logger.warning("未找到报告文件") + + logger.info("每日任务执行完成") + + +def run_scheduler_loop(): + """运行调度循环""" + logger.info("启动调度器,按 Ctrl+C 停止") + try: + while True: + schedule.run_pending() + time.sleep(1) + except KeyboardInterrupt: + logger.info("调度器已停止") + + +def main(): + parser = argparse.ArgumentParser(description='ETF策略定时调度器') + parser.add_argument( + '--time', + type=str, + default='15:30', + help='执行时间 (HH:MM),默认 15:30' + ) + parser.add_argument( + '--config', + type=str, + default='strategies/rotation/config.yaml', + help='配置文件路径' + ) + parser.add_argument( + '--now', + action='store_true', + help='立即执行一次并退出(不启动定时)' + ) + parser.add_argument( + '--no-daemon', + action='store_true', + help='非后台模式:执行一次后进入定时循环(测试用)' + ) + + args = parser.parse_args() + + # 创建日志目录 + (project_root / "logs").mkdir(exist_ok=True) + + if args.now: + # 立即执行一次并退出 + daily_task(args.config) + elif args.no_daemon: + # 非后台模式:执行一次后进入定时循环 + setup_schedule(args.time, args.config) + logger.info("执行一次测试...") + daily_task(args.config) + logger.info("测试完成,启动定时任务循环(Ctrl+C 停止)...") + run_scheduler_loop() + else: + # 默认:后台定时模式 + setup_schedule(args.time, args.config) + run_scheduler_loop() + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/scripts/run_rotation.py b/scripts/run_rotation.py new file mode 100644 index 0000000..ae64e9d --- /dev/null +++ b/scripts/run_rotation.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python3 +""" +轮动策略回测入口脚本 + +用法: + python scripts/run_rotation.py --config strategies/rotation/config.yaml + python scripts/run_rotation.py --config strategies/rotation/config.yaml --save-path results/report +""" + +import sys +import argparse +from pathlib import Path +from datetime import datetime + +# 添加项目根目录到路径 +project_root = Path(__file__).parent.parent +sys.path.insert(0, str(project_root)) + +# 加载环境变量 +from dotenv import load_dotenv +load_dotenv() + +from strategies.rotation.strategy import RotationStrategy + + +def load_config(config_path: str) -> dict: + """加载配置""" + import yaml + with open(config_path, 'r', encoding='utf-8') as f: + return yaml.safe_load(f) + + +def main(): + parser = argparse.ArgumentParser(description='ETF轮动策略回测') + parser.add_argument( + '--config', + type=str, + default='strategies/rotation/config.yaml', + help='配置文件路径' + ) + parser.add_argument( + '--save-path', + type=str, + default=None, + help='报告保存路径前缀' + ) + parser.add_argument( + '--no-api', + action='store_true', + help='不使用Flask API,使用本地数据源' + ) + + args = parser.parse_args() + + start_time = datetime.now() + + print("=" * 60) + print(" ETF轮动策略 回测系统") + print("=" * 60) + + # 加载配置 + print(f"\n加载配置: {args.config}") + config = load_config(args.config) + + # 显示配置摘要 + code_list = list(config.get('code_list', {}).keys()) + print(f"候选标的: {len(code_list)} 只") + print(f"回测区间: {config.get('start_date', 'N/A')} ~ {config.get('end_date', 'N/A')}") + print(f"因子类型: {config.get('factor_type', 'momentum')}") + print(f"窗口天数: {config.get('n_days', 25)}") + print(f"选股数量: {config.get('select_num', 3)}") + print(f"调仓周期: {config.get('rebalance_days', 1)} 天") + print(f"交易成本: {config.get('trade_cost', 0.001):.2%}") + + # 初始化策略 + print("\n初始化策略...") + strategy = RotationStrategy.from_yaml(args.config) + + # 设置保存路径 + if args.save_path is None: + report_date = datetime.now().strftime('%Y%m%d') + args.save_path = f"results/report_{report_date}" + + # 执行回测 + print("\n" + "=" * 60) + print("开始回测...") + print("=" * 60) + + # 使用Flask API或本地数据源 + use_flask_api = not args.no_api + data = strategy.get_data(use_flask_api=use_flask_api) + + result = strategy.run_backtest(data=data, save_path=args.save_path) + + # 输出结果 + if result.get('result') is not None: + final_nav = result['result']['策略净值'].iloc[-1] + total_return = (final_nav - 1) * 100 + + print("\n" + "=" * 60) + print("回测完成!") + print("=" * 60) + print(f"最终净值: {final_nav:.4f}") + print(f"累计收益: {total_return:.2f}%") + print(f"调仓次数: {len(result.get('rebalance_events', []))} 次") + print(f"报告保存: {args.save_path}_*.csv") + + elapsed = datetime.now() - start_time + print(f"\n总耗时: {elapsed.total_seconds():.1f}秒") + + return result + + +if __name__ == '__main__': + main() \ No newline at end of file