controller.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. from typing import Annotated
  2. from fastapi import APIRouter, Depends
  3. from fastapi.responses import PlainTextResponse
  4. from redis.asyncio.client import Redis
  5. from sqlalchemy.ext.asyncio import AsyncSession
  6. from app.api.v1.module_system.auth.schema import AuthSchema
  7. from app.core.dependencies import redis_getter, db_getter
  8. from app.core.logger import log
  9. from app.core.router_class import OperationLogRoute
  10. from .schemas import AlipayNotifyBase, parse_alipay_notify_form
  11. from .service import NotificationService
  12. NotificationRouter = APIRouter(
  13. route_class=OperationLogRoute,
  14. prefix="/notify",
  15. tags=["支付宝消息通知"],
  16. )
  17. @NotificationRouter.post(
  18. "/alipay",
  19. summary="支付宝消息通知",
  20. description="接收支付宝开放平台的各类通知消息",
  21. responses={
  22. 200: {"description": "处理结果", "content": {"text/plain": {"example": "success"}}}
  23. },
  24. )
  25. async def alipay_notify_controller(
  26. notify_data: Annotated[AlipayNotifyBase, Depends(parse_alipay_notify_form)],
  27. redis: Annotated[Redis, Depends(redis_getter)],
  28. db: Annotated[AsyncSession, Depends(db_getter)],
  29. ) -> PlainTextResponse:
  30. """
  31. 支付宝消息通知接收入口
  32. 返回 success 表示处理成功,支付宝将停止重试投递
  33. 返回 fail 表示处理失败,支付宝将按照投递策略重试
  34. 投递重试策略:一般情况下,25 小时以内完成 8 次通知,除了第一次是实时投递外,
  35. 后续的每次重试都会间隔一段时间,间隔频率一般是:2m、10m、10m、1h、2h、6h、15h
  36. (第二次消息投递是在第一次投递失败后的 2 分钟;
  37. 第三次投递是在第二次投递失败后的 10 分钟,以此类推)
  38. """
  39. log.info(f"收到支付宝通知: msg_method={notify_data.msg_method}, notify_id={notify_data.notify_id}")
  40. try:
  41. success = await NotificationService.verify_and_dispatch(notify_data, redis, AuthSchema(db=db))
  42. except Exception as e:
  43. log.error(f"支付宝通知消息 - 处理异常: notify_id={notify_data.notify_id}, error={e}")
  44. return PlainTextResponse(content="fail")
  45. return PlainTextResponse(content="success" if success else "fail")
  46. @NotificationRouter.get(
  47. "/health",
  48. summary="健康检查",
  49. description="检查服务是否正常",
  50. responses={
  51. 200: {"description": "服务正常"},
  52. 500: {"description": "服务异常"},
  53. },
  54. )
  55. async def health_controller() -> PlainTextResponse:
  56. return PlainTextResponse(content="ok alipay")