from typing import Any

from app.api.v1.module_system.auth.schema import AuthSchema
from app.core.exceptions import CustomException
from app.core.logger import log
from app.utils.snowflake import get_snowflake_id_str

from .crud import DepartmentCRUD
from .enums import DepartmentStatusEnum
from .schema import (
    DepartmentCreateSchema,
    DepartmentOutSchema,
    DepartmentTreeSchema,
    DepartmentUpdateSchema,
)


class DepartmentService:
    """Service layer for department operations.

    Every public method builds a ``DepartmentCRUD`` bound to the caller's
    ``auth`` and reports business-rule violations as ``CustomException``.
    """

    # Maximum number of rows fetched when the whole department set is
    # needed (tree / options). Rows beyond this limit are not returned.
    _MAX_FETCH = 1000

    # Ordering shared by every listing query in this service.
    _DEFAULT_ORDER = ({"sort_order": "desc"}, {"id": "desc"})

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _as_dict(item: Any) -> dict[str, Any]:
        """Return ``item`` as a plain dict.

        ``crud.page`` items are consumed both as dicts and as objects in
        this module; normalising here keeps either shape working.
        NOTE(review): confirm which shape ``crud.page`` actually returns.
        """
        if isinstance(item, dict):
            return item
        if hasattr(item, "model_dump"):
            return item.model_dump()
        return dict(vars(item))

    @classmethod
    async def _fetch_enterprise_departments(
        cls, auth: AuthSchema
    ) -> list[dict[str, Any]]:
        """Fetch up to ``_MAX_FETCH`` departments as dicts.

        The query is filtered by ``auth.enterprise_id`` when that attribute
        exists and is truthy; otherwise no enterprise filter is applied.
        """
        crud = DepartmentCRUD(auth)
        search: dict[str, Any] = {}
        enterprise_id = getattr(auth, "enterprise_id", None)
        if enterprise_id:
            search["enterprise_id"] = enterprise_id

        result = await crud.page(
            offset=0,
            limit=cls._MAX_FETCH,
            order_by=list(cls._DEFAULT_ORDER),
            search=search,
            out_schema=DepartmentOutSchema,
        )
        return [cls._as_dict(item) for item in result.get("items", [])]

    # ------------------------------------------------------------------
    # CRUD services
    # ------------------------------------------------------------------
    @classmethod
    async def create_service(
        cls, auth: AuthSchema, data: DepartmentCreateSchema
    ) -> DepartmentOutSchema:
        """Create a department.

        A new ``department_id`` is generated and the status is forced to
        ``DEPARTMENT_ACTIVE``. When a parent is given, the new department
        takes the parent's ``enterprise_id``; otherwise it takes
        ``auth.enterprise_id`` if that is available.

        Raises:
            CustomException: the parent does not exist, or creation failed.
        """
        crud = DepartmentCRUD(auth)

        department_data = data.model_dump(exclude_none=True)
        department_data["department_id"] = get_snowflake_id_str(auth.tenant_id)
        department_data["status"] = DepartmentStatusEnum.DEPARTMENT_ACTIVE.value

        parent_id = department_data.get("parent_id")
        if parent_id:
            parent = await crud.get_by_department_id(parent_id)
            if not parent:
                raise CustomException(msg="父部门不存在")
            department_data["enterprise_id"] = parent.enterprise_id
        else:
            enterprise_id = getattr(auth, "enterprise_id", None)
            if enterprise_id:
                department_data["enterprise_id"] = enterprise_id

        department = await crud.create(department_data)
        if not department:
            raise CustomException(msg="创建部门失败")
        return DepartmentOutSchema.model_validate(department)

    @classmethod
    async def update_service(
        cls, auth: AuthSchema, department_id: str, data: DepartmentUpdateSchema
    ) -> DepartmentOutSchema:
        """Update a department.

        Only non-``None`` fields of ``data`` are written, so this method
        cannot clear a field; use ``move_service`` to detach a department
        from its parent.

        Raises:
            CustomException: the department or the new parent does not
                exist, the new parent is the department itself or one of
                its descendants, or the update failed.
        """
        crud = DepartmentCRUD(auth)

        existing = await crud.get_by_department_id(department_id)
        if not existing:
            raise CustomException(msg="部门不存在")

        update_data = data.model_dump(exclude_none=True)

        new_parent_id = update_data.get("parent_id")
        if new_parent_id:
            if new_parent_id == department_id:
                raise CustomException(msg="不能将自己设为父部门")
            parent = await crud.get_by_department_id(new_parent_id)
            if not parent:
                raise CustomException(msg="父部门不存在")
            # Walking the subtree costs one query per node, so only do it
            # when the parent really changes.
            if new_parent_id != getattr(existing, "parent_id", None):
                descendants = await cls._get_all_descendants(department_id, crud)
                if new_parent_id in descendants:
                    raise CustomException(msg="不能移动到子部门下方")

        updated = await crud.update_by_department_id(department_id, update_data)
        if not updated:
            raise CustomException(msg="更新部门失败")
        return DepartmentOutSchema.model_validate(updated)

    @classmethod
    async def delete_service(cls, auth: AuthSchema, department_id: str) -> None:
        """Delete a department that has no child departments.

        Raises:
            CustomException: the department does not exist or still has
                children.
        """
        crud = DepartmentCRUD(auth)

        existing = await crud.get_by_department_id(department_id)
        if not existing:
            raise CustomException(msg="部门不存在")
        if await crud.has_children(department_id):
            raise CustomException(msg="请先删除子部门")

        # NOTE(review): every other call here uses a *_by_department_id
        # method; confirm crud.delete accepts a business department_id.
        await crud.delete(department_id)

    @classmethod
    async def detail_service(
        cls, auth: AuthSchema, department_id: str
    ) -> DepartmentOutSchema:
        """Return one department by its ``department_id``.

        Raises:
            CustomException: the department does not exist.
        """
        crud = DepartmentCRUD(auth)
        department = await crud.get_by_department_id(department_id)
        if not department:
            raise CustomException(msg="部门不存在")
        return DepartmentOutSchema.model_validate(department)

    @classmethod
    async def list_service(
        cls,
        auth: AuthSchema,
        page_no: int = 1,
        page_size: int = 100,
        search: dict | None = None,
    ) -> dict:
        """Return one page of departments as a flat list.

        ``page_no`` is 1-based; the result is whatever ``crud.page``
        returns for the computed offset.
        """
        crud = DepartmentCRUD(auth)
        offset = (page_no - 1) * page_size
        return await crud.page(
            offset=offset,
            limit=page_size,
            order_by=list(cls._DEFAULT_ORDER),
            search=search or {},
            out_schema=DepartmentOutSchema,
        )

    # ------------------------------------------------------------------
    # Tree / hierarchy services
    # ------------------------------------------------------------------
    @classmethod
    async def tree_service(cls, auth: AuthSchema) -> list[DepartmentTreeSchema]:
        """Return the departments arranged as a forest of trees."""
        items = await cls._fetch_enterprise_departments(auth)
        return cls._build_tree(items)

    @classmethod
    def _build_tree(
        cls, departments: list[dict[str, Any]]
    ) -> list[DepartmentTreeSchema]:
        """Build a forest of ``DepartmentTreeSchema`` nodes from flat rows.

        Rows without a ``department_id`` are ignored. A row becomes a root
        when it has no ``parent_id`` or when its parent is not among the
        given rows. Children keep the order of the input.
        """
        rows = [cls._as_dict(dept) for dept in departments]

        # First pass: one node per department, keyed by department_id.
        nodes: dict[str, DepartmentTreeSchema] = {}
        for row in rows:
            dept_id = row.get("department_id")
            if not dept_id:
                continue
            nodes[dept_id] = DepartmentTreeSchema(
                department_id=dept_id,
                name=row["name"],
                code=row.get("code"),
                parent_id=row.get("parent_id"),
                leader_employee_id=row.get("leader_employee_id"),
                sort_order=row.get("sort_order", 0),
                status=row["status"],
                children=[],
            )

        # Second pass: attach each node to its parent, or make it a root.
        roots: list[DepartmentTreeSchema] = []
        for row in rows:
            dept_id = row.get("department_id")
            if not dept_id:
                continue
            parent_id = row.get("parent_id")
            if parent_id and parent_id in nodes:
                nodes[parent_id].children.append(nodes[dept_id])
            else:
                roots.append(nodes[dept_id])
        return roots

    @classmethod
    async def children_service(
        cls, auth: AuthSchema, department_id: str
    ) -> list[DepartmentOutSchema]:
        """Return the children that ``crud.get_children`` yields for a department."""
        crud = DepartmentCRUD(auth)
        children = await crud.get_children(department_id)
        return [DepartmentOutSchema.model_validate(child) for child in children]

    @classmethod
    async def move_service(
        cls, auth: AuthSchema, department_id: str, parent_id: str | None
    ) -> DepartmentOutSchema:
        """Move a department under another parent, or to the root.

        Passing ``parent_id=None`` detaches the department from its parent.
        The parent is written directly rather than through
        ``update_service``, because that method drops ``None`` values and
        would silently ignore a move to the root.

        Raises:
            CustomException: the target parent is the department itself,
                does not exist, or is a descendant of the department; the
                department does not exist; or the update failed.
        """
        if parent_id == department_id:
            raise CustomException(msg="不能将自己设为父部门")

        crud = DepartmentCRUD(auth)

        if parent_id:
            parent = await crud.get_by_department_id(parent_id)
            if not parent:
                raise CustomException(msg="目标父部门不存在")
            # Reuse this crud so the walk does not need the legacy
            # fallback inside _get_all_descendants.
            descendants = await cls._get_all_descendants(department_id, crud)
            if parent_id in descendants:
                raise CustomException(msg="不能移动到子部门下方")

        existing = await crud.get_by_department_id(department_id)
        if not existing:
            raise CustomException(msg="部门不存在")

        updated = await crud.update_by_department_id(
            department_id, {"parent_id": parent_id}
        )
        if not updated:
            raise CustomException(msg="更新部门失败")
        return DepartmentOutSchema.model_validate(updated)

    @classmethod
    async def _get_all_descendants(
        cls, department_id: str, crud: DepartmentCRUD | None = None
    ) -> set[str]:
        """Return the ids of every descendant of ``department_id``.

        The walk is iterative and skips ids it has already seen, so it
        terminates even if the stored parent links form a cycle. It issues
        one ``get_children`` query per visited department.
        """
        if crud is None:
            # Legacy fallback kept for callers that pass no crud.
            # NOTE(review): confirm app.core.request.get_current_auth exists.
            from app.core.request import get_current_auth

            auth = await get_current_auth()
            crud = DepartmentCRUD(auth)

        descendants: set[str] = set()
        pending = [department_id]
        while pending:
            current_id = pending.pop()
            for child in await crud.get_children(current_id):
                child_id = child.department_id
                if child_id and child_id not in descendants:
                    descendants.add(child_id)
                    pending.append(child_id)
        return descendants

    @classmethod
    async def options_service(cls, auth: AuthSchema) -> list[dict]:
        """Return departments as ``value`` / ``label`` / ``disabled`` options.

        Rows without a ``department_id`` are skipped; ``disabled`` is
        always ``False``.
        """
        items = await cls._fetch_enterprise_departments(auth)
        return [
            {
                "value": item["department_id"],
                "label": item["name"],
                "disabled": False,
            }
            for item in items
            if item.get("department_id")
        ]