service.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. from datetime import datetime, timedelta
  2. from app.api.v1.module_system.auth.schema import AuthSchema
  3. from app.core.alipay import AlipayClient
  4. from app.core.exceptions import CustomException
  5. from app.core.logger import log
  6. from .crud import FacetofaceCRUD
  7. from .enums import FacetofaceOrderStatus
  8. from .schema import (
  9. FacetofaceApplySchema,
  10. FacetofaceOrderListOutSchema,
  11. FacetofaceOrderOutSchema,
  12. )
  13. class FacetofaceService:
  14. """当面付开通服务"""
  15. @classmethod
  16. async def apply_service(
  17. cls, auth: AuthSchema, data: FacetofaceApplySchema
  18. ) -> FacetofaceOrderOutSchema:
  19. """
  20. 提交当面付开通申请
  21. 三步操作: agent.create → facetoface.sign → agent.confirm
  22. """
  23. from alipay.aop.api.domain.AlipayOpenAgentCreateModel import AlipayOpenAgentCreateModel
  24. from alipay.aop.api.request.AlipayOpenAgentCreateRequest import AlipayOpenAgentCreateRequest
  25. from alipay.aop.api.response.AlipayOpenAgentCreateResponse import AlipayOpenAgentCreateResponse
  26. from alipay.aop.api.request.AlipayOpenAgentFacetofaceSignRequest import AlipayOpenAgentFacetofaceSignRequest
  27. from alipay.aop.api.response.AlipayOpenAgentFacetofaceSignResponse import AlipayOpenAgentFacetofaceSignResponse
  28. from alipay.aop.api.domain.AlipayOpenAgentConfirmModel import AlipayOpenAgentConfirmModel
  29. from alipay.aop.api.request.AlipayOpenAgentConfirmRequest import AlipayOpenAgentConfirmRequest
  30. from alipay.aop.api.response.AlipayOpenAgentConfirmResponse import AlipayOpenAgentConfirmResponse
  31. client = AlipayClient.get_client()
  32. crud = FacetofaceCRUD(auth)
  33. # 检查该企业是否已有申请单
  34. existing = await crud.get_by_enterprise_id(data.enterprise_id)
  35. if existing and existing.order_status not in (
  36. FacetofaceOrderStatus.CLOSED.value,
  37. ):
  38. raise CustomException(msg="该企业已有进行中的当面付申请")
  39. # Step 1: 创建应用事务,获取 batch_no
  40. create_model = AlipayOpenAgentCreateModel()
  41. create_request = AlipayOpenAgentCreateRequest()
  42. create_request.biz_model = create_model
  43. response = client.execute(create_request)
  44. if not response:
  45. raise CustomException(msg="创建应用事务失败: 无响应")
  46. create_result = AlipayOpenAgentCreateResponse()
  47. create_result.parse_response_content(response)
  48. if not create_result.is_success():
  49. log.error(f"创建应用事务失败: {create_result.code} - {create_result.msg} - {create_result.sub_msg}")
  50. raise CustomException(msg=f"创建应用事务失败: {create_result.sub_msg or create_result.msg}")
  51. batch_no = create_result.batch_no
  52. if not batch_no:
  53. raise CustomException(msg="创建应用事务失败: 未返回 batch_no")
  54. log.info(f"当面付申请 - Step1 创建事务成功: batch_no={batch_no}")
  55. # Step 2: 提交当面付开通申请
  56. sign_request = AlipayOpenAgentFacetofaceSignRequest()
  57. sign_request.batch_no = batch_no
  58. sign_request.shop_name = data.shop_name
  59. if data.shop_address:
  60. sign_request.shop_address = data.shop_address
  61. if data.mcc_code:
  62. sign_request.mcc_code = data.mcc_code
  63. if data.rate:
  64. sign_request.rate = data.rate
  65. if data.business_license_no:
  66. sign_request.business_license_no = data.business_license_no
  67. if data.business_license_mobile:
  68. sign_request.business_license_mobile = data.business_license_mobile
  69. if data.sign_and_auth:
  70. sign_request.sign_and_auth = "true"
  71. response = client.execute(sign_request)
  72. if not response:
  73. raise CustomException(msg="提交当面付申请失败: 无响应")
  74. sign_result = AlipayOpenAgentFacetofaceSignResponse()
  75. sign_result.parse_response_content(response)
  76. if not sign_result.is_success():
  77. log.error(f"提交当面付申请失败: {sign_result.code} - {sign_result.msg} - {sign_result.sub_msg}")
  78. raise CustomException(msg=f"提交当面付申请失败: {sign_result.sub_msg or sign_result.msg}")
  79. log.info(f"当面付申请 - Step2 提交签约成功: batch_no={batch_no}")
  80. # Step 3: 确认提交事务
  81. confirm_model = AlipayOpenAgentConfirmModel()
  82. confirm_model.batch_no = batch_no
  83. confirm_request = AlipayOpenAgentConfirmRequest()
  84. confirm_request.biz_model = confirm_model
  85. response = client.execute(confirm_request)
  86. if not response:
  87. raise CustomException(msg="确认提交事务失败: 无响应")
  88. confirm_result = AlipayOpenAgentConfirmResponse()
  89. confirm_result.parse_response_content(response)
  90. if not confirm_result.is_success():
  91. log.error(f"确认提交事务失败: {confirm_result.code} - {confirm_result.msg} - {confirm_result.sub_msg}")
  92. raise CustomException(msg=f"确认提交事务失败: {confirm_result.sub_msg or confirm_result.msg}")
  93. log.info(f"当面付申请 - Step3 确认事务成功: batch_no={batch_no}")
  94. # 保存申请单到数据库
  95. now = datetime.now()
  96. create_data = {
  97. "enterprise_id": data.enterprise_id,
  98. "batch_no": batch_no,
  99. "order_status": FacetofaceOrderStatus.SUBMITTED.value,
  100. "merchant_name": data.merchant_name,
  101. "shop_name": data.shop_name,
  102. "shop_address": data.shop_address,
  103. "mcc_code": data.mcc_code,
  104. "rate": data.rate,
  105. "business_license_no": data.business_license_no,
  106. "business_license_mobile": data.business_license_mobile,
  107. "sign_and_auth": data.sign_and_auth,
  108. "remark": data.remark,
  109. "next_query_time": now + timedelta(minutes=5),
  110. "query_count": 0,
  111. }
  112. # 如果该企业已有已关闭的申请单,更新而非新建
  113. if existing:
  114. obj = await crud.get(id=existing.id, preload=[])
  115. if obj:
  116. for key, value in create_data.items():
  117. if hasattr(obj, key):
  118. setattr(obj, key, value)
  119. await auth.db.flush()
  120. await auth.db.refresh(obj)
  121. return FacetofaceOrderOutSchema.model_validate(obj)
  122. order = await crud.create(create_data)
  123. if not order:
  124. raise CustomException(msg="保存申请单失败")
  125. return FacetofaceOrderOutSchema.model_validate(order)
  126. @classmethod
  127. async def query_order_service(
  128. cls, auth: AuthSchema, order_id: int
  129. ) -> FacetofaceOrderOutSchema:
  130. """手动查询单个申请单状态"""
  131. crud = FacetofaceCRUD(auth)
  132. order = await crud.get(id=order_id)
  133. if not order:
  134. raise CustomException(msg="申请单不存在")
  135. if not order.batch_no:
  136. raise CustomException(msg="申请单无事务编号,无法查询")
  137. terminal_statuses = [
  138. FacetofaceOrderStatus.SUCCESS.value,
  139. FacetofaceOrderStatus.CLOSED.value,
  140. ]
  141. if order.order_status in terminal_statuses:
  142. return FacetofaceOrderOutSchema.model_validate(order)
  143. await cls._query_and_update_order(crud, order)
  144. await auth.db.refresh(order)
  145. return FacetofaceOrderOutSchema.model_validate(order)
  146. @classmethod
  147. async def _query_and_update_order(cls, crud: FacetofaceCRUD, order) -> None:
  148. """查询支付宝接口并更新申请单状态"""
  149. from alipay.aop.api.domain.AlipayOpenAgentOrderQueryModel import AlipayOpenAgentOrderQueryModel
  150. from alipay.aop.api.request.AlipayOpenAgentOrderQueryRequest import AlipayOpenAgentOrderQueryRequest
  151. from alipay.aop.api.response.AlipayOpenAgentOrderQueryResponse import AlipayOpenAgentOrderQueryResponse
  152. client = AlipayClient.get_client()
  153. query_model = AlipayOpenAgentOrderQueryModel()
  154. query_model.batch_no = order.batch_no
  155. query_request = AlipayOpenAgentOrderQueryRequest()
  156. query_request.biz_model = query_model
  157. try:
  158. response = client.execute(query_request)
  159. except Exception as e:
  160. log.error(f"查询当面付申请单状态异常: batch_no={order.batch_no}, error={e}")
  161. return
  162. if not response:
  163. log.warning(f"查询当面付申请单无响应: batch_no={order.batch_no}")
  164. return
  165. result = AlipayOpenAgentOrderQueryResponse()
  166. result.parse_response_content(response)
  167. if not result.is_success():
  168. log.error(f"查询当面付申请单失败: batch_no={order.batch_no}, {result.code} - {result.sub_msg or result.msg}")
  169. return
  170. now = datetime.now()
  171. update_data: dict = {
  172. "last_query_time": now,
  173. "query_count": order.query_count + 1,
  174. }
  175. if result.order_no:
  176. update_data["order_no"] = result.order_no
  177. if result.confirm_url:
  178. update_data["confirm_url"] = result.confirm_url
  179. if result.reject_reason:
  180. update_data["reject_reason"] = result.reject_reason
  181. alipay_status = result.order_status
  182. if alipay_status:
  183. if alipay_status == "MERCHANT_CONFIRM":
  184. update_data["order_status"] = FacetofaceOrderStatus.MERCHANT_CONFIRM.value
  185. # 等待商家确认,继续轮询
  186. update_data["next_query_time"] = now + timedelta(hours=4)
  187. elif alipay_status == "MERCHANT_AUDITING":
  188. update_data["order_status"] = FacetofaceOrderStatus.MERCHANT_AUDITING.value
  189. update_data["next_query_time"] = now + timedelta(hours=4)
  190. elif alipay_status in ("MERCHANT_AGREED", "AGENT_BINDAPP_SUCCESS"):
  191. update_data["order_status"] = FacetofaceOrderStatus.SUCCESS.value
  192. update_data["next_query_time"] = None
  193. elif alipay_status in ("MERCHANT_REJECTED", "MERCHANT_CANCELLED", "AUDIT_REJECTED", "AUDIT_FAILED"):
  194. update_data["order_status"] = FacetofaceOrderStatus.CLOSED.value
  195. update_data["next_query_time"] = None
  196. else:
  197. update_data["next_query_time"] = now + timedelta(hours=4)
  198. log.info(f"当面付申请单状态更新: batch_no={order.batch_no}, alipay_status={alipay_status}, local_status={update_data.get('order_status', order.order_status)}")
  199. obj = await crud.get(id=order.id, preload=[])
  200. if obj:
  201. for key, value in update_data.items():
  202. if hasattr(obj, key):
  203. setattr(obj, key, value)
  204. await crud.auth.db.flush()
  205. @classmethod
  206. async def batch_status_service(
  207. cls, auth: AuthSchema, enterprise_ids: list[str]
  208. ) -> dict[str, str | None]:
  209. """批量查询企业当面付状态"""
  210. crud = FacetofaceCRUD(auth)
  211. result: dict[str, str | None] = {}
  212. for eid in enterprise_ids:
  213. order = await crud.get_by_enterprise_id(eid)
  214. result[eid] = order.order_status if order else None
  215. return result
  216. @classmethod
  217. async def get_by_enterprise_service(
  218. cls, auth: AuthSchema, enterprise_id: str
  219. ) -> FacetofaceOrderOutSchema | None:
  220. """按企业ID查询当面付申请单"""
  221. crud = FacetofaceCRUD(auth)
  222. order = await crud.get_by_enterprise_id(enterprise_id)
  223. if not order:
  224. return None
  225. return FacetofaceOrderOutSchema.model_validate(order)
  226. @classmethod
  227. async def list_service(
  228. cls,
  229. auth: AuthSchema,
  230. page_no: int = 1,
  231. page_size: int = 20,
  232. search: dict | None = None,
  233. ) -> dict:
  234. """查询申请单列表"""
  235. crud = FacetofaceCRUD(auth)
  236. offset = (page_no - 1) * page_size
  237. return await crud.page(
  238. offset=offset,
  239. limit=page_size,
  240. order_by=[{"id": "desc"}],
  241. search=search or {},
  242. out_schema=FacetofaceOrderListOutSchema,
  243. )
  244. @classmethod
  245. async def detail_service(
  246. cls, auth: AuthSchema, order_id: int
  247. ) -> FacetofaceOrderOutSchema:
  248. """查询申请单详情"""
  249. crud = FacetofaceCRUD(auth)
  250. order = await crud.get(id=order_id)
  251. if not order:
  252. raise CustomException(msg="申请单不存在")
  253. return FacetofaceOrderOutSchema.model_validate(order)
  254. @classmethod
  255. async def poll_pending_orders(cls) -> int:
  256. """
  257. 轮询待处理的申请单(供定时任务调用)
  258. 返回本次处理的申请单数量
  259. """
  260. from app.core.database import async_db_session
  261. async with async_db_session() as db:
  262. from app.api.v1.module_system.auth.schema import AuthSchema
  263. auth = AuthSchema(db=db, user=None, tenant_id=1)
  264. crud = FacetofaceCRUD(auth)
  265. orders = await crud.get_pending_orders()
  266. if not orders:
  267. return 0
  268. count = 0
  269. for order in orders:
  270. try:
  271. await cls._query_and_update_order(crud, order)
  272. count += 1
  273. except Exception as e:
  274. log.error(f"轮询当面付申请单异常: batch_no={order.batch_no}, error={e}")
  275. await db.commit()
  276. log.info(f"当面付申请单轮询完成: 处理 {count}/{len(orders)} 条")
  277. return count