| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979 |
- from datetime import datetime
- from decimal import Decimal
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.core.alipay import AlipayClient
- from app.core.exceptions import CustomException
- from app.core.logger import log
- from app.utils.snowflake import get_snowflake_id
- from .enums import QuotaStatusEnum
- from .schema import (
- AdjustQuotaSchema,
- ExpenseQuotaCreateSchema,
- ExpenseQuotaDeleteSchema,
- ExpenseQuotaModifySchema,
- ExpenseQuotaQueryOutSchema,
- ExpenseQuotaQuerySchema,
- IssueBatchCancelOutSchema,
- IssueBatchCancelSchema,
- IssueBatchCreateOutSchema,
- IssueBatchCreateSchema,
- IssueBatchListOutSchema,
- IssueBatchRecordsQueryOutSchema,
- IssueBatchRecordsQuerySchema,
- IssueQuotaCheckFailedItem,
- IssueRecordInfoItem,
- QuotaCreateSchema,
- QuotaDetailInfoSchema,
- QuotaListOutSchema,
- QuotaOperationOutSchema,
- QuotaOutSchema,
- QuotaUpdateSchema,
- )
- from .crud import IssueBatchCRUD, QuotaCRUD
- from .model import IssueBatchModel
- class QuotaService:
- """额度服务层"""
@classmethod
async def create_expense_quota_service(
    cls, auth: AuthSchema, data: ExpenseQuotaCreateSchema
) -> QuotaOperationOutSchema:
    """
    Create a balance/coupon quota.

    Calls: alipay.ebpp.invoice.expensecontrol.quota.create

    The request is sent to Alipay first; a record is written through
    ``QuotaCRUD.create`` only after Alipay reports success.

    Args:
        auth: Auth context handed to ``QuotaCRUD``.
        data: Creation payload. ``outer_source_id`` is used as the business
            number; a snowflake id is generated when it is empty.

    Returns:
        The business number and the ``quota_id`` reported by Alipay.

    Raises:
        CustomException: The Alipay SDK cannot be imported, the client
            returns an empty response, Alipay reports a failure, or the
            insert through ``QuotaCRUD`` yields nothing.
    """
    quota_crud = QuotaCRUD(auth)
    biz_no = data.outer_source_id or str(get_snowflake_id())

    # Imported lazily; a missing SDK is reported as a CustomException.
    try:
        from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaCreateModel import (
            AlipayEbppInvoiceExpensecontrolQuotaCreateModel,
        )
        from alipay.aop.api.domain.IssueQuotaTarget import (
            IssueQuotaTarget,
        )
        from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaCreateRequest import (
            AlipayEbppInvoiceExpensecontrolQuotaCreateRequest,
        )
        from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaCreateResponse import (
            AlipayEbppInvoiceExpensecontrolQuotaCreateResponse,
        )
    except ImportError:
        raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖")

    def build_target(entry):
        """Convert one payload target into the SDK's IssueQuotaTarget."""
        sdk_target = IssueQuotaTarget()
        sdk_target.owner_type = entry.owner_type
        sdk_target.owner_id = entry.owner_id
        sdk_target.quota = entry.quota
        if entry.amount is not None:
            sdk_target.amount = entry.amount
        return sdk_target

    time_format = "%Y-%m-%d %H:%M:%S"
    biz_model = AlipayEbppInvoiceExpensecontrolQuotaCreateModel()
    biz_model.target_type = data.target_type
    biz_model.target_id = data.target_id
    biz_model.enterprise_id = data.enterprise_id
    biz_model.outer_source_id = biz_no
    biz_model.quota_type = data.quota_type or "CAP"
    biz_model.share_mode = data.share_mode or "0"

    # Optional fields are only set on the model when the payload has them.
    for date_field in ("effective_start_date", "effective_end_date"):
        moment = getattr(data, date_field)
        if moment:
            setattr(biz_model, date_field, moment.strftime(time_format))
    for text_field in ("issue_name", "issue_desc"):
        text = getattr(data, text_field)
        if text:
            setattr(biz_model, text_field, text)
    if data.issue_quota_target_list:
        biz_model.issue_quota_target_list = [
            build_target(entry) for entry in data.issue_quota_target_list
        ]

    api_request = AlipayEbppInvoiceExpensecontrolQuotaCreateRequest()
    api_request.biz_model = biz_model
    raw_response = AlipayClient.get_client().execute(api_request)
    if not raw_response:
        raise CustomException(msg="创建余额/点券失败: 无响应")

    parsed = AlipayEbppInvoiceExpensecontrolQuotaCreateResponse()
    parsed.parse_response_content(raw_response)
    if not parsed.is_success():
        log.error(f"支付宝接口调用失败: {parsed.code} - {parsed.msg}")
        raise CustomException(msg=f"创建余额/点券失败: {parsed.msg}")

    record = data.model_dump(exclude_none=True)
    record["out_biz_no"] = biz_no
    record["status"] = QuotaStatusEnum.QUOTA_ACTIVE.value
    alipay_quota_id = parsed.quota_id
    if alipay_quota_id:
        record["quota_id"] = alipay_quota_id

    if not await quota_crud.create(record):
        raise CustomException(msg="创建额度记录失败")
    return QuotaOperationOutSchema(out_biz_no=biz_no, quota_id=alipay_quota_id)
@classmethod
async def create_quota_service(
    cls, auth: AuthSchema, data: QuotaCreateSchema
) -> QuotaOperationOutSchema:
    """
    Create a quota record through ``QuotaCRUD`` (no Alipay call is made here).

    A snowflake id is generated as the business number, the status is set to
    ``QUOTA_ACTIVE`` and ``available_amount`` defaults to ``total_amount``
    (or 0) when the payload does not provide it.

    Raises:
        CustomException: ``QuotaCRUD.create`` returned nothing.
    """
    quota_crud = QuotaCRUD(auth)
    biz_no = str(get_snowflake_id())

    record = {
        **data.model_dump(exclude_none=True),
        "out_biz_no": biz_no,
        "status": QuotaStatusEnum.QUOTA_ACTIVE.value,
    }
    if record.get("available_amount") is None:
        record["available_amount"] = record.get("total_amount", 0)

    created = await quota_crud.create(record)
    if not created:
        raise CustomException(msg="创建额度记录失败")
    return QuotaOperationOutSchema(out_biz_no=biz_no, quota_id=created.quota_id)
@classmethod
async def query_expense_quota_service(
    cls, auth: AuthSchema, data: ExpenseQuotaQuerySchema
) -> ExpenseQuotaQueryOutSchema:
    """
    Query balance/coupon quotas on Alipay.

    Calls: alipay.ebpp.invoice.expensecontrol.quota.query

    Args:
        auth: Auth context (not used by this method).
        data: Query filters and paging; ``owner_type``, ``page_size`` and
            ``page_num`` are always sent, the other filters only when set.

    Returns:
        Paging information; values missing from the response fall back to
        the requested page number/size and to 0 pages.

    Raises:
        CustomException: SDK missing, empty response, or Alipay failure.
    """
    try:
        from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaQueryModel import (
            AlipayEbppInvoiceExpensecontrolQuotaQueryModel,
        )
        from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaQueryRequest import (
            AlipayEbppInvoiceExpensecontrolQuotaQueryRequest,
        )
        from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaQueryResponse import (
            AlipayEbppInvoiceExpensecontrolQuotaQueryResponse,
        )
    except ImportError:
        raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖")

    query_model = AlipayEbppInvoiceExpensecontrolQuotaQueryModel()
    query_model.owner_type = data.owner_type
    query_model.page_size = data.page_size
    query_model.page_num = data.page_num

    optional_filters = (
        "target_type",
        "target_id",
        "owner_id",
        "owner_open_id",
        "enterprise_id",
        "quota_id_list",
        "quota_type",
    )
    for filter_name in optional_filters:
        filter_value = getattr(data, filter_name)
        if filter_value:
            setattr(query_model, filter_name, filter_value)

    api_request = AlipayEbppInvoiceExpensecontrolQuotaQueryRequest()
    api_request.biz_model = query_model
    raw_response = AlipayClient.get_client().execute(api_request)
    if not raw_response:
        raise CustomException(msg="查询余额/点券失败: 无响应")

    parsed = AlipayEbppInvoiceExpensecontrolQuotaQueryResponse()
    parsed.parse_response_content(raw_response)
    if not parsed.is_success():
        log.error(f"支付宝接口调用失败: {parsed.code} - {parsed.msg}")
        raise CustomException(msg=f"查询余额/点券失败: {parsed.msg}")

    # NOTE(review): only paging fields are returned; the quota detail list
    # of the response is not forwarded. Confirm whether that is intended.
    paging = {
        "page_num": parsed.page_num or data.page_num,
        "page_size": parsed.page_size or data.page_size,
        "total_page_count": parsed.total_page_count or 0,
    }
    return ExpenseQuotaQueryOutSchema(**paging)
@classmethod
async def modify_expense_quota_service(
    cls, auth: AuthSchema, out_biz_no: str, data: ExpenseQuotaModifySchema
) -> QuotaOperationOutSchema:
    """
    Modify a balance/coupon quota on Alipay.

    Calls: alipay.ebpp.invoice.expensecontrol.quota.modify

    Args:
        auth: Auth context handed to ``QuotaCRUD``.
        out_biz_no: Business number; a record with this number must exist.
        data: Modification payload (quota id, action, amount, share mode...).

    Returns:
        The business number, the quota id from the payload and the
        ``success`` value of the Alipay response.

    Raises:
        CustomException: Record not found, SDK missing, empty response, or
            Alipay failure.
    """
    quota_crud = QuotaCRUD(auth)
    if not await quota_crud.get_by_out_biz_no(out_biz_no):
        raise CustomException(msg="额度不存在")

    try:
        from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaModifyModel import (
            AlipayEbppInvoiceExpensecontrolQuotaModifyModel,
        )
        from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaModifyRequest import (
            AlipayEbppInvoiceExpensecontrolQuotaModifyRequest,
        )
        from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaModifyResponse import (
            AlipayEbppInvoiceExpensecontrolQuotaModifyResponse,
        )
    except ImportError:
        raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖")

    modify_model = AlipayEbppInvoiceExpensecontrolQuotaModifyModel()
    for required_field in ("quota_id", "action", "outer_source_id", "enterprise_id"):
        setattr(modify_model, required_field, getattr(data, required_field))
    # The amount is forwarded as a string; 0 is a valid value, so test for None.
    if data.amount is not None:
        modify_model.amount = str(data.amount)
    if data.share_mode:
        modify_model.share_mode = data.share_mode

    api_request = AlipayEbppInvoiceExpensecontrolQuotaModifyRequest()
    api_request.biz_model = modify_model
    raw_response = AlipayClient.get_client().execute(api_request)
    if not raw_response:
        raise CustomException(msg="修改余额/点券失败: 无响应")

    parsed = AlipayEbppInvoiceExpensecontrolQuotaModifyResponse()
    parsed.parse_response_content(raw_response)
    if not parsed.is_success():
        log.error(f"支付宝接口调用失败: {parsed.code} - {parsed.msg}")
        raise CustomException(msg=f"修改余额/点券失败: {parsed.msg}")

    return QuotaOperationOutSchema(
        out_biz_no=out_biz_no,
        quota_id=data.quota_id,
        result=parsed.success,
    )
@classmethod
async def delete_expense_quota_service(
    cls, auth: AuthSchema, out_biz_no: str, data: ExpenseQuotaDeleteSchema
) -> QuotaOperationOutSchema:
    """
    Delete a quota on Alipay, then delete the matching record via ``QuotaCRUD``.

    Calls: alipay.ebpp.invoice.expensecontrol.quota.delete

    Args:
        auth: Auth context handed to ``QuotaCRUD``.
        out_biz_no: Business number of the record to delete.
        data: Carries ``enterprise_id`` plus an optional ``quota_id`` and/or
            ``issue_batch_id``.

    Returns:
        A result carrying only the business number.

    Raises:
        CustomException: Record not found, SDK missing, empty response, or
            Alipay failure.
    """
    quota_crud = QuotaCRUD(auth)
    existing = await quota_crud.get_by_out_biz_no(out_biz_no)
    if not existing:
        raise CustomException(msg="额度不存在")

    try:
        from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaDeleteModel import (
            AlipayEbppInvoiceExpensecontrolQuotaDeleteModel,
        )
        from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaDeleteRequest import (
            AlipayEbppInvoiceExpensecontrolQuotaDeleteRequest,
        )
        from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaDeleteResponse import (
            AlipayEbppInvoiceExpensecontrolQuotaDeleteResponse,
        )
    except ImportError:
        raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖")

    delete_model = AlipayEbppInvoiceExpensecontrolQuotaDeleteModel()
    delete_model.enterprise_id = data.enterprise_id
    for optional_field in ("quota_id", "issue_batch_id"):
        optional_value = getattr(data, optional_field)
        if optional_value:
            setattr(delete_model, optional_field, optional_value)

    api_request = AlipayEbppInvoiceExpensecontrolQuotaDeleteRequest()
    api_request.biz_model = delete_model
    raw_response = AlipayClient.get_client().execute(api_request)
    if not raw_response:
        raise CustomException(msg="删除额度失败: 无响应")

    parsed = AlipayEbppInvoiceExpensecontrolQuotaDeleteResponse()
    parsed.parse_response_content(raw_response)
    if not parsed.is_success():
        log.error(f"支付宝接口调用失败: {parsed.code} - {parsed.msg}")
        raise CustomException(msg=f"删除额度失败: {parsed.msg}")

    # The record is removed only after Alipay confirmed the deletion.
    await quota_crud.delete(id=existing.id)
    return QuotaOperationOutSchema(out_biz_no=out_biz_no)
- # ========================
- # 手工批量发放额度
- # ========================
@classmethod
async def issue_batch_create_service(
    cls, auth: AuthSchema, data: IssueBatchCreateSchema
) -> IssueBatchCreateOutSchema:
    """
    Manually issue quotas to a batch of owners.

    Calls: alipay.ebpp.invoice.expensecontrol.issuebatch.create

    Steps:
      1. Reject the request when ``data.batch_no`` is already stored.
      2. Send the batch to Alipay.
      3. Best effort: store the batch record, insert one quota record per
         owner that passed Alipay's check, then query the issue records
         (alipay.ebpp.invoice.issuebatch.issuerecords.batchquery) to replace
         the placeholder ``quota_id`` with the one Alipay assigned.

    Any failure in step 3 is logged as a warning and does not fail the
    request.

    Args:
        auth: Auth context; ``auth.db`` is used for direct statements and
            ``auth.user.tenant_id`` for the tenant (1 when no user is set).
        data: Batch definition and the list of issue targets.

    Returns:
        The Alipay ``issue_batch_id`` and the list of targets that failed
        Alipay's check (``None`` when there are none).

    Raises:
        CustomException: SDK missing, duplicate batch number, empty
            response, or Alipay failure.
    """
    from decimal import InvalidOperation

    try:
        from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolIssuebatchCreateRequest import (
            AlipayEbppInvoiceExpensecontrolIssuebatchCreateRequest,
        )
        from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolIssuebatchCreateModel import (
            AlipayEbppInvoiceExpensecontrolIssuebatchCreateModel,
        )
        from alipay.aop.api.domain.IssueTargetInfoContent import (
            IssueTargetInfoContent,
        )
        from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolIssuebatchCreateResponse import (
            AlipayEbppInvoiceExpensecontrolIssuebatchCreateResponse,
        )
    except ImportError:
        raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖")

    time_format = "%Y-%m-%d %H:%M:%S"
    targets = data.issue_target_info_list or []

    def parse_time(value):
        """Parse a 'YYYY-mm-dd HH:MM:SS' string; empty values give None."""
        return datetime.strptime(value, time_format) if value else None

    def to_decimal(value):
        """Convert an issue amount to Decimal; unparsable values count as 0."""
        try:
            return Decimal(value)
        except (InvalidOperation, TypeError, ValueError):
            return Decimal("0")

    # Check the batch number locally first to avoid a pointless Alipay call.
    try:
        issue_batch_crud = IssueBatchCRUD(auth)
        existing_batch = await issue_batch_crud.get_by_batch_no(data.batch_no)
        if existing_batch:
            raise CustomException(msg=f"批次号 {data.batch_no} 已存在,请勿重复创建")
    except CustomException:
        raise
    except Exception as e:
        # The pre-check is only an optimisation: keep going, but leave a trace.
        log.warning(f"检查批次号是否已存在失败(不影响发放): {e}")

    model = AlipayEbppInvoiceExpensecontrolIssuebatchCreateModel()
    model.enterprise_id = data.enterprise_id
    model.issue_name = data.issue_name
    model.quota_type = data.quota_type
    model.effective_start_date = data.effective_start_date
    model.effective_end_date = data.effective_end_date
    model.institution_id = data.institution_id
    model.batch_no = data.batch_no
    model.share_mode = data.share_mode
    if data.issue_desc:
        model.issue_desc = data.issue_desc
    if targets:
        target_list = []
        for item in targets:
            target = IssueTargetInfoContent()
            target.issue_quota = item.issue_quota
            for optional_field in ("owner_open_id", "owner_id", "user_name", "owner_type"):
                optional_value = getattr(item, optional_field)
                if optional_value:
                    setattr(target, optional_field, optional_value)
            target_list.append(target)
        model.issue_target_info_list = target_list

    request = AlipayEbppInvoiceExpensecontrolIssuebatchCreateRequest()
    request.biz_model = model
    client = AlipayClient.get_client()
    response = client.execute(request)
    if not response:
        raise CustomException(msg="手工批量发放额度失败: 无响应")
    result = AlipayEbppInvoiceExpensecontrolIssuebatchCreateResponse()
    result.parse_response_content(response)
    if not result.is_success():
        sub_msg = getattr(result, 'sub_msg', '') or ''
        err_detail = f"{result.msg}" + (f" - {sub_msg}" if sub_msg else "")
        log.error(f"支付宝接口调用失败: {result.code} - {err_detail}")
        raise CustomException(msg=f"手工批量发放额度失败: {sub_msg or result.msg}")

    # From here on Alipay has accepted the batch: everything below is
    # best-effort bookkeeping and must not raise.

    # Store the batch record.
    try:
        issue_batch_crud = IssueBatchCRUD(auth)
        total_amount = sum((to_decimal(item.issue_quota) for item in targets), Decimal("0"))
        batch_data = {
            "issue_batch_id": result.issue_batch_id,
            "batch_no": data.batch_no,
            "institution_id": data.institution_id,
            "issue_name": data.issue_name,
            "quota_type": data.quota_type,
            "share_mode": data.share_mode,
            "total_count": len(targets),
            "total_amount": total_amount,
            "status": "ACTIVE",
            "effective_start_date": parse_time(data.effective_start_date),
            "effective_end_date": parse_time(data.effective_end_date),
            "issue_desc": data.issue_desc,
        }
        await issue_batch_crud.create(batch_data)
    except Exception as e:
        import traceback
        log.warning(f"保存发放批次记录失败(不影响发放), 可检查: 1)batch_no是否重复 2)字段长度\n异常: {e}\n{traceback.format_exc()}")

    # Targets rejected by Alipay; their owner ids are skipped when inserting.
    failed_items = getattr(result, 'issue_quota_check_failed_list', None) or []
    failed_owner_ids: set[str] = {
        str(getattr(f, 'owner_id', ''))
        for f in failed_items
        if getattr(f, 'owner_id', None)
    }

    # Insert a fresh quota record per owner (every issue gets its own record,
    # so the same owner can receive several issues).
    try:
        from app.plugin.module_payment.expense.quota.model import QuotaModel
        from app.plugin.module_payment.expense.quota.enums import QuotaStatusEnum
        # sa_update is required by the quota_id back-fill below; without it
        # that step failed with a NameError that was swallowed as a warning.
        from sqlalchemy import delete as sa_delete, insert, update as sa_update

        # Remove stale records of this institution that never got a quota_id
        # (left over from earlier failed/unfinished batches).
        try:
            clean_stmt = sa_delete(QuotaModel).where(
                QuotaModel.institution_id == data.institution_id,
                QuotaModel.quota_id.is_(None),
            )
            del_result = await auth.db.execute(clean_stmt)
            if del_result.rowcount:
                log.info(f"手工发放 - 清理 {del_result.rowcount} 条陈旧记录(institution_id={data.institution_id})")
            await auth.db.flush()
        except Exception as e:
            log.warning(f"手工发放 - 清理陈旧记录失败(不影响发放): {e}")

        tenant_id = auth.user.tenant_id if auth.user else 1
        effective_start = parse_time(data.effective_start_date)
        effective_end = parse_time(data.effective_end_date)
        inserted_count = 0
        for item in targets:
            emp_id = (item.owner_id or "")
            # failed_owner_ids holds strings, so compare as string.
            if str(emp_id) in failed_owner_ids:
                log.warning(f"手工发放 - 跳过校验失败员工: owner_id={emp_id}")
                continue
            quota_amount = to_decimal(item.issue_quota)
            out_biz_no = f"batch_{data.batch_no}_{emp_id}"
            stmt = insert(QuotaModel).values(
                employee_id=emp_id,
                institution_id=data.institution_id,
                quota_type=data.quota_type,
                target_type="INSTITUTION",
                target_id=data.institution_id,
                out_biz_no=out_biz_no,
                # Placeholder; replaced below with the id assigned by Alipay.
                quota_id=out_biz_no,
                total_amount=quota_amount,
                available_amount=quota_amount,
                status=QuotaStatusEnum.QUOTA_ACTIVE.value,
                valid_from=effective_start,
                valid_to=effective_end,
                enterprise_id=data.enterprise_id,
                tenant_id=tenant_id,
            )
            await auth.db.execute(stmt)
            inserted_count += 1
        await auth.db.flush()
        log.info(f"手工发放 - 新增 {inserted_count} 条额度记录(跳过 {len(failed_owner_ids)} 条校验失败)")

        # Ask Alipay for the issue records of this batch to learn each
        # owner's real quota_id. Nothing to back-fill when nothing was inserted.
        if inserted_count:
            try:
                from alipay.aop.api.request.AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryRequest import (
                    AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryRequest,
                )
                from alipay.aop.api.domain.AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryModel import (
                    AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryModel,
                )
                from alipay.aop.api.response.AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryResponse import (
                    AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryResponse,
                )
                records_model = AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryModel()
                records_model.enterprise_id = data.enterprise_id
                records_model.institution_id = data.institution_id
                records_model.issue_batch_id = result.issue_batch_id
                # NOTE(review): a single page sized to the whole batch is
                # requested; confirm Alipay's page-size limit for large batches.
                records_model.page_size = len(targets)
                records_model.page_num = 1
                records_request = AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryRequest()
                records_request.biz_model = records_model
                records_response = client.execute(records_request)
                if records_response:
                    records_result = AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryResponse()
                    records_result.parse_response_content(records_response)
                    issue_records = getattr(records_result, 'issue_record_info_list', None)
                    if records_result.is_success() and issue_records:
                        updated_quota_id_count = 0
                        for r in issue_records:
                            quota_id = getattr(r, 'quota_id', None)
                            owner_id = getattr(r, 'owner_id', None)
                            if not (quota_id and owner_id):
                                continue
                            # Match on this batch's out_biz_no so only the
                            # records inserted above are touched.
                            q_upd = sa_update(QuotaModel).where(
                                QuotaModel.out_biz_no == f"batch_{data.batch_no}_{owner_id}",
                                QuotaModel.institution_id == data.institution_id,
                            ).values(quota_id=quota_id)
                            await auth.db.execute(q_upd)
                            updated_quota_id_count += 1
                        if updated_quota_id_count:
                            await auth.db.flush()
                            log.info(f"手工发放 - 已从支付宝查询并更新 {updated_quota_id_count} 条记录的 quota_id")
            except Exception as e:
                log.warning(f"查询支付宝发放记录获取 quota_id 失败(不影响发放): {e}")
    except Exception as e:
        import traceback
        log.warning(f"保存额度记录到本地失败(不影响发放), 可检查: 1)tenant_id/enterprise_id 2)字段长度\n异常: {e}\n{traceback.format_exc()}")

    # Report the targets rejected by Alipay back to the caller.
    failed_list = [
        IssueQuotaCheckFailedItem(
            user_name=getattr(f, 'user_name', None),
            owner_type=getattr(f, 'owner_type', None),
            owner_id=getattr(f, 'owner_id', None),
            owner_open_id=getattr(f, 'owner_open_id', None),
            issue_quota=getattr(f, 'issue_quota', None),
            message=getattr(f, 'message', None),
            result=getattr(f, 'result', None),
        )
        for f in failed_items
    ] or None
    return IssueBatchCreateOutSchema(
        issue_batch_id=result.issue_batch_id,
        issue_quota_check_failed_list=failed_list,
    )
@classmethod
async def issue_batch_cancel_service(
    cls, auth: AuthSchema, data: IssueBatchCancelSchema
) -> IssueBatchCancelOutSchema:
    """
    Cancel a manually issued batch.

    Calls: alipay.ebpp.invoice.expensecontrol.issuebatch.cancel

    After Alipay confirms the cancellation, the stored batch is marked
    ``CANCELLED`` and the quota records created for that batch (business
    numbers starting with ``batch_{batch_no}_``) are deleted. Both follow-up
    steps are best effort: failures are logged and do not fail the request.

    Args:
        auth: Auth context; ``auth.db`` is used for the delete statement.
        data: Enterprise, institution and Alipay batch id.

    Returns:
        The ``result`` flag of the Alipay response (``False`` when absent).

    Raises:
        CustomException: SDK missing, empty response, or Alipay failure.
    """
    try:
        from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolIssuebatchCancelRequest import (
            AlipayEbppInvoiceExpensecontrolIssuebatchCancelRequest,
        )
        from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolIssuebatchCancelModel import (
            AlipayEbppInvoiceExpensecontrolIssuebatchCancelModel,
        )
        from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolIssuebatchCancelResponse import (
            AlipayEbppInvoiceExpensecontrolIssuebatchCancelResponse,
        )
    except ImportError:
        raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖")

    model = AlipayEbppInvoiceExpensecontrolIssuebatchCancelModel()
    model.enterprise_id = data.enterprise_id
    model.institution_id = data.institution_id
    model.issue_batch_id = data.issue_batch_id
    request = AlipayEbppInvoiceExpensecontrolIssuebatchCancelRequest()
    request.biz_model = model
    client = AlipayClient.get_client()
    response = client.execute(request)
    if not response:
        raise CustomException(msg="作废手工发放批次失败: 无响应")
    result = AlipayEbppInvoiceExpensecontrolIssuebatchCancelResponse()
    result.parse_response_content(response)
    if not result.is_success():
        sub_msg = getattr(result, 'sub_msg', '') or ''
        err_detail = f"{result.msg}" + (f" - {sub_msg}" if sub_msg else "")
        log.error(f"支付宝接口调用失败: {result.code} - {err_detail}")
        raise CustomException(msg=f"作废手工发放批次失败: {sub_msg or result.msg}")

    # Mark the stored batch as cancelled. batch_no is read before the
    # update so the delete step below does not need a second lookup.
    batch_no = None
    try:
        issue_batch_crud = IssueBatchCRUD(auth)
        batch = await issue_batch_crud.get_by_issue_batch_id(data.issue_batch_id)
        if batch:
            batch_no = batch.batch_no
            batch.status = 'CANCELLED'
            await issue_batch_crud.update(batch.id, {"status": "CANCELLED"})
    except Exception as e:
        log.warning(f"更新批次本地状态失败(不影响作废): {e}")

    # Delete the quota records that were created for this batch.
    try:
        from app.plugin.module_payment.expense.quota.model import QuotaModel
        from sqlalchemy import delete as sa_delete

        if batch_no is None:
            # The lookup above failed or found nothing; try once more.
            issue_batch_crud = IssueBatchCRUD(auth)
            batch = await issue_batch_crud.get_by_issue_batch_id(data.issue_batch_id)
            batch_no = batch.batch_no if batch else None
        if batch_no:
            # autoescape makes '_' and '%' in the prefix literal; a plain
            # LIKE treats '_' as a wildcard and could hit other batches.
            del_stmt = sa_delete(QuotaModel).where(
                QuotaModel.out_biz_no.startswith(f"batch_{batch_no}_", autoescape=True),
                QuotaModel.institution_id == data.institution_id,
            )
            await auth.db.execute(del_stmt)
            await auth.db.flush()
            log.info(f"批次作废 - 已删除该批次创建的额度记录: batch_no={batch_no}")
    except Exception as e:
        log.warning(f"删除额度记录失败(不影响作废): {e}")

    return IssueBatchCancelOutSchema(
        result=getattr(result, 'result', False),
    )
@classmethod
async def issue_batch_records_query_service(
    cls, auth: AuthSchema, data: IssueBatchRecordsQuerySchema
) -> IssueBatchRecordsQueryOutSchema:
    """
    Query the issue records of a manually issued batch.

    Calls: alipay.ebpp.invoice.issuebatch.issuerecords.batchquery

    Args:
        auth: Auth context (not used by this method).
        data: Enterprise, institution, batch id and paging.

    Returns:
        Paging information plus the issue records (``None`` when the
        response carries none). Paging values missing from the response
        fall back to the requested page number/size and to 0 pages.

    Raises:
        CustomException: SDK missing, empty response, or Alipay failure.
    """
    try:
        from alipay.aop.api.request.AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryRequest import (
            AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryRequest,
        )
        from alipay.aop.api.domain.AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryModel import (
            AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryModel,
        )
        from alipay.aop.api.response.AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryResponse import (
            AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryResponse,
        )
    except ImportError:
        raise CustomException(msg="支付宝SDK未正确安装,请检查alipay-sdk-python依赖")

    model = AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryModel()
    model.enterprise_id = data.enterprise_id
    model.institution_id = data.institution_id
    model.issue_batch_id = data.issue_batch_id
    model.page_size = data.page_size
    model.page_num = data.page_num
    request = AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryRequest()
    request.biz_model = model
    client = AlipayClient.get_client()
    response = client.execute(request)
    if not response:
        raise CustomException(msg="查询手工发放明细失败: 无响应")
    result = AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryResponse()
    result.parse_response_content(response)
    if not result.is_success():
        log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
        raise CustomException(msg=f"查询手工发放明细失败: {result.msg}")

    record_fields = (
        "quota_id",
        "issue_quota",
        "issue_status",
        "owner_type",
        "owner_id",
        "owner_open_id",
        "user_name",
        "currency",
    )
    raw_records = getattr(result, 'issue_record_info_list', None) or []
    record_list = [
        IssueRecordInfoItem(**{name: getattr(r, name, None) for name in record_fields})
        for r in raw_records
    ] or None

    # getattr's default only applies when the attribute is missing; an
    # attribute that exists but is None needs the explicit `or` fallback
    # (same convention as query_expense_quota_service).
    return IssueBatchRecordsQueryOutSchema(
        page_num=getattr(result, 'page_num', None) or data.page_num,
        page_size=getattr(result, 'page_size', None) or data.page_size,
        total_page_count=getattr(result, 'total_page_count', None) or 0,
        issue_record_info_list=record_list,
    )
@classmethod
async def list_batch_service(
    cls,
    auth: AuthSchema,
    page_no: int = 1,
    page_size: int = 20,
    search: dict | None = None,
) -> dict:
    """
    Page through the manually issued batches via ``IssueBatchCRUD.page``.

    Args:
        auth: Auth context handed to ``IssueBatchCRUD``.
        page_no: 1-based page number.
        page_size: Rows per page.
        search: Optional filter dict; an empty dict is used when omitted.

    Returns:
        Whatever ``IssueBatchCRUD.page`` returns, newest id first, rendered
        with ``IssueBatchListOutSchema``.
    """
    batch_crud = IssueBatchCRUD(auth)
    filters = search or {}
    rows_to_skip = (page_no - 1) * page_size
    page = await batch_crud.page(
        offset=rows_to_skip,
        limit=page_size,
        order_by=[{"id": "desc"}],
        search=filters,
        out_schema=IssueBatchListOutSchema,
    )
    return page
@classmethod
async def list_employee_quota_records_service(
    cls,
    auth: AuthSchema,
    employee_id: str,
    institution_id: str | None = None,
) -> list[dict]:
    """
    List the quota records of one employee, newest first.

    Args:
        auth: Auth context; the query runs on ``auth.db``.
        employee_id: Employee whose records are listed.
        institution_id: Optional extra filter on the institution.

    Returns:
        One dict per record; amounts are converted to ``float`` and empty
        or zero amounts are reported as 0.
    """
    from app.plugin.module_payment.expense.quota.model import QuotaModel
    from sqlalchemy import select

    def as_number(amount):
        """Render a stored amount as float, treating falsy values as 0."""
        return float(amount) if amount else 0

    conditions = [QuotaModel.employee_id == employee_id]
    if institution_id:
        conditions.append(QuotaModel.institution_id == institution_id)
    query = (
        select(QuotaModel)
        .where(*conditions)
        .order_by(QuotaModel.created_time.desc())
    )
    rows = (await auth.db.execute(query)).scalars().all()

    records = []
    for row in rows:
        records.append(
            {
                "quota_id": row.quota_id,
                "out_biz_no": row.out_biz_no,
                "total_amount": as_number(row.total_amount),
                "available_amount": as_number(row.available_amount),
                "quota_type": row.quota_type,
                "status": row.status,
                "valid_from": row.valid_from,
                "valid_to": row.valid_to,
                "created_time": row.created_time,
                "institution_id": row.institution_id,
            }
        )
    return records
@classmethod
async def adjust_quota_service(
    cls, auth: AuthSchema, data: AdjustQuotaSchema
) -> dict:
    """
    Set a quota's available amount to ``data.amount``.

    Calls alipay.ebpp.invoice.expensecontrol.quota.modify with the
    difference between the requested and the stored available amount
    (``ADD`` when the difference is >= 0, otherwise ``DEDUCT``), then
    updates the stored record and writes a change-log entry.

    Args:
        auth: Auth context; statements run on ``auth.db``.
        data: Quota id, enterprise id, target amount and optional
            description.

    Returns:
        Dict with ``quota_id``, ``before_amount``, ``after_amount`` and
        ``diff``.

    Raises:
        CustomException: Record not found, SDK missing, empty response, or
            Alipay failure. When the failure indicates that the quota does
            not exist on Alipay, the stored record is deleted first.
    """
    from .crud import QuotaChangeLogCRUD
    from .model import QuotaModel
    from sqlalchemy import select, update as sa_update

    # Load the current quota record.
    stmt = select(QuotaModel).where(QuotaModel.quota_id == data.quota_id)
    result = await auth.db.execute(stmt)
    quota = result.scalar_one_or_none()
    if not quota:
        raise CustomException(msg="额度记录不存在")

    current_available = float(quota.available_amount) if quota.available_amount else 0
    diff = round(data.amount - current_available, 2)
    outer_source_id = str(get_snowflake_id())

    # Call Alipay quota.modify.
    try:
        from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaModifyRequest import (
            AlipayEbppInvoiceExpensecontrolQuotaModifyRequest,
        )
        from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaModifyModel import (
            AlipayEbppInvoiceExpensecontrolQuotaModifyModel,
        )
        from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaModifyResponse import (
            AlipayEbppInvoiceExpensecontrolQuotaModifyResponse,
        )
    except ImportError:
        raise CustomException(msg="支付宝SDK未正确安装")

    model = AlipayEbppInvoiceExpensecontrolQuotaModifyModel()
    model.quota_id = data.quota_id
    model.action = "ADD" if diff >= 0 else "DEDUCT"
    model.outer_source_id = outer_source_id
    model.enterprise_id = data.enterprise_id
    # The amount is sent in hundredths. round() instead of int(): binary
    # floats make 0.29 * 100 == 28.999999999999996, which int() would
    # truncate to 28 and leave Alipay one unit short of the stored value.
    # NOTE(review): modify_expense_quota_service forwards its amount
    # unscaled; confirm which unit the Alipay API expects.
    model.amount = str(round(abs(diff) * 100))
    request = AlipayEbppInvoiceExpensecontrolQuotaModifyRequest()
    request.biz_model = model
    client = AlipayClient.get_client()
    response = client.execute(request)
    if not response:
        raise CustomException(msg="调整额度失败: 无响应")
    mod_result = AlipayEbppInvoiceExpensecontrolQuotaModifyResponse()
    mod_result.parse_response_content(response)
    if not mod_result.is_success():
        sub_msg = getattr(mod_result, 'sub_msg', '') or ''
        sub_code = getattr(mod_result, 'sub_code', '') or ''
        log.error(f"支付宝接口调用失败: {mod_result.code} - {mod_result.msg} (sub_code={sub_code}, sub_msg={sub_msg})")
        # If Alipay says the quota does not exist, drop the stale record.
        if '不存在' in sub_msg or 'INVALID' in sub_code:
            try:
                from sqlalchemy import delete as sa_delete
                del_stmt = sa_delete(QuotaModel).where(QuotaModel.quota_id == data.quota_id)
                await auth.db.execute(del_stmt)
                await auth.db.flush()
                log.info(f"调整额度 - 支付宝侧额度不存在,已清理本地陈旧记录: quota_id={data.quota_id}")
            except Exception as e:
                log.warning(f"清理本地陈旧记录失败: {e}")
        raise CustomException(msg=f"调整额度失败: {sub_msg or mod_result.msg}")

    # Update the stored record: the total moves by the same difference
    # but is never allowed to drop below 0.
    new_available = data.amount
    new_total = float(quota.total_amount) if quota.total_amount else 0
    if diff > 0:
        new_total += diff
    else:
        new_total = max(0, new_total + diff)
    upd = sa_update(QuotaModel).where(
        QuotaModel.quota_id == data.quota_id
    ).values(
        total_amount=new_total,
        available_amount=new_available,
    )
    await auth.db.execute(upd)
    await auth.db.flush()

    # Write the change log (best effort: a failure is only logged).
    try:
        log_crud = QuotaChangeLogCRUD(auth)
        await log_crud.create({
            "quota_id": data.quota_id,
            "employee_id": quota.employee_id or "",
            "institution_id": quota.institution_id or "",
            "change_type": "ADJUST",
            "coupon_name": quota.out_biz_no or "额度调整",
            "change_amount": diff,
            "before_amount": current_available,
            "after_amount": new_available,
            "change_desc": data.change_desc or "",
            "enterprise_id": data.enterprise_id,
            "tenant_id": auth.user.tenant_id if auth.user else 1,
        })
    except Exception as e:
        log.warning(f"记录变更日志失败(不影响调整): {e}")

    return {
        "quota_id": data.quota_id,
        "before_amount": current_available,
        "after_amount": new_available,
        "diff": diff,
    }
@classmethod
async def list_quota_changes_service(
    cls, auth: AuthSchema, quota_id: str
) -> list[dict]:
    """
    List the change-log entries of one quota, newest id first.

    Args:
        auth: Auth context handed to ``QuotaChangeLogCRUD``.
        quota_id: Quota whose changes are listed.

    Returns:
        One dict per entry; ``change_amount`` is converted to ``float`` and
        reported as 0 when empty. An empty list when nothing is found.
    """
    from .crud import QuotaChangeLogCRUD

    change_crud = QuotaChangeLogCRUD(auth)
    entries = await change_crud.list(
        search={"quota_id": quota_id},
        order_by=[{"id": "desc"}],
    )

    changes = []
    for entry in entries or []:
        amount = float(entry.change_amount) if entry.change_amount else 0
        changes.append(
            {
                "coupon_name": entry.coupon_name,
                "change_time": entry.created_time,
                "change_amount": amount,
                "change_desc": entry.change_desc,
                "change_type": entry.change_type,
            }
        )
    return changes
@classmethod
async def list_service(
    cls,
    auth: AuthSchema,
    page_no: int = 1,
    page_size: int = 20,
    search: dict | None = None,
) -> dict:
    """
    Page through the stored quota records, optionally merged with Alipay.

    When ``search`` contains ``institution_id``, the quotas of that
    institution are also queried from Alipay
    (alipay.ebpp.invoice.expensecontrol.quota.query, first page of 100) and
    every quota whose ``quota_id`` is not already on the page is appended.
    A failure of the Alipay lookup is logged and the stored page is
    returned unchanged.

    Args:
        auth: Auth context handed to ``QuotaCRUD``.
        page_no: 1-based page number.
        page_size: Rows per page.
        search: Optional filter dict.

    Returns:
        The page dict from ``QuotaCRUD.page``; ``total`` is increased by
        the number of appended Alipay quotas.
    """
    crud = QuotaCRUD(auth)
    offset = (page_no - 1) * page_size
    result = await crud.page(
        offset=offset,
        limit=page_size,
        order_by=[{"id": "desc"}],
        search=search or {},
        out_schema=QuotaListOutSchema,
    )

    institution_id = search.get("institution_id") if search else None
    if not institution_id:
        return result

    # An institution was given: also fetch the quotas issued on Alipay's side.
    try:
        from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaQueryRequest import (
            AlipayEbppInvoiceExpensecontrolQuotaQueryRequest,
        )
        from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaQueryModel import (
            AlipayEbppInvoiceExpensecontrolQuotaQueryModel,
        )
        from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaQueryResponse import (
            AlipayEbppInvoiceExpensecontrolQuotaQueryResponse,
        )
        alipay_model = AlipayEbppInvoiceExpensecontrolQuotaQueryModel()
        # owner_type is mandatory; a generic type is used to list every
        # quota under the institution.
        alipay_model.owner_type = "ENTERPRISE_PAY_UID"
        alipay_model.target_type = "INSTITUTION"
        alipay_model.target_id = institution_id
        alipay_model.page_size = 100
        alipay_model.page_num = 1
        request = AlipayEbppInvoiceExpensecontrolQuotaQueryRequest()
        request.biz_model = alipay_model
        client = AlipayClient.get_client()
        response = client.execute(request)
        if response:
            alipay_result = AlipayEbppInvoiceExpensecontrolQuotaQueryResponse()
            alipay_result.parse_response_content(response)
            remote_quotas = getattr(alipay_result, 'quota_detail_info_list', None)
            if alipay_result.is_success() and remote_quotas:
                items = result.setdefault("items", [])
                # Keep the total reported by the paginated query; fall back
                # to the page length only when no total was reported.
                base_total = result.get("total")
                if base_total is None:
                    base_total = len(items)
                existing_ids = {item.get("quota_id") for item in items if item.get("quota_id")}
                appended = 0
                for q in remote_quotas:
                    qid = getattr(q, 'quota_id', None)
                    if not qid or qid in existing_ids:
                        continue
                    items.append({
                        "quota_id": qid,
                        "target_type": getattr(q, 'target_type', None),
                        "target_id": getattr(q, 'target_id', None),
                        "quota_type": getattr(q, 'quota_type', None),
                        "employee_id": getattr(q, 'owner_id', None),
                        "total_amount": getattr(q, 'total_amount', None),
                        "available_amount": getattr(q, 'available_amount', None),
                        "status": getattr(q, 'status', "QUOTA_ACTIVE"),
                        "created_time": getattr(q, 'effective_start_date', None),
                    })
                    existing_ids.add(qid)
                    appended += 1
                # Overwriting total with len(items) would replace the
                # overall row count with the size of this single page.
                result["total"] = base_total + appended
    except Exception as e:
        log.warning(f"查询支付宝端额度失败(不影响本地数据): {e}")
    return result
|