import asyncio from decimal import Decimal from sqlalchemy import select, insert, update as sa_update from app.api.v1.module_system.auth.schema import AuthSchema from app.core.alipay import AlipayClient from app.core.exceptions import CustomException from app.core.logger import log from app.utils.snowflake import get_snowflake_id from .model import QuotaModel from .enums import QuotaStatusEnum from .outsource_schema import OutsourceNotifySchema, OutsourceNotifyOutSchema class OutsourceNotifyService: """外部消费额度同步服务 调用 alipay.ebpp.invoice.expensecomsue.outsource.notify 将外部消费同步给支付宝, 同时写入本地 pay_expense_quota 记录真实额度变动。 """ _alipay_request_cls = None _alipay_model_cls = None _alipay_response_cls = None @classmethod def _ensure_imports(cls): """延迟导入支付宝SDK""" if cls._alipay_request_cls is not None: return try: from alipay.aop.api.request.AlipayEbppInvoiceExpensecomsueOutsourceNotifyRequest import ( AlipayEbppInvoiceExpensecomsueOutsourceNotifyRequest, ) from alipay.aop.api.domain.AlipayEbppInvoiceExpensecomsueOutsourceNotifyModel import ( AlipayEbppInvoiceExpensecomsueOutsourceNotifyModel, ) from alipay.aop.api.response.AlipayEbppInvoiceExpensecomsueOutsourceNotifyResponse import ( AlipayEbppInvoiceExpensecomsueOutsourceNotifyResponse, ) cls._alipay_request_cls = AlipayEbppInvoiceExpensecomsueOutsourceNotifyRequest cls._alipay_model_cls = AlipayEbppInvoiceExpensecomsueOutsourceNotifyModel cls._alipay_response_cls = AlipayEbppInvoiceExpensecomsueOutsourceNotifyResponse except ImportError: raise CustomException(msg="支付宝SDK未正确安装(alipay-ebpp-invoice-expensecomsue-outsource-notify)") @classmethod def _execute_alipay(cls, request): """同步执行支付宝调用""" client = AlipayClient.get_client() return client.execute(request) @classmethod def _build_alipay_model(cls, data: OutsourceNotifySchema) -> object: """构建Alipay SDK Model 金额转换规则: - schema.amount 单位为元(正数为消费/扣款,负数为退款) - Alipay API 单位为分(正整数),is_off_set 控制方向 """ cls._ensure_imports() model = cls._alipay_model_cls() model.enterprise_id = data.enterprise_id model.employee_id = data.employee_id model.standard_id = data.standard_id # 元→分,取绝对值(正值),is_off_set 控制方向 amount_cents = int(abs(data.amount) * 100) model.amount = str(amount_cents) # 方向:0=扣款 1=退款 model.is_off_set = 1 if data.amount < 0 else 0 model.out_source_id = data.out_source_id model.deal_time = data.deal_time if data.account_id: model.account_id = data.account_id if data.agreement_no: model.agreement_no = data.agreement_no if data.employee_id_type: model.employee_id_type = data.employee_id_type if data.employee_open_id: model.employee_open_id = data.employee_open_id if data.extend: model.extend = data.extend if data.platform: model.platform = data.platform if data.relate_no: model.relate_no = data.relate_no return model @classmethod async def notify_service( cls, auth: AuthSchema, data: OutsourceNotifySchema ) -> OutsourceNotifyOutSchema: """调用支付宝外部消费额度同步,并写入本地额度记录 Args: auth: 认证信息 data: 同步请求数据 Returns: 同步结果 """ out_source_id = data.out_source_id or str(get_snowflake_id()) # 第1步:调用支付宝 try: cls._ensure_imports() model = cls._build_alipay_model(data) request = cls._alipay_request_cls() request.biz_model = model response = await asyncio.to_thread(cls._execute_alipay, request) if not response: raise CustomException(msg="外部消费额度同步失败: 无响应") result = cls._alipay_response_cls() result.parse_response_content(response) if not result.is_success(): log.error(f"外部消费额度同步失败: {result.code} - {result.msg} ({result.sub_code}: {result.sub_msg})") # 不抛异常,返回失败信息 return OutsourceNotifyOutSchema( success=False, out_source_id=out_source_id, message=f"{result.msg}: {result.sub_msg}" if result.sub_msg else result.msg, ) log.info( f"外部消费额度同步成功: out_source_id={out_source_id}, " f"employee_id={data.employee_id}, amount={data.amount}" ) except CustomException: raise except Exception as e: log.error(f"外部消费额度同步异常: {e}") return OutsourceNotifyOutSchema( success=False, out_source_id=out_source_id, message=str(e), ) # 第2步:同步写入本地额度记录 try: await cls._sync_local_quota(auth, data, out_source_id) except Exception as e: log.warning(f"本地额度记录同步失败(不影响支付宝侧): {e}") return OutsourceNotifyOutSchema( success=True, out_source_id=out_source_id, message="success", ) @classmethod async def _sync_local_quota( cls, auth: AuthSchema, data: OutsourceNotifySchema, out_source_id: str ): """同步本地额度记录(pay_expense_quota) 根据支付宝外部消费同步的结果,更新或创建本地额度记录。 - 消费:减少可用额度 - 退款:增加可用额度 幂等:检查 out_biz_no 是否已存在,已存在则跳过。 """ # 幂等检查:out_source_id 已存在则直接跳过 existing_by_out = await auth.db.execute( select(QuotaModel).where(QuotaModel.out_biz_no == out_source_id) ) if existing_by_out.scalar_one_or_none(): log.info(f"外部消费额度同步已处理,跳过本地写入: out_source_id={out_source_id}") return institution_id = data.institution_id or "" # 查询该员工在该制度下是否已有额度记录 stmt = select(QuotaModel).where( QuotaModel.employee_id == data.employee_id, QuotaModel.institution_id == institution_id, ) result = await auth.db.execute(stmt) existing = result.scalar_one_or_none() amount_decimal = Decimal(str(abs(data.amount))) if existing: # 更新现有额度 if data.amount > 0: # 消费:减少可用额度 new_available = (existing.available_amount or Decimal("0")) - amount_decimal if new_available < 0: new_available = Decimal("0") else: # 退款:增加可用额度 new_available = (existing.available_amount or Decimal("0")) + amount_decimal new_status = QuotaStatusEnum.QUOTA_ACTIVE.value if new_available <= 0: new_status = QuotaStatusEnum.QUOTA_EXHAUSTED.value upd = ( sa_update(QuotaModel) .where(QuotaModel.id == existing.id) .values( available_amount=new_available, status=new_status, ) ) await auth.db.execute(upd) log.info( f"更新本地额度: employee_id={data.employee_id}, " f"institution_id={institution_id}, " f"available_amount={new_available}, status={new_status}" ) else: # 新建额度记录(仅记录可用额度,无额度ID) new_available = Decimal("0") if data.amount < 0: # 退款时创建正向记录 new_available = amount_decimal ins = insert(QuotaModel).values( employee_id=data.employee_id, institution_id=institution_id, out_biz_no=out_source_id, total_amount=new_available, available_amount=new_available, status=QuotaStatusEnum.QUOTA_ACTIVE.value, enterprise_id=data.enterprise_id, tenant_id=auth.user.tenant_id if auth.user else 1, ) await auth.db.execute(ins) log.info( f"新建本地额度: employee_id={data.employee_id}, " f"institution_id={institution_id}, " f"available_amount={new_available}" ) await auth.db.flush()