service.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. import platform
  2. import socket
  3. import time
  4. from pathlib import Path
  5. import psutil
  6. from app.utils.common_util import bytes2human
  7. from .schema import (
  8. CpuInfoSchema,
  9. DiskInfoSchema,
  10. MemoryInfoSchema,
  11. PyInfoSchema,
  12. ServerMonitorSchema,
  13. SysInfoSchema,
  14. )
  15. class ServerService:
  16. """服务监控模块服务层"""
  17. @classmethod
  18. async def get_server_monitor_info_service(cls) -> dict:
  19. """
  20. 获取服务器监控信息
  21. 返回:
  22. - Dict: 包含服务器监控信息的字典。
  23. """
  24. return ServerMonitorSchema(
  25. cpu=cls._get_cpu_info(),
  26. mem=cls._get_memory_info(),
  27. sys=cls._get_system_info(),
  28. py=cls._get_python_info(),
  29. disks=cls._get_disk_info(),
  30. ).model_dump()
  31. @classmethod
  32. def _get_cpu_info(cls) -> CpuInfoSchema:
  33. """
  34. 获取CPU信息
  35. 返回:
  36. - CpuInfoSchema: CPU信息模型。
  37. """
  38. cpu_times = psutil.cpu_times_percent()
  39. cpu_num = psutil.cpu_count(logical=True)
  40. if not cpu_num:
  41. cpu_num = 1
  42. return CpuInfoSchema(
  43. cpu_num=cpu_num,
  44. used=cpu_times.user,
  45. sys=cpu_times.system,
  46. free=cpu_times.idle,
  47. )
  48. @classmethod
  49. def _get_memory_info(cls) -> MemoryInfoSchema:
  50. """
  51. 获取内存信息
  52. 返回:
  53. - MemoryInfoSchema: 内存信息模型。
  54. """
  55. memory = psutil.virtual_memory()
  56. return MemoryInfoSchema(
  57. total=bytes2human(memory.total),
  58. used=bytes2human(memory.used),
  59. free=bytes2human(memory.free),
  60. usage=memory.percent,
  61. )
  62. @classmethod
  63. def _get_system_info(cls) -> SysInfoSchema:
  64. """
  65. 获取系统信息
  66. 返回:
  67. - SysInfoSchema: 系统信息模型。
  68. """
  69. hostname = socket.gethostname()
  70. return SysInfoSchema(
  71. computer_ip=socket.gethostbyname(hostname),
  72. computer_name=platform.node(),
  73. os_arch=platform.machine(),
  74. os_name=platform.platform(),
  75. user_dir=str(Path.cwd()),
  76. )
  77. @classmethod
  78. def _get_python_info(cls) -> PyInfoSchema:
  79. """
  80. 获取Python解释器信息
  81. 返回:
  82. - PyInfoSchema: Python解释器信息模型。
  83. """
  84. current_process = psutil.Process()
  85. memory = psutil.virtual_memory()
  86. process_memory = current_process.memory_info()
  87. start_time = current_process.create_time()
  88. run_time = ServerService._calculate_run_time(start_time)
  89. return PyInfoSchema(
  90. name=current_process.name(),
  91. version=platform.python_version(),
  92. start_time=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time)),
  93. run_time=run_time,
  94. home=str(Path(current_process.exe())),
  95. memory_total=bytes2human(memory.available),
  96. memory_used=bytes2human(process_memory.rss),
  97. memory_free=bytes2human(memory.available - process_memory.rss),
  98. memory_usage=round((process_memory.rss / memory.available) * 100, 2),
  99. )
  100. @classmethod
  101. def _get_disk_info(cls) -> list[DiskInfoSchema]:
  102. """
  103. 获取磁盘信息
  104. 返回:
  105. - list[DiskInfoSchema]: 磁盘信息模型列表。
  106. """
  107. disk_info = []
  108. for partition in psutil.disk_partitions():
  109. try:
  110. # 使用mountpoint而不是device来获取磁盘使用情况
  111. usage = psutil.disk_usage(partition.mountpoint)
  112. mount_point = str(Path(partition.mountpoint))
  113. disk_info.append(
  114. DiskInfoSchema(
  115. dir_name=mount_point, # 使用mountpoint替代device
  116. sys_type_name=partition.fstype,
  117. type_name=f"本地固定磁盘({mount_point})",
  118. total=bytes2human(usage.total),
  119. used=bytes2human(usage.used),
  120. free=bytes2human(usage.free),
  121. usage=usage.percent, # 直接使用数字而不是字符串
  122. )
  123. )
  124. except (PermissionError, FileNotFoundError):
  125. # 明确指定可能的异常
  126. continue
  127. return disk_info
  128. @classmethod
  129. def _calculate_run_time(cls, start_time: float) -> str:
  130. """
  131. 计算运行时间
  132. 参数:
  133. - start_time (float): 进程启动时间(时间戳)
  134. 返回:
  135. - str: 格式化后的运行时间字符串(例如:"1天2小时3分钟")
  136. """
  137. difference = time.time() - start_time
  138. days = int(difference // (24 * 60 * 60))
  139. hours = int((difference % (24 * 60 * 60)) // (60 * 60))
  140. minutes = int((difference % (60 * 60)) // 60)
  141. return f"{days}天{hours}小时{minutes}分钟"