import asyncio
from datetime import datetime, timedelta

from app.api.v1.module_system.auth.schema import AuthSchema
from app.core.alipay import AlipayClient
from app.core.exceptions import CustomException
from app.core.logger import log

from .crud import FacetofaceCRUD
from .enums import FacetofaceOrderStatus
from .schema import (
    FacetofaceApplySchema,
    FacetofaceOrderListOutSchema,
    FacetofaceOrderOutSchema,
)


class FacetofaceService:
    """Service layer for agent-assisted activation of Alipay Face-to-Face payment."""

    @staticmethod
    async def _execute_step(client, request, response_cls, action: str):
        """Run one Alipay SDK request and return its parsed, successful response.

        The SDK call is synchronous, so it is executed in a worker thread to
        avoid blocking the event loop while waiting on the network.

        Args:
            client: The Alipay SDK client exposing ``execute(request)``.
            request: A fully populated SDK request object.
            response_cls: The SDK response class used to parse the raw content.
            action: Human-readable step name used as the prefix of log and
                error messages.

        Returns:
            An instance of ``response_cls`` for which ``is_success()`` is true.

        Raises:
            CustomException: If the gateway returned nothing or reported failure.
        """
        raw = await asyncio.to_thread(client.execute, request)
        if not raw:
            raise CustomException(msg=f"{action}失败: 无响应")

        parsed = response_cls()
        parsed.parse_response_content(raw)
        if not parsed.is_success():
            log.error(f"{action}失败: {parsed.code} - {parsed.msg} - {parsed.sub_msg}")
            raise CustomException(msg=f"{action}失败: {parsed.sub_msg or parsed.msg}")
        return parsed

    @classmethod
    async def apply_service(
        cls,
        auth: AuthSchema,
        data: FacetofaceApplySchema,
    ) -> FacetofaceOrderOutSchema:
        """Submit a Face-to-Face payment activation application.

        Performs three remote steps in order
        (agent.create -> agent.facetoface.sign -> agent.confirm) and then
        persists the application locally.

        Args:
            auth: Request auth context; its ``db`` session is used for persistence.
            data: The application payload.

        Returns:
            The stored application order.

        Raises:
            CustomException: If the enterprise already has a non-closed
                application, if any remote step fails, or if saving fails.
        """
        # SDK imports are kept function-local, as in the original module.
        from alipay.aop.api.domain.AlipayOpenAgentConfirmModel import AlipayOpenAgentConfirmModel
        from alipay.aop.api.domain.AlipayOpenAgentCreateModel import AlipayOpenAgentCreateModel
        from alipay.aop.api.domain.ContactModel import ContactModel
        from alipay.aop.api.request.AlipayOpenAgentConfirmRequest import AlipayOpenAgentConfirmRequest
        from alipay.aop.api.request.AlipayOpenAgentCreateRequest import AlipayOpenAgentCreateRequest
        from alipay.aop.api.request.AlipayOpenAgentFacetofaceSignRequest import AlipayOpenAgentFacetofaceSignRequest
        from alipay.aop.api.response.AlipayOpenAgentConfirmResponse import AlipayOpenAgentConfirmResponse
        from alipay.aop.api.response.AlipayOpenAgentCreateResponse import AlipayOpenAgentCreateResponse
        from alipay.aop.api.response.AlipayOpenAgentFacetofaceSignResponse import AlipayOpenAgentFacetofaceSignResponse

        client = AlipayClient.get_client()
        crud = FacetofaceCRUD(auth)

        # Reject the request if this enterprise already has an application
        # in any status other than CLOSED.
        existing = await crud.get_by_enterprise_id(data.enterprise_id)
        if existing and existing.order_status not in (
            FacetofaceOrderStatus.CLOSED.value,
        ):
            raise CustomException(msg="该企业已有进行中的当面付申请")

        # Step 1: create the agent transaction.
        create_model = AlipayOpenAgentCreateModel()
        create_model.account = data.account
        create_model.contact_info = ContactModel()
        create_model.contact_info.contact_name = data.contact_name
        create_model.contact_info.contact_mobile = data.contact_mobile
        if data.contact_email:
            create_model.contact_info.contact_email = data.contact_email
        if data.order_ticket:
            create_model.order_ticket = data.order_ticket

        create_request = AlipayOpenAgentCreateRequest()
        create_request.biz_model = create_model
        create_result = await cls._execute_step(
            client, create_request, AlipayOpenAgentCreateResponse, "创建应用事务"
        )

        batch_no = create_result.batch_no
        if not batch_no:
            raise CustomException(msg="创建应用事务失败: 未返回 batch_no")
        log.info(f"当面付代开通 - Step1 创建事务成功: batch_no={batch_no}, account={data.account}")

        # Step 2: submit the Face-to-Face signing request for that transaction.
        sign_request = AlipayOpenAgentFacetofaceSignRequest()
        sign_request.batch_no = batch_no
        if data.sign_and_auth:
            # The flag is sent as the string "true", not a boolean.
            sign_request.sign_and_auth = "true"
        if data.rate:
            sign_request.rate = data.rate
        await cls._execute_step(
            client, sign_request, AlipayOpenAgentFacetofaceSignResponse, "提交当面付签约"
        )
        log.info(f"当面付代开通 - Step2 签约成功: batch_no={batch_no}")

        # Step 3: confirm (commit) the transaction.
        confirm_model = AlipayOpenAgentConfirmModel()
        confirm_model.batch_no = batch_no
        confirm_request = AlipayOpenAgentConfirmRequest()
        confirm_request.biz_model = confirm_model
        confirm_result = await cls._execute_step(
            client, confirm_request, AlipayOpenAgentConfirmResponse, "确认提交事务"
        )
        log.info(f"当面付代开通 - Step3 确认事务成功: batch_no={batch_no}, order_no={confirm_result.order_no}")

        # Persist the application; the first status poll is due in 5 minutes.
        now = datetime.now()
        create_data = {
            "enterprise_id": data.enterprise_id,
            "batch_no": batch_no,
            "order_no": confirm_result.order_no,
            "order_status": FacetofaceOrderStatus.SUBMITTED.value,
            "account": data.account,
            "contact_name": data.contact_name,
            "contact_mobile": data.contact_mobile,
            "contact_email": data.contact_email,
            "order_ticket": data.order_ticket,
            "sign_and_auth": data.sign_and_auth,
            "rate": data.rate,
            "remark": data.remark,
            "app_auth_token": confirm_result.app_auth_token,
            "app_refresh_token": confirm_result.app_refresh_token,
            "auth_app_id": confirm_result.auth_app_id,
            "user_id": confirm_result.user_id,
            "open_id": confirm_result.open_id,
            "expires_in": confirm_result.expires_in,
            "re_expires_in": confirm_result.re_expires_in,
            "next_query_time": now + timedelta(minutes=5),
            "query_count": 0,
        }

        if existing:
            # A CLOSED application exists for this enterprise: reuse its row.
            # NOTE(review): fields written only by the status poll
            # (confirm_url, merchant_pid, reject_reason, last_query_time) are
            # not reset here and keep the previous application's values -
            # confirm whether they should be cleared on re-application.
            obj = await crud.get(id=existing.id, preload=[])
            if obj:
                for key, value in create_data.items():
                    if hasattr(obj, key):
                        setattr(obj, key, value)
                await auth.db.flush()
                await auth.db.refresh(obj)
                return FacetofaceOrderOutSchema.model_validate(obj)

        order = await crud.create(create_data)
        if not order:
            raise CustomException(msg="保存申请单失败")
        return FacetofaceOrderOutSchema.model_validate(order)

    @classmethod
    async def query_order_service(
        cls, auth: AuthSchema, order_id: int
    ) -> FacetofaceOrderOutSchema:
        """Manually refresh and return the status of one application order.

        Orders already in SUCCESS or CLOSED status are returned without
        contacting Alipay.

        Raises:
            CustomException: If the order does not exist or has no batch number.
        """
        crud = FacetofaceCRUD(auth)
        order = await crud.get(id=order_id)
        if not order:
            raise CustomException(msg="申请单不存在")
        if not order.batch_no:
            raise CustomException(msg="申请单无事务编号,无法查询")

        terminal_statuses = [
            FacetofaceOrderStatus.SUCCESS.value,
            FacetofaceOrderStatus.CLOSED.value,
        ]
        if order.order_status in terminal_statuses:
            return FacetofaceOrderOutSchema.model_validate(order)

        await cls._query_and_update_order(crud, order)
        await auth.db.refresh(order)
        return FacetofaceOrderOutSchema.model_validate(order)

    @classmethod
    async def _query_and_update_order(cls, crud: FacetofaceCRUD, order) -> None:
        """Query Alipay for the order's status and update the local record.

        Remote failures (exception, empty response, unsuccessful result) are
        logged and the method returns without modifying the record. Changes
        are flushed to the session but not committed.

        Args:
            crud: CRUD helper bound to the session used for the update.
            order: The local order record; ``batch_no``, ``id``,
                ``query_count`` and ``order_status`` are read from it.
        """
        from alipay.aop.api.domain.AlipayOpenAgentOrderQueryModel import AlipayOpenAgentOrderQueryModel
        from alipay.aop.api.request.AlipayOpenAgentOrderQueryRequest import AlipayOpenAgentOrderQueryRequest
        from alipay.aop.api.response.AlipayOpenAgentOrderQueryResponse import AlipayOpenAgentOrderQueryResponse

        client = AlipayClient.get_client()
        query_model = AlipayOpenAgentOrderQueryModel()
        query_model.batch_no = order.batch_no
        query_request = AlipayOpenAgentOrderQueryRequest()
        query_request.biz_model = query_model

        try:
            # The SDK call is synchronous; run it off the event loop.
            response = await asyncio.to_thread(client.execute, query_request)
        except Exception as e:
            log.error(f"查询当面付申请单状态异常: batch_no={order.batch_no}, error={e}")
            return

        if not response:
            log.warning(f"查询当面付申请单无响应: batch_no={order.batch_no}")
            return

        result = AlipayOpenAgentOrderQueryResponse()
        result.parse_response_content(response)
        if not result.is_success():
            log.error(f"查询当面付申请单失败: batch_no={order.batch_no}, {result.code} - {result.sub_msg or result.msg}")
            return

        now = datetime.now()
        update_data: dict = {
            "last_query_time": now,
            # Tolerate a NULL counter on the stored record.
            "query_count": (order.query_count or 0) + 1,
        }

        # Copy over only the fields Alipay actually returned.
        if result.order_no:
            update_data["order_no"] = result.order_no
        if result.confirm_url:
            update_data["confirm_url"] = result.confirm_url
        if result.merchant_pid:
            update_data["merchant_pid"] = result.merchant_pid
        if result.reject_reason:
            update_data["reject_reason"] = result.reject_reason

        alipay_status = result.order_status
        if alipay_status:
            next_poll = now + timedelta(hours=4)
            if alipay_status == "MERCHANT_CONFIRM":
                update_data["order_status"] = FacetofaceOrderStatus.MERCHANT_CONFIRM.value
                update_data["next_query_time"] = next_poll
            elif alipay_status == "MERCHANT_AUDITING":
                update_data["order_status"] = FacetofaceOrderStatus.MERCHANT_AUDITING.value
                update_data["next_query_time"] = next_poll
            elif alipay_status == "MERCHANT_CONFIRM_SUCCESS":
                # Terminal: stop polling.
                update_data["order_status"] = FacetofaceOrderStatus.SUCCESS.value
                update_data["next_query_time"] = None
            elif alipay_status in ("MERCHANT_APPLY_ORDER_CANCELED", "MERCHANT_CONFIRM_TIME_OUT"):
                # Terminal: stop polling.
                update_data["order_status"] = FacetofaceOrderStatus.CLOSED.value
                update_data["next_query_time"] = None
            else:
                # MERCHANT_INFO_HOLD and any unrecognized status: keep the
                # local status unchanged and continue polling.
                update_data["next_query_time"] = next_poll

            log.info(
                f"当面付申请单状态更新: batch_no={order.batch_no}, "
                f"alipay_status={alipay_status}, "
                f"local_status={update_data.get('order_status', order.order_status)}"
            )

        obj = await crud.get(id=order.id, preload=[])
        if obj:
            for key, value in update_data.items():
                if hasattr(obj, key):
                    setattr(obj, key, value)
            await crud.auth.db.flush()

    @classmethod
    async def get_by_enterprise_service(
        cls, auth: AuthSchema, enterprise_id: str
    ) -> FacetofaceOrderOutSchema | None:
        """Return the application order for an enterprise, or ``None`` if absent."""
        crud = FacetofaceCRUD(auth)
        order = await crud.get_by_enterprise_id(enterprise_id)
        if not order:
            return None
        return FacetofaceOrderOutSchema.model_validate(order)

    @classmethod
    async def batch_status_service(
        cls, auth: AuthSchema, enterprise_ids: list[str]
    ) -> dict[str, str | None]:
        """Map each enterprise id to its order status (``None`` when no order exists)."""
        crud = FacetofaceCRUD(auth)
        result: dict[str, str | None] = {}
        # One lookup per id, issued sequentially.
        for eid in enterprise_ids:
            order = await crud.get_by_enterprise_id(eid)
            result[eid] = order.order_status if order else None
        return result

    @classmethod
    async def list_service(
        cls,
        auth: AuthSchema,
        page_no: int = 1,
        page_size: int = 20,
        search: dict | None = None,
    ) -> dict:
        """Return one page of application orders, newest id first.

        Args:
            auth: Request auth context.
            page_no: 1-based page number.
            page_size: Number of rows per page.
            search: Optional filter conditions passed through to the CRUD layer.
        """
        crud = FacetofaceCRUD(auth)
        offset = (page_no - 1) * page_size
        return await crud.page(
            offset=offset,
            limit=page_size,
            order_by=[{"id": "desc"}],
            search=search or {},
            out_schema=FacetofaceOrderListOutSchema,
        )

    @classmethod
    async def detail_service(
        cls, auth: AuthSchema, order_id: int
    ) -> FacetofaceOrderOutSchema:
        """Return a single application order by id.

        Raises:
            CustomException: If the order does not exist.
        """
        crud = FacetofaceCRUD(auth)
        order = await crud.get(id=order_id)
        if not order:
            raise CustomException(msg="申请单不存在")
        return FacetofaceOrderOutSchema.model_validate(order)

    @classmethod
    async def poll_pending_orders(cls) -> int:
        """Poll Alipay for every pending application order (for scheduled jobs).

        Opens its own database session, refreshes each pending order and
        commits once at the end. A failure on one order is logged and does
        not stop the loop.

        Returns:
            The number of orders whose refresh completed without raising.
        """
        from app.core.database import async_db_session

        async with async_db_session() as db:
            # NOTE(review): tenant_id is hard-coded to 1 here - confirm this is
            # intended for multi-tenant deployments.
            auth = AuthSchema(db=db, user=None, tenant_id=1)
            crud = FacetofaceCRUD(auth)
            orders = await crud.get_pending_orders()
            if not orders:
                return 0

            count = 0
            for order in orders:
                try:
                    await cls._query_and_update_order(crud, order)
                    count += 1
                except Exception as e:
                    log.error(f"轮询当面付申请单异常: batch_no={order.batch_no}, error={e}")

            await db.commit()
            log.info(f"当面付申请单轮询完成: 处理 {count}/{len(orders)} 条")
            return count