80 lines
2.3 KiB
Python
80 lines
2.3 KiB
Python
"""双数据源:PostgreSQL(源库) + MySQL(业务库)"""
|
||
|
||
from typing import Optional
|
||
|
||
from sqlalchemy.ext.asyncio import (
|
||
AsyncEngine,
|
||
AsyncSession,
|
||
async_sessionmaker,
|
||
create_async_engine,
|
||
)
|
||
from sqlalchemy.orm import DeclarativeBase
|
||
|
||
from app.config import settings
|
||
from app.core.logger import log
|
||
|
||
# ──────────── 内部变量 ────────────
|
||
_pg_engine: Optional[AsyncEngine] = None
|
||
_pg_session_factory: Optional[async_sessionmaker[AsyncSession]] = None
|
||
|
||
_mysql_engine: Optional[AsyncEngine] = None
|
||
_mysql_session_factory: Optional[async_sessionmaker[AsyncSession]] = None
|
||
|
||
|
||
class PgBase(DeclarativeBase):
|
||
"""PostgreSQL ORM 声明基类"""
|
||
pass
|
||
|
||
|
||
class MysqlBase(DeclarativeBase):
|
||
"""MySQL ORM 声明基类"""
|
||
pass
|
||
|
||
|
||
async def init_db() -> None:
|
||
"""初始化双数据源"""
|
||
global _pg_engine, _pg_session_factory, _mysql_engine, _mysql_session_factory
|
||
|
||
_pg_engine = create_async_engine(
|
||
settings.pg_url,
|
||
pool_size=settings.pg_pool_size,
|
||
max_overflow=settings.pg_max_overflow,
|
||
pool_recycle=3600,
|
||
echo=False,
|
||
)
|
||
_pg_session_factory = async_sessionmaker(_pg_engine, expire_on_commit=False)
|
||
|
||
_mysql_engine = create_async_engine(
|
||
settings.mysql_url,
|
||
pool_size=settings.mysql_pool_size,
|
||
max_overflow=settings.mysql_max_overflow,
|
||
pool_recycle=3600,
|
||
echo=False,
|
||
)
|
||
_mysql_session_factory = async_sessionmaker(_mysql_engine, expire_on_commit=False)
|
||
|
||
log.info("双数据源初始化完成: PG={}, MySQL={}", settings.pg_host, settings.db_host)
|
||
|
||
|
||
async def close_db() -> None:
|
||
"""关闭双数据源"""
|
||
if _pg_engine:
|
||
await _pg_engine.dispose()
|
||
if _mysql_engine:
|
||
await _mysql_engine.dispose()
|
||
log.info("双数据源已关闭")
|
||
|
||
|
||
def PgSession() -> AsyncSession:
|
||
"""获取 PostgreSQL 异步会话(用作 async with PgSession() as session)"""
|
||
if _pg_session_factory is None:
|
||
raise RuntimeError("数据库未初始化,请先调用 init_db()")
|
||
return _pg_session_factory()
|
||
|
||
|
||
def MysqlSession() -> AsyncSession:
|
||
"""获取 MySQL 异步会话(用作 async with MysqlSession() as session)"""
|
||
if _mysql_session_factory is None:
|
||
raise RuntimeError("数据库未初始化,请先调用 init_db()")
|
||
return _mysql_session_factory()
|