| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030 |
- from datetime import datetime
- from decimal import Decimal
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.core.alipay import AlipayClient
- from app.core.exceptions import CustomException
- from app.core.logger import log
- from app.utils.snowflake import get_snowflake_id
- from .enums import QuotaStatusEnum
- from .schema import (
- AdjustQuotaSchema,
- ExpenseQuotaCreateSchema,
- ExpenseQuotaDeleteSchema,
- ExpenseQuotaModifySchema,
- ExpenseQuotaQueryOutSchema,
- ExpenseQuotaQuerySchema,
- IssueBatchCancelOutSchema,
- IssueBatchCancelSchema,
- IssueBatchCreateOutSchema,
- IssueBatchCreateSchema,
- IssueBatchListOutSchema,
- IssueBatchRecordsQueryOutSchema,
- IssueBatchRecordsQuerySchema,
- IssueQuotaCheckFailedItem,
- IssueRecordInfoItem,
- QuotaCreateSchema,
- QuotaDetailInfoSchema,
- QuotaListOutSchema,
- QuotaOperationOutSchema,
- QuotaOutSchema,
- QuotaUpdateSchema,
- )
- from .crud import IssueBatchCRUD, QuotaCRUD
- from .model import IssueBatchModel
- class QuotaService:
- """额度服务层"""
@classmethod
async def create_expense_quota_service(
    cls, auth: AuthSchema, data: ExpenseQuotaCreateSchema
) -> QuotaOperationOutSchema:
    """Create a balance/voucher quota on Alipay, then store it locally.

    Calls ``alipay.ebpp.invoice.expensecontrol.quota.create``.

    Args:
        auth: Request auth context handed to ``QuotaCRUD``.
        data: Creation payload; its fields are copied onto the Alipay
            request model and dumped (without ``None`` values) into the
            local record.

    Returns:
        The business number used for the request and the ``quota_id``
        reported by Alipay.

    Raises:
        CustomException: If the Alipay SDK cannot be imported, the gateway
            returns an empty response, the response is not successful, or
            the local record cannot be created.
    """
    quota_crud = QuotaCRUD(auth)
    # Fall back to a generated snowflake id when no outer_source_id is given.
    biz_no = data.outer_source_id or str(get_snowflake_id())

    try:
        from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaCreateModel import (
            AlipayEbppInvoiceExpensecontrolQuotaCreateModel as CreateModel,
        )
        from alipay.aop.api.domain.IssueQuotaTarget import IssueQuotaTarget
        from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaCreateRequest import (
            AlipayEbppInvoiceExpensecontrolQuotaCreateRequest as CreateRequest,
        )
        from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaCreateResponse import (
            AlipayEbppInvoiceExpensecontrolQuotaCreateResponse as CreateResponse,
        )
    except ImportError:
        raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖")

    time_format = "%Y-%m-%d %H:%M:%S"

    # Mandatory fields, with defaults for quota type and share mode.
    payload = CreateModel()
    payload.target_type = data.target_type
    payload.target_id = data.target_id
    payload.enterprise_id = data.enterprise_id
    payload.outer_source_id = biz_no
    payload.quota_type = data.quota_type or "CAP"
    payload.share_mode = data.share_mode or "0"

    # Optional fields are only set when the caller supplied a truthy value.
    for attr in ("effective_start_date", "effective_end_date"):
        moment = getattr(data, attr)
        if moment:
            setattr(payload, attr, moment.strftime(time_format))
    for attr in ("issue_name", "issue_desc"):
        text = getattr(data, attr)
        if text:
            setattr(payload, attr, text)

    if data.issue_quota_target_list:

        def build_target(entry):
            """Copy one schema entry onto an SDK IssueQuotaTarget."""
            sdk_target = IssueQuotaTarget()
            sdk_target.owner_type = entry.owner_type
            sdk_target.owner_id = entry.owner_id
            sdk_target.quota = entry.quota
            if entry.amount is not None:
                sdk_target.amount = entry.amount
            return sdk_target

        payload.issue_quota_target_list = [
            build_target(entry) for entry in data.issue_quota_target_list
        ]

    api_request = CreateRequest()
    api_request.biz_model = payload
    raw_reply = AlipayClient.get_client().execute(api_request)
    if not raw_reply:
        raise CustomException(msg="创建余额/点券失败: 无响应")

    reply = CreateResponse()
    reply.parse_response_content(raw_reply)
    if not reply.is_success():
        log.error(f"支付宝接口调用失败: {reply.code} - {reply.msg}")
        raise CustomException(msg=f"创建余额/点券失败: {reply.msg}")

    # Persist the quota locally only after Alipay accepted it.
    record = data.model_dump(exclude_none=True)
    record["out_biz_no"] = biz_no
    record["status"] = QuotaStatusEnum.QUOTA_ACTIVE.value
    if reply.quota_id:
        record["quota_id"] = reply.quota_id

    created = await quota_crud.create(record)
    if not created:
        raise CustomException(msg="创建额度记录失败")

    return QuotaOperationOutSchema(out_biz_no=biz_no, quota_id=reply.quota_id)
@classmethod
async def create_quota_service(
    cls, auth: AuthSchema, data: QuotaCreateSchema
) -> QuotaOperationOutSchema:
    """Create a quota record in the local database only.

    Args:
        auth: Request auth context handed to ``QuotaCRUD``.
        data: Creation payload; ``None`` fields are dropped before saving.

    Returns:
        The generated business number and the ``quota_id`` of the stored
        record.

    Raises:
        CustomException: If the CRUD layer returns a falsy result.
    """
    quota_crud = QuotaCRUD(auth)
    biz_no = str(get_snowflake_id())

    record = {
        **data.model_dump(exclude_none=True),
        "out_biz_no": biz_no,
        "status": QuotaStatusEnum.QUOTA_ACTIVE.value,
    }
    # A new quota starts fully available unless the caller said otherwise.
    if record.get("available_amount") is None:
        record["available_amount"] = record.get("total_amount", 0)

    created = await quota_crud.create(record)
    if created:
        return QuotaOperationOutSchema(out_biz_no=biz_no, quota_id=created.quota_id)
    raise CustomException(msg="创建额度记录失败")
@classmethod
async def query_expense_quota_service(
    cls, auth: AuthSchema, data: ExpenseQuotaQuerySchema
) -> ExpenseQuotaQueryOutSchema:
    """Query balance/voucher quotas from Alipay.

    Calls ``alipay.ebpp.invoice.expensecontrol.quota.query``.

    Args:
        auth: Request auth context (not used by this method).
        data: Query filters and paging parameters.

    Returns:
        Paging information taken from the response, falling back to the
        requested page values (or 0 for the page count) when a response
        value is falsy.

    Raises:
        CustomException: If the Alipay SDK cannot be imported, the gateway
            returns an empty response, or the response is not successful.
    """
    try:
        from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaQueryModel import (
            AlipayEbppInvoiceExpensecontrolQuotaQueryModel as QueryModel,
        )
        from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaQueryRequest import (
            AlipayEbppInvoiceExpensecontrolQuotaQueryRequest as QueryRequest,
        )
        from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaQueryResponse import (
            AlipayEbppInvoiceExpensecontrolQuotaQueryResponse as QueryResponse,
        )
    except ImportError:
        raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖")

    criteria = QueryModel()
    criteria.owner_type = data.owner_type
    criteria.page_size = data.page_size
    criteria.page_num = data.page_num

    # Optional filters are forwarded only when they carry a truthy value.
    optional_filters = (
        "target_type",
        "target_id",
        "owner_id",
        "owner_open_id",
        "enterprise_id",
        "quota_id_list",
        "quota_type",
    )
    for attr in optional_filters:
        chosen = getattr(data, attr)
        if chosen:
            setattr(criteria, attr, chosen)

    api_request = QueryRequest()
    api_request.biz_model = criteria
    raw_reply = AlipayClient.get_client().execute(api_request)
    if not raw_reply:
        raise CustomException(msg="查询余额/点券失败: 无响应")

    reply = QueryResponse()
    reply.parse_response_content(raw_reply)
    if not reply.is_success():
        log.error(f"支付宝接口调用失败: {reply.code} - {reply.msg}")
        raise CustomException(msg=f"查询余额/点券失败: {reply.msg}")

    # NOTE(review): only paging fields are returned; any quota detail list
    # in the response is not forwarded — confirm this is intended.
    page_num = reply.page_num or data.page_num
    page_size = reply.page_size or data.page_size
    total_pages = reply.total_page_count or 0
    return ExpenseQuotaQueryOutSchema(
        page_num=page_num,
        page_size=page_size,
        total_page_count=total_pages,
    )
@classmethod
async def modify_expense_quota_service(
    cls, auth: AuthSchema, out_biz_no: str, data: ExpenseQuotaModifySchema
) -> QuotaOperationOutSchema:
    """Modify a balance/voucher quota on Alipay.

    Calls ``alipay.ebpp.invoice.expensecontrol.quota.modify``. The local
    record is only used as an existence check; it is not updated here.

    Args:
        auth: Request auth context handed to ``QuotaCRUD``.
        out_biz_no: Business number of the local quota record.
        data: Modification payload (quota id, action, optional amount and
            share mode).

    Returns:
        The business number, the quota id from ``data`` and the ``success``
        attribute of the Alipay response.

    Raises:
        CustomException: If no local record matches ``out_biz_no``, the
            Alipay SDK cannot be imported, the gateway returns an empty
            response, or the response is not successful.
    """
    existing = await QuotaCRUD(auth).get_by_out_biz_no(out_biz_no)
    if not existing:
        raise CustomException(msg="额度不存在")

    try:
        from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaModifyModel import (
            AlipayEbppInvoiceExpensecontrolQuotaModifyModel as ModifyModel,
        )
        from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaModifyRequest import (
            AlipayEbppInvoiceExpensecontrolQuotaModifyRequest as ModifyRequest,
        )
        from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaModifyResponse import (
            AlipayEbppInvoiceExpensecontrolQuotaModifyResponse as ModifyResponse,
        )
    except ImportError:
        raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖")

    payload = ModifyModel()
    for attr in ("quota_id", "action", "outer_source_id", "enterprise_id"):
        setattr(payload, attr, getattr(data, attr))
    # The amount is sent as a string; a zero amount is still forwarded.
    if data.amount is not None:
        payload.amount = str(data.amount)
    if data.share_mode:
        payload.share_mode = data.share_mode

    api_request = ModifyRequest()
    api_request.biz_model = payload
    raw_reply = AlipayClient.get_client().execute(api_request)
    if not raw_reply:
        raise CustomException(msg="修改余额/点券失败: 无响应")

    reply = ModifyResponse()
    reply.parse_response_content(raw_reply)
    if not reply.is_success():
        log.error(f"支付宝接口调用失败: {reply.code} - {reply.msg}")
        raise CustomException(msg=f"修改余额/点券失败: {reply.msg}")

    return QuotaOperationOutSchema(
        out_biz_no=out_biz_no,
        quota_id=data.quota_id,
        result=reply.success,
    )
@classmethod
async def delete_expense_quota_service(
    cls, auth: AuthSchema, out_biz_no: str, data: ExpenseQuotaDeleteSchema
) -> QuotaOperationOutSchema:
    """Delete a quota on Alipay and remove the matching local record.

    Calls ``alipay.ebpp.invoice.expensecontrol.quota.delete``.

    Args:
        auth: Request auth context handed to ``QuotaCRUD``.
        out_biz_no: Business number of the local quota record.
        data: Deletion payload (enterprise id plus an optional quota id
            and/or issue batch id).

    Returns:
        A result carrying only ``out_biz_no``.

    Raises:
        CustomException: If no local record matches ``out_biz_no``, the
            Alipay SDK cannot be imported, the gateway returns an empty
            response, or the response is not successful.
    """
    quota_crud = QuotaCRUD(auth)
    existing = await quota_crud.get_by_out_biz_no(out_biz_no)
    if not existing:
        raise CustomException(msg="额度不存在")

    try:
        from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaDeleteModel import (
            AlipayEbppInvoiceExpensecontrolQuotaDeleteModel as DeleteModel,
        )
        from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaDeleteRequest import (
            AlipayEbppInvoiceExpensecontrolQuotaDeleteRequest as DeleteRequest,
        )
        from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaDeleteResponse import (
            AlipayEbppInvoiceExpensecontrolQuotaDeleteResponse as DeleteResponse,
        )
    except ImportError:
        raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖")

    payload = DeleteModel()
    payload.enterprise_id = data.enterprise_id
    for attr in ("quota_id", "issue_batch_id"):
        identifier = getattr(data, attr)
        if identifier:
            setattr(payload, attr, identifier)

    api_request = DeleteRequest()
    api_request.biz_model = payload
    raw_reply = AlipayClient.get_client().execute(api_request)
    if not raw_reply:
        raise CustomException(msg="删除额度失败: 无响应")

    reply = DeleteResponse()
    reply.parse_response_content(raw_reply)
    if not reply.is_success():
        log.error(f"支付宝接口调用失败: {reply.code} - {reply.msg}")
        raise CustomException(msg=f"删除额度失败: {reply.msg}")

    # Remove the local row only after Alipay confirmed the deletion.
    await quota_crud.delete(id=existing.id)
    return QuotaOperationOutSchema(out_biz_no=out_biz_no)
- # ========================
- # 手工批量发放额度
- # ========================
@classmethod
async def issue_batch_create_service(
    cls, auth: AuthSchema, data: IssueBatchCreateSchema
) -> IssueBatchCreateOutSchema:
    """Issue quota to a batch of owners on Alipay and mirror it locally.

    Calls ``alipay.ebpp.invoice.expensecontrol.issuebatch.create``. Once
    Alipay accepts the batch, local bookkeeping is attempted on a
    best-effort basis (every failure is logged and never fails the call):

    1. store one batch row through ``IssueBatchCRUD``;
    2. delete quota rows of the same institution whose ``quota_id`` is NULL;
    3. insert one ``QuotaModel`` row per target that is not listed in the
       response's ``issue_quota_check_failed_list``;
    4. query ``alipay.ebpp.invoice.issuebatch.issuerecords.batchquery`` and
       write each returned ``quota_id`` back to the rows from step 3.

    Args:
        auth: Request auth context; ``auth.db`` is used for direct SQL and
            ``auth.user.tenant_id`` (default 1) for the inserted rows.
        data: Batch definition including the list of issue targets.

    Returns:
        The Alipay ``issue_batch_id`` and, if any, the targets that failed
        Alipay's check.

    Raises:
        CustomException: If the Alipay SDK cannot be imported, the batch
            number already exists locally, the gateway returns an empty
            response, or the response is not successful.
    """
    try:
        from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolIssuebatchCreateRequest import (
            AlipayEbppInvoiceExpensecontrolIssuebatchCreateRequest,
        )
        from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolIssuebatchCreateModel import (
            AlipayEbppInvoiceExpensecontrolIssuebatchCreateModel,
        )
        from alipay.aop.api.domain.IssueTargetInfoContent import (
            IssueTargetInfoContent,
        )
        from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolIssuebatchCreateResponse import (
            AlipayEbppInvoiceExpensecontrolIssuebatchCreateResponse,
        )
    except ImportError:
        raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖")

    def parse_amount(raw) -> Decimal:
        """Convert an ``issue_quota`` value to Decimal; bad input counts as 0."""
        try:
            return Decimal(raw)
        except (ArithmeticError, TypeError, ValueError):
            return Decimal("0")

    # Normalise once so later code never has to deal with a None list.
    targets = data.issue_target_info_list or []

    # Reject a duplicate batch number locally to avoid a pointless Alipay call.
    # A failing lookup must not block the issuance, but it is no longer silent.
    existing_batch = None
    try:
        issue_batch_crud = IssueBatchCRUD(auth)
        existing_batch = await issue_batch_crud.get_by_batch_no(data.batch_no)
    except CustomException:
        raise
    except Exception as e:
        log.warning(f"本地检查批次号失败(继续调用支付宝): {e}")
    if existing_batch:
        raise CustomException(msg=f"批次号 {data.batch_no} 已存在,请勿重复创建")

    model = AlipayEbppInvoiceExpensecontrolIssuebatchCreateModel()
    model.enterprise_id = data.enterprise_id
    model.issue_name = data.issue_name
    model.quota_type = data.quota_type
    model.effective_start_date = data.effective_start_date
    model.effective_end_date = data.effective_end_date
    model.institution_id = data.institution_id
    model.batch_no = data.batch_no
    model.share_mode = data.share_mode
    if data.issue_desc:
        model.issue_desc = data.issue_desc
    if targets:
        target_list = []
        for item in targets:
            target = IssueTargetInfoContent()
            target.issue_quota = item.issue_quota
            if item.owner_open_id:
                target.owner_open_id = item.owner_open_id
            if item.owner_id:
                target.owner_id = item.owner_id
            if item.user_name:
                target.user_name = item.user_name
            if item.owner_type:
                target.owner_type = item.owner_type
            target_list.append(target)
        model.issue_target_info_list = target_list

    request = AlipayEbppInvoiceExpensecontrolIssuebatchCreateRequest()
    request.biz_model = model
    client = AlipayClient.get_client()
    response = client.execute(request)
    if not response:
        raise CustomException(msg="手工批量发放额度失败: 无响应")

    result = AlipayEbppInvoiceExpensecontrolIssuebatchCreateResponse()
    result.parse_response_content(response)
    if not result.is_success():
        sub_msg = getattr(result, 'sub_msg', '') or ''
        err_detail = f"{result.msg}" + (f" - {sub_msg}" if sub_msg else "")
        log.error(f"支付宝接口调用失败: {result.code} - {err_detail}")
        raise CustomException(msg=f"手工批量发放额度失败: {sub_msg or result.msg}")

    # --- 1. Store the batch row locally (best effort) ---
    try:
        issue_batch_crud = IssueBatchCRUD(auth)
        # The total covers every requested target, including ones Alipay rejected.
        total_amount = sum(
            (parse_amount(item.issue_quota) for item in targets), Decimal("0")
        )
        batch_data = {
            "issue_batch_id": result.issue_batch_id,
            "batch_no": data.batch_no,
            "institution_id": data.institution_id,
            "issue_name": data.issue_name,
            "quota_type": data.quota_type,
            "share_mode": data.share_mode,
            "total_count": len(targets),
            "total_amount": total_amount,
            "status": "ACTIVE",
            "effective_start_date": datetime.strptime(data.effective_start_date, "%Y-%m-%d %H:%M:%S"),
            "effective_end_date": datetime.strptime(data.effective_end_date, "%Y-%m-%d %H:%M:%S"),
            "issue_desc": data.issue_desc,
        }
        await issue_batch_crud.create(batch_data)
    except Exception as e:
        import traceback
        log.warning(f"保存发放批次记录失败(不影响发放), 可检查: 1)batch_no是否重复 2)字段长度\n异常: {e}\n{traceback.format_exc()}")

    # Targets rejected by Alipay; read once and reused for filtering and output.
    check_failed = getattr(result, 'issue_quota_check_failed_list', None) or []
    failed_owner_ids: set[str] = {
        str(getattr(f, 'owner_id', ''))
        for f in check_failed
        if getattr(f, 'owner_id', None)
    }

    # --- 2-4. Mirror per-owner quota rows locally (best effort) ---
    try:
        from app.plugin.module_payment.expense.quota.model import QuotaModel
        from app.plugin.module_payment.expense.quota.enums import QuotaStatusEnum
        # `update` was missing from this import in the original, so the
        # quota_id back-fill below always died with a swallowed NameError.
        from sqlalchemy import delete as sa_delete, insert, update as sa_update

        # Remove stale rows of this institution that never received a quota_id
        # (left over from earlier failed or unfinished batches).
        try:
            clean_stmt = sa_delete(QuotaModel).where(
                QuotaModel.institution_id == data.institution_id,
                QuotaModel.quota_id.is_(None),
            )
            del_result = await auth.db.execute(clean_stmt)
            if del_result.rowcount:
                log.info(f"手工发放 - 清理 {del_result.rowcount} 条陈旧记录(institution_id={data.institution_id})")
            await auth.db.flush()
        except Exception as e:
            log.warning(f"手工发放 - 清理陈旧记录失败(不影响发放): {e}")

        tenant_id = auth.user.tenant_id if auth.user else 1
        effective_start = datetime.strptime(data.effective_start_date, "%Y-%m-%d %H:%M:%S") if data.effective_start_date else None
        effective_end = datetime.strptime(data.effective_end_date, "%Y-%m-%d %H:%M:%S") if data.effective_end_date else None

        inserted_count = 0
        for item in targets:
            # NOTE(review): targets identified only by owner_open_id all get an
            # empty emp_id and therefore the same out_biz_no — confirm intended.
            emp_id = (item.owner_id or "")
            if emp_id in failed_owner_ids:
                log.warning(f"手工发放 - 跳过校验失败员工: owner_id={emp_id}")
                continue
            quota_amount = parse_amount(item.issue_quota)
            # Always insert a fresh row so repeated issuances stay separate.
            stmt = insert(QuotaModel).values(
                employee_id=emp_id,
                institution_id=data.institution_id,
                quota_type=data.quota_type,
                target_type="INSTITUTION",
                target_id=data.institution_id,
                out_biz_no=f"batch_{data.batch_no}_{emp_id}",
                total_amount=quota_amount,
                available_amount=quota_amount,
                status=QuotaStatusEnum.QUOTA_ACTIVE.value,
                valid_from=effective_start,
                valid_to=effective_end,
                enterprise_id=data.enterprise_id,
                tenant_id=tenant_id,
            )
            await auth.db.execute(stmt)
            inserted_count += 1
        await auth.db.flush()
        log.info(f"手工发放 - 新增 {inserted_count} 条额度记录(跳过 {len(failed_owner_ids)} 条校验失败)")

        # Ask Alipay for the issued records and back-fill the real quota_id.
        # Nothing to back-fill when no row was inserted.
        if inserted_count:
            try:
                from alipay.aop.api.request.AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryRequest import (
                    AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryRequest,
                )
                from alipay.aop.api.domain.AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryModel import (
                    AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryModel,
                )
                from alipay.aop.api.response.AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryResponse import (
                    AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryResponse,
                )

                records_model = AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryModel()
                records_model.enterprise_id = data.enterprise_id
                records_model.institution_id = data.institution_id
                records_model.issue_batch_id = result.issue_batch_id
                # One page sized to the whole batch.
                # NOTE(review): Alipay may cap page_size — confirm for large batches.
                records_model.page_size = len(targets)
                records_model.page_num = 1
                records_request = AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryRequest()
                records_request.biz_model = records_model
                records_response = client.execute(records_request)
                if records_response:
                    records_result = AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryResponse()
                    records_result.parse_response_content(records_response)
                    issued = getattr(records_result, 'issue_record_info_list', None)
                    if records_result.is_success() and issued:
                        updated_quota_id_count = 0
                        for r in issued:
                            quota_id = getattr(r, 'quota_id', None)
                            owner_id = getattr(r, 'owner_id', None)
                            if quota_id and owner_id:
                                # Match on out_biz_no so only this batch's rows change.
                                q_upd = sa_update(QuotaModel).where(
                                    QuotaModel.out_biz_no == f"batch_{data.batch_no}_{owner_id}",
                                    QuotaModel.institution_id == data.institution_id,
                                ).values(quota_id=quota_id)
                                await auth.db.execute(q_upd)
                                updated_quota_id_count += 1
                        if updated_quota_id_count:
                            await auth.db.flush()
                            log.info(f"手工发放 - 已从支付宝查询并更新 {updated_quota_id_count} 条记录的 quota_id")
            except Exception as e:
                log.warning(f"查询支付宝发放记录获取 quota_id 失败(不影响发放): {e}")
    except Exception as e:
        import traceback
        log.warning(f"保存额度记录到本地失败(不影响发放), 可检查: 1)tenant_id/enterprise_id 2)字段长度\n异常: {e}\n{traceback.format_exc()}")

    # Report rejected targets to the caller; None when there were none.
    failed_list = [
        IssueQuotaCheckFailedItem(
            user_name=getattr(f, 'user_name', None),
            owner_type=getattr(f, 'owner_type', None),
            owner_id=getattr(f, 'owner_id', None),
            owner_open_id=getattr(f, 'owner_open_id', None),
            issue_quota=getattr(f, 'issue_quota', None),
            message=getattr(f, 'message', None),
            result=getattr(f, 'result', None),
        )
        for f in check_failed
    ] or None

    return IssueBatchCreateOutSchema(
        issue_batch_id=result.issue_batch_id,
        issue_quota_check_failed_list=failed_list,
    )
@classmethod
async def issue_batch_cancel_service(
    cls, auth: AuthSchema, data: IssueBatchCancelSchema
) -> IssueBatchCancelOutSchema:
    """Cancel a manually issued batch on Alipay and clean up local data.

    Calls ``alipay.ebpp.invoice.expensecontrol.issuebatch.cancel``. After a
    successful cancel, the local batch row is marked ``CANCELLED`` and the
    quota rows created for that batch are deleted; both local steps are
    best effort and only log on failure.

    Args:
        auth: Request auth context; ``auth.db`` is used for the delete.
        data: Enterprise, institution and Alipay issue batch identifiers.

    Returns:
        The ``result`` attribute of the Alipay response (``False`` when the
        attribute is missing).

    Raises:
        CustomException: If the Alipay SDK cannot be imported, the gateway
            returns an empty response, or the response is not successful.
    """
    try:
        from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolIssuebatchCancelRequest import (
            AlipayEbppInvoiceExpensecontrolIssuebatchCancelRequest,
        )
        from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolIssuebatchCancelModel import (
            AlipayEbppInvoiceExpensecontrolIssuebatchCancelModel,
        )
        from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolIssuebatchCancelResponse import (
            AlipayEbppInvoiceExpensecontrolIssuebatchCancelResponse,
        )
    except ImportError:
        raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖")

    model = AlipayEbppInvoiceExpensecontrolIssuebatchCancelModel()
    model.enterprise_id = data.enterprise_id
    model.institution_id = data.institution_id
    model.issue_batch_id = data.issue_batch_id

    request = AlipayEbppInvoiceExpensecontrolIssuebatchCancelRequest()
    request.biz_model = model
    client = AlipayClient.get_client()
    response = client.execute(request)
    if not response:
        raise CustomException(msg="作废手工发放批次失败: 无响应")

    result = AlipayEbppInvoiceExpensecontrolIssuebatchCancelResponse()
    result.parse_response_content(response)
    if not result.is_success():
        sub_msg = getattr(result, 'sub_msg', '') or ''
        err_detail = f"{result.msg}" + (f" - {sub_msg}" if sub_msg else "")
        log.error(f"支付宝接口调用失败: {result.code} - {err_detail}")
        raise CustomException(msg=f"作废手工发放批次失败: {sub_msg or result.msg}")

    # Mark the local batch row as cancelled (best effort).
    batch = None
    try:
        issue_batch_crud = IssueBatchCRUD(auth)
        batch = await issue_batch_crud.get_by_issue_batch_id(data.issue_batch_id)
        if batch:
            batch.status = 'CANCELLED'
            await issue_batch_crud.update(batch.id, {"status": "CANCELLED"})
    except Exception as e:
        log.warning(f"更新批次本地状态失败(不影响作废): {e}")

    # Delete the quota rows that were created for this batch (best effort).
    try:
        from app.plugin.module_payment.expense.quota.model import QuotaModel
        from sqlalchemy import delete as sa_delete

        # Look the batch up again only if the first attempt found nothing.
        if batch is None:
            batch = await IssueBatchCRUD(auth).get_by_issue_batch_id(data.issue_batch_id)
        batch_no = batch.batch_no if batch else None
        if batch_no:
            # Rows of a batch carry out_biz_no "batch_{batch_no}_{owner_id}".
            # autoescape makes "_" and "%" in the prefix literal, so a batch
            # number cannot act as a LIKE wildcard and hit other batches.
            prefix = f"batch_{batch_no}_"
            del_stmt = sa_delete(QuotaModel).where(
                QuotaModel.out_biz_no.startswith(prefix, autoescape=True),
                QuotaModel.institution_id == data.institution_id,
            )
            await auth.db.execute(del_stmt)
            await auth.db.flush()
            log.info(f"批次作废 - 已删除该批次创建的额度记录: batch_no={batch_no}")
    except Exception as e:
        log.warning(f"删除额度记录失败(不影响作废): {e}")

    return IssueBatchCancelOutSchema(
        result=getattr(result, 'result', False),
    )
@classmethod
async def issue_batch_records_query_service(
    cls, auth: AuthSchema, data: IssueBatchRecordsQuerySchema
) -> IssueBatchRecordsQueryOutSchema:
    """Query the per-owner issue records of a manual batch from Alipay.

    Calls ``alipay.ebpp.invoice.issuebatch.issuerecords.batchquery``.

    Args:
        auth: Request auth context (not used by this method).
        data: Enterprise, institution and batch identifiers plus paging.

    Returns:
        Paging information and the list of issue records, or ``None`` for
        the list when the response carries no records.

    Raises:
        CustomException: If the Alipay SDK cannot be imported, the gateway
            returns an empty response, or the response is not successful.
    """
    try:
        from alipay.aop.api.request.AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryRequest import (
            AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryRequest,
        )
        from alipay.aop.api.domain.AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryModel import (
            AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryModel,
        )
        from alipay.aop.api.response.AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryResponse import (
            AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryResponse,
        )
    except ImportError:
        raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖")

    model = AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryModel()
    model.enterprise_id = data.enterprise_id
    model.institution_id = data.institution_id
    model.issue_batch_id = data.issue_batch_id
    model.page_size = data.page_size
    model.page_num = data.page_num

    request = AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryRequest()
    request.biz_model = model
    client = AlipayClient.get_client()
    response = client.execute(request)
    if not response:
        raise CustomException(msg="查询手工发放明细失败: 无响应")

    result = AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryResponse()
    result.parse_response_content(response)
    if not result.is_success():
        log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
        raise CustomException(msg=f"查询手工发放明细失败: {result.msg}")

    records = getattr(result, 'issue_record_info_list', None)
    record_list = None
    if records:
        record_list = [
            IssueRecordInfoItem(
                quota_id=getattr(r, 'quota_id', None),
                issue_quota=getattr(r, 'issue_quota', None),
                issue_status=getattr(r, 'issue_status', None),
                owner_type=getattr(r, 'owner_type', None),
                owner_id=getattr(r, 'owner_id', None),
                owner_open_id=getattr(r, 'owner_open_id', None),
                user_name=getattr(r, 'user_name', None),
                currency=getattr(r, 'currency', None),
            )
            for r in records
        ]

    # getattr's default only applies when the attribute is missing; `or` also
    # covers an attribute that is present but None, matching the fallback used
    # by query_expense_quota_service.
    return IssueBatchRecordsQueryOutSchema(
        page_num=getattr(result, 'page_num', None) or data.page_num,
        page_size=getattr(result, 'page_size', None) or data.page_size,
        total_page_count=getattr(result, 'total_page_count', None) or 0,
        issue_record_info_list=record_list,
    )
@classmethod
async def list_batch_service(
    cls,
    auth: AuthSchema,
    page_no: int = 1,
    page_size: int = 20,
    search: dict | None = None,
) -> dict:
    """Return one page of manual issue batches from the local database.

    Args:
        auth: Request auth context handed to ``IssueBatchCRUD``.
        page_no: 1-based page number.
        page_size: Number of rows per page.
        search: Optional filter mapping; an empty dict is used when omitted.

    Returns:
        Whatever ``IssueBatchCRUD.page`` returns, ordered by ``id``
        descending and serialised with ``IssueBatchListOutSchema``.
    """
    batch_crud = IssueBatchCRUD(auth)
    paging = {
        # Translate the 1-based page number into a row offset.
        "offset": (page_no - 1) * page_size,
        "limit": page_size,
    }
    filters = search if search else {}
    return await batch_crud.page(
        order_by=[{"id": "desc"}],
        search=filters,
        out_schema=IssueBatchListOutSchema,
        **paging,
    )
@classmethod
async def list_employee_quota_records_service(
    cls,
    auth: AuthSchema,
    employee_id: str,
    institution_id: str | None = None,
) -> list[dict]:
    """List an employee's quota records, newest first.

    Args:
        auth: Request auth context; the query runs on ``auth.db``.
        employee_id: Employee whose records are selected.
        institution_id: Optional extra filter on the institution.

    Returns:
        One plain dict per record. Amounts are converted to ``float`` and
        reported as ``0`` when the stored value is falsy.
    """
    from app.plugin.module_payment.expense.quota.model import QuotaModel
    from sqlalchemy import select

    def as_number(amount):
        """Render a stored amount as float, mapping falsy values to 0."""
        return float(amount) if amount else 0

    query = select(QuotaModel).where(QuotaModel.employee_id == employee_id)
    if institution_id:
        query = query.where(QuotaModel.institution_id == institution_id)
    query = query.order_by(QuotaModel.created_time.desc())

    rows = (await auth.db.execute(query)).scalars().all()

    records = []
    for row in rows:
        records.append(
            {
                "quota_id": row.quota_id,
                "out_biz_no": row.out_biz_no,
                "total_amount": as_number(row.total_amount),
                "available_amount": as_number(row.available_amount),
                "quota_type": row.quota_type,
                "status": row.status,
                "valid_from": row.valid_from,
                "valid_to": row.valid_to,
                "created_time": row.created_time,
                "institution_id": row.institution_id,
            }
        )
    return records
- @classmethod
- async def adjust_quota_service(
- cls, auth: AuthSchema, data: AdjustQuotaSchema
- ) -> dict:
- """调整额度金额 (调Alipay quota.modify + 记录变更日志)"""
- from .crud import QuotaChangeLogCRUD
- from .model import QuotaModel
- from sqlalchemy import select, update as sa_update
- # 查询当前额度记录(支持按 quota_id 或 id 查找)
- stmt = select(QuotaModel).where(QuotaModel.quota_id == data.quota_id)
- result = await auth.db.execute(stmt)
- quota = result.scalar_one_or_none()
- if not quota:
- # quota_id 为空时尝试按数据库 id 查找
- try:
- local_id = int(data.quota_id)
- stmt = select(QuotaModel).where(QuotaModel.id == local_id)
- result = await auth.db.execute(stmt)
- quota = result.scalar_one_or_none()
- except (ValueError, TypeError):
- pass
- if not quota:
- raise CustomException(msg="额度记录不存在")
- # 如果本地 quota_id 为空,尝试从 Alipay 获取真实 quota_id
- alipay_quota_id = quota.quota_id
- if not alipay_quota_id and quota.out_biz_no:
- try:
- from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaQueryRequest import (
- AlipayEbppInvoiceExpensecontrolQuotaQueryRequest,
- )
- from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaQueryModel import (
- AlipayEbppInvoiceExpensecontrolQuotaQueryModel,
- )
- from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaQueryResponse import (
- AlipayEbppInvoiceExpensecontrolQuotaQueryResponse,
- )
- query_model = AlipayEbppInvoiceExpensecontrolQuotaQueryModel()
- query_model.enterprise_id = quota.enterprise_id or data.enterprise_id
- query_model.owner_id = quota.employee_id
- query_model.owner_type = "ENTERPRISE_PAY_UID"
- query_model.page_num = 1
- query_model.page_size = 10
- query_request = AlipayEbppInvoiceExpensecontrolQuotaQueryRequest()
- query_request.biz_model = query_model
- client = AlipayClient.get_client()
- query_response = client.execute(query_request)
- if query_response:
- query_result = AlipayEbppInvoiceExpensecontrolQuotaQueryResponse()
- query_result.parse_response_content(query_response)
- if query_result.is_success() and hasattr(query_result, 'quota_detail_info_list') and query_result.quota_detail_info_list:
- for q in query_result.quota_detail_info_list:
- qid = getattr(q, 'quota_id', None)
- if qid:
- alipay_quota_id = qid
- # 回写本地
- upd = sa_update(QuotaModel).where(QuotaModel.id == quota.id).values(quota_id=qid)
- await auth.db.execute(upd)
- await auth.db.flush()
- log.info(f"从支付宝回填 quota_id: id={quota.id}, quota_id={qid}")
- break
- except Exception as e:
- log.warning(f"从支付宝获取真实 quota_id 失败: {e}")
- if not alipay_quota_id:
- raise CustomException(msg="该额度暂未关联支付宝额度ID,无法调整(请等待支付宝同步后重试)")
- current_available = float(quota.available_amount) if quota.available_amount else 0
- diff = round(data.amount - current_available, 2)
- outer_source_id = str(get_snowflake_id())
- # 调Alipay quota.modify
- try:
- from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaModifyRequest import (
- AlipayEbppInvoiceExpensecontrolQuotaModifyRequest,
- )
- from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaModifyModel import (
- AlipayEbppInvoiceExpensecontrolQuotaModifyModel,
- )
- from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaModifyResponse import (
- AlipayEbppInvoiceExpensecontrolQuotaModifyResponse,
- )
- except ImportError:
- raise CustomException(msg="支付宝SDK未正确安装")
- model = AlipayEbppInvoiceExpensecontrolQuotaModifyModel()
- model.quota_id = alipay_quota_id
- model.action = "ADD" if diff >= 0 else "DEDUCT"
- model.outer_source_id = outer_source_id
- model.enterprise_id = data.enterprise_id
- model.amount = str(int(abs(diff) * 100))
- request = AlipayEbppInvoiceExpensecontrolQuotaModifyRequest()
- request.biz_model = model
- client = AlipayClient.get_client()
- response = client.execute(request)
- if not response:
- raise CustomException(msg="调整额度失败: 无响应")
- mod_result = AlipayEbppInvoiceExpensecontrolQuotaModifyResponse()
- mod_result.parse_response_content(response)
- if not mod_result.is_success():
- sub_msg = getattr(mod_result, 'sub_msg', '') or ''
- sub_code = getattr(mod_result, 'sub_code', '') or ''
- log.error(f"支付宝接口调用失败: {mod_result.code} - {mod_result.msg} (sub_code={sub_code}, sub_msg={sub_msg})")
- # 如果 Alipay 提示额度不存在,清除本地错误的 quota_id(可能是 issue_rule_id 误存),后续可重试查询回填
- if ('不存在' in sub_msg or 'INVALID' in sub_code) and quota:
- try:
- upd = sa_update(QuotaModel).where(QuotaModel.id == quota.id).values(quota_id=None)
- await auth.db.execute(upd)
- await auth.db.flush()
- log.info(f"调整额度 - 本地quota_id无效({data.quota_id})已清除,后续可从支付宝查询回填")
- except Exception as e:
- log.warning(f"清除本地无效quota_id失败: {e}")
- raise CustomException(msg=f"调整额度失败: {sub_msg or mod_result.msg}")
- # 更新本地额度记录
- new_available = data.amount
- new_total = float(quota.total_amount) if quota.total_amount else 0
- if diff > 0:
- new_total += diff
- else:
- new_total = max(0, new_total + diff)
- upd = sa_update(QuotaModel).where(
- QuotaModel.quota_id == data.quota_id
- ).values(
- total_amount=new_total,
- available_amount=new_available,
- )
- await auth.db.execute(upd)
- await auth.db.flush()
- # 记录变更日志
- try:
- log_crud = QuotaChangeLogCRUD(auth)
- await log_crud.create({
- "quota_id": data.quota_id,
- "employee_id": quota.employee_id or "",
- "institution_id": quota.institution_id or "",
- "change_type": "ADJUST",
- "coupon_name": quota.out_biz_no or "额度调整",
- "change_amount": diff,
- "before_amount": current_available,
- "after_amount": new_available,
- "change_desc": data.change_desc or "",
- "enterprise_id": data.enterprise_id,
- "tenant_id": auth.user.tenant_id if auth.user else 1,
- })
- except Exception as e:
- log.warning(f"记录变更日志失败(不影响调整): {e}")
- return {
- "quota_id": data.quota_id,
- "before_amount": current_available,
- "after_amount": new_available,
- "diff": diff,
- }
@classmethod
async def list_quota_changes_service(
    cls, auth: AuthSchema, quota_id: str
) -> list[dict]:
    """Return the change-log entries recorded for one quota.

    Entries are fetched through ``QuotaChangeLogCRUD`` filtered by
    ``quota_id`` and ordered by descending ``id``.

    Args:
        auth: Auth context handed to the CRUD layer.
        quota_id: Value matched against the change log's ``quota_id``.

    Returns:
        One dict per log row with the keys ``coupon_name``,
        ``change_time``, ``change_amount``, ``change_desc`` and
        ``change_type``; an empty list when the lookup yields nothing.
    """
    from .crud import QuotaChangeLogCRUD

    records = await QuotaChangeLogCRUD(auth).list(
        search={"quota_id": quota_id},
        order_by=[{"id": "desc"}],
    )

    history = []
    # ``entry`` rather than ``log`` so the loop variable is not confused
    # with the module-level logger of the same name.
    for entry in records or []:
        history.append(
            {
                "coupon_name": entry.coupon_name,
                "change_time": entry.created_time,
                # A falsy amount (None / 0) is reported as integer 0.
                "change_amount": float(entry.change_amount) if entry.change_amount else 0,
                "change_desc": entry.change_desc,
                "change_type": entry.change_type,
            }
        )
    return history
@classmethod
async def list_service(
    cls,
    auth: AuthSchema,
    page_no: int = 1,
    page_size: int = 20,
    search: dict | None = None,
) -> dict:
    """Return one page of local quotas, merged with Alipay-side quotas.

    The local page is read through ``QuotaCRUD.page`` (newest ``id``
    first). When ``search`` carries an ``institution_id``, the quotas that
    Alipay holds for that institution are queried as well, and every quota
    whose ``quota_id`` is not already on the page is appended to
    ``items``. The Alipay lookup is best-effort: any failure is logged as
    a warning and the local page is returned as-is.

    Args:
        auth: Auth context handed to the CRUD layer.
        page_no: 1-based page number of the local query.
        page_size: Number of local rows per page.
        search: Optional filter dict forwarded to the CRUD layer.

    Returns:
        The dict produced by ``QuotaCRUD.page``. After a successful merge
        ``items`` additionally holds the Alipay-only quotas and ``total``
        is increased by the number of quotas appended.
    """
    import asyncio

    result = await QuotaCRUD(auth).page(
        offset=(page_no - 1) * page_size,
        limit=page_size,
        order_by=[{"id": "desc"}],
        search=search or {},
        out_schema=QuotaListOutSchema,
    )

    # Only when an institution_id is given do we also query the quotas
    # issued automatically on the Alipay side.
    institution_id = (search or {}).get("institution_id")
    if not institution_id:
        return result

    try:
        from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaQueryRequest import (
            AlipayEbppInvoiceExpensecontrolQuotaQueryRequest,
        )
        from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaQueryModel import (
            AlipayEbppInvoiceExpensecontrolQuotaQueryModel,
        )
        from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaQueryResponse import (
            AlipayEbppInvoiceExpensecontrolQuotaQueryResponse,
        )

        alipay_model = AlipayEbppInvoiceExpensecontrolQuotaQueryModel()
        # owner_type is a required field; a generic type is used when
        # querying all quotas under an institution.
        alipay_model.owner_type = "ENTERPRISE_PAY_UID"
        alipay_model.target_type = "INSTITUTION"
        alipay_model.target_id = institution_id
        # Only the first page of up to 100 Alipay quotas is fetched.
        alipay_model.page_size = 100
        alipay_model.page_num = 1

        request = AlipayEbppInvoiceExpensecontrolQuotaQueryRequest()
        request.biz_model = alipay_model
        client = AlipayClient.get_client()
        # client.execute is a synchronous call (its result is used without
        # awaiting), so run it in a worker thread instead of blocking the
        # event loop for the duration of the request.
        response = await asyncio.to_thread(client.execute, request)
        if not response:
            return result

        alipay_result = AlipayEbppInvoiceExpensecontrolQuotaQueryResponse()
        alipay_result.parse_response_content(response)
        if not alipay_result.is_success():
            return result
        remote_quotas = getattr(alipay_result, "quota_detail_info_list", None)
        if not remote_quotas:
            return result

        # Merge the Alipay-side quotas into the page, de-duplicated by
        # quota_id.
        items = result.setdefault("items", [])
        local_count = len(items)
        seen_ids = {
            item.get("quota_id") for item in items if item.get("quota_id")
        }
        for q in remote_quotas:
            qid = getattr(q, "quota_id", None)
            if not qid or qid in seen_ids:
                continue
            items.append({
                "quota_id": qid,
                "target_type": getattr(q, "target_type", None),
                "target_id": getattr(q, "target_id", None),
                "quota_type": getattr(q, "quota_type", None),
                "employee_id": getattr(q, "owner_id", None),
                "total_amount": getattr(q, "total_amount", None),
                "available_amount": getattr(q, "available_amount", None),
                "status": getattr(q, "status", "QUOTA_ACTIVE"),
                "created_time": getattr(q, "effective_start_date", None),
            })
            seen_ids.add(qid)

        # Grow the pager's total by the number of appended quotas instead
        # of replacing it with the size of the current page, which would
        # hide any further local pages.
        # NOTE(review): assumes result["total"] is the overall local row
        # count reported by QuotaCRUD.page; confirm against the CRUD layer.
        base_total = result.get("total")
        if base_total is None:
            base_total = local_count
        result["total"] = base_total + (len(items) - local_count)
    except Exception as e:
        # Deliberate best-effort: an Alipay failure must not break the
        # local listing.
        log.warning(f"查询支付宝端额度失败(不影响本地数据): {e}")
    return result
|