Skip to content

Commit 9d832b4

Browse files
committed
Improve data handling with buffer/offset tracking, add flush timeout
1 parent c8baca3 commit 9d832b4

2 files changed

Lines changed: 100 additions & 50 deletions

File tree

src/NimBLEStream.cpp

Lines changed: 91 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -127,19 +127,29 @@ bool NimBLEStream::begin() {
127127
}
128128

129129
bool NimBLEStream::end() {
130+
// Release any buffered RX item
131+
if (m_rxState.item && m_rxBuf) {
132+
vRingbufferReturnItem(m_rxBuf, m_rxState.item);
133+
m_rxState.item = nullptr;
134+
}
135+
m_rxState.itemSize = 0;
136+
m_rxState.offset = 0;
137+
130138
if (m_txTask) {
131139
vTaskDelete(m_txTask);
132140
m_txTask = nullptr;
133141
}
142+
134143
if (m_txBuf) {
135144
vRingbufferDelete(m_txBuf);
136145
m_txBuf = nullptr;
137146
}
147+
138148
if (m_rxBuf) {
139149
vRingbufferDelete(m_rxBuf);
140150
m_rxBuf = nullptr;
141151
}
142-
m_hasPeek = false;
152+
143153
return true;
144154
}
145155

@@ -161,9 +171,18 @@ size_t NimBLEStream::availableForWrite() const {
161171
return m_txBuf ? xRingbufferGetCurFreeSize(m_txBuf) : 0;
162172
}
163173

164-
void NimBLEStream::flush() {
174+
void NimBLEStream::flush(uint32_t timeout_ms) {
175+
if (!m_txBuf) {
176+
return;
177+
}
178+
179+
ble_npl_time_t deadline = timeout_ms > 0 ? ble_npl_time_get() + ble_npl_time_ms_to_ticks32(timeout_ms) : 0;
180+
165181
// Wait until TX ring is drained
166182
while (m_txBuf && xRingbufferGetCurFreeSize(m_txBuf) < m_txBufSize) {
183+
if (deadline > 0 && ble_npl_time_get() >= deadline) {
184+
break;
185+
}
167186
ble_npl_time_delay(ble_npl_time_ms_to_ticks32(1));
168187
}
169188
}
@@ -174,65 +193,76 @@ int NimBLEStream::available() {
174193
return 0;
175194
}
176195

177-
if (m_hasPeek) {
178-
return 1; // at least the peeked byte
179-
}
196+
// Count buffered RX item remainder
197+
size_t buffered = m_rxState.itemSize > m_rxState.offset ? m_rxState.itemSize - m_rxState.offset : 0;
180198

181199
// Query items in RX ring
182200
UBaseType_t waiting = 0;
183201
vRingbufferGetInfo(m_rxBuf, nullptr, nullptr, nullptr, nullptr, &waiting);
184-
return static_cast<int>(waiting);
202+
203+
return static_cast<int>(buffered + waiting);
185204
}
186205

187206
int NimBLEStream::read() {
188207
if (!m_rxBuf) {
189208
return -1;
190209
}
191210

192-
// Return peeked byte if available
193-
if (m_hasPeek) {
194-
m_hasPeek = false;
195-
return static_cast<int>(m_peekByte);
211+
// Return from buffered item if available
212+
if (m_rxState.item && m_rxState.offset < m_rxState.itemSize) {
213+
uint8_t byte = m_rxState.item[m_rxState.offset++];
214+
215+
// Release item if we've consumed it all
216+
if (m_rxState.offset >= m_rxState.itemSize) {
217+
vRingbufferReturnItem(m_rxBuf, m_rxState.item);
218+
m_rxState.item = nullptr;
219+
m_rxState.itemSize = 0;
220+
m_rxState.offset = 0;
221+
}
222+
223+
return static_cast<int>(byte);
196224
}
197225

226+
// Fetch next item from ringbuffer
198227
size_t itemSize = 0;
199228
uint8_t* item = static_cast<uint8_t*>(xRingbufferReceive(m_rxBuf, &itemSize, 0));
200-
if (!item || itemSize == 0) return -1;
201-
202-
uint8_t byte = item[0];
203-
204-
// If item has more bytes, put the rest back
205-
if (itemSize > 1) {
206-
xRingbufferSend(m_rxBuf, item + 1, itemSize - 1, 0);
229+
if (!item || itemSize == 0) {
230+
return -1;
207231
}
208232

209-
vRingbufferReturnItem(m_rxBuf, item);
210-
return static_cast<int>(byte);
233+
// Store in buffer state and return first byte
234+
m_rxState.item = item;
235+
m_rxState.itemSize = itemSize;
236+
m_rxState.offset = 1; // Already consumed first byte
237+
238+
return static_cast<int>(item[0]);
211239
}
212240

213241
int NimBLEStream::peek() {
214242
if (!m_rxBuf) {
215243
return -1;
216244
}
217245

218-
if (m_hasPeek) {
219-
return static_cast<int>(m_peekByte);
246+
// Return from buffered item if available
247+
if (m_rxState.item && m_rxState.offset < m_rxState.itemSize) {
248+
return static_cast<int>(m_rxState.item[m_rxState.offset]);
220249
}
221250

222-
size_t itemSize = 0;
223-
uint8_t* item = static_cast<uint8_t*>(xRingbufferReceive(m_rxBuf, &itemSize, 0));
224-
if (!item || itemSize == 0) {
225-
return -1;
226-
}
227-
228-
m_peekByte = item[0];
229-
m_hasPeek = true;
251+
// Fetch next item from ringbuffer if not already buffered
252+
if (!m_rxState.item) {
253+
size_t itemSize = 0;
254+
uint8_t* item = static_cast<uint8_t*>(xRingbufferReceive(m_rxBuf, &itemSize, 0));
255+
if (!item || itemSize == 0) {
256+
return -1;
257+
}
230258

231-
// Put the entire item back
232-
xRingbufferSend(m_rxBuf, item, itemSize, 0);
233-
vRingbufferReturnItem(m_rxBuf, item);
259+
// Store in buffer state
260+
m_rxState.item = item;
261+
m_rxState.itemSize = itemSize;
262+
m_rxState.offset = 0;
263+
}
234264

235-
return static_cast<int>(m_peekByte);
265+
return static_cast<int>(m_rxState.item[m_rxState.offset]);
236266
}
237267

238268
size_t NimBLEStream::read(uint8_t* buffer, size_t len) {
@@ -242,13 +272,28 @@ size_t NimBLEStream::read(uint8_t* buffer, size_t len) {
242272

243273
size_t total = 0;
244274

245-
// Consume peeked byte first if present
246-
if (m_hasPeek && total < len) {
247-
buffer[total++] = m_peekByte;
248-
m_hasPeek = false;
275+
// First, consume any buffered RX item remainder
276+
if (m_rxState.item && m_rxState.offset < m_rxState.itemSize) {
277+
size_t available = m_rxState.itemSize - m_rxState.offset;
278+
size_t copyLen = std::min(len, available);
279+
memcpy(buffer, m_rxState.item + m_rxState.offset, copyLen);
280+
m_rxState.offset += copyLen;
281+
total += copyLen;
282+
283+
// Release item if fully consumed
284+
if (m_rxState.offset >= m_rxState.itemSize) {
285+
vRingbufferReturnItem(m_rxBuf, m_rxState.item);
286+
m_rxState.item = nullptr;
287+
m_rxState.itemSize = 0;
288+
m_rxState.offset = 0;
289+
}
290+
291+
if (total >= len) {
292+
return total;
293+
}
249294
}
250295

251-
// Drain RX ringbuffer items up to requested length (non-blocking)
296+
// Drain additional RX ringbuffer items
252297
while (total < len) {
253298
size_t itemSize = 0;
254299
uint8_t* item = static_cast<uint8_t*>(xRingbufferReceive(m_rxBuf, &itemSize, 0));
@@ -260,12 +305,15 @@ size_t NimBLEStream::read(uint8_t* buffer, size_t len) {
260305
memcpy(buffer + total, item, copyLen);
261306
total += copyLen;
262307

263-
// If there are leftover bytes from this item, push them back to RX
264-
if (itemSize > copyLen) {
265-
xRingbufferSend(m_rxBuf, item + copyLen, itemSize - copyLen, 0);
308+
// If we didn't consume the entire item, buffer it
309+
if (copyLen < itemSize) {
310+
m_rxState.item = item;
311+
m_rxState.itemSize = itemSize;
312+
m_rxState.offset = copyLen;
313+
} else {
314+
// Item fully consumed
315+
vRingbufferReturnItem(m_rxBuf, item);
266316
}
267-
268-
vRingbufferReturnItem(m_rxBuf, item);
269317
}
270318

271319
return total;
@@ -277,9 +325,6 @@ size_t NimBLEStream::pushRx(const uint8_t* data, size_t len) {
277325
return 0;
278326
}
279327

280-
// Clear peek state when new data arrives
281-
m_hasPeek = false;
282-
283328
if (xRingbufferSend(m_rxBuf, data, len, 0) != pdTRUE) {
284329
NIMBLE_UART_LOGE(LOG_TAG, "RX buffer full, dropping %u bytes", len);
285330
return 0;

src/NimBLEStream.h

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ class NimBLEStream : public Stream {
8484
}
8585

8686
size_t availableForWrite() const;
87-
void flush() override;
87+
void flush(uint32_t timeout_ms = 0);
8888

8989
// Stream RX methods
9090
virtual int available() override;
@@ -108,6 +108,13 @@ class NimBLEStream : public Stream {
108108
// Push received data into RX ring (called by subclass callbacks)
109109
size_t pushRx(const uint8_t* data, size_t len);
110110

111+
// RX buffering state: avoids requeueing/fragmentation
112+
struct RxState {
113+
uint8_t* item{nullptr};
114+
size_t itemSize{0};
115+
size_t offset{0};
116+
};
117+
111118
RingbufHandle_t m_txBuf{nullptr};
112119
RingbufHandle_t m_rxBuf{nullptr};
113120
TaskHandle_t m_txTask{nullptr};
@@ -116,9 +123,7 @@ class NimBLEStream : public Stream {
116123
uint32_t m_txBufSize{1024};
117124
uint32_t m_rxBufSize{1024};
118125

119-
// RX peek state
120-
mutable uint8_t m_peekByte{0};
121-
mutable bool m_hasPeek{false};
126+
mutable RxState m_rxState{}; // Track current RX item to avoid requeueing
122127
};
123128

124129
# if MYNEWT_VAL(BLE_ROLE_PERIPHERAL)

0 commit comments

Comments
 (0)