"""Redis 异步连接管理 通过 RedisManager 类管理 Redis 连接,使用类属性持有客户端实例。 所有模块通过 from import RedisManager 导入后,访问 RedisManager.client 即可获取最新连接。 """ from typing import Optional import redis.asyncio as aioredis from app.config import settings from app.core.logger import log class RedisManager: """Redis 连接管理器(纯静态类,禁止实例化)""" client: Optional[aioredis.Redis] = None def __init__(self): raise TypeError("RedisManager 是静态工具类,禁止实例化") @classmethod async def init(cls) -> None: """初始化 Redis 连接池""" try: cls.client = aioredis.from_url( settings.redis_url, max_connections=settings.redis_pool_size, decode_responses=True, ) await cls.client.ping() log.info("Redis 连接池已初始化") except Exception as e: log.error(f"Redis 连接初始化失败: {e}") raise @classmethod async def close(cls) -> None: """关闭 Redis 连接池""" if cls.client: await cls.client.close() cls.client = None log.info("Redis 连接池已关闭")