controller.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. from typing import Annotated
  2. from fastapi import APIRouter, Depends, Path
  3. from fastapi.responses import JSONResponse
  4. from app.api.v1.module_system.auth.schema import AuthSchema
  5. from app.core.router_class import OperationLogRoute
  6. from app.plugin.module_payment.apikey.schema import (
  7. TenantApiKeyCreate,
  8. TenantApiKeyResponse,
  9. TenantApiKeyListResponse,
  10. TenantApiKeyUpdate,
  11. ApiKeyQueryParam,
  12. )
  13. from app.plugin.module_payment.apikey.service import TenantApiKeyService
  14. from app.common.response import ResponseSchema, SuccessResponse, ErrorResponse
  15. from app.core.base_params import PaginationQueryParam
  16. from app.core.dependencies import AuthPermission
  17. from app.core.logger import log
  18. ApiKeyRouter = APIRouter(route_class=OperationLogRoute, prefix="/api-key", tags=["API Key管理"])
  19. @ApiKeyRouter.post(
  20. "",
  21. summary="创建API Key",
  22. description="为租户创建新的API Key",
  23. response_model=ResponseSchema[TenantApiKeyResponse],
  24. )
  25. async def create_api_key_controller(
  26. data: TenantApiKeyCreate,
  27. auth: Annotated[AuthSchema, Depends(AuthPermission(["module_system:tenant:api-key:create"]))],
  28. ) -> JSONResponse:
  29. tenant_id = auth.tenant_id
  30. api_key_obj = await TenantApiKeyService.create_api_key(
  31. auth=auth,
  32. tenant_id=tenant_id,
  33. expired_days=data.expired_days,
  34. description=data.description,
  35. return_url=data.return_url,
  36. )
  37. log.info(f"创建API Key成功: 租户ID={tenant_id}")
  38. return SuccessResponse(
  39. data=TenantApiKeyResponse(
  40. id=api_key_obj.id,
  41. api_key=api_key_obj.api_key,
  42. api_secret=api_key_obj.api_secret,
  43. status=api_key_obj.status,
  44. expired_at=api_key_obj.expired_at,
  45. created_time=api_key_obj.created_time,
  46. return_url=api_key_obj.return_url,
  47. ),
  48. msg="创建APIKey成功"
  49. )
  50. @ApiKeyRouter.get(
  51. "/list",
  52. summary="查询API Key",
  53. description="查询API Key",
  54. response_model=ResponseSchema[list[TenantApiKeyListResponse]],
  55. )
  56. async def get_api_key_list_controller(
  57. page: Annotated[PaginationQueryParam, Depends()],
  58. search: Annotated[ApiKeyQueryParam, Depends()],
  59. auth: Annotated[AuthSchema, Depends(AuthPermission(["module_system:tenant:api-key:query"]))],
  60. ) -> JSONResponse:
  61. result_dict = await TenantApiKeyService.get_api_key_page_service(
  62. auth=auth,
  63. page_no=page.page_no,
  64. page_size=page.page_size,
  65. search=search
  66. )
  67. return SuccessResponse(data=result_dict)
  68. @ApiKeyRouter.put(
  69. "/{api_key_id}",
  70. summary="更新API Key状态",
  71. description="更新API Key的状态",
  72. response_model=ResponseSchema[TenantApiKeyListResponse],
  73. )
  74. async def update_api_key_status_controller(
  75. api_key_id: Annotated[int, Path(description="API Key ID")],
  76. data: TenantApiKeyUpdate,
  77. auth: Annotated[AuthSchema, Depends(AuthPermission(["module_system:tenant:api-key:update"]))],
  78. ) -> JSONResponse:
  79. return ErrorResponse(msg="暂不支持操作")
  80. @ApiKeyRouter.delete(
  81. "/{api_key_id}",
  82. summary="删除API Key",
  83. description="删除API Key",
  84. response_model=ResponseSchema[dict],
  85. )
  86. async def delete_api_key_controller(
  87. api_key_id: Annotated[int, Path(description="API Key ID")],
  88. auth: Annotated[AuthSchema, Depends(AuthPermission(["module_system:tenant:api-key:delete"]))],
  89. ) -> JSONResponse:
  90. await TenantApiKeyService.delete_api_key(auth=auth, api_key_id=api_key_id)
  91. log.info(f"删除API Key成功: ID={api_key_id}")
  92. return SuccessResponse(msg="删除APIKey成功")