"""Service layer for Alipay enterprise fund accounts.

Covers account authorisation/creation, deposit, transfer, withdrawal,
receipts, and reconciliation of local records against Alipay.
"""

from datetime import datetime, timedelta
from decimal import Decimal
from typing import Any, Optional

from redis.asyncio import Redis

from app.api.v1.module_system.auth.schema import AuthSchema
from app.core.alipay import AlipayClient
from app.core.exceptions import CustomException
from app.core.logger import log
from app.utils.snowflake import get_snowflake_id_str
from app.plugin.module_payment.enterprise.crud import EnterpriseCRUD

from .crud import AccountCRUD, TransferCRUD, DepositCRUD, WithdrawCRUD
from .enums import (
    DepositStatusEnum,
    WithdrawStatusEnum,
)
from .schema import (
    AccountAuthorizeApplySchema,
    AccountAuthorizeApplyOutSchema,
    AccountCreateSchema,
    AccountDepositSchema,
    AccountDepositOutSchema,
    AccountOperationOutSchema,
    AccountQuerySchema,
    AccountTransferSchema,
    AccountTransferOutSchema,
    AccountWithdrawSchema,
    ReceiptApplySchema,
    TransferListOutSchema,
    TransferOutSchema,
    TenantTransferCreate,
    TenantTransferResponse,
)
from ..openapi.crud import OpenTransferCRUD


def _parse_dt(val: str | None) -> datetime | None:
    """Parse an Alipay date string.

    Accepts ``YYYY-MM-DD HH:MM:SS`` or ``YYYY-MM-DD``. Returns ``None`` for
    empty input or when neither format matches.
    """
    if not val:
        return None
    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d"):
        try:
            return datetime.strptime(val, fmt)
        except ValueError:
            continue
    return None


class AccountService:
    """Fund-account service layer."""

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _execute_request(request: Any, response_cls: type, fail_prefix: str) -> Any:
        """Execute an Alipay SDK request and parse the raw response.

        Args:
            request: A populated Alipay SDK request object.
            response_cls: The SDK response class matching ``request``.
            fail_prefix: Prefix of the error message raised when the gateway
                returns nothing (e.g. ``"充值失败"``).

        Returns:
            The parsed response object. The caller is responsible for
            checking ``is_success()`` so each endpoint keeps its own
            error reporting.

        Raises:
            CustomException: If the client returns an empty response.
        """
        # NOTE(review): client.execute() is called without await inside async
        # code paths; if it performs blocking network I/O it will stall the
        # event loop. Consider offloading to a worker thread - confirm first.
        raw = AlipayClient.get_client().execute(request)
        if not raw:
            raise CustomException(msg=f"{fail_prefix}: 无响应")
        result = response_cls()
        result.parse_response_content(raw)
        return result

    @classmethod
    async def _submit_transfer(
        cls,
        auth: AuthSchema,
        tenant_id: Any,
        data: Any,
    ) -> tuple[str, Any]:
        """Validate, submit and persist a fund-account transfer.

        Shared implementation behind ``transfer_service`` and
        ``tenant_transfer_service``.
        Calls: alipay.commerce.ec.trans.account.transfer

        Args:
            auth: Auth context used for all CRUD access.
            tenant_id: Tenant that must own the account; also passed to the
                snowflake generator for ``out_biz_no``.
            data: Transfer payload exposing ``account_book_id``,
                ``enterprise_id``, ``order_title``, ``amount`` and
                ``payee_info``.

        Returns:
            ``(out_biz_no, result)`` where ``result`` is the parsed Alipay
            response.

        Raises:
            CustomException: On validation failure, empty response, or an
                unsuccessful Alipay result.
        """
        from alipay.aop.api.request.AlipayCommerceEcTransAccountTransferRequest import (
            AlipayCommerceEcTransAccountTransferRequest,
        )
        from alipay.aop.api.domain.AlipayCommerceEcTransAccountTransferModel import (
            AlipayCommerceEcTransAccountTransferModel,
        )
        from alipay.aop.api.response.AlipayCommerceEcTransAccountTransferResponse import (
            AlipayCommerceEcTransAccountTransferResponse,
        )
        from alipay.aop.api.domain.TransParticipant import (
            TransParticipant,
        )
        from alipay.aop.api.domain.BankCardExtInfoDTO import (
            BankCardExtInfoDTO,
        )

        # The fund account must exist and belong to the acting tenant.
        account = await AccountCRUD(auth).get_by_account_book_id(data.account_book_id)
        if not account:
            raise CustomException(msg="资金账户不存在")
        if account.tenant_id != tenant_id:
            raise CustomException(msg="无权限操作")
        if data.enterprise_id and account.enterprise_id != data.enterprise_id:
            raise CustomException(msg="参数错误")

        # Default the order title from the owning enterprise's name.
        if not data.order_title and account.enterprise_id:
            enterprise = await EnterpriseCRUD(auth).get_by_enterprise_id(account.enterprise_id)
            if not enterprise:
                raise CustomException(msg="资金账户所属企业不存在")
            data.order_title = f"来自{enterprise.name}转账"

        model = AlipayCommerceEcTransAccountTransferModel()
        model.enterprise_id = account.enterprise_id
        model.account_book_id = account.account_book_id
        model.out_biz_no = get_snowflake_id_str(tenant_id)
        # Total amount is sent to Alipay as a string.
        model.amount = str(data.amount)
        model.order_title = data.order_title

        payee = TransParticipant()
        payee.identity_type = data.payee_info.identity_type
        payee.name = data.payee_info.name
        payee.identity = data.payee_info.identity
        if data.payee_info.bankcard_ext_info:
            payee.bankcard_ext_info = BankCardExtInfoDTO.from_alipay_dict(
                data.payee_info.bankcard_ext_info.model_dump(exclude_none=True)
            )
        model.payee_info = payee

        request = AlipayCommerceEcTransAccountTransferRequest()
        request.biz_model = model
        result = cls._execute_request(
            request, AlipayCommerceEcTransAccountTransferResponse, "转账失败"
        )
        if not result.is_success():
            log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
            raise CustomException(msg=f"转账失败: {result.sub_msg or result.msg or result.code}")

        await TransferCRUD(auth).create(
            {
                "enterprise_id": model.enterprise_id,
                "out_biz_no": model.out_biz_no,
                "account_book_id": model.account_book_id,
                "amount": model.amount,
                "order_title": model.order_title,
                "payee_info": data.payee_info.model_dump() if data.payee_info else None,
                "status": result.status,
                "order_no": result.order_no,
                "fund_order_id": result.fund_order_id,
            }
        )
        return model.out_biz_no, result

    # ------------------------------------------------------------------
    # Statistics
    # ------------------------------------------------------------------

    @classmethod
    async def stat_transfer_amount_service(
        cls,
        auth: AuthSchema,
        tenant_id: Optional[int] = None,
        enterprise_id: Optional[str] = None,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
    ) -> Decimal:
        """Aggregate transfer amounts for the given filters.

        Delegates to ``TransferCRUD.get_transfer_amount`` and returns its
        result unchanged.
        """
        crud = TransferCRUD(auth)
        return await crud.get_transfer_amount(
            tenant_id=tenant_id,
            enterprise_id=enterprise_id,
            start_date=start_date,
            end_date=end_date,
        )

    # ------------------------------------------------------------------
    # Account lifecycle
    # ------------------------------------------------------------------

    @classmethod
    async def authorize_apply_service(
        cls, auth: AuthSchema, data: AccountAuthorizeApplySchema
    ) -> AccountAuthorizeApplyOutSchema:
        """Apply for transfer-authorisation signing.

        Calls: alipay.commerce.ec.trans.authorize.apply

        Raises:
            CustomException: On empty response or unsuccessful result.
        """
        from alipay.aop.api.request.AlipayCommerceEcTransAuthorizeApplyRequest import (
            AlipayCommerceEcTransAuthorizeApplyRequest,
        )
        from alipay.aop.api.domain.AlipayCommerceEcTransAuthorizeApplyModel import (
            AlipayCommerceEcTransAuthorizeApplyModel,
        )
        from alipay.aop.api.response.AlipayCommerceEcTransAuthorizeApplyResponse import (
            AlipayCommerceEcTransAuthorizeApplyResponse,
        )

        model = AlipayCommerceEcTransAuthorizeApplyModel()
        model.enterprise_id = data.enterprise_id

        request = AlipayCommerceEcTransAuthorizeApplyRequest()
        request.biz_model = model
        result = cls._execute_request(
            request, AlipayCommerceEcTransAuthorizeApplyResponse, "申请转账授权失败"
        )
        if not result.is_success():
            log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
            raise CustomException(msg=f"申请转账授权失败: {result.msg}")

        return AccountAuthorizeApplyOutSchema(
            sign_url=result.sign_url,
        )

    @classmethod
    async def create_account_service(
        cls, auth: AuthSchema, data: AccountCreateSchema
    ) -> AccountOperationOutSchema:
        """Open a fund account and persist it locally.

        Calls: alipay.commerce.ec.trans.account.create

        The account type and scene are fixed to ``"ALL"`` and
        ``"B2B_TRANS"``; any values supplied on ``data`` are ignored.

        Raises:
            CustomException: On empty response or unsuccessful result.
        """
        from alipay.aop.api.request.AlipayCommerceEcTransAccountCreateRequest import (
            AlipayCommerceEcTransAccountCreateRequest,
        )
        from alipay.aop.api.domain.AlipayCommerceEcTransAccountCreateModel import (
            AlipayCommerceEcTransAccountCreateModel,
        )
        from alipay.aop.api.response.AlipayCommerceEcTransAccountCreateResponse import (
            AlipayCommerceEcTransAccountCreateResponse,
        )

        model = AlipayCommerceEcTransAccountCreateModel()
        model.enterprise_id = data.enterprise_id
        model.account_type = "ALL"
        model.scene = "B2B_TRANS"

        request = AlipayCommerceEcTransAccountCreateRequest()
        request.biz_model = model
        result = cls._execute_request(
            request, AlipayCommerceEcTransAccountCreateResponse, "开通资金专户失败"
        )
        if not result.is_success():
            log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
            raise CustomException(msg=f"开通资金专户失败: {result.msg}")

        account_data = AccountCreateSchema(
            enterprise_id=model.enterprise_id,
            account_book_id=result.account_book_id,
            account_type=model.account_type,
            scene=model.scene,
        )
        await AccountCRUD(auth).create(account_data)

        return AccountOperationOutSchema(
            enterprise_id=account_data.enterprise_id,
            account_book_id=account_data.account_book_id,
        )

    @classmethod
    async def deposit_service(
        cls, auth: AuthSchema, data: AccountDepositSchema
    ) -> AccountDepositOutSchema:
        """Start a fund-account deposit and record it as ``DEALING``.

        Calls: alipay.commerce.ec.trans.account.deposit

        Returns:
            Schema carrying the ``url`` returned by Alipay.

        Raises:
            CustomException: On empty response or unsuccessful result.
        """
        from alipay.aop.api.request.AlipayCommerceEcTransAccountDepositRequest import (
            AlipayCommerceEcTransAccountDepositRequest,
        )
        from alipay.aop.api.domain.AlipayCommerceEcTransAccountDepositModel import (
            AlipayCommerceEcTransAccountDepositModel,
        )
        from alipay.aop.api.response.AlipayCommerceEcTransAccountDepositResponse import (
            AlipayCommerceEcTransAccountDepositResponse,
        )

        model = AlipayCommerceEcTransAccountDepositModel()
        model.enterprise_id = data.enterprise_id
        model.account_book_id = data.account_book_id
        model.amount = str(data.amount)
        model.out_biz_no = get_snowflake_id_str(auth.tenant_id)

        request = AlipayCommerceEcTransAccountDepositRequest()
        request.biz_model = model
        result = cls._execute_request(
            request, AlipayCommerceEcTransAccountDepositResponse, "充值失败"
        )
        if not result.is_success():
            log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
            raise CustomException(msg=f"充值失败: {result.msg}")

        await DepositCRUD(auth).create(
            {
                "enterprise_id": data.enterprise_id,
                "out_biz_no": model.out_biz_no,
                "account_book_id": data.account_book_id,
                "amount": data.amount,
                "url": result.url,
                "status": DepositStatusEnum.DEALING.value,
                "remark": data.remark,
            }
        )

        return AccountDepositOutSchema(
            url=result.url,
        )

    # ------------------------------------------------------------------
    # Transfers
    # ------------------------------------------------------------------

    @classmethod
    async def transfer_service(
        cls, auth: AuthSchema, data: AccountTransferSchema
    ) -> AccountTransferOutSchema:
        """Transfer out of a fund account on behalf of the logged-in tenant.

        Calls: alipay.commerce.ec.trans.account.transfer

        Raises:
            CustomException: On validation failure or Alipay error.
        """
        out_biz_no, result = await cls._submit_transfer(auth, auth.tenant_id, data)
        return AccountTransferOutSchema(
            status=result.status,
            order_no=result.order_no,
            fund_order_id=result.fund_order_id,
            out_biz_no=out_biz_no,
        )

    @classmethod
    async def tenant_transfer_service(
        cls,
        auth: AuthSchema,
        tenant_id: int,
        data: TenantTransferCreate,
        request_ip: str,
        api_key_id: int | None = None,
    ) -> TenantTransferResponse:
        """Transfer out of a fund account for a tenant authenticated by API key.

        Calls: alipay.commerce.ec.trans.account.transfer

        ``request_ip`` and ``api_key_id`` are accepted for interface
        compatibility but are not used by this method.

        Raises:
            CustomException: On validation failure or Alipay error.
        """
        _, result = await cls._submit_transfer(auth, tenant_id, data)
        return TenantTransferResponse(
            status=result.status,
            order_no=result.order_no,
            fund_order_id=result.fund_order_id,
        )

    @classmethod
    async def withdraw_service(
        cls, auth: AuthSchema, data: AccountWithdrawSchema
    ) -> AccountOperationOutSchema:
        """Withdraw from a fund account and record the withdrawal.

        Calls: alipay.commerce.ec.trans.account.withdraw
        Docs: https://opendocs.alipay.com/pre-open/d651859b_alipay.commerce.ec.trans.account.withdraw

        Request fields sent: ``enterprise_id``, ``account_book_id``,
        ``amount`` and a freshly generated unique ``out_biz_no``.

        Raises:
            CustomException: If the enterprise lookup fails, on empty
                response, or on an unsuccessful result.
        """
        from alipay.aop.api.request.AlipayCommerceEcTransAccountWithdrawRequest import (
            AlipayCommerceEcTransAccountWithdrawRequest,
        )
        from alipay.aop.api.domain.AlipayCommerceEcTransAccountWithdrawModel import (
            AlipayCommerceEcTransAccountWithdrawModel,
        )
        from alipay.aop.api.response.AlipayCommerceEcTransAccountWithdrawResponse import (
            AlipayCommerceEcTransAccountWithdrawResponse,
        )

        enterprise = await AccountCRUD(auth).get_by_enterprise_id(data.enterprise_id)
        if not enterprise:
            raise CustomException(msg="企业不存在")

        model = AlipayCommerceEcTransAccountWithdrawModel()
        model.enterprise_id = enterprise.enterprise_id
        model.account_book_id = data.account_book_id
        model.amount = str(data.amount)
        model.out_biz_no = get_snowflake_id_str(auth.tenant_id)

        request = AlipayCommerceEcTransAccountWithdrawRequest()
        request.biz_model = model
        result = cls._execute_request(
            request, AlipayCommerceEcTransAccountWithdrawResponse, "提现失败"
        )
        if not result.is_success():
            log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
            raise CustomException(msg=f"提现失败: {result.msg}")

        await WithdrawCRUD(auth).create(
            {
                "enterprise_id": data.enterprise_id,
                "out_biz_no": model.out_biz_no,
                "account_book_id": data.account_book_id,
                "amount": data.amount,
                # Withdrawal to the balance account is synchronous: it either
                # succeeds or raises, so the business code (not a status
                # field) distinguishes the outcome and SUCCESS is recorded.
                "status": WithdrawStatusEnum.SUCCESS.value,
                "order_no": result.order_no,
            }
        )

        log.info(f"资金专户提现发起成功: 企业: {data.enterprise_id}, 金额: {data.amount}")

        return AccountOperationOutSchema(
            enterprise_id=data.enterprise_id,
            account_book_id=data.account_book_id,
        )

    @classmethod
    async def query_account_service(
        cls, auth: AuthSchema, data: AccountQuerySchema
    ) -> list[Any]:
        """Query an enterprise's fund accounts directly from Alipay.

        Calls: alipay.commerce.ec.trans.account.query

        Returns:
            Dicts for accounts that have an ``account_book_id`` attribute and
            whose ``scene`` is ``"B2B_TRANS"``.

        Raises:
            CustomException: On empty response or unsuccessful result.
        """
        from alipay.aop.api.request.AlipayCommerceEcTransAccountQueryRequest import (
            AlipayCommerceEcTransAccountQueryRequest,
        )
        from alipay.aop.api.domain.AlipayCommerceEcTransAccountQueryModel import (
            AlipayCommerceEcTransAccountQueryModel,
        )
        from alipay.aop.api.response.AlipayCommerceEcTransAccountQueryResponse import (
            AlipayCommerceEcTransAccountQueryResponse,
        )
        from alipay.aop.api.domain.FundAccountApiDTO import (
            FundAccountApiDTO,
        )

        model = AlipayCommerceEcTransAccountQueryModel()
        model.enterprise_id = data.enterprise_id

        request = AlipayCommerceEcTransAccountQueryRequest()
        request.biz_model = model
        result = cls._execute_request(
            request, AlipayCommerceEcTransAccountQueryResponse, "查询资金专户失败"
        )
        if not result.is_success():
            log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
            raise CustomException(msg=f"查询资金专户失败: {result.msg}")

        return [
            FundAccountApiDTO.to_alipay_dict(item)
            for item in (result.account_list or [])
            if hasattr(item, "account_book_id")
            and getattr(item, "scene", None) == "B2B_TRANS"
        ]

    @classmethod
    async def transfer_detail_service(
        cls, auth: AuthSchema, out_biz_no: str
    ) -> TransferOutSchema:
        """Return one transfer record, enriched with the third-party order no.

        Raises:
            CustomException: If no transfer matches ``out_biz_no``.
        """
        transfer = await TransferCRUD(auth).get_by_out_biz_no(out_biz_no)
        if not transfer:
            raise CustomException(msg="转账记录不存在")
        transfer_result = TransferOutSchema.model_validate(transfer)

        # Attach the third-party business number when an open-API record exists.
        open_transfer = await OpenTransferCRUD(auth).get(out_biz_no=transfer.out_biz_no)
        if open_transfer:
            transfer_result.third_biz_no = open_transfer.third_biz_no

        return transfer_result

    @classmethod
    async def transfer_list_service(
        cls,
        auth: AuthSchema,
        page_no: int = 1,
        page_size: int = 20,
        search: dict | None = None,
    ) -> dict:
        """Return one page of transfer records, newest ``id`` first."""
        log.info(f"查询转账记录列表: {page_no}, {page_size}, {search}")
        return await TransferCRUD(auth).page(
            offset=(page_no - 1) * page_size,
            limit=page_size,
            order_by=[{"id": "desc"}],
            search=search or {},
            out_schema=TransferListOutSchema,
        )

    @classmethod
    async def transfer_export_service(
        cls,
        auth: AuthSchema,
        start_time: str,
        end_time: str,
        enterprise_id: Optional[str] = None,
    ) -> bytes:
        """Export transfer records in a time window as an Excel file.

        Args:
            start_time: Lower bound applied to ``created_time``.
            end_time: Upper bound applied to ``created_time``.
            enterprise_id: Optional enterprise filter.

        Returns:
            Whatever ``ExcelUtil.export_list2excel`` produces for the rows.
        """
        from app.utils.excel_util import ExcelUtil

        log.info(f"导出转账记录报表: {start_time} -> {end_time}")

        search = {
            "created_time__gte": start_time,
            "created_time__lte": end_time,
        }
        if enterprise_id:
            search["enterprise_id"] = enterprise_id

        records = await TransferCRUD(auth).list(
            search=search,
            order_by=[{"id": "desc"}],
        )

        status_map = {
            "DEALING": "处理中",
            "SUCCESS": "成功",
            "FAIL": "失败",
            "REFUND": "退票",
        }
        payee_type_map = {
            "ALIPAY_ACCOUNT": "支付宝账户",
            "BANK_CARD": "银行卡",
        }

        list_data = []
        for i, record in enumerate(records, start=1):
            payee_info = record.payee_info or {}
            list_data.append({
                "序号": i,
                "订单号": record.out_biz_no or "",
                "商户订单号": record.order_no or "",
                "金额(元)": str(record.amount or 0),
                "收款方姓名": payee_info.get("name", ""),
                "收款方类型": payee_type_map.get(payee_info.get("identity_type", ""), ""),
                # Unknown statuses are exported verbatim.
                "状态": status_map.get(record.status, record.status),
                "转账标题": record.order_title or "",
                "创建时间": record.created_time.strftime("%Y-%m-%d %H:%M:%S") if record.created_time else "",
            })

        # Column headers map onto themselves (row keys are already the headers).
        mapping_dict = {
            "序号": "序号",
            "订单号": "订单号",
            "商户订单号": "商户订单号",
            "金额(元)": "金额(元)",
            "收款方姓名": "收款方姓名",
            "收款方类型": "收款方类型",
            "状态": "状态",
            "转账标题": "转账标题",
            "创建时间": "创建时间",
        }

        return ExcelUtil.export_list2excel(list_data, mapping_dict)

    # ------------------------------------------------------------------
    # Receipts
    # ------------------------------------------------------------------

    @classmethod
    async def apply_receipt_service(
        cls,
        auth: AuthSchema,
        redis: Redis,
        data: ReceiptApplySchema,
    ) -> str:
        """Apply for a transfer receipt and return its ``file_id``.

        Calls: alipay.commerce.ec.trans.receipt.apply

        A previously issued ``file_id`` is served from Redis (key
        ``receipt:{enterprise_id}:{order_no}``) when present; new ones are
        cached with an expiry of 172800.

        Raises:
            CustomException: If the enterprise does not exist, on empty
                response, or on an unsuccessful result.
        """
        from app.core.redis_crud import RedisCURD

        redis_crud = RedisCURD(redis)
        cache_key = f"receipt:{data.enterprise_id}:{data.order_no}"

        cached_file_id = await redis_crud.get(cache_key)
        if cached_file_id:
            log.info(f"使用缓存的 file_id: {cached_file_id}")
            return cached_file_id

        enterprise = await EnterpriseCRUD(auth).get_by_enterprise_id(data.enterprise_id)
        if not enterprise:
            raise CustomException(msg="企业不存在")

        from alipay.aop.api.request.AlipayCommerceEcTransReceiptApplyRequest import (
            AlipayCommerceEcTransReceiptApplyRequest,
        )
        from alipay.aop.api.domain.AlipayCommerceEcTransReceiptApplyModel import (
            AlipayCommerceEcTransReceiptApplyModel,
        )
        from alipay.aop.api.response.AlipayCommerceEcTransReceiptApplyResponse import (
            AlipayCommerceEcTransReceiptApplyResponse,
        )

        model = AlipayCommerceEcTransReceiptApplyModel()
        model.enterprise_id = data.enterprise_id
        model.order_no = data.order_no

        request = AlipayCommerceEcTransReceiptApplyRequest()
        request.biz_model = model
        result = cls._execute_request(
            request, AlipayCommerceEcTransReceiptApplyResponse, "申请回单失败"
        )
        if not result.is_success():
            # Drop any cache entry for this key before reporting the failure.
            await redis_crud.delete(cache_key)
            raise CustomException(msg=f"申请回单失败: {result.msg}")

        file_id = str(result.file_id)
        await redis_crud.set(cache_key, file_id, expire=172800)

        log.info(f"申请回单成功: order_no={data.order_no}, file_id={file_id}")
        return file_id

    @classmethod
    async def query_receipt_service(cls, enterprise_id: str, file_id: str) -> dict:
        """Query the status of a receipt application.

        Calls: alipay.commerce.ec.trans.receipt.query

        Returns:
            ``{file_id, status, download_url, error_message}``.

        Raises:
            CustomException: On empty response or unsuccessful result.
        """
        from alipay.aop.api.request.AlipayCommerceEcTransReceiptQueryRequest import (
            AlipayCommerceEcTransReceiptQueryRequest,
        )
        from alipay.aop.api.response.AlipayCommerceEcTransReceiptQueryResponse import (
            AlipayCommerceEcTransReceiptQueryResponse,
        )
        from alipay.aop.api.domain.AlipayCommerceEcTransReceiptQueryModel import (
            AlipayCommerceEcTransReceiptQueryModel,
        )

        model = AlipayCommerceEcTransReceiptQueryModel()
        model.enterprise_id = enterprise_id
        model.file_id = file_id

        request = AlipayCommerceEcTransReceiptQueryRequest()
        request.biz_model = model
        result = cls._execute_request(
            request, AlipayCommerceEcTransReceiptQueryResponse, "查询回单失败"
        )
        if not result.is_success():
            raise CustomException(msg=f"查询回单失败: {result.msg}")

        return {
            "file_id": file_id,
            "status": result.status,
            "download_url": result.download_url,
            "error_message": result.error_message,
        }

    # ------------------------------------------------------------------
    # Reconciliation
    # ------------------------------------------------------------------

    @classmethod
    async def transfer_sync_status_service(
        cls,
        auth: AuthSchema,
        data: "TransferSyncStatusSchema",
    ) -> dict:
        """Manually set a transfer's status (administrator back-fill).

        Intended for repairing records stuck in ``DEALING`` after a lost
        notification.

        Returns:
            ``{out_biz_no, status, error_code, error_msg}`` after refresh.

        Raises:
            CustomException: If the record does not exist, or if it is not
                ``DEALING`` and the requested status is ``SUCCESS``.
        """
        from app.plugin.module_payment.account.crud import TransferCRUD
        from app.plugin.module_payment.account.schema import TransferSyncStatusSchema

        transfer = await TransferCRUD(auth).get_by_out_biz_no(data.out_biz_no)
        if not transfer:
            raise CustomException(msg=f"转账记录不存在: {data.out_biz_no}")

        # NOTE(review): only the SUCCESS target is rejected for non-DEALING
        # records; other target statuses are allowed through - confirm intent.
        if transfer.status != "DEALING" and data.status == "SUCCESS":
            raise CustomException(msg=f"转账记录当前状态为 {transfer.status},无需同步")

        update_data = {"status": data.status}
        if data.error_code:
            update_data["error_code"] = data.error_code
        if data.error_msg:
            update_data["error_msg"] = data.error_msg

        for key, value in update_data.items():
            if hasattr(transfer, key):
                setattr(transfer, key, value)

        await auth.db.flush()
        await auth.db.refresh(transfer)

        log.info(f"手动同步转账状态成功: out_biz_no={data.out_biz_no}, {transfer.status}")
        return {
            "out_biz_no": transfer.out_biz_no,
            "status": transfer.status,
            "error_code": transfer.error_code,
            "error_msg": transfer.error_msg,
        }

    @classmethod
    async def transfer_sync_all_service(
        cls,
        auth: AuthSchema,
        start_date: str | None = None,
        end_date: str | None = None,
    ) -> dict:
        """Pull consume/transfer records for every enterprise and sync them.

        Pages through consume.detail.batchquery per enterprise, upserts each
        record into ``PayBillModel`` and, for ``TRANSFER`` records, inserts
        or refreshes the matching ``TransferModel`` row via
        ``_sync_transfer_detail``.

        Args:
            start_date: ``YYYY-MM-DD``; defaults to 90 days before now.
            end_date: ``YYYY-MM-DD``; defaults to today.

        Returns:
            A summary dict. When the SDK lacks the batch-query classes a
            degraded result listing local ``DEALING`` transfers is returned
            instead.
        """
        from sqlalchemy import select, insert as sa_insert
        from app.plugin.module_payment.account.model import TransferModel
        from app.plugin.module_payment.account.enums import TransferStatusEnum
        from app.plugin.module_payment.notification.model import PayBillModel
        from app.plugin.module_payment.enterprise.model import EnterpriseModel

        now = datetime.now()
        if not end_date:
            end_date = now.strftime("%Y-%m-%d")
        if not start_date:
            start_date = (now - timedelta(days=90)).strftime("%Y-%m-%d")

        # Probe whether the installed SDK ships consume.detail.batchquery.
        _can_batch = False
        try:
            from alipay.aop.api.request.AlipayCommerceEcConsumeDetailBatchqueryRequest import (
                AlipayCommerceEcConsumeDetailBatchqueryRequest,
            )
            from alipay.aop.api.domain.AlipayCommerceEcConsumeDetailBatchqueryModel import (
                AlipayCommerceEcConsumeDetailBatchqueryModel,
            )
            from alipay.aop.api.response.AlipayCommerceEcConsumeDetailBatchqueryResponse import (
                AlipayCommerceEcConsumeDetailBatchqueryResponse,
            )
            _can_batch = True
        except ImportError:
            log.warning("SDK 不支持 consume.detail.batchquery,降级为仅同步 DEALING 转账")

        # Load every enterprise directly (bypasses CRUD permission filtering).
        ent_stmt = select(EnterpriseModel).where(EnterpriseModel.enterprise_id.isnot(None))
        enterprises = (await auth.db.execute(ent_stmt)).scalars().all()

        if not _can_batch:
            # Degraded mode: the batch classes are unavailable, so the loop
            # below could only fail with NameError for every enterprise.
            # Report the locally known DEALING transfers instead.
            stmt = select(TransferModel).where(
                TransferModel.status == TransferStatusEnum.DEALING.value,
                TransferModel.out_biz_no.isnot(None),
            )
            dealing = (await auth.db.execute(stmt)).scalars().all()
            return {
                "total": len(dealing),
                "synced": 0,
                "bill_synced": 0,
                "transfer_synced": 0,
                "details": [
                    {"out_biz_no": t.out_biz_no, "status": "DEALING", "hint": "手动同步"}
                    for t in dealing
                ],
                "note": "SDK不支持批量查询,请逐一使用 sync-status 同步",
            }

        client = AlipayClient.get_client()
        dealing_status = TransferStatusEnum.DEALING.value
        total_records = 0
        bill_upserted = 0
        transfer_upserted = 0
        transfer_detail_synced = 0
        errors = 0
        details = []

        for enterprise in enterprises:
            eid = enterprise.enterprise_id
            page_num = 1
            page_size = 100

            while True:
                try:
                    batch_model = AlipayCommerceEcConsumeDetailBatchqueryModel()
                    batch_model.enterprise_id = eid
                    batch_model.start_date = start_date
                    batch_model.end_date = end_date
                    batch_model.page_num = page_num
                    batch_model.page_size = page_size

                    request = AlipayCommerceEcConsumeDetailBatchqueryRequest()
                    request.biz_model = batch_model
                    response = client.execute(request)
                    if not response:
                        break

                    batch_result = AlipayCommerceEcConsumeDetailBatchqueryResponse()
                    batch_result.parse_response_content(response)
                    if not batch_result.is_success():
                        errors += 1
                        break

                    consume_list = getattr(batch_result, 'consume_info_list', None)
                    if not consume_list:
                        break

                    for c in consume_list:
                        total_records += 1
                        pay_no = getattr(c, 'pay_no', None)
                        consume_type = getattr(c, 'consume_type', None)
                        if not pay_no:
                            continue

                        # `or 0` guards against an attribute that exists but
                        # is None, which Decimal(str(None)) would reject.
                        consume_amount = Decimal(str(getattr(c, 'consume_amount', None) or 0))
                        peer_pay_amount = getattr(c, 'peer_pay_amount', None)

                        # 1) Upsert into pay_bill (update if present, else insert).
                        bill_data = {
                            "pay_no": pay_no,
                            "enterprise_id": getattr(c, 'enterprise_id', eid),
                            "account_id": getattr(c, 'account_id', ''),
                            "employee_id": getattr(c, 'employee_id', ''),
                            "consume_type": consume_type or '',
                            "consume_amount": consume_amount,
                            "gmt_biz_create": _parse_dt(getattr(c, 'gmt_biz_create', None)),
                            "gmt_recieve_pay": _parse_dt(getattr(c, 'gmt_recieve_pay', None)),
                            "peer_pay_amount": Decimal(str(peer_pay_amount)) if peer_pay_amount else None,
                            "notify_reason": getattr(c, 'notify_reason', 'SYNC'),
                            "notify_msg": getattr(c, 'notify_msg', '全量同步'),
                            "related_pay_no": getattr(c, 'related_pay_no', None),
                            "status": "PROCESSED",
                        }

                        bill_stmt = select(PayBillModel).where(PayBillModel.pay_no == pay_no)
                        bill_exist = (await auth.db.execute(bill_stmt)).scalar_one_or_none()
                        if bill_exist:
                            # Only overwrite with non-None values; never touch the key.
                            for k, v in bill_data.items():
                                if k != 'pay_no' and hasattr(bill_exist, k) and v is not None:
                                    setattr(bill_exist, k, v)
                        else:
                            await auth.db.execute(sa_insert(PayBillModel).values(**bill_data))
                        bill_upserted += 1

                        # 2) Transfers are mirrored into pay_transfer as well.
                        if consume_type != "TRANSFER":
                            details.append({"pay_no": pay_no, "type": consume_type, "action": "bill_only"})
                            continue

                        out_biz_no = getattr(c, 'out_biz_no', None)
                        if not out_biz_no:
                            # Fall back to ext_infos when it is a dict.
                            ext = getattr(c, 'ext_infos', None)
                            if isinstance(ext, dict):
                                out_biz_no = ext.get('out_biz_no')

                        if not out_biz_no:
                            details.append({"pay_no": pay_no, "type": "TRANSFER", "action": "skipped", "reason": "no_out_biz_no"})
                            continue

                        tf_stmt = select(TransferModel).where(TransferModel.out_biz_no == out_biz_no)
                        tf_exist = (await auth.db.execute(tf_stmt)).scalar_one_or_none()
                        if not tf_exist:
                            # Unknown locally: insert as DEALING, then try to
                            # resolve the real status from Alipay.
                            ins_tf = sa_insert(TransferModel).values(
                                out_biz_no=out_biz_no,
                                enterprise_id=eid,
                                amount=consume_amount,
                                status=dealing_status,
                            )
                            await auth.db.execute(ins_tf)
                            transfer_upserted += 1
                            action = "insert"
                            needs_detail = True
                        else:
                            action = "exists"
                            # Only unresolved records are refreshed.
                            needs_detail = tf_exist.status == dealing_status
                        details.append({"pay_no": pay_no, "out_biz_no": out_biz_no, "type": "TRANSFER", "action": action})

                        if needs_detail:
                            try:
                                # Count only records that were actually updated.
                                if await cls._sync_transfer_detail(auth, out_biz_no, eid):
                                    transfer_detail_synced += 1
                            except Exception as e:
                                log.warning(f"查询转账详情失败: out_biz_no={out_biz_no}, err={e}")

                except Exception as e:
                    # Best effort: a failing page aborts only this enterprise.
                    errors += 1
                    log.warning(f"全量同步 - 企业 {eid} 第 {page_num} 页异常: {e}")
                    break

                # Pagination.
                current_page = getattr(batch_result, 'page_num', 0) or 0
                total_pages = getattr(batch_result, 'total_page_count', 0) or 0
                if current_page >= total_pages:
                    break
                page_num += 1

        if bill_upserted > 0 or transfer_upserted > 0:
            await auth.db.flush()

        return {
            "total_records": total_records,
            "bill_upserted": bill_upserted,
            "transfer_inserted": transfer_upserted,
            "transfer_detail_synced": transfer_detail_synced,
            "errors": errors,
            "date_range": {"start": start_date, "end": end_date},
            "details": details,
            "note": "pay_bill 已同步,TRANSFER 类型记录已写入/更新 pay_transfer",
        }

    @classmethod
    async def _sync_transfer_detail(
        cls,
        auth: AuthSchema,
        out_biz_no: str,
        enterprise_id: str,
    ) -> bool:
        """Query one transfer via fund.trans.common.query and update the local row.

        Args:
            auth: Auth context whose ``db`` session executes the update.
            out_biz_no: Merchant-side transfer number to look up.
            enterprise_id: Currently unused; kept for interface stability.

        Returns:
            ``True`` if the local record was updated; ``False`` if the SDK
            classes are missing, the query yielded nothing usable, or the
            remote status is still ``DEALING``.
        """
        from sqlalchemy import update as sa_update
        from app.plugin.module_payment.account.model import TransferModel

        try:
            from alipay.aop.api.request.AlipayFundTransCommonQueryRequest import (
                AlipayFundTransCommonQueryRequest,
            )
            from alipay.aop.api.domain.AlipayFundTransCommonQueryModel import (
                AlipayFundTransCommonQueryModel,
            )
            from alipay.aop.api.response.AlipayFundTransCommonQueryResponse import (
                AlipayFundTransCommonQueryResponse,
            )
        except ImportError:
            return False

        model = AlipayFundTransCommonQueryModel()
        model.out_biz_no = out_biz_no
        model.product_code = "TRANS_ACCOUNT_NO_PWD"
        model.biz_scene = "DIRECT_TRANSFER"

        request = AlipayFundTransCommonQueryRequest()
        request.biz_model = model

        response = AlipayClient.get_client().execute(request)
        if not response:
            return False

        result = AlipayFundTransCommonQueryResponse()
        result.parse_response_content(response)
        if not result.is_success():
            return False

        alipay_status = getattr(result, 'status', None)
        if not alipay_status or alipay_status == "DEALING":
            # Nothing conclusive to write back yet.
            return False

        update_data: dict[str, Any] = {"status": alipay_status}

        # Response attribute -> local column; only truthy values are copied.
        for attr, column in (
            ('order_id', "order_no"),
            ('pay_fund_order_id', "fund_order_id"),
            ('error_code', "error_code"),
            ('fail_reason', "error_msg"),
        ):
            value = getattr(result, attr, None)
            if value:
                update_data[column] = value

        trans_amount = getattr(result, 'trans_amount', None)
        if trans_amount:
            update_data["amount"] = Decimal(str(trans_amount))

        pay_date = getattr(result, 'pay_date', None)
        if pay_date:
            update_data["ext_info"] = {"pay_date": pay_date}

        upd = sa_update(TransferModel).where(
            TransferModel.out_biz_no == out_biz_no
        ).values(**update_data)
        await auth.db.execute(upd)
        log.info(f"转账详情同步 - out_biz_no={out_biz_no}, status={alipay_status}")
        return True

    # ------------------------------------------------------------------
    # Notification-driven status updates
    # ------------------------------------------------------------------

    @classmethod
    async def update_transfer_status_service(
        cls,
        auth: AuthSchema,
        order_no: str,
        status: str,
        ext_info: dict | None = None,
    ) -> None:
        """Update a transfer's status (called by the notification handler).

        Logs a warning and returns without error when no transfer matches
        ``order_no``. ``ext_info`` is written only when non-empty.
        """
        crud = TransferCRUD(auth)
        transfer = await crud.get_by_order_no(order_no)
        if not transfer:
            log.warning(f"转账记录不存在: {order_no}")
            return

        update_data: dict[str, Any] = {"status": status}
        if ext_info:
            update_data["ext_info"] = ext_info
        await crud.update_by_order_no(order_no, update_data)

    @classmethod
    async def update_deposit_status_service(
        cls,
        auth: AuthSchema,
        out_biz_no: str,
        status: str,
    ) -> None:
        """Update a deposit's status (called by the notification handler).

        Logs a warning and returns without error when no deposit matches
        ``out_biz_no``.
        """
        crud = DepositCRUD(auth)
        deposit = await crud.get_by_out_biz_no(out_biz_no)
        if not deposit:
            log.warning(f"充值记录不存在: {out_biz_no}")
            return

        await crud.update_by_out_biz_no(out_biz_no, {"status": status})

    @classmethod
    async def update_withdraw_status_service(
        cls,
        auth: AuthSchema,
        out_biz_no: str,
        status: str,
        error_code: str | None = None,
        error_msg: str | None = None,
    ) -> None:
        """Update a withdrawal's status (called by the notification handler).

        Logs a warning and returns without error when no withdrawal matches
        ``out_biz_no``. Error fields are written only when provided.
        """
        crud = WithdrawCRUD(auth)
        withdraw = await crud.get_by_out_biz_no(out_biz_no)
        if not withdraw:
            log.warning(f"提现记录不存在: {out_biz_no}")
            return

        update_data = {"status": status}
        if error_code:
            update_data["error_code"] = error_code
        if error_msg:
            update_data["error_msg"] = error_msg
        await crud.update_by_out_biz_no(out_biz_no, update_data)

    # ------------------------------------------------------------------
    # Bills
    # ------------------------------------------------------------------

    @classmethod
    async def consume_detail_query_service(
        cls,
        auth: AuthSchema,
        pay_no: str,
        enterprise_id: str | None = None,
        ant_shop_id: str | None = None,
        query_options: list[str] | None = None,
    ) -> dict:
        """Query the details of a single bill.

        Calls: alipay.commerce.ec.consume.detail.query

        Optional filters are sent only when provided.

        Returns:
            A flat dict of fields read from the response's ``consume_info``;
            fields read with ``getattr`` default to ``None`` when absent.

        Raises:
            CustomException: On empty response, unsuccessful result, or a
                response without ``consume_info``.
        """
        from alipay.aop.api.request.AlipayCommerceEcConsumeDetailQueryRequest import (
            AlipayCommerceEcConsumeDetailQueryRequest,
        )
        from alipay.aop.api.domain.AlipayCommerceEcConsumeDetailQueryModel import (
            AlipayCommerceEcConsumeDetailQueryModel,
        )
        from alipay.aop.api.response.AlipayCommerceEcConsumeDetailQueryResponse import (
            AlipayCommerceEcConsumeDetailQueryResponse,
        )

        model = AlipayCommerceEcConsumeDetailQueryModel()
        model.pay_no = pay_no
        if enterprise_id:
            model.enterprise_id = enterprise_id
        if ant_shop_id:
            model.ant_shop_id = ant_shop_id
        if query_options:
            model.query_options = query_options

        request = AlipayCommerceEcConsumeDetailQueryRequest()
        request.biz_model = model
        result = cls._execute_request(
            request, AlipayCommerceEcConsumeDetailQueryResponse, "账单详情查询失败"
        )
        if not result.is_success():
            log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
            raise CustomException(msg=f"账单详情查询失败: {result.msg}")

        consume_info = result.consume_info
        if not consume_info:
            raise CustomException(msg="账单详情查询失败: 无账单信息")

        return {
            "account_id": consume_info.account_id,
            "pay_no": consume_info.pay_no,
            "consume_type": consume_info.consume_type,
            "gmt_biz_create": consume_info.gmt_biz_create,
            "consume_biz_type": consume_info.consume_biz_type,
            "consume_amount": consume_info.consume_amount,
            "order_complete_label": consume_info.order_complete_label,
            "refund_status": consume_info.refund_status,
            "refund_amount": consume_info.refund_amount,
            "peer_payer_card_name": consume_info.peer_payer_card_name,
            "user_id": getattr(consume_info, 'user_id', None),
            "open_id": getattr(consume_info, 'open_id', None),
            "enterprise_id": consume_info.enterprise_id,
            "employee_id": consume_info.employee_id,
            "enterprise_name": getattr(consume_info, 'enterprise_name', None),
            "employee_name": getattr(consume_info, 'employee_name', None),
            "consume_scene_code": getattr(consume_info, 'consume_scene_code', None),
            "consume_type_sub_category": getattr(consume_info, 'consume_type_sub_category', None),
            "consume_title": getattr(consume_info, 'consume_title', None),
            "gmt_pay": getattr(consume_info, 'gmt_pay', None),
            "gmt_refund": getattr(consume_info, 'gmt_refund', None),
            "pay_amount": getattr(consume_info, 'pay_amount', None),
            "invoice_amount": getattr(consume_info, 'invoice_amount', None),
            "peer_pay_amount": getattr(consume_info, 'peer_pay_amount', None),
            "subsidy_amount": getattr(consume_info, 'subsidy_amount', None),
            "ext_infos": getattr(consume_info, 'ext_infos', None),
        }