service.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. import io
  2. from typing import Any
  3. import pandas as pd
  4. from fastapi import UploadFile
  5. from app.api.v1.module_system.auth.schema import AuthSchema
  6. from app.core.base_schema import BatchSetAvailable
  7. from app.core.exceptions import CustomException
  8. from app.core.logger import log
  9. from app.utils.excel_util import ExcelUtil
  10. from .crud import Demo01CRUD
  11. from .schema import (
  12. Demo01CreateSchema,
  13. Demo01OutSchema,
  14. Demo01QueryParam,
  15. Demo01UpdateSchema,
  16. )
  17. class Demo01Service:
  18. """
  19. 示例管理模块服务层
  20. """
  21. @classmethod
  22. async def detail_service(cls, auth: AuthSchema, id: int) -> dict:
  23. """
  24. 详情
  25. 参数:
  26. - auth (AuthSchema): 认证信息模型
  27. - id (int): 示例ID
  28. 返回:
  29. - dict: 示例模型实例字典
  30. """
  31. obj = await Demo01CRUD(auth).get_by_id_crud(id=id)
  32. if not obj:
  33. raise CustomException(msg="该数据不存在")
  34. return Demo01OutSchema.model_validate(obj).model_dump()
  35. @classmethod
  36. async def list_service(
  37. cls,
  38. auth: AuthSchema,
  39. search: Demo01QueryParam | None = None,
  40. order_by: list[dict[str, str]] | None = None,
  41. ) -> list[dict]:
  42. """
  43. 列表查询
  44. 参数:
  45. - auth (AuthSchema): 认证信息模型
  46. - search (Demo01QueryParam | None): 查询参数
  47. - order_by (list[dict[str, str]] | None): 排序参数
  48. 返回:
  49. - list[dict]: 示例模型实例字典列表
  50. """
  51. search_dict = search.__dict__ if search else None
  52. obj_list = await Demo01CRUD(auth).list_crud(search=search_dict, order_by=order_by)
  53. return [Demo01OutSchema.model_validate(obj).model_dump() for obj in obj_list]
  54. @classmethod
  55. async def page_service(
  56. cls,
  57. auth: AuthSchema,
  58. page_no: int,
  59. page_size: int,
  60. search: Demo01QueryParam | None = None,
  61. order_by: list[dict[str, str]] | None = None,
  62. ) -> dict:
  63. """
  64. 分页查询
  65. 参数:
  66. - auth (AuthSchema): 认证信息模型
  67. - page_no (int): 页码
  68. - page_size (int): 每页数量
  69. - search (Demo01QueryParam | None): 查询参数
  70. - order_by (list[dict[str, str]] | None): 排序参数
  71. 返回:
  72. - dict: 分页数据
  73. """
  74. search_dict = search.__dict__ if search else {}
  75. order_by_list = order_by or [{"id": "asc"}]
  76. offset = (page_no - 1) * page_size
  77. result = await Demo01CRUD(auth).page_crud(
  78. offset=offset,
  79. limit=page_size,
  80. order_by=order_by_list,
  81. search=search_dict,
  82. )
  83. return result
  84. @classmethod
  85. async def create_service(cls, auth: AuthSchema, data: Demo01CreateSchema) -> dict:
  86. """
  87. 创建
  88. 参数:
  89. - auth (AuthSchema): 认证信息模型
  90. - data (Demo01CreateSchema): 示例创建模型
  91. 返回:
  92. - dict: 示例模型实例字典
  93. """
  94. obj = await Demo01CRUD(auth).get(name=data.name)
  95. if obj:
  96. raise CustomException(msg="创建失败,名称已存在")
  97. obj = await Demo01CRUD(auth).create_crud(data=data)
  98. return Demo01OutSchema.model_validate(obj).model_dump()
  99. @classmethod
  100. async def update_service(cls, auth: AuthSchema, id: int, data: Demo01UpdateSchema) -> dict:
  101. """
  102. 更新
  103. 参数:
  104. - auth (AuthSchema): 认证信息模型
  105. - id (int): 示例ID
  106. - data (Demo01UpdateSchema): 示例更新模型
  107. 返回:
  108. - dict: 示例模型实例字典
  109. """
  110. # 检查数据是否存在
  111. obj = await Demo01CRUD(auth).get_by_id_crud(id=id)
  112. if not obj:
  113. raise CustomException(msg="更新失败,该数据不存在")
  114. # 检查名称是否重复
  115. exist_obj = await Demo01CRUD(auth).get(name=data.name)
  116. if exist_obj and exist_obj.id != id:
  117. raise CustomException(msg="更新失败,名称重复")
  118. obj = await Demo01CRUD(auth).update_crud(id=id, data=data)
  119. return Demo01OutSchema.model_validate(obj).model_dump()
  120. @classmethod
  121. async def delete_service(cls, auth: AuthSchema, ids: list[int]) -> None:
  122. """
  123. 删除
  124. 参数:
  125. - auth (AuthSchema): 认证信息模型
  126. - ids (list[int]): 示例ID列表
  127. 返回:
  128. - None
  129. """
  130. if len(ids) < 1:
  131. raise CustomException(msg="删除失败,删除对象不能为空")
  132. # 检查所有要删除的数据是否存在
  133. for id in ids:
  134. obj = await Demo01CRUD(auth).get_by_id_crud(id=id)
  135. if not obj:
  136. raise CustomException(msg=f"删除失败,ID为{id}的数据不存在")
  137. await Demo01CRUD(auth).delete_crud(ids=ids)
  138. @classmethod
  139. async def set_available_service(cls, auth: AuthSchema, data: BatchSetAvailable) -> None:
  140. """
  141. 批量设置状态
  142. 参数:
  143. - auth (AuthSchema): 认证信息模型
  144. - data (BatchSetAvailable): 批量设置状态模型
  145. 返回:
  146. - None
  147. """
  148. await Demo01CRUD(auth).set_available_crud(ids=data.ids, status=data.status)
  149. @classmethod
  150. async def batch_export_service(cls, obj_list: list[dict[str, Any]]) -> bytes:
  151. """
  152. 批量导出
  153. 参数:
  154. - obj_list (list[dict[str, Any]]): 示例模型实例字典列表
  155. 返回:
  156. - bytes: Excel文件字节流
  157. """
  158. mapping_dict = {
  159. "id": "编号",
  160. "name": "名称",
  161. "status": "状态",
  162. "description": "备注",
  163. "created_time": "创建时间",
  164. "updated_time": "更新时间",
  165. "created_id": "创建者",
  166. }
  167. # 复制数据并转换状态
  168. data = obj_list.copy()
  169. for item in data:
  170. # 处理状态
  171. item["status"] = "启用" if item.get("status") == "0" else "停用"
  172. # 处理创建者
  173. creator_info = item.get("created_id")
  174. if isinstance(creator_info, dict):
  175. item["created_id"] = creator_info.get("name", "未知")
  176. else:
  177. item["created_id"] = "未知"
  178. return ExcelUtil.export_list2excel(list_data=data, mapping_dict=mapping_dict)
  179. @classmethod
  180. async def batch_import_service(
  181. cls, auth: AuthSchema, file: UploadFile, update_support: bool = False
  182. ) -> str:
  183. """
  184. 批量导入
  185. 参数:
  186. - auth (AuthSchema): 认证信息模型
  187. - file (UploadFile): 上传的Excel文件
  188. - update_support (bool): 是否支持更新存在数据
  189. 返回:
  190. - str: 导入结果信息
  191. """
  192. header_dict = {"名称": "name", "状态": "status", "描述": "description"}
  193. try:
  194. # 读取Excel文件
  195. contents = await file.read()
  196. df = pd.read_excel(io.BytesIO(contents))
  197. await file.close()
  198. if df.empty:
  199. raise CustomException(msg="导入文件为空")
  200. # 检查表头是否完整
  201. missing_headers = [header for header in header_dict.keys() if header not in df.columns]
  202. if missing_headers:
  203. raise CustomException(msg=f"导入文件缺少必要的列: {', '.join(missing_headers)}")
  204. # 重命名列名
  205. df.rename(columns=header_dict, inplace=True)
  206. # 验证必填字段
  207. required_fields = ["name", "status"]
  208. errors = []
  209. for field in required_fields:
  210. missing_rows = df[df[field].isnull()].index.tolist()
  211. if missing_rows:
  212. field_name = next(k for k, v in header_dict.items() if v == field)
  213. rows_str = "、".join([str(i + 1) for i in missing_rows])
  214. errors.append(f"{field_name}不能为空,第{rows_str}行")
  215. if errors:
  216. raise CustomException(msg=f"导入失败,以下行缺少必要字段:\n{'; '.join(errors)}")
  217. error_msgs = []
  218. success_count = 0
  219. count = 0
  220. # 处理每一行数据
  221. for _index, row in df.iterrows():
  222. count += 1
  223. try:
  224. # 数据转换前的类型检查
  225. try:
  226. status = "0" if row["status"] == "正常" else "1"
  227. except ValueError:
  228. error_msgs.append(f"第{count}行: 状态必须是'正常'或'停用'")
  229. continue
  230. # 构建用户数据
  231. data = {
  232. "name": str(row["name"]),
  233. "status": status,
  234. "description": str(row["description"]),
  235. }
  236. # 处理用户导入
  237. exists_obj = await Demo01CRUD(auth).get(name=data["name"])
  238. if exists_obj:
  239. if update_support:
  240. await Demo01CRUD(auth).update(id=exists_obj.id, data=data)
  241. success_count += 1
  242. else:
  243. error_msgs.append(f"第{count}行: 对象 {data['name']} 已存在")
  244. else:
  245. await Demo01CRUD(auth).create(data=data)
  246. success_count += 1
  247. except Exception as e:
  248. error_msgs.append(f"第{count}行: {e!s}")
  249. continue
  250. # 返回详细的导入结果
  251. result = f"成功导入 {success_count} 条数据"
  252. if error_msgs:
  253. result += "\n错误信息:\n" + "\n".join(error_msgs)
  254. return result
  255. except Exception as e:
  256. log.error(f"批量导入用户失败: {e!s}")
  257. raise CustomException(msg=f"导入失败: {e!s}")
  258. @classmethod
  259. async def import_template_download_service(cls) -> bytes:
  260. """
  261. 下载导入模板
  262. 返回:
  263. - bytes: Excel文件字节流
  264. """
  265. header_list = ["名称", "状态", "描述"]
  266. selector_header_list = ["状态"]
  267. option_list = [{"状态": ["正常", "停用"]}]
  268. return ExcelUtil.get_excel_template(
  269. header_list=header_list,
  270. selector_header_list=selector_header_list,
  271. option_list=option_list,
  272. )