import traceback
from datetime import datetime
from decimal import Decimal, InvalidOperation
from typing import Any

from sqlalchemy import delete as sa_delete
from sqlalchemy import insert, select
from sqlalchemy import update as sa_update

from app.api.v1.module_system.auth.schema import AuthSchema
from app.core.alipay import AlipayClient
from app.core.exceptions import CustomException
from app.core.logger import log
from app.utils.snowflake import get_snowflake_id

from .enums import QuotaStatusEnum
from .schema import (
    AdjustQuotaSchema,
    ExpenseQuotaCreateSchema,
    ExpenseQuotaDeleteSchema,
    ExpenseQuotaModifySchema,
    ExpenseQuotaQueryOutSchema,
    ExpenseQuotaQuerySchema,
    IssueBatchCancelOutSchema,
    IssueBatchCancelSchema,
    IssueBatchCreateOutSchema,
    IssueBatchCreateSchema,
    IssueBatchListOutSchema,
    IssueBatchRecordsQueryOutSchema,
    IssueBatchRecordsQuerySchema,
    IssueQuotaCheckFailedItem,
    IssueRecordInfoItem,
    QuotaCreateSchema,
    QuotaDetailInfoSchema,
    QuotaListOutSchema,
    QuotaOperationOutSchema,
    QuotaOutSchema,
    QuotaUpdateSchema,
)
from .crud import IssueBatchCRUD, QuotaChangeLogCRUD, QuotaCRUD
from .model import IssueBatchModel, QuotaModel

# Message raised when the optional Alipay SDK cannot be imported.
_SDK_MISSING_MSG = "支付宝SDK未正确安装,请检查alipay-sdk-python依赖"
# Timestamp layout used for the effective start/end dates exchanged with Alipay.
_DATETIME_FMT = "%Y-%m-%d %H:%M:%S"


def _parse_datetime(value: str | None) -> datetime | None:
    """Parse a ``YYYY-MM-DD HH:MM:SS`` string; return ``None`` for empty input."""
    return datetime.strptime(value, _DATETIME_FMT) if value else None


def _to_decimal(value: Any) -> Decimal:
    """Convert *value* to ``Decimal``, falling back to zero when it is not numeric."""
    try:
        return Decimal(value)
    except (InvalidOperation, TypeError, ValueError):
        return Decimal("0")


def _copy_truthy(src: Any, dst: Any, *fields: str) -> None:
    """Copy each named attribute from *src* to *dst* when its value is truthy."""
    for field in fields:
        value = getattr(src, field)
        if value:
            setattr(dst, field, value)


class QuotaService:
    """Service layer for expense quotas (Alipay expense-control APIs + local records)."""

    # NOTE(review): ``client.execute`` is a synchronous call made from async
    # methods, so it blocks the event loop while the request is in flight.
    @staticmethod
    def _execute_alipay(
        request: Any,
        response_cls: type,
        action: str,
        *,
        with_sub_msg: bool = False,
    ) -> Any:
        """Execute *request* against Alipay and return the parsed, successful response.

        Args:
            request: A populated Alipay SDK request object.
            response_cls: The SDK response class used to parse the raw response.
            action: Human-readable action name used as the error-message prefix.
            with_sub_msg: When true, prefer the response's ``sub_msg`` in the
                logged detail and in the raised error message.

        Raises:
            CustomException: If Alipay returns no response or reports a failure.
        """
        client = AlipayClient.get_client()
        response = client.execute(request)
        if not response:
            raise CustomException(msg=f"{action}失败: 无响应")

        result = response_cls()
        result.parse_response_content(response)
        if result.is_success():
            return result

        sub_msg = (getattr(result, "sub_msg", "") or "") if with_sub_msg else ""
        err_detail = f"{result.msg}" + (f" - {sub_msg}" if sub_msg else "")
        log.error(f"支付宝接口调用失败: {result.code} - {err_detail}")
        raise CustomException(msg=f"{action}失败: {sub_msg or result.msg}")

    @classmethod
    async def create_expense_quota_service(
        cls, auth: AuthSchema, data: ExpenseQuotaCreateSchema
    ) -> QuotaOperationOutSchema:
        """Create a balance/voucher quota on Alipay and store it locally.

        Calls ``alipay.ebpp.invoice.expensecontrol.quota.create``. The business
        number is ``data.outer_source_id`` when given, otherwise a new
        snowflake id.

        Raises:
            CustomException: If the SDK is missing, the Alipay call fails, or
                the local record cannot be created.
        """
        crud = QuotaCRUD(auth)
        out_biz_no = data.outer_source_id or str(get_snowflake_id())

        try:
            from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaCreateRequest import (
                AlipayEbppInvoiceExpensecontrolQuotaCreateRequest,
            )
            from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaCreateModel import (
                AlipayEbppInvoiceExpensecontrolQuotaCreateModel,
            )
            from alipay.aop.api.domain.IssueQuotaTarget import (
                IssueQuotaTarget,
            )
            from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaCreateResponse import (
                AlipayEbppInvoiceExpensecontrolQuotaCreateResponse,
            )
        except ImportError as exc:
            raise CustomException(msg=_SDK_MISSING_MSG) from exc

        model = AlipayEbppInvoiceExpensecontrolQuotaCreateModel()
        model.target_type = data.target_type
        model.target_id = data.target_id
        model.enterprise_id = data.enterprise_id
        model.outer_source_id = out_biz_no
        model.quota_type = data.quota_type or "CAP"
        model.share_mode = data.share_mode or "0"
        if data.effective_start_date:
            model.effective_start_date = data.effective_start_date.strftime(_DATETIME_FMT)
        if data.effective_end_date:
            model.effective_end_date = data.effective_end_date.strftime(_DATETIME_FMT)
        _copy_truthy(data, model, "issue_name", "issue_desc")

        if data.issue_quota_target_list:
            target_list = []
            for item in data.issue_quota_target_list:
                target = IssueQuotaTarget()
                target.owner_type = item.owner_type
                target.owner_id = item.owner_id
                target.quota = item.quota
                if item.amount is not None:
                    target.amount = item.amount
                target_list.append(target)
            model.issue_quota_target_list = target_list

        request = AlipayEbppInvoiceExpensecontrolQuotaCreateRequest()
        request.biz_model = model
        result = cls._execute_alipay(
            request,
            AlipayEbppInvoiceExpensecontrolQuotaCreateResponse,
            "创建余额/点券",
        )

        quota_data = data.model_dump(exclude_none=True)
        quota_data["out_biz_no"] = out_biz_no
        quota_data["status"] = QuotaStatusEnum.QUOTA_ACTIVE.value
        if result.quota_id:
            quota_data["quota_id"] = result.quota_id

        quota = await crud.create(quota_data)
        if not quota:
            raise CustomException(msg="创建额度记录失败")

        return QuotaOperationOutSchema(out_biz_no=out_biz_no, quota_id=result.quota_id)

    @classmethod
    async def create_quota_service(
        cls, auth: AuthSchema, data: QuotaCreateSchema
    ) -> QuotaOperationOutSchema:
        """Create a local-only quota record (no Alipay call).

        ``available_amount`` defaults to ``total_amount`` (or 0) when omitted.

        Raises:
            CustomException: If the record cannot be created.
        """
        crud = QuotaCRUD(auth)
        out_biz_no = str(get_snowflake_id())

        quota_data = data.model_dump(exclude_none=True)
        quota_data["out_biz_no"] = out_biz_no
        quota_data["status"] = QuotaStatusEnum.QUOTA_ACTIVE.value
        if quota_data.get("available_amount") is None:
            quota_data["available_amount"] = quota_data.get("total_amount", 0)

        quota = await crud.create(quota_data)
        if not quota:
            raise CustomException(msg="创建额度记录失败")

        return QuotaOperationOutSchema(out_biz_no=out_biz_no, quota_id=quota.quota_id)

    @classmethod
    async def query_expense_quota_service(
        cls, auth: AuthSchema, data: ExpenseQuotaQuerySchema
    ) -> ExpenseQuotaQueryOutSchema:
        """Query balance/voucher quotas on Alipay.

        Calls ``alipay.ebpp.invoice.expensecontrol.quota.query``.

        Raises:
            CustomException: If the SDK is missing or the Alipay call fails.
        """
        try:
            from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaQueryRequest import (
                AlipayEbppInvoiceExpensecontrolQuotaQueryRequest,
            )
            from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaQueryModel import (
                AlipayEbppInvoiceExpensecontrolQuotaQueryModel,
            )
            from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaQueryResponse import (
                AlipayEbppInvoiceExpensecontrolQuotaQueryResponse,
            )
        except ImportError as exc:
            raise CustomException(msg=_SDK_MISSING_MSG) from exc

        model = AlipayEbppInvoiceExpensecontrolQuotaQueryModel()
        model.owner_type = data.owner_type
        model.page_size = data.page_size
        model.page_num = data.page_num
        _copy_truthy(
            data,
            model,
            "target_type",
            "target_id",
            "owner_id",
            "owner_open_id",
            "enterprise_id",
            "quota_id_list",
            "quota_type",
        )

        request = AlipayEbppInvoiceExpensecontrolQuotaQueryRequest()
        request.biz_model = model
        result = cls._execute_alipay(
            request,
            AlipayEbppInvoiceExpensecontrolQuotaQueryResponse,
            "查询余额/点券",
        )

        # NOTE(review): only the pagination fields are forwarded; the quota
        # detail list in the response is not returned — confirm this is intended.
        return ExpenseQuotaQueryOutSchema(
            page_num=result.page_num or data.page_num,
            page_size=result.page_size or data.page_size,
            total_page_count=result.total_page_count or 0,
        )

    @classmethod
    async def modify_expense_quota_service(
        cls, auth: AuthSchema, out_biz_no: str, data: ExpenseQuotaModifySchema
    ) -> QuotaOperationOutSchema:
        """Modify a balance/voucher quota on Alipay.

        Calls ``alipay.ebpp.invoice.expensecontrol.quota.modify``. The local
        record identified by *out_biz_no* must exist; it is not updated here.

        Raises:
            CustomException: If the local quota does not exist, the SDK is
                missing, or the Alipay call fails.
        """
        crud = QuotaCRUD(auth)
        quota = await crud.get_by_out_biz_no(out_biz_no)
        if not quota:
            raise CustomException(msg="额度不存在")

        try:
            from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaModifyRequest import (
                AlipayEbppInvoiceExpensecontrolQuotaModifyRequest,
            )
            from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaModifyModel import (
                AlipayEbppInvoiceExpensecontrolQuotaModifyModel,
            )
            from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaModifyResponse import (
                AlipayEbppInvoiceExpensecontrolQuotaModifyResponse,
            )
        except ImportError as exc:
            raise CustomException(msg=_SDK_MISSING_MSG) from exc

        model = AlipayEbppInvoiceExpensecontrolQuotaModifyModel()
        model.quota_id = data.quota_id
        model.action = data.action
        model.outer_source_id = data.outer_source_id
        model.enterprise_id = data.enterprise_id
        if data.amount is not None:
            model.amount = str(data.amount)
        if data.share_mode:
            model.share_mode = data.share_mode

        request = AlipayEbppInvoiceExpensecontrolQuotaModifyRequest()
        request.biz_model = model
        result = cls._execute_alipay(
            request,
            AlipayEbppInvoiceExpensecontrolQuotaModifyResponse,
            "修改余额/点券",
        )

        return QuotaOperationOutSchema(
            out_biz_no=out_biz_no,
            quota_id=data.quota_id,
            result=result.success,
        )

    @classmethod
    async def delete_expense_quota_service(
        cls, auth: AuthSchema, out_biz_no: str, data: ExpenseQuotaDeleteSchema
    ) -> QuotaOperationOutSchema:
        """Delete a quota on Alipay, then delete the local record.

        Calls ``alipay.ebpp.invoice.expensecontrol.quota.delete``.

        Raises:
            CustomException: If the local quota does not exist, the SDK is
                missing, or the Alipay call fails.
        """
        crud = QuotaCRUD(auth)
        quota = await crud.get_by_out_biz_no(out_biz_no)
        if not quota:
            raise CustomException(msg="额度不存在")

        try:
            from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaDeleteRequest import (
                AlipayEbppInvoiceExpensecontrolQuotaDeleteRequest,
            )
            from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaDeleteModel import (
                AlipayEbppInvoiceExpensecontrolQuotaDeleteModel,
            )
            from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaDeleteResponse import (
                AlipayEbppInvoiceExpensecontrolQuotaDeleteResponse,
            )
        except ImportError as exc:
            raise CustomException(msg=_SDK_MISSING_MSG) from exc

        model = AlipayEbppInvoiceExpensecontrolQuotaDeleteModel()
        model.enterprise_id = data.enterprise_id
        _copy_truthy(data, model, "quota_id", "issue_batch_id")

        request = AlipayEbppInvoiceExpensecontrolQuotaDeleteRequest()
        request.biz_model = model
        cls._execute_alipay(
            request,
            AlipayEbppInvoiceExpensecontrolQuotaDeleteResponse,
            "删除额度",
        )

        await crud.delete(id=quota.id)
        return QuotaOperationOutSchema(out_biz_no=out_biz_no)

    # ========================
    # Manual batch quota issuance
    # ========================

    @classmethod
    async def issue_batch_create_service(
        cls, auth: AuthSchema, data: IssueBatchCreateSchema
    ) -> IssueBatchCreateOutSchema:
        """Issue quotas to a batch of employees and mirror the result locally.

        Calls ``alipay.ebpp.invoice.expensecontrol.issuebatch.create``. After
        Alipay accepts the batch, the local bookkeeping (batch record, one
        quota row per employee, quota-id back-fill) is best-effort: failures
        are logged and do not fail the issuance.

        Raises:
            CustomException: If the SDK is missing, the batch number already
                exists locally, or the Alipay call fails.
        """
        try:
            from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolIssuebatchCreateRequest import (
                AlipayEbppInvoiceExpensecontrolIssuebatchCreateRequest,
            )
            from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolIssuebatchCreateModel import (
                AlipayEbppInvoiceExpensecontrolIssuebatchCreateModel,
            )
            from alipay.aop.api.domain.IssueTargetInfoContent import (
                IssueTargetInfoContent,
            )
            from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolIssuebatchCreateResponse import (
                AlipayEbppInvoiceExpensecontrolIssuebatchCreateResponse,
            )
        except ImportError as exc:
            raise CustomException(msg=_SDK_MISSING_MSG) from exc

        # Check locally whether the batch number exists, to avoid a pointless
        # Alipay call. A lookup failure must not block issuance, but is logged.
        existing_batch = None
        try:
            existing_batch = await IssueBatchCRUD(auth).get_by_batch_no(data.batch_no)
        except CustomException:
            raise
        except Exception as exc:
            log.warning(f"手工发放 - 本地批次号检查失败(不影响发放): {exc}")
        if existing_batch:
            raise CustomException(msg=f"批次号 {data.batch_no} 已存在,请勿重复创建")

        model = AlipayEbppInvoiceExpensecontrolIssuebatchCreateModel()
        model.enterprise_id = data.enterprise_id
        model.issue_name = data.issue_name
        model.quota_type = data.quota_type
        model.effective_start_date = data.effective_start_date
        model.effective_end_date = data.effective_end_date
        model.institution_id = data.institution_id
        model.batch_no = data.batch_no
        model.share_mode = data.share_mode
        if data.issue_desc:
            model.issue_desc = data.issue_desc

        if data.issue_target_info_list:
            target_list = []
            for item in data.issue_target_info_list:
                target = IssueTargetInfoContent()
                target.issue_quota = item.issue_quota
                _copy_truthy(
                    item, target, "owner_open_id", "owner_id", "user_name", "owner_type"
                )
                target_list.append(target)
            model.issue_target_info_list = target_list

        request = AlipayEbppInvoiceExpensecontrolIssuebatchCreateRequest()
        request.biz_model = model
        result = cls._execute_alipay(
            request,
            AlipayEbppInvoiceExpensecontrolIssuebatchCreateResponse,
            "手工批量发放额度",
            with_sub_msg=True,
        )

        await cls._save_issue_batch_record(auth, data, result.issue_batch_id)

        # Employees rejected by Alipay's validation get no local quota row.
        failed_entries = getattr(result, "issue_quota_check_failed_list", None) or []
        failed_owner_ids: set[str] = {
            str(getattr(f, "owner_id", ""))
            for f in failed_entries
            if getattr(f, "owner_id", None)
        }
        await cls._save_issued_quota_records(
            auth, data, result.issue_batch_id, failed_owner_ids
        )

        return IssueBatchCreateOutSchema(
            issue_batch_id=result.issue_batch_id,
            issue_quota_check_failed_list=cls._collect_failed_items(result),
        )

    @classmethod
    async def _save_issue_batch_record(
        cls, auth: AuthSchema, data: IssueBatchCreateSchema, issue_batch_id: Any
    ) -> None:
        """Best-effort: store the issued batch locally; failures are only logged."""
        try:
            targets = data.issue_target_info_list or []
            total_amount = sum(
                (_to_decimal(item.issue_quota) for item in targets), Decimal("0")
            )
            batch_data = {
                "issue_batch_id": issue_batch_id,
                "batch_no": data.batch_no,
                "institution_id": data.institution_id,
                "issue_name": data.issue_name,
                "quota_type": data.quota_type,
                "share_mode": data.share_mode,
                "total_count": len(targets),
                "total_amount": total_amount,
                "status": "ACTIVE",
                "effective_start_date": _parse_datetime(data.effective_start_date),
                "effective_end_date": _parse_datetime(data.effective_end_date),
                "issue_desc": data.issue_desc,
            }
            await IssueBatchCRUD(auth).create(batch_data)
        except Exception as e:
            log.warning(f"保存发放批次记录失败(不影响发放), 可检查: 1)batch_no是否重复 2)字段长度\n异常: {e}\n{traceback.format_exc()}")

    @classmethod
    async def _save_issued_quota_records(
        cls,
        auth: AuthSchema,
        data: IssueBatchCreateSchema,
        issue_batch_id: Any,
        failed_owner_ids: set[str],
    ) -> None:
        """Best-effort: insert one quota row per issued employee, then back-fill quota ids."""
        try:
            # Remove stale rows of the same institution that never received a
            # quota_id (left behind by earlier failed/incomplete batches).
            try:
                clean_stmt = sa_delete(QuotaModel).where(
                    QuotaModel.institution_id == data.institution_id,
                    QuotaModel.quota_id.is_(None),
                )
                del_result = await auth.db.execute(clean_stmt)
                if del_result.rowcount:
                    log.info(f"手工发放 - 清理 {del_result.rowcount} 条陈旧记录(institution_id={data.institution_id})")
                await auth.db.flush()
            except Exception as e:
                log.warning(f"手工发放 - 清理陈旧记录失败(不影响发放): {e}")

            tenant_id = auth.user.tenant_id if auth.user else 1
            effective_start = _parse_datetime(data.effective_start_date)
            effective_end = _parse_datetime(data.effective_end_date)

            inserted_count = 0
            for item in data.issue_target_info_list or []:
                emp_id = item.owner_id or ""
                if emp_id in failed_owner_ids:
                    log.warning(f"手工发放 - 跳过校验失败员工: owner_id={emp_id}")
                    continue

                quota_amount = _to_decimal(item.issue_quota)
                # Always insert an independent row so the same employee can
                # receive several issuances without overwriting earlier ones.
                stmt = insert(QuotaModel).values(
                    employee_id=emp_id,
                    institution_id=data.institution_id,
                    quota_type=data.quota_type,
                    target_type="INSTITUTION",
                    target_id=data.institution_id,
                    out_biz_no=f"batch_{data.batch_no}_{emp_id}",
                    total_amount=quota_amount,
                    available_amount=quota_amount,
                    status=QuotaStatusEnum.QUOTA_ACTIVE.value,
                    valid_from=effective_start,
                    valid_to=effective_end,
                    enterprise_id=data.enterprise_id,
                    tenant_id=tenant_id,
                )
                await auth.db.execute(stmt)
                inserted_count += 1

            await auth.db.flush()
            log.info(f"手工发放 - 新增 {inserted_count} 条额度记录(跳过 {len(failed_owner_ids)} 条校验失败)")

            await cls._backfill_batch_quota_ids(auth, data, issue_batch_id)
        except Exception as e:
            log.warning(f"保存额度记录到本地失败(不影响发放), 可检查: 1)tenant_id/enterprise_id 2)字段长度\n异常: {e}\n{traceback.format_exc()}")

    @classmethod
    async def _backfill_batch_quota_ids(
        cls, auth: AuthSchema, data: IssueBatchCreateSchema, issue_batch_id: Any
    ) -> None:
        """Best-effort: fetch the batch's issue records from Alipay and store each quota_id."""
        try:
            from alipay.aop.api.request.AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryRequest import (
                AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryRequest,
            )
            from alipay.aop.api.domain.AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryModel import (
                AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryModel,
            )
            from alipay.aop.api.response.AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryResponse import (
                AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryResponse,
            )

            target_count = len(data.issue_target_info_list or [])
            if not target_count:
                return

            records_model = AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryModel()
            records_model.enterprise_id = data.enterprise_id
            records_model.institution_id = data.institution_id
            records_model.issue_batch_id = issue_batch_id
            # NOTE(review): a single page sized to the batch is requested; if
            # Alipay caps page_size, larger batches are only partly back-filled.
            records_model.page_size = target_count
            records_model.page_num = 1

            records_request = AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryRequest()
            records_request.biz_model = records_model
            records_response = AlipayClient.get_client().execute(records_request)
            if not records_response:
                return

            records_result = AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryResponse()
            records_result.parse_response_content(records_response)
            if not records_result.is_success():
                return

            updated_quota_id_count = 0
            for record in getattr(records_result, "issue_record_info_list", None) or []:
                quota_id = getattr(record, "quota_id", None)
                owner_id = getattr(record, "owner_id", None)
                if not (quota_id and owner_id):
                    continue
                # Match on the out_biz_no written by this batch so only rows
                # inserted for this batch are touched.
                q_upd = sa_update(QuotaModel).where(
                    QuotaModel.out_biz_no == f"batch_{data.batch_no}_{owner_id}",
                    QuotaModel.institution_id == data.institution_id,
                ).values(quota_id=quota_id)
                await auth.db.execute(q_upd)
                updated_quota_id_count += 1

            if updated_quota_id_count:
                await auth.db.flush()
                log.info(f"手工发放 - 已从支付宝查询并更新 {updated_quota_id_count} 条记录的 quota_id")
        except Exception as e:
            log.warning(f"查询支付宝发放记录获取 quota_id 失败(不影响发放): {e}")

    @staticmethod
    def _collect_failed_items(result: Any) -> list[IssueQuotaCheckFailedItem] | None:
        """Convert the response's validation failures to schema items; ``None`` if there are none."""
        failed_entries = getattr(result, "issue_quota_check_failed_list", None)
        if not failed_entries:
            return None
        return [
            IssueQuotaCheckFailedItem(
                user_name=getattr(f, "user_name", None),
                owner_type=getattr(f, "owner_type", None),
                owner_id=getattr(f, "owner_id", None),
                owner_open_id=getattr(f, "owner_open_id", None),
                issue_quota=getattr(f, "issue_quota", None),
                message=getattr(f, "message", None),
                result=getattr(f, "result", None),
            )
            for f in failed_entries
        ]

    @classmethod
    async def issue_batch_cancel_service(
        cls, auth: AuthSchema, data: IssueBatchCancelSchema
    ) -> IssueBatchCancelOutSchema:
        """Cancel a manually issued batch on Alipay and clean up local records.

        Calls ``alipay.ebpp.invoice.expensecontrol.issuebatch.cancel``. The
        local status update and quota-row deletion are best-effort.

        Raises:
            CustomException: If the SDK is missing or the Alipay call fails.
        """
        try:
            from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolIssuebatchCancelRequest import (
                AlipayEbppInvoiceExpensecontrolIssuebatchCancelRequest,
            )
            from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolIssuebatchCancelModel import (
                AlipayEbppInvoiceExpensecontrolIssuebatchCancelModel,
            )
            from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolIssuebatchCancelResponse import (
                AlipayEbppInvoiceExpensecontrolIssuebatchCancelResponse,
            )
        except ImportError as exc:
            raise CustomException(msg=_SDK_MISSING_MSG) from exc

        model = AlipayEbppInvoiceExpensecontrolIssuebatchCancelModel()
        model.enterprise_id = data.enterprise_id
        model.institution_id = data.institution_id
        model.issue_batch_id = data.issue_batch_id

        request = AlipayEbppInvoiceExpensecontrolIssuebatchCancelRequest()
        request.biz_model = model
        result = cls._execute_alipay(
            request,
            AlipayEbppInvoiceExpensecontrolIssuebatchCancelResponse,
            "作废手工发放批次",
            with_sub_msg=True,
        )

        # Mark the local batch record as cancelled.
        try:
            issue_batch_crud = IssueBatchCRUD(auth)
            batch = await issue_batch_crud.get_by_issue_batch_id(data.issue_batch_id)
            if batch:
                batch.status = "CANCELLED"
                await issue_batch_crud.update(batch.id, {"status": "CANCELLED"})
        except Exception as e:
            log.warning(f"更新批次本地状态失败(不影响作废): {e}")

        # Delete the quota rows this batch created.
        try:
            issue_batch_crud = IssueBatchCRUD(auth)
            batch = await issue_batch_crud.get_by_issue_batch_id(data.issue_batch_id)
            batch_no = batch.batch_no if batch else None
            if batch_no:
                # Rows of this batch carry out_biz_no "batch_{batch_no}_{owner}".
                # autoescape makes "_" and "%" literal: with a raw LIKE pattern,
                # cancelling batch "1" would also match rows of batch "10".
                del_stmt = sa_delete(QuotaModel).where(
                    QuotaModel.out_biz_no.startswith(
                        f"batch_{batch_no}_", autoescape=True
                    ),
                    QuotaModel.institution_id == data.institution_id,
                )
                await auth.db.execute(del_stmt)
                await auth.db.flush()
                log.info(f"批次作废 - 已删除该批次创建的额度记录: batch_no={batch_no}")
        except Exception as e:
            log.warning(f"删除额度记录失败(不影响作废): {e}")

        return IssueBatchCancelOutSchema(
            result=getattr(result, "result", False),
        )

    @classmethod
    async def issue_batch_records_query_service(
        cls, auth: AuthSchema, data: IssueBatchRecordsQuerySchema
    ) -> IssueBatchRecordsQueryOutSchema:
        """Query the per-employee issue records of a manual batch on Alipay.

        Calls ``alipay.ebpp.invoice.issuebatch.issuerecords.batchquery``.

        Raises:
            CustomException: If the SDK is missing or the Alipay call fails.
        """
        try:
            from alipay.aop.api.request.AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryRequest import (
                AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryRequest,
            )
            from alipay.aop.api.domain.AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryModel import (
                AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryModel,
            )
            from alipay.aop.api.response.AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryResponse import (
                AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryResponse,
            )
        except ImportError as exc:
            raise CustomException(msg=_SDK_MISSING_MSG) from exc

        model = AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryModel()
        model.enterprise_id = data.enterprise_id
        model.institution_id = data.institution_id
        model.issue_batch_id = data.issue_batch_id
        model.page_size = data.page_size
        model.page_num = data.page_num

        request = AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryRequest()
        request.biz_model = model
        result = cls._execute_alipay(
            request,
            AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryResponse,
            "查询手工发放明细",
        )

        records = getattr(result, "issue_record_info_list", None)
        record_list = None
        if records:
            record_list = [
                IssueRecordInfoItem(
                    quota_id=getattr(r, "quota_id", None),
                    issue_quota=getattr(r, "issue_quota", None),
                    issue_status=getattr(r, "issue_status", None),
                    owner_type=getattr(r, "owner_type", None),
                    owner_id=getattr(r, "owner_id", None),
                    owner_open_id=getattr(r, "owner_open_id", None),
                    user_name=getattr(r, "user_name", None),
                    currency=getattr(r, "currency", None),
                )
                for r in records
            ]

        # Fall back to the request's paging values when the response leaves
        # them unset (attribute present but None), not only when it is missing.
        return IssueBatchRecordsQueryOutSchema(
            page_num=getattr(result, "page_num", None) or data.page_num,
            page_size=getattr(result, "page_size", None) or data.page_size,
            total_page_count=getattr(result, "total_page_count", None) or 0,
            issue_record_info_list=record_list,
        )

    @classmethod
    async def list_batch_service(
        cls,
        auth: AuthSchema,
        page_no: int = 1,
        page_size: int = 20,
        search: dict | None = None,
    ) -> dict:
        """Return one page of manual issue batches from the local database, newest first."""
        crud = IssueBatchCRUD(auth)
        offset = (page_no - 1) * page_size
        return await crud.page(
            offset=offset,
            limit=page_size,
            order_by=[{"id": "desc"}],
            search=search or {},
            out_schema=IssueBatchListOutSchema,
        )

    @classmethod
    async def list_employee_quota_records_service(
        cls,
        auth: AuthSchema,
        employee_id: str,
        institution_id: str | None = None,
    ) -> list[dict]:
        """List an employee's local quota records, newest first.

        Results are restricted to *institution_id* when it is given.
        """
        where = [QuotaModel.employee_id == employee_id]
        if institution_id:
            where.append(QuotaModel.institution_id == institution_id)
        stmt = select(QuotaModel).where(*where).order_by(QuotaModel.created_time.desc())
        result = await auth.db.execute(stmt)
        quotas = result.scalars().all()

        return [
            {
                "quota_id": q.quota_id,
                "out_biz_no": q.out_biz_no,
                "total_amount": float(q.total_amount) if q.total_amount else 0,
                "available_amount": float(q.available_amount) if q.available_amount else 0,
                "quota_type": q.quota_type,
                "status": q.status,
                "valid_from": q.valid_from,
                "valid_to": q.valid_to,
                "created_time": q.created_time,
                "institution_id": q.institution_id,
            }
            for q in quotas
        ]

    @classmethod
    async def adjust_quota_service(
        cls, auth: AuthSchema, data: AdjustQuotaSchema
    ) -> dict:
        """Set a quota's available amount to ``data.amount``.

        Calls Alipay ``quota.modify`` with the difference between the target
        and the current available amount, updates the local record and writes
        a change-log entry (the log entry is best-effort).

        Returns:
            A dict with ``quota_id``, ``before_amount``, ``after_amount`` and
            ``diff``.

        Raises:
            CustomException: If the quota is not found, has no Alipay quota id,
                the SDK is missing, or the Alipay call fails.
        """
        # Look the record up by Alipay quota_id first, then by local primary key.
        stmt = select(QuotaModel).where(QuotaModel.quota_id == data.quota_id)
        result = await auth.db.execute(stmt)
        quota = result.scalar_one_or_none()
        if not quota:
            try:
                local_id = int(data.quota_id)
            except (ValueError, TypeError):
                local_id = None
            if local_id is not None:
                stmt = select(QuotaModel).where(QuotaModel.id == local_id)
                result = await auth.db.execute(stmt)
                quota = result.scalar_one_or_none()
        if not quota:
            raise CustomException(msg="额度记录不存在")

        # When the local row has no quota_id yet, try to obtain it from Alipay.
        alipay_quota_id = quota.quota_id
        if not alipay_quota_id and quota.out_biz_no:
            try:
                from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaQueryRequest import (
                    AlipayEbppInvoiceExpensecontrolQuotaQueryRequest,
                )
                from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaQueryModel import (
                    AlipayEbppInvoiceExpensecontrolQuotaQueryModel,
                )
                from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaQueryResponse import (
                    AlipayEbppInvoiceExpensecontrolQuotaQueryResponse,
                )

                query_model = AlipayEbppInvoiceExpensecontrolQuotaQueryModel()
                query_model.enterprise_id = quota.enterprise_id or data.enterprise_id
                query_model.owner_id = quota.employee_id
                query_model.owner_type = "ENTERPRISE_PAY_UID"
                query_model.page_num = 1
                query_model.page_size = 10

                query_request = AlipayEbppInvoiceExpensecontrolQuotaQueryRequest()
                query_request.biz_model = query_model
                query_response = AlipayClient.get_client().execute(query_request)
                if query_response:
                    query_result = AlipayEbppInvoiceExpensecontrolQuotaQueryResponse()
                    query_result.parse_response_content(query_response)
                    details = (
                        getattr(query_result, "quota_detail_info_list", None)
                        if query_result.is_success()
                        else None
                    )
                    # NOTE(review): the first quota returned for the employee is
                    # taken without matching institution/target — confirm.
                    for detail in details or []:
                        qid = getattr(detail, "quota_id", None)
                        if qid:
                            alipay_quota_id = qid
                            # Write the id back so later calls skip this query.
                            upd = sa_update(QuotaModel).where(QuotaModel.id == quota.id).values(quota_id=qid)
                            await auth.db.execute(upd)
                            await auth.db.flush()
                            log.info(f"从支付宝回填 quota_id: id={quota.id}, quota_id={qid}")
                            break
            except Exception as e:
                log.warning(f"从支付宝获取真实 quota_id 失败: {e}")

        if not alipay_quota_id:
            raise CustomException(msg="该额度暂未关联支付宝额度ID,无法调整(请等待支付宝同步后重试)")

        current_available = float(quota.available_amount) if quota.available_amount else 0
        diff = round(data.amount - current_available, 2)
        outer_source_id = str(get_snowflake_id())

        try:
            from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaModifyRequest import (
                AlipayEbppInvoiceExpensecontrolQuotaModifyRequest,
            )
            from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaModifyModel import (
                AlipayEbppInvoiceExpensecontrolQuotaModifyModel,
            )
            from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaModifyResponse import (
                AlipayEbppInvoiceExpensecontrolQuotaModifyResponse,
            )
        except ImportError as exc:
            raise CustomException(msg="支付宝SDK未正确安装") from exc

        model = AlipayEbppInvoiceExpensecontrolQuotaModifyModel()
        model.quota_id = alipay_quota_id
        # NOTE(review): a zero difference is still sent to Alipay as ADD 0.
        model.action = "ADD" if diff >= 0 else "DEDUCT"
        model.outer_source_id = outer_source_id
        model.enterprise_id = data.enterprise_id
        # The amount is multiplied by 100 before sending. Round before
        # truncating: 0.29 * 100 is 28.999… in binary floating point and
        # int() alone would send 28.
        model.amount = str(int(round(abs(diff) * 100)))

        request = AlipayEbppInvoiceExpensecontrolQuotaModifyRequest()
        request.biz_model = model
        response = AlipayClient.get_client().execute(request)
        if not response:
            raise CustomException(msg="调整额度失败: 无响应")

        mod_result = AlipayEbppInvoiceExpensecontrolQuotaModifyResponse()
        mod_result.parse_response_content(response)
        if not mod_result.is_success():
            sub_msg = getattr(mod_result, "sub_msg", "") or ""
            sub_code = getattr(mod_result, "sub_code", "") or ""
            log.error(f"支付宝接口调用失败: {mod_result.code} - {mod_result.msg} (sub_code={sub_code}, sub_msg={sub_msg})")
            # If Alipay reports the quota as missing/invalid, clear the stored
            # quota_id (it may be a mis-stored issue_rule_id) so a later retry
            # can query and back-fill the real one.
            if "不存在" in sub_msg or "INVALID" in sub_code:
                try:
                    upd = sa_update(QuotaModel).where(QuotaModel.id == quota.id).values(quota_id=None)
                    await auth.db.execute(upd)
                    await auth.db.flush()
                    log.info(f"调整额度 - 本地quota_id无效({data.quota_id})已清除,后续可从支付宝查询回填")
                except Exception as e:
                    log.warning(f"清除本地无效quota_id失败: {e}")
            raise CustomException(msg=f"调整额度失败: {sub_msg or mod_result.msg}")

        # Update the local record.
        new_available = data.amount
        new_total = float(quota.total_amount) if quota.total_amount else 0
        if diff > 0:
            new_total += diff
        else:
            new_total = max(0, new_total + diff)

        # Key on the primary key of the row that was loaded: matching on
        # ``data.quota_id`` misses rows found through the local-id fallback or
        # whose quota_id was just back-filled.
        upd = sa_update(QuotaModel).where(
            QuotaModel.id == quota.id
        ).values(
            total_amount=new_total,
            available_amount=new_available,
        )
        await auth.db.execute(upd)
        await auth.db.flush()

        # Record the change; a logging failure does not undo the adjustment.
        try:
            log_crud = QuotaChangeLogCRUD(auth)
            await log_crud.create({
                "quota_id": data.quota_id,
                "employee_id": quota.employee_id or "",
                "institution_id": quota.institution_id or "",
                "change_type": "ADJUST",
                "coupon_name": quota.out_biz_no or "额度调整",
                "change_amount": diff,
                "before_amount": current_available,
                "after_amount": new_available,
                "change_desc": data.change_desc or "",
                "enterprise_id": data.enterprise_id,
                "tenant_id": auth.user.tenant_id if auth.user else 1,
            })
        except Exception as e:
            log.warning(f"记录变更日志失败(不影响调整): {e}")

        return {
            "quota_id": data.quota_id,
            "before_amount": current_available,
            "after_amount": new_available,
            "diff": diff,
        }

    @classmethod
    async def list_quota_changes_service(
        cls, auth: AuthSchema, quota_id: str
    ) -> list[dict]:
        """List the change-log entries recorded for *quota_id*, newest first."""
        crud = QuotaChangeLogCRUD(auth)
        entries = await crud.list(search={"quota_id": quota_id}, order_by=[{"id": "desc"}])
        return [
            {
                "coupon_name": entry.coupon_name,
                "change_time": entry.created_time,
                "change_amount": float(entry.change_amount) if entry.change_amount else 0,
                "change_desc": entry.change_desc,
                "change_type": entry.change_type,
            }
            for entry in (entries or [])
        ]

    @classmethod
    async def list_service(
        cls,
        auth: AuthSchema,
        page_no: int = 1,
        page_size: int = 20,
        search: dict | None = None,
    ) -> dict:
        """Return one page of local quotas, merged with Alipay-side quotas.

        When ``search`` contains ``institution_id``, quotas that Alipay holds
        for that institution and that are not already on the page are appended.
        The Alipay lookup is best-effort; on failure only local data is returned.
        """
        crud = QuotaCRUD(auth)
        offset = (page_no - 1) * page_size
        result = await crud.page(
            offset=offset,
            limit=page_size,
            order_by=[{"id": "desc"}],
            search=search or {},
            out_schema=QuotaListOutSchema,
        )

        if not (search and search.get("institution_id")):
            return result

        try:
            from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaQueryRequest import (
                AlipayEbppInvoiceExpensecontrolQuotaQueryRequest,
            )
            from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaQueryModel import (
                AlipayEbppInvoiceExpensecontrolQuotaQueryModel,
            )
            from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaQueryResponse import (
                AlipayEbppInvoiceExpensecontrolQuotaQueryResponse,
            )

            alipay_model = AlipayEbppInvoiceExpensecontrolQuotaQueryModel()
            # owner_type is set unconditionally; the original author marked it
            # as a required field and used this generic value.
            alipay_model.owner_type = "ENTERPRISE_PAY_UID"
            alipay_model.target_type = "INSTITUTION"
            alipay_model.target_id = search["institution_id"]
            alipay_model.page_size = 100
            alipay_model.page_num = 1

            request = AlipayEbppInvoiceExpensecontrolQuotaQueryRequest()
            request.biz_model = alipay_model
            response = AlipayClient.get_client().execute(request)
            if response:
                alipay_result = AlipayEbppInvoiceExpensecontrolQuotaQueryResponse()
                alipay_result.parse_response_content(response)
                details = (
                    getattr(alipay_result, "quota_detail_info_list", None)
                    if alipay_result.is_success()
                    else None
                )
                if details:
                    # Merge Alipay-side quotas, skipping ids already on the page.
                    existing_ids = {
                        item.get("quota_id")
                        for item in result.get("items", [])
                        if item.get("quota_id")
                    }
                    for q in details:
                        qid = getattr(q, "quota_id", None)
                        if qid and qid not in existing_ids:
                            result["items"].append({
                                "quota_id": qid,
                                "target_type": getattr(q, "target_type", None),
                                "target_id": getattr(q, "target_id", None),
                                "quota_type": getattr(q, "quota_type", None),
                                "employee_id": getattr(q, "owner_id", None),
                                "total_amount": getattr(q, "total_amount", None),
                                "available_amount": getattr(q, "available_amount", None),
                                "status": getattr(q, "status", "QUOTA_ACTIVE"),
                                "created_time": getattr(q, "effective_start_date", None),
                            })
                            existing_ids.add(qid)
                    # NOTE(review): this replaces the paginated total with the
                    # length of the current page — confirm callers expect that.
                    result["total"] = len(result["items"])
        except Exception as e:
            log.warning(f"查询支付宝端额度失败(不影响本地数据): {e}")

        return result