| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- from datetime import datetime
- from decimal import Decimal
- from sqlalchemy import func, select
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.core.base_crud import CRUDBase
- from app.core.exceptions import CustomException
- from typing import TYPE_CHECKING, Optional
- from .model import AccountModel, TransferModel, DepositModel, WithdrawModel
- from .schema import (
- AccountCreateSchema,
- AccountTransferSchema,
- AccountDepositSchema,
- AccountWithdrawSchema,
- )
- if TYPE_CHECKING:
- from sqlalchemy.engine import Result
class AccountCRUD(CRUDBase[AccountModel, AccountCreateSchema, AccountCreateSchema]):
    """CRUD operations for dedicated fund accounts."""

    def __init__(self, auth: AuthSchema) -> None:
        """Keep the auth context on the instance and bind ``AccountModel`` to the base CRUD."""
        self.auth = auth
        super().__init__(model=AccountModel, auth=auth)

    async def get_by_enterprise_id(self, enterprise_id: str) -> AccountModel | None:
        """Fetch an account via ``self.get`` filtered by ``enterprise_id``."""
        account = await self.get(enterprise_id=enterprise_id)
        return account

    async def get_by_account_book_id(self, account_book_id: str) -> AccountModel | None:
        """Fetch an account via ``self.get`` filtered by ``account_book_id``."""
        account = await self.get(account_book_id=account_book_id)
        return account

    async def update_by_enterprise_id(
        self, enterprise_id: str, data: dict
    ) -> AccountModel | None:
        """Apply ``data`` to the account identified by ``enterprise_id``.

        Args:
            enterprise_id: Value used to look the account up through ``self.get``.
            data: Mapping of attribute name to new value. Keys that are not
                attributes of the loaded object are ignored.

        Returns:
            The updated object after it has been flushed and refreshed.

        Raises:
            CustomException: If the account cannot be loaded before the update,
                or cannot be loaded again after the flush.
        """
        account = await self.get(enterprise_id=enterprise_id, preload=[])
        if not account:
            raise CustomException(msg="更新失败!对象不存在")

        # Stamp the acting user first; a matching key in ``data`` may still
        # overwrite it in the loop below.
        if self.auth.user and hasattr(account, "updated_id"):
            account.updated_id = self.auth.user.id

        for field, new_value in data.items():
            if not hasattr(account, field):
                continue
            setattr(account, field, new_value)

        session = self.auth.db
        await session.flush()
        await session.refresh(account)

        # Re-read after the flush: the update is rejected if the row can no
        # longer be fetched (presumably due to permission filtering in
        # ``self.get`` -- confirm against CRUDBase).
        if not await self.get(enterprise_id=enterprise_id, preload=[]):
            raise CustomException(msg="更新失败!对象不存在或无权限访问")
        return account
class TransferCRUD(CRUDBase[TransferModel, AccountTransferSchema, AccountTransferSchema]):
    """CRUD operations for transfer records."""

    def __init__(self, auth: AuthSchema) -> None:
        """Keep the auth context on the instance and bind ``TransferModel`` to the base CRUD."""
        self.auth = auth
        super().__init__(model=TransferModel, auth=auth)

    async def get_by_out_biz_no(self, out_biz_no: str) -> TransferModel | None:
        """Fetch a transfer record via ``self.get`` filtered by ``out_biz_no``."""
        return await self.get(out_biz_no=out_biz_no)

    async def get_by_order_no(self, order_no: str) -> TransferModel | None:
        """Fetch a transfer record via ``self.get`` filtered by ``order_no``."""
        return await self.get(order_no=order_no)

    async def update_by_order_no(
        self, order_no: str, data: dict
    ) -> TransferModel | None:
        """Apply ``data`` to the transfer record identified by ``order_no``.

        Args:
            order_no: Value used to look the record up through ``self.get``.
            data: Mapping of attribute name to new value. Keys that are not
                attributes of the loaded object are ignored.

        Returns:
            The updated object after it has been flushed and refreshed.

        Raises:
            CustomException: If no record is found for ``order_no``.
        """
        obj = await self.get(order_no=order_no, preload=[])
        if not obj:
            raise CustomException(msg="转账记录不存在")

        # Stamp the acting user first; a matching key in ``data`` may still
        # overwrite it in the loop below.
        if self.auth.user and hasattr(obj, "updated_id"):
            setattr(obj, "updated_id", self.auth.user.id)

        for key, value in data.items():
            if hasattr(obj, key):
                setattr(obj, key, value)

        await self.auth.db.flush()
        await self.auth.db.refresh(obj)
        return obj

    async def get_transfer_amount(
        self,
        enterprise_id: Optional[str] = None,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        tenant_id: Optional[int] = None,
    ) -> Decimal:
        """Sum ``amount`` over transfers whose status is ``"SUCCESS"``.

        Only the overall total is computed; no per-day breakdown is produced.
        Each optional filter is applied only when its argument is truthy, so
        ``None``, an empty string and ``0`` all mean "do not filter".

        Args:
            enterprise_id: Restrict to this ``enterprise_id`` when given.
            start_date: Inclusive lower bound on ``created_time`` when given.
            end_date: Inclusive upper bound on ``created_time`` when given.
            tenant_id: Restrict to this ``tenant_id`` when given.

        Returns:
            The summed amount, or ``Decimal(0)`` when the query yields a
            falsy value (for example no matching rows).

        Raises:
            CustomException: If building or executing the query fails; the
                original exception is attached as the cause.
        """
        conditions = [
            TransferModel.status == "SUCCESS",
        ]

        if tenant_id:
            conditions.append(TransferModel.tenant_id == tenant_id)
        if enterprise_id:
            conditions.append(TransferModel.enterprise_id == enterprise_id)
        if start_date:
            conditions.append(TransferModel.created_time >= start_date)
        if end_date:
            conditions.append(TransferModel.created_time <= end_date)

        try:
            # Aggregate the ``amount`` column over the filtered transfers.
            sql = select(func.sum(TransferModel.amount).label("total_amount")).where(
                *conditions
            )
            sql = await self.filter_permissions(sql)
            result: Result = await self.auth.db.execute(sql)
            # SUM yields NULL/None when nothing matches; normalise to zero.
            return result.scalars().first() or Decimal(0)
        except Exception as e:
            # Chain explicitly so the underlying error is kept as __cause__.
            raise CustomException(msg=f"列表查询失败: {e!s}") from e
class DepositCRUD(CRUDBase[DepositModel, AccountDepositSchema, AccountDepositSchema]):
    """CRUD operations for deposit (top-up) records."""

    def __init__(self, auth: AuthSchema) -> None:
        """Keep the auth context on the instance and bind ``DepositModel`` to the base CRUD."""
        self.auth = auth
        super().__init__(model=DepositModel, auth=auth)

    async def get_by_out_biz_no(self, out_biz_no: str) -> DepositModel | None:
        """Fetch a deposit record via ``self.get`` filtered by ``out_biz_no``."""
        deposit = await self.get(out_biz_no=out_biz_no)
        return deposit

    async def get_by_enterprise_id(self, enterprise_id: str) -> DepositModel | None:
        """Fetch a deposit record via ``self.get`` filtered by ``enterprise_id``."""
        deposit = await self.get(enterprise_id=enterprise_id)
        return deposit

    async def update_by_out_biz_no(
        self, out_biz_no: str, data: dict
    ) -> DepositModel | None:
        """Apply ``data`` to the deposit record identified by ``out_biz_no``.

        Args:
            out_biz_no: Value used to look the record up through ``self.get``.
            data: Mapping of attribute name to new value. Keys that are not
                attributes of the loaded object are ignored.

        Returns:
            The updated object after it has been flushed and refreshed.

        Raises:
            CustomException: If no record is found for ``out_biz_no``.
        """
        deposit = await self.get(out_biz_no=out_biz_no, preload=[])
        if not deposit:
            raise CustomException(msg="充值记录不存在")

        # Stamp the acting user first; a matching key in ``data`` may still
        # overwrite it in the loop below.
        if self.auth.user and hasattr(deposit, "updated_id"):
            deposit.updated_id = self.auth.user.id

        for field, new_value in data.items():
            if not hasattr(deposit, field):
                continue
            setattr(deposit, field, new_value)

        session = self.auth.db
        await session.flush()
        await session.refresh(deposit)
        return deposit
class WithdrawCRUD(CRUDBase[WithdrawModel, AccountWithdrawSchema, AccountWithdrawSchema]):
    """CRUD operations for withdrawal records."""

    def __init__(self, auth: AuthSchema) -> None:
        """Keep the auth context on the instance and bind ``WithdrawModel`` to the base CRUD."""
        self.auth = auth
        super().__init__(model=WithdrawModel, auth=auth)

    async def get_by_out_biz_no(self, out_biz_no: str) -> WithdrawModel | None:
        """Fetch a withdrawal record via ``self.get`` filtered by ``out_biz_no``."""
        withdrawal = await self.get(out_biz_no=out_biz_no)
        return withdrawal

    async def update_by_out_biz_no(
        self, out_biz_no: str, data: dict
    ) -> WithdrawModel | None:
        """Apply ``data`` to the withdrawal record identified by ``out_biz_no``.

        Args:
            out_biz_no: Value used to look the record up through ``self.get``.
            data: Mapping of attribute name to new value. Keys that are not
                attributes of the loaded object are ignored.

        Returns:
            The updated object after it has been flushed and refreshed.

        Raises:
            CustomException: If no record is found for ``out_biz_no``.
        """
        withdrawal = await self.get(out_biz_no=out_biz_no, preload=[])
        if not withdrawal:
            raise CustomException(msg="提现记录不存在")

        # Stamp the acting user first; a matching key in ``data`` may still
        # overwrite it in the loop below.
        if self.auth.user and hasattr(withdrawal, "updated_id"):
            withdrawal.updated_id = self.auth.user.id

        for field, new_value in data.items():
            if not hasattr(withdrawal, field):
                continue
            setattr(withdrawal, field, new_value)

        session = self.auth.db
        await session.flush()
        await session.refresh(withdrawal)
        return withdrawal
|