controller.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. from typing import Annotated
  2. from fastapi import APIRouter, Body, Depends, Path
  3. from fastapi.responses import JSONResponse
  4. from app.api.v1.module_system.auth.schema import AuthSchema
  5. from app.common.response import ResponseSchema, SuccessResponse
  6. from app.core.ap_scheduler import SchedulerUtil
  7. from app.core.base_params import PaginationQueryParam
  8. from app.core.dependencies import AuthPermission
  9. from app.core.logger import log
  10. from app.core.router_class import OperationLogRoute
  11. from .schema import JobOutSchema, JobQueryParam
  12. from .service import JobService
  13. JobRouter = APIRouter(route_class=OperationLogRoute, prefix="/cronjob/job", tags=["调度器监控"])
  14. # ==================== 调度器状态和操作 ====================
  15. @JobRouter.get(
  16. "/scheduler/status",
  17. summary="获取调度器状态",
  18. description="获取调度器运行状态",
  19. response_model=ResponseSchema[dict],
  20. dependencies=[Depends(AuthPermission(["module_task:cronjob:job:query"]))],
  21. )
  22. async def get_scheduler_status_controller() -> JSONResponse:
  23. """
  24. 获取调度器状态
  25. 返回:
  26. - JSONResponse: 调度器状态信息
  27. """
  28. data = JobService.get_scheduler_status_service()
  29. return SuccessResponse(data=data, msg="获取调度器状态成功")
  30. @JobRouter.get(
  31. "/scheduler/jobs",
  32. summary="获取调度器任务列表",
  33. description="获取调度器中的任务列表",
  34. response_model=ResponseSchema[list[dict]],
  35. dependencies=[Depends(AuthPermission(["module_task:cronjob:job:query"]))],
  36. )
  37. async def get_scheduler_jobs_controller() -> JSONResponse:
  38. """
  39. 获取调度器中的任务列表
  40. 返回:
  41. - JSONResponse: 调度器任务列表
  42. """
  43. data = JobService.get_scheduler_jobs_service()
  44. return SuccessResponse(data=data, msg="获取调度器任务列表成功")
  45. @JobRouter.post(
  46. "/scheduler/start",
  47. summary="启动调度器",
  48. description="启动调度器",
  49. response_model=ResponseSchema[None],
  50. dependencies=[Depends(AuthPermission(["module_task:cronjob:job:scheduler"]))],
  51. )
  52. async def start_scheduler_controller() -> JSONResponse:
  53. """
  54. 启动调度器。
  55. 返回:
  56. - JSONResponse: 成功提示响应。
  57. """
  58. SchedulerUtil.start()
  59. log.info("调度器已启动")
  60. return SuccessResponse(msg="调度器已启动")
  61. @JobRouter.post(
  62. "/scheduler/pause",
  63. summary="暂停调度器",
  64. description="暂停调度器",
  65. response_model=ResponseSchema[None],
  66. dependencies=[Depends(AuthPermission(["module_task:cronjob:job:scheduler"]))],
  67. )
  68. async def pause_scheduler_controller() -> JSONResponse:
  69. """
  70. 暂停调度器。
  71. 返回:
  72. - JSONResponse: 成功提示响应。
  73. """
  74. SchedulerUtil.pause()
  75. log.info("调度器已暂停")
  76. return SuccessResponse(msg="调度器已暂停")
  77. @JobRouter.post(
  78. "/scheduler/resume",
  79. summary="恢复调度器",
  80. description="恢复调度器",
  81. response_model=ResponseSchema[None],
  82. dependencies=[Depends(AuthPermission(["module_task:cronjob:job:scheduler"]))],
  83. )
  84. async def resume_scheduler_controller() -> JSONResponse:
  85. """
  86. 恢复调度器。
  87. 返回:
  88. - JSONResponse: 成功提示响应。
  89. """
  90. SchedulerUtil.resume()
  91. log.info("调度器已恢复")
  92. return SuccessResponse(msg="调度器已恢复")
  93. @JobRouter.post(
  94. "/scheduler/shutdown",
  95. summary="关闭调度器",
  96. description="关闭调度器",
  97. response_model=ResponseSchema[None],
  98. dependencies=[Depends(AuthPermission(["module_task:cronjob:job:scheduler"]))],
  99. )
  100. async def shutdown_scheduler_controller() -> JSONResponse:
  101. """
  102. 关闭调度器。
  103. 返回:
  104. - JSONResponse: 成功提示响应。
  105. """
  106. await SchedulerUtil.shutdown()
  107. log.info("调度器已关闭")
  108. return SuccessResponse(msg="调度器已关闭")
  109. @JobRouter.delete(
  110. "/scheduler/jobs/clear",
  111. summary="清空所有任务",
  112. description="清空调度器中的所有任务",
  113. response_model=ResponseSchema[None],
  114. dependencies=[Depends(AuthPermission(["module_task:cronjob:job:task"]))],
  115. )
  116. async def clear_jobs_controller() -> JSONResponse:
  117. """
  118. 清空调度器中的所有任务。
  119. 返回:
  120. - JSONResponse: 成功提示响应。
  121. """
  122. SchedulerUtil.clear_jobs()
  123. log.info("已清空所有任务")
  124. return SuccessResponse(msg="已清空所有任务")
  125. @JobRouter.get(
  126. "/scheduler/console",
  127. summary="获取调度器控制台信息",
  128. description="获取调度器任务的控制台输出",
  129. response_model=ResponseSchema[str],
  130. dependencies=[Depends(AuthPermission(["module_task:cronjob:job:query"]))],
  131. )
  132. async def get_scheduler_console_controller() -> JSONResponse:
  133. """
  134. 获取调度器控制台信息
  135. 返回:
  136. - JSONResponse: 调度器任务的控制台输出
  137. """
  138. console_output = SchedulerUtil.print_jobs()
  139. return SuccessResponse(data=console_output, msg="获取控制台信息成功")
  140. @JobRouter.post(
  141. "/scheduler/sync",
  142. summary="同步调度器任务到数据库",
  143. description="将调度器中的任务同步到执行日志表",
  144. response_model=ResponseSchema[int],
  145. dependencies=[Depends(AuthPermission(["module_task:cronjob:job:update"]))],
  146. )
  147. async def sync_jobs_controller() -> JSONResponse:
  148. """
  149. 同步调度器任务到数据库
  150. 返回:
  151. - JSONResponse: 同步的任务数量
  152. """
  153. sync_count = SchedulerUtil.sync_jobs_to_db()
  154. log.info(f"同步任务完成,共同步 {sync_count} 个任务")
  155. return SuccessResponse(data=sync_count, msg=f"同步完成,共同步 {sync_count} 个任务")
  156. # ==================== 调度器任务操作 ====================
  157. @JobRouter.post(
  158. "/task/pause/{job_id}",
  159. summary="暂停任务",
  160. description="暂停调度器中的任务",
  161. response_model=ResponseSchema[None],
  162. dependencies=[Depends(AuthPermission(["module_task:cronjob:job:task"]))],
  163. )
  164. async def pause_job_controller(
  165. job_id: Annotated[str, Path(description="调度器任务ID")],
  166. ) -> JSONResponse:
  167. """
  168. 暂停调度器中的任务
  169. 参数:
  170. - job_id (str): 调度器任务ID
  171. 返回:
  172. - JSONResponse: 成功提示响应。
  173. """
  174. SchedulerUtil.pause_job(job_id=job_id)
  175. log.info(f"暂停任务成功: {job_id}")
  176. return SuccessResponse(msg="暂停任务成功")
  177. @JobRouter.post(
  178. "/task/resume/{job_id}",
  179. summary="恢复任务",
  180. description="恢复调度器中的任务",
  181. response_model=ResponseSchema[None],
  182. dependencies=[Depends(AuthPermission(["module_task:cronjob:job:task"]))],
  183. )
  184. async def resume_job_controller(
  185. job_id: Annotated[str, Path(description="调度器任务ID")],
  186. ) -> JSONResponse:
  187. """
  188. 恢复调度器中的任务
  189. 参数:
  190. - job_id (str): 调度器任务ID
  191. 返回:
  192. - JSONResponse: 成功提示响应。
  193. """
  194. SchedulerUtil.resume_job(job_id=job_id)
  195. log.info(f"恢复任务成功: {job_id}")
  196. return SuccessResponse(msg="恢复任务成功")
  197. @JobRouter.post(
  198. "/task/run/{job_id}",
  199. summary="立即执行任务",
  200. description="立即执行调度器中的任务",
  201. response_model=ResponseSchema[None],
  202. dependencies=[Depends(AuthPermission(["module_task:cronjob:job:task"]))],
  203. )
  204. async def run_job_controller(
  205. job_id: Annotated[str, Path(description="调度器任务ID")],
  206. ) -> JSONResponse:
  207. """
  208. 立即执行调度器中的任务
  209. 参数:
  210. - job_id (str): 调度器任务ID
  211. 返回:
  212. - JSONResponse: 成功提示响应。
  213. """
  214. SchedulerUtil.run_job_now(job_id=job_id)
  215. log.info(f"立即执行任务成功: {job_id}")
  216. return SuccessResponse(msg="立即执行任务成功")
  217. @JobRouter.delete(
  218. "/task/remove/{job_id}",
  219. summary="移除任务",
  220. description="从调度器中移除任务",
  221. response_model=ResponseSchema[None],
  222. dependencies=[Depends(AuthPermission(["module_task:cronjob:job:delete"]))],
  223. )
  224. async def remove_job_controller(
  225. job_id: Annotated[str, Path(description="调度器任务ID")],
  226. ) -> JSONResponse:
  227. """
  228. 从调度器中移除任务
  229. 参数:
  230. - job_id (str): 调度器任务ID
  231. 返回:
  232. - JSONResponse: 成功提示响应。
  233. """
  234. SchedulerUtil.remove_job(job_id=job_id)
  235. log.info(f"移除任务成功: {job_id}")
  236. return SuccessResponse(msg="移除任务成功")
  237. # ==================== 执行日志 ====================
  238. @JobRouter.get(
  239. "/log/list",
  240. summary="查询执行日志列表",
  241. description="查询执行日志列表",
  242. response_model=ResponseSchema[list[JobOutSchema]],
  243. )
  244. async def get_job_log_list_controller(
  245. page: Annotated[PaginationQueryParam, Depends()],
  246. search: Annotated[JobQueryParam, Depends()],
  247. auth: Annotated[AuthSchema, Depends(AuthPermission(["module_task:cronjob:job:query"]))],
  248. ) -> JSONResponse:
  249. """
  250. 查询执行日志列表
  251. 参数:
  252. - page (PaginationQueryParam): 分页查询参数模型
  253. - search (JobQueryParam): 查询参数模型
  254. - auth (AuthSchema): 认证信息模型
  255. 返回:
  256. - JSONResponse: 包含分页后的执行日志列表
  257. """
  258. order_by = [{"created_time": "desc"}]
  259. if page.order_by:
  260. order_by = page.order_by
  261. result_dict = await JobService.get_job_log_page_service(
  262. auth=auth,
  263. page_no=page.page_no,
  264. page_size=page.page_size,
  265. search=search,
  266. order_by=order_by,
  267. )
  268. log.info("查询执行日志列表成功")
  269. return SuccessResponse(data=result_dict, msg="查询执行日志列表成功")
  270. @JobRouter.get(
  271. "/log/detail/{id}",
  272. summary="获取执行日志详情",
  273. description="获取执行日志详情",
  274. response_model=ResponseSchema[JobOutSchema],
  275. )
  276. async def get_job_log_detail_controller(
  277. id: Annotated[int, Path(description="日志ID")],
  278. auth: Annotated[AuthSchema, Depends(AuthPermission(["module_task:cronjob:job:detail"]))],
  279. ) -> JSONResponse:
  280. """
  281. 获取执行日志详情
  282. 参数:
  283. - id (int): 日志ID
  284. - auth (AuthSchema): 认证信息模型
  285. 返回:
  286. - JSONResponse: 包含执行日志详情
  287. """
  288. result_dict = await JobService.get_job_log_detail_service(id=id, auth=auth)
  289. log.info(f"获取执行日志详情成功 {id}")
  290. return SuccessResponse(data=result_dict, msg="获取执行日志详情成功")
  291. @JobRouter.delete(
  292. "/log/delete",
  293. summary="删除执行日志",
  294. description="删除执行日志",
  295. response_model=ResponseSchema[None],
  296. )
  297. async def delete_job_log_controller(
  298. ids: Annotated[list[int], Body(description="ID列表")],
  299. auth: Annotated[AuthSchema, Depends(AuthPermission(["module_task:cronjob:job:delete"]))],
  300. ) -> JSONResponse:
  301. """
  302. 删除执行日志
  303. 参数:
  304. - ids (list[int]): ID列表
  305. - auth (AuthSchema): 认证信息模型
  306. 返回:
  307. - JSONResponse: 成功提示响应。
  308. """
  309. await JobService.delete_job_log_service(auth=auth, ids=ids)
  310. log.info(f"删除执行日志成功: {ids}")
  311. return SuccessResponse(msg="删除执行日志成功")