| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- from typing import Annotated
- from fastapi import APIRouter, Body, Depends, Path, Query
- from fastapi.responses import JSONResponse
- from redis.asyncio import Redis
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.common.response import ResponseSchema, SuccessResponse
- from app.core.dependencies import AuthPermission, redis_getter
- from app.core.logger import log
- from app.core.router_class import OperationLogRoute
- from .schema import (
- EnterpriseApplyInviteSchema,
- EnterpriseCreateOrUpdateSchema,
- EnterpriseInviteOutSchema,
- EnterpriseListOutSchema,
- EnterpriseOperationOutSchema,
- EnterpriseOutSchema,
- )
- from .service import EnterpriseService
- from alipay.aop.api.domain.EnterpriseInfoDTO import EnterpriseInfoDTO
# Router for the enterprise endpoints defined in this module. Every route
# registered on it is served under the "/enterprise" prefix, is tagged
# "企业管理" in the generated API docs, and uses OperationLogRoute as its
# route class.
EnterpriseRouter = APIRouter(
    prefix="/enterprise",
    tags=["企业管理"],
    route_class=OperationLogRoute,
)
# NOTE(review): the description below mentions a paginated query, but this
# endpoint accepts no paging parameters — confirm the intended wording.
@EnterpriseRouter.get(
    "/all",
    summary="查询企业类别",
    description="分页查询企业列表",
    response_model=ResponseSchema[EnterpriseListOutSchema],
)
async def all_enterprise_controller(
    auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:enterprise:list"]))],
) -> JSONResponse:
    """Return everything produced by ``EnterpriseService.all_service``.

    Args:
        auth: Auth context resolved by ``AuthPermission`` for the
            ``module_payment:enterprise:list`` permission.

    Returns:
        A ``SuccessResponse`` whose ``data`` is the service result; no custom
        ``msg`` is supplied.
    """
    enterprises = await EnterpriseService.all_service(auth=auth)
    return SuccessResponse(data=enterprises)
@EnterpriseRouter.get(
    "",
    summary="查询企业列表",
    description="分页查询企业列表",
    response_model=ResponseSchema[EnterpriseListOutSchema],
)
async def list_enterprise_controller(
    auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:enterprise:list"]))],
    page_no: Annotated[int, Query(description="页码")] = 1,
    page_size: Annotated[int, Query(description="每页数量")] = 20,
    name: Annotated[str | None, Query(description="企业名称")] = None,
    enterprise_id: Annotated[str | None, Query(description="企业ID")] = None,
    status: Annotated[str | None, Query(description="状态")] = None,
    start_time: Annotated[str | None, Query(description="开始时间")] = None,
    end_time: Annotated[str | None, Query(description="结束时间")] = None,
) -> JSONResponse:
    """Query a page of enterprises, optionally narrowed by search filters.

    Args:
        auth: Auth context resolved by ``AuthPermission`` for the
            ``module_payment:enterprise:list`` permission.
        page_no: Page number forwarded to the service (defaults to 1).
        page_size: Page size forwarded to the service (defaults to 20).
        name: Optional enterprise-name filter.
        enterprise_id: Optional enterprise-ID filter.
        status: Optional status filter.
        start_time: Optional start-time filter, passed through as a string.
        end_time: Optional end-time filter, passed through as a string.

    Returns:
        A ``SuccessResponse`` wrapping the result of
        ``EnterpriseService.list_service``.
    """
    candidate_filters = {
        "name": name,
        "enterprise_id": enterprise_id,
        "status": status,
        "start_time": start_time,
        "end_time": end_time,
    }
    # Only truthy filters are forwarded: ``None`` and empty strings are
    # dropped, so the service never sees a key for an unset filter.
    search = {field: value for field, value in candidate_filters.items() if value}

    page = await EnterpriseService.list_service(
        auth=auth,
        page_no=page_no,
        page_size=page_size,
        search=search,
    )
    return SuccessResponse(data=page, msg="查询企业列表成功")
@EnterpriseRouter.post(
    "/invite",
    summary="申请企业邀请码",
    description="申请企业邀请码(可多次调用)",
    response_model=ResponseSchema[EnterpriseInviteOutSchema],
)
async def apply_invite_controller(
    auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:enterprise:invite"]))],
    redis: Annotated[Redis, Depends(redis_getter)],
    data: EnterpriseApplyInviteSchema,
) -> JSONResponse:
    """Apply for an enterprise invite code.

    Args:
        auth: Auth context resolved by ``AuthPermission`` for the
            ``module_payment:enterprise:invite`` permission.
        redis: Redis client supplied by the ``redis_getter`` dependency and
            handed to the service.
        data: Request payload describing the invite application.

    Returns:
        A ``SuccessResponse`` wrapping the result of
        ``EnterpriseService.apply_invite_service``.
    """
    invite = await EnterpriseService.apply_invite_service(
        auth=auth,
        redis=redis,
        data=data,
    )
    # Logged only after the service call has returned, i.e. on the success path.
    log.info(f"申请企业邀请码成功: {data.identity_type} | {data.identity} | {data.identity_open_id}")
    return SuccessResponse(data=invite, msg="申请企业邀请码成功")
# NOTE(review): no response_model is declared for this route; a
# ResponseSchema[EnterpriseInfoDTO] declaration was previously commented out
# here — confirm whether it should be restored or removed for good.
@EnterpriseRouter.get(
    "/{enterprise_id}",
    summary="查询企业详情",
    description="根据 enterprise_id 查询企业详情",
)
async def get_detail_controller(
    enterprise_id: Annotated[str, Path(description="企业ID")],
    auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:enterprise:detail"]))],
) -> JSONResponse:
    """Look up the details of a single enterprise.

    Args:
        enterprise_id: Enterprise ID taken from the URL path.
        auth: Auth context resolved by ``AuthPermission`` for the
            ``module_payment:enterprise:detail`` permission.

    Returns:
        A ``SuccessResponse`` wrapping the result of
        ``EnterpriseService.detail_service``.
    """
    detail = await EnterpriseService.detail_service(
        auth=auth,
        enterprise_id=enterprise_id,
    )
    log.info(f"查询企业详情成功: {enterprise_id}")
    return SuccessResponse(data=detail, msg="查询企业详情成功")
@EnterpriseRouter.post(
    "/{enterprise_id}/unsign",
    summary="企业解约",
    description="企业解约",
    response_model=ResponseSchema[EnterpriseOperationOutSchema],
)
async def unsign_controller(
    enterprise_id: Annotated[str, Path(description="企业ID")],
    auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:enterprise:unsign"]))],
) -> JSONResponse:
    """Submit an unsign (contract termination) request for an enterprise.

    Args:
        enterprise_id: Enterprise ID taken from the URL path.
        auth: Auth context resolved by ``AuthPermission`` for the
            ``module_payment:enterprise:unsign`` permission.

    Returns:
        A ``SuccessResponse`` wrapping the result of
        ``EnterpriseService.enterprise_unsign_service``.
    """
    outcome = await EnterpriseService.enterprise_unsign_service(
        auth=auth,
        enterprise_id=enterprise_id,
    )
    log.info(f"企业解约申请已提交: {enterprise_id}")
    return SuccessResponse(data=outcome, msg="企业解约申请已提交")
@EnterpriseRouter.post(
    "/{enterprise_id}/delete",
    summary="企业注销",
    description="企业注销",
    response_model=ResponseSchema[EnterpriseOperationOutSchema],
)
async def delete_controller(
    enterprise_id: Annotated[str, Path(description="企业ID")],
    auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:enterprise:delete"]))],
) -> JSONResponse:
    """Submit a deregistration (delete) request for an enterprise.

    Args:
        enterprise_id: Enterprise ID taken from the URL path.
        auth: Auth context resolved by ``AuthPermission`` for the
            ``module_payment:enterprise:delete`` permission.

    Returns:
        A ``SuccessResponse`` wrapping the result of
        ``EnterpriseService.enterprise_delete_service``.
    """
    outcome = await EnterpriseService.enterprise_delete_service(
        auth=auth,
        enterprise_id=enterprise_id,
    )
    log.info(f"企业注销申请已提交: {enterprise_id}")
    return SuccessResponse(data=outcome, msg="企业注销申请已提交")
|