| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- from typing import Annotated
- from fastapi import APIRouter, Depends, Path
- from fastapi.responses import JSONResponse
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.core.router_class import OperationLogRoute
- from app.plugin.module_payment.apikey.schema import (
- TenantApiKeyCreate,
- TenantApiKeyResponse,
- TenantApiKeyListResponse,
- TenantApiKeyUpdate,
- ApiKeyQueryParam,
- )
- from app.plugin.module_payment.apikey.service import TenantApiKeyService
- from app.common.response import ResponseSchema, SuccessResponse, ErrorResponse
- from app.core.base_params import PaginationQueryParam
- from app.core.dependencies import AuthPermission
- from app.core.logger import log
- ApiKeyRouter = APIRouter(route_class=OperationLogRoute, prefix="/api-key", tags=["API Key管理"])
- @ApiKeyRouter.post(
- "",
- summary="创建API Key",
- description="为租户创建新的API Key",
- response_model=ResponseSchema[TenantApiKeyResponse],
- )
- async def create_api_key_controller(
- data: TenantApiKeyCreate,
- auth: Annotated[AuthSchema, Depends(AuthPermission(["module_system:tenant:api-key:create"]))],
- ) -> JSONResponse:
- tenant_id = auth.tenant_id
- api_key_obj = await TenantApiKeyService.create_api_key(
- auth=auth,
- tenant_id=tenant_id,
- expired_days=data.expired_days,
- description=data.description,
- return_url=data.return_url,
- )
- log.info(f"创建API Key成功: 租户ID={tenant_id}")
- return SuccessResponse(
- data=TenantApiKeyResponse(
- id=api_key_obj.id,
- api_key=api_key_obj.api_key,
- api_secret=api_key_obj.api_secret,
- status=api_key_obj.status,
- expired_at=api_key_obj.expired_at,
- created_time=api_key_obj.created_time,
- return_url=api_key_obj.return_url,
- ),
- msg="创建APIKey成功"
- )
- @ApiKeyRouter.get(
- "/list",
- summary="查询API Key",
- description="查询API Key",
- response_model=ResponseSchema[list[TenantApiKeyListResponse]],
- )
- async def get_api_key_list_controller(
- page: Annotated[PaginationQueryParam, Depends()],
- search: Annotated[ApiKeyQueryParam, Depends()],
- auth: Annotated[AuthSchema, Depends(AuthPermission(["module_system:tenant:api-key:query"]))],
- ) -> JSONResponse:
- result_dict = await TenantApiKeyService.get_api_key_page_service(
- auth=auth,
- page_no=page.page_no,
- page_size=page.page_size,
- search=search
- )
- return SuccessResponse(data=result_dict)
- @ApiKeyRouter.put(
- "/{api_key_id}",
- summary="更新API Key状态",
- description="更新API Key的状态",
- response_model=ResponseSchema[TenantApiKeyListResponse],
- )
- async def update_api_key_status_controller(
- api_key_id: Annotated[int, Path(description="API Key ID")],
- data: TenantApiKeyUpdate,
- auth: Annotated[AuthSchema, Depends(AuthPermission(["module_system:tenant:api-key:update"]))],
- ) -> JSONResponse:
- return ErrorResponse(msg="暂不支持操作")
- @ApiKeyRouter.delete(
- "/{api_key_id}",
- summary="删除API Key",
- description="删除API Key",
- response_model=ResponseSchema[dict],
- )
- async def delete_api_key_controller(
- api_key_id: Annotated[int, Path(description="API Key ID")],
- auth: Annotated[AuthSchema, Depends(AuthPermission(["module_system:tenant:api-key:delete"]))],
- ) -> JSONResponse:
- await TenantApiKeyService.delete_api_key(auth=auth, api_key_id=api_key_id)
- log.info(f"删除API Key成功: ID={api_key_id}")
- return SuccessResponse(msg="删除APIKey成功")
|