update
This commit is contained in:
@@ -10,34 +10,27 @@ from logs.log import algo_log
|
|||||||
|
|
||||||
class FilterRingBuffer:
|
class FilterRingBuffer:
|
||||||
def __init__(self, n_chan, n_points):
|
def __init__(self, n_chan, n_points):
|
||||||
"""
|
|
||||||
初始化纯数据环形缓存(线程安全)
|
|
||||||
:param n_chan: 通道数
|
|
||||||
:param n_points: 总缓存点数(与paradigmRingBuffer参数完全一致)
|
|
||||||
"""
|
|
||||||
self.n_chan = n_chan
|
self.n_chan = n_chan
|
||||||
self.n_points = n_points
|
self.n_points = n_points
|
||||||
self.buffer = np.zeros((n_chan, n_points), dtype=np.float64)
|
self.buffer = np.zeros((n_chan, n_points), dtype=np.float64)
|
||||||
self.current_ptr = 0
|
self.current_ptr = 0
|
||||||
self.total_samples = 0
|
self.total_samples = 0
|
||||||
self.lock = threading.Lock() # 仅保护元数据
|
self.lock = threading.Lock() # 仅保护元数据
|
||||||
|
self.has_new_data = False
|
||||||
|
|
||||||
def appendBuffer(self, data):
|
def appendBuffer(self, data):
|
||||||
"""
|
|
||||||
追加数据到缓存(与paradigmRingBuffer接口一致)
|
|
||||||
:param data: 输入数据,shape=(n_chan, n_samples)
|
|
||||||
"""
|
|
||||||
n = data.shape[1]
|
n = data.shape[1]
|
||||||
if n == 0:
|
if n == 0:
|
||||||
return
|
return
|
||||||
|
|
||||||
# -------- 第一步:仅加锁读取/更新元数据(持锁极短)--------
|
# 仅加锁读取/更新元数据
|
||||||
with self.lock:
|
with self.lock:
|
||||||
old_ptr = self.current_ptr
|
old_ptr = self.current_ptr
|
||||||
new_ptr = (old_ptr + n) % self.n_points
|
new_ptr = (old_ptr + n) % self.n_points
|
||||||
new_total = min(self.total_samples + n, self.n_points)
|
new_total = min(self.total_samples + n, self.n_points)
|
||||||
|
self.has_new_data = True
|
||||||
|
|
||||||
# -------- 第二步:数组写入(耗时操作,移出锁外)--------
|
# 数组写入(耗时操作,移出锁外)
|
||||||
write_end = old_ptr + n
|
write_end = old_ptr + n
|
||||||
if write_end <= self.n_points:
|
if write_end <= self.n_points:
|
||||||
self.buffer[:, old_ptr:write_end] = data
|
self.buffer[:, old_ptr:write_end] = data
|
||||||
@@ -46,26 +39,30 @@ class FilterRingBuffer:
|
|||||||
self.buffer[:, old_ptr:] = data[:, :split]
|
self.buffer[:, old_ptr:] = data[:, :split]
|
||||||
self.buffer[:, :write_end - self.n_points] = data[:, split:]
|
self.buffer[:, :write_end - self.n_points] = data[:, split:]
|
||||||
|
|
||||||
# -------- 第三步:再次加锁更新最终元数据 --------
|
# 再次加锁更新最终元数据
|
||||||
with self.lock:
|
with self.lock:
|
||||||
self.current_ptr = new_ptr
|
self.current_ptr = new_ptr
|
||||||
self.total_samples = new_total
|
self.total_samples = new_total
|
||||||
|
|
||||||
|
# ========== 新增:获取&清空新数据标记的方法 ==========
|
||||||
|
def check_and_clear_new_data(self):
|
||||||
|
"""检查是否有新数据,并一次性清空标记(消费后重置)"""
|
||||||
|
with self.lock:
|
||||||
|
flag = self.has_new_data
|
||||||
|
if flag:
|
||||||
|
self.has_new_data = False
|
||||||
|
return flag
|
||||||
|
|
||||||
def getData(self, count):
|
def getData(self, count):
|
||||||
"""
|
# 加锁获取最新元数据
|
||||||
从最新位置向前读取count个点(环形读取)
|
|
||||||
核心逻辑:current_ptr是下一个写入位置 → 最新数据在current_ptr之前
|
|
||||||
:param count: 读取点数
|
|
||||||
:return: np.ndarray, shape=(n_chan, count)
|
|
||||||
"""
|
|
||||||
# -------- 第一步:加锁获取最新元数据(持锁极短)--------
|
|
||||||
with self.lock:
|
with self.lock:
|
||||||
count = min(count, self.total_samples)
|
count = min(count, self.total_samples)
|
||||||
if count == 0:
|
if count == 0:
|
||||||
return np.zeros((self.n_chan, 0))
|
return np.zeros((self.n_chan, 0))
|
||||||
end = self.current_ptr
|
end = self.current_ptr
|
||||||
start = end - count
|
start = end - count
|
||||||
|
|
||||||
|
# 数据读取、切片、拼接(无锁)
|
||||||
if start >= 0:
|
if start >= 0:
|
||||||
res = self.buffer[:, start:end].copy()
|
res = self.buffer[:, start:end].copy()
|
||||||
else:
|
else:
|
||||||
@@ -89,6 +86,7 @@ class FilterRingBuffer:
|
|||||||
self.buffer.fill(0.0)
|
self.buffer.fill(0.0)
|
||||||
self.current_ptr = 0
|
self.current_ptr = 0
|
||||||
self.total_samples = 0
|
self.total_samples = 0
|
||||||
|
self.has_new_data = False # 重置时清空新数据标记
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
# 2. 独立滑动滤波类(仅负责滤波业务逻辑,不关心缓存实现)
|
# 2. 独立滑动滤波类(仅负责滤波业务逻辑,不关心缓存实现)
|
||||||
@@ -152,38 +150,35 @@ class SlidingFilter(threading.Thread):
|
|||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
"""线程主逻辑:精确200ms触发一次滤波"""
|
"""线程主逻辑:精确200ms触发一次滤波"""
|
||||||
# 精确定时核心:基于perf_counter计算下一次执行时间,补偿sleep误差
|
|
||||||
interval = self.step_sec # 200ms = 0.2秒
|
interval = self.step_sec # 200ms = 0.2秒
|
||||||
next_run_time = time.perf_counter()
|
next_run_time = time.perf_counter()
|
||||||
|
|
||||||
while self.running.is_set():
|
while self.running.is_set():
|
||||||
# 1. 等待到下一次执行时间(精确定时)
|
# 1. 精确定时等待
|
||||||
current_time = time.perf_counter()
|
current_time = time.perf_counter()
|
||||||
if current_time < next_run_time:
|
if current_time < next_run_time:
|
||||||
time.sleep(next_run_time - current_time)
|
time.sleep(next_run_time - current_time)
|
||||||
next_run_time += interval # 补偿:下次执行时间基于上一次目标时间
|
next_run_time += interval
|
||||||
else:
|
else:
|
||||||
# 若超时(如滤波耗时超过200ms),重置下一次时间(避免累积误差)
|
|
||||||
algo_log("滤波耗时超过200ms,定时偏移", level='debug')
|
algo_log("滤波耗时超过200ms,定时偏移", level='debug')
|
||||||
next_run_time = time.perf_counter() + interval
|
next_run_time = time.perf_counter() + interval
|
||||||
|
|
||||||
# 2. 执行滤波逻辑
|
# ========== 新增核心判断:无新数据则直接跳过 ==========
|
||||||
|
if not self.ring_buffer.check_and_clear_new_data():
|
||||||
|
# 无新数据,不执行滤波、不发送数据
|
||||||
|
continue
|
||||||
|
|
||||||
|
# 2. 有新数据,才执行原有滤波逻辑
|
||||||
try:
|
try:
|
||||||
# 获取最新的3秒窗口数据
|
|
||||||
window_data = self.ring_buffer.get_latest_n_points(self.window_size)
|
window_data = self.ring_buffer.get_latest_n_points(self.window_size)
|
||||||
algo_log(f"获取到{window_data.shape}数据", level='debug')
|
|
||||||
if window_data is None:
|
if window_data is None:
|
||||||
algo_log(f"缓存数据不足,当前缓存{self.ring_buffer.GetDataLenCount()}点,需{self.window_size}点", level='debug')
|
algo_log(f"缓存数据不足,当前缓存{self.ring_buffer.GetDataLenCount()}点,需{self.window_size}点", level='debug')
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# 滤波并提取无边界效应的200ms数据
|
|
||||||
filtered_data = self._filter_window_data(window_data)
|
filtered_data = self._filter_window_data(window_data)
|
||||||
algo_log(f"滤波后{filtered_data.shape}数据", level='debug')
|
algo_log(f"滤波后{filtered_data.shape}数据", level='debug')
|
||||||
|
|
||||||
# 回调返回结果(外部可处理)
|
|
||||||
if self.filter_result_callback is not None:
|
if self.filter_result_callback is not None:
|
||||||
self.filter_result_callback(filtered_data[:64, :]) # 只发送前64通道数据
|
self.filter_result_callback(filtered_data[:64, :])
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
algo_log(f"滤波执行异常: {e}", level='error')
|
algo_log(f"滤波执行异常: {e}", level='error')
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user