diff --git a/.changesets/ethernet-diagnostics-forwarding.md b/.changesets/ethernet-diagnostics-forwarding.md new file mode 100644 index 000000000..64c5fea6b --- /dev/null +++ b/.changesets/ethernet-diagnostics-forwarding.md @@ -0,0 +1,12 @@ +release: minor +summary: Add variable-length TCP orders, diagnostic forwarding, and flush throttling + +- `TcpOrderStreamParser` now supports variable-length orders: when an order's `get_size()` returns 0, all remaining stream data is passed to `parse()`, and the post-parse size is used for byte consumption. A `reset_for_receive()` hook is called after consumption so orders can reset state for the next packet. +- `Order` gains a virtual `reset_for_receive()` no-op that subclasses can override. +- `Socket` constructor no longer panics on `ERR_RTE` during `tcp_connect()` — sets `pending_connection_reset` instead, letting the connection retry when the route becomes available. +- `OrderProtocolDiagnosticSink` now accepts a target `OrderProtocol*` via its constructor, so diagnostic records are sent to a single socket instead of broadcasting to all open sockets. +- `DiagnosticTransportOrder` now parses incoming diagnostic orders (IDs 1555, 2555, 3000) and re-publishes them via `Hub::publish_runtime_*()`, enabling forwarding of remote board diagnostics to the control station. +- `Diagnostics::Hub::flush_pending` throttles records to at most one per 100 ms per priority level (urgent and non-urgent tracked separately) to prevent flooding sinks. +- Default TCP keepalive values tightened: 30 ms idle, 20 ms interval, 2 probes (commented out for now since it can't keep connection with the backend). +- `lwipopts.h`: `MEMP_NUM_SYS_TIMEOUT` increased by 10 to avoid pool exhaustion with SNTP and whatever else it may happen, `TCP_TMR_INTERVAL` reduced to 10 ms for faster keepalive response. +- Added `Diagnostics::install_ethernet_sink(OrderProtocol*)` to register the Ethernet sink with a specific target socket. diff --git a/Inc/HALAL/Models/Packets/Order.hpp b/Inc/HALAL/Models/Packets/Order.hpp index 69eeed0ea..f09f95694 100644 --- a/Inc/HALAL/Models/Packets/Order.hpp +++ b/Inc/HALAL/Models/Packets/Order.hpp @@ -16,6 +16,7 @@ class Order : public Packet { virtual void set_callback(void (*callback)(void)) = 0; virtual void process() = 0; virtual void parse(OrderProtocol* socket, uint8_t* data) = 0; + virtual void reset_for_receive() {} void store_ip_order(string& ip) { remote_ip = &ip; } void parse(uint8_t* data) override { parse(nullptr, data); } static void process_by_id(uint16_t id) { diff --git a/Inc/HALAL/Services/Communication/Ethernet/LWIP/Ethernet.hpp b/Inc/HALAL/Services/Communication/Ethernet/LWIP/Ethernet.hpp index f44d2ef3a..8372704ab 100644 --- a/Inc/HALAL/Services/Communication/Ethernet/LWIP/Ethernet.hpp +++ b/Inc/HALAL/Services/Communication/Ethernet/LWIP/Ethernet.hpp @@ -23,6 +23,10 @@ #define TCP_KEEPALIVE_TRIES_UNTIL_DISCONNECTION 10 #define TCP_SPACE_BETWEEN_KEEPALIVE_TRIES_MS 100 +// #define TCP_INACTIVITY_TIME_UNTIL_KEEPALIVE_MS 30 +// #define TCP_KEEPALIVE_TRIES_UNTIL_DISCONNECTION 2 +// #define TCP_SPACE_BETWEEN_KEEPALIVE_TRIES_MS 20 + class Ethernet { public: static bool is_ready; diff --git a/Inc/HALAL/Services/Communication/Ethernet/LWIP/TCP/TcpOrderStreamParser.hpp b/Inc/HALAL/Services/Communication/Ethernet/LWIP/TCP/TcpOrderStreamParser.hpp index bb5e08089..699da01d3 100644 --- a/Inc/HALAL/Services/Communication/Ethernet/LWIP/TCP/TcpOrderStreamParser.hpp +++ b/Inc/HALAL/Services/Communication/Ethernet/LWIP/TCP/TcpOrderStreamParser.hpp @@ -25,17 +25,26 @@ inline void process(OrderProtocol* protocol, IPV4& remote_ip, vector& s } const size_t order_size = order_it->second->get_size(); - if (order_size < sizeof(uint16_t)) { - parsed_bytes += 1; - continue; - } - if (stream_buffer.size() - parsed_bytes < order_size) { - break; + if (order_size > 0) { + if (order_size < sizeof(uint16_t)) { + parsed_bytes += 1; + continue; + } + if (stream_buffer.size() - parsed_bytes < order_size) { + break; + } } order_it->second->store_ip_order(remote_ip.string_address); Order::process_data(protocol, packet_ptr); - parsed_bytes += order_size; + + const size_t consumed = order_it->second->get_size(); + if (consumed > 0) { + parsed_bytes += consumed; + } else { + parsed_bytes += sizeof(uint16_t); + } + order_it->second->reset_for_receive(); } if (parsed_bytes > 0) { diff --git a/Inc/HALAL/Services/Diagnostics/Diagnostics.hpp b/Inc/HALAL/Services/Diagnostics/Diagnostics.hpp index 28d5c5dfe..1be5b7da3 100644 --- a/Inc/HALAL/Services/Diagnostics/Diagnostics.hpp +++ b/Inc/HALAL/Services/Diagnostics/Diagnostics.hpp @@ -245,6 +245,10 @@ class Hub { static size_t history_next_index; static array pending_records; static size_t pending_count; +#ifdef STLIB_ETH + static uint64_t last_urgent_flush_us; + static uint64_t last_normal_flush_us; +#endif }; class Runtime { @@ -258,3 +262,10 @@ class Runtime { }; } // namespace Diagnostics + +#ifdef STLIB_ETH +class OrderProtocol; +namespace Diagnostics { +void install_ethernet_sink(OrderProtocol* target); +} +#endif diff --git a/LWIP/Target/lwipopts.h b/LWIP/Target/lwipopts.h index e29c56086..e13c0541c 100644 --- a/LWIP/Target/lwipopts.h +++ b/LWIP/Target/lwipopts.h @@ -51,6 +51,10 @@ #define ETH_RX_BUFFER_SIZE 1536 /*----- Value in opt.h for LWIP_TCP_KEEPALIVE: 0 -----*/ #define LWIP_TCP_KEEPALIVE 1 +#define MEMP_NUM_SYS_TIMEOUT (LWIP_NUM_SYS_TIMEOUT_INTERNAL + 10) // + 10 just in case +#define TCP_TMR_INTERVAL 10 +#define TCP_FAST_INTERVAL 10 +#define TCP_SLOW_INTERVAL 20 #define MEMP_NUM_NETCONN 4 /*----- Value in opt.h for NO_SYS: 0 -----*/ diff --git a/Src/HALAL/Services/Communication/Ethernet/LWIP/TCP/Socket.cpp b/Src/HALAL/Services/Communication/Ethernet/LWIP/TCP/Socket.cpp index 4afcc3b97..98c7d3a4a 100644 --- a/Src/HALAL/Services/Communication/Ethernet/LWIP/TCP/Socket.cpp +++ b/Src/HALAL/Services/Communication/Ethernet/LWIP/TCP/Socket.cpp @@ -156,10 +156,7 @@ Socket::Socket( tcp_connect(connection_control_block, &remote_ip.address, remote_port, connect_callback); if (connect_error != ERR_OK && connect_error != ERR_ISCONN) { connecting_sockets.erase(remote_node); - tcp_abort(connection_control_block); - connection_control_block = nullptr; - PANIC("Cannot connect TCP socket. Error code: %d", connect_error); - return; + pending_connection_reset = true; } if (std::find(OrderProtocol::sockets.begin(), OrderProtocol::sockets.end(), this) == diff --git a/Src/HALAL/Services/Diagnostics/DiagnosticSinks.cpp b/Src/HALAL/Services/Diagnostics/DiagnosticSinks.cpp index 0371b6618..73f224c52 100644 --- a/Src/HALAL/Services/Diagnostics/DiagnosticSinks.cpp +++ b/Src/HALAL/Services/Diagnostics/DiagnosticSinks.cpp @@ -68,6 +68,7 @@ class UartDiagnosticSink final : public DiagnosticSink { #ifdef STLIB_ETH class DiagnosticTransportOrder final : public Order { public: + static inline OrderProtocol* forward_target = nullptr; DiagnosticTransportOrder() { Packet::packets[DIAGNOSTIC_FAULT_ORDER_ID] = this; Packet::packets[DIAGNOSTIC_WARNING_ORDER_ID] = this; @@ -94,15 +95,48 @@ class DiagnosticTransportOrder final : public Order { void set_callback(void (*callback)(void)) override { this->callback = callback; } - void process() override { - if (callback != nullptr) { - callback(); - } - } - void parse(OrderProtocol* socket, uint8_t* data) override { (void)socket; - (void)data; + if (data == nullptr) + return; + + memcpy(&id, data, sizeof(id)); + + size_t off = sizeof(id) + sizeof(uint8_t); + kind = data[off++]; + + size_t len = bounded_strnlen(reinterpret_cast(data + off), sizeof(origin) - 1); + memcpy(origin, data + off, len); + origin[len] = '\0'; + off += len + 1; + + len = bounded_strnlen(reinterpret_cast(data + off), sizeof(message) - 1); + memcpy(message, data + off, len); + message[len] = '\0'; + off += len + 1; + + memcpy( + &counter, + data + off, + sizeof(counter) + sizeof(second) + sizeof(minute) + sizeof(hour) + sizeof(day) + + sizeof(month) + sizeof(year) + ); + size = off + sizeof(counter) + sizeof(second) + sizeof(minute) + sizeof(hour) + + sizeof(day) + sizeof(month) + sizeof(year); + } + + void process() override { + switch (id) { + case DIAGNOSTIC_FAULT_ORDER_ID: + Hub::publish_runtime_fault(message, false, 0, origin, ""); + break; + case DIAGNOSTIC_WARNING_ORDER_ID: + Hub::publish_runtime_warning(message, false, 0, origin, ""); + break; + case DIAGNOSTIC_OK_ORDER_ID: + Hub::publish_runtime_info(message, false, 0, origin, ""); + break; + } } uint8_t* build() override { @@ -128,6 +162,8 @@ class DiagnosticTransportOrder final : public Order { size_t get_size() override { return size; } + void reset_for_receive() { size = 0; } + uint16_t get_id() override { return id; } void set_pointer(size_t index, void* pointer) override { @@ -191,22 +227,25 @@ class DiagnosticTransportOrder final : public Order { class OrderProtocolDiagnosticSink final : public DiagnosticSink { public: + explicit OrderProtocolDiagnosticSink(OrderProtocol* target = nullptr) : target_socket(target) { + DiagnosticTransportOrder::forward_target = target; + } + bool publish(const DiagnosticRecord& record) override { + if (target_socket == nullptr) { + return false; + } char description[Config::formatted_message_capacity + 1]{}; DiagnosticFormatter::describe(record, description, sizeof(description)); transport_order.set_record(record, description); - - bool delivered = false; - for (OrderProtocol* socket : OrderProtocol::sockets) { - if (socket == nullptr) { - continue; - } - delivered = socket->send_order(transport_order) || delivered; - } + transport_order.build(); + bool delivered = target_socket->send_order(transport_order); + transport_order.reset_for_receive(); return delivered; } private: + OrderProtocol* target_socket; DiagnosticTransportOrder transport_order{}; }; #endif @@ -222,11 +261,13 @@ void Runtime::install_default_sinks() { (void)Hub::emplace_sink(); #endif -#ifdef STLIB_ETH - (void)Hub::emplace_sink(); -#endif - defaults_installed = true; } +#ifdef STLIB_ETH +void install_ethernet_sink(OrderProtocol* target) { + (void)Hub::emplace_sink(target); +} +#endif + } // namespace Diagnostics diff --git a/Src/HALAL/Services/Diagnostics/DiagnosticsHub.cpp b/Src/HALAL/Services/Diagnostics/DiagnosticsHub.cpp index 9bad45779..0553297f5 100644 --- a/Src/HALAL/Services/Diagnostics/DiagnosticsHub.cpp +++ b/Src/HALAL/Services/Diagnostics/DiagnosticsHub.cpp @@ -1,4 +1,7 @@ #include "HALAL/Services/Diagnostics/Diagnostics.hpp" +#ifdef STLIB_ETH +#include "HALAL/Services/Time/Scheduler.hpp" +#endif namespace Diagnostics { @@ -10,6 +13,10 @@ size_t Hub::history_count = 0; size_t Hub::history_next_index = 0; array Hub::pending_records = {}; size_t Hub::pending_count = 0; +#ifdef STLIB_ETH +uint64_t Hub::last_urgent_flush_us = 0; +uint64_t Hub::last_normal_flush_us = 0; +#endif bool Runtime::defaults_installed = false; namespace { @@ -297,6 +304,10 @@ void Hub::publish_protection_event( void Hub::flush_pending(bool urgent_only) { const uint8_t target_mask = sink_count == 0 ? 0 : static_cast((1u << sink_count) - 1u); +#ifdef STLIB_ETH + const uint64_t now = Scheduler::get_global_tick(); + uint64_t& last_flush = urgent_only ? last_urgent_flush_us : last_normal_flush_us; +#endif for (size_t record_index = 0; record_index < pending_count;) { PendingRecord& pending_record = pending_records[record_index]; @@ -305,6 +316,13 @@ void Hub::flush_pending(bool urgent_only) { continue; } +#ifdef STLIB_ETH + if (last_flush != 0 && now - last_flush < 100'000) { + record_index++; + continue; + } +#endif + for (size_t sink_index = 0; sink_index < sink_count; ++sink_index) { const uint8_t sink_mask = static_cast(1u << sink_index); if ((pending_record.delivered_mask & sink_mask) != 0u) { @@ -312,6 +330,9 @@ void Hub::flush_pending(bool urgent_only) { } if (sinks[sink_index] != nullptr && sinks[sink_index]->publish(pending_record.record)) { pending_record.delivered_mask |= sink_mask; +#ifdef STLIB_ETH + last_flush = now; +#endif } }