ap_scheduler.py 52 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495
  1. import json
  2. from collections.abc import Callable
  3. from datetime import datetime
  4. from typing import Any
  5. from apscheduler.events import (
  6. EVENT_ALL,
  7. EVENT_ALL_JOBS_REMOVED,
  8. EVENT_EXECUTOR_ADDED,
  9. EVENT_EXECUTOR_REMOVED,
  10. EVENT_JOB_ADDED,
  11. EVENT_JOB_ERROR,
  12. EVENT_JOB_EXECUTED,
  13. EVENT_JOB_MAX_INSTANCES,
  14. EVENT_JOB_MISSED,
  15. EVENT_JOB_MODIFIED,
  16. EVENT_JOB_REMOVED,
  17. EVENT_JOB_SUBMITTED,
  18. EVENT_JOBSTORE_ADDED,
  19. EVENT_JOBSTORE_REMOVED,
  20. EVENT_SCHEDULER_PAUSED,
  21. EVENT_SCHEDULER_RESUMED,
  22. EVENT_SCHEDULER_SHUTDOWN,
  23. EVENT_SCHEDULER_START,
  24. EVENT_SCHEDULER_STARTED,
  25. JobEvent,
  26. JobExecutionEvent,
  27. JobSubmissionEvent,
  28. SchedulerEvent,
  29. )
  30. from apscheduler.executors.asyncio import AsyncIOExecutor
  31. from apscheduler.executors.pool import ProcessPoolExecutor, ThreadPoolExecutor
  32. from apscheduler.job import Job
  33. from apscheduler.jobstores.base import ConflictingIdError
  34. from apscheduler.jobstores.memory import MemoryJobStore
  35. from apscheduler.jobstores.redis import RedisJobStore
  36. from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
  37. from apscheduler.schedulers.asyncio import AsyncIOScheduler
  38. from apscheduler.triggers.cron import CronTrigger
  39. from apscheduler.triggers.date import DateTrigger
  40. from apscheduler.triggers.interval import IntervalTrigger
  41. from redis.asyncio import Redis
  42. from app.config.setting import settings
  43. from app.core.database import engine
  44. from app.core.logger import log
  45. from app.plugin.module_task.cronjob.node.model import NodeModel
  46. from app.utils.cron_util import CronUtil
  47. scheduler = AsyncIOScheduler()
  48. scheduler.configure(
  49. jobstores={
  50. "default": MemoryJobStore(),
  51. "sqlalchemy": SQLAlchemyJobStore(url=settings.DB_URI, engine=engine),
  52. "redis": RedisJobStore(
  53. host=settings.REDIS_HOST,
  54. port=int(settings.REDIS_PORT),
  55. username=settings.REDIS_USER,
  56. password=settings.REDIS_PASSWORD,
  57. db=int(settings.REDIS_DB_NAME),
  58. )
  59. },
  60. executors={
  61. "default": AsyncIOExecutor(),
  62. "threadpool": ThreadPoolExecutor(max_workers=10),
  63. "processpool": ProcessPoolExecutor(max_workers=1),
  64. },
  65. job_defaults={
  66. "coalesce": True,
  67. "max_instances": 5,
  68. },
  69. timezone="Asia/Shanghai",
  70. )
  71. class SchedulerUtil:
  72. """
  73. 定时任务相关方法
  74. """
  75. redis_instance: Redis | None = None
  76. # 临时存储 job_name,用于在 EVENT_JOB_SUBMITTED 时获取
  77. # 格式可以是: str (任务名称) 或 tuple[str, str] (原任务ID, 任务名称)
  78. _job_name_cache: dict[str, str | tuple[str, str]] = {}
  79. @classmethod
  80. def scheduler_event_listener(cls, event: JobEvent | JobExecutionEvent) -> None:
  81. """
  82. 监听任务执行事件,记录执行日志;每次执行新建日志行,保留历史。
  83. 参数:
  84. - event (JobEvent | JobExecutionEvent): APScheduler 事件对象。
  85. 返回:
  86. - None
  87. """
  88. try:
  89. # 事件处理器映射
  90. event_handlers: dict[int, Callable] = {
  91. # 调度器事件
  92. EVENT_SCHEDULER_STARTED: cls._handle_scheduler_started,
  93. EVENT_SCHEDULER_START: cls._handle_scheduler_started,
  94. EVENT_SCHEDULER_SHUTDOWN: cls._handle_scheduler_shutdown,
  95. EVENT_SCHEDULER_PAUSED: cls._handle_scheduler_paused,
  96. EVENT_SCHEDULER_RESUMED: cls._handle_scheduler_resumed,
  97. # 执行器事件
  98. EVENT_EXECUTOR_ADDED: cls._handle_executor_added,
  99. EVENT_EXECUTOR_REMOVED: cls._handle_executor_removed,
  100. # JobStore 事件
  101. EVENT_JOBSTORE_ADDED: cls._handle_jobstore_added,
  102. EVENT_JOBSTORE_REMOVED: cls._handle_jobstore_removed,
  103. EVENT_ALL_JOBS_REMOVED: cls._handle_all_jobs_removed,
  104. # 任务事件
  105. EVENT_JOB_ADDED: cls._handle_job_added,
  106. EVENT_JOB_REMOVED: cls._handle_job_removed,
  107. EVENT_JOB_MODIFIED: cls._handle_job_modified,
  108. EVENT_JOB_SUBMITTED: cls._handle_job_submitted,
  109. EVENT_JOB_EXECUTED: cls._handle_job_executed,
  110. EVENT_JOB_ERROR: cls._handle_job_error,
  111. EVENT_JOB_MISSED: cls._handle_job_missed,
  112. EVENT_JOB_MAX_INSTANCES: cls._handle_job_max_instances,
  113. }
  114. # 处理事件
  115. if event.code in event_handlers:
  116. handler = event_handlers[event.code]
  117. handler(event)
  118. else:
  119. # 处理其他事件
  120. cls._handle_other_event(event)
  121. except Exception as e:
  122. log.error(f"处理任务执行事件失败: {e!s}", exc_info=True)
  123. @classmethod
  124. def _handle_job_submitted(cls, event: JobSubmissionEvent) -> None:
  125. """
  126. 处理任务提交事件
  127. """
  128. job_id = str(event.job_id)
  129. job = cls.get_job(job_id=job_id)
  130. if job:
  131. log.info(f"任务 {job_id} ({job.name}) 已提交执行")
  132. trigger_type = cls._get_trigger_type(job_id)
  133. # 周期性任务(cron/interval):更新 pending 状态为 running
  134. if trigger_type in ("cron", "interval"):
  135. cls._update_job_log(
  136. job_id=job_id,
  137. status="running",
  138. )
  139. else:
  140. # 一次性任务(manual/date):创建新的 running 状态日志
  141. cls._create_job_log(
  142. job_id=job_id,
  143. job_name=job.name,
  144. trigger_type=trigger_type,
  145. status="running",
  146. )
  147. else:
  148. # 任务可能已经被移除(一次性任务执行完毕后自动移除)
  149. # 尝试从缓存获取 job_name 和原任务 ID
  150. cached_value = cls._job_name_cache.pop(job_id, None)
  151. # 处理新的缓存格式 (原任务ID, 任务名称) 或旧的格式 任务名称
  152. if isinstance(cached_value, tuple):
  153. original_job_id, job_name = cached_value
  154. else:
  155. original_job_id, job_name = job_id, cached_value
  156. log.info(f"任务 {job_id} 提交执行,但未找到任务信息(可能已被移除),尝试创建日志")
  157. result = cls._create_job_log(
  158. job_id=original_job_id,
  159. job_name=job_name,
  160. trigger_type="manual",
  161. status="running",
  162. )
  163. if result:
  164. log.info(f"任务 {original_job_id} 日志创建成功,id={result}")
  165. else:
  166. log.error(f"任务 {original_job_id} 日志创建失败")
  167. @classmethod
  168. def _handle_job_executed(cls, event: JobExecutionEvent) -> None:
  169. """
  170. 处理任务执行成功事件
  171. """
  172. job_id = str(event.job_id)
  173. retval = getattr(event, "retval", None)
  174. scheduled_run_time = getattr(event, "scheduled_run_time", None)
  175. log.info(f"任务 {job_id} 执行成功")
  176. if retval:
  177. log.debug(f"任务 {job_id} 返回值: {retval}")
  178. if scheduled_run_time:
  179. log.debug(f"任务 {job_id} 计划执行时间: {scheduled_run_time}")
  180. # 更新执行日志
  181. cls._update_latest_job_log(
  182. job_id=job_id,
  183. status="success",
  184. result=str(retval) if retval else None,
  185. )
  186. # 为周期性任务创建新的 pending 状态日志,等待下次执行
  187. job = cls.get_job(job_id=job_id)
  188. if job:
  189. trigger_type = cls._get_trigger_type(job_id)
  190. if trigger_type in ("cron", "interval") and job.next_run_time:
  191. cls._create_job_log(
  192. job_id=job_id,
  193. job_name=job.name,
  194. trigger_type=trigger_type,
  195. status="pending",
  196. )
  197. log.debug(f"任务 {job_id} 已创建新的 pending 状态日志,等待下次执行")
  198. @classmethod
  199. def _handle_job_error(cls, event: JobExecutionEvent) -> None:
  200. """
  201. 处理任务执行失败事件
  202. """
  203. job_id = str(event.job_id)
  204. exception = getattr(event, "exception", None)
  205. traceback = getattr(event, "traceback", None)
  206. scheduled_run_time = getattr(event, "scheduled_run_time", None)
  207. log.error(f"任务 {job_id} 执行失败: {exception!s}")
  208. if traceback:
  209. log.debug(f"任务 {job_id} 错误堆栈: {traceback}")
  210. if scheduled_run_time:
  211. log.debug(f"任务 {job_id} 计划执行时间: {scheduled_run_time}")
  212. # 更新执行日志
  213. cls._update_latest_job_log(
  214. job_id=job_id,
  215. status="failed",
  216. result="failed",
  217. error=str(exception) if exception else "未知错误",
  218. )
  219. # 为周期性任务创建新的 pending 状态日志,等待下次执行
  220. job = cls.get_job(job_id=job_id)
  221. if job:
  222. trigger_type = cls._get_trigger_type(job_id)
  223. if trigger_type in ("cron", "interval") and job.next_run_time:
  224. cls._create_job_log(
  225. job_id=job_id,
  226. job_name=job.name,
  227. trigger_type=trigger_type,
  228. status="pending",
  229. )
  230. log.debug(f"任务 {job_id} 已创建新的 pending 状态日志,等待下次执行")
  231. @classmethod
  232. def _handle_job_missed(cls, event: JobEvent) -> None:
  233. """
  234. 处理任务错过执行时间事件
  235. """
  236. job_id = str(event.job_id)
  237. job = cls.get_job(job_id=job_id)
  238. log.warning(f"任务 {job_id} 错过执行时间")
  239. if job:
  240. log.debug(f"任务 {job_id} ({job.name}) 错过执行")
  241. # 更新执行日志
  242. cls._update_latest_job_log(
  243. job_id=job_id,
  244. status="timeout",
  245. result="timeout",
  246. error="任务错过执行时间",
  247. )
  248. # 为周期性任务创建新的 pending 状态日志,等待下次执行
  249. if job:
  250. trigger_type = cls._get_trigger_type(job_id)
  251. if trigger_type in ("cron", "interval") and job.next_run_time:
  252. cls._create_job_log(
  253. job_id=job_id,
  254. job_name=job.name,
  255. trigger_type=trigger_type,
  256. status="pending",
  257. )
  258. log.debug(f"任务 {job_id} 已创建新的 pending 状态日志,等待下次执行")
  259. @classmethod
  260. def _handle_job_removed(cls, event: JobEvent) -> None:
  261. """
  262. 处理任务被移除事件
  263. 注意:APScheduler 对于一次性任务(DateTrigger)会先触发 JOB_REMOVED,
  264. 然后再触发 JOB_SUBMITTED 和 JOB_EXECUTED。因此:
  265. - 对于一次性任务,不应该在 JOB_REMOVED 时创建或更新日志
  266. - 只有周期性任务在移除时才需要更新日志状态
  267. """
  268. job_id = str(event.job_id)
  269. jobstore = getattr(event, "jobstore", "unknown")
  270. log.info(f"任务 {job_id} 从 {jobstore} 存储器中移除")
  271. # 检查是否是一次性任务(DateTrigger)
  272. # 如果任务已经不存在,说明可能是一次性任务执行后被自动移除
  273. # 这种情况下不需要更新日志,因为 JOB_EXECUTED 会处理
  274. job = cls.get_job(job_id=job_id)
  275. if job is None:
  276. # 任务已经被移除,可能是一次性任务
  277. # 不需要在这里创建日志,JOB_SUBMITTED 和 JOB_EXECUTED 会处理
  278. log.debug(f"任务 {job_id} 已从调度器中移除(可能是一次性任务),跳过日志更新")
  279. return
  280. # 任务还存在,说明是周期性任务被手动移除
  281. # 更新执行日志
  282. cls._update_job_log_on_removed(job_id=job_id)
  283. @classmethod
  284. def _handle_job_added(cls, event: JobEvent) -> None:
  285. """
  286. 处理任务添加事件
  287. """
  288. job_id = str(event.job_id)
  289. jobstore = event.jobstore
  290. job = cls.get_job(job_id=job_id)
  291. if job:
  292. log.info(f"任务 {job_id} ({job.name}) 已添加到 {jobstore} 存储器")
  293. # 为周期性任务(cron/interval)创建初始的 pending 状态日志
  294. trigger_type = cls._get_trigger_type(job_id)
  295. if trigger_type in ("cron", "interval"):
  296. cls._create_job_log(
  297. job_id=job_id,
  298. job_name=job.name,
  299. trigger_type=trigger_type,
  300. status="pending",
  301. )
  302. log.debug(f"任务 {job_id} 已创建初始 pending 状态日志")
  303. else:
  304. log.info(f"任务 {job_id} 已添加到 {jobstore} 存储器")
  305. @classmethod
  306. def _handle_job_modified(cls, event: JobEvent) -> None:
  307. """
  308. 处理任务修改事件
  309. """
  310. job_id = str(event.job_id)
  311. jobstore = event.jobstore
  312. job = cls.get_job(job_id=job_id)
  313. if job:
  314. log.info(f"任务 {job_id} ({job.name}) 已在 {jobstore} 存储器中修改")
  315. else:
  316. log.info(f"任务 {job_id} 已在 {jobstore} 存储器中修改")
  317. @classmethod
  318. def _handle_scheduler_started(cls, event: SchedulerEvent) -> None:
  319. """
  320. 处理调度器启动事件
  321. """
  322. log.info("调度器已启动")
  323. cls._update_scheduler_status("running")
  324. @classmethod
  325. def _handle_scheduler_shutdown(cls, event: SchedulerEvent) -> None:
  326. """
  327. 处理调度器关闭事件
  328. """
  329. log.info("调度器已关闭")
  330. cls._update_scheduler_status("stopped")
  331. @classmethod
  332. def _handle_scheduler_paused(cls, event: SchedulerEvent) -> None:
  333. """
  334. 处理调度器暂停事件
  335. """
  336. log.info("调度器已暂停")
  337. cls._update_scheduler_status("paused")
  338. @classmethod
  339. def _handle_scheduler_resumed(cls, event: SchedulerEvent) -> None:
  340. """
  341. 处理调度器恢复事件
  342. """
  343. log.info("调度器已恢复运行")
  344. cls._update_scheduler_status("running")
  345. @classmethod
  346. def _handle_executor_added(cls, event: SchedulerEvent) -> None:
  347. """
  348. 处理执行器添加事件
  349. """
  350. alias = event.alias
  351. if alias:
  352. log.info(f"执行器 {alias} 已添加到调度器")
  353. cls._update_executor_info(alias, "added")
  354. else:
  355. log.warning("执行器添加事件,但别名为空")
  356. @classmethod
  357. def _handle_executor_removed(cls, event: SchedulerEvent) -> None:
  358. """
  359. 处理执行器移除事件
  360. """
  361. alias = event.alias
  362. if alias:
  363. log.info(f"执行器 {alias} 已从调度器中移除")
  364. cls._update_executor_info(alias, "removed")
  365. else:
  366. log.warning("执行器移除事件,但别名为空")
  367. @classmethod
  368. def _handle_jobstore_added(cls, event: SchedulerEvent) -> None:
  369. """
  370. 处理 JobStore 添加事件
  371. """
  372. alias = event.alias
  373. if alias:
  374. log.info(f"JobStore {alias} 已添加到调度器")
  375. cls._update_jobstore_info(alias, "added")
  376. else:
  377. log.warning("JobStore 添加事件,但别名为空")
  378. @classmethod
  379. def _handle_jobstore_removed(cls, event: SchedulerEvent) -> None:
  380. """
  381. 处理 JobStore 移除事件
  382. """
  383. alias = event.alias
  384. if alias:
  385. log.info(f"JobStore {alias} 已从调度器中移除")
  386. cls._update_jobstore_info(alias, "removed")
  387. else:
  388. log.warning("JobStore 移除事件,但别名为空")
  389. @classmethod
  390. def _handle_all_jobs_removed(cls, event: SchedulerEvent) -> None:
  391. """
  392. 处理所有任务移除事件
  393. 注意:清空调度器任务不应该清空执行日志,而是将所有 pending 状态的日志更新为 cancelled
  394. """
  395. log.info("所有任务已从调度器中移除")
  396. cls._cancel_all_pending_job_logs()
  397. @classmethod
  398. def _handle_job_max_instances(cls, event: JobEvent) -> None:
  399. """
  400. 处理任务达到最大实例数事件
  401. """
  402. job_id = str(event.job_id)
  403. log.warning(f"任务 {job_id} 已达到最大实例数限制,无法启动新实例")
  404. @classmethod
  405. def _handle_other_event(cls, event: SchedulerEvent | JobEvent | JobExecutionEvent | JobSubmissionEvent) -> None:
  406. """
  407. 处理其他事件
  408. """
  409. event_code = event.code
  410. event_type = type(event).__name__
  411. log.debug(f"收到未处理的事件: {event_type} (code: {event_code})")
  412. @classmethod
  413. def _update_scheduler_status(cls, status: str) -> None:
  414. """
  415. 更新调度器状态到系统参数
  416. 参数:
  417. - status (str): 调度器状态 (running/stopped/paused)
  418. """
  419. try:
  420. from sqlalchemy.orm import Session
  421. from app.api.v1.module_system.params.model import ParamsModel
  422. with Session(engine) as session:
  423. param = session.query(ParamsModel).filter(ParamsModel.config_key == "scheduler_status").first()
  424. if param:
  425. param.config_value = status
  426. else:
  427. param = ParamsModel(
  428. config_name="调度器状态",
  429. config_key="scheduler_status",
  430. config_value=status,
  431. config_type=True,
  432. )
  433. session.add(param)
  434. session.commit()
  435. log.debug(f"调度器状态已更新: {status}")
  436. except Exception as e:
  437. log.error(f"更新调度器状态失败: {e!s}", exc_info=True)
  438. @classmethod
  439. def _update_executor_info(cls, alias: str | None, action: str) -> None:
  440. """
  441. 更新执行器信息到系统参数
  442. 参数:
  443. - alias (str | None): 执行器别名
  444. - action (str): 操作 (added/removed)
  445. """
  446. if not alias:
  447. log.warning("执行器别名为空,跳过更新")
  448. return
  449. try:
  450. from sqlalchemy.orm import Session
  451. from app.api.v1.module_system.params.model import ParamsModel
  452. key = f"executor_{alias}"
  453. with Session(engine) as session:
  454. param = session.query(ParamsModel).filter(ParamsModel.config_key == key).first()
  455. if action == "added":
  456. if param:
  457. param.config_value = "active"
  458. else:
  459. param = ParamsModel(
  460. config_name=f"执行器 {alias}",
  461. config_key=key,
  462. config_value="active",
  463. config_type=True,
  464. )
  465. session.add(param)
  466. log.debug(f"执行器 {alias} 已标记为活跃")
  467. elif action == "removed":
  468. if param:
  469. param.config_value = "inactive"
  470. log.debug(f"执行器 {alias} 已标记为非活跃")
  471. session.commit()
  472. except Exception as e:
  473. log.error(f"更新执行器信息失败: {e!s}", exc_info=True)
  474. @classmethod
  475. def _update_jobstore_info(cls, alias: str | None, action: str) -> None:
  476. """
  477. 更新 JobStore 信息到系统参数
  478. 参数:
  479. - alias (str | None): JobStore 别名
  480. - action (str): 操作 (added/removed)
  481. """
  482. if not alias:
  483. log.warning("JobStore 别名为空,跳过更新")
  484. return
  485. try:
  486. from sqlalchemy.orm import Session
  487. from app.api.v1.module_system.params.model import ParamsModel
  488. key = f"jobstore_{alias}"
  489. with Session(engine) as session:
  490. param = session.query(ParamsModel).filter(ParamsModel.config_key == key).first()
  491. if action == "added":
  492. if param:
  493. param.config_value = "active"
  494. else:
  495. param = ParamsModel(
  496. config_name=f"JobStore {alias}",
  497. config_key=key,
  498. config_value="active",
  499. config_type=True,
  500. )
  501. session.add(param)
  502. log.debug(f"JobStore {alias} 已标记为活跃")
  503. elif action == "removed":
  504. if param:
  505. param.config_value = "inactive"
  506. log.debug(f"JobStore {alias} 已标记为非活跃")
  507. session.commit()
  508. except Exception as e:
  509. log.error(f"更新 JobStore 信息失败: {e!s}", exc_info=True)
  510. @classmethod
  511. def _clear_all_job_logs(cls) -> None:
  512. """
  513. 清空所有任务日志(仅用于手动清空,不建议使用)
  514. """
  515. try:
  516. from sqlalchemy.orm import Session
  517. from app.plugin.module_task.cronjob.job.model import JobModel
  518. with Session(engine) as session:
  519. session.query(JobModel).delete()
  520. session.commit()
  521. log.info("所有任务日志已清空")
  522. except Exception as e:
  523. log.error(f"清空任务日志失败: {e!s}", exc_info=True)
  524. @classmethod
  525. def _cancel_all_pending_job_logs(cls) -> None:
  526. """
  527. 将所有 pending 状态的执行日志更新为 cancelled
  528. 用于清空调度器任务时,不删除日志而是更新状态
  529. """
  530. try:
  531. from sqlalchemy.orm import Session
  532. from app.plugin.module_task.cronjob.job.model import JobModel
  533. with Session(engine) as session:
  534. session.query(JobModel).filter(JobModel.status == "pending").update(
  535. {"status": "cancelled"}
  536. )
  537. session.commit()
  538. log.info("所有待执行任务日志已标记为已取消")
  539. except Exception as e:
  540. log.error(f"取消待执行任务日志失败: {e!s}", exc_info=True)
  541. @classmethod
  542. def _get_trigger_type(cls, job_id: str) -> str:
  543. """
  544. 获取任务的触发类型
  545. """
  546. job = cls.get_job(job_id=job_id)
  547. if not job:
  548. return "manual"
  549. trigger = job.trigger
  550. if isinstance(trigger, CronTrigger):
  551. return "cron"
  552. elif isinstance(trigger, IntervalTrigger):
  553. return "interval"
  554. elif isinstance(trigger, DateTrigger):
  555. if trigger.run_date:
  556. now = datetime.now(trigger.run_date.tzinfo)
  557. diff = abs((trigger.run_date - now).total_seconds())
  558. if diff < 60:
  559. return "manual"
  560. return "date"
  561. return "manual"
  562. @classmethod
  563. async def init_scheduler(cls, redis: Redis | None = None) -> None:
  564. """
  565. 应用启动时初始化定时任务调度器。
  566. 参数:
  567. - redis (Redis | None): 可选 Redis 实例,供任务侧使用。
  568. 返回:
  569. - None
  570. """
  571. if redis:
  572. cls.redis_instance = redis
  573. scheduler.start()
  574. scheduler.add_listener(cls.scheduler_event_listener, EVENT_ALL)
  575. scheduler.resume()
  576. @classmethod
  577. def _task_wrapper(cls, job_id: str | int, code_block: str | None, *args, **kwargs):
  578. """
  579. 任务执行包装器,执行自定义代码块(同步版本,用于 ThreadPoolExecutor)
  580. 支持完整的 Python 语法,包括 import 语句
  581. """
  582. import types
  583. def run_sync_handler():
  584. """
  585. 在独立模块命名空间中执行代码块并调用 handler。
  586. 返回:
  587. - Any: handler 返回值;无代码块时为 None。
  588. """
  589. if not code_block:
  590. return None
  591. # 创建一个新的模块作为执行环境
  592. module = types.ModuleType(f"node_task_{job_id}")
  593. module.__dict__["__builtins__"] = __builtins__
  594. # 在模块环境中执行代码
  595. exec(code_block, module.__dict__)
  596. # 获取 handler 函数
  597. handler = module.__dict__.get("handler")
  598. if handler and callable(handler):
  599. return handler(*args, **kwargs)
  600. raise ValueError("代码块必须定义 handler(*args, **kwargs) 函数")
  601. try:
  602. result = run_sync_handler()
  603. return result
  604. except Exception as e:
  605. log.error(f"任务 {job_id} 执行失败: {e!s}")
  606. raise
  607. @classmethod
  608. def _get_job_state(cls, job) -> str | None:
  609. """
  610. 获取任务状态(解析为可读的JSON格式)
  611. """
  612. import json
  613. import pickle
  614. if not job:
  615. return None
  616. state = job.__getstate__()
  617. def serialize_value(obj):
  618. """
  619. 将 job state 中的嵌套对象转为可 JSON 化的 Python 结构。
  620. 参数:
  621. - obj (Any): 任意嵌套对象。
  622. 返回:
  623. - Any: 标量、dict、list 或简化后的描述。
  624. """
  625. if obj is None:
  626. return None
  627. if isinstance(obj, (str, int, float, bool)):
  628. return obj
  629. if isinstance(obj, bytes):
  630. try:
  631. decoded = pickle.loads(obj)
  632. return serialize_value(decoded)
  633. except Exception:
  634. return obj.decode("utf-8", errors="replace")
  635. if isinstance(obj, dict):
  636. return {k: serialize_value(v) for k, v in obj.items()}
  637. if isinstance(obj, (list, tuple)):
  638. return [serialize_value(item) for item in obj]
  639. if hasattr(obj, "__dict__"):
  640. obj_dict = {}
  641. for k, v in obj.__dict__.items():
  642. if not k.startswith("_"):
  643. obj_dict[k] = serialize_value(v)
  644. return {"__class__": obj.__class__.__name__, **obj_dict}
  645. try:
  646. return str(obj)
  647. except Exception:
  648. return f"<{type(obj).__name__}>"
  649. parsed_state = serialize_value(state)
  650. return json.dumps(parsed_state, ensure_ascii=False, indent=2)
  651. @classmethod
  652. def get_job_state_from_blob(cls, blob_data: bytes) -> Any:
  653. """
  654. 从 BLOB 数据反序列化任务状态
  655. 参数:
  656. - blob_data: apscheduler_jobs 表中的 job_state 字段(BLOB 类型)
  657. 返回:
  658. - 反序列化后的任务状态
  659. """
  660. import pickle
  661. if not blob_data:
  662. return None
  663. def serialize_value(obj: Any) -> Any:
  664. """
  665. 递归反序列化 BLOB 中的嵌套结构为可 JSON 化数据。
  666. 参数:
  667. - obj (Any): 节点对象。
  668. 返回:
  669. - Any: 标量、dict、list 或字符串化结果。
  670. """
  671. if obj is None:
  672. return None
  673. if isinstance(obj, (str, int, float, bool)):
  674. return obj
  675. if isinstance(obj, bytes):
  676. try:
  677. decoded = pickle.loads(obj)
  678. return serialize_value(decoded)
  679. except Exception:
  680. return obj.decode("utf-8", errors="replace")
  681. if isinstance(obj, dict):
  682. return {k: serialize_value(v) for k, v in obj.items()}
  683. if isinstance(obj, (list, tuple)):
  684. return [serialize_value(item) for item in obj]
  685. if hasattr(obj, "__dict__"):
  686. obj_dict = {}
  687. for k, v in obj.__dict__.items():
  688. if not k.startswith("_"):
  689. obj_dict[k] = serialize_value(v)
  690. return {"__class__": obj.__class__.__name__, **obj_dict}
  691. try:
  692. return str(obj)
  693. except Exception:
  694. return f"<{type(obj).__name__}>"
  695. try:
  696. state = pickle.loads(blob_data)
  697. return serialize_value(state)
  698. except Exception as e:
  699. return {"error": str(e), "raw_data": str(blob_data[:200])}
  700. @classmethod
  701. def _create_job_log(cls, job_id: str, job_name: str | None = None, trigger_type: str = "manual", status: str = "running") -> int | None:
  702. """
  703. 创建执行日志
  704. """
  705. from sqlalchemy.orm import Session
  706. from app.plugin.module_task.cronjob.job.model import JobModel
  707. try:
  708. job = cls.get_job(job_id=job_id)
  709. next_run_time = str(job.next_run_time) if job and job.next_run_time else None
  710. job_state = cls._get_job_state(job) if job else None
  711. # 如果没有传入 job_name,尝试从 job 获取
  712. if not job_name and job:
  713. job_name = job.name
  714. with Session(engine) as session:
  715. job_log = JobModel(
  716. job_id=job_id,
  717. job_name=job_name,
  718. trigger_type=trigger_type,
  719. status=status,
  720. next_run_time=next_run_time,
  721. job_state=job_state,
  722. )
  723. session.add(job_log)
  724. session.commit()
  725. log.info(f"执行日志创建成功: job_id={job_id}, id={job_log.id}")
  726. return job_log.id
  727. except Exception as e:
  728. log.error(f"创建执行日志失败: job_id={job_id}, error={e}", exc_info=True)
  729. return None
  730. @classmethod
  731. def _update_job_log(cls, job_id: str, status: str, result: str | None = None, error: str | None = None) -> None:
  732. """
  733. 更新执行日志(更新该 job_id 最新的 pending 状态日志)
  734. 用于周期性任务在提交执行时将 pending 更新为 running
  735. """
  736. from sqlalchemy.orm import Session
  737. from app.plugin.module_task.cronjob.job.model import JobModel
  738. job = cls.get_job(job_id=job_id)
  739. next_run_time = str(job.next_run_time) if job and job.next_run_time else None
  740. job_state = cls._get_job_state(job) if job else None
  741. with Session(engine) as session:
  742. job_log = (
  743. session.query(JobModel)
  744. .filter(JobModel.job_id == job_id, JobModel.status == "pending")
  745. .order_by(JobModel.created_time.desc())
  746. .first()
  747. )
  748. if job_log:
  749. job_log.status = status
  750. if next_run_time:
  751. job_log.next_run_time = next_run_time
  752. if job_state:
  753. job_log.job_state = job_state
  754. if result:
  755. job_log.result = result
  756. if error:
  757. job_log.error = error
  758. session.commit()
  759. else:
  760. log.warning(f"未找到任务 {job_id} 的待执行日志记录")
  761. @classmethod
  762. def _update_latest_job_log(cls, job_id: str, status: str, result: str | None = None, error: str | None = None) -> None:
  763. """
  764. 更新最新的执行日志(更新该 job_id 最新的一条日志)
  765. 用于每次执行完成后更新状态
  766. """
  767. from sqlalchemy.orm import Session
  768. from app.plugin.module_task.cronjob.job.model import JobModel
  769. try:
  770. job = cls.get_job(job_id=job_id)
  771. next_run_time = str(job.next_run_time) if job and job.next_run_time else None
  772. job_state = cls._get_job_state(job) if job else None
  773. with Session(engine) as session:
  774. # 首先尝试更新 running 状态的日志
  775. job_log = (
  776. session.query(JobModel)
  777. .filter(JobModel.job_id == job_id, JobModel.status == "running")
  778. .order_by(JobModel.created_time.desc())
  779. .first()
  780. )
  781. if job_log:
  782. job_log.status = status
  783. if next_run_time:
  784. job_log.next_run_time = next_run_time
  785. if job_state:
  786. job_log.job_state = job_state
  787. if result:
  788. job_log.result = result
  789. if error:
  790. job_log.error = error
  791. session.commit()
  792. log.info(f"执行日志更新成功: job_id={job_id}, id={job_log.id}, status={status}")
  793. return
  794. # 没有找到 running 状态的日志,尝试更新 cancelled 状态的日志
  795. # 这种情况发生在 EVENT_JOB_REMOVED 先于 EVENT_JOB_SUBMITTED 触发时
  796. job_log = (
  797. session.query(JobModel)
  798. .filter(JobModel.job_id == job_id, JobModel.status == "cancelled")
  799. .order_by(JobModel.created_time.desc())
  800. .first()
  801. )
  802. if job_log:
  803. job_log.status = status
  804. if next_run_time:
  805. job_log.next_run_time = next_run_time
  806. if job_state:
  807. job_log.job_state = job_state
  808. if result:
  809. job_log.result = result
  810. if error:
  811. job_log.error = error
  812. session.commit()
  813. log.info(f"执行日志更新成功: job_id={job_id}, id={job_log.id}, status={status}")
  814. return
  815. # 创建新的日志记录
  816. log.debug(f"未找到任务 {job_id} 的日志记录,创建新日志")
  817. trigger_type = cls._get_trigger_type(job_id) if job else "manual"
  818. new_log = JobModel(
  819. job_id=job_id,
  820. job_name=job.name if job else None,
  821. trigger_type=trigger_type,
  822. status=status,
  823. next_run_time=next_run_time,
  824. job_state=job_state,
  825. result=result,
  826. error=error,
  827. )
  828. session.add(new_log)
  829. session.commit()
  830. log.info(f"执行日志创建成功: job_id={job_id}, id={new_log.id}, status={status}")
  831. except Exception as e:
  832. log.error(f"更新执行日志失败: job_id={job_id}, status={status}, error={e}", exc_info=True)
  833. @classmethod
  834. def _update_job_log_on_removed(cls, job_id: str) -> None:
  835. """
  836. 任务被移除时,更新最新的 pending 或 running 状态日志为 cancelled
  837. 事件触发顺序分析:
  838. 1. 一次性任务(manual/date):
  839. - EVENT_JOB_SUBMITTED -> 创建日志(status=running)
  840. - EVENT_JOB_EXECUTED/ERROR -> 更新日志(status=success/failed)
  841. - EVENT_JOB_REMOVED -> 日志已更新,不会被标记为 cancelled
  842. 2. 周期性任务(cron/interval):
  843. - EVENT_JOB_SUBMITTED -> 创建日志(status=running)
  844. - EVENT_JOB_EXECUTED/ERROR -> 更新日志(status=success/failed)
  845. - 下次执行 -> EVENT_JOB_SUBMITTED -> 创建新日志(status=running)
  846. - EVENT_JOB_REMOVED -> 将 pending/running 标记为 cancelled
  847. 3. 特殊情况:
  848. - 一次性任务在执行前被删除:running -> cancelled
  849. - 周期性任务在 pending 状态被删除:pending -> cancelled
  850. 注意:
  851. - 只有当任务还在 pending 或 running 状态时才更新为 cancelled
  852. - 如果任务已经执行完成(success/failed/timeout),则不需要更新
  853. - 如果任务已经标记为 cancelled,则不需要更新
  854. """
  855. from sqlalchemy.orm import Session
  856. from app.plugin.module_task.cronjob.job.model import JobModel
  857. with Session(engine) as session:
  858. job_log = (
  859. session.query(JobModel)
  860. .filter(
  861. JobModel.job_id == job_id,
  862. JobModel.status.in_(['pending', 'running'])
  863. )
  864. .order_by(JobModel.created_time.desc())
  865. .first()
  866. )
  867. if job_log:
  868. job_log.status = "cancelled"
  869. session.commit()
  870. log.info(f"任务 {job_id} 的执行日志已标记为已取消")
  871. @classmethod
  872. def get_job_status(cls, job_id: str | int) -> str:
  873. """
  874. 获取单个任务的当前状态文案。
  875. 参数:
  876. - job_id (str | int): 调度器任务 ID。
  877. 返回:
  878. - str: 运行中 / 暂停中 / 已停止 / 未知 等。
  879. """
  880. job = cls.get_job(job_id=str(job_id))
  881. if not job:
  882. return "未知"
  883. # 判断是否暂停:next_run_time 为 None 表示任务已暂停
  884. if job.next_run_time is None:
  885. return "暂停中"
  886. if scheduler.state == 0:
  887. return "已停止"
  888. return "运行中"
  889. @classmethod
  890. def add_and_run_job_now(cls, job_info: NodeModel) -> Job:
  891. """
  892. 立即执行任务(加入调度器并尽快触发一次)。
  893. 参数:
  894. - job_info (NodeModel): 节点/任务配置。
  895. 返回:
  896. - Job: APScheduler Job 对象。
  897. """
  898. # 使用稍微延迟的时间,确保事件监听器能够捕获事件
  899. from datetime import timedelta
  900. trigger = DateTrigger(run_date=datetime.now() + timedelta(seconds=0.1))
  901. job = cls._add_job_with_trigger(job_info, trigger)
  902. # 注意:不需要手动创建执行日志,EVENT_JOB_SUBMITTED 事件会自动创建
  903. return job
  904. @classmethod
  905. def add_cron_job(
  906. cls,
  907. job_info: NodeModel,
  908. trigger_args: str | None = None,
  909. start_date: str | None = None,
  910. end_date: str | None = None,
  911. ) -> Job:
  912. """
  913. 创建 Cron 定时任务。
  914. 参数:
  915. - job_info (NodeModel): 任务信息。
  916. - trigger_args (str | None): Cron 表达式,默认取节点配置。
  917. - start_date (str | None): 开始时间。
  918. - end_date (str | None): 结束时间。
  919. 返回:
  920. - Job: 已注册的 APScheduler Job。
  921. """
  922. cron_expr = trigger_args or job_info.trigger_args
  923. if not cron_expr:
  924. raise ValueError("Cron触发器缺少参数")
  925. fields = cron_expr.strip().split()
  926. if len(fields) not in (6, 7):
  927. raise ValueError("无效的 Cron 表达式")
  928. if not CronUtil.validate_cron_expression(cron_expr):
  929. raise ValueError(f"Cron表达式不正确: {cron_expr}")
  930. parsed_fields = [field if field != "?" else "*" for field in fields]
  931. if len(fields) == 6:
  932. parsed_fields.append("*")
  933. second, minute, hour, day, month, day_of_week, year = tuple(parsed_fields)
  934. if second == "*" and minute == "*" and hour == "*" and day == "*" and month == "*" and day_of_week in ("*", "?"):
  935. raise ValueError("Cron表达式不允许每秒执行,请至少指定秒数(如:0 * * * * ? * 表示每分钟执行)")
  936. trigger = CronTrigger(
  937. second=second,
  938. minute=minute,
  939. hour=hour,
  940. day=day,
  941. month=month,
  942. day_of_week=day_of_week,
  943. year=year,
  944. start_date=start_date or job_info.start_date,
  945. end_date=end_date or job_info.end_date,
  946. timezone="Asia/Shanghai",
  947. )
  948. return cls._add_job_with_trigger(job_info, trigger)
  949. @classmethod
  950. def add_interval_job(
  951. cls,
  952. job_info: NodeModel,
  953. trigger_args: str | None = None,
  954. start_date: str | None = None,
  955. end_date: str | None = None,
  956. ) -> Job:
  957. """
  958. 创建间隔执行任务。
  959. 参数:
  960. - job_info (NodeModel): 任务信息。
  961. - trigger_args (str | None): 间隔参数「秒 分 时 天 周」,默认取节点配置。
  962. - start_date (str | None): 开始时间。
  963. - end_date (str | None): 结束时间。
  964. 返回:
  965. - Job: 已注册的 APScheduler Job。
  966. """
  967. interval_args = trigger_args or job_info.trigger_args
  968. if not interval_args:
  969. raise ValueError("interval触发器缺少参数")
  970. fields = interval_args.strip().split()
  971. if len(fields) != 5:
  972. raise ValueError("无效的 interval 表达式,格式: 秒 分 时 天 周")
  973. second, minute, hour, day, week = tuple(
  974. int(field) if field != "*" else 0 for field in fields
  975. )
  976. trigger = IntervalTrigger(
  977. weeks=week,
  978. days=day,
  979. hours=hour,
  980. minutes=minute,
  981. seconds=second,
  982. start_date=start_date or job_info.start_date,
  983. end_date=end_date or job_info.end_date,
  984. timezone="Asia/Shanghai",
  985. )
  986. return cls._add_job_with_trigger(job_info, trigger)
  987. @classmethod
  988. def add_date_job(cls, job_info: NodeModel, run_date: str | None = None) -> Job:
  989. """
  990. 创建指定时刻执行一次的任务。
  991. 参数:
  992. - job_info (NodeModel): 任务信息。
  993. - run_date (str | None): 执行时间字符串,默认取节点 trigger 配置。
  994. 返回:
  995. - Job: 已注册的 APScheduler Job。
  996. """
  997. date_str = run_date or job_info.trigger_args
  998. if not date_str:
  999. raise ValueError("date触发器缺少执行时间参数")
  1000. trigger = DateTrigger(run_date=date_str, timezone="Asia/Shanghai")
  1001. return cls._add_job_with_trigger(job_info, trigger)
  1002. @classmethod
  1003. def _add_job_with_trigger(cls, job_info: NodeModel, trigger) -> Job:
  1004. """
  1005. 添加任务到调度器
  1006. """
  1007. code_block = job_info.func
  1008. if not code_block or not code_block.strip():
  1009. raise ValueError("任务代码块不能为空")
  1010. jobstore = job_info.jobstore or "sqlalchemy"
  1011. executor = job_info.executor or "threadpool"
  1012. job_args = []
  1013. if job_info.args:
  1014. args_str = str(job_info.args).strip()
  1015. if args_str:
  1016. job_args = [arg.strip() for arg in args_str.split(",") if arg.strip()]
  1017. job_kwargs = {}
  1018. if job_info.kwargs:
  1019. kwargs_str = str(job_info.kwargs).strip()
  1020. if kwargs_str:
  1021. try:
  1022. job_kwargs = json.loads(kwargs_str)
  1023. except json.JSONDecodeError:
  1024. raise ValueError(f"关键字参数JSON格式无效: {kwargs_str}")
  1025. # 缓存 job_name,用于 EVENT_JOB_SUBMITTED 时获取
  1026. cls._job_name_cache[str(job_info.id)] = job_info.name or ""
  1027. try:
  1028. job = scheduler.add_job(
  1029. func=cls._task_wrapper,
  1030. trigger=trigger,
  1031. args=[str(job_info.id), code_block, *job_args],
  1032. kwargs=job_kwargs,
  1033. id=str(job_info.id),
  1034. name=job_info.name,
  1035. coalesce=job_info.coalesce,
  1036. max_instances=1,
  1037. jobstore=jobstore,
  1038. executor=executor,
  1039. )
  1040. log.info(f"任务 {job_info.id} 添加到 {jobstore} 存储器成功")
  1041. return job
  1042. except ConflictingIdError:
  1043. scheduler.remove_job(job_id=str(job_info.id), jobstore=jobstore)
  1044. job = scheduler.add_job(
  1045. func=cls._task_wrapper,
  1046. trigger=trigger,
  1047. args=[str(job_info.id), code_block, *job_args],
  1048. kwargs=job_kwargs,
  1049. id=str(job_info.id),
  1050. name=job_info.name,
  1051. coalesce=job_info.coalesce,
  1052. max_instances=1,
  1053. jobstore=jobstore,
  1054. executor=executor,
  1055. )
  1056. log.info(f"任务 {job_info.id} 已存在,已移除旧任务并重新添加")
  1057. return job
  1058. @classmethod
  1059. def start(cls, paused: bool = False) -> None:
  1060. """
  1061. 启动全局调度器。
  1062. 参数:
  1063. - paused (bool): 是否以暂停状态启动。
  1064. 返回:
  1065. - None
  1066. """
  1067. scheduler.start(paused=paused)
  1068. @classmethod
  1069. async def shutdown(cls, wait: bool = False):
  1070. """
  1071. 关闭调度器。
  1072. 参数:
  1073. - wait (bool): 是否等待当前任务结束。
  1074. 返回:
  1075. - 与 APScheduler shutdown 返回值一致。
  1076. """
  1077. return scheduler.shutdown(wait=wait)
  1078. @classmethod
  1079. def configure(cls, gconfig: dict | None = None, prefix: str = "apscheduler.", **options) -> None:
  1080. """
  1081. 透传配置底层 APScheduler。
  1082. 参数:
  1083. - gconfig (dict | None): 全局配置字典。
  1084. - prefix (str): 配置键前缀。
  1085. - **options: 其它 configure 关键字参数。
  1086. 返回:
  1087. - None
  1088. """
  1089. scheduler.configure(gconfig or {}, prefix, **options)
  1090. @classmethod
  1091. def pause(cls) -> None:
  1092. """
  1093. 暂停调度器。
  1094. 返回:
  1095. - None
  1096. """
  1097. scheduler.pause()
  1098. @classmethod
  1099. def resume(cls) -> None:
  1100. """
  1101. 恢复调度器。
  1102. 返回:
  1103. - None
  1104. """
  1105. scheduler.resume()
  1106. @classmethod
  1107. def is_running(cls) -> bool:
  1108. """
  1109. 调度器是否处于运行态。
  1110. 返回:
  1111. - bool: 是否 running。
  1112. """
  1113. return scheduler.running
  1114. @classmethod
  1115. def get_scheduler_state(cls) -> str:
  1116. """
  1117. 将调度器内部 state 码映射为中文状态。
  1118. 返回:
  1119. - str: 停止 / 运行中 / 暂停 / 未知。
  1120. """
  1121. if scheduler.state == 0:
  1122. return "停止"
  1123. if scheduler.state == 1:
  1124. return "运行中"
  1125. if scheduler.state == 2:
  1126. return "暂停"
  1127. return "未知"
  1128. @classmethod
  1129. def get_job(cls, job_id: str | int, jobstore: str | None = None) -> Job | None:
  1130. """
  1131. 按 ID 获取单个任务。
  1132. 参数:
  1133. - job_id (str | int): 任务 ID。
  1134. - jobstore (str | None): 存储器别名。
  1135. 返回:
  1136. - Job | None: 任务对象或不存在。
  1137. """
  1138. return scheduler.get_job(str(job_id), jobstore)
  1139. @classmethod
  1140. def get_jobs(cls, jobstore: str | None = None) -> list[Job]:
  1141. """
  1142. 列出指定存储器中的任务。
  1143. 参数:
  1144. - jobstore (str | None): 存储器别名,None 表示默认存储。
  1145. 返回:
  1146. - list[Job]: 任务列表。
  1147. """
  1148. return scheduler.get_jobs(jobstore)
  1149. @classmethod
  1150. def get_all_jobs(cls) -> list[Job]:
  1151. """
  1152. 列出所有存储器中的任务。
  1153. 返回:
  1154. - list[Job]: 任务列表。
  1155. """
  1156. return scheduler.get_jobs()
  1157. @classmethod
  1158. def remove_job(cls, job_id: str | int, jobstore: str | None = None) -> None:
  1159. """
  1160. 从调度器移除指定任务。
  1161. 参数:
  1162. - job_id (str | int): 任务 ID。
  1163. - jobstore (str | None): 存储器别名。
  1164. 返回:
  1165. - None
  1166. """
  1167. scheduler.remove_job(str(job_id), jobstore)
  1168. @classmethod
  1169. def clear_jobs(cls) -> None:
  1170. """
  1171. 移除所有存储器中的全部任务。
  1172. 返回:
  1173. - None
  1174. """
  1175. scheduler.remove_all_jobs()
  1176. @classmethod
  1177. def print_jobs(cls, jobstore: str | None = None) -> str:
  1178. """
  1179. 打印调度器任务信息
  1180. 参数:
  1181. - jobstore: 存储器别名,None 表示所有存储器
  1182. 返回:
  1183. - str: 格式化的任务信息
  1184. """
  1185. import io
  1186. output = io.StringIO()
  1187. scheduler.print_jobs(jobstore=jobstore, out=output)
  1188. return output.getvalue()
  1189. @classmethod
  1190. def sync_jobs_to_db(cls) -> int:
  1191. """
  1192. 将调度器中的任务同步到数据库
  1193. 返回:
  1194. - int: 同步的任务数量
  1195. """
  1196. from sqlalchemy.orm import Session
  1197. from app.plugin.module_task.cronjob.job.model import JobModel
  1198. jobs = cls.get_all_jobs()
  1199. sync_count = 0
  1200. with Session(engine) as session:
  1201. for job in jobs:
  1202. existing_log = (
  1203. session.query(JobModel)
  1204. .filter(JobModel.job_id == str(job.id), JobModel.status == "pending")
  1205. .first()
  1206. )
  1207. if not existing_log:
  1208. job_log = JobModel(
  1209. job_id=str(job.id),
  1210. job_name=job.name,
  1211. trigger_type=cls._get_trigger_type(str(job.id)),
  1212. status="pending",
  1213. next_run_time=str(job.next_run_time) if job.next_run_time else None,
  1214. job_state=cls._get_job_state(job),
  1215. )
  1216. session.add(job_log)
  1217. sync_count += 1
  1218. session.commit()
  1219. return sync_count
  1220. @classmethod
  1221. def pause_job(cls, job_id: str | int, jobstore: str | None = None) -> Job | None:
  1222. """
  1223. 暂停单个任务。
  1224. 参数:
  1225. - job_id (str | int): 任务 ID。
  1226. - jobstore (str | None): 存储器别名。
  1227. 返回:
  1228. - Job | None: 暂停后的 Job 或 None。
  1229. """
  1230. return scheduler.pause_job(str(job_id), jobstore)
  1231. @classmethod
  1232. def resume_job(cls, job_id: str | int, jobstore: str | None = None) -> Job | None:
  1233. """
  1234. 恢复单个任务。
  1235. 参数:
  1236. - job_id (str | int): 任务 ID。
  1237. - jobstore (str | None): 存储器别名。
  1238. 返回:
  1239. - Job | None: 恢复后的 Job 或 None。
  1240. """
  1241. return scheduler.resume_job(str(job_id), jobstore)
  1242. @classmethod
  1243. def modify_job(cls, job_id: str | int, jobstore: str | None = None, **changes) -> Job | None:
  1244. """
  1245. 修改已存在任务的属性。
  1246. 参数:
  1247. - job_id (str | int): 任务 ID。
  1248. - jobstore (str | None): 存储器别名。
  1249. - **changes: 传给 modify_job 的变更字段。
  1250. 返回:
  1251. - Job | None: 修改后的 Job 或 None。
  1252. """
  1253. return scheduler.modify_job(str(job_id), jobstore, **changes)
  1254. @classmethod
  1255. def run_job_now(cls, job_id: str | int, jobstore: str | None = None) -> Job | None:
  1256. """
  1257. 立即执行任务(通过临时 Job,不修改原任务 trigger)。
  1258. 参数:
  1259. - job_id (str | int): 原任务 ID。
  1260. - jobstore (str | None): 存储器别名。
  1261. 返回:
  1262. - Job | None: 临时任务对象;原任务不存在时为 None。
  1263. 注意:
  1264. - 不改变原任务的触发器配置,仅追加一次性执行。
  1265. """
  1266. job = cls.get_job(job_id=job_id, jobstore=jobstore)
  1267. if not job:
  1268. return None
  1269. # 创建一个新的临时任务 ID
  1270. temp_job_id = f"{job_id}_run_now_{datetime.now().timestamp()}"
  1271. # 缓存 job_name 和原任务 ID,用于 EVENT_JOB_SUBMITTED 时获取
  1272. # 格式: (原任务ID, 任务名称)
  1273. cls._job_name_cache[temp_job_id] = (str(job_id), f"{job.name}(立即执行)")
  1274. # 创建临时任务,延迟 0.1 秒执行,确保事件监听器能够捕获事件
  1275. from datetime import timedelta
  1276. trigger = DateTrigger(run_date=datetime.now() + timedelta(seconds=0.1), timezone="Asia/Shanghai")
  1277. temp_job = scheduler.add_job(
  1278. func=job.func,
  1279. trigger=trigger,
  1280. args=job.args,
  1281. kwargs=job.kwargs,
  1282. id=temp_job_id,
  1283. name=f"{job.name}(立即执行)",
  1284. jobstore=jobstore or "default",
  1285. executor=job.executor,
  1286. max_instances=1,
  1287. )
  1288. log.info(f"任务 {job_id} 已触发立即执行,临时任务 ID: {temp_job_id}")
  1289. return temp_job