from datetime import datetime
from decimal import Decimal
from typing import TYPE_CHECKING, Optional, TypeVar

from sqlalchemy import func, select

from app.api.v1.module_system.auth.schema import AuthSchema
from app.core.base_crud import CRUDBase
from app.core.exceptions import CustomException

from .model import AccountModel, TransferModel, DepositModel, WithdrawModel
from .schema import (
    AccountCreateSchema,
    AccountTransferSchema,
    AccountDepositSchema,
    AccountWithdrawSchema,
)

if TYPE_CHECKING:
    from sqlalchemy.engine import Result

_ModelT = TypeVar("_ModelT")


async def _apply_update(auth: AuthSchema, obj: _ModelT, data: dict) -> _ModelT:
    """Copy ``data`` onto ``obj``, then flush and refresh it on ``auth.db``.

    Shared by every ``update_by_*`` method in this module so that the
    audit stamping and field-copy rules live in one place.

    Args:
        auth: Auth context; ``auth.user`` and ``auth.db`` are read.
        obj: The already-loaded model instance to mutate.
        data: Mapping of attribute name to new value. Keys that are not
            attributes of ``obj`` are silently ignored.

    Returns:
        The same ``obj`` instance after ``flush`` and ``refresh``.
    """
    # Stamp the acting user first; an explicit "updated_id" key in `data`
    # is applied afterwards and therefore still takes precedence.
    if auth.user and hasattr(obj, "updated_id"):
        setattr(obj, "updated_id", auth.user.id)

    for key, value in data.items():
        if hasattr(obj, key):
            setattr(obj, key, value)

    await auth.db.flush()
    await auth.db.refresh(obj)
    return obj


class AccountCRUD(CRUDBase[AccountModel, AccountCreateSchema, AccountCreateSchema]):
    """CRUD operations for dedicated fund accounts."""

    def __init__(self, auth: AuthSchema) -> None:
        """Bind the auth context and register ``AccountModel`` with the base."""
        self.auth = auth
        super().__init__(model=AccountModel, auth=auth)

    async def get_by_enterprise_id(self, enterprise_id: str) -> AccountModel | None:
        """Look up an account via ``self.get(enterprise_id=...)``."""
        return await self.get(enterprise_id=enterprise_id)

    async def get_by_account_book_id(
        self, account_book_id: str
    ) -> AccountModel | None:
        """Look up an account via ``self.get(account_book_id=...)``."""
        return await self.get(account_book_id=account_book_id)

    async def update_by_enterprise_id(
        self, enterprise_id: str, data: dict
    ) -> AccountModel | None:
        """Update the account identified by ``enterprise_id``.

        Args:
            enterprise_id: Value passed to ``self.get`` to locate the account.
            data: Attribute name -> new value; unknown keys are ignored.

        Returns:
            The updated, refreshed account instance.

        Raises:
            CustomException: If the account is not found before the update,
                or cannot be re-read through ``self.get`` afterwards.
        """
        obj = await self.get(enterprise_id=enterprise_id, preload=[])
        if not obj:
            raise CustomException(msg="更新失败!对象不存在")

        await _apply_update(self.auth, obj, data)

        # Re-read through self.get after the flush.
        # NOTE(review): presumably this confirms the row is still visible
        # under the base class's permission filtering - confirm.
        verify_obj = await self.get(enterprise_id=enterprise_id, preload=[])
        if not verify_obj:
            raise CustomException(msg="更新失败!对象不存在或无权限访问")
        return obj


class TransferCRUD(
    CRUDBase[TransferModel, AccountTransferSchema, AccountTransferSchema]
):
    """CRUD operations for transfer records."""

    def __init__(self, auth: AuthSchema) -> None:
        """Bind the auth context and register ``TransferModel`` with the base."""
        self.auth = auth
        super().__init__(model=TransferModel, auth=auth)

    async def get_by_out_biz_no(self, out_biz_no: str) -> TransferModel | None:
        """Look up a transfer via ``self.get(out_biz_no=...)``."""
        return await self.get(out_biz_no=out_biz_no)

    async def get_by_order_no(self, order_no: str) -> TransferModel | None:
        """Look up a transfer via ``self.get(order_no=...)``."""
        return await self.get(order_no=order_no)

    async def update_by_order_no(
        self, order_no: str, data: dict
    ) -> TransferModel | None:
        """Update the transfer identified by ``order_no``.

        Args:
            order_no: Value passed to ``self.get`` to locate the transfer.
            data: Attribute name -> new value; unknown keys are ignored.

        Returns:
            The updated, refreshed transfer instance.

        Raises:
            CustomException: If no transfer is found for ``order_no``.
        """
        obj = await self.get(order_no=order_no, preload=[])
        if not obj:
            raise CustomException(msg="转账记录不存在")
        return await _apply_update(self.auth, obj, data)

    async def get_transfer_amount(
        self,
        enterprise_id: Optional[str] = None,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        tenant_id: Optional[int] = None,
    ) -> Decimal:
        """Return the summed ``amount`` of transfers whose status is "SUCCESS".

        Only a single total is computed; no per-day breakdown is produced.

        Args:
            enterprise_id: If truthy, restrict to this enterprise.
            start_date: If given, lower bound (inclusive) on ``created_time``.
            end_date: If given, upper bound (inclusive) on ``created_time``.
            tenant_id: If truthy, restrict to this tenant.

        Returns:
            The total amount, or ``Decimal(0)`` when the query yields no
            value (or a falsy one).

        Raises:
            CustomException: Wraps any exception raised while building or
                executing the query; the original is chained as the cause.
        """
        conditions = [
            TransferModel.status == "SUCCESS",
        ]
        if tenant_id:
            conditions.append(TransferModel.tenant_id == tenant_id)
        if enterprise_id:
            conditions.append(TransferModel.enterprise_id == enterprise_id)
        if start_date:
            conditions.append(TransferModel.created_time >= start_date)
        if end_date:
            conditions.append(TransferModel.created_time <= end_date)

        try:
            # Aggregate the `amount` column over every matching row.
            sql = select(func.sum(TransferModel.amount).label("total_amount")).where(
                *conditions
            )
            sql = await self.filter_permissions(sql)
            result: Result = await self.auth.db.execute(sql)
            # SUM over zero rows yields NULL/None, so fall back to zero.
            return result.scalars().first() or Decimal(0)
        except Exception as e:
            # Chain the cause so the underlying error stays in the traceback.
            raise CustomException(msg=f"列表查询失败: {e!s}") from e


class DepositCRUD(CRUDBase[DepositModel, AccountDepositSchema, AccountDepositSchema]):
    """CRUD operations for deposit records."""

    def __init__(self, auth: AuthSchema) -> None:
        """Bind the auth context and register ``DepositModel`` with the base."""
        self.auth = auth
        super().__init__(model=DepositModel, auth=auth)

    async def get_by_out_biz_no(self, out_biz_no: str) -> DepositModel | None:
        """Look up a deposit via ``self.get(out_biz_no=...)``."""
        return await self.get(out_biz_no=out_biz_no)

    async def get_by_enterprise_id(self, enterprise_id: str) -> DepositModel | None:
        """Look up a deposit via ``self.get(enterprise_id=...)``."""
        return await self.get(enterprise_id=enterprise_id)

    async def update_by_out_biz_no(
        self, out_biz_no: str, data: dict
    ) -> DepositModel | None:
        """Update the deposit identified by ``out_biz_no``.

        Args:
            out_biz_no: Value passed to ``self.get`` to locate the deposit.
            data: Attribute name -> new value; unknown keys are ignored.

        Returns:
            The updated, refreshed deposit instance.

        Raises:
            CustomException: If no deposit is found for ``out_biz_no``.
        """
        obj = await self.get(out_biz_no=out_biz_no, preload=[])
        if not obj:
            raise CustomException(msg="充值记录不存在")
        return await _apply_update(self.auth, obj, data)


class WithdrawCRUD(
    CRUDBase[WithdrawModel, AccountWithdrawSchema, AccountWithdrawSchema]
):
    """CRUD operations for withdrawal records."""

    def __init__(self, auth: AuthSchema) -> None:
        """Bind the auth context and register ``WithdrawModel`` with the base."""
        self.auth = auth
        super().__init__(model=WithdrawModel, auth=auth)

    async def get_by_out_biz_no(self, out_biz_no: str) -> WithdrawModel | None:
        """Look up a withdrawal via ``self.get(out_biz_no=...)``."""
        return await self.get(out_biz_no=out_biz_no)

    async def update_by_out_biz_no(
        self, out_biz_no: str, data: dict
    ) -> WithdrawModel | None:
        """Update the withdrawal identified by ``out_biz_no``.

        Args:
            out_biz_no: Value passed to ``self.get`` to locate the withdrawal.
            data: Attribute name -> new value; unknown keys are ignored.

        Returns:
            The updated, refreshed withdrawal instance.

        Raises:
            CustomException: If no withdrawal is found for ``out_biz_no``.
        """
        obj = await self.get(out_biz_no=out_biz_no, preload=[])
        if not obj:
            raise CustomException(msg="提现记录不存在")
        return await _apply_update(self.auth, obj, data)