bill_handler.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. from datetime import datetime
  2. from decimal import Decimal
  3. from redis.asyncio import Redis
  4. from app.api.v1.module_system.auth.schema import AuthSchema
  5. from app.core.logger import log
  6. from app.plugin.module_payment.account.service import AccountService
  7. from app.core.alipay import AlipayClient
  8. from ..schemas import ConsumeChangeContent, VoucherChangeContent
  9. from ..model import PayBillModel, PayBillOrderModel, PayBillVoucherModel
  10. from ..crud import BillCRUD, BillOrderCRUD, BillVoucherCRUD
  11. from .base_handler import BaseHandler
  12. from ...openapi.service import OpenTransferService
  13. class BillHandler(BaseHandler[dict]):
  14. """账单变动通知处理器"""
  15. async def handle(self, method: str, content: dict, auth: AuthSchema, redis: Redis) -> bool:
  16. """
  17. 处理账单变动通知
  18. 流程:
  19. 1. 保存账单基础数据
  20. 2. 调用 alipay.commerce.ec.consume.detail.query 查询详情
  21. 3. 保存账单和凭证详情数据
  22. """
  23. try:
  24. notify_data = ConsumeChangeContent(**content)
  25. except Exception as e:
  26. log.error(f"解析账单通知内容失败: {e}")
  27. return False
  28. try:
  29. return await self._process_bill(notify_data, auth)
  30. except Exception as e:
  31. log.error(f"处理账单变动通知异常: {e}", exc_info=True)
  32. return False
  33. async def _process_bill(self, data: ConsumeChangeContent, auth: AuthSchema) -> bool:
  34. """处理账单变动通知
  35. 支付宝文档: alipay.commerce.ec.consume.change.notify
  36. consume_type 标准枚举: CONSUME(消费), REFUND(退款)
  37. notify_reason 标准枚举: COLLECT(归集), SUMMARY_SUCCESS(汇总成功), ORDER_COMPLETE(完结)
  38. 注: TRANSFER(转账) 为支付宝企业码扩展值,不在标准文档枚举中
  39. """
  40. log.info(
  41. f"账单变动: pay_no={data.pay_no}, "
  42. f"enterprise_id={data.enterprise_id}, "
  43. f"consume_amount={data.consume_amount}, "
  44. f"consume_type={data.consume_type}, "
  45. f"notify_reason={data.notify_reason}"
  46. )
  47. # 1. 保存账单基础数据
  48. # await self._save_bill_base(data, auth)
  49. # 2. 调用支付宝查询详情
  50. # try:
  51. # detail = await self._query_bill_detail(data.pay_no, auth)
  52. # except Exception as e:
  53. # log.warning(f"查询账单详情失败: {e}")
  54. # return True
  55. # 3. 保存账单和凭证详情数据
  56. # if detail:
  57. # await self._save_bill_detail(detail, auth)
  58. # ========== 标准消费通知 ==========
  59. if data.consume_type == "CONSUME":
  60. log.info(f"消费账单通知: pay_no={data.pay_no}, amount={data.consume_amount}, reason={data.notify_reason}")
  61. elif data.consume_type == "REFUND":
  62. log.info(
  63. f"退款账单通知: pay_no={data.pay_no}, "
  64. f"related_pay_no={data.related_pay_no}, "
  65. f"amount={data.consume_amount}"
  66. )
  67. # ========== 转账通知(企业码扩展) ==========
  68. elif data.consume_type == "TRANSFER":
  69. from app.plugin.module_payment.account.enums import TransferStatusEnum
  70. reason = (data.notify_reason or "").upper()
  71. if "SUCCESS" in reason:
  72. status = TransferStatusEnum.SUCCESS.value
  73. elif "FAIL" in reason:
  74. status = TransferStatusEnum.FAIL.value
  75. else:
  76. status = TransferStatusEnum.DEALING.value
  77. log.warning(f"转账状态无法判断: pay_no={data.pay_no}, notify_reason={data.notify_reason}")
  78. await AccountService.update_transfer_status_service(
  79. auth, data.pay_no, status, data.model_dump(exclude_none=True)
  80. )
  81. # 无论成功/失败都回调商户,告知最终结果
  82. await OpenTransferService.open_return_service(auth, data.pay_no)
  83. else:
  84. log.info(f"未知账单类型: consume_type={data.consume_type}, pay_no={data.pay_no}")
  85. return True
  86. async def _save_bill_base(self, data: ConsumeChangeContent, auth: AuthSchema) -> None:
  87. """保存账单基础数据"""
  88. bill_crud = BillCRUD(auth)
  89. bill_data = {
  90. "enterprise_id": data.enterprise_id,
  91. "account_id": data.account_id,
  92. "employee_id": data.employee_id,
  93. "consume_type": data.consume_type,
  94. "consume_amount": Decimal(data.consume_amount) if data.consume_amount else None,
  95. "gmt_biz_create": datetime.strptime(data.gmt_biz_create, "%Y-%m-%d %H:%M:%S") if data.gmt_biz_create else None,
  96. "gmt_recieve_pay": datetime.strptime(data.gmt_recieve_pay, "%Y-%m-%d %H:%M:%S") if data.gmt_recieve_pay else None,
  97. "peer_pay_amount": Decimal(data.peer_pay_amount) if data.peer_pay_amount else None,
  98. "notify_reason": data.notify_reason,
  99. "notify_msg": data.notify_msg,
  100. "related_pay_no": data.related_pay_no,
  101. "expense_rule_group_id": data.expense_rule_group_id,
  102. "expense_scene_code": data.expense_scene_code,
  103. "expense_type": data.expense_type,
  104. "status": "NEW",
  105. }
  106. await bill_crud.create_or_update(data.pay_no, bill_data)
  107. log.info(f"保存账单基础数据成功: pay_no={data.pay_no}")
  108. async def _query_bill_detail(self, pay_no: str, enterprise_id: str, auth: AuthSchema) -> dict | None:
  109. """调用 alipay.commerce.ec.consume.detail.query 查询账单详情"""
  110. from alipay.aop.api.request.AlipayCommerceEcConsumeDetailQueryRequest import (
  111. AlipayCommerceEcConsumeDetailQueryRequest,
  112. )
  113. from alipay.aop.api.response.AlipayCommerceEcConsumeDetailQueryResponse import (
  114. AlipayCommerceEcConsumeDetailQueryResponse,
  115. )
  116. from alipay.aop.api.domain.AlipayCommerceEcConsumeDetailQueryModel import AlipayCommerceEcConsumeDetailQueryModel
  117. model = AlipayCommerceEcConsumeDetailQueryModel()
  118. model.pay_no = pay_no
  119. model.enterprise_id = enterprise_id
  120. request = AlipayCommerceEcConsumeDetailQueryRequest()
  121. request.biz_model = model
  122. client = AlipayClient.get_client()
  123. response = client.execute(request)
  124. if not response:
  125. log.error("查询账单详情失败: 无响应")
  126. return None
  127. result = AlipayCommerceEcConsumeDetailQueryResponse()
  128. result.parse_response_content(response)
  129. if not result.is_success():
  130. log.error(f"查询账单详情失败: {result.msg}")
  131. return None
  132. # 解析响应为字典
  133. return {
  134. "pay_no": getattr(result.consume_info, "pay_no", None),
  135. "enterprise_id": getattr(result.consume_info, "enterprise_id", None),
  136. "employee_id": getattr(result.consume_info, "employee_id", None),
  137. "employee_name": getattr(result.consume_info, "employee_name", None),
  138. "consume_amount": getattr(result, "consume_amount", None),
  139. "peer_pay_amount": getattr(result, "peer_pay_amount", None),
  140. "employee_pay_amount": getattr(result, "employee_pay_amount", None),
  141. "order_info": self._parse_order_info(result),
  142. "voucher_list": self._parse_voucher_list(result),
  143. }
  144. def _parse_order_info(self, result) -> dict | None:
  145. """解析订单信息"""
  146. order_info = getattr(result, "order_info", None)
  147. if not order_info:
  148. return None
  149. return {
  150. "order_no": getattr(order_info, "order_no", None),
  151. "trade_no": getattr(order_info, "trade_no", None),
  152. "product_code": getattr(order_info, "product_code", None),
  153. "order_title": getattr(order_info, "order_title", None),
  154. "order_amount": getattr(order_info, "order_amount", None),
  155. "order_status": getattr(order_info, "order_status", None),
  156. "merchant_name": getattr(order_info, "merchant_name", None),
  157. "merchant_id": getattr(order_info, "merchant_id", None),
  158. "shop_name": getattr(order_info, "shop_name", None),
  159. "gmt_payment": getattr(order_info, "gmt_payment", None),
  160. "fund_channel": getattr(order_info, "fund_channel", None),
  161. }
  162. def _parse_voucher_list(self, result) -> list | None:
  163. """解析凭证列表"""
  164. voucher_list = getattr(result, "voucher_list", None)
  165. if not voucher_list:
  166. return None
  167. vouchers = []
  168. for voucher in voucher_list:
  169. vouchers.append({
  170. "voucher_id": getattr(voucher, "voucher_id", None),
  171. "voucher_type": getattr(voucher, "voucher_type", None),
  172. "voucher_status": getattr(voucher, "voucher_status", None),
  173. "invoice_code": getattr(voucher, "invoice_code", None),
  174. "invoice_no": getattr(voucher, "invoice_no", None),
  175. "invoice_amount": getattr(voucher, "invoice_amount", None),
  176. "tax_amount": getattr(voucher, "tax_amount", None),
  177. "issue_date": getattr(voucher, "issue_date", None),
  178. "check_code": getattr(voucher, "check_code", None),
  179. "pdf_url": getattr(voucher, "pdf_url", None),
  180. })
  181. return vouchers
  182. async def _save_bill_detail(self, detail: dict, auth: AuthSchema) -> None:
  183. """保存账单详情和凭证数据"""
  184. bill_crud = BillCRUD(auth)
  185. pay_no = detail.get("pay_no")
  186. if not pay_no:
  187. return
  188. # 更新账单详情
  189. await bill_crud.update_by_pay_no(pay_no, {"status": "PROCESSED", "ext_infos": detail})
  190. # 保存订单详情
  191. order_info = detail.get("order_info")
  192. if order_info:
  193. await self._save_order_detail(pay_no, order_info, auth)
  194. # 保存凭证列表
  195. voucher_list = detail.get("voucher_list")
  196. if voucher_list:
  197. await self._save_voucher_list(pay_no, voucher_list, auth)
  198. log.info(f"保存账单详情成功: pay_no={pay_no}")
  199. async def _save_order_detail(self, pay_no: str, order_info: dict, auth: AuthSchema) -> None:
  200. """保存订单详情"""
  201. order_crud = BillOrderCRUD(auth)
  202. order_no = order_info.get("order_no")
  203. if not order_no:
  204. return
  205. order_data = {
  206. "pay_no": pay_no,
  207. "trade_no": order_info.get("trade_no"),
  208. "product_code": order_info.get("product_code"),
  209. "order_title": order_info.get("order_title"),
  210. "order_amount": Decimal(order_info.get("order_amount")) if order_info.get("order_amount") else None,
  211. "order_status": order_info.get("order_status"),
  212. "merchant_name": order_info.get("merchant_name"),
  213. "merchant_id": order_info.get("merchant_id"),
  214. "shop_name": order_info.get("shop_name"),
  215. "gmt_payment": datetime.strptime(order_info.get("gmt_payment"), "%Y-%m-%d %H:%M:%S") if order_info.get("gmt_payment") else None,
  216. "fund_channel": order_info.get("fund_channel"),
  217. }
  218. await order_crud.create_or_update(order_no, order_data)
  219. async def _save_voucher_list(self, pay_no: str, voucher_list: list, auth: AuthSchema) -> None:
  220. """保存凭证列表"""
  221. voucher_crud = BillVoucherCRUD(auth)
  222. for voucher in voucher_list:
  223. voucher_id = voucher.get("voucher_id")
  224. if not voucher_id:
  225. continue
  226. voucher_data = {
  227. "pay_no": pay_no,
  228. "voucher_type": voucher.get("voucher_type"),
  229. "voucher_status": voucher.get("voucher_status"),
  230. "invoice_code": voucher.get("invoice_code"),
  231. "invoice_no": voucher.get("invoice_no"),
  232. "invoice_amount": Decimal(voucher.get("invoice_amount")) if voucher.get("invoice_amount") else None,
  233. "tax_amount": Decimal(voucher.get("tax_amount")) if voucher.get("tax_amount") else None,
  234. "issue_date": datetime.strptime(voucher.get("issue_date"), "%Y-%m-%d") if voucher.get("issue_date") else None,
  235. "check_code": voucher.get("check_code"),
  236. "pdf_url": voucher.get("pdf_url"),
  237. }
  238. await voucher_crud.create_or_update(voucher_id, voucher_data)
  239. class VoucherHandler(BaseHandler[dict]):
  240. """凭证变动通知处理器"""
  241. async def handle(self, method: str, content: dict, auth: AuthSchema, redis: Redis) -> bool:
  242. """
  243. 处理凭证变动通知
  244. 流程:
  245. 1. 调用 alipay.commerce.ec.consume.detail.query 查询详情
  246. 2. 更新凭证信息
  247. """
  248. try:
  249. notify_data = VoucherChangeContent(**content)
  250. except Exception as e:
  251. log.error(f"解析凭证通知内容失败: {e}")
  252. return False
  253. action = notify_data.action
  254. voucher_id = notify_data.voucher_id
  255. log.info(f"处理凭证变动通知: action={action}, voucher_id={voucher_id}")
  256. try:
  257. return await self._process_voucher(notify_data, auth)
  258. except Exception as e:
  259. log.error(f"处理凭证变动通知异常: {e}", exc_info=True)
  260. return False
  261. async def _process_voucher(self, data: VoucherChangeContent, auth: AuthSchema) -> bool:
  262. """处理凭证"""
  263. log.info(
  264. f"凭证变动: voucher_id={data.voucher_id}, "
  265. f"enterprise_id={data.enterprise_id}, "
  266. f"action={data.action}"
  267. )
  268. # 4.1: 调用支付宝查询账单及凭证详情
  269. try:
  270. # 使用 pay_no 或 voucher_id 查询,这里需要确定查询参数
  271. # 由于凭证变动通知中没有直接提供 pay_no,我们先尝试通过 voucher_id 查询
  272. detail = await self._query_bill_detail_by_voucher(data.voucher_id, auth)
  273. except Exception as e:
  274. log.warning(f"查询凭证详情失败: {e}")
  275. return True
  276. # 4.3: 更新凭证信息
  277. if detail:
  278. await self._update_voucher_info(detail, auth)
  279. return True
  280. async def _query_bill_detail_by_voucher(self, voucher_id: str, auth: AuthSchema) -> dict | None:
  281. """通过凭证ID查询账单详情"""
  282. from alipay.aop.api.request.AlipayCommerceEcConsumeDetailQueryRequest import (
  283. AlipayCommerceEcConsumeDetailQueryRequest,
  284. )
  285. from alipay.aop.api.response.AlipayCommerceEcConsumeDetailQueryResponse import (
  286. AlipayCommerceEcConsumeDetailQueryResponse,
  287. )
  288. # 注意:consume.detail.query 需要 pay_no 参数
  289. # 如果没有 pay_no,可以考虑通过其他方式获取
  290. # 这里先返回空,实际实现时需要根据业务场景补充
  291. log.warning(f"凭证查询需要 pay_no,当前仅收到 voucher_id={voucher_id}")
  292. return None
  293. async def _update_voucher_info(self, detail: dict, auth: AuthSchema) -> None:
  294. """更新凭证信息"""
  295. voucher_crud = BillVoucherCRUD(auth)
  296. voucher_list = detail.get("voucher_list")
  297. if not voucher_list:
  298. return
  299. pay_no = detail.get("pay_no")
  300. for voucher in voucher_list:
  301. voucher_id = voucher.get("voucher_id")
  302. if not voucher_id:
  303. continue
  304. voucher_data = {
  305. "pay_no": pay_no,
  306. "voucher_type": voucher.get("voucher_type"),
  307. "voucher_status": voucher.get("voucher_status"),
  308. "invoice_code": voucher.get("invoice_code"),
  309. "invoice_no": voucher.get("invoice_no"),
  310. "invoice_amount": Decimal(voucher.get("invoice_amount")) if voucher.get("invoice_amount") else None,
  311. "tax_amount": Decimal(voucher.get("tax_amount")) if voucher.get("tax_amount") else None,
  312. "issue_date": datetime.strptime(voucher.get("issue_date"), "%Y-%m-%d") if voucher.get("issue_date") else None,
  313. "check_code": voucher.get("check_code"),
  314. "pdf_url": voucher.get("pdf_url"),
  315. }
  316. await voucher_crud.create_or_update(voucher_id, voucher_data)
  317. log.info(f"更新凭证信息成功: voucher_count={len(voucher_list)}")