from app.api.v1.module_system.auth.schema import AuthSchema from app.core.base_crud import CRUDBase from app.core.exceptions import CustomException from .model import EnterpriseModel from .schema import EnterpriseCreateOrUpdateSchema class EnterpriseCRUD(CRUDBase[EnterpriseModel, EnterpriseCreateOrUpdateSchema, EnterpriseCreateOrUpdateSchema]): """企业 CRUD 操作""" def __init__(self, auth: AuthSchema) -> None: """ 初始化企业数据层。 参数: - auth (AuthSchema): 认证信息模型(含 DB 会话等上下文)。 返回: - None """ self.auth = auth super().__init__(model=EnterpriseModel, auth=auth) async def get_by_enterprise_id( self, enterprise_id: str ) -> EnterpriseModel | None: """ 根据企业ID查询企业 参数: - enterprise_id (str): 企业ID 返回: - EnterpriseModel | None: 企业信息,如果不存在则为 None """ return await self.get(enterprise_id=enterprise_id) async def get_by_account_id( self, account_id: str ) -> EnterpriseModel | None: """ 根据支付宝账号 ID 查询企业 参数: - account_id (str): 支付宝账号 ID 返回: - EnterpriseModel | None: 企业信息,如果不存在则为 None """ return await self.get(account_id=account_id) async def update_by_enterprise_id( self, enterprise_id: str, data: dict ) -> EnterpriseModel | None: """ 根据企业ID更新企业 参数: - enterprise_id (str): 企业ID - data (dict): 更新字段字典 返回: - EnterpriseModel: 更新后的企业信息 """ obj = await self.get(enterprise_id=enterprise_id, preload=[]) if not obj: raise CustomException(msg="更新失败!对象不存在") # 设置字段值(只检查一次current_user) if self.auth.user and hasattr(obj, "updated_id"): setattr(obj, "updated_id", self.auth.user.id) for key, value in data.items(): if hasattr(obj, key): setattr(obj, key, value) await self.auth.db.flush() await self.auth.db.refresh(obj) # 权限二次确认:flush后再次验证对象仍在权限范围内 # 防止并发修改导致的权限逃逸(如其他事务修改了created_id) # 验证时也不自动预加载关系 verify_obj = await self.get(enterprise_id=enterprise_id, preload=[]) if not verify_obj: # 对象已被删除或权限已失效 raise CustomException(msg="更新失败!对象不存在或无权限访问") return obj