# bill_handler.py
  1. from datetime import datetime
  2. from decimal import Decimal
  3. from redis.asyncio import Redis
  4. from app.api.v1.module_system.auth.schema import AuthSchema
  5. from app.core.logger import log
  6. from app.plugin.module_payment.account.service import AccountService
  7. from app.core.alipay import AlipayClient
  8. from ..schemas import ConsumeChangeContent, VoucherChangeContent
  9. from ..model import PayBillModel, PayBillOrderModel, PayBillVoucherModel
  10. from ..crud import BillCRUD, BillOrderCRUD, BillVoucherCRUD
  11. from .base_handler import BaseHandler
  12. from ...openapi.service import OpenTransferService
  13. class BillHandler(BaseHandler[dict]):
  14. """账单变动通知处理器"""
  15. async def handle(self, method: str, content: dict, auth: AuthSchema, redis: Redis) -> bool:
  16. """
  17. 处理账单变动通知
  18. 流程:
  19. 1. 保存账单基础数据
  20. 2. 调用 alipay.commerce.ec.consume.detail.query 查询详情
  21. 3. 保存账单和凭证详情数据
  22. """
  23. try:
  24. notify_data = ConsumeChangeContent(**content)
  25. except Exception as e:
  26. log.error(f"解析账单通知内容失败: {e}")
  27. return False
  28. try:
  29. return await self._process_bill(notify_data, auth)
  30. except Exception as e:
  31. log.error(f"处理账单变动通知异常: {e}", exc_info=True)
  32. return False
  33. async def _process_bill(self, data: ConsumeChangeContent, auth: AuthSchema) -> bool:
  34. """处理账单变动通知
  35. 支付宝文档: alipay.commerce.ec.consume.change.notify
  36. consume_type 标准枚举: CONSUME(消费), REFUND(退款)
  37. notify_reason 标准枚举: COLLECT(归集), SUMMARY_SUCCESS(汇总成功), ORDER_COMPLETE(完结)
  38. 注: TRANSFER(转账) 为支付宝企业码扩展值,不在标准文档枚举中
  39. """
  40. log.info(
  41. f"账单变动: pay_no={data.pay_no}, "
  42. f"enterprise_id={data.enterprise_id}, "
  43. f"consume_amount={data.consume_amount}, "
  44. f"consume_type={data.consume_type}, "
  45. f"notify_reason={data.notify_reason}"
  46. )
  47. # ========== 标准消费/退款通知:落库 ==========
  48. if data.consume_type in ("CONSUME", "REFUND"):
  49. await self._save_bill_base(data, auth)
  50. try:
  51. detail = await self._query_bill_detail(data.pay_no, data.enterprise_id, auth)
  52. except Exception as e:
  53. log.warning(f"查询账单详情失败(不影响主流程): {e}")
  54. detail = None
  55. if detail:
  56. await self._save_bill_detail(detail, auth)
  57. # ========== 转账通知(企业码扩展) ==========
  58. elif data.consume_type == "TRANSFER":
  59. from app.plugin.module_payment.account.enums import TransferStatusEnum
  60. reason = (data.notify_reason or "").upper()
  61. if "SUCCESS" in reason:
  62. status = TransferStatusEnum.SUCCESS.value
  63. elif "FAIL" in reason:
  64. status = TransferStatusEnum.FAIL.value
  65. else:
  66. status = TransferStatusEnum.DEALING.value
  67. log.warning(f"转账状态无法判断: pay_no={data.pay_no}, notify_reason={data.notify_reason}")
  68. await AccountService.update_transfer_status_service(
  69. auth, data.pay_no, status, data.model_dump(exclude_none=True)
  70. )
  71. # 无论成功/失败都回调商户,告知最终结果
  72. await OpenTransferService.open_return_service(auth, data.pay_no)
  73. else:
  74. log.info(f"未知账单类型: consume_type={data.consume_type}, pay_no={data.pay_no}")
  75. return True
  76. async def _save_bill_base(self, data: ConsumeChangeContent, auth: AuthSchema) -> None:
  77. """保存账单基础数据"""
  78. bill_crud = BillCRUD(auth)
  79. bill_data = {
  80. "enterprise_id": data.enterprise_id,
  81. "account_id": data.account_id,
  82. "employee_id": data.employee_id,
  83. "consume_type": data.consume_type,
  84. "consume_amount": Decimal(data.consume_amount) if data.consume_amount else None,
  85. "gmt_biz_create": datetime.strptime(data.gmt_biz_create, "%Y-%m-%d %H:%M:%S") if data.gmt_biz_create else None,
  86. "gmt_recieve_pay": datetime.strptime(data.gmt_recieve_pay, "%Y-%m-%d %H:%M:%S") if data.gmt_recieve_pay else None,
  87. "peer_pay_amount": Decimal(data.peer_pay_amount) if data.peer_pay_amount else None,
  88. "notify_reason": data.notify_reason,
  89. "notify_msg": data.notify_msg,
  90. "related_pay_no": data.related_pay_no,
  91. "expense_rule_group_id": data.expense_rule_group_id,
  92. "expense_scene_code": data.expense_scene_code,
  93. "expense_type": data.expense_type,
  94. "status": "NEW",
  95. }
  96. await bill_crud.create_or_update(data.pay_no, bill_data)
  97. log.info(f"保存账单基础数据成功: pay_no={data.pay_no}")
  98. async def _query_bill_detail(self, pay_no: str, enterprise_id: str, auth: AuthSchema) -> dict | None:
  99. """调用 alipay.commerce.ec.consume.detail.query 查询账单详情"""
  100. from alipay.aop.api.request.AlipayCommerceEcConsumeDetailQueryRequest import (
  101. AlipayCommerceEcConsumeDetailQueryRequest,
  102. )
  103. from alipay.aop.api.response.AlipayCommerceEcConsumeDetailQueryResponse import (
  104. AlipayCommerceEcConsumeDetailQueryResponse,
  105. )
  106. from alipay.aop.api.domain.AlipayCommerceEcConsumeDetailQueryModel import AlipayCommerceEcConsumeDetailQueryModel
  107. model = AlipayCommerceEcConsumeDetailQueryModel()
  108. model.pay_no = pay_no
  109. model.enterprise_id = enterprise_id
  110. request = AlipayCommerceEcConsumeDetailQueryRequest()
  111. request.biz_model = model
  112. client = AlipayClient.get_client()
  113. response = client.execute(request)
  114. if not response:
  115. log.error("查询账单详情失败: 无响应")
  116. return None
  117. result = AlipayCommerceEcConsumeDetailQueryResponse()
  118. result.parse_response_content(response)
  119. if not result.is_success():
  120. log.error(f"查询账单详情失败: {result.msg}")
  121. return None
  122. # 解析响应为字典
  123. return {
  124. "pay_no": getattr(result.consume_info, "pay_no", None),
  125. "enterprise_id": getattr(result.consume_info, "enterprise_id", None),
  126. "employee_id": getattr(result.consume_info, "employee_id", None),
  127. "employee_name": getattr(result.consume_info, "employee_name", None),
  128. "consume_amount": getattr(result, "consume_amount", None),
  129. "peer_pay_amount": getattr(result, "peer_pay_amount", None),
  130. "employee_pay_amount": getattr(result, "employee_pay_amount", None),
  131. "order_info": self._parse_order_info(result),
  132. "voucher_list": self._parse_voucher_list(result),
  133. }
  134. def _parse_order_info(self, result) -> dict | None:
  135. """解析订单信息"""
  136. order_info = getattr(result, "order_info", None)
  137. if not order_info:
  138. return None
  139. return {
  140. "order_no": getattr(order_info, "order_no", None),
  141. "trade_no": getattr(order_info, "trade_no", None),
  142. "product_code": getattr(order_info, "product_code", None),
  143. "order_title": getattr(order_info, "order_title", None),
  144. "order_amount": getattr(order_info, "order_amount", None),
  145. "order_status": getattr(order_info, "order_status", None),
  146. "merchant_name": getattr(order_info, "merchant_name", None),
  147. "merchant_id": getattr(order_info, "merchant_id", None),
  148. "shop_name": getattr(order_info, "shop_name", None),
  149. "gmt_payment": getattr(order_info, "gmt_payment", None),
  150. "fund_channel": getattr(order_info, "fund_channel", None),
  151. }
  152. def _parse_voucher_list(self, result) -> list | None:
  153. """解析凭证列表"""
  154. voucher_list = getattr(result, "voucher_list", None)
  155. if not voucher_list:
  156. return None
  157. vouchers = []
  158. for voucher in voucher_list:
  159. vouchers.append({
  160. "voucher_id": getattr(voucher, "voucher_id", None),
  161. "voucher_type": getattr(voucher, "voucher_type", None),
  162. "voucher_status": getattr(voucher, "voucher_status", None),
  163. "invoice_code": getattr(voucher, "invoice_code", None),
  164. "invoice_no": getattr(voucher, "invoice_no", None),
  165. "invoice_amount": getattr(voucher, "invoice_amount", None),
  166. "tax_amount": getattr(voucher, "tax_amount", None),
  167. "issue_date": getattr(voucher, "issue_date", None),
  168. "check_code": getattr(voucher, "check_code", None),
  169. "pdf_url": getattr(voucher, "pdf_url", None),
  170. })
  171. return vouchers
  172. async def _save_bill_detail(self, detail: dict, auth: AuthSchema) -> None:
  173. """保存账单详情和凭证数据"""
  174. bill_crud = BillCRUD(auth)
  175. pay_no = detail.get("pay_no")
  176. if not pay_no:
  177. return
  178. # 更新账单详情
  179. await bill_crud.update_by_pay_no(pay_no, {"status": "PROCESSED", "ext_infos": detail})
  180. # 保存订单详情
  181. order_info = detail.get("order_info")
  182. if order_info:
  183. await self._save_order_detail(pay_no, order_info, auth)
  184. # 保存凭证列表
  185. voucher_list = detail.get("voucher_list")
  186. if voucher_list:
  187. await self._save_voucher_list(pay_no, voucher_list, auth)
  188. log.info(f"保存账单详情成功: pay_no={pay_no}")
  189. async def _save_order_detail(self, pay_no: str, order_info: dict, auth: AuthSchema) -> None:
  190. """保存订单详情"""
  191. order_crud = BillOrderCRUD(auth)
  192. order_no = order_info.get("order_no")
  193. if not order_no:
  194. return
  195. order_data = {
  196. "pay_no": pay_no,
  197. "trade_no": order_info.get("trade_no"),
  198. "product_code": order_info.get("product_code"),
  199. "order_title": order_info.get("order_title"),
  200. "order_amount": Decimal(order_info.get("order_amount")) if order_info.get("order_amount") else None,
  201. "order_status": order_info.get("order_status"),
  202. "merchant_name": order_info.get("merchant_name"),
  203. "merchant_id": order_info.get("merchant_id"),
  204. "shop_name": order_info.get("shop_name"),
  205. "gmt_payment": datetime.strptime(order_info.get("gmt_payment"), "%Y-%m-%d %H:%M:%S") if order_info.get("gmt_payment") else None,
  206. "fund_channel": order_info.get("fund_channel"),
  207. }
  208. await order_crud.create_or_update(order_no, order_data)
  209. async def _save_voucher_list(self, pay_no: str, voucher_list: list, auth: AuthSchema) -> None:
  210. """保存凭证列表"""
  211. voucher_crud = BillVoucherCRUD(auth)
  212. for voucher in voucher_list:
  213. voucher_id = voucher.get("voucher_id")
  214. if not voucher_id:
  215. continue
  216. voucher_data = {
  217. "pay_no": pay_no,
  218. "voucher_type": voucher.get("voucher_type"),
  219. "voucher_status": voucher.get("voucher_status"),
  220. "invoice_code": voucher.get("invoice_code"),
  221. "invoice_no": voucher.get("invoice_no"),
  222. "invoice_amount": Decimal(voucher.get("invoice_amount")) if voucher.get("invoice_amount") else None,
  223. "tax_amount": Decimal(voucher.get("tax_amount")) if voucher.get("tax_amount") else None,
  224. "issue_date": datetime.strptime(voucher.get("issue_date"), "%Y-%m-%d") if voucher.get("issue_date") else None,
  225. "check_code": voucher.get("check_code"),
  226. "pdf_url": voucher.get("pdf_url"),
  227. }
  228. await voucher_crud.create_or_update(voucher_id, voucher_data)
  229. class VoucherHandler(BaseHandler[dict]):
  230. """凭证变动通知处理器"""
  231. async def handle(self, method: str, content: dict, auth: AuthSchema, redis: Redis) -> bool:
  232. """
  233. 处理凭证变动通知
  234. 流程:
  235. 1. 调用 alipay.commerce.ec.consume.detail.query 查询详情
  236. 2. 更新凭证信息
  237. """
  238. try:
  239. notify_data = VoucherChangeContent(**content)
  240. except Exception as e:
  241. log.error(f"解析凭证通知内容失败: {e}")
  242. return False
  243. action = notify_data.action
  244. voucher_id = notify_data.voucher_id
  245. log.info(f"处理凭证变动通知: action={action}, voucher_id={voucher_id}")
  246. try:
  247. return await self._process_voucher(notify_data, auth)
  248. except Exception as e:
  249. log.error(f"处理凭证变动通知异常: {e}", exc_info=True)
  250. return False
  251. async def _process_voucher(self, data: VoucherChangeContent, auth: AuthSchema) -> bool:
  252. """处理凭证"""
  253. log.info(
  254. f"凭证变动: voucher_id={data.voucher_id}, "
  255. f"enterprise_id={data.enterprise_id}, "
  256. f"action={data.action}"
  257. )
  258. # 4.1: 调用支付宝查询账单及凭证详情
  259. try:
  260. # 使用 pay_no 或 voucher_id 查询,这里需要确定查询参数
  261. # 由于凭证变动通知中没有直接提供 pay_no,我们先尝试通过 voucher_id 查询
  262. detail = await self._query_bill_detail_by_voucher(data.voucher_id, auth)
  263. except Exception as e:
  264. log.warning(f"查询凭证详情失败: {e}")
  265. return True
  266. # 4.3: 更新凭证信息
  267. if detail:
  268. await self._update_voucher_info(detail, auth)
  269. return True
  270. async def _query_bill_detail_by_voucher(self, voucher_id: str, auth: AuthSchema) -> dict | None:
  271. """通过凭证ID查询账单详情"""
  272. from alipay.aop.api.request.AlipayCommerceEcConsumeDetailQueryRequest import (
  273. AlipayCommerceEcConsumeDetailQueryRequest,
  274. )
  275. from alipay.aop.api.response.AlipayCommerceEcConsumeDetailQueryResponse import (
  276. AlipayCommerceEcConsumeDetailQueryResponse,
  277. )
  278. # 注意:consume.detail.query 需要 pay_no 参数
  279. # 如果没有 pay_no,可以考虑通过其他方式获取
  280. # 这里先返回空,实际实现时需要根据业务场景补充
  281. log.warning(f"凭证查询需要 pay_no,当前仅收到 voucher_id={voucher_id}")
  282. return None
  283. async def _update_voucher_info(self, detail: dict, auth: AuthSchema) -> None:
  284. """更新凭证信息"""
  285. voucher_crud = BillVoucherCRUD(auth)
  286. voucher_list = detail.get("voucher_list")
  287. if not voucher_list:
  288. return
  289. pay_no = detail.get("pay_no")
  290. for voucher in voucher_list:
  291. voucher_id = voucher.get("voucher_id")
  292. if not voucher_id:
  293. continue
  294. voucher_data = {
  295. "pay_no": pay_no,
  296. "voucher_type": voucher.get("voucher_type"),
  297. "voucher_status": voucher.get("voucher_status"),
  298. "invoice_code": voucher.get("invoice_code"),
  299. "invoice_no": voucher.get("invoice_no"),
  300. "invoice_amount": Decimal(voucher.get("invoice_amount")) if voucher.get("invoice_amount") else None,
  301. "tax_amount": Decimal(voucher.get("tax_amount")) if voucher.get("tax_amount") else None,
  302. "issue_date": datetime.strptime(voucher.get("issue_date"), "%Y-%m-%d") if voucher.get("issue_date") else None,
  303. "check_code": voucher.get("check_code"),
  304. "pdf_url": voucher.get("pdf_url"),
  305. }
  306. await voucher_crud.create_or_update(voucher_id, voucher_data)
  307. log.info(f"更新凭证信息成功: voucher_count={len(voucher_list)}")