import zmq import time import json import os import threading def receive_messages(socket, stop_event): """ 后台线程函数,用于持续接收服务器消息 Args: socket (zmq.Socket): ZeroMQ套接字 stop_event (threading.Event): 停止事件,用于通知线程退出 """ print("开始持续接收服务器数据...") print("-" * 50) while not stop_event.is_set(): try: # 设置接收超时为1秒,避免阻塞 socket.setsockopt(zmq.RCVTIMEO, 1000) # 接收服务器的消息 frames = socket.recv_multipart() # DEALER 套接字接收消息格式:[身份标识, 空帧, 消息内容] # 使用frames[-1]获取最后一帧,无论中间有多少空帧 if len(frames) >= 2: message = frames[-1].decode('utf-8') # 尝试解析为JSON格式 try: json_message = json.loads(message) # 检查消息长度 json_str = str(json_message) if len(json_str) > 100: print(f"收到服务器数据 (JSON): {json_str[:100]}...") else: print(f"收到服务器数据 (JSON): {json_message}") except json.JSONDecodeError: # 检查消息长度 if len(message) > 100: print(f"收到服务器数据 (原始): {message[:100]}...") else: print(f"收到服务器数据 (原始): {message}") else: print(f"收到服务器数据 (格式异常): {frames}") except zmq.Again: # 接收超时,继续循环 continue except Exception as e: print(f"接收消息时发生错误: {e}") # 短暂暂停后继续接收 time.sleep(1) print("接收线程已停止。") def zero_mq_client(server_address="tcp://127.0.0.1:8099"): """ ZeroMQ客户端函数,用于与服务器通信 Args: server_address (str): 服务器地址,格式为"tcp://IP:端口" """ # 创建 ZeroMQ 上下文 context = zmq.Context() # 创建 DEALER 套接字 socket = context.socket(zmq.DEALER) # 生成唯一的身份标识 identity = str('wdd').encode('utf-8') socket.setsockopt(zmq.IDENTITY, identity) try: # 连接到服务器 print(f"连接到服务器 {server_address}...") socket.connect(server_address) # 定义消息集 message_set = [ {"method": "sync", "params": 1}, {"method": "decoderClass", "params": "mi"}, {"method": "decoderClass", "params": "ssvep"}, {"method": "decoderClass", "params": "ssmvep"}, {"method": "decoderClass", "params": "blink"}, {"method": "decoderClass", "params": "concentration"}, {"method": "train", "params": 0}, {"method": "train", "params": 1}, {"method": "rest", "params": 0}, {"method": "predict", "params": 1}, {"method": "getReport", "params": 0}, {"method": "targetFreqs", "params": [11, 12, 13]} ] # 打印消息集 print("消息集:") for i, msg in enumerate(message_set): print(f"[{i}] {msg}") print("-" * 50) # 创建停止事件 stop_event = threading.Event() # 启动接收线程 receive_thread = threading.Thread(target=receive_messages, args=(socket, stop_event)) receive_thread.daemon = True # 设置为守护线程,主线程退出时自动退出 receive_thread.start() # 主线程处理控制台输入 print("输入消息序号发送对应消息,输入'q'退出程序:") while True: try: # 获取用户输入 user_input = input("请输入消息序号: ") # 检查是否退出 if user_input.lower() == 'q': print("正在退出程序...") break # 尝试转换为整数 msg_index = int(user_input) # 检查序号是否有效 if 0 <= msg_index < len(message_set): # 获取对应的消息 selected_message = message_set[msg_index] # 将消息转换为 JSON 字符串 json_message = json.dumps(selected_message) # 打印发送信息 print(f"\n发送消息 (大小: {len(json_message)} 字节)...") print(f"消息方法: {selected_message['method']}") print(f"参数值: {selected_message['params']}") # DEALER 套接字发送消息,包含身份标识和空帧 socket.send_multipart([identity, json_message.encode('utf-8')]) print("消息发送完成!") print("-" * 50) else: print(f"无效的消息序号,请输入 0-{len(message_set)-1} 之间的数字。") print("消息集:") for i, msg in enumerate(message_set): print(f"[{i}] {msg}") print("-" * 50) except ValueError: print("请输入有效的数字或'q'退出。") except Exception as e: print(f"处理输入时发生错误: {e}") except KeyboardInterrupt: print("\n程序被手动终止。") finally: # 停止接收线程 stop_event.set() # 等待接收线程停止 time.sleep(1) # 关闭套接字和上下文 socket.close() context.term() print("客户端已关闭。") if __name__ == "__main__": zero_mq_client()