import time
from typing import Optional, Dict, Any, Callable, Awaitable

from fastapi import Request, Response, Depends, HTTPException, security
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.dependencies import db_getter
from app.api.v1.module_system.auth.schema import AuthSchema
from app.plugin.module_payment.apikey.service import TenantApiKeyService
from app.plugin.module_payment.apikey.model import TenantApiKeyModel


class TenantApiKeyAuth:
    """Tenant API key authentication, usable as a FastAPI dependency.

    The expected header has the form
    ``Authorization: ApiKey <api_key>[:<signature>]``.

    When ``auto_error`` is true, every authentication failure raises
    ``HTTPException`` with status 401; otherwise the dependency resolves
    to ``None`` on failure.
    """

    def __init__(self, auto_error: bool = True):
        self.auto_error = auto_error

    def _reject(self, detail: str) -> None:
        """Raise a 401 with *detail* if ``auto_error`` is set, else return None."""
        if self.auto_error:
            raise HTTPException(status_code=401, detail=detail)
        return None

    @staticmethod
    async def _record_call(
        auth: AuthSchema,
        request: Request,
        *,
        api_key_id,
        tenant_id,
        request_data,
        response_code: int,
    ) -> None:
        """Forward one call record to ``TenantApiKeyService.log_api_call``.

        Endpoint, method, client IP and start time are read from *request*;
        the client IP falls back to ``"unknown"`` when ``request.client``
        is falsy.
        """
        client = request.client
        await TenantApiKeyService.log_api_call(
            auth=auth,
            api_key_id=api_key_id,
            tenant_id=tenant_id,
            endpoint=str(request.url.path),
            method=request.method,
            request_ip=client.host if client else "unknown",
            request_data=request_data,
            response_code=response_code,
            start_time=request.state.start_time,
        )

    async def __call__(
        self,
        request: Request,
        db: AsyncSession = Depends(db_getter),
    ) -> Optional[TenantApiKeyModel]:
        """Validate the API key carried by the ``Authorization`` header.

        On success the key object is stored on ``request.state.api_key``,
        its tenant id on ``request.state.tenant_id``, and the key object
        is returned.

        Raises:
            HTTPException: 401 on any failure when ``auto_error`` is true.
        """
        # Stamp the request start time; it is passed on to the call log.
        request.state.start_time = time.time()

        header_value = request.headers.get("Authorization", None)
        if not header_value:
            return self._reject("Authorization header required")

        # Only the "ApiKey " scheme is handled here (exact, case-sensitive).
        scheme_prefix = "ApiKey "
        if not header_value.startswith(scheme_prefix):
            return self._reject("Invalid authorization format")

        # Credentials are "<api_key>" or "<api_key>:<signature>".
        credentials = header_value[len(scheme_prefix):]
        api_key, separator, remainder = credentials.partition(":")
        signature = remainder if separator else None

        # Placeholder auth context: no user, no data-scope check, tenant 0.
        temp_auth = AuthSchema(
            user=None,
            check_data_scope=False,
            db=db,
            tenant_id=0,
        )

        key_record = await TenantApiKeyService.validate_api_key(temp_auth, api_key)
        if not key_record:
            # Log the failed attempt; no key or tenant is known at this point.
            await self._record_call(
                temp_auth,
                request,
                api_key_id=None,
                tenant_id=0,
                request_data=None,
                response_code=401,
            )
            return self._reject("Invalid API Key")

        # NOTE(review): the signature is only verified when one is supplied;
        # a request without a signature passes on the API key alone.
        # Confirm this is intended.
        if signature:
            # Best effort: any failure to parse the body yields an empty dict.
            try:
                payload = await request.json()
            except Exception:
                payload = {}

            signature_ok = TenantApiKeyService.verify_signature(
                key_record.api_secret, payload, signature
            )
            if not signature_ok:
                # Log the failed attempt, including the parsed body.
                await self._record_call(
                    temp_auth,
                    request,
                    api_key_id=key_record.id,
                    tenant_id=key_record.tenant_id,
                    request_data=payload,
                    response_code=401,
                )
                return self._reject("Invalid Signature")

        # Log the successful authentication.
        await self._record_call(
            temp_auth,
            request,
            api_key_id=key_record.id,
            tenant_id=key_record.tenant_id,
            request_data=None,  # avoid recording sensitive data
            response_code=200,
        )

        # Update the key's last-used time.
        await TenantApiKeyService.update_last_used(temp_auth, key_record.id)

        # Expose the key object and tenant id on the request state.
        request.state.api_key = key_record
        request.state.tenant_id = key_record.tenant_id

        return key_record


async def get_tenant_from_api_key(
    request: Request,
    api_key: Optional[TenantApiKeyModel] = Depends(TenantApiKeyAuth(auto_error=False)),
) -> Optional[int]:
    """Return the tenant id of the resolved API key, or None if there is none.

    The key is resolved with ``auto_error=False``, so a failed
    authentication yields ``None`` here rather than an HTTP 401.
    """
    return api_key.tenant_id if api_key else None