| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980 |
- from typing import Optional
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.core.base_crud import CRUDBase
- from app.core.exceptions import CustomException
- from .model import EmployeeModel
- from .schema import EmployeeCreateOrUpdateSchema
class EmployeeCRUD(CRUDBase[EmployeeModel, EmployeeCreateOrUpdateSchema, EmployeeCreateOrUpdateSchema]):
    """Employee CRUD operations keyed on business identifiers."""

    def __init__(self, auth: AuthSchema) -> None:
        """Bind the CRUD helper to an auth context.

        Args:
            auth: Auth context; the methods below use ``auth.db`` as the
                session and ``auth.user`` as the acting user.
        """
        self.auth = auth
        super().__init__(model=EmployeeModel, auth=auth)

    async def get_by_employee_id(
        self, employee_id: str, enterprise_id: str | None = None
    ) -> EmployeeModel | None:
        """Look up an employee by its business key ``employee_id``.

        Args:
            employee_id: Business identifier of the employee.
            enterprise_id: Optional enterprise scope; forwarded to ``self.get``
                as-is, including when it is ``None``.
                NOTE(review): how ``self.get`` treats a ``None`` filter is
                defined in ``CRUDBase`` — confirm it is skipped, not matched.

        Returns:
            Whatever ``self.get`` returns for these filters (the matching
            employee, or ``None``).
        """
        return await self.get(employee_id=employee_id, enterprise_id=enterprise_id)

    async def update_by(
        self,
        employee_mobile: str | None = None,
        employee_email: str | None = None,
        identity_open_id: str | None = None,
        enterprise_id: str | None = None,
        data: dict | None = None,
    ) -> EmployeeModel | None:
        """Update the employee matched by mobile, email and/or identity open id.

        Callers should also pass ``enterprise_id`` so the lookup cannot match
        (and overwrite) an employee that belongs to another enterprise.

        Args:
            employee_mobile: Mobile number to match; ignored when falsy.
            employee_email: Email address to match; ignored when falsy.
            identity_open_id: Identity open id to match; ignored when falsy.
            enterprise_id: Enterprise scope to match; ignored when falsy.
            data: Attribute name -> new value. Keys that are not attributes of
                the loaded object are silently skipped. ``None`` means no
                field changes.

        Returns:
            The refreshed employee object.

        Raises:
            CustomException: If no identifying key is supplied, or no employee
                matches the filters.
        """
        identity_filters = {
            "employee_mobile": employee_mobile,
            "employee_email": employee_email,
            "identity_open_id": identity_open_id,
        }
        filters = {name: value for name, value in identity_filters.items() if value}
        if not filters:
            # Without an identifying key the lookup would hit an arbitrary
            # record and overwrite it; refuse instead.
            raise CustomException(msg="更新失败!缺少员工手机号、邮箱或 identity_open_id")
        if enterprise_id:
            filters["enterprise_id"] = enterprise_id

        obj = await self.get(preload=[], **filters)
        if not obj:
            raise CustomException(msg="更新失败!对象不存在")

        # Record the acting user when the model tracks it.
        if self.auth.user and hasattr(obj, "updated_id"):
            setattr(obj, "updated_id", self.auth.user.id)

        for key, value in (data or {}).items():
            if hasattr(obj, key):
                setattr(obj, key, value)

        await self.auth.db.flush()
        await self.auth.db.refresh(obj)
        return obj

    async def delete_by_employee_id(
        self, employee_id: str, *, enterprise_id: str | None = None
    ) -> None:
        """Delete the employee identified by business key ``employee_id``.

        This issues a session-level delete of the loaded object (not a status
        change). A missing employee is treated as already deleted.

        Args:
            employee_id: Business identifier of the employee.
            enterprise_id: Optional enterprise scope, forwarded to ``self.get``.
        """
        obj = await self.get(employee_id=employee_id, enterprise_id=enterprise_id, preload=[])
        if not obj:
            return  # Employee does not exist; nothing to delete.

        # NOTE(review): set before the delete as in the original code; it only
        # matters if something observes the object at delete time — confirm.
        if self.auth.user and hasattr(obj, "updated_id"):
            setattr(obj, "updated_id", self.auth.user.id)

        await self.auth.db.delete(obj)
        # Do not refresh afterwards: the row no longer exists once the delete
        # is flushed, so reloading the instance would raise.
        await self.auth.db.flush()
|