service.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. import asyncio
  2. from typing import Any
  3. from app.api.v1.module_system.auth.schema import AuthSchema
  4. from app.core.exceptions import CustomException
  5. from ..engine.prefect_engine import run_prefect_workflow_sync, utc_now_iso, validate_workflow_graph
  6. from ..node_type.crud import WorkflowNodeTypeCRUD
  7. from .crud import WorkflowCRUD
  8. from .schema import (
  9. WorkflowCreateSchema,
  10. WorkflowExecuteResultSchema,
  11. WorkflowExecuteSchema,
  12. WorkflowOutSchema,
  13. WorkflowPublishSchema,
  14. WorkflowQueryParam,
  15. WorkflowUpdateSchema,
  16. )
  17. class WorkflowService:
  18. """工作流:画布存储 + 发布校验 + Prefect 执行"""
  19. @staticmethod
  20. def _out(obj: Any) -> dict:
  21. return WorkflowOutSchema.model_validate(obj).model_dump(mode="json")
  22. @classmethod
  23. async def get_workflow_detail_service(cls, auth: AuthSchema, id: int) -> dict:
  24. """
  25. 获取工作流详情。
  26. 参数:
  27. - auth (AuthSchema): 认证信息。
  28. - id (int): 工作流 ID。
  29. 返回:
  30. - dict: 序列化后的工作流详情。
  31. 异常:
  32. - CustomException: 不存在时抛出。
  33. """
  34. obj = await WorkflowCRUD(auth).get_obj_by_id_crud(id=id)
  35. if not obj:
  36. raise CustomException(msg="工作流不存在")
  37. return cls._out(obj)
  38. @classmethod
  39. async def get_workflow_list_service(
  40. cls,
  41. auth: AuthSchema,
  42. search: WorkflowQueryParam | None = None,
  43. order_by: list[dict[str, str]] | None = None,
  44. ) -> list[dict]:
  45. """
  46. 获取工作流列表(非分页)。
  47. 参数:
  48. - auth (AuthSchema): 认证信息。
  49. - search (WorkflowQueryParam | None): 查询条件。
  50. - order_by (list[dict[str, str]] | None): 排序。
  51. 返回:
  52. - list[dict]: 工作流字典列表。
  53. """
  54. if order_by is None:
  55. order_by = [{"updated_time": "desc"}]
  56. obj_list = await WorkflowCRUD(auth).get_obj_list_crud(
  57. search=search.__dict__ if search else None,
  58. order_by=order_by,
  59. )
  60. return [cls._out(o) for o in obj_list]
  61. @classmethod
  62. async def get_workflow_page_service(
  63. cls,
  64. auth: AuthSchema,
  65. page_no: int,
  66. page_size: int,
  67. search: WorkflowQueryParam | None = None,
  68. order_by: list[dict[str, str]] | None = None,
  69. ) -> dict:
  70. """
  71. 分页查询工作流(数据库 OFFSET/LIMIT)。
  72. 参数:
  73. - auth (AuthSchema): 认证信息。
  74. - page_no (int): 页码。
  75. - page_size (int): 每页条数。
  76. - search (WorkflowQueryParam | None): 查询条件。
  77. - order_by (list[dict[str, str]] | None): 排序。
  78. 返回:
  79. - dict: 分页结果(items 已 JSON 友好序列化)。
  80. """
  81. offset = (page_no - 1) * page_size
  82. order = order_by or [{"updated_time": "desc"}]
  83. result = await WorkflowCRUD(auth).page(
  84. offset=offset,
  85. limit=page_size,
  86. order_by=order,
  87. search=search.__dict__ if search else {},
  88. out_schema=WorkflowOutSchema,
  89. )
  90. result["items"] = [
  91. WorkflowOutSchema.model_validate(item).model_dump(mode="json") for item in result["items"]
  92. ]
  93. return result
  94. @classmethod
  95. async def create_workflow_service(cls, auth: AuthSchema, data: WorkflowCreateSchema) -> dict:
  96. """
  97. 创建工作流草稿。
  98. 参数:
  99. - auth (AuthSchema): 认证信息。
  100. - data (WorkflowCreateSchema): 创建体。
  101. 返回:
  102. - dict: 新建工作流字典。
  103. 异常:
  104. - CustomException: 编码重复或创建失败。
  105. """
  106. exist = await WorkflowCRUD(auth).get(code=data.code)
  107. if exist:
  108. raise CustomException(msg="流程编码已存在")
  109. obj = await WorkflowCRUD(auth).create_obj_crud(data=data)
  110. if not obj:
  111. raise CustomException(msg="创建工作流失败")
  112. return cls._out(obj)
  113. @classmethod
  114. async def update_workflow_service(
  115. cls, auth: AuthSchema, id: int, data: WorkflowUpdateSchema
  116. ) -> dict:
  117. """
  118. 更新工作流。
  119. 参数:
  120. - auth (AuthSchema): 认证信息。
  121. - id (int): 工作流 ID。
  122. - data (WorkflowUpdateSchema): 更新体。
  123. 返回:
  124. - dict: 更新后工作流字典。
  125. 异常:
  126. - CustomException: 不存在、编码冲突或更新失败。
  127. """
  128. exist = await WorkflowCRUD(auth).get_obj_by_id_crud(id=id)
  129. if not exist:
  130. raise CustomException(msg="工作流不存在")
  131. if exist.code != data.code:
  132. other = await WorkflowCRUD(auth).get(code=data.code)
  133. if other:
  134. raise CustomException(msg="流程编码已存在")
  135. obj = await WorkflowCRUD(auth).update_obj_crud(id=id, data=data)
  136. if not obj:
  137. raise CustomException(msg="更新工作流失败")
  138. return cls._out(obj)
  139. @classmethod
  140. async def delete_workflow_service(cls, auth: AuthSchema, ids: list[int]) -> None:
  141. """
  142. 批量删除工作流。
  143. 参数:
  144. - auth (AuthSchema): 认证信息。
  145. - ids (list[int]): ID 列表。
  146. 返回:
  147. - None
  148. 异常:
  149. - CustomException: ID 为空时抛出。
  150. """
  151. if not ids:
  152. raise CustomException(msg="删除ID不能为空")
  153. await WorkflowCRUD(auth).delete_obj_crud(ids=ids)
  154. @classmethod
  155. async def publish_workflow_service(
  156. cls, auth: AuthSchema, id: int, body: WorkflowPublishSchema | None = None
  157. ) -> dict:
  158. """
  159. 校验 DAG 后发布工作流。
  160. 参数:
  161. - auth (AuthSchema): 认证信息。
  162. - id (int): 工作流 ID。
  163. - body (WorkflowPublishSchema | None): 可选附加参数。
  164. 返回:
  165. - dict: 发布后工作流字典。
  166. 异常:
  167. - CustomException: 不存在、图无效或发布失败。
  168. """
  169. obj = await WorkflowCRUD(auth).get_obj_by_id_crud(id=id)
  170. if not obj:
  171. raise CustomException(msg="工作流不存在")
  172. nodes = obj.nodes or []
  173. edges = obj.edges or []
  174. try:
  175. validate_workflow_graph(nodes, edges)
  176. except ValueError as e:
  177. raise CustomException(msg=str(e)) from e
  178. data = WorkflowUpdateSchema(
  179. name=obj.name,
  180. code=obj.code,
  181. description=obj.description,
  182. nodes=obj.nodes,
  183. edges=obj.edges,
  184. workflow_status="published",
  185. )
  186. updated = await WorkflowCRUD(auth).update_obj_crud(id=id, data=data)
  187. if not updated:
  188. raise CustomException(msg="发布失败")
  189. return cls._out(updated)
  190. @classmethod
  191. async def execute_workflow_service(
  192. cls, auth: AuthSchema, body: WorkflowExecuteSchema
  193. ) -> dict:
  194. """
  195. 执行已发布工作流(Prefect 同步入口在线程池中运行)。
  196. 参数:
  197. - auth (AuthSchema): 认证信息。
  198. - body (WorkflowExecuteSchema): 工作流 ID 与变量。
  199. 返回:
  200. - dict: 执行结果摘要(成功或失败结构)。
  201. 异常:
  202. - CustomException: 未发布、缺节点、节点类型未注册等。
  203. """
  204. obj = await WorkflowCRUD(auth).get_obj_by_id_crud(id=body.workflow_id)
  205. if not obj:
  206. raise CustomException(msg="工作流不存在")
  207. if obj.workflow_status != "published":
  208. raise CustomException(msg="仅已发布的工作流可执行")
  209. nodes = obj.nodes or []
  210. edges = obj.edges or []
  211. if not nodes:
  212. raise CustomException(msg="工作流没有节点")
  213. codes = {n.get("type") for n in nodes if n.get("type")}
  214. templates: dict[str, dict[str, Any]] = {}
  215. for code in codes:
  216. node_type = await WorkflowNodeTypeCRUD(auth).get(code=code)
  217. if not node_type:
  218. raise CustomException(msg=f"编排节点类型未注册(请在「工作流编排节点类型」中维护,非定时任务节点): {code}")
  219. if not node_type.func or not str(node_type.func).strip():
  220. raise CustomException(msg=f"编排节点类型未配置 func 代码块: {code}")
  221. templates[code] = {
  222. "func": node_type.func,
  223. "args": node_type.args,
  224. "kwargs": node_type.kwargs,
  225. }
  226. variables = body.variables or {}
  227. start = utc_now_iso()
  228. try:
  229. raw = await asyncio.to_thread(
  230. run_prefect_workflow_sync,
  231. nodes,
  232. edges,
  233. templates,
  234. variables,
  235. )
  236. except ValueError as e:
  237. raise CustomException(msg=str(e)) from e
  238. except CustomException:
  239. raise
  240. except Exception as e:
  241. end = utc_now_iso()
  242. err = WorkflowExecuteResultSchema(
  243. workflow_id=obj.id,
  244. workflow_name=obj.name,
  245. status="failed",
  246. start_time=start,
  247. end_time=end,
  248. variables=variables,
  249. node_results=None,
  250. error=str(e),
  251. )
  252. return err.model_dump(mode="json")
  253. end = utc_now_iso()
  254. ok = WorkflowExecuteResultSchema(
  255. workflow_id=obj.id,
  256. workflow_name=obj.name,
  257. status="completed",
  258. start_time=start,
  259. end_time=end,
  260. variables=variables,
  261. node_results=raw.get("node_results"),
  262. error=None,
  263. )
  264. return ok.model_dump(mode="json")