| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599 |
from datetime import datetime
from decimal import Decimal, InvalidOperation

from app.api.v1.module_system.auth.schema import AuthSchema
from app.core.alipay import AlipayClient
from app.core.exceptions import CustomException
from app.core.logger import log
from app.utils.snowflake import get_snowflake_id

from .enums import QuotaStatusEnum
from .schema import (
    ExpenseQuotaCreateSchema,
    ExpenseQuotaDeleteSchema,
    ExpenseQuotaModifySchema,
    ExpenseQuotaQueryOutSchema,
    ExpenseQuotaQuerySchema,
    IssueBatchCancelOutSchema,
    IssueBatchCancelSchema,
    IssueBatchCreateOutSchema,
    IssueBatchCreateSchema,
    IssueBatchListOutSchema,
    IssueBatchRecordsQueryOutSchema,
    IssueBatchRecordsQuerySchema,
    IssueQuotaCheckFailedItem,
    IssueRecordInfoItem,
    QuotaCreateSchema,
    QuotaDetailInfoSchema,
    QuotaListOutSchema,
    QuotaOperationOutSchema,
    QuotaOutSchema,
    QuotaUpdateSchema,
)
from .crud import IssueBatchCRUD, QuotaCRUD
from .model import IssueBatchModel
class QuotaService:
    """Service layer for expense quotas and manual issue batches.

    Most methods wrap a single Alipay OpenAPI call made through the Alipay
    SDK and, where the flow requires it, mirror the outcome into the local
    database through ``QuotaCRUD`` / ``IssueBatchCRUD``.
    """

    # Layout used whenever a date-time crosses the Alipay boundary as text.
    _DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
    # Message raised when the lazily imported Alipay SDK cannot be imported.
    _SDK_MISSING_MSG = "支付宝SDK未正确安装,请检查alipay-sdk-python依赖"

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _copy_truthy(source, target, fields) -> None:
        """Copy each attribute in ``fields`` from ``source`` to ``target`` when truthy."""
        for field in fields:
            value = getattr(source, field)
            if value:
                setattr(target, field, value)

    @staticmethod
    def _execute_request(request, response_cls, action: str):
        """Send ``request`` to Alipay and return the parsed, successful response.

        Args:
            request: A populated Alipay SDK request object.
            response_cls: SDK response class used to parse the raw payload.
            action: Operation name used as the prefix of error messages.

        Returns:
            The ``response_cls`` instance after ``parse_response_content``.

        Raises:
            CustomException: If the client returns a falsy payload, or the
                parsed response reports failure via ``is_success()``.
        """
        # NOTE(review): ``execute`` is invoked without ``await``; if it does
        # blocking network I/O it stalls the event loop of the async callers.
        # Confirm, and consider offloading it to a worker thread.
        raw = AlipayClient.get_client().execute(request)
        if not raw:
            raise CustomException(msg=f"{action}失败: 无响应")
        result = response_cls()
        result.parse_response_content(raw)
        if not result.is_success():
            log.error(f"支付宝接口调用失败: {result.code} - {result.msg}")
            raise CustomException(msg=f"{action}失败: {result.msg}")
        return result

    @staticmethod
    def _sum_issue_quota(targets) -> Decimal:
        """Add up ``issue_quota`` over ``targets``, skipping unparseable values.

        The total is informational only (it is stored on the local batch
        record), so a bad value is logged and skipped rather than raised.
        """
        total = Decimal("0")
        for target in targets or []:
            try:
                total += Decimal(target.issue_quota)
            except (InvalidOperation, TypeError, ValueError):
                log.warning(f"发放额度金额无法解析, 已跳过: {target.issue_quota!r}")
        return total

    @staticmethod
    async def _paginate(crud, page_no: int, page_size: int, search, out_schema) -> dict:
        """Run ``crud.page`` for a 1-based ``page_no``, newest ``id`` first."""
        return await crud.page(
            offset=(page_no - 1) * page_size,
            limit=page_size,
            order_by=[{"id": "desc"}],
            search=search or {},
            out_schema=out_schema,
        )

    # ------------------------------------------------------------------
    # Expense quota (balance / voucher)
    # ------------------------------------------------------------------
    @classmethod
    async def create_expense_quota_service(
        cls, auth: AuthSchema, data: ExpenseQuotaCreateSchema
    ) -> QuotaOperationOutSchema:
        """Create a balance/voucher quota on Alipay and record it locally.

        Calls ``alipay.ebpp.invoice.expensecontrol.quota.create``.

        Args:
            auth: Auth context handed to the CRUD layer.
            data: Creation payload; ``outer_source_id`` is used as the
                business number when given, otherwise a snowflake id is
                generated.

        Returns:
            The business number and the quota id reported by Alipay.

        Raises:
            CustomException: If the SDK is missing, the Alipay call fails,
                or the local record cannot be created.
        """
        crud = QuotaCRUD(auth)
        out_biz_no = data.outer_source_id or str(get_snowflake_id())
        try:
            from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaCreateRequest import (
                AlipayEbppInvoiceExpensecontrolQuotaCreateRequest,
            )
            from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaCreateModel import (
                AlipayEbppInvoiceExpensecontrolQuotaCreateModel,
            )
            from alipay.aop.api.domain.IssueQuotaTarget import (
                IssueQuotaTarget,
            )
            from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaCreateResponse import (
                AlipayEbppInvoiceExpensecontrolQuotaCreateResponse,
            )
        except ImportError as exc:
            raise CustomException(msg=cls._SDK_MISSING_MSG) from exc

        model = AlipayEbppInvoiceExpensecontrolQuotaCreateModel()
        model.target_type = data.target_type
        model.target_id = data.target_id
        model.enterprise_id = data.enterprise_id
        model.outer_source_id = out_biz_no
        model.quota_type = data.quota_type or "CAP"
        model.share_mode = data.share_mode or "0"
        if data.effective_start_date:
            model.effective_start_date = data.effective_start_date.strftime(cls._DATETIME_FORMAT)
        if data.effective_end_date:
            model.effective_end_date = data.effective_end_date.strftime(cls._DATETIME_FORMAT)
        cls._copy_truthy(data, model, ("issue_name", "issue_desc"))
        if data.issue_quota_target_list:
            targets = []
            for item in data.issue_quota_target_list:
                target = IssueQuotaTarget()
                target.owner_type = item.owner_type
                target.owner_id = item.owner_id
                target.quota = item.quota
                # ``amount`` may legitimately be 0, so test against None.
                if item.amount is not None:
                    target.amount = item.amount
                targets.append(target)
            model.issue_quota_target_list = targets

        request = AlipayEbppInvoiceExpensecontrolQuotaCreateRequest()
        request.biz_model = model
        result = cls._execute_request(
            request,
            AlipayEbppInvoiceExpensecontrolQuotaCreateResponse,
            "创建余额/点券",
        )

        # Persist locally only after Alipay accepted the quota.
        quota_data = data.model_dump(exclude_none=True)
        quota_data["out_biz_no"] = out_biz_no
        quota_data["status"] = QuotaStatusEnum.QUOTA_ACTIVE.value
        if result.quota_id:
            quota_data["quota_id"] = result.quota_id
        quota = await crud.create(quota_data)
        if not quota:
            raise CustomException(msg="创建额度记录失败")
        return QuotaOperationOutSchema(out_biz_no=out_biz_no, quota_id=result.quota_id)

    @classmethod
    async def create_quota_service(
        cls, auth: AuthSchema, data: QuotaCreateSchema
    ) -> QuotaOperationOutSchema:
        """Create a quota record in the local database only (no Alipay call).

        ``available_amount`` defaults to ``total_amount`` (or 0) when the
        payload does not provide it.

        Raises:
            CustomException: If the CRUD layer returns no record.
        """
        crud = QuotaCRUD(auth)
        out_biz_no = str(get_snowflake_id())
        quota_data = data.model_dump(exclude_none=True)
        quota_data["out_biz_no"] = out_biz_no
        quota_data["status"] = QuotaStatusEnum.QUOTA_ACTIVE.value
        if quota_data.get("available_amount") is None:
            quota_data["available_amount"] = quota_data.get("total_amount", 0)
        quota = await crud.create(quota_data)
        if not quota:
            raise CustomException(msg="创建额度记录失败")
        return QuotaOperationOutSchema(out_biz_no=out_biz_no, quota_id=quota.quota_id)

    @classmethod
    async def query_expense_quota_service(
        cls, auth: AuthSchema, data: ExpenseQuotaQuerySchema
    ) -> ExpenseQuotaQueryOutSchema:
        """Query balance/voucher quotas from Alipay.

        Calls ``alipay.ebpp.invoice.expensecontrol.quota.query``.

        Returns:
            Paging information, falling back to the requested page values
            when the response leaves them empty.

        Raises:
            CustomException: If the SDK is missing or the Alipay call fails.
        """
        try:
            from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaQueryRequest import (
                AlipayEbppInvoiceExpensecontrolQuotaQueryRequest,
            )
            from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaQueryModel import (
                AlipayEbppInvoiceExpensecontrolQuotaQueryModel,
            )
            from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaQueryResponse import (
                AlipayEbppInvoiceExpensecontrolQuotaQueryResponse,
            )
        except ImportError as exc:
            raise CustomException(msg=cls._SDK_MISSING_MSG) from exc

        model = AlipayEbppInvoiceExpensecontrolQuotaQueryModel()
        model.owner_type = data.owner_type
        model.page_size = data.page_size
        model.page_num = data.page_num
        cls._copy_truthy(
            data,
            model,
            (
                "target_type",
                "target_id",
                "owner_id",
                "owner_open_id",
                "enterprise_id",
                "quota_id_list",
                "quota_type",
            ),
        )

        request = AlipayEbppInvoiceExpensecontrolQuotaQueryRequest()
        request.biz_model = model
        result = cls._execute_request(
            request,
            AlipayEbppInvoiceExpensecontrolQuotaQueryResponse,
            "查询余额/点券",
        )

        # NOTE(review): only the paging fields are forwarded; any quota
        # detail rows carried by the response are not returned to the caller.
        # Confirm whether that is intended.
        return ExpenseQuotaQueryOutSchema(
            page_num=result.page_num or data.page_num,
            page_size=result.page_size or data.page_size,
            total_page_count=result.total_page_count or 0,
        )

    @classmethod
    async def modify_expense_quota_service(
        cls, auth: AuthSchema, out_biz_no: str, data: ExpenseQuotaModifySchema
    ) -> QuotaOperationOutSchema:
        """Modify a balance/voucher quota on Alipay.

        Calls ``alipay.ebpp.invoice.expensecontrol.quota.modify``.

        The local record identified by ``out_biz_no`` is looked up purely as
        an existence check; this method does not update it.

        Raises:
            CustomException: If the local quota does not exist, the SDK is
                missing, or the Alipay call fails.
        """
        crud = QuotaCRUD(auth)
        quota = await crud.get_by_out_biz_no(out_biz_no)
        if not quota:
            raise CustomException(msg="额度不存在")
        try:
            from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaModifyRequest import (
                AlipayEbppInvoiceExpensecontrolQuotaModifyRequest,
            )
            from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaModifyModel import (
                AlipayEbppInvoiceExpensecontrolQuotaModifyModel,
            )
            from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaModifyResponse import (
                AlipayEbppInvoiceExpensecontrolQuotaModifyResponse,
            )
        except ImportError as exc:
            raise CustomException(msg=cls._SDK_MISSING_MSG) from exc

        model = AlipayEbppInvoiceExpensecontrolQuotaModifyModel()
        model.quota_id = data.quota_id
        model.action = data.action
        model.outer_source_id = data.outer_source_id
        model.enterprise_id = data.enterprise_id
        # ``amount`` may legitimately be 0, so test against None.
        if data.amount is not None:
            model.amount = str(data.amount)
        cls._copy_truthy(data, model, ("share_mode",))

        request = AlipayEbppInvoiceExpensecontrolQuotaModifyRequest()
        request.biz_model = model
        result = cls._execute_request(
            request,
            AlipayEbppInvoiceExpensecontrolQuotaModifyResponse,
            "修改余额/点券",
        )
        return QuotaOperationOutSchema(
            out_biz_no=out_biz_no,
            quota_id=data.quota_id,
            result=result.success,
        )

    @classmethod
    async def delete_expense_quota_service(
        cls, auth: AuthSchema, out_biz_no: str, data: ExpenseQuotaDeleteSchema
    ) -> QuotaOperationOutSchema:
        """Delete a quota on Alipay, then delete the local record.

        Calls ``alipay.ebpp.invoice.expensecontrol.quota.delete``.

        Raises:
            CustomException: If the local quota does not exist, the SDK is
                missing, or the Alipay call fails.
        """
        crud = QuotaCRUD(auth)
        quota = await crud.get_by_out_biz_no(out_biz_no)
        if not quota:
            raise CustomException(msg="额度不存在")
        try:
            from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolQuotaDeleteRequest import (
                AlipayEbppInvoiceExpensecontrolQuotaDeleteRequest,
            )
            from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolQuotaDeleteModel import (
                AlipayEbppInvoiceExpensecontrolQuotaDeleteModel,
            )
            from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolQuotaDeleteResponse import (
                AlipayEbppInvoiceExpensecontrolQuotaDeleteResponse,
            )
        except ImportError as exc:
            raise CustomException(msg=cls._SDK_MISSING_MSG) from exc

        model = AlipayEbppInvoiceExpensecontrolQuotaDeleteModel()
        model.enterprise_id = data.enterprise_id
        cls._copy_truthy(data, model, ("quota_id", "issue_batch_id"))

        request = AlipayEbppInvoiceExpensecontrolQuotaDeleteRequest()
        request.biz_model = model
        cls._execute_request(
            request,
            AlipayEbppInvoiceExpensecontrolQuotaDeleteResponse,
            "删除额度",
        )

        # Remove the local row only once Alipay confirmed the deletion.
        await crud.delete(id=quota.id)
        return QuotaOperationOutSchema(out_biz_no=out_biz_no)

    # ========================
    # Manual batch quota issuing
    # ========================
    @classmethod
    async def issue_batch_create_service(
        cls, auth: AuthSchema, data: IssueBatchCreateSchema
    ) -> IssueBatchCreateOutSchema:
        """Issue quotas to many targets in one manual batch.

        Calls ``alipay.ebpp.invoice.expensecontrol.issuebatch.create``.

        After a successful call the batch is saved locally on a best-effort
        basis: a failure to save is logged and does not fail the request.

        Returns:
            The Alipay batch id plus the per-target validation failures
            reported by the response (``None`` when there are none).

        Raises:
            CustomException: If the SDK is missing or the Alipay call fails.
        """
        try:
            from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolIssuebatchCreateRequest import (
                AlipayEbppInvoiceExpensecontrolIssuebatchCreateRequest,
            )
            from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolIssuebatchCreateModel import (
                AlipayEbppInvoiceExpensecontrolIssuebatchCreateModel,
            )
            from alipay.aop.api.domain.IssueTargetInfoContent import (
                IssueTargetInfoContent,
            )
            from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolIssuebatchCreateResponse import (
                AlipayEbppInvoiceExpensecontrolIssuebatchCreateResponse,
            )
        except ImportError as exc:
            raise CustomException(msg=cls._SDK_MISSING_MSG) from exc

        model = AlipayEbppInvoiceExpensecontrolIssuebatchCreateModel()
        model.enterprise_id = data.enterprise_id
        model.issue_name = data.issue_name
        model.quota_type = data.quota_type
        model.effective_start_date = data.effective_start_date
        model.effective_end_date = data.effective_end_date
        model.institution_id = data.institution_id
        model.batch_no = data.batch_no
        model.share_mode = data.share_mode
        cls._copy_truthy(data, model, ("issue_desc",))
        if data.issue_target_info_list:
            targets = []
            for item in data.issue_target_info_list:
                target = IssueTargetInfoContent()
                target.issue_quota = item.issue_quota
                cls._copy_truthy(
                    item,
                    target,
                    ("owner_open_id", "owner_id", "user_name", "owner_type"),
                )
                targets.append(target)
            model.issue_target_info_list = targets

        request = AlipayEbppInvoiceExpensecontrolIssuebatchCreateRequest()
        request.biz_model = model
        result = cls._execute_request(
            request,
            AlipayEbppInvoiceExpensecontrolIssuebatchCreateResponse,
            "手工批量发放额度",
        )

        # Best-effort local bookkeeping: the quotas are already issued on
        # Alipay, so a local failure must not surface as a failed request.
        try:
            issue_batch_crud = IssueBatchCRUD(auth)
            batch_data = {
                "issue_batch_id": result.issue_batch_id,
                "batch_no": data.batch_no,
                "institution_id": data.institution_id,
                "issue_name": data.issue_name,
                "quota_type": data.quota_type,
                "share_mode": data.share_mode,
                "total_count": len(data.issue_target_info_list or []),
                "total_amount": cls._sum_issue_quota(data.issue_target_info_list),
                "status": "ACTIVE",
                "effective_start_date": datetime.strptime(
                    data.effective_start_date, cls._DATETIME_FORMAT
                ),
                "effective_end_date": datetime.strptime(
                    data.effective_end_date, cls._DATETIME_FORMAT
                ),
                "issue_desc": data.issue_desc,
            }
            await issue_batch_crud.create(batch_data)
        except Exception as e:
            log.warning(f"保存发放批次记录失败(不影响发放): {e}")

        # Collect per-target validation failures; stays None when absent/empty.
        failed_fields = (
            "user_name",
            "owner_type",
            "owner_id",
            "owner_open_id",
            "issue_quota",
            "message",
            "result",
        )
        failed_list = [
            IssueQuotaCheckFailedItem(
                **{field: getattr(failed, field, None) for field in failed_fields}
            )
            for failed in (getattr(result, "issue_quota_check_failed_list", None) or [])
        ] or None
        return IssueBatchCreateOutSchema(
            issue_batch_id=result.issue_batch_id,
            issue_quota_check_failed_list=failed_list,
        )

    @classmethod
    async def issue_batch_cancel_service(
        cls, auth: AuthSchema, data: IssueBatchCancelSchema
    ) -> IssueBatchCancelOutSchema:
        """Cancel a manual issue batch.

        Calls ``alipay.ebpp.invoice.expensecontrol.issuebatch.cancel``.

        The matching local batch, if any, is marked ``CANCELLED`` on a
        best-effort basis; a failure there is logged and ignored.

        Raises:
            CustomException: If the SDK is missing or the Alipay call fails.
        """
        try:
            from alipay.aop.api.request.AlipayEbppInvoiceExpensecontrolIssuebatchCancelRequest import (
                AlipayEbppInvoiceExpensecontrolIssuebatchCancelRequest,
            )
            from alipay.aop.api.domain.AlipayEbppInvoiceExpensecontrolIssuebatchCancelModel import (
                AlipayEbppInvoiceExpensecontrolIssuebatchCancelModel,
            )
            from alipay.aop.api.response.AlipayEbppInvoiceExpensecontrolIssuebatchCancelResponse import (
                AlipayEbppInvoiceExpensecontrolIssuebatchCancelResponse,
            )
        except ImportError as exc:
            raise CustomException(msg=cls._SDK_MISSING_MSG) from exc

        model = AlipayEbppInvoiceExpensecontrolIssuebatchCancelModel()
        model.enterprise_id = data.enterprise_id
        model.institution_id = data.institution_id
        model.issue_batch_id = data.issue_batch_id

        request = AlipayEbppInvoiceExpensecontrolIssuebatchCancelRequest()
        request.biz_model = model
        result = cls._execute_request(
            request,
            AlipayEbppInvoiceExpensecontrolIssuebatchCancelResponse,
            "作废手工发放批次",
        )

        # Best-effort sync of the local batch status.
        try:
            issue_batch_crud = IssueBatchCRUD(auth)
            batch = await issue_batch_crud.get_by_issue_batch_id(data.issue_batch_id)
            if batch:
                batch.status = "CANCELLED"
                await issue_batch_crud.update(batch.id, {"status": "CANCELLED"})
        except Exception as e:
            log.warning(f"更新批次本地状态失败(不影响作废): {e}")

        # An attribute that exists but is None must also fall back to False;
        # a plain getattr default only covers a missing attribute.
        cancelled = getattr(result, "result", None)
        return IssueBatchCancelOutSchema(
            result=False if cancelled is None else cancelled,
        )

    @classmethod
    async def issue_batch_records_query_service(
        cls, auth: AuthSchema, data: IssueBatchRecordsQuerySchema
    ) -> IssueBatchRecordsQueryOutSchema:
        """Query the per-target issue records of a manual batch.

        Calls ``alipay.ebpp.invoice.issuebatch.issuerecords.batchquery``.

        Returns:
            Paging information (falling back to the requested values when
            the response leaves them empty) and the record list, which is
            ``None`` when the response carries no records.

        Raises:
            CustomException: If the SDK is missing or the Alipay call fails.
        """
        try:
            from alipay.aop.api.request.AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryRequest import (
                AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryRequest,
            )
            from alipay.aop.api.domain.AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryModel import (
                AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryModel,
            )
            from alipay.aop.api.response.AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryResponse import (
                AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryResponse,
            )
        except ImportError as exc:
            raise CustomException(msg=cls._SDK_MISSING_MSG) from exc

        model = AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryModel()
        model.enterprise_id = data.enterprise_id
        model.institution_id = data.institution_id
        model.issue_batch_id = data.issue_batch_id
        model.page_size = data.page_size
        model.page_num = data.page_num

        request = AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryRequest()
        request.biz_model = model
        result = cls._execute_request(
            request,
            AlipayEbppInvoiceIssuebatchIssuerecordsBatchqueryResponse,
            "查询手工发放明细",
        )

        record_fields = (
            "quota_id",
            "issue_quota",
            "issue_status",
            "owner_type",
            "owner_id",
            "owner_open_id",
            "user_name",
            "currency",
        )
        record_list = [
            IssueRecordInfoItem(
                **{field: getattr(record, field, None) for field in record_fields}
            )
            for record in (getattr(result, "issue_record_info_list", None) or [])
        ] or None

        # ``or`` (not a getattr default) so that attributes present but left
        # as None also fall back, matching query_expense_quota_service.
        return IssueBatchRecordsQueryOutSchema(
            page_num=getattr(result, "page_num", None) or data.page_num,
            page_size=getattr(result, "page_size", None) or data.page_size,
            total_page_count=getattr(result, "total_page_count", None) or "0",
            issue_record_info_list=record_list,
        )

    # ------------------------------------------------------------------
    # Local listings
    # ------------------------------------------------------------------
    @classmethod
    async def list_batch_service(
        cls,
        auth: AuthSchema,
        page_no: int = 1,
        page_size: int = 20,
        search: dict | None = None,
    ) -> dict:
        """Return one page of manual issue batches from the local database.

        Args:
            auth: Auth context handed to the CRUD layer.
            page_no: 1-based page number.
            page_size: Rows per page.
            search: Optional filter mapping passed through to ``crud.page``.
        """
        return await cls._paginate(
            IssueBatchCRUD(auth), page_no, page_size, search, IssueBatchListOutSchema
        )

    @classmethod
    async def list_service(
        cls,
        auth: AuthSchema,
        page_no: int = 1,
        page_size: int = 20,
        search: dict | None = None,
    ) -> dict:
        """Return one page of quota records from the local database.

        Args:
            auth: Auth context handed to the CRUD layer.
            page_no: 1-based page number.
            page_size: Rows per page.
            search: Optional filter mapping passed through to ``crud.page``.
        """
        return await cls._paginate(
            QuotaCRUD(auth), page_no, page_size, search, QuotaListOutSchema
        )
|