schema.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. from typing import Any
  2. from fastapi import Query
  3. from pydantic import BaseModel, ConfigDict, Field, model_validator
  4. from app.common.enums import QueueEnum
  5. from app.core.base_schema import UserBySchema
  6. from app.core.validator import DateTimeStr
  7. class WorkflowCreateSchema(BaseModel):
  8. """创建工作流"""
  9. name: str = Field(..., max_length=128, description="流程名称")
  10. code: str = Field(..., max_length=64, description="流程编码")
  11. description: str | None = Field(default=None, description="描述")
  12. nodes: list | None = Field(default=None, description="Vue Flow nodes")
  13. edges: list | None = Field(default=None, description="Vue Flow edges")
  14. class WorkflowUpdateSchema(WorkflowCreateSchema):
  15. """更新工作流"""
  16. workflow_status: str | None = Field(default=None, description="draft/published/archived")
  17. class WorkflowOutSchema(UserBySchema):
  18. """工作流输出(status 表示流程状态 draft/published/archived,与 ModelMixin.status 区分)"""
  19. model_config = ConfigDict(from_attributes=True)
  20. id: int | None = Field(default=None, description="主键ID")
  21. uuid: str | None = Field(default=None, description="UUID")
  22. description: str | None = Field(default=None, description="描述")
  23. created_time: DateTimeStr | None = Field(default=None, description="创建时间")
  24. updated_time: DateTimeStr | None = Field(default=None, description="更新时间")
  25. name: str = Field(description="流程名称")
  26. code: str = Field(description="流程编码")
  27. status: str = Field(description="流程状态 draft/published/archived")
  28. nodes: list | None = Field(default=None, description="节点")
  29. edges: list | None = Field(default=None, description="连线")
  30. @model_validator(mode="before")
  31. @classmethod
  32. def _map_workflow_status(cls, data: Any) -> Any:
  33. from .model import WorkflowModel
  34. if isinstance(data, WorkflowModel):
  35. return {
  36. "id": data.id,
  37. "uuid": data.uuid,
  38. "description": data.description,
  39. "created_time": data.created_time,
  40. "updated_time": data.updated_time,
  41. "created_id": data.created_id,
  42. "updated_id": data.updated_id,
  43. "name": data.name,
  44. "code": data.code,
  45. "status": data.workflow_status,
  46. "nodes": data.nodes,
  47. "edges": data.edges,
  48. }
  49. return data
  50. class WorkflowQueryParam:
  51. """工作流查询"""
  52. def __init__(
  53. self,
  54. name: str | None = Query(None, description="流程名称"),
  55. code: str | None = Query(None, description="流程编码"),
  56. status: str | None = Query(None, description="draft/published/archived"),
  57. created_time: list[DateTimeStr] | None = Query(
  58. None,
  59. description="创建时间范围",
  60. examples=["2025-01-01 00:00:00", "2025-12-31 23:59:59"],
  61. ),
  62. updated_time: list[DateTimeStr] | None = Query(
  63. None,
  64. description="更新时间范围",
  65. examples=["2025-01-01 00:00:00", "2025-12-31 23:59:59"],
  66. ),
  67. created_id: int | None = Query(None, description="创建人"),
  68. updated_id: int | None = Query(None, description="更新人"),
  69. ) -> None:
  70. self.name = (QueueEnum.like.value, name)
  71. self.code = (QueueEnum.like.value, code)
  72. self.workflow_status = (QueueEnum.eq.value, status)
  73. self.created_id = (QueueEnum.eq.value, created_id)
  74. self.updated_id = (QueueEnum.eq.value, updated_id)
  75. if created_time and len(created_time) == 2:
  76. self.created_time = (QueueEnum.between.value, (created_time[0], created_time[1]))
  77. if updated_time and len(updated_time) == 2:
  78. self.updated_time = (QueueEnum.between.value, (updated_time[0], updated_time[1]))
  79. class WorkflowPublishSchema(BaseModel):
  80. """发布工作流(可选备注)"""
  81. remark: str | None = Field(default=None, description="备注")
  82. class WorkflowExecuteSchema(BaseModel):
  83. """执行工作流"""
  84. workflow_id: int = Field(..., description="工作流ID")
  85. variables: dict | None = Field(default=None, description="注入到各节点的 variables 上下文")
  86. business_key: str | None = Field(default=None, description="业务键")
  87. job_id: int | None = Field(default=None, description="关联任务ID")
  88. class WorkflowExecuteResultSchema(BaseModel):
  89. """执行结果"""
  90. workflow_id: int
  91. workflow_name: str
  92. status: str = Field(description="completed/failed")
  93. start_time: str | None = None
  94. end_time: str | None = None
  95. variables: dict | None = None
  96. node_results: dict | None = None
  97. error: str | None = None