| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031 |
- from datetime import datetime
- from decimal import Decimal
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.core.alipay import AlipayClient
- from app.core.exceptions import CustomException
- from app.core.logger import log
- from app.utils.snowflake import get_snowflake_id
- from .enums import QuotaStatusEnum
- from .schema import (
- AdjustQuotaSchema,
- ExpenseQuotaCreateSchema,
- ExpenseQuotaDeleteSchema,
- ExpenseQuotaModifySchema,
- ExpenseQuotaQueryOutSchema,
- ExpenseQuotaQuerySchema,
- IssueBatchCancelOutSchema,
- IssueBatchCancelSchema,
- IssueBatchCreateOutSchema,
- IssueBatchCreateSchema,
- IssueBatchListOutSchema,
- IssueBatchRecordsQueryOutSchema,
- IssueBatchRecordsQuerySchema,
- IssueQuotaCheckFailedItem,
- IssueRecordInfoItem,
- QuotaCreateSchema,
- QuotaDetailInfoSchema,
- QuotaListOutSchema,
- QuotaOperationOutSchema,
- QuotaOutSchema,
- QuotaUpdateSchema,
- )
- from .crud import IssueBatchCRUD, QuotaCRUD
- from .model import IssueBatchModel
- class QuotaService:
- """额度服务层"""
- @classmethod
- async def create_expense_quota_service(
- cls, auth: AuthSchema, data: ExpenseQuotaCreateSchema
- ) -> QuotaOperationOutSchema:
- """
- 创建余额/点券
- 调用: alipay.ebpp.invoice.expensecontrol.quota.create
- """
- crud = QuotaCRUD(auth)
- out_biz_no = data.outer_source_id or str(get_snowflake_id())
- try:
- from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaCreateRequest import (
- AlipayEbppInvoiceExpensecontrolQuotaCreateRequest,
- )
- from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaCreateModel import (
- AlipayEbppInvoiceExpensecontrolQuotaCreateModel,
- )
- from alipay.aop.api.domain.IssueQuotaTarget import (
- IssueQuotaTarget,
- )
- from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaCreateResponse import (
- AlipayEbppInvoiceExpensecontrolQuotaCreateResponse,
- )
- except ImportError:
- raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖")
- model = AlipayEbppInvoiceExpensecontrolQuotaCreateModel()
- model.target_type = data.target_type
- model.target_id = data.target_id
- model.enterprise_id = data.enterprise_id
- model.outer_source_id = out_biz_no
- model.quota_type = data.quota_type or "CAP"
- model.share_mode = data.share_mode or "0"
- if data.effective_start_date:
- model.effective_start_date = data.effective_start_date.strftime("%Y-%m-%d %H:%M:%S")
- if data.effective_end_date:
- model.effective_end_date = data.effective_end_date.strftime("%Y-%m-%d %H:%M:%S")
- if data.issue_name:
- model.issue_name = data.issue_name
- if data.issue_desc:
- model.issue_desc = data.issue_desc
- if data.issue_quota_target_list:
- target_list = []
- for item in data.issue_quota_target_list:
- target = IssueQuotaTarget()
- target.owner_type = item.owner_type
- target.owner_id = item.owner_id
- target.quota = item.quota
- if item.amount is not None:
- target.amount = item.amount
- target_list.append(target)
- model.issue_quota_target_list = target_list
- request = AlipayEbppInvoiceExpensecontrolQuotaCreateRequest()
- request.biz_model = model
- client = AlipayClient.get_client()
- response = client.execute(request)
- if not response:
- raise CustomException(msg="创建余额/点券失败: 无响应")
- result = AlipayEbppInvoiceExpensecontrolQuotaCreateResponse()
- result.parse_response_content(response)
- if not result.is_success():
- log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
- raise CustomException(msg=f"创建余额/点券失败: {result.msg}")
- quota_data = data.model_dump(exclude_none=True)
- quota_data["out_biz_no"] = out_biz_no
- quota_data["status"] = QuotaStatusEnum.QUOTA_ACTIVE.value
- if result.quota_id:
- quota_data["quota_id"] = result.quota_id
- quota = await crud.create(quota_data)
- if not quota:
- raise CustomException(msg="创建额度记录失败")
- return QuotaOperationOutSchema(out_biz_no=out_biz_no, quota_id=result.quota_id)
- @classmethod
- async def create_quota_service(
- cls, auth: AuthSchema, data: QuotaCreateSchema
- ) -> QuotaOperationOutSchema:
- """创建额度"""
- crud = QuotaCRUD(auth)
- out_biz_no = str(get_snowflake_id())
- quota_data = data.model_dump(exclude_none=True)
- quota_data["out_biz_no"] = out_biz_no
- quota_data["status"] = QuotaStatusEnum.QUOTA_ACTIVE.value
- if quota_data.get("available_amount") is None:
- quota_data["available_amount"] = quota_data.get("total_amount", 0)
- quota = await crud.create(quota_data)
- if not quota:
- raise CustomException(msg="创建额度记录失败")
- return QuotaOperationOutSchema(out_biz_no=out_biz_no, quota_id=quota.quota_id)
- @classmethod
- async def query_expense_quota_service(
- cls, auth: AuthSchema, data: ExpenseQuotaQuerySchema
- ) -> ExpenseQuotaQueryOutSchema:
- """
- 查询余额/点券
- 调用: alipay.ebpp.invoice.expensecontrol.quota.query
- """
- try:
- from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaQueryRequest import (
- AlipayEbppInvoiceExpensecontrolQuotaQueryRequest,
- )
- from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaQueryModel import (
- AlipayEbppInvoiceExpensecontrolQuotaQueryModel,
- )
- from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaQueryResponse import (
- AlipayEbppInvoiceExpensecontrolQuotaQueryResponse,
- )
- except ImportError:
- raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖")
- model = AlipayEbppInvoiceExpensecontrolQuotaQueryModel()
- model.owner_type = data.owner_type
- model.page_size = data.page_size
- model.page_num = data.page_num
- if data.target_type:
- model.target_type = data.target_type
- if data.target_id:
- model.target_id = data.target_id
- if data.owner_id:
- model.owner_id = data.owner_id
- if data.owner_open_id:
- model.owner_open_id = data.owner_open_id
- if data.enterprise_id:
- model.enterprise_id = data.enterprise_id
- if data.quota_id_list:
- model.quota_id_list = data.quota_id_list
- if data.quota_type:
- model.quota_type = data.quota_type
- request = AlipayEbppInvoiceExpensecontrolQuotaQueryRequest()
- request.biz_model = model
- client = AlipayClient.get_client()
- response = client.execute(request)
- if not response:
- raise CustomException(msg="查询余额/点券失败: 无响应")
- result = AlipayEbppInvoiceExpensecontrolQuotaQueryResponse()
- result.parse_response_content(response)
- if not result.is_success():
- log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
- raise CustomException(msg=f"查询余额/点券失败: {result.msg}")
- return ExpenseQuotaQueryOutSchema(
- page_num=result.page_num or data.page_num,
- page_size=result.page_size or data.page_size,
- total_page_count=result.total_page_count or 0,
- )
- @classmethod
- async def modify_expense_quota_service(
- cls, auth: AuthSchema, out_biz_no: str, data: ExpenseQuotaModifySchema
- ) -> QuotaOperationOutSchema:
- """
- 修改余额/点券
- 调用: alipay.ebpp.invoice.expensecontrol.quota.modify
- """
- crud = QuotaCRUD(auth)
- quota = await crud.get_by_out_biz_no(out_biz_no)
- if not quota:
- raise CustomException(msg="额度不存在")
- try:
- from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaModifyRequest import (
- AlipayEbppInvoiceExpensecontrolQuotaModifyRequest,
- )
- from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaModifyModel import (
- AlipayEbppInvoiceExpensecontrolQuotaModifyModel,
- )
- from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaModifyResponse import (
- AlipayEbppInvoiceExpensecontrolQuotaModifyResponse,
- )
- except ImportError:
- raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖")
- model = AlipayEbppInvoiceExpensecontrolQuotaModifyModel()
- model.quota_id = data.quota_id
- model.action = data.action
- model.outer_source_id = data.outer_source_id
- model.enterprise_id = data.enterprise_id
- if data.amount is not None:
- model.amount = str(data.amount)
- if data.share_mode:
- model.share_mode = data.share_mode
- request = AlipayEbppInvoiceExpensecontrolQuotaModifyRequest()
- request.biz_model = model
- client = AlipayClient.get_client()
- response = client.execute(request)
- if not response:
- raise CustomException(msg="修改余额/点券失败: 无响应")
- result = AlipayEbppInvoiceExpensecontrolQuotaModifyResponse()
- result.parse_response_content(response)
- if not result.is_success():
- log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
- raise CustomException(msg=f"修改余额/点券失败: {result.msg}")
- return QuotaOperationOutSchema(
- out_biz_no=out_biz_no,
- quota_id=data.quota_id,
- result=result.success,
- )
- @classmethod
- async def delete_expense_quota_service(
- cls, auth: AuthSchema, out_biz_no: str, data: ExpenseQuotaDeleteSchema
- ) -> QuotaOperationOutSchema:
- """
- 删除额度
- 调用: alipay.ebpp.invoice.expensecontrol.quota.delete
- """
- crud = QuotaCRUD(auth)
- quota = await crud.get_by_out_biz_no(out_biz_no)
- if not quota:
- raise CustomException(msg="额度不存在")
- try:
- from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaDeleteRequest import (
- AlipayEbppInvoiceExpensecontrolQuotaDeleteRequest,
- )
- from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaDeleteModel import (
- AlipayEbppInvoiceExpensecontrolQuotaDeleteModel,
- )
- from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaDeleteResponse import (
- AlipayEbppInvoiceExpensecontrolQuotaDeleteResponse,
- )
- except ImportError:
- raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖")
- model = AlipayEbppInvoiceExpensecontrolQuotaDeleteModel()
- model.enterprise_id = data.enterprise_id
- if data.quota_id:
- model.quota_id = data.quota_id
- if data.issue_batch_id:
- model.issue_batch_id = data.issue_batch_id
- request = AlipayEbppInvoiceExpensecontrolQuotaDeleteRequest()
- request.biz_model = model
- client = AlipayClient.get_client()
- response = client.execute(request)
- if not response:
- raise CustomException(msg="删除额度失败: 无响应")
- result = AlipayEbppInvoiceExpensecontrolQuotaDeleteResponse()
- result.parse_response_content(response)
- if not result.is_success():
- log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
- raise CustomException(msg=f"删除额度失败: {result.msg}")
- await crud.delete(id=quota.id)
- return QuotaOperationOutSchema(out_biz_no=out_biz_no)
- # ========================
- # 手工批量发放额度
- # ========================
- @classmethod
- async def issue_batch_create_service(
- cls, auth: AuthSchema, data: IssueBatchCreateSchema
- ) -> IssueBatchCreateOutSchema:
- """
- 手工批量发放额度
- 调用: alipay.ebpp.invoice.expensecontrol.issuebatch.create
- """
- try:
- from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolIssuebatchCreateRequest import (
- AlipayEbppInvoiceExpensecontrolIssuebatchCreateRequest,
- )
- from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolIssuebatchCreateModel import (
- AlipayEbppInvoiceExpensecontrolIssuebatchCreateModel,
- )
- from alipay.aop.api.domain.IssueTargetInfoContent import (
- IssueTargetInfoContent,
- )
- from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolIssuebatchCreateResponse import (
- AlipayEbppInvoiceExpensecontrolIssuebatchCreateResponse,
- )
- except ImportError:
- raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖")
- # 本地检查批次号是否已存在,避免无效调用支付宝
- try:
- issue_batch_crud = IssueBatchCRUD(auth)
- existing_batch = await issue_batch_crud.get_by_batch_no(data.batch_no)
- if existing_batch:
- raise CustomException(msg=f"批次号 {data.batch_no} 已存在,请勿重复创建")
- except CustomException:
- raise
- except Exception:
- pass
- model = AlipayEbppInvoiceExpensecontrolIssuebatchCreateModel()
- model.enterprise_id = data.enterprise_id
- model.issue_name = data.issue_name
- model.quota_type = data.quota_type
- model.effective_start_date = data.effective_start_date
- model.effective_end_date = data.effective_end_date
- model.institution_id = data.institution_id
- model.batch_no = data.batch_no
- model.share_mode = data.share_mode
- if data.issue_desc:
- model.issue_desc = data.issue_desc
- if data.issue_target_info_list:
- target_list = []
- for item in data.issue_target_info_list:
- target = IssueTargetInfoContent()
- target.issue_quota = item.issue_quota
- if item.owner_open_id:
- target.owner_open_id = item.owner_open_id
- if item.owner_id:
- target.owner_id = item.owner_id
- if item.user_name:
- target.user_name = item.user_name
- if item.owner_type:
- target.owner_type = item.owner_type
- target_list.append(target)
- model.issue_target_info_list = target_list
- request = AlipayEbppInvoiceExpensecontrolIssuebatchCreateRequest()
- request.biz_model = model
- client = AlipayClient.get_client()
- response = client.execute(request)
- if not response:
- raise CustomException(msg="手工批量发放额度失败: 无响应")
- result = AlipayEbppInvoiceExpensecontrolIssuebatchCreateResponse()
- result.parse_response_content(response)
- if not result.is_success():
- sub_msg = getattr(result, 'sub_msg', '') or ''
- err_detail = f"{result.msg}" + (f" - {sub_msg}" if sub_msg else "")
- log.error(f"支付宝接口调用失败: {result.code} - {err_detail}")
- raise CustomException(msg=f"手工批量发放额度失败: {sub_msg or result.msg}")
- # 保存批次记录到本地
- try:
- issue_batch_crud = IssueBatchCRUD(auth)
- total_amount = Decimal("0")
- if data.issue_target_info_list:
- for item in data.issue_target_info_list:
- try:
- total_amount += Decimal(item.issue_quota)
- except Exception:
- pass
- batch_data = {
- "issue_batch_id": result.issue_batch_id,
- "batch_no": data.batch_no,
- "institution_id": data.institution_id,
- "issue_name": data.issue_name,
- "quota_type": data.quota_type,
- "share_mode": data.share_mode,
- "total_count": len(data.issue_target_info_list or []),
- "total_amount": total_amount,
- "status": "ACTIVE",
- "effective_start_date": datetime.strptime(data.effective_start_date, "%Y-%m-%d %H:%M:%S"),
- "effective_end_date": datetime.strptime(data.effective_end_date, "%Y-%m-%d %H:%M:%S"),
- "issue_desc": data.issue_desc,
- }
- await issue_batch_crud.create(batch_data)
- except Exception as e:
- import traceback
- log.warning(f"保存发放批次记录失败(不影响发放), 可检查: 1)batch_no是否重复 2)字段长度\n异常: {e}\n{traceback.format_exc()}")
- # 组装校验失败列表(在更新本地记录前获取,用于过滤)
- failed_owner_ids: set[str] = set()
- if hasattr(result, 'issue_quota_check_failed_list') and result.issue_quota_check_failed_list:
- failed_owner_ids = {
- str(getattr(f, 'owner_id', ''))
- for f in result.issue_quota_check_failed_list
- if getattr(f, 'owner_id', None)
- }
- # 为每个员工插入新的额度记录(每次发放独立记录,支持同一员工多次发放)
- try:
- from app.plugin.module_payment.expense.quota.model import QuotaModel
- from app.plugin.module_payment.expense.quota.enums import QuotaStatusEnum
- from sqlalchemy import insert, delete as sa_delete
- # 清理同一制度下 quota_id 为空的陈旧记录(来自较早失败/未完成的批次)
- try:
- clean_stmt = sa_delete(QuotaModel).where(
- QuotaModel.institution_id == data.institution_id,
- QuotaModel.quota_id.is_(None),
- )
- del_result = await auth.db.execute(clean_stmt)
- if del_result.rowcount:
- log.info(f"手工发放 - 清理 {del_result.rowcount} 条陈旧记录(institution_id={data.institution_id})")
- await auth.db.flush()
- except Exception as e:
- log.warning(f"手工发放 - 清理陈旧记录失败(不影响发放): {e}")
- tenant_id = auth.user.tenant_id if auth.user else 1
- effective_start = datetime.strptime(data.effective_start_date, "%Y-%m-%d %H:%M:%S") if data.effective_start_date else None
- effective_end = datetime.strptime(data.effective_end_date, "%Y-%m-%d %H:%M:%S") if data.effective_end_date else None
- inserted_count = 0
- if data.issue_target_info_list:
- for item in data.issue_target_info_list:
- emp_id = (item.owner_id or "")
- # 跳过支付宝校验失败的员工
- if emp_id in failed_owner_ids:
- log.warning(f"手工发放 - 跳过校验失败员工: owner_id={emp_id}")
- continue
- try:
- quota_amount = Decimal(item.issue_quota)
- except Exception:
- quota_amount = Decimal("0")
- # 每次都新增独立记录,不覆盖已有记录
- stmt = insert(QuotaModel).values(
- employee_id=emp_id,
- institution_id=data.institution_id,
- quota_type=data.quota_type,
- target_type="INSTITUTION",
- target_id=data.institution_id,
- out_biz_no=f"batch_{data.batch_no}_{emp_id}",
- total_amount=quota_amount,
- available_amount=quota_amount,
- status=QuotaStatusEnum.QUOTA_ACTIVE.value,
- valid_from=effective_start,
- valid_to=effective_end,
- enterprise_id=data.enterprise_id,
- tenant_id=tenant_id,
- )
- await auth.db.execute(stmt)
- inserted_count += 1
- await auth.db.flush()
- log.info(f"手工发放 - 新增 {inserted_count} 条额度记录(跳过 {len(failed_owner_ids)} 条校验失败)")
- # 查询支付宝端的发放记录,获取每个员工的 real quota_id 并更新本地记录
- try:
- from alipay.aop.api.request.AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryRequest import (
- AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryRequest,
- )
- from alipay.aop.api.domain.AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryModel import (
- AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryModel,
- )
- from alipay.aop.api.response.AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryResponse import (
- AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryResponse,
- )
- records_model = AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryModel()
- records_model.enterprise_id = data.enterprise_id
- records_model.institution_id = data.institution_id
- records_model.issue_batch_id = result.issue_batch_id
- records_model.page_size = len(data.issue_target_info_list)
- records_model.page_num = 1
- records_request = AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryRequest()
- records_request.biz_model = records_model
- records_response = client.execute(records_request)
- if records_response:
- records_result = AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryResponse()
- records_result.parse_response_content(records_response)
- if records_result.is_success() and hasattr(records_result, 'issue_record_info_list') and records_result.issue_record_info_list:
- updated_quota_id_count = 0
- for r in records_result.issue_record_info_list:
- quota_id = getattr(r, 'quota_id', None)
- owner_id = getattr(r, 'owner_id', None)
- if quota_id and owner_id:
- # 按 out_biz_no 模式匹配,只更新本批次插入的记录
- q_upd = sa_update(QuotaModel).where(
- QuotaModel.out_biz_no == f"batch_{data.batch_no}_{owner_id}",
- QuotaModel.institution_id == data.institution_id,
- ).values(quota_id=quota_id)
- await auth.db.execute(q_upd)
- updated_quota_id_count += 1
- if updated_quota_id_count:
- await auth.db.flush()
- log.info(f"手工发放 - 已从支付宝查询并更新 {updated_quota_id_count} 条记录的 quota_id")
- except Exception as e:
- log.warning(f"查询支付宝发放记录获取 quota_id 失败(不影响发放): {e}")
- except Exception as e:
- import traceback
- log.warning(f"保存额度记录到本地失败(不影响发放), 可检查: 1)tenant_id/enterprise_id 2)字段长度\n异常: {e}\n{traceback.format_exc()}")
- # 组装校验失败列表
- failed_list = None
- if hasattr(result, 'issue_quota_check_failed_list') and result.issue_quota_check_failed_list:
- failed_list = []
- for f in result.issue_quota_check_failed_list:
- failed_list.append(IssueQuotaCheckFailedItem(
- user_name=getattr(f, 'user_name', None),
- owner_type=getattr(f, 'owner_type', None),
- owner_id=getattr(f, 'owner_id', None),
- owner_open_id=getattr(f, 'owner_open_id', None),
- issue_quota=getattr(f, 'issue_quota', None),
- message=getattr(f, 'message', None),
- result=getattr(f, 'result', None),
- ))
- return IssueBatchCreateOutSchema(
- issue_batch_id=result.issue_batch_id,
- issue_quota_check_failed_list=failed_list,
- )
- @classmethod
- async def issue_batch_cancel_service(
- cls, auth: AuthSchema, data: IssueBatchCancelSchema
- ) -> IssueBatchCancelOutSchema:
- """
- 作废手工发放批次
- 调用: alipay.ebpp.invoice.expensecontrol.issuebatch.cancel
- """
- try:
- from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolIssuebatchCancelRequest import (
- AlipayEbppInvoiceExpensecontrolIssuebatchCancelRequest,
- )
- from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolIssuebatchCancelModel import (
- AlipayEbppInvoiceExpensecontrolIssuebatchCancelModel,
- )
- from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolIssuebatchCancelResponse import (
- AlipayEbppInvoiceExpensecontrolIssuebatchCancelResponse,
- )
- except ImportError:
- raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖")
- model = AlipayEbppInvoiceExpensecontrolIssuebatchCancelModel()
- model.enterprise_id = data.enterprise_id
- model.institution_id = data.institution_id
- model.issue_batch_id = data.issue_batch_id
- request = AlipayEbppInvoiceExpensecontrolIssuebatchCancelRequest()
- request.biz_model = model
- client = AlipayClient.get_client()
- response = client.execute(request)
- if not response:
- raise CustomException(msg="作废手工发放批次失败: 无响应")
- result = AlipayEbppInvoiceExpensecontrolIssuebatchCancelResponse()
- result.parse_response_content(response)
- if not result.is_success():
- sub_msg = getattr(result, 'sub_msg', '') or ''
- err_detail = f"{result.msg}" + (f" - {sub_msg}" if sub_msg else "")
- log.error(f"支付宝接口调用失败: {result.code} - {err_detail}")
- raise CustomException(msg=f"作废手工发放批次失败: {sub_msg or result.msg}")
- # 更新本地批次记录状态
- try:
- issue_batch_crud = IssueBatchCRUD(auth)
- batch = await issue_batch_crud.get_by_issue_batch_id(data.issue_batch_id)
- if batch:
- setattr(batch, 'status', 'CANCELLED')
- await issue_batch_crud.update(batch.id, {"status": "CANCELLED"})
- except Exception as e:
- log.warning(f"更新批次本地状态失败(不影响作废): {e}")
- # 作废批次后,删除该批次创建的额度记录
- try:
- from app.plugin.module_payment.expense.quota.model import QuotaModel
- from sqlalchemy import delete as sa_delete
- # 从本地批次记录获取 batch_no
- issue_batch_crud = IssueBatchCRUD(auth)
- batch = await issue_batch_crud.get_by_issue_batch_id(data.issue_batch_id)
- batch_no = batch.batch_no if batch else None
- if batch_no:
- # 按 out_biz_no 模式匹配: batch_{batch_no}_%
- pattern = f"batch_{batch_no}_%"
- del_stmt = sa_delete(QuotaModel).where(
- QuotaModel.out_biz_no.like(pattern),
- QuotaModel.institution_id == data.institution_id,
- )
- await auth.db.execute(del_stmt)
- await auth.db.flush()
- log.info(f"批次作废 - 已删除该批次创建的额度记录: batch_no={batch_no}")
- except Exception as e:
- log.warning(f"删除额度记录失败(不影响作废): {e}")
- return IssueBatchCancelOutSchema(
- result=getattr(result, 'result', False),
- )
- return IssueBatchCancelOutSchema(
- result=getattr(result, 'result', False),
- )
- @classmethod
- async def issue_batch_records_query_service(
- cls, auth: AuthSchema, data: IssueBatchRecordsQuerySchema
- ) -> IssueBatchRecordsQueryOutSchema:
- """
- 查询手工发放发放明细
- 调用: alipay.ebpp.invoice.issuebatch.issuerecords.batchquery
- """
- try:
- from alipay.aop.api.request.AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryRequest import (
- AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryRequest,
- )
- from alipay.aop.api.domain.AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryModel import (
- AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryModel,
- )
- from alipay.aop.api.response.AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryResponse import (
- AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryResponse,
- )
- except ImportError:
- raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖")
- model = AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryModel()
- model.enterprise_id = data.enterprise_id
- model.institution_id = data.institution_id
- model.issue_batch_id = data.issue_batch_id
- model.page_size = data.page_size
- model.page_num = data.page_num
- request = AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryRequest()
- request.biz_model = model
- client = AlipayClient.get_client()
- response = client.execute(request)
- if not response:
- raise CustomException(msg="查询手工发放明细失败: 无响应")
- result = AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryResponse()
- result.parse_response_content(response)
- if not result.is_success():
- log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
- raise CustomException(msg=f"查询手工发放明细失败: {result.msg}")
- record_list = None
- if hasattr(result, 'issue_record_info_list') and result.issue_record_info_list:
- record_list = []
- for r in result.issue_record_info_list:
- record_list.append(IssueRecordInfoItem(
- quota_id=getattr(r, 'quota_id', None),
- issue_quota=getattr(r, 'issue_quota', None),
- issue_status=getattr(r, 'issue_status', None),
- owner_type=getattr(r, 'owner_type', None),
- owner_id=getattr(r, 'owner_id', None),
- owner_open_id=getattr(r, 'owner_open_id', None),
- user_name=getattr(r, 'user_name', None),
- currency=getattr(r, 'currency', None),
- ))
- return IssueBatchRecordsQueryOutSchema(
- page_num=getattr(result, 'page_num', data.page_num),
- page_size=getattr(result, 'page_size', data.page_size),
- total_page_count=getattr(result, 'total_page_count', 0),
- issue_record_info_list=record_list,
- )
- @classmethod
- async def list_batch_service(
- cls,
- auth: AuthSchema,
- page_no: int = 1,
- page_size: int = 20,
- search: dict | None = None,
- ) -> dict:
- """分页查询手工发放批次列表(本地DB)"""
- crud = IssueBatchCRUD(auth)
- offset = (page_no - 1) * page_size
- return await crud.page(
- offset=offset,
- limit=page_size,
- order_by=[{"id": "desc"}],
- search=search or {},
- out_schema=IssueBatchListOutSchema,
- )
- @classmethod
- async def list_employee_quota_records_service(
- cls,
- auth: AuthSchema,
- employee_id: str,
- institution_id: str | None = None,
- ) -> list[dict]:
- """查询员工的额度记录列表"""
- from app.plugin.module_payment.expense.quota.model import QuotaModel
- from sqlalchemy import select
- where = [QuotaModel.employee_id == employee_id]
- if institution_id:
- where.append(QuotaModel.institution_id == institution_id)
- stmt = select(QuotaModel).where(*where).order_by(QuotaModel.created_time.desc())
- result = await auth.db.execute(stmt)
- quotas = result.scalars().all()
- return [
- {
- "quota_id": q.quota_id,
- "out_biz_no": q.out_biz_no,
- "total_amount": float(q.total_amount) if q.total_amount else 0,
- "available_amount": float(q.available_amount) if q.available_amount else 0,
- "quota_type": q.quota_type,
- "status": q.status,
- "valid_from": q.valid_from,
- "valid_to": q.valid_to,
- "created_time": q.created_time,
- "institution_id": q.institution_id,
- }
- for q in quotas
- ]
- @classmethod
- async def adjust_quota_service(
- cls, auth: AuthSchema, data: AdjustQuotaSchema
- ) -> dict:
- """调整额度金额 (调Alipay quota.modify + 记录变更日志)"""
- from .crud import QuotaChangeLogCRUD
- from .model import QuotaModel
- from sqlalchemy import select, update as sa_update
- # 查询当前额度记录(支持按 quota_id 或 id 查找)
- stmt = select(QuotaModel).where(QuotaModel.quota_id == data.quota_id)
- result = await auth.db.execute(stmt)
- quota = result.scalar_one_or_none()
- if not quota:
- # quota_id 为空时尝试按数据库 id 查找
- try:
- local_id = int(data.quota_id)
- stmt = select(QuotaModel).where(QuotaModel.id == local_id)
- result = await auth.db.execute(stmt)
- quota = result.scalar_one_or_none()
- except (ValueError, TypeError):
- pass
- if not quota:
- raise CustomException(msg="额度记录不存在")
- # 如果本地 quota_id 为空,尝试从 Alipay 获取真实 quota_id
- alipay_quota_id = quota.quota_id
- if not alipay_quota_id and quota.out_biz_no:
- try:
- from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaQueryRequest import (
- AlipayEbppInvoiceExpensecontrolQuotaQueryRequest,
- )
- from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaQueryModel import (
- AlipayEbppInvoiceExpensecontrolQuotaQueryModel,
- )
- from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaQueryResponse import (
- AlipayEbppInvoiceExpensecontrolQuotaQueryResponse,
- )
- query_model = AlipayEbppInvoiceExpensecontrolQuotaQueryModel()
- query_model.enterprise_id = quota.enterprise_id or data.enterprise_id
- query_model.owner_id = quota.employee_id
- query_model.owner_type = "ENTERPRISE_PAY_UID"
- query_model.page_num = 1
- query_model.page_size = 10
- query_request = AlipayEbppInvoiceExpensecontrolQuotaQueryRequest()
- query_request.biz_model = query_model
- client = AlipayClient.get_client()
- query_response = client.execute(query_request)
- if query_response:
- query_result = AlipayEbppInvoiceExpensecontrolQuotaQueryResponse()
- query_result.parse_response_content(query_response)
- if query_result.is_success() and hasattr(query_result, 'quota_detail_info_list') and query_result.quota_detail_info_list:
- for q in query_result.quota_detail_info_list:
- qid = getattr(q, 'quota_id', None)
- if qid:
- alipay_quota_id = qid
- # 回写本地
- upd = sa_update(QuotaModel).where(QuotaModel.id == quota.id).values(quota_id=qid)
- await auth.db.execute(upd)
- await auth.db.flush()
- log.info(f"从支付宝回填 quota_id: id={quota.id}, quota_id={qid}")
- break
- except Exception as e:
- log.warning(f"从支付宝获取真实 quota_id 失败: {e}")
- if not alipay_quota_id:
- raise CustomException(msg="该额度暂未关联支付宝额度ID,无法调整(请等待支付宝同步后重试)")
- current_available = float(quota.available_amount) if quota.available_amount else 0
- diff = round(data.amount - current_available, 2)
- outer_source_id = str(get_snowflake_id())
- # 调Alipay quota.modify
- try:
- from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaModifyRequest import (
- AlipayEbppInvoiceExpensecontrolQuotaModifyRequest,
- )
- from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaModifyModel import (
- AlipayEbppInvoiceExpensecontrolQuotaModifyModel,
- )
- from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaModifyResponse import (
- AlipayEbppInvoiceExpensecontrolQuotaModifyResponse,
- )
- except ImportError:
- raise CustomException(msg="支付宝SDK未正确安装")
- model = AlipayEbppInvoiceExpensecontrolQuotaModifyModel()
- model.quota_id = alipay_quota_id
- model.action = "ADD" if diff >= 0 else "DEDUCT"
- model.outer_source_id = outer_source_id
- model.enterprise_id = data.enterprise_id
- model.amount = str(int(abs(diff) * 100))
- request = AlipayEbppInvoiceExpensecontrolQuotaModifyRequest()
- request.biz_model = model
- client = AlipayClient.get_client()
- response = client.execute(request)
- if not response:
- raise CustomException(msg="调整额度失败: 无响应")
- mod_result = AlipayEbppInvoiceExpensecontrolQuotaModifyResponse()
- mod_result.parse_response_content(response)
- if not mod_result.is_success():
- sub_msg = getattr(mod_result, 'sub_msg', '') or ''
- sub_code = getattr(mod_result, 'sub_code', '') or ''
- log.error(f"支付宝接口调用失败: {mod_result.code} - {mod_result.msg} (sub_code={sub_code}, sub_msg={sub_msg})")
- # 如果 Alipay 提示额度不存在,清理本地陈旧记录
- if '不存在' in sub_msg or 'INVALID' in sub_code:
- try:
- from sqlalchemy import delete as sa_delete
- del_stmt = sa_delete(QuotaModel).where(QuotaModel.quota_id == data.quota_id)
- await auth.db.execute(del_stmt)
- await auth.db.flush()
- log.info(f"调整额度 - 支付宝侧额度不存在,已清理本地陈旧记录: quota_id={data.quota_id}")
- except Exception as e:
- log.warning(f"清理本地陈旧记录失败: {e}")
- raise CustomException(msg=f"调整额度失败: {sub_msg or mod_result.msg}")
- # 更新本地额度记录
- new_available = data.amount
- new_total = float(quota.total_amount) if quota.total_amount else 0
- if diff > 0:
- new_total += diff
- else:
- new_total = max(0, new_total + diff)
- upd = sa_update(QuotaModel).where(
- QuotaModel.quota_id == data.quota_id
- ).values(
- total_amount=new_total,
- available_amount=new_available,
- )
- await auth.db.execute(upd)
- await auth.db.flush()
- # 记录变更日志
- try:
- log_crud = QuotaChangeLogCRUD(auth)
- await log_crud.create({
- "quota_id": data.quota_id,
- "employee_id": quota.employee_id or "",
- "institution_id": quota.institution_id or "",
- "change_type": "ADJUST",
- "coupon_name": quota.out_biz_no or "额度调整",
- "change_amount": diff,
- "before_amount": current_available,
- "after_amount": new_available,
- "change_desc": data.change_desc or "",
- "enterprise_id": data.enterprise_id,
- "tenant_id": auth.user.tenant_id if auth.user else 1,
- })
- except Exception as e:
- log.warning(f"记录变更日志失败(不影响调整): {e}")
- return {
- "quota_id": data.quota_id,
- "before_amount": current_available,
- "after_amount": new_available,
- "diff": diff,
- }
- @classmethod
- async def list_quota_changes_service(
- cls, auth: AuthSchema, quota_id: str
- ) -> list[dict]:
- """查询额度的变更记录"""
- from .crud import QuotaChangeLogCRUD
- crud = QuotaChangeLogCRUD(auth)
- logs = await crud.list(search={"quota_id": quota_id}, order_by=[{"id": "desc"}])
- return [
- {
- "coupon_name": log.coupon_name,
- "change_time": log.created_time,
- "change_amount": float(log.change_amount) if log.change_amount else 0,
- "change_desc": log.change_desc,
- "change_type": log.change_type,
- }
- for log in (logs or [])
- ]
- @classmethod
- async def list_service(
- cls,
- auth: AuthSchema,
- page_no: int = 1,
- page_size: int = 20,
- search: dict | None = None,
- ) -> dict:
- crud = QuotaCRUD(auth)
- offset = (page_no - 1) * page_size
- result = await crud.page(
- offset=offset,
- limit=page_size,
- order_by=[{"id": "desc"}],
- search=search or {},
- out_schema=QuotaListOutSchema,
- )
- # 如果指定了 institution_id,同时查询支付宝端自动发放的额度
- if search and search.get("institution_id"):
- try:
- from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaQueryRequest import (
- AlipayEbppInvoiceExpensecontrolQuotaQueryRequest,
- )
- from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaQueryModel import (
- AlipayEbppInvoiceExpensecontrolQuotaQueryModel,
- )
- from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaQueryResponse import (
- AlipayEbppInvoiceExpensecontrolQuotaQueryResponse,
- )
- alipay_model = AlipayEbppInvoiceExpensecontrolQuotaQueryModel()
- # owner_type 为必填字段,查询制度下所有额度时使用通用类型
- alipay_model.owner_type = "ENTERPRISE_PAY_UID"
- alipay_model.target_type = "INSTITUTION"
- alipay_model.target_id = search["institution_id"]
- alipay_model.page_size = 100
- alipay_model.page_num = 1
- request = AlipayEbppInvoiceExpensecontrolQuotaQueryRequest()
- request.biz_model = alipay_model
- client = AlipayClient.get_client()
- response = client.execute(request)
- if response:
- alipay_result = AlipayEbppInvoiceExpensecontrolQuotaQueryResponse()
- alipay_result.parse_response_content(response)
- if alipay_result.is_success() and hasattr(alipay_result, 'quota_detail_info_list') and alipay_result.quota_detail_info_list:
- # 将支付宝端额度合并到结果中(去重)
- existing_ids = {item.get("quota_id") for item in result.get("items", []) if item.get("quota_id")}
- for q in alipay_result.quota_detail_info_list:
- qid = getattr(q, 'quota_id', None)
- if qid and qid not in existing_ids:
- result["items"].append({
- "quota_id": qid,
- "target_type": getattr(q, 'target_type', None),
- "target_id": getattr(q, 'target_id', None),
- "quota_type": getattr(q, 'quota_type', None),
- "employee_id": getattr(q, 'owner_id', None),
- "total_amount": getattr(q, 'total_amount', None),
- "available_amount": getattr(q, 'available_amount', None),
- "status": getattr(q, 'status', "QUOTA_ACTIVE"),
- "created_time": getattr(q, 'effective_start_date', None),
- })
- existing_ids.add(qid)
- result["total"] = len(result["items"])
- except Exception as e:
- log.warning(f"查询支付宝端额度失败(不影响本地数据): {e}")
- return result
|