"""双数据源:PostgreSQL(源库) + MySQL(业务库)""" from typing import Optional from sqlalchemy.ext.asyncio import ( AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine, ) from sqlalchemy.orm import DeclarativeBase from app.config import settings from app.core.logger import log # ──────────── 内部变量 ──────────── _pg_engine: Optional[AsyncEngine] = None _pg_session_factory: Optional[async_sessionmaker[AsyncSession]] = None _mysql_engine: Optional[AsyncEngine] = None _mysql_session_factory: Optional[async_sessionmaker[AsyncSession]] = None class PgBase(DeclarativeBase): """PostgreSQL ORM 声明基类""" pass class MysqlBase(DeclarativeBase): """MySQL ORM 声明基类""" pass async def init_db() -> None: """初始化双数据源""" global _pg_engine, _pg_session_factory, _mysql_engine, _mysql_session_factory _pg_engine = create_async_engine( settings.pg_url, pool_size=settings.pg_pool_size, max_overflow=settings.pg_max_overflow, pool_recycle=3600, echo=False, ) _pg_session_factory = async_sessionmaker(_pg_engine, expire_on_commit=False) _mysql_engine = create_async_engine( settings.mysql_url, pool_size=settings.mysql_pool_size, max_overflow=settings.mysql_max_overflow, pool_recycle=3600, echo=False, ) _mysql_session_factory = async_sessionmaker(_mysql_engine, expire_on_commit=False) log.info("双数据源初始化完成: PG={}, MySQL={}", settings.pg_host, settings.db_host) async def close_db() -> None: """关闭双数据源""" if _pg_engine: await _pg_engine.dispose() if _mysql_engine: await _mysql_engine.dispose() log.info("双数据源已关闭") def PgSession() -> AsyncSession: """获取 PostgreSQL 异步会话(用作 async with PgSession() as session)""" if _pg_session_factory is None: raise RuntimeError("数据库未初始化,请先调用 init_db()") return _pg_session_factory() def MysqlSession() -> AsyncSession: """获取 MySQL 异步会话(用作 async with MysqlSession() as session)""" if _mysql_session_factory is None: raise RuntimeError("数据库未初始化,请先调用 init_db()") return _mysql_session_factory()