from app.api.v1.module_system.auth.schema import AuthSchema
from app.core.base_crud import CRUDBase
from app.core.exceptions import CustomException

from .model import EmployeeModel
from .schema import EmployeeCreateSchema, EmployeeOutSchema


class EmployeeCRUD(CRUDBase[EmployeeModel, EmployeeCreateSchema, EmployeeCreateSchema]):
    """CRUD operations for employees, addressed by the business key ``employee_id``."""

    def __init__(self, auth: AuthSchema) -> None:
        """Bind this CRUD object to ``EmployeeModel`` and the request's auth context.

        Args:
            auth: Auth context; this class reads ``auth.user`` and ``auth.db`` from it.
        """
        self.auth = auth
        super().__init__(model=EmployeeModel, auth=auth)

    def _stamp_updater(self, obj: EmployeeModel) -> None:
        """Record the current user on ``obj.updated_id`` when both are available."""
        if self.auth.user and hasattr(obj, "updated_id"):
            setattr(obj, "updated_id", self.auth.user.id)

    async def get_by_employee_id(
        self, employee_id: str
    ) -> EmployeeModel | None:
        """Look up an employee by its business key.

        Args:
            employee_id: Business identifier of the employee.

        Returns:
            Whatever the inherited ``get`` returns for this key; per the
            annotation, the matching employee or ``None``.
        """
        return await self.get(employee_id=employee_id)

    async def update_by_employee_id(
        self, employee_id: str, data: dict
    ) -> EmployeeModel | None:
        """Update an employee identified by its business key.

        Only keys of ``data`` that name an existing attribute of the loaded
        object are applied; other keys are ignored. Changes are flushed to the
        session and the object is refreshed; no commit is issued here.

        Args:
            employee_id: Business identifier of the employee to update.
            data: Mapping of attribute name to new value.

        Returns:
            The refreshed employee object.

        Raises:
            CustomException: If no employee is found before the update, or if
                the employee can no longer be fetched after the flush.
        """
        # preload=[] is forwarded to the inherited get(); presumably it disables
        # relationship preloading -- confirm against CRUDBase.get.
        obj = await self.get(employee_id=employee_id, preload=[])
        if not obj:
            raise CustomException(msg="更新失败!对象不存在")

        for key, value in data.items():
            if hasattr(obj, key):
                setattr(obj, key, value)

        # Stamp the audit field AFTER applying the payload so that an
        # "updated_id" key supplied by the caller cannot override the id of
        # the authenticated user who is actually making the change.
        self._stamp_updater(obj)

        await self.auth.db.flush()
        await self.auth.db.refresh(obj)

        # Re-query after the flush; the error message suggests the inherited
        # get() also applies access filtering -- confirm against CRUDBase.get.
        verify_obj = await self.get(employee_id=employee_id, preload=[])
        if not verify_obj:
            raise CustomException(msg="更新失败!对象不存在或无权限访问")
        return obj

    async def delete_by_employee_id(self, employee_id: str) -> None:
        """Mark an employee as contract-terminated instead of deleting the row.

        Sets ``status`` to ``EmployeeStatusEnum.EMPLOYEE_UNSIGN.value``, flushes
        and refreshes the object. Returns silently when no employee matches.

        Args:
            employee_id: Business identifier of the employee.
        """
        # Imported locally as in the original; possibly to avoid an import
        # cycle -- confirm before hoisting to module level.
        from .enums import EmployeeStatusEnum

        obj = await self.get(employee_id=employee_id, preload=[])
        if not obj:
            # Employee does not exist: nothing to do.
            return

        # Switch the status to "contract terminated".
        obj.status = EmployeeStatusEnum.EMPLOYEE_UNSIGN.value
        self._stamp_updater(obj)

        await self.auth.db.flush()
        await self.auth.db.refresh(obj)