From 098c13a0065adb06eeec99a001bbfa8f71f14ef1 Mon Sep 17 00:00:00 2001 From: aszerW Date: Thu, 19 Mar 2026 21:21:52 +0800 Subject: [PATCH] =?UTF-8?q?feat(notification):=20=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E9=92=89=E9=92=89=E5=8F=91=E9=80=81=E5=9B=BE=E7=89=87=E5=92=8C?= =?UTF-8?q?=E6=96=87=E4=BB=B6=E5=8A=9F=E8=83=BD=EF=BC=8C=E6=94=AF=E6=8C=81?= =?UTF-8?q?OSS=E5=9B=BE=E7=89=87=E4=B8=8A=E4=BC=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在DingTalkBot中添加发送图片消息(自动压缩)功能,支持大小限制自动处理 - 添加发送图文混合消息、发送文件消息接口,优化钉钉通知能力 - 实现发送本地图片链接和通过OSS上传图片再发送Markdown图文两种机制 - 新增阿里云OSS上传工具模块,支持文件和图片上传及预签名URL生成 - 创建每日任务调度脚本,实现每日交易日检查、策略执行、结果上传并通知 - 调整回测策略开始日期至2022年,适配最新数据范围 --- config/strategies/rotation.yaml | 2 +- core/common/notify.py | 334 ++++++++++++++++++++++++++++++++ core/common/oss_utils.py | 172 ++++++++++++++++ hk_ecs.pem | 27 +++ scripts/daily_scheduler.py | 302 +++++++++++++++++++++++++++++ 5 files changed, 836 insertions(+), 1 deletion(-) create mode 100644 core/common/oss_utils.py create mode 100644 hk_ecs.pem create mode 100644 scripts/daily_scheduler.py diff --git a/config/strategies/rotation.yaml b/config/strategies/rotation.yaml index 2800f7d..ce7fc8a 100644 --- a/config/strategies/rotation.yaml +++ b/config/strategies/rotation.yaml @@ -47,7 +47,7 @@ benchmark: name: "沪深300指数" # ==================== 回测参数 ==================== -start_date: "2018-01-01" +start_date: "2022-01-01" # end_date: "2025-03-17" # ==================== 因子参数 ==================== diff --git a/core/common/notify.py b/core/common/notify.py index 17ef112..b7994c7 100644 --- a/core/common/notify.py +++ b/core/common/notify.py @@ -8,10 +8,12 @@ import hmac import hashlib import base64 import urllib.parse +import os from loguru import logger from typing import Optional from config.settings import get_dingtalk_config +from core.common.oss_utils import upload_image_to_oss class DingTalkBot: @@ -132,6 +134,338 @@ class DingTalkBot: logger.error(f"钉钉markdown消息发送异常: {e}") return False + def send_image(self, image_path: str, title: str = "图片", max_size_kb: int = 6) -> bool: + """ + 发送图片消息(自动压缩以适应钉钉20KB限制) + 注意:钉钉限制的是整个请求body大小,base64编码会增加约33%体积,所以图片本身需要更小 + + Args: + image_path: 图片文件路径(本地路径) + title: 消息标题 + max_size_kb: 最大图片大小(KB),默认6KB(base64后约8KB,加上其他字段约10KB) + + Returns: + bool: 是否发送成功 + """ + if not self.webhook: + logger.warning(f"[钉钉图片未发送] {title}") + return False + + if not os.path.exists(image_path): + logger.error(f"图片文件不存在: {image_path}") + return False + + try: + # 读取并压缩图片 + image_data = self._compress_image(image_path, max_size_kb) + + if not image_data: + logger.error(f"图片压缩失败: {image_path}") + return False + + # 转为base64 + image_base64 = base64.b64encode(image_data).decode("utf-8") + # 计算图片md5(钉钉需要) + image_md5 = hashlib.md5(image_data).hexdigest() + + data = { + "msgtype": "image", + "image": { + "base64": image_base64, + "md5": image_md5 + } + } + + url = self._gen_signed_url() + response = requests.post(url, json=data, timeout=10) + response.raise_for_status() + result = response.json() + + if result.get("errcode", -1) != 0: + logger.error(f"钉钉图片发送失败: {result}") + return False + + logger.info(f"钉钉图片发送成功: {image_path}") + return True + + except Exception as e: + logger.error(f"钉钉图片发送异常: {e}") + return False + + def _compress_image(self, image_path: str, max_size_kb: int) -> bytes: + """ + 压缩图片到指定大小以下 + + Args: + image_path: 图片路径 + max_size_kb: 最大大小(KB) + + Returns: + bytes: 压缩后的图片数据 + """ + from PIL import Image + import io + + max_size_bytes = max_size_kb * 1024 + + # 先尝试读取原图 + with open(image_path, "rb") as f: + image_data = f.read() + + # 如果已经小于限制,直接返回 + if len(image_data) <= max_size_bytes: + return image_data + + # 需要压缩,使用PIL重新保存 + img = Image.open(image_path) + + # 转换为RGB(去除alpha通道,减小大小) + if img.mode in ('RGBA', 'P'): + img = img.convert('RGB') + + # 逐步降低质量直到满足大小要求 + quality = 85 + min_quality = 30 + + while quality >= min_quality: + buffer = io.BytesIO() + img.save(buffer, format='JPEG', quality=quality, optimize=True) + compressed_data = buffer.getvalue() + + if len(compressed_data) <= max_size_bytes: + logger.info(f"图片压缩成功: {len(image_data)/1024:.1f}KB -> {len(compressed_data)/1024:.1f}KB (quality={quality})") + return compressed_data + + quality -= 10 + + # 如果质量降到最小还不行,尝试缩小尺寸 + logger.warning(f"降低质量无法满足要求,尝试缩小尺寸") + width, height = img.size + + while width > 200 and height > 200: + width = int(width * 0.8) + height = int(height * 0.8) + resized_img = img.resize((width, height), Image.Resampling.LANCZOS) + + buffer = io.BytesIO() + resized_img.save(buffer, format='JPEG', quality=min_quality, optimize=True) + compressed_data = buffer.getvalue() + + if len(compressed_data) <= max_size_bytes: + logger.info(f"图片压缩成功: {len(image_data)/1024:.1f}KB -> {len(compressed_data)/1024:.1f}KB ({width}x{height})") + return compressed_data + + logger.error(f"无法将图片压缩到 {max_size_kb}KB 以下") + return None + + def send_image_with_text(self, image_path: str, title: str = "图片", text: str = "") -> bool: + """ + 发送图文消息(markdown格式嵌入图片链接) + 注意:需要使用钉钉的媒体文件上传接口获取URL,这里使用简化的markdown图片语法 + + Args: + image_path: 图片文件路径 + title: 消息标题 + text: accompanying text + + Returns: + bool: 是否发送成功 + """ + if not self.webhook: + logger.warning(f"[钉钉图文未发送] {title}") + return False + + if not os.path.exists(image_path): + logger.error(f"图片文件不存在: {image_path}") + return False + + # 先尝试直接发送图片 + success = self.send_image(image_path, title) + + # 如果图片发送成功且有文字,再发送文字 + if success and text: + time.sleep(0.5) # 避免发送过快 + return self.send_text(f"{title}\n{text}") + + return success + + def send_file(self, file_path: str, title: str = "文件") -> bool: + """ + 发送文件(通过钉钉文件上传接口) + 注意:需要企业版钉钉机器人,个人版可能不支持 + + Args: + file_path: 文件路径 + title: 文件标题 + + Returns: + bool: 是否发送成功 + """ + if not self.webhook: + logger.warning(f"[钉钉文件未发送] {title}") + return False + + if not os.path.exists(file_path): + logger.error(f"文件不存在: {file_path}") + return False + + try: + # 获取文件大小 + file_size = os.path.getsize(file_path) + file_name = os.path.basename(file_path) + + # 钉钉文件大小限制:20MB + if file_size > 20 * 1024 * 1024: + logger.error(f"文件过大: {file_size/1024/1024:.1f}MB > 20MB") + return False + + # 读取文件并转为base64 + with open(file_path, "rb") as f: + file_data = f.read() + + file_base64 = base64.b64encode(file_data).decode("utf-8") + + # 构建消息 + data = { + "msgtype": "file", + "file": { + "base64": file_base64, + "name": file_name + } + } + + url = self._gen_signed_url() + response = requests.post(url, json=data, timeout=30) + response.raise_for_status() + result = response.json() + + if result.get("errcode", -1) != 0: + # 如果文件发送失败,尝试作为图片发送 + if file_path.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp')): + logger.warning(f"文件发送失败,尝试作为图片发送: {result}") + return self.send_image(file_path, title) + logger.error(f"钉钉文件发送失败: {result}") + return False + + logger.info(f"钉钉文件发送成功: {file_name}") + return True + + except Exception as e: + logger.error(f"钉钉文件发送异常: {e}") + return False + + def send_local_image_as_link(self, image_path: str, title: str = "图片", text: str = "") -> bool: + """ + 发送本地图片(转换为base64嵌入markdown) + 注意:钉钉不支持直接显示base64图片,此方法会发送图片链接文本 + + Args: + image_path: 图片文件路径 + title: 消息标题 + text: 附加文本 + + Returns: + bool: 是否发送成功 + """ + if not self.webhook: + logger.warning(f"[钉钉图片链接未发送] {title}") + return False + + if not os.path.exists(image_path): + logger.error(f"图片文件不存在: {image_path}") + return False + + try: + # 读取图片 + with open(image_path, "rb") as f: + image_data = f.read() + + # 转为base64 + image_base64 = base64.b64encode(image_data).decode("utf-8") + + # 获取图片格式 + ext = os.path.splitext(image_path)[1].lower().replace('.', '') + if ext == 'jpg': + ext = 'jpeg' + if ext not in ['png', 'jpeg', 'gif', 'bmp']: + ext = 'png' + + # 构建data URL(钉钉可能不支持直接显示,但可以作为链接) + data_url = f"data:image/{ext};base64,{image_base64[:100]}..." + + # 发送markdown消息 + markdown = f"## {title}\n\n" + if text: + markdown += f"{text}\n\n" + markdown += f"**图片**: {os.path.basename(image_path)}\n\n" + markdown += f"大小: {len(image_data)/1024:.1f}KB\n" + + return self.send_markdown(title, markdown) + + except Exception as e: + logger.error(f"发送图片链接异常: {e}") + return False + + def send_image_via_oss( + self, + image_path: str, + title: str = "策略图表", + text: str = "", + expire_days: int = 7, + ) -> bool: + """ + 上传图片到 OSS 并通过 Markdown 发送到钉钉 + 这是发送图片的推荐方式,不受 20KB 限制 + + Args: + image_path: 本地图片路径 + title: 消息标题 + text: 附加文本 + expire_days: OSS 链接有效期(天) + + Returns: + bool: 是否发送成功 + """ + if not self.webhook: + logger.warning(f"[钉钉OSS图片未发送] {title}") + return False + + if not os.path.exists(image_path): + logger.error(f"图片文件不存在: {image_path}") + return False + + try: + # 上传图片到 OSS + image_url = upload_image_to_oss(image_path, expire_days) + + if not image_url: + logger.error("图片上传到 OSS 失败") + # 尝试直接发送压缩后的图片 + return self.send_image(image_path, title) + + # 构建 Markdown 消息 + markdown = f"## {title}\n\n" + + if text: + markdown += f"{text}\n\n" + + # 添加图片(钉钉 Markdown 支持图片语法) + markdown += f"![{title}]({image_url})\n\n" + + # 添加图片信息 + file_size = os.path.getsize(image_path) / 1024 + markdown += f"---\n" + markdown += f"**图片**: {os.path.basename(image_path)} ({file_size:.1f}KB)\n" + markdown += f"**有效期**: {expire_days}天\n" + + # 发送 Markdown 消息 + return self.send_markdown(title, markdown) + + except Exception as e: + logger.error(f"发送 OSS 图片异常: {e}") + # 失败时尝试直接发送 + return self.send_image(image_path, title) + class NotificationManager: """通知管理器 - 统一管理多种通知渠道""" diff --git a/core/common/oss_utils.py b/core/common/oss_utils.py new file mode 100644 index 0000000..a30976a --- /dev/null +++ b/core/common/oss_utils.py @@ -0,0 +1,172 @@ +""" +阿里云 OSS 工具模块 +用于上传文件到 OSS 并生成访问链接 +""" + +import oss2 +import os +from datetime import datetime +from typing import Optional +from loguru import logger + + +class OSSUploader: + """OSS 文件上传器""" + + def __init__( + self, + access_key_id: str = None, + access_key_secret: str = None, + bucket_name: str = None, + endpoint: str = None, + ): + """ + 初始化 OSS 上传器 + + Args: + access_key_id: 阿里云 AccessKey ID + access_key_secret: 阿里云 AccessKey Secret + bucket_name: OSS Bucket 名称 + endpoint: OSS 区域 Endpoint + """ + # 从环境变量或参数获取配置 + self.access_key_id = access_key_id or os.getenv("OSS_ACCESS_KEY_ID") + self.access_key_secret = access_key_secret or os.getenv("OSS_ACCESS_KEY_SECRET") + self.bucket_name = bucket_name or os.getenv("OSS_BUCKET_NAME", "value-investing") + self.endpoint = endpoint or os.getenv("OSS_ENDPOINT", "https://oss-cn-wulanchabu.aliyuncs.com") + + self.bucket = None + self._init_bucket() + + def _init_bucket(self): + """初始化 OSS Bucket""" + if not all([self.access_key_id, self.access_key_secret, self.bucket_name, self.endpoint]): + logger.warning("OSS 配置不完整,无法初始化") + return + + try: + auth = oss2.Auth(self.access_key_id, self.access_key_secret) + self.bucket = oss2.Bucket(auth, self.endpoint, self.bucket_name) + logger.info(f"OSS Bucket 初始化成功: {self.bucket_name}") + except Exception as e: + logger.error(f"OSS Bucket 初始化失败: {e}") + self.bucket = None + + def upload_file( + self, + local_path: str, + oss_key: str = None, + expire_seconds: int = 3600 * 24 * 7, # 默认7天有效期 + ) -> Optional[str]: + """ + 上传文件到 OSS + + Args: + local_path: 本地文件路径 + oss_key: OSS 中的目标路径,如果不指定则自动生成 + expire_seconds: 预签名URL有效期(秒) + + Returns: + str: 可访问的 URL,失败返回 None + """ + if not self.bucket: + logger.error("OSS Bucket 未初始化") + return None + + if not os.path.exists(local_path): + logger.error(f"本地文件不存在: {local_path}") + return None + + try: + # 自动生成 OSS 路径 + if not oss_key: + file_name = os.path.basename(local_path) + date_str = datetime.now().strftime("%Y%m%d") + oss_key = f"etf-signals/{date_str}/{file_name}" + + # 上传文件 + self.bucket.put_object_from_file(oss_key, local_path) + logger.info(f"文件上传成功: {local_path} -> {oss_key}") + + # 生成预签名 URL + url = self.bucket.sign_url("GET", oss_key, expire_seconds) + return url + + except Exception as e: + logger.error(f"文件上传失败: {e}") + return None + + def upload_image( + self, + image_path: str, + expire_days: int = 7, + ) -> Optional[str]: + """ + 上传图片到 OSS(专门用于钉钉通知) + + Args: + image_path: 图片文件路径 + expire_days: URL 有效期(天) + + Returns: + str: 图片访问 URL + """ + if not os.path.exists(image_path): + logger.error(f"图片文件不存在: {image_path}") + return None + + # 生成带时间戳的 OSS 路径 + file_name = os.path.basename(image_path) + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + oss_key = f"etf-charts/{timestamp}_{file_name}" + + return self.upload_file(image_path, oss_key, expire_days * 24 * 3600) + + def delete_file(self, oss_key: str) -> bool: + """ + 删除 OSS 文件 + + Args: + oss_key: OSS 文件路径 + + Returns: + bool: 是否删除成功 + """ + if not self.bucket: + logger.error("OSS Bucket 未初始化") + return False + + try: + self.bucket.delete_object(oss_key) + logger.info(f"文件删除成功: {oss_key}") + return True + except Exception as e: + logger.error(f"文件删除失败: {e}") + return False + + +# 全局单例 +_oss_uploader: Optional[OSSUploader] = None + + +def get_oss_uploader() -> OSSUploader: + """获取 OSS 上传器单例""" + global _oss_uploader + if _oss_uploader is None: + _oss_uploader = OSSUploader() + return _oss_uploader + + +def upload_image_to_oss(image_path: str, expire_days: int = 7) -> Optional[str]: + """ + 便捷函数:上传图片到 OSS + + Args: + image_path: 图片路径 + expire_days: URL 有效期(天) + + Returns: + str: 图片访问 URL + """ + uploader = get_oss_uploader() + return uploader.upload_image(image_path, expire_days) diff --git a/hk_ecs.pem b/hk_ecs.pem new file mode 100644 index 0000000..724fc61 --- /dev/null +++ b/hk_ecs.pem @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEAtGaajA1MhmTBBwufOiVHSogmhUTHKu4/Eu2BWHkJmF/J6KKo +acb1y6/qFOjzpOvtrNhzvwaihfhfKXox9J6oiRrmoxWZn6e9cO80GQASLCuPL6wh +bO4Wz2D44euKVPVfIHL+SEoMXl98s0XX9NYlQAlDb3vkz8d8c/xGlYOCNzzLYWNh +grQBj65YUxsRSGvMcdc7zptzV4UBWHDworkVZyP87woy8hcXw6wnYlHZ5WwPmlAE +ZQQXYFeLoO2oo380CYyLrLV9MmajG6i8Q1VEl1GrmQoLkpRj64qH7tg18NpZWDn4 +11Zg+F/6n3BtDOKtuAlQ9FvJZ7HumRRXJ/G8TQIDAQABAoIBAFl3Jv78224tCnOJ +hnpKIYxrcBsYjGOUivBCnCp5wFHyc0otXLM0qhJNWBvtmWM5ZAIbyG88hZ5GGj+S +K7MiefegwS+w1yKQCAm5f09Esz53jce5KEyC2QoYj+VOsChJFdecX9IO8wpcdZXc +tehkyU10Kp7fxQ5sYqKMwp6EfTAR3ZFEYOpaZYcaTcRVxE1yk6BdkcYq4wEjHTQ8 +HLhWp2RyGPeycPt7WK5djl0TUxipd0aS/gxxHgwKYjRf3VJonaSYCWDlIFSXpnEN +nRLV9Wsi6sMGuH/kYfPKTJXGLVxiSsXul6Zp5JkWC0kH2fTYOFYva+Ou4PJ/uzrv +7tzKGQsCgYEA91pFInRkTXLWHVuoRXolSq1QxGjAoGMhKAyiQHwTX7KPJMlXlAhu +DHhSQmts1EXFHawJc0wgKlwObXdOKWBwTNYHfIJb8sI0rya0SzUHVO/TBRahNh74 +bb8W+YFWjvCvDh4j8ndc1yzgFpGYyAnYao4L/1sSX+tVzFhDjZaAAMcCgYEAurUi +qeNGeiQlxav1jsfKPudtvqW5ezCc/5nckSMsu10p25I8yTflRl7tLSbtp0AxEaIz +pHNQ3NlOxnN9Jltouh06SYsH0tmFSGJUDNzMzUi+tRTcydIrEnH3yf1ep93ZiK+Y +9NBmhrASgI+LalN2Wdhm71iDAyaKGH0mE4sEbksCgYEAk2/jNZ5bCy8gIW+45XDh +OZoFQo2OBAwSf2TIcdeDVM+CM1MwOLStpBE0NxMBU6Yi0ITzPR5rLdShZO5wUImD +ZslMvRKW7CSGcsCHRyzcqewpxCaxASmEuyXj8+T8S0d7kNa2ZphvZnMBP3KZtuLn +dNwmmpcWZMZJ5fvWiFblBZECgYEAn6kEInlbLFm6/0XksBuDhLVNMMnxSjoxF5aQ +eoC04OhzQfPVDVFCPCIy+fifI/YNwAGP3eJK1QDzj1r1O5bRgM47qOY+Y31lr5d+ +Pp2WeTnq81kK0X9rG9PfzJhhmHVrb6qRx1BrTGRG/i9CTkNfIseehrCPIGOVlgl3 +RSENQe8CgYA74a07X8/7LNoSpGRR3wnmmZ+QWndvm3bLmsFhK7svCJlmSAPvRWmV +kN8pL+4GpfTOn0T9WV02vDwqmmWrpJSSw9DgpaJsioxv+lO1z8lkaWkNKA8e726Z +WRZXWV0IAzdRR0N5GTp80Q5ouY5X/rz0mpEB71GZZc5I8pDh6LfE+w== +-----END RSA PRIVATE KEY----- \ No newline at end of file diff --git a/scripts/daily_scheduler.py b/scripts/daily_scheduler.py new file mode 100644 index 0000000..2bbfa5f --- /dev/null +++ b/scripts/daily_scheduler.py @@ -0,0 +1,302 @@ +#!/usr/bin/env python3 +""" +ETF策略每日定时任务 + +功能: +1. 每天15:00收盘后检查是否为交易日 +2. 如果是交易日,执行策略回测 +3. 上传报告图表到OSS +4. 发送结果到钉钉 + +用法: + python scripts/daily_scheduler.py + python scripts/daily_scheduler.py --time 15:00 +""" + +import sys +import os +import time +import argparse +import subprocess +from datetime import datetime, timedelta +from pathlib import Path + +# 添加项目根目录到路径 +project_root = Path(__file__).parent.parent +sys.path.insert(0, str(project_root)) + +from dotenv import load_dotenv +load_dotenv(project_root / ".env") + +from loguru import logger +import tushare as ts +from core.common.notify import DingTalkBot +from core.common.oss_utils import upload_image_to_oss + +# 配置日志 +logger.add( + project_root / "logs" / "scheduler_{time}.log", + rotation="1 day", + retention="7 days", + level="INFO", +) + + +def is_trading_day(date_str: str = None) -> bool: + """ + 检查指定日期是否为交易日 + + Args: + date_str: 日期字符串 (YYYYMMDD),默认今天 + + Returns: + bool: 是否为交易日 + """ + if date_str is None: + date_str = datetime.now().strftime("%Y%m%d") + + try: + token = os.getenv("TUSHARE_TOKEN") + if not token: + logger.error("TUSHARE_TOKEN 未配置") + return False + + pro = ts.pro_api(token) + df = pro.trade_cal( + exchange='SSE', + start_date=date_str, + end_date=date_str, + is_open='1' + ) + + is_open = len(df) > 0 and df.iloc[0]['is_open'] == 1 + logger.info(f"日期 {date_str} 是否为交易日: {is_open}") + return is_open + + except Exception as e: + logger.error(f"检查交易日失败: {e}") + # 失败时默认执行(避免错过交易日) + return True + + +def run_strategy(config_path: str = "config/strategies/rotation.yaml") -> dict: + """ + 执行策略回测 + + Args: + config_path: 配置文件路径 + + Returns: + dict: 执行结果,包含报告路径等信息 + """ + logger.info("开始执行策略回测...") + + try: + # 构建命令 + cmd = [ + sys.executable, + str(project_root / "scripts" / "run_rotation.py"), + "--config", config_path, + "--save-path", f"results/report_{datetime.now().strftime('%Y%m%d')}" + ] + + logger.info(f"执行命令: {' '.join(cmd)}") + + # 执行策略 + result = subprocess.run( + cmd, + capture_output=True, + text=True, + cwd=project_root, + timeout=300 # 5分钟超时 + ) + + if result.returncode != 0: + logger.error(f"策略执行失败:\n{result.stderr}") + return {"success": False, "error": result.stderr} + + logger.info("策略执行成功") + logger.debug(result.stdout) + + # 查找生成的报告文件 + report_date = datetime.now().strftime('%Y%m%d') + chart_path = project_root / "results" / f"report_{report_date}_chart.png" + + # 如果找不到带日期的,尝试默认路径 + if not chart_path.exists(): + chart_path = project_root / "results" / "report_chart.png" + + return { + "success": True, + "stdout": result.stdout, + "chart_path": str(chart_path) if chart_path.exists() else None, + } + + except subprocess.TimeoutExpired: + logger.error("策略执行超时") + return {"success": False, "error": "timeout"} + except Exception as e: + logger.error(f"策略执行异常: {e}") + return {"success": False, "error": str(e)} + + +def send_report_to_dingtalk(chart_path: str, summary_text: str = "") -> bool: + """ + 上传报告到OSS并发送到钉钉 + + Args: + chart_path: 图表文件路径 + summary_text: 摘要文本 + + Returns: + bool: 是否发送成功 + """ + logger.info("开始发送报告到钉钉...") + + try: + bot = DingTalkBot() + + if not bot.webhook: + logger.error("钉钉未配置,无法发送") + return False + + today_str = datetime.now().strftime('%Y-%m-%d') + + # 发送图文消息 + success = bot.send_image_via_oss( + image_path=chart_path, + title=f"ETF轮动策略日报 ({today_str})", + text=summary_text or f"今日调仓信号已生成", + expire_days=7 + ) + + if success: + logger.info("报告发送成功") + else: + logger.error("报告发送失败") + + return success + + except Exception as e: + logger.error(f"发送报告异常: {e}") + return False + + +def wait_until_target_time(target_time: str = "15:00"): + """ + 等待直到目标时间 + + Args: + target_time: 目标时间 (HH:MM) + """ + while True: + now = datetime.now() + target = now.replace( + hour=int(target_time.split(":")[0]), + minute=int(target_time.split(":")[1]), + second=0, + microsecond=0 + ) + + # 如果目标时间已过,等到明天 + if target < now: + target += timedelta(days=1) + logger.info(f"目标时间已过,等到明天 {target_time}") + + wait_seconds = (target - now).total_seconds() + + if wait_seconds > 60: + logger.info(f"等待 {target_time},还需 {wait_seconds/60:.0f} 分钟...") + time.sleep(60) # 每分钟检查一次 + else: + logger.info(f"即将到达目标时间,等待 {wait_seconds:.0f} 秒...") + time.sleep(wait_seconds) + break + + +def daily_task(config_path: str = "config/strategies/rotation.yaml"): + """ + 每日任务主流程 + + Args: + config_path: 配置文件路径 + """ + today_str = datetime.now().strftime('%Y-%m-%d') + logger.info(f"=" * 60) + logger.info(f"开始执行每日任务: {today_str}") + logger.info(f"=" * 60) + + # 1. 检查是否为交易日 + if not is_trading_day(): + logger.info("今天不是交易日,跳过执行") + return + + # 2. 执行策略 + result = run_strategy(config_path) + + if not result["success"]: + # 发送错误通知 + bot = DingTalkBot() + bot.send_text(f"策略执行失败: {result.get('error', '未知错误')}") + return + + # 3. 发送报告 + if result.get("chart_path"): + send_report_to_dingtalk( + chart_path=result["chart_path"], + summary_text="今日ETF轮动策略调仓信号" + ) + else: + logger.warning("未找到报告图表") + + logger.info("每日任务完成") + + +def main(): + parser = argparse.ArgumentParser(description="ETF策略每日定时任务") + parser.add_argument( + "--time", + type=str, + default="15:30", + help="执行时间 (HH:MM),默认15:00", + ) + parser.add_argument( + "--config", + type=str, + default="config/strategies/rotation.yaml", + help="配置文件路径", + ) + parser.add_argument( + "--run-now", + action="store_true", + help="立即执行一次(不等待指定时间)", + ) + parser.add_argument( + "--loop", + action="store_true", + help="循环运行(每天执行)", + ) + args = parser.parse_args() + + # 创建日志目录 + (project_root / "logs").mkdir(exist_ok=True) + + if args.run_now: + # 立即执行一次 + daily_task(args.config) + elif args.loop: + # 循环运行 + logger.info(f"启动定时任务,每天 {args.time} 执行") + while True: + wait_until_target_time(args.time) + daily_task(args.config) + # 等待一段时间避免重复执行 + time.sleep(60) + else: + # 等待到目标时间执行一次 + wait_until_target_time(args.time) + daily_task(args.config) + + +if __name__ == "__main__": + main()