| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121 |
- import hashlib
- import hmac
- import secrets
- import time
- from datetime import datetime, timedelta
- from typing import Optional
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.plugin.module_payment.apikey.crud import TenantApiKeyCRUD, TenantApiLogCRUD
- from app.plugin.module_payment.apikey.model import TenantApiKeyModel
- from app.plugin.module_payment.apikey.schema import TenantApiKeyListResponse
class TenantApiKeyService:
    """Tenant API key service.

    A collection of static helpers for generating tenant API credentials,
    signing and verifying request payloads with HMAC-SHA256, and delegating
    persistence to ``TenantApiKeyCRUD`` / ``TenantApiLogCRUD``.
    """

    @staticmethod
    def generate_api_key(tenant_id: int) -> tuple[str, str]:
        """Create a new ``(api_key, api_secret)`` pair for a tenant.

        The key has the form ``TENANT_<tenant_id>_<unix_seconds>_<16 hex chars>``.
        The secret is 128 hexadecimal characters (64 random bytes).

        Args:
            tenant_id: Identifier embedded in the key.

        Returns:
            A tuple ``(api_key, api_secret)``.
        """
        # 8 random bytes render as exactly the 16 hex characters the key uses,
        # so no entropy is generated only to be discarded.
        random_part = secrets.token_hex(8)
        timestamp = int(time.time())
        api_key = f"TENANT_{tenant_id}_{timestamp}_{random_part}"
        api_secret = secrets.token_hex(64)
        return api_key, api_secret

    @staticmethod
    def generate_signature(api_secret: str, request_data: dict) -> str:
        """Compute the HMAC-SHA256 signature of a request payload.

        Items are sorted by key and joined as ``k=v`` pairs separated by
        ``&``; values are rendered with their default string formatting.

        Args:
            api_secret: Shared secret used as the HMAC key.
            request_data: Payload to sign.

        Returns:
            The lowercase hexadecimal digest.
        """
        sorted_items = sorted(request_data.items(), key=lambda item: item[0])
        sign_str = "&".join(f"{k}={v}" for k, v in sorted_items)
        return hmac.new(
            api_secret.encode("utf-8"),
            sign_str.encode("utf-8"),
            hashlib.sha256,
        ).hexdigest()

    @staticmethod
    def verify_signature(api_secret: str, request_data: dict, signature: str) -> bool:
        """Check a caller-supplied signature against the expected one.

        Args:
            api_secret: Shared secret used as the HMAC key.
            request_data: Payload that was signed.
            signature: Hexadecimal signature supplied by the caller.

        Returns:
            ``True`` if the signature matches, ``False`` otherwise (including
            when ``signature`` is not a string).
        """
        if not isinstance(signature, str):
            return False
        expected = TenantApiKeyService.generate_signature(api_secret, request_data)
        # Constant-time comparison avoids leaking how many leading characters
        # matched. Comparing bytes (rather than str) keeps non-ASCII input
        # from raising TypeError inside compare_digest.
        return hmac.compare_digest(
            expected.encode("utf-8"),
            signature.encode("utf-8", "surrogatepass"),
        )

    @staticmethod
    async def create_api_key(
        auth: AuthSchema,
        tenant_id: int,
        expired_days: Optional[int] = 365,
        description: Optional[str] = None,
    ) -> TenantApiKeyModel:
        """Generate a credential pair and persist it via ``TenantApiKeyCRUD``.

        Args:
            auth: Auth context passed to the CRUD layer.
            tenant_id: Tenant the key belongs to.
            expired_days: Lifetime in days; ``None`` or ``0`` falls back to 365.
            description: Optional free-text description.

        Returns:
            Whatever ``TenantApiKeyCRUD.create_crud`` returns (annotated as
            the created ``TenantApiKeyModel``).
        """
        api_key, api_secret = TenantApiKeyService.generate_api_key(tenant_id)
        # NOTE(review): expiry is a naive local-time datetime — confirm this
        # matches what the storage layer expects.
        expired_at = datetime.now() + timedelta(days=expired_days or 365)
        return await TenantApiKeyCRUD(auth).create_crud(
            api_key=api_key,
            api_secret=api_secret,
            tenant_id=tenant_id,
            expired_at=expired_at,
            description=description,
        )

    @staticmethod
    async def validate_api_key(auth: AuthSchema, api_key: str) -> Optional[TenantApiKeyModel]:
        """Look up an API key record by its key string.

        Returns the result of ``TenantApiKeyCRUD.get_by_api_key``; any
        status or expiry checks happen there, if at all — not in this method.
        """
        return await TenantApiKeyCRUD(auth).get_by_api_key(api_key)

    @staticmethod
    async def get_api_key_page_service(
        auth: AuthSchema,
        page_no: int,
        page_size: int,
        tenant_id: Optional[int] = None,
        status: Optional[str] = None,
    ) -> dict:
        """Return one page of API keys, newest first.

        Args:
            auth: Auth context passed to the CRUD layer.
            page_no: 1-based page number.
            page_size: Number of records per page.
            tenant_id: Optional tenant filter; falsy values are ignored.
            status: Optional status filter; falsy values are ignored.

        Returns:
            The result of ``TenantApiKeyCRUD.page``.
        """
        offset = (page_no - 1) * page_size
        search = {}
        if tenant_id:
            search["tenant_id"] = tenant_id
        if status:
            search["status"] = status
        return await TenantApiKeyCRUD(auth).page(
            offset=offset,
            limit=page_size,
            order_by=[{"created_time": "desc"}],
            search=search,
            out_schema=TenantApiKeyListResponse,
        )

    @staticmethod
    async def update_api_key_status(
        auth: AuthSchema,
        api_key_id: int,
        status: str,
    ) -> Optional[TenantApiKeyModel]:
        """Set the status of an API key via ``TenantApiKeyCRUD.update_status_crud``."""
        return await TenantApiKeyCRUD(auth).update_status_crud(api_key_id=api_key_id, status=status)

    @staticmethod
    async def delete_api_key(auth: AuthSchema, api_key_id: int) -> bool:
        """Delete an API key via ``TenantApiKeyCRUD.delete_crud`` and return its result."""
        return await TenantApiKeyCRUD(auth).delete_crud(api_key_id=api_key_id)

    @staticmethod
    async def log_api_call(
        auth: AuthSchema,
        api_key_id: Optional[int],
        tenant_id: int,
        endpoint: str,
        method: str,
        request_ip: str,
        request_data: Optional[dict],
        response_code: int,
        start_time: float,
    ) -> None:
        """Record one API call through ``TenantApiLogCRUD``.

        Args:
            auth: Auth context passed to the CRUD layer.
            api_key_id: Key used for the call, if known.
            tenant_id: Tenant that made the call.
            endpoint: Requested endpoint.
            method: HTTP method.
            request_ip: Caller IP address.
            request_data: Request payload; stored as ``str(request_data)``,
                or ``None`` when empty or missing.
            response_code: Response status code.
            start_time: ``time.time()`` value captured when the request began.
        """
        # Elapsed wall-clock time in milliseconds.
        response_time = (time.time() - start_time) * 1000
        await TenantApiLogCRUD(auth).create_crud(
            api_key_id=api_key_id,
            tenant_id=tenant_id,
            endpoint=endpoint,
            method=method,
            request_ip=request_ip,
            request_data=str(request_data) if request_data else None,
            response_code=response_code,
            response_time=response_time,
        )

    @staticmethod
    async def update_last_used(auth: AuthSchema, api_key_id: int) -> None:
        """Delegate to ``TenantApiKeyCRUD.update_last_used_crud`` for the given key."""
        await TenantApiKeyCRUD(auth).update_last_used_crud(api_key_id)
|