crud.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. from datetime import datetime
  2. from decimal import Decimal
  3. from sqlalchemy import func, select
  4. from app.api.v1.module_system.auth.schema import AuthSchema
  5. from app.core.base_crud import CRUDBase
  6. from app.core.exceptions import CustomException
  7. from typing import TYPE_CHECKING, Optional
  8. from .model import AccountModel, TransferModel, DepositModel, WithdrawModel
  9. from .schema import (
  10. AccountCreateSchema,
  11. AccountTransferSchema,
  12. AccountDepositSchema,
  13. AccountWithdrawSchema,
  14. )
  15. if TYPE_CHECKING:
  16. from sqlalchemy.engine import Result
  17. class AccountCRUD(CRUDBase[AccountModel, AccountCreateSchema, AccountCreateSchema]):
  18. """资金专户 CRUD 操作"""
  19. def __init__(self, auth: AuthSchema) -> None:
  20. self.auth = auth
  21. super().__init__(model=AccountModel, auth=auth)
  22. async def get_by_enterprise_id(
  23. self, enterprise_id: str
  24. ) -> AccountModel | None:
  25. return await self.get(enterprise_id=enterprise_id)
  26. async def get_by_account_book_id(
  27. self, account_book_id: str
  28. ) -> AccountModel | None:
  29. return await self.get(account_book_id=account_book_id)
  30. async def update_by_enterprise_id(
  31. self, enterprise_id: str, data: dict
  32. ) -> AccountModel | None:
  33. obj = await self.get(enterprise_id=enterprise_id, preload=[])
  34. if not obj:
  35. raise CustomException(msg="更新失败!对象不存在")
  36. if self.auth.user and hasattr(obj, "updated_id"):
  37. setattr(obj, "updated_id", self.auth.user.id)
  38. for key, value in data.items():
  39. if hasattr(obj, key):
  40. setattr(obj, key, value)
  41. await self.auth.db.flush()
  42. await self.auth.db.refresh(obj)
  43. verify_obj = await self.get(enterprise_id=enterprise_id, preload=[])
  44. if not verify_obj:
  45. raise CustomException(msg="更新失败!对象不存在或无权限访问")
  46. return obj
  47. class TransferCRUD(CRUDBase[TransferModel, AccountTransferSchema, AccountTransferSchema]):
  48. """转账记录 CRUD 操作"""
  49. def __init__(self, auth: AuthSchema) -> None:
  50. self.auth = auth
  51. super().__init__(model=TransferModel, auth=auth)
  52. async def get_by_out_biz_no(
  53. self, out_biz_no: str
  54. ) -> TransferModel | None:
  55. return await self.get(out_biz_no=out_biz_no)
  56. async def get_by_order_no(
  57. self, order_no: str
  58. ) -> TransferModel | None:
  59. return await self.get(order_no=order_no)
  60. async def update_by_order_no(
  61. self, order_no: str, data: dict
  62. ) -> TransferModel | None:
  63. obj = await self.get(order_no=order_no, preload=[])
  64. if not obj:
  65. raise CustomException(msg="转账记录不存在")
  66. if self.auth.user and hasattr(obj, "updated_id"):
  67. setattr(obj, "updated_id", self.auth.user.id)
  68. for key, value in data.items():
  69. if hasattr(obj, key):
  70. setattr(obj, key, value)
  71. await self.auth.db.flush()
  72. await self.auth.db.refresh(obj)
  73. return obj
  74. # 统计企业在指定时间范围内的转账总金额以及每天的转账金额。
  75. async def get_transfer_amount(
  76. self,
  77. enterprise_id: Optional[str] = None,
  78. start_date: Optional[datetime] = None,
  79. end_date: Optional[datetime] = None,
  80. tenant_id: Optional[int] = None,
  81. payee_type: Optional[str] = None,
  82. ) -> Decimal:
  83. conditions = [
  84. TransferModel.status == "SUCCESS",
  85. ]
  86. if tenant_id:
  87. conditions.append(TransferModel.tenant_id == tenant_id)
  88. if enterprise_id:
  89. conditions.append(TransferModel.enterprise_id == enterprise_id)
  90. if start_date:
  91. conditions.append(TransferModel.created_time >= start_date)
  92. if end_date:
  93. conditions.append(TransferModel.created_time <= end_date)
  94. if payee_type:
  95. conditions.append(TransferModel.payee_type == payee_type)
  96. try:
  97. # 统计转时间范围内的转账总金额,字段amount
  98. sql = select(func.sum(TransferModel.amount).label("total_amount")).where(
  99. *conditions
  100. )
  101. sql = await self.filter_permissions(sql)
  102. result: Result = await self.auth.db.execute(sql)
  103. return result.scalars().first() or Decimal(0)
  104. except Exception as e:
  105. raise CustomException(msg=f"列表查询失败: {e!s}")
  106. class DepositCRUD(CRUDBase[DepositModel, AccountDepositSchema, AccountDepositSchema]):
  107. """充值记录 CRUD 操作"""
  108. def __init__(self, auth: AuthSchema) -> None:
  109. self.auth = auth
  110. super().__init__(model=DepositModel, auth=auth)
  111. async def get_by_out_biz_no(
  112. self, out_biz_no: str
  113. ) -> DepositModel | None:
  114. return await self.get(out_biz_no=out_biz_no)
  115. async def get_by_enterprise_id(
  116. self, enterprise_id: str
  117. ) -> DepositModel | None:
  118. return await self.get(enterprise_id=enterprise_id)
  119. async def update_by_out_biz_no(
  120. self, out_biz_no: str, data: dict
  121. ) -> DepositModel | None:
  122. obj = await self.get(out_biz_no=out_biz_no, preload=[])
  123. if not obj:
  124. raise CustomException(msg="充值记录不存在")
  125. if self.auth.user and hasattr(obj, "updated_id"):
  126. setattr(obj, "updated_id", self.auth.user.id)
  127. for key, value in data.items():
  128. if hasattr(obj, key):
  129. setattr(obj, key, value)
  130. await self.auth.db.flush()
  131. await self.auth.db.refresh(obj)
  132. return obj
  133. class WithdrawCRUD(CRUDBase[WithdrawModel, AccountWithdrawSchema, AccountWithdrawSchema]):
  134. """提现记录 CRUD 操作"""
  135. def __init__(self, auth: AuthSchema) -> None:
  136. self.auth = auth
  137. super().__init__(model=WithdrawModel, auth=auth)
  138. async def get_by_out_biz_no(
  139. self, out_biz_no: str
  140. ) -> WithdrawModel | None:
  141. return await self.get(out_biz_no=out_biz_no)
  142. async def update_by_out_biz_no(
  143. self, out_biz_no: str, data: dict
  144. ) -> WithdrawModel | None:
  145. obj = await self.get(out_biz_no=out_biz_no, preload=[])
  146. if not obj:
  147. raise CustomException(msg="提现记录不存在")
  148. if self.auth.user and hasattr(obj, "updated_id"):
  149. setattr(obj, "updated_id", self.auth.user.id)
  150. for key, value in data.items():
  151. if hasattr(obj, key):
  152. setattr(obj, key, value)
  153. await self.auth.db.flush()
  154. await self.auth.db.refresh(obj)
  155. return obj