|
|
@@ -0,0 +1,246 @@
|
|
|
+import asyncio
|
|
|
+from decimal import Decimal
|
|
|
+from sqlalchemy import select, insert, update as sa_update
|
|
|
+
|
|
|
+from app.api.v1.module_system.auth.schema import AuthSchema
|
|
|
+from app.core.alipay import AlipayClient
|
|
|
+from app.core.exceptions import CustomException
|
|
|
+from app.core.logger import log
|
|
|
+from app.utils.snowflake import get_snowflake_id
|
|
|
+
|
|
|
+from .model import QuotaModel
|
|
|
+from .enums import QuotaStatusEnum
|
|
|
+from .outsource_schema import OutsourceNotifySchema, OutsourceNotifyOutSchema
|
|
|
+
|
|
|
+
|
|
|
+class OutsourceNotifyService:
|
|
|
+ """外部消费额度同步服务
|
|
|
+
|
|
|
+ 调用 alipay.ebpp.invoice.expensecomsue.outsource.notify 将外部消费同步给支付宝,
|
|
|
+ 同时写入本地 pay_expense_quota 记录真实额度变动。
|
|
|
+ """
|
|
|
+
|
|
|
+ _alipay_request_cls = None
|
|
|
+ _alipay_model_cls = None
|
|
|
+ _alipay_response_cls = None
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def _ensure_imports(cls):
|
|
|
+ """延迟导入支付宝SDK"""
|
|
|
+ if cls._alipay_request_cls is not None:
|
|
|
+ return
|
|
|
+ try:
|
|
|
+ from alipay.aop.api.request.AlipayEbppInvoiceExpensecomsueOutsourceNotifyRequest import (
|
|
|
+ AlipayEbppInvoiceExpensecomsueOutsourceNotifyRequest,
|
|
|
+ )
|
|
|
+ from alipay.aop.api.domain.AlipayEbppInvoiceExpensecomsueOutsourceNotifyModel import (
|
|
|
+ AlipayEbppInvoiceExpensecomsueOutsourceNotifyModel,
|
|
|
+ )
|
|
|
+ from alipay.aop.api.response.AlipayEbppInvoiceExpensecomsueOutsourceNotifyResponse import (
|
|
|
+ AlipayEbppInvoiceExpensecomsueOutsourceNotifyResponse,
|
|
|
+ )
|
|
|
+ cls._alipay_request_cls = AlipayEbppInvoiceExpensecomsueOutsourceNotifyRequest
|
|
|
+ cls._alipay_model_cls = AlipayEbppInvoiceExpensecomsueOutsourceNotifyModel
|
|
|
+ cls._alipay_response_cls = AlipayEbppInvoiceExpensecomsueOutsourceNotifyResponse
|
|
|
+ except ImportError:
|
|
|
+ raise CustomException(msg="支付宝SDK未正确安装(alipay-ebpp-invoice-expensecomsue-outsource-notify)")
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def _execute_alipay(cls, request):
|
|
|
+ """同步执行支付宝调用"""
|
|
|
+ client = AlipayClient.get_client()
|
|
|
+ return client.execute(request)
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def _build_alipay_model(cls, data: OutsourceNotifySchema) -> object:
|
|
|
+ """构建Alipay SDK Model
|
|
|
+
|
|
|
+ 金额转换规则:
|
|
|
+ - schema.amount 单位为元(正数为消费/扣款,负数为退款)
|
|
|
+ - Alipay API 单位为分(正整数),is_off_set 控制方向
|
|
|
+ """
|
|
|
+ cls._ensure_imports()
|
|
|
+ model = cls._alipay_model_cls()
|
|
|
+ model.enterprise_id = data.enterprise_id
|
|
|
+ model.employee_id = data.employee_id
|
|
|
+ model.standard_id = data.standard_id
|
|
|
+
|
|
|
+ # 元→分,取绝对值(正值),is_off_set 控制方向
|
|
|
+ amount_cents = int(abs(data.amount) * 100)
|
|
|
+ model.amount = str(amount_cents)
|
|
|
+
|
|
|
+ # 方向:0=扣款 1=退款
|
|
|
+ model.is_off_set = 1 if data.amount < 0 else 0
|
|
|
+
|
|
|
+ model.out_source_id = data.out_source_id
|
|
|
+ model.deal_time = data.deal_time
|
|
|
+
|
|
|
+ if data.account_id:
|
|
|
+ model.account_id = data.account_id
|
|
|
+ if data.agreement_no:
|
|
|
+ model.agreement_no = data.agreement_no
|
|
|
+ if data.employee_id_type:
|
|
|
+ model.employee_id_type = data.employee_id_type
|
|
|
+ if data.employee_open_id:
|
|
|
+ model.employee_open_id = data.employee_open_id
|
|
|
+ if data.extend:
|
|
|
+ model.extend = data.extend
|
|
|
+ if data.platform:
|
|
|
+ model.platform = data.platform
|
|
|
+ if data.relate_no:
|
|
|
+ model.relate_no = data.relate_no
|
|
|
+
|
|
|
+ return model
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ async def notify_service(
|
|
|
+ cls, auth: AuthSchema, data: OutsourceNotifySchema
|
|
|
+ ) -> OutsourceNotifyOutSchema:
|
|
|
+ """调用支付宝外部消费额度同步,并写入本地额度记录
|
|
|
+
|
|
|
+ Args:
|
|
|
+ auth: 认证信息
|
|
|
+ data: 同步请求数据
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ 同步结果
|
|
|
+ """
|
|
|
+ out_source_id = data.out_source_id or str(get_snowflake_id())
|
|
|
+
|
|
|
+ # 第1步:调用支付宝
|
|
|
+ try:
|
|
|
+ cls._ensure_imports()
|
|
|
+ model = cls._build_alipay_model(data)
|
|
|
+
|
|
|
+ request = cls._alipay_request_cls()
|
|
|
+ request.biz_model = model
|
|
|
+
|
|
|
+ response = await asyncio.to_thread(cls._execute_alipay, request)
|
|
|
+
|
|
|
+ if not response:
|
|
|
+ raise CustomException(msg="外部消费额度同步失败: 无响应")
|
|
|
+
|
|
|
+ result = cls._alipay_response_cls()
|
|
|
+ result.parse_response_content(response)
|
|
|
+
|
|
|
+ if not result.is_success():
|
|
|
+ log.error(f"外部消费额度同步失败: {result.code} - {result.msg} ({result.sub_code}: {result.sub_msg})")
|
|
|
+ # 不抛异常,返回失败信息
|
|
|
+ return OutsourceNotifyOutSchema(
|
|
|
+ success=False,
|
|
|
+ out_source_id=out_source_id,
|
|
|
+ message=f"{result.msg}: {result.sub_msg}" if result.sub_msg else result.msg,
|
|
|
+ )
|
|
|
+
|
|
|
+ log.info(
|
|
|
+ f"外部消费额度同步成功: out_source_id={out_source_id}, "
|
|
|
+ f"employee_id={data.employee_id}, amount={data.amount}"
|
|
|
+ )
|
|
|
+ except CustomException:
|
|
|
+ raise
|
|
|
+ except Exception as e:
|
|
|
+ log.error(f"外部消费额度同步异常: {e}")
|
|
|
+ return OutsourceNotifyOutSchema(
|
|
|
+ success=False,
|
|
|
+ out_source_id=out_source_id,
|
|
|
+ message=str(e),
|
|
|
+ )
|
|
|
+
|
|
|
+ # 第2步:同步写入本地额度记录
|
|
|
+ try:
|
|
|
+ await cls._sync_local_quota(auth, data, out_source_id)
|
|
|
+ except Exception as e:
|
|
|
+ log.warning(f"本地额度记录同步失败(不影响支付宝侧): {e}")
|
|
|
+
|
|
|
+ return OutsourceNotifyOutSchema(
|
|
|
+ success=True,
|
|
|
+ out_source_id=out_source_id,
|
|
|
+ message="success",
|
|
|
+ )
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ async def _sync_local_quota(
|
|
|
+ cls, auth: AuthSchema, data: OutsourceNotifySchema, out_source_id: str
|
|
|
+ ):
|
|
|
+ """同步本地额度记录(pay_expense_quota)
|
|
|
+
|
|
|
+ 根据支付宝外部消费同步的结果,更新或创建本地额度记录。
|
|
|
+ - 消费:减少可用额度
|
|
|
+ - 退款:增加可用额度
|
|
|
+
|
|
|
+ 幂等:检查 out_biz_no 是否已存在,已存在则跳过。
|
|
|
+ """
|
|
|
+ # 幂等检查:out_source_id 已存在则直接跳过
|
|
|
+ existing_by_out = await auth.db.execute(
|
|
|
+ select(QuotaModel).where(QuotaModel.out_biz_no == out_source_id)
|
|
|
+ )
|
|
|
+ if existing_by_out.scalar_one_or_none():
|
|
|
+ log.info(f"外部消费额度同步已处理,跳过本地写入: out_source_id={out_source_id}")
|
|
|
+ return
|
|
|
+
|
|
|
+ institution_id = data.institution_id or ""
|
|
|
+
|
|
|
+ # 查询该员工在该制度下是否已有额度记录
|
|
|
+ stmt = select(QuotaModel).where(
|
|
|
+ QuotaModel.employee_id == data.employee_id,
|
|
|
+ QuotaModel.institution_id == institution_id,
|
|
|
+ )
|
|
|
+ result = await auth.db.execute(stmt)
|
|
|
+ existing = result.scalar_one_or_none()
|
|
|
+
|
|
|
+ amount_decimal = Decimal(str(abs(data.amount)))
|
|
|
+
|
|
|
+ if existing:
|
|
|
+ # 更新现有额度
|
|
|
+ if data.amount > 0:
|
|
|
+ # 消费:减少可用额度
|
|
|
+ new_available = (existing.available_amount or Decimal("0")) - amount_decimal
|
|
|
+ if new_available < 0:
|
|
|
+ new_available = Decimal("0")
|
|
|
+ else:
|
|
|
+ # 退款:增加可用额度
|
|
|
+ new_available = (existing.available_amount or Decimal("0")) + amount_decimal
|
|
|
+
|
|
|
+ new_status = QuotaStatusEnum.QUOTA_ACTIVE.value
|
|
|
+ if new_available <= 0:
|
|
|
+ new_status = QuotaStatusEnum.QUOTA_EXHAUSTED.value
|
|
|
+
|
|
|
+ upd = (
|
|
|
+ sa_update(QuotaModel)
|
|
|
+ .where(QuotaModel.id == existing.id)
|
|
|
+ .values(
|
|
|
+ available_amount=new_available,
|
|
|
+ status=new_status,
|
|
|
+ )
|
|
|
+ )
|
|
|
+ await auth.db.execute(upd)
|
|
|
+ log.info(
|
|
|
+ f"更新本地额度: employee_id={data.employee_id}, "
|
|
|
+ f"institution_id={institution_id}, "
|
|
|
+ f"available_amount={new_available}, status={new_status}"
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ # 新建额度记录(仅记录可用额度,无额度ID)
|
|
|
+ new_available = Decimal("0")
|
|
|
+ if data.amount < 0:
|
|
|
+ # 退款时创建正向记录
|
|
|
+ new_available = amount_decimal
|
|
|
+
|
|
|
+ ins = insert(QuotaModel).values(
|
|
|
+ employee_id=data.employee_id,
|
|
|
+ institution_id=institution_id,
|
|
|
+ out_biz_no=out_source_id,
|
|
|
+ total_amount=new_available,
|
|
|
+ available_amount=new_available,
|
|
|
+ status=QuotaStatusEnum.QUOTA_ACTIVE.value,
|
|
|
+ enterprise_id=data.enterprise_id,
|
|
|
+ tenant_id=auth.user.tenant_id if auth.user else 1,
|
|
|
+ )
|
|
|
+ await auth.db.execute(ins)
|
|
|
+ log.info(
|
|
|
+ f"新建本地额度: employee_id={data.employee_id}, "
|
|
|
+ f"institution_id={institution_id}, "
|
|
|
+ f"available_amount={new_available}"
|
|
|
+ )
|
|
|
+
|
|
|
+ await auth.db.flush()
|