Files
etf/core/common/oss_utils.py
aszerW 844e609ff7 refactor(notify): 将通知模块从归档移至正式位置
- 将 notify.py 和 oss_utils.py 从 archive/legacy_core 移至 core/common/
- 内联钉钉配置读取函数,移除对 config.settings 的依赖
- 删除 config/ 目录(settings.py 不再需要)
- daily_scheduler.py 移除归档路径的 sys.path hack
- 新增 --no-detail 和 --no-report 命令行参数控制导出
- 全标的排名表新增退场日期和退场价格列
2026-06-08 22:34:03 +08:00

173 lines
5.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
阿里云 OSS 工具模块
用于上传文件到 OSS 并生成访问链接
"""
import oss2
import os
from datetime import datetime
from typing import Optional
from loguru import logger
class OSSUploader:
"""OSS 文件上传器"""
def __init__(
self,
access_key_id: str = None,
access_key_secret: str = None,
bucket_name: str = None,
endpoint: str = None,
):
"""
初始化 OSS 上传器
Args:
access_key_id: 阿里云 AccessKey ID
access_key_secret: 阿里云 AccessKey Secret
bucket_name: OSS Bucket 名称
endpoint: OSS 区域 Endpoint
"""
# 从环境变量或参数获取配置
self.access_key_id = access_key_id or os.getenv("OSS_ACCESS_KEY_ID")
self.access_key_secret = access_key_secret or os.getenv("OSS_ACCESS_KEY_SECRET")
self.bucket_name = bucket_name or os.getenv("OSS_BUCKET_NAME", "value-investing")
self.endpoint = endpoint or os.getenv("OSS_ENDPOINT", "https://oss-cn-wulanchabu.aliyuncs.com")
self.bucket = None
self._init_bucket()
def _init_bucket(self):
"""初始化 OSS Bucket"""
if not all([self.access_key_id, self.access_key_secret, self.bucket_name, self.endpoint]):
logger.warning("OSS 配置不完整,无法初始化")
return
try:
auth = oss2.Auth(self.access_key_id, self.access_key_secret)
self.bucket = oss2.Bucket(auth, self.endpoint, self.bucket_name)
logger.info(f"OSS Bucket 初始化成功: {self.bucket_name}")
except Exception as e:
logger.error(f"OSS Bucket 初始化失败: {e}")
self.bucket = None
def upload_file(
self,
local_path: str,
oss_key: str = None,
expire_seconds: int = 3600 * 24 * 7, # 默认7天有效期
) -> Optional[str]:
"""
上传文件到 OSS
Args:
local_path: 本地文件路径
oss_key: OSS 中的目标路径,如果不指定则自动生成
expire_seconds: 预签名URL有效期
Returns:
str: 可访问的 URL失败返回 None
"""
if not self.bucket:
logger.error("OSS Bucket 未初始化")
return None
if not os.path.exists(local_path):
logger.error(f"本地文件不存在: {local_path}")
return None
try:
# 自动生成 OSS 路径
if not oss_key:
file_name = os.path.basename(local_path)
date_str = datetime.now().strftime("%Y%m%d")
oss_key = f"etf-signals/{date_str}/{file_name}"
# 上传文件
self.bucket.put_object_from_file(oss_key, local_path)
logger.info(f"文件上传成功: {local_path} -> {oss_key}")
# 生成预签名 URL
url = self.bucket.sign_url("GET", oss_key, expire_seconds)
return url
except Exception as e:
logger.error(f"文件上传失败: {e}")
return None
def upload_image(
self,
image_path: str,
expire_days: int = 7,
) -> Optional[str]:
"""
上传图片到 OSS专门用于钉钉通知
Args:
image_path: 图片文件路径
expire_days: URL 有效期(天)
Returns:
str: 图片访问 URL
"""
if not os.path.exists(image_path):
logger.error(f"图片文件不存在: {image_path}")
return None
# 生成带时间戳的 OSS 路径
file_name = os.path.basename(image_path)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
oss_key = f"etf-charts/{timestamp}_{file_name}"
return self.upload_file(image_path, oss_key, expire_days * 24 * 3600)
def delete_file(self, oss_key: str) -> bool:
"""
删除 OSS 文件
Args:
oss_key: OSS 文件路径
Returns:
bool: 是否删除成功
"""
if not self.bucket:
logger.error("OSS Bucket 未初始化")
return False
try:
self.bucket.delete_object(oss_key)
logger.info(f"文件删除成功: {oss_key}")
return True
except Exception as e:
logger.error(f"文件删除失败: {e}")
return False
# 全局单例
_oss_uploader: Optional[OSSUploader] = None
def get_oss_uploader() -> OSSUploader:
"""获取 OSS 上传器单例"""
global _oss_uploader
if _oss_uploader is None:
_oss_uploader = OSSUploader()
return _oss_uploader
def upload_image_to_oss(image_path: str, expire_days: int = 7) -> Optional[str]:
"""
便捷函数:上传图片到 OSS
Args:
image_path: 图片路径
expire_days: URL 有效期(天)
Returns:
str: 图片访问 URL
"""
uploader = get_oss_uploader()
return uploader.upload_image(image_path, expire_days)