host.py 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. """请求 Host(通配二级子域)与当前用户租户 ``code`` 一致性(可选开启)。"""
  2. from __future__ import annotations
  3. import ipaddress
  4. from fastapi import Request
  5. from app.api.v1.module_system.user.model import UserModel
  6. from app.config.setting import settings
  7. from app.core.exceptions import CustomException
  8. from .constants import PLATFORM_TENANT_ID
  9. def _host_without_port(raw: str) -> str:
  10. raw = raw.strip().lower()
  11. if not raw:
  12. return ""
  13. if raw.startswith("["):
  14. end = raw.find("]")
  15. return raw[: end + 1] if end != -1 else raw
  16. return raw.split(":")[0]
  17. def _is_ip_host(host: str) -> bool:
  18. if host.startswith("["):
  19. inner = host[1:-1] if host.endswith("]") else host[1:]
  20. try:
  21. ipaddress.ip_address(inner)
  22. return True
  23. except ValueError:
  24. return False
  25. try:
  26. ipaddress.ip_address(host)
  27. return True
  28. except ValueError:
  29. return False
  30. def parse_tenant_code_from_host(host_header: str | None) -> str | None:
  31. """
  32. 从 ``Host`` 解析租户编码:``{code}.{TENANT_HOST_BASE_DOMAIN}`` → ``code``。
  33. - ``localhost`` / 纯 IP / 非本 ``base`` 域:返回 ``None``(调用方不强制校验)。
  34. - 首段在 ``TENANT_HOST_IGNORE_PREFIXES`` 内(如 ``api``、``www``):返回 ``None``,便于独立 API 域名。
  35. """
  36. base = (settings.TENANT_HOST_BASE_DOMAIN or "").strip().lower()
  37. if not base:
  38. return None
  39. host = _host_without_port(host_header or "")
  40. if not host or host == "localhost" or _is_ip_host(host):
  41. return None
  42. suffix = "." + base
  43. if not host.endswith(suffix) or host == base:
  44. return None
  45. prefix = host[: -len(suffix)]
  46. if not prefix:
  47. return None
  48. first = prefix.split(".")[0]
  49. ignore = {x.strip().lower() for x in settings.TENANT_HOST_IGNORE_PREFIXES if x.strip()}
  50. if first in ignore:
  51. return None
  52. return first
  53. def ensure_tenant_host_matches_user(request: Request, user: UserModel) -> None:
  54. """
  55. 若开启 ``TENANT_HOST_ENFORCE`` 且能从 Host 解析出租户 code,则须与用户所属租户 code 一致。
  56. 平台超管(与 ``should_skip_tenant_filter`` 一致:超管且平台租户)不校验。
  57. """
  58. if not settings.TENANT_HOST_ENFORCE:
  59. return
  60. if not (settings.TENANT_HOST_BASE_DOMAIN or "").strip():
  61. return
  62. if getattr(user, "is_superuser", False) and getattr(user, "tenant_id", None) == PLATFORM_TENANT_ID:
  63. return
  64. code_from_host = parse_tenant_code_from_host(request.headers.get("host"))
  65. if code_from_host is None:
  66. return
  67. tenant = getattr(user, "tenant", None)
  68. tcode = (getattr(tenant, "code", None) or "").strip().lower()
  69. if not tcode or tcode != code_from_host.lower():
  70. raise CustomException(
  71. msg="当前访问域名与账号所属租户不一致",
  72. code=10403,
  73. status_code=403,
  74. )