#!/usr/bin/env python3
"""
Simple callback-notification listener server.

Used to test the openapi transfer callback notification feature.

Usage:
    python server.py [port]

Example:
    python server.py 8080
"""
import http.server
import json
import logging
import socketserver
import sys
from datetime import datetime
from urllib.parse import parse_qs
# Configure logging: INFO and above, each record prefixed with a timestamp
# and its level.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def parse_port(argv, default=8080):
    """Return the TCP port requested on the command line.

    Args:
        argv: Argument vector in ``sys.argv`` layout; ``argv[1]``, when
            present, is the requested port.
        default: Port returned when no port is given or the given value is
            not an integer in the range 0-65535.

    Returns:
        The port number as an ``int``.
    """
    if len(argv) < 2:
        return default
    try:
        port = int(argv[1])
    except ValueError:
        port = -1
    if not 0 <= port <= 65535:
        # Fall back instead of raising so that importing this module never
        # fails just because the host program's argv[1] is not a port.
        print(
            f"Invalid port {argv[1]!r}; falling back to {default}",
            file=sys.stderr,
        )
        return default
    return port


# Listening port: first command-line argument, or 8080 when absent/invalid.
PORT = parse_port(sys.argv)
class CallbackHandler(http.server.BaseHTTPRequestHandler):
    """Request handler that logs incoming callback notifications.

    POST requests have their body decoded and logged in a readable form and
    are acknowledged with a JSON success payload. GET requests return a small
    JSON health-check document. Every path is accepted: ``self.path`` is only
    logged, never used for routing.
    """

    def do_POST(self):
        """Log the callback carried by a POST request and acknowledge it.

        Responds with HTTP 200 and ``{"code": 0, ...}`` once the body has been
        read and logged, or with HTTP 500 and ``{"code": -1, "msg": ...}`` if
        reading or logging the request raised.
        """
        try:
            # A negative length would make ``read`` block until the peer
            # closes the connection, so treat it as an empty body.
            content_length = max(int(self.headers.get('Content-Length', 0)), 0)
            post_data = self.rfile.read(content_length)
            content_type = self.headers.get('Content-Type', '')
            self._log_callback(content_type, post_data)
        except Exception as e:
            # Top-level boundary for one request: keep the traceback in the
            # log and tell the caller that processing failed. Only reading
            # and logging are guarded, so a 500 is never written after a 200
            # status line has already gone out.
            logger.exception("处理请求时发生错误: %s", e)
            self._send_json(500, {"code": -1, "msg": str(e)})
            return

        self._send_json(200, {
            "code": 0,
            "msg": "success",
            "data": {
                "receive_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                "notify_id": "test_response"
            }
        })

    def do_GET(self):
        """Answer a GET request with a JSON health-check document."""
        self._send_json(200, {
            "status": "ok",
            "service": "callback-server",
            "time": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        })

    def log_message(self, format, *args):
        """Route the base class's per-request log lines to the module logger."""
        logger.info(format % args)

    def _send_json(self, status, payload):
        """Send *payload* as a JSON response with the given HTTP *status*."""
        body = json.dumps(payload).encode('utf-8')
        self.send_response(status)
        self.send_header('Content-Type', 'application/json')
        # An explicit length lets the client know where the body ends.
        self.send_header('Content-Length', str(len(body)))
        self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()
        self.wfile.write(body)

    def _log_callback(self, content_type, post_data):
        """Write a readable dump of one callback request to the log.

        Args:
            content_type: Value of the request's ``Content-Type`` header
                (empty string when absent).
            post_data: Raw request body as bytes.
        """
        # Decode once and leniently: a body that is not valid UTF-8 is still
        # logged (with replacement characters) instead of failing the request.
        text = post_data.decode('utf-8', errors='replace')

        logger.info("=" * 60)
        logger.info("收到回调通知!")
        logger.info("时间: %s", datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
        logger.info("路径: %s", self.path)
        logger.info("Content-Type: %s", content_type)
        logger.info("-" * 40)

        if 'application/x-www-form-urlencoded' in content_type:
            # Form data: log the first value of every field.
            params = parse_qs(text)
            for key, values in params.items():
                logger.info("%s: %s", key, values[0] if values else '')
            # If there is a "content" field, try to interpret it as JSON.
            if 'content' in params:
                try:
                    content_json = json.loads(params['content'][0])
                except json.JSONDecodeError:
                    logger.warning("content 不是有效的 JSON 格式")
                else:
                    logger.info("-" * 40)
                    logger.info("content 内容 (JSON):")
                    self._log_json(content_json)
        elif 'multipart/form-data' in content_type:
            # Multipart bodies are not parsed, only dumped as text.
            logger.info("收到 multipart/form-data 类型请求")
            logger.info("原始数据: %s", text)
        else:
            # Any other content type: try JSON, fall back to the raw text.
            try:
                json_data = json.loads(text)
            except json.JSONDecodeError:
                logger.info("原始数据: %s", text)
            else:
                logger.info("JSON 数据:")
                self._log_json(json_data)

        logger.info("=" * 60)

    @staticmethod
    def _log_json(value):
        """Log a decoded JSON value.

        Objects are logged one ``key: value`` line per member. Arrays and
        scalars are valid JSON too but have no ``items()``, so they are
        logged as a single line.
        """
        if isinstance(value, dict):
            for k, v in value.items():
                logger.info(" %s: %s", k, v)
        else:
            logger.info(" %s", value)
class _ReusableTCPServer(socketserver.TCPServer):
    """TCP server that may rebind its port immediately after a restart."""

    # Without this, restarting shortly after a stop can fail with
    # "Address already in use" while old connections linger in TIME_WAIT.
    allow_reuse_address = True


def main():
    """Print the startup banner and serve callbacks until Ctrl+C is pressed.

    Binds to all interfaces on ``PORT`` and dispatches every request to
    ``CallbackHandler``.
    """
    print(f"""
╔═══════════════════════════════════════════════════════════╗
║ 回调通知监听服务器 ║
╠═══════════════════════════════════════════════════════════╣
║ 监听端口: {PORT:<46}║
║ 监听路径: POST /callback ║
║ 健康检查: GET / ║
╠═══════════════════════════════════════════════════════════╣
║ 按 Ctrl+C 停止服务器 ║
╚═══════════════════════════════════════════════════════════╝
""")
    with _ReusableTCPServer(("", PORT), CallbackHandler) as httpd:
        logger.info("服务器启动成功,监听端口 %d", PORT)
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            # serve_forever() has already returned at this point and the
            # ``with`` block closes the listening socket on exit, so no
            # explicit shutdown() call is needed.
            logger.info("服务器已停止")
# Run the server only when executed as a script, not when imported.
if __name__ == "__main__":
    main()