| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- import hashlib
- import hmac
- import json
- import secrets
- import time
- import urllib.parse
- from datetime import datetime, timedelta
- from typing import Any, Optional
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.plugin.module_payment.apikey.crud import TenantApiKeyCRUD, TenantApiLogCRUD
- from app.plugin.module_payment.apikey.model import TenantApiKeyModel
- from app.plugin.module_payment.apikey.schema import TenantApiKeyListResponse, ApiKeyQueryParam
class SignatureGenerator:
    """Request-signature helper, loosely modelled on Alipay / WeChat Pay signing.

    A signature is the hex HMAC-SHA256 (keyed with the API secret) of the
    request parameters rendered as a sorted ``key=value&key=value`` string.
    """

    @staticmethod
    def _encode_value(value: Any) -> str:
        """Render one parameter value as it appears in the string to sign."""
        if isinstance(value, (dict, list)):
            # Compact, key-sorted JSON so nested containers serialise the same
            # way regardless of insertion order.
            value = json.dumps(value, sort_keys=True, separators=(",", ":"))
        # safe="" percent-encodes every reserved character, including "/".
        return urllib.parse.quote(str(value), safe="")

    @staticmethod
    def generate_signature(
        api_secret: str,
        request_data: dict,
        exclude_params: tuple = ("sign",),
    ) -> str:
        """Compute the HMAC-SHA256 signature of ``request_data``.

        Steps:
            1. Drop parameters named in ``exclude_params`` and parameters whose
               value is ``None``, ``""``, or an empty list/dict. Other falsy
               values such as ``0`` and ``False`` are kept.
            2. Sort the remaining parameters by key.
            3. Render each as ``key=<percent-encoded value>`` (dict/list values
               are first dumped as compact, key-sorted JSON) and join with
               ``&``. Keys are used verbatim; only values are encoded.
            4. HMAC-SHA256 the UTF-8 bytes of that string with the UTF-8 bytes
               of ``api_secret`` as the key.

        Args:
            api_secret: Secret used as the HMAC key.
            request_data: Request parameters to sign.
            exclude_params: Parameter names that do not take part in signing
                (by default the signature field itself).

        Returns:
            The lowercase hexadecimal digest.
        """
        signable = {
            key: value
            for key, value in request_data.items()
            if key not in exclude_params
            and not (value is None or value == "")
            and not (isinstance(value, (list, dict)) and len(value) == 0)
        }

        pairs = [
            f"{key}={SignatureGenerator._encode_value(value)}"
            for key, value in sorted(signable.items(), key=lambda item: item[0])
        ]
        string_to_sign = "&".join(pairs)

        return hmac.new(
            api_secret.encode("utf-8"),
            string_to_sign.encode("utf-8"),
            hashlib.sha256,
        ).hexdigest()

    @staticmethod
    def verify_signature(
        api_secret: str,
        request_data: dict,
        signature: str,
    ) -> bool:
        """Check ``signature`` against the signature computed for ``request_data``.

        Args:
            api_secret: Secret used as the HMAC key.
            request_data: Request parameters that were signed.
            signature: Signature supplied by the caller (hex string).

        Returns:
            ``True`` when the supplied signature matches exactly, else ``False``.
            A non-string ``signature`` never matches.
        """
        if not isinstance(signature, str):
            return False
        expected_signature = SignatureGenerator.generate_signature(
            api_secret, request_data
        )
        # Constant-time comparison: a plain ``==`` returns as soon as the first
        # differing character is found, which leaks how much of the signature
        # was correct. Comparing bytes (rather than str) also keeps non-ASCII
        # input from raising inside compare_digest.
        return hmac.compare_digest(
            expected_signature.encode("utf-8"),
            signature.encode("utf-8", errors="surrogatepass"),
        )
class TenantApiKeyService:
    """Tenant API key service: key generation, signing, and CRUD pass-throughs."""

    @staticmethod
    def generate_api_key(tenant_id: int) -> tuple[str, str]:
        """Create a fresh ``(api_key, api_secret)`` pair for a tenant.

        The key is the concatenation of a random 4-digit number (1000-9999),
        ``tenant_id``, the current Unix time in whole seconds, and 16 random
        hex characters. The secret is 128 random hex characters.
        """
        prefix = secrets.randbelow(9000) + 1000
        issued_at = str(int(time.time()))
        entropy = secrets.token_hex(32)
        key = f"{prefix}{tenant_id}{issued_at}{entropy[:16]}"
        secret = secrets.token_hex(64)
        return key, secret

    @staticmethod
    def generate_signature(api_secret: str, request_data: dict) -> str:
        """Delegate to ``SignatureGenerator.generate_signature``."""
        signature = SignatureGenerator.generate_signature(api_secret, request_data)
        return signature

    @staticmethod
    def verify_signature(api_secret: str, request_data: dict, signature: str) -> bool:
        """Delegate to ``SignatureGenerator.verify_signature``."""
        is_valid = SignatureGenerator.verify_signature(
            api_secret, request_data, signature
        )
        return is_valid

    @staticmethod
    async def create_api_key(
        auth: AuthSchema,
        tenant_id: int,
        expired_days: Optional[int] = 365,
        return_url: Optional[str] = None,
        description: Optional[str] = None,
    ) -> TenantApiKeyModel:
        """Generate a key pair and persist it through ``TenantApiKeyCRUD.create_crud``.

        Args:
            auth: Auth context handed to the CRUD layer.
            tenant_id: Tenant the key belongs to.
            expired_days: Lifetime in days; ``None`` or ``0`` falls back to 365.
            return_url: Stored with the key as ``return_url``.
            description: Stored with the key as ``description``.

        Returns:
            Whatever ``create_crud`` returns.
        """
        new_key, new_secret = TenantApiKeyService.generate_api_key(tenant_id)
        crud = TenantApiKeyCRUD(auth)
        lifetime = timedelta(days=expired_days or 365)
        expires = datetime.now() + lifetime
        return await crud.create_crud(
            api_key=new_key,
            api_secret=new_secret,
            tenant_id=tenant_id,
            expired_at=expires,
            description=description,
            return_url=return_url,
        )

    @staticmethod
    async def get_apikey_service(auth: AuthSchema, api_key: str) -> Optional[TenantApiKeyModel]:
        """Fetch a key record via ``TenantApiKeyCRUD.get(api_key=...)``."""
        crud = TenantApiKeyCRUD(auth)
        record = await crud.get(api_key=api_key)
        return record

    @staticmethod
    async def validate_api_key(auth: AuthSchema, api_key: str) -> Optional[TenantApiKeyModel]:
        """Look a key up via ``TenantApiKeyCRUD.get_by_api_key``.

        NOTE(review): any validity checks (status, expiry) would live in the
        CRUD method, which is not visible here - confirm before relying on it.
        """
        crud = TenantApiKeyCRUD(auth)
        record = await crud.get_by_api_key(api_key)
        return record

    @staticmethod
    async def get_api_key_page_service(
        auth: AuthSchema,
        page_no: int,
        page_size: int,
        search: Optional[ApiKeyQueryParam] = None,
        order_by: Optional[list[dict[str, str]]] = None,
    ) -> dict:
        """Return one page of API keys from ``TenantApiKeyCRUD.page``.

        Args:
            auth: Auth context handed to the CRUD layer.
            page_no: Page number; the offset is ``(page_no - 1) * page_size``.
            page_size: Number of rows per page, passed as ``limit``.
            search: Optional query object; its ``__dict__`` is used as the
                filter mapping (an empty mapping when absent).
            order_by: Sort specification; defaults to newest ``created_time``
                first.
        """
        skip = (page_no - 1) * page_size
        crud = TenantApiKeyCRUD(auth)
        ordering = order_by or [{"created_time": "desc"}]
        filters = search.__dict__ if search else {}
        return await crud.page(
            offset=skip,
            limit=page_size,
            order_by=ordering,
            search=filters,
            out_schema=TenantApiKeyListResponse,
        )

    @staticmethod
    async def update_api_key_status(
        auth: AuthSchema,
        api_key_id: int,
        status: str,
    ) -> Optional[TenantApiKeyModel]:
        """Change a key's status via ``TenantApiKeyCRUD.update_status_crud``."""
        crud = TenantApiKeyCRUD(auth)
        updated = await crud.update_status_crud(api_key_id=api_key_id, status=status)
        return updated

    @staticmethod
    async def delete_api_key(auth: AuthSchema, api_key_id: int) -> None:
        """Delete a key via ``TenantApiKeyCRUD.delete_crud``."""
        crud = TenantApiKeyCRUD(auth)
        await crud.delete_crud(api_key_id=api_key_id)

    @staticmethod
    async def log_api_call(
        auth: AuthSchema,
        api_key_id: Optional[int],
        tenant_id: int,
        endpoint: str,
        method: str,
        request_ip: str,
        request_data: Optional[dict],
        response_code: int,
        start_time: float,
    ) -> None:
        """Record one API call through ``TenantApiLogCRUD.create_crud``.

        ``response_time`` is ``(time.time() - start_time) * 1000``, i.e.
        milliseconds if ``start_time`` is a ``time.time()`` reading (presumed;
        verify against the caller). ``request_data`` is stored as its ``str()``
        form, or ``None`` when it is empty or missing.
        """
        elapsed_ms = (time.time() - start_time) * 1000
        log_crud = TenantApiLogCRUD(auth)
        payload = str(request_data) if request_data else None
        await log_crud.create_crud(
            api_key_id=api_key_id,
            tenant_id=tenant_id,
            endpoint=endpoint,
            method=method,
            request_ip=request_ip,
            request_data=payload,
            response_code=response_code,
            response_time=elapsed_ms,
        )

    @staticmethod
    async def update_last_used(auth: AuthSchema, api_key_id: int) -> None:
        """Refresh a key's last-used marker via ``update_last_used_crud``."""
        crud = TenantApiKeyCRUD(auth)
        await crud.update_last_used_crud(api_key_id)
|