diff --git a/Decoder.py b/Decoder.py index 2452ccd..3dbc3ba 100644 --- a/Decoder.py +++ b/Decoder.py @@ -49,18 +49,9 @@ class Decoder_main(threading.Thread): self.decodingSteps = 0 # 0=停止解码 1=预热 2=解码中 3=解码完成,发送解码结果 - self.zmqServer = None - self.sliding_filter = None - - self._init_threads() - - def _init_threads(self): - """初始化ZMQ服务和滤波线程""" - # 1. 初始化ZMQServer并启动 self.zmqServer = zmqServer(device_info=self.device_info) self.zmqServer.start() # 启动ZMQ接收线程 - - # 2. 初始化滤波线程(关联ZMQServer的环形缓存) + self.sliding_filter = SlidingFilter( ring_buffer=self.zmqServer.filterBuffer, n_chan=self.zmqServer.device_info['channel_nums'], @@ -68,8 +59,7 @@ class Decoder_main(threading.Thread): ) # 注册滤波结果回调(示例:打印数据形状) - self.sliding_filter.set_result_callback(self.zmqServer.send_filtered_data) - + self.sliding_filter.filter_result_callback = self.zmqServer.send_filtered_data def is_valid_signal(self, data, threshold=1e5): # 判断当前信号是否为有效信号 # data: (chans, samples) @@ -196,10 +186,11 @@ class Decoder_main(threading.Thread): while self.Runing: # 当滤波数据大于5秒时,启动滤波线程 if self.zmqServer.filterBuffer.GetDataLenCount() > self.device_info['sample_rate'] * 5: + algo_log("启动滤波线程", level="DEBUG") self.sliding_filter.start() if self.zmqServer.decoder_switch or self.zmqServer.changeTarget: - print(f"Decoder_class Switch Detected: {self.zmqServer.decoder_class}") + algo_log(f"Decoder_class Switch Detected: {self.zmqServer.decoder_class}", level="DEBUG") self.zmqServer.decoder_switch = False self.zmqServer.changeTarget = False self.reset_state() # 切换前先统一清理旧状态 @@ -210,7 +201,6 @@ class Decoder_main(threading.Thread): # self.zmqClient.send_to_all('sync', self.zmqClient.state) self.zmqServer.state_mode = 'rest' - # --- 取数优先:先执行 decoder(消费环形缓冲),再处理 plot/report 等重负载 --- try: if self.decoder_class == 'ssvep' or self.decoder_class == 'pvs': self.decoder_SSVEP() diff --git a/Zmq/filterProcess.py b/Zmq/filterProcess.py index 402a971..7bafc6b 100644 --- a/Zmq/filterProcess.py +++ b/Zmq/filterProcess.py @@ -102,8 +102,7 @@ class SlidingFilter(threading.Thread): n_chan=66, srate=250, window_sec=3, - step_sec=0.2, # 200ms滑动步长 - packet_size=5 + step_sec=0.2 ): super().__init__(daemon=True) # 核心参数 @@ -114,7 +113,6 @@ class SlidingFilter(threading.Thread): self.step_sec = step_sec # 200ms滑动步长 self.window_size = int(srate * window_sec) # 3秒点数:250*3=750 self.step_size = int(srate * step_sec) # 200ms点数:250*0.2=50 - self.packet_size = packet_size # 关联ZMQServer的环形缓存(解耦:仅依赖接口) self.ring_buffer = ring_buffer @@ -194,6 +192,17 @@ class SlidingFilter(threading.Thread): self.filter_result_callback = callback def stop(self): - """停止滤波线程""" + """停止滤波线程(安全版)""" + # 1. 先设置停止标志(Event.clear()是线程安全的) self.running.clear() - self.join(timeout=1) + + # 2. 核心修复:只有线程已启动且正在运行时才调用join + if self.is_alive(): + # 等待线程正常退出,最多1秒 + self.join(timeout=1) + # 超时未退出时打印警告,便于排查问题 + if self.is_alive(): + algo_log("警告:滤波线程在1秒内未正常退出,可能存在阻塞操作", level="WARNING") + + # 3. 无论线程是否启动,都打印停止日志 + algo_log("滤波线程已停止") diff --git a/Zmq/zmqServer.py b/Zmq/zmqServer.py index 34e980c..d47e5e2 100644 --- a/Zmq/zmqServer.py +++ b/Zmq/zmqServer.py @@ -268,7 +268,7 @@ class zmqServer(threading.Thread): # -------------------------- 数据端口消息处理 -------------------------- def _handle_data_message(self, frames): """处理8100端口二进制脑电数据消息""" - algo_log(f"收到数据帧,总帧数:{len(frames)}", level="DEBUG", record_once=True) + algo_log(f"收到数据帧,总帧数:{len(frames)}", level="DEBUG", record_once=False) # 然后再进行解析 if len(frames) == 4: # 你的上位机格式