"""Service layer for Alipay dedicated fund accounts (authorize, open, deposit, transfer, query)."""

from datetime import datetime
from decimal import Decimal
from typing import Any

from redis.asyncio import Redis

from app.api.v1.module_system.auth.schema import AuthSchema
from app.core.alipay import AlipayClient
from app.core.exceptions import CustomException
from app.core.logger import log
from app.utils.snowflake import get_snowflake_id_str
from app.plugin.module_payment.enterprise.crud import EnterpriseCRUD

from .crud import AccountCRUD, TransferCRUD, DepositCRUD, WithdrawCRUD
from .enums import (
    DepositStatusEnum,
)
from .schema import (
    AccountAuthorizeApplySchema,
    AccountAuthorizeApplyOutSchema,
    AccountCreateSchema,
    AccountDepositSchema,
    AccountDepositOutSchema,
    AccountOperationOutSchema,
    AccountQuerySchema,
    AccountTransferSchema,
    AccountTransferOutSchema,
    TransferListOutSchema,
    TransferOutSchema,
    TenantTransferCreate,
    TenantTransferResponse,
)


class AccountService:
    """Service layer for dedicated fund accounts.

    Every public method is a classmethod that receives the caller's
    ``AuthSchema``. Methods that talk to Alipay import the SDK request/model/
    response classes locally and raise ``CustomException`` when the gateway
    returns nothing or reports a failure.
    """

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @classmethod
    def _call_alipay(
        cls,
        request: Any,
        response_cls: type,
        failure_label: str,
        *,
        prefer_sub_msg: bool = False,
    ) -> Any:
        """Execute an Alipay request and return the parsed, successful response.

        Args:
            request: SDK request object with ``biz_model`` already set.
            response_cls: SDK response class used to parse the raw payload.
            failure_label: Prefix of the error message raised to the caller
                (e.g. ``"转账失败"``).
            prefer_sub_msg: When true, the error detail is the first non-empty
                of ``sub_msg``, ``msg``, ``code``; otherwise it is ``msg``.

        Returns:
            An instance of ``response_cls`` for which ``is_success()`` is true.

        Raises:
            CustomException: If the gateway returns an empty response or the
                parsed response is not successful.
        """
        # NOTE(review): ``execute`` is invoked without ``await`` from async
        # callers, so it presumably blocks the event loop for the duration of
        # the HTTP round-trip — confirm and consider offloading to a thread.
        raw_response = AlipayClient.get_client().execute(request)
        if not raw_response:
            raise CustomException(msg=f"{failure_label}: 无响应")

        result = response_cls()
        result.parse_response_content(raw_response)
        if not result.is_success():
            log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
            if prefer_sub_msg:
                detail = result.sub_msg or result.msg or result.code
            else:
                detail = result.msg
            raise CustomException(msg=f"{failure_label}: {detail}")
        return result

    @classmethod
    async def _execute_transfer(
        cls,
        auth: AuthSchema,
        tenant_id: Any,
        data: Any,
    ) -> Any:
        """Validate, submit and persist a fund-account transfer.

        Shared implementation behind ``transfer_service`` and
        ``tenant_transfer_service``; the two differ only in where the tenant
        id comes from and in the schema they return.

        Args:
            auth: Auth context passed to the CRUD layers.
            tenant_id: Tenant that must own the fund account; also passed to
                ``get_snowflake_id_str`` to generate ``out_biz_no``.
            data: Transfer payload. ``data.order_title`` is filled in (mutated)
                with a default derived from the enterprise name when empty.

        Returns:
            The successful Alipay transfer response (exposes ``status``,
            ``order_no`` and ``fund_order_id``).

        Raises:
            CustomException: If the account is missing, belongs to another
                tenant, does not match ``data.enterprise_id``, its enterprise
                cannot be found, or the Alipay call fails.
        """
        from alipay.aop.api.request.AlipayCommerceEcTransAccountTransferRequest import (
            AlipayCommerceEcTransAccountTransferRequest,
        )
        from alipay.aop.api.domain.AlipayCommerceEcTransAccountTransferModel import (
            AlipayCommerceEcTransAccountTransferModel,
        )
        from alipay.aop.api.response.AlipayCommerceEcTransAccountTransferResponse import (
            AlipayCommerceEcTransAccountTransferResponse,
        )
        from alipay.aop.api.domain.TransParticipant import (
            TransParticipant,
        )
        from alipay.aop.api.domain.BankCardExtInfoDTO import (
            BankCardExtInfoDTO,
        )

        # Check that the dedicated fund account exists and belongs to the caller.
        account = await AccountCRUD(auth).get_by_account_book_id(data.account_book_id)
        if not account:
            raise CustomException(msg="资金账户不存在")
        if account.tenant_id != tenant_id:
            raise CustomException(msg="无权限操作")
        if data.enterprise_id and account.enterprise_id != data.enterprise_id:
            raise CustomException(msg="参数错误")

        # Default the order title from the owning enterprise's name.
        if not data.order_title and account.enterprise_id:
            enterprise = await EnterpriseCRUD(auth).get_by_enterprise_id(account.enterprise_id)
            if not enterprise:
                raise CustomException(msg="资金账户所属企业不存在")
            data.order_title = f"来自{enterprise.name}转账"

        model = AlipayCommerceEcTransAccountTransferModel()
        model.enterprise_id = account.enterprise_id
        model.account_book_id = account.account_book_id
        model.out_biz_no = get_snowflake_id_str(tenant_id)
        # Total transfer amount in yuan, precise to two decimal places.
        model.amount = str(data.amount)
        model.order_title = data.order_title

        payee = TransParticipant()
        payee.identity_type = data.payee_info.identity_type
        payee.name = data.payee_info.name
        payee.identity = data.payee_info.identity
        if data.payee_info.bankcard_ext_info:
            payee.bankcard_ext_info = BankCardExtInfoDTO.from_alipay_dict(
                data.payee_info.bankcard_ext_info.model_dump(exclude_none=True)
            )
        model.payee_info = payee

        request = AlipayCommerceEcTransAccountTransferRequest()
        request.biz_model = model

        result = cls._call_alipay(
            request,
            AlipayCommerceEcTransAccountTransferResponse,
            "转账失败",
            prefer_sub_msg=True,
        )

        # Persist the transfer record only after Alipay accepted the request.
        await TransferCRUD(auth).create(
            {
                "enterprise_id": model.enterprise_id,
                "out_biz_no": model.out_biz_no,
                "account_book_id": model.account_book_id,
                "amount": model.amount,
                "order_title": model.order_title,
                "payee_info": data.payee_info.model_dump() if data.payee_info else None,
                "status": result.status,
                "order_no": result.order_no,
                "fund_order_id": result.fund_order_id,
            }
        )
        return result

    # ------------------------------------------------------------------
    # Public services
    # ------------------------------------------------------------------

    @classmethod
    async def authorize_apply_service(
        cls, auth: AuthSchema, data: AccountAuthorizeApplySchema
    ) -> AccountAuthorizeApplyOutSchema:
        """Apply for transfer-authorization signing (✅).

        Alipay API: ``alipay.commerce.ec.trans.authorize.apply``

        Args:
            auth: Auth context (not read by this method).
            data: Payload carrying ``enterprise_id``.

        Returns:
            Schema containing the ``sign_url`` returned by Alipay.

        Raises:
            CustomException: If Alipay returns no response or a failure.
        """
        from alipay.aop.api.request.AlipayCommerceEcTransAuthorizeApplyRequest import (
            AlipayCommerceEcTransAuthorizeApplyRequest,
        )
        from alipay.aop.api.domain.AlipayCommerceEcTransAuthorizeApplyModel import (
            AlipayCommerceEcTransAuthorizeApplyModel,
        )
        from alipay.aop.api.response.AlipayCommerceEcTransAuthorizeApplyResponse import (
            AlipayCommerceEcTransAuthorizeApplyResponse,
        )

        model = AlipayCommerceEcTransAuthorizeApplyModel()
        model.enterprise_id = data.enterprise_id

        request = AlipayCommerceEcTransAuthorizeApplyRequest()
        request.biz_model = model

        result = cls._call_alipay(
            request, AlipayCommerceEcTransAuthorizeApplyResponse, "申请转账授权失败"
        )
        return AccountAuthorizeApplyOutSchema(
            sign_url=result.sign_url,
        )

    @classmethod
    async def create_account_service(
        cls, auth: AuthSchema, data: AccountCreateSchema
    ) -> AccountOperationOutSchema:
        """Open a dedicated fund account (✅).

        Alipay API: ``alipay.commerce.ec.trans.account.create``

        Args:
            auth: Auth context passed to ``AccountCRUD``.
            data: Payload with ``enterprise_id`` and optional ``account_type``
                (defaults to ``"ALL"``) and ``scene`` (defaults to
                ``"B2B_TRANS"``).

        Returns:
            Schema with the enterprise id and the new ``account_book_id``.

        Raises:
            CustomException: If Alipay returns no response or a failure.
        """
        from alipay.aop.api.request.AlipayCommerceEcTransAccountCreateRequest import (
            AlipayCommerceEcTransAccountCreateRequest,
        )
        from alipay.aop.api.domain.AlipayCommerceEcTransAccountCreateModel import (
            AlipayCommerceEcTransAccountCreateModel,
        )
        from alipay.aop.api.response.AlipayCommerceEcTransAccountCreateResponse import (
            AlipayCommerceEcTransAccountCreateResponse,
        )

        model = AlipayCommerceEcTransAccountCreateModel()
        model.enterprise_id = data.enterprise_id
        model.account_type = data.account_type or "ALL"  # all-purpose income/expense account
        model.scene = data.scene or "B2B_TRANS"  # to-business transfer scene

        request = AlipayCommerceEcTransAccountCreateRequest()
        request.biz_model = model

        result = cls._call_alipay(
            request, AlipayCommerceEcTransAccountCreateResponse, "开通资金专户失败"
        )

        # The account book id from Alipay is passed straight to the constructor;
        # the original re-assigned the same value afterwards, which was redundant.
        account_data = AccountCreateSchema(
            enterprise_id=model.enterprise_id,
            account_book_id=result.account_book_id,
            account_type=model.account_type,
            scene=model.scene,
        )
        await AccountCRUD(auth).create(account_data)

        return AccountOperationOutSchema(
            enterprise_id=account_data.enterprise_id,
            account_book_id=account_data.account_book_id,
        )

    @classmethod
    async def deposit_service(
        cls, auth: AuthSchema, data: AccountDepositSchema
    ) -> AccountDepositOutSchema:
        """Deposit into a dedicated fund account (✅).

        Alipay API: ``alipay.commerce.ec.trans.account.deposit``

        Args:
            auth: Auth context; ``auth.tenant_id`` is passed to
                ``get_snowflake_id_str`` to generate ``out_biz_no``.
            data: Payload with ``enterprise_id``, ``account_book_id``,
                ``amount`` and ``remark``.

        Returns:
            Schema containing the ``url`` returned by Alipay.

        Raises:
            CustomException: If Alipay returns no response or a failure.
        """
        from alipay.aop.api.request.AlipayCommerceEcTransAccountDepositRequest import (
            AlipayCommerceEcTransAccountDepositRequest,
        )
        from alipay.aop.api.domain.AlipayCommerceEcTransAccountDepositModel import (
            AlipayCommerceEcTransAccountDepositModel,
        )
        from alipay.aop.api.response.AlipayCommerceEcTransAccountDepositResponse import (
            AlipayCommerceEcTransAccountDepositResponse,
        )

        model = AlipayCommerceEcTransAccountDepositModel()
        model.enterprise_id = data.enterprise_id
        model.account_book_id = data.account_book_id
        model.amount = str(data.amount)
        model.out_biz_no = get_snowflake_id_str(auth.tenant_id)

        request = AlipayCommerceEcTransAccountDepositRequest()
        request.biz_model = model

        result = cls._call_alipay(
            request, AlipayCommerceEcTransAccountDepositResponse, "充值失败"
        )

        # Record the deposit in the DEALING state.
        await DepositCRUD(auth).create(
            {
                "enterprise_id": data.enterprise_id,
                "out_biz_no": model.out_biz_no,
                "account_book_id": data.account_book_id,
                "amount": data.amount,
                "url": result.url,
                "status": DepositStatusEnum.DEALING.value,
                "remark": data.remark,
            }
        )

        return AccountDepositOutSchema(
            url=result.url,
        )

    @classmethod
    async def transfer_service(
        cls, auth: AuthSchema, data: AccountTransferSchema
    ) -> AccountTransferOutSchema:
        """Transfer out of a dedicated fund account (✅).

        Alipay API: ``alipay.commerce.ec.trans.account.transfer``

        Args:
            auth: Auth context; the account must belong to ``auth.tenant_id``.
            data: Transfer payload (account book id, amount, payee info, ...).

        Returns:
            Schema with ``status``, ``order_no`` and ``fund_order_id``.

        Raises:
            CustomException: On validation failure or Alipay failure.
        """
        result = await cls._execute_transfer(auth, auth.tenant_id, data)
        return AccountTransferOutSchema(
            status=result.status,
            order_no=result.order_no,
            fund_order_id=result.fund_order_id,
        )

    @classmethod
    async def tenant_transfer_service(
        cls,
        auth: AuthSchema,
        tenant_id: int,
        data: TenantTransferCreate,
        request_ip: str,
        api_key_id: int | None = None,
    ) -> TenantTransferResponse:
        """Tenant API transfer (authenticated via API key).

        Alipay API: ``alipay.commerce.ec.trans.account.transfer``

        Args:
            auth: Auth context passed to the CRUD layers.
            tenant_id: Tenant that must own the fund account.
            data: Transfer payload.
            request_ip: Caller IP; accepted but not used by this method.
            api_key_id: API key id; accepted but not used by this method.

        Returns:
            Schema with ``status``, ``order_no`` and ``fund_order_id``.

        Raises:
            CustomException: On validation failure or Alipay failure.
        """
        result = await cls._execute_transfer(auth, tenant_id, data)
        return TenantTransferResponse(
            status=result.status,
            order_no=result.order_no,
            fund_order_id=result.fund_order_id,
        )

    # NOTE: withdraw_service is disabled; the previous draft is kept below.
    # @classmethod
    # async def withdraw_service(
    #     cls,
    #     auth: AuthSchema,
    #     data: AccountWithdrawSchema
    # ) -> AccountOperationOutSchema:
    #     """
    #     Withdraw from a dedicated fund account
    #     Alipay API: alipay.commerce.ec.trans.account.withdraw
    #     """
    #     from alipay.aop.api.request.AlipayCommerceEcTransAccountWithdrawRequest import (
    #         AlipayCommerceEcTransAccountWithdrawRequest,
    #     )
    #     from alipay.aop.api.domain.AlipayCommerceEcTransAccountWithdrawModel import (
    #         AlipayCommerceEcTransAccountWithdrawModel,
    #     )
    #     from alipay.aop.api.response.AlipayCommerceEcTransAccountWithdrawResponse import (
    #         AlipayCommerceEcTransAccountWithdrawResponse,
    #     )
    #     crud = AccountCRUD(auth)
    #     enterprise = await crud.get_by_enterprise_id(data.enterprise_id)
    #     if not enterprise:
    #         raise CustomException(msg="企业不存在")
    #     model = AlipayCommerceEcTransAccountWithdrawModel()
    #     model.enterprise_id = enterprise.alipay_enterprise_id
    #     model.account_book_id = data.account_book_id
    #     model.amount = str(data.amount)
    #     model.out_biz_no = data.out_biz_no
    #     request = AlipayCommerceEcTransAccountWithdrawRequest()
    #     request.biz_model = model
    #     client = AlipayClient.get_client()
    #     response = client.execute(request)
    #     if not response:
    #         raise CustomException(msg="提现失败: 无响应")
    #     result = AlipayCommerceEcTransAccountWithdrawResponse()
    #     result.parse_response_content(response)
    #     if not result.is_success():
    #         log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
    #         raise CustomException(msg=f"提现失败: {result.msg}")
    #     withdraw_crud = WithdrawCRUD(auth)
    #     withdraw_data = {
    #         "enterprise_id": data.enterprise_id,
    #         "out_biz_no": data.out_biz_no,
    #         "account_book_id": data.account_book_id,
    #         "amount": data.amount,
    #         "status": WithdrawStatusEnum.DEALING.value,
    #         "order_no": result.order_no,
    #     }
    #     await withdraw_crud.create(withdraw_data)
    #     return AccountOperationOutSchema(
    #         enterprise_id=data.enterprise_id,
    #         account_book_id=data.account_book_id,
    #         out_biz_no=data.out_biz_no,
    #         status=WithdrawStatusEnum.DEALING.value,
    #     )

    @classmethod
    async def query_account_service(
        cls, auth: AuthSchema, data: AccountQuerySchema
    ) -> list[Any]:
        """Query dedicated fund accounts from Alipay.

        Alipay API: ``alipay.commerce.ec.trans.account.query``

        Args:
            auth: Auth context (not read by this method).
            data: Payload carrying ``enterprise_id``.

        Returns:
            One ``to_alipay_dict`` result per entry of the response's
            ``account_list``; empty when the list is missing or empty.

        Raises:
            CustomException: If Alipay returns no response or a failure.
        """
        from alipay.aop.api.request.AlipayCommerceEcTransAccountQueryRequest import (
            AlipayCommerceEcTransAccountQueryRequest,
        )
        from alipay.aop.api.domain.AlipayCommerceEcTransAccountQueryModel import (
            AlipayCommerceEcTransAccountQueryModel,
        )
        from alipay.aop.api.response.AlipayCommerceEcTransAccountQueryResponse import (
            AlipayCommerceEcTransAccountQueryResponse,
        )
        from alipay.aop.api.domain.FundAccountApiDTO import (
            FundAccountApiDTO,
        )

        model = AlipayCommerceEcTransAccountQueryModel()
        model.enterprise_id = data.enterprise_id

        request = AlipayCommerceEcTransAccountQueryRequest()
        request.biz_model = model

        result = cls._call_alipay(
            request, AlipayCommerceEcTransAccountQueryResponse, "查询资金专户失败"
        )
        return [
            FundAccountApiDTO.to_alipay_dict(entry)
            for entry in (result.account_list or [])
        ]

    @classmethod
    async def transfer_detail_service(
        cls, auth: AuthSchema, out_biz_no: str
    ) -> TransferOutSchema:
        """Return the transfer record identified by ``out_biz_no``.

        Raises:
            CustomException: If no such transfer record exists.
        """
        transfer = await TransferCRUD(auth).get_by_out_biz_no(out_biz_no)
        if not transfer:
            raise CustomException(msg="转账记录不存在")
        return TransferOutSchema.model_validate(transfer)

    @classmethod
    async def transfer_list_service(
        cls,
        auth: AuthSchema,
        page_no: int = 1,
        page_size: int = 20,
        search: dict | None = None,
    ) -> dict:
        """Return one page of transfer records, newest id first.

        Args:
            auth: Auth context passed to ``TransferCRUD``.
            page_no: 1-based page number; converted to an offset of
                ``(page_no - 1) * page_size``.
            page_size: Page length.
            search: Optional filter dict; ``None`` is treated as ``{}``.

        Returns:
            Whatever ``TransferCRUD.page`` returns for the given window.
        """
        log.info(f"查询转账记录列表: {page_no}, {page_size}, {search}")
        return await TransferCRUD(auth).page(
            offset=(page_no - 1) * page_size,
            limit=page_size,
            order_by=[{"id": "desc"}],
            search=search or {},
            out_schema=TransferListOutSchema,
        )

    @classmethod
    async def update_transfer_status_service(
        cls,
        auth: AuthSchema,
        order_no: str,
        status: str,
        ext_info: dict | None = None,
    ) -> None:
        """Update a transfer's status (called by the notification handler).

        Args:
            auth: Auth context passed to ``TransferCRUD``.
            order_no: Order number identifying the transfer.
            status: New status value.
            ext_info: Optional extra info; stored only when non-empty.
                Defaults to ``None`` rather than a shared mutable ``{}``.

        Logs a warning and returns without updating when the record is missing.
        """
        crud = TransferCRUD(auth)
        if not await crud.get_by_order_no(order_no):
            log.warning(f"转账记录不存在: {order_no}")
            return

        update_data: dict[str, Any] = {"status": status}
        if ext_info:
            update_data["ext_info"] = ext_info
        await crud.update_by_order_no(order_no, update_data)

    @classmethod
    async def update_deposit_status_service(
        cls,
        auth: AuthSchema,
        out_biz_no: str,
        status: str,
    ) -> None:
        """Update a deposit's status (called by the notification handler).

        Logs a warning and returns without updating when the record is missing.
        """
        crud = DepositCRUD(auth)
        if not await crud.get_by_out_biz_no(out_biz_no):
            log.warning(f"充值记录不存在: {out_biz_no}")
            return

        await crud.update_by_out_biz_no(out_biz_no, {"status": status})

    @classmethod
    async def update_withdraw_status_service(
        cls,
        auth: AuthSchema,
        out_biz_no: str,
        status: str,
        error_code: str | None = None,
        error_msg: str | None = None,
    ) -> None:
        """Update a withdrawal's status (called by the notification handler).

        ``error_code`` and ``error_msg`` are written only when truthy. Logs a
        warning and returns without updating when the record is missing.
        """
        crud = WithdrawCRUD(auth)
        if not await crud.get_by_out_biz_no(out_biz_no):
            log.warning(f"提现记录不存在: {out_biz_no}")
            return

        update_data: dict[str, Any] = {"status": status}
        if error_code:
            update_data["error_code"] = error_code
        if error_msg:
            update_data["error_msg"] = error_msg
        await crud.update_by_out_biz_no(out_biz_no, update_data)

    @classmethod
    async def consume_detail_query_service(
        cls,
        auth: AuthSchema,
        pay_no: str,
        enterprise_id: str | None = None,
        ant_shop_id: str | None = None,
        query_options: list[str] | None = None,
    ) -> dict:
        """Query bill details (✅).

        Alipay API: ``alipay.commerce.ec.consume.detail.query``

        Queries enterprise-code bill details; per the original notes it
        supports looking up related refunds, orders, receipts, etc.

        Args:
            auth: Auth context (not read by this method).
            pay_no: Bill/payment number to look up.
            enterprise_id: Optional filter, sent only when truthy.
            ant_shop_id: Optional filter, sent only when truthy.
            query_options: Optional list of options, sent only when truthy.

        Returns:
            A flat dict of fields copied from the response's ``consume_info``.
            Fields read with ``getattr(..., None)`` may be ``None`` when the
            SDK object lacks them.

        Raises:
            CustomException: If Alipay returns no response, a failure, or a
                response without ``consume_info``.
        """
        from alipay.aop.api.request.AlipayCommerceEcConsumeDetailQueryRequest import (
            AlipayCommerceEcConsumeDetailQueryRequest,
        )
        from alipay.aop.api.domain.AlipayCommerceEcConsumeDetailQueryModel import (
            AlipayCommerceEcConsumeDetailQueryModel,
        )
        from alipay.aop.api.response.AlipayCommerceEcConsumeDetailQueryResponse import (
            AlipayCommerceEcConsumeDetailQueryResponse,
        )

        model = AlipayCommerceEcConsumeDetailQueryModel()
        model.pay_no = pay_no
        if enterprise_id:
            model.enterprise_id = enterprise_id
        if ant_shop_id:
            model.ant_shop_id = ant_shop_id
        if query_options:
            model.query_options = query_options

        request = AlipayCommerceEcConsumeDetailQueryRequest()
        request.biz_model = model

        result = cls._call_alipay(
            request, AlipayCommerceEcConsumeDetailQueryResponse, "账单详情查询失败"
        )

        consume_info = result.consume_info
        if not consume_info:
            raise CustomException(msg="账单详情查询失败: 无账单信息")

        return {
            "account_id": consume_info.account_id,
            "pay_no": consume_info.pay_no,
            "consume_type": consume_info.consume_type,
            "gmt_biz_create": consume_info.gmt_biz_create,
            "consume_biz_type": consume_info.consume_biz_type,
            "consume_amount": consume_info.consume_amount,
            "order_complete_label": consume_info.order_complete_label,
            "refund_status": consume_info.refund_status,
            "refund_amount": consume_info.refund_amount,
            "peer_payer_card_name": consume_info.peer_payer_card_name,
            "user_id": getattr(consume_info, 'user_id', None),
            "open_id": getattr(consume_info, 'open_id', None),
            "enterprise_id": consume_info.enterprise_id,
            "employee_id": consume_info.employee_id,
            "enterprise_name": getattr(consume_info, 'enterprise_name', None),
            "employee_name": getattr(consume_info, 'employee_name', None),
            "consume_scene_code": getattr(consume_info, 'consume_scene_code', None),
            "consume_type_sub_category": getattr(consume_info, 'consume_type_sub_category', None),
            "consume_title": getattr(consume_info, 'consume_title', None),
            "gmt_pay": getattr(consume_info, 'gmt_pay', None),
            "gmt_refund": getattr(consume_info, 'gmt_refund', None),
            "pay_amount": getattr(consume_info, 'pay_amount', None),
            "invoice_amount": getattr(consume_info, 'invoice_amount', None),
            "peer_pay_amount": getattr(consume_info, 'peer_pay_amount', None),
            "subsidy_amount": getattr(consume_info, 'subsidy_amount', None),
            "ext_infos": getattr(consume_info, 'ext_infos', None),
        }