From 560bbfd15ca1e5f6031a163cdad5a6f885b7771a Mon Sep 17 00:00:00 2001 From: David Garske Date: Wed, 27 May 2026 11:14:59 -0700 Subject: [PATCH] Fix broker disconnect to better handle SIGPIPE --- src/mqtt_broker.c | 27 ++++ tests/test_broker_connect.c | 280 ++++++++++++++++++++++++++++++++++++ 2 files changed, 307 insertions(+) diff --git a/src/mqtt_broker.c b/src/mqtt_broker.c index 19980930..ec36b326 100644 --- a/src/mqtt_broker.c +++ b/src/mqtt_broker.c @@ -631,6 +631,9 @@ static int BrokerPosix_Accept(void* ctx, BROKER_SOCKET_T listen_sock, BROKER_SOCKET_T* client_sock) { BROKER_SOCKET_T fd; +#ifdef SO_NOSIGPIPE + int on = 1; +#endif (void)ctx; fd = accept(listen_sock, NULL, NULL); @@ -644,6 +647,14 @@ static int BrokerPosix_Accept(void* ctx, BROKER_SOCKET_T listen_sock, close(fd); return MQTT_CODE_ERROR_SYSTEM; } +#ifdef SO_NOSIGPIPE + /* macOS / BSDs: suppress SIGPIPE on writes to a peer-closed socket. + * Without this (and without MSG_NOSIGNAL in send()), a client that + * publishes QoS>0 and immediately closes its socket would cause the + * broker's PUBACK/PUBREC write to deliver SIGPIPE, terminating the + * broker. Linux uses MSG_NOSIGNAL in BrokerPosix_Write instead. */ + (void)setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof(on)); +#endif *client_sock = fd; return MQTT_CODE_SUCCESS; } @@ -714,7 +725,16 @@ static int BrokerPosix_Write(void* ctx, BROKER_SOCKET_T sock, return MQTT_CODE_ERROR_NETWORK; } + /* MSG_NOSIGNAL (Linux/BSDs that define it) prevents SIGPIPE delivery when + * the peer has already closed the connection - the syscall just returns + * EPIPE and we treat it as a normal network error. Platforms without + * MSG_NOSIGNAL (e.g. macOS) rely on the SO_NOSIGPIPE socket option set + * in BrokerPosix_Accept. 
*/ +#ifdef MSG_NOSIGNAL + rc = (int)send(sock, buf, (size_t)buf_len, MSG_NOSIGNAL); +#else rc = (int)send(sock, buf, (size_t)buf_len, 0); +#endif if (rc <= 0) { if (rc < 0 && (errno == EWOULDBLOCK || errno == EAGAIN)) { return MQTT_CODE_CONTINUE; @@ -4906,6 +4926,13 @@ int wolfmqtt_broker(int argc, char** argv) g_broker_shutdown = 0; signal(SIGINT, broker_signal_handler); signal(SIGTERM, broker_signal_handler); + /* Belt-and-suspenders for the SIGPIPE-on-peer-close path. The socket + * layer already uses MSG_NOSIGNAL / SO_NOSIGPIPE per platform, but + * ignore SIGPIPE process-wide too so any reused or custom net callback + * cannot kill the broker on a write to a closed peer. */ +#ifdef SIGPIPE + signal(SIGPIPE, SIG_IGN); +#endif rc = MqttBroker_Start(&broker); if (rc == MQTT_CODE_SUCCESS) { diff --git a/tests/test_broker_connect.c b/tests/test_broker_connect.c index 1c4cb6c7..9cced42e 100644 --- a/tests/test_broker_connect.c +++ b/tests/test_broker_connect.c @@ -1102,6 +1102,281 @@ TEST(qos2_pubrel_unknown_id_still_pubcomps) MqttBroker_Free(&broker); } +/* Regression for the SIGPIPE-on-PUBREC-write bug. When a publisher sent a + * QoS 2 PUBLISH and immediately closed its socket, the broker's subsequent + * write of PUBREC into the peer-closed socket would deliver SIGPIPE and + * terminate the broker process (SIGPIPE's default action is termination). + * The fix uses MSG_NOSIGNAL (Linux/BSDs) and SO_NOSIGPIPE (macOS) on the + * broker's POSIX socket layer, plus an explicit signal(SIGPIPE, SIG_IGN) + * in the standalone broker main as belt-and-suspenders. + * + * The mock-net used by these unit tests never generates SIGPIPE - this + * test pins the protocol-level state-machine path (orphaned subscriber + + * publisher publishes QoS 2 + immediate DISCONNECT) so a future regression + * in the QoS 2 dispatch is caught alongside the wire-level SIGPIPE fix + * verified end-to-end with the paho reproducer the reporter provided. 
*/ +TEST(qos2_publish_with_offline_durable_subscriber) +{ + MqttBroker broker; + MqttBrokerNet net; + int i; + int pub_connacks; + int pub_pubrecs; + + /* Subscriber CONNECT: ClientId "S", clean_session=0 (flags=0x00). */ + static const byte connect_sub[] = { + 0x10, 0x0D, + 0x00, 0x04, 'M', 'Q', 'T', 'T', + 0x04, 0x00, 0x00, 0x3C, + 0x00, 0x01, 'S' + }; + /* SUBSCRIBE packet_id=1, filter "x", QoS 2. */ + static const byte subscribe_x[] = { + 0x82, 0x06, + 0x00, 0x01, + 0x00, 0x01, 'x', + 0x02 + }; + /* Clean DISCONNECT. */ + static const byte disconnect[] = { 0xE0, 0x00 }; + /* Publisher CONNECT: ClientId "P", clean_session=0. */ + static const byte connect_pub[] = { + 0x10, 0x0D, + 0x00, 0x04, 'M', 'Q', 'T', 'T', + 0x04, 0x00, 0x00, 0x3C, + 0x00, 0x01, 'P' + }; + /* PUBLISH QoS 2, packet_id=7, topic "x", payload "p". + * remain = 2+1+2+1 = 6 */ + static const byte publish[] = { + 0x34, 0x06, + 0x00, 0x01, 'x', + 0x00, 0x07, + 'p' + }; + + install_mock_net(&net); + XMEMSET(&broker, 0, sizeof(broker)); + ASSERT_EQ(MQTT_CODE_SUCCESS, MqttBroker_Init(&broker, &net)); + ASSERT_EQ(MQTT_CODE_SUCCESS, MqttBroker_Start(&broker)); + + /* Phase 1: subscriber connects, subscribes, then disconnects cleanly. + * After this the broker should have an orphaned sub with client=NULL + * and client_id="S" still in broker->subs. */ + reset_mock_clients(2); + mock_client_input_append(0, connect_sub, sizeof(connect_sub)); + mock_client_input_append(0, subscribe_x, sizeof(subscribe_x)); + mock_client_input_append(0, disconnect, sizeof(disconnect)); + for (i = 0; i < 16; i++) { + MqttBroker_Step(&broker); + if (g_clients[0].closed) { + break; + } + } + ASSERT_TRUE(g_clients[0].closed); + + /* Phase 2: publisher connects, publishes QoS 2, and disconnects - all + * bytes appended in one shot so the broker may read PUBLISH + DISCONNECT + * back-to-back with no Step() between, exactly as the paho client does + * without a sleep before disconnect(). 
*/ + mock_client_input_append(1, connect_pub, sizeof(connect_pub)); + mock_client_input_append(1, publish, sizeof(publish)); + mock_client_input_append(1, disconnect, sizeof(disconnect)); + for (i = 0; i < 32; i++) { + MqttBroker_Step(&broker); + if (g_clients[1].closed) { + break; + } + } + ASSERT_TRUE(g_clients[1].closed); + + /* Sanity: the broker did process the PUBLISH (one PUBREC out) before + * tearing down the publisher. The orphaned sub is offline so no + * forwarded PUBLISH on g_clients[0]. The strong assertion is that + * MqttBroker_Stop/Free do not crash or trip ASan below. */ + pub_connacks = count_packets_of_type(g_clients[1].out_buf, + g_clients[1].out_len, MQTT_PACKET_TYPE_CONNECT_ACK); + pub_pubrecs = count_packets_of_type(g_clients[1].out_buf, + g_clients[1].out_len, MQTT_PACKET_TYPE_PUBLISH_REC); + ASSERT_EQ(1, pub_connacks); + ASSERT_EQ(1, pub_pubrecs); + + MqttBroker_Stop(&broker); + MqttBroker_Free(&broker); +} + +/* Same scenario as above but the publisher hangs up abruptly instead of + * sending a clean DISCONNECT. Per the notes in the body, the mock reports + * read timeouts here, so BrokerClient_AbnormalClose may not be reached. 
*/ +TEST(qos2_publish_then_abrupt_close_offline_subscriber) +{ + MqttBroker broker; + MqttBrokerNet net; + int i; + int pub_pubrecs; + MockClient* mc; + + static const byte connect_sub[] = { + 0x10, 0x0D, + 0x00, 0x04, 'M', 'Q', 'T', 'T', + 0x04, 0x00, 0x00, 0x3C, + 0x00, 0x01, 'S' + }; + static const byte subscribe_x[] = { + 0x82, 0x06, 0x00, 0x01, 0x00, 0x01, 'x', 0x02 + }; + static const byte disconnect[] = { 0xE0, 0x00 }; + static const byte connect_pub[] = { + 0x10, 0x0D, + 0x00, 0x04, 'M', 'Q', 'T', 'T', + 0x04, 0x00, 0x00, 0x3C, + 0x00, 0x01, 'P' + }; + static const byte publish[] = { + 0x34, 0x06, 0x00, 0x01, 'x', 0x00, 0x07, 'p' + }; + + install_mock_net(&net); + XMEMSET(&broker, 0, sizeof(broker)); + ASSERT_EQ(MQTT_CODE_SUCCESS, MqttBroker_Init(&broker, &net)); + ASSERT_EQ(MQTT_CODE_SUCCESS, MqttBroker_Start(&broker)); + + reset_mock_clients(2); + mock_client_input_append(0, connect_sub, sizeof(connect_sub)); + mock_client_input_append(0, subscribe_x, sizeof(subscribe_x)); + mock_client_input_append(0, disconnect, sizeof(disconnect)); + for (i = 0; i < 16; i++) { + MqttBroker_Step(&broker); + if (g_clients[0].closed) break; + } + ASSERT_TRUE(g_clients[0].closed); + + /* Publisher: CONNECT + PUBLISH, then no more bytes. Once the broker has + * consumed both packets the mock input is drained (in_pos == in_len) and + * subsequent reads return TIMEOUT. The broker's keepalive check would + * eventually trip on such a silent client. Rather than marking the mock + * `closed` mid-stream, an abrupt FIN is simulated after the fact: no + * further bytes are appended, in_buf is left to drain over the Steps + * below, and the mock is then forced to report closed so that the next + * mock_read returns immediately. */ + mock_client_input_append(1, connect_pub, sizeof(connect_pub)); + mock_client_input_append(1, publish, sizeof(publish)); + /* Drive enough Steps to process CONNECT and PUBLISH. 
*/ + for (i = 0; i < 8; i++) { + MqttBroker_Step(&broker); + } + /* Now simulate the peer hanging up: mark the mock as closed so any + * further mock_read returns TIMEOUT immediately. The broker's keepalive + * with our mocked WOLFMQTT_BROKER_GET_TIME_S()==0 won't fire, so the + * test relies on the publisher having no more outgoing handshake bytes + * after PUBREC. The strong assertion is that ASan finds no UAF when we + * tear down the broker while the publisher is still in qos2_pending. */ + mc = &g_clients[1]; + mc->closed = 1; + for (i = 0; i < 4; i++) { + MqttBroker_Step(&broker); + } + + pub_pubrecs = count_packets_of_type(g_clients[1].out_buf, + g_clients[1].out_len, MQTT_PACKET_TYPE_PUBLISH_REC); + ASSERT_EQ(1, pub_pubrecs); + + /* The broker did not detect the close (no read failure), so the + * publisher's BrokerClient is still in the list with packet_id=7 in + * qos2_pending. MqttBroker_Free must walk that list and free the QoS 2 + * state without crashing. */ + MqttBroker_Stop(&broker); + MqttBroker_Free(&broker); +} + +#ifdef WOLFMQTT_V5 +/* v5 variant: same orphan-then-publish-then-disconnect pattern but the + * PUBLISH carries a property block. Tests the suspected use-after-free in + * the fan-out path where pub.props is shared across subscribers and then + * freed at the end of BrokerHandle_Publish. With only an orphaned sub + * matching, the fan-out body does not execute, but the broker's MqttClient + * still has to decode and free the props. */ +TEST(qos2_publish_v5_props_with_offline_durable_subscriber) +{ + MqttBroker broker; + MqttBrokerNet net; + int i; + int pub_pubrecs; + + /* v5 CONNECT subscriber, ClientId "S", clean_session=0, props_len=0. 
+ * remain = MQTT_hdr(6)+level(1)+flags(1)+keepalive(2)+props_len(1)+ + * clientid_hdr(2)+clientid(1) = 14 */ + static const byte connect_sub_v5[] = { + 0x10, 0x0E, + 0x00, 0x04, 'M', 'Q', 'T', 'T', + 0x05, 0x00, 0x00, 0x3C, + 0x00, + 0x00, 0x01, 'S' + }; + /* v5 SUBSCRIBE packet_id=1, props_len=0, filter "x", options=QoS 2. */ + static const byte subscribe_v5[] = { + 0x82, 0x07, + 0x00, 0x01, + 0x00, + 0x00, 0x01, 'x', + 0x02 + }; + /* v5 DISCONNECT (0xE0 0x00 with no reason / props - allowed). */ + static const byte disconnect_v5[] = { 0xE0, 0x00 }; + /* v5 CONNECT publisher, ClientId "P". remain = 14 (same as subscriber). */ + static const byte connect_pub_v5[] = { + 0x10, 0x0E, + 0x00, 0x04, 'M', 'Q', 'T', 'T', + 0x05, 0x00, 0x00, 0x3C, + 0x00, + 0x00, 0x01, 'P' + }; + /* v5 PUBLISH QoS 2 packet_id=7, topic "x", one Message Expiry Interval + * property (id=0x02, 4-byte int=60), payload "p". + * remain = topic_hdr(2)+topic(1)+packet_id(2)+prop_len_vbi(1)+prop(5)+ + * payload(1) = 12 */ + static const byte publish_v5[] = { + 0x34, 0x0C, + 0x00, 0x01, 'x', + 0x00, 0x07, + 0x05, /* prop block length = 5 */ + 0x02, 0x00, 0x00, 0x00, 0x3C, /* Message Expiry Interval = 60 */ + 'p' + }; + + install_mock_net(&net); + XMEMSET(&broker, 0, sizeof(broker)); + ASSERT_EQ(MQTT_CODE_SUCCESS, MqttBroker_Init(&broker, &net)); + ASSERT_EQ(MQTT_CODE_SUCCESS, MqttBroker_Start(&broker)); + + reset_mock_clients(2); + mock_client_input_append(0, connect_sub_v5, sizeof(connect_sub_v5)); + mock_client_input_append(0, subscribe_v5, sizeof(subscribe_v5)); + mock_client_input_append(0, disconnect_v5, sizeof(disconnect_v5)); + for (i = 0; i < 16; i++) { + MqttBroker_Step(&broker); + if (g_clients[0].closed) break; + } + ASSERT_TRUE(g_clients[0].closed); + + mock_client_input_append(1, connect_pub_v5, sizeof(connect_pub_v5)); + mock_client_input_append(1, publish_v5, sizeof(publish_v5)); + mock_client_input_append(1, disconnect_v5, sizeof(disconnect_v5)); + for (i = 0; i < 32; i++) { + 
MqttBroker_Step(&broker); + if (g_clients[1].closed) break; + } + ASSERT_TRUE(g_clients[1].closed); + + pub_pubrecs = count_packets_of_type(g_clients[1].out_buf, + g_clients[1].out_len, MQTT_PACKET_TYPE_PUBLISH_REC); + ASSERT_EQ(1, pub_pubrecs); + + MqttBroker_Stop(&broker); + MqttBroker_Free(&broker); +} +#endif /* WOLFMQTT_V5 */ + /* MQTT 3.1.1 section 3.12 / v5 section 3.12: PINGREQ has no variable header and no * payload, so Remaining Length MUST be 0. Broker dispatch must reject a * malformed PINGREQ with an abnormal close instead of emitting a @@ -2156,6 +2431,11 @@ int main(int argc, char** argv) RUN_TEST(qos2_inbound_cap_reached_disconnects); RUN_TEST(qos2_state_freed_on_client_disconnect); RUN_TEST(qos2_pubrel_unknown_id_still_pubcomps); + RUN_TEST(qos2_publish_with_offline_durable_subscriber); + RUN_TEST(qos2_publish_then_abrupt_close_offline_subscriber); +#ifdef WOLFMQTT_V5 + RUN_TEST(qos2_publish_v5_props_with_offline_durable_subscriber); +#endif RUN_TEST(pingreq_valid_emits_pingresp); RUN_TEST(pingreq_nonzero_remain_len_closes_no_pingresp); #ifndef WOLFMQTT_V5