# from app.api.v1.module_system.auth.schema import AuthSchema # from app.core.alipay import AlipayClient # from app.core.exceptions import CustomException # from app.core.logger import log # from app.utils.snowflake import get_snowflake_id # from .enums import RuleStatusEnum # from .schema import ( # ExpenseRuleCreateSchema, # ) # from alipay.aop.api.request.AlipayEbppInvoiceInstitutionExpenseruleCreateRequest import ( # AlipayEbppInvoiceInstitutionExpenseruleCreateRequest, # ) # from alipay.aop.api.domain.AlipayEbppInvoiceInstitutionExpenseruleCreateModel import ( # AlipayEbppInvoiceInstitutionExpenseruleCreateModel, # ) # from alipay.aop.api.domain.StandardConditionInfo import ( # StandardConditionInfo, # ) # from alipay.aop.api.response.AlipayEbppInvoiceInstitutionExpenseruleCreateResponse import ( # AlipayEbppInvoiceInstitutionExpenseruleCreateResponse, # ) # class RuleService: # """使用规则服务层""" # @classmethod # async def create_expense_rule_service( # cls, auth: AuthSchema, data: ExpenseRuleCreateSchema # ) -> RuleOperationOutSchema: # """ # 创建费控使用规则 # 调用: alipay.ebpp.invoice.institution.expenserule.create # """ # crud = RuleCRUD(auth) # out_biz_no = data.outer_source_id or str(get_snowflake_id()) # model = AlipayEbppInvoiceInstitutionExpenseruleCreateModel() # model.institution_id = data.institution_id # model.standard_name = data.standard_name # model.expense_type_sub_category = data.expense_type_sub_category # model.outer_source_id = out_biz_no # if data.standard_condition_info_list: # condition_list = [] # for item in data.standard_condition_info_list: # condition = StandardConditionInfo() # condition.rule_factor = item.rule_factor # condition.rule_value = item.rule_value # if item.rule_operator: # condition.rule_operator = item.rule_operator # if item.rule_name: # condition.rule_name = item.rule_name # condition_list.append(condition) # model.standard_condition_info_list = condition_list # if data.enterprise_id: # model.enterprise_id = data.enterprise_id # if data.open_rule_id: # model.open_rule_id = data.open_rule_id # if data.payment_policy: # model.payment_policy = data.payment_policy # if data.consume_mode: # model.consume_mode = data.consume_mode # request = AlipayEbppInvoiceInstitutionExpenseruleCreateRequest() # request.biz_model = model # client = AlipayClient.get_client() # response = client.execute(request) # if not response: # raise CustomException(msg="创建费控使用规则失败: 无响应") # result = AlipayEbppInvoiceInstitutionExpenseruleCreateResponse() # result.parse_response_content(response) # if not result.is_success(): # log.error(f"支付宝接口调用失败: {result.code} - {result.msg}") # raise CustomException(msg=f"创建费控使用规则失败: {result.msg}") # rule_data = data.model_dump(exclude_none=True) # rule_data["out_biz_no"] = out_biz_no # rule_data["name"] = data.standard_name # rule_data["status"] = RuleStatusEnum.RULE_ACTIVE.value # if result.rule_id: # rule_data["rule_id"] = result.rule_id # rule = await crud.create(rule_data) # if not rule: # raise CustomException(msg="创建使用规则记录失败") # return RuleOperationOutSchema(out_biz_no=out_biz_no, rule_id=result.rule_id) # @classmethod # async def create_rule_service( # cls, auth: AuthSchema, data: RuleCreateSchema # ) -> RuleOperationOutSchema: # """ # 创建使用规则 (本地) # """ # crud = RuleCRUD(auth) # out_biz_no = str(get_snowflake_id()) # rule_data = data.model_dump(exclude_none=True) # rule_data["out_biz_no"] = out_biz_no # rule = await crud.create(rule_data) # if not rule: # raise CustomException(msg="创建使用规则记录失败") # return RuleOperationOutSchema(out_biz_no=out_biz_no, rule_id=rule.rule_id) # @classmethod # async def modify_expense_rule_service( # cls, auth: AuthSchema, out_biz_no: str, data: ExpenseRuleModifySchema # ) -> RuleOperationOutSchema: # """ # 编辑使用规则 # 调用: alipay.ebpp.invoice.institution.expenserule.modify # """ # crud = RuleCRUD(auth) # rule = await crud.get_by_out_biz_no(out_biz_no) # if not rule: # raise CustomException(msg="使用规则不存在") # try: # from alipay.aop.api.request.AlipayEbppInvoiceInstitutionExpenseruleModifyRequest import ( # AlipayEbppInvoiceInstitutionExpenseruleModifyRequest, # ) # from alipay.aop.api.domain.AlipayEbppInvoiceInstitutionExpenseruleModifyModel import ( # AlipayEbppInvoiceInstitutionExpenseruleModifyModel, # ) # from alipay.aop.api.response.AlipayEbppInvoiceInstitutionExpenseruleModifyResponse import ( # AlipayEbppInvoiceInstitutionExpenseruleModifyResponse, # ) # except ImportError: # raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖") # model = AlipayEbppInvoiceInstitutionExpenseruleModifyModel() # model.institution_id = data.institution_id # model.standard_id = data.standard_id # model.action = data.action # model.enterprise_id = data.enterprise_id # if data.standard_name: # model.standard_name = data.standard_name # if data.standard_desc: # model.standard_desc = data.standard_desc # if data.open_rule_id: # model.open_rule_id = data.open_rule_id # if data.payment_policy: # model.payment_policy = data.payment_policy # if data.consume_mode: # model.consume_mode = data.consume_mode # request = AlipayEbppInvoiceInstitutionExpenseruleModifyRequest() # request.biz_model = model # client = AlipayClient.get_client() # response = client.execute(request) # if not response: # raise CustomException(msg="编辑使用规则失败: 无响应") # result = AlipayEbppInvoiceInstitutionExpenseruleModifyResponse() # result.parse_response_content(response) # if not result.is_success(): # log.error(f"支付宝接口调用失败: {result.code} - {result.msg}") # raise CustomException(msg=f"编辑使用规则失败: {result.msg}") # update_data = data.model_dump(exclude_unset=True, exclude_none=True) # exclude_fields = ["institution_id", "standard_id", "action", "enterprise_id"] # for field in exclude_fields: # update_data.pop(field, None) # if update_data: # await crud.update(id=rule.id, data=update_data) # return RuleOperationOutSchema(out_biz_no=out_biz_no, rule_id=data.standard_id) # @classmethod # async def delete_expense_rule_service( # cls, auth: AuthSchema, out_biz_no: str, data: ExpenseRuleDeleteSchema # ) -> RuleOperationOutSchema: # """ # 删除使用规则 # 调用: alipay.ebpp.invoice.institution.expenserule.delete # """ # crud = RuleCRUD(auth) # rule = await crud.get_by_out_biz_no(out_biz_no) # if not rule: # raise CustomException(msg="使用规则不存在") # try: # from alipay.aop.api.request.AlipayEbppInvoiceInstitutionExpenseruleDeleteRequest import ( # AlipayEbppInvoiceInstitutionExpenseruleDeleteRequest, # ) # from alipay.aop.api.domain.AlipayEbppInvoiceInstitutionExpenseruleDeleteModel import ( # AlipayEbppInvoiceInstitutionExpenseruleDeleteModel, # ) # from alipay.aop.api.response.AlipayEbppInvoiceInstitutionExpenseruleDeleteResponse import ( # AlipayEbppInvoiceInstitutionExpenseruleDeleteResponse, # ) # except ImportError: # raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖") # model = AlipayEbppInvoiceInstitutionExpenseruleDeleteModel() # model.institution_id = data.institution_id # model.standard_id_list = data.standard_id_list # model.enterprise_id = data.enterprise_id # request = AlipayEbppInvoiceInstitutionExpenseruleDeleteRequest() # request.biz_model = model # client = AlipayClient.get_client() # response = client.execute(request) # if not response: # raise CustomException(msg="删除使用规则失败: 无响应") # result = AlipayEbppInvoiceInstitutionExpenseruleDeleteResponse() # result.parse_response_content(response) # if not result.is_success(): # log.error(f"支付宝接口调用失败: {result.code} - {result.msg}") # raise CustomException(msg=f"删除使用规则失败: {result.msg}") # await crud.delete(id=rule.id) # return RuleOperationOutSchema( # out_biz_no=out_biz_no, # result=result.result, # ) # @classmethod # async def list_service( # cls, # auth: AuthSchema, # page_no: int = 1, # page_size: int = 20, # search: dict | None = None, # ) -> dict: # crud = RuleCRUD(auth) # offset = (page_no - 1) * page_size # return await crud.page( # offset=offset, # limit=page_size, # order_by=[{"id": "desc"}], # search=search or {}, # out_schema=RuleListOutSchema, # ) # @classmethod # async def detail_service( # cls, auth: AuthSchema, out_biz_no: str # ) -> RuleOutSchema: # crud = RuleCRUD(auth) # rule = await crud.get_by_out_biz_no(out_biz_no) # if not rule: # raise CustomException(msg="使用规则不存在") # return RuleOutSchema.model_validate(rule) # @classmethod # async def update_service( # cls, auth: AuthSchema, out_biz_no: str, data: RuleUpdateSchema # ) -> RuleOperationOutSchema: # crud = RuleCRUD(auth) # rule = await crud.get_by_out_biz_no(out_biz_no) # if not rule: # raise CustomException(msg="使用规则不存在") # update_data = data.model_dump(exclude_unset=True) # if update_data: # await crud.update(id=rule.id, data=update_data) # return RuleOperationOutSchema(out_biz_no=out_biz_no, rule_id=rule.rule_id)