service.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. from typing import Any
  2. from app.core.logger import log
  3. from app.core.alipay.config import get_alipay_config
  4. from app.core.redis_crud import RedisCURD
  5. from redis.asyncio.client import Redis
  6. from app.api.v1.module_system.auth.schema import AuthSchema
  7. from app.plugin.module_payment.notification.crud import AlipayNotifyLogCRUD
  8. from .schemas import AlipayNotifyBase
  9. from .enums import AlipayNotifyMethodEnum
  10. from .handlers import (
  11. AccountHandler,
  12. BaseHandler,
  13. BillHandler,
  14. EmployeeHandler,
  15. EnterpriseHandler,
  16. InstitutionHandler,
  17. OrderHandler,
  18. VoucherHandler,
  19. )
  20. class NotificationService:
  21. """通知消息分发服务"""
  22. _handlers: dict[str, type[BaseHandler]] = {
  23. AlipayNotifyMethodEnum.ENTERPRISE_CHANGE.value: EnterpriseHandler,
  24. AlipayNotifyMethodEnum.EMPLOYEE_CHANGE.value: EmployeeHandler,
  25. AlipayNotifyMethodEnum.CONSUME_CHANGE.value: BillHandler,
  26. AlipayNotifyMethodEnum.VOUCHER_CHANGE.value: VoucherHandler,
  27. # AlipayNotifyMethodEnum.INVOICE_ORDER_CHANGE.value: OrderHandler,
  28. AlipayNotifyMethodEnum.TRANS_AUTHORIZE_NOTIFY.value: AccountHandler,
  29. AlipayNotifyMethodEnum.FUND_CHANGE_NOTIFY.value: AccountHandler,
  30. AlipayNotifyMethodEnum.INSTITUTION_OPERATION.value: InstitutionHandler,
  31. }
  32. # Redis 配置
  33. _notify_cache_prefix = "alipay:notify:"
  34. _notify_cache_ttl = 604800 # 7天
  35. @classmethod
  36. async def dispatch(cls, msg_method: str, content: dict[str, Any], auth: AuthSchema, redis: Redis) -> bool:
  37. """
  38. 分发通知消息到对应的处理器
  39. Args:
  40. msg_method: 消息方法名
  41. content: 通知内容
  42. auth: 认证信息
  43. Returns:
  44. 处理是否成功
  45. """
  46. handler_class = cls._handlers.get(msg_method)
  47. if not handler_class:
  48. log.warning(f"支付宝通知消息未注册的消息处理器: {msg_method}")
  49. return True
  50. handler = handler_class()
  51. try:
  52. result = await handler.handle(method=msg_method, content=content, auth=auth, redis=redis)
  53. log.info(f"支付宝通知消息 - 处理完成: {msg_method}, result={result}")
  54. return result
  55. except Exception as e:
  56. log.error(f"支付宝通知消息 - 处理异常: {msg_method}, error={e}")
  57. return False
  58. @classmethod
  59. def verify_sign(cls, notify_data: AlipayNotifyBase) -> bool:
  60. """
  61. 验签支付宝通知
  62. Args:
  63. notify_data: 支付宝通知数据
  64. Returns:
  65. 验签是否通过
  66. """
  67. if not notify_data.sign:
  68. log.error("支付宝通知消息 - 验签失败: 缺少 sign 参数")
  69. return False
  70. if not notify_data.biz_content:
  71. log.error("支付宝通知消息 - 验签失败: 缺少 biz_content 参数")
  72. return False
  73. try:
  74. alipay_public_key = get_alipay_config().alipay_public_key
  75. if not alipay_public_key:
  76. log.error("支付宝通知消息 - 验签失败: 缺少支付宝公钥配置")
  77. return False
  78. from alipay.aop.api.util.SignatureUtils import verify_with_rsa
  79. import urllib.parse
  80. # 1. 构建参数字典
  81. # 2. 移除 sign、sign_type 参数
  82. params = notify_data.model_dump(exclude={"sign", "sign_type"}, exclude_none=True)
  83. # 3. 对参数值进行 url_decode
  84. decoded_params = {}
  85. for key, value in params.items():
  86. if isinstance(value, str):
  87. decoded_params[key] = urllib.parse.unquote(value)
  88. else:
  89. decoded_params[key] = value
  90. # 4. 按字典序排序
  91. sorted_keys = sorted(decoded_params.keys())
  92. # 5. 构建待验签字符串
  93. sign_str = '&'.join([f"{key}={decoded_params[key]}" for key in sorted_keys])
  94. log.debug(f"支付宝通知消息 - 待验签字符串: {sign_str}")
  95. verify_result = verify_with_rsa(
  96. alipay_public_key,
  97. sign_str.encode(get_alipay_config().charset),
  98. notify_data.sign
  99. )
  100. if not verify_result:
  101. log.warning(f"支付宝通知消息 - 验签失败: sign={notify_data.sign[:20]}..., sign_type={notify_data.sign_type}")
  102. return verify_result
  103. except Exception as e:
  104. log.error(f"支付宝通知消息 - 验签异常: {e}")
  105. return False
  106. @classmethod
  107. async def verify_and_dispatch(cls, notify_data: AlipayNotifyBase, redis: Redis, auth: AuthSchema) -> bool:
  108. """
  109. 验签并分发通知消息
  110. Args:
  111. notify_data: 支付宝通知数据
  112. redis: Redis客户端对象
  113. auth: 认证信息
  114. Returns:
  115. 处理是否成功
  116. """
  117. # 1. 先验签
  118. if not cls.verify_sign(notify_data):
  119. return False
  120. # 2. 检查是否已处理过该通知(幂等处理)
  121. notify_id = notify_data.notify_id
  122. if await cls._is_notify_processed(redis, notify_id):
  123. log.warning(f"支付宝通知消息 - 通知已处理: notify_id={notify_id}")
  124. return True
  125. # 记录日志
  126. await AlipayNotifyLogService.log_notify_service(auth, notify_data.model_dump())
  127. # 3. 解析 biz_content
  128. biz_content = notify_data.parse_biz_content()
  129. log.debug(f"支付宝通知消息 - 消息内容: {biz_content}")
  130. # 4. 从 enterprise_id 获取正确的 tenant_id
  131. enterprise_id = biz_content.get("enterprise_id", "")
  132. if enterprise_id and auth.tenant_id == -1:
  133. try:
  134. from app.plugin.module_payment.enterprise.model import EnterpriseModel
  135. from sqlalchemy import select
  136. stmt = select(EnterpriseModel.tenant_id).where(EnterpriseModel.enterprise_id == enterprise_id)
  137. result = await auth.db.execute(stmt)
  138. tenant_id = result.scalar_one_or_none()
  139. if tenant_id:
  140. auth.tenant_id = tenant_id
  141. log.debug(f"支付宝通知消息 - 从 enterprise_id 解析 tenant_id={tenant_id}")
  142. except Exception:
  143. log.warning(f"支付宝通知消息 - 解析 tenant_id 失败: enterprise_id={enterprise_id}")
  144. # 5. 分发处理
  145. success = await cls.dispatch(notify_data.method, biz_content, auth, redis)
  146. # 5. 处理成功后记录
  147. if success:
  148. await cls._mark_notify_processed(redis, notify_id)
  149. return success
  150. @classmethod
  151. async def _is_notify_processed(cls, redis: Redis, notify_id: str) -> bool:
  152. """
  153. 检查通知是否已处理(幂等处理)
  154. Args:
  155. redis: Redis客户端对象
  156. notify_id: 通知ID
  157. Returns:
  158. 是否已处理
  159. """
  160. key = f"{cls._notify_cache_prefix}{notify_id}"
  161. return await RedisCURD(redis).exists(key)
  162. @classmethod
  163. async def _mark_notify_processed(cls, redis: Redis, notify_id: str) -> None:
  164. """
  165. 标记通知已处理
  166. Args:
  167. redis: Redis客户端对象
  168. notify_id: 通知ID
  169. """
  170. key = f"{cls._notify_cache_prefix}{notify_id}"
  171. await RedisCURD(redis).set(key, "1", expire=cls._notify_cache_ttl)
  172. class AlipayNotifyLogService:
  173. """支付宝通知日志服务"""
  174. @classmethod
  175. async def log_notify_service(
  176. cls, auth: AuthSchema, message: dict, verify_result: bool = True
  177. ) -> bool:
  178. """
  179. 记录支付宝通知
  180. Args:
  181. auth: 认证信息
  182. message: 通知数据(字典格式)
  183. verify_result: 验签结果
  184. process_result: 处理结果
  185. error: 错误信息
  186. Returns:
  187. bool: 记录是否成功
  188. """
  189. try:
  190. # 构建日志数据
  191. log_data = {
  192. "notify_id": message.get('notify_id'),
  193. "msg_method": message.get('msg_method'),
  194. "notify_type": message.get('notify_type'),
  195. "message": message,
  196. "verify_result": verify_result,
  197. "process_result": None,
  198. "error": None,
  199. }
  200. # 保存到数据库
  201. crud = AlipayNotifyLogCRUD(auth)
  202. await crud.create(log_data)
  203. log.info(f"记录支付宝通知: {message.get('msg_method')} - {message.get('notify_id')}")
  204. return True
  205. except Exception as e:
  206. log.error(f"记录支付宝通知失败: {e}")
  207. return False