from app.api.v1.module_system.auth.schema import AuthSchema from app.core.exceptions import CustomException from app.core.logger import log from app.utils.snowflake import get_snowflake_id_str, get_snowflake_id from typing import Optional import time import json from .crud import OpenConfCRUD, OpenTransferCRUD from .schema import OpenConfOutSchema, OpenConfUpdateSchema, OpenTransferSchema, OpenTransferOutSchema, \ OpenTransferQuerySchema from app.plugin.module_payment.account import AccountService, TransferCRUD, TransferOutSchema import aiohttp import asyncio async def fetch_manual_retry(session, url, notify_id, app_id, timestamp, content, max_retries=2): for attempt in range(max_retries): try: log.debug("第 {} 次尝试: {}", attempt + 1, url) form_data = aiohttp.FormData() form_data.add_field('notify_id', notify_id) form_data.add_field('app_id', app_id) form_data.add_field('timestamp', timestamp) form_data.add_field('content', content) async with session.post(url=url, data=form_data) as response: if response.status == 200: return await response.text() elif response.status in [500, 502, 503]: # 服务器错误,准备重试 pass else: # 客户端错误,直接抛出,不再重试 response.raise_for_status() except (aiohttp.ClientError, asyncio.TimeoutError) as e: if attempt < max_retries - 1: # 计算等待时间 (简单的指数退避: 1s, 2s, 4s...) wait_time = 2 ** attempt log.debug("等待 {} 秒后重试...", wait_time) await asyncio.sleep(wait_time) else: log.debug("达到最大重试次数,放弃。") return None return None class OpenTransferService: @classmethod async def open_return_service( cls, auth: AuthSchema, order_no: str, ) -> bool: """ 发送转账回调通知 参数: - auth: 认证信息 - order_no: 订单号 返回: - bool: 是否成功发送通知 """ try: transfer_crud = TransferCRUD(auth) transfer = await transfer_crud.get_by_order_no(order_no) if not transfer or not transfer.out_biz_no: log.info("回调通知: 订单不存在或缺少 out_biz_no, order_no={}", order_no) return False open_transfer_crud = OpenTransferCRUD(auth) open_data = await open_transfer_crud.get(out_biz_no=transfer.out_biz_no) if not open_data: log.info("回调通知: 开放转账记录不存在, out_biz_no={}", transfer.out_biz_no) return False auth.tenant_id = open_data.tenant_id conf = await OpenConfService.get_conf_service(auth) if not conf: log.info("回调通知: 开放转账配置不存在, tenant_id={}", auth.tenant_id) return False if not conf.return_url: log.info("回调通知: 未配置回调地址, tenant_id={}", auth.tenant_id) return False result = TransferOutSchema.model_validate(transfer).model_dump(exclude_none=True) result["third_biz_no"] = open_data.third_biz_no notify_id = f"n{get_snowflake_id()}" timestamp = int(time.time() * 1000) content = json.dumps(result) timeout = aiohttp.ClientTimeout(total=30) async with aiohttp.ClientSession(timeout=timeout) as session: log.info("回调通知: order_no={}, url={}, notify_id={}", order_no, conf.return_url, notify_id) await fetch_manual_retry( session, conf.return_url, notify_id, conf.app_id, timestamp, content ) except Exception as e: log.error("回调通知异常: order_no={}, error={}", order_no, e, exc_info=True) return False @classmethod async def open_query_service( cls, auth: AuthSchema, query: OpenTransferQuerySchema ) -> dict: crud = OpenTransferCRUD(auth) transfer_data = await crud.get(third_biz_no=query.third_biz_no) if transfer_data is None: raise CustomException("三方订单号不存在") result = await AccountService.transfer_detail_service(auth=auth, out_biz_no=transfer_data.out_biz_no) result_dict = result.model_dump(exclude_none=True) result_dict.update({"third_biz_no": transfer_data.third_biz_no}) return result_dict @classmethod async def open_transfer_service( cls, auth: AuthSchema, data: OpenTransferSchema ) -> OpenTransferOutSchema: third_biz_no = data.third_biz_no if not third_biz_no: raise CustomException("三方订单号不能为空") # 先查询是否存在三方订单号 crud = OpenTransferCRUD(auth) existing = await crud.get(third_biz_no=third_biz_no) if existing: raise CustomException("三方订单号已存在") # 执行转账记录创建 result = await AccountService.transfer_service(auth=auth, data=data) log.info(f"租户资金专户转账发起成功: 企业: {data.enterprise_id}, 金额: {data.amount}") # 保存三方订单号关联记录 create_data = { "third_biz_no": third_biz_no, "out_biz_no": result.out_biz_no, } await crud.create(create_data) return OpenTransferOutSchema( status=result.status, order_no=result.order_no, third_biz_no=third_biz_no, ) class OpenConfService: """开放配置服务层""" @classmethod async def get_conf_service( cls, auth: AuthSchema, ) -> Optional[OpenConfOutSchema]: """ 查询开放配置 """ crud = OpenConfCRUD(auth) result = await crud.get_first() if result is not None: return OpenConfOutSchema.model_validate(result) return None @classmethod async def save_conf_service( cls, auth: AuthSchema, data: OpenConfUpdateSchema, ) -> OpenConfOutSchema: """ 创建/更新开放配置(前端只允许配置回调地址) """ # 先查询是否存在配置 update_data = { "notify_url": data.notify_url, "return_url": data.return_url, } crud = OpenConfCRUD(auth) existing = await crud.get_first() if not existing: update_data["app_id"] = get_snowflake_id_str(auth.tenant_id) update_data["gateway_url"] = "https://api.qcsj88888.com" result = await crud.create(update_data) log.info(f"开放配置创建成功: 租户ID: {auth.tenant_id}") else: result = await crud.update(existing.id, update_data) log.info(f"开放配置更新成功: 租户ID: {auth.tenant_id}") return OpenConfOutSchema.model_validate(result)