添加钉钉机器人

This commit is contained in:
2025-10-12 13:50:09 +08:00
parent cea661ff7d
commit 1ccd9e0011

73
dingtalk.py Normal file
View File

@@ -0,0 +1,73 @@
import requests
import time
import hmac
import hashlib
import base64
import urllib.parse
from loguru import logger
class DingTalkBot:
"""
钉钉机器人类通过webhook和可选的加签token向群聊发送消息提醒
"""
def __init__(self, webhook: str, secret: str = None):
"""
:param webhook: 钉钉自定义机器人webhook地址
:param secret: 加签密钥(可选)
"""
self.webhook = webhook
self.secret = secret
def _gen_signed_url(self):
"""
如果有加签token根据钉钉接口算法拼接签名到url
"""
if not self.secret:
return self.webhook
timestamp = str(round(time.time() * 1000))
secret_enc = self.secret.encode('utf-8')
string_to_sign = f"{timestamp}\n{self.secret}"
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
url = f"{self.webhook}&timestamp={timestamp}&sign={sign}"
return url
def send_text(self, content: str, at_mobiles=None, is_at_all=False):
"""
发送文本消息
:param content: 消息内容
:param at_mobiles: 需要@的手机号组成的列表,可选
:param is_at_all: 是否@所有人默认False
"""
at_mobiles = at_mobiles or []
data = {
"msgtype": "text",
"text": {"content": content},
"at": {
"atMobiles": at_mobiles,
"isAtAll": is_at_all
}
}
url = self._gen_signed_url() if self.secret else self.webhook
try:
response = requests.post(url, json=data, timeout=5)
response.raise_for_status()
result = response.json()
if result.get("errcode", -1) != 0:
logger.error(f"钉钉消息发送失败: {result}")
else:
logger.info("钉钉消息发送成功")
except Exception as e:
logger.error(f"钉钉消息发送异常: {e}")
if __name__ == "__main__":
webhook = "https://oapi.dingtalk.com/robot/send?access_token=fb70c1561d8beba94b4f11568f4bb15e3ae07ccbdc8ac19676434a9d1cd17546" # 填写你的webhook
secret = "SEC1ae7cd2f1a6f9da3611af37da3e7d954c1e8533fc073c6c8cc5e5af3b6e5926b" # 填写你的加签token如果有否则留空
dingtalk = DingTalkBot(webhook, secret)
dingtalk.send_text("测试消息")