| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372 |
- from datetime import datetime
- from decimal import Decimal
- from typing import Any, Optional
- from redis.asyncio import Redis
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.core.alipay import AlipayClient
- from app.core.exceptions import CustomException
- from app.core.logger import log
- from app.utils.snowflake import get_snowflake_id_str
- from app.plugin.module_payment.enterprise.crud import EnterpriseCRUD
- from .crud import AccountCRUD, TransferCRUD, DepositCRUD, WithdrawCRUD
- from .enums import (
- DepositStatusEnum,
- WithdrawStatusEnum,
- )
- from .schema import (
- AccountAuthorizeApplySchema,
- AccountAuthorizeApplyOutSchema,
- AccountCreateSchema,
- AccountDepositSchema,
- AccountDepositOutSchema,
- AccountOperationOutSchema,
- AccountQuerySchema,
- AccountTransferSchema,
- AccountTransferOutSchema,
- AccountWithdrawSchema,
- ReceiptApplySchema,
- TransferListOutSchema,
- TransferOutSchema,
- TenantTransferCreate,
- TenantTransferResponse,
- )
- from ..openapi.crud import OpenTransferCRUD
- # 支付宝资金专户转账错误码 → 友好提示
- _TRANSFER_ERROR_HINTS = {
- "SYSTEM_ERROR": "系统繁忙,请稍后重试",
- "INVALID_PARAMETER": "请求参数有误,请检查后重试",
- "AMOUNT_LESS_THAN_ONE_CENT": "转账金额不能低于 0.01 元",
- "BALANCE_IS_NOT_ENOUGH": "企业余额不足,建议充值",
- "BANK_RESPONSE_ERROR": "银行处理失败:账户异常",
- "CARD_BIN_ERROR": "收款银行账号不正确,请确认",
- "DUPLICATE_DIFFERENT_REQUEST": "重复请求但参数不一致,请检查",
- "EXCEED_LIMIT_SM_MIN_AMOUNT": "转账金额不能低于 0.1 元",
- "EXCEED_LIMIT_DM_MAX_AMOUNT": "超出单日转账限额,请明天再试或联系管理员提升限额",
- "INVALID_ACCOUNT_BOOK": "资金专户不存在,请检查专户号",
- "INVALID_CARDNO": "无效的收款银行卡号",
- "INVALID_IDENTITY_TYPE": "收款方身份类型不匹配",
- "NO_AGREEMENT": "无转账权限,请联系管理员",
- "PAYEE_CARD_INFO_ERROR": "收款方账号或银行卡信息有误,请核实",
- "PAYEE_NOT_EXIST": "收款账号不存在或姓名有误",
- "PAYER_BALANCE_NOT_ENOUGH": "付款方余额不足,建议充值",
- "REQUEST_PROCESSING": "系统处理中,请稍后重试",
- "TRANS_AUTH_NO_EXIST": "转账授权协议不存在,请先签约",
- }
- def _parse_dt(val: str | None) -> datetime | None:
- """解析支付宝日期字符串"""
- if not val:
- return None
- for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d"):
- try:
- return datetime.strptime(val, fmt)
- except ValueError:
- continue
- return None
- class AccountService:
- """资金专户服务层"""
@classmethod
async def stat_transfer_amount_service(
    cls,
    auth: AuthSchema,
    tenant_id: Optional[int] = None,
    enterprise_id: Optional[str] = None,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
) -> Decimal:
    """Return the transfer amount for the given filters.

    Thin wrapper: every filter is forwarded unchanged to
    ``TransferCRUD.get_transfer_amount``.

    Args:
        auth: Auth context used to build the CRUD object.
        tenant_id: Optional tenant filter.
        enterprise_id: Optional enterprise filter.
        start_date: Optional lower bound of the time range.
        end_date: Optional upper bound of the time range.

    Returns:
        Whatever ``get_transfer_amount`` returns (annotated as ``Decimal``).
    """
    filters = {
        "tenant_id": tenant_id,
        "enterprise_id": enterprise_id,
        "start_date": start_date,
        "end_date": end_date,
    }
    return await TransferCRUD(auth).get_transfer_amount(**filters)
@classmethod
async def stat_consume_amount_service(
    cls,
    auth: AuthSchema,
    tenant_id: Optional[int] = None,
    enterprise_id: Optional[str] = None,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
) -> Decimal:
    """Return the consumption amount for the given filters.

    Thin wrapper: every filter is forwarded unchanged to
    ``BillCRUD.get_consume_amount``.

    NOTE(review): the original note says the data comes from the
    ``pay_bill`` table with consume_type=CONSUME and status=PROCESSED;
    that filtering lives in ``BillCRUD`` and is not visible here.

    Args:
        auth: Auth context used to build the CRUD object.
        tenant_id: Optional tenant filter.
        enterprise_id: Optional enterprise filter.
        start_date: Optional lower bound of the time range.
        end_date: Optional upper bound of the time range.

    Returns:
        Whatever ``get_consume_amount`` returns (annotated as ``Decimal``).
    """
    # Imported lazily, as in the rest of this class.
    from app.plugin.module_payment.notification.crud import BillCRUD

    filters = {
        "tenant_id": tenant_id,
        "enterprise_id": enterprise_id,
        "start_date": start_date,
        "end_date": end_date,
    }
    return await BillCRUD(auth).get_consume_amount(**filters)
@classmethod
async def stat_summary_amount_service(
    cls,
    auth: AuthSchema,
    tenant_id: Optional[int] = None,
    enterprise_id: Optional[str] = None,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
) -> Decimal:
    """Return transfer amount plus consumption amount for the same filters.

    Calls ``stat_transfer_amount_service`` first, then
    ``stat_consume_amount_service``, with identical arguments, and adds
    the two results.

    Args:
        auth: Auth context passed to both statistics calls.
        tenant_id: Optional tenant filter.
        enterprise_id: Optional enterprise filter.
        start_date: Optional lower bound of the time range.
        end_date: Optional upper bound of the time range.

    Returns:
        Sum of the two amounts.
    """
    shared_kwargs = {
        "auth": auth,
        "tenant_id": tenant_id,
        "enterprise_id": enterprise_id,
        "start_date": start_date,
        "end_date": end_date,
    }
    # Awaited one after the other (not concurrently), as in the original.
    transfer_total = await cls.stat_transfer_amount_service(**shared_kwargs)
    consume_total = await cls.stat_consume_amount_service(**shared_kwargs)
    return transfer_total + consume_total
@classmethod
async def authorize_apply_service(
    cls,
    auth: AuthSchema,
    data: AccountAuthorizeApplySchema
) -> AccountAuthorizeApplyOutSchema:
    """Apply for a transfer-authorization signing link.

    Calls ``alipay.commerce.ec.trans.authorize.apply`` with the enterprise
    id from ``data``. ``auth`` is not used by this method.

    Args:
        auth: Auth context (unused here).
        data: Request payload; only ``enterprise_id`` is read.

    Returns:
        Schema carrying the ``sign_url`` from the Alipay response.

    Raises:
        CustomException: When the client returns no response or Alipay
            reports a failure.
    """
    from alipay.aop.api.request.AlipayCommerceEcTransAuthorizeApplyRequest import (
        AlipayCommerceEcTransAuthorizeApplyRequest,
    )
    from alipay.aop.api.domain.AlipayCommerceEcTransAuthorizeApplyModel import (
        AlipayCommerceEcTransAuthorizeApplyModel,
    )
    from alipay.aop.api.response.AlipayCommerceEcTransAuthorizeApplyResponse import (
        AlipayCommerceEcTransAuthorizeApplyResponse,
    )

    biz_model = AlipayCommerceEcTransAuthorizeApplyModel()
    biz_model.enterprise_id = data.enterprise_id

    api_request = AlipayCommerceEcTransAuthorizeApplyRequest()
    api_request.biz_model = biz_model

    raw_response = AlipayClient.get_client().execute(api_request)
    if not raw_response:
        raise CustomException(msg="申请转账授权失败: 无响应")

    parsed = AlipayCommerceEcTransAuthorizeApplyResponse()
    parsed.parse_response_content(raw_response)
    if parsed.is_success():
        return AccountAuthorizeApplyOutSchema(
            sign_url=parsed.sign_url,
        )

    log.error(f"支付宝接口调用失败: {parsed.code} - {parsed.msg}")
    raise CustomException(msg=f"申请转账授权失败: {parsed.msg}")
@classmethod
async def create_account_service(
    cls,
    auth: AuthSchema,
    data: AccountCreateSchema
) -> AccountOperationOutSchema:
    """Open a fund account at Alipay and store it locally.

    Calls ``alipay.commerce.ec.trans.account.create``. Only
    ``data.enterprise_id`` is read from the input; account type and scene
    are fixed to ``"ALL"`` and ``"B2B_TRANS"``.

    Args:
        auth: Auth context used for the local ``AccountCRUD`` write.
        data: Request payload; only ``enterprise_id`` is read.

    Returns:
        Schema with the enterprise id and the ``account_book_id`` returned
        by Alipay.

    Raises:
        CustomException: When the client returns no response or Alipay
            reports a failure.
    """
    from alipay.aop.api.request.AlipayCommerceEcTransAccountCreateRequest import (
        AlipayCommerceEcTransAccountCreateRequest,
    )
    from alipay.aop.api.domain.AlipayCommerceEcTransAccountCreateModel import (
        AlipayCommerceEcTransAccountCreateModel,
    )
    from alipay.aop.api.response.AlipayCommerceEcTransAccountCreateResponse import (
        AlipayCommerceEcTransAccountCreateResponse,
    )

    model = AlipayCommerceEcTransAccountCreateModel()
    model.enterprise_id = data.enterprise_id
    # Fixed values, not taken from ``data``. Per the original notes:
    # "ALL" = all-purpose (income and expense) account,
    # "B2B_TRANS" = to-business transfer scene.
    model.account_type = "ALL"
    model.scene = "B2B_TRANS"

    request = AlipayCommerceEcTransAccountCreateRequest()
    request.biz_model = model

    client = AlipayClient.get_client()
    response = client.execute(request)
    if not response:
        raise CustomException(msg="开通资金专户失败: 无响应")

    result = AlipayCommerceEcTransAccountCreateResponse()
    result.parse_response_content(response)
    if not result.is_success():
        log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
        raise CustomException(msg=f"开通资金专户失败: {result.msg}")

    # account_book_id is passed once here; the former follow-up
    # re-assignment of the same value was redundant and has been removed.
    account_data = AccountCreateSchema(
        enterprise_id=model.enterprise_id,
        account_book_id=result.account_book_id,
        account_type=model.account_type,
        scene=model.scene,
    )
    await AccountCRUD(auth).create(account_data)

    return AccountOperationOutSchema(
        enterprise_id=account_data.enterprise_id,
        account_book_id=account_data.account_book_id,
    )
@classmethod
async def deposit_service(
    cls,
    auth: AuthSchema,
    data: AccountDepositSchema
) -> AccountDepositOutSchema:
    """Start a deposit into a fund account.

    Calls ``alipay.commerce.ec.trans.account.deposit`` with a freshly
    generated ``out_biz_no``. On success it stores a deposit record with
    status ``DEALING`` and returns the URL from the Alipay response.

    Args:
        auth: Auth context; ``tenant_id`` seeds the order number and the
            context is used for the local ``DepositCRUD`` write.
        data: Deposit request (enterprise id, account book id, amount,
            remark).

    Returns:
        Schema carrying the ``url`` returned by Alipay.

    Raises:
        CustomException: When the client returns no response or Alipay
            reports a failure.
    """
    from alipay.aop.api.request.AlipayCommerceEcTransAccountDepositRequest import (
        AlipayCommerceEcTransAccountDepositRequest,
    )
    from alipay.aop.api.domain.AlipayCommerceEcTransAccountDepositModel import (
        AlipayCommerceEcTransAccountDepositModel,
    )
    from alipay.aop.api.response.AlipayCommerceEcTransAccountDepositResponse import (
        AlipayCommerceEcTransAccountDepositResponse,
    )

    biz_model = AlipayCommerceEcTransAccountDepositModel()
    biz_model.enterprise_id = data.enterprise_id
    biz_model.account_book_id = data.account_book_id
    biz_model.amount = str(data.amount)
    biz_model.out_biz_no = get_snowflake_id_str(auth.tenant_id)

    api_request = AlipayCommerceEcTransAccountDepositRequest()
    api_request.biz_model = biz_model

    raw_response = AlipayClient.get_client().execute(api_request)
    if not raw_response:
        raise CustomException(msg="充值失败: 无响应")

    parsed = AlipayCommerceEcTransAccountDepositResponse()
    parsed.parse_response_content(raw_response)
    if not parsed.is_success():
        log.error(f"支付宝接口调用失败: {parsed.code} - {parsed.msg}")
        raise CustomException(msg=f"充值失败: {parsed.msg}")

    # Stored as DEALING; this method does not itself confirm completion.
    await DepositCRUD(auth).create(
        {
            "enterprise_id": data.enterprise_id,
            "out_biz_no": biz_model.out_biz_no,
            "account_book_id": data.account_book_id,
            "amount": data.amount,
            "url": parsed.url,
            "status": DepositStatusEnum.DEALING.value,
            "remark": data.remark,
        }
    )
    return AccountDepositOutSchema(
        url=parsed.url,
    )
@classmethod
async def transfer_service(
    cls,
    auth: AuthSchema,
    data: AccountTransferSchema
) -> AccountTransferOutSchema:
    """Transfer money out of a fund account.

    Calls ``alipay.commerce.ec.trans.account.transfer``.

    Flow:
      1. Load the account by ``data.account_book_id`` and check that it
         belongs to ``auth.tenant_id`` and, when given, to
         ``data.enterprise_id``.
      2. When no ``order_title`` was supplied, derive one from the
         enterprise name (this writes back to ``data.order_title``).
      3. Call Alipay with a freshly generated ``out_biz_no``.
      4. On success, write the transfer record through the caller's
         session. On failure, write a ``FAIL`` record in an independent
         session/transaction so it is not lost when the caller's
         transaction rolls back, then raise.

    Args:
        auth: Auth context of the caller (tenant id and DB session).
        data: Transfer request.

    Returns:
        Status, Alipay order numbers and the generated ``out_biz_no``.

    Raises:
        CustomException: On validation failure, missing response, or an
            Alipay-side failure (message carries a friendly hint).
    """
    from alipay.aop.api.request.AlipayCommerceEcTransAccountTransferRequest import (
        AlipayCommerceEcTransAccountTransferRequest,
    )
    from alipay.aop.api.domain.AlipayCommerceEcTransAccountTransferModel import (
        AlipayCommerceEcTransAccountTransferModel,
    )
    from alipay.aop.api.response.AlipayCommerceEcTransAccountTransferResponse import (
        AlipayCommerceEcTransAccountTransferResponse,
    )
    from alipay.aop.api.domain.TransParticipant import (
        TransParticipant,
    )
    from alipay.aop.api.domain.BankCardExtInfoDTO import (
        BankCardExtInfoDTO,
    )

    # Make sure the fund account exists and the caller may use it.
    account = await AccountCRUD(auth).get_by_account_book_id(data.account_book_id)
    if not account:
        raise CustomException(msg="资金账户不存在")
    if account.tenant_id != auth.tenant_id:
        raise CustomException(msg="无权限操作")
    if data.enterprise_id and account.enterprise_id != data.enterprise_id:
        raise CustomException(msg="参数错误")

    # Default title: "from <enterprise name> transfer".
    if not data.order_title and account.enterprise_id:
        enterprise = await EnterpriseCRUD(auth).get_by_enterprise_id(account.enterprise_id)
        if not enterprise:
            raise CustomException(msg="资金账户所属企业不存在")
        data.order_title = f"来自{enterprise.name}转账"

    model = AlipayCommerceEcTransAccountTransferModel()
    model.enterprise_id = account.enterprise_id
    model.account_book_id = account.account_book_id
    model.out_biz_no = get_snowflake_id_str(auth.tenant_id)
    # Total amount is sent as a string (per the original note: in yuan,
    # two decimal places).
    model.amount = str(data.amount)
    model.order_title = data.order_title

    payee_info = TransParticipant()
    payee_info.identity_type = data.payee_info.identity_type
    payee_info.name = data.payee_info.name
    payee_info.identity = data.payee_info.identity
    if data.payee_info.bankcard_ext_info:
        payee_info.bankcard_ext_info = BankCardExtInfoDTO.from_alipay_dict(
            data.payee_info.bankcard_ext_info.model_dump(exclude_none=True)
        )
    model.payee_info = payee_info

    request = AlipayCommerceEcTransAccountTransferRequest()
    request.biz_model = model

    client = AlipayClient.get_client()
    response = client.execute(request)
    if not response:
        raise CustomException(msg="转账失败: 无响应")

    result = AlipayCommerceEcTransAccountTransferResponse()
    result.parse_response_content(response)
    sub_code = getattr(result, 'sub_code', '') or ''
    sub_msg = getattr(result, 'sub_msg', '') or ''

    # Build the record now but write it later:
    # - success: through the caller's session;
    # - failure: through an independent, committed transaction.
    transfer_crud = TransferCRUD(auth)
    transfer_data = {
        "enterprise_id": model.enterprise_id,
        "out_biz_no": model.out_biz_no,
        "account_book_id": model.account_book_id,
        "amount": model.amount,
        "order_title": model.order_title,
        "payee_info": data.payee_info.model_dump() if data.payee_info else None,
        "payee_type": data.payee_info.identity_type if data.payee_info else None,
        "status": result.status,
        "order_no": result.order_no,
        "fund_order_id": result.fund_order_id,
        "remark": "",
    }
    log.info(f"记录转账: {transfer_data}")

    if not result.is_success():
        # Prefer an exact sub_code match.
        hint = _TRANSFER_ERROR_HINTS.get(sub_code)
        # Otherwise look for a known code embedded in sub_msg (the
        # original note says some interfaces return "unknown-sub-code").
        if not hint:
            hint = next(
                (
                    code_hint
                    for code_key, code_hint in _TRANSFER_ERROR_HINTS.items()
                    if code_key in sub_msg
                ),
                None,
            )
        hint = hint or sub_msg or result.msg or "转账失败"
        log.error(f"支付宝接口调用失败: {result.code} - {result.msg} (sub_code={sub_code}, sub_msg={sub_msg})")

        from app.core.database import async_db_session

        transfer_data["status"] = "FAIL"
        transfer_data["remark"] = f"{result.msg} ({sub_code} {sub_msg})"
        try:
            # Independent session/transaction so the failure record
            # survives a rollback of the caller's transaction.
            async with async_db_session() as _session:
                async with _session.begin():
                    new_auth = AuthSchema(db=_session, check_data_scope=False)
                    # Keep the caller's tenant on the new context.
                    new_auth.tenant_id = getattr(auth, "tenant_id", None)
                    await TransferCRUD(new_auth).create(transfer_data)
        except Exception as exc:
            # Best-effort bookkeeping: a storage error must not replace
            # the real transfer failure reported to the caller below.
            log.error(f"转账失败记录写入失败: out_biz_no={model.out_biz_no}, err={exc}")
        raise CustomException(msg=f"转账失败: {hint}")

    # Success: write through the caller's session.
    await transfer_crud.create(transfer_data)
    return AccountTransferOutSchema(
        status=result.status,
        order_no=result.order_no,
        fund_order_id=result.fund_order_id,
        out_biz_no=model.out_biz_no,
    )
@classmethod
async def tenant_transfer_service(
    cls,
    auth: AuthSchema,
    tenant_id: int,
    data: TenantTransferCreate,
    request_ip: str,
    api_key_id: int | None = None,
) -> TenantTransferResponse:
    """Transfer money for a tenant calling through the open API.

    Calls ``alipay.commerce.ec.trans.account.transfer``. Ownership is
    checked against the explicit ``tenant_id`` argument rather than
    ``auth.tenant_id``. A transfer record is written only on success.

    Args:
        auth: Auth context used for the CRUD objects.
        tenant_id: Tenant that must own the fund account; also seeds the
            generated ``out_biz_no``.
        data: Transfer request.
        request_ip: Accepted for interface compatibility; not used here.
        api_key_id: Accepted for interface compatibility; not used here.

    Returns:
        Status and Alipay order numbers of the transfer.

    Raises:
        CustomException: On validation failure, missing response, or an
            Alipay-side failure.
    """
    from alipay.aop.api.request.AlipayCommerceEcTransAccountTransferRequest import (
        AlipayCommerceEcTransAccountTransferRequest,
    )
    from alipay.aop.api.domain.AlipayCommerceEcTransAccountTransferModel import (
        AlipayCommerceEcTransAccountTransferModel,
    )
    from alipay.aop.api.response.AlipayCommerceEcTransAccountTransferResponse import (
        AlipayCommerceEcTransAccountTransferResponse,
    )
    from alipay.aop.api.domain.TransParticipant import (
        TransParticipant,
    )
    from alipay.aop.api.domain.BankCardExtInfoDTO import (
        BankCardExtInfoDTO,
    )

    # Make sure the fund account exists and belongs to this tenant.
    account = await AccountCRUD(auth).get_by_account_book_id(data.account_book_id)
    if not account:
        raise CustomException(msg="资金账户不存在")
    if account.tenant_id != tenant_id:
        raise CustomException(msg="无权限操作")
    if data.enterprise_id and account.enterprise_id != data.enterprise_id:
        raise CustomException(msg="参数错误")

    # Default title: "from <enterprise name> transfer".
    if not data.order_title and account.enterprise_id:
        enterprise = await EnterpriseCRUD(auth).get_by_enterprise_id(account.enterprise_id)
        if not enterprise:
            raise CustomException(msg="资金账户所属企业不存在")
        data.order_title = f"来自{enterprise.name}转账"

    model = AlipayCommerceEcTransAccountTransferModel()
    model.enterprise_id = account.enterprise_id
    model.account_book_id = account.account_book_id
    model.out_biz_no = get_snowflake_id_str(tenant_id)
    # Total amount is sent as a string (per the original note: in yuan,
    # two decimal places).
    model.amount = str(data.amount)
    model.order_title = data.order_title

    payee_info = TransParticipant()
    payee_info.identity_type = data.payee_info.identity_type
    payee_info.name = data.payee_info.name
    payee_info.identity = data.payee_info.identity
    if data.payee_info.bankcard_ext_info:
        payee_info.bankcard_ext_info = BankCardExtInfoDTO.from_alipay_dict(
            data.payee_info.bankcard_ext_info.model_dump(exclude_none=True)
        )
    model.payee_info = payee_info

    request = AlipayCommerceEcTransAccountTransferRequest()
    request.biz_model = model

    client = AlipayClient.get_client()
    response = client.execute(request)
    if not response:
        raise CustomException(msg="转账失败: 无响应")

    result = AlipayCommerceEcTransAccountTransferResponse()
    result.parse_response_content(response)
    if not result.is_success():
        sub_code = getattr(result, "sub_code", "") or ""
        sub_msg = getattr(result, "sub_msg", "") or ""
        # Log the sub-level code/message too: the message raised below is
        # derived from sub_msg, so the log must carry it to be useful.
        log.error(f"支付宝接口调用失败: {result.code} - {result.msg} (sub_code={sub_code}, sub_msg={sub_msg})")
        raise CustomException(msg=f"转账失败: {sub_msg or result.msg or result.code}")

    transfer_crud = TransferCRUD(auth)
    transfer_data = {
        "enterprise_id": model.enterprise_id,
        "out_biz_no": model.out_biz_no,
        "account_book_id": model.account_book_id,
        "amount": model.amount,
        "order_title": model.order_title,
        "payee_info": data.payee_info.model_dump() if data.payee_info else None,
        "payee_type": data.payee_info.identity_type if data.payee_info else None,
        "status": result.status,
        "order_no": result.order_no,
        "fund_order_id": result.fund_order_id,
    }
    await transfer_crud.create(transfer_data)

    return TenantTransferResponse(
        status=result.status,
        order_no=result.order_no,
        fund_order_id=result.fund_order_id,
    )
@classmethod
async def withdraw_service(
    cls,
    auth: AuthSchema,
    data: AccountWithdrawSchema
) -> AccountOperationOutSchema:
    """Withdraw money from a fund account.

    Calls ``alipay.commerce.ec.trans.account.withdraw``
    (docs: https://opendocs.alipay.com/pre-open/d651859b_alipay.commerce.ec.trans.account.withdraw).

    Request fields sent to Alipay:
      - enterprise_id: taken from the record found via ``AccountCRUD``.
      - account_book_id: fund account number from ``data``.
      - amount: ``data.amount`` as a string.
      - out_biz_no: freshly generated merchant-side order number.
      - remark: only when ``data.remark`` is set.

    Args:
        auth: Auth context; ``tenant_id`` seeds the order number.
        data: Withdraw request.

    Returns:
        Schema echoing the enterprise id and account book id from ``data``.

    Raises:
        CustomException: When no record exists for the enterprise id, the
            client returns no response, or Alipay reports a failure.
    """
    from alipay.aop.api.request.AlipayCommerceEcTransAccountWithdrawRequest import (
        AlipayCommerceEcTransAccountWithdrawRequest,
    )
    from alipay.aop.api.domain.AlipayCommerceEcTransAccountWithdrawModel import (
        AlipayCommerceEcTransAccountWithdrawModel,
    )
    from alipay.aop.api.response.AlipayCommerceEcTransAccountWithdrawResponse import (
        AlipayCommerceEcTransAccountWithdrawResponse,
    )

    # Looked up through AccountCRUD by enterprise id.
    owner = await AccountCRUD(auth).get_by_enterprise_id(data.enterprise_id)
    if not owner:
        raise CustomException(msg="企业不存在")

    biz_model = AlipayCommerceEcTransAccountWithdrawModel()
    biz_model.enterprise_id = owner.enterprise_id
    biz_model.account_book_id = data.account_book_id
    biz_model.amount = str(data.amount)
    biz_model.out_biz_no = get_snowflake_id_str(auth.tenant_id)
    if data.remark:
        biz_model.remark = data.remark

    api_request = AlipayCommerceEcTransAccountWithdrawRequest()
    api_request.biz_model = biz_model

    raw_response = AlipayClient.get_client().execute(api_request)
    if not raw_response:
        raise CustomException(msg="提现失败: 无响应")

    parsed = AlipayCommerceEcTransAccountWithdrawResponse()
    parsed.parse_response_content(raw_response)
    if not parsed.is_success():
        log.error(f"支付宝接口调用失败: {parsed.code} - {parsed.msg}")
        raise CustomException(msg=f"提现失败: {parsed.msg}")

    await WithdrawCRUD(auth).create(
        {
            "enterprise_id": data.enterprise_id,
            "out_biz_no": biz_model.out_biz_no,
            "account_book_id": data.account_book_id,
            "amount": data.amount,
            # Per the original note: withdrawing to the balance account is
            # synchronous (it either succeeds or errors), so the response
            # status is redundant and the business code decides success.
            "status": WithdrawStatusEnum.SUCCESS.value,
            "order_no": parsed.order_no,
        }
    )
    log.info(f"资金专户提现发起成功: 企业: {data.enterprise_id}, 金额: {data.amount}")

    return AccountOperationOutSchema(
        enterprise_id=data.enterprise_id,
        account_book_id=data.account_book_id,
    )
@classmethod
async def query_account_service(
    cls,
    auth: AuthSchema,
    data: AccountQuerySchema
) -> list[Any]:
    """List an enterprise's fund accounts as reported by Alipay.

    Calls ``alipay.commerce.ec.trans.account.query`` and keeps only
    entries that expose ``account_book_id`` and whose ``scene`` equals
    ``"B2B_TRANS"``. ``auth`` is not used by this method.

    Args:
        auth: Auth context (unused here).
        data: Query payload; only ``enterprise_id`` is read.

    Returns:
        The kept entries, each converted with
        ``FundAccountApiDTO.to_alipay_dict``.

    Raises:
        CustomException: When the client returns no response or Alipay
            reports a failure.
    """
    from alipay.aop.api.request.AlipayCommerceEcTransAccountQueryRequest import (
        AlipayCommerceEcTransAccountQueryRequest,
    )
    from alipay.aop.api.domain.AlipayCommerceEcTransAccountQueryModel import (
        AlipayCommerceEcTransAccountQueryModel,
    )
    from alipay.aop.api.response.AlipayCommerceEcTransAccountQueryResponse import (
        AlipayCommerceEcTransAccountQueryResponse,
    )
    from alipay.aop.api.domain.FundAccountApiDTO import (
        FundAccountApiDTO,
    )

    biz_model = AlipayCommerceEcTransAccountQueryModel()
    biz_model.enterprise_id = data.enterprise_id

    api_request = AlipayCommerceEcTransAccountQueryRequest()
    api_request.biz_model = biz_model

    raw_response = AlipayClient.get_client().execute(api_request)
    if not raw_response:
        raise CustomException(msg="查询资金专户失败: 无响应")

    parsed = AlipayCommerceEcTransAccountQueryResponse()
    parsed.parse_response_content(raw_response)
    if not parsed.is_success():
        log.error(f"支付宝接口调用失败: {parsed.code} - {parsed.msg}")
        raise CustomException(msg=f"查询资金专户失败: {parsed.msg}")

    # A missing ``scene`` attribute reads as None and is filtered out,
    # the same as any scene other than B2B_TRANS.
    return [
        FundAccountApiDTO.to_alipay_dict(entry)
        for entry in (parsed.account_list or [])
        if hasattr(entry, "account_book_id")
        and getattr(entry, "scene", None) == "B2B_TRANS"
    ]
@classmethod
async def transfer_detail_service(
    cls,
    auth: AuthSchema,
    out_biz_no: str
) -> TransferOutSchema:
    """Return one transfer record, with its third-party order number if any.

    Args:
        auth: Auth context used for both CRUD lookups.
        out_biz_no: Merchant-side order number of the transfer.

    Returns:
        The transfer as ``TransferOutSchema``; ``third_biz_no`` is filled
        in when a matching open-API transfer record exists.

    Raises:
        CustomException: When no transfer has this ``out_biz_no``.
    """
    record = await TransferCRUD(auth).get_by_out_biz_no(out_biz_no)
    if not record:
        raise CustomException(msg="转账记录不存在")

    detail = TransferOutSchema.model_validate(record)

    # Look up the third-party order number linked to this transfer.
    linked = await OpenTransferCRUD(auth).get(out_biz_no=record.out_biz_no)
    if linked:
        detail.third_biz_no = linked.third_biz_no
    return detail
@classmethod
async def transfer_list_service(
    cls,
    auth: AuthSchema,
    page_no: int = 1,
    page_size: int = 20,
    search: dict | None = None,
) -> dict:
    """Return one page of transfer records, newest id first.

    Args:
        auth: Auth context used to build the CRUD object.
        page_no: 1-based page number; converted to an offset of
            ``(page_no - 1) * page_size``.
        page_size: Number of records per page.
        search: Optional filter dict; ``None`` is passed on as ``{}``.

    Returns:
        Whatever ``TransferCRUD.page`` returns (annotated as ``dict``).
    """
    log.info(f"查询转账记录列表: {page_no}, {page_size}, {search}")
    pager = TransferCRUD(auth)
    return await pager.page(
        offset=(page_no - 1) * page_size,
        limit=page_size,
        order_by=[{"id": "desc"}],
        search=search or {},
        out_schema=TransferListOutSchema,
    )
@classmethod
async def transfer_export_service(
    cls,
    auth: AuthSchema,
    start_time: str,
    end_time: str,
    enterprise_id: Optional[str] = None,
) -> bytes:
    """Export transfer records in a time range as an Excel file.

    Records are filtered by ``created_time`` between ``start_time`` and
    ``end_time`` (both passed through as given), optionally by
    ``enterprise_id``, and ordered by id descending.

    Args:
        auth: Auth context used to build the CRUD object.
        start_time: Lower bound for ``created_time__gte``.
        end_time: Upper bound for ``created_time__lte``.
        enterprise_id: Optional enterprise filter.

    Returns:
        Whatever ``ExcelUtil.export_list2excel`` returns (annotated as
        ``bytes``).
    """
    log.info(f"导出转账记录报表: {start_time} -> {end_time}")

    crud = TransferCRUD(auth)

    filters = {
        "created_time__gte": start_time,
        "created_time__lte": end_time,
    }
    if enterprise_id:
        filters["enterprise_id"] = enterprise_id

    records = await crud.list(
        search=filters,
        order_by=[{"id": "desc"}],
    )

    from app.utils.excel_util import ExcelUtil

    # Display labels for stored status / payee-type codes.
    status_labels = {
        "DEALING": "处理中",
        "SUCCESS": "成功",
        "FAIL": "失败",
        "REFUND": "退票",
    }
    payee_type_labels = {
        "ALIPAY_ACCOUNT": "支付宝账户",
        "BANK_CARD": "银行卡",
    }

    # Column headers, in output order.
    headers = (
        "序号",
        "订单号",
        "商户订单号",
        "金额(元)",
        "收款方姓名",
        "收款方类型",
        "状态",
        "转账标题",
        "创建时间",
    )

    def _to_row(seq: int, rec: Any) -> dict:
        """Turn one transfer record into a header -> cell mapping."""
        payee = rec.payee_info or {}
        if rec.created_time:
            created_text = rec.created_time.strftime("%Y-%m-%d %H:%M:%S")
        else:
            created_text = ""
        return {
            "序号": seq,
            "订单号": rec.out_biz_no or "",
            "商户订单号": rec.order_no or "",
            "金额(元)": str(rec.amount or 0),
            "收款方姓名": payee.get("name", ""),
            "收款方类型": payee_type_labels.get(payee.get("identity_type", ""), ""),
            # Unknown statuses are exported as stored.
            "状态": status_labels.get(rec.status, rec.status),
            "转账标题": rec.order_title or "",
            "创建时间": created_text,
        }

    list_data = [_to_row(seq, rec) for seq, rec in enumerate(records, start=1)]

    # Each column maps to itself: row keys already are the header texts.
    mapping_dict = {header: header for header in headers}

    return ExcelUtil.export_list2excel(list_data, mapping_dict)
@classmethod
async def apply_receipt_service(
    cls,
    auth: AuthSchema,
    redis: Redis,
    data: ReceiptApplySchema,
) -> str:
    """Apply for a transfer receipt and return its file id.

    Calls ``alipay.commerce.ec.trans.receipt.apply``. The file id is
    cached in Redis under ``receipt:<enterprise_id>:<order_no>`` for
    172800 seconds (48 hours); a cached value is returned without calling
    Alipay again.

    Args:
        auth: Auth context used for the enterprise lookup.
        redis: Redis connection used for the file-id cache.
        data: Request payload with ``enterprise_id`` and ``order_no``
            (the Alipay transfer order number).

    Returns:
        The receipt ``file_id`` as a string.

    Raises:
        CustomException: When the enterprise does not exist, the client
            returns no response, Alipay reports a failure, or the
            response carries no ``file_id``.
    """
    from app.core.redis_crud import RedisCURD

    cache_ttl_seconds = 172800  # 48 hours

    redis_crud = RedisCURD(redis)

    cache_key = f"receipt:{data.enterprise_id}:{data.order_no}"
    cached_file_id = await redis_crud.get(cache_key)

    if cached_file_id:
        log.info(f"使用缓存的 file_id: {cached_file_id}")
        return cached_file_id

    crud = EnterpriseCRUD(auth)
    enterprise = await crud.get_by_enterprise_id(data.enterprise_id)
    if not enterprise:
        raise CustomException(msg="企业不存在")

    from alipay.aop.api.request.AlipayCommerceEcTransReceiptApplyRequest import (
        AlipayCommerceEcTransReceiptApplyRequest,
    )
    from alipay.aop.api.domain.AlipayCommerceEcTransReceiptApplyModel import (
        AlipayCommerceEcTransReceiptApplyModel,
    )
    from alipay.aop.api.response.AlipayCommerceEcTransReceiptApplyResponse import (
        AlipayCommerceEcTransReceiptApplyResponse,
    )

    model = AlipayCommerceEcTransReceiptApplyModel()
    model.enterprise_id = data.enterprise_id
    model.order_no = data.order_no

    request = AlipayCommerceEcTransReceiptApplyRequest()
    request.biz_model = model

    client = AlipayClient.get_client()
    response = client.execute(request)

    if not response:
        raise CustomException(msg="申请回单失败: 无响应")

    result = AlipayCommerceEcTransReceiptApplyResponse()
    result.parse_response_content(response)

    if not result.is_success():
        # Drop any cache entry for this receipt.
        await redis_crud.delete(cache_key)
        raise CustomException(msg=f"申请回单失败: {result.msg}")

    # A success response without a file id must not be cached:
    # str(None) would store the literal "None" for the whole TTL.
    if not result.file_id:
        raise CustomException(msg="申请回单失败: 未返回 file_id")

    file_id = str(result.file_id)
    await redis_crud.set(cache_key, file_id, expire=cache_ttl_seconds)

    log.info(f"申请回单成功: order_no={data.order_no}, file_id={file_id}")
    return file_id
@classmethod
async def query_receipt_service(cls, enterprise_id: str, file_id: str) -> dict:
    """Query the status of a previously requested receipt.

    Calls ``alipay.commerce.ec.trans.receipt.query``.

    Args:
        enterprise_id: Enterprise the receipt belongs to.
        file_id: File application id returned when the receipt was
            requested.

    Returns:
        Dict with ``file_id`` (echoed from the argument) plus ``status``,
        ``download_url`` and ``error_message`` from the Alipay response.

    Raises:
        CustomException: When the client returns no response or Alipay
            reports a failure.
    """
    from alipay.aop.api.request.AlipayCommerceEcTransReceiptQueryRequest import (
        AlipayCommerceEcTransReceiptQueryRequest,
    )
    from alipay.aop.api.response.AlipayCommerceEcTransReceiptQueryResponse import (
        AlipayCommerceEcTransReceiptQueryResponse,
    )
    from alipay.aop.api.domain.AlipayCommerceEcTransReceiptQueryModel import (
        AlipayCommerceEcTransReceiptQueryModel,
    )

    biz_model = AlipayCommerceEcTransReceiptQueryModel()
    biz_model.enterprise_id = enterprise_id
    biz_model.file_id = file_id

    api_request = AlipayCommerceEcTransReceiptQueryRequest()
    api_request.biz_model = biz_model

    raw_response = AlipayClient.get_client().execute(api_request)
    if not raw_response:
        raise CustomException(msg="查询回单失败: 无响应")

    parsed = AlipayCommerceEcTransReceiptQueryResponse()
    parsed.parse_response_content(raw_response)
    if not parsed.is_success():
        raise CustomException(msg=f"查询回单失败: {parsed.msg}")

    return {
        "file_id": file_id,
        "status": parsed.status,
        "download_url": parsed.download_url,
        "error_message": parsed.error_message,
    }
@classmethod
async def transfer_sync_status_service(
    cls,
    auth: AuthSchema,
    data: "TransferSyncStatusSchema",
) -> dict:
    """Manually set the status of a transfer record (admin back-fill).

    Per the original note, this repairs transfers stuck in ``DEALING``
    because a notification was lost. The only combination rejected is
    setting ``SUCCESS`` on a record that is not currently ``DEALING``.

    Args:
        auth: Auth context; its ``db`` session is flushed and used to
            refresh the record.
        data: Carries ``out_biz_no``, the new ``status`` and optional
            ``error_code`` / ``error_msg``.

    Returns:
        Dict with ``out_biz_no``, ``status``, ``error_code`` and
        ``error_msg`` of the updated record; the last two are ``None``
        when the record has no such attribute.

    Raises:
        CustomException: When the record does not exist or the update is
            rejected as unnecessary.
    """
    from app.plugin.module_payment.account.crud import TransferCRUD
    from app.plugin.module_payment.account.schema import TransferSyncStatusSchema

    crud = TransferCRUD(auth)
    transfer = await crud.get_by_out_biz_no(data.out_biz_no)
    if not transfer:
        raise CustomException(msg=f"转账记录不存在: {data.out_biz_no}")
    if transfer.status != "DEALING" and data.status == "SUCCESS":
        raise CustomException(msg=f"转账记录当前状态为 {transfer.status},无需同步")

    update_data = {"status": data.status}
    if data.error_code:
        update_data["error_code"] = data.error_code
    if data.error_msg:
        update_data["error_msg"] = data.error_msg

    # Only attributes that exist on the record are written.
    for key, value in update_data.items():
        if hasattr(transfer, key):
            setattr(transfer, key, value)

    await auth.db.flush()
    await auth.db.refresh(transfer)
    log.info(f"手动同步转账状态成功: out_biz_no={data.out_biz_no}, {transfer.status}")

    return {
        "out_biz_no": transfer.out_biz_no,
        "status": transfer.status,
        # Read with the same tolerance as the guarded writes above, so a
        # record without these attributes does not raise after the flush.
        "error_code": getattr(transfer, "error_code", None),
        "error_msg": getattr(transfer, "error_msg", None),
    }
@classmethod
async def transfer_sync_all_service(
    cls,
    auth: AuthSchema,
) -> dict:
    """Synchronise the status of all transfer records.

    Loads every transfer with a non-null ``out_biz_no`` (ordered by id
    ascending) and calls ``_sync_transfer_detail`` for each one that also
    has an enterprise id. When that helper returns ``False`` (no
    permission, per its docstring) the loop stops and the method falls
    back to listing the ``DEALING`` records for manual back-fill.

    Args:
        auth: Auth context; its ``db`` session runs the query and is
            flushed when at least one record was synced.

    Returns:
        Normal path: ``total``, ``synced``, ``errors``, ``details``.
        No-permission path: ``total``, ``synced``, ``no_permission``,
        ``dealing_count``, ``details`` (DEALING records only), ``note``.
    """
    from sqlalchemy import select
    from app.plugin.module_payment.account.model import TransferModel
    from app.plugin.module_payment.account.enums import TransferStatusEnum

    stmt = select(TransferModel).where(
        TransferModel.out_biz_no.isnot(None),
    ).order_by(TransferModel.id.asc())
    query_result = await auth.db.execute(stmt)
    all_transfers = query_result.scalars().all()

    synced = 0
    errors = 0
    details = []
    _has_permission = True

    for transfer in all_transfers:
        out_biz_no = transfer.out_biz_no
        eid = transfer.enterprise_id
        if not out_biz_no or not eid:
            continue
        # Capture the status before syncing: the helper updates the local
        # record, so reading it afterwards may already show the new value.
        old_status = transfer.status
        try:
            outcome = await cls._sync_transfer_detail(auth, out_biz_no, eid)
            if outcome is False:
                # No permission: stop the full sync.
                _has_permission = False
                break
            if isinstance(outcome, str):
                synced += 1
                details.append({"out_biz_no": out_biz_no, "old_status": old_status, "new_status": outcome})
            else:
                details.append({"out_biz_no": out_biz_no, "status": transfer.status, "action": "no_change"})
        except Exception as e:
            # Deliberate best-effort: one failing record must not abort
            # the whole run.
            errors += 1
            details.append({"out_biz_no": out_biz_no, "status": old_status, "error": str(e)})
            log.warning(f"全量同步 - 查询失败: out_biz_no={out_biz_no}, err={e}")

    if not _has_permission:
        dealing = [t for t in all_transfers if t.status == TransferStatusEnum.DEALING.value]
        return {
            "total": len(all_transfers),
            "synced": synced,
            "no_permission": True,
            "dealing_count": len(dealing),
            "details": [{"out_biz_no": t.out_biz_no, "status": t.status} for t in dealing],
            "note": "无法通过支付宝 API 查询转账状态,请在开放平台开通 alipay.fund.trans.common.query 权限,或逐个使用 sync-status 手动补录",
        }

    if synced > 0:
        await auth.db.flush()

    return {
        "total": len(all_transfers),
        "synced": synced,
        "errors": errors,
        "details": details,
    }
@classmethod
async def _sync_transfer_detail(
    cls,
    auth: AuthSchema,
    out_biz_no: str,
    enterprise_id: str,
) -> str | bool | None:
    """Query one transfer at Alipay and update the local record.

    Strategy A calls ``fund.trans.common.query``. If its SDK classes cannot
    be imported, the call returns no response, or the failure message
    indicates missing permission, strategy B calls ``consume.detail.query``
    using the local record's ``order_no`` as ``pay_no``.

    Args:
        auth: Request auth context providing the ``db`` session.
        out_biz_no: Merchant-side transfer number identifying the record.
        enterprise_id: Enterprise id sent with the strategy-B query.

    Returns:
        The new status string when the local record was updated; ``False``
        when the transfer could not be queried (no permission, no usable
        ``order_no``, empty response, SDK missing or an exception in
        strategy B); ``None`` when nothing changed or no conclusive result
        was found.
    """
    from sqlalchemy import select, update as sa_update
    from app.plugin.module_payment.account.model import TransferModel
    from app.core.alipay import AlipayClient

    # Load the local record first; strategy B needs its order_no.
    lookup = select(TransferModel).where(TransferModel.out_biz_no == out_biz_no)
    lookup_result = await auth.db.execute(lookup)
    local_transfer = lookup_result.scalar_one_or_none()

    # --- Strategy A: fund.trans.common.query ---
    try:
        from alipay.aop.api.request.AlipayFundTransCommonQueryRequest import (
            AlipayFundTransCommonQueryRequest,
        )
        from alipay.aop.api.domain.AlipayFundTransCommonQueryModel import (
            AlipayFundTransCommonQueryModel,
        )
        from alipay.aop.api.response.AlipayFundTransCommonQueryResponse import (
            AlipayFundTransCommonQueryResponse,
        )

        trans_model = AlipayFundTransCommonQueryModel()
        trans_model.out_biz_no = out_biz_no
        trans_model.product_code = "TRANS_ACCOUNT_NO_PWD"
        trans_model.biz_scene = "DIRECT_TRANSFER"

        trans_request = AlipayFundTransCommonQueryRequest()
        trans_request.biz_model = trans_model

        trans_client = AlipayClient.get_client()
        trans_raw = trans_client.execute(trans_request)
        if trans_raw:
            trans_result = AlipayFundTransCommonQueryResponse()
            trans_result.parse_response_content(trans_raw)

            if trans_result.is_success():
                alipay_status = getattr(trans_result, 'status', None)
                if not alipay_status or alipay_status == "DEALING":
                    return None
                return await cls._apply_transfer_update(auth, out_biz_no, trans_result, alipay_status)

            failure_msg = getattr(trans_result, 'sub_msg', '') or ''
            permission_denied = '权限' in failure_msg or 'NO_PERMISSION' in failure_msg
            if not permission_denied:
                return None
            # Permission denied: fall through to strategy B.
    except ImportError:
        pass

    # --- Strategy B: consume.detail.query (order_no used as pay_no) ---
    order_no = local_transfer.order_no if local_transfer else None
    if not order_no:
        log.warning(f"无 order_no 可用于查询: out_biz_no={out_biz_no}")
        return False

    try:
        from alipay.aop.api.request.AlipayCommerceEcConsumeDetailQueryRequest import (
            AlipayCommerceEcConsumeDetailQueryRequest,
        )
        from alipay.aop.api.domain.AlipayCommerceEcConsumeDetailQueryModel import (
            AlipayCommerceEcConsumeDetailQueryModel,
        )
        from alipay.aop.api.response.AlipayCommerceEcConsumeDetailQueryResponse import (
            AlipayCommerceEcConsumeDetailQueryResponse,
        )

        detail_model = AlipayCommerceEcConsumeDetailQueryModel()
        detail_model.pay_no = order_no
        detail_model.enterprise_id = enterprise_id

        detail_request = AlipayCommerceEcConsumeDetailQueryRequest()
        detail_request.biz_model = detail_model

        detail_client = AlipayClient.get_client()
        detail_raw = detail_client.execute(detail_request)
        if not detail_raw:
            return False

        detail_result = AlipayCommerceEcConsumeDetailQueryResponse()
        detail_result.parse_response_content(detail_raw)

        if not detail_result.is_success():
            sub_code = getattr(detail_result, 'sub_code', '') or ''
            sub_msg = getattr(detail_result, 'sub_msg', '') or ''
            # Missing permission is reported to the caller as False.
            if '权限' in sub_msg or 'NO_PERMISSION' in sub_code:
                return False
            log.warning(f"consume.detail.query 查无记录: out_biz_no={out_biz_no}, err={sub_msg}")
            return None

        consume_info = getattr(detail_result, 'consume_info', None)
        if not consume_info:
            return None

        # Only bills whose consume_type is TRANSFER are considered.
        if getattr(consume_info, 'consume_type', '') != "TRANSFER":
            return None

        # The final status is derived from the notify_reason text.
        reason_upper = (getattr(consume_info, 'notify_reason', '') or '').upper()
        if 'SUCCESS' in reason_upper:
            new_status = "SUCCESS"
        elif 'FAIL' in reason_upper:
            new_status = "FAIL"
        else:
            return None

        changes = {"status": new_status}
        remote_pay_no = getattr(consume_info, 'pay_no', None)
        if remote_pay_no and remote_pay_no != order_no:
            changes["order_no"] = remote_pay_no

        update_stmt = sa_update(TransferModel).where(
            TransferModel.out_biz_no == out_biz_no
        ).values(**changes)
        await auth.db.execute(update_stmt)
        log.info(f"转账同步(consume详情) - out_biz_no={out_biz_no}, status={new_status}")
        return new_status
    except ImportError:
        log.warning("consume.detail.query SDK 不可用")
        return False
    except Exception as exc:
        log.warning(f"consume.detail.query 异常: out_biz_no={out_biz_no}, err={exc}")
        return False
@classmethod
async def _apply_transfer_update(
    cls,
    auth: AuthSchema,
    out_biz_no: str,
    result: object,
    alipay_status: str,
) -> str | None:
    """Persist a ``fund.trans.common.query`` result onto the local record.

    Args:
        auth: Request auth context providing the ``db`` session.
        out_biz_no: Merchant-side transfer number identifying the record.
        result: Parsed query response; ``order_id``, ``pay_fund_order_id``,
            ``trans_amount``, ``error_code`` and ``fail_reason`` are read
            from it when present and truthy.
        alipay_status: Status reported by the query.

    Returns:
        ``alipay_status`` when the record was updated, or ``None`` when the
        status is ``DEALING`` and nothing was written.
    """
    from sqlalchemy import update as sa_update
    from app.plugin.module_payment.account.model import TransferModel

    changes = {"status": alipay_status}

    # Copy over only the response fields that carry a value.
    if remote_order_id := getattr(result, 'order_id', None):
        changes["order_no"] = remote_order_id
    if fund_order_id := getattr(result, 'pay_fund_order_id', None):
        changes["fund_order_id"] = fund_order_id
    if amount := getattr(result, 'trans_amount', None):
        changes["amount"] = Decimal(str(amount))
    if code := getattr(result, 'error_code', None):
        changes["error_code"] = code
    if reason := getattr(result, 'fail_reason', None):
        changes["error_msg"] = reason

    # A transfer still in DEALING is not final; leave the row untouched.
    if changes.get("status") == "DEALING":
        return None

    update_stmt = sa_update(TransferModel).where(
        TransferModel.out_biz_no == out_biz_no
    ).values(**changes)
    await auth.db.execute(update_stmt)
    log.info(f"转账详情同步 - out_biz_no={out_biz_no}, status={alipay_status}")
    return alipay_status
@classmethod
async def update_transfer_status_service(
    cls,
    auth: AuthSchema,
    order_no: str,
    status: str,
    ext_info: dict | None = None,
) -> None:
    """Update a transfer's status (called by the notification handler).

    Args:
        auth: Request auth context used to build the CRUD helper.
        order_no: Order number identifying the transfer record.
        status: New status to store.
        ext_info: Optional extra data stored under ``ext_info``; omitted from
            the update when ``None`` or empty. Defaults to ``None`` rather
            than a shared mutable ``{}``.

    Returns:
        None. A missing record is logged as a warning and otherwise ignored.
    """
    crud = TransferCRUD(auth)
    transfer = await crud.get_by_order_no(order_no)
    if not transfer:
        log.warning(f"转账记录不存在: {order_no}")
        return

    update_data = {"status": status}
    if ext_info:
        update_data["ext_info"] = ext_info
    await crud.update_by_order_no(order_no, update_data)
@classmethod
async def update_deposit_status_service(
    cls,
    auth: AuthSchema,
    out_biz_no: str,
    status: str,
) -> None:
    """Update a deposit's status (called by the notification handler).

    Args:
        auth: Request auth context used to build the CRUD helper.
        out_biz_no: Merchant-side number identifying the deposit record.
        status: New status to store.

    Returns:
        None. A missing record is logged as a warning and otherwise ignored.
    """
    deposit_crud = DepositCRUD(auth)
    existing = await deposit_crud.get_by_out_biz_no(out_biz_no)
    if existing:
        await deposit_crud.update_by_out_biz_no(out_biz_no, {"status": status})
    else:
        log.warning(f"充值记录不存在: {out_biz_no}")
@classmethod
async def update_withdraw_status_service(
    cls,
    auth: AuthSchema,
    out_biz_no: str,
    status: str,
    error_code: str | None = None,
    error_msg: str | None = None,
) -> None:
    """Update a withdrawal's status (called by the notification handler).

    Args:
        auth: Request auth context used to build the CRUD helper.
        out_biz_no: Merchant-side number identifying the withdrawal record.
        status: New status to store.
        error_code: Optional error code; stored only when truthy.
        error_msg: Optional error message; stored only when truthy.

    Returns:
        None. A missing record is logged as a warning and otherwise ignored.
    """
    withdraw_crud = WithdrawCRUD(auth)
    existing = await withdraw_crud.get_by_out_biz_no(out_biz_no)
    if not existing:
        log.warning(f"提现记录不存在: {out_biz_no}")
        return

    # Status is always written; error details only when provided.
    changes = {"status": status}
    optional_fields = (("error_code", error_code), ("error_msg", error_msg))
    for field_name, field_value in optional_fields:
        if field_value:
            changes[field_name] = field_value

    await withdraw_crud.update_by_out_biz_no(out_biz_no, changes)
@classmethod
async def consume_detail_query_service(
    cls,
    auth: AuthSchema,
    pay_no: str,
    enterprise_id: str | None = None,
    ant_shop_id: str | None = None,
    query_options: list[str] | None = None,
) -> dict:
    """Query the details of one bill.

    Calls ``alipay.commerce.ec.consume.detail.query``.

    Args:
        auth: Request auth context (not used by this method's body).
        pay_no: Payment number of the bill to look up.
        enterprise_id: Optional enterprise id filter.
        ant_shop_id: Optional shop id filter.
        query_options: Optional list of query options passed to the API.

    Returns:
        A dict of bill fields copied from the response's ``consume_info``.
        Fields read leniently are ``None`` when the SDK object lacks them.

    Raises:
        CustomException: If there is no response, the call is unsuccessful,
            or the response has no ``consume_info``.
    """
    from alipay.aop.api.request.AlipayCommerceEcConsumeDetailQueryRequest import (
        AlipayCommerceEcConsumeDetailQueryRequest,
    )
    from alipay.aop.api.domain.AlipayCommerceEcConsumeDetailQueryModel import (
        AlipayCommerceEcConsumeDetailQueryModel,
    )
    from alipay.aop.api.response.AlipayCommerceEcConsumeDetailQueryResponse import (
        AlipayCommerceEcConsumeDetailQueryResponse,
    )

    biz_model = AlipayCommerceEcConsumeDetailQueryModel()
    biz_model.pay_no = pay_no
    if enterprise_id:
        biz_model.enterprise_id = enterprise_id
    if ant_shop_id:
        biz_model.ant_shop_id = ant_shop_id
    if query_options:
        biz_model.query_options = query_options

    api_request = AlipayCommerceEcConsumeDetailQueryRequest()
    api_request.biz_model = biz_model

    raw_response = AlipayClient.get_client().execute(api_request)
    if not raw_response:
        raise CustomException(msg="账单详情查询失败: 无响应")

    parsed = AlipayCommerceEcConsumeDetailQueryResponse()
    parsed.parse_response_content(raw_response)
    if not parsed.is_success():
        log.error(f"支付宝接口调用失败: {parsed.code} - {parsed.msg}")
        raise CustomException(msg=f"账单详情查询失败: {parsed.msg}")

    info = parsed.consume_info
    if not info:
        raise CustomException(msg="账单详情查询失败: 无账单信息")

    # (field name, strict) in output order. Strict fields are read without a
    # default, so a missing attribute raises AttributeError; the rest fall
    # back to None.
    field_spec = (
        ("account_id", True),
        ("pay_no", True),
        ("consume_type", True),
        ("gmt_biz_create", True),
        ("consume_biz_type", True),
        ("consume_amount", True),
        ("order_complete_label", True),
        ("refund_status", True),
        ("refund_amount", True),
        ("peer_payer_card_name", True),
        ("user_id", False),
        ("open_id", False),
        ("enterprise_id", True),
        ("employee_id", True),
        ("enterprise_name", False),
        ("employee_name", False),
        ("consume_scene_code", False),
        ("consume_type_sub_category", False),
        ("consume_title", False),
        ("gmt_pay", False),
        ("gmt_refund", False),
        ("pay_amount", False),
        ("invoice_amount", False),
        ("peer_pay_amount", False),
        ("subsidy_amount", False),
        ("ext_infos", False),
    )
    return {
        name: getattr(info, name) if strict else getattr(info, name, None)
        for name, strict in field_spec
    }
|