feat(scripts): 迁移轮动策略定时调度器
新增文件: - scripts/daily_scheduler.py: 定时调度器,支持交易日判断、回测执行、OSS上传、钉钉推送 - scripts/run_rotation.py: 回测入口脚本,支持Flask API和本地数据源切换 - config/settings.py: 配置管理模块,支持钉钉多群配置 功能: 1. 每天15:30自动检查交易日 2. 交易日执行策略回测生成报告 3. 上传报告图片到OSS 4. 发送图片链接到钉钉群 修复: - 添加oss2库SyntaxWarning过滤(Python 3.12兼容) - 钉钉消息精简为标题+图片格式
This commit is contained in:
93
config/settings.py
Normal file
93
config/settings.py
Normal file
@@ -0,0 +1,93 @@
|
||||
"""
|
||||
配置管理模块
|
||||
|
||||
从环境变量读取配置信息
|
||||
"""
|
||||
|
||||
import os
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
# 默认基准
|
||||
DEFAULT_BENCHMARK_CODE = "000300.SH"
|
||||
DEFAULT_BENCHMARK_NAME = "沪深300指数"
|
||||
|
||||
# 默认代码名称映射
|
||||
DEFAULT_CODE_NAME_MAP = {
|
||||
"000300.SH": "沪深300",
|
||||
"000905.SH": "中证500",
|
||||
"000852.SH": "中证1000",
|
||||
}
|
||||
|
||||
|
||||
def get_dingtalk_config() -> Dict[str, str]:
|
||||
"""
|
||||
获取钉钉机器人配置(第一个群)
|
||||
|
||||
Returns:
|
||||
dict: 包含 webhook 和 secret
|
||||
"""
|
||||
webhook = os.getenv("DINGTALK_WEBHOOK", "")
|
||||
secret = os.getenv("DINGTALK_SECRET", "")
|
||||
|
||||
return {
|
||||
"webhook": webhook,
|
||||
"secret": secret,
|
||||
}
|
||||
|
||||
|
||||
def get_all_dingtalk_configs() -> List[Dict[str, str]]:
|
||||
"""
|
||||
获取所有钉钉机器人配置(支持多群)
|
||||
|
||||
环境变量格式:
|
||||
DINGTALK_WEBHOOK_1=xxx
|
||||
DINGTALK_SECRET_1=xxx
|
||||
DINGTALK_WEBHOOK_2=xxx
|
||||
DINGTALK_SECRET_2=xxx
|
||||
...
|
||||
|
||||
如果没有编号配置,则使用 DINGTALK_WEBHOOK 和 DINGTALK_SECRET
|
||||
|
||||
Returns:
|
||||
list: 配置列表,每项包含 webhook 和 secret
|
||||
"""
|
||||
configs = []
|
||||
|
||||
# 查找编号配置
|
||||
i = 1
|
||||
while True:
|
||||
webhook = os.getenv(f"DINGTALK_WEBHOOK_{i}", "")
|
||||
secret = os.getenv(f"DINGTALK_SECRET_{i}", "")
|
||||
|
||||
if not webhook:
|
||||
break
|
||||
|
||||
configs.append({
|
||||
"webhook": webhook,
|
||||
"secret": secret,
|
||||
})
|
||||
i += 1
|
||||
|
||||
# 如果没有编号配置,使用默认配置
|
||||
if not configs:
|
||||
default_config = get_dingtalk_config()
|
||||
if default_config["webhook"]:
|
||||
configs.append(default_config)
|
||||
|
||||
return configs
|
||||
|
||||
|
||||
def get_db_config() -> Dict[str, str]:
|
||||
"""
|
||||
获取数据库配置
|
||||
|
||||
Returns:
|
||||
dict: 数据库配置
|
||||
"""
|
||||
return {
|
||||
"host": os.getenv("DB_HOST", "localhost"),
|
||||
"port": os.getenv("DB_PORT", "5432"),
|
||||
"user": os.getenv("DB_USER", ""),
|
||||
"password": os.getenv("DB_PASSWORD", ""),
|
||||
"database": os.getenv("DB_NAME", ""),
|
||||
}
|
||||
314
scripts/daily_scheduler.py
Normal file
314
scripts/daily_scheduler.py
Normal file
@@ -0,0 +1,314 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
ETF轮动策略定时调度器
|
||||
|
||||
功能:
|
||||
1. 每天15:30检查是否为交易日
|
||||
2. 如果是交易日,执行策略回测生成报告
|
||||
3. 上传报告图片到OSS
|
||||
4. 发送图片链接到钉钉群
|
||||
|
||||
用法:
|
||||
python scripts/daily_scheduler.py --time 15:30 # 后台定时模式
|
||||
python scripts/daily_scheduler.py --now # 立即执行一次
|
||||
python scripts/daily_scheduler.py --no-daemon # 非后台模式
|
||||
"""
|
||||
|
||||
import warnings
|
||||
# 抑制oss2库的SyntaxWarning(无效转义序列'\&')
|
||||
warnings.filterwarnings('ignore', category=SyntaxWarning, module='oss2')
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import subprocess
|
||||
import argparse
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
# 添加项目根目录
|
||||
project_root = Path(__file__).parent.parent
|
||||
sys.path.insert(0, str(project_root))
|
||||
|
||||
# 添加归档模块路径(使用原有的 notify 和 oss_utils)
|
||||
archive_path = project_root / 'archive' / 'legacy_core'
|
||||
sys.path.insert(0, str(archive_path))
|
||||
|
||||
# 加载环境变量
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv()
|
||||
|
||||
from loguru import logger
|
||||
import schedule
|
||||
import tushare as ts
|
||||
|
||||
# 导入原有的通知和OSS模块
|
||||
try:
|
||||
from core.common.notify import DingTalkBot, send_to_all_groups
|
||||
from core.common.oss_utils import upload_image_to_oss
|
||||
HAS_NOTIFY_MODULE = True
|
||||
except ImportError as e:
|
||||
logger.warning(f"无法导入通知模块: {e}")
|
||||
HAS_NOTIFY_MODULE = False
|
||||
|
||||
# 配置日志
|
||||
log_path = project_root / "logs" / "scheduler_{time}.log"
|
||||
logger.add(
|
||||
str(log_path),
|
||||
rotation="1 day",
|
||||
retention="7 days",
|
||||
level="INFO"
|
||||
)
|
||||
|
||||
|
||||
def is_trade_day(date_str: str = None) -> bool:
|
||||
"""
|
||||
判断指定日期是否为交易日
|
||||
|
||||
Args:
|
||||
date_str: 日期字符串 (YYYYMMDD),默认今天
|
||||
|
||||
Returns:
|
||||
bool: 是否为交易日
|
||||
"""
|
||||
if date_str is None:
|
||||
date_str = datetime.now().strftime("%Y%m%d")
|
||||
|
||||
try:
|
||||
token = os.getenv('TUSHARE_TOKEN')
|
||||
if not token:
|
||||
logger.error("TUSHARE_TOKEN 未设置")
|
||||
return True # 失败时默认执行(避免错过交易日)
|
||||
|
||||
pro = ts.pro_api(token)
|
||||
df = pro.trade_cal(
|
||||
exchange='SSE',
|
||||
start_date=date_str,
|
||||
end_date=date_str,
|
||||
is_open='1'
|
||||
)
|
||||
|
||||
is_open = len(df) > 0 and df.iloc[0]["is_open"] == 1
|
||||
logger.info(f"日期 {date_str} 是否为交易日: {is_open}")
|
||||
return is_open
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"检查交易日失败: {e}")
|
||||
return True # 失败时默认执行
|
||||
|
||||
|
||||
def run_strategy(config_path: str = "strategies/rotation/config.yaml") -> dict:
|
||||
"""
|
||||
执行策略回测并生成报告
|
||||
|
||||
使用 generate_legacy_report.py 生成原引擎格式报告
|
||||
|
||||
Args:
|
||||
config_path: 配置文件路径
|
||||
|
||||
Returns:
|
||||
dict: 执行结果,包含报告路径等信息
|
||||
"""
|
||||
logger.info("开始执行策略回测...")
|
||||
|
||||
try:
|
||||
# 使用 generate_legacy_report.py 生成报告
|
||||
# 输出文件: results/rotation_legacy_chart.png
|
||||
cmd = [
|
||||
sys.executable,
|
||||
str(project_root / "scripts" / "generate_legacy_report.py")
|
||||
]
|
||||
|
||||
logger.info(f"执行命令: {' '.join(cmd)}")
|
||||
|
||||
# 执行策略
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
cwd=project_root,
|
||||
timeout=300 # 5分钟超时
|
||||
)
|
||||
|
||||
if result.returncode != 0:
|
||||
logger.error(f"策略执行失败:\n{result.stderr}")
|
||||
return {"success": False, "error": result.stderr}
|
||||
|
||||
logger.info("策略执行成功")
|
||||
logger.debug(result.stdout)
|
||||
|
||||
# 查找生成的报告图片(固定路径)
|
||||
chart_path = project_root / "results" / "rotation_legacy_chart.png"
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"stdout": result.stdout,
|
||||
"chart_path": str(chart_path) if chart_path.exists() else None,
|
||||
}
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
logger.error("策略执行超时")
|
||||
return {"success": False, "error": "timeout"}
|
||||
except Exception as e:
|
||||
logger.error(f"策略执行异常: {e}")
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
|
||||
def send_report_to_dingtalk(chart_path: str, summary_text: str = "") -> bool:
|
||||
"""
|
||||
上传报告到OSS并发送图片链接到钉钉
|
||||
|
||||
Args:
|
||||
chart_path: 图片文件路径
|
||||
summary_text: 摘要文本
|
||||
|
||||
Returns:
|
||||
bool: 是否发送成功
|
||||
"""
|
||||
logger.info("开始发送报告到钉钉...")
|
||||
|
||||
if not HAS_NOTIFY_MODULE:
|
||||
logger.warning("通知模块未加载,无法发送")
|
||||
return False
|
||||
|
||||
try:
|
||||
today_str = datetime.now().strftime("%Y-%m-%d")
|
||||
title = f"ETF轮动策略调仓日报 ({today_str})"
|
||||
|
||||
# 使用原有的 send_to_all_groups 发送图片
|
||||
# 该方法会自动:上传到OSS → 发送Markdown消息带图片链接
|
||||
success = send_to_all_groups(
|
||||
"send_image_via_oss",
|
||||
image_path=chart_path,
|
||||
title=title,
|
||||
text=summary_text,
|
||||
expire_days=7
|
||||
)
|
||||
|
||||
if success:
|
||||
logger.info("报告发送成功")
|
||||
else:
|
||||
logger.error("报告发送失败")
|
||||
|
||||
return success
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"发送报告异常: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def setup_schedule(target_time: str = "15:30", config_path: str = "strategies/rotation/config.yaml"):
|
||||
"""
|
||||
设置定时任务
|
||||
|
||||
Args:
|
||||
target_time: 执行时间 (HH:MM)
|
||||
config_path: 配置文件路径
|
||||
"""
|
||||
logger.info(f"设置定时任务: 每天 {target_time} 执行")
|
||||
|
||||
# 清除已有任务
|
||||
schedule.clear()
|
||||
|
||||
# 添加每日任务
|
||||
schedule.every().day.at(target_time).do(daily_task, config_path=config_path)
|
||||
|
||||
logger.info("定时任务设置完成,等待执行...")
|
||||
|
||||
|
||||
def daily_task(config_path: str = "strategies/rotation/config.yaml"):
|
||||
"""
|
||||
每日任务主流程
|
||||
|
||||
Args:
|
||||
config_path: 配置文件路径
|
||||
"""
|
||||
today_str = datetime.now().strftime("%Y-%m-%d")
|
||||
logger.info("=" * 60)
|
||||
logger.info(f"开始执行每日任务: {today_str}")
|
||||
logger.info("=" * 60)
|
||||
|
||||
# 1. 检查是否为交易日
|
||||
if not is_trade_day():
|
||||
logger.info("今天不是交易日,跳过执行")
|
||||
return
|
||||
|
||||
# 2. 执行策略
|
||||
result = run_strategy(config_path)
|
||||
|
||||
if not result["success"]:
|
||||
logger.error(f"策略执行失败: {result.get('error', '未知错误')}")
|
||||
return
|
||||
|
||||
# 3. 发送报告
|
||||
if result.get("chart_path"):
|
||||
# 只发送标题和图片,不附带文字摘要
|
||||
send_report_to_dingtalk(
|
||||
chart_path=result["chart_path"],
|
||||
summary_text="" # 空字符串,只显示标题和图片
|
||||
)
|
||||
else:
|
||||
logger.warning("未找到报告文件")
|
||||
|
||||
logger.info("每日任务执行完成")
|
||||
|
||||
|
||||
def run_scheduler_loop():
|
||||
"""运行调度循环"""
|
||||
logger.info("启动调度器,按 Ctrl+C 停止")
|
||||
try:
|
||||
while True:
|
||||
schedule.run_pending()
|
||||
time.sleep(1)
|
||||
except KeyboardInterrupt:
|
||||
logger.info("调度器已停止")
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='ETF策略定时调度器')
|
||||
parser.add_argument(
|
||||
'--time',
|
||||
type=str,
|
||||
default='15:30',
|
||||
help='执行时间 (HH:MM),默认 15:30'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--config',
|
||||
type=str,
|
||||
default='strategies/rotation/config.yaml',
|
||||
help='配置文件路径'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--now',
|
||||
action='store_true',
|
||||
help='立即执行一次并退出(不启动定时)'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--no-daemon',
|
||||
action='store_true',
|
||||
help='非后台模式:执行一次后进入定时循环(测试用)'
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# 创建日志目录
|
||||
(project_root / "logs").mkdir(exist_ok=True)
|
||||
|
||||
if args.now:
|
||||
# 立即执行一次并退出
|
||||
daily_task(args.config)
|
||||
elif args.no_daemon:
|
||||
# 非后台模式:执行一次后进入定时循环
|
||||
setup_schedule(args.time, args.config)
|
||||
logger.info("执行一次测试...")
|
||||
daily_task(args.config)
|
||||
logger.info("测试完成,启动定时任务循环(Ctrl+C 停止)...")
|
||||
run_scheduler_loop()
|
||||
else:
|
||||
# 默认:后台定时模式
|
||||
setup_schedule(args.time, args.config)
|
||||
run_scheduler_loop()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
115
scripts/run_rotation.py
Normal file
115
scripts/run_rotation.py
Normal file
@@ -0,0 +1,115 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
轮动策略回测入口脚本
|
||||
|
||||
用法:
|
||||
python scripts/run_rotation.py --config strategies/rotation/config.yaml
|
||||
python scripts/run_rotation.py --config strategies/rotation/config.yaml --save-path results/report
|
||||
"""
|
||||
|
||||
import sys
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
|
||||
# 添加项目根目录到路径
|
||||
project_root = Path(__file__).parent.parent
|
||||
sys.path.insert(0, str(project_root))
|
||||
|
||||
# 加载环境变量
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv()
|
||||
|
||||
from strategies.rotation.strategy import RotationStrategy
|
||||
|
||||
|
||||
def load_config(config_path: str) -> dict:
|
||||
"""加载配置"""
|
||||
import yaml
|
||||
with open(config_path, 'r', encoding='utf-8') as f:
|
||||
return yaml.safe_load(f)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='ETF轮动策略回测')
|
||||
parser.add_argument(
|
||||
'--config',
|
||||
type=str,
|
||||
default='strategies/rotation/config.yaml',
|
||||
help='配置文件路径'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--save-path',
|
||||
type=str,
|
||||
default=None,
|
||||
help='报告保存路径前缀'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--no-api',
|
||||
action='store_true',
|
||||
help='不使用Flask API,使用本地数据源'
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
start_time = datetime.now()
|
||||
|
||||
print("=" * 60)
|
||||
print(" ETF轮动策略 回测系统")
|
||||
print("=" * 60)
|
||||
|
||||
# 加载配置
|
||||
print(f"\n加载配置: {args.config}")
|
||||
config = load_config(args.config)
|
||||
|
||||
# 显示配置摘要
|
||||
code_list = list(config.get('code_list', {}).keys())
|
||||
print(f"候选标的: {len(code_list)} 只")
|
||||
print(f"回测区间: {config.get('start_date', 'N/A')} ~ {config.get('end_date', 'N/A')}")
|
||||
print(f"因子类型: {config.get('factor_type', 'momentum')}")
|
||||
print(f"窗口天数: {config.get('n_days', 25)}")
|
||||
print(f"选股数量: {config.get('select_num', 3)}")
|
||||
print(f"调仓周期: {config.get('rebalance_days', 1)} 天")
|
||||
print(f"交易成本: {config.get('trade_cost', 0.001):.2%}")
|
||||
|
||||
# 初始化策略
|
||||
print("\n初始化策略...")
|
||||
strategy = RotationStrategy.from_yaml(args.config)
|
||||
|
||||
# 设置保存路径
|
||||
if args.save_path is None:
|
||||
report_date = datetime.now().strftime('%Y%m%d')
|
||||
args.save_path = f"results/report_{report_date}"
|
||||
|
||||
# 执行回测
|
||||
print("\n" + "=" * 60)
|
||||
print("开始回测...")
|
||||
print("=" * 60)
|
||||
|
||||
# 使用Flask API或本地数据源
|
||||
use_flask_api = not args.no_api
|
||||
data = strategy.get_data(use_flask_api=use_flask_api)
|
||||
|
||||
result = strategy.run_backtest(data=data, save_path=args.save_path)
|
||||
|
||||
# 输出结果
|
||||
if result.get('result') is not None:
|
||||
final_nav = result['result']['策略净值'].iloc[-1]
|
||||
total_return = (final_nav - 1) * 100
|
||||
|
||||
print("\n" + "=" * 60)
|
||||
print("回测完成!")
|
||||
print("=" * 60)
|
||||
print(f"最终净值: {final_nav:.4f}")
|
||||
print(f"累计收益: {total_return:.2f}%")
|
||||
print(f"调仓次数: {len(result.get('rebalance_events', []))} 次")
|
||||
print(f"报告保存: {args.save_path}_*.csv")
|
||||
|
||||
elapsed = datetime.now() - start_time
|
||||
print(f"\n总耗时: {elapsed.total_seconds():.1f}秒")
|
||||
|
||||
return result
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user