| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453 |
- import json
- from fastapi import UploadFile
- from redis.asyncio.client import Redis
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.common.enums import RedisInitKeyConfig
- from app.core.base_schema import UploadResponseSchema
- from app.core.database import async_db_session
- from app.core.exceptions import CustomException
- from app.core.logger import log
- from app.core.redis_crud import RedisCURD
- from app.utils.excel_util import ExcelUtil
- from app.utils.upload_util import UploadUtil
- from .crud import ParamsCRUD
- from .schema import (
- ParamsCreateSchema,
- ParamsOutSchema,
- ParamsQueryParam,
- ParamsUpdateSchema,
- )
- class ParamsService:
- """
- 配置管理模块服务层
- """
- @classmethod
- async def get_obj_detail_service(cls, auth: AuthSchema, id: int) -> dict:
- """
- 获取配置详情
- 参数:
- - auth (AuthSchema): 认证信息模型
- - id (int): 配置管理型ID
- 返回:
- - dict: 配置管理型模型实例字典表示
- """
- obj = await ParamsCRUD(auth).get_obj_by_id_crud(id=id)
- return ParamsOutSchema.model_validate(obj).model_dump()
- @classmethod
- async def get_obj_by_key_service(cls, auth: AuthSchema, config_key: str) -> dict:
- """
- 根据配置键获取配置详情
- 参数:
- - auth (AuthSchema): 认证信息模型
- - config_key (str): 配置管理型key
- 返回:
- - Dict: 配置管理型模型实例字典表示
- """
- obj = await ParamsCRUD(auth).get_obj_by_key_crud(key=config_key)
- if not obj:
- raise CustomException(msg=f"配置键 {config_key} 不存在")
- return ParamsOutSchema.model_validate(obj).model_dump()
- @classmethod
- async def get_config_value_by_key_service(cls, auth: AuthSchema, config_key: str) -> str | None:
- """
- 根据配置键获取配置值
- 参数:
- - auth (AuthSchema): 认证信息模型
- - config_key (str): 配置管理型key
- 返回:
- - str | None: 配置值字符串或None
- """
- obj = await ParamsCRUD(auth).get_obj_by_key_crud(key=config_key)
- if not obj:
- raise CustomException(msg=f"配置键 {config_key} 不存在")
- return obj.config_value
- @classmethod
- async def get_obj_list_service(
- cls,
- auth: AuthSchema,
- search: ParamsQueryParam | None = None,
- order_by: list[dict] | None = None,
- ) -> list[dict]:
- """
- 获取配置管理型列表
- 参数:
- - auth (AuthSchema): 认证信息模型
- - search (ParamsQueryParam | None): 查询参数对象
- - order_by (list[dict] | None): 排序参数列表
- 返回:
- - list[dict]: 配置管理型模型实例字典列表表示
- """
- obj_list = None
- if search:
- obj_list = await ParamsCRUD(auth).get_obj_list_crud(
- search=search.__dict__, order_by=order_by
- )
- else:
- obj_list = await ParamsCRUD(auth).get_obj_list_crud()
- return [ParamsOutSchema.model_validate(obj).model_dump() for obj in obj_list]
- @classmethod
- async def get_obj_page_service(
- cls,
- auth: AuthSchema,
- page_no: int,
- page_size: int,
- search: ParamsQueryParam | None = None,
- order_by: list[dict[str, str]] | None = None,
- ) -> dict:
- """
- 分页查询系统参数(数据库 OFFSET/LIMIT)。
- 参数:
- - auth (AuthSchema): 认证信息模型
- - page_no (int): 页码(从 1 开始)
- - page_size (int): 每页条数
- - search (ParamsQueryParam | None): 查询条件
- - order_by (list[dict[str, str]] | None): 排序字段列表
- 返回:
- - dict: 分页结果(结构由 `CRUD.page` 返回约定)
- """
- offset = (page_no - 1) * page_size
- return await ParamsCRUD(auth).page(
- offset=offset,
- limit=page_size,
- order_by=order_by or [{"id": "asc"}],
- search=search.__dict__ if search else {},
- out_schema=ParamsOutSchema,
- )
- @classmethod
- async def create_obj_service(
- cls, auth: AuthSchema, redis: Redis, data: ParamsCreateSchema
- ) -> dict:
- """
- 创建配置管理型
- 参数:
- - auth (AuthSchema): 认证信息模型
- - redis (Redis): Redis 客户端实例
- - data (ParamsCreateSchema): 配置管理型创建模型
- 返回:
- - dict: 新创建的配置管理型模型实例字典表示
- """
- exist_obj = await ParamsCRUD(auth).get(config_key=data.config_key)
- if exist_obj:
- raise CustomException(msg="创建失败,该配置key已存在")
- obj = await ParamsCRUD(auth).create_obj_crud(data=data)
- new_obj_dict = ParamsOutSchema.model_validate(obj).model_dump()
- # 同步redis
- redis_key = f"{RedisInitKeyConfig.SYSTEM_CONFIG.key}:{data.config_key}"
- try:
- result = await RedisCURD(redis).set(
- key=redis_key,
- value="",
- )
- if not result:
- log.error(f"同步配置到缓存失败: {new_obj_dict}")
- raise CustomException(msg="同步配置到缓存失败")
- except Exception as e:
- log.error(f"创建字典类型失败: {e}")
- raise CustomException(msg=f"创建字典类型失败 {e}")
- return new_obj_dict
- @classmethod
- async def update_obj_service(
- cls, auth: AuthSchema, redis: Redis, id: int, data: ParamsUpdateSchema
- ) -> dict:
- """
- 更新配置管理型
- 参数:
- - auth (AuthSchema): 认证信息模型
- - redis (Redis): Redis 客户端实例
- - id (int): 配置管理型ID
- - data (ParamsUpdateSchema): 配置管理型更新模型
- 返回:
- - Dict: 更新后的配置管理型模型实例字典表示
- """
- exist_obj = await ParamsCRUD(auth).get_obj_by_id_crud(id=id)
- if not exist_obj:
- raise CustomException(msg="更新失败,该数系统配置不存在")
- if exist_obj.config_key != data.config_key:
- raise CustomException(msg="更新失败,系统配置key不允许修改")
- new_obj = await ParamsCRUD(auth).update_obj_crud(id=id, data=data)
- if not new_obj:
- raise CustomException(msg="更新失败,系统配置不存在")
- out = ParamsOutSchema.model_validate(new_obj)
- new_obj_dict = out.model_dump()
- redis_payload = out.model_dump(mode="json")
- # 同步redis
- redis_key = f"{RedisInitKeyConfig.SYSTEM_CONFIG.key}:{new_obj.config_key}"
- try:
- value = json.dumps(redis_payload, ensure_ascii=False)
- result = await RedisCURD(redis).set(
- key=redis_key,
- value=value,
- )
- if not result:
- log.error(f"同步配置到缓存失败: {new_obj_dict}")
- raise CustomException(msg="同步配置到缓存失败")
- except Exception as e:
- log.error(f"更新系统配置失败: {e}")
- raise CustomException(msg="更新系统配置失败")
- return new_obj_dict
- @classmethod
- async def delete_obj_service(cls, auth: AuthSchema, redis: Redis, ids: list[int]) -> None:
- """
- 删除配置管理型
- 参数:
- - auth (AuthSchema): 认证信息模型
- - redis (Redis): Redis 客户端实例
- - ids (list[int]): 配置管理型ID列表
- 返回:
- - None
- """
- if len(ids) < 1:
- raise CustomException(msg="删除失败,删除对象不能为空")
- for id in ids:
- exist_obj = await ParamsCRUD(auth).get_obj_by_id_crud(id=id)
- if not exist_obj:
- raise CustomException(msg="删除失败,该数据字典类型不存在")
- # 检查是否是否初始化类型
- if exist_obj.config_type:
- # 如果有字典数据,不能删除
- raise CustomException(
- msg=f"{exist_obj.config_name} 删除失败,系统初始化配置不可以删除"
- )
- await ParamsCRUD(auth).delete_obj_crud(ids=ids)
- # 同步删除Redis缓存
- for id in ids:
- exist_obj = await ParamsCRUD(auth).get_obj_by_id_crud(id=id)
- if not exist_obj:
- continue
- redis_key = f"{RedisInitKeyConfig.SYSTEM_CONFIG.key}:{exist_obj.config_key}"
- try:
- await RedisCURD(redis).delete(redis_key)
- log.info(f"删除系统配置成功: {id}")
- except Exception as e:
- log.error(f"删除系统配置失败: {e}")
- raise CustomException(msg="删除字典类型失败")
- @classmethod
- async def export_obj_service(cls, data_list: list[dict]) -> bytes:
- """
- 导出系统配置列表
- 参数:
- - data_list (list[dict]): 系统配置模型实例字典列表表示
- 返回:
- - bytes: Excel文件二进制数据
- """
- mapping_dict = {
- "id": "编号",
- "config_name": "参数名称",
- "config_key": "参数键名",
- "config_value": "参数键值",
- "config_type": "系统内置((True:是 False:否))",
- "description": "备注",
- "created_time": "创建时间",
- "updated_time": "更新时间",
- "created_id": "创建者ID",
- "updated_id": "更新者ID",
- }
- # 复制数据并转换状态
- data = data_list.copy()
- for item in data:
- # 处理状态
- item["config_type"] = "是" if item.get("config_type") else "否"
- item["creator"] = (
- item.get("creator", {}).get("name", "未知")
- if isinstance(item.get("creator"), dict)
- else "未知"
- )
- return ExcelUtil.export_list2excel(list_data=data, mapping_dict=mapping_dict)
- @classmethod
- async def upload_service(cls, base_url: str, file: UploadFile) -> dict:
- """
- 上传文件
- 参数:
- - base_url (str): 基础URL
- - file (UploadFile): 上传的文件对象
- 返回:
- - dict: 上传文件的响应模型实例字典表示
- """
- filename, filepath, file_url = await UploadUtil.upload_file(file=file, base_url=base_url)
- return UploadResponseSchema(
- file_path=f"{filepath}",
- file_name=filename,
- origin_name=file.filename,
- file_url=f"{file_url}",
- ).model_dump()
- @classmethod
- async def init_config_service(cls, redis: Redis) -> None:
- """
- 初始化系统配置
- 参数:
- - redis (Redis): Redis 客户端实例
- 返回:
- - None
- """
- async with async_db_session() as session:
- async with session.begin():
- # 在初始化过程中,不需要检查数据权限
- auth = AuthSchema(db=session, check_data_scope=False)
- config_obj = await ParamsCRUD(auth).get_obj_list_crud()
- if not config_obj:
- raise CustomException(msg="系统配置不存在")
- try:
- # 保存到Redis并设置过期时间
- for config in config_obj:
- redis_key = f"{RedisInitKeyConfig.SYSTEM_CONFIG.key}:{config.config_key}"
- out = ParamsOutSchema.model_validate(config)
- config_obj_dict = out.model_dump()
- redis_payload = out.model_dump(mode="json")
- value = json.dumps(redis_payload, ensure_ascii=False)
- result = await RedisCURD(redis).set(
- key=redis_key,
- value=value,
- )
- if not result:
- log.error(f"❌️ 初始化系统配置失败: {config_obj_dict}")
- raise CustomException(msg="初始化系统配置失败")
- except Exception as e:
- log.error(f"❌️ 初始化系统配置失败: {e}")
- raise CustomException(msg="初始化系统配置失败")
- @classmethod
- async def get_init_config_service(cls, redis: Redis) -> list[dict]:
- """
- 获取系统配置
- 参数:
- - redis (Redis): Redis 客户端实例
- 返回:
- - list[dict]: 系统配置模型实例字典列表表示
- """
- redis_keys = await RedisCURD(redis).get_keys(f"{RedisInitKeyConfig.SYSTEM_CONFIG.key}:*")
- redis_configs = await RedisCURD(redis).mget(redis_keys)
- configs = []
- for config in redis_configs:
- if not config:
- continue
- try:
- new_config = json.loads(config)
- configs.append(new_config)
- except Exception as e:
- log.error(f"解析系统配置数据失败: {e}")
- continue
- return configs
- @classmethod
- async def get_system_config_for_middleware(cls, redis: Redis) -> dict:
- """
- 获取中间件所需的系统配置
- 参数:
- - redis (Redis): Redis 客户端实例
- 返回:
- - dict: 包含演示模式、IP白名单、API白名单和IP黑名单的配置字典
- """
- # 定义需要获取的配置键
- config_keys = [
- f"{RedisInitKeyConfig.SYSTEM_CONFIG.key}:demo_enable",
- f"{RedisInitKeyConfig.SYSTEM_CONFIG.key}:ip_white_list",
- f"{RedisInitKeyConfig.SYSTEM_CONFIG.key}:white_api_list_path",
- f"{RedisInitKeyConfig.SYSTEM_CONFIG.key}:ip_black_list",
- ]
- # 批量获取配置
- config_values = await RedisCURD(redis).mget(config_keys)
- # 初始化默认配置
- config_result = {
- "demo_enable": False,
- "ip_white_list": [],
- "white_api_list_path": [],
- "ip_black_list": [],
- }
- # 解析演示模式配置
- if config_values[0]:
- try:
- demo_config = json.loads(config_values[0])
- config_result["demo_enable"] = (
- demo_config.get("config_value", False)
- if isinstance(demo_config, dict)
- else False
- )
- except json.JSONDecodeError:
- log.error("解析演示模式配置失败")
- # 解析IP白名单配置
- if config_values[1]:
- try:
- ip_white_config = json.loads(config_values[1])
- # 确保是列表类型
- config_result["ip_white_list"] = json.loads(ip_white_config.get("config_value", []))
- except json.JSONDecodeError:
- log.error("解析IP白名单配置失败")
- # 解析IP黑名单
- # 解析API路径白名单
- if config_values[2]:
- try:
- white_api_config = json.loads(config_values[2])
- # 确保是列表类型
- config_result["white_api_list_path"] = json.loads(
- white_api_config.get("config_value", [])
- )
- except json.JSONDecodeError:
- log.error("解析API白名单配置失败")
- # 解析IP黑名单
- if config_values[3]:
- try:
- black_ip_config = json.loads(config_values[3])
- # 确保是列表类型
- config_result["ip_black_list"] = json.loads(black_ip_config.get("config_value", []))
- except json.JSONDecodeError:
- log.error("解析IP黑名单配置失败")
- return config_result
|