| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326 |
- from typing import Annotated
- import uuid
- from fastapi import APIRouter, Depends, Path, Query
- from fastapi.responses import JSONResponse
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.common.response import ResponseSchema, SuccessResponse
- from app.core.dependencies import AuthPermission
- from app.core.logger import log
- from app.core.router_class import OperationLogRoute
- from app.plugin.module_payment.expense.institution.schema import InstitutionListOutSchema
- from .service import InstitutionService, InstitutionScopeService, IssueruleService
- from alipay.aop.api.domain.AlipayEbppInvoiceInstitutionCreateModel import (
- AlipayEbppInvoiceInstitutionCreateModel,
- )
- from alipay.aop.api.response.AlipayEbppInvoiceInstitutionCreateResponse import (
- AlipayEbppInvoiceInstitutionCreateResponse,
- )
- from alipay.aop.api.domain.AlipayEbppInvoiceInstitutionDeleteModel import (
- AlipayEbppInvoiceInstitutionDeleteModel,
- )
- from alipay.aop.api.response.AlipayEbppInvoiceInstitutionDeleteResponse import (
- AlipayEbppInvoiceInstitutionDeleteResponse,
- )
- from alipay.aop.api.domain.AlipayEbppInvoiceInstitutionModifyModel import (
- AlipayEbppInvoiceInstitutionModifyModel,
- )
- from alipay.aop.api.response.AlipayEbppInvoiceInstitutionModifyResponse import (
- AlipayEbppInvoiceInstitutionModifyResponse,
- )
# Router for the expense-control institution endpoints. Routes are created
# with the OperationLogRoute route class and live under "/institution".
InstitutionRouter = APIRouter(
    prefix="/institution",
    tags=["费控制度"],
    route_class=OperationLogRoute,
)
# Maps the front-end ``period_type`` value to the issue-type identifier that
# is forwarded in the issue-rule payload.
_ISSUE_TYPE_MAP = {
    "daily": "ISSUE_DAY",
    "weekly": "ISSUE_WEEK",
    "monthly": "ISSUE_MONTH",
    "quarterly": "ISSUE_QUARTER",
    "yearly": "ISSUE_YEAR",
}


def _build_scope_data(data: dict) -> dict | None:
    """Return the member-scope payload, or None when no scope is requested."""
    adapter_type = data.get("applicable_scope")
    if not adapter_type or adapter_type == "NONE":
        return None
    return {
        "adapter_type": adapter_type,
        "owner_type": data.get("scope_owner_type", "EMPLOYEE"),
        "add_owner_id_list": data.get("scope_owner_id_list"),
    }


def _build_effective_period(data: dict) -> str:
    """Return the effective-time configuration serialized as a JSON string."""
    import json

    if data.get("effective_time_type", "unlimited") != "workday":
        # "unlimited" and any unrecognised type both mean "always effective".
        return '{"all": true}'

    window = [
        str(data.get("workday_start_time", "00:00")),
        str(data.get("workday_end_time", "23:59")),
    ]
    # json.dumps escapes quotes/backslashes coming from the request body;
    # compact separators keep the output identical to the previous
    # hand-written format for well-formed values.
    return json.dumps(
        {"regular": {"workday": [window]}},
        separators=(",", ":"),
        ensure_ascii=False,
    )


def _build_issuerule_data(data: dict) -> dict | None:
    """Return the issue-rule payload, or None unless ``grant_mode`` is "period"."""
    if data.get("grant_mode") != "period":
        return None

    issue_type = _ISSUE_TYPE_MAP.get(data.get("period_type", "monthly"), "ISSUE_MONTH")
    return {
        "quota_type": "CAP",
        "issue_type": issue_type,
        "issue_amount_value": str(data.get("amount", 0)),
        # f-string instead of ``+`` so a null / non-string name cannot raise.
        "issue_rule_name": f"{data.get('name') or ''}-发放规则",
        "effective_period": _build_effective_period(data),
        # NOTE(review): this reads the raw key, so a missing
        # ``effective_time_type`` yields 0 here even though the effective
        # period above treats a missing value as "unlimited" — confirm intent.
        "invalid_mode": 1 if data.get("effective_time_type") == "unlimited" else 0,
        "share_mode": 0,
        # Generate an id when the caller does not supply one.
        "outer_source_id": data.get("outer_source_id") or str(uuid.uuid4()),
    }


@InstitutionRouter.post(
    "",
    summary="创建费控制度",
    description="创建费控制度。支持串联调用:创建制度→设置成员→创建发放规则",
)
async def create_institution_controller(
    data: dict,
    auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:expense:institution:create"]))],
) -> JSONResponse:
    """Create an expense-control institution.

    Builds the Alipay create model from the request body, derives the
    optional member-scope and issue-rule payloads, and delegates to
    ``InstitutionService.create_institution_full_flow``.

    Args:
        data: Raw request body.
        auth: Authenticated caller holding the create permission.

    Returns:
        A success response wrapping the service result.
    """
    institution_create_model = AlipayEbppInvoiceInstitutionCreateModel.from_alipay_dict(data)

    result = await InstitutionService.create_institution_full_flow(
        auth=auth,
        institution_model=institution_create_model,
        enterprise_id=data.get("enterprise_id", ""),
        scope_data=_build_scope_data(data),
        issuerule_data=_build_issuerule_data(data),
    )
    log.info(f"创建费控制度成功: institution_id={result.get('institution_id')}")
    return SuccessResponse(data=result, msg="创建费控制度成功")
@InstitutionRouter.get(
    "",
    summary="查询费控制度列表",
    description="分页查询费控制度列表",
    response_model=ResponseSchema[InstitutionListOutSchema],
)
async def list_institution_controller(
    auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:expense:institution:list"]))],
    page_no: Annotated[int, Query(description="页码")] = 1,
    page_size: Annotated[int, Query(description="每页数量")] = 20,
    enterprise_id: Annotated[str | None, Query(description="企业ID")] = None,
    name: Annotated[str | None, Query(description="制度名称")] = None,
    expense_type: Annotated[str | None, Query(description="费用类型")] = None,
    status: Annotated[str | None, Query(description="状态")] = None,
) -> JSONResponse:
    """Return one page of expense-control institutions.

    Only filters that carry a truthy value are forwarded to the service in
    the ``search`` dict.
    """
    candidate_filters = {
        "enterprise_id": enterprise_id,
        "name": name,
        "expense_type": expense_type,
        "status": status,
    }
    # Drop filters the caller left empty.
    search = {field: value for field, value in candidate_filters.items() if value}

    page = await InstitutionService.list_service(
        auth=auth,
        page_no=page_no,
        page_size=page_size,
        search=search,
    )
    return SuccessResponse(data=page, msg="查询费控制度列表成功")
@InstitutionRouter.get(
    "/{institution_id}",
    summary="查询费控制度详情",
    description="查询费控制度详情 (alipay.ebpp.invoice.institution.detailinfo.query),失败时降级到本地DB",
)
async def detail_institution_controller(
    institution_id: Annotated[str, Path(description="制度ID")],
    auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:expense:institution:detail"]))],
    enterprise_id: Annotated[str | None, Query(description="企业ID")] = None,
) -> JSONResponse:
    """Return the detail of a single expense-control institution.

    Responds with empty data and an explanatory message when the enterprise
    id is missing or when the service returns None.
    """
    if enterprise_id:
        detail = await InstitutionService.detailinfo_query_service(
            auth=auth,
            institution_id=institution_id,
            enterprise_id=enterprise_id,
        )
        # A None result is reported as "not found"; anything else is returned as-is.
        message = "制度不存在" if detail is None else "查询费控制度详情成功"
        return SuccessResponse(data=detail, msg=message)

    # Without an enterprise id the service is not called at all.
    return SuccessResponse(data=None, msg="企业ID不能为空")
@InstitutionRouter.delete(
    "",
    summary="删除费控制度",
    description="删除费控制度 (alipay.ebpp.invoice.institution.delete)",
)
async def delete_institution_controller(
    data: dict,
    auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:expense:institution:delete"]))],
) -> JSONResponse:
    """Delete an expense-control institution.

    Args:
        data: Raw request body; its keys are mapped onto the Alipay delete model.
        auth: Authenticated caller holding the delete permission.

    Returns:
        A success response wrapping the service result.
    """
    # Build the SDK model through ``from_alipay_dict`` like the create
    # endpoint does. NOTE(review): the generated Alipay SDK domain models
    # are understood to have a no-argument constructor, so ``Model(**data)``
    # would raise TypeError for any non-empty body — confirm against the SDK.
    # The fallback keeps an empty body from producing None, so the attribute
    # reads in the log line below stay safe.
    institution_delete_model = (
        AlipayEbppInvoiceInstitutionDeleteModel.from_alipay_dict(data)
        or AlipayEbppInvoiceInstitutionDeleteModel()
    )
    result = await InstitutionService.delete_institution_service(auth=auth, data=institution_delete_model)
    log.info(f"删除费控制度成功: institution_id={institution_delete_model.institution_id}, enterprise_id={institution_delete_model.enterprise_id}")
    return SuccessResponse(data=result, msg="删除费控制度成功")
@InstitutionRouter.post(
    "/modify",
    summary="编辑费控制度",
    description="编辑费控制度 (alipay.ebpp.invoice.institution.modify)",
)
async def modify_institution_controller(
    data: dict,
    auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:expense:institution:modify"]))],
) -> JSONResponse:
    """Edit an expense-control institution.

    Args:
        data: Raw request body; its keys are mapped onto the Alipay modify model.
        auth: Authenticated caller holding the modify permission.

    Returns:
        A success response wrapping the service result.
    """
    # Build the SDK model through ``from_alipay_dict`` like the create
    # endpoint does. NOTE(review): the generated Alipay SDK domain models
    # are understood to have a no-argument constructor, so ``Model(**data)``
    # would raise TypeError for any non-empty body — confirm against the SDK.
    # The fallback keeps an empty body from producing None, so the attribute
    # read in the log line below stays safe.
    institution_modify_model = (
        AlipayEbppInvoiceInstitutionModifyModel.from_alipay_dict(data)
        or AlipayEbppInvoiceInstitutionModifyModel()
    )
    result = await InstitutionService.modify_institution_service(auth=auth, data=institution_modify_model)
    log.info(f"编辑费控制度成功: institution_id={institution_modify_model.institution_id}")
    return SuccessResponse(data=result, msg="编辑费控制度成功")
# ========== Institution member-scope management ==========
@InstitutionRouter.get(
    "/{institution_id}/scope",
    summary="查询制度成员范围",
    description="查询制度下成员范围 (alipay.ebpp.invoice.institution.scopepageinfo.query)",
)
async def list_scope_controller(
    institution_id: Annotated[str, Path(description="制度ID")],
    auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:expense:institution:scope:list"]))],
    enterprise_id: Annotated[str | None, Query(description="企业ID")] = None,
    owner_type: Annotated[str | None, Query(description="适配ID类型")] = None,
    page_num: Annotated[int, Query(description="页码")] = 1,
    page_size: Annotated[int, Query(description="每页条数")] = 20,
) -> JSONResponse:
    """Return one page of the member scope configured for an institution.

    All query parameters are forwarded unchanged to
    ``InstitutionScopeService.scopepageinfo_query_service``.
    """
    query_args = {
        "institution_id": institution_id,
        "enterprise_id": enterprise_id,
        "page_num": page_num,
        "page_size": page_size,
        "owner_type": owner_type,
    }
    scope_page = await InstitutionScopeService.scopepageinfo_query_service(auth=auth, **query_args)
    return SuccessResponse(data=scope_page, msg="查询成功")
@InstitutionRouter.post(
    "/{institution_id}/scope",
    summary="设置制度成员范围",
    description="设置/修改制度成员范围 (alipay.ebpp.invoice.institution.scope.modify)",
)
async def modify_scope_controller(
    institution_id: Annotated[str, Path(description="制度ID")],
    data: dict,
    auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:expense:institution:scope:modify"]))],
) -> JSONResponse:
    """Set or change the member scope of an institution.

    The request body is handed to ``InstitutionScopeService.scope_modify_service``
    untouched; its ``adapter_type`` entry is only read for the log line.
    """
    outcome = await InstitutionScopeService.scope_modify_service(
        auth=auth, institution_id=institution_id, data=data
    )
    log.info(f"设置制度成员成功: institution_id={institution_id}, adapter_type={data.get('adapter_type')}")
    return SuccessResponse(data=outcome, msg="设置成功")
# ========== Automatic quota issue-rule management ==========
@InstitutionRouter.post(
    "/{institution_id}/issuerule",
    summary="创建自动发放规则",
    description="创建自动额度发放规则 (alipay.ebpp.invoice.issuerule.create)",
)
async def create_issuerule_controller(
    institution_id: Annotated[str, Path(description="制度ID")],
    data: dict,
    auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:expense:institution:issuerule:create"]))],
) -> JSONResponse:
    """Create an automatic quota issue rule for an institution.

    Each rule field is read from the request body, falling back to the
    default listed below when the key is absent, and forwarded to
    ``IssueruleService.create_issuerule_service``.
    """
    # Field name -> fallback used when the request body lacks the key.
    field_defaults = {
        "enterprise_id": "",
        "quota_type": "CAP",
        "issue_type": "ISSUE_MONTH",
        "issue_amount_value": "0",
        "outer_source_id": None,
        "issue_rule_name": None,
        "effective_period": None,
        "invalid_mode": None,
        "share_mode": None,
    }
    rule_fields = {field: data.get(field, fallback) for field, fallback in field_defaults.items()}

    created = await IssueruleService.create_issuerule_service(
        auth=auth,
        institution_id=institution_id,
        **rule_fields,
    )
    log.info(f"创建自动发放规则成功: institution_id={institution_id}")
    return SuccessResponse(data=created, msg="创建自动发放规则成功")
@InstitutionRouter.put(
    "/{institution_id}/issuerule/{issue_rule_id}",
    summary="编辑自动发放规则",
    description="编辑自动额度发放规则 (alipay.ebpp.invoice.issuerule.modify)",
)
async def modify_issuerule_controller(
    institution_id: Annotated[str, Path(description="制度ID")],
    issue_rule_id: Annotated[str, Path(description="发放规则ID")],
    data: dict,
    auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:expense:institution:issuerule:modify"]))],
) -> JSONResponse:
    """Edit an existing automatic quota issue rule.

    ``enterprise_id`` defaults to an empty string; every other rule field is
    forwarded as found in the request body (None when absent) to
    ``IssueruleService.modify_issuerule_service``.
    """
    optional_fields = (
        "quota_type",
        "issue_type",
        "issue_amount_value",
        "issue_rule_name",
        "effective",
        "effective_period",
        "invalid_mode",
        "share_mode",
    )
    rule_fields = {field: data.get(field) for field in optional_fields}

    updated = await IssueruleService.modify_issuerule_service(
        auth=auth,
        institution_id=institution_id,
        issue_rule_id=issue_rule_id,
        enterprise_id=data.get("enterprise_id", ""),
        **rule_fields,
    )
    log.info(f"编辑自动发放规则成功: issue_rule_id={issue_rule_id}")
    return SuccessResponse(data=updated, msg="编辑自动发放规则成功")
@InstitutionRouter.delete(
    "/{institution_id}/issuerule",
    summary="删除自动发放规则",
    description="删除自动额度发放规则 (alipay.ebpp.invoice.issuerule.delete)",
)
async def delete_issuerule_controller(
    institution_id: Annotated[str, Path(description="制度ID")],
    data: dict,
    auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:expense:institution:issuerule:delete"]))],
) -> JSONResponse:
    """Delete automatic quota issue rules of an institution.

    Reads ``issue_rule_id_list`` (default: empty list) and ``enterprise_id``
    (default: empty string) from the request body and forwards them to
    ``IssueruleService.delete_issuerule_service``.
    """
    rule_ids = data.get("issue_rule_id_list", [])
    owning_enterprise = data.get("enterprise_id", "")

    removed = await IssueruleService.delete_issuerule_service(
        auth=auth,
        institution_id=institution_id,
        issue_rule_id_list=rule_ids,
        enterprise_id=owning_enterprise,
    )
    log.info(f"删除自动发放规则成功: institution_id={institution_id}")
    return SuccessResponse(data=removed, msg="删除自动发放规则成功")
|