| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347 |
- import asyncio
- from datetime import datetime
- from decimal import Decimal
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.core.alipay import AlipayClient
- from app.core.exceptions import CustomException
- from app.core.logger import log
- from app.plugin.module_payment.expense.institution.schema import InstitutionListOutSchema, InstitutionCreateSchema
- from .crud import InstitutionCRUD
- from .enums import InstitutionStatusEnum
- from alipay.aop.api.request.AlipayEbppInvoiceInstitutionCreateRequest import (
- AlipayEbppInvoiceInstitutionCreateRequest,
- )
- from alipay.aop.api.domain.AlipayEbppInvoiceInstitutionCreateModel import (
- AlipayEbppInvoiceInstitutionCreateModel,
- )
- from alipay.aop.api.response.AlipayEbppInvoiceInstitutionCreateResponse import (
- AlipayEbppInvoiceInstitutionCreateResponse,
- )
- from alipay.aop.api.request.AlipayEbppInvoiceInstitutionPageinfoQueryRequest import (
- AlipayEbppInvoiceInstitutionPageinfoQueryRequest,
- )
- from alipay.aop.api.domain.AlipayEbppInvoiceInstitutionPageinfoQueryModel import (
- AlipayEbppInvoiceInstitutionPageinfoQueryModel,
- )
- from alipay.aop.api.response.AlipayEbppInvoiceInstitutionPageinfoQueryResponse import (
- AlipayEbppInvoiceInstitutionPageinfoQueryResponse,
- )
- from alipay.aop.api.request.AlipayEbppInvoiceInstitutionDetailinfoQueryRequest import (
- AlipayEbppInvoiceInstitutionDetailinfoQueryRequest,
- )
- from alipay.aop.api.domain.AlipayEbppInvoiceInstitutionDetailinfoQueryModel import (
- AlipayEbppInvoiceInstitutionDetailinfoQueryModel,
- )
- from alipay.aop.api.response.AlipayEbppInvoiceInstitutionDetailinfoQueryResponse import (
- AlipayEbppInvoiceInstitutionDetailinfoQueryResponse,
- )
- from alipay.aop.api.request.AlipayEbppInvoiceInstitutionDeleteRequest import (
- AlipayEbppInvoiceInstitutionDeleteRequest,
- )
- from alipay.aop.api.domain.AlipayEbppInvoiceInstitutionDeleteModel import (
- AlipayEbppInvoiceInstitutionDeleteModel,
- )
- from alipay.aop.api.response.AlipayEbppInvoiceInstitutionDeleteResponse import (
- AlipayEbppInvoiceInstitutionDeleteResponse,
- )
- from alipay.aop.api.request.AlipayEbppInvoiceInstitutionModifyRequest import (
- AlipayEbppInvoiceInstitutionModifyRequest,
- )
- from alipay.aop.api.domain.AlipayEbppInvoiceInstitutionModifyModel import (
- AlipayEbppInvoiceInstitutionModifyModel,
- )
- from alipay.aop.api.response.AlipayEbppInvoiceInstitutionModifyResponse import (
- AlipayEbppInvoiceInstitutionModifyResponse,
- )
- class InstitutionService:
- """费控制度服务层"""
- @classmethod
- def _execute_alipay(cls, request):
- """同步执行支付宝调用(通过线程池避免阻塞事件循环)"""
- client = AlipayClient.get_client()
- return client.execute(request)
- @classmethod
- async def create_institution_service(
- cls, auth: AuthSchema, data: AlipayEbppInvoiceInstitutionCreateModel
- ) -> AlipayEbppInvoiceInstitutionCreateResponse:
- """
- 创建费控制度(仅调 institution.create,不包含串联流程)
- 调用: alipay.ebpp.invoice.institution.create
- """
- if data.enterprise_id is None:
- raise CustomException(msg="创建费控制度失败: 企业ID不能为空")
- data.currency = 'CNY'
- request = AlipayEbppInvoiceInstitutionCreateRequest()
- request.biz_model = data
- response = await asyncio.to_thread(cls._execute_alipay, request)
- if not response:
- raise CustomException(msg="创建费控制度失败: 无响应")
- result = AlipayEbppInvoiceInstitutionCreateResponse()
- result.parse_response_content(response)
- if not result.is_success():
- log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
- raise CustomException(msg=f"创建费控制度失败: {result.msg}")
- return result
- @classmethod
- async def create_institution_full_flow(
- cls,
- auth: AuthSchema,
- institution_model: AlipayEbppInvoiceInstitutionCreateModel,
- enterprise_id: str,
- scope_data: dict | None = None,
- issuerule_data: dict | None = None,
- raw_data: dict | None = None,
- ) -> dict:
- """
- 创建费控制度(完整串联流程)
- 流程:
- 1. institution.create → 获取 institution_id
- 2. scope.modify ← 如有适用成员数据(scope_data)
- 3. issuerule.create ← 如为"按固定周期发放"(issuerule_data)
- 4. 保存到本地DB(制度 + 使用规则 + 发放规则)
- """
- # 第1步:创建制度
- institution_result = await cls.create_institution_service(auth=auth, data=institution_model)
- institution_id = institution_result.institution_id
- try:
- # 第2步:设置适用成员(如有)
- scope_modified = False
- if scope_data and scope_data.get("adapter_type") and scope_data.get("adapter_type") != "NONE":
- await InstitutionScopeService.scope_modify_service(
- auth=auth,
- institution_id=institution_id,
- data={
- "enterprise_id": enterprise_id,
- "adapter_type": scope_data["adapter_type"],
- "owner_type": scope_data.get("owner_type"),
- "add_owner_id_list": scope_data.get("add_owner_id_list"),
- },
- )
- scope_modified = True
- log.info(f"成员设置成功: institution_id={institution_id}")
- # 第3步:创建自动发放规则(如为"按固定周期发放")
- issue_rule_id = None
- if issuerule_data:
- issuerule_result = await IssueruleService.create_issuerule_service(
- auth=auth,
- institution_id=institution_id,
- enterprise_id=enterprise_id,
- quota_type=issuerule_data.get("quota_type", "CAP"),
- issue_type=issuerule_data.get("issue_type", "ISSUE_MONTH"),
- issue_amount_value=issuerule_data.get("issue_amount_value", "0"),
- outer_source_id=issuerule_data.get("outer_source_id"),
- issue_rule_name=issuerule_data.get("issue_rule_name"),
- effective_period=issuerule_data.get("effective_period"),
- invalid_mode=issuerule_data.get("invalid_mode", 0),
- share_mode=issuerule_data.get("share_mode", 0),
- )
- issue_rule_id = issuerule_result.get("issue_rule_id")
- log.info(f"发放规则创建成功: institution_id={institution_id}, issue_rule_id={issue_rule_id}")
- except Exception as e:
- # 子步骤失败:删除已创建的支付宝制度(补偿事务)
- log.error(f"创建串联流程失败: {e},开始回滚 institution_id={institution_id}")
- try:
- from alipay.aop.api.request.AlipayEbppInvoiceInstitutionDeleteRequest import (
- AlipayEbppInvoiceInstitutionDeleteRequest,
- )
- from alipay.aop.api.domain.AlipayEbppInvoiceInstitutionDeleteModel import (
- AlipayEbppInvoiceInstitutionDeleteModel,
- )
- rollback_model = AlipayEbppInvoiceInstitutionDeleteModel()
- rollback_model.institution_id = institution_id
- rollback_model.enterprise_id = enterprise_id
- req = AlipayEbppInvoiceInstitutionDeleteRequest()
- req.biz_model = rollback_model
- await asyncio.to_thread(cls._execute_alipay, req)
- log.info(f"回滚成功: 已删除 institution_id={institution_id}")
- except Exception as rollback_err:
- log.error(f"回滚失败: {rollback_err}")
- raise
- # 第4步:保存到本地DB
- create_data = InstitutionCreateSchema(
- enterprise_id=enterprise_id,
- institution_id=institution_id,
- institution_name=getattr(institution_model, 'institution_name', None),
- institution_desc=getattr(institution_model, 'institution_desc', None),
- scene_type=getattr(institution_model, 'scene_type', None),
- expense_type=getattr(institution_model, 'expense_type', None),
- expense_sub_type=getattr(institution_model, 'expense_sub_type', None),
- status=InstitutionStatusEnum.INSTITUTION_CREATE.value,
- effective=getattr(institution_model, 'effective', None),
- effective_start_date=getattr(institution_model, 'effective_start_date', None),
- effective_end_date=getattr(institution_model, 'effective_end_date', None),
- consult_mode=getattr(institution_model, 'consult_mode', None),
- multi_employee_share_mode=getattr(institution_model, 'multi_employee_share_mode', None),
- currency=getattr(institution_model, 'currency', None),
- grant_mode=(raw_data or {}).get("grant_mode"),
- period_type=(raw_data or {}).get("period_type"),
- amount=(raw_data or {}).get("amount"),
- single_limit=(raw_data or {}).get("single_limit"),
- effective_time_type=(raw_data or {}).get("effective_time_type"),
- applicable_scope=(raw_data or {}).get("applicable_scope"),
- )
- create_data_dict = create_data.model_dump(exclude_unset=True)
- crud = InstitutionCRUD(auth)
- await crud.create(create_data_dict)
- # 第5步:保存使用规则到本地
- if raw_data and raw_data.get("standard_info_list") and hasattr(institution_result, 'standard_id_info_list') and institution_result.standard_id_info_list:
- from app.plugin.module_payment.expense.rule.crud import RuleCRUD
- from app.plugin.module_payment.expense.rule.service import RuleService
- standard_id_map = {}
- for info in institution_result.standard_id_info_list:
- if hasattr(info, 'outer_source_id') and hasattr(info, 'standard_id'):
- standard_id_map[info.outer_source_id] = info.standard_id
- for idx, std in enumerate(raw_data["standard_info_list"]):
- condition_list = std.get("standard_condition_info_list", [])
- single_limit_val = None
- for cond in (condition_list or []):
- if cond.get("rule_factor") == "QUOTA_TOTAL":
- try:
- single_limit_val = float(cond.get("rule_value", 0))
- except (ValueError, TypeError):
- pass
- std_data = {
- "out_biz_no": std.get("outer_source_id", f"std_{institution_id}_{idx}"),
- "institution_id": institution_id,
- "rule_id": standard_id_map.get(std.get("outer_source_id", "")),
- "standard_name": std.get("standard_name"),
- "standard_desc": std.get("standard_desc"),
- "expense_type_sub_category": std.get("expense_type_sub_category", "DEFAULT"),
- "enterprise_id": enterprise_id,
- "tenant_id": auth.user.tenant_id if auth.user else 1,
- "condition_info": condition_list,
- "single_limit": single_limit_val,
- }
- try:
- from app.plugin.module_payment.expense.rule.model import ExpenseRuleModel
- from sqlalchemy import insert
- stmt = insert(ExpenseRuleModel).values(**std_data)
- await auth.db.execute(stmt)
- await auth.db.flush()
- except Exception as e:
- log.warning(f"保存使用规则到本地失败: {e}")
- # 第6步:按适用范围创建员工额度记录
- if scope_data and scope_data.get("adapter_type") and scope_data.get("adapter_type") != "NONE":
- try:
- await cls._create_institution_quotas(
- auth=auth,
- institution_id=institution_id,
- enterprise_id=enterprise_id,
- scope_data=scope_data,
- raw_data=raw_data,
- issue_rule_id=issue_rule_id,
- )
- except Exception as e:
- log.warning(f"创建员工额度记录失败(不影响支付宝侧): {e}")
- return {
- "institution_id": institution_id,
- "scope_modified": scope_modified,
- "issue_rule_id": issue_rule_id,
- }
- @classmethod
- async def _create_institution_quotas(
- cls,
- auth: AuthSchema,
- institution_id: str,
- enterprise_id: str,
- scope_data: dict,
- raw_data: dict | None = None,
- issue_rule_id: str | None = None,
- ):
- """按适用范围创建员工额度记录
- 规则:
- - 定额发放 + 制度在有效期内:total=定额值, available=定额值, ACTIVE
- - 定额发放 + 制度未生效/已过期:total=0, available=0, PENDING
- - 手工发放:total=0, available=0, PENDING
- """
- from app.plugin.module_payment.expense.quota.model import QuotaModel
- from app.plugin.module_payment.expense.quota.enums import QuotaStatusEnum
- from sqlalchemy import insert, select
- grant_mode = (raw_data or {}).get("grant_mode", "manual")
- amount_val = float((raw_data or {}).get("amount", 0) or 0)
- tenant_id = auth.user.tenant_id if auth.user else 1
- # 判断制度是否在有效期内
- now = datetime.now()
- is_in_period = True
- try:
- start_str = (raw_data or {}).get("effective_start_date")
- end_str = (raw_data or {}).get("effective_end_date")
- if start_str:
- start_dt = datetime.fromisoformat(str(start_str).replace('Z', '').replace('T', ' ')[:19])
- if now < start_dt:
- is_in_period = False
- if end_str and is_in_period:
- end_dt = datetime.fromisoformat(str(end_str).replace('Z', '').replace('T', ' ')[:19])
- if now > end_dt:
- is_in_period = False
- except Exception:
- is_in_period = True
- # 定额+有效期内 → ACTIVE,否则 → PENDING
- is_active = (grant_mode == "period") and amount_val > 0 and is_in_period
- # 收集员工ID列表
- employee_ids: list[str] = []
- adapter_type = scope_data.get("adapter_type", "")
- add_ids = scope_data.get("add_owner_id_list") or []
- if adapter_type == "EMPLOYEE_SELECT":
- # 按员工选择 → 直接使用传入的员工ID
- employee_ids = [str(i) for i in add_ids if i]
- elif adapter_type == "EMPLOYEE_ALL":
- # 全体员工 → 仅已签约员工
- from app.plugin.module_payment.employee.model import EmployeeModel
- emp_stmt = select(EmployeeModel).where(
- EmployeeModel.enterprise_id == enterprise_id,
- EmployeeModel.status == "ACTIVATED",
- )
- emp_result = await auth.db.execute(emp_stmt)
- employee_ids = [emp.employee_id for emp in emp_result.scalars().all() if emp.employee_id]
- elif adapter_type == "EMPLOYEE_DEPARTMENT":
- # 按部门 → 仅已签约员工
- for dept_id in add_ids:
- dept_id_str = str(dept_id)
- from app.plugin.module_payment.employee.model import EmployeeModel
- emp_stmt = select(EmployeeModel).where(
- EmployeeModel.enterprise_id == enterprise_id,
- EmployeeModel.status == "ACTIVATED",
- )
- emp_result = await auth.db.execute(emp_stmt)
- for emp in emp_result.scalars().all():
- if emp.department_ids and dept_id_str in emp.department_ids:
- employee_ids.append(emp.employee_id)
- # 去重
- employee_ids = list(set(employee_ids))
- if not employee_ids:
- log.info(f"无员工需要创建额度记录: institution_id={institution_id}")
- return
- now = datetime.now()
- if is_active:
- # 定额+有效期内:直接赋值
- total = Decimal(str(amount_val))
- available = total
- status = QuotaStatusEnum.QUOTA_ACTIVE.value
- else:
- # 手工发放 或 定额但未到有效期:待发放
- total = Decimal("0")
- available = Decimal("0")
- status = QuotaStatusEnum.QUOTA_PENDING.value
- for emp_id in employee_ids:
- # 检查是否已有记录,避免重复
- check = select(QuotaModel).where(
- QuotaModel.employee_id == emp_id,
- QuotaModel.institution_id == institution_id,
- )
- existing = await auth.db.execute(check)
- if existing.scalar_one_or_none():
- continue
- stmt = insert(QuotaModel).values(
- employee_id=emp_id,
- institution_id=institution_id,
- quota_type="CAP" if grant_mode == "period" else None,
- target_type="INSTITUTION",
- target_id=institution_id,
- quota_id=issue_rule_id,
- out_biz_no=f"inst_{institution_id}_{emp_id}",
- total_amount=total,
- available_amount=available,
- status=status,
- enterprise_id=enterprise_id,
- tenant_id=tenant_id,
- )
- await auth.db.execute(stmt)
- await auth.db.flush()
- log.info(
- f"创建员工额度记录完成: institution_id={institution_id}, "
- f"count={len(employee_ids)}, mode={'period' if (grant_mode == 'period') and amount_val > 0 else 'manual'}, "
- f"status={status}, amount={float(total)}, in_period={is_in_period}"
- )
- @classmethod
- async def _sync_modify_quotas_by_scope(
- cls,
- auth: AuthSchema,
- institution_id: str,
- enterprise_id: str,
- scope_info: dict,
- raw_data: dict | None = None,
- ):
- """修改制度时同步员工额度记录
- 新增员工:按发放模式+有效期创建额度记录
- 删除员工:删除对应额度记录
- 部门模式(EMPLOYEE_DEPARTMENT):add/delete_ids 是部门ID,
- 需要先展开为员工ID列表再操作额度。
- """
- from app.plugin.module_payment.expense.quota.model import QuotaModel
- from app.plugin.module_payment.expense.quota.enums import QuotaStatusEnum
- from sqlalchemy import insert, delete as sa_delete, select
- grant_mode = (raw_data or {}).get("grant_mode", "manual")
- amount_val = float((raw_data or {}).get("amount", 0) or 0)
- tenant_id = auth.user.tenant_id if auth.user else 1
- adapter_type = scope_info.get("adapter_type", "")
- # 判断制度是否在有效期内
- now = datetime.now()
- is_in_period = True
- try:
- start_str = (raw_data or {}).get("effective_start_date")
- end_str = (raw_data or {}).get("effective_end_date")
- if start_str:
- start_dt = datetime.fromisoformat(str(start_str).replace('Z', '').replace('T', ' ')[:19])
- if now < start_dt:
- is_in_period = False
- if end_str and is_in_period:
- end_dt = datetime.fromisoformat(str(end_str).replace('Z', '').replace('T', ' ')[:19])
- if now > end_dt:
- is_in_period = False
- except Exception:
- is_in_period = True
- is_active = (grant_mode == "period") and amount_val > 0 and is_in_period
- # 如为部门模式,将部门ID展开为员工ID列表
- if adapter_type in ("EMPLOYEE_DEPARTMENT", "DEPARTMENT_SELECT"):
- from app.plugin.module_payment.employee.model import EmployeeModel
- emp_all = await auth.db.execute(
- select(EmployeeModel).where(
- EmployeeModel.enterprise_id == enterprise_id,
- EmployeeModel.status == "EMPLOYEE_ACTIVATED",
- )
- )
- emp_by_dept: dict[str, list[str]] = {}
- for emp in emp_all.scalars().all():
- if emp.employee_id and emp.department_ids:
- for did in emp.department_ids:
- emp_by_dept.setdefault(str(did), []).append(emp.employee_id)
- raw_delete = scope_info.get("delete_owner_id_list") or []
- raw_add = scope_info.get("add_owner_id_list") or []
- delete_ids = []
- for did in raw_delete:
- delete_ids.extend(emp_by_dept.get(str(did), []))
- add_ids = []
- for did in raw_add:
- add_ids.extend(emp_by_dept.get(str(did), []))
- else:
- delete_ids = scope_info.get("delete_owner_id_list") or []
- add_ids = scope_info.get("add_owner_id_list") or []
- # 删除被移除的员工额度
- if delete_ids:
- del_stmt = sa_delete(QuotaModel).where(
- QuotaModel.institution_id == institution_id,
- QuotaModel.employee_id.in_(delete_ids),
- )
- await auth.db.execute(del_stmt)
- log.info(f"删除已移除员工额度: count={len(delete_ids)}")
- # 新增员工的额度
- if add_ids:
- total = Decimal(str(amount_val)) if is_active else Decimal("0")
- available = total
- status = QuotaStatusEnum.QUOTA_ACTIVE.value if is_active else QuotaStatusEnum.QUOTA_PENDING.value
- created = 0
- for emp_id in set(add_ids):
- emp_id_str = str(emp_id)
- check = select(QuotaModel).where(
- QuotaModel.employee_id == emp_id_str,
- QuotaModel.institution_id == institution_id,
- )
- existing = await auth.db.execute(check)
- if existing.scalar_one_or_none():
- continue
- stmt = insert(QuotaModel).values(
- employee_id=emp_id_str,
- institution_id=institution_id,
- out_biz_no=f"inst_{institution_id}_{emp_id_str}",
- total_amount=total,
- available_amount=available,
- status=status,
- enterprise_id=enterprise_id,
- tenant_id=tenant_id,
- )
- await auth.db.execute(stmt)
- created += 1
- if created:
- await auth.db.flush()
- log.info(f"新增员工额度: count={created}, is_active={is_active}, status={status}")
- @classmethod
- async def pageinfo_query_service(
- cls,
- auth: AuthSchema,
- enterprise_id: str,
- page_no: int = 1,
- page_size: int = 20,
- institution_name: str | None = None,
- ) -> dict:
- """
- 从支付宝查询费控制度列表
- 调用: alipay.ebpp.invoice.institution.pageinfo.query
- 失败时降级到本地DB
- """
- try:
- model = AlipayEbppInvoiceInstitutionPageinfoQueryModel()
- model.enterprise_id = enterprise_id
- model.page_num = page_no
- model.page_size = page_size
- if institution_name:
- model.institution_name = institution_name
- req = AlipayEbppInvoiceInstitutionPageinfoQueryRequest()
- req.biz_model = model
- response = await asyncio.to_thread(cls._execute_alipay, req)
- if response:
- result = AlipayEbppInvoiceInstitutionPageinfoQueryResponse()
- result.parse_response_content(response)
- if result.is_success():
- return {
- "page_no": getattr(result, 'page_num', page_no) or page_no,
- "page_size": getattr(result, 'page_size', page_size) or page_size,
- "total": getattr(result, 'total_page_count', 0) or 0,
- "list": getattr(result, 'institution_list', []) or [],
- }
- log.warning("支付宝 pageinfo.query 失败,降级到本地DB")
- except Exception as e:
- log.warning(f"支付宝 pageinfo.query 异常: {e},降级到本地DB")
- # 降级:查本地DB
- crud = InstitutionCRUD(auth)
- search = {"enterprise_id": enterprise_id}
- if institution_name:
- search["institution_name"] = institution_name
- offset = (page_no - 1) * page_size
- return await crud.page(
- offset=offset,
- limit=page_size,
- order_by=[{"id": "desc"}],
- search=search,
- out_schema=InstitutionListOutSchema,
- )
- @classmethod
- async def detailinfo_query_service(
- cls,
- auth: AuthSchema,
- institution_id: str,
- enterprise_id: str,
- ) -> dict | None:
- """
- 从支付宝查询费控制度详情,并补充本地规则和额度数据
- 调用: alipay.ebpp.invoice.institution.detailinfo.query
- 失败时降级到本地DB
- """
- result_dict = None
- if not enterprise_id:
- from .crud import InstitutionCRUD as _InstitutionCRUD
- _inst_crud = _InstitutionCRUD(auth)
- _local_inst = await _inst_crud.get(institution_id=institution_id)
- if _local_inst and _local_inst.enterprise_id:
- enterprise_id = _local_inst.enterprise_id
- try:
- model = AlipayEbppInvoiceInstitutionDetailinfoQueryModel()
- model.institution_id = institution_id
- model.enterprise_id = enterprise_id
- req = AlipayEbppInvoiceInstitutionDetailinfoQueryRequest()
- req.biz_model = model
- response = await asyncio.to_thread(cls._execute_alipay, req)
- if response:
- result = AlipayEbppInvoiceInstitutionDetailinfoQueryResponse()
- result.parse_response_content(response)
- if result.is_success():
- result_dict = {k: getattr(result, k) for k in (
- "adapter_type", "consult_mode", "currency", "effective",
- "effective_end_date", "effective_start_date", "expense_type",
- "institution_desc", "institution_id", "institution_name",
- "issue_rule_info_list", "multi_employee_share_mode",
- "outer_source_id", "owner_id_list", "owner_open_id_list",
- "owner_type", "scene_type", "standard_info_detail_list",
- "standard_info_list",
- ) if getattr(result, k, None) is not None}
- if not result_dict:
- log.warning("支付宝 detailinfo.query 失败,降级到本地DB")
- except Exception as e:
- log.warning(f"支付宝 detailinfo.query 异常: {e},降级到本地DB")
- # 降级:查本地DB
- if not result_dict:
- crud = InstitutionCRUD(auth)
- obj = await crud.get(institution_id=institution_id, enterprise_id=enterprise_id)
- if obj:
- result_dict = InstitutionListOutSchema.model_validate(obj).model_dump()
- if not result_dict:
- return None
- # 合并本地DB的自定义字段(支付宝不包含的字段)
- # 无条件覆盖,保证前端有数据
- try:
- from .crud import InstitutionCRUD as _crud_cls
- _crud = _crud_cls(auth)
- local_obj = await _crud.get(institution_id=institution_id, enterprise_id=enterprise_id)
- if local_obj:
- local_dict = InstitutionListOutSchema.model_validate(local_obj).model_dump()
- for field in ("applicable_scope", "grant_mode", "period_type", "amount",
- "single_limit", "effective_time_type", "enterprise_id", "status",
- "created_time", "updated_time", "consult_mode",
- "effective_start_date", "effective_end_date", "institution_name",
- "institution_desc", "expense_type", "scene_type", "currency"):
- val = local_dict.get(field)
- if val is not None:
- result_dict[field] = val
- except Exception as e:
- log.warning(f"合并本地DB字段失败: {e}")
- # 从收入额度记录(quota records)获取员工ID列表
- if not result_dict.get("employee_ids"):
- from sqlalchemy import select as _select
- from app.plugin.module_payment.expense.quota.model import QuotaModel
- quota_stmt = _select(QuotaModel).where(
- QuotaModel.institution_id == institution_id,
- QuotaModel.employee_id.isnot(None),
- QuotaModel.employee_id != "",
- )
- quota_result = await auth.db.execute(quota_stmt)
- emp_ids = list(set(
- q.employee_id for q in quota_result.scalars().all() if q.employee_id
- ))
- if emp_ids:
- result_dict["employee_ids"] = emp_ids
- result_dict["scope_owner_id_list"] = emp_ids
- # 从支付宝 owner_id_list 映射(若以上均未取到)
- owner_ids = result_dict.get("owner_id_list")
- if not result_dict.get("employee_ids") and owner_ids and isinstance(owner_ids, list):
- result_dict["employee_ids"] = owner_ids
- result_dict["scope_owner_id_list"] = owner_ids
- # adapter_type → applicable_scope 兜底
- adapter_type = result_dict.get("adapter_type")
- if adapter_type and not result_dict.get("applicable_scope"):
- scope_map = {"EMPLOYEE_SELECT": "employee", "EMPLOYEE_DEPARTMENT": "department", "DEPARTMENT_SELECT": "department", "EMPLOYEE_ALL": "all"}
- result_dict["applicable_scope"] = scope_map.get(adapter_type, result_dict.get("applicable_scope", "none"))
- # 部门模式下,从 owner_id_list 提取 department_id
- if result_dict.get("applicable_scope") == "department" and not result_dict.get("department_id"):
- owner_ids = result_dict.get("owner_id_list") or result_dict.get("scope_owner_id_list")
- if owner_ids and isinstance(owner_ids, list) and len(owner_ids) > 0:
- result_dict["department_id"] = str(owner_ids[0])
- # 补充本地规则和额度
- from app.plugin.module_payment.expense.rule.model import ExpenseRuleModel
- from app.plugin.module_payment.expense.quota.model import QuotaModel
- from sqlalchemy import select
- # 查使用规则
- rule_stmt = select(ExpenseRuleModel).where(ExpenseRuleModel.institution_id == institution_id)
- rule_result = await auth.db.execute(rule_stmt)
- rules = rule_result.scalars().all()
- if rules:
- rule_list = []
- for r in rules:
- rule_item = {
- "rule_id": r.rule_id,
- "standard_name": r.standard_name,
- "standard_desc": r.standard_desc,
- }
- if hasattr(r, 'single_limit') and r.single_limit:
- rule_item["single_limit"] = float(r.single_limit)
- if hasattr(r, 'condition_info') and r.condition_info:
- rule_item["condition_info"] = r.condition_info
- for cond in r.condition_info:
- factor = cond.get("rule_factor")
- try:
- value = float(cond.get("rule_value", 0))
- except (ValueError, TypeError):
- continue
- if factor == "QUOTA_DAY":
- rule_item["max_day_amount"] = value
- elif factor == "QUOTA_MONTH":
- rule_item["max_month_amount"] = value
- elif factor == "QUOTA_QUARTER":
- rule_item["max_quarter_amount"] = value
- elif factor == "QUOTA_YEAR":
- rule_item["max_year_amount"] = value
- rule_list.append(rule_item)
- result_dict["rule_list"] = rule_list
- # 查额度
- quota_stmt = select(QuotaModel).where(QuotaModel.institution_id == institution_id).limit(1000)
- quota_result = await auth.db.execute(quota_stmt)
- quotas = quota_result.scalars().all()
- if quotas:
- result_dict["quota_list"] = [
- {
- "quota_id": q.quota_id,
- "employee_id": q.employee_id or "",
- "out_biz_no": q.out_biz_no,
- "total_amount": float(q.total_amount) if q.total_amount else 0,
- "available_amount": float(q.available_amount) if q.available_amount else 0,
- "status": q.status,
- }
- for q in quotas
- ]
- return result_dict
- @classmethod
- async def list_service(
- cls,
- auth: AuthSchema,
- page_no: int = 1,
- page_size: int = 20,
- search: dict | None = None,
- ) -> dict:
- """
- 查询费控制度列表
- 优先调支付宝,失败降级到本地DB
- """
- enterprise_id = (search or {}).get("enterprise_id", "")
- institution_name = (search or {}).get("name") or (search or {}).get("institution_name")
- if enterprise_id:
- return await cls.pageinfo_query_service(
- auth=auth,
- enterprise_id=enterprise_id,
- page_no=page_no,
- page_size=page_size,
- institution_name=institution_name,
- )
- # 无 enterprise_id 时直接查本地
- crud = InstitutionCRUD(auth)
- offset = (page_no - 1) * page_size
- return await crud.page(
- offset=offset,
- limit=page_size,
- order_by=[{"id": "desc"}],
- search=search or {},
- out_schema=InstitutionListOutSchema,
- )
- @classmethod
- async def delete_institution_service(
- cls, auth: AuthSchema, data: AlipayEbppInvoiceInstitutionDeleteModel
- ) -> dict:
- """
- 删除费控制度
- 调用: alipay.ebpp.invoice.institution.delete
- 支付宝侧已删时忽略错误,始终清理本地关联表
- """
- institution_id = getattr(data, 'institution_id', None)
- # 调用支付宝删除(失败时仅告警,不影响本地清理)
- try:
- request = AlipayEbppInvoiceInstitutionDeleteRequest()
- request.biz_model = data
- response = await asyncio.to_thread(cls._execute_alipay, request)
- if response:
- result = AlipayEbppInvoiceInstitutionDeleteResponse()
- result.parse_response_content(response)
- if result.is_success():
- log.info(f"支付宝删除成功: institution_id={institution_id}")
- else:
- log.warning(f"支付宝删除失败(可能已删): {result.code} - {result.msg}")
- else:
- log.warning("支付宝删除无响应,继续清理本地")
- except Exception as e:
- log.warning(f"支付宝删除异常(忽略): {e}")
- # 清理本地关联表
- if institution_id:
- try:
- from app.plugin.module_payment.expense.rule.model import ExpenseRuleModel
- from app.plugin.module_payment.expense.quota.model import QuotaModel
- from app.plugin.module_payment.expense.institution.model import ExpenseInstitutionModel
- from sqlalchemy import delete as sa_delete
- # 删规则
- await auth.db.execute(sa_delete(ExpenseRuleModel).where(ExpenseRuleModel.institution_id == institution_id))
- # 删额度
- await auth.db.execute(sa_delete(QuotaModel).where(QuotaModel.institution_id == institution_id))
- # 删制度
- await auth.db.execute(sa_delete(ExpenseInstitutionModel).where(ExpenseInstitutionModel.institution_id == institution_id))
- await auth.db.flush()
- log.info(f"本地关联数据已清理: institution_id={institution_id}")
- except Exception as e:
- log.warning(f"本地清理失败: {e}")
- return {"institution_id": institution_id, "deleted": True}
- @classmethod
- async def modify_institution_service(
- cls, auth: AuthSchema, data: AlipayEbppInvoiceInstitutionModifyModel, raw_data: dict | None = None,
- scope_info: dict | None = None,
- ) -> dict:
- """
- 编辑费控制度
- 调用: alipay.ebpp.invoice.institution.modify
- 适用范围修改(scope_info)需单独调 scope.modify,与基础信息拆分两次请求
- 支付宝成功后同步更新本地DB:
- - 制度基本信息
- - 适用员工范围(scope)
- - 使用规则(standard_info_list → pay_expense_rule)
- - 额度(issuerule → pay_expense_quota)
- """
- if data.institution_id is None:
- raise CustomException(msg="编辑费控制度失败: 制度ID不能为空")
- institution_id = data.institution_id
- enterprise_id = getattr(data, 'enterprise_id', None) or (raw_data or {}).get("enterprise_id", "")
- raw_data = raw_data or {}
- # 第1步:修改支付宝制度基础信息(不含 scope)
- request = AlipayEbppInvoiceInstitutionModifyRequest()
- request.biz_model = data
- response = await asyncio.to_thread(cls._execute_alipay, request)
- if not response:
- raise CustomException(msg="编辑费控制度失败: 无响应")
- result = AlipayEbppInvoiceInstitutionModifyResponse()
- result.parse_response_content(response)
- if not result.is_success():
- log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
- raise CustomException(msg=f"编辑费控制度失败: {result.msg}")
- # 第1.5步:单独调用 scope.modify(不与基础修改在同一请求中)
- if scope_info:
- # 全体员工模式:传去支付宝时剥离员工ID
- scope_data_for_alipay = scope_info.copy() if scope_info else {}
- if scope_info.get("adapter_type") == "EMPLOYEE_ALL":
- scope_data_for_alipay.pop("add_owner_id_list", None)
- scope_data_for_alipay.pop("delete_owner_id_list", None)
- try:
- await InstitutionScopeService.scope_modify_service(
- auth=auth, institution_id=institution_id, data=scope_data_for_alipay
- )
- log.info(f"适用范围已单独同步: adapter_type={scope_info.get('adapter_type')}")
- except Exception as e:
- log.warning(f"适用范围同步失败(不影响基础修改,本地DB将更新为最新值): {e}")
- # scope 变动后同步员工额度记录
- try:
- await cls._sync_modify_quotas_by_scope(
- auth=auth,
- institution_id=institution_id,
- enterprise_id=enterprise_id,
- scope_info=scope_info,
- raw_data=raw_data,
- )
- except Exception as e:
- log.warning(f"同步员工额度记录失败(不影响主体操作): {e}")
- applicable_scope = raw_data.get("applicable_scope", "")
- # 第2步:同步更新本地数据库(scope 已在 Alipay modify 请求中通过 modify_scope_info 处理)
- try:
- crud = InstitutionCRUD(auth)
- update_data = {}
- if hasattr(data, 'institution_name') and data.institution_name:
- update_data['institution_name'] = data.institution_name
- if hasattr(data, 'institution_desc') and data.institution_desc:
- update_data['institution_desc'] = data.institution_desc
- if hasattr(data, 'effective') and data.effective is not None:
- update_data['effective'] = data.effective
- update_data['status'] = (
- InstitutionStatusEnum.INSTITUTION_EFFECTIVE.value
- if data.effective == "1"
- else InstitutionStatusEnum.INSTITUTION_INVALID.value
- )
- if hasattr(data, 'effective_start_date') and data.effective_start_date:
- val = data.effective_start_date
- update_data['effective_start_date'] = datetime.fromisoformat(val.replace(' ', 'T')) if isinstance(val, str) else val
- if hasattr(data, 'effective_end_date') and data.effective_end_date:
- val = data.effective_end_date
- update_data['effective_end_date'] = datetime.fromisoformat(val.replace(' ', 'T')) if isinstance(val, str) else val
- if applicable_scope:
- update_data['applicable_scope'] = applicable_scope
- # 同步额外配置字段
- for field in ("grant_mode", "period_type", "amount", "single_limit", "effective_time_type", "expense_type"):
- val = raw_data.get(field)
- if val is not None:
- update_data[field] = val
- if update_data:
- await crud.update_by_institution_id(institution_id, update_data)
- log.info(f"已更新本地制度: institution_id={institution_id}")
- # 制度生效/失效时同步更新额度状态
- if hasattr(data, 'effective') and data.effective is not None:
- from app.plugin.module_payment.expense.quota.model import QuotaModel
- from app.plugin.module_payment.expense.quota.enums import QuotaStatusEnum
- from sqlalchemy import update as sa_quota_update
- new_quota_status = (
- QuotaStatusEnum.QUOTA_ACTIVE.value
- if data.effective == "1"
- else QuotaStatusEnum.QUOTA_FROZEN.value
- )
- quota_upd = sa_quota_update(QuotaModel).where(
- QuotaModel.institution_id == institution_id
- ).values(status=new_quota_status)
- await auth.db.execute(quota_upd)
- await auth.db.flush()
- log.info(f"已同步更新额度状态: institution_id={institution_id}, effective={data.effective}, quota_status={new_quota_status}")
- # 同步标准规则(modify_standard_detail_info)
- std_detail = (raw_data or {}).get("modify_standard_detail_info") or {}
- if std_detail:
- from app.plugin.module_payment.expense.rule.model import ExpenseRuleModel
- from sqlalchemy import delete as sa_delete
- # 删除规则
- delete_ids = std_detail.get("delete_standard_id_list", [])
- if delete_ids:
- d_stmt = sa_delete(ExpenseRuleModel).where(
- ExpenseRuleModel.rule_id.in_(delete_ids)
- )
- await auth.db.execute(d_stmt)
- # 新增规则
- add_list = std_detail.get("add_standard_list", [])
- from sqlalchemy import insert as sa_insert
- for std in add_list:
- ins_data = {
- "out_biz_no": std.get("outer_source_id", f"std_{institution_id}"),
- "institution_id": institution_id,
- "rule_id": std.get("standard_id"),
- "standard_name": std.get("standard_name"),
- "standard_desc": std.get("standard_desc"),
- "expense_type_sub_category": std.get("expense_type_sub_category", "DEFAULT"),
- "enterprise_id": raw_data.get("enterprise_id", ""),
- "tenant_id": auth.user.tenant_id if auth.user else 1,
- }
- stmt = sa_insert(ExpenseRuleModel).values(**ins_data)
- await auth.db.execute(stmt)
- # 修改规则
- modify_list = std_detail.get("modify_standard_list", [])
- for std in modify_list:
- std_id = std.get("standard_id", "").strip('"')
- update_std = {}
- if std.get("standard_name"):
- update_std["standard_name"] = std["standard_name"]
- if std.get("standard_desc"):
- update_std["standard_desc"] = std["standard_desc"]
- if update_std and std_id:
- from sqlalchemy import update as sa_update
- u_stmt = sa_update(ExpenseRuleModel).where(
- ExpenseRuleModel.rule_id == std_id
- ).values(**update_std)
- await auth.db.execute(u_stmt)
- await auth.db.flush()
- log.info(f"已同步使用规则: institution_id={institution_id}")
- # 同步发放规则(modify_issue_rule_detail_info)已去除
- # 发放规则对应的额度数据由外部消费同步时通过
- # alipay.ebpp.invoice.expensecomsue.outsource.notify 写入真实数据
- except Exception as e:
- log.warning(f"本地同步失败(不影响支付宝侧): {e}")
- return result
- class InstitutionScopeService:
- """费控制度成员范围服务层"""
- @classmethod
- def _execute_alipay(cls, request):
- """同步执行支付宝调用"""
- client = AlipayClient.get_client()
- return client.execute(request)
- @classmethod
- async def scope_modify_service(
- cls,
- auth: AuthSchema,
- institution_id: str,
- data: dict,
- ) -> dict:
- """
- 设置/修改制度成员范围
- 调用: alipay.ebpp.invoice.institution.scope.modify
- """
- try:
- from alipay.aop.api.request.AlipayEbppInvoiceInstitutionScopeModifyRequest import (
- AlipayEbppInvoiceInstitutionScopeModifyRequest,
- )
- from alipay.aop.api.domain.AlipayEbppInvoiceInstitutionScopeModifyModel import (
- AlipayEbppInvoiceInstitutionScopeModifyModel,
- )
- from alipay.aop.api.response.AlipayEbppInvoiceInstitutionScopeModifyResponse import (
- AlipayEbppInvoiceInstitutionScopeModifyResponse,
- )
- except ImportError:
- raise CustomException(msg="支付宝SDK未正确安装(alipay-ebpp-invoice-institution-scope-modify)")
- model = AlipayEbppInvoiceInstitutionScopeModifyModel()
- model.institution_id = institution_id
- model.enterprise_id = data.get("enterprise_id", "")
- model.adapter_type = data.get("adapter_type", "EMPLOYEE_ALL")
- if data.get("owner_type"):
- model.owner_type = data["owner_type"]
- if data.get("add_owner_id_list"):
- model.add_owner_id_list = data["add_owner_id_list"]
- if data.get("delete_owner_id_list"):
- model.delete_owner_id_list = data["delete_owner_id_list"]
- request = AlipayEbppInvoiceInstitutionScopeModifyRequest()
- request.biz_model = model
- response = await asyncio.to_thread(cls._execute_alipay, request)
- if not response:
- raise CustomException(msg="设置制度成员失败: 无响应")
- result = AlipayEbppInvoiceInstitutionScopeModifyResponse()
- result.parse_response_content(response)
- if not result.is_success():
- sub_msg = getattr(result, 'sub_msg', '') or ''
- err_detail = f"{result.msg}" + (f" - {sub_msg}" if sub_msg else "")
- log.error(f"设置制度成员失败: {result.code} - {err_detail}")
- raise CustomException(msg=f"设置制度成员失败: {sub_msg or result.msg}")
- return {"result": True}
- @classmethod
- async def scopepageinfo_query_service(
- cls,
- auth: AuthSchema,
- institution_id: str,
- enterprise_id: str | None = None,
- page_num: int = 1,
- page_size: int = 20,
- owner_type: str | None = None,
- ) -> dict:
- """
- 查询制度成员范围
- 调用: alipay.ebpp.invoice.institution.scopepageinfo.query
- """
- try:
- from alipay.aop.api.request.AlipayEbppInvoiceInstitutionScopepageinfoQueryRequest import (
- AlipayEbppInvoiceInstitutionScopepageinfoQueryRequest,
- )
- from alipay.aop.api.domain.AlipayEbppInvoiceInstitutionScopepageinfoQueryModel import (
- AlipayEbppInvoiceInstitutionScopepageinfoQueryModel,
- )
- from alipay.aop.api.response.AlipayEbppInvoiceInstitutionScopepageinfoQueryResponse import (
- AlipayEbppInvoiceInstitutionScopepageinfoQueryResponse,
- )
- except ImportError:
- raise CustomException(msg="支付宝SDK未正确安装(alipay-ebpp-invoice-institution-scopepageinfo-query)")
- model = AlipayEbppInvoiceInstitutionScopepageinfoQueryModel()
- model.institution_id = institution_id
- model.page_num = page_num
- model.page_size = page_size
- if not enterprise_id:
- # 从本地 DB 查找 enterprise_id
- from .crud import InstitutionCRUD
- inst_crud = InstitutionCRUD(auth)
- local_inst = await inst_crud.get(institution_id=institution_id)
- if local_inst and local_inst.enterprise_id:
- enterprise_id = local_inst.enterprise_id
- if enterprise_id:
- model.enterprise_id = enterprise_id
- if owner_type:
- model.owner_type = owner_type
- request = AlipayEbppInvoiceInstitutionScopepageinfoQueryRequest()
- request.biz_model = model
- response = await asyncio.to_thread(cls._execute_alipay, request)
- if not response:
- raise CustomException(msg="查询制度成员失败: 无响应")
- result = AlipayEbppInvoiceInstitutionScopepageinfoQueryResponse()
- result.parse_response_content(response)
- if not result.is_success():
- log.error(f"查询制度成员失败: {result.code} - {result.msg}")
- raise CustomException(msg=f"查询制度成员失败: {result.msg}")
- return {
- "page_num": getattr(result, 'page_num', page_num) or page_num,
- "page_size": getattr(result, 'page_size', page_size) or page_size,
- "total_page_count": getattr(result, 'total_page_count', 0) or 0,
- "adapter_type": getattr(result, 'adapter_type', None),
- "owner_id_list": getattr(result, 'owner_id_list', []) or [],
- "owner_open_id_list": getattr(result, 'owner_open_id_list', []) or [],
- "scope_info_list": [
- {
- "adapter_type": getattr(result, 'adapter_type', None),
- "owner_id_list": getattr(result, 'owner_id_list', []) or [],
- "owner_open_id_list": getattr(result, 'owner_open_id_list', []) or [],
- }
- ] if getattr(result, 'adapter_type', None) else [],
- }
- class IssueruleService:
- """自动额度发放规则服务层"""
- ISSUE_TYPE_MAP = {
- "daily": "ISSUE_DAY",
- "weekly": "ISSUE_WEEK",
- "monthly": "ISSUE_MONTH",
- "quarterly": "ISSUE_QUARTER",
- "yearly": "ISSUE_YEAR",
- }
- @classmethod
- def _execute_alipay(cls, request):
- client = AlipayClient.get_client()
- return client.execute(request)
- @classmethod
- async def create_issuerule_service(
- cls,
- auth: AuthSchema,
- institution_id: str,
- enterprise_id: str,
- quota_type: str,
- issue_type: str,
- issue_amount_value: str,
- outer_source_id: str | None = None,
- issue_rule_name: str | None = None,
- effective_period: str | None = None,
- invalid_mode: int | None = None,
- share_mode: int | None = None,
- ) -> dict:
- """
- 创建自动额度发放规则
- 调用: alipay.ebpp.invoice.issuerule.create
- """
- try:
- from alipay.aop.api.request.AlipayEbppInvoiceIssueruleCreateRequest import (
- AlipayEbppInvoiceIssueruleCreateRequest,
- )
- from alipay.aop.api.domain.AlipayEbppInvoiceIssueruleCreateModel import (
- AlipayEbppInvoiceIssueruleCreateModel,
- )
- from alipay.aop.api.response.AlipayEbppInvoiceIssueruleCreateResponse import (
- AlipayEbppInvoiceIssueruleCreateResponse,
- )
- except ImportError:
- raise CustomException(msg="支付宝SDK未正确安装(alipay-ebpp-invoice-issuerule-create)")
- # 参数约束校验
- if quota_type == "CAP" and invalid_mode is not None and invalid_mode != 1:
- raise CustomException(msg="余额类型(CAP)的发放规则必须为可累计(invalid_mode=1)")
- if quota_type == "COUNT" and share_mode is not None and share_mode != 0:
- raise CustomException(msg="次卡类型(COUNT)的发放规则不可转赠(share_mode=0)")
- model = AlipayEbppInvoiceIssueruleCreateModel()
- model.target_type = "INSTITUTION"
- model.target_id = institution_id
- model.quota_type = quota_type
- model.issue_type = issue_type
- model.issue_amount_value = issue_amount_value
- model.enterprise_id = enterprise_id
- if outer_source_id:
- model.outer_source_id = outer_source_id
- if issue_rule_name:
- model.issue_rule_name = issue_rule_name
- if effective_period:
- model.effective_period = effective_period
- if invalid_mode is not None:
- model.invalid_mode = invalid_mode
- if share_mode is not None:
- model.share_mode = share_mode
- request = AlipayEbppInvoiceIssueruleCreateRequest()
- request.biz_model = model
- response = await asyncio.to_thread(cls._execute_alipay, request)
- if not response:
- raise CustomException(msg="创建发放规则失败: 无响应")
- result = AlipayEbppInvoiceIssueruleCreateResponse()
- result.parse_response_content(response)
- if not result.is_success():
- log.error(f"创建发放规则失败: {result.code} - {result.msg}")
- raise CustomException(msg=f"创建发放规则失败: {result.msg}")
- return {
- "issue_rule_id": getattr(result, 'issue_rule_id', None),
- }
- @classmethod
- async def modify_issuerule_service(
- cls,
- auth: AuthSchema,
- institution_id: str,
- issue_rule_id: str,
- enterprise_id: str,
- quota_type: str | None = None,
- issue_type: str | None = None,
- issue_amount_value: str | None = None,
- issue_rule_name: str | None = None,
- effective: str | None = None,
- effective_period: str | None = None,
- invalid_mode: int | None = None,
- share_mode: int | None = None,
- ) -> dict:
- """
- 编辑自动额度发放规则
- 调用: alipay.ebpp.invoice.issuerule.modify
- """
- try:
- from alipay.aop.api.request.AlipayEbppInvoiceIssueruleModifyRequest import (
- AlipayEbppInvoiceIssueruleModifyRequest,
- )
- from alipay.aop.api.domain.AlipayEbppInvoiceIssueruleModifyModel import (
- AlipayEbppInvoiceIssueruleModifyModel,
- )
- from alipay.aop.api.response.AlipayEbppInvoiceIssueruleModifyResponse import (
- AlipayEbppInvoiceIssueruleModifyResponse,
- )
- except ImportError:
- raise CustomException(msg="支付宝SDK未正确安装(alipay-ebpp-invoice-issuerule-modify)")
- model = AlipayEbppInvoiceIssueruleModifyModel()
- model.target_type = "INSTITUTION"
- model.target_id = institution_id
- model.issue_rule_id = issue_rule_id
- model.action = "MODIFY_BASIC_INFO"
- model.enterprise_id = enterprise_id
- if issue_rule_name:
- model.issue_rule_name = issue_rule_name
- if quota_type:
- model.quota_type = quota_type
- if issue_type:
- model.issue_type = issue_type
- if issue_amount_value:
- model.issue_amount_value = issue_amount_value
- if effective is not None:
- model.effective = effective
- if effective_period:
- model.effective_period = effective_period
- if invalid_mode is not None:
- model.invalid_mode = invalid_mode
- if share_mode is not None:
- model.share_mode = share_mode
- request = AlipayEbppInvoiceIssueruleModifyRequest()
- request.biz_model = model
- response = await asyncio.to_thread(cls._execute_alipay, request)
- if not response:
- raise CustomException(msg="编辑发放规则失败: 无响应")
- result = AlipayEbppInvoiceIssueruleModifyResponse()
- result.parse_response_content(response)
- if not result.is_success():
- log.error(f"编辑发放规则失败: {result.code} - {result.msg}")
- raise CustomException(msg=f"编辑发放规则失败: {result.msg}")
- return {"result": True}
- @classmethod
- async def delete_issuerule_service(
- cls,
- auth: AuthSchema,
- institution_id: str,
- issue_rule_id_list: list[str],
- enterprise_id: str,
- ) -> dict:
- """
- 删除自动额度发放规则
- 调用: alipay.ebpp.invoice.issuerule.delete
- """
- try:
- from alipay.aop.api.request.AlipayEbppInvoiceIssueruleDeleteRequest import (
- AlipayEbppInvoiceIssueruleDeleteRequest,
- )
- from alipay.aop.api.domain.AlipayEbppInvoiceIssueruleDeleteModel import (
- AlipayEbppInvoiceIssueruleDeleteModel,
- )
- from alipay.aop.api.response.AlipayEbppInvoiceIssueruleDeleteResponse import (
- AlipayEbppInvoiceIssueruleDeleteResponse,
- )
- except ImportError:
- raise CustomException(msg="支付宝SDK未正确安装(alipay-ebpp-invoice-issuerule-delete)")
- model = AlipayEbppInvoiceIssueruleDeleteModel()
- model.target_type = "INSTITUTION"
- model.target_id = institution_id
- model.issue_rule_id_list = issue_rule_id_list
- model.enterprise_id = enterprise_id
- request = AlipayEbppInvoiceIssueruleDeleteRequest()
- request.biz_model = model
- response = await asyncio.to_thread(cls._execute_alipay, request)
- if not response:
- raise CustomException(msg="删除发放规则失败: 无响应")
- result = AlipayEbppInvoiceIssueruleDeleteResponse()
- result.parse_response_content(response)
- if not result.is_success():
- log.error(f"删除发放规则失败: {result.code} - {result.msg}")
- raise CustomException(msg=f"删除发放规则失败: {result.msg}")
- return {"result": True}
|