| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343 |
- import json
- from collections.abc import Awaitable
- from typing import Any
- from redis.asyncio.client import Redis
- from app.core.logger import log
- class RedisCURD:
- """缓存工具类"""
- def __init__(self, redis: Redis) -> None:
- """初始化"""
- self.redis = redis
- async def mget(self, keys: list) -> list:
- """批量获取缓存
- 参数:
- - keys (list): 键名列表
- 返回:
- - list: 返回缓存值列表,如果获取失败则返回空列表
- """
- try:
- data = await self.redis.mget(*[str(key) for key in keys])
- return data
- except Exception as e:
- log.error(f"批量获取缓存失败: {e!s}")
- return []
- async def get_keys(self, pattern: str = "*") -> list:
- """获取缓存键名
- 参数:
- - pattern (str, optional): 匹配模式,默认值为"*"。
- 返回:
- - list: 返回匹配的缓存键名列表,如果获取失败则返回空列表
- """
- try:
- keys = await self.redis.keys(f"{pattern}")
- return keys
- except Exception as e:
- log.error(f"获取缓存键名失败: {e!s}")
- return []
- async def get(self, key: str) -> Any:
- """获取缓存
- 参数:
- - key (str): 缓存键名
- 返回:
- - Any: 返回缓存值,如果缓存不存在则返回None
- """
- try:
- data = await self.redis.get(f"{key}")
- if data is None:
- return None
- return data
- except Exception as e:
- log.error(f"获取缓存失败: {e!s}")
- return None
- async def set(self, key: str, value: Any, expire: int | None = None) -> bool:
- """设置缓存
- 参数:
- - key (str): 缓存键名
- - value (Any): 缓存值
- - expire (int | None, optional): 过期时间,单位为秒,默认值为None。
- 返回:
- - bool: 如果设置缓存成功则返回True,否则返回False
- """
- try:
- # 根据数据类型选择序列化方式
- if isinstance(value, (int, float, str)):
- data = str(value).encode("utf-8")
- else:
- try:
- data = json.dumps(value).encode("utf-8")
- except Exception as e:
- log.error(f"序列化数据失败: {e!s}")
- return False
- await self.redis.set(name=key, value=data, ex=expire)
- return True
- except Exception as e:
- log.error(f"设置缓存失败: {e!s}")
- return False
- async def lock(self, key: str, expire: int, value: str | None = None) -> tuple[bool, str]:
- """获取分布式锁
- 参数:
- - key (str): 锁键名
- - expire (int): 锁过期时间,单位为秒
- - value (str, optional): 锁值,默认值为None(自动生成UUID)。
- 返回:
- - tuple[bool, str]: (获取锁是否成功, 锁值)
- """
- try:
- import uuid
- # 如果没有提供value,生成唯一的UUID
- lock_value = value or str(uuid.uuid4())
- # 使用setnx命令实现原子性锁获取
- result = await self.redis.set(
- name=key,
- value=lock_value,
- ex=expire,
- nx=True, # 只有当键不存在时才设置
- )
- return (result is not None, lock_value)
- except Exception as e:
- log.error(f"获取分布式锁失败: {e!s}")
- return (False, "")
- async def unlock(self, key: str, value: str) -> bool:
- """释放分布式锁(安全版本,验证锁值)
- 参数:
- - key (str): 锁键名
- - value (str): 锁值,用于验证锁的持有者
- 返回:
- - bool: 如果释放锁成功则返回True,否则返回False
- """
- try:
- # 使用Lua脚本确保原子性验证和删除
- script = """
- if redis.call('get', KEYS[1]) == ARGV[1] then
- return redis.call('del', KEYS[1])
- else
- return 0
- end
- """
- result = await self.redis.eval(script, 1, key, value) # pyright: ignore[reportGeneralTypeIssues]
- return result == 1
- except Exception as e:
- log.error(f"释放分布式锁失败: {e!s}")
- return False
- async def unlock_simple(self, key: str) -> bool:
- """释放分布式锁(简单版本,不验证锁值)
- 参数:
- - key (str): 锁键名
- 返回:
- - bool: 如果释放锁成功则返回True,否则返回False
- """
- try:
- await self.redis.delete(key)
- return True
- except Exception as e:
- log.error(f"释放分布式锁失败: {e!s}")
- return False
- async def delete(self, *keys: str) -> bool:
- """删除缓存
- 参数:
- - keys (str): 缓存键名
- 返回:
- - bool: 如果删除缓存成功则返回True,否则返回False
- """
- try:
- await self.redis.delete(*keys)
- return True
- except Exception as e:
- log.error(f"删除缓存失败: {e!s}")
- return False
- async def clear(self, pattern: str = "*") -> bool:
- """清空缓存
- 参数:
- - pattern (str, optional): 匹配模式,默认值为"*"。
- 返回:
- - bool: 如果清空缓存成功则返回True,否则返回False
- """
- try:
- keys = await self.redis.keys(f"{pattern}")
- if keys:
- await self.redis.delete(*keys)
- return True
- except Exception as e:
- log.error(f"清空缓存失败: {e!s}")
- return False
- async def exists(self, key: str) -> bool:
- """判断缓存是否存在
- 参数:
- - key (str): 缓存键名
- 返回:
- - bool: 如果缓存存在则返回True,否则返回False
- """
- try:
- return await self.redis.exists(f"{key}")
- except Exception as e:
- log.error(f"判断缓存是否存在失败: {e!s}")
- return False
- async def ttl(self, key: str) -> int:
- """获取缓存过期时间
- 参数:
- - key (str): 缓存键名
- 返回:
- - int: 返回缓存过期时间,单位为秒,如果缓存没有设置过期时间则返回-1
- """
- try:
- return await self.redis.ttl(f"{key}")
- except Exception as e:
- log.error(f"获取缓存过期时间失败: {e!s}")
- return -1
- async def renew_lock(self, key: str, expire: int, value: str) -> bool:
- """续约分布式锁
- 参数:
- - key (str): 锁键名
- - expire (int): 新的过期时间,单位为秒
- - value (str): 锁值,用于验证锁的持有者
- 返回:
- - bool: 如果续约锁成功则返回True,否则返回False
- """
- try:
- # 使用Lua脚本确保原子性验证和续约
- script = """
- if redis.call('get', KEYS[1]) == ARGV[1] then
- return redis.call('expire', KEYS[1], ARGV[2])
- else
- return 0
- end
- """
- result = await self.redis.eval(script, 1, key, value, str(expire)) # pyright: ignore[reportGeneralTypeIssues]
- return result == 1
- except Exception as e:
- log.error(f"续约分布式锁失败: {e!s}")
- return False
- async def expire(self, key: str, expire: int) -> bool:
- """设置缓存过期时间
- 参数:
- - key (str): 缓存键名
- - expire (int): 过期时间,单位为秒
- 返回:
- - bool: 如果设置缓存过期时间成功则返回True,否则返回False
- """
- try:
- return await self.redis.expire(f"{key}", expire)
- except Exception as e:
- log.error(f"设置缓存过期时间失败: {e!s}")
- return False
- async def info(self) -> dict:
- """获取缓存信息
- 返回:
- - dict: 返回缓存信息字典,如果获取失败则返回空字典
- """
- try:
- return await self.redis.info()
- except Exception as e:
- log.error(f"获取缓存信息失败: {e!s}")
- return {}
- async def db_size(self) -> int:
- """获取数据库大小
- 返回:
- - int: 返回数据库大小,如果获取失败则返回0
- """
- try:
- return await self.redis.dbsize()
- except Exception as e:
- log.error(f"获取数据库大小失败: {e!s}")
- return 0
- async def commandstats(self) -> dict:
- """获取命令统计信息
- 返回:
- - dict: 返回命令统计信息字典,如果获取失败则返回空字典
- """
- try:
- return await self.redis.info("commandstats")
- except Exception as e:
- log.error(f"获取命令统计信息失败: {e!s}")
- return {}
- async def hash_set(self, name: str, key: str, value: Any) -> bool:
- """设置哈希缓存
- 参数:
- - name (str): 哈希缓存名称
- - key (str): 哈希缓存键名
- - value (Any): 哈希缓存值
- 返回:
- - bool: 如果设置哈希缓存成功则返回True,否则返回False
- """
- try:
- self.redis.hset(name=name, key=key, value=value)
- return True
- except Exception as e:
- log.error(f"设置哈希缓存失败: {e!s}")
- return False
- async def hash_get(self, name: str, keys: list[str]) -> Awaitable[list[Any]] | list[Any]:
- """获取哈希缓存
- 参数:
- - name (str): 哈希缓存名称
- - keys (list[str]): 哈希缓存键名列表
- 返回:
- - Awaitable[list[Any]] | list[Any]: 返回哈希缓存值列表,如果获取失败则返回空列表
- """
- try:
- data = self.redis.hmget(name=name, keys=keys)
- return data
- except Exception as e:
- log.error(f"获取哈希缓存失败: {e!s}")
- return []
|