diff --git a/XYParser/XYEegParserCommon.h b/XYParser/XYEegParserCommon.h index 90c6487..026802a 100644 --- a/XYParser/XYEegParserCommon.h +++ b/XYParser/XYEegParserCommon.h @@ -31,6 +31,11 @@ struct XYEegFrame { std::int16_t ecg = 0; std::int16_t blood_oxygen = 0; std::array reserved{}; + std::uint8_t impedance_enabled = 0; + std::uint8_t current_gain = 0; + std::uint16_t current_sample_rate_hz = 0; + std::uint8_t cap_type = 0; + std::uint8_t gnd_detached = 0; std::array, kSamplesPerFrame> samples{}; std::uint8_t crc = 0; std::array tails{}; @@ -158,6 +163,22 @@ private: (static_cast(bytes[offset + 3]) << 24); } + static std::uint16_t DecodeSampleRateHz(std::uint8_t sample_rate_code) noexcept + { + switch (sample_rate_code) { + case 0: + return 250; + case 1: + return 500; + case 2: + return 1000; + case 3: + return 2000; + default: + return 0; + } + } + double ConvertAdcToUv(const std::uint8_t raw0, const std::uint8_t raw1, const std::uint8_t raw2) const noexcept { double value = static_cast(raw0) * 65536.0 + @@ -234,6 +255,11 @@ private: for (std::size_t i = 0; i < frame.reserved.size(); ++i) { frame.reserved[i] = frame_bytes[offset++]; } + frame.impedance_enabled = static_cast(frame.reserved[0] & 0x01U); + frame.current_gain = frame.reserved[1]; + frame.current_sample_rate_hz = DecodeSampleRateHz(frame.reserved[2]); + frame.cap_type = frame.reserved[3]; + frame.gnd_detached = static_cast(frame.reserved[4] != 0 ? 1 : 0); for (auto &sample : frame.samples) { for (std::size_t channel = 0; channel < ChannelCount; ++channel) { diff --git a/XYParser/XYParser.sln b/XYParser/XYParser.sln index 626301d..7e3e480 100644 --- a/XYParser/XYParser.sln +++ b/XYParser/XYParser.sln @@ -1,4 +1,4 @@ - + Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio Version 17 VisualStudioVersion = 17.14.36429.23 d17.14 @@ -7,6 +7,12 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "XYParser", "XYParser.vcxpro EndProject Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "XYParserTests", "XYParserTests\XYParserTests.vcxproj", "{5DF8A9AF-BEC9-4B14-A744-7E5638408CFD}" EndProject +Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "XYParser64Demo", "XYParserWorkflowDemo\XYParser64Demo.vcxproj", "{A8B5A8D9-2E6A-4F36-8F79-9D52B0A1D101}" +EndProject +Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "XYParser8Demo", "XYParserWorkflowDemo\XYParser8Demo.vcxproj", "{1B7FA4A1-8BC2-4D49-9B5A-BD4C6B8F2107}" +EndProject +Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "XYAlgorithmUdpServer", "XYParserWorkflowDemo\XYAlgorithmUdpServer.vcxproj", "{6D6DCD3D-995A-4E79-9338-C1D36A3D2A61}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|x64 = Debug|x64 @@ -31,6 +37,30 @@ Global {5DF8A9AF-BEC9-4B14-A744-7E5638408CFD}.Release|x64.Build.0 = Release|x64 {5DF8A9AF-BEC9-4B14-A744-7E5638408CFD}.Release|x86.ActiveCfg = Release|Win32 {5DF8A9AF-BEC9-4B14-A744-7E5638408CFD}.Release|x86.Build.0 = Release|Win32 + {A8B5A8D9-2E6A-4F36-8F79-9D52B0A1D101}.Debug|x64.ActiveCfg = Debug|x64 + {A8B5A8D9-2E6A-4F36-8F79-9D52B0A1D101}.Debug|x64.Build.0 = Debug|x64 + {A8B5A8D9-2E6A-4F36-8F79-9D52B0A1D101}.Debug|x86.ActiveCfg = Debug|Win32 + {A8B5A8D9-2E6A-4F36-8F79-9D52B0A1D101}.Debug|x86.Build.0 = Debug|Win32 + {A8B5A8D9-2E6A-4F36-8F79-9D52B0A1D101}.Release|x64.ActiveCfg = Release|x64 + {A8B5A8D9-2E6A-4F36-8F79-9D52B0A1D101}.Release|x64.Build.0 = Release|x64 + {A8B5A8D9-2E6A-4F36-8F79-9D52B0A1D101}.Release|x86.ActiveCfg = Release|Win32 + {A8B5A8D9-2E6A-4F36-8F79-9D52B0A1D101}.Release|x86.Build.0 = Release|Win32 + {1B7FA4A1-8BC2-4D49-9B5A-BD4C6B8F2107}.Debug|x64.ActiveCfg = Debug|x64 + {1B7FA4A1-8BC2-4D49-9B5A-BD4C6B8F2107}.Debug|x64.Build.0 = Debug|x64 + {1B7FA4A1-8BC2-4D49-9B5A-BD4C6B8F2107}.Debug|x86.ActiveCfg = Debug|Win32 + {1B7FA4A1-8BC2-4D49-9B5A-BD4C6B8F2107}.Debug|x86.Build.0 = Debug|Win32 + {1B7FA4A1-8BC2-4D49-9B5A-BD4C6B8F2107}.Release|x64.ActiveCfg = Release|x64 + {1B7FA4A1-8BC2-4D49-9B5A-BD4C6B8F2107}.Release|x64.Build.0 = Release|x64 + {1B7FA4A1-8BC2-4D49-9B5A-BD4C6B8F2107}.Release|x86.ActiveCfg = Release|Win32 + {1B7FA4A1-8BC2-4D49-9B5A-BD4C6B8F2107}.Release|x86.Build.0 = Release|Win32 + {6D6DCD3D-995A-4E79-9338-C1D36A3D2A61}.Debug|x64.ActiveCfg = Debug|x64 + {6D6DCD3D-995A-4E79-9338-C1D36A3D2A61}.Debug|x64.Build.0 = Debug|x64 + {6D6DCD3D-995A-4E79-9338-C1D36A3D2A61}.Debug|x86.ActiveCfg = Debug|Win32 + {6D6DCD3D-995A-4E79-9338-C1D36A3D2A61}.Debug|x86.Build.0 = Debug|Win32 + {6D6DCD3D-995A-4E79-9338-C1D36A3D2A61}.Release|x64.ActiveCfg = Release|x64 + {6D6DCD3D-995A-4E79-9338-C1D36A3D2A61}.Release|x64.Build.0 = Release|x64 + {6D6DCD3D-995A-4E79-9338-C1D36A3D2A61}.Release|x86.ActiveCfg = Release|Win32 + {6D6DCD3D-995A-4E79-9338-C1D36A3D2A61}.Release|x86.Build.0 = Release|Win32 EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/XYParser/XYParserApi.cpp b/XYParser/XYParserApi.cpp index c993f68..bbc2714 100644 --- a/XYParser/XYParserApi.cpp +++ b/XYParser/XYParserApi.cpp @@ -18,7 +18,8 @@ namespace { constexpr std::uint8_t kCommandFrameHeader = 0xAA; constexpr std::uint8_t kCommandFrameTail = 0x55; constexpr std::size_t k8ChImpedanceCommandSize = 7; -constexpr std::size_t kTriggerCommandSize = 3; +constexpr std::size_t kTriggerPayloadSize = 3; +constexpr std::size_t kTriggerCommandSize = 1 + kTriggerPayloadSize + 1 + 2; constexpr int k8ChLeadCount = 8; constexpr int k64ChLeadCount = 64; constexpr std::uint8_t kAlgorithmChannelCount = 64; @@ -66,7 +67,14 @@ bool IsSupportedTriggerType(std::uint8_t trigger_type) std::array BuildTriggerCommand(std::uint8_t trigger_type) { - return {0x00, 0x00, trigger_type}; + const std::array payload = {0x00, 0x00, trigger_type}; + return {kCommandFrameHeader, + payload[0], + payload[1], + payload[2], + CalculateChecksum(payload.data(), payload.size()), + kCommandFrameTail, + kCommandFrameTail}; } constexpr std::array k8ChLeadMap = { @@ -125,6 +133,11 @@ void FillSummary(const XYEegFrame8& frame, XYParserFrameSummary& summary) summary.channel_count = frame.channel_count; summary.battery = frame.battery; summary.sample_count = static_cast(frame.samples.size()); + summary.impedance_enabled = frame.impedance_enabled; + summary.current_gain = frame.current_gain; + summary.current_sample_rate_hz = frame.current_sample_rate_hz; + summary.cap_type = frame.cap_type; + summary.gnd_detached = frame.gnd_detached; for (std::size_t sample_index = 0; sample_index < XYPARSER_SAMPLES_PER_FRAME; ++sample_index) { summary.sample_trigger_types[sample_index] = frame.samples[sample_index].trigger_type; summary.sample_trigger_indices[sample_index] = frame.samples[sample_index].trigger_index; @@ -143,6 +156,11 @@ void FillSummary(const XYEegFrame64& frame, XYParserFrameSummary& summary) summary.channel_count = frame.channel_count; summary.battery = frame.battery; summary.sample_count = static_cast(frame.samples.size()); + summary.impedance_enabled = frame.impedance_enabled; + summary.current_gain = frame.current_gain; + summary.current_sample_rate_hz = frame.current_sample_rate_hz; + summary.cap_type = frame.cap_type; + summary.gnd_detached = frame.gnd_detached; for (std::size_t sample_index = 0; sample_index < XYPARSER_SAMPLES_PER_FRAME; ++sample_index) { summary.sample_trigger_types[sample_index] = frame.samples[sample_index].trigger_type; summary.sample_trigger_indices[sample_index] = frame.samples[sample_index].trigger_index; @@ -163,6 +181,11 @@ void Convert8ChSummaryTo64ChSummary(const XYParserFrameSummary& input_summary, output_summary.channel_count = 64; output_summary.battery = input_summary.battery; output_summary.sample_count = input_summary.sample_count; + output_summary.impedance_enabled = input_summary.impedance_enabled; + output_summary.current_gain = input_summary.current_gain; + output_summary.current_sample_rate_hz = input_summary.current_sample_rate_hz; + output_summary.cap_type = input_summary.cap_type; + output_summary.gnd_detached = input_summary.gnd_detached; for (std::size_t sample_index = 0; sample_index < XYPARSER_SAMPLES_PER_FRAME; ++sample_index) { output_summary.sample_trigger_types[sample_index] = input_summary.sample_trigger_types[sample_index]; diff --git a/XYParser/XYParserApi.h b/XYParser/XYParserApi.h index 5d6c663..76b113f 100644 --- a/XYParser/XYParserApi.h +++ b/XYParser/XYParserApi.h @@ -109,6 +109,16 @@ struct XYParserFrameSummary { std::uint8_t channel_count; std::uint8_t battery; std::uint8_t sample_count; + // 0=正常采集,1=阻抗检测中。 + std::uint8_t impedance_enabled; + // 当前设备增益值,例如 1/2/3/4/6/8/12/24。 + std::uint8_t current_gain; + // 当前实际采样率(Hz),例如 250/500/1000/2000,不是协议枚举值。 + std::uint16_t current_sample_rate_hz; + // 当前脑电帽类型,保留设备上报的原始值。 + std::uint8_t cap_type; + // GND 脱落状态,0=正常,1=脱落。 + std::uint8_t gnd_detached; double channel_values_uv[XYPARSER_SAMPLES_PER_FRAME][XYPARSER_MAX_CHANNELS]; std::uint8_t sample_trigger_types[XYPARSER_SAMPLES_PER_FRAME]; std::uint8_t sample_trigger_indices[XYPARSER_SAMPLES_PER_FRAME]; @@ -224,7 +234,8 @@ XYPARSER_API int XYParser_Serialize8ChImpedanceCommand(int open, XYPARSER_API std::size_t XYParser_GetTriggerCommandSize(void); // Serialize a trigger command. -// Packet layout matches WirelessEEG TriggerEventStruct: 2 reserved bytes + 1 trigger byte. +// Packet layout matches sendCmd(): 0xAA + TriggerEventStruct payload + checksum(payload) + 0x55 + 0x55. +// Current payload layout follows WirelessEEG TriggerEventStruct: 2 reserved bytes + 1 trigger byte. // @param trigger_type Trigger code to send. Prefer values from XYParserTriggerType. // @param out_command Output buffer for serialized bytes. // @param command_size Size of output buffer in bytes. diff --git a/XYParser/XYParserTests/Tests.cpp b/XYParser/XYParserTests/Tests.cpp index 657bb0d..0651986 100644 --- a/XYParser/XYParserTests/Tests.cpp +++ b/XYParser/XYParserTests/Tests.cpp @@ -47,7 +47,9 @@ private: /// @param channel_count 通道数量 /// @param frame_index 帧索引,小端写入标签区前 4 字节 /// @return 包含完整帧数据的字节向量 -std::vector BuildMinimalFrame(std::uint8_t channel_count, std::uint32_t frame_index) +std::vector BuildMinimalFrame(std::uint8_t channel_count, + std::uint32_t frame_index, + std::array reserved = {}) { constexpr std::size_t kSamplesPerFrame = 5; ///< 每帧采样数 constexpr std::uint8_t kHeader = 0xAA; ///< 帧头标记 @@ -79,8 +81,11 @@ std::vector BuildMinimalFrame(std::uint8_t channel_count, std::uin frame[offset++] = 95; frame[offset++] = channel_count; - // 跳过保留字段 - offset += 2 + 2 + 2 + 2 + 2 + 6; + // 写入姿态、生理量和保留字段 + offset += 2 + 2 + 2 + 2 + 2; + for (std::uint8_t value : reserved) { + frame[offset++] = value; + } // 写入采样数据 for (std::size_t sample = 0; sample < kSamplesPerFrame; ++sample) { @@ -128,7 +133,8 @@ void WriteSigned24(std::vector& frame, std::size_t& offset, int ra std::vector BuildFrameWithRawSamples( std::uint8_t channel_count, std::uint32_t frame_index, - const std::array, XYPARSER_SAMPLES_PER_FRAME>& raw_samples) + const std::array, XYPARSER_SAMPLES_PER_FRAME>& raw_samples, + std::array reserved = {}) { constexpr std::uint8_t kHeader = 0xAA; constexpr std::uint8_t kTail = 0x55; @@ -149,7 +155,10 @@ std::vector BuildFrameWithRawSamples( frame[offset++] = static_cast(payload_length & 0xFF); frame[offset++] = 95; frame[offset++] = channel_count; - offset += 2 + 2 + 2 + 2 + 2 + 6; + offset += 2 + 2 + 2 + 2 + 2; + for (std::uint8_t value : reserved) { + frame[offset++] = value; + } for (std::size_t sample = 0; sample < XYPARSER_SAMPLES_PER_FRAME; ++sample) { for (std::size_t channel = 0; channel < channel_count; ++channel) { @@ -306,6 +315,13 @@ int FindPeakPsdIndex(const XYParserWelchSummary& summary, XYParserLeadChannelNum return static_cast(peak_index); } +double ExpectedBinCenteredHanningPeakPsd(double peak_amplitude_uv) +{ + // For a 1-second window where sample_rate == n_per_seg, the periodic Hann window used by + // the implementation yields a one-sided PSD peak of A^2 / 3 for a bin-centered sine wave. + return (peak_amplitude_uv * peak_amplitude_uv) / 3.0; +} + std::uint16_t MeasureLeadImpedanceForMixedSine(std::uint8_t channel_count, XYParserLeadChannelNumber lead, int sample_rate, @@ -429,7 +445,7 @@ TEST(XYParserApiTests, SerializeTriggerCommandMatchesWirelessEegPacket) command.size()); ASSERT_EQ(command_size, static_cast(XYParser_GetTriggerCommandSize())); - const std::array expected = {0x00, 0x00, 0xBB}; + const std::array expected = {0xAA, 0x00, 0x00, 0xBB, 0xBB, 0x55, 0x55}; EXPECT_TRUE(std::equal(expected.begin(), expected.end(), command.begin())); } @@ -442,7 +458,7 @@ TEST(XYParserApiTests, SerializeTrain1TriggerCommandMatchesWirelessEegPacket) command.size()); ASSERT_EQ(command_size, static_cast(XYParser_GetTriggerCommandSize())); - const std::array expected = {0x00, 0x00, 0xBC}; + const std::array expected = {0xAA, 0x00, 0x00, 0xBC, 0xBC, 0x55, 0x55}; EXPECT_TRUE(std::equal(expected.begin(), expected.end(), command.begin())); } @@ -454,7 +470,7 @@ TEST(XYParserApiTests, SerializeTriggerCommandRejectsUnsupportedTriggerType) TEST(XYParserApiTests, SerializeTriggerCommandRejectsSmallBuffer) { - std::array command{}; + std::array command{}; EXPECT_EQ(XYParser_SerializeTriggerCommand(XYPARSER_TRIGGER_TRAIN_1, command.data(), command.size()), 0); } @@ -510,6 +526,11 @@ TEST(XYParserApiTests, Convert8ChFramesTo64ChMapsKnownLeadsAndPadsOthersWithZero input[0].channel_count = 8U; input[0].battery = 88U; input[0].sample_count = 5U; + input[0].impedance_enabled = 1U; + input[0].current_gain = 24U; + input[0].current_sample_rate_hz = 1000U; + input[0].cap_type = 7U; + input[0].gnd_detached = 1U; input[0].sample_trigger_types[0] = 0xB2; input[0].sample_trigger_indices[0] = 3U; input[0].channel_values_uv[0][0] = 11.0; @@ -533,6 +554,11 @@ TEST(XYParserApiTests, Convert8ChFramesTo64ChMapsKnownLeadsAndPadsOthersWithZero EXPECT_EQ(output[0].channel_count, 64U); EXPECT_EQ(output[0].battery, 88U); EXPECT_EQ(output[0].sample_count, 5U); + EXPECT_EQ(output[0].impedance_enabled, 1U); + EXPECT_EQ(output[0].current_gain, 24U); + EXPECT_EQ(output[0].current_sample_rate_hz, 1000U); + EXPECT_EQ(output[0].cap_type, 7U); + EXPECT_EQ(output[0].gnd_detached, 1U); EXPECT_EQ(output[0].sample_trigger_types[0], 0xB2); EXPECT_EQ(output[0].sample_trigger_indices[0], 3U); EXPECT_DOUBLE_EQ(output[0].channel_values_uv[0][LeadChannel_PO5], 11.0); @@ -1474,6 +1500,50 @@ TEST(XYParserApiTests, WelchPsdIncreasesWithSignalAmplitude) low_welch[0].band_values[2][LeadChannel_FP1]); } +TEST(XYParserApiTests, WelchAbsolutePsdMatches10HzSineAt100UvPeakToPeak) +{ + ParserGuard parser(XYParser_CreateParser(64)); + ASSERT_NE(parser.get(), nullptr); + + constexpr int kSampleRate = 250; + constexpr double kFrequencyHz = 10.0; + constexpr double kPeakToPeakUv = 100.0; + constexpr double kPeakAmplitudeUv = kPeakToPeakUv / 2.0; + constexpr double kExpectedPeakPsd = 833.3333333333334; + + XYParser_SetSampleRate(parser.get(), kSampleRate); + XYParser_SetWelchDetection(parser.get(), 1); + + const std::vector algorithm_data = + BuildAlgorithmDataForSingleChannel(kSampleRate, kFrequencyHz, kPeakAmplitudeUv); + + std::array welch{}; + EXPECT_EQ(XYParser_FeedAlgorithmData( + parser.get(), + reinterpret_cast(algorithm_data.data()), + algorithm_data.size() * sizeof(double), + nullptr, + 0), + 0); + + ASSERT_EQ(XYParser_ReadWelch(parser.get(), welch.data(), static_cast(welch.size())), 1); + ASSERT_EQ(welch[0].ok, 1); + + const int peak_index = FindPeakPsdIndex(welch[0], LeadChannel_FP1); + const int frequency_10hz_index = FindFrequencyIndex(welch[0], kFrequencyHz); + ASSERT_GE(peak_index, 0); + ASSERT_GE(frequency_10hz_index, 0); + + EXPECT_DOUBLE_EQ(welch[0].frequencies[static_cast(peak_index)], kFrequencyHz); + EXPECT_DOUBLE_EQ(welch[0].frequencies[static_cast(frequency_10hz_index)], kFrequencyHz); + + const double peak_psd = welch[0].psd_values[LeadChannel_FP1][static_cast(frequency_10hz_index)]; + EXPECT_NEAR(peak_psd, + ExpectedBinCenteredHanningPeakPsd(kPeakAmplitudeUv), + kExpectedPeakPsd * 0.1); + EXPECT_GT(welch[0].band_values[2][LeadChannel_FP1], welch[0].band_values[4][LeadChannel_FP1]); +} + TEST(XYParserApiTests, ReadWelchConsumesQueuedResults) { ParserGuard parser(XYParser_CreateParser(64)); @@ -1637,6 +1707,31 @@ TEST(XYParserApiTests, FeedParsesAComplete8ChannelFrame) EXPECT_EQ(summaries[0].sample_trigger_indices[0], 0U); // 触发索引应为 0 } +TEST(XYParserApiTests, FeedParsesReservedMetadataInto8ChannelSummary) +{ + ParserGuard parser(XYParser_CreateParser(8)); + ASSERT_NE(parser.get(), nullptr); + + XYParser_SetBypassChecksum(parser.get(), 1); + + const std::array reserved = {0x01, 24, 2, 9, 1, 0x5A}; + const std::vector bytes = BuildMinimalFrame(8, 1U, reserved); + std::array summaries{}; + + ASSERT_EQ(XYParser_Feed( + parser.get(), + bytes.data(), + bytes.size(), + summaries.data(), + static_cast(summaries.size())), + 1); + EXPECT_EQ(summaries[0].impedance_enabled, 1U); + EXPECT_EQ(summaries[0].current_gain, 24U); + EXPECT_EQ(summaries[0].current_sample_rate_hz, 1000U); + EXPECT_EQ(summaries[0].cap_type, 9U); + EXPECT_EQ(summaries[0].gnd_detached, 1U); +} + /// 测试:Feed 函数能缓冲部分数据直到完整帧可用 TEST(XYParserApiTests, FeedBuffersPartialDataUntilAFullFrameIsAvailable) { @@ -1866,6 +1961,31 @@ TEST(XYParserApiTests, FeedParses64ChannelFrame) EXPECT_EQ(summaries[0].channel_count, 64U); } +TEST(XYParserApiTests, FeedParsesReservedMetadataInto64ChannelSummary) +{ + ParserGuard parser(XYParser_CreateParser(64)); + ASSERT_NE(parser.get(), nullptr); + + XYParser_SetBypassChecksum(parser.get(), 1); + + const std::array reserved = {0x00, 6, 1, 4, 0, 0x00}; + const std::vector bytes = BuildMinimalFrame(64, 1U, reserved); + std::array summaries{}; + + ASSERT_EQ(XYParser_Feed( + parser.get(), + bytes.data(), + bytes.size(), + summaries.data(), + static_cast(summaries.size())), + 1); + EXPECT_EQ(summaries[0].impedance_enabled, 0U); + EXPECT_EQ(summaries[0].current_gain, 6U); + EXPECT_EQ(summaries[0].current_sample_rate_hz, 500U); + EXPECT_EQ(summaries[0].cap_type, 4U); + EXPECT_EQ(summaries[0].gnd_detached, 0U); +} + /// 测试:连续解析多个帧 TEST(XYParserApiTests, FeedParsesMultipleFrames) { diff --git a/XYParser/XYParserWorkflowDemo/AlgorithmUdpServer.cpp b/XYParser/XYParserWorkflowDemo/AlgorithmUdpServer.cpp new file mode 100644 index 0000000..dbd0d85 --- /dev/null +++ b/XYParser/XYParserWorkflowDemo/AlgorithmUdpServer.cpp @@ -0,0 +1,853 @@ +#define NOMINMAX +#include +#include +#include +#include + +#include "../XYParserApi.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace { + +constexpr wchar_t kProgramName[] = L"XYAlgorithmUdpServer"; +constexpr wchar_t kDefaultListenHost[] = L"127.0.0.1"; +constexpr int kDefaultListenPort = 8100; +constexpr int kDefaultReceiveTimeoutMs = 1000; +constexpr int kDefaultSampleRate = 250; +constexpr double kDefaultHighPassHz = 0.5; +constexpr double kDefaultLowPassHz = 45.0; +constexpr double kDefaultNotchHz = 50.0; +constexpr double kDefaultNotchQ = 30.0; +constexpr double kDefaultClipUv = 5000.0; + +enum class ResponseMode { + Preprocess, + Echo, + Zero +}; + +struct ServerOptions { + std::wstring listen_host = kDefaultListenHost; + int listen_port = kDefaultListenPort; + int receive_timeout_ms = kDefaultReceiveTimeoutMs; + int sample_rate = kDefaultSampleRate; + double high_pass_hz = kDefaultHighPassHz; + double low_pass_hz = kDefaultLowPassHz; + double notch_hz = kDefaultNotchHz; + double notch_q = kDefaultNotchQ; + double clip_uv = kDefaultClipUv; + ResponseMode response_mode = ResponseMode::Preprocess; +}; + +class WinsockRuntime { +public: + WinsockRuntime() + { + WSADATA wsa_data{}; + ok_ = (WSAStartup(MAKEWORD(2, 2), &wsa_data) == 0); + } + + ~WinsockRuntime() + { + if (ok_) { + WSACleanup(); + } + } + + bool ok() const { return ok_; } + +private: + bool ok_ = false; +}; + +bool ParseInt(const wchar_t* text, int& value) +{ + if (text == nullptr || text[0] == L'\0') { + return false; + } + + wchar_t* end = nullptr; + const long parsed = std::wcstol(text, &end, 10); + if (end == text || *end != L'\0') { + return false; + } + + value = static_cast(parsed); + return true; +} + +bool ParseDouble(const wchar_t* text, double& value) +{ + if (text == nullptr || text[0] == L'\0') { + return false; + } + + wchar_t* end = nullptr; + value = std::wcstod(text, &end); + return end != text && *end == L'\0'; +} + +bool EqualsIgnoreCase(const std::wstring& left, const wchar_t* right) +{ + return _wcsicmp(left.c_str(), right) == 0; +} + +bool ParseResponseMode(const wchar_t* text, ResponseMode& mode) +{ + if (text == nullptr) { + return false; + } + + const std::wstring value(text); + if (EqualsIgnoreCase(value, L"preprocess")) { + mode = ResponseMode::Preprocess; + return true; + } + if (EqualsIgnoreCase(value, L"echo")) { + mode = ResponseMode::Echo; + return true; + } + if (EqualsIgnoreCase(value, L"zero")) { + mode = ResponseMode::Zero; + return true; + } + return false; +} + +std::string Narrow(const std::wstring& value) +{ + if (value.empty()) { + return std::string(); + } + + const int size = WideCharToMultiByte(CP_UTF8, 0, value.c_str(), -1, nullptr, 0, nullptr, nullptr); + if (size <= 0) { + return std::string(); + } + + std::string converted(static_cast(size - 1), '\0'); + WideCharToMultiByte(CP_UTF8, 0, value.c_str(), -1, converted.data(), size, nullptr, nullptr); + return converted; +} + +std::string BuildZmqTcpEndpoint(const std::wstring& host, int port) +{ + return "tcp://" + Narrow(host) + ':' + std::to_string(port); +} + +void PrintUsage() +{ + std::wcout << L"Usage: " << kProgramName << L" [options]\n" + << L" --listen-host 127.0.0.1\n" + << L" --listen-port 8100\n" + << L" --receive-timeout-ms 1000\n" + << L" --sample-rate 250\n" + << L" --high-pass-hz 0.5\n" + << L" --low-pass-hz 45\n" + << L" --notch-hz 50\n" + << L" --notch-q 30\n" + << L" --clip-uv 5000\n" + << L" --mode preprocess|echo|zero\n"; +} + +bool ParseArguments(int argc, wchar_t* argv[], ServerOptions& options) +{ + for (int i = 1; i < argc; ++i) { + const std::wstring arg(argv[i]); + if (arg == L"--listen-host" && i + 1 < argc) { + options.listen_host = argv[++i]; + } else if (arg == L"--listen-port" && i + 1 < argc) { + if (!ParseInt(argv[++i], options.listen_port)) { + return false; + } + } else if (arg == L"--receive-timeout-ms" && i + 1 < argc) { + if (!ParseInt(argv[++i], options.receive_timeout_ms)) { + return false; + } + } else if (arg == L"--sample-rate" && i + 1 < argc) { + if (!ParseInt(argv[++i], options.sample_rate)) { + return false; + } + } else if (arg == L"--high-pass-hz" && i + 1 < argc) { + if (!ParseDouble(argv[++i], options.high_pass_hz)) { + return false; + } + } else if (arg == L"--low-pass-hz" && i + 1 < argc) { + if (!ParseDouble(argv[++i], options.low_pass_hz)) { + return false; + } + } else if (arg == L"--notch-hz" && i + 1 < argc) { + if (!ParseDouble(argv[++i], options.notch_hz)) { + return false; + } + } else if (arg == L"--notch-q" && i + 1 < argc) { + if (!ParseDouble(argv[++i], options.notch_q)) { + return false; + } + } else if (arg == L"--clip-uv" && i + 1 < argc) { + if (!ParseDouble(argv[++i], options.clip_uv)) { + return false; + } + } else if (arg == L"--mode" && i + 1 < argc) { + if (!ParseResponseMode(argv[++i], options.response_mode)) { + return false; + } + } else if (arg == L"--help" || arg == L"-h") { + return false; + } else { + return false; + } + } + + return options.listen_port > 0 && + options.receive_timeout_ms >= 0 && + options.sample_rate > 0 && + options.high_pass_hz >= 0.0 && + options.low_pass_hz > 0.0 && + options.notch_hz >= 0.0 && + options.notch_q > 0.0 && + options.clip_uv > 0.0 && + options.high_pass_hz < options.low_pass_hz && + options.low_pass_hz < (static_cast(options.sample_rate) * 0.5); +} + +class DcBlocker { +public: + void Configure(double sample_rate_hz, double high_pass_hz) + { + if (high_pass_hz <= 0.0 || sample_rate_hz <= 0.0) { + enabled_ = false; + x_prev_ = 0.0; + y_prev_ = 0.0; + return; + } + + const double normalized = 2.0 * 3.14159265358979323846 * high_pass_hz / sample_rate_hz; + feedback_ = std::exp(-normalized); + enabled_ = true; + x_prev_ = 0.0; + y_prev_ = 0.0; + } + + double Process(double value) + { + if (!enabled_) { + return value; + } + + const double output = value - x_prev_ + feedback_ * y_prev_; + x_prev_ = value; + y_prev_ = output; + return output; + } + +private: + bool enabled_ = false; + double feedback_ = 0.0; + double x_prev_ = 0.0; + double y_prev_ = 0.0; +}; + +class OnePoleLowPass { +public: + void Configure(double sample_rate_hz, double cutoff_hz) + { + if (cutoff_hz <= 0.0 || sample_rate_hz <= 0.0) { + enabled_ = false; + y_prev_ = 0.0; + return; + } + + const double dt = 1.0 / sample_rate_hz; + const double rc = 1.0 / (2.0 * 3.14159265358979323846 * cutoff_hz); + alpha_ = dt / (rc + dt); + enabled_ = true; + initialized_ = false; + y_prev_ = 0.0; + } + + double Process(double value) + { + if (!enabled_) { + return value; + } + + if (!initialized_) { + y_prev_ = value; + initialized_ = true; + return value; + } + + y_prev_ += alpha_ * (value - y_prev_); + return y_prev_; + } + +private: + bool enabled_ = false; + bool initialized_ = false; + double alpha_ = 0.0; + double y_prev_ = 0.0; +}; + +class NotchBiquad { +public: + void Configure(double sample_rate_hz, double notch_hz, double q) + { + if (sample_rate_hz <= 0.0 || notch_hz <= 0.0 || q <= 0.0 || + notch_hz >= (sample_rate_hz * 0.5)) { + enabled_ = false; + ResetState(); + return; + } + + const double omega = 2.0 * 3.14159265358979323846 * notch_hz / sample_rate_hz; + const double alpha = std::sin(omega) / (2.0 * q); + const double cos_omega = std::cos(omega); + const double a0 = 1.0 + alpha; + + b0_ = 1.0 / a0; + b1_ = (-2.0 * cos_omega) / a0; + b2_ = 1.0 / a0; + a1_ = (-2.0 * cos_omega) / a0; + a2_ = (1.0 - alpha) / a0; + enabled_ = true; + ResetState(); + } + + double Process(double value) + { + if (!enabled_) { + return value; + } + + const double output = b0_ * value + b1_ * x1_ + b2_ * x2_ - a1_ * y1_ - a2_ * y2_; + x2_ = x1_; + x1_ = value; + y2_ = y1_; + y1_ = output; + return output; + } + +private: + void ResetState() + { + x1_ = 0.0; + x2_ = 0.0; + y1_ = 0.0; + y2_ = 0.0; + } + + bool enabled_ = false; + double b0_ = 1.0; + double b1_ = 0.0; + double b2_ = 0.0; + double a1_ = 0.0; + double a2_ = 0.0; + double x1_ = 0.0; + double x2_ = 0.0; + double y1_ = 0.0; + double y2_ = 0.0; +}; + +class ChannelPreprocessor { +public: + void Configure(const ServerOptions& options) + { + const double sample_rate = static_cast(options.sample_rate); + dc_blocker_.Configure(sample_rate, options.high_pass_hz); + low_pass_.Configure(sample_rate, options.low_pass_hz); + notch_.Configure(sample_rate, options.notch_hz, options.notch_q); + clip_uv_ = options.clip_uv; + } + + double Process(double value) + { + double processed = dc_blocker_.Process(value); + processed = notch_.Process(processed); + processed = low_pass_.Process(processed); + return std::clamp(processed, -clip_uv_, clip_uv_); + } + +private: + DcBlocker dc_blocker_; + OnePoleLowPass low_pass_; + NotchBiquad notch_; + double clip_uv_ = kDefaultClipUv; +}; + +class AlgorithmProcessor { +public: + void Configure(const ServerOptions& options) + { + for (auto& channel : channels_) { + channel.Configure(options); + } + } + + std::size_t BuildResponsePayload(const std::uint8_t* request, + std::size_t request_size, + ResponseMode mode, + std::vector& response) + { + response.resize(request_size); + if (mode == ResponseMode::Echo) { + std::memcpy(response.data(), request, request_size); + return request_size; + } + if (mode == ResponseMode::Zero) { + std::fill(response.begin(), response.end(), 0); + return request_size; + } + + if ((request_size % sizeof(double)) != 0) { + response.clear(); + return 0; + } + + const std::size_t value_count = request_size / sizeof(double); + const auto* input_values = reinterpret_cast(request); + auto* output_values = reinterpret_cast(response.data()); + + for (std::size_t sample_index = 0; sample_index < XYPARSER_SAMPLES_PER_FRAME; ++sample_index) { + const std::size_t sample_offset = + sample_index * static_cast(XYPARSER_FRAME_DATA_COLUMN_COUNT); + for (std::size_t channel_index = 0; channel_index < XYPARSER_MAX_CHANNELS; ++channel_index) { + const std::size_t value_index = sample_offset + channel_index; + if (value_index >= value_count) { + response.clear(); + return 0; + } + output_values[value_index] = channels_[channel_index].Process(input_values[value_index]); + } + + const std::size_t trigger_type_index = sample_offset + XYPARSER_FRAME_DATA_TRIGGER_TYPE_INDEX; + const std::size_t trigger_index_index = sample_offset + XYPARSER_FRAME_DATA_TRIGGER_INDEX_INDEX; + if (trigger_index_index >= value_count) { + response.clear(); + return 0; + } + output_values[trigger_type_index] = input_values[trigger_type_index]; + output_values[trigger_index_index] = input_values[trigger_index_index]; + } + + return request_size; + } + +private: + std::array channels_{}; +}; + +const char* DescribeTriggerType(int trigger_type) +{ + switch (trigger_type) { + case XYPARSER_TRIGGER_NONE: + return "NONE"; + case XYPARSER_TRIGGER_TRAIN_0: + return "TRAIN_0"; + case XYPARSER_TRIGGER_TRAIN_1: + return "TRAIN_1"; + default: + return "UNKNOWN"; + } +} + +const char* GetLeadName(std::size_t channel_index) +{ + static constexpr const char* kLeadNames[XYPARSER_MAX_CHANNELS] = { + "FP1", "FP2", "PO6", "POZ", "F3", "F4", "FPZ", "AF4", + "FC3", "PO8", "CP2", "CP1", "FCZ", "PO5", "FC2", "FC1", + "C3", "C4", "FC4", "CP4", "P3", "P4", "F5", "C5", + "F6", "PO4", "CP6", "CP5", "PO3", "CP3", "FC6", "FC5", + "CB1", "CB2", "P5", "AF7", "A1", "T7", "FT7", "TP7", + "FT8", "AF8", "F8", "F7", "P6", "C6", "O2", "O1", + "T8", "P7", "CZ", "PZ", "P8", "FZ", "OZ", "PO7", + "TP8", "AF3", "C2", "C1", "P2", "P1", "F2", "F1" + }; + if (channel_index >= std::size(kLeadNames)) { + return "lead?"; + } + return kLeadNames[channel_index]; +} + +void PrintActiveChannelSamples(const double* values, + std::size_t value_count, + const std::string& remote_endpoint, + std::uint64_t packet_count) +{ + for (std::size_t sample_index = 0; sample_index < XYPARSER_SAMPLES_PER_FRAME; ++sample_index) { + const std::size_t sample_offset = + sample_index * static_cast(XYPARSER_FRAME_DATA_COLUMN_COUNT); + const std::size_t trigger_type_index = sample_offset + XYPARSER_FRAME_DATA_TRIGGER_TYPE_INDEX; + const std::size_t trigger_index_index = sample_offset + XYPARSER_FRAME_DATA_TRIGGER_INDEX_INDEX; + if (trigger_index_index >= value_count) { + return; + } + + std::ostringstream channel_stream; + int active_channel_count = 0; + for (std::size_t channel_index = 0; channel_index < XYPARSER_MAX_CHANNELS; ++channel_index) { + const std::size_t value_index = sample_offset + channel_index; + if (value_index >= value_count) { + return; + } + + const double value_uv = values[value_index]; + if (value_uv == 0.0) { + continue; + } + + if (active_channel_count > 0) { + channel_stream << " | "; + } + channel_stream << GetLeadName(channel_index) + << '=' << value_uv << "uV"; + ++active_channel_count; + } + + if (active_channel_count == 0) { + continue; + } + + const int trigger_type = static_cast(std::llround(values[trigger_type_index])); + const int trigger_index = static_cast(std::llround(values[trigger_index_index])); + std::cout << "[AlgorithmZmqServer][ActiveSample]" + << " packet=" << packet_count + << " sample=" << sample_index + << " remote=" << remote_endpoint + << " trigger=" << trigger_type + << '(' << DescribeTriggerType(trigger_type) << ')' + << " triggerIndex=" << trigger_index + << " channels=" << channel_stream.str() + << std::endl; + } +} + +class TriggerChangeLogger { +public: + void ObservePayload(const double* values, + std::size_t value_count, + const std::string& remote_endpoint, + std::uint64_t packet_count) + { + for (std::size_t sample_index = 0; sample_index < XYPARSER_SAMPLES_PER_FRAME; ++sample_index) { + const std::size_t sample_offset = + sample_index * static_cast(XYPARSER_FRAME_DATA_COLUMN_COUNT); + const std::size_t trigger_type_index = sample_offset + XYPARSER_FRAME_DATA_TRIGGER_TYPE_INDEX; + const std::size_t trigger_index_index = sample_offset + XYPARSER_FRAME_DATA_TRIGGER_INDEX_INDEX; + if (trigger_index_index >= value_count) { + return; + } + + const int trigger_type = static_cast(std::llround(values[trigger_type_index])); + const int trigger_index = static_cast(std::llround(values[trigger_index_index])); + if (!has_last_trigger_ || + trigger_type != last_trigger_type_ || + trigger_index != last_trigger_index_) { + has_last_trigger_ = true; + last_trigger_type_ = trigger_type; + last_trigger_index_ = trigger_index; + std::cout << "[AlgorithmZmqServer][Trigger] packet=" << packet_count + << " sample=" << sample_index + << " remote=" << remote_endpoint + << " type=" << trigger_type + << '(' << DescribeTriggerType(trigger_type) << ')' + << " index=" << trigger_index + << std::endl; + } + } + } + +private: + bool has_last_trigger_ = false; + int last_trigger_type_ = 0; + int last_trigger_index_ = 0; +}; + +class ZmqAlgorithmServer { +public: + ~ZmqAlgorithmServer() { Close(); } + + bool Open(const ServerOptions& options) + { + Close(); + last_error_text_.clear(); + last_client_identity_.clear(); + local_endpoint_ = BuildZmqTcpEndpoint(options.listen_host, options.listen_port); + + context_ = zmq_ctx_new(); + if (context_ == nullptr) { + last_error_text_ = zmq_strerror(zmq_errno()); + return false; + } + + socket_ = zmq_socket(context_, ZMQ_ROUTER); + if (socket_ == nullptr) { + last_error_text_ = zmq_strerror(zmq_errno()); + Close(); + return false; + } + + const int linger_ms = 0; + zmq_setsockopt(socket_, ZMQ_LINGER, &linger_ms, sizeof(linger_ms)); + zmq_setsockopt(socket_, ZMQ_RCVTIMEO, &options.receive_timeout_ms, sizeof(options.receive_timeout_ms)); + zmq_setsockopt(socket_, ZMQ_SNDTIMEO, &options.receive_timeout_ms, sizeof(options.receive_timeout_ms)); + + if (zmq_bind(socket_, local_endpoint_.c_str()) != 0) { + last_error_text_ = zmq_strerror(zmq_errno()); + Close(); + return false; + } + + return true; + } + + int Receive(std::uint8_t* buffer, int capacity, std::vector& client_identity, bool& timed_out) + { + timed_out = false; + client_identity.clear(); + if (socket_ == nullptr) { + last_error_text_ = "socket not open"; + return -1; + } + + std::vector identity_frame; + bool has_more = false; + if (!ReceiveFrame(identity_frame, has_more)) { + timed_out = (last_error_text_ == "Resource temporarily unavailable"); + return timed_out ? 0 : -1; + } + if (!has_more) { + last_error_text_ = "missing payload frame"; + return -1; + } + + std::vector payload_frame; + if (!ReceiveFrame(payload_frame, has_more)) { + timed_out = (last_error_text_ == "Resource temporarily unavailable"); + return timed_out ? 0 : -1; + } + if (has_more) { + last_error_text_ = "unexpected multipart payload"; + return -1; + } + if (payload_frame.size() > static_cast(capacity)) { + last_error_text_ = "payload too large"; + return -1; + } + + std::memcpy(buffer, payload_frame.data(), payload_frame.size()); + client_identity = std::move(identity_frame); + last_client_identity_ = DescribeClientIdentity(client_identity); + last_error_text_.clear(); + return static_cast(payload_frame.size()); + } + + bool Send(const std::uint8_t* data, int size, const std::vector& client_identity) + { + if (socket_ == nullptr) { + last_error_text_ = "socket not open"; + return false; + } + + const int sent_identity = zmq_send(socket_, + client_identity.data(), + client_identity.size(), + ZMQ_SNDMORE); + if (sent_identity != static_cast(client_identity.size())) { + last_error_text_ = zmq_strerror(zmq_errno()); + return false; + } + + const int sent_payload = zmq_send(socket_, data, size, 0); + if (sent_payload != size) { + last_error_text_ = zmq_strerror(zmq_errno()); + return false; + } + + last_error_text_.clear(); + return true; + } + + const std::string& last_error() const { return last_error_text_; } + const std::string& local_endpoint() const { return local_endpoint_; } + const std::string& last_client_identity() const { return last_client_identity_; } + +private: + bool ReceiveFrame(std::vector& frame, bool& has_more) + { + zmq_msg_t message; + zmq_msg_init(&message); + const int received = zmq_msg_recv(&message, socket_, 0); + if (received < 0) { + last_error_text_ = zmq_strerror(zmq_errno()); + zmq_msg_close(&message); + return false; + } + + const auto* data = static_cast(zmq_msg_data(&message)); + const std::size_t size = zmq_msg_size(&message); + frame.assign(data, data + size); + + int more = 0; + size_t more_size = sizeof(more); + zmq_getsockopt(socket_, ZMQ_RCVMORE, &more, &more_size); + has_more = (more != 0); + zmq_msg_close(&message); + return true; + } + + std::string DescribeClientIdentity(const std::vector& client_identity) const + { + return std::string(reinterpret_cast(client_identity.data()), client_identity.size()); + } + + void Close() + { + if (socket_ != nullptr) { + zmq_close(socket_); + socket_ = nullptr; + } + if (context_ != nullptr) { + zmq_ctx_term(context_); + context_ = nullptr; + } + last_client_identity_.clear(); + } + + void* context_ = nullptr; + void* socket_ = nullptr; + std::string last_error_text_; + std::string local_endpoint_; + std::string last_client_identity_; +}; + +int RunServer(const ServerOptions& options) +{ + ZmqAlgorithmServer server; + if (!server.Open(options)) { + std::cerr << "Open algorithm ZMQ server failed" + << " endpoint=" << BuildZmqTcpEndpoint(options.listen_host, options.listen_port) + << " zmqError=" << server.last_error() + << std::endl; + return 1; + } + + constexpr std::size_t expected_payload_bytes = + XYPARSER_FRAME_ALGORITHM_VALUE_COUNT * sizeof(double); + std::array request_buffer{}; + std::vector response_buffer; + AlgorithmProcessor processor; + TriggerChangeLogger trigger_logger; + processor.Configure(options); + std::uint64_t packet_count = 0; + std::uint64_t invalid_payload_count = 0; + + std::cout << "Algorithm ZMQ server listening" + << " local=" << server.local_endpoint() + << " payloadBytes=" << expected_payload_bytes + << " payloadDoubles=" << XYPARSER_FRAME_ALGORITHM_VALUE_COUNT + << " mode=" + << (options.response_mode == ResponseMode::Preprocess + ? "preprocess" + : (options.response_mode == ResponseMode::Echo ? "echo" : "zero")) + << " sampleRate=" << options.sample_rate + << " highPassHz=" << options.high_pass_hz + << " lowPassHz=" << options.low_pass_hz + << " notchHz=" << options.notch_hz + << " notchQ=" << options.notch_q + << " clipUv=" << options.clip_uv + << std::endl; + + while (true) { + std::vector client_identity; + bool timed_out = false; + const int received = server.Receive(request_buffer.data(), + static_cast(request_buffer.size()), + client_identity, + timed_out); + if (timed_out) { + continue; + } + if (received < 0) { + std::cerr << "Receive algorithm ZMQ payload failed" + << " zmqError=" << server.last_error() + << std::endl; + continue; + } + + ++packet_count; + const std::string remote_endpoint = server.last_client_identity(); + if (static_cast(received) != expected_payload_bytes) { + ++invalid_payload_count; + std::cerr << "Unexpected algorithm ZMQ payload size" + << " remote=" << remote_endpoint + << " actualBytes=" << received + << " expectedBytes=" << expected_payload_bytes + << " invalidPayloads=" << invalid_payload_count + << std::endl; + continue; + } + + PrintActiveChannelSamples(reinterpret_cast(request_buffer.data()), + static_cast(received) / sizeof(double), + remote_endpoint, + packet_count); + trigger_logger.ObservePayload(reinterpret_cast(request_buffer.data()), + static_cast(received) / sizeof(double), + remote_endpoint, + packet_count); + + const std::size_t response_size = processor.BuildResponsePayload(request_buffer.data(), + static_cast(received), + options.response_mode, + response_buffer); + if (response_size == 0) { + ++invalid_payload_count; + std::cerr << "Build algorithm ZMQ response failed" + << " remote=" << remote_endpoint + << " actualBytes=" << received + << " invalidPayloads=" << invalid_payload_count + << std::endl; + continue; + } + if (!server.Send(response_buffer.data(), static_cast(response_size), client_identity)) { + std::cerr << "Send algorithm ZMQ payload failed" + << " remote=" << remote_endpoint + << " zmqError=" << server.last_error() + << std::endl; + continue; + } + + if (packet_count <= 3 || (packet_count % 100) == 0) { + std::cout << "[AlgorithmZmqServer] packets=" << packet_count + << " remote=" << remote_endpoint + << " bytes=" << received + << std::endl; + } + } +} + +} // namespace + +int wmain(int argc, wchar_t* argv[]) +{ + ServerOptions options{}; + if (!ParseArguments(argc, argv, options)) { + PrintUsage(); + return 1; + } + + return RunServer(options); +} diff --git a/XYParser/XYParserWorkflowDemo/README.md b/XYParser/XYParserWorkflowDemo/README.md new file mode 100644 index 0000000..1be60a1 --- /dev/null +++ b/XYParser/XYParserWorkflowDemo/README.md @@ -0,0 +1,137 @@ +# XYParser64Demo / XYParser8Demo + +业务流程 demo,可直接连你的设备模拟器验证: + +- `XYParser64Demo`:TCP 接收 64 导数据,可额外打开一个串口发送 `TRAIN_0 / TRAIN_1` +- `XYParser8Demo`:串口接收 8 导数据 +- `XYAlgorithmUdpServer`:算法 ZMQ 服务端,接收 demo 发来的算法输入并回包 +- Welch/PSD 固定走 ZMQ: + - `Feed -> ConvertSampleFramesToAlgorithmData -> ZMQ 发给算法 -> 收算法 ZMQ 回包 -> FeedAlgorithmData -> ReadWelch` +- demo 会打印 ZMQ 发送/接收包数、字节数,以及回包长度校验结果,方便看数据完整性 + +## 算法 ZMQ 服务端 + +```powershell +.\x64\Debug\XYAlgorithmUdpServer.exe +``` + +默认: + +- 绑定 `tcp://127.0.0.1:8100` +- 固定接收 `XYPARSER_FRAME_ALGORITHM_VALUE_COUNT * sizeof(double)` 字节 +- 默认 `preprocess` 模式:按通道做去直流、50Hz 陷波、低通和平幅裁剪后回包 +- 默认采样率 `250Hz` +- 默认高通 `0.5Hz` +- 默认低通 `45Hz` +- 默认工频陷波 `50Hz` +- 默认裁剪幅值 `5000uV` +- 可用 `echo` 模式:原样回包给发送方 +- 可用 `zero` 模式:回全 0 的同长度 double 数组 + +常用覆盖参数: + +- `--listen-host 127.0.0.1` +- `--listen-port 8100` +- `--receive-timeout-ms 1000` +- `--sample-rate 250` +- `--high-pass-hz 0.5` +- `--low-pass-hz 45` +- `--notch-hz 50` +- `--notch-q 30` +- `--clip-uv 5000` +- `--mode preprocess` + +联调顺序: + +- 先启动 `XYAlgorithmUdpServer` +- 再启动 `XYParser64Demo` 或 `XYParser8Demo` +- 如果 demo 端 `rxPackets` 开始增长,说明 ZMQ 链路已打通 + +## 64 导示例 + +```powershell +.\x64\Debug\XYParser64Demo.exe +``` + +流程: + +- 连接 64 导 TCP 设备 +- 启动后直接开启 Welch/PSD,不打开阻抗 +- 采样率和增益使用内置默认值 +- 默认 TCP 主机为 `127.0.0.1` +- 默认 TCP 端口为 `5086` +- 默认 trigger 串口为 `COM44` +- demo 作为 ZMQ client 连接算法服务端 `tcp://127.0.0.1:8100` +- 按 `train-duration-ms` 周期循环交替发送 `TRAIN_0 / TRAIN_1` + +常用覆盖参数: + +- `--tcp-port 5086` +- `--trigger-com COM44` +- `--algorithm-host 127.0.0.1` +- `--algorithm-port 8100` +- `--train-duration-ms 3000` + +## 8 导示例 + +```powershell +.\x64\Debug\XYParser8Demo.exe +``` + +默认: + +- 8 导数据串口为 `COM44` +- 串口波特率为 `460800` +- 算法 ZMQ 地址为 `tcp://127.0.0.1:8100` +- 启动后先发送一次 8 导阻抗开启命令 +- 在阻抗模式下持续约 `60` 秒并打印阻抗结果 +- 之后自动发送阻抗关闭命令,再恢复 Welch/PSD 和算法链路 + +8 导转 64 导的流程: + +- 先解析成 8 导 `XYParserFrameSummary` +- 再通过 `XYParser_Convert8ChFramesTo64Ch` 扩展成 64 导 +- 8 个已知导联按固定位置写入 64 导 summary +- 其余未覆盖的 56 个 64 导导联全部补 `0` + +8 导到 64 导导联映射图: + +```text +8ch[0] -> PO5 +8ch[1] -> POZ +8ch[2] -> PO6 +8ch[3] -> PO7 +8ch[4] -> O1 +8ch[5] -> OZ +8ch[6] -> O2 +8ch[7] -> PO8 +others -> 0 +``` + +表格形式: + +| 8导索引 | 64导导联 | +| --- | --- | +| 0 | PO5 | +| 1 | POZ | +| 2 | PO6 | +| 3 | PO7 | +| 4 | O1 | +| 5 | OZ | +| 6 | O2 | +| 7 | PO8 | + +代码依据: + +- 8 导映射表定义在 `k8ChLeadMap` +- 转换实现为 `XYParser_Convert8ChFramesTo64Ch -> Convert8ChSummaryTo64ChSummary` + +常用覆盖参数: + +- `--data-com COM44` +- `--baud 460800` +- `--bypass-checksum 1` +- `--algorithm-host 127.0.0.1` +- `--algorithm-port 8100` +- `--algorithm-timeout-ms 200` +- `--vref 4.5` diff --git a/XYParser/XYParserWorkflowDemo/XYAlgorithmUdpServer.vcxproj b/XYParser/XYParserWorkflowDemo/XYAlgorithmUdpServer.vcxproj new file mode 100644 index 0000000..6fb1ff5 --- /dev/null +++ b/XYParser/XYParserWorkflowDemo/XYAlgorithmUdpServer.vcxproj @@ -0,0 +1,170 @@ + + + + + Debug + Win32 + + + Release + Win32 + + + Debug + x64 + + + Release + x64 + + + + 17.0 + Win32Proj + {6D6DCD3D-995A-4E79-9338-C1D36A3D2A61} + XYAlgorithmUdpServer + 10.0 + + + + Application + true + v143 + Unicode + + + Application + false + v143 + true + Unicode + + + Application + true + v143 + Unicode + + + Application + false + v143 + true + Unicode + + + + + + + + + + + + + + + + + + + E:\Gitea\Swallow\XYParadigmDll\ZMQ + $(ZmqRoot)\include + $(ZmqRoot)\msvclib + $(ZmqLibDir)\libzmq-v142-mt-4_3_4.dll + E:\Gitea\Swallow\SwallowBCI\release\decoder_mainSSVEP\_internal\libsodium.dll + + + $(SolutionDir)$(Platform)\$(Configuration)\ + $(SolutionDir)$(Platform)\$(Configuration)\XYAlgorithmUdpServer\ + XYAlgorithmUdpServer + + + + $(ZmqIncludeDir);%(AdditionalIncludeDirectories) + + + $(ZmqLibDir);%(AdditionalLibraryDirectories) + libzmq-v142-mt-4_3_4.lib;%(AdditionalDependencies) + + + if exist "$(ZmqDllPath)" (copy /Y "$(ZmqDllPath)" "$(OutDir)" >nul 2>nul || echo Skip copying libzmq dll) +if exist "$(SodiumDllPath)" (copy /Y "$(SodiumDllPath)" "$(OutDir)" >nul 2>nul || echo Skip copying libsodium dll) +exit /b 0 + + + + + Level3 + true + WIN32;_DEBUG;_CONSOLE;%(PreprocessorDefinitions) + true + stdcpp17 + $(SolutionDir);$(ProjectDir);%(AdditionalIncludeDirectories) + NotUsing + + + Console + true + Ws2_32.lib;%(AdditionalDependencies) + + + + + Level3 + true + true + true + WIN32;NDEBUG;_CONSOLE;%(PreprocessorDefinitions) + true + stdcpp17 + $(SolutionDir);$(ProjectDir);%(AdditionalIncludeDirectories) + NotUsing + + + Console + true + Ws2_32.lib;%(AdditionalDependencies) + + + + + Level3 + true + _DEBUG;_CONSOLE;%(PreprocessorDefinitions) + true + stdcpp17 + $(SolutionDir);$(ProjectDir);%(AdditionalIncludeDirectories) + NotUsing + + + Console + true + Ws2_32.lib;%(AdditionalDependencies) + + + + + Level3 + true + true + true + NDEBUG;_CONSOLE;%(PreprocessorDefinitions) + true + stdcpp17 + $(SolutionDir);$(ProjectDir);%(AdditionalIncludeDirectories) + NotUsing + + + Console + true + Ws2_32.lib;%(AdditionalDependencies) + + + + + + + + diff --git a/XYParser/XYParserWorkflowDemo/XYParser64Demo.vcxproj b/XYParser/XYParserWorkflowDemo/XYParser64Demo.vcxproj new file mode 100644 index 0000000..93fd345 --- /dev/null +++ b/XYParser/XYParserWorkflowDemo/XYParser64Demo.vcxproj @@ -0,0 +1,178 @@ + + + + + Debug + Win32 + + + Release + Win32 + + + Debug + x64 + + + Release + x64 + + + + 17.0 + Win32Proj + {A8B5A8D9-2E6A-4F36-8F79-9D52B0A1D101} + XYParser64Demo + 10.0 + + + + Application + true + v143 + Unicode + + + Application + false + v143 + true + Unicode + + + Application + true + v143 + Unicode + + + Application + false + v143 + true + Unicode + + + + + + + + + + + + + + + + + + + E:\Gitea\Swallow\XYParadigmDll\ZMQ + $(ZmqRoot)\include + $(ZmqRoot)\msvclib + $(ZmqLibDir)\libzmq-v142-mt-4_3_4.dll + E:\Gitea\Swallow\SwallowBCI\release\decoder_mainSSVEP\_internal\libsodium.dll + + + $(SolutionDir)$(Platform)\$(Configuration)\ + $(SolutionDir)$(Platform)\$(Configuration)\XYParserWorkflowDemo\ + XYParser64Demo + + + + $(ZmqIncludeDir);%(AdditionalIncludeDirectories) + + + $(ZmqLibDir);%(AdditionalLibraryDirectories) + libzmq-v142-mt-4_3_4.lib;%(AdditionalDependencies) + + + if exist "$(ZmqDllPath)" (copy /Y "$(ZmqDllPath)" "$(OutDir)" >nul 2>nul || echo Skip copying libzmq dll) +if exist "$(SodiumDllPath)" (copy /Y "$(SodiumDllPath)" "$(OutDir)" >nul 2>nul || echo Skip copying libsodium dll) +exit /b 0 + + + + + Level3 + true + WIN32;_DEBUG;_CONSOLE;XY_WORKFLOW_DEMO_64;%(PreprocessorDefinitions) + true + stdcpp17 + $(SolutionDir);$(ProjectDir);%(AdditionalIncludeDirectories) + NotUsing + + + Console + true + Ws2_32.lib;%(AdditionalDependencies) + + + + + Level3 + true + true + true + WIN32;NDEBUG;_CONSOLE;XY_WORKFLOW_DEMO_64;%(PreprocessorDefinitions) + true + stdcpp17 + $(SolutionDir);$(ProjectDir);%(AdditionalIncludeDirectories) + NotUsing + + + Console + true + Ws2_32.lib;%(AdditionalDependencies) + + + + + Level3 + true + _DEBUG;_CONSOLE;XY_WORKFLOW_DEMO_64;%(PreprocessorDefinitions) + true + stdcpp17 + $(SolutionDir);$(ProjectDir);%(AdditionalIncludeDirectories) + NotUsing + + + Console + true + Ws2_32.lib;%(AdditionalDependencies) + + + + + Level3 + true + true + true + NDEBUG;_CONSOLE;XY_WORKFLOW_DEMO_64;%(PreprocessorDefinitions) + true + stdcpp17 + $(SolutionDir);$(ProjectDir);%(AdditionalIncludeDirectories) + NotUsing + + + Console + true + Ws2_32.lib;%(AdditionalDependencies) + + + + + + + + {CB1FF804-BB1F-41C8-92FA-7B15F6B86347} + false + true + false + + + + + diff --git a/XYParser/XYParserWorkflowDemo/XYParser8Demo.vcxproj b/XYParser/XYParserWorkflowDemo/XYParser8Demo.vcxproj new file mode 100644 index 0000000..fd86e82 --- /dev/null +++ b/XYParser/XYParserWorkflowDemo/XYParser8Demo.vcxproj @@ -0,0 +1,178 @@ + + + + + Debug + Win32 + + + Release + Win32 + + + Debug + x64 + + + Release + x64 + + + + 17.0 + Win32Proj + {1B7FA4A1-8BC2-4D49-9B5A-BD4C6B8F2107} + XYParser8Demo + 10.0 + + + + Application + true + v143 + Unicode + + + Application + false + v143 + true + Unicode + + + Application + true + v143 + Unicode + + + Application + false + v143 + true + Unicode + + + + + + + + + + + + + + + + + + + E:\Gitea\Swallow\XYParadigmDll\ZMQ + $(ZmqRoot)\include + $(ZmqRoot)\msvclib + $(ZmqLibDir)\libzmq-v142-mt-4_3_4.dll + E:\Gitea\Swallow\SwallowBCI\release\decoder_mainSSVEP\_internal\libsodium.dll + + + $(SolutionDir)$(Platform)\$(Configuration)\ + $(SolutionDir)$(Platform)\$(Configuration)\XYParserWorkflow8Demo\ + XYParser8Demo + + + + $(ZmqIncludeDir);%(AdditionalIncludeDirectories) + + + $(ZmqLibDir);%(AdditionalLibraryDirectories) + libzmq-v142-mt-4_3_4.lib;%(AdditionalDependencies) + + + if exist "$(ZmqDllPath)" (copy /Y "$(ZmqDllPath)" "$(OutDir)" >nul 2>nul || echo Skip copying libzmq dll) +if exist "$(SodiumDllPath)" (copy /Y "$(SodiumDllPath)" "$(OutDir)" >nul 2>nul || echo Skip copying libsodium dll) +exit /b 0 + + + + + Level3 + true + WIN32;_DEBUG;_CONSOLE;XY_WORKFLOW_DEMO_8;%(PreprocessorDefinitions) + true + stdcpp17 + $(SolutionDir);$(ProjectDir);%(AdditionalIncludeDirectories) + NotUsing + + + Console + true + Ws2_32.lib;%(AdditionalDependencies) + + + + + Level3 + true + true + true + WIN32;NDEBUG;_CONSOLE;XY_WORKFLOW_DEMO_8;%(PreprocessorDefinitions) + true + stdcpp17 + $(SolutionDir);$(ProjectDir);%(AdditionalIncludeDirectories) + NotUsing + + + Console + true + Ws2_32.lib;%(AdditionalDependencies) + + + + + Level3 + true + _DEBUG;_CONSOLE;XY_WORKFLOW_DEMO_8;%(PreprocessorDefinitions) + true + stdcpp17 + $(SolutionDir);$(ProjectDir);%(AdditionalIncludeDirectories) + NotUsing + + + Console + true + Ws2_32.lib;%(AdditionalDependencies) + + + + + Level3 + true + true + true + NDEBUG;_CONSOLE;XY_WORKFLOW_DEMO_8;%(PreprocessorDefinitions) + true + stdcpp17 + $(SolutionDir);$(ProjectDir);%(AdditionalIncludeDirectories) + NotUsing + + + Console + true + Ws2_32.lib;%(AdditionalDependencies) + + + + + + + + {CB1FF804-BB1F-41C8-92FA-7B15F6B86347} + false + true + false + + + + + diff --git a/XYParser/XYParserWorkflowDemo/main.cpp b/XYParser/XYParserWorkflowDemo/main.cpp new file mode 100644 index 0000000..881e11c --- /dev/null +++ b/XYParser/XYParserWorkflowDemo/main.cpp @@ -0,0 +1,1336 @@ +#define NOMINMAX +#include +#include +#include +#include + +#include "../XYParserApi.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace { + +using Clock = std::chrono::steady_clock; + +enum class DemoMode { + Mode64Tcp, + Mode8Serial +}; + +#if defined(XY_WORKFLOW_DEMO_64) +constexpr DemoMode kDemoMode = DemoMode::Mode64Tcp; +constexpr wchar_t kProgramName[] = L"XYParser64Demo"; +constexpr char kDefaultTcpHost[] = "127.0.0.1"; +constexpr wchar_t kDefaultDataComPort[] = L""; +constexpr wchar_t kDefaultTriggerComPort[] = L"COM44"; +#elif defined(XY_WORKFLOW_DEMO_8) +constexpr DemoMode kDemoMode = DemoMode::Mode8Serial; +constexpr wchar_t kProgramName[] = L"XYParser8Demo"; +constexpr char kDefaultTcpHost[] = "127.0.0.1"; +constexpr wchar_t kDefaultDataComPort[] = L"COM44"; +constexpr wchar_t kDefaultTriggerComPort[] = L""; +#else +#error Define either XY_WORKFLOW_DEMO_64 or XY_WORKFLOW_DEMO_8 for this target. +#endif + +struct ZmqRoundTripStats { + std::uint64_t sent_packets = 0; + std::uint64_t sent_bytes = 0; + std::uint64_t received_packets = 0; + std::uint64_t received_bytes = 0; + std::uint64_t timeout_count = 0; + std::uint64_t invalid_payload_count = 0; +}; + +struct DemoOptions { + DemoMode mode = kDemoMode; + std::string tcp_host = kDefaultTcpHost; + int tcp_port = 5086; + std::string algorithm_host = "127.0.0.1"; + int algorithm_port = 8100; + int algorithm_timeout_ms = 200; + std::wstring data_com_port = kDefaultDataComPort; + std::wstring trigger_com_port = kDefaultTriggerComPort; + int serial_baud_rate = 460800; + int sample_rate = 250; + int gain = 6; + double vref = 4.5; + int bypass_checksum = 1; + int train_duration_ms = 3000; +}; + +std::string FormatIpv4Endpoint(const sockaddr_in& addr) +{ + char address_text[INET_ADDRSTRLEN] = {}; + if (InetNtopA(AF_INET, + const_cast(&addr.sin_addr), + address_text, + static_cast(std::size(address_text))) == nullptr) { + std::snprintf(address_text, std::size(address_text), ""); + } + std::ostringstream stream; + stream << address_text << ':' << ntohs(addr.sin_port); + return stream.str(); +} + +class ParserHandleGuard { +public: + explicit ParserHandleGuard(XYParserHandle handle) : handle_(handle) {} + ~ParserHandleGuard() + { + if (handle_ != nullptr) { + XYParser_DestroyParser(handle_); + } + } + + ParserHandleGuard(const ParserHandleGuard&) = delete; + ParserHandleGuard& operator=(const ParserHandleGuard&) = delete; + + XYParserHandle get() const { return handle_; } + +private: + XYParserHandle handle_ = nullptr; +}; + +class WinsockRuntime { +public: + WinsockRuntime() + { + WSADATA wsa_data{}; + ok_ = (WSAStartup(MAKEWORD(2, 2), &wsa_data) == 0); + } + + ~WinsockRuntime() + { + if (ok_) { + WSACleanup(); + } + } + + bool ok() const { return ok_; } + +private: + bool ok_ = false; +}; + +class TcpClient { +public: + ~TcpClient() { Close(); } + + bool Connect(const std::string& host, int port) + { + addrinfo hints{}; + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = IPPROTO_TCP; + + addrinfo* result = nullptr; + const std::string port_text = std::to_string(port); + if (getaddrinfo(host.c_str(), port_text.c_str(), &hints, &result) != 0) { + return false; + } + + bool connected = false; + for (addrinfo* current = result; current != nullptr; current = current->ai_next) { + SOCKET sock = socket(current->ai_family, current->ai_socktype, current->ai_protocol); + if (sock == INVALID_SOCKET) { + continue; + } + + if (connect(sock, current->ai_addr, static_cast(current->ai_addrlen)) == 0) { + DWORD timeout_ms = 200; + setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast(&timeout_ms), sizeof(timeout_ms)); + setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, reinterpret_cast(&timeout_ms), sizeof(timeout_ms)); + socket_ = sock; + connected = true; + break; + } + + closesocket(sock); + } + + freeaddrinfo(result); + return connected; + } + + bool Send(const std::uint8_t* data, std::size_t size) + { + if (socket_ == INVALID_SOCKET) { + return false; + } + + std::size_t sent_total = 0; + while (sent_total < size) { + const int sent = send(socket_, + reinterpret_cast(data + sent_total), + static_cast(size - sent_total), + 0); + if (sent <= 0) { + return false; + } + sent_total += static_cast(sent); + } + return true; + } + + int Receive(std::uint8_t* buffer, int capacity, bool& disconnected) + { + disconnected = false; + if (socket_ == INVALID_SOCKET) { + disconnected = true; + return -1; + } + + const int received = recv(socket_, reinterpret_cast(buffer), capacity, 0); + if (received > 0) { + return received; + } + if (received == 0) { + disconnected = true; + return -1; + } + + const int error = WSAGetLastError(); + if (error == WSAETIMEDOUT || error == WSAEWOULDBLOCK) { + return 0; + } + + disconnected = true; + return -1; + } + + void Close() + { + if (socket_ != INVALID_SOCKET) { + closesocket(socket_); + socket_ = INVALID_SOCKET; + } + } + +private: + SOCKET socket_ = INVALID_SOCKET; +}; + +std::string BuildZmqTcpEndpoint(const std::string& host, int port) +{ + return "tcp://" + host + ':' + std::to_string(port); +} + +class ZmqDuplexClient { +public: + ~ZmqDuplexClient() { Close(); } + + bool Open(const std::string& remote_host, int remote_port, int timeout_ms) + { + Close(); + last_error_text_.clear(); + configured_remote_endpoint_ = BuildZmqTcpEndpoint(remote_host, remote_port); + + context_ = zmq_ctx_new(); + if (context_ == nullptr) { + last_error_text_ = zmq_strerror(zmq_errno()); + return false; + } + + socket_ = zmq_socket(context_, ZMQ_DEALER); + if (socket_ == nullptr) { + last_error_text_ = zmq_strerror(zmq_errno()); + Close(); + return false; + } + + const int linger_ms = 0; + zmq_setsockopt(socket_, ZMQ_LINGER, &linger_ms, sizeof(linger_ms)); + zmq_setsockopt(socket_, ZMQ_RCVTIMEO, &timeout_ms, sizeof(timeout_ms)); + zmq_setsockopt(socket_, ZMQ_SNDTIMEO, &timeout_ms, sizeof(timeout_ms)); + + identity_ = "xyparser-" + std::to_string(GetCurrentProcessId()) + '-' + std::to_string(GetTickCount64()); + if (zmq_setsockopt(socket_, + ZMQ_IDENTITY, + identity_.data(), + static_cast(identity_.size())) != 0) { + last_error_text_ = zmq_strerror(zmq_errno()); + Close(); + return false; + } + + if (zmq_connect(socket_, configured_remote_endpoint_.c_str()) != 0) { + last_error_text_ = zmq_strerror(zmq_errno()); + Close(); + return false; + } + + return true; + } + + bool Send(const std::uint8_t* data, std::size_t size) + { + if (socket_ == nullptr) { + last_error_text_ = "socket not open"; + return false; + } + + const int sent = zmq_send(socket_, data, size, 0); + if (sent != static_cast(size)) { + last_error_text_ = zmq_strerror(zmq_errno()); + return false; + } + last_error_text_.clear(); + return true; + } + + int Receive(std::uint8_t* buffer, int capacity, bool& timed_out) + { + timed_out = false; + if (socket_ == nullptr) { + last_error_text_ = "socket not open"; + return -1; + } + + const int received = zmq_recv(socket_, buffer, capacity, 0); + if (received >= 0) { + last_error_text_.clear(); + return received; + } + + const int error = zmq_errno(); + last_error_text_ = zmq_strerror(error); + if (error == EAGAIN) { + timed_out = true; + return 0; + } + + return -1; + } + + const std::string& last_error() const { return last_error_text_; } + const std::string& configured_remote_endpoint() const { return configured_remote_endpoint_; } + const std::string& identity() const { return identity_; } + + void Close() + { + if (socket_ != nullptr) { + zmq_close(socket_); + socket_ = nullptr; + } + if (context_ != nullptr) { + zmq_ctx_term(context_); + context_ = nullptr; + } + } + +private: + void* context_ = nullptr; + void* socket_ = nullptr; + std::string last_error_text_; + std::string configured_remote_endpoint_; + std::string identity_; +}; + +class SerialPort { +public: + ~SerialPort() { Close(); } + + bool Open(const std::wstring& port_name, int baud_rate) + { + const std::wstring full_name = (port_name.rfind(L"\\\\.\\", 0) == 0) + ? port_name + : (L"\\\\.\\" + port_name); + + handle_ = CreateFileW(full_name.c_str(), + GENERIC_READ | GENERIC_WRITE, + 0, + nullptr, + OPEN_EXISTING, + 0, + nullptr); + if (handle_ == INVALID_HANDLE_VALUE) { + handle_ = nullptr; + return false; + } + + DCB dcb{}; + dcb.DCBlength = sizeof(dcb); + if (!GetCommState(handle_, &dcb)) { + Close(); + return false; + } + + dcb.BaudRate = static_cast(baud_rate); + dcb.ByteSize = 8; + dcb.Parity = NOPARITY; + dcb.StopBits = ONESTOPBIT; + dcb.fBinary = TRUE; + dcb.fParity = FALSE; + + if (!SetCommState(handle_, &dcb)) { + Close(); + return false; + } + + COMMTIMEOUTS timeouts{}; + timeouts.ReadIntervalTimeout = 20; + timeouts.ReadTotalTimeoutConstant = 50; + timeouts.ReadTotalTimeoutMultiplier = 0; + timeouts.WriteTotalTimeoutConstant = 200; + timeouts.WriteTotalTimeoutMultiplier = 0; + if (!SetCommTimeouts(handle_, &timeouts)) { + Close(); + return false; + } + + SetupComm(handle_, 64 * 1024, 64 * 1024); + PurgeComm(handle_, PURGE_RXCLEAR | PURGE_TXCLEAR); + return true; + } + + bool Write(const std::uint8_t* data, std::size_t size) + { + if (handle_ == nullptr) { + return false; + } + + std::size_t written_total = 0; + while (written_total < size) { + DWORD written = 0; + if (!WriteFile(handle_, + data + written_total, + static_cast(size - written_total), + &written, + nullptr)) { + return false; + } + if (written == 0) { + return false; + } + written_total += static_cast(written); + } + return true; + } + + int Read(std::uint8_t* buffer, int capacity, bool& disconnected) + { + disconnected = false; + if (handle_ == nullptr) { + disconnected = true; + return -1; + } + + DWORD bytes_read = 0; + if (!ReadFile(handle_, buffer, static_cast(capacity), &bytes_read, nullptr)) { + disconnected = true; + return -1; + } + return static_cast(bytes_read); + } + + void Close() + { + if (handle_ != nullptr) { + CloseHandle(handle_); + handle_ = nullptr; + } + } + +private: + HANDLE handle_ = nullptr; +}; + +std::string Narrow(const std::wstring& text) +{ + if (text.empty()) { + return {}; + } + + const int size = WideCharToMultiByte(CP_UTF8, 0, text.c_str(), -1, nullptr, 0, nullptr, nullptr); + if (size <= 1) { + return {}; + } + + std::vector buffer(static_cast(size), '\0'); + WideCharToMultiByte(CP_UTF8, 0, text.c_str(), -1, buffer.data(), size, nullptr, nullptr); + std::string result(buffer.data()); + return result; +} + +bool ParseInt(const std::wstring& text, int& value) +{ + wchar_t* end = nullptr; + const long parsed = std::wcstol(text.c_str(), &end, 10); + if (end == nullptr || *end != L'\0') { + return false; + } + value = static_cast(parsed); + return true; +} + +bool ParseDouble(const std::wstring& text, double& value) +{ + wchar_t* end = nullptr; + value = std::wcstod(text.c_str(), &end); + return end != nullptr && *end == L'\0'; +} + +bool ParseBool(const std::wstring& text, bool& value) +{ + if (text == L"1" || text == L"true" || text == L"TRUE") { + value = true; + return true; + } + if (text == L"0" || text == L"false" || text == L"FALSE") { + value = false; + return true; + } + return false; +} + +void PrintUsage() +{ + std::wcout + << kProgramName << L" usage:\n"; +#if defined(XY_WORKFLOW_DEMO_64) + std::wcout + << L" " << kProgramName << L" [--tcp-host 127.0.0.1] [--tcp-port 5086]\n" + << L" [--trigger-com COM44]\n" + << L" [--algorithm-host 127.0.0.1 --algorithm-port 8100]\n" + << L" [--train-duration-ms 3000]\n"; +#else + std::wcout + << L" " << kProgramName << L" [--data-com COM44] [--baud 460800]\n" + << L" [--algorithm-host 127.0.0.1 --algorithm-port 8100]\n"; +#endif + std::wcout + << L"\n" + << L"\n" + << L"Notes:\n" +#if defined(XY_WORKFLOW_DEMO_64) + << L" - Default TCP host is 127.0.0.1 and default trigger serial is COM44.\n" + << L" - Receive 64ch stream from TCP, optional trigger serial for TRAIN_0/TRAIN_1.\n" +#else + << L" - Default 8ch data serial is COM44.\n" + << L" - Receive 8ch stream from serial, then run 8ch -> 64ch -> algorithm data -> Welch.\n" +#endif + << L" - Sample rate and gain use built-in defaults.\n" + << L" - Impedance and Welch/PSD are enabled immediately after startup.\n" + << L" - Algorithm path always uses ZMQ DEALER/ROUTER: demo connects to algorithm server and waits for response.\n"; +} + +bool ParseArguments(int argc, wchar_t* argv[], DemoOptions& options) +{ + if (argc < 1) { + return false; + } + + for (int i = 1; i < argc; ++i) { + const std::wstring arg = argv[i]; + if (arg == L"--tcp-host" && i + 1 < argc) { + options.tcp_host = Narrow(argv[++i]); + } else if (arg == L"--tcp-port" && i + 1 < argc) { + if (!ParseInt(argv[++i], options.tcp_port)) { + return false; + } + } else if (arg == L"--algorithm-host" && i + 1 < argc) { + options.algorithm_host = Narrow(argv[++i]); + } else if ((arg == L"--algorithm-port" || arg == L"--algorithm-udp-port") && i + 1 < argc) { + if (!ParseInt(argv[++i], options.algorithm_port)) { + return false; + } + } else if (arg == L"--algorithm-timeout-ms" && i + 1 < argc) { + if (!ParseInt(argv[++i], options.algorithm_timeout_ms)) { + return false; + } + } else if (arg == L"--data-com" && i + 1 < argc) { + options.data_com_port = argv[++i]; + } else if (arg == L"--trigger-com" && i + 1 < argc) { + options.trigger_com_port = argv[++i]; + } else if (arg == L"--baud" && i + 1 < argc) { + if (!ParseInt(argv[++i], options.serial_baud_rate)) { + return false; + } + } else if (arg == L"--vref" && i + 1 < argc) { + if (!ParseDouble(argv[++i], options.vref)) { + return false; + } + } else if (arg == L"--bypass-checksum" && i + 1 < argc) { + if (!ParseInt(argv[++i], options.bypass_checksum)) { + return false; + } + } else if (arg == L"--train-duration-ms" && i + 1 < argc) { + if (!ParseInt(argv[++i], options.train_duration_ms)) { + return false; + } + } else if (arg == L"--help" || arg == L"-h") { + return false; + } else { + return false; + } + } + + if (options.algorithm_port <= 0) { + return false; + } + +#if defined(XY_WORKFLOW_DEMO_64) + return options.tcp_port > 0; +#else + return !options.data_com_port.empty(); +#endif +} + +std::string DescribeParserError(XYParserHandle parser) +{ + const char* error = XYParser_GetLastError(parser); + return (error != nullptr && error[0] != '\0') ? error : std::string("no extra error"); +} + +void PrintBytes(const char* label, const std::uint8_t* data, std::size_t size) +{ + std::ostringstream stream; + stream << label << " ["; + for (std::size_t i = 0; i < size; ++i) { + if (i > 0) { + stream << ' '; + } + stream << std::hex << std::setw(2) << std::setfill('0') << static_cast(data[i]); + } + stream << ']'; + std::cout << stream.str() << std::endl; +} + +void PrintImpedanceSummary(const XYParserImpedanceSummary& summary) +{ + std::cout << "[Impedance] sampleRate=" << summary.sample_rate + << " window=" << summary.window_sample_count; + + std::array leads{}; + const int lead_count = XYParser_GetLeadMap(summary.channel_count, + leads.data(), + static_cast(leads.size())); + + if (summary.channel_count == 8 && lead_count == 8) { + for (int i = 0; i < lead_count; ++i) { + const XYParserLeadChannelNumber lead = leads[static_cast(i)]; + const char* lead_name = XYParser_GetLeadName(lead); + std::cout << ' ' << (lead_name != nullptr ? lead_name : "lead?") + << '=' << summary.impedance_values[lead]; + } + std::cout << std::endl; + return; + } + + int printed = 0; + for (int i = 0; i < XYPARSER_MAX_CHANNELS; ++i) { + if (summary.impedance_values[i] == 0) { + continue; + } + const auto lead = static_cast(i); + const char* lead_name = XYParser_GetLeadName(lead); + std::cout << ' ' << (lead_name != nullptr ? lead_name : "lead?") + << '=' << summary.impedance_values[i]; + ++printed; + } + if (printed == 0 && lead_count > 0) { + for (int i = 0; i < lead_count; ++i) { + const XYParserLeadChannelNumber lead = leads[static_cast(i)]; + const char* lead_name = XYParser_GetLeadName(lead); + std::cout << ' ' << (lead_name != nullptr ? lead_name : "lead?") + << '=' << summary.impedance_values[lead]; + } + } + std::cout << std::endl; +} + +int FindPeakFrequencyIndex(const XYParserWelchSummary& summary, XYParserLeadChannelNumber lead) +{ + if (summary.frequency_count == 0) { + return -1; + } + + const std::size_t lead_index = static_cast(lead); + std::uint32_t best_index = 0; + double best_value = summary.psd_values[lead_index][0]; + for (std::uint32_t i = 1; i < summary.frequency_count; ++i) { + if (summary.psd_values[lead_index][i] > best_value) { + best_value = summary.psd_values[lead_index][i]; + best_index = i; + } + } + return static_cast(best_index); +} + +XYParserLeadChannelNumber FindDominantWelchLead(const XYParserWelchSummary& summary) +{ + std::array leads{}; + const int lead_count = XYParser_GetLeadMap(summary.channel_count, + leads.data(), + static_cast(leads.size())); + const int channel_count = (lead_count > 0) ? lead_count : static_cast(summary.channel_count); + XYParserLeadChannelNumber best_lead = LeadChannel_FP1; + double best_psd = -1.0; + + for (int channel_index = 0; channel_index < channel_count; ++channel_index) { + XYParserLeadChannelNumber lead = static_cast(channel_index); + if (lead_count > 0) { + lead = leads[static_cast(channel_index)]; + } + + const int peak_index = FindPeakFrequencyIndex(summary, lead); + if (peak_index < 0) { + continue; + } + + const double peak_psd = summary.psd_values[lead][static_cast(peak_index)]; + if (peak_psd > best_psd) { + best_psd = peak_psd; + best_lead = lead; + } + } + + return best_lead; +} + +void PrintWelchSummary(const XYParserWelchSummary& summary) +{ + const XYParserLeadChannelNumber lead = FindDominantWelchLead(summary); + const int peak_index = FindPeakFrequencyIndex(summary, lead); + const char* alpha_name = XYParser_GetWelchBandName(2); + const char* gamma_name = XYParser_GetWelchBandName(4); + const char* lead_name = XYParser_GetLeadName(lead); + + std::cout << "[Welch] sampleRate=" << summary.sample_rate + << " window=" << summary.window_sample_count + << " freqCount=" << summary.frequency_count + << " lead=" << (lead_name != nullptr ? lead_name : "lead?"); + if (peak_index >= 0) { + std::cout << " peakHz=" << summary.frequencies[static_cast(peak_index)] + << " peakPsd=" << summary.psd_values[lead][static_cast(peak_index)]; + } + std::cout << ' ' << (alpha_name != nullptr ? alpha_name : "alpha") + << '=' << summary.band_values[2][lead] + << ' ' << (gamma_name != nullptr ? gamma_name : "gamma") + << '=' << summary.band_values[4][lead] + << std::endl; +} + +void DrainImpedance(XYParserHandle parser) +{ + std::array summaries{}; + while (true) { + const int count = XYParser_ReadImpedance(parser, summaries.data(), static_cast(summaries.size())); + if (count <= 0) { + break; + } + for (int i = 0; i < count; ++i) { + PrintImpedanceSummary(summaries[static_cast(i)]); + } + } +} + +void DrainWelch(XYParserHandle parser) +{ + std::array summaries{}; + while (true) { + const int count = XYParser_ReadWelch(parser, summaries.data(), static_cast(summaries.size())); + if (count <= 0) { + break; + } + for (int i = 0; i < count; ++i) { + if (summaries[static_cast(i)].ok != 0) { + PrintWelchSummary(summaries[static_cast(i)]); + } + } + } +} + +void PrintZmqStats(const ZmqRoundTripStats& stats) +{ + std::cout << "[AlgorithmZmq] txPackets=" << stats.sent_packets + << " txBytes=" << stats.sent_bytes + << " rxPackets=" << stats.received_packets + << " rxBytes=" << stats.received_bytes + << " timeouts=" << stats.timeout_count + << " invalidPayloads=" << stats.invalid_payload_count + << std::endl; +} + +bool RunAlgorithmZmqRoundTrip(XYParserHandle parser, + ZmqDuplexClient& zmq_client, + const double* algorithm_data, + ZmqRoundTripStats& stats) +{ + const std::size_t payload_size = XYPARSER_FRAME_ALGORITHM_VALUE_COUNT * sizeof(double); + if (!zmq_client.Send(reinterpret_cast(algorithm_data), payload_size)) { + std::cerr << "Send algorithm ZMQ payload failed" + << " remote=" << zmq_client.configured_remote_endpoint() + << " zmqError=" << zmq_client.last_error() + << std::endl; + return false; + } + ++stats.sent_packets; + stats.sent_bytes += payload_size; + + std::array response_buffer{}; + bool timed_out = false; + const int received = zmq_client.Receive(response_buffer.data(), + static_cast(response_buffer.size()), + timed_out); + if (timed_out) { + ++stats.timeout_count; + std::cerr << "Receive algorithm ZMQ payload timed out" + << " remote=" << zmq_client.configured_remote_endpoint() + << " zmqError=" << zmq_client.last_error() + << std::endl; + PrintZmqStats(stats); + return false; + } + if (received <= 0) { + std::cerr << "Receive algorithm ZMQ payload failed" + << " remote=" << zmq_client.configured_remote_endpoint() + << " zmqError=" << zmq_client.last_error() + << std::endl; + PrintZmqStats(stats); + return false; + } + if (received % static_cast(sizeof(double)) != 0) { + ++stats.invalid_payload_count; + std::cerr << "Algorithm ZMQ payload size is not aligned to double" << std::endl; + PrintZmqStats(stats); + return false; + } + if (static_cast(received) != payload_size) { + ++stats.invalid_payload_count; + std::cerr << "Algorithm ZMQ payload size mismatch: expectedBytes=" << payload_size + << " expectedDoubles=" << XYPARSER_FRAME_ALGORITHM_VALUE_COUNT + << " actualBytes=" << received + << " actualDoubles=" << (received / static_cast(sizeof(double))) + << std::endl; + PrintZmqStats(stats); + return false; + } + ++stats.received_packets; + stats.received_bytes += static_cast(received); + + if (stats.sent_packets <= 3 || (stats.sent_packets % 100) == 0) { + PrintZmqStats(stats); + } + + XYParser_FeedAlgorithmData(parser, + response_buffer.data(), + static_cast(received), + nullptr, + 0); + return true; +} + +bool Send64GainAndSampleRate(TcpClient& client, int gain, int sample_rate) +{ + std::array command{}; + const int size = XYParser_Serialize64GainSampleRateCommand(static_cast(gain), + static_cast(sample_rate), + command.data(), + command.size()); + if (size <= 0) { + return false; + } + PrintBytes("Send64GainSampleRate", command.data(), static_cast(size)); + return client.Send(command.data(), static_cast(size)); +} + +bool Send64ImpedanceSwitch(TcpClient& client, bool open) +{ + std::array command{}; + const int size = XYParser_Serialize64ImpedanceCommand(open ? 1 : 0, command.data(), command.size()); + if (size <= 0) { + return false; + } + PrintBytes(open ? "Send64ImpedanceOpen" : "Send64ImpedanceClose", + command.data(), + static_cast(size)); + return client.Send(command.data(), static_cast(size)); +} + +bool Send8ImpedanceSwitch(SerialPort& port, bool open) +{ + std::array command{}; + const int size = XYParser_Serialize8ChImpedanceCommand(open ? 1 : 0, command.data(), command.size()); + if (size <= 0) { + return false; + } + PrintBytes(open ? "Send8ImpedanceOpen" : "Send8ImpedanceClose", + command.data(), + static_cast(size)); + return port.Write(command.data(), static_cast(size)); +} + +bool Process64AlgorithmPath(XYParserHandle parser, + const XYParserFrameSummary& summary, + ZmqDuplexClient& zmq_client, + ZmqRoundTripStats& zmq_stats) +{ + std::array algorithm_data{}; + if (XYParser_ConvertSampleFramesToAlgorithmData(&summary, algorithm_data.data()) == 0) { + std::cerr << "ConvertSampleFramesToAlgorithmData failed: " << DescribeParserError(parser) << std::endl; + return false; + } + return RunAlgorithmZmqRoundTrip(parser, zmq_client, algorithm_data.data(), zmq_stats); +} + +bool Process8AlgorithmPath(XYParserHandle parser, + const XYParserFrameSummary& summary, + ZmqDuplexClient& zmq_client, + ZmqRoundTripStats& zmq_stats) +{ + XYParserFrameSummary converted{}; + if (XYParser_Convert8ChFramesTo64Ch(&summary, 1, &converted, 1) != 1) { + std::cerr << "Convert8ChFramesTo64Ch failed" << std::endl; + return false; + } + return Process64AlgorithmPath(parser, converted, zmq_client, zmq_stats); +} + +bool SendTrigger(SerialPort& port, XYParserTriggerType trigger_type) +{ + std::array command{}; + const int size = XYParser_SerializeTriggerCommand(static_cast(trigger_type), + command.data(), + command.size()); + if (size <= 0) { + return false; + } + PrintBytes("SendTrigger", command.data(), static_cast(size)); + return port.Write(command.data(), static_cast(size)); +} + +void PrintFrameProgress(const XYParserFrameSummary* summaries, int count) +{ + if (count <= 0) { + return; + } + const XYParserFrameSummary& last = summaries[static_cast(count - 1)]; +#if defined(XY_WORKFLOW_DEMO_64) + struct DeviceStateSnapshot { + std::uint8_t impedance_enabled = 0; + std::uint8_t current_gain = 0; + std::uint16_t current_sample_rate_hz = 0; + std::uint8_t cap_type = 0; + std::uint8_t gnd_detached = 0; + }; + + const DeviceStateSnapshot current_state = { + last.impedance_enabled, + last.current_gain, + last.current_sample_rate_hz, + last.cap_type, + last.gnd_detached}; + + static bool has_last_state = false; + static DeviceStateSnapshot last_state{}; + if (has_last_state && + current_state.impedance_enabled == last_state.impedance_enabled && + current_state.current_gain == last_state.current_gain && + current_state.current_sample_rate_hz == last_state.current_sample_rate_hz && + current_state.cap_type == last_state.cap_type && + current_state.gnd_detached == last_state.gnd_detached) { + return; + } + + last_state = current_state; + has_last_state = true; + std::cout << "[Feed] frameCount=" << count + << " lastFrameIndex=" << last.frame_index + << " impedanceEnabled=" << static_cast(last.impedance_enabled) + << " currentGain=" << static_cast(last.current_gain) + << " currentSampleRateHz=" << last.current_sample_rate_hz + << " capType=" << static_cast(last.cap_type) + << " gndDetached=" << static_cast(last.gnd_detached); +#else + std::cout << "[Feed] frameCount=" << count + << " lastFrameIndex=" << last.frame_index + << " battery=" << static_cast(last.battery); +#endif + std::cout + << std::endl; +} + +const char* DescribeTriggerType(std::uint8_t trigger_type) +{ + switch (trigger_type) { + case XYPARSER_TRIGGER_NONE: + return "NONE"; + case XYPARSER_TRIGGER_TRAIN_0: + return "TRAIN_0"; + case XYPARSER_TRIGGER_TRAIN_1: + return "TRAIN_1"; + default: + return "UNKNOWN"; + } +} + +void PrintActiveChannelSamples(const XYParserFrameSummary& summary) +{ + std::array leads{}; + const int lead_count = XYParser_GetLeadMap(summary.channel_count, + leads.data(), + static_cast(leads.size())); + const int channel_count = (lead_count > 0) ? lead_count : static_cast(summary.channel_count); + + for (std::uint8_t sample_index = 0; sample_index < summary.sample_count; ++sample_index) { + std::ostringstream channel_stream; + int active_channel_count = 0; + + for (int channel_index = 0; channel_index < channel_count; ++channel_index) { + const double value_uv = summary.channel_values_uv[sample_index][channel_index]; + if (value_uv == 0.0) { + continue; + } + + XYParserLeadChannelNumber lead = static_cast(channel_index); + if (lead_count > 0) { + lead = leads[static_cast(channel_index)]; + } + const char* lead_name = XYParser_GetLeadName(lead); + + if (active_channel_count > 0) { + channel_stream << " | "; + } + channel_stream << (lead_name != nullptr ? lead_name : "lead?") + << '=' << value_uv << "uV"; + ++active_channel_count; + } + + if (active_channel_count == 0) { + continue; + } + + const std::uint8_t trigger_type = summary.sample_trigger_types[sample_index]; + const std::uint8_t trigger_index = summary.sample_trigger_indices[sample_index]; + std::cout << "[ActiveSample] frame=" << summary.frame_index + << " sample=" << static_cast(sample_index) + << " trigger=" << static_cast(trigger_type) + << '(' << DescribeTriggerType(trigger_type) << ')' + << " triggerIndex=" << static_cast(trigger_index) + << " channels=" << channel_stream.str() + << std::endl; + } +} + +void PrintTriggerEvents(const XYParserFrameSummary& summary) +{ + for (std::uint8_t sample_index = 0; sample_index < summary.sample_count; ++sample_index) { + const std::uint8_t trigger_type = summary.sample_trigger_types[sample_index]; + const std::uint8_t trigger_index = summary.sample_trigger_indices[sample_index]; + if (trigger_type == XYPARSER_TRIGGER_NONE && trigger_index == 0) { + continue; + } + + std::cout << "[TriggerRx] frame=" << summary.frame_index + << " sample=" << static_cast(sample_index) + << " trigger=" << static_cast(trigger_type) + << '(' << DescribeTriggerType(trigger_type) << ')' + << " triggerIndex=" << static_cast(trigger_index) + << std::endl; + } +} + +int Run64Workflow(const DemoOptions& options) +{ + constexpr auto kImpedanceDuration = std::chrono::seconds(10); + constexpr int kImpedanceSampleRate = 250; + constexpr int kImpedanceGain = 24; + constexpr int kNormalSampleRate = 250; + constexpr int kNormalGain = 6; + WinsockRuntime winsock; + if (!winsock.ok()) { + std::cerr << "WSAStartup failed" << std::endl; + return 1; + } + + TcpClient data_client; + if (!data_client.Connect(options.tcp_host, options.tcp_port)) { + std::cerr << "Connect 64ch TCP failed: " << options.tcp_host << ':' << options.tcp_port << std::endl; + return 1; + } + + SerialPort trigger_port; + const bool has_trigger_port = !options.trigger_com_port.empty(); + if (has_trigger_port && !trigger_port.Open(options.trigger_com_port, options.serial_baud_rate)) { + std::cerr << "Open trigger serial failed: " << Narrow(options.trigger_com_port) << std::endl; + return 1; + } + + ParserHandleGuard parser(XYParser_CreateParser(64)); + if (parser.get() == nullptr) { + std::cerr << "Create 64ch parser failed" << std::endl; + return 1; + } + + XYParser_SetAdcParams(parser.get(), options.vref, static_cast(kNormalGain)); + XYParser_SetSampleRate(parser.get(), kNormalSampleRate); + XYParser_SetBypassChecksum(parser.get(), options.bypass_checksum); + XYParser_SetWelchDetection(parser.get(), 0); + XYParser_SetImpedanceDetection(parser.get(), 0); + + if (!Send64ImpedanceSwitch(data_client, true)) { + std::cerr << "Send 64ch impedance open command failed" << std::endl; + return 1; + } + if (!Send64GainAndSampleRate(data_client, kImpedanceGain, kImpedanceSampleRate)) { + std::cerr << "Send 64ch impedance gain/sample-rate command failed" << std::endl; + return 1; + } + XYParser_SetSampleRate(parser.get(), kImpedanceSampleRate); + XYParser_SetImpedanceDetection(parser.get(), 1); + + ZmqDuplexClient algorithm_zmq; + if (!algorithm_zmq.Open(options.algorithm_host, + options.algorithm_port, + options.algorithm_timeout_ms)) { + std::cerr << "Open algorithm ZMQ client failed" + << " remote=" << BuildZmqTcpEndpoint(options.algorithm_host, options.algorithm_port) + << " zmqError=" << algorithm_zmq.last_error() + << std::endl; + return 1; + } + std::cout << "Algorithm ZMQ enabled: identity=" << algorithm_zmq.identity() + << " algorithmHost=" << options.algorithm_host + << " algorithmPort=" << options.algorithm_port + << std::endl; + std::cout << "64ch impedance enabled for " + << std::chrono::duration_cast(kImpedanceDuration).count() + << " seconds before cyclic trigger" << std::endl; + if (has_trigger_port) { + std::cout << "64ch cyclic trigger armed: alternating TRAIN_0/TRAIN_1 every " + << options.train_duration_ms << " ms" << std::endl; + } + + XYParserTriggerType next_trigger_type = XYPARSER_TRIGGER_TRAIN_0; + bool impedance_phase = true; + const Clock::time_point impedance_end_time = Clock::now() + kImpedanceDuration; + Clock::time_point next_trigger_time = Clock::time_point::max(); + + std::array read_buffer{}; + std::array frame_summaries{}; + ZmqRoundTripStats zmq_stats{}; + + while (true) { + if (!impedance_phase && has_trigger_port && Clock::now() >= next_trigger_time) { + if (!SendTrigger(trigger_port, next_trigger_type)) { + std::cerr << "Send " + << (next_trigger_type == XYPARSER_TRIGGER_TRAIN_0 ? "TRAIN_0" : "TRAIN_1") + << " failed" << std::endl; + return 1; + } + next_trigger_type = (next_trigger_type == XYPARSER_TRIGGER_TRAIN_0) + ? XYPARSER_TRIGGER_TRAIN_1 + : XYPARSER_TRIGGER_TRAIN_0; + next_trigger_time = Clock::now() + std::chrono::milliseconds(options.train_duration_ms); + } + + bool disconnected = false; + const int received = data_client.Receive(read_buffer.data(), static_cast(read_buffer.size()), disconnected); + if (disconnected) { + std::cout << "64ch TCP disconnected" << std::endl; + break; + } + if (received <= 0) { + continue; + } + + const int frame_count = XYParser_Feed(parser.get(), + read_buffer.data(), + static_cast(received), + frame_summaries.data(), + static_cast(frame_summaries.size())); + if (frame_count > 0) { + PrintFrameProgress(frame_summaries.data(), frame_count); + } + if (impedance_phase) { + DrainImpedance(parser.get()); + if (Clock::now() >= impedance_end_time) { + if (!Send64GainAndSampleRate(data_client, kNormalGain, kNormalSampleRate)) { + std::cerr << "Send 64ch normal gain/sample-rate command failed" << std::endl; + return 1; + } + if (!Send64ImpedanceSwitch(data_client, false)) { + std::cerr << "Send 64ch impedance close command failed" << std::endl; + return 1; + } + XYParser_SetSampleRate(parser.get(), kNormalSampleRate); + XYParser_SetImpedanceDetection(parser.get(), 0); + XYParser_SetWelchDetection(parser.get(), 1); + DrainImpedance(parser.get()); + std::cout << "64ch impedance disabled after " + << std::chrono::duration_cast(kImpedanceDuration).count() + << " seconds, Welch/PSD and cyclic trigger resumed" << std::endl; + impedance_phase = false; + next_trigger_time = Clock::now(); + } + continue; + } + for (int i = 0; i < frame_count; ++i) { + PrintTriggerEvents(frame_summaries[static_cast(i)]); + Process64AlgorithmPath(parser.get(), + frame_summaries[static_cast(i)], + algorithm_zmq, + zmq_stats); + } + DrainWelch(parser.get()); + } + + return 0; +} + +int Run8Workflow(const DemoOptions& options) +{ + constexpr auto kImpedanceDuration = std::chrono::seconds(60); + WinsockRuntime winsock; + if (!winsock.ok()) { + std::cerr << "WSAStartup failed" << std::endl; + return 1; + } + + SerialPort data_port; + if (!data_port.Open(options.data_com_port, options.serial_baud_rate)) { + std::cerr << "Open 8ch serial failed: " << Narrow(options.data_com_port) << std::endl; + return 1; + } + + ParserHandleGuard parser(XYParser_CreateParser(8)); + if (parser.get() == nullptr) { + std::cerr << "Create 8ch parser failed" << std::endl; + return 1; + } + + XYParser_SetAdcParams(parser.get(), options.vref, static_cast(options.gain)); + XYParser_SetSampleRate(parser.get(), options.sample_rate); + XYParser_SetBypassChecksum(parser.get(), options.bypass_checksum); + XYParser_SetWelchDetection(parser.get(), 0); + XYParser_SetImpedanceDetection(parser.get(), 1); + + if (!Send8ImpedanceSwitch(data_port, true)) { + std::cerr << "Send 8ch impedance open command failed" << std::endl; + return 1; + } + + ZmqDuplexClient algorithm_zmq; + if (!algorithm_zmq.Open(options.algorithm_host, + options.algorithm_port, + options.algorithm_timeout_ms)) { + std::cerr << "Open algorithm ZMQ client failed" + << " remote=" << BuildZmqTcpEndpoint(options.algorithm_host, options.algorithm_port) + << " zmqError=" << algorithm_zmq.last_error() + << std::endl; + return 1; + } + std::cout << "Algorithm ZMQ enabled: identity=" << algorithm_zmq.identity() + << " algorithmHost=" << options.algorithm_host + << " algorithmPort=" << options.algorithm_port + << std::endl; + std::cout << "8ch impedance enabled for " << std::chrono::duration_cast(kImpedanceDuration).count() + << " seconds, then close and resume Welch/PSD" << std::endl; + std::array read_buffer{}; + std::array frame_summaries{}; + ZmqRoundTripStats zmq_stats{}; + bool impedance_phase = true; + const Clock::time_point impedance_end_time = Clock::now() + kImpedanceDuration; + + while (true) { + bool disconnected = false; + const int received = data_port.Read(read_buffer.data(), static_cast(read_buffer.size()), disconnected); + if (disconnected) { + std::cout << "8ch serial disconnected or read failed" << std::endl; + break; + } + if (received <= 0) { + continue; + } + + const int frame_count = XYParser_Feed(parser.get(), + read_buffer.data(), + static_cast(received), + frame_summaries.data(), + static_cast(frame_summaries.size())); + if (frame_count > 0) { + PrintFrameProgress(frame_summaries.data(), frame_count); + } + if (impedance_phase) { + DrainImpedance(parser.get()); + if (Clock::now() >= impedance_end_time) { + if (!Send8ImpedanceSwitch(data_port, false)) { + std::cerr << "Send 8ch impedance close command failed" << std::endl; + return 1; + } + XYParser_SetImpedanceDetection(parser.get(), 0); + XYParser_SetWelchDetection(parser.get(), 1); + DrainImpedance(parser.get()); + std::cout << "8ch impedance disabled after " + << std::chrono::duration_cast(kImpedanceDuration).count() + << " seconds, Welch/PSD resumed" << std::endl; + impedance_phase = false; + } + continue; + } + for (int i = 0; i < frame_count; ++i) { + PrintTriggerEvents(frame_summaries[static_cast(i)]); + Process8AlgorithmPath(parser.get(), + frame_summaries[static_cast(i)], + algorithm_zmq, + zmq_stats); + } + DrainWelch(parser.get()); + } + + return 0; +} + +} // namespace + +int wmain(int argc, wchar_t* argv[]) +{ + DemoOptions options{}; +#if defined(XY_WORKFLOW_DEMO_64) + constexpr int kImpedanceEnabled = 0; + constexpr int kParserGainDuringRun = 6; +#else + constexpr int kImpedanceEnabled = 1; + constexpr int kParserGainDuringRun = 24; +#endif + if (!ParseArguments(argc, argv, options)) { + PrintUsage(); + return 1; + } + + std::cout << "bypassChecksum=" << options.bypass_checksum + << " sampleRate=" << options.sample_rate + << " gain=" << options.gain + << " algorithmZmq=1" + << " impedance=" << kImpedanceEnabled + << " welch=1" + << std::endl; + std::cout << "workflowState=" + << " impedanceEnabled=" << kImpedanceEnabled + << " parserGain=" << kParserGainDuringRun + << " deviceGainTarget=" << options.gain + << std::endl; + std::cout << "algorithmPayloadDoubles=" << XYPARSER_FRAME_ALGORITHM_VALUE_COUNT + << " algorithmPayloadBytes=" << (XYPARSER_FRAME_ALGORITHM_VALUE_COUNT * sizeof(double)) + << std::endl; + +#if defined(XY_WORKFLOW_DEMO_64) + return Run64Workflow(options); +#else + return Run8Workflow(options); +#endif +} diff --git a/XYParserDataFlow.md b/XYParserDataFlow.md index 1a6d01d..86d1b85 100644 --- a/XYParserDataFlow.md +++ b/XYParserDataFlow.md @@ -1,32 +1,6 @@ # XYParser 数据流与接口时序说明 -## 1. 参与角色 - -- **64导EEG采集设备** - - 持续输出原始 EEG 字节流。 -- **上位机** - - 负责接收设备数据、调用 XYParser 库、对接算法模块。 -- **XYParser 库** - - 负责帧解析、阻抗计算、算法数据回灌后的 Welch/PSD 计算。 -- **算法** - - 接收上位机送入的算法数据,并输出用于 PSD/Welch 计算的数据。 - -## 2. 总体链路 - -当前流程可以分为两个阶段: - -- **阶段一:阻抗检测阶段** - - 设备连接后,首先下发采样率和增益配置命令。 - - 然后下发阻抗开启命令。 - - 在该阶段持续接收设备数据,并通过 `XYParser_ReadImpedance` 读取阻抗结果。 - - 阻抗检测结束后,下发阻抗关闭命令。 - -- **阶段二:常规采集与算法阶段** - - 基于 `XYParser_Feed` 解析得到的帧数据继续做常规采集。 - - 帧数据先转换为算法数据,再送入算法模块。 - - Welch/PSD 不再直接基于帧解析结果计算,而是基于算法数据,通过 `XYParser_FeedAlgorithmData` 输入后计算。 - -## 3. 接口时序图 +## 接口时序图 ### 3.1 64导初始化连接阶段 @@ -41,7 +15,7 @@ sequenceDiagram Host->>Lib: parser64 = XYParser_CreateParser(64) Host->>Lib: XYParser_SetAdcParams(parser64, 4.5, 6.0) Host->>Lib: XYParser_SetSampleRate(parser64, 250) - Host->>Lib: XYParser_SetBypassChecksum(parser64, 0) + Host->>Lib: XYParser_SetBypassChecksum(parser64, 1) Host->>Lib: gain_cmd_size = XYParser_Get64GainSampleRateCommandSize() Lib-->>Host: gain_cmd_size Host->>Lib: gain_cmd_bytes = XYParser_Serialize64GainSampleRateCommand(6, 250, gain_cmd_buf, gain_cmd_size) @@ -162,7 +136,7 @@ sequenceDiagram Host->>Lib: parser8 = XYParser_CreateParser(8) Host->>Lib: XYParser_SetAdcParams(parser8, vref, gain) Host->>Lib: XYParser_SetSampleRate(parser8, sample_rate) - Host->>Lib: XYParser_SetBypassChecksum(parser8, 0) + Host->>Lib: XYParser_SetBypassChecksum(parser8, 1) ``` ### 3.6 8导阻抗阶段 @@ -235,204 +209,54 @@ sequenceDiagram Host->>Lib: XYParser_DestroyParser(parser8) ``` -## 4. 关键接口职责 +### 3.8 8导转64导导联映射关系 -### 4.1 设备参数配置 +8导 workflow 在送入算法前,会先调用 `XYParser_Convert8ChFramesTo64Ch` 将 8 导帧扩展为 64 导帧。 -- `XYParser_SetAdcParams` - - 设置库内使用的 ADC 参考电压和增益参数。 - - 该参数影响原始采样值到微伏值的换算。 +- 8 个输入通道按固定导联位置写入 64 导 summary +- 未覆盖到的其余 56 个 64 导导联全部补 `0` +- `trigger type` 和 `trigger index` 原样透传 -- `XYParser_SetSampleRate` - - 设置库内处理逻辑使用的采样率。 - - 该参数影响阻抗、Welch/PSD 等后续处理。 +映射图如下: -- `XYParser_Get64GainSampleRateCommandSize` - - 获取 64 导设备增益和采样率配置命令所需的缓冲区大小。 +```text +8ch[0] -> PO5 +8ch[1] -> POZ +8ch[2] -> PO6 +8ch[3] -> PO7 +8ch[4] -> O1 +8ch[5] -> OZ +8ch[6] -> O2 +8ch[7] -> PO8 +others -> 0 +``` -- `XYParser_Serialize64GainSampleRateCommand` - - 根据目标增益和采样率生成下发给 64 导设备的命令字节流。 - - 上位机拿到该命令后发送给 EEG 设备,使设备端采样参数与库内配置保持一致。 - - 当前推荐流程中: - - `vref` 固定为 `4.5`。 - - 设备连接成功后,先下发 `250Hz + 增益6`。 - - 开启阻抗前,下发 `250Hz + 增益24`,同时调用 `XYParser_SetImpedanceDetection(handle, 1)`,使库内 gain 自动切到 `24`。 - - 关闭阻抗后,再下发 `250Hz + 增益6`,同时调用 `XYParser_SetImpedanceDetection(handle, 0)`,使库内 gain 自动恢复到 `6`。 +也可以理解为下面这张对应表: -- `XYParser_Get64ImpedanceCommandSize` - - 获取 64 导阻抗开关命令所需的缓冲区大小。 +| 8导索引 | 8导写入到的64导导联 | +| --- | --- | +| 0 | PO5 | +| 1 | POZ | +| 2 | PO6 | +| 3 | PO7 | +| 4 | O1 | +| 5 | OZ | +| 6 | O2 | +| 7 | PO8 | -- `XYParser_Serialize64ImpedanceCommand` - - 生成下发给 64 导设备的阻抗开关命令字节流。 - - `open = 1` 表示开启阻抗检测,`open = 0` 表示关闭阻抗检测。 +转换过程示意: -- `XYParser_Get8ChImpedanceCommandSize` - - 获取 8 导阻抗开关命令所需的缓冲区大小。 +```text +XYParser_Feed(8导原始数据) + -> frame8_summary + -> XYParser_Convert8ChFramesTo64Ch + -> frame64_summary + -> XYParser_ConvertSampleFramesToAlgorithmData + -> algorithm_input_data +``` -- `XYParser_Serialize8ChImpedanceCommand` - - 生成下发给 8 导设备的阻抗开关命令字节流。 - - `open = 1` 表示开启阻抗检测,`open = 0` 表示关闭阻抗检测。 - - 8 导流程中,设备连接后**不需要**额外下发增益和采样率命令。 - -### 4.2 原始数据解析 - -- `XYParser_Feed` - - 输入设备原始字节流。 - - 输出解析后的 `XYParserFrameSummary` 数组。 - - 该接口当前仍负责驱动阻抗相关计算。 - - 该接口当前**不再驱动 Welch/PSD 计算**。 - -### 4.3 阻抗读取 - -- `XYParser_SetImpedanceDetection` - - 控制是否启用阻抗检测。 - - 启用时,库内 ADC 增益自动切换到 `24`。 - - 关闭时,库内 ADC 增益自动恢复到 `6`。 - - 该接口只修改库内解析参数,不会自动给设备发送控制命令。 - -- `XYParser_ReadImpedance` - - 读取当前已经累计完成的阻抗结果。 - - 阻抗结果来源于阻抗检测阶段的帧解析链路。 - -### 4.4 帧转算法数据 - -- `XYParser_ConvertSampleFramesToAlgorithmData` - - 将单帧 `XYParserFrameSummary` 转为算法需要的连续数组。 - - 上位机通常在拿到帧数据后调用此接口,再把结果送入算法模块。 +代码依据: - `XYParser_Convert8ChFramesTo64Ch` - - 将 8 导帧转换为 64 导帧,未映射导联补 0。 - - 8 导流程中,在送算法数据前,需要先把 8 导帧转换为 64 导帧,再调用 `XYParser_ConvertSampleFramesToAlgorithmData`。 - -### 4.5 算法数据回灌 - -- `XYParser_FeedAlgorithmData` - - 输入算法数据字节流。 - - 内部先按采样缓存,再按每 5 个采样组装为一帧。 - - 同时驱动 Welch/PSD 计算。 - - 可选输出重新组装后的 `XYParserFrameSummary`。 - -- `XYParser_ResetAlgorithmDataCache` - - 清空算法数据缓存。 - - 适合在切换任务、重置状态时调用。 - -- `XYParser_FlushAlgorithmData` - - 将缓存中不足 5 个采样的尾数据补齐为 1 帧输出。 - - 用于结束阶段处理残留数据。 - -### 4.6 Welch/PSD 读取 - -- `XYParser_SetWelchDetection` - - 控制是否启用基于算法数据的 Welch 检测。 - -- `XYParser_ReadWelch` - - 读取当前已累计完成的 Welch/PSD 结果。 - - Welch 结果当前仅来源于 `XYParser_FeedAlgorithmData`。 - -## 5. 当前设计结论 - -### 5.1 阻抗数据来源 - -- 阻抗在独立的阻抗检测阶段获取。 -- 即: - - 设备连接成功 - - 下发采样率250和增益6命令 - - `XYParser_SetImpedanceDetection(handle, 1)` - - 下发采样率250和增益24命令 - - 下发阻抗开启命令 - - 设备原始字节流 - - `XYParser_Feed` - - `XYParser_ReadImpedance` - - `XYParser_SetImpedanceDetection(handle, 0)` - - 下发阻抗关闭命令 - - 下发采样率250和增益6命令 - -### 5.2 PSD 数据来源 - -- PSD/Welch 不再直接使用 `XYParser_Feed` 解析出来的帧数据。 -- 当前流程为: - - 设备原始字节流 - - `XYParser_Feed` - - `XYParser_ConvertSampleFramesToAlgorithmData` - - 算法处理 - - `XYParser_FeedAlgorithmData` - - `XYParser_ReadWelch` - -### 5.3 8导算法数据来源 - -- 8 导流程中,算法输入和 Welch/PSD 仍然按 64 导数据格式处理。 -- 当前流程为: - - 8 导设备原始字节流 - - `XYParser_Feed` - - `XYParser_Convert8ChFramesTo64Ch` - - `XYParser_ConvertSampleFramesToAlgorithmData` - - 算法处理 - - `XYParser_FeedAlgorithmData` - - `XYParser_ReadWelch` - -## 6. 推荐调用顺序 - -### 6.1 64导推荐调用顺序 - -```text -1. EEG 设备连接成功 -2. CreateParser -3. SetAdcParams(4.5, 6) / SetSampleRate(250) / SetBypassChecksum -4. Get64GainSampleRateCommandSize / Serialize64GainSampleRateCommand(6, 250) -5. 上位机向设备下发采样率250、增益6命令 -6. SetImpedanceDetection(1) -7. Get64GainSampleRateCommandSize / Serialize64GainSampleRateCommand(24, 250) -8. 上位机向设备下发采样率250、增益24命令 -9. Get64ImpedanceCommandSize / Serialize64ImpedanceCommand(1) -10. 上位机向设备下发阻抗开启命令 -11. 阻抗检测阶段循环: - 11.1 Feed 原始字节流,拿到帧 - 11.2 ReadImpedance 读取阻抗 -12. Serialize64ImpedanceCommand(0) -13. 上位机向设备下发阻抗关闭命令 -14. SetImpedanceDetection(0) -15. Get64GainSampleRateCommandSize / Serialize64GainSampleRateCommand(6, 250) -16. 上位机向设备下发采样率250、增益6命令 -17. SetWelchDetection -18. 常规采集阶段循环: - 18.1 Feed 原始字节流,拿到帧 - 18.2 将帧转换为算法数据 - 18.3 将算法数据送入算法模块 - 18.4 将算法输出数据通过 FeedAlgorithmData 回灌 - 18.5 ReadWelch 读取 PSD/Welch 结果 -19. 必要时 FlushAlgorithmData -20. DestroyParser -``` - -### 6.2 8导推荐调用顺序 - -```text -1. EEG 设备连接成功 -2. CreateParser -3. SetAdcParams(vref, gain) / SetSampleRate(sample_rate) / SetBypassChecksum -4. SetImpedanceDetection(1) -5. Get8ChImpedanceCommandSize / Serialize8ChImpedanceCommand(1) -6. 上位机向设备下发阻抗开启命令 -7. 阻抗检测阶段循环: - 7.1 Feed 原始字节流,拿到8导帧 - 7.2 ReadImpedance 读取阻抗 -8. Serialize8ChImpedanceCommand(0) -9. 上位机向设备下发阻抗关闭命令 -10. SetImpedanceDetection(0) -11. SetWelchDetection -12. 常规采集阶段循环: - 12.1 Feed 原始字节流,拿到8导帧 - 12.2 将8导帧转换为64导帧 - 12.3 将64导帧转换为算法数据 - 12.4 将算法数据送入算法模块 - 12.5 将算法输出数据通过 FeedAlgorithmData 回灌 - 12.6 ReadWelch 读取 PSD/Welch 结果 -13. 必要时 FlushAlgorithmData -14. DestroyParser -``` - -## 7. 一句话总结 - -- **阻抗是独立阶段,先开阻抗、持续读取、再关阻抗。** -- **Welch/PSD 走算法数据链路。** -- **8导送算法前,先转成64导数据。** +- `Convert8ChSummaryTo64ChSummary` +- 8导映射表 `k8ChLeadMap`