validator.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. import re
  2. from datetime import date, datetime, time
  3. from typing import Annotated, Any
  4. from pydantic import AfterValidator, PlainSerializer, WithJsonSchema
  5. from app.common.constant import DATE_DISPLAY_FMT, DATETIME_DISPLAY_FMT, RET, TIME_DISPLAY_FMT
  6. from app.core.exceptions import CustomException
  7. # 自定义日期时间字符串类型
  8. DateTimeStr = Annotated[
  9. datetime,
  10. AfterValidator(lambda x: datetime_validator(x)),
  11. PlainSerializer(
  12. lambda x: x.strftime(DATETIME_DISPLAY_FMT) if isinstance(x, datetime) else str(x),
  13. return_type=str,
  14. when_used="json",
  15. ),
  16. WithJsonSchema({"type": "string"}, mode="serialization"),
  17. ]
  18. # 自定义日期字符串类型
  19. DateStr = Annotated[
  20. date,
  21. AfterValidator(lambda x: date_validator(x)),
  22. PlainSerializer(
  23. lambda x: x.strftime(DATE_DISPLAY_FMT) if isinstance(x, date) else str(x),
  24. return_type=str,
  25. when_used="json",
  26. ),
  27. WithJsonSchema({"type": "string"}, mode="serialization"),
  28. ]
  29. # 自定义时间字符串类型
  30. TimeStr = Annotated[
  31. time,
  32. AfterValidator(lambda x: time_validator(x)),
  33. PlainSerializer(
  34. lambda x: x.strftime(TIME_DISPLAY_FMT) if isinstance(x, time) else str(x),
  35. return_type=str,
  36. when_used="json",
  37. ),
  38. WithJsonSchema({"type": "string"}, mode="serialization"),
  39. ]
  40. # 自定义手机号类型
  41. Telephone = Annotated[
  42. str,
  43. AfterValidator(lambda x: mobile_validator(x)),
  44. PlainSerializer(lambda x: x, return_type=str),
  45. WithJsonSchema({"type": "string"}, mode="serialization"),
  46. ]
  47. # 自定义邮箱类型
  48. Email = Annotated[
  49. str,
  50. AfterValidator(lambda x: email_validator(x)),
  51. PlainSerializer(lambda x: x, return_type=str),
  52. WithJsonSchema({"type": "string"}, mode="serialization"),
  53. ]
  54. def datetime_validator(value: str | datetime) -> datetime:
  55. """
  56. 日期格式验证器。
  57. 参数:
  58. - value (str | datetime): 日期值。
  59. 返回:
  60. - datetime: 格式化后的日期。
  61. 异常:
  62. - CustomException: 日期格式无效时抛出。
  63. """
  64. try:
  65. if isinstance(value, str):
  66. return datetime.strptime(value, DATETIME_DISPLAY_FMT)
  67. if isinstance(value, datetime):
  68. return value
  69. except Exception:
  70. raise CustomException(code=RET.ERROR.code, msg="无效的日期格式")
  71. def date_validator(value: str | date) -> date:
  72. """
  73. 日期格式验证器。
  74. 参数:
  75. - value (str | date): 日期值。
  76. 返回:
  77. - date: 格式化后的日期。
  78. 异常:
  79. - CustomException: 日期格式无效时抛出。
  80. """
  81. try:
  82. if isinstance(value, str):
  83. return datetime.strptime(value, DATE_DISPLAY_FMT).date()
  84. if isinstance(value, date):
  85. return value
  86. except Exception:
  87. raise CustomException(code=RET.ERROR.code, msg="无效的日期格式")
  88. def time_validator(value: str | time) -> time:
  89. """
  90. 时间格式验证器。
  91. 参数:
  92. - value (str | time): 时间值。
  93. 返回:
  94. - time: 格式化后的时间。
  95. 异常:
  96. - CustomException: 时间格式无效时抛出。
  97. """
  98. try:
  99. if isinstance(value, str):
  100. return datetime.strptime(value, TIME_DISPLAY_FMT).time()
  101. if isinstance(value, time):
  102. return value
  103. except Exception:
  104. raise CustomException(code=RET.ERROR.code, msg="无效的时间格式")
  105. def email_validator(value: str) -> str:
  106. """
  107. 邮箱地址验证器。
  108. 参数:
  109. - value (str): 邮箱地址。
  110. 返回:
  111. - str: 验证后的邮箱地址。
  112. 异常:
  113. - CustomException: 邮箱格式无效时抛出。
  114. """
  115. if not value:
  116. raise CustomException(code=RET.ERROR.code, msg="邮箱地址不能为空")
  117. regex = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
  118. if not re.match(regex, value):
  119. raise CustomException(code=RET.ERROR.code, msg="邮箱地址格式不正确")
  120. return value
  121. def mobile_validator(value: str | None) -> str | None:
  122. """
  123. 手机号验证器。
  124. 参数:
  125. - value (str | None): 手机号。
  126. 返回:
  127. - str | None: 验证后的手机号。
  128. 异常:
  129. - CustomException: 手机号格式无效时抛出。
  130. """
  131. if not value:
  132. return value
  133. if len(value) != 11 or not value.isdigit():
  134. raise CustomException(code=RET.ERROR.code, msg="手机号格式不正确")
  135. regex = r"^1(3\d|4[4-9]|5[0-35-9]|6[67]|7[013-8]|8[0-9]|9[0-9])\d{8}$"
  136. if not re.match(regex, value):
  137. raise CustomException(code=RET.ERROR.code, msg="手机号格式不正确")
  138. return value
  139. def validate_required_code(value: str | None) -> str:
  140. """
  141. 必填编码校验:字母开头,总长 2–16,仅含字母/数字/下划线。
  142. 参数:
  143. - value (str | None): 编码。
  144. 返回:
  145. - str: 去空白后的编码。
  146. 异常:
  147. - ValueError: 为空或格式不合法。
  148. """
  149. if value is None or not str(value).strip():
  150. raise ValueError("编码不能为空")
  151. v = value.strip()
  152. if not re.match(r"^[A-Za-z][A-Za-z0-9_]{1,15}$", v):
  153. raise ValueError("编码需字母开头,允许字母/数字/下划线,长度2-16")
  154. return v
  155. def code_validator(value: str | None) -> str | None:
  156. """
  157. 可选编码验证器(为空则跳过)。
  158. 参数:
  159. - value (str | None): 编码。
  160. 返回:
  161. - str | None: 验证后的编码;未填写时返回 None。
  162. 异常:
  163. - CustomException: 已填写但格式无效时抛出。
  164. """
  165. if not value:
  166. return value
  167. v = value.strip()
  168. if not v:
  169. return None
  170. if not re.match(r"^[A-Za-z][A-Za-z0-9_]{1,15}$", v):
  171. raise CustomException(
  172. code=RET.ERROR.code,
  173. msg="编码需字母开头,允许字母/数字/下划线,长度2-16",
  174. )
  175. return v
  176. def menu_request_validator(data: Any) -> Any:
  177. """
  178. 菜单请求数据验证器。
  179. 参数:
  180. - data (Any): 请求数据。
  181. 返回:
  182. - Any: 验证后的请求数据。
  183. 异常:
  184. - CustomException: 请求数据无效时抛出。
  185. """
  186. menu_types = {1: "目录", 2: "功能", 3: "权限", 4: "外链"}
  187. if data.type not in menu_types:
  188. raise CustomException(
  189. code=RET.ERROR.code,
  190. msg=f"菜单类型必须为: {','.join(map(str, menu_types.keys()))}",
  191. )
  192. if data.type in [1, 2]:
  193. if not data.route_name:
  194. raise CustomException(code=RET.ERROR.code, msg="路由名称不能为空")
  195. if not data.route_path:
  196. raise CustomException(code=RET.ERROR.code, msg="路由路径不能为空")
  197. if data.type == 1 and not (data.redirect and str(data.redirect).strip()):
  198. raise CustomException(code=RET.ERROR.code, msg="目录类型必须填写重定向地址")
  199. if data.type == 2 and not data.component_path:
  200. raise CustomException(code=RET.ERROR.code, msg="组件路径不能为空")
  201. return data
  202. def role_permission_request_validator(data: Any) -> Any:
  203. """
  204. 角色权限设置数据验证器。
  205. 参数:
  206. - data (Any): 请求数据。
  207. 返回:
  208. - Any: 验证后的请求数据。
  209. 异常:
  210. - CustomException: 请求数据无效时抛出。
  211. """
  212. data_scopes = {
  213. 1: "仅本人数据权限",
  214. 2: "本部门数据权限",
  215. 3: "本部门及以下数据权限",
  216. 4: "全部数据权限",
  217. 5: "自定义数据权限",
  218. }
  219. if data.data_scope not in data_scopes:
  220. raise CustomException(
  221. code=RET.ERROR.code,
  222. msg=f"数据权限范围必须为: {','.join(map(str, data_scopes.keys()))}",
  223. )
  224. if not data.role_ids:
  225. raise CustomException(code=RET.ERROR.code, msg="角色不能为空")
  226. return data