Skip to content

Commit efe39e6

Browse files
committed
feat: Add getter for fetching pre-batch buffer sizes
1 parent 6662d8c commit efe39e6

10 files changed

Lines changed: 749 additions & 8 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
## 7.6.0 [unreleased]
22

3+
### Features
4+
5+
- [TODO](): Add getter for fetching pre-batch buffer sizes
6+
37
## 7.5.0 [2026-01-13]
48

59
### Features

client/README.md

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -613,6 +613,7 @@ The writes are processed in batches which are configurable by `WriteOptions`:
613613
| **backpressureStrategy** | the strategy to deal with buffer overflow | DROP_OLDEST |
614614
| **captureBackpressureData** | whether to capture affected data points in backpressure events | false |
615615
| **concatMapPrefetch** | the number of upstream items to prefetch for the concatMapMaybe operator | 2 |
616+
| **enableBufferTracking** | whether to enable pre-batch buffer size tracking via `getPreBatchBufferSize()` | false |
616617

617618
There is also a synchronous blocking version of `WriteApi` - [WriteApiBlocking](#writing-data-using-synchronous-blocking-api).
618619

@@ -680,6 +681,39 @@ writeApi.listenEvents(BackpressureEvent.class, backpressureEvent -> {
680681

681682
Note: Disabling `captureBackpressureData` can improve performance when backpressure data capture is not needed.
682683

684+
##### Buffer Size Monitoring
685+
686+
The `WriteApi` provides methods to monitor the number of data points waiting in the pre-batch buffer before being sent to InfluxDB. This is useful for monitoring write throughput and detecting potential bottlenecks.
687+
688+
Each unique combination of bucket, organization, precision, and consistency has its own independent buffer.
689+
690+
```java
691+
WriteApi writeApi = influxDBClient.makeWriteApi(WriteOptions.builder()
692+
.batchSize(1000)
693+
.flushInterval(1000)
694+
.build());
695+
696+
// Write some data
697+
writeApi.writeRecord("my-bucket", "my-org", WritePrecision.NS, "measurement,tag=value field=1");
698+
699+
// Get buffer size for a specific destination
700+
int bufferSize = writeApi.getPreBatchBufferSize("my-bucket", "my-org", WritePrecision.NS);
701+
System.out.println("Points waiting to be batched: " + bufferSize);
702+
703+
// Get all buffer sizes across all destinations
704+
Map<WriteParameters, Integer> allSizes = writeApi.getPreBatchBufferSizes();
705+
allSizes.forEach((params, size) ->
706+
System.out.println(params + ": " + size + " points"));
707+
```
708+
709+
To enable pre-batch buffer tracking:
710+
711+
```java
712+
WriteOptions options = WriteOptions.builder()
713+
.enableBufferTracking(true)
714+
.build();
715+
```
716+
683717
#### Writing data
684718

685719
##### By POJO

client/src/main/java/com/influxdb/client/WriteApi.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@
2222
package com.influxdb.client;
2323

2424
import java.util.List;
25+
import java.util.Map;
2526
import javax.annotation.Nonnull;
2627
import javax.annotation.Nullable;
2728
import javax.annotation.concurrent.ThreadSafe;
2829

30+
import com.influxdb.client.domain.WriteConsistency;
2931
import com.influxdb.client.domain.WritePrecision;
3032
import com.influxdb.client.write.Point;
3133
import com.influxdb.client.write.WriteParameters;
@@ -283,6 +285,55 @@ <T extends AbstractWriteEvent> ListenerRegistration listenEvents(@Nonnull final
283285
*/
284286
void flush();
285287

288+
/**
289+
* Returns the current pre-batch buffer size for the specified destination.
290+
* This represents the number of data points waiting to be batched for the given
291+
* bucket, organization, precision, and consistency combination.
292+
*
293+
* <p>
294+
* Note: Each unique combination of (bucket, org, precision, consistency) has its own
295+
* independent pre-batch buffer. The parameters must match exactly how the data was written.
296+
* </p>
297+
*
298+
* @param bucket the destination bucket
299+
* @param org the destination organization
300+
* @param precision the write precision
301+
* @return current number of points waiting to be batched, or 0 if no buffer exists
302+
*/
303+
int getPreBatchBufferSize(@Nonnull final String bucket,
304+
@Nonnull final String org,
305+
@Nonnull final WritePrecision precision);
306+
307+
/**
308+
* Returns the current pre-batch buffer size for the specified destination.
309+
*
310+
* @param bucket the destination bucket
311+
* @param org the destination organization
312+
* @param precision the write precision
313+
* @param consistency the write consistency (for InfluxDB Enterprise clusters)
314+
* @return current number of points waiting to be batched, or 0 if no buffer exists
315+
*/
316+
int getPreBatchBufferSize(@Nonnull final String bucket,
317+
@Nonnull final String org,
318+
@Nonnull final WritePrecision precision,
319+
@Nullable final WriteConsistency consistency);
320+
321+
/**
322+
* Returns the current pre-batch buffer size for the specified destination.
323+
*
324+
* @param params the write parameters identifying the destination
325+
* @return current number of points waiting to be batched, or 0 if no buffer exists
326+
*/
327+
int getPreBatchBufferSize(@Nonnull final WriteParameters params);
328+
329+
/**
330+
* Returns a snapshot of all current pre-batch buffer sizes.
331+
*
332+
* @return map of WriteParameters to current buffer size
333+
*/
334+
@Nonnull
335+
Map<WriteParameters, Integer> getPreBatchBufferSizes();
336+
286337
/**
287338
* Close threads for asynchronous batch writing.
288339
*/

client/src/main/java/com/influxdb/client/WriteOptions.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
* <li>bufferLimit = 10_000</li>
4646
* <li>concatMapPrefetch = 2</li>
4747
* <li>captureBackpressureData = false</li>
48+
* <li>enableBufferTracking = false</li>
4849
* </ul>
4950
* <p>
5051
* The default backpressure strategy is {@link BackpressureOverflowStrategy#DROP_OLDEST}.
@@ -66,6 +67,7 @@ public final class WriteOptions implements WriteApi.RetryOptions {
6667
public static final int DEFAULT_BUFFER_LIMIT = 10000;
6768
public static final int DEFAULT_CONCAT_MAP_PREFETCH = 2;
6869
public static final boolean DEFAULT_CAPTURE_BACKPRESSURE_DATA = false;
70+
public static final boolean DEFAULT_ENABLE_BUFFER_TRACKING = false;
6971

7072
/**
7173
* Default configuration with values that are consistent with Telegraf.
@@ -85,6 +87,7 @@ public final class WriteOptions implements WriteApi.RetryOptions {
8587
private final Scheduler writeScheduler;
8688
private final BackpressureOverflowStrategy backpressureStrategy;
8789
private final boolean captureBackpressureData;
90+
private final boolean enableBufferTracking;
8891

8992
/**
9093
* @return the number of data point to collect in batch
@@ -214,6 +217,14 @@ public boolean getCaptureBackpressureData() {
214217
return captureBackpressureData;
215218
}
216219

220+
/**
221+
* @return whether to enable pre-batch buffer size tracking
222+
* @see WriteOptions.Builder#enableBufferTracking(boolean)
223+
*/
224+
public boolean getEnableBufferTracking() {
225+
return enableBufferTracking;
226+
}
227+
217228
private WriteOptions(@Nonnull final Builder builder) {
218229

219230
Arguments.checkNotNull(builder, "WriteOptions.Builder");
@@ -231,6 +242,7 @@ private WriteOptions(@Nonnull final Builder builder) {
231242
writeScheduler = builder.writeScheduler;
232243
backpressureStrategy = builder.backpressureStrategy;
233244
captureBackpressureData = builder.captureBackpressureData;
245+
enableBufferTracking = builder.enableBufferTracking;
234246
}
235247

236248
/**
@@ -262,6 +274,7 @@ public static class Builder {
262274
private Scheduler writeScheduler = Schedulers.newThread();
263275
private BackpressureOverflowStrategy backpressureStrategy = BackpressureOverflowStrategy.DROP_OLDEST;
264276
private boolean captureBackpressureData = DEFAULT_CAPTURE_BACKPRESSURE_DATA;
277+
private boolean enableBufferTracking = DEFAULT_ENABLE_BUFFER_TRACKING;
265278

266279
/**
267280
* Set the number of data point to collect in batch.
@@ -455,6 +468,22 @@ public Builder captureBackpressureData(final boolean captureBackpressureData) {
455468
return this;
456469
}
457470

471+
/**
472+
* Set whether to enable pre-batch buffer size tracking.
473+
*
474+
* When enabled, the WriteApi tracks the number of data points waiting in the pre-batch buffer
475+
* for each destination (bucket, org, precision, consistency combination). This allows monitoring
476+
* buffer sizes via {@link WriteApi#getPreBatchBufferSize} and {@link WriteApi#getPreBatchBufferSizes}.
477+
*
478+
* @param enableBufferTracking whether to enable buffer size tracking.
479+
* @return {@code this}
480+
*/
481+
@Nonnull
482+
public Builder enableBufferTracking(final boolean enableBufferTracking) {
483+
this.enableBufferTracking = enableBufferTracking;
484+
return this;
485+
}
486+
458487
/**
459488
* Build an instance of WriteOptions.
460489
*
@@ -466,4 +495,4 @@ public WriteOptions build() {
466495
return new WriteOptions(this);
467496
}
468497
}
469-
}
498+
}

client/src/main/java/com/influxdb/client/internal/AbstractWriteClient.java

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,13 @@
2222
package com.influxdb.client.internal;
2323

2424
import java.util.Collection;
25+
import java.util.Map;
2526
import java.util.Objects;
27+
import java.util.concurrent.ConcurrentHashMap;
2628
import java.util.concurrent.TimeUnit;
2729
import java.util.concurrent.TimeoutException;
2830
import java.util.concurrent.atomic.AtomicBoolean;
31+
import java.util.concurrent.atomic.AtomicInteger;
2932
import java.util.function.BiConsumer;
3033
import java.util.function.Supplier;
3134
import java.util.logging.Level;
@@ -90,6 +93,7 @@ public abstract class AbstractWriteClient extends AbstractRestClient implements
9093
private final Collection<AutoCloseable> autoCloseables;
9194

9295
private AtomicBoolean finished = new AtomicBoolean(false);
96+
private final ConcurrentHashMap<WriteParameters, AtomicInteger> preBatchBufferSizes = new ConcurrentHashMap<>();
9397

9498
public AbstractWriteClient(@Nonnull final WriteOptions writeOptions,
9599
@Nonnull final InfluxDBClientOptions options,
@@ -124,15 +128,30 @@ public AbstractWriteClient(@Nonnull final WriteOptions writeOptions,
124128
//
125129
// Use Buffer to create Batch Items
126130
//
127-
.compose(source ->
128-
new FlowableBufferTimedFlushable<>(
131+
.compose(source -> {
132+
if (writeOptions.getEnableBufferTracking()) {
133+
AtomicInteger groupSize = preBatchBufferSizes.computeIfAbsent(
134+
group.getKey(), k -> new AtomicInteger(0));
135+
return new FlowableBufferTimedFlushable<>(
136+
source,
137+
flushPublisher,
138+
writeOptions.getFlushInterval(),
139+
TimeUnit.MILLISECONDS,
140+
writeOptions.getBatchSize(), processorScheduler,
141+
ArrayListSupplier.asSupplier(),
142+
groupSize::set
143+
);
144+
} else {
145+
return new FlowableBufferTimedFlushable<>(
129146
source,
130147
flushPublisher,
131148
writeOptions.getFlushInterval(),
132149
TimeUnit.MILLISECONDS,
133150
writeOptions.getBatchSize(), processorScheduler,
134151
ArrayListSupplier.asSupplier()
135-
))
152+
);
153+
}
154+
})
136155
//
137156
// Collect Batch items into one Write Item
138157
//
@@ -193,6 +212,32 @@ public void flush() {
193212
flushPublisher.offer(true);
194213
}
195214

215+
/**
216+
* Returns the current pre-batch buffer size for the specified destination.
217+
* This represents the number of data points waiting to be batched for the given
218+
* bucket, organization, precision, and consistency combination.
219+
*
220+
* @param params the write parameters identifying the destination
221+
* @return current number of points waiting to be batched, or 0 if no buffer exists
222+
*/
223+
public int getPreBatchBufferSize(@Nonnull final WriteParameters params) {
224+
Arguments.checkNotNull(params, "WriteParameters");
225+
AtomicInteger size = preBatchBufferSizes.get(params);
226+
return size != null ? size.get() : 0;
227+
}
228+
229+
/**
230+
* Returns a snapshot of all current pre-batch buffer sizes.
231+
*
232+
* @return map of WriteParameters to current buffer size
233+
*/
234+
@Nonnull
235+
public Map<WriteParameters, Integer> getPreBatchBufferSizes() {
236+
ConcurrentHashMap<WriteParameters, Integer> result = new ConcurrentHashMap<>();
237+
preBatchBufferSizes.forEach((key, value) -> result.put(key, value.get()));
238+
return result;
239+
}
240+
196241
public void close() {
197242

198243
LOG.log(Level.FINE, "Flushing any cached BatchWrites before shutdown.");

client/src/main/java/com/influxdb/client/internal/WriteApiImpl.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,15 @@
2424
import java.util.Collection;
2525
import java.util.Collections;
2626
import java.util.List;
27+
import java.util.Map;
2728
import java.util.Objects;
2829
import javax.annotation.Nonnull;
2930
import javax.annotation.Nullable;
3031

3132
import com.influxdb.client.InfluxDBClientOptions;
3233
import com.influxdb.client.WriteApi;
3334
import com.influxdb.client.WriteOptions;
35+
import com.influxdb.client.domain.WriteConsistency;
3436
import com.influxdb.client.domain.WritePrecision;
3537
import com.influxdb.client.service.WriteService;
3638
import com.influxdb.client.write.Point;
@@ -269,6 +271,40 @@ public <T extends AbstractWriteEvent> ListenerRegistration listenEvents(@Nonnull
269271
return subscribe::dispose;
270272
}
271273

274+
@Override
275+
public int getPreBatchBufferSize(@Nonnull final String bucket,
276+
@Nonnull final String org,
277+
@Nonnull final WritePrecision precision) {
278+
Arguments.checkNonEmpty(bucket, "bucket");
279+
Arguments.checkNonEmpty(org, "org");
280+
Arguments.checkNotNull(precision, "WritePrecision");
281+
282+
return getPreBatchBufferSize(new WriteParameters(bucket, org, precision));
283+
}
284+
285+
@Override
286+
public int getPreBatchBufferSize(@Nonnull final String bucket,
287+
@Nonnull final String org,
288+
@Nonnull final WritePrecision precision,
289+
@Nullable final WriteConsistency consistency) {
290+
Arguments.checkNonEmpty(bucket, "bucket");
291+
Arguments.checkNonEmpty(org, "org");
292+
Arguments.checkNotNull(precision, "WritePrecision");
293+
294+
return getPreBatchBufferSize(new WriteParameters(bucket, org, precision, consistency));
295+
}
296+
297+
@Override
298+
public int getPreBatchBufferSize(@Nonnull final WriteParameters params) {
299+
return super.getPreBatchBufferSize(params);
300+
}
301+
302+
@Override
303+
@Nonnull
304+
public Map<WriteParameters, Integer> getPreBatchBufferSizes() {
305+
return super.getPreBatchBufferSizes();
306+
}
307+
272308
@Override
273309
public void close() {
274310
super.close();

0 commit comments

Comments
 (0)