service.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. import json
  2. from redis.asyncio.client import Redis
  3. from app.common.enums import RedisInitKeyConfig
  4. from app.core.logger import log
  5. from app.core.redis_crud import RedisCURD
  6. from app.core.security import decode_access_token
  7. from .schema import OnlineQueryParam
  8. class OnlineService:
  9. """在线用户管理模块服务层"""
  10. @classmethod
  11. async def get_online_list_service(
  12. cls, redis: Redis, search: OnlineQueryParam | None = None
  13. ) -> list[dict]:
  14. """
  15. 获取在线用户列表信息(支持分页和搜索)
  16. 参数:
  17. - redis (Redis): Redis异步客户端实例。
  18. - search (OnlineQueryParam | None): 查询参数模型。
  19. 返回:
  20. - list[dict]: 在线用户详情字典列表。
  21. """
  22. keys = await RedisCURD(redis).get_keys(f"{RedisInitKeyConfig.ACCESS_TOKEN.key}:*")
  23. tokens = await RedisCURD(redis).mget(keys)
  24. online_users = []
  25. for token in tokens:
  26. if not token:
  27. continue
  28. try:
  29. payload = decode_access_token(token=token)
  30. session_info = json.loads(payload.sub)
  31. if cls._match_search_conditions(session_info, search):
  32. online_users.append(session_info)
  33. except Exception as e:
  34. log.error(f"解析在线用户数据失败: {e}")
  35. continue
  36. # 按照 login_time 倒序排序
  37. online_users.sort(key=lambda x: x.get("login_time", ""), reverse=True)
  38. return online_users
  39. @classmethod
  40. async def delete_online_service(cls, redis: Redis, session_id: str) -> bool:
  41. """
  42. 强制下线指定在线用户
  43. 参数:
  44. - redis (Redis): Redis异步客户端实例。
  45. - session_id (str): 在线用户会话ID。
  46. 返回:
  47. - bool: 如果操作成功则返回True,否则返回False。
  48. """
  49. # 删除 token
  50. await RedisCURD(redis).delete(f"{RedisInitKeyConfig.ACCESS_TOKEN.key}:{session_id}")
  51. await RedisCURD(redis).delete(f"{RedisInitKeyConfig.REFRESH_TOKEN.key}:{session_id}")
  52. log.info(f"强制下线用户会话: {session_id}")
  53. return True
  54. @classmethod
  55. async def clear_online_service(cls, redis: Redis) -> bool:
  56. """
  57. 强制下线所有在线用户
  58. 参数:
  59. - redis (Redis): Redis异步客户端实例。
  60. 返回:
  61. - bool: 如果操作成功则返回True,否则返回False。
  62. """
  63. # 删除 token
  64. await RedisCURD(redis).clear(f"{RedisInitKeyConfig.ACCESS_TOKEN.key}:*")
  65. await RedisCURD(redis).clear(f"{RedisInitKeyConfig.REFRESH_TOKEN.key}:*")
  66. log.info("清除所有在线用户会话成功")
  67. return True
  68. @staticmethod
  69. def _match_search_conditions(online_info: dict, search: OnlineQueryParam | None = None) -> bool:
  70. """
  71. 检查是否匹配搜索条件
  72. 参数:
  73. - online_info (dict): 在线用户信息字典。
  74. - search (OnlineQueryParam | None): 查询参数模型。
  75. 返回:
  76. - bool: 如果匹配则返回True,否则返回False。
  77. """
  78. if not search:
  79. return True
  80. if search.name and search.name[1]:
  81. keyword = search.name[1].strip("%")
  82. if keyword.lower() not in online_info.get("name", "").lower():
  83. return False
  84. if search.ipaddr and search.ipaddr[1]:
  85. keyword = search.ipaddr[1].strip("%")
  86. if keyword not in online_info.get("ipaddr", ""):
  87. return False
  88. if search.login_location and search.login_location[1]:
  89. keyword = search.login_location[1].strip("%")
  90. if keyword.lower() not in online_info.get("login_location", "").lower():
  91. return False
  92. return True