diff --git a/datamock.py b/datamock.py new file mode 100644 index 0000000..e81ebbc --- /dev/null +++ b/datamock.py @@ -0,0 +1,138 @@ +import zmq +import numpy as np +import time +from datetime import datetime + +# ========== 参数配置 ========== +FS = 250 # 采样率 Hz +N_SAMPLES_PER_PKT = 5 # 每包采样点数 +N_CHAN = 66 # 通道数: 64 EEG + 1 标签值 + 1 标签序号 +EEG_FREQ = 10 # EEG 正弦波频率 Hz +EEG_AMP = 100e-6 # EEG 幅值 100μV +LABEL_INTERVAL = 5 # 标签间隔秒数 +SERVER_ADDR = 'tcp://127.0.0.1:8100' + +# 发送间隔: 每包 5 采样点 / 250Hz = 20ms +PKT_INTERVAL = N_SAMPLES_PER_PKT / FS + + +def build_packet(global_sample_idx): + """ + 生成一包 [5, 66] 的 float32 数据 + :param global_sample_idx: 当前包第一个采样点在全局序列中的索引 (从 0 开始) + :return: np.ndarray shape [5, 66] + """ + # 当前包内 5 个采样点对应的时间(秒) + t = (global_sample_idx + np.arange(N_SAMPLES_PER_PKT)) / FS + + # Ch0-63: EEG 10Hz 正弦波,幅值 100μV + # t shape [5,],sin 乘以标量后仍是 [5,],需要 reshape 为 [5,1] 再广播到 64 通道 + eeg = (EEG_AMP * np.sin(2 * np.pi * EEG_FREQ * t)).reshape(N_SAMPLES_PER_PKT, 1) # [5, 1] + eeg = np.tile(eeg, (1, 64)) # [5, 64] + + # Ch64: 标签值通道,初始化为 0 + event = np.zeros((N_SAMPLES_PER_PKT, 1), dtype=np.float32) + + # Ch65: 标签序号通道,初始化为 0 + label_idx = np.zeros((N_SAMPLES_PER_PKT, 1), dtype=np.float32) + + # 拼成 [5, 66] + packet = np.concatenate([eeg, event, label_idx], axis=1).astype(np.float32) + return packet + + +def should_send_label(global_sample_idx): + """ + 判断当前包是否包含标签触发点(每 5s 的最后一个采样点) + 采样点索引从 0 开始,每 5s = 1250 个采样点 + 最后一个采样点索引: 1249, 2499, 3749, ... + 由于每包 5 个采样点,标签点落在包内的最后一个采样点位置 + 即当前包起始索引 global_sample_idx 必须使得: + global_sample_idx <= 标签点索引 < global_sample_idx + N_SAMPLES_PER_PKT + 也就是 global_sample_idx <= 1249 < global_sample_idx + 5 + 即 global_sample_idx = 1245, 2495, 3745, ... + 即 global_sample_idx = n * LABEL_INTERVAL * FS - N_SAMPLES_PER_PKT + """ + samples_per_interval = LABEL_INTERVAL * FS + # 检查当前包是否包含 interval 的最后一个采样点 + # 标签点索引 = n * 1250 - 1,当 global_sample_idx = n*1250-5 时,标签在包内索引 4 + return (global_sample_idx + N_SAMPLES_PER_PKT - 1) % samples_per_interval == samples_per_interval - 1 + + +def main(): + ctx = zmq.Context() + sock = ctx.socket(zmq.DEALER) + sock.connect(SERVER_ADDR) + print(f"[{datetime.now().strftime('%H:%M:%S')}] ZMQ Dealer 连接到 {SERVER_ADDR}") + + global_sample_idx = 0 # 全局采样点计数器 + label_type = 1 # 当前标签类型: 1 或 2 + label1_count = 0 # label=1 的序号计数器 + label2_count = 0 # label=2 的序号计数器 + packet_count = 0 # 已发送包数 + + print(f"[{datetime.now().strftime('%H:%M:%S')}] 开始发送模拟数据 ...") + print(f" 采样率: {FS}Hz | 每包 {N_SAMPLES_PER_PKT} 采样点 | 发送间隔 {PKT_INTERVAL*1000:.0f}ms") + print(f" EEG: {EEG_FREQ}Hz 正弦波 | 幅值 {EEG_AMP*1e6}μV") + print(f" 标签: 每 {LABEL_INTERVAL}s 末尾采样点触发 | label 1/2 交替") + print("-" * 50) + + try: + while True: + t_start = time.perf_counter() + + # 构建当前包 + packet = build_packet(global_sample_idx) + + # 检查是否需要放置标签 + if should_send_label(global_sample_idx): + if label_type == 1: + label1_count += 1 + label_value = 1 + label_number = label1_count + else: + label2_count += 1 + label_value = 2 + label_number = label2_count + + # 标签放在当前包最后一个采样点(索引 4) + packet[4, 64] = label_value + packet[4, 65] = label_number + + ts = datetime.now().strftime('%H:%M:%S') + print(f"[{ts}] 标签触发: label={label_value}, 序号={label_number} " + f"(global_sample_idx={global_sample_idx})") + + # 交替标签类型 + label_type = 2 if label_type == 1 else 1 + + # 发送: multipart 3帧 [identity, '', data] + sock.send_multipart([ + b'datamock', + b'', + packet.tobytes() + ]) + + # 每 50 包打印一次进度 + if packet_count % 50 == 0: + ts = datetime.now().strftime('%H:%M:%S') + print(f"[{ts}] 已发送 {packet_count} 包 (global_sample_idx={global_sample_idx})") + + global_sample_idx += N_SAMPLES_PER_PKT + packet_count += 1 + + # 精确控制发送节奏: 等待到 PKT_INTERVAL 秒 + elapsed = time.perf_counter() - t_start + sleep_time = PKT_INTERVAL - elapsed + if sleep_time > 0: + time.sleep(sleep_time) + + except KeyboardInterrupt: + print(f"\n[{datetime.now().strftime('%H:%M:%S')}] 停止发送,共发送 {packet_count} 包") + finally: + sock.close() + ctx.term() + + +if __name__ == '__main__': + main()