| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376 |
- from typing import Annotated
- from fastapi import APIRouter, Body, Depends, Path
- from fastapi.responses import JSONResponse
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.common.response import ResponseSchema, SuccessResponse
- from app.core.ap_scheduler import SchedulerUtil
- from app.core.base_params import PaginationQueryParam
- from app.core.dependencies import AuthPermission
- from app.core.logger import log
- from app.core.router_class import OperationLogRoute
- from .schema import JobOutSchema, JobQueryParam
- from .service import JobService
- JobRouter = APIRouter(route_class=OperationLogRoute, prefix="/cronjob/job", tags=["调度器监控"])
- # ==================== 调度器状态和操作 ====================
- @JobRouter.get(
- "/scheduler/status",
- summary="获取调度器状态",
- description="获取调度器运行状态",
- response_model=ResponseSchema[dict],
- dependencies=[Depends(AuthPermission(["module_task:cronjob:job:query"]))],
- )
- async def get_scheduler_status_controller() -> JSONResponse:
- """
- 获取调度器状态
- 返回:
- - JSONResponse: 调度器状态信息
- """
- data = JobService.get_scheduler_status_service()
- return SuccessResponse(data=data, msg="获取调度器状态成功")
- @JobRouter.get(
- "/scheduler/jobs",
- summary="获取调度器任务列表",
- description="获取调度器中的任务列表",
- response_model=ResponseSchema[list[dict]],
- dependencies=[Depends(AuthPermission(["module_task:cronjob:job:query"]))],
- )
- async def get_scheduler_jobs_controller() -> JSONResponse:
- """
- 获取调度器中的任务列表
- 返回:
- - JSONResponse: 调度器任务列表
- """
- data = JobService.get_scheduler_jobs_service()
- return SuccessResponse(data=data, msg="获取调度器任务列表成功")
- @JobRouter.post(
- "/scheduler/start",
- summary="启动调度器",
- description="启动调度器",
- response_model=ResponseSchema[None],
- dependencies=[Depends(AuthPermission(["module_task:cronjob:job:scheduler"]))],
- )
- async def start_scheduler_controller() -> JSONResponse:
- """
- 启动调度器。
- 返回:
- - JSONResponse: 成功提示响应。
- """
- SchedulerUtil.start()
- log.info("调度器已启动")
- return SuccessResponse(msg="调度器已启动")
- @JobRouter.post(
- "/scheduler/pause",
- summary="暂停调度器",
- description="暂停调度器",
- response_model=ResponseSchema[None],
- dependencies=[Depends(AuthPermission(["module_task:cronjob:job:scheduler"]))],
- )
- async def pause_scheduler_controller() -> JSONResponse:
- """
- 暂停调度器。
- 返回:
- - JSONResponse: 成功提示响应。
- """
- SchedulerUtil.pause()
- log.info("调度器已暂停")
- return SuccessResponse(msg="调度器已暂停")
- @JobRouter.post(
- "/scheduler/resume",
- summary="恢复调度器",
- description="恢复调度器",
- response_model=ResponseSchema[None],
- dependencies=[Depends(AuthPermission(["module_task:cronjob:job:scheduler"]))],
- )
- async def resume_scheduler_controller() -> JSONResponse:
- """
- 恢复调度器。
- 返回:
- - JSONResponse: 成功提示响应。
- """
- SchedulerUtil.resume()
- log.info("调度器已恢复")
- return SuccessResponse(msg="调度器已恢复")
- @JobRouter.post(
- "/scheduler/shutdown",
- summary="关闭调度器",
- description="关闭调度器",
- response_model=ResponseSchema[None],
- dependencies=[Depends(AuthPermission(["module_task:cronjob:job:scheduler"]))],
- )
- async def shutdown_scheduler_controller() -> JSONResponse:
- """
- 关闭调度器。
- 返回:
- - JSONResponse: 成功提示响应。
- """
- await SchedulerUtil.shutdown()
- log.info("调度器已关闭")
- return SuccessResponse(msg="调度器已关闭")
- @JobRouter.delete(
- "/scheduler/jobs/clear",
- summary="清空所有任务",
- description="清空调度器中的所有任务",
- response_model=ResponseSchema[None],
- dependencies=[Depends(AuthPermission(["module_task:cronjob:job:task"]))],
- )
- async def clear_jobs_controller() -> JSONResponse:
- """
- 清空调度器中的所有任务。
- 返回:
- - JSONResponse: 成功提示响应。
- """
- SchedulerUtil.clear_jobs()
- log.info("已清空所有任务")
- return SuccessResponse(msg="已清空所有任务")
- @JobRouter.get(
- "/scheduler/console",
- summary="获取调度器控制台信息",
- description="获取调度器任务的控制台输出",
- response_model=ResponseSchema[str],
- dependencies=[Depends(AuthPermission(["module_task:cronjob:job:query"]))],
- )
- async def get_scheduler_console_controller() -> JSONResponse:
- """
- 获取调度器控制台信息
- 返回:
- - JSONResponse: 调度器任务的控制台输出
- """
- console_output = SchedulerUtil.print_jobs()
- return SuccessResponse(data=console_output, msg="获取控制台信息成功")
- @JobRouter.post(
- "/scheduler/sync",
- summary="同步调度器任务到数据库",
- description="将调度器中的任务同步到执行日志表",
- response_model=ResponseSchema[int],
- dependencies=[Depends(AuthPermission(["module_task:cronjob:job:update"]))],
- )
- async def sync_jobs_controller() -> JSONResponse:
- """
- 同步调度器任务到数据库
- 返回:
- - JSONResponse: 同步的任务数量
- """
- sync_count = SchedulerUtil.sync_jobs_to_db()
- log.info(f"同步任务完成,共同步 {sync_count} 个任务")
- return SuccessResponse(data=sync_count, msg=f"同步完成,共同步 {sync_count} 个任务")
- # ==================== 调度器任务操作 ====================
- @JobRouter.post(
- "/task/pause/{job_id}",
- summary="暂停任务",
- description="暂停调度器中的任务",
- response_model=ResponseSchema[None],
- dependencies=[Depends(AuthPermission(["module_task:cronjob:job:task"]))],
- )
- async def pause_job_controller(
- job_id: Annotated[str, Path(description="调度器任务ID")],
- ) -> JSONResponse:
- """
- 暂停调度器中的任务
- 参数:
- - job_id (str): 调度器任务ID
- 返回:
- - JSONResponse: 成功提示响应。
- """
- SchedulerUtil.pause_job(job_id=job_id)
- log.info(f"暂停任务成功: {job_id}")
- return SuccessResponse(msg="暂停任务成功")
- @JobRouter.post(
- "/task/resume/{job_id}",
- summary="恢复任务",
- description="恢复调度器中的任务",
- response_model=ResponseSchema[None],
- dependencies=[Depends(AuthPermission(["module_task:cronjob:job:task"]))],
- )
- async def resume_job_controller(
- job_id: Annotated[str, Path(description="调度器任务ID")],
- ) -> JSONResponse:
- """
- 恢复调度器中的任务
- 参数:
- - job_id (str): 调度器任务ID
- 返回:
- - JSONResponse: 成功提示响应。
- """
- SchedulerUtil.resume_job(job_id=job_id)
- log.info(f"恢复任务成功: {job_id}")
- return SuccessResponse(msg="恢复任务成功")
- @JobRouter.post(
- "/task/run/{job_id}",
- summary="立即执行任务",
- description="立即执行调度器中的任务",
- response_model=ResponseSchema[None],
- dependencies=[Depends(AuthPermission(["module_task:cronjob:job:task"]))],
- )
- async def run_job_controller(
- job_id: Annotated[str, Path(description="调度器任务ID")],
- ) -> JSONResponse:
- """
- 立即执行调度器中的任务
- 参数:
- - job_id (str): 调度器任务ID
- 返回:
- - JSONResponse: 成功提示响应。
- """
- SchedulerUtil.run_job_now(job_id=job_id)
- log.info(f"立即执行任务成功: {job_id}")
- return SuccessResponse(msg="立即执行任务成功")
- @JobRouter.delete(
- "/task/remove/{job_id}",
- summary="移除任务",
- description="从调度器中移除任务",
- response_model=ResponseSchema[None],
- dependencies=[Depends(AuthPermission(["module_task:cronjob:job:delete"]))],
- )
- async def remove_job_controller(
- job_id: Annotated[str, Path(description="调度器任务ID")],
- ) -> JSONResponse:
- """
- 从调度器中移除任务
- 参数:
- - job_id (str): 调度器任务ID
- 返回:
- - JSONResponse: 成功提示响应。
- """
- SchedulerUtil.remove_job(job_id=job_id)
- log.info(f"移除任务成功: {job_id}")
- return SuccessResponse(msg="移除任务成功")
- # ==================== 执行日志 ====================
- @JobRouter.get(
- "/log/list",
- summary="查询执行日志列表",
- description="查询执行日志列表",
- response_model=ResponseSchema[list[JobOutSchema]],
- )
- async def get_job_log_list_controller(
- page: Annotated[PaginationQueryParam, Depends()],
- search: Annotated[JobQueryParam, Depends()],
- auth: Annotated[AuthSchema, Depends(AuthPermission(["module_task:cronjob:job:query"]))],
- ) -> JSONResponse:
- """
- 查询执行日志列表
- 参数:
- - page (PaginationQueryParam): 分页查询参数模型
- - search (JobQueryParam): 查询参数模型
- - auth (AuthSchema): 认证信息模型
- 返回:
- - JSONResponse: 包含分页后的执行日志列表
- """
- order_by = [{"created_time": "desc"}]
- if page.order_by:
- order_by = page.order_by
- result_dict = await JobService.get_job_log_page_service(
- auth=auth,
- page_no=page.page_no,
- page_size=page.page_size,
- search=search,
- order_by=order_by,
- )
- log.info("查询执行日志列表成功")
- return SuccessResponse(data=result_dict, msg="查询执行日志列表成功")
- @JobRouter.get(
- "/log/detail/{id}",
- summary="获取执行日志详情",
- description="获取执行日志详情",
- response_model=ResponseSchema[JobOutSchema],
- )
- async def get_job_log_detail_controller(
- id: Annotated[int, Path(description="日志ID")],
- auth: Annotated[AuthSchema, Depends(AuthPermission(["module_task:cronjob:job:detail"]))],
- ) -> JSONResponse:
- """
- 获取执行日志详情
- 参数:
- - id (int): 日志ID
- - auth (AuthSchema): 认证信息模型
- 返回:
- - JSONResponse: 包含执行日志详情
- """
- result_dict = await JobService.get_job_log_detail_service(id=id, auth=auth)
- log.info(f"获取执行日志详情成功 {id}")
- return SuccessResponse(data=result_dict, msg="获取执行日志详情成功")
- @JobRouter.delete(
- "/log/delete",
- summary="删除执行日志",
- description="删除执行日志",
- response_model=ResponseSchema[None],
- )
- async def delete_job_log_controller(
- ids: Annotated[list[int], Body(description="ID列表")],
- auth: Annotated[AuthSchema, Depends(AuthPermission(["module_task:cronjob:job:delete"]))],
- ) -> JSONResponse:
- """
- 删除执行日志
- 参数:
- - ids (list[int]): ID列表
- - auth (AuthSchema): 认证信息模型
- 返回:
- - JSONResponse: 成功提示响应。
- """
- await JobService.delete_job_log_service(auth=auth, ids=ids)
- log.info(f"删除执行日志成功: {ids}")
- return SuccessResponse(msg="删除执行日志成功")
|