Files
2026-04-10 10:34:41 +08:00

45 lines
1.3 KiB
Python

"""Redis 异步连接管理
通过 RedisManager 类管理 Redis 连接,使用类属性持有客户端实例。
所有模块通过 from import RedisManager 导入后,访问 RedisManager.client 即可获取最新连接。
"""
from typing import Optional
import redis.asyncio as aioredis
from app.config import settings
from app.core.logger import log
class RedisManager:
"""Redis 连接管理器(纯静态类,禁止实例化)"""
client: Optional[aioredis.Redis] = None
def __init__(self):
raise TypeError("RedisManager 是静态工具类,禁止实例化")
@classmethod
async def init(cls) -> None:
"""初始化 Redis 连接池"""
try:
cls.client = aioredis.from_url(
settings.redis_url,
max_connections=settings.redis_pool_size,
decode_responses=True,
)
await cls.client.ping()
log.info("Redis 连接池已初始化")
except Exception as e:
log.error(f"Redis 连接初始化失败: {e}")
raise
@classmethod
async def close(cls) -> None:
"""关闭 Redis 连接池"""
if cls.client:
await cls.client.close()
cls.client = None
log.info("Redis 连接池已关闭")