permission.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345
  1. from typing import Any
  2. from sqlalchemy import select
  3. from sqlalchemy.sql.elements import ColumnElement
  4. from app.api.v1.module_system.auth.schema import AuthSchema
  5. from app.api.v1.module_system.dept.model import DeptModel
  6. from app.api.v1.module_system.user.model import UserModel
  7. from app.common.enums import PermissionFilterStrategy
  8. from app.utils.common_util import get_child_id_map, get_child_recursion
  9. class Permission:
  10. """
  11. 为业务模型提供数据权限过滤功能
  12. 使用策略模式,根据模型的 __permission_strategy__ 属性选择合适的过滤策略
  13. """
  14. # 数据权限常量定义,提高代码可读性
  15. DATA_SCOPE_SELF = 1 # 仅本人数据
  16. DATA_SCOPE_DEPT = 2 # 本部门数据
  17. DATA_SCOPE_DEPT_AND_CHILD = 3 # 本部门及以下数据
  18. DATA_SCOPE_ALL = 4 # 全部数据
  19. DATA_SCOPE_CUSTOM = 5 # 自定义数据
  20. def __init__(self, model: Any, auth: AuthSchema) -> None:
  21. """
  22. 初始化权限过滤器实例
  23. Args:
  24. db: 数据库会话
  25. model: 数据模型类
  26. current_user: 当前用户对象
  27. auth: 认证信息对象
  28. """
  29. self.model = model
  30. self.auth = auth
  31. self.conditions: list[ColumnElement] = [] # 权限条件列表
  32. async def filter_query(self, query: Any) -> Any:
  33. """
  34. 按数据权限为 SQLAlchemy 查询追加 WHERE 条件。
  35. 参数:
  36. - query (Any): SQLAlchemy 查询对象。
  37. 返回:
  38. - Any: 附加条件后的查询对象(无权限条件时原样返回)。
  39. """
  40. condition = await self.__permission_condition()
  41. return query.where(condition) if condition is not None else query
  42. async def __permission_condition(self) -> ColumnElement | None:
  43. """
  44. 应用数据范围权限隔离
  45. 根据模型的权限过滤策略,选择合适的过滤方法
  46. """
  47. # 如果不需要检查数据权限,则不限制
  48. if not self.auth.user:
  49. return None
  50. # 如果检查数据权限为False,则不限制
  51. if not self.auth.check_data_scope:
  52. return None
  53. # 超级管理员可以查看所有数据
  54. if self.auth.user.is_superuser:
  55. return None
  56. # 获取模型的权限过滤策略
  57. strategy = getattr(self.model, "__permission_strategy__", PermissionFilterStrategy.DATA_SCOPE)
  58. # 根据策略选择过滤方法
  59. if strategy == PermissionFilterStrategy.ROLE_BASED:
  60. return await self.__filter_by_role_based()
  61. elif strategy == PermissionFilterStrategy.DEPT_BASED:
  62. return await self.__filter_by_dept_based()
  63. elif strategy == PermissionFilterStrategy.SELF_ONLY:
  64. return await self.__filter_by_self_only()
  65. elif strategy == PermissionFilterStrategy.USER_ROLE:
  66. return await self.__filter_by_user_role()
  67. elif strategy == PermissionFilterStrategy.ENTERPRISE_BASED:
  68. return await self.__filter_by_enterprise_based()
  69. else:
  70. return await self.__filter_by_data_scope()
  71. async def __filter_by_enterprise_based(self) -> ColumnElement | None:
  72. """
  73. 基于企业ID的企业隔离过滤(双重隔离:租户+企业)
  74. 适用于 pay_employee、pay_expense_institution、pay_expense_rule、pay_expense_quota 等业务表
  75. 通过 tenant_id + enterprise_id 实现数据隔离
  76. 双重安全屏障:
  77. - tenant_id:确保跨租户隔离
  78. - enterprise_id:确保同租户下跨企业隔离
  79. """
  80. from sqlalchemy import and_
  81. conditions = []
  82. # 如果模型有 tenant_id 字段,按租户隔离
  83. tenant_id_attr = getattr(self.model, "tenant_id", None)
  84. if tenant_id_attr is not None:
  85. user_tenant_id = getattr(self.auth.user, "tenant_id", None)
  86. if user_tenant_id is not None:
  87. conditions.append(tenant_id_attr == user_tenant_id)
  88. # 如果有企业ID参数,按企业隔离
  89. auth_enterprise_id = getattr(self.auth, "enterprise_id", None)
  90. if auth_enterprise_id:
  91. enterprise_id_attr = getattr(self.model, "enterprise_id", None)
  92. if enterprise_id_attr is not None:
  93. conditions.append(enterprise_id_attr == auth_enterprise_id)
  94. if conditions:
  95. return and_(*conditions)
  96. return None
  97. async def __filter_by_role_based(self) -> ColumnElement | None:
  98. """
  99. 基于角色授权的权限过滤(适用于菜单等)
  100. 只显示用户角色授权的菜单
  101. """
  102. roles = getattr(self.auth.user, "roles", []) or []
  103. if not roles:
  104. id_attr = getattr(self.model, "id", None)
  105. if id_attr is not None:
  106. return id_attr == -1
  107. return None
  108. menu_ids = set()
  109. for role in roles:
  110. if hasattr(role, "menus") and role.menus:
  111. menu_ids.update(menu.id for menu in role.menus if menu.status == "0")
  112. if menu_ids:
  113. id_attr = getattr(self.model, "id", None)
  114. if id_attr is not None:
  115. return id_attr.in_(list(menu_ids))
  116. id_attr = getattr(self.model, "id", None)
  117. if id_attr is not None:
  118. return id_attr == -1
  119. return None
  120. async def __filter_by_user_role(self) -> ColumnElement | None:
  121. """
  122. 基于当前用户绑定角色的权限过滤(适用于角色列表)
  123. 只显示当前用户绑定的角色
  124. """
  125. roles = getattr(self.auth.user, "roles", []) or []
  126. if not roles:
  127. id_attr = getattr(self.model, "id", None)
  128. if id_attr is not None:
  129. return id_attr == -1
  130. return None
  131. role_ids = [role.id for role in roles]
  132. id_attr = getattr(self.model, "id", None)
  133. if id_attr is not None:
  134. return id_attr.in_(role_ids)
  135. return None
  136. async def __filter_by_dept_based(self) -> ColumnElement | None:
  137. """
  138. 基于部门关联的权限过滤(适用于部门、角色等)
  139. 根据用户的部门权限范围过滤数据
  140. """
  141. # 如果用户没有角色,则只能查看自己部门的数据
  142. roles = getattr(self.auth.user, "roles", []) or []
  143. if not roles:
  144. user_dept_id = getattr(self.auth.user, "dept_id", None)
  145. if user_dept_id is not None and hasattr(self.model, "id"):
  146. id_attr = getattr(self.model, "id", None)
  147. if id_attr is not None:
  148. return id_attr == user_dept_id
  149. return None
  150. # 获取用户所有角色的权限范围
  151. data_scopes = set()
  152. custom_dept_ids = set()
  153. for role in roles:
  154. data_scopes.add(role.data_scope)
  155. if role.data_scope == self.DATA_SCOPE_CUSTOM and hasattr(role, "depts") and role.depts:
  156. custom_dept_ids.update(dept.id for dept in role.depts)
  157. # 全部数据权限最高优先级
  158. if self.DATA_SCOPE_ALL in data_scopes:
  159. return None
  160. # 收集所有可访问的部门ID
  161. accessible_dept_ids = await self.__get_accessible_dept_ids(data_scopes, custom_dept_ids)
  162. # 根据模型类型过滤
  163. if self.model.__name__ == "DeptModel":
  164. return self.__filter_dept_model(accessible_dept_ids)
  165. elif self.model.__name__ == "UserModel":
  166. return self.__filter_user_model(accessible_dept_ids)
  167. else:
  168. return None
  169. async def __filter_by_self_only(self) -> ColumnElement | None:
  170. """
  171. 仅本人数据权限过滤
  172. """
  173. created_id_attr = getattr(self.model, "created_id", None)
  174. if created_id_attr is not None and self.auth.user:
  175. return created_id_attr == self.auth.user.id
  176. return None
  177. async def __filter_by_data_scope(self) -> ColumnElement | None:
  178. """
  179. 基于数据范围权限的通用过滤(默认策略)
  180. 适用于大多数业务模型
  181. """
  182. # 如果模型没有创建人created_id字段,则不限制
  183. if not hasattr(self.model, "created_id"):
  184. return None
  185. # 如果用户没有角色,则只能查看自己的数据
  186. roles = getattr(self.auth.user, "roles", []) or []
  187. if not roles:
  188. created_id_attr = getattr(self.model, "created_id", None)
  189. if created_id_attr is not None and self.auth.user:
  190. return created_id_attr == self.auth.user.id
  191. return None
  192. # 获取用户所有角色的权限范围
  193. data_scopes = set()
  194. custom_dept_ids = set()
  195. for role in roles:
  196. data_scopes.add(role.data_scope)
  197. if role.data_scope == self.DATA_SCOPE_CUSTOM and hasattr(role, "depts") and role.depts:
  198. custom_dept_ids.update(dept.id for dept in role.depts)
  199. # 全部数据权限最高优先级
  200. if self.DATA_SCOPE_ALL in data_scopes:
  201. return None
  202. # 收集所有可访问的部门ID
  203. accessible_dept_ids = await self.__get_accessible_dept_ids(data_scopes, custom_dept_ids)
  204. # 如果有部门权限,使用部门过滤
  205. if accessible_dept_ids:
  206. # 特殊处理:如果模型本身就是UserModel,直接过滤用户的dept_id
  207. if self.model.__name__ == "UserModel" and hasattr(self.model, "dept_id"):
  208. dept_id_attr = getattr(self.model, "dept_id", None)
  209. if dept_id_attr is not None:
  210. return dept_id_attr.in_(list(accessible_dept_ids))
  211. # 其他模型:通过created_by关系过滤创建人的部门
  212. creator_rel = getattr(self.model, "created_by", None)
  213. if creator_rel is not None and hasattr(UserModel, "dept_id"):
  214. return creator_rel.has(UserModel.dept_id.in_(list(accessible_dept_ids)))
  215. # 降级方案:只能查看自己的数据
  216. created_id_attr = getattr(self.model, "created_id", None)
  217. if created_id_attr is not None and self.auth.user:
  218. return created_id_attr == self.auth.user.id
  219. return None
  220. # 处理仅本人数据权限
  221. if self.DATA_SCOPE_SELF in data_scopes:
  222. created_id_attr = getattr(self.model, "created_id", None)
  223. if created_id_attr is not None and self.auth.user:
  224. return created_id_attr == self.auth.user.id
  225. return None
  226. # 默认情况:只能查看自己的数据
  227. created_id_attr = getattr(self.model, "created_id", None)
  228. if created_id_attr is not None and self.auth.user:
  229. return created_id_attr == self.auth.user.id
  230. return None
  231. async def __get_accessible_dept_ids(
  232. self, data_scopes: set, custom_dept_ids: set
  233. ) -> set[int]:
  234. """
  235. 获取用户可访问的所有部门ID
  236. Args:
  237. data_scopes: 用户角色的数据权限范围集合
  238. custom_dept_ids: 自定义权限关联的部门ID集合
  239. Returns:
  240. 可访问的部门ID集合
  241. """
  242. accessible_dept_ids = set()
  243. user_dept_id = getattr(self.auth.user, "dept_id", None)
  244. # 处理自定义数据权限(5)
  245. if self.DATA_SCOPE_CUSTOM in data_scopes:
  246. accessible_dept_ids.update(custom_dept_ids)
  247. # 处理本部门数据权限(2)
  248. if self.DATA_SCOPE_DEPT in data_scopes and user_dept_id is not None:
  249. accessible_dept_ids.add(user_dept_id)
  250. # 处理本部门及以下数据权限(3)
  251. if self.DATA_SCOPE_DEPT_AND_CHILD in data_scopes and user_dept_id is not None:
  252. try:
  253. dept_sql = select(DeptModel)
  254. dept_result = await self.auth.db.execute(dept_sql)
  255. dept_objs = dept_result.scalars().all()
  256. id_map = get_child_id_map(dept_objs)
  257. dept_with_children_ids = get_child_recursion(id=user_dept_id, id_map=id_map)
  258. accessible_dept_ids.update(dept_with_children_ids)
  259. except Exception:
  260. accessible_dept_ids.add(user_dept_id)
  261. return accessible_dept_ids
  262. def __filter_dept_model(self, accessible_dept_ids: set[int]) -> ColumnElement | None:
  263. """
  264. 过滤部门模型
  265. """
  266. if accessible_dept_ids:
  267. id_attr = getattr(self.model, "id", None)
  268. if id_attr is not None:
  269. return id_attr.in_(list(accessible_dept_ids))
  270. user_dept_id = getattr(self.auth.user, "dept_id", None)
  271. if user_dept_id is not None:
  272. id_attr = getattr(self.model, "id", None)
  273. if id_attr is not None:
  274. return id_attr == user_dept_id
  275. return None
  276. def __filter_user_model(self, accessible_dept_ids: set[int]) -> ColumnElement | None:
  277. """
  278. 过滤用户模型
  279. """
  280. if accessible_dept_ids:
  281. dept_id_attr = getattr(self.model, "dept_id", None)
  282. if dept_id_attr is not None:
  283. return dept_id_attr.in_(list(accessible_dept_ids))
  284. return None