import asyncio
from typing import Optional, TypeVar, cast

from alipay.aop.api.domain.EmployeeInfoDTO import EmployeeInfoDTO

from app.api.v1.module_system.auth.schema import AuthSchema
from app.api.v1.module_system.user.crud import UserCRUD
from app.api.v1.module_system.user.schema import UserCreateSchema
from app.core.alipay import AlipayClient
from app.core.exceptions import CustomException
from app.core.logger import log
from app.utils.hash_bcrpy_util import PwdUtil

from .crud import EmployeeCRUD
from .schema import (
    EmployeeCreateOrUpdateSchema,
    EmployeeListOutSchema,
    EmployeeOperationOutSchema,
    EmployeeInviteQuerySchema,
    EmployeeInviteQueryOutSchema,
    EmployeeOutSchema,
)

# Type of the Alipay SDK response object produced by ``_call_alipay``.
_ResponseT = TypeVar("_ResponseT")


class EmployeeService:
    """Employee service layer.

    Wraps the ``alipay.commerce.ec.employee.*`` endpoints and keeps the local
    employee table (and the linked system user) in step with them.
    """

    @staticmethod
    async def _call_alipay(
        request,
        response_cls: type[_ResponseT],
        action: str,
    ) -> _ResponseT:
        """Execute an Alipay request and return the parsed, successful response.

        Args:
            request: A populated Alipay SDK request object.
            response_cls: The SDK response class used to parse the raw content.
            action: Human-readable action name used as the error-message
                prefix (e.g. ``"添加员工"``).

        Returns:
            An instance of ``response_cls`` for which ``is_success()`` is true.

        Raises:
            CustomException: If the client returns nothing, or the parsed
                response reports a failure.
        """
        client = AlipayClient.get_client()
        # ``execute`` is a synchronous call; run it in a worker thread so it
        # does not stall the event loop while waiting on the gateway.
        # NOTE(review): assumes the client may be called off the event-loop
        # thread - confirm against AlipayClient.
        raw = await asyncio.to_thread(client.execute, request)
        if not raw:
            raise CustomException(msg=f"{action}失败: 无响应")

        result = response_cls()
        result.parse_response_content(raw)
        if not result.is_success():
            log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
            # Prefer the most specific message the gateway provided.
            raise CustomException(
                msg=f"{action}失败: {result.sub_msg or result.msg or result.code}"
            )
        return result

    @classmethod
    async def add_employee_service(
        cls, auth: AuthSchema, data: EmployeeCreateOrUpdateSchema
    ) -> EmployeeOperationOutSchema:
        """Add an employee.

        Calls ``alipay.commerce.ec.employee.add``, then makes sure a system
        user exists for the new employee and stores the employee locally.

        Args:
            auth: Auth context passed to the CRUD layers.
            data: Employee attributes to register.

        Returns:
            The employee id, sign URL, share code and IoT id returned by Alipay.

        Raises:
            CustomException: If the Alipay call returns nothing or fails.
        """
        from alipay.aop.api.request.AlipayCommerceEcEmployeeAddRequest import (
            AlipayCommerceEcEmployeeAddRequest,
        )
        from alipay.aop.api.domain.AlipayCommerceEcEmployeeAddModel import (
            AlipayCommerceEcEmployeeAddModel,
        )
        from alipay.aop.api.response.AlipayCommerceEcEmployeeAddResponse import (
            AlipayCommerceEcEmployeeAddResponse,
        )

        crud = EmployeeCRUD(auth)

        model = AlipayCommerceEcEmployeeAddModel()
        # Required fields.
        model.enterprise_id = data.enterprise_id
        model.employee_name = data.employee_name
        # One of: identity (identity_type + identity), certificate
        # (employee_cert_type + employee_cert_no), mobile, or email is
        # required. When several are given the priority is
        # identity > certificate > mobile > email.
        model.identity_type = data.identity_type
        model.identity = data.identity
        model.identity_open_id = data.identity_open_id
        model.employee_mobile = data.employee_mobile
        model.employee_email = data.employee_email
        model.employee_cert_type = data.employee_cert_type
        model.employee_cert_no = data.employee_cert_no
        model.iot_check_type = data.iot_check_type
        model.employee_no = data.employee_no
        model.department_ids = data.department_ids
        model.accounting_entity_ids = data.accounting_entity_ids
        model.label_names = data.label_names
        model.sign_return_url = data.sign_return_url
        model.create_share_code = data.create_share_code
        model.sign_url_carry_info = data.sign_url_carry_info
        model.profiles = data.profiles

        request = AlipayCommerceEcEmployeeAddRequest()
        request.biz_model = model
        result = await cls._call_alipay(
            request, AlipayCommerceEcEmployeeAddResponse, "添加员工"
        )

        result_data = EmployeeOperationOutSchema(
            employee_id=result.employee_id,
            sign_url=result.sign_url,
            share_code=result.share_code,
            iot_unique_id=result.iot_unique_id,
        )

        # Local row = Alipay result overlaid with the non-null request fields.
        create_data_dict = result_data.model_dump()
        create_data_dict.update(data.model_dump(exclude_none=True))

        # Make sure a system user exists; the Alipay employee id is the username.
        user_crud = UserCRUD(auth)
        username = str(result.employee_id)
        existing_user = await user_crud.get_by_username_crud(username=username)
        if not existing_user:
            user_data = UserCreateSchema(
                username=username,
                # NOTE(review): hard-coded default password - every
                # auto-created account shares it; consider a random initial
                # password or a forced reset on first login.
                password=PwdUtil.set_password_hash(password="123456"),
                name=data.employee_name,
                mobile=data.employee_mobile,
                email=data.employee_email,
                tenant_id=auth.tenant_id,
                description=f"e:{data.enterprise_id}:{result.employee_id}",
            )
            user_dict = user_data.model_dump(exclude_unset=True)
            new_user = await user_crud.create(data=user_dict, skip_tenant_id=True)
            create_data_dict["user_id"] = new_user.id
        else:
            create_data_dict["user_id"] = existing_user.id

        await crud.create(data=create_data_dict)
        return result_data

    @classmethod
    async def list_service(
        cls,
        auth: AuthSchema,
        page_no: int = 1,
        page_size: int = 20,
        search: dict | None = None,
    ) -> dict:
        """Return one page of locally stored employees, newest id first.

        Args:
            auth: Auth context passed to the CRUD layer.
            page_no: 1-based page number.
            page_size: Number of rows per page.
            search: Optional filter mapping; an empty filter is used if omitted.

        Returns:
            Whatever ``EmployeeCRUD.page`` returns for the requested page.
        """
        crud = EmployeeCRUD(auth)
        offset = (page_no - 1) * page_size
        return await crud.page(
            offset=offset,
            limit=page_size,
            order_by=[{"id": "desc"}],
            search=search or {},
            out_schema=EmployeeListOutSchema,
            preload=["user"],
        )

    @classmethod
    async def info_service(
        cls,
        auth: AuthSchema,
        employee_id: Optional[str],
        employee_email: Optional[str],
        employee_mobile: Optional[str],
        enterprise_id: str,
    ) -> EmployeeOutSchema:
        """Fetch a single employee from the local database.

        Args:
            auth: Auth context passed to the CRUD layer.
            employee_id: Employee id to match, if known.
            employee_email: Employee email to match, if known.
            employee_mobile: Employee mobile number to match, if known.
            enterprise_id: Enterprise the employee belongs to.

        Returns:
            The matching employee validated into ``EmployeeOutSchema``.

        Raises:
            CustomException: If no matching employee is found.
        """
        crud = EmployeeCRUD(auth)
        out_data = await crud.get(
            employee_id=employee_id,
            employee_email=employee_email,
            employee_mobile=employee_mobile,
            enterprise_id=enterprise_id,
        )
        if not out_data:
            raise CustomException(msg="员工不存在")
        return EmployeeOutSchema.model_validate(out_data)

    @classmethod
    async def detail_service(
        cls,
        auth: AuthSchema,
        employee_id: Optional[str],
        employee_email: Optional[str],
        employee_mobile: Optional[str],
        enterprise_id: str,
    ) -> dict:
        """Query employee details from Alipay.

        Calls ``alipay.commerce.ec.employee.info.query``.

        Args:
            auth: Auth context (not used by this method itself).
            employee_id: Employee id to look up, if known.
            employee_email: Employee email to look up, if known.
            employee_mobile: Employee mobile number to look up, if known.
            enterprise_id: Enterprise the employee belongs to.

        Returns:
            The response's ``employee_info`` converted with ``to_alipay_dict``.

        Raises:
            CustomException: If the Alipay call returns nothing or fails, or
                the response carries no employee info.
        """
        from alipay.aop.api.request.AlipayCommerceEcEmployeeInfoQueryRequest import (
            AlipayCommerceEcEmployeeInfoQueryRequest,
        )
        from alipay.aop.api.domain.AlipayCommerceEcEmployeeInfoQueryModel import (
            AlipayCommerceEcEmployeeInfoQueryModel,
        )
        from alipay.aop.api.response.AlipayCommerceEcEmployeeInfoQueryResponse import (
            AlipayCommerceEcEmployeeInfoQueryResponse,
        )

        model = AlipayCommerceEcEmployeeInfoQueryModel()
        model.enterprise_id = enterprise_id
        model.employee_id = employee_id
        model.employee_email = employee_email
        model.mobile = employee_mobile

        request = AlipayCommerceEcEmployeeInfoQueryRequest()
        request.biz_model = model
        result = await cls._call_alipay(
            request, AlipayCommerceEcEmployeeInfoQueryResponse, "查询员工详情"
        )

        info = result.employee_info
        if info is None:
            # A successful response without a payload would otherwise crash
            # with AttributeError inside to_alipay_dict.
            raise CustomException(msg="员工不存在")
        return EmployeeInfoDTO.to_alipay_dict(cast(EmployeeInfoDTO, info))

    @classmethod
    async def delete_employee_service(
        cls, auth: AuthSchema, employee_id: str, enterprise_id: str
    ) -> EmployeeOperationOutSchema:
        """Delete an employee.

        Calls ``alipay.commerce.ec.employee.delete``, then removes the local
        employee row and its linked system user.

        Args:
            auth: Auth context passed to the CRUD layers.
            employee_id: Id of the employee to delete.
            enterprise_id: Enterprise the employee belongs to.

        Returns:
            An ``EmployeeOperationOutSchema`` carrying the deleted employee id.

        Raises:
            CustomException: If the Alipay call returns nothing or fails, or
                the employee is not present in the local database.
        """
        from alipay.aop.api.request.AlipayCommerceEcEmployeeDeleteRequest import (
            AlipayCommerceEcEmployeeDeleteRequest,
        )
        from alipay.aop.api.domain.AlipayCommerceEcEmployeeDeleteModel import (
            AlipayCommerceEcEmployeeDeleteModel,
        )
        from alipay.aop.api.response.AlipayCommerceEcEmployeeDeleteResponse import (
            AlipayCommerceEcEmployeeDeleteResponse,
        )

        model = AlipayCommerceEcEmployeeDeleteModel()
        model.enterprise_id = enterprise_id
        model.employee_id = employee_id

        request = AlipayCommerceEcEmployeeDeleteRequest()
        request.biz_model = model
        await cls._call_alipay(
            request, AlipayCommerceEcEmployeeDeleteResponse, "删除员工"
        )

        # Remote deletion succeeded; now remove the local row and linked user.
        # NOTE(review): if the local row is missing this raises even though
        # the Alipay-side deletion has already happened.
        crud = EmployeeCRUD(auth)
        employee = await crud.get(
            employee_id=employee_id, enterprise_id=enterprise_id, preload=["user"]
        )
        if not employee:
            raise CustomException(msg=f"员工 {employee_id} 不存在")

        # Termination cascade: remove the employee from every expense-control
        # institution that references them. Best-effort by design - a failure
        # here is logged and must not abort the deletion.
        try:
            from app.plugin.module_payment.expense.institution.scope_sync import (
                remove_employee_from_institution_scopes,
            )

            await remove_employee_from_institution_scopes(
                auth=auth, enterprise_id=enterprise_id, employee_id=employee_id
            )
        except Exception as e:
            log.warning(f"从制度移除解约员工失败(不影响主体操作): {e}")

        # Delete the linked user before the employee row itself.
        if employee.user_id:
            user_service = UserCRUD(auth)
            await user_service.delete(ids=[employee.user_id])

        await crud.delete_by_employee_id(employee_id)

        return EmployeeOperationOutSchema(
            employee_id=employee_id,
        )

    @classmethod
    async def invite_query_service(
        cls, auth: AuthSchema, data: EmployeeInviteQuerySchema
    ) -> EmployeeInviteQueryOutSchema:
        """Get the employee's sign-up / activation link.

        Calls ``alipay.commerce.ec.employee.invite.query``.

        Args:
            auth: Auth context (not used by this method itself).
            data: Query parameters identifying the enterprise and employee.

        Returns:
            The sign URLs and optional share code; missing URLs become ``""``
            and a missing enterprise id falls back to the requested one.

        Raises:
            CustomException: If the Alipay call returns nothing or fails.
        """
        from alipay.aop.api.request.AlipayCommerceEcEmployeeInviteQueryRequest import (
            AlipayCommerceEcEmployeeInviteQueryRequest,
        )
        from alipay.aop.api.domain.AlipayCommerceEcEmployeeInviteQueryModel import (
            AlipayCommerceEcEmployeeInviteQueryModel,
        )
        from alipay.aop.api.response.AlipayCommerceEcEmployeeInviteQueryResponse import (
            AlipayCommerceEcEmployeeInviteQueryResponse,
        )

        model = AlipayCommerceEcEmployeeInviteQueryModel()
        model.enterprise_id = data.enterprise_id
        model.employee_id = data.employee_id
        model.page_content_code = data.page_content_code
        model.withholding_sign_str = data.withholding_sign_str
        model.create_share_code = data.create_share_code

        request = AlipayCommerceEcEmployeeInviteQueryRequest()
        request.biz_model = model
        result = await cls._call_alipay(
            request, AlipayCommerceEcEmployeeInviteQueryResponse, "获取员工签约激活链接"
        )

        return EmployeeInviteQueryOutSchema(
            enterprise_id=result.enterprise_id or data.enterprise_id,
            sign_url=result.sign_url or "",
            mini_app_sign_url=result.mini_app_sign_url or "",
            # Read defensively: the attribute may be absent on the response.
            share_code=getattr(result, 'share_code', None),
        )

    @classmethod
    async def update_employee_from_alipay(
        cls, auth: AuthSchema, data: EmployeeCreateOrUpdateSchema
    ) -> None:
        """Update the local employee record after confirming it exists in Alipay.

        Args:
            auth: Auth context passed to the CRUD layer.
            data: Employee attributes; non-null fields are written locally.

        Raises:
            CustomException: If Alipay does not know the employee, or returns
                a different employee id than the one requested.
        """
        # Look the employee up in Alipay first.
        employee = await cls.detail_service(
            auth=auth,
            employee_id=data.employee_id,
            employee_email=data.employee_email,
            employee_mobile=data.employee_mobile,
            enterprise_id=data.enterprise_id,
        )
        if not employee:
            raise CustomException(msg=f"员工 {data.employee_id} 不存在")

        # ``employee`` is a plain dict, so ``hasattr`` can never see its keys;
        # use ``get``. Compare only when both ids are present so lookups by
        # email/mobile (no employee_id supplied) are not rejected.
        remote_id = employee.get("employee_id")
        if data.employee_id and remote_id and remote_id != data.employee_id:
            raise CustomException(msg=f"员工 {data.employee_id} 不存在")

        crud = EmployeeCRUD(auth)
        await crud.update_by(
            employee_mobile=data.employee_mobile,
            employee_email=data.employee_email,
            identity_open_id=data.identity_open_id,
            data=data.model_dump(exclude_none=True),
        )