ip_local_util.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. import re
  2. import httpx
  3. from app.config.setting import settings
  4. from app.core.logger import log
  5. class IpLocalUtil:
  6. """
  7. 获取IP归属地工具类
  8. """
  9. @classmethod
  10. def is_valid_ip(cls, ip: str) -> bool:
  11. """
  12. 校验IP格式是否合法。
  13. 参数:
  14. - ip (str): IP地址。
  15. 返回:
  16. - bool: 是否合法。
  17. """
  18. ip_pattern = (
  19. r"^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$"
  20. )
  21. return bool(re.match(ip_pattern, ip))
  22. @classmethod
  23. def is_private_ip(cls, ip: str) -> bool:
  24. """
  25. 判断是否为内网IP。
  26. 参数:
  27. - ip (str): IP地址。
  28. 返回:
  29. - bool: 是否为内网IP。
  30. """
  31. priv_pattern = r"^(127\.|10\.|172\.(1[6-9]|2[0-9]|3[01])\.|192\.168\.)"
  32. return bool(re.match(priv_pattern, ip))
  33. @classmethod
  34. async def resolve_location_for_log(cls, ip: str | None) -> str | None:
  35. """
  36. 登录与操作日志写入 ``login_location`` 时的统一解析入口。
  37. 与 ``settings.LOGIN_RESOLVE_IP_LOCATION`` 联动:关闭时不请求外网,仅返回占位描述,
  38. 避免登录 POST 在 ``OperationLogRoute`` 收尾阶段因外网查询变慢。
  39. 参数:
  40. - ip (str | None): 客户端 IP,可为空。
  41. 返回:
  42. - str | None: 展示用归属地文案;无需解析时可能为 ``None``。
  43. """
  44. if not ip:
  45. return None
  46. if not settings.LOGIN_RESOLVE_IP_LOCATION:
  47. return (
  48. "内网IP"
  49. if cls.is_private_ip(ip)
  50. else "未解析(已关闭归属地查询)"
  51. )
  52. return await cls.get_ip_location(ip)
  53. @classmethod
  54. async def get_ip_location(cls, ip: str) -> str | None:
  55. """
  56. 获取IP归属地信息。
  57. 参数:
  58. - ip (str): IP地址。
  59. 返回:
  60. - str | None: IP归属地信息,失败时返回"未知"或None。
  61. """
  62. # 校验IP格式
  63. if not cls.is_valid_ip(ip):
  64. log.error(f"IP格式不合法: {ip}")
  65. return "未知"
  66. # 内网IP直接返回
  67. if cls.is_private_ip(ip):
  68. return "内网IP"
  69. try:
  70. # 使用ip-api.com API获取IP归属地信息
  71. async with httpx.AsyncClient(timeout=settings.HTTPX_DEFAULT_TIMEOUT) as client:
  72. # 尝试使用 ip9.com.cn API
  73. url = f"https://ip9.com.cn/get?ip={ip}"
  74. response = await cls._make_api_request(client, url)
  75. if response and response.json().get("ret") == 200:
  76. result = response.json().get("data", {})
  77. return f"{result.get('country', '')}-{result.get('prov', '')}-{result.get('city', '')}-{result.get('area', '')}-{result.get('isp', '')}"
  78. # 尝试使用百度 API
  79. url = f"https://qifu-api.baidubce.com/ip/geo/v1/district?ip={ip}"
  80. response = await cls._make_api_request(client, url)
  81. if response and response.json().get("code") == "Success":
  82. data = response.json().get("data", {})
  83. # 修正原代码中的格式错误
  84. return f"{data.get('country', '')}-{data.get('prov', '')}-{data.get('city', '')}-{data.get('district', '')}-{data.get('isp', '')}"
  85. except Exception as e:
  86. log.error(f"获取IP归属地失败: {e}")
  87. return "未知"
  88. @classmethod
  89. async def _make_api_request(cls, client: httpx.AsyncClient, url: str):
  90. """
  91. 单独的 API 请求方法,包含重试机制。
  92. 参数:
  93. - client (AsyncClient): httpx 异步客户端。
  94. - url (str): 请求 URL。
  95. 返回:
  96. - Response | None: 响应对象,失败时返回None。
  97. """
  98. max_retries = 3
  99. for attempt in range(max_retries):
  100. try:
  101. response = await client.get(url, timeout=settings.HTTPX_DEFAULT_TIMEOUT)
  102. if response.status_code == 200:
  103. return response
  104. except Exception as e:
  105. if attempt < max_retries - 1:
  106. continue
  107. log.error(f"API 请求失败: {e}")
  108. return None