Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/main/java/org/influxdb/InfluxDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,25 @@ public void write(final String database, final String retentionPolicy,
public void query(Query query, int chunkSize, BiConsumer<Cancellable, QueryResult> onNext, Runnable onComplete,
Consumer<Throwable> onFailure);

/**
* Execute a streaming query against a database.
*
* @param query
* the query to execute.
* @param timeUnit
* the time unit of the results.
* @param chunkSize
* the number of QueryResults to process in one chunk.
* @param onNext
* the consumer to invoke for each received QueryResult; with capability to discontinue a streaming query
* @param onComplete
* the onComplete to invoke for successfully end of stream
* @param onFailure
* the consumer for error handling
*/
public void query(Query query, TimeUnit timeUnit, int chunkSize, BiConsumer<Cancellable, QueryResult> onNext, Runnable onComplete,
Consumer<Throwable> onFailure);

/**
* Execute a query against a database.
*
Expand Down
84 changes: 84 additions & 0 deletions src/main/java/org/influxdb/impl/InfluxDBImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,90 @@ public void onFailure(final Call<ResponseBody> call, final Throwable t) {
});
}

/**
* {@inheritDoc}
*/
@Override
public void query(final Query query, final TimeUnit timeUnit, final int chunkSize, final BiConsumer<Cancellable, QueryResult> onNext,
final Runnable onComplete, final Consumer<Throwable> onFailure) {
Call<ResponseBody> call;
if (query.hasBoundParameters()) {
if (query.requiresPost()) {
call = this.influxDBService.postQuery(getDatabase(query), query.getCommandWithUrlEncoded(), TimeUtil.toTimePrecision(timeUnit), chunkSize,
query.getParameterJsonWithUrlEncoded());
} else {
call = this.influxDBService.query(getDatabase(query), query.getCommandWithUrlEncoded(), TimeUtil.toTimePrecision(timeUnit), chunkSize,
query.getParameterJsonWithUrlEncoded());
}
} else {
if (query.requiresPost()) {
call = this.influxDBService.postQuery(getDatabase(query), query.getCommandWithUrlEncoded(), TimeUtil.toTimePrecision(timeUnit), chunkSize);
} else {
call = this.influxDBService.query(getDatabase(query), query.getCommandWithUrlEncoded(), TimeUtil.toTimePrecision(timeUnit), chunkSize);
}
}

call.enqueue(new Callback<ResponseBody>() {
@Override
public void onResponse(final Call<ResponseBody> call, final Response<ResponseBody> response) {

Cancellable cancellable = new Cancellable() {
@Override
public void cancel() {
call.cancel();
}

@Override
public boolean isCanceled() {
return call.isCanceled();
}
};

try {
if (response.isSuccessful()) {
ResponseBody chunkedBody = response.body();
chunkProccesor.process(chunkedBody, cancellable, onNext, onComplete);
} else {
// REVIEW: must be handled consistently with IOException.
ResponseBody errorBody = response.errorBody();
if (errorBody != null) {
InfluxDBException influxDBException = new InfluxDBException(errorBody.string());
if (onFailure == null) {
throw influxDBException;
} else {
onFailure.accept(influxDBException);
}
}
}
} catch (IOException e) {
QueryResult queryResult = new QueryResult();
queryResult.setError(e.toString());
onNext.accept(cancellable, queryResult);
//passing null onFailure consumer is here for backward compatibility
//where the empty queryResult containing error is propagating into onNext consumer
if (onFailure != null) {
onFailure.accept(e);
}
} catch (Exception e) {
call.cancel();
if (onFailure != null) {
onFailure.accept(e);
}
}

}

@Override
public void onFailure(final Call<ResponseBody> call, final Throwable t) {
if (onFailure == null) {
throw new InfluxDBException(t);
} else {
onFailure.accept(t);
}
}
});
}

/**
* {@inheritDoc}
*/
Expand Down
22 changes: 22 additions & 0 deletions src/main/java/org/influxdb/impl/InfluxDBService.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,24 @@ public Call<QueryResult> postQuery(@Query(DB) String db, @Query(EPOCH) String ep
public Call<ResponseBody> postQuery(@Query(DB) String db, @Field(value = Q, encoded = true) String query,
@Query(CHUNK_SIZE) int chunkSize);

@Streaming
@POST("query?chunked=true")
@FormUrlEncoded
public Call<ResponseBody> postQuery(@Query(DB) String db, @Field(value = Q, encoded = true) String query, @Query(EPOCH) String epoch,
@Query(CHUNK_SIZE) int chunkSize);

@Streaming
@POST("query?chunked=true")
@FormUrlEncoded
public Call<ResponseBody> postQuery(@Query(DB) String db, @Field(value = Q, encoded = true) String query,
@Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = true) String params);

@Streaming
@POST("query?chunked=true")
@FormUrlEncoded
public Call<ResponseBody> postQuery(@Query(DB) String db, @Field(value = Q, encoded = true) String query, @Query(EPOCH) String epoch,
@Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = true) String params);

@POST("query")
@FormUrlEncoded
public Call<QueryResult> postQuery(@Field(value = Q, encoded = true) String query);
Expand All @@ -90,8 +102,18 @@ public Call<ResponseBody> postQuery(@Query(DB) String db, @Field(value = Q, enco
public Call<ResponseBody> query(@Query(DB) String db, @Query(value = Q, encoded = true) String query,
@Query(CHUNK_SIZE) int chunkSize);

@Streaming
@GET("query?chunked=true")
public Call<ResponseBody> query(@Query(DB) String db, @Query(value = Q, encoded = true) String query, @Query(EPOCH) String epoch,
@Query(CHUNK_SIZE) int chunkSize);

@Streaming
@GET("query?chunked=true")
public Call<ResponseBody> query(@Query(DB) String db, @Query(value = Q, encoded = true) String query,
@Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = true) String params);

@Streaming
@GET("query?chunked=true")
public Call<ResponseBody> query(@Query(DB) String db, @Query(value = Q, encoded = true) String query, @Query(EPOCH) String epoch,
@Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = true) String params);
}