from typing import Optional, List, Dict, Any from alipay.aop.api.domain.DepartmentInfoDTO import DepartmentInfoDTO from alipay.aop.api.domain.AlipayCommerceEcDepartmentCreateModel import AlipayCommerceEcDepartmentCreateModel from alipay.aop.api.domain.AlipayCommerceEcDepartmentDeleteModel import AlipayCommerceEcDepartmentDeleteModel from alipay.aop.api.response.AlipayCommerceEcDepartmentCreateResponse import AlipayCommerceEcDepartmentCreateResponse from alipay.aop.api.response.AlipayCommerceEcDepartmentDeleteResponse import AlipayCommerceEcDepartmentDeleteResponse from alipay.aop.api.request.AlipayCommerceEcDepartmentCreateRequest import AlipayCommerceEcDepartmentCreateRequest from alipay.aop.api.request.AlipayCommerceEcDepartmentDeleteRequest import AlipayCommerceEcDepartmentDeleteRequest from alipay.aop.api.request.AlipayCommerceEcDepartmentInfoModifyRequest import AlipayCommerceEcDepartmentInfoModifyRequest from alipay.aop.api.request.AlipayCommerceEcDepartmentInfoQueryRequest import AlipayCommerceEcDepartmentInfoQueryRequest from alipay.aop.api.request.AlipayCommerceEcDepartmentSublistQueryRequest import AlipayCommerceEcDepartmentSublistQueryRequest from app.api.v1.module_system.auth.schema import AuthSchema from app.core.alipay import AlipayClient from app.core.logger import log from app.core.exceptions import CustomException from .crud import DepartmentCRUD from .schema import ( DepartmentCreateSchema, DepartmentUpdateSchema, DepartmentDetailOutSchema, DepartmentOperationOutSchema ) class DepartmentService: """部门服务""" @classmethod async def create_department_service( cls, auth: AuthSchema, data: DepartmentCreateSchema ) -> DepartmentOperationOutSchema: """创建部门""" # 调用支付宝接口创建部门 department_create_model = AlipayCommerceEcDepartmentCreateModel() department_create_model.enterprise_id = data.enterprise_id department_create_model.department_name = data.department_name if data.department_code: department_create_model.department_code = data.department_code if data.parent_department_id: department_create_model.parent_department_id = data.parent_department_id or "-1" client = AlipayClient.get_client() request = AlipayCommerceEcDepartmentCreateRequest() request.biz_model = department_create_model response = client.execute(request) if not response: raise CustomException(msg="创建部门失败: 无响应") result = AlipayCommerceEcDepartmentCreateResponse() result.parse_response_content(response) if not result.is_success(): log.error(f"支付宝接口调用失败: {result.code} - {result.msg}") raise CustomException(msg=f"创建部门失败: {result.sub_code or result.msg or result.code}") # 保存到数据库 crud = DepartmentCRUD(auth) department_data = { "department_id": result.department_id or "", "department_name": data.department_name, "department_code": data.department_code, "parent_department_id": data.parent_department_id or "-1", "enterprise_id": data.enterprise_id, "sort_order": data.sort_order, "leader_employee_id": data.leader_employee_id, "leader_employee_name": data.leader_employee_name } await crud.create(department_data) return DepartmentOperationOutSchema( department_id=result.department_id or "", department_name=data.department_name ) @classmethod async def delete_department_service( cls, auth: AuthSchema, department_id: str, enterprise_id: str ) -> DepartmentOperationOutSchema: return DepartmentOperationOutSchema( department_id=department_id, department_name="" ) @classmethod async def update_department_service( cls, auth: AuthSchema, department_id: str, enterprise_id: str, data: DepartmentUpdateSchema ) -> DepartmentOperationOutSchema: """更新部门""" return DepartmentOperationOutSchema( department_id=department_id, department_name="" ) @classmethod async def get_department_service( cls, auth: AuthSchema, department_id: str, enterprise_id: str ) -> DepartmentDetailOutSchema: """查询部门详情""" return DepartmentDetailOutSchema( department=None, sub_departments=[] ) @classmethod async def get_sub_departments_service( cls, auth: AuthSchema, parent_department_id: str, enterprise_id: str ) -> List[str]: """查询子部门列表""" return [] @classmethod async def list_service( cls, auth: AuthSchema, page_no: int, page_size: int, search: Dict[str, Any] ) -> Dict[str, Any]: """分页查询部门列表""" return {} @classmethod async def get_department_tree_service( cls, auth: AuthSchema, enterprise_id: str ) -> List[Dict[str, Any]]: """获取部门树形结构""" return []