import asyncio from datetime import datetime from decimal import Decimal from app.api.v1.module_system.auth.schema import AuthSchema from app.core.alipay import AlipayClient from app.core.exceptions import CustomException from app.core.logger import log from app.plugin.module_payment.expense.institution.schema import InstitutionListOutSchema, InstitutionCreateSchema from .crud import InstitutionCRUD from .enums import InstitutionStatusEnum from alipay.aop.api.request.AlipayEbppInvoiceInstitutionCreateRequest import ( AlipayEbppInvoiceInstitutionCreateRequest, ) from alipay.aop.api.domain.AlipayEbppInvoiceInstitutionCreateModel import ( AlipayEbppInvoiceInstitutionCreateModel, ) from alipay.aop.api.response.AlipayEbppInvoiceInstitutionCreateResponse import ( AlipayEbppInvoiceInstitutionCreateResponse, ) from alipay.aop.api.request.AlipayEbppInvoiceInstitutionPageinfoQueryRequest import ( AlipayEbppInvoiceInstitutionPageinfoQueryRequest, ) from alipay.aop.api.domain.AlipayEbppInvoiceInstitutionPageinfoQueryModel import ( AlipayEbppInvoiceInstitutionPageinfoQueryModel, ) from alipay.aop.api.response.AlipayEbppInvoiceInstitutionPageinfoQueryResponse import ( AlipayEbppInvoiceInstitutionPageinfoQueryResponse, ) from alipay.aop.api.request.AlipayEbppInvoiceInstitutionDetailinfoQueryRequest import ( AlipayEbppInvoiceInstitutionDetailinfoQueryRequest, ) from alipay.aop.api.domain.AlipayEbppInvoiceInstitutionDetailinfoQueryModel import ( AlipayEbppInvoiceInstitutionDetailinfoQueryModel, ) from alipay.aop.api.response.AlipayEbppInvoiceInstitutionDetailinfoQueryResponse import ( AlipayEbppInvoiceInstitutionDetailinfoQueryResponse, ) from alipay.aop.api.request.AlipayEbppInvoiceInstitutionDeleteRequest import ( AlipayEbppInvoiceInstitutionDeleteRequest, ) from alipay.aop.api.domain.AlipayEbppInvoiceInstitutionDeleteModel import ( AlipayEbppInvoiceInstitutionDeleteModel, ) from alipay.aop.api.response.AlipayEbppInvoiceInstitutionDeleteResponse import ( AlipayEbppInvoiceInstitutionDeleteResponse, ) from alipay.aop.api.request.AlipayEbppInvoiceInstitutionModifyRequest import ( AlipayEbppInvoiceInstitutionModifyRequest, ) from alipay.aop.api.domain.AlipayEbppInvoiceInstitutionModifyModel import ( AlipayEbppInvoiceInstitutionModifyModel, ) from alipay.aop.api.response.AlipayEbppInvoiceInstitutionModifyResponse import ( AlipayEbppInvoiceInstitutionModifyResponse, ) class InstitutionService: """费控制度服务层""" @classmethod def _execute_alipay(cls, request): """同步执行支付宝调用(通过线程池避免阻塞事件循环)""" client = AlipayClient.get_client() return client.execute(request) @classmethod async def create_institution_service( cls, auth: AuthSchema, data: AlipayEbppInvoiceInstitutionCreateModel ) -> AlipayEbppInvoiceInstitutionCreateResponse: """ 创建费控制度(仅调 institution.create,不包含串联流程) 调用: alipay.ebpp.invoice.institution.create """ if data.enterprise_id is None: raise CustomException(msg="创建费控制度失败: 企业ID不能为空") data.currency = 'CNY' request = AlipayEbppInvoiceInstitutionCreateRequest() request.biz_model = data response = await asyncio.to_thread(cls._execute_alipay, request) if not response: raise CustomException(msg="创建费控制度失败: 无响应") result = AlipayEbppInvoiceInstitutionCreateResponse() result.parse_response_content(response) if not result.is_success(): log.error(f"支付宝接口调用失败: {result.code} - {result.msg}") raise CustomException(msg=f"创建费控制度失败: {result.msg}") return result @classmethod async def create_institution_full_flow( cls, auth: AuthSchema, institution_model: AlipayEbppInvoiceInstitutionCreateModel, enterprise_id: str, scope_data: dict | None = None, issuerule_data: dict | None = None, raw_data: dict | None = None, ) -> dict: """ 创建费控制度(完整串联流程) 流程: 1. institution.create → 获取 institution_id 2. scope.modify ← 如有适用成员数据(scope_data) 3. issuerule.create ← 如为"按固定周期发放"(issuerule_data) 4. 保存到本地DB(制度 + 使用规则 + 发放规则) """ # 第1步:创建制度 institution_result = await cls.create_institution_service(auth=auth, data=institution_model) institution_id = institution_result.institution_id try: # 第2步:设置适用成员(如有) scope_modified = False if scope_data and scope_data.get("adapter_type") and scope_data.get("adapter_type") != "NONE": await InstitutionScopeService.scope_modify_service( auth=auth, institution_id=institution_id, data={ "enterprise_id": enterprise_id, "adapter_type": scope_data["adapter_type"], "owner_type": scope_data.get("owner_type"), "add_owner_id_list": scope_data.get("add_owner_id_list"), }, ) scope_modified = True log.info(f"成员设置成功: institution_id={institution_id}") # 第3步:创建自动发放规则(如为"按固定周期发放") issue_rule_id = None if issuerule_data: issuerule_result = await IssueruleService.create_issuerule_service( auth=auth, institution_id=institution_id, enterprise_id=enterprise_id, quota_type=issuerule_data.get("quota_type", "CAP"), issue_type=issuerule_data.get("issue_type", "ISSUE_MONTH"), issue_amount_value=issuerule_data.get("issue_amount_value", "0"), outer_source_id=issuerule_data.get("outer_source_id"), issue_rule_name=issuerule_data.get("issue_rule_name"), effective_period=issuerule_data.get("effective_period"), invalid_mode=issuerule_data.get("invalid_mode", 0), share_mode=issuerule_data.get("share_mode", 0), ) issue_rule_id = issuerule_result.get("issue_rule_id") log.info(f"发放规则创建成功: institution_id={institution_id}, issue_rule_id={issue_rule_id}") except Exception as e: # 子步骤失败:删除已创建的支付宝制度(补偿事务) log.error(f"创建串联流程失败: {e},开始回滚 institution_id={institution_id}") try: from alipay.aop.api.request.AlipayEbppInvoiceInstitutionDeleteRequest import ( AlipayEbppInvoiceInstitutionDeleteRequest, ) from alipay.aop.api.domain.AlipayEbppInvoiceInstitutionDeleteModel import ( AlipayEbppInvoiceInstitutionDeleteModel, ) rollback_model = AlipayEbppInvoiceInstitutionDeleteModel() rollback_model.institution_id = institution_id rollback_model.enterprise_id = enterprise_id req = AlipayEbppInvoiceInstitutionDeleteRequest() req.biz_model = rollback_model await asyncio.to_thread(cls._execute_alipay, req) log.info(f"回滚成功: 已删除 institution_id={institution_id}") except Exception as rollback_err: log.error(f"回滚失败: {rollback_err}") raise # 第4步:保存到本地DB create_data = InstitutionCreateSchema( enterprise_id=enterprise_id, institution_id=institution_id, institution_name=getattr(institution_model, 'institution_name', None), institution_desc=getattr(institution_model, 'institution_desc', None), scene_type=getattr(institution_model, 'scene_type', None), expense_type=getattr(institution_model, 'expense_type', None), expense_sub_type=getattr(institution_model, 'expense_sub_type', None), status=InstitutionStatusEnum.INSTITUTION_CREATE.value, effective=getattr(institution_model, 'effective', None), effective_start_date=getattr(institution_model, 'effective_start_date', None), effective_end_date=getattr(institution_model, 'effective_end_date', None), consult_mode=getattr(institution_model, 'consult_mode', None), multi_employee_share_mode=getattr(institution_model, 'multi_employee_share_mode', None), currency=getattr(institution_model, 'currency', None), grant_mode=(raw_data or {}).get("grant_mode"), period_type=(raw_data or {}).get("period_type"), amount=(raw_data or {}).get("amount"), single_limit=(raw_data or {}).get("single_limit"), effective_time_type=(raw_data or {}).get("effective_time_type"), applicable_scope=(raw_data or {}).get("applicable_scope"), ) create_data_dict = create_data.model_dump(exclude_unset=True) crud = InstitutionCRUD(auth) await crud.create(create_data_dict) # 第5步:保存使用规则到本地 if raw_data and raw_data.get("standard_info_list") and hasattr(institution_result, 'standard_id_info_list') and institution_result.standard_id_info_list: from app.plugin.module_payment.expense.rule.crud import RuleCRUD from app.plugin.module_payment.expense.rule.service import RuleService standard_id_map = {} for info in institution_result.standard_id_info_list: if hasattr(info, 'outer_source_id') and hasattr(info, 'standard_id'): standard_id_map[info.outer_source_id] = info.standard_id for idx, std in enumerate(raw_data["standard_info_list"]): condition_list = std.get("standard_condition_info_list", []) single_limit_val = None for cond in (condition_list or []): if cond.get("rule_factor") == "QUOTA_TOTAL": try: single_limit_val = float(cond.get("rule_value", 0)) except (ValueError, TypeError): pass std_data = { "out_biz_no": std.get("outer_source_id", f"std_{institution_id}_{idx}"), "institution_id": institution_id, "rule_id": standard_id_map.get(std.get("outer_source_id", "")), "standard_name": std.get("standard_name"), "standard_desc": std.get("standard_desc"), "expense_type_sub_category": std.get("expense_type_sub_category", "DEFAULT"), "enterprise_id": enterprise_id, "tenant_id": auth.user.tenant_id if auth.user else 1, "condition_info": condition_list, "single_limit": single_limit_val, } try: from app.plugin.module_payment.expense.rule.model import ExpenseRuleModel from sqlalchemy import insert stmt = insert(ExpenseRuleModel).values(**std_data) await auth.db.execute(stmt) await auth.db.flush() except Exception as e: log.warning(f"保存使用规则到本地失败: {e}") # 第6步:按适用范围创建员工额度记录 if scope_data and scope_data.get("adapter_type") and scope_data.get("adapter_type") != "NONE": try: await cls._create_institution_quotas( auth=auth, institution_id=institution_id, enterprise_id=enterprise_id, scope_data=scope_data, raw_data=raw_data, issue_rule_id=issue_rule_id, ) except Exception as e: log.warning(f"创建员工额度记录失败(不影响支付宝侧): {e}") return { "institution_id": institution_id, "scope_modified": scope_modified, "issue_rule_id": issue_rule_id, } @classmethod async def _create_institution_quotas( cls, auth: AuthSchema, institution_id: str, enterprise_id: str, scope_data: dict, raw_data: dict | None = None, issue_rule_id: str | None = None, ): """按适用范围创建员工额度记录 规则: - 定额发放 + 制度在有效期内:total=定额值, available=定额值, ACTIVE - 定额发放 + 制度未生效/已过期:total=0, available=0, PENDING - 手工发放:total=0, available=0, PENDING """ from app.plugin.module_payment.expense.quota.model import QuotaModel from app.plugin.module_payment.expense.quota.enums import QuotaStatusEnum from sqlalchemy import insert, select grant_mode = (raw_data or {}).get("grant_mode", "manual") amount_val = float((raw_data or {}).get("amount", 0) or 0) tenant_id = auth.user.tenant_id if auth.user else 1 # 判断制度是否在有效期内 now = datetime.now() is_in_period = True try: start_str = (raw_data or {}).get("effective_start_date") end_str = (raw_data or {}).get("effective_end_date") if start_str: start_dt = datetime.fromisoformat(str(start_str).replace('Z', '').replace('T', ' ')[:19]) if now < start_dt: is_in_period = False if end_str and is_in_period: end_dt = datetime.fromisoformat(str(end_str).replace('Z', '').replace('T', ' ')[:19]) if now > end_dt: is_in_period = False except Exception: is_in_period = True # 定额+有效期内 → ACTIVE,否则 → PENDING is_active = (grant_mode == "period") and amount_val > 0 and is_in_period # 收集员工ID列表 employee_ids: list[str] = [] adapter_type = scope_data.get("adapter_type", "") add_ids = scope_data.get("add_owner_id_list") or [] if adapter_type == "EMPLOYEE_SELECT": # 按员工选择 → 直接使用传入的员工ID employee_ids = [str(i) for i in add_ids if i] elif adapter_type == "EMPLOYEE_ALL": # 全体员工 → 仅已签约员工 from app.plugin.module_payment.employee.model import EmployeeModel emp_stmt = select(EmployeeModel).where( EmployeeModel.enterprise_id == enterprise_id, EmployeeModel.status == "ACTIVATED", ) emp_result = await auth.db.execute(emp_stmt) employee_ids = [emp.employee_id for emp in emp_result.scalars().all() if emp.employee_id] elif adapter_type == "EMPLOYEE_DEPARTMENT": # 按部门 → 仅已签约员工 for dept_id in add_ids: dept_id_str = str(dept_id) from app.plugin.module_payment.employee.model import EmployeeModel emp_stmt = select(EmployeeModel).where( EmployeeModel.enterprise_id == enterprise_id, EmployeeModel.status == "ACTIVATED", ) emp_result = await auth.db.execute(emp_stmt) for emp in emp_result.scalars().all(): if emp.department_ids and dept_id_str in emp.department_ids: employee_ids.append(emp.employee_id) # 去重 employee_ids = list(set(employee_ids)) if not employee_ids: log.info(f"无员工需要创建额度记录: institution_id={institution_id}") return now = datetime.now() if is_active: # 定额+有效期内:直接赋值 total = Decimal(str(amount_val)) available = total status = QuotaStatusEnum.QUOTA_ACTIVE.value else: # 手工发放 或 定额但未到有效期:待发放 total = Decimal("0") available = Decimal("0") status = QuotaStatusEnum.QUOTA_PENDING.value for emp_id in employee_ids: # 检查是否已有记录,避免重复 check = select(QuotaModel).where( QuotaModel.employee_id == emp_id, QuotaModel.institution_id == institution_id, ) existing = await auth.db.execute(check) if existing.scalar_one_or_none(): continue stmt = insert(QuotaModel).values( employee_id=emp_id, institution_id=institution_id, quota_type="CAP" if grant_mode == "period" else None, target_type="INSTITUTION", target_id=institution_id, quota_id=issue_rule_id, out_biz_no=f"inst_{institution_id}_{emp_id}", total_amount=total, available_amount=available, status=status, enterprise_id=enterprise_id, tenant_id=tenant_id, ) await auth.db.execute(stmt) await auth.db.flush() log.info( f"创建员工额度记录完成: institution_id={institution_id}, " f"count={len(employee_ids)}, mode={'period' if (grant_mode == 'period') and amount_val > 0 else 'manual'}, " f"status={status}, amount={float(total)}, in_period={is_in_period}" ) @classmethod async def _sync_modify_quotas_by_scope( cls, auth: AuthSchema, institution_id: str, enterprise_id: str, scope_info: dict, raw_data: dict | None = None, ): """修改制度时同步员工额度记录 新增员工:按发放模式+有效期创建额度记录 删除员工:删除对应额度记录 部门模式(EMPLOYEE_DEPARTMENT):add/delete_ids 是部门ID, 需要先展开为员工ID列表再操作额度。 """ from app.plugin.module_payment.expense.quota.model import QuotaModel from app.plugin.module_payment.expense.quota.enums import QuotaStatusEnum from sqlalchemy import insert, delete as sa_delete, select grant_mode = (raw_data or {}).get("grant_mode", "manual") amount_val = float((raw_data or {}).get("amount", 0) or 0) tenant_id = auth.user.tenant_id if auth.user else 1 adapter_type = scope_info.get("adapter_type", "") # 判断制度是否在有效期内 now = datetime.now() is_in_period = True try: start_str = (raw_data or {}).get("effective_start_date") end_str = (raw_data or {}).get("effective_end_date") if start_str: start_dt = datetime.fromisoformat(str(start_str).replace('Z', '').replace('T', ' ')[:19]) if now < start_dt: is_in_period = False if end_str and is_in_period: end_dt = datetime.fromisoformat(str(end_str).replace('Z', '').replace('T', ' ')[:19]) if now > end_dt: is_in_period = False except Exception: is_in_period = True is_active = (grant_mode == "period") and amount_val > 0 and is_in_period # 如为部门模式,将部门ID展开为员工ID列表 if adapter_type in ("EMPLOYEE_DEPARTMENT", "DEPARTMENT_SELECT"): from app.plugin.module_payment.employee.model import EmployeeModel emp_all = await auth.db.execute( select(EmployeeModel).where( EmployeeModel.enterprise_id == enterprise_id, EmployeeModel.status == "ACTIVATED", ) ) emp_by_dept: dict[str, list[str]] = {} for emp in emp_all.scalars().all(): if emp.employee_id and emp.department_ids: for did in emp.department_ids: emp_by_dept.setdefault(str(did), []).append(emp.employee_id) raw_delete = scope_info.get("delete_owner_id_list") or [] raw_add = scope_info.get("add_owner_id_list") or [] delete_ids = [] for did in raw_delete: delete_ids.extend(emp_by_dept.get(str(did), [])) add_ids = [] for did in raw_add: add_ids.extend(emp_by_dept.get(str(did), [])) else: delete_ids = scope_info.get("delete_owner_id_list") or [] add_ids = scope_info.get("add_owner_id_list") or [] # 删除被移除的员工额度 if delete_ids: del_stmt = sa_delete(QuotaModel).where( QuotaModel.institution_id == institution_id, QuotaModel.employee_id.in_(delete_ids), ) await auth.db.execute(del_stmt) log.info(f"删除已移除员工额度: count={len(delete_ids)}") # 新增员工的额度 if add_ids: total = Decimal(str(amount_val)) if is_active else Decimal("0") available = total status = QuotaStatusEnum.QUOTA_ACTIVE.value if is_active else QuotaStatusEnum.QUOTA_PENDING.value created = 0 for emp_id in set(add_ids): emp_id_str = str(emp_id) check = select(QuotaModel).where( QuotaModel.employee_id == emp_id_str, QuotaModel.institution_id == institution_id, ) existing = await auth.db.execute(check) if existing.scalar_one_or_none(): continue stmt = insert(QuotaModel).values( employee_id=emp_id_str, institution_id=institution_id, out_biz_no=f"inst_{institution_id}_{emp_id_str}", total_amount=total, available_amount=available, status=status, enterprise_id=enterprise_id, tenant_id=tenant_id, ) await auth.db.execute(stmt) created += 1 if created: await auth.db.flush() log.info(f"新增员工额度: count={created}, is_active={is_active}, status={status}") @classmethod async def pageinfo_query_service( cls, auth: AuthSchema, enterprise_id: str, page_no: int = 1, page_size: int = 20, institution_name: str | None = None, ) -> dict: """ 从支付宝查询费控制度列表 调用: alipay.ebpp.invoice.institution.pageinfo.query 失败时降级到本地DB """ try: model = AlipayEbppInvoiceInstitutionPageinfoQueryModel() model.enterprise_id = enterprise_id model.page_num = page_no model.page_size = page_size if institution_name: model.institution_name = institution_name req = AlipayEbppInvoiceInstitutionPageinfoQueryRequest() req.biz_model = model response = await asyncio.to_thread(cls._execute_alipay, req) if response: result = AlipayEbppInvoiceInstitutionPageinfoQueryResponse() result.parse_response_content(response) if result.is_success(): return { "page_no": getattr(result, 'page_num', page_no) or page_no, "page_size": getattr(result, 'page_size', page_size) or page_size, "total": getattr(result, 'total_page_count', 0) or 0, "list": getattr(result, 'institution_list', []) or [], } log.warning("支付宝 pageinfo.query 失败,降级到本地DB") except Exception as e: log.warning(f"支付宝 pageinfo.query 异常: {e},降级到本地DB") # 降级:查本地DB crud = InstitutionCRUD(auth) search = {"enterprise_id": enterprise_id} if institution_name: search["institution_name"] = institution_name offset = (page_no - 1) * page_size return await crud.page( offset=offset, limit=page_size, order_by=[{"id": "desc"}], search=search, out_schema=InstitutionListOutSchema, ) @classmethod async def detailinfo_query_service( cls, auth: AuthSchema, institution_id: str, enterprise_id: str, ) -> dict | None: """ 从支付宝查询费控制度详情,并补充本地规则和额度数据 调用: alipay.ebpp.invoice.institution.detailinfo.query 失败时降级到本地DB """ result_dict = None if not enterprise_id: from .crud import InstitutionCRUD as _InstitutionCRUD _inst_crud = _InstitutionCRUD(auth) _local_inst = await _inst_crud.get(institution_id=institution_id) if _local_inst and _local_inst.enterprise_id: enterprise_id = _local_inst.enterprise_id try: model = AlipayEbppInvoiceInstitutionDetailinfoQueryModel() model.institution_id = institution_id model.enterprise_id = enterprise_id req = AlipayEbppInvoiceInstitutionDetailinfoQueryRequest() req.biz_model = model response = await asyncio.to_thread(cls._execute_alipay, req) if response: result = AlipayEbppInvoiceInstitutionDetailinfoQueryResponse() result.parse_response_content(response) if result.is_success(): result_dict = {k: getattr(result, k) for k in ( "adapter_type", "consult_mode", "currency", "effective", "effective_end_date", "effective_start_date", "expense_type", "institution_desc", "institution_id", "institution_name", "issue_rule_info_list", "multi_employee_share_mode", "outer_source_id", "owner_id_list", "owner_open_id_list", "owner_type", "scene_type", "standard_info_detail_list", "standard_info_list", ) if getattr(result, k, None) is not None} if not result_dict: log.warning("支付宝 detailinfo.query 失败,降级到本地DB") except Exception as e: log.warning(f"支付宝 detailinfo.query 异常: {e},降级到本地DB") # 降级:查本地DB if not result_dict: crud = InstitutionCRUD(auth) obj = await crud.get(institution_id=institution_id, enterprise_id=enterprise_id) if obj: result_dict = InstitutionListOutSchema.model_validate(obj).model_dump() if not result_dict: return None # 合并本地DB的自定义字段(支付宝不包含的字段) # 无条件覆盖,保证前端有数据 try: from .crud import InstitutionCRUD as _crud_cls _crud = _crud_cls(auth) local_obj = await _crud.get(institution_id=institution_id, enterprise_id=enterprise_id) if local_obj: local_dict = InstitutionListOutSchema.model_validate(local_obj).model_dump() for field in ("applicable_scope", "grant_mode", "period_type", "amount", "single_limit", "effective_time_type", "enterprise_id", "status", "created_time", "updated_time", "consult_mode", "effective_start_date", "effective_end_date", "institution_name", "institution_desc", "expense_type", "scene_type", "currency"): val = local_dict.get(field) if val is not None: result_dict[field] = val except Exception as e: log.warning(f"合并本地DB字段失败: {e}") # 从收入额度记录(quota records)获取员工ID列表 if not result_dict.get("employee_ids"): from sqlalchemy import select as _select from app.plugin.module_payment.expense.quota.model import QuotaModel quota_stmt = _select(QuotaModel).where( QuotaModel.institution_id == institution_id, QuotaModel.employee_id.isnot(None), QuotaModel.employee_id != "", ) quota_result = await auth.db.execute(quota_stmt) emp_ids = list(set( q.employee_id for q in quota_result.scalars().all() if q.employee_id )) if emp_ids: result_dict["employee_ids"] = emp_ids result_dict["scope_owner_id_list"] = emp_ids # 从支付宝 owner_id_list 映射(若以上均未取到) owner_ids = result_dict.get("owner_id_list") if not result_dict.get("employee_ids") and owner_ids and isinstance(owner_ids, list): result_dict["employee_ids"] = owner_ids result_dict["scope_owner_id_list"] = owner_ids # adapter_type → applicable_scope 兜底 adapter_type = result_dict.get("adapter_type") if adapter_type and not result_dict.get("applicable_scope"): scope_map = {"EMPLOYEE_SELECT": "employee", "EMPLOYEE_DEPARTMENT": "department", "DEPARTMENT_SELECT": "department", "EMPLOYEE_ALL": "all"} result_dict["applicable_scope"] = scope_map.get(adapter_type, result_dict.get("applicable_scope", "none")) # 部门模式下,从 owner_id_list 提取 department_id if result_dict.get("applicable_scope") == "department" and not result_dict.get("department_id"): owner_ids = result_dict.get("owner_id_list") or result_dict.get("scope_owner_id_list") if owner_ids and isinstance(owner_ids, list) and len(owner_ids) > 0: result_dict["department_id"] = str(owner_ids[0]) # 补充本地规则和额度 from app.plugin.module_payment.expense.rule.model import ExpenseRuleModel from app.plugin.module_payment.expense.quota.model import QuotaModel from sqlalchemy import select # 查使用规则 rule_stmt = select(ExpenseRuleModel).where(ExpenseRuleModel.institution_id == institution_id) rule_result = await auth.db.execute(rule_stmt) rules = rule_result.scalars().all() if rules: rule_list = [] for r in rules: rule_item = { "rule_id": r.rule_id, "standard_name": r.standard_name, "standard_desc": r.standard_desc, } if hasattr(r, 'single_limit') and r.single_limit: rule_item["single_limit"] = float(r.single_limit) if hasattr(r, 'condition_info') and r.condition_info: rule_item["condition_info"] = r.condition_info for cond in r.condition_info: factor = cond.get("rule_factor") try: value = float(cond.get("rule_value", 0)) except (ValueError, TypeError): continue if factor == "QUOTA_DAY": rule_item["max_day_amount"] = value elif factor == "QUOTA_MONTH": rule_item["max_month_amount"] = value elif factor == "QUOTA_QUARTER": rule_item["max_quarter_amount"] = value elif factor == "QUOTA_YEAR": rule_item["max_year_amount"] = value rule_list.append(rule_item) result_dict["rule_list"] = rule_list # 查额度 quota_stmt = select(QuotaModel).where(QuotaModel.institution_id == institution_id).limit(1000) quota_result = await auth.db.execute(quota_stmt) quotas = quota_result.scalars().all() if quotas: result_dict["quota_list"] = [ { "quota_id": q.quota_id, "employee_id": q.employee_id or "", "out_biz_no": q.out_biz_no, "total_amount": float(q.total_amount) if q.total_amount else 0, "available_amount": float(q.available_amount) if q.available_amount else 0, "status": q.status, } for q in quotas ] return result_dict @classmethod async def list_service( cls, auth: AuthSchema, page_no: int = 1, page_size: int = 20, search: dict | None = None, ) -> dict: """ 查询费控制度列表 优先调支付宝,失败降级到本地DB """ enterprise_id = (search or {}).get("enterprise_id", "") institution_name = (search or {}).get("name") or (search or {}).get("institution_name") if enterprise_id: return await cls.pageinfo_query_service( auth=auth, enterprise_id=enterprise_id, page_no=page_no, page_size=page_size, institution_name=institution_name, ) # 无 enterprise_id 时直接查本地 crud = InstitutionCRUD(auth) offset = (page_no - 1) * page_size return await crud.page( offset=offset, limit=page_size, order_by=[{"id": "desc"}], search=search or {}, out_schema=InstitutionListOutSchema, ) @classmethod async def delete_institution_service( cls, auth: AuthSchema, data: AlipayEbppInvoiceInstitutionDeleteModel ) -> dict: """ 删除费控制度 调用: alipay.ebpp.invoice.institution.delete 支付宝侧已删时忽略错误,始终清理本地关联表 """ institution_id = getattr(data, 'institution_id', None) # 调用支付宝删除(失败时仅告警,不影响本地清理) try: request = AlipayEbppInvoiceInstitutionDeleteRequest() request.biz_model = data response = await asyncio.to_thread(cls._execute_alipay, request) if response: result = AlipayEbppInvoiceInstitutionDeleteResponse() result.parse_response_content(response) if result.is_success(): log.info(f"支付宝删除成功: institution_id={institution_id}") else: log.warning(f"支付宝删除失败(可能已删): {result.code} - {result.msg}") else: log.warning("支付宝删除无响应,继续清理本地") except Exception as e: log.warning(f"支付宝删除异常(忽略): {e}") # 清理本地关联表 if institution_id: try: from app.plugin.module_payment.expense.rule.model import ExpenseRuleModel from app.plugin.module_payment.expense.quota.model import QuotaModel from app.plugin.module_payment.expense.institution.model import ExpenseInstitutionModel from sqlalchemy import delete as sa_delete # 删规则 await auth.db.execute(sa_delete(ExpenseRuleModel).where(ExpenseRuleModel.institution_id == institution_id)) # 删额度 await auth.db.execute(sa_delete(QuotaModel).where(QuotaModel.institution_id == institution_id)) # 删制度 await auth.db.execute(sa_delete(ExpenseInstitutionModel).where(ExpenseInstitutionModel.institution_id == institution_id)) await auth.db.flush() log.info(f"本地关联数据已清理: institution_id={institution_id}") except Exception as e: log.warning(f"本地清理失败: {e}") return {"institution_id": institution_id, "deleted": True} @classmethod async def modify_institution_service( cls, auth: AuthSchema, data: AlipayEbppInvoiceInstitutionModifyModel, raw_data: dict | None = None, scope_info: dict | None = None, ) -> dict: """ 编辑费控制度 调用: alipay.ebpp.invoice.institution.modify 适用范围修改(scope_info)需单独调 scope.modify,与基础信息拆分两次请求 支付宝成功后同步更新本地DB: - 制度基本信息 - 适用员工范围(scope) - 使用规则(standard_info_list → pay_expense_rule) - 额度(issuerule → pay_expense_quota) """ if data.institution_id is None: raise CustomException(msg="编辑费控制度失败: 制度ID不能为空") institution_id = data.institution_id enterprise_id = getattr(data, 'enterprise_id', None) or (raw_data or {}).get("enterprise_id", "") raw_data = raw_data or {} # 第1步:修改支付宝制度基础信息(不含 scope) request = AlipayEbppInvoiceInstitutionModifyRequest() request.biz_model = data response = await asyncio.to_thread(cls._execute_alipay, request) if not response: raise CustomException(msg="编辑费控制度失败: 无响应") result = AlipayEbppInvoiceInstitutionModifyResponse() result.parse_response_content(response) if not result.is_success(): log.error(f"支付宝接口调用失败: {result.code} - {result.msg}") raise CustomException(msg=f"编辑费控制度失败: {result.msg}") # 第1.5步:单独调用 scope.modify(不与基础修改在同一请求中) if scope_info: # 全体员工模式:传去支付宝时剥离员工ID scope_data_for_alipay = scope_info.copy() if scope_info else {} if scope_info.get("adapter_type") == "EMPLOYEE_ALL": scope_data_for_alipay.pop("add_owner_id_list", None) scope_data_for_alipay.pop("delete_owner_id_list", None) try: await InstitutionScopeService.scope_modify_service( auth=auth, institution_id=institution_id, data=scope_data_for_alipay ) log.info(f"适用范围已单独同步: adapter_type={scope_info.get('adapter_type')}") except Exception as e: log.warning(f"适用范围同步失败(不影响基础修改,本地DB将更新为最新值): {e}") # scope 变动后同步员工额度记录 try: await cls._sync_modify_quotas_by_scope( auth=auth, institution_id=institution_id, enterprise_id=enterprise_id, scope_info=scope_info, raw_data=raw_data, ) except Exception as e: log.warning(f"同步员工额度记录失败(不影响主体操作): {e}") applicable_scope = raw_data.get("applicable_scope", "") # 第2步:同步更新本地数据库(scope 已在 Alipay modify 请求中通过 modify_scope_info 处理) try: crud = InstitutionCRUD(auth) update_data = {} if hasattr(data, 'institution_name') and data.institution_name: update_data['institution_name'] = data.institution_name if hasattr(data, 'institution_desc') and data.institution_desc: update_data['institution_desc'] = data.institution_desc if hasattr(data, 'effective') and data.effective is not None: update_data['effective'] = data.effective update_data['status'] = ( InstitutionStatusEnum.INSTITUTION_EFFECTIVE.value if data.effective == "1" else InstitutionStatusEnum.INSTITUTION_INVALID.value ) if hasattr(data, 'effective_start_date') and data.effective_start_date: val = data.effective_start_date update_data['effective_start_date'] = datetime.fromisoformat(val.replace(' ', 'T')) if isinstance(val, str) else val if hasattr(data, 'effective_end_date') and data.effective_end_date: val = data.effective_end_date update_data['effective_end_date'] = datetime.fromisoformat(val.replace(' ', 'T')) if isinstance(val, str) else val if applicable_scope: update_data['applicable_scope'] = applicable_scope # 同步额外配置字段 for field in ("grant_mode", "period_type", "amount", "single_limit", "effective_time_type", "expense_type"): val = raw_data.get(field) if val is not None: update_data[field] = val if update_data: await crud.update_by_institution_id(institution_id, update_data) log.info(f"已更新本地制度: institution_id={institution_id}") # 制度生效/失效时同步更新额度状态 if hasattr(data, 'effective') and data.effective is not None: from app.plugin.module_payment.expense.quota.model import QuotaModel from app.plugin.module_payment.expense.quota.enums import QuotaStatusEnum from sqlalchemy import update as sa_quota_update new_quota_status = ( QuotaStatusEnum.QUOTA_ACTIVE.value if data.effective == "1" else QuotaStatusEnum.QUOTA_FROZEN.value ) quota_upd = sa_quota_update(QuotaModel).where( QuotaModel.institution_id == institution_id ).values(status=new_quota_status) await auth.db.execute(quota_upd) await auth.db.flush() log.info(f"已同步更新额度状态: institution_id={institution_id}, effective={data.effective}, quota_status={new_quota_status}") # 同步标准规则(modify_standard_detail_info) std_detail = (raw_data or {}).get("modify_standard_detail_info") or {} if std_detail: from app.plugin.module_payment.expense.rule.model import ExpenseRuleModel from sqlalchemy import delete as sa_delete # 删除规则 delete_ids = std_detail.get("delete_standard_id_list", []) if delete_ids: d_stmt = sa_delete(ExpenseRuleModel).where( ExpenseRuleModel.rule_id.in_(delete_ids) ) await auth.db.execute(d_stmt) # 新增规则 add_list = std_detail.get("add_standard_list", []) from sqlalchemy import insert as sa_insert for std in add_list: ins_data = { "out_biz_no": std.get("outer_source_id", f"std_{institution_id}"), "institution_id": institution_id, "rule_id": std.get("standard_id"), "standard_name": std.get("standard_name"), "standard_desc": std.get("standard_desc"), "expense_type_sub_category": std.get("expense_type_sub_category", "DEFAULT"), "enterprise_id": raw_data.get("enterprise_id", ""), "tenant_id": auth.user.tenant_id if auth.user else 1, } stmt = sa_insert(ExpenseRuleModel).values(**ins_data) await auth.db.execute(stmt) # 修改规则 modify_list = std_detail.get("modify_standard_list", []) for std in modify_list: std_id = std.get("standard_id", "").strip('"') update_std = {} if std.get("standard_name"): update_std["standard_name"] = std["standard_name"] if std.get("standard_desc"): update_std["standard_desc"] = std["standard_desc"] if update_std and std_id: from sqlalchemy import update as sa_update u_stmt = sa_update(ExpenseRuleModel).where( ExpenseRuleModel.rule_id == std_id ).values(**update_std) await auth.db.execute(u_stmt) await auth.db.flush() log.info(f"已同步使用规则: institution_id={institution_id}") # 同步发放规则(modify_issue_rule_detail_info)已去除 # 发放规则对应的额度数据由外部消费同步时通过 # alipay.ebpp.invoice.expensecomsue.outsource.notify 写入真实数据 except Exception as e: log.warning(f"本地同步失败(不影响支付宝侧): {e}") return result class InstitutionScopeService: """费控制度成员范围服务层""" @classmethod def _execute_alipay(cls, request): """同步执行支付宝调用""" client = AlipayClient.get_client() return client.execute(request) @classmethod async def scope_modify_service( cls, auth: AuthSchema, institution_id: str, data: dict, ) -> dict: """ 设置/修改制度成员范围 调用: alipay.ebpp.invoice.institution.scope.modify """ try: from alipay.aop.api.request.AlipayEbppInvoiceInstitutionScopeModifyRequest import ( AlipayEbppInvoiceInstitutionScopeModifyRequest, ) from alipay.aop.api.domain.AlipayEbppInvoiceInstitutionScopeModifyModel import ( AlipayEbppInvoiceInstitutionScopeModifyModel, ) from alipay.aop.api.response.AlipayEbppInvoiceInstitutionScopeModifyResponse import ( AlipayEbppInvoiceInstitutionScopeModifyResponse, ) except ImportError: raise CustomException(msg="支付宝SDK未正确安装(alipay-ebpp-invoice-institution-scope-modify)") model = AlipayEbppInvoiceInstitutionScopeModifyModel() model.institution_id = institution_id model.enterprise_id = data.get("enterprise_id", "") model.adapter_type = data.get("adapter_type", "EMPLOYEE_ALL") if data.get("owner_type"): model.owner_type = data["owner_type"] if data.get("add_owner_id_list"): model.add_owner_id_list = data["add_owner_id_list"] if data.get("delete_owner_id_list"): model.delete_owner_id_list = data["delete_owner_id_list"] request = AlipayEbppInvoiceInstitutionScopeModifyRequest() request.biz_model = model response = await asyncio.to_thread(cls._execute_alipay, request) if not response: raise CustomException(msg="设置制度成员失败: 无响应") result = AlipayEbppInvoiceInstitutionScopeModifyResponse() result.parse_response_content(response) if not result.is_success(): sub_msg = getattr(result, 'sub_msg', '') or '' err_detail = f"{result.msg}" + (f" - {sub_msg}" if sub_msg else "") log.error(f"设置制度成员失败: {result.code} - {err_detail}") raise CustomException(msg=f"设置制度成员失败: {sub_msg or result.msg}") return {"result": True} @classmethod async def scopepageinfo_query_service( cls, auth: AuthSchema, institution_id: str, enterprise_id: str | None = None, page_num: int = 1, page_size: int = 20, owner_type: str | None = None, ) -> dict: """ 查询制度成员范围 调用: alipay.ebpp.invoice.institution.scopepageinfo.query """ try: from alipay.aop.api.request.AlipayEbppInvoiceInstitutionScopepageinfoQueryRequest import ( AlipayEbppInvoiceInstitutionScopepageinfoQueryRequest, ) from alipay.aop.api.domain.AlipayEbppInvoiceInstitutionScopepageinfoQueryModel import ( AlipayEbppInvoiceInstitutionScopepageinfoQueryModel, ) from alipay.aop.api.response.AlipayEbppInvoiceInstitutionScopepageinfoQueryResponse import ( AlipayEbppInvoiceInstitutionScopepageinfoQueryResponse, ) except ImportError: raise CustomException(msg="支付宝SDK未正确安装(alipay-ebpp-invoice-institution-scopepageinfo-query)") model = AlipayEbppInvoiceInstitutionScopepageinfoQueryModel() model.institution_id = institution_id model.page_num = page_num model.page_size = page_size if not enterprise_id: # 从本地 DB 查找 enterprise_id from .crud import InstitutionCRUD inst_crud = InstitutionCRUD(auth) local_inst = await inst_crud.get(institution_id=institution_id) if local_inst and local_inst.enterprise_id: enterprise_id = local_inst.enterprise_id if enterprise_id: model.enterprise_id = enterprise_id if owner_type: model.owner_type = owner_type request = AlipayEbppInvoiceInstitutionScopepageinfoQueryRequest() request.biz_model = model response = await asyncio.to_thread(cls._execute_alipay, request) if not response: raise CustomException(msg="查询制度成员失败: 无响应") result = AlipayEbppInvoiceInstitutionScopepageinfoQueryResponse() result.parse_response_content(response) if not result.is_success(): log.error(f"查询制度成员失败: {result.code} - {result.msg}") raise CustomException(msg=f"查询制度成员失败: {result.msg}") return { "page_num": getattr(result, 'page_num', page_num) or page_num, "page_size": getattr(result, 'page_size', page_size) or page_size, "total_page_count": getattr(result, 'total_page_count', 0) or 0, "adapter_type": getattr(result, 'adapter_type', None), "owner_id_list": getattr(result, 'owner_id_list', []) or [], "owner_open_id_list": getattr(result, 'owner_open_id_list', []) or [], "scope_info_list": [ { "adapter_type": getattr(result, 'adapter_type', None), "owner_id_list": getattr(result, 'owner_id_list', []) or [], "owner_open_id_list": getattr(result, 'owner_open_id_list', []) or [], } ] if getattr(result, 'adapter_type', None) else [], } class IssueruleService: """自动额度发放规则服务层""" ISSUE_TYPE_MAP = { "daily": "ISSUE_DAY", "weekly": "ISSUE_WEEK", "monthly": "ISSUE_MONTH", "quarterly": "ISSUE_QUARTER", "yearly": "ISSUE_YEAR", } @classmethod def _execute_alipay(cls, request): client = AlipayClient.get_client() return client.execute(request) @classmethod async def create_issuerule_service( cls, auth: AuthSchema, institution_id: str, enterprise_id: str, quota_type: str, issue_type: str, issue_amount_value: str, outer_source_id: str | None = None, issue_rule_name: str | None = None, effective_period: str | None = None, invalid_mode: int | None = None, share_mode: int | None = None, ) -> dict: """ 创建自动额度发放规则 调用: alipay.ebpp.invoice.issuerule.create """ try: from alipay.aop.api.request.AlipayEbppInvoiceIssueruleCreateRequest import ( AlipayEbppInvoiceIssueruleCreateRequest, ) from alipay.aop.api.domain.AlipayEbppInvoiceIssueruleCreateModel import ( AlipayEbppInvoiceIssueruleCreateModel, ) from alipay.aop.api.response.AlipayEbppInvoiceIssueruleCreateResponse import ( AlipayEbppInvoiceIssueruleCreateResponse, ) except ImportError: raise CustomException(msg="支付宝SDK未正确安装(alipay-ebpp-invoice-issuerule-create)") # 参数约束校验 if quota_type == "CAP" and invalid_mode is not None and invalid_mode != 1: raise CustomException(msg="余额类型(CAP)的发放规则必须为可累计(invalid_mode=1)") if quota_type == "COUNT" and share_mode is not None and share_mode != 0: raise CustomException(msg="次卡类型(COUNT)的发放规则不可转赠(share_mode=0)") model = AlipayEbppInvoiceIssueruleCreateModel() model.target_type = "INSTITUTION" model.target_id = institution_id model.quota_type = quota_type model.issue_type = issue_type model.issue_amount_value = issue_amount_value model.enterprise_id = enterprise_id if outer_source_id: model.outer_source_id = outer_source_id if issue_rule_name: model.issue_rule_name = issue_rule_name if effective_period: model.effective_period = effective_period if invalid_mode is not None: model.invalid_mode = invalid_mode if share_mode is not None: model.share_mode = share_mode request = AlipayEbppInvoiceIssueruleCreateRequest() request.biz_model = model response = await asyncio.to_thread(cls._execute_alipay, request) if not response: raise CustomException(msg="创建发放规则失败: 无响应") result = AlipayEbppInvoiceIssueruleCreateResponse() result.parse_response_content(response) if not result.is_success(): log.error(f"创建发放规则失败: {result.code} - {result.msg}") raise CustomException(msg=f"创建发放规则失败: {result.msg}") return { "issue_rule_id": getattr(result, 'issue_rule_id', None), } @classmethod async def modify_issuerule_service( cls, auth: AuthSchema, institution_id: str, issue_rule_id: str, enterprise_id: str, quota_type: str | None = None, issue_type: str | None = None, issue_amount_value: str | None = None, issue_rule_name: str | None = None, effective: str | None = None, effective_period: str | None = None, invalid_mode: int | None = None, share_mode: int | None = None, ) -> dict: """ 编辑自动额度发放规则 调用: alipay.ebpp.invoice.issuerule.modify """ try: from alipay.aop.api.request.AlipayEbppInvoiceIssueruleModifyRequest import ( AlipayEbppInvoiceIssueruleModifyRequest, ) from alipay.aop.api.domain.AlipayEbppInvoiceIssueruleModifyModel import ( AlipayEbppInvoiceIssueruleModifyModel, ) from alipay.aop.api.response.AlipayEbppInvoiceIssueruleModifyResponse import ( AlipayEbppInvoiceIssueruleModifyResponse, ) except ImportError: raise CustomException(msg="支付宝SDK未正确安装(alipay-ebpp-invoice-issuerule-modify)") model = AlipayEbppInvoiceIssueruleModifyModel() model.target_type = "INSTITUTION" model.target_id = institution_id model.issue_rule_id = issue_rule_id model.action = "MODIFY_BASIC_INFO" model.enterprise_id = enterprise_id if issue_rule_name: model.issue_rule_name = issue_rule_name if quota_type: model.quota_type = quota_type if issue_type: model.issue_type = issue_type if issue_amount_value: model.issue_amount_value = issue_amount_value if effective is not None: model.effective = effective if effective_period: model.effective_period = effective_period if invalid_mode is not None: model.invalid_mode = invalid_mode if share_mode is not None: model.share_mode = share_mode request = AlipayEbppInvoiceIssueruleModifyRequest() request.biz_model = model response = await asyncio.to_thread(cls._execute_alipay, request) if not response: raise CustomException(msg="编辑发放规则失败: 无响应") result = AlipayEbppInvoiceIssueruleModifyResponse() result.parse_response_content(response) if not result.is_success(): log.error(f"编辑发放规则失败: {result.code} - {result.msg}") raise CustomException(msg=f"编辑发放规则失败: {result.msg}") return {"result": True} @classmethod async def delete_issuerule_service( cls, auth: AuthSchema, institution_id: str, issue_rule_id_list: list[str], enterprise_id: str, ) -> dict: """ 删除自动额度发放规则 调用: alipay.ebpp.invoice.issuerule.delete """ try: from alipay.aop.api.request.AlipayEbppInvoiceIssueruleDeleteRequest import ( AlipayEbppInvoiceIssueruleDeleteRequest, ) from alipay.aop.api.domain.AlipayEbppInvoiceIssueruleDeleteModel import ( AlipayEbppInvoiceIssueruleDeleteModel, ) from alipay.aop.api.response.AlipayEbppInvoiceIssueruleDeleteResponse import ( AlipayEbppInvoiceIssueruleDeleteResponse, ) except ImportError: raise CustomException(msg="支付宝SDK未正确安装(alipay-ebpp-invoice-issuerule-delete)") model = AlipayEbppInvoiceIssueruleDeleteModel() model.target_type = "INSTITUTION" model.target_id = institution_id model.issue_rule_id_list = issue_rule_id_list model.enterprise_id = enterprise_id request = AlipayEbppInvoiceIssueruleDeleteRequest() request.biz_model = model response = await asyncio.to_thread(cls._execute_alipay, request) if not response: raise CustomException(msg="删除发放规则失败: 无响应") result = AlipayEbppInvoiceIssueruleDeleteResponse() result.parse_response_content(response) if not result.is_success(): log.error(f"删除发放规则失败: {result.code} - {result.msg}") raise CustomException(msg=f"删除发放规则失败: {result.msg}") return {"result": True}