封装 redis 客户端处理方案

This commit is contained in:
zk
2026-04-10 10:34:41 +08:00
parent c001ba8146
commit 70a080d9b3
4 changed files with 49 additions and 44 deletions
+32 -27
View File
@@ -1,3 +1,9 @@
"""Redis 异步连接管理
通过 RedisManager 类管理 Redis 连接,使用类属性持有客户端实例。
所有模块通过 from import RedisManager 导入后,访问 RedisManager.client 即可获取最新连接。
"""
from typing import Optional
import redis.asyncio as aioredis
@@ -5,35 +11,34 @@ import redis.asyncio as aioredis
from app.config import settings
from app.core.logger import log
redis_client: Optional[aioredis.Redis] = None
class RedisManager:
"""Redis 连接管理器(纯静态类,禁止实例化)"""
async def init_redis() -> None:
"""初始化 Redis 连接池"""
global redis_client
try:
redis_client = aioredis.from_url(
settings.redis_url,
max_connections=settings.redis_pool_size,
decode_responses=True,
)
await redis_client.ping()
log.info("Redis 连接池已初始化")
except Exception as e:
log.error(f"Redis 连接初始化失败: {e}")
raise
client: Optional[aioredis.Redis] = None
def __init__(self):
raise TypeError("RedisManager 是静态工具类,禁止实例化")
async def close_redis() -> None:
"""关闭 Redis 连接池"""
global redis_client
if redis_client:
await redis_client.close()
log.info("Redis 连接池已关闭")
@classmethod
async def init(cls) -> None:
"""初始化 Redis 连接池"""
try:
cls.client = aioredis.from_url(
settings.redis_url,
max_connections=settings.redis_pool_size,
decode_responses=True,
)
await cls.client.ping()
log.info("Redis 连接池已初始化")
except Exception as e:
log.error(f"Redis 连接初始化失败: {e}")
raise
async def get_redis() -> aioredis.Redis:
"""依赖注入:提供 Redis 客户端实例"""
if redis_client is None:
raise RuntimeError("Redis 未初始化,请先调用 init_redis()")
return redis_client
@classmethod
async def close(cls) -> None:
"""关闭 Redis 连接池"""
if cls.client:
await cls.client.close()
cls.client = None
log.info("Redis 连接池已关闭")