controller.py 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. from __future__ import annotations
  2. from typing import Any
  3. from fastapi import APIRouter, Request
  4. from fastapi.responses import JSONResponse
  5. from sqlalchemy import text
  6. from app.common.constant import RET
  7. from app.common.response import ErrorResponse, ResponseSchema, SuccessResponse
  8. from app.config.setting import settings
  9. from app.core.database import async_db_session
# Router that groups the health-probe endpoints under the "/health" prefix.
HealthRouter = APIRouter(prefix="/health", tags=["健康检查"])
  11. @HealthRouter.get(
  12. "/",
  13. summary="存活探针(Liveness)",
  14. description="进程已启动即可返回 200;不探测外部依赖,供 K8s livenessProbe 使用。",
  15. response_model=ResponseSchema[dict],
  16. )
  17. async def health_check() -> JSONResponse:
  18. """轻量存活检查:避免在依赖故障时误杀进程。"""
  19. return SuccessResponse(data=True, msg="系统健康")
  20. @HealthRouter.get(
  21. "/ready/",
  22. summary="就绪探针(Readiness)",
  23. description="探测数据库与 Redis;任一项失败返回 503,供 K8s readinessProbe 摘除流量。",
  24. response_model=ResponseSchema[dict[str, Any]],
  25. )
  26. async def readiness_check(request: Request) -> JSONResponse:
  27. """
  28. 就绪检查:启动阶段已通过 lifespan 连接 Redis,此处做周期性轻量 ping。
  29. 数据库使用 ``SELECT 1``,避免依赖具体表结构。
  30. """
  31. checks: dict[str, bool | None] = {"database": None, "redis": None}
  32. detail_errors: list[str] = []
  33. db_ok = False
  34. if settings.SQL_DB_ENABLE:
  35. try:
  36. async with async_db_session() as session:
  37. async with session.begin():
  38. await session.execute(text("SELECT 1"))
  39. checks["database"] = True
  40. db_ok = True
  41. except Exception as e:
  42. checks["database"] = False
  43. detail_errors.append(f"database:{e!s}")
  44. else:
  45. checks["database"] = False
  46. detail_errors.append("database:SQL_DB_ENABLE is false")
  47. redis_ok = True
  48. if settings.REDIS_ENABLE:
  49. redis_ok = False
  50. try:
  51. rd = getattr(request.app.state, "redis", None)
  52. if rd is not None:
  53. await rd.ping() # type: ignore[misc]
  54. checks["redis"] = True
  55. redis_ok = True
  56. else:
  57. checks["redis"] = False
  58. detail_errors.append("redis:app.state.redis missing")
  59. except Exception as e:
  60. checks["redis"] = False
  61. detail_errors.append(f"redis:{e!s}")
  62. else:
  63. checks["redis"] = None
  64. all_ok = db_ok and redis_ok and not detail_errors
  65. payload: dict[str, Any] = {"checks": checks, "errors": detail_errors or None}
  66. if all_ok:
  67. return SuccessResponse(data=payload, msg="依赖就绪")
  68. return ErrorResponse(
  69. data=payload,
  70. msg="依赖未就绪",
  71. code=RET.SERVICE_UNAVAILABLE.code,
  72. status_code=503,
  73. success=False,
  74. )