| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 |
- from collections.abc import Sequence
- from typing import Any
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.core.base_crud import CRUDBase
- from .model import WorkflowModel
- from .schema import WorkflowCreateSchema, WorkflowUpdateSchema
- class WorkflowCRUD(CRUDBase[WorkflowModel, WorkflowCreateSchema, WorkflowUpdateSchema]):
- """工作流数据层"""
- def __init__(self, auth: AuthSchema) -> None:
- """
- 初始化工作流 CRUD。
- 参数:
- - auth (AuthSchema): 认证信息。
- 返回:
- - None
- """
- self.auth = auth
- super().__init__(model=WorkflowModel, auth=auth)
- async def get_obj_by_id_crud(
- self, id: int, preload: list[str | Any] | None = None
- ) -> WorkflowModel | None:
- """
- 按主键查询工作流。
- 参数:
- - id (int): 工作流 ID。
- - preload (list[str | Any] | None): 预加载关系。
- 返回:
- - WorkflowModel | None: 实体或 None。
- """
- return await self.get(id=id, preload=preload)
- async def get_obj_list_crud(
- self,
- search: dict | None = None,
- order_by: list[dict[str, str]] | None = None,
- preload: list[str | Any] | None = None,
- ) -> Sequence[WorkflowModel]:
- """
- 条件列表查询工作流。
- 参数:
- - search (dict | None): 查询条件。
- - order_by (list[dict[str, str]] | None): 排序。
- - preload (list[str | Any] | None): 预加载关系。
- 返回:
- - Sequence[WorkflowModel]: 工作流列表。
- """
- return await self.list(search=search, order_by=order_by, preload=preload)
- async def create_obj_crud(self, data: WorkflowCreateSchema) -> WorkflowModel | None:
- """
- 创建工作流。
- 参数:
- - data (WorkflowCreateSchema): 创建模型。
- 返回:
- - WorkflowModel | None: 新建实体或 None。
- """
- return await self.create(data=data)
- async def update_obj_crud(self, id: int, data: WorkflowUpdateSchema) -> WorkflowModel | None:
- """
- 更新工作流。
- 参数:
- - id (int): 工作流 ID。
- - data (WorkflowUpdateSchema): 更新模型。
- 返回:
- - WorkflowModel | None: 更新后实体或 None。
- """
- return await self.update(id=id, data=data)
- async def delete_obj_crud(self, ids: list[int]) -> None:
- """
- 批量删除工作流。
- 参数:
- - ids (list[int]): ID 列表。
- 返回:
- - None
- """
- await self.delete(ids=ids)
|