| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239 |
- from typing import Any
- from app.core.logger import log
- from app.core.alipay.config import get_alipay_config
- from app.core.redis_crud import RedisCURD
- from redis.asyncio.client import Redis
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.plugin.module_payment.notification.crud import AlipayNotifyLogCRUD
- from .schemas import AlipayNotifyBase
- from .enums import AlipayNotifyMethodEnum
- from .handlers import (
- BaseHandler,
- EnterpriseHandler,
- EmployeeHandler,
- BillHandler,
- VoucherHandler,
- AccountHandler,
- )
- class NotificationService:
- """通知消息分发服务"""
- _handlers: dict[str, type[BaseHandler]] = {
- AlipayNotifyMethodEnum.ENTERPRISE_CHANGE.value: EnterpriseHandler,
- AlipayNotifyMethodEnum.EMPLOYEE_CHANGE.value: EmployeeHandler,
- AlipayNotifyMethodEnum.CONSUME_CHANGE.value: BillHandler,
- AlipayNotifyMethodEnum.VOUCHER_CHANGE.value: VoucherHandler,
- AlipayNotifyMethodEnum.TRANS_AUTHORIZE_NOTIFY.value: AccountHandler,
- AlipayNotifyMethodEnum.FUND_CHANGE_NOTIFY.value: AccountHandler,
- }
- # Redis 配置
- _notify_cache_prefix = "alipay:notify:"
- _notify_cache_ttl = 604800 # 7天
- @classmethod
- async def dispatch(cls, msg_method: str, content: dict[str, Any], auth: AuthSchema, redis: Redis) -> bool:
- """
- 分发通知消息到对应的处理器
- Args:
- msg_method: 消息方法名
- content: 通知内容
- auth: 认证信息
- Returns:
- 处理是否成功
- """
- handler_class = cls._handlers.get(msg_method)
- if not handler_class:
- log.warning(f"支付宝通知消息未注册的消息处理器: {msg_method}")
- return True
- handler = handler_class()
- try:
- result = await handler.handle(method=msg_method, content=content, auth=auth, redis=redis)
- log.info(f"支付宝通知消息 - 处理完成: {msg_method}, result={result}")
- return result
- except Exception as e:
- log.error(f"支付宝通知消息 - 处理异常: {msg_method}, error={e}")
- return False
- @classmethod
- def verify_sign(cls, notify_data: AlipayNotifyBase) -> bool:
- """
- 验签支付宝通知
- Args:
- notify_data: 支付宝通知数据
- Returns:
- 验签是否通过
- """
- if not notify_data.sign:
- log.error("支付宝通知消息 - 验签失败: 缺少 sign 参数")
- return False
- if not notify_data.biz_content:
- log.error("支付宝通知消息 - 验签失败: 缺少 biz_content 参数")
- return False
- try:
- alipay_public_key = get_alipay_config().alipay_public_key
- if not alipay_public_key:
- log.error("支付宝通知消息 - 验签失败: 缺少支付宝公钥配置")
- return False
- from alipay.aop.api.util.SignatureUtils import verify_with_rsa
- import urllib.parse
- # 1. 构建参数字典
- # 2. 移除 sign、sign_type 参数
- params = notify_data.model_dump(exclude={"sign", "sign_type"}, exclude_none=True)
- # 3. 对参数值进行 url_decode
- decoded_params = {}
- for key, value in params.items():
- if isinstance(value, str):
- decoded_params[key] = urllib.parse.unquote(value)
- else:
- decoded_params[key] = value
-
- # 4. 按字典序排序
- sorted_keys = sorted(decoded_params.keys())
-
- # 5. 构建待验签字符串
- sign_str = '&'.join([f"{key}={decoded_params[key]}" for key in sorted_keys])
-
- log.debug(f"支付宝通知消息 - 待验签字符串: {sign_str}")
- verify_result = verify_with_rsa(
- alipay_public_key,
- sign_str.encode(get_alipay_config().charset),
- notify_data.sign
- )
- if not verify_result:
- log.warning(f"支付宝通知消息 - 验签失败: sign={notify_data.sign[:20]}..., sign_type={notify_data.sign_type}")
- return verify_result
- except Exception as e:
- log.error(f"支付宝通知消息 - 验签异常: {e}")
- return False
- @classmethod
- async def verify_and_dispatch(cls, notify_data: AlipayNotifyBase, redis: Redis, auth: AuthSchema) -> bool:
- """
- 验签并分发通知消息
- Args:
- notify_data: 支付宝通知数据
- redis: Redis客户端对象
- auth: 认证信息
- Returns:
- 处理是否成功
- """
- # 1. 先验签
- if not cls.verify_sign(notify_data):
- return False
- # 2. 检查是否已处理过该通知(幂等处理)
- notify_id = notify_data.notify_id
- if await cls._is_notify_processed(redis, notify_id):
- log.warning(f"支付宝通知消息 - 通知已处理: notify_id={notify_id}")
- return True
-
- # 记录日志
- await AlipayNotifyLogService.log_notify_service(auth, notify_data.model_dump())
- # 3. 解析 biz_content
- biz_content = notify_data.parse_biz_content()
- log.debug(f"支付宝通知消息 - 消息内容: {biz_content}")
- # 4. 分发处理
- success = await cls.dispatch(notify_data.method, biz_content, auth, redis)
- # 5. 处理成功后记录
- if success:
- await cls._mark_notify_processed(redis, notify_id)
- return success
- @classmethod
- async def _is_notify_processed(cls, redis: Redis, notify_id: str) -> bool:
- """
- 检查通知是否已处理(幂等处理)
- Args:
- redis: Redis客户端对象
- notify_id: 通知ID
- Returns:
- 是否已处理
- """
- key = f"{cls._notify_cache_prefix}{notify_id}"
- return await RedisCURD(redis).exists(key)
- @classmethod
- async def _mark_notify_processed(cls, redis: Redis, notify_id: str) -> None:
- """
- 标记通知已处理
- Args:
- redis: Redis客户端对象
- notify_id: 通知ID
- """
- key = f"{cls._notify_cache_prefix}{notify_id}"
- await RedisCURD(redis).set(key, "1", expire=cls._notify_cache_ttl)
- class AlipayNotifyLogService:
- """支付宝通知日志服务"""
- @classmethod
- async def log_notify_service(
- cls, auth: AuthSchema, message: dict, verify_result: bool = True
- ) -> bool:
- """
- 记录支付宝通知
-
- Args:
- auth: 认证信息
- message: 通知数据(字典格式)
- verify_result: 验签结果
- process_result: 处理结果
- error: 错误信息
-
- Returns:
- bool: 记录是否成功
- """
- try:
- # 构建日志数据
- log_data = {
- "notify_id": message.get('notify_id'),
- "msg_method": message.get('msg_method'),
- "notify_type": message.get('notify_type'),
- "message": message,
- "verify_result": verify_result,
- "process_result": None,
- "error": None,
- }
-
- # 保存到数据库
- crud = AlipayNotifyLogCRUD(auth)
- await crud.create(log_data)
- log.info(f"记录支付宝通知: {message.get('msg_method')} - {message.get('notify_id')}")
- return True
- except Exception as e:
- log.error(f"记录支付宝通知失败: {e}")
- return False
|