diff --git a/src/Migration/MessageQueue/Handler/Processor/MediaProcessingProcessor.php b/src/Migration/MessageQueue/Handler/Processor/MediaProcessingProcessor.php index 16ce56703..98aa5c853 100644 --- a/src/Migration/MessageQueue/Handler/Processor/MediaProcessingProcessor.php +++ b/src/Migration/MessageQueue/Handler/Processor/MediaProcessingProcessor.php @@ -93,6 +93,7 @@ public function process( $currentDataSet = null; $currentCount = 0; $workload = []; + $skipped = []; foreach ($mediaFiles as $mediaFile) { if ($currentDataSet === null) { try { @@ -106,6 +107,12 @@ public function process( ->withEntityId($mediaFile['id']) ->build(FetchDataSetMissingLog::class) ); + + $skipped[] = [ + 'id' => $mediaFile['id'], + 'processFailure' => true, + ]; + continue; } @@ -130,7 +137,22 @@ public function process( ); } - \assert($currentDataSet !== null); + if (!empty($skipped)) { + $this->migrationMediaFileRepo->update($skipped, $context); + } + + $skippedCount = \count($skipped); + if ($currentDataSet === null || empty($workload)) { + $this->finalizeProcessStep( + $context, + $migrationContext, + $run, + $progress, + $skippedCount + ); + + return; + } try { $processor = $this->mediaFileProcessorRegistry->getProcessor($migrationContext); @@ -156,15 +178,14 @@ public function process( ); } - $progress->setCurrentEntityProgress($progress->getCurrentEntityProgress() + \count($workload)); - $progress->setProgress($progress->getProgress() + \count($workload)); - $this->updateProgress($run->getId(), $progress, $context); - - if ($this->isAllMediaProcessed($context, $migrationContext->getRunUuid())) { - $this->runTransitionService->transitionToRunStep($migrationContext->getRunUuid(), MigrationStep::CLEANUP); - } - - $this->bus->dispatch(new MigrationProcessMessage($context, $migrationContext->getRunUuid())); + $workloadCount = \count($workload); + $this->finalizeProcessStep( + $context, + $migrationContext, + $run, + $progress, + $workloadCount + $skippedCount + ); } /** @@ -179,8 +200,9 @@ private function getMediaFiles(MigrationContextInterface $migrationContext): arr ->from('swag_migration_media_file') ->where('run_id = :runId') ->andWhere('written = 1') - ->orderBy('entity, file_size') - ->setFirstResult($migrationContext->getOffset()) + ->andWhere('processed = 0') + ->andWhere('process_failure = 0') + ->orderBy('id, file_size, entity') ->setMaxResults($migrationContext->getLimit()) ->setParameter('runId', Uuid::fromHexToBytes($migrationContext->getRunUuid())) ->executeQuery() @@ -240,4 +262,25 @@ private function isAllMediaProcessed(Context $context, string $runId): bool return $unprocessedCount === 0; } + + private function finalizeProcessStep( + Context $context, + MigrationContextInterface $migrationContext, + SwagMigrationRunEntity $run, + MigrationProgress $progress, + int $itemCount, + ): void { + $progress->setCurrentEntityProgress($progress->getCurrentEntityProgress() + $itemCount); + $progress->setProgress($progress->getProgress() + $itemCount); + $this->updateProgress($run->getId(), $progress, $context); + + if ($this->isAllMediaProcessed($context, $migrationContext->getRunUuid())) { + $this->runTransitionService->transitionToRunStep( + $migrationContext->getRunUuid(), + MigrationStep::CLEANUP + ); + } + + $this->bus->dispatch(new MigrationProcessMessage($context, $migrationContext->getRunUuid())); + } } diff --git a/tests/Migration/MessageQueue/Handler/Processor/MediaProcessingProcessorTest.php b/tests/Migration/MessageQueue/Handler/Processor/MediaProcessingProcessorTest.php index a5173b917..77366529e 100644 --- a/tests/Migration/MessageQueue/Handler/Processor/MediaProcessingProcessorTest.php +++ b/tests/Migration/MessageQueue/Handler/Processor/MediaProcessingProcessorTest.php @@ -10,6 +10,7 @@ use Doctrine\DBAL\Connection; use Doctrine\DBAL\Query\QueryBuilder; use Doctrine\DBAL\Result; +use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; use Shopware\Core\Framework\Context; use Shopware\Core\Framework\DataAbstractionLayer\EntityCollection; @@ -21,6 +22,7 @@ use Shopware\Core\Test\Stub\MessageBus\CollectingMessageBus; use SwagMigrationAssistant\Exception\MigrationException; use SwagMigrationAssistant\Migration\Connection\SwagMigrationConnectionEntity; +use SwagMigrationAssistant\Migration\Data\SwagMigrationDataCollection; use SwagMigrationAssistant\Migration\DataSelection\DataSet\DataSetRegistry; use SwagMigrationAssistant\Migration\Logging\Log\FetchDataSetMissingLog; use SwagMigrationAssistant\Migration\Logging\Log\FetchProcessorMissingLog; @@ -28,6 +30,7 @@ use SwagMigrationAssistant\Migration\Media\MediaFileProcessorInterface; use SwagMigrationAssistant\Migration\Media\MediaFileProcessorRegistryInterface; use SwagMigrationAssistant\Migration\Media\MediaProcessWorkloadStruct; +use SwagMigrationAssistant\Migration\Media\SwagMigrationMediaFileCollection; use SwagMigrationAssistant\Migration\Media\SwagMigrationMediaFileEntity; use SwagMigrationAssistant\Migration\MessageQueue\Handler\Processor\MediaProcessingProcessor; use SwagMigrationAssistant\Migration\MigrationContext; @@ -36,9 +39,12 @@ use SwagMigrationAssistant\Migration\Run\ProgressDataSet; use SwagMigrationAssistant\Migration\Run\ProgressDataSetCollection; use SwagMigrationAssistant\Migration\Run\RunTransitionServiceInterface; +use SwagMigrationAssistant\Migration\Run\SwagMigrationRunCollection; use SwagMigrationAssistant\Migration\Run\SwagMigrationRunEntity; use SwagMigrationAssistant\Profile\Shopware\DataSelection\DataSet\MediaDataSet; +use SwagMigrationAssistant\Profile\Shopware\DataSelection\DataSet\OrderDocumentDataSet; use SwagMigrationAssistant\Profile\Shopware55\Shopware55Profile; +use Symfony\Component\Messenger\MessageBusInterface; #[Package('fundamentals@after-sales')] class MediaProcessingProcessorTest extends TestCase @@ -108,16 +114,9 @@ protected function setUp(): void $this->dbalConnection = $this->createMock(Connection::class); $this->dbalConnection->method('createQueryBuilder')->willReturn($queryBuilder); - $this->processor = new MediaProcessingProcessor( - $this->createMock(EntityRepository::class), - $this->createMock(EntityRepository::class), - $this->createMock(EntityRepository::class), - $this->createMock(RunTransitionServiceInterface::class), - $this->bus, - $this->createMock(LoggingService::class), - $this->dbalConnection, - $this->createMock(MediaFileProcessorRegistryInterface::class), - $this->createMock(DataSetRegistry::class), + $this->processor = $this->createMediaProcessor( + bus: $this->bus, + dbalConnection: $this->dbalConnection, ); } @@ -148,16 +147,9 @@ public function testTransitionsToNextStepIfNoMediaFiles(): void MigrationStep::CLEANUP ); - $this->processor = new MediaProcessingProcessor( - $this->createMock(EntityRepository::class), - $this->createMock(EntityRepository::class), - $this->createMock(EntityRepository::class), - $runTransitionService, - $this->bus, - $this->createMock(LoggingService::class), - $this->createMock(Connection::class), - $this->createMock(MediaFileProcessorRegistryInterface::class), - $this->createMock(DataSetRegistry::class), + $this->processor = $this->createMediaProcessor( + runTransitionService: $runTransitionService, + bus: $this->bus, ); $this->processor->process( @@ -193,16 +185,11 @@ public function testHandlesDataSetNotFoundExceptionGracefully(): void static::isInstanceOf(FetchDataSetMissingLog::class) ); - $processor = new MediaProcessingProcessor( - $this->createMock(EntityRepository::class), - $this->createMock(EntityRepository::class), - $this->createMock(EntityRepository::class), - $this->createMock(RunTransitionServiceInterface::class), - $this->bus, - $logging, - $this->dbalConnection, - $this->createMock(MediaFileProcessorRegistryInterface::class), - $dataSetRegistry + $processor = $this->createMediaProcessor( + bus: $this->bus, + loggingService: $logging, + dbalConnection: $this->dbalConnection, + dataSetRegistry: $dataSetRegistry ); $processor->process( @@ -244,16 +231,12 @@ public function testHandlesNoConnectionFoundException(): void $dataSetRegistry = $this->createMock(DataSetRegistry::class); $dataSetRegistry->method('getDataSet')->willReturn(new MediaDataSet()); - $processor = new MediaProcessingProcessor( - $this->createMock(EntityRepository::class), - $this->createMock(EntityRepository::class), - $this->createMock(EntityRepository::class), - $this->createMock(RunTransitionServiceInterface::class), - $this->bus, - $logging, - $this->dbalConnection, - $registry, - $dataSetRegistry + $processor = $this->createMediaProcessor( + bus: $this->bus, + loggingService: $logging, + dbalConnection: $this->dbalConnection, + mediaFileProcessorRegistry: $registry, + dataSetRegistry: $dataSetRegistry ); $processor->process( @@ -301,16 +284,11 @@ public function testProcess(): void $dataSetRegistry = $this->createMock(DataSetRegistry::class); $dataSetRegistry->method('getDataSet')->willReturn(new MediaDataSet()); - $processor = new MediaProcessingProcessor( - $this->createMock(EntityRepository::class), - $this->createMock(EntityRepository::class), - $this->createMock(EntityRepository::class), - $this->createMock(RunTransitionServiceInterface::class), - $this->bus, - $this->createMock(LoggingService::class), - $this->dbalConnection, - $processorRegistry, - $dataSetRegistry + $processor = $this->createMediaProcessor( + bus: $this->bus, + dbalConnection: $this->dbalConnection, + mediaFileProcessorRegistry: $processorRegistry, + dataSetRegistry: $dataSetRegistry ); $processor->process( @@ -370,16 +348,11 @@ public function testProcessRetriesUntilNoErrors(): void $dataSetRegistry = $this->createMock(DataSetRegistry::class); $dataSetRegistry->method('getDataSet')->willReturn(new MediaDataSet()); - $processor = new MediaProcessingProcessor( - $this->createMock(EntityRepository::class), - $this->createMock(EntityRepository::class), - $this->createMock(EntityRepository::class), - $this->createMock(RunTransitionServiceInterface::class), - $this->bus, - $this->createMock(LoggingService::class), - $this->dbalConnection, - $processorRegistry, - $dataSetRegistry + $processor = $this->createMediaProcessor( + bus: $this->bus, + dbalConnection: $this->dbalConnection, + mediaFileProcessorRegistry: $processorRegistry, + dataSetRegistry: $dataSetRegistry ); $processor->process( @@ -448,16 +421,13 @@ public function testTransitionsIfAllMediaIsProcessed(): void ) ); - $processor = new MediaProcessingProcessor( - $this->createMock(EntityRepository::class), - $this->createMock(EntityRepository::class), - $migrationMediaFileRepository, - $runTransitionService, - $this->bus, - $this->createMock(LoggingService::class), - $this->dbalConnection, - $processorRegistry, - $dataSetRegistry + $processor = $this->createMediaProcessor( + migrationDataRepo: $migrationMediaFileRepository, + runTransitionService: $runTransitionService, + bus: $this->bus, + dbalConnection: $this->dbalConnection, + mediaFileProcessorRegistry: $processorRegistry, + dataSetRegistry: $dataSetRegistry ); $processor->process( @@ -471,4 +441,147 @@ public function testTransitionsIfAllMediaIsProcessed(): void static::assertSame(1, $this->progress->getProgress()); static::assertSame(101, $this->progress->getCurrentEntityProgress()); } + + public function testFetchOnlyUnprocessedDataWithoutOffset(): void + { + $mediaId = Uuid::randomHex(); + $whereCalls = []; + $orderByCalls = []; + + $this->mediaFiles = [ + [ + 'id' => Uuid::randomBytes(), + 'run_id' => Uuid::randomBytes(), + 'media_id' => Uuid::fromHexToBytes($mediaId), + 'entity' => 'order_document', + 'written' => 1, + 'processed' => 0, + 'process_failure' => 0, + 'file_size' => 0, + ], + ]; + + $result = $this->createMock(Result::class); + $result->method('fetchAllAssociative')->willReturn($this->mediaFiles); + + $queryBuilderMock = $this->createMock(QueryBuilder::class); + $queryBuilderMock->expects($this->never())->method('setFirstResult'); + $queryBuilderMock->method('executeQuery')->willReturn($result); + $queryBuilderMock->method('select')->willReturnSelf(); + $queryBuilderMock->method('from')->willReturnSelf(); + $queryBuilderMock->method('where')->willReturnSelf(); + $queryBuilderMock->method('setMaxResults')->willReturnSelf(); + $queryBuilderMock->method('setParameter')->willReturnSelf(); + + $queryBuilderMock->method('andWhere')->willReturnCallback(static function (string $condition) use (&$whereCalls, $queryBuilderMock) { + $whereCalls[] = $condition; + + return $queryBuilderMock; + }); + $queryBuilderMock->method('orderBy')->willReturnCallback(static function (string $sort, ?string $order = null) use (&$orderByCalls, $queryBuilderMock) { + $orderByCalls[] = [$sort, $order]; + + return $queryBuilderMock; + }); + + $dbalConnection = $this->createMock(Connection::class); + $dbalConnection->method('createQueryBuilder')->willReturn($queryBuilderMock); + + $processorMock = $this->createMock(MediaFileProcessorInterface::class); + $processorMock->expects($this->once()) + ->method('process') + ->willReturn([ + new MediaProcessWorkloadStruct( + $mediaId, + $this->runEntity->getId(), + MediaProcessWorkloadStruct::FINISH_STATE, + ), + ]); + + $processorRegistry = $this->createMock(MediaFileProcessorRegistryInterface::class); + $processorRegistry->method('getProcessor')->willReturn($processorMock); + + $dataSetRegistry = $this->createMock(DataSetRegistry::class); + $dataSetRegistry->method('getDataSet')->willReturn(new OrderDocumentDataSet()); + + $migrationMediaFileRepository = $this->createMock(EntityRepository::class); + $migrationMediaFileRepository->method('search')->willReturn( + new EntitySearchResult( + SwagMigrationMediaFileEntity::class, + 1, + new EntityCollection(), + null, + new Criteria(), + Context::createDefaultContext() + ) + ); + + $processor = $this->createMediaProcessor( + bus: $this->bus, + dbalConnection: $dbalConnection, + mediaFileProcessorRegistry: $processorRegistry, + dataSetRegistry: $dataSetRegistry, + ); + + $processor->process( + $this->migrationContext, + Context::createDefaultContext(), + $this->runEntity, + $this->progress + ); + + static::assertContains('written = 1', $whereCalls); + static::assertContains('processed = 0', $whereCalls); + static::assertContains('process_failure = 0', $whereCalls); + static::assertSame([['id, file_size, entity', null]], $orderByCalls); + } + + /** + * @param MockObject|EntityRepository|null $migrationRunRepo + * @param MockObject|EntityRepository|null $migrationDataRepo + * @param MockObject|EntityRepository|null $migrationMediaFileRepo + */ + private function createMediaProcessor( + MockObject|EntityRepository|null $migrationRunRepo = null, + MockObject|EntityRepository|null $migrationDataRepo = null, + MockObject|EntityRepository|null $migrationMediaFileRepo = null, + MockObject|RunTransitionServiceInterface|null $runTransitionService = null, + MockObject|MessageBusInterface|null $bus = null, + MockObject|LoggingService|null $loggingService = null, + MockObject|Connection|null $dbalConnection = null, + MockObject|MediaFileProcessorRegistryInterface|null $mediaFileProcessorRegistry = null, + MockObject|DataSetRegistry|null $dataSetRegistry = null, + ): MediaProcessingProcessor { + $migrationRunRepo ??= $this->createMock(EntityRepository::class); + $migrationDataRepo ??= $this->createMock(EntityRepository::class); + $migrationMediaFileRepo ??= $this->createMock(EntityRepository::class); + $runTransitionService ??= $this->createMock(RunTransitionServiceInterface::class); + $bus ??= $this->createMock(MessageBusInterface::class); + $loggingService ??= $this->createMock(LoggingService::class); + $dbalConnection ??= $this->createMock(Connection::class); + $mediaFileProcessorRegistry ??= $this->createMock(MediaFileProcessorRegistryInterface::class); + $dataSetRegistry ??= $this->createMock(DataSetRegistry::class); + + static::assertInstanceOf(EntityRepository::class, $migrationRunRepo); + static::assertInstanceOf(EntityRepository::class, $migrationDataRepo); + static::assertInstanceOf(EntityRepository::class, $migrationMediaFileRepo); + static::assertInstanceOf(RunTransitionServiceInterface::class, $runTransitionService); + static::assertInstanceOf(MessageBusInterface::class, $bus); + static::assertInstanceOf(LoggingService::class, $loggingService); + static::assertInstanceOf(Connection::class, $dbalConnection); + static::assertInstanceOf(MediaFileProcessorRegistryInterface::class, $mediaFileProcessorRegistry); + static::assertInstanceOf(DataSetRegistry::class, $dataSetRegistry); + + return new MediaProcessingProcessor( + $migrationRunRepo, + $migrationDataRepo, + $migrationMediaFileRepo, + $runTransitionService, + $bus, + $loggingService, + $dbalConnection, + $mediaFileProcessorRegistry, + $dataSetRegistry + ); + } }