@@ -182,6 +182,10 @@ public ParallelTaskRunner.Config build() {
182182 if (capacity < 0 ) {
183183 capacity = numTasks * 2 ;
184184 }
185+ if (sorted && numTasks == 1 ) {
186+ logger .debug ("Sorted ParallelTaskRunner with a single task. Setting sorted to false." );
187+ sorted = false ;
188+ }
185189 return new ParallelTaskRunner .Config (numTasks , batchSize , capacity , abortOnFail , sorted , readQueuePutTimeoutSeconds );
186190 }
187191 }
@@ -802,16 +806,17 @@ public Void call() throws InterruptedException {
802806 }
803807 }
804808 } else if (writeBlockingQueueFuture != null ) {
805- status = TaskRunnableStatus .WRITING_BATCH_TO_QUEUE ;
806809 // Sorted PTR should not have to drain!
807- CompletableFuture <Batch <O >> future = new CompletableFuture <>();
808- future .complete (new Batch <O >(batchResult , batch .position + 1 ));
809- while (!writeBlockingQueueFuture .offer (future , TIMEOUT_CHECK , TimeUnit .SECONDS )) {
810- if (isAbortPending ()) {
811- logger .warn ("Abort task thread on fail" );
812- break ;
813- }
814- }
810+ throw new IllegalStateException ("Sorted ParallelTaskRunner should not have to drain!" );
811+ // status = TaskRunnableStatus.WRITING_BATCH_TO_QUEUE;
812+ // CompletableFuture<Batch<O>> future = new CompletableFuture<>();
813+ // future.complete(new Batch<O>(batchResult, batch.position + 1));
814+ // while (!writeBlockingQueueFuture.offer(future, TIMEOUT_CHECK, TimeUnit.SECONDS)) {
815+ // if (isAbortPending()) {
816+ // logger.warn("Abort task thread on fail");
817+ // break;
818+ // }
819+ // }
815820 }
816821 }
817822 } catch (Error e ) {
0 commit comments