bill_handler.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. from datetime import datetime
  2. from decimal import Decimal
  3. from redis.asyncio import Redis
  4. from app.api.v1.module_system.auth.schema import AuthSchema
  5. from app.core.logger import log
  6. from app.plugin.module_payment.account.service import AccountService
  7. from app.core.alipay import AlipayClient
  8. from ..schemas import ConsumeChangeContent, VoucherChangeContent
  9. from ..model import PayBillModel, PayBillOrderModel, PayBillVoucherModel
  10. from ..crud import BillCRUD, BillOrderCRUD, BillVoucherCRUD
  11. from .base_handler import BaseHandler
  12. from ...openapi.service import OpenTransferService
  13. class BillHandler(BaseHandler[dict]):
  14. """账单变动通知处理器"""
  15. async def handle(self, method: str, content: dict, auth: AuthSchema, redis: Redis) -> bool:
  16. """
  17. 处理账单变动通知
  18. 流程:
  19. 1. 保存账单基础数据
  20. 2. 调用 alipay.commerce.ec.consume.detail.query 查询详情
  21. 3. 保存账单和凭证详情数据
  22. """
  23. try:
  24. notify_data = ConsumeChangeContent(**content)
  25. except Exception as e:
  26. log.error(f"解析账单通知内容失败: {e}")
  27. return False
  28. try:
  29. return await self._process_bill(notify_data, auth)
  30. except Exception as e:
  31. log.error(f"处理账单变动通知异常: {e}", exc_info=True)
  32. return False
  33. async def _process_bill(self, data: ConsumeChangeContent, auth: AuthSchema) -> bool:
  34. """处理账单"""
  35. log.info(
  36. f"账单变动: pay_no={data.pay_no}, "
  37. f"enterprise_id={data.enterprise_id}, "
  38. f"consume_amount={data.consume_amount}, "
  39. f"consume_type={data.consume_type}"
  40. )
  41. # 1. 保存账单基础数据
  42. # await self._save_bill_base(data, auth)
  43. # 2. 调用支付宝查询详情
  44. # try:
  45. # detail = await self._query_bill_detail(data.pay_no, auth)
  46. # except Exception as e:
  47. # log.warning(f"查询账单详情失败: {e}")
  48. # return True
  49. # 3. 保存账单和凭证详情数据
  50. # if detail:
  51. # await self._save_bill_detail(detail, auth)
  52. # 处理转账类型
  53. if data.consume_type == "TRANSFER":
  54. await AccountService.update_transfer_status_service(
  55. auth, data.pay_no, "SUCCESS", data.model_dump(exclude_none=True)
  56. )
  57. await OpenTransferService.open_return_service(auth, data.pay_no)
  58. return True
  59. async def _save_bill_base(self, data: ConsumeChangeContent, auth: AuthSchema) -> None:
  60. """保存账单基础数据"""
  61. bill_crud = BillCRUD(auth)
  62. bill_data = {
  63. "enterprise_id": data.enterprise_id,
  64. "account_id": data.account_id,
  65. "employee_id": data.employee_id,
  66. "consume_type": data.consume_type,
  67. "consume_amount": Decimal(data.consume_amount) if data.consume_amount else None,
  68. "gmt_biz_create": datetime.strptime(data.gmt_biz_create, "%Y-%m-%d %H:%M:%S") if data.gmt_biz_create else None,
  69. "gmt_recieve_pay": datetime.strptime(data.gmt_recieve_pay, "%Y-%m-%d %H:%M:%S") if data.gmt_recieve_pay else None,
  70. "peer_pay_amount": Decimal(data.peer_pay_amount) if data.peer_pay_amount else None,
  71. "notify_reason": data.notify_reason,
  72. "notify_msg": data.notify_msg,
  73. "related_pay_no": data.related_pay_no,
  74. "expense_rule_group_id": data.expense_rule_group_id,
  75. "expense_scene_code": data.expense_scene_code,
  76. "expense_type": data.expense_type,
  77. "status": "NEW",
  78. }
  79. await bill_crud.create_or_update(data.pay_no, bill_data)
  80. log.info(f"保存账单基础数据成功: pay_no={data.pay_no}")
  81. async def _query_bill_detail(self, pay_no: str, enterprise_id: str, auth: AuthSchema) -> dict | None:
  82. """调用 alipay.commerce.ec.consume.detail.query 查询账单详情"""
  83. from alipay.aop.api.request.AlipayCommerceEcConsumeDetailQueryRequest import (
  84. AlipayCommerceEcConsumeDetailQueryRequest,
  85. )
  86. from alipay.aop.api.response.AlipayCommerceEcConsumeDetailQueryResponse import (
  87. AlipayCommerceEcConsumeDetailQueryResponse,
  88. )
  89. from alipay.aop.api.domain.AlipayCommerceEcConsumeDetailQueryModel import AlipayCommerceEcConsumeDetailQueryModel
  90. model = AlipayCommerceEcConsumeDetailQueryModel()
  91. model.pay_no = pay_no
  92. model.enterprise_id = enterprise_id
  93. request = AlipayCommerceEcConsumeDetailQueryRequest()
  94. request.biz_model = model
  95. client = AlipayClient.get_client()
  96. response = client.execute(request)
  97. if not response:
  98. log.error("查询账单详情失败: 无响应")
  99. return None
  100. result = AlipayCommerceEcConsumeDetailQueryResponse()
  101. result.parse_response_content(response)
  102. if not result.is_success():
  103. log.error(f"查询账单详情失败: {result.msg}")
  104. return None
  105. # 解析响应为字典
  106. return {
  107. "pay_no": getattr(result.consume_info, "pay_no", None),
  108. "enterprise_id": getattr(result.consume_info, "enterprise_id", None),
  109. "employee_id": getattr(result.consume_info, "employee_id", None),
  110. "employee_name": getattr(result.consume_info, "employee_name", None),
  111. "consume_amount": getattr(result, "consume_amount", None),
  112. "peer_pay_amount": getattr(result, "peer_pay_amount", None),
  113. "employee_pay_amount": getattr(result, "employee_pay_amount", None),
  114. "order_info": self._parse_order_info(result),
  115. "voucher_list": self._parse_voucher_list(result),
  116. }
  117. def _parse_order_info(self, result) -> dict | None:
  118. """解析订单信息"""
  119. order_info = getattr(result, "order_info", None)
  120. if not order_info:
  121. return None
  122. return {
  123. "order_no": getattr(order_info, "order_no", None),
  124. "trade_no": getattr(order_info, "trade_no", None),
  125. "product_code": getattr(order_info, "product_code", None),
  126. "order_title": getattr(order_info, "order_title", None),
  127. "order_amount": getattr(order_info, "order_amount", None),
  128. "order_status": getattr(order_info, "order_status", None),
  129. "merchant_name": getattr(order_info, "merchant_name", None),
  130. "merchant_id": getattr(order_info, "merchant_id", None),
  131. "shop_name": getattr(order_info, "shop_name", None),
  132. "gmt_payment": getattr(order_info, "gmt_payment", None),
  133. "fund_channel": getattr(order_info, "fund_channel", None),
  134. }
  135. def _parse_voucher_list(self, result) -> list | None:
  136. """解析凭证列表"""
  137. voucher_list = getattr(result, "voucher_list", None)
  138. if not voucher_list:
  139. return None
  140. vouchers = []
  141. for voucher in voucher_list:
  142. vouchers.append({
  143. "voucher_id": getattr(voucher, "voucher_id", None),
  144. "voucher_type": getattr(voucher, "voucher_type", None),
  145. "voucher_status": getattr(voucher, "voucher_status", None),
  146. "invoice_code": getattr(voucher, "invoice_code", None),
  147. "invoice_no": getattr(voucher, "invoice_no", None),
  148. "invoice_amount": getattr(voucher, "invoice_amount", None),
  149. "tax_amount": getattr(voucher, "tax_amount", None),
  150. "issue_date": getattr(voucher, "issue_date", None),
  151. "check_code": getattr(voucher, "check_code", None),
  152. "pdf_url": getattr(voucher, "pdf_url", None),
  153. })
  154. return vouchers
  155. async def _save_bill_detail(self, detail: dict, auth: AuthSchema) -> None:
  156. """保存账单详情和凭证数据"""
  157. bill_crud = BillCRUD(auth)
  158. pay_no = detail.get("pay_no")
  159. if not pay_no:
  160. return
  161. # 更新账单详情
  162. await bill_crud.update_by_pay_no(pay_no, {"status": "PROCESSED", "ext_infos": detail})
  163. # 保存订单详情
  164. order_info = detail.get("order_info")
  165. if order_info:
  166. await self._save_order_detail(pay_no, order_info, auth)
  167. # 保存凭证列表
  168. voucher_list = detail.get("voucher_list")
  169. if voucher_list:
  170. await self._save_voucher_list(pay_no, voucher_list, auth)
  171. log.info(f"保存账单详情成功: pay_no={pay_no}")
  172. async def _save_order_detail(self, pay_no: str, order_info: dict, auth: AuthSchema) -> None:
  173. """保存订单详情"""
  174. order_crud = BillOrderCRUD(auth)
  175. order_no = order_info.get("order_no")
  176. if not order_no:
  177. return
  178. order_data = {
  179. "pay_no": pay_no,
  180. "trade_no": order_info.get("trade_no"),
  181. "product_code": order_info.get("product_code"),
  182. "order_title": order_info.get("order_title"),
  183. "order_amount": Decimal(order_info.get("order_amount")) if order_info.get("order_amount") else None,
  184. "order_status": order_info.get("order_status"),
  185. "merchant_name": order_info.get("merchant_name"),
  186. "merchant_id": order_info.get("merchant_id"),
  187. "shop_name": order_info.get("shop_name"),
  188. "gmt_payment": datetime.strptime(order_info.get("gmt_payment"), "%Y-%m-%d %H:%M:%S") if order_info.get("gmt_payment") else None,
  189. "fund_channel": order_info.get("fund_channel"),
  190. }
  191. await order_crud.create_or_update(order_no, order_data)
  192. async def _save_voucher_list(self, pay_no: str, voucher_list: list, auth: AuthSchema) -> None:
  193. """保存凭证列表"""
  194. voucher_crud = BillVoucherCRUD(auth)
  195. for voucher in voucher_list:
  196. voucher_id = voucher.get("voucher_id")
  197. if not voucher_id:
  198. continue
  199. voucher_data = {
  200. "pay_no": pay_no,
  201. "voucher_type": voucher.get("voucher_type"),
  202. "voucher_status": voucher.get("voucher_status"),
  203. "invoice_code": voucher.get("invoice_code"),
  204. "invoice_no": voucher.get("invoice_no"),
  205. "invoice_amount": Decimal(voucher.get("invoice_amount")) if voucher.get("invoice_amount") else None,
  206. "tax_amount": Decimal(voucher.get("tax_amount")) if voucher.get("tax_amount") else None,
  207. "issue_date": datetime.strptime(voucher.get("issue_date"), "%Y-%m-%d") if voucher.get("issue_date") else None,
  208. "check_code": voucher.get("check_code"),
  209. "pdf_url": voucher.get("pdf_url"),
  210. }
  211. await voucher_crud.create_or_update(voucher_id, voucher_data)
  212. class VoucherHandler(BaseHandler[dict]):
  213. """凭证变动通知处理器"""
  214. async def handle(self, method: str, content: dict, auth: AuthSchema, redis: Redis) -> bool:
  215. """
  216. 处理凭证变动通知
  217. 流程:
  218. 1. 调用 alipay.commerce.ec.consume.detail.query 查询详情
  219. 2. 更新凭证信息
  220. """
  221. try:
  222. notify_data = VoucherChangeContent(**content)
  223. except Exception as e:
  224. log.error(f"解析凭证通知内容失败: {e}")
  225. return False
  226. action = notify_data.action
  227. voucher_id = notify_data.voucher_id
  228. log.info(f"处理凭证变动通知: action={action}, voucher_id={voucher_id}")
  229. try:
  230. return await self._process_voucher(notify_data, auth)
  231. except Exception as e:
  232. log.error(f"处理凭证变动通知异常: {e}", exc_info=True)
  233. return False
  234. async def _process_voucher(self, data: VoucherChangeContent, auth: AuthSchema) -> bool:
  235. """处理凭证"""
  236. log.info(
  237. f"凭证变动: voucher_id={data.voucher_id}, "
  238. f"enterprise_id={data.enterprise_id}, "
  239. f"action={data.action}"
  240. )
  241. # 4.1: 调用支付宝查询账单及凭证详情
  242. try:
  243. # 使用 pay_no 或 voucher_id 查询,这里需要确定查询参数
  244. # 由于凭证变动通知中没有直接提供 pay_no,我们先尝试通过 voucher_id 查询
  245. detail = await self._query_bill_detail_by_voucher(data.voucher_id, auth)
  246. except Exception as e:
  247. log.warning(f"查询凭证详情失败: {e}")
  248. return True
  249. # 4.3: 更新凭证信息
  250. if detail:
  251. await self._update_voucher_info(detail, auth)
  252. return True
  253. async def _query_bill_detail_by_voucher(self, voucher_id: str, auth: AuthSchema) -> dict | None:
  254. """通过凭证ID查询账单详情"""
  255. from alipay.aop.api.request.AlipayCommerceEcConsumeDetailQueryRequest import (
  256. AlipayCommerceEcConsumeDetailQueryRequest,
  257. )
  258. from alipay.aop.api.response.AlipayCommerceEcConsumeDetailQueryResponse import (
  259. AlipayCommerceEcConsumeDetailQueryResponse,
  260. )
  261. # 注意:consume.detail.query 需要 pay_no 参数
  262. # 如果没有 pay_no,可以考虑通过其他方式获取
  263. # 这里先返回空,实际实现时需要根据业务场景补充
  264. log.warning(f"凭证查询需要 pay_no,当前仅收到 voucher_id={voucher_id}")
  265. return None
  266. async def _update_voucher_info(self, detail: dict, auth: AuthSchema) -> None:
  267. """更新凭证信息"""
  268. voucher_crud = BillVoucherCRUD(auth)
  269. voucher_list = detail.get("voucher_list")
  270. if not voucher_list:
  271. return
  272. pay_no = detail.get("pay_no")
  273. for voucher in voucher_list:
  274. voucher_id = voucher.get("voucher_id")
  275. if not voucher_id:
  276. continue
  277. voucher_data = {
  278. "pay_no": pay_no,
  279. "voucher_type": voucher.get("voucher_type"),
  280. "voucher_status": voucher.get("voucher_status"),
  281. "invoice_code": voucher.get("invoice_code"),
  282. "invoice_no": voucher.get("invoice_no"),
  283. "invoice_amount": Decimal(voucher.get("invoice_amount")) if voucher.get("invoice_amount") else None,
  284. "tax_amount": Decimal(voucher.get("tax_amount")) if voucher.get("tax_amount") else None,
  285. "issue_date": datetime.strptime(voucher.get("issue_date"), "%Y-%m-%d") if voucher.get("issue_date") else None,
  286. "check_code": voucher.get("check_code"),
  287. "pdf_url": voucher.get("pdf_url"),
  288. }
  289. await voucher_crud.create_or_update(voucher_id, voucher_data)
  290. log.info(f"更新凭证信息成功: voucher_count={len(voucher_list)}")