167 lines
6.1 KiB
Python
167 lines
6.1 KiB
Python
import zmq
|
||
import time
|
||
import json
|
||
import os
|
||
import threading
|
||
|
||
def receive_messages(socket, stop_event):
|
||
"""
|
||
后台线程函数,用于持续接收服务器消息
|
||
|
||
Args:
|
||
socket (zmq.Socket): ZeroMQ套接字
|
||
stop_event (threading.Event): 停止事件,用于通知线程退出
|
||
"""
|
||
print("开始持续接收服务器数据...")
|
||
print("-" * 50)
|
||
|
||
while not stop_event.is_set():
|
||
try:
|
||
# 设置接收超时为1秒,避免阻塞
|
||
socket.setsockopt(zmq.RCVTIMEO, 1000)
|
||
# 接收服务器的消息
|
||
frames = socket.recv_multipart()
|
||
|
||
# DEALER 套接字接收消息格式:[身份标识, 空帧, 消息内容]
|
||
# 使用frames[-1]获取最后一帧,无论中间有多少空帧
|
||
if len(frames) >= 2:
|
||
message = frames[-1].decode('utf-8')
|
||
|
||
# 尝试解析为JSON格式
|
||
try:
|
||
json_message = json.loads(message)
|
||
# 检查消息长度
|
||
json_str = str(json_message)
|
||
if len(json_str) > 100:
|
||
print(f"收到服务器数据 (JSON): {json_str[:100]}...")
|
||
else:
|
||
print(f"收到服务器数据 (JSON): {json_message}")
|
||
except json.JSONDecodeError:
|
||
# 检查消息长度
|
||
if len(message) > 100:
|
||
print(f"收到服务器数据 (原始): {message[:100]}...")
|
||
else:
|
||
print(f"收到服务器数据 (原始): {message}")
|
||
else:
|
||
print(f"收到服务器数据 (格式异常): {frames}")
|
||
|
||
except zmq.Again:
|
||
# 接收超时,继续循环
|
||
continue
|
||
except Exception as e:
|
||
print(f"接收消息时发生错误: {e}")
|
||
# 短暂暂停后继续接收
|
||
time.sleep(1)
|
||
|
||
print("接收线程已停止。")
|
||
|
||
def zero_mq_client(server_address="tcp://127.0.0.1:8099"):
|
||
"""
|
||
ZeroMQ客户端函数,用于与服务器通信
|
||
|
||
Args:
|
||
server_address (str): 服务器地址,格式为"tcp://IP:端口"
|
||
"""
|
||
# 创建 ZeroMQ 上下文
|
||
context = zmq.Context()
|
||
|
||
# 创建 DEALER 套接字
|
||
socket = context.socket(zmq.DEALER)
|
||
|
||
# 生成唯一的身份标识
|
||
identity = str('wdd').encode('utf-8')
|
||
socket.setsockopt(zmq.IDENTITY, identity)
|
||
|
||
try:
|
||
# 连接到服务器
|
||
print(f"连接到服务器 {server_address}...")
|
||
socket.connect(server_address)
|
||
|
||
# 定义消息集
|
||
message_set = [
|
||
{"method": "sync", "params": 1},
|
||
{"method": "decoderClass", "params": "mi"},
|
||
{"method": "decoderClass", "params": "ssvep"},
|
||
{"method": "decoderClass", "params": "ssmvep"},
|
||
{"method": "decoderClass", "params": "blink"},
|
||
{"method": "decoderClass", "params": "concentration"},
|
||
{"method": "train", "params": 0},
|
||
{"method": "train", "params": 1},
|
||
{"method": "rest", "params": 0},
|
||
{"method": "predict", "params": 1},
|
||
{"method": "getReport", "params": 0},
|
||
{"method": "targetFreqs", "params": [11, 12, 13]}
|
||
]
|
||
|
||
# 打印消息集
|
||
print("消息集:")
|
||
for i, msg in enumerate(message_set):
|
||
print(f"[{i}] {msg}")
|
||
print("-" * 50)
|
||
|
||
# 创建停止事件
|
||
stop_event = threading.Event()
|
||
|
||
# 启动接收线程
|
||
receive_thread = threading.Thread(target=receive_messages, args=(socket, stop_event))
|
||
receive_thread.daemon = True # 设置为守护线程,主线程退出时自动退出
|
||
receive_thread.start()
|
||
|
||
# 主线程处理控制台输入
|
||
print("输入消息序号发送对应消息,输入'q'退出程序:")
|
||
while True:
|
||
try:
|
||
# 获取用户输入
|
||
user_input = input("请输入消息序号: ")
|
||
|
||
# 检查是否退出
|
||
if user_input.lower() == 'q':
|
||
print("正在退出程序...")
|
||
break
|
||
|
||
# 尝试转换为整数
|
||
msg_index = int(user_input)
|
||
|
||
# 检查序号是否有效
|
||
if 0 <= msg_index < len(message_set):
|
||
# 获取对应的消息
|
||
selected_message = message_set[msg_index]
|
||
|
||
# 将消息转换为 JSON 字符串
|
||
json_message = json.dumps(selected_message)
|
||
|
||
# 打印发送信息
|
||
print(f"\n发送消息 (大小: {len(json_message)} 字节)...")
|
||
print(f"消息方法: {selected_message['method']}")
|
||
print(f"参数值: {selected_message['params']}")
|
||
|
||
# DEALER 套接字发送消息,包含身份标识和空帧
|
||
socket.send_multipart([identity, json_message.encode('utf-8')])
|
||
print("消息发送完成!")
|
||
print("-" * 50)
|
||
else:
|
||
print(f"无效的消息序号,请输入 0-{len(message_set)-1} 之间的数字。")
|
||
print("消息集:")
|
||
for i, msg in enumerate(message_set):
|
||
print(f"[{i}] {msg}")
|
||
print("-" * 50)
|
||
|
||
except ValueError:
|
||
print("请输入有效的数字或'q'退出。")
|
||
except Exception as e:
|
||
print(f"处理输入时发生错误: {e}")
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n程序被手动终止。")
|
||
finally:
|
||
# 停止接收线程
|
||
stop_event.set()
|
||
# 等待接收线程停止
|
||
time.sleep(1)
|
||
# 关闭套接字和上下文
|
||
socket.close()
|
||
context.term()
|
||
print("客户端已关闭。")
|
||
|
||
if __name__ == "__main__":
|
||
zero_mq_client() |