service.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. from typing import Any
  2. from app.api.v1.module_system.auth.schema import AuthSchema
  3. from app.core.base_schema import BatchSetAvailable
  4. from app.core.exceptions import CustomException
  5. from app.utils.common_util import (
  6. get_child_id_map,
  7. get_child_recursion,
  8. get_parent_id_map,
  9. get_parent_recursion,
  10. traversal_to_tree,
  11. )
  12. from .crud import MenuCRUD
  13. from .schema import (
  14. MenuCreateSchema,
  15. MenuOutSchema,
  16. MenuQueryParam,
  17. MenuUpdateSchema,
  18. )
  19. class MenuService:
  20. """
  21. 菜单模块服务层
  22. """
  23. @classmethod
  24. async def _validate_parent_child_type(
  25. cls, auth: AuthSchema, parent_id: int | None, child_type: int
  26. ) -> None:
  27. """
  28. 父子类型约束:目录下仅允许目录/菜单/外链;菜单下仅允许按钮;按钮与外链下不可挂子级。
  29. 无父级时仅允许目录、菜单、外链(与前端一致)。
  30. """
  31. if parent_id is None:
  32. if child_type not in (1, 2, 4):
  33. raise CustomException(msg="顶级菜单仅允许目录、菜单或外链类型")
  34. return
  35. parent = await MenuCRUD(auth).get_by_id_crud(id=parent_id)
  36. if not parent:
  37. raise CustomException(msg="父级菜单不存在")
  38. pt = parent.type
  39. if pt == 1:
  40. if child_type not in (1, 2, 4):
  41. raise CustomException(msg="目录下仅允许新增目录、菜单或外链")
  42. elif pt == 2:
  43. if child_type != 3:
  44. raise CustomException(msg="菜单下仅允许新增按钮")
  45. else:
  46. raise CustomException(msg="菜单或链接类型下不允许新增子菜单")
  47. @classmethod
  48. async def get_menu_detail_service(cls, auth: AuthSchema, id: int) -> dict:
  49. """
  50. 获取菜单详情。
  51. 参数:
  52. - auth (AuthSchema): 认证对象。
  53. - id (int): 菜单ID。
  54. 返回:
  55. - dict: 菜单详情对象。
  56. """
  57. menu = await MenuCRUD(auth).get_by_id_crud(id=id)
  58. # 创建实例后再设置parent_name属性
  59. menu_out = MenuOutSchema.model_validate(menu)
  60. if menu and menu.parent_id:
  61. parent = await MenuCRUD(auth).get_by_id_crud(id=menu.parent_id)
  62. if parent:
  63. menu_out.parent_name = parent.name
  64. return menu_out.model_dump()
  65. @classmethod
  66. async def get_menu_tree_service(
  67. cls,
  68. auth: AuthSchema,
  69. search: MenuQueryParam | None = None,
  70. order_by: list[dict] | None = None,
  71. ) -> list[dict]:
  72. """
  73. 获取菜单树形列表。
  74. 参数:
  75. - auth (AuthSchema): 认证对象。
  76. - search (MenuQueryParam | None): 查询参数对象。
  77. - order_by (list[dict] | None): 排序参数列表。
  78. 返回:
  79. - list[dict]: 菜单树形列表对象。
  80. """
  81. # 使用树形结构查询,预加载children关系
  82. menu_list = await MenuCRUD(auth).get_tree_list_crud(
  83. search=search.__dict__, order_by=order_by
  84. )
  85. # 转换为字典列表
  86. menu_dict_list = [MenuOutSchema.model_validate(menu).model_dump() for menu in menu_list]
  87. # 使用traversal_to_tree构建树形结构
  88. tree = traversal_to_tree(menu_dict_list)
  89. # 对外链菜单(type=4)转换路由数据
  90. MenuService._transform_external_menus(tree)
  91. return tree
  92. @staticmethod
  93. def _transform_external_menus(items: list[dict]) -> None:
  94. """递归转换外链菜单的路由和组件路径"""
  95. for item in items:
  96. if item.get("type") == 4:
  97. original_url = item.get("route_path", "")
  98. route_name = item.get("route_name") or "external"
  99. item["route_path"] = f"/{route_name}"
  100. item["component_path"] = "module_system/menu/ExternalLink"
  101. # 把原始 URL 存入 params,供 ExternalLink 组件读取
  102. item["params"] = [{"key": "url", "value": original_url}]
  103. if item.get("children"):
  104. MenuService._transform_external_menus(item["children"])
  105. @classmethod
  106. async def create_menu_service(cls, auth: AuthSchema, data: MenuCreateSchema) -> dict:
  107. """
  108. 创建菜单。
  109. 参数:
  110. - auth (AuthSchema): 认证对象。
  111. - data (MenuCreateSchema): 创建参数对象。
  112. 返回:
  113. - dict: 创建的菜单对象。
  114. """
  115. search: dict[str, Any] = {"title": data.title}
  116. if data.parent_id is not None:
  117. search["parent_id"] = data.parent_id
  118. menu = await MenuCRUD(auth).get(**search)
  119. if menu:
  120. raise CustomException(msg="创建失败,该菜单已存在")
  121. await cls._validate_parent_child_type(auth, data.parent_id, data.type)
  122. new_menu = await MenuCRUD(auth).create(data=data)
  123. new_menu_dict = MenuOutSchema.model_validate(new_menu).model_dump()
  124. return new_menu_dict
  125. @classmethod
  126. async def update_menu_service(cls, auth: AuthSchema, id: int, data: MenuUpdateSchema) -> dict:
  127. """
  128. 更新菜单。
  129. 参数:
  130. - auth (AuthSchema): 认证对象。
  131. - id (int): 菜单ID。
  132. - data (MenuUpdateSchema): 更新参数对象。
  133. 返回:
  134. - dict: 更新的菜单对象。
  135. """
  136. menu = await MenuCRUD(auth).get_by_id_crud(id=id)
  137. if not menu:
  138. raise CustomException(msg="更新失败,该菜单不存在")
  139. await cls._validate_parent_child_type(auth, data.parent_id, data.type)
  140. search: dict[str, Any] = {"title": data.title}
  141. if data.parent_id is not None:
  142. search["parent_id"] = data.parent_id
  143. exist_menu = await MenuCRUD(auth).get(**search)
  144. if exist_menu and exist_menu.id != id:
  145. raise CustomException(msg="更新失败,菜单标题重复")
  146. if data.parent_id:
  147. parent_menu = await MenuCRUD(auth).get_by_id_crud(id=data.parent_id)
  148. if not parent_menu:
  149. raise CustomException(msg="更新失败,父级菜单不存在")
  150. data.parent_name = parent_menu.title
  151. new_menu = await MenuCRUD(auth).update(id=id, data=data)
  152. await cls.set_menu_available_service(
  153. auth=auth, data=BatchSetAvailable(ids=[id], status=data.status)
  154. )
  155. new_menu_dict = MenuOutSchema.model_validate(new_menu).model_dump()
  156. return new_menu_dict
  157. @classmethod
  158. async def delete_menu_service(cls, auth: AuthSchema, ids: list[int]) -> None:
  159. """
  160. 删除菜单。
  161. 参数:
  162. - auth (AuthSchema): 认证对象。
  163. - ids (list[int]): 菜单ID列表。
  164. 返回:
  165. - None
  166. """
  167. if len(ids) < 1:
  168. raise CustomException(msg="删除失败,删除对象不能为空")
  169. # 获取所有菜单列表,用于构建树形关系
  170. all_menus = await MenuCRUD(auth).get_list_crud()
  171. # 构建子菜单ID映射
  172. child_id_map = get_child_id_map(model_list=all_menus)
  173. # 收集所有需要删除的菜单ID,包括直接指定的ID和它们的所有子菜单ID
  174. delete_ids_set = set()
  175. for id in ids:
  176. # 递归获取该ID的所有子菜单ID
  177. all_descendants = get_child_recursion(id=id, id_map=child_id_map)
  178. delete_ids_set.update(all_descendants)
  179. # 将集合转换为列表
  180. delete_ids = list(delete_ids_set)
  181. # 执行批量删除操作
  182. await MenuCRUD(auth).delete(ids=delete_ids)
  183. @classmethod
  184. async def set_menu_available_service(cls, auth: AuthSchema, data: BatchSetAvailable) -> None:
  185. """
  186. 递归获取所有父、子级菜单,然后批量修改菜单可用状态。
  187. 参数:
  188. - auth (AuthSchema): 认证对象。
  189. - data (BatchSetAvailable): 批量设置可用参数对象。
  190. 返回:
  191. - None
  192. """
  193. menu_list = await MenuCRUD(auth).get_list_crud()
  194. total_ids = []
  195. if data.status == "0":
  196. # 激活,则需要把所有父级菜单都激活
  197. id_map = get_parent_id_map(model_list=menu_list)
  198. for menu_id in data.ids:
  199. enable_ids = get_parent_recursion(id=menu_id, id_map=id_map)
  200. total_ids.extend(enable_ids)
  201. else:
  202. # 禁止,则需要把所有子级菜单都禁止
  203. id_map = get_child_id_map(model_list=menu_list)
  204. for menu_id in data.ids:
  205. disable_ids = get_child_recursion(id=menu_id, id_map=id_map)
  206. total_ids.extend(disable_ids)
  207. await MenuCRUD(auth).set_available_crud(ids=total_ids, status=data.status)