service.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. import hashlib
  2. import hmac
  3. import json
  4. import secrets
  5. import time
  6. import urllib.parse
  7. from datetime import datetime, timedelta
  8. from typing import Any, Optional
  9. from app.api.v1.module_system.auth.schema import AuthSchema
  10. from app.plugin.module_payment.apikey.crud import TenantApiKeyCRUD, TenantApiLogCRUD
  11. from app.plugin.module_payment.apikey.model import TenantApiKeyModel
  12. from app.plugin.module_payment.apikey.schema import TenantApiKeyListResponse, ApiKeyQueryParam
  13. class SignatureGenerator:
  14. """签名生成器 - 参考支付宝/微信支付标准"""
  15. @staticmethod
  16. def generate_signature(
  17. api_secret: str,
  18. request_data: dict,
  19. exclude_params: tuple = ("sign",),
  20. ) -> str:
  21. """
  22. 生成签名 - 参考支付宝/微信支付签名规范
  23. Args:
  24. api_secret: API密钥
  25. request_data: 请求参数字典
  26. sign_param_name: 签名参数名(不参与签名)
  27. exclude_params: 需要排除的参数列表
  28. Returns:
  29. 签名字符串
  30. """
  31. # 1. 过滤参数
  32. filtered_data = {}
  33. for key, value in request_data.items():
  34. if key in exclude_params:
  35. continue
  36. if value is None or value == "":
  37. continue
  38. if isinstance(value, (list, dict)) and len(value) == 0:
  39. continue
  40. filtered_data[key] = value
  41. # 2. 字典序排序 (ASCII)
  42. sorted_data = sorted(filtered_data.items(), key=lambda x: x[0])
  43. collect = []
  44. for key, value in sorted_data:
  45. if isinstance(value, (dict, list)):
  46. value = json.dumps(value, sort_keys=True, separators=(",", ":"))
  47. encoded_value = urllib.parse.quote(str(value), safe="")
  48. collect.append(f"{key}={encoded_value}")
  49. sign_str = "&".join(collect)
  50. signature = hmac.new(
  51. api_secret.encode("utf-8"),
  52. sign_str.encode("utf-8"),
  53. hashlib.sha256,
  54. ).hexdigest()
  55. return signature
  56. @staticmethod
  57. def verify_signature(
  58. api_secret: str,
  59. request_data: dict,
  60. signature: str,
  61. ) -> bool:
  62. """验证签名"""
  63. expected_signature = SignatureGenerator.generate_signature(
  64. api_secret, request_data
  65. )
  66. return expected_signature == signature
  67. class TenantApiKeyService:
  68. """租户API Key服务"""
  69. @staticmethod
  70. def generate_api_key(tenant_id: int) -> tuple[str, str]:
  71. random_part = secrets.token_hex(32)
  72. timestamp = str(int(time.time()))
  73. secure_number = secrets.randbelow(9000) + 1000
  74. api_key = f"{secure_number}{tenant_id}{timestamp}{random_part[:16]}"
  75. api_secret = secrets.token_hex(64)
  76. return api_key, api_secret
  77. @staticmethod
  78. def generate_signature(api_secret: str, request_data: dict) -> str:
  79. return SignatureGenerator.generate_signature(api_secret, request_data)
  80. @staticmethod
  81. def verify_signature(api_secret: str, request_data: dict, signature: str) -> bool:
  82. return SignatureGenerator.verify_signature(api_secret, request_data, signature)
  83. @staticmethod
  84. async def create_api_key(
  85. auth: AuthSchema,
  86. tenant_id: int,
  87. expired_days: Optional[int] = 365,
  88. description: Optional[str] = None,
  89. ) -> TenantApiKeyModel:
  90. api_key, api_secret = TenantApiKeyService.generate_api_key(tenant_id)
  91. return await TenantApiKeyCRUD(auth).create_crud(
  92. api_key=api_key,
  93. api_secret=api_secret,
  94. tenant_id=tenant_id,
  95. expired_at=datetime.now() + timedelta(days=expired_days or 365),
  96. description=description,
  97. )
  98. @staticmethod
  99. async def validate_api_key(auth: AuthSchema, api_key: str) -> Optional[TenantApiKeyModel]:
  100. return await TenantApiKeyCRUD(auth).get_by_api_key(api_key)
  101. @staticmethod
  102. async def get_api_key_page_service(
  103. auth: AuthSchema,
  104. page_no: int,
  105. page_size: int,
  106. search: Optional[ApiKeyQueryParam] = None,
  107. order_by: Optional[list[dict[str, str]]] = None,
  108. ) -> dict:
  109. offset = (page_no - 1) * page_size
  110. return await TenantApiKeyCRUD(auth).page(
  111. offset=offset,
  112. limit=page_size,
  113. order_by=order_by or [{"created_time": "desc"}],
  114. search=search.__dict__ if search else {},
  115. out_schema=TenantApiKeyListResponse,
  116. )
  117. @staticmethod
  118. async def update_api_key_status(
  119. auth: AuthSchema,
  120. api_key_id: int,
  121. status: str,
  122. ) -> Optional[TenantApiKeyModel]:
  123. return await TenantApiKeyCRUD(auth).update_status_crud(api_key_id=api_key_id, status=status)
  124. @staticmethod
  125. async def delete_api_key(auth: AuthSchema, api_key_id: int) -> None:
  126. await TenantApiKeyCRUD(auth).delete_crud(api_key_id=api_key_id)
  127. @staticmethod
  128. async def log_api_call(
  129. auth: AuthSchema,
  130. api_key_id: Optional[int],
  131. tenant_id: int,
  132. endpoint: str,
  133. method: str,
  134. request_ip: str,
  135. request_data: Optional[dict],
  136. response_code: int,
  137. start_time: float,
  138. ) -> None:
  139. response_time = (time.time() - start_time) * 1000
  140. await TenantApiLogCRUD(auth).create_crud(
  141. api_key_id=api_key_id,
  142. tenant_id=tenant_id,
  143. endpoint=endpoint,
  144. method=method,
  145. request_ip=request_ip,
  146. request_data=str(request_data) if request_data else None,
  147. response_code=response_code,
  148. response_time=response_time,
  149. )
  150. @staticmethod
  151. async def update_last_used(auth: AuthSchema, api_key_id: int) -> None:
  152. await TenantApiKeyCRUD(auth).update_last_used_crud(api_key_id)