from datetime import datetime
from decimal import Decimal
from typing import Any, Optional

from redis.asyncio import Redis

from app.api.v1.module_system.auth.schema import AuthSchema
from app.core.alipay import AlipayClient
from app.core.exceptions import CustomException
from app.core.logger import log
from app.utils.snowflake import get_snowflake_id_str
from app.plugin.module_payment.enterprise.crud import EnterpriseCRUD

from .crud import AccountCRUD, TransferCRUD, DepositCRUD, WithdrawCRUD
from .enums import (
    DepositStatusEnum,
    WithdrawStatusEnum,
)
from .schema import (
    AccountAuthorizeApplySchema,
    AccountAuthorizeApplyOutSchema,
    AccountCreateSchema,
    AccountDepositSchema,
    AccountDepositOutSchema,
    AccountOperationOutSchema,
    AccountQuerySchema,
    AccountTransferSchema,
    AccountTransferOutSchema,
    AccountWithdrawSchema,
    ReceiptApplySchema,
    TransferListOutSchema,
    TransferOutSchema,
    TenantTransferCreate,
    TenantTransferResponse,
)
from ..openapi.crud import OpenTransferCRUD

# Alipay fund-account transfer error codes -> user-friendly hints.
# The hint texts are shown to end users and must stay as they are.
_TRANSFER_ERROR_HINTS = {
    "SYSTEM_ERROR": "系统繁忙,请稍后重试",
    "INVALID_PARAMETER": "请求参数有误,请检查后重试",
    "AMOUNT_LESS_THAN_ONE_CENT": "转账金额不能低于 0.01 元",
    "BALANCE_IS_NOT_ENOUGH": "企业余额不足,建议充值",
    "BANK_RESPONSE_ERROR": "银行处理失败:账户异常",
    "CARD_BIN_ERROR": "收款银行账号不正确,请确认",
    "DUPLICATE_DIFFERENT_REQUEST": "重复请求但参数不一致,请检查",
    "EXCEED_LIMIT_SM_MIN_AMOUNT": "转账金额不能低于 0.1 元",
    "EXCEED_LIMIT_DM_MAX_AMOUNT": "超出单日转账限额,请明天再试或联系管理员提升限额",
    "INVALID_ACCOUNT_BOOK": "资金专户不存在,请检查专户号",
    "INVALID_CARDNO": "无效的收款银行卡号",
    "INVALID_IDENTITY_TYPE": "收款方身份类型不匹配",
    "NO_AGREEMENT": "无转账权限,请联系管理员",
    "PAYEE_CARD_INFO_ERROR": "收款方账号或银行卡信息有误,请核实",
    "PAYEE_NOT_EXIST": "收款账号不存在或姓名有误",
    "PAYER_BALANCE_NOT_ENOUGH": "付款方余额不足,建议充值",
    "REQUEST_PROCESSING": "系统处理中,请稍后重试",
    "TRANS_AUTH_NO_EXIST": "转账授权协议不存在,请先签约",
}


def _parse_dt(val: str | None) -> datetime | None:
    """Parse an Alipay date string.

    Accepts ``YYYY-MM-DD HH:MM:SS`` or ``YYYY-MM-DD``. Returns ``None`` for
    empty input or when neither format matches.
    """
    if not val:
        return None
    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d"):
        try:
            return datetime.strptime(val, fmt)
        except ValueError:
            continue
    return None


class AccountService:
    """Service layer for Alipay fund accounts (资金专户)."""

    # ------------------------------------------------------------------
    # Private helpers shared by the Alipay-calling services
    # ------------------------------------------------------------------

    @classmethod
    def _call_alipay(cls, request: Any, response_cls: type, fail_prefix: str) -> Any:
        """Execute ``request`` on the shared Alipay client and parse the reply.

        Args:
            request: A populated Alipay SDK request object.
            response_cls: SDK response class used to parse the raw reply.
            fail_prefix: Prefix for the user-facing error message.

        Returns:
            The parsed ``response_cls`` instance; success is NOT checked here.

        Raises:
            CustomException: If the client returns an empty response.

        NOTE(review): ``client.execute`` is called synchronously from
        coroutines; if it performs blocking network I/O it will stall the
        event loop — confirm and consider offloading to a worker thread.
        """
        response = AlipayClient.get_client().execute(request)
        if not response:
            raise CustomException(msg=f"{fail_prefix}: 无响应")
        result = response_cls()
        result.parse_response_content(response)
        return result

    @classmethod
    def _call_alipay_checked(cls, request: Any, response_cls: type, fail_prefix: str) -> Any:
        """Like :meth:`_call_alipay`, but also log and raise on business failure.

        Raises:
            CustomException: On an empty response or when
                ``result.is_success()`` is false.
        """
        result = cls._call_alipay(request, response_cls, fail_prefix)
        if not result.is_success():
            log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
            raise CustomException(msg=f"{fail_prefix}: {result.msg}")
        return result

    @classmethod
    async def _build_transfer_model(cls, auth: AuthSchema, tenant_id: Any, data: Any) -> Any:
        """Validate the source account and build the Alipay transfer model.

        Shared by :meth:`transfer_service` and :meth:`tenant_transfer_service`.
        Side effect: when ``data.order_title`` is empty and the account has an
        enterprise, ``data.order_title`` is filled with a default title.

        Args:
            auth: Auth context passed to the CRUD classes.
            tenant_id: Tenant that must own the account; also seeds the
                generated ``out_biz_no``.
            data: Transfer payload (``account_book_id``, ``enterprise_id``,
                ``order_title``, ``amount``, ``payee_info``).

        Raises:
            CustomException: Account missing, owned by another tenant,
                enterprise mismatch, or owning enterprise not found.
        """
        from alipay.aop.api.domain.AlipayCommerceEcTransAccountTransferModel import (
            AlipayCommerceEcTransAccountTransferModel,
        )
        from alipay.aop.api.domain.TransParticipant import (
            TransParticipant,
        )
        from alipay.aop.api.domain.BankCardExtInfoDTO import (
            BankCardExtInfoDTO,
        )

        # The fund account must exist and belong to the calling tenant.
        account = await AccountCRUD(auth).get_by_account_book_id(data.account_book_id)
        if not account:
            raise CustomException(msg="资金账户不存在")
        if account.tenant_id != tenant_id:
            raise CustomException(msg="无权限操作")
        if data.enterprise_id and account.enterprise_id != data.enterprise_id:
            raise CustomException(msg="参数错误")

        # Default the order title from the owning enterprise's name.
        if not data.order_title and account.enterprise_id:
            enterprise = await EnterpriseCRUD(auth).get_by_enterprise_id(account.enterprise_id)
            if not enterprise:
                raise CustomException(msg="资金账户所属企业不存在")
            data.order_title = f"来自{enterprise.name}转账"

        model = AlipayCommerceEcTransAccountTransferModel()
        model.enterprise_id = account.enterprise_id
        model.account_book_id = account.account_book_id
        model.out_biz_no = get_snowflake_id_str(tenant_id)
        # Total transfer amount in yuan, two decimal places.
        model.amount = str(data.amount)
        model.order_title = data.order_title

        payee_info = TransParticipant()
        payee_info.identity_type = data.payee_info.identity_type
        payee_info.name = data.payee_info.name
        payee_info.identity = data.payee_info.identity
        if data.payee_info.bankcard_ext_info:
            payee_info.bankcard_ext_info = BankCardExtInfoDTO.from_alipay_dict(
                data.payee_info.bankcard_ext_info.model_dump(exclude_none=True)
            )
        model.payee_info = payee_info
        return model

    @staticmethod
    def _transfer_record(model: Any, data: Any, result: Any) -> dict:
        """Build the local transfer-record dict from request model and reply."""
        return {
            "enterprise_id": model.enterprise_id,
            "out_biz_no": model.out_biz_no,
            "account_book_id": model.account_book_id,
            "amount": model.amount,
            "order_title": model.order_title,
            "payee_info": data.payee_info.model_dump() if data.payee_info else None,
            "payee_type": data.payee_info.identity_type if data.payee_info else None,
            "status": result.status,
            "order_no": result.order_no,
            "fund_order_id": result.fund_order_id,
        }

    # ------------------------------------------------------------------
    # Public services
    # ------------------------------------------------------------------

    @classmethod
    async def stat_transfer_amount_service(
        cls,
        auth: AuthSchema,
        tenant_id: Optional[int] = None,
        enterprise_id: Optional[str] = None,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
    ) -> Decimal:
        """Aggregate the transfer amount (✅).

        Delegates to ``TransferCRUD.get_transfer_amount`` with the given
        tenant / enterprise / date-range filters and returns its result.
        """
        crud = TransferCRUD(auth)
        return await crud.get_transfer_amount(
            tenant_id=tenant_id,
            enterprise_id=enterprise_id,
            start_date=start_date,
            end_date=end_date,
        )

    @classmethod
    async def authorize_apply_service(
        cls, auth: AuthSchema, data: AccountAuthorizeApplySchema
    ) -> AccountAuthorizeApplyOutSchema:
        """Apply for transfer-authorization signing (✅).

        Calls: alipay.commerce.ec.trans.authorize.apply

        Returns:
            Schema carrying the ``sign_url`` returned by Alipay.

        Raises:
            CustomException: On empty response or business failure.
        """
        from alipay.aop.api.request.AlipayCommerceEcTransAuthorizeApplyRequest import (
            AlipayCommerceEcTransAuthorizeApplyRequest,
        )
        from alipay.aop.api.domain.AlipayCommerceEcTransAuthorizeApplyModel import (
            AlipayCommerceEcTransAuthorizeApplyModel,
        )
        from alipay.aop.api.response.AlipayCommerceEcTransAuthorizeApplyResponse import (
            AlipayCommerceEcTransAuthorizeApplyResponse,
        )

        model = AlipayCommerceEcTransAuthorizeApplyModel()
        model.enterprise_id = data.enterprise_id

        request = AlipayCommerceEcTransAuthorizeApplyRequest()
        request.biz_model = model

        result = cls._call_alipay_checked(
            request, AlipayCommerceEcTransAuthorizeApplyResponse, "申请转账授权失败"
        )
        return AccountAuthorizeApplyOutSchema(
            sign_url=result.sign_url,
        )

    @classmethod
    async def create_account_service(
        cls, auth: AuthSchema, data: AccountCreateSchema
    ) -> AccountOperationOutSchema:
        """Open a fund account and persist it locally (✅).

        Calls: alipay.commerce.ec.trans.account.create

        Raises:
            CustomException: On empty response or business failure.
        """
        from alipay.aop.api.request.AlipayCommerceEcTransAccountCreateRequest import (
            AlipayCommerceEcTransAccountCreateRequest,
        )
        from alipay.aop.api.domain.AlipayCommerceEcTransAccountCreateModel import (
            AlipayCommerceEcTransAccountCreateModel,
        )
        from alipay.aop.api.response.AlipayCommerceEcTransAccountCreateResponse import (
            AlipayCommerceEcTransAccountCreateResponse,
        )

        model = AlipayCommerceEcTransAccountCreateModel()
        model.enterprise_id = data.enterprise_id
        # Account type and scene are fixed here, not taken from the request:
        # "ALL" = all-purpose (income + expense) account,
        # "B2B_TRANS" = to-business transfer scene.
        model.account_type = "ALL"
        model.scene = "B2B_TRANS"

        request = AlipayCommerceEcTransAccountCreateRequest()
        request.biz_model = model

        result = cls._call_alipay_checked(
            request, AlipayCommerceEcTransAccountCreateResponse, "开通资金专户失败"
        )

        account_data = AccountCreateSchema(
            enterprise_id=model.enterprise_id,
            account_book_id=result.account_book_id,
            account_type=model.account_type,
            scene=model.scene,
        )
        await AccountCRUD(auth).create(account_data)

        return AccountOperationOutSchema(
            enterprise_id=account_data.enterprise_id,
            account_book_id=account_data.account_book_id,
        )

    @classmethod
    async def deposit_service(
        cls, auth: AuthSchema, data: AccountDepositSchema
    ) -> AccountDepositOutSchema:
        """Start a fund-account deposit and record it as DEALING (✅).

        Calls: alipay.commerce.ec.trans.account.deposit

        Returns:
            Schema carrying the ``url`` returned by Alipay.

        Raises:
            CustomException: On empty response or business failure.
        """
        from alipay.aop.api.request.AlipayCommerceEcTransAccountDepositRequest import (
            AlipayCommerceEcTransAccountDepositRequest,
        )
        from alipay.aop.api.domain.AlipayCommerceEcTransAccountDepositModel import (
            AlipayCommerceEcTransAccountDepositModel,
        )
        from alipay.aop.api.response.AlipayCommerceEcTransAccountDepositResponse import (
            AlipayCommerceEcTransAccountDepositResponse,
        )

        model = AlipayCommerceEcTransAccountDepositModel()
        model.enterprise_id = data.enterprise_id
        model.account_book_id = data.account_book_id
        model.amount = str(data.amount)
        model.out_biz_no = get_snowflake_id_str(auth.tenant_id)

        request = AlipayCommerceEcTransAccountDepositRequest()
        request.biz_model = model

        result = cls._call_alipay_checked(
            request, AlipayCommerceEcTransAccountDepositResponse, "充值失败"
        )

        deposit_data = {
            "enterprise_id": data.enterprise_id,
            "out_biz_no": model.out_biz_no,
            "account_book_id": data.account_book_id,
            "amount": data.amount,
            "url": result.url,
            "status": DepositStatusEnum.DEALING.value,
            "remark": data.remark,
        }
        await DepositCRUD(auth).create(deposit_data)

        return AccountDepositOutSchema(
            url=result.url,
        )

    @classmethod
    async def transfer_service(
        cls, auth: AuthSchema, data: AccountTransferSchema
    ) -> AccountTransferOutSchema:
        """Transfer out of a fund account (✅).

        Calls: alipay.commerce.ec.trans.account.transfer

        On success the transfer record is written through the caller's
        session. On failure a ``FAIL`` record is written and committed in an
        independent session, so an outer rollback cannot discard it, and then
        a ``CustomException`` with a friendly hint is raised.

        Raises:
            CustomException: Validation failure, empty response, or Alipay
                business failure.
        """
        from alipay.aop.api.request.AlipayCommerceEcTransAccountTransferRequest import (
            AlipayCommerceEcTransAccountTransferRequest,
        )
        from alipay.aop.api.response.AlipayCommerceEcTransAccountTransferResponse import (
            AlipayCommerceEcTransAccountTransferResponse,
        )

        model = await cls._build_transfer_model(auth, auth.tenant_id, data)

        request = AlipayCommerceEcTransAccountTransferRequest()
        request.biz_model = model

        result = cls._call_alipay(
            request, AlipayCommerceEcTransAccountTransferResponse, "转账失败"
        )

        sub_code = getattr(result, 'sub_code', '') or ''
        sub_msg = getattr(result, 'sub_msg', '') or ''

        # Build the record now but delay the write; where it is written
        # depends on the outcome (see docstring).
        transfer_crud = TransferCRUD(auth)
        transfer_data = cls._transfer_record(model, data, result)
        transfer_data["remark"] = ""
        # NOTE(review): this logs the full payee info (may include bank-card
        # data) — confirm that is acceptable for the log sink.
        log.info(f"记录转账: {transfer_data}")

        if not result.is_success():
            # Prefer an exact sub_code match; otherwise look for a known code
            # embedded in sub_msg (some Alipay endpoints return
            # "unknown-sub-code" as sub_code).
            hint = _TRANSFER_ERROR_HINTS.get(sub_code) or next(
                (text for code, text in _TRANSFER_ERROR_HINTS.items() if code in sub_msg),
                None,
            )
            hint = hint or sub_msg or result.msg or "转账失败"
            log.error(
                f"支付宝接口调用失败: {result.code} - {result.msg} (sub_code={sub_code}, sub_msg={sub_msg})"
            )

            # Independent session/transaction so the failure record persists.
            from app.core.database import async_db_session

            async with async_db_session() as _session:
                async with _session.begin():
                    new_auth = AuthSchema(db=_session, check_data_scope=False)
                    # Carry the tenant over to the new auth context.
                    new_auth.tenant_id = getattr(auth, "tenant_id", None)
                    transfer_data["status"] = "FAIL"
                    transfer_data["remark"] = f"{result.msg} ({sub_code} {sub_msg})"
                    await TransferCRUD(new_auth).create(transfer_data)
            raise CustomException(msg=f"转账失败: {hint}")

        # Success: write through the caller's session.
        await transfer_crud.create(transfer_data)

        return AccountTransferOutSchema(
            status=result.status,
            order_no=result.order_no,
            fund_order_id=result.fund_order_id,
            out_biz_no=model.out_biz_no,
        )

    @classmethod
    async def tenant_transfer_service(
        cls,
        auth: AuthSchema,
        tenant_id: int,
        data: TenantTransferCreate,
        request_ip: str,
        api_key_id: int | None = None,
    ) -> TenantTransferResponse:
        """Tenant API transfer (authenticated via API key).

        Calls: alipay.commerce.ec.trans.account.transfer

        ``request_ip`` and ``api_key_id`` are accepted but not used in this
        method. Unlike :meth:`transfer_service`, a failed transfer is not
        persisted here.

        Raises:
            CustomException: Validation failure, empty response, or Alipay
                business failure.
        """
        from alipay.aop.api.request.AlipayCommerceEcTransAccountTransferRequest import (
            AlipayCommerceEcTransAccountTransferRequest,
        )
        from alipay.aop.api.response.AlipayCommerceEcTransAccountTransferResponse import (
            AlipayCommerceEcTransAccountTransferResponse,
        )

        model = await cls._build_transfer_model(auth, tenant_id, data)

        request = AlipayCommerceEcTransAccountTransferRequest()
        request.biz_model = model

        result = cls._call_alipay(
            request, AlipayCommerceEcTransAccountTransferResponse, "转账失败"
        )
        if not result.is_success():
            log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
            raise CustomException(msg=f"转账失败: {result.sub_msg or result.msg or result.code}")

        await TransferCRUD(auth).create(cls._transfer_record(model, data, result))

        return TenantTransferResponse(
            status=result.status,
            order_no=result.order_no,
            fund_order_id=result.fund_order_id,
        )

    @classmethod
    async def withdraw_service(
        cls, auth: AuthSchema, data: AccountWithdrawSchema
    ) -> AccountOperationOutSchema:
        """Withdraw from a fund account.

        Calls: alipay.commerce.ec.trans.account.withdraw
        API doc: https://opendocs.alipay.com/pre-open/d651859b_alipay.commerce.ec.trans.account.withdraw

        Request fields sent:
            - enterprise_id: enterprise ID
            - account_book_id: fund account number
            - amount: withdrawal amount
            - out_biz_no: merchant-side order number (unique)

        Raises:
            CustomException: Enterprise lookup fails, empty response, or
                business failure.
        """
        from alipay.aop.api.request.AlipayCommerceEcTransAccountWithdrawRequest import (
            AlipayCommerceEcTransAccountWithdrawRequest,
        )
        from alipay.aop.api.domain.AlipayCommerceEcTransAccountWithdrawModel import (
            AlipayCommerceEcTransAccountWithdrawModel,
        )
        from alipay.aop.api.response.AlipayCommerceEcTransAccountWithdrawResponse import (
            AlipayCommerceEcTransAccountWithdrawResponse,
        )

        # NOTE(review): the lookup goes through AccountCRUD, not
        # EnterpriseCRUD, although the error says "enterprise" — confirm.
        enterprise = await AccountCRUD(auth).get_by_enterprise_id(data.enterprise_id)
        if not enterprise:
            raise CustomException(msg="企业不存在")

        model = AlipayCommerceEcTransAccountWithdrawModel()
        model.enterprise_id = enterprise.enterprise_id
        model.account_book_id = data.account_book_id
        model.amount = str(data.amount)
        model.out_biz_no = get_snowflake_id_str(auth.tenant_id)
        if data.remark:
            model.remark = data.remark

        request = AlipayCommerceEcTransAccountWithdrawRequest()
        request.biz_model = model

        result = cls._call_alipay_checked(
            request, AlipayCommerceEcTransAccountWithdrawResponse, "提现失败"
        )

        withdraw_data = {
            "enterprise_id": data.enterprise_id,
            "out_biz_no": model.out_biz_no,
            "account_book_id": data.account_book_id,
            "amount": data.amount,
            # Withdrawal to the balance account is synchronous: it either
            # succeeds or errors, so the response's status field is redundant
            # and the business code alone decides success.
            "status": WithdrawStatusEnum.SUCCESS.value,
            "order_no": result.order_no,
        }
        await WithdrawCRUD(auth).create(withdraw_data)

        log.info(f"资金专户提现发起成功: 企业: {data.enterprise_id}, 金额: {data.amount}")

        return AccountOperationOutSchema(
            enterprise_id=data.enterprise_id,
            account_book_id=data.account_book_id,
        )

    @classmethod
    async def query_account_service(
        cls, auth: AuthSchema, data: AccountQuerySchema
    ) -> list[Any]:
        """Query fund accounts from Alipay.

        Calls: alipay.commerce.ec.trans.account.query

        Returns:
            Alipay-dict form of every returned account that has an
            ``account_book_id`` attribute and whose ``scene`` is ``B2B_TRANS``.

        Raises:
            CustomException: On empty response or business failure.
        """
        from alipay.aop.api.request.AlipayCommerceEcTransAccountQueryRequest import (
            AlipayCommerceEcTransAccountQueryRequest,
        )
        from alipay.aop.api.domain.AlipayCommerceEcTransAccountQueryModel import (
            AlipayCommerceEcTransAccountQueryModel,
        )
        from alipay.aop.api.response.AlipayCommerceEcTransAccountQueryResponse import (
            AlipayCommerceEcTransAccountQueryResponse,
        )
        from alipay.aop.api.domain.FundAccountApiDTO import (
            FundAccountApiDTO,
        )

        model = AlipayCommerceEcTransAccountQueryModel()
        model.enterprise_id = data.enterprise_id

        request = AlipayCommerceEcTransAccountQueryRequest()
        request.biz_model = model

        result = cls._call_alipay_checked(
            request, AlipayCommerceEcTransAccountQueryResponse, "查询资金专户失败"
        )

        return [
            FundAccountApiDTO.to_alipay_dict(item)
            for item in (result.account_list or [])
            if hasattr(item, "account_book_id")
            and getattr(item, "scene", None) == "B2B_TRANS"
        ]

    @classmethod
    async def transfer_detail_service(
        cls, auth: AuthSchema, out_biz_no: str
    ) -> TransferOutSchema:
        """Return one transfer record, enriched with its third-party order no.

        Raises:
            CustomException: If no transfer exists for ``out_biz_no``.
        """
        transfer = await TransferCRUD(auth).get_by_out_biz_no(out_biz_no)
        if not transfer:
            raise CustomException(msg="转账记录不存在")
        transfer_result = TransferOutSchema.model_validate(transfer)

        # Attach the third-party order number when an open-API record exists.
        open_transfer_data = await OpenTransferCRUD(auth).get(out_biz_no=transfer.out_biz_no)
        if open_transfer_data:
            transfer_result.third_biz_no = open_transfer_data.third_biz_no

        return transfer_result

    @classmethod
    async def transfer_list_service(
        cls,
        auth: AuthSchema,
        page_no: int = 1,
        page_size: int = 20,
        search: dict | None = None,
    ) -> dict:
        """Return a page of transfer records, newest (highest id) first.

        ``page_no`` values below 1 are treated as 1 so the offset can never
        be negative.
        """
        log.info(f"查询转账记录列表: {page_no}, {page_size}, {search}")
        offset = (max(page_no, 1) - 1) * page_size
        return await TransferCRUD(auth).page(
            offset=offset,
            limit=page_size,
            order_by=[{"id": "desc"}],
            search=search or {},
            out_schema=TransferListOutSchema,
        )

    @classmethod
    async def transfer_export_service(
        cls,
        auth: AuthSchema,
        start_time: str,
        end_time: str,
        enterprise_id: Optional[str] = None,
    ) -> bytes:
        """Export transfer records in a time window as an Excel file.

        Args:
            start_time: Lower bound for ``created_time`` (inclusive filter).
            end_time: Upper bound for ``created_time`` (inclusive filter).
            enterprise_id: Optional enterprise filter.

        Returns:
            Whatever ``ExcelUtil.export_list2excel`` produces for the rows.
        """
        log.info(f"导出转账记录报表: {start_time} -> {end_time}")

        search = {
            "created_time__gte": start_time,
            "created_time__lte": end_time,
        }
        if enterprise_id:
            search["enterprise_id"] = enterprise_id

        records = await TransferCRUD(auth).list(
            search=search,
            order_by=[{"id": "desc"}],
        )

        from app.utils.excel_util import ExcelUtil

        status_map = {
            "DEALING": "处理中",
            "SUCCESS": "成功",
            "FAIL": "失败",
            "REFUND": "退票",
        }
        payee_type_map = {
            "ALIPAY_ACCOUNT": "支付宝账户",
            "BANK_CARD": "银行卡",
        }

        list_data = []
        for i, record in enumerate(records, start=1):
            payee_info = record.payee_info or {}
            list_data.append({
                "序号": i,
                "订单号": record.out_biz_no or "",
                "商户订单号": record.order_no or "",
                "金额(元)": str(record.amount or 0),
                "收款方姓名": payee_info.get("name", ""),
                "收款方类型": payee_type_map.get(payee_info.get("identity_type", ""), ""),
                "状态": status_map.get(record.status, record.status),
                "转账标题": record.order_title or "",
                "创建时间": record.created_time.strftime("%Y-%m-%d %H:%M:%S") if record.created_time else "",
            })

        # Column headers equal the row keys, so the mapping is an identity.
        columns = (
            "序号",
            "订单号",
            "商户订单号",
            "金额(元)",
            "收款方姓名",
            "收款方类型",
            "状态",
            "转账标题",
            "创建时间",
        )
        mapping_dict = {column: column for column in columns}

        return ExcelUtil.export_list2excel(list_data, mapping_dict)

    @classmethod
    async def apply_receipt_service(
        cls,
        auth: AuthSchema,
        redis: Redis,
        data: ReceiptApplySchema,
    ) -> str:
        """Apply for a transfer receipt and return its ``file_id``.

        Calls: alipay.commerce.ec.trans.receipt.apply

        The ``file_id`` is cached in Redis under
        ``receipt:{enterprise_id}:{order_no}`` for 172800 seconds (48 h); a
        cached value is returned without calling Alipay.

        Args:
            data: Carries ``enterprise_id`` and ``order_no`` (the Alipay
                transfer order number).

        Raises:
            CustomException: Enterprise not found, empty response, business
                failure, or a success reply without a ``file_id``.
        """
        from app.core.redis_crud import RedisCURD

        redis_crud = RedisCURD(redis)
        cache_key = f"receipt:{data.enterprise_id}:{data.order_no}"
        cached_file_id = await redis_crud.get(cache_key)
        if cached_file_id:
            log.info(f"使用缓存的 file_id: {cached_file_id}")
            return cached_file_id

        enterprise = await EnterpriseCRUD(auth).get_by_enterprise_id(data.enterprise_id)
        if not enterprise:
            raise CustomException(msg="企业不存在")

        from alipay.aop.api.request.AlipayCommerceEcTransReceiptApplyRequest import (
            AlipayCommerceEcTransReceiptApplyRequest,
        )
        from alipay.aop.api.domain.AlipayCommerceEcTransReceiptApplyModel import (
            AlipayCommerceEcTransReceiptApplyModel,
        )
        from alipay.aop.api.response.AlipayCommerceEcTransReceiptApplyResponse import (
            AlipayCommerceEcTransReceiptApplyResponse,
        )

        model = AlipayCommerceEcTransReceiptApplyModel()
        model.enterprise_id = data.enterprise_id
        model.order_no = data.order_no

        request = AlipayCommerceEcTransReceiptApplyRequest()
        request.biz_model = model

        result = cls._call_alipay(
            request, AlipayCommerceEcTransReceiptApplyResponse, "申请回单失败"
        )
        if not result.is_success():
            # Drop any cache entry for this key before reporting the failure.
            await redis_crud.delete(cache_key)
            raise CustomException(msg=f"申请回单失败: {result.msg}")

        # Guard against caching the literal string "None" for 48 hours.
        if not result.file_id:
            raise CustomException(msg="申请回单失败: 未返回 file_id")

        file_id = str(result.file_id)
        await redis_crud.set(cache_key, file_id, expire=172800)

        log.info(f"申请回单成功: order_no={data.order_no}, file_id={file_id}")
        return file_id

    @classmethod
    async def query_receipt_service(cls, enterprise_id: str, file_id: str) -> dict:
        """Query the status of a receipt application.

        Calls: alipay.commerce.ec.trans.receipt.query

        Returns:
            ``{file_id, status, download_url, error_message}``.

        Raises:
            CustomException: On empty response or business failure.
        """
        from alipay.aop.api.request.AlipayCommerceEcTransReceiptQueryRequest import (
            AlipayCommerceEcTransReceiptQueryRequest,
        )
        from alipay.aop.api.response.AlipayCommerceEcTransReceiptQueryResponse import (
            AlipayCommerceEcTransReceiptQueryResponse,
        )
        from alipay.aop.api.domain.AlipayCommerceEcTransReceiptQueryModel import (
            AlipayCommerceEcTransReceiptQueryModel,
        )

        model = AlipayCommerceEcTransReceiptQueryModel()
        model.enterprise_id = enterprise_id
        model.file_id = file_id

        request = AlipayCommerceEcTransReceiptQueryRequest()
        request.biz_model = model

        result = cls._call_alipay(
            request, AlipayCommerceEcTransReceiptQueryResponse, "查询回单失败"
        )
        if not result.is_success():
            raise CustomException(msg=f"查询回单失败: {result.msg}")

        return {
            "file_id": file_id,
            "status": result.status,
            "download_url": result.download_url,
            "error_message": result.error_message,
        }

    @classmethod
    async def transfer_sync_status_service(
        cls,
        auth: AuthSchema,
        data: "TransferSyncStatusSchema",
    ) -> dict:
        """Manually set a transfer's status (administrator back-fill).

        Intended for repairing records stuck in DEALING after a lost
        notification. Setting ``SUCCESS`` is rejected unless the record is
        currently ``DEALING``; other target statuses are not restricted.

        Raises:
            CustomException: Record not found, or SUCCESS requested for a
                record that is not DEALING.
        """
        from app.plugin.module_payment.account.crud import TransferCRUD
        from app.plugin.module_payment.account.schema import TransferSyncStatusSchema  # noqa: F401

        crud = TransferCRUD(auth)
        transfer = await crud.get_by_out_biz_no(data.out_biz_no)
        if not transfer:
            raise CustomException(msg=f"转账记录不存在: {data.out_biz_no}")

        if transfer.status != "DEALING" and data.status == "SUCCESS":
            raise CustomException(msg=f"转账记录当前状态为 {transfer.status},无需同步")

        update_data = {"status": data.status}
        if data.error_code:
            update_data["error_code"] = data.error_code
        if data.error_msg:
            update_data["error_msg"] = data.error_msg

        # Only touch attributes the record actually has.
        for key, value in update_data.items():
            if hasattr(transfer, key):
                setattr(transfer, key, value)
        await auth.db.flush()
        await auth.db.refresh(transfer)

        log.info(f"手动同步转账状态成功: out_biz_no={data.out_biz_no}, {transfer.status}")
        # getattr mirrors the hasattr guard above, so a record without these
        # attributes yields None instead of raising AttributeError.
        return {
            "out_biz_no": transfer.out_biz_no,
            "status": transfer.status,
            "error_code": getattr(transfer, "error_code", None),
            "error_msg": getattr(transfer, "error_msg", None),
        }

    @classmethod
    async def transfer_sync_all_service(
        cls,
        auth: AuthSchema,
    ) -> dict:
        """Sync the status of every transfer that has an ``out_biz_no``.

        Each record is passed to :meth:`_sync_transfer_detail`. If that
        returns ``False`` the loop stops and the DEALING records are listed
        for manual back-fill instead.

        Returns:
            Normal run: ``{total, synced, errors, details}``.
            Stopped run: ``{total, synced, no_permission, dealing_count,
            details, note}``.
        """
        from sqlalchemy import select
        from app.plugin.module_payment.account.model import TransferModel
        from app.plugin.module_payment.account.enums import TransferStatusEnum

        stmt = select(TransferModel).where(
            TransferModel.out_biz_no.isnot(None),
        ).order_by(TransferModel.id.asc())
        rows = await auth.db.execute(stmt)
        all_transfers = rows.scalars().all()

        synced = 0
        errors = 0
        details = []
        _has_permission = True

        for transfer in all_transfers:
            out_biz_no = transfer.out_biz_no
            eid = transfer.enterprise_id
            if not out_biz_no or not eid:
                continue

            # Capture the status BEFORE syncing: the sync issues an UPDATE on
            # the same table, which may refresh this in-session object, so
            # reading transfer.status afterwards could report the new value.
            old_status = transfer.status
            try:
                outcome = await cls._sync_transfer_detail(auth, out_biz_no, eid)
                if outcome is False:
                    # Both query strategies were unavailable; stop the sync.
                    _has_permission = False
                    break
                if isinstance(outcome, str):
                    synced += 1
                    details.append({"out_biz_no": out_biz_no, "old_status": old_status, "new_status": outcome})
                else:
                    details.append({"out_biz_no": out_biz_no, "status": old_status, "action": "no_change"})
            except Exception as e:
                errors += 1
                details.append({"out_biz_no": out_biz_no, "status": old_status, "error": str(e)})
                log.warning(f"全量同步 - 查询失败: out_biz_no={out_biz_no}, err={e}")

        if not _has_permission:
            dealing = [t for t in all_transfers if t.status == TransferStatusEnum.DEALING.value]
            return {
                "total": len(all_transfers),
                "synced": synced,
                "no_permission": True,
                "dealing_count": len(dealing),
                "details": [{"out_biz_no": t.out_biz_no, "status": t.status} for t in dealing],
                "note": "无法通过支付宝 API 查询转账状态,请在开放平台开通 alipay.fund.trans.common.query 权限,或逐个使用 sync-status 手动补录",
            }

        if synced > 0:
            await auth.db.flush()

        return {
            "total": len(all_transfers),
            "synced": synced,
            "errors": errors,
            "details": details,
        }

    @classmethod
    async def _sync_transfer_detail(
        cls,
        auth: AuthSchema,
        out_biz_no: str,
        enterprise_id: str,
    ) -> str | bool | None:
        """Query one transfer at Alipay and update the local record.

        Plan A calls fund.trans.common.query. If that reports a permission
        problem, returns an empty response, or its SDK classes cannot be
        imported, plan B calls consume.detail.query using the local
        ``order_no`` as ``pay_no``.

        Returns:
            The new status string when the record was updated; ``False``
            when querying is not possible; ``None`` on failure or no change.

        NOTE(review): plan B also returns ``False`` for a missing local
        ``order_no``, an empty response and any unexpected exception, and the
        caller treats ``False`` as "no permission, stop the whole sync" —
        confirm this is intended.
        """
        from sqlalchemy import select, update as sa_update
        from app.plugin.module_payment.account.model import TransferModel

        # Load the local record first; plan B needs its order_no.
        tf_stmt = select(TransferModel).where(TransferModel.out_biz_no == out_biz_no)
        tf_result = await auth.db.execute(tf_stmt)
        local_transfer = tf_result.scalar_one_or_none()

        # --- Plan A: fund.trans.common.query ---
        try:
            from alipay.aop.api.request.AlipayFundTransCommonQueryRequest import (
                AlipayFundTransCommonQueryRequest,
            )
            from alipay.aop.api.domain.AlipayFundTransCommonQueryModel import (
                AlipayFundTransCommonQueryModel,
            )
            from alipay.aop.api.response.AlipayFundTransCommonQueryResponse import (
                AlipayFundTransCommonQueryResponse,
            )

            model = AlipayFundTransCommonQueryModel()
            model.out_biz_no = out_biz_no
            model.product_code = "TRANS_ACCOUNT_NO_PWD"
            model.biz_scene = "DIRECT_TRANSFER"

            request = AlipayFundTransCommonQueryRequest()
            request.biz_model = model

            client = AlipayClient.get_client()
            response = client.execute(request)

            if response:
                result = AlipayFundTransCommonQueryResponse()
                result.parse_response_content(response)

                if result.is_success():
                    alipay_status = getattr(result, 'status', None)
                    if alipay_status and alipay_status != "DEALING":
                        return await cls._apply_transfer_update(auth, out_biz_no, result, alipay_status)
                    return None

                sub_msg = getattr(result, 'sub_msg', '') or ''
                if '权限' not in sub_msg and 'NO_PERMISSION' not in sub_msg:
                    return None
                # Permission problem: fall through to plan B.
        except ImportError:
            pass

        # --- Plan B: consume.detail.query (order_no used as pay_no) ---
        order_no = local_transfer.order_no if local_transfer else None
        if not order_no:
            log.warning(f"无 order_no 可用于查询: out_biz_no={out_biz_no}")
            return False

        try:
            from alipay.aop.api.request.AlipayCommerceEcConsumeDetailQueryRequest import (
                AlipayCommerceEcConsumeDetailQueryRequest,
            )
            from alipay.aop.api.domain.AlipayCommerceEcConsumeDetailQueryModel import (
                AlipayCommerceEcConsumeDetailQueryModel,
            )
            from alipay.aop.api.response.AlipayCommerceEcConsumeDetailQueryResponse import (
                AlipayCommerceEcConsumeDetailQueryResponse,
            )

            model = AlipayCommerceEcConsumeDetailQueryModel()
            model.pay_no = order_no
            model.enterprise_id = enterprise_id

            request = AlipayCommerceEcConsumeDetailQueryRequest()
            request.biz_model = model

            client = AlipayClient.get_client()
            response = client.execute(request)

            if not response:
                return False

            result = AlipayCommerceEcConsumeDetailQueryResponse()
            result.parse_response_content(response)

            if not result.is_success():
                sub_code = getattr(result, 'sub_code', '') or ''
                sub_msg = getattr(result, 'sub_msg', '') or ''
                # Insufficient permission.
                if '权限' in sub_msg or 'NO_PERMISSION' in sub_code:
                    return False
                log.warning(f"consume.detail.query 查无记录: out_biz_no={out_biz_no}, err={sub_msg}")
                return None

            consume_info = getattr(result, 'consume_info', None)
            if not consume_info:
                return None

            consume_type = getattr(consume_info, 'consume_type', '')
            if consume_type != "TRANSFER":
                return None

            # Derive the final status from the notify reason text.
            notify_reason = getattr(consume_info, 'notify_reason', '') or ''
            if 'SUCCESS' in notify_reason.upper():
                new_status = "SUCCESS"
            elif 'FAIL' in notify_reason.upper():
                new_status = "FAIL"
            else:
                return None

            update_data = {"status": new_status}
            pay_no = getattr(consume_info, 'pay_no', None)
            if pay_no and pay_no != order_no:
                update_data["order_no"] = pay_no

            upd = sa_update(TransferModel).where(
                TransferModel.out_biz_no == out_biz_no
            ).values(**update_data)
            await auth.db.execute(upd)
            log.info(f"转账同步(consume详情) - out_biz_no={out_biz_no}, status={new_status}")
            return new_status

        except ImportError:
            log.warning("consume.detail.query SDK 不可用")
            return False
        except Exception as e:
            log.warning(f"consume.detail.query 异常: out_biz_no={out_biz_no}, err={e}")
            return False

    @classmethod
    async def _apply_transfer_update(
        cls,
        auth: AuthSchema,
        out_biz_no: str,
        result: object,
        alipay_status: str,
    ) -> str | None:
        """Update the local record from a fund.trans.common.query result.

        Copies ``order_id``, ``pay_fund_order_id``, ``trans_amount``,
        ``error_code`` and ``fail_reason`` from ``result`` when present.

        Returns:
            ``alipay_status`` when an UPDATE was issued, ``None`` when the
            status is still ``DEALING``.
        """
        from sqlalchemy import update as sa_update
        from app.plugin.module_payment.account.model import TransferModel

        update_data = {"status": alipay_status}
        order_no = getattr(result, 'order_id', None)
        pay_fund_order_id = getattr(result, 'pay_fund_order_id', None)
        trans_amount = getattr(result, 'trans_amount', None)
        error_code = getattr(result, 'error_code', None)
        fail_reason = getattr(result, 'fail_reason', None)

        if order_no:
            update_data["order_no"] = order_no
        if pay_fund_order_id:
            update_data["fund_order_id"] = pay_fund_order_id
        if trans_amount:
            update_data["amount"] = Decimal(str(trans_amount))
        if error_code:
            update_data["error_code"] = error_code
        if fail_reason:
            update_data["error_msg"] = fail_reason

        if update_data.get("status") != "DEALING":
            upd = sa_update(TransferModel).where(
                TransferModel.out_biz_no == out_biz_no
            ).values(**update_data)
            await auth.db.execute(upd)
            log.info(f"转账详情同步 - out_biz_no={out_biz_no}, status={alipay_status}")
            return alipay_status
        return None

    @classmethod
    async def update_transfer_status_service(
        cls,
        auth: AuthSchema,
        order_no: str,
        status: str,
        ext_info: dict | None = None,
    ) -> None:
        """Update a transfer's status (called by the notification handler).

        Logs a warning and returns when no transfer matches ``order_no``.
        ``ext_info`` is written only when non-empty; it defaults to ``None``
        rather than a shared mutable ``{}``.
        """
        crud = TransferCRUD(auth)
        transfer = await crud.get_by_order_no(order_no)
        if not transfer:
            log.warning(f"转账记录不存在: {order_no}")
            return

        update_data = {"status": status}
        if ext_info:
            update_data["ext_info"] = ext_info
        await crud.update_by_order_no(order_no, update_data)

    @classmethod
    async def update_deposit_status_service(
        cls,
        auth: AuthSchema,
        out_biz_no: str,
        status: str,
    ) -> None:
        """Update a deposit's status (called by the notification handler).

        Logs a warning and returns when no deposit matches ``out_biz_no``.
        """
        crud = DepositCRUD(auth)
        deposit = await crud.get_by_out_biz_no(out_biz_no)
        if not deposit:
            log.warning(f"充值记录不存在: {out_biz_no}")
            return

        await crud.update_by_out_biz_no(out_biz_no, {"status": status})

    @classmethod
    async def update_withdraw_status_service(
        cls,
        auth: AuthSchema,
        out_biz_no: str,
        status: str,
        error_code: str | None = None,
        error_msg: str | None = None,
    ) -> None:
        """Update a withdrawal's status (called by the notification handler).

        Logs a warning and returns when no withdrawal matches ``out_biz_no``.
        ``error_code`` / ``error_msg`` are written only when non-empty.
        """
        crud = WithdrawCRUD(auth)
        withdraw = await crud.get_by_out_biz_no(out_biz_no)
        if not withdraw:
            log.warning(f"提现记录不存在: {out_biz_no}")
            return

        update_data = {"status": status}
        if error_code:
            update_data["error_code"] = error_code
        if error_msg:
            update_data["error_msg"] = error_msg
        await crud.update_by_out_biz_no(out_biz_no, update_data)

    @classmethod
    async def consume_detail_query_service(
        cls,
        auth: AuthSchema,
        pay_no: str,
        enterprise_id: str | None = None,
        ant_shop_id: str | None = None,
        query_options: list[str] | None = None,
    ) -> dict:
        """Query bill (consume) details (✅).

        Calls: alipay.commerce.ec.consume.detail.query

        Optional arguments are sent only when non-empty.

        Returns:
            A flat dict of fields copied from ``result.consume_info``.

        Raises:
            CustomException: Empty response, business failure, or a reply
                without ``consume_info``.
        """
        from alipay.aop.api.request.AlipayCommerceEcConsumeDetailQueryRequest import (
            AlipayCommerceEcConsumeDetailQueryRequest,
        )
        from alipay.aop.api.domain.AlipayCommerceEcConsumeDetailQueryModel import (
            AlipayCommerceEcConsumeDetailQueryModel,
        )
        from alipay.aop.api.response.AlipayCommerceEcConsumeDetailQueryResponse import (
            AlipayCommerceEcConsumeDetailQueryResponse,
        )

        model = AlipayCommerceEcConsumeDetailQueryModel()
        model.pay_no = pay_no
        if enterprise_id:
            model.enterprise_id = enterprise_id
        if ant_shop_id:
            model.ant_shop_id = ant_shop_id
        if query_options:
            model.query_options = query_options

        request = AlipayCommerceEcConsumeDetailQueryRequest()
        request.biz_model = model

        result = cls._call_alipay_checked(
            request, AlipayCommerceEcConsumeDetailQueryResponse, "账单详情查询失败"
        )

        consume_info = result.consume_info
        if not consume_info:
            raise CustomException(msg="账单详情查询失败: 无账单信息")

        # Fields read directly are expected on every reply; the getattr
        # fields may be absent and then fall back to None.
        return {
            "account_id": consume_info.account_id,
            "pay_no": consume_info.pay_no,
            "consume_type": consume_info.consume_type,
            "gmt_biz_create": consume_info.gmt_biz_create,
            "consume_biz_type": consume_info.consume_biz_type,
            "consume_amount": consume_info.consume_amount,
            "order_complete_label": consume_info.order_complete_label,
            "refund_status": consume_info.refund_status,
            "refund_amount": consume_info.refund_amount,
            "peer_payer_card_name": consume_info.peer_payer_card_name,
            "user_id": getattr(consume_info, 'user_id', None),
            "open_id": getattr(consume_info, 'open_id', None),
            "enterprise_id": consume_info.enterprise_id,
            "employee_id": consume_info.employee_id,
            "enterprise_name": getattr(consume_info, 'enterprise_name', None),
            "employee_name": getattr(consume_info, 'employee_name', None),
            "consume_scene_code": getattr(consume_info, 'consume_scene_code', None),
            "consume_type_sub_category": getattr(consume_info, 'consume_type_sub_category', None),
            "consume_title": getattr(consume_info, 'consume_title', None),
            "gmt_pay": getattr(consume_info, 'gmt_pay', None),
            "gmt_refund": getattr(consume_info, 'gmt_refund', None),
            "pay_amount": getattr(consume_info, 'pay_amount', None),
            "invoice_amount": getattr(consume_info, 'invoice_amount', None),
            "peer_pay_amount": getattr(consume_info, 'peer_pay_amount', None),
            "subsidy_amount": getattr(consume_info, 'subsidy_amount', None),
            "ext_infos": getattr(consume_info, 'ext_infos', None),
        }