"""HTTP routes for managing tenant API keys (create, list, update, delete)."""

from typing import Annotated

from fastapi import APIRouter, Depends, Path
from fastapi.responses import JSONResponse

from app.api.v1.module_system.auth.schema import AuthSchema
from app.core.router_class import OperationLogRoute
from app.plugin.module_payment.apikey.schema import (
    TenantApiKeyCreate,
    TenantApiKeyResponse,
    TenantApiKeyListResponse,
    TenantApiKeyUpdate,
    ApiKeyQueryParam,
)
from app.plugin.module_payment.apikey.service import TenantApiKeyService
from app.common.response import ResponseSchema, SuccessResponse, ErrorResponse
from app.core.base_params import PaginationQueryParam
from app.core.dependencies import AuthPermission
from app.core.logger import log

# Every route below is mounted under "/api-key" and uses OperationLogRoute
# as its route class.
ApiKeyRouter = APIRouter(
    route_class=OperationLogRoute,
    prefix="/api-key",
    tags=["API Key管理"],
)


@ApiKeyRouter.post(
    "",
    summary="创建API Key",
    description="为租户创建新的API Key",
    response_model=ResponseSchema[TenantApiKeyResponse],
)
async def create_api_key_controller(
    data: TenantApiKeyCreate,
    auth: Annotated[
        AuthSchema,
        Depends(AuthPermission(["module_system:tenant:api-key:create"])),
    ],
) -> JSONResponse:
    """Create an API key for the tenant carried by ``auth``.

    Args:
        data: Request body; ``expired_days``, ``description`` and
            ``return_url`` are forwarded to the service.
        auth: Auth context resolved through the ``AuthPermission``
            dependency declared with ``module_system:tenant:api-key:create``.

    Returns:
        A ``SuccessResponse`` wrapping a ``TenantApiKeyResponse`` built from
        the object returned by ``TenantApiKeyService.create_api_key``. The
        payload includes both ``api_key`` and ``api_secret``.
    """
    # The tenant is taken from the auth context, not from the request body.
    owner_tenant_id = auth.tenant_id

    created_key = await TenantApiKeyService.create_api_key(
        auth=auth,
        tenant_id=owner_tenant_id,
        expired_days=data.expired_days,
        description=data.description,
        return_url=data.return_url,
    )
    log.info(f"创建API Key成功: 租户ID={owner_tenant_id}")

    payload = TenantApiKeyResponse(
        id=created_key.id,
        api_key=created_key.api_key,
        api_secret=created_key.api_secret,
        status=created_key.status,
        expired_at=created_key.expired_at,
        created_time=created_key.created_time,
        return_url=created_key.return_url,
    )
    return SuccessResponse(data=payload, msg="创建APIKey成功")


@ApiKeyRouter.get(
    "/list",
    summary="查询API Key",
    description="查询API Key",
    response_model=ResponseSchema[list[TenantApiKeyListResponse]],
)
async def get_api_key_list_controller(
    page: Annotated[PaginationQueryParam, Depends()],
    search: Annotated[ApiKeyQueryParam, Depends()],
    auth: Annotated[
        AuthSchema,
        Depends(AuthPermission(["module_system:tenant:api-key:query"])),
    ],
) -> JSONResponse:
    """Return one page of API keys matching the search parameters.

    Args:
        page: Pagination parameters; ``page_no`` and ``page_size`` are used.
        search: Filter parameters passed through to the service unchanged.
        auth: Auth context resolved through the ``AuthPermission``
            dependency declared with ``module_system:tenant:api-key:query``.

    Returns:
        A ``SuccessResponse`` wrapping whatever
        ``TenantApiKeyService.get_api_key_page_service`` returns.
    """
    # NOTE(review): response_model declares a plain list, while the service
    # name suggests a paginated result — confirm the documented schema
    # matches the real payload.
    page_result = await TenantApiKeyService.get_api_key_page_service(
        auth=auth,
        page_no=page.page_no,
        page_size=page.page_size,
        search=search,
    )
    return SuccessResponse(data=page_result)


@ApiKeyRouter.put(
    "/{api_key_id}",
    summary="更新API Key状态",
    description="更新API Key的状态",
    response_model=ResponseSchema[TenantApiKeyListResponse],
)
async def update_api_key_status_controller(
    api_key_id: Annotated[int, Path(description="API Key ID")],
    data: TenantApiKeyUpdate,
    auth: Annotated[
        AuthSchema,
        Depends(AuthPermission(["module_system:tenant:api-key:update"])),
    ],
) -> JSONResponse:
    """Placeholder for updating an API key's status.

    The parameters are still declared (so the path parameter, request body
    and permission dependency are part of the route), but none of them is
    read in the function body.

    Returns:
        An ``ErrorResponse`` with a fixed message.
    """
    # NOTE(review): no update is performed here; presumably a stub until the
    # feature is implemented — confirm.
    unsupported = ErrorResponse(msg="暂不支持操作")
    return unsupported


@ApiKeyRouter.delete(
    "/{api_key_id}",
    summary="删除API Key",
    description="删除API Key",
    response_model=ResponseSchema[dict],
)
async def delete_api_key_controller(
    api_key_id: Annotated[int, Path(description="API Key ID")],
    auth: Annotated[
        AuthSchema,
        Depends(AuthPermission(["module_system:tenant:api-key:delete"])),
    ],
) -> JSONResponse:
    """Delete the API key identified by ``api_key_id``.

    Args:
        api_key_id: Path parameter naming the key to delete.
        auth: Auth context resolved through the ``AuthPermission``
            dependency declared with ``module_system:tenant:api-key:delete``.

    Returns:
        A ``SuccessResponse`` carrying only a message once
        ``TenantApiKeyService.delete_api_key`` has completed.
    """
    await TenantApiKeyService.delete_api_key(
        auth=auth,
        api_key_id=api_key_id,
    )
    log.info(f"删除API Key成功: ID={api_key_id}")

    return SuccessResponse(msg="删除APIKey成功")