crud.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638
  1. from collections.abc import Sequence
  2. from typing import TYPE_CHECKING
  3. from sqlalchemy import Inspector, inspect, select, text
  4. from app.api.v1.module_system.auth.schema import AuthSchema
  5. from app.config.setting import settings
  6. from app.core.base_crud import CRUDBase
  7. from app.core.logger import log
  8. from .model import GenTableColumnModel, GenTableModel
  9. from .schema import (
  10. GenDBTableSchema,
  11. GenTableColumnOutSchema,
  12. GenTableColumnSchema,
  13. GenTableQueryParam,
  14. GenTableSchema,
  15. )
  16. if TYPE_CHECKING:
  17. from sqlalchemy.engine.reflection import Inspector
  18. class GenTableCRUD(CRUDBase[GenTableModel, GenTableSchema, GenTableSchema]):
  19. """代码生成业务表模块数据库操作层"""
  20. def __init__(self, auth: AuthSchema) -> None:
  21. """
  22. 初始化CRUD操作层
  23. 参数:
  24. - auth (AuthSchema): 认证信息模型
  25. """
  26. super().__init__(model=GenTableModel, auth=auth)
  27. async def get_gen_table_by_id(
  28. self, table_id: int, preload: list | None = None
  29. ) -> GenTableModel | None:
  30. """
  31. 根据业务表ID获取需要生成的业务表信息。
  32. 参数:
  33. - table_id (int): 业务表ID。
  34. - preload (list | None): 预加载关系,未提供时使用模型默认项
  35. 返回:
  36. - GenTableModel | None: 业务表信息对象。
  37. """
  38. return await self.get(id=table_id, preload=preload)
  39. async def get_gen_table_by_name(
  40. self, table_name: str, preload: list | None = None
  41. ) -> GenTableModel | None:
  42. """
  43. 根据业务表名称获取需要生成的业务表信息。
  44. 参数:
  45. - table_name (str): 业务表名称。
  46. - preload (list | None): 预加载关系,未提供时使用模型默认项
  47. 返回:
  48. - GenTableModel | None: 业务表信息对象。
  49. """
  50. return await self.get(table_name=table_name, preload=preload)
  51. async def get_gen_table_all(self, preload: list | None = None) -> Sequence[GenTableModel]:
  52. """
  53. 获取所有业务表信息。
  54. 参数:
  55. - preload (list | None): 预加载关系,未提供时使用模型默认项
  56. 返回:
  57. - Sequence[GenTableModel]: 所有业务表信息列表。
  58. """
  59. return await self.list(preload=preload)
  60. async def get_gen_table_list(
  61. self,
  62. search: GenTableQueryParam | None = None,
  63. preload: list | None = None,
  64. ) -> Sequence[GenTableModel]:
  65. """
  66. 根据查询参数获取代码生成业务表列表信息。
  67. 参数:
  68. - search (GenTableQueryParam | None): 查询参数对象。
  69. - preload (list | None): 预加载关系,未提供时使用模型默认项
  70. 返回:
  71. - Sequence[GenTableModel]: 业务表列表信息。
  72. """
  73. return await self.list(
  74. search=search.__dict__,
  75. order_by=[{"created_time": "desc"}],
  76. preload=preload,
  77. )
  78. async def add_gen_table(self, add_model: GenTableSchema) -> GenTableModel:
  79. """
  80. 新增业务表信息。
  81. 参数:
  82. - add_model (GenTableSchema): 新增业务表信息模型。
  83. 返回:
  84. - GenTableModel: 新增的业务表信息对象。
  85. """
  86. return await self.create(data=add_model)
  87. async def edit_gen_table(self, table_id: int, edit_model: GenTableSchema) -> GenTableModel:
  88. """
  89. 修改业务表信息。
  90. 参数:
  91. - table_id (int): 业务表ID。
  92. - edit_model (GenTableSchema): 修改业务表信息模型。
  93. 返回:
  94. - GenTableSchema: 修改后的业务表信息模型。
  95. """
  96. # 排除嵌套对象字段,避免SQLAlchemy尝试直接将字典设置到模型实例上
  97. return await self.update(
  98. id=table_id,
  99. data=edit_model.model_dump(exclude_unset=True, exclude={"columns"}),
  100. )
  101. async def delete_gen_table(self, ids: list[int]) -> None:
  102. """
  103. 删除业务表信息。除了系统表。
  104. 参数:
  105. - ids (list[int]): 业务表ID列表。
  106. 返回:
  107. - None
  108. """
  109. await self.delete(ids=ids)
  110. async def get_db_table_list(self, search: GenTableQueryParam | None = None) -> list[dict]:
  111. """
  112. 根据查询参数获取数据库表列表信息。
  113. 参数:
  114. - search (GenTableQueryParam | None): 查询参数对象。
  115. 返回:
  116. - list[dict]: 数据库表列表信息(已转为可序列化字典)。
  117. """
  118. database_name = settings.DATABASE_NAME
  119. database_type = settings.DATABASE_TYPE
  120. from app.core.database import engine
  121. inspector: Inspector = inspect(engine)
  122. table_names = inspector.get_table_names()
  123. dict_data = []
  124. for table_name in table_names:
  125. try:
  126. table_comment = inspector.get_table_comment(table_name)
  127. comment = (
  128. table_comment.get("text", "")
  129. if isinstance(table_comment, dict)
  130. else table_comment
  131. )
  132. table_comment = comment or ""
  133. except Exception as e:
  134. log.warning(f"获取表 {table_name} 的注释失败: {e}")
  135. table_comment = ""
  136. # 统一处理 search 为 None 的情况,避免重复判断
  137. if search:
  138. # 表名过滤:忽略大小写,支持模糊匹配
  139. if search.table_name and search.table_name[1] and search.table_name[1].lower() not in table_name.lower():
  140. continue
  141. # 表注释过滤:忽略大小写,支持模糊匹配;table_comment 为 None 时视为空字符串
  142. if search.table_comment and search.table_comment[1] and search.table_comment[1] not in table_comment:
  143. continue
  144. table_info = {
  145. "database_name": database_name,
  146. "table_name": table_name,
  147. "table_type": database_type,
  148. "table_comment": table_comment,
  149. }
  150. dict_data.append(GenDBTableSchema(**table_info).model_dump())
  151. return dict_data
  152. async def get_db_table_page(
  153. self,
  154. search: GenTableQueryParam | None,
  155. offset: int,
  156. limit: int,
  157. ) -> tuple[list[dict], int]:
  158. """数据库侧分页获取物理表列表(用于导入表弹窗)。
  159. 说明:
  160. - 旧实现使用 SQLAlchemy Inspector 全量遍历再内存分页,表多时非常慢。
  161. - 这里按方言走系统表(MySQL information_schema / Postgres pg_catalog)进行分页与过滤。
  162. - 若方言不支持,则回退到旧的全量遍历。
  163. 参数:
  164. - search (GenTableQueryParam | None): 表名/注释过滤条件。
  165. - offset (int): 偏移量。
  166. - limit (int): 每页条数。
  167. 返回:
  168. - tuple[list[dict], int]: 当前页表信息列表与总条数。
  169. """
  170. database_name = settings.DATABASE_NAME
  171. db_type = (settings.DATABASE_TYPE or "").lower()
  172. # 解析 like 关键字(GenTableQueryParam 把字段包装成 ("like", value))
  173. name_kw = None
  174. comment_kw = None
  175. if search:
  176. try:
  177. if search.table_name and search.table_name[1]:
  178. name_kw = str(search.table_name[1]).strip()
  179. if search.table_comment and search.table_comment[1]:
  180. comment_kw = str(search.table_comment[1]).strip()
  181. except Exception:
  182. # 兜底:参数结构异常时忽略过滤
  183. name_kw = None
  184. comment_kw = None
  185. # MySQL / MariaDB
  186. if db_type in {"mysql", "mariadb"}:
  187. where_sql = "WHERE table_schema = :db AND table_type = 'BASE TABLE'"
  188. params: dict = {"db": database_name, "offset": offset, "limit": limit}
  189. if name_kw:
  190. where_sql += " AND table_name LIKE :name_kw"
  191. params["name_kw"] = f"%{name_kw}%"
  192. if comment_kw:
  193. where_sql += " AND table_comment LIKE :comment_kw"
  194. params["comment_kw"] = f"%{comment_kw}%"
  195. count_sql = text(f"SELECT COUNT(1) AS cnt FROM information_schema.tables {where_sql}")
  196. rows_sql = text(
  197. "SELECT table_name, table_comment "
  198. f"FROM information_schema.tables {where_sql} "
  199. "ORDER BY table_name ASC "
  200. "LIMIT :limit OFFSET :offset"
  201. )
  202. total_res = await self.auth.db.execute(count_sql, params)
  203. total = int(total_res.scalar() or 0)
  204. res = await self.auth.db.execute(rows_sql, params)
  205. items: list[dict] = []
  206. for r in res.fetchall():
  207. # r may be Row/tuple depending on driver
  208. table_name = r[0]
  209. table_comment = r[1] or ""
  210. items.append(
  211. GenDBTableSchema(
  212. database_name=database_name,
  213. table_name=table_name,
  214. table_type=settings.DATABASE_TYPE,
  215. table_comment=table_comment,
  216. ).model_dump()
  217. )
  218. return items, total
  219. # PostgreSQL
  220. if db_type in {"postgresql", "postgres"}:
  221. # pg_description 需要通过 objsubid=0 获取 table comment
  222. where_sql = "WHERE n.nspname NOT IN ('pg_catalog','information_schema') AND c.relkind = 'r'"
  223. params = {"offset": offset, "limit": limit}
  224. if name_kw:
  225. where_sql += " AND c.relname ILIKE :name_kw"
  226. params["name_kw"] = f"%{name_kw}%"
  227. if comment_kw:
  228. where_sql += " AND COALESCE(d.description,'') ILIKE :comment_kw"
  229. params["comment_kw"] = f"%{comment_kw}%"
  230. base_from = (
  231. "FROM pg_catalog.pg_class c "
  232. "JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace "
  233. "LEFT JOIN pg_catalog.pg_description d ON d.objoid = c.oid AND d.objsubid = 0 "
  234. )
  235. count_sql = text(f"SELECT COUNT(1) AS cnt {base_from} {where_sql}")
  236. rows_sql = text(
  237. "SELECT c.relname AS table_name, COALESCE(d.description,'') AS table_comment "
  238. f"{base_from} {where_sql} "
  239. "ORDER BY c.relname ASC "
  240. "LIMIT :limit OFFSET :offset"
  241. )
  242. total_res = await self.auth.db.execute(count_sql, params)
  243. total = int(total_res.scalar() or 0)
  244. res = await self.auth.db.execute(rows_sql, params)
  245. items = []
  246. for r in res.fetchall():
  247. table_name = r[0]
  248. table_comment = r[1] or ""
  249. items.append(
  250. GenDBTableSchema(
  251. database_name=database_name,
  252. table_name=table_name,
  253. table_type=settings.DATABASE_TYPE,
  254. table_comment=table_comment,
  255. ).model_dump()
  256. )
  257. return items, total
  258. # Fallback:回退旧逻辑(全量遍历再分页由上层处理)
  259. all_items = await self.get_db_table_list(search)
  260. total = len(all_items)
  261. return all_items[offset : offset + limit], total
  262. async def get_db_table_list_by_names(self, table_names: list[str]) -> list[GenDBTableSchema]:
  263. """
  264. 根据业务表名称列表获取数据库表信息。
  265. 参数:
  266. - table_names (list[str]): 业务表名称列表。
  267. 返回:
  268. - list[GenDBTableSchema]: 数据库表信息对象列表。
  269. """
  270. # 处理空列表情况
  271. if not table_names:
  272. return []
  273. # 调用get_db_table_list获取所有表信息
  274. all_tables = await self.get_db_table_list()
  275. # 过滤出指定名称的表
  276. table_names_set = set(table_names) # 转换为集合以提高查找效率
  277. filtered_tables = [
  278. GenDBTableSchema(**table)
  279. for table in all_tables
  280. if table["table_name"] in table_names_set
  281. ]
  282. return filtered_tables
  283. async def check_table_exists(self, table_name: str) -> bool:
  284. """
  285. 检查数据库中是否已存在指定表名的表。
  286. 参数:
  287. - table_name (str): 要检查的表名。
  288. 返回:
  289. - bool: 如果表存在返回True,否则返回False。
  290. """
  291. from app.core.database import engine
  292. inspector: Inspector = inspect(engine)
  293. return inspector.has_table(table_name)
  294. async def get_db_table_comment(self, table_name: str) -> str:
  295. """
  296. 获取数据库中指定表的注释(用于主子表场景下从库中加载子表元信息)。
  297. 参数:
  298. - table_name (str): 物理表名。
  299. 返回:
  300. - str: 表注释;表不存在或失败时为空字符串。
  301. """
  302. from app.core.database import engine
  303. inspector: Inspector = inspect(engine)
  304. if not inspector.has_table(table_name):
  305. return ""
  306. try:
  307. table_comment = inspector.get_table_comment(table_name)
  308. comment = (
  309. table_comment.get("text", "")
  310. if isinstance(table_comment, dict)
  311. else (table_comment or "")
  312. )
  313. return comment or ""
  314. except Exception as e:
  315. log.warning(f"获取表 {table_name} 的注释失败: {e}")
  316. return ""
  317. async def execute_sql(self, sql: str) -> bool:
  318. """
  319. 执行SQL语句。
  320. 参数:
  321. - sql (str): 要执行的SQL语句。
  322. 返回:
  323. - bool: 是否执行成功。
  324. """
  325. try:
  326. # 执行SQL但不手动提交事务,由框架管理事务生命周期
  327. await self.auth.db.execute(text(sql))
  328. return True
  329. except Exception as e:
  330. log.error(f"执行SQL时发生错误: {e}")
  331. return False
  332. class GenTableColumnCRUD(CRUDBase[GenTableColumnModel, GenTableColumnSchema, GenTableColumnSchema]):
  333. """代码生成业务表字段模块数据库操作层"""
  334. def __init__(self, auth: AuthSchema) -> None:
  335. """
  336. 初始化CRUD操作层
  337. 参数:
  338. - auth (AuthSchema): 认证信息模型
  339. """
  340. super().__init__(model=GenTableColumnModel, auth=auth)
  341. @staticmethod
  342. def _sync_get_table_columns(database_type: str, table_name: str) -> list[dict]:
  343. """
  344. 同步函数:获取数据库表的列信息
  345. 参数:
  346. - database_type: 数据库类型
  347. - table_name: 表名
  348. 返回:
  349. - list: 列信息列表
  350. """
  351. # 使用SQLAlchemy Inspector获取表列信息
  352. from app.core.database import engine
  353. inspector: Inspector = inspect(engine)
  354. # 获取列信息
  355. columns = inspector.get_columns(table_name)
  356. # 获取主键信息
  357. try:
  358. pk_constraint = inspector.get_pk_constraint(table_name)
  359. primary_keys = (
  360. set(pk_constraint.get("constrained_columns", [])) if pk_constraint else set()
  361. )
  362. except Exception:
  363. primary_keys = set()
  364. # 获取唯一约束信息
  365. unique_columns = set()
  366. unique_constraints = inspector.get_unique_constraints(table_name)
  367. for constraint in unique_constraints:
  368. unique_columns.update(constraint.get("column_names", []))
  369. # 处理列信息
  370. columns_list = []
  371. for idx, column in enumerate(columns):
  372. # 获取列的基本信息
  373. column_name = column["name"]
  374. column_type = str(column["type"])
  375. is_nullable = column.get("nullable", True)
  376. column_default = column.get("default", None)
  377. # 获取列注释(如果有的话)
  378. column_comment = column.get("comment", "")
  379. # 判断是否为主键
  380. is_pk = column_name in primary_keys
  381. # 判断是否为唯一约束
  382. is_unique = column_name in unique_columns
  383. # 判断是否为自增列(基于数据库类型和列类型)
  384. is_increment = column.get("autoincrement", False) in (True, "auto")
  385. # 获取列长度(如果适用)
  386. column_length = None
  387. # 使用getattr安全地获取length属性,避免访问不存在时抛出AttributeError
  388. column_length = getattr(column["type"], "length", None)
  389. if column_length is not None:
  390. column_length = str(getattr(column["type"], "length", ""))
  391. # 构造列信息字典
  392. column_info = {
  393. "column_name": column_name,
  394. "column_comment": column_comment or "",
  395. "column_type": column_type,
  396. "column_length": column_length or "",
  397. "column_default": str(column_default) if column_default is not None else "",
  398. "sort": idx + 1, # 序号从1开始
  399. "is_pk": bool(is_pk),
  400. "is_increment": bool(is_increment),
  401. "is_nullable": bool(is_nullable),
  402. "is_unique": bool(is_unique),
  403. }
  404. columns_list.append(column_info)
  405. return columns_list
  406. async def get_gen_table_column_by_id(
  407. self, id: int, preload: list | None = None
  408. ) -> GenTableColumnModel | None:
  409. """根据业务表字段ID获取业务表字段信息。
  410. 参数:
  411. - id (int): 业务表字段ID。
  412. - preload (list | None): 预加载关系,未提供时使用模型默认项
  413. 返回:
  414. - GenTableColumnModel | None: 业务表字段信息对象。
  415. """
  416. return await self.get(id=id, preload=preload)
  417. async def get_gen_table_column_list_by_table_id(
  418. self, table_id: int, preload: list | None = None
  419. ) -> GenTableColumnModel | None:
  420. """根据业务表ID获取业务表字段列表信息。
  421. 参数:
  422. - table_id (int): 业务表ID。
  423. - preload (list | None): 预加载关系,未提供时使用模型默认项
  424. 返回:
  425. - GenTableColumnModel | None: 业务表字段列表信息对象。
  426. """
  427. return await self.get(table_id=table_id, preload=preload)
  428. async def list_gen_table_column_crud_by_table_id(
  429. self,
  430. table_id: int,
  431. order_by: list | None = None,
  432. preload: list | None = None,
  433. ) -> Sequence[GenTableColumnModel]:
  434. """根据业务表ID查询业务表字段列表。
  435. 参数:
  436. - table_id (int): 业务表ID。
  437. - order_by (list | None): 排序字段列表,每个元素为{"field": "字段名", "order": "asc" | "desc"}。
  438. - preload (list | None): 预加载关系,未提供时使用模型默认项
  439. 返回:
  440. - Sequence[GenTableColumnModel]: 业务表字段列表信息对象序列。
  441. """
  442. return await self.list(search={"table_id": table_id}, order_by=order_by, preload=preload)
  443. async def get_gen_db_table_columns_by_name(
  444. self, table_name: str | None
  445. ) -> list[GenTableColumnOutSchema]:
  446. """
  447. 根据业务表名称获取业务表字段列表信息。
  448. 参数:
  449. - table_name (str | None): 业务表名称。
  450. 返回:
  451. - list[GenTableColumnOutSchema]: 业务表字段列表信息对象。
  452. """
  453. # 检查表名是否为空
  454. if not table_name:
  455. raise ValueError("数据表名称不能为空")
  456. try:
  457. # 直接调用同步方法获取列信息
  458. columns_info = GenTableColumnCRUD._sync_get_table_columns(
  459. settings.DATABASE_TYPE, table_name
  460. )
  461. # 转换为GenTableColumnOutSchema对象列表
  462. columns_list = [GenTableColumnOutSchema(**column_info) for column_info in columns_info]
  463. return columns_list
  464. except Exception as e:
  465. log.error(f"获取表{table_name}的字段列表时出错: {e!s}")
  466. # 确保即使出错也返回空列表而不是None
  467. raise
  468. async def list_gen_table_column_crud(
  469. self,
  470. search: dict | None = None,
  471. order_by: list | None = None,
  472. preload: list | None = None,
  473. ) -> Sequence[GenTableColumnModel]:
  474. """根据业务表字段查询业务表字段列表。
  475. 参数:
  476. - search (dict | None): 查询参数,例如{"table_id": 1}。
  477. - order_by (list | None): 排序字段列表,每个元素为{"field": "字段名", "order": "asc" | "desc"}。
  478. - preload (list | None): 预加载关系,未提供时使用模型默认项
  479. 返回:
  480. - Sequence[GenTableColumnModel]: 业务表字段列表信息对象序列。
  481. """
  482. return await self.list(search=search, order_by=order_by, preload=preload)
  483. async def create_gen_table_column_crud(
  484. self, data: GenTableColumnSchema
  485. ) -> GenTableColumnModel | None:
  486. """创建业务表字段。
  487. 参数:
  488. - data (GenTableColumnSchema): 业务表字段模型。
  489. 返回:
  490. - GenTableColumnModel | None: 业务表字段列表信息对象。
  491. """
  492. return await self.create(data=data)
  493. async def update_gen_table_column_crud(
  494. self, id: int, data: GenTableColumnSchema
  495. ) -> GenTableColumnModel | None:
  496. """更新业务表字段。
  497. 参数:
  498. - id (int): 业务表字段ID。
  499. - data (GenTableColumnSchema): 业务表字段模型。
  500. 返回:
  501. - GenTableColumnModel | None: 业务表字段列表信息对象。
  502. """
  503. # 将对象转换为字典,避免SQLAlchemy直接操作对象时出现的状态问题
  504. data_dict = data.model_dump(exclude_unset=True)
  505. return await self.update(id=id, data=data_dict)
  506. async def delete_gen_table_column_by_table_id_crud(self, table_ids: list[int]) -> None:
  507. """根据业务表ID批量删除业务表字段。
  508. 参数:
  509. - table_ids (list[int]): 业务表ID列表。
  510. 返回:
  511. - None
  512. """
  513. # 先查询出这些表ID对应的所有字段ID
  514. query = select(GenTableColumnModel.id).where(GenTableColumnModel.table_id.in_(table_ids))
  515. result = await self.auth.db.execute(query)
  516. column_ids = [row[0] for row in result.fetchall()]
  517. # 如果有字段ID,则删除这些字段
  518. if column_ids:
  519. await self.delete(ids=column_ids)
  520. async def delete_gen_table_column_by_column_id_crud(self, column_ids: list[int]) -> None:
  521. """根据业务表字段ID批量删除业务表字段。
  522. 参数:
  523. - column_ids (list[int]): 业务表字段ID列表。
  524. 返回:
  525. - None
  526. """
  527. return await self.delete(ids=column_ids)