| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- from typing import Any
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.core.exceptions import CustomException
- from app.core.logger import log
- from app.utils.snowflake import get_snowflake_id_str
- from .crud import DepartmentCRUD
- from .enums import DepartmentStatusEnum
- from .schema import (
- DepartmentCreateSchema,
- DepartmentOutSchema,
- DepartmentTreeSchema,
- DepartmentUpdateSchema,
- )
class DepartmentService:
    """Department service layer.

    Applies the business rules for creating, updating, deleting, querying and
    re-parenting departments. Persistence is delegated to ``DepartmentCRUD``;
    every failure is reported as a ``CustomException``.
    """

    # Maximum number of rows loaded by a single query when building the full
    # tree or the option list; rows beyond this limit are not returned.
    _MAX_UNPAGED_ITEMS = 1000

    # ------------------------------------------------------------------ #
    # Internal helpers
    # ------------------------------------------------------------------ #

    @staticmethod
    def _default_order() -> list[dict[str, str]]:
        """Return a fresh copy of the ordering used by every listing query."""
        return [{"sort_order": "desc"}, {"id": "desc"}]

    @staticmethod
    def _enterprise_id_of(auth: Any) -> Any:
        """Return ``auth.enterprise_id`` if present and truthy, else ``None``."""
        return getattr(auth, "enterprise_id", None) or None

    @staticmethod
    def _as_dict(item: Any) -> dict[str, Any]:
        """Normalise one page item to a plain dict.

        The original code read page items both as dicts and as objects, so
        the exact shape returned by ``crud.page`` is not certain from this
        file. Dicts are returned unchanged, objects exposing ``model_dump``
        are dumped, and anything else falls back to its instance attributes.
        """
        if isinstance(item, dict):
            return item
        if hasattr(item, "model_dump"):
            return item.model_dump()
        return dict(vars(item))

    @classmethod
    async def _fetch_all(cls, auth: AuthSchema) -> list[Any]:
        """Load up to ``_MAX_UNPAGED_ITEMS`` departments in default order.

        The query is restricted to ``auth.enterprise_id`` when the auth
        object carries one.
        """
        crud = DepartmentCRUD(auth)
        search: dict[str, Any] = {}
        enterprise_id = cls._enterprise_id_of(auth)
        if enterprise_id:
            search["enterprise_id"] = enterprise_id
        result = await crud.page(
            offset=0,
            limit=cls._MAX_UNPAGED_ITEMS,
            order_by=cls._default_order(),
            search=search,
            out_schema=DepartmentOutSchema,
        )
        return result.get("items", [])

    # ------------------------------------------------------------------ #
    # CRUD operations
    # ------------------------------------------------------------------ #

    @classmethod
    async def create_service(
        cls, auth: AuthSchema, data: DepartmentCreateSchema
    ) -> DepartmentOutSchema:
        """Create a department.

        A new snowflake ``department_id`` is generated and the status is set
        to active. When a parent is given, the new department inherits the
        parent's ``enterprise_id``; otherwise the enterprise of ``auth`` is
        used when available.

        Raises:
            CustomException: the parent does not exist or the insert failed.
        """
        crud = DepartmentCRUD(auth)
        department_data = data.model_dump(exclude_none=True)
        department_data["department_id"] = get_snowflake_id_str(auth.tenant_id)
        department_data["status"] = DepartmentStatusEnum.DEPARTMENT_ACTIVE.value

        parent_id = department_data.get("parent_id")
        if parent_id:
            parent = await crud.get_by_department_id(parent_id)
            if not parent:
                raise CustomException(msg="父部门不存在")
            department_data["enterprise_id"] = parent.enterprise_id
        else:
            enterprise_id = cls._enterprise_id_of(auth)
            if enterprise_id:
                department_data["enterprise_id"] = enterprise_id

        department = await crud.create(department_data)
        if not department:
            raise CustomException(msg="创建部门失败")
        return DepartmentOutSchema.model_validate(department)

    @classmethod
    async def update_service(
        cls, auth: AuthSchema, department_id: str, data: DepartmentUpdateSchema
    ) -> DepartmentOutSchema:
        """Update a department with the non-``None`` fields of ``data``.

        Because ``None`` fields are dropped, this method cannot clear
        ``parent_id``; use ``move_service`` to move a department to the root.

        Raises:
            CustomException: the department or the new parent does not exist,
                the new parent is the department itself or one of its
                descendants, or the update failed.
        """
        crud = DepartmentCRUD(auth)
        existing = await crud.get_by_department_id(department_id)
        if not existing:
            raise CustomException(msg="部门不存在")

        update_data = data.model_dump(exclude_none=True)
        new_parent_id = update_data.get("parent_id")
        if new_parent_id:
            if new_parent_id == department_id:
                raise CustomException(msg="不能将自己设为父部门")
            parent = await crud.get_by_department_id(new_parent_id)
            if not parent:
                raise CustomException(msg="父部门不存在")
            # Only walk the subtree when the parent really changes, so plain
            # updates that resend the current parent cost no extra queries.
            if new_parent_id != getattr(existing, "parent_id", None):
                descendants = await cls._get_all_descendants(department_id, crud)
                if new_parent_id in descendants:
                    # Re-parenting under a descendant would create a cycle.
                    raise CustomException(msg="不能移动到子部门下方")

        updated = await crud.update_by_department_id(department_id, update_data)
        if not updated:
            raise CustomException(msg="更新部门失败")
        return DepartmentOutSchema.model_validate(updated)

    @classmethod
    async def delete_service(
        cls, auth: AuthSchema, department_id: str
    ) -> None:
        """Delete a department that has no child departments.

        Raises:
            CustomException: the department does not exist or still has
                children.
        """
        crud = DepartmentCRUD(auth)
        existing = await crud.get_by_department_id(department_id)
        if not existing:
            raise CustomException(msg="部门不存在")
        if await crud.has_children(department_id):
            raise CustomException(msg="请先删除子部门")
        # NOTE(review): the business ``department_id`` is passed to
        # ``crud.delete``; confirm DepartmentCRUD.delete expects that value
        # rather than a list of primary keys.
        await crud.delete(department_id)

    @classmethod
    async def detail_service(
        cls, auth: AuthSchema, department_id: str
    ) -> DepartmentOutSchema:
        """Return one department by its ``department_id``.

        Raises:
            CustomException: the department does not exist.
        """
        crud = DepartmentCRUD(auth)
        department = await crud.get_by_department_id(department_id)
        if not department:
            raise CustomException(msg="部门不存在")
        return DepartmentOutSchema.model_validate(department)

    @classmethod
    async def list_service(
        cls,
        auth: AuthSchema,
        page_no: int = 1,
        page_size: int = 100,
        search: dict | None = None,
    ) -> dict:
        """Return one page of the flat department list.

        ``page_no`` and ``page_size`` are clamped to at least 1 so a bad
        request can never produce a negative offset or a non-positive limit.
        """
        crud = DepartmentCRUD(auth)
        page_no = max(page_no, 1)
        page_size = max(page_size, 1)
        return await crud.page(
            offset=(page_no - 1) * page_size,
            limit=page_size,
            order_by=cls._default_order(),
            search=search or {},
            out_schema=DepartmentOutSchema,
        )

    # ------------------------------------------------------------------ #
    # Tree operations
    # ------------------------------------------------------------------ #

    @classmethod
    async def tree_service(cls, auth: AuthSchema) -> list[DepartmentTreeSchema]:
        """Return the departments visible to ``auth`` as a forest of trees."""
        items = await cls._fetch_all(auth)
        return cls._build_tree(items)

    @classmethod
    def _build_tree(
        cls, departments: list[dict[str, Any]]
    ) -> list[DepartmentTreeSchema]:
        """Assemble a flat department list into a list of root nodes.

        Rows without a ``department_id`` are skipped. A row whose parent is
        missing from the list, or that names itself as its parent, becomes a
        root. Each ``department_id`` is linked exactly once, so duplicated
        rows cannot appear twice under the same parent.
        """
        department_map: dict[Any, DepartmentTreeSchema] = {}
        parent_of: dict[Any, Any] = {}

        # First pass: create every node so children may precede their parent.
        for raw in departments:
            dept = cls._as_dict(raw)
            dept_id = dept.get("department_id")
            if not dept_id:
                continue
            parent_of[dept_id] = dept.get("parent_id")
            department_map[dept_id] = DepartmentTreeSchema(
                department_id=dept_id,
                name=dept["name"],
                code=dept.get("code"),
                parent_id=dept.get("parent_id"),
                leader_employee_id=dept.get("leader_employee_id"),
                sort_order=dept.get("sort_order", 0),
                status=dept["status"],
                children=[],
            )

        # Second pass: attach each node to its parent, or promote it to root.
        roots: list[DepartmentTreeSchema] = []
        for dept_id, node in department_map.items():
            parent_id = parent_of[dept_id]
            if parent_id and parent_id != dept_id and parent_id in department_map:
                department_map[parent_id].children.append(node)
            else:
                roots.append(node)
        return roots

    @classmethod
    async def children_service(
        cls, auth: AuthSchema, department_id: str
    ) -> list[DepartmentOutSchema]:
        """Return the direct children of a department."""
        crud = DepartmentCRUD(auth)
        children = await crud.get_children(department_id)
        return [DepartmentOutSchema.model_validate(child) for child in children]

    @classmethod
    async def move_service(
        cls, auth: AuthSchema, department_id: str, parent_id: str | None
    ) -> DepartmentOutSchema:
        """Move a department under a new parent, or to the root.

        ``parent_id=None`` moves the department to the root. The new parent is
        written directly instead of going through ``update_service``, because
        that method drops ``None`` fields and would turn a move-to-root into
        a no-op.

        Raises:
            CustomException: the department or target parent does not exist,
                the target is the department itself or one of its
                descendants, or the update failed.
        """
        if parent_id == department_id:
            raise CustomException(msg="不能将自己设为父部门")

        crud = DepartmentCRUD(auth)
        existing = await crud.get_by_department_id(department_id)
        if not existing:
            raise CustomException(msg="部门不存在")

        if parent_id:
            parent = await crud.get_by_department_id(parent_id)
            if not parent:
                raise CustomException(msg="目标父部门不存在")
            # Reuse the CRUD instance already bound to this request's auth.
            descendants = await cls._get_all_descendants(department_id, crud)
            if parent_id in descendants:
                raise CustomException(msg="不能移动到子部门下方")

        # Assumes update_by_department_id persists a None value as-is,
        # clearing the parent link — TODO confirm against DepartmentCRUD.
        updated = await crud.update_by_department_id(
            department_id, {"parent_id": parent_id}
        )
        if not updated:
            raise CustomException(msg="更新部门失败")
        return DepartmentOutSchema.model_validate(updated)

    @classmethod
    async def _get_all_descendants(
        cls, department_id: str, crud: DepartmentCRUD | None = None
    ) -> set[str]:
        """Return the IDs of every descendant of ``department_id``.

        The subtree is walked iteratively, one ``get_children`` query per
        visited department. IDs already collected are not visited again, so
        the walk terminates even if stored data contains a parent/child cycle.

        Args:
            department_id: root of the subtree; never included in the result.
            crud: CRUD instance to query with. When omitted, one is built
                from the current request's auth.
        """
        if crud is None:
            # Kept for backward compatibility with callers that omit `crud`.
            # NOTE(review): relies on app.core.request.get_current_auth —
            # confirm that helper exists; prefer passing `crud` explicitly.
            from app.core.request import get_current_auth

            auth = await get_current_auth()
            crud = DepartmentCRUD(auth)

        descendants: set[str] = set()
        pending = [department_id]
        while pending:
            current_id = pending.pop()
            for child in await crud.get_children(current_id):
                child_id = child.department_id
                if (
                    child_id
                    and child_id != department_id
                    and child_id not in descendants
                ):
                    descendants.add(child_id)
                    pending.append(child_id)
        return descendants

    @classmethod
    async def options_service(cls, auth: AuthSchema) -> list[dict]:
        """Return departments as ``{"value", "label", "disabled"}`` options.

        Items without a ``department_id`` are left out.
        """
        rows = [cls._as_dict(item) for item in await cls._fetch_all(auth)]
        return [
            {
                "value": row.get("department_id"),
                "label": row.get("name"),
                "disabled": False,
            }
            for row in rows
            if row.get("department_id")
        ]
|