| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318 |
- from datetime import datetime
- from typing import Optional, cast, List
- from redis.asyncio import Redis
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.core.alipay import AlipayClient
- from app.core.exceptions import CustomException
- from app.core.logger import log
- from app.utils.snowflake import get_snowflake_id_str
- from .crud import EnterpriseCRUD
- from .enums import EnterpriseStatusEnum
- from .schema import (
- EnterpriseApplyInviteSchema,
- EnterpriseCreateOrUpdateSchema,
- EnterpriseInviteOutSchema,
- EnterpriseListOutSchema,
- EnterpriseOperationOutSchema,
- )
- from alipay.aop.api.domain.EnterpriseInfoDTO import EnterpriseInfoDTO
- class EnterpriseService:
- """企业服务层"""
- @classmethod
- async def create_service_from_alipay(
- cls, auth: AuthSchema, data: EnterpriseCreateOrUpdateSchema
- ) -> EnterpriseOperationOutSchema:
- """
- 创建企业记录(仅创建记录,不申请邀请码)
- """
- crud = EnterpriseCRUD(auth)
- data.validate_enterprise_id()
- enterprise = await crud.create(data.model_dump(exclude_none=True))
- if not enterprise:
- raise CustomException(msg="创建企业记录失败")
- return EnterpriseOperationOutSchema(enterprise_id=enterprise.enterprise_id)
- @classmethod
- async def apply_invite_service(
- cls, auth: AuthSchema, redis: Redis, data: EnterpriseApplyInviteSchema
- ) -> EnterpriseInviteOutSchema:
- """
- 申请企业邀请码(可多次调用,但需检查是否已存在且未过期)
- 调用: alipay.commerce.ec.enterprise.registerinvite.create
- """
- from alipay.aop.api.request.AlipayCommerceEcEnterpriseRegisterinviteCreateRequest import (
- AlipayCommerceEcEnterpriseRegisterinviteCreateRequest,
- )
- from alipay.aop.api.domain.AlipayCommerceEcEnterpriseRegisterinviteCreateModel import (
- AlipayCommerceEcEnterpriseRegisterinviteCreateModel, EnterpriseBaseInfoDTO, EnterpriseProfilesDTO
- )
- from alipay.aop.api.response.AlipayCommerceEcEnterpriseRegisterinviteCreateResponse import (
- AlipayCommerceEcEnterpriseRegisterinviteCreateResponse
- )
- model = AlipayCommerceEcEnterpriseRegisterinviteCreateModel()
- model.out_biz_no = get_snowflake_id_str(auth.tenant_id) # 支付宝接口需要 out_biz_no,用业务主键 enterprise_id
- model.identity_type = data.identity_type
- model.identity = data.identity
- model.identity_open_id = data.identity_open_id
- # model.register_mode = None
- # model.sign_fund_way = None
- request = AlipayCommerceEcEnterpriseRegisterinviteCreateRequest()
- request.biz_model = model
- client = AlipayClient.get_client()
- response = client.execute(request)
- if not response:
- raise CustomException(msg="申请邀请码失败: 无响应")
- result = AlipayCommerceEcEnterpriseRegisterinviteCreateResponse()
- result.parse_response_content(response)
- if not result.is_success():
- log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
- raise CustomException(msg=f"申请邀请码失败: {result.msg}")
- invite_time = datetime.now()
- invite_out = EnterpriseInviteOutSchema(
- pc_invite_url=result.pc_invite_url or "",
- invite_time=invite_time,
- expire_time=result.expire_time,
- )
- # 根据out_biz_no关联tenant_id存储redis,方便后期支付宝回调用于创建企业, 过期时间为三个月
- # await redis.set(f"alipay:notify:{model.out_biz_no}", auth.tenant_id, ex=60 * 5 * 24 * 30)
- return invite_out
- @classmethod
- async def detail_service(cls, auth: AuthSchema, enterprise_id: str) -> EnterpriseInfoDTO:
- """
- 查询企业详情
- 调用: alipay.commerce.ec.enterprise.info.query
- """
- from alipay.aop.api.request.AlipayCommerceEcEnterpriseInfoQueryRequest import AlipayCommerceEcEnterpriseInfoQueryRequest
- from alipay.aop.api.domain.AlipayCommerceEcEnterpriseInfoQueryModel import AlipayCommerceEcEnterpriseInfoQueryModel
- from alipay.aop.api.response.AlipayCommerceEcEnterpriseInfoQueryResponse import AlipayCommerceEcEnterpriseInfoQueryResponse
- model = AlipayCommerceEcEnterpriseInfoQueryModel()
- model.enterprise_id = enterprise_id
- request = AlipayCommerceEcEnterpriseInfoQueryRequest()
- request.biz_model = model
- client = AlipayClient.get_client()
- response = client.execute(request)
- if not response:
- raise CustomException(msg="查询企业详情失败: 无响应")
- result = AlipayCommerceEcEnterpriseInfoQueryResponse()
- result.parse_response_content(response)
- if not result.is_success():
- log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
- raise CustomException(msg=f"查询企业详情失败: {result.msg}")
- return result.enterprise_info # type: ignore
- @classmethod
- async def all_service(
- cls,
- auth: AuthSchema,
- ) -> list[EnterpriseListOutSchema]:
- if not auth.tenant_id or auth.tenant_id == 1:
- return []
- crud = EnterpriseCRUD(auth)
- models = await crud.list()
- if not models or len(models) == 0:
- return []
- return [EnterpriseListOutSchema.model_validate(model) for model in models]
- @classmethod
- async def list_service(
- cls,
- auth: AuthSchema,
- page_no: int = 1,
- page_size: int = 20,
- search: dict | None = None,
- ) -> dict:
- """
- 查询企业列表
- """
- crud = EnterpriseCRUD(auth)
- offset = (page_no - 1) * page_size
- return await crud.page(
- offset=offset,
- limit=page_size,
- order_by=[{"id": "desc"}],
- search=search or {},
- out_schema=EnterpriseListOutSchema,
- )
- @classmethod
- async def update_service_from_alipay(
- cls, auth: AuthSchema, data: EnterpriseCreateOrUpdateSchema
- ) -> EnterpriseOperationOutSchema:
- """
- 修改企业信息
- 调用: alipay.commerce.ec.enterprise.info.modify (可选)
- """
- crud = EnterpriseCRUD(auth)
- data.validate_enterprise_id()
- enterprise_id = cast(str, data.enterprise_id)
- enterprise = await crud.get_by_enterprise_id(enterprise_id=enterprise_id)
- if not enterprise:
- enterprise = await EnterpriseService.create_service_from_alipay(auth, data)
- else:
- update_data = data.model_dump(exclude_none=True, exclude={"enterprise_id"})
- await crud.update_by_enterprise_id(enterprise_id, update_data)
-
-
- # 当状态为签约成功或认证时,调用支付宝查询企业详情
- if data.status and data.status in [
- EnterpriseStatusEnum.ENTERPRISE_ACTIVATED.value,
- EnterpriseStatusEnum.ENTERPRISE_AUTH.value
- ]:
- log.info(f"从支付宝平台查询企业详情: enterprise_id={enterprise_id}")
- enterprise_info = await EnterpriseService.detail_service(auth, enterprise_id)
- # 同步支付宝查询到的企业信息
- query_data = EnterpriseCreateOrUpdateSchema(
- # enterprise_id=result.enterprise_info.enterprise_id,
- account_id=enterprise_info.account_id,
- name=enterprise_info.enterprise_name,
- short_name=enterprise_info.enterprise_alias,
- )
-
- await crud.update_by_enterprise_id(
- enterprise_id, query_data.model_dump(exclude_none=True, exclude={"enterprise_id"}))
-
- return EnterpriseOperationOutSchema(enterprise_id=enterprise_id)
- @classmethod
- async def enterprise_unsign_service(
- cls, auth: AuthSchema, enterprise_id: str
- ) -> EnterpriseOperationOutSchema:
- """
- 企业解约
- 调用: alipay.commerce.ec.enterprise.unsign
- 注意: 支付宝异步通知后才会更新状态,此处仅调用接口
- """
- crud = EnterpriseCRUD(auth)
- enterprise = await crud.get_by_enterprise_id(enterprise_id)
- if not enterprise:
- raise CustomException(msg="企业不存在")
- if enterprise.status == EnterpriseStatusEnum.ENTERPRISE_UNSIGN.value:
- raise CustomException(msg="企业已解约")
- if enterprise.status == EnterpriseStatusEnum.ENTERPRISE_WITHDRAW.value:
- raise CustomException(msg="企业已注销,无法解约")
- client = AlipayClient.get_client()
- from alipay.aop.api.request.AlipayCommerceEcEnterpriseUnsignRequest import (
- AlipayCommerceEcEnterpriseUnsignRequest,
- )
- from alipay.aop.api.domain.AlipayCommerceEcEnterpriseUnsignModel import (
- AlipayCommerceEcEnterpriseUnsignModel,
- )
- from alipay.aop.api.response.AlipayCommerceEcEnterpriseUnsignResponse import (
- AlipayCommerceEcEnterpriseUnsignResponse,
- )
- model = AlipayCommerceEcEnterpriseUnsignModel()
- model.enterprise_id = enterprise.enterprise_id
- request = AlipayCommerceEcEnterpriseUnsignRequest()
- request.biz_model = model
- response = client.execute(request)
- if not response:
- raise CustomException(msg="企业解约申请失败: 无响应")
- result = AlipayCommerceEcEnterpriseUnsignResponse()
- result.parse_response_content(response)
- if not result.is_success():
- log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
- raise CustomException(msg=f"企业解约申请失败: {result.msg}")
- return EnterpriseOperationOutSchema(enterprise_id=enterprise.enterprise_id)
- @classmethod
- async def enterprise_delete_service(
- cls, auth: AuthSchema, enterprise_id: str
- ) -> EnterpriseOperationOutSchema:
- """
- 企业注销
- 调用: alipay.commerce.ec.enterprise.delete
- 注意: 支付宝异步通知后才会更新状态,此处仅调用接口
- """
- crud = EnterpriseCRUD(auth)
- enterprise = await crud.get_by_enterprise_id(enterprise_id)
- if not enterprise:
- raise CustomException(msg="企业不存在")
- if enterprise.status == EnterpriseStatusEnum.ENTERPRISE_WITHDRAW.value:
- raise CustomException(msg="企业已注销")
- if enterprise.status != EnterpriseStatusEnum.ENTERPRISE_UNSIGN.value:
- raise CustomException(msg="请先完成解约再注销")
- client = AlipayClient.get_client()
- from alipay.aop.api.request.AlipayCommerceEcEnterpriseDeleteRequest import (
- AlipayCommerceEcEnterpriseDeleteRequest,
- )
- from alipay.aop.api.domain.AlipayCommerceEcEnterpriseDeleteModel import (
- AlipayCommerceEcEnterpriseDeleteModel,
- )
- from alipay.aop.api.response.AlipayCommerceEcEnterpriseDeleteResponse import (
- AlipayCommerceEcEnterpriseDeleteResponse,
- )
- model = AlipayCommerceEcEnterpriseDeleteModel()
- model.enterprise_id = enterprise.enterprise_id
- request = AlipayCommerceEcEnterpriseDeleteRequest()
- request.biz_model = model
- response = client.execute(request)
- if not response:
- raise CustomException(msg="企业注销申请失败: 无响应")
- result = AlipayCommerceEcEnterpriseDeleteResponse()
- result.parse_response_content(response)
- if not result.is_success():
- log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
- raise CustomException(msg=f"企业注销申请失败: {result.msg}")
- return EnterpriseOperationOutSchema(enterprise_id=enterprise.enterprise_id)
|