service.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. from app.api.v1.module_system.auth.schema import AuthSchema
  2. from app.core.ap_scheduler import SchedulerUtil
  3. from app.core.exceptions import CustomException
  4. from .crud import JobCRUD
  5. from .schema import JobCreateSchema, JobOutSchema, JobQueryParam, JobUpdateSchema
  6. class JobService:
  7. """
  8. 调度器监控模块服务层
  9. 职责:
  10. 1. 执行日志的 CRUD 操作
  11. 2. 调度器状态和任务列表的获取
  12. 3. 任务操作(暂停、恢复、执行、移除)
  13. """
  14. @classmethod
  15. async def get_job_log_detail_service(cls, auth: AuthSchema, id: int) -> dict:
  16. """
  17. 获取执行日志详情
  18. 参数:
  19. - auth (AuthSchema): 认证信息模型
  20. - id (int): 日志ID
  21. 返回:
  22. - Dict: 执行日志详情字典
  23. """
  24. obj = await JobCRUD(auth).get_obj_by_id_crud(id=id)
  25. if not obj:
  26. raise CustomException(msg="执行日志不存在")
  27. return JobOutSchema.model_validate(obj).model_dump()
  28. @classmethod
  29. async def get_job_log_list_service(
  30. cls,
  31. auth: AuthSchema,
  32. search: JobQueryParam | None = None,
  33. order_by: list[dict[str, str]] | None = None,
  34. ) -> list[dict]:
  35. """
  36. 获取执行日志列表
  37. 参数:
  38. - auth (AuthSchema): 认证信息模型
  39. - search (JobQueryParam | None): 查询参数模型
  40. - order_by (list[dict[str, str]] | None): 排序参数列表
  41. 返回:
  42. - List[Dict]: 执行日志详情字典列表
  43. """
  44. if order_by is None:
  45. order_by = [{"created_time": "desc"}]
  46. obj_list = await JobCRUD(auth).get_obj_list_crud(
  47. search=search.__dict__ if search else None, order_by=order_by
  48. )
  49. return [JobOutSchema.model_validate(obj).model_dump() for obj in obj_list]
  50. @classmethod
  51. async def get_job_log_page_service(
  52. cls,
  53. auth: AuthSchema,
  54. page_no: int,
  55. page_size: int,
  56. search: JobQueryParam | None = None,
  57. order_by: list[dict[str, str]] | None = None,
  58. ) -> dict:
  59. """
  60. 分页查询执行日志(数据库 OFFSET/LIMIT)。
  61. 参数:
  62. - auth (AuthSchema): 认证信息。
  63. - page_no (int): 页码。
  64. - page_size (int): 每页条数。
  65. - search (JobQueryParam | None): 查询条件。
  66. - order_by (list[dict[str, str]] | None): 排序。
  67. 返回:
  68. - dict: 分页结果。
  69. """
  70. offset = (page_no - 1) * page_size
  71. ob = order_by or [{"created_time": "desc"}]
  72. return await JobCRUD(auth).page(
  73. offset=offset,
  74. limit=page_size,
  75. order_by=ob,
  76. search=search.__dict__ if search else {},
  77. out_schema=JobOutSchema,
  78. )
  79. @classmethod
  80. async def create_job_log_service(
  81. cls,
  82. auth: AuthSchema,
  83. job_id: str,
  84. job_name: str | None = None,
  85. trigger_type: str | None = None,
  86. ) -> dict:
  87. """
  88. 创建执行日志
  89. 参数:
  90. - auth (AuthSchema): 认证信息模型
  91. - job_id (str): 任务ID
  92. - job_name (str | None): 任务名称
  93. - trigger_type (str | None): 触发方式
  94. 返回:
  95. - Dict: 执行日志详情字典
  96. """
  97. data = JobCreateSchema(
  98. job_id=job_id,
  99. job_name=job_name,
  100. trigger_type=trigger_type,
  101. status="running",
  102. )
  103. obj = await JobCRUD(auth).create_obj_crud(data=data)
  104. if not obj:
  105. raise CustomException(msg="创建执行日志失败")
  106. return JobOutSchema.model_validate(obj).model_dump()
  107. @classmethod
  108. async def update_job_log_service(
  109. cls,
  110. auth: AuthSchema,
  111. id: int,
  112. status: str,
  113. result: str | None = None,
  114. error: str | None = None,
  115. ) -> dict:
  116. """
  117. 更新执行日志
  118. 参数:
  119. - auth (AuthSchema): 认证信息模型
  120. - id (int): 日志ID
  121. - status (str): 执行状态
  122. - result (str | None): 执行结果
  123. - error (str | None): 错误信息
  124. 返回:
  125. - Dict: 执行日志详情字典
  126. """
  127. data = JobUpdateSchema(
  128. status=status,
  129. result=result,
  130. error=error,
  131. )
  132. obj = await JobCRUD(auth).update_obj_crud(id=id, data=data)
  133. if not obj:
  134. raise CustomException(msg="更新执行日志失败")
  135. return JobOutSchema.model_validate(obj).model_dump()
  136. @classmethod
  137. async def delete_job_log_service(cls, auth: AuthSchema, ids: list[int]) -> None:
  138. """
  139. 删除执行日志
  140. 参数:
  141. - auth (AuthSchema): 认证信息模型
  142. - ids (list[int]): 日志ID列表
  143. 返回:
  144. - None
  145. """
  146. if len(ids) < 1:
  147. raise CustomException(msg="删除失败,删除对象不能为空")
  148. await JobCRUD(auth).delete_obj_crud(ids=ids)
  149. @classmethod
  150. async def clear_job_log_service(cls, auth: AuthSchema) -> None:
  151. """
  152. 清空所有执行日志
  153. 参数:
  154. - auth (AuthSchema): 认证信息模型
  155. 返回:
  156. - None
  157. """
  158. await JobCRUD(auth).clear_obj_crud()
  159. @classmethod
  160. def get_scheduler_status_service(cls) -> dict:
  161. """
  162. 获取调度器状态
  163. 返回:
  164. - Dict: 调度器状态信息
  165. """
  166. status = SchedulerUtil.get_scheduler_state()
  167. is_running = SchedulerUtil.is_running()
  168. jobs = SchedulerUtil.get_all_jobs()
  169. return {
  170. "status": status,
  171. "is_running": is_running,
  172. "job_count": len(jobs),
  173. }
  174. @classmethod
  175. def get_scheduler_jobs_service(cls) -> list[dict]:
  176. """
  177. 获取调度器中的任务列表
  178. 返回:
  179. - List[Dict]: 任务列表
  180. """
  181. jobs = SchedulerUtil.get_all_jobs()
  182. return [
  183. {
  184. "id": job.id,
  185. "name": job.name,
  186. "trigger": str(job.trigger),
  187. "next_run_time": str(job.next_run_time) if job.next_run_time else None,
  188. "status": SchedulerUtil.get_job_status(job_id=job.id),
  189. }
  190. for job in jobs
  191. ]