| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043 |
- import os
- import re
- import shutil
- import urllib.parse
- from datetime import datetime
- from pathlib import Path
- from typing import Any
- from urllib.parse import urlparse
- from fastapi import UploadFile
- from app.config.setting import settings
- from app.core.exceptions import CustomException
- from app.core.logger import log
- from app.utils.excel_util import ExcelUtil
- from app.utils.upload_util import DANGEROUS_EXTENSIONS, MIME_TYPE_MAPPING
- from .schema import (
- ResourceCopySchema,
- ResourceCreateDirSchema,
- ResourceDirectorySchema,
- ResourceItemSchema,
- ResourceMoveSchema,
- ResourceRenameSchema,
- ResourceSearchQueryParam,
- ResourceUploadSchema,
- )
- class ResourceService:
- """
- 资源管理模块服务层 - 管理系统静态文件目录(仅管理 upload 目录)
- """
- # 配置常量
- MAX_UPLOAD_SIZE = 100 * 1024 * 1024 # 100MB
- MAX_SEARCH_RESULTS = 1000 # 最大搜索结果数
- MAX_PATH_DEPTH = 20 # 最大路径深度
- @classmethod
- def _get_resource_root(cls) -> str:
- """
- 获取资源管理根目录(仅允许访问 upload 目录)
- 返回:
- - str: 资源管理根目录路径(upload 目录)。
- """
- if not settings.STATIC_ENABLE:
- raise CustomException(msg="静态文件服务未启用")
- # 限制只能管理 upload 目录
- upload_root = os.path.join(str(settings.STATIC_ROOT), "upload")
- # 确保 upload 目录存在
- os.makedirs(upload_root, exist_ok=True)
- return upload_root
- @classmethod
- def _get_safe_path(cls, path: str | None = None) -> str:
- """
- 获取安全的文件路径(加强版路径遍历防护)
- 参数:
- - path (str | None): 原始文件路径。
- 返回:
- - str: 安全的文件路径。
- """
- resource_root = cls._get_resource_root()
- if not path or not isinstance(path, str):
- return resource_root
- # 支持前端传递的完整URL或以STATIC_URL/ROOT_PATH+STATIC_URL开头的URL路径,转换为相对资源路径
- static_prefix = settings.STATIC_URL.rstrip("/")
- root_prefix = (
- settings.ROOT_PATH.rstrip("/") if getattr(settings, "ROOT_PATH", "") else ""
- )
- root_static_prefix = f"{root_prefix}{static_prefix}" if root_prefix else static_prefix
- def strip_prefix(p: str) -> str:
- """
- 去掉静态资源 URL 前缀,得到相对 upload 的路径片段。
- 参数:
- - p (str): 原始路径或 URL 路径段。
- 返回:
- - str: 去掉已知前缀后的路径。
- """
- if p.startswith(root_static_prefix):
- return p[len(root_static_prefix) :].lstrip("/")
- if p.startswith(static_prefix):
- return p[len(static_prefix) :].lstrip("/")
- return p
- if path.startswith(("http://", "https://")):
- parsed = urlparse(path)
- url_path = parsed.path or ""
- path = strip_prefix(url_path)
- else:
- path = strip_prefix(path)
- # 清理路径,规范化斜杠
- path = path.strip().replace("//", "/").replace("\\\\\\\\", "/").replace("\\\\", "/")
- # 移除开头的 /,将路径视为相对于 resource_root
- if path.startswith("/"):
- path = path[1:]
- # 如果路径以 upload/ 开头,去掉 upload/ 前缀
- # 因为 _get_resource_root() 已经指向了 upload 目录
- if path.startswith("upload/"):
- path = path[7:] # len("upload/") = 7
- # 检查路径遍历攻击
- if ".." in path or "\x00" in path:
- log.error(f"检测到路径遍历攻击尝试: {path}")
- raise CustomException(msg="非法的路径格式")
- # URL 解码检查
- decoded_path = urllib.parse.unquote(path)
- if ".." in decoded_path:
- log.error(f"检测到编码后的路径遍历攻击: {path}")
- raise CustomException(msg="非法的路径格式")
- # 构建完整路径
- safe_path = os.path.normpath(os.path.join(resource_root, path))
- # 获取绝对路径并规范化
- resource_root_abs = os.path.normpath(os.path.abspath(resource_root))
- safe_path_abs = os.path.normpath(os.path.abspath(safe_path))
- # 核心安全检查:确保最终路径在允许的根目录下
- if not safe_path_abs.startswith(resource_root_abs + os.sep) and safe_path_abs != resource_root_abs:
- log.error(f"路径遍历攻击被阻止: 尝试访问 {safe_path_abs}, 但根目录是 {resource_root_abs}")
- raise CustomException(msg="访问路径不在允许范围内")
- # 检查路径深度
- try:
- relative_path = os.path.relpath(safe_path_abs, resource_root_abs)
- if relative_path.count(os.sep) > cls.MAX_PATH_DEPTH:
- raise CustomException(msg="路径深度超过限制")
- except ValueError:
- raise CustomException(msg="无效的路径")
- return safe_path_abs
- @classmethod
- def _path_exists(cls, path: str) -> bool:
- """
- 检查路径是否存在
- 参数:
- - path (str): 要检查的路径。
- 返回:
- - bool: 如果路径存在则返回True,否则返回False。
- """
- try:
- safe_path = cls._get_safe_path(path)
- return os.path.exists(safe_path)
- except Exception as e:
- raise CustomException(msg=f"检查路径是否存在失败: {e!s}")
- @staticmethod
- def _sanitize_filename(filename: str) -> str:
- """
- 清理文件名,移除危险字符和路径穿越(加强版)。
- 参数:
- - filename (str): 原始文件名。
- 返回:
- - str: 安全的文件名。
- """
- if not filename:
- return f"file_{datetime.now().strftime('%Y%m%d%H%M%S')}"
- # 首先检查原始文件名是否包含路径遍历特征
- # 攻击者可能使用 ..\..\etc\passwd 或 ../../etc/passwd
- dangerous_patterns = [
- r"\.\.", # .. 路径遍历
- r"[\/]", # 任何斜杠(目录分隔符)
- r"\x00", # 空字节
- r"%2e%2e", # URL 编码的 ..
- r"%252e%252e", # 双重 URL 编码的 ..
- ]
- for pattern in dangerous_patterns:
- if re.search(pattern, filename, re.IGNORECASE):
- log.error(f"检测到文件名路径遍历攻击: {filename}")
- # 返回安全文件名,不包含原始文件名
- return f"file_{datetime.now().strftime('%Y%m%d%H%M%S')}"
- # URL 解码检查
- decoded = urllib.parse.unquote(filename)
- decoded_twice = urllib.parse.unquote(decoded)
- for check in [decoded, decoded_twice]:
- if ".." in check or "/" in check or "\\" in check:
- log.error(f"检测到编码后的文件名攻击: {filename}")
- return f"file_{datetime.now().strftime('%Y%m%d%H%M%S')}"
- # 使用 os.path.basename 提取纯文件名(移除路径)
- filename = os.path.basename(filename)
- # 移除危险字符
- filename = re.sub(r'[<>:"|?*\x00-\x1f]', "", filename)
- # 防止多个连续点号(可能被用于绕过扩展名检查)
- filename = re.sub(r"\.{2,}", ".", filename)
- # 移除首尾的空格和点号
- filename = filename.strip(". ")
- # 如果文件名为空,生成默认文件名
- if not filename:
- filename = f"file_{datetime.now().strftime('%Y%m%d%H%M%S')}"
- return filename
- @staticmethod
- def _detect_file_type(content: bytes) -> str | None:
- """
- 通过文件内容检测真实文件类型。
- 参数:
- - content (bytes): 文件内容(前几字节即可)。
- 返回:
- - str | None: 检测到的 MIME 类型,无法识别返回 None。
- """
- if content.startswith(b"\xff\xd8\xff"):
- return "image/jpeg"
- if content.startswith(b"\x89PNG\r\n\x1a\n"):
- return "image/png"
- if content.startswith(b"GIF87a") or content.startswith(b"GIF89a"):
- return "image/gif"
- if content.startswith(b"PK\x03\x04"):
- if b"[Content_Types].xml" in content[:1000]:
- return "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
- return "application/zip"
- if content.startswith(b"%PDF"):
- return "application/pdf"
- if content.startswith(b"\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1"):
- return "application/msword"
- return None
- @classmethod
- def _generate_http_url(cls, file_path: str, base_url: str | None = None) -> str:
- """
- 生成文件的HTTP URL
- 参数:
- - file_path (str): 文件的绝对路径。
- - base_url (str | None): 基础URL,用于生成完整URL。
- 返回:
- - str: 文件的HTTP URL。
- """
- # 使用 STATIC_ROOT 作为基准,而不是 _get_resource_root()
- # 这样可以保留 upload 目录在 URL 中
- static_root = str(settings.STATIC_ROOT)
- try:
- relative_path = os.path.relpath(file_path, static_root)
- # 确保路径使用正斜杠(URL格式)
- url_path = relative_path.replace(os.sep, "/")
- except ValueError:
- # 如果无法计算相对路径,使用文件名
- url_path = os.path.basename(file_path)
- # 如果提供了base_url,使用它生成完整URL,否则使用settings.STATIC_URL
- if base_url:
- # 使用完整的 base_url(包含 API 路径前缀)
- base_part = base_url.rstrip("/")
- static_part = settings.STATIC_URL.lstrip("/")
- file_part = url_path.lstrip("/")
- http_url = f"{base_part}/{static_part}/{file_part}".replace("//", "/").replace(":/", "://")
- else:
- http_url = f"{settings.STATIC_URL}/{url_path}".replace("//", "/")
- return http_url
- @classmethod
- def _get_file_info(cls, file_path: str, base_url: str | None = None) -> dict:
- """
- 获取文件或目录的详细信息,如名称、大小、创建时间、修改时间、路径、深度、HTTP URL、是否隐藏、是否为目录等。
- 参数:
- - file_path (str): 文件或目录的路径(必须是绝对路径)。
- - base_url (str | None): 基础URL,用于生成完整URL。
- 返回:
- - dict: 文件或目录的详细信息字典。
- """
- try:
- # 直接使用传入的路径(已经是绝对路径)
- safe_path = file_path
- if not os.path.exists(safe_path):
- return {}
- stat = os.stat(safe_path)
- path_obj = Path(safe_path)
- resource_root = cls._get_resource_root()
- # 计算相对路径(相对于资源根目录)
- try:
- relative_path = os.path.relpath(safe_path, resource_root)
- except ValueError:
- relative_path = os.path.basename(safe_path)
- # 生成HTTP URL路径
- http_url = cls._generate_http_url(safe_path, base_url)
- # 检查是否为隐藏文件(文件名以点开头)
- is_hidden = path_obj.name.startswith(".")
- # 对于目录,设置is_directory字段(兼容前端)
- is_directory = os.path.isdir(safe_path)
- # 将datetime对象转换为ISO格式的字符串,确保JSON序列化成功
- created_time = datetime.fromtimestamp(stat.st_ctime).isoformat()
- modified_time = datetime.fromtimestamp(stat.st_mtime).isoformat()
- return {
- "name": path_obj.name,
- "file_url": http_url, # 统一使用file_url字段
- "relative_path": relative_path,
- "is_file": os.path.isfile(safe_path),
- "is_dir": is_directory,
- "size": stat.st_size if os.path.isfile(safe_path) else None,
- "created_time": created_time,
- "modified_time": modified_time,
- "is_hidden": is_hidden,
- }
- except Exception as e:
- log.error(f"获取文件信息失败: {e!s}")
- return {}
- @classmethod
- async def get_directory_list_service(
- cls,
- path: str | None = None,
- include_hidden: bool = False,
- base_url: str | None = None,
- ) -> dict:
- """
- 获取目录列表
- 参数:
- - path (str | None): 目录路径。如果未指定,将使用静态文件根目录。
- - include_hidden (bool): 是否包含隐藏文件。
- - base_url (str | None): 基础URL,用于生成完整URL。
- 返回:
- - dict: 包含目录列表和统计信息的字典。
- """
- try:
- # 如果没有指定路径,使用静态文件根目录
- if path is None:
- safe_path = cls._get_resource_root()
- display_path = cls._generate_http_url(safe_path, base_url)
- else:
- safe_path = cls._get_safe_path(path)
- display_path = cls._generate_http_url(safe_path, base_url)
- if not os.path.exists(safe_path):
- raise CustomException(msg="目录不存在")
- if not os.path.isdir(safe_path):
- raise CustomException(msg="路径不是目录")
- items = []
- total_files = 0
- total_dirs = 0
- total_size = 0
- try:
- for item_name in os.listdir(safe_path):
- # 跳过隐藏文件
- if not include_hidden and item_name.startswith("."):
- continue
- item_path = os.path.join(safe_path, item_name)
- file_info = cls._get_file_info(item_path, base_url)
- if file_info:
- items.append(ResourceItemSchema(**file_info))
- if file_info["is_file"]:
- total_files += 1
- total_size += file_info.get("size", 0) or 0
- elif file_info["is_dir"]:
- total_dirs += 1
- except PermissionError:
- raise CustomException(msg="没有权限访问此目录")
- return ResourceDirectorySchema(
- path=display_path, # 返回HTTP URL路径而不是文件系统路径
- name=os.path.basename(safe_path),
- items=items,
- total_files=total_files,
- total_dirs=total_dirs,
- total_size=total_size,
- ).model_dump()
- except CustomException:
- raise
- except Exception as e:
- log.error(f"获取目录列表失败: {e!s}")
- raise CustomException(msg=f"获取目录列表失败: {e!s}")
- @classmethod
- async def get_resources_list_service(
- cls,
- search: ResourceSearchQueryParam | None = None,
- order_by: str | None = None,
- base_url: str | None = None,
- ) -> list[dict]:
- """
- 搜索资源列表(用于分页和导出)
- 参数:
- - search (ResourceSearchQueryParam | None): 查询参数模型。
- - order_by (str | None): 排序参数。
- - base_url (str | None): 基础URL,用于生成完整URL。
- 返回:
- - list[dict]: 资源详情字典列表。
- """
- try:
- # 确定搜索路径
- if search and hasattr(search, "path") and search.path and isinstance(search.path, str):
- resource_root = cls._get_safe_path(search.path)
- else:
- resource_root = cls._get_resource_root()
- # 检查路径是否存在
- if not os.path.exists(resource_root):
- raise CustomException(msg="目录不存在")
- if not os.path.isdir(resource_root):
- raise CustomException(msg="路径不是目录")
- # 收集资源
- all_resources = []
- try:
- for item_name in os.listdir(resource_root):
- # 跳过隐藏文件
- if item_name.startswith("."):
- continue
- item_path = os.path.join(resource_root, item_name)
- file_info = cls._get_file_info(item_path, base_url)
- if file_info:
- # 应用名称过滤
- if search and hasattr(search, "name") and search.name and search.name[1]:
- search_keyword = search.name[1].lower()
- if search_keyword not in file_info.get("name", "").lower():
- continue
- all_resources.append(file_info)
- except PermissionError:
- raise CustomException(msg="没有权限访问此目录")
- # 应用排序
- sorted_resources = cls._sort_results(all_resources, order_by)
- # 限制最大结果数
- if len(sorted_resources) > cls.MAX_SEARCH_RESULTS:
- sorted_resources = sorted_resources[: cls.MAX_SEARCH_RESULTS]
- return sorted_resources
- except Exception as e:
- log.error(f"搜索资源失败: {e!s}")
- raise CustomException(msg=f"搜索资源失败: {e!s}")
- @classmethod
- async def export_resource_service(cls, data_list: list[dict]) -> bytes:
- """
- 导出资源列表
- 参数:
- - data_list (list[dict]): 资源详情字典列表。
- 返回:
- - bytes: Excel文件的二进制数据。
- """
- mapping_dict = {
- "name": "文件名",
- "path": "文件路径",
- "size": "文件大小",
- "created_time": "创建时间",
- "modified_time": "修改时间",
- "parent_path": "父目录",
- }
- # 复制数据并转换状态
- export_data = data_list.copy()
- # 格式化文件大小
- for item in export_data:
- if item.get("size"):
- item["size"] = cls._format_file_size(item["size"])
- return ExcelUtil.export_list2excel(list_data=export_data, mapping_dict=mapping_dict)
- @classmethod
- async def _get_directory_stats(cls, path: str, include_hidden: bool = False) -> dict[str, int]:
- """
- 递归获取目录统计信息
- 参数:
- - path (str): 目录路径。
- - include_hidden (bool): 是否包含隐藏文件。
- 返回:
- - dict[str, int]: 包含文件数、目录数和总大小的字典。
- """
- stats = {"files": 0, "dirs": 0, "size": 0}
- try:
- for root, dirs, files in os.walk(path):
- # 过滤隐藏目录
- if not include_hidden:
- dirs[:] = [d for d in dirs if not d.startswith(".")]
- files = [f for f in files if not f.startswith(".")]
- stats["dirs"] += len(dirs)
- stats["files"] += len(files)
- for file in files:
- file_path = os.path.join(root, file)
- try:
- stats["size"] += os.path.getsize(file_path)
- except OSError:
- continue
- except Exception:
- pass
- return stats
- @classmethod
- def _sort_results(
- cls, results: list[dict], order_by: str | None = None
- ) -> list[dict[Any, Any]]:
- """
- 排序搜索结果
- 参数:
- - results (list[dict]): 资源详情字典列表。
- - order_by (str | None): 排序参数。
- 返回:
- - list[dict]: 排序后的资源详情字典列表。
- """
- try:
- # 默认按名称升序排序
- if not order_by:
- return sorted(results, key=lambda x: x.get("name", ""), reverse=False)
- # 解析order_by参数,格式: [{'field':'asc/desc'}]
- sort_conditions = eval(order_by)
- if isinstance(sort_conditions, list):
- # 构建排序键函数
- def sort_key(item):
- """
- 按多条排序条件从资源项中抽取比较键(支持时间字段转 datetime)。
- 参数:
- - item (dict): 单条资源详情字典。
- 返回:
- - list: 用于 `sorted` 的多字段键列表。
- """
- keys = []
- for cond in sort_conditions:
- field = cond.get("field", "name")
- cond.get("direction", "asc")
- # 获取字段值,默认为空字符串
- value = item.get(field, "")
- # 如果是日期字段,转换为可比较的格式
- if (
- field
- in [
- "created_time",
- "modified_time",
- "accessed_time",
- ]
- and value
- ):
- value = datetime.fromisoformat(value)
- keys.append(value)
- return keys
- # 确定排序方向(这里只支持单一方向,多个条件时使用第一个条件的方向)
- reverse = False
- if sort_conditions and isinstance(sort_conditions[0], dict):
- direction = sort_conditions[0].get("direction", "").lower()
- reverse = direction == "desc"
- return sorted(results, key=sort_key, reverse=reverse)
- # 如果排序条件不是列表,返回默认排序
- return sorted(results, key=lambda x: x.get("name", ""), reverse=False)
- except Exception as e:
- raise CustomException(msg=f"排序参数格式错误: {e!s}")
- @classmethod
- async def upload_file_service(
- cls,
- file: UploadFile,
- target_path: str | None = None,
- base_url: str | None = None,
- ) -> dict:
- """
- 上传文件到指定目录
- 参数:
- - file (UploadFile): 上传的文件对象。
- - target_path (str | None): 目标目录路径。
- - base_url (str | None): 基础URL,用于生成完整URL。
- 返回:
- - dict: 包含文件信息的字典。
- """
- if not file or not file.filename:
- raise CustomException(msg="请选择要上传的文件")
- original_filename = file.filename
- # 使用加强版的 _sanitize_filename 来清理文件名
- # 该方法已经包含了路径遍历检测
- safe_filename = cls._sanitize_filename(original_filename)
- # 如果文件名被重置为默认名称,说明检测到攻击
- if safe_filename.startswith("file_") and safe_filename != original_filename:
- log.error(f"文件名因安全问题被重置,原始文件名: {original_filename}")
- # 检查文件扩展名(使用清理后的文件名)
- if "." not in safe_filename:
- raise CustomException(msg="无法识别文件类型")
- ext = os.path.splitext(safe_filename)[1].lower()
- if not ext:
- raise CustomException(msg="无法识别文件类型")
- if ext in DANGEROUS_EXTENSIONS:
- log.error(f"尝试上传危险文件类型: {ext}")
- raise CustomException(msg=f"不允许上传此类型的文件: {ext}")
- try:
- content = await file.read()
- if len(content) > cls.MAX_UPLOAD_SIZE:
- raise CustomException(
- msg=f"文件太大,最大支持{cls.MAX_UPLOAD_SIZE // (1024 * 1024)}MB"
- )
- detected_type = cls._detect_file_type(content)
- if detected_type:
- expected_ext = MIME_TYPE_MAPPING.get(detected_type, "")
- if expected_ext and expected_ext != ext:
- log.warning(
- f"文件类型不匹配: 声明扩展名={ext}, 检测类型={detected_type}"
- )
- # 获取安全的目录路径(_get_safe_path 已经包含路径遍历防护)
- safe_dir = (
- cls._get_resource_root() if target_path is None else cls._get_safe_path(target_path)
- )
- # 确保目录存在
- os.makedirs(safe_dir, exist_ok=True)
- # 构建完整的文件路径
- file_path = os.path.join(safe_dir, safe_filename)
- # 最终安全检查:确保文件路径在允许的目录下
- file_path_abs = os.path.normpath(os.path.abspath(file_path))
- safe_dir_abs = os.path.normpath(os.path.abspath(safe_dir))
- resource_root_abs = os.path.normpath(os.path.abspath(cls._get_resource_root()))
- # 检查文件路径是否在目标目录下
- if not file_path_abs.startswith(safe_dir_abs + os.sep) and file_path_abs != safe_dir_abs:
- log.error(f"检测到路径穿越攻击,目标路径: {file_path}")
- raise CustomException(msg="非法的文件路径")
- # 再次确保文件路径在资源根目录下(防止通过 target_path 绕过)
- if not file_path_abs.startswith(resource_root_abs + os.sep) and file_path_abs != resource_root_abs:
- log.error(f"检测到越权访问尝试,目标路径: {file_path_abs}, 根目录: {resource_root_abs}")
- raise CustomException(msg="访问路径不在允许范围内")
- if os.path.exists(file_path):
- base_name, extension = os.path.splitext(safe_filename)
- counter = 1
- while os.path.exists(file_path):
- new_filename = f"{base_name}_{counter}{extension}"
- file_path = os.path.join(safe_dir, new_filename)
- counter += 1
- safe_filename = os.path.basename(file_path)
- Path(file_path).write_bytes(content)
- file_info = cls._get_file_info(file_path, base_url)
- file_url = cls._generate_http_url(file_path, base_url)
- log.info(f"文件上传成功: {safe_filename}")
- return ResourceUploadSchema(
- filename=safe_filename,
- file_url=file_url,
- file_size=file_info.get("size", 0),
- upload_time=datetime.now(),
- ).model_dump(mode="json")
- except CustomException:
- raise
- except Exception as e:
- log.error(f"文件上传失败: {e!s}")
- raise CustomException(msg=f"文件上传失败: {e!s}")
- @classmethod
- async def download_file_service(cls, file_path: str, base_url: str | None = None) -> str:
- """
- 下载文件(返回本地文件系统路径)
- 参数:
- - file_path (str): 文件路径(可为相对路径、绝对路径或完整URL)。
- - base_url (str | None): 基础URL,用于生成完整URL(不再直接返回URL)。
- 返回:
- - str: 本地文件系统路径。
- """
- try:
- safe_path = cls._get_safe_path(file_path)
- if not os.path.exists(safe_path):
- raise CustomException(msg="文件不存在")
- if not os.path.isfile(safe_path):
- raise CustomException(msg="路径不是文件")
- # 返回本地文件路径给 FileResponse 使用
- log.info(f"定位文件路径: {safe_path}")
- return safe_path
- except CustomException:
- raise
- except Exception as e:
- log.error(f"下载文件失败: {e!s}")
- raise CustomException(msg=f"下载文件失败: {e!s}")
- @classmethod
- def _delete_single_path(cls, path: str) -> None:
- """
- 删除单个文件或目录(内部辅助方法)
- 参数:
- - path (str): 文件或目录路径。
- 返回:
- - None
- 异常:
- - CustomException: 删除失败时抛出
- """
- safe_path = cls._get_safe_path(path)
- if not os.path.exists(safe_path):
- log.error(f"路径不存在,跳过: {path}")
- raise CustomException(msg=f"路径不存在: {path}")
- if os.path.isfile(safe_path):
- os.remove(safe_path)
- log.info(f"删除文件成功: {safe_path}")
- elif os.path.isdir(safe_path):
- shutil.rmtree(safe_path)
- log.info(f"删除目录成功: {safe_path}")
- @classmethod
- async def delete_file_service(cls, paths: list[str]) -> None:
- """
- 删除文件或目录(内部使用,遇到错误会抛出异常)
- 参数:
- - paths (list[str]): 文件或目录路径列表。
- 返回:
- - None
- 注意:
- - 此方法遇到第一个错误就会抛出异常并停止
- - 如需批量删除并收集结果,请使用 batch_delete_service
- """
- if not paths:
- raise CustomException(msg="删除失败,删除路径不能为空")
- for path in paths:
- try:
- cls._delete_single_path(path)
- except Exception as e:
- log.error(f"删除失败 {path}: {e!s}")
- raise CustomException(msg=f"删除失败 {path}: {e!s}")
- @classmethod
- async def batch_delete_service(cls, paths: list[str]) -> dict[str, list[str]]:
- """
- 批量删除文件或目录
- 参数:
- - paths (list[str]): 文件或目录路径列表。
- 返回:
- - dict[str, list[str]]: 键 `success` / `failed` 对应成功与失败路径列表。
- """
- if not paths:
- raise CustomException(msg="删除失败,删除路径不能为空")
- success_paths = []
- failed_paths = []
- for path in paths:
- try:
- cls._delete_single_path(path)
- success_paths.append(path)
- except Exception:
- failed_paths.append(path)
- return {"success": success_paths, "failed": failed_paths}
- @classmethod
- async def move_file_service(cls, data: ResourceMoveSchema) -> None:
- """
- 移动文件或目录
- 参数:
- - data (ResourceMoveSchema): 包含源路径和目标路径的模型。
- 返回:
- - None
- """
- try:
- source_path = cls._get_safe_path(data.source_path)
- target_path = cls._get_safe_path(data.target_path)
- if not os.path.exists(source_path):
- raise CustomException(msg="源路径不存在")
- # 检查目标路径是否已存在
- if os.path.exists(target_path):
- if not data.overwrite:
- raise CustomException(msg="目标路径已存在")
- # 删除目标路径
- if os.path.isfile(target_path):
- os.remove(target_path)
- else:
- shutil.rmtree(target_path)
- # 确保目标目录存在
- target_dir = os.path.dirname(target_path)
- os.makedirs(target_dir, exist_ok=True)
- # 移动文件
- shutil.move(source_path, target_path)
- log.info(f"移动成功: {source_path} -> {target_path}")
- except CustomException:
- raise
- except Exception as e:
- log.error(f"移动失败: {e!s}")
- raise CustomException(msg=f"移动失败: {e!s}")
- @classmethod
- async def copy_file_service(cls, data: ResourceCopySchema) -> None:
- """
- 复制文件或目录
- 参数:
- - data (ResourceCopySchema): 包含源路径和目标路径的模型。
- 返回:
- - None
- """
- try:
- source_path = cls._get_safe_path(data.source_path)
- target_path = cls._get_safe_path(data.target_path)
- if not os.path.exists(source_path):
- raise CustomException(msg="源路径不存在")
- # 检查目标路径是否已存在
- if os.path.exists(target_path) and not data.overwrite:
- raise CustomException(msg="目标路径已存在")
- # 确保目标目录存在
- target_dir = os.path.dirname(target_path)
- os.makedirs(target_dir, exist_ok=True)
- # 复制文件或目录
- if os.path.isfile(source_path):
- shutil.copy2(source_path, target_path)
- else:
- shutil.copytree(source_path, target_path, dirs_exist_ok=data.overwrite)
- log.info(f"复制成功: {source_path} -> {target_path}")
- except CustomException:
- raise
- except Exception as e:
- log.error(f"复制失败: {e!s}")
- raise CustomException(msg=f"复制失败: {e!s}")
- @classmethod
- async def rename_file_service(cls, data: ResourceRenameSchema) -> None:
- """
- 重命名文件或目录
- 参数:
- - data (ResourceRenameSchema): 包含旧路径和新名称的模型。
- 返回:
- - None
- """
- try:
- old_path = cls._get_safe_path(data.old_path)
- if not os.path.exists(old_path):
- raise CustomException(msg="文件或目录不存在")
- # 清理新名称,防止路径遍历
- # 使用 _sanitize_filename 来清理,确保不包含路径分隔符
- safe_new_name = cls._sanitize_filename(data.new_name)
- # 如果新名称被重置,说明检测到攻击
- if safe_new_name.startswith("file_") and safe_new_name != data.new_name:
- log.error(f"重命名时检测到路径遍历攻击,原始名称: {data.new_name}")
- raise CustomException(msg="新名称包含非法字符")
- # 生成新路径
- parent_dir = os.path.dirname(old_path)
- new_path = os.path.join(parent_dir, safe_new_name)
- # 最终安全检查:确保新路径在允许的目录下
- new_path_abs = os.path.normpath(os.path.abspath(new_path))
- resource_root_abs = os.path.normpath(os.path.abspath(cls._get_resource_root()))
- if not new_path_abs.startswith(resource_root_abs + os.sep) and new_path_abs != resource_root_abs:
- log.error(f"重命名时检测到越权访问: {new_path_abs}")
- raise CustomException(msg="目标路径不在允许范围内")
- if os.path.exists(new_path):
- raise CustomException(msg="目标名称已存在")
- # 重命名
- os.rename(old_path, new_path)
- log.info(f"重命名成功: {old_path} -> {new_path}")
- except CustomException:
- raise
- except Exception as e:
- log.error(f"重命名失败: {e!s}")
- raise CustomException(msg=f"重命名失败: {e!s}")
- @classmethod
- async def create_directory_service(cls, data: ResourceCreateDirSchema) -> None:
- """
- 创建目录
- 参数:
- - data (ResourceCreateDirSchema): 包含父目录路径和目录名称的模型。
- 返回:
- - None
- """
- try:
- parent_path = cls._get_safe_path(data.parent_path)
- if not os.path.exists(parent_path):
- raise CustomException(msg="父目录不存在")
- if not os.path.isdir(parent_path):
- raise CustomException(msg="父路径不是目录")
- # 清理目录名称,防止路径遍历(使用与文件名相同的清理逻辑)
- safe_dir_name = cls._sanitize_filename(data.dir_name)
- # 如果目录名被重置,说明检测到攻击
- if safe_dir_name.startswith("file_") and safe_dir_name != data.dir_name:
- log.error(f"创建目录时检测到路径遍历攻击,原始名称: {data.dir_name}")
- raise CustomException(msg="目录名称包含非法字符")
- # 生成新目录路径
- new_dir_path = os.path.join(parent_path, safe_dir_name)
- # 最终安全检查:确保新目录路径在允许的目录下
- new_dir_path_abs = os.path.normpath(os.path.abspath(new_dir_path))
- resource_root_abs = os.path.normpath(os.path.abspath(cls._get_resource_root()))
- if not new_dir_path_abs.startswith(resource_root_abs + os.sep) and new_dir_path_abs != resource_root_abs:
- log.error(f"创建目录时检测到越权访问: {new_dir_path_abs}")
- raise CustomException(msg="目标路径不在允许范围内")
- if os.path.exists(new_dir_path):
- raise CustomException(msg="目录已存在")
- # 创建目录
- os.makedirs(new_dir_path)
- log.info(f"创建目录成功: {new_dir_path}")
- except CustomException:
- raise
- except Exception as e:
- log.error(f"创建目录失败: {e!s}")
- raise CustomException(msg=f"创建目录失败: {e!s}")
- @classmethod
- def _format_file_size(cls, size_bytes: int) -> str:
- """
- 格式化文件大小
- 参数:
- - size_bytes (int): 文件大小(字节)
- 返回:
- - str: 格式化后的文件大小字符串(例如:"123.45MB")
- """
- if size_bytes == 0:
- return "0B"
- size_names = ["B", "KB", "MB", "GB", "TB"]
- i = 0
- while size_bytes >= 1024 and i < len(size_names) - 1:
- size_bytes = int(size_bytes / 1024)
- i += 1
- return f"{size_bytes:.2f}{size_names[i]}"
|