crud.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. from collections.abc import Sequence
  2. from typing import Any
  3. from app.api.v1.module_system.auth.schema import AuthSchema
  4. from app.core.base_crud import CRUDBase
  5. from .model import WorkflowModel
  6. from .schema import WorkflowCreateSchema, WorkflowUpdateSchema
  7. class WorkflowCRUD(CRUDBase[WorkflowModel, WorkflowCreateSchema, WorkflowUpdateSchema]):
  8. """工作流数据层"""
  9. def __init__(self, auth: AuthSchema) -> None:
  10. """
  11. 初始化工作流 CRUD。
  12. 参数:
  13. - auth (AuthSchema): 认证信息。
  14. 返回:
  15. - None
  16. """
  17. self.auth = auth
  18. super().__init__(model=WorkflowModel, auth=auth)
  19. async def get_obj_by_id_crud(
  20. self, id: int, preload: list[str | Any] | None = None
  21. ) -> WorkflowModel | None:
  22. """
  23. 按主键查询工作流。
  24. 参数:
  25. - id (int): 工作流 ID。
  26. - preload (list[str | Any] | None): 预加载关系。
  27. 返回:
  28. - WorkflowModel | None: 实体或 None。
  29. """
  30. return await self.get(id=id, preload=preload)
  31. async def get_obj_list_crud(
  32. self,
  33. search: dict | None = None,
  34. order_by: list[dict[str, str]] | None = None,
  35. preload: list[str | Any] | None = None,
  36. ) -> Sequence[WorkflowModel]:
  37. """
  38. 条件列表查询工作流。
  39. 参数:
  40. - search (dict | None): 查询条件。
  41. - order_by (list[dict[str, str]] | None): 排序。
  42. - preload (list[str | Any] | None): 预加载关系。
  43. 返回:
  44. - Sequence[WorkflowModel]: 工作流列表。
  45. """
  46. return await self.list(search=search, order_by=order_by, preload=preload)
  47. async def create_obj_crud(self, data: WorkflowCreateSchema) -> WorkflowModel | None:
  48. """
  49. 创建工作流。
  50. 参数:
  51. - data (WorkflowCreateSchema): 创建模型。
  52. 返回:
  53. - WorkflowModel | None: 新建实体或 None。
  54. """
  55. return await self.create(data=data)
  56. async def update_obj_crud(self, id: int, data: WorkflowUpdateSchema) -> WorkflowModel | None:
  57. """
  58. 更新工作流。
  59. 参数:
  60. - id (int): 工作流 ID。
  61. - data (WorkflowUpdateSchema): 更新模型。
  62. 返回:
  63. - WorkflowModel | None: 更新后实体或 None。
  64. """
  65. return await self.update(id=id, data=data)
  66. async def delete_obj_crud(self, ids: list[int]) -> None:
  67. """
  68. 批量删除工作流。
  69. 参数:
  70. - ids (list[int]): ID 列表。
  71. 返回:
  72. - None
  73. """
  74. await self.delete(ids=ids)