import hashlib import hmac import secrets import time from datetime import datetime, timedelta from typing import Optional from app.api.v1.module_system.auth.schema import AuthSchema from app.plugin.module_payment.apikey.crud import TenantApiKeyCRUD, TenantApiLogCRUD from app.plugin.module_payment.apikey.model import TenantApiKeyModel from app.plugin.module_payment.apikey.schema import TenantApiKeyListResponse class TenantApiKeyService: """租户API Key服务""" @staticmethod def generate_api_key(tenant_id: int) -> tuple[str, str]: random_part = secrets.token_hex(32) timestamp = str(int(time.time())) api_key = f"TENANT_{tenant_id}_{timestamp}_{random_part[:16]}" api_secret = secrets.token_hex(64) return api_key, api_secret @staticmethod def generate_signature(api_secret: str, request_data: dict) -> str: sorted_data = sorted(request_data.items(), key=lambda x: x[0]) sign_str = "&".join([f"{k}={v}" for k, v in sorted_data]) signature = hmac.new( api_secret.encode('utf-8'), sign_str.encode('utf-8'), hashlib.sha256 ).hexdigest() return signature @staticmethod def verify_signature(api_secret: str, request_data: dict, signature: str) -> bool: expected_signature = TenantApiKeyService.generate_signature(api_secret, request_data) return expected_signature == signature @staticmethod async def create_api_key( auth: AuthSchema, tenant_id: int, expired_days: Optional[int] = 365, description: Optional[str] = None, ) -> TenantApiKeyModel: api_key, api_secret = TenantApiKeyService.generate_api_key(tenant_id) return await TenantApiKeyCRUD(auth).create_crud( api_key=api_key, api_secret=api_secret, tenant_id=tenant_id, expired_at=datetime.now() + timedelta(days=expired_days or 365), description=description, ) @staticmethod async def validate_api_key(auth: AuthSchema, api_key: str) -> Optional[TenantApiKeyModel]: return await TenantApiKeyCRUD(auth).get_by_api_key(api_key) @staticmethod async def get_api_key_page_service( auth: AuthSchema, page_no: int, page_size: int, tenant_id: Optional[int] = None, status: Optional[str] = None, ) -> dict: offset = (page_no - 1) * page_size search = {} if tenant_id: search["tenant_id"] = tenant_id if status: search["status"] = status return await TenantApiKeyCRUD(auth).page( offset=offset, limit=page_size, order_by=[{"created_time": "desc"}], search=search if search else {}, out_schema=TenantApiKeyListResponse, ) @staticmethod async def update_api_key_status( auth: AuthSchema, api_key_id: int, status: str, ) -> Optional[TenantApiKeyModel]: return await TenantApiKeyCRUD(auth).update_status_crud(api_key_id=api_key_id, status=status) @staticmethod async def delete_api_key(auth: AuthSchema, api_key_id: int) -> bool: return await TenantApiKeyCRUD(auth).delete_crud(api_key_id=api_key_id) @staticmethod async def log_api_call( auth: AuthSchema, api_key_id: Optional[int], tenant_id: int, endpoint: str, method: str, request_ip: str, request_data: Optional[dict], response_code: int, start_time: float, ) -> None: response_time = (time.time() - start_time) * 1000 await TenantApiLogCRUD(auth).create_crud( api_key_id=api_key_id, tenant_id=tenant_id, endpoint=endpoint, method=method, request_ip=request_ip, request_data=str(request_data) if request_data else None, response_code=response_code, response_time=response_time, ) @staticmethod async def update_last_used(auth: AuthSchema, api_key_id: int) -> None: await TenantApiKeyCRUD(auth).update_last_used_crud(api_key_id)