Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions src/mqtt_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,9 @@ static int BrokerPosix_Accept(void* ctx, BROKER_SOCKET_T listen_sock,
BROKER_SOCKET_T* client_sock)
{
BROKER_SOCKET_T fd;
#ifdef SO_NOSIGPIPE
int on = 1;
#endif
(void)ctx;

fd = accept(listen_sock, NULL, NULL);
Expand All @@ -644,6 +647,14 @@ static int BrokerPosix_Accept(void* ctx, BROKER_SOCKET_T listen_sock,
close(fd);
return MQTT_CODE_ERROR_SYSTEM;
}
#ifdef SO_NOSIGPIPE
/* macOS / BSDs: suppress SIGPIPE on writes to a peer-closed socket.
* Without this (and without MSG_NOSIGNAL in send()), a client that
* publishes QoS>0 and immediately closes its socket would cause the
* broker's PUBACK/PUBREC write to deliver SIGPIPE, terminating the
* broker. Linux uses MSG_NOSIGNAL in BrokerPosix_Write instead. */
(void)setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof(on));
#endif
*client_sock = fd;
return MQTT_CODE_SUCCESS;
}
Expand Down Expand Up @@ -714,7 +725,16 @@ static int BrokerPosix_Write(void* ctx, BROKER_SOCKET_T sock,
return MQTT_CODE_ERROR_NETWORK;
}

/* MSG_NOSIGNAL (Linux/BSDs that define it) prevents SIGPIPE delivery when
* the peer has already closed the connection - the syscall just returns
* EPIPE and we treat it as a normal network error. Platforms without
* MSG_NOSIGNAL (e.g. macOS) rely on the SO_NOSIGPIPE socket option set
* in BrokerPosix_Accept. */
#ifdef MSG_NOSIGNAL
rc = (int)send(sock, buf, (size_t)buf_len, MSG_NOSIGNAL);
#else
rc = (int)send(sock, buf, (size_t)buf_len, 0);
#endif
if (rc <= 0) {
if (rc < 0 && (errno == EWOULDBLOCK || errno == EAGAIN)) {
return MQTT_CODE_CONTINUE;
Expand Down Expand Up @@ -4906,6 +4926,13 @@ int wolfmqtt_broker(int argc, char** argv)
g_broker_shutdown = 0;
signal(SIGINT, broker_signal_handler);
signal(SIGTERM, broker_signal_handler);
/* Belt-and-suspenders for the SIGPIPE-on-peer-close path. The socket
* layer already uses MSG_NOSIGNAL / SO_NOSIGPIPE per platform, but
* ignore SIGPIPE process-wide too so any reused or custom net callback
* cannot kill the broker on a write to a closed peer. */
#ifdef SIGPIPE
signal(SIGPIPE, SIG_IGN);
#endif

rc = MqttBroker_Start(&broker);
if (rc == MQTT_CODE_SUCCESS) {
Expand Down
280 changes: 280 additions & 0 deletions tests/test_broker_connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -1102,6 +1102,281 @@ TEST(qos2_pubrel_unknown_id_still_pubcomps)
MqttBroker_Free(&broker);
}

/* Regression for the SIGPIPE-on-PUBREC-write bug. When a publisher sent a
* QoS 2 PUBLISH and immediately closed its socket, the broker's subsequent
* write of PUBREC into the peer-closed socket would deliver SIGPIPE and
* terminate the broker process (default SIGPIPE disposition is SIGTERM).
* The fix uses MSG_NOSIGNAL (Linux/BSDs) and SO_NOSIGPIPE (macOS) on the
* broker's POSIX socket layer, plus an explicit signal(SIGPIPE, SIG_IGN)
* in the standalone broker main as belt-and-suspenders.
*
* The mock-net used by these unit tests never generates SIGPIPE - this
* test pins the protocol-level state-machine path (orphaned subscriber +
* publisher publishes QoS 2 + immediate DISCONNECT) so a future regression
* in the QoS 2 dispatch is caught alongside the wire-level SIGPIPE fix
* verified end-to-end with the paho reproducer the reporter provided. */
TEST(qos2_publish_with_offline_durable_subscriber)
{
MqttBroker broker;
MqttBrokerNet net;
int i;
int pub_connacks;
int pub_pubrecs;

/* Subscriber CONNECT: ClientId "S", clean_session=0 (flags=0x00). */
static const byte connect_sub[] = {
0x10, 0x0D,
0x00, 0x04, 'M', 'Q', 'T', 'T',
0x04, 0x00, 0x00, 0x3C,
0x00, 0x01, 'S'
};
/* SUBSCRIBE packet_id=1, filter "x", QoS 2. */
static const byte subscribe_x[] = {
0x82, 0x06,
0x00, 0x01,
0x00, 0x01, 'x',
0x02
};
/* Clean DISCONNECT. */
static const byte disconnect[] = { 0xE0, 0x00 };
/* Publisher CONNECT: ClientId "P", clean_session=0. */
static const byte connect_pub[] = {
0x10, 0x0D,
0x00, 0x04, 'M', 'Q', 'T', 'T',
0x04, 0x00, 0x00, 0x3C,
0x00, 0x01, 'P'
};
/* PUBLISH QoS 2, packet_id=7, topic "x", payload "p".
* remain = 2+1+2+1 = 6 */
static const byte publish[] = {
0x34, 0x06,
0x00, 0x01, 'x',
0x00, 0x07,
'p'
};

install_mock_net(&net);
XMEMSET(&broker, 0, sizeof(broker));
ASSERT_EQ(MQTT_CODE_SUCCESS, MqttBroker_Init(&broker, &net));
ASSERT_EQ(MQTT_CODE_SUCCESS, MqttBroker_Start(&broker));

/* Phase 1: subscriber connects, subscribes, then disconnects cleanly.
* After this the broker should have an orphaned sub with client=NULL
* and client_id="S" still in broker->subs. */
reset_mock_clients(2);
mock_client_input_append(0, connect_sub, sizeof(connect_sub));
mock_client_input_append(0, subscribe_x, sizeof(subscribe_x));
mock_client_input_append(0, disconnect, sizeof(disconnect));
for (i = 0; i < 16; i++) {
MqttBroker_Step(&broker);
if (g_clients[0].closed) {
break;
}
}
ASSERT_TRUE(g_clients[0].closed);

/* Phase 2: publisher connects, publishes QoS 2, and disconnects - all
* bytes appended in one shot so the broker may read PUBLISH + DISCONNECT
* back-to-back with no Step() between, exactly as the paho client does
* without a sleep before disconnect(). */
mock_client_input_append(1, connect_pub, sizeof(connect_pub));
mock_client_input_append(1, publish, sizeof(publish));
mock_client_input_append(1, disconnect, sizeof(disconnect));
for (i = 0; i < 32; i++) {
MqttBroker_Step(&broker);
if (g_clients[1].closed) {
break;
}
}
ASSERT_TRUE(g_clients[1].closed);

/* Sanity: the broker did process the PUBLISH (one PUBREC out) before
* tearing down the publisher. The orphaned sub is offline so no
* forwarded PUBLISH on g_clients[0]. The strong assertion is that
* MqttBroker_Stop/Free do not crash or trip ASan below. */
pub_connacks = count_packets_of_type(g_clients[1].out_buf,
g_clients[1].out_len, MQTT_PACKET_TYPE_CONNECT_ACK);
pub_pubrecs = count_packets_of_type(g_clients[1].out_buf,
g_clients[1].out_len, MQTT_PACKET_TYPE_PUBLISH_REC);
ASSERT_EQ(1, pub_connacks);
ASSERT_EQ(1, pub_pubrecs);

MqttBroker_Stop(&broker);
MqttBroker_Free(&broker);
}

/* Same crash scenario as above but the publisher's TCP goes away abruptly
* (read returns network error) instead of sending a clean DISCONNECT.
* Exercises the BrokerClient_AbnormalClose branch. */
TEST(qos2_publish_then_abrupt_close_offline_subscriber)
{
MqttBroker broker;
MqttBrokerNet net;
int i;
int pub_pubrecs;
MockClient* mc;

static const byte connect_sub[] = {
0x10, 0x0D,
0x00, 0x04, 'M', 'Q', 'T', 'T',
0x04, 0x00, 0x00, 0x3C,
0x00, 0x01, 'S'
};
static const byte subscribe_x[] = {
0x82, 0x06, 0x00, 0x01, 0x00, 0x01, 'x', 0x02
};
static const byte disconnect[] = { 0xE0, 0x00 };
static const byte connect_pub[] = {
0x10, 0x0D,
0x00, 0x04, 'M', 'Q', 'T', 'T',
0x04, 0x00, 0x00, 0x3C,
0x00, 0x01, 'P'
};
static const byte publish[] = {
0x34, 0x06, 0x00, 0x01, 'x', 0x00, 0x07, 'p'
};

install_mock_net(&net);
XMEMSET(&broker, 0, sizeof(broker));
ASSERT_EQ(MQTT_CODE_SUCCESS, MqttBroker_Init(&broker, &net));
ASSERT_EQ(MQTT_CODE_SUCCESS, MqttBroker_Start(&broker));

reset_mock_clients(2);
mock_client_input_append(0, connect_sub, sizeof(connect_sub));
mock_client_input_append(0, subscribe_x, sizeof(subscribe_x));
mock_client_input_append(0, disconnect, sizeof(disconnect));
for (i = 0; i < 16; i++) {
MqttBroker_Step(&broker);
if (g_clients[0].closed) break;
}
ASSERT_TRUE(g_clients[0].closed);

/* Publisher: CONNECT + PUBLISH, then no more bytes available. Drain the
* input by setting in_pos = in_len after the PUBLISH; subsequent reads
* return TIMEOUT, which the broker's keepalive check would eventually
* trip - but more importantly, marking the mock as `closed` mid-stream
* causes the read to return TIMEOUT and then the next call to fail.
* Instead we simulate an abrupt FIN by appending zero further bytes and
* letting the in_buf drain, then forcing the mock to report closed so
* the next mock_read returns immediately. */
mock_client_input_append(1, connect_pub, sizeof(connect_pub));
mock_client_input_append(1, publish, sizeof(publish));
/* Drive enough Steps to process CONNECT and PUBLISH. */
for (i = 0; i < 8; i++) {
MqttBroker_Step(&broker);
}
/* Now simulate the peer hanging up: mark the mock as closed so any
* further mock_read returns TIMEOUT immediately. The broker's keepalive
* with our mocked WOLFMQTT_BROKER_GET_TIME_S()==0 won't fire, so the
* test relies on the publisher having no more outgoing handshake bytes
* after PUBREC. The strong assertion is that ASan finds no UAF when we
* tear down the broker while the publisher is still in qos2_pending. */
mc = &g_clients[1];
mc->closed = 1;
for (i = 0; i < 4; i++) {
MqttBroker_Step(&broker);
}

pub_pubrecs = count_packets_of_type(g_clients[1].out_buf,
g_clients[1].out_len, MQTT_PACKET_TYPE_PUBLISH_REC);
ASSERT_EQ(1, pub_pubrecs);

/* The broker did not detect the close (no read failure), so the
* publisher's BrokerClient is still in the list with packet_id=7 in
* qos2_pending. MqttBroker_Free must walk that list and free the QoS 2
* state without crashing. */
MqttBroker_Stop(&broker);
MqttBroker_Free(&broker);
}

#ifdef WOLFMQTT_V5
/* v5 variant: same orphan-then-publish-then-disconnect pattern but the
* PUBLISH carries a property block. Tests the suspected use-after-free in
* the fan-out path where pub.props is shared across subscribers and then
* freed at the end of BrokerHandle_Publish. With only an orphaned sub
* matching, the fan-out body does not execute, but the broker's MqttClient
* still has to decode and free the props. */
TEST(qos2_publish_v5_props_with_offline_durable_subscriber)
{
MqttBroker broker;
MqttBrokerNet net;
int i;
int pub_pubrecs;

/* v5 CONNECT subscriber, ClientId "S", clean_session=0, props_len=0.
* remain = MQTT_hdr(6)+level(1)+flags(1)+keepalive(2)+props_len(1)+
* clientid_hdr(2)+clientid(1) = 14 */
static const byte connect_sub_v5[] = {
0x10, 0x0E,
0x00, 0x04, 'M', 'Q', 'T', 'T',
0x05, 0x00, 0x00, 0x3C,
0x00,
0x00, 0x01, 'S'
};
/* v5 SUBSCRIBE packet_id=1, props_len=0, filter "x", options=QoS 2. */
static const byte subscribe_v5[] = {
0x82, 0x07,
0x00, 0x01,
0x00,
0x00, 0x01, 'x',
0x02
};
/* v5 DISCONNECT (0xE0 0x00 with no reason / props - allowed). */
static const byte disconnect_v5[] = { 0xE0, 0x00 };
/* v5 CONNECT publisher, ClientId "P". remain = 14 (same as subscriber). */
static const byte connect_pub_v5[] = {
0x10, 0x0E,
0x00, 0x04, 'M', 'Q', 'T', 'T',
0x05, 0x00, 0x00, 0x3C,
0x00,
0x00, 0x01, 'P'
};
/* v5 PUBLISH QoS 2 packet_id=7, topic "x", one Message Expiry Interval
* property (id=0x02, 4-byte int=60), payload "p".
* remain = topic_hdr(2)+topic(1)+packet_id(2)+prop_len_vbi(1)+prop(5)+
* payload(1) = 12 */
static const byte publish_v5[] = {
0x34, 0x0C,
0x00, 0x01, 'x',
0x00, 0x07,
0x05, /* prop block length = 5 */
0x02, 0x00, 0x00, 0x00, 0x3C, /* Message Expiry Interval = 60 */
'p'
};

install_mock_net(&net);
XMEMSET(&broker, 0, sizeof(broker));
ASSERT_EQ(MQTT_CODE_SUCCESS, MqttBroker_Init(&broker, &net));
ASSERT_EQ(MQTT_CODE_SUCCESS, MqttBroker_Start(&broker));

reset_mock_clients(2);
mock_client_input_append(0, connect_sub_v5, sizeof(connect_sub_v5));
mock_client_input_append(0, subscribe_v5, sizeof(subscribe_v5));
mock_client_input_append(0, disconnect_v5, sizeof(disconnect_v5));
for (i = 0; i < 16; i++) {
MqttBroker_Step(&broker);
if (g_clients[0].closed) break;
}
ASSERT_TRUE(g_clients[0].closed);

mock_client_input_append(1, connect_pub_v5, sizeof(connect_pub_v5));
mock_client_input_append(1, publish_v5, sizeof(publish_v5));
mock_client_input_append(1, disconnect_v5, sizeof(disconnect_v5));
for (i = 0; i < 32; i++) {
MqttBroker_Step(&broker);
if (g_clients[1].closed) break;
}
ASSERT_TRUE(g_clients[1].closed);

pub_pubrecs = count_packets_of_type(g_clients[1].out_buf,
g_clients[1].out_len, MQTT_PACKET_TYPE_PUBLISH_REC);
ASSERT_EQ(1, pub_pubrecs);

MqttBroker_Stop(&broker);
MqttBroker_Free(&broker);
}
#endif /* WOLFMQTT_V5 */

/* MQTT 3.1.1 section 3.12 / v5 section 3.12: PINGREQ has no variable header and no
* payload, so Remaining Length MUST be 0. Broker dispatch must reject a
* malformed PINGREQ with an abnormal close instead of emitting a
Expand Down Expand Up @@ -2156,6 +2431,11 @@ int main(int argc, char** argv)
RUN_TEST(qos2_inbound_cap_reached_disconnects);
RUN_TEST(qos2_state_freed_on_client_disconnect);
RUN_TEST(qos2_pubrel_unknown_id_still_pubcomps);
RUN_TEST(qos2_publish_with_offline_durable_subscriber);
RUN_TEST(qos2_publish_then_abrupt_close_offline_subscriber);
#ifdef WOLFMQTT_V5
RUN_TEST(qos2_publish_v5_props_with_offline_durable_subscriber);
#endif
RUN_TEST(pingreq_valid_emits_pingresp);
RUN_TEST(pingreq_nonzero_remain_len_closes_no_pingresp);
#ifndef WOLFMQTT_V5
Expand Down
Loading