service.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. import os
  2. import tempfile
  3. from datetime import datetime, timedelta
  4. from fastapi import UploadFile
  5. from app.api.v1.module_system.auth.schema import AuthSchema
  6. from app.config.setting import settings
  7. from app.core.alipay import AlipayClient
  8. from app.core.exceptions import CustomException
  9. from app.core.logger import log
  10. from app.utils.upload_util import UploadUtil
  11. from .crud import FacetofaceCRUD
  12. from .enums import FacetofaceOrderStatus
  13. from .schema import (
  14. FacetofaceApplySchema,
  15. FacetofaceOrderListOutSchema,
  16. FacetofaceOrderOutSchema,
  17. )
  18. class FacetofaceService:
  19. """当面付开通服务"""
  20. @classmethod
  21. async def apply_service(
  22. cls,
  23. auth: AuthSchema,
  24. data: FacetofaceApplySchema,
  25. shop_scene_pic: UploadFile | None = None,
  26. shop_sign_board_pic: UploadFile | None = None,
  27. business_license_pic: UploadFile | None = None,
  28. ) -> FacetofaceOrderOutSchema:
  29. """
  30. 提交当面付开通申请
  31. 三步操作: agent.create → facetoface.sign → agent.confirm
  32. """
  33. from alipay.aop.api.FileItem import FileItem
  34. from alipay.aop.api.domain.AlipayOpenAgentCreateModel import AlipayOpenAgentCreateModel
  35. from alipay.aop.api.request.AlipayOpenAgentCreateRequest import AlipayOpenAgentCreateRequest
  36. from alipay.aop.api.response.AlipayOpenAgentCreateResponse import AlipayOpenAgentCreateResponse
  37. from alipay.aop.api.request.AlipayOpenAgentFacetofaceSignRequest import AlipayOpenAgentFacetofaceSignRequest
  38. from alipay.aop.api.response.AlipayOpenAgentFacetofaceSignResponse import AlipayOpenAgentFacetofaceSignResponse
  39. from alipay.aop.api.domain.AlipayOpenAgentConfirmModel import AlipayOpenAgentConfirmModel
  40. from alipay.aop.api.request.AlipayOpenAgentConfirmRequest import AlipayOpenAgentConfirmRequest
  41. from alipay.aop.api.response.AlipayOpenAgentConfirmResponse import AlipayOpenAgentConfirmResponse
  42. client = AlipayClient.get_client()
  43. crud = FacetofaceCRUD(auth)
  44. # 检查该企业是否已有申请单
  45. existing = await crud.get_by_enterprise_id(data.enterprise_id)
  46. if existing and existing.order_status not in (
  47. FacetofaceOrderStatus.CLOSED.value,
  48. ):
  49. raise CustomException(msg="该企业已有进行中的当面付申请")
  50. # Step 1: 创建应用事务,获取 batch_no
  51. create_model = AlipayOpenAgentCreateModel()
  52. create_request = AlipayOpenAgentCreateRequest()
  53. create_request.biz_model = create_model
  54. response = client.execute(create_request)
  55. if not response:
  56. raise CustomException(msg="创建应用事务失败: 无响应")
  57. create_result = AlipayOpenAgentCreateResponse()
  58. create_result.parse_response_content(response)
  59. if not create_result.is_success():
  60. log.error(f"创建应用事务失败: {create_result.code} - {create_result.msg} - {create_result.sub_msg}")
  61. raise CustomException(msg=f"创建应用事务失败: {create_result.sub_msg or create_result.msg}")
  62. batch_no = create_result.batch_no
  63. if not batch_no:
  64. raise CustomException(msg="创建应用事务失败: 未返回 batch_no")
  65. log.info(f"当面付申请 - Step1 创建事务成功: batch_no={batch_no}")
  66. # Step 2: 提交当面付开通申请
  67. sign_request = AlipayOpenAgentFacetofaceSignRequest()
  68. sign_request.batch_no = batch_no
  69. sign_request.shop_name = data.shop_name
  70. if data.shop_address:
  71. sign_request.shop_address = data.shop_address
  72. if data.mcc_code:
  73. sign_request.mcc_code = data.mcc_code
  74. if data.rate:
  75. sign_request.rate = data.rate
  76. if data.business_license_no:
  77. sign_request.business_license_no = data.business_license_no
  78. if data.business_license_mobile:
  79. sign_request.business_license_mobile = data.business_license_mobile
  80. if data.sign_and_auth:
  81. sign_request.sign_and_auth = "true"
  82. # 处理图片上传
  83. image_paths: dict[str, str | None] = {
  84. "shop_scene_pic_path": None,
  85. "shop_sign_board_pic_path": None,
  86. "business_license_pic_path": None,
  87. }
  88. file_mapping = [
  89. ("shop_scene_pic", shop_scene_pic, "shop_scene_pic_path"),
  90. ("shop_sign_board_pic", shop_sign_board_pic, "shop_sign_board_pic_path"),
  91. ("business_license_pic", business_license_pic, "business_license_pic_path"),
  92. ]
  93. for attr_name, upload_file, path_key in file_mapping:
  94. if upload_file and upload_file.filename:
  95. content = await upload_file.read()
  96. await upload_file.seek(0)
  97. file_item = FileItem(
  98. file_name=upload_file.filename,
  99. file_content=content,
  100. mime_type=upload_file.content_type or "application/octet-stream",
  101. )
  102. sign_request.__setattr__(attr_name, file_item)
  103. saved_path = await cls._save_uploaded_file(upload_file)
  104. image_paths[path_key] = saved_path
  105. response = client.execute(sign_request)
  106. if not response:
  107. raise CustomException(msg="提交当面付申请失败: 无响应")
  108. sign_result = AlipayOpenAgentFacetofaceSignResponse()
  109. sign_result.parse_response_content(response)
  110. if not sign_result.is_success():
  111. log.error(f"提交当面付申请失败: {sign_result.code} - {sign_result.msg} - {sign_result.sub_msg}")
  112. raise CustomException(msg=f"提交当面付申请失败: {sign_result.sub_msg or sign_result.msg}")
  113. log.info(f"当面付申请 - Step2 提交签约成功: batch_no={batch_no}")
  114. # Step 3: 确认提交事务
  115. confirm_model = AlipayOpenAgentConfirmModel()
  116. confirm_model.batch_no = batch_no
  117. confirm_request = AlipayOpenAgentConfirmRequest()
  118. confirm_request.biz_model = confirm_model
  119. response = client.execute(confirm_request)
  120. if not response:
  121. raise CustomException(msg="确认提交事务失败: 无响应")
  122. confirm_result = AlipayOpenAgentConfirmResponse()
  123. confirm_result.parse_response_content(response)
  124. if not confirm_result.is_success():
  125. log.error(f"确认提交事务失败: {confirm_result.code} - {confirm_result.msg} - {confirm_result.sub_msg}")
  126. raise CustomException(msg=f"确认提交事务失败: {confirm_result.sub_msg or confirm_result.msg}")
  127. log.info(f"当面付申请 - Step3 确认事务成功: batch_no={batch_no}")
  128. # 保存申请单到数据库
  129. now = datetime.now()
  130. create_data = {
  131. "enterprise_id": data.enterprise_id,
  132. "batch_no": batch_no,
  133. "order_status": FacetofaceOrderStatus.SUBMITTED.value,
  134. "shop_scene_pic_path": image_paths["shop_scene_pic_path"],
  135. "shop_sign_board_pic_path": image_paths["shop_sign_board_pic_path"],
  136. "business_license_pic_path": image_paths["business_license_pic_path"],
  137. "merchant_name": data.merchant_name,
  138. "shop_name": data.shop_name,
  139. "shop_address": data.shop_address,
  140. "mcc_code": data.mcc_code,
  141. "rate": data.rate,
  142. "business_license_no": data.business_license_no,
  143. "business_license_mobile": data.business_license_mobile,
  144. "sign_and_auth": data.sign_and_auth,
  145. "remark": data.remark,
  146. "next_query_time": now + timedelta(minutes=5),
  147. "query_count": 0,
  148. }
  149. # 如果该企业已有已关闭的申请单,更新而非新建
  150. if existing:
  151. obj = await crud.get(id=existing.id, preload=[])
  152. if obj:
  153. for key, value in create_data.items():
  154. if hasattr(obj, key):
  155. setattr(obj, key, value)
  156. await auth.db.flush()
  157. await auth.db.refresh(obj)
  158. return FacetofaceOrderOutSchema.model_validate(obj)
  159. order = await crud.create(create_data)
  160. if not order:
  161. raise CustomException(msg="保存申请单失败")
  162. return FacetofaceOrderOutSchema.model_validate(order)
  163. @classmethod
  164. async def query_order_service(
  165. cls, auth: AuthSchema, order_id: int
  166. ) -> FacetofaceOrderOutSchema:
  167. """手动查询单个申请单状态"""
  168. crud = FacetofaceCRUD(auth)
  169. order = await crud.get(id=order_id)
  170. if not order:
  171. raise CustomException(msg="申请单不存在")
  172. if not order.batch_no:
  173. raise CustomException(msg="申请单无事务编号,无法查询")
  174. terminal_statuses = [
  175. FacetofaceOrderStatus.SUCCESS.value,
  176. FacetofaceOrderStatus.CLOSED.value,
  177. ]
  178. if order.order_status in terminal_statuses:
  179. return FacetofaceOrderOutSchema.model_validate(order)
  180. await cls._query_and_update_order(crud, order)
  181. await auth.db.refresh(order)
  182. return FacetofaceOrderOutSchema.model_validate(order)
  183. @classmethod
  184. async def _query_and_update_order(cls, crud: FacetofaceCRUD, order) -> None:
  185. """查询支付宝接口并更新申请单状态"""
  186. from alipay.aop.api.domain.AlipayOpenAgentOrderQueryModel import AlipayOpenAgentOrderQueryModel
  187. from alipay.aop.api.request.AlipayOpenAgentOrderQueryRequest import AlipayOpenAgentOrderQueryRequest
  188. from alipay.aop.api.response.AlipayOpenAgentOrderQueryResponse import AlipayOpenAgentOrderQueryResponse
  189. client = AlipayClient.get_client()
  190. query_model = AlipayOpenAgentOrderQueryModel()
  191. query_model.batch_no = order.batch_no
  192. query_request = AlipayOpenAgentOrderQueryRequest()
  193. query_request.biz_model = query_model
  194. try:
  195. response = client.execute(query_request)
  196. except Exception as e:
  197. log.error(f"查询当面付申请单状态异常: batch_no={order.batch_no}, error={e}")
  198. return
  199. if not response:
  200. log.warning(f"查询当面付申请单无响应: batch_no={order.batch_no}")
  201. return
  202. result = AlipayOpenAgentOrderQueryResponse()
  203. result.parse_response_content(response)
  204. if not result.is_success():
  205. log.error(f"查询当面付申请单失败: batch_no={order.batch_no}, {result.code} - {result.sub_msg or result.msg}")
  206. return
  207. now = datetime.now()
  208. update_data: dict = {
  209. "last_query_time": now,
  210. "query_count": order.query_count + 1,
  211. }
  212. if result.order_no:
  213. update_data["order_no"] = result.order_no
  214. if result.confirm_url:
  215. update_data["confirm_url"] = result.confirm_url
  216. if result.reject_reason:
  217. update_data["reject_reason"] = result.reject_reason
  218. alipay_status = result.order_status
  219. if alipay_status:
  220. if alipay_status == "MERCHANT_CONFIRM":
  221. update_data["order_status"] = FacetofaceOrderStatus.MERCHANT_CONFIRM.value
  222. # 等待商家确认,继续轮询
  223. update_data["next_query_time"] = now + timedelta(hours=4)
  224. elif alipay_status == "MERCHANT_AUDITING":
  225. update_data["order_status"] = FacetofaceOrderStatus.MERCHANT_AUDITING.value
  226. update_data["next_query_time"] = now + timedelta(hours=4)
  227. elif alipay_status in ("MERCHANT_AGREED", "AGENT_BINDAPP_SUCCESS"):
  228. update_data["order_status"] = FacetofaceOrderStatus.SUCCESS.value
  229. update_data["next_query_time"] = None
  230. elif alipay_status in ("MERCHANT_REJECTED", "MERCHANT_CANCELLED", "AUDIT_REJECTED", "AUDIT_FAILED"):
  231. update_data["order_status"] = FacetofaceOrderStatus.CLOSED.value
  232. update_data["next_query_time"] = None
  233. else:
  234. update_data["next_query_time"] = now + timedelta(hours=4)
  235. log.info(f"当面付申请单状态更新: batch_no={order.batch_no}, alipay_status={alipay_status}, local_status={update_data.get('order_status', order.order_status)}")
  236. obj = await crud.get(id=order.id, preload=[])
  237. if obj:
  238. for key, value in update_data.items():
  239. if hasattr(obj, key):
  240. setattr(obj, key, value)
  241. await crud.auth.db.flush()
  242. @classmethod
  243. async def batch_status_service(
  244. cls, auth: AuthSchema, enterprise_ids: list[str]
  245. ) -> dict[str, str | None]:
  246. """批量查询企业当面付状态"""
  247. crud = FacetofaceCRUD(auth)
  248. result: dict[str, str | None] = {}
  249. for eid in enterprise_ids:
  250. order = await crud.get_by_enterprise_id(eid)
  251. result[eid] = order.order_status if order else None
  252. return result
  253. @classmethod
  254. async def get_by_enterprise_service(
  255. cls, auth: AuthSchema, enterprise_id: str
  256. ) -> FacetofaceOrderOutSchema | None:
  257. """按企业ID查询当面付申请单"""
  258. crud = FacetofaceCRUD(auth)
  259. order = await crud.get_by_enterprise_id(enterprise_id)
  260. if not order:
  261. return None
  262. return FacetofaceOrderOutSchema.model_validate(order)
  263. @classmethod
  264. async def list_service(
  265. cls,
  266. auth: AuthSchema,
  267. page_no: int = 1,
  268. page_size: int = 20,
  269. search: dict | None = None,
  270. ) -> dict:
  271. """查询申请单列表"""
  272. crud = FacetofaceCRUD(auth)
  273. offset = (page_no - 1) * page_size
  274. return await crud.page(
  275. offset=offset,
  276. limit=page_size,
  277. order_by=[{"id": "desc"}],
  278. search=search or {},
  279. out_schema=FacetofaceOrderListOutSchema,
  280. )
  281. @classmethod
  282. async def detail_service(
  283. cls, auth: AuthSchema, order_id: int
  284. ) -> FacetofaceOrderOutSchema:
  285. """查询申请单详情"""
  286. crud = FacetofaceCRUD(auth)
  287. order = await crud.get(id=order_id)
  288. if not order:
  289. raise CustomException(msg="申请单不存在")
  290. return FacetofaceOrderOutSchema.model_validate(order)
  291. @classmethod
  292. async def _save_uploaded_file(cls, upload_file: UploadFile) -> str:
  293. """保存上传的文件到本地,返回文件路径"""
  294. await upload_file.seek(0)
  295. content = await upload_file.read()
  296. await upload_file.seek(0)
  297. ext = os.path.splitext(upload_file.filename or "file")[1] or ".jpg"
  298. safe_name = f"f2f_{datetime.now().strftime('%Y%m%d%H%M%S')}_{os.urandom(4).hex()}{ext}"
  299. upload_dir = settings.UPLOAD_FILE_PATH.joinpath("facetoface", datetime.now().strftime("%Y/%m/%d"))
  300. upload_dir.mkdir(parents=True, exist_ok=True)
  301. filepath = upload_dir.joinpath(safe_name)
  302. with open(filepath, "wb") as f:
  303. f.write(content)
  304. log.info(f"当面付图片已保存: {filepath}")
  305. return str(filepath)
  306. @classmethod
  307. async def poll_pending_orders(cls) -> int:
  308. """
  309. 轮询待处理的申请单(供定时任务调用)
  310. 返回本次处理的申请单数量
  311. """
  312. from app.core.database import async_db_session
  313. async with async_db_session() as db:
  314. from app.api.v1.module_system.auth.schema import AuthSchema
  315. auth = AuthSchema(db=db, user=None, tenant_id=1)
  316. crud = FacetofaceCRUD(auth)
  317. orders = await crud.get_pending_orders()
  318. if not orders:
  319. return 0
  320. count = 0
  321. for order in orders:
  322. try:
  323. await cls._query_and_update_order(crud, order)
  324. count += 1
  325. except Exception as e:
  326. log.error(f"轮询当面付申请单异常: batch_no={order.batch_no}, error={e}")
  327. await db.commit()
  328. log.info(f"当面付申请单轮询完成: 处理 {count}/{len(orders)} 条")
  329. return count