from typing import Annotated from fastapi import APIRouter, Depends from fastapi.responses import PlainTextResponse from redis.asyncio.client import Redis from sqlalchemy.ext.asyncio import AsyncSession from app.api.v1.module_system.auth.schema import AuthSchema from app.core.dependencies import redis_getter, db_getter from app.core.logger import log from app.core.router_class import OperationLogRoute from .schemas import AlipayNotifyBase, parse_alipay_notify_form from .service import NotificationService NotificationRouter = APIRouter( route_class=OperationLogRoute, prefix="/notify", tags=["支付宝消息通知"], ) @NotificationRouter.post( "/alipay", summary="支付宝消息通知", description="接收支付宝开放平台的各类通知消息", responses={ 200: {"description": "处理结果", "content": {"text/plain": {"example": "success"}}} }, ) async def alipay_notify_controller( notify_data: Annotated[AlipayNotifyBase, Depends(parse_alipay_notify_form)], redis: Annotated[Redis, Depends(redis_getter)], db: Annotated[AsyncSession, Depends(db_getter)], ) -> PlainTextResponse: """ 支付宝消息通知接收入口 返回 success 表示处理成功,支付宝将停止重试投递 返回 fail 表示处理失败,支付宝将按照投递策略重试 投递重试策略:一般情况下,25 小时以内完成 8 次通知,除了第一次是实时投递外, 后续的每次重试都会间隔一段时间,间隔频率一般是:2m、10m、10m、1h、2h、6h、15h (第二次消息投递是在第一次投递失败后的 2 分钟; 第三次投递是在第二次投递失败后的 10 分钟,以此类推) """ log.info(f"收到支付宝通知: msg_method={notify_data.msg_method}, notify_id={notify_data.notify_id}") try: success = await NotificationService.verify_and_dispatch(notify_data, redis, AuthSchema(db=db)) except Exception as e: log.error(f"支付宝通知消息 - 处理异常: notify_id={notify_data.notify_id}, error={e}") return PlainTextResponse(content="fail") return PlainTextResponse(content="success" if success else "fail") @NotificationRouter.get( "/health", summary="健康检查", description="检查服务是否正常", responses={ 200: {"description": "服务正常"}, 500: {"description": "服务异常"}, }, ) async def health_controller() -> PlainTextResponse: return PlainTextResponse(content="ok alipay")