| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210 |
- from typing import Annotated
- from fastapi import APIRouter, Body, Depends, Path
- from fastapi.responses import JSONResponse
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.common.response import ResponseSchema, SuccessResponse
- from app.core.base_params import PaginationQueryParam
- from app.core.dependencies import AuthPermission
- from app.core.logger import log
- from app.core.router_class import OperationLogRoute
- from .schema import (
- WorkflowCreateSchema,
- WorkflowExecuteSchema,
- WorkflowOutSchema,
- WorkflowPublishSchema,
- WorkflowQueryParam,
- WorkflowUpdateSchema,
- )
- from .service import WorkflowService
- WorkflowRouter = APIRouter(route_class=OperationLogRoute, prefix="/workflow/definition", tags=["工作流"])
- @WorkflowRouter.get(
- "/detail/{id}",
- summary="工作流详情",
- description="根据ID获取工作流详情(含画布 nodes/edges)",
- response_model=ResponseSchema[WorkflowOutSchema],
- )
- async def get_workflow_detail_controller(
- id: Annotated[int, Path(description="工作流ID")],
- auth: Annotated[AuthSchema, Depends(AuthPermission(["module_task:workflow:definition:detail"]))],
- ) -> JSONResponse:
- """
- 根据 ID 获取工作流详情(含画布 nodes/edges)。
- 参数:
- - id (int): 工作流 ID。
- - auth (AuthSchema): 认证信息。
- 返回:
- - JSONResponse: 成功响应,data 为详情字典。
- """
- result_dict = await WorkflowService.get_workflow_detail_service(auth=auth, id=id)
- log.info(f"获取工作流详情成功 {id}")
- return SuccessResponse(data=result_dict, msg="获取工作流详情成功")
- @WorkflowRouter.get(
- "/list",
- summary="工作流列表",
- description="分页查询工作流列表",
- response_model=ResponseSchema[list[WorkflowOutSchema]],
- )
- async def get_workflow_list_controller(
- page: Annotated[PaginationQueryParam, Depends()],
- search: Annotated[WorkflowQueryParam, Depends()],
- auth: Annotated[AuthSchema, Depends(AuthPermission(["module_task:workflow:definition:query"]))],
- ) -> JSONResponse:
- """
- 分页查询工作流列表。
- 参数:
- - page (PaginationQueryParam): 分页与排序参数。
- - search (WorkflowQueryParam): 查询条件。
- - auth (AuthSchema): 认证信息。
- 返回:
- - JSONResponse: 成功响应,data 为分页结果。
- """
- result_dict = await WorkflowService.get_workflow_page_service(
- auth=auth,
- page_no=page.page_no,
- page_size=page.page_size,
- search=search,
- order_by=page.order_by,
- )
- log.info("查询工作流列表成功")
- return SuccessResponse(data=result_dict, msg="查询工作流列表成功")
- @WorkflowRouter.post(
- "/create",
- summary="创建工作流",
- description="创建草稿工作流,保存 Vue Flow 画布",
- response_model=ResponseSchema[WorkflowOutSchema],
- )
- async def create_workflow_controller(
- data: WorkflowCreateSchema,
- auth: Annotated[AuthSchema, Depends(AuthPermission(["module_task:workflow:definition:create"]))],
- ) -> JSONResponse:
- """
- 创建草稿工作流。
- 参数:
- - data (WorkflowCreateSchema): 创建体。
- - auth (AuthSchema): 认证信息。
- 返回:
- - JSONResponse: 成功响应,data 为新建工作流。
- """
- result_dict = await WorkflowService.create_workflow_service(auth=auth, data=data)
- log.info("创建工作流成功")
- return SuccessResponse(data=result_dict, msg="创建工作流成功")
- @WorkflowRouter.put(
- "/update/{id}",
- summary="更新工作流",
- description="更新工作流及画布",
- response_model=ResponseSchema[WorkflowOutSchema],
- )
- async def update_workflow_controller(
- id: Annotated[int, Path(description="工作流ID")],
- data: WorkflowUpdateSchema,
- auth: Annotated[AuthSchema, Depends(AuthPermission(["module_task:workflow:definition:update"]))],
- ) -> JSONResponse:
- """
- 更新工作流及画布。
- 参数:
- - id (int): 工作流 ID。
- - data (WorkflowUpdateSchema): 更新体。
- - auth (AuthSchema): 认证信息。
- 返回:
- - JSONResponse: 成功响应,data 为更新后的工作流。
- """
- result_dict = await WorkflowService.update_workflow_service(auth=auth, id=id, data=data)
- log.info(f"更新工作流成功 {id}")
- return SuccessResponse(data=result_dict, msg="更新工作流成功")
- @WorkflowRouter.delete(
- "/delete",
- summary="删除工作流",
- description="批量删除工作流",
- response_model=ResponseSchema[None],
- )
- async def delete_workflow_controller(
- ids: Annotated[list[int], Body(description="ID列表")],
- auth: Annotated[AuthSchema, Depends(AuthPermission(["module_task:workflow:definition:delete"]))],
- ) -> JSONResponse:
- """
- 批量删除工作流。
- 参数:
- - ids (list[int]): 工作流 ID 列表。
- - auth (AuthSchema): 认证信息。
- 返回:
- - JSONResponse: 成功提示响应。
- """
- await WorkflowService.delete_workflow_service(auth=auth, ids=ids)
- log.info(f"删除工作流成功 {ids}")
- return SuccessResponse(msg="删除工作流成功")
- @WorkflowRouter.post(
- "/publish/{id}",
- summary="发布工作流",
- description="校验 DAG 无环后标记为已发布,方可执行",
- response_model=ResponseSchema[WorkflowOutSchema],
- )
- async def publish_workflow_controller(
- id: Annotated[int, Path(description="工作流ID")],
- auth: Annotated[AuthSchema, Depends(AuthPermission(["module_task:workflow:definition:update"]))],
- body: Annotated[WorkflowPublishSchema | None, Body()] = None,
- ) -> JSONResponse:
- """
- 校验 DAG 无环后发布工作流。
- 参数:
- - id (int): 工作流 ID。
- - auth (AuthSchema): 认证信息。
- - body (WorkflowPublishSchema | None): 可选发布附加参数。
- 返回:
- - JSONResponse: 成功响应,data 为发布后工作流。
- """
- result_dict = await WorkflowService.publish_workflow_service(auth=auth, id=id, body=body)
- log.info(f"发布工作流成功 {id}")
- return SuccessResponse(data=result_dict, msg="发布工作流成功")
- @WorkflowRouter.post(
- "/execute",
- summary="执行工作流",
- description="使用 Prefect 按拓扑顺序执行已发布工作流",
- response_model=ResponseSchema[dict],
- )
- async def execute_workflow_controller(
- body: WorkflowExecuteSchema,
- auth: Annotated[AuthSchema, Depends(AuthPermission(["module_task:workflow:definition:execute"]))],
- ) -> JSONResponse:
- """
- 使用 Prefect 按拓扑顺序执行已发布工作流。
- 参数:
- - body (WorkflowExecuteSchema): 工作流 ID 与变量等。
- - auth (AuthSchema): 认证信息。
- 返回:
- - JSONResponse: 成功响应,data 为执行结果摘要。
- """
- result_dict = await WorkflowService.execute_workflow_service(auth=auth, body=body)
- log.info(f"执行工作流完成 workflow_id={body.workflow_id}")
- return SuccessResponse(data=result_dict, msg="执行工作流完成")
|