| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443 |
- from datetime import datetime
- from decimal import Decimal
- from redis.asyncio import Redis
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.core.logger import log
- from app.plugin.module_payment.account.service import AccountService
- from app.core.alipay import AlipayClient
- from ..schemas import ConsumeChangeContent, VoucherChangeContent
- from ..model import PayBillModel, PayBillOrderModel, PayBillVoucherModel
- from ..crud import BillCRUD, BillOrderCRUD, BillVoucherCRUD
- from .base_handler import BaseHandler
- from ...openapi.service import OpenTransferService
- class BillHandler(BaseHandler[dict]):
- """账单变动通知处理器"""
- async def handle(self, method: str, content: dict, auth: AuthSchema, redis: Redis) -> bool:
- """
- 处理账单变动通知
-
- 流程:
- 1. 保存账单基础数据
- 2. 调用 alipay.commerce.ec.consume.detail.query 查询详情
- 3. 保存账单和凭证详情数据
- """
- try:
- notify_data = ConsumeChangeContent(**content)
- except Exception as e:
- log.error(f"解析账单通知内容失败: {e}")
- return False
- try:
- return await self._process_bill(notify_data, auth)
- except Exception as e:
- log.error(f"处理账单变动通知异常: {e}", exc_info=True)
- return False
- async def _process_bill(self, data: ConsumeChangeContent, auth: AuthSchema) -> bool:
- """处理账单变动通知
- 支付宝文档: alipay.commerce.ec.consume.change.notify
- consume_type 标准枚举: CONSUME(消费), REFUND(退款)
- notify_reason 标准枚举: COLLECT(归集), SUMMARY_SUCCESS(汇总成功), ORDER_COMPLETE(完结)
- 注: TRANSFER(转账) 为支付宝企业码扩展值,不在标准文档枚举中
- """
- log.info(
- f"账单变动: pay_no={data.pay_no}, "
- f"enterprise_id={data.enterprise_id}, "
- f"consume_amount={data.consume_amount}, "
- f"consume_type={data.consume_type}, "
- f"notify_reason={data.notify_reason}"
- )
- # ========== 标准消费/退款通知:落库 ==========
- if data.consume_type in ("CONSUME", "REFUND"):
- await self._save_bill_base(data, auth)
- try:
- detail = await self._query_bill_detail(data.pay_no, data.enterprise_id, auth)
- except Exception as e:
- log.warning(f"查询账单详情失败(不影响主流程): {e}")
- detail = None
- if detail:
- await self._save_bill_detail(detail, auth)
- # 同步更新本地额度
- try:
- await self._sync_expense_quota(data, auth)
- except Exception as e:
- log.warning(f"同步额度失败(不影响主流程): {e}")
- # ========== 转账通知(企业码扩展) ==========
- elif data.consume_type == "TRANSFER":
- await AccountService.update_transfer_status_service(
- auth, data.pay_no, "SUCCESS", data.model_dump(exclude_none=True)
- )
- # 通知到达,清除重试字段(无需再反查)
- await self._clear_transfer_retry(auth, data.pay_no)
- await OpenTransferService.open_return_service(auth, data.pay_no)
- else:
- log.info(f"未知账单类型: consume_type={data.consume_type}, pay_no={data.pay_no}")
- return True
- async def _save_bill_base(self, data: ConsumeChangeContent, auth: AuthSchema) -> None:
- """保存账单基础数据"""
- bill_crud = BillCRUD(auth)
-
- bill_data = {
- "enterprise_id": data.enterprise_id,
- "account_id": data.account_id,
- "employee_id": data.employee_id,
- "consume_type": data.consume_type,
- "consume_amount": Decimal(data.consume_amount) if data.consume_amount else None,
- "gmt_biz_create": datetime.strptime(data.gmt_biz_create, "%Y-%m-%d %H:%M:%S") if data.gmt_biz_create else None,
- "gmt_recieve_pay": datetime.strptime(data.gmt_recieve_pay, "%Y-%m-%d %H:%M:%S") if data.gmt_recieve_pay else None,
- "peer_pay_amount": Decimal(data.peer_pay_amount) if data.peer_pay_amount else None,
- "notify_reason": data.notify_reason,
- "notify_msg": data.notify_msg,
- "related_pay_no": data.related_pay_no,
- "expense_rule_group_id": data.expense_rule_group_id,
- "expense_scene_code": data.expense_scene_code,
- "expense_type": data.expense_type,
- "status": "NEW",
- }
-
- await bill_crud.create_or_update(data.pay_no, bill_data)
- log.info(f"保存账单基础数据成功: pay_no={data.pay_no}")
- async def _sync_expense_quota(self, data: ConsumeChangeContent, auth: AuthSchema) -> None:
- """消费/退款时同步更新本地额度(pay_expense_quota)
- 通过 expense_rule_group_id → pay_expense_rule.rule_id 找到 institution_id,
- 然后按 employee_id + institution_id 更新对应额度的可用金额。
- """
- if not data.expense_rule_group_id:
- return
- expense_amount = Decimal(data.consume_amount) if data.consume_amount else Decimal("0")
- if expense_amount <= 0:
- return
- # 查使用规则 → 找到制度ID
- from app.plugin.module_payment.expense.rule.model import ExpenseRuleModel
- from app.plugin.module_payment.expense.quota.model import QuotaModel
- from app.plugin.module_payment.expense.quota.enums import QuotaStatusEnum
- from sqlalchemy import select, update as sa_update
- rule_stmt = select(ExpenseRuleModel).where(
- ExpenseRuleModel.rule_id == data.expense_rule_group_id
- )
- rule_result = await auth.db.execute(rule_stmt)
- rule = rule_result.scalar_one_or_none()
- if not rule:
- log.debug(f"未找到对应使用规则: rule_id={data.expense_rule_group_id}")
- return
- institution_id = rule.institution_id
- if not institution_id:
- return
- # 查该员工在该制度下的额度
- quota_stmt = select(QuotaModel).where(
- QuotaModel.employee_id == data.employee_id,
- QuotaModel.institution_id == institution_id,
- )
- quota_result = await auth.db.execute(quota_stmt)
- quota = quota_result.scalar_one_or_none()
- if not quota:
- log.debug(f"未找到额度记录: employee_id={data.employee_id}, institution_id={institution_id}")
- return
- # 更新可用额度
- if data.consume_type == "REFUND":
- # 退款:增加可用额度
- new_available = (quota.available_amount or Decimal("0")) + expense_amount
- else:
- # 消费:减少可用额度
- new_available = (quota.available_amount or Decimal("0")) - expense_amount
- if new_available < 0:
- new_available = Decimal("0")
- new_status = QuotaStatusEnum.QUOTA_EXHAUSTED.value if new_available <= 0 else QuotaStatusEnum.QUOTA_ACTIVE.value
- upd = (
- sa_update(QuotaModel)
- .where(QuotaModel.id == quota.id)
- .values(available_amount=new_available, status=new_status)
- )
- await auth.db.execute(upd)
- await auth.db.flush()
- log.info(
- f"消费通知同步额度: employee_id={data.employee_id}, "
- f"institution_id={institution_id}, "
- f"consume_type={data.consume_type}, "
- f"amount={expense_amount}, "
- f"available_amount={new_available}, status={new_status}"
- )
- async def _query_bill_detail(self, pay_no: str, enterprise_id: str, auth: AuthSchema) -> dict | None:
- """调用 alipay.commerce.ec.consume.detail.query 查询账单详情"""
- from alipay.aop.api.request.AlipayCommerceEcConsumeDetailQueryRequest import (
- AlipayCommerceEcConsumeDetailQueryRequest,
- )
- from alipay.aop.api.response.AlipayCommerceEcConsumeDetailQueryResponse import (
- AlipayCommerceEcConsumeDetailQueryResponse,
- )
- from alipay.aop.api.domain.AlipayCommerceEcConsumeDetailQueryModel import AlipayCommerceEcConsumeDetailQueryModel
- model = AlipayCommerceEcConsumeDetailQueryModel()
- model.pay_no = pay_no
- model.enterprise_id = enterprise_id
- request = AlipayCommerceEcConsumeDetailQueryRequest()
- request.biz_model = model
- client = AlipayClient.get_client()
- response = client.execute(request)
- if not response:
- log.error("查询账单详情失败: 无响应")
- return None
- result = AlipayCommerceEcConsumeDetailQueryResponse()
- result.parse_response_content(response)
- if not result.is_success():
- log.error(f"查询账单详情失败: {result.msg}")
- return None
- # 解析响应为字典
- return {
- "pay_no": getattr(result.consume_info, "pay_no", None),
- "enterprise_id": getattr(result.consume_info, "enterprise_id", None),
- "employee_id": getattr(result.consume_info, "employee_id", None),
- "employee_name": getattr(result.consume_info, "employee_name", None),
- "consume_amount": getattr(result, "consume_amount", None),
- "peer_pay_amount": getattr(result, "peer_pay_amount", None),
- "employee_pay_amount": getattr(result, "employee_pay_amount", None),
- "order_info": self._parse_order_info(result),
- "voucher_list": self._parse_voucher_list(result),
- }
- def _parse_order_info(self, result) -> dict | None:
- """解析订单信息"""
- order_info = getattr(result, "order_info", None)
- if not order_info:
- return None
-
- return {
- "order_no": getattr(order_info, "order_no", None),
- "trade_no": getattr(order_info, "trade_no", None),
- "product_code": getattr(order_info, "product_code", None),
- "order_title": getattr(order_info, "order_title", None),
- "order_amount": getattr(order_info, "order_amount", None),
- "order_status": getattr(order_info, "order_status", None),
- "merchant_name": getattr(order_info, "merchant_name", None),
- "merchant_id": getattr(order_info, "merchant_id", None),
- "shop_name": getattr(order_info, "shop_name", None),
- "gmt_payment": getattr(order_info, "gmt_payment", None),
- "fund_channel": getattr(order_info, "fund_channel", None),
- }
- def _parse_voucher_list(self, result) -> list | None:
- """解析凭证列表"""
- voucher_list = getattr(result, "voucher_list", None)
- if not voucher_list:
- return None
-
- vouchers = []
- for voucher in voucher_list:
- vouchers.append({
- "voucher_id": getattr(voucher, "voucher_id", None),
- "voucher_type": getattr(voucher, "voucher_type", None),
- "voucher_status": getattr(voucher, "voucher_status", None),
- "invoice_code": getattr(voucher, "invoice_code", None),
- "invoice_no": getattr(voucher, "invoice_no", None),
- "invoice_amount": getattr(voucher, "invoice_amount", None),
- "tax_amount": getattr(voucher, "tax_amount", None),
- "issue_date": getattr(voucher, "issue_date", None),
- "check_code": getattr(voucher, "check_code", None),
- "pdf_url": getattr(voucher, "pdf_url", None),
- })
- return vouchers
- async def _save_bill_detail(self, detail: dict, auth: AuthSchema) -> None:
- """保存账单详情和凭证数据"""
- bill_crud = BillCRUD(auth)
-
- pay_no = detail.get("pay_no")
- if not pay_no:
- return
- # 更新账单详情
- await bill_crud.update_by_pay_no(pay_no, {"status": "PROCESSED", "ext_infos": detail})
- # 保存订单详情
- order_info = detail.get("order_info")
- if order_info:
- await self._save_order_detail(pay_no, order_info, auth)
- # 保存凭证列表
- voucher_list = detail.get("voucher_list")
- if voucher_list:
- await self._save_voucher_list(pay_no, voucher_list, auth)
- log.info(f"保存账单详情成功: pay_no={pay_no}")
- async def _save_order_detail(self, pay_no: str, order_info: dict, auth: AuthSchema) -> None:
- """保存订单详情"""
- order_crud = BillOrderCRUD(auth)
-
- order_no = order_info.get("order_no")
- if not order_no:
- return
- order_data = {
- "pay_no": pay_no,
- "trade_no": order_info.get("trade_no"),
- "product_code": order_info.get("product_code"),
- "order_title": order_info.get("order_title"),
- "order_amount": Decimal(order_info.get("order_amount")) if order_info.get("order_amount") else None,
- "order_status": order_info.get("order_status"),
- "merchant_name": order_info.get("merchant_name"),
- "merchant_id": order_info.get("merchant_id"),
- "shop_name": order_info.get("shop_name"),
- "gmt_payment": datetime.strptime(order_info.get("gmt_payment"), "%Y-%m-%d %H:%M:%S") if order_info.get("gmt_payment") else None,
- "fund_channel": order_info.get("fund_channel"),
- }
-
- await order_crud.create_or_update(order_no, order_data)
- async def _save_voucher_list(self, pay_no: str, voucher_list: list, auth: AuthSchema) -> None:
- """保存凭证列表"""
- voucher_crud = BillVoucherCRUD(auth)
- for voucher in voucher_list:
- voucher_id = voucher.get("voucher_id")
- if not voucher_id:
- continue
- voucher_data = {
- "pay_no": pay_no,
- "voucher_type": voucher.get("voucher_type"),
- "voucher_status": voucher.get("voucher_status"),
- "invoice_code": voucher.get("invoice_code"),
- "invoice_no": voucher.get("invoice_no"),
- "invoice_amount": Decimal(voucher.get("invoice_amount")) if voucher.get("invoice_amount") else None,
- "tax_amount": Decimal(voucher.get("tax_amount")) if voucher.get("tax_amount") else None,
- "issue_date": datetime.strptime(voucher.get("issue_date"), "%Y-%m-%d") if voucher.get("issue_date") else None,
- "check_code": voucher.get("check_code"),
- "pdf_url": voucher.get("pdf_url"),
- }
-
- await voucher_crud.create_or_update(voucher_id, voucher_data)
- @staticmethod
- async def _clear_transfer_retry(auth: AuthSchema, pay_no: str) -> None:
- """通知到达时清除重试字段,终止反查"""
- from sqlalchemy import update as sa_update
- from app.plugin.module_payment.account.model import TransferModel
- try:
- upd = sa_update(TransferModel).where(
- TransferModel.order_no == pay_no
- ).values(next_retry_at=None, retry_count=4)
- await auth.db.execute(upd)
- except Exception:
- pass
- class VoucherHandler(BaseHandler[dict]):
- """凭证变动通知处理器"""
- async def handle(self, method: str, content: dict, auth: AuthSchema, redis: Redis) -> bool:
- """
- 处理凭证变动通知
-
- 流程:
- 1. 调用 alipay.commerce.ec.consume.detail.query 查询详情
- 2. 更新凭证信息
- """
- try:
- notify_data = VoucherChangeContent(**content)
- except Exception as e:
- log.error(f"解析凭证通知内容失败: {e}")
- return False
- action = notify_data.action
- voucher_id = notify_data.voucher_id
- log.info(f"处理凭证变动通知: action={action}, voucher_id={voucher_id}")
- try:
- return await self._process_voucher(notify_data, auth)
- except Exception as e:
- log.error(f"处理凭证变动通知异常: {e}", exc_info=True)
- return False
- async def _process_voucher(self, data: VoucherChangeContent, auth: AuthSchema) -> bool:
- """处理凭证变动通知
- 通知中已携带 pay_no 字段,直接用 pay_no 查询账单详情并更新凭证信息。
- """
- log.info(
- f"凭证变动: pay_no={data.pay_no}, voucher_type={data.voucher_type}, "
- f"enterprise_id={data.enterprise_id}, notify_reason={data.notify_reason}"
- )
- if not data.pay_no:
- log.warning(f"凭证变动通知缺少 pay_no: voucher_type={data.voucher_type}")
- return True
- try:
- detail = await self._query_bill_detail(data.pay_no, data.enterprise_id or "", auth)
- except Exception as e:
- log.warning(f"查询账单详情失败(不影响主流程): {e}")
- return True
- if detail:
- await self._update_voucher_info(detail, auth)
- return True
- async def _update_voucher_info(self, detail: dict, auth: AuthSchema) -> None:
- """更新凭证信息"""
- voucher_crud = BillVoucherCRUD(auth)
-
- voucher_list = detail.get("voucher_list")
- if not voucher_list:
- return
- pay_no = detail.get("pay_no")
-
- for voucher in voucher_list:
- voucher_id = voucher.get("voucher_id")
- if not voucher_id:
- continue
- voucher_data = {
- "pay_no": pay_no,
- "voucher_type": voucher.get("voucher_type"),
- "voucher_status": voucher.get("voucher_status"),
- "invoice_code": voucher.get("invoice_code"),
- "invoice_no": voucher.get("invoice_no"),
- "invoice_amount": Decimal(voucher.get("invoice_amount")) if voucher.get("invoice_amount") else None,
- "tax_amount": Decimal(voucher.get("tax_amount")) if voucher.get("tax_amount") else None,
- "issue_date": datetime.strptime(voucher.get("issue_date"), "%Y-%m-%d") if voucher.get("issue_date") else None,
- "check_code": voucher.get("check_code"),
- "pdf_url": voucher.get("pdf_url"),
- }
-
- await voucher_crud.create_or_update(voucher_id, voucher_data)
-
- log.info(f"更新凭证信息成功: voucher_count={len(voucher_list)}")
|