""" 数据库配置和连接工具 """ import psycopg2 from psycopg2.extras import RealDictCursor from sqlalchemy import create_engine import pandas as pd from loguru import logger from typing import Optional from config.settings import get_db_config class DatabaseManager: """数据库管理类""" def __init__(self, config: dict = None): self.config = config or get_db_config() self.engine = None def get_engine(self): """获取SQLAlchemy引擎""" if self.engine is None: conn_str = ( f"postgresql://{self.config['username']}:{self.config['password']}" f"@{self.config['host']}:{self.config['port']}/{self.config['database']}" ) self.engine = create_engine( conn_str, pool_pre_ping=True, pool_recycle=300, echo=False, ) return self.engine def get_connection(self): """获取psycopg2连接""" return psycopg2.connect( host=self.config["host"], port=self.config["port"], database=self.config["database"], user=self.config["username"], password=self.config["password"], ) def test_connection(self) -> bool: """测试数据库连接""" try: with self.get_connection() as conn: with conn.cursor() as cursor: cursor.execute("SELECT 1") result = cursor.fetchone() logger.info("数据库连接测试成功") return True except Exception as e: logger.error(f"数据库连接测试失败: {e}") return False def execute_query(self, query: str, params: tuple = None) -> Optional[list]: """执行查询并返回结果""" try: with self.get_connection() as conn: with conn.cursor(cursor_factory=RealDictCursor) as cursor: cursor.execute(query, params) result = cursor.fetchall() return [dict(row) for row in result] except Exception as e: logger.error(f"执行查询失败: {e}") return None def insert_dataframe( self, df: pd.DataFrame, table_name: str, if_exists: str = "append" ) -> bool: """将DataFrame插入到数据库表""" try: engine = self.get_engine() df.to_sql( table_name, engine, if_exists=if_exists, index=False, method="multi", chunksize=1000, ) logger.info(f"成功插入 {len(df)} 条记录到表 {table_name}") return True except Exception as e: logger.error(f"插入数据到表 {table_name} 失败: {e}") return False def close(self): """关闭连接""" if self.engine: self.engine.dispose() self.engine = None