import os import tempfile from datetime import datetime, timedelta from fastapi import UploadFile from app.api.v1.module_system.auth.schema import AuthSchema from app.config.setting import settings from app.core.alipay import AlipayClient from app.core.exceptions import CustomException from app.core.logger import log from app.utils.upload_util import UploadUtil from .crud import FacetofaceCRUD from .enums import FacetofaceOrderStatus from .schema import ( FacetofaceApplySchema, FacetofaceOrderListOutSchema, FacetofaceOrderOutSchema, ) class FacetofaceService: """当面付开通服务""" @classmethod async def apply_service( cls, auth: AuthSchema, data: FacetofaceApplySchema, shop_scene_pic: UploadFile | None = None, shop_sign_board_pic: UploadFile | None = None, business_license_pic: UploadFile | None = None, ) -> FacetofaceOrderOutSchema: """ 提交当面付开通申请 三步操作: agent.create → facetoface.sign → agent.confirm """ from alipay.aop.api.FileItem import FileItem from alipay.aop.api.domain.AlipayOpenAgentCreateModel import AlipayOpenAgentCreateModel from alipay.aop.api.request.AlipayOpenAgentCreateRequest import AlipayOpenAgentCreateRequest from alipay.aop.api.response.AlipayOpenAgentCreateResponse import AlipayOpenAgentCreateResponse from alipay.aop.api.request.AlipayOpenAgentFacetofaceSignRequest import AlipayOpenAgentFacetofaceSignRequest from alipay.aop.api.response.AlipayOpenAgentFacetofaceSignResponse import AlipayOpenAgentFacetofaceSignResponse from alipay.aop.api.domain.AlipayOpenAgentConfirmModel import AlipayOpenAgentConfirmModel from alipay.aop.api.request.AlipayOpenAgentConfirmRequest import AlipayOpenAgentConfirmRequest from alipay.aop.api.response.AlipayOpenAgentConfirmResponse import AlipayOpenAgentConfirmResponse client = AlipayClient.get_client() crud = FacetofaceCRUD(auth) # 检查该企业是否已有申请单 existing = await crud.get_by_enterprise_id(data.enterprise_id) if existing and existing.order_status not in ( FacetofaceOrderStatus.CLOSED.value, ): raise CustomException(msg="该企业已有进行中的当面付申请") # Step 1: 创建应用事务,获取 batch_no create_model = AlipayOpenAgentCreateModel() create_request = AlipayOpenAgentCreateRequest() create_request.biz_model = create_model response = client.execute(create_request) if not response: raise CustomException(msg="创建应用事务失败: 无响应") create_result = AlipayOpenAgentCreateResponse() create_result.parse_response_content(response) if not create_result.is_success(): log.error(f"创建应用事务失败: {create_result.code} - {create_result.msg} - {create_result.sub_msg}") raise CustomException(msg=f"创建应用事务失败: {create_result.sub_msg or create_result.msg}") batch_no = create_result.batch_no if not batch_no: raise CustomException(msg="创建应用事务失败: 未返回 batch_no") log.info(f"当面付申请 - Step1 创建事务成功: batch_no={batch_no}") # Step 2: 提交当面付开通申请 sign_request = AlipayOpenAgentFacetofaceSignRequest() sign_request.batch_no = batch_no sign_request.shop_name = data.shop_name if data.shop_address: sign_request.shop_address = data.shop_address if data.mcc_code: sign_request.mcc_code = data.mcc_code if data.rate: sign_request.rate = data.rate if data.business_license_no: sign_request.business_license_no = data.business_license_no if data.business_license_mobile: sign_request.business_license_mobile = data.business_license_mobile if data.sign_and_auth: sign_request.sign_and_auth = "true" # 处理图片上传 image_paths: dict[str, str | None] = { "shop_scene_pic_path": None, "shop_sign_board_pic_path": None, "business_license_pic_path": None, } file_mapping = [ ("shop_scene_pic", shop_scene_pic, "shop_scene_pic_path"), ("shop_sign_board_pic", shop_sign_board_pic, "shop_sign_board_pic_path"), ("business_license_pic", business_license_pic, "business_license_pic_path"), ] for attr_name, upload_file, path_key in file_mapping: if upload_file and upload_file.filename: content = await upload_file.read() await upload_file.seek(0) file_item = FileItem( file_name=upload_file.filename, file_content=content, mime_type=upload_file.content_type or "application/octet-stream", ) sign_request.__setattr__(attr_name, file_item) saved_path = await cls._save_uploaded_file(upload_file) image_paths[path_key] = saved_path response = client.execute(sign_request) if not response: raise CustomException(msg="提交当面付申请失败: 无响应") sign_result = AlipayOpenAgentFacetofaceSignResponse() sign_result.parse_response_content(response) if not sign_result.is_success(): log.error(f"提交当面付申请失败: {sign_result.code} - {sign_result.msg} - {sign_result.sub_msg}") raise CustomException(msg=f"提交当面付申请失败: {sign_result.sub_msg or sign_result.msg}") log.info(f"当面付申请 - Step2 提交签约成功: batch_no={batch_no}") # Step 3: 确认提交事务 confirm_model = AlipayOpenAgentConfirmModel() confirm_model.batch_no = batch_no confirm_request = AlipayOpenAgentConfirmRequest() confirm_request.biz_model = confirm_model response = client.execute(confirm_request) if not response: raise CustomException(msg="确认提交事务失败: 无响应") confirm_result = AlipayOpenAgentConfirmResponse() confirm_result.parse_response_content(response) if not confirm_result.is_success(): log.error(f"确认提交事务失败: {confirm_result.code} - {confirm_result.msg} - {confirm_result.sub_msg}") raise CustomException(msg=f"确认提交事务失败: {confirm_result.sub_msg or confirm_result.msg}") log.info(f"当面付申请 - Step3 确认事务成功: batch_no={batch_no}") # 保存申请单到数据库 now = datetime.now() create_data = { "enterprise_id": data.enterprise_id, "batch_no": batch_no, "order_status": FacetofaceOrderStatus.SUBMITTED.value, "shop_scene_pic_path": image_paths["shop_scene_pic_path"], "shop_sign_board_pic_path": image_paths["shop_sign_board_pic_path"], "business_license_pic_path": image_paths["business_license_pic_path"], "merchant_name": data.merchant_name, "shop_name": data.shop_name, "shop_address": data.shop_address, "mcc_code": data.mcc_code, "rate": data.rate, "business_license_no": data.business_license_no, "business_license_mobile": data.business_license_mobile, "sign_and_auth": data.sign_and_auth, "remark": data.remark, "next_query_time": now + timedelta(minutes=5), "query_count": 0, } # 如果该企业已有已关闭的申请单,更新而非新建 if existing: obj = await crud.get(id=existing.id, preload=[]) if obj: for key, value in create_data.items(): if hasattr(obj, key): setattr(obj, key, value) await auth.db.flush() await auth.db.refresh(obj) return FacetofaceOrderOutSchema.model_validate(obj) order = await crud.create(create_data) if not order: raise CustomException(msg="保存申请单失败") return FacetofaceOrderOutSchema.model_validate(order) @classmethod async def query_order_service( cls, auth: AuthSchema, order_id: int ) -> FacetofaceOrderOutSchema: """手动查询单个申请单状态""" crud = FacetofaceCRUD(auth) order = await crud.get(id=order_id) if not order: raise CustomException(msg="申请单不存在") if not order.batch_no: raise CustomException(msg="申请单无事务编号,无法查询") terminal_statuses = [ FacetofaceOrderStatus.SUCCESS.value, FacetofaceOrderStatus.CLOSED.value, ] if order.order_status in terminal_statuses: return FacetofaceOrderOutSchema.model_validate(order) await cls._query_and_update_order(crud, order) await auth.db.refresh(order) return FacetofaceOrderOutSchema.model_validate(order) @classmethod async def _query_and_update_order(cls, crud: FacetofaceCRUD, order) -> None: """查询支付宝接口并更新申请单状态""" from alipay.aop.api.domain.AlipayOpenAgentOrderQueryModel import AlipayOpenAgentOrderQueryModel from alipay.aop.api.request.AlipayOpenAgentOrderQueryRequest import AlipayOpenAgentOrderQueryRequest from alipay.aop.api.response.AlipayOpenAgentOrderQueryResponse import AlipayOpenAgentOrderQueryResponse client = AlipayClient.get_client() query_model = AlipayOpenAgentOrderQueryModel() query_model.batch_no = order.batch_no query_request = AlipayOpenAgentOrderQueryRequest() query_request.biz_model = query_model try: response = client.execute(query_request) except Exception as e: log.error(f"查询当面付申请单状态异常: batch_no={order.batch_no}, error={e}") return if not response: log.warning(f"查询当面付申请单无响应: batch_no={order.batch_no}") return result = AlipayOpenAgentOrderQueryResponse() result.parse_response_content(response) if not result.is_success(): log.error(f"查询当面付申请单失败: batch_no={order.batch_no}, {result.code} - {result.sub_msg or result.msg}") return now = datetime.now() update_data: dict = { "last_query_time": now, "query_count": order.query_count + 1, } if result.order_no: update_data["order_no"] = result.order_no if result.confirm_url: update_data["confirm_url"] = result.confirm_url if result.reject_reason: update_data["reject_reason"] = result.reject_reason alipay_status = result.order_status if alipay_status: if alipay_status == "MERCHANT_CONFIRM": update_data["order_status"] = FacetofaceOrderStatus.MERCHANT_CONFIRM.value # 等待商家确认,继续轮询 update_data["next_query_time"] = now + timedelta(hours=4) elif alipay_status == "MERCHANT_AUDITING": update_data["order_status"] = FacetofaceOrderStatus.MERCHANT_AUDITING.value update_data["next_query_time"] = now + timedelta(hours=4) elif alipay_status in ("MERCHANT_AGREED", "AGENT_BINDAPP_SUCCESS"): update_data["order_status"] = FacetofaceOrderStatus.SUCCESS.value update_data["next_query_time"] = None elif alipay_status in ("MERCHANT_REJECTED", "MERCHANT_CANCELLED", "AUDIT_REJECTED", "AUDIT_FAILED"): update_data["order_status"] = FacetofaceOrderStatus.CLOSED.value update_data["next_query_time"] = None else: update_data["next_query_time"] = now + timedelta(hours=4) log.info(f"当面付申请单状态更新: batch_no={order.batch_no}, alipay_status={alipay_status}, local_status={update_data.get('order_status', order.order_status)}") obj = await crud.get(id=order.id, preload=[]) if obj: for key, value in update_data.items(): if hasattr(obj, key): setattr(obj, key, value) await crud.auth.db.flush() @classmethod async def batch_status_service( cls, auth: AuthSchema, enterprise_ids: list[str] ) -> dict[str, str | None]: """批量查询企业当面付状态""" crud = FacetofaceCRUD(auth) result: dict[str, str | None] = {} for eid in enterprise_ids: order = await crud.get_by_enterprise_id(eid) result[eid] = order.order_status if order else None return result @classmethod async def get_by_enterprise_service( cls, auth: AuthSchema, enterprise_id: str ) -> FacetofaceOrderOutSchema | None: """按企业ID查询当面付申请单""" crud = FacetofaceCRUD(auth) order = await crud.get_by_enterprise_id(enterprise_id) if not order: return None return FacetofaceOrderOutSchema.model_validate(order) @classmethod async def list_service( cls, auth: AuthSchema, page_no: int = 1, page_size: int = 20, search: dict | None = None, ) -> dict: """查询申请单列表""" crud = FacetofaceCRUD(auth) offset = (page_no - 1) * page_size return await crud.page( offset=offset, limit=page_size, order_by=[{"id": "desc"}], search=search or {}, out_schema=FacetofaceOrderListOutSchema, ) @classmethod async def detail_service( cls, auth: AuthSchema, order_id: int ) -> FacetofaceOrderOutSchema: """查询申请单详情""" crud = FacetofaceCRUD(auth) order = await crud.get(id=order_id) if not order: raise CustomException(msg="申请单不存在") return FacetofaceOrderOutSchema.model_validate(order) @classmethod async def _save_uploaded_file(cls, upload_file: UploadFile) -> str: """保存上传的文件到本地,返回文件路径""" await upload_file.seek(0) content = await upload_file.read() await upload_file.seek(0) ext = os.path.splitext(upload_file.filename or "file")[1] or ".jpg" safe_name = f"f2f_{datetime.now().strftime('%Y%m%d%H%M%S')}_{os.urandom(4).hex()}{ext}" upload_dir = settings.UPLOAD_FILE_PATH.joinpath("facetoface", datetime.now().strftime("%Y/%m/%d")) upload_dir.mkdir(parents=True, exist_ok=True) filepath = upload_dir.joinpath(safe_name) with open(filepath, "wb") as f: f.write(content) log.info(f"当面付图片已保存: {filepath}") return str(filepath) @classmethod async def poll_pending_orders(cls) -> int: """ 轮询待处理的申请单(供定时任务调用) 返回本次处理的申请单数量 """ from app.core.database import async_db_session async with async_db_session() as db: from app.api.v1.module_system.auth.schema import AuthSchema auth = AuthSchema(db=db, user=None, tenant_id=1) crud = FacetofaceCRUD(auth) orders = await crud.get_pending_orders() if not orders: return 0 count = 0 for order in orders: try: await cls._query_and_update_order(crud, order) count += 1 except Exception as e: log.error(f"轮询当面付申请单异常: batch_no={order.batch_no}, error={e}") await db.commit() log.info(f"当面付申请单轮询完成: 处理 {count}/{len(orders)} 条") return count