| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305 |
- import asyncio
- from typing import Any
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.core.exceptions import CustomException
- from ..engine.prefect_engine import run_prefect_workflow_sync, utc_now_iso, validate_workflow_graph
- from ..node_type.crud import WorkflowNodeTypeCRUD
- from .crud import WorkflowCRUD
- from .schema import (
- WorkflowCreateSchema,
- WorkflowExecuteResultSchema,
- WorkflowExecuteSchema,
- WorkflowOutSchema,
- WorkflowPublishSchema,
- WorkflowQueryParam,
- WorkflowUpdateSchema,
- )
- class WorkflowService:
- """工作流:画布存储 + 发布校验 + Prefect 执行"""
- @staticmethod
- def _out(obj: Any) -> dict:
- return WorkflowOutSchema.model_validate(obj).model_dump(mode="json")
- @classmethod
- async def get_workflow_detail_service(cls, auth: AuthSchema, id: int) -> dict:
- """
- 获取工作流详情。
- 参数:
- - auth (AuthSchema): 认证信息。
- - id (int): 工作流 ID。
- 返回:
- - dict: 序列化后的工作流详情。
- 异常:
- - CustomException: 不存在时抛出。
- """
- obj = await WorkflowCRUD(auth).get_obj_by_id_crud(id=id)
- if not obj:
- raise CustomException(msg="工作流不存在")
- return cls._out(obj)
- @classmethod
- async def get_workflow_list_service(
- cls,
- auth: AuthSchema,
- search: WorkflowQueryParam | None = None,
- order_by: list[dict[str, str]] | None = None,
- ) -> list[dict]:
- """
- 获取工作流列表(非分页)。
- 参数:
- - auth (AuthSchema): 认证信息。
- - search (WorkflowQueryParam | None): 查询条件。
- - order_by (list[dict[str, str]] | None): 排序。
- 返回:
- - list[dict]: 工作流字典列表。
- """
- if order_by is None:
- order_by = [{"updated_time": "desc"}]
- obj_list = await WorkflowCRUD(auth).get_obj_list_crud(
- search=search.__dict__ if search else None,
- order_by=order_by,
- )
- return [cls._out(o) for o in obj_list]
- @classmethod
- async def get_workflow_page_service(
- cls,
- auth: AuthSchema,
- page_no: int,
- page_size: int,
- search: WorkflowQueryParam | None = None,
- order_by: list[dict[str, str]] | None = None,
- ) -> dict:
- """
- 分页查询工作流(数据库 OFFSET/LIMIT)。
- 参数:
- - auth (AuthSchema): 认证信息。
- - page_no (int): 页码。
- - page_size (int): 每页条数。
- - search (WorkflowQueryParam | None): 查询条件。
- - order_by (list[dict[str, str]] | None): 排序。
- 返回:
- - dict: 分页结果(items 已 JSON 友好序列化)。
- """
- offset = (page_no - 1) * page_size
- order = order_by or [{"updated_time": "desc"}]
- result = await WorkflowCRUD(auth).page(
- offset=offset,
- limit=page_size,
- order_by=order,
- search=search.__dict__ if search else {},
- out_schema=WorkflowOutSchema,
- )
- result["items"] = [
- WorkflowOutSchema.model_validate(item).model_dump(mode="json") for item in result["items"]
- ]
- return result
- @classmethod
- async def create_workflow_service(cls, auth: AuthSchema, data: WorkflowCreateSchema) -> dict:
- """
- 创建工作流草稿。
- 参数:
- - auth (AuthSchema): 认证信息。
- - data (WorkflowCreateSchema): 创建体。
- 返回:
- - dict: 新建工作流字典。
- 异常:
- - CustomException: 编码重复或创建失败。
- """
- exist = await WorkflowCRUD(auth).get(code=data.code)
- if exist:
- raise CustomException(msg="流程编码已存在")
- obj = await WorkflowCRUD(auth).create_obj_crud(data=data)
- if not obj:
- raise CustomException(msg="创建工作流失败")
- return cls._out(obj)
- @classmethod
- async def update_workflow_service(
- cls, auth: AuthSchema, id: int, data: WorkflowUpdateSchema
- ) -> dict:
- """
- 更新工作流。
- 参数:
- - auth (AuthSchema): 认证信息。
- - id (int): 工作流 ID。
- - data (WorkflowUpdateSchema): 更新体。
- 返回:
- - dict: 更新后工作流字典。
- 异常:
- - CustomException: 不存在、编码冲突或更新失败。
- """
- exist = await WorkflowCRUD(auth).get_obj_by_id_crud(id=id)
- if not exist:
- raise CustomException(msg="工作流不存在")
- if exist.code != data.code:
- other = await WorkflowCRUD(auth).get(code=data.code)
- if other:
- raise CustomException(msg="流程编码已存在")
- obj = await WorkflowCRUD(auth).update_obj_crud(id=id, data=data)
- if not obj:
- raise CustomException(msg="更新工作流失败")
- return cls._out(obj)
- @classmethod
- async def delete_workflow_service(cls, auth: AuthSchema, ids: list[int]) -> None:
- """
- 批量删除工作流。
- 参数:
- - auth (AuthSchema): 认证信息。
- - ids (list[int]): ID 列表。
- 返回:
- - None
- 异常:
- - CustomException: ID 为空时抛出。
- """
- if not ids:
- raise CustomException(msg="删除ID不能为空")
- await WorkflowCRUD(auth).delete_obj_crud(ids=ids)
- @classmethod
- async def publish_workflow_service(
- cls, auth: AuthSchema, id: int, body: WorkflowPublishSchema | None = None
- ) -> dict:
- """
- 校验 DAG 后发布工作流。
- 参数:
- - auth (AuthSchema): 认证信息。
- - id (int): 工作流 ID。
- - body (WorkflowPublishSchema | None): 可选附加参数。
- 返回:
- - dict: 发布后工作流字典。
- 异常:
- - CustomException: 不存在、图无效或发布失败。
- """
- obj = await WorkflowCRUD(auth).get_obj_by_id_crud(id=id)
- if not obj:
- raise CustomException(msg="工作流不存在")
- nodes = obj.nodes or []
- edges = obj.edges or []
- try:
- validate_workflow_graph(nodes, edges)
- except ValueError as e:
- raise CustomException(msg=str(e)) from e
- data = WorkflowUpdateSchema(
- name=obj.name,
- code=obj.code,
- description=obj.description,
- nodes=obj.nodes,
- edges=obj.edges,
- workflow_status="published",
- )
- updated = await WorkflowCRUD(auth).update_obj_crud(id=id, data=data)
- if not updated:
- raise CustomException(msg="发布失败")
- return cls._out(updated)
- @classmethod
- async def execute_workflow_service(
- cls, auth: AuthSchema, body: WorkflowExecuteSchema
- ) -> dict:
- """
- 执行已发布工作流(Prefect 同步入口在线程池中运行)。
- 参数:
- - auth (AuthSchema): 认证信息。
- - body (WorkflowExecuteSchema): 工作流 ID 与变量。
- 返回:
- - dict: 执行结果摘要(成功或失败结构)。
- 异常:
- - CustomException: 未发布、缺节点、节点类型未注册等。
- """
- obj = await WorkflowCRUD(auth).get_obj_by_id_crud(id=body.workflow_id)
- if not obj:
- raise CustomException(msg="工作流不存在")
- if obj.workflow_status != "published":
- raise CustomException(msg="仅已发布的工作流可执行")
- nodes = obj.nodes or []
- edges = obj.edges or []
- if not nodes:
- raise CustomException(msg="工作流没有节点")
- codes = {n.get("type") for n in nodes if n.get("type")}
- templates: dict[str, dict[str, Any]] = {}
- for code in codes:
- node_type = await WorkflowNodeTypeCRUD(auth).get(code=code)
- if not node_type:
- raise CustomException(msg=f"编排节点类型未注册(请在「工作流编排节点类型」中维护,非定时任务节点): {code}")
- if not node_type.func or not str(node_type.func).strip():
- raise CustomException(msg=f"编排节点类型未配置 func 代码块: {code}")
- templates[code] = {
- "func": node_type.func,
- "args": node_type.args,
- "kwargs": node_type.kwargs,
- }
- variables = body.variables or {}
- start = utc_now_iso()
- try:
- raw = await asyncio.to_thread(
- run_prefect_workflow_sync,
- nodes,
- edges,
- templates,
- variables,
- )
- except ValueError as e:
- raise CustomException(msg=str(e)) from e
- except CustomException:
- raise
- except Exception as e:
- end = utc_now_iso()
- err = WorkflowExecuteResultSchema(
- workflow_id=obj.id,
- workflow_name=obj.name,
- status="failed",
- start_time=start,
- end_time=end,
- variables=variables,
- node_results=None,
- error=str(e),
- )
- return err.model_dump(mode="json")
- end = utc_now_iso()
- ok = WorkflowExecuteResultSchema(
- workflow_id=obj.id,
- workflow_name=obj.name,
- status="completed",
- start_time=start,
- end_time=end,
- variables=variables,
- node_results=raw.get("node_results"),
- error=None,
- )
- return ok.model_dump(mode="json")
|