bill_handler.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441
  1. from datetime import datetime
  2. from decimal import Decimal
  3. from redis.asyncio import Redis
  4. from app.api.v1.module_system.auth.schema import AuthSchema
  5. from app.core.logger import log
  6. from app.plugin.module_payment.account.service import AccountService
  7. from app.core.alipay import AlipayClient
  8. from ..schemas import ConsumeChangeContent, VoucherChangeContent
  9. from ..model import PayBillModel, PayBillOrderModel, PayBillVoucherModel
  10. from ..crud import BillCRUD, BillOrderCRUD, BillVoucherCRUD
  11. from .base_handler import BaseHandler
  12. from ...openapi.service import OpenTransferService
  13. class BillHandler(BaseHandler[dict]):
  14. """账单变动通知处理器"""
  15. async def handle(self, method: str, content: dict, auth: AuthSchema, redis: Redis) -> bool:
  16. """
  17. 处理账单变动通知
  18. 流程:
  19. 1. 保存账单基础数据
  20. 2. 调用 alipay.commerce.ec.consume.detail.query 查询详情
  21. 3. 保存账单和凭证详情数据
  22. """
  23. try:
  24. notify_data = ConsumeChangeContent(**content)
  25. except Exception as e:
  26. log.error(f"解析账单通知内容失败: {e}")
  27. return False
  28. try:
  29. return await self._process_bill(notify_data, auth)
  30. except Exception as e:
  31. log.error(f"处理账单变动通知异常: {e}", exc_info=True)
  32. return False
  33. async def _process_bill(self, data: ConsumeChangeContent, auth: AuthSchema) -> bool:
  34. """处理账单变动通知
  35. 支付宝文档: alipay.commerce.ec.consume.change.notify
  36. consume_type 标准枚举: CONSUME(消费), REFUND(退款)
  37. notify_reason 标准枚举: COLLECT(归集), SUMMARY_SUCCESS(汇总成功), ORDER_COMPLETE(完结)
  38. 注: TRANSFER(转账) 为支付宝企业码扩展值,不在标准文档枚举中
  39. """
  40. log.info(
  41. f"账单变动: pay_no={data.pay_no}, "
  42. f"enterprise_id={data.enterprise_id}, "
  43. f"consume_amount={data.consume_amount}, "
  44. f"consume_type={data.consume_type}, "
  45. f"notify_reason={data.notify_reason}"
  46. )
  47. # ========== 标准消费/退款通知:落库 ==========
  48. if data.consume_type in ("CONSUME", "REFUND"):
  49. await self._save_bill_base(data, auth)
  50. try:
  51. detail = await self._query_bill_detail(data.pay_no, data.enterprise_id, auth)
  52. except Exception as e:
  53. log.warning(f"查询账单详情失败(不影响主流程): {e}")
  54. detail = None
  55. if detail:
  56. await self._save_bill_detail(detail, auth)
  57. # 同步更新本地额度
  58. try:
  59. await self._sync_expense_quota(data, auth)
  60. except Exception as e:
  61. log.warning(f"同步额度失败(不影响主流程): {e}")
  62. # ========== 转账通知(企业码扩展) ==========
  63. elif data.consume_type == "TRANSFER":
  64. from app.plugin.module_payment.account.enums import TransferStatusEnum
  65. reason = (data.notify_reason or "").upper()
  66. if "SUCCESS" in reason:
  67. status = TransferStatusEnum.SUCCESS.value
  68. await AccountService.update_transfer_status_service(
  69. auth, data.pay_no, status, data.model_dump(exclude_none=True)
  70. )
  71. elif "FAIL" in reason:
  72. status = TransferStatusEnum.FAIL.value
  73. await AccountService.update_transfer_status_service(
  74. auth, data.pay_no, status, data.model_dump(exclude_none=True)
  75. )
  76. else:
  77. log.warning(f"转账状态无法判断,跳过更新: pay_no={data.pay_no}, notify_reason={data.notify_reason}")
  78. # 无论是否更新状态,都回调商户通知状态变化
  79. await OpenTransferService.open_return_service(auth, data.pay_no)
  80. else:
  81. log.info(f"未知账单类型: consume_type={data.consume_type}, pay_no={data.pay_no}")
  82. return True
  83. async def _save_bill_base(self, data: ConsumeChangeContent, auth: AuthSchema) -> None:
  84. """保存账单基础数据"""
  85. bill_crud = BillCRUD(auth)
  86. bill_data = {
  87. "enterprise_id": data.enterprise_id,
  88. "account_id": data.account_id,
  89. "employee_id": data.employee_id,
  90. "consume_type": data.consume_type,
  91. "consume_amount": Decimal(data.consume_amount) if data.consume_amount else None,
  92. "gmt_biz_create": datetime.strptime(data.gmt_biz_create, "%Y-%m-%d %H:%M:%S") if data.gmt_biz_create else None,
  93. "gmt_recieve_pay": datetime.strptime(data.gmt_recieve_pay, "%Y-%m-%d %H:%M:%S") if data.gmt_recieve_pay else None,
  94. "peer_pay_amount": Decimal(data.peer_pay_amount) if data.peer_pay_amount else None,
  95. "notify_reason": data.notify_reason,
  96. "notify_msg": data.notify_msg,
  97. "related_pay_no": data.related_pay_no,
  98. "expense_rule_group_id": data.expense_rule_group_id,
  99. "expense_scene_code": data.expense_scene_code,
  100. "expense_type": data.expense_type,
  101. "status": "NEW",
  102. }
  103. await bill_crud.create_or_update(data.pay_no, bill_data)
  104. log.info(f"保存账单基础数据成功: pay_no={data.pay_no}")
  105. async def _sync_expense_quota(self, data: ConsumeChangeContent, auth: AuthSchema) -> None:
  106. """消费/退款时同步更新本地额度(pay_expense_quota)
  107. 通过 expense_rule_group_id → pay_expense_rule.rule_id 找到 institution_id,
  108. 然后按 employee_id + institution_id 更新对应额度的可用金额。
  109. """
  110. if not data.expense_rule_group_id:
  111. return
  112. expense_amount = Decimal(data.consume_amount) if data.consume_amount else Decimal("0")
  113. if expense_amount <= 0:
  114. return
  115. # 查使用规则 → 找到制度ID
  116. from app.plugin.module_payment.expense.rule.model import ExpenseRuleModel
  117. from app.plugin.module_payment.expense.quota.model import QuotaModel
  118. from app.plugin.module_payment.expense.quota.enums import QuotaStatusEnum
  119. from sqlalchemy import select, update as sa_update
  120. rule_stmt = select(ExpenseRuleModel).where(
  121. ExpenseRuleModel.rule_id == data.expense_rule_group_id
  122. )
  123. rule_result = await auth.db.execute(rule_stmt)
  124. rule = rule_result.scalar_one_or_none()
  125. if not rule:
  126. log.debug(f"未找到对应使用规则: rule_id={data.expense_rule_group_id}")
  127. return
  128. institution_id = rule.institution_id
  129. if not institution_id:
  130. return
  131. # 查该员工在该制度下的额度
  132. quota_stmt = select(QuotaModel).where(
  133. QuotaModel.employee_id == data.employee_id,
  134. QuotaModel.institution_id == institution_id,
  135. )
  136. quota_result = await auth.db.execute(quota_stmt)
  137. quota = quota_result.scalar_one_or_none()
  138. if not quota:
  139. log.debug(f"未找到额度记录: employee_id={data.employee_id}, institution_id={institution_id}")
  140. return
  141. # 更新可用额度
  142. if data.consume_type == "REFUND":
  143. # 退款:增加可用额度
  144. new_available = (quota.available_amount or Decimal("0")) + expense_amount
  145. else:
  146. # 消费:减少可用额度
  147. new_available = (quota.available_amount or Decimal("0")) - expense_amount
  148. if new_available < 0:
  149. new_available = Decimal("0")
  150. new_status = QuotaStatusEnum.QUOTA_EXHAUSTED.value if new_available <= 0 else QuotaStatusEnum.QUOTA_ACTIVE.value
  151. upd = (
  152. sa_update(QuotaModel)
  153. .where(QuotaModel.id == quota.id)
  154. .values(available_amount=new_available, status=new_status)
  155. )
  156. await auth.db.execute(upd)
  157. await auth.db.flush()
  158. log.info(
  159. f"消费通知同步额度: employee_id={data.employee_id}, "
  160. f"institution_id={institution_id}, "
  161. f"consume_type={data.consume_type}, "
  162. f"amount={expense_amount}, "
  163. f"available_amount={new_available}, status={new_status}"
  164. )
  165. async def _query_bill_detail(self, pay_no: str, enterprise_id: str, auth: AuthSchema) -> dict | None:
  166. """调用 alipay.commerce.ec.consume.detail.query 查询账单详情"""
  167. from alipay.aop.api.request.AlipayCommerceEcConsumeDetailQueryRequest import (
  168. AlipayCommerceEcConsumeDetailQueryRequest,
  169. )
  170. from alipay.aop.api.response.AlipayCommerceEcConsumeDetailQueryResponse import (
  171. AlipayCommerceEcConsumeDetailQueryResponse,
  172. )
  173. from alipay.aop.api.domain.AlipayCommerceEcConsumeDetailQueryModel import AlipayCommerceEcConsumeDetailQueryModel
  174. model = AlipayCommerceEcConsumeDetailQueryModel()
  175. model.pay_no = pay_no
  176. model.enterprise_id = enterprise_id
  177. request = AlipayCommerceEcConsumeDetailQueryRequest()
  178. request.biz_model = model
  179. client = AlipayClient.get_client()
  180. response = client.execute(request)
  181. if not response:
  182. log.error("查询账单详情失败: 无响应")
  183. return None
  184. result = AlipayCommerceEcConsumeDetailQueryResponse()
  185. result.parse_response_content(response)
  186. if not result.is_success():
  187. log.error(f"查询账单详情失败: {result.msg}")
  188. return None
  189. # 解析响应为字典
  190. return {
  191. "pay_no": getattr(result.consume_info, "pay_no", None),
  192. "enterprise_id": getattr(result.consume_info, "enterprise_id", None),
  193. "employee_id": getattr(result.consume_info, "employee_id", None),
  194. "employee_name": getattr(result.consume_info, "employee_name", None),
  195. "consume_amount": getattr(result, "consume_amount", None),
  196. "peer_pay_amount": getattr(result, "peer_pay_amount", None),
  197. "employee_pay_amount": getattr(result, "employee_pay_amount", None),
  198. "order_info": self._parse_order_info(result),
  199. "voucher_list": self._parse_voucher_list(result),
  200. }
  201. def _parse_order_info(self, result) -> dict | None:
  202. """解析订单信息"""
  203. order_info = getattr(result, "order_info", None)
  204. if not order_info:
  205. return None
  206. return {
  207. "order_no": getattr(order_info, "order_no", None),
  208. "trade_no": getattr(order_info, "trade_no", None),
  209. "product_code": getattr(order_info, "product_code", None),
  210. "order_title": getattr(order_info, "order_title", None),
  211. "order_amount": getattr(order_info, "order_amount", None),
  212. "order_status": getattr(order_info, "order_status", None),
  213. "merchant_name": getattr(order_info, "merchant_name", None),
  214. "merchant_id": getattr(order_info, "merchant_id", None),
  215. "shop_name": getattr(order_info, "shop_name", None),
  216. "gmt_payment": getattr(order_info, "gmt_payment", None),
  217. "fund_channel": getattr(order_info, "fund_channel", None),
  218. }
  219. def _parse_voucher_list(self, result) -> list | None:
  220. """解析凭证列表"""
  221. voucher_list = getattr(result, "voucher_list", None)
  222. if not voucher_list:
  223. return None
  224. vouchers = []
  225. for voucher in voucher_list:
  226. vouchers.append({
  227. "voucher_id": getattr(voucher, "voucher_id", None),
  228. "voucher_type": getattr(voucher, "voucher_type", None),
  229. "voucher_status": getattr(voucher, "voucher_status", None),
  230. "invoice_code": getattr(voucher, "invoice_code", None),
  231. "invoice_no": getattr(voucher, "invoice_no", None),
  232. "invoice_amount": getattr(voucher, "invoice_amount", None),
  233. "tax_amount": getattr(voucher, "tax_amount", None),
  234. "issue_date": getattr(voucher, "issue_date", None),
  235. "check_code": getattr(voucher, "check_code", None),
  236. "pdf_url": getattr(voucher, "pdf_url", None),
  237. })
  238. return vouchers
  239. async def _save_bill_detail(self, detail: dict, auth: AuthSchema) -> None:
  240. """保存账单详情和凭证数据"""
  241. bill_crud = BillCRUD(auth)
  242. pay_no = detail.get("pay_no")
  243. if not pay_no:
  244. return
  245. # 更新账单详情
  246. await bill_crud.update_by_pay_no(pay_no, {"status": "PROCESSED", "ext_infos": detail})
  247. # 保存订单详情
  248. order_info = detail.get("order_info")
  249. if order_info:
  250. await self._save_order_detail(pay_no, order_info, auth)
  251. # 保存凭证列表
  252. voucher_list = detail.get("voucher_list")
  253. if voucher_list:
  254. await self._save_voucher_list(pay_no, voucher_list, auth)
  255. log.info(f"保存账单详情成功: pay_no={pay_no}")
  256. async def _save_order_detail(self, pay_no: str, order_info: dict, auth: AuthSchema) -> None:
  257. """保存订单详情"""
  258. order_crud = BillOrderCRUD(auth)
  259. order_no = order_info.get("order_no")
  260. if not order_no:
  261. return
  262. order_data = {
  263. "pay_no": pay_no,
  264. "trade_no": order_info.get("trade_no"),
  265. "product_code": order_info.get("product_code"),
  266. "order_title": order_info.get("order_title"),
  267. "order_amount": Decimal(order_info.get("order_amount")) if order_info.get("order_amount") else None,
  268. "order_status": order_info.get("order_status"),
  269. "merchant_name": order_info.get("merchant_name"),
  270. "merchant_id": order_info.get("merchant_id"),
  271. "shop_name": order_info.get("shop_name"),
  272. "gmt_payment": datetime.strptime(order_info.get("gmt_payment"), "%Y-%m-%d %H:%M:%S") if order_info.get("gmt_payment") else None,
  273. "fund_channel": order_info.get("fund_channel"),
  274. }
  275. await order_crud.create_or_update(order_no, order_data)
  276. async def _save_voucher_list(self, pay_no: str, voucher_list: list, auth: AuthSchema) -> None:
  277. """保存凭证列表"""
  278. voucher_crud = BillVoucherCRUD(auth)
  279. for voucher in voucher_list:
  280. voucher_id = voucher.get("voucher_id")
  281. if not voucher_id:
  282. continue
  283. voucher_data = {
  284. "pay_no": pay_no,
  285. "voucher_type": voucher.get("voucher_type"),
  286. "voucher_status": voucher.get("voucher_status"),
  287. "invoice_code": voucher.get("invoice_code"),
  288. "invoice_no": voucher.get("invoice_no"),
  289. "invoice_amount": Decimal(voucher.get("invoice_amount")) if voucher.get("invoice_amount") else None,
  290. "tax_amount": Decimal(voucher.get("tax_amount")) if voucher.get("tax_amount") else None,
  291. "issue_date": datetime.strptime(voucher.get("issue_date"), "%Y-%m-%d") if voucher.get("issue_date") else None,
  292. "check_code": voucher.get("check_code"),
  293. "pdf_url": voucher.get("pdf_url"),
  294. }
  295. await voucher_crud.create_or_update(voucher_id, voucher_data)
  296. class VoucherHandler(BaseHandler[dict]):
  297. """凭证变动通知处理器"""
  298. async def handle(self, method: str, content: dict, auth: AuthSchema, redis: Redis) -> bool:
  299. """
  300. 处理凭证变动通知
  301. 流程:
  302. 1. 调用 alipay.commerce.ec.consume.detail.query 查询详情
  303. 2. 更新凭证信息
  304. """
  305. try:
  306. notify_data = VoucherChangeContent(**content)
  307. except Exception as e:
  308. log.error(f"解析凭证通知内容失败: {e}")
  309. return False
  310. action = notify_data.action
  311. voucher_id = notify_data.voucher_id
  312. log.info(f"处理凭证变动通知: action={action}, voucher_id={voucher_id}")
  313. try:
  314. return await self._process_voucher(notify_data, auth)
  315. except Exception as e:
  316. log.error(f"处理凭证变动通知异常: {e}", exc_info=True)
  317. return False
  318. async def _process_voucher(self, data: VoucherChangeContent, auth: AuthSchema) -> bool:
  319. """处理凭证变动通知
  320. 通知中已携带 pay_no 字段,直接用 pay_no 查询账单详情并更新凭证信息。
  321. """
  322. log.info(
  323. f"凭证变动: pay_no={data.pay_no}, voucher_type={data.voucher_type}, "
  324. f"enterprise_id={data.enterprise_id}, notify_reason={data.notify_reason}"
  325. )
  326. if not data.pay_no:
  327. log.warning(f"凭证变动通知缺少 pay_no: voucher_type={data.voucher_type}")
  328. return True
  329. try:
  330. detail = await self._query_bill_detail(data.pay_no, data.enterprise_id or "", auth)
  331. except Exception as e:
  332. log.warning(f"查询账单详情失败(不影响主流程): {e}")
  333. return True
  334. if detail:
  335. await self._update_voucher_info(detail, auth)
  336. return True
  337. async def _update_voucher_info(self, detail: dict, auth: AuthSchema) -> None:
  338. """更新凭证信息"""
  339. voucher_crud = BillVoucherCRUD(auth)
  340. voucher_list = detail.get("voucher_list")
  341. if not voucher_list:
  342. return
  343. pay_no = detail.get("pay_no")
  344. for voucher in voucher_list:
  345. voucher_id = voucher.get("voucher_id")
  346. if not voucher_id:
  347. continue
  348. voucher_data = {
  349. "pay_no": pay_no,
  350. "voucher_type": voucher.get("voucher_type"),
  351. "voucher_status": voucher.get("voucher_status"),
  352. "invoice_code": voucher.get("invoice_code"),
  353. "invoice_no": voucher.get("invoice_no"),
  354. "invoice_amount": Decimal(voucher.get("invoice_amount")) if voucher.get("invoice_amount") else None,
  355. "tax_amount": Decimal(voucher.get("tax_amount")) if voucher.get("tax_amount") else None,
  356. "issue_date": datetime.strptime(voucher.get("issue_date"), "%Y-%m-%d") if voucher.get("issue_date") else None,
  357. "check_code": voucher.get("check_code"),
  358. "pdf_url": voucher.get("pdf_url"),
  359. }
  360. await voucher_crud.create_or_update(voucher_id, voucher_data)
  361. log.info(f"更新凭证信息成功: voucher_count={len(voucher_list)}")