This commit is contained in:
Ivey Song
2026-06-09 19:30:27 +08:00
parent 7b5f4f6eb9
commit a9dbe7261b
5 changed files with 363 additions and 30 deletions

304
verify_datamock.py Normal file
View File

@@ -0,0 +1,304 @@
"""
datamock 验证脚本(模拟算法端)
作为 ZMQ ROUTER 监听 8100 端口,等待 datamock.py 连接并验证数据流
运行顺序:
第一步: python verify_datamock.py (先启动,监听 8100)
第二步: python datamock.py (后启动,连接 8100)
"""
import zmq
import numpy as np
import time
import sys
import matplotlib
matplotlib.use('TkAgg')
# 在导入 pyplot 之前确保 Tkinter 正确初始化
try:
import tkinter as tk
root = tk.Tk()
root.withdraw() # 隐藏主窗口,我们只需要它的事件循环
except Exception as e:
print(f"[WARN] Tkinter 初始化警告: {e}")
import matplotlib.pyplot as plt
from datetime import datetime
# ===== 可视化参数 =====
PLOT_WINDOW_SEC = 2.0 # 滑动窗口时长(秒)
PLOT_CHANNELS = [0, 1, 2, 3] # 要显示的 EEG 通道索引
SERVER_ADDR = 'tcp://127.0.0.1:8100'
FS = 250
N_SAMPLES_PER_PKT = 5
N_CHAN = 66
EEG_FREQ = 10
EEG_AMP = 100.0 # EEG 幅值 100μV峰值
EEG_AMP_MEAN = EEG_AMP * 2 / np.pi # 正弦波 |mean| ≈ 63.7μV
EEG_AMP_TOLERANCE = 1.5 # 幅值容差倍数
LABEL_INTERVAL = 5
FFT_SAMPLES = 250 # 做一次 FFT 需要的采样点数1s数据
EXPECTED_BYTES = N_SAMPLES_PER_PKT * N_CHAN * 4 # 1320 bytes (5*66*4)
def validate_fft(samples):
"""对 Ch0 数据做 FFT返回峰值频率"""
freqs = np.fft.rfftfreq(FFT_SAMPLES, d=1 / FS)
fft_mag = np.abs(np.fft.rfft(samples))
peak_idx = np.argmax(fft_mag[1:]) + 1 # 跳过 DC
return freqs[peak_idx], fft_mag, freqs
def main():
ctx = zmq.Context()
sock = ctx.socket(zmq.ROUTER)
sock.bind(SERVER_ADDR)
print(f"[{datetime.now().strftime('%H:%M:%S')}] ZMQ ROUTER 绑定 {SERVER_ADDR},等待 datamock.py 连接...\n")
# ===== 初始化交互式绘图 =====
plt.ion() # 开启交互模式
fig = plt.figure(figsize=(14, 10))
fig.suptitle('EEG Data Monitor (Real-time)', fontsize=14)
# 使用 GridSpec 进行布局
from matplotlib.gridspec import GridSpec
gs = GridSpec(len(PLOT_CHANNELS) + 2, 1, figure=fig, hspace=0.3)
axes = []
lines_eeg = []
for i, ch in enumerate(PLOT_CHANNELS):
ax = fig.add_subplot(gs[i])
axes.append(ax)
ax.set_ylabel(f'Ch{ch} (μV)', fontsize=8)
ax.grid(True, alpha=0.3)
ax.set_ylim(-150, 150)
line, = ax.plot([], [], lw=0.8)
lines_eeg.append(line)
ax.set_title(f'EEG Channel {ch}', fontsize=9)
# 标签通道子图 (Ch64 - 标签值)
ax_label = fig.add_subplot(gs[len(PLOT_CHANNELS)])
axes.append(ax_label)
ax_label.set_ylabel('Label Value', fontsize=8)
ax_label.grid(True, alpha=0.3)
ax_label.set_ylim(-0.5, 2.5)
line_label, = ax_label.plot([], [], 'ro-', lw=1.5, markersize=4)
line_label_data = line_label
ax_label.set_title('Ch64 - Label Value', fontsize=9)
# Ch65 标签序号子图
ax_seq = fig.add_subplot(gs[len(PLOT_CHANNELS) + 1])
axes.append(ax_seq)
ax_seq.set_ylabel('Label Seq', fontsize=8)
ax_seq.set_xlabel('Time (samples)', fontsize=8)
ax_seq.grid(True, alpha=0.3)
ax_seq.set_ylim(-0.5, 10)
line_seq, = ax_seq.plot([], [], 'gs-', lw=1.5, markersize=4)
line_seq_data = line_seq
ax_seq.set_title('Ch65 - Label Sequence', fontsize=9)
plt.tight_layout()
# ===== 状态 =====
global_idx = 0 # 全局采样点索引
label_events = [] # 捕获的标签事件
start_time = None
fft_done = False
fft_buffer = [] # 暂存前 250 点做 FFT
ch64_zero_ok = True # 验证 Ch64 非标签采样点均为 0
ch65_zero_ok = True # 验证 Ch65 非标签采样点均为 0
label_pos_ok_all = True # 验证标签均在包内索引 4
# ===== 数据缓冲区 =====
max_samples = int(FS * PLOT_WINDOW_SEC)
eeg_buffer = {ch: np.zeros(max_samples) for ch in PLOT_CHANNELS}
label_buffer = np.zeros(max_samples)
seq_buffer = np.zeros(max_samples)
time_axis = np.arange(max_samples)
# ZMQ 收发统计
recv_count = 0
try:
# 首次 pause 用于显示窗口
plt.pause(0.5)
print(f"[INFO] 交互窗口已显示,如未看到请检查任务栏")
while True:
# ROUTER recv: prepended 一个 identity 帧
# datamock 发送 3帧 [b'datamock', b'', data_bytes]
# ROUTER 接收后变成 4帧 [router_identity, b'datamock', b'', data_bytes]
frames = sock.recv_multipart()
recv_count += 1
now = time.time()
if start_time is None:
start_time = now
# 帧格式: [router_identity, b'datamock', b'', data_bytes]
router_id = frames[0] # ROUTER 添加的身份帧
identity = frames[1] # 发送端的 identity
_empty = frames[2] # 空帧
raw_data = frames[3] # 实际数据字节
# 数据长度校验
if len(raw_data) != EXPECTED_BYTES:
print(f"[ERROR] 数据长度错误: 期望{EXPECTED_BYTES}字节, 实际{len(raw_data)}字节")
continue
# 解析为 [5, 66] float32 数组
packet = np.frombuffer(raw_data, dtype=np.float32).reshape(N_SAMPLES_PER_PKT, N_CHAN)
elapsed = now - start_time
# ===== 验证 1: 数据形状 =====
if recv_count == 1:
shape_ok = packet.shape == (N_SAMPLES_PER_PKT, N_CHAN)
print(f"[{'' if shape_ok else ''}] 数据形状: {packet.shape} "
f"(期望 [{N_SAMPLES_PER_PKT}, {N_CHAN}])")
if not shape_ok:
print(f" ✗ 形状不匹配,退出")
break
# ===== 验证 2: EEG 幅值(首包) =====
if recv_count == 1:
eeg = packet[:, :64]
amp_mean = np.mean(np.abs(eeg))
amp_ok = amp_mean <= EEG_AMP_MEAN * EEG_AMP_TOLERANCE
print(f"[{'' if amp_ok else ''}] EEG 幅值: 均值={amp_mean:.2f}μV "
f"(期望 ~{EEG_AMP_MEAN:.2f}μV峰值 ~{EEG_AMP:.2f}μV)")
if not amp_ok:
print(f" ✗ 幅值超出容差范围")
# ===== 验证 3: EEG 频率(首秒数据收集满后做 FFT =====
fft_buffer.append(packet[:, 0].copy()) # 收集 Ch0
if not fft_done and len(fft_buffer) * N_SAMPLES_PER_PKT >= FFT_SAMPLES:
# 凑够 250 点,做 FFT
all_ch0 = np.concatenate(fft_buffer)[:FFT_SAMPLES]
peak_freq, fft_mag, freqs = validate_fft(all_ch0)
freq_ok = abs(peak_freq - EEG_FREQ) < 1.0
print(f"[{'' if freq_ok else ''}] EEG 频率: 峰值={peak_freq:.1f}Hz "
f"(期望 ~{EEG_FREQ}Hz)")
print(f" FFT 幅度谱前 5 峰值:")
top5 = np.argsort(fft_mag[1:])[-5:][::-1] + 1
for rank, idx in enumerate(top5):
print(f" {rank+1}. {freqs[idx]:.1f}Hz 幅度={fft_mag[idx]:.1f}")
print()
fft_done = True
# ===== 验证 4: 标签通道Ch64/Ch65 =====
ch64 = packet[:, 64]
ch65 = packet[:, 65]
ch64_nonzero = np.where(ch64 != 0)[0]
ch65_nonzero = np.where(ch65 != 0)[0]
# 检查非标签采样点是否全为 0
ch64_zeros = np.all(ch64[:4] == 0)
ch65_zeros = np.all(ch65[:4] == 0)
ch64_zero_ok = ch64_zero_ok and ch64_zeros
ch65_zero_ok = ch65_zero_ok and ch65_zeros
if len(ch64_nonzero) > 0:
pos_in_pkt = int(ch64_nonzero[0])
label_val = int(ch64[pos_in_pkt])
label_seq = int(ch65[pos_in_pkt])
pos_ok = (len(ch64_nonzero) == 1 and pos_in_pkt == 4)
label_pos_ok_all = label_pos_ok_all and pos_ok
elapsed_since_start = now - start_time
print(f"[✓] 标签触发 @ {elapsed_since_start:.1f}s "
f"(global_idx={global_idx}{recv_count})")
print(f" Ch64 标签值: {label_val} Ch65 序号: {label_seq}")
print(f" 包内位置: 采样点 {pos_in_pkt}/4 "
f"({'' if pos_ok else '✗ 期望 4'}) "
f"其余采样点 Ch64=0: {'' if ch64_zeros else ''} "
f"Ch65=0: {'' if ch65_zeros else ''}")
print()
label_events.append({
'time': elapsed_since_start,
'label': label_val,
'seq': label_seq
})
global_idx += N_SAMPLES_PER_PKT
# ===== 更新绘图缓冲区 =====
for ch_idx, ch in enumerate(PLOT_CHANNELS):
eeg_buffer[ch] = np.roll(eeg_buffer[ch], -N_SAMPLES_PER_PKT)
eeg_buffer[ch][-N_SAMPLES_PER_PKT:] = packet[:, ch]
label_buffer = np.roll(label_buffer, -N_SAMPLES_PER_PKT)
label_buffer[-N_SAMPLES_PER_PKT:] = packet[:, 64]
seq_buffer = np.roll(seq_buffer, -N_SAMPLES_PER_PKT)
seq_buffer[-N_SAMPLES_PER_PKT:] = packet[:, 65]
# ===== 实时更新绘图 =====
for i, ch in enumerate(PLOT_CHANNELS):
lines_eeg[i].set_data(time_axis, eeg_buffer[ch]) # 数据已是 μV 单位
line_label_data.set_data(time_axis, label_buffer)
line_seq_data.set_data(time_axis, seq_buffer)
# 设置 x 轴范围
for ax in axes:
ax.set_xlim(0, max_samples)
# 刷新图形(交互模式)
fig.canvas.draw_idle()
plt.pause(0.001)
except KeyboardInterrupt:
print("\n" + "=" * 55)
print(" 验证结果汇总")
print("=" * 55)
print(f" 运行时长: {time.time() - start_time:.1f}s")
print(f" 收到包数: {recv_count}")
print(f" FFT 验证: {'✓ 已完成' if fft_done else '✗ 未完成时长不足1s'}")
print(f" 非标签采样点 Ch64=0: {'' if ch64_zero_ok else ''}")
print(f" 非标签采样点 Ch65=0: {'' if ch65_zero_ok else ''}")
print(f" 标签均在包内位置4: {'' if label_pos_ok_all else ''}")
if label_events:
print(f"\n 共捕获 {len(label_events)} 次标签事件:")
for i, ev in enumerate(label_events):
print(f" {i+1}. t={ev['time']:.1f}s label={ev['label']} 序号={ev['seq']}")
# 标签间隔
print(f"\n 标签间隔验证 (期望 ~{LABEL_INTERVAL}s):")
for i in range(1, len(label_events)):
dt = label_events[i]['time'] - label_events[i-1]['time']
ok = abs(dt - LABEL_INTERVAL) < 0.1
print(f" {i}->{i+1}: {dt:.2f}s {'' if ok else ''}")
# 标签交替
labels = [e['label'] for e in label_events]
alt_ok = all(labels[i] != labels[i+1] for i in range(len(labels) - 1))
print(f"\n 标签交替: {labels} {'✓ 交替正确' if alt_ok else '✗ 交替错误'}")
# 序号
label1_seqs = [e['seq'] for e in label_events if e['label'] == 1]
label2_seqs = [e['seq'] for e in label_events if e['label'] == 2]
s1_ok = label1_seqs == list(range(1, len(label1_seqs) + 1))
s2_ok = label2_seqs == list(range(1, len(label2_seqs) + 1))
print(f" label=1 序号: {label1_seqs} {'' if s1_ok else ''}")
print(f" label=2 序号: {label2_seqs} {'' if s2_ok else ''}")
else:
print(f"\n 未捕获标签事件(运行时长不足 {LABEL_INTERVAL}s")
print("=" * 55)
finally:
sock.close()
ctx.term()
plt.ioff()
plt.close('all')
try:
root.destroy()
except:
pass
if __name__ == '__main__':
main()