| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296 |
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.core.exceptions import CustomException
- from app.core.logger import log
- from app.utils.snowflake import get_snowflake_id_str, get_snowflake_id
- from typing import Optional
- import time
- from .crud import OpenConfCRUD, OpenTransferCRUD
- from .schema import OpenConfOutSchema, OpenConfUpdateSchema, OpenTransferSchema, OpenTransferOutSchema, \
- OpenTransferQuerySchema
- from app.plugin.module_payment.account import AccountService, TransferCRUD, TransferOutSchema
- import aiohttp
- import asyncio
- from ..apikey.service import TenantApiKeyService
async def fetch_manual_retry(
    session,
    url: str,
    notify_id: str,
    timestamp: int,
    content: str,
    max_retries: int = 2
):
    """POST a notification as form data, retrying transient failures.

    ``notify_id``, ``timestamp`` and ``content`` are sent as form fields to
    ``url``. An attempt counts as delivered only when the response status
    is 200.

    Retry policy:
        * A 4xx response is treated as permanent and ends the loop at once.
        * Any other non-200 status, ``aiohttp.ClientError`` or
          ``asyncio.TimeoutError`` is retried after an exponential backoff
          of ``2 ** attempt`` seconds (1s, 2s, 4s, ...).

    Args:
        session: Object exposing an aiohttp-style ``post(url=..., data=...)``
            async context manager (callers in this file pass an
            ``aiohttp.ClientSession``).
        url: Callback address to POST to.
        notify_id: Identifier of this notification.
        timestamp: Timestamp sent with the notification (callers in this
            file pass epoch milliseconds).
        content: Serialized notification payload.
        max_retries: Total number of attempts, including the first one.

    Returns:
        The response body text on a 200 response, otherwise ``None``.
        HTTP and network failures are never raised to the caller.
    """
    for attempt in range(max_retries):
        try:
            log.debug("第 {} 次尝试: {}", attempt + 1, url)
            form_data = aiohttp.FormData()
            form_data.add_field('notify_id', notify_id)
            # Form values are text on the wire; convert explicitly instead of
            # relying on the encoder to stringify the int.
            form_data.add_field('timestamp', str(timestamp))
            form_data.add_field('content', content)
            async with session.post(url=url, data=form_data) as response:
                status = response.status
                if status == 200:
                    return await response.text()
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            log.debug("请求异常: {}", e)
        else:
            if 400 <= status < 500:
                # Client errors will not succeed on retry. Previously
                # raise_for_status() raised ClientResponseError here, which
                # the handler above caught and retried anyway.
                log.debug("收到客户端错误状态码 {},不再重试", status)
                return None
            log.debug("收到非成功状态码 {},准备重试", status)

        # Shared backoff path for server errors and network failures alike.
        if attempt < max_retries - 1:
            wait_time = 2 ** attempt
            log.debug("等待 {} 秒后重试...", wait_time)
            await asyncio.sleep(wait_time)
        else:
            log.debug("达到最大重试次数,放弃。")
    return None
class OpenTransferService:
    """Service layer for open-API transfers and their callback notifications."""

    @classmethod
    async def open_return_service(
        cls,
        auth: AuthSchema,
        order_no: str,
    ) -> bool:
        """Send the transfer callback notification for an order.

        The callback address is taken from the API key stored on the open
        transfer record when that lookup yields a record; otherwise the
        tenant's open configuration is used.

        Side effect: ``auth.tenant_id`` and ``auth.check_data_scope`` are
        overwritten on the passed-in ``auth`` object.

        Args:
            auth: Authentication context.
            order_no: Order number of the transfer.

        Returns:
            ``False`` when a prerequisite is missing (order, open transfer
            record, configuration, callback address) or an exception occurs;
            ``True`` once the notification request has been attempted.
        """
        try:
            transfer_crud = TransferCRUD(auth)
            transfer_data = await transfer_crud.get_by_order_no(order_no)
            if not transfer_data or not transfer_data.out_biz_no:
                log.info("开放回调通知: 订单不存在或缺少 out_biz_no, order_no={}", order_no)
                return False

            open_transfer_crud = OpenTransferCRUD(auth)
            open_data = await open_transfer_crud.get(out_biz_no=transfer_data.out_biz_no)
            if not open_data:
                log.info("开放回调通知: 开放转账记录不存在, out_biz_no={}", transfer_data.out_biz_no)
                return False

            # Re-point the auth context at the tenant owning the record;
            # presumably so the lookups below are scoped to that tenant.
            auth.tenant_id = open_data.tenant_id
            auth.check_data_scope = True

            # Prefer the return_url of the API key, otherwise use the conf.
            apikey_data = None
            if open_data.api_key:
                apikey_data = await TenantApiKeyService.get_apikey_service(
                    auth=auth, api_key=open_data.api_key
                )
            if apikey_data:
                return_url = apikey_data.return_url
                # Logged only after the None check: dereferencing a missing
                # API key record used to raise and skip the conf fallback.
                log.info("开放回调通知: 从apikey获取回调地址, api_key={}, return_url={}",
                         open_data.api_key, return_url)
            else:
                conf = await OpenConfService.get_conf_service(auth)
                if not conf:
                    log.info("开放回调通知: 开放转账配置不存在, tenant_id={}", auth.tenant_id)
                    return False
                return_url = conf.return_url
            if not return_url:
                log.info("开放回调通知: 回调地址不存在")
                return False

            result = TransferOutSchema.model_validate(transfer_data)
            result.third_biz_no = open_data.third_biz_no

            notify_id = f"n{get_snowflake_id()}"
            timestamp = int(time.time() * 1000)
            content = result.model_dump_json(exclude_none=True)
            timeout = aiohttp.ClientTimeout(total=30)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                log.info("开放回调通知: 回调请求 third_biz_no={} order_no={}, url={}, notify_id={}",
                         open_data.third_biz_no, order_no, return_url, notify_id)
                # NOTE(review): the delivery result is ignored, so True is
                # returned even when every attempt failed - confirm whether
                # callers rely on that before changing it.
                await fetch_manual_retry(
                    session, return_url, notify_id, timestamp, content
                )
            return True
        except Exception as e:
            log.error("开放回调通知异常: order_no={}, error={}", order_no, e, exc_info=True)
            return False

    @classmethod
    async def open_query_service(
        cls,
        auth: AuthSchema,
        query: OpenTransferQuerySchema
    ) -> dict:
        """Return the transfer detail for a third-party order number.

        Args:
            auth: Authentication context.
            query: Query carrying ``third_biz_no``.

        Returns:
            The transfer detail dumped to a dict (``None`` fields excluded),
            with ``third_biz_no`` filled in from the open transfer record.

        Raises:
            CustomException: If no open transfer record matches
                ``query.third_biz_no``.
        """
        crud = OpenTransferCRUD(auth)
        transfer_data = await crud.get(third_biz_no=query.third_biz_no)
        if transfer_data is None:
            raise CustomException("三方订单号不存在")

        result = await AccountService.transfer_detail_service(auth=auth, out_biz_no=transfer_data.out_biz_no)
        result.third_biz_no = transfer_data.third_biz_no
        return result.model_dump(exclude_none=True)

    @classmethod
    async def open_transfer_service(
        cls,
        auth: AuthSchema,
        data: OpenTransferSchema
    ) -> OpenTransferOutSchema:
        """Start a transfer on behalf of a third-party caller.

        On success the ``third_biz_no`` -> ``out_biz_no`` link record is
        stored. When the transfer call raises ``CustomException`` the caller
        is notified on a best-effort basis and a ``FAIL`` result is returned
        instead of raising.

        Args:
            auth: Authentication context.
            data: Transfer request; ``third_biz_no`` is required.

        Returns:
            The transfer status, order number and third-party order number.

        Raises:
            CustomException: If ``third_biz_no`` is empty or already exists,
                or if saving the link record fails with that exception.
        """
        third_biz_no = data.third_biz_no
        if not third_biz_no:
            raise CustomException("三方订单号不能为空")

        # Reject a third-party order number that was already used.
        crud = OpenTransferCRUD(auth)
        existing = await crud.get(third_biz_no=third_biz_no)
        if existing:
            raise CustomException("三方订单号已存在")

        # Only the transfer call is guarded: an error raised while saving
        # the link record must not be reported as a failed transfer, because
        # the transfer has already gone through at that point.
        try:
            result = await AccountService.transfer_service(auth=auth, data=data)
        except CustomException as e:
            error_msg = e.args[0] if e.args else "转账失败"
            log.warning(f"租户资金专户转账失败: third_biz_no={third_biz_no}, error={error_msg}")
            # Best-effort notification; its failure must not mask the result.
            try:
                await cls._open_return_fail(auth, third_biz_no, error_msg, data)
            except Exception as notify_err:
                log.warning(f"发送失败通知异常: {notify_err}")
            return OpenTransferOutSchema(
                status="FAIL",
                order_no="",
                third_biz_no=third_biz_no,
            )

        log.info(f"租户资金专户转账发起成功: 企业: {data.enterprise_id}, 金额: {data.amount}")
        # Store the record linking the third-party order number.
        create_data = {
            "third_biz_no": third_biz_no,
            "out_biz_no": result.out_biz_no,
            "api_key": data.api_key,
        }
        await crud.create(create_data)
        return OpenTransferOutSchema(
            status=result.status,
            order_no=result.order_no,
            third_biz_no=third_biz_no,
        )

    @classmethod
    async def _open_return_fail(
        cls,
        auth: AuthSchema,
        third_biz_no: str,
        error_msg: str,
        data: OpenTransferSchema,
    ) -> None:
        """Notify the caller that a transfer failed.

        The callback address comes from the request's API key when
        available, otherwise from the tenant's open configuration. Nothing
        is sent when no address is configured.

        Args:
            auth: Authentication context.
            third_biz_no: Third-party order number of the failed transfer.
            error_msg: Failure reason forwarded to the caller.
            data: Original transfer request (``api_key`` and ``amount`` are
                read).
        """
        # json is not among the imports visible at the top of this file.
        import json

        return_url = None
        if data.api_key:
            try:
                apikey_data = await TenantApiKeyService.get_apikey_service(auth=auth, api_key=data.api_key)
                return_url = apikey_data.return_url if apikey_data else None
            except Exception as lookup_err:
                # Still best-effort (fall through to the conf), but leave a
                # trace instead of swallowing the error silently.
                log.warning(f"转账失败通知: 获取apikey回调地址失败: {lookup_err}")
        if not return_url:
            conf = await OpenConfService.get_conf_service(auth)
            if conf:
                return_url = conf.return_url
        if not return_url:
            log.warning("转账失败通知: 未配置回调地址")
            return

        notify_id = f"n{get_snowflake_id()}"
        timestamp = int(time.time() * 1000)
        content = json.dumps({
            "status": "FAIL",
            "third_biz_no": third_biz_no,
            "amount": str(data.amount),
            "error_msg": error_msg,
        }, ensure_ascii=False)
        timeout = aiohttp.ClientTimeout(total=30)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            log.info(f"转账失败回调: third_biz_no={third_biz_no}, url={return_url}")
            await fetch_manual_retry(session, return_url, notify_id, timestamp, content)
class OpenConfService:
    """Service layer for the open configuration."""

    @classmethod
    async def get_conf_service(
        cls,
        auth: AuthSchema,
    ) -> Optional[OpenConfOutSchema]:
        """Look up the open configuration for ``auth.tenant_id``.

        Returns:
            The validated configuration, or ``None`` when no record exists.
        """
        row = await OpenConfCRUD(auth).get(tenant_id=auth.tenant_id)
        return None if row is None else OpenConfOutSchema.model_validate(row)

    @classmethod
    async def save_conf_service(
        cls,
        auth: AuthSchema,
        data: OpenConfUpdateSchema,
    ) -> OpenConfOutSchema:
        """Create or update the open configuration.

        Only ``notify_url`` and ``return_url`` are taken from ``data``. When
        no configuration exists yet, ``app_id`` and ``gateway_url`` are
        filled in as well before the record is created.

        Returns:
            The validated configuration as stored.
        """
        payload = {
            "notify_url": data.notify_url,
            "return_url": data.return_url,
        }
        repo = OpenConfCRUD(auth)
        current = await repo.get_first()

        if current:
            # A record already exists: only the callback addresses change.
            saved = await repo.update(current.id, payload)
            log.info(f"开放配置更新成功: 租户ID: {auth.tenant_id}")
            return OpenConfOutSchema.model_validate(saved)

        # First save: also set the generated app id and the gateway address.
        payload["app_id"] = get_snowflake_id_str(auth.tenant_id)
        payload["gateway_url"] = "https://api.qcsj88888.com"
        saved = await repo.create(payload)
        log.info(f"开放配置创建成功: 租户ID: {auth.tenant_id}")
        return OpenConfOutSchema.model_validate(saved)
|