# outsource_service.py (9.1 KB)
import asyncio
from decimal import ROUND_HALF_UP, Decimal

from sqlalchemy import select, insert, update as sa_update

from app.api.v1.module_system.auth.schema import AuthSchema
from app.core.alipay import AlipayClient
from app.core.exceptions import CustomException
from app.core.logger import log
from app.utils.snowflake import get_snowflake_id

from .model import QuotaModel
from .enums import QuotaStatusEnum
from .outsource_schema import OutsourceNotifySchema, OutsourceNotifyOutSchema


  12. class OutsourceNotifyService:
  13. """外部消费额度同步服务
  14. 调用 alipay.ebpp.invoice.expensecomsue.outsource.notify 将外部消费同步给支付宝,
  15. 同时写入本地 pay_expense_quota 记录真实额度变动。
  16. """
  17. _alipay_request_cls = None
  18. _alipay_model_cls = None
  19. _alipay_response_cls = None
  20. @classmethod
  21. def _ensure_imports(cls):
  22. """延迟导入支付宝SDK"""
  23. if cls._alipay_request_cls is not None:
  24. return
  25. try:
  26. from alipay.aop.api.request.AlipayEbppInvoiceExpensecomsueOutsourceNotifyRequest import (
  27. AlipayEbppInvoiceExpensecomsueOutsourceNotifyRequest,
  28. )
  29. from alipay.aop.api.domain.AlipayEbppInvoiceExpensecomsueOutsourceNotifyModel import (
  30. AlipayEbppInvoiceExpensecomsueOutsourceNotifyModel,
  31. )
  32. from alipay.aop.api.response.AlipayEbppInvoiceExpensecomsueOutsourceNotifyResponse import (
  33. AlipayEbppInvoiceExpensecomsueOutsourceNotifyResponse,
  34. )
  35. cls._alipay_request_cls = AlipayEbppInvoiceExpensecomsueOutsourceNotifyRequest
  36. cls._alipay_model_cls = AlipayEbppInvoiceExpensecomsueOutsourceNotifyModel
  37. cls._alipay_response_cls = AlipayEbppInvoiceExpensecomsueOutsourceNotifyResponse
  38. except ImportError:
  39. raise CustomException(msg="支付宝SDK未正确安装(alipay-ebpp-invoice-expensecomsue-outsource-notify)")
  40. @classmethod
  41. def _execute_alipay(cls, request):
  42. """同步执行支付宝调用"""
  43. client = AlipayClient.get_client()
  44. return client.execute(request)
  45. @classmethod
  46. def _build_alipay_model(cls, data: OutsourceNotifySchema) -> object:
  47. """构建Alipay SDK Model
  48. 金额转换规则:
  49. - schema.amount 单位为元(正数为消费/扣款,负数为退款)
  50. - Alipay API 单位为分(正整数),is_off_set 控制方向
  51. """
  52. cls._ensure_imports()
  53. model = cls._alipay_model_cls()
  54. model.enterprise_id = data.enterprise_id
  55. model.employee_id = data.employee_id
  56. model.standard_id = data.standard_id
  57. # 元→分,取绝对值(正值),is_off_set 控制方向
  58. amount_cents = int(abs(data.amount) * 100)
  59. model.amount = str(amount_cents)
  60. # 方向:0=扣款 1=退款
  61. model.is_off_set = 1 if data.amount < 0 else 0
  62. model.out_source_id = data.out_source_id
  63. model.deal_time = data.deal_time
  64. if data.account_id:
  65. model.account_id = data.account_id
  66. if data.agreement_no:
  67. model.agreement_no = data.agreement_no
  68. if data.employee_id_type:
  69. model.employee_id_type = data.employee_id_type
  70. if data.employee_open_id:
  71. model.employee_open_id = data.employee_open_id
  72. if data.extend:
  73. model.extend = data.extend
  74. if data.platform:
  75. model.platform = data.platform
  76. if data.relate_no:
  77. model.relate_no = data.relate_no
  78. return model
  79. @classmethod
  80. async def notify_service(
  81. cls, auth: AuthSchema, data: OutsourceNotifySchema
  82. ) -> OutsourceNotifyOutSchema:
  83. """调用支付宝外部消费额度同步,并写入本地额度记录
  84. Args:
  85. auth: 认证信息
  86. data: 同步请求数据
  87. Returns:
  88. 同步结果
  89. """
  90. out_source_id = data.out_source_id or str(get_snowflake_id())
  91. # 第1步:调用支付宝
  92. try:
  93. cls._ensure_imports()
  94. model = cls._build_alipay_model(data)
  95. request = cls._alipay_request_cls()
  96. request.biz_model = model
  97. response = await asyncio.to_thread(cls._execute_alipay, request)
  98. if not response:
  99. raise CustomException(msg="外部消费额度同步失败: 无响应")
  100. result = cls._alipay_response_cls()
  101. result.parse_response_content(response)
  102. if not result.is_success():
  103. log.error(f"外部消费额度同步失败: {result.code} - {result.msg} ({result.sub_code}: {result.sub_msg})")
  104. # 不抛异常,返回失败信息
  105. return OutsourceNotifyOutSchema(
  106. success=False,
  107. out_source_id=out_source_id,
  108. message=f"{result.msg}: {result.sub_msg}" if result.sub_msg else result.msg,
  109. )
  110. log.info(
  111. f"外部消费额度同步成功: out_source_id={out_source_id}, "
  112. f"employee_id={data.employee_id}, amount={data.amount}"
  113. )
  114. except CustomException:
  115. raise
  116. except Exception as e:
  117. log.error(f"外部消费额度同步异常: {e}")
  118. return OutsourceNotifyOutSchema(
  119. success=False,
  120. out_source_id=out_source_id,
  121. message=str(e),
  122. )
  123. # 第2步:同步写入本地额度记录
  124. try:
  125. await cls._sync_local_quota(auth, data, out_source_id)
  126. except Exception as e:
  127. log.warning(f"本地额度记录同步失败(不影响支付宝侧): {e}")
  128. return OutsourceNotifyOutSchema(
  129. success=True,
  130. out_source_id=out_source_id,
  131. message="success",
  132. )
  133. @classmethod
  134. async def _sync_local_quota(
  135. cls, auth: AuthSchema, data: OutsourceNotifySchema, out_source_id: str
  136. ):
  137. """同步本地额度记录(pay_expense_quota)
  138. 根据支付宝外部消费同步的结果,更新或创建本地额度记录。
  139. - 消费:减少可用额度
  140. - 退款:增加可用额度
  141. 幂等:检查 out_biz_no 是否已存在,已存在则跳过。
  142. """
  143. # 幂等检查:out_source_id 已存在则直接跳过
  144. existing_by_out = await auth.db.execute(
  145. select(QuotaModel).where(QuotaModel.out_biz_no == out_source_id)
  146. )
  147. if existing_by_out.scalar_one_or_none():
  148. log.info(f"外部消费额度同步已处理,跳过本地写入: out_source_id={out_source_id}")
  149. return
  150. institution_id = data.institution_id or ""
  151. # 查询该员工在该制度下是否已有额度记录
  152. stmt = select(QuotaModel).where(
  153. QuotaModel.employee_id == data.employee_id,
  154. QuotaModel.institution_id == institution_id,
  155. )
  156. result = await auth.db.execute(stmt)
  157. existing = result.scalar_one_or_none()
  158. amount_decimal = Decimal(str(abs(data.amount)))
  159. if existing:
  160. # 更新现有额度
  161. if data.amount > 0:
  162. # 消费:减少可用额度
  163. new_available = (existing.available_amount or Decimal("0")) - amount_decimal
  164. if new_available < 0:
  165. new_available = Decimal("0")
  166. else:
  167. # 退款:增加可用额度
  168. new_available = (existing.available_amount or Decimal("0")) + amount_decimal
  169. new_status = QuotaStatusEnum.QUOTA_ACTIVE.value
  170. if new_available <= 0:
  171. new_status = QuotaStatusEnum.QUOTA_EXHAUSTED.value
  172. upd = (
  173. sa_update(QuotaModel)
  174. .where(QuotaModel.id == existing.id)
  175. .values(
  176. available_amount=new_available,
  177. status=new_status,
  178. )
  179. )
  180. await auth.db.execute(upd)
  181. log.info(
  182. f"更新本地额度: employee_id={data.employee_id}, "
  183. f"institution_id={institution_id}, "
  184. f"available_amount={new_available}, status={new_status}"
  185. )
  186. else:
  187. # 新建额度记录(仅记录可用额度,无额度ID)
  188. new_available = Decimal("0")
  189. if data.amount < 0:
  190. # 退款时创建正向记录
  191. new_available = amount_decimal
  192. ins = insert(QuotaModel).values(
  193. employee_id=data.employee_id,
  194. institution_id=institution_id,
  195. out_biz_no=out_source_id,
  196. total_amount=new_available,
  197. available_amount=new_available,
  198. status=QuotaStatusEnum.QUOTA_ACTIVE.value,
  199. enterprise_id=data.enterprise_id,
  200. tenant_id=auth.user.tenant_id if auth.user else 1,
  201. )
  202. await auth.db.execute(ins)
  203. log.info(
  204. f"新建本地额度: employee_id={data.employee_id}, "
  205. f"institution_id={institution_id}, "
  206. f"available_amount={new_available}"
  207. )
  208. await auth.db.flush()