|
|
@@ -1,109 +0,0 @@
|
|
|
-import asyncio
|
|
|
-import json
|
|
|
-from datetime import datetime
|
|
|
-
|
|
|
-from app.core.logger import log
|
|
|
-from app.core.redis_crud import RedisCURD
|
|
|
-from redis.asyncio import Redis
|
|
|
-
|
|
|
-from .service import AccountService
|
|
|
-from app.api.v1.module_system.auth.schema import AuthSchema
|
|
|
-
|
|
|
-
|
|
|
-class TransferBatchTask:
|
|
|
- """批量转账后台任务"""
|
|
|
-
|
|
|
- @classmethod
|
|
|
- async def process_batch_transfer(cls):
|
|
|
- """
|
|
|
- 处理批量转账任务
|
|
|
- 从 Redis 队列中取出任务,逐笔处理
|
|
|
- """
|
|
|
- redis: Redis = await RedisCURD.get_redis()
|
|
|
-
|
|
|
- while True:
|
|
|
- try:
|
|
|
- # 从队列中取出任务
|
|
|
- task_data_str = await RedisCURD(redis).rpop("transfer:batch:queue")
|
|
|
- if not task_data_str:
|
|
|
- await asyncio.sleep(1)
|
|
|
- continue
|
|
|
-
|
|
|
- # 解析任务数据
|
|
|
- task_data = json.loads(task_data_str)
|
|
|
- batch_id = task_data["batch_id"]
|
|
|
- tenant_id = task_data["tenant_id"]
|
|
|
- transfers = task_data["transfers"]
|
|
|
-
|
|
|
- log.info(f"开始处理批量转账任务: {batch_id}, 总笔数: {len(transfers)}")
|
|
|
-
|
|
|
- # 更新状态为处理中
|
|
|
- await RedisCURD(redis).hset(f"transfer:batch:status:{batch_id}", "status", "PROCESSING")
|
|
|
- await RedisCURD(redis).hset(f"transfer:batch:status:{batch_id}", "updated_at", datetime.now().isoformat())
|
|
|
-
|
|
|
- # 处理每笔转账
|
|
|
- for transfer_data in transfers:
|
|
|
- out_biz_no = transfer_data.get("out_biz_no")
|
|
|
- if not out_biz_no:
|
|
|
- log.warning(f"转账记录缺少 out_biz_no: {transfer_data}")
|
|
|
- continue
|
|
|
-
|
|
|
- try:
|
|
|
- # 构建 AuthSchema
|
|
|
- auth = AuthSchema(
|
|
|
- tenant_id=tenant_id,
|
|
|
- user_id="system",
|
|
|
- user_name="系统",
|
|
|
- enterprise_id=transfer_data.get("enterprise_id", ""),
|
|
|
- )
|
|
|
-
|
|
|
- # 调用转账服务
|
|
|
- from .schema import AccountTransferSchema
|
|
|
- transfer_schema = AccountTransferSchema(**transfer_data)
|
|
|
- result = await AccountService.transfer_service(auth=auth, data=transfer_schema)
|
|
|
-
|
|
|
- # 记录成功结果
|
|
|
- await RedisCURD(redis).hset(
|
|
|
- f"transfer:batch:result:{batch_id}:{out_biz_no}",
|
|
|
- status="SUCCESS",
|
|
|
- order_no=result.order_no or "",
|
|
|
- fund_order_id=result.fund_order_id or ""
|
|
|
- )
|
|
|
-
|
|
|
- # 更新成功计数
|
|
|
- await RedisCURD(redis).hincrby(f"transfer:batch:status:{batch_id}", "success", 1)
|
|
|
- log.info(f"转账成功: {out_biz_no} -> {transfer_data.get('amount')}")
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- # 记录失败结果
|
|
|
- await RedisCURD(redis).hset(
|
|
|
- f"transfer:batch:result:{batch_id}:{out_biz_no}",
|
|
|
- status="FAIL",
|
|
|
- error_msg=str(e)
|
|
|
- )
|
|
|
-
|
|
|
- # 更新失败计数
|
|
|
- await RedisCURD(redis).hincrby(f"transfer:batch:status:{batch_id}", "failed", 1)
|
|
|
- log.error(f"转账失败: {out_biz_no}, 错误: {str(e)}")
|
|
|
-
|
|
|
- finally:
|
|
|
- # 更新已处理计数
|
|
|
- await RedisCURD(redis).hincrby(f"transfer:batch:status:{batch_id}", "processed", 1)
|
|
|
-
|
|
|
- # 更新状态为完成
|
|
|
- await RedisCURD(redis).hset(f"transfer:batch:status:{batch_id}", "status", "COMPLETED")
|
|
|
- await RedisCURD(redis).hset(f"transfer:batch:status:{batch_id}", "updated_at", datetime.now().isoformat())
|
|
|
-
|
|
|
- log.info(f"批量转账任务处理完成: {batch_id}")
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- log.error(f"处理批量转账任务时出错: {str(e)}")
|
|
|
- await asyncio.sleep(5)
|
|
|
-
|
|
|
-
|
|
|
-async def start_transfer_batch_task():
|
|
|
- """
|
|
|
- 启动批量转账后台任务
|
|
|
- """
|
|
|
- log.info("批量转账后台任务已启动")
|
|
|
- await TransferBatchTask.process_batch_transfer()
|