refactor(scheduler): move daily_scheduler.py to rotation/ and add simple_rotation support

- Move scripts/daily_scheduler.py -> rotation/daily_scheduler.py
- Add run_simple_rotation() to execute simple_rotation.py via subprocess
- Add --strategy flag (simple/legacy/all) for flexible strategy selection
- Add --simple-config flag for custom simple rotation config path
- Update Dockerfile and docker-compose.yml path references
- Add configurable title to send_report_to_dingtalk()
This commit is contained in:
2026-06-02 01:16:34 +08:00
parent 5e11b6b690
commit 4791d3cf40
3 changed files with 133 additions and 37 deletions

View File

@@ -1,314 +0,0 @@
#!/usr/bin/env python3
"""
ETF轮动策略定时调度器
功能:
1. 每天15:30检查是否为交易日
2. 如果是交易日,执行策略回测生成报告
3. 上传报告图片到OSS
4. 发送图片链接到钉钉群
用法:
python scripts/daily_scheduler.py --time 15:30 # 后台定时模式
python scripts/daily_scheduler.py --now # 立即执行一次
python scripts/daily_scheduler.py --no-daemon # 非后台模式
"""
import warnings
# 抑制oss2库的SyntaxWarning无效转义序列'\&'
warnings.filterwarnings('ignore', category=SyntaxWarning, module='oss2')
import os
import sys
import time
import subprocess
import argparse
from datetime import datetime
from pathlib import Path
# 添加项目根目录
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
# 添加归档模块路径(使用原有的 notify 和 oss_utils
archive_path = project_root / 'archive' / 'legacy_core'
sys.path.insert(0, str(archive_path))
# 加载环境变量
from dotenv import load_dotenv
load_dotenv()
from loguru import logger
import schedule
import tushare as ts
# 导入原有的通知和OSS模块
try:
from core.common.notify import DingTalkBot, send_to_all_groups
from core.common.oss_utils import upload_image_to_oss
HAS_NOTIFY_MODULE = True
except ImportError as e:
logger.warning(f"无法导入通知模块: {e}")
HAS_NOTIFY_MODULE = False
# 配置日志
log_path = project_root / "logs" / "scheduler_{time}.log"
logger.add(
str(log_path),
rotation="1 day",
retention="7 days",
level="INFO"
)
def is_trade_day(date_str: str = None) -> bool:
"""
判断指定日期是否为交易日
Args:
date_str: 日期字符串 (YYYYMMDD),默认今天
Returns:
bool: 是否为交易日
"""
if date_str is None:
date_str = datetime.now().strftime("%Y%m%d")
try:
token = os.getenv('TUSHARE_TOKEN')
if not token:
logger.error("TUSHARE_TOKEN 未设置")
return True # 失败时默认执行(避免错过交易日)
pro = ts.pro_api(token)
df = pro.trade_cal(
exchange='SSE',
start_date=date_str,
end_date=date_str,
is_open='1'
)
is_open = len(df) > 0 and df.iloc[0]["is_open"] == 1
logger.info(f"日期 {date_str} 是否为交易日: {is_open}")
return is_open
except Exception as e:
logger.error(f"检查交易日失败: {e}")
return True # 失败时默认执行
def run_strategy(config_path: str = "strategies/rotation/config.yaml") -> dict:
"""
执行策略回测并生成报告
使用 generate_legacy_report.py 生成原引擎格式报告
Args:
config_path: 配置文件路径
Returns:
dict: 执行结果,包含报告路径等信息
"""
logger.info("开始执行策略回测...")
try:
# 使用 generate_legacy_report.py 生成报告
# 输出文件: results/rotation_legacy_chart.png
cmd = [
sys.executable,
str(project_root / "scripts" / "generate_legacy_report.py")
]
logger.info(f"执行命令: {' '.join(cmd)}")
# 执行策略增加超时到15分钟因为需要获取多市场数据
result = subprocess.run(
cmd,
capture_output=True,
text=True,
cwd=project_root,
timeout=900 # 15分钟超时原5分钟不够数据获取串行需要更长时间
)
if result.returncode != 0:
logger.error(f"策略执行失败:\n{result.stderr}")
return {"success": False, "error": result.stderr}
logger.info("策略执行成功")
logger.debug(result.stdout)
# 查找生成的报告图片(固定路径)
chart_path = project_root / "results" / "rotation_legacy_chart.png"
return {
"success": True,
"stdout": result.stdout,
"chart_path": str(chart_path) if chart_path.exists() else None,
}
except subprocess.TimeoutExpired:
logger.error("策略执行超时")
return {"success": False, "error": "timeout"}
except Exception as e:
logger.error(f"策略执行异常: {e}")
return {"success": False, "error": str(e)}
def send_report_to_dingtalk(chart_path: str, summary_text: str = "") -> bool:
"""
上传报告到OSS并发送图片链接到钉钉
Args:
chart_path: 图片文件路径
summary_text: 摘要文本
Returns:
bool: 是否发送成功
"""
logger.info("开始发送报告到钉钉...")
if not HAS_NOTIFY_MODULE:
logger.warning("通知模块未加载,无法发送")
return False
try:
today_str = datetime.now().strftime("%Y-%m-%d")
title = f"ETF轮动策略调仓日报 ({today_str})"
# 使用原有的 send_to_all_groups 发送图片
# 该方法会自动上传到OSS → 发送Markdown消息带图片链接
success = send_to_all_groups(
"send_image_via_oss",
image_path=chart_path,
title=title,
text=summary_text,
expire_days=7
)
if success:
logger.info("报告发送成功")
else:
logger.error("报告发送失败")
return success
except Exception as e:
logger.error(f"发送报告异常: {e}")
return False
def setup_schedule(target_time: str = "15:30", config_path: str = "strategies/rotation/config.yaml"):
"""
设置定时任务
Args:
target_time: 执行时间 (HH:MM)
config_path: 配置文件路径
"""
logger.info(f"设置定时任务: 每天 {target_time} 执行")
# 清除已有任务
schedule.clear()
# 添加每日任务
schedule.every().day.at(target_time).do(daily_task, config_path=config_path)
logger.info("定时任务设置完成,等待执行...")
def daily_task(config_path: str = "strategies/rotation/config.yaml"):
"""
每日任务主流程
Args:
config_path: 配置文件路径
"""
today_str = datetime.now().strftime("%Y-%m-%d")
logger.info("=" * 60)
logger.info(f"开始执行每日任务: {today_str}")
logger.info("=" * 60)
# 1. 检查是否为交易日
if not is_trade_day():
logger.info("今天不是交易日,跳过执行")
return
# 2. 执行策略
result = run_strategy(config_path)
if not result["success"]:
logger.error(f"策略执行失败: {result.get('error', '未知错误')}")
return
# 3. 发送报告
if result.get("chart_path"):
# 只发送标题和图片,不附带文字摘要
send_report_to_dingtalk(
chart_path=result["chart_path"],
summary_text="" # 空字符串,只显示标题和图片
)
else:
logger.warning("未找到报告文件")
logger.info("每日任务执行完成")
def run_scheduler_loop():
"""运行调度循环"""
logger.info("启动调度器,按 Ctrl+C 停止")
try:
while True:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
logger.info("调度器已停止")
def main():
parser = argparse.ArgumentParser(description='ETF策略定时调度器')
parser.add_argument(
'--time',
type=str,
default='15:30',
help='执行时间 (HH:MM),默认 15:30'
)
parser.add_argument(
'--config',
type=str,
default='strategies/rotation/config.yaml',
help='配置文件路径'
)
parser.add_argument(
'--now',
action='store_true',
help='立即执行一次并退出(不启动定时)'
)
parser.add_argument(
'--no-daemon',
action='store_true',
help='非后台模式:执行一次后进入定时循环(测试用)'
)
args = parser.parse_args()
# 创建日志目录
(project_root / "logs").mkdir(exist_ok=True)
if args.now:
# 立即执行一次并退出
daily_task(args.config)
elif args.no_daemon:
# 非后台模式:执行一次后进入定时循环
setup_schedule(args.time, args.config)
logger.info("执行一次测试...")
daily_task(args.config)
logger.info("测试完成启动定时任务循环Ctrl+C 停止)...")
run_scheduler_loop()
else:
# 默认:后台定时模式
setup_schedule(args.time, args.config)
run_scheduler_loop()
if __name__ == '__main__':
main()