# bill_handler.py 18 KB
import asyncio
from datetime import datetime
from decimal import Decimal

from redis.asyncio import Redis

from app.api.v1.module_system.auth.schema import AuthSchema
from app.core.logger import log
from app.plugin.module_payment.account.service import AccountService
from app.core.alipay import AlipayClient
from ..schemas import ConsumeChangeContent, VoucherChangeContent
from ..model import PayBillModel, PayBillOrderModel, PayBillVoucherModel
from ..crud import BillCRUD, BillOrderCRUD, BillVoucherCRUD
from .base_handler import BaseHandler
from ...openapi.service import OpenTransferService
  13. class BillHandler(BaseHandler[dict]):
  14. """账单变动通知处理器"""
  15. async def handle(self, method: str, content: dict, auth: AuthSchema, redis: Redis) -> bool:
  16. """
  17. 处理账单变动通知
  18. 流程:
  19. 1. 保存账单基础数据
  20. 2. 调用 alipay.commerce.ec.consume.detail.query 查询详情
  21. 3. 保存账单和凭证详情数据
  22. """
  23. try:
  24. notify_data = ConsumeChangeContent(**content)
  25. except Exception as e:
  26. log.error(f"解析账单通知内容失败: {e}")
  27. return False
  28. try:
  29. return await self._process_bill(notify_data, auth)
  30. except Exception as e:
  31. log.error(f"处理账单变动通知异常: {e}", exc_info=True)
  32. return False
  33. async def _process_bill(self, data: ConsumeChangeContent, auth: AuthSchema) -> bool:
  34. """处理账单变动通知
  35. 支付宝文档: alipay.commerce.ec.consume.change.notify
  36. consume_type 标准枚举: CONSUME(消费), REFUND(退款)
  37. notify_reason 标准枚举: COLLECT(归集), SUMMARY_SUCCESS(汇总成功), ORDER_COMPLETE(完结)
  38. 注: TRANSFER(转账) 为支付宝企业码扩展值,不在标准文档枚举中
  39. """
  40. log.info(
  41. f"账单变动: pay_no={data.pay_no}, "
  42. f"enterprise_id={data.enterprise_id}, "
  43. f"consume_amount={data.consume_amount}, "
  44. f"consume_type={data.consume_type}, "
  45. f"notify_reason={data.notify_reason}"
  46. )
  47. # ========== 标准消费/退款通知:落库 ==========
  48. if data.consume_type in ("CONSUME", "REFUND"):
  49. await self._save_bill_base(data, auth)
  50. try:
  51. detail = await self._query_bill_detail(data.pay_no, data.enterprise_id, auth)
  52. except Exception as e:
  53. log.warning(f"查询账单详情失败(不影响主流程): {e}")
  54. detail = None
  55. if detail:
  56. await self._save_bill_detail(detail, auth)
  57. # 同步更新本地额度
  58. try:
  59. await self._sync_expense_quota(data, auth)
  60. except Exception as e:
  61. log.warning(f"同步额度失败(不影响主流程): {e}")
  62. # ========== 转账通知(企业码扩展) ==========
  63. elif data.consume_type == "TRANSFER":
  64. from app.plugin.module_payment.account.enums import TransferStatusEnum
  65. reason = (data.notify_reason or "").upper()
  66. if "SUCCESS" in reason:
  67. status = TransferStatusEnum.SUCCESS.value
  68. elif "FAIL" in reason:
  69. status = TransferStatusEnum.FAIL.value
  70. else:
  71. status = TransferStatusEnum.DEALING.value
  72. log.warning(f"转账状态无法判断: pay_no={data.pay_no}, notify_reason={data.notify_reason}")
  73. await AccountService.update_transfer_status_service(
  74. auth, data.pay_no, status, data.model_dump(exclude_none=True)
  75. )
  76. # 无论成功/失败都回调商户,告知最终结果
  77. await OpenTransferService.open_return_service(auth, data.pay_no)
  78. else:
  79. log.info(f"未知账单类型: consume_type={data.consume_type}, pay_no={data.pay_no}")
  80. return True
  81. async def _save_bill_base(self, data: ConsumeChangeContent, auth: AuthSchema) -> None:
  82. """保存账单基础数据"""
  83. bill_crud = BillCRUD(auth)
  84. bill_data = {
  85. "enterprise_id": data.enterprise_id,
  86. "account_id": data.account_id,
  87. "employee_id": data.employee_id,
  88. "consume_type": data.consume_type,
  89. "consume_amount": Decimal(data.consume_amount) if data.consume_amount else None,
  90. "gmt_biz_create": datetime.strptime(data.gmt_biz_create, "%Y-%m-%d %H:%M:%S") if data.gmt_biz_create else None,
  91. "gmt_recieve_pay": datetime.strptime(data.gmt_recieve_pay, "%Y-%m-%d %H:%M:%S") if data.gmt_recieve_pay else None,
  92. "peer_pay_amount": Decimal(data.peer_pay_amount) if data.peer_pay_amount else None,
  93. "notify_reason": data.notify_reason,
  94. "notify_msg": data.notify_msg,
  95. "related_pay_no": data.related_pay_no,
  96. "expense_rule_group_id": data.expense_rule_group_id,
  97. "expense_scene_code": data.expense_scene_code,
  98. "expense_type": data.expense_type,
  99. "status": "NEW",
  100. }
  101. await bill_crud.create_or_update(data.pay_no, bill_data)
  102. log.info(f"保存账单基础数据成功: pay_no={data.pay_no}")
  103. async def _sync_expense_quota(self, data: ConsumeChangeContent, auth: AuthSchema) -> None:
  104. """消费/退款时同步更新本地额度(pay_expense_quota)
  105. 通过 expense_rule_group_id → pay_expense_rule.rule_id 找到 institution_id,
  106. 然后按 employee_id + institution_id 更新对应额度的可用金额。
  107. """
  108. if not data.expense_rule_group_id:
  109. return
  110. expense_amount = Decimal(data.consume_amount) if data.consume_amount else Decimal("0")
  111. if expense_amount <= 0:
  112. return
  113. # 查使用规则 → 找到制度ID
  114. from app.plugin.module_payment.expense.rule.model import ExpenseRuleModel
  115. from app.plugin.module_payment.expense.quota.model import QuotaModel
  116. from app.plugin.module_payment.expense.quota.enums import QuotaStatusEnum
  117. from sqlalchemy import select, update as sa_update
  118. rule_stmt = select(ExpenseRuleModel).where(
  119. ExpenseRuleModel.rule_id == data.expense_rule_group_id
  120. )
  121. rule_result = await auth.db.execute(rule_stmt)
  122. rule = rule_result.scalar_one_or_none()
  123. if not rule:
  124. log.debug(f"未找到对应使用规则: rule_id={data.expense_rule_group_id}")
  125. return
  126. institution_id = rule.institution_id
  127. if not institution_id:
  128. return
  129. # 查该员工在该制度下的额度
  130. quota_stmt = select(QuotaModel).where(
  131. QuotaModel.employee_id == data.employee_id,
  132. QuotaModel.institution_id == institution_id,
  133. )
  134. quota_result = await auth.db.execute(quota_stmt)
  135. quota = quota_result.scalar_one_or_none()
  136. if not quota:
  137. log.debug(f"未找到额度记录: employee_id={data.employee_id}, institution_id={institution_id}")
  138. return
  139. # 更新可用额度
  140. if data.consume_type == "REFUND":
  141. # 退款:增加可用额度
  142. new_available = (quota.available_amount or Decimal("0")) + expense_amount
  143. else:
  144. # 消费:减少可用额度
  145. new_available = (quota.available_amount or Decimal("0")) - expense_amount
  146. if new_available < 0:
  147. new_available = Decimal("0")
  148. new_status = QuotaStatusEnum.QUOTA_EXHAUSTED.value if new_available <= 0 else QuotaStatusEnum.QUOTA_ACTIVE.value
  149. upd = (
  150. sa_update(QuotaModel)
  151. .where(QuotaModel.id == quota.id)
  152. .values(available_amount=new_available, status=new_status)
  153. )
  154. await auth.db.execute(upd)
  155. await auth.db.flush()
  156. log.info(
  157. f"消费通知同步额度: employee_id={data.employee_id}, "
  158. f"institution_id={institution_id}, "
  159. f"consume_type={data.consume_type}, "
  160. f"amount={expense_amount}, "
  161. f"available_amount={new_available}, status={new_status}"
  162. )
  163. async def _query_bill_detail(self, pay_no: str, enterprise_id: str, auth: AuthSchema) -> dict | None:
  164. """调用 alipay.commerce.ec.consume.detail.query 查询账单详情"""
  165. from alipay.aop.api.request.AlipayCommerceEcConsumeDetailQueryRequest import (
  166. AlipayCommerceEcConsumeDetailQueryRequest,
  167. )
  168. from alipay.aop.api.response.AlipayCommerceEcConsumeDetailQueryResponse import (
  169. AlipayCommerceEcConsumeDetailQueryResponse,
  170. )
  171. from alipay.aop.api.domain.AlipayCommerceEcConsumeDetailQueryModel import AlipayCommerceEcConsumeDetailQueryModel
  172. model = AlipayCommerceEcConsumeDetailQueryModel()
  173. model.pay_no = pay_no
  174. model.enterprise_id = enterprise_id
  175. request = AlipayCommerceEcConsumeDetailQueryRequest()
  176. request.biz_model = model
  177. client = AlipayClient.get_client()
  178. response = client.execute(request)
  179. if not response:
  180. log.error("查询账单详情失败: 无响应")
  181. return None
  182. result = AlipayCommerceEcConsumeDetailQueryResponse()
  183. result.parse_response_content(response)
  184. if not result.is_success():
  185. log.error(f"查询账单详情失败: {result.msg}")
  186. return None
  187. # 解析响应为字典
  188. return {
  189. "pay_no": getattr(result.consume_info, "pay_no", None),
  190. "enterprise_id": getattr(result.consume_info, "enterprise_id", None),
  191. "employee_id": getattr(result.consume_info, "employee_id", None),
  192. "employee_name": getattr(result.consume_info, "employee_name", None),
  193. "consume_amount": getattr(result, "consume_amount", None),
  194. "peer_pay_amount": getattr(result, "peer_pay_amount", None),
  195. "employee_pay_amount": getattr(result, "employee_pay_amount", None),
  196. "order_info": self._parse_order_info(result),
  197. "voucher_list": self._parse_voucher_list(result),
  198. }
  199. def _parse_order_info(self, result) -> dict | None:
  200. """解析订单信息"""
  201. order_info = getattr(result, "order_info", None)
  202. if not order_info:
  203. return None
  204. return {
  205. "order_no": getattr(order_info, "order_no", None),
  206. "trade_no": getattr(order_info, "trade_no", None),
  207. "product_code": getattr(order_info, "product_code", None),
  208. "order_title": getattr(order_info, "order_title", None),
  209. "order_amount": getattr(order_info, "order_amount", None),
  210. "order_status": getattr(order_info, "order_status", None),
  211. "merchant_name": getattr(order_info, "merchant_name", None),
  212. "merchant_id": getattr(order_info, "merchant_id", None),
  213. "shop_name": getattr(order_info, "shop_name", None),
  214. "gmt_payment": getattr(order_info, "gmt_payment", None),
  215. "fund_channel": getattr(order_info, "fund_channel", None),
  216. }
  217. def _parse_voucher_list(self, result) -> list | None:
  218. """解析凭证列表"""
  219. voucher_list = getattr(result, "voucher_list", None)
  220. if not voucher_list:
  221. return None
  222. vouchers = []
  223. for voucher in voucher_list:
  224. vouchers.append({
  225. "voucher_id": getattr(voucher, "voucher_id", None),
  226. "voucher_type": getattr(voucher, "voucher_type", None),
  227. "voucher_status": getattr(voucher, "voucher_status", None),
  228. "invoice_code": getattr(voucher, "invoice_code", None),
  229. "invoice_no": getattr(voucher, "invoice_no", None),
  230. "invoice_amount": getattr(voucher, "invoice_amount", None),
  231. "tax_amount": getattr(voucher, "tax_amount", None),
  232. "issue_date": getattr(voucher, "issue_date", None),
  233. "check_code": getattr(voucher, "check_code", None),
  234. "pdf_url": getattr(voucher, "pdf_url", None),
  235. })
  236. return vouchers
  237. async def _save_bill_detail(self, detail: dict, auth: AuthSchema) -> None:
  238. """保存账单详情和凭证数据"""
  239. bill_crud = BillCRUD(auth)
  240. pay_no = detail.get("pay_no")
  241. if not pay_no:
  242. return
  243. # 更新账单详情
  244. await bill_crud.update_by_pay_no(pay_no, {"status": "PROCESSED", "ext_infos": detail})
  245. # 保存订单详情
  246. order_info = detail.get("order_info")
  247. if order_info:
  248. await self._save_order_detail(pay_no, order_info, auth)
  249. # 保存凭证列表
  250. voucher_list = detail.get("voucher_list")
  251. if voucher_list:
  252. await self._save_voucher_list(pay_no, voucher_list, auth)
  253. log.info(f"保存账单详情成功: pay_no={pay_no}")
  254. async def _save_order_detail(self, pay_no: str, order_info: dict, auth: AuthSchema) -> None:
  255. """保存订单详情"""
  256. order_crud = BillOrderCRUD(auth)
  257. order_no = order_info.get("order_no")
  258. if not order_no:
  259. return
  260. order_data = {
  261. "pay_no": pay_no,
  262. "trade_no": order_info.get("trade_no"),
  263. "product_code": order_info.get("product_code"),
  264. "order_title": order_info.get("order_title"),
  265. "order_amount": Decimal(order_info.get("order_amount")) if order_info.get("order_amount") else None,
  266. "order_status": order_info.get("order_status"),
  267. "merchant_name": order_info.get("merchant_name"),
  268. "merchant_id": order_info.get("merchant_id"),
  269. "shop_name": order_info.get("shop_name"),
  270. "gmt_payment": datetime.strptime(order_info.get("gmt_payment"), "%Y-%m-%d %H:%M:%S") if order_info.get("gmt_payment") else None,
  271. "fund_channel": order_info.get("fund_channel"),
  272. }
  273. await order_crud.create_or_update(order_no, order_data)
  274. async def _save_voucher_list(self, pay_no: str, voucher_list: list, auth: AuthSchema) -> None:
  275. """保存凭证列表"""
  276. voucher_crud = BillVoucherCRUD(auth)
  277. for voucher in voucher_list:
  278. voucher_id = voucher.get("voucher_id")
  279. if not voucher_id:
  280. continue
  281. voucher_data = {
  282. "pay_no": pay_no,
  283. "voucher_type": voucher.get("voucher_type"),
  284. "voucher_status": voucher.get("voucher_status"),
  285. "invoice_code": voucher.get("invoice_code"),
  286. "invoice_no": voucher.get("invoice_no"),
  287. "invoice_amount": Decimal(voucher.get("invoice_amount")) if voucher.get("invoice_amount") else None,
  288. "tax_amount": Decimal(voucher.get("tax_amount")) if voucher.get("tax_amount") else None,
  289. "issue_date": datetime.strptime(voucher.get("issue_date"), "%Y-%m-%d") if voucher.get("issue_date") else None,
  290. "check_code": voucher.get("check_code"),
  291. "pdf_url": voucher.get("pdf_url"),
  292. }
  293. await voucher_crud.create_or_update(voucher_id, voucher_data)
  294. class VoucherHandler(BaseHandler[dict]):
  295. """凭证变动通知处理器"""
  296. async def handle(self, method: str, content: dict, auth: AuthSchema, redis: Redis) -> bool:
  297. """
  298. 处理凭证变动通知
  299. 流程:
  300. 1. 调用 alipay.commerce.ec.consume.detail.query 查询详情
  301. 2. 更新凭证信息
  302. """
  303. try:
  304. notify_data = VoucherChangeContent(**content)
  305. except Exception as e:
  306. log.error(f"解析凭证通知内容失败: {e}")
  307. return False
  308. action = notify_data.action
  309. voucher_id = notify_data.voucher_id
  310. log.info(f"处理凭证变动通知: action={action}, voucher_id={voucher_id}")
  311. try:
  312. return await self._process_voucher(notify_data, auth)
  313. except Exception as e:
  314. log.error(f"处理凭证变动通知异常: {e}", exc_info=True)
  315. return False
  316. async def _process_voucher(self, data: VoucherChangeContent, auth: AuthSchema) -> bool:
  317. """处理凭证变动通知
  318. 通知中已携带 pay_no 字段,直接用 pay_no 查询账单详情并更新凭证信息。
  319. """
  320. log.info(
  321. f"凭证变动: pay_no={data.pay_no}, voucher_type={data.voucher_type}, "
  322. f"enterprise_id={data.enterprise_id}, notify_reason={data.notify_reason}"
  323. )
  324. if not data.pay_no:
  325. log.warning(f"凭证变动通知缺少 pay_no: voucher_type={data.voucher_type}")
  326. return True
  327. try:
  328. detail = await self._query_bill_detail(data.pay_no, data.enterprise_id or "", auth)
  329. except Exception as e:
  330. log.warning(f"查询账单详情失败(不影响主流程): {e}")
  331. return True
  332. if detail:
  333. await self._update_voucher_info(detail, auth)
  334. return True
  335. async def _update_voucher_info(self, detail: dict, auth: AuthSchema) -> None:
  336. """更新凭证信息"""
  337. voucher_crud = BillVoucherCRUD(auth)
  338. voucher_list = detail.get("voucher_list")
  339. if not voucher_list:
  340. return
  341. pay_no = detail.get("pay_no")
  342. for voucher in voucher_list:
  343. voucher_id = voucher.get("voucher_id")
  344. if not voucher_id:
  345. continue
  346. voucher_data = {
  347. "pay_no": pay_no,
  348. "voucher_type": voucher.get("voucher_type"),
  349. "voucher_status": voucher.get("voucher_status"),
  350. "invoice_code": voucher.get("invoice_code"),
  351. "invoice_no": voucher.get("invoice_no"),
  352. "invoice_amount": Decimal(voucher.get("invoice_amount")) if voucher.get("invoice_amount") else None,
  353. "tax_amount": Decimal(voucher.get("tax_amount")) if voucher.get("tax_amount") else None,
  354. "issue_date": datetime.strptime(voucher.get("issue_date"), "%Y-%m-%d") if voucher.get("issue_date") else None,
  355. "check_code": voucher.get("check_code"),
  356. "pdf_url": voucher.get("pdf_url"),
  357. }
  358. await voucher_crud.create_or_update(voucher_id, voucher_data)
  359. log.info(f"更新凭证信息成功: voucher_count={len(voucher_list)}")