| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- import asyncio
- import json
- from datetime import datetime
- from app.core.logger import log
- from app.core.redis_crud import RedisCURD
- from redis.asyncio import Redis
- from .service import AccountService
- from app.api.v1.module_system.auth.schema import AuthSchema
class TransferBatchTask:
    """Background worker that executes queued batch-transfer jobs.

    Jobs are JSON strings popped from ``transfer:batch:queue``. Progress is
    written to the hash ``transfer:batch:status:{batch_id}`` (fields
    ``status``, ``updated_at``, ``success``, ``failed``, ``processed``) and
    each record's outcome to ``transfer:batch:result:{batch_id}:{out_biz_no}``.
    """

    @classmethod
    async def process_batch_transfer(cls):
        """Poll the batch-transfer queue forever and process each job.

        When the queue is empty the loop sleeps 1 second before polling again.
        Any exception raised while fetching, decoding or running a job is
        logged and followed by a 5 second pause; the loop then continues with
        the next job. The method has no normal return path.
        """
        redis: Redis = await RedisCURD.get_redis()

        while True:
            try:
                task_data_str = await RedisCURD(redis).rpop("transfer:batch:queue")
                if not task_data_str:
                    # Nothing queued: back off briefly instead of spinning.
                    await asyncio.sleep(1)
                    continue

                await cls._run_batch(redis, json.loads(task_data_str))
            except Exception as e:
                # NOTE(review): the job has already been popped at this point,
                # so it is not retried, and its status hash may still read
                # "PROCESSING" -- confirm whether that is acceptable.
                log.error(f"处理批量转账任务时出错: {str(e)}")
                await asyncio.sleep(5)

    @classmethod
    async def _run_batch(cls, redis: Redis, task_data: dict) -> None:
        """Run every transfer of one decoded job and maintain its status hash.

        Args:
            redis: Connection obtained from ``RedisCURD.get_redis()``.
            task_data: Decoded job; must contain ``batch_id``, ``tenant_id``
                and ``transfers`` (an iterable of per-transfer dicts).

        Raises:
            KeyError: If one of the required job keys is missing.
        """
        batch_id = task_data["batch_id"]
        tenant_id = task_data["tenant_id"]
        transfers = task_data["transfers"]
        status_key = f"transfer:batch:status:{batch_id}"

        log.info(f"开始处理批量转账任务: {batch_id}, 总笔数: {len(transfers)}")

        # Mark the batch as in progress.
        await RedisCURD(redis).hset(status_key, "status", "PROCESSING")
        await RedisCURD(redis).hset(status_key, "updated_at", datetime.now().isoformat())

        for transfer_data in transfers:
            out_biz_no = transfer_data.get("out_biz_no")
            if not out_biz_no:
                log.warning(f"转账记录缺少 out_biz_no: {transfer_data}")
                # A record without out_biz_no cannot get a result entry (the
                # result key is built from it), but it is still part of the
                # batch total. Count it as failed and processed so the
                # counters add up to len(transfers) once the batch finishes.
                await RedisCURD(redis).hincrby(status_key, "failed", 1)
                await RedisCURD(redis).hincrby(status_key, "processed", 1)
                continue

            await cls._run_single_transfer(
                redis, batch_id, tenant_id, transfer_data, out_biz_no
            )

        # Every record has been attempted; the status is "COMPLETED" even when
        # individual transfers failed (see the success/failed counters).
        await RedisCURD(redis).hset(status_key, "status", "COMPLETED")
        await RedisCURD(redis).hset(status_key, "updated_at", datetime.now().isoformat())
        log.info(f"批量转账任务处理完成: {batch_id}")

    @classmethod
    async def _run_single_transfer(
        cls,
        redis: Redis,
        batch_id,
        tenant_id,
        transfer_data: dict,
        out_biz_no,
    ) -> None:
        """Execute one transfer and record its outcome and counters in Redis.

        Args:
            redis: Connection obtained from ``RedisCURD.get_redis()``.
            batch_id: Identifier of the batch the record belongs to.
            tenant_id: Tenant the transfer is executed for.
            transfer_data: Raw transfer record; passed as keyword arguments
                to ``AccountTransferSchema``.
            out_biz_no: Non-empty business number of the record, used in the
                result key.
        """
        status_key = f"transfer:batch:status:{batch_id}"
        result_key = f"transfer:batch:result:{batch_id}:{out_biz_no}"

        try:
            # The batch runs without a request context, so a system identity
            # is used for the service call.
            auth = AuthSchema(
                tenant_id=tenant_id,
                user_id="system",
                user_name="系统",
                enterprise_id=transfer_data.get("enterprise_id", ""),
            )

            # Function-scope import kept from the original code (presumably
            # to avoid a circular import -- TODO confirm).
            from .schema import AccountTransferSchema

            transfer_schema = AccountTransferSchema(**transfer_data)
            result = await AccountService.transfer_service(auth=auth, data=transfer_schema)

            # NOTE(review): an error raised while recording the success below
            # is handled by the same except clause, so a transfer that already
            # went through could be stored as "FAIL" -- confirm this is safe.
            await RedisCURD(redis).hset(
                result_key,
                status="SUCCESS",
                order_no=result.order_no or "",
                fund_order_id=result.fund_order_id or "",
            )
            await RedisCURD(redis).hincrby(status_key, "success", 1)
            log.info(f"转账成功: {out_biz_no} -> {transfer_data.get('amount')}")
        except Exception as e:
            # A failing record must not abort the rest of the batch: store
            # the error and carry on with the next record.
            await RedisCURD(redis).hset(
                result_key,
                status="FAIL",
                error_msg=str(e),
            )
            await RedisCURD(redis).hincrby(status_key, "failed", 1)
            log.error(f"转账失败: {out_biz_no}, 错误: {str(e)}")
        finally:
            # Counted for both outcomes.
            await RedisCURD(redis).hincrby(status_key, "processed", 1)
async def start_transfer_batch_task():
    """Start the batch-transfer background worker.

    Logs a startup message and then awaits
    ``TransferBatchTask.process_batch_transfer()``.
    """
    log.info("批量转账后台任务已启动")
    worker = TransferBatchTask.process_batch_transfer()
    await worker
|