Files
etf/rotation/daily_scheduler.py
aszerW 5c4aeb75d2 fix(scheduler): 修复setup_schedule未传递no_detail/no_report参数的问题
setup_schedule() 在定时模式下未将 --no-detail 和 --no-report 参数传递给 daily_task,导致定时任务始终生成 detail JSON
2026-06-09 00:07:01 +08:00

429 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
ETF轮动策略定时调度器
功能:
1. 每天15:30检查是否为交易日
2. 如果是交易日,执行策略回测生成报告
3. 上传报告图片到OSS
4. 发送图片链接到钉钉群
用法:
python rotation/daily_scheduler.py --time 15:30 # 后台定时模式
python rotation/daily_scheduler.py --now # 立即执行一次
python rotation/daily_scheduler.py --no-daemon # 非后台模式
"""
import warnings
# 抑制oss2库的SyntaxWarning无效转义序列'\&'
warnings.filterwarnings('ignore', category=SyntaxWarning, module='oss2')
import os
import sys
import time
import subprocess
import argparse
from datetime import datetime
from pathlib import Path
# 添加项目根目录
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
# 加载环境变量
from dotenv import load_dotenv
load_dotenv()
from loguru import logger
import schedule
import tushare as ts
# 导入原有的通知和OSS模块
try:
from core.common.notify import DingTalkBot, send_to_all_groups
from core.common.oss_utils import upload_image_to_oss
HAS_NOTIFY_MODULE = True
except ImportError as e:
logger.warning(f"无法导入通知模块: {e}")
HAS_NOTIFY_MODULE = False
# 配置日志
log_path = project_root / "logs" / "scheduler_{time}.log"
logger.add(
str(log_path),
rotation="1 day",
retention="7 days",
level="INFO"
)
def is_trade_day(date_str: str = None) -> bool:
"""
判断指定日期是否为交易日
Args:
date_str: 日期字符串 (YYYYMMDD),默认今天
Returns:
bool: 是否为交易日
"""
if date_str is None:
date_str = datetime.now().strftime("%Y%m%d")
try:
token = os.getenv('TUSHARE_TOKEN')
if not token:
logger.error("TUSHARE_TOKEN 未设置")
return True # 失败时默认执行(避免错过交易日)
pro = ts.pro_api(token)
df = pro.trade_cal(
exchange='SSE',
start_date=date_str,
end_date=date_str,
is_open='1'
)
is_open = len(df) > 0 and df.iloc[0]["is_open"] == 1
logger.info(f"日期 {date_str} 是否为交易日: {is_open}")
return is_open
except Exception as e:
logger.error(f"检查交易日失败: {e}")
return True # 失败时默认执行
def run_strategy(config_path: str = "strategies/rotation/config.yaml") -> dict:
"""
执行策略回测并生成报告
使用 generate_legacy_report.py 生成原引擎格式报告
Args:
config_path: 配置文件路径
Returns:
dict: 执行结果,包含报告路径等信息
"""
logger.info("开始执行策略回测...")
try:
# 使用 generate_legacy_report.py 生成报告
# 输出文件: results/rotation_legacy_chart.png
cmd = [
sys.executable,
str(project_root / "scripts" / "generate_legacy_report.py")
]
logger.info(f"执行命令: {' '.join(cmd)}")
# 执行策略增加超时到15分钟因为需要获取多市场数据
result = subprocess.run(
cmd,
capture_output=True,
text=True,
cwd=project_root,
timeout=900 # 15分钟超时原5分钟不够数据获取串行需要更长时间
)
if result.returncode != 0:
logger.error(f"策略执行失败:\n{result.stderr}")
return {"success": False, "error": result.stderr}
logger.info("策略执行成功")
logger.debug(result.stdout)
# 查找生成的报告图片(固定路径)
chart_path = project_root / "results" / "rotation_legacy_chart.png"
return {
"success": True,
"stdout": result.stdout,
"chart_path": str(chart_path) if chart_path.exists() else None,
}
except subprocess.TimeoutExpired:
logger.error("策略执行超时")
return {"success": False, "error": "timeout"}
except Exception as e:
logger.error(f"策略执行异常: {e}")
return {"success": False, "error": str(e)}
def run_simple_rotation(config_path: str = None, no_detail: bool = False, no_report: bool = False) -> dict:
"""
执行 simple_rotation.py 策略回测并生成报告
Args:
config_path: 配置文件路径,默认使用 rotation/config_simple.yaml
Returns:
dict: 执行结果,包含报告路径等信息
"""
logger.info("开始执行 Simple Rotation 策略回测...")
try:
cmd = [
sys.executable,
str(project_root / "rotation" / "simple_rotation.py")
]
if config_path:
cmd.extend(["--config", config_path])
if no_detail:
cmd.append("--no-detail")
if no_report:
cmd.append("--no-report")
logger.info(f"执行命令: {' '.join(cmd)}")
result = subprocess.run(
cmd,
capture_output=True,
text=True,
cwd=project_root,
timeout=900
)
if result.returncode != 0:
logger.error(f"Simple Rotation 执行失败:\n{result.stderr}")
return {"success": False, "error": result.stderr}
logger.info("Simple Rotation 执行成功")
logger.debug(result.stdout)
# simple_rotation.py 生成的报告路径
chart_path = project_root / "rotation" / "results" / "simple_rotation_report.png"
return {
"success": True,
"stdout": result.stdout,
"chart_path": str(chart_path) if chart_path.exists() else None,
}
except subprocess.TimeoutExpired:
logger.error("Simple Rotation 执行超时")
return {"success": False, "error": "timeout"}
except Exception as e:
logger.error(f"Simple Rotation 执行异常: {e}")
return {"success": False, "error": str(e)}
def send_report_to_dingtalk(chart_path: str, summary_text: str = "", title: str = None) -> bool:
"""
上传报告到OSS并发送图片链接到钉钉
Args:
chart_path: 图片文件路径
summary_text: 摘要文本
title: 消息标题,默认自动生成
Returns:
bool: 是否发送成功
"""
logger.info("开始发送报告到钉钉...")
if not HAS_NOTIFY_MODULE:
logger.warning("通知模块未加载,无法发送")
return False
try:
today_str = datetime.now().strftime("%Y-%m-%d")
if title is None:
title = f"ETF轮动策略调仓日报 ({today_str})"
# 使用原有的 send_to_all_groups 发送图片
# 该方法会自动上传到OSS → 发送Markdown消息带图片链接
success = send_to_all_groups(
"send_image_via_oss",
image_path=chart_path,
title=title,
text=summary_text,
expire_days=7
)
if success:
logger.info("报告发送成功")
else:
logger.error("报告发送失败")
return success
except Exception as e:
logger.error(f"发送报告异常: {e}")
return False
def setup_schedule(target_time: str = "15:30",
config_path: str = "strategies/rotation/config.yaml",
strategy: str = "all",
simple_config: str = None,
no_detail: bool = False,
no_report: bool = False):
"""
设置定时任务
Args:
target_time: 执行时间 (HH:MM)
config_path: legacy策略配置文件路径
strategy: 策略选择 - "simple" / "legacy" / "all"
simple_config: simple_rotation 配置文件路径
no_detail: 跳过 detail JSON 导出
no_report: 跳过 report PNG 生成
"""
logger.info(f"设置定时任务: 每天 {target_time} 执行 (策略: {strategy}, no_detail={no_detail}, no_report={no_report})")
# 清除已有任务
schedule.clear()
# 添加每日任务
schedule.every().day.at(target_time).do(
daily_task,
config_path=config_path,
strategy=strategy,
simple_config=simple_config,
no_detail=no_detail,
no_report=no_report
)
logger.info("定时任务设置完成,等待执行...")
def daily_task(config_path: str = "strategies/rotation/config.yaml",
strategy: str = "all",
simple_config: str = None,
no_detail: bool = False,
no_report: bool = False):
"""
每日任务主流程
Args:
config_path: legacy策略配置文件路径
strategy: 策略选择 - "simple" / "legacy" / "all"(两者都执行)
simple_config: simple_rotation 配置文件路径
"""
today_str = datetime.now().strftime("%Y-%m-%d")
logger.info("=" * 60)
logger.info(f"开始执行每日任务: {today_str} 策略: {strategy}")
logger.info("=" * 60)
# 1. 检查是否为交易日
if not is_trade_day():
logger.info("今天不是交易日,跳过执行")
return
# 2. 执行 Simple Rotation 策略
if strategy in ("simple", "all"):
result = run_simple_rotation(simple_config, no_detail=no_detail, no_report=no_report)
if result["success"]:
if result.get("chart_path"):
send_report_to_dingtalk(
chart_path=result["chart_path"],
summary_text="",
title=f"Simple Rotation 调仓日报 ({today_str})"
)
else:
logger.warning("Simple Rotation 未找到报告文件")
else:
logger.error(f"Simple Rotation 执行失败: {result.get('error', '未知错误')}")
# 3. 执行 Legacy 策略
if strategy in ("legacy", "all"):
result = run_strategy(config_path)
if result["success"]:
if result.get("chart_path"):
send_report_to_dingtalk(
chart_path=result["chart_path"],
summary_text="",
title=f"ETF轮动策略调仓日报 ({today_str})"
)
else:
logger.warning("Legacy 策略未找到报告文件")
else:
logger.error(f"Legacy 策略执行失败: {result.get('error', '未知错误')}")
logger.info("每日任务执行完成")
def run_scheduler_loop():
"""运行调度循环"""
logger.info("启动调度器,按 Ctrl+C 停止")
try:
while True:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
logger.info("调度器已停止")
def main():
parser = argparse.ArgumentParser(description='ETF策略定时调度器')
parser.add_argument(
'--time',
type=str,
default='15:30',
help='执行时间 (HH:MM),默认 15:30'
)
parser.add_argument(
'--config',
type=str,
default='strategies/rotation/config.yaml',
help='Legacy策略配置文件路径'
)
parser.add_argument(
'--simple-config',
type=str,
default=None,
help='Simple Rotation 配置文件路径(默认 rotation/config_simple.yaml'
)
parser.add_argument(
'--strategy',
type=str,
choices=['simple', 'legacy', 'all'],
default='simple',
help='策略选择: simple=仅Simple Rotation, legacy=仅Legacy策略, all=两者都执行(默认 simple'
)
parser.add_argument(
'--now',
action='store_true',
help='立即执行一次并退出(不启动定时)'
)
parser.add_argument(
'--no-daemon',
action='store_true',
help='非后台模式:执行一次后进入定时循环(测试用)'
)
parser.add_argument(
'--no-detail',
action='store_true',
help='跳过 detail JSON 导出(加速日常运行)'
)
parser.add_argument(
'--no-report',
action='store_true',
help='跳过 report PNG 生成'
)
args = parser.parse_args()
# 创建日志目录
(project_root / "logs").mkdir(exist_ok=True)
if args.now:
# 立即执行一次并退出
daily_task(args.config, args.strategy, args.simple_config, args.no_detail, args.no_report)
elif args.no_daemon:
# 非后台模式:执行一次后进入定时循环
setup_schedule(args.time, args.config, args.strategy, args.simple_config, args.no_detail, args.no_report)
logger.info("执行一次测试...")
daily_task(args.config, args.strategy, args.simple_config, args.no_detail, args.no_report)
logger.info("测试完成启动定时任务循环Ctrl+C 停止)...")
run_scheduler_loop()
else:
# 默认:后台定时模式
setup_schedule(args.time, args.config, args.strategy, args.simple_config, args.no_detail, args.no_report)
run_scheduler_loop()
if __name__ == '__main__':
main()