| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- from typing import Annotated, Optional
- from fastapi import APIRouter, Depends, Path, Query
- from fastapi.responses import JSONResponse
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.common.response import ResponseSchema, SuccessResponse
- from app.core.dependencies import AuthPermission, get_current_user
- from app.core.exceptions import CustomException
- from app.core.logger import log
- from app.core.router_class import OperationLogRoute
- from .crud import EmployeeCRUD
- from .schema import (
- EmployeeCreateOrUpdateSchema,
- EmployeeListOutSchema,
- EmployeeOperationOutSchema,
- EmployeeInviteQuerySchema,
- EmployeeInviteQueryOutSchema,
- )
- from .service import EmployeeService
- EmployeeRouter = APIRouter(
- route_class=OperationLogRoute,
- prefix="/employee",
- tags=["员工管理"],
- )
- @EmployeeRouter.get(
- "/info",
- summary="查询员工详情",
- description="查询员工详情",
- )
- async def info_employee_controller(
- auth: Annotated[AuthSchema, Depends(get_current_user)],
- enterprise_id: Annotated[Optional[str], Query(description="企业ID")] = None,
- employee_id: Annotated[Optional[str], Query(description="员工ID")] = None,
- employee_email: Annotated[Optional[str], Query(description="员工邮箱")] = None,
- employee_mobile: Annotated[Optional[str], Query(description="员工手机号")] = None,
- ) -> JSONResponse:
- """查询员工详情"""
- # 如果没传 enterprise_id,从当前用户推断
- if not enterprise_id:
- if not employee_mobile and auth.user and auth.user.mobile:
- employee_mobile = auth.user.mobile
-
- if employee_mobile:
- crud_cache = EmployeeCRUD(auth)
- employee = await crud_cache.get(employee_mobile=employee_mobile)
- if employee:
- enterprise_id = employee.enterprise_id
-
- if not enterprise_id:
- raise CustomException(msg="缺少企业ID参数")
-
- result = await EmployeeService.info_service(
- auth=auth,
- employee_id=employee_id,
- employee_email=employee_email,
- employee_mobile=employee_mobile,
- enterprise_id=enterprise_id
- )
- return SuccessResponse(data=result, msg="查询员工详情成功")
- @EmployeeRouter.post(
- "",
- summary="添加员工",
- description="添加员工 (alipay.commerce.ec.employee.add)",
- response_model=ResponseSchema[EmployeeOperationOutSchema],
- )
- async def add_employee_controller(
- data: EmployeeCreateOrUpdateSchema,
- auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:employee:create"]))],
- ) -> JSONResponse:
- """添加员工"""
- result = await EmployeeService.add_employee_service(auth=auth, data=data)
- log.info(f"添加员工成功: {data.employee_name}, employee_id={result.employee_id}")
- # 不触发额度联动:员工需完成签约激活后才享受费控制度
- # 由 webhook employee.activated 通知触发
- return SuccessResponse(data=result, msg="添加员工成功")
- @EmployeeRouter.get(
- "",
- summary="查询员工列表",
- description="分页查询员工列表",
- response_model=ResponseSchema[EmployeeListOutSchema],
- )
- async def list_employee_controller(
- auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:employee:list"]))],
- page_no: Annotated[int, Query(description="页码")] = 1,
- page_size: Annotated[int, Query(description="每页数量")] = 20,
- enterprise_id: Annotated[str | None, Query(description="企业ID")] = None,
- employee_name: Annotated[str | None, Query(description="员工姓名")] = None,
- employee_no: Annotated[str | None, Query(description="员工工号")] = None,
- status: Annotated[str | None, Query(description="状态")] = None,
- ) -> JSONResponse:
- """查询员工列表"""
- search = {}
- if enterprise_id:
- search["enterprise_id"] = enterprise_id
- if employee_name:
- search["employee_name"] = employee_name
- if employee_no:
- search["employee_no"] = employee_no
- if status:
- search["status"] = status
- result = await EmployeeService.list_service(
- auth=auth, page_no=page_no, page_size=page_size, search=search
- )
- return SuccessResponse(data=result, msg="查询员工列表成功")
- @EmployeeRouter.get(
- "/detail",
- summary="查询员工详情",
- description="根据 employee_id 查询员工详情",
- )
- async def get_detail_controller(
- enterprise_id: Annotated[str, Query(description="企业ID")],
- auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:employee:detail"]))],
- employee_id: Annotated[Optional[str], Query(description="员工ID")] = None,
- employee_email: Annotated[Optional[str], Query(description="员工邮箱")] = None,
- employee_mobile: Annotated[Optional[str], Query(description="员工手机号")] = None,
- ) -> JSONResponse:
- """查询员工详情"""
- result = await EmployeeService.detail_service(auth=auth, employee_id=employee_id, employee_email=employee_email, employee_mobile=employee_mobile, enterprise_id=enterprise_id)
- log.info(f"查询员工详情成功: {employee_id}, enterprise_id={enterprise_id}")
- return SuccessResponse(data=result, msg="查询员工详情成功")
- @EmployeeRouter.delete(
- "/{employee_id}",
- summary="删除员工",
- description="删除员工 (alipay.commerce.ec.employee.delete)",
- response_model=ResponseSchema[EmployeeOperationOutSchema],
- )
- async def delete_employee_controller(
- employee_id: Annotated[str, Path(description="员工ID")],
- enterprise_id: Annotated[str, Query(description="企业ID")],
- auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:employee:delete"]))],
- ) -> JSONResponse:
- """删除员工"""
- result = await EmployeeService.delete_employee_service(
- auth=auth, employee_id=employee_id, enterprise_id=enterprise_id
- )
- log.info(f"删除员工成功: {employee_id}, enterprise_id={enterprise_id}")
- return SuccessResponse(data=result, msg="删除员工成功")
- @EmployeeRouter.post(
- "/invite/query",
- summary="获取员工签约激活链接",
- description="获取员工签约激活链接 (alipay.commerce.ec.employee.invite.query)",
- response_model=ResponseSchema[EmployeeInviteQueryOutSchema],
- )
- async def invite_query_controller(
- data: EmployeeInviteQuerySchema,
- auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:employee:invite"]))],
- ) -> JSONResponse:
- """获取员工签约激活链接"""
- result = await EmployeeService.invite_query_service(auth=auth, data=data)
- log.info(f"获取员工签约激活链接成功: enterprise_id={data.enterprise_id}")
- return SuccessResponse(data=result, msg="获取员工签约激活链接成功")
|