Forráskód Böngészése

feat(account): 全量同步接口 - consume.detail.batchquery 拉取所有交易写入 pay_bill + pay_transfer

alphah 2 hete
szülő
commit
64e8fa20cd
1 módosított fájl, 275 hozzáadás és 102 törlés
  1. 275 102
      backend/app/plugin/module_payment/account/service.py

+ 275 - 102
backend/app/plugin/module_payment/account/service.py

@@ -36,6 +36,18 @@ from .schema import (
 from ..openapi.crud import OpenTransferCRUD
 
 
+def _parse_dt(val: str | None) -> datetime | None:
+    """解析支付宝日期字符串"""
+    if not val:
+        return None
+    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d"):
+        try:
+            return datetime.strptime(val, fmt)
+        except ValueError:
+            continue
+    return None
+
+
 class AccountService:
     """资金专户服务层"""
 
@@ -848,130 +860,291 @@ class AccountService:
     async def transfer_sync_all_service(
         cls,
         auth: AuthSchema,
+        start_date: str | None = None,
+        end_date: str | None = None,
     ) -> dict:
         """
-        全量同步转账状态
-        自动查询所有卡在 DEALING 的转账,调用支付宝核实状态
+        全量拉取所有企业的消费/转账记录,同步到 pay_bill / pay_transfer
+        使用 consume.detail.batchquery 分批拉取 + fund.trans.common.query 补全转账详情
         """
-        from sqlalchemy import select, update as sa_update
+        from sqlalchemy import select, update as sa_update, insert as sa_insert
         from app.plugin.module_payment.account.model import TransferModel
         from app.plugin.module_payment.account.enums import TransferStatusEnum
+        from app.plugin.module_payment.notification.model import PayBillModel
+        from datetime import datetime, timedelta
+
+        now = datetime.now()
+        if not end_date:
+            end_date = now.strftime("%Y-%m-%d")
+        if not start_date:
+            start_date = (now - timedelta(days=90)).strftime("%Y-%m-%d")
 
-        # 检查支付宝SDK是否支持转账订单查询API
-        _can_query = False
+        # 检查 consume.detail.batchquery 是否可用
+        _can_batch = False
         try:
-            from alipay.aop.api.request.AlipayCommerceEcTransOrderQueryRequest import (
-                AlipayCommerceEcTransOrderQueryRequest,
+            from alipay.aop.api.request.AlipayCommerceEcConsumeDetailBatchqueryRequest import (
+                AlipayCommerceEcConsumeDetailBatchqueryRequest,
             )
-            from alipay.aop.api.domain.AlipayCommerceEcTransOrderQueryModel import (
-                AlipayCommerceEcTransOrderQueryModel,
+            from alipay.aop.api.domain.AlipayCommerceEcConsumeDetailBatchqueryModel import (
+                AlipayCommerceEcConsumeDetailBatchqueryModel,
             )
-            from alipay.aop.api.response.AlipayCommerceEcTransOrderQueryResponse import (
-                AlipayCommerceEcTransOrderQueryResponse,
+            from alipay.aop.api.response.AlipayCommerceEcConsumeDetailBatchqueryResponse import (
+                AlipayCommerceEcConsumeDetailBatchqueryResponse,
             )
             from app.core.alipay_client import AlipayClient
-            _can_query = True
+            _can_batch = True
         except ImportError:
-            log.warning("支付宝SDK未包含 TransOrderQuery API,将仅列出 DEALING 记录供手动同步")
-
-        client = AlipayClient.get_client() if _can_query else None
-
-        # 直接查库,获取所有 DEALING 转账(不含 out_biz_no 为空的记录)
-        stmt = select(TransferModel).where(
-            TransferModel.status == TransferStatusEnum.DEALING.value,
-            TransferModel.out_biz_no.isnot(None),
-        )
-        stmt = stmt.order_by(TransferModel.id.asc())
-        result = await auth.db.execute(stmt)
-        dealing_transfers = result.scalars().all()
-
-        synced = 0
-        failed = 0
-        total = len(dealing_transfers)
-        details = []
-
-        if not _can_query:
-            # SDK不支持查询API,仅列出记录供参考
-            for transfer in dealing_transfers:
-                details.append({
-                    "out_biz_no": transfer.out_biz_no,
-                    "enterprise_id": transfer.enterprise_id,
-                    "amount": float(transfer.amount) if transfer.amount else 0,
-                    "status": "DEALING",
-                    "hint": "请使用 transfer/sync-status 手动补录",
-                })
+            log.warning("SDK 不支持 consume.detail.batchquery,降级为仅同步 DEALING 转账")
+
+        # 获取所有企业(绕过权限过滤)
+        from app.plugin.module_payment.enterprise.model import EnterpriseModel
+        ent_stmt = select(EnterpriseModel).where(EnterpriseModel.enterprise_id.isnot(None))
+        ent_result = await auth.db.execute(ent_stmt)
+        enterprises = ent_result.scalars().all()
+
+        if not enterprises and not _can_batch:
+            # 降级模式:直接查 DEALING 转账
+            stmt = select(TransferModel).where(
+                TransferModel.status == TransferStatusEnum.DEALING.value,
+                TransferModel.out_biz_no.isnot(None),
+            )
+            result = await auth.db.execute(stmt)
+            dealing = result.scalars().all()
             return {
-                "total": total,
+                "total": len(dealing),
                 "synced": 0,
-                "failed": 0,
-                "details": details,
-                "note": "SDK不支持自动查询,请参考 details 中的 out_biz_no 逐条调用 sync-status 手动同步",
+                "bill_synced": 0,
+                "transfer_synced": 0,
+                "details": [{"out_biz_no": t.out_biz_no, "status": "DEALING", "hint": "手动同步"} for t in dealing],
+                "note": "SDK不支持批量查询,请逐一使用 sync-status 同步",
             }
 
-        for transfer in dealing_transfers:
-            out_biz_no = transfer.out_biz_no
-            eid = transfer.enterprise_id
-            if not out_biz_no or not eid:
-                continue
+        client = AlipayClient.get_client()
 
-            try:
-                query_model = AlipayCommerceEcTransOrderQueryModel()
-                query_model.out_biz_no = out_biz_no
-                query_model.enterprise_id = eid
-
-                request = AlipayCommerceEcTransOrderQueryRequest()
-                request.biz_model = query_model
-
-                response = client.execute(request)
-                if not response:
-                    failed += 1
-                    details.append({"out_biz_no": out_biz_no, "status": "DEALING", "error": "无响应"})
-                    continue
-
-                query_result = AlipayCommerceEcTransOrderQueryResponse()
-                query_result.parse_response_content(response)
-
-                if not query_result.is_success():
-                    failed += 1
-                    err_msg = query_result.sub_msg or query_result.msg
-                    details.append({"out_biz_no": out_biz_no, "status": "DEALING", "error": err_msg})
-                    log.warning(f"全量同步 - 查询失败: out_biz_no={out_biz_no}, error={err_msg}")
-                    continue
-
-                alipay_status = getattr(query_result, 'status', None)
-                if alipay_status and alipay_status != TransferStatusEnum.DEALING.value:
-                    update_data = {"status": alipay_status}
-                    order_no = getattr(query_result, 'order_no', None)
-                    fund_order_id = getattr(query_result, 'pay_fund_order_id', None)
-                    if order_no:
-                        update_data["order_no"] = order_no
-                    if fund_order_id:
-                        update_data["fund_order_id"] = fund_order_id
-
-                    upd = sa_update(TransferModel).where(
-                        TransferModel.out_biz_no == out_biz_no
-                    ).values(**update_data)
-                    await auth.db.execute(upd)
-                    synced += 1
-                    details.append({"out_biz_no": out_biz_no, "status": alipay_status, "synced": True})
-                    log.info(f"全量同步 - 已更新: out_biz_no={out_biz_no}, old=DEALING, new={alipay_status}")
-                else:
-                    details.append({"out_biz_no": out_biz_no, "status": "DEALING", "error": "支付宝侧仍为处理中"})
-            except Exception as e:
-                failed += 1
-                details.append({"out_biz_no": out_biz_no, "status": "DEALING", "error": str(e)})
-                log.warning(f"全量同步 - 异常: out_biz_no={out_biz_no}, error={e}")
-
-        if synced > 0 or failed > 0:
+        total_records = 0
+        bill_upserted = 0
+        transfer_upserted = 0
+        transfer_detail_synced = 0
+        errors = 0
+        details = []
+
+        for enterprise in enterprises:
+            eid = enterprise.enterprise_id
+            page_num = 1
+            page_size = 100
+
+            while True:
+                try:
+                    batch_model = AlipayCommerceEcConsumeDetailBatchqueryModel()
+                    batch_model.enterprise_id = eid
+                    batch_model.start_date = start_date
+                    batch_model.end_date = end_date
+                    batch_model.page_num = page_num
+                    batch_model.page_size = page_size
+
+                    request = AlipayCommerceEcConsumeDetailBatchqueryRequest()
+                    request.biz_model = batch_model
+
+                    response = client.execute(request)
+                    if not response:
+                        break
+
+                    batch_result = AlipayCommerceEcConsumeDetailBatchqueryResponse()
+                    batch_result.parse_response_content(response)
+                    if not batch_result.is_success():
+                        errors += 1
+                        break
+
+                    consume_list = getattr(batch_result, 'consume_info_list', None)
+                    if not consume_list:
+                        break
+
+                    for c in consume_list:
+                        total_records += 1
+                        pay_no = getattr(c, 'pay_no', None)
+                        consume_type = getattr(c, 'consume_type', None)
+                        if not pay_no:
+                            continue
+
+                        # — 1) 落库 pay_bill(有则更新,无则新增) —
+                        bill_data = {
+                            "pay_no": pay_no,
+                            "enterprise_id": getattr(c, 'enterprise_id', eid),
+                            "account_id": getattr(c, 'account_id', ''),
+                            "employee_id": getattr(c, 'employee_id', ''),
+                            "consume_type": consume_type or '',
+                            "consume_amount": Decimal(str(getattr(c, 'consume_amount', 0))),
+                            "gmt_biz_create": _parse_dt(getattr(c, 'gmt_biz_create', None)),
+                            "gmt_recieve_pay": _parse_dt(getattr(c, 'gmt_recieve_pay', None)),
+                            "peer_pay_amount": Decimal(str(getattr(c, 'peer_pay_amount', 0))) if getattr(c, 'peer_pay_amount', None) else None,
+                            "notify_reason": getattr(c, 'notify_reason', 'SYNC'),
+                            "notify_msg": getattr(c, 'notify_msg', '全量同步'),
+                            "related_pay_no": getattr(c, 'related_pay_no', None),
+                            "status": "PROCESSED",
+                        }
+
+                        # 检查 pay_bill 是否已存在
+                        bill_stmt = select(PayBillModel).where(PayBillModel.pay_no == pay_no)
+                        bill_exist = (await auth.db.execute(bill_stmt)).scalar_one_or_none()
+                        if bill_exist:
+                            for k, v in bill_data.items():
+                                if k != 'pay_no' and hasattr(bill_exist, k) and v is not None:
+                                    setattr(bill_exist, k, v)
+                        else:
+                            ins = sa_insert(PayBillModel).values(**bill_data)
+                            await auth.db.execute(ins)
+                        bill_upserted += 1
+
+                        # — 2) 如果是转账,同步 pay_transfer —
+                        if consume_type == "TRANSFER":
+                            out_biz_no = getattr(c, 'out_biz_no', None)
+                            if not out_biz_no:
+                                # 尝试从 ext_infos 取
+                                ext = getattr(c, 'ext_infos', None)
+                                if isinstance(ext, dict):
+                                    out_biz_no = ext.get('out_biz_no')
+
+                            if out_biz_no:
+                                # 查 local
+                                tf_stmt = select(TransferModel).where(TransferModel.out_biz_no == out_biz_no)
+                                tf_exist = (await auth.db.execute(tf_stmt)).scalar_one_or_none()
+                                if not tf_exist:
+                                    # 新增转账记录
+                                    ins_tf = sa_insert(TransferModel).values(
+                                        out_biz_no=out_biz_no,
+                                        enterprise_id=eid,
+                                        amount=Decimal(str(getattr(c, 'consume_amount', 0))),
+                                        status=TransferStatusEnum.DEALING.value,
+                                    )
+                                    await auth.db.execute(ins_tf)
+                                    transfer_upserted += 1
+                                    details.append({"pay_no": pay_no, "out_biz_no": out_biz_no, "type": "TRANSFER", "action": "insert"})
+
+                                    # 新记录尝试查转账详情
+                                    try:
+                                        await cls._sync_transfer_detail(auth, out_biz_no, eid)
+                                        transfer_detail_synced += 1
+                                    except Exception as e:
+                                        log.warning(f"查询转账详情失败: out_biz_no={out_biz_no}, err={e}")
+                                else:
+                                    # 已有记录,尝试更新状态
+                                    if tf_exist.status == TransferStatusEnum.DEALING.value:
+                                        try:
+                                            await cls._sync_transfer_detail(auth, out_biz_no, eid)
+                                            transfer_detail_synced += 1
+                                        except Exception as e:
+                                            log.warning(f"查询转账详情失败: out_biz_no={out_biz_no}, err={e}")
+                                    details.append({"pay_no": pay_no, "out_biz_no": out_biz_no, "type": "TRANSFER", "action": "exists"})
+                            else:
+                                details.append({"pay_no": pay_no, "type": "TRANSFER", "action": "skipped", "reason": "no_out_biz_no"})
+                        else:
+                            details.append({"pay_no": pay_no, "type": consume_type, "action": "bill_only"})
+
+                except Exception as e:
+                    errors += 1
+                    log.warning(f"全量同步 - 企业 {eid} 第 {page_num} 页异常: {e}")
+                    break
+
+                # 翻页
+                current_page = getattr(batch_result, 'page_num', 0) or 0
+                total_pages = getattr(batch_result, 'total_page_count', 0) or 0
+                if current_page >= total_pages:
+                    break
+                page_num += 1
+
+        if bill_upserted > 0 or transfer_upserted > 0:
             await auth.db.flush()
 
         return {
-            "total": total,
-            "synced": synced,
-            "failed": failed,
-            "details": details,
+            "total_records": total_records,
+            "bill_upserted": bill_upserted,
+            "transfer_inserted": transfer_upserted,
+            "transfer_detail_synced": transfer_detail_synced,
+            "errors": errors,
+            "date_range": {"start": start_date, "end": end_date},
+            "note": "pay_bill 已同步,TRANSFER 类型记录已写入/更新 pay_transfer",
         }
 
+    @classmethod
+    async def _sync_transfer_detail(
+        cls,
+        auth: AuthSchema,
+        out_biz_no: str,
+        enterprise_id: str,
+    ) -> None:
+        """调用 fund.trans.common.query 查询单笔转账详情并更新本地记录"""
+        from sqlalchemy import update as sa_update
+        from app.plugin.module_payment.account.model import TransferModel
+
+        try:
+            from alipay.aop.api.request.AlipayFundTransCommonQueryRequest import (
+                AlipayFundTransCommonQueryRequest,
+            )
+            from alipay.aop.api.domain.AlipayFundTransCommonQueryModel import (
+                AlipayFundTransCommonQueryModel,
+            )
+            from alipay.aop.api.response.AlipayFundTransCommonQueryResponse import (
+                AlipayFundTransCommonQueryResponse,
+            )
+            from app.core.alipay_client import AlipayClient
+        except ImportError:
+            return
+
+        model = AlipayFundTransCommonQueryModel()
+        model.out_biz_no = out_biz_no
+        model.product_code = "TRANS_ACCOUNT_NO_PWD"
+        model.biz_scene = "DIRECT_TRANSFER"
+
+        request = AlipayFundTransCommonQueryRequest()
+        request.biz_model = model
+
+        client = AlipayClient.get_client()
+        response = client.execute(request)
+
+        if not response:
+            return
+
+        result = AlipayFundTransCommonQueryResponse()
+        result.parse_response_content(response)
+
+        if not result.is_success():
+            return
+
+        alipay_status = getattr(result, 'status', None)
+        if not alipay_status:
+            return
+
+        update_data = {"status": alipay_status}
+        order_no = getattr(result, 'order_id', None)
+        pay_fund_order_id = getattr(result, 'pay_fund_order_id', None)
+        trans_amount = getattr(result, 'trans_amount', None)
+        error_code = getattr(result, 'error_code', None)
+        fail_reason = getattr(result, 'fail_reason', None)
+        pay_date = getattr(result, 'pay_date', None)
+
+        if order_no:
+            update_data["order_no"] = order_no
+        if pay_fund_order_id:
+            update_data["fund_order_id"] = pay_fund_order_id
+        if trans_amount:
+            update_data["amount"] = Decimal(str(trans_amount))
+        if error_code:
+            update_data["error_code"] = error_code
+        if fail_reason:
+            update_data["error_msg"] = fail_reason
+        if pay_date:
+            try:
+                update_data["ext_info"] = {"pay_date": pay_date}
+            except Exception:
+                pass
+
+        if update_data.get("status") != "DEALING":
+            upd = sa_update(TransferModel).where(
+                TransferModel.out_biz_no == out_biz_no
+            ).values(**update_data)
+            await auth.db.execute(upd)
+            log.info(f"转账详情同步 - out_biz_no={out_biz_no}, status={alipay_status}")
+
 
     @classmethod
     async def update_transfer_status_service(