73 lines
2.5 KiB
Python
73 lines
2.5 KiB
Python
import requests
|
||
import time
|
||
import hmac
|
||
import hashlib
|
||
import base64
|
||
import urllib.parse
|
||
from loguru import logger
|
||
|
||
class DingTalkBot:
|
||
"""
|
||
钉钉机器人类,通过webhook和可选的加签token向群聊发送消息提醒
|
||
"""
|
||
|
||
def __init__(self, webhook: str, secret: str = None):
|
||
"""
|
||
:param webhook: 钉钉自定义机器人webhook地址
|
||
:param secret: 加签密钥(可选)
|
||
"""
|
||
self.webhook = webhook
|
||
self.secret = secret
|
||
|
||
def _gen_signed_url(self):
|
||
"""
|
||
如果有加签token,根据钉钉接口算法拼接签名到url
|
||
"""
|
||
if not self.secret:
|
||
return self.webhook
|
||
timestamp = str(round(time.time() * 1000))
|
||
secret_enc = self.secret.encode('utf-8')
|
||
string_to_sign = f"{timestamp}\n{self.secret}"
|
||
string_to_sign_enc = string_to_sign.encode('utf-8')
|
||
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
|
||
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
|
||
url = f"{self.webhook}×tamp={timestamp}&sign={sign}"
|
||
return url
|
||
|
||
def send_text(self, content: str, at_mobiles=None, is_at_all=False):
|
||
"""
|
||
发送文本消息
|
||
|
||
:param content: 消息内容
|
||
:param at_mobiles: 需要@的手机号组成的列表,可选
|
||
:param is_at_all: 是否@所有人,默认False
|
||
"""
|
||
at_mobiles = at_mobiles or []
|
||
data = {
|
||
"msgtype": "text",
|
||
"text": {"content": content},
|
||
"at": {
|
||
"atMobiles": at_mobiles,
|
||
"isAtAll": is_at_all
|
||
}
|
||
}
|
||
|
||
url = self._gen_signed_url() if self.secret else self.webhook
|
||
|
||
try:
|
||
response = requests.post(url, json=data, timeout=5)
|
||
response.raise_for_status()
|
||
result = response.json()
|
||
if result.get("errcode", -1) != 0:
|
||
logger.error(f"钉钉消息发送失败: {result}")
|
||
else:
|
||
logger.info("钉钉消息发送成功")
|
||
except Exception as e:
|
||
logger.error(f"钉钉消息发送异常: {e}")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
webhook = "https://oapi.dingtalk.com/robot/send?access_token=fb70c1561d8beba94b4f11568f4bb15e3ae07ccbdc8ac19676434a9d1cd17546" # 填写你的webhook
|
||
secret = "SEC1ae7cd2f1a6f9da3611af37da3e7d954c1e8533fc073c6c8cc5e5af3b6e5926b" # 填写你的加签token(如果有),否则留空
|
||
dingtalk = DingTalkBot(webhook, secret)
|
||
dingtalk.send_text("测试消息") |