| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424 |
- import io
- import os
- import re
- import zipfile
- from collections.abc import Callable
- from typing import Any
- import anyio
- import sqlglot
- from sqlglot.expressions import (
- Alter,
- Comment,
- Create,
- Delete,
- Drop,
- Insert,
- Table,
- TruncateTable,
- Update,
- )
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.common.constant import GenConstant
- from app.config.path_conf import BASE_DIR
- from app.config.setting import settings
- from app.core.exceptions import CustomException
- from app.core.logger import log
- from .crud import GenTableColumnCRUD, GenTableCRUD
- from .schema import (
- GenSyncColumnChange,
- GenSyncPreviewSchema,
- GenTableColumnOutSchema,
- GenTableColumnSchema,
- GenTableOutSchema,
- GenTableQueryParam,
- GenTableSchema,
- )
- from .tools.gen_util import GenUtils
- from .tools.jinja2_template_util import Jinja2TemplateUtil
- def handle_service_exception(func: Callable) -> Callable:
- """
- 服务层异步方法装饰器:透传 CustomException,其余异常包装为 CustomException。
- 参数:
- - func (Callable): 被装饰的异步可调用对象。
- 返回:
- - Callable: 包装后的可调用对象(异步)。
- """
- async def wrapper(*args, **kwargs):
- try:
- return await func(*args, **kwargs)
- except CustomException:
- raise
- except Exception as e:
- raise CustomException(msg=f"{func.__name__}执行失败: {e!s}")
- return wrapper
- _MENU_TYPE_CATALOG = 1 # 与 sys_menu.type、前端 MenuTypeEnum.CATALOG 一致
- _MENU_TYPE_MENU = 2
- class GenTableService:
- """代码生成业务表服务层"""
- @classmethod
- async def _effective_package_name(
- cls, auth: AuthSchema, parent_catalog_id: int | None, package_name: str | None
- ) -> str:
- """根据「是否选择上级目录」计算最终包名(分系统根目录)。
- 规则(与你描述一致):
- - **未选上级目录**:认为是「新分系统」,包名固定为 ``module_目录``(即 ``module_xxx``)。
- - **已选上级目录**:认为是「分系统内新模块」,包名继承上级目录对应的 ``module_xxx``。
- """
- pn = (package_name or "").strip()
- # 1) 选择上级目录:从上级菜单 route_path 第一段推断 module_xxx
- if parent_catalog_id is not None:
- from app.api.v1.module_system.menu.crud import MenuCRUD
- m = await MenuCRUD(auth).get_by_id_crud(parent_catalog_id)
- if not m:
- raise CustomException(msg="上级菜单不存在")
- route_path = (getattr(m, "route_path", None) or "").strip()
- # 期望形如 /module_xxx 或 /module_xxx/yyy
- seg = route_path.strip("/").split("/", 1)[0] if route_path else ""
- seg = (seg or "").strip()
- if seg:
- return seg if seg.startswith("module_") else f"module_{seg}"
- # 路由缺失则回退包名字段
- if pn:
- return pn if pn.startswith("module_") else f"module_{pn}"
- raise CustomException(msg="无法从上级目录推断分系统包名,请先完善上级目录路由")
- # 2) 未选上级目录:以包名字段为准,并确保 module_ 前缀
- if not pn:
- raise CustomException(msg="包名不能为空")
- return pn if pn.startswith("module_") else f"module_{pn}"
- @classmethod
- async def _assert_parent_menu_is_catalog(cls, auth: AuthSchema, parent_menu_id: int | None) -> None:
- """上级菜单仅允许目录:与前端树只展示目录一致,避免挂到菜单/按钮下。"""
- if parent_menu_id is None:
- return
- from app.api.v1.module_system.menu.crud import MenuCRUD
- m = await MenuCRUD(auth).get_by_id_crud(parent_menu_id)
- if not m:
- raise CustomException(msg="上级菜单不存在")
- if m.type != _MENU_TYPE_CATALOG:
- raise CustomException(msg="上级菜单须选择目录类型")
- @classmethod
- def _menu_route_first_segment(
- cls, parent_catalog_id: int | None, package_name: str, module_name: str | None
- ) -> str:
- """前端页面路由首段(与菜单 ``route_path`` 第一段一致)。
- 统一规则:始终使用分系统包名 ``module_xxx`` 作为路由首段。
- - **无上级目录**:``/module_xxx/...``(新分系统)
- - **有上级目录**:``/module_xxx/...``(继承上级所属分系统)
- """
- pn = (package_name or "").strip()
- if not pn:
- raise CustomException(msg="包名不能为空")
- return pn if pn.startswith("module_") else f"module_{pn}"
- @classmethod
- def _catalog_menu_dir_key(
- cls, parent_catalog_id: int | None, package_name: str, module_name: str | None
- ) -> str:
- """菜单上「模块目录」节点的 name(与路由第一段 package 独立)。
- 统一为 **目录 → 菜单 → 按钮**:
- - 目录节点固定为 ``module_name``(你填写的“模块”)
- - 是否选择上级目录,仅影响分系统根 ``module_xxx`` 的推断方式(见 ``_effective_package_name``)
- """
- pn = (package_name or "").strip()
- mn = (module_name or "").strip()
- if not pn:
- raise CustomException(msg="包名不能为空")
- if not mn:
- raise CustomException(msg="模块名不能为空")
- return mn
- @classmethod
- async def _get_or_create_package_directory_menu(
- cls,
- menu_crud: Any,
- parent_catalog_id: int | None,
- package_name: str,
- module_name: str | None,
- business_name: str,
- ) -> int:
- """创建或复用 type=1 模块目录;固定为「目录 → 菜单 → 按钮」中的第一层目录。"""
- from app.api.v1.module_system.menu.schema import MenuCreateSchema
- from app.utils.common_util import CamelCaseUtil
- pn = (package_name or "").strip()
- if not pn:
- raise CustomException(msg="包名不能为空")
- mn = (module_name or "").strip()
- dir_key = cls._catalog_menu_dir_key(parent_catalog_id, pn, module_name)
- if parent_catalog_id is not None:
- existing = await menu_crud.get(
- name=dir_key, type=_MENU_TYPE_CATALOG, parent_id=parent_catalog_id
- )
- else:
- existing = await menu_crud.get(
- name=dir_key, type=_MENU_TYPE_CATALOG, parent_id=("None", None)
- )
- if existing:
- log.info(
- f"代码生成:复用模块目录菜单 id={existing.id} name={dir_key!r} parent={parent_catalog_id!r}"
- )
- return int(existing.id)
- route_first = cls._menu_route_first_segment(parent_catalog_id, pn, module_name)
- # 目录菜单固定跳到模块根:/{module_xxx}/{module_name}
- catalog_route_path = f"/{route_first}/{mn}"
- redirect = f"/{route_first}/{mn}"
- # route_name 须唯一且体现「分系统+模块目录」,勿仅用 package(会与 module_example 根混淆)
- catalog_route_name = CamelCaseUtil.snake_to_camel(f"{route_first}_{mn}")
- created = await menu_crud.create(
- MenuCreateSchema(
- name=dir_key,
- type=_MENU_TYPE_CATALOG,
- order=9999,
- permission=None,
- icon="menu",
- route_name=catalog_route_name,
- route_path=catalog_route_path,
- component_path=None,
- redirect=redirect,
- hidden=False,
- keep_alive=True,
- always_show=False,
- title=dir_key,
- params=None,
- affix=False,
- parent_id=parent_catalog_id,
- status="0",
- description="模块目录(代码生成)",
- )
- )
- log.info(
- f"代码生成:新建模块目录菜单 id={created.id} name={dir_key!r} under_parent={parent_catalog_id!r}"
- )
- return int(created.id)
- @classmethod
- def normalize_and_validate_master_sub(cls, data: GenTableSchema) -> None:
- """
- 主子表业务规则:子表表名与外键列同填或同空;子表表名不得与主表相同。
- 参数:
- - data (GenTableSchema): 主表配置。
- 返回:
- - None
- 异常:
- - CustomException: 规则不满足时抛出。
- """
- sn = data.sub_table_name
- fk = data.sub_table_fk_name
- if bool(sn) ^ bool(fk):
- raise CustomException(msg="子表表名与子表外键列须同时填写或同时留空")
- tn = (data.table_name or "").strip()
- if sn and fk and sn == tn:
- raise CustomException(msg="子表表名不能与主表表名相同")
- @classmethod
- @handle_service_exception
- async def get_gen_table_detail_service(cls, auth: AuthSchema, table_id: int) -> dict:
- """获取详细信息。
- 参数:
- - auth (AuthSchema): 认证信息。
- - table_id (int): 业务表ID。
- 返回:
- - dict: 包含业务表详细信息的字典。
- """
- gen_table = await cls.get_gen_table_by_id_service(auth, table_id)
- return gen_table.model_dump()
- @classmethod
- @handle_service_exception
- async def get_gen_table_list_service(
- cls, auth: AuthSchema, search: GenTableQueryParam
- ) -> list[dict]:
- """
- 获取代码生成业务表列表信息。
- 参数:
- - auth (AuthSchema): 认证信息。
- - search (GenTableQueryParam): 查询参数模型。
- 返回:
- - list[dict]: 包含业务表列表信息的字典列表。
- """
- gen_table_list_result = await GenTableCRUD(auth=auth).get_gen_table_list(search)
- return [GenTableOutSchema.model_validate(obj).model_dump() for obj in gen_table_list_result]
- @classmethod
- @handle_service_exception
- async def get_gen_table_page_service(
- cls,
- auth: AuthSchema,
- page_no: int,
- page_size: int,
- search: GenTableQueryParam,
- order_by: list[dict[str, str]] | None = None,
- ) -> dict:
- """
- 分页查询代码生成业务表(数据库 OFFSET/LIMIT)。
- 参数:
- - auth (AuthSchema): 认证信息。
- - page_no (int): 页码。
- - page_size (int): 每页条数。
- - search (GenTableQueryParam): 查询条件。
- - order_by (list[dict[str, str]] | None): 排序。
- 返回:
- - dict: 分页结果。
- """
- offset = (page_no - 1) * page_size
- order = order_by or [{"created_time": "desc"}]
- return await GenTableCRUD(auth=auth).page(
- offset=offset,
- limit=page_size,
- order_by=order,
- search=search.__dict__,
- out_schema=GenTableOutSchema,
- )
- @classmethod
- @handle_service_exception
- async def get_gen_db_table_list_service(
- cls, auth: AuthSchema, search: GenTableQueryParam
- ) -> list[Any]:
- """获取数据库表列表。
- 参数:
- - auth (AuthSchema): 认证信息。
- - search (GenTableQueryParam): 查询参数模型。
- 返回:
- - list[Any]: 包含数据库表列表信息的任意类型列表。
- """
- gen_db_table_list_result = await GenTableCRUD(auth=auth).get_db_table_list(search)
- return gen_db_table_list_result
- @classmethod
- @handle_service_exception
- async def get_gen_db_table_page_service(
- cls,
- auth: AuthSchema,
- page_no: int,
- page_size: int,
- search: GenTableQueryParam,
- ) -> dict[str, Any]:
- """
- 数据库表列表分页(数据库侧 OFFSET/LIMIT)。
- 参数:
- - auth (AuthSchema): 认证信息。
- - page_no (int): 页码。
- - page_size (int): 每页条数。
- - search (GenTableQueryParam): 查询条件。
- 返回:
- - dict[str, Any]: 含 items、total、has_next 等字段。
- """
- offset = (page_no - 1) * page_size
- items, total = await GenTableCRUD(auth=auth).get_db_table_page(
- search=search, offset=offset, limit=page_size
- )
- return {
- "items": items,
- "total": total,
- "page_no": page_no,
- "page_size": page_size,
- "has_next": offset + page_size < total,
- }
- @classmethod
- @handle_service_exception
- async def get_gen_db_table_list_by_name_service(
- cls, auth: AuthSchema, table_names: list[str]
- ) -> list[GenTableOutSchema]:
- """根据表名称组获取数据库表信息。
- 参数:
- - auth (AuthSchema): 认证信息。
- - table_names (list[str]): 业务表名称列表。
- 返回:
- - list[GenTableOutSchema]: 包含业务表详细信息的模型列表。
- """
- gen_db_table_list_result = await GenTableCRUD(auth).get_db_table_list_by_names(table_names)
- # 修复:将GenDBTableSchema对象转换为字典后再传递给GenTableOutSchema
- result = [
- GenTableOutSchema(**gen_table.model_dump()) for gen_table in gen_db_table_list_result
- ]
- return result
- @classmethod
- @handle_service_exception
- async def import_gen_table_service(
- cls, auth: AuthSchema, gen_table_list: list[GenTableOutSchema]
- ) -> bool:
- """导入表结构到生成器。
- 参数:
- - auth (AuthSchema): 认证信息。
- - gen_table_list (list[GenTableOutSchema]): 包含业务表详细信息的模型列表。
- 返回:
- - bool: 成功时返回True,失败时抛出异常。
- """
- # 检查是否有表需要导入
- if not gen_table_list:
- raise CustomException(msg="导入的表结构不能为空")
- try:
- for table in gen_table_list:
- _row = {
- k: v
- for k, v in table.model_dump().items()
- if k in GenTableSchema.model_fields
- }
- cls.normalize_and_validate_master_sub(GenTableSchema.model_validate(_row))
- table_name = table.table_name
- # 检查表是否已存在
- existing_table = await GenTableCRUD(auth).get_gen_table_by_name(table_name)
- if existing_table:
- raise CustomException(msg=f"以下表已存在,不能重复导入: {table_name}")
- GenUtils.init_table(table)
- if not table.columns:
- table.columns = []
- add_gen_table = await GenTableCRUD(auth).add_gen_table(
- GenTableSchema.model_validate(table.model_dump())
- )
- gen_table_columns = await GenTableColumnCRUD(auth).get_gen_db_table_columns_by_name(
- table_name
- )
- if len(gen_table_columns) > 0:
- table.id = add_gen_table.id
- for column in gen_table_columns:
- column_schema = GenTableColumnSchema(
- table_id=table.id,
- column_name=column.column_name,
- column_comment=column.column_comment,
- column_type=column.column_type,
- column_length=column.column_length,
- column_default=column.column_default,
- is_pk=column.is_pk,
- is_increment=column.is_increment,
- is_nullable=column.is_nullable,
- is_unique=column.is_unique,
- sort=column.sort,
- python_type=column.python_type,
- python_field=column.python_field,
- )
- GenUtils.init_column_field(column_schema, table)
- await GenTableColumnCRUD(auth).create_gen_table_column_crud(column_schema)
- return True
- except Exception as e:
- raise CustomException(msg=f"导入失败, {e!s}")
- @classmethod
- @handle_service_exception
- async def create_table_service(cls, auth: AuthSchema, sql: str) -> bool | None:
- """创建表结构并导入至代码生成模块。
- 参数:
- - auth (AuthSchema): 认证信息。
- - sql (str): 包含`CREATE TABLE`语句的SQL字符串。
- 返回:
- - bool | None: 成功时返回True,失败时抛出异常。
- """
- # 验证SQL非空
- if not sql or not sql.strip():
- raise CustomException(msg="SQL语句不能为空")
- try:
- # 解析SQL语句
- sql_statements = sqlglot.parse(sql, dialect=settings.DATABASE_TYPE)
- if not sql_statements:
- raise CustomException(msg="无法解析SQL语句,请检查SQL语法")
- # 校验 SQL 是否为合法的建表语句集合:
- # - 允许:CREATE TABLE + COMMENT ON TABLE/COLUMN +(可选)ALTER TABLE ADD CONSTRAINT ... FOREIGN KEY ...
- # - 禁止:DROP/DELETE/INSERT/UPDATE/TRUNCATE 等破坏性语句
- has_create = any(isinstance(s, Create) for s in sql_statements)
- if not has_create:
- raise CustomException(msg="sql语句不是合法的建表语句:缺少 CREATE TABLE")
- forbidden = (Delete, Drop, Insert, TruncateTable, Update)
- if any(isinstance(s, forbidden) for s in sql_statements):
- raise CustomException(msg="sql语句包含禁止的关键操作(DROP/DELETE/INSERT/UPDATE/TRUNCATE)")
- # 获取要创建的表名
- table_names = []
- for sql_statement in sql_statements:
- if isinstance(sql_statement, Create):
- table = sql_statement.find(Table)
- if table and table.name:
- table_names.append(table.name)
- table_names = list(set(table_names))
- # 创建CRUD实例
- gen_table_crud = GenTableCRUD(auth=auth)
- # 检查每个表是否已存在
- for table_name in table_names:
- # 检查数据库中是否已存在该表
- if await gen_table_crud.check_table_exists(table_name):
- raise CustomException(msg=f"表 {table_name} 已存在,请检查并修改表名后重试")
- # 检查代码生成模块中是否已导入该表
- existing_table = await gen_table_crud.get_gen_table_by_name(table_name)
- if existing_table:
- raise CustomException(
- msg=f"表 {table_name} 已在代码生成模块中存在,请检查并修改表名后重试"
- )
- # 表不存在,执行SQL语句创建表
- for sql_statement in sql_statements:
- # 只执行白名单语句:Create / Comment /(受限)Alter
- if not isinstance(sql_statement, (Create, Comment, Alter)):
- continue
- exc_sql = sql_statement.sql(dialect=settings.DATABASE_TYPE)
- log.info(f"执行SQL语句: {exc_sql}")
- # ALTER 仅允许添加外键约束,避免任意 ALTER 带来的破坏性
- if isinstance(sql_statement, Alter):
- upper = exc_sql.upper()
- allow = (
- "ALTER TABLE" in upper
- and "ADD" in upper
- and "CONSTRAINT" in upper
- and "FOREIGN KEY" in upper
- and "DROP" not in upper
- and "RENAME" not in upper
- and "SET " not in upper
- )
- if not allow:
- raise CustomException(
- msg="仅允许 ALTER TABLE ADD CONSTRAINT ... FOREIGN KEY ...(拒绝其它 ALTER)"
- )
- if not await gen_table_crud.execute_sql(exc_sql):
- raise CustomException(msg=f"执行SQL语句 {exc_sql} 失败,请检查数据库")
- return True
- except Exception as e:
- raise CustomException(msg=f"创建表结构失败: {e!s}")
- @classmethod
- @handle_service_exception
- async def update_gen_table_service(
- cls, auth: AuthSchema, data: GenTableSchema, table_id: int
- ) -> dict[str, Any]:
- """编辑业务表信息。
- 参数:
- - auth (AuthSchema): 认证信息。
- - data (GenTableSchema): 包含业务表详细信息的模型。
- - table_id (int): 业务表ID。
- 返回:
- - dict[str, Any]: 更新后的业务表信息。
- """
- # 处理params为None的情况
- gen_table_info = await cls.get_gen_table_by_id_service(auth, table_id)
- if gen_table_info.id:
- try:
- cls.normalize_and_validate_master_sub(data)
- await cls._assert_parent_menu_is_catalog(auth, data.parent_menu_id)
- # 直接调用edit_gen_table方法,它会在内部处理排除嵌套字段的逻辑
- result = await GenTableCRUD(auth).edit_gen_table(table_id, data)
- if not result:
- raise CustomException(msg="更新业务表信息失败")
- # 处理data.columns为None的情况
- if data.columns:
- for gen_table_column in data.columns:
- # 确保column有id字段
- if hasattr(gen_table_column, "id") and gen_table_column.id:
- column_schema = GenTableColumnSchema(**gen_table_column.model_dump())
- await GenTableColumnCRUD(auth).update_gen_table_column_crud(
- gen_table_column.id, column_schema
- )
- # 重新获取带有预加载关系的对象,避免懒加载导致的MissingGreenlet错误
- updated_gen_table = await GenTableCRUD(auth).get_gen_table_by_id(table_id)
- out = GenTableOutSchema.model_validate(updated_gen_table)
- await cls.set_pk_column(out)
- await cls.hydrate_sub_table(auth, out)
- return out.model_dump()
- except CustomException:
- raise
- except Exception as e:
- raise CustomException(msg=str(e))
- else:
- raise CustomException(msg="业务表不存在")
- @classmethod
- @handle_service_exception
- async def delete_gen_table_service(cls, auth: AuthSchema, ids: list[int]) -> None:
- """删除业务表信息(先删字段,再删表)。
- 参数:
- - auth (AuthSchema): 认证信息。
- - ids (list[int]): 业务表ID列表。
- 返回:
- - None
- """
- # 验证ID列表非空
- if not ids:
- raise CustomException(msg="ID列表不能为空")
- try:
- # 先删除相关的字段信息
- await GenTableColumnCRUD(auth=auth).delete_gen_table_column_by_table_id_crud(ids)
- # 再删除表信息
- await GenTableCRUD(auth=auth).delete_gen_table(ids)
- except Exception as e:
- raise CustomException(msg=str(e))
- @classmethod
- @handle_service_exception
- async def get_gen_table_by_id_service(
- cls, auth: AuthSchema, table_id: int
- ) -> GenTableOutSchema:
- """获取需要生成代码的业务表详细信息。
- 参数:
- - auth (AuthSchema): 认证信息。
- - table_id (int): 业务表ID。
- 返回:
- - GenTableOutSchema: 业务表详细信息模型。
- """
- gen_table = await GenTableCRUD(auth=auth).get_gen_table_by_id(table_id)
- if not gen_table:
- raise CustomException(msg="业务表不存在")
- result = GenTableOutSchema.model_validate(gen_table)
- await cls.set_pk_column(result)
- await cls.hydrate_sub_table(auth, result)
- return result
- @classmethod
- @handle_service_exception
- async def get_gen_table_all_service(cls, auth: AuthSchema) -> list[GenTableOutSchema]:
- """获取所有业务表信息(列表)。
- 参数:
- - auth (AuthSchema): 认证信息。
- 返回:
- - list[GenTableOutSchema]: 业务表详细信息模型列表。
- """
- gen_table_all = await GenTableCRUD(auth=auth).get_gen_table_all() or []
- result = []
- for gen_table in gen_table_all:
- try:
- table_out = GenTableOutSchema.model_validate(gen_table)
- result.append(table_out)
- except Exception as e:
- log.error(f"转换业务表时出错: {e!s}")
- continue
- return result
- @classmethod
- @handle_service_exception
- async def preview_code_service(cls, auth: AuthSchema, table_id: int) -> dict[str, Any]:
- """
- 预览代码(根据模板渲染内存结果)。
- 参数:
- - auth (AuthSchema): 认证信息。
- - table_id (int): 业务表ID。
- 返回:
- - dict[str, Any]: 文件名到渲染内容的映射。
- """
- raw = await GenTableCRUD(auth).get_gen_table_by_id(table_id)
- if not raw:
- raise CustomException(msg="业务表不存在")
- gen_table = GenTableOutSchema.model_validate(raw)
- await cls.set_pk_column(gen_table)
- await cls.hydrate_sub_table(auth, gen_table)
- cls._assert_master_sub_config_valid(gen_table)
- # 预览回显的路径/包名规则必须与「写入本地」一致:
- # - 选择上级目录:继承上级目录所属 module_xxx
- # - 未选上级目录:使用表单包名(并补齐 module_ 前缀)
- gen_table.package_name = await cls._effective_package_name(
- auth, gen_table.parent_menu_id, gen_table.package_name
- )
- # 子表与主表同分系统/同模块
- if gen_table.sub and gen_table.sub_table:
- gen_table.sub_table.package_name = gen_table.package_name
- if not (gen_table.sub_table.module_name or "").strip():
- gen_table.sub_table.module_name = gen_table.module_name
- env = Jinja2TemplateUtil.get_env()
- context = Jinja2TemplateUtil.prepare_context(gen_table)
- template_list = Jinja2TemplateUtil.get_template_list()
- preview_code_result: dict[str, Any] = {}
- for template in template_list:
- try:
- render_content = await env.get_template(template).render_async(**context)
- out_key = Jinja2TemplateUtil.get_file_name(template, gen_table)
- preview_code_result[out_key] = render_content
- except Exception as e:
- log.error(f"渲染模板 {template} 时出错: {e!s}")
- out_key = Jinja2TemplateUtil.get_file_name(template, gen_table)
- preview_code_result[out_key] = f"渲染错误: {e!s}"
- if gen_table.sub and gen_table.sub_table:
- sub_ctx = Jinja2TemplateUtil.prepare_sub_render_context(gen_table, gen_table.sub_table)
- sub_table = gen_table.sub_table
- for template in template_list:
- try:
- render_content = await env.get_template(template).render_async(**sub_ctx)
- out_key = Jinja2TemplateUtil.get_file_name(template, sub_table)
- preview_code_result[out_key] = render_content
- except Exception as e:
- log.error(f"渲染子表模板 {template} 时出错: {e!s}")
- out_key = Jinja2TemplateUtil.get_file_name(template, sub_table)
- preview_code_result[out_key] = f"渲染错误: {e!s}"
- return preview_code_result
- @classmethod
- @handle_service_exception
- async def generate_code_service(cls, auth: AuthSchema, table_name: str) -> bool:
- """生成代码至指定路径(安全写入+可跳过覆盖)。
- 菜单固定为 **目录(type=1) + 菜单(type=2) + 按钮(type=3)**:
- - **目录层(name)**:固定为你填写的 ``module_name``(模块)
- - **分系统根(package_name)**:
- - 选上级目录:继承上级目录所属 ``module_xxx``
- - 未选上级目录:使用你填写的包名(自动补齐 ``module_`` 前缀)
- - **页面路由**:``/{module_xxx}/{module_name}/{business_path}``
- - **组件路径**:``module_xxx/module_name/business_path/index``
- - **后端 HTTP 接口前缀**:由动态路由发现容器提供 ``/xxx``(``module_xxx``→去掉 ``module_``)
- 参数:
- - auth (AuthSchema): 认证信息。
- - table_name (str): 业务表名。
- 返回:
- - bool: 生成是否成功。
- """
- # 验证表名非空
- if not table_name or not table_name.strip():
- raise CustomException(msg="表名不能为空")
- env = Jinja2TemplateUtil.get_env()
- render_info = await cls.__get_gen_render_info(auth, table_name)
- gen_table_schema: GenTableOutSchema = render_info[3]
- from app.api.v1.module_system.menu.crud import MenuCRUD
- from app.api.v1.module_system.menu.schema import MenuCreateSchema
- from app.utils.common_util import CamelCaseUtil
- # 按“上级目录”规则矫正最终包名(分系统根)
- gen_table_schema.package_name = await cls._effective_package_name(
- auth, gen_table_schema.parent_menu_id, gen_table_schema.package_name
- )
- # 统一权限前缀(对齐 module_example/demo):
- # - module_xxx:module_name(操作在按钮/模板中追加 :query/:create...)
- pn = (gen_table_schema.package_name or "").strip()
- mn = (gen_table_schema.module_name or "").strip()
- if not mn:
- raise CustomException(msg="模块名不能为空")
- permission_prefix = ":".join([s for s in [pn, mn] if s])
- # 创建菜单 CRUD 实例
- menu_crud = MenuCRUD(auth)
- if not gen_table_schema.function_name:
- raise CustomException(msg="功能名称不能为空")
- if not gen_table_schema.package_name:
- raise CustomException(msg="包名不能为空")
- await cls._assert_parent_menu_is_catalog(auth, gen_table_schema.parent_menu_id)
- # 1. 目录 + 菜单 + 按钮:先取/建模块目录(名称规则见 _catalog_menu_dir_key)
- dir_menu_id = await cls._get_or_create_package_directory_menu(
- menu_crud,
- gen_table_schema.parent_menu_id,
- gen_table_schema.package_name,
- gen_table_schema.module_name,
- gen_table_schema.business_name or "",
- )
- # 检查同一模块目录下是否已有同名功能菜单(避免与其它模块下的同名功能冲突)
- existing_func_menu = await menu_crud.get(
- name=gen_table_schema.function_name,
- type=_MENU_TYPE_MENU,
- parent_id=dir_menu_id,
- )
- if existing_func_menu:
- raise CustomException(
- msg=f"该模块目录下功能菜单「{gen_table_schema.function_name}」已存在,不能重复创建"
- )
- route_seg = cls._menu_route_first_segment(
- gen_table_schema.parent_menu_id,
- gen_table_schema.package_name or "",
- gen_table_schema.module_name,
- )
- _pn = (gen_table_schema.package_name or "").strip()
- _mn = (gen_table_schema.module_name or "").strip()
- if not _mn:
- raise CustomException(msg="模块名不能为空")
- # 与 Jinja2TemplateUtil.get_file_name 统一:module_xxx/{module_name}
- _route_path = f"/{route_seg}/{_mn}"
- _component_path = f"{_pn}/{_mn}/index"
- # 创建功能菜单(类型=2:菜单)
- parent_menu = await menu_crud.create(
- MenuCreateSchema(
- name=gen_table_schema.function_name,
- type=_MENU_TYPE_MENU,
- order=9999,
- permission=f"{permission_prefix}:query",
- icon="menu",
- # route_name 使用模块名(对齐 module_example/demo:/example/demo)
- route_name=CamelCaseUtil.snake_to_camel(_mn),
- route_path=_route_path,
- component_path=_component_path,
- redirect=None,
- hidden=False,
- keep_alive=True,
- always_show=False,
- title=gen_table_schema.function_name,
- params=None,
- affix=False,
- parent_id=dir_menu_id, # 使用目录菜单ID或用户指定的parent_menu_id作为父ID
- status="0",
- description=f"{gen_table_schema.function_name}功能菜单",
- )
- )
- # 创建按钮权限(类型=3:按钮/权限)
- buttons = [
- {
- "name": f"{gen_table_schema.function_name}查询",
- "permission": f"{permission_prefix}:query",
- "order": 1,
- },
- {
- "name": f"{gen_table_schema.function_name}详情",
- "permission": f"{permission_prefix}:detail",
- "order": 2,
- },
- {
- "name": f"{gen_table_schema.function_name}新增",
- "permission": f"{permission_prefix}:create",
- "order": 3,
- },
- {
- "name": f"{gen_table_schema.function_name}修改",
- "permission": f"{permission_prefix}:update",
- "order": 4,
- },
- {
- "name": f"{gen_table_schema.function_name}删除",
- "permission": f"{permission_prefix}:delete",
- "order": 5,
- },
- {
- "name": f"{gen_table_schema.function_name}批量状态修改",
- "permission": f"{permission_prefix}:patch",
- "order": 6,
- },
- {
- "name": f"{gen_table_schema.function_name}导出",
- "permission": f"{permission_prefix}:export",
- "order": 7,
- },
- {
- "name": f"{gen_table_schema.function_name}导入",
- "permission": f"{permission_prefix}:import",
- "order": 8,
- },
- {
- "name": f"{gen_table_schema.function_name}下载导入模板",
- "permission": f"{permission_prefix}:download",
- "order": 9,
- },
- ]
- for button in buttons:
- # 检查按钮权限是否已存在
- await menu_crud.create(
- MenuCreateSchema(
- name=button["name"],
- type=3,
- order=button["order"],
- permission=button["permission"],
- icon=None,
- route_name=None,
- route_path=None,
- component_path=None,
- redirect=None,
- hidden=False,
- keep_alive=True,
- always_show=False,
- title=button["name"],
- params=None,
- affix=False,
- parent_id=parent_menu.id,
- status="0",
- description=f"{gen_table_schema.function_name}功能按钮",
- )
- )
- log.info(f"成功创建按钮权限: {button['name']}")
- log.info(f"成功创建{gen_table_schema.function_name}菜单及按钮权限")
- # 2. 菜单创建成功后,再生成页面代码(主表 + 可选子表)
- async def _write_templates(
- templates: list[str], ctx: dict[str, Any], table_schema: GenTableOutSchema
- ) -> None:
- for template in templates:
- try:
- render_content = await env.get_template(template).render_async(**ctx)
- file_name = Jinja2TemplateUtil.get_file_name(template, table_schema)
- full_path = BASE_DIR.parent.joinpath(file_name)
- gen_path = str(full_path)
- if not gen_path:
- raise CustomException(msg="【代码生成】生成路径为空")
- os.makedirs(os.path.dirname(gen_path), exist_ok=True)
- await anyio.Path(gen_path).write_text(render_content, encoding="utf-8")
- # Python 插件目录需保证包层级可导入:为分系统/模块目录补齐 __init__.py
- # 生成规则固定为 backend/app/plugin/{module_xxx}/{module_name}/...
- pn = (table_schema.package_name or "").strip()
- mn = (table_schema.module_name or "").strip()
- if pn and mn:
- plugin_base = BASE_DIR.parent.joinpath(f"backend/app/plugin/{pn}")
- module_base = plugin_base.joinpath(mn)
- for d in (plugin_base, module_base):
- init_path = d.joinpath("__init__.py")
- if not init_path.exists():
- os.makedirs(str(d), exist_ok=True)
- await anyio.Path(str(init_path)).write_text(
- "# -*- coding: utf-8 -*-", encoding="utf-8"
- )
- except Exception as e:
- raise CustomException(
- msg=f"渲染模板失败,表名:{table_schema.table_name},详细错误信息:{e!s}"
- )
- await _write_templates(render_info[0], render_info[2], gen_table_schema)
- if gen_table_schema.sub and gen_table_schema.sub_table:
- sub_ctx = Jinja2TemplateUtil.prepare_sub_render_context(
- gen_table_schema, gen_table_schema.sub_table
- )
- await _write_templates(render_info[0], sub_ctx, gen_table_schema.sub_table)
- return True
- @classmethod
- @handle_service_exception
- async def batch_gen_code_service(cls, auth: AuthSchema, table_names: list[str]) -> bytes:
- """
- 批量生成代码并打包为ZIP。
- - 备注:内存生成并压缩,兼容多模板类型;供下载使用。
- 参数:
- - auth (AuthSchema): 认证信息。
- - table_names (list[str]): 业务表名列表。
- 返回:
- - bytes: 包含所有生成代码的ZIP文件内容。
- """
- valid_names = [t.strip() for t in table_names if t and str(t).strip()]
- if not valid_names:
- raise CustomException(msg="表名列表不能为空")
- zip_buffer = io.BytesIO()
- file_count = 0
- with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
- for table_name in valid_names:
- try:
- env = Jinja2TemplateUtil.get_env()
- render_info = await cls.__get_gen_render_info(auth, table_name)
- gen_tbl = render_info[3]
- for template_file, output_file in zip(
- render_info[0], render_info[1], strict=False
- ):
- render_content = await env.get_template(template_file).render_async(
- **render_info[2]
- )
- zip_file.writestr(output_file, render_content)
- file_count += 1
- if gen_tbl.sub and gen_tbl.sub_table:
- sub_ctx = Jinja2TemplateUtil.prepare_sub_render_context(
- gen_tbl, gen_tbl.sub_table
- )
- sub_tbl = gen_tbl.sub_table
- for template_file in render_info[0]:
- render_content = await env.get_template(template_file).render_async(
- **sub_ctx
- )
- out_path = Jinja2TemplateUtil.get_file_name(template_file, sub_tbl)
- zip_file.writestr(out_path, render_content)
- file_count += 1
- except Exception as e:
- log.error(f"批量生成代码时处理表 {table_name} 出错: {e!s}")
- # 继续处理其他表,不中断整个过程
- continue
- zip_data = zip_buffer.getvalue()
- zip_buffer.close()
- if file_count == 0:
- raise CustomException(
- msg="未能生成任何代码文件:请检查所选表是否存在于代码生成配置中,或主子表、字段配置是否正确"
- )
- return zip_data
- @classmethod
- @handle_service_exception
- async def sync_db_service(cls, auth: AuthSchema, table_name: str, _sync_sub: bool = True) -> None:
- """
- 同步数据库表结构到业务表。
- 参数:
- - auth (AuthSchema): 认证信息。
- - table_name (str): 业务表名。
- 返回:
- - None
- """
- # 验证表名非空
- if not table_name or not table_name.strip():
- raise CustomException(msg="表名不能为空")
- gen_table = await GenTableCRUD(auth).get_gen_table_by_name(table_name)
- if not gen_table:
- raise CustomException(msg="业务表不存在")
- table = GenTableOutSchema.model_validate(gen_table)
- if not table.id:
- raise CustomException(msg="业务表ID不能为空")
- table_columns = table.columns or []
- table_column_map = {column.column_name: column for column in table_columns}
- # 确保db_table_columns始终是列表类型,避免None值
- db_table_columns = (
- await GenTableColumnCRUD(auth).get_gen_db_table_columns_by_name(table_name) or []
- )
- db_table_columns = [col for col in db_table_columns if col is not None]
- db_table_column_names = [column.column_name for column in db_table_columns]
- try:
- # 参考 RuoYi:同步 DB 元信息,但尽量保留用户“生成配置”字段(dict/html/query/python_field...)
- preserve_keys = {
- "dict_type",
- "query_type",
- "python_field",
- "python_type",
- "is_insert",
- "is_edit",
- "is_list",
- "is_query",
- "sort",
- }
- for column in db_table_columns:
- GenUtils.init_column_field(column, table)
- if column.column_name in table_column_map:
- prev_column = table_column_map[column.column_name]
- if getattr(prev_column, "id", None):
- column.id = prev_column.id
- prev_dump = prev_column.model_dump() if hasattr(prev_column, "model_dump") else {}
- for k in preserve_keys:
- if k in prev_dump and prev_dump.get(k) not in (None, ""):
- setattr(column, k, prev_dump.get(k))
- # html_type:保留用户显式选择的非 input;若仍是默认 input,则允许按 DB 重新推断
- prev_html = prev_dump.get("html_type")
- if prev_html not in (None, "", GenConstant.HTML_INPUT):
- column.html_type = prev_html
- # 主键强约束:避免历史配置导致主键出现在新增/编辑/列表/查询
- if bool(getattr(column, "is_pk", False)):
- column.is_insert = False
- column.is_edit = False
- column.is_list = False
- column.is_query = False
- column.query_type = None
- # is_nullable:主键列以 DB 为准,其余保留用户设置
- if not bool(getattr(column, "is_pk", False)) and hasattr(prev_column, "is_nullable"):
- column.is_nullable = prev_column.is_nullable
- if hasattr(column, "id") and column.id:
- await GenTableColumnCRUD(auth).update_gen_table_column_crud(
- column.id, column
- )
- else:
- await GenTableColumnCRUD(auth).create_gen_table_column_crud(column)
- else:
- # 设置table_id以确保新字段能正确关联到表
- column.table_id = table.id
- await GenTableColumnCRUD(auth).create_gen_table_column_crud(column)
- del_columns = [
- column
- for column in table_columns
- if column.column_name not in db_table_column_names
- ]
- if del_columns:
- for column in del_columns:
- if hasattr(column, "id") and column.id:
- await GenTableColumnCRUD(auth).delete_gen_table_column_by_column_id_crud([
- column.id
- ])
- # 主子表:若子表也已导入生成器,则一并同步子表配置(更接近 RuoYi 体验)
- sn = (table.sub_table_name or "").strip()
- fk = (table.sub_table_fk_name or "").strip()
- if _sync_sub and sn and fk:
- sub_cfg = await GenTableCRUD(auth).get_gen_table_by_name(sn)
- if sub_cfg:
- await cls.sync_db_service(auth, sn, _sync_sub=False)
- except Exception as e:
- raise CustomException(msg=f"同步失败: {e!s}")
- @classmethod
- async def hydrate_sub_table(cls, auth: AuthSchema, gen_table: GenTableOutSchema) -> None:
- """
- 主子表:优先使用已导入的子表配置,否则回退为只读 DB 结构。
- 对齐 RuoYi:子表宜为独立 gen_table;主表仅引用。
- 参数:
- - auth (AuthSchema): 认证信息。
- - gen_table (GenTableOutSchema): 主表输出模型(原地填充 sub_table、master_sub_hint 等)。
- 返回:
- - None
- """
- gen_table.master_sub_hint = None
- sub_name_raw = (gen_table.sub_table_name or "").strip()
- fk_raw = (gen_table.sub_table_fk_name or "").strip()
- if not sub_name_raw and not fk_raw:
- gen_table.sub = False
- gen_table.sub_table = None
- return
- if sub_name_raw and not fk_raw:
- gen_table.sub = False
- gen_table.sub_table = None
- gen_table.master_sub_hint = "已填写子表表名,请同时填写「子表外键列」后再保存"
- return
- if fk_raw and not sub_name_raw:
- gen_table.sub = False
- gen_table.sub_table = None
- gen_table.master_sub_hint = "已填写子表外键列,请同时填写「子表表名」后再保存"
- return
- if sub_name_raw == (gen_table.table_name or "").strip():
- gen_table.sub = False
- gen_table.sub_table = None
- gen_table.master_sub_hint = "子表表名不能与主表表名相同"
- return
- # 1) 若子表已作为 gen_table 导入,则使用其 columns 配置(可控、可复用)
- try:
- sub_cfg_model = await GenTableCRUD(auth).get_gen_table_by_name(sub_name_raw, preload=["columns"])
- except Exception:
- sub_cfg_model = None
- if sub_cfg_model:
- sub_cfg = GenTableOutSchema.model_validate(sub_cfg_model)
- await cls.set_pk_column(sub_cfg)
- # 校验外键列存在于子表配置中
- fk_names = {c.column_name for c in (sub_cfg.columns or []) if c.column_name}
- if fk_raw not in fk_names:
- gen_table.sub = False
- gen_table.sub_table = None
- gen_table.master_sub_hint = (
- f"子表「{sub_name_raw}」已导入生成器,但其字段配置中不存在外键列「{fk_raw}」。"
- "请先在子表的字段配置中同步/保存后再生成。"
- )
- return
- gen_table.sub = True
- gen_table.sub_table = sub_cfg
- gen_table.master_sub_hint = (
- "主子表已启用:子表字段来自「已导入的子表配置」(更接近 RuoYi 的方式)。"
- "如需调整子表字段,请在列表中打开该子表进行配置。"
- )
- return
- # 2) 回退:仅从 DB 读取结构(只读,无法配置子表字段)
- try:
- gen_table_columns = await GenTableColumnCRUD(auth).get_gen_db_table_columns_by_name(
- sub_name_raw
- )
- except Exception as e:
- log.warning(f"获取子表 {sub_name_raw} 字段失败: {e!s}")
- gen_table.sub = False
- gen_table.sub_table = None
- gen_table.master_sub_hint = f"无法读取子表结构:{e!s}"
- return
- if not gen_table_columns:
- gen_table.sub = False
- gen_table.sub_table = None
- gen_table.master_sub_hint = (
- f"当前数据库中不存在表「{sub_name_raw}」或该表无列,请先建表再配置主子表"
- )
- return
- fk_names = {c.column_name for c in gen_table_columns if c.column_name}
- if fk_raw not in fk_names:
- gen_table.sub = False
- gen_table.sub_table = None
- gen_table.master_sub_hint = (
- f"子表「{sub_name_raw}」中不存在名为「{fk_raw}」的列,请核对外键列名"
- )
- return
- table_comment = await GenTableCRUD(auth).get_db_table_comment(sub_name_raw)
- sub = GenTableOutSchema.model_validate(
- {
- "id": -1,
- "table_name": sub_name_raw,
- "table_comment": table_comment or None,
- "class_name": GenUtils.convert_class_name(sub_name_raw),
- "package_name": gen_table.package_name,
- "module_name": gen_table.module_name,
- "business_name": sub_name_raw,
- "function_name": re.sub(r"(?:表|测试)", "", table_comment or "") or sub_name_raw,
- "sub_table_name": None,
- "sub_table_fk_name": None,
- "parent_menu_id": gen_table.parent_menu_id,
- "columns": [],
- "sub": False,
- "sub_table": None,
- }
- )
- for column in gen_table_columns:
- col_dump = column.model_dump()
- col_dump["table_id"] = -1
- col_schema = GenTableColumnSchema.model_validate(col_dump)
- GenUtils.init_column_field(col_schema, sub)
- if sub.columns is None:
- sub.columns = []
- sub.columns.append(GenTableColumnOutSchema(**col_schema.model_dump()))
- await cls.set_pk_column(sub)
- gen_table.sub = True
- gen_table.sub_table = sub
- gen_table.master_sub_hint = (
- "主子表已启用:当前子表仅从数据库结构读取(只读)。"
- f"若要像 RuoYi 那样可配置子表字段,请先在「导入」中把子表「{sub_name_raw}」也导入生成器。"
- )
- @classmethod
- def _sync_preview_diff(
- cls,
- current_cols: list[GenTableColumnOutSchema],
- db_cols: list[GenTableColumnOutSchema],
- ) -> tuple[list[str], list[str], list[GenSyncColumnChange], int]:
- cur_map = {c.column_name: c for c in (current_cols or []) if c and c.column_name}
- db_map = {c.column_name: c for c in (db_cols or []) if c and c.column_name}
- cur_names = set(cur_map.keys())
- db_names = set(db_map.keys())
- added = sorted(db_names - cur_names)
- removed = sorted(cur_names - db_names)
- changed: list[GenSyncColumnChange] = []
- unchanged = 0
- keys = [
- "column_type",
- "column_comment",
- "column_default",
- "column_length",
- "is_pk",
- "is_increment",
- "is_nullable",
- "is_unique",
- ]
- for name in sorted(cur_names & db_names):
- before = cur_map[name]
- after = db_map[name]
- diff_fields: list[str] = []
- b_dump = before.model_dump()
- a_dump = after.model_dump()
- for k in keys:
- if b_dump.get(k) != a_dump.get(k):
- diff_fields.append(k)
- if diff_fields:
- changed.append(
- GenSyncColumnChange(
- column_name=name,
- change_fields=diff_fields,
- before={k: b_dump.get(k) for k in keys},
- after={k: a_dump.get(k) for k in keys},
- )
- )
- else:
- unchanged += 1
- return added, removed, changed, unchanged
- @classmethod
- @handle_service_exception
- async def sync_db_preview_service(
- cls, auth: AuthSchema, table_name: str
- ) -> dict[str, Any]:
- """
- 同步数据库前差异预览(主表 + 可选子表)。
- 参数:
- - auth (AuthSchema): 认证信息。
- - table_name (str): 主表物理表名。
- 返回:
- - dict[str, Any]: 预览差异结构(可序列化)。
- 异常:
- - CustomException: 表名无效或业务表不存在等。
- """
- if not table_name or not table_name.strip():
- raise CustomException(msg="表名不能为空")
- gen_table = await GenTableCRUD(auth).get_gen_table_by_name(table_name, preload=["columns"])
- if not gen_table:
- raise CustomException(msg="业务表不存在")
- table = GenTableOutSchema.model_validate(gen_table)
- if not table.id:
- raise CustomException(msg="业务表ID不能为空")
- db_cols = await GenTableColumnCRUD(auth).get_gen_db_table_columns_by_name(table_name)
- added, removed, changed, unchanged = cls._sync_preview_diff(
- current_cols=table.columns or [],
- db_cols=db_cols or [],
- )
- preview = GenSyncPreviewSchema(
- table_name=table_name,
- added=added,
- removed=removed,
- changed=changed,
- unchanged=unchanged,
- )
- # 子表差异:如果启用主子表,则同时预览子表(无论子表是否已导入)
- sn = (table.sub_table_name or "").strip()
- fk = (table.sub_table_fk_name or "").strip()
- if sn and fk:
- preview.sub_table_name = sn
- # 优先取“已导入的子表配置”,否则用 DB 结构(只读)
- sub_cfg = await GenTableCRUD(auth).get_gen_table_by_name(sn, preload=["columns"])
- if sub_cfg:
- cur_sub_cols = GenTableOutSchema.model_validate(sub_cfg).columns or []
- else:
- cur_sub_cols = []
- db_sub_cols = await GenTableColumnCRUD(auth).get_gen_db_table_columns_by_name(sn)
- s_added, s_removed, s_changed, s_unchanged = cls._sync_preview_diff(
- current_cols=cur_sub_cols,
- db_cols=db_sub_cols or [],
- )
- preview.sub = GenSyncPreviewSchema(
- table_name=sn,
- added=s_added,
- removed=s_removed,
- changed=s_changed,
- unchanged=s_unchanged,
- )
- return preview.model_dump()
- @classmethod
- def _assert_master_sub_config_valid(cls, gen_table: GenTableOutSchema) -> None:
- """预览/生成前校验主子表配置是否可用。"""
- sn = (gen_table.sub_table_name or "").strip()
- fk = (gen_table.sub_table_fk_name or "").strip()
- if not sn and not fk:
- return
- if not sn or not fk:
- raise CustomException(
- msg=gen_table.master_sub_hint
- or "子表表名与子表外键列须同时填写或同时留空"
- )
- if not gen_table.sub_table:
- raise CustomException(
- msg=gen_table.master_sub_hint
- or "无法生成主子表代码:请确认子表已在当前数据库中存在,且外键列名正确"
- )
- @classmethod
- async def set_pk_column(cls, gen_table: GenTableOutSchema) -> None:
- """设置主键列信息(主表/子表)。
- - 备注:同时兼容`pk`布尔与`is_pk == '1'`字符串两种标识。
- 参数:
- - gen_table (GenTableOutSchema): 业务表详细信息模型。
- 返回:
- - None
- """
- if gen_table.columns:
- for column in gen_table.columns:
- is_pk = getattr(column, "is_pk", False)
- if bool(is_pk) if isinstance(is_pk, bool) else str(is_pk) == "1":
- gen_table.pk_column = column
- break
- # 如果没有找到主键列且有列存在,使用第一个列作为主键
- if gen_table.pk_column is None and gen_table.columns:
- gen_table.pk_column = gen_table.columns[0]
- @classmethod
- async def __get_gen_render_info(cls, auth: AuthSchema, table_name: str) -> list[Any]:
- """
- 获取生成代码渲染模板相关信息。
- 参数:
- - auth (AuthSchema): 认证对象。
- - table_name (str): 业务表名称。
- 返回:
- - list[Any]: [模板列表, 输出文件名列表, 渲染上下文, 业务表对象]。
- 异常:
- - CustomException: 当业务表不存在或数据转换失败时抛出。
- """
- gen_table_model = await GenTableCRUD(auth=auth).get_gen_table_by_name(table_name)
- # 检查表是否存在
- if gen_table_model is None:
- raise CustomException(msg=f"业务表 {table_name} 不存在")
- gen_table = GenTableOutSchema.model_validate(gen_table_model)
- # 生成代码时按“上级目录”规则矫正最终包名(不落库,仅影响本次生成/预览/下载/写入)
- gen_table.package_name = await cls._effective_package_name(
- auth, gen_table.parent_menu_id, gen_table.package_name
- )
- await cls.set_pk_column(gen_table)
- await cls.hydrate_sub_table(auth, gen_table)
- cls._assert_master_sub_config_valid(gen_table)
- context = Jinja2TemplateUtil.prepare_context(gen_table)
- template_list = Jinja2TemplateUtil.get_template_list()
- output_files = [
- Jinja2TemplateUtil.get_file_name(template, gen_table) for template in template_list
- ]
- return [template_list, output_files, context, gen_table]
- class GenTableColumnService:
- """代码生成业务表字段服务层"""
- @classmethod
- @handle_service_exception
- async def get_gen_table_column_list_by_table_id_service(
- cls, auth: AuthSchema, table_id: int
- ) -> list[dict[str, Any]]:
- """获取业务表字段列表信息(输出模型)。
- 参数:
- - auth (AuthSchema): 认证信息。
- - table_id (int): 业务表ID。
- 返回:
- - list[dict[str, Any]]: 业务表字段列表,每个元素为字段详细信息字典。
- """
- gen_table_column_list_result = await GenTableColumnCRUD(auth).list_gen_table_column_crud({
- "table_id": table_id
- })
- result = [
- GenTableColumnOutSchema.model_validate(gen_table_column).model_dump()
- for gen_table_column in gen_table_column_list_result
- ]
- return result
|