#!/usr/bin/env python3 """ ETF策略每日定时任务 功能: 1. 每天15:00收盘后检查是否为交易日 2. 如果是交易日,执行策略回测 3. 上传报告图表到OSS 4. 发送结果到钉钉 用法: python scripts/daily_scheduler.py python scripts/daily_scheduler.py --time 15:00 """ import sys import os import time import argparse import subprocess from datetime import datetime, timedelta from pathlib import Path # 添加项目根目录到路径 project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) from dotenv import load_dotenv load_dotenv(project_root / ".env") from loguru import logger import tushare as ts from core.common.notify import DingTalkBot from core.common.oss_utils import upload_image_to_oss # 配置日志 logger.add( project_root / "logs" / "scheduler_{time}.log", rotation="1 day", retention="7 days", level="INFO", ) def is_trading_day(date_str: str = None) -> bool: """ 检查指定日期是否为交易日 Args: date_str: 日期字符串 (YYYYMMDD),默认今天 Returns: bool: 是否为交易日 """ if date_str is None: date_str = datetime.now().strftime("%Y%m%d") try: token = os.getenv("TUSHARE_TOKEN") if not token: logger.error("TUSHARE_TOKEN 未配置") return False pro = ts.pro_api(token) df = pro.trade_cal( exchange='SSE', start_date=date_str, end_date=date_str, is_open='1' ) is_open = len(df) > 0 and df.iloc[0]['is_open'] == 1 logger.info(f"日期 {date_str} 是否为交易日: {is_open}") return is_open except Exception as e: logger.error(f"检查交易日失败: {e}") # 失败时默认执行(避免错过交易日) return True def run_strategy(config_path: str = "config/strategies/rotation.yaml") -> dict: """ 执行策略回测 Args: config_path: 配置文件路径 Returns: dict: 执行结果,包含报告路径等信息 """ logger.info("开始执行策略回测...") try: # 构建命令 cmd = [ sys.executable, str(project_root / "scripts" / "run_rotation.py"), "--config", config_path, "--save-path", f"results/report_{datetime.now().strftime('%Y%m%d')}" ] logger.info(f"执行命令: {' '.join(cmd)}") # 执行策略 result = subprocess.run( cmd, capture_output=True, text=True, cwd=project_root, timeout=300 # 5分钟超时 ) if result.returncode != 0: logger.error(f"策略执行失败:\n{result.stderr}") return {"success": False, "error": result.stderr} logger.info("策略执行成功") logger.debug(result.stdout) # 查找生成的报告文件 report_date = datetime.now().strftime('%Y%m%d') chart_path = project_root / "results" / f"report_{report_date}_chart.png" # 如果找不到带日期的,尝试默认路径 if not chart_path.exists(): chart_path = project_root / "results" / "report_chart.png" return { "success": True, "stdout": result.stdout, "chart_path": str(chart_path) if chart_path.exists() else None, } except subprocess.TimeoutExpired: logger.error("策略执行超时") return {"success": False, "error": "timeout"} except Exception as e: logger.error(f"策略执行异常: {e}") return {"success": False, "error": str(e)} def send_report_to_dingtalk(chart_path: str, summary_text: str = "") -> bool: """ 上传报告到OSS并发送到钉钉 Args: chart_path: 图表文件路径 summary_text: 摘要文本 Returns: bool: 是否发送成功 """ logger.info("开始发送报告到钉钉...") try: bot = DingTalkBot() if not bot.webhook: logger.error("钉钉未配置,无法发送") return False today_str = datetime.now().strftime('%Y-%m-%d') # 发送图文消息 success = bot.send_image_via_oss( image_path=chart_path, title=f"ETF轮动策略日报 ({today_str})", text=summary_text or f"今日调仓信号已生成", expire_days=7 ) if success: logger.info("报告发送成功") else: logger.error("报告发送失败") return success except Exception as e: logger.error(f"发送报告异常: {e}") return False def wait_until_target_time(target_time: str = "15:00"): """ 等待直到目标时间 Args: target_time: 目标时间 (HH:MM) """ while True: now = datetime.now() target = now.replace( hour=int(target_time.split(":")[0]), minute=int(target_time.split(":")[1]), second=0, microsecond=0 ) # 如果目标时间已过,等到明天 if target < now: target += timedelta(days=1) logger.info(f"目标时间已过,等到明天 {target_time}") wait_seconds = (target - now).total_seconds() if wait_seconds > 60: logger.info(f"等待 {target_time},还需 {wait_seconds/60:.0f} 分钟...") time.sleep(60) # 每分钟检查一次 else: logger.info(f"即将到达目标时间,等待 {wait_seconds:.0f} 秒...") time.sleep(wait_seconds) break def daily_task(config_path: str = "config/strategies/rotation.yaml"): """ 每日任务主流程 Args: config_path: 配置文件路径 """ today_str = datetime.now().strftime('%Y-%m-%d') logger.info(f"=" * 60) logger.info(f"开始执行每日任务: {today_str}") logger.info(f"=" * 60) # 1. 检查是否为交易日 if not is_trading_day(): logger.info("今天不是交易日,跳过执行") return # 2. 执行策略 result = run_strategy(config_path) if not result["success"]: # 发送错误通知 bot = DingTalkBot() bot.send_text(f"策略执行失败: {result.get('error', '未知错误')}") return # 3. 发送报告 if result.get("chart_path"): send_report_to_dingtalk( chart_path=result["chart_path"], summary_text="今日ETF轮动策略调仓信号" ) else: logger.warning("未找到报告图表") logger.info("每日任务完成") def main(): parser = argparse.ArgumentParser(description="ETF策略每日定时任务") parser.add_argument( "--time", type=str, default="15:30", help="执行时间 (HH:MM),默认15:00", ) parser.add_argument( "--config", type=str, default="config/strategies/rotation.yaml", help="配置文件路径", ) parser.add_argument( "--run-now", action="store_true", help="立即执行一次(不等待指定时间)", ) parser.add_argument( "--loop", action="store_true", help="循环运行(每天执行)", ) args = parser.parse_args() # 创建日志目录 (project_root / "logs").mkdir(exist_ok=True) if args.run_now: # 立即执行一次 daily_task(args.config) elif args.loop: # 循环运行 logger.info(f"启动定时任务,每天 {args.time} 执行") while True: wait_until_target_time(args.time) daily_task(args.config) # 等待一段时间避免重复执行 time.sleep(60) else: # 等待到目标时间执行一次 wait_until_target_time(args.time) daily_task(args.config) if __name__ == "__main__": main()