controller.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. from typing import Annotated
  2. from fastapi import APIRouter, Body, Depends, Path
  3. from fastapi.responses import JSONResponse
  4. from app.api.v1.module_system.auth.schema import AuthSchema
  5. from app.common.response import ResponseSchema, SuccessResponse
  6. from app.core.base_params import PaginationQueryParam
  7. from app.core.dependencies import AuthPermission
  8. from app.core.logger import log
  9. from app.core.router_class import OperationLogRoute
  10. from .schema import (
  11. WorkflowCreateSchema,
  12. WorkflowExecuteSchema,
  13. WorkflowOutSchema,
  14. WorkflowPublishSchema,
  15. WorkflowQueryParam,
  16. WorkflowUpdateSchema,
  17. )
  18. from .service import WorkflowService
  19. WorkflowRouter = APIRouter(route_class=OperationLogRoute, prefix="/workflow/definition", tags=["工作流"])
  20. @WorkflowRouter.get(
  21. "/detail/{id}",
  22. summary="工作流详情",
  23. description="根据ID获取工作流详情(含画布 nodes/edges)",
  24. response_model=ResponseSchema[WorkflowOutSchema],
  25. )
  26. async def get_workflow_detail_controller(
  27. id: Annotated[int, Path(description="工作流ID")],
  28. auth: Annotated[AuthSchema, Depends(AuthPermission(["module_task:workflow:definition:detail"]))],
  29. ) -> JSONResponse:
  30. """
  31. 根据 ID 获取工作流详情(含画布 nodes/edges)。
  32. 参数:
  33. - id (int): 工作流 ID。
  34. - auth (AuthSchema): 认证信息。
  35. 返回:
  36. - JSONResponse: 成功响应,data 为详情字典。
  37. """
  38. result_dict = await WorkflowService.get_workflow_detail_service(auth=auth, id=id)
  39. log.info(f"获取工作流详情成功 {id}")
  40. return SuccessResponse(data=result_dict, msg="获取工作流详情成功")
  41. @WorkflowRouter.get(
  42. "/list",
  43. summary="工作流列表",
  44. description="分页查询工作流列表",
  45. response_model=ResponseSchema[list[WorkflowOutSchema]],
  46. )
  47. async def get_workflow_list_controller(
  48. page: Annotated[PaginationQueryParam, Depends()],
  49. search: Annotated[WorkflowQueryParam, Depends()],
  50. auth: Annotated[AuthSchema, Depends(AuthPermission(["module_task:workflow:definition:query"]))],
  51. ) -> JSONResponse:
  52. """
  53. 分页查询工作流列表。
  54. 参数:
  55. - page (PaginationQueryParam): 分页与排序参数。
  56. - search (WorkflowQueryParam): 查询条件。
  57. - auth (AuthSchema): 认证信息。
  58. 返回:
  59. - JSONResponse: 成功响应,data 为分页结果。
  60. """
  61. result_dict = await WorkflowService.get_workflow_page_service(
  62. auth=auth,
  63. page_no=page.page_no,
  64. page_size=page.page_size,
  65. search=search,
  66. order_by=page.order_by,
  67. )
  68. log.info("查询工作流列表成功")
  69. return SuccessResponse(data=result_dict, msg="查询工作流列表成功")
  70. @WorkflowRouter.post(
  71. "/create",
  72. summary="创建工作流",
  73. description="创建草稿工作流,保存 Vue Flow 画布",
  74. response_model=ResponseSchema[WorkflowOutSchema],
  75. )
  76. async def create_workflow_controller(
  77. data: WorkflowCreateSchema,
  78. auth: Annotated[AuthSchema, Depends(AuthPermission(["module_task:workflow:definition:create"]))],
  79. ) -> JSONResponse:
  80. """
  81. 创建草稿工作流。
  82. 参数:
  83. - data (WorkflowCreateSchema): 创建体。
  84. - auth (AuthSchema): 认证信息。
  85. 返回:
  86. - JSONResponse: 成功响应,data 为新建工作流。
  87. """
  88. result_dict = await WorkflowService.create_workflow_service(auth=auth, data=data)
  89. log.info("创建工作流成功")
  90. return SuccessResponse(data=result_dict, msg="创建工作流成功")
  91. @WorkflowRouter.put(
  92. "/update/{id}",
  93. summary="更新工作流",
  94. description="更新工作流及画布",
  95. response_model=ResponseSchema[WorkflowOutSchema],
  96. )
  97. async def update_workflow_controller(
  98. id: Annotated[int, Path(description="工作流ID")],
  99. data: WorkflowUpdateSchema,
  100. auth: Annotated[AuthSchema, Depends(AuthPermission(["module_task:workflow:definition:update"]))],
  101. ) -> JSONResponse:
  102. """
  103. 更新工作流及画布。
  104. 参数:
  105. - id (int): 工作流 ID。
  106. - data (WorkflowUpdateSchema): 更新体。
  107. - auth (AuthSchema): 认证信息。
  108. 返回:
  109. - JSONResponse: 成功响应,data 为更新后的工作流。
  110. """
  111. result_dict = await WorkflowService.update_workflow_service(auth=auth, id=id, data=data)
  112. log.info(f"更新工作流成功 {id}")
  113. return SuccessResponse(data=result_dict, msg="更新工作流成功")
  114. @WorkflowRouter.delete(
  115. "/delete",
  116. summary="删除工作流",
  117. description="批量删除工作流",
  118. response_model=ResponseSchema[None],
  119. )
  120. async def delete_workflow_controller(
  121. ids: Annotated[list[int], Body(description="ID列表")],
  122. auth: Annotated[AuthSchema, Depends(AuthPermission(["module_task:workflow:definition:delete"]))],
  123. ) -> JSONResponse:
  124. """
  125. 批量删除工作流。
  126. 参数:
  127. - ids (list[int]): 工作流 ID 列表。
  128. - auth (AuthSchema): 认证信息。
  129. 返回:
  130. - JSONResponse: 成功提示响应。
  131. """
  132. await WorkflowService.delete_workflow_service(auth=auth, ids=ids)
  133. log.info(f"删除工作流成功 {ids}")
  134. return SuccessResponse(msg="删除工作流成功")
  135. @WorkflowRouter.post(
  136. "/publish/{id}",
  137. summary="发布工作流",
  138. description="校验 DAG 无环后标记为已发布,方可执行",
  139. response_model=ResponseSchema[WorkflowOutSchema],
  140. )
  141. async def publish_workflow_controller(
  142. id: Annotated[int, Path(description="工作流ID")],
  143. auth: Annotated[AuthSchema, Depends(AuthPermission(["module_task:workflow:definition:update"]))],
  144. body: Annotated[WorkflowPublishSchema | None, Body()] = None,
  145. ) -> JSONResponse:
  146. """
  147. 校验 DAG 无环后发布工作流。
  148. 参数:
  149. - id (int): 工作流 ID。
  150. - auth (AuthSchema): 认证信息。
  151. - body (WorkflowPublishSchema | None): 可选发布附加参数。
  152. 返回:
  153. - JSONResponse: 成功响应,data 为发布后工作流。
  154. """
  155. result_dict = await WorkflowService.publish_workflow_service(auth=auth, id=id, body=body)
  156. log.info(f"发布工作流成功 {id}")
  157. return SuccessResponse(data=result_dict, msg="发布工作流成功")
  158. @WorkflowRouter.post(
  159. "/execute",
  160. summary="执行工作流",
  161. description="使用 Prefect 按拓扑顺序执行已发布工作流",
  162. response_model=ResponseSchema[dict],
  163. )
  164. async def execute_workflow_controller(
  165. body: WorkflowExecuteSchema,
  166. auth: Annotated[AuthSchema, Depends(AuthPermission(["module_task:workflow:definition:execute"]))],
  167. ) -> JSONResponse:
  168. """
  169. 使用 Prefect 按拓扑顺序执行已发布工作流。
  170. 参数:
  171. - body (WorkflowExecuteSchema): 工作流 ID 与变量等。
  172. - auth (AuthSchema): 认证信息。
  173. 返回:
  174. - JSONResponse: 成功响应,data 为执行结果摘要。
  175. """
  176. result_dict = await WorkflowService.execute_workflow_service(auth=auth, body=body)
  177. log.info(f"执行工作流完成 workflow_id={body.workflow_id}")
  178. return SuccessResponse(data=result_dict, msg="执行工作流完成")