Skip to content

Commit 2490f64

Browse files
authored
Feature: Implement fstring topics (#72)
* implemented writeString(const __FlashStringHelper* ...), writeString_P() * added publish(__FlashStringHelper* topic, __FlashStringHelper* payload, ...) * make all only beginPublishImpl / writeStringImpl calling functions inline * implement write_P(PGM_P string) * implemented bool subscribeImpl(bool progmem, ...) and bool unsubscribeImpl(bool progmem, ...) * added subscribe examples * added unit tests for subscribe(__FlashStringHelper) and subscribe_P() * added unit tests for __FlashStringHelper and PROGMEM topic/payload
1 parent dec23fa commit 2490f64

9 files changed

Lines changed: 627 additions & 32 deletions

File tree

README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,17 @@ Full API documentation is available here: https://hmueller01.github.io/pubsubcli
4545
`PubSubClient::setKeepAlive(keepAlive)`.
4646
- The client uses MQTT 3.1.1 by default. It can be changed to use MQTT 3.1 by
4747
changing value of `MQTT_VERSION` in `PubSubClient.h`.
48+
- It can publish and subscribe to `PROGMEM` or `__FlashStringHelper` topics since [v3.3.0](https://github.com/hmueller01/pubsubclient3/releases/tag/v3.3.0).
49+
Details see [mqtt_progmem](https://github.com/hmueller01/pubsubclient3/blob/63c77d764a6ba7c83868985eaeab6f07cc062874/examples/mqtt_progmem/src/mqtt_progmem.cpp#L39-L48) example.
50+
But if you like to publish `PROGMEM` topics you have to use
51+
```c
52+
const char TOPIC[] PROGMEM = "test";
53+
const char HELLO_WORLD[] PROGMEM = "hello world";
54+
client.beginPublish_P(TOPIC, strlen_P(HELLO_WORLD), MQTT_QOS0, false);
55+
client.write_P(HELLO_WORLD);
56+
client.endPublish();
57+
```
58+
as `client.publish_P(...)` is already used for `PROGMEM` payloads.
4859

4960

5061
## Compatible Hardware
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
// Example sketch showing how to use PubSubClient with strings stored in PROGMEM.
2+
// This needs to be an empty file to satisfy the arduino-cli build system.
3+
// See src/mqtt_progmem.cpp for the actual example code.
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
; PlatformIO Project Configuration File
2+
;
3+
; Build options: build flags, source filter
4+
; Upload options: custom upload port, speed and extra flags
5+
; Library options: dependencies, extra library storages
6+
; Advanced options: extra scripting
7+
;
8+
; Please visit documentation for the other options and examples
9+
; https://docs.platformio.org/page/projectconf.html
10+
; https://docs.platformio.org/en/latest/boards/atmelavr/uno.html
11+
12+
[platformio]
13+
description = Basic MQTT example with Authentication and Progmem strings
14+
15+
[env]
16+
framework = arduino
17+
lib_deps =
18+
arduino-libraries/Ethernet @ ^2.0.2
19+
; hmueller01/PubSubClient3 @ ^3.2.1
20+
https://github.com/hmueller01/pubsubclient3.git#implement-fstring-topics
21+
build_flags =
22+
; -D DEBUG_ESP_PORT=Serial
23+
; -D DEBUG_PUBSUBCLIENT
24+
; -D MQTT_MAX_PACKET_SIZE=512
25+
; -D MQTT_KEEPALIVE=120
26+
27+
[env:uno]
28+
platform = atmelavr
29+
board = uno
30+
31+
[env:esp8266]
32+
platform = espressif8266
33+
platform_packages = platformio/framework-arduinoespressif8266 @ https://github.com/esp8266/Arduino.git
34+
board = esp12e
35+
build_flags =
36+
${env.build_flags}
37+
-D PIO_FRAMEWORK_ARDUINO_ESPRESSIF_SDK305
38+
39+
[env:esp32]
40+
platform = espressif32
41+
board = esp32dev
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
Basic MQTT example with Authentication
3+
4+
- connects to an MQTT server, providing username
5+
and password
6+
- publishes "hello world" to the topic "outTopic"
7+
- subscribes to the topic "inTopic"
8+
*/
9+
10+
#include <Ethernet.h>
11+
#include <PubSubClient.h>
12+
#include <SPI.h>
13+
14+
#define _UNUSED_ __attribute__((unused))
15+
16+
// Update these with values suitable for your network.
17+
byte mac[] = {0xDE, 0xED, 0xBA, 0xFE, 0xFE, 0xED};
18+
IPAddress ip(172, 16, 0, 100);
19+
IPAddress server(172, 16, 0, 2);
20+
const char HELLO_WORLD_3[] PROGMEM = "hello world 3";
21+
const char HELLO_WORLD_4[] PROGMEM = "hello world 4";
22+
const char SUBSCRIBE_TOPIC[] PROGMEM = "inTopic1";
23+
24+
void callback(_UNUSED_ char* topic, _UNUSED_ uint8_t* payload, _UNUSED_ size_t plength) {
25+
// handle message arrived
26+
}
27+
28+
EthernetClient ethClient;
29+
PubSubClient client(server, 1883, callback, ethClient);
30+
31+
void setup() {
32+
Ethernet.begin(mac, ip);
33+
// Note - the default maximum packet size is 128 bytes. If the
34+
// combined length of clientId, username and password exceed this use the
35+
// following to increase the buffer size:
36+
// client.setBufferSize(255);
37+
38+
if (client.connect("arduinoClient", "testuser", "testpass")) {
39+
client.publish(F("outTopic"), "hello world 1", MQTT_QOS0, false);
40+
client.publish(F("outTopic"), F("hello world 2"), MQTT_QOS1, false);
41+
client.publish_P(F("outTopic"), HELLO_WORLD_3, MQTT_QOS2, false);
42+
43+
client.beginPublish(F("outTopic"), strlen_P(HELLO_WORLD_4), MQTT_QOS1, false);
44+
client.write_P(HELLO_WORLD_4);
45+
client.endPublish();
46+
47+
client.subscribe(F("inTopic"));
48+
client.subscribe_P(SUBSCRIBE_TOPIC, MQTT_QOS1);
49+
}
50+
}
51+
52+
void loop() {
53+
client.loop();
54+
}

src/PubSubClient.cpp

Lines changed: 88 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,14 @@ bool PubSubClient::publish(const char* topic, const uint8_t* payload, size_t ple
508508
return false;
509509
}
510510

511+
bool PubSubClient::publish(const __FlashStringHelper* topic, const uint8_t* payload, size_t plength, uint8_t qos, bool retained) {
512+
if (beginPublish(topic, plength, qos, retained)) {
513+
size_t rc = write(payload, plength);
514+
return endPublish() && (rc == plength);
515+
}
516+
return false;
517+
}
518+
511519
bool PubSubClient::publish_P(const char* topic, const uint8_t* payload, size_t plength, uint8_t qos, bool retained) {
512520
if (beginPublish(topic, plength, qos, retained)) {
513521
size_t rc = write_P(payload, plength);
@@ -516,23 +524,47 @@ bool PubSubClient::publish_P(const char* topic, const uint8_t* payload, size_t p
516524
return false;
517525
}
518526

519-
bool PubSubClient::beginPublish(const char* topic, size_t plength, uint8_t qos, bool retained) {
527+
bool PubSubClient::publish_P(const __FlashStringHelper* topic, const uint8_t* payload, size_t plength, uint8_t qos, bool retained) {
528+
if (beginPublish(topic, plength, qos, retained)) {
529+
size_t rc = write_P(payload, plength);
530+
return endPublish() && (rc == plength);
531+
}
532+
return false;
533+
}
534+
535+
/**
536+
* @brief Internal beginPublish implementation using topic stored in RAM or PROGMEM.
537+
*
538+
* @param progmem true if the topic is stored in PROGMEM/Flash, false if in RAM.
539+
* @param topic The topic to publish to.
540+
* @param plength The length of the payload.
541+
* @param qos The quality of service (\ref group_qos) to publish at. [0, 1, 2].
542+
* @param retained Publish the message with the retain flag.
543+
* @return true If the publish succeeded.
544+
* false If the publish failed, either connection lost or message too large.
545+
*/
546+
bool PubSubClient::beginPublishImpl(bool progmem, const char* topic, size_t plength, uint8_t qos, bool retained) {
520547
if (!topic) return false;
521-
if (strlen(topic) == 0) return false; // empty topic is not allowed
522-
if (qos > MQTT_QOS2) { // only valid QoS supported
548+
549+
// get topic length depending on storage (RAM vs PROGMEM)
550+
size_t topicLen = progmem ? strlen_P(topic) : strlen(topic);
551+
if (topicLen == 0) return false; // empty topic is not allowed
552+
553+
if (qos > MQTT_QOS2) { // only valid QoS supported
523554
ERROR_PSC_PRINTF_P("beginPublish() called with invalid QoS %u\n", qos);
524555
return false;
525556
}
557+
526558
const size_t nextMsgLen = (qos > MQTT_QOS0) ? 2 : 0; // add 2 bytes for nextMsgId if QoS > 0
527559
// check if the header, the topic (including 2 length bytes) and nextMsgId fit into the _buffer
528-
if (connected() && (MQTT_MAX_HEADER_SIZE + strlen(topic) + 2 + nextMsgLen <= _bufferSize)) {
560+
if (connected() && (MQTT_MAX_HEADER_SIZE + topicLen + 2 + nextMsgLen <= _bufferSize)) {
529561
// first write the topic at the end of the maximal variable header (MQTT_MAX_HEADER_SIZE) to the _buffer
530-
size_t topicLen = writeString(topic, MQTT_MAX_HEADER_SIZE) - MQTT_MAX_HEADER_SIZE;
562+
topicLen = writeStringImpl(progmem, topic, MQTT_MAX_HEADER_SIZE) - MQTT_MAX_HEADER_SIZE;
531563
if (qos > MQTT_QOS0) {
532564
// if QoS 1 or 2, we need to send the nextMsgId (packet identifier) after topic
533565
writeNextMsgId(MQTT_MAX_HEADER_SIZE + topicLen);
534566
}
535-
// we now know the length of the topic string (lenght + 2 bytes signalling the length) and can build the variable header information
567+
// we now know the length of the topic string (length + 2 bytes signalling the length) and can build the variable header information
536568
const uint8_t header = MQTTPUBLISH | MQTT_QOS_GET_HDR(qos) | (retained ? MQTTRETAINED : 0);
537569
uint8_t hdrLen = buildHeader(header, topicLen + nextMsgLen + plength);
538570
if (hdrLen == 0) return false; // exit here in case of header generation failure
@@ -579,7 +611,7 @@ uint8_t PubSubClient::buildHeader(uint8_t header, size_t length) {
579611
} while ((len > 0) && (hdrLen < MQTT_MAX_HEADER_SIZE - 1));
580612

581613
if (len > 0) {
582-
ERROR_PSC_PRINTF_P("buildHeader() length too big %zu, left %zu\n", length, len);
614+
ERROR_PSC_PRINTF_P("buildHeader: header=0x%02X, length too big %zu, left %zu\n", header, length, len);
583615
return 0;
584616
}
585617

@@ -612,7 +644,7 @@ size_t PubSubClient::write_P(const uint8_t* buf, size_t size) {
612644
*
613645
* @param header Header byte, e.g. MQTTCONNECT, MQTTPUBLISH, MQTTSUBSCRIBE, MQTTUNSUBSCRIBE.
614646
* @param length Length of _buffer to write.
615-
* @return True if successfully sent, otherwise false if buildHeader() failed or buffer could not be written.
647+
* @return True if successfully sent, otherwise false if build header failed or buffer could not be written.
616648
*/
617649
bool PubSubClient::writeControlPacket(uint8_t header, size_t length) {
618650
uint8_t hdrLen = buildHeader(header, length);
@@ -660,30 +692,50 @@ size_t PubSubClient::writeBuffer(size_t pos, size_t size) {
660692
}
661693

662694
/**
663-
* @brief Write an UTF-8 encoded string to the internal buffer at a given position. The string can have a length of 0 to 65535 bytes (depending on size of
695+
* @brief Internal implementation of writeString using RAM or PROGMEM string.
696+
* Write an UTF-8 encoded string to the internal buffer at a given position. The string can have a length of 0 to 65535 bytes (depending on size of
664697
* internal buffer). The buffer is prefixed with two bytes representing the length of the string. See section 1.5.3 of MQTT v3.1.1 protocol specification.
665698
* @note If the string does not fit in the buffer or is longer than 65535 bytes nothing is written to the buffer and the returned position is
666699
* unchanged.
667700
*
701+
* @param progmem true if the string is stored in PROGMEM, false if in RAM.
668702
* @param string 'C' string of the data that shall be written in the buffer.
669703
* @param pos Position in the internal buffer to write the string.
670704
* @return New position in the internal buffer (pos + 2 + string length), or pos if a buffer overrun would occur or the string is a nullptr.
671705
*/
672-
size_t PubSubClient::writeString(const char* string, size_t pos) {
706+
size_t PubSubClient::writeStringImpl(bool progmem, const char* string, size_t pos) {
673707
if (!string) return pos;
674708

675-
size_t sLen = strlen(string);
709+
size_t sLen = progmem ? strlen_P(string) : strlen(string);
676710
if ((pos + 2 + sLen <= _bufferSize) && (sLen <= 0xFFFF)) {
677711
_buffer[pos++] = (uint8_t)(sLen >> 8);
678712
_buffer[pos++] = (uint8_t)(sLen & 0xFF);
679-
memcpy(_buffer + pos, string, sLen);
713+
if (progmem) {
714+
memcpy_P(_buffer + pos, string, sLen);
715+
} else {
716+
memcpy(_buffer + pos, string, sLen);
717+
}
680718
pos += sLen;
681719
} else {
682-
ERROR_PSC_PRINTF_P("writeString(): string (%zu) does not fit into buf (%zu)\n", pos + 2 + sLen, _bufferSize);
720+
ERROR_PSC_PRINTF_P("writeStringImpl(): string (%zu) does not fit into buf (%zu)\n", pos + 2 + sLen, _bufferSize);
683721
}
684722
return pos;
685723
}
686724

725+
/**
726+
* @brief Write an UTF-8 encoded string to the internal buffer at a given position. The string can have a length of 0 to 65535 bytes (depending on size of
727+
* internal buffer). The buffer is prefixed with two bytes representing the length of the string. See section 1.5.3 of MQTT v3.1.1 protocol specification.
728+
* @note If the string does not fit in the buffer or is longer than 65535 bytes nothing is written to the buffer and the returned position is
729+
* unchanged.
730+
*
731+
* @param string 'C' string of the data that shall be written in the buffer.
732+
* @param pos Position in the internal buffer to write the string.
733+
* @return New position in the internal buffer (pos + 2 + string length), or pos if a buffer overrun would occur or the string is a nullptr.
734+
*/
735+
inline size_t PubSubClient::writeString(const char* string, size_t pos) {
736+
return writeStringImpl(false, string, pos);
737+
}
738+
687739
/**
688740
* @brief Write nextMsgId to the internal buffer at the given position.
689741
* @note If the nextMsgId (2 bytes) does not fit in the buffer nothing is written to the buffer and the returned position is unchanged.
@@ -731,11 +783,20 @@ size_t PubSubClient::flushBuffer() {
731783
return rc;
732784
}
733785

734-
bool PubSubClient::subscribe(const char* topic, uint8_t qos) {
786+
/**
787+
* @brief Internal subscribes to messages published to the specified topic. The topic can be stored in RAM or PROGMEM.
788+
* @param progmem true if the topic is stored in PROGMEM/Flash, false if in RAM.
789+
* @param topic The topic to subscribe to.
790+
* @param qos The qos to subscribe at. [0, 1].
791+
* @return true If sending the subscribe succeeded.
792+
* false If sending the subscribe failed, either connection lost or message too large.
793+
*/
794+
bool PubSubClient::subscribeImpl(bool progmem, const char* topic, uint8_t qos) {
735795
if (!topic) return false;
736796
if (qos > MQTT_QOS1) return false; // only QoS 0 and 1 supported
737797

738-
size_t topicLen = strnlen(topic, _bufferSize);
798+
// get topic length depending on storage (RAM vs PROGMEM)
799+
size_t topicLen = progmem ? strnlen_P(topic, _bufferSize) : strnlen(topic, _bufferSize);
739800
if (_bufferSize < MQTT_MAX_HEADER_SIZE + 2 + 2 + topicLen + 1) {
740801
// Too long: header + nextMsgId (2) + topic length bytes (2) + topicLen + QoS (1)
741802
return false;
@@ -744,25 +805,33 @@ bool PubSubClient::subscribe(const char* topic, uint8_t qos) {
744805
// Leave room in the _buffer for header and variable length field
745806
uint16_t length = MQTT_MAX_HEADER_SIZE;
746807
length = writeNextMsgId(length); // _buffer size is checked before
747-
length = writeString(topic, length);
808+
length = writeStringImpl(progmem, topic, length);
748809
_buffer[length++] = qos;
749810
return writeControlPacket(MQTTSUBSCRIBE | MQTT_QOS_GET_HDR(MQTT_QOS1), length - MQTT_MAX_HEADER_SIZE);
750811
}
751812
return false;
752813
}
753814

754-
bool PubSubClient::unsubscribe(const char* topic) {
815+
/**
816+
* @brief Internal unsubscribes from messages published to the specified topic. The topic can be stored in RAM or PROGMEM.
817+
* @param progmem true if the topic is stored in PROGMEM/Flash, false if in RAM.
818+
* @param topic The topic to unsubscribe from.
819+
* @return true If sending the unsubscribe succeeded.
820+
* false If sending the unsubscribe failed, either connection lost or message too large.
821+
*/
822+
bool PubSubClient::unsubscribeImpl(bool progmem, const char* topic) {
755823
if (!topic) return false;
756824

757-
size_t topicLen = strnlen(topic, _bufferSize);
825+
// get topic length depending on storage (RAM vs PROGMEM)
826+
size_t topicLen = progmem ? strnlen_P(topic, _bufferSize) : strnlen(topic, _bufferSize);
758827
if (_bufferSize < MQTT_MAX_HEADER_SIZE + 2 + 2 + topicLen) {
759828
// Too long: header + nextMsgId (2) + topic length bytes (2) + topicLen
760829
return false;
761830
}
762831
if (connected()) {
763832
uint16_t length = MQTT_MAX_HEADER_SIZE;
764833
length = writeNextMsgId(length); // _buffer size is checked before
765-
length = writeString(topic, length);
834+
length = writeStringImpl(progmem, topic, length);
766835
return writeControlPacket(MQTTUNSUBSCRIBE | MQTT_QOS_GET_HDR(MQTT_QOS1), length - MQTT_MAX_HEADER_SIZE);
767836
}
768837
return false;

0 commit comments

Comments
 (0)