From 785a86c2a608cc70800073f9a39440d3de39abc2 Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Tue, 10 Feb 2026 11:12:16 +0100 Subject: [PATCH 1/2] feat: partitioned multiple stream support --- .../Projecting/BackfillExecutorHandler.php | 2 +- .../Projecting/Config/ProjectingModule.php | 2 +- .../InMemoryEventStoreStreamSource.php | 22 +- .../InMemoryProjectionStateStorage.php | 26 +- .../InMemory/InMemoryStreamSource.php | 2 +- .../src/Projecting/ProjectingManager.php | 31 +- .../Projecting/ProjectionPartitionState.php | 7 +- .../src/Projecting/ProjectionStateStorage.php | 4 +- .../Ecotone/src/Projecting/StreamSource.php | 2 +- .../InMemoryEventStoreRegistrationTest.php | 2 +- .../EventStreamingProjectionTest.php | 2 +- .../tests/Projecting/ProjectingTest.php | 182 ++++++++++-- .../Database/ProjectionStateTableManager.php | 6 +- .../DbalProjectionStateStorage.php | 38 +-- .../EventStoreAggregateStreamSource.php | 20 +- .../EventStoreGlobalStreamSource.php | 2 +- spec.md | 267 ++++++++++++++++++ 17 files changed, 519 insertions(+), 98 deletions(-) create mode 100644 spec.md diff --git a/packages/Ecotone/src/Projecting/BackfillExecutorHandler.php b/packages/Ecotone/src/Projecting/BackfillExecutorHandler.php index 5e2385ec4..421ecf672 100644 --- a/packages/Ecotone/src/Projecting/BackfillExecutorHandler.php +++ b/packages/Ecotone/src/Projecting/BackfillExecutorHandler.php @@ -46,7 +46,7 @@ public function executeBackfillBatch( $streamFilter = new StreamFilter($streamName, $aggregateType, $eventStoreReferenceName); foreach ($projectingManager->getPartitionProvider()->partitions($streamFilter, $limit, $offset) as $partition) { - $projectingManager->execute($partition, true); + $projectingManager->execute($partition, $streamName, true); if ($this->terminationListener->shouldTerminate()) { break; } diff --git a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php index 74131856f..c99d7b991 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php @@ -100,7 +100,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO ->chainInterceptedProcessor( MethodInvokerBuilder::create( $projectingManagerReference, - InterfaceToCallReference::create(ProjectingManager::class, 'execute'), + InterfaceToCallReference::create(ProjectingManager::class, 'executeAllStreams'), [ $projectionBuilder->partitionHeader() ? HeaderBuilder::create('partitionKeyValue', $projectionBuilder->partitionHeader()) diff --git a/packages/Ecotone/src/Projecting/InMemory/InMemoryEventStoreStreamSource.php b/packages/Ecotone/src/Projecting/InMemory/InMemoryEventStoreStreamSource.php index dc4c7d5e2..d2d277ef6 100644 --- a/packages/Ecotone/src/Projecting/InMemory/InMemoryEventStoreStreamSource.php +++ b/packages/Ecotone/src/Projecting/InMemory/InMemoryEventStoreStreamSource.php @@ -41,15 +41,12 @@ public function canHandle(string $projectionName): bool return $this->projectionNames === null || in_array($projectionName, $this->projectionNames, true); } - public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage + public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey, string $streamName): StreamPage { - // Position is 0-based index into the global event array (like InMemoryStreamSource) $from = $lastPosition !== null ? (int) $lastPosition : 0; - // Determine which streams to read from - $streams = $this->getStreamsToRead(); + $streams = $this->getStreamsToRead($streamName); - // Collect all events from all streams $allEvents = []; foreach ($streams as $stream) { if (! $this->eventStore->hasStream($stream)) { @@ -62,36 +59,37 @@ public function load(string $projectionName, ?string $lastPosition, int $count, ->withMetadataMatch($this->partitionHeader, Operator::EQUALS, $partitionKey); } - // Filter by event names if specified (optimization for partitioned projections) if ($this->eventNames !== []) { $metadataMatcher = $metadataMatcher ->withMetadataMatch('event_name', Operator::IN, $this->eventNames, FieldType::MESSAGE_PROPERTY); } - // Load all events from this stream (starting from position 1) $events = $this->eventStore->load($stream, 1, null, $metadataMatcher); $allEvents = array_merge($allEvents, is_array($events) ? $events : iterator_to_array($events)); } - // Slice based on global position $events = array_slice($allEvents, $from, $count); $to = $from + count($events); return new StreamPage($events, (string) $to); } - private function getStreamsToRead(): array + /** + * @return array + */ + private function getStreamsToRead(string $streamName): array { + if ($streamName !== '') { + return [$streamName]; + } + if ($this->streamName !== null) { return [$this->streamName]; } - // Read from all streams (global stream) $reflection = new ReflectionProperty($this->eventStore, 'streams'); - $reflection->setAccessible(true); $allStreams = array_keys($reflection->getValue($this->eventStore)); - // Filter out internal streams (starting with $) return array_filter($allStreams, fn ($stream) => ! str_starts_with($stream, '$')); } } diff --git a/packages/Ecotone/src/Projecting/InMemory/InMemoryProjectionStateStorage.php b/packages/Ecotone/src/Projecting/InMemory/InMemoryProjectionStateStorage.php index 4441ee208..df5302c29 100644 --- a/packages/Ecotone/src/Projecting/InMemory/InMemoryProjectionStateStorage.php +++ b/packages/Ecotone/src/Projecting/InMemory/InMemoryProjectionStateStorage.php @@ -35,18 +35,18 @@ public function canHandle(string $projectionName): bool return $this->projectionNames === null || in_array($projectionName, $this->projectionNames, true); } - public function loadPartition(string $projectionName, ?string $partitionKey = null, bool $lock = true): ?ProjectionPartitionState + public function loadPartition(string $projectionName, ?string $partitionKey, string $streamName, bool $lock = true): ?ProjectionPartitionState { - $key = $this->getKey($projectionName, $partitionKey); + $key = $this->getKey($projectionName, $partitionKey, $streamName); return $this->projectionStates[$key] ?? null; } - public function initPartition(string $projectionName, ?string $partitionKey = null): ?ProjectionPartitionState + public function initPartition(string $projectionName, ?string $partitionKey, string $streamName): ?ProjectionPartitionState { - $key = $this->getKey($projectionName, $partitionKey); + $key = $this->getKey($projectionName, $partitionKey, $streamName); if (! isset($this->projectionStates[$key])) { - $this->projectionStates[$key] = new ProjectionPartitionState($projectionName, $partitionKey, null, null, ProjectionInitializationStatus::UNINITIALIZED); + $this->projectionStates[$key] = new ProjectionPartitionState($projectionName, $partitionKey, $streamName, null, null, ProjectionInitializationStatus::UNINITIALIZED); return $this->projectionStates[$key]; } @@ -55,21 +55,25 @@ public function initPartition(string $projectionName, ?string $partitionKey = nu public function savePartition(ProjectionPartitionState $projectionState): void { - $key = $this->getKey($projectionState->projectionName, $projectionState->partitionKey); + $key = $this->getKey($projectionState->projectionName, $projectionState->partitionKey, $projectionState->streamName); $this->projectionStates[$key] = $projectionState; } - private function getKey(string $projectionName, ?string $partitionKey): string + private function getKey(string $projectionName, ?string $partitionKey, string $streamName): string { - if ($partitionKey === null) { - return $projectionName; + $key = $projectionName; + if ($streamName !== '') { + $key .= '::' . $streamName; } - return $projectionName . '-' . $partitionKey; + if ($partitionKey !== null) { + $key .= '-' . $partitionKey; + } + return $key; } public function delete(string $projectionName): void { - $projectionStartKey = $this->getKey($projectionName, null); + $projectionStartKey = $projectionName; foreach ($this->projectionStates as $key => $value) { if (str_starts_with($key, $projectionStartKey)) { unset($this->projectionStates[$key]); diff --git a/packages/Ecotone/src/Projecting/InMemory/InMemoryStreamSource.php b/packages/Ecotone/src/Projecting/InMemory/InMemoryStreamSource.php index 9c869d28f..2b6bdf1bb 100644 --- a/packages/Ecotone/src/Projecting/InMemory/InMemoryStreamSource.php +++ b/packages/Ecotone/src/Projecting/InMemory/InMemoryStreamSource.php @@ -39,7 +39,7 @@ public function append(Event ...$events): void } } - public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage + public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey, string $streamName): StreamPage { $from = $lastPosition !== null ? (int) $lastPosition : 0; diff --git a/packages/Ecotone/src/Projecting/ProjectingManager.php b/packages/Ecotone/src/Projecting/ProjectingManager.php index 93eb76d6b..f8093b5ba 100644 --- a/packages/Ecotone/src/Projecting/ProjectingManager.php +++ b/packages/Ecotone/src/Projecting/ProjectingManager.php @@ -48,28 +48,36 @@ private function getProjectionStateStorage(): ProjectionStateStorage return $this->projectionStateStorage; } - public function execute(?string $partitionKeyValue = null, bool $manualInitialization = false): void + public function execute(?string $partitionKeyValue, string $streamName, bool $manualInitialization = false): void { do { - $processedEvents = $this->executeSingleBatch($partitionKeyValue, $manualInitialization || $this->automaticInitialization); + $processedEvents = $this->executeSingleBatch($partitionKeyValue, $streamName, $manualInitialization || $this->automaticInitialization); } while ($processedEvents > 0 && $this->terminationListener->shouldTerminate() !== true); } + public function executeAllStreams(?string $partitionKeyValue = null, bool $manualInitialization = false): void + { + $streamFilters = $this->streamFilterRegistry->provide($this->projectionName); + foreach ($streamFilters as $streamFilter) { + $this->execute($partitionKeyValue, $streamFilter->streamName, $manualInitialization); + } + } + /** * @return int Number of processed events */ - private function executeSingleBatch(?string $partitionKeyValue, bool $canInitialize): int + private function executeSingleBatch(?string $partitionKeyValue, string $streamName, bool $canInitialize): int { $transaction = $this->getProjectionStateStorage()->beginTransaction(); try { - $projectionState = $this->loadOrInitializePartitionState($partitionKeyValue, $canInitialize); + $projectionState = $this->loadOrInitializePartitionState($partitionKeyValue, $streamName, $canInitialize); if ($projectionState === null) { $transaction->commit(); return 0; } $streamSource = $this->streamSourceRegistry->getFor($this->projectionName); - $streamPage = $streamSource->load($this->projectionName, $projectionState->lastPosition, $this->eventLoadingBatchSize, $partitionKeyValue); + $streamPage = $streamSource->load($this->projectionName, $projectionState->lastPosition, $this->eventLoadingBatchSize, $partitionKeyValue, $streamName); $userState = $projectionState->userState; $processedEvents = 0; @@ -86,7 +94,6 @@ private function executeSingleBatch(?string $partitionKeyValue, bool $canInitial ->withUserState($userState); if ($processedEvents === 0 && $canInitialize) { - // If we are forcing execution and there are no new events, we still want to enable the projection if it was uninitialized $projectionState = $projectionState->withStatus(ProjectionInitializationStatus::INITIALIZED); } @@ -99,9 +106,9 @@ private function executeSingleBatch(?string $partitionKeyValue, bool $canInitial } } - public function loadState(?string $partitionKey = null): ProjectionPartitionState + public function loadState(?string $partitionKey, string $streamName): ProjectionPartitionState { - return $this->getProjectionStateStorage()->loadPartition($this->projectionName, $partitionKey); + return $this->getProjectionStateStorage()->loadPartition($this->projectionName, $partitionKey, $streamName); } public function getPartitionProvider(): PartitionProvider @@ -193,10 +200,10 @@ public function backfill(): void $this->prepareBackfill(); } - private function loadOrInitializePartitionState(?string $partitionKey, bool $canInitialize): ?ProjectionPartitionState + private function loadOrInitializePartitionState(?string $partitionKey, string $streamName, bool $canInitialize): ?ProjectionPartitionState { $storage = $this->getProjectionStateStorage(); - $projectionState = $storage->loadPartition($this->projectionName, $partitionKey); + $projectionState = $storage->loadPartition($this->projectionName, $partitionKey, $streamName); if (! $canInitialize && $projectionState?->status === ProjectionInitializationStatus::UNINITIALIZED) { return null; @@ -206,11 +213,11 @@ private function loadOrInitializePartitionState(?string $partitionKey, bool $can } if ($canInitialize) { - $projectionState = $storage->initPartition($this->projectionName, $partitionKey); + $projectionState = $storage->initPartition($this->projectionName, $partitionKey, $streamName); if ($projectionState) { $this->projectorExecutor->init(); } else { - $projectionState = $storage->loadPartition($this->projectionName, $partitionKey); + $projectionState = $storage->loadPartition($this->projectionName, $partitionKey, $streamName); } return $projectionState; } diff --git a/packages/Ecotone/src/Projecting/ProjectionPartitionState.php b/packages/Ecotone/src/Projecting/ProjectionPartitionState.php index fa606c94c..eb38a5acf 100644 --- a/packages/Ecotone/src/Projecting/ProjectionPartitionState.php +++ b/packages/Ecotone/src/Projecting/ProjectionPartitionState.php @@ -12,6 +12,7 @@ class ProjectionPartitionState public function __construct( public readonly string $projectionName, public readonly ?string $partitionKey, + public readonly string $streamName, public readonly ?string $lastPosition = null, public readonly mixed $userState = null, public readonly ?ProjectionInitializationStatus $status = null, @@ -20,16 +21,16 @@ public function __construct( public function withLastPosition(string $lastPosition): self { - return new self($this->projectionName, $this->partitionKey, $lastPosition, $this->userState, $this->status); + return new self($this->projectionName, $this->partitionKey, $this->streamName, $lastPosition, $this->userState, $this->status); } public function withUserState(mixed $userState): self { - return new self($this->projectionName, $this->partitionKey, $this->lastPosition, $userState, $this->status); + return new self($this->projectionName, $this->partitionKey, $this->streamName, $this->lastPosition, $userState, $this->status); } public function withStatus(ProjectionInitializationStatus $status): self { - return new self($this->projectionName, $this->partitionKey, $this->lastPosition, $this->userState, $status); + return new self($this->projectionName, $this->partitionKey, $this->streamName, $this->lastPosition, $this->userState, $status); } } diff --git a/packages/Ecotone/src/Projecting/ProjectionStateStorage.php b/packages/Ecotone/src/Projecting/ProjectionStateStorage.php index a1bbcf4eb..f9c7ee65b 100644 --- a/packages/Ecotone/src/Projecting/ProjectionStateStorage.php +++ b/packages/Ecotone/src/Projecting/ProjectionStateStorage.php @@ -10,8 +10,8 @@ interface ProjectionStateStorage { public function canHandle(string $projectionName): bool; - public function loadPartition(string $projectionName, ?string $partitionKey = null, bool $lock = true): ?ProjectionPartitionState; - public function initPartition(string $projectionName, ?string $partitionKey = null): ?ProjectionPartitionState; + public function loadPartition(string $projectionName, ?string $partitionKey, string $streamName, bool $lock = true): ?ProjectionPartitionState; + public function initPartition(string $projectionName, ?string $partitionKey, string $streamName): ?ProjectionPartitionState; public function savePartition(ProjectionPartitionState $projectionState): void; public function delete(string $projectionName): void; public function init(string $projectionName): void; diff --git a/packages/Ecotone/src/Projecting/StreamSource.php b/packages/Ecotone/src/Projecting/StreamSource.php index 3d78189a7..7d0900989 100644 --- a/packages/Ecotone/src/Projecting/StreamSource.php +++ b/packages/Ecotone/src/Projecting/StreamSource.php @@ -11,5 +11,5 @@ interface StreamSource { public function canHandle(string $projectionName): bool; - public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage; + public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey, string $streamName): StreamPage; } diff --git a/packages/Ecotone/tests/Lite/InMemoryEventStoreRegistrationTest.php b/packages/Ecotone/tests/Lite/InMemoryEventStoreRegistrationTest.php index 04898a315..315b1f309 100644 --- a/packages/Ecotone/tests/Lite/InMemoryEventStoreRegistrationTest.php +++ b/packages/Ecotone/tests/Lite/InMemoryEventStoreRegistrationTest.php @@ -110,7 +110,7 @@ public function canHandle(string $projectionName): bool return true; } - public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage + public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey, string $streamName): StreamPage { $from = $lastPosition !== null ? (int) $lastPosition : 0; $events = array_slice($this->events, $from, $count); diff --git a/packages/Ecotone/tests/Projecting/EventStreamingProjectionTest.php b/packages/Ecotone/tests/Projecting/EventStreamingProjectionTest.php index 2703e841a..597aa8702 100644 --- a/packages/Ecotone/tests/Projecting/EventStreamingProjectionTest.php +++ b/packages/Ecotone/tests/Projecting/EventStreamingProjectionTest.php @@ -294,7 +294,7 @@ public function test_event_driven_projection_combined_with_event_streaming_proje $positionTracker = new InMemoryConsumerPositionTracker(); // Given an event-driven projection (catches up from stream when triggered) - $eventDrivenProjection = new #[ProjectionV2('event_driven_product_count'), FromStream('test_stream')] class { + $eventDrivenProjection = new #[ProjectionV2('event_driven_product_count'), FromStream('product_stream')] class { public int $productCount = 0; #[EventHandler] diff --git a/packages/Ecotone/tests/Projecting/ProjectingTest.php b/packages/Ecotone/tests/Projecting/ProjectingTest.php index a6fbf6c07..ac2962b31 100644 --- a/packages/Ecotone/tests/Projecting/ProjectingTest.php +++ b/packages/Ecotone/tests/Projecting/ProjectingTest.php @@ -66,7 +66,7 @@ public function handle(array $event): void ->addExtensionObject(SimpleMessageChannelBuilder::createQueueChannel('async')) ); - $ecotone->withEvents([Event::createWithType('test-event', ['name' => 'Test'])]); + $ecotone->withEventStream('test_stream', [Event::createWithType('test-event', ['name' => 'Test'])]); // When event is published, triggering the projection $ecotone->publishEventWithRoutingKey('trigger', []); @@ -104,7 +104,7 @@ public function canHandle(string $projectionName): bool return true; } - public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage + public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey, string $streamName): StreamPage { $from = $lastPosition !== null ? (int) $lastPosition : 0; if ($partitionKey) { @@ -165,7 +165,7 @@ public function canHandle(string $projectionName): bool return true; } - public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage + public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey, string $streamName): StreamPage { $from = $lastPosition !== null ? (int) $lastPosition : 0; if ($partitionKey) { @@ -235,7 +235,7 @@ public function init(): void ->withLicenceKey(LicenceTesting::VALID_LICENCE) ); - $ecotone->withEvents([ + $ecotone->withEventStream('test_stream', [ Event::createWithType($projection::TICKET_CREATED, [], [MessageHeaders::EVENT_AGGREGATE_ID => 'ticket-1']), Event::createWithType($projection::TICKET_CREATED, [], [MessageHeaders::EVENT_AGGREGATE_ID => 'ticket-4']), ]); @@ -265,7 +265,7 @@ public function on(array $event): void ->withLicenceKey(LicenceTesting::VALID_LICENCE) ); - $ecotone->withEvents([ + $ecotone->withEventStream('test_stream', [ Event::createWithType($projection::TICKET_CREATED, [], [MessageHeaders::EVENT_AGGREGATE_ID => 'ticket-1']), ]); @@ -277,7 +277,7 @@ public function on(array $event): void self::assertCount(1, $projection->projectedEvents, 'Projection should have processed previous events after manual initialization'); // Now events should be processed since projection is initialized - $ecotone->withEvents([ + $ecotone->withEventStream('test_stream', [ Event::createWithType($projection::TICKET_CREATED, [], [MessageHeaders::EVENT_AGGREGATE_ID => 'ticket-2']), Event::createWithType($projection::TICKET_CREATED, [], [MessageHeaders::EVENT_AGGREGATE_ID => 'ticket-3']), ]); @@ -313,7 +313,7 @@ public function init(): void ); // Add all events to the stream first - $ecotone->withEvents([ + $ecotone->withEventStream('test_stream', [ Event::createWithType($projection::TICKET_CREATED, [], [MessageHeaders::EVENT_AGGREGATE_ID => 'ticket-1']), Event::createWithType($projection::TICKET_CREATED, [], [MessageHeaders::EVENT_AGGREGATE_ID => 'ticket-2']), Event::createWithType($projection::TICKET_CREATED, [], [MessageHeaders::EVENT_AGGREGATE_ID => 'ticket-3']), @@ -356,7 +356,7 @@ public function init(): void ); // Add events to stream - $ecotone->withEvents([ + $ecotone->withEventStream('test_stream', [ Event::createWithType($projection::TICKET_CREATED, [], [MessageHeaders::EVENT_AGGREGATE_ID => 'ticket-1']), Event::createWithType($projection::TICKET_CREATED, [], [MessageHeaders::EVENT_AGGREGATE_ID => 'ticket-2']), ]); @@ -396,7 +396,7 @@ public function init(): void ); // Add events to stream - $ecotone->withEvents([ + $ecotone->withEventStream('test_stream', [ Event::createWithType($projection::TICKET_CREATED, [], [MessageHeaders::EVENT_AGGREGATE_ID => 'ticket-1']), Event::createWithType($projection::TICKET_CREATED, [], [MessageHeaders::EVENT_AGGREGATE_ID => 'ticket-2']), ]); @@ -436,7 +436,7 @@ public function init(): void ); // Add multiple events to stream - $ecotone->withEvents([ + $ecotone->withEventStream('test_stream', [ Event::createWithType($projection::TICKET_CREATED, [], [MessageHeaders::EVENT_AGGREGATE_ID => 'ticket-1']), Event::createWithType($projection::TICKET_CREATED, [], [MessageHeaders::EVENT_AGGREGATE_ID => 'ticket-2']), Event::createWithType($projection::TICKET_CREATED, [], [MessageHeaders::EVENT_AGGREGATE_ID => 'ticket-3']), @@ -479,7 +479,7 @@ public function init(): void ); // Add multiple events to stream - $ecotone->withEvents([ + $ecotone->withEventStream('test_stream', [ Event::createWithType($projection::TICKET_CREATED, [], [MessageHeaders::EVENT_AGGREGATE_ID => 'ticket-1']), Event::createWithType($projection::TICKET_CREATED, [], [MessageHeaders::EVENT_AGGREGATE_ID => 'ticket-2']), Event::createWithType($projection::TICKET_CREATED, [], [MessageHeaders::EVENT_AGGREGATE_ID => 'ticket-3']), @@ -521,7 +521,7 @@ public function init(): void ); // Add events for different partitions - $ecotone->withEvents([ + $ecotone->withEventStream('test_stream', [ Event::createWithType($projection::TICKET_CREATED, [], [MessageHeaders::EVENT_AGGREGATE_ID => 'ticket-1', 'tenantId' => 'tenant-1']), Event::createWithType($projection::TICKET_CREATED, [], [MessageHeaders::EVENT_AGGREGATE_ID => 'ticket-2', 'tenantId' => 'tenant-2']), ]); @@ -551,7 +551,7 @@ public function canHandle(string $projectionName): bool return true; } - public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage + public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey, string $streamName): StreamPage { return new StreamPage([], '0'); } @@ -628,7 +628,7 @@ public function handleHighPriority(array $event): void ->withLicenceKey(LicenceTesting::VALID_LICENCE) ); - $ecotone->withEvents([ + $ecotone->withEventStream('test_stream', [ Event::createWithType('no-priority', []), ]); @@ -636,7 +636,7 @@ public function handleHighPriority(array $event): void self::assertEquals(['projectionA-no-priority', 'projectionB-no-priority'], $db); $db = []; - $ecotone->withEvents([ + $ecotone->withEventStream('test_stream', [ Event::createWithType('with-priority', []), ]); $ecotone->publishEventWithRoutingKey('with-priority'); @@ -668,7 +668,7 @@ public function flush(): void ServiceConfiguration::createWithDefaults() ->withLicenceKey(LicenceTesting::VALID_LICENCE) ); - $ecotone->withEvents([ + $ecotone->withEventStream('test_stream', [ Event::createWithType('event1', []), Event::createWithType('event2', []), Event::createWithType('event3', []), @@ -702,7 +702,7 @@ public function handle(array $event): void ServiceConfiguration::createWithDefaults() ->withLicenceKey(LicenceTesting::VALID_LICENCE) ); - $ecotone->withEvents([ + $ecotone->withEventStream('test_stream', [ Event::createWithType('event1', []), Event::createWithType('event2', []), Event::createWithType('event3', []), @@ -766,7 +766,7 @@ public function canHandle(string $projectionName): bool return true; } - public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage + public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey, string $streamName): StreamPage { $from = $lastPosition !== null ? (int) $lastPosition : 0; if ($partitionKey) { @@ -839,7 +839,7 @@ public function canHandle(string $projectionName): bool return true; } - public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage + public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey, string $streamName): StreamPage { return new StreamPage([], '0'); } @@ -874,19 +874,31 @@ public function canHandle(string $projectionName): bool return $projectionName === 'userland_storage_projection'; } - public function loadPartition(string $projectionName, ?string $partitionKey = null, bool $lock = true): ?ProjectionPartitionState + private function getKey(string $projectionName, ?string $partitionKey, string $streamName): string + { + $key = $projectionName; + if ($streamName !== '') { + $key .= '::' . $streamName; + } + if ($partitionKey !== null) { + $key .= '-' . $partitionKey; + } + return $key; + } + + public function loadPartition(string $projectionName, ?string $partitionKey, string $streamName, bool $lock = true): ?ProjectionPartitionState { $this->wasUsed = true; - $key = $projectionName . ($partitionKey ? '-' . $partitionKey : ''); + $key = $this->getKey($projectionName, $partitionKey, $streamName); return $this->projectionStates[$key] ?? null; } - public function initPartition(string $projectionName, ?string $partitionKey = null): ?ProjectionPartitionState + public function initPartition(string $projectionName, ?string $partitionKey, string $streamName): ?ProjectionPartitionState { $this->wasUsed = true; - $key = $projectionName . ($partitionKey ? '-' . $partitionKey : ''); + $key = $this->getKey($projectionName, $partitionKey, $streamName); if (! isset($this->projectionStates[$key])) { - $this->projectionStates[$key] = new ProjectionPartitionState($projectionName, $partitionKey, null, null, ProjectionInitializationStatus::UNINITIALIZED); + $this->projectionStates[$key] = new ProjectionPartitionState($projectionName, $partitionKey, $streamName, null, null, ProjectionInitializationStatus::UNINITIALIZED); return $this->projectionStates[$key]; } return null; @@ -895,7 +907,7 @@ public function initPartition(string $projectionName, ?string $partitionKey = nu public function savePartition(ProjectionPartitionState $projectionState): void { $this->wasUsed = true; - $key = $projectionState->projectionName . ($projectionState->partitionKey ? '-' . $projectionState->partitionKey : ''); + $key = $this->getKey($projectionState->projectionName, $projectionState->partitionKey, $projectionState->streamName); $this->projectionStates[$key] = $projectionState; } @@ -939,7 +951,7 @@ public function canHandle(string $projectionName): bool return true; } - public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage + public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey, string $streamName): StreamPage { $from = $lastPosition !== null ? (int) $lastPosition : 0; $events = array_slice($this->events, $from, $count); @@ -965,4 +977,122 @@ public function load(string $projectionName, ?string $lastPosition, int $count, self::assertTrue($userlandStorage->wasUsed, 'Userland state storage should be prioritized and used'); } + + public function test_partitioned_projection_with_multiple_streams_tracks_positions_separately(): void + { + $projection = new #[ProjectionV2('multi_stream_projection'), FromStream('stream_a'), FromStream('stream_b'), Partitioned('tenantId')] class { + public array $processedEvents = []; + + #[EventHandler('*')] + public function on(array $event): void + { + $this->processedEvents[] = $event; + } + }; + + $ecotone = EcotoneLite::bootstrapFlowTesting( + [$projection::class], + [$projection], + ServiceConfiguration::createWithDefaults() + ->withLicenceKey(LicenceTesting::VALID_LICENCE) + ); + + $ecotone->withEventStream('stream_a', [ + Event::createWithType('event_a1', ['source' => 'A'], ['tenantId' => 'tenant-1']), + Event::createWithType('event_a2', ['source' => 'A'], ['tenantId' => 'tenant-1']), + ]); + $ecotone->withEventStream('stream_b', [ + Event::createWithType('event_b1', ['source' => 'B'], ['tenantId' => 'tenant-1']), + Event::createWithType('event_b2', ['source' => 'B'], ['tenantId' => 'tenant-1']), + ]); + + $ecotone->triggerProjection('multi_stream_projection'); + + self::assertCount(4, $projection->processedEvents, 'All events from both streams should be processed'); + + $sources = array_column($projection->processedEvents, 'source'); + self::assertContains('A', $sources); + self::assertContains('B', $sources); + } + + public function test_same_partition_key_in_different_streams_does_not_collide(): void + { + $projection = new #[ProjectionV2('collision_test_projection'), FromStream('stream_a'), FromStream('stream_b'), Partitioned('tenantId')] class { + public array $processedEvents = []; + + #[EventHandler('*')] + public function on(array $event): void + { + $this->processedEvents[] = $event; + } + }; + + $ecotone = EcotoneLite::bootstrapFlowTesting( + [$projection::class], + [$projection], + ServiceConfiguration::createWithDefaults() + ->withLicenceKey(LicenceTesting::VALID_LICENCE) + ); + + $ecotone->withEventStream('stream_a', [ + Event::createWithType('event_a1', ['id' => 1], ['tenantId' => 'same-partition']), + Event::createWithType('event_a2', ['id' => 2], ['tenantId' => 'same-partition']), + ]); + $ecotone->withEventStream('stream_b', [ + Event::createWithType('event_b1', ['id' => 3], ['tenantId' => 'same-partition']), + Event::createWithType('event_b2', ['id' => 4], ['tenantId' => 'same-partition']), + ]); + + $ecotone->triggerProjection('collision_test_projection'); + + self::assertCount(4, $projection->processedEvents, 'Events from both streams with same partition key should be processed independently'); + + $eventIds = array_map(fn ($e) => $e['id'], $projection->processedEvents); + self::assertContains(1, $eventIds); + self::assertContains(2, $eventIds); + self::assertContains(3, $eventIds); + self::assertContains(4, $eventIds); + } + + public function test_partitioned_projection_catches_up_all_streams_on_trigger(): void + { + $projection = new #[ProjectionV2('catchup_test_projection'), FromStream('stream_a'), FromStream('stream_b'), Partitioned('tenantId')] class { + public array $processedEvents = []; + + #[EventHandler('*')] + public function on(array $event): void + { + $this->processedEvents[] = $event; + } + }; + + $ecotone = EcotoneLite::bootstrapFlowTesting( + [$projection::class], + [$projection], + ServiceConfiguration::createWithDefaults() + ->withLicenceKey(LicenceTesting::VALID_LICENCE) + ); + + $ecotone->withEventStream('stream_a', [ + Event::createWithType('event1', ['v' => 1], ['tenantId' => 'tenant-1']), + ]); + $ecotone->withEventStream('stream_b', [ + Event::createWithType('event2', ['v' => 2], ['tenantId' => 'tenant-2']), + ]); + + $ecotone->triggerProjection('catchup_test_projection'); + + self::assertCount(2, $projection->processedEvents); + + $ecotone->withEventStream('stream_a', [ + Event::createWithType('event3', ['v' => 3], ['tenantId' => 'tenant-1']), + ]); + $ecotone->withEventStream('stream_b', [ + Event::createWithType('event4', ['v' => 4], ['tenantId' => 'tenant-2']), + ]); + + $ecotone->triggerProjection('catchup_test_projection'); + + self::assertCount(4, $projection->processedEvents, 'All new events from all streams should be caught up'); + } } diff --git a/packages/PdoEventSourcing/src/Database/ProjectionStateTableManager.php b/packages/PdoEventSourcing/src/Database/ProjectionStateTableManager.php index 98c7f440f..c455c2b21 100644 --- a/packages/PdoEventSourcing/src/Database/ProjectionStateTableManager.php +++ b/packages/PdoEventSourcing/src/Database/ProjectionStateTableManager.php @@ -99,11 +99,12 @@ private function getPostgresCreateSql(): string return <<tableName} ( projection_name VARCHAR(255) NOT NULL, + stream_name VARCHAR(255) NOT NULL DEFAULT '', partition_key VARCHAR(255) NOT NULL DEFAULT '', last_position TEXT NOT NULL, metadata JSON NOT NULL, user_state JSON, - PRIMARY KEY (projection_name, partition_key) + PRIMARY KEY (projection_name, stream_name, partition_key) ) SQL; } @@ -113,11 +114,12 @@ private function getMysqlCreateSql(): string return <<tableName}` ( `projection_name` VARCHAR(255) NOT NULL, + `stream_name` VARCHAR(255) NOT NULL DEFAULT '', `partition_key` VARCHAR(255) NOT NULL DEFAULT '', `last_position` TEXT NOT NULL, `metadata` JSON NOT NULL, `user_state` JSON, - PRIMARY KEY (`projection_name`, `partition_key`) + PRIMARY KEY (`projection_name`, `stream_name`, `partition_key`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci SQL; } diff --git a/packages/PdoEventSourcing/src/Projecting/PartitionState/DbalProjectionStateStorage.php b/packages/PdoEventSourcing/src/Projecting/PartitionState/DbalProjectionStateStorage.php index fec82a2fc..b8f8250a8 100644 --- a/packages/PdoEventSourcing/src/Projecting/PartitionState/DbalProjectionStateStorage.php +++ b/packages/PdoEventSourcing/src/Projecting/PartitionState/DbalProjectionStateStorage.php @@ -79,14 +79,14 @@ private function markInitialized(): void $this->initialized[$this->getConnectionKey()] = true; } - public function loadPartition(string $projectionName, ?string $partitionKey = null, bool $lock = true): ?ProjectionPartitionState + public function loadPartition(string $projectionName, ?string $partitionKey, string $streamName, bool $lock = true): ?ProjectionPartitionState { $this->createSchema(); $tableName = $this->getTableName(); $query = <<getConnection()->fetchAssociative($query, [ 'projectionName' => $projectionName, + 'streamName' => $streamName, 'partitionKey' => $partitionKey ?? '', ]); if (! $row) { @@ -103,48 +104,46 @@ public function loadPartition(string $projectionName, ?string $partitionKey = nu $metadata = $row['metadata'] ? json_decode($row['metadata'], true) : null; $status = isset($metadata[self::INITIALIZATION_STATUS_KEY]) ? ProjectionInitializationStatus::from($metadata[self::INITIALIZATION_STATUS_KEY]) : null; - return new ProjectionPartitionState($projectionName, $partitionKey, $row['last_position'], json_decode($row['user_state'], true), $status); + return new ProjectionPartitionState($projectionName, $partitionKey, $streamName, $row['last_position'], json_decode($row['user_state'], true), $status); } - public function initPartition(string $projectionName, ?string $partitionKey = null): ?ProjectionPartitionState + public function initPartition(string $projectionName, ?string $partitionKey, string $streamName): ?ProjectionPartitionState { $this->createSchema(); $connection = $this->getConnection(); $tableName = $this->getTableName(); - // Try to insert the partition state, ignoring if it already exists $insertQuery = match (true) { $connection->getDatabasePlatform() instanceof MySQLPlatform => << << $projectionState->status?->value ?? ProjectionInitializationStatus::UNINITIALIZED->value, + self::INITIALIZATION_STATUS_KEY => ProjectionInitializationStatus::UNINITIALIZED->value, ]; $rowsAffected = $connection->executeStatement($insertQuery, [ 'projectionName' => $projectionName, + 'streamName' => $streamName, 'partitionKey' => $partitionKey ?? '', 'lastPosition' => '', 'userState' => json_encode(null), 'metadata' => json_encode($metadata, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT), ]); - // If no rows were affected, the partition already existed if ($rowsAffected === 0) { return null; } - // Return the newly created state - return new ProjectionPartitionState($projectionName, $partitionKey, null, null, ProjectionInitializationStatus::UNINITIALIZED); + return new ProjectionPartitionState($projectionName, $partitionKey, $streamName, null, null, ProjectionInitializationStatus::UNINITIALIZED); } public function savePartition(ProjectionPartitionState $projectionState): void @@ -156,14 +155,14 @@ public function savePartition(ProjectionPartitionState $projectionState): void $saveStateQuery = match (true) { $connection->getDatabasePlatform() instanceof MySQLPlatform => << <<executeStatement($saveStateQuery, [ 'projectionName' => $projectionState->projectionName, + 'streamName' => $projectionState->streamName, 'partitionKey' => $projectionState->partitionKey ?? '', 'lastPosition' => $projectionState->lastPosition, 'userState' => json_encode($projectionState->userState, JSON_THROW_ON_ERROR), diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSource.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSource.php index 6ea55355a..1a4208863 100644 --- a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSource.php +++ b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreAggregateStreamSource.php @@ -40,13 +40,11 @@ public function canHandle(string $projectionName): bool return in_array($projectionName, $this->handledProjectionNames, true); } - public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage + public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey, string $streamName): StreamPage { Assert::notNull($partitionKey, 'Partition key cannot be null for aggregate stream source'); - $streamFilters = $this->streamFilterRegistry->provide($projectionName); - Assert::isTrue(count($streamFilters) > 0, "No stream filter found for projection: {$projectionName}"); - $streamFilter = $streamFilters[0]; + $streamFilter = $this->findStreamFilterByName($projectionName, $streamName); if (! $this->eventStore->hasStream($streamFilter->streamName)) { return new StreamPage([], $lastPosition ?? ''); @@ -90,6 +88,20 @@ public function load(string $projectionName, ?string $lastPosition, int $count, return new StreamPage($events, $this->createPositionFrom($lastPosition, $events)); } + private function findStreamFilterByName(string $projectionName, string $streamName): \Ecotone\Projecting\StreamFilter + { + $streamFilters = $this->streamFilterRegistry->provide($projectionName); + Assert::isTrue(count($streamFilters) > 0, "No stream filter found for projection: {$projectionName}"); + + foreach ($streamFilters as $streamFilter) { + if ($streamFilter->streamName === $streamName) { + return $streamFilter; + } + } + + throw new RuntimeException("No stream filter found for stream: {$streamName} in projection: {$projectionName}"); + } + /** * @param array $events */ diff --git a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php index a9b30c34b..f91d0bb33 100644 --- a/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php +++ b/packages/PdoEventSourcing/src/Projecting/StreamSource/EventStoreGlobalStreamSource.php @@ -60,7 +60,7 @@ private function getConnection(): Connection return $this->connectionFactory->createContext()->getDbalConnection(); } - public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage + public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey, string $streamName): StreamPage { Assert::null($partitionKey, 'Partition key is not supported for EventStoreGlobalStreamSource'); diff --git a/spec.md b/spec.md new file mode 100644 index 000000000..884ee3a15 --- /dev/null +++ b/spec.md @@ -0,0 +1,267 @@ +# Partitioned ProjectionV2 - Multiple Streams Support + +## Problem Statement + +Currently, partitioned projections (`#[Partitioned]` attribute) with `#[ProjectionV2]` only support a single stream. When a projection consumes from multiple streams (multiple `#[FromStream]` or `#[FromAggregateStream]` attributes), two issues arise: + +1. **Position Tracking Collision**: If the same partition key (e.g., aggregate ID `"123"`) exists in two different streams, they share the same position state, causing events to be skipped or processed incorrectly. + +2. **Catch-up Incomplete**: Triggering the projection only catches up events from one stream, leaving other streams unprocessed. + +## Current Architecture Analysis + +### Key Components + +| Component | Description | +|-----------|-------------| +| `ProjectionPartitionState` | Stores: `projectionName`, `partitionKey`, `lastPosition`, `userState`, `status` | +| `StreamFilterRegistry` | Provides `StreamFilter[]` per projection (supports multiple streams) | +| `EventStoreAggregateStreamSource` | Handles partitioned loading - currently only uses `$streamFilters[0]` | +| `InMemoryProjectionStateStorage` | Key format: `{projectionName}-{partitionKey}` (no stream awareness) | +| `DbalProjectionStateStorage` | Primary key: `(projection_name, partition_key)` (no stream column) | +| `ProjectingManager::execute()` | Loads from single stream source, stores single position | + +### Current Table Schema + +```sql +-- PostgreSQL +CREATE TABLE ecotone_projection_state ( + projection_name VARCHAR(255) NOT NULL, + partition_key VARCHAR(255) NOT NULL DEFAULT '', + last_position TEXT NOT NULL, + metadata JSON NOT NULL, + user_state JSON, + PRIMARY KEY (projection_name, partition_key) +) +``` + +### Root Causes + +1. **`EventStoreAggregateStreamSource::load()`** ignores multiple stream filters: + ```php + $streamFilter = $streamFilters[0]; // Only first stream used! + ``` + +2. **State storage key** doesn't include stream name - partitions from different streams collide: + - Stream A, partition "123" → key: `my_projection-123` + - Stream B, partition "123" → key: `my_projection-123` (COLLISION!) + +3. **`ProjectingManager::execute()`** processes only one stream per execution cycle. + +4. **`ProjectionStateStorage::loadPartition()`** has no stream context - cannot distinguish between same partition key in different streams. + +## Proposed Solution + +### Stream-Aware State Storage + +Extend the state storage to include stream information, making each (projection, stream, partition) combination unique. + +#### Changes Required: + +**1. `ProjectionPartitionState`** - Add `streamName` field: +```php +public function __construct( + public readonly string $projectionName, + public readonly ?string $partitionKey, + public readonly ?string $streamName, // NEW - required + public readonly ?string $lastPosition = null, + public readonly mixed $userState = null, + public readonly ?ProjectionInitializationStatus $status = null, +) +``` + +**2. `ProjectionStateStorage` interface** - Add `streamName` parameter: +```php +public function loadPartition(string $projectionName, ?string $partitionKey, string $streamName, bool $lock = true): ?ProjectionPartitionState; +public function initPartition(string $projectionName, ?string $partitionKey, string $streamName): ?ProjectionPartitionState; +public function savePartition(ProjectionPartitionState $projectionState): void; // Uses streamName from state object +``` + +**3. `StreamSource` interface** - Add `streamName` parameter: +```php +public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey, string $streamName): StreamPage; +``` + +**4. `InMemoryProjectionStateStorage`** - Update key generation: +```php +private function getKey(string $projectionName, ?string $partitionKey, string $streamName): string +{ + $key = $projectionName; + if ($streamName !== '') { + $key .= '::' . $streamName; + } + if ($partitionKey !== null) { + $key .= '-' . $partitionKey; + } + return $key; +} +``` + +**5. `ProjectionStateTableManager`** - Update schema to include `stream_name`: +```sql +-- PostgreSQL +CREATE TABLE ecotone_projection_state ( + projection_name VARCHAR(255) NOT NULL, + stream_name VARCHAR(255) NOT NULL DEFAULT '', -- NEW COLUMN + partition_key VARCHAR(255) NOT NULL DEFAULT '', + last_position TEXT NOT NULL, + metadata JSON NOT NULL, + user_state JSON, + PRIMARY KEY (projection_name, stream_name, partition_key) -- UPDATED +) + +-- MySQL +CREATE TABLE `ecotone_projection_state` ( + `projection_name` VARCHAR(255) NOT NULL, + `stream_name` VARCHAR(255) NOT NULL DEFAULT '', -- NEW COLUMN + `partition_key` VARCHAR(255) NOT NULL DEFAULT '', + `last_position` TEXT NOT NULL, + `metadata` JSON NOT NULL, + `user_state` JSON, + PRIMARY KEY (`projection_name`, `stream_name`, `partition_key`) -- UPDATED +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci +``` + +**6. `DbalProjectionStateStorage`** - Update all queries to include `stream_name`: +```php +// loadPartition +$query = <<streamFilterRegistry->provide($projectionName); + $streamFilter = $this->findStreamFilterByName($streamFilters, $streamName); + + // Use $streamFilter for loading... +} +``` + +**8. `ProjectingManager::execute()`** - Require stream name, iterate over all streams when triggering: +```php +public function execute(?string $partitionKeyValue, string $streamName, bool $manualInitialization = false): void +{ + do { + $processedEvents = $this->executeSingleBatch($partitionKeyValue, $streamName, $manualInitialization || $this->automaticInitialization); + } while ($processedEvents > 0 && $this->terminationListener->shouldTerminate() !== true); +} + +public function executeAllStreams(?string $partitionKeyValue = null, bool $manualInitialization = false): void +{ + $streamFilters = $this->streamFilterRegistry->provide($this->projectionName); + foreach ($streamFilters as $streamFilter) { + $this->execute($partitionKeyValue, $streamFilter->streamName, $manualInitialization); + } +} +``` + +### Table Schema Change Required: YES + +The database table **must** be updated to add `stream_name` column as part of the composite primary key. + +### Migration Strategy + +1. **New installations**: Create table with new schema including `stream_name` +2. **Existing installations**: + - Add `stream_name` column with default `''` + - Update primary key to include `stream_name` + - Existing data continues to work (single-stream projections use `stream_name = ''`) + +## Implementation Plan + +### Phase 1: Core Infrastructure (Ecotone Package) +1. Update `ProjectionPartitionState` - add `streamName` property (required) +2. Update `ProjectionStateStorage` interface - add `streamName` parameter to `loadPartition`, `initPartition` +3. Update `StreamSource` interface - add `streamName` parameter to `load` +4. Update `InMemoryProjectionStateStorage` - stream-aware key generation +5. Update `InMemoryStreamSource` - update `load` method signature +6. Update `ProjectingManager` - require `streamName` in `execute()`, add `executeAllStreams()` method + +### Phase 2: Database Storage (PdoEventSourcing Package) +1. Update `ProjectionStateTableManager` - add `stream_name` column to schema +2. Update `DbalProjectionStateStorage` - include `stream_name` in all queries +3. Update `EventStoreAggregateStreamSource` - use passed `streamName` to find matching StreamFilter + +### Phase 3: Testing +1. Add unit tests for multi-stream partitioned projections using EcotoneLite +2. Test partition key collision prevention across streams +3. Test catch-up behavior for all streams +4. All tests use inline anonymous classes + +## Test Scenarios + +```php +public function test_partitioned_projection_with_multiple_streams_tracks_positions_separately(): void +{ + // Given: Projection with two FromStream attributes and Partitioned + // And: Same partition key exists in both streams + // When: Projection is triggered for all streams + // Then: Events from both streams are processed + // And: Position for each stream is tracked independently +} + +public function test_partitioned_projection_catches_up_all_streams(): void +{ + // Given: Projection with multiple streams + // And: Events exist in all streams + // When: executeAllStreams() is called + // Then: All streams are caught up completely +} + +public function test_same_partition_key_in_different_streams_does_not_collide(): void +{ + // Given: Stream A has partition "123" at position 5 + // And: Stream B has partition "123" at position 10 + // When: New events are added to both streams + // Then: Each stream's partition continues from its own position +} +``` + +## Files to Modify + +### Ecotone Package +| File | Change | +|------|--------| +| `src/Projecting/ProjectionPartitionState.php` | Add `streamName` property (required) | +| `src/Projecting/ProjectionStateStorage.php` | Add `streamName` param to `loadPartition`, `initPartition` | +| `src/Projecting/StreamSource.php` | Add `streamName` param to `load` | +| `src/Projecting/InMemory/InMemoryProjectionStateStorage.php` | Stream-aware key generation | +| `src/Projecting/InMemory/InMemoryStreamSource.php` | Update `load` method signature | +| `src/Projecting/ProjectingManager.php` | Require `streamName` in `execute()`, add `executeAllStreams()` | +| `tests/Projecting/ProjectingTest.php` | Add multi-stream partitioned tests | + +### PdoEventSourcing Package +| File | Change | +|------|--------| +| `src/Database/ProjectionStateTableManager.php` | Add `stream_name` column | +| `src/Projecting/PartitionState/DbalProjectionStateStorage.php` | Include `stream_name` in queries | +| `src/Projecting/StreamSource/EventStoreAggregateStreamSource.php` | Use passed `streamName` to find StreamFilter | + +## Estimated Effort + +| Task | Estimate | +|------|----------| +| Core infrastructure changes | 4-6 hours | +| Database storage changes | 2-3 hours | +| Testing | 3-4 hours | +| **Total** | **9-13 hours** | + +## Design Decisions + +1. **Partition header**: The partition header comes from the Event Message and is configured via `#[Partitioned]` attribute at the class level. The same partition header is used for all streams. + +2. **No backward compatibility**: This feature is not yet live, so method signatures are changed explicitly without optional parameters for backward compatibility. + +3. **Stream name required**: All methods that deal with partition state now require an explicit `streamName` parameter to ensure proper isolation between streams. + From d3ac1a376fc0fdc54bd4a77a3a00fda10f75a114 Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Tue, 10 Feb 2026 20:00:22 +0100 Subject: [PATCH 2/2] clean up --- spec.md | 267 -------------------------------------------------------- 1 file changed, 267 deletions(-) delete mode 100644 spec.md diff --git a/spec.md b/spec.md deleted file mode 100644 index 884ee3a15..000000000 --- a/spec.md +++ /dev/null @@ -1,267 +0,0 @@ -# Partitioned ProjectionV2 - Multiple Streams Support - -## Problem Statement - -Currently, partitioned projections (`#[Partitioned]` attribute) with `#[ProjectionV2]` only support a single stream. When a projection consumes from multiple streams (multiple `#[FromStream]` or `#[FromAggregateStream]` attributes), two issues arise: - -1. **Position Tracking Collision**: If the same partition key (e.g., aggregate ID `"123"`) exists in two different streams, they share the same position state, causing events to be skipped or processed incorrectly. - -2. **Catch-up Incomplete**: Triggering the projection only catches up events from one stream, leaving other streams unprocessed. - -## Current Architecture Analysis - -### Key Components - -| Component | Description | -|-----------|-------------| -| `ProjectionPartitionState` | Stores: `projectionName`, `partitionKey`, `lastPosition`, `userState`, `status` | -| `StreamFilterRegistry` | Provides `StreamFilter[]` per projection (supports multiple streams) | -| `EventStoreAggregateStreamSource` | Handles partitioned loading - currently only uses `$streamFilters[0]` | -| `InMemoryProjectionStateStorage` | Key format: `{projectionName}-{partitionKey}` (no stream awareness) | -| `DbalProjectionStateStorage` | Primary key: `(projection_name, partition_key)` (no stream column) | -| `ProjectingManager::execute()` | Loads from single stream source, stores single position | - -### Current Table Schema - -```sql --- PostgreSQL -CREATE TABLE ecotone_projection_state ( - projection_name VARCHAR(255) NOT NULL, - partition_key VARCHAR(255) NOT NULL DEFAULT '', - last_position TEXT NOT NULL, - metadata JSON NOT NULL, - user_state JSON, - PRIMARY KEY (projection_name, partition_key) -) -``` - -### Root Causes - -1. **`EventStoreAggregateStreamSource::load()`** ignores multiple stream filters: - ```php - $streamFilter = $streamFilters[0]; // Only first stream used! - ``` - -2. **State storage key** doesn't include stream name - partitions from different streams collide: - - Stream A, partition "123" → key: `my_projection-123` - - Stream B, partition "123" → key: `my_projection-123` (COLLISION!) - -3. **`ProjectingManager::execute()`** processes only one stream per execution cycle. - -4. **`ProjectionStateStorage::loadPartition()`** has no stream context - cannot distinguish between same partition key in different streams. - -## Proposed Solution - -### Stream-Aware State Storage - -Extend the state storage to include stream information, making each (projection, stream, partition) combination unique. - -#### Changes Required: - -**1. `ProjectionPartitionState`** - Add `streamName` field: -```php -public function __construct( - public readonly string $projectionName, - public readonly ?string $partitionKey, - public readonly ?string $streamName, // NEW - required - public readonly ?string $lastPosition = null, - public readonly mixed $userState = null, - public readonly ?ProjectionInitializationStatus $status = null, -) -``` - -**2. `ProjectionStateStorage` interface** - Add `streamName` parameter: -```php -public function loadPartition(string $projectionName, ?string $partitionKey, string $streamName, bool $lock = true): ?ProjectionPartitionState; -public function initPartition(string $projectionName, ?string $partitionKey, string $streamName): ?ProjectionPartitionState; -public function savePartition(ProjectionPartitionState $projectionState): void; // Uses streamName from state object -``` - -**3. `StreamSource` interface** - Add `streamName` parameter: -```php -public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey, string $streamName): StreamPage; -``` - -**4. `InMemoryProjectionStateStorage`** - Update key generation: -```php -private function getKey(string $projectionName, ?string $partitionKey, string $streamName): string -{ - $key = $projectionName; - if ($streamName !== '') { - $key .= '::' . $streamName; - } - if ($partitionKey !== null) { - $key .= '-' . $partitionKey; - } - return $key; -} -``` - -**5. `ProjectionStateTableManager`** - Update schema to include `stream_name`: -```sql --- PostgreSQL -CREATE TABLE ecotone_projection_state ( - projection_name VARCHAR(255) NOT NULL, - stream_name VARCHAR(255) NOT NULL DEFAULT '', -- NEW COLUMN - partition_key VARCHAR(255) NOT NULL DEFAULT '', - last_position TEXT NOT NULL, - metadata JSON NOT NULL, - user_state JSON, - PRIMARY KEY (projection_name, stream_name, partition_key) -- UPDATED -) - --- MySQL -CREATE TABLE `ecotone_projection_state` ( - `projection_name` VARCHAR(255) NOT NULL, - `stream_name` VARCHAR(255) NOT NULL DEFAULT '', -- NEW COLUMN - `partition_key` VARCHAR(255) NOT NULL DEFAULT '', - `last_position` TEXT NOT NULL, - `metadata` JSON NOT NULL, - `user_state` JSON, - PRIMARY KEY (`projection_name`, `stream_name`, `partition_key`) -- UPDATED -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci -``` - -**6. `DbalProjectionStateStorage`** - Update all queries to include `stream_name`: -```php -// loadPartition -$query = <<streamFilterRegistry->provide($projectionName); - $streamFilter = $this->findStreamFilterByName($streamFilters, $streamName); - - // Use $streamFilter for loading... -} -``` - -**8. `ProjectingManager::execute()`** - Require stream name, iterate over all streams when triggering: -```php -public function execute(?string $partitionKeyValue, string $streamName, bool $manualInitialization = false): void -{ - do { - $processedEvents = $this->executeSingleBatch($partitionKeyValue, $streamName, $manualInitialization || $this->automaticInitialization); - } while ($processedEvents > 0 && $this->terminationListener->shouldTerminate() !== true); -} - -public function executeAllStreams(?string $partitionKeyValue = null, bool $manualInitialization = false): void -{ - $streamFilters = $this->streamFilterRegistry->provide($this->projectionName); - foreach ($streamFilters as $streamFilter) { - $this->execute($partitionKeyValue, $streamFilter->streamName, $manualInitialization); - } -} -``` - -### Table Schema Change Required: YES - -The database table **must** be updated to add `stream_name` column as part of the composite primary key. - -### Migration Strategy - -1. **New installations**: Create table with new schema including `stream_name` -2. **Existing installations**: - - Add `stream_name` column with default `''` - - Update primary key to include `stream_name` - - Existing data continues to work (single-stream projections use `stream_name = ''`) - -## Implementation Plan - -### Phase 1: Core Infrastructure (Ecotone Package) -1. Update `ProjectionPartitionState` - add `streamName` property (required) -2. Update `ProjectionStateStorage` interface - add `streamName` parameter to `loadPartition`, `initPartition` -3. Update `StreamSource` interface - add `streamName` parameter to `load` -4. Update `InMemoryProjectionStateStorage` - stream-aware key generation -5. Update `InMemoryStreamSource` - update `load` method signature -6. Update `ProjectingManager` - require `streamName` in `execute()`, add `executeAllStreams()` method - -### Phase 2: Database Storage (PdoEventSourcing Package) -1. Update `ProjectionStateTableManager` - add `stream_name` column to schema -2. Update `DbalProjectionStateStorage` - include `stream_name` in all queries -3. Update `EventStoreAggregateStreamSource` - use passed `streamName` to find matching StreamFilter - -### Phase 3: Testing -1. Add unit tests for multi-stream partitioned projections using EcotoneLite -2. Test partition key collision prevention across streams -3. Test catch-up behavior for all streams -4. All tests use inline anonymous classes - -## Test Scenarios - -```php -public function test_partitioned_projection_with_multiple_streams_tracks_positions_separately(): void -{ - // Given: Projection with two FromStream attributes and Partitioned - // And: Same partition key exists in both streams - // When: Projection is triggered for all streams - // Then: Events from both streams are processed - // And: Position for each stream is tracked independently -} - -public function test_partitioned_projection_catches_up_all_streams(): void -{ - // Given: Projection with multiple streams - // And: Events exist in all streams - // When: executeAllStreams() is called - // Then: All streams are caught up completely -} - -public function test_same_partition_key_in_different_streams_does_not_collide(): void -{ - // Given: Stream A has partition "123" at position 5 - // And: Stream B has partition "123" at position 10 - // When: New events are added to both streams - // Then: Each stream's partition continues from its own position -} -``` - -## Files to Modify - -### Ecotone Package -| File | Change | -|------|--------| -| `src/Projecting/ProjectionPartitionState.php` | Add `streamName` property (required) | -| `src/Projecting/ProjectionStateStorage.php` | Add `streamName` param to `loadPartition`, `initPartition` | -| `src/Projecting/StreamSource.php` | Add `streamName` param to `load` | -| `src/Projecting/InMemory/InMemoryProjectionStateStorage.php` | Stream-aware key generation | -| `src/Projecting/InMemory/InMemoryStreamSource.php` | Update `load` method signature | -| `src/Projecting/ProjectingManager.php` | Require `streamName` in `execute()`, add `executeAllStreams()` | -| `tests/Projecting/ProjectingTest.php` | Add multi-stream partitioned tests | - -### PdoEventSourcing Package -| File | Change | -|------|--------| -| `src/Database/ProjectionStateTableManager.php` | Add `stream_name` column | -| `src/Projecting/PartitionState/DbalProjectionStateStorage.php` | Include `stream_name` in queries | -| `src/Projecting/StreamSource/EventStoreAggregateStreamSource.php` | Use passed `streamName` to find StreamFilter | - -## Estimated Effort - -| Task | Estimate | -|------|----------| -| Core infrastructure changes | 4-6 hours | -| Database storage changes | 2-3 hours | -| Testing | 3-4 hours | -| **Total** | **9-13 hours** | - -## Design Decisions - -1. **Partition header**: The partition header comes from the Event Message and is configured via `#[Partitioned]` attribute at the class level. The same partition header is used for all streams. - -2. **No backward compatibility**: This feature is not yet live, so method signatures are changed explicitly without optional parameters for backward compatibility. - -3. **Stream name required**: All methods that deal with partition state now require an explicit `streamName` parameter to ensure proper isolation between streams. -