diff --git a/src/main/java/org/influxdb/InfluxDB.java b/src/main/java/org/influxdb/InfluxDB.java index 56f842f3..68b38775 100644 --- a/src/main/java/org/influxdb/InfluxDB.java +++ b/src/main/java/org/influxdb/InfluxDB.java @@ -538,6 +538,25 @@ public void write(final String database, final String retentionPolicy, public void query(Query query, int chunkSize, BiConsumer onNext, Runnable onComplete, Consumer onFailure); + /** + * Execute a streaming query against a database. + * + * @param query + * the query to execute. + * @param timeUnit + * the time unit of the results. + * @param chunkSize + * the number of QueryResults to process in one chunk. + * @param onNext + * the consumer to invoke for each received QueryResult; with capability to discontinue a streaming query + * @param onComplete + * the onComplete to invoke for successfully end of stream + * @param onFailure + * the consumer for error handling + */ + public void query(Query query, TimeUnit timeUnit, int chunkSize, BiConsumer onNext, Runnable onComplete, + Consumer onFailure); + /** * Execute a query against a database. * diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index 23427a23..d5ca22a9 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -713,6 +713,90 @@ public void onFailure(final Call call, final Throwable t) { }); } +/** + * {@inheritDoc} + */ + @Override + public void query(final Query query, final TimeUnit timeUnit, final int chunkSize, final BiConsumer onNext, + final Runnable onComplete, final Consumer onFailure) { + Call call; + if (query.hasBoundParameters()) { + if (query.requiresPost()) { + call = this.influxDBService.postQuery(getDatabase(query), query.getCommandWithUrlEncoded(), TimeUtil.toTimePrecision(timeUnit), chunkSize, + query.getParameterJsonWithUrlEncoded()); + } else { + call = this.influxDBService.query(getDatabase(query), query.getCommandWithUrlEncoded(), TimeUtil.toTimePrecision(timeUnit), chunkSize, + query.getParameterJsonWithUrlEncoded()); + } + } else { + if (query.requiresPost()) { + call = this.influxDBService.postQuery(getDatabase(query), query.getCommandWithUrlEncoded(), TimeUtil.toTimePrecision(timeUnit), chunkSize); + } else { + call = this.influxDBService.query(getDatabase(query), query.getCommandWithUrlEncoded(), TimeUtil.toTimePrecision(timeUnit), chunkSize); + } + } + + call.enqueue(new Callback() { + @Override + public void onResponse(final Call call, final Response response) { + + Cancellable cancellable = new Cancellable() { + @Override + public void cancel() { + call.cancel(); + } + + @Override + public boolean isCanceled() { + return call.isCanceled(); + } + }; + + try { + if (response.isSuccessful()) { + ResponseBody chunkedBody = response.body(); + chunkProccesor.process(chunkedBody, cancellable, onNext, onComplete); + } else { + // REVIEW: must be handled consistently with IOException. + ResponseBody errorBody = response.errorBody(); + if (errorBody != null) { + InfluxDBException influxDBException = new InfluxDBException(errorBody.string()); + if (onFailure == null) { + throw influxDBException; + } else { + onFailure.accept(influxDBException); + } + } + } + } catch (IOException e) { + QueryResult queryResult = new QueryResult(); + queryResult.setError(e.toString()); + onNext.accept(cancellable, queryResult); + //passing null onFailure consumer is here for backward compatibility + //where the empty queryResult containing error is propagating into onNext consumer + if (onFailure != null) { + onFailure.accept(e); + } + } catch (Exception e) { + call.cancel(); + if (onFailure != null) { + onFailure.accept(e); + } + } + + } + + @Override + public void onFailure(final Call call, final Throwable t) { + if (onFailure == null) { + throw new InfluxDBException(t); + } else { + onFailure.accept(t); + } + } + }); + } + /** * {@inheritDoc} */ diff --git a/src/main/java/org/influxdb/impl/InfluxDBService.java b/src/main/java/org/influxdb/impl/InfluxDBService.java index 061a7661..e93ca28a 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBService.java +++ b/src/main/java/org/influxdb/impl/InfluxDBService.java @@ -75,12 +75,24 @@ public Call postQuery(@Query(DB) String db, @Query(EPOCH) String ep public Call postQuery(@Query(DB) String db, @Field(value = Q, encoded = true) String query, @Query(CHUNK_SIZE) int chunkSize); + @Streaming + @POST("query?chunked=true") + @FormUrlEncoded + public Call postQuery(@Query(DB) String db, @Field(value = Q, encoded = true) String query, @Query(EPOCH) String epoch, + @Query(CHUNK_SIZE) int chunkSize); + @Streaming @POST("query?chunked=true") @FormUrlEncoded public Call postQuery(@Query(DB) String db, @Field(value = Q, encoded = true) String query, @Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = true) String params); + @Streaming + @POST("query?chunked=true") + @FormUrlEncoded + public Call postQuery(@Query(DB) String db, @Field(value = Q, encoded = true) String query, @Query(EPOCH) String epoch, + @Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = true) String params); + @POST("query") @FormUrlEncoded public Call postQuery(@Field(value = Q, encoded = true) String query); @@ -90,8 +102,18 @@ public Call postQuery(@Query(DB) String db, @Field(value = Q, enco public Call query(@Query(DB) String db, @Query(value = Q, encoded = true) String query, @Query(CHUNK_SIZE) int chunkSize); + @Streaming + @GET("query?chunked=true") + public Call query(@Query(DB) String db, @Query(value = Q, encoded = true) String query, @Query(EPOCH) String epoch, + @Query(CHUNK_SIZE) int chunkSize); + @Streaming @GET("query?chunked=true") public Call query(@Query(DB) String db, @Query(value = Q, encoded = true) String query, @Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = true) String params); + + @Streaming + @GET("query?chunked=true") + public Call query(@Query(DB) String db, @Query(value = Q, encoded = true) String query, @Query(EPOCH) String epoch, + @Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = true) String params); } diff --git a/src/test/java/org/influxdb/InfluxDBTest.java b/src/test/java/org/influxdb/InfluxDBTest.java index 2598b9f2..2a7d9760 100644 --- a/src/test/java/org/influxdb/InfluxDBTest.java +++ b/src/test/java/org/influxdb/InfluxDBTest.java @@ -24,11 +24,7 @@ import java.time.Instant; import java.time.ZoneId; import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -37,6 +33,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; import java.util.function.Consumer; import java.util.regex.Pattern; @@ -1158,6 +1155,68 @@ public void testChunkingOnComplete() throws InterruptedException { Assertions.assertTrue(await, "The onComplete action did not arrive!"); } + /** + * Test chunking with TimeUnit + * @throws InterruptedException + */ + @Test + public void testChunkingWithImeUnit() throws InterruptedException { + if (this.influxDB.version().startsWith("0.") || this.influxDB.version().startsWith("1.0")) { + // do not test version 0.13 and 1.0 + return; + } + + String dbName = "write_unittest_" + System.currentTimeMillis(); + this.influxDB.query(new Query("CREATE DATABASE " + dbName)); + String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version()); + BatchPoints batchPoints = BatchPoints.database(dbName).retentionPolicy(rp).build(); + Point point1 = Point.measurement("disk").tag("atag", "a").addField("used", 60L).addField("free", 1L).build(); + Point point2 = Point.measurement("disk").tag("atag", "b").addField("used", 70L).addField("free", 2L).build(); + Point point3 = Point.measurement("disk").tag("atag", "c").addField("used", 80L).addField("free", 3L).build(); + batchPoints.point(point1); + batchPoints.point(point2); + batchPoints.point(point3); + this.influxDB.write(batchPoints); + + CountDownLatch countDownLatch = new CountDownLatch(1); + + Thread.sleep(2000); + Query query = new Query("SELECT * FROM disk", dbName); + this.influxDB.query(query, 2, result -> {}, countDownLatch::countDown); + List results = new ArrayList<>(); + AtomicReference errorFound = new AtomicReference<>(); + + // Run and map to points + this.influxDB.query( + query, + TimeUnit.MILLISECONDS, + 5000, + (cancellable, queryResult) -> results.add(queryResult), + countDownLatch::countDown, + throwable -> { + countDownLatch.countDown(); + errorFound.set(throwable); + } + ); + + Thread.sleep(2000); + this.influxDB.query(new Query("DROP DATABASE " + dbName)); + + boolean await = countDownLatch.await(10, TimeUnit.SECONDS); + Assertions.assertTrue(await, "The onComplete action did not arrive!"); + Assertions.assertNull(errorFound.get(), "An error occurred : " + errorFound.get()); + + long totalPoints = results.stream() + .filter(qr -> qr.getResults() != null) + .flatMap(qr -> qr.getResults().stream()) + .filter(r -> r.getSeries() != null) + .flatMap(r -> r.getSeries().stream()) + .filter(s -> s.getValues() != null) + .mapToLong(s -> s.getValues().size()) + .sum(); + Assertions.assertEquals(3, totalPoints); + } + @Test public void testChunkingFailOnComplete() throws InterruptedException { if (this.influxDB.version().startsWith("0.") || this.influxDB.version().startsWith("1.0")) {