""" 阿里云 OSS 工具模块 用于上传文件到 OSS 并生成访问链接 """ import oss2 import os from datetime import datetime from typing import Optional from loguru import logger class OSSUploader: """OSS 文件上传器""" def __init__( self, access_key_id: str = None, access_key_secret: str = None, bucket_name: str = None, endpoint: str = None, ): """ 初始化 OSS 上传器 Args: access_key_id: 阿里云 AccessKey ID access_key_secret: 阿里云 AccessKey Secret bucket_name: OSS Bucket 名称 endpoint: OSS 区域 Endpoint """ # 从环境变量或参数获取配置 self.access_key_id = access_key_id or os.getenv("OSS_ACCESS_KEY_ID") self.access_key_secret = access_key_secret or os.getenv("OSS_ACCESS_KEY_SECRET") self.bucket_name = bucket_name or os.getenv("OSS_BUCKET_NAME", "value-investing") self.endpoint = endpoint or os.getenv("OSS_ENDPOINT", "https://oss-cn-wulanchabu.aliyuncs.com") self.bucket = None self._init_bucket() def _init_bucket(self): """初始化 OSS Bucket""" if not all([self.access_key_id, self.access_key_secret, self.bucket_name, self.endpoint]): logger.warning("OSS 配置不完整,无法初始化") return try: auth = oss2.Auth(self.access_key_id, self.access_key_secret) self.bucket = oss2.Bucket(auth, self.endpoint, self.bucket_name) logger.info(f"OSS Bucket 初始化成功: {self.bucket_name}") except Exception as e: logger.error(f"OSS Bucket 初始化失败: {e}") self.bucket = None def upload_file( self, local_path: str, oss_key: str = None, expire_seconds: int = 3600 * 24 * 7, # 默认7天有效期 ) -> Optional[str]: """ 上传文件到 OSS Args: local_path: 本地文件路径 oss_key: OSS 中的目标路径,如果不指定则自动生成 expire_seconds: 预签名URL有效期(秒) Returns: str: 可访问的 URL,失败返回 None """ if not self.bucket: logger.error("OSS Bucket 未初始化") return None if not os.path.exists(local_path): logger.error(f"本地文件不存在: {local_path}") return None try: # 自动生成 OSS 路径 if not oss_key: file_name = os.path.basename(local_path) date_str = datetime.now().strftime("%Y%m%d") oss_key = f"etf-signals/{date_str}/{file_name}" # 上传文件 self.bucket.put_object_from_file(oss_key, local_path) logger.info(f"文件上传成功: {local_path} -> {oss_key}") # 生成预签名 URL url = self.bucket.sign_url("GET", oss_key, expire_seconds) return url except Exception as e: logger.error(f"文件上传失败: {e}") return None def upload_image( self, image_path: str, expire_days: int = 7, ) -> Optional[str]: """ 上传图片到 OSS(专门用于钉钉通知) Args: image_path: 图片文件路径 expire_days: URL 有效期(天) Returns: str: 图片访问 URL """ if not os.path.exists(image_path): logger.error(f"图片文件不存在: {image_path}") return None # 生成带时间戳的 OSS 路径 file_name = os.path.basename(image_path) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") oss_key = f"etf-charts/{timestamp}_{file_name}" return self.upload_file(image_path, oss_key, expire_days * 24 * 3600) def delete_file(self, oss_key: str) -> bool: """ 删除 OSS 文件 Args: oss_key: OSS 文件路径 Returns: bool: 是否删除成功 """ if not self.bucket: logger.error("OSS Bucket 未初始化") return False try: self.bucket.delete_object(oss_key) logger.info(f"文件删除成功: {oss_key}") return True except Exception as e: logger.error(f"文件删除失败: {e}") return False # 全局单例 _oss_uploader: Optional[OSSUploader] = None def get_oss_uploader() -> OSSUploader: """获取 OSS 上传器单例""" global _oss_uploader if _oss_uploader is None: _oss_uploader = OSSUploader() return _oss_uploader def upload_image_to_oss(image_path: str, expire_days: int = 7) -> Optional[str]: """ 便捷函数:上传图片到 OSS Args: image_path: 图片路径 expire_days: URL 有效期(天) Returns: str: 图片访问 URL """ uploader = get_oss_uploader() return uploader.upload_image(image_path, expire_days)