"""Quota management API routes, registered on ``QuotaRouter`` under ``/quota``.

Each controller is a thin HTTP adapter: it receives the validated request,
delegates to ``QuotaService`` / ``OutsourceNotifyService``, logs the outcome
and wraps the service result in a ``SuccessResponse``.
"""

from typing import Annotated

from fastapi import APIRouter, Depends, Path, Query
from fastapi.responses import JSONResponse

from app.api.v1.module_system.auth.schema import AuthSchema
from app.common.response import ResponseSchema, SuccessResponse
from app.core.dependencies import AuthPermission
from app.core.logger import log
from app.core.router_class import OperationLogRoute

from .schema import (
    ExpenseQuotaCreateSchema,
    ExpenseQuotaDeleteSchema,
    ExpenseQuotaModifySchema,
    ExpenseQuotaQuerySchema,
    IssueBatchCancelOutSchema,
    IssueBatchCancelSchema,
    IssueBatchCreateOutSchema,
    IssueBatchCreateSchema,
    IssueBatchListOutSchema,
    IssueBatchRecordsQueryOutSchema,
    IssueBatchRecordsQuerySchema,
    QuotaCreateSchema,
    QuotaListOutSchema,
    QuotaOperationOutSchema,
    QuotaOutSchema,
    QuotaUpdateSchema,
)
from .service import QuotaService
from .outsource_schema import OutsourceNotifySchema, OutsourceNotifyOutSchema
from .outsource_service import OutsourceNotifyService

QuotaRouter = APIRouter(
    route_class=OperationLogRoute,
    prefix="/quota",
    tags=["额度管理"],
)


def _build_search(**filters: str | None) -> dict[str, str]:
    """Return only the filters that carry a non-empty value.

    ``None`` and empty strings are dropped so they are never forwarded to the
    service layer as search criteria. Keyword order is preserved.
    """
    return {name: value for name, value in filters.items() if value}


@QuotaRouter.post(
    "/expense/create",
    summary="创建余额/点券",
    description="创建余额或点券 (alipay.ebpp.invoice.expensecontrol.quota.create)",
    response_model=ResponseSchema[QuotaOperationOutSchema],
)
async def create_expense_quota_controller(
    data: ExpenseQuotaCreateSchema,
    auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:expense:quota:expense:create"]))],
) -> JSONResponse:
    """Create a balance/voucher quota.

    Delegates to ``QuotaService.create_expense_quota_service`` and logs the
    ``out_biz_no`` of the returned result.
    """
    result = await QuotaService.create_expense_quota_service(auth=auth, data=data)
    log.info(f"创建余额/点券成功: out_biz_no={result.out_biz_no}")
    return SuccessResponse(data=result, msg="创建余额/点券成功")


# NOTE(review): response_model reuses the *request* schema
# (ExpenseQuotaQuerySchema); no dedicated output schema is imported here.
# Confirm this matches what query_expense_quota_service actually returns.
@QuotaRouter.post(
    "/expense/query",
    summary="查询余额/点券",
    description="查询余额或点券 (alipay.ebpp.invoice.expensecontrol.quota.query)",
    response_model=ResponseSchema[ExpenseQuotaQuerySchema],
)
async def query_expense_quota_controller(
    data: ExpenseQuotaQuerySchema,
    auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:expense:quota:expense:query"]))],
) -> JSONResponse:
    """Query a balance/voucher quota via ``QuotaService.query_expense_quota_service``."""
    result = await QuotaService.query_expense_quota_service(auth=auth, data=data)
    # Plain literal: the message has no placeholders, so no f-string is needed.
    log.info("查询余额/点券成功")
    return SuccessResponse(data=result, msg="查询余额/点券成功")


@QuotaRouter.put(
    "/expense/{out_biz_no}",
    summary="修改余额/点券",
    description="修改余额或点券 (alipay.ebpp.invoice.expensecontrol.quota.modify)",
    response_model=ResponseSchema[QuotaOperationOutSchema],
)
async def modify_expense_quota_controller(
    out_biz_no: Annotated[str, Path(description="外部业务编号")],
    data: ExpenseQuotaModifySchema,
    auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:expense:quota:expense:modify"]))],
) -> JSONResponse:
    """Modify the balance/voucher quota identified by the ``out_biz_no`` path parameter.

    Delegates to ``QuotaService.modify_expense_quota_service``.
    """
    result = await QuotaService.modify_expense_quota_service(auth=auth, out_biz_no=out_biz_no, data=data)
    log.info(f"修改余额/点券成功: {out_biz_no}")
    return SuccessResponse(data=result, msg="修改余额/点券成功")


# NOTE(review): this DELETE route takes a request body (``data``); confirm
# that all clients and intermediaries in use forward bodies on DELETE.
@QuotaRouter.delete(
    "/expense/{out_biz_no}",
    summary="删除额度",
    description="删除额度 (alipay.ebpp.invoice.expensecontrol.quota.delete)",
    response_model=ResponseSchema[QuotaOperationOutSchema],
)
async def delete_expense_quota_controller(
    out_biz_no: Annotated[str, Path(description="外部业务编号")],
    data: ExpenseQuotaDeleteSchema,
    auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:expense:quota:expense:delete"]))],
) -> JSONResponse:
    """Delete the quota identified by the ``out_biz_no`` path parameter.

    Delegates to ``QuotaService.delete_expense_quota_service``.
    """
    result = await QuotaService.delete_expense_quota_service(auth=auth, out_biz_no=out_biz_no, data=data)
    log.info(f"删除额度成功: {out_biz_no}")
    return SuccessResponse(data=result, msg="删除额度成功")


@QuotaRouter.post(
    "",
    summary="创建额度",
    description="创建额度",
    response_model=ResponseSchema[QuotaOperationOutSchema],
)
async def create_quota_controller(
    data: QuotaCreateSchema,
    auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:expense:quota:create"]))],
) -> JSONResponse:
    """Create a quota via ``QuotaService.create_quota_service`` and log its ``out_biz_no``."""
    result = await QuotaService.create_quota_service(auth=auth, data=data)
    log.info(f"创建额度成功: out_biz_no={result.out_biz_no}")
    return SuccessResponse(data=result, msg="创建额度成功")


@QuotaRouter.get(
    "",
    summary="查询额度列表",
    description="分页查询额度列表",
    response_model=ResponseSchema[QuotaListOutSchema],
)
async def list_quota_controller(
    auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:expense:quota:list"]))],
    page_no: Annotated[int, Query(description="页码")] = 1,
    page_size: Annotated[int, Query(description="每页数量")] = 20,
    institution_id: Annotated[str | None, Query(description="制度ID")] = None,
    employee_id: Annotated[str | None, Query(description="员工ID")] = None,
) -> JSONResponse:
    """Return one page of quotas, optionally filtered.

    ``institution_id`` and ``employee_id`` are forwarded to
    ``QuotaService.list_service`` as search criteria only when non-empty.
    """
    search = _build_search(institution_id=institution_id, employee_id=employee_id)
    result = await QuotaService.list_service(
        auth=auth, page_no=page_no, page_size=page_size, search=search
    )
    return SuccessResponse(data=result, msg="查询额度列表成功")


# ========================
# Manual batch quota issuance
# ========================


# NOTE(review): response_model added for consistency with the other routes,
# using the otherwise-unused IssueBatchListOutSchema import; confirm it
# describes the payload returned by list_batch_service.
@QuotaRouter.get(
    "/issuebatch/list",
    summary="查询手工发放批次列表",
    description="分页查询手工发放批次列表",
    response_model=ResponseSchema[IssueBatchListOutSchema],
)
async def list_issue_batch_controller(
    auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:expense:quota:issuebatch:list"]))],
    page_no: Annotated[int, Query(description="页码")] = 1,
    page_size: Annotated[int, Query(description="每页数量")] = 20,
    institution_id: Annotated[str | None, Query(description="制度ID")] = None,
) -> JSONResponse:
    """Return one page of manual issue batches.

    ``institution_id`` is forwarded to ``QuotaService.list_batch_service`` as a
    search criterion only when non-empty.
    """
    search = _build_search(institution_id=institution_id)
    result = await QuotaService.list_batch_service(
        auth=auth, page_no=page_no, page_size=page_size, search=search
    )
    return SuccessResponse(data=result, msg="查询批次列表成功")


@QuotaRouter.post(
    "/issuebatch/create",
    summary="手工批量发放额度",
    description="批量对企业下的员工进行额度发放 (alipay.ebpp.invoice.expensecontrol.issuebatch.create)",
    response_model=ResponseSchema[IssueBatchCreateOutSchema],
)
async def issue_batch_create_controller(
    data: IssueBatchCreateSchema,
    auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:expense:quota:issuebatch:create"]))],
) -> JSONResponse:
    """Issue quotas in a manual batch via ``QuotaService.issue_batch_create_service``.

    Logs the request's ``batch_no`` together with the ``issue_batch_id`` of the
    returned result.
    """
    result = await QuotaService.issue_batch_create_service(auth=auth, data=data)
    log.info(f"手工批量发放额度成功: batch_no={data.batch_no}, issue_batch_id={result.issue_batch_id}")
    return SuccessResponse(data=result, msg="手工批量发放额度成功")


@QuotaRouter.post(
    "/issuebatch/cancel",
    summary="作废手工发放批次",
    description="作废当前批次下发放的额度 (alipay.ebpp.invoice.expensecontrol.issuebatch.cancel)",
    response_model=ResponseSchema[IssueBatchCancelOutSchema],
)
async def issue_batch_cancel_controller(
    data: IssueBatchCancelSchema,
    auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:expense:quota:issuebatch:cancel"]))],
) -> JSONResponse:
    """Cancel a manual issue batch via ``QuotaService.issue_batch_cancel_service``."""
    result = await QuotaService.issue_batch_cancel_service(auth=auth, data=data)
    log.info(f"作废手工发放批次成功: issue_batch_id={data.issue_batch_id}")
    return SuccessResponse(data=result, msg="作废手工发放批次成功")


@QuotaRouter.post(
    "/issuebatch/records",
    summary="查询手工发放发放明细",
    description="根据批次号分页查询手工发放的发放明细 (alipay.ebpp.invoice.issuebatch.issuerecords.batchquery)",
    response_model=ResponseSchema[IssueBatchRecordsQueryOutSchema],
)
async def issue_batch_records_query_controller(
    data: IssueBatchRecordsQuerySchema,
    auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:expense:quota:issuebatch:records"]))],
) -> JSONResponse:
    """Query the issue records of a manual batch.

    Delegates to ``QuotaService.issue_batch_records_query_service``.
    """
    result = await QuotaService.issue_batch_records_query_service(auth=auth, data=data)
    log.info(f"查询手工发放发放明细成功: issue_batch_id={data.issue_batch_id}")
    return SuccessResponse(data=result, msg="查询手工发放发放明细成功")


@QuotaRouter.post(
    "/outsource/notify",
    summary="外部消费额度同步",
    description="将外部消费同步到支付宝额度系统 (alipay.ebpp.invoice.expensecomsue.outsource.notify)",
    response_model=ResponseSchema[OutsourceNotifyOutSchema],
)
async def outsource_notify_controller(
    data: OutsourceNotifySchema,
    auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:expense:quota:outsource:notify"]))],
) -> JSONResponse:
    """Synchronise an external consumption record via ``OutsourceNotifyService.notify_service``.

    A ``SuccessResponse`` is returned whether or not the sync succeeded; only
    the message text changes with ``result.success``.
    """
    result = await OutsourceNotifyService.notify_service(auth=auth, data=data)
    log.info(f"外部消费额度同步: out_source_id={result.out_source_id}, success={result.success}")
    return SuccessResponse(data=result, msg="外部消费额度同步成功" if result.success else "外部消费额度同步失败")