import asyncio
from datetime import datetime
from typing import Optional, TypeVar, cast, List

from alipay.aop.api.domain.EnterpriseInfoDTO import EnterpriseInfoDTO
from redis.asyncio import Redis

from app.api.v1.module_system.auth.schema import AuthSchema
from app.core.alipay import AlipayClient
from app.core.exceptions import CustomException
from app.core.logger import log
from app.utils.snowflake import get_snowflake_id_str

from .crud import EnterpriseCRUD
from .enums import EnterpriseStatusEnum
from .schema import (
    EnterpriseApplyInviteSchema,
    EnterpriseCreateOrUpdateSchema,
    EnterpriseInviteOutSchema,
    EnterpriseListOutSchema,
    EnterpriseOperationOutSchema,
)

# Type of the Alipay SDK response object produced by ``_execute_alipay``.
_ResponseT = TypeVar("_ResponseT")


class EnterpriseService:
    """Enterprise service layer.

    Combines local enterprise records (via ``EnterpriseCRUD``) with calls to
    the Alipay ``alipay.commerce.ec.enterprise.*`` APIs.
    """

    @staticmethod
    async def _execute_alipay(
        request, response_cls: type[_ResponseT], action: str
    ) -> _ResponseT:
        """Execute an Alipay request and return the parsed, successful response.

        Args:
            request: A populated Alipay SDK request object.
            response_cls: The SDK response class used to parse the raw reply.
            action: Label of the operation; used as the prefix of the error
                messages (``"<action>失败: ..."``).

        Returns:
            An instance of ``response_cls`` for which ``is_success()`` is true.

        Raises:
            CustomException: If the client returns an empty response or the
                parsed response reports failure.
        """
        client = AlipayClient.get_client()
        # ``client.execute`` is a synchronous call; run it in a worker thread
        # so the event loop is not blocked while waiting on Alipay.
        raw_response = await asyncio.to_thread(client.execute, request)
        if not raw_response:
            raise CustomException(msg=f"{action}失败: 无响应")

        result = response_cls()
        result.parse_response_content(raw_response)
        if not result.is_success():
            log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
            raise CustomException(msg=f"{action}失败: {result.msg}")
        return result

    @classmethod
    async def create_service_from_alipay(
        cls, auth: AuthSchema, data: EnterpriseCreateOrUpdateSchema
    ) -> EnterpriseOperationOutSchema:
        """Create a local enterprise record (no invite code is requested).

        Args:
            auth: Auth context handed to ``EnterpriseCRUD``.
            data: Enterprise fields; ``None`` values are omitted on insert.

        Returns:
            The ``enterprise_id`` of the created record.

        Raises:
            CustomException: If the CRUD layer returns no record.
        """
        crud = EnterpriseCRUD(auth)
        data.validate_enterprise_id()
        enterprise = await crud.create(data.model_dump(exclude_none=True))
        if not enterprise:
            raise CustomException(msg="创建企业记录失败")
        return EnterpriseOperationOutSchema(enterprise_id=enterprise.enterprise_id)

    @classmethod
    async def apply_invite_service(
        cls, auth: AuthSchema, redis: Redis, data: EnterpriseApplyInviteSchema
    ) -> EnterpriseInviteOutSchema:
        """Request an enterprise registration invite from Alipay.

        Calls ``alipay.commerce.ec.enterprise.registerinvite.create``.

        Args:
            auth: Auth context; ``auth.tenant_id`` feeds the ``out_biz_no``.
            redis: Redis connection (currently unused, see the TODO below).
            data: Identity information of the invitee.

        Returns:
            The invite URL, the local invite time and Alipay's expire time.

        Raises:
            CustomException: If Alipay returns nothing or reports failure.
        """
        from alipay.aop.api.request.AlipayCommerceEcEnterpriseRegisterinviteCreateRequest import (
            AlipayCommerceEcEnterpriseRegisterinviteCreateRequest,
        )
        from alipay.aop.api.domain.AlipayCommerceEcEnterpriseRegisterinviteCreateModel import (
            AlipayCommerceEcEnterpriseRegisterinviteCreateModel,
        )
        from alipay.aop.api.response.AlipayCommerceEcEnterpriseRegisterinviteCreateResponse import (
            AlipayCommerceEcEnterpriseRegisterinviteCreateResponse,
        )

        model = AlipayCommerceEcEnterpriseRegisterinviteCreateModel()
        # The Alipay API requires an out_biz_no.
        # NOTE(review): the original comment said to use the business key
        # enterprise_id, but the value comes from
        # get_snowflake_id_str(auth.tenant_id) - confirm which is intended.
        model.out_biz_no = get_snowflake_id_str(auth.tenant_id)
        model.identity_type = data.identity_type
        model.identity = data.identity
        model.identity_open_id = data.identity_open_id
        # register_mode and sign_fund_way are left unset.

        request = AlipayCommerceEcEnterpriseRegisterinviteCreateRequest()
        request.biz_model = model

        result = await cls._execute_alipay(
            request,
            AlipayCommerceEcEnterpriseRegisterinviteCreateResponse,
            "申请邀请码",
        )

        invite_out = EnterpriseInviteOutSchema(
            pc_invite_url=result.pc_invite_url or "",
            invite_time=datetime.now(),
            expire_time=result.expire_time,
        )

        # TODO: store out_biz_no -> tenant_id in Redis so the later Alipay
        # callback can create the enterprise for the right tenant; the
        # intended expiry is three months. Previously disabled code:
        #   await redis.set(f"alipay:notify:{model.out_biz_no}", auth.tenant_id, ex=60 * 5 * 24 * 30)
        # NOTE(review): 60 * 5 * 24 * 30 seconds is 2.5 days, not three
        # months - fix the TTL before re-enabling.
        return invite_out

    @classmethod
    async def detail_service(cls, auth: AuthSchema, enterprise_id: str) -> EnterpriseInfoDTO:
        """Query enterprise details from Alipay.

        Calls ``alipay.commerce.ec.enterprise.info.query``.

        Args:
            auth: Auth context (not used by this method).
            enterprise_id: Enterprise id sent to Alipay.

        Returns:
            The ``enterprise_info`` attribute of the parsed response.

        Raises:
            CustomException: If Alipay returns nothing or reports failure.
        """
        from alipay.aop.api.request.AlipayCommerceEcEnterpriseInfoQueryRequest import (
            AlipayCommerceEcEnterpriseInfoQueryRequest,
        )
        from alipay.aop.api.domain.AlipayCommerceEcEnterpriseInfoQueryModel import (
            AlipayCommerceEcEnterpriseInfoQueryModel,
        )
        from alipay.aop.api.response.AlipayCommerceEcEnterpriseInfoQueryResponse import (
            AlipayCommerceEcEnterpriseInfoQueryResponse,
        )

        model = AlipayCommerceEcEnterpriseInfoQueryModel()
        model.enterprise_id = enterprise_id

        request = AlipayCommerceEcEnterpriseInfoQueryRequest()
        request.biz_model = model

        result = await cls._execute_alipay(
            request, AlipayCommerceEcEnterpriseInfoQueryResponse, "查询企业详情"
        )
        return result.enterprise_info  # type: ignore

    @classmethod
    async def all_service(
        cls,
        auth: AuthSchema,
    ) -> list[EnterpriseListOutSchema]:
        """Return every enterprise visible through ``EnterpriseCRUD(auth).list()``.

        Returns an empty list when ``auth.tenant_id`` is falsy or equals 1,
        or when the CRUD layer yields nothing.
        """
        if not auth.tenant_id or auth.tenant_id == 1:
            return []

        models = await EnterpriseCRUD(auth).list()
        if not models:
            return []
        return [EnterpriseListOutSchema.model_validate(model) for model in models]

    @classmethod
    async def list_service(
        cls,
        auth: AuthSchema,
        page_no: int = 1,
        page_size: int = 20,
        search: dict | None = None,
    ) -> dict:
        """Return one page of enterprises, ordered by ``id`` descending.

        Args:
            auth: Auth context handed to ``EnterpriseCRUD``.
            page_no: 1-based page number.
            page_size: Number of rows per page.
            search: Optional filter mapping; ``None`` is treated as ``{}``.

        Returns:
            Whatever ``EnterpriseCRUD.page`` returns for the requested page.
        """
        crud = EnterpriseCRUD(auth)
        offset = (page_no - 1) * page_size
        return await crud.page(
            offset=offset,
            limit=page_size,
            order_by=[{"id": "desc"}],
            search=search or {},
            out_schema=EnterpriseListOutSchema,
        )

    @classmethod
    async def update_service_from_alipay(
        cls, auth: AuthSchema, data: EnterpriseCreateOrUpdateSchema
    ) -> EnterpriseOperationOutSchema:
        """Create or update the local enterprise record, then sync from Alipay.

        When ``data.status`` is ``ENTERPRISE_ACTIVATED`` or ``ENTERPRISE_AUTH``
        the enterprise details are queried from Alipay and ``account_id``,
        ``name`` and ``short_name`` are written back to the local record.

        Args:
            auth: Auth context handed to ``EnterpriseCRUD``.
            data: Enterprise fields; must carry an ``enterprise_id``.

        Returns:
            The ``enterprise_id`` that was created or updated.

        Raises:
            CustomException: If record creation fails, the Alipay query fails,
                or Alipay returns no enterprise info.
        """
        crud = EnterpriseCRUD(auth)
        data.validate_enterprise_id()
        enterprise_id = cast(str, data.enterprise_id)

        existing = await crud.get_by_enterprise_id(enterprise_id=enterprise_id)
        if not existing:
            await cls.create_service_from_alipay(auth, data)
        else:
            update_data = data.model_dump(exclude_none=True, exclude={"enterprise_id"})
            await crud.update_by_enterprise_id(enterprise_id, update_data)

        # When the status is "activated" (signed) or "authenticated", pull the
        # enterprise details from Alipay.
        # NOTE(review): this sync is applied after both the create and the
        # update path - confirm that matches the intended nesting.
        synced_statuses = (
            EnterpriseStatusEnum.ENTERPRISE_ACTIVATED.value,
            EnterpriseStatusEnum.ENTERPRISE_AUTH.value,
        )
        if data.status and data.status in synced_statuses:
            log.info(f"从支付宝平台查询企业详情: enterprise_id={enterprise_id}")
            enterprise_info = await cls.detail_service(auth, enterprise_id)
            if enterprise_info is None:
                # A successful response without enterprise_info would otherwise
                # surface as an AttributeError on the attribute reads below.
                raise CustomException(msg="查询企业详情失败: 企业信息为空")

            # Copy the fields returned by Alipay onto the local record.
            # enterprise_id is not set here; the dump below excludes it anyway.
            query_data = EnterpriseCreateOrUpdateSchema(
                account_id=enterprise_info.account_id,
                name=enterprise_info.enterprise_name,
                short_name=enterprise_info.enterprise_alias,
            )
            await crud.update_by_enterprise_id(
                enterprise_id,
                query_data.model_dump(exclude_none=True, exclude={"enterprise_id"}),
            )

        return EnterpriseOperationOutSchema(enterprise_id=enterprise_id)

    @classmethod
    async def enterprise_unsign_service(
        cls, auth: AuthSchema, enterprise_id: str
    ) -> EnterpriseOperationOutSchema:
        """Request contract termination (unsign) for an enterprise.

        Calls ``alipay.commerce.ec.enterprise.unsign``. Only the API call is
        made here; per the original note, the local status is updated once
        Alipay sends its asynchronous notification.

        Raises:
            CustomException: If the enterprise does not exist, is already
                unsigned or withdrawn, or the Alipay call fails.
        """
        crud = EnterpriseCRUD(auth)
        enterprise = await crud.get_by_enterprise_id(enterprise_id)
        if not enterprise:
            raise CustomException(msg="企业不存在")
        if enterprise.status == EnterpriseStatusEnum.ENTERPRISE_UNSIGN.value:
            raise CustomException(msg="企业已解约")
        if enterprise.status == EnterpriseStatusEnum.ENTERPRISE_WITHDRAW.value:
            raise CustomException(msg="企业已注销,无法解约")

        from alipay.aop.api.request.AlipayCommerceEcEnterpriseUnsignRequest import (
            AlipayCommerceEcEnterpriseUnsignRequest,
        )
        from alipay.aop.api.domain.AlipayCommerceEcEnterpriseUnsignModel import (
            AlipayCommerceEcEnterpriseUnsignModel,
        )
        from alipay.aop.api.response.AlipayCommerceEcEnterpriseUnsignResponse import (
            AlipayCommerceEcEnterpriseUnsignResponse,
        )

        model = AlipayCommerceEcEnterpriseUnsignModel()
        model.enterprise_id = enterprise.enterprise_id

        request = AlipayCommerceEcEnterpriseUnsignRequest()
        request.biz_model = model

        await cls._execute_alipay(
            request, AlipayCommerceEcEnterpriseUnsignResponse, "企业解约申请"
        )
        return EnterpriseOperationOutSchema(enterprise_id=enterprise.enterprise_id)

    @classmethod
    async def enterprise_delete_service(
        cls, auth: AuthSchema, enterprise_id: str
    ) -> EnterpriseOperationOutSchema:
        """Request withdrawal (deletion) of an enterprise.

        Calls ``alipay.commerce.ec.enterprise.delete``. The enterprise must
        already be in the unsigned state. Only the API call is made here; per
        the original note, the local status is updated once Alipay sends its
        asynchronous notification.

        Raises:
            CustomException: If the enterprise does not exist, is already
                withdrawn, has not been unsigned yet, or the Alipay call fails.
        """
        crud = EnterpriseCRUD(auth)
        enterprise = await crud.get_by_enterprise_id(enterprise_id)
        if not enterprise:
            raise CustomException(msg="企业不存在")
        if enterprise.status == EnterpriseStatusEnum.ENTERPRISE_WITHDRAW.value:
            raise CustomException(msg="企业已注销")
        if enterprise.status != EnterpriseStatusEnum.ENTERPRISE_UNSIGN.value:
            raise CustomException(msg="请先完成解约再注销")

        from alipay.aop.api.request.AlipayCommerceEcEnterpriseDeleteRequest import (
            AlipayCommerceEcEnterpriseDeleteRequest,
        )
        from alipay.aop.api.domain.AlipayCommerceEcEnterpriseDeleteModel import (
            AlipayCommerceEcEnterpriseDeleteModel,
        )
        from alipay.aop.api.response.AlipayCommerceEcEnterpriseDeleteResponse import (
            AlipayCommerceEcEnterpriseDeleteResponse,
        )

        model = AlipayCommerceEcEnterpriseDeleteModel()
        model.enterprise_id = enterprise.enterprise_id

        request = AlipayCommerceEcEnterpriseDeleteRequest()
        request.biz_model = model

        await cls._execute_alipay(
            request, AlipayCommerceEcEnterpriseDeleteResponse, "企业注销申请"
        )
        return EnterpriseOperationOutSchema(enterprise_id=enterprise.enterprise_id)