| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329 |
- from datetime import datetime, timedelta
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.core.alipay import AlipayClient
- from app.core.exceptions import CustomException
- from app.core.logger import log
- from .crud import FacetofaceCRUD
- from .enums import FacetofaceOrderStatus
- from .schema import (
- FacetofaceApplySchema,
- FacetofaceOrderListOutSchema,
- FacetofaceOrderOutSchema,
- )
class FacetofaceService:
    """Service layer for opening Alipay face-to-face payment for an enterprise.

    The class is a namespace of classmethods: it submits applications to
    Alipay, stores them locally through ``FacetofaceCRUD`` and keeps the local
    order status in sync by querying Alipay (on demand or from a scheduled
    poll).
    """

    # Delay before the first status query after an application is submitted.
    _FIRST_QUERY_DELAY = timedelta(minutes=5)
    # Delay between status queries while an application is not yet terminal.
    _REQUERY_INTERVAL = timedelta(hours=4)
    # Alipay order statuses that end the flow successfully / unsuccessfully.
    _ALIPAY_SUCCESS_STATUSES = ("MERCHANT_AGREED", "AGENT_BINDAPP_SUCCESS")
    _ALIPAY_CLOSED_STATUSES = (
        "MERCHANT_REJECTED",
        "MERCHANT_CANCELLED",
        "AUDIT_REJECTED",
        "AUDIT_FAILED",
    )
    # Attributes written by status queries. They describe one specific Alipay
    # batch, so they are cleared when a closed order row is reused.
    _QUERY_RESULT_FIELDS = (
        "order_no",
        "confirm_url",
        "reject_reason",
        "last_query_time",
    )

    @staticmethod
    def _execute_alipay(client, request, response_cls, action: str):
        """Execute one Alipay request and return its parsed, successful response.

        Args:
            client: Object exposing ``execute(request)``.
            request: Alipay SDK request object to send.
            response_cls: SDK response class used to parse the raw content.
            action: Human-readable step name used as the error-message prefix.

        Returns:
            The parsed response object (``response_cls`` instance).

        Raises:
            CustomException: If the response is empty or Alipay reports failure.
        """
        # NOTE(review): ``execute`` is called without ``await``; if it performs
        # blocking network I/O it stalls the event loop for the duration of the
        # call - confirm and consider off-loading to a worker thread.
        raw = client.execute(request)
        if not raw:
            raise CustomException(msg=f"{action}失败: 无响应")

        parsed = response_cls()
        parsed.parse_response_content(raw)
        if not parsed.is_success():
            log.error(f"{action}失败: {parsed.code} - {parsed.msg} - {parsed.sub_msg}")
            raise CustomException(msg=f"{action}失败: {parsed.sub_msg or parsed.msg}")
        return parsed

    @classmethod
    async def apply_service(
        cls, auth: AuthSchema, data: FacetofaceApplySchema
    ) -> FacetofaceOrderOutSchema:
        """Submit a face-to-face payment application for an enterprise.

        Three Alipay calls are made in order: agent.create (opens a
        transaction and yields ``batch_no``), facetoface.sign (attaches the
        application data) and agent.confirm (commits the transaction). The
        order is then stored locally; an existing CLOSED order of the same
        enterprise is overwritten instead of creating a second row.

        Args:
            auth: Request context; its ``db`` session is used for persistence.
            data: Application payload.

        Returns:
            The stored order.

        Raises:
            CustomException: If the enterprise already has a non-closed order,
                any Alipay step fails, or the order cannot be saved.
        """
        from alipay.aop.api.domain.AlipayOpenAgentCreateModel import AlipayOpenAgentCreateModel
        from alipay.aop.api.request.AlipayOpenAgentCreateRequest import AlipayOpenAgentCreateRequest
        from alipay.aop.api.response.AlipayOpenAgentCreateResponse import AlipayOpenAgentCreateResponse
        from alipay.aop.api.request.AlipayOpenAgentFacetofaceSignRequest import AlipayOpenAgentFacetofaceSignRequest
        from alipay.aop.api.response.AlipayOpenAgentFacetofaceSignResponse import AlipayOpenAgentFacetofaceSignResponse
        from alipay.aop.api.domain.AlipayOpenAgentConfirmModel import AlipayOpenAgentConfirmModel
        from alipay.aop.api.request.AlipayOpenAgentConfirmRequest import AlipayOpenAgentConfirmRequest
        from alipay.aop.api.response.AlipayOpenAgentConfirmResponse import AlipayOpenAgentConfirmResponse

        client = AlipayClient.get_client()
        crud = FacetofaceCRUD(auth)

        # Only a CLOSED order may be replaced by a new application.
        existing = await crud.get_by_enterprise_id(data.enterprise_id)
        if existing and existing.order_status != FacetofaceOrderStatus.CLOSED.value:
            raise CustomException(msg="该企业已有进行中的当面付申请")

        # Step 1: open the agent transaction and obtain batch_no.
        create_request = AlipayOpenAgentCreateRequest()
        create_request.biz_model = AlipayOpenAgentCreateModel()
        create_result = cls._execute_alipay(
            client, create_request, AlipayOpenAgentCreateResponse, "创建应用事务"
        )
        batch_no = create_result.batch_no
        if not batch_no:
            raise CustomException(msg="创建应用事务失败: 未返回 batch_no")
        log.info(f"当面付申请 - Step1 创建事务成功: batch_no={batch_no}")

        # Step 2: submit the sign request; optional fields are sent only when
        # they carry a truthy value.
        sign_request = AlipayOpenAgentFacetofaceSignRequest()
        sign_request.batch_no = batch_no
        sign_request.shop_name = data.shop_name
        for field in (
            "shop_address",
            "mcc_code",
            "rate",
            "business_license_no",
            "business_license_mobile",
        ):
            value = getattr(data, field)
            if value:
                setattr(sign_request, field, value)
        if data.sign_and_auth:
            sign_request.sign_and_auth = "true"
        cls._execute_alipay(
            client, sign_request, AlipayOpenAgentFacetofaceSignResponse, "提交当面付申请"
        )
        log.info(f"当面付申请 - Step2 提交签约成功: batch_no={batch_no}")

        # Step 3: confirm (commit) the transaction.
        confirm_model = AlipayOpenAgentConfirmModel()
        confirm_model.batch_no = batch_no
        confirm_request = AlipayOpenAgentConfirmRequest()
        confirm_request.biz_model = confirm_model
        cls._execute_alipay(
            client, confirm_request, AlipayOpenAgentConfirmResponse, "确认提交事务"
        )
        log.info(f"当面付申请 - Step3 确认事务成功: batch_no={batch_no}")

        # Persist the application locally.
        create_data = {
            "enterprise_id": data.enterprise_id,
            "batch_no": batch_no,
            "order_status": FacetofaceOrderStatus.SUBMITTED.value,
            "merchant_name": data.merchant_name,
            "shop_name": data.shop_name,
            "shop_address": data.shop_address,
            "mcc_code": data.mcc_code,
            "rate": data.rate,
            "business_license_no": data.business_license_no,
            "business_license_mobile": data.business_license_mobile,
            "sign_and_auth": data.sign_and_auth,
            "remark": data.remark,
            "next_query_time": datetime.now() + cls._FIRST_QUERY_DELAY,
            "query_count": 0,
        }

        # A closed order of this enterprise is updated in place.
        if existing:
            obj = await crud.get(id=existing.id, preload=[])
            if obj:
                # Clear the previous batch's query results first; otherwise a
                # stale reject_reason / confirm_url / order_no would remain on
                # the re-submitted order, because later queries only overwrite
                # them when Alipay returns a non-empty value.
                # NOTE(review): assumes these columns are nullable - confirm
                # against the model.
                reused_values = {
                    **dict.fromkeys(cls._QUERY_RESULT_FIELDS),
                    **create_data,
                }
                for key, value in reused_values.items():
                    if hasattr(obj, key):
                        setattr(obj, key, value)
                await auth.db.flush()
                await auth.db.refresh(obj)
                return FacetofaceOrderOutSchema.model_validate(obj)

        order = await crud.create(create_data)
        if not order:
            raise CustomException(msg="保存申请单失败")
        return FacetofaceOrderOutSchema.model_validate(order)

    @classmethod
    async def query_order_service(
        cls, auth: AuthSchema, order_id: int
    ) -> FacetofaceOrderOutSchema:
        """Query Alipay for the status of a single order on demand.

        Orders already in SUCCESS or CLOSED state are returned without
        contacting Alipay.

        Args:
            auth: Request context.
            order_id: Primary key of the local order.

        Returns:
            The order, refreshed from the session after the query.

        Raises:
            CustomException: If the order does not exist or has no ``batch_no``.
        """
        crud = FacetofaceCRUD(auth)
        order = await crud.get(id=order_id)
        if not order:
            raise CustomException(msg="申请单不存在")
        if not order.batch_no:
            raise CustomException(msg="申请单无事务编号,无法查询")

        terminal_statuses = (
            FacetofaceOrderStatus.SUCCESS.value,
            FacetofaceOrderStatus.CLOSED.value,
        )
        if order.order_status not in terminal_statuses:
            await cls._query_and_update_order(crud, order)
            await auth.db.refresh(order)
        return FacetofaceOrderOutSchema.model_validate(order)

    @classmethod
    async def _query_and_update_order(cls, crud: FacetofaceCRUD, order) -> None:
        """Query Alipay for ``order.batch_no`` and update the local order.

        A failed query (exception from ``execute``, empty response or an
        unsuccessful result) is logged and leaves the order untouched. After a
        successful query the bookkeeping fields and, when Alipay reports a
        recognised status, the local status are updated and flushed; nothing
        is committed here.

        Args:
            crud: CRUD helper bound to the session to write through.
            order: Local order object; ``id``, ``batch_no``, ``query_count``
                and ``order_status`` are read.
        """
        from alipay.aop.api.domain.AlipayOpenAgentOrderQueryModel import AlipayOpenAgentOrderQueryModel
        from alipay.aop.api.request.AlipayOpenAgentOrderQueryRequest import AlipayOpenAgentOrderQueryRequest
        from alipay.aop.api.response.AlipayOpenAgentOrderQueryResponse import AlipayOpenAgentOrderQueryResponse

        client = AlipayClient.get_client()
        query_model = AlipayOpenAgentOrderQueryModel()
        query_model.batch_no = order.batch_no
        query_request = AlipayOpenAgentOrderQueryRequest()
        query_request.biz_model = query_model

        # Best effort: a failing query must not abort the caller.
        try:
            response = client.execute(query_request)
        except Exception as e:
            log.error(f"查询当面付申请单状态异常: batch_no={order.batch_no}, error={e}")
            return
        if not response:
            log.warning(f"查询当面付申请单无响应: batch_no={order.batch_no}")
            return

        result = AlipayOpenAgentOrderQueryResponse()
        result.parse_response_content(response)
        if not result.is_success():
            log.error(f"查询当面付申请单失败: batch_no={order.batch_no}, {result.code} - {result.sub_msg or result.msg}")
            return

        now = datetime.now()
        update_data: dict = {
            "last_query_time": now,
            # Tolerate a NULL counter on rows that never had it initialised.
            "query_count": (order.query_count or 0) + 1,
        }
        # Only overwrite detail fields when Alipay actually returned a value.
        for field in ("order_no", "confirm_url", "reject_reason"):
            value = getattr(result, field)
            if value:
                update_data[field] = value

        alipay_status = result.order_status
        if alipay_status in cls._ALIPAY_SUCCESS_STATUSES:
            update_data["order_status"] = FacetofaceOrderStatus.SUCCESS.value
            update_data["next_query_time"] = None
        elif alipay_status in cls._ALIPAY_CLOSED_STATUSES:
            update_data["order_status"] = FacetofaceOrderStatus.CLOSED.value
            update_data["next_query_time"] = None
        else:
            if alipay_status == "MERCHANT_CONFIRM":
                # Waiting for the merchant to confirm.
                update_data["order_status"] = FacetofaceOrderStatus.MERCHANT_CONFIRM.value
            elif alipay_status == "MERCHANT_AUDITING":
                update_data["order_status"] = FacetofaceOrderStatus.MERCHANT_AUDITING.value
            # Every non-terminal outcome - including a missing or unrecognised
            # status - is rescheduled, so next_query_time is never left stale
            # after a successful query.
            update_data["next_query_time"] = now + cls._REQUERY_INTERVAL

        log.info(f"当面付申请单状态更新: batch_no={order.batch_no}, alipay_status={alipay_status}, local_status={update_data.get('order_status', order.order_status)}")

        obj = await crud.get(id=order.id, preload=[])
        if obj:
            for key, value in update_data.items():
                if hasattr(obj, key):
                    setattr(obj, key, value)
            await crud.auth.db.flush()

    @classmethod
    async def batch_status_service(
        cls, auth: AuthSchema, enterprise_ids: list[str]
    ) -> dict[str, str | None]:
        """Return the local order status for each enterprise ID.

        Args:
            auth: Request context.
            enterprise_ids: Enterprise IDs to look up; duplicates are looked
                up only once.

        Returns:
            Mapping of enterprise ID to its order status, or ``None`` when the
            enterprise has no order.
        """
        crud = FacetofaceCRUD(auth)
        statuses: dict[str, str | None] = {}
        # dict.fromkeys de-duplicates while keeping first-seen order.
        for eid in dict.fromkeys(enterprise_ids):
            order = await crud.get_by_enterprise_id(eid)
            statuses[eid] = order.order_status if order else None
        return statuses

    @classmethod
    async def get_by_enterprise_service(
        cls, auth: AuthSchema, enterprise_id: str
    ) -> FacetofaceOrderOutSchema | None:
        """Return the order of an enterprise, or ``None`` if it has none.

        Args:
            auth: Request context.
            enterprise_id: Enterprise to look up.
        """
        order = await FacetofaceCRUD(auth).get_by_enterprise_id(enterprise_id)
        if order:
            return FacetofaceOrderOutSchema.model_validate(order)
        return None

    @classmethod
    async def list_service(
        cls,
        auth: AuthSchema,
        page_no: int = 1,
        page_size: int = 20,
        search: dict | None = None,
    ) -> dict:
        """Return one page of orders, newest ``id`` first.

        Args:
            auth: Request context.
            page_no: 1-based page number.
            page_size: Number of rows per page.
            search: Optional filter passed through to ``FacetofaceCRUD.page``;
                ``None`` is sent as an empty dict.

        Returns:
            Whatever ``FacetofaceCRUD.page`` returns for the requested window.
        """
        crud = FacetofaceCRUD(auth)
        return await crud.page(
            offset=(page_no - 1) * page_size,
            limit=page_size,
            order_by=[{"id": "desc"}],
            search=search or {},
            out_schema=FacetofaceOrderListOutSchema,
        )

    @classmethod
    async def detail_service(
        cls, auth: AuthSchema, order_id: int
    ) -> FacetofaceOrderOutSchema:
        """Return a single order by primary key.

        Args:
            auth: Request context.
            order_id: Primary key of the order.

        Raises:
            CustomException: If the order does not exist.
        """
        order = await FacetofaceCRUD(auth).get(id=order_id)
        if not order:
            raise CustomException(msg="申请单不存在")
        return FacetofaceOrderOutSchema.model_validate(order)

    @classmethod
    async def poll_pending_orders(cls) -> int:
        """Query Alipay for every pending order (entry point for the scheduler).

        Opens its own database session, processes the orders returned by
        ``FacetofaceCRUD.get_pending_orders`` one by one and commits once at
        the end. An exception raised for one order is logged and does not stop
        the remaining orders.

        Returns:
            Number of orders whose processing finished without raising.
        """
        from app.core.database import async_db_session

        async with async_db_session() as db:
            # No request user exists in a scheduled job; tenant 1 is hard-coded.
            auth = AuthSchema(db=db, user=None, tenant_id=1)
            crud = FacetofaceCRUD(auth)
            orders = await crud.get_pending_orders()
            if not orders:
                return 0

            count = 0
            for order in orders:
                try:
                    await cls._query_and_update_order(crud, order)
                    count += 1
                except Exception as e:
                    log.error(f"轮询当面付申请单异常: batch_no={order.batch_no}, error={e}")

            await db.commit()
            log.info(f"当面付申请单轮询完成: 处理 {count}/{len(orders)} 条")
            return count
|