""" 费控制度成员联动同步工具 部门停用/员工解约时自动移除相关制度中的成员引用。 员工调部门/部门新增员工时自动创建本地额度记录。 """ from app.api.v1.module_system.auth.schema import AuthSchema from app.core.alipay import AlipayClient from app.core.logger import log from app.plugin.module_payment.expense.institution.crud import InstitutionCRUD def _compute_quota_status(inst) -> tuple: """根据制度配置和当前时间,计算额度记录的状态和金额 返回: (total_amount, available_amount, status) """ from datetime import datetime from app.plugin.module_payment.expense.quota.enums import QuotaStatusEnum from decimal import Decimal grant_mode = getattr(inst, "grant_mode", None) amount_val = float(getattr(inst, "amount", 0) or 0) # 判断是否在有效期内 now = datetime.now() in_period = True start_date = getattr(inst, "effective_start_date", None) end_date = getattr(inst, "effective_end_date", None) if start_date and now < start_date: in_period = False if end_date and in_period and now > end_date: in_period = False # 定额+有效期内 → ACTIVE + 全额 if grant_mode == "period" and amount_val > 0 and in_period: return (Decimal(str(amount_val)), Decimal(str(amount_val)), QuotaStatusEnum.QUOTA_ACTIVE.value) # 否则 → PENDING + 0 return (Decimal("0"), Decimal("0"), QuotaStatusEnum.QUOTA_PENDING.value) async def _sync_employee_quota( auth: AuthSchema, enterprise_id: str, employee_id: str, department_ids: list[str], is_add: bool ) -> None: """根据员工所属部门,同步本地额度记录 扫描所有按部门模式的制度: - 匹配的部门 → 确保员工有额度记录(检查有效期内状态) - 不匹配的部门 → 删除该员工的额度记录(处理离部门场景) """ if not employee_id: return from app.plugin.module_payment.expense.quota.model import QuotaModel from sqlalchemy import insert, delete as sa_delete, select try: crud = InstitutionCRUD(auth) institutions = await crud.list( search={ "enterprise_id": enterprise_id, "status__ne": "INSTITUTION_DELETE", "applicable_scope": "department", }, order_by=[{"id": "desc"}], ) if not institutions: return tenant_id = auth.user.tenant_id if auth.user else 1 dept_id_set = set(department_ids) for inst in institutions: inst_id = inst.institution_id scope_owner_ids_str = getattr(inst, "scope_owner_id_list", None) or getattr(inst, "department_id", None) if not inst_id or not scope_owner_ids_str: continue # 解析制度的部门ID列表 import json try: scope_ids = json.loads(scope_owner_ids_str) if isinstance(scope_owner_ids_str, str) else scope_owner_ids_str except (json.JSONDecodeError, TypeError): scope_ids = [str(scope_owner_ids_str)] if scope_owner_ids_str else [] scope_ids = [str(s) for s in scope_ids] # 判断是否匹配 matched = bool(scope_ids and dept_id_set.intersection(scope_ids)) if matched: # 检查是否已有记录 check = select(QuotaModel).where( QuotaModel.employee_id == employee_id, QuotaModel.institution_id == inst_id, ) existing = await auth.db.execute(check) if not existing.scalar_one_or_none(): total, available, status = _compute_quota_status(inst) stmt = insert(QuotaModel).values( employee_id=employee_id, institution_id=inst_id, out_biz_no=f"scope_{inst_id}_{employee_id}", total_amount=total, available_amount=available, status=status, enterprise_id=enterprise_id, tenant_id=tenant_id, ) await auth.db.execute(stmt) log.info(f"部门联动 - 新增员工额度: employee_id={employee_id}, institution_id={inst_id}, status={status}, amount={total}") else: if not is_add: continue del_stmt = sa_delete(QuotaModel).where( QuotaModel.employee_id == employee_id, QuotaModel.institution_id == inst_id, ) await auth.db.execute(del_stmt) log.info(f"部门联动 - 删除员工额度: employee_id={employee_id}, institution_id={inst_id}") await auth.db.flush() except Exception as e: log.error(f"部门联动同步额度失败(不影响主体操作): {e}") async def sync_employee_add_to_department_institutions( auth: AuthSchema, enterprise_id: str, employee_id: str, department_ids: list[str], ) -> None: """员工加入部门时,为引用该部门的制度创建本地额度记录""" await _sync_employee_quota(auth, enterprise_id, employee_id, department_ids, is_add=True) async def sync_employee_remove_from_department_institutions( auth: AuthSchema, enterprise_id: str, employee_id: str, department_ids: list[str], ) -> None: """员工离开部门时,从引用该部门的制度中删除本地额度记录""" await _sync_employee_quota(auth, enterprise_id, employee_id, department_ids, is_add=False) async def remove_department_from_institution_scopes( auth: AuthSchema, enterprise_id: str, department_id: str, ) -> None: """ 当部门被停用时,扫描所有引用该部门的制度,移除该部门 此方法被 department/service.py 的停用方法调用 """ try: crud = InstitutionCRUD(auth) institutions = await crud.list( search={"enterprise_id": enterprise_id, "status__ne": "INSTITUTION_DELETE"}, order_by=[{"id": "desc"}], ) if not institutions: return for inst in institutions: inst_id = inst.institution_id if not inst_id: continue from .service import InstitutionScopeService await InstitutionScopeService.scope_modify_service( auth=auth, institution_id=inst_id, data={ "enterprise_id": enterprise_id, "adapter_type": "EMPLOYEE_DEPARTMENT", "delete_owner_id_list": [department_id], }, ) log.info(f"已从制度 {inst_id} 中移除停用部门 {department_id}") except Exception as e: log.error(f"移除部门失败(不影响主体操作): {e}") async def sync_employee_to_all_institution( auth: AuthSchema, enterprise_id: str, employee_id: str ) -> None: """员工激活时,为全体员工(applicable_scope=all)的制度创建本地额度记录 根据制度有效期和发放模式判断状态:有效期内定额→ACTIVE,否则→PENDING """ if not employee_id or not enterprise_id: return try: crud = InstitutionCRUD(auth) institutions = await crud.list( search={"enterprise_id": enterprise_id, "status__ne": "INSTITUTION_DELETE", "applicable_scope": "all"}, order_by=[{"id": "desc"}], ) if not institutions: return from app.plugin.module_payment.expense.quota.model import QuotaModel from sqlalchemy import insert, select tenant_id = auth.user.tenant_id if auth.user else 1 for inst in institutions: inst_id = inst.institution_id if not inst_id: continue check = select(QuotaModel).where(QuotaModel.employee_id == employee_id, QuotaModel.institution_id == inst_id) existing = await auth.db.execute(check) if existing.scalar_one_or_none(): continue total, available, status = _compute_quota_status(inst) stmt = insert(QuotaModel).values( employee_id=employee_id, institution_id=inst_id, out_biz_no=f"all_{inst_id}_{employee_id}", total_amount=total, available_amount=available, status=status, enterprise_id=enterprise_id, tenant_id=tenant_id, ) await auth.db.execute(stmt) log.info(f"全员联动 - 新增员工额度: employee_id={employee_id}, institution_id={inst_id}, status={status}, amount={total}") await auth.db.flush() except Exception as e: log.error(f"全员制度联动失败: {e}") async def remove_employee_from_institution_scopes( auth: AuthSchema, enterprise_id: str, employee_id: str, ) -> None: """ 当员工被解约时,从所有费控制度中移除该员工 处理三种 scope 模式: - employee(指定员工):调支付宝 scope.modify 移除 - department/all(按部门/全员):Alipay 自动处理,只需清理本地额度 此方法被 employee/service.py 的删除方法调用 """ from app.plugin.module_payment.expense.quota.model import QuotaModel from sqlalchemy import delete as sa_delete try: crud = InstitutionCRUD(auth) institutions = await crud.list( search={"enterprise_id": enterprise_id, "status__ne": "INSTITUTION_DELETE"}, order_by=[{"id": "desc"}], ) if not institutions: return for inst in institutions: inst_id = inst.institution_id scope = getattr(inst, "applicable_scope", "") if not inst_id: continue # 员工模式 → 调支付宝移除 if scope == "employee": from .service import InstitutionScopeService try: await InstitutionScopeService.scope_modify_service( auth=auth, institution_id=inst_id, data={ "enterprise_id": enterprise_id, "adapter_type": "EMPLOYEE_SELECT", "delete_owner_id_list": [employee_id], }, ) log.info(f"已从制度 {inst_id} 中移除解约员工 {employee_id}") except Exception as e: log.warning(f"支付宝移除员工失败(继续清理本地): {e}") # department/all → Alipay 自动处理,无需调 scope.modify # 清理本地 pay_expense_quota del_stmt = sa_delete(QuotaModel).where( QuotaModel.employee_id == employee_id, QuotaModel.institution_id == inst_id, ) await auth.db.execute(del_stmt) log.info(f"已清理本地额度: employee_id={employee_id}, institution_id={inst_id}") await auth.db.flush() except Exception as e: log.error(f"移除员工失败(不影响主体操作): {e}")