diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java index 9e5190760db..78e944b79e2 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java @@ -52,6 +52,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.james.events.EventBus; import org.apache.james.events.EventListener; +import org.apache.james.events.RegistrationKey; import org.apache.james.mailbox.MailboxManager; import org.apache.james.mailbox.MailboxManager.MessageCapabilities; import org.apache.james.mailbox.MailboxPathLocker; @@ -745,49 +746,47 @@ public Flux copyTo(List sets, StoreMessageManager to } private Mono> copyAll(List sets, StoreMessageManager to, MailboxSession session) { + MessageMoves messageMoves = MessageMoves.builder() + .previousMailboxIds(getMailboxEntity().getMailboxId()) + .targetMailboxIds(to.getMailboxEntity().getMailboxId(), getMailboxEntity().getMailboxId()) + .build(); + ImmutableSet registrationKeys = messageMoves.impactedMailboxIds() + .map(MailboxIdRegistrationKey::new) + .collect(ImmutableSet.toImmutableSet()); + return Flux.fromIterable(sets) .concatMap(set -> retrieveOriginalRows(set, session)) .window(batchSizes.getCopyBatchSize().orElse(Integer.MAX_VALUE)) .concatMap(window -> window.collectList() .flatMap(originalRows -> to.copy(originalRows, session).collectList() - .map(copyResult -> Pair.of( - collectMetadata(copyResult.iterator()), - originalRows.stream() + .flatMap(copyResult -> { + SortedMap copiedUids = collectMetadata(copyResult.iterator()); + ImmutableList messageIds = originalRows.stream() .map(org.apache.james.mailbox.store.mail.model.Message::getMessageId) - .collect(ImmutableList.toImmutableList()))))) - .collectList() - .flatMap(allResults -> { - if (allResults.isEmpty()) { - return Mono.just(ImmutableSortedMap.of()); - } - SortedMap allCopiedUids = new TreeMap<>(); - List allMessageIds = new ArrayList<>(); - for (Pair, ImmutableList> result : allResults) { - allCopiedUids.putAll(result.getLeft()); - allMessageIds.addAll(result.getRight()); - } - MessageMoves messageMoves = MessageMoves.builder() - .previousMailboxIds(getMailboxEntity().getMailboxId()) - .targetMailboxIds(to.getMailboxEntity().getMailboxId(), getMailboxEntity().getMailboxId()) - .build(); - EventBus.EventWithRegistrationKey added = new EventBus.EventWithRegistrationKey( - EventFactory.added() - .randomEventId() - .mailboxSession(session) - .mailbox(to.getMailboxEntity()) - .metaData(allCopiedUids) - .isDelivery(!IS_DELIVERY) - .isAppended(!IS_APPENDED) - .build(), - ImmutableSet.of(new MailboxIdRegistrationKey(to.getMailboxEntity().getMailboxId()))); - EventBus.EventWithRegistrationKey moved = new EventBus.EventWithRegistrationKey( - EventFactory.moved() - .messageMoves(messageMoves) - .messageId(allMessageIds) - .session(session) - .build(), - messageMoves.impactedMailboxIds().map(MailboxIdRegistrationKey::new).collect(ImmutableSet.toImmutableSet())); - return Mono.from(eventBus.dispatch(ImmutableList.of(added, moved))).thenReturn(allCopiedUids); + .collect(ImmutableList.toImmutableList()); + EventBus.EventWithRegistrationKey added = new EventBus.EventWithRegistrationKey( + EventFactory.added() + .randomEventId() + .mailboxSession(session) + .mailbox(to.getMailboxEntity()) + .metaData(copiedUids) + .isDelivery(!IS_DELIVERY) + .isAppended(!IS_APPENDED) + .build(), + ImmutableSet.of(new MailboxIdRegistrationKey(to.getMailboxEntity().getMailboxId()))); + EventBus.EventWithRegistrationKey moved = new EventBus.EventWithRegistrationKey( + EventFactory.moved() + .messageMoves(messageMoves) + .messageId(messageIds) + .session(session) + .build(), + registrationKeys); + return Mono.from(eventBus.dispatch(ImmutableList.of(added, moved))) + .thenReturn(copiedUids); + }))) + .reduce(new TreeMap<>(), (acc, batchUids) -> { + acc.putAll(batchUids); + return acc; }); } @@ -834,63 +833,55 @@ public Flux moveTo(List sets, StoreMessageManager to } private Mono> moveAll(List sets, StoreMessageManager to, MailboxSession session) { + MessageMoves messageMoves = MessageMoves.builder() + .previousMailboxIds(getMailboxEntity().getMailboxId()) + .targetMailboxIds(to.getMailboxEntity().getMailboxId()) + .build(); + ImmutableSet registrationKeys = messageMoves.impactedMailboxIds() + .map(MailboxIdRegistrationKey::new) + .collect(ImmutableSet.toImmutableSet()); + return Flux.fromIterable(sets) .concatMap(set -> retrieveOriginalRows(set, session)) .window(batchSizes.getCopyBatchSize().orElse(Integer.MAX_VALUE)) .concatMap(window -> window .collectList() .flatMap(originalRows -> to.move(originalRows, session) - .map(moveResult -> Pair.of(moveResult, originalRows)))) - .collectList() - .flatMap(allResults -> { - if (allResults.isEmpty()) { - return Mono.just(ImmutableSortedMap.of()); - } - - SortedMap allMoveUids = new TreeMap<>(); - List allOriginalMessages = new ArrayList<>(); - List allMessageIds = new ArrayList<>(); - - for (Pair> result : allResults) { - allMoveUids.putAll(collectMetadata(result.getLeft().getMovedMessages().iterator())); - allOriginalMessages.addAll(result.getLeft().getOriginalMessages()); - result.getRight().stream() - .map(org.apache.james.mailbox.store.mail.model.Message::getMessageId) - .forEach(allMessageIds::add); - } - - MessageMoves messageMoves = MessageMoves.builder() - .previousMailboxIds(getMailboxEntity().getMailboxId()) - .targetMailboxIds(to.getMailboxEntity().getMailboxId()) - .build(); - - EventBus.EventWithRegistrationKey added = new EventBus.EventWithRegistrationKey(EventFactory.added() - .randomEventId() - .mailboxSession(session) - .mailbox(to.getMailboxEntity()) - .metaData(allMoveUids) - .isDelivery(!IS_DELIVERY) - .isAppended(!IS_APPENDED) - .movedFrom(getId()) - .build(), - ImmutableSet.of(new MailboxIdRegistrationKey(to.getMailboxEntity().getMailboxId()))); - EventBus.EventWithRegistrationKey expunged = new EventBus.EventWithRegistrationKey(EventFactory.expunged() - .randomEventId() - .mailboxSession(session) - .mailbox(getMailboxEntity()) - .addMetaData(allOriginalMessages) - .movedTo(to.getId()) - .build(), - ImmutableSet.of(new MailboxIdRegistrationKey(mailbox.getMailboxId()))); - EventBus.EventWithRegistrationKey moved = new EventBus.EventWithRegistrationKey(EventFactory.moved() - .messageMoves(messageMoves) - .messageId(allMessageIds) - .session(session) - .build(), - messageMoves.impactedMailboxIds().map(MailboxIdRegistrationKey::new).collect(ImmutableSet.toImmutableSet())); - - return Mono.from(eventBus.dispatch(ImmutableList.of(added, expunged, moved))) - .thenReturn(allMoveUids); + .flatMap(moveResult -> { + SortedMap moveUids = collectMetadata(moveResult.getMovedMessages().iterator()); + ImmutableList messageIds = originalRows.stream() + .map(org.apache.james.mailbox.store.mail.model.Message::getMessageId) + .collect(ImmutableList.toImmutableList()); + EventBus.EventWithRegistrationKey added = new EventBus.EventWithRegistrationKey(EventFactory.added() + .randomEventId() + .mailboxSession(session) + .mailbox(to.getMailboxEntity()) + .metaData(moveUids) + .isDelivery(!IS_DELIVERY) + .isAppended(!IS_APPENDED) + .movedFrom(getId()) + .build(), + ImmutableSet.of(new MailboxIdRegistrationKey(to.getMailboxEntity().getMailboxId()))); + EventBus.EventWithRegistrationKey expunged = new EventBus.EventWithRegistrationKey(EventFactory.expunged() + .randomEventId() + .mailboxSession(session) + .mailbox(getMailboxEntity()) + .addMetaData(moveResult.getOriginalMessages()) + .movedTo(to.getId()) + .build(), + ImmutableSet.of(new MailboxIdRegistrationKey(mailbox.getMailboxId()))); + EventBus.EventWithRegistrationKey moved = new EventBus.EventWithRegistrationKey(EventFactory.moved() + .messageMoves(messageMoves) + .messageId(messageIds) + .session(session) + .build(), + registrationKeys); + return Mono.from(eventBus.dispatch(ImmutableList.of(added, expunged, moved))) + .thenReturn(moveUids); + }))) + .reduce(new TreeMap<>(), (acc, batchUids) -> { + acc.putAll(batchUids); + return acc; }); } diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/service/RestoreServiceTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/service/RestoreServiceTest.java index 5eab10a74e4..121d78d75d7 100644 --- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/service/RestoreServiceTest.java +++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/service/RestoreServiceTest.java @@ -24,6 +24,8 @@ import static org.apache.james.webadmin.service.ExportServiceTestSystem.CEDRIC; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.awaitility.Durations.FIVE_HUNDRED_MILLISECONDS; +import static org.awaitility.Durations.ONE_MINUTE; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doThrow; @@ -49,6 +51,8 @@ import org.apache.james.mailbox.model.MessageResultIterator; import org.apache.james.task.Task; import org.assertj.core.api.SoftAssertions; +import org.awaitility.Awaitility; +import org.awaitility.core.ConditionFactory; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -65,6 +69,10 @@ class RestoreServiceTest { "Content-Type: text/plain; charset=UTF-8\r\n" + "\r\n" + "testmail"; + private static final ConditionFactory AWAIT = Awaitility.await() + .atMost(ONE_MINUTE) + .with() + .pollInterval(FIVE_HUNDRED_MILLISECONDS); private RestoreService testee; private ExportServiceTestSystem testSystem; @@ -300,8 +308,9 @@ void restoreShouldDeleteBlobAfterCompletion() throws Exception { testee.restore(CEDRIC, blobId).block(); - assertThatThrownBy(() -> testSystem.blobStore.read(testSystem.blobStore.getDefaultBucketName(), blobId)) - .isInstanceOf(ObjectNotFoundException.class); + AWAIT.untilAsserted(() -> + assertThatThrownBy(() -> testSystem.blobStore.read(testSystem.blobStore.getDefaultBucketName(), blobId)) + .isInstanceOf(ObjectNotFoundException.class)); } private ComposedMessageId createAMailboxWithAMail() throws MailboxException {