diff --git a/.nix/php/lib/php.ini.dist b/.nix/php/lib/php.ini.dist index 488665798b..ba9900777c 100644 --- a/.nix/php/lib/php.ini.dist +++ b/.nix/php/lib/php.ini.dist @@ -9,4 +9,7 @@ file_uploads = On max_file_uploads = 20 short_open_tag = off opcache.enable=1 -opcache.enable_cli=0 \ No newline at end of file +opcache.enable_cli=0 +apc.enabled=1 +apc.enable_cli=1 +apc.shm_size=128M \ No newline at end of file diff --git a/.nix/pkgs/flow-php/package.nix b/.nix/pkgs/flow-php/package.nix index 1d6bbd7406..384e33db14 100644 --- a/.nix/pkgs/flow-php/package.nix +++ b/.nix/pkgs/flow-php/package.nix @@ -22,6 +22,7 @@ let with all; enabled ++ [ + apcu bcmath dom mbstring diff --git a/src/core/etl/src/Flow/ETL/Config.php b/src/core/etl/src/Flow/ETL/Config.php index 00ebf0a0c6..4cf96bd43b 100644 --- a/src/core/etl/src/Flow/ETL/Config.php +++ b/src/core/etl/src/Flow/ETL/Config.php @@ -4,6 +4,7 @@ namespace Flow\ETL; +use Flow\ETL\Config\Aggregation\AggregationConfig; use Flow\ETL\Config\Cache\CacheConfig; use Flow\ETL\Config\ConfigBuilder; use Flow\ETL\Config\Sort\SortConfig; @@ -36,6 +37,7 @@ public function __construct( public SortConfig $sort, private ?Analyze $analyze, public TelemetryConfig $telemetry, + public AggregationConfig $aggregation, ) {} public static function builder(): ConfigBuilder diff --git a/src/core/etl/src/Flow/ETL/Config/Aggregation/AggregationConfig.php b/src/core/etl/src/Flow/ETL/Config/Aggregation/AggregationConfig.php new file mode 100644 index 0000000000..f10975f2d0 --- /dev/null +++ b/src/core/etl/src/Flow/ETL/Config/Aggregation/AggregationConfig.php @@ -0,0 +1,21 @@ +memoryLimit === null) { + $aggregationMemory = getenv(AggregationConfig::AGGREGATION_MAX_MEMORY_ENV); + + if (is_string($aggregationMemory)) { + $this->memoryLimit = Unit::fromString($aggregationMemory); + } else { + $memoryLimit = ini_get('memory_limit'); + + if ($memoryLimit === false || $memoryLimit === '-1') { + $this->memoryLimit = Unit::fromBytes(PHP_INT_MAX); + } else { + $this->memoryLimit = Unit::fromString( + $memoryLimit, + )->percentage(self::DEFAULT_AGGREGATION_MEMORY_PERCENTAGE); + } + } + } + + return new AggregationConfig($this->strategy, $this->memoryLimit, $this->storage, $this->filesystemProtocol); + } + + public function filesystemProtocol(string $protocol): self + { + $this->filesystemProtocol = $protocol; + + return $this; + } + + public function memoryLimit(Unit $memoryLimit): self + { + $this->memoryLimit = $memoryLimit; + + return $this; + } + + public function storage(AggregationStorage $storage): self + { + $this->storage = $storage; + + return $this; + } + + public function strategy(AggregationStorageStrategy $strategy): self + { + $this->strategy = $strategy; + + return $this; + } +} diff --git a/src/core/etl/src/Flow/ETL/Config/ConfigBuilder.php b/src/core/etl/src/Flow/ETL/Config/ConfigBuilder.php index 02606bbd83..80152a7237 100644 --- a/src/core/etl/src/Flow/ETL/Config/ConfigBuilder.php +++ b/src/core/etl/src/Flow/ETL/Config/ConfigBuilder.php @@ -8,12 +8,15 @@ use Flow\ETL\Analyze; use Flow\ETL\Cache; use Flow\ETL\Config; +use Flow\ETL\Config\Aggregation\AggregationConfigBuilder; use Flow\ETL\Config\Cache\CacheConfigBuilder; use Flow\ETL\Config\Sort\SortConfigBuilder; use Flow\ETL\Config\Telemetry\TelemetryConfig; use Flow\ETL\Config\Telemetry\TelemetryOptions; use Flow\ETL\Dataset\Memory\Unit; use Flow\ETL\Filesystem\FilesystemStreams; +use Flow\ETL\GroupBy\AggregationStorageStrategy; +use Flow\ETL\GroupBy\Storage\AggregationStorage; use Flow\ETL\NativePHPRandomValueGenerator; use Flow\ETL\Pipeline\Optimizer; use Flow\ETL\Pipeline\Optimizer\BatchSizeOptimization; @@ -34,6 +37,8 @@ final class ConfigBuilder { + public readonly AggregationConfigBuilder $aggregation; + public readonly CacheConfigBuilder $cache; public readonly SortConfigBuilder $sort; @@ -69,6 +74,7 @@ public function __construct() $this->putInputIntoRows = false; $this->optimizer = null; $this->clock = null; + $this->aggregation = new AggregationConfigBuilder(); $this->cache = new CacheConfigBuilder(); $this->sort = new SortConfigBuilder(); $this->randomValueGenerator = new NativePHPRandomValueGenerator(); @@ -79,6 +85,34 @@ public function __construct() : PackageVersion::get('flow-php/etl'); } + public function aggregationFilesystem(string $protocol): self + { + $this->aggregation->filesystemProtocol($protocol); + + return $this; + } + + public function aggregationMemoryLimit(Unit $unit): self + { + $this->aggregation->memoryLimit($unit); + + return $this; + } + + public function aggregationStorage(AggregationStorageStrategy $strategy): self + { + $this->aggregation->strategy($strategy); + + return $this; + } + + public function aggregationStore(AggregationStorage $storage): self + { + $this->aggregation->storage($storage); + + return $this; + } + public function analyze(Analyze $analyze): self { $this->analyze = $analyze; @@ -111,6 +145,7 @@ public function build(EntryFactory $entryFactory = new EntryFactory()): Config $this->sort->build(), $this->analyze, $this->telemetryConfig ?? TelemetryConfig::default($this->getClock()), + $this->aggregation->build(), ); } diff --git a/src/core/etl/src/Flow/ETL/Function/Average.php b/src/core/etl/src/Flow/ETL/Function/Average.php index d404adea22..1676783917 100644 --- a/src/core/etl/src/Flow/ETL/Function/Average.php +++ b/src/core/etl/src/Flow/ETL/Function/Average.php @@ -21,7 +21,7 @@ use function is_int; use function is_numeric; -final class Average implements AggregatingFunction, WindowFunction +final class Average implements AggregatingFunction, MergeableAggregatingFunction, WindowFunction { private int $count; @@ -82,6 +82,16 @@ public function apply(Row $row, Rows $partition, FlowContext $context): mixed return (new Calculator())->divide($sum, $count, $this->scale, $this->rounding); } + public function merge(MergeableAggregatingFunction $other): void + { + if (!$other instanceof self) { + throw new InvalidArgumentException('Cannot merge ' . self::class . ' with ' . $other::class); + } + + $this->sum += $other->sum; + $this->count += $other->count; + } + public function over(Window $window): WindowFunction { $this->window = $window; diff --git a/src/core/etl/src/Flow/ETL/Function/Collect.php b/src/core/etl/src/Flow/ETL/Function/Collect.php index 12209f5710..78d4ad51be 100644 --- a/src/core/etl/src/Flow/ETL/Function/Collect.php +++ b/src/core/etl/src/Flow/ETL/Function/Collect.php @@ -11,10 +11,11 @@ use Flow\ETL\Row\EntryFactory; use Flow\ETL\Row\Reference; +use function array_merge; use function current; use function Flow\ETL\DSL\to_entry; -final class Collect implements AggregatingFunction +final class Collect implements AggregatingFunction, MergeableAggregatingFunction { /** * @var array @@ -41,6 +42,15 @@ public function aggregate(Row $row, FlowContext $context): void } } + public function merge(MergeableAggregatingFunction $other): void + { + if (!$other instanceof self) { + throw new InvalidArgumentException('Cannot merge ' . self::class . ' with ' . $other::class); + } + + $this->collection = array_merge($this->collection, $other->collection); + } + /** * @return Entry */ diff --git a/src/core/etl/src/Flow/ETL/Function/CollectUnique.php b/src/core/etl/src/Flow/ETL/Function/CollectUnique.php index 3f87b0ff09..fd86c2deaf 100644 --- a/src/core/etl/src/Flow/ETL/Function/CollectUnique.php +++ b/src/core/etl/src/Flow/ETL/Function/CollectUnique.php @@ -15,7 +15,7 @@ use function Flow\ETL\DSL\to_entry; use function in_array; -final class CollectUnique implements AggregatingFunction +final class CollectUnique implements AggregatingFunction, MergeableAggregatingFunction { /** * @var array @@ -49,6 +49,20 @@ public function aggregate(Row $row, FlowContext $context): void } } + public function merge(MergeableAggregatingFunction $other): void + { + if (!$other instanceof self) { + throw new InvalidArgumentException('Cannot merge ' . self::class . ' with ' . $other::class); + } + + /** @var mixed $value */ + foreach ($other->collection as $value) { + if (!in_array($value, $this->collection, true)) { + $this->collection[] = $value; + } + } + } + /** * @return Entry */ diff --git a/src/core/etl/src/Flow/ETL/Function/Count.php b/src/core/etl/src/Flow/ETL/Function/Count.php index 52e585ef3c..3087659836 100644 --- a/src/core/etl/src/Flow/ETL/Function/Count.php +++ b/src/core/etl/src/Flow/ETL/Function/Count.php @@ -16,7 +16,7 @@ use function Flow\ETL\DSL\int_entry; -final class Count implements AggregatingFunction, WindowFunction +final class Count implements AggregatingFunction, MergeableAggregatingFunction, WindowFunction { private int $count; @@ -80,6 +80,15 @@ public function apply(Row $row, Rows $partition, FlowContext $context): mixed return $count; } + public function merge(MergeableAggregatingFunction $other): void + { + if (!$other instanceof self) { + throw new InvalidArgumentException('Cannot merge ' . self::class . ' with ' . $other::class); + } + + $this->count += $other->count; + } + public function over(Window $window): WindowFunction { $this->window = $window; diff --git a/src/core/etl/src/Flow/ETL/Function/First.php b/src/core/etl/src/Flow/ETL/Function/First.php index 36126f0176..9e7c39b750 100644 --- a/src/core/etl/src/Flow/ETL/Function/First.php +++ b/src/core/etl/src/Flow/ETL/Function/First.php @@ -13,7 +13,7 @@ use function Flow\ETL\DSL\string_entry; -final class First implements AggregatingFunction +final class First implements AggregatingFunction, MergeableAggregatingFunction { /** * @var null|Entry @@ -37,6 +37,15 @@ public function aggregate(Row $row, FlowContext $context): void } } + public function merge(MergeableAggregatingFunction $other): void + { + if (!$other instanceof self) { + throw new InvalidArgumentException('Cannot merge ' . self::class . ' with ' . $other::class); + } + + $this->first ??= $other->first; + } + /** * @return Entry */ diff --git a/src/core/etl/src/Flow/ETL/Function/Last.php b/src/core/etl/src/Flow/ETL/Function/Last.php index d5c1e76f76..8df306d96d 100644 --- a/src/core/etl/src/Flow/ETL/Function/Last.php +++ b/src/core/etl/src/Flow/ETL/Function/Last.php @@ -13,7 +13,7 @@ use function Flow\ETL\DSL\string_entry; -final class Last implements AggregatingFunction +final class Last implements AggregatingFunction, MergeableAggregatingFunction { /** * @var null|Entry @@ -35,6 +35,17 @@ public function aggregate(Row $row, FlowContext $context): void } } + public function merge(MergeableAggregatingFunction $other): void + { + if (!$other instanceof self) { + throw new InvalidArgumentException('Cannot merge ' . self::class . ' with ' . $other::class); + } + + if ($other->last !== null) { + $this->last = $other->last; + } + } + /** * @return Entry */ diff --git a/src/core/etl/src/Flow/ETL/Function/Max.php b/src/core/etl/src/Flow/ETL/Function/Max.php index 6794783b8c..9f5eda78a7 100644 --- a/src/core/etl/src/Flow/ETL/Function/Max.php +++ b/src/core/etl/src/Flow/ETL/Function/Max.php @@ -18,7 +18,7 @@ use function is_numeric; use function max; -final class Max implements AggregatingFunction +final class Max implements AggregatingFunction, MergeableAggregatingFunction { private float|DateTimeInterface|null $max; @@ -52,6 +52,25 @@ public function aggregate(Row $row, FlowContext $context): void } } + public function merge(MergeableAggregatingFunction $other): void + { + if (!$other instanceof self) { + throw new InvalidArgumentException('Cannot merge ' . self::class . ' with ' . $other::class); + } + + if ($other->max === null) { + return; + } + + if ($this->max === null) { + $this->max = $other->max; + + return; + } + + $this->max = max($this->max, $other->max); + } + /** * @return Entry|Entry|Entry */ diff --git a/src/core/etl/src/Flow/ETL/Function/MergeableAggregatingFunction.php b/src/core/etl/src/Flow/ETL/Function/MergeableAggregatingFunction.php new file mode 100644 index 0000000000..4065cd7cf7 --- /dev/null +++ b/src/core/etl/src/Flow/ETL/Function/MergeableAggregatingFunction.php @@ -0,0 +1,10 @@ +min === null) { + return; + } + + if ($this->min === null) { + $this->min = $other->min; + + return; + } + + $this->min = min($this->min, $other->min); + } + /** * @return Entry|Entry|Entry */ diff --git a/src/core/etl/src/Flow/ETL/Function/StringAggregate.php b/src/core/etl/src/Flow/ETL/Function/StringAggregate.php index 13f1e468a3..c68fa25544 100644 --- a/src/core/etl/src/Flow/ETL/Function/StringAggregate.php +++ b/src/core/etl/src/Flow/ETL/Function/StringAggregate.php @@ -4,6 +4,7 @@ namespace Flow\ETL\Function; +use Flow\ETL\Exception\InvalidArgumentException; use Flow\ETL\FlowContext; use Flow\ETL\Row; use Flow\ETL\Row\Entry; @@ -11,6 +12,7 @@ use Flow\ETL\Row\Reference; use Flow\ETL\Row\SortOrder; +use function array_merge; use function count; use function Flow\ETL\DSL\str_entry; use function implode; @@ -18,7 +20,7 @@ use function rsort; use function sort; -final class StringAggregate implements AggregatingFunction +final class StringAggregate implements AggregatingFunction, MergeableAggregatingFunction { /** * @var array @@ -40,6 +42,15 @@ public function aggregate(Row $row, FlowContext $context): void } } + public function merge(MergeableAggregatingFunction $other): void + { + if (!$other instanceof self) { + throw new InvalidArgumentException('Cannot merge ' . self::class . ' with ' . $other::class); + } + + $this->values = array_merge($this->values, $other->values); + } + /** * @return Row\Entry */ diff --git a/src/core/etl/src/Flow/ETL/Function/Sum.php b/src/core/etl/src/Flow/ETL/Function/Sum.php index 77415475c3..e117a9f302 100644 --- a/src/core/etl/src/Flow/ETL/Function/Sum.php +++ b/src/core/etl/src/Flow/ETL/Function/Sum.php @@ -19,7 +19,7 @@ use function Flow\ETL\DSL\int_entry; use function is_numeric; -final class Sum implements AggregatingFunction, WindowFunction +final class Sum implements AggregatingFunction, MergeableAggregatingFunction, WindowFunction { private float|int $sum; @@ -68,6 +68,16 @@ public function apply(Row $row, Rows $partition, FlowContext $context): mixed return $sum; } + public function merge(MergeableAggregatingFunction $other): void + { + if (!$other instanceof self) { + throw new InvalidArgumentException('Cannot merge ' . self::class . ' with ' . $other::class); + } + + // @mago-ignore analysis:possibly-invalid-argument + $this->sum = (new Calculator())->add($this->sum, $other->sum); + } + public function over(Window $window): WindowFunction { $this->window = $window; diff --git a/src/core/etl/src/Flow/ETL/GroupBy.php b/src/core/etl/src/Flow/ETL/GroupBy.php index d606797cd4..3329121704 100644 --- a/src/core/etl/src/Flow/ETL/GroupBy.php +++ b/src/core/etl/src/Flow/ETL/GroupBy.php @@ -9,6 +9,9 @@ use Flow\ETL\Exception\InvalidArgumentException; use Flow\ETL\Exception\RuntimeException; use Flow\ETL\Function\AggregatingFunction; +use Flow\ETL\Function\MergeableAggregatingFunction; +use Flow\ETL\GroupBy\Storage\AggregationStorage; +use Flow\ETL\GroupBy\Storage\MemoryAggregationStorage; use Flow\ETL\Hash\NativePHPHash; use Flow\ETL\Row\Reference; use Flow\ETL\Row\References; @@ -36,11 +39,6 @@ final class GroupBy */ private array $aggregations; - /** - * @var array, aggregators: array}> - */ - private array $groupedTable; - private ?Reference $pivot; /** @@ -55,14 +53,16 @@ final class GroupBy private readonly References $refs; + private AggregationStorage $storage; + public function __construct(string|Reference ...$entries) { $this->refs = References::init(...array_unique($entries)); $this->aggregations = []; - $this->groupedTable = []; $this->pivotedTable = []; $this->pivotColumns = []; $this->pivot = null; + $this->storage = new MemoryAggregationStorage(); } public function aggregate(AggregatingFunction ...$aggregator): void @@ -80,6 +80,17 @@ public function aggregate(AggregatingFunction ...$aggregator): void $this->aggregations = $aggregator; } + public function allAggregationsMergeable(): bool + { + foreach ($this->aggregations as $aggregation) { + if (!$aggregation instanceof MergeableAggregatingFunction) { + return false; + } + } + + return true; + } + public function group(Rows $rows, FlowContext $context): void { $pivot = $this->pivot; @@ -146,26 +157,22 @@ public function group(Rows $rows, FlowContext $context): void $valuesHash = $this->hash($values); - if (!array_key_exists($valuesHash, $this->groupedTable)) { - $aggregators = []; - - foreach ($this->aggregations as $aggregator) { - $aggregators[] = clone $aggregator; - } - - $this->groupedTable[$valuesHash] = [ - 'values' => $values, - 'aggregators' => $aggregators, - ]; - } + $entry = $this->storage->entry($valuesHash, $values, $this->aggregations); - foreach ($this->groupedTable[$valuesHash]['aggregators'] as $aggregator) { + foreach ($entry->aggregators as $aggregator) { $aggregator->aggregate($row, $context); } + + $this->storage->save($valuesHash, $entry); } } } + public function isPivot(): bool + { + return $this->pivot !== null; + } + public function pivot(Reference $ref): void { $this->pivot = $ref; @@ -199,15 +206,17 @@ public function result(FlowContext $context): Rows return array_to_rows($rows, $context->entryFactory()); } - foreach ($this->groupedTable as $group) { + $this->storage->flush(); + + foreach ($this->storage->all() as $group) { $entries = []; /** @var mixed $value */ - foreach ($group['values'] ?? [] as $entry => $value) { + foreach ($group->values as $entry => $value) { $entries[] = $context->entryFactory()->create($entry, $value); } - foreach ($group['aggregators'] as $aggregator) { + foreach ($group->aggregators as $aggregator) { $entries[] = $aggregator->result($context->entryFactory()); } @@ -219,6 +228,11 @@ public function result(FlowContext $context): Rows return new Rows(...$rows); } + public function useStorage(AggregationStorage $storage): void + { + $this->storage = $storage; + } + /** * @param array $values */ diff --git a/src/core/etl/src/Flow/ETL/GroupBy/AggregationStorageStrategy.php b/src/core/etl/src/Flow/ETL/GroupBy/AggregationStorageStrategy.php new file mode 100644 index 0000000000..18b30633c3 --- /dev/null +++ b/src/core/etl/src/Flow/ETL/GroupBy/AggregationStorageStrategy.php @@ -0,0 +1,17 @@ + + */ + public function all(): Generator; + + /** + * @param array $values + * @param array $prototypes + */ + public function entry(string $hash, array $values, array $prototypes): GroupEntry; + + public function flush(): void; + + public function save(string $hash, GroupEntry $entry): void; +} diff --git a/src/core/etl/src/Flow/ETL/GroupBy/Storage/BucketsCache/FilesystemGroupBucketsCache.php b/src/core/etl/src/Flow/ETL/GroupBy/Storage/BucketsCache/FilesystemGroupBucketsCache.php new file mode 100644 index 0000000000..425ed8633f --- /dev/null +++ b/src/core/etl/src/Flow/ETL/GroupBy/Storage/BucketsCache/FilesystemGroupBucketsCache.php @@ -0,0 +1,58 @@ +cacheDir = ($cacheDir ?? $this->filesystem->getSystemTmpDir())->suffix('/flow-php-group-by/'); + } + + public function get(string $bucketId): array + { + $path = $this->keyPath($bucketId); + + if (!$this->filesystem->status($path)) { + return []; + } + + $stream = $this->filesystem->readFrom($path); + $content = $stream->content(); + $stream->close(); + + return $this->serializer->unserialize($content, [GroupBucket::class])->entries; + } + + public function remove(string $bucketId): void + { + $this->filesystem->rm($this->keyPath($bucketId)->parentDirectory()); + } + + public function set(string $bucketId, array $entries): void + { + $stream = $this->filesystem->writeTo($this->keyPath($bucketId)); + $stream->append($this->serializer->serialize(new GroupBucket($entries))); + $stream->close(); + } + + private function keyPath(string $key): Path + { + return $this->cacheDir->suffix(NativePHPHash::xxh128($key) . '/' . $key . '.php.cache'); + } +} diff --git a/src/core/etl/src/Flow/ETL/GroupBy/Storage/ExternalAggregationStorage.php b/src/core/etl/src/Flow/ETL/GroupBy/Storage/ExternalAggregationStorage.php new file mode 100644 index 0000000000..7885259db3 --- /dev/null +++ b/src/core/etl/src/Flow/ETL/GroupBy/Storage/ExternalAggregationStorage.php @@ -0,0 +1,121 @@ + + */ + private array $hot = []; + + /** + * @var list + */ + private array $spilledBuckets = []; + + public function __construct( + private readonly GroupBucketsCache $cache, + private readonly Unit $memoryLimit, + ) { + $this->consumption = new Consumption(false); + } + + /** + * @return Generator + */ + public function all(): Generator + { + if ($this->spilledBuckets === []) { + foreach ($this->hot as $entry) { + yield $entry; + } + + return; + } + + /** @var array $merged */ + $merged = []; + + foreach ($this->spilledBuckets as $bucketId) { + foreach ($this->cache->get($bucketId) as $hash => $entry) { + if (!array_key_exists($hash, $merged)) { + $merged[$hash] = $entry; + + continue; + } + + foreach ($merged[$hash]->aggregators as $index => $aggregator) { + $partial = $entry->aggregators[$index]; + + if ( + $aggregator instanceof MergeableAggregatingFunction + && $partial instanceof MergeableAggregatingFunction + ) { + $aggregator->merge($partial); + } + } + } + + $this->cache->remove($bucketId); + } + + foreach ($merged as $entry) { + yield $entry; + } + } + + public function entry(string $hash, array $values, array $prototypes): GroupEntry + { + if (!array_key_exists($hash, $this->hot)) { + $aggregators = []; + + foreach ($prototypes as $prototype) { + $aggregators[] = clone $prototype; + } + + $this->hot[$hash] = new GroupEntry($values, $aggregators); + } + + return $this->hot[$hash]; + } + + public function flush(): void + { + if ($this->spilledBuckets !== [] && $this->hot !== []) { + $this->spill(); + } + } + + public function save(string $hash, GroupEntry $entry): void + { + if ($this->consumption->currentDiff()->isGreaterThan($this->memoryLimit)) { + $this->spill(); + } + } + + private function spill(): void + { + $bucketId = bin2hex(random_bytes(16)); + $this->cache->set($bucketId, $this->hot); + $this->spilledBuckets[] = $bucketId; + $this->hot = []; + $this->consumption = new Consumption(false); + } +} diff --git a/src/core/etl/src/Flow/ETL/GroupBy/Storage/GroupBucket.php b/src/core/etl/src/Flow/ETL/GroupBy/Storage/GroupBucket.php new file mode 100644 index 0000000000..38860554e8 --- /dev/null +++ b/src/core/etl/src/Flow/ETL/GroupBy/Storage/GroupBucket.php @@ -0,0 +1,15 @@ + $entries + */ + public function __construct( + public array $entries, + ) {} +} diff --git a/src/core/etl/src/Flow/ETL/GroupBy/Storage/GroupBucketsCache.php b/src/core/etl/src/Flow/ETL/GroupBy/Storage/GroupBucketsCache.php new file mode 100644 index 0000000000..025e523968 --- /dev/null +++ b/src/core/etl/src/Flow/ETL/GroupBy/Storage/GroupBucketsCache.php @@ -0,0 +1,23 @@ + + */ + public function get(string $bucketId): array; + + public function remove(string $bucketId): void; + + /** + * @param array $entries + */ + public function set(string $bucketId, array $entries): void; +} diff --git a/src/core/etl/src/Flow/ETL/GroupBy/Storage/GroupEntry.php b/src/core/etl/src/Flow/ETL/GroupBy/Storage/GroupEntry.php new file mode 100644 index 0000000000..40f90004cf --- /dev/null +++ b/src/core/etl/src/Flow/ETL/GroupBy/Storage/GroupEntry.php @@ -0,0 +1,19 @@ + $values + * @param array $aggregators + */ + public function __construct( + public array $values, + public array $aggregators, + ) {} +} diff --git a/src/core/etl/src/Flow/ETL/GroupBy/Storage/KvAggregationStorage.php b/src/core/etl/src/Flow/ETL/GroupBy/Storage/KvAggregationStorage.php new file mode 100644 index 0000000000..d15fa2c2f2 --- /dev/null +++ b/src/core/etl/src/Flow/ETL/GroupBy/Storage/KvAggregationStorage.php @@ -0,0 +1,109 @@ + + */ + private array $hashes = []; + + public function __construct( + private readonly CacheInterface $cache, + private readonly Serializer $serializer = new NativePHPSerializer(), + private readonly string $namespace = 'flow_group_by_', + ) {} + + /** + * @return Generator + */ + public function all(): Generator + { + foreach (array_keys($this->hashes) as $hash) { + $entry = $this->read($hash); + + if ($entry === null) { + throw $this->evicted($hash); + } + + yield $entry; + + $this->cache->delete($this->namespace . $hash); + } + } + + public function entry(string $hash, array $values, array $prototypes): GroupEntry + { + $existing = $this->read($hash); + + if ($existing !== null) { + return $existing; + } + + if (isset($this->hashes[$hash])) { + throw $this->evicted($hash); + } + + $aggregators = []; + + foreach ($prototypes as $prototype) { + $aggregators[] = clone $prototype; + } + + $this->hashes[$hash] = true; + + return new GroupEntry($values, $aggregators); + } + + public function flush(): void {} + + public function save(string $hash, GroupEntry $entry): void + { + $this->hashes[$hash] = true; + + if (!$this->cache->set($this->namespace . $hash, $this->serializer->serialize($entry))) { + throw new RuntimeException( + 'Failed to write aggregation group to the KV store (store full?). Increase its capacity ' + . '(e.g. apc.shm_size, Redis maxmemory) or use AggregationStorageStrategy::MEMORY_FALLBACK_EXTERNAL.', + ); + } + } + + private function evicted(string $hash): RuntimeException + { + return new RuntimeException( + 'Aggregation group "' + . $hash + . '" was evicted from the KV store mid-aggregation; the result ' + . 'would be incorrect. Increase the store capacity (e.g. apc.shm_size, Redis maxmemory) or use ' + . 'AggregationStorageStrategy::MEMORY_FALLBACK_EXTERNAL.', + ); + } + + private function read(string $hash): ?GroupEntry + { + // @mago-ignore analysis:mixed-assignment + $serialized = $this->cache->get($this->namespace . $hash); + + if (!is_string($serialized)) { + return null; + } + + return $this->serializer->unserialize($serialized, [GroupEntry::class]); + } +} diff --git a/src/core/etl/src/Flow/ETL/GroupBy/Storage/MemoryAggregationStorage.php b/src/core/etl/src/Flow/ETL/GroupBy/Storage/MemoryAggregationStorage.php new file mode 100644 index 0000000000..d87dc6f2af --- /dev/null +++ b/src/core/etl/src/Flow/ETL/GroupBy/Storage/MemoryAggregationStorage.php @@ -0,0 +1,49 @@ + + */ + private array $groups = []; + + /** + * @return Generator + */ + public function all(): Generator + { + foreach ($this->groups as $group) { + yield $group; + } + } + + public function entry(string $hash, array $values, array $prototypes): GroupEntry + { + if (!array_key_exists($hash, $this->groups)) { + $aggregators = []; + + foreach ($prototypes as $prototype) { + $aggregators[] = clone $prototype; + } + + $this->groups[$hash] = new GroupEntry($values, $aggregators); + } + + return $this->groups[$hash]; + } + + public function flush(): void {} + + public function save(string $hash, GroupEntry $entry): void {} +} diff --git a/src/core/etl/src/Flow/ETL/Processor/GroupByProcessor.php b/src/core/etl/src/Flow/ETL/Processor/GroupByProcessor.php index 46f0050ad6..bc0ed3433b 100644 --- a/src/core/etl/src/Flow/ETL/Processor/GroupByProcessor.php +++ b/src/core/etl/src/Flow/ETL/Processor/GroupByProcessor.php @@ -6,6 +6,10 @@ use Flow\ETL\FlowContext; use Flow\ETL\GroupBy; +use Flow\ETL\GroupBy\Storage\AggregationStorage; +use Flow\ETL\GroupBy\Storage\BucketsCache\FilesystemGroupBucketsCache; +use Flow\ETL\GroupBy\Storage\ExternalAggregationStorage; +use Flow\ETL\GroupBy\Storage\MemoryAggregationStorage; use Flow\ETL\Processor; use Generator; @@ -22,10 +26,38 @@ public function __construct( public function process(Generator $rows, FlowContext $context): Generator { + $this->groupBy->useStorage($this->createStorage($context)); + foreach ($rows as $batch) { $this->groupBy->group($batch, $context); } yield $this->groupBy->result($context); } + + private function createStorage(FlowContext $context): AggregationStorage + { + $config = $context->config->aggregation; + + if ($config->storage !== null) { + return $config->storage; + } + + if ( + $config->strategy->useMemory() + || $this->groupBy->isPivot() + || !$this->groupBy->allAggregationsMergeable() + ) { + return new MemoryAggregationStorage(); + } + + return new ExternalAggregationStorage( + new FilesystemGroupBucketsCache( + $context->filesystem($config->filesystemProtocol), + $context->config->serializer(), + $context->config->cache->localFilesystemCacheDir->suffix('/flow-php-group-by/'), + ), + $config->memoryLimit, + ); + } } diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/GroupByStorageTest.php b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/GroupByStorageTest.php new file mode 100644 index 0000000000..3a19f8f7a1 --- /dev/null +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/GroupByStorageTest.php @@ -0,0 +1,131 @@ + + */ + private static ?array $orders = null; + + public function test_apcu_kv_storage_produces_identical_results(): void + { + if (!ApcuAdapter::isSupported()) { + self::markTestSkipped( + 'APCu (with apc.enable_cli=1) is required to evaluate APCu as an aggregation storage backend.', + ); + } + + $cache = new Psr16Cache(new ApcuAdapter('flow_group_by_test')); + $cache->clear(); + + $expected = $this->aggregateBySeller(config_builder()); + $actual = $this->aggregateBySeller(config_builder()->aggregationStore(new KvAggregationStorage($cache))); + + self::assertEqualsCanonicalizing($expected, $actual); + } + + public function test_high_cardinality_group_by_spills_and_stays_correct(): void + { + $memory = $this->aggregateByEmail(config_builder()); + + $external = $this->aggregateByEmail( + config_builder() + ->aggregationStorage(AggregationStorageStrategy::MEMORY_FALLBACK_EXTERNAL) + ->aggregationMemoryLimit(Unit::fromKb(256)), + ); + + self::assertEqualsCanonicalizing($memory, $external); + } + + public function test_kv_storage_produces_identical_results(): void + { + $expected = $this->aggregateBySeller(config_builder()); + + $actual = $this->aggregateBySeller(config_builder()->aggregationStore(new KvAggregationStorage(new Psr16Cache( + new ArrayAdapter(), + )))); + + self::assertEqualsCanonicalizing($expected, $actual); + } + + public function test_seller_aggregation_is_identical_between_memory_and_external_spill_storage(): void + { + $memory = $this->aggregateBySeller(config_builder()); + + $external = $this->aggregateBySeller( + config_builder() + ->aggregationStorage(AggregationStorageStrategy::MEMORY_FALLBACK_EXTERNAL) + ->aggregationMemoryLimit(Unit::fromKb(256)), + ); + + self::assertEqualsCanonicalizing($memory, $external); + self::assertCount(5, $memory); + } + + private function aggregateByEmail(ConfigBuilder $config): array + { + return df($config)->read(from_array($this->orders()))->groupBy('email')->aggregate(count())->fetch()->toArray(); + } + + private function aggregateBySeller(ConfigBuilder $config): array + { + return df($config) + ->read(from_array($this->orders())) + ->groupBy('seller_id') + ->aggregate(count(), sum('discount'), collect('customer')) + ->fetch() + ->toArray(); + } + + /** + * @return list + */ + private function orders(): array + { + if (self::$orders !== null) { + return self::$orders; + } + + $orders = []; + + foreach ((new FakeRandomOrdersExtractor(self::ORDERS))->rawData() as $order) { + /** @var mixed $discount */ + $discount = $order['discount']; + + $orders[] = [ + 'seller_id' => (string) $order['seller_id'], + 'email' => (string) $order['email'], + 'customer' => (string) $order['customer'], + 'discount' => is_numeric($discount) ? (float) $discount : null, + ]; + } + + self::$orders = $orders; + + return $orders; + } +} diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/GroupBy/Storage/ExternalAggregationStorageTest.php b/src/core/etl/tests/Flow/ETL/Tests/Integration/GroupBy/Storage/ExternalAggregationStorageTest.php new file mode 100644 index 0000000000..aea7423f04 --- /dev/null +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/GroupBy/Storage/ExternalAggregationStorageTest.php @@ -0,0 +1,67 @@ +cacheDir->suffix('/external-aggregation-storage/'); + $this->fs()->rm($cacheDir); + + $context = flow_context(); + $storage = new ExternalAggregationStorage( + new FilesystemGroupBucketsCache($this->fs(), $this->serializer(), $cacheDir), + Unit::fromBytes(0), + ); + + $input = [ + ['type' => 'a', 'v' => 1], + ['type' => 'b', 'v' => 2], + ['type' => 'a', 'v' => 3], + ['type' => 'a', 'v' => 4], + ['type' => 'b', 'v' => 5], + ]; + + foreach ($input as $record) { + $entry = $storage->entry($record['type'], ['type' => $record['type']], [sum(ref('v'))]); + + foreach ($entry->aggregators as $aggregator) { + $aggregator->aggregate(row(str_entry('type', $record['type']), int_entry('v', $record['v'])), $context); + } + + $storage->save($record['type'], $entry); + } + + self::assertNotNull($this->fs()->status($cacheDir)); + + $results = []; + + foreach ($storage->all() as $group) { + /** @var int $sum */ + $sum = $group->aggregators[0]->result($context->entryFactory())->value(); + $results[(string) $group->values['type']] = $sum; + } + + ksort($results); + + self::assertSame(['a' => 8, 'b' => 7], $results); + + $this->fs()->rm($cacheDir); + } +} diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Config/Aggregation/AggregationConfigBuilderTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/Config/Aggregation/AggregationConfigBuilderTest.php new file mode 100644 index 0000000000..112e62b03e --- /dev/null +++ b/src/core/etl/tests/Flow/ETL/Tests/Unit/Config/Aggregation/AggregationConfigBuilderTest.php @@ -0,0 +1,71 @@ +memoryLimit(Unit::fromMb(16)) + ->build(); + + self::assertSame(Unit::fromMb(16)->inBytes(), $config->memoryLimit->inBytes()); + } + + public function test_default_strategy_is_memory(): void + { + $config = (new AggregationConfigBuilder())->build(); + + self::assertSame(AggregationStorageStrategy::MEMORY, $config->strategy); + self::assertNull($config->storage); + self::assertGreaterThan(0, $config->memoryLimit->inBytes()); + } + + public function test_injecting_a_custom_storage(): void + { + $storage = new KvAggregationStorage(new Psr16Cache(new ArrayAdapter())); + + $config = (new AggregationConfigBuilder()) + ->storage($storage) + ->build(); + + self::assertSame($storage, $config->storage); + } + + public function test_memory_limit_is_read_from_environment_variable(): void + { + putenv(AggregationConfig::AGGREGATION_MAX_MEMORY_ENV . '=10M'); + + try { + $config = (new AggregationConfigBuilder())->build(); + + self::assertSame(Unit::fromMb(10)->inBytes(), $config->memoryLimit->inBytes()); + } finally { + putenv(AggregationConfig::AGGREGATION_MAX_MEMORY_ENV); + } + } + + public function test_selecting_the_external_strategy(): void + { + $config = (new AggregationConfigBuilder()) + ->strategy(AggregationStorageStrategy::MEMORY_FALLBACK_EXTERNAL) + ->build(); + + self::assertSame(AggregationStorageStrategy::MEMORY_FALLBACK_EXTERNAL, $config->strategy); + self::assertFalse($config->strategy->useMemory()); + } +} diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Function/AggregatingFunctionMergeTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/Function/AggregatingFunctionMergeTest.php new file mode 100644 index 0000000000..8f6d059be3 --- /dev/null +++ b/src/core/etl/tests/Flow/ETL/Tests/Unit/Function/AggregatingFunctionMergeTest.php @@ -0,0 +1,194 @@ +aggregate(row(int_entry('v', 1)), $context); + $left->aggregate(row(int_entry('v', 2)), $context); + + $right = average(ref('v')); + $right->aggregate(row(int_entry('v', 3)), $context); + + $left->merge($right); + + self::assertEquals(2, $left->result($context->entryFactory())->value()); + } + + public function test_collect_merge_appends_values(): void + { + $context = flow_context(); + + $left = collect(ref('v')); + $left->aggregate(row(int_entry('v', 1)), $context); + $left->aggregate(row(int_entry('v', 2)), $context); + + $right = collect(ref('v')); + $right->aggregate(row(int_entry('v', 3)), $context); + + $left->merge($right); + + self::assertSame([1, 2, 3], $left->result($context->entryFactory())->value()); + } + + public function test_collect_unique_merge_unions_values(): void + { + $context = flow_context(); + + $left = collect_unique(ref('v')); + $left->aggregate(row(int_entry('v', 1)), $context); + $left->aggregate(row(int_entry('v', 2)), $context); + + $right = collect_unique(ref('v')); + $right->aggregate(row(int_entry('v', 2)), $context); + $right->aggregate(row(int_entry('v', 3)), $context); + + $left->merge($right); + + self::assertSame([1, 2, 3], $left->result($context->entryFactory())->value()); + } + + public function test_count_merge_adds_counts(): void + { + $context = flow_context(); + + $left = count(); + $left->aggregate(row(int_entry('v', 1)), $context); + $left->aggregate(row(int_entry('v', 2)), $context); + + $right = count(); + $right->aggregate(row(int_entry('v', 3)), $context); + + $left->merge($right); + + self::assertSame(3, $left->result($context->entryFactory())->value()); + } + + public function test_first_merge_keeps_earlier_value(): void + { + $context = flow_context(); + + $left = first(ref('v')); + $left->aggregate(row(int_entry('v', 1)), $context); + + $right = first(ref('v')); + $right->aggregate(row(int_entry('v', 9)), $context); + + $left->merge($right); + + self::assertSame(1, $left->result($context->entryFactory())->value()); + } + + public function test_last_merge_keeps_later_value(): void + { + $context = flow_context(); + + $left = last(ref('v')); + $left->aggregate(row(int_entry('v', 1)), $context); + + $right = last(ref('v')); + $right->aggregate(row(int_entry('v', 9)), $context); + + $left->merge($right); + + self::assertSame(9, $left->result($context->entryFactory())->value()); + } + + public function test_max_merge_keeps_greatest(): void + { + $context = flow_context(); + + $left = max(ref('v')); + $left->aggregate(row(int_entry('v', 5)), $context); + $left->aggregate(row(int_entry('v', 3)), $context); + + $right = max(ref('v')); + $right->aggregate(row(int_entry('v', 9)), $context); + + $left->merge($right); + + self::assertSame(9, $left->result($context->entryFactory())->value()); + } + + public function test_merge_with_different_function_type_throws(): void + { + $this->expectException(InvalidArgumentException::class); + + sum(ref('v'))->merge(count()); + } + + public function test_min_merge_keeps_smallest(): void + { + $context = flow_context(); + + $left = min(ref('v')); + $left->aggregate(row(int_entry('v', 5)), $context); + $left->aggregate(row(int_entry('v', 3)), $context); + + $right = min(ref('v')); + $right->aggregate(row(int_entry('v', 1)), $context); + + $left->merge($right); + + self::assertSame(1, $left->result($context->entryFactory())->value()); + } + + public function test_string_aggregate_merge_concatenates_values(): void + { + $context = flow_context(); + + $left = new StringAggregate(ref('v'), ',', SortOrder::ASC); + $left->aggregate(row(str_entry('v', 'b')), $context); + $left->aggregate(row(str_entry('v', 'a')), $context); + + $right = new StringAggregate(ref('v'), ',', SortOrder::ASC); + $right->aggregate(row(str_entry('v', 'c')), $context); + + $left->merge($right); + + self::assertSame('a,b,c', $left->result($context->entryFactory())->value()); + } + + public function test_sum_merge_adds_partial_sums(): void + { + $context = flow_context(); + + $left = sum(ref('v')); + $left->aggregate(row(int_entry('v', 1)), $context); + $left->aggregate(row(int_entry('v', 2)), $context); + + $right = sum(ref('v')); + $right->aggregate(row(int_entry('v', 3)), $context); + + $left->merge($right); + + self::assertSame(6, $left->result($context->entryFactory())->value()); + } +} diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/GroupBy/Storage/KvAggregationStorageTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/GroupBy/Storage/KvAggregationStorageTest.php new file mode 100644 index 0000000000..4621ff2f9f --- /dev/null +++ b/src/core/etl/tests/Flow/ETL/Tests/Unit/GroupBy/Storage/KvAggregationStorageTest.php @@ -0,0 +1,93 @@ +entry($group, ['type' => $group], [sum(ref('v'))]); + $entry->aggregators[0]->aggregate(row(str_entry('type', $group), int_entry('v', 1)), $context); + $storage->save($group, $entry); + } + + $cache->delete('flow_group_by_a'); + + $this->expectException(RuntimeException::class); + + iterator_to_array($storage->all()); + } + + public function test_detects_eviction_on_subsequent_access(): void + { + $cache = new Psr16Cache(new ArrayAdapter()); + $storage = new KvAggregationStorage($cache); + $context = flow_context(); + + $entry = $storage->entry('a', ['type' => 'a'], [sum(ref('v'))]); + $entry->aggregators[0]->aggregate(row(str_entry('type', 'a'), int_entry('v', 1)), $context); + $storage->save('a', $entry); + + $cache->delete('flow_group_by_a'); + + $this->expectException(RuntimeException::class); + + $storage->entry('a', ['type' => 'a'], [sum(ref('v'))]); + } + + public function test_round_trip_produces_correct_aggregates(): void + { + $cache = new Psr16Cache(new ArrayAdapter()); + $storage = new KvAggregationStorage($cache); + $context = flow_context(); + + $input = [ + ['type' => 'a', 'v' => 1], + ['type' => 'b', 'v' => 2], + ['type' => 'a', 'v' => 3], + ]; + + foreach ($input as $record) { + $entry = $storage->entry($record['type'], ['type' => $record['type']], [sum(ref('v'))]); + $entry->aggregators[0]->aggregate( + row(str_entry('type', $record['type']), int_entry('v', $record['v'])), + $context, + ); + $storage->save($record['type'], $entry); + } + + $results = []; + + foreach ($storage->all() as $group) { + /** @var int $value */ + $value = $group->aggregators[0]->result($context->entryFactory())->value(); + $results[(string) $group->values['type']] = $value; + } + + ksort($results); + + self::assertSame(['a' => 4, 'b' => 2], $results); + } +}