添加钉钉消息通知

This commit is contained in:
2025-10-21 21:52:19 +08:00
parent f2f577c07f
commit 7fdac25742
2 changed files with 184 additions and 0 deletions

View File

@@ -9,6 +9,43 @@ from typing import Dict, List, Optional
from functools import reduce
import logging
from dingtalk import DingTalkBot
import numpy as np
import pandas as pd
from datetime import datetime, timedelta, timezone
from pandas import DataFrame
from typing import Optional, Union
from freqtrade.strategy import (
IStrategy,
Trade,
Order,
PairLocks,
informative, # @informative decorator
# Hyperopt Parameters
BooleanParameter,
CategoricalParameter,
DecimalParameter,
IntParameter,
RealParameter,
# timeframe helpers
timeframe_to_minutes,
timeframe_to_next_date,
timeframe_to_prev_date,
# Strategy helper functions
merge_informative_pair,
stoploss_from_absolute,
stoploss_from_open,
)
webhook = "https://oapi.dingtalk.com/robot/send?access_token=87c7abfcdd69b699c32da4e4f5981cd2ca6b0445474fc6ffb36f2ed0f6262fbb" # 填写你的webhook
secret = "SECf3d6b43f2f8a87ab91feffd052e71ec314fbf57a1842e483fe07af3c0a0e5aa6" # 填写你的加签token如果有否则留空
# CTA 群机器人
# webhook = "https://oapi.dingtalk.com/robot/send?access_token=87c7abfcdd69b699c32da4e4f5981cd2ca6b0445474fc6ffb36f2ed0f6262fbb"
# secret = "SECf3d6b43f2f8a87ab91feffd052e71ec314fbf57a1842e483fe07af3c0a0e5aa6"
dingtalk = DingTalkBot(webhook, secret)
logger = logging.getLogger(__name__)
@@ -57,6 +94,35 @@ class SimpleRSIStrategyFixed(IStrategy):
ema_breakout_threshold = 0.02
ema_separation_threshold = 0.02
def order_filled(self, pair: str, trade: Trade, order: Order, current_time: datetime, **kwargs) -> None:
# 交易对
trading_pair = pair
# 时间
fill_time = order.order_filled_date or current_time
# 价格
fill_price = order.average or order.price
# 买入还是卖出
side = order.ft_order_side # 'buy' 或 'sell'
# 仓位(当前持仓数量)
position = trade.amount
# 或者使用日志
logger.info(
f"订单成交 - 交易对: {trading_pair}, "
f"时间: {fill_time}, "
f"价格: {fill_price}, "
f"方向: {side}, "
f"仓位: {position}"
)
dingtalk.send_text(
content=f"订单成交 - 交易对: {trading_pair}, 时间: {fill_time}, 价格: {fill_price}, 方向: {side}, 仓位: {position}")
return None
def populate_indicators(self, dataframe: pd.DataFrame, metadata: Dict) -> pd.DataFrame:
"""
添加技术指标到数据框