service.py 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043
  1. import os
  2. import re
  3. import shutil
  4. import urllib.parse
  5. from datetime import datetime
  6. from pathlib import Path
  7. from typing import Any
  8. from urllib.parse import urlparse
  9. from fastapi import UploadFile
  10. from app.config.setting import settings
  11. from app.core.exceptions import CustomException
  12. from app.core.logger import log
  13. from app.utils.excel_util import ExcelUtil
  14. from app.utils.upload_util import DANGEROUS_EXTENSIONS, MIME_TYPE_MAPPING
  15. from .schema import (
  16. ResourceCopySchema,
  17. ResourceCreateDirSchema,
  18. ResourceDirectorySchema,
  19. ResourceItemSchema,
  20. ResourceMoveSchema,
  21. ResourceRenameSchema,
  22. ResourceSearchQueryParam,
  23. ResourceUploadSchema,
  24. )
  25. class ResourceService:
  26. """
  27. 资源管理模块服务层 - 管理系统静态文件目录(仅管理 upload 目录)
  28. """
  29. # 配置常量
  30. MAX_UPLOAD_SIZE = 100 * 1024 * 1024 # 100MB
  31. MAX_SEARCH_RESULTS = 1000 # 最大搜索结果数
  32. MAX_PATH_DEPTH = 20 # 最大路径深度
  33. @classmethod
  34. def _get_resource_root(cls) -> str:
  35. """
  36. 获取资源管理根目录(仅允许访问 upload 目录)
  37. 返回:
  38. - str: 资源管理根目录路径(upload 目录)。
  39. """
  40. if not settings.STATIC_ENABLE:
  41. raise CustomException(msg="静态文件服务未启用")
  42. # 限制只能管理 upload 目录
  43. upload_root = os.path.join(str(settings.STATIC_ROOT), "upload")
  44. # 确保 upload 目录存在
  45. os.makedirs(upload_root, exist_ok=True)
  46. return upload_root
  47. @classmethod
  48. def _get_safe_path(cls, path: str | None = None) -> str:
  49. """
  50. 获取安全的文件路径(加强版路径遍历防护)
  51. 参数:
  52. - path (str | None): 原始文件路径。
  53. 返回:
  54. - str: 安全的文件路径。
  55. """
  56. resource_root = cls._get_resource_root()
  57. if not path or not isinstance(path, str):
  58. return resource_root
  59. # 支持前端传递的完整URL或以STATIC_URL/ROOT_PATH+STATIC_URL开头的URL路径,转换为相对资源路径
  60. static_prefix = settings.STATIC_URL.rstrip("/")
  61. root_prefix = (
  62. settings.ROOT_PATH.rstrip("/") if getattr(settings, "ROOT_PATH", "") else ""
  63. )
  64. root_static_prefix = f"{root_prefix}{static_prefix}" if root_prefix else static_prefix
  65. def strip_prefix(p: str) -> str:
  66. """
  67. 去掉静态资源 URL 前缀,得到相对 upload 的路径片段。
  68. 参数:
  69. - p (str): 原始路径或 URL 路径段。
  70. 返回:
  71. - str: 去掉已知前缀后的路径。
  72. """
  73. if p.startswith(root_static_prefix):
  74. return p[len(root_static_prefix) :].lstrip("/")
  75. if p.startswith(static_prefix):
  76. return p[len(static_prefix) :].lstrip("/")
  77. return p
  78. if path.startswith(("http://", "https://")):
  79. parsed = urlparse(path)
  80. url_path = parsed.path or ""
  81. path = strip_prefix(url_path)
  82. else:
  83. path = strip_prefix(path)
  84. # 清理路径,规范化斜杠
  85. path = path.strip().replace("//", "/").replace("\\\\\\\\", "/").replace("\\\\", "/")
  86. # 移除开头的 /,将路径视为相对于 resource_root
  87. if path.startswith("/"):
  88. path = path[1:]
  89. # 如果路径以 upload/ 开头,去掉 upload/ 前缀
  90. # 因为 _get_resource_root() 已经指向了 upload 目录
  91. if path.startswith("upload/"):
  92. path = path[7:] # len("upload/") = 7
  93. # 检查路径遍历攻击
  94. if ".." in path or "\x00" in path:
  95. log.error(f"检测到路径遍历攻击尝试: {path}")
  96. raise CustomException(msg="非法的路径格式")
  97. # URL 解码检查
  98. decoded_path = urllib.parse.unquote(path)
  99. if ".." in decoded_path:
  100. log.error(f"检测到编码后的路径遍历攻击: {path}")
  101. raise CustomException(msg="非法的路径格式")
  102. # 构建完整路径
  103. safe_path = os.path.normpath(os.path.join(resource_root, path))
  104. # 获取绝对路径并规范化
  105. resource_root_abs = os.path.normpath(os.path.abspath(resource_root))
  106. safe_path_abs = os.path.normpath(os.path.abspath(safe_path))
  107. # 核心安全检查:确保最终路径在允许的根目录下
  108. if not safe_path_abs.startswith(resource_root_abs + os.sep) and safe_path_abs != resource_root_abs:
  109. log.error(f"路径遍历攻击被阻止: 尝试访问 {safe_path_abs}, 但根目录是 {resource_root_abs}")
  110. raise CustomException(msg="访问路径不在允许范围内")
  111. # 检查路径深度
  112. try:
  113. relative_path = os.path.relpath(safe_path_abs, resource_root_abs)
  114. if relative_path.count(os.sep) > cls.MAX_PATH_DEPTH:
  115. raise CustomException(msg="路径深度超过限制")
  116. except ValueError:
  117. raise CustomException(msg="无效的路径")
  118. return safe_path_abs
  119. @classmethod
  120. def _path_exists(cls, path: str) -> bool:
  121. """
  122. 检查路径是否存在
  123. 参数:
  124. - path (str): 要检查的路径。
  125. 返回:
  126. - bool: 如果路径存在则返回True,否则返回False。
  127. """
  128. try:
  129. safe_path = cls._get_safe_path(path)
  130. return os.path.exists(safe_path)
  131. except Exception as e:
  132. raise CustomException(msg=f"检查路径是否存在失败: {e!s}")
  133. @staticmethod
  134. def _sanitize_filename(filename: str) -> str:
  135. """
  136. 清理文件名,移除危险字符和路径穿越(加强版)。
  137. 参数:
  138. - filename (str): 原始文件名。
  139. 返回:
  140. - str: 安全的文件名。
  141. """
  142. if not filename:
  143. return f"file_{datetime.now().strftime('%Y%m%d%H%M%S')}"
  144. # 首先检查原始文件名是否包含路径遍历特征
  145. # 攻击者可能使用 ..\..\etc\passwd 或 ../../etc/passwd
  146. dangerous_patterns = [
  147. r"\.\.", # .. 路径遍历
  148. r"[\/]", # 任何斜杠(目录分隔符)
  149. r"\x00", # 空字节
  150. r"%2e%2e", # URL 编码的 ..
  151. r"%252e%252e", # 双重 URL 编码的 ..
  152. ]
  153. for pattern in dangerous_patterns:
  154. if re.search(pattern, filename, re.IGNORECASE):
  155. log.error(f"检测到文件名路径遍历攻击: {filename}")
  156. # 返回安全文件名,不包含原始文件名
  157. return f"file_{datetime.now().strftime('%Y%m%d%H%M%S')}"
  158. # URL 解码检查
  159. decoded = urllib.parse.unquote(filename)
  160. decoded_twice = urllib.parse.unquote(decoded)
  161. for check in [decoded, decoded_twice]:
  162. if ".." in check or "/" in check or "\\" in check:
  163. log.error(f"检测到编码后的文件名攻击: {filename}")
  164. return f"file_{datetime.now().strftime('%Y%m%d%H%M%S')}"
  165. # 使用 os.path.basename 提取纯文件名(移除路径)
  166. filename = os.path.basename(filename)
  167. # 移除危险字符
  168. filename = re.sub(r'[<>:"|?*\x00-\x1f]', "", filename)
  169. # 防止多个连续点号(可能被用于绕过扩展名检查)
  170. filename = re.sub(r"\.{2,}", ".", filename)
  171. # 移除首尾的空格和点号
  172. filename = filename.strip(". ")
  173. # 如果文件名为空,生成默认文件名
  174. if not filename:
  175. filename = f"file_{datetime.now().strftime('%Y%m%d%H%M%S')}"
  176. return filename
  177. @staticmethod
  178. def _detect_file_type(content: bytes) -> str | None:
  179. """
  180. 通过文件内容检测真实文件类型。
  181. 参数:
  182. - content (bytes): 文件内容(前几字节即可)。
  183. 返回:
  184. - str | None: 检测到的 MIME 类型,无法识别返回 None。
  185. """
  186. if content.startswith(b"\xff\xd8\xff"):
  187. return "image/jpeg"
  188. if content.startswith(b"\x89PNG\r\n\x1a\n"):
  189. return "image/png"
  190. if content.startswith(b"GIF87a") or content.startswith(b"GIF89a"):
  191. return "image/gif"
  192. if content.startswith(b"PK\x03\x04"):
  193. if b"[Content_Types].xml" in content[:1000]:
  194. return "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
  195. return "application/zip"
  196. if content.startswith(b"%PDF"):
  197. return "application/pdf"
  198. if content.startswith(b"\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1"):
  199. return "application/msword"
  200. return None
  201. @classmethod
  202. def _generate_http_url(cls, file_path: str, base_url: str | None = None) -> str:
  203. """
  204. 生成文件的HTTP URL
  205. 参数:
  206. - file_path (str): 文件的绝对路径。
  207. - base_url (str | None): 基础URL,用于生成完整URL。
  208. 返回:
  209. - str: 文件的HTTP URL。
  210. """
  211. # 使用 STATIC_ROOT 作为基准,而不是 _get_resource_root()
  212. # 这样可以保留 upload 目录在 URL 中
  213. static_root = str(settings.STATIC_ROOT)
  214. try:
  215. relative_path = os.path.relpath(file_path, static_root)
  216. # 确保路径使用正斜杠(URL格式)
  217. url_path = relative_path.replace(os.sep, "/")
  218. except ValueError:
  219. # 如果无法计算相对路径,使用文件名
  220. url_path = os.path.basename(file_path)
  221. # 如果提供了base_url,使用它生成完整URL,否则使用settings.STATIC_URL
  222. if base_url:
  223. # 使用完整的 base_url(包含 API 路径前缀)
  224. base_part = base_url.rstrip("/")
  225. static_part = settings.STATIC_URL.lstrip("/")
  226. file_part = url_path.lstrip("/")
  227. http_url = f"{base_part}/{static_part}/{file_part}".replace("//", "/").replace(":/", "://")
  228. else:
  229. http_url = f"{settings.STATIC_URL}/{url_path}".replace("//", "/")
  230. return http_url
  231. @classmethod
  232. def _get_file_info(cls, file_path: str, base_url: str | None = None) -> dict:
  233. """
  234. 获取文件或目录的详细信息,如名称、大小、创建时间、修改时间、路径、深度、HTTP URL、是否隐藏、是否为目录等。
  235. 参数:
  236. - file_path (str): 文件或目录的路径(必须是绝对路径)。
  237. - base_url (str | None): 基础URL,用于生成完整URL。
  238. 返回:
  239. - dict: 文件或目录的详细信息字典。
  240. """
  241. try:
  242. # 直接使用传入的路径(已经是绝对路径)
  243. safe_path = file_path
  244. if not os.path.exists(safe_path):
  245. return {}
  246. stat = os.stat(safe_path)
  247. path_obj = Path(safe_path)
  248. resource_root = cls._get_resource_root()
  249. # 计算相对路径(相对于资源根目录)
  250. try:
  251. relative_path = os.path.relpath(safe_path, resource_root)
  252. except ValueError:
  253. relative_path = os.path.basename(safe_path)
  254. # 生成HTTP URL路径
  255. http_url = cls._generate_http_url(safe_path, base_url)
  256. # 检查是否为隐藏文件(文件名以点开头)
  257. is_hidden = path_obj.name.startswith(".")
  258. # 对于目录,设置is_directory字段(兼容前端)
  259. is_directory = os.path.isdir(safe_path)
  260. # 将datetime对象转换为ISO格式的字符串,确保JSON序列化成功
  261. created_time = datetime.fromtimestamp(stat.st_ctime).isoformat()
  262. modified_time = datetime.fromtimestamp(stat.st_mtime).isoformat()
  263. return {
  264. "name": path_obj.name,
  265. "file_url": http_url, # 统一使用file_url字段
  266. "relative_path": relative_path,
  267. "is_file": os.path.isfile(safe_path),
  268. "is_dir": is_directory,
  269. "size": stat.st_size if os.path.isfile(safe_path) else None,
  270. "created_time": created_time,
  271. "modified_time": modified_time,
  272. "is_hidden": is_hidden,
  273. }
  274. except Exception as e:
  275. log.error(f"获取文件信息失败: {e!s}")
  276. return {}
  277. @classmethod
  278. async def get_directory_list_service(
  279. cls,
  280. path: str | None = None,
  281. include_hidden: bool = False,
  282. base_url: str | None = None,
  283. ) -> dict:
  284. """
  285. 获取目录列表
  286. 参数:
  287. - path (str | None): 目录路径。如果未指定,将使用静态文件根目录。
  288. - include_hidden (bool): 是否包含隐藏文件。
  289. - base_url (str | None): 基础URL,用于生成完整URL。
  290. 返回:
  291. - dict: 包含目录列表和统计信息的字典。
  292. """
  293. try:
  294. # 如果没有指定路径,使用静态文件根目录
  295. if path is None:
  296. safe_path = cls._get_resource_root()
  297. display_path = cls._generate_http_url(safe_path, base_url)
  298. else:
  299. safe_path = cls._get_safe_path(path)
  300. display_path = cls._generate_http_url(safe_path, base_url)
  301. if not os.path.exists(safe_path):
  302. raise CustomException(msg="目录不存在")
  303. if not os.path.isdir(safe_path):
  304. raise CustomException(msg="路径不是目录")
  305. items = []
  306. total_files = 0
  307. total_dirs = 0
  308. total_size = 0
  309. try:
  310. for item_name in os.listdir(safe_path):
  311. # 跳过隐藏文件
  312. if not include_hidden and item_name.startswith("."):
  313. continue
  314. item_path = os.path.join(safe_path, item_name)
  315. file_info = cls._get_file_info(item_path, base_url)
  316. if file_info:
  317. items.append(ResourceItemSchema(**file_info))
  318. if file_info["is_file"]:
  319. total_files += 1
  320. total_size += file_info.get("size", 0) or 0
  321. elif file_info["is_dir"]:
  322. total_dirs += 1
  323. except PermissionError:
  324. raise CustomException(msg="没有权限访问此目录")
  325. return ResourceDirectorySchema(
  326. path=display_path, # 返回HTTP URL路径而不是文件系统路径
  327. name=os.path.basename(safe_path),
  328. items=items,
  329. total_files=total_files,
  330. total_dirs=total_dirs,
  331. total_size=total_size,
  332. ).model_dump()
  333. except CustomException:
  334. raise
  335. except Exception as e:
  336. log.error(f"获取目录列表失败: {e!s}")
  337. raise CustomException(msg=f"获取目录列表失败: {e!s}")
  338. @classmethod
  339. async def get_resources_list_service(
  340. cls,
  341. search: ResourceSearchQueryParam | None = None,
  342. order_by: str | None = None,
  343. base_url: str | None = None,
  344. ) -> list[dict]:
  345. """
  346. 搜索资源列表(用于分页和导出)
  347. 参数:
  348. - search (ResourceSearchQueryParam | None): 查询参数模型。
  349. - order_by (str | None): 排序参数。
  350. - base_url (str | None): 基础URL,用于生成完整URL。
  351. 返回:
  352. - list[dict]: 资源详情字典列表。
  353. """
  354. try:
  355. # 确定搜索路径
  356. if search and hasattr(search, "path") and search.path and isinstance(search.path, str):
  357. resource_root = cls._get_safe_path(search.path)
  358. else:
  359. resource_root = cls._get_resource_root()
  360. # 检查路径是否存在
  361. if not os.path.exists(resource_root):
  362. raise CustomException(msg="目录不存在")
  363. if not os.path.isdir(resource_root):
  364. raise CustomException(msg="路径不是目录")
  365. # 收集资源
  366. all_resources = []
  367. try:
  368. for item_name in os.listdir(resource_root):
  369. # 跳过隐藏文件
  370. if item_name.startswith("."):
  371. continue
  372. item_path = os.path.join(resource_root, item_name)
  373. file_info = cls._get_file_info(item_path, base_url)
  374. if file_info:
  375. # 应用名称过滤
  376. if search and hasattr(search, "name") and search.name and search.name[1]:
  377. search_keyword = search.name[1].lower()
  378. if search_keyword not in file_info.get("name", "").lower():
  379. continue
  380. all_resources.append(file_info)
  381. except PermissionError:
  382. raise CustomException(msg="没有权限访问此目录")
  383. # 应用排序
  384. sorted_resources = cls._sort_results(all_resources, order_by)
  385. # 限制最大结果数
  386. if len(sorted_resources) > cls.MAX_SEARCH_RESULTS:
  387. sorted_resources = sorted_resources[: cls.MAX_SEARCH_RESULTS]
  388. return sorted_resources
  389. except Exception as e:
  390. log.error(f"搜索资源失败: {e!s}")
  391. raise CustomException(msg=f"搜索资源失败: {e!s}")
  392. @classmethod
  393. async def export_resource_service(cls, data_list: list[dict]) -> bytes:
  394. """
  395. 导出资源列表
  396. 参数:
  397. - data_list (list[dict]): 资源详情字典列表。
  398. 返回:
  399. - bytes: Excel文件的二进制数据。
  400. """
  401. mapping_dict = {
  402. "name": "文件名",
  403. "path": "文件路径",
  404. "size": "文件大小",
  405. "created_time": "创建时间",
  406. "modified_time": "修改时间",
  407. "parent_path": "父目录",
  408. }
  409. # 复制数据并转换状态
  410. export_data = data_list.copy()
  411. # 格式化文件大小
  412. for item in export_data:
  413. if item.get("size"):
  414. item["size"] = cls._format_file_size(item["size"])
  415. return ExcelUtil.export_list2excel(list_data=export_data, mapping_dict=mapping_dict)
  416. @classmethod
  417. async def _get_directory_stats(cls, path: str, include_hidden: bool = False) -> dict[str, int]:
  418. """
  419. 递归获取目录统计信息
  420. 参数:
  421. - path (str): 目录路径。
  422. - include_hidden (bool): 是否包含隐藏文件。
  423. 返回:
  424. - dict[str, int]: 包含文件数、目录数和总大小的字典。
  425. """
  426. stats = {"files": 0, "dirs": 0, "size": 0}
  427. try:
  428. for root, dirs, files in os.walk(path):
  429. # 过滤隐藏目录
  430. if not include_hidden:
  431. dirs[:] = [d for d in dirs if not d.startswith(".")]
  432. files = [f for f in files if not f.startswith(".")]
  433. stats["dirs"] += len(dirs)
  434. stats["files"] += len(files)
  435. for file in files:
  436. file_path = os.path.join(root, file)
  437. try:
  438. stats["size"] += os.path.getsize(file_path)
  439. except OSError:
  440. continue
  441. except Exception:
  442. pass
  443. return stats
  444. @classmethod
  445. def _sort_results(
  446. cls, results: list[dict], order_by: str | None = None
  447. ) -> list[dict[Any, Any]]:
  448. """
  449. 排序搜索结果
  450. 参数:
  451. - results (list[dict]): 资源详情字典列表。
  452. - order_by (str | None): 排序参数。
  453. 返回:
  454. - list[dict]: 排序后的资源详情字典列表。
  455. """
  456. try:
  457. # 默认按名称升序排序
  458. if not order_by:
  459. return sorted(results, key=lambda x: x.get("name", ""), reverse=False)
  460. # 解析order_by参数,格式: [{'field':'asc/desc'}]
  461. sort_conditions = eval(order_by)
  462. if isinstance(sort_conditions, list):
  463. # 构建排序键函数
  464. def sort_key(item):
  465. """
  466. 按多条排序条件从资源项中抽取比较键(支持时间字段转 datetime)。
  467. 参数:
  468. - item (dict): 单条资源详情字典。
  469. 返回:
  470. - list: 用于 `sorted` 的多字段键列表。
  471. """
  472. keys = []
  473. for cond in sort_conditions:
  474. field = cond.get("field", "name")
  475. cond.get("direction", "asc")
  476. # 获取字段值,默认为空字符串
  477. value = item.get(field, "")
  478. # 如果是日期字段,转换为可比较的格式
  479. if (
  480. field
  481. in [
  482. "created_time",
  483. "modified_time",
  484. "accessed_time",
  485. ]
  486. and value
  487. ):
  488. value = datetime.fromisoformat(value)
  489. keys.append(value)
  490. return keys
  491. # 确定排序方向(这里只支持单一方向,多个条件时使用第一个条件的方向)
  492. reverse = False
  493. if sort_conditions and isinstance(sort_conditions[0], dict):
  494. direction = sort_conditions[0].get("direction", "").lower()
  495. reverse = direction == "desc"
  496. return sorted(results, key=sort_key, reverse=reverse)
  497. # 如果排序条件不是列表,返回默认排序
  498. return sorted(results, key=lambda x: x.get("name", ""), reverse=False)
  499. except Exception as e:
  500. raise CustomException(msg=f"排序参数格式错误: {e!s}")
  501. @classmethod
  502. async def upload_file_service(
  503. cls,
  504. file: UploadFile,
  505. target_path: str | None = None,
  506. base_url: str | None = None,
  507. ) -> dict:
  508. """
  509. 上传文件到指定目录
  510. 参数:
  511. - file (UploadFile): 上传的文件对象。
  512. - target_path (str | None): 目标目录路径。
  513. - base_url (str | None): 基础URL,用于生成完整URL。
  514. 返回:
  515. - dict: 包含文件信息的字典。
  516. """
  517. if not file or not file.filename:
  518. raise CustomException(msg="请选择要上传的文件")
  519. original_filename = file.filename
  520. # 使用加强版的 _sanitize_filename 来清理文件名
  521. # 该方法已经包含了路径遍历检测
  522. safe_filename = cls._sanitize_filename(original_filename)
  523. # 如果文件名被重置为默认名称,说明检测到攻击
  524. if safe_filename.startswith("file_") and safe_filename != original_filename:
  525. log.error(f"文件名因安全问题被重置,原始文件名: {original_filename}")
  526. # 检查文件扩展名(使用清理后的文件名)
  527. if "." not in safe_filename:
  528. raise CustomException(msg="无法识别文件类型")
  529. ext = os.path.splitext(safe_filename)[1].lower()
  530. if not ext:
  531. raise CustomException(msg="无法识别文件类型")
  532. if ext in DANGEROUS_EXTENSIONS:
  533. log.error(f"尝试上传危险文件类型: {ext}")
  534. raise CustomException(msg=f"不允许上传此类型的文件: {ext}")
  535. try:
  536. content = await file.read()
  537. if len(content) > cls.MAX_UPLOAD_SIZE:
  538. raise CustomException(
  539. msg=f"文件太大,最大支持{cls.MAX_UPLOAD_SIZE // (1024 * 1024)}MB"
  540. )
  541. detected_type = cls._detect_file_type(content)
  542. if detected_type:
  543. expected_ext = MIME_TYPE_MAPPING.get(detected_type, "")
  544. if expected_ext and expected_ext != ext:
  545. log.warning(
  546. f"文件类型不匹配: 声明扩展名={ext}, 检测类型={detected_type}"
  547. )
  548. # 获取安全的目录路径(_get_safe_path 已经包含路径遍历防护)
  549. safe_dir = (
  550. cls._get_resource_root() if target_path is None else cls._get_safe_path(target_path)
  551. )
  552. # 确保目录存在
  553. os.makedirs(safe_dir, exist_ok=True)
  554. # 构建完整的文件路径
  555. file_path = os.path.join(safe_dir, safe_filename)
  556. # 最终安全检查:确保文件路径在允许的目录下
  557. file_path_abs = os.path.normpath(os.path.abspath(file_path))
  558. safe_dir_abs = os.path.normpath(os.path.abspath(safe_dir))
  559. resource_root_abs = os.path.normpath(os.path.abspath(cls._get_resource_root()))
  560. # 检查文件路径是否在目标目录下
  561. if not file_path_abs.startswith(safe_dir_abs + os.sep) and file_path_abs != safe_dir_abs:
  562. log.error(f"检测到路径穿越攻击,目标路径: {file_path}")
  563. raise CustomException(msg="非法的文件路径")
  564. # 再次确保文件路径在资源根目录下(防止通过 target_path 绕过)
  565. if not file_path_abs.startswith(resource_root_abs + os.sep) and file_path_abs != resource_root_abs:
  566. log.error(f"检测到越权访问尝试,目标路径: {file_path_abs}, 根目录: {resource_root_abs}")
  567. raise CustomException(msg="访问路径不在允许范围内")
  568. if os.path.exists(file_path):
  569. base_name, extension = os.path.splitext(safe_filename)
  570. counter = 1
  571. while os.path.exists(file_path):
  572. new_filename = f"{base_name}_{counter}{extension}"
  573. file_path = os.path.join(safe_dir, new_filename)
  574. counter += 1
  575. safe_filename = os.path.basename(file_path)
  576. Path(file_path).write_bytes(content)
  577. file_info = cls._get_file_info(file_path, base_url)
  578. file_url = cls._generate_http_url(file_path, base_url)
  579. log.info(f"文件上传成功: {safe_filename}")
  580. return ResourceUploadSchema(
  581. filename=safe_filename,
  582. file_url=file_url,
  583. file_size=file_info.get("size", 0),
  584. upload_time=datetime.now(),
  585. ).model_dump(mode="json")
  586. except CustomException:
  587. raise
  588. except Exception as e:
  589. log.error(f"文件上传失败: {e!s}")
  590. raise CustomException(msg=f"文件上传失败: {e!s}")
  591. @classmethod
  592. async def download_file_service(cls, file_path: str, base_url: str | None = None) -> str:
  593. """
  594. 下载文件(返回本地文件系统路径)
  595. 参数:
  596. - file_path (str): 文件路径(可为相对路径、绝对路径或完整URL)。
  597. - base_url (str | None): 基础URL,用于生成完整URL(不再直接返回URL)。
  598. 返回:
  599. - str: 本地文件系统路径。
  600. """
  601. try:
  602. safe_path = cls._get_safe_path(file_path)
  603. if not os.path.exists(safe_path):
  604. raise CustomException(msg="文件不存在")
  605. if not os.path.isfile(safe_path):
  606. raise CustomException(msg="路径不是文件")
  607. # 返回本地文件路径给 FileResponse 使用
  608. log.info(f"定位文件路径: {safe_path}")
  609. return safe_path
  610. except CustomException:
  611. raise
  612. except Exception as e:
  613. log.error(f"下载文件失败: {e!s}")
  614. raise CustomException(msg=f"下载文件失败: {e!s}")
  615. @classmethod
  616. def _delete_single_path(cls, path: str) -> None:
  617. """
  618. 删除单个文件或目录(内部辅助方法)
  619. 参数:
  620. - path (str): 文件或目录路径。
  621. 返回:
  622. - None
  623. 异常:
  624. - CustomException: 删除失败时抛出
  625. """
  626. safe_path = cls._get_safe_path(path)
  627. if not os.path.exists(safe_path):
  628. log.error(f"路径不存在,跳过: {path}")
  629. raise CustomException(msg=f"路径不存在: {path}")
  630. if os.path.isfile(safe_path):
  631. os.remove(safe_path)
  632. log.info(f"删除文件成功: {safe_path}")
  633. elif os.path.isdir(safe_path):
  634. shutil.rmtree(safe_path)
  635. log.info(f"删除目录成功: {safe_path}")
  636. @classmethod
  637. async def delete_file_service(cls, paths: list[str]) -> None:
  638. """
  639. 删除文件或目录(内部使用,遇到错误会抛出异常)
  640. 参数:
  641. - paths (list[str]): 文件或目录路径列表。
  642. 返回:
  643. - None
  644. 注意:
  645. - 此方法遇到第一个错误就会抛出异常并停止
  646. - 如需批量删除并收集结果,请使用 batch_delete_service
  647. """
  648. if not paths:
  649. raise CustomException(msg="删除失败,删除路径不能为空")
  650. for path in paths:
  651. try:
  652. cls._delete_single_path(path)
  653. except Exception as e:
  654. log.error(f"删除失败 {path}: {e!s}")
  655. raise CustomException(msg=f"删除失败 {path}: {e!s}")
  656. @classmethod
  657. async def batch_delete_service(cls, paths: list[str]) -> dict[str, list[str]]:
  658. """
  659. 批量删除文件或目录
  660. 参数:
  661. - paths (list[str]): 文件或目录路径列表。
  662. 返回:
  663. - dict[str, list[str]]: 键 `success` / `failed` 对应成功与失败路径列表。
  664. """
  665. if not paths:
  666. raise CustomException(msg="删除失败,删除路径不能为空")
  667. success_paths = []
  668. failed_paths = []
  669. for path in paths:
  670. try:
  671. cls._delete_single_path(path)
  672. success_paths.append(path)
  673. except Exception:
  674. failed_paths.append(path)
  675. return {"success": success_paths, "failed": failed_paths}
  676. @classmethod
  677. async def move_file_service(cls, data: ResourceMoveSchema) -> None:
  678. """
  679. 移动文件或目录
  680. 参数:
  681. - data (ResourceMoveSchema): 包含源路径和目标路径的模型。
  682. 返回:
  683. - None
  684. """
  685. try:
  686. source_path = cls._get_safe_path(data.source_path)
  687. target_path = cls._get_safe_path(data.target_path)
  688. if not os.path.exists(source_path):
  689. raise CustomException(msg="源路径不存在")
  690. # 检查目标路径是否已存在
  691. if os.path.exists(target_path):
  692. if not data.overwrite:
  693. raise CustomException(msg="目标路径已存在")
  694. # 删除目标路径
  695. if os.path.isfile(target_path):
  696. os.remove(target_path)
  697. else:
  698. shutil.rmtree(target_path)
  699. # 确保目标目录存在
  700. target_dir = os.path.dirname(target_path)
  701. os.makedirs(target_dir, exist_ok=True)
  702. # 移动文件
  703. shutil.move(source_path, target_path)
  704. log.info(f"移动成功: {source_path} -> {target_path}")
  705. except CustomException:
  706. raise
  707. except Exception as e:
  708. log.error(f"移动失败: {e!s}")
  709. raise CustomException(msg=f"移动失败: {e!s}")
  710. @classmethod
  711. async def copy_file_service(cls, data: ResourceCopySchema) -> None:
  712. """
  713. 复制文件或目录
  714. 参数:
  715. - data (ResourceCopySchema): 包含源路径和目标路径的模型。
  716. 返回:
  717. - None
  718. """
  719. try:
  720. source_path = cls._get_safe_path(data.source_path)
  721. target_path = cls._get_safe_path(data.target_path)
  722. if not os.path.exists(source_path):
  723. raise CustomException(msg="源路径不存在")
  724. # 检查目标路径是否已存在
  725. if os.path.exists(target_path) and not data.overwrite:
  726. raise CustomException(msg="目标路径已存在")
  727. # 确保目标目录存在
  728. target_dir = os.path.dirname(target_path)
  729. os.makedirs(target_dir, exist_ok=True)
  730. # 复制文件或目录
  731. if os.path.isfile(source_path):
  732. shutil.copy2(source_path, target_path)
  733. else:
  734. shutil.copytree(source_path, target_path, dirs_exist_ok=data.overwrite)
  735. log.info(f"复制成功: {source_path} -> {target_path}")
  736. except CustomException:
  737. raise
  738. except Exception as e:
  739. log.error(f"复制失败: {e!s}")
  740. raise CustomException(msg=f"复制失败: {e!s}")
  741. @classmethod
  742. async def rename_file_service(cls, data: ResourceRenameSchema) -> None:
  743. """
  744. 重命名文件或目录
  745. 参数:
  746. - data (ResourceRenameSchema): 包含旧路径和新名称的模型。
  747. 返回:
  748. - None
  749. """
  750. try:
  751. old_path = cls._get_safe_path(data.old_path)
  752. if not os.path.exists(old_path):
  753. raise CustomException(msg="文件或目录不存在")
  754. # 清理新名称,防止路径遍历
  755. # 使用 _sanitize_filename 来清理,确保不包含路径分隔符
  756. safe_new_name = cls._sanitize_filename(data.new_name)
  757. # 如果新名称被重置,说明检测到攻击
  758. if safe_new_name.startswith("file_") and safe_new_name != data.new_name:
  759. log.error(f"重命名时检测到路径遍历攻击,原始名称: {data.new_name}")
  760. raise CustomException(msg="新名称包含非法字符")
  761. # 生成新路径
  762. parent_dir = os.path.dirname(old_path)
  763. new_path = os.path.join(parent_dir, safe_new_name)
  764. # 最终安全检查:确保新路径在允许的目录下
  765. new_path_abs = os.path.normpath(os.path.abspath(new_path))
  766. resource_root_abs = os.path.normpath(os.path.abspath(cls._get_resource_root()))
  767. if not new_path_abs.startswith(resource_root_abs + os.sep) and new_path_abs != resource_root_abs:
  768. log.error(f"重命名时检测到越权访问: {new_path_abs}")
  769. raise CustomException(msg="目标路径不在允许范围内")
  770. if os.path.exists(new_path):
  771. raise CustomException(msg="目标名称已存在")
  772. # 重命名
  773. os.rename(old_path, new_path)
  774. log.info(f"重命名成功: {old_path} -> {new_path}")
  775. except CustomException:
  776. raise
  777. except Exception as e:
  778. log.error(f"重命名失败: {e!s}")
  779. raise CustomException(msg=f"重命名失败: {e!s}")
  780. @classmethod
  781. async def create_directory_service(cls, data: ResourceCreateDirSchema) -> None:
  782. """
  783. 创建目录
  784. 参数:
  785. - data (ResourceCreateDirSchema): 包含父目录路径和目录名称的模型。
  786. 返回:
  787. - None
  788. """
  789. try:
  790. parent_path = cls._get_safe_path(data.parent_path)
  791. if not os.path.exists(parent_path):
  792. raise CustomException(msg="父目录不存在")
  793. if not os.path.isdir(parent_path):
  794. raise CustomException(msg="父路径不是目录")
  795. # 清理目录名称,防止路径遍历(使用与文件名相同的清理逻辑)
  796. safe_dir_name = cls._sanitize_filename(data.dir_name)
  797. # 如果目录名被重置,说明检测到攻击
  798. if safe_dir_name.startswith("file_") and safe_dir_name != data.dir_name:
  799. log.error(f"创建目录时检测到路径遍历攻击,原始名称: {data.dir_name}")
  800. raise CustomException(msg="目录名称包含非法字符")
  801. # 生成新目录路径
  802. new_dir_path = os.path.join(parent_path, safe_dir_name)
  803. # 最终安全检查:确保新目录路径在允许的目录下
  804. new_dir_path_abs = os.path.normpath(os.path.abspath(new_dir_path))
  805. resource_root_abs = os.path.normpath(os.path.abspath(cls._get_resource_root()))
  806. if not new_dir_path_abs.startswith(resource_root_abs + os.sep) and new_dir_path_abs != resource_root_abs:
  807. log.error(f"创建目录时检测到越权访问: {new_dir_path_abs}")
  808. raise CustomException(msg="目标路径不在允许范围内")
  809. if os.path.exists(new_dir_path):
  810. raise CustomException(msg="目录已存在")
  811. # 创建目录
  812. os.makedirs(new_dir_path)
  813. log.info(f"创建目录成功: {new_dir_path}")
  814. except CustomException:
  815. raise
  816. except Exception as e:
  817. log.error(f"创建目录失败: {e!s}")
  818. raise CustomException(msg=f"创建目录失败: {e!s}")
  819. @classmethod
  820. def _format_file_size(cls, size_bytes: int) -> str:
  821. """
  822. 格式化文件大小
  823. 参数:
  824. - size_bytes (int): 文件大小(字节)
  825. 返回:
  826. - str: 格式化后的文件大小字符串(例如:"123.45MB")
  827. """
  828. if size_bytes == 0:
  829. return "0B"
  830. size_names = ["B", "KB", "MB", "GB", "TB"]
  831. i = 0
  832. while size_bytes >= 1024 and i < len(size_names) - 1:
  833. size_bytes = int(size_bytes / 1024)
  834. i += 1
  835. return f"{size_bytes:.2f}{size_names[i]}"