service.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. from apscheduler.jobstores.base import JobLookupError
  2. from app.api.v1.module_system.auth.schema import AuthSchema
  3. from app.core.ap_scheduler import SchedulerUtil
  4. from app.core.exceptions import CustomException
  5. from app.utils.cron_util import CronUtil
  6. from .crud import NodeCRUD
  7. from .schema import (
  8. NodeCreateSchema,
  9. NodeExecuteSchema,
  10. NodeOutSchema,
  11. NodeQueryParam,
  12. NodeUpdateSchema,
  13. )
  14. class NodeService:
  15. """
  16. 节点管理模块服务层
  17. 设计原则:
  18. 1. 节点CRUD只操作数据库,不直接操作调度器
  19. 2. 调度器节点通过"执行"操作来创建和管理
  20. 3. 支持预设函数和自定义代码块两种执行方式
  21. """
  22. @classmethod
  23. async def get_node_options_service(cls, auth: AuthSchema) -> list[dict]:
  24. """
  25. 获取定时任务节点(task_node)列表,供调度/调试使用。
  26. 工作流画布请使用 ``app.plugin.module_task.workflow.node_type`` 模块下接口(/task/workflow/node-type/options)。
  27. 参数:
  28. - auth (AuthSchema): 认证信息模型
  29. 返回:
  30. - list[dict]: 节点类型选项列表
  31. """
  32. obj_list = await NodeCRUD(auth).get_obj_list_crud()
  33. return [
  34. {
  35. "id": obj.id,
  36. "name": obj.name,
  37. "code": obj.code,
  38. "func": obj.func,
  39. "args": obj.args,
  40. "kwargs": obj.kwargs,
  41. }
  42. for obj in obj_list
  43. ]
  44. @classmethod
  45. async def get_node_detail_service(cls, auth: AuthSchema, id: int) -> dict:
  46. """
  47. 获取节点详情
  48. 参数:
  49. - auth (AuthSchema): 认证信息模型
  50. - id (int): 节点ID
  51. 返回:
  52. - Dict: 节点详情字典
  53. """
  54. obj = await NodeCRUD(auth).get_obj_by_id_crud(id=id)
  55. return NodeOutSchema.model_validate(obj).model_dump()
  56. @classmethod
  57. async def get_node_list_service(
  58. cls,
  59. auth: AuthSchema,
  60. search: NodeQueryParam | None = None,
  61. order_by: list[dict[str, str]] | None = None,
  62. ) -> list[dict]:
  63. """
  64. 获取节点列表
  65. 参数:
  66. - auth (AuthSchema): 认证信息模型
  67. - search (NodeQueryParam | None): 查询参数模型
  68. - order_by (list[dict[str, str]] | None): 排序参数列表
  69. 返回:
  70. - List[Dict]: 节点详情字典列表
  71. """
  72. obj_list = await NodeCRUD(auth).get_obj_list_crud(search=search.__dict__, order_by=order_by)
  73. return [NodeOutSchema.model_validate(obj).model_dump() for obj in obj_list]
  74. @classmethod
  75. async def get_node_page_service(
  76. cls,
  77. auth: AuthSchema,
  78. page_no: int,
  79. page_size: int,
  80. search: NodeQueryParam | None = None,
  81. order_by: list[dict[str, str]] | None = None,
  82. ) -> dict:
  83. """
  84. 分页查询定时任务节点(数据库 OFFSET/LIMIT)。
  85. 参数:
  86. - auth (AuthSchema): 认证信息。
  87. - page_no (int): 页码。
  88. - page_size (int): 每页条数。
  89. - search (NodeQueryParam | None): 查询条件。
  90. - order_by (list[dict[str, str]] | None): 排序。
  91. 返回:
  92. - dict: 分页结果。
  93. """
  94. offset = (page_no - 1) * page_size
  95. return await NodeCRUD(auth).page(
  96. offset=offset,
  97. limit=page_size,
  98. order_by=order_by or [{"id": "asc"}],
  99. search=search.__dict__ if search else {},
  100. out_schema=NodeOutSchema,
  101. )
  102. @classmethod
  103. async def create_node_service(cls, auth: AuthSchema, data: NodeCreateSchema) -> dict:
  104. """
  105. 创建节点 - 只保存到数据库,不创建调度器任务
  106. 参数:
  107. - auth (AuthSchema): 认证信息模型
  108. - data (NodeCreateSchema): 节点创建模型
  109. 返回:
  110. - Dict: 节点详情字典
  111. """
  112. exist_obj = await NodeCRUD(auth).get(name=data.name)
  113. if exist_obj:
  114. raise CustomException(msg="创建失败,该节点已存在")
  115. obj = await NodeCRUD(auth).create_obj_crud(data=data)
  116. if not obj:
  117. raise CustomException(msg="创建失败")
  118. return NodeOutSchema.model_validate(obj).model_dump()
  119. @classmethod
  120. async def update_node_service(cls, auth: AuthSchema, id: int, data: NodeUpdateSchema) -> dict:
  121. """
  122. 更新节点 - 只更新数据库,不修改调度器任务
  123. 参数:
  124. - auth (AuthSchema): 认证信息模型
  125. - id (int): 节点ID
  126. - data (NodeUpdateSchema): 节点更新模型
  127. 返回:
  128. - dict: 节点详情字典
  129. """
  130. exist_obj = await NodeCRUD(auth).get_obj_by_id_crud(id=id)
  131. if not exist_obj:
  132. raise CustomException(msg="更新失败,该节点不存在")
  133. obj = await NodeCRUD(auth).update_obj_crud(id=id, data=data)
  134. if not obj:
  135. raise CustomException(msg="更新失败")
  136. return NodeOutSchema.model_validate(obj).model_dump()
  137. @classmethod
  138. async def delete_node_service(cls, auth: AuthSchema, ids: list[int]) -> None:
  139. """
  140. 删除节点 - 只删除数据库记录,同时移除调度器中的任务
  141. 参数:
  142. - auth (AuthSchema): 认证信息模型
  143. - ids (list[int]): 节点ID列表
  144. 返回:
  145. - None
  146. """
  147. if len(ids) < 1:
  148. raise CustomException(msg="删除失败,删除对象不能为空")
  149. for id in ids:
  150. exist_obj = await NodeCRUD(auth).get_obj_by_id_crud(id=id)
  151. if not exist_obj:
  152. raise CustomException(msg="删除失败,该节点不存在")
  153. try:
  154. SchedulerUtil.remove_job(job_id=id)
  155. except JobLookupError:
  156. # 作业不存在,忽略异常,继续删除数据库记录
  157. pass
  158. await NodeCRUD(auth).delete_obj_crud(ids=ids)
  159. @classmethod
  160. async def clear_node_service(cls, auth: AuthSchema) -> None:
  161. """
  162. 清空所有节点
  163. 参数:
  164. - auth (AuthSchema): 认证信息模型
  165. 返回:
  166. - None
  167. """
  168. SchedulerUtil.clear_jobs()
  169. await NodeCRUD(auth).clear_obj_crud()
  170. @classmethod
  171. async def execute_node_service(cls, auth: AuthSchema, id: int, execute_data: NodeExecuteSchema) -> dict:
  172. """
  173. 调试节点 - 根据任务配置创建调度器任务并执行
  174. 参数:
  175. - auth (AuthSchema): 认证信息模型
  176. - id (int): 节点ID
  177. - execute_data (NodeExecuteSchema): 执行参数模型
  178. 返回:
  179. - dict: 调试结果
  180. """
  181. obj = await NodeCRUD(auth).get_obj_by_id_crud(id=id)
  182. if not obj:
  183. raise CustomException(msg="调试失败,该节点不存在")
  184. trigger = execute_data.trigger
  185. trigger_args = execute_data.trigger_args
  186. start_date = execute_data.start_date
  187. end_date = execute_data.end_date
  188. if trigger == "now":
  189. SchedulerUtil.add_and_run_job_now(job_info=obj)
  190. elif trigger == "cron":
  191. if not trigger_args:
  192. raise CustomException(msg="Cron执行需要提供Cron表达式")
  193. if not CronUtil.validate_cron_expression(trigger_args):
  194. raise CustomException(msg=f"Cron表达式不正确: {trigger_args}")
  195. SchedulerUtil.add_cron_job(
  196. job_info=obj,
  197. trigger_args=trigger_args,
  198. start_date=start_date,
  199. end_date=end_date,
  200. )
  201. elif trigger == "interval":
  202. if not trigger_args:
  203. raise CustomException(msg="间隔执行需要提供间隔参数")
  204. SchedulerUtil.add_interval_job(
  205. job_info=obj,
  206. trigger_args=trigger_args,
  207. start_date=start_date,
  208. end_date=end_date,
  209. )
  210. elif trigger == "date":
  211. if not trigger_args:
  212. raise CustomException(msg="指定时间执行需要提供执行时间")
  213. SchedulerUtil.add_date_job(job_info=obj, run_date=trigger_args)
  214. else:
  215. raise CustomException(msg=f"不支持的触发方式: {trigger}")
  216. return {"job_id": id, "status": "executed", "trigger": trigger}