- 在DingTalkBot中添加发送图片消息(自动压缩)功能,支持大小限制自动处理 - 添加发送图文混合消息、发送文件消息接口,优化钉钉通知能力 - 实现发送本地图片链接和通过OSS上传图片再发送Markdown图文两种机制 - 新增阿里云OSS上传工具模块,支持文件和图片上传及预签名URL生成 - 创建每日任务调度脚本,实现每日交易日检查、策略执行、结果上传并通知 - 调整回测策略开始日期至2022年,适配最新数据范围
173 lines
5.0 KiB
Python
173 lines
5.0 KiB
Python
"""
|
||
阿里云 OSS 工具模块
|
||
用于上传文件到 OSS 并生成访问链接
|
||
"""
|
||
|
||
import oss2
|
||
import os
|
||
from datetime import datetime
|
||
from typing import Optional
|
||
from loguru import logger
|
||
|
||
|
||
class OSSUploader:
|
||
"""OSS 文件上传器"""
|
||
|
||
def __init__(
|
||
self,
|
||
access_key_id: str = None,
|
||
access_key_secret: str = None,
|
||
bucket_name: str = None,
|
||
endpoint: str = None,
|
||
):
|
||
"""
|
||
初始化 OSS 上传器
|
||
|
||
Args:
|
||
access_key_id: 阿里云 AccessKey ID
|
||
access_key_secret: 阿里云 AccessKey Secret
|
||
bucket_name: OSS Bucket 名称
|
||
endpoint: OSS 区域 Endpoint
|
||
"""
|
||
# 从环境变量或参数获取配置
|
||
self.access_key_id = access_key_id or os.getenv("OSS_ACCESS_KEY_ID")
|
||
self.access_key_secret = access_key_secret or os.getenv("OSS_ACCESS_KEY_SECRET")
|
||
self.bucket_name = bucket_name or os.getenv("OSS_BUCKET_NAME", "value-investing")
|
||
self.endpoint = endpoint or os.getenv("OSS_ENDPOINT", "https://oss-cn-wulanchabu.aliyuncs.com")
|
||
|
||
self.bucket = None
|
||
self._init_bucket()
|
||
|
||
def _init_bucket(self):
|
||
"""初始化 OSS Bucket"""
|
||
if not all([self.access_key_id, self.access_key_secret, self.bucket_name, self.endpoint]):
|
||
logger.warning("OSS 配置不完整,无法初始化")
|
||
return
|
||
|
||
try:
|
||
auth = oss2.Auth(self.access_key_id, self.access_key_secret)
|
||
self.bucket = oss2.Bucket(auth, self.endpoint, self.bucket_name)
|
||
logger.info(f"OSS Bucket 初始化成功: {self.bucket_name}")
|
||
except Exception as e:
|
||
logger.error(f"OSS Bucket 初始化失败: {e}")
|
||
self.bucket = None
|
||
|
||
def upload_file(
|
||
self,
|
||
local_path: str,
|
||
oss_key: str = None,
|
||
expire_seconds: int = 3600 * 24 * 7, # 默认7天有效期
|
||
) -> Optional[str]:
|
||
"""
|
||
上传文件到 OSS
|
||
|
||
Args:
|
||
local_path: 本地文件路径
|
||
oss_key: OSS 中的目标路径,如果不指定则自动生成
|
||
expire_seconds: 预签名URL有效期(秒)
|
||
|
||
Returns:
|
||
str: 可访问的 URL,失败返回 None
|
||
"""
|
||
if not self.bucket:
|
||
logger.error("OSS Bucket 未初始化")
|
||
return None
|
||
|
||
if not os.path.exists(local_path):
|
||
logger.error(f"本地文件不存在: {local_path}")
|
||
return None
|
||
|
||
try:
|
||
# 自动生成 OSS 路径
|
||
if not oss_key:
|
||
file_name = os.path.basename(local_path)
|
||
date_str = datetime.now().strftime("%Y%m%d")
|
||
oss_key = f"etf-signals/{date_str}/{file_name}"
|
||
|
||
# 上传文件
|
||
self.bucket.put_object_from_file(oss_key, local_path)
|
||
logger.info(f"文件上传成功: {local_path} -> {oss_key}")
|
||
|
||
# 生成预签名 URL
|
||
url = self.bucket.sign_url("GET", oss_key, expire_seconds)
|
||
return url
|
||
|
||
except Exception as e:
|
||
logger.error(f"文件上传失败: {e}")
|
||
return None
|
||
|
||
def upload_image(
|
||
self,
|
||
image_path: str,
|
||
expire_days: int = 7,
|
||
) -> Optional[str]:
|
||
"""
|
||
上传图片到 OSS(专门用于钉钉通知)
|
||
|
||
Args:
|
||
image_path: 图片文件路径
|
||
expire_days: URL 有效期(天)
|
||
|
||
Returns:
|
||
str: 图片访问 URL
|
||
"""
|
||
if not os.path.exists(image_path):
|
||
logger.error(f"图片文件不存在: {image_path}")
|
||
return None
|
||
|
||
# 生成带时间戳的 OSS 路径
|
||
file_name = os.path.basename(image_path)
|
||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
oss_key = f"etf-charts/{timestamp}_{file_name}"
|
||
|
||
return self.upload_file(image_path, oss_key, expire_days * 24 * 3600)
|
||
|
||
def delete_file(self, oss_key: str) -> bool:
|
||
"""
|
||
删除 OSS 文件
|
||
|
||
Args:
|
||
oss_key: OSS 文件路径
|
||
|
||
Returns:
|
||
bool: 是否删除成功
|
||
"""
|
||
if not self.bucket:
|
||
logger.error("OSS Bucket 未初始化")
|
||
return False
|
||
|
||
try:
|
||
self.bucket.delete_object(oss_key)
|
||
logger.info(f"文件删除成功: {oss_key}")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"文件删除失败: {e}")
|
||
return False
|
||
|
||
|
||
# 全局单例
|
||
_oss_uploader: Optional[OSSUploader] = None
|
||
|
||
|
||
def get_oss_uploader() -> OSSUploader:
|
||
"""获取 OSS 上传器单例"""
|
||
global _oss_uploader
|
||
if _oss_uploader is None:
|
||
_oss_uploader = OSSUploader()
|
||
return _oss_uploader
|
||
|
||
|
||
def upload_image_to_oss(image_path: str, expire_days: int = 7) -> Optional[str]:
|
||
"""
|
||
便捷函数:上传图片到 OSS
|
||
|
||
Args:
|
||
image_path: 图片路径
|
||
expire_days: URL 有效期(天)
|
||
|
||
Returns:
|
||
str: 图片访问 URL
|
||
"""
|
||
uploader = get_oss_uploader()
|
||
return uploader.upload_image(image_path, expire_days)
|