import io from typing import Annotated, Any, Optional, Dict from fastapi import APIRouter, Body, Depends, Path, Query from fastapi.responses import JSONResponse, StreamingResponse from redis.asyncio import Redis from app.api.v1.module_system.auth.schema import AuthSchema from app.common.response import ResponseSchema, SuccessResponse from app.core.dependencies import AuthPermission from app.core.logger import log from app.core.router_class import OperationLogRoute from app.core.base_params import PaginationQueryParam from app.core.dependencies import redis_getter from .schema import ( AccountAuthorizeApplySchema, AccountAuthorizeApplyOutSchema, AccountCreateSchema, AccountDepositSchema, AccountDepositOutSchema, AccountOperationOutSchema, AccountQuerySchema, AccountTransferSchema, AccountWithdrawSchema, ReceiptApplySchema, ReceiptApplyOutSchema, ReceiptQueryOutSchema, TransferListOutSchema, TransferOutSchema, TransferTaskOutSchema, ConsumeDetailOutSchema, ) from .service import AccountService AccountRouter = APIRouter( route_class=OperationLogRoute, prefix="/account", tags=["资金专户"], ) @AccountRouter.post( "/authorize/apply", summary="申请转账授权签约", description="申请转账授权签约", response_model=ResponseSchema[AccountAuthorizeApplyOutSchema], ) async def authorize_apply_controller( data: AccountAuthorizeApplySchema, auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:account:authorize"]))], ) -> JSONResponse: """申请转账授权签约""" result = await AccountService.authorize_apply_service(auth=auth, data=data) log.info(f"申请转账授权签约成功: {data.enterprise_id}") return SuccessResponse(data=result, msg="申请转账授权签约成功") @AccountRouter.post( "", summary="开通资金专户", description="开通资金专户", response_model=ResponseSchema[AccountOperationOutSchema], ) async def create_account_controller( data: AccountCreateSchema, auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:account:create"]))], ) -> JSONResponse: """开通资金专户""" result = await AccountService.create_account_service(auth=auth, data=data) log.info(f"开通资金专户成功: {data.enterprise_id}") return SuccessResponse(data=result, msg="开通资金专户成功") @AccountRouter.get( "/query/{enterprise_id}", summary="查询资金专户", description="根据企业ID查询资金专户(调用支付宝接口)", response_model=ResponseSchema[list[Any]], ) async def query_account_controller( # data: AccountQuerySchema, enterprise_id: Annotated[str, Path(description="企业ID")], auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:account:detail"]))], ) -> JSONResponse: """查询资金专户""" result = await AccountService.query_account_service(auth=auth, data=AccountQuerySchema(enterprise_id=enterprise_id)) log.info(f"查询资金专户成功: {enterprise_id}") return SuccessResponse(data=result, msg="查询资金专户成功") @AccountRouter.post( "/deposit", summary="资金专户充值", description="从支付宝余额向资金专户充值", response_model=ResponseSchema[AccountDepositOutSchema], ) async def deposit_controller( data: AccountDepositSchema, auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:account:deposit"]))], ) -> JSONResponse: """资金专户充值""" result = await AccountService.deposit_service(auth=auth, data=data) log.info(f"资金专户充值发起成功: {data.enterprise_id} -> {str(data.amount)}") return SuccessResponse(data=result, msg="充值页面获取成功,请跳转完成支付") @AccountRouter.post( "/transfer", summary="资金专户转账", description="从资金专户转账到支付宝账户/银行卡/资金专户", response_model=ResponseSchema[TransferTaskOutSchema], ) async def transfer_controller( data: AccountTransferSchema, auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:account:transfer"]))], ) -> JSONResponse: """资金专户转账""" result = await AccountService.transfer_service(auth=auth, data=data) log.info(f"资金专户转账发起成功: 企业: {data.enterprise_id}, 金额: {data.amount}") return SuccessResponse(data=result, msg="转账申请已提交") @AccountRouter.post( "/withdraw", summary="资金专户提现", description="从资金专户向支付宝余额提现。调用支付宝接口: alipay.commerce.ec.trans.account.withdraw", response_model=ResponseSchema[AccountOperationOutSchema], ) async def withdraw_controller( data: AccountWithdrawSchema, auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:account:withdraw"]))], ) -> JSONResponse: """ 资金专户提现 接口文档: https://opendocs.alipay.com/pre-open/d651859b_alipay.commerce.ec.trans.account.withdraw 参数说明: - enterprise_id: 企业ID - account_book_id: 资金专户号 - amount: 提现金额(大于0) - out_biz_no: 商家侧订单号(唯一) 返回: - enterprise_id: 企业ID - account_book_id: 资金专户号 """ result = await AccountService.withdraw_service(auth=auth, data=data) log.info(f"资金专户提现发起成功: 企业: {data.enterprise_id}, 金额: {data.amount}") return SuccessResponse(data=result, msg="提现申请已提交") @AccountRouter.get( "/transfer/{out_biz_no}", summary="查询转账记录详情", description="根据订单号查询转账记录", response_model=ResponseSchema[TransferOutSchema], ) async def transfer_detail_controller( out_biz_no: Annotated[str, Path(description="商家侧订单号")], auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:account:transfer:detail"]))], ) -> JSONResponse: """查询转账记录详情""" result = await AccountService.transfer_detail_service(auth=auth, out_biz_no=out_biz_no) log.info(f"查询转账记录详情成功: {out_biz_no}") return SuccessResponse(data=result, msg="查询转账记录详情成功") @AccountRouter.get( "/transfer", summary="查询转账记录列表", description="分页查询转账记录列表", response_model=ResponseSchema[TransferListOutSchema], ) async def transfer_list_controller( auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:account:transfer:list"]))], page_no: Annotated[int, Query(description="页码")] = 1, page_size: Annotated[int, Query(description="每页数量")] = 20, out_biz_no: Annotated[str | None, Query(description="订单号")] = None, status: Annotated[str | None, Query(description="状态")] = None, ) -> JSONResponse: """查询转账记录列表""" search = {} if out_biz_no: search["out_biz_no"] = out_biz_no if status: search["status"] = status result = await AccountService.transfer_list_service( auth=auth, page_no=page_no, page_size=page_size, search=search ) return SuccessResponse(data=result, msg="查询转账记录列表成功") @AccountRouter.get( "/consume-detail", summary="账单详情查询", description="查询企业码账单详情,支持查询关联退款、订单、票据等信息", response_model=ResponseSchema[ConsumeDetailOutSchema], ) async def consume_detail_query_controller( auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:account:consume:detail"]))], pay_no: Annotated[str, Query(description="支付宝账单号")], enterprise_id: Annotated[str | None, Query(description="企业ID(2.0接口签约企业必填)")] = None, ant_shop_id: Annotated[str | None, Query(description="蚂蚁门店ID(商户服务商必填)")] = None, query_options: Annotated[str | None, Query(description="查询选项,多个用逗号分隔")] = None, ) -> JSONResponse: """ 账单详情查询 调用: alipay.commerce.ec.consume.detail.query 用于查询企业码账单详情,支持查询关联退款、订单、票据等信息。 - pay_no: 支付宝账单号(必填) - enterprise_id: 企业ID(2.0接口签约企业必填) - ant_shop_id: 蚂蚁门店ID(商户服务商必填) - query_options: 查询选项,支持 Refund/Order/Ticket,多个用逗号分隔 """ options_list = None if query_options: options_list = [opt.strip() for opt in query_options.split(",") if opt.strip()] result = await AccountService.consume_detail_query_service( auth=auth, pay_no=pay_no, enterprise_id=enterprise_id, ant_shop_id=ant_shop_id, query_options=options_list, ) log.info(f"账单详情查询成功: {pay_no}") return SuccessResponse(data=result, msg="账单详情查询成功") @AccountRouter.get( "/transfer/export", summary="导出转账记录报表", description="导出指定时间范围内的转账记录为Excel文件", responses={ 200: { "description": "Excel文件下载", "content": { "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": {} } } }, ) async def transfer_export_controller( auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:account:transfer:list"]))], start_time: Annotated[str, Query(description="开始时间,格式:YYYY-MM-DD HH:MM:SS")], end_time: Annotated[str, Query(description="结束时间,格式:YYYY-MM-DD HH:MM:SS")], enterprise_id: Annotated[str | None, Query(description="企业ID(2.0接口签约企业必填)")] = None, ) -> StreamingResponse: """导出转账记录报表""" result = await AccountService.transfer_export_service( auth=auth, start_time=start_time, end_time=end_time, enterprise_id=enterprise_id ) log.info(f"导出转账记录报表成功: {start_time} -> {end_time}") return StreamingResponse( io.BytesIO(result), media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", headers={ "Content-Disposition": f"attachment; filename=transfer_report_{start_time}_{end_time}.xlsx" } ) # @AccountRouter.post( # "/receipt/apply", # summary="申请资金回单", # description="申请资金业务回单", # response_model=ResponseSchema[ReceiptApplyOutSchema], # ) # async def receipt_apply_controller( # data: ReceiptApplySchema, # auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:account:receipt"]))], # ) -> JSONResponse: # """申请资金回单""" # result = await AccountService.receipt_apply_service(auth=auth, data=data) # log.info(f"申请资金回单成功: {data.order_no}") # return SuccessResponse(data=result, msg="申请资金回单成功") @AccountRouter.post( "/receipt/apply", summary="申请转账业务回单", description="申请指定转账单号的业务回单,返回 file_id 用于查询状态", response_model=ResponseSchema[ReceiptApplyOutSchema], ) async def receipt_apply_controller( data: ReceiptApplySchema, auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:account:receipt"]))], redis: Annotated[Redis, Depends(redis_getter)], ) -> JSONResponse: """ 申请转账业务回单 调用: alipay.commerce.ec.trans.receipt.apply 参数: - enterprise_id: 企业ID - order_no: 支付宝转账单号 """ file_id = await AccountService.apply_receipt_service( auth=auth, redis=redis, data=data, ) log.info(f"申请转账业务回单成功: order_no={data.order_no}, file_id={file_id}") return SuccessResponse(data={"file_id": file_id}, msg="申请成功") @AccountRouter.get( "/receipt/{enterprise_id}/{file_id}", summary="查询回单状态", description="查询转账回单的处理状态和下载链接", response_model=ResponseSchema[ReceiptQueryOutSchema], ) async def receipt_query_controller( enterprise_id: Annotated[str, Path(description="企业ID")], file_id: Annotated[str, Path(description="文件申请号")], auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:account:receipt"]))], ) -> JSONResponse: """ 查询回单状态 调用: alipay.commerce.ec.trans.receipt.query 参数: - file_id: 文件申请号(有效期2天) """ result = await AccountService.query_receipt_service( enterprise_id=enterprise_id, file_id=file_id, ) log.info(f"查询回单状态成功: file_id={file_id}, status={result['status']}") return SuccessResponse(data=result, msg="查询成功") @AccountRouter.get( "/receipt/download", summary="获取回单下载链接", description="获取回单下载链接(封装申请+查询,PROCESS状态直接返回,由前端轮询)", response_model=ResponseSchema[ReceiptQueryOutSchema], ) async def receipt_download_controller( auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:account:receipt"]))], redis: Annotated[Redis, Depends(redis_getter)], enterprise_id: Annotated[str, Query(description="企业ID")], order_no: Annotated[str, Query(description="支付宝转账单号")], ) -> JSONResponse: """ 获取回单下载链接 封装完整流程: 1. 申请/获取 file_id(自动缓存) 2. 查询回单状态 3. 直接返回状态(PROCESS状态由前端主动查询) """ file_id = await AccountService.apply_receipt_service( auth=auth, redis=redis, data=ReceiptApplySchema(enterprise_id=enterprise_id, order_no=order_no), ) result = await AccountService.query_receipt_service( enterprise_id=enterprise_id, file_id=file_id, ) log.info(f"获取回单下载链接: file_id={file_id}, status={result['status']}") return SuccessResponse(data=result, msg="操作成功") # @AccountRouter.post( # "/transfer/batch", # summary="批量资金专户转账", # description="异步批量转账,先放入队列,后台处理", # response_model=ResponseSchema[AccountTransferBatchOutSchema], # ) # async def batch_transfer_controller( # data: AccountTransferBatchSchema, # auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:account:transfer"]))], # ) -> JSONResponse: # """批量资金专户转账(异步)""" # result = await AccountService.batch_transfer_service(auth=auth, data=data) # log.info(f"批量转账任务已提交: {result.batch_id}, 总笔数: {result.total}") # return SuccessResponse(data=result, msg="批量转账任务已提交,正在处理中") # @AccountRouter.get( # "/transfer/batch/{batch_id}", # summary="查询批量转账结果", # description="查询批量转账的处理状态和结果", # response_model=ResponseSchema[dict], # ) # async def get_batch_result_controller( # batch_id: Annotated[str, Path(description="批次ID")], # auth: Annotated[AuthSchema, Depends(AuthPermission(["module_payment:account:transfer"]))], # ) -> JSONResponse: # """查询批量转账结果""" # result = await AccountService.get_batch_result_service(auth=auth, batch_id=batch_id) # log.info(f"查询批量转账结果成功: {batch_id}") # return SuccessResponse(data=result, msg="查询批量转账结果成功")