import asyncio

from app.api.v1.module_system.auth.schema import AuthSchema
from app.core.alipay import AlipayClient
from app.core.exceptions import CustomException
from app.core.logger import log
from app.utils.snowflake import get_snowflake_id

from .enums import QuotaStatusEnum
from .schema import (
    ExpenseQuotaCreateSchema,
    ExpenseQuotaDeleteSchema,
    ExpenseQuotaModifySchema,
    ExpenseQuotaQueryOutSchema,
    ExpenseQuotaQuerySchema,
    QuotaCreateSchema,
    QuotaDetailInfoSchema,
    QuotaListOutSchema,
    QuotaOperationOutSchema,
    QuotaOutSchema,
    QuotaUpdateSchema,
)
from .crud import QuotaCRUD

# Raised (wrapped in CustomException) whenever the lazily imported Alipay SDK
# modules cannot be loaded.
_ALIPAY_SDK_MISSING_MSG = "支付宝SDK未正确安装,请检查alipay-sdk-python依赖"


class QuotaService:
    """Service layer for quota operations (local records and Alipay expense-control quotas)."""

    @classmethod
    async def _execute_alipay_request(cls, request, response_cls: type, action: str):
        """
        Send ``request`` through the Alipay client and return the parsed response.

        Args:
            request: An Alipay SDK request object with ``biz_model`` already set.
            response_cls: The SDK response class used to parse the raw reply.
            action: Human-readable action name used as the prefix of error messages.

        Returns:
            An instance of ``response_cls`` for which ``is_success()`` is true.

        Raises:
            CustomException: If the client returns an empty response, or the
                parsed response reports a failure.
        """
        client = AlipayClient.get_client()
        # client.execute is a synchronous call; run it in a worker thread so the
        # event loop is not blocked while waiting for the remote reply.
        response = await asyncio.to_thread(client.execute, request)
        if not response:
            raise CustomException(msg=f"{action}失败: 无响应")

        result = response_cls()
        result.parse_response_content(response)
        if not result.is_success():
            log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
            raise CustomException(msg=f"{action}失败: {result.msg}")
        return result

    @classmethod
    async def create_expense_quota_service(
        cls, auth: AuthSchema, data: ExpenseQuotaCreateSchema
    ) -> QuotaOperationOutSchema:
        """
        Create a balance/voucher quota on Alipay and persist a local record.

        Calls: alipay.ebpp.invoice.expensecontrol.quota.create

        Args:
            auth: Authentication context handed to the CRUD layer.
            data: Creation payload; ``outer_source_id`` is used as the business
                number when present, otherwise a snowflake id is generated.

        Returns:
            The business number together with the quota id reported by Alipay.

        Raises:
            CustomException: If the SDK is missing, the Alipay call fails, or the
                local record cannot be created.
        """
        crud = QuotaCRUD(auth)
        out_biz_no = data.outer_source_id or str(get_snowflake_id())

        try:
            from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaCreateRequest import (
                AlipayEbppInvoiceExpensecontrolQuotaCreateRequest,
            )
            from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaCreateModel import (
                AlipayEbppInvoiceExpensecontrolQuotaCreateModel,
            )
            from alipay.aop.api.domain.IssueQuotaTarget import (
                IssueQuotaTarget,
            )
            from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaCreateResponse import (
                AlipayEbppInvoiceExpensecontrolQuotaCreateResponse,
            )
        except ImportError as exc:
            raise CustomException(msg=_ALIPAY_SDK_MISSING_MSG) from exc

        model = AlipayEbppInvoiceExpensecontrolQuotaCreateModel()
        model.target_type = data.target_type
        model.target_id = data.target_id
        model.enterprise_id = data.enterprise_id
        model.outer_source_id = out_biz_no
        # Fall back to "CAP" / "0" when the caller leaves these unset.
        model.quota_type = data.quota_type or "CAP"
        model.share_mode = data.share_mode or "0"

        if data.effective_start_date:
            model.effective_start_date = data.effective_start_date.strftime("%Y-%m-%d %H:%M:%S")
        if data.effective_end_date:
            model.effective_end_date = data.effective_end_date.strftime("%Y-%m-%d %H:%M:%S")
        if data.issue_name:
            model.issue_name = data.issue_name
        if data.issue_desc:
            model.issue_desc = data.issue_desc

        if data.issue_quota_target_list:
            target_list = []
            for item in data.issue_quota_target_list:
                target = IssueQuotaTarget()
                target.owner_type = item.owner_type
                target.owner_id = item.owner_id
                target.quota = item.quota
                # amount is optional; an explicit 0 must still be forwarded.
                if item.amount is not None:
                    target.amount = item.amount
                target_list.append(target)
            model.issue_quota_target_list = target_list

        request = AlipayEbppInvoiceExpensecontrolQuotaCreateRequest()
        request.biz_model = model
        result = await cls._execute_alipay_request(
            request,
            AlipayEbppInvoiceExpensecontrolQuotaCreateResponse,
            "创建余额/点券",
        )

        # Mirror the remotely created quota in the local store.
        quota_data = data.model_dump(exclude_none=True)
        quota_data["out_biz_no"] = out_biz_no
        quota_data["status"] = QuotaStatusEnum.QUOTA_ACTIVE.value
        if result.quota_id:
            quota_data["quota_id"] = result.quota_id

        quota = await crud.create(quota_data)
        if not quota:
            raise CustomException(msg="创建额度记录失败")

        return QuotaOperationOutSchema(out_biz_no=out_biz_no, quota_id=result.quota_id)

    @classmethod
    async def create_quota_service(
        cls, auth: AuthSchema, data: QuotaCreateSchema
    ) -> QuotaOperationOutSchema:
        """
        Create a quota record locally, without calling Alipay.

        Args:
            auth: Authentication context handed to the CRUD layer.
            data: Creation payload.

        Returns:
            The generated business number and the stored record's quota id.

        Raises:
            CustomException: If the record cannot be created.
        """
        crud = QuotaCRUD(auth)
        out_biz_no = str(get_snowflake_id())

        quota_data = data.model_dump(exclude_none=True)
        quota_data["out_biz_no"] = out_biz_no
        quota_data["status"] = QuotaStatusEnum.QUOTA_ACTIVE.value
        # When no available amount is supplied, start from the total amount
        # (or 0 if that is absent as well).
        if quota_data.get("available_amount") is None:
            quota_data["available_amount"] = quota_data.get("total_amount", 0)

        quota = await crud.create(quota_data)
        if not quota:
            raise CustomException(msg="创建额度记录失败")

        return QuotaOperationOutSchema(out_biz_no=out_biz_no, quota_id=quota.quota_id)

    @classmethod
    async def query_expense_quota_service(
        cls, auth: AuthSchema, data: ExpenseQuotaQuerySchema
    ) -> ExpenseQuotaQueryOutSchema:
        """
        Query balance/voucher quotas on Alipay.

        Calls: alipay.ebpp.invoice.expensecontrol.quota.query

        Args:
            auth: Authentication context (not used by this method).
            data: Query filters and paging parameters.

        Returns:
            Paging information taken from the Alipay response, falling back to
            the requested paging values when the response omits them.

        Raises:
            CustomException: If the SDK is missing or the Alipay call fails.
        """
        try:
            from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaQueryRequest import (
                AlipayEbppInvoiceExpensecontrolQuotaQueryRequest,
            )
            from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaQueryModel import (
                AlipayEbppInvoiceExpensecontrolQuotaQueryModel,
            )
            from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaQueryResponse import (
                AlipayEbppInvoiceExpensecontrolQuotaQueryResponse,
            )
        except ImportError as exc:
            raise CustomException(msg=_ALIPAY_SDK_MISSING_MSG) from exc

        model = AlipayEbppInvoiceExpensecontrolQuotaQueryModel()
        model.owner_type = data.owner_type
        model.page_size = data.page_size
        model.page_num = data.page_num

        # Optional filters are only forwarded when they carry a truthy value.
        optional_fields = (
            "target_type",
            "target_id",
            "owner_id",
            "owner_open_id",
            "enterprise_id",
            "quota_id_list",
            "quota_type",
        )
        for field_name in optional_fields:
            value = getattr(data, field_name)
            if value:
                setattr(model, field_name, value)

        request = AlipayEbppInvoiceExpensecontrolQuotaQueryRequest()
        request.biz_model = model
        result = await cls._execute_alipay_request(
            request,
            AlipayEbppInvoiceExpensecontrolQuotaQueryResponse,
            "查询余额/点券",
        )

        # NOTE(review): only paging fields are returned here; any quota details
        # carried by the response are not propagated - confirm this is intended.
        return ExpenseQuotaQueryOutSchema(
            page_num=result.page_num or data.page_num,
            page_size=result.page_size or data.page_size,
            total_page_count=result.total_page_count or 0,
        )

    @classmethod
    async def modify_expense_quota_service(
        cls, auth: AuthSchema, out_biz_no: str, data: ExpenseQuotaModifySchema
    ) -> QuotaOperationOutSchema:
        """
        Modify a balance/voucher quota on Alipay.

        Calls: alipay.ebpp.invoice.expensecontrol.quota.modify

        Args:
            auth: Authentication context handed to the CRUD layer.
            out_biz_no: Business number of the local quota record; it must exist.
            data: Modification payload.

        Returns:
            The business number, the quota id from ``data`` and the ``success``
            value of the Alipay response.

        Raises:
            CustomException: If the local record does not exist, the SDK is
                missing, or the Alipay call fails.
        """
        crud = QuotaCRUD(auth)
        quota = await crud.get_by_out_biz_no(out_biz_no)
        if not quota:
            raise CustomException(msg="额度不存在")

        try:
            from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaModifyRequest import (
                AlipayEbppInvoiceExpensecontrolQuotaModifyRequest,
            )
            from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaModifyModel import (
                AlipayEbppInvoiceExpensecontrolQuotaModifyModel,
            )
            from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaModifyResponse import (
                AlipayEbppInvoiceExpensecontrolQuotaModifyResponse,
            )
        except ImportError as exc:
            raise CustomException(msg=_ALIPAY_SDK_MISSING_MSG) from exc

        model = AlipayEbppInvoiceExpensecontrolQuotaModifyModel()
        model.quota_id = data.quota_id
        model.action = data.action
        model.outer_source_id = data.outer_source_id
        model.enterprise_id = data.enterprise_id
        # The amount is sent as a string; an explicit 0 must still be forwarded.
        if data.amount is not None:
            model.amount = str(data.amount)
        if data.share_mode:
            model.share_mode = data.share_mode

        request = AlipayEbppInvoiceExpensecontrolQuotaModifyRequest()
        request.biz_model = model
        result = await cls._execute_alipay_request(
            request,
            AlipayEbppInvoiceExpensecontrolQuotaModifyResponse,
            "修改余额/点券",
        )

        # NOTE(review): the local record is only checked for existence and is
        # not updated after a successful remote modification - confirm intended.
        return QuotaOperationOutSchema(
            out_biz_no=out_biz_no,
            quota_id=data.quota_id,
            result=result.success,
        )

    @classmethod
    async def delete_expense_quota_service(
        cls, auth: AuthSchema, out_biz_no: str, data: ExpenseQuotaDeleteSchema
    ) -> QuotaOperationOutSchema:
        """
        Delete a quota on Alipay, then remove the local record.

        Calls: alipay.ebpp.invoice.expensecontrol.quota.delete

        Args:
            auth: Authentication context handed to the CRUD layer.
            out_biz_no: Business number of the local quota record; it must exist.
            data: Deletion payload (enterprise id plus optional quota id /
                issue batch id).

        Returns:
            An operation result carrying the business number.

        Raises:
            CustomException: If the local record does not exist, the SDK is
                missing, or the Alipay call fails.
        """
        crud = QuotaCRUD(auth)
        quota = await crud.get_by_out_biz_no(out_biz_no)
        if not quota:
            raise CustomException(msg="额度不存在")

        try:
            from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaDeleteRequest import (
                AlipayEbppInvoiceExpensecontrolQuotaDeleteRequest,
            )
            from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaDeleteModel import (
                AlipayEbppInvoiceExpensecontrolQuotaDeleteModel,
            )
            from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaDeleteResponse import (
                AlipayEbppInvoiceExpensecontrolQuotaDeleteResponse,
            )
        except ImportError as exc:
            raise CustomException(msg=_ALIPAY_SDK_MISSING_MSG) from exc

        model = AlipayEbppInvoiceExpensecontrolQuotaDeleteModel()
        model.enterprise_id = data.enterprise_id
        if data.quota_id:
            model.quota_id = data.quota_id
        if data.issue_batch_id:
            model.issue_batch_id = data.issue_batch_id

        request = AlipayEbppInvoiceExpensecontrolQuotaDeleteRequest()
        request.biz_model = model
        await cls._execute_alipay_request(
            request,
            AlipayEbppInvoiceExpensecontrolQuotaDeleteResponse,
            "删除额度",
        )

        # Remove the local record only after the remote deletion succeeded.
        await crud.delete(id=quota.id)
        return QuotaOperationOutSchema(out_biz_no=out_biz_no)

    @classmethod
    async def list_service(
        cls,
        auth: AuthSchema,
        page_no: int = 1,
        page_size: int = 20,
        search: dict | None = None,
    ) -> dict:
        """
        Return one page of local quota records, newest id first.

        Args:
            auth: Authentication context handed to the CRUD layer.
            page_no: 1-based page number; values below 1 are treated as 1.
            page_size: Number of records per page.
            search: Optional filter mapping passed through to the CRUD layer;
                ``None`` is treated as an empty filter.

        Returns:
            Whatever ``QuotaCRUD.page`` returns for the requested page.
        """
        crud = QuotaCRUD(auth)
        # Clamp the page number so the computed offset can never be negative.
        offset = (max(page_no, 1) - 1) * page_size
        return await crud.page(
            offset=offset,
            limit=page_size,
            order_by=[{"id": "desc"}],
            search=search or {},
            out_schema=QuotaListOutSchema,
        )