from typing import Any from app.core.logger import log from app.core.alipay.config import get_alipay_config from app.core.redis_crud import RedisCURD from redis.asyncio.client import Redis from app.api.v1.module_system.auth.schema import AuthSchema from app.plugin.module_payment.notification.crud import AlipayNotifyLogCRUD from .schemas import AlipayNotifyBase from .enums import AlipayNotifyMethodEnum from .handlers import ( AccountHandler, BaseHandler, BillHandler, EmployeeHandler, EnterpriseHandler, InstitutionHandler, OrderHandler, VoucherHandler, ) class NotificationService: """通知消息分发服务""" _handlers: dict[str, type[BaseHandler]] = { AlipayNotifyMethodEnum.ENTERPRISE_CHANGE.value: EnterpriseHandler, AlipayNotifyMethodEnum.EMPLOYEE_CHANGE.value: EmployeeHandler, AlipayNotifyMethodEnum.CONSUME_CHANGE.value: BillHandler, AlipayNotifyMethodEnum.VOUCHER_CHANGE.value: VoucherHandler, # AlipayNotifyMethodEnum.INVOICE_ORDER_CHANGE.value: OrderHandler, AlipayNotifyMethodEnum.TRANS_AUTHORIZE_NOTIFY.value: AccountHandler, AlipayNotifyMethodEnum.FUND_CHANGE_NOTIFY.value: AccountHandler, AlipayNotifyMethodEnum.INSTITUTION_OPERATION.value: InstitutionHandler, } # Redis 配置 _notify_cache_prefix = "alipay:notify:" _notify_cache_ttl = 604800 # 7天 @classmethod async def dispatch(cls, msg_method: str, content: dict[str, Any], auth: AuthSchema, redis: Redis) -> bool: """ 分发通知消息到对应的处理器 Args: msg_method: 消息方法名 content: 通知内容 auth: 认证信息 Returns: 处理是否成功 """ handler_class = cls._handlers.get(msg_method) if not handler_class: log.warning(f"支付宝通知消息未注册的消息处理器: {msg_method}") return True handler = handler_class() try: result = await handler.handle(method=msg_method, content=content, auth=auth, redis=redis) log.info(f"支付宝通知消息 - 处理完成: {msg_method}, result={result}") return result except Exception as e: log.error(f"支付宝通知消息 - 处理异常: {msg_method}, error={e}") return False @classmethod def verify_sign(cls, notify_data: AlipayNotifyBase) -> bool: """ 验签支付宝通知 Args: notify_data: 支付宝通知数据 Returns: 验签是否通过 """ if not notify_data.sign: log.error("支付宝通知消息 - 验签失败: 缺少 sign 参数") return False if not notify_data.biz_content: log.error("支付宝通知消息 - 验签失败: 缺少 biz_content 参数") return False try: alipay_public_key = get_alipay_config().alipay_public_key if not alipay_public_key: log.error("支付宝通知消息 - 验签失败: 缺少支付宝公钥配置") return False from alipay.aop.api.util.SignatureUtils import verify_with_rsa import urllib.parse # 1. 构建参数字典 # 2. 移除 sign、sign_type 参数 params = notify_data.model_dump(exclude={"sign", "sign_type"}, exclude_none=True) # 3. 对参数值进行 url_decode decoded_params = {} for key, value in params.items(): if isinstance(value, str): decoded_params[key] = urllib.parse.unquote(value) else: decoded_params[key] = value # 4. 按字典序排序 sorted_keys = sorted(decoded_params.keys()) # 5. 构建待验签字符串 sign_str = '&'.join([f"{key}={decoded_params[key]}" for key in sorted_keys]) log.debug(f"支付宝通知消息 - 待验签字符串: {sign_str}") verify_result = verify_with_rsa( alipay_public_key, sign_str.encode(get_alipay_config().charset), notify_data.sign ) if not verify_result: log.warning(f"支付宝通知消息 - 验签失败: sign={notify_data.sign[:20]}..., sign_type={notify_data.sign_type}") return verify_result except Exception as e: log.error(f"支付宝通知消息 - 验签异常: {e}") return False @classmethod async def verify_and_dispatch(cls, notify_data: AlipayNotifyBase, redis: Redis, auth: AuthSchema) -> bool: """ 验签并分发通知消息 Args: notify_data: 支付宝通知数据 redis: Redis客户端对象 auth: 认证信息 Returns: 处理是否成功 """ # 1. 先验签 if not cls.verify_sign(notify_data): return False # 2. 检查是否已处理过该通知(幂等处理) notify_id = notify_data.notify_id if await cls._is_notify_processed(redis, notify_id): log.warning(f"支付宝通知消息 - 通知已处理: notify_id={notify_id}") return True # 记录日志 await AlipayNotifyLogService.log_notify_service(auth, notify_data.model_dump()) # 3. 解析 biz_content biz_content = notify_data.parse_biz_content() log.debug(f"支付宝通知消息 - 消息内容: {biz_content}") # 4. 从 enterprise_id 获取正确的 tenant_id enterprise_id = biz_content.get("enterprise_id", "") if enterprise_id and auth.tenant_id == -1: try: from app.plugin.module_payment.enterprise.model import EnterpriseModel from sqlalchemy import select stmt = select(EnterpriseModel.tenant_id).where(EnterpriseModel.enterprise_id == enterprise_id) result = await auth.db.execute(stmt) tenant_id = result.scalar_one_or_none() if tenant_id: auth.tenant_id = tenant_id log.debug(f"支付宝通知消息 - 从 enterprise_id 解析 tenant_id={tenant_id}") except Exception: log.warning(f"支付宝通知消息 - 解析 tenant_id 失败: enterprise_id={enterprise_id}") # 5. 分发处理 success = await cls.dispatch(notify_data.method, biz_content, auth, redis) # 5. 处理成功后记录 if success: await cls._mark_notify_processed(redis, notify_id) return success @classmethod async def _is_notify_processed(cls, redis: Redis, notify_id: str) -> bool: """ 检查通知是否已处理(幂等处理) Args: redis: Redis客户端对象 notify_id: 通知ID Returns: 是否已处理 """ key = f"{cls._notify_cache_prefix}{notify_id}" return await RedisCURD(redis).exists(key) @classmethod async def _mark_notify_processed(cls, redis: Redis, notify_id: str) -> None: """ 标记通知已处理 Args: redis: Redis客户端对象 notify_id: 通知ID """ key = f"{cls._notify_cache_prefix}{notify_id}" await RedisCURD(redis).set(key, "1", expire=cls._notify_cache_ttl) class AlipayNotifyLogService: """支付宝通知日志服务""" @classmethod async def log_notify_service( cls, auth: AuthSchema, message: dict, verify_result: bool = True ) -> bool: """ 记录支付宝通知 Args: auth: 认证信息 message: 通知数据(字典格式) verify_result: 验签结果 process_result: 处理结果 error: 错误信息 Returns: bool: 记录是否成功 """ try: # 构建日志数据 log_data = { "notify_id": message.get('notify_id'), "msg_method": message.get('msg_method'), "notify_type": message.get('notify_type'), "message": message, "verify_result": verify_result, "process_result": None, "error": None, } # 保存到数据库 crud = AlipayNotifyLogCRUD(auth) await crud.create(log_data) log.info(f"记录支付宝通知: {message.get('msg_method')} - {message.get('notify_id')}") return True except Exception as e: log.error(f"记录支付宝通知失败: {e}") return False