import asyncio import json from datetime import datetime from app.core.logger import log from app.core.redis_crud import RedisCURD from redis.asyncio import Redis from .service import AccountService from app.api.v1.module_system.auth.schema import AuthSchema class TransferBatchTask: """批量转账后台任务""" @classmethod async def process_batch_transfer(cls): """ 处理批量转账任务 从 Redis 队列中取出任务,逐笔处理 """ redis: Redis = await RedisCURD.get_redis() while True: try: # 从队列中取出任务 task_data_str = await RedisCURD(redis).rpop("transfer:batch:queue") if not task_data_str: await asyncio.sleep(1) continue # 解析任务数据 task_data = json.loads(task_data_str) batch_id = task_data["batch_id"] tenant_id = task_data["tenant_id"] transfers = task_data["transfers"] log.info(f"开始处理批量转账任务: {batch_id}, 总笔数: {len(transfers)}") # 更新状态为处理中 await RedisCURD(redis).hset(f"transfer:batch:status:{batch_id}", "status", "PROCESSING") await RedisCURD(redis).hset(f"transfer:batch:status:{batch_id}", "updated_at", datetime.now().isoformat()) # 处理每笔转账 for transfer_data in transfers: out_biz_no = transfer_data.get("out_biz_no") if not out_biz_no: log.warning(f"转账记录缺少 out_biz_no: {transfer_data}") continue try: # 构建 AuthSchema auth = AuthSchema( tenant_id=tenant_id, user_id="system", user_name="系统", enterprise_id=transfer_data.get("enterprise_id", ""), ) # 调用转账服务 from .schema import AccountTransferSchema transfer_schema = AccountTransferSchema(**transfer_data) result = await AccountService.transfer_service(auth=auth, data=transfer_schema) # 记录成功结果 await RedisCURD(redis).hset( f"transfer:batch:result:{batch_id}:{out_biz_no}", status="SUCCESS", order_no=result.order_no or "", fund_order_id=result.fund_order_id or "" ) # 更新成功计数 await RedisCURD(redis).hincrby(f"transfer:batch:status:{batch_id}", "success", 1) log.info(f"转账成功: {out_biz_no} -> {transfer_data.get('amount')}") except Exception as e: # 记录失败结果 await RedisCURD(redis).hset( f"transfer:batch:result:{batch_id}:{out_biz_no}", status="FAIL", error_msg=str(e) ) # 更新失败计数 await RedisCURD(redis).hincrby(f"transfer:batch:status:{batch_id}", "failed", 1) log.error(f"转账失败: {out_biz_no}, 错误: {str(e)}") finally: # 更新已处理计数 await RedisCURD(redis).hincrby(f"transfer:batch:status:{batch_id}", "processed", 1) # 更新状态为完成 await RedisCURD(redis).hset(f"transfer:batch:status:{batch_id}", "status", "COMPLETED") await RedisCURD(redis).hset(f"transfer:batch:status:{batch_id}", "updated_at", datetime.now().isoformat()) log.info(f"批量转账任务处理完成: {batch_id}") except Exception as e: log.error(f"处理批量转账任务时出错: {str(e)}") await asyncio.sleep(5) async def start_transfer_batch_task(): """ 启动批量转账后台任务 """ log.info("批量转账后台任务已启动") await TransferBatchTask.process_batch_transfer()