service.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. from app.api.v1.module_system.auth.schema import AuthSchema
  2. from app.core.exceptions import CustomException
  3. from app.core.logger import log
  4. from app.utils.snowflake import get_snowflake_id_str, get_snowflake_id
  5. from typing import Optional
  6. import time
  7. import json
  8. from .crud import OpenConfCRUD, OpenTransferCRUD
  9. from .schema import OpenConfOutSchema, OpenConfUpdateSchema, OpenTransferSchema, OpenTransferOutSchema, \
  10. OpenTransferQuerySchema
  11. from app.plugin.module_payment.account import AccountService, TransferCRUD, TransferOutSchema
  12. import aiohttp
  13. import asyncio
  14. async def fetch_manual_retry(session, url, notify_id, app_id, timestamp, content, max_retries=2):
  15. for attempt in range(max_retries):
  16. try:
  17. log.debug("第 {} 次尝试: {}", attempt + 1, url)
  18. form_data = aiohttp.FormData()
  19. form_data.add_field('notify_id', notify_id)
  20. form_data.add_field('app_id', app_id)
  21. form_data.add_field('timestamp', timestamp)
  22. form_data.add_field('content', content)
  23. async with session.post(url=url, data=form_data) as response:
  24. if response.status == 200:
  25. return await response.text()
  26. elif response.status in [500, 502, 503]:
  27. # 服务器错误,准备重试
  28. pass
  29. else:
  30. # 客户端错误,直接抛出,不再重试
  31. response.raise_for_status()
  32. except (aiohttp.ClientError, asyncio.TimeoutError) as e:
  33. if attempt < max_retries - 1:
  34. # 计算等待时间 (简单的指数退避: 1s, 2s, 4s...)
  35. wait_time = 2 ** attempt
  36. log.debug("等待 {} 秒后重试...", wait_time)
  37. await asyncio.sleep(wait_time)
  38. else:
  39. log.debug("达到最大重试次数,放弃。")
  40. return None
  41. return None
  42. class OpenTransferService:
  43. @classmethod
  44. async def open_return_service(
  45. cls,
  46. auth: AuthSchema,
  47. order_no: str,
  48. ) -> bool:
  49. """
  50. 发送转账回调通知
  51. 参数:
  52. - auth: 认证信息
  53. - order_no: 订单号
  54. 返回:
  55. - bool: 是否成功发送通知
  56. """
  57. try:
  58. transfer_crud = TransferCRUD(auth)
  59. transfer = await transfer_crud.get_by_order_no(order_no)
  60. if not transfer or not transfer.out_biz_no:
  61. log.info("回调通知: 订单不存在或缺少 out_biz_no, order_no={}", order_no)
  62. return False
  63. open_transfer_crud = OpenTransferCRUD(auth)
  64. open_data = await open_transfer_crud.get(out_biz_no=transfer.out_biz_no)
  65. if not open_data:
  66. log.info("回调通知: 开放转账记录不存在, out_biz_no={}", transfer.out_biz_no)
  67. return False
  68. auth.tenant_id = open_data.tenant_id
  69. conf = await OpenConfService.get_conf_service(auth)
  70. if not conf:
  71. log.info("回调通知: 开放转账配置不存在, tenant_id={}", auth.tenant_id)
  72. return False
  73. if not conf.return_url:
  74. log.info("回调通知: 未配置回调地址, tenant_id={}", auth.tenant_id)
  75. return False
  76. result = TransferOutSchema.model_validate(transfer).model_dump(exclude_none=True)
  77. result["third_biz_no"] = open_data.third_biz_no
  78. notify_id = f"n{get_snowflake_id()}"
  79. timestamp = int(time.time() * 1000)
  80. content = json.dumps(result)
  81. timeout = aiohttp.ClientTimeout(total=30)
  82. async with aiohttp.ClientSession(timeout=timeout) as session:
  83. log.info("回调通知: order_no={}, url={}, notify_id={}", order_no, conf.return_url, notify_id)
  84. await fetch_manual_retry(
  85. session, conf.return_url, notify_id, conf.app_id, timestamp, content
  86. )
  87. except Exception as e:
  88. log.error("回调通知异常: order_no={}, error={}", order_no, e, exc_info=True)
  89. return False
  90. @classmethod
  91. async def open_query_service(
  92. cls,
  93. auth: AuthSchema,
  94. query: OpenTransferQuerySchema
  95. ) -> dict:
  96. crud = OpenTransferCRUD(auth)
  97. transfer_data = await crud.get(third_biz_no=query.third_biz_no)
  98. if transfer_data is None:
  99. raise CustomException("三方订单号不存在")
  100. result = await AccountService.transfer_detail_service(auth=auth, out_biz_no=transfer_data.out_biz_no)
  101. result_dict = result.model_dump(exclude_none=True)
  102. result_dict.update({"third_biz_no": transfer_data.third_biz_no})
  103. return result_dict
  104. @classmethod
  105. async def open_transfer_service(
  106. cls,
  107. auth: AuthSchema,
  108. data: OpenTransferSchema
  109. ) -> OpenTransferOutSchema:
  110. third_biz_no = data.third_biz_no
  111. if not third_biz_no:
  112. raise CustomException("三方订单号不能为空")
  113. # 先查询是否存在三方订单号
  114. crud = OpenTransferCRUD(auth)
  115. existing = await crud.get(third_biz_no=third_biz_no)
  116. if existing:
  117. raise CustomException("三方订单号已存在")
  118. # 执行转账记录创建
  119. result = await AccountService.transfer_service(auth=auth, data=data)
  120. log.info(f"租户资金专户转账发起成功: 企业: {data.enterprise_id}, 金额: {data.amount}")
  121. # 保存三方订单号关联记录
  122. create_data = {
  123. "third_biz_no": third_biz_no,
  124. "out_biz_no": result.out_biz_no,
  125. }
  126. await crud.create(create_data)
  127. return OpenTransferOutSchema(
  128. status=result.status,
  129. order_no=result.order_no,
  130. third_biz_no=third_biz_no,
  131. )
  132. class OpenConfService:
  133. """开放配置服务层"""
  134. @classmethod
  135. async def get_conf_service(
  136. cls,
  137. auth: AuthSchema,
  138. ) -> Optional[OpenConfOutSchema]:
  139. """
  140. 查询开放配置
  141. """
  142. crud = OpenConfCRUD(auth)
  143. result = await crud.get_first()
  144. if result is not None:
  145. return OpenConfOutSchema.model_validate(result)
  146. return None
  147. @classmethod
  148. async def save_conf_service(
  149. cls,
  150. auth: AuthSchema,
  151. data: OpenConfUpdateSchema,
  152. ) -> OpenConfOutSchema:
  153. """
  154. 创建/更新开放配置(前端只允许配置回调地址)
  155. """
  156. # 先查询是否存在配置
  157. update_data = {
  158. "notify_url": data.notify_url,
  159. "return_url": data.return_url,
  160. }
  161. crud = OpenConfCRUD(auth)
  162. existing = await crud.get_first()
  163. if not existing:
  164. update_data["app_id"] = get_snowflake_id_str(auth.tenant_id)
  165. update_data["gateway_url"] = "https://api.qcsj88888.com"
  166. result = await crud.create(update_data)
  167. log.info(f"开放配置创建成功: 租户ID: {auth.tenant_id}")
  168. else:
  169. result = await crud.update(existing.id, update_data)
  170. log.info(f"开放配置更新成功: 租户ID: {auth.tenant_id}")
  171. return OpenConfOutSchema.model_validate(result)