Files
etf/scripts/daily_scheduler.py
aszerW 8d24fb91eb refactor(scheduler): 重构每日任务调度逻辑并优化配置路径
- 将等待目标时间逻辑改为基于schedule库的定时任务调度
- 支持后台守护进程模式持续执行定时任务
- 优化命令行参数说明,默认执行时间改为15:30
- 简化立即执行和循环运行的逻辑
- 修改SSH私钥路径为相对于项目根目录
- 更新rotation.yaml配置中指数及加密货币标签说明
- 回测开始日期由2022-01-01调整为2020-01-01

refactor(report): 优化轮动策略绩效报告图表与指标展示

- 新增策略与基准绩效指标对比表格,展示累计收益、年化收益等关键指标
- 调整绩效表布局,增加绩效指标面板高度,保持与信号表格一致视觉
- 丰富绘图函数参数,支持传入绩效指标字典避免重复计算
- 规范调仓信号表操作列索引及样式,保持统一字体大小和行高
- 净值曲线、回撤及持仓分布面板分离,调整图表索引和标题名称
- 优化持仓分布图显示,提升整体报告信息完整性与易读性
2026-03-19 21:56:17 +08:00

301 lines
7.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
ETF策略每日定时任务
功能:
1. 每天15:00收盘后检查是否为交易日
2. 如果是交易日,执行策略回测
3. 上传报告图表到OSS
4. 发送结果到钉钉
用法:
python scripts/daily_scheduler.py
python scripts/daily_scheduler.py --time 15:00
"""
import sys
import os
import time
import argparse
import subprocess
from datetime import datetime
from pathlib import Path
# 添加项目根目录到路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from dotenv import load_dotenv
load_dotenv(project_root / ".env")
from loguru import logger
import schedule
import tushare as ts
from core.common.notify import DingTalkBot
from core.common.oss_utils import upload_image_to_oss
# 配置日志
logger.add(
project_root / "logs" / "scheduler_{time}.log",
rotation="1 day",
retention="7 days",
level="INFO",
)
def is_trading_day(date_str: str = None) -> bool:
"""
检查指定日期是否为交易日
Args:
date_str: 日期字符串 (YYYYMMDD),默认今天
Returns:
bool: 是否为交易日
"""
if date_str is None:
date_str = datetime.now().strftime("%Y%m%d")
try:
token = os.getenv("TUSHARE_TOKEN")
if not token:
logger.error("TUSHARE_TOKEN 未配置")
return False
pro = ts.pro_api(token)
df = pro.trade_cal(
exchange='SSE',
start_date=date_str,
end_date=date_str,
is_open='1'
)
is_open = len(df) > 0 and df.iloc[0]['is_open'] == 1
logger.info(f"日期 {date_str} 是否为交易日: {is_open}")
return is_open
except Exception as e:
logger.error(f"检查交易日失败: {e}")
# 失败时默认执行(避免错过交易日)
return True
def run_strategy(config_path: str = "config/strategies/rotation.yaml") -> dict:
"""
执行策略回测
Args:
config_path: 配置文件路径
Returns:
dict: 执行结果,包含报告路径等信息
"""
logger.info("开始执行策略回测...")
try:
# 构建命令
cmd = [
sys.executable,
str(project_root / "scripts" / "run_rotation.py"),
"--config", config_path,
"--save-path", f"results/report_{datetime.now().strftime('%Y%m%d')}"
]
logger.info(f"执行命令: {' '.join(cmd)}")
# 执行策略
result = subprocess.run(
cmd,
capture_output=True,
text=True,
cwd=project_root,
timeout=300 # 5分钟超时
)
if result.returncode != 0:
logger.error(f"策略执行失败:\n{result.stderr}")
return {"success": False, "error": result.stderr}
logger.info("策略执行成功")
logger.debug(result.stdout)
# 查找生成的报告文件
report_date = datetime.now().strftime('%Y%m%d')
chart_path = project_root / "results" / f"report_{report_date}_chart.png"
# 如果找不到带日期的,尝试默认路径
if not chart_path.exists():
chart_path = project_root / "results" / "report_chart.png"
return {
"success": True,
"stdout": result.stdout,
"chart_path": str(chart_path) if chart_path.exists() else None,
}
except subprocess.TimeoutExpired:
logger.error("策略执行超时")
return {"success": False, "error": "timeout"}
except Exception as e:
logger.error(f"策略执行异常: {e}")
return {"success": False, "error": str(e)}
def send_report_to_dingtalk(chart_path: str, summary_text: str = "") -> bool:
"""
上传报告到OSS并发送到钉钉
Args:
chart_path: 图表文件路径
summary_text: 摘要文本
Returns:
bool: 是否发送成功
"""
logger.info("开始发送报告到钉钉...")
try:
bot = DingTalkBot()
if not bot.webhook:
logger.error("钉钉未配置,无法发送")
return False
today_str = datetime.now().strftime('%Y-%m-%d')
# 发送图文消息
success = bot.send_image_via_oss(
image_path=chart_path,
title=f"ETF轮动策略日报 ({today_str})",
text=summary_text or f"今日调仓信号已生成",
expire_days=7
)
if success:
logger.info("报告发送成功")
else:
logger.error("报告发送失败")
return success
except Exception as e:
logger.error(f"发送报告异常: {e}")
return False
def setup_schedule(target_time: str = "15:30", config_path: str = "config/strategies/rotation.yaml"):
"""
设置定时任务
Args:
target_time: 执行时间 (HH:MM)
config_path: 配置文件路径
"""
logger.info(f"设置定时任务: 每天 {target_time} 执行")
# 清除所有现有任务
schedule.clear()
# 添加每日任务
schedule.every().day.at(target_time).do(daily_task, config_path)
logger.info("定时任务设置完成,等待执行...")
def daily_task(config_path: str = "config/strategies/rotation.yaml"):
"""
每日任务主流程
Args:
config_path: 配置文件路径
"""
today_str = datetime.now().strftime('%Y-%m-%d')
logger.info(f"=" * 60)
logger.info(f"开始执行每日任务: {today_str}")
logger.info(f"=" * 60)
# 1. 检查是否为交易日
if not is_trading_day():
logger.info("今天不是交易日,跳过执行")
return
# 2. 执行策略
result = run_strategy(config_path)
if not result["success"]:
# 发送错误通知
bot = DingTalkBot()
bot.send_text(f"策略执行失败: {result.get('error', '未知错误')}")
return
# 3. 发送报告
if result.get("chart_path"):
send_report_to_dingtalk(
chart_path=result["chart_path"],
summary_text="今日ETF轮动策略调仓信号"
)
else:
logger.warning("未找到报告图表")
logger.info("每日任务完成")
def run_scheduler_loop():
"""运行调度器循环"""
logger.info("启动调度器循环,按 Ctrl+C 停止")
try:
while True:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
logger.info("调度器已停止")
def main():
parser = argparse.ArgumentParser(description="ETF策略每日定时任务")
parser.add_argument(
"--time",
type=str,
default="15:30",
help="执行时间 (HH:MM)默认15:30",
)
parser.add_argument(
"--config",
type=str,
default="config/strategies/rotation.yaml",
help="配置文件路径",
)
parser.add_argument(
"--run-now",
action="store_true",
help="立即执行一次(不启动定时任务)",
)
parser.add_argument(
"--daemon",
action="store_true",
help="后台运行(持续执行定时任务)",
)
args = parser.parse_args()
# 创建日志目录
(project_root / "logs").mkdir(exist_ok=True)
if args.run_now:
# 立即执行一次
daily_task(args.config)
elif args.daemon:
# 后台运行模式
setup_schedule(args.time, args.config)
run_scheduler_loop()
else:
# 默认:设置定时任务并执行一次(用于测试)
setup_schedule(args.time, args.config)
logger.info("执行一次任务用于测试...")
daily_task(args.config)
logger.info("测试完成,启动定时任务循环(按 Ctrl+C 停止)...")
run_scheduler_loop()
if __name__ == "__main__":
main()