# crud.py
  1. from typing import Optional
  2. from app.api.v1.module_system.auth.schema import AuthSchema
  3. from app.core.base_crud import CRUDBase
  4. from app.core.exceptions import CustomException
  5. from .model import EmployeeModel
  6. from .schema import EmployeeCreateOrUpdateSchema
  7. class EmployeeCRUD(CRUDBase[EmployeeModel, EmployeeCreateOrUpdateSchema, EmployeeCreateOrUpdateSchema]):
  8. """员工 CRUD 操作"""
  9. def __init__(self, auth: AuthSchema) -> None:
  10. self.auth = auth
  11. super().__init__(model=EmployeeModel, auth=auth)
  12. async def get_by_employee_id(
  13. self, employee_id: str, enterprise_id: str | None = None
  14. ) -> EmployeeModel | None:
  15. """根据员工ID查询员工(业务主键)"""
  16. return await self.get(employee_id=employee_id, enterprise_id=enterprise_id)
  17. async def update_by(
  18. self,
  19. employee_mobile: Optional[str] = None,
  20. employee_email: Optional[str] = None,
  21. identity_open_id: Optional[str] = None,
  22. enterprise_id: Optional[str] = None,
  23. data: dict = {},
  24. ) -> EmployeeModel | None:
  25. """根据员工手机号或邮箱更新员工(业务主键),必须传入 enterprise_id 防止跨企业覆盖"""
  26. filters = {}
  27. if employee_mobile:
  28. filters["employee_mobile"] = employee_mobile
  29. if employee_email:
  30. filters["employee_email"] = employee_email
  31. if identity_open_id:
  32. filters["identity_open_id"] = identity_open_id
  33. if enterprise_id:
  34. filters["enterprise_id"] = enterprise_id
  35. obj = await self.get(preload=[], **filters)
  36. if not obj:
  37. raise CustomException(msg="更新失败!对象不存在")
  38. if self.auth.user and hasattr(obj, "updated_id"):
  39. setattr(obj, "updated_id", self.auth.user.id)
  40. for key, value in data.items():
  41. if hasattr(obj, key):
  42. setattr(obj, key, value)
  43. await self.auth.db.flush()
  44. await self.auth.db.refresh(obj)
  45. # verify_obj = await self.get(employee_id=obj.employee_id, enterprise_id=obj.enterprise_id, preload=[])
  46. # if not verify_obj:
  47. # raise CustomException(msg="更新失败!对象不存在或无权限访问")
  48. return obj
  49. async def delete_by_employee_id(
  50. self, employee_id: str, *, enterprise_id: str | None = None
  51. ) -> None:
  52. """根据员工ID将员工状态更新为解约(业务主键)"""
  53. from .enums import EmployeeStatusEnum
  54. obj = await self.get(employee_id=employee_id, enterprise_id=enterprise_id, preload=[])
  55. if not obj:
  56. return # 员工不存在,直接返回
  57. if self.auth.user and hasattr(obj, "updated_id"):
  58. setattr(obj, "updated_id", self.auth.user.id)
  59. # 直接删除
  60. await self.auth.db.delete(obj)
  61. await self.auth.db.flush()
  62. await self.auth.db.refresh(obj)