Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .changesets/ethernet-diagnostics-forwarding.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
release: minor
summary: Add variable-length TCP orders, diagnostic forwarding, and flush throttling

- `TcpOrderStreamParser` now supports variable-length orders: when an order's `get_size()` returns 0, all remaining stream data is passed to `parse()`, and the post-parse size is used for byte consumption. A `reset_for_receive()` hook is called after consumption so orders can reset state for the next packet.
- `Order` gains a virtual `reset_for_receive()` no-op that subclasses can override.
- `Socket` constructor no longer panics on `ERR_RTE` during `tcp_connect()` — sets `pending_connection_reset` instead, letting the connection retry when the route becomes available.
- `OrderProtocolDiagnosticSink` now accepts a target `OrderProtocol*` via its constructor, so diagnostic records are sent to a single socket instead of broadcasting to all open sockets.
- `DiagnosticTransportOrder` now parses incoming diagnostic orders (IDs 1555, 2555, 3000) and re-publishes them via `Hub::publish_runtime_*()`, enabling forwarding of remote board diagnostics to the control station.
- `Diagnostics::Hub::flush_pending` throttles records to at most one per 100 ms per priority level (urgent and non-urgent tracked separately) to prevent flooding sinks.
- Default TCP keepalive values tightened: 30 ms idle, 20 ms interval, 2 probes (commented out for now since it can't keep connection with the backend).
- `lwipopts.h`: `MEMP_NUM_SYS_TIMEOUT` increased by 10 to avoid pool exhaustion with SNTP and whatever else it may happen, `TCP_TMR_INTERVAL` reduced to 10 ms for faster keepalive response.
- Added `Diagnostics::install_ethernet_sink(OrderProtocol*)` to register the Ethernet sink with a specific target socket.
1 change: 1 addition & 0 deletions Inc/HALAL/Models/Packets/Order.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class Order : public Packet {
virtual void set_callback(void (*callback)(void)) = 0;
virtual void process() = 0;
virtual void parse(OrderProtocol* socket, uint8_t* data) = 0;
virtual void reset_for_receive() {}
void store_ip_order(string& ip) { remote_ip = &ip; }
void parse(uint8_t* data) override { parse(nullptr, data); }
static void process_by_id(uint16_t id) {
Expand Down
4 changes: 4 additions & 0 deletions Inc/HALAL/Services/Communication/Ethernet/LWIP/Ethernet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
#define TCP_KEEPALIVE_TRIES_UNTIL_DISCONNECTION 10
#define TCP_SPACE_BETWEEN_KEEPALIVE_TRIES_MS 100

// #define TCP_INACTIVITY_TIME_UNTIL_KEEPALIVE_MS 30
// #define TCP_KEEPALIVE_TRIES_UNTIL_DISCONNECTION 2
// #define TCP_SPACE_BETWEEN_KEEPALIVE_TRIES_MS 20

class Ethernet {
public:
static bool is_ready;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,26 @@ inline void process(OrderProtocol* protocol, IPV4& remote_ip, vector<uint8_t>& s
}

const size_t order_size = order_it->second->get_size();
if (order_size < sizeof(uint16_t)) {
parsed_bytes += 1;
continue;
}
if (stream_buffer.size() - parsed_bytes < order_size) {
break;
if (order_size > 0) {
if (order_size < sizeof(uint16_t)) {
parsed_bytes += 1;
continue;
}
if (stream_buffer.size() - parsed_bytes < order_size) {
break;
}
}

order_it->second->store_ip_order(remote_ip.string_address);
Order::process_data(protocol, packet_ptr);
parsed_bytes += order_size;

const size_t consumed = order_it->second->get_size();
if (consumed > 0) {
parsed_bytes += consumed;
} else {
parsed_bytes += sizeof(uint16_t);
}
order_it->second->reset_for_receive();
}

if (parsed_bytes > 0) {
Expand Down
11 changes: 11 additions & 0 deletions Inc/HALAL/Services/Diagnostics/Diagnostics.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,10 @@ class Hub {
static size_t history_next_index;
static array<PendingRecord, Config::pending_capacity> pending_records;
static size_t pending_count;
#ifdef STLIB_ETH
static uint64_t last_urgent_flush_us;
static uint64_t last_normal_flush_us;
#endif
};

class Runtime {
Expand All @@ -258,3 +262,10 @@ class Runtime {
};

} // namespace Diagnostics

#ifdef STLIB_ETH
class OrderProtocol;
namespace Diagnostics {
void install_ethernet_sink(OrderProtocol* target);
}
#endif
4 changes: 4 additions & 0 deletions LWIP/Target/lwipopts.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@
#define ETH_RX_BUFFER_SIZE 1536
/*----- Value in opt.h for LWIP_TCP_KEEPALIVE: 0 -----*/
#define LWIP_TCP_KEEPALIVE 1
#define MEMP_NUM_SYS_TIMEOUT (LWIP_NUM_SYS_TIMEOUT_INTERNAL + 10) // + 10 just in case
#define TCP_TMR_INTERVAL 10
#define TCP_FAST_INTERVAL 10
#define TCP_SLOW_INTERVAL 20

#define MEMP_NUM_NETCONN 4
/*----- Value in opt.h for NO_SYS: 0 -----*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,7 @@ Socket::Socket(
tcp_connect(connection_control_block, &remote_ip.address, remote_port, connect_callback);
if (connect_error != ERR_OK && connect_error != ERR_ISCONN) {
connecting_sockets.erase(remote_node);
tcp_abort(connection_control_block);
connection_control_block = nullptr;
PANIC("Cannot connect TCP socket. Error code: %d", connect_error);
return;
pending_connection_reset = true;
}

if (std::find(OrderProtocol::sockets.begin(), OrderProtocol::sockets.end(), this) ==
Expand Down
79 changes: 60 additions & 19 deletions Src/HALAL/Services/Diagnostics/DiagnosticSinks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class UartDiagnosticSink final : public DiagnosticSink {
#ifdef STLIB_ETH
class DiagnosticTransportOrder final : public Order {
public:
static inline OrderProtocol* forward_target = nullptr;
DiagnosticTransportOrder() {
Packet::packets[DIAGNOSTIC_FAULT_ORDER_ID] = this;
Packet::packets[DIAGNOSTIC_WARNING_ORDER_ID] = this;
Expand All @@ -94,15 +95,48 @@ class DiagnosticTransportOrder final : public Order {

void set_callback(void (*callback)(void)) override { this->callback = callback; }

void process() override {
if (callback != nullptr) {
callback();
}
}

void parse(OrderProtocol* socket, uint8_t* data) override {
(void)socket;
(void)data;
if (data == nullptr)
return;

memcpy(&id, data, sizeof(id));

size_t off = sizeof(id) + sizeof(uint8_t);
kind = data[off++];

size_t len = bounded_strnlen(reinterpret_cast<char*>(data + off), sizeof(origin) - 1);
memcpy(origin, data + off, len);
origin[len] = '\0';
off += len + 1;

len = bounded_strnlen(reinterpret_cast<char*>(data + off), sizeof(message) - 1);
memcpy(message, data + off, len);
message[len] = '\0';
off += len + 1;

memcpy(
&counter,
data + off,
sizeof(counter) + sizeof(second) + sizeof(minute) + sizeof(hour) + sizeof(day) +
sizeof(month) + sizeof(year)
);
size = off + sizeof(counter) + sizeof(second) + sizeof(minute) + sizeof(hour) +
sizeof(day) + sizeof(month) + sizeof(year);
}

void process() override {
switch (id) {
case DIAGNOSTIC_FAULT_ORDER_ID:
Hub::publish_runtime_fault(message, false, 0, origin, "");
break;
case DIAGNOSTIC_WARNING_ORDER_ID:
Hub::publish_runtime_warning(message, false, 0, origin, "");
break;
case DIAGNOSTIC_OK_ORDER_ID:
Hub::publish_runtime_info(message, false, 0, origin, "");
break;
}
}

uint8_t* build() override {
Expand All @@ -128,6 +162,8 @@ class DiagnosticTransportOrder final : public Order {

size_t get_size() override { return size; }

void reset_for_receive() { size = 0; }

uint16_t get_id() override { return id; }

void set_pointer(size_t index, void* pointer) override {
Expand Down Expand Up @@ -191,22 +227,25 @@ class DiagnosticTransportOrder final : public Order {

class OrderProtocolDiagnosticSink final : public DiagnosticSink {
public:
explicit OrderProtocolDiagnosticSink(OrderProtocol* target = nullptr) : target_socket(target) {
DiagnosticTransportOrder::forward_target = target;
}

bool publish(const DiagnosticRecord& record) override {
if (target_socket == nullptr) {
return false;
}
char description[Config::formatted_message_capacity + 1]{};
DiagnosticFormatter::describe(record, description, sizeof(description));
transport_order.set_record(record, description);

bool delivered = false;
for (OrderProtocol* socket : OrderProtocol::sockets) {
if (socket == nullptr) {
continue;
}
delivered = socket->send_order(transport_order) || delivered;
}
transport_order.build();
bool delivered = target_socket->send_order(transport_order);
transport_order.reset_for_receive();
return delivered;
}

private:
OrderProtocol* target_socket;
DiagnosticTransportOrder transport_order{};
};
#endif
Expand All @@ -222,11 +261,13 @@ void Runtime::install_default_sinks() {
(void)Hub::emplace_sink<UartDiagnosticSink>();
#endif

#ifdef STLIB_ETH
(void)Hub::emplace_sink<OrderProtocolDiagnosticSink>();
#endif

defaults_installed = true;
}

#ifdef STLIB_ETH
void install_ethernet_sink(OrderProtocol* target) {
(void)Hub::emplace_sink<OrderProtocolDiagnosticSink>(target);
}
#endif

} // namespace Diagnostics
21 changes: 21 additions & 0 deletions Src/HALAL/Services/Diagnostics/DiagnosticsHub.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
#include "HALAL/Services/Diagnostics/Diagnostics.hpp"
#ifdef STLIB_ETH
#include "HALAL/Services/Time/Scheduler.hpp"
#endif

namespace Diagnostics {

Expand All @@ -10,6 +13,10 @@ size_t Hub::history_count = 0;
size_t Hub::history_next_index = 0;
array<Hub::PendingRecord, Config::pending_capacity> Hub::pending_records = {};
size_t Hub::pending_count = 0;
#ifdef STLIB_ETH
uint64_t Hub::last_urgent_flush_us = 0;
uint64_t Hub::last_normal_flush_us = 0;
#endif
bool Runtime::defaults_installed = false;

namespace {
Expand Down Expand Up @@ -297,6 +304,10 @@ void Hub::publish_protection_event(

void Hub::flush_pending(bool urgent_only) {
const uint8_t target_mask = sink_count == 0 ? 0 : static_cast<uint8_t>((1u << sink_count) - 1u);
#ifdef STLIB_ETH
const uint64_t now = Scheduler::get_global_tick();
uint64_t& last_flush = urgent_only ? last_urgent_flush_us : last_normal_flush_us;
#endif

for (size_t record_index = 0; record_index < pending_count;) {
PendingRecord& pending_record = pending_records[record_index];
Expand All @@ -305,13 +316,23 @@ void Hub::flush_pending(bool urgent_only) {
continue;
}

#ifdef STLIB_ETH
if (last_flush != 0 && now - last_flush < 100'000) {
record_index++;
continue;
}
#endif

for (size_t sink_index = 0; sink_index < sink_count; ++sink_index) {
const uint8_t sink_mask = static_cast<uint8_t>(1u << sink_index);
if ((pending_record.delivered_mask & sink_mask) != 0u) {
continue;
}
if (sinks[sink_index] != nullptr && sinks[sink_index]->publish(pending_record.record)) {
pending_record.delivered_mask |= sink_mask;
#ifdef STLIB_ETH
last_flush = now;
#endif
}
}

Expand Down
Loading