from decimal import Decimal from typing import Optional from redis.asyncio import Redis from app.api.v1.module_system.auth.schema import AuthSchema from app.core.logger import log from app.plugin.module_payment.enterprise.schema import EnterpriseCreateOrUpdateSchema from app.plugin.module_payment.notification.enums import AlipayNotifyMethodEnum from app.plugin.module_payment.enterprise.service import EnterpriseService from .base_handler import BaseHandler from ...points import PointsService from ...points.schema import PointsChangeSchema class AccountHandler(BaseHandler): """资金专户通知处理器""" async def handle(self, method: str, content: dict, auth: AuthSchema, redis: Redis) -> bool: """ 处理资金专户相关通知 支持的通知类型: - alipay.commerce.ec.fund.change.notify (资金变更通知) - alipay.commerce.ec.trans.authorize.notify (转账授权通知) """ log.info(f"开始处理资金专户通知: {method}") if method == AlipayNotifyMethodEnum.FUND_CHANGE_NOTIFY.value: return await self._handle_fund_change(content, auth) elif method == AlipayNotifyMethodEnum.TRANS_AUTHORIZE_NOTIFY.value: return await self._handle_trans_authorize(content, auth) log.warning(f"未知的资金专户通知类型: {method}") return False async def _handle_fund_change(self, content: dict, auth: AuthSchema) -> bool: """ 处理资金变更通知 """ try: # 解析通知内容 # fund_change_info = content.get("fund_change_info", {}) # business_type = fund_change_info.get("business_type") # fund_status = fund_change_info.get("fund_status") # amount = fund_change_info.get("amount") # out_biz_no = fund_change_info.get("out_biz_no") # enterprise_id = fund_change_info.get("enterprise_id") # log.info(f"资金变更通知: 业务类型={business_type}, 状态={fund_status}, 金额={amount}") # # 只处理转账成功的通知 # if business_type == "TRANSFER" and fund_status == "SUCCESS": # # 扣除积分 # deduct_data = PointsChangeSchema( # points=Decimal(amount), # change_type="", # enterprise_id=enterprise_id, # business_id=out_biz_no, # business_type="TRANSFER", # remark=f"转账扣除积分: {amount}" # ) # await PointsService.deduct_points_service(auth, deduct_data) # log.info(f"转账成功,扣除积分: {amount}") return True except Exception as e: log.error(f"处理资金变更通知失败: {e}") return False async def _handle_trans_authorize(self, content: dict, auth: AuthSchema) -> bool: """ 处理转账授权通知 """ try: # 解析通知内容 authorize_status = content.get("status") # NORMAL enterprise_id = content.get("enterprise_id") log.info(f"转账授权通知: 状态={authorize_status}, 企业ID={enterprise_id}") await EnterpriseService.update_service_from_alipay( auth, EnterpriseCreateOrUpdateSchema(remark=authorize_status, enterprise_id=enterprise_id) ) return True except Exception as e: log.error(f"处理转账授权通知失败: {e}") return False