45 lines
1.3 KiB
Python
45 lines
1.3 KiB
Python
"""Redis 异步连接管理
|
|
|
|
通过 RedisManager 类管理 Redis 连接,使用类属性持有客户端实例。
|
|
所有模块通过 from import RedisManager 导入后,访问 RedisManager.client 即可获取最新连接。
|
|
"""
|
|
|
|
from typing import Optional
|
|
|
|
import redis.asyncio as aioredis
|
|
|
|
from app.config import settings
|
|
from app.core.logger import log
|
|
|
|
|
|
class RedisManager:
|
|
"""Redis 连接管理器(纯静态类,禁止实例化)"""
|
|
|
|
client: Optional[aioredis.Redis] = None
|
|
|
|
def __init__(self):
|
|
raise TypeError("RedisManager 是静态工具类,禁止实例化")
|
|
|
|
@classmethod
|
|
async def init(cls) -> None:
|
|
"""初始化 Redis 连接池"""
|
|
try:
|
|
cls.client = aioredis.from_url(
|
|
settings.redis_url,
|
|
max_connections=settings.redis_pool_size,
|
|
decode_responses=True,
|
|
)
|
|
await cls.client.ping()
|
|
log.info("Redis 连接池已初始化")
|
|
except Exception as e:
|
|
log.error(f"Redis 连接初始化失败: {e}")
|
|
raise
|
|
|
|
@classmethod
|
|
async def close(cls) -> None:
|
|
"""关闭 Redis 连接池"""
|
|
if cls.client:
|
|
await cls.client.close()
|
|
cls.client = None
|
|
log.info("Redis 连接池已关闭")
|