security.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. import jwt
  2. from fastapi import Form, Request
  3. from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
  4. from fastapi.security.utils import get_authorization_scheme_param
  5. from app.api.v1.module_system.auth.schema import JWTPayloadSchema
  6. from app.config.setting import settings
  7. from app.core.exceptions import CustomException
  8. class CustomOAuth2PasswordBearer(OAuth2PasswordBearer):
  9. """自定义OAuth2认证类,继承自OAuth2PasswordBearer"""
  10. def __init__(
  11. self,
  12. token_url: str,
  13. scheme_name: str | None = None,
  14. scopes: dict[str, str] | None = None,
  15. description: str | None = None,
  16. auto_error: bool = True,
  17. ) -> None:
  18. super().__init__(
  19. tokenUrl=token_url,
  20. scheme_name=scheme_name,
  21. scopes=scopes,
  22. description=description,
  23. auto_error=auto_error,
  24. )
  25. async def __call__(self, request: Request) -> str | None:
  26. """
  27. 重写认证方法,校验token
  28. 参数:
  29. - request (Request): FastAPI请求对象。
  30. 返回:
  31. - str | None: 校验通过的token,如果校验失败则返回None。
  32. 异常:
  33. - CustomException: 认证失败时抛出,状态码为401。
  34. """
  35. authorization = request.headers.get("Authorization")
  36. scheme, token = get_authorization_scheme_param(authorization)
  37. if not authorization or scheme.lower() != settings.TOKEN_TYPE:
  38. if self.auto_error:
  39. raise CustomException(msg="认证失败,请登录后再试", code=10401, status_code=401)
  40. return None
  41. return token
  42. class CustomOAuth2PasswordRequestForm(OAuth2PasswordRequestForm):
  43. """
  44. 自定义登录表单,扩展验证码等字段
  45. 参数:
  46. - grant_type (str | None): 授权类型,默认值为None,正则表达式为'password'。
  47. - scope (str): 作用域,默认值为空字符串。
  48. - client_id (str | None): 客户端ID,默认值为None。
  49. - client_secret (str | None): 客户端密钥,默认值为None。
  50. - username (str): 用户名。
  51. - password (str): 密码。
  52. - captcha_key (str | None): 验证码键,默认值为空字符串。
  53. - captcha (str | None): 验证码值,默认值为空字符串。
  54. - login_type (str | None): 登录类型,默认值为"PC端",描述为"PC端 | 移动端"。
  55. """
  56. def __init__(
  57. self,
  58. grant_type: str | None = Form(default=None, pattern="password"),
  59. scope: str = Form(default=""),
  60. client_id: str | None = Form(default=None),
  61. client_secret: str | None = Form(default=None),
  62. username: str = Form(),
  63. password: str = Form(),
  64. captcha_key: str | None = Form(default=""),
  65. captcha: str | None = Form(default=""),
  66. login_type: str | None = Form(default="PC端", description="PC端 | 移动端"),
  67. ) -> None:
  68. super().__init__(
  69. grant_type=grant_type,
  70. scope=scope,
  71. client_id=client_id,
  72. client_secret=client_secret,
  73. username=username,
  74. password=password,
  75. )
  76. self.captcha_key = captcha_key
  77. self.captcha = captcha
  78. self.login_type = login_type
  79. # OAuth2认证配置
  80. OAuth2Schema = CustomOAuth2PasswordBearer(token_url="system/auth/login", description="认证")
  81. def create_access_token(payload: JWTPayloadSchema) -> str:
  82. """
  83. 生成JWT访问令牌
  84. 参数:
  85. - payload (JWTPayloadSchema): JWT有效载荷,包含用户信息等。
  86. 返回:
  87. - str: 生成的JWT访问令牌。
  88. """
  89. payload_dict = payload.model_dump()
  90. return jwt.encode(
  91. payload=payload_dict,
  92. key=settings.SECRET_KEY,
  93. algorithm=settings.ALGORITHM,
  94. )
  95. def decode_access_token(token: str) -> JWTPayloadSchema:
  96. """
  97. 解析JWT访问令牌
  98. 参数:
  99. - token (str): JWT访问令牌字符串。
  100. 返回:
  101. - JWTPayloadSchema: 解析后的JWT有效载荷,包含用户信息等。
  102. 异常:
  103. - CustomException: 解析失败时抛出,状态码为401。
  104. """
  105. if not token:
  106. raise CustomException(msg="认证不存在,请重新登录", code=10401, status_code=401)
  107. try:
  108. payload = jwt.decode(jwt=token, key=settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
  109. online_user_info = payload.get("sub")
  110. if not online_user_info:
  111. raise CustomException(msg="无效认证,请重新登录", code=10401, status_code=401)
  112. return JWTPayloadSchema(**payload)
  113. except (jwt.InvalidSignatureError, jwt.DecodeError):
  114. raise CustomException(msg="无效认证,请重新登录", code=10401, status_code=401)
  115. except jwt.ExpiredSignatureError:
  116. raise CustomException(msg="认证已过期,请重新登录", code=10401, status_code=401)
  117. except jwt.InvalidTokenError:
  118. raise CustomException(msg="token已失效,请重新登录", code=10401, status_code=401)