scope_sync.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295
  1. """
  2. 费控制度成员联动同步工具
  3. 部门停用/员工解约时自动移除相关制度中的成员引用。
  4. 员工调部门/部门新增员工时自动创建本地额度记录。
  5. """
  6. from app.api.v1.module_system.auth.schema import AuthSchema
  7. from app.core.alipay import AlipayClient
  8. from app.core.logger import log
  9. from app.plugin.module_payment.expense.institution.crud import InstitutionCRUD
  10. def _compute_quota_status(inst) -> tuple:
  11. """根据制度配置和当前时间,计算额度记录的状态和金额
  12. 返回: (total_amount, available_amount, status)
  13. """
  14. from datetime import datetime
  15. from app.plugin.module_payment.expense.quota.enums import QuotaStatusEnum
  16. from decimal import Decimal
  17. grant_mode = getattr(inst, "grant_mode", None)
  18. amount_val = float(getattr(inst, "amount", 0) or 0)
  19. # 判断是否在有效期内
  20. now = datetime.now()
  21. in_period = True
  22. start_date = getattr(inst, "effective_start_date", None)
  23. end_date = getattr(inst, "effective_end_date", None)
  24. if start_date and now < start_date:
  25. in_period = False
  26. if end_date and in_period and now > end_date:
  27. in_period = False
  28. # 定额+有效期内 → ACTIVE + 全额
  29. if grant_mode == "period" and amount_val > 0 and in_period:
  30. return (Decimal(str(amount_val)), Decimal(str(amount_val)), QuotaStatusEnum.QUOTA_ACTIVE.value)
  31. # 否则 → PENDING + 0
  32. return (Decimal("0"), Decimal("0"), QuotaStatusEnum.QUOTA_PENDING.value)
  33. async def _sync_employee_quota(
  34. auth: AuthSchema, enterprise_id: str, employee_id: str, department_ids: list[str], is_add: bool
  35. ) -> None:
  36. """根据员工所属部门,同步本地额度记录
  37. 扫描所有按部门模式的制度:
  38. - 匹配的部门 → 确保员工有额度记录(检查有效期内状态)
  39. - 不匹配的部门 → 删除该员工的额度记录(处理离部门场景)
  40. """
  41. if not employee_id:
  42. return
  43. from app.plugin.module_payment.expense.quota.model import QuotaModel
  44. from sqlalchemy import insert, delete as sa_delete, select
  45. try:
  46. crud = InstitutionCRUD(auth)
  47. institutions = await crud.list(
  48. search={
  49. "enterprise_id": enterprise_id,
  50. "status__ne": "INSTITUTION_DELETE",
  51. "applicable_scope": "department",
  52. },
  53. order_by=[{"id": "desc"}],
  54. )
  55. if not institutions:
  56. return
  57. tenant_id = auth.user.tenant_id if auth.user else 1
  58. dept_id_set = set(department_ids)
  59. for inst in institutions:
  60. inst_id = inst.institution_id
  61. scope_owner_ids_str = getattr(inst, "scope_owner_id_list", None) or getattr(inst, "department_id", None)
  62. if not inst_id or not scope_owner_ids_str:
  63. continue
  64. # 解析制度的部门ID列表
  65. import json
  66. try:
  67. scope_ids = json.loads(scope_owner_ids_str) if isinstance(scope_owner_ids_str, str) else scope_owner_ids_str
  68. except (json.JSONDecodeError, TypeError):
  69. scope_ids = [str(scope_owner_ids_str)] if scope_owner_ids_str else []
  70. scope_ids = [str(s) for s in scope_ids]
  71. # 判断是否匹配
  72. matched = bool(scope_ids and dept_id_set.intersection(scope_ids))
  73. if matched:
  74. # 检查是否已有记录
  75. check = select(QuotaModel).where(
  76. QuotaModel.employee_id == employee_id,
  77. QuotaModel.institution_id == inst_id,
  78. )
  79. existing = await auth.db.execute(check)
  80. if not existing.scalar_one_or_none():
  81. total, available, status = _compute_quota_status(inst)
  82. out_biz_no = f"scope_{inst_id}_{employee_id}"
  83. stmt = insert(QuotaModel).values(
  84. employee_id=employee_id,
  85. institution_id=inst_id,
  86. out_biz_no=out_biz_no,
  87. quota_id=out_biz_no,
  88. total_amount=total,
  89. available_amount=available,
  90. status=status,
  91. enterprise_id=enterprise_id,
  92. tenant_id=tenant_id,
  93. )
  94. await auth.db.execute(stmt)
  95. log.info(f"部门联动 - 新增员工额度: employee_id={employee_id}, institution_id={inst_id}, status={status}, amount={total}")
  96. else:
  97. if not is_add:
  98. continue
  99. del_stmt = sa_delete(QuotaModel).where(
  100. QuotaModel.employee_id == employee_id,
  101. QuotaModel.institution_id == inst_id,
  102. )
  103. await auth.db.execute(del_stmt)
  104. log.info(f"部门联动 - 删除员工额度: employee_id={employee_id}, institution_id={inst_id}")
  105. await auth.db.flush()
  106. except Exception as e:
  107. log.error(f"部门联动同步额度失败(不影响主体操作): {e}")
  108. async def sync_employee_add_to_department_institutions(
  109. auth: AuthSchema,
  110. enterprise_id: str,
  111. employee_id: str,
  112. department_ids: list[str],
  113. ) -> None:
  114. """员工加入部门时,为引用该部门的制度创建本地额度记录"""
  115. await _sync_employee_quota(auth, enterprise_id, employee_id, department_ids, is_add=True)
  116. async def sync_employee_remove_from_department_institutions(
  117. auth: AuthSchema,
  118. enterprise_id: str,
  119. employee_id: str,
  120. department_ids: list[str],
  121. ) -> None:
  122. """员工离开部门时,从引用该部门的制度中删除本地额度记录"""
  123. await _sync_employee_quota(auth, enterprise_id, employee_id, department_ids, is_add=False)
  124. async def remove_department_from_institution_scopes(
  125. auth: AuthSchema,
  126. enterprise_id: str,
  127. department_id: str,
  128. ) -> None:
  129. """
  130. 当部门被停用时,扫描所有引用该部门的制度,移除该部门
  131. 此方法被 department/service.py 的停用方法调用
  132. """
  133. try:
  134. crud = InstitutionCRUD(auth)
  135. institutions = await crud.list(
  136. search={"enterprise_id": enterprise_id, "status__ne": "INSTITUTION_DELETE"},
  137. order_by=[{"id": "desc"}],
  138. )
  139. if not institutions:
  140. return
  141. for inst in institutions:
  142. inst_id = inst.institution_id
  143. if not inst_id:
  144. continue
  145. from .service import InstitutionScopeService
  146. await InstitutionScopeService.scope_modify_service(
  147. auth=auth,
  148. institution_id=inst_id,
  149. data={
  150. "enterprise_id": enterprise_id,
  151. "adapter_type": "EMPLOYEE_DEPARTMENT",
  152. "delete_owner_id_list": [department_id],
  153. },
  154. )
  155. log.info(f"已从制度 {inst_id} 中移除停用部门 {department_id}")
  156. except Exception as e:
  157. log.error(f"移除部门失败(不影响主体操作): {e}")
  158. async def sync_employee_to_all_institution(
  159. auth: AuthSchema, enterprise_id: str, employee_id: str
  160. ) -> None:
  161. """员工激活时,为全体员工(applicable_scope=all)的制度创建本地额度记录
  162. 根据制度有效期和发放模式判断状态:有效期内定额→ACTIVE,否则→PENDING
  163. """
  164. if not employee_id or not enterprise_id:
  165. return
  166. try:
  167. crud = InstitutionCRUD(auth)
  168. institutions = await crud.list(
  169. search={"enterprise_id": enterprise_id, "status__ne": "INSTITUTION_DELETE", "applicable_scope": "all"},
  170. order_by=[{"id": "desc"}],
  171. )
  172. if not institutions:
  173. return
  174. from app.plugin.module_payment.expense.quota.model import QuotaModel
  175. from sqlalchemy import insert, select
  176. tenant_id = auth.user.tenant_id if auth.user else 1
  177. for inst in institutions:
  178. inst_id = inst.institution_id
  179. if not inst_id:
  180. continue
  181. check = select(QuotaModel).where(QuotaModel.employee_id == employee_id, QuotaModel.institution_id == inst_id)
  182. existing = await auth.db.execute(check)
  183. if existing.scalar_one_or_none():
  184. continue
  185. total, available, status = _compute_quota_status(inst)
  186. out_biz_no = f"all_{inst_id}_{employee_id}"
  187. stmt = insert(QuotaModel).values(
  188. employee_id=employee_id, institution_id=inst_id,
  189. out_biz_no=out_biz_no,
  190. quota_id=out_biz_no,
  191. total_amount=total, available_amount=available,
  192. status=status,
  193. enterprise_id=enterprise_id, tenant_id=tenant_id,
  194. )
  195. await auth.db.execute(stmt)
  196. log.info(f"全员联动 - 新增员工额度: employee_id={employee_id}, institution_id={inst_id}, status={status}, amount={total}")
  197. await auth.db.flush()
  198. except Exception as e:
  199. log.error(f"全员制度联动失败: {e}")
  200. async def remove_employee_from_institution_scopes(
  201. auth: AuthSchema,
  202. enterprise_id: str,
  203. employee_id: str,
  204. ) -> None:
  205. """
  206. 当员工被解约时,从所有费控制度中移除该员工
  207. 处理三种 scope 模式:
  208. - employee(指定员工):调支付宝 scope.modify 移除
  209. - department/all(按部门/全员):Alipay 自动处理,只需清理本地额度
  210. 此方法被 employee/service.py 的删除方法调用
  211. """
  212. from app.plugin.module_payment.expense.quota.model import QuotaModel
  213. from sqlalchemy import delete as sa_delete
  214. try:
  215. crud = InstitutionCRUD(auth)
  216. institutions = await crud.list(
  217. search={"enterprise_id": enterprise_id, "status__ne": "INSTITUTION_DELETE"},
  218. order_by=[{"id": "desc"}],
  219. )
  220. if not institutions:
  221. return
  222. for inst in institutions:
  223. inst_id = inst.institution_id
  224. scope = getattr(inst, "applicable_scope", "")
  225. if not inst_id:
  226. continue
  227. # 员工模式 → 调支付宝移除
  228. if scope == "employee":
  229. from .service import InstitutionScopeService
  230. try:
  231. await InstitutionScopeService.scope_modify_service(
  232. auth=auth, institution_id=inst_id,
  233. data={
  234. "enterprise_id": enterprise_id,
  235. "adapter_type": "EMPLOYEE_SELECT",
  236. "delete_owner_id_list": [employee_id],
  237. },
  238. )
  239. log.info(f"已从制度 {inst_id} 中移除解约员工 {employee_id}")
  240. except Exception as e:
  241. log.warning(f"支付宝移除员工失败(继续清理本地): {e}")
  242. # department/all → Alipay 自动处理,无需调 scope.modify
  243. # 清理本地 pay_expense_quota
  244. del_stmt = sa_delete(QuotaModel).where(
  245. QuotaModel.employee_id == employee_id,
  246. QuotaModel.institution_id == inst_id,
  247. )
  248. await auth.db.execute(del_stmt)
  249. log.info(f"已清理本地额度: employee_id={employee_id}, institution_id={inst_id}")
  250. await auth.db.flush()
  251. except Exception as e:
  252. log.error(f"移除员工失败(不影响主体操作): {e}")