| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760 |
- import io
- from typing import Any
- import pandas as pd
- from fastapi import UploadFile
- from app.api.v1.module_system.auth.schema import AuthSchema, SmsCodeSchema
- from app.api.v1.module_system.dept.crud import DeptCRUD
- from app.api.v1.module_system.menu.crud import MenuCRUD
- from app.api.v1.module_system.menu.schema import MenuOutSchema
- from app.api.v1.module_system.position.crud import PositionCRUD
- from app.api.v1.module_system.role.crud import RoleCRUD
- from app.core.base_schema import BatchSetAvailable, UploadResponseSchema
- from app.core.exceptions import CustomException
- from app.core.logger import log
- from app.utils.common_util import traversal_to_tree
- from app.utils.excel_util import ExcelUtil
- from app.utils.hash_bcrpy_util import PwdUtil
- from app.utils.upload_util import UploadUtil
- from redis.asyncio.client import Redis
- from .crud import UserCRUD
- from .schema import (
- CurrentUserUpdateSchema,
- ResetPasswordSchema,
- UserChangePasswordSchema,
- UserCreateSchema,
- UserForgetPasswordSchema,
- UserOutSchema,
- UserQueryParam,
- UserRegisterSchema,
- UserUpdateSchema,
- )
- from ..auth.service import SmsCodeService
- from ..tenant.schema import TenantCreateSchema
- from ..tenant.service import TenantService
- class UserService:
- """用户模块服务层"""
- @classmethod
- async def get_detail_by_id_service(cls, auth: AuthSchema, id: int) -> dict:
- """
- 根据ID获取用户详情
- 参数:
- - auth (AuthSchema): 认证信息模型
- - id (int): 用户ID
- 返回:
- - dict: 用户详情字典
- """
- user = await UserCRUD(auth).get_by_id_crud(id=id)
- if not user:
- raise CustomException(msg="用户不存在")
- # 如果用户绑定了部门,则获取部门名称
- if user.dept_id:
- dept = await DeptCRUD(auth).get_by_id_crud(id=user.dept_id)
- UserOutSchema.dept_name = dept.name if dept else None
- else:
- UserOutSchema.dept_name = None
- return UserOutSchema.model_validate(user).model_dump()
- @classmethod
- async def get_user_list_service(
- cls,
- auth: AuthSchema,
- search: UserQueryParam | None = None,
- order_by: list[dict[str, str]] | None = None,
- ) -> list[dict]:
- """
- 获取用户列表
- 参数:
- - auth (AuthSchema): 认证信息模型
- - search (UserQueryParam | None): 查询参数对象。
- - order_by (list[dict[str, str]] | None): 排序参数列表。
- 返回:
- - list[dict]: 用户详情字典列表
- """
- user_list = await UserCRUD(auth).get_list_crud(search=search.__dict__, order_by=order_by)
- user_dict_list = []
- for user in user_list:
- user_dict = UserOutSchema.model_validate(user).model_dump()
- user_dict_list.append(user_dict)
- return user_dict_list
- @classmethod
- async def get_user_page_service(
- cls,
- auth: AuthSchema,
- page_no: int,
- page_size: int,
- search: UserQueryParam | None = None,
- order_by: list[dict[str, str]] | None = None,
- ) -> dict:
- """
- 分页查询用户(数据库 OFFSET/LIMIT)。
- 参数:
- - auth (AuthSchema): 认证信息模型
- - page_no (int): 页码(从 1 开始)
- - page_size (int): 每页条数
- - search (UserQueryParam | None): 查询条件
- - order_by (list[dict[str, str]] | None): 排序字段列表
- 返回:
- - dict: 分页结果(结构由 `CRUD.page` 返回约定)
- """
- offset = (page_no - 1) * page_size
- return await UserCRUD(auth).page(
- offset=offset,
- limit=page_size,
- order_by=order_by or [{"id": "asc"}],
- search=search.__dict__ if search else {},
- out_schema=UserOutSchema,
- )
- @classmethod
- async def create_user_service(cls, data: UserCreateSchema, auth: AuthSchema) -> dict:
- """
- 创建用户
- 参数:
- - data (UserCreateSchema): 用户创建信息
- - auth (AuthSchema): 认证信息模型
- 返回:
- - dict: 创建后的用户详情字典
- """
- if not data.username:
- raise CustomException(msg="用户名不能为空")
- # 检查是否试图创建超级管理员
- if data.is_superuser:
- raise CustomException(msg="不允许创建超级管理员")
- # 检查用户名是否存在
- user = await UserCRUD(auth).get_by_username_crud(username=data.username)
- if user:
- raise CustomException(msg="已存在相同用户名称的账号")
- # 检查部门是否存在
- if data.dept_id:
- dept = await DeptCRUD(auth).get_by_id_crud(id=data.dept_id)
- if not dept:
- raise CustomException(msg="部门不存在")
- # 创建用户
- if data.password:
- data.password = PwdUtil.set_password_hash(password=data.password)
- user_dict = data.model_dump(exclude_unset=True, exclude={"role_ids", "position_ids"})
- # 创建用户
- new_user = await UserCRUD(auth).create(data=user_dict)
- # 设置角色
- if data.role_ids and len(data.role_ids) > 0:
- await UserCRUD(auth).set_user_roles_crud(user_ids=[new_user.id], role_ids=data.role_ids)
- # 设置岗位
- if data.position_ids and len(data.position_ids) > 0:
- await UserCRUD(auth).set_user_positions_crud(
- user_ids=[new_user.id], position_ids=data.position_ids
- )
- new_user_dict = UserOutSchema.model_validate(new_user).model_dump()
- return new_user_dict
- @classmethod
- async def update_user_service(cls, id: int, data: UserUpdateSchema, auth: AuthSchema) -> dict:
- """
- 更新用户
- 参数:
- - id (int): 用户ID
- - data (UserUpdateSchema): 用户更新信息
- - auth (AuthSchema): 认证信息模型
- 返回:
- - Dict: 更新后的用户详情字典
- """
- if not data.username:
- raise CustomException(msg="账号不能为空")
- # 检查用户是否存在
- user = await UserCRUD(auth).get_by_id_crud(id=id)
- if not user:
- raise CustomException(msg="用户不存在")
- # 检查是否尝试修改超级管理员
- if user.is_superuser:
- raise CustomException(msg="超级管理员不允许修改")
- # 检查用户名是否重复
- exist_user = await UserCRUD(auth).get_by_username_crud(username=data.username)
- if exist_user and exist_user.id != id:
- raise CustomException(msg="已存在相同的账号")
- # 新增:检查手机号是否重复
- if data.mobile:
- exist_mobile_user = await UserCRUD(auth).get_by_mobile_crud(mobile=data.mobile)
- if exist_mobile_user and exist_mobile_user.id != id:
- raise CustomException(msg="更新失败,手机号已存在")
- # 新增:检查邮箱是否重复
- if data.email:
- exist_email_user = await UserCRUD(auth).get(email=data.email)
- if exist_email_user and exist_email_user.id != id:
- raise CustomException(msg="更新失败,邮箱已存在")
- # 检查部门是否存在且可用
- if data.dept_id:
- dept = await DeptCRUD(auth).get_by_id_crud(id=data.dept_id)
- if not dept:
- raise CustomException(msg="部门不存在")
- if dept.status == "1":
- raise CustomException(msg="部门已被禁用")
- # 更新用户 - 排除不应被修改的字段, 更新不更新密码
- user_dict = data.model_dump(
- exclude_unset=True,
- exclude={"role_ids", "position_ids", "last_login", "password"},
- )
- new_user = await UserCRUD(auth).update(id=id, data=user_dict)
- # 更新角色和岗位
- if data.role_ids and len(data.role_ids) > 0:
- # 检查角色是否都存在且可用
- roles = await RoleCRUD(auth).get_list_crud(search={"id": ("in", data.role_ids)})
- if len(roles) != len(data.role_ids):
- raise CustomException(msg="部分角色不存在")
- if not all(role.status for role in roles):
- raise CustomException(msg="部分角色已被禁用")
- await UserCRUD(auth).set_user_roles_crud(user_ids=[id], role_ids=data.role_ids)
- if data.position_ids and len(data.position_ids) > 0:
- # 检查岗位是否都存在且可用
- positions = await PositionCRUD(auth).get_list_crud(
- search={"id": ("in", data.position_ids)}
- )
- if len(positions) != len(data.position_ids):
- raise CustomException(msg="部分岗位不存在")
- if not all(position.status for position in positions):
- raise CustomException(msg="部分岗位已被禁用")
- await UserCRUD(auth).set_user_positions_crud(
- user_ids=[id], position_ids=data.position_ids
- )
- user_dict = UserOutSchema.model_validate(new_user).model_dump()
- return user_dict
- @classmethod
- async def delete_user_service(cls, auth: AuthSchema, ids: list[int]) -> None:
- """
- 删除用户
- 参数:
- - auth (AuthSchema): 认证信息模型
- - ids (list[int]): 用户ID列表
- 返回:
- - None
- """
- if len(ids) < 1:
- raise CustomException(msg="删除失败,删除对象不能为空")
- for id in ids:
- user = await UserCRUD(auth).get_by_id_crud(id=id)
- if not user:
- raise CustomException(msg="用户不存在")
- if user.is_superuser:
- raise CustomException(msg="超级管理员不能删除")
- if user.status == "0":
- raise CustomException(msg="用户已启用,不能删除")
- if auth.user and auth.user.id == id:
- raise CustomException(msg="不能删除当前登陆用户")
- # 删除用户角色关联数据
- await UserCRUD(auth).set_user_roles_crud(user_ids=ids, role_ids=[])
- # 删除用户岗位关联数据
- await UserCRUD(auth).set_user_positions_crud(user_ids=ids, position_ids=[])
- # 删除用户
- await UserCRUD(auth).delete(ids=ids)
- @classmethod
- async def get_current_user_info_service(cls, auth: AuthSchema) -> dict:
- """
- 获取当前用户信息
- 参数:
- - auth (AuthSchema): 认证信息模型
- 返回:
- - Dict: 当前用户详情字典
- """
- # 获取用户基本信息
- if not auth.user or not auth.user.id:
- raise CustomException(msg="用户不存在")
- user = await UserCRUD(auth).get_by_id_crud(id=auth.user.id)
- # 获取部门名称
- if user and user.dept:
- UserOutSchema.dept_name = user.dept.name
- user_dict = UserOutSchema.model_validate(user).model_dump()
- # 获取菜单权限
- if auth.user and auth.user.is_superuser:
- # 使用树形结构查询,预加载children关系
- menu_all = await MenuCRUD(auth).get_tree_list_crud(
- search={"type": ("in", [1, 2, 4]), "status": "0"},
- order_by=[{"order": "asc"}],
- )
- menus = [MenuOutSchema.model_validate(menu).model_dump() for menu in menu_all]
- else:
- # 收集用户所有角色的菜单ID,使用列表推导式优化代码
- menu_ids = {
- menu.id
- for role in auth.user.roles or []
- for menu in role.menus
- if menu.status == "0" and menu.type in [1, 2, 4]
- }
- # 使用树形结构查询,预加载children关系
- menus = (
- [
- MenuOutSchema.model_validate(menu).model_dump()
- for menu in await MenuCRUD(auth).get_tree_list_crud(
- search={"id": ("in", list(menu_ids))},
- order_by=[{"order": "asc"}],
- )
- ]
- if menu_ids
- else []
- )
- user_dict["menus"] = traversal_to_tree(menus)
- return user_dict
- @classmethod
- async def update_current_user_info_service(
- cls, auth: AuthSchema, data: CurrentUserUpdateSchema
- ) -> dict:
- """
- 更新当前用户信息
- 参数:
- - auth (AuthSchema): 认证信息模型
- - data (CurrentUserUpdateSchema): 当前用户更新信息
- 返回:
- - Dict: 更新后的当前用户详情字典
- """
- if not auth.user or not auth.user.id:
- raise CustomException(msg="用户不存在")
- user = await UserCRUD(auth).get_by_id_crud(id=auth.user.id)
- if not user:
- raise CustomException(msg="用户不存在")
- if user.is_superuser:
- raise CustomException(msg="超级管理员不能修改个人信息")
- # 新增:检查手机号是否重复
- if data.mobile:
- exist_mobile_user = await UserCRUD(auth).get_by_mobile_crud(mobile=data.mobile)
- if exist_mobile_user and exist_mobile_user.id != auth.user.id:
- raise CustomException(msg="更新失败,手机号已存在")
- # 新增:检查邮箱是否重复
- if data.email:
- exist_email_user = await UserCRUD(auth).get(email=data.email)
- if exist_email_user and exist_email_user.id != auth.user.id:
- raise CustomException(msg="更新失败,邮箱已存在")
- user_update_data = UserUpdateSchema(**data.model_dump())
- new_user = await UserCRUD(auth).update(id=auth.user.id, data=user_update_data)
- return UserOutSchema.model_validate(new_user).model_dump()
- @classmethod
- async def set_user_available_service(cls, auth: AuthSchema, data: BatchSetAvailable) -> None:
- """
- 设置用户状态
- 参数:
- - auth (AuthSchema): 认证信息模型
- - data (BatchSetAvailable): 批量设置用户状态数据
- 返回:
- - None
- """
- for id in data.ids:
- user = await UserCRUD(auth).get_by_id_crud(id=id)
- if not user:
- raise CustomException(msg=f"用户ID {id} 不存在")
- if user.is_superuser:
- raise CustomException(msg="超级管理员状态不能修改")
- await UserCRUD(auth).set_available_crud(ids=data.ids, status=data.status)
- @classmethod
- async def upload_avatar_service(cls, base_url: str, file: UploadFile) -> dict:
- """
- 上传用户头像
- 参数:
- - base_url (str): 基础URL
- - file (UploadFile): 上传的文件
- 返回:
- - Dict: 上传头像响应字典
- """
- filename, filepath, file_url = await UploadUtil.upload_file(file=file, base_url=base_url)
- return UploadResponseSchema(
- file_path=f"{filepath}",
- file_name=filename,
- origin_name=file.filename,
- file_url=f"{file_url}",
- ).model_dump()
- @classmethod
- async def change_user_password_service(
- cls, auth: AuthSchema, data: UserChangePasswordSchema
- ) -> dict:
- """
- 修改用户密码
- 参数:
- - auth (AuthSchema): 认证信息模型
- - data (UserChangePasswordSchema): 用户密码修改数据
- 返回:
- - Dict: 更新后的当前用户详情字典
- """
- if not auth.user or not auth.user.id:
- raise CustomException(msg="用户不存在")
- if not data.old_password or not data.new_password:
- raise CustomException(msg="密码不能为空")
- # 验证原密码
- user = await UserCRUD(auth).get_by_id_crud(id=auth.user.id)
- if not user:
- raise CustomException(msg="用户不存在")
- if not PwdUtil.verify_password(
- plain_password=data.old_password, password_hash=user.password
- ):
- raise CustomException(msg="原密码输入错误")
- # 更新密码
- new_password_hash = PwdUtil.set_password_hash(password=data.new_password)
- new_user = await UserCRUD(auth).change_password_crud(
- id=user.id, password_hash=new_password_hash
- )
- return UserOutSchema.model_validate(new_user).model_dump()
- @classmethod
- async def reset_user_password_service(cls, auth: AuthSchema, data: ResetPasswordSchema) -> dict:
- """
- 重置用户密码
- 参数:
- - auth (AuthSchema): 认证信息模型
- - data (ResetPasswordSchema): 用户密码重置数据
- 返回:
- - Dict: 更新后的当前用户详情字典
- """
- if not data.password:
- raise CustomException(msg="密码不能为空")
- # 验证用户
- user = await UserCRUD(auth).get_by_id_crud(id=data.id)
- if not user:
- raise CustomException(msg="用户不存在")
- # 检查是否是超级管理员
- if user.is_superuser:
- raise CustomException(msg="超级管理员密码不能重置")
- # 更新密码
- new_password_hash = PwdUtil.set_password_hash(password=data.password)
- new_user = await UserCRUD(auth).change_password_crud(
- id=data.id, password_hash=new_password_hash
- )
- return UserOutSchema.model_validate(new_user).model_dump()
- @classmethod
- async def register_user_service(cls, auth: AuthSchema, redis: Redis, data: UserRegisterSchema) -> dict:
- """
- 用户注册
- 参数:
- - auth (AuthSchema): 认证信息模型
- - data (UserRegisterSchema): 用户注册数据
- 返回:
- - Dict: 注册后的用户详情字典
- """
- # if not data.invite_code or data.invite_code != "8888":
- # raise CustomException("无效邀请码")
- # 检查用户名是否存在
- data.username = data.mobile
- username_ok = await UserCRUD(auth).get_by_mobile_crud(mobile=data.mobile)
- if username_ok:
- raise CustomException(msg="账号已存在")
- verify_result = await SmsCodeService.verify_sms_code_service(
- sms_code=SmsCodeSchema(
- mobile=data.mobile,
- template_name=data.template_name or "verify",
- code=data.sms_code,
- ),
- redis=redis
- )
- if not verify_result:
- raise CustomException("验证码过期或错误")
- tenant_data = TenantCreateSchema(
- name=data.mobile,
- code=data.mobile,
- )
- return await TenantService.create_service(auth=auth, data=tenant_data, password=data.password,)
- # data.password = PwdUtil.set_password_hash(password=data.password)
- # data.name = data.username
- # create_dict = data.model_dump(exclude_unset=True, exclude={"role_ids", "position_ids"})
- #
- # # 设置创建人ID
- # if auth.user and auth.user.id:
- # create_dict["created_id"] = auth.user.id
- #
- # result = await UserCRUD(auth).create(data=create_dict)
- # if data.role_ids:
- # await UserCRUD(auth).set_user_roles_crud(user_ids=[result.id], role_ids=data.role_ids)
- # return UserOutSchema.model_validate(result).model_dump()
- @classmethod
- async def forget_password_service(
- cls, auth: AuthSchema, data: UserForgetPasswordSchema
- ) -> dict:
- """
- 用户忘记密码
- 参数:
- - auth (AuthSchema): 认证信息模型
- - data (UserForgetPasswordSchema): 用户忘记密码数据
- 返回:
- - Dict: 更新后的当前用户详情字典
- """
- user = await UserCRUD(auth).get_by_username_crud(username=data.username)
- if not user:
- raise CustomException(msg="用户不存在")
- if user.status == "1":
- raise CustomException(msg="用户已停用")
- # 检查是否是超级管理员
- if user.is_superuser:
- raise CustomException(msg="超级管理员密码不能重置")
- new_password_hash = PwdUtil.set_password_hash(password=data.new_password)
- new_user = await UserCRUD(auth).forget_password_crud(
- id=user.id, password_hash=new_password_hash
- )
- return UserOutSchema.model_validate(new_user).model_dump()
- @classmethod
- async def batch_import_user_service(
- cls, auth: AuthSchema, file: UploadFile, update_support: bool = False
- ) -> str:
- """
- 批量导入用户
- 参数:
- - auth (AuthSchema): 认证信息模型
- - file (UploadFile): 上传的Excel文件
- - update_support (bool, optional): 是否支持更新已存在用户. 默认值为False.
- 返回:
- - str: 导入结果消息
- """
- header_dict = {
- "部门编号": "dept_id",
- "账号": "username",
- "昵称": "name",
- "邮箱": "email",
- "手机号": "mobile",
- "性别": "gender",
- "状态": "status",
- }
- try:
- # 读取Excel文件
- contents = await file.read()
- df = pd.read_excel(io.BytesIO(contents))
- await file.close()
- if df.empty:
- raise CustomException(msg="导入文件为空")
- # 检查表头是否完整
- missing_headers = [header for header in header_dict.keys() if header not in df.columns]
- if missing_headers:
- raise CustomException(msg=f"导入文件缺少必要的列: {', '.join(missing_headers)}")
- # 重命名列名
- df.rename(columns=header_dict, inplace=True)
- # 验证必填字段
- required_fields = ["username", "name", "dept_id"]
- errors = []
- for field in required_fields:
- missing_rows = df[df[field].isnull()].index.tolist()
- if missing_rows:
- field_name = next(k for k, v in header_dict.items() if v == field)
- rows_str = "、".join([str(i + 1) for i in missing_rows])
- errors.append(f"{field_name}不能为空,第{rows_str}行")
- if errors:
- raise CustomException(msg=";".join(errors))
- error_msgs = []
- success_count = 0
- count = 0
- # 处理每一行数据
- for _index, row in df.iterrows():
- try:
- count = count + 1
- # 数据转换
- gender = "1" if row["gender"] == "男" else ("2" if row["gender"] == "女" else "1")
- status = "0" if row["status"] == "正常" else "1"
- # 构建用户数据
- user_data = {
- "username": str(row["username"]).strip(),
- "name": str(row["name"]).strip(),
- "email": str(row["email"]).strip(),
- "mobile": str(row["mobile"]).strip(),
- "gender": gender,
- "status": status,
- "dept_id": int(row["dept_id"]),
- "password": PwdUtil.set_password_hash(password="123456"), # 设置默认密码
- }
- # 处理用户导入
- exists_user = await UserCRUD(auth).get_by_username_crud(
- username=user_data["username"]
- )
- if exists_user:
- # 检查是否是超级管理员
- if exists_user.is_superuser:
- error_msgs.append(f"第{count}行: 超级管理员不允许修改")
- continue
- if update_support:
- user_update_data = UserUpdateSchema(**user_data)
- await UserCRUD(auth).update(id=exists_user.id, data=user_update_data)
- success_count += 1
- else:
- error_msgs.append(f"第{count}行: 用户 {user_data['username']} 已存在")
- else:
- user_create_schema = UserCreateSchema(**user_data)
- user_create_data = user_create_schema.model_dump(
- exclude_unset=True, exclude={"role_ids", "position_ids"}
- )
- new_user = await UserCRUD(auth).create(data=user_create_data)
- if user_create_schema.role_ids and len(user_create_schema.role_ids) > 0:
- await UserCRUD(auth).set_user_roles_crud(
- user_ids=[new_user.id], role_ids=user_create_schema.role_ids
- )
- if user_create_schema.position_ids and len(user_create_schema.position_ids) > 0:
- await UserCRUD(auth).set_user_positions_crud(
- user_ids=[new_user.id], position_ids=user_create_schema.position_ids
- )
- success_count += 1
- except Exception as e:
- error_msgs.append(f"第{count}行: 异常{e!s}")
- continue
- # 返回详细的导入结果
- result = f"成功导入 {success_count} 条数据"
- if error_msgs:
- result += "\n错误信息:\n" + "\n".join(error_msgs)
- return result
- except Exception as e:
- log.error(f"批量导入用户失败: {e!s}")
- raise CustomException(msg=f"导入失败: {e!s}")
- @classmethod
- async def get_import_template_user_service(cls) -> bytes:
- """
- 获取用户导入模板
- 返回:
- - bytes: Excel文件字节流
- """
- header_list = [
- "部门编号",
- "账号",
- "昵称",
- "邮箱",
- "手机号",
- "性别",
- "状态",
- ]
- selector_header_list = ["性别", "状态"]
- option_list = [
- {"性别": ["男", "女", "未知"]},
- {"状态": ["正常", "停用"]},
- ]
- return ExcelUtil.get_excel_template(
- header_list=header_list,
- selector_header_list=selector_header_list,
- option_list=option_list,
- )
- @classmethod
- async def export_user_list_service(cls, user_list: list[dict[str, Any]]) -> bytes:
- """
- 导出用户列表为Excel文件
- 参数:
- - user_list (list[dict[str, Any]]): 用户列表
- 返回:
- - bytes: Excel文件字节流
- """
- if not user_list:
- raise CustomException(msg="没有数据可导出")
- # 定义字段映射
- mapping_dict = {
- "id": "用户编号",
- "avatar": "头像",
- "username": "用户名称",
- "name": "用户昵称",
- "dept_name": "部门",
- "email": "邮箱",
- "mobile": "手机号",
- "gender": "性别",
- "status": "状态",
- "is_superuser": "是否超级管理员",
- "last_login": "最后登录时间",
- "description": "备注",
- "created_time": "创建时间",
- "updated_time": "更新时间",
- "updated_id": "更新者ID",
- }
- # 复制数据并转换
- # creator = {'id': 1, 'name': '管理员', 'username': 'admin'}
- data = user_list.copy()
- for item in data:
- item["status"] = "启用" if item.get("status") == "0" else "停用"
- gender = item.get("gender")
- item["gender"] = "男" if gender == "1" else ("女" if gender == "2" else "未知")
- item["is_superuser"] = "是" if item.get("is_superuser") else "否"
- item["creator"] = (
- item.get("creator", {}).get("name", "未知")
- if isinstance(item.get("creator"), dict)
- else "未知"
- )
- return ExcelUtil.export_list2excel(list_data=data, mapping_dict=mapping_dict)
|