#!/usr/bin/env python3 """ 简单的回调通知监听服务器 用于测试 openapi 转账回调通知功能 使用方法: python server.py [port] 示例: python server.py 8080 """ import http.server import socketserver import json import logging from datetime import datetime from urllib.parse import parse_qs # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) PORT = int(__import__('sys').argv[1]) if len(__import__('sys').argv) > 1 else 8080 class CallbackHandler(http.server.BaseHTTPRequestHandler): """处理回调通知的请求处理器""" def do_POST(self): """处理 POST 请求""" try: # 获取请求内容长度 content_length = int(self.headers.get('Content-Length', 0)) # 读取请求体 post_data = self.rfile.read(content_length) # 解析表单数据 content_type = self.headers.get('Content-Type', '') logger.info("=" * 60) logger.info("收到回调通知!") logger.info("时间: %s", datetime.now().strftime('%Y-%m-%d %H:%M:%S')) logger.info("路径: %s", self.path) logger.info("Content-Type: %s", content_type) logger.info("-" * 40) # 解析并打印表单数据 if 'application/x-www-form-urlencoded' in content_type: params = parse_qs(post_data.decode('utf-8')) for key, values in params.items(): logger.info("%s: %s", key, values[0] if values else '') # 如果有 content 字段,尝试解析 JSON if 'content' in params: try: content_json = json.loads(params['content'][0]) logger.info("-" * 40) logger.info("content 内容 (JSON):") for k, v in content_json.items(): logger.info(" %s: %s", k, v) except json.JSONDecodeError: logger.warning("content 不是有效的 JSON 格式") elif 'multipart/form-data' in content_type: # 简单解析 multipart 数据 logger.info("收到 multipart/form-data 类型请求") logger.info("原始数据: %s", post_data.decode('utf-8', errors='replace')) else: # 尝试作为 JSON 解析 try: json_data = json.loads(post_data.decode('utf-8')) logger.info("JSON 数据:") for key, value in json_data.items(): logger.info(" %s: %s", key, value) except json.JSONDecodeError: logger.info("原始数据: %s", post_data.decode('utf-8', errors='replace')) logger.info("=" * 60) # 发送成功响应 self.send_response(200) self.send_header('Content-Type', 'application/json') self.send_header('Access-Control-Allow-Origin', '*') self.end_headers() response = { "code": 0, "msg": "success", "data": { "receive_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S'), "notify_id": "test_response" } } self.wfile.write(json.dumps(response).encode('utf-8')) except Exception as e: logger.error("处理请求时发生错误: %s", str(e)) self.send_response(500) self.send_header('Content-Type', 'application/json') self.end_headers() error_response = {"code": -1, "msg": str(e)} self.wfile.write(json.dumps(error_response).encode('utf-8')) def do_GET(self): """处理 GET 请求 - 用于健康检查""" self.send_response(200) self.send_header('Content-Type', 'application/json') self.send_header('Access-Control-Allow-Origin', '*') self.end_headers() health_response = { "status": "ok", "service": "callback-server", "time": datetime.now().strftime('%Y-%m-%d %H:%M:%S') } self.wfile.write(json.dumps(health_response).encode('utf-8')) def log_message(self, format, *args): """自定义日志格式""" logger.info(format % args) def main(): """主函数""" print(f""" ╔═══════════════════════════════════════════════════════════╗ ║ 回调通知监听服务器 ║ ╠═══════════════════════════════════════════════════════════╣ ║ 监听端口: {PORT:<46}║ ║ 监听路径: POST /callback ║ ║ 健康检查: GET / ║ ╠═══════════════════════════════════════════════════════════╣ ║ 按 Ctrl+C 停止服务器 ║ ╚═══════════════════════════════════════════════════════════╝ """) with socketserver.TCPServer(("", PORT), CallbackHandler) as httpd: logger.info("服务器启动成功,监听端口 %d", PORT) try: httpd.serve_forever() except KeyboardInterrupt: logger.info("服务器已停止") httpd.shutdown() if __name__ == "__main__": main()