"""Service layer for Alipay expense-control institutions, member scopes and issue rules."""

import asyncio

from app.api.v1.module_system.auth.schema import AuthSchema
from app.core.alipay import AlipayClient
from app.core.exceptions import CustomException
from app.core.logger import log
from app.plugin.module_payment.expense.institution.schema import InstitutionListOutSchema, InstitutionCreateSchema
from .crud import InstitutionCRUD
from .enums import InstitutionStatusEnum

from alipay.aop.api.request.AlipayEbppInvoiceInstitutionCreateRequest import (
    AlipayEbppInvoiceInstitutionCreateRequest,
)
from alipay.aop.api.domain.AlipayEbppInvoiceInstitutionCreateModel import (
    AlipayEbppInvoiceInstitutionCreateModel,
)
from alipay.aop.api.response.AlipayEbppInvoiceInstitutionCreateResponse import (
    AlipayEbppInvoiceInstitutionCreateResponse,
)
from alipay.aop.api.request.AlipayEbppInvoiceInstitutionPageinfoQueryRequest import (
    AlipayEbppInvoiceInstitutionPageinfoQueryRequest,
)
from alipay.aop.api.domain.AlipayEbppInvoiceInstitutionPageinfoQueryModel import (
    AlipayEbppInvoiceInstitutionPageinfoQueryModel,
)
from alipay.aop.api.response.AlipayEbppInvoiceInstitutionPageinfoQueryResponse import (
    AlipayEbppInvoiceInstitutionPageinfoQueryResponse,
)
from alipay.aop.api.request.AlipayEbppInvoiceInstitutionDetailinfoQueryRequest import (
    AlipayEbppInvoiceInstitutionDetailinfoQueryRequest,
)
from alipay.aop.api.domain.AlipayEbppInvoiceInstitutionDetailinfoQueryModel import (
    AlipayEbppInvoiceInstitutionDetailinfoQueryModel,
)
from alipay.aop.api.response.AlipayEbppInvoiceInstitutionDetailinfoQueryResponse import (
    AlipayEbppInvoiceInstitutionDetailinfoQueryResponse,
)
from alipay.aop.api.request.AlipayEbppInvoiceInstitutionDeleteRequest import (
    AlipayEbppInvoiceInstitutionDeleteRequest,
)
from alipay.aop.api.domain.AlipayEbppInvoiceInstitutionDeleteModel import (
    AlipayEbppInvoiceInstitutionDeleteModel,
)
from alipay.aop.api.response.AlipayEbppInvoiceInstitutionDeleteResponse import (
    AlipayEbppInvoiceInstitutionDeleteResponse,
)
from alipay.aop.api.request.AlipayEbppInvoiceInstitutionModifyRequest import (
    AlipayEbppInvoiceInstitutionModifyRequest,
)
from alipay.aop.api.domain.AlipayEbppInvoiceInstitutionModifyModel import (
    AlipayEbppInvoiceInstitutionModifyModel,
)
from alipay.aop.api.response.AlipayEbppInvoiceInstitutionModifyResponse import (
    AlipayEbppInvoiceInstitutionModifyResponse,
)


class InstitutionService:
    """Service layer for expense-control institutions."""

    @classmethod
    def _execute_alipay(cls, request):
        """Execute an Alipay request synchronously and return the raw response.

        This call blocks; async callers in this module run it through
        ``asyncio.to_thread`` so the event loop is not blocked.
        """
        client = AlipayClient.get_client()
        return client.execute(request)

    @classmethod
    async def create_institution_service(
        cls, auth: AuthSchema, data: AlipayEbppInvoiceInstitutionCreateModel
    ) -> AlipayEbppInvoiceInstitutionCreateResponse:
        """Create an institution on Alipay only (no chained follow-up steps).

        Calls: alipay.ebpp.invoice.institution.create

        Args:
            auth: Request auth context (not used by this method).
            data: Create model; ``enterprise_id`` must be set. ``currency`` is
                overwritten with ``'CNY'`` on the passed object.

        Returns:
            The parsed, successful create response.

        Raises:
            CustomException: If ``enterprise_id`` is None, the response is
                empty, or Alipay reports a failure.
        """
        if data.enterprise_id is None:
            raise CustomException(msg="创建费控制度失败: 企业ID不能为空")

        # The currency is always forced to CNY, whatever the caller supplied.
        data.currency = 'CNY'

        request = AlipayEbppInvoiceInstitutionCreateRequest()
        request.biz_model = data
        response = await asyncio.to_thread(cls._execute_alipay, request)
        if not response:
            raise CustomException(msg="创建费控制度失败: 无响应")

        result = AlipayEbppInvoiceInstitutionCreateResponse()
        result.parse_response_content(response)
        if not result.is_success():
            log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
            raise CustomException(msg=f"创建费控制度失败: {result.msg}")
        return result

    @classmethod
    async def _rollback_institution(cls, institution_id, enterprise_id) -> None:
        """Best-effort compensation: delete an institution that was just created on Alipay.

        Never raises; the outcome is only logged. Success is logged only when
        the delete response parses as successful.
        """
        try:
            rollback_model = AlipayEbppInvoiceInstitutionDeleteModel()
            rollback_model.institution_id = institution_id
            rollback_model.enterprise_id = enterprise_id
            req = AlipayEbppInvoiceInstitutionDeleteRequest()
            req.biz_model = rollback_model

            response = await asyncio.to_thread(cls._execute_alipay, req)
            if not response:
                log.error(f"回滚失败: 无响应 institution_id={institution_id}")
                return

            rollback_result = AlipayEbppInvoiceInstitutionDeleteResponse()
            rollback_result.parse_response_content(response)
            if rollback_result.is_success():
                log.info(f"回滚成功: 已删除 institution_id={institution_id}")
            else:
                log.error(f"回滚失败: {rollback_result.code} - {rollback_result.msg}")
        except Exception as rollback_err:
            # Compensation must not mask the original failure, so only log here.
            log.error(f"回滚失败: {rollback_err}")

    @classmethod
    async def _save_standards_locally(
        cls,
        auth: AuthSchema,
        institution_id,
        enterprise_id: str,
        standard_info_list: list,
        standard_id_info_list: list,
    ) -> None:
        """Insert one local usage-rule row per submitted standard (best effort).

        ``standard_id_info_list`` comes from the create response and is used to
        map each standard's ``outer_source_id`` to its ``standard_id``. A failed
        insert is logged as a warning and does not stop the remaining rows.
        """
        standard_id_map = {
            info.outer_source_id: info.standard_id
            for info in standard_id_info_list
            if hasattr(info, 'outer_source_id') and hasattr(info, 'standard_id')
        }
        tenant_id = auth.user.tenant_id if auth.user else 1

        for idx, std in enumerate(standard_info_list):
            condition_list = std.get("standard_condition_info_list", [])

            # The QUOTA_TOTAL condition's value is stored as the single limit;
            # if several parse successfully the last one wins.
            single_limit_val = None
            for cond in (condition_list or []):
                if cond.get("rule_factor") == "QUOTA_TOTAL":
                    try:
                        single_limit_val = float(cond.get("rule_value", 0))
                    except (ValueError, TypeError):
                        pass

            std_data = {
                "out_biz_no": std.get("outer_source_id", f"std_{institution_id}_{idx}"),
                "institution_id": institution_id,
                "rule_id": standard_id_map.get(std.get("outer_source_id", "")),
                "standard_name": std.get("standard_name"),
                "standard_desc": std.get("standard_desc"),
                "expense_type_sub_category": std.get("expense_type_sub_category", "DEFAULT"),
                "enterprise_id": enterprise_id,
                "tenant_id": tenant_id,
                "condition_info": condition_list,
                "single_limit": single_limit_val,
            }
            try:
                from app.plugin.module_payment.expense.rule.model import ExpenseRuleModel
                from sqlalchemy import insert

                stmt = insert(ExpenseRuleModel).values(**std_data)
                await auth.db.execute(stmt)
                await auth.db.flush()
            except Exception as e:
                log.warning(f"保存使用规则到本地失败: {e}")

    @classmethod
    async def _save_issue_rule_locally(
        cls,
        auth: AuthSchema,
        institution_id,
        enterprise_id: str,
        issuerule_data: dict,
        issue_rule_id,
    ) -> None:
        """Insert the local quota row for a newly created issue rule (best effort).

        The amount conversion happens inside the ``try`` so that a malformed
        amount is logged instead of failing a flow whose remote resources
        already exist.
        """
        from app.plugin.module_payment.expense.quota.model import QuotaModel

        try:
            from sqlalchemy import insert

            amount = float(issuerule_data.get("issue_amount_value", 0))
            quota_save_data = {
                "employee_id": "",
                "institution_id": institution_id,
                "out_biz_no": issuerule_data.get("outer_source_id", f"issue_{institution_id}"),
                "quota_id": issue_rule_id,
                "total_amount": amount,
                "available_amount": amount,
                "status": "QUOTA_ACTIVE",
                "enterprise_id": enterprise_id,
                "tenant_id": auth.user.tenant_id if auth.user else 1,
            }
            stmt = insert(QuotaModel).values(**quota_save_data)
            await auth.db.execute(stmt)
            await auth.db.flush()
        except Exception as e:
            log.warning(f"保存发放规则到本地失败: {e}")

    @classmethod
    async def create_institution_full_flow(
        cls,
        auth: AuthSchema,
        institution_model: AlipayEbppInvoiceInstitutionCreateModel,
        enterprise_id: str,
        scope_data: dict | None = None,
        issuerule_data: dict | None = None,
        raw_data: dict | None = None,
    ) -> dict:
        """Create an institution and run the full chained flow.

        Flow:
            1. institution.create -> obtain ``institution_id``
            2. scope.modify, when ``scope_data`` has an ``adapter_type`` other
               than ``"NONE"``
            3. issuerule.create, when ``issuerule_data`` is provided
            4. Save the institution to the local DB
            5. Save usage rules locally (best effort)
            6. Save the issue rule locally (best effort)

        If step 2 or 3 fails, the institution created in step 1 is deleted on
        Alipay as compensation and the original exception is re-raised.

        Args:
            auth: Request auth context; provides ``db`` and ``user``.
            institution_model: Alipay create model for step 1.
            enterprise_id: Enterprise the institution belongs to.
            scope_data: Optional member-scope settings.
            issuerule_data: Optional issue-rule settings.
            raw_data: Optional original request payload; supplies extra local
                fields and ``standard_info_list``.

        Returns:
            Dict with ``institution_id``, ``scope_modified`` and
            ``issue_rule_id``.

        Raises:
            CustomException: Propagated from the Alipay calls in steps 1-3.
        """
        # Step 1: create the institution.
        institution_result = await cls.create_institution_service(auth=auth, data=institution_model)
        institution_id = institution_result.institution_id

        scope_modified = False
        issue_rule_id = None
        try:
            # Step 2: set applicable members, if any.
            adapter_type = (scope_data or {}).get("adapter_type")
            if adapter_type and adapter_type != "NONE":
                await InstitutionScopeService.scope_modify_service(
                    auth=auth,
                    institution_id=institution_id,
                    data={
                        "enterprise_id": enterprise_id,
                        "adapter_type": adapter_type,
                        "owner_type": scope_data.get("owner_type"),
                        "add_owner_id_list": scope_data.get("add_owner_id_list"),
                    },
                )
                scope_modified = True
                log.info(f"成员设置成功: institution_id={institution_id}")

            # Step 3: create the automatic issue rule, if requested.
            if issuerule_data:
                issuerule_result = await IssueruleService.create_issuerule_service(
                    auth=auth,
                    institution_id=institution_id,
                    enterprise_id=enterprise_id,
                    quota_type=issuerule_data.get("quota_type", "CAP"),
                    issue_type=issuerule_data.get("issue_type", "ISSUE_MONTH"),
                    issue_amount_value=issuerule_data.get("issue_amount_value", "0"),
                    outer_source_id=issuerule_data.get("outer_source_id"),
                    issue_rule_name=issuerule_data.get("issue_rule_name"),
                    effective_period=issuerule_data.get("effective_period"),
                    invalid_mode=issuerule_data.get("invalid_mode", 0),
                    share_mode=issuerule_data.get("share_mode", 0),
                )
                issue_rule_id = issuerule_result.get("issue_rule_id")
                log.info(f"发放规则创建成功: institution_id={institution_id}, issue_rule_id={issue_rule_id}")
        except Exception as e:
            # A sub-step failed: delete the Alipay institution (compensating action).
            log.error(f"创建串联流程失败: {e},开始回滚 institution_id={institution_id}")
            await cls._rollback_institution(institution_id, enterprise_id)
            raise

        # Step 4: persist the institution locally.
        extra = raw_data or {}
        create_data = InstitutionCreateSchema(
            enterprise_id=enterprise_id,
            institution_id=institution_id,
            institution_name=getattr(institution_model, 'institution_name', None),
            institution_desc=getattr(institution_model, 'institution_desc', None),
            scene_type=getattr(institution_model, 'scene_type', None),
            expense_type=getattr(institution_model, 'expense_type', None),
            expense_sub_type=getattr(institution_model, 'expense_sub_type', None),
            status=InstitutionStatusEnum.INSTITUTION_CREATE.value,
            effective=getattr(institution_model, 'effective', None),
            effective_start_date=getattr(institution_model, 'effective_start_date', None),
            effective_end_date=getattr(institution_model, 'effective_end_date', None),
            consult_mode=getattr(institution_model, 'consult_mode', None),
            multi_employee_share_mode=getattr(institution_model, 'multi_employee_share_mode', None),
            currency=getattr(institution_model, 'currency', None),
            grant_mode=extra.get("grant_mode"),
            period_type=extra.get("period_type"),
            amount=extra.get("amount"),
            single_limit=extra.get("single_limit"),
            effective_time_type=extra.get("effective_time_type"),
            applicable_scope=extra.get("applicable_scope"),
        )
        create_data_dict = create_data.model_dump(exclude_unset=True)
        crud = InstitutionCRUD(auth)
        await crud.create(create_data_dict)

        # Step 5: persist usage rules locally.
        standard_info_list = extra.get("standard_info_list")
        standard_id_info_list = getattr(institution_result, 'standard_id_info_list', None)
        if standard_info_list and standard_id_info_list:
            await cls._save_standards_locally(
                auth, institution_id, enterprise_id, standard_info_list, standard_id_info_list
            )

        # Step 6: persist the issue rule locally.
        if issuerule_data and issue_rule_id:
            await cls._save_issue_rule_locally(
                auth, institution_id, enterprise_id, issuerule_data, issue_rule_id
            )

        return {
            "institution_id": institution_id,
            "scope_modified": scope_modified,
            "issue_rule_id": issue_rule_id,
        }

    @classmethod
    async def pageinfo_query_service(
        cls,
        auth: AuthSchema,
        enterprise_id: str,
        page_no: int = 1,
        page_size: int = 20,
        institution_name: str | None = None,
    ) -> dict:
        """Query the institution list from Alipay, falling back to the local DB.

        Calls: alipay.ebpp.invoice.institution.pageinfo.query

        Any exception, empty response or unsuccessful response from Alipay
        triggers the local-DB fallback.

        Returns:
            On Alipay success, a dict with ``page_no``, ``page_size``,
            ``total`` and ``list``; otherwise whatever
            ``InstitutionCRUD.page`` returns.
        """
        try:
            model = AlipayEbppInvoiceInstitutionPageinfoQueryModel()
            model.enterprise_id = enterprise_id
            model.page_num = page_no
            model.page_size = page_size
            if institution_name:
                model.institution_name = institution_name

            req = AlipayEbppInvoiceInstitutionPageinfoQueryRequest()
            req.biz_model = model
            response = await asyncio.to_thread(cls._execute_alipay, req)
            if response:
                result = AlipayEbppInvoiceInstitutionPageinfoQueryResponse()
                result.parse_response_content(response)
                if result.is_success():
                    return {
                        "page_no": getattr(result, 'page_num', page_no) or page_no,
                        "page_size": getattr(result, 'page_size', page_size) or page_size,
                        # NOTE(review): "total" is filled from total_page_count, which
                        # by its name is a page count rather than a record count -
                        # confirm what consumers expect.
                        "total": getattr(result, 'total_page_count', 0) or 0,
                        "list": getattr(result, 'institution_list', []) or [],
                    }
            log.warning("支付宝 pageinfo.query 失败,降级到本地DB")
        except Exception as e:
            log.warning(f"支付宝 pageinfo.query 异常: {e},降级到本地DB")

        # Fallback: query the local DB.
        crud = InstitutionCRUD(auth)
        search = {"enterprise_id": enterprise_id}
        if institution_name:
            search["institution_name"] = institution_name
        offset = (page_no - 1) * page_size
        return await crud.page(
            offset=offset,
            limit=page_size,
            order_by=[{"id": "desc"}],
            search=search,
            out_schema=InstitutionListOutSchema,
        )

    @classmethod
    async def detailinfo_query_service(
        cls,
        auth: AuthSchema,
        institution_id: str,
        enterprise_id: str,
    ) -> dict | None:
        """Query institution detail from Alipay and enrich it with local rules and quotas.

        Calls: alipay.ebpp.invoice.institution.detailinfo.query

        Falls back to the local DB when Alipay fails or yields no data.

        Returns:
            The detail dict, with ``rule_list`` / ``quota_list`` added when
            local rows exist, or None when neither Alipay nor the local DB has
            the institution.
        """
        result_dict = None
        try:
            model = AlipayEbppInvoiceInstitutionDetailinfoQueryModel()
            model.institution_id = institution_id
            model.enterprise_id = enterprise_id
            req = AlipayEbppInvoiceInstitutionDetailinfoQueryRequest()
            req.biz_model = model

            response = await asyncio.to_thread(cls._execute_alipay, req)
            if response:
                result = AlipayEbppInvoiceInstitutionDetailinfoQueryResponse()
                result.parse_response_content(response)
                if result.is_success():
                    result_dict = result.to_alipay_dict()
            if not result_dict:
                log.warning("支付宝 detailinfo.query 失败,降级到本地DB")
        except Exception as e:
            log.warning(f"支付宝 detailinfo.query 异常: {e},降级到本地DB")

        # Fallback: query the local DB.
        if not result_dict:
            crud = InstitutionCRUD(auth)
            obj = await crud.get(institution_id=institution_id, enterprise_id=enterprise_id)
            if obj:
                result_dict = InstitutionListOutSchema.model_validate(obj).model_dump()

        if not result_dict:
            return None

        # Enrich with local rules and quotas.
        from app.plugin.module_payment.expense.rule.model import ExpenseRuleModel
        from app.plugin.module_payment.expense.quota.model import QuotaModel
        from sqlalchemy import select

        # Usage rules.
        rule_stmt = select(ExpenseRuleModel).where(ExpenseRuleModel.institution_id == institution_id)
        rule_result = await auth.db.execute(rule_stmt)
        rules = rule_result.scalars().all()
        if rules:
            rule_list = []
            for r in rules:
                rule_item = {
                    "rule_id": r.rule_id,
                    "standard_name": r.standard_name,
                    "standard_desc": r.standard_desc,
                }
                # Optional fields are only included when present and truthy.
                if getattr(r, 'condition_info', None):
                    rule_item["condition_info"] = r.condition_info
                if getattr(r, 'single_limit', None):
                    rule_item["single_limit"] = float(r.single_limit)
                rule_list.append(rule_item)
            result_dict["rule_list"] = rule_list

        # Quotas.
        quota_stmt = select(QuotaModel).where(QuotaModel.institution_id == institution_id)
        quota_result = await auth.db.execute(quota_stmt)
        quotas = quota_result.scalars().all()
        if quotas:
            result_dict["quota_list"] = [
                {
                    "quota_id": q.quota_id,
                    "total_amount": float(q.total_amount) if q.total_amount else 0,
                    "available_amount": float(q.available_amount) if q.available_amount else 0,
                    "status": q.status,
                }
                for q in quotas
            ]

        return result_dict

    @classmethod
    async def list_service(
        cls,
        auth: AuthSchema,
        page_no: int = 1,
        page_size: int = 20,
        search: dict | None = None,
    ) -> dict:
        """List institutions.

        When ``search`` carries an ``enterprise_id`` the query goes through
        ``pageinfo_query_service`` (Alipay first, local fallback); otherwise
        the local DB is queried directly. The name filter is read from
        ``search["name"]`` or ``search["institution_name"]``.
        """
        filters = search or {}
        enterprise_id = filters.get("enterprise_id", "")
        institution_name = filters.get("name") or filters.get("institution_name")

        if enterprise_id:
            return await cls.pageinfo_query_service(
                auth=auth,
                enterprise_id=enterprise_id,
                page_no=page_no,
                page_size=page_size,
                institution_name=institution_name,
            )

        # Without an enterprise_id, query the local DB directly.
        crud = InstitutionCRUD(auth)
        offset = (page_no - 1) * page_size
        return await crud.page(
            offset=offset,
            limit=page_size,
            order_by=[{"id": "desc"}],
            search=filters,
            out_schema=InstitutionListOutSchema,
        )

    @classmethod
    async def delete_institution_service(
        cls, auth: AuthSchema, data: AlipayEbppInvoiceInstitutionDeleteModel
    ) -> AlipayEbppInvoiceInstitutionDeleteResponse:
        """Delete an institution on Alipay, then remove the local record.

        Calls: alipay.ebpp.invoice.institution.delete

        The local delete is best effort: a failure is logged as a warning and
        does not affect the returned result.

        Raises:
            CustomException: If the response is empty or Alipay reports a
                failure.
        """
        request = AlipayEbppInvoiceInstitutionDeleteRequest()
        request.biz_model = data
        response = await asyncio.to_thread(cls._execute_alipay, request)
        if not response:
            raise CustomException(msg="删除费控制度失败: 无响应")

        result = AlipayEbppInvoiceInstitutionDeleteResponse()
        result.parse_response_content(response)
        if not result.is_success():
            log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
            raise CustomException(msg=f"删除费控制度失败: {result.msg}")

        # Mirror the deletion in the local DB.
        try:
            crud = InstitutionCRUD(auth)
            institution_id = getattr(data, 'institution_id', None)
            if institution_id:
                obj = await crud.get(institution_id=institution_id)
                if obj:
                    await crud.delete(ids=[obj.id])
                    log.info(f"已删除本地记录: institution_id={institution_id}")
        except Exception as e:
            log.warning(f"删除本地记录失败(不影响支付宝侧): {e}")

        return result

    @classmethod
    async def _sync_standards_locally(
        cls, auth: AuthSchema, institution_id, enterprise_id, std_detail: dict
    ) -> None:
        """Apply deleted / added / modified usage rules from a modify request to the local DB.

        Exceptions propagate to the caller, which treats local sync as best
        effort.
        """
        from app.plugin.module_payment.expense.rule.model import ExpenseRuleModel
        from sqlalchemy import delete as sa_delete
        from sqlalchemy import insert as sa_insert
        from sqlalchemy import update as sa_update

        # Deleted rules.
        delete_ids = std_detail.get("delete_standard_id_list", [])
        if delete_ids:
            d_stmt = sa_delete(ExpenseRuleModel).where(
                ExpenseRuleModel.rule_id.in_(delete_ids)
            )
            await auth.db.execute(d_stmt)

        # Added rules.
        tenant_id = auth.user.tenant_id if auth.user else 1
        for std in std_detail.get("add_standard_list") or []:
            ins_data = {
                "out_biz_no": std.get("outer_source_id", f"std_{institution_id}"),
                "institution_id": institution_id,
                "rule_id": std.get("standard_id"),
                "standard_name": std.get("standard_name"),
                "standard_desc": std.get("standard_desc"),
                "expense_type_sub_category": std.get("expense_type_sub_category", "DEFAULT"),
                "enterprise_id": enterprise_id,
                "tenant_id": tenant_id,
            }
            await auth.db.execute(sa_insert(ExpenseRuleModel).values(**ins_data))

        # Modified rules.
        for std in std_detail.get("modify_standard_list") or []:
            # IDs may arrive wrapped in literal double quotes; tolerate None / non-str.
            std_id = str(std.get("standard_id") or "").strip('"')
            update_std = {}
            if std.get("standard_name"):
                update_std["standard_name"] = std["standard_name"]
            if std.get("standard_desc"):
                update_std["standard_desc"] = std["standard_desc"]
            if update_std and std_id:
                u_stmt = sa_update(ExpenseRuleModel).where(
                    ExpenseRuleModel.rule_id == std_id
                ).values(**update_std)
                await auth.db.execute(u_stmt)

        await auth.db.flush()
        log.info(f"已同步使用规则: institution_id={institution_id}")

    @classmethod
    async def _sync_issue_rules_locally(
        cls, auth: AuthSchema, institution_id, enterprise_id, issue_detail: dict
    ) -> None:
        """Apply deleted / added / modified issue rules from a modify request to the local quota table.

        Exceptions propagate to the caller, which treats local sync as best
        effort.
        """
        from app.plugin.module_payment.expense.quota.model import QuotaModel
        # All three constructors are imported here so that this branch works
        # even when the request carries no usage-rule changes.
        from sqlalchemy import delete as sa_delete
        from sqlalchemy import insert as sa_insert
        from sqlalchemy import update as sa_update

        # Deleted issue rules.
        delete_ids = issue_detail.get("delete_issue_rule_id_list", [])
        if delete_ids:
            d_stmt = sa_delete(QuotaModel).where(
                QuotaModel.quota_id.in_(delete_ids)
            )
            await auth.db.execute(d_stmt)

        # Added issue rules.
        tenant_id = auth.user.tenant_id if auth.user else 1
        for rule in issue_detail.get("add_issue_rule_list") or []:
            amount = float(rule.get("issue_amount_value", 0))
            ins_data = {
                "employee_id": "",
                "institution_id": institution_id,
                "out_biz_no": rule.get("outer_source_id", f"issue_{institution_id}"),
                "quota_id": rule.get("issue_rule_id"),
                "total_amount": amount,
                "available_amount": amount,
                "status": "QUOTA_ACTIVE",
                "enterprise_id": enterprise_id,
                "tenant_id": tenant_id,
            }
            await auth.db.execute(sa_insert(QuotaModel).values(**ins_data))

        # Modified issue rules: accept a single dict or a list of dicts.
        modify_rules = issue_detail.get("modify_issue_rule_list") or []
        if isinstance(modify_rules, dict):
            modify_rules = [modify_rules]
        for modify_rule in modify_rules:
            # IDs may arrive wrapped in literal double quotes; tolerate None / non-str.
            q_id = str(modify_rule.get("issue_rule_id") or "").strip('"')
            if not q_id:
                continue
            update_q = {}
            if modify_rule.get("issue_amount_value"):
                new_amount = float(modify_rule["issue_amount_value"])
                update_q["total_amount"] = new_amount
                update_q["available_amount"] = new_amount
            if modify_rule.get("issue_rule_name"):
                # NOTE(review): writes the rule name to a "standard_name" column on
                # QuotaModel - confirm that column exists on the quota table.
                update_q["standard_name"] = modify_rule["issue_rule_name"]
            if update_q:
                u_stmt = sa_update(QuotaModel).where(
                    QuotaModel.quota_id == q_id
                ).values(**update_q)
                await auth.db.execute(u_stmt)

        await auth.db.flush()
        log.info(f"已同步发放规则: institution_id={institution_id}")

    @classmethod
    async def modify_institution_service(
        cls, auth: AuthSchema, data: AlipayEbppInvoiceInstitutionModifyModel, raw_data: dict | None = None
    ) -> AlipayEbppInvoiceInstitutionModifyResponse:
        """Modify an institution on Alipay, then sync the local DB.

        Calls: alipay.ebpp.invoice.institution.modify

        After Alipay succeeds, the following are synced locally (best effort;
        a failure is logged as a warning and the Alipay result is still
        returned):
            - institution basic info
            - usage rules (``raw_data["modify_standard_detail_info"]``)
            - issue rules / quotas (``raw_data["modify_issue_rule_detail_info"]``)

        Raises:
            CustomException: If ``institution_id`` is None, the response is
                empty, or Alipay reports a failure.
        """
        if data.institution_id is None:
            raise CustomException(msg="编辑费控制度失败: 制度ID不能为空")

        request = AlipayEbppInvoiceInstitutionModifyRequest()
        request.biz_model = data
        response = await asyncio.to_thread(cls._execute_alipay, request)
        if not response:
            raise CustomException(msg="编辑费控制度失败: 无响应")

        result = AlipayEbppInvoiceInstitutionModifyResponse()
        result.parse_response_content(response)
        if not result.is_success():
            log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
            raise CustomException(msg=f"编辑费控制度失败: {result.msg}")

        # Sync the local database.
        institution_id = getattr(data, 'institution_id', None)
        if not institution_id:
            return result

        try:
            crud = InstitutionCRUD(auth)
            update_data = {}
            for field in ('institution_name', 'institution_desc'):
                value = getattr(data, field, None)
                if value:
                    update_data[field] = value

            effective = getattr(data, 'effective', None)
            if effective is not None:
                update_data['effective'] = effective
                # The local status follows the effective flag: "1" means effective.
                update_data['status'] = (
                    InstitutionStatusEnum.INSTITUTION_EFFECTIVE.value
                    if effective == "1"
                    else InstitutionStatusEnum.INSTITUTION_INVALID.value
                )

            for field in ('effective_start_date', 'effective_end_date'):
                value = getattr(data, field, None)
                if value:
                    update_data[field] = value

            if update_data:
                await crud.update_by_institution_id(institution_id, update_data)
                log.info(f"已更新本地制度: institution_id={institution_id}")

            payload = raw_data or {}
            enterprise_id = payload.get("enterprise_id", "")

            # Sync usage rules (modify_standard_detail_info).
            std_detail = payload.get("modify_standard_detail_info") or {}
            if std_detail:
                await cls._sync_standards_locally(auth, institution_id, enterprise_id, std_detail)

            # Sync issue rules (modify_issue_rule_detail_info).
            issue_detail = payload.get("modify_issue_rule_detail_info") or {}
            if issue_detail:
                await cls._sync_issue_rules_locally(auth, institution_id, enterprise_id, issue_detail)

        except Exception as e:
            log.warning(f"本地同步失败(不影响支付宝侧): {e}")

        return result


class InstitutionScopeService:
    """Service layer for institution member scopes."""

    @classmethod
    def _execute_alipay(cls, request):
        """Execute an Alipay request synchronously and return the raw response."""
        client = AlipayClient.get_client()
        return client.execute(request)

    @classmethod
    async def scope_modify_service(
        cls,
        auth: AuthSchema,
        institution_id: str,
        data: dict,
    ) -> dict:
        """Set or modify the member scope of an institution.

        Calls: alipay.ebpp.invoice.institution.scope.modify

        Args:
            auth: Request auth context (not used by this method).
            institution_id: Target institution.
            data: Reads ``enterprise_id`` (default ``""``), ``adapter_type``
                (default ``"EMPLOYEE_ALL"``) and the optional ``owner_type``,
                ``add_owner_id_list`` and ``delete_owner_id_list``.

        Returns:
            ``{"result": True}`` on success.

        Raises:
            CustomException: If the SDK classes cannot be imported, the
                response is empty, or Alipay reports a failure.
        """
        try:
            from alipay.aop.api.request.AlipayEbppInvoiceInstitutionScopeModifyRequest import (
                AlipayEbppInvoiceInstitutionScopeModifyRequest,
            )
            from alipay.aop.api.domain.AlipayEbppInvoiceInstitutionScopeModifyModel import (
                AlipayEbppInvoiceInstitutionScopeModifyModel,
            )
            from alipay.aop.api.response.AlipayEbppInvoiceInstitutionScopeModifyResponse import (
                AlipayEbppInvoiceInstitutionScopeModifyResponse,
            )
        except ImportError as err:
            raise CustomException(
                msg="支付宝SDK未正确安装(alipay-ebpp-invoice-institution-scope-modify)"
            ) from err

        model = AlipayEbppInvoiceInstitutionScopeModifyModel()
        model.institution_id = institution_id
        model.enterprise_id = data.get("enterprise_id", "")
        model.adapter_type = data.get("adapter_type", "EMPLOYEE_ALL")
        # Optional fields are only set when the caller supplied a truthy value.
        for field in ("owner_type", "add_owner_id_list", "delete_owner_id_list"):
            if data.get(field):
                setattr(model, field, data[field])

        request = AlipayEbppInvoiceInstitutionScopeModifyRequest()
        request.biz_model = model
        response = await asyncio.to_thread(cls._execute_alipay, request)
        if not response:
            raise CustomException(msg="设置制度成员失败: 无响应")

        result = AlipayEbppInvoiceInstitutionScopeModifyResponse()
        result.parse_response_content(response)
        if not result.is_success():
            log.error(f"设置制度成员失败: {result.code} - {result.msg}")
            raise CustomException(msg=f"设置制度成员失败: {result.msg}")

        return {"result": True}

    @classmethod
    async def scopepageinfo_query_service(
        cls,
        auth: AuthSchema,
        institution_id: str,
        enterprise_id: str | None = None,
        page_num: int = 1,
        page_size: int = 20,
        owner_type: str | None = None,
    ) -> dict:
        """Query the member scope of an institution.

        Calls: alipay.ebpp.invoice.institution.scopepageinfo.query

        Returns:
            Dict with ``page_num``, ``page_size``, ``total_page_count``,
            ``adapter_type``, ``owner_id_list``, ``owner_open_id_list`` and
            ``scope_info_list``. The last holds one entry repeating the
            adapter type and owner lists when ``adapter_type`` is truthy, and
            is empty otherwise.

        Raises:
            CustomException: If the SDK classes cannot be imported, the
                response is empty, or Alipay reports a failure.
        """
        try:
            from alipay.aop.api.request.AlipayEbppInvoiceInstitutionScopepageinfoQueryRequest import (
                AlipayEbppInvoiceInstitutionScopepageinfoQueryRequest,
            )
            from alipay.aop.api.domain.AlipayEbppInvoiceInstitutionScopepageinfoQueryModel import (
                AlipayEbppInvoiceInstitutionScopepageinfoQueryModel,
            )
            from alipay.aop.api.response.AlipayEbppInvoiceInstitutionScopepageinfoQueryResponse import (
                AlipayEbppInvoiceInstitutionScopepageinfoQueryResponse,
            )
        except ImportError as err:
            raise CustomException(
                msg="支付宝SDK未正确安装(alipay-ebpp-invoice-institution-scopepageinfo-query)"
            ) from err

        model = AlipayEbppInvoiceInstitutionScopepageinfoQueryModel()
        model.institution_id = institution_id
        model.page_num = page_num
        model.page_size = page_size
        if enterprise_id:
            model.enterprise_id = enterprise_id
        if owner_type:
            model.owner_type = owner_type

        request = AlipayEbppInvoiceInstitutionScopepageinfoQueryRequest()
        request.biz_model = model
        response = await asyncio.to_thread(cls._execute_alipay, request)
        if not response:
            raise CustomException(msg="查询制度成员失败: 无响应")

        result = AlipayEbppInvoiceInstitutionScopepageinfoQueryResponse()
        result.parse_response_content(response)
        if not result.is_success():
            log.error(f"查询制度成员失败: {result.code} - {result.msg}")
            raise CustomException(msg=f"查询制度成员失败: {result.msg}")

        adapter_type = getattr(result, 'adapter_type', None)
        owner_id_list = getattr(result, 'owner_id_list', []) or []
        # NOTE(review): the attribute is read as 'onwer_open_id_list' (sic) - confirm
        # this matches the SDK response attribute before "fixing" the spelling.
        owner_open_id_list = getattr(result, 'onwer_open_id_list', []) or []

        scope_info_list = []
        if adapter_type:
            scope_info_list.append(
                {
                    "adapter_type": adapter_type,
                    "owner_id_list": owner_id_list,
                    "owner_open_id_list": owner_open_id_list,
                }
            )

        return {
            "page_num": getattr(result, 'page_num', page_num) or page_num,
            "page_size": getattr(result, 'page_size', page_size) or page_size,
            "total_page_count": getattr(result, 'total_page_count', 0) or 0,
            "adapter_type": adapter_type,
            "owner_id_list": owner_id_list,
            "owner_open_id_list": owner_open_id_list,
            "scope_info_list": scope_info_list,
        }


class IssueruleService:
    """Service layer for automatic quota issue rules."""

    # Maps period keywords to Alipay issue_type values.
    ISSUE_TYPE_MAP = {
        "daily": "ISSUE_DAY",
        "weekly": "ISSUE_WEEK",
        "monthly": "ISSUE_MONTH",
        "quarterly": "ISSUE_QUARTER",
        "yearly": "ISSUE_YEAR",
    }

    @classmethod
    def _execute_alipay(cls, request):
        """Execute an Alipay request synchronously and return the raw response."""
        client = AlipayClient.get_client()
        return client.execute(request)

    @classmethod
    async def create_issuerule_service(
        cls,
        auth: AuthSchema,
        institution_id: str,
        enterprise_id: str,
        quota_type: str,
        issue_type: str,
        issue_amount_value: str,
        outer_source_id: str | None = None,
        issue_rule_name: str | None = None,
        effective_period: str | None = None,
        invalid_mode: int | None = None,
        share_mode: int | None = None,
    ) -> dict:
        """Create an automatic quota issue rule for an institution.

        Calls: alipay.ebpp.invoice.issuerule.create

        Constraints enforced before the call:
            - ``quota_type == "CAP"`` requires ``invalid_mode`` to be 1 when given.
            - ``quota_type == "COUNT"`` requires ``share_mode`` to be 0 when given.

        Returns:
            ``{"issue_rule_id": ...}`` taken from the response (None if the
            response has no such attribute).

        Raises:
            CustomException: If the SDK classes cannot be imported, a
                constraint is violated, the response is empty, or Alipay
                reports a failure.
        """
        try:
            from alipay.aop.api.request.AlipayEbppInvoiceIssueruleCreateRequest import (
                AlipayEbppInvoiceIssueruleCreateRequest,
            )
            from alipay.aop.api.domain.AlipayEbppInvoiceIssueruleCreateModel import (
                AlipayEbppInvoiceIssueruleCreateModel,
            )
            from alipay.aop.api.response.AlipayEbppInvoiceIssueruleCreateResponse import (
                AlipayEbppInvoiceIssueruleCreateResponse,
            )
        except ImportError as err:
            raise CustomException(
                msg="支付宝SDK未正确安装(alipay-ebpp-invoice-issuerule-create)"
            ) from err

        # Parameter constraint validation.
        if quota_type == "CAP" and invalid_mode is not None and invalid_mode != 1:
            raise CustomException(msg="余额类型(CAP)的发放规则必须为可累计(invalid_mode=1)")
        if quota_type == "COUNT" and share_mode is not None and share_mode != 0:
            raise CustomException(msg="次卡类型(COUNT)的发放规则不可转赠(share_mode=0)")

        model = AlipayEbppInvoiceIssueruleCreateModel()
        model.target_type = "INSTITUTION"
        model.target_id = institution_id
        model.quota_type = quota_type
        model.issue_type = issue_type
        model.issue_amount_value = issue_amount_value
        model.enterprise_id = enterprise_id
        if outer_source_id:
            model.outer_source_id = outer_source_id
        if issue_rule_name:
            model.issue_rule_name = issue_rule_name
        if effective_period:
            model.effective_period = effective_period
        # The two modes may legitimately be 0, so test against None rather than truthiness.
        if invalid_mode is not None:
            model.invalid_mode = invalid_mode
        if share_mode is not None:
            model.share_mode = share_mode

        request = AlipayEbppInvoiceIssueruleCreateRequest()
        request.biz_model = model
        response = await asyncio.to_thread(cls._execute_alipay, request)
        if not response:
            raise CustomException(msg="创建发放规则失败: 无响应")

        result = AlipayEbppInvoiceIssueruleCreateResponse()
        result.parse_response_content(response)
        if not result.is_success():
            log.error(f"创建发放规则失败: {result.code} - {result.msg}")
            raise CustomException(msg=f"创建发放规则失败: {result.msg}")

        return {
            "issue_rule_id": getattr(result, 'issue_rule_id', None),
        }

    @classmethod
    async def modify_issuerule_service(
        cls,
        auth: AuthSchema,
        institution_id: str,
        issue_rule_id: str,
        enterprise_id: str,
        quota_type: str | None = None,
        issue_type: str | None = None,
        issue_amount_value: str | None = None,
        issue_rule_name: str | None = None,
        effective: str | None = None,
        effective_period: str | None = None,
        invalid_mode: int | None = None,
        share_mode: int | None = None,
    ) -> dict:
        """Modify an automatic quota issue rule.

        Calls: alipay.ebpp.invoice.issuerule.modify

        The request always uses ``action="MODIFY_BASIC_INFO"``; optional
        fields are only sent when supplied.

        Returns:
            ``{"result": True}`` on success.

        Raises:
            CustomException: If the SDK classes cannot be imported, the
                response is empty, or Alipay reports a failure.
        """
        try:
            from alipay.aop.api.request.AlipayEbppInvoiceIssueruleModifyRequest import (
                AlipayEbppInvoiceIssueruleModifyRequest,
            )
            from alipay.aop.api.domain.AlipayEbppInvoiceIssueruleModifyModel import (
                AlipayEbppInvoiceIssueruleModifyModel,
            )
            from alipay.aop.api.response.AlipayEbppInvoiceIssueruleModifyResponse import (
                AlipayEbppInvoiceIssueruleModifyResponse,
            )
        except ImportError as err:
            raise CustomException(
                msg="支付宝SDK未正确安装(alipay-ebpp-invoice-issuerule-modify)"
            ) from err

        model = AlipayEbppInvoiceIssueruleModifyModel()
        model.target_type = "INSTITUTION"
        model.target_id = institution_id
        model.issue_rule_id = issue_rule_id
        model.action = "MODIFY_BASIC_INFO"
        model.enterprise_id = enterprise_id
        if issue_rule_name:
            model.issue_rule_name = issue_rule_name
        if quota_type:
            model.quota_type = quota_type
        if issue_type:
            model.issue_type = issue_type
        if issue_amount_value:
            model.issue_amount_value = issue_amount_value
        if effective is not None:
            model.effective = effective
        if effective_period:
            model.effective_period = effective_period
        if invalid_mode is not None:
            model.invalid_mode = invalid_mode
        if share_mode is not None:
            model.share_mode = share_mode

        request = AlipayEbppInvoiceIssueruleModifyRequest()
        request.biz_model = model
        response = await asyncio.to_thread(cls._execute_alipay, request)
        if not response:
            raise CustomException(msg="编辑发放规则失败: 无响应")

        result = AlipayEbppInvoiceIssueruleModifyResponse()
        result.parse_response_content(response)
        if not result.is_success():
            log.error(f"编辑发放规则失败: {result.code} - {result.msg}")
            raise CustomException(msg=f"编辑发放规则失败: {result.msg}")

        return {"result": True}

    @classmethod
    async def delete_issuerule_service(
        cls,
        auth: AuthSchema,
        institution_id: str,
        issue_rule_id_list: list[str],
        enterprise_id: str,
    ) -> dict:
        """Delete automatic quota issue rules.

        Calls: alipay.ebpp.invoice.issuerule.delete

        Returns:
            ``{"result": True}`` on success.

        Raises:
            CustomException: If the SDK classes cannot be imported, the
                response is empty, or Alipay reports a failure.
        """
        try:
            from alipay.aop.api.request.AlipayEbppInvoiceIssueruleDeleteRequest import (
                AlipayEbppInvoiceIssueruleDeleteRequest,
            )
            from alipay.aop.api.domain.AlipayEbppInvoiceIssueruleDeleteModel import (
                AlipayEbppInvoiceIssueruleDeleteModel,
            )
            from alipay.aop.api.response.AlipayEbppInvoiceIssueruleDeleteResponse import (
                AlipayEbppInvoiceIssueruleDeleteResponse,
            )
        except ImportError as err:
            raise CustomException(
                msg="支付宝SDK未正确安装(alipay-ebpp-invoice-issuerule-delete)"
            ) from err

        model = AlipayEbppInvoiceIssueruleDeleteModel()
        model.target_type = "INSTITUTION"
        model.target_id = institution_id
        model.issue_rule_id_list = issue_rule_id_list
        model.enterprise_id = enterprise_id

        request = AlipayEbppInvoiceIssueruleDeleteRequest()
        request.biz_model = model
        response = await asyncio.to_thread(cls._execute_alipay, request)
        if not response:
            raise CustomException(msg="删除发放规则失败: 无响应")

        result = AlipayEbppInvoiceIssueruleDeleteResponse()
        result.parse_response_content(response)
        if not result.is_success():
            log.error(f"删除发放规则失败: {result.code} - {result.msg}")
            raise CustomException(msg=f"删除发放规则失败: {result.msg}")

        return {"result": True}