Explorar o código

fix(account): sync-all 改为直接查本地 out_biz_no 调 fund.trans.common.query

alphah hai 2 semanas
pai
achega
f1213a59d5
Modificáronse 1 ficheiros con 40 adicións e 185 borrados
  1. 40 185
      backend/app/plugin/module_payment/account/service.py

+ 40 - 185
backend/app/plugin/module_payment/account/service.py

@@ -860,199 +860,52 @@ class AccountService:
     async def transfer_sync_all_service(
         cls,
         auth: AuthSchema,
-        start_date: str | None = None,
-        end_date: str | None = None,
     ) -> dict:
         """
-        全量拉取所有企业的消费/转账记录,同步到 pay_bill / pay_transfer
-        使用 consume.detail.batchquery 分批拉取 + fund.trans.common.query 补全转账详情
+        全量同步转账状态
+        查出本地所有 pay_transfer.out_biz_no,逐个调用 fund.trans.common.query 查询支付宝状态并更新
         """
-        from sqlalchemy import select, update as sa_update, insert as sa_insert
+        from sqlalchemy import select, update as sa_update
         from app.plugin.module_payment.account.model import TransferModel
         from app.plugin.module_payment.account.enums import TransferStatusEnum
-        from app.plugin.module_payment.notification.model import PayBillModel
-        from datetime import datetime, timedelta
 
-        now = datetime.now()
-        if not end_date:
-            end_date = now.strftime("%Y-%m-%d 23:59:59")
-        if not start_date:
-            start_date = (now - timedelta(days=90)).strftime("%Y-%m-%d 00:00:00")
+        # 直接查所有有 out_biz_no 的转账记录(绕过权限过滤)
+        stmt = select(TransferModel).where(
+            TransferModel.out_biz_no.isnot(None),
+        ).order_by(TransferModel.id.asc())
+        result = await auth.db.execute(stmt)
+        all_transfers = result.scalars().all()
 
-        # 检查 jointaccountbill.detail.batchquery 是否可用
-        _can_batch = False
-        try:
-            from alipay.aop.api.request.AlipayCommerceEcJointaccountbillDetailBatchqueryRequest import (
-                AlipayCommerceEcJointaccountbillDetailBatchqueryRequest,
-            )
-            from alipay.aop.api.domain.AlipayCommerceEcJointaccountbillDetailBatchqueryModel import (
-                AlipayCommerceEcJointaccountbillDetailBatchqueryModel,
-            )
-            from alipay.aop.api.response.AlipayCommerceEcJointaccountbillDetailBatchqueryResponse import (
-                AlipayCommerceEcJointaccountbillDetailBatchqueryResponse,
-            )
-            _can_batch = True
-        except ImportError:
-            log.warning("SDK 不支持 jointaccountbill.detail.batchquery,降级为仅同步 DEALING 转账")
-
-        from app.core.alipay import AlipayClient
-
-        # 获取所有企业(绕过权限过滤)
-        from app.plugin.module_payment.enterprise.model import EnterpriseModel
-        ent_stmt = select(EnterpriseModel).where(EnterpriseModel.enterprise_id.isnot(None))
-        ent_result = await auth.db.execute(ent_stmt)
-        enterprises = ent_result.scalars().all()
-
-        if not _can_batch:
-            # 降级模式:直接查 DEALING 转账
-            stmt = select(TransferModel).where(
-                TransferModel.status == TransferStatusEnum.DEALING.value,
-                TransferModel.out_biz_no.isnot(None),
-            )
-            result = await auth.db.execute(stmt)
-            dealing = result.scalars().all()
-            return {
-                "total": len(dealing),
-                "synced": 0,
-                "bill_synced": 0,
-                "transfer_synced": 0,
-                "details": [{"out_biz_no": t.out_biz_no, "status": "DEALING", "hint": "手动同步"} for t in dealing],
-                "note": "SDK不支持 jointaccountbill 批量查询,请逐一使用 sync-status 同步",
-            }
-
-        client = AlipayClient.get_client()
-
-        total_records = 0
-        bill_upserted = 0
-        transfer_upserted = 0
-        transfer_detail_synced = 0
+        synced = 0
         errors = 0
         details = []
 
-        for enterprise in enterprises:
-            eid = enterprise.enterprise_id
-            page_num = 1
-            page_size = 100
-
-            while True:
-                try:
-                    batch_model = AlipayCommerceEcJointaccountbillDetailBatchqueryModel()
-                    batch_model.enterprise_id = eid
-                    batch_model.biz_scene = "ISV_DEFAULT"
-                    batch_model.start_date = start_date
-                    batch_model.end_date = end_date
-                    batch_model.page_num = page_num
-                    batch_model.page_size = page_size
-
-                    request = AlipayCommerceEcJointaccountbillDetailBatchqueryRequest()
-                    request.biz_model = batch_model
-
-                    response = client.execute(request)
-                    if not response:
-                        break
-
-                    batch_result = AlipayCommerceEcJointaccountbillDetailBatchqueryResponse()
-                    batch_result.parse_response_content(response)
-                    if not batch_result.is_success():
-                        errors += 1
-                        err_code = getattr(batch_result, 'sub_code', None) or getattr(batch_result, 'code', '')
-                        err_msg = getattr(batch_result, 'sub_msg', None) or getattr(batch_result, 'msg', '')
-                        log.warning(f"全量同步 - 企业 {eid} 第 {page_num} 页查询失败: [{err_code}] {err_msg}")
-                        break
-
-                    bill_list = getattr(batch_result, 'bill_list', None)
-                    if not bill_list:
-                        break
-
-                    for c in bill_list:
-                        total_records += 1
-                        bill_no = getattr(c, 'bill_no', None)
-                        biz_out_no = getattr(c, 'biz_out_no', None)  # 外部业务号 = out_biz_no(转账时存在)
-                        if not bill_no:
-                            continue
-
-                        # — 1) 落库 pay_bill(有则更新,无则新增) —
-                        bill_data = {
-                            "pay_no": bill_no,
-                            "enterprise_id": getattr(c, 'enterprise_id', eid),
-                            "account_id": "",
-                            "employee_id": "",
-                            "consume_type": "TRANSFER" if biz_out_no else "CONSUME",
-                            "consume_amount": Decimal(str(getattr(c, 'amount', 0))),
-                            "gmt_biz_create": _parse_dt(getattr(c, 'biz_date', None)),
-                            "notify_reason": "SYNC",
-                            "notify_msg": getattr(c, 'title', '全量同步'),
-                            "status": "PROCESSED",
-                        }
-
-                        # 检查 pay_bill 是否已存在
-                        bill_stmt = select(PayBillModel).where(PayBillModel.pay_no == bill_no)
-                        bill_exist = (await auth.db.execute(bill_stmt)).scalar_one_or_none()
-                        if bill_exist:
-                            for k, v in bill_data.items():
-                                if k != 'pay_no' and hasattr(bill_exist, k) and v is not None:
-                                    setattr(bill_exist, k, v)
-                        else:
-                            ins = sa_insert(PayBillModel).values(**bill_data)
-                            await auth.db.execute(ins)
-                        bill_upserted += 1
-
-                        # — 2) 如果有 biz_out_no,说明是转账,同步 pay_transfer —
-                        if biz_out_no:
-                            # 查 local
-                            tf_stmt = select(TransferModel).where(TransferModel.out_biz_no == biz_out_no)
-                            tf_exist = (await auth.db.execute(tf_stmt)).scalar_one_or_none()
-                            if not tf_exist:
-                                # 新增转账记录
-                                ins_tf = sa_insert(TransferModel).values(
-                                    out_biz_no=biz_out_no,
-                                    enterprise_id=eid,
-                                    amount=Decimal(str(getattr(c, 'amount', 0))),
-                                    status=TransferStatusEnum.DEALING.value,
-                                )
-                                await auth.db.execute(ins_tf)
-                                transfer_upserted += 1
-                                details.append({"pay_no": bill_no, "out_biz_no": biz_out_no, "type": "TRANSFER", "action": "insert"})
-
-                                # 新记录尝试查转账详情
-                                try:
-                                    await cls._sync_transfer_detail(auth, biz_out_no, eid)
-                                    transfer_detail_synced += 1
-                                except Exception as e:
-                                    log.warning(f"查询转账详情失败: out_biz_no={biz_out_no}, err={e}")
-                            else:
-                                # 已有记录,尝试更新状态
-                                if tf_exist.status == TransferStatusEnum.DEALING.value:
-                                    try:
-                                        await cls._sync_transfer_detail(auth, biz_out_no, eid)
-                                        transfer_detail_synced += 1
-                                    except Exception as e:
-                                        log.warning(f"查询转账详情失败: out_biz_no={biz_out_no}, err={e}")
-                                details.append({"pay_no": bill_no, "out_biz_no": biz_out_no, "type": "TRANSFER", "action": "exists"})
-                        else:
-                            details.append({"pay_no": bill_no, "type": "CONSUME", "action": "bill_only"})
-
-                except Exception as e:
-                    errors += 1
-                    log.warning(f"全量同步 - 企业 {eid} 第 {page_num} 页异常: {e}", exc_info=True)
-                    break
-
-                # 翻页:用 has_next_page 判断
-                if not getattr(batch_result, 'has_next_page', False):
-                    break
-                page_num += 1
-
-        if bill_upserted > 0 or transfer_upserted > 0:
+        for transfer in all_transfers:
+            out_biz_no = transfer.out_biz_no
+            eid = transfer.enterprise_id
+            if not out_biz_no or not eid:
+                continue
+
+            try:
+                result = await cls._sync_transfer_detail(auth, out_biz_no, eid)
+                if result:
+                    synced += 1
+                    details.append({"out_biz_no": out_biz_no, "old_status": transfer.status, "new_status": result})
+                else:
+                    details.append({"out_biz_no": out_biz_no, "status": transfer.status, "action": "no_change"})
+            except Exception as e:
+                errors += 1
+                details.append({"out_biz_no": out_biz_no, "status": transfer.status, "error": str(e)})
+                log.warning(f"全量同步 - 查询失败: out_biz_no={out_biz_no}, err={e}")
+
+        if synced > 0:
             await auth.db.flush()
 
         return {
-            "total_records": total_records,
-            "bill_upserted": bill_upserted,
-            "transfer_inserted": transfer_upserted,
-            "transfer_detail_synced": transfer_detail_synced,
+            "total": len(all_transfers),
+            "synced": synced,
             "errors": errors,
-            "date_range": {"start": start_date, "end": end_date},
-            "note": "使用 jointaccountbill.detail.batchquery 拉取,pay_bill 已同步,TRANSFER 已写入 pay_transfer",
+            "details": details,
         }
 
     @classmethod
@@ -1061,8 +914,8 @@ class AccountService:
         auth: AuthSchema,
         out_biz_no: str,
         enterprise_id: str,
-    ) -> None:
-        """调用 fund.trans.common.query 查询单笔转账详情并更新本地记录"""
+    ) -> str | None:
+        """调用 fund.trans.common.query 查询单笔转账详情并更新本地记录,返回新状态"""
         from sqlalchemy import update as sa_update
         from app.plugin.module_payment.account.model import TransferModel
 
@@ -1078,7 +931,7 @@ class AccountService:
                 AlipayFundTransCommonQueryResponse,
             )
         except ImportError:
-            return
+            return None
 
         model = AlipayFundTransCommonQueryModel()
         model.out_biz_no = out_biz_no
@@ -1092,17 +945,17 @@ class AccountService:
         response = client.execute(request)
 
         if not response:
-            return
+            return None
 
         result = AlipayFundTransCommonQueryResponse()
         result.parse_response_content(response)
 
         if not result.is_success():
-            return
+            return None
 
         alipay_status = getattr(result, 'status', None)
         if not alipay_status:
-            return
+            return None
 
         update_data = {"status": alipay_status}
         order_no = getattr(result, 'order_id', None)
@@ -1134,6 +987,8 @@ class AccountService:
             ).values(**update_data)
             await auth.db.execute(upd)
             log.info(f"转账详情同步 - out_biz_no={out_biz_no}, status={alipay_status}")
+            return alipay_status
+        return None
 
 
     @classmethod