bill_handler.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443
  1. from datetime import datetime
  2. from decimal import Decimal
  3. from redis.asyncio import Redis
  4. from app.api.v1.module_system.auth.schema import AuthSchema
  5. from app.core.logger import log
  6. from app.plugin.module_payment.account.service import AccountService
  7. from app.core.alipay import AlipayClient
  8. from ..schemas import ConsumeChangeContent, VoucherChangeContent
  9. from ..model import PayBillModel, PayBillOrderModel, PayBillVoucherModel
  10. from ..crud import BillCRUD, BillOrderCRUD, BillVoucherCRUD
  11. from .base_handler import BaseHandler
  12. from ...openapi.service import OpenTransferService
  13. class BillHandler(BaseHandler[dict]):
  14. """账单变动通知处理器"""
  15. async def handle(self, method: str, content: dict, auth: AuthSchema, redis: Redis) -> bool:
  16. """
  17. 处理账单变动通知
  18. 流程:
  19. 1. 保存账单基础数据
  20. 2. 调用 alipay.commerce.ec.consume.detail.query 查询详情
  21. 3. 保存账单和凭证详情数据
  22. """
  23. try:
  24. notify_data = ConsumeChangeContent(**content)
  25. except Exception as e:
  26. log.error(f"解析账单通知内容失败: {e}")
  27. return False
  28. try:
  29. return await self._process_bill(notify_data, auth)
  30. except Exception as e:
  31. log.error(f"处理账单变动通知异常: {e}", exc_info=True)
  32. return False
  33. async def _process_bill(self, data: ConsumeChangeContent, auth: AuthSchema) -> bool:
  34. """处理账单变动通知
  35. 支付宝文档: alipay.commerce.ec.consume.change.notify
  36. consume_type 标准枚举: CONSUME(消费), REFUND(退款)
  37. notify_reason 标准枚举: COLLECT(归集), SUMMARY_SUCCESS(汇总成功), ORDER_COMPLETE(完结)
  38. 注: TRANSFER(转账) 为支付宝企业码扩展值,不在标准文档枚举中
  39. """
  40. log.info(
  41. f"账单变动: pay_no={data.pay_no}, "
  42. f"enterprise_id={data.enterprise_id}, "
  43. f"consume_amount={data.consume_amount}, "
  44. f"consume_type={data.consume_type}, "
  45. f"notify_reason={data.notify_reason}"
  46. )
  47. # ========== 标准消费/退款通知:落库 ==========
  48. if data.consume_type in ("CONSUME", "REFUND"):
  49. await self._save_bill_base(data, auth)
  50. try:
  51. detail = await self._query_bill_detail(data.pay_no, data.enterprise_id, auth)
  52. except Exception as e:
  53. log.warning(f"查询账单详情失败(不影响主流程): {e}")
  54. detail = None
  55. if detail:
  56. await self._save_bill_detail(detail, auth)
  57. # 同步更新本地额度
  58. try:
  59. await self._sync_expense_quota(data, auth)
  60. except Exception as e:
  61. log.warning(f"同步额度失败(不影响主流程): {e}")
  62. # ========== 转账通知(企业码扩展) ==========
  63. elif data.consume_type == "TRANSFER":
  64. await AccountService.update_transfer_status_service(
  65. auth, data.pay_no, "SUCCESS", data.model_dump(exclude_none=True)
  66. )
  67. # 通知到达,清除重试字段(无需再反查)
  68. await cls._clear_transfer_retry(auth, data.pay_no)
  69. await OpenTransferService.open_return_service(auth, data.pay_no)
  70. else:
  71. log.info(f"未知账单类型: consume_type={data.consume_type}, pay_no={data.pay_no}")
  72. return True
  73. async def _save_bill_base(self, data: ConsumeChangeContent, auth: AuthSchema) -> None:
  74. """保存账单基础数据"""
  75. bill_crud = BillCRUD(auth)
  76. bill_data = {
  77. "enterprise_id": data.enterprise_id,
  78. "account_id": data.account_id,
  79. "employee_id": data.employee_id,
  80. "consume_type": data.consume_type,
  81. "consume_amount": Decimal(data.consume_amount) if data.consume_amount else None,
  82. "gmt_biz_create": datetime.strptime(data.gmt_biz_create, "%Y-%m-%d %H:%M:%S") if data.gmt_biz_create else None,
  83. "gmt_recieve_pay": datetime.strptime(data.gmt_recieve_pay, "%Y-%m-%d %H:%M:%S") if data.gmt_recieve_pay else None,
  84. "peer_pay_amount": Decimal(data.peer_pay_amount) if data.peer_pay_amount else None,
  85. "notify_reason": data.notify_reason,
  86. "notify_msg": data.notify_msg,
  87. "related_pay_no": data.related_pay_no,
  88. "expense_rule_group_id": data.expense_rule_group_id,
  89. "expense_scene_code": data.expense_scene_code,
  90. "expense_type": data.expense_type,
  91. "status": "NEW",
  92. }
  93. await bill_crud.create_or_update(data.pay_no, bill_data)
  94. log.info(f"保存账单基础数据成功: pay_no={data.pay_no}")
  95. async def _sync_expense_quota(self, data: ConsumeChangeContent, auth: AuthSchema) -> None:
  96. """消费/退款时同步更新本地额度(pay_expense_quota)
  97. 通过 expense_rule_group_id → pay_expense_rule.rule_id 找到 institution_id,
  98. 然后按 employee_id + institution_id 更新对应额度的可用金额。
  99. """
  100. if not data.expense_rule_group_id:
  101. return
  102. expense_amount = Decimal(data.consume_amount) if data.consume_amount else Decimal("0")
  103. if expense_amount <= 0:
  104. return
  105. # 查使用规则 → 找到制度ID
  106. from app.plugin.module_payment.expense.rule.model import ExpenseRuleModel
  107. from app.plugin.module_payment.expense.quota.model import QuotaModel
  108. from app.plugin.module_payment.expense.quota.enums import QuotaStatusEnum
  109. from sqlalchemy import select, update as sa_update
  110. rule_stmt = select(ExpenseRuleModel).where(
  111. ExpenseRuleModel.rule_id == data.expense_rule_group_id
  112. )
  113. rule_result = await auth.db.execute(rule_stmt)
  114. rule = rule_result.scalar_one_or_none()
  115. if not rule:
  116. log.debug(f"未找到对应使用规则: rule_id={data.expense_rule_group_id}")
  117. return
  118. institution_id = rule.institution_id
  119. if not institution_id:
  120. return
  121. # 查该员工在该制度下的额度
  122. quota_stmt = select(QuotaModel).where(
  123. QuotaModel.employee_id == data.employee_id,
  124. QuotaModel.institution_id == institution_id,
  125. )
  126. quota_result = await auth.db.execute(quota_stmt)
  127. quota = quota_result.scalar_one_or_none()
  128. if not quota:
  129. log.debug(f"未找到额度记录: employee_id={data.employee_id}, institution_id={institution_id}")
  130. return
  131. # 更新可用额度
  132. if data.consume_type == "REFUND":
  133. # 退款:增加可用额度
  134. new_available = (quota.available_amount or Decimal("0")) + expense_amount
  135. else:
  136. # 消费:减少可用额度
  137. new_available = (quota.available_amount or Decimal("0")) - expense_amount
  138. if new_available < 0:
  139. new_available = Decimal("0")
  140. new_status = QuotaStatusEnum.QUOTA_EXHAUSTED.value if new_available <= 0 else QuotaStatusEnum.QUOTA_ACTIVE.value
  141. upd = (
  142. sa_update(QuotaModel)
  143. .where(QuotaModel.id == quota.id)
  144. .values(available_amount=new_available, status=new_status)
  145. )
  146. await auth.db.execute(upd)
  147. await auth.db.flush()
  148. log.info(
  149. f"消费通知同步额度: employee_id={data.employee_id}, "
  150. f"institution_id={institution_id}, "
  151. f"consume_type={data.consume_type}, "
  152. f"amount={expense_amount}, "
  153. f"available_amount={new_available}, status={new_status}"
  154. )
  155. async def _query_bill_detail(self, pay_no: str, enterprise_id: str, auth: AuthSchema) -> dict | None:
  156. """调用 alipay.commerce.ec.consume.detail.query 查询账单详情"""
  157. from alipay.aop.api.request.AlipayCommerceEcConsumeDetailQueryRequest import (
  158. AlipayCommerceEcConsumeDetailQueryRequest,
  159. )
  160. from alipay.aop.api.response.AlipayCommerceEcConsumeDetailQueryResponse import (
  161. AlipayCommerceEcConsumeDetailQueryResponse,
  162. )
  163. from alipay.aop.api.domain.AlipayCommerceEcConsumeDetailQueryModel import AlipayCommerceEcConsumeDetailQueryModel
  164. model = AlipayCommerceEcConsumeDetailQueryModel()
  165. model.pay_no = pay_no
  166. model.enterprise_id = enterprise_id
  167. request = AlipayCommerceEcConsumeDetailQueryRequest()
  168. request.biz_model = model
  169. client = AlipayClient.get_client()
  170. response = client.execute(request)
  171. if not response:
  172. log.error("查询账单详情失败: 无响应")
  173. return None
  174. result = AlipayCommerceEcConsumeDetailQueryResponse()
  175. result.parse_response_content(response)
  176. if not result.is_success():
  177. log.error(f"查询账单详情失败: {result.msg}")
  178. return None
  179. # 解析响应为字典
  180. return {
  181. "pay_no": getattr(result.consume_info, "pay_no", None),
  182. "enterprise_id": getattr(result.consume_info, "enterprise_id", None),
  183. "employee_id": getattr(result.consume_info, "employee_id", None),
  184. "employee_name": getattr(result.consume_info, "employee_name", None),
  185. "consume_amount": getattr(result, "consume_amount", None),
  186. "peer_pay_amount": getattr(result, "peer_pay_amount", None),
  187. "employee_pay_amount": getattr(result, "employee_pay_amount", None),
  188. "order_info": self._parse_order_info(result),
  189. "voucher_list": self._parse_voucher_list(result),
  190. }
  191. def _parse_order_info(self, result) -> dict | None:
  192. """解析订单信息"""
  193. order_info = getattr(result, "order_info", None)
  194. if not order_info:
  195. return None
  196. return {
  197. "order_no": getattr(order_info, "order_no", None),
  198. "trade_no": getattr(order_info, "trade_no", None),
  199. "product_code": getattr(order_info, "product_code", None),
  200. "order_title": getattr(order_info, "order_title", None),
  201. "order_amount": getattr(order_info, "order_amount", None),
  202. "order_status": getattr(order_info, "order_status", None),
  203. "merchant_name": getattr(order_info, "merchant_name", None),
  204. "merchant_id": getattr(order_info, "merchant_id", None),
  205. "shop_name": getattr(order_info, "shop_name", None),
  206. "gmt_payment": getattr(order_info, "gmt_payment", None),
  207. "fund_channel": getattr(order_info, "fund_channel", None),
  208. }
  209. def _parse_voucher_list(self, result) -> list | None:
  210. """解析凭证列表"""
  211. voucher_list = getattr(result, "voucher_list", None)
  212. if not voucher_list:
  213. return None
  214. vouchers = []
  215. for voucher in voucher_list:
  216. vouchers.append({
  217. "voucher_id": getattr(voucher, "voucher_id", None),
  218. "voucher_type": getattr(voucher, "voucher_type", None),
  219. "voucher_status": getattr(voucher, "voucher_status", None),
  220. "invoice_code": getattr(voucher, "invoice_code", None),
  221. "invoice_no": getattr(voucher, "invoice_no", None),
  222. "invoice_amount": getattr(voucher, "invoice_amount", None),
  223. "tax_amount": getattr(voucher, "tax_amount", None),
  224. "issue_date": getattr(voucher, "issue_date", None),
  225. "check_code": getattr(voucher, "check_code", None),
  226. "pdf_url": getattr(voucher, "pdf_url", None),
  227. })
  228. return vouchers
  229. async def _save_bill_detail(self, detail: dict, auth: AuthSchema) -> None:
  230. """保存账单详情和凭证数据"""
  231. bill_crud = BillCRUD(auth)
  232. pay_no = detail.get("pay_no")
  233. if not pay_no:
  234. return
  235. # 更新账单详情
  236. await bill_crud.update_by_pay_no(pay_no, {"status": "PROCESSED", "ext_infos": detail})
  237. # 保存订单详情
  238. order_info = detail.get("order_info")
  239. if order_info:
  240. await self._save_order_detail(pay_no, order_info, auth)
  241. # 保存凭证列表
  242. voucher_list = detail.get("voucher_list")
  243. if voucher_list:
  244. await self._save_voucher_list(pay_no, voucher_list, auth)
  245. log.info(f"保存账单详情成功: pay_no={pay_no}")
  246. async def _save_order_detail(self, pay_no: str, order_info: dict, auth: AuthSchema) -> None:
  247. """保存订单详情"""
  248. order_crud = BillOrderCRUD(auth)
  249. order_no = order_info.get("order_no")
  250. if not order_no:
  251. return
  252. order_data = {
  253. "pay_no": pay_no,
  254. "trade_no": order_info.get("trade_no"),
  255. "product_code": order_info.get("product_code"),
  256. "order_title": order_info.get("order_title"),
  257. "order_amount": Decimal(order_info.get("order_amount")) if order_info.get("order_amount") else None,
  258. "order_status": order_info.get("order_status"),
  259. "merchant_name": order_info.get("merchant_name"),
  260. "merchant_id": order_info.get("merchant_id"),
  261. "shop_name": order_info.get("shop_name"),
  262. "gmt_payment": datetime.strptime(order_info.get("gmt_payment"), "%Y-%m-%d %H:%M:%S") if order_info.get("gmt_payment") else None,
  263. "fund_channel": order_info.get("fund_channel"),
  264. }
  265. await order_crud.create_or_update(order_no, order_data)
  266. async def _save_voucher_list(self, pay_no: str, voucher_list: list, auth: AuthSchema) -> None:
  267. """保存凭证列表"""
  268. voucher_crud = BillVoucherCRUD(auth)
  269. for voucher in voucher_list:
  270. voucher_id = voucher.get("voucher_id")
  271. if not voucher_id:
  272. continue
  273. voucher_data = {
  274. "pay_no": pay_no,
  275. "voucher_type": voucher.get("voucher_type"),
  276. "voucher_status": voucher.get("voucher_status"),
  277. "invoice_code": voucher.get("invoice_code"),
  278. "invoice_no": voucher.get("invoice_no"),
  279. "invoice_amount": Decimal(voucher.get("invoice_amount")) if voucher.get("invoice_amount") else None,
  280. "tax_amount": Decimal(voucher.get("tax_amount")) if voucher.get("tax_amount") else None,
  281. "issue_date": datetime.strptime(voucher.get("issue_date"), "%Y-%m-%d") if voucher.get("issue_date") else None,
  282. "check_code": voucher.get("check_code"),
  283. "pdf_url": voucher.get("pdf_url"),
  284. }
  285. await voucher_crud.create_or_update(voucher_id, voucher_data)
  286. @staticmethod
  287. async def _clear_transfer_retry(auth: AuthSchema, pay_no: str) -> None:
  288. """通知到达时清除重试字段,终止反查"""
  289. from sqlalchemy import update as sa_update
  290. from app.plugin.module_payment.account.model import TransferModel
  291. try:
  292. upd = sa_update(TransferModel).where(
  293. TransferModel.order_no == pay_no
  294. ).values(next_retry_at=None, retry_count=4)
  295. await auth.db.execute(upd)
  296. except Exception:
  297. pass
  298. class VoucherHandler(BaseHandler[dict]):
  299. """凭证变动通知处理器"""
  300. async def handle(self, method: str, content: dict, auth: AuthSchema, redis: Redis) -> bool:
  301. """
  302. 处理凭证变动通知
  303. 流程:
  304. 1. 调用 alipay.commerce.ec.consume.detail.query 查询详情
  305. 2. 更新凭证信息
  306. """
  307. try:
  308. notify_data = VoucherChangeContent(**content)
  309. except Exception as e:
  310. log.error(f"解析凭证通知内容失败: {e}")
  311. return False
  312. action = notify_data.action
  313. voucher_id = notify_data.voucher_id
  314. log.info(f"处理凭证变动通知: action={action}, voucher_id={voucher_id}")
  315. try:
  316. return await self._process_voucher(notify_data, auth)
  317. except Exception as e:
  318. log.error(f"处理凭证变动通知异常: {e}", exc_info=True)
  319. return False
  320. async def _process_voucher(self, data: VoucherChangeContent, auth: AuthSchema) -> bool:
  321. """处理凭证变动通知
  322. 通知中已携带 pay_no 字段,直接用 pay_no 查询账单详情并更新凭证信息。
  323. """
  324. log.info(
  325. f"凭证变动: pay_no={data.pay_no}, voucher_type={data.voucher_type}, "
  326. f"enterprise_id={data.enterprise_id}, notify_reason={data.notify_reason}"
  327. )
  328. if not data.pay_no:
  329. log.warning(f"凭证变动通知缺少 pay_no: voucher_type={data.voucher_type}")
  330. return True
  331. try:
  332. detail = await self._query_bill_detail(data.pay_no, data.enterprise_id or "", auth)
  333. except Exception as e:
  334. log.warning(f"查询账单详情失败(不影响主流程): {e}")
  335. return True
  336. if detail:
  337. await self._update_voucher_info(detail, auth)
  338. return True
  339. async def _update_voucher_info(self, detail: dict, auth: AuthSchema) -> None:
  340. """更新凭证信息"""
  341. voucher_crud = BillVoucherCRUD(auth)
  342. voucher_list = detail.get("voucher_list")
  343. if not voucher_list:
  344. return
  345. pay_no = detail.get("pay_no")
  346. for voucher in voucher_list:
  347. voucher_id = voucher.get("voucher_id")
  348. if not voucher_id:
  349. continue
  350. voucher_data = {
  351. "pay_no": pay_no,
  352. "voucher_type": voucher.get("voucher_type"),
  353. "voucher_status": voucher.get("voucher_status"),
  354. "invoice_code": voucher.get("invoice_code"),
  355. "invoice_no": voucher.get("invoice_no"),
  356. "invoice_amount": Decimal(voucher.get("invoice_amount")) if voucher.get("invoice_amount") else None,
  357. "tax_amount": Decimal(voucher.get("tax_amount")) if voucher.get("tax_amount") else None,
  358. "issue_date": datetime.strptime(voucher.get("issue_date"), "%Y-%m-%d") if voucher.get("issue_date") else None,
  359. "check_code": voucher.get("check_code"),
  360. "pdf_url": voucher.get("pdf_url"),
  361. }
  362. await voucher_crud.create_or_update(voucher_id, voucher_data)
  363. log.info(f"更新凭证信息成功: voucher_count={len(voucher_list)}")