scope_sync.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. """
  2. 费控制度成员联动同步工具
  3. 部门停用/员工解约时自动移除相关制度中的成员引用。
  4. 员工调部门/部门新增员工时自动创建本地额度记录。
  5. """
  6. from app.api.v1.module_system.auth.schema import AuthSchema
  7. from app.core.alipay import AlipayClient
  8. from app.core.logger import log
  9. from app.plugin.module_payment.expense.institution.crud import InstitutionCRUD
  10. async def _sync_employee_quota(
  11. auth: AuthSchema, enterprise_id: str, employee_id: str, department_ids: list[str], is_add: bool
  12. ) -> None:
  13. """根据员工所属部门,同步本地额度记录
  14. 扫描所有按部门模式的制度:
  15. - 匹配的部门 → 确保员工有额度记录(创建缺失的)
  16. - 不匹配的部门 → 删除该员工的额度记录(处理离部门场景)
  17. """
  18. if not employee_id:
  19. return
  20. from app.plugin.module_payment.expense.quota.model import QuotaModel
  21. from app.plugin.module_payment.expense.quota.enums import QuotaStatusEnum
  22. from sqlalchemy import insert, delete as sa_delete, select
  23. try:
  24. crud = InstitutionCRUD(auth)
  25. institutions = await crud.list(
  26. search={
  27. "enterprise_id": enterprise_id,
  28. "status__ne": "INSTITUTION_DELETE",
  29. "applicable_scope": "department",
  30. },
  31. order_by=[{"id": "desc"}],
  32. )
  33. if not institutions:
  34. return
  35. tenant_id = auth.user.tenant_id if auth.user else 1
  36. dept_id_set = set(department_ids)
  37. for inst in institutions:
  38. inst_id = inst.institution_id
  39. scope_owner_ids_str = getattr(inst, "scope_owner_id_list", None) or getattr(inst, "department_id", None)
  40. if not inst_id or not scope_owner_ids_str:
  41. continue
  42. # 解析制度的部门ID列表
  43. import json
  44. try:
  45. scope_ids = json.loads(scope_owner_ids_str) if isinstance(scope_owner_ids_str, str) else scope_owner_ids_str
  46. except (json.JSONDecodeError, TypeError):
  47. scope_ids = [str(scope_owner_ids_str)] if scope_owner_ids_str else []
  48. scope_ids = [str(s) for s in scope_ids]
  49. # 判断是否匹配
  50. matched = bool(scope_ids and dept_id_set.intersection(scope_ids))
  51. if matched:
  52. # 匹配 → 确保有额度记录
  53. check = select(QuotaModel).where(
  54. QuotaModel.employee_id == employee_id,
  55. QuotaModel.institution_id == inst_id,
  56. )
  57. existing = await auth.db.execute(check)
  58. if not existing.scalar_one_or_none():
  59. stmt = insert(QuotaModel).values(
  60. employee_id=employee_id,
  61. institution_id=inst_id,
  62. out_biz_no=f"scope_{inst_id}_{employee_id}",
  63. total_amount=0,
  64. available_amount=0,
  65. status=QuotaStatusEnum.QUOTA_PENDING.value,
  66. enterprise_id=enterprise_id,
  67. tenant_id=tenant_id,
  68. )
  69. await auth.db.execute(stmt)
  70. log.info(f"部门联动 - 新增员工额度: employee_id={employee_id}, institution_id={inst_id}")
  71. else:
  72. # 不匹配 → 删除该员工的额度记录
  73. if not is_add:
  74. continue
  75. del_stmt = sa_delete(QuotaModel).where(
  76. QuotaModel.employee_id == employee_id,
  77. QuotaModel.institution_id == inst_id,
  78. )
  79. await auth.db.execute(del_stmt)
  80. log.info(f"部门联动 - 删除员工额度: employee_id={employee_id}, institution_id={inst_id}")
  81. await auth.db.flush()
  82. except Exception as e:
  83. log.error(f"部门联动同步额度失败(不影响主体操作): {e}")
  84. async def sync_employee_add_to_department_institutions(
  85. auth: AuthSchema,
  86. enterprise_id: str,
  87. employee_id: str,
  88. department_ids: list[str],
  89. ) -> None:
  90. """员工加入部门时,为引用该部门的制度创建本地额度记录"""
  91. await _sync_employee_quota(auth, enterprise_id, employee_id, department_ids, is_add=True)
  92. async def sync_employee_remove_from_department_institutions(
  93. auth: AuthSchema,
  94. enterprise_id: str,
  95. employee_id: str,
  96. department_ids: list[str],
  97. ) -> None:
  98. """员工离开部门时,从引用该部门的制度中删除本地额度记录"""
  99. await _sync_employee_quota(auth, enterprise_id, employee_id, department_ids, is_add=False)
  100. async def remove_department_from_institution_scopes(
  101. auth: AuthSchema,
  102. enterprise_id: str,
  103. department_id: str,
  104. ) -> None:
  105. """
  106. 当部门被停用时,扫描所有引用该部门的制度,移除该部门
  107. 此方法被 department/service.py 的停用方法调用
  108. """
  109. try:
  110. crud = InstitutionCRUD(auth)
  111. institutions = await crud.list(
  112. search={"enterprise_id": enterprise_id, "status__ne": "INSTITUTION_DELETE"},
  113. order_by=[{"id": "desc"}],
  114. )
  115. if not institutions:
  116. return
  117. for inst in institutions:
  118. inst_id = inst.institution_id
  119. if not inst_id:
  120. continue
  121. from .service import InstitutionScopeService
  122. await InstitutionScopeService.scope_modify_service(
  123. auth=auth,
  124. institution_id=inst_id,
  125. data={
  126. "enterprise_id": enterprise_id,
  127. "adapter_type": "EMPLOYEE_DEPARTMENT",
  128. "delete_owner_id_list": [department_id],
  129. },
  130. )
  131. log.info(f"已从制度 {inst_id} 中移除停用部门 {department_id}")
  132. except Exception as e:
  133. log.error(f"移除部门失败(不影响主体操作): {e}")
  134. async def sync_employee_to_all_institution(
  135. auth: AuthSchema, enterprise_id: str, employee_id: str
  136. ) -> None:
  137. """员工激活时,为全体员工(applicable_scope=all)的制度创建本地额度记录"""
  138. if not employee_id or not enterprise_id:
  139. return
  140. try:
  141. crud = InstitutionCRUD(auth)
  142. institutions = await crud.list(
  143. search={"enterprise_id": enterprise_id, "status__ne": "INSTITUTION_DELETE", "applicable_scope": "all"},
  144. order_by=[{"id": "desc"}],
  145. )
  146. if not institutions:
  147. return
  148. from app.plugin.module_payment.expense.quota.model import QuotaModel
  149. from app.plugin.module_payment.expense.quota.enums import QuotaStatusEnum
  150. from sqlalchemy import insert, select
  151. tenant_id = auth.user.tenant_id if auth.user else 1
  152. for inst in institutions:
  153. inst_id = inst.institution_id
  154. if not inst_id:
  155. continue
  156. check = select(QuotaModel).where(QuotaModel.employee_id == employee_id, QuotaModel.institution_id == inst_id)
  157. existing = await auth.db.execute(check)
  158. if existing.scalar_one_or_none():
  159. continue
  160. stmt = insert(QuotaModel).values(
  161. employee_id=employee_id, institution_id=inst_id,
  162. out_biz_no=f"all_{inst_id}_{employee_id}",
  163. total_amount=0, available_amount=0,
  164. status=QuotaStatusEnum.QUOTA_PENDING.value,
  165. enterprise_id=enterprise_id, tenant_id=tenant_id,
  166. )
  167. await auth.db.execute(stmt)
  168. await auth.db.flush()
  169. except Exception as e:
  170. log.error(f"全员制度联动失败: {e}")
  171. async def remove_employee_from_institution_scopes(
  172. auth: AuthSchema,
  173. enterprise_id: str,
  174. employee_id: str,
  175. ) -> None:
  176. """
  177. 当员工被解约时,从所有费控制度中移除该员工
  178. 处理三种 scope 模式:
  179. - employee(指定员工):调支付宝 scope.modify 移除
  180. - department/all(按部门/全员):Alipay 自动处理,只需清理本地额度
  181. 此方法被 employee/service.py 的删除方法调用
  182. """
  183. from app.plugin.module_payment.expense.quota.model import QuotaModel
  184. from sqlalchemy import delete as sa_delete
  185. try:
  186. crud = InstitutionCRUD(auth)
  187. institutions = await crud.list(
  188. search={"enterprise_id": enterprise_id, "status__ne": "INSTITUTION_DELETE"},
  189. order_by=[{"id": "desc"}],
  190. )
  191. if not institutions:
  192. return
  193. for inst in institutions:
  194. inst_id = inst.institution_id
  195. scope = getattr(inst, "applicable_scope", "")
  196. if not inst_id:
  197. continue
  198. # 员工模式 → 调支付宝移除
  199. if scope == "employee":
  200. from .service import InstitutionScopeService
  201. try:
  202. await InstitutionScopeService.scope_modify_service(
  203. auth=auth, institution_id=inst_id,
  204. data={
  205. "enterprise_id": enterprise_id,
  206. "adapter_type": "EMPLOYEE_SELECT",
  207. "delete_owner_id_list": [employee_id],
  208. },
  209. )
  210. log.info(f"已从制度 {inst_id} 中移除解约员工 {employee_id}")
  211. except Exception as e:
  212. log.warning(f"支付宝移除员工失败(继续清理本地): {e}")
  213. # department/all → Alipay 自动处理,无需调 scope.modify
  214. # 清理本地 pay_expense_quota
  215. del_stmt = sa_delete(QuotaModel).where(
  216. QuotaModel.employee_id == employee_id,
  217. QuotaModel.institution_id == inst_id,
  218. )
  219. await auth.db.execute(del_stmt)
  220. log.info(f"已清理本地额度: employee_id={employee_id}, institution_id={inst_id}")
  221. await auth.db.flush()
  222. except Exception as e:
  223. log.error(f"移除员工失败(不影响主体操作): {e}")