""" 通知模块 - 支持钉钉、日志等多种通知方式 """ import requests import time import hmac import hashlib import base64 import urllib.parse from loguru import logger from typing import Optional from config.settings import get_dingtalk_config class DingTalkBot: """钉钉机器人类""" def __init__(self, webhook: str = None, secret: str = None): """ 初始化钉钉机器人 Args: webhook: 钉钉自定义机器人webhook地址 secret: 加签密钥(可选) """ config = get_dingtalk_config() self.webhook = webhook or config.get("webhook", "") self.secret = secret or config.get("secret", "") if not self.webhook: logger.warning("钉钉webhook未配置,消息将不会被发送") def _gen_signed_url(self) -> str: """生成带签名的URL""" if not self.secret: return self.webhook timestamp = str(round(time.time() * 1000)) secret_enc = self.secret.encode("utf-8") string_to_sign = f"{timestamp}\n{self.secret}" string_to_sign_enc = string_to_sign.encode("utf-8") hmac_code = hmac.new( secret_enc, string_to_sign_enc, digestmod=hashlib.sha256 ).digest() sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) return f"{self.webhook}×tamp={timestamp}&sign={sign}" def send_text( self, content: str, at_mobiles: list = None, is_at_all: bool = False ) -> bool: """ 发送文本消息 Args: content: 消息内容 at_mobiles: 需要@的手机号列表 is_at_all: 是否@所有人 Returns: bool: 是否发送成功 """ if not self.webhook: logger.warning(f"[钉钉消息未发送] {content[:100]}...") return False at_mobiles = at_mobiles or [] data = { "msgtype": "text", "text": {"content": content}, "at": {"atMobiles": at_mobiles, "isAtAll": is_at_all}, } url = self._gen_signed_url() try: response = requests.post(url, json=data, timeout=5) response.raise_for_status() result = response.json() if result.get("errcode", -1) != 0: logger.error(f"钉钉消息发送失败: {result}") return False logger.info("钉钉消息发送成功") return True except Exception as e: logger.error(f"钉钉消息发送异常: {e}") return False def send_markdown( self, title: str, text: str, at_mobiles: list = None, is_at_all: bool = False, ) -> bool: """ 发送markdown消息 Args: title: 消息标题 text: markdown格式的消息内容 at_mobiles: 需要@的手机号列表 is_at_all: 是否@所有人 Returns: bool: 是否发送成功 """ if not self.webhook: logger.warning(f"[钉钉Markdown未发送] {title}") return False at_mobiles = at_mobiles or [] data = { "msgtype": "markdown", "markdown": {"title": title, "text": text}, "at": {"atMobiles": at_mobiles, "isAtAll": is_at_all}, } url = self._gen_signed_url() try: response = requests.post(url, json=data, timeout=5) response.raise_for_status() result = response.json() if result.get("errcode", -1) != 0: logger.error(f"钉钉markdown消息发送失败: {result}") return False logger.info("钉钉markdown消息发送成功") return True except Exception as e: logger.error(f"钉钉markdown消息发送异常: {e}") return False class NotificationManager: """通知管理器 - 统一管理多种通知渠道""" def __init__(self): self.dingtalk = DingTalkBot() def notify(self, message: str, title: str = "系统通知", use_markdown: bool = False): """ 发送通知(优先使用钉钉,失败则记录日志) Args: message: 消息内容 title: 消息标题(markdown模式使用) use_markdown: 是否使用markdown格式 """ if use_markdown: success = self.dingtalk.send_markdown(title, message) else: success = self.dingtalk.send_text(message) if not success: # 钉钉发送失败,记录到日志 logger.info(f"[通知] {title}: {message}") def notify_error(self, error_msg: str): """发送错误通知""" markdown = f"""## 错误告警 **时间**: {time.strftime('%Y-%m-%d %H:%M:%S')} **错误信息**: ``` {error_msg} ``` """ self.notify(markdown, title="系统错误", use_markdown=True) def notify_signal(self, signals: list, signal_type: str = "CCI超卖"): """ 发送交易信号通知 Args: signals: 信号列表,每项为dict包含code, name等指标 signal_type: 信号类型名称 """ if not signals: logger.info(f"[{signal_type}] 无信号") return # 构建markdown表格 if signals: headers = signals[0].keys() header_line = " | ".join(headers) separator = " | ".join(["---"] * len(headers)) rows = [] for s in signals: row = " | ".join(str(v) for v in s.values()) rows.append(row) table = f"{header_line}\n{separator}\n" + "\n".join(rows) else: table = "无" markdown = f"""## {signal_type}信号 **时间**: {time.strftime('%Y-%m-%d %H:%M:%S')} **筛选结果**: {table} 共 {len(signals)} 个标的符合筛选条件。 """ self.notify(markdown, title=f"{signal_type}信号", use_markdown=True)