diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/DeleteDBTaskPage.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/DeleteDBTaskPage.java index 2d67418579..cedf3602da 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/DeleteDBTaskPage.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/DeleteDBTaskPage.java @@ -37,7 +37,7 @@ protected void buildCommandResult() { // For now, we will return -1 as the deleted count.When we update collections to use this class // we can refactor to return the actual count for them. // If there is error, we won't add this status. - if (tasks.errorTasks().isEmpty()) { + if (taskGroup.errorTasks().isEmpty()) { resultBuilder.addStatus(CommandStatus.DELETED_COUNT, -1); } } diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/InsertDBTaskPage.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/InsertDBTaskPage.java index 90b4072be5..b46d381f1b 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/InsertDBTaskPage.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/InsertDBTaskPage.java @@ -71,7 +71,7 @@ private void buildNonPerDocumentResult() { // Note: See DocRowIdentifer, it has an attribute that will be called for JSON serialization List insertedIds = - tasks.completedTasks().stream() + taskGroup.completedTasks().stream() .map(InsertDBTask::docRowID) .map(Optional::orElseThrow) .toList(); @@ -102,17 +102,17 @@ private void buildPerDocumentResult() { // kept using the same approach as InsertOperationPage to make comparison easy until we remove // the old class - var results = new InsertionResult[tasks.size()]; + var results = new InsertionResult[taskGroup.tasks().size()]; // Results array filled in order: first successful insertions - for (var task : tasks.completedTasks()) { + for (var task : taskGroup.completedTasks()) { results[task.position()] = new InsertionResult(task.docRowID().orElseThrow(), InsertionStatus.OK, null); } List seenErrors = new ArrayList<>(); // Second: failed insertions; output in order of insertion - for (var task : tasks.errorTasks()) { + for (var task : taskGroup.errorTasks()) { var cmdError = resultBuilder.throwableToCommandError(task.failure().orElseThrow()); // We want to avoid adding the same error multiple times, so we keep track of the index: @@ -129,7 +129,7 @@ private void buildPerDocumentResult() { // And third, if any, skipped insertions; those that were not attempted (f.ex due // to failure for ordered inserts) - for (var task : tasks.skippedTasks()) { + for (var task : taskGroup.skippedTasks()) { results[task.position()] = new InsertionResult(task.docRowID().orElseThrow(), InsertionStatus.SKIPPED, null); } diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/MetadataDBTaskPage.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/MetadataDBTaskPage.java index 03e45670bc..fabf0aac17 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/MetadataDBTaskPage.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/MetadataDBTaskPage.java @@ -55,15 +55,15 @@ protected void buildCommandResult() { addTaskWarningsToResult(); addTaskErrorsToResult(); - var metadataAttempts = tasks.completedTasks(); + var metadataAttempts = taskGroup.completedTasks(); if (metadataAttempts.size() > 1) { throw new IllegalArgumentException("Only one attempt is expected for metadata commands"); } if (!metadataAttempts.isEmpty()) { if (showSchema) { - resultBuilder.addStatus(statusKey, tasks.getFirst().getSchema()); + resultBuilder.addStatus(statusKey, taskGroup.tasks().getFirst().getSchema()); } else { - resultBuilder.addStatus(statusKey, tasks.getFirst().getNames()); + resultBuilder.addStatus(statusKey, taskGroup.tasks().getFirst().getNames()); } } } diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/ReadDBTaskPage.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/ReadDBTaskPage.java index 38c3e7ccf8..035ef3dce4 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/ReadDBTaskPage.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/ReadDBTaskPage.java @@ -48,7 +48,7 @@ protected void buildCommandResult() { maybeAddSortedRowCount(); maybeAddSchema(CommandStatus.PROJECTION_SCHEMA); - tasks.completedTasks().stream() + taskGroup.completedTasks().stream() .flatMap(task -> task.documents().stream()) .forEach(resultBuilder::addDocument); } @@ -56,7 +56,7 @@ protected void buildCommandResult() { protected void maybeAddSortedRowCount() { var rowCounts = - tasks.completedTasks().stream() + taskGroup.completedTasks().stream() .map(ReadDBTask::sortedRowCount) .filter(Optional::isPresent) .map(Optional::get) diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/SchemaDBTaskPage.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/SchemaDBTaskPage.java index d6a3980718..df65b57875 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/SchemaDBTaskPage.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/SchemaDBTaskPage.java @@ -43,7 +43,7 @@ Accumulator accumulator( protected void buildCommandResult() { super.buildCommandResult(); - resultBuilder.addStatus(CommandStatus.OK, tasks.allTasksCompleted() ? 1 : 0); + resultBuilder.addStatus(CommandStatus.OK, taskGroup.allTasksCompleted() ? 1 : 0); } public static class Accumulator, SchemaT extends SchemaObject> diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/TruncateDBTaskPage.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/TruncateDBTaskPage.java index 9ffb2ac9b4..8158c910f3 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/TruncateDBTaskPage.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/TruncateDBTaskPage.java @@ -48,7 +48,7 @@ protected void buildCommandResult() { super.buildCommandResult(); // truncate a table, set delete_count status as -1 - if (tasks.errorTasks().isEmpty()) { + if (taskGroup.errorTasks().isEmpty()) { resultBuilder.addStatus(CommandStatus.DELETED_COUNT, -1); } } diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/UpdateDBTaskPage.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/UpdateDBTaskPage.java index d9c7829d45..098dccd269 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/UpdateDBTaskPage.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/UpdateDBTaskPage.java @@ -37,7 +37,7 @@ protected void buildCommandResult() { // However - we do not know if an upsert happened :( // NOTE when update collection uses operation attempt this will get more complex // If there is error, we won't add this status. - if (tasks.errorTasks().isEmpty()) { + if (taskGroup.errorTasks().isEmpty()) { resultBuilder.addStatus(CommandStatus.MATCHED_COUNT, 1); resultBuilder.addStatus(CommandStatus.MODIFIED_COUNT, 1); } diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/embeddings/EmbeddingOperationFactory.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/embeddings/EmbeddingOperationFactory.java index d6bc9bfdd7..3d3d2629c6 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/embeddings/EmbeddingOperationFactory.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/embeddings/EmbeddingOperationFactory.java @@ -47,7 +47,7 @@ Operation createOperation( if (LOGGER.isDebugEnabled()) { LOGGER.debug( "createOperation() - zero embeddingActions, creating direct TaskOperation operation tasksAndDeferrables.taskGroup().size()={}", - tasksAndDeferrables.taskGroup().size()); + tasksAndDeferrables.taskGroup().tasks().size()); } // basic task, just wrap the tasks in an operation and go return new TaskOperation<>( @@ -58,7 +58,7 @@ Operation createOperation( LOGGER.debug( "createOperation() - creating CompositeTask Operation, embeddingActions.size()={}, tasksAndDeferrables.taskGroup().size()={}", embeddingActions.size(), - tasksAndDeferrables.taskGroup().size()); + tasksAndDeferrables.taskGroup().tasks().size()); } var compositeBuilder = new CompositeTaskOperationBuilder<>(commandContext); @@ -78,7 +78,7 @@ Operation createOperation( if (LOGGER.isDebugEnabled()) { LOGGER.debug( "createOperation() - created EmbeddingTasks embeddingTaskGroup.size={}", - embeddingTaskGroup.size()); + embeddingTaskGroup.tasks().size()); } // we want to run a group of embedding tasks and then a group of the other tasks, diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/reranking/RerankingTaskPage.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/reranking/RerankingTaskPage.java index ad3c89d47a..c280beb914 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/reranking/RerankingTaskPage.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/reranking/RerankingTaskPage.java @@ -41,11 +41,11 @@ public static Accumulator accu protected void buildCommandResult() { // There should only be 1 rerankig task - if (tasks.completedTasks().size() != 1) { + if (taskGroup.completedTasks().size() != 1) { throw new IllegalStateException( - "Expected exactly 1 completed RerankingTask, got " + tasks.completedTasks().size()); + "Expected exactly 1 completed RerankingTask, got " + taskGroup.completedTasks().size()); } - var completedTask = tasks.completedTasks().getFirst(); + var completedTask = taskGroup.completedTasks().getFirst(); // add any errors and warnings super.buildCommandResult(); diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/tasks/CompositeTask.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/tasks/CompositeTask.java index 99e21700d4..d4965a6e61 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/tasks/CompositeTask.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/tasks/CompositeTask.java @@ -67,7 +67,7 @@ public CompositeTask( super(position, schemaObject, retryPolicy); this.innerTaskGroup = Objects.requireNonNull(innerTaskGroup, "innerTaskGroup cannot be null"); - if (innerTaskGroup.isEmpty()) { + if (innerTaskGroup.tasks().isEmpty()) { throw new IllegalArgumentException("innerTaskGroup cannot be empty"); } // last task accumulator can be null, if this is an intermediate task @@ -173,7 +173,7 @@ protected void onSuccess(CompositeTaskInnerPage result) { public Task setSkippedIfReady() { // make sure we pass this though to the inner tasks, the CompositeTask has been skipped // so all inner tasks are also skipped - innerTaskGroup.forEach(Task::setSkippedIfReady); + innerTaskGroup.tasks().forEach(Task::setSkippedIfReady); return super.setSkippedIfReady(); } @@ -189,7 +189,7 @@ public List allWarnings() { var allWarnings = new ArrayList<>(super.allWarnings()); allWarnings.addAll( - innerTaskGroup.stream().map(Task::allWarnings).flatMap(List::stream).toList()); + innerTaskGroup.tasks().stream().map(Task::allWarnings).flatMap(List::stream).toList()); return allWarnings; } @@ -204,7 +204,7 @@ public List warningsExcludingSuppressed() { var allWarnings = new ArrayList<>(super.warningsExcludingSuppressed()); allWarnings.addAll( - innerTaskGroup.stream() + innerTaskGroup.tasks().stream() .map(Task::warningsExcludingSuppressed) .flatMap(List::stream) .toList()); diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/tasks/CompositeTaskOuterPage.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/tasks/CompositeTaskOuterPage.java index 0237ee3ef0..ad95b915c6 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/tasks/CompositeTaskOuterPage.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/tasks/CompositeTaskOuterPage.java @@ -34,7 +34,7 @@ public static Accumulator accumulator( @Override public CommandResult get() { - if (!tasks.errorTasks().isEmpty()) { + if (!taskGroup.errorTasks().isEmpty()) { // we have some failed tasks, they are failed CompositeTask's that have lifted errors // from their inner tasks // the superclass will build a basic response with errors and warnings, that is what we need @@ -43,9 +43,7 @@ public CommandResult get() { // the last composite task is the one that will build the results of running all the composite // tasks. - // TODO: AARON - need better guarantee the last task is the last task according to it's position - // etc - return tasks.getLast().lastTaskAccumulator().getResults().get(); + return taskGroup.tasks().getLast().lastTaskAccumulator().getResults().get(); } /** diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/tasks/DBTaskPage.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/tasks/DBTaskPage.java index 367e34ab7f..107640ac1f 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/tasks/DBTaskPage.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/tasks/DBTaskPage.java @@ -8,8 +8,8 @@ public abstract class DBTaskPage, SchemaT extends SchemaObject> extends TaskPage { - protected DBTaskPage(TaskGroup tasks, CommandResultBuilder resultBuilder) { - super(tasks, resultBuilder); + protected DBTaskPage(TaskGroup taskGroup, CommandResultBuilder resultBuilder) { + super(taskGroup, resultBuilder); } /** @@ -19,11 +19,11 @@ protected DBTaskPage(TaskGroup tasks, CommandResultBuilder resul * have the _id or PK to report. */ protected void maybeAddSchema(CommandStatus statusKey) { - if (tasks.isEmpty()) { + if (taskGroup.tasks().isEmpty()) { return; } - tasks.stream() + taskGroup.tasks().stream() .map(DBTask::schemaDescription) .filter(Optional::isPresent) .findFirst() diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/tasks/TaskGroup.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/tasks/TaskGroup.java index fc15058144..a1397527d8 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/tasks/TaskGroup.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/tasks/TaskGroup.java @@ -1,88 +1,171 @@ package io.stargate.sgv2.jsonapi.service.operation.tasks; import io.stargate.sgv2.jsonapi.service.cqldriver.executor.SchemaObject; +import io.stargate.sgv2.jsonapi.util.ClassUtils; import io.stargate.sgv2.jsonapi.util.recordable.Recordable; import java.util.*; /** * Ordered container of {@link Task} to be processed as a group. * - *

The container can be configured to process the tasks sequentially or in parallel (default), + *

The TaskGroup can be configured to process the tasks sequentially or in parallel (default), * and other config that is needed about how to process the tasks as a group should live here (e.g. - * if there is a delay between tasks, or if the container should fail fast). - * - *

TODO: aaron feb 4 - stop inheriting array list, just expose methods needed + * if there is a delay between tasks, or if the TaskGroup should fail fast). See {@link + * #shouldFailFast(TaskT)} * * @param Schema object type */ public class TaskGroup, SchemaT extends SchemaObject> - extends ArrayList implements Recordable { + implements Recordable { + + private static final boolean DEFAULT_SEQUENTIAL_PROCESSING = false; - private boolean sequentialProcessing = false; + // NOTE: this should only be accessed in functions design to mutate the list, e.g. add() + // because we must guarantee Task.position() ordering. + // use tasksView for all other access + private final List mutableTasks; + // unmodifiable view of the tasks, that can be shared outside the class + private final List tasksView; + + private final boolean sequentialProcessing; private final UUID groupId = UUID.randomUUID(); + private TaskGroup(boolean sequentialProcessing, int taskCapacity) { + this.sequentialProcessing = sequentialProcessing; + this.mutableTasks = new ArrayList<>(taskCapacity); + this.tasksView = Collections.unmodifiableList(mutableTasks); + } + + /** Initialize the TaskGroup with parallel processing. */ public TaskGroup() { - super(); + this(DEFAULT_SEQUENTIAL_PROCESSING); } /** - * Initialize the container. + * Initialize the TaskGroup. * * @param sequentialProcessing If true the tasks will be processed sequentially, rather than in - * parallel, and the container will skip any remaining tasks if one fails. If false the tasks + * parallel, and the TaskGroup will skip any remaining tasks if one fails. If false the tasks * will be processed in parallel and all tasks will be processed regardless of the status of - * any other tasks. + * any other tasks. See {@link #shouldFailFast(TaskT)} */ public TaskGroup(boolean sequentialProcessing) { - super(); - this.sequentialProcessing = sequentialProcessing; + this(sequentialProcessing, 1); + // Reads and insertOne will normally only have a single task, optimize for that. } + /** + * Initialize the TaskGroup with a single task, using parallel processing. + * + * @param task + */ public TaskGroup(TaskT task) { - this(List.of(task)); + this(List.of(Objects.requireNonNull(task, "task cannot be null"))); } + /** + * Initialize the TaskGroup with a list of tasks, using parallel processing. + * + * @param tasks Tasks to add to the group + */ public TaskGroup(List tasks) { - super(tasks); + this( + DEFAULT_SEQUENTIAL_PROCESSING, + Objects.requireNonNull(tasks, "tasks cannot be null").size()); + tasks.forEach(this::add); + } + + /** + * Gets an unmodifiable list of the tasks in the TaskGroup, order is ascending {@link + * Task#position()} + * + * @return Unmodifiable list of tasks + */ + public List tasks() { + return tasksView; + } + + /** + * Adds the supplied task to the group, in ascending {@link Task#position()} order + * + * @param task The task to add + */ + public void add(TaskT task) { + Objects.requireNonNull(task, "task cannot be null"); + + // Task is Comparable + int index = Collections.binarySearch(mutableTasks, task, Comparator.naturalOrder()); + if (index >= 0) { + throw new IllegalArgumentException( + "Existing task in TaskGroup with the same position. task=" + task.taskDesc()); + } + + int insertionPoint = -index - 1; + mutableTasks.add(insertionPoint, task); + } + + public void addAll(List tasksToAdd) { + Objects.requireNonNull(tasksToAdd, "tasksToAdd cannot be null"); + tasksToAdd.forEach(this::add); } + // + // public int size() { + // return unmodifiableTaskView.size(); + // } + // + // public boolean isEmpty(){ + // return unmodifiableTaskView.isEmpty(); + // } + // + // /** Gets an unmodifiable iterator of the tasks in the TaskGroup */ + // @Override + // public Iterator iterator() { + // return unmodifiableTaskView.iterator(); + // } + // + // /** Gets an unmodifiable stream of the tasks in the TaskGroup */ + // public Stream stream() { + // return unmodifiableTaskView.stream(); + // } + /** - * @return Returns true if the container is configured for sequential processing. + * @return Returns true if the TaskGroup is configured for sequential processing. */ public boolean getSequentialProcessing() { return sequentialProcessing; } public List errorTasks() { - return stream().filter(task -> task.status() == Task.TaskStatus.ERROR).toList(); + return tasksView.stream().filter(task -> task.status() == Task.TaskStatus.ERROR).toList(); } public boolean allTasksCompleted() { - return stream().allMatch(task -> task.status() == Task.TaskStatus.COMPLETED); + return tasksView.stream().allMatch(task -> task.status() == Task.TaskStatus.COMPLETED); } public List completedTasks() { - return stream().filter(task -> task.status() == Task.TaskStatus.COMPLETED).toList(); + return tasksView.stream().filter(task -> task.status() == Task.TaskStatus.COMPLETED).toList(); } public List skippedTasks() { - return stream().filter(task -> task.status() == Task.TaskStatus.SKIPPED).toList(); + return tasksView.stream().filter(task -> task.status() == Task.TaskStatus.SKIPPED).toList(); } public void throwIfNotAllTerminal() { - var nonTerminalTasks = stream().filter(task -> !task.status().isTerminal()).toList(); + var nonTerminalTasks = tasksView.stream().filter(task -> !task.status().isTerminal()).toList(); if (!nonTerminalTasks.isEmpty()) { - var msg = String.join(", ", nonTerminalTasks.stream().map(Object::toString).toList()); + var msg = String.join(", ", nonTerminalTasks.stream().map(Task::taskDesc).toList()); throw new IllegalStateException( "throwIfNotAllTerminal() - Non terminal Tasks found, non-terminal=" + msg); } } /** - * Determines whether the given {@code targetTask} should fail fast based on the container's - * configuration and the current state of all tasks in this container. + * Determines whether the given {@code targetTask} should fail fast based on the TaskGroup's + * configuration and the current state of all tasks in this TaskGroup. * *

If sequential processing is disabled, this method always returns {@code false} — all tasks * are allowed to run regardless of prior failures. @@ -109,21 +192,25 @@ public void throwIfNotAllTerminal() { * shouldFailFast(e) → true ('c' failed earlier, so 'e' must fail fast) * * - * @param targetTask the task to evaluate - * @return {@code true} if the container is configured for sequential processing and the {@code - * targetTask} (or any earlier task) is in error state; {@code false} otherwise + * @param candidateTask the next candidate task to be processed + * @return {@code true} if the {@code candidateTask} should fail fast, that is it should be + * skipped without execution; {@code false} otherwise. */ - public boolean shouldFailFast(TaskT targetTask) { + public boolean shouldFailFast(TaskT candidateTask) { + Objects.requireNonNull(candidateTask, "candidateTask cannot be null"); + if (!sequentialProcessing) { return false; } + // In sequential processing, if the target task is already in error state, we should fail fast - if (targetTask.status() == BaseTask.TaskStatus.ERROR) { + if (candidateTask.status() == BaseTask.TaskStatus.ERROR) { return true; } // In sequential processing, if any prior task is in error state, we should fail fast - return stream() - .takeWhile(task -> task != targetTask) + // array order is maintained as position() order by the add() method + return tasksView.stream() + .takeWhile(task -> task.position() < candidateTask.position()) .anyMatch(task -> task.status() == BaseTask.TaskStatus.ERROR); } @@ -135,7 +222,7 @@ public String toString() { + ", sequentialProcessing=" + sequentialProcessing + ", size=" - + size() + + tasksView.size() + ", statusCount=" + statusCount() + ", taskType=" @@ -145,29 +232,25 @@ public String toString() { @Override public DataRecorder recordTo(DataRecorder dataRecorder) { - Map statusCount = new HashMap<>(size()); - forEach(task -> statusCount.merge(task.status(), 1, Math::addExact)); - return dataRecorder .append("groupId", groupId) .append("taskType", taskClassName()) .append("sequentialProcessing", sequentialProcessing) - .append("size", size()) + .append("size", tasksView.size()) .append("statusCount", statusCount()) - .append("tasks", List.copyOf(this)); + .append("tasks", List.copyOf(tasksView)); } public Map statusCount() { - Map statusCount = new HashMap<>(size()); - forEach(task -> statusCount.merge(task.status(), 1, Math::addExact)); + Map statusCount = new HashMap<>(tasksView.size()); + tasksView.forEach(task -> statusCount.merge(task.status(), 1, Math::addExact)); return statusCount; } public String taskClassName() { - if (size() == 0) { + if (tasksView.isEmpty()) { return ""; } - var simpleName = get(0).getClass().getSimpleName(); - return simpleName.isBlank() ? get(0).getClass().getName() : simpleName; + return ClassUtils.classSimpleName(tasksView.getFirst().getClass()); } } diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/tasks/TaskOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/tasks/TaskOperation.java index b350cb5c95..725078f826 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/tasks/TaskOperation.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/tasks/TaskOperation.java @@ -128,7 +128,7 @@ Uni> executeInternal( protected Multi startMulti(CommandContext commandContext) { // Common start pattern for all operations - var taskMulti = Multi.createFrom().iterable(taskGroup).onItem(); + var taskMulti = Multi.createFrom().iterable(taskGroup.tasks()).onItem(); if (taskGroup.getSequentialProcessing()) { // Tasks are processed sequentially. If one task fails, subsequent tasks should fail fast @@ -140,12 +140,13 @@ protected Multi startMulti(CommandContext commandContext) { return taskMulti.transformToUniAndConcatenate( task -> { var failFast = taskGroup.shouldFailFast(task); - LOGGER.debug( - "startMulti() - dequeuing task for sequential processing, failFast={}, task={}", - failFast, - task); if (failFast) { + LOGGER.debug( + "startMulti() - dequeuing task for sequential processing, failFast={}, task={}", + failFast, + task); + // Stop processing tasks, but we do not want to return a UniFailure, so we set the // tasks to skipped and do not call execute() on it. task.setSkippedIfReady(); diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/tasks/TaskPage.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/tasks/TaskPage.java index 90b6c5f078..c66172a328 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/tasks/TaskPage.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/tasks/TaskPage.java @@ -4,7 +4,6 @@ import io.stargate.sgv2.jsonapi.api.model.command.CommandResultBuilder; import io.stargate.sgv2.jsonapi.service.cqldriver.executor.SchemaObject; import io.stargate.sgv2.jsonapi.service.operation.Operation; -import java.util.Collections; import java.util.Objects; import java.util.Optional; import java.util.function.Supplier; @@ -22,26 +21,25 @@ public abstract class TaskPage, SchemaT extends SchemaObject> implements Supplier { - protected final TaskGroup tasks; + protected final TaskGroup taskGroup; protected final CommandResultBuilder resultBuilder; /** * Create a new page of {@link Task} results. * - * @param tasks The group of tasks that have been run. + * @param taskGroup The group of tasks that have been run. * @param resultBuilder The builder to use to create the {@link CommandResult}, it will have * already being with any specifics for the response structure. */ - protected TaskPage(TaskGroup tasks, CommandResultBuilder resultBuilder) { + protected TaskPage(TaskGroup taskGroup, CommandResultBuilder resultBuilder) { - this.tasks = Objects.requireNonNull(tasks, "tasks cannot be null"); + this.taskGroup = Objects.requireNonNull(taskGroup, "taskGroup cannot be null"); this.resultBuilder = Objects.requireNonNull(resultBuilder, "resultBuilder cannot be null"); } /** Get the {@link CommandResult} that represents the page of tasks that have been run. */ @Override public CommandResult get() { - Collections.sort(tasks); buildCommandResult(); return resultBuilder.build(); } @@ -59,7 +57,7 @@ protected void addTaskErrorsToResult() { // any driver errors on the task will have been through the DriverExceptionHandler and turned // into ApiExceptions - tasks.errorTasks().stream() + taskGroup.errorTasks().stream() .map(Task::failure) .filter(Optional::isPresent) .map(Optional::get) @@ -67,7 +65,7 @@ protected void addTaskErrorsToResult() { } protected void addTaskWarningsToResult() { - tasks.stream() + taskGroup.tasks().stream() .flatMap(task -> task.warningsExcludingSuppressed().stream()) .forEach(resultBuilder::addWarning); } diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/AlterTypeCommandResolver.java b/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/AlterTypeCommandResolver.java index 983931b991..4c124c5a47 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/AlterTypeCommandResolver.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/AlterTypeCommandResolver.java @@ -96,7 +96,7 @@ public Operation resolveKeyspaceCommand( } // sanity check - if (taskGroup.isEmpty()) { + if (taskGroup.tasks().isEmpty()) { throw SchemaException.Code.MISSING_ALTER_TYPE_OPERATIONS.get(); } diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/operation/tasks/TaskGroupTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/operation/tasks/TaskGroupTest.java index 37d5b0064c..9746d72157 100644 --- a/src/test/java/io/stargate/sgv2/jsonapi/service/operation/tasks/TaskGroupTest.java +++ b/src/test/java/io/stargate/sgv2/jsonapi/service/operation/tasks/TaskGroupTest.java @@ -137,4 +137,50 @@ public void parallelProcessing_noFailFastWithMultipleErrors() { assertThat(group.shouldFailFast(tasks.get(3))).isFalse(); // d assertThat(group.shouldFailFast(tasks.get(4))).isFalse(); // e } + + @Test + public void addTasksInNonPositionOrder() { + TaskGroup seqGroup = new TaskGroup<>(true); + TaskGroup parGroup = new TaskGroup<>(false); + + seqGroup.add(okTask(3)); + parGroup.add(okTask(3)); + seqGroup.add(okTask(0)); + parGroup.add(okTask(0)); + seqGroup.add(okTask(4)); + parGroup.add(okTask(4)); + seqGroup.add(okTask(1)); + parGroup.add(okTask(1)); + seqGroup.add(okTask(2)); + parGroup.add(okTask(2)); + + int numTasks = 5; + assertThat(seqGroup.tasks()).hasSize(numTasks); + assertThat(parGroup.tasks()).hasSize(numTasks); + + for (int i = 0; i < numTasks; i++) { + assertThat(seqGroup.tasks().get(i).position()) + .as("Task in sequential group at matches position:" + i) + .isEqualTo(i); + assertThat(parGroup.tasks().get(i).position()) + .as("Task in parallel group at matches position:" + i) + .isEqualTo(i); + } + + int pos = 0; + for (Task t : seqGroup.tasks()) { + assertThat(t.position()) + .as("Task in sequential group at matches position:" + pos) + .isEqualTo(pos); + pos++; + } + + pos = 0; + for (Task t : seqGroup.tasks()) { + assertThat(t.position()) + .as("Task in sequential group at matches position:" + pos) + .isEqualTo(pos); + pos++; + } + } }