From 1bbe84eb56e12dad79282a7bf2023d600bc00e3d Mon Sep 17 00:00:00 2001 From: Ivey Song Date: Wed, 10 Jun 2026 17:53:01 +0800 Subject: [PATCH] beta psd return --- Tools/beta_calculate.py | 2 +- Zmq/filterProcess.py | 35 +++++++++++++++++++++++++++---- Zmq/zmqServer.py | 10 ++++++--- upperHost_stimmock/MI_headless.py | 5 +++-- 4 files changed, 42 insertions(+), 10 deletions(-) diff --git a/Tools/beta_calculate.py b/Tools/beta_calculate.py index f77f819..7b58a48 100644 --- a/Tools/beta_calculate.py +++ b/Tools/beta_calculate.py @@ -20,7 +20,7 @@ class Beta_Calculate(): alpha_psd = np.sum(self.band_psd(freqs, psd, (8, 13))) theta_psd = np.sum(self.band_psd(freqs, psd, (4, 8))) - print(f"[功率] β={beta_psd:.2f} | α={alpha_psd:.2f} | θ={theta_psd:.2f}") + # print(f"[功率] β={beta_psd:.2f} | α={alpha_psd:.2f} | θ={theta_psd:.2f}") return beta_psd, alpha_psd, theta_psd diff --git a/Zmq/filterProcess.py b/Zmq/filterProcess.py index a898049..a829233 100644 --- a/Zmq/filterProcess.py +++ b/Zmq/filterProcess.py @@ -7,6 +7,10 @@ import time import threading from scipy import signal from logs.log import algo_log +import sys +import os +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from Tools.beta_calculate import Beta_Calculate class FilterRingBuffer: def __init__(self, n_chan, n_points): @@ -117,6 +121,14 @@ class SlidingFilter(threading.Thread): self.running.set() # 滤波结果回调(外部可注册,获取滤波后的数据) self.filter_result_callback = None + # beta_psd 广播回调(外部注册,用于走 zmqServer 8099 端口发送) + self.beta_broadcast_callback = None + + # beta 计算器(Fp1/Fp2 通道,索引 0/1) + self._beta_calc = Beta_Calculate(Threshold_value_low=0, Threshold_value_high=0, fs=srate) + # beta 每秒触发计数(200ms步长,5次 = 1s) + self._beta_step_counter = 0 + self._beta_steps_per_second = max(1, int(round(1.0 / step_sec))) # 5 # 预计算滤波器系数(仅执行一次) self._init_filters() @@ -135,7 +147,7 @@ class SlidingFilter(threading.Thread): self.a_bp = np.array([1.0]) def _filter_window_data(self, window_data): - """对3秒窗口数据执行滤波,返回无边界效应的200ms数据""" + """对3秒窗口数据执行滤波,返回 (无边界效应的200ms数据, 完整3s滤波数据)""" # 零相位滤波(无延迟,无边界效应) filtered = window_data - np.mean(window_data, axis=-1, keepdims=True) filtered = signal.filtfilt(self.b_notch, self.a_notch, filtered, axis=-1) @@ -146,7 +158,7 @@ class SlidingFilter(threading.Thread): start_idx = self.window_size - 2 * self.step_size end_idx = self.window_size - self.step_size output_data = filtered[:, start_idx:end_idx].copy() - return output_data + return output_data, filtered def run(self): """线程主逻辑:精确200ms触发一次滤波""" @@ -174,9 +186,24 @@ class SlidingFilter(threading.Thread): algo_log(f"缓存数据不足,当前缓存{self.ring_buffer.GetDataLenCount()}点,需{self.window_size}点", level='debug') continue - filtered_data = self._filter_window_data(window_data) + filtered_data, filtered_full = self._filter_window_data(window_data) # algo_log(f"滤波后{filtered_data.shape}数据", level='debug') - + + # ========== beta_psd 每秒计算一次(Fp1/Fp2,通道索引 0/1)========== + self._beta_step_counter += 1 + if self._beta_step_counter >= self._beta_steps_per_second: + self._beta_step_counter = 0 + try: + # 直接使用已滤波的完整3s数据的前两通道(Fp1/Fp2) + filter_betadata = filtered_full[:2, :] # shape (2, 750) + beta_psd, _, _ = self._beta_calc.calculate_all( + filter_betadata, fs=self.srate, nperseg=min(self.window_size, filter_betadata.shape[1]) + ) + if self.beta_broadcast_callback is not None: + self.beta_broadcast_callback(round(float(beta_psd), 3)) + except Exception as be: + algo_log(f"beta_psd计算异常: {be}", level='error') + if self.filter_result_callback is not None: self.filter_result_callback(filtered_data[:64, :]) except Exception as e: diff --git a/Zmq/zmqServer.py b/Zmq/zmqServer.py index a21bd33..ac133c2 100644 --- a/Zmq/zmqServer.py +++ b/Zmq/zmqServer.py @@ -405,10 +405,14 @@ class zmqServer(threading.Thread): frames = self.cmd_socket.recv_multipart() self._handle_cmd_message(frames) - # 处理8100数据端口消息 + # 处理8100数据端口消息(排空积压,消除标签延迟) if self.data_socket in socks and socks[self.data_socket] == zmq.POLLIN: - frames = self.data_socket.recv_multipart() - self._handle_data_message(frames) + while True: + try: + frames = self.data_socket.recv_multipart(zmq.NOBLOCK) + self._handle_data_message(frames) + except zmq.Again: + break except Exception as e: algo_log(f"服务器主循环异常: {e}", level="ERROR") diff --git a/upperHost_stimmock/MI_headless.py b/upperHost_stimmock/MI_headless.py index 4609f81..894ed48 100644 --- a/upperHost_stimmock/MI_headless.py +++ b/upperHost_stimmock/MI_headless.py @@ -170,6 +170,7 @@ def run_headless(): time.sleep(1) # 等待连接建立 client.send_data('decoderClass', 'mi') + time.sleep(4) # 等待 zmqServer 排空启动积压包(datamock 提前连接会积压 ~3s 数据) # MI_IntervalEpoch = [0.5, 4.5],trial时长 = 4.5-0.5 = 4.0s _mi_iv = ast.literal_eval(IniRead('system', 'MI_IntervalEpoch')) # [0.5, 4.5] @@ -222,7 +223,7 @@ def run_headless(): time.sleep(0.5) # ding 提示后等待 client.send_data('train', 0) - time.sleep(train_time + epoch_wait) # 等待刺激时间 + epoch 完成时间 + time.sleep(train_time + 0.2) # 等待刺激时间 + epoch 完成时间 trained += 1 client.send_data('rest', 0) @@ -231,7 +232,7 @@ def run_headless(): # 空闲态样本采集(train 1,label=2) print(f"\n[Train] 空闲态采集 (train 1) trained={trained}") client.send_data('train', 1) - time.sleep(train_time + epoch_wait) # 等待刺激时间 + epoch 完成时间 + time.sleep(train_time + 0.2) # 等待刺激时间 + epoch 完成时间 trained += 1 client.send_data('rest', 0)