|
|
@@ -1,9 +1,11 @@
|
|
|
import hashlib
|
|
|
import hmac
|
|
|
+import json
|
|
|
import secrets
|
|
|
import time
|
|
|
+import urllib.parse
|
|
|
from datetime import datetime, timedelta
|
|
|
-from typing import Optional
|
|
|
+from typing import Any, Optional
|
|
|
|
|
|
from app.api.v1.module_system.auth.schema import AuthSchema
|
|
|
from app.plugin.module_payment.apikey.crud import TenantApiKeyCRUD, TenantApiLogCRUD
|
|
|
@@ -11,6 +13,69 @@ from app.plugin.module_payment.apikey.model import TenantApiKeyModel
|
|
|
from app.plugin.module_payment.apikey.schema import TenantApiKeyListResponse, ApiKeyQueryParam
|
|
|
|
|
|
|
|
|
+class SignatureGenerator:
|
|
|
+ """签名生成器 - 参考支付宝/微信支付标准"""
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def generate_signature(
|
|
|
+ api_secret: str,
|
|
|
+ request_data: dict,
|
|
|
+ exclude_params: tuple = ("sign",),
|
|
|
+ ) -> str:
|
|
|
+ """
|
|
|
+ 生成签名 - 参考支付宝/微信支付签名规范
|
|
|
+
|
|
|
+ Args:
|
|
|
+ api_secret: API密钥
|
|
|
+ request_data: 请求参数字典
|
|
|
+ sign_param_name: 签名参数名(不参与签名)
|
|
|
+ exclude_params: 需要排除的参数列表
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ 签名字符串
|
|
|
+ """
|
|
|
+
|
|
|
+ # 1. 过滤参数
|
|
|
+ filtered_data = {}
|
|
|
+ for key, value in request_data.items():
|
|
|
+ if key in exclude_params:
|
|
|
+ continue
|
|
|
+ if value is None or value == "":
|
|
|
+ continue
|
|
|
+ if isinstance(value, (list, dict)) and len(value) == 0:
|
|
|
+ continue
|
|
|
+ filtered_data[key] = value
|
|
|
+
|
|
|
+ # 2. 字典序排序 (ASCII)
|
|
|
+ sorted_data = sorted(filtered_data.items(), key=lambda x: x[0])
|
|
|
+ collect = []
|
|
|
+ for key, value in sorted_data:
|
|
|
+ if isinstance(value, (dict, list)):
|
|
|
+ value = json.dumps(value, sort_keys=True, separators=(",", ":"))
|
|
|
+ encoded_value = urllib.parse.quote(str(value), safe="")
|
|
|
+ collect.append(f"{key}={encoded_value}")
|
|
|
+
|
|
|
+ sign_str = "&".join(collect)
|
|
|
+ signature = hmac.new(
|
|
|
+ api_secret.encode("utf-8"),
|
|
|
+ sign_str.encode("utf-8"),
|
|
|
+ hashlib.sha256,
|
|
|
+ ).hexdigest()
|
|
|
+ return signature
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def verify_signature(
|
|
|
+ api_secret: str,
|
|
|
+ request_data: dict,
|
|
|
+ signature: str,
|
|
|
+ ) -> bool:
|
|
|
+ """验证签名"""
|
|
|
+ expected_signature = SignatureGenerator.generate_signature(
|
|
|
+ api_secret, request_data
|
|
|
+ )
|
|
|
+ return expected_signature == signature
|
|
|
+
|
|
|
+
|
|
|
class TenantApiKeyService:
|
|
|
"""租户API Key服务"""
|
|
|
|
|
|
@@ -25,19 +90,11 @@ class TenantApiKeyService:
|
|
|
|
|
|
@staticmethod
|
|
|
def generate_signature(api_secret: str, request_data: dict) -> str:
|
|
|
- sorted_data = sorted(request_data.items(), key=lambda x: x[0])
|
|
|
- sign_str = "&".join([f"{k}={v}" for k, v in sorted_data])
|
|
|
- signature = hmac.new(
|
|
|
- api_secret.encode('utf-8'),
|
|
|
- sign_str.encode('utf-8'),
|
|
|
- hashlib.sha256
|
|
|
- ).hexdigest()
|
|
|
- return signature
|
|
|
+ return SignatureGenerator.generate_signature(api_secret, request_data)
|
|
|
|
|
|
@staticmethod
|
|
|
def verify_signature(api_secret: str, request_data: dict, signature: str) -> bool:
|
|
|
- expected_signature = TenantApiKeyService.generate_signature(api_secret, request_data)
|
|
|
- return expected_signature == signature
|
|
|
+ return SignatureGenerator.verify_signature(api_secret, request_data, signature)
|
|
|
|
|
|
@staticmethod
|
|
|
async def create_api_key(
|