| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- import re
- import httpx
- from app.config.setting import settings
- from app.core.logger import log
- class IpLocalUtil:
- """
- 获取IP归属地工具类
- """
- @classmethod
- def is_valid_ip(cls, ip: str) -> bool:
- """
- 校验IP格式是否合法。
- 参数:
- - ip (str): IP地址。
- 返回:
- - bool: 是否合法。
- """
- ip_pattern = (
- r"^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$"
- )
- return bool(re.match(ip_pattern, ip))
- @classmethod
- def is_private_ip(cls, ip: str) -> bool:
- """
- 判断是否为内网IP。
- 参数:
- - ip (str): IP地址。
- 返回:
- - bool: 是否为内网IP。
- """
- priv_pattern = r"^(127\.|10\.|172\.(1[6-9]|2[0-9]|3[01])\.|192\.168\.)"
- return bool(re.match(priv_pattern, ip))
- @classmethod
- async def resolve_location_for_log(cls, ip: str | None) -> str | None:
- """
- 登录与操作日志写入 ``login_location`` 时的统一解析入口。
- 与 ``settings.LOGIN_RESOLVE_IP_LOCATION`` 联动:关闭时不请求外网,仅返回占位描述,
- 避免登录 POST 在 ``OperationLogRoute`` 收尾阶段因外网查询变慢。
- 参数:
- - ip (str | None): 客户端 IP,可为空。
- 返回:
- - str | None: 展示用归属地文案;无需解析时可能为 ``None``。
- """
- if not ip:
- return None
- if not settings.LOGIN_RESOLVE_IP_LOCATION:
- return (
- "内网IP"
- if cls.is_private_ip(ip)
- else "未解析(已关闭归属地查询)"
- )
- return await cls.get_ip_location(ip)
- @classmethod
- async def get_ip_location(cls, ip: str) -> str | None:
- """
- 获取IP归属地信息。
- 参数:
- - ip (str): IP地址。
- 返回:
- - str | None: IP归属地信息,失败时返回"未知"或None。
- """
- # 校验IP格式
- if not cls.is_valid_ip(ip):
- log.error(f"IP格式不合法: {ip}")
- return "未知"
- # 内网IP直接返回
- if cls.is_private_ip(ip):
- return "内网IP"
- try:
- # 使用ip-api.com API获取IP归属地信息
- async with httpx.AsyncClient(timeout=settings.HTTPX_DEFAULT_TIMEOUT) as client:
- # 尝试使用 ip9.com.cn API
- url = f"https://ip9.com.cn/get?ip={ip}"
- response = await cls._make_api_request(client, url)
- if response and response.json().get("ret") == 200:
- result = response.json().get("data", {})
- return f"{result.get('country', '')}-{result.get('prov', '')}-{result.get('city', '')}-{result.get('area', '')}-{result.get('isp', '')}"
- # 尝试使用百度 API
- url = f"https://qifu-api.baidubce.com/ip/geo/v1/district?ip={ip}"
- response = await cls._make_api_request(client, url)
- if response and response.json().get("code") == "Success":
- data = response.json().get("data", {})
- # 修正原代码中的格式错误
- return f"{data.get('country', '')}-{data.get('prov', '')}-{data.get('city', '')}-{data.get('district', '')}-{data.get('isp', '')}"
- except Exception as e:
- log.error(f"获取IP归属地失败: {e}")
- return "未知"
- @classmethod
- async def _make_api_request(cls, client: httpx.AsyncClient, url: str):
- """
- 单独的 API 请求方法,包含重试机制。
- 参数:
- - client (AsyncClient): httpx 异步客户端。
- - url (str): 请求 URL。
- 返回:
- - Response | None: 响应对象,失败时返回None。
- """
- max_retries = 3
- for attempt in range(max_retries):
- try:
- response = await client.get(url, timeout=settings.HTTPX_DEFAULT_TIMEOUT)
- if response.status_code == 200:
- return response
- except Exception as e:
- if attempt < max_retries - 1:
- continue
- log.error(f"API 请求失败: {e}")
- return None
|