redis_crud.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. import json
  2. from collections.abc import Awaitable
  3. from typing import Any
  4. from redis.asyncio.client import Redis
  5. from app.core.logger import log
  6. class RedisCURD:
  7. """缓存工具类"""
  8. def __init__(self, redis: Redis) -> None:
  9. """初始化"""
  10. self.redis = redis
  11. async def mget(self, keys: list) -> list:
  12. """批量获取缓存
  13. 参数:
  14. - keys (list): 键名列表
  15. 返回:
  16. - list: 返回缓存值列表,如果获取失败则返回空列表
  17. """
  18. try:
  19. data = await self.redis.mget(*[str(key) for key in keys])
  20. return data
  21. except Exception as e:
  22. log.error(f"批量获取缓存失败: {e!s}")
  23. return []
  24. async def get_keys(self, pattern: str = "*") -> list:
  25. """获取缓存键名
  26. 参数:
  27. - pattern (str, optional): 匹配模式,默认值为"*"。
  28. 返回:
  29. - list: 返回匹配的缓存键名列表,如果获取失败则返回空列表
  30. """
  31. try:
  32. keys = await self.redis.keys(f"{pattern}")
  33. return keys
  34. except Exception as e:
  35. log.error(f"获取缓存键名失败: {e!s}")
  36. return []
  37. async def get(self, key: str) -> Any:
  38. """获取缓存
  39. 参数:
  40. - key (str): 缓存键名
  41. 返回:
  42. - Any: 返回缓存值,如果缓存不存在则返回None
  43. """
  44. try:
  45. data = await self.redis.get(f"{key}")
  46. if data is None:
  47. return None
  48. return data
  49. except Exception as e:
  50. log.error(f"获取缓存失败: {e!s}")
  51. return None
  52. async def set(self, key: str, value: Any, expire: int | None = None) -> bool:
  53. """设置缓存
  54. 参数:
  55. - key (str): 缓存键名
  56. - value (Any): 缓存值
  57. - expire (int | None, optional): 过期时间,单位为秒,默认值为None。
  58. 返回:
  59. - bool: 如果设置缓存成功则返回True,否则返回False
  60. """
  61. try:
  62. # 根据数据类型选择序列化方式
  63. if isinstance(value, (int, float, str)):
  64. data = str(value).encode("utf-8")
  65. else:
  66. try:
  67. data = json.dumps(value).encode("utf-8")
  68. except Exception as e:
  69. log.error(f"序列化数据失败: {e!s}")
  70. return False
  71. await self.redis.set(name=key, value=data, ex=expire)
  72. return True
  73. except Exception as e:
  74. log.error(f"设置缓存失败: {e!s}")
  75. return False
  76. async def lock(self, key: str, expire: int, value: str | None = None) -> tuple[bool, str]:
  77. """获取分布式锁
  78. 参数:
  79. - key (str): 锁键名
  80. - expire (int): 锁过期时间,单位为秒
  81. - value (str, optional): 锁值,默认值为None(自动生成UUID)。
  82. 返回:
  83. - tuple[bool, str]: (获取锁是否成功, 锁值)
  84. """
  85. try:
  86. import uuid
  87. # 如果没有提供value,生成唯一的UUID
  88. lock_value = value or str(uuid.uuid4())
  89. # 使用setnx命令实现原子性锁获取
  90. result = await self.redis.set(
  91. name=key,
  92. value=lock_value,
  93. ex=expire,
  94. nx=True, # 只有当键不存在时才设置
  95. )
  96. return (result is not None, lock_value)
  97. except Exception as e:
  98. log.error(f"获取分布式锁失败: {e!s}")
  99. return (False, "")
  100. async def unlock(self, key: str, value: str) -> bool:
  101. """释放分布式锁(安全版本,验证锁值)
  102. 参数:
  103. - key (str): 锁键名
  104. - value (str): 锁值,用于验证锁的持有者
  105. 返回:
  106. - bool: 如果释放锁成功则返回True,否则返回False
  107. """
  108. try:
  109. # 使用Lua脚本确保原子性验证和删除
  110. script = """
  111. if redis.call('get', KEYS[1]) == ARGV[1] then
  112. return redis.call('del', KEYS[1])
  113. else
  114. return 0
  115. end
  116. """
  117. result = await self.redis.eval(script, 1, key, value) # pyright: ignore[reportGeneralTypeIssues]
  118. return result == 1
  119. except Exception as e:
  120. log.error(f"释放分布式锁失败: {e!s}")
  121. return False
  122. async def unlock_simple(self, key: str) -> bool:
  123. """释放分布式锁(简单版本,不验证锁值)
  124. 参数:
  125. - key (str): 锁键名
  126. 返回:
  127. - bool: 如果释放锁成功则返回True,否则返回False
  128. """
  129. try:
  130. await self.redis.delete(key)
  131. return True
  132. except Exception as e:
  133. log.error(f"释放分布式锁失败: {e!s}")
  134. return False
  135. async def delete(self, *keys: str) -> bool:
  136. """删除缓存
  137. 参数:
  138. - keys (str): 缓存键名
  139. 返回:
  140. - bool: 如果删除缓存成功则返回True,否则返回False
  141. """
  142. try:
  143. await self.redis.delete(*keys)
  144. return True
  145. except Exception as e:
  146. log.error(f"删除缓存失败: {e!s}")
  147. return False
  148. async def clear(self, pattern: str = "*") -> bool:
  149. """清空缓存
  150. 参数:
  151. - pattern (str, optional): 匹配模式,默认值为"*"。
  152. 返回:
  153. - bool: 如果清空缓存成功则返回True,否则返回False
  154. """
  155. try:
  156. keys = await self.redis.keys(f"{pattern}")
  157. if keys:
  158. await self.redis.delete(*keys)
  159. return True
  160. except Exception as e:
  161. log.error(f"清空缓存失败: {e!s}")
  162. return False
  163. async def exists(self, key: str) -> bool:
  164. """判断缓存是否存在
  165. 参数:
  166. - key (str): 缓存键名
  167. 返回:
  168. - bool: 如果缓存存在则返回True,否则返回False
  169. """
  170. try:
  171. return await self.redis.exists(f"{key}")
  172. except Exception as e:
  173. log.error(f"判断缓存是否存在失败: {e!s}")
  174. return False
  175. async def ttl(self, key: str) -> int:
  176. """获取缓存过期时间
  177. 参数:
  178. - key (str): 缓存键名
  179. 返回:
  180. - int: 返回缓存过期时间,单位为秒,如果缓存没有设置过期时间则返回-1
  181. """
  182. try:
  183. return await self.redis.ttl(f"{key}")
  184. except Exception as e:
  185. log.error(f"获取缓存过期时间失败: {e!s}")
  186. return -1
  187. async def renew_lock(self, key: str, expire: int, value: str) -> bool:
  188. """续约分布式锁
  189. 参数:
  190. - key (str): 锁键名
  191. - expire (int): 新的过期时间,单位为秒
  192. - value (str): 锁值,用于验证锁的持有者
  193. 返回:
  194. - bool: 如果续约锁成功则返回True,否则返回False
  195. """
  196. try:
  197. # 使用Lua脚本确保原子性验证和续约
  198. script = """
  199. if redis.call('get', KEYS[1]) == ARGV[1] then
  200. return redis.call('expire', KEYS[1], ARGV[2])
  201. else
  202. return 0
  203. end
  204. """
  205. result = await self.redis.eval(script, 1, key, value, str(expire)) # pyright: ignore[reportGeneralTypeIssues]
  206. return result == 1
  207. except Exception as e:
  208. log.error(f"续约分布式锁失败: {e!s}")
  209. return False
  210. async def expire(self, key: str, expire: int) -> bool:
  211. """设置缓存过期时间
  212. 参数:
  213. - key (str): 缓存键名
  214. - expire (int): 过期时间,单位为秒
  215. 返回:
  216. - bool: 如果设置缓存过期时间成功则返回True,否则返回False
  217. """
  218. try:
  219. return await self.redis.expire(f"{key}", expire)
  220. except Exception as e:
  221. log.error(f"设置缓存过期时间失败: {e!s}")
  222. return False
  223. async def info(self) -> dict:
  224. """获取缓存信息
  225. 返回:
  226. - dict: 返回缓存信息字典,如果获取失败则返回空字典
  227. """
  228. try:
  229. return await self.redis.info()
  230. except Exception as e:
  231. log.error(f"获取缓存信息失败: {e!s}")
  232. return {}
  233. async def db_size(self) -> int:
  234. """获取数据库大小
  235. 返回:
  236. - int: 返回数据库大小,如果获取失败则返回0
  237. """
  238. try:
  239. return await self.redis.dbsize()
  240. except Exception as e:
  241. log.error(f"获取数据库大小失败: {e!s}")
  242. return 0
  243. async def commandstats(self) -> dict:
  244. """获取命令统计信息
  245. 返回:
  246. - dict: 返回命令统计信息字典,如果获取失败则返回空字典
  247. """
  248. try:
  249. return await self.redis.info("commandstats")
  250. except Exception as e:
  251. log.error(f"获取命令统计信息失败: {e!s}")
  252. return {}
  253. async def hash_set(self, name: str, key: str, value: Any) -> bool:
  254. """设置哈希缓存
  255. 参数:
  256. - name (str): 哈希缓存名称
  257. - key (str): 哈希缓存键名
  258. - value (Any): 哈希缓存值
  259. 返回:
  260. - bool: 如果设置哈希缓存成功则返回True,否则返回False
  261. """
  262. try:
  263. self.redis.hset(name=name, key=key, value=value)
  264. return True
  265. except Exception as e:
  266. log.error(f"设置哈希缓存失败: {e!s}")
  267. return False
  268. async def hash_get(self, name: str, keys: list[str]) -> Awaitable[list[Any]] | list[Any]:
  269. """获取哈希缓存
  270. 参数:
  271. - name (str): 哈希缓存名称
  272. - keys (list[str]): 哈希缓存键名列表
  273. 返回:
  274. - Awaitable[list[Any]] | list[Any]: 返回哈希缓存值列表,如果获取失败则返回空列表
  275. """
  276. try:
  277. data = self.redis.hmget(name=name, keys=keys)
  278. return data
  279. except Exception as e:
  280. log.error(f"获取哈希缓存失败: {e!s}")
  281. return []