employee_handler.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. from redis.asyncio import Redis
  2. from app.api.v1.module_system.auth.schema import AuthSchema
  3. from app.core.logger import log
  4. from app.plugin.module_payment.employee.schema import EmployeeCreateOrUpdateSchema
  5. from app.plugin.module_payment.employee.service import EmployeeService
  6. from ..schemas import EmployeeChangeContent
  7. from ..enums import EmployeeActionEnum
  8. from .base_handler import BaseHandler
  9. class EmployeeHandler(BaseHandler[dict]):
  10. """员工变更通知处理器"""
  11. async def update_employee(self, data: EmployeeChangeContent, auth: AuthSchema):
  12. """更新员工信息"""
  13. update_data = EmployeeCreateOrUpdateSchema(
  14. enterprise_id=data.enterprise_id,
  15. employee_id=data.employee_id,
  16. employee_name=data.employee_name,
  17. employee_mobile=data.mobile,
  18. status=data.activate,
  19. employee_email=data.email,
  20. identity_open_id=data.open_id,
  21. department_list=data.department_list,
  22. role_list=data.role_list,
  23. )
  24. await EmployeeService.update_employee_from_alipay(auth, update_data)
  25. async def handle(self, method: str, content: dict, auth: AuthSchema, redis: Redis) -> bool:
  26. """
  27. 处理员工变更通知
  28. 动作类型:
  29. - EMPLOYEE_ADD: 员工新增
  30. - EMPLOYEE_ACTIVATE: 员工激活
  31. - EMPLOYEE_UPDATE: 员工信息修改
  32. - EMPLOYEE_DEPARTMENT_CHANGE: 部门修改
  33. - EMPLOYEE_ROLE_CHANGE: 角色修改
  34. - EMPLOYEE_DELETE: 员工删除
  35. """
  36. try:
  37. notify_data = EmployeeChangeContent(**content)
  38. except Exception as e:
  39. log.error(f"解析员工通知内容失败: {e}")
  40. return False
  41. action = notify_data.action
  42. employee_id = notify_data.employee_id
  43. log.info(f"处理员工变更通知: action={action}, employee_id={employee_id}")
  44. try:
  45. if action == EmployeeActionEnum.EMPLOYEE_ADD.value:
  46. return await self._handle_add(notify_data, auth)
  47. elif action == EmployeeActionEnum.EMPLOYEE_ACTIVATE.value:
  48. return await self._handle_activate(notify_data, auth)
  49. elif action == EmployeeActionEnum.EMPLOYEE_UPDATE.value:
  50. return await self._handle_update(notify_data, auth)
  51. elif action == EmployeeActionEnum.EMPLOYEE_DEPARTMENT_CHANGE.value:
  52. return await self._handle_department_change(notify_data, auth)
  53. elif action == EmployeeActionEnum.EMPLOYEE_ROLE_CHANGE.value:
  54. return await self._handle_role_change(notify_data, auth)
  55. elif action == EmployeeActionEnum.EMPLOYEE_DELETE.value:
  56. return await self._handle_delete(notify_data, auth)
  57. else:
  58. log.warning(f"未知的员工变更动作: {action}")
  59. return True
  60. except Exception as e:
  61. log.error(f"支付宝通知消息 - 处理员工变更通知消息异常: {e}")
  62. return False
  63. async def _get_department_ids(self, data: EmployeeChangeContent) -> list[str]:
  64. """从通知数据中提取部门ID列表"""
  65. dept_list = data.department_list or []
  66. return [
  67. (d.get("department_id") or "") for d in dept_list
  68. if isinstance(d, dict) and d.get("department_id")
  69. ]
  70. async def _handle_add(self, data: EmployeeChangeContent, auth: AuthSchema) -> bool:
  71. """处理员工新增(未激活,不触发额度联动)"""
  72. log.info(f"员工新增: employee_id={data.employee_id}, enterprise_id={data.enterprise_id}")
  73. # 有则更新,无则新增
  74. from app.plugin.module_payment.employee.crud import EmployeeCRUD
  75. crud = EmployeeCRUD(auth)
  76. exists = await crud.get_by_employee_id(data.employee_id, data.enterprise_id)
  77. if exists:
  78. await self.update_employee(data, auth)
  79. else:
  80. update_data = EmployeeCreateOrUpdateSchema(
  81. enterprise_id=data.enterprise_id,
  82. employee_id=data.employee_id,
  83. employee_name=data.employee_name,
  84. employee_mobile=data.mobile,
  85. status=data.activate,
  86. employee_email=data.email,
  87. identity_open_id=data.open_id,
  88. department_list=data.department_list,
  89. role_list=data.role_list,
  90. )
  91. await crud.create(data=update_data.model_dump(exclude_none=True))
  92. return True
  93. async def _handle_activate(self, data: EmployeeChangeContent, auth: AuthSchema) -> bool:
  94. """处理员工激活"""
  95. log.info(f"员工激活: employee_id={data.employee_id}")
  96. await self.update_employee(data, auth)
  97. # 联动1:全体员工级别的制度
  98. if data.enterprise_id:
  99. from app.plugin.module_payment.expense.institution.scope_sync import sync_employee_to_all_institution
  100. await sync_employee_to_all_institution(
  101. auth=auth, enterprise_id=data.enterprise_id,
  102. employee_id=data.employee_id,
  103. )
  104. # 联动2:部门级别的制度
  105. dept_ids = await self._get_department_ids(data)
  106. if dept_ids:
  107. from app.plugin.module_payment.expense.institution.scope_sync import sync_employee_add_to_department_institutions
  108. await sync_employee_add_to_department_institutions(
  109. auth=auth, enterprise_id=data.enterprise_id,
  110. employee_id=data.employee_id, department_ids=dept_ids,
  111. )
  112. return True
  113. async def _handle_update(self, data: EmployeeChangeContent, auth: AuthSchema) -> bool:
  114. """处理员工信息修改"""
  115. log.info(f"员工信息修改: employee_id={data.employee_id}")
  116. await self.update_employee(data, auth)
  117. return True
  118. async def _handle_department_change(self, data: EmployeeChangeContent, auth: AuthSchema) -> bool:
  119. """处理部门修改"""
  120. log.info(f"部门修改: employee_id={data.employee_id}")
  121. await self.update_employee(data, auth)
  122. # 联动:根据新部门重新同步额度(先移除旧的不匹配的,再创建新的)
  123. dept_ids = await self._get_department_ids(data)
  124. from app.plugin.module_payment.expense.institution.scope_sync import sync_employee_add_to_department_institutions
  125. await sync_employee_add_to_department_institutions(
  126. auth=auth, enterprise_id=data.enterprise_id,
  127. employee_id=data.employee_id, department_ids=dept_ids,
  128. )
  129. return True
  130. async def _handle_role_change(self, data: EmployeeChangeContent, auth: AuthSchema) -> bool:
  131. """处理角色修改"""
  132. log.info(f"角色修改: employee_id={data.employee_id}")
  133. await self.update_employee(data, auth)
  134. return True
  135. async def _handle_delete(self, data: EmployeeChangeContent, auth: AuthSchema) -> bool:
  136. """处理员工删除/离职"""
  137. log.info(f"员工删除/离职: employee_id={data.employee_id}")
  138. await self.update_employee(data, auth)
  139. # 联动:从所有引用该员工的制度中移除
  140. if data.employee_id and data.enterprise_id:
  141. try:
  142. from app.plugin.module_payment.expense.institution.scope_sync import remove_employee_from_institution_scopes
  143. await remove_employee_from_institution_scopes(
  144. auth=auth, enterprise_id=data.enterprise_id,
  145. employee_id=data.employee_id,
  146. )
  147. except Exception as e:
  148. log.warning(f"删除员工联动失败(不影响主流程): {e}")
  149. return True