Compare commits
3 Commits
694321b52c
...
7b5f4f6eb9
| Author | SHA1 | Date | |
|---|---|---|---|
| 7b5f4f6eb9 | |||
| 0cffd1ae02 | |||
| 0e5e79fcdd |
@@ -128,7 +128,7 @@ class SlidingFilter(threading.Thread):
|
|||||||
# 8~30Hz带通FIR(65阶,线性相位)
|
# 8~30Hz带通FIR(65阶,线性相位)
|
||||||
self.b_bp = signal.firwin(
|
self.b_bp = signal.firwin(
|
||||||
numtaps=65,
|
numtaps=65,
|
||||||
cutoff=[8/(self.srate/2), 30/(self.srate/2)],
|
cutoff=[0.5/(self.srate/2), 45/(self.srate/2)],
|
||||||
pass_zero=False,
|
pass_zero=False,
|
||||||
window='hamming'
|
window='hamming'
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ class zmqServer(threading.Thread):
|
|||||||
|
|
||||||
self.host = host
|
self.host = host
|
||||||
|
|
||||||
# test_host = "10.200.27.140"
|
# test_host = "192.168.254.102"
|
||||||
# self.host = test_host
|
# self.host = test_host
|
||||||
|
|
||||||
self.cmd_port = cmd_port # 命令交互端口:收JSON命令 + 返JSON结果
|
self.cmd_port = cmd_port # 命令交互端口:收JSON命令 + 返JSON结果
|
||||||
@@ -180,7 +180,7 @@ class zmqServer(threading.Thread):
|
|||||||
# 转置为上位机需要的[50, 通道数]格式
|
# 转置为上位机需要的[50, 通道数]格式
|
||||||
filtered_data = filtered_data.T.astype(np.float64)
|
filtered_data = filtered_data.T.astype(np.float64)
|
||||||
send_buf = filtered_data.tobytes()
|
send_buf = filtered_data.tobytes()
|
||||||
algo_log(f"发送滤波数据,长度: {len(send_buf)}字节, filtered_data.shape: {filtered_data.shape}", level="DEBUG", record_once=True)
|
algo_log(f"发送滤波数据,长度: {len(send_buf)}字节, filtered_data.shape: {filtered_data.shape}", level="DEBUG", record_once=False)
|
||||||
self.data_send_queue.put(send_buf)
|
self.data_send_queue.put(send_buf)
|
||||||
|
|
||||||
def _process_data_send_queue(self):
|
def _process_data_send_queue(self):
|
||||||
@@ -291,6 +291,8 @@ class zmqServer(threading.Thread):
|
|||||||
elif len(frames) == 3:
|
elif len(frames) == 3:
|
||||||
# 标准格式
|
# 标准格式
|
||||||
ident, empty_sep, data_bytes = frames[:3]
|
ident, empty_sep, data_bytes = frames[:3]
|
||||||
|
elif len(frames) == 2:
|
||||||
|
ident, data_bytes = frames[:2]
|
||||||
else:
|
else:
|
||||||
return
|
return
|
||||||
# 注册新的数据客户端(单客户端场景,自动覆盖旧身份)
|
# 注册新的数据客户端(单客户端场景,自动覆盖旧身份)
|
||||||
|
|||||||
228
filter_test.py
228
filter_test.py
@@ -1,11 +1,13 @@
|
|||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
"""
|
"""
|
||||||
脑电滤波服务 8100端口测试工具【最终修复版】
|
脑电滤波服务 8100端口测试工具【统计逻辑专项优化版】
|
||||||
修复:1. Matplotlib中文字体乱码 2. ZMQ双连接收不到数据问题
|
优化点:
|
||||||
通信规范:
|
1. 5秒预热(250个发包),预热结束后才启动丢包/数据统计
|
||||||
上位机 -> 服务端:send_multipart([client_id, b"", data_buf]) 共3帧
|
2. 业务比例:0.02s发1包,200ms收1包 → 每 10 个发包对应 1 个回包
|
||||||
服务端 recv_multipart() 帧长度 = 3
|
3. 通道校验:发送(5,66) 仅对比前64通道,接收(50,64)全通道比对
|
||||||
时序:每20ms(0.02s)发送一包 (5,66),服务端200ms回传 (50,64)
|
4. 区分:全局总包数 / 有效统计区间包数、理论收包数、实际收包数、丢包数、丢包率
|
||||||
|
5. 新增64通道整体数据均值/极值比对,校验数据有效性
|
||||||
|
通信规范:send_multipart([client_id, b"", data_buf]) 三帧报文,服务端 recv_multipart 长度=3
|
||||||
"""
|
"""
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
@@ -20,33 +22,41 @@ from matplotlib.animation import FuncAnimation
|
|||||||
|
|
||||||
# ===================== 全局前置:修复Matplotlib中文字体 & 负号显示 =====================
|
# ===================== 全局前置:修复Matplotlib中文字体 & 负号显示 =====================
|
||||||
plt.rcParams["font.sans-serif"] = ["SimHei", "Microsoft YaHei", "WenQuanYi Micro Hei"]
|
plt.rcParams["font.sans-serif"] = ["SimHei", "Microsoft YaHei", "WenQuanYi Micro Hei"]
|
||||||
plt.rcParams["axes.unicode_minus"] = False # 解决负号显示异常
|
plt.rcParams["axes.unicode_minus"] = False
|
||||||
|
|
||||||
# ===================== 【1. 全局可配置参数区】 =====================
|
# ===================== 【1. 全局业务固定参数(核心统计规则)】 =====================
|
||||||
# ZMQ 服务端配置
|
# ZMQ 服务端配置
|
||||||
ZMQ_SERVER_IP = "127.0.0.1"
|
ZMQ_SERVER_IP = "192.168.254.102"
|
||||||
ZMQ_SERVER_PORT = 8100
|
ZMQ_SERVER_PORT = 8100
|
||||||
ZMQ_SOCKET_TIMEOUT = 3000 # 套接字超时(ms)
|
ZMQ_SOCKET_TIMEOUT = 3000 # 套接字超时(ms)
|
||||||
POLL_TIMEOUT = 10 # Poll轮询超时(ms),不影响发包时序
|
POLL_TIMEOUT = 10 # Poll轮询超时(ms)
|
||||||
|
|
||||||
# 数据报文配置(严格对齐业务)
|
# 时序 & 统计核心规则(严格对齐现场业务)
|
||||||
PKG_SEND_SHAPE = (5, 66) # 发送包 shape (点数, 总通道)
|
SEND_INTERVAL = 0.02 # 上位机发包间隔:20ms/包
|
||||||
PKG_RECV_SHAPE = (50, 64) # 滤波回包 shape (点数, 脑电通道)
|
RECV_INTERVAL = 0.2 # 服务端回包间隔:200ms/包
|
||||||
SEND_INTERVAL = 0.02 # 上位机发包间隔 20ms
|
PREHEAT_SECONDS = 5.0 # 滤波缓存预热时长:5秒
|
||||||
SAMPLE_RATE = 250 # 采样率 Hz
|
# 计算:预热需要的发包总数 = 预热时长 / 单包发送间隔
|
||||||
|
PREHEAT_SEND_PACKS = int(PREHEAT_SECONDS / SEND_INTERVAL) # 5 / 0.02 = 250 包
|
||||||
|
# 收发包比例:每多少个发包对应1个回包
|
||||||
|
PACK_RATIO = int(RECV_INTERVAL / SEND_INTERVAL) # 0.2 / 0.02 = 10
|
||||||
|
|
||||||
# 通道定义
|
# 数据报文形状
|
||||||
CH_EEG = 64
|
PKG_SEND_SHAPE = (5, 66) # 发送包 (点数, 总通道)
|
||||||
|
PKG_RECV_SHAPE = (50, 64) # 回包 (点数, 有效脑电通道)
|
||||||
|
SAMPLE_RATE = 250
|
||||||
|
|
||||||
|
# 通道定义(对比仅使用前64路脑电通道)
|
||||||
|
CH_EEG_VALID = 64 # 共同对比通道数:0~63
|
||||||
CH_EVENT = 64
|
CH_EVENT = 64
|
||||||
CH_RESERVED = 65
|
CH_RESERVED = 65
|
||||||
|
|
||||||
# ZMQ 三帧报文固定字段(和你服务端代码完全一致)
|
# ZMQ 三帧报文固定字段
|
||||||
CLIENT_ID = b"test_client_001"
|
CLIENT_ID = b"test_client_001"
|
||||||
EMPTY_FRAME = b""
|
EMPTY_FRAME = b""
|
||||||
|
|
||||||
# 仿真信号配置(可自由调参测试滤波)
|
# 仿真信号配置
|
||||||
TARGET_CHANNEL = 0
|
TARGET_CHANNEL = 0
|
||||||
SIGNAL_FREQ_LIST = [10.0, 22.0]
|
SIGNAL_FREQ_LIST = [3, 10, 36]
|
||||||
SIGNAL_AMP = 1.8
|
SIGNAL_AMP = 1.8
|
||||||
NOISE_GAUSSIAN_AMP = 0.4
|
NOISE_GAUSSIAN_AMP = 0.4
|
||||||
NOISE_POWER50_AMP = 0.3
|
NOISE_POWER50_AMP = 0.3
|
||||||
@@ -64,21 +74,32 @@ MAX_RUN_SECONDS = None
|
|||||||
ENABLE_RECONNECT = True
|
ENABLE_RECONNECT = True
|
||||||
PRINT_STAT_INTERVAL = 5.0
|
PRINT_STAT_INTERVAL = 5.0
|
||||||
|
|
||||||
# ===================== 【2. 全局变量 & 线程安全】 =====================
|
# ===================== 【2. 全局变量 + 统计结构体(重构统计逻辑)】 =====================
|
||||||
g_running = threading.Event()
|
g_running = threading.Event()
|
||||||
g_running.set()
|
g_running.set()
|
||||||
data_lock = threading.Lock()
|
data_lock = threading.Lock()
|
||||||
|
|
||||||
# 绘图数据缓冲区
|
# 绘图缓冲区
|
||||||
raw_data_buf = deque(maxlen=MAX_PLOT_POINTS)
|
raw_data_buf = deque(maxlen=MAX_PLOT_POINTS)
|
||||||
filt_data_buf = deque(maxlen=MAX_PLOT_POINTS)
|
filt_data_buf = deque(maxlen=MAX_PLOT_POINTS)
|
||||||
|
|
||||||
# 运行统计
|
# ===================== 全新统计变量(区分预热/正式统计) =====================
|
||||||
stat = {
|
stat = {
|
||||||
"send_cnt": 0,
|
# 全局总包数(包含预热包)
|
||||||
"recv_cnt": 0,
|
"total_send": 0,
|
||||||
|
"total_recv": 0,
|
||||||
|
|
||||||
|
# 有效统计区间(预热250包之后)
|
||||||
|
"valid_send": 0, # 有效发包数
|
||||||
|
"valid_recv": 0, # 有效收包数
|
||||||
|
"theo_recv": 0, # 理论应收到包数 = valid_send // PACK_RATIO
|
||||||
|
|
||||||
|
# 运行时间
|
||||||
"start_time": time.perf_counter(),
|
"start_time": time.perf_counter(),
|
||||||
"last_print_time": time.perf_counter()
|
"last_print_time": time.perf_counter(),
|
||||||
|
|
||||||
|
# 数据校验缓存:保存最新一包原始64通道数据,用于和回包比对
|
||||||
|
"latest_raw_64ch": None
|
||||||
}
|
}
|
||||||
|
|
||||||
# ===================== 【3. 日志配置】 =====================
|
# ===================== 【3. 日志配置】 =====================
|
||||||
@@ -95,15 +116,15 @@ logger = init_logger()
|
|||||||
|
|
||||||
# ===================== 【4. 仿真脑电数据生成 (5,66)】 =====================
|
# ===================== 【4. 仿真脑电数据生成 (5,66)】 =====================
|
||||||
def generate_eeg_packet(pkt_idx: int) -> np.ndarray:
|
def generate_eeg_packet(pkt_idx: int) -> np.ndarray:
|
||||||
"""生成单包 (5,66) 仿真数据:脑电+噪声+工频+事件通道+保留通道"""
|
"""生成单包 (5,66) 仿真数据"""
|
||||||
n_point, n_chan = PKG_SEND_SHAPE
|
n_point, n_chan = PKG_SEND_SHAPE
|
||||||
base_t = pkt_idx * n_point / SAMPLE_RATE
|
base_t = pkt_idx * n_point / SAMPLE_RATE
|
||||||
t_arr = base_t + np.arange(n_point) / SAMPLE_RATE
|
t_arr = base_t + np.arange(n_point) / SAMPLE_RATE
|
||||||
|
|
||||||
data = np.zeros((n_point, n_chan), dtype=np.float64)
|
data = np.zeros((n_point, n_chan), dtype=np.float64)
|
||||||
|
|
||||||
# 64路脑电:多频信号 + 50Hz工频 + 高斯白噪声
|
# 64路脑电信号
|
||||||
for ch in range(CH_EEG):
|
for ch in range(CH_EEG_VALID):
|
||||||
sig = 0.0
|
sig = 0.0
|
||||||
for freq in SIGNAL_FREQ_LIST:
|
for freq in SIGNAL_FREQ_LIST:
|
||||||
sig += SIGNAL_AMP * np.sin(2 * np.pi * freq * t_arr)
|
sig += SIGNAL_AMP * np.sin(2 * np.pi * freq * t_arr)
|
||||||
@@ -111,58 +132,51 @@ def generate_eeg_packet(pkt_idx: int) -> np.ndarray:
|
|||||||
sig += NOISE_GAUSSIAN_AMP * np.random.randn(n_point)
|
sig += NOISE_GAUSSIAN_AMP * np.random.randn(n_point)
|
||||||
data[:, ch] = sig
|
data[:, ch] = sig
|
||||||
|
|
||||||
# 事件通道、保留通道赋值
|
# 事件通道、保留通道
|
||||||
data[:, CH_EVENT] = EVENT_LABEL_VAL
|
data[:, CH_EVENT] = EVENT_LABEL_VAL
|
||||||
data[:, CH_RESERVED] = RESERVED_VAL
|
data[:, CH_RESERVED] = RESERVED_VAL
|
||||||
return data
|
return data
|
||||||
|
|
||||||
# ===================== 【5. 核心修复:单DEALER连接 + Poller 同时收发】 =====================
|
# ===================== 【5. ZMQ 核心IO线程(单连接+Poller,保留原有通信逻辑)】 =====================
|
||||||
def zmq_io_thread():
|
def zmq_io_thread():
|
||||||
"""
|
|
||||||
唯一ZMQ工作线程:单个DEALER连接,同时发包+收包(对齐真实上位机)
|
|
||||||
使用 Poller 多路复用,避免阻塞、超时报错
|
|
||||||
"""
|
|
||||||
context = zmq.Context()
|
context = zmq.Context()
|
||||||
pkt_index = 0
|
pkt_index = 0
|
||||||
send_interval = SEND_INTERVAL
|
send_interval = SEND_INTERVAL
|
||||||
|
|
||||||
|
logger.info(f"滤波预热配置:{PREHEAT_SECONDS}秒 / {PREHEAT_SEND_PACKS} 个发包后开始统计")
|
||||||
|
logger.info(f"收发比例:每 {PACK_RATIO} 个发包 → 1 个滤波回包")
|
||||||
|
|
||||||
while g_running.is_set():
|
while g_running.is_set():
|
||||||
try:
|
try:
|
||||||
# 新建 DEALER 套接字(全局唯一连接)
|
|
||||||
sock = context.socket(zmq.DEALER)
|
sock = context.socket(zmq.DEALER)
|
||||||
sock.setsockopt(zmq.RCVTIMEO, ZMQ_SOCKET_TIMEOUT)
|
sock.setsockopt(zmq.RCVTIMEO, ZMQ_SOCKET_TIMEOUT)
|
||||||
sock.setsockopt(zmq.SNDTIMEO, ZMQ_SOCKET_TIMEOUT)
|
sock.setsockopt(zmq.SNDTIMEO, ZMQ_SOCKET_TIMEOUT)
|
||||||
sock.connect(f"tcp://{ZMQ_SERVER_IP}:{ZMQ_SERVER_PORT}")
|
sock.connect(f"tcp://{ZMQ_SERVER_IP}:{ZMQ_SERVER_PORT}")
|
||||||
logger.info(f"ZMQ 连接成功 -> {ZMQ_SERVER_IP}:{ZMQ_SERVER_PORT}")
|
logger.info(f"ZMQ 连接成功 -> {ZMQ_SERVER_IP}:{ZMQ_SERVER_PORT}")
|
||||||
|
|
||||||
# 注册Poller:监听当前套接字的可读事件
|
|
||||||
poller = zmq.Poller()
|
poller = zmq.Poller()
|
||||||
poller.register(sock, zmq.POLLIN)
|
poller.register(sock, zmq.POLLIN)
|
||||||
|
|
||||||
# 精准发包计时(消除sleep漂移)
|
|
||||||
next_send_ts = time.perf_counter()
|
next_send_ts = time.perf_counter()
|
||||||
|
|
||||||
while g_running.is_set():
|
while g_running.is_set():
|
||||||
# 1. 运行时长限制判断
|
# 全局运行时长限制
|
||||||
if MAX_RUN_SECONDS is not None:
|
if MAX_RUN_SECONDS is not None:
|
||||||
run_sec = time.perf_counter() - stat["start_time"]
|
run_sec = time.perf_counter() - stat["start_time"]
|
||||||
if run_sec > MAX_RUN_SECONDS:
|
if run_sec > MAX_RUN_SECONDS:
|
||||||
logger.info(f"已到达设定运行时长 {MAX_RUN_SECONDS}s,停止任务")
|
logger.info(f"已到达设定运行时长 {MAX_RUN_SECONDS}s,停止任务")
|
||||||
return
|
return
|
||||||
|
|
||||||
# 2. Poll 轮询:有数据就接收,无数据继续执行发包逻辑
|
# ========== 1. 轮询接收服务端回包 ==========
|
||||||
socks_ready = dict(poller.poll(POLL_TIMEOUT))
|
socks_ready = dict(poller.poll(POLL_TIMEOUT))
|
||||||
if sock in socks_ready:
|
if sock in socks_ready:
|
||||||
# ========== 接收服务端回包 (multipart) ==========
|
|
||||||
frames = sock.recv_multipart()
|
frames = sock.recv_multipart()
|
||||||
if not frames:
|
if not frames:
|
||||||
continue
|
continue
|
||||||
# 取最后一帧为有效滤波数据
|
|
||||||
recv_bytes = frames[-1]
|
recv_bytes = frames[-1]
|
||||||
if not recv_bytes:
|
if not recv_bytes:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# 解析为 (50,64) float64
|
# 解析回包 (50,64)
|
||||||
filt_data = np.frombuffer(recv_bytes, dtype=np.float64)
|
filt_data = np.frombuffer(recv_bytes, dtype=np.float64)
|
||||||
expect_size = PKG_RECV_SHAPE[0] * PKG_RECV_SHAPE[1]
|
expect_size = PKG_RECV_SHAPE[0] * PKG_RECV_SHAPE[1]
|
||||||
if filt_data.size != expect_size:
|
if filt_data.size != expect_size:
|
||||||
@@ -170,42 +184,89 @@ def zmq_io_thread():
|
|||||||
continue
|
continue
|
||||||
filt_data = filt_data.reshape(PKG_RECV_SHAPE)
|
filt_data = filt_data.reshape(PKG_RECV_SHAPE)
|
||||||
|
|
||||||
# 统计 + 写入绘图缓冲区
|
# 全局收包计数
|
||||||
stat["recv_cnt"] += 1
|
stat["total_recv"] += 1
|
||||||
|
|
||||||
|
# 仅预热完成后,计入有效统计收包
|
||||||
|
if stat["total_send"] > PREHEAT_SEND_PACKS:
|
||||||
|
stat["valid_recv"] += 1
|
||||||
|
|
||||||
|
# 写入绘图缓冲区
|
||||||
with data_lock:
|
with data_lock:
|
||||||
filt_data_buf.extend(filt_data[:, TARGET_CHANNEL])
|
filt_data_buf.extend(filt_data[:, TARGET_CHANNEL])
|
||||||
|
|
||||||
# 定时打印运行状态
|
# ---------- 新增:64通道数据比对(发包前64通道 <-> 回包64通道) ----------
|
||||||
now = time.perf_counter()
|
raw_64ch = stat["latest_raw_64ch"]
|
||||||
if now - stat["last_print_time"] > PRINT_STAT_INTERVAL:
|
if raw_64ch is not None:
|
||||||
run_sec = now - stat["start_time"]
|
raw_mean = np.mean(raw_64ch)
|
||||||
loss_rate = (stat["send_cnt"] - stat["recv_cnt"]) / stat["send_cnt"] * 100 if stat["send_cnt"] > 0 else 0.0
|
filt_mean = np.mean(filt_data)
|
||||||
logger.info(
|
raw_amp = np.max(np.abs(raw_64ch))
|
||||||
f"运行:{run_sec:.1f}s | 发包:{stat['send_cnt']} | 收包:{stat['recv_cnt']} | 丢包率:{loss_rate:.2f}%"
|
filt_amp = np.max(np.abs(filt_data))
|
||||||
|
logger.debug(
|
||||||
|
f"【通道数据比对】原始64通道均值:{raw_mean:.4f} 幅值:{raw_amp:.4f} | "
|
||||||
|
f"滤波后均值:{filt_mean:.4f} 幅值:{filt_amp:.4f}"
|
||||||
)
|
)
|
||||||
stat["last_print_time"] = now
|
|
||||||
|
|
||||||
# 3. 精准定时发包(严格20ms间隔)
|
# ========== 2. 精准定时发送数据包 ==========
|
||||||
current_ts = time.perf_counter()
|
current_ts = time.perf_counter()
|
||||||
if current_ts >= next_send_ts:
|
if current_ts >= next_send_ts:
|
||||||
# 生成 (5,66) 仿真数据包
|
# 生成(5,66)仿真包
|
||||||
pkt_data = generate_eeg_packet(pkt_index)
|
pkt_data = generate_eeg_packet(pkt_index)
|
||||||
pkt_index += 1
|
pkt_index += 1
|
||||||
send_buf = pkt_data.tobytes()
|
send_buf = pkt_data.tobytes()
|
||||||
|
|
||||||
# ========== 三帧Multipart发送(和你服务端代码完全一致) ==========
|
# 标准三帧Multipart发送
|
||||||
sock.send_multipart([CLIENT_ID, EMPTY_FRAME, send_buf])
|
sock.send_multipart([CLIENT_ID, EMPTY_FRAME, send_buf])
|
||||||
|
|
||||||
# 统计 + 写入原始数据缓冲区
|
# ---------- 发包计数逻辑(核心优化:预热区分) ----------
|
||||||
stat["send_cnt"] += 1
|
stat["total_send"] += 1
|
||||||
|
# 预热完成后,计入有效发包
|
||||||
|
if stat["total_send"] > PREHEAT_SEND_PACKS:
|
||||||
|
stat["valid_send"] += 1
|
||||||
|
# 计算理论应收包数
|
||||||
|
stat["theo_recv"] = stat["valid_send"] // PACK_RATIO
|
||||||
|
|
||||||
|
# 缓存当前包前64通道,用于后续数据比对
|
||||||
|
stat["latest_raw_64ch"] = pkt_data[:, :CH_EEG_VALID]
|
||||||
|
|
||||||
|
# 绘图缓冲区(单通道波形)
|
||||||
with data_lock:
|
with data_lock:
|
||||||
raw_data_buf.extend(pkt_data[:, TARGET_CHANNEL])
|
raw_data_buf.extend(pkt_data[:, TARGET_CHANNEL])
|
||||||
|
|
||||||
# 更新下一次发包时间戳
|
# 更新下一次发包时间
|
||||||
next_send_ts += send_interval
|
next_send_ts += send_interval
|
||||||
|
|
||||||
|
# ========== 3. 定时打印统计信息(区分预热/正式统计) ==========
|
||||||
|
now = time.perf_counter()
|
||||||
|
if now - stat["last_print_time"] > PRINT_STAT_INTERVAL:
|
||||||
|
run_sec = now - stat["start_time"]
|
||||||
|
total_send = stat["total_send"]
|
||||||
|
total_recv = stat["total_recv"]
|
||||||
|
|
||||||
|
# 分支1:仍在预热阶段
|
||||||
|
if total_send <= PREHEAT_SEND_PACKS:
|
||||||
|
remain = PREHEAT_SEND_PACKS - total_send
|
||||||
|
logger.info(
|
||||||
|
f"[预热中] 运行:{run_sec:.1f}s | 已发包:{total_send}/{PREHEAT_SEND_PACKS} | "
|
||||||
|
f"剩余预热包:{remain} | 暂不统计丢包"
|
||||||
|
)
|
||||||
|
# 分支2:预热完成,进入正式统计
|
||||||
|
else:
|
||||||
|
v_send = stat["valid_send"]
|
||||||
|
v_recv = stat["valid_recv"]
|
||||||
|
t_recv = stat["theo_recv"]
|
||||||
|
loss_cnt = t_recv - v_recv
|
||||||
|
loss_rate = (loss_cnt / t_recv * 100) if t_recv > 0 else 0.0
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
f"[正式统计] 运行:{run_sec:.1f}s | "
|
||||||
|
f"全局总包: 发{total_send}/收{total_recv} | "
|
||||||
|
f"有效区间: 发{v_send}/应收{t_recv}/实收{v_recv} | "
|
||||||
|
f"丢包数:{loss_cnt} | 丢包率:{loss_rate:.2f}%"
|
||||||
|
)
|
||||||
|
stat["last_print_time"] = now
|
||||||
|
|
||||||
except zmq.ZMQError as e:
|
except zmq.ZMQError as e:
|
||||||
# 区分正常超时 和 网络异常
|
|
||||||
if e.errno == zmq.EAGAIN:
|
if e.errno == zmq.EAGAIN:
|
||||||
continue
|
continue
|
||||||
logger.warning(f"ZMQ 连接异常: {e}")
|
logger.warning(f"ZMQ 连接异常: {e}")
|
||||||
@@ -222,7 +283,7 @@ def zmq_io_thread():
|
|||||||
context.term()
|
context.term()
|
||||||
logger.info("ZMQ IO 线程已退出")
|
logger.info("ZMQ IO 线程已退出")
|
||||||
|
|
||||||
# ===================== 【6. 可视化绘图(无逻辑改动,已前置修复字体)】 =====================
|
# ===================== 【6. 可视化绘图(无改动)】 =====================
|
||||||
def init_plot():
|
def init_plot():
|
||||||
fig = plt.figure(figsize=(14, 9))
|
fig = plt.figure(figsize=(14, 9))
|
||||||
fig.suptitle(f"脑电滤波测试 | 观测通道: {TARGET_CHANNEL}", fontsize=14)
|
fig.suptitle(f"脑电滤波测试 | 观测通道: {TARGET_CHANNEL}", fontsize=14)
|
||||||
@@ -264,7 +325,6 @@ def update_plot(frame, lines, axes):
|
|||||||
raw_data = list(raw_data_buf)
|
raw_data = list(raw_data_buf)
|
||||||
filt_data = list(filt_data_buf)
|
filt_data = list(filt_data_buf)
|
||||||
|
|
||||||
# 时域波形
|
|
||||||
if raw_data:
|
if raw_data:
|
||||||
x_raw = np.arange(len(raw_data))
|
x_raw = np.arange(len(raw_data))
|
||||||
line_raw.set_data(x_raw, raw_data)
|
line_raw.set_data(x_raw, raw_data)
|
||||||
@@ -277,7 +337,6 @@ def update_plot(frame, lines, axes):
|
|||||||
ax2.relim()
|
ax2.relim()
|
||||||
ax2.autoscale_view()
|
ax2.autoscale_view()
|
||||||
|
|
||||||
# 频谱计算(汉宁窗减少频谱泄露)
|
|
||||||
def calc_fft(sig, n_fft):
|
def calc_fft(sig, n_fft):
|
||||||
if len(sig) < n_fft:
|
if len(sig) < n_fft:
|
||||||
return [], []
|
return [], []
|
||||||
@@ -300,7 +359,7 @@ def update_plot(frame, lines, axes):
|
|||||||
|
|
||||||
return lines
|
return lines
|
||||||
|
|
||||||
# ===================== 【7. 资源释放 & 主入口】 =====================
|
# ===================== 【7. 资源释放 & 最终汇总统计】 =====================
|
||||||
def clean_resource():
|
def clean_resource():
|
||||||
g_running.clear()
|
g_running.clear()
|
||||||
logger.info("开始停止所有线程...")
|
logger.info("开始停止所有线程...")
|
||||||
@@ -309,18 +368,19 @@ def clean_resource():
|
|||||||
logger.info("资源释放完成")
|
logger.info("资源释放完成")
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
logger.info("=" * 60)
|
logger.info("=" * 70)
|
||||||
logger.info("脑电滤波测试客户端 【修复版】启动")
|
logger.info("脑电滤波测试客户端【统计逻辑优化版】启动")
|
||||||
logger.info(f"服务端地址: {ZMQ_SERVER_IP}:{ZMQ_SERVER_PORT}")
|
logger.info(f"服务端地址: {ZMQ_SERVER_IP}:{ZMQ_SERVER_PORT}")
|
||||||
logger.info(f"发包格式: {PKG_SEND_SHAPE} | 间隔: {SEND_INTERVAL*1000:.0f}ms")
|
logger.info(f"发包: {PKG_SEND_SHAPE}({SEND_INTERVAL*1000:.0f}ms) | 回包: {PKG_RECV_SHAPE}({RECV_INTERVAL*1000:.0f}ms)")
|
||||||
logger.info(f"回包格式: {PKG_RECV_SHAPE} | ZMQ三帧报文 [客户端ID, 空帧, 数据帧]")
|
logger.info(f"预热规则: {PREHEAT_SECONDS}秒 / {PREHEAT_SEND_PACKS} 包后开启统计")
|
||||||
logger.info("=" * 60)
|
logger.info(f"收发比例: 每 {PACK_RATIO} 个发包对应 1 个回包")
|
||||||
|
logger.info("=" * 70)
|
||||||
|
|
||||||
# 启动唯一ZMQ收发线程
|
# 启动ZMQ收发线程
|
||||||
io_thread = threading.Thread(target=zmq_io_thread, daemon=True, name="ZMQ_IO_Thread")
|
io_thread = threading.Thread(target=zmq_io_thread, daemon=True, name="ZMQ_IO_Thread")
|
||||||
io_thread.start()
|
io_thread.start()
|
||||||
|
|
||||||
# 启动可视化绘图
|
# 启动可视化
|
||||||
fig, lines, axes = init_plot()
|
fig, lines, axes = init_plot()
|
||||||
ani = FuncAnimation(
|
ani = FuncAnimation(
|
||||||
fig, update_plot,
|
fig, update_plot,
|
||||||
@@ -330,20 +390,30 @@ def main():
|
|||||||
cache_frame_data=False
|
cache_frame_data=False
|
||||||
)
|
)
|
||||||
|
|
||||||
# 主线程阻塞,监听关闭
|
|
||||||
try:
|
try:
|
||||||
plt.show()
|
plt.show()
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
logger.info("收到 Ctrl+C 中断信号,准备退出")
|
logger.info("收到 Ctrl+C 中断信号,准备退出")
|
||||||
finally:
|
finally:
|
||||||
# 输出最终统计
|
# 输出最终完整汇总报表
|
||||||
run_total = time.perf_counter() - stat["start_time"]
|
run_total = time.perf_counter() - stat["start_time"]
|
||||||
loss_rate = (stat["send_cnt"] - stat["recv_cnt"]) / stat["send_cnt"] * 100 if stat["send_cnt"] > 0 else 0.0
|
total_send = stat["total_send"]
|
||||||
logger.info(f"\n===== 运行汇总 =====")
|
total_recv = stat["total_recv"]
|
||||||
|
v_send = stat["valid_send"]
|
||||||
|
v_recv = stat["valid_recv"]
|
||||||
|
t_recv = stat["theo_recv"]
|
||||||
|
|
||||||
|
loss_cnt = t_recv - v_recv
|
||||||
|
loss_rate = (loss_cnt / t_recv * 100) if t_recv > 0 else 0.0
|
||||||
|
|
||||||
|
logger.info(f"\n{'='*50} 最终运行汇总 {'='*50}")
|
||||||
logger.info(f"总运行时长: {run_total:.1f} s")
|
logger.info(f"总运行时长: {run_total:.1f} s")
|
||||||
logger.info(f"总发包数: {stat['send_cnt']}")
|
logger.info(f"【全局总包数】发送: {total_send} | 接收: {total_recv}")
|
||||||
logger.info(f"总收包数: {stat['recv_cnt']}")
|
logger.info(f"【有效统计区间(跳过预热{PREHEAT_SEND_PACKS}包)】")
|
||||||
logger.info(f"整体丢包率: {loss_rate:.2f} %")
|
logger.info(f" 有效发包: {v_send} | 理论应收包: {t_recv} | 实际收包: {v_recv}")
|
||||||
|
logger.info(f" 总丢包数: {loss_cnt} | 整体丢包率: {loss_rate:.2f} %")
|
||||||
|
logger.info(f"{'='*106}")
|
||||||
|
|
||||||
clean_resource()
|
clean_resource()
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user