diff --git a/Zmq/filterProcess.py b/Zmq/filterProcess.py index 1486fa1..58d8209 100644 --- a/Zmq/filterProcess.py +++ b/Zmq/filterProcess.py @@ -10,34 +10,27 @@ from logs.log import algo_log class FilterRingBuffer: def __init__(self, n_chan, n_points): - """ - 初始化纯数据环形缓存(线程安全) - :param n_chan: 通道数 - :param n_points: 总缓存点数(与paradigmRingBuffer参数完全一致) - """ self.n_chan = n_chan self.n_points = n_points self.buffer = np.zeros((n_chan, n_points), dtype=np.float64) self.current_ptr = 0 self.total_samples = 0 self.lock = threading.Lock() # 仅保护元数据 + self.has_new_data = False def appendBuffer(self, data): - """ - 追加数据到缓存(与paradigmRingBuffer接口一致) - :param data: 输入数据,shape=(n_chan, n_samples) - """ n = data.shape[1] if n == 0: return - # -------- 第一步:仅加锁读取/更新元数据(持锁极短)-------- + # 仅加锁读取/更新元数据 with self.lock: old_ptr = self.current_ptr new_ptr = (old_ptr + n) % self.n_points new_total = min(self.total_samples + n, self.n_points) + self.has_new_data = True - # -------- 第二步:数组写入(耗时操作,移出锁外)-------- + # 数组写入(耗时操作,移出锁外) write_end = old_ptr + n if write_end <= self.n_points: self.buffer[:, old_ptr:write_end] = data @@ -46,26 +39,30 @@ class FilterRingBuffer: self.buffer[:, old_ptr:] = data[:, :split] self.buffer[:, :write_end - self.n_points] = data[:, split:] - # -------- 第三步:再次加锁更新最终元数据 -------- + # 再次加锁更新最终元数据 with self.lock: self.current_ptr = new_ptr self.total_samples = new_total + # ========== 新增:获取&清空新数据标记的方法 ========== + def check_and_clear_new_data(self): + """检查是否有新数据,并一次性清空标记(消费后重置)""" + with self.lock: + flag = self.has_new_data + if flag: + self.has_new_data = False + return flag + def getData(self, count): - """ - 从最新位置向前读取count个点(环形读取) - 核心逻辑:current_ptr是下一个写入位置 → 最新数据在current_ptr之前 - :param count: 读取点数 - :return: np.ndarray, shape=(n_chan, count) - """ - # -------- 第一步:加锁获取最新元数据(持锁极短)-------- + # 加锁获取最新元数据 with self.lock: count = min(count, self.total_samples) if count == 0: return np.zeros((self.n_chan, 0)) end = self.current_ptr start = end - count - + + # 数据读取、切片、拼接(无锁) if start >= 0: res = self.buffer[:, start:end].copy() else: @@ -89,6 +86,7 @@ class FilterRingBuffer: self.buffer.fill(0.0) self.current_ptr = 0 self.total_samples = 0 + self.has_new_data = False # 重置时清空新数据标记 # ----------------------------------------------------------------------------- # 2. 独立滑动滤波类(仅负责滤波业务逻辑,不关心缓存实现) @@ -152,38 +150,35 @@ class SlidingFilter(threading.Thread): def run(self): """线程主逻辑:精确200ms触发一次滤波""" - # 精确定时核心:基于perf_counter计算下一次执行时间,补偿sleep误差 interval = self.step_sec # 200ms = 0.2秒 next_run_time = time.perf_counter() - while self.running.is_set(): - # 1. 等待到下一次执行时间(精确定时) + # 1. 精确定时等待 current_time = time.perf_counter() if current_time < next_run_time: time.sleep(next_run_time - current_time) - next_run_time += interval # 补偿:下次执行时间基于上一次目标时间 + next_run_time += interval else: - # 若超时(如滤波耗时超过200ms),重置下一次时间(避免累积误差) algo_log("滤波耗时超过200ms,定时偏移", level='debug') next_run_time = time.perf_counter() + interval - # 2. 执行滤波逻辑 + # ========== 新增核心判断:无新数据则直接跳过 ========== + if not self.ring_buffer.check_and_clear_new_data(): + # 无新数据,不执行滤波、不发送数据 + continue + + # 2. 有新数据,才执行原有滤波逻辑 try: - # 获取最新的3秒窗口数据 window_data = self.ring_buffer.get_latest_n_points(self.window_size) - algo_log(f"获取到{window_data.shape}数据", level='debug') if window_data is None: algo_log(f"缓存数据不足,当前缓存{self.ring_buffer.GetDataLenCount()}点,需{self.window_size}点", level='debug') continue - - # 滤波并提取无边界效应的200ms数据 + filtered_data = self._filter_window_data(window_data) algo_log(f"滤波后{filtered_data.shape}数据", level='debug') - - # 回调返回结果(外部可处理) + if self.filter_result_callback is not None: - self.filter_result_callback(filtered_data[:64, :]) # 只发送前64通道数据 - + self.filter_result_callback(filtered_data[:64, :]) except Exception as e: algo_log(f"滤波执行异常: {e}", level='error')