from typing import Optional from app.api.v1.module_system.auth.schema import AuthSchema from app.core.base_crud import CRUDBase from app.core.exceptions import CustomException from .model import EmployeeModel from .schema import EmployeeCreateOrUpdateSchema class EmployeeCRUD(CRUDBase[EmployeeModel, EmployeeCreateOrUpdateSchema, EmployeeCreateOrUpdateSchema]): """员工 CRUD 操作""" def __init__(self, auth: AuthSchema) -> None: self.auth = auth super().__init__(model=EmployeeModel, auth=auth) async def get_by_employee_id( self, employee_id: str, enterprise_id: str | None = None ) -> EmployeeModel | None: """根据员工ID查询员工(业务主键)""" return await self.get(employee_id=employee_id, enterprise_id=enterprise_id) async def update_by( self, employee_mobile: Optional[str] = None, employee_email: Optional[str] = None, identity_open_id: Optional[str] = None, enterprise_id: Optional[str] = None, data: dict = {}, ) -> EmployeeModel | None: """根据员工手机号或邮箱更新员工(业务主键),必须传入 enterprise_id 防止跨企业覆盖""" filters = {} if employee_mobile: filters["employee_mobile"] = employee_mobile if employee_email: filters["employee_email"] = employee_email if identity_open_id: filters["identity_open_id"] = identity_open_id if enterprise_id: filters["enterprise_id"] = enterprise_id obj = await self.get(preload=[], **filters) if not obj: raise CustomException(msg="更新失败!对象不存在") if self.auth.user and hasattr(obj, "updated_id"): setattr(obj, "updated_id", self.auth.user.id) for key, value in data.items(): if hasattr(obj, key): setattr(obj, key, value) await self.auth.db.flush() await self.auth.db.refresh(obj) # verify_obj = await self.get(employee_id=obj.employee_id, enterprise_id=obj.enterprise_id, preload=[]) # if not verify_obj: # raise CustomException(msg="更新失败!对象不存在或无权限访问") return obj async def delete_by_employee_id( self, employee_id: str, *, enterprise_id: str | None = None ) -> None: """根据员工ID将员工状态更新为解约(业务主键)""" from .enums import EmployeeStatusEnum obj = await self.get(employee_id=employee_id, enterprise_id=enterprise_id, preload=[]) if not obj: return # 员工不存在,直接返回 if self.auth.user and hasattr(obj, "updated_id"): setattr(obj, "updated_id", self.auth.user.id) # 直接删除 await self.auth.db.delete(obj) await self.auth.db.flush() await self.auth.db.refresh(obj)