| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254 |
- from apscheduler.jobstores.base import JobLookupError
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.core.ap_scheduler import SchedulerUtil
- from app.core.exceptions import CustomException
- from app.utils.cron_util import CronUtil
- from .crud import NodeCRUD
- from .schema import (
- NodeCreateSchema,
- NodeExecuteSchema,
- NodeOutSchema,
- NodeQueryParam,
- NodeUpdateSchema,
- )
- class NodeService:
- """
- 节点管理模块服务层
- 设计原则:
- 1. 节点CRUD只操作数据库,不直接操作调度器
- 2. 调度器节点通过"执行"操作来创建和管理
- 3. 支持预设函数和自定义代码块两种执行方式
- """
- @classmethod
- async def get_node_options_service(cls, auth: AuthSchema) -> list[dict]:
- """
- 获取定时任务节点(task_node)列表,供调度/调试使用。
- 工作流画布请使用 ``app.plugin.module_task.workflow.node_type`` 模块下接口(/task/workflow/node-type/options)。
- 参数:
- - auth (AuthSchema): 认证信息模型
- 返回:
- - list[dict]: 节点类型选项列表
- """
- obj_list = await NodeCRUD(auth).get_obj_list_crud()
- return [
- {
- "id": obj.id,
- "name": obj.name,
- "code": obj.code,
- "func": obj.func,
- "args": obj.args,
- "kwargs": obj.kwargs,
- }
- for obj in obj_list
- ]
- @classmethod
- async def get_node_detail_service(cls, auth: AuthSchema, id: int) -> dict:
- """
- 获取节点详情
- 参数:
- - auth (AuthSchema): 认证信息模型
- - id (int): 节点ID
- 返回:
- - Dict: 节点详情字典
- """
- obj = await NodeCRUD(auth).get_obj_by_id_crud(id=id)
- return NodeOutSchema.model_validate(obj).model_dump()
- @classmethod
- async def get_node_list_service(
- cls,
- auth: AuthSchema,
- search: NodeQueryParam | None = None,
- order_by: list[dict[str, str]] | None = None,
- ) -> list[dict]:
- """
- 获取节点列表
- 参数:
- - auth (AuthSchema): 认证信息模型
- - search (NodeQueryParam | None): 查询参数模型
- - order_by (list[dict[str, str]] | None): 排序参数列表
- 返回:
- - List[Dict]: 节点详情字典列表
- """
- obj_list = await NodeCRUD(auth).get_obj_list_crud(search=search.__dict__, order_by=order_by)
- return [NodeOutSchema.model_validate(obj).model_dump() for obj in obj_list]
- @classmethod
- async def get_node_page_service(
- cls,
- auth: AuthSchema,
- page_no: int,
- page_size: int,
- search: NodeQueryParam | None = None,
- order_by: list[dict[str, str]] | None = None,
- ) -> dict:
- """
- 分页查询定时任务节点(数据库 OFFSET/LIMIT)。
- 参数:
- - auth (AuthSchema): 认证信息。
- - page_no (int): 页码。
- - page_size (int): 每页条数。
- - search (NodeQueryParam | None): 查询条件。
- - order_by (list[dict[str, str]] | None): 排序。
- 返回:
- - dict: 分页结果。
- """
- offset = (page_no - 1) * page_size
- return await NodeCRUD(auth).page(
- offset=offset,
- limit=page_size,
- order_by=order_by or [{"id": "asc"}],
- search=search.__dict__ if search else {},
- out_schema=NodeOutSchema,
- )
- @classmethod
- async def create_node_service(cls, auth: AuthSchema, data: NodeCreateSchema) -> dict:
- """
- 创建节点 - 只保存到数据库,不创建调度器任务
- 参数:
- - auth (AuthSchema): 认证信息模型
- - data (NodeCreateSchema): 节点创建模型
- 返回:
- - Dict: 节点详情字典
- """
- exist_obj = await NodeCRUD(auth).get(name=data.name)
- if exist_obj:
- raise CustomException(msg="创建失败,该节点已存在")
- obj = await NodeCRUD(auth).create_obj_crud(data=data)
- if not obj:
- raise CustomException(msg="创建失败")
- return NodeOutSchema.model_validate(obj).model_dump()
- @classmethod
- async def update_node_service(cls, auth: AuthSchema, id: int, data: NodeUpdateSchema) -> dict:
- """
- 更新节点 - 只更新数据库,不修改调度器任务
- 参数:
- - auth (AuthSchema): 认证信息模型
- - id (int): 节点ID
- - data (NodeUpdateSchema): 节点更新模型
- 返回:
- - dict: 节点详情字典
- """
- exist_obj = await NodeCRUD(auth).get_obj_by_id_crud(id=id)
- if not exist_obj:
- raise CustomException(msg="更新失败,该节点不存在")
- obj = await NodeCRUD(auth).update_obj_crud(id=id, data=data)
- if not obj:
- raise CustomException(msg="更新失败")
- return NodeOutSchema.model_validate(obj).model_dump()
- @classmethod
- async def delete_node_service(cls, auth: AuthSchema, ids: list[int]) -> None:
- """
- 删除节点 - 只删除数据库记录,同时移除调度器中的任务
- 参数:
- - auth (AuthSchema): 认证信息模型
- - ids (list[int]): 节点ID列表
- 返回:
- - None
- """
- if len(ids) < 1:
- raise CustomException(msg="删除失败,删除对象不能为空")
- for id in ids:
- exist_obj = await NodeCRUD(auth).get_obj_by_id_crud(id=id)
- if not exist_obj:
- raise CustomException(msg="删除失败,该节点不存在")
- try:
- SchedulerUtil.remove_job(job_id=id)
- except JobLookupError:
- # 作业不存在,忽略异常,继续删除数据库记录
- pass
- await NodeCRUD(auth).delete_obj_crud(ids=ids)
- @classmethod
- async def clear_node_service(cls, auth: AuthSchema) -> None:
- """
- 清空所有节点
- 参数:
- - auth (AuthSchema): 认证信息模型
- 返回:
- - None
- """
- SchedulerUtil.clear_jobs()
- await NodeCRUD(auth).clear_obj_crud()
- @classmethod
- async def execute_node_service(cls, auth: AuthSchema, id: int, execute_data: NodeExecuteSchema) -> dict:
- """
- 调试节点 - 根据任务配置创建调度器任务并执行
- 参数:
- - auth (AuthSchema): 认证信息模型
- - id (int): 节点ID
- - execute_data (NodeExecuteSchema): 执行参数模型
- 返回:
- - dict: 调试结果
- """
- obj = await NodeCRUD(auth).get_obj_by_id_crud(id=id)
- if not obj:
- raise CustomException(msg="调试失败,该节点不存在")
- trigger = execute_data.trigger
- trigger_args = execute_data.trigger_args
- start_date = execute_data.start_date
- end_date = execute_data.end_date
- if trigger == "now":
- SchedulerUtil.add_and_run_job_now(job_info=obj)
- elif trigger == "cron":
- if not trigger_args:
- raise CustomException(msg="Cron执行需要提供Cron表达式")
- if not CronUtil.validate_cron_expression(trigger_args):
- raise CustomException(msg=f"Cron表达式不正确: {trigger_args}")
- SchedulerUtil.add_cron_job(
- job_info=obj,
- trigger_args=trigger_args,
- start_date=start_date,
- end_date=end_date,
- )
- elif trigger == "interval":
- if not trigger_args:
- raise CustomException(msg="间隔执行需要提供间隔参数")
- SchedulerUtil.add_interval_job(
- job_info=obj,
- trigger_args=trigger_args,
- start_date=start_date,
- end_date=end_date,
- )
- elif trigger == "date":
- if not trigger_args:
- raise CustomException(msg="指定时间执行需要提供执行时间")
- SchedulerUtil.add_date_job(job_info=obj, run_date=trigger_args)
- else:
- raise CustomException(msg=f"不支持的触发方式: {trigger}")
- return {"job_id": id, "status": "executed", "trigger": trigger}
|