| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192 |
- """请求 Host(通配二级子域)与当前用户租户 ``code`` 一致性(可选开启)。"""
- from __future__ import annotations
- import ipaddress
- from fastapi import Request
- from app.api.v1.module_system.user.model import UserModel
- from app.config.setting import settings
- from app.core.exceptions import CustomException
- from .constants import PLATFORM_TENANT_ID
- def _host_without_port(raw: str) -> str:
- raw = raw.strip().lower()
- if not raw:
- return ""
- if raw.startswith("["):
- end = raw.find("]")
- return raw[: end + 1] if end != -1 else raw
- return raw.split(":")[0]
- def _is_ip_host(host: str) -> bool:
- if host.startswith("["):
- inner = host[1:-1] if host.endswith("]") else host[1:]
- try:
- ipaddress.ip_address(inner)
- return True
- except ValueError:
- return False
- try:
- ipaddress.ip_address(host)
- return True
- except ValueError:
- return False
def parse_tenant_code_from_host(host_header: str | None) -> str | None:
    """Extract the tenant code from a ``Host`` header value.

    A host shaped like ``{code}.{TENANT_HOST_BASE_DOMAIN}`` yields ``code``,
    which is the first dot-separated label of whatever precedes the base
    domain.

    ``None`` is returned, so that callers do not enforce any check, when:

    - ``settings.TENANT_HOST_BASE_DOMAIN`` is empty;
    - the host is empty, ``localhost`` or an IP literal;
    - the host is the base domain itself or does not end with
      ``.{base domain}``;
    - the first label is listed in ``settings.TENANT_HOST_IGNORE_PREFIXES``
      (e.g. ``api``, ``www``), which allows standalone API domains.

    Args:
        host_header: Raw ``Host`` header value, possibly ``None``.

    Returns:
        The lower-cased tenant code, or ``None`` when no code applies.
    """
    base_domain = (settings.TENANT_HOST_BASE_DOMAIN or "").strip().lower()
    if not base_domain:
        return None

    hostname = _host_without_port(host_header or "")
    if not hostname or hostname == "localhost" or _is_ip_host(hostname):
        return None

    dotted_base = "." + base_domain
    if hostname == base_domain or not hostname.endswith(dotted_base):
        return None

    subdomain = hostname[: -len(dotted_base)]
    if not subdomain:
        return None

    # Only the left-most label is considered, even for deeper subdomains.
    leading_label, _, _ = subdomain.partition(".")

    ignored_labels = {
        entry.strip().lower()
        for entry in settings.TENANT_HOST_IGNORE_PREFIXES
        if entry.strip()
    }
    return None if leading_label in ignored_labels else leading_label
def ensure_tenant_host_matches_user(request: Request, user: UserModel) -> None:
    """Reject the request when the Host's tenant code differs from the user's.

    The check only runs when ``settings.TENANT_HOST_ENFORCE`` is truthy, a
    non-blank ``settings.TENANT_HOST_BASE_DOMAIN`` is configured, and a tenant
    code can be parsed from the request's ``host`` header. Platform super
    administrators (``is_superuser`` with ``tenant_id == PLATFORM_TENANT_ID``)
    are exempt; the original note describes this as matching
    ``should_skip_tenant_filter``, which is defined elsewhere.

    Args:
        request: Incoming request whose ``host`` header is inspected.
        user: Authenticated user; ``user.tenant.code`` is compared to the host.

    Raises:
        CustomException: With ``code=10403`` / ``status_code=403`` when the
            user's tenant code is missing or differs from the host's code.
    """
    enforcement_configured = (
        settings.TENANT_HOST_ENFORCE
        and (settings.TENANT_HOST_BASE_DOMAIN or "").strip()
    )
    if not enforcement_configured:
        return

    is_platform_admin = (
        getattr(user, "is_superuser", False)
        and getattr(user, "tenant_id", None) == PLATFORM_TENANT_ID
    )
    if is_platform_admin:
        return

    host_code = parse_tenant_code_from_host(request.headers.get("host"))
    if host_code is None:
        # Host carries no tenant information; nothing to compare against.
        return

    user_tenant = getattr(user, "tenant", None)
    user_code = (getattr(user_tenant, "code", None) or "").strip().lower()
    if user_code and user_code == host_code.lower():
        return

    raise CustomException(
        msg="当前访问域名与账号所属租户不一致",
        code=10403,
        status_code=403,
    )
|