from datetime import datetime from decimal import Decimal from app.api.v1.module_system.auth.schema import AuthSchema from app.core.alipay import AlipayClient from app.core.exceptions import CustomException from app.core.logger import log from app.utils.snowflake import get_snowflake_id from .enums import QuotaStatusEnum from .schema import ( ExpenseQuotaCreateSchema, ExpenseQuotaDeleteSchema, ExpenseQuotaModifySchema, ExpenseQuotaQueryOutSchema, ExpenseQuotaQuerySchema, IssueBatchCancelOutSchema, IssueBatchCancelSchema, IssueBatchCreateOutSchema, IssueBatchCreateSchema, IssueBatchListOutSchema, IssueBatchRecordsQueryOutSchema, IssueBatchRecordsQuerySchema, IssueQuotaCheckFailedItem, IssueRecordInfoItem, QuotaCreateSchema, QuotaDetailInfoSchema, QuotaListOutSchema, QuotaOperationOutSchema, QuotaOutSchema, QuotaUpdateSchema, ) from .crud import IssueBatchCRUD, QuotaCRUD from .model import IssueBatchModel class QuotaService: """额度服务层""" @classmethod async def create_expense_quota_service( cls, auth: AuthSchema, data: ExpenseQuotaCreateSchema ) -> QuotaOperationOutSchema: """ 创建余额/点券 调用: alipay.ebpp.invoice.expensecontrol.quota.create """ crud = QuotaCRUD(auth) out_biz_no = data.outer_source_id or str(get_snowflake_id()) try: from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaCreateRequest import ( AlipayEbppInvoiceExpensecontrolQuotaCreateRequest, ) from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaCreateModel import ( AlipayEbppInvoiceExpensecontrolQuotaCreateModel, ) from alipay.aop.api.domain.IssueQuotaTarget import ( IssueQuotaTarget, ) from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaCreateResponse import ( AlipayEbppInvoiceExpensecontrolQuotaCreateResponse, ) except ImportError: raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖") model = AlipayEbppInvoiceExpensecontrolQuotaCreateModel() model.target_type = data.target_type model.target_id = data.target_id model.enterprise_id = data.enterprise_id model.outer_source_id = out_biz_no model.quota_type = data.quota_type or "CAP" model.share_mode = data.share_mode or "0" if data.effective_start_date: model.effective_start_date = data.effective_start_date.strftime("%Y-%m-%d %H:%M:%S") if data.effective_end_date: model.effective_end_date = data.effective_end_date.strftime("%Y-%m-%d %H:%M:%S") if data.issue_name: model.issue_name = data.issue_name if data.issue_desc: model.issue_desc = data.issue_desc if data.issue_quota_target_list: target_list = [] for item in data.issue_quota_target_list: target = IssueQuotaTarget() target.owner_type = item.owner_type target.owner_id = item.owner_id target.quota = item.quota if item.amount is not None: target.amount = item.amount target_list.append(target) model.issue_quota_target_list = target_list request = AlipayEbppInvoiceExpensecontrolQuotaCreateRequest() request.biz_model = model client = AlipayClient.get_client() response = client.execute(request) if not response: raise CustomException(msg="创建余额/点券失败: 无响应") result = AlipayEbppInvoiceExpensecontrolQuotaCreateResponse() result.parse_response_content(response) if not result.is_success(): log.error(f"支付宝接口调用失败: {result.code} - {result.msg}") raise CustomException(msg=f"创建余额/点券失败: {result.msg}") quota_data = data.model_dump(exclude_none=True) quota_data["out_biz_no"] = out_biz_no quota_data["status"] = QuotaStatusEnum.QUOTA_ACTIVE.value if result.quota_id: quota_data["quota_id"] = result.quota_id quota = await crud.create(quota_data) if not quota: raise CustomException(msg="创建额度记录失败") return QuotaOperationOutSchema(out_biz_no=out_biz_no, quota_id=result.quota_id) @classmethod async def create_quota_service( cls, auth: AuthSchema, data: QuotaCreateSchema ) -> QuotaOperationOutSchema: """创建额度""" crud = QuotaCRUD(auth) out_biz_no = str(get_snowflake_id()) quota_data = data.model_dump(exclude_none=True) quota_data["out_biz_no"] = out_biz_no quota_data["status"] = QuotaStatusEnum.QUOTA_ACTIVE.value if quota_data.get("available_amount") is None: quota_data["available_amount"] = quota_data.get("total_amount", 0) quota = await crud.create(quota_data) if not quota: raise CustomException(msg="创建额度记录失败") return QuotaOperationOutSchema(out_biz_no=out_biz_no, quota_id=quota.quota_id) @classmethod async def query_expense_quota_service( cls, auth: AuthSchema, data: ExpenseQuotaQuerySchema ) -> ExpenseQuotaQueryOutSchema: """ 查询余额/点券 调用: alipay.ebpp.invoice.expensecontrol.quota.query """ try: from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaQueryRequest import ( AlipayEbppInvoiceExpensecontrolQuotaQueryRequest, ) from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaQueryModel import ( AlipayEbppInvoiceExpensecontrolQuotaQueryModel, ) from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaQueryResponse import ( AlipayEbppInvoiceExpensecontrolQuotaQueryResponse, ) except ImportError: raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖") model = AlipayEbppInvoiceExpensecontrolQuotaQueryModel() model.owner_type = data.owner_type model.page_size = data.page_size model.page_num = data.page_num if data.target_type: model.target_type = data.target_type if data.target_id: model.target_id = data.target_id if data.owner_id: model.owner_id = data.owner_id if data.owner_open_id: model.owner_open_id = data.owner_open_id if data.enterprise_id: model.enterprise_id = data.enterprise_id if data.quota_id_list: model.quota_id_list = data.quota_id_list if data.quota_type: model.quota_type = data.quota_type request = AlipayEbppInvoiceExpensecontrolQuotaQueryRequest() request.biz_model = model client = AlipayClient.get_client() response = client.execute(request) if not response: raise CustomException(msg="查询余额/点券失败: 无响应") result = AlipayEbppInvoiceExpensecontrolQuotaQueryResponse() result.parse_response_content(response) if not result.is_success(): log.error(f"支付宝接口调用失败: {result.code} - {result.msg}") raise CustomException(msg=f"查询余额/点券失败: {result.msg}") return ExpenseQuotaQueryOutSchema( page_num=result.page_num or data.page_num, page_size=result.page_size or data.page_size, total_page_count=result.total_page_count or 0, ) @classmethod async def modify_expense_quota_service( cls, auth: AuthSchema, out_biz_no: str, data: ExpenseQuotaModifySchema ) -> QuotaOperationOutSchema: """ 修改余额/点券 调用: alipay.ebpp.invoice.expensecontrol.quota.modify """ crud = QuotaCRUD(auth) quota = await crud.get_by_out_biz_no(out_biz_no) if not quota: raise CustomException(msg="额度不存在") try: from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaModifyRequest import ( AlipayEbppInvoiceExpensecontrolQuotaModifyRequest, ) from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaModifyModel import ( AlipayEbppInvoiceExpensecontrolQuotaModifyModel, ) from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaModifyResponse import ( AlipayEbppInvoiceExpensecontrolQuotaModifyResponse, ) except ImportError: raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖") model = AlipayEbppInvoiceExpensecontrolQuotaModifyModel() model.quota_id = data.quota_id model.action = data.action model.outer_source_id = data.outer_source_id model.enterprise_id = data.enterprise_id if data.amount is not None: model.amount = str(data.amount) if data.share_mode: model.share_mode = data.share_mode request = AlipayEbppInvoiceExpensecontrolQuotaModifyRequest() request.biz_model = model client = AlipayClient.get_client() response = client.execute(request) if not response: raise CustomException(msg="修改余额/点券失败: 无响应") result = AlipayEbppInvoiceExpensecontrolQuotaModifyResponse() result.parse_response_content(response) if not result.is_success(): log.error(f"支付宝接口调用失败: {result.code} - {result.msg}") raise CustomException(msg=f"修改余额/点券失败: {result.msg}") return QuotaOperationOutSchema( out_biz_no=out_biz_no, quota_id=data.quota_id, result=result.success, ) @classmethod async def delete_expense_quota_service( cls, auth: AuthSchema, out_biz_no: str, data: ExpenseQuotaDeleteSchema ) -> QuotaOperationOutSchema: """ 删除额度 调用: alipay.ebpp.invoice.expensecontrol.quota.delete """ crud = QuotaCRUD(auth) quota = await crud.get_by_out_biz_no(out_biz_no) if not quota: raise CustomException(msg="额度不存在") try: from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaDeleteRequest import ( AlipayEbppInvoiceExpensecontrolQuotaDeleteRequest, ) from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaDeleteModel import ( AlipayEbppInvoiceExpensecontrolQuotaDeleteModel, ) from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaDeleteResponse import ( AlipayEbppInvoiceExpensecontrolQuotaDeleteResponse, ) except ImportError: raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖") model = AlipayEbppInvoiceExpensecontrolQuotaDeleteModel() model.enterprise_id = data.enterprise_id if data.quota_id: model.quota_id = data.quota_id if data.issue_batch_id: model.issue_batch_id = data.issue_batch_id request = AlipayEbppInvoiceExpensecontrolQuotaDeleteRequest() request.biz_model = model client = AlipayClient.get_client() response = client.execute(request) if not response: raise CustomException(msg="删除额度失败: 无响应") result = AlipayEbppInvoiceExpensecontrolQuotaDeleteResponse() result.parse_response_content(response) if not result.is_success(): log.error(f"支付宝接口调用失败: {result.code} - {result.msg}") raise CustomException(msg=f"删除额度失败: {result.msg}") await crud.delete(id=quota.id) return QuotaOperationOutSchema(out_biz_no=out_biz_no) # ======================== # 手工批量发放额度 # ======================== @classmethod async def issue_batch_create_service( cls, auth: AuthSchema, data: IssueBatchCreateSchema ) -> IssueBatchCreateOutSchema: """ 手工批量发放额度 调用: alipay.ebpp.invoice.expensecontrol.issuebatch.create """ try: from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolIssuebatchCreateRequest import ( AlipayEbppInvoiceExpensecontrolIssuebatchCreateRequest, ) from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolIssuebatchCreateModel import ( AlipayEbppInvoiceExpensecontrolIssuebatchCreateModel, ) from alipay.aop.api.domain.IssueTargetInfoContent import ( IssueTargetInfoContent, ) from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolIssuebatchCreateResponse import ( AlipayEbppInvoiceExpensecontrolIssuebatchCreateResponse, ) except ImportError: raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖") model = AlipayEbppInvoiceExpensecontrolIssuebatchCreateModel() model.enterprise_id = data.enterprise_id model.issue_name = data.issue_name model.quota_type = data.quota_type model.effective_start_date = data.effective_start_date model.effective_end_date = data.effective_end_date model.institution_id = data.institution_id model.batch_no = data.batch_no model.share_mode = data.share_mode if data.issue_desc: model.issue_desc = data.issue_desc if data.issue_target_info_list: target_list = [] for item in data.issue_target_info_list: target = IssueTargetInfoContent() target.issue_quota = item.issue_quota if item.owner_open_id: target.owner_open_id = item.owner_open_id if item.owner_id: target.owner_id = item.owner_id if item.user_name: target.user_name = item.user_name if item.owner_type: target.owner_type = item.owner_type target_list.append(target) model.issue_target_info_list = target_list request = AlipayEbppInvoiceExpensecontrolIssuebatchCreateRequest() request.biz_model = model client = AlipayClient.get_client() response = client.execute(request) if not response: raise CustomException(msg="手工批量发放额度失败: 无响应") result = AlipayEbppInvoiceExpensecontrolIssuebatchCreateResponse() result.parse_response_content(response) if not result.is_success(): log.error(f"支付宝接口调用失败: {result.code} - {result.msg}") raise CustomException(msg=f"手工批量发放额度失败: {result.msg}") # 保存批次记录到本地 try: issue_batch_crud = IssueBatchCRUD(auth) total_amount = Decimal("0") if data.issue_target_info_list: for item in data.issue_target_info_list: try: total_amount += Decimal(item.issue_quota) except Exception: pass batch_data = { "issue_batch_id": result.issue_batch_id, "batch_no": data.batch_no, "institution_id": data.institution_id, "issue_name": data.issue_name, "quota_type": data.quota_type, "share_mode": data.share_mode, "total_count": len(data.issue_target_info_list or []), "total_amount": total_amount, "status": "ACTIVE", "effective_start_date": datetime.strptime(data.effective_start_date, "%Y-%m-%d %H:%M:%S"), "effective_end_date": datetime.strptime(data.effective_end_date, "%Y-%m-%d %H:%M:%S"), "issue_desc": data.issue_desc, } await issue_batch_crud.create(batch_data) except Exception as e: log.warning(f"保存发放批次记录失败(不影响发放): {e}") # 组装校验失败列表 failed_list = None if hasattr(result, 'issue_quota_check_failed_list') and result.issue_quota_check_failed_list: failed_list = [] for f in result.issue_quota_check_failed_list: failed_list.append(IssueQuotaCheckFailedItem( user_name=getattr(f, 'user_name', None), owner_type=getattr(f, 'owner_type', None), owner_id=getattr(f, 'owner_id', None), owner_open_id=getattr(f, 'owner_open_id', None), issue_quota=getattr(f, 'issue_quota', None), message=getattr(f, 'message', None), result=getattr(f, 'result', None), )) return IssueBatchCreateOutSchema( issue_batch_id=result.issue_batch_id, issue_quota_check_failed_list=failed_list, ) @classmethod async def issue_batch_cancel_service( cls, auth: AuthSchema, data: IssueBatchCancelSchema ) -> IssueBatchCancelOutSchema: """ 作废手工发放批次 调用: alipay.ebpp.invoice.expensecontrol.issuebatch.cancel """ try: from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolIssuebatchCancelRequest import ( AlipayEbppInvoiceExpensecontrolIssuebatchCancelRequest, ) from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolIssuebatchCancelModel import ( AlipayEbppInvoiceExpensecontrolIssuebatchCancelModel, ) from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolIssuebatchCancelResponse import ( AlipayEbppInvoiceExpensecontrolIssuebatchCancelResponse, ) except ImportError: raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖") model = AlipayEbppInvoiceExpensecontrolIssuebatchCancelModel() model.enterprise_id = data.enterprise_id model.institution_id = data.institution_id model.issue_batch_id = data.issue_batch_id request = AlipayEbppInvoiceExpensecontrolIssuebatchCancelRequest() request.biz_model = model client = AlipayClient.get_client() response = client.execute(request) if not response: raise CustomException(msg="作废手工发放批次失败: 无响应") result = AlipayEbppInvoiceExpensecontrolIssuebatchCancelResponse() result.parse_response_content(response) if not result.is_success(): log.error(f"支付宝接口调用失败: {result.code} - {result.msg}") raise CustomException(msg=f"作废手工发放批次失败: {result.msg}") # 更新本地批次记录状态 try: issue_batch_crud = IssueBatchCRUD(auth) batch = await issue_batch_crud.get_by_issue_batch_id(data.issue_batch_id) if batch: setattr(batch, 'status', 'CANCELLED') await issue_batch_crud.update(batch.id, {"status": "CANCELLED"}) except Exception as e: log.warning(f"更新批次本地状态失败(不影响作废): {e}") return IssueBatchCancelOutSchema( result=getattr(result, 'result', False), ) @classmethod async def issue_batch_records_query_service( cls, auth: AuthSchema, data: IssueBatchRecordsQuerySchema ) -> IssueBatchRecordsQueryOutSchema: """ 查询手工发放发放明细 调用: alipay.ebpp.invoice.issuebatch.issuerecords.batchquery """ try: from alipay.aop.api.request.AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryRequest import ( AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryRequest, ) from alipay.aop.api.domain.AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryModel import ( AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryModel, ) from alipay.aop.api.response.AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryResponse import ( AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryResponse, ) except ImportError: raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖") model = AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryModel() model.enterprise_id = data.enterprise_id model.institution_id = data.institution_id model.issue_batch_id = data.issue_batch_id model.page_size = data.page_size model.page_num = data.page_num request = AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryRequest() request.biz_model = model client = AlipayClient.get_client() response = client.execute(request) if not response: raise CustomException(msg="查询手工发放明细失败: 无响应") result = AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryResponse() result.parse_response_content(response) if not result.is_success(): log.error(f"支付宝接口调用失败: {result.code} - {result.msg}") raise CustomException(msg=f"查询手工发放明细失败: {result.msg}") record_list = None if hasattr(result, 'issue_record_info_list') and result.issue_record_info_list: record_list = [] for r in result.issue_record_info_list: record_list.append(IssueRecordInfoItem( quota_id=getattr(r, 'quota_id', None), issue_quota=getattr(r, 'issue_quota', None), issue_status=getattr(r, 'issue_status', None), owner_type=getattr(r, 'owner_type', None), owner_id=getattr(r, 'owner_id', None), owner_open_id=getattr(r, 'owner_open_id', None), user_name=getattr(r, 'user_name', None), currency=getattr(r, 'currency', None), )) return IssueBatchRecordsQueryOutSchema( page_num=getattr(result, 'page_num', data.page_num), page_size=getattr(result, 'page_size', data.page_size), total_page_count=getattr(result, 'total_page_count', "0"), issue_record_info_list=record_list, ) @classmethod async def list_batch_service( cls, auth: AuthSchema, page_no: int = 1, page_size: int = 20, search: dict | None = None, ) -> dict: """分页查询手工发放批次列表(本地DB)""" crud = IssueBatchCRUD(auth) offset = (page_no - 1) * page_size return await crud.page( offset=offset, limit=page_size, order_by=[{"id": "desc"}], search=search or {}, out_schema=IssueBatchListOutSchema, ) @classmethod async def list_service( cls, auth: AuthSchema, page_no: int = 1, page_size: int = 20, search: dict | None = None, ) -> dict: crud = QuotaCRUD(auth) offset = (page_no - 1) * page_size return await crud.page( offset=offset, limit=page_size, order_by=[{"id": "desc"}], search=search or {}, out_schema=QuotaListOutSchema, )