ソースを参照

fix(account): sync-all 增加 consume.detail.query 回退方案查询转账状态

alphah 2 週間 前
コミット
dee256cd3a
1 ファイル変更125 行追加36 行削除
  1. 125 36
      backend/app/plugin/module_payment/account/service.py

+ 125 - 36
backend/app/plugin/module_payment/account/service.py

@@ -889,10 +889,10 @@ class AccountService:
             try:
                 result = await cls._sync_transfer_detail(auth, out_biz_no, eid)
                 if result is False:
-                    # fund.trans.common.query 无权限
+                    # 两个方案都失败了(无权限),停止全量同步
                     _has_permission = False
                     break
-                if result:
+                if isinstance(result, str):
                     synced += 1
                     details.append({"out_biz_no": out_biz_no, "old_status": transfer.status, "new_status": result})
                 else:
@@ -903,14 +903,14 @@ class AccountService:
                 log.warning(f"全量同步 - 查询失败: out_biz_no={out_biz_no}, err={e}")
 
         if not _has_permission:
-            # 无权限时降级为列出 DEALING 记录
             dealing = [t for t in all_transfers if t.status == TransferStatusEnum.DEALING.value]
             return {
                 "total": len(all_transfers),
-                "synced": 0,
+                "synced": synced,
+                "no_permission": True,
                 "dealing_count": len(dealing),
-                "details": [{"out_biz_no": t.out_biz_no, "status": "DEALING"} for t in dealing],
-                "note": "fund.trans.common.query 权限,请在支付宝开放平台开通接口权限后重试,或逐个使用 sync-status 手动补录",
+                "details": [{"out_biz_no": t.out_biz_no, "status": t.status} for t in dealing],
+                "note": "无法通过支付宝 API 查询转账状态,请在开放平台开通 alipay.fund.trans.common.query 权限,或逐个使用 sync-status 手动补录",
             }
 
         if synced > 0:
@@ -930,11 +930,22 @@ class AccountService:
         out_biz_no: str,
         enterprise_id: str,
     ) -> str | bool | None:
-        """调用 fund.trans.common.query 查询单笔转账详情并更新本地记录,返回新状态/False(无权限)/None(其他失败)"""
-        from sqlalchemy import update as sa_update
+        """查询单笔转账详情并更新本地记录
+        优先调 fund.trans.common.query,无权限时改用 consume.detail.query(用 fund_order_id 查)
+        返回: 新状态str / False(无权限) / None(失败/无变化)
+        """
+        from sqlalchemy import select, update as sa_update
         from app.plugin.module_payment.account.model import TransferModel
 
         from app.core.alipay import AlipayClient
+
+        # 先查本地记录,获取 fund_order_id
+        tf_stmt = select(TransferModel).where(TransferModel.out_biz_no == out_biz_no)
+        tf_result = await auth.db.execute(tf_stmt)
+        local_transfer = tf_result.scalar_one_or_none()
+        fund_order_id = local_transfer.fund_order_id if local_transfer else None
+
+        # — 方案A: fund.trans.common.query —
         try:
             from alipay.aop.api.request.AlipayFundTransCommonQueryRequest import (
                 AlipayFundTransCommonQueryRequest,
@@ -945,37 +956,121 @@ class AccountService:
             from alipay.aop.api.response.AlipayFundTransCommonQueryResponse import (
                 AlipayFundTransCommonQueryResponse,
             )
+
+            model = AlipayFundTransCommonQueryModel()
+            model.out_biz_no = out_biz_no
+            model.product_code = "TRANS_ACCOUNT_NO_PWD"
+            model.biz_scene = "DIRECT_TRANSFER"
+
+            request = AlipayFundTransCommonQueryRequest()
+            request.biz_model = model
+
+            client = AlipayClient.get_client()
+            response = client.execute(request)
+
+            if response:
+                result = AlipayFundTransCommonQueryResponse()
+                result.parse_response_content(response)
+
+                if result.is_success():
+                    alipay_status = getattr(result, 'status', None)
+                    if alipay_status and alipay_status != "DEALING":
+                        return await cls._apply_transfer_update(auth, out_biz_no, result, alipay_status)
+                    return None
+
+                sub_msg = getattr(result, 'sub_msg', '') or ''
+                if '权限' not in sub_msg and 'NO_PERMISSION' not in sub_msg:
+                    return None
+                # 权限不足,继续方案B
         except ImportError:
-            return None
+            pass
 
-        model = AlipayFundTransCommonQueryModel()
-        model.out_biz_no = out_biz_no
-        model.product_code = "TRANS_ACCOUNT_NO_PWD"
-        model.biz_scene = "DIRECT_TRANSFER"
+        # — 方案B: consume.detail.query(用 fund_order_id 当 pay_no 查) —
+        if not fund_order_id:
+            log.warning(f"无 fund_order_id 可用于查询: out_biz_no={out_biz_no}")
+            return False
 
-        request = AlipayFundTransCommonQueryRequest()
-        request.biz_model = model
+        try:
+            from alipay.aop.api.request.AlipayCommerceEcConsumeDetailQueryRequest import (
+                AlipayCommerceEcConsumeDetailQueryRequest,
+            )
+            from alipay.aop.api.domain.AlipayCommerceEcConsumeDetailQueryModel import (
+                AlipayCommerceEcConsumeDetailQueryModel,
+            )
+            from alipay.aop.api.response.AlipayCommerceEcConsumeDetailQueryResponse import (
+                AlipayCommerceEcConsumeDetailQueryResponse,
+            )
 
-        client = AlipayClient.get_client()
-        response = client.execute(request)
+            model = AlipayCommerceEcConsumeDetailQueryModel()
+            model.pay_no = fund_order_id
+            model.enterprise_id = enterprise_id
 
-        if not response:
-            return None
+            request = AlipayCommerceEcConsumeDetailQueryRequest()
+            request.biz_model = model
 
-        result = AlipayFundTransCommonQueryResponse()
-        result.parse_response_content(response)
+            client = AlipayClient.get_client()
+            response = client.execute(request)
 
-        if not result.is_success():
-            sub_code = getattr(result, 'sub_code', '') or ''
-            sub_msg = getattr(result, 'sub_msg', '') or ''
-            if '权限' in sub_msg or 'NO_PERMISSION' in sub_code:
-                log.warning(f"fund.trans.common.query 无权限: out_biz_no={out_biz_no}")
+            if not response:
                 return False
-            return None
 
-        alipay_status = getattr(result, 'status', None)
-        if not alipay_status:
-            return None
+            result = AlipayCommerceEcConsumeDetailQueryResponse()
+            result.parse_response_content(response)
+
+            if not result.is_success():
+                sub_msg = getattr(result, 'sub_msg', '') or ''
+                log.warning(f"consume.detail.query 查询失败: out_biz_no={out_biz_no}, err={sub_msg}")
+                return False
+
+            consume_info = getattr(result, 'consume_info', None)
+            if not consume_info:
+                return False
+
+            # 从消费详情中提取转账状态
+            consume_type = getattr(consume_info, 'consume_type', '')
+            if consume_type != "TRANSFER":
+                return None
+
+            # consume.detail.query 返回的 consume_info 没有直接的状态字段
+            # 但如果有 notify_reason 可以判断
+            notify_reason = getattr(consume_info, 'notify_reason', '')
+            if 'SUCCESS' in notify_reason.upper():
+                new_status = "SUCCESS"
+            elif 'FAIL' in notify_reason.upper():
+                new_status = "FAIL"
+            else:
+                return None
+
+            update_data = {"status": new_status}
+            pay_no = getattr(consume_info, 'pay_no', None)
+            if pay_no:
+                update_data["order_no"] = pay_no
+
+            upd = sa_update(TransferModel).where(
+                TransferModel.out_biz_no == out_biz_no
+            ).values(**update_data)
+            await auth.db.execute(upd)
+            log.info(f"转账同步(consume详情) - out_biz_no={out_biz_no}, status={new_status}")
+            return new_status
+
+        except ImportError:
+            log.warning("consume.detail.query 不可用")
+            return False
+        except Exception as e:
+            log.warning(f"consume.detail.query 异常: out_biz_no={out_biz_no}, err={e}")
+            return False
+
+    @classmethod
+    async def _apply_transfer_update(
+        cls,
+        auth: AuthSchema,
+        out_biz_no: str,
+        result: object,
+        alipay_status: str,
+    ) -> str | None:
+        """根据 fund.trans.common.query 结果更新本地记录"""
+        from sqlalchemy import update as sa_update
+        from app.plugin.module_payment.account.model import TransferModel
 
         update_data = {"status": alipay_status}
         order_no = getattr(result, 'order_id', None)
@@ -983,7 +1078,6 @@ class AccountService:
         trans_amount = getattr(result, 'trans_amount', None)
         error_code = getattr(result, 'error_code', None)
         fail_reason = getattr(result, 'fail_reason', None)
-        pay_date = getattr(result, 'pay_date', None)
 
         if order_no:
             update_data["order_no"] = order_no
@@ -995,11 +1089,6 @@ class AccountService:
             update_data["error_code"] = error_code
         if fail_reason:
             update_data["error_msg"] = fail_reason
-        if pay_date:
-            try:
-                update_data["ext_info"] = {"pay_date": pay_date}
-            except Exception:
-                pass
 
         if update_data.get("status") != "DEALING":
             upd = sa_update(TransferModel).where(