| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246 |
- import asyncio
- from decimal import Decimal
- from sqlalchemy import select, insert, update as sa_update
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.core.alipay import AlipayClient
- from app.core.exceptions import CustomException
- from app.core.logger import log
- from app.utils.snowflake import get_snowflake_id
- from .model import QuotaModel
- from .enums import QuotaStatusEnum
- from .outsource_schema import OutsourceNotifySchema, OutsourceNotifyOutSchema
class OutsourceNotifyService:
    """External-consumption quota synchronisation service.

    Calls ``alipay.ebpp.invoice.expensecomsue.outsource.notify`` to report an
    external consumption (or refund) to Alipay and, once Alipay accepts it,
    mirrors the quota change into the local ``pay_expense_quota`` records.
    """

    # Alipay SDK classes, resolved lazily by ``_ensure_imports``.
    _alipay_request_cls = None
    _alipay_model_cls = None
    _alipay_response_cls = None

    # Optional request fields that are copied onto the SDK model only when
    # the incoming payload carries a truthy value for them.
    _OPTIONAL_MODEL_FIELDS = (
        "account_id",
        "agreement_no",
        "employee_id_type",
        "employee_open_id",
        "extend",
        "platform",
        "relate_no",
    )

    @classmethod
    def _ensure_imports(cls):
        """Import the Alipay SDK classes on first use and cache them on the class.

        Raises:
            CustomException: If the SDK modules cannot be imported.
        """
        if cls._alipay_request_cls is not None:
            return
        try:
            from alipay.aop.api.request.AlipayEbppInvoiceExpensecomsueOutsourceNotifyRequest import (
                AlipayEbppInvoiceExpensecomsueOutsourceNotifyRequest,
            )
            from alipay.aop.api.domain.AlipayEbppInvoiceExpensecomsueOutsourceNotifyModel import (
                AlipayEbppInvoiceExpensecomsueOutsourceNotifyModel,
            )
            from alipay.aop.api.response.AlipayEbppInvoiceExpensecomsueOutsourceNotifyResponse import (
                AlipayEbppInvoiceExpensecomsueOutsourceNotifyResponse,
            )
        except ImportError as exc:
            # Chain the cause so the missing module name stays in the traceback.
            raise CustomException(
                msg="支付宝SDK未正确安装(alipay-ebpp-invoice-expensecomsue-outsource-notify)"
            ) from exc
        cls._alipay_model_cls = AlipayEbppInvoiceExpensecomsueOutsourceNotifyModel
        cls._alipay_response_cls = AlipayEbppInvoiceExpensecomsueOutsourceNotifyResponse
        # Assigned last: this attribute is the "already imported" marker
        # checked at the top of this method.
        cls._alipay_request_cls = AlipayEbppInvoiceExpensecomsueOutsourceNotifyRequest

    @classmethod
    def _execute_alipay(cls, request):
        """Run the Alipay call synchronously and return the client's raw response."""
        client = AlipayClient.get_client()
        return client.execute(request)

    @staticmethod
    def _yuan_to_cents(amount) -> int:
        """Convert an amount in yuan to a non-negative whole number of cents.

        The value goes through ``Decimal(str(...))`` so that binary-float
        artefacts cannot truncate the result (``int(19.99 * 100)`` is 1998).
        Sub-cent fractions are rounded half-up.

        Args:
            amount: Amount in yuan; the sign is ignored.

        Returns:
            The absolute amount expressed in cents.
        """
        from decimal import ROUND_HALF_UP

        cents = Decimal(str(abs(amount))) * 100
        return int(cents.quantize(Decimal("1"), rounding=ROUND_HALF_UP))

    @classmethod
    def _build_alipay_model(
        cls, data: "OutsourceNotifySchema", out_source_id=None
    ) -> object:
        """Build the Alipay SDK model for the notify request.

        Amount conversion:
        - ``data.amount`` is in yuan (positive = consumption/deduction,
          negative = refund).
        - The model carries the absolute amount in cents as a string, and
          ``is_off_set`` carries the direction (0 = deduction, 1 = refund).

        Args:
            data: Sync request payload.
            out_source_id: Identifier to send to Alipay. When omitted or
                empty, ``data.out_source_id`` is used.

        Returns:
            The populated SDK model instance.

        Raises:
            CustomException: If the Alipay SDK cannot be imported.
        """
        cls._ensure_imports()
        model = cls._alipay_model_cls()
        model.enterprise_id = data.enterprise_id
        model.employee_id = data.employee_id
        model.standard_id = data.standard_id
        model.amount = str(cls._yuan_to_cents(data.amount))
        # Direction: 0 = deduction, 1 = refund.
        model.is_off_set = 1 if data.amount < 0 else 0
        # Prefer the id resolved by the caller so that Alipay, the local
        # record and the returned result all refer to the same identifier.
        model.out_source_id = out_source_id or data.out_source_id
        model.deal_time = data.deal_time
        for field_name in cls._OPTIONAL_MODEL_FIELDS:
            value = getattr(data, field_name)
            if value:
                setattr(model, field_name, value)
        return model

    @classmethod
    async def notify_service(
        cls, auth: "AuthSchema", data: "OutsourceNotifySchema"
    ) -> "OutsourceNotifyOutSchema":
        """Report an external consumption to Alipay, then update the local quota.

        Args:
            auth: Authentication context; its ``db`` session is used for the
                local quota write.
            data: Sync request payload.

        Returns:
            A result with ``success=False`` and the failure message when
            Alipay rejects the request or the call raises an unexpected
            error; otherwise ``success=True``. A failure of the local quota
            write is only logged and does not change the result.

        Raises:
            CustomException: If the SDK is unavailable or Alipay returns an
                empty response.
        """
        # Generate an id when the caller did not supply one.
        out_source_id = data.out_source_id or str(get_snowflake_id())

        # Step 1: call Alipay.
        try:
            cls._ensure_imports()
            # Pass the resolved id explicitly; data.out_source_id may be empty.
            model = cls._build_alipay_model(data, out_source_id)
            request = cls._alipay_request_cls()
            request.biz_model = model
            # The SDK client is blocking, so run it in a worker thread.
            response = await asyncio.to_thread(cls._execute_alipay, request)
            if not response:
                raise CustomException(msg="外部消费额度同步失败: 无响应")
            result = cls._alipay_response_cls()
            result.parse_response_content(response)
            if not result.is_success():
                log.error(f"外部消费额度同步失败: {result.code} - {result.msg} ({result.sub_code}: {result.sub_msg})")
                # Do not raise; hand the failure details back to the caller.
                return OutsourceNotifyOutSchema(
                    success=False,
                    out_source_id=out_source_id,
                    message=f"{result.msg}: {result.sub_msg}" if result.sub_msg else result.msg,
                )
            log.info(
                f"外部消费额度同步成功: out_source_id={out_source_id}, "
                f"employee_id={data.employee_id}, amount={data.amount}"
            )
        except CustomException:
            raise
        except Exception as e:
            log.error(f"外部消费额度同步异常: {e}")
            return OutsourceNotifyOutSchema(
                success=False,
                out_source_id=out_source_id,
                message=str(e),
            )

        # Step 2: mirror the change locally. Best effort: Alipay has already
        # accepted the request, so a local failure is logged, not raised.
        try:
            await cls._sync_local_quota(auth, data, out_source_id)
        except Exception as e:
            log.warning(f"本地额度记录同步失败(不影响支付宝侧): {e}")

        return OutsourceNotifyOutSchema(
            success=True,
            out_source_id=out_source_id,
            message="success",
        )

    @classmethod
    async def _sync_local_quota(
        cls, auth: "AuthSchema", data: "OutsourceNotifySchema", out_source_id: str
    ):
        """Apply the synced consumption/refund to the local quota record.

        - Consumption (``data.amount > 0``): decrease the available amount,
          clamped at zero.
        - Otherwise (refund): increase the available amount.

        Idempotence: nothing is written when a row whose ``out_biz_no``
        equals ``out_source_id`` already exists.

        NOTE(review): only the insert branch stores ``out_source_id`` in
        ``out_biz_no``; the update branch does not, so a replayed
        notification that hits an existing quota row is not detected by the
        check above. Confirm whether a separate ledger is needed.

        Args:
            auth: Authentication context providing the ``db`` session.
            data: Sync request payload.
            out_source_id: Identifier of this sync operation.
        """
        # Idempotency check: skip if this out_source_id was already recorded.
        existing_by_out = await auth.db.execute(
            select(QuotaModel).where(QuotaModel.out_biz_no == out_source_id)
        )
        if existing_by_out.scalar_one_or_none():
            log.info(f"外部消费额度同步已处理,跳过本地写入: out_source_id={out_source_id}")
            return

        institution_id = data.institution_id or ""

        # Look up the employee's quota record under this institution.
        stmt = select(QuotaModel).where(
            QuotaModel.employee_id == data.employee_id,
            QuotaModel.institution_id == institution_id,
        )
        result = await auth.db.execute(stmt)
        existing = result.scalar_one_or_none()

        amount_decimal = Decimal(str(abs(data.amount)))

        if existing:
            # NOTE(review): read-modify-write without a row lock; concurrent
            # syncs for the same record could overwrite each other.
            current_available = existing.available_amount or Decimal("0")
            if data.amount > 0:
                # Consumption: reduce the available amount, never below zero.
                new_available = current_available - amount_decimal
                if new_available < 0:
                    new_available = Decimal("0")
            else:
                # Refund: increase the available amount.
                new_available = current_available + amount_decimal

            new_status = QuotaStatusEnum.QUOTA_ACTIVE.value
            if new_available <= 0:
                new_status = QuotaStatusEnum.QUOTA_EXHAUSTED.value

            upd = (
                sa_update(QuotaModel)
                .where(QuotaModel.id == existing.id)
                .values(
                    available_amount=new_available,
                    status=new_status,
                )
            )
            await auth.db.execute(upd)
            log.info(
                f"更新本地额度: employee_id={data.employee_id}, "
                f"institution_id={institution_id}, "
                f"available_amount={new_available}, status={new_status}"
            )
        else:
            # No record yet: create one. A refund seeds it with the refunded
            # amount; a consumption creates it with a zero balance.
            # NOTE(review): a zero-balance row is inserted as ACTIVE here,
            # while the update branch marks a zero balance EXHAUSTED.
            new_available = Decimal("0")
            if data.amount < 0:
                new_available = amount_decimal

            ins = insert(QuotaModel).values(
                employee_id=data.employee_id,
                institution_id=institution_id,
                out_biz_no=out_source_id,
                total_amount=new_available,
                available_amount=new_available,
                status=QuotaStatusEnum.QUOTA_ACTIVE.value,
                enterprise_id=data.enterprise_id,
                tenant_id=auth.user.tenant_id if auth.user else 1,
            )
            await auth.db.execute(ins)
            log.info(
                f"新建本地额度: employee_id={data.employee_id}, "
                f"institution_id={institution_id}, "
                f"available_amount={new_available}"
            )

        await auth.db.flush()
|