Files
bci_algo/ZeroMQClient_mock.py
2026-06-06 16:15:55 +08:00

166 lines
6.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import zmq
import time
import json
import os
import threading
def receive_messages(socket, stop_event):
"""
后台线程函数,用于持续接收服务器消息
Args:
socket (zmq.Socket): ZeroMQ套接字
stop_event (threading.Event): 停止事件,用于通知线程退出
"""
print("开始持续接收服务器数据...")
print("-" * 50)
while not stop_event.is_set():
try:
# 设置接收超时为1秒避免阻塞
socket.setsockopt(zmq.RCVTIMEO, 1000)
# 接收服务器的消息
frames = socket.recv_multipart()
# DEALER 套接字接收消息格式:[身份标识, 空帧, 消息内容]
# 使用frames[-1]获取最后一帧,无论中间有多少空帧
if len(frames) >= 2:
message = frames[-1].decode('utf-8')
# 尝试解析为JSON格式
try:
json_message = json.loads(message)
# 检查消息长度
json_str = str(json_message)
if len(json_str) > 100:
print(f"收到服务器数据 (JSON): {json_str[:100]}...")
else:
print(f"收到服务器数据 (JSON): {json_message}")
except json.JSONDecodeError:
# 检查消息长度
if len(message) > 100:
print(f"收到服务器数据 (原始): {message[:100]}...")
else:
print(f"收到服务器数据 (原始): {message}")
else:
print(f"收到服务器数据 (格式异常): {frames}")
except zmq.Again:
# 接收超时,继续循环
continue
except Exception as e:
print(f"接收消息时发生错误: {e}")
# 短暂暂停后继续接收
time.sleep(1)
print("接收线程已停止。")
def zero_mq_client(server_address="tcp://192.168.254.101:8099"):
"""
ZeroMQ客户端函数用于与服务器通信
Args:
server_address (str): 服务器地址,格式为"tcp://IP:端口"
"""
# 创建 ZeroMQ 上下文
context = zmq.Context()
# 创建 DEALER 套接字
socket = context.socket(zmq.DEALER)
# 生成唯一的身份标识
identity = str('wdd').encode('utf-8')
socket.setsockopt(zmq.IDENTITY, identity)
try:
# 连接到服务器
print(f"连接到服务器 {server_address}...")
socket.connect(server_address)
# 定义消息集
message_set = [
{"method": "sync", "params": 1},
{"method": "decoderClass", "params": "mi"},
{"method": "decoderClass", "params": "ssvep"},
{"method": "decoderClass", "params": "ssmvep"},
{"method": "decoderClass", "params": "blink"},
{"method": "decoderClass", "params": "concentration"},
{"method": "train", "params": 0},
{"method": "train", "params": 1},
{"method": "rest", "params": 0},
{"method": "predict", "params": 1},
{"method": "getReport", "params": 0}
]
# 打印消息集
print("消息集:")
for i, msg in enumerate(message_set):
print(f"[{i}] {msg}")
print("-" * 50)
# 创建停止事件
stop_event = threading.Event()
# 启动接收线程
receive_thread = threading.Thread(target=receive_messages, args=(socket, stop_event))
receive_thread.daemon = True # 设置为守护线程,主线程退出时自动退出
receive_thread.start()
# 主线程处理控制台输入
print("输入消息序号发送对应消息,输入'q'退出程序:")
while True:
try:
# 获取用户输入
user_input = input("请输入消息序号: ")
# 检查是否退出
if user_input.lower() == 'q':
print("正在退出程序...")
break
# 尝试转换为整数
msg_index = int(user_input)
# 检查序号是否有效
if 0 <= msg_index < len(message_set):
# 获取对应的消息
selected_message = message_set[msg_index]
# 将消息转换为 JSON 字符串
json_message = json.dumps(selected_message)
# 打印发送信息
print(f"\n发送消息 (大小: {len(json_message)} 字节)...")
print(f"消息方法: {selected_message['method']}")
print(f"参数值: {selected_message['params']}")
# DEALER 套接字发送消息,包含身份标识和空帧
socket.send_multipart([identity, json_message.encode('utf-8')])
print("消息发送完成!")
print("-" * 50)
else:
print(f"无效的消息序号,请输入 0-{len(message_set)-1} 之间的数字。")
print("消息集:")
for i, msg in enumerate(message_set):
print(f"[{i}] {msg}")
print("-" * 50)
except ValueError:
print("请输入有效的数字或'q'退出。")
except Exception as e:
print(f"处理输入时发生错误: {e}")
except KeyboardInterrupt:
print("\n程序被手动终止。")
finally:
# 停止接收线程
stop_event.set()
# 等待接收线程停止
time.sleep(1)
# 关闭套接字和上下文
socket.close()
context.term()
print("客户端已关闭。")
if __name__ == "__main__":
zero_mq_client()