| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657 |
- from fastapi import Query
- from pydantic import (
- BaseModel,
- ConfigDict,
- Field,
- )
- from app.common.enums import QueueEnum
- from app.core.base_schema import BaseSchema
- class JobCreateSchema(BaseModel):
- """执行日志创建模型"""
- job_id: str = Field(..., description="任务ID")
- job_name: str | None = Field(default=None, description="任务名称")
- trigger_type: str | None = Field(default=None, description="触发方式")
- status: str = Field(default="pending", description="执行状态")
- next_run_time: str | None = Field(default=None, description="下次执行时间")
- job_state: str | None = Field(default=None, description="任务状态信息")
- result: str | None = Field(default=None, description="执行结果")
- error: str | None = Field(default=None, description="错误信息")
- class JobUpdateSchema(BaseModel):
- """执行日志更新模型"""
- status: str | None = Field(default=None, description="执行状态")
- next_run_time: str | None = Field(default=None, description="下次执行时间")
- job_state: str | None = Field(default=None, description="任务状态信息")
- result: str | None = Field(default=None, description="执行结果")
- error: str | None = Field(default=None, description="错误信息")
- class JobOutSchema(JobCreateSchema, BaseSchema):
- """执行日志响应模型"""
- model_config = ConfigDict(from_attributes=True)
- ...
- class JobQueryParam:
- """执行日志查询参数"""
- def __init__(
- self,
- job_id: str | None = Query(None, description="任务ID"),
- job_name: str | None = Query(None, description="任务名称"),
- status: str | None = Query(None, description="执行状态"),
- trigger_type: str | None = Query(None, description="触发方式"),
- ) -> None:
- # 确保 job_id 是字符串类型
- self.job_id = (QueueEnum.eq.value, str(job_id) if job_id is not None else None)
- # 只有当 job_name 不为空时才添加查询条件
- self.job_name = (QueueEnum.like.value, job_name) if job_name else None
- self.status = (QueueEnum.eq.value, status)
- self.trigger_type = (QueueEnum.eq.value, trigger_type)
|