Files
etf/scripts/daily_scheduler.py
aszerW 7b41bb8c6d feat(scripts): 迁移轮动策略定时调度器
新增文件:
- scripts/daily_scheduler.py: 定时调度器,支持交易日判断、回测执行、OSS上传、钉钉推送
- scripts/run_rotation.py: 回测入口脚本,支持Flask API和本地数据源切换
- config/settings.py: 配置管理模块,支持钉钉多群配置

功能:
1. 每天15:30自动检查交易日
2. 交易日执行策略回测生成报告
3. 上传报告图片到OSS
4. 发送图片链接到钉钉群

修复:
- 添加oss2库SyntaxWarning过滤(Python 3.12兼容)
- 钉钉消息精简为标题+图片格式
2026-05-18 00:57:59 +08:00

314 lines
8.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
ETF轮动策略定时调度器
功能:
1. 每天15:30检查是否为交易日
2. 如果是交易日,执行策略回测生成报告
3. 上传报告图片到OSS
4. 发送图片链接到钉钉群
用法:
python scripts/daily_scheduler.py --time 15:30 # 后台定时模式
python scripts/daily_scheduler.py --now # 立即执行一次
python scripts/daily_scheduler.py --no-daemon # 非后台模式
"""
import warnings
# 抑制oss2库的SyntaxWarning无效转义序列'\&'
warnings.filterwarnings('ignore', category=SyntaxWarning, module='oss2')
import os
import sys
import time
import subprocess
import argparse
from datetime import datetime
from pathlib import Path
# 添加项目根目录
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
# 添加归档模块路径(使用原有的 notify 和 oss_utils
archive_path = project_root / 'archive' / 'legacy_core'
sys.path.insert(0, str(archive_path))
# 加载环境变量
from dotenv import load_dotenv
load_dotenv()
from loguru import logger
import schedule
import tushare as ts
# 导入原有的通知和OSS模块
try:
from core.common.notify import DingTalkBot, send_to_all_groups
from core.common.oss_utils import upload_image_to_oss
HAS_NOTIFY_MODULE = True
except ImportError as e:
logger.warning(f"无法导入通知模块: {e}")
HAS_NOTIFY_MODULE = False
# 配置日志
log_path = project_root / "logs" / "scheduler_{time}.log"
logger.add(
str(log_path),
rotation="1 day",
retention="7 days",
level="INFO"
)
def is_trade_day(date_str: str = None) -> bool:
"""
判断指定日期是否为交易日
Args:
date_str: 日期字符串 (YYYYMMDD),默认今天
Returns:
bool: 是否为交易日
"""
if date_str is None:
date_str = datetime.now().strftime("%Y%m%d")
try:
token = os.getenv('TUSHARE_TOKEN')
if not token:
logger.error("TUSHARE_TOKEN 未设置")
return True # 失败时默认执行(避免错过交易日)
pro = ts.pro_api(token)
df = pro.trade_cal(
exchange='SSE',
start_date=date_str,
end_date=date_str,
is_open='1'
)
is_open = len(df) > 0 and df.iloc[0]["is_open"] == 1
logger.info(f"日期 {date_str} 是否为交易日: {is_open}")
return is_open
except Exception as e:
logger.error(f"检查交易日失败: {e}")
return True # 失败时默认执行
def run_strategy(config_path: str = "strategies/rotation/config.yaml") -> dict:
"""
执行策略回测并生成报告
使用 generate_legacy_report.py 生成原引擎格式报告
Args:
config_path: 配置文件路径
Returns:
dict: 执行结果,包含报告路径等信息
"""
logger.info("开始执行策略回测...")
try:
# 使用 generate_legacy_report.py 生成报告
# 输出文件: results/rotation_legacy_chart.png
cmd = [
sys.executable,
str(project_root / "scripts" / "generate_legacy_report.py")
]
logger.info(f"执行命令: {' '.join(cmd)}")
# 执行策略
result = subprocess.run(
cmd,
capture_output=True,
text=True,
cwd=project_root,
timeout=300 # 5分钟超时
)
if result.returncode != 0:
logger.error(f"策略执行失败:\n{result.stderr}")
return {"success": False, "error": result.stderr}
logger.info("策略执行成功")
logger.debug(result.stdout)
# 查找生成的报告图片(固定路径)
chart_path = project_root / "results" / "rotation_legacy_chart.png"
return {
"success": True,
"stdout": result.stdout,
"chart_path": str(chart_path) if chart_path.exists() else None,
}
except subprocess.TimeoutExpired:
logger.error("策略执行超时")
return {"success": False, "error": "timeout"}
except Exception as e:
logger.error(f"策略执行异常: {e}")
return {"success": False, "error": str(e)}
def send_report_to_dingtalk(chart_path: str, summary_text: str = "") -> bool:
"""
上传报告到OSS并发送图片链接到钉钉
Args:
chart_path: 图片文件路径
summary_text: 摘要文本
Returns:
bool: 是否发送成功
"""
logger.info("开始发送报告到钉钉...")
if not HAS_NOTIFY_MODULE:
logger.warning("通知模块未加载,无法发送")
return False
try:
today_str = datetime.now().strftime("%Y-%m-%d")
title = f"ETF轮动策略调仓日报 ({today_str})"
# 使用原有的 send_to_all_groups 发送图片
# 该方法会自动上传到OSS → 发送Markdown消息带图片链接
success = send_to_all_groups(
"send_image_via_oss",
image_path=chart_path,
title=title,
text=summary_text,
expire_days=7
)
if success:
logger.info("报告发送成功")
else:
logger.error("报告发送失败")
return success
except Exception as e:
logger.error(f"发送报告异常: {e}")
return False
def setup_schedule(target_time: str = "15:30", config_path: str = "strategies/rotation/config.yaml"):
"""
设置定时任务
Args:
target_time: 执行时间 (HH:MM)
config_path: 配置文件路径
"""
logger.info(f"设置定时任务: 每天 {target_time} 执行")
# 清除已有任务
schedule.clear()
# 添加每日任务
schedule.every().day.at(target_time).do(daily_task, config_path=config_path)
logger.info("定时任务设置完成,等待执行...")
def daily_task(config_path: str = "strategies/rotation/config.yaml"):
"""
每日任务主流程
Args:
config_path: 配置文件路径
"""
today_str = datetime.now().strftime("%Y-%m-%d")
logger.info("=" * 60)
logger.info(f"开始执行每日任务: {today_str}")
logger.info("=" * 60)
# 1. 检查是否为交易日
if not is_trade_day():
logger.info("今天不是交易日,跳过执行")
return
# 2. 执行策略
result = run_strategy(config_path)
if not result["success"]:
logger.error(f"策略执行失败: {result.get('error', '未知错误')}")
return
# 3. 发送报告
if result.get("chart_path"):
# 只发送标题和图片,不附带文字摘要
send_report_to_dingtalk(
chart_path=result["chart_path"],
summary_text="" # 空字符串,只显示标题和图片
)
else:
logger.warning("未找到报告文件")
logger.info("每日任务执行完成")
def run_scheduler_loop():
"""运行调度循环"""
logger.info("启动调度器,按 Ctrl+C 停止")
try:
while True:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
logger.info("调度器已停止")
def main():
parser = argparse.ArgumentParser(description='ETF策略定时调度器')
parser.add_argument(
'--time',
type=str,
default='15:30',
help='执行时间 (HH:MM),默认 15:30'
)
parser.add_argument(
'--config',
type=str,
default='strategies/rotation/config.yaml',
help='配置文件路径'
)
parser.add_argument(
'--now',
action='store_true',
help='立即执行一次并退出(不启动定时)'
)
parser.add_argument(
'--no-daemon',
action='store_true',
help='非后台模式:执行一次后进入定时循环(测试用)'
)
args = parser.parse_args()
# 创建日志目录
(project_root / "logs").mkdir(exist_ok=True)
if args.now:
# 立即执行一次并退出
daily_task(args.config)
elif args.no_daemon:
# 非后台模式:执行一次后进入定时循环
setup_schedule(args.time, args.config)
logger.info("执行一次测试...")
daily_task(args.config)
logger.info("测试完成启动定时任务循环Ctrl+C 停止)...")
run_scheduler_loop()
else:
# 默认:后台定时模式
setup_schedule(args.time, args.config)
run_scheduler_loop()
if __name__ == '__main__':
main()