| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266 |
- from collections.abc import Sequence
- from typing import Any, List, Optional
- from datetime import datetime
- from decimal import Decimal
- from sqlalchemy import func, select
- from sqlalchemy.engine import Result
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.core.base_crud import CRUDBase
- from app.core.exceptions import CustomException
- from .model import AlipayNotifyLogModel, PayBillModel, PayBillOrderModel, PayBillVoucherModel
class AlipayNotifyLogCRUD(CRUDBase[AlipayNotifyLogModel, Any, Any]):
    """CRUD operations for Alipay notification log records."""

    def __init__(self, auth: AuthSchema) -> None:
        """Store the auth context and register the log model with the base CRUD.

        Args:
            auth: Auth context; this class reads ``auth.user`` and ``auth.db``.
        """
        self.auth = auth
        super().__init__(model=AlipayNotifyLogModel, auth=auth)

    async def get_by_notify_id(self, notify_id: str) -> AlipayNotifyLogModel | None:
        """Fetch the notification log whose ``notify_id`` matches.

        Args:
            notify_id: Notification identifier to look up.

        Returns:
            Whatever the base ``get`` returns for that filter.
        """
        found = await self.get(notify_id=notify_id)
        return found

    async def update_by_notify_id(
        self, notify_id: str, data: dict
    ) -> AlipayNotifyLogModel | None:
        """Update the notification log identified by ``notify_id``.

        Args:
            notify_id: Notification identifier of the record to update.
            data: Attribute name -> new value. Keys that are not attributes
                of the loaded record are ignored.

        Returns:
            The record after flush and refresh.

        Raises:
            CustomException: If the lookup yields a falsy result.
        """
        record = await self.get(notify_id=notify_id, preload=[])
        if not record:
            raise CustomException(msg="通知日志不存在")

        # Stamp the modifier first so an explicit "updated_id" in ``data``
        # can still override it in the loop below.
        current_user = self.auth.user
        if current_user and hasattr(record, "updated_id"):
            record.updated_id = current_user.id

        for field, new_value in data.items():
            if not hasattr(record, field):
                continue
            setattr(record, field, new_value)

        session = self.auth.db
        await session.flush()
        await session.refresh(record)
        return record
class BillCRUD(CRUDBase[PayBillModel, Any, Any]):
    """CRUD operations for bills (``PayBillModel``)."""

    def __init__(self, auth: AuthSchema) -> None:
        """Store the auth context and register the bill model with the base CRUD.

        Args:
            auth: Auth context; this class reads ``auth.user``, ``auth.db``
                and ``auth.tenant_id``.
        """
        self.auth = auth
        super().__init__(model=PayBillModel, auth=auth)

    async def _assign_and_refresh(self, obj: PayBillModel, data: dict) -> PayBillModel:
        """Copy ``data`` onto ``obj``, then flush the session and reload ``obj``."""
        # Stamp the modifier first so an explicit "updated_id" in ``data``
        # can still override it below.
        if self.auth.user and hasattr(obj, "updated_id"):
            obj.updated_id = self.auth.user.id

        # Keys that are not attributes of the model are skipped.
        for key, value in data.items():
            if hasattr(obj, key):
                setattr(obj, key, value)

        await self.auth.db.flush()
        await self.auth.db.refresh(obj)
        return obj

    async def get_by_pay_no(self, pay_no: str) -> PayBillModel | None:
        """Fetch a bill by its bill number (``pay_no``)."""
        return await self.get(pay_no=pay_no)

    async def get_by_enterprise_id(self, enterprise_id: str) -> Sequence[PayBillModel]:
        """List bills for an enterprise.

        Passes ``{"enterprise_id": enterprise_id}`` to the base ``list``.
        """
        return await self.list({"enterprise_id": enterprise_id})

    async def update_by_pay_no(self, pay_no: str, data: dict) -> PayBillModel | None:
        """Update the bill identified by ``pay_no``.

        Args:
            pay_no: Bill number of the record to update.
            data: Attribute name -> new value. Keys that are not attributes
                of the loaded record are ignored.

        Returns:
            The bill after flush and refresh.

        Raises:
            CustomException: If the lookup yields a falsy result.
        """
        obj = await self.get(pay_no=pay_no, preload=[])
        if not obj:
            raise CustomException(msg="账单不存在")
        return await self._assign_and_refresh(obj, data)

    async def get_consume_amount(
        self,
        enterprise_id: Optional[str] = None,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        tenant_id: Optional[int] = None,
    ) -> Decimal:
        """Sum ``consume_amount`` over consumed, processed bills.

        Always filters on ``consume_type == "CONSUME"`` and
        ``status == "PROCESSED"``. The optional arguments add further filters.

        Args:
            enterprise_id: Restrict to this enterprise when truthy.
            start_date: Inclusive lower bound on ``gmt_biz_create``.
            end_date: Inclusive upper bound on ``gmt_biz_create``.
            tenant_id: Restrict to this tenant when truthy.

        Returns:
            The summed amount, or ``Decimal(0)`` when the query yields a
            falsy value (no matching rows, or a zero sum).

        Raises:
            CustomException: Wraps any exception raised while building or
                executing the query; the original is chained as the cause.
        """
        conditions = [
            PayBillModel.consume_type == "CONSUME",
            PayBillModel.status == "PROCESSED",
        ]
        # NOTE(review): truthiness means tenant_id=0 is treated as "no tenant
        # filter" -- confirm 0 is never a real tenant id.
        if tenant_id:
            conditions.append(PayBillModel.tenant_id == tenant_id)
        if enterprise_id:
            conditions.append(PayBillModel.enterprise_id == enterprise_id)
        if start_date:
            conditions.append(PayBillModel.gmt_biz_create >= start_date)
        if end_date:
            conditions.append(PayBillModel.gmt_biz_create <= end_date)

        try:
            sql = select(
                func.sum(PayBillModel.consume_amount).label("total_amount")
            ).where(*conditions)
            # Permission filter comes from the base class; its exact
            # semantics are not visible here.
            sql = await self.filter_permissions(sql)
            result: Result = await self.auth.db.execute(sql)
            return result.scalars().first() or Decimal(0)
        except Exception as e:
            # Chain the cause so the original traceback is not lost.
            raise CustomException(msg=f"列表查询失败: {e!s}") from e

    async def create_or_update(self, pay_no: str, data: dict) -> PayBillModel:
        """Update the bill with ``pay_no`` if it exists, otherwise create it.

        Args:
            pay_no: Bill number used for the lookup; on create it is also
                written into the payload.
            data: Field values. The caller's dict is not modified.

        Returns:
            The updated or newly created bill.
        """
        obj = await self.get(pay_no=pay_no, preload=[])
        if obj:
            return await self._assign_and_refresh(obj, data)

        # Build the payload on a copy so the caller's dict is left untouched.
        payload = {**data, "pay_no": pay_no}
        if "tenant_id" not in payload:
            payload["tenant_id"] = self.auth.tenant_id
        # tenant_id is set explicitly above; presumably skip_tenant_id stops
        # the base create from setting it again -- confirm in CRUDBase.create.
        return await self.create(data=payload, skip_tenant_id=True)
class BillOrderCRUD(CRUDBase[PayBillOrderModel, Any, Any]):
    """CRUD operations for bill orders (``PayBillOrderModel``)."""

    def __init__(self, auth: AuthSchema) -> None:
        """Store the auth context and register the order model with the base CRUD.

        Args:
            auth: Auth context; this class reads ``auth.user``, ``auth.db``
                and ``auth.tenant_id``.
        """
        self.auth = auth
        super().__init__(model=PayBillOrderModel, auth=auth)

    async def _assign_and_refresh(
        self, obj: PayBillOrderModel, data: dict
    ) -> PayBillOrderModel:
        """Copy ``data`` onto ``obj``, then flush the session and reload ``obj``."""
        # Stamp the modifier first so an explicit "updated_id" in ``data``
        # can still override it below.
        if self.auth.user and hasattr(obj, "updated_id"):
            obj.updated_id = self.auth.user.id

        # Keys that are not attributes of the model are skipped.
        for key, value in data.items():
            if hasattr(obj, key):
                setattr(obj, key, value)

        await self.auth.db.flush()
        await self.auth.db.refresh(obj)
        return obj

    async def get_by_order_no(self, order_no: str) -> PayBillOrderModel | None:
        """Fetch an order by its order number (``order_no``)."""
        return await self.get(order_no=order_no)

    async def get_by_pay_no(self, pay_no: str) -> Sequence[PayBillOrderModel]:
        """List orders for a bill number.

        Passes ``{"pay_no": pay_no}`` to the base ``list``.
        """
        return await self.list({"pay_no": pay_no})

    async def update_by_order_no(
        self, order_no: str, data: dict
    ) -> PayBillOrderModel | None:
        """Update the order identified by ``order_no``.

        Args:
            order_no: Order number of the record to update.
            data: Attribute name -> new value. Keys that are not attributes
                of the loaded record are ignored.

        Returns:
            The order after flush and refresh.

        Raises:
            CustomException: If the lookup yields a falsy result.
        """
        obj = await self.get(order_no=order_no, preload=[])
        if not obj:
            raise CustomException(msg="订单不存在")
        return await self._assign_and_refresh(obj, data)

    async def create_or_update(self, order_no: str, data: dict) -> PayBillOrderModel:
        """Update the order with ``order_no`` if it exists, otherwise create it.

        Args:
            order_no: Order number used for the lookup; on create it is also
                written into the payload.
            data: Field values. The caller's dict is not modified.

        Returns:
            The updated or newly created order.
        """
        obj = await self.get(order_no=order_no, preload=[])
        if obj:
            return await self._assign_and_refresh(obj, data)

        # Build the payload on a copy so the caller's dict is left untouched.
        payload = {**data, "order_no": order_no}
        if "tenant_id" not in payload:
            payload["tenant_id"] = self.auth.tenant_id
        # tenant_id is set explicitly above; presumably skip_tenant_id stops
        # the base create from setting it again -- confirm in CRUDBase.create.
        return await self.create(data=payload, skip_tenant_id=True)
class BillVoucherCRUD(CRUDBase[PayBillVoucherModel, Any, Any]):
    """CRUD operations for bill vouchers (``PayBillVoucherModel``)."""

    def __init__(self, auth: AuthSchema) -> None:
        """Store the auth context and register the voucher model with the base CRUD.

        Args:
            auth: Auth context; this class reads ``auth.user``, ``auth.db``
                and ``auth.tenant_id``.
        """
        self.auth = auth
        super().__init__(model=PayBillVoucherModel, auth=auth)

    async def _assign_and_refresh(
        self, obj: PayBillVoucherModel, data: dict
    ) -> PayBillVoucherModel:
        """Copy ``data`` onto ``obj``, then flush the session and reload ``obj``."""
        # Stamp the modifier first so an explicit "updated_id" in ``data``
        # can still override it below.
        if self.auth.user and hasattr(obj, "updated_id"):
            obj.updated_id = self.auth.user.id

        # Keys that are not attributes of the model are skipped.
        for key, value in data.items():
            if hasattr(obj, key):
                setattr(obj, key, value)

        await self.auth.db.flush()
        await self.auth.db.refresh(obj)
        return obj

    async def get_by_voucher_id(self, voucher_id: str) -> PayBillVoucherModel | None:
        """Fetch a voucher by its ``voucher_id``."""
        return await self.get(voucher_id=voucher_id)

    async def get_by_pay_no(self, pay_no: str) -> Sequence[PayBillVoucherModel]:
        """List vouchers for a bill number.

        Passes ``{"pay_no": pay_no}`` to the base ``list``.
        """
        return await self.list({"pay_no": pay_no})

    async def update_by_voucher_id(
        self, voucher_id: str, data: dict
    ) -> PayBillVoucherModel | None:
        """Update the voucher identified by ``voucher_id``.

        Args:
            voucher_id: Voucher identifier of the record to update.
            data: Attribute name -> new value. Keys that are not attributes
                of the loaded record are ignored.

        Returns:
            The voucher after flush and refresh.

        Raises:
            CustomException: If the lookup yields a falsy result.
        """
        obj = await self.get(voucher_id=voucher_id, preload=[])
        if not obj:
            raise CustomException(msg="凭证不存在")
        return await self._assign_and_refresh(obj, data)

    async def create_or_update(
        self, voucher_id: str, data: dict
    ) -> PayBillVoucherModel:
        """Update the voucher with ``voucher_id`` if it exists, otherwise create it.

        Args:
            voucher_id: Voucher identifier used for the lookup; on create it
                is also written into the payload.
            data: Field values. The caller's dict is not modified.

        Returns:
            The updated or newly created voucher.
        """
        obj = await self.get(voucher_id=voucher_id, preload=[])
        if obj:
            return await self._assign_and_refresh(obj, data)

        # Build the payload on a copy so the caller's dict is left untouched.
        payload = {**data, "voucher_id": voucher_id}
        if "tenant_id" not in payload:
            payload["tenant_id"] = self.auth.tenant_id
        # tenant_id is set explicitly above; presumably skip_tenant_id stops
        # the base create from setting it again -- confirm in CRUDBase.create.
        return await self.create(data=payload, skip_tenant_id=True)
|