feat(notify): 支持钉钉多群推送 & 添加轮动策略核心逻辑文档

- settings.py: 新增 get_all_dingtalk_configs() 自动扫描所有钉钉群配置
- notify.py: 新增 send_to_all_groups() 多群推送函数
- daily_scheduler.py: 报告和错误通知改用多群推送
- .env: 添加第二个钉钉群配置 (DINGTALK_WEBHOOK_2/SECRET_2)
- 轮动策略核心逻辑.md: 策略核心逻辑总结文档
This commit is contained in:
2026-04-23 22:57:23 +08:00
parent 3cca4d79c4
commit 4a500ca5bf
5 changed files with 230 additions and 8 deletions

View File

@@ -12,7 +12,7 @@ import os
from loguru import logger
from typing import Optional
from config.settings import get_dingtalk_config
from config.settings import get_dingtalk_config, get_all_dingtalk_configs
from core.common.oss_utils import upload_image_to_oss
@@ -467,6 +467,45 @@ class DingTalkBot:
return self.send_image(image_path, title)
def send_to_all_groups(
send_func_name: str,
**kwargs,
) -> bool:
"""
向所有已配置的钉钉群发送消息
Args:
send_func_name: DingTalkBot 的发送方法名,如 'send_text', 'send_markdown', 'send_image_via_oss'
**kwargs: 传递给发送方法的参数
Returns:
bool: 是否全部发送成功
"""
configs = get_all_dingtalk_configs()
if not configs:
logger.warning("没有配置任何钉钉群,消息未发送")
return False
all_success = True
for i, cfg in enumerate(configs, 1):
bot = DingTalkBot(webhook=cfg["webhook"], secret=cfg["secret"])
method = getattr(bot, send_func_name, None)
if method is None:
logger.error(f"DingTalkBot 没有方法: {send_func_name}")
return False
try:
success = method(**kwargs)
if success:
logger.info(f"{i} 发送成功")
else:
logger.error(f"{i} 发送失败")
all_success = False
except Exception as e:
logger.error(f"{i} 发送异常: {e}")
all_success = False
return all_success
class NotificationManager:
"""通知管理器 - 统一管理多种通知渠道"""