Files
etf/archive/legacy_scripts/scripts/daily_scheduler.py
aszerW 1fca536c95 refactor: 归档旧代码,保留新框架结构
归档内容:
- core/ (数据源、因子计算、通用工具) → archive/legacy_core/
- strategies/rotation/engine.py, portfolio.py, report.py → archive/legacy_core/
- scripts/ (run_rotation, daily_scheduler) → archive/legacy_scripts/
- examples/ → archive/legacy_examples/
- tests/ (实验、对比测试) → archive/legacy_tests/
- 单独文件 (fetch_*.py, 动量.py, 全球市场.py等) → archive/single_files/

保留新结构:
- framework/ (抽象接口)
- strategies/shared/ (定制组件)
- strategies/rotation/strategy.py (新策略)
- 外层配置: .env, .dockerignore, build-and-push.sh, hk_ecs.pem, README.md, requirements.txt
- Docker相关: Dockerfile, Dockerfile_base, docker-compose.yml

更新README反映新框架架构
2026-05-11 23:34:23 +08:00

303 lines
7.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
ETF策略每日定时任务
功能:
1. 每天15:00收盘后检查是否为交易日
2. 如果是交易日,执行策略回测
3. 上传报告图表到OSS
4. 发送结果到钉钉
用法:
python scripts/daily_scheduler.py
python scripts/daily_scheduler.py --time 15:00
"""
import sys
import os
import time
import argparse
import subprocess
from datetime import datetime
from pathlib import Path
# 添加项目根目录到路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from dotenv import load_dotenv
load_dotenv(project_root / ".env")
from loguru import logger
import schedule
import tushare as ts
from core.common.notify import DingTalkBot, send_to_all_groups
from core.common.oss_utils import upload_image_to_oss
# 配置日志
logger.add(
project_root / "logs" / "scheduler_{time}.log",
rotation="1 day",
retention="7 days",
level="INFO",
)
def is_trading_day(date_str: str = None) -> bool:
"""
检查指定日期是否为交易日
Args:
date_str: 日期字符串 (YYYYMMDD),默认今天
Returns:
bool: 是否为交易日
"""
if date_str is None:
date_str = datetime.now().strftime("%Y%m%d")
try:
token = os.getenv("TUSHARE_TOKEN")
if not token:
logger.error("TUSHARE_TOKEN 未配置")
return False
pro = ts.pro_api(token)
df = pro.trade_cal(
exchange="SSE", start_date=date_str, end_date=date_str, is_open="1"
)
is_open = len(df) > 0 and df.iloc[0]["is_open"] == 1
logger.info(f"日期 {date_str} 是否为交易日: {is_open}")
return is_open
except Exception as e:
logger.error(f"检查交易日失败: {e}")
# 失败时默认执行(避免错过交易日)
return True
def run_strategy(config_path: str = "config/strategies/rotation.yaml") -> dict:
"""
执行策略回测
Args:
config_path: 配置文件路径
Returns:
dict: 执行结果,包含报告路径等信息
"""
logger.info("开始执行策略回测...")
try:
# 构建命令
cmd = [
sys.executable,
str(project_root / "scripts" / "run_rotation.py"),
"--config",
config_path,
"--save-path",
f"results/report_{datetime.now().strftime('%Y%m%d')}",
]
logger.info(f"执行命令: {' '.join(cmd)}")
# 执行策略
result = subprocess.run(
cmd,
capture_output=True,
text=True,
cwd=project_root,
timeout=300, # 5分钟超时
)
if result.returncode != 0:
logger.error(f"策略执行失败:\n{result.stderr}")
return {"success": False, "error": result.stderr}
logger.info("策略执行成功")
logger.debug(result.stdout)
# 查找生成的报告文件
report_date = datetime.now().strftime("%Y%m%d")
chart_path = project_root / "results" / f"report_{report_date}_chart.png"
# 如果找不到带日期的,尝试默认路径
if not chart_path.exists():
chart_path = project_root / "results" / "report_chart.png"
return {
"success": True,
"stdout": result.stdout,
"chart_path": str(chart_path) if chart_path.exists() else None,
}
except subprocess.TimeoutExpired:
logger.error("策略执行超时")
return {"success": False, "error": "timeout"}
except Exception as e:
logger.error(f"策略执行异常: {e}")
return {"success": False, "error": str(e)}
def send_report_to_dingtalk(chart_path: str, summary_text: str = "") -> bool:
"""
上传报告到OSS并发送到钉钉
Args:
chart_path: 图表文件路径
summary_text: 摘要文本
Returns:
bool: 是否发送成功
"""
logger.info("开始发送报告到钉钉...")
try:
bot = DingTalkBot()
if not bot.webhook:
logger.error("钉钉未配置,无法发送")
return False
today_str = datetime.now().strftime("%Y-%m-%d")
# 向所有群发送图文消息
success = send_to_all_groups(
"send_image_via_oss",
image_path=chart_path,
title=f"ETF轮动策略调仓日报 ({today_str})",
text=summary_text,
expire_days=7,
)
if success:
logger.info("报告发送成功")
else:
logger.error("报告发送失败")
return success
except Exception as e:
logger.error(f"发送报告异常: {e}")
return False
def setup_schedule(
target_time: str = "15:30", config_path: str = "config/strategies/rotation.yaml"
):
"""
设置定时任务
Args:
target_time: 执行时间 (HH:MM)
config_path: 配置文件路径
"""
logger.info(f"设置定时任务: 每天 {target_time} 执行")
# 清除所有现有任务
schedule.clear()
# 添加每日任务
schedule.every().day.at(target_time).do(daily_task, config_path)
logger.info("定时任务设置完成,等待执行...")
def daily_task(config_path: str = "config/strategies/rotation.yaml"):
"""
每日任务主流程
Args:
config_path: 配置文件路径
"""
today_str = datetime.now().strftime("%Y-%m-%d")
logger.info(f"=" * 60)
logger.info(f"开始执行每日任务: {today_str}")
logger.info(f"=" * 60)
# 1. 检查是否为交易日
if not is_trading_day():
logger.info("今天不是交易日,跳过执行")
return
# 2. 执行策略
result = run_strategy(config_path)
if not result["success"]:
# 发送错误通知
# 发送错误通知到所有群
send_to_all_groups("send_text", content=f"策略执行失败: {result.get('error', '未知错误')}")
return
# 3. 发送报告
if result.get("chart_path"):
send_report_to_dingtalk(
chart_path=result["chart_path"], summary_text=""
)
else:
logger.warning("未找到报告图表")
logger.info("每日任务完成")
def run_scheduler_loop():
"""运行调度器循环"""
logger.info("启动调度器循环,按 Ctrl+C 停止")
try:
while True:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
logger.info("调度器已停止")
def main():
parser = argparse.ArgumentParser(description="ETF策略每日定时任务")
parser.add_argument(
"--time",
type=str,
default="09:00",
help="执行时间 (HH:MM)默认09:00",
)
parser.add_argument(
"--config",
type=str,
default="config/strategies/rotation.yaml",
help="配置文件路径",
)
parser.add_argument(
"--run-now",
action="store_true",
help="立即执行一次并退出(不启动定时任务)",
)
parser.add_argument(
"--no-daemon",
action="store_true",
help="非后台模式:执行一次后进入定时循环",
)
args = parser.parse_args()
# 创建日志目录
(project_root / "logs").mkdir(exist_ok=True)
if args.run_now:
# 立即执行一次并退出
daily_task(args.config)
elif args.no_daemon:
# 非后台模式:执行一次后进入定时循环
setup_schedule(args.time, args.config)
logger.info("执行一次任务用于测试...")
daily_task(args.config)
logger.info("测试完成,启动定时任务循环(按 Ctrl+C 停止)...")
run_scheduler_loop()
else:
# 默认后台daemon模式
setup_schedule(args.time, args.config)
run_scheduler_loop()
if __name__ == "__main__":
main()