tasks.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. import asyncio
  2. import json
  3. from datetime import datetime
  4. from app.core.logger import log
  5. from app.core.redis_crud import RedisCURD
  6. from redis.asyncio import Redis
  7. from .service import AccountService
  8. from app.api.v1.module_system.auth.schema import AuthSchema
  9. class TransferBatchTask:
  10. """批量转账后台任务"""
  11. @classmethod
  12. async def process_batch_transfer(cls):
  13. """
  14. 处理批量转账任务
  15. 从 Redis 队列中取出任务,逐笔处理
  16. """
  17. redis: Redis = await RedisCURD.get_redis()
  18. while True:
  19. try:
  20. # 从队列中取出任务
  21. task_data_str = await RedisCURD(redis).rpop("transfer:batch:queue")
  22. if not task_data_str:
  23. await asyncio.sleep(1)
  24. continue
  25. # 解析任务数据
  26. task_data = json.loads(task_data_str)
  27. batch_id = task_data["batch_id"]
  28. tenant_id = task_data["tenant_id"]
  29. transfers = task_data["transfers"]
  30. log.info(f"开始处理批量转账任务: {batch_id}, 总笔数: {len(transfers)}")
  31. # 更新状态为处理中
  32. await RedisCURD(redis).hset(f"transfer:batch:status:{batch_id}", "status", "PROCESSING")
  33. await RedisCURD(redis).hset(f"transfer:batch:status:{batch_id}", "updated_at", datetime.now().isoformat())
  34. # 处理每笔转账
  35. for transfer_data in transfers:
  36. out_biz_no = transfer_data.get("out_biz_no")
  37. if not out_biz_no:
  38. log.warning(f"转账记录缺少 out_biz_no: {transfer_data}")
  39. continue
  40. try:
  41. # 构建 AuthSchema
  42. auth = AuthSchema(
  43. tenant_id=tenant_id,
  44. user_id="system",
  45. user_name="系统",
  46. enterprise_id=transfer_data.get("enterprise_id", ""),
  47. )
  48. # 调用转账服务
  49. from .schema import AccountTransferSchema
  50. transfer_schema = AccountTransferSchema(**transfer_data)
  51. result = await AccountService.transfer_service(auth=auth, data=transfer_schema)
  52. # 记录成功结果
  53. await RedisCURD(redis).hset(
  54. f"transfer:batch:result:{batch_id}:{out_biz_no}",
  55. status="SUCCESS",
  56. order_no=result.order_no or "",
  57. fund_order_id=result.fund_order_id or ""
  58. )
  59. # 更新成功计数
  60. await RedisCURD(redis).hincrby(f"transfer:batch:status:{batch_id}", "success", 1)
  61. log.info(f"转账成功: {out_biz_no} -> {transfer_data.get('amount')}")
  62. except Exception as e:
  63. # 记录失败结果
  64. await RedisCURD(redis).hset(
  65. f"transfer:batch:result:{batch_id}:{out_biz_no}",
  66. status="FAIL",
  67. error_msg=str(e)
  68. )
  69. # 更新失败计数
  70. await RedisCURD(redis).hincrby(f"transfer:batch:status:{batch_id}", "failed", 1)
  71. log.error(f"转账失败: {out_biz_no}, 错误: {str(e)}")
  72. finally:
  73. # 更新已处理计数
  74. await RedisCURD(redis).hincrby(f"transfer:batch:status:{batch_id}", "processed", 1)
  75. # 更新状态为完成
  76. await RedisCURD(redis).hset(f"transfer:batch:status:{batch_id}", "status", "COMPLETED")
  77. await RedisCURD(redis).hset(f"transfer:batch:status:{batch_id}", "updated_at", datetime.now().isoformat())
  78. log.info(f"批量转账任务处理完成: {batch_id}")
  79. except Exception as e:
  80. log.error(f"处理批量转账任务时出错: {str(e)}")
  81. await asyncio.sleep(5)
  82. async def start_transfer_batch_task():
  83. """
  84. 启动批量转账后台任务
  85. """
  86. log.info("批量转账后台任务已启动")
  87. await TransferBatchTask.process_batch_transfer()