diff --git a/filter_test.py b/filter_test.py index 1d75e8e..84d43e3 100644 --- a/filter_test.py +++ b/filter_test.py @@ -1,11 +1,13 @@ # -*- coding: utf-8 -*- """ -脑电滤波服务 8100端口测试工具【最终修复版】 -修复:1. Matplotlib中文字体乱码 2. ZMQ双连接收不到数据问题 -通信规范: -上位机 -> 服务端:send_multipart([client_id, b"", data_buf]) 共3帧 -服务端 recv_multipart() 帧长度 = 3 -时序:每20ms(0.02s)发送一包 (5,66),服务端200ms回传 (50,64) +脑电滤波服务 8100端口测试工具【统计逻辑专项优化版】 +优化点: +1. 5秒预热(250个发包),预热结束后才启动丢包/数据统计 +2. 业务比例:0.02s发1包,200ms收1包 → 每 10 个发包对应 1 个回包 +3. 通道校验:发送(5,66) 仅对比前64通道,接收(50,64)全通道比对 +4. 区分:全局总包数 / 有效统计区间包数、理论收包数、实际收包数、丢包数、丢包率 +5. 新增64通道整体数据均值/极值比对,校验数据有效性 +通信规范:send_multipart([client_id, b"", data_buf]) 三帧报文,服务端 recv_multipart 长度=3 """ import sys import time @@ -20,33 +22,41 @@ from matplotlib.animation import FuncAnimation # ===================== 全局前置:修复Matplotlib中文字体 & 负号显示 ===================== plt.rcParams["font.sans-serif"] = ["SimHei", "Microsoft YaHei", "WenQuanYi Micro Hei"] -plt.rcParams["axes.unicode_minus"] = False # 解决负号显示异常 +plt.rcParams["axes.unicode_minus"] = False -# ===================== 【1. 全局可配置参数区】 ===================== +# ===================== 【1. 全局业务固定参数(核心统计规则)】 ===================== # ZMQ 服务端配置 -ZMQ_SERVER_IP = "127.0.0.1" +ZMQ_SERVER_IP = "192.168.254.102" ZMQ_SERVER_PORT = 8100 ZMQ_SOCKET_TIMEOUT = 3000 # 套接字超时(ms) -POLL_TIMEOUT = 10 # Poll轮询超时(ms),不影响发包时序 +POLL_TIMEOUT = 10 # Poll轮询超时(ms) -# 数据报文配置(严格对齐业务) -PKG_SEND_SHAPE = (5, 66) # 发送包 shape (点数, 总通道) -PKG_RECV_SHAPE = (50, 64) # 滤波回包 shape (点数, 脑电通道) -SEND_INTERVAL = 0.02 # 上位机发包间隔 20ms -SAMPLE_RATE = 250 # 采样率 Hz +# 时序 & 统计核心规则(严格对齐现场业务) +SEND_INTERVAL = 0.02 # 上位机发包间隔:20ms/包 +RECV_INTERVAL = 0.2 # 服务端回包间隔:200ms/包 +PREHEAT_SECONDS = 5.0 # 滤波缓存预热时长:5秒 +# 计算:预热需要的发包总数 = 预热时长 / 单包发送间隔 +PREHEAT_SEND_PACKS = int(PREHEAT_SECONDS / SEND_INTERVAL) # 5 / 0.02 = 250 包 +# 收发包比例:每多少个发包对应1个回包 +PACK_RATIO = int(RECV_INTERVAL / SEND_INTERVAL) # 0.2 / 0.02 = 10 -# 通道定义 -CH_EEG = 64 +# 数据报文形状 +PKG_SEND_SHAPE = (5, 66) # 发送包 (点数, 总通道) +PKG_RECV_SHAPE = (50, 64) # 回包 (点数, 有效脑电通道) +SAMPLE_RATE = 250 + +# 通道定义(对比仅使用前64路脑电通道) +CH_EEG_VALID = 64 # 共同对比通道数:0~63 CH_EVENT = 64 CH_RESERVED = 65 -# ZMQ 三帧报文固定字段(和你服务端代码完全一致) +# ZMQ 三帧报文固定字段 CLIENT_ID = b"test_client_001" EMPTY_FRAME = b"" -# 仿真信号配置(可自由调参测试滤波) +# 仿真信号配置 TARGET_CHANNEL = 0 -SIGNAL_FREQ_LIST = [10.0, 22.0] +SIGNAL_FREQ_LIST = [3, 10, 36] SIGNAL_AMP = 1.8 NOISE_GAUSSIAN_AMP = 0.4 NOISE_POWER50_AMP = 0.3 @@ -64,21 +74,32 @@ MAX_RUN_SECONDS = None ENABLE_RECONNECT = True PRINT_STAT_INTERVAL = 5.0 -# ===================== 【2. 全局变量 & 线程安全】 ===================== +# ===================== 【2. 全局变量 + 统计结构体(重构统计逻辑)】 ===================== g_running = threading.Event() g_running.set() data_lock = threading.Lock() -# 绘图数据缓冲区 +# 绘图缓冲区 raw_data_buf = deque(maxlen=MAX_PLOT_POINTS) filt_data_buf = deque(maxlen=MAX_PLOT_POINTS) -# 运行统计 +# ===================== 全新统计变量(区分预热/正式统计) ===================== stat = { - "send_cnt": 0, - "recv_cnt": 0, + # 全局总包数(包含预热包) + "total_send": 0, + "total_recv": 0, + + # 有效统计区间(预热250包之后) + "valid_send": 0, # 有效发包数 + "valid_recv": 0, # 有效收包数 + "theo_recv": 0, # 理论应收到包数 = valid_send // PACK_RATIO + + # 运行时间 "start_time": time.perf_counter(), - "last_print_time": time.perf_counter() + "last_print_time": time.perf_counter(), + + # 数据校验缓存:保存最新一包原始64通道数据,用于和回包比对 + "latest_raw_64ch": None } # ===================== 【3. 日志配置】 ===================== @@ -95,15 +116,15 @@ logger = init_logger() # ===================== 【4. 仿真脑电数据生成 (5,66)】 ===================== def generate_eeg_packet(pkt_idx: int) -> np.ndarray: - """生成单包 (5,66) 仿真数据:脑电+噪声+工频+事件通道+保留通道""" + """生成单包 (5,66) 仿真数据""" n_point, n_chan = PKG_SEND_SHAPE base_t = pkt_idx * n_point / SAMPLE_RATE t_arr = base_t + np.arange(n_point) / SAMPLE_RATE data = np.zeros((n_point, n_chan), dtype=np.float64) - # 64路脑电:多频信号 + 50Hz工频 + 高斯白噪声 - for ch in range(CH_EEG): + # 64路脑电信号 + for ch in range(CH_EEG_VALID): sig = 0.0 for freq in SIGNAL_FREQ_LIST: sig += SIGNAL_AMP * np.sin(2 * np.pi * freq * t_arr) @@ -111,58 +132,51 @@ def generate_eeg_packet(pkt_idx: int) -> np.ndarray: sig += NOISE_GAUSSIAN_AMP * np.random.randn(n_point) data[:, ch] = sig - # 事件通道、保留通道赋值 + # 事件通道、保留通道 data[:, CH_EVENT] = EVENT_LABEL_VAL data[:, CH_RESERVED] = RESERVED_VAL return data -# ===================== 【5. 核心修复:单DEALER连接 + Poller 同时收发】 ===================== +# ===================== 【5. ZMQ 核心IO线程(单连接+Poller,保留原有通信逻辑)】 ===================== def zmq_io_thread(): - """ - 唯一ZMQ工作线程:单个DEALER连接,同时发包+收包(对齐真实上位机) - 使用 Poller 多路复用,避免阻塞、超时报错 - """ context = zmq.Context() pkt_index = 0 send_interval = SEND_INTERVAL + logger.info(f"滤波预热配置:{PREHEAT_SECONDS}秒 / {PREHEAT_SEND_PACKS} 个发包后开始统计") + logger.info(f"收发比例:每 {PACK_RATIO} 个发包 → 1 个滤波回包") + while g_running.is_set(): try: - # 新建 DEALER 套接字(全局唯一连接) sock = context.socket(zmq.DEALER) sock.setsockopt(zmq.RCVTIMEO, ZMQ_SOCKET_TIMEOUT) sock.setsockopt(zmq.SNDTIMEO, ZMQ_SOCKET_TIMEOUT) sock.connect(f"tcp://{ZMQ_SERVER_IP}:{ZMQ_SERVER_PORT}") logger.info(f"ZMQ 连接成功 -> {ZMQ_SERVER_IP}:{ZMQ_SERVER_PORT}") - # 注册Poller:监听当前套接字的可读事件 poller = zmq.Poller() poller.register(sock, zmq.POLLIN) - - # 精准发包计时(消除sleep漂移) next_send_ts = time.perf_counter() while g_running.is_set(): - # 1. 运行时长限制判断 + # 全局运行时长限制 if MAX_RUN_SECONDS is not None: run_sec = time.perf_counter() - stat["start_time"] if run_sec > MAX_RUN_SECONDS: logger.info(f"已到达设定运行时长 {MAX_RUN_SECONDS}s,停止任务") return - # 2. Poll 轮询:有数据就接收,无数据继续执行发包逻辑 + # ========== 1. 轮询接收服务端回包 ========== socks_ready = dict(poller.poll(POLL_TIMEOUT)) if sock in socks_ready: - # ========== 接收服务端回包 (multipart) ========== frames = sock.recv_multipart() if not frames: continue - # 取最后一帧为有效滤波数据 recv_bytes = frames[-1] if not recv_bytes: continue - # 解析为 (50,64) float64 + # 解析回包 (50,64) filt_data = np.frombuffer(recv_bytes, dtype=np.float64) expect_size = PKG_RECV_SHAPE[0] * PKG_RECV_SHAPE[1] if filt_data.size != expect_size: @@ -170,42 +184,89 @@ def zmq_io_thread(): continue filt_data = filt_data.reshape(PKG_RECV_SHAPE) - # 统计 + 写入绘图缓冲区 - stat["recv_cnt"] += 1 + # 全局收包计数 + stat["total_recv"] += 1 + + # 仅预热完成后,计入有效统计收包 + if stat["total_send"] > PREHEAT_SEND_PACKS: + stat["valid_recv"] += 1 + + # 写入绘图缓冲区 with data_lock: filt_data_buf.extend(filt_data[:, TARGET_CHANNEL]) - # 定时打印运行状态 - now = time.perf_counter() - if now - stat["last_print_time"] > PRINT_STAT_INTERVAL: - run_sec = now - stat["start_time"] - loss_rate = (stat["send_cnt"] - stat["recv_cnt"]) / stat["send_cnt"] * 100 if stat["send_cnt"] > 0 else 0.0 - logger.info( - f"运行:{run_sec:.1f}s | 发包:{stat['send_cnt']} | 收包:{stat['recv_cnt']} | 丢包率:{loss_rate:.2f}%" + # ---------- 新增:64通道数据比对(发包前64通道 <-> 回包64通道) ---------- + raw_64ch = stat["latest_raw_64ch"] + if raw_64ch is not None: + raw_mean = np.mean(raw_64ch) + filt_mean = np.mean(filt_data) + raw_amp = np.max(np.abs(raw_64ch)) + filt_amp = np.max(np.abs(filt_data)) + logger.debug( + f"【通道数据比对】原始64通道均值:{raw_mean:.4f} 幅值:{raw_amp:.4f} | " + f"滤波后均值:{filt_mean:.4f} 幅值:{filt_amp:.4f}" ) - stat["last_print_time"] = now - # 3. 精准定时发包(严格20ms间隔) + # ========== 2. 精准定时发送数据包 ========== current_ts = time.perf_counter() if current_ts >= next_send_ts: - # 生成 (5,66) 仿真数据包 + # 生成(5,66)仿真包 pkt_data = generate_eeg_packet(pkt_index) pkt_index += 1 send_buf = pkt_data.tobytes() - # ========== 三帧Multipart发送(和你服务端代码完全一致) ========== + # 标准三帧Multipart发送 sock.send_multipart([CLIENT_ID, EMPTY_FRAME, send_buf]) - # 统计 + 写入原始数据缓冲区 - stat["send_cnt"] += 1 + # ---------- 发包计数逻辑(核心优化:预热区分) ---------- + stat["total_send"] += 1 + # 预热完成后,计入有效发包 + if stat["total_send"] > PREHEAT_SEND_PACKS: + stat["valid_send"] += 1 + # 计算理论应收包数 + stat["theo_recv"] = stat["valid_send"] // PACK_RATIO + + # 缓存当前包前64通道,用于后续数据比对 + stat["latest_raw_64ch"] = pkt_data[:, :CH_EEG_VALID] + + # 绘图缓冲区(单通道波形) with data_lock: raw_data_buf.extend(pkt_data[:, TARGET_CHANNEL]) - # 更新下一次发包时间戳 + # 更新下一次发包时间 next_send_ts += send_interval + # ========== 3. 定时打印统计信息(区分预热/正式统计) ========== + now = time.perf_counter() + if now - stat["last_print_time"] > PRINT_STAT_INTERVAL: + run_sec = now - stat["start_time"] + total_send = stat["total_send"] + total_recv = stat["total_recv"] + + # 分支1:仍在预热阶段 + if total_send <= PREHEAT_SEND_PACKS: + remain = PREHEAT_SEND_PACKS - total_send + logger.info( + f"[预热中] 运行:{run_sec:.1f}s | 已发包:{total_send}/{PREHEAT_SEND_PACKS} | " + f"剩余预热包:{remain} | 暂不统计丢包" + ) + # 分支2:预热完成,进入正式统计 + else: + v_send = stat["valid_send"] + v_recv = stat["valid_recv"] + t_recv = stat["theo_recv"] + loss_cnt = t_recv - v_recv + loss_rate = (loss_cnt / t_recv * 100) if t_recv > 0 else 0.0 + + logger.info( + f"[正式统计] 运行:{run_sec:.1f}s | " + f"全局总包: 发{total_send}/收{total_recv} | " + f"有效区间: 发{v_send}/应收{t_recv}/实收{v_recv} | " + f"丢包数:{loss_cnt} | 丢包率:{loss_rate:.2f}%" + ) + stat["last_print_time"] = now + except zmq.ZMQError as e: - # 区分正常超时 和 网络异常 if e.errno == zmq.EAGAIN: continue logger.warning(f"ZMQ 连接异常: {e}") @@ -222,7 +283,7 @@ def zmq_io_thread(): context.term() logger.info("ZMQ IO 线程已退出") -# ===================== 【6. 可视化绘图(无逻辑改动,已前置修复字体)】 ===================== +# ===================== 【6. 可视化绘图(无改动)】 ===================== def init_plot(): fig = plt.figure(figsize=(14, 9)) fig.suptitle(f"脑电滤波测试 | 观测通道: {TARGET_CHANNEL}", fontsize=14) @@ -264,7 +325,6 @@ def update_plot(frame, lines, axes): raw_data = list(raw_data_buf) filt_data = list(filt_data_buf) - # 时域波形 if raw_data: x_raw = np.arange(len(raw_data)) line_raw.set_data(x_raw, raw_data) @@ -277,7 +337,6 @@ def update_plot(frame, lines, axes): ax2.relim() ax2.autoscale_view() - # 频谱计算(汉宁窗减少频谱泄露) def calc_fft(sig, n_fft): if len(sig) < n_fft: return [], [] @@ -300,7 +359,7 @@ def update_plot(frame, lines, axes): return lines -# ===================== 【7. 资源释放 & 主入口】 ===================== +# ===================== 【7. 资源释放 & 最终汇总统计】 ===================== def clean_resource(): g_running.clear() logger.info("开始停止所有线程...") @@ -309,18 +368,19 @@ def clean_resource(): logger.info("资源释放完成") def main(): - logger.info("=" * 60) - logger.info("脑电滤波测试客户端 【修复版】启动") + logger.info("=" * 70) + logger.info("脑电滤波测试客户端【统计逻辑优化版】启动") logger.info(f"服务端地址: {ZMQ_SERVER_IP}:{ZMQ_SERVER_PORT}") - logger.info(f"发包格式: {PKG_SEND_SHAPE} | 间隔: {SEND_INTERVAL*1000:.0f}ms") - logger.info(f"回包格式: {PKG_RECV_SHAPE} | ZMQ三帧报文 [客户端ID, 空帧, 数据帧]") - logger.info("=" * 60) + logger.info(f"发包: {PKG_SEND_SHAPE}({SEND_INTERVAL*1000:.0f}ms) | 回包: {PKG_RECV_SHAPE}({RECV_INTERVAL*1000:.0f}ms)") + logger.info(f"预热规则: {PREHEAT_SECONDS}秒 / {PREHEAT_SEND_PACKS} 包后开启统计") + logger.info(f"收发比例: 每 {PACK_RATIO} 个发包对应 1 个回包") + logger.info("=" * 70) - # 启动唯一ZMQ收发线程 + # 启动ZMQ收发线程 io_thread = threading.Thread(target=zmq_io_thread, daemon=True, name="ZMQ_IO_Thread") io_thread.start() - # 启动可视化绘图 + # 启动可视化 fig, lines, axes = init_plot() ani = FuncAnimation( fig, update_plot, @@ -330,20 +390,30 @@ def main(): cache_frame_data=False ) - # 主线程阻塞,监听关闭 try: plt.show() except KeyboardInterrupt: logger.info("收到 Ctrl+C 中断信号,准备退出") finally: - # 输出最终统计 + # 输出最终完整汇总报表 run_total = time.perf_counter() - stat["start_time"] - loss_rate = (stat["send_cnt"] - stat["recv_cnt"]) / stat["send_cnt"] * 100 if stat["send_cnt"] > 0 else 0.0 - logger.info(f"\n===== 运行汇总 =====") + total_send = stat["total_send"] + total_recv = stat["total_recv"] + v_send = stat["valid_send"] + v_recv = stat["valid_recv"] + t_recv = stat["theo_recv"] + + loss_cnt = t_recv - v_recv + loss_rate = (loss_cnt / t_recv * 100) if t_recv > 0 else 0.0 + + logger.info(f"\n{'='*50} 最终运行汇总 {'='*50}") logger.info(f"总运行时长: {run_total:.1f} s") - logger.info(f"总发包数: {stat['send_cnt']}") - logger.info(f"总收包数: {stat['recv_cnt']}") - logger.info(f"整体丢包率: {loss_rate:.2f} %") + logger.info(f"【全局总包数】发送: {total_send} | 接收: {total_recv}") + logger.info(f"【有效统计区间(跳过预热{PREHEAT_SEND_PACKS}包)】") + logger.info(f" 有效发包: {v_send} | 理论应收包: {t_recv} | 实际收包: {v_recv}") + logger.info(f" 总丢包数: {loss_cnt} | 整体丢包率: {loss_rate:.2f} %") + logger.info(f"{'='*106}") + clean_resource() sys.exit(0)