crud.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. from collections.abc import Sequence
  2. from typing import Any
  3. from app.api.v1.module_system.auth.schema import AuthSchema
  4. from app.core.base_crud import CRUDBase
  5. from .model import JobModel
  6. from .schema import JobCreateSchema, JobUpdateSchema
  7. class JobCRUD(CRUDBase[JobModel, JobCreateSchema, JobUpdateSchema]):
  8. """任务执行日志数据层"""
  9. def __init__(self, auth: AuthSchema) -> None:
  10. """
  11. 初始化任务执行日志CRUD
  12. 参数:
  13. - auth (AuthSchema): 认证信息模型
  14. """
  15. self.auth = auth
  16. super().__init__(model=JobModel, auth=auth)
  17. async def get_obj_by_id_crud(
  18. self, id: int, preload: list[str | Any] | None = None
  19. ) -> JobModel | None:
  20. """
  21. 获取执行日志详情
  22. 参数:
  23. - id (int): 日志ID
  24. - preload (list[str | Any] | None): 预加载关系,未提供时使用模型默认项
  25. 返回:
  26. - JobModel | None: 执行日志模型,如果不存在则为None
  27. """
  28. return await self.get(id=id, preload=preload)
  29. async def get_obj_list_crud(
  30. self,
  31. search: dict | None = None,
  32. order_by: list[dict[str, str]] | None = None,
  33. preload: list[str | Any] | None = None,
  34. ) -> Sequence[JobModel]:
  35. """
  36. 获取执行日志列表
  37. 参数:
  38. - search (dict | None): 查询参数字典
  39. - order_by (list[dict[str, str]] | None): 排序参数列表
  40. - preload (list[str | Any] | None): 预加载关系,未提供时使用模型默认项
  41. 返回:
  42. - Sequence[JobModel]: 执行日志模型序列
  43. """
  44. return await self.list(search=search, order_by=order_by, preload=preload)
  45. async def create_obj_crud(self, data: JobCreateSchema) -> JobModel | None:
  46. """
  47. 创建执行日志
  48. 参数:
  49. - data (JobCreateSchema): 创建执行日志模型
  50. 返回:
  51. - JobModel | None: 创建的执行日志模型,如果创建失败则为None
  52. """
  53. return await self.create(data=data)
  54. async def update_obj_crud(self, id: int, data: JobUpdateSchema) -> JobModel | None:
  55. """
  56. 更新执行日志
  57. 参数:
  58. - id (int): 日志ID
  59. - data (JobUpdateSchema): 更新执行日志模型
  60. 返回:
  61. - JobModel | None: 更新后的执行日志模型,如果更新失败则为None
  62. """
  63. return await self.update(id=id, data=data)
  64. async def delete_obj_crud(self, ids: list[int]) -> None:
  65. """
  66. 删除执行日志
  67. 参数:
  68. - ids (list[int]): 日志ID列表
  69. 返回:
  70. - None
  71. """
  72. return await self.delete(ids=ids)
  73. async def clear_obj_crud(self) -> None:
  74. """
  75. 清空所有执行日志。
  76. 返回:
  77. - None
  78. """
  79. return await self.clear()