service.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. from typing import Any
  2. from app.core.logger import log
  3. from app.core.alipay.config import get_alipay_config
  4. from app.core.redis_crud import RedisCURD
  5. from redis.asyncio.client import Redis
  6. from app.api.v1.module_system.auth.schema import AuthSchema
  7. from app.plugin.module_payment.notification.crud import AlipayNotifyLogCRUD
  8. from .schemas import AlipayNotifyBase
  9. from .enums import AlipayNotifyMethodEnum
  10. from .handlers import (
  11. BaseHandler,
  12. EnterpriseHandler,
  13. EmployeeHandler,
  14. BillHandler,
  15. VoucherHandler,
  16. AccountHandler,
  17. )
  18. class NotificationService:
  19. """通知消息分发服务"""
  20. _handlers: dict[str, type[BaseHandler]] = {
  21. AlipayNotifyMethodEnum.ENTERPRISE_CHANGE.value: EnterpriseHandler,
  22. AlipayNotifyMethodEnum.EMPLOYEE_CHANGE.value: EmployeeHandler,
  23. AlipayNotifyMethodEnum.CONSUME_CHANGE.value: BillHandler,
  24. AlipayNotifyMethodEnum.VOUCHER_CHANGE.value: VoucherHandler,
  25. AlipayNotifyMethodEnum.TRANS_AUTHORIZE_NOTIFY.value: AccountHandler,
  26. AlipayNotifyMethodEnum.FUND_CHANGE_NOTIFY.value: AccountHandler,
  27. }
  28. # Redis 配置
  29. _notify_cache_prefix = "alipay:notify:"
  30. _notify_cache_ttl = 604800 # 7天
  31. @classmethod
  32. async def dispatch(cls, msg_method: str, content: dict[str, Any], auth: AuthSchema, redis: Redis) -> bool:
  33. """
  34. 分发通知消息到对应的处理器
  35. Args:
  36. msg_method: 消息方法名
  37. content: 通知内容
  38. auth: 认证信息
  39. Returns:
  40. 处理是否成功
  41. """
  42. handler_class = cls._handlers.get(msg_method)
  43. if not handler_class:
  44. log.warning(f"支付宝通知消息未注册的消息处理器: {msg_method}")
  45. return True
  46. handler = handler_class()
  47. try:
  48. result = await handler.handle(method=msg_method, content=content, auth=auth, redis=redis)
  49. log.info(f"支付宝通知消息 - 处理完成: {msg_method}, result={result}")
  50. return result
  51. except Exception as e:
  52. log.error(f"支付宝通知消息 - 处理异常: {msg_method}, error={e}")
  53. return False
  54. @classmethod
  55. def verify_sign(cls, notify_data: AlipayNotifyBase) -> bool:
  56. """
  57. 验签支付宝通知
  58. Args:
  59. notify_data: 支付宝通知数据
  60. Returns:
  61. 验签是否通过
  62. """
  63. if not notify_data.sign:
  64. log.error("支付宝通知消息 - 验签失败: 缺少 sign 参数")
  65. return False
  66. if not notify_data.biz_content:
  67. log.error("支付宝通知消息 - 验签失败: 缺少 biz_content 参数")
  68. return False
  69. try:
  70. alipay_public_key = get_alipay_config().alipay_public_key
  71. if not alipay_public_key:
  72. log.error("支付宝通知消息 - 验签失败: 缺少支付宝公钥配置")
  73. return False
  74. from alipay.aop.api.util.SignatureUtils import verify_with_rsa
  75. import urllib.parse
  76. # 1. 构建参数字典
  77. # 2. 移除 sign、sign_type 参数
  78. params = notify_data.model_dump(exclude={"sign", "sign_type"}, exclude_none=True)
  79. # 3. 对参数值进行 url_decode
  80. decoded_params = {}
  81. for key, value in params.items():
  82. if isinstance(value, str):
  83. decoded_params[key] = urllib.parse.unquote(value)
  84. else:
  85. decoded_params[key] = value
  86. # 4. 按字典序排序
  87. sorted_keys = sorted(decoded_params.keys())
  88. # 5. 构建待验签字符串
  89. sign_str = '&'.join([f"{key}={decoded_params[key]}" for key in sorted_keys])
  90. log.debug(f"支付宝通知消息 - 待验签字符串: {sign_str}")
  91. verify_result = verify_with_rsa(
  92. alipay_public_key,
  93. sign_str.encode(get_alipay_config().charset),
  94. notify_data.sign
  95. )
  96. if not verify_result:
  97. log.warning(f"支付宝通知消息 - 验签失败: sign={notify_data.sign[:20]}..., sign_type={notify_data.sign_type}")
  98. return verify_result
  99. except Exception as e:
  100. log.error(f"支付宝通知消息 - 验签异常: {e}")
  101. return False
  102. @classmethod
  103. async def verify_and_dispatch(cls, notify_data: AlipayNotifyBase, redis: Redis, auth: AuthSchema) -> bool:
  104. """
  105. 验签并分发通知消息
  106. Args:
  107. notify_data: 支付宝通知数据
  108. redis: Redis客户端对象
  109. auth: 认证信息
  110. Returns:
  111. 处理是否成功
  112. """
  113. # 1. 先验签
  114. if not cls.verify_sign(notify_data):
  115. return False
  116. # 2. 检查是否已处理过该通知(幂等处理)
  117. notify_id = notify_data.notify_id
  118. if await cls._is_notify_processed(redis, notify_id):
  119. log.warning(f"支付宝通知消息 - 通知已处理: notify_id={notify_id}")
  120. return True
  121. # 记录日志
  122. await AlipayNotifyLogService.log_notify_service(auth, notify_data.model_dump())
  123. # 3. 解析 biz_content
  124. biz_content = notify_data.parse_biz_content()
  125. log.debug(f"支付宝通知消息 - 消息内容: {biz_content}")
  126. # 4. 分发处理
  127. success = await cls.dispatch(notify_data.method, biz_content, auth, redis)
  128. # 5. 处理成功后记录
  129. if success:
  130. await cls._mark_notify_processed(redis, notify_id)
  131. return success
  132. @classmethod
  133. async def _is_notify_processed(cls, redis: Redis, notify_id: str) -> bool:
  134. """
  135. 检查通知是否已处理(幂等处理)
  136. Args:
  137. redis: Redis客户端对象
  138. notify_id: 通知ID
  139. Returns:
  140. 是否已处理
  141. """
  142. key = f"{cls._notify_cache_prefix}{notify_id}"
  143. return await RedisCURD(redis).exists(key)
  144. @classmethod
  145. async def _mark_notify_processed(cls, redis: Redis, notify_id: str) -> None:
  146. """
  147. 标记通知已处理
  148. Args:
  149. redis: Redis客户端对象
  150. notify_id: 通知ID
  151. """
  152. key = f"{cls._notify_cache_prefix}{notify_id}"
  153. await RedisCURD(redis).set(key, "1", expire=cls._notify_cache_ttl)
  154. class AlipayNotifyLogService:
  155. """支付宝通知日志服务"""
  156. @classmethod
  157. async def log_notify_service(
  158. cls, auth: AuthSchema, message: dict, verify_result: bool = True
  159. ) -> bool:
  160. """
  161. 记录支付宝通知
  162. Args:
  163. auth: 认证信息
  164. message: 通知数据(字典格式)
  165. verify_result: 验签结果
  166. process_result: 处理结果
  167. error: 错误信息
  168. Returns:
  169. bool: 记录是否成功
  170. """
  171. try:
  172. # 构建日志数据
  173. log_data = {
  174. "notify_id": message.get('notify_id'),
  175. "msg_method": message.get('msg_method'),
  176. "notify_type": message.get('notify_type'),
  177. "message": message,
  178. "verify_result": verify_result,
  179. "process_result": None,
  180. "error": None,
  181. }
  182. # 保存到数据库
  183. crud = AlipayNotifyLogCRUD(auth)
  184. await crud.create(log_data)
  185. log.info(f"记录支付宝通知: {message.get('msg_method')} - {message.get('notify_id')}")
  186. return True
  187. except Exception as e:
  188. log.error(f"记录支付宝通知失败: {e}")
  189. return False