-
-
Notifications
You must be signed in to change notification settings - Fork 21
feat: partitioned multiple stream support #627
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -48,28 +48,36 @@ private function getProjectionStateStorage(): ProjectionStateStorage | |
| return $this->projectionStateStorage; | ||
| } | ||
|
|
||
| public function execute(?string $partitionKeyValue = null, bool $manualInitialization = false): void | ||
| public function execute(?string $partitionKeyValue, string $streamName, bool $manualInitialization = false): void | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @jlabedo I do think global tracking is a bit problematic with current implementation. It makes assumption that projecting between different streams is like projecting across single stream. I do think we should lose that up, and allow for parallel projecting between streams, just like for partitioned. This would means that large streams, while projecting with 2, 3+ streams could be 2,3+ time shorter. However the result of that is, if we project to the same row in db between streams, we need to ensure using safe blocking SQL operations. Instead of:
UPDATE count VALUES (5)
UPDATE count = count + 1I am not sure, if even projecting to same record between streams should be a case, I would discourage to do that. But if we someone would want to do it, we would have to mention it in the docs - if we go with isolated projecting between streams.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I don't understand why you think it is problematic ?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe problematic is a bad word, it may be done differently to achieve better results. If we would split those tracking and each stream for Global would be projected separately, then:
The challenge I see however is that some Developers may assume that each access to the projection is blocking, meaning only one transaction do write at time. But with above approach we on purpose not do that, to achieve scaling and optimize each projection execution. If someone would then try to update same row from different streams, we could end up with last one wins and overwrites. Wdyt?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am closing this PR, but this question is still open. So please respond here or via dm :) |
||
| { | ||
| do { | ||
| $processedEvents = $this->executeSingleBatch($partitionKeyValue, $manualInitialization || $this->automaticInitialization); | ||
| $processedEvents = $this->executeSingleBatch($partitionKeyValue, $streamName, $manualInitialization || $this->automaticInitialization); | ||
| } while ($processedEvents > 0 && $this->terminationListener->shouldTerminate() !== true); | ||
| } | ||
|
|
||
| public function executeAllStreams(?string $partitionKeyValue = null, bool $manualInitialization = false): void | ||
| { | ||
| $streamFilters = $this->streamFilterRegistry->provide($this->projectionName); | ||
| foreach ($streamFilters as $streamFilter) { | ||
| $this->execute($partitionKeyValue, $streamFilter->streamName, $manualInitialization); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * @return int Number of processed events | ||
| */ | ||
| private function executeSingleBatch(?string $partitionKeyValue, bool $canInitialize): int | ||
| private function executeSingleBatch(?string $partitionKeyValue, string $streamName, bool $canInitialize): int | ||
| { | ||
| $transaction = $this->getProjectionStateStorage()->beginTransaction(); | ||
| try { | ||
| $projectionState = $this->loadOrInitializePartitionState($partitionKeyValue, $canInitialize); | ||
| $projectionState = $this->loadOrInitializePartitionState($partitionKeyValue, $streamName, $canInitialize); | ||
| if ($projectionState === null) { | ||
| $transaction->commit(); | ||
| return 0; | ||
| } | ||
|
|
||
| $streamSource = $this->streamSourceRegistry->getFor($this->projectionName); | ||
| $streamPage = $streamSource->load($this->projectionName, $projectionState->lastPosition, $this->eventLoadingBatchSize, $partitionKeyValue); | ||
| $streamPage = $streamSource->load($this->projectionName, $projectionState->lastPosition, $this->eventLoadingBatchSize, $partitionKeyValue, $streamName); | ||
|
|
||
| $userState = $projectionState->userState; | ||
| $processedEvents = 0; | ||
|
|
@@ -86,7 +94,6 @@ private function executeSingleBatch(?string $partitionKeyValue, bool $canInitial | |
| ->withUserState($userState); | ||
|
|
||
| if ($processedEvents === 0 && $canInitialize) { | ||
| // If we are forcing execution and there are no new events, we still want to enable the projection if it was uninitialized | ||
| $projectionState = $projectionState->withStatus(ProjectionInitializationStatus::INITIALIZED); | ||
| } | ||
|
|
||
|
|
@@ -99,9 +106,9 @@ private function executeSingleBatch(?string $partitionKeyValue, bool $canInitial | |
| } | ||
| } | ||
|
|
||
| public function loadState(?string $partitionKey = null): ProjectionPartitionState | ||
| public function loadState(?string $partitionKey, string $streamName): ProjectionPartitionState | ||
| { | ||
| return $this->getProjectionStateStorage()->loadPartition($this->projectionName, $partitionKey); | ||
| return $this->getProjectionStateStorage()->loadPartition($this->projectionName, $partitionKey, $streamName); | ||
| } | ||
|
|
||
| public function getPartitionProvider(): PartitionProvider | ||
|
|
@@ -193,10 +200,10 @@ public function backfill(): void | |
| $this->prepareBackfill(); | ||
| } | ||
|
|
||
| private function loadOrInitializePartitionState(?string $partitionKey, bool $canInitialize): ?ProjectionPartitionState | ||
| private function loadOrInitializePartitionState(?string $partitionKey, string $streamName, bool $canInitialize): ?ProjectionPartitionState | ||
| { | ||
| $storage = $this->getProjectionStateStorage(); | ||
| $projectionState = $storage->loadPartition($this->projectionName, $partitionKey); | ||
| $projectionState = $storage->loadPartition($this->projectionName, $partitionKey, $streamName); | ||
|
|
||
| if (! $canInitialize && $projectionState?->status === ProjectionInitializationStatus::UNINITIALIZED) { | ||
| return null; | ||
|
|
@@ -206,11 +213,11 @@ private function loadOrInitializePartitionState(?string $partitionKey, bool $can | |
| } | ||
|
|
||
| if ($canInitialize) { | ||
| $projectionState = $storage->initPartition($this->projectionName, $partitionKey); | ||
| $projectionState = $storage->initPartition($this->projectionName, $partitionKey, $streamName); | ||
| if ($projectionState) { | ||
| $this->projectorExecutor->init(); | ||
| } else { | ||
| $projectionState = $storage->loadPartition($this->projectionName, $partitionKey); | ||
| $projectionState = $storage->loadPartition($this->projectionName, $partitionKey, $streamName); | ||
| } | ||
| return $projectionState; | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are loading for stream and partition key. As partition key make only sense within given stream.