from datetime import datetime from decimal import Decimal from redis.asyncio import Redis from app.api.v1.module_system.auth.schema import AuthSchema from app.core.logger import log from app.plugin.module_payment.account.service import AccountService from app.core.alipay import AlipayClient from ..schemas import ConsumeChangeContent, VoucherChangeContent from ..model import PayBillModel, PayBillOrderModel, PayBillVoucherModel from ..crud import BillCRUD, BillOrderCRUD, BillVoucherCRUD from .base_handler import BaseHandler from ...openapi.service import OpenTransferService class BillHandler(BaseHandler[dict]): """账单变动通知处理器""" async def handle(self, method: str, content: dict, auth: AuthSchema, redis: Redis) -> bool: """ 处理账单变动通知 流程: 1. 保存账单基础数据 2. 调用 alipay.commerce.ec.consume.detail.query 查询详情 3. 保存账单和凭证详情数据 """ try: notify_data = ConsumeChangeContent(**content) except Exception as e: log.error(f"解析账单通知内容失败: {e}") return False try: return await self._process_bill(notify_data, auth) except Exception as e: log.error(f"处理账单变动通知异常: {e}", exc_info=True) return False async def _process_bill(self, data: ConsumeChangeContent, auth: AuthSchema) -> bool: """处理账单变动通知 支付宝文档: alipay.commerce.ec.consume.change.notify consume_type 标准枚举: CONSUME(消费), REFUND(退款) notify_reason 标准枚举: COLLECT(归集), SUMMARY_SUCCESS(汇总成功), ORDER_COMPLETE(完结) 注: TRANSFER(转账) 为支付宝企业码扩展值,不在标准文档枚举中 """ log.info( f"账单变动: pay_no={data.pay_no}, " f"enterprise_id={data.enterprise_id}, " f"consume_amount={data.consume_amount}, " f"consume_type={data.consume_type}, " f"notify_reason={data.notify_reason}" ) # ========== 标准消费/退款通知:落库 ========== if data.consume_type in ("CONSUME", "REFUND"): await self._save_bill_base(data, auth) try: detail = await self._query_bill_detail(data.pay_no, data.enterprise_id, auth) except Exception as e: log.warning(f"查询账单详情失败(不影响主流程): {e}") detail = None if detail: await self._save_bill_detail(detail, auth) # 同步更新本地额度 try: await self._sync_expense_quota(data, auth) except Exception as e: log.warning(f"同步额度失败(不影响主流程): {e}") # ========== 转账通知(企业码扩展) ========== elif data.consume_type == "TRANSFER": from app.plugin.module_payment.account.enums import TransferStatusEnum reason = (data.notify_reason or "").upper() if "SUCCESS" in reason: status = TransferStatusEnum.SUCCESS.value await AccountService.update_transfer_status_service( auth, data.pay_no, status, data.model_dump(exclude_none=True) ) elif "FAIL" in reason: status = TransferStatusEnum.FAIL.value await AccountService.update_transfer_status_service( auth, data.pay_no, status, data.model_dump(exclude_none=True) ) else: log.warning(f"转账状态无法判断,跳过更新: pay_no={data.pay_no}, notify_reason={data.notify_reason}") # 无论是否更新状态,都回调商户通知状态变化 await OpenTransferService.open_return_service(auth, data.pay_no) else: log.info(f"未知账单类型: consume_type={data.consume_type}, pay_no={data.pay_no}") return True async def _save_bill_base(self, data: ConsumeChangeContent, auth: AuthSchema) -> None: """保存账单基础数据""" bill_crud = BillCRUD(auth) bill_data = { "enterprise_id": data.enterprise_id, "account_id": data.account_id, "employee_id": data.employee_id, "consume_type": data.consume_type, "consume_amount": Decimal(data.consume_amount) if data.consume_amount else None, "gmt_biz_create": datetime.strptime(data.gmt_biz_create, "%Y-%m-%d %H:%M:%S") if data.gmt_biz_create else None, "gmt_recieve_pay": datetime.strptime(data.gmt_recieve_pay, "%Y-%m-%d %H:%M:%S") if data.gmt_recieve_pay else None, "peer_pay_amount": Decimal(data.peer_pay_amount) if data.peer_pay_amount else None, "notify_reason": data.notify_reason, "notify_msg": data.notify_msg, "related_pay_no": data.related_pay_no, "expense_rule_group_id": data.expense_rule_group_id, "expense_scene_code": data.expense_scene_code, "expense_type": data.expense_type, "status": "NEW", } await bill_crud.create_or_update(data.pay_no, bill_data) log.info(f"保存账单基础数据成功: pay_no={data.pay_no}") async def _sync_expense_quota(self, data: ConsumeChangeContent, auth: AuthSchema) -> None: """消费/退款时同步更新本地额度(pay_expense_quota) 通过 expense_rule_group_id → pay_expense_rule.rule_id 找到 institution_id, 然后按 employee_id + institution_id 更新对应额度的可用金额。 """ if not data.expense_rule_group_id: return expense_amount = Decimal(data.consume_amount) if data.consume_amount else Decimal("0") if expense_amount <= 0: return # 查使用规则 → 找到制度ID from app.plugin.module_payment.expense.rule.model import ExpenseRuleModel from app.plugin.module_payment.expense.quota.model import QuotaModel from app.plugin.module_payment.expense.quota.enums import QuotaStatusEnum from sqlalchemy import select, update as sa_update rule_stmt = select(ExpenseRuleModel).where( ExpenseRuleModel.rule_id == data.expense_rule_group_id ) rule_result = await auth.db.execute(rule_stmt) rule = rule_result.scalar_one_or_none() if not rule: log.debug(f"未找到对应使用规则: rule_id={data.expense_rule_group_id}") return institution_id = rule.institution_id if not institution_id: return # 查该员工在该制度下的额度 quota_stmt = select(QuotaModel).where( QuotaModel.employee_id == data.employee_id, QuotaModel.institution_id == institution_id, ) quota_result = await auth.db.execute(quota_stmt) quota = quota_result.scalar_one_or_none() if not quota: log.debug(f"未找到额度记录: employee_id={data.employee_id}, institution_id={institution_id}") return # 更新可用额度 if data.consume_type == "REFUND": # 退款:增加可用额度 new_available = (quota.available_amount or Decimal("0")) + expense_amount else: # 消费:减少可用额度 new_available = (quota.available_amount or Decimal("0")) - expense_amount if new_available < 0: new_available = Decimal("0") new_status = QuotaStatusEnum.QUOTA_EXHAUSTED.value if new_available <= 0 else QuotaStatusEnum.QUOTA_ACTIVE.value upd = ( sa_update(QuotaModel) .where(QuotaModel.id == quota.id) .values(available_amount=new_available, status=new_status) ) await auth.db.execute(upd) await auth.db.flush() log.info( f"消费通知同步额度: employee_id={data.employee_id}, " f"institution_id={institution_id}, " f"consume_type={data.consume_type}, " f"amount={expense_amount}, " f"available_amount={new_available}, status={new_status}" ) async def _query_bill_detail(self, pay_no: str, enterprise_id: str, auth: AuthSchema) -> dict | None: """调用 alipay.commerce.ec.consume.detail.query 查询账单详情""" from alipay.aop.api.request.AlipayCommerceEcConsumeDetailQueryRequest import ( AlipayCommerceEcConsumeDetailQueryRequest, ) from alipay.aop.api.response.AlipayCommerceEcConsumeDetailQueryResponse import ( AlipayCommerceEcConsumeDetailQueryResponse, ) from alipay.aop.api.domain.AlipayCommerceEcConsumeDetailQueryModel import AlipayCommerceEcConsumeDetailQueryModel model = AlipayCommerceEcConsumeDetailQueryModel() model.pay_no = pay_no model.enterprise_id = enterprise_id request = AlipayCommerceEcConsumeDetailQueryRequest() request.biz_model = model client = AlipayClient.get_client() response = client.execute(request) if not response: log.error("查询账单详情失败: 无响应") return None result = AlipayCommerceEcConsumeDetailQueryResponse() result.parse_response_content(response) if not result.is_success(): log.error(f"查询账单详情失败: {result.msg}") return None # 解析响应为字典 return { "pay_no": getattr(result.consume_info, "pay_no", None), "enterprise_id": getattr(result.consume_info, "enterprise_id", None), "employee_id": getattr(result.consume_info, "employee_id", None), "employee_name": getattr(result.consume_info, "employee_name", None), "consume_amount": getattr(result, "consume_amount", None), "peer_pay_amount": getattr(result, "peer_pay_amount", None), "employee_pay_amount": getattr(result, "employee_pay_amount", None), "order_info": self._parse_order_info(result), "voucher_list": self._parse_voucher_list(result), } def _parse_order_info(self, result) -> dict | None: """解析订单信息""" order_info = getattr(result, "order_info", None) if not order_info: return None return { "order_no": getattr(order_info, "order_no", None), "trade_no": getattr(order_info, "trade_no", None), "product_code": getattr(order_info, "product_code", None), "order_title": getattr(order_info, "order_title", None), "order_amount": getattr(order_info, "order_amount", None), "order_status": getattr(order_info, "order_status", None), "merchant_name": getattr(order_info, "merchant_name", None), "merchant_id": getattr(order_info, "merchant_id", None), "shop_name": getattr(order_info, "shop_name", None), "gmt_payment": getattr(order_info, "gmt_payment", None), "fund_channel": getattr(order_info, "fund_channel", None), } def _parse_voucher_list(self, result) -> list | None: """解析凭证列表""" voucher_list = getattr(result, "voucher_list", None) if not voucher_list: return None vouchers = [] for voucher in voucher_list: vouchers.append({ "voucher_id": getattr(voucher, "voucher_id", None), "voucher_type": getattr(voucher, "voucher_type", None), "voucher_status": getattr(voucher, "voucher_status", None), "invoice_code": getattr(voucher, "invoice_code", None), "invoice_no": getattr(voucher, "invoice_no", None), "invoice_amount": getattr(voucher, "invoice_amount", None), "tax_amount": getattr(voucher, "tax_amount", None), "issue_date": getattr(voucher, "issue_date", None), "check_code": getattr(voucher, "check_code", None), "pdf_url": getattr(voucher, "pdf_url", None), }) return vouchers async def _save_bill_detail(self, detail: dict, auth: AuthSchema) -> None: """保存账单详情和凭证数据""" bill_crud = BillCRUD(auth) pay_no = detail.get("pay_no") if not pay_no: return # 更新账单详情 await bill_crud.update_by_pay_no(pay_no, {"status": "PROCESSED", "ext_infos": detail}) # 保存订单详情 order_info = detail.get("order_info") if order_info: await self._save_order_detail(pay_no, order_info, auth) # 保存凭证列表 voucher_list = detail.get("voucher_list") if voucher_list: await self._save_voucher_list(pay_no, voucher_list, auth) log.info(f"保存账单详情成功: pay_no={pay_no}") async def _save_order_detail(self, pay_no: str, order_info: dict, auth: AuthSchema) -> None: """保存订单详情""" order_crud = BillOrderCRUD(auth) order_no = order_info.get("order_no") if not order_no: return order_data = { "pay_no": pay_no, "trade_no": order_info.get("trade_no"), "product_code": order_info.get("product_code"), "order_title": order_info.get("order_title"), "order_amount": Decimal(order_info.get("order_amount")) if order_info.get("order_amount") else None, "order_status": order_info.get("order_status"), "merchant_name": order_info.get("merchant_name"), "merchant_id": order_info.get("merchant_id"), "shop_name": order_info.get("shop_name"), "gmt_payment": datetime.strptime(order_info.get("gmt_payment"), "%Y-%m-%d %H:%M:%S") if order_info.get("gmt_payment") else None, "fund_channel": order_info.get("fund_channel"), } await order_crud.create_or_update(order_no, order_data) async def _save_voucher_list(self, pay_no: str, voucher_list: list, auth: AuthSchema) -> None: """保存凭证列表""" voucher_crud = BillVoucherCRUD(auth) for voucher in voucher_list: voucher_id = voucher.get("voucher_id") if not voucher_id: continue voucher_data = { "pay_no": pay_no, "voucher_type": voucher.get("voucher_type"), "voucher_status": voucher.get("voucher_status"), "invoice_code": voucher.get("invoice_code"), "invoice_no": voucher.get("invoice_no"), "invoice_amount": Decimal(voucher.get("invoice_amount")) if voucher.get("invoice_amount") else None, "tax_amount": Decimal(voucher.get("tax_amount")) if voucher.get("tax_amount") else None, "issue_date": datetime.strptime(voucher.get("issue_date"), "%Y-%m-%d") if voucher.get("issue_date") else None, "check_code": voucher.get("check_code"), "pdf_url": voucher.get("pdf_url"), } await voucher_crud.create_or_update(voucher_id, voucher_data) class VoucherHandler(BaseHandler[dict]): """凭证变动通知处理器""" async def handle(self, method: str, content: dict, auth: AuthSchema, redis: Redis) -> bool: """ 处理凭证变动通知 流程: 1. 调用 alipay.commerce.ec.consume.detail.query 查询详情 2. 更新凭证信息 """ try: notify_data = VoucherChangeContent(**content) except Exception as e: log.error(f"解析凭证通知内容失败: {e}") return False action = notify_data.action voucher_id = notify_data.voucher_id log.info(f"处理凭证变动通知: action={action}, voucher_id={voucher_id}") try: return await self._process_voucher(notify_data, auth) except Exception as e: log.error(f"处理凭证变动通知异常: {e}", exc_info=True) return False async def _process_voucher(self, data: VoucherChangeContent, auth: AuthSchema) -> bool: """处理凭证变动通知 通知中已携带 pay_no 字段,直接用 pay_no 查询账单详情并更新凭证信息。 """ log.info( f"凭证变动: pay_no={data.pay_no}, voucher_type={data.voucher_type}, " f"enterprise_id={data.enterprise_id}, notify_reason={data.notify_reason}" ) if not data.pay_no: log.warning(f"凭证变动通知缺少 pay_no: voucher_type={data.voucher_type}") return True try: detail = await self._query_bill_detail(data.pay_no, data.enterprise_id or "", auth) except Exception as e: log.warning(f"查询账单详情失败(不影响主流程): {e}") return True if detail: await self._update_voucher_info(detail, auth) return True async def _update_voucher_info(self, detail: dict, auth: AuthSchema) -> None: """更新凭证信息""" voucher_crud = BillVoucherCRUD(auth) voucher_list = detail.get("voucher_list") if not voucher_list: return pay_no = detail.get("pay_no") for voucher in voucher_list: voucher_id = voucher.get("voucher_id") if not voucher_id: continue voucher_data = { "pay_no": pay_no, "voucher_type": voucher.get("voucher_type"), "voucher_status": voucher.get("voucher_status"), "invoice_code": voucher.get("invoice_code"), "invoice_no": voucher.get("invoice_no"), "invoice_amount": Decimal(voucher.get("invoice_amount")) if voucher.get("invoice_amount") else None, "tax_amount": Decimal(voucher.get("tax_amount")) if voucher.get("tax_amount") else None, "issue_date": datetime.strptime(voucher.get("issue_date"), "%Y-%m-%d") if voucher.get("issue_date") else None, "check_code": voucher.get("check_code"), "pdf_url": voucher.get("pdf_url"), } await voucher_crud.create_or_update(voucher_id, voucher_data) log.info(f"更新凭证信息成功: voucher_count={len(voucher_list)}")