import hashlib import hmac import json import secrets import time import urllib.parse from datetime import datetime, timedelta from typing import Any, Optional from app.api.v1.module_system.auth.schema import AuthSchema from app.plugin.module_payment.apikey.crud import TenantApiKeyCRUD, TenantApiLogCRUD from app.plugin.module_payment.apikey.model import TenantApiKeyModel from app.plugin.module_payment.apikey.schema import TenantApiKeyListResponse, ApiKeyQueryParam class SignatureGenerator: """签名生成器 - 参考支付宝/微信支付标准""" @staticmethod def generate_signature( api_secret: str, request_data: dict, exclude_params: tuple = ("sign",), ) -> str: """ 生成签名 - 参考支付宝/微信支付签名规范 Args: api_secret: API密钥 request_data: 请求参数字典 sign_param_name: 签名参数名(不参与签名) exclude_params: 需要排除的参数列表 Returns: 签名字符串 """ # 1. 过滤参数 filtered_data = {} for key, value in request_data.items(): if key in exclude_params: continue if value is None or value == "": continue if isinstance(value, (list, dict)) and len(value) == 0: continue filtered_data[key] = value # 2. 字典序排序 (ASCII) sorted_data = sorted(filtered_data.items(), key=lambda x: x[0]) collect = [] for key, value in sorted_data: if isinstance(value, (dict, list)): value = json.dumps(value, sort_keys=True, separators=(",", ":")) encoded_value = urllib.parse.quote(str(value), safe="") collect.append(f"{key}={encoded_value}") sign_str = "&".join(collect) signature = hmac.new( api_secret.encode("utf-8"), sign_str.encode("utf-8"), hashlib.sha256, ).hexdigest() return signature @staticmethod def verify_signature( api_secret: str, request_data: dict, signature: str, ) -> bool: """验证签名""" expected_signature = SignatureGenerator.generate_signature( api_secret, request_data ) return expected_signature == signature class TenantApiKeyService: """租户API Key服务""" @staticmethod def generate_api_key(tenant_id: int) -> tuple[str, str]: random_part = secrets.token_hex(32) timestamp = str(int(time.time())) secure_number = secrets.randbelow(9000) + 1000 api_key = f"{secure_number}{tenant_id}{timestamp}{random_part[:16]}" api_secret = secrets.token_hex(64) return api_key, api_secret @staticmethod def generate_signature(api_secret: str, request_data: dict) -> str: return SignatureGenerator.generate_signature(api_secret, request_data) @staticmethod def verify_signature(api_secret: str, request_data: dict, signature: str) -> bool: return SignatureGenerator.verify_signature(api_secret, request_data, signature) @staticmethod async def create_api_key( auth: AuthSchema, tenant_id: int, expired_days: Optional[int] = 365, return_url: Optional[str] = None, description: Optional[str] = None, ) -> TenantApiKeyModel: api_key, api_secret = TenantApiKeyService.generate_api_key(tenant_id) return await TenantApiKeyCRUD(auth).create_crud( api_key=api_key, api_secret=api_secret, tenant_id=tenant_id, expired_at=datetime.now() + timedelta(days=expired_days or 365), description=description, return_url=return_url, ) @staticmethod async def get_apikey_service(auth: AuthSchema, api_key: str) -> Optional[TenantApiKeyModel]: return await TenantApiKeyCRUD(auth).get(api_key=api_key) @staticmethod async def validate_api_key(auth: AuthSchema, api_key: str) -> Optional[TenantApiKeyModel]: return await TenantApiKeyCRUD(auth).get_by_api_key(api_key) @staticmethod async def get_api_key_page_service( auth: AuthSchema, page_no: int, page_size: int, search: Optional[ApiKeyQueryParam] = None, order_by: Optional[list[dict[str, str]]] = None, ) -> dict: offset = (page_no - 1) * page_size return await TenantApiKeyCRUD(auth).page( offset=offset, limit=page_size, order_by=order_by or [{"created_time": "desc"}], search=search.__dict__ if search else {}, out_schema=TenantApiKeyListResponse, ) @staticmethod async def update_api_key_status( auth: AuthSchema, api_key_id: int, status: str, ) -> Optional[TenantApiKeyModel]: return await TenantApiKeyCRUD(auth).update_status_crud(api_key_id=api_key_id, status=status) @staticmethod async def delete_api_key(auth: AuthSchema, api_key_id: int) -> None: await TenantApiKeyCRUD(auth).delete_crud(api_key_id=api_key_id) @staticmethod async def log_api_call( auth: AuthSchema, api_key_id: Optional[int], tenant_id: int, endpoint: str, method: str, request_ip: str, request_data: Optional[dict], response_code: int, start_time: float, ) -> None: response_time = (time.time() - start_time) * 1000 await TenantApiLogCRUD(auth).create_crud( api_key_id=api_key_id, tenant_id=tenant_id, endpoint=endpoint, method=method, request_ip=request_ip, request_data=str(request_data) if request_data else None, response_code=response_code, response_time=response_time, ) @staticmethod async def update_last_used(auth: AuthSchema, api_key_id: int) -> None: await TenantApiKeyCRUD(auth).update_last_used_crud(api_key_id)