Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public function executeBackfillBatch(
$streamFilter = new StreamFilter($streamName, $aggregateType, $eventStoreReferenceName);

foreach ($projectingManager->getPartitionProvider()->partitions($streamFilter, $limit, $offset) as $partition) {
$projectingManager->execute($partition, true);
$projectingManager->execute($partition, $streamName, true);
if ($this->terminationListener->shouldTerminate()) {
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
->chainInterceptedProcessor(
MethodInvokerBuilder::create(
$projectingManagerReference,
InterfaceToCallReference::create(ProjectingManager::class, 'execute'),
InterfaceToCallReference::create(ProjectingManager::class, 'executeAllStreams'),
[
$projectionBuilder->partitionHeader()
? HeaderBuilder::create('partitionKeyValue', $projectionBuilder->partitionHeader())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,12 @@ public function canHandle(string $projectionName): bool
return $this->projectionNames === null || in_array($projectionName, $this->projectionNames, true);
}

public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage
public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey, string $streamName): StreamPage
{
// Position is 0-based index into the global event array (like InMemoryStreamSource)
$from = $lastPosition !== null ? (int) $lastPosition : 0;

// Determine which streams to read from
$streams = $this->getStreamsToRead();
$streams = $this->getStreamsToRead($streamName);

// Collect all events from all streams
$allEvents = [];
foreach ($streams as $stream) {
if (! $this->eventStore->hasStream($stream)) {
Expand All @@ -62,36 +59,37 @@ public function load(string $projectionName, ?string $lastPosition, int $count,
->withMetadataMatch($this->partitionHeader, Operator::EQUALS, $partitionKey);
}

// Filter by event names if specified (optimization for partitioned projections)
if ($this->eventNames !== []) {
$metadataMatcher = $metadataMatcher
->withMetadataMatch('event_name', Operator::IN, $this->eventNames, FieldType::MESSAGE_PROPERTY);
}

// Load all events from this stream (starting from position 1)
$events = $this->eventStore->load($stream, 1, null, $metadataMatcher);
$allEvents = array_merge($allEvents, is_array($events) ? $events : iterator_to_array($events));
}

// Slice based on global position
$events = array_slice($allEvents, $from, $count);
$to = $from + count($events);

return new StreamPage($events, (string) $to);
}

private function getStreamsToRead(): array
/**
* @return array<string>
*/
private function getStreamsToRead(string $streamName): array
{
if ($streamName !== '') {
return [$streamName];
}

if ($this->streamName !== null) {
return [$this->streamName];
}

// Read from all streams (global stream)
$reflection = new ReflectionProperty($this->eventStore, 'streams');
$reflection->setAccessible(true);
$allStreams = array_keys($reflection->getValue($this->eventStore));

// Filter out internal streams (starting with $)
return array_filter($allStreams, fn ($stream) => ! str_starts_with($stream, '$'));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,18 @@ public function canHandle(string $projectionName): bool
return $this->projectionNames === null || in_array($projectionName, $this->projectionNames, true);
}

public function loadPartition(string $projectionName, ?string $partitionKey = null, bool $lock = true): ?ProjectionPartitionState
public function loadPartition(string $projectionName, ?string $partitionKey, string $streamName, bool $lock = true): ?ProjectionPartitionState
{
$key = $this->getKey($projectionName, $partitionKey);
$key = $this->getKey($projectionName, $partitionKey, $streamName);
return $this->projectionStates[$key] ?? null;
}

public function initPartition(string $projectionName, ?string $partitionKey = null): ?ProjectionPartitionState
public function initPartition(string $projectionName, ?string $partitionKey, string $streamName): ?ProjectionPartitionState
{
$key = $this->getKey($projectionName, $partitionKey);
$key = $this->getKey($projectionName, $partitionKey, $streamName);

if (! isset($this->projectionStates[$key])) {
$this->projectionStates[$key] = new ProjectionPartitionState($projectionName, $partitionKey, null, null, ProjectionInitializationStatus::UNINITIALIZED);
$this->projectionStates[$key] = new ProjectionPartitionState($projectionName, $partitionKey, $streamName, null, null, ProjectionInitializationStatus::UNINITIALIZED);
return $this->projectionStates[$key];
}

Expand All @@ -55,21 +55,25 @@ public function initPartition(string $projectionName, ?string $partitionKey = nu

public function savePartition(ProjectionPartitionState $projectionState): void
{
$key = $this->getKey($projectionState->projectionName, $projectionState->partitionKey);
$key = $this->getKey($projectionState->projectionName, $projectionState->partitionKey, $projectionState->streamName);
$this->projectionStates[$key] = $projectionState;
}

private function getKey(string $projectionName, ?string $partitionKey): string
private function getKey(string $projectionName, ?string $partitionKey, string $streamName): string
{
if ($partitionKey === null) {
return $projectionName;
$key = $projectionName;
if ($streamName !== '') {
$key .= '::' . $streamName;
}
return $projectionName . '-' . $partitionKey;
if ($partitionKey !== null) {
$key .= '-' . $partitionKey;
}
return $key;
}

public function delete(string $projectionName): void
{
$projectionStartKey = $this->getKey($projectionName, null);
$projectionStartKey = $projectionName;
foreach ($this->projectionStates as $key => $value) {
if (str_starts_with($key, $projectionStartKey)) {
unset($this->projectionStates[$key]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public function append(Event ...$events): void
}
}

public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage
public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey, string $streamName): StreamPage
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are loading for stream and partition key. As partition key make only sense within given stream.

{
$from = $lastPosition !== null ? (int) $lastPosition : 0;

Expand Down
31 changes: 19 additions & 12 deletions packages/Ecotone/src/Projecting/ProjectingManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,28 +48,36 @@ private function getProjectionStateStorage(): ProjectionStateStorage
return $this->projectionStateStorage;
}

public function execute(?string $partitionKeyValue = null, bool $manualInitialization = false): void
public function execute(?string $partitionKeyValue, string $streamName, bool $manualInitialization = false): void
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jlabedo I do think global tracking is a bit problematic with current implementation. It makes assumption that projecting between different streams is like projecting across single stream.

I do think we should lose that up, and allow for parallel projecting between streams, just like for partitioned. This would means that large streams, while projecting with 2, 3+ streams could be 2,3+ time shorter.

However the result of that is, if we project to the same row in db between streams, we need to ensure using safe blocking SQL operations. Instead of:

  • not safe: the last one wins
UPDATE count VALUES (5)
  • safe blocking: will give correct results in case of concurrent access
UPDATE count = count + 1

I am not sure, if even projecting to same record between streams should be a case, I would discourage to do that. But if we someone would want to do it, we would have to mention it in the docs - if we go with isolated projecting between streams.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do think global tracking is a bit problematic with current implementation. It makes assumption that projecting between different streams is like projecting across single stream.

I don't understand why you think it is problematic ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe problematic is a bad word, it may be done differently to achieve better results.

If we would split those tracking and each stream for Global would be projected separately, then:

  • If Projection is having two or more related streams, we can do the rebuild for all of those in the same time. By merging those that won't be a case, the time for rebuild is multiplied by the number of streams
  • Less conflicts, because separate streams are not blocking projecting from other streams. This would be in match with our gap detection mechanism and improve it, as right now parallel transaction will block and wait, but with they would continue.
  • More optimized, we only fetch events for the stream that have triggered projection, rather than fetching for all of them

The challenge I see however is that some Developers may assume that each access to the projection is blocking, meaning only one transaction do write at time. But with above approach we on purpose not do that, to achieve scaling and optimize each projection execution.

If someone would then try to update same row from different streams, we could end up with last one wins and overwrites.

Wdyt?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am closing this PR, but this question is still open. So please respond here or via dm :)

{
do {
$processedEvents = $this->executeSingleBatch($partitionKeyValue, $manualInitialization || $this->automaticInitialization);
$processedEvents = $this->executeSingleBatch($partitionKeyValue, $streamName, $manualInitialization || $this->automaticInitialization);
} while ($processedEvents > 0 && $this->terminationListener->shouldTerminate() !== true);
}

public function executeAllStreams(?string $partitionKeyValue = null, bool $manualInitialization = false): void
{
$streamFilters = $this->streamFilterRegistry->provide($this->projectionName);
foreach ($streamFilters as $streamFilter) {
$this->execute($partitionKeyValue, $streamFilter->streamName, $manualInitialization);
}
}

/**
* @return int Number of processed events
*/
private function executeSingleBatch(?string $partitionKeyValue, bool $canInitialize): int
private function executeSingleBatch(?string $partitionKeyValue, string $streamName, bool $canInitialize): int
{
$transaction = $this->getProjectionStateStorage()->beginTransaction();
try {
$projectionState = $this->loadOrInitializePartitionState($partitionKeyValue, $canInitialize);
$projectionState = $this->loadOrInitializePartitionState($partitionKeyValue, $streamName, $canInitialize);
if ($projectionState === null) {
$transaction->commit();
return 0;
}

$streamSource = $this->streamSourceRegistry->getFor($this->projectionName);
$streamPage = $streamSource->load($this->projectionName, $projectionState->lastPosition, $this->eventLoadingBatchSize, $partitionKeyValue);
$streamPage = $streamSource->load($this->projectionName, $projectionState->lastPosition, $this->eventLoadingBatchSize, $partitionKeyValue, $streamName);

$userState = $projectionState->userState;
$processedEvents = 0;
Expand All @@ -86,7 +94,6 @@ private function executeSingleBatch(?string $partitionKeyValue, bool $canInitial
->withUserState($userState);

if ($processedEvents === 0 && $canInitialize) {
// If we are forcing execution and there are no new events, we still want to enable the projection if it was uninitialized
$projectionState = $projectionState->withStatus(ProjectionInitializationStatus::INITIALIZED);
}

Expand All @@ -99,9 +106,9 @@ private function executeSingleBatch(?string $partitionKeyValue, bool $canInitial
}
}

public function loadState(?string $partitionKey = null): ProjectionPartitionState
public function loadState(?string $partitionKey, string $streamName): ProjectionPartitionState
{
return $this->getProjectionStateStorage()->loadPartition($this->projectionName, $partitionKey);
return $this->getProjectionStateStorage()->loadPartition($this->projectionName, $partitionKey, $streamName);
}

public function getPartitionProvider(): PartitionProvider
Expand Down Expand Up @@ -193,10 +200,10 @@ public function backfill(): void
$this->prepareBackfill();
}

private function loadOrInitializePartitionState(?string $partitionKey, bool $canInitialize): ?ProjectionPartitionState
private function loadOrInitializePartitionState(?string $partitionKey, string $streamName, bool $canInitialize): ?ProjectionPartitionState
{
$storage = $this->getProjectionStateStorage();
$projectionState = $storage->loadPartition($this->projectionName, $partitionKey);
$projectionState = $storage->loadPartition($this->projectionName, $partitionKey, $streamName);

if (! $canInitialize && $projectionState?->status === ProjectionInitializationStatus::UNINITIALIZED) {
return null;
Expand All @@ -206,11 +213,11 @@ private function loadOrInitializePartitionState(?string $partitionKey, bool $can
}

if ($canInitialize) {
$projectionState = $storage->initPartition($this->projectionName, $partitionKey);
$projectionState = $storage->initPartition($this->projectionName, $partitionKey, $streamName);
if ($projectionState) {
$this->projectorExecutor->init();
} else {
$projectionState = $storage->loadPartition($this->projectionName, $partitionKey);
$projectionState = $storage->loadPartition($this->projectionName, $partitionKey, $streamName);
}
return $projectionState;
}
Expand Down
7 changes: 4 additions & 3 deletions packages/Ecotone/src/Projecting/ProjectionPartitionState.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class ProjectionPartitionState
public function __construct(
public readonly string $projectionName,
public readonly ?string $partitionKey,
public readonly string $streamName,
public readonly ?string $lastPosition = null,
public readonly mixed $userState = null,
public readonly ?ProjectionInitializationStatus $status = null,
Expand All @@ -20,16 +21,16 @@ public function __construct(

public function withLastPosition(string $lastPosition): self
{
return new self($this->projectionName, $this->partitionKey, $lastPosition, $this->userState, $this->status);
return new self($this->projectionName, $this->partitionKey, $this->streamName, $lastPosition, $this->userState, $this->status);
}

public function withUserState(mixed $userState): self
{
return new self($this->projectionName, $this->partitionKey, $this->lastPosition, $userState, $this->status);
return new self($this->projectionName, $this->partitionKey, $this->streamName, $this->lastPosition, $userState, $this->status);
}

public function withStatus(ProjectionInitializationStatus $status): self
{
return new self($this->projectionName, $this->partitionKey, $this->lastPosition, $this->userState, $status);
return new self($this->projectionName, $this->partitionKey, $this->streamName, $this->lastPosition, $this->userState, $status);
}
}
4 changes: 2 additions & 2 deletions packages/Ecotone/src/Projecting/ProjectionStateStorage.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
interface ProjectionStateStorage
{
public function canHandle(string $projectionName): bool;
public function loadPartition(string $projectionName, ?string $partitionKey = null, bool $lock = true): ?ProjectionPartitionState;
public function initPartition(string $projectionName, ?string $partitionKey = null): ?ProjectionPartitionState;
public function loadPartition(string $projectionName, ?string $partitionKey, string $streamName, bool $lock = true): ?ProjectionPartitionState;
public function initPartition(string $projectionName, ?string $partitionKey, string $streamName): ?ProjectionPartitionState;
public function savePartition(ProjectionPartitionState $projectionState): void;
public function delete(string $projectionName): void;
public function init(string $projectionName): void;
Expand Down
2 changes: 1 addition & 1 deletion packages/Ecotone/src/Projecting/StreamSource.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ interface StreamSource
{
public function canHandle(string $projectionName): bool;

public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage;
public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey, string $streamName): StreamPage;
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public function canHandle(string $projectionName): bool
return true;
}

public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey = null): StreamPage
public function load(string $projectionName, ?string $lastPosition, int $count, ?string $partitionKey, string $streamName): StreamPage
{
$from = $lastPosition !== null ? (int) $lastPosition : 0;
$events = array_slice($this->events, $from, $count);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ public function test_event_driven_projection_combined_with_event_streaming_proje
$positionTracker = new InMemoryConsumerPositionTracker();

// Given an event-driven projection (catches up from stream when triggered)
$eventDrivenProjection = new #[ProjectionV2('event_driven_product_count'), FromStream('test_stream')] class {
$eventDrivenProjection = new #[ProjectionV2('event_driven_product_count'), FromStream('product_stream')] class {
public int $productCount = 0;

#[EventHandler]
Expand Down
Loading
Loading