Files
etf/core/common/notify.py
aszerW 988c2335fb chore(config): 添加环境变量示例及.gitignore更新
- 新增 .env.example,包含 Tushare API、钉钉机器人和PostgreSQL数据库配置模板
- 更新.gitignore,忽略本地配置文件如 .env.local 和 config_local.py
- 添加对报表文件命名规则的支持,保留示例文件不忽略
- 删除废弃的 chart.py 及相关图表模块代码
- 新增 config/settings.py,实现从环境变量读取配置的统一接口
- 设置数据目录及缓存目录,确保目录存在,提高配置管理规范性
2026-03-18 23:33:40 +08:00

211 lines
6.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
通知模块 - 支持钉钉、日志等多种通知方式
"""
import requests
import time
import hmac
import hashlib
import base64
import urllib.parse
from loguru import logger
from typing import Optional
from config.settings import get_dingtalk_config
class DingTalkBot:
"""钉钉机器人类"""
def __init__(self, webhook: str = None, secret: str = None):
"""
初始化钉钉机器人
Args:
webhook: 钉钉自定义机器人webhook地址
secret: 加签密钥(可选)
"""
config = get_dingtalk_config()
self.webhook = webhook or config.get("webhook", "")
self.secret = secret or config.get("secret", "")
if not self.webhook:
logger.warning("钉钉webhook未配置消息将不会被发送")
def _gen_signed_url(self) -> str:
"""生成带签名的URL"""
if not self.secret:
return self.webhook
timestamp = str(round(time.time() * 1000))
secret_enc = self.secret.encode("utf-8")
string_to_sign = f"{timestamp}\n{self.secret}"
string_to_sign_enc = string_to_sign.encode("utf-8")
hmac_code = hmac.new(
secret_enc, string_to_sign_enc, digestmod=hashlib.sha256
).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
return f"{self.webhook}&timestamp={timestamp}&sign={sign}"
def send_text(
self, content: str, at_mobiles: list = None, is_at_all: bool = False
) -> bool:
"""
发送文本消息
Args:
content: 消息内容
at_mobiles: 需要@的手机号列表
is_at_all: 是否@所有人
Returns:
bool: 是否发送成功
"""
if not self.webhook:
logger.warning(f"[钉钉消息未发送] {content[:100]}...")
return False
at_mobiles = at_mobiles or []
data = {
"msgtype": "text",
"text": {"content": content},
"at": {"atMobiles": at_mobiles, "isAtAll": is_at_all},
}
url = self._gen_signed_url()
try:
response = requests.post(url, json=data, timeout=5)
response.raise_for_status()
result = response.json()
if result.get("errcode", -1) != 0:
logger.error(f"钉钉消息发送失败: {result}")
return False
logger.info("钉钉消息发送成功")
return True
except Exception as e:
logger.error(f"钉钉消息发送异常: {e}")
return False
def send_markdown(
self,
title: str,
text: str,
at_mobiles: list = None,
is_at_all: bool = False,
) -> bool:
"""
发送markdown消息
Args:
title: 消息标题
text: markdown格式的消息内容
at_mobiles: 需要@的手机号列表
is_at_all: 是否@所有人
Returns:
bool: 是否发送成功
"""
if not self.webhook:
logger.warning(f"[钉钉Markdown未发送] {title}")
return False
at_mobiles = at_mobiles or []
data = {
"msgtype": "markdown",
"markdown": {"title": title, "text": text},
"at": {"atMobiles": at_mobiles, "isAtAll": is_at_all},
}
url = self._gen_signed_url()
try:
response = requests.post(url, json=data, timeout=5)
response.raise_for_status()
result = response.json()
if result.get("errcode", -1) != 0:
logger.error(f"钉钉markdown消息发送失败: {result}")
return False
logger.info("钉钉markdown消息发送成功")
return True
except Exception as e:
logger.error(f"钉钉markdown消息发送异常: {e}")
return False
class NotificationManager:
"""通知管理器 - 统一管理多种通知渠道"""
def __init__(self):
self.dingtalk = DingTalkBot()
def notify(self, message: str, title: str = "系统通知", use_markdown: bool = False):
"""
发送通知(优先使用钉钉,失败则记录日志)
Args:
message: 消息内容
title: 消息标题markdown模式使用
use_markdown: 是否使用markdown格式
"""
if use_markdown:
success = self.dingtalk.send_markdown(title, message)
else:
success = self.dingtalk.send_text(message)
if not success:
# 钉钉发送失败,记录到日志
logger.info(f"[通知] {title}: {message}")
def notify_error(self, error_msg: str):
"""发送错误通知"""
markdown = f"""## 错误告警
**时间**: {time.strftime('%Y-%m-%d %H:%M:%S')}
**错误信息**:
```
{error_msg}
```
"""
self.notify(markdown, title="系统错误", use_markdown=True)
def notify_signal(self, signals: list, signal_type: str = "CCI超卖"):
"""
发送交易信号通知
Args:
signals: 信号列表每项为dict包含code, name等指标
signal_type: 信号类型名称
"""
if not signals:
logger.info(f"[{signal_type}] 无信号")
return
# 构建markdown表格
if signals:
headers = signals[0].keys()
header_line = " | ".join(headers)
separator = " | ".join(["---"] * len(headers))
rows = []
for s in signals:
row = " | ".join(str(v) for v in s.values())
rows.append(row)
table = f"{header_line}\n{separator}\n" + "\n".join(rows)
else:
table = ""
markdown = f"""## {signal_type}信号
**时间**: {time.strftime('%Y-%m-%d %H:%M:%S')}
**筛选结果**:
{table}
{len(signals)} 个标的符合筛选条件。
"""
self.notify(markdown, title=f"{signal_type}信号", use_markdown=True)