| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- import json
- import time
- from collections.abc import Callable, Coroutine
- from typing import Any
- from fastapi import Request, Response
- from fastapi.routing import APIRoute
- from user_agents import parse
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.api.v1.module_system.log.schema import OperationLogCreateSchema
- from app.api.v1.module_system.log.service import OperationLogService
- from app.config.setting import settings
- from app.core.database import async_db_session
- from app.utils.ip_local_util import IpLocalUtil
- """
- 在 FastAPI 中,route_class 参数用于自定义路由的行为。
- 通过设置 route_class,你可以定义一个自定义的路由类,从而在每个路由处理之前或之后执行特定的操作。
- 这对于日志记录、权限验证、性能监控等场景非常有用。
- """
- class OperationLogRoute(APIRoute):
- """操作日志路由装饰器"""
- def get_route_handler(
- self,
- ) -> Callable[[Request], Coroutine[Any, Any, Response]]:
- """
- 自定义路由处理程序,在每个路由处理之前或之后执行特定的操作。
- 参数:
- - request (Request): FastAPI请求对象。
- 返回:
- - Response: FastAPI响应对象。
- """
- original_route_handler = super().get_route_handler()
- async def custom_route_handler(request: Request) -> Response:
- """
- 自定义路由处理程序,在每个路由处理之前或之后执行特定的操作。
- 参数:
- - request (Request): FastAPI请求对象。
- 描述:
- - 该方法在每个路由处理之前被调用,用于记录操作日志。
- 返回:
- - Response: FastAPI响应对象。
- """
- start_time = time.time()
- # 请求前的处理
- response: Response = await original_route_handler(request)
- # 请求后的处理
- if not settings.OPERATION_LOG_RECORD:
- return response
- if request.method not in settings.OPERATION_RECORD_METHOD:
- return response
- route: APIRoute = request.scope.get("route", None)
- if route.name in settings.IGNORE_OPERATION_FUNCTION:
- return response
- user_agent = parse(request.headers.get("user-agent"))
- payload = b"{}"
- req_content_type = request.headers.get("Content-Type", "")
- if req_content_type and (
- req_content_type.startswith((
- "multipart/form-data",
- "application/x-www-form-urlencoded",
- ))
- ):
- form_data = await request.form()
- oper_param = "\n".join([f"{k}: {v}" for k, v in form_data.items()])
- payload = oper_param # 直接使用字符串格式的参数
- else:
- payload = await request.body()
- path_params = request.path_params
- oper_param = {}
- # 处理请求体数据
- if payload:
- try:
- oper_param["body"] = json.loads(payload.decode())
- except (json.JSONDecodeError, UnicodeDecodeError):
- oper_param["body"] = payload.decode("utf-8", errors="ignore")
- # 处理路径参数
- if path_params:
- oper_param["path_params"] = dict(path_params)
- payload = json.dumps(oper_param, ensure_ascii=False)
- # 日志表请求参数字段长度最大为2000,因此在此处判断长度
- if len(payload) > 2000:
- payload = "请求参数过长"
- response_data = (
- response.body
- if "application/json" in response.headers.get("Content-Type", "")
- else b"{}"
- )
- process_time = f"{(time.time() - start_time):.2f}s"
- # 获取当前用户ID,如果是登录接口则为空
- log_type = 1 # 1:登录日志 2:操作日志
- current_user_id = None
- # 优化:只在操作日志场景下获取current_user_id
- if "user_id" in request.scope:
- current_user_id = request.scope.get("user_id")
- log_type = 2
- request_ip = None
- x_forwarded_for = request.headers.get("X-Forwarded-For")
- if x_forwarded_for:
- # 取第一个 IP 地址,通常为客户端真实 IP
- request_ip = x_forwarded_for.split(",")[0].strip()
- else:
- # 若没有 X-Forwarded-For 头,则使用 request.client.host
- if request.client:
- request_ip = request.client.host
- login_location = await IpLocalUtil.resolve_location_for_log(request_ip)
- # 判断请求是否来自api文档
- referer = request.headers.get("referer")
- request_from_swagger = referer and referer.endswith("docs")
- request_from_redoc = referer and referer.endswith("redoc")
- if request_from_swagger or request_from_redoc:
- # 如果请求来自api文档,则不记录日志
- pass
- else:
- async with async_db_session() as session:
- async with session.begin():
- auth = AuthSchema(db=session)
- await OperationLogService.create_log_service(
- data=OperationLogCreateSchema(
- type=log_type,
- request_path=request.url.path,
- request_method=request.method,
- request_payload=payload,
- request_ip=request_ip,
- login_location=login_location,
- request_os=user_agent.os.family,
- request_browser=user_agent.browser.family,
- response_code=response.status_code,
- response_json=response_data.decode()
- if isinstance(response_data, (bytes, bytearray))
- else str(response_data),
- process_time=process_time,
- description=route.summary,
- created_id=current_user_id,
- updated_id=current_user_id,
- ),
- auth=auth,
- )
- return response
- return custom_route_handler
|