From bb73e44743f615d6e4e542f64ccaae71c7805afc Mon Sep 17 00:00:00 2001 From: Dennis Garding Date: Thu, 2 Apr 2026 15:41:04 +0200 Subject: [PATCH 1/3] fix: MediaProcessingProcessor --- .../Processor/MediaProcessingProcessor.php | 68 ++++- .../MediaProcessingProcessorTest.php | 235 ++++++++++++------ 2 files changed, 221 insertions(+), 82 deletions(-) diff --git a/src/Migration/MessageQueue/Handler/Processor/MediaProcessingProcessor.php b/src/Migration/MessageQueue/Handler/Processor/MediaProcessingProcessor.php index 16ce56703..6ab299c5f 100644 --- a/src/Migration/MessageQueue/Handler/Processor/MediaProcessingProcessor.php +++ b/src/Migration/MessageQueue/Handler/Processor/MediaProcessingProcessor.php @@ -93,6 +93,7 @@ public function process( $currentDataSet = null; $currentCount = 0; $workload = []; + $skipped = []; foreach ($mediaFiles as $mediaFile) { if ($currentDataSet === null) { try { @@ -106,6 +107,12 @@ public function process( ->withEntityId($mediaFile['id']) ->build(FetchDataSetMissingLog::class) ); + + $skipped[] = [ + 'id' => $mediaFile['id'], + 'processFailure' => true, + ]; + continue; } @@ -130,7 +137,23 @@ public function process( ); } - \assert($currentDataSet !== null); + if (!empty($skipped)) { + $this->migrationMediaFileRepo->update($skipped, $context); + } + + + $skippedCount = count($skipped); + if ($currentDataSet === null || empty($workload)) { + $this->finalizeProcessStep( + $context, + $migrationContext, + $run, + $progress, + $skippedCount + ); + + return; + } try { $processor = $this->mediaFileProcessorRegistry->getProcessor($migrationContext); @@ -156,15 +179,14 @@ public function process( ); } - $progress->setCurrentEntityProgress($progress->getCurrentEntityProgress() + \count($workload)); - $progress->setProgress($progress->getProgress() + \count($workload)); - $this->updateProgress($run->getId(), $progress, $context); - - if ($this->isAllMediaProcessed($context, $migrationContext->getRunUuid())) { - $this->runTransitionService->transitionToRunStep($migrationContext->getRunUuid(), MigrationStep::CLEANUP); - } - - $this->bus->dispatch(new MigrationProcessMessage($context, $migrationContext->getRunUuid())); + $workloadCount = count($workload); + $this->finalizeProcessStep( + $context, + $migrationContext, + $run, + $progress, + $workloadCount + $skippedCount + ); } /** @@ -179,8 +201,9 @@ private function getMediaFiles(MigrationContextInterface $migrationContext): arr ->from('swag_migration_media_file') ->where('run_id = :runId') ->andWhere('written = 1') - ->orderBy('entity, file_size') - ->setFirstResult($migrationContext->getOffset()) + ->andWhere('processed = 0') + ->andWhere('process_failure = 0') + ->orderBy('id, file_size, entity') ->setMaxResults($migrationContext->getLimit()) ->setParameter('runId', Uuid::fromHexToBytes($migrationContext->getRunUuid())) ->executeQuery() @@ -240,4 +263,25 @@ private function isAllMediaProcessed(Context $context, string $runId): bool return $unprocessedCount === 0; } + + private function finalizeProcessStep( + Context $context, + MigrationContextInterface $migrationContext, + SwagMigrationRunEntity $run, + MigrationProgress $progress, + int $itemCount, + ): void { + $progress->setCurrentEntityProgress($progress->getCurrentEntityProgress() + $itemCount); + $progress->setProgress($progress->getProgress() + $itemCount); + $this->updateProgress($run->getId(), $progress, $context); + + if ($this->isAllMediaProcessed($context, $migrationContext->getRunUuid())) { + $this->runTransitionService->transitionToRunStep( + $migrationContext->getRunUuid(), + MigrationStep::CLEANUP + ); + } + + $this->bus->dispatch(new MigrationProcessMessage($context, $migrationContext->getRunUuid())); + } } diff --git a/tests/Migration/MessageQueue/Handler/Processor/MediaProcessingProcessorTest.php b/tests/Migration/MessageQueue/Handler/Processor/MediaProcessingProcessorTest.php index a5173b917..e392b79f9 100644 --- a/tests/Migration/MessageQueue/Handler/Processor/MediaProcessingProcessorTest.php +++ b/tests/Migration/MessageQueue/Handler/Processor/MediaProcessingProcessorTest.php @@ -10,6 +10,7 @@ use Doctrine\DBAL\Connection; use Doctrine\DBAL\Query\QueryBuilder; use Doctrine\DBAL\Result; +use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; use Shopware\Core\Framework\Context; use Shopware\Core\Framework\DataAbstractionLayer\EntityCollection; @@ -38,7 +39,9 @@ use SwagMigrationAssistant\Migration\Run\RunTransitionServiceInterface; use SwagMigrationAssistant\Migration\Run\SwagMigrationRunEntity; use SwagMigrationAssistant\Profile\Shopware\DataSelection\DataSet\MediaDataSet; +use SwagMigrationAssistant\Profile\Shopware\DataSelection\DataSet\OrderDocumentDataSet; use SwagMigrationAssistant\Profile\Shopware55\Shopware55Profile; +use Symfony\Component\Messenger\MessageBusInterface; #[Package('fundamentals@after-sales')] class MediaProcessingProcessorTest extends TestCase @@ -108,16 +111,9 @@ protected function setUp(): void $this->dbalConnection = $this->createMock(Connection::class); $this->dbalConnection->method('createQueryBuilder')->willReturn($queryBuilder); - $this->processor = new MediaProcessingProcessor( - $this->createMock(EntityRepository::class), - $this->createMock(EntityRepository::class), - $this->createMock(EntityRepository::class), - $this->createMock(RunTransitionServiceInterface::class), - $this->bus, - $this->createMock(LoggingService::class), - $this->dbalConnection, - $this->createMock(MediaFileProcessorRegistryInterface::class), - $this->createMock(DataSetRegistry::class), + $this->processor = $this->createMediaProcessor( + bus: $this->bus, + dbalConnection: $this->dbalConnection, ); } @@ -148,16 +144,9 @@ public function testTransitionsToNextStepIfNoMediaFiles(): void MigrationStep::CLEANUP ); - $this->processor = new MediaProcessingProcessor( - $this->createMock(EntityRepository::class), - $this->createMock(EntityRepository::class), - $this->createMock(EntityRepository::class), - $runTransitionService, - $this->bus, - $this->createMock(LoggingService::class), - $this->createMock(Connection::class), - $this->createMock(MediaFileProcessorRegistryInterface::class), - $this->createMock(DataSetRegistry::class), + $this->processor = $this->createMediaProcessor( + runTransitionService: $runTransitionService, + bus: $this->bus, ); $this->processor->process( @@ -193,16 +182,11 @@ public function testHandlesDataSetNotFoundExceptionGracefully(): void static::isInstanceOf(FetchDataSetMissingLog::class) ); - $processor = new MediaProcessingProcessor( - $this->createMock(EntityRepository::class), - $this->createMock(EntityRepository::class), - $this->createMock(EntityRepository::class), - $this->createMock(RunTransitionServiceInterface::class), - $this->bus, - $logging, - $this->dbalConnection, - $this->createMock(MediaFileProcessorRegistryInterface::class), - $dataSetRegistry + $processor = $this->createMediaProcessor( + bus: $this->bus, + loggingService: $logging, + dbalConnection: $this->dbalConnection, + dataSetRegistry: $dataSetRegistry ); $processor->process( @@ -244,16 +228,12 @@ public function testHandlesNoConnectionFoundException(): void $dataSetRegistry = $this->createMock(DataSetRegistry::class); $dataSetRegistry->method('getDataSet')->willReturn(new MediaDataSet()); - $processor = new MediaProcessingProcessor( - $this->createMock(EntityRepository::class), - $this->createMock(EntityRepository::class), - $this->createMock(EntityRepository::class), - $this->createMock(RunTransitionServiceInterface::class), - $this->bus, - $logging, - $this->dbalConnection, - $registry, - $dataSetRegistry + $processor = $this->createMediaProcessor( + bus: $this->bus, + loggingService: $logging, + dbalConnection: $this->dbalConnection, + mediaFileProcessorRegistry: $registry, + dataSetRegistry: $dataSetRegistry ); $processor->process( @@ -301,16 +281,11 @@ public function testProcess(): void $dataSetRegistry = $this->createMock(DataSetRegistry::class); $dataSetRegistry->method('getDataSet')->willReturn(new MediaDataSet()); - $processor = new MediaProcessingProcessor( - $this->createMock(EntityRepository::class), - $this->createMock(EntityRepository::class), - $this->createMock(EntityRepository::class), - $this->createMock(RunTransitionServiceInterface::class), - $this->bus, - $this->createMock(LoggingService::class), - $this->dbalConnection, - $processorRegistry, - $dataSetRegistry + $processor = $this->createMediaProcessor( + bus: $this->bus, + dbalConnection: $this->dbalConnection, + mediaFileProcessorRegistry: $processorRegistry, + dataSetRegistry: $dataSetRegistry ); $processor->process( @@ -370,16 +345,11 @@ public function testProcessRetriesUntilNoErrors(): void $dataSetRegistry = $this->createMock(DataSetRegistry::class); $dataSetRegistry->method('getDataSet')->willReturn(new MediaDataSet()); - $processor = new MediaProcessingProcessor( - $this->createMock(EntityRepository::class), - $this->createMock(EntityRepository::class), - $this->createMock(EntityRepository::class), - $this->createMock(RunTransitionServiceInterface::class), - $this->bus, - $this->createMock(LoggingService::class), - $this->dbalConnection, - $processorRegistry, - $dataSetRegistry + $processor = $this->createMediaProcessor( + bus: $this->bus, + dbalConnection: $this->dbalConnection, + mediaFileProcessorRegistry: $processorRegistry, + dataSetRegistry: $dataSetRegistry ); $processor->process( @@ -448,16 +418,13 @@ public function testTransitionsIfAllMediaIsProcessed(): void ) ); - $processor = new MediaProcessingProcessor( - $this->createMock(EntityRepository::class), - $this->createMock(EntityRepository::class), - $migrationMediaFileRepository, - $runTransitionService, - $this->bus, - $this->createMock(LoggingService::class), - $this->dbalConnection, - $processorRegistry, - $dataSetRegistry + $processor = $this->createMediaProcessor( + migrationDataRepo: $migrationMediaFileRepository, + runTransitionService: $runTransitionService, + bus: $this->bus, + dbalConnection: $this->dbalConnection, + mediaFileProcessorRegistry: $processorRegistry, + dataSetRegistry: $dataSetRegistry ); $processor->process( @@ -471,4 +438,132 @@ public function testTransitionsIfAllMediaIsProcessed(): void static::assertSame(1, $this->progress->getProgress()); static::assertSame(101, $this->progress->getCurrentEntityProgress()); } + + public function testFetchOnlyUnprocessedDataWithoutOffset(): void + { + $mediaId = Uuid::randomHex(); + $whereCalls = []; + $orderByCalls = []; + + $this->mediaFiles = [ + [ + 'id' => Uuid::randomBytes(), + 'run_id' => Uuid::randomBytes(), + 'media_id' => Uuid::fromHexToBytes($mediaId), + 'entity' => 'order_document', + 'written' => 1, + 'processed' => 0, + 'process_failure' => 0, + 'file_size' => 0, + ], + ]; + + $result = $this->createMock(Result::class); + $result->method('fetchAllAssociative')->willReturn($this->mediaFiles); + + $queryBuilderMock = $this->createMock(QueryBuilder::class); + $queryBuilderMock->expects($this->never())->method('setFirstResult'); + $queryBuilderMock->method('executeQuery')->willReturn($result); + $queryBuilderMock->method('select')->willReturnSelf(); + $queryBuilderMock->method('from')->willReturnSelf(); + $queryBuilderMock->method('where')->willReturnSelf(); + $queryBuilderMock->method('setMaxResults')->willReturnSelf(); + $queryBuilderMock->method('setParameter')->willReturnSelf(); + + $queryBuilderMock->method('andWhere')->willReturnCallback(function (string $condition) use (&$whereCalls, $queryBuilderMock) { + $whereCalls[] = $condition; + + return $queryBuilderMock; + }); + $queryBuilderMock->method('orderBy')->willReturnCallback(function (string $sort, ?string $order = null) use (&$orderByCalls, $queryBuilderMock) { + $orderByCalls[] = [$sort, $order]; + + return $queryBuilderMock; + }); + + $dbalConnection = $this->createMock(Connection::class); + $dbalConnection->method('createQueryBuilder')->willReturn($queryBuilderMock); + + $processorMock = $this->createMock(MediaFileProcessorInterface::class); + $processorMock->expects($this->once()) + ->method('process') + ->willReturn([ + new MediaProcessWorkloadStruct( + $mediaId, + $this->runEntity->getId(), + MediaProcessWorkloadStruct::FINISH_STATE, + ), + ]); + + $processorRegistry = $this->createMock(MediaFileProcessorRegistryInterface::class); + $processorRegistry->method('getProcessor')->willReturn($processorMock); + + $dataSetRegistry = $this->createMock(DataSetRegistry::class); + $dataSetRegistry->method('getDataSet')->willReturn(new OrderDocumentDataSet()); + + $migrationMediaFileRepository = $this->createMock(EntityRepository::class); + $migrationMediaFileRepository->method('search')->willReturn( + new EntitySearchResult( + SwagMigrationMediaFileEntity::class, + 1, + new EntityCollection(), + null, + new Criteria(), + Context::createDefaultContext() + ) + ); + + $processor = $this->createMediaProcessor( + bus: $this->bus, + dbalConnection: $dbalConnection, + mediaFileProcessorRegistry: $processorRegistry, + dataSetRegistry: $dataSetRegistry, + ); + + $processor->process( + $this->migrationContext, + Context::createDefaultContext(), + $this->runEntity, + $this->progress + ); + + static::assertContains('written = 1', $whereCalls); + static::assertContains('processed = 0', $whereCalls); + static::assertContains('process_failure = 0', $whereCalls); + static::assertSame([['id, file_size, entity', null]], $orderByCalls); + } + + private function createMediaProcessor( + MockObject|EntityRepository|null $migrationRunRepo = null, + MockObject|EntityRepository|null $migrationDataRepo = null, + MockObject|EntityRepository|null $migrationMediaFileRepo = null, + MockObject|RunTransitionServiceInterface|null $runTransitionService = null, + MockObject|MessageBusInterface|null $bus = null, + MockObject|LoggingService|null $loggingService = null, + MockObject|Connection|null $dbalConnection = null, + MockObject|MediaFileProcessorRegistryInterface|null $mediaFileProcessorRegistry = null, + MockObject|DataSetRegistry|null $dataSetRegistry = null, + ) { + $migrationRunRepo ??= $this->createMock(EntityRepository::class); + $migrationDataRepo ??= $this->createMock(EntityRepository::class); + $migrationMediaFileRepo ??= $this->createMock(EntityRepository::class); + $runTransitionService ??= $this->createMock(RunTransitionServiceInterface::class); + $bus ??= $this->createMock(MessageBusInterface::class); + $loggingService ??= $this->createMock(LoggingService::class); + $dbalConnection ??= $this->createMock(Connection::class); + $mediaFileProcessorRegistry ??= $this->createMock(MediaFileProcessorRegistryInterface::class); + $dataSetRegistry ??= $this->createMock(DataSetRegistry::class); + + return new MediaProcessingProcessor( + $migrationRunRepo, + $migrationDataRepo, + $migrationMediaFileRepo, + $runTransitionService, + $bus, + $loggingService, + $dbalConnection, + $mediaFileProcessorRegistry, + $dataSetRegistry + ); + } } From a6fa9f5bd911a49b7529d68d4f06a3e85355f5d8 Mon Sep 17 00:00:00 2001 From: Dennis Garding Date: Thu, 2 Apr 2026 15:52:02 +0200 Subject: [PATCH 2/3] fix cs --- .../Handler/Processor/MediaProcessingProcessor.php | 5 ++--- .../Handler/Processor/MediaProcessingProcessorTest.php | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/Migration/MessageQueue/Handler/Processor/MediaProcessingProcessor.php b/src/Migration/MessageQueue/Handler/Processor/MediaProcessingProcessor.php index 6ab299c5f..98aa5c853 100644 --- a/src/Migration/MessageQueue/Handler/Processor/MediaProcessingProcessor.php +++ b/src/Migration/MessageQueue/Handler/Processor/MediaProcessingProcessor.php @@ -141,8 +141,7 @@ public function process( $this->migrationMediaFileRepo->update($skipped, $context); } - - $skippedCount = count($skipped); + $skippedCount = \count($skipped); if ($currentDataSet === null || empty($workload)) { $this->finalizeProcessStep( $context, @@ -179,7 +178,7 @@ public function process( ); } - $workloadCount = count($workload); + $workloadCount = \count($workload); $this->finalizeProcessStep( $context, $migrationContext, diff --git a/tests/Migration/MessageQueue/Handler/Processor/MediaProcessingProcessorTest.php b/tests/Migration/MessageQueue/Handler/Processor/MediaProcessingProcessorTest.php index e392b79f9..22d41aaef 100644 --- a/tests/Migration/MessageQueue/Handler/Processor/MediaProcessingProcessorTest.php +++ b/tests/Migration/MessageQueue/Handler/Processor/MediaProcessingProcessorTest.php @@ -470,12 +470,12 @@ public function testFetchOnlyUnprocessedDataWithoutOffset(): void $queryBuilderMock->method('setMaxResults')->willReturnSelf(); $queryBuilderMock->method('setParameter')->willReturnSelf(); - $queryBuilderMock->method('andWhere')->willReturnCallback(function (string $condition) use (&$whereCalls, $queryBuilderMock) { + $queryBuilderMock->method('andWhere')->willReturnCallback(static function (string $condition) use (&$whereCalls, $queryBuilderMock) { $whereCalls[] = $condition; return $queryBuilderMock; }); - $queryBuilderMock->method('orderBy')->willReturnCallback(function (string $sort, ?string $order = null) use (&$orderByCalls, $queryBuilderMock) { + $queryBuilderMock->method('orderBy')->willReturnCallback(static function (string $sort, ?string $order = null) use (&$orderByCalls, $queryBuilderMock) { $orderByCalls[] = [$sort, $order]; return $queryBuilderMock; From 604d31f8c6b36dd41ff705f9d10c50a35756c727 Mon Sep 17 00:00:00 2001 From: Dennis Garding Date: Thu, 2 Apr 2026 16:09:42 +0200 Subject: [PATCH 3/3] fix phpstan --- .../MediaProcessingProcessorTest.php | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/tests/Migration/MessageQueue/Handler/Processor/MediaProcessingProcessorTest.php b/tests/Migration/MessageQueue/Handler/Processor/MediaProcessingProcessorTest.php index 22d41aaef..77366529e 100644 --- a/tests/Migration/MessageQueue/Handler/Processor/MediaProcessingProcessorTest.php +++ b/tests/Migration/MessageQueue/Handler/Processor/MediaProcessingProcessorTest.php @@ -22,6 +22,7 @@ use Shopware\Core\Test\Stub\MessageBus\CollectingMessageBus; use SwagMigrationAssistant\Exception\MigrationException; use SwagMigrationAssistant\Migration\Connection\SwagMigrationConnectionEntity; +use SwagMigrationAssistant\Migration\Data\SwagMigrationDataCollection; use SwagMigrationAssistant\Migration\DataSelection\DataSet\DataSetRegistry; use SwagMigrationAssistant\Migration\Logging\Log\FetchDataSetMissingLog; use SwagMigrationAssistant\Migration\Logging\Log\FetchProcessorMissingLog; @@ -29,6 +30,7 @@ use SwagMigrationAssistant\Migration\Media\MediaFileProcessorInterface; use SwagMigrationAssistant\Migration\Media\MediaFileProcessorRegistryInterface; use SwagMigrationAssistant\Migration\Media\MediaProcessWorkloadStruct; +use SwagMigrationAssistant\Migration\Media\SwagMigrationMediaFileCollection; use SwagMigrationAssistant\Migration\Media\SwagMigrationMediaFileEntity; use SwagMigrationAssistant\Migration\MessageQueue\Handler\Processor\MediaProcessingProcessor; use SwagMigrationAssistant\Migration\MigrationContext; @@ -37,6 +39,7 @@ use SwagMigrationAssistant\Migration\Run\ProgressDataSet; use SwagMigrationAssistant\Migration\Run\ProgressDataSetCollection; use SwagMigrationAssistant\Migration\Run\RunTransitionServiceInterface; +use SwagMigrationAssistant\Migration\Run\SwagMigrationRunCollection; use SwagMigrationAssistant\Migration\Run\SwagMigrationRunEntity; use SwagMigrationAssistant\Profile\Shopware\DataSelection\DataSet\MediaDataSet; use SwagMigrationAssistant\Profile\Shopware\DataSelection\DataSet\OrderDocumentDataSet; @@ -533,6 +536,11 @@ public function testFetchOnlyUnprocessedDataWithoutOffset(): void static::assertSame([['id, file_size, entity', null]], $orderByCalls); } + /** + * @param MockObject|EntityRepository|null $migrationRunRepo + * @param MockObject|EntityRepository|null $migrationDataRepo + * @param MockObject|EntityRepository|null $migrationMediaFileRepo + */ private function createMediaProcessor( MockObject|EntityRepository|null $migrationRunRepo = null, MockObject|EntityRepository|null $migrationDataRepo = null, @@ -543,7 +551,7 @@ private function createMediaProcessor( MockObject|Connection|null $dbalConnection = null, MockObject|MediaFileProcessorRegistryInterface|null $mediaFileProcessorRegistry = null, MockObject|DataSetRegistry|null $dataSetRegistry = null, - ) { + ): MediaProcessingProcessor { $migrationRunRepo ??= $this->createMock(EntityRepository::class); $migrationDataRepo ??= $this->createMock(EntityRepository::class); $migrationMediaFileRepo ??= $this->createMock(EntityRepository::class); @@ -554,6 +562,16 @@ private function createMediaProcessor( $mediaFileProcessorRegistry ??= $this->createMock(MediaFileProcessorRegistryInterface::class); $dataSetRegistry ??= $this->createMock(DataSetRegistry::class); + static::assertInstanceOf(EntityRepository::class, $migrationRunRepo); + static::assertInstanceOf(EntityRepository::class, $migrationDataRepo); + static::assertInstanceOf(EntityRepository::class, $migrationMediaFileRepo); + static::assertInstanceOf(RunTransitionServiceInterface::class, $runTransitionService); + static::assertInstanceOf(MessageBusInterface::class, $bus); + static::assertInstanceOf(LoggingService::class, $loggingService); + static::assertInstanceOf(Connection::class, $dbalConnection); + static::assertInstanceOf(MediaFileProcessorRegistryInterface::class, $mediaFileProcessorRegistry); + static::assertInstanceOf(DataSetRegistry::class, $dataSetRegistry); + return new MediaProcessingProcessor( $migrationRunRepo, $migrationDataRepo,