# bill_handler.py
import asyncio
from datetime import datetime
from decimal import Decimal

from redis.asyncio import Redis

from app.api.v1.module_system.auth.schema import AuthSchema
from app.core.logger import log
from app.plugin.module_payment.account.service import AccountService
from app.core.alipay import AlipayClient

from ..schemas import ConsumeChangeContent, VoucherChangeContent
from ..model import PayBillModel, PayBillOrderModel, PayBillVoucherModel
from ..crud import BillCRUD, BillOrderCRUD, BillVoucherCRUD
from .base_handler import BaseHandler
from ...openapi.service import OpenTransferService
  13. class BillHandler(BaseHandler[dict]):
  14. """账单变动通知处理器"""
  15. async def handle(self, method: str, content: dict, auth: AuthSchema, redis: Redis) -> bool:
  16. """
  17. 处理账单变动通知
  18. 流程:
  19. 1. 保存账单基础数据
  20. 2. 调用 alipay.commerce.ec.consume.detail.query 查询详情
  21. 3. 保存账单和凭证详情数据
  22. """
  23. try:
  24. notify_data = ConsumeChangeContent(**content)
  25. except Exception as e:
  26. log.error(f"解析账单通知内容失败: {e}")
  27. return False
  28. try:
  29. return await self._process_bill(notify_data, auth)
  30. except Exception as e:
  31. log.error(f"处理账单变动通知异常: {e}", exc_info=True)
  32. return False
  33. async def _process_bill(self, data: ConsumeChangeContent, auth: AuthSchema) -> bool:
  34. """处理账单变动通知
  35. 支付宝文档: alipay.commerce.ec.consume.change.notify
  36. consume_type 标准枚举: CONSUME(消费), REFUND(退款)
  37. notify_reason 标准枚举: COLLECT(归集), SUMMARY_SUCCESS(汇总成功), ORDER_COMPLETE(完结)
  38. 注: TRANSFER(转账) 为支付宝企业码扩展值,不在标准文档枚举中
  39. """
  40. log.info(
  41. f"账单变动: pay_no={data.pay_no}, "
  42. f"enterprise_id={data.enterprise_id}, "
  43. f"consume_amount={data.consume_amount}, "
  44. f"consume_type={data.consume_type}, "
  45. f"notify_reason={data.notify_reason}"
  46. )
  47. # ========== 标准消费/退款通知:落库 ==========
  48. if data.consume_type in ("CONSUME", "REFUND"):
  49. await self._save_bill_base(data, auth)
  50. try:
  51. detail = await self._query_bill_detail(data.pay_no, data.enterprise_id, auth)
  52. except Exception as e:
  53. log.warning(f"查询账单详情失败(不影响主流程): {e}")
  54. detail = None
  55. if detail:
  56. await self._save_bill_detail(detail, auth)
  57. # 同步更新本地额度
  58. try:
  59. await self._sync_expense_quota(data, auth)
  60. except Exception as e:
  61. log.warning(f"同步额度失败(不影响主流程): {e}")
  62. # ========== 转账通知(企业码扩展) ==========
  63. elif data.consume_type == "TRANSFER":
  64. from app.plugin.module_payment.account.enums import TransferStatusEnum
  65. reason = (data.notify_reason or "").upper()
  66. if "SUCCESS" in reason:
  67. status = TransferStatusEnum.SUCCESS.value
  68. elif "FAIL" in reason:
  69. status = TransferStatusEnum.FAIL.value
  70. else:
  71. # 无法判断状态时不更新(避免把已成功的状态改成DEALING)
  72. log.warning(f"转账状态无法判断,跳过更新: pay_no={data.pay_no}, notify_reason={data.notify_reason}")
  73. return True
  74. await AccountService.update_transfer_status_service(
  75. auth, data.pay_no, status, data.model_dump(exclude_none=True)
  76. )
  77. # 无论成功/失败都回调商户,告知最终结果
  78. await OpenTransferService.open_return_service(auth, data.pay_no)
  79. else:
  80. log.info(f"未知账单类型: consume_type={data.consume_type}, pay_no={data.pay_no}")
  81. return True
  82. async def _save_bill_base(self, data: ConsumeChangeContent, auth: AuthSchema) -> None:
  83. """保存账单基础数据"""
  84. bill_crud = BillCRUD(auth)
  85. bill_data = {
  86. "enterprise_id": data.enterprise_id,
  87. "account_id": data.account_id,
  88. "employee_id": data.employee_id,
  89. "consume_type": data.consume_type,
  90. "consume_amount": Decimal(data.consume_amount) if data.consume_amount else None,
  91. "gmt_biz_create": datetime.strptime(data.gmt_biz_create, "%Y-%m-%d %H:%M:%S") if data.gmt_biz_create else None,
  92. "gmt_recieve_pay": datetime.strptime(data.gmt_recieve_pay, "%Y-%m-%d %H:%M:%S") if data.gmt_recieve_pay else None,
  93. "peer_pay_amount": Decimal(data.peer_pay_amount) if data.peer_pay_amount else None,
  94. "notify_reason": data.notify_reason,
  95. "notify_msg": data.notify_msg,
  96. "related_pay_no": data.related_pay_no,
  97. "expense_rule_group_id": data.expense_rule_group_id,
  98. "expense_scene_code": data.expense_scene_code,
  99. "expense_type": data.expense_type,
  100. "status": "NEW",
  101. }
  102. await bill_crud.create_or_update(data.pay_no, bill_data)
  103. log.info(f"保存账单基础数据成功: pay_no={data.pay_no}")
  104. async def _sync_expense_quota(self, data: ConsumeChangeContent, auth: AuthSchema) -> None:
  105. """消费/退款时同步更新本地额度(pay_expense_quota)
  106. 通过 expense_rule_group_id → pay_expense_rule.rule_id 找到 institution_id,
  107. 然后按 employee_id + institution_id 更新对应额度的可用金额。
  108. """
  109. if not data.expense_rule_group_id:
  110. return
  111. expense_amount = Decimal(data.consume_amount) if data.consume_amount else Decimal("0")
  112. if expense_amount <= 0:
  113. return
  114. # 查使用规则 → 找到制度ID
  115. from app.plugin.module_payment.expense.rule.model import ExpenseRuleModel
  116. from app.plugin.module_payment.expense.quota.model import QuotaModel
  117. from app.plugin.module_payment.expense.quota.enums import QuotaStatusEnum
  118. from sqlalchemy import select, update as sa_update
  119. rule_stmt = select(ExpenseRuleModel).where(
  120. ExpenseRuleModel.rule_id == data.expense_rule_group_id
  121. )
  122. rule_result = await auth.db.execute(rule_stmt)
  123. rule = rule_result.scalar_one_or_none()
  124. if not rule:
  125. log.debug(f"未找到对应使用规则: rule_id={data.expense_rule_group_id}")
  126. return
  127. institution_id = rule.institution_id
  128. if not institution_id:
  129. return
  130. # 查该员工在该制度下的额度
  131. quota_stmt = select(QuotaModel).where(
  132. QuotaModel.employee_id == data.employee_id,
  133. QuotaModel.institution_id == institution_id,
  134. )
  135. quota_result = await auth.db.execute(quota_stmt)
  136. quota = quota_result.scalar_one_or_none()
  137. if not quota:
  138. log.debug(f"未找到额度记录: employee_id={data.employee_id}, institution_id={institution_id}")
  139. return
  140. # 更新可用额度
  141. if data.consume_type == "REFUND":
  142. # 退款:增加可用额度
  143. new_available = (quota.available_amount or Decimal("0")) + expense_amount
  144. else:
  145. # 消费:减少可用额度
  146. new_available = (quota.available_amount or Decimal("0")) - expense_amount
  147. if new_available < 0:
  148. new_available = Decimal("0")
  149. new_status = QuotaStatusEnum.QUOTA_EXHAUSTED.value if new_available <= 0 else QuotaStatusEnum.QUOTA_ACTIVE.value
  150. upd = (
  151. sa_update(QuotaModel)
  152. .where(QuotaModel.id == quota.id)
  153. .values(available_amount=new_available, status=new_status)
  154. )
  155. await auth.db.execute(upd)
  156. await auth.db.flush()
  157. log.info(
  158. f"消费通知同步额度: employee_id={data.employee_id}, "
  159. f"institution_id={institution_id}, "
  160. f"consume_type={data.consume_type}, "
  161. f"amount={expense_amount}, "
  162. f"available_amount={new_available}, status={new_status}"
  163. )
  164. async def _query_bill_detail(self, pay_no: str, enterprise_id: str, auth: AuthSchema) -> dict | None:
  165. """调用 alipay.commerce.ec.consume.detail.query 查询账单详情"""
  166. from alipay.aop.api.request.AlipayCommerceEcConsumeDetailQueryRequest import (
  167. AlipayCommerceEcConsumeDetailQueryRequest,
  168. )
  169. from alipay.aop.api.response.AlipayCommerceEcConsumeDetailQueryResponse import (
  170. AlipayCommerceEcConsumeDetailQueryResponse,
  171. )
  172. from alipay.aop.api.domain.AlipayCommerceEcConsumeDetailQueryModel import AlipayCommerceEcConsumeDetailQueryModel
  173. model = AlipayCommerceEcConsumeDetailQueryModel()
  174. model.pay_no = pay_no
  175. model.enterprise_id = enterprise_id
  176. request = AlipayCommerceEcConsumeDetailQueryRequest()
  177. request.biz_model = model
  178. client = AlipayClient.get_client()
  179. response = client.execute(request)
  180. if not response:
  181. log.error("查询账单详情失败: 无响应")
  182. return None
  183. result = AlipayCommerceEcConsumeDetailQueryResponse()
  184. result.parse_response_content(response)
  185. if not result.is_success():
  186. log.error(f"查询账单详情失败: {result.msg}")
  187. return None
  188. # 解析响应为字典
  189. return {
  190. "pay_no": getattr(result.consume_info, "pay_no", None),
  191. "enterprise_id": getattr(result.consume_info, "enterprise_id", None),
  192. "employee_id": getattr(result.consume_info, "employee_id", None),
  193. "employee_name": getattr(result.consume_info, "employee_name", None),
  194. "consume_amount": getattr(result, "consume_amount", None),
  195. "peer_pay_amount": getattr(result, "peer_pay_amount", None),
  196. "employee_pay_amount": getattr(result, "employee_pay_amount", None),
  197. "order_info": self._parse_order_info(result),
  198. "voucher_list": self._parse_voucher_list(result),
  199. }
  200. def _parse_order_info(self, result) -> dict | None:
  201. """解析订单信息"""
  202. order_info = getattr(result, "order_info", None)
  203. if not order_info:
  204. return None
  205. return {
  206. "order_no": getattr(order_info, "order_no", None),
  207. "trade_no": getattr(order_info, "trade_no", None),
  208. "product_code": getattr(order_info, "product_code", None),
  209. "order_title": getattr(order_info, "order_title", None),
  210. "order_amount": getattr(order_info, "order_amount", None),
  211. "order_status": getattr(order_info, "order_status", None),
  212. "merchant_name": getattr(order_info, "merchant_name", None),
  213. "merchant_id": getattr(order_info, "merchant_id", None),
  214. "shop_name": getattr(order_info, "shop_name", None),
  215. "gmt_payment": getattr(order_info, "gmt_payment", None),
  216. "fund_channel": getattr(order_info, "fund_channel", None),
  217. }
  218. def _parse_voucher_list(self, result) -> list | None:
  219. """解析凭证列表"""
  220. voucher_list = getattr(result, "voucher_list", None)
  221. if not voucher_list:
  222. return None
  223. vouchers = []
  224. for voucher in voucher_list:
  225. vouchers.append({
  226. "voucher_id": getattr(voucher, "voucher_id", None),
  227. "voucher_type": getattr(voucher, "voucher_type", None),
  228. "voucher_status": getattr(voucher, "voucher_status", None),
  229. "invoice_code": getattr(voucher, "invoice_code", None),
  230. "invoice_no": getattr(voucher, "invoice_no", None),
  231. "invoice_amount": getattr(voucher, "invoice_amount", None),
  232. "tax_amount": getattr(voucher, "tax_amount", None),
  233. "issue_date": getattr(voucher, "issue_date", None),
  234. "check_code": getattr(voucher, "check_code", None),
  235. "pdf_url": getattr(voucher, "pdf_url", None),
  236. })
  237. return vouchers
  238. async def _save_bill_detail(self, detail: dict, auth: AuthSchema) -> None:
  239. """保存账单详情和凭证数据"""
  240. bill_crud = BillCRUD(auth)
  241. pay_no = detail.get("pay_no")
  242. if not pay_no:
  243. return
  244. # 更新账单详情
  245. await bill_crud.update_by_pay_no(pay_no, {"status": "PROCESSED", "ext_infos": detail})
  246. # 保存订单详情
  247. order_info = detail.get("order_info")
  248. if order_info:
  249. await self._save_order_detail(pay_no, order_info, auth)
  250. # 保存凭证列表
  251. voucher_list = detail.get("voucher_list")
  252. if voucher_list:
  253. await self._save_voucher_list(pay_no, voucher_list, auth)
  254. log.info(f"保存账单详情成功: pay_no={pay_no}")
  255. async def _save_order_detail(self, pay_no: str, order_info: dict, auth: AuthSchema) -> None:
  256. """保存订单详情"""
  257. order_crud = BillOrderCRUD(auth)
  258. order_no = order_info.get("order_no")
  259. if not order_no:
  260. return
  261. order_data = {
  262. "pay_no": pay_no,
  263. "trade_no": order_info.get("trade_no"),
  264. "product_code": order_info.get("product_code"),
  265. "order_title": order_info.get("order_title"),
  266. "order_amount": Decimal(order_info.get("order_amount")) if order_info.get("order_amount") else None,
  267. "order_status": order_info.get("order_status"),
  268. "merchant_name": order_info.get("merchant_name"),
  269. "merchant_id": order_info.get("merchant_id"),
  270. "shop_name": order_info.get("shop_name"),
  271. "gmt_payment": datetime.strptime(order_info.get("gmt_payment"), "%Y-%m-%d %H:%M:%S") if order_info.get("gmt_payment") else None,
  272. "fund_channel": order_info.get("fund_channel"),
  273. }
  274. await order_crud.create_or_update(order_no, order_data)
  275. async def _save_voucher_list(self, pay_no: str, voucher_list: list, auth: AuthSchema) -> None:
  276. """保存凭证列表"""
  277. voucher_crud = BillVoucherCRUD(auth)
  278. for voucher in voucher_list:
  279. voucher_id = voucher.get("voucher_id")
  280. if not voucher_id:
  281. continue
  282. voucher_data = {
  283. "pay_no": pay_no,
  284. "voucher_type": voucher.get("voucher_type"),
  285. "voucher_status": voucher.get("voucher_status"),
  286. "invoice_code": voucher.get("invoice_code"),
  287. "invoice_no": voucher.get("invoice_no"),
  288. "invoice_amount": Decimal(voucher.get("invoice_amount")) if voucher.get("invoice_amount") else None,
  289. "tax_amount": Decimal(voucher.get("tax_amount")) if voucher.get("tax_amount") else None,
  290. "issue_date": datetime.strptime(voucher.get("issue_date"), "%Y-%m-%d") if voucher.get("issue_date") else None,
  291. "check_code": voucher.get("check_code"),
  292. "pdf_url": voucher.get("pdf_url"),
  293. }
  294. await voucher_crud.create_or_update(voucher_id, voucher_data)
  295. class VoucherHandler(BaseHandler[dict]):
  296. """凭证变动通知处理器"""
  297. async def handle(self, method: str, content: dict, auth: AuthSchema, redis: Redis) -> bool:
  298. """
  299. 处理凭证变动通知
  300. 流程:
  301. 1. 调用 alipay.commerce.ec.consume.detail.query 查询详情
  302. 2. 更新凭证信息
  303. """
  304. try:
  305. notify_data = VoucherChangeContent(**content)
  306. except Exception as e:
  307. log.error(f"解析凭证通知内容失败: {e}")
  308. return False
  309. action = notify_data.action
  310. voucher_id = notify_data.voucher_id
  311. log.info(f"处理凭证变动通知: action={action}, voucher_id={voucher_id}")
  312. try:
  313. return await self._process_voucher(notify_data, auth)
  314. except Exception as e:
  315. log.error(f"处理凭证变动通知异常: {e}", exc_info=True)
  316. return False
  317. async def _process_voucher(self, data: VoucherChangeContent, auth: AuthSchema) -> bool:
  318. """处理凭证变动通知
  319. 通知中已携带 pay_no 字段,直接用 pay_no 查询账单详情并更新凭证信息。
  320. """
  321. log.info(
  322. f"凭证变动: pay_no={data.pay_no}, voucher_type={data.voucher_type}, "
  323. f"enterprise_id={data.enterprise_id}, notify_reason={data.notify_reason}"
  324. )
  325. if not data.pay_no:
  326. log.warning(f"凭证变动通知缺少 pay_no: voucher_type={data.voucher_type}")
  327. return True
  328. try:
  329. detail = await self._query_bill_detail(data.pay_no, data.enterprise_id or "", auth)
  330. except Exception as e:
  331. log.warning(f"查询账单详情失败(不影响主流程): {e}")
  332. return True
  333. if detail:
  334. await self._update_voucher_info(detail, auth)
  335. return True
  336. async def _update_voucher_info(self, detail: dict, auth: AuthSchema) -> None:
  337. """更新凭证信息"""
  338. voucher_crud = BillVoucherCRUD(auth)
  339. voucher_list = detail.get("voucher_list")
  340. if not voucher_list:
  341. return
  342. pay_no = detail.get("pay_no")
  343. for voucher in voucher_list:
  344. voucher_id = voucher.get("voucher_id")
  345. if not voucher_id:
  346. continue
  347. voucher_data = {
  348. "pay_no": pay_no,
  349. "voucher_type": voucher.get("voucher_type"),
  350. "voucher_status": voucher.get("voucher_status"),
  351. "invoice_code": voucher.get("invoice_code"),
  352. "invoice_no": voucher.get("invoice_no"),
  353. "invoice_amount": Decimal(voucher.get("invoice_amount")) if voucher.get("invoice_amount") else None,
  354. "tax_amount": Decimal(voucher.get("tax_amount")) if voucher.get("tax_amount") else None,
  355. "issue_date": datetime.strptime(voucher.get("issue_date"), "%Y-%m-%d") if voucher.get("issue_date") else None,
  356. "check_code": voucher.get("check_code"),
  357. "pdf_url": voucher.get("pdf_url"),
  358. }
  359. await voucher_crud.create_or_update(voucher_id, voucher_data)
  360. log.info(f"更新凭证信息成功: voucher_count={len(voucher_list)}")