service.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. from app.api.v1.module_system.auth.schema import AuthSchema
  2. from app.core.exceptions import CustomException
  3. from app.core.logger import log
  4. from app.utils.snowflake import get_snowflake_id_str, get_snowflake_id
  5. from typing import Optional
  6. import time
  7. from .crud import OpenConfCRUD, OpenTransferCRUD
  8. from .schema import OpenConfOutSchema, OpenConfUpdateSchema, OpenTransferSchema, OpenTransferOutSchema, \
  9. OpenTransferQuerySchema
  10. from app.plugin.module_payment.account import AccountService, TransferCRUD, TransferOutSchema
  11. import aiohttp
  12. import asyncio
  13. from ..apikey.service import TenantApiKeyService
  14. async def fetch_manual_retry(
  15. session,
  16. url: str,
  17. notify_id: str,
  18. timestamp: int,
  19. content: str,
  20. max_retries: int=2
  21. ):
  22. for attempt in range(max_retries):
  23. try:
  24. log.debug("第 {} 次尝试: {}", attempt + 1, url)
  25. form_data = aiohttp.FormData()
  26. form_data.add_field('notify_id', notify_id)
  27. form_data.add_field('timestamp', timestamp)
  28. form_data.add_field('content', content)
  29. async with session.post(url=url, data=form_data) as response:
  30. if response.status == 200:
  31. return await response.text()
  32. elif response.status in [500, 502, 503]:
  33. # 服务器错误,准备重试
  34. pass
  35. else:
  36. # 客户端错误,直接抛出,不再重试
  37. response.raise_for_status()
  38. except (aiohttp.ClientError, asyncio.TimeoutError) as e:
  39. if attempt < max_retries - 1:
  40. # 计算等待时间 (简单的指数退避: 1s, 2s, 4s...)
  41. wait_time = 2 ** attempt
  42. log.debug("等待 {} 秒后重试...", wait_time)
  43. await asyncio.sleep(wait_time)
  44. else:
  45. log.debug("达到最大重试次数,放弃。")
  46. return None
  47. return None
  48. class OpenTransferService:
  49. @classmethod
  50. async def open_return_service(
  51. cls,
  52. auth: AuthSchema,
  53. order_no: str,
  54. ) -> bool:
  55. """
  56. 发送转账回调通知
  57. 参数:
  58. - auth: 认证信息
  59. - order_no: 订单号
  60. 返回:
  61. - bool: 是否成功发送通知
  62. """
  63. try:
  64. transfer_crud = TransferCRUD(auth)
  65. transfer_data = await transfer_crud.get_by_order_no(order_no)
  66. if not transfer_data or not transfer_data.out_biz_no:
  67. log.info("开放回调通知: 订单不存在或缺少 out_biz_no, order_no={}", order_no)
  68. return False
  69. open_transfer_crud = OpenTransferCRUD(auth)
  70. open_data = await open_transfer_crud.get(out_biz_no=transfer_data.out_biz_no)
  71. if not open_data:
  72. log.info("开放回调通知: 开放转账记录不存在, out_biz_no={}", transfer_data.out_biz_no)
  73. return False
  74. auth.tenant_id = open_data.tenant_id
  75. auth.check_data_scope = True
  76. apikey_data = None
  77. # 先从apikey那return_url,否则使用conf
  78. if open_data.api_key:
  79. apikey_data = await TenantApiKeyService.get_apikey_service(auth=auth, api_key=open_data.api_key)
  80. log.info("开放回调通知: 从apikey获取回调地址, api_key={}, return_url={}", open_data.api_key, apikey_data.return_url)
  81. if apikey_data:
  82. return_url = apikey_data.return_url
  83. else:
  84. conf = await OpenConfService.get_conf_service(auth)
  85. if not conf:
  86. log.info("开放回调通知: 开放转账配置不存在, tenant_id={}", auth.tenant_id)
  87. return False
  88. return_url = conf.return_url
  89. if not return_url:
  90. log.info("开放回调通知: 回调地址不存在")
  91. return False
  92. result = TransferOutSchema.model_validate(transfer_data)
  93. result.third_biz_no = open_data.third_biz_no
  94. notify_id = f"n{get_snowflake_id()}"
  95. timestamp = int(time.time() * 1000)
  96. content = result.model_dump_json(exclude_none=True)
  97. timeout = aiohttp.ClientTimeout(total=30)
  98. async with aiohttp.ClientSession(timeout=timeout) as session:
  99. log.info("开放回调通知: 回调请求 third_biz_no={} order_no={}, url={}, notify_id={}",
  100. open_data.third_biz_no, order_no, return_url, notify_id)
  101. await fetch_manual_retry(
  102. session, return_url, notify_id, timestamp, content
  103. )
  104. return True
  105. except Exception as e:
  106. log.error("开放回调通知异常: order_no={}, error={}", order_no, e, exc_info=True)
  107. return False
  108. @classmethod
  109. async def open_query_service(
  110. cls,
  111. auth: AuthSchema,
  112. query: OpenTransferQuerySchema
  113. ) -> dict:
  114. crud = OpenTransferCRUD(auth)
  115. transfer_data = await crud.get(third_biz_no=query.third_biz_no)
  116. if transfer_data is None:
  117. raise CustomException("三方订单号不存在")
  118. result = await AccountService.transfer_detail_service(auth=auth, out_biz_no=transfer_data.out_biz_no)
  119. result.third_biz_no = transfer_data.third_biz_no
  120. return result.model_dump(exclude_none=True)
  121. @classmethod
  122. async def open_transfer_service(
  123. cls,
  124. auth: AuthSchema,
  125. data: OpenTransferSchema
  126. ) -> OpenTransferOutSchema:
  127. third_biz_no = data.third_biz_no
  128. if not third_biz_no:
  129. raise CustomException("三方订单号不能为空")
  130. # 先查询是否存在三方订单号
  131. crud = OpenTransferCRUD(auth)
  132. existing = await crud.get(third_biz_no=third_biz_no)
  133. if existing:
  134. raise CustomException("三方订单号已存在")
  135. # 执行转账记录创建
  136. result = await AccountService.transfer_service(auth=auth, data=data)
  137. log.info(f"租户资金专户转账发起成功: 企业: {data.enterprise_id}, 金额: {data.amount}")
  138. # 保存三方订单号关联记录
  139. create_data = {
  140. "third_biz_no": third_biz_no,
  141. "out_biz_no": result.out_biz_no,
  142. "api_key": data.api_key,
  143. }
  144. await crud.create(create_data)
  145. return OpenTransferOutSchema(
  146. status=result.status,
  147. order_no=result.order_no,
  148. third_biz_no=third_biz_no,
  149. )
  150. class OpenConfService:
  151. """开放配置服务层"""
  152. @classmethod
  153. async def get_conf_service(
  154. cls,
  155. auth: AuthSchema,
  156. ) -> Optional[OpenConfOutSchema]:
  157. """
  158. 查询开放配置
  159. """
  160. crud = OpenConfCRUD(auth)
  161. result = await crud.get(tenant_id=auth.tenant_id)
  162. if result is not None:
  163. return OpenConfOutSchema.model_validate(result)
  164. return None
  165. @classmethod
  166. async def save_conf_service(
  167. cls,
  168. auth: AuthSchema,
  169. data: OpenConfUpdateSchema,
  170. ) -> OpenConfOutSchema:
  171. """
  172. 创建/更新开放配置(前端只允许配置回调地址)
  173. """
  174. # 先查询是否存在配置
  175. update_data = {
  176. "notify_url": data.notify_url,
  177. "return_url": data.return_url,
  178. }
  179. crud = OpenConfCRUD(auth)
  180. existing = await crud.get_first()
  181. if not existing:
  182. update_data["app_id"] = get_snowflake_id_str(auth.tenant_id)
  183. update_data["gateway_url"] = "https://api.qcsj88888.com"
  184. result = await crud.create(update_data)
  185. log.info(f"开放配置创建成功: 租户ID: {auth.tenant_id}")
  186. else:
  187. result = await crud.update(existing.id, update_data)
  188. log.info(f"开放配置更新成功: 租户ID: {auth.tenant_id}")
  189. return OpenConfOutSchema.model_validate(result)