@@ -1,11 +1,13 @@
# -*- coding: utf-8 -*-
"""
脑电滤波服务 8100端口测试工具【最终修复 版】
修复: 1. Matplotlib中文字体乱码 2. ZMQ双连接收不到数据问题
通信规范:
上位机 -> 服务端: send_multipart([client_id, b " " , data_buf]) 共3帧
服务端 recv_multipart() 帧长度 = 3
时序: 每20ms(0.02s)发送一包 (5,66), 服务端200ms回传 (50,64)
脑电滤波服务 8100端口测试工具【统计逻辑专项优化 版】
优化点:
1. 5秒预热(250个发包),预热结束后才启动丢包/数据统计
2. 业务比例: 0.02s发1包, 200ms收1包 → 每 10 个发包对应 1 个回包
3. 通道校验:发送(5,66) 仅对比前64通道, 接收(50,64)全通道比对
4. 区分:全局总包数 / 有效统计区间包数、理论收包数、实际收包数、丢包数、丢包率
5. 新增64通道整体数据均值/极值比对,校验数据有效性
通信规范: send_multipart([client_id, b " " , data_buf]) 三帧报文,服务端 recv_multipart 长度=3
"""
import sys
import time
@@ -20,33 +22,41 @@ from matplotlib.animation import FuncAnimation
# ===================== 全局前置: 修复Matplotlib中文字体 & 负号显示 =====================
plt . rcParams [ " font.sans-serif " ] = [ " SimHei " , " Microsoft YaHei " , " WenQuanYi Micro Hei " ]
plt . rcParams [ " axes.unicode_minus " ] = False # 解决负号显示异常
plt . rcParams [ " axes.unicode_minus " ] = False
# ===================== 【1. 全局可配置参数区 】 =====================
# ===================== 【1. 全局业务固定参数(核心统计规则) 】 =====================
# ZMQ 服务端配置
ZMQ_SERVER_IP = " 127.0.0.1 "
ZMQ_SERVER_IP = " 192.168.254.102 "
ZMQ_SERVER_PORT = 8100
ZMQ_SOCKET_TIMEOUT = 3000 # 套接字超时(ms)
POLL_TIMEOUT = 10 # Poll轮询超时(ms),不影响发包时序
POLL_TIMEOUT = 10 # Poll轮询超时(ms)
# 数据报文配置 (严格对齐业务)
PKG_ SEND_SHAPE = ( 5 , 66 ) # 发送包 shape (点数, 总通道)
PKG_ RECV_SHAPE = ( 50 , 64 ) # 滤波回包 shape (点数, 脑电通道)
SEND_INTERVAL = 0 .02 # 上位机发包间隔 20ms
SAMPLE_RATE = 250 # 采样率 Hz
# 时序 & 统计核心规则 (严格对齐现场 业务)
SEND_INTERVAL = 0.02 # 上位机发包间隔: 20ms/包
RECV_INTERVAL = 0.2 # 服务端回包间隔: 200ms/包
PREHEAT_SECONDS = 5 .0 # 滤波缓存预热时长: 5秒
# 计算:预热需要的发包总数 = 预热时长 / 单包发送间隔
PREHEAT_SEND_PACKS = int ( PREHEAT_SECONDS / SEND_INTERVAL ) # 5 / 0.02 = 250 包
# 收发包比例: 每多少个发包对应1个回包
PACK_RATIO = int ( RECV_INTERVAL / SEND_INTERVAL ) # 0.2 / 0.02 = 10
# 通道定义
CH_EEG = 64
# 数据报文形状
PKG_SEND_SHAPE = ( 5 , 66 ) # 发送包 (点数, 总通道)
PKG_RECV_SHAPE = ( 50 , 64 ) # 回包 (点数, 有效脑电通道)
SAMPLE_RATE = 250
# 通道定义( 对比仅使用前64路脑电通道)
CH_EEG_VALID = 64 # 共同对比通道数: 0~63
CH_EVENT = 64
CH_RESERVED = 65
# ZMQ 三帧报文固定字段(和你服务端代码完全一致)
# ZMQ 三帧报文固定字段
CLIENT_ID = b " test_client_001 "
EMPTY_FRAME = b " "
# 仿真信号配置(可自由调参测试滤波)
# 仿真信号配置
TARGET_CHANNEL = 0
SIGNAL_FREQ_LIST = [ 10.0 , 22.0 ]
SIGNAL_FREQ_LIST = [ 3 , 10 , 36 ]
SIGNAL_AMP = 1.8
NOISE_GAUSSIAN_AMP = 0.4
NOISE_POWER50_AMP = 0.3
@@ -64,21 +74,32 @@ MAX_RUN_SECONDS = None
ENABLE_RECONNECT = True
PRINT_STAT_INTERVAL = 5.0
# ===================== 【2. 全局变量 & 线程安全 】 =====================
# ===================== 【2. 全局变量 + 统计结构体(重构统计逻辑) 】 =====================
g_running = threading . Event ( )
g_running . set ( )
data_lock = threading . Lock ( )
# 绘图数据 缓冲区
# 绘图缓冲区
raw_data_buf = deque ( maxlen = MAX_PLOT_POINTS )
filt_data_buf = deque ( maxlen = MAX_PLOT_POINTS )
# 运行统计
# ===================== 全新统计变量(区分预热/正式统计) =====================
stat = {
" send_cnt " : 0 ,
" recv_cnt " : 0 ,
# 全局总包数(包含预热包)
" total_send " : 0 ,
" total_recv " : 0 ,
# 有效统计区间( 预热250包之后)
" valid_send " : 0 , # 有效发包数
" valid_recv " : 0 , # 有效收包数
" theo_recv " : 0 , # 理论应收到包数 = valid_send // PACK_RATIO
# 运行时间
" start_time " : time . perf_counter ( ) ,
" last_print_time " : time . perf_counter ( )
" last_print_time " : time . perf_counter ( ) ,
# 数据校验缓存: 保存最新一包原始64通道数据, 用于和回包比对
" latest_raw_64ch " : None
}
# ===================== 【3. 日志配置】 =====================
@@ -95,15 +116,15 @@ logger = init_logger()
# ===================== 【4. 仿真脑电数据生成 (5,66)】 =====================
def generate_eeg_packet ( pkt_idx : int ) - > np . ndarray :
""" 生成单包 (5,66) 仿真数据:脑电+噪声+工频+事件通道+保留通道 """
""" 生成单包 (5,66) 仿真数据 """
n_point , n_chan = PKG_SEND_SHAPE
base_t = pkt_idx * n_point / SAMPLE_RATE
t_arr = base_t + np . arange ( n_point ) / SAMPLE_RATE
data = np . zeros ( ( n_point , n_chan ) , dtype = np . float64 )
# 64路脑电:多频信号 + 50Hz工频 + 高斯白噪声
for ch in range ( CH_EEG ) :
# 64路脑电信号
for ch in range ( CH_EEG_VALID ) :
sig = 0.0
for freq in SIGNAL_FREQ_LIST :
sig + = SIGNAL_AMP * np . sin ( 2 * np . pi * freq * t_arr )
@@ -111,58 +132,51 @@ def generate_eeg_packet(pkt_idx: int) -> np.ndarray:
sig + = NOISE_GAUSSIAN_AMP * np . random . randn ( n_point )
data [ : , ch ] = sig
# 事件通道、保留通道赋值
# 事件通道、保留通道
data [ : , CH_EVENT ] = EVENT_LABEL_VAL
data [ : , CH_RESERVED ] = RESERVED_VAL
return data
# ===================== 【5. 核心修复: 单DEALER连接 + Poller 同时收发 】 =====================
# ===================== 【5. ZMQ 核心IO线程( 单连接+Poller, 保留原有通信逻辑) 】 =====================
def zmq_io_thread ( ) :
"""
唯一ZMQ工作线程: 单个DEALER连接, 同时发包+收包(对齐真实上位机)
使用 Poller 多路复用,避免阻塞、超时报错
"""
context = zmq . Context ( )
pkt_index = 0
send_interval = SEND_INTERVAL
logger . info ( f " 滤波预热配置: { PREHEAT_SECONDS } 秒 / { PREHEAT_SEND_PACKS } 个发包后开始统计 " )
logger . info ( f " 收发比例:每 { PACK_RATIO } 个发包 → 1 个滤波回包 " )
while g_running . is_set ( ) :
try :
# 新建 DEALER 套接字(全局唯一连接)
sock = context . socket ( zmq . DEALER )
sock . setsockopt ( zmq . RCVTIMEO , ZMQ_SOCKET_TIMEOUT )
sock . setsockopt ( zmq . SNDTIMEO , ZMQ_SOCKET_TIMEOUT )
sock . connect ( f " tcp:// { ZMQ_SERVER_IP } : { ZMQ_SERVER_PORT } " )
logger . info ( f " ZMQ 连接成功 -> { ZMQ_SERVER_IP } : { ZMQ_SERVER_PORT } " )
# 注册Poller: 监听当前套接字的可读事件
poller = zmq . Poller ( )
poller . register ( sock , zmq . POLLIN )
# 精准发包计时( 消除sleep漂移)
next_send_ts = time . perf_counter ( )
while g_running . is_set ( ) :
# 1. 运行时长限制判断
# 全局 运行时长限制
if MAX_RUN_SECONDS is not None :
run_sec = time . perf_counter ( ) - stat [ " start_time " ]
if run_sec > MAX_RUN_SECONDS :
logger . info ( f " 已到达设定运行时长 { MAX_RUN_SECONDS } s, 停止任务 " )
return
# 2. Poll 轮询:有数据就接收,无数据继续执行发包逻辑
# ========== 1. 轮询接收服务端回包 ==========
socks_ready = dict ( poller . poll ( POLL_TIMEOUT ) )
if sock in socks_ready :
# ========== 接收服务端回包 (multipart) ==========
frames = sock . recv_multipart ( )
if not frames :
continue
# 取最后一帧为有效滤波数据
recv_bytes = frames [ - 1 ]
if not recv_bytes :
continue
# 解析为 (50,64) float64
# 解析回包 (50,64)
filt_data = np . frombuffer ( recv_bytes , dtype = np . float64 )
expect_size = PKG_RECV_SHAPE [ 0 ] * PKG_RECV_SHAPE [ 1 ]
if filt_data . size != expect_size :
@@ -170,42 +184,89 @@ def zmq_io_thread():
continue
filt_data = filt_data . reshape ( PKG_RECV_SHAPE )
# 统计 + 写入绘图缓冲区
stat [ " recv_cnt " ] + = 1
# 全局收包计数
stat [ " total_ recv" ] + = 1
# 仅预热完成后,计入有效统计收包
if stat [ " total_send " ] > PREHEAT_SEND_PACKS :
stat [ " valid_recv " ] + = 1
# 写入绘图缓冲区
with data_lock :
filt_data_buf . extend ( filt_data [ : , TARGET_CHANNEL ] )
# 定时打印运行状态
now = time . perf_counter ( )
if now - stat [ " last_print_time " ] > PRINT_STAT_INTERVAL :
run_sec = now - stat [ " start_time " ]
loss_rate = ( stat [ " send_cnt " ] - stat [ " recv_cnt " ] ) / stat [ " send_cnt " ] * 100 if stat [ " send_cnt " ] > 0 else 0.0
logger . info (
f " 运行: { run_sec : .1f } s | 发包: { stat [ ' send_cnt ' ] } | 收包: { stat [ ' recv_cnt ' ] } | 丢包率: { loss_rate : .2f } % "
# ---------- 新增: 64通道数据比对( 发包前64通道 <-> 回包64通道) ----------
raw_64ch = stat [ " latest_raw_64ch " ]
if raw_64ch is not None :
raw_mean = np . mean ( raw_64ch )
filt_mean = np . mean ( filt_data )
raw_amp = np . max ( np . abs ( raw_64ch ) )
filt_amp = np . max ( np . abs ( filt_data ) )
logger . debug (
f " 【通道数据比对】原始64通道均值: { raw_mean : .4f } 幅值: { raw_amp : .4f } | "
f " 滤波后均值: { filt_mean : .4f } 幅值: { filt_amp : .4f } "
)
stat [ " last_print_time " ] = now
# 3. 精准定时发包( 严格20ms间隔)
# ========== 2. 精准定时发送数据包 ==========
current_ts = time . perf_counter ( )
if current_ts > = next_send_ts :
# 生成 (5,66) 仿真数据 包
# 生成(5,66)仿真包
pkt_data = generate_eeg_packet ( pkt_index )
pkt_index + = 1
send_buf = pkt_data . tobytes ( )
# ========== 三帧Multipart发送(和你服务端代码完全一致) ==========
# 标准 三帧Multipart发送
sock . send_multipart ( [ CLIENT_ID , EMPTY_FRAME , send_buf ] )
# 统计 + 写入原始数据缓冲区
stat [ " send_cnt " ] + = 1
# ---------- 发包计数逻辑(核心优化:预热区分) ----------
stat [ " total_ send" ] + = 1
# 预热完成后,计入有效发包
if stat [ " total_send " ] > PREHEAT_SEND_PACKS :
stat [ " valid_send " ] + = 1
# 计算理论应收包数
stat [ " theo_recv " ] = stat [ " valid_send " ] / / PACK_RATIO
# 缓存当前包前64通道, 用于后续数据比对
stat [ " latest_raw_64ch " ] = pkt_data [ : , : CH_EEG_VALID ]
# 绘图缓冲区(单通道波形)
with data_lock :
raw_data_buf . extend ( pkt_data [ : , TARGET_CHANNEL ] )
# 更新下一次发包时间戳
# 更新下一次发包时间
next_send_ts + = send_interval
# ========== 3. 定时打印统计信息(区分预热/正式统计) ==========
now = time . perf_counter ( )
if now - stat [ " last_print_time " ] > PRINT_STAT_INTERVAL :
run_sec = now - stat [ " start_time " ]
total_send = stat [ " total_send " ]
total_recv = stat [ " total_recv " ]
# 分支1: 仍在预热阶段
if total_send < = PREHEAT_SEND_PACKS :
remain = PREHEAT_SEND_PACKS - total_send
logger . info (
f " [预热中] 运行: { run_sec : .1f } s | 已发包: { total_send } / { PREHEAT_SEND_PACKS } | "
f " 剩余预热包: { remain } | 暂不统计丢包 "
)
# 分支2: 预热完成, 进入正式统计
else :
v_send = stat [ " valid_send " ]
v_recv = stat [ " valid_recv " ]
t_recv = stat [ " theo_recv " ]
loss_cnt = t_recv - v_recv
loss_rate = ( loss_cnt / t_recv * 100 ) if t_recv > 0 else 0.0
logger . info (
f " [正式统计] 运行: { run_sec : .1f } s | "
f " 全局总包: 发 { total_send } /收 { total_recv } | "
f " 有效区间: 发 { v_send } /应收 { t_recv } /实收 { v_recv } | "
f " 丢包数: { loss_cnt } | 丢包率: { loss_rate : .2f } % "
)
stat [ " last_print_time " ] = now
except zmq . ZMQError as e :
# 区分正常超时 和 网络异常
if e . errno == zmq . EAGAIN :
continue
logger . warning ( f " ZMQ 连接异常: { e } " )
@@ -222,7 +283,7 @@ def zmq_io_thread():
context . term ( )
logger . info ( " ZMQ IO 线程已退出 " )
# ===================== 【6. 可视化绘图(无逻辑改动,已前置修复字体 )】 =====================
# ===================== 【6. 可视化绘图(无改动 )】 =====================
def init_plot ( ) :
fig = plt . figure ( figsize = ( 14 , 9 ) )
fig . suptitle ( f " 脑电滤波测试 | 观测通道: { TARGET_CHANNEL } " , fontsize = 14 )
@@ -264,7 +325,6 @@ def update_plot(frame, lines, axes):
raw_data = list ( raw_data_buf )
filt_data = list ( filt_data_buf )
# 时域波形
if raw_data :
x_raw = np . arange ( len ( raw_data ) )
line_raw . set_data ( x_raw , raw_data )
@@ -277,7 +337,6 @@ def update_plot(frame, lines, axes):
ax2 . relim ( )
ax2 . autoscale_view ( )
# 频谱计算(汉宁窗减少频谱泄露)
def calc_fft ( sig , n_fft ) :
if len ( sig ) < n_fft :
return [ ] , [ ]
@@ -300,7 +359,7 @@ def update_plot(frame, lines, axes):
return lines
# ===================== 【7. 资源释放 & 主入口 】 =====================
# ===================== 【7. 资源释放 & 最终汇总统计 】 =====================
def clean_resource ( ) :
g_running . clear ( )
logger . info ( " 开始停止所有线程... " )
@@ -309,18 +368,19 @@ def clean_resource():
logger . info ( " 资源释放完成 " )
def main ( ) :
logger . info ( " = " * 6 0)
logger . info ( " 脑电滤波测试客户端 【修复 版】启动 " )
logger . info ( " = " * 7 0)
logger . info ( " 脑电滤波测试客户端【统计逻辑优化 版】启动 " )
logger . info ( f " 服务端地址: { ZMQ_SERVER_IP } : { ZMQ_SERVER_PORT } " )
logger . info ( f " 发包格式 : { PKG_SEND_SHAPE } | 间隔 : { SEND _INTERVAL* 1000 : .0f } ms " )
logger . info ( f " 回包格式 : { PKG_RECV_SHAPE } | ZMQ三帧报文 [客户端ID, 空帧, 数据帧] " )
logger . info ( " = " * 60 )
logger . info ( f " 发包: { PKG_SEND_SHAPE } ( { SEND_INTERVAL * 1000 : .0f } ms) | 回包 : { PKG_RECV_SHAPE } ( { RECV _INTERVAL* 1000 : .0f } ms) " )
logger . info ( f " 预热规则 : { PREHEAT_SECONDS } 秒 / { PREHEAT_SEND_PACKS } 包后开启统计 " )
logger . info ( f " 收发比例: 每 { PACK_RATIO } 个发包对应 1 个回包 " )
logger . info ( " = " * 70 )
# 启动唯一 ZMQ收发线程
# 启动ZMQ收发线程
io_thread = threading . Thread ( target = zmq_io_thread , daemon = True , name = " ZMQ_IO_Thread " )
io_thread . start ( )
# 启动可视化绘图
# 启动可视化
fig , lines , axes = init_plot ( )
ani = FuncAnimation (
fig , update_plot ,
@@ -330,20 +390,30 @@ def main():
cache_frame_data = False
)
# 主线程阻塞,监听关闭
try :
plt . show ( )
except KeyboardInterrupt :
logger . info ( " 收到 Ctrl+C 中断信号,准备退出 " )
finally :
# 输出最终统计
# 输出最终完整汇总报表
run_total = time . perf_counter ( ) - stat [ " start_time " ]
loss_rate = ( stat [ " send_cnt " ] - stat [ " recv_cnt " ] ) / stat [ " send_cnt " ] * 100 if stat [ " send_cnt " ] > 0 else 0.0
logger . info ( f " \n ===== 运行汇总 ===== " )
total_send = stat [ " total_send " ]
total_recv = stat [ " total_recv " ]
v_send = stat [ " valid_send " ]
v_recv = stat [ " valid_recv " ]
t_recv = stat [ " theo_recv " ]
loss_cnt = t_recv - v_recv
loss_rate = ( loss_cnt / t_recv * 100 ) if t_recv > 0 else 0.0
logger . info ( f " \n { ' = ' * 50 } 最终运行汇总 { ' = ' * 50 } " )
logger . info ( f " 总运行时长: { run_total : .1f } s " )
logger . info ( f " 总发包数: { stat [ ' send_cnt ' ] } " )
logger . info ( f " 总收包数: { stat [ ' recv_cnt ' ] } " )
logger . info ( f " 整体丢包率 : { loss_rate : .2f } % " )
logger . info ( f " 【全局总包数】发送: { total_send } | 接收: { total_recv } " )
logger . info ( f " 【有效统计区间(跳过预热 { PREHEAT_SEND_PACKS } 包)】 " )
logger . info ( f " 有效发包 : { v_send } | 理论应收包: { t_recv } | 实际收包: { v_recv } " )
logger . info ( f " 总丢包数: { loss_cnt } | 整体丢包率: { loss_rate : .2f } % " )
logger . info ( f " { ' = ' * 106 } " )
clean_resource ( )
sys . exit ( 0 )