From 1ccd9e0011029a9fcd36cdf94918699eb7c81bd1 Mon Sep 17 00:00:00 2001 From: aszerW Date: Sun, 12 Oct 2025 13:50:09 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E9=92=89=E9=92=89=E6=9C=BA?= =?UTF-8?q?=E5=99=A8=E4=BA=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dingtalk.py | 73 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 dingtalk.py diff --git a/dingtalk.py b/dingtalk.py new file mode 100644 index 0000000..9318a05 --- /dev/null +++ b/dingtalk.py @@ -0,0 +1,73 @@ +import requests +import time +import hmac +import hashlib +import base64 +import urllib.parse +from loguru import logger + +class DingTalkBot: + """ + 钉钉机器人类,通过webhook和可选的加签token向群聊发送消息提醒 + """ + + def __init__(self, webhook: str, secret: str = None): + """ + :param webhook: 钉钉自定义机器人webhook地址 + :param secret: 加签密钥(可选) + """ + self.webhook = webhook + self.secret = secret + + def _gen_signed_url(self): + """ + 如果有加签token,根据钉钉接口算法拼接签名到url + """ + if not self.secret: + return self.webhook + timestamp = str(round(time.time() * 1000)) + secret_enc = self.secret.encode('utf-8') + string_to_sign = f"{timestamp}\n{self.secret}" + string_to_sign_enc = string_to_sign.encode('utf-8') + hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest() + sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) + url = f"{self.webhook}×tamp={timestamp}&sign={sign}" + return url + + def send_text(self, content: str, at_mobiles=None, is_at_all=False): + """ + 发送文本消息 + + :param content: 消息内容 + :param at_mobiles: 需要@的手机号组成的列表,可选 + :param is_at_all: 是否@所有人,默认False + """ + at_mobiles = at_mobiles or [] + data = { + "msgtype": "text", + "text": {"content": content}, + "at": { + "atMobiles": at_mobiles, + "isAtAll": is_at_all + } + } + + url = self._gen_signed_url() if self.secret else self.webhook + + try: + response = requests.post(url, json=data, timeout=5) + response.raise_for_status() + result = response.json() + if result.get("errcode", -1) != 0: + logger.error(f"钉钉消息发送失败: {result}") + else: + logger.info("钉钉消息发送成功") + except Exception as e: + logger.error(f"钉钉消息发送异常: {e}") + + +if __name__ == "__main__": + webhook = "https://oapi.dingtalk.com/robot/send?access_token=fb70c1561d8beba94b4f11568f4bb15e3ae07ccbdc8ac19676434a9d1cd17546" # 填写你的webhook + secret = "SEC1ae7cd2f1a6f9da3611af37da3e7d954c1e8533fc073c6c8cc5e5af3b6e5926b" # 填写你的加签token(如果有),否则留空 + dingtalk = DingTalkBot(webhook, secret) + dingtalk.send_text("测试消息") \ No newline at end of file