database.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. from fastapi import FastAPI
  2. from redis import exceptions
  3. from redis.asyncio import Redis
  4. from sqlalchemy import Engine, create_engine
  5. from sqlalchemy.ext.asyncio import (
  6. AsyncEngine,
  7. AsyncSession,
  8. async_sessionmaker,
  9. create_async_engine,
  10. )
  11. from sqlalchemy.orm import sessionmaker
  12. from app.config.setting import settings
  13. from app.core.base_model import MappedBase
  14. from app.core.exceptions import CustomException
  15. from app.core.logger import log
  16. def create_engine_and_session(
  17. db_url: str = settings.DB_URI,
  18. ) -> tuple[Engine, sessionmaker]:
  19. """
  20. 创建同步数据库引擎和会话工厂。
  21. 参数:
  22. - db_url (str): 数据库连接URL,默认从配置中获取。
  23. 返回:
  24. - tuple[Engine, sessionmaker]: 同步数据库引擎和会话工厂。
  25. """
  26. try:
  27. if not settings.SQL_DB_ENABLE:
  28. raise CustomException(
  29. msg="请先开启数据库连接",
  30. data="请启用 app/config/setting.py: SQL_DB_ENABLE",
  31. )
  32. # 同步数据库引擎
  33. engine: Engine = create_engine(
  34. url=db_url,
  35. echo=settings.DATABASE_ECHO,
  36. pool_pre_ping=settings.POOL_PRE_PING,
  37. pool_recycle=settings.POOL_RECYCLE,
  38. )
  39. except Exception as e:
  40. log.error(f"❌ 数据库连接失败 {e}")
  41. raise
  42. else:
  43. # 同步数据库会话工厂
  44. SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
  45. return engine, SessionLocal
  46. def create_async_engine_and_session(
  47. db_url: str = settings.ASYNC_DB_URI,
  48. ) -> tuple[AsyncEngine, async_sessionmaker[AsyncSession]]:
  49. """
  50. 获取异步数据库会话连接。
  51. 参数:
  52. - db_url (str): 异步数据库 URL,默认取配置项 ASYNC_DB_URI。
  53. 返回:
  54. - tuple[AsyncEngine, async_sessionmaker[AsyncSession]]: 异步数据库引擎和会话工厂。
  55. """
  56. try:
  57. if not settings.SQL_DB_ENABLE:
  58. raise CustomException(
  59. msg="请先开启数据库连接",
  60. data="请启用 app/config/setting.py: SQL_DB_ENABLE",
  61. )
  62. # 异步数据库引擎
  63. if settings.DATABASE_TYPE == "sqlite":
  64. async_engine = create_async_engine(
  65. url=db_url,
  66. echo=settings.DATABASE_ECHO,
  67. echo_pool=settings.ECHO_POOL,
  68. pool_pre_ping=settings.POOL_PRE_PING,
  69. future=settings.FUTURE,
  70. pool_recycle=settings.POOL_RECYCLE,
  71. )
  72. else:
  73. async_engine = create_async_engine(
  74. url=db_url,
  75. echo=settings.DATABASE_ECHO,
  76. echo_pool=settings.ECHO_POOL,
  77. pool_pre_ping=settings.POOL_PRE_PING,
  78. future=settings.FUTURE,
  79. pool_recycle=settings.POOL_RECYCLE,
  80. pool_size=settings.POOL_SIZE,
  81. max_overflow=settings.MAX_OVERFLOW,
  82. pool_timeout=settings.POOL_TIMEOUT,
  83. pool_use_lifo=settings.POOL_USE_LIFO,
  84. )
  85. except Exception as e:
  86. log.error(f"❌ 数据库连接失败 {e}")
  87. raise
  88. else:
  89. # 异步数据库会话工厂
  90. AsyncSessionLocal = async_sessionmaker(
  91. bind=async_engine,
  92. autocommit=settings.AUTOCOMMIT,
  93. autoflush=settings.AUTOFETCH,
  94. expire_on_commit=settings.EXPIRE_ON_COMMIT,
  95. class_=AsyncSession,
  96. )
  97. return async_engine, AsyncSessionLocal
  98. engine, db_session = create_engine_and_session(settings.DB_URI)
  99. async_engine, async_db_session = create_async_engine_and_session(settings.ASYNC_DB_URI)
  100. async def create_tables() -> None:
  101. """
  102. 创建数据库表(根据 ORM metadata)。
  103. 返回:
  104. - None
  105. """
  106. async with async_engine.begin() as coon:
  107. await coon.run_sync(MappedBase.metadata.create_all)
  108. async def drop_tables() -> None:
  109. """
  110. 删除数据库表(根据 ORM metadata)。
  111. 返回:
  112. - None
  113. """
  114. async with async_engine.begin() as conn:
  115. await conn.run_sync(MappedBase.metadata.drop_all)
  116. async def redis_connect(app: FastAPI, status: str) -> Redis | None:
  117. """
  118. 创建或关闭Redis连接。
  119. 参数:
  120. - app (FastAPI): FastAPI应用实例。
  121. - status (bool): 连接状态,True为创建连接,False为关闭连接。
  122. 返回:
  123. - Redis | None: Redis连接实例,如果连接失败则返回None。
  124. """
  125. if not settings.REDIS_ENABLE:
  126. raise CustomException(
  127. msg="请先开启Redis连接",
  128. data="请启用 app/core/config.py: REDIS_ENABLE",
  129. )
  130. if status:
  131. try:
  132. rd = await Redis.from_url(
  133. url=settings.REDIS_URI,
  134. encoding="utf-8",
  135. decode_responses=True,
  136. health_check_interval=20,
  137. max_connections=settings.POOL_SIZE,
  138. socket_timeout=settings.POOL_TIMEOUT,
  139. )
  140. app.state.redis = rd
  141. if await rd.ping(): # pyright: ignore[reportGeneralTypeIssues]
  142. return rd
  143. except exceptions.AuthenticationError as e:
  144. log.error(f"❌ 数据库 Redis 认证失败: {e}")
  145. raise
  146. except exceptions.TimeoutError as e:
  147. log.error(f"❌ 数据库 Redis 连接超时: {e}")
  148. raise
  149. except exceptions.RedisError as e:
  150. log.error(f"❌ 数据库 Redis 连接错误: {e}")
  151. raise
  152. else:
  153. await app.state.redis.close()
  154. log.info("✅️ Redis连接已关闭")