service.py 52 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389
  1. from datetime import datetime, timedelta
  2. from decimal import Decimal
  3. from typing import Any, Optional
  4. from redis.asyncio import Redis
  5. from app.api.v1.module_system.auth.schema import AuthSchema
  6. from app.core.alipay import AlipayClient
  7. from app.core.exceptions import CustomException
  8. from app.core.logger import log
  9. from app.utils.snowflake import get_snowflake_id_str
  10. from app.plugin.module_payment.enterprise.crud import EnterpriseCRUD
  11. from .crud import AccountCRUD, TransferCRUD, DepositCRUD, WithdrawCRUD
  12. from .enums import (
  13. DepositStatusEnum,
  14. WithdrawStatusEnum,
  15. TransferStatusEnum,
  16. )
  17. from .schema import (
  18. AccountAuthorizeApplySchema,
  19. AccountAuthorizeApplyOutSchema,
  20. AccountCreateSchema,
  21. AccountDepositSchema,
  22. AccountDepositOutSchema,
  23. AccountOperationOutSchema,
  24. AccountQuerySchema,
  25. AccountTransferSchema,
  26. AccountTransferOutSchema,
  27. AccountWithdrawSchema,
  28. ReceiptApplySchema,
  29. TransferListOutSchema,
  30. TransferOutSchema,
  31. TenantTransferCreate,
  32. TenantTransferResponse,
  33. )
  34. from ..openapi.crud import OpenTransferCRUD
  35. # 支付宝资金专户转账错误码 → 友好提示
  36. _TRANSFER_ERROR_HINTS = {
  37. "SYSTEM_ERROR": "系统繁忙,请稍后重试",
  38. "INVALID_PARAMETER": "请求参数有误,请检查后重试",
  39. "AMOUNT_LESS_THAN_ONE_CENT": "转账金额不能低于 0.01 元",
  40. "BALANCE_IS_NOT_ENOUGH": "企业余额不足,建议充值",
  41. "BANK_RESPONSE_ERROR": "银行处理失败:账户异常",
  42. "CARD_BIN_ERROR": "收款银行账号不正确,请确认",
  43. "DUPLICATE_DIFFERENT_REQUEST": "重复请求但参数不一致,请检查",
  44. "EXCEED_LIMIT_SM_MIN_AMOUNT": "转账金额不能低于 0.1 元",
  45. "EXCEED_LIMIT_DM_MAX_AMOUNT": "超出单日转账限额,请明天再试或联系管理员提升限额",
  46. "INVALID_ACCOUNT_BOOK": "资金专户不存在,请检查专户号",
  47. "INVALID_CARDNO": "无效的收款银行卡号",
  48. "INVALID_IDENTITY_TYPE": "收款方身份类型不匹配",
  49. "NO_AGREEMENT": "无转账权限,请联系管理员",
  50. "PAYEE_CARD_INFO_ERROR": "收款方账号或银行卡信息有误,请核实",
  51. "PAYEE_NOT_EXIST": "收款账号不存在或姓名有误",
  52. "PAYER_BALANCE_NOT_ENOUGH": "付款方余额不足,建议充值",
  53. "REQUEST_PROCESSING": "系统处理中,请稍后重试",
  54. "TRANS_AUTH_NO_EXIST": "转账授权协议不存在,请先签约",
  55. }
  56. def _parse_dt(val: str | None) -> datetime | None:
  57. """解析支付宝日期字符串"""
  58. if not val:
  59. return None
  60. for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d"):
  61. try:
  62. return datetime.strptime(val, fmt)
  63. except ValueError:
  64. continue
  65. return None
  66. class AccountService:
  67. """资金专户服务层"""
  68. @classmethod
  69. async def stat_transfer_amount_service(
  70. cls,
  71. auth: AuthSchema,
  72. tenant_id: Optional[int] = None,
  73. enterprise_id: Optional[str] = None,
  74. start_date: Optional[datetime] = None,
  75. end_date: Optional[datetime] = None,
  76. payee_type: Optional[str] = None,
  77. ) -> Decimal:
  78. """
  79. 统计转账金额(✅)
  80. 统计企业在指定时间范围内的转账总金额以及每天的转账金额。
  81. """
  82. crud = TransferCRUD(auth)
  83. return await crud.get_transfer_amount(
  84. tenant_id=tenant_id,
  85. enterprise_id=enterprise_id,
  86. start_date=start_date,
  87. end_date=end_date,
  88. payee_type=payee_type,
  89. )
  90. @classmethod
  91. async def stat_consume_amount_service(
  92. cls,
  93. auth: AuthSchema,
  94. tenant_id: Optional[int] = None,
  95. enterprise_id: Optional[str] = None,
  96. start_date: Optional[datetime] = None,
  97. end_date: Optional[datetime] = None,
  98. ) -> Decimal:
  99. """
  100. 统计消费金额(✅)
  101. 统计企业在指定时间范围内的消费总金额。
  102. 数据来源: pay_bill 表,consume_type=CONSUME, status=PROCESSED
  103. """
  104. from app.plugin.module_payment.notification.crud import BillCRUD
  105. crud = BillCRUD(auth)
  106. return await crud.get_consume_amount(
  107. tenant_id=tenant_id,
  108. enterprise_id=enterprise_id,
  109. start_date=start_date,
  110. end_date=end_date,
  111. )
  112. @classmethod
  113. async def stat_summary_amount_service(
  114. cls,
  115. auth: AuthSchema,
  116. tenant_id: Optional[int] = None,
  117. enterprise_id: Optional[str] = None,
  118. start_date: Optional[datetime] = None,
  119. end_date: Optional[datetime] = None,
  120. payee_type: Optional[str] = None,
  121. ) -> Decimal:
  122. """
  123. 汇总统计金额(✅)
  124. 汇总 = 消费统计 + 转账统计,对应时间段结果相加
  125. payee_type 仅过滤转账部分
  126. """
  127. transfer_amount = await cls.stat_transfer_amount_service(
  128. auth=auth,
  129. tenant_id=tenant_id,
  130. enterprise_id=enterprise_id,
  131. start_date=start_date,
  132. end_date=end_date,
  133. payee_type=payee_type,
  134. )
  135. consume_amount = await cls.stat_consume_amount_service(
  136. auth=auth,
  137. tenant_id=tenant_id,
  138. enterprise_id=enterprise_id,
  139. start_date=start_date,
  140. end_date=end_date,
  141. )
  142. return transfer_amount + consume_amount
  143. @classmethod
  144. async def authorize_apply_service(
  145. cls,
  146. auth: AuthSchema,
  147. data: AccountAuthorizeApplySchema
  148. ) -> AccountAuthorizeApplyOutSchema:
  149. """
  150. 申请转账授权签约(✅)
  151. 调用: alipay.commerce.ec.trans.authorize.apply
  152. """
  153. from alipay.aop.api.request.AlipayCommerceEcTransAuthorizeApplyRequest import (
  154. AlipayCommerceEcTransAuthorizeApplyRequest,
  155. )
  156. from alipay.aop.api.domain.AlipayCommerceEcTransAuthorizeApplyModel import (
  157. AlipayCommerceEcTransAuthorizeApplyModel,
  158. )
  159. from alipay.aop.api.response.AlipayCommerceEcTransAuthorizeApplyResponse import (
  160. AlipayCommerceEcTransAuthorizeApplyResponse,
  161. )
  162. model = AlipayCommerceEcTransAuthorizeApplyModel()
  163. model.enterprise_id = data.enterprise_id
  164. request = AlipayCommerceEcTransAuthorizeApplyRequest()
  165. request.biz_model = model
  166. client = AlipayClient.get_client()
  167. response = client.execute(request)
  168. if not response:
  169. raise CustomException(msg="申请转账授权失败: 无响应")
  170. result = AlipayCommerceEcTransAuthorizeApplyResponse()
  171. result.parse_response_content(response)
  172. if not result.is_success():
  173. log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
  174. raise CustomException(msg=f"申请转账授权失败: {result.msg}")
  175. return AccountAuthorizeApplyOutSchema(
  176. sign_url=result.sign_url,
  177. )
  178. @classmethod
  179. async def create_account_service(
  180. cls,
  181. auth: AuthSchema,
  182. data: AccountCreateSchema
  183. ) -> AccountOperationOutSchema:
  184. """
  185. 开通资金专户(✅)
  186. 调用: alipay.commerce.ec.trans.account.create
  187. """
  188. from alipay.aop.api.request.AlipayCommerceEcTransAccountCreateRequest import (
  189. AlipayCommerceEcTransAccountCreateRequest,
  190. )
  191. from alipay.aop.api.domain.AlipayCommerceEcTransAccountCreateModel import (
  192. AlipayCommerceEcTransAccountCreateModel,
  193. )
  194. from alipay.aop.api.response.AlipayCommerceEcTransAccountCreateResponse import (
  195. AlipayCommerceEcTransAccountCreateResponse,
  196. )
  197. model = AlipayCommerceEcTransAccountCreateModel()
  198. model.enterprise_id = data.enterprise_id
  199. # model.account_type = data.account_type or "ALL" # 收支全能户
  200. # model.scene = data.scene or "B2B_TRANS" # ToB转账场景
  201. model.account_type = "ALL"
  202. model.scene = "B2B_TRANS"
  203. request = AlipayCommerceEcTransAccountCreateRequest()
  204. request.biz_model = model
  205. client = AlipayClient.get_client()
  206. response = client.execute(request)
  207. if not response:
  208. raise CustomException(msg="开通资金专户失败: 无响应")
  209. result = AlipayCommerceEcTransAccountCreateResponse()
  210. result.parse_response_content(response)
  211. if not result.is_success():
  212. log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
  213. raise CustomException(msg=f"开通资金专户失败: {result.msg}")
  214. account_data = AccountCreateSchema(
  215. enterprise_id=model.enterprise_id,
  216. account_book_id=result.account_book_id,
  217. account_type=model.account_type,
  218. scene=model.scene,
  219. )
  220. if result.account_book_id:
  221. account_data.account_book_id = result.account_book_id
  222. await AccountCRUD(auth).create(account_data)
  223. return AccountOperationOutSchema(
  224. enterprise_id=account_data.enterprise_id,
  225. account_book_id=account_data.account_book_id,
  226. )
  227. @classmethod
  228. async def deposit_service(
  229. cls,
  230. auth: AuthSchema,
  231. data: AccountDepositSchema
  232. ) -> AccountDepositOutSchema:
  233. """
  234. 资金专户充值(✅)
  235. 调用: alipay.commerce.ec.trans.account.deposit
  236. """
  237. from alipay.aop.api.request.AlipayCommerceEcTransAccountDepositRequest import (
  238. AlipayCommerceEcTransAccountDepositRequest,
  239. )
  240. from alipay.aop.api.domain.AlipayCommerceEcTransAccountDepositModel import (
  241. AlipayCommerceEcTransAccountDepositModel,
  242. )
  243. from alipay.aop.api.response.AlipayCommerceEcTransAccountDepositResponse import (
  244. AlipayCommerceEcTransAccountDepositResponse,
  245. )
  246. model = AlipayCommerceEcTransAccountDepositModel()
  247. model.enterprise_id = data.enterprise_id
  248. model.account_book_id = data.account_book_id
  249. model.amount = str(data.amount)
  250. model.out_biz_no = get_snowflake_id_str(auth.tenant_id)
  251. request = AlipayCommerceEcTransAccountDepositRequest()
  252. request.biz_model = model
  253. client = AlipayClient.get_client()
  254. response = client.execute(request)
  255. if not response:
  256. raise CustomException(msg="充值失败: 无响应")
  257. result = AlipayCommerceEcTransAccountDepositResponse()
  258. result.parse_response_content(response)
  259. if not result.is_success():
  260. log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
  261. raise CustomException(msg=f"充值失败: {result.msg}")
  262. deposit_crud = DepositCRUD(auth)
  263. deposit_data = {
  264. "enterprise_id": data.enterprise_id,
  265. "out_biz_no": model.out_biz_no,
  266. "account_book_id": data.account_book_id,
  267. "amount": data.amount,
  268. "url": result.url,
  269. "status": DepositStatusEnum.DEALING.value,
  270. "remark": data.remark,
  271. }
  272. await deposit_crud.create(deposit_data)
  273. return AccountDepositOutSchema(
  274. url=result.url,
  275. )
  276. @classmethod
  277. async def transfer_service(
  278. cls,
  279. auth: AuthSchema,
  280. data: AccountTransferSchema
  281. ) -> AccountTransferOutSchema:
  282. """
  283. 资金专户转账(✅)
  284. 调用: alipay.commerce.ec.trans.account.transfer
  285. """
  286. from alipay.aop.api.request.AlipayCommerceEcTransAccountTransferRequest import (
  287. AlipayCommerceEcTransAccountTransferRequest,
  288. )
  289. from alipay.aop.api.domain.AlipayCommerceEcTransAccountTransferModel import (
  290. AlipayCommerceEcTransAccountTransferModel,
  291. )
  292. from alipay.aop.api.response.AlipayCommerceEcTransAccountTransferResponse import (
  293. AlipayCommerceEcTransAccountTransferResponse,
  294. )
  295. from alipay.aop.api.domain.TransParticipant import (
  296. TransParticipant,
  297. )
  298. from alipay.aop.api.domain.BankCardExtInfoDTO import (
  299. BankCardExtInfoDTO,
  300. )
  301. # 检查资金专户是否存在
  302. account = await AccountCRUD(auth).get_by_account_book_id(data.account_book_id)
  303. if not account:
  304. raise CustomException(msg="资金账户不存在")
  305. if account.tenant_id != auth.tenant_id:
  306. raise CustomException(msg="无权限操作")
  307. if data.enterprise_id and account.enterprise_id != data.enterprise_id:
  308. raise CustomException(msg="参数错误")
  309. if not data.order_title and account.enterprise_id:
  310. enterprise = await EnterpriseCRUD(auth).get_by_enterprise_id(account.enterprise_id)
  311. if not enterprise:
  312. raise CustomException(msg="资金账户所属企业不存在")
  313. data.order_title = f"来自{enterprise.name}转账"
  314. model = AlipayCommerceEcTransAccountTransferModel()
  315. model.enterprise_id = account.enterprise_id
  316. model.account_book_id = account.account_book_id
  317. model.out_biz_no = get_snowflake_id_str(auth.tenant_id)
  318. # 转账总金额,单位为元,精确到小数点后两位
  319. model.amount = str(data.amount)
  320. model.order_title = data.order_title
  321. payee_info = TransParticipant()
  322. payee_info.identity_type = data.payee_info.identity_type
  323. payee_info.name = data.payee_info.name
  324. payee_info.identity = data.payee_info.identity
  325. if data.payee_info.bankcard_ext_info:
  326. payee_info.bankcard_ext_info = BankCardExtInfoDTO.from_alipay_dict(
  327. data.payee_info.bankcard_ext_info.model_dump(exclude_none=True)
  328. )
  329. model.payee_info = payee_info
  330. if data.remark:
  331. model.remark = data.remark
  332. request = AlipayCommerceEcTransAccountTransferRequest()
  333. request.biz_model = model
  334. client = AlipayClient.get_client()
  335. response = client.execute(request)
  336. if not response:
  337. raise CustomException(msg="转账失败: 无响应")
  338. result = AlipayCommerceEcTransAccountTransferResponse()
  339. result.parse_response_content(response)
  340. sub_code = getattr(result, 'sub_code', '') or ''
  341. sub_msg = getattr(result, 'sub_msg', '') or ''
  342. # 构建转账记录数据,但延迟写入:
  343. # - 成功时在当前会话写入
  344. # - 失败时使用独立事务写入并提交,避免被外层回滚吞掉
  345. transfer_crud = TransferCRUD(auth)
  346. transfer_data = {
  347. "enterprise_id": model.enterprise_id,
  348. "out_biz_no": model.out_biz_no,
  349. "account_book_id": model.account_book_id,
  350. "amount": model.amount,
  351. "order_title": model.order_title,
  352. "payee_info": data.payee_info.model_dump() if data.payee_info else None,
  353. "payee_type": data.payee_info.identity_type if data.payee_info else None,
  354. "status": result.status,
  355. "order_no": result.order_no,
  356. "fund_order_id": result.fund_order_id,
  357. "remark": "",
  358. }
  359. if result.status == TransferStatusEnum.DEALING.value:
  360. transfer_data["retry_count"] = 0
  361. transfer_data["next_retry_at"] = datetime.now() + timedelta(minutes=5)
  362. log.info(f"记录转账: {transfer_data}")
  363. if not result.is_success():
  364. # 优先用 sub_code 匹配
  365. hint = _TRANSFER_ERROR_HINTS.get(sub_code)
  366. # sub_code 无匹配时,尝试从 sub_msg 中提取错误码(支付宝部分接口sub_code返回unknown-sub-code)
  367. if not hint:
  368. for code_key, code_hint in _TRANSFER_ERROR_HINTS.items():
  369. if code_key in sub_msg:
  370. hint = code_hint
  371. break
  372. hint = hint or sub_msg or result.msg or "转账失败"
  373. log.error(f"支付宝接口调用失败: {result.code} - {result.msg} (sub_code={sub_code}, sub_msg={sub_msg})")
  374. # 使用独立的 session/事务保证失败记录能被持久化
  375. from app.core.database import async_db_session
  376. async with async_db_session() as _session:
  377. async with _session.begin():
  378. new_auth = AuthSchema(db=_session, check_data_scope=False)
  379. # 保持 tenant_id
  380. new_auth.tenant_id = getattr(auth, "tenant_id", None)
  381. transfer_data["status"]="FAIL"
  382. transfer_data["remark"]=f"{result.msg} ({sub_code} {sub_msg})"
  383. await TransferCRUD(new_auth).create(transfer_data)
  384. raise CustomException(msg=f"转账失败: {hint}")
  385. # 成功时写入当前会话
  386. await transfer_crud.create(transfer_data)
  387. return AccountTransferOutSchema(
  388. status=result.status,
  389. order_no=result.order_no,
  390. fund_order_id=result.fund_order_id,
  391. out_biz_no=model.out_biz_no,
  392. )
  393. @classmethod
  394. async def tenant_transfer_service(
  395. cls,
  396. auth: AuthSchema,
  397. tenant_id: int,
  398. data: TenantTransferCreate,
  399. request_ip: str,
  400. api_key_id: int | None = None,
  401. ) -> TenantTransferResponse:
  402. """
  403. 租户API转账(通过API Key认证)
  404. 调用: alipay.commerce.ec.trans.account.transfer
  405. """
  406. from alipay.aop.api.request.AlipayCommerceEcTransAccountTransferRequest import (
  407. AlipayCommerceEcTransAccountTransferRequest,
  408. )
  409. from alipay.aop.api.domain.AlipayCommerceEcTransAccountTransferModel import (
  410. AlipayCommerceEcTransAccountTransferModel,
  411. )
  412. from alipay.aop.api.response.AlipayCommerceEcTransAccountTransferResponse import (
  413. AlipayCommerceEcTransAccountTransferResponse,
  414. )
  415. from alipay.aop.api.domain.TransParticipant import (
  416. TransParticipant,
  417. )
  418. from alipay.aop.api.domain.BankCardExtInfoDTO import (
  419. BankCardExtInfoDTO,
  420. )
  421. # 检查资金专户是否存在
  422. account = await AccountCRUD(auth).get_by_account_book_id(data.account_book_id)
  423. if not account:
  424. raise CustomException(msg="资金账户不存在")
  425. if account.tenant_id != tenant_id:
  426. raise CustomException(msg="无权限操作")
  427. if data.enterprise_id and account.enterprise_id != data.enterprise_id:
  428. raise CustomException(msg="参数错误")
  429. if not data.order_title and account.enterprise_id:
  430. enterprise = await EnterpriseCRUD(auth).get_by_enterprise_id(account.enterprise_id)
  431. if not enterprise:
  432. raise CustomException(msg="资金账户所属企业不存在")
  433. data.order_title = f"来自{enterprise.name}转账"
  434. model = AlipayCommerceEcTransAccountTransferModel()
  435. model.enterprise_id = account.enterprise_id
  436. model.account_book_id = account.account_book_id
  437. model.out_biz_no = get_snowflake_id_str(tenant_id)
  438. # 转账总金额,单位为元,精确到小数点后两位
  439. model.amount = str(data.amount)
  440. model.order_title = data.order_title
  441. payee_info = TransParticipant()
  442. payee_info.identity_type = data.payee_info.identity_type
  443. payee_info.name = data.payee_info.name
  444. payee_info.identity = data.payee_info.identity
  445. if data.payee_info.bankcard_ext_info:
  446. payee_info.bankcard_ext_info = BankCardExtInfoDTO.from_alipay_dict(
  447. data.payee_info.bankcard_ext_info.model_dump(exclude_none=True)
  448. )
  449. model.payee_info = payee_info
  450. if data.remark:
  451. model.remark = data.remark
  452. request = AlipayCommerceEcTransAccountTransferRequest()
  453. request.biz_model = model
  454. client = AlipayClient.get_client()
  455. response = client.execute(request)
  456. if not response:
  457. raise CustomException(msg="转账失败: 无响应")
  458. result = AlipayCommerceEcTransAccountTransferResponse()
  459. result.parse_response_content(response)
  460. if not result.is_success():
  461. log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
  462. raise CustomException(msg=f"转账失败: {result.sub_msg or result.msg or result.code}")
  463. transfer_crud = TransferCRUD(auth)
  464. transfer_data = {
  465. "enterprise_id": model.enterprise_id,
  466. "out_biz_no": model.out_biz_no,
  467. "account_book_id": model.account_book_id,
  468. "amount": model.amount,
  469. "order_title": model.order_title,
  470. "payee_info": data.payee_info.model_dump() if data.payee_info else None,
  471. "payee_type": data.payee_info.identity_type if data.payee_info else None,
  472. "status": result.status,
  473. "order_no": result.order_no,
  474. "fund_order_id": result.fund_order_id,
  475. }
  476. if result.status == TransferStatusEnum.DEALING.value:
  477. transfer_data["retry_count"] = 0
  478. transfer_data["next_retry_at"] = datetime.now() + timedelta(minutes=5)
  479. await transfer_crud.create(transfer_data)
  480. return TenantTransferResponse(
  481. status=result.status,
  482. order_no=result.order_no,
  483. fund_order_id=result.fund_order_id,
  484. )
  485. @classmethod
  486. async def withdraw_service(
  487. cls,
  488. auth: AuthSchema,
  489. data: AccountWithdrawSchema
  490. ) -> AccountOperationOutSchema:
  491. """
  492. 资金专户提现
  493. 调用: alipay.commerce.ec.trans.account.withdraw
  494. 接口文档: https://opendocs.alipay.com/pre-open/d651859b_alipay.commerce.ec.trans.account.withdraw
  495. 参数说明:
  496. - enterprise_id: 企业ID
  497. - account_book_id: 资金专户号
  498. - amount: 提现金额
  499. - out_biz_no: 商家侧订单号(唯一)
  500. """
  501. from alipay.aop.api.request.AlipayCommerceEcTransAccountWithdrawRequest import (
  502. AlipayCommerceEcTransAccountWithdrawRequest,
  503. )
  504. from alipay.aop.api.domain.AlipayCommerceEcTransAccountWithdrawModel import (
  505. AlipayCommerceEcTransAccountWithdrawModel,
  506. )
  507. from alipay.aop.api.response.AlipayCommerceEcTransAccountWithdrawResponse import (
  508. AlipayCommerceEcTransAccountWithdrawResponse,
  509. )
  510. crud = AccountCRUD(auth)
  511. enterprise = await crud.get_by_enterprise_id(data.enterprise_id)
  512. if not enterprise:
  513. raise CustomException(msg="企业不存在")
  514. model = AlipayCommerceEcTransAccountWithdrawModel()
  515. model.enterprise_id = enterprise.enterprise_id
  516. model.account_book_id = data.account_book_id
  517. model.amount = str(data.amount)
  518. model.out_biz_no = get_snowflake_id_str(auth.tenant_id)
  519. if data.remark:
  520. model.remark = data.remark
  521. request = AlipayCommerceEcTransAccountWithdrawRequest()
  522. request.biz_model = model
  523. client = AlipayClient.get_client()
  524. response = client.execute(request)
  525. if not response:
  526. raise CustomException(msg="提现失败: 无响应")
  527. result = AlipayCommerceEcTransAccountWithdrawResponse()
  528. result.parse_response_content(response)
  529. if not result.is_success():
  530. log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
  531. raise CustomException(msg=f"提现失败: {result.msg}")
  532. withdraw_crud = WithdrawCRUD(auth)
  533. withdraw_data = {
  534. "enterprise_id": data.enterprise_id,
  535. "out_biz_no": model.out_biz_no,
  536. "account_book_id": data.account_book_id,
  537. "amount": data.amount,
  538. # 专户提现到余额户是同步操作,要么执行成功,要么执行异常,
  539. # 出参status设计多余,遵循规范使用业务码区分成功与失败
  540. "status": WithdrawStatusEnum.SUCCESS.value,
  541. "order_no": result.order_no,
  542. }
  543. await withdraw_crud.create(withdraw_data)
  544. log.info(f"资金专户提现发起成功: 企业: {data.enterprise_id}, 金额: {data.amount}")
  545. return AccountOperationOutSchema(
  546. enterprise_id=data.enterprise_id,
  547. account_book_id=data.account_book_id,
  548. )
  549. @classmethod
  550. async def query_account_service(
  551. cls,
  552. auth: AuthSchema,
  553. data: AccountQuerySchema
  554. ) -> list[Any]:
  555. """
  556. 查询资金专户(调用支付宝接口)
  557. 调用: alipay.commerce.ec.trans.account.query
  558. """
  559. from alipay.aop.api.request.AlipayCommerceEcTransAccountQueryRequest import (
  560. AlipayCommerceEcTransAccountQueryRequest,
  561. )
  562. from alipay.aop.api.domain.AlipayCommerceEcTransAccountQueryModel import (
  563. AlipayCommerceEcTransAccountQueryModel,
  564. )
  565. from alipay.aop.api.response.AlipayCommerceEcTransAccountQueryResponse import (
  566. AlipayCommerceEcTransAccountQueryResponse,
  567. )
  568. from alipay.aop.api.domain.FundAccountApiDTO import (
  569. FundAccountApiDTO,
  570. )
  571. model = AlipayCommerceEcTransAccountQueryModel()
  572. model.enterprise_id = data.enterprise_id
  573. request = AlipayCommerceEcTransAccountQueryRequest()
  574. request.biz_model = model
  575. client = AlipayClient.get_client()
  576. response = client.execute(request)
  577. if not response:
  578. raise CustomException(msg="查询资金专户失败: 无响应")
  579. result = AlipayCommerceEcTransAccountQueryResponse()
  580. result.parse_response_content(response)
  581. if not result.is_success():
  582. log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
  583. raise CustomException(msg=f"查询资金专户失败: {result.msg}")
  584. collect = []
  585. for v in list(result.account_list or []):
  586. if not hasattr(v, "account_book_id"):
  587. continue
  588. if not hasattr(v, "scene") or v.scene != "B2B_TRANS":
  589. continue
  590. account = FundAccountApiDTO.to_alipay_dict(v)
  591. collect.append(account)
  592. return collect
  593. @classmethod
  594. async def transfer_detail_service(
  595. cls,
  596. auth: AuthSchema,
  597. out_biz_no: str
  598. ) -> TransferOutSchema:
  599. """
  600. 查询转账记录详情
  601. """
  602. crud = TransferCRUD(auth)
  603. transfer = await crud.get_by_out_biz_no(out_biz_no)
  604. if not transfer:
  605. raise CustomException(msg="转账记录不存在")
  606. transfer_result = TransferOutSchema.model_validate(transfer)
  607. # 查询三方订单号
  608. open_transfer_crud = OpenTransferCRUD(auth)
  609. open_transfer_data = await open_transfer_crud.get(out_biz_no=transfer.out_biz_no)
  610. if open_transfer_data:
  611. transfer_result.third_biz_no = open_transfer_data.third_biz_no
  612. return transfer_result
  613. @classmethod
  614. async def transfer_list_service(
  615. cls,
  616. auth: AuthSchema,
  617. page_no: int = 1,
  618. page_size: int = 20,
  619. search: dict | None = None,
  620. ) -> dict:
  621. """
  622. 查询转账记录列表
  623. """
  624. log.info(f"查询转账记录列表: {page_no}, {page_size}, {search}")
  625. crud = TransferCRUD(auth)
  626. offset = (page_no - 1) * page_size
  627. return await crud.page(
  628. offset=offset,
  629. limit=page_size,
  630. order_by=[{"id": "desc"}],
  631. search=search or {},
  632. out_schema=TransferListOutSchema,
  633. )
  634. @classmethod
  635. async def transfer_export_service(
  636. cls,
  637. auth: AuthSchema,
  638. start_time: str,
  639. end_time: str,
  640. enterprise_id: Optional[str] = None,
  641. ) -> bytes:
  642. """
  643. 导出转账记录报表为Excel文件
  644. """
  645. log.info(f"导出转账记录报表: {start_time} -> {end_time}")
  646. crud = TransferCRUD(auth)
  647. search = {
  648. "created_time__gte": start_time,
  649. "created_time__lte": end_time,
  650. }
  651. if enterprise_id:
  652. search["enterprise_id"] = enterprise_id
  653. records = await crud.list(
  654. search=search,
  655. order_by=[{"id": "desc"}],
  656. )
  657. from app.utils.excel_util import ExcelUtil
  658. status_map = {
  659. "DEALING": "处理中",
  660. "SUCCESS": "成功",
  661. "FAIL": "失败",
  662. "REFUND": "退票",
  663. }
  664. payee_type_map = {
  665. "ALIPAY_ACCOUNT": "支付宝账户",
  666. "BANK_CARD": "银行卡",
  667. }
  668. list_data = []
  669. for i, record in enumerate(records, start=1):
  670. payee_info = record.payee_info or {}
  671. list_data.append({
  672. "序号": i,
  673. "订单号": record.out_biz_no or "",
  674. "商户订单号": record.order_no or "",
  675. "金额(元)": str(record.amount or 0),
  676. "收款方姓名": payee_info.get("name", ""),
  677. "收款方类型": payee_type_map.get(payee_info.get("identity_type", ""), ""),
  678. "状态": status_map.get(record.status, record.status),
  679. "转账标题": record.order_title or "",
  680. "创建时间": record.created_time.strftime("%Y-%m-%d %H:%M:%S") if record.created_time else "",
  681. })
  682. mapping_dict = {
  683. "序号": "序号",
  684. "订单号": "订单号",
  685. "商户订单号": "商户订单号",
  686. "金额(元)": "金额(元)",
  687. "收款方姓名": "收款方姓名",
  688. "收款方类型": "收款方类型",
  689. "状态": "状态",
  690. "转账标题": "转账标题",
  691. "创建时间": "创建时间",
  692. }
  693. return ExcelUtil.export_list2excel(list_data, mapping_dict)
  694. @classmethod
  695. async def apply_receipt_service(
  696. cls,
  697. auth: AuthSchema,
  698. redis: Redis,
  699. data: ReceiptApplySchema,
  700. ) -> str:
  701. """
  702. 申请转账业务回单
  703. 调用: alipay.commerce.ec.trans.receipt.apply
  704. 参数:
  705. - enterprise_id: 企业ID
  706. - order_no: 支付宝转账单号
  707. 返回: file_id
  708. """
  709. from app.core.redis_crud import RedisCURD
  710. redis_crud = RedisCURD(redis)
  711. cache_key = f"receipt:{data.enterprise_id}:{data.order_no}"
  712. cached_file_id = await redis_crud.get(cache_key)
  713. if cached_file_id:
  714. log.info(f"使用缓存的 file_id: {cached_file_id}")
  715. return cached_file_id
  716. crud = EnterpriseCRUD(auth)
  717. enterprise = await crud.get_by_enterprise_id(data.enterprise_id)
  718. if not enterprise:
  719. raise CustomException(msg="企业不存在")
  720. from alipay.aop.api.request.AlipayCommerceEcTransReceiptApplyRequest import (
  721. AlipayCommerceEcTransReceiptApplyRequest,
  722. )
  723. from alipay.aop.api.domain.AlipayCommerceEcTransReceiptApplyModel import (
  724. AlipayCommerceEcTransReceiptApplyModel,
  725. )
  726. from alipay.aop.api.response.AlipayCommerceEcTransReceiptApplyResponse import (
  727. AlipayCommerceEcTransReceiptApplyResponse,
  728. )
  729. model = AlipayCommerceEcTransReceiptApplyModel()
  730. model.enterprise_id = data.enterprise_id
  731. model.order_no = data.order_no
  732. request = AlipayCommerceEcTransReceiptApplyRequest()
  733. request.biz_model = model
  734. client = AlipayClient.get_client()
  735. response = client.execute(request)
  736. if not response:
  737. raise CustomException(msg="申请回单失败: 无响应")
  738. result = AlipayCommerceEcTransReceiptApplyResponse()
  739. result.parse_response_content(response)
  740. if not result.is_success():
  741. # 清除缓存
  742. await redis_crud.delete(cache_key)
  743. raise CustomException(msg=f"申请回单失败: {result.msg}")
  744. file_id = str(result.file_id)
  745. await redis_crud.set(cache_key, file_id, expire=172800)
  746. log.info(f"申请回单成功: order_no={data.order_no}, file_id={file_id}")
  747. return file_id
  748. @classmethod
  749. async def query_receipt_service(cls, enterprise_id: str, file_id: str) -> dict:
  750. """
  751. 查询回单状态
  752. 调用: alipay.commerce.ec.trans.receipt.query
  753. 参数:
  754. - file_id: 文件申请号
  755. 返回: {file_id, status, download_url, error_message}
  756. """
  757. from alipay.aop.api.request.AlipayCommerceEcTransReceiptQueryRequest import (
  758. AlipayCommerceEcTransReceiptQueryRequest,
  759. )
  760. from alipay.aop.api.response.AlipayCommerceEcTransReceiptQueryResponse import (
  761. AlipayCommerceEcTransReceiptQueryResponse,
  762. )
  763. from alipay.aop.api.domain.AlipayCommerceEcTransReceiptQueryModel import (
  764. AlipayCommerceEcTransReceiptQueryModel,
  765. )
  766. model = AlipayCommerceEcTransReceiptQueryModel()
  767. model.enterprise_id = enterprise_id
  768. model.file_id = file_id
  769. request = AlipayCommerceEcTransReceiptQueryRequest()
  770. request.biz_model = model
  771. client = AlipayClient.get_client()
  772. response = client.execute(request)
  773. if not response:
  774. raise CustomException(msg="查询回单失败: 无响应")
  775. result = AlipayCommerceEcTransReceiptQueryResponse()
  776. result.parse_response_content(response)
  777. if not result.is_success():
  778. raise CustomException(msg=f"查询回单失败: {result.msg}")
  779. data = {
  780. "file_id": file_id,
  781. "status": result.status,
  782. "download_url": result.download_url,
  783. "error_message": result.error_message,
  784. }
  785. return data
  786. @classmethod
  787. async def transfer_sync_status_service(
  788. cls,
  789. auth: AuthSchema,
  790. data: "TransferSyncStatusSchema",
  791. ) -> dict:
  792. """
  793. 手动同步转账状态(管理员补录)
  794. 用于修复因通知丢失而卡在 DEALING 的转账记录
  795. """
  796. from app.plugin.module_payment.account.crud import TransferCRUD
  797. from app.plugin.module_payment.account.schema import TransferSyncStatusSchema
  798. crud = TransferCRUD(auth)
  799. transfer = await crud.get_by_out_biz_no(data.out_biz_no)
  800. if not transfer:
  801. raise CustomException(msg=f"转账记录不存在: {data.out_biz_no}")
  802. if transfer.status != "DEALING" and data.status == "SUCCESS":
  803. raise CustomException(msg=f"转账记录当前状态为 {transfer.status},无需同步")
  804. update_data = {"status": data.status}
  805. if data.error_code:
  806. update_data["error_code"] = data.error_code
  807. if data.error_msg:
  808. update_data["error_msg"] = data.error_msg
  809. for key, value in update_data.items():
  810. if hasattr(transfer, key):
  811. setattr(transfer, key, value)
  812. await auth.db.flush()
  813. await auth.db.refresh(transfer)
  814. log.info(f"手动同步转账状态成功: out_biz_no={data.out_biz_no}, {transfer.status}")
  815. return {
  816. "out_biz_no": transfer.out_biz_no,
  817. "status": transfer.status,
  818. "error_code": transfer.error_code,
  819. "error_msg": transfer.error_msg,
  820. }
  821. @classmethod
  822. async def transfer_sync_all_service(
  823. cls,
  824. auth: AuthSchema,
  825. ) -> dict:
  826. """
  827. 全量同步转账状态(使用 trans.order.query)
  828. """
  829. from sqlalchemy import select
  830. from app.plugin.module_payment.account.model import TransferModel
  831. from app.plugin.module_payment.account.enums import TransferStatusEnum
  832. stmt = select(TransferModel).where(
  833. TransferModel.status == TransferStatusEnum.DEALING.value,
  834. ).order_by(TransferModel.id.asc())
  835. result = await auth.db.execute(stmt)
  836. all_transfers = list(result.scalars().all())
  837. synced = 0
  838. errors = 0
  839. details = []
  840. for transfer in all_transfers:
  841. out_biz_no = transfer.out_biz_no
  842. eid = transfer.enterprise_id
  843. order_no = transfer.order_no or ""
  844. if not out_biz_no or not eid:
  845. continue
  846. try:
  847. new_status = await cls._sync_transfer_detail(auth, out_biz_no, eid, order_no)
  848. if new_status:
  849. synced += 1
  850. details.append({"out_biz_no": out_biz_no, "old_status": transfer.status, "new_status": new_status})
  851. else:
  852. details.append({"out_biz_no": out_biz_no, "status": transfer.status, "action": "no_change"})
  853. except Exception as e:
  854. errors += 1
  855. details.append({"out_biz_no": out_biz_no, "status": transfer.status, "error": str(e)})
  856. log.warning(f"全量同步 - 查询失败: out_biz_no={out_biz_no}, err={e}")
  857. if synced > 0:
  858. await auth.db.flush()
  859. return {
  860. "total": len(all_transfers),
  861. "synced": synced,
  862. "errors": errors,
  863. "details": details,
  864. }
  865. @classmethod
  866. async def _sync_transfer_detail(
  867. cls,
  868. auth: AuthSchema,
  869. out_biz_no: str,
  870. enterprise_id: str,
  871. order_no: str = "",
  872. ) -> str | None:
  873. """调 alipay.commerce.ec.trans.order.query 查询单笔转账状态并更新
  874. 有 out_biz_no 传 out_biz_no,有 order_no 也传 order_no(两个都传)
  875. 返回: 新状态str / None(不可查/无变化)
  876. """
  877. from sqlalchemy import update as sa_update
  878. from app.plugin.module_payment.account.model import TransferModel
  879. from app.core.alipay import AlipayClient
  880. try:
  881. from alipay.aop.api.request.AlipayCommerceEcTransOrderQueryRequest import (
  882. AlipayCommerceEcTransOrderQueryRequest,
  883. )
  884. from alipay.aop.api.domain.AlipayCommerceEcTransOrderQueryModel import (
  885. AlipayCommerceEcTransOrderQueryModel,
  886. )
  887. from alipay.aop.api.response.AlipayCommerceEcTransOrderQueryResponse import (
  888. AlipayCommerceEcTransOrderQueryResponse,
  889. )
  890. except ImportError:
  891. log.warning("SDK 不支持 trans.order.query")
  892. return None
  893. model = AlipayCommerceEcTransOrderQueryModel()
  894. model.enterprise_id = enterprise_id
  895. if out_biz_no:
  896. model.out_biz_no = out_biz_no
  897. if order_no:
  898. model.order_no = order_no
  899. request = AlipayCommerceEcTransOrderQueryRequest()
  900. request.biz_model = model
  901. client = AlipayClient.get_client()
  902. response = client.execute(request)
  903. if not response:
  904. return None
  905. result = AlipayCommerceEcTransOrderQueryResponse()
  906. result.parse_response_content(response)
  907. if not result.is_success():
  908. sub_code = getattr(result, 'sub_code', '') or ''
  909. sub_msg = getattr(result, 'sub_msg', '') or ''
  910. if sub_code == "ORDER_NOT_EXIST":
  911. return None
  912. log.warning(f"trans.order.query 失败: out_biz_no={out_biz_no}, sub_code={sub_code}, sub_msg={sub_msg}")
  913. return None
  914. alipay_status = getattr(result, 'status', None)
  915. if not alipay_status or alipay_status == "DEALING":
  916. return None
  917. update_data = {"status": alipay_status}
  918. api_order_no = getattr(result, 'order_no', None)
  919. api_amount = getattr(result, 'amount', None)
  920. api_fund_order_id = getattr(result, 'pay_fund_order_id', None)
  921. if api_order_no:
  922. update_data["order_no"] = api_order_no
  923. if api_fund_order_id:
  924. update_data["fund_order_id"] = api_fund_order_id
  925. if api_amount:
  926. update_data["amount"] = Decimal(str(api_amount))
  927. if alipay_status == "FAIL":
  928. sub_msg = getattr(result, 'sub_msg', '') or getattr(result, 'msg', '') or ''
  929. if sub_msg:
  930. update_data["remark"] = sub_msg
  931. upd = sa_update(TransferModel).where(
  932. TransferModel.out_biz_no == out_biz_no
  933. ).values(**update_data)
  934. await auth.db.execute(upd)
  935. log.info(f"转账同步(trans.order.query) - out_biz_no={out_biz_no}, status={alipay_status}")
  936. return alipay_status
  937. # ─────────── 退避时间映射 ───────────
  938. _RETRY_INTERVALS = [5, 10, 45, 60] # 累计: +5min, +15min, +1h, +2h
  939. @classmethod
  940. async def retry_dealing_transfers(cls) -> dict:
  941. """定时任务:反查处理中的到卡转账结果(DB 锁防多 worker 重复执行)
  942. 启动时立即执行一次,之后每分钟执行一次。使用独立数据库会话。
  943. """
  944. from sqlalchemy import select, text
  945. from app.plugin.module_payment.account.model import TransferModel
  946. from app.plugin.module_payment.account.enums import TransferStatusEnum
  947. from app.core.database import async_db_session
  948. db = async_db_session()
  949. try:
  950. async with db.begin():
  951. # 抢 PostgreSQL advisory lock
  952. lock = await db.execute(text("SELECT pg_try_advisory_lock(99999)"))
  953. if not lock.scalar():
  954. log.debug("[重试任务] 锁已被其他 worker 持有,跳过")
  955. return {"scanned": 0, "processed": 0, "skipped_lock": True}
  956. try:
  957. log.info("[重试任务] ====== 开始执行 ======")
  958. now = datetime.now()
  959. stmt = (
  960. select(TransferModel)
  961. .where(
  962. TransferModel.status == TransferStatusEnum.DEALING.value,
  963. TransferModel.retry_count < 4,
  964. TransferModel.next_retry_at.isnot(None),
  965. TransferModel.next_retry_at <= now,
  966. )
  967. .order_by(TransferModel.next_retry_at.asc())
  968. )
  969. result = await db.execute(stmt)
  970. pending = list(result.scalars().all())
  971. if not pending:
  972. return {"scanned": 0, "processed": 0}
  973. log.info(f"[重试任务] 扫描到 {len(pending)} 条")
  974. processed = 0
  975. for transfer in pending:
  976. out_biz_no = transfer.out_biz_no
  977. enterprise_id = transfer.enterprise_id
  978. order_no = transfer.order_no or ""
  979. retry_count = transfer.retry_count
  980. try:
  981. sync_result = await cls._sync_transfer_detail(
  982. AuthSchema(db=db, check_data_scope=False, tenant_id=transfer.tenant_id),
  983. out_biz_no, enterprise_id, order_no,
  984. )
  985. except Exception as e:
  986. log.warning(f"[重试任务] trans.order.query 异常: out_biz_no={out_biz_no}, err={e}")
  987. continue
  988. if sync_result: # SUCCESS 或 FAIL
  989. transfer.retry_count = 4
  990. transfer.next_retry_at = None
  991. log.info(f"[重试任务] out_biz_no={out_biz_no}, retry={retry_count+1}/4, result={sync_result}, 已更新")
  992. try:
  993. auth = AuthSchema(db=db, check_data_scope=False, tenant_id=transfer.tenant_id)
  994. await OpenTransferService.open_return_service(auth, order_no or out_biz_no)
  995. except Exception:
  996. pass
  997. else:
  998. # 区分退避策略:对卡转账走多次退避,支付宝转账仅重试1次
  999. is_card_transfer = transfer.payee_type == "BANK_CARD"
  1000. max_retries = 3 if is_card_transfer else 0
  1001. if retry_count < max_retries:
  1002. accumulated = sum(cls._RETRY_INTERVALS[:retry_count + 2])
  1003. transfer.retry_count = retry_count + 1
  1004. transfer.next_retry_at = transfer.created_time + timedelta(minutes=accumulated)
  1005. log.info(f"[重试任务] out_biz_no={out_biz_no}, payee_type={transfer.payee_type}, retry={retry_count+1}/{max_retries+1}, result=DEALING, 下次={transfer.next_retry_at}")
  1006. processed += 1
  1007. else:
  1008. transfer.status = TransferStatusEnum.FAIL.value
  1009. transfer.retry_count = 4
  1010. transfer.next_retry_at = None
  1011. fail_reason = "反查超时判定为失败" if is_card_transfer else "支付宝转账仅重试1次仍DEALING"
  1012. transfer.error_msg = fail_reason
  1013. processed += 1
  1014. log.info(f"[重试任务] out_biz_no={out_biz_no}, payee_type={transfer.payee_type}, retry={retry_count+1}/{max_retries+1}次DEALING, 判FAIL: {fail_reason}")
  1015. try:
  1016. auth = AuthSchema(db=db, check_data_scope=False, tenant_id=transfer.tenant_id)
  1017. await OpenTransferService.open_return_service(auth, order_no or out_biz_no)
  1018. except Exception:
  1019. pass
  1020. # db.begin() 会自动 commit
  1021. log.info(f"[重试任务] ====== 执行完成 ====== 处理 {processed} 条,扫描 {len(pending)} 条")
  1022. return {"scanned": len(pending), "processed": processed}
  1023. finally:
  1024. await db.execute(text("SELECT pg_advisory_unlock(99999)"))
  1025. except Exception:
  1026. log.error("[重试任务] 执行异常", exc_info=True)
  1027. return {"scanned": 0, "processed": 0}
  1028. finally:
  1029. await db.close()
  1030. @classmethod
  1031. async def update_transfer_status_service(
  1032. cls,
  1033. auth: AuthSchema,
  1034. order_no: str,
  1035. status: str,
  1036. ext_info: dict = {}
  1037. ) -> None:
  1038. """
  1039. 更新转账状态(由通知处理器调用)
  1040. """
  1041. crud = TransferCRUD(auth)
  1042. transfer = await crud.get_by_order_no(order_no)
  1043. if not transfer:
  1044. log.warning(f"转账记录不存在: {order_no}")
  1045. return
  1046. update_data = {}
  1047. update_data["status"] = status
  1048. if ext_info:
  1049. update_data["ext_info"] = ext_info
  1050. await crud.update_by_order_no(order_no, update_data)
  1051. @classmethod
  1052. async def update_deposit_status_service(
  1053. cls,
  1054. auth: AuthSchema,
  1055. out_biz_no: str,
  1056. status: str,
  1057. ) -> None:
  1058. """
  1059. 更新充值状态(由通知处理器调用)
  1060. """
  1061. crud = DepositCRUD(auth)
  1062. deposit = await crud.get_by_out_biz_no(out_biz_no)
  1063. if not deposit:
  1064. log.warning(f"充值记录不存在: {out_biz_no}")
  1065. return
  1066. update_data = {"status": status}
  1067. await crud.update_by_out_biz_no(out_biz_no, update_data)
  1068. @classmethod
  1069. async def update_withdraw_status_service(
  1070. cls,
  1071. auth: AuthSchema,
  1072. out_biz_no: str,
  1073. status: str,
  1074. error_code: str | None = None,
  1075. error_msg: str | None = None,
  1076. ) -> None:
  1077. """
  1078. 更新提现状态(由通知处理器调用)
  1079. """
  1080. crud = WithdrawCRUD(auth)
  1081. withdraw = await crud.get_by_out_biz_no(out_biz_no)
  1082. if not withdraw:
  1083. log.warning(f"提现记录不存在: {out_biz_no}")
  1084. return
  1085. update_data = {"status": status}
  1086. if error_code:
  1087. update_data["error_code"] = error_code
  1088. if error_msg:
  1089. update_data["error_msg"] = error_msg
  1090. await crud.update_by_out_biz_no(out_biz_no, update_data)
  1091. @classmethod
  1092. async def consume_detail_query_service(
  1093. cls,
  1094. auth: AuthSchema,
  1095. pay_no: str,
  1096. enterprise_id: str | None = None,
  1097. ant_shop_id: str | None = None,
  1098. query_options: list[str] | None = None,
  1099. ) -> dict:
  1100. """
  1101. 账单详情查询(✅)
  1102. 调用: alipay.commerce.ec.consume.detail.query
  1103. 用于查询企业码账单详情,支持查询关联退款、订单、票据等信息。
  1104. """
  1105. from alipay.aop.api.request.AlipayCommerceEcConsumeDetailQueryRequest import (
  1106. AlipayCommerceEcConsumeDetailQueryRequest,
  1107. )
  1108. from alipay.aop.api.domain.AlipayCommerceEcConsumeDetailQueryModel import (
  1109. AlipayCommerceEcConsumeDetailQueryModel,
  1110. )
  1111. from alipay.aop.api.response.AlipayCommerceEcConsumeDetailQueryResponse import (
  1112. AlipayCommerceEcConsumeDetailQueryResponse,
  1113. )
  1114. model = AlipayCommerceEcConsumeDetailQueryModel()
  1115. model.pay_no = pay_no
  1116. if enterprise_id:
  1117. model.enterprise_id = enterprise_id
  1118. if ant_shop_id:
  1119. model.ant_shop_id = ant_shop_id
  1120. if query_options:
  1121. model.query_options = query_options
  1122. request = AlipayCommerceEcConsumeDetailQueryRequest()
  1123. request.biz_model = model
  1124. client = AlipayClient.get_client()
  1125. response = client.execute(request)
  1126. if not response:
  1127. raise CustomException(msg="账单详情查询失败: 无响应")
  1128. result = AlipayCommerceEcConsumeDetailQueryResponse()
  1129. result.parse_response_content(response)
  1130. if not result.is_success():
  1131. log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
  1132. raise CustomException(msg=f"账单详情查询失败: {result.msg}")
  1133. consume_info = result.consume_info
  1134. if not consume_info:
  1135. raise CustomException(msg="账单详情查询失败: 无账单信息")
  1136. return {
  1137. "account_id": consume_info.account_id,
  1138. "pay_no": consume_info.pay_no,
  1139. "consume_type": consume_info.consume_type,
  1140. "gmt_biz_create": consume_info.gmt_biz_create,
  1141. "consume_biz_type": consume_info.consume_biz_type,
  1142. "consume_amount": consume_info.consume_amount,
  1143. "order_complete_label": consume_info.order_complete_label,
  1144. "refund_status": consume_info.refund_status,
  1145. "refund_amount": consume_info.refund_amount,
  1146. "peer_payer_card_name": consume_info.peer_payer_card_name,
  1147. "user_id": getattr(consume_info, 'user_id', None),
  1148. "open_id": getattr(consume_info, 'open_id', None),
  1149. "enterprise_id": consume_info.enterprise_id,
  1150. "employee_id": consume_info.employee_id,
  1151. "enterprise_name": getattr(consume_info, 'enterprise_name', None),
  1152. "employee_name": getattr(consume_info, 'employee_name', None),
  1153. "consume_scene_code": getattr(consume_info, 'consume_scene_code', None),
  1154. "consume_type_sub_category": getattr(consume_info, 'consume_type_sub_category', None),
  1155. "consume_title": getattr(consume_info, 'consume_title', None),
  1156. "gmt_pay": getattr(consume_info, 'gmt_pay', None),
  1157. "gmt_refund": getattr(consume_info, 'gmt_refund', None),
  1158. "pay_amount": getattr(consume_info, 'pay_amount', None),
  1159. "invoice_amount": getattr(consume_info, 'invoice_amount', None),
  1160. "peer_pay_amount": getattr(consume_info, 'peer_pay_amount', None),
  1161. "subsidy_amount": getattr(consume_info, 'subsidy_amount', None),
  1162. "ext_infos": getattr(consume_info, 'ext_infos', None),
  1163. }