| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- from typing import Annotated
- from fastapi import APIRouter, Depends, Path, Query
- from fastapi.responses import JSONResponse
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.common.response import ResponseSchema, SuccessResponse
- from app.core.dependencies import AuthPermission
- from app.core.logger import log
- from app.core.router_class import OperationLogRoute
- from .schema import (
- ExpenseRuleCreateSchema,
- ExpenseRuleDeleteSchema,
- ExpenseRuleModifySchema,
- RuleCreateSchema,
- RuleListOutSchema,
- RuleOperationOutSchema,
- RuleOutSchema,
- RuleUpdateSchema,
- )
- from .service import RuleService
# Router for the usage-rule endpoints. Every route registered on it is served
# under the "/rule" prefix and is built with the OperationLogRoute route class.
RuleRouter = APIRouter(
    prefix="/rule",
    tags=["使用规则"],
    route_class=OperationLogRoute,
)
@RuleRouter.post(
    "/expense/create",
    response_model=ResponseSchema[RuleOperationOutSchema],
    summary="创建费控使用规则",
    description="创建费控使用规则 (alipay.ebpp.invoice.institution.expenserule.create)",
)
async def create_expense_rule_controller(
    data: ExpenseRuleCreateSchema,
    auth: Annotated[
        AuthSchema,
        Depends(AuthPermission(["module_payment:expense:rule:expense:create"])),
    ],
) -> JSONResponse:
    """Create an expense-control usage rule.

    Args:
        data: Request body describing the expense rule to create.
        auth: Auth context resolved through ``AuthPermission`` with the
            ``module_payment:expense:rule:expense:create`` permission code.

    Returns:
        A ``SuccessResponse`` wrapping the value returned by
        ``RuleService.create_expense_rule_service``.
    """
    result = await RuleService.create_expense_rule_service(
        auth=auth,
        data=data,
    )
    # Log the business number reported by the service for traceability.
    log.info(f"创建费控使用规则成功: out_biz_no={result.out_biz_no}")
    response = SuccessResponse(data=result, msg="创建费控使用规则成功")
    return response
@RuleRouter.post(
    "",
    response_model=ResponseSchema[RuleOperationOutSchema],
    summary="创建使用规则",
    description="创建使用规则",
)
async def create_rule_controller(
    data: RuleCreateSchema,
    auth: Annotated[
        AuthSchema,
        Depends(AuthPermission(["module_payment:expense:rule:create"])),
    ],
) -> JSONResponse:
    """Create a usage rule.

    Args:
        data: Request body describing the rule to create.
        auth: Auth context resolved through ``AuthPermission`` with the
            ``module_payment:expense:rule:create`` permission code.

    Returns:
        A ``SuccessResponse`` wrapping the value returned by
        ``RuleService.create_rule_service``.
    """
    result = await RuleService.create_rule_service(
        auth=auth,
        data=data,
    )
    # Log the business number reported by the service for traceability.
    log.info(f"创建使用规则成功: out_biz_no={result.out_biz_no}")
    response = SuccessResponse(data=result, msg="创建使用规则成功")
    return response
@RuleRouter.put(
    "/expense/{out_biz_no}",
    response_model=ResponseSchema[RuleOperationOutSchema],
    summary="编辑使用规则",
    description="编辑使用规则 (alipay.ebpp.invoice.institution.expenserule.modify)",
)
async def modify_expense_rule_controller(
    out_biz_no: Annotated[str, Path(description="外部业务编号")],
    data: ExpenseRuleModifySchema,
    auth: Annotated[
        AuthSchema,
        Depends(AuthPermission(["module_payment:expense:rule:expense:modify"])),
    ],
) -> JSONResponse:
    """Modify the expense rule identified by ``out_biz_no``.

    Args:
        out_biz_no: External business number taken from the URL path.
        data: Request body carrying the modification.
        auth: Auth context resolved through ``AuthPermission`` with the
            ``module_payment:expense:rule:expense:modify`` permission code.

    Returns:
        A ``SuccessResponse`` wrapping the value returned by
        ``RuleService.modify_expense_rule_service``.
    """
    modified = await RuleService.modify_expense_rule_service(
        auth=auth,
        out_biz_no=out_biz_no,
        data=data,
    )
    log.info(f"编辑使用规则成功: {out_biz_no}")
    return SuccessResponse(data=modified, msg="编辑使用规则成功")
@RuleRouter.delete(
    "/expense/{out_biz_no}",
    response_model=ResponseSchema[RuleOperationOutSchema],
    summary="删除使用规则",
    description="删除使用规则 (alipay.ebpp.invoice.institution.expenserule.delete)",
)
async def delete_expense_rule_controller(
    out_biz_no: Annotated[str, Path(description="外部业务编号")],
    data: ExpenseRuleDeleteSchema,
    auth: Annotated[
        AuthSchema,
        Depends(AuthPermission(["module_payment:expense:rule:expense:delete"])),
    ],
) -> JSONResponse:
    """Delete the expense rule identified by ``out_biz_no``.

    Args:
        out_biz_no: External business number taken from the URL path.
        data: Request body forwarded unchanged to the service.
        auth: Auth context resolved through ``AuthPermission`` with the
            ``module_payment:expense:rule:expense:delete`` permission code.

    Returns:
        A ``SuccessResponse`` wrapping the value returned by
        ``RuleService.delete_expense_rule_service``.
    """
    # NOTE(review): this DELETE route expects a request body; some HTTP
    # clients and proxies drop bodies on DELETE -- confirm callers send it.
    deleted = await RuleService.delete_expense_rule_service(
        auth=auth,
        out_biz_no=out_biz_no,
        data=data,
    )
    log.info(f"删除使用规则成功: {out_biz_no}")
    return SuccessResponse(data=deleted, msg="删除使用规则成功")
@RuleRouter.get(
    "",
    summary="查询使用规则列表",
    description="分页查询使用规则列表",
    response_model=ResponseSchema[RuleListOutSchema],
)
async def list_rule_controller(
    auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:expense:rule:list"]))],
    page_no: Annotated[int, Query(ge=1, description="页码")] = 1,
    page_size: Annotated[int, Query(ge=1, description="每页数量")] = 20,
    institution_id: Annotated[str | None, Query(description="制度ID")] = None,
) -> JSONResponse:
    """Return one page of usage rules, optionally filtered by institution.

    Args:
        auth: Auth context resolved through ``AuthPermission`` with the
            ``module_payment:expense:rule:list`` permission code.
        page_no: Page number, defaults to 1. Values below 1 are rejected by
            request validation before the handler runs.
        page_size: Items per page, defaults to 20. Values below 1 are
            rejected by request validation before the handler runs.
        institution_id: Optional institution ID filter. ``None`` and the
            empty string both mean "no filter".

    Returns:
        A ``SuccessResponse`` wrapping the value returned by
        ``RuleService.list_service``.
    """
    # Only pass the filter key when a non-empty institution ID was supplied.
    search: dict[str, str] = {"institution_id": institution_id} if institution_id else {}
    result = await RuleService.list_service(
        auth=auth,
        page_no=page_no,
        page_size=page_size,
        search=search,
    )
    return SuccessResponse(data=result, msg="查询使用规则列表成功")
@RuleRouter.get(
    "/{out_biz_no}",
    summary="查询使用规则详情",
    # Explicit description, matching the sibling routes; without it FastAPI
    # would use this handler's docstring as the OpenAPI description.
    description="查询使用规则详情",
    response_model=ResponseSchema[RuleOutSchema],
)
async def get_detail_controller(
    out_biz_no: Annotated[str, Path(description="外部业务编号")],
    auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:expense:rule:detail"]))],
) -> JSONResponse:
    """Return the detail of the usage rule identified by ``out_biz_no``.

    Args:
        out_biz_no: External business number taken from the URL path.
        auth: Auth context resolved through ``AuthPermission`` with the
            ``module_payment:expense:rule:detail`` permission code.

    Returns:
        A ``SuccessResponse`` wrapping the value returned by
        ``RuleService.detail_service``.
    """
    result = await RuleService.detail_service(auth=auth, out_biz_no=out_biz_no)
    return SuccessResponse(data=result, msg="查询使用规则详情成功")
@RuleRouter.put(
    "/{out_biz_no}",
    summary="更新使用规则",
    # Explicit description, matching the sibling routes; without it FastAPI
    # would use this handler's docstring as the OpenAPI description.
    description="更新使用规则",
    response_model=ResponseSchema[RuleOperationOutSchema],
)
async def update_controller(
    out_biz_no: Annotated[str, Path(description="外部业务编号")],
    data: RuleUpdateSchema,
    auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:expense:rule:update"]))],
) -> JSONResponse:
    """Update the usage rule identified by ``out_biz_no``.

    Args:
        out_biz_no: External business number taken from the URL path.
        data: Request body carrying the update.
        auth: Auth context resolved through ``AuthPermission`` with the
            ``module_payment:expense:rule:update`` permission code.

    Returns:
        A ``SuccessResponse`` wrapping the value returned by
        ``RuleService.update_service``.
    """
    result = await RuleService.update_service(
        auth=auth,
        out_biz_no=out_biz_no,
        data=data,
    )
    log.info(f"更新使用规则成功: {out_biz_no}")
    return SuccessResponse(data=result, msg="更新使用规则成功")
|