# from app.api.v1.module_system.auth.schema import AuthSchema # from app.core.alipay import AlipayClient # from app.core.exceptions import CustomException # from app.core.logger import log # from app.utils.snowflake import get_snowflake_id # from .enums import QuotaStatusEnum # from .schema import ( # ExpenseQuotaCreateSchema, # ExpenseQuotaDeleteSchema, # ExpenseQuotaModifySchema, # ExpenseQuotaQueryOutSchema, # ExpenseQuotaQuerySchema, # QuotaCreateSchema, # QuotaDetailInfoSchema, # QuotaListOutSchema, # QuotaOperationOutSchema, # QuotaOutSchema, # QuotaUpdateSchema, # ) # class QuotaService: # """额度服务层""" # @classmethod # async def create_expense_quota_service( # cls, auth: AuthSchema, data: ExpenseQuotaCreateSchema # ) -> QuotaOperationOutSchema: # """ # 创建余额/点券 # 调用: alipay.ebpp.invoice.expensecontrol.quota.create # """ # crud = QuotaCRUD(auth) # out_biz_no = data.outer_source_id or str(get_snowflake_id()) # try: # from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaCreateRequest import ( # AlipayEbppInvoiceExpensecontrolQuotaCreateRequest, # ) # from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaCreateModel import ( # AlipayEbppInvoiceExpensecontrolQuotaCreateModel, # ) # from alipay.aop.api.domain.IssueQuotaTarget import ( # IssueQuotaTarget, # ) # from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaCreateResponse import ( # AlipayEbppInvoiceExpensecontrolQuotaCreateResponse, # ) # except ImportError: # raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖") # model = AlipayEbppInvoiceExpensecontrolQuotaCreateModel() # model.target_type = data.target_type # model.target_id = data.target_id # model.enterprise_id = data.enterprise_id # model.outer_source_id = out_biz_no # model.quota_type = data.quota_type or "CAP" # model.share_mode = data.share_mode or "0" # if data.effective_start_date: # model.effective_start_date = data.effective_start_date.strftime("%Y-%m-%d %H:%M:%S") # if data.effective_end_date: # model.effective_end_date = data.effective_end_date.strftime("%Y-%m-%d %H:%M:%S") # if data.issue_name: # model.issue_name = data.issue_name # if data.issue_desc: # model.issue_desc = data.issue_desc # if data.issue_quota_target_list: # target_list = [] # for item in data.issue_quota_target_list: # target = IssueQuotaTarget() # target.owner_type = item.owner_type # target.owner_id = item.owner_id # target.quota = item.quota # if item.amount is not None: # target.amount = item.amount # target_list.append(target) # model.issue_quota_target_list = target_list # request = AlipayEbppInvoiceExpensecontrolQuotaCreateRequest() # request.biz_model = model # client = AlipayClient.get_client() # response = client.execute(request) # if not response: # raise CustomException(msg="创建余额/点券失败: 无响应") # result = AlipayEbppInvoiceExpensecontrolQuotaCreateResponse() # result.parse_response_content(response) # if not result.is_success(): # log.error(f"支付宝接口调用失败: {result.code} - {result.msg}") # raise CustomException(msg=f"创建余额/点券失败: {result.msg}") # quota_data = data.model_dump(exclude_none=True) # quota_data["out_biz_no"] = out_biz_no # quota_data["status"] = QuotaStatusEnum.QUOTA_ACTIVE.value # if result.quota_id: # quota_data["quota_id"] = result.quota_id # quota = await crud.create(quota_data) # if not quota: # raise CustomException(msg="创建额度记录失败") # return QuotaOperationOutSchema(out_biz_no=out_biz_no, quota_id=result.quota_id) # @classmethod # async def create_quota_service( # cls, auth: AuthSchema, data: QuotaCreateSchema # ) -> QuotaOperationOutSchema: # """ # 创建额度 # 调用: alipay.ebpp.invoice.expensecontrol.quota.create # """ # crud = QuotaCRUD(auth) # out_biz_no = str(get_snowflake_id()) # quota_data = data.model_dump(exclude_none=True) # quota_data["out_biz_no"] = out_biz_no # quota_data["status"] = QuotaStatusEnum.QUOTA_ACTIVE.value # if quota_data.get("available_amount") is None: # quota_data["available_amount"] = quota_data.get("total_amount", 0) # quota = await crud.create(quota_data) # if not quota: # raise CustomException(msg="创建额度记录失败") # return QuotaOperationOutSchema(out_biz_no=out_biz_no, quota_id=quota.quota_id) # @classmethod # async def query_expense_quota_service( # cls, auth: AuthSchema, data: ExpenseQuotaQuerySchema # ) -> ExpenseQuotaQueryOutSchema: # """ # 查询余额/点券 # 调用: alipay.ebpp.invoice.expensecontrol.quota.query # """ # try: # from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaQueryRequest import ( # AlipayEbppInvoiceExpensecontrolQuotaQueryRequest, # ) # from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaQueryModel import ( # AlipayEbppInvoiceExpensecontrolQuotaQueryModel, # ) # from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaQueryResponse import ( # AlipayEbppInvoiceExpensecontrolQuotaQueryResponse, # ) # except ImportError: # raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖") # model = AlipayEbppInvoiceExpensecontrolQuotaQueryModel() # model.owner_type = data.owner_type # model.page_size = data.page_size # model.page_num = data.page_num # if data.target_type: # model.target_type = data.target_type # if data.target_id: # model.target_id = data.target_id # if data.owner_id: # model.owner_id = data.owner_id # if data.owner_open_id: # model.owner_open_id = data.owner_open_id # if data.enterprise_id: # model.enterprise_id = data.enterprise_id # if data.quota_id_list: # model.quota_id_list = data.quota_id_list # if data.quota_type: # model.quota_type = data.quota_type # request = AlipayEbppInvoiceExpensecontrolQuotaQueryRequest() # request.biz_model = model # client = AlipayClient.get_client() # response = client.execute(request) # if not response: # raise CustomException(msg="查询余额/点券失败: 无响应") # result = AlipayEbppInvoiceExpensecontrolQuotaQueryResponse() # result.parse_response_content(response) # if not result.is_success(): # log.error(f"支付宝接口调用失败: {result.code} - {result.msg}") # raise CustomException(msg=f"查询余额/点券失败: {result.msg}") # return ExpenseQuotaQueryOutSchema( # page_num=result.page_num or data.page_num, # page_size=result.page_size or data.page_size, # total_page_count=result.total_page_count or 0, # ) # @classmethod # async def modify_expense_quota_service( # cls, auth: AuthSchema, out_biz_no: str, data: ExpenseQuotaModifySchema # ) -> QuotaOperationOutSchema: # """ # 修改余额/点券 # 调用: alipay.ebpp.invoice.expensecontrol.quota.modify # """ # crud = QuotaCRUD(auth) # quota = await crud.get_by_out_biz_no(out_biz_no) # if not quota: # raise CustomException(msg="额度不存在") # try: # from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaModifyRequest import ( # AlipayEbppInvoiceExpensecontrolQuotaModifyRequest, # ) # from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaModifyModel import ( # AlipayEbppInvoiceExpensecontrolQuotaModifyModel, # ) # from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaModifyResponse import ( # AlipayEbppInvoiceExpensecontrolQuotaModifyResponse, # ) # except ImportError: # raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖") # model = AlipayEbppInvoiceExpensecontrolQuotaModifyModel() # model.quota_id = data.quota_id # model.action = data.action # model.outer_source_id = data.outer_source_id # model.enterprise_id = data.enterprise_id # if data.amount is not None: # model.amount = str(data.amount) # if data.share_mode: # model.share_mode = data.share_mode # request = AlipayEbppInvoiceExpensecontrolQuotaModifyRequest() # request.biz_model = model # client = AlipayClient.get_client() # response = client.execute(request) # if not response: # raise CustomException(msg="修改余额/点券失败: 无响应") # result = AlipayEbppInvoiceExpensecontrolQuotaModifyResponse() # result.parse_response_content(response) # if not result.is_success(): # log.error(f"支付宝接口调用失败: {result.code} - {result.msg}") # raise CustomException(msg=f"修改余额/点券失败: {result.msg}") # return QuotaOperationOutSchema( # out_biz_no=out_biz_no, # quota_id=data.quota_id, # result=result.success, # ) # @classmethod # async def delete_expense_quota_service( # cls, auth: AuthSchema, out_biz_no: str, data: ExpenseQuotaDeleteSchema # ) -> QuotaOperationOutSchema: # """ # 删除额度 # 调用: alipay.ebpp.invoice.expensecontrol.quota.delete # """ # crud = QuotaCRUD(auth) # quota = await crud.get_by_out_biz_no(out_biz_no) # if not quota: # raise CustomException(msg="额度不存在") # try: # from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaDeleteRequest import ( # AlipayEbppInvoiceExpensecontrolQuotaDeleteRequest, # ) # from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaDeleteModel import ( # AlipayEbppInvoiceExpensecontrolQuotaDeleteModel, # ) # from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaDeleteResponse import ( # AlipayEbppInvoiceExpensecontrolQuotaDeleteResponse, # ) # except ImportError: # raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖") # model = AlipayEbppInvoiceExpensecontrolQuotaDeleteModel() # model.enterprise_id = data.enterprise_id # if data.quota_id: # model.quota_id = data.quota_id # if data.issue_batch_id: # model.issue_batch_id = data.issue_batch_id # request = AlipayEbppInvoiceExpensecontrolQuotaDeleteRequest() # request.biz_model = model # client = AlipayClient.get_client() # response = client.execute(request) # if not response: # raise CustomException(msg="删除额度失败: 无响应") # result = AlipayEbppInvoiceExpensecontrolQuotaDeleteResponse() # result.parse_response_content(response) # if not result.is_success(): # log.error(f"支付宝接口调用失败: {result.code} - {result.msg}") # raise CustomException(msg=f"删除额度失败: {result.msg}") # await crud.delete(id=quota.id) # return QuotaOperationOutSchema(out_biz_no=out_biz_no) # @classmethod # async def list_service( # cls, # auth: AuthSchema, # page_no: int = 1, # page_size: int = 20, # search: dict | None = None, # ) -> dict: # crud = QuotaCRUD(auth) # offset = (page_no - 1) * page_size # return await crud.page( # offset=offset, # limit=page_size, # order_by=[{"id": "desc"}], # search=search or {}, # out_schema=QuotaListOutSchema, # ) # @classmethod # async def detail_service( # cls, auth: AuthSchema, out_biz_no: str # ) -> QuotaOutSchema: # pass # @classmethod # async def update_service( # cls, auth: AuthSchema, out_biz_no: str, data: QuotaUpdateSchema # ) -> QuotaOperationOutSchema: # # quota = await crud.get_by_out_biz_no(out_biz_no) # # if not quota: # # raise CustomException(msg="额度不存在") # # update_data = data.model_dump(exclude_unset=True) # # if update_data: # # await crud.update(id=quota.id, data=update_data) # # return QuotaOperationOutSchema(out_biz_no=out_biz_no, quota_id=quota.quota_id) # pass