from collections.abc import Sequence from typing import Any, List, Optional from datetime import datetime from decimal import Decimal from sqlalchemy import func, select from sqlalchemy.engine import Result from app.api.v1.module_system.auth.schema import AuthSchema from app.core.base_crud import CRUDBase from app.core.exceptions import CustomException from .model import AlipayNotifyLogModel, PayBillModel, PayBillOrderModel, PayBillVoucherModel class AlipayNotifyLogCRUD(CRUDBase[AlipayNotifyLogModel, Any, Any]): """支付宝通知日志 CRUD 操作""" def __init__(self, auth: AuthSchema) -> None: self.auth = auth super().__init__(model=AlipayNotifyLogModel, auth=auth) async def get_by_notify_id( self, notify_id: str ) -> AlipayNotifyLogModel | None: return await self.get(notify_id=notify_id) async def update_by_notify_id( self, notify_id: str, data: dict ) -> AlipayNotifyLogModel | None: obj = await self.get(notify_id=notify_id, preload=[]) if not obj: raise CustomException(msg="通知日志不存在") if self.auth.user and hasattr(obj, "updated_id"): setattr(obj, "updated_id", self.auth.user.id) for key, value in data.items(): if hasattr(obj, key): setattr(obj, key, value) await self.auth.db.flush() await self.auth.db.refresh(obj) return obj class BillCRUD(CRUDBase[PayBillModel, Any, Any]): """账单 CRUD 操作""" def __init__(self, auth: AuthSchema) -> None: self.auth = auth super().__init__(model=PayBillModel, auth=auth) async def get_by_pay_no( self, pay_no: str ) -> PayBillModel | None: """根据账单号查询账单""" return await self.get(pay_no=pay_no) async def get_by_enterprise_id( self, enterprise_id: str ) -> Sequence[PayBillModel]: """根据企业ID查询账单列表""" return await self.list({"enterprise_id": enterprise_id}) async def update_by_pay_no( self, pay_no: str, data: dict ) -> PayBillModel | None: """根据账单号更新账单""" obj = await self.get(pay_no=pay_no, preload=[]) if not obj: raise CustomException(msg="账单不存在") if self.auth.user and hasattr(obj, "updated_id"): setattr(obj, "updated_id", self.auth.user.id) for key, value in data.items(): if hasattr(obj, key): setattr(obj, key, value) await self.auth.db.flush() await self.auth.db.refresh(obj) return obj async def get_consume_amount( self, enterprise_id: Optional[str] = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, tenant_id: Optional[int] = None, ) -> Decimal: """统计消费金额:consume_type=CONSUME, status=PROCESSED,SUM consume_amount""" conditions = [ PayBillModel.consume_type == "CONSUME", PayBillModel.status == "PROCESSED", ] if tenant_id: conditions.append(PayBillModel.tenant_id == tenant_id) if enterprise_id: conditions.append(PayBillModel.enterprise_id == enterprise_id) if start_date: conditions.append(PayBillModel.gmt_biz_create >= start_date) if end_date: conditions.append(PayBillModel.gmt_biz_create <= end_date) try: sql = select(func.sum(PayBillModel.consume_amount).label("total_amount")).where( *conditions ) sql = await self.filter_permissions(sql) result: Result = await self.auth.db.execute(sql) return result.scalars().first() or Decimal(0) except Exception as e: raise CustomException(msg=f"列表查询失败: {e!s}") async def create_or_update( self, pay_no: str, data: dict ) -> PayBillModel: """创建或更新账单""" obj = await self.get(pay_no=pay_no, preload=[]) if obj: if self.auth.user and hasattr(obj, "updated_id"): setattr(obj, "updated_id", self.auth.user.id) for key, value in data.items(): if hasattr(obj, key): setattr(obj, key, value) await self.auth.db.flush() await self.auth.db.refresh(obj) return obj else: data["pay_no"] = pay_no if "tenant_id" not in data: data["tenant_id"] = self.auth.tenant_id return await self.create(data=data, skip_tenant_id=True) class BillOrderCRUD(CRUDBase[PayBillOrderModel, Any, Any]): """订单 CRUD 操作""" def __init__(self, auth: AuthSchema) -> None: self.auth = auth super().__init__(model=PayBillOrderModel, auth=auth) async def get_by_order_no( self, order_no: str ) -> PayBillOrderModel | None: """根据订单号查询订单""" return await self.get(order_no=order_no) async def get_by_pay_no( self, pay_no: str ) -> Sequence[PayBillOrderModel]: """根据账单号查询订单列表""" return await self.list({"pay_no": pay_no}) async def update_by_order_no( self, order_no: str, data: dict ) -> PayBillOrderModel | None: """根据订单号更新订单""" obj = await self.get(order_no=order_no, preload=[]) if not obj: raise CustomException(msg="订单不存在") if self.auth.user and hasattr(obj, "updated_id"): setattr(obj, "updated_id", self.auth.user.id) for key, value in data.items(): if hasattr(obj, key): setattr(obj, key, value) await self.auth.db.flush() await self.auth.db.refresh(obj) return obj async def create_or_update( self, order_no: str, data: dict ) -> PayBillOrderModel: """创建或更新订单""" obj = await self.get(order_no=order_no, preload=[]) if obj: if self.auth.user and hasattr(obj, "updated_id"): setattr(obj, "updated_id", self.auth.user.id) for key, value in data.items(): if hasattr(obj, key): setattr(obj, key, value) await self.auth.db.flush() await self.auth.db.refresh(obj) return obj else: data["order_no"] = order_no if "tenant_id" not in data: data["tenant_id"] = self.auth.tenant_id return await self.create(data=data, skip_tenant_id=True) class BillVoucherCRUD(CRUDBase[PayBillVoucherModel, Any, Any]): """凭证 CRUD 操作""" def __init__(self, auth: AuthSchema) -> None: self.auth = auth super().__init__(model=PayBillVoucherModel, auth=auth) async def get_by_voucher_id( self, voucher_id: str ) -> PayBillVoucherModel | None: """根据凭证ID查询凭证""" return await self.get(voucher_id=voucher_id) async def get_by_pay_no( self, pay_no: str ) -> Sequence[PayBillVoucherModel]: """根据账单号查询凭证列表""" return await self.list({"pay_no": pay_no}) async def update_by_voucher_id( self, voucher_id: str, data: dict ) -> PayBillVoucherModel | None: """根据凭证ID更新凭证""" obj = await self.get(voucher_id=voucher_id, preload=[]) if not obj: raise CustomException(msg="凭证不存在") if self.auth.user and hasattr(obj, "updated_id"): setattr(obj, "updated_id", self.auth.user.id) for key, value in data.items(): if hasattr(obj, key): setattr(obj, key, value) await self.auth.db.flush() await self.auth.db.refresh(obj) return obj async def create_or_update( self, voucher_id: str, data: dict ) -> PayBillVoucherModel: """创建或更新凭证""" obj = await self.get(voucher_id=voucher_id, preload=[]) if obj: if self.auth.user and hasattr(obj, "updated_id"): setattr(obj, "updated_id", self.auth.user.id) for key, value in data.items(): if hasattr(obj, key): setattr(obj, key, value) await self.auth.db.flush() await self.auth.db.refresh(obj) return obj else: data["voucher_id"] = voucher_id if "tenant_id" not in data: data["tenant_id"] = self.auth.tenant_id return await self.create(data=data, skip_tenant_id=True)