Переглянути джерело

feat(account): 到卡转账退避反查 + trans.order.query + APScheduler 定时任务

alphah 1 тиждень тому
батько
коміт
693e82881b

+ 33 - 0
backend/app/alembic/versions/b1c2d3e4f5a6_add_transfer_retry_fields.py

@@ -0,0 +1,33 @@
+"""add transfer retry fields
+
+Revision ID: b1c2d3e4f5a6
+Revises: a1b2c3d4e5f6
+Create Date: 2026-05-23 17:15:00.000000
+
+"""
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision: str = 'b1c2d3e4f5a6'
+down_revision: Union[str, None] = 'a1b2c3d4e5f6'
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    op.add_column('pay_transfer', sa.Column('retry_count', sa.Integer(), nullable=False, server_default='0', comment='反查重试次数'))
+    op.add_column('pay_transfer', sa.Column('next_retry_at', sa.DateTime(), nullable=True, comment='下次反查时间'))
+    op.execute("""
+        CREATE INDEX idx_transfer_retry ON pay_transfer (next_retry_at)
+        WHERE status = 'DEALING' AND retry_count < 4
+    """)
+
+
+def downgrade() -> None:
+    op.execute("DROP INDEX IF EXISTS idx_transfer_retry")
+    op.drop_column('pay_transfer', 'next_retry_at')
+    op.drop_column('pay_transfer', 'retry_count')

+ 33 - 0
backend/app/plugin/module_payment/account/controller.py

@@ -583,3 +583,36 @@ async def receipt_download_controller(
 #     result = await AccountService.get_batch_result_service(auth=auth, batch_id=batch_id)
 #     log.info(f"查询批量转账结果成功: {batch_id}")
 #     return SuccessResponse(data=result, msg="查询批量转账结果成功")
+
+
+# ============ 定时任务:反查转账状态 ============
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
+
+scheduler = AsyncIOScheduler()
+
+
+@AccountRouter.on_event("startup")
+async def start_retry_scheduler():
+    """启动时立即执行一次,之后每分钟执行"""
+    log.info("[重试任务] 启动反查定时任务")
+    try:
+        await AccountService.retry_dealing_transfers()
+    except Exception:
+        log.error("[重试任务] 启动首次执行异常", exc_info=True)
+
+    scheduler.add_job(
+        AccountService.retry_dealing_transfers,
+        'interval',
+        minutes=1,
+        max_instances=1,
+        id='retry_dealing_transfers',
+        replace_existing=True,
+    )
+    scheduler.start()
+    log.info("[重试任务] 定时任务已注册(每分钟执行)")
+
+
+@AccountRouter.on_event("shutdown")
+async def stop_retry_scheduler():
+    scheduler.shutdown(wait=False)
+    log.info("[重试任务] 定时任务已停止")

+ 8 - 1
backend/app/plugin/module_payment/account/model.py

@@ -1,7 +1,8 @@
 from datetime import datetime
 from decimal import Decimal
+from typing import Optional
 
-from sqlalchemy import DateTime, String, Text, JSON, Numeric, ForeignKey
+from sqlalchemy import DateTime, String, Text, JSON, Numeric, ForeignKey, Integer
 from sqlalchemy.orm import Mapped, mapped_column, relationship
 
 from app.core.base_model import PaymentModelMixin, TenantMixin, EnterpriseMixin
@@ -101,6 +102,12 @@ class TransferModel(PaymentModelMixin, TenantMixin, EnterpriseMixin):
         JSON, comment="扩展信息"
     )
     remark: Mapped[str | None] = mapped_column(Text, comment="备注")
+    retry_count: Mapped[int] = mapped_column(
+        Integer, default=0, nullable=False, server_default='0', comment="反查重试次数"
+    )
+    next_retry_at: Mapped[Optional[datetime]] = mapped_column(
+        DateTime, nullable=True, comment="下次反查时间"
+    )
 
 
 class DepositModel(PaymentModelMixin, TenantMixin, EnterpriseMixin):

+ 167 - 169
backend/app/plugin/module_payment/account/service.py

@@ -1,4 +1,4 @@
-from datetime import datetime
+from datetime import datetime, timedelta
 from decimal import Decimal
 from typing import Any, Optional
 
@@ -411,6 +411,11 @@ class AccountService:
             "fund_order_id": result.fund_order_id,
             "remark": "",
         }
+
+        if result.status == TransferStatusEnum.DEALING.value:
+            transfer_data["retry_count"] = 0
+            transfer_data["next_retry_at"] = datetime.now() + timedelta(minutes=5)
+
         log.info(f"记录转账: {transfer_data}")
 
         if not result.is_success():
@@ -544,6 +549,10 @@ class AccountService:
             "fund_order_id": result.fund_order_id,
         }
 
+        if result.status == TransferStatusEnum.DEALING.value:
+            transfer_data["retry_count"] = 0
+            transfer_data["next_retry_at"] = datetime.now() + timedelta(minutes=5)
+
         await transfer_crud.create(transfer_data)
 
         return TenantTransferResponse(
@@ -978,39 +987,33 @@ class AccountService:
         auth: AuthSchema,
     ) -> dict:
         """
-        全量同步转账状态
-        尝试调 fund.trans.common.query,如无权限则降级为列出 DEALING 记录供手动同步
+        全量同步转账状态(使用 trans.order.query)
         """
         from sqlalchemy import select
         from app.plugin.module_payment.account.model import TransferModel
         from app.plugin.module_payment.account.enums import TransferStatusEnum
 
         stmt = select(TransferModel).where(
-            TransferModel.out_biz_no.isnot(None),
+            TransferModel.status == TransferStatusEnum.DEALING.value,
         ).order_by(TransferModel.id.asc())
         result = await auth.db.execute(stmt)
-        all_transfers = result.scalars().all()
+        all_transfers = list(result.scalars().all())
 
         synced = 0
         errors = 0
         details = []
-        _has_permission = True
 
         for transfer in all_transfers:
             out_biz_no = transfer.out_biz_no
             eid = transfer.enterprise_id
+            order_no = transfer.order_no or ""
             if not out_biz_no or not eid:
                 continue
-
             try:
-                result = await cls._sync_transfer_detail(auth, out_biz_no, eid)
-                if result is False:
-                    # 两个方案都失败了(无权限),停止全量同步
-                    _has_permission = False
-                    break
-                if isinstance(result, str):
+                new_status = await cls._sync_transfer_detail(auth, out_biz_no, eid, order_no)
+                if new_status:
                     synced += 1
-                    details.append({"out_biz_no": out_biz_no, "old_status": transfer.status, "new_status": result})
+                    details.append({"out_biz_no": out_biz_no, "old_status": transfer.status, "new_status": new_status})
                 else:
                     details.append({"out_biz_no": out_biz_no, "status": transfer.status, "action": "no_change"})
             except Exception as e:
@@ -1018,17 +1021,6 @@ class AccountService:
                 details.append({"out_biz_no": out_biz_no, "status": transfer.status, "error": str(e)})
                 log.warning(f"全量同步 - 查询失败: out_biz_no={out_biz_no}, err={e}")
 
-        if not _has_permission:
-            dealing = [t for t in all_transfers if t.status == TransferStatusEnum.DEALING.value]
-            return {
-                "total": len(all_transfers),
-                "synced": synced,
-                "no_permission": True,
-                "dealing_count": len(dealing),
-                "details": [{"out_biz_no": t.out_biz_no, "status": t.status} for t in dealing],
-                "note": "无法通过支付宝 API 查询转账状态,请在开放平台开通 alipay.fund.trans.common.query 权限,或逐个使用 sync-status 手动补录",
-            }
-
         if synced > 0:
             await auth.db.flush()
 
@@ -1045,176 +1037,182 @@ class AccountService:
         auth: AuthSchema,
         out_biz_no: str,
         enterprise_id: str,
-    ) -> str | bool | None:
-        """查询单笔转账详情并更新本地记录
-        优先调 fund.trans.common.query,无权限时改用 consume.detail.query(用 order_no 当 pay_no 查)
-        返回: 新状态str / False(无权限) / None(失败/无变化)
+        order_no: str = "",
+    ) -> str | None:
+        """调 alipay.commerce.ec.trans.order.query 查询单笔转账状态并更新
+        有 out_biz_no 传 out_biz_no,有 order_no 也传 order_no(两个都传)
+        返回: 新状态str / None(不可查/无变化)
         """
-        from sqlalchemy import select, update as sa_update
+        from sqlalchemy import update as sa_update
         from app.plugin.module_payment.account.model import TransferModel
-
         from app.core.alipay import AlipayClient
 
-        # 先查本地记录
-        tf_stmt = select(TransferModel).where(TransferModel.out_biz_no == out_biz_no)
-        tf_result = await auth.db.execute(tf_stmt)
-        local_transfer = tf_result.scalar_one_or_none()
-
-        # — 方案A: fund.trans.common.query —
         try:
-            from alipay.aop.api.request.AlipayFundTransCommonQueryRequest import (
-                AlipayFundTransCommonQueryRequest,
+            from alipay.aop.api.request.AlipayCommerceEcTransOrderQueryRequest import (
+                AlipayCommerceEcTransOrderQueryRequest,
             )
-            from alipay.aop.api.domain.AlipayFundTransCommonQueryModel import (
-                AlipayFundTransCommonQueryModel,
+            from alipay.aop.api.domain.AlipayCommerceEcTransOrderQueryModel import (
+                AlipayCommerceEcTransOrderQueryModel,
             )
-            from alipay.aop.api.response.AlipayFundTransCommonQueryResponse import (
-                AlipayFundTransCommonQueryResponse,
+            from alipay.aop.api.response.AlipayCommerceEcTransOrderQueryResponse import (
+                AlipayCommerceEcTransOrderQueryResponse,
             )
-
-            model = AlipayFundTransCommonQueryModel()
-            model.out_biz_no = out_biz_no
-            model.product_code = "TRANS_ACCOUNT_NO_PWD"
-            model.biz_scene = "DIRECT_TRANSFER"
-
-            request = AlipayFundTransCommonQueryRequest()
-            request.biz_model = model
-
-            client = AlipayClient.get_client()
-            response = client.execute(request)
-
-            if response:
-                result = AlipayFundTransCommonQueryResponse()
-                result.parse_response_content(response)
-
-                if result.is_success():
-                    alipay_status = getattr(result, 'status', None)
-                    if alipay_status and alipay_status != "DEALING":
-                        return await cls._apply_transfer_update(auth, out_biz_no, result, alipay_status)
-                    return None
-
-                sub_msg = getattr(result, 'sub_msg', '') or ''
-                if '权限' not in sub_msg and 'NO_PERMISSION' not in sub_msg:
-                    return None
-                # 权限不足,继续方案B
         except ImportError:
-            pass
-
-        # — 方案B: consume.detail.query(用 order_no 当 pay_no 查) —
-        order_no = local_transfer.order_no if local_transfer else None
-        if not order_no:
-            log.warning(f"无 order_no 可用于查询: out_biz_no={out_biz_no}")
-            return False
+            log.warning("SDK 不支持 trans.order.query")
+            return None
 
-        try:
-            from alipay.aop.api.request.AlipayCommerceEcConsumeDetailQueryRequest import (
-                AlipayCommerceEcConsumeDetailQueryRequest,
-            )
-            from alipay.aop.api.domain.AlipayCommerceEcConsumeDetailQueryModel import (
-                AlipayCommerceEcConsumeDetailQueryModel,
-            )
-            from alipay.aop.api.response.AlipayCommerceEcConsumeDetailQueryResponse import (
-                AlipayCommerceEcConsumeDetailQueryResponse,
-            )
-
-            model = AlipayCommerceEcConsumeDetailQueryModel()
-            model.pay_no = order_no
-            model.enterprise_id = enterprise_id
+        model = AlipayCommerceEcTransOrderQueryModel()
+        model.enterprise_id = enterprise_id
+        if out_biz_no:
+            model.out_biz_no = out_biz_no
+        if order_no:
+            model.order_no = order_no
 
-            request = AlipayCommerceEcConsumeDetailQueryRequest()
-            request.biz_model = model
+        request = AlipayCommerceEcTransOrderQueryRequest()
+        request.biz_model = model
 
-            client = AlipayClient.get_client()
-            response = client.execute(request)
+        client = AlipayClient.get_client()
+        response = client.execute(request)
 
-            if not response:
-                return False
+        if not response:
+            return None
 
-            result = AlipayCommerceEcConsumeDetailQueryResponse()
-            result.parse_response_content(response)
+        result = AlipayCommerceEcTransOrderQueryResponse()
+        result.parse_response_content(response)
 
-            if not result.is_success():
-                sub_code = getattr(result, 'sub_code', '') or ''
-                sub_msg = getattr(result, 'sub_msg', '') or ''
-                # 权限不足
-                if '权限' in sub_msg or 'NO_PERMISSION' in sub_code:
-                    return False
-                log.warning(f"consume.detail.query 查无记录: out_biz_no={out_biz_no}, err={sub_msg}")
+        if not result.is_success():
+            sub_code = getattr(result, 'sub_code', '') or ''
+            sub_msg = getattr(result, 'sub_msg', '') or ''
+            if sub_code == "ORDER_NOT_EXIST":
                 return None
+            log.warning(f"trans.order.query 失败: out_biz_no={out_biz_no}, sub_code={sub_code}, sub_msg={sub_msg}")
+            return None
 
-            consume_info = getattr(result, 'consume_info', None)
-            if not consume_info:
-                return None
+        alipay_status = getattr(result, 'status', None)
+        if not alipay_status or alipay_status == "DEALING":
+            return None
 
-            consume_type = getattr(consume_info, 'consume_type', '')
-            if consume_type != "TRANSFER":
-                return None
+        update_data = {"status": alipay_status}
+        api_order_no = getattr(result, 'order_no', None)
+        api_amount = getattr(result, 'amount', None)
+        api_fund_order_id = getattr(result, 'pay_fund_order_id', None)
 
-            notify_reason = getattr(consume_info, 'notify_reason', '') or ''
-            if 'SUCCESS' in notify_reason.upper():
-                new_status = "SUCCESS"
-            elif 'FAIL' in notify_reason.upper():
-                new_status = "FAIL"
-            else:
-                return None
+        if api_order_no:
+            update_data["order_no"] = api_order_no
+        if api_fund_order_id:
+            update_data["fund_order_id"] = api_fund_order_id
+        if api_amount:
+            update_data["amount"] = Decimal(str(api_amount))
 
-            update_data = {"status": new_status}
-            pay_no = getattr(consume_info, 'pay_no', None)
-            if pay_no and pay_no != order_no:
-                update_data["order_no"] = pay_no
+        upd = sa_update(TransferModel).where(
+            TransferModel.out_biz_no == out_biz_no
+        ).values(**update_data)
+        await auth.db.execute(upd)
+        log.info(f"转账同步(trans.order.query) - out_biz_no={out_biz_no}, status={alipay_status}")
+        return alipay_status
 
-            upd = sa_update(TransferModel).where(
-                TransferModel.out_biz_no == out_biz_no
-            ).values(**update_data)
-            await auth.db.execute(upd)
-            log.info(f"转账同步(consume详情) - out_biz_no={out_biz_no}, status={new_status}")
-            return new_status
 
-        except ImportError:
-            log.warning("consume.detail.query SDK 不可用")
-            return False
-        except Exception as e:
-            log.warning(f"consume.detail.query 异常: out_biz_no={out_biz_no}, err={e}")
-            return False
+    # ─────────── 退避时间映射 ───────────
+    _RETRY_INTERVALS = [5, 10, 30, 60]  # 分钟: retry_count 0→1, 1→2, 2→3, 3 判 FAIL
 
     @classmethod
-    async def _apply_transfer_update(
-        cls,
-        auth: AuthSchema,
-        out_biz_no: str,
-        result: object,
-        alipay_status: str,
-    ) -> str | None:
-        """根据 fund.trans.common.query 结果更新本地记录"""
-        from sqlalchemy import update as sa_update
+    async def retry_dealing_transfers(cls) -> dict:
+        """定时任务:反查处理中的到卡转账结果(DB 锁防多 worker 重复执行)
+        启动时立即执行一次,之后每分钟执行一次。使用独立数据库会话。
+        """
+        from sqlalchemy import select, text
         from app.plugin.module_payment.account.model import TransferModel
+        from app.plugin.module_payment.account.enums import TransferStatusEnum
+        from app.core.database import async_db_session
 
-        update_data = {"status": alipay_status}
-        order_no = getattr(result, 'order_id', None)
-        pay_fund_order_id = getattr(result, 'pay_fund_order_id', None)
-        trans_amount = getattr(result, 'trans_amount', None)
-        error_code = getattr(result, 'error_code', None)
-        fail_reason = getattr(result, 'fail_reason', None)
-
-        if order_no:
-            update_data["order_no"] = order_no
-        if pay_fund_order_id:
-            update_data["fund_order_id"] = pay_fund_order_id
-        if trans_amount:
-            update_data["amount"] = Decimal(str(trans_amount))
-        if error_code:
-            update_data["error_code"] = error_code
-        if fail_reason:
-            update_data["error_msg"] = fail_reason
-
-        if update_data.get("status") != "DEALING":
-            upd = sa_update(TransferModel).where(
-                TransferModel.out_biz_no == out_biz_no
-            ).values(**update_data)
-            await auth.db.execute(upd)
-            log.info(f"转账详情同步 - out_biz_no={out_biz_no}, status={alipay_status}")
-            return alipay_status
-        return None
+        db = async_db_session()
+        try:
+            async with db.begin():
+                # 抢 PostgreSQL advisory lock
+                lock = await db.execute(text("SELECT pg_try_advisory_lock(99999)"))
+                if not lock.scalar():
+                    log.debug("[重试任务] 锁已被其他 worker 持有,跳过")
+                    return {"scanned": 0, "processed": 0, "skipped_lock": True}
+
+                try:
+                    log.info("[重试任务] ====== 开始执行 ======")
+                    now = datetime.now()
+
+                    stmt = (
+                        select(TransferModel)
+                        .where(
+                            TransferModel.status == TransferStatusEnum.DEALING.value,
+                            TransferModel.retry_count < 4,
+                            TransferModel.next_retry_at.isnot(None),
+                            TransferModel.next_retry_at <= now,
+                        )
+                        .order_by(TransferModel.next_retry_at.asc())
+                    )
+                    result = await db.execute(stmt)
+                    pending = list(result.scalars().all())
+
+                    if not pending:
+                        return {"scanned": 0, "processed": 0}
+
+                    log.info(f"[重试任务] 扫描到 {len(pending)} 条")
+
+                    processed = 0
+                    for transfer in pending:
+                        out_biz_no = transfer.out_biz_no
+                        enterprise_id = transfer.enterprise_id
+                        order_no = transfer.order_no or ""
+                        retry_count = transfer.retry_count
+
+                        try:
+                            sync_result = await cls._sync_transfer_detail(
+                                AuthSchema(db=db, check_data_scope=False, tenant_id=transfer.tenant_id),
+                                out_biz_no, enterprise_id, order_no,
+                            )
+                        except Exception as e:
+                            log.warning(f"[重试任务] trans.order.query 异常: out_biz_no={out_biz_no}, err={e}")
+                            continue
+
+                        if sync_result:  # SUCCESS 或 FAIL
+                            transfer.retry_count = 4
+                            transfer.next_retry_at = None
+                            log.info(f"[重试任务] out_biz_no={out_biz_no}, retry={retry_count+1}/4, result={sync_result}, 已更新")
+                            try:
+                                auth = AuthSchema(db=db, check_data_scope=False, tenant_id=transfer.tenant_id)
+                                await OpenTransferService.open_return_service(auth, order_no or out_biz_no)
+                            except Exception:
+                                pass
+                        else:
+                            if retry_count < 3:
+                                accumulated = sum(cls._RETRY_INTERVALS[:retry_count + 2])
+                                transfer.retry_count = retry_count + 1
+                                transfer.next_retry_at = transfer.created_time + timedelta(minutes=accumulated)
+                                log.info(f"[重试任务] out_biz_no={out_biz_no}, retry={retry_count+1}/4, result=DEALING, 下次={transfer.next_retry_at}")
+                                processed += 1
+                            else:
+                                transfer.status = TransferStatusEnum.FAIL.value
+                                transfer.retry_count = 4
+                                transfer.next_retry_at = None
+                                transfer.error_msg = "反查超时判定为失败"
+                                processed += 1
+                                log.info(f"[重试任务] out_biz_no={out_biz_no}, retry=4/4次DEALING, 超时判FAIL")
+                                try:
+                                    auth = AuthSchema(db=db, check_data_scope=False, tenant_id=transfer.tenant_id)
+                                    await OpenTransferService.open_return_service(auth, order_no or out_biz_no)
+                                except Exception:
+                                    pass
+
+                    # db.begin() 会自动 commit
+
+                    log.info(f"[重试任务] ====== 执行完成 ====== 处理 {processed} 条,扫描 {len(pending)} 条")
+                    return {"scanned": len(pending), "processed": processed}
+
+                finally:
+                    await db.execute(text("SELECT pg_advisory_unlock(99999)"))
+        except Exception:
+            log.error("[重试任务] 执行异常", exc_info=True)
+            return {"scanned": 0, "processed": 0}
+        finally:
+            await db.close()
 
 
     @classmethod

+ 15 - 0
backend/app/plugin/module_payment/notification/handlers/bill_handler.py

@@ -79,6 +79,8 @@ class BillHandler(BaseHandler[dict]):
             await AccountService.update_transfer_status_service(
                 auth, data.pay_no, "SUCCESS", data.model_dump(exclude_none=True)
             )
+            # 通知到达,清除重试字段(无需再反查)
+            await cls._clear_transfer_retry(auth, data.pay_no)
             await OpenTransferService.open_return_service(auth, data.pay_no)
 
 
@@ -341,6 +343,19 @@ class BillHandler(BaseHandler[dict]):
             
             await voucher_crud.create_or_update(voucher_id, voucher_data)
 
+    @staticmethod
+    async def _clear_transfer_retry(auth: AuthSchema, pay_no: str) -> None:
+        """通知到达时清除重试字段,终止反查"""
+        from sqlalchemy import update as sa_update
+        from app.plugin.module_payment.account.model import TransferModel
+        try:
+            upd = sa_update(TransferModel).where(
+                TransferModel.order_no == pay_no
+            ).values(next_retry_at=None, retry_count=4)
+            await auth.db.execute(upd)
+        except Exception:
+            pass
+
 
 class VoucherHandler(BaseHandler[dict]):
     """凭证变动通知处理器"""