Files
2026-06-01 13:18:36 +08:00

119 lines
5.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import numpy as np
import zmq
import threading
import json
from SunnyLinker import SunnyLinker64
class zmqServer(threading.Thread):
def __init__(self, host='0.0.0.0', port=8099):
threading.Thread.__init__(self)
self.host = host
self.port = port
self.running = False
self.get_Impedance = False # 是否返回阻抗值
self.open_Impedance = None # 是否开启阻抗检测功能
self.StartDecode = False # false 停止解码true=开始解码
self.StartTrain = False # False未进入训练状态True处于训练状态
self.state_mode = None # 'train'为训练状态rest'为休息状态,'test'为测试状态
self.currentLabel = -1 # 接收刺激端消息,了解刺激端当前的训练标签
self.IsExitApp = False # 当socket收到2的时候就置为True代表遥退出系统了。
self.getReport = False # 获取训练报告内容
self.mat_generate = False # 保存mat文件True开始False暂停
self.reset_mat_buffer = False # 重置缓冲区标志True表示下次开始采集需要清空旧数据
self.subject_id = None #受试者ID
self.session_id = None #Session ID
self.save_win = 0 #保存数据时长
self.daemon = True
# 创建 ZeroMQ 上下文
self.context = zmq.Context()
# 创建 REP 套接字(响应端)
self.socket = self.context.socket(zmq.ROUTER)
self.socket.bind(f"tcp://{self.host}:{self.port}") # 绑定到端口 8099
self.targetFreqs = []
self.changeTarget = False # 更换目标频率
self.sunnyLinker = SunnyLinker64(None, None, None, None,None) #单例模式类已在Decoder实例化
self.labels = [0x01, 0x02,0x03]
self.decoder_switch = False #更换解码器
self.decoder_class = None #解码器类别 'ssvep','ssmvep','mi'
def run(self):
self.running = True
print(f"Server is running on {self.host}:{self.port}")
try:
while self.running:
# 等待客户端请求
_,_,message = self.socket.recv_multipart()
message = json.loads(message.decode('utf-8'))
print(f"Received request: {message}")
# 处理请求
method = message.get("method")
params = message.get("params")
if method == "sync":
self.state_mode = 'sync'
if method == "targetFreqs":
if not isinstance(params,list):
print('targetFreqs must be a list')
continue
if params != self.targetFreqs:
self.targetFreqs = params
self.changeTarget = True
if method == "decoderClass":
if not isinstance(params,str):
print('decoderClass must be a str')
continue
# if params != self.decoder_class:
self.decoder_class = params
self.decoder_switch = True
if method == "getReport":
self.getReport = True
if method == "train":#训练状态
self.state_mode = 'train'
self.StartTrain = True
self.currentLabel = params # 当前刺激端的训练标签
self.sunnyLinker.push_trigger(self.labels[self.currentLabel])
elif method == "predict":#预测状态
self.state_mode = 'predict'
if params == 1: #开始解码
self.StartDecode = True
self.sunnyLinker.push_trigger(0x63)
elif params == 2: #停止解码
self.IsExitApp = True
self.running = False
elif method == "rest": #休息状态
self.state_mode = 'rest'
elif method == "impedance":
if params == 1:
self.open_Impedance = True # 开启阻抗
self.get_Impedance = True # 返回阻抗
elif params == 2:
self.open_Impedance = False # 关闭阻抗
self.get_Impedance = False # 停止返回阻抗
elif method == "matGenerate":
self.subject_id = str(params['subject_id'])
self.session_id = str(params['session_id'])
self.save_win = int(params['time'])
self.mat_generate = True
self.reset_mat_buffer = True # 每次发送 matGenerate 都重置缓冲区
elif method == "stop": # 停止
self.mat_generate = False
except Exception as e:
print(f"An socket error occurred: {e}")
finally:
self.running = False
# 关闭套接字和上下文
self.socket.close()
self.context.term()
print("Server socket and context closed.")
def stop(self):
"""显式关闭服务器"""
self.running = False
self.socket.close()
self.context.term()
print("Server closed explicitly.")
if __name__ == '__main__':
server = zmqServer()
server.start()