schema.py 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. from fastapi import Query
  2. from pydantic import (
  3. BaseModel,
  4. ConfigDict,
  5. Field,
  6. )
  7. from app.common.enums import QueueEnum
  8. from app.core.base_schema import BaseSchema
  9. class JobCreateSchema(BaseModel):
  10. """执行日志创建模型"""
  11. job_id: str = Field(..., description="任务ID")
  12. job_name: str | None = Field(default=None, description="任务名称")
  13. trigger_type: str | None = Field(default=None, description="触发方式")
  14. status: str = Field(default="pending", description="执行状态")
  15. next_run_time: str | None = Field(default=None, description="下次执行时间")
  16. job_state: str | None = Field(default=None, description="任务状态信息")
  17. result: str | None = Field(default=None, description="执行结果")
  18. error: str | None = Field(default=None, description="错误信息")
  19. class JobUpdateSchema(BaseModel):
  20. """执行日志更新模型"""
  21. status: str | None = Field(default=None, description="执行状态")
  22. next_run_time: str | None = Field(default=None, description="下次执行时间")
  23. job_state: str | None = Field(default=None, description="任务状态信息")
  24. result: str | None = Field(default=None, description="执行结果")
  25. error: str | None = Field(default=None, description="错误信息")
  26. class JobOutSchema(JobCreateSchema, BaseSchema):
  27. """执行日志响应模型"""
  28. model_config = ConfigDict(from_attributes=True)
  29. ...
  30. class JobQueryParam:
  31. """执行日志查询参数"""
  32. def __init__(
  33. self,
  34. job_id: str | None = Query(None, description="任务ID"),
  35. job_name: str | None = Query(None, description="任务名称"),
  36. status: str | None = Query(None, description="执行状态"),
  37. trigger_type: str | None = Query(None, description="触发方式"),
  38. ) -> None:
  39. # 确保 job_id 是字符串类型
  40. self.job_id = (QueueEnum.eq.value, str(job_id) if job_id is not None else None)
  41. # 只有当 job_name 不为空时才添加查询条件
  42. self.job_name = (QueueEnum.like.value, job_name) if job_name else None
  43. self.status = (QueueEnum.eq.value, status)
  44. self.trigger_type = (QueueEnum.eq.value, trigger_type)