From 3ac9fe8050b9f19b382b44de2a1337666b8bdfc8 Mon Sep 17 00:00:00 2001 From: lvpeng Date: Tue, 9 Jun 2026 19:14:50 +0800 Subject: [PATCH] bug fix --- XYParser/XYParserApi.cpp | 8 +- XYParser/XYParserApi.h | 1 + XYParser/XYParserTests/Tests.cpp | 32 +- .../AlgorithmUdpServer.cpp | 105 ++++-- XYParser/XYParserWorkflowDemo/main.cpp | 307 ++++++++++++++---- 5 files changed, 339 insertions(+), 114 deletions(-) diff --git a/XYParser/XYParserApi.cpp b/XYParser/XYParserApi.cpp index bbc2714..b12c15c 100644 --- a/XYParser/XYParserApi.cpp +++ b/XYParser/XYParserApi.cpp @@ -24,7 +24,7 @@ constexpr int k8ChLeadCount = 8; constexpr int k64ChLeadCount = 64; constexpr std::uint8_t kAlgorithmChannelCount = 64; constexpr std::size_t kAlgorithmSampleByteCount = - static_cast(XYPARSER_FRAME_DATA_COLUMN_COUNT) * sizeof(double); + static_cast(XYPARSER_MAX_CHANNELS) * sizeof(double); struct AlgorithmSample { std::array channel_values_uv{}; @@ -582,13 +582,13 @@ int XYParser_FeedAlgorithmData(XYParserHandle handle, while (context->algorithm_byte_cache.size() >= kAlgorithmSampleByteCount) { AlgorithmSample sample{}; - std::array row{}; + std::array row{}; std::memcpy(row.data(), context->algorithm_byte_cache.data(), kAlgorithmSampleByteCount); for (std::size_t channel_index = 0; channel_index < XYPARSER_MAX_CHANNELS; ++channel_index) { sample.channel_values_uv[channel_index] = row[channel_index]; } - sample.trigger_type = static_cast(row[XYPARSER_FRAME_DATA_TRIGGER_TYPE_INDEX]); - sample.trigger_index = static_cast(row[XYPARSER_FRAME_DATA_TRIGGER_INDEX_INDEX]); + sample.trigger_type = 0; + sample.trigger_index = 0; context->welch_processor.PushSample(sample.channel_values_uv); context->algorithm_sample_cache.push_back(sample); context->algorithm_byte_cache.erase( diff --git a/XYParser/XYParserApi.h b/XYParser/XYParserApi.h index 76b113f..d09fcd8 100644 --- a/XYParser/XYParserApi.h +++ b/XYParser/XYParserApi.h @@ -296,6 +296,7 @@ XYPARSER_API int XYParser_ConvertSampleFramesToAlgorithmData(const XYParserFrame double* output_data); // 向算法数据缓存输入字节流,并按每 5 个采样组装为帧,同时驱动 Welch 计算。 +// 算法回包格式为每个采样点 64 个 double,仅包含 64 路通道值,不包含 trigger_type/trigger_index。 // @param handle 目标解析器句柄。 // @param data 输入的算法数据字节流。 // @param size 输入数据的字节数。 diff --git a/XYParser/XYParserTests/Tests.cpp b/XYParser/XYParserTests/Tests.cpp index 0651986..40c52a1 100644 --- a/XYParser/XYParserTests/Tests.cpp +++ b/XYParser/XYParserTests/Tests.cpp @@ -15,6 +15,8 @@ // 匿名命名空间,包含测试辅助代码 namespace { +constexpr int kAlgorithmReturnColumnCount = XYPARSER_MAX_CHANNELS; + /// ParserGuard 类:RAII 封装,确保解析器资源自动释放 /// 当对象生命周期结束时自动调用 XYParser_DestroyParser 释放资源 class ParserGuard { @@ -199,9 +201,9 @@ std::vector BuildAlgorithmDataForSingleChannel(int sample_rate, double secondary_amplitude = 0.0) { std::vector algorithm_data( - static_cast(sample_rate * XYPARSER_FRAME_DATA_COLUMN_COUNT), 0.0); + static_cast(sample_rate * kAlgorithmReturnColumnCount), 0.0); for (int sample = 0; sample < sample_rate; ++sample) { - const std::size_t sample_offset = static_cast(sample) * XYPARSER_FRAME_DATA_COLUMN_COUNT; + const std::size_t sample_offset = static_cast(sample) * kAlgorithmReturnColumnCount; algorithm_data[sample_offset] = static_cast(BuildCombinedSineRawValue(sample, sample_rate, primary_frequency_hz, @@ -219,9 +221,9 @@ std::vector BuildAlgorithmDataForTwoChannels(int sample_rate, double channel1_amplitude) { std::vector algorithm_data( - static_cast(sample_rate * XYPARSER_FRAME_DATA_COLUMN_COUNT), 0.0); + static_cast(sample_rate * kAlgorithmReturnColumnCount), 0.0); for (int sample = 0; sample < sample_rate; ++sample) { - const std::size_t sample_offset = static_cast(sample) * XYPARSER_FRAME_DATA_COLUMN_COUNT; + const std::size_t sample_offset = static_cast(sample) * kAlgorithmReturnColumnCount; algorithm_data[sample_offset] = static_cast( BuildSineRawValue(sample, sample_rate, channel0_frequency_hz, channel0_amplitude)); algorithm_data[sample_offset + 1] = static_cast( @@ -645,15 +647,13 @@ TEST(XYParserApiTests, FeedAlgorithmDataCachesSamplesBuildsFramesAndFlushesTailS ASSERT_NE(parser.get(), nullptr); constexpr int kTotalSamples = 7; - constexpr int kColumnCount = XYPARSER_FRAME_DATA_COLUMN_COUNT; + constexpr int kColumnCount = kAlgorithmReturnColumnCount; std::array(kTotalSamples) * kColumnCount> input_data{}; for (int sample_index = 0; sample_index < kTotalSamples; ++sample_index) { const std::size_t row_offset = static_cast(sample_index) * kColumnCount; input_data[row_offset + 0] = 1000.0 + sample_index; input_data[row_offset + 10] = 2000.0 + sample_index; input_data[row_offset + 63] = 3000.0 + sample_index; - input_data[row_offset + XYPARSER_FRAME_DATA_TRIGGER_TYPE_INDEX] = 10.0 + sample_index; - input_data[row_offset + XYPARSER_FRAME_DATA_TRIGGER_INDEX_INDEX] = 20.0 + sample_index; } const std::uint8_t* input_bytes = reinterpret_cast(input_data.data()); const std::size_t first_chunk_size = 3 * static_cast(kColumnCount) * sizeof(double) + sizeof(double); @@ -681,8 +681,8 @@ TEST(XYParserApiTests, FeedAlgorithmDataCachesSamplesBuildsFramesAndFlushesTailS EXPECT_DOUBLE_EQ(summaries[0].channel_values_uv[0][0], 1000.0); EXPECT_DOUBLE_EQ(summaries[0].channel_values_uv[2][10], 2002.0); EXPECT_DOUBLE_EQ(summaries[0].channel_values_uv[4][63], 3004.0); - EXPECT_EQ(summaries[0].sample_trigger_types[0], 10U); - EXPECT_EQ(summaries[0].sample_trigger_indices[4], 24U); + EXPECT_EQ(summaries[0].sample_trigger_types[0], 0U); + EXPECT_EQ(summaries[0].sample_trigger_indices[4], 0U); XYParserFrameSummary tail_summary{}; ASSERT_EQ(XYParser_FlushAlgorithmData(parser.get(), &tail_summary), 1); @@ -691,8 +691,8 @@ TEST(XYParserApiTests, FeedAlgorithmDataCachesSamplesBuildsFramesAndFlushesTailS EXPECT_EQ(tail_summary.sample_count, 2U); EXPECT_DOUBLE_EQ(tail_summary.channel_values_uv[0][0], 1005.0); EXPECT_DOUBLE_EQ(tail_summary.channel_values_uv[1][63], 3006.0); - EXPECT_EQ(tail_summary.sample_trigger_types[0], 15U); - EXPECT_EQ(tail_summary.sample_trigger_indices[1], 26U); + EXPECT_EQ(tail_summary.sample_trigger_types[0], 0U); + EXPECT_EQ(tail_summary.sample_trigger_indices[1], 0U); EXPECT_DOUBLE_EQ(tail_summary.channel_values_uv[2][0], 0.0); EXPECT_EQ(tail_summary.sample_trigger_types[2], 0U); @@ -1108,9 +1108,9 @@ TEST(XYParserApiTests, WelchReturnsOneResultAfterOneSecondFromAlgorithmData) XYParser_SetSampleRate(parser.get(), 10); XYParser_SetWelchDetection(parser.get(), 1); - std::vector algorithm_data(static_cast(10 * XYPARSER_FRAME_DATA_COLUMN_COUNT), 0.0); + std::vector algorithm_data(static_cast(10 * kAlgorithmReturnColumnCount), 0.0); for (int sample = 0; sample < 10; ++sample) { - const std::size_t sample_offset = static_cast(sample) * XYPARSER_FRAME_DATA_COLUMN_COUNT; + const std::size_t sample_offset = static_cast(sample) * kAlgorithmReturnColumnCount; algorithm_data[sample_offset] = static_cast(BuildSineRawValue(sample, 10, 2.0, 1000000.0)); } @@ -1370,7 +1370,7 @@ TEST(XYParserApiTests, WelchResetClearsHalfWindowAfterDisableEnable) const std::vector full_data = BuildAlgorithmDataForSingleChannel(10, 2.0, 1000000.0); const std::size_t half_row_count = - static_cast(XYPARSER_SAMPLES_PER_FRAME) * XYPARSER_FRAME_DATA_COLUMN_COUNT; + static_cast(XYPARSER_SAMPLES_PER_FRAME) * kAlgorithmReturnColumnCount; const std::vector first_half(full_data.begin(), full_data.begin() + static_cast(half_row_count)); const std::vector second_half(full_data.begin() + static_cast(half_row_count), full_data.end()); @@ -1657,9 +1657,9 @@ TEST(XYParserApiTests, WelchDisabledDoesNotProduceResultsFromAlgorithmData) XYParser_SetSampleRate(parser.get(), 10); - std::vector algorithm_data(static_cast(10 * XYPARSER_FRAME_DATA_COLUMN_COUNT), 0.0); + std::vector algorithm_data(static_cast(10 * kAlgorithmReturnColumnCount), 0.0); for (int sample = 0; sample < 10; ++sample) { - const std::size_t sample_offset = static_cast(sample) * XYPARSER_FRAME_DATA_COLUMN_COUNT; + const std::size_t sample_offset = static_cast(sample) * kAlgorithmReturnColumnCount; algorithm_data[sample_offset] = static_cast(BuildSineRawValue(sample, 10, 2.0, 1000000.0)); } diff --git a/XYParser/XYParserWorkflowDemo/AlgorithmUdpServer.cpp b/XYParser/XYParserWorkflowDemo/AlgorithmUdpServer.cpp index dbd0d85..444c9ba 100644 --- a/XYParser/XYParserWorkflowDemo/AlgorithmUdpServer.cpp +++ b/XYParser/XYParserWorkflowDemo/AlgorithmUdpServer.cpp @@ -401,48 +401,51 @@ public: ResponseMode mode, std::vector& response) { - response.resize(request_size); - if (mode == ResponseMode::Echo) { - std::memcpy(response.data(), request, request_size); - return request_size; - } - if (mode == ResponseMode::Zero) { - std::fill(response.begin(), response.end(), 0); - return request_size; - } - if ((request_size % sizeof(double)) != 0) { response.clear(); return 0; } const std::size_t value_count = request_size / sizeof(double); + if ((value_count % static_cast(XYPARSER_FRAME_DATA_COLUMN_COUNT)) != 0) { + response.clear(); + return 0; + } + const std::size_t sample_count = + value_count / static_cast(XYPARSER_FRAME_DATA_COLUMN_COUNT); + const std::size_t response_value_count = + sample_count * static_cast(XYPARSER_MAX_CHANNELS); + const std::size_t response_size = response_value_count * sizeof(double); + response.resize(response_size); + const auto* input_values = reinterpret_cast(request); auto* output_values = reinterpret_cast(response.data()); - for (std::size_t sample_index = 0; sample_index < XYPARSER_SAMPLES_PER_FRAME; ++sample_index) { - const std::size_t sample_offset = - sample_index * static_cast(XYPARSER_FRAME_DATA_COLUMN_COUNT); - for (std::size_t channel_index = 0; channel_index < XYPARSER_MAX_CHANNELS; ++channel_index) { - const std::size_t value_index = sample_offset + channel_index; - if (value_index >= value_count) { - response.clear(); - return 0; - } - output_values[value_index] = channels_[channel_index].Process(input_values[value_index]); - } + if (mode == ResponseMode::Zero) { + std::fill(response.begin(), response.end(), 0); + return response_size; + } - const std::size_t trigger_type_index = sample_offset + XYPARSER_FRAME_DATA_TRIGGER_TYPE_INDEX; - const std::size_t trigger_index_index = sample_offset + XYPARSER_FRAME_DATA_TRIGGER_INDEX_INDEX; - if (trigger_index_index >= value_count) { + for (std::size_t sample_index = 0; sample_index < sample_count; ++sample_index) { + const std::size_t input_sample_offset = + sample_index * static_cast(XYPARSER_FRAME_DATA_COLUMN_COUNT); + const std::size_t output_sample_offset = + sample_index * static_cast(XYPARSER_MAX_CHANNELS); + for (std::size_t channel_index = 0; channel_index < XYPARSER_MAX_CHANNELS; ++channel_index) { + const std::size_t input_value_index = input_sample_offset + channel_index; + const std::size_t output_value_index = output_sample_offset + channel_index; + if (input_value_index >= value_count || output_value_index >= response_value_count) { response.clear(); return 0; } - output_values[trigger_type_index] = input_values[trigger_type_index]; - output_values[trigger_index_index] = input_values[trigger_index_index]; + output_values[output_value_index] = + (mode == ResponseMode::Echo) + ? input_values[input_value_index] + : channels_[channel_index].Process(input_values[input_value_index]); + } } - return request_size; + return response_size; } private: @@ -633,14 +636,46 @@ public: return -1; } - std::vector payload_frame; - if (!ReceiveFrame(payload_frame, has_more)) { + std::vector second_frame; + if (!ReceiveFrame(second_frame, has_more)) { timed_out = (last_error_text_ == "Resource temporarily unavailable"); return timed_out ? 0 : -1; } - if (has_more) { - last_error_text_ = "unexpected multipart payload"; - return -1; + + std::vector payload_frame; + if (!has_more) { + payload_frame = std::move(second_frame); + } else if (second_frame.empty()) { + if (!ReceiveFrame(payload_frame, has_more)) { + timed_out = (last_error_text_ == "Resource temporarily unavailable"); + return timed_out ? 0 : -1; + } + if (has_more) { + last_error_text_ = "unexpected multipart payload"; + return -1; + } + } else { + std::vector third_frame; + if (!ReceiveFrame(third_frame, has_more)) { + timed_out = (last_error_text_ == "Resource temporarily unavailable"); + return timed_out ? 0 : -1; + } + if (!has_more) { + payload_frame = std::move(third_frame); + } else { + if (!third_frame.empty()) { + last_error_text_ = "unexpected non-empty delimiter frame"; + return -1; + } + if (!ReceiveFrame(payload_frame, has_more)) { + timed_out = (last_error_text_ == "Resource temporarily unavailable"); + return timed_out ? 0 : -1; + } + if (has_more) { + last_error_text_ = "unexpected multipart payload"; + return -1; + } + } } if (payload_frame.size() > static_cast(capacity)) { last_error_text_ = "payload too large"; @@ -670,6 +705,12 @@ public: return false; } + const int sent_empty = zmq_send(socket_, nullptr, 0, ZMQ_SNDMORE); + if (sent_empty != 0) { + last_error_text_ = zmq_strerror(zmq_errno()); + return false; + } + const int sent_payload = zmq_send(socket_, data, size, 0); if (sent_payload != size) { last_error_text_ = zmq_strerror(zmq_errno()); diff --git a/XYParser/XYParserWorkflowDemo/main.cpp b/XYParser/XYParserWorkflowDemo/main.cpp index 881e11c..26c79e8 100644 --- a/XYParser/XYParserWorkflowDemo/main.cpp +++ b/XYParser/XYParserWorkflowDemo/main.cpp @@ -58,9 +58,9 @@ struct DemoOptions { DemoMode mode = kDemoMode; std::string tcp_host = kDefaultTcpHost; int tcp_port = 5086; - std::string algorithm_host = "127.0.0.1"; + std::string algorithm_host = "192.168.254.102"; int algorithm_port = 8100; - int algorithm_timeout_ms = 200; + int algorithm_timeout_ms = 1000; std::wstring data_com_port = kDefaultDataComPort; std::wstring trigger_com_port = kDefaultTriggerComPort; int serial_baud_rate = 460800; @@ -282,8 +282,33 @@ public: return false; } - const int sent = zmq_send(socket_, data, size, 0); - if (sent != static_cast(size)) { + const Clock::time_point now = Clock::now(); + std::cout << "[AlgorithmZmqTx] payloadBytes=" << size; + if (last_send_time_.has_value()) { + const double delta_ms = + std::chrono::duration(now - *last_send_time_).count(); + std::cout << " deltaMs=" << delta_ms; + } + std::cout << std::endl; + last_send_time_ = now; + + const int sent_identity = zmq_send(socket_, + identity_.data(), + static_cast(identity_.size()), + ZMQ_SNDMORE); + if (sent_identity != static_cast(identity_.size())) { + last_error_text_ = zmq_strerror(zmq_errno()); + return false; + } + + const int sent_empty = zmq_send(socket_, nullptr, 0, ZMQ_SNDMORE); + if (sent_empty != 0) { + last_error_text_ = zmq_strerror(zmq_errno()); + return false; + } + + const int sent_payload = zmq_send(socket_, data, size, 0); + if (sent_payload != static_cast(size)) { last_error_text_ = zmq_strerror(zmq_errno()); return false; } @@ -299,20 +324,94 @@ public: return -1; } - const int received = zmq_recv(socket_, buffer, capacity, 0); - if (received >= 0) { - last_error_text_.clear(); - return received; + std::vector first_frame; + bool has_more = false; + if (!ReceiveFrame(first_frame, has_more, 0)) { + const int error = zmq_errno(); + last_error_text_ = zmq_strerror(error); + if (error == EAGAIN) { + timed_out = true; + return 0; + } + return -1; } - const int error = zmq_errno(); - last_error_text_ = zmq_strerror(error); - if (error == EAGAIN) { - timed_out = true; - return 0; + std::vector payload_frame; + if (has_more) { + if (!first_frame.empty()) { + last_error_text_ = "unexpected non-empty delimiter frame"; + return -1; + } + bool payload_has_more = false; + if (!ReceiveFrame(payload_frame, payload_has_more, 0)) { + const int error = zmq_errno(); + last_error_text_ = zmq_strerror(error); + if (error == EAGAIN) { + timed_out = true; + return 0; + } + return -1; + } + if (payload_has_more) { + last_error_text_ = "unexpected multipart payload"; + return -1; + } + } else { + payload_frame = std::move(first_frame); } - return -1; + if (payload_frame.size() > static_cast(capacity)) { + last_error_text_ = "payload too large"; + return -1; + } + + std::memcpy(buffer, payload_frame.data(), payload_frame.size()); + last_error_text_.clear(); + return static_cast(payload_frame.size()); + } + + int ReceiveNonBlocking(std::vector& payload, bool& no_message) + { + no_message = false; + if (socket_ == nullptr) { + last_error_text_ = "socket not open"; + return -1; + } + + std::vector first_frame; + bool has_more = false; + if (!ReceiveFrame(first_frame, has_more, ZMQ_DONTWAIT)) { + const int error = zmq_errno(); + last_error_text_ = zmq_strerror(error); + if (error == EAGAIN) { + no_message = true; + return 0; + } + return -1; + } + + std::vector payload_frame; + if (has_more) { + if (!first_frame.empty()) { + last_error_text_ = "unexpected non-empty delimiter frame"; + return -1; + } + bool payload_has_more = false; + if (!ReceiveFrame(payload_frame, payload_has_more, 0)) { + last_error_text_ = zmq_strerror(zmq_errno()); + return -1; + } + if (payload_has_more) { + last_error_text_ = "unexpected multipart payload"; + return -1; + } + } else { + payload_frame = std::move(first_frame); + } + + payload = std::move(payload_frame); + last_error_text_.clear(); + return static_cast(payload.size()); } const std::string& last_error() const { return last_error_text_; } @@ -329,14 +428,38 @@ public: zmq_ctx_term(context_); context_ = nullptr; } + last_send_time_.reset(); } private: + bool ReceiveFrame(std::vector& frame, bool& has_more, int flags) + { + zmq_msg_t message; + zmq_msg_init(&message); + const int received = zmq_msg_recv(&message, socket_, flags); + if (received < 0) { + zmq_msg_close(&message); + return false; + } + + const auto* data = static_cast(zmq_msg_data(&message)); + const std::size_t size = zmq_msg_size(&message); + frame.assign(data, data + size); + + int more = 0; + size_t more_size = sizeof(more); + zmq_getsockopt(socket_, ZMQ_RCVMORE, &more, &more_size); + has_more = (more != 0); + zmq_msg_close(&message); + return true; + } + void* context_ = nullptr; void* socket_ = nullptr; std::string last_error_text_; std::string configured_remote_endpoint_; std::string identity_; + std::optional last_send_time_; }; class SerialPort { @@ -781,56 +904,45 @@ bool RunAlgorithmZmqRoundTrip(XYParserHandle parser, ++stats.sent_packets; stats.sent_bytes += payload_size; - std::array response_buffer{}; - bool timed_out = false; - const int received = zmq_client.Receive(response_buffer.data(), - static_cast(response_buffer.size()), - timed_out); - if (timed_out) { - ++stats.timeout_count; - std::cerr << "Receive algorithm ZMQ payload timed out" - << " remote=" << zmq_client.configured_remote_endpoint() - << " zmqError=" << zmq_client.last_error() - << std::endl; - PrintZmqStats(stats); - return false; - } - if (received <= 0) { - std::cerr << "Receive algorithm ZMQ payload failed" - << " remote=" << zmq_client.configured_remote_endpoint() - << " zmqError=" << zmq_client.last_error() - << std::endl; - PrintZmqStats(stats); - return false; - } - if (received % static_cast(sizeof(double)) != 0) { - ++stats.invalid_payload_count; - std::cerr << "Algorithm ZMQ payload size is not aligned to double" << std::endl; - PrintZmqStats(stats); - return false; - } - if (static_cast(received) != payload_size) { - ++stats.invalid_payload_count; - std::cerr << "Algorithm ZMQ payload size mismatch: expectedBytes=" << payload_size - << " expectedDoubles=" << XYPARSER_FRAME_ALGORITHM_VALUE_COUNT - << " actualBytes=" << received - << " actualDoubles=" << (received / static_cast(sizeof(double))) - << std::endl; - PrintZmqStats(stats); - return false; - } - ++stats.received_packets; - stats.received_bytes += static_cast(received); + bool received_any_response = false; + while (true) { + std::vector response_payload; + bool no_message = false; + const int received = zmq_client.ReceiveNonBlocking(response_payload, no_message); + if (no_message) { + break; + } + if (received <= 0) { + std::cerr << "Receive algorithm ZMQ payload failed" + << " remote=" << zmq_client.configured_remote_endpoint() + << " zmqError=" << zmq_client.last_error() + << std::endl; + PrintZmqStats(stats); + return false; + } + if (received % static_cast(sizeof(double)) != 0) { + ++stats.invalid_payload_count; + std::cerr << "Algorithm ZMQ payload size is not aligned to double" << std::endl; + PrintZmqStats(stats); + return false; + } - if (stats.sent_packets <= 3 || (stats.sent_packets % 100) == 0) { - PrintZmqStats(stats); + received_any_response = true; + ++stats.received_packets; + stats.received_bytes += static_cast(received); + std::cout << "[AlgorithmZmqRx] payloadBytes=" << received + << " doubles=" << (received / static_cast(sizeof(double))) + << std::endl; + XYParser_FeedAlgorithmData(parser, + response_payload.data(), + response_payload.size(), + nullptr, + 0); } - XYParser_FeedAlgorithmData(parser, - response_buffer.data(), - static_cast(received), - nullptr, - 0); + if (received_any_response || stats.sent_packets <= 3 || (stats.sent_packets % 100) == 0) { + PrintZmqStats(stats); + } return true; } @@ -1046,7 +1158,7 @@ void PrintTriggerEvents(const XYParserFrameSummary& summary) int Run64Workflow(const DemoOptions& options) { - constexpr auto kImpedanceDuration = std::chrono::seconds(10); + constexpr auto kImpedanceDuration = std::chrono::seconds(0); constexpr int kImpedanceSampleRate = 250; constexpr int kImpedanceGain = 24; constexpr int kNormalSampleRate = 250; @@ -1062,17 +1174,38 @@ int Run64Workflow(const DemoOptions& options) std::cerr << "Connect 64ch TCP failed: " << options.tcp_host << ':' << options.tcp_port << std::endl; return 1; } + bool tcp_connected = true; + const auto close_data_tcp = [&]() { + if (!tcp_connected) { + return; + } + std::cout << "Close 64ch TCP connection" << std::endl; + data_client.Close(); + tcp_connected = false; + }; SerialPort trigger_port; const bool has_trigger_port = !options.trigger_com_port.empty(); + bool trigger_serial_open = false; + const auto close_trigger_serial = [&]() { + if (!trigger_serial_open) { + return; + } + std::cout << "Close trigger serial" << std::endl; + trigger_port.Close(); + trigger_serial_open = false; + }; if (has_trigger_port && !trigger_port.Open(options.trigger_com_port, options.serial_baud_rate)) { std::cerr << "Open trigger serial failed: " << Narrow(options.trigger_com_port) << std::endl; + close_data_tcp(); return 1; } + trigger_serial_open = has_trigger_port; ParserHandleGuard parser(XYParser_CreateParser(64)); if (parser.get() == nullptr) { std::cerr << "Create 64ch parser failed" << std::endl; + close_data_tcp(); return 1; } @@ -1084,16 +1217,32 @@ int Run64Workflow(const DemoOptions& options) if (!Send64ImpedanceSwitch(data_client, true)) { std::cerr << "Send 64ch impedance open command failed" << std::endl; + close_data_tcp(); return 1; } if (!Send64GainAndSampleRate(data_client, kImpedanceGain, kImpedanceSampleRate)) { std::cerr << "Send 64ch impedance gain/sample-rate command failed" << std::endl; + close_data_tcp(); return 1; } XYParser_SetSampleRate(parser.get(), kImpedanceSampleRate); XYParser_SetImpedanceDetection(parser.get(), 1); ZmqDuplexClient algorithm_zmq; + bool algorithm_zmq_open = false; + const auto close_algorithm_zmq = [&]() { + if (!algorithm_zmq_open) { + return; + } + std::cout << "Close algorithm ZMQ" << std::endl; + algorithm_zmq.Close(); + algorithm_zmq_open = false; + }; + const auto close_all = [&]() { + close_algorithm_zmq(); + close_trigger_serial(); + close_data_tcp(); + }; if (!algorithm_zmq.Open(options.algorithm_host, options.algorithm_port, options.algorithm_timeout_ms)) { @@ -1101,8 +1250,10 @@ int Run64Workflow(const DemoOptions& options) << " remote=" << BuildZmqTcpEndpoint(options.algorithm_host, options.algorithm_port) << " zmqError=" << algorithm_zmq.last_error() << std::endl; + close_all(); return 1; } + algorithm_zmq_open = true; std::cout << "Algorithm ZMQ enabled: identity=" << algorithm_zmq.identity() << " algorithmHost=" << options.algorithm_host << " algorithmPort=" << options.algorithm_port @@ -1130,6 +1281,7 @@ int Run64Workflow(const DemoOptions& options) std::cerr << "Send " << (next_trigger_type == XYPARSER_TRIGGER_TRAIN_0 ? "TRAIN_0" : "TRAIN_1") << " failed" << std::endl; + close_all(); return 1; } next_trigger_type = (next_trigger_type == XYPARSER_TRIGGER_TRAIN_0) @@ -1161,10 +1313,12 @@ int Run64Workflow(const DemoOptions& options) if (Clock::now() >= impedance_end_time) { if (!Send64GainAndSampleRate(data_client, kNormalGain, kNormalSampleRate)) { std::cerr << "Send 64ch normal gain/sample-rate command failed" << std::endl; + close_all(); return 1; } if (!Send64ImpedanceSwitch(data_client, false)) { std::cerr << "Send 64ch impedance close command failed" << std::endl; + close_all(); return 1; } XYParser_SetSampleRate(parser.get(), kNormalSampleRate); @@ -1189,6 +1343,7 @@ int Run64Workflow(const DemoOptions& options) DrainWelch(parser.get()); } + close_all(); return 0; } @@ -1206,10 +1361,20 @@ int Run8Workflow(const DemoOptions& options) std::cerr << "Open 8ch serial failed: " << Narrow(options.data_com_port) << std::endl; return 1; } + bool data_serial_open = true; + const auto close_data_serial = [&]() { + if (!data_serial_open) { + return; + } + std::cout << "Close 8ch data serial" << std::endl; + data_port.Close(); + data_serial_open = false; + }; ParserHandleGuard parser(XYParser_CreateParser(8)); if (parser.get() == nullptr) { std::cerr << "Create 8ch parser failed" << std::endl; + close_data_serial(); return 1; } @@ -1221,10 +1386,24 @@ int Run8Workflow(const DemoOptions& options) if (!Send8ImpedanceSwitch(data_port, true)) { std::cerr << "Send 8ch impedance open command failed" << std::endl; + close_data_serial(); return 1; } ZmqDuplexClient algorithm_zmq; + bool algorithm_zmq_open = false; + const auto close_algorithm_zmq = [&]() { + if (!algorithm_zmq_open) { + return; + } + std::cout << "Close algorithm ZMQ" << std::endl; + algorithm_zmq.Close(); + algorithm_zmq_open = false; + }; + const auto close_all = [&]() { + close_algorithm_zmq(); + close_data_serial(); + }; if (!algorithm_zmq.Open(options.algorithm_host, options.algorithm_port, options.algorithm_timeout_ms)) { @@ -1232,8 +1411,10 @@ int Run8Workflow(const DemoOptions& options) << " remote=" << BuildZmqTcpEndpoint(options.algorithm_host, options.algorithm_port) << " zmqError=" << algorithm_zmq.last_error() << std::endl; + close_all(); return 1; } + algorithm_zmq_open = true; std::cout << "Algorithm ZMQ enabled: identity=" << algorithm_zmq.identity() << " algorithmHost=" << options.algorithm_host << " algorithmPort=" << options.algorithm_port @@ -1270,6 +1451,7 @@ int Run8Workflow(const DemoOptions& options) if (Clock::now() >= impedance_end_time) { if (!Send8ImpedanceSwitch(data_port, false)) { std::cerr << "Send 8ch impedance close command failed" << std::endl; + close_all(); return 1; } XYParser_SetImpedanceDetection(parser.get(), 0); @@ -1292,6 +1474,7 @@ int Run8Workflow(const DemoOptions& options) DrainWelch(parser.get()); } + close_all(); return 0; }