-
Notifications
You must be signed in to change notification settings - Fork 3
Refactoring pools for concurrent operations #24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
356fead
5467696
d3519cb
51e7c0f
d8ae4c9
5347ce0
b861f5b
a703aaf
2f12968
22a8b38
1fc36c9
fdd750f
a70164f
05d27a9
c32a92a
c77ddd8
c9d98e9
3cc8f20
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| <?php | ||
|
|
||
| namespace Utopia\Pools; | ||
|
|
||
| abstract class Adapter | ||
| { | ||
| abstract public function fill(int $size, mixed $value): static; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably fill should accept callable $value instead
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fill() was a bit misleading in this context. The pool does not eagerly create or insert values; connections are created lazily on demand. Because of that, the $value parameter was never actually used as a concrete value. To better reflect the behavior, this has been changed to |
||
|
|
||
| abstract public function push(mixed $connection): static; | ||
|
|
||
| /** | ||
| * @param int $timeout | ||
| * @return mixed | ||
| */ | ||
| abstract public function pop(int $timeout): mixed; | ||
|
|
||
| abstract public function count(): int; | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| <?php | ||
|
|
||
| namespace Utopia\Pools\Adapter; | ||
|
|
||
| use Utopia\Pools\Adapter; | ||
|
|
||
| class Stack extends Adapter | ||
| { | ||
| /** @var array<mixed> $pool */ | ||
| protected array $pool = []; | ||
|
|
||
| public function fill(int $size, mixed $value): static | ||
| { | ||
| // Initialize empty pool (no pre-filling) | ||
| $this->pool = []; | ||
| return $this; | ||
| } | ||
|
|
||
| public function push(mixed $connection): static | ||
| { | ||
| // Push connection to pool | ||
| $this->pool[] = $connection; | ||
| return $this; | ||
| } | ||
|
|
||
| public function pop(int $timeout): mixed | ||
| { | ||
| return array_pop($this->pool); | ||
| } | ||
|
abnegate marked this conversation as resolved.
|
||
|
|
||
| public function count(): int | ||
| { | ||
| return count($this->pool); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| <?php | ||
|
|
||
| namespace Utopia\Pools\Adapter; | ||
|
|
||
| use Utopia\Pools\Adapter; | ||
| use Swoole\Coroutine\Channel; | ||
| use Swoole\Lock; | ||
|
|
||
| class Swoole extends Adapter | ||
| { | ||
| protected Channel $pool; | ||
|
|
||
| /** @var Lock $lock */ | ||
| protected Lock $lock; | ||
|
ArnabChatterjee20k marked this conversation as resolved.
|
||
| public function fill(int $size, mixed $value): static | ||
| { | ||
| // Create empty channel with capacity (no pre-filling) | ||
| $this->pool = new Channel($size); | ||
|
|
||
| // Initialize lock for thread-safe operations | ||
| $this->lock = new Lock(SWOOLE_MUTEX); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will this lock the whole worker? Or just the coroutine? Which is desired?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This mutex is shared across all coroutines in the worker process. When one coroutine acquires it, the entire worker is blocked until the lock is released. This ensures atomic updates (e.g., incrementing/decrementing counters or registering/deregistering active connections) |
||
|
|
||
| return $this; | ||
| } | ||
|
|
||
| public function push(mixed $connection): static | ||
| { | ||
| // Push connection to channel | ||
| $this->pool->push($connection); | ||
| return $this; | ||
| } | ||
|
|
||
| public function pop(int $timeout): mixed | ||
| { | ||
| $result = $this->pool->pop($timeout); | ||
|
|
||
| // if pool is empty or timeout occured => result will be false | ||
|
||
| return $result; | ||
| } | ||
|
Comment on lines
+38
to
+41
|
||
|
|
||
|
|
||
| public function count(): int | ||
| { | ||
| return (int) $this->pool->length(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,6 +3,7 @@ | |
| namespace Utopia\Pools; | ||
|
|
||
| use Exception; | ||
| use Utopia\Pools\Adapter as PoolAdapter; | ||
| use Utopia\Telemetry\Adapter as Telemetry; | ||
| use Utopia\Telemetry\Adapter\None as NoTelemetry; | ||
| use Utopia\Telemetry\Gauge; | ||
|
|
@@ -13,6 +14,7 @@ | |
| */ | ||
| class Pool | ||
| { | ||
| public const POP_TIMEOUT_IN_SECODNS = 3; | ||
|
coderabbitai[bot] marked this conversation as resolved.
Outdated
|
||
| /** | ||
| * @var callable | ||
| */ | ||
|
|
@@ -38,16 +40,18 @@ class Pool | |
| */ | ||
| protected int $retrySleep = 1; // seconds | ||
|
|
||
| /** | ||
| * @var array<Connection<TResource>|true> | ||
| */ | ||
| protected array $pool = []; | ||
| protected PoolAdapter $pool; | ||
|
|
||
| /** | ||
| * @var array<string, Connection<TResource>> | ||
| */ | ||
| protected array $active = []; | ||
|
|
||
| /** | ||
| * Total number of connections created | ||
| */ | ||
| protected int $connectionsCreated = 0; | ||
|
|
||
| private Gauge $telemetryOpenConnections; | ||
| private Gauge $telemetryActiveConnections; | ||
| private Gauge $telemetryIdleConnections; | ||
|
|
@@ -58,14 +62,17 @@ class Pool | |
| private array $telemetryAttributes; | ||
|
|
||
| /** | ||
| * @param PoolAdapter $adapter | ||
| * @param string $name | ||
| * @param int $size | ||
| * @param callable(): TResource $init | ||
| */ | ||
| public function __construct(protected string $name, protected int $size, callable $init) | ||
| public function __construct(PoolAdapter $adapter, protected string $name, protected int $size, callable $init) | ||
| { | ||
| $this->init = $init; | ||
| $this->pool = array_fill(0, $this->size, true); | ||
| $this->pool = $adapter; | ||
| // Initialize empty channel (no pre-filling for lazy initialization) | ||
| $this->pool->fill($this->size, null); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this standard? Not sure what best practice is here Lazy initialisation vs prefilled connections
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Pre filled was the previous the approach where we maintain the true based placeholders. It only consists of the connection object |
||
| $this->setTelemetry(new NoTelemetry()); | ||
| } | ||
|
|
||
|
|
@@ -223,48 +230,29 @@ public function pop(): Connection | |
| try { | ||
| do { | ||
| $attempts++; | ||
| $connection = array_pop($this->pool); | ||
| // If pool is empty and size limit not reached, create new connection | ||
| if ($this->pool->count() === 0 && $this->connectionsCreated < $this->size) { | ||
| $connection = $this->createConnection(); | ||
| $this->active[$connection->getID()] = $connection; | ||
| return $connection; | ||
| } | ||
|
|
||
| if (is_null($connection)) { | ||
| $connection = $this->pool->pop(self::POP_TIMEOUT_IN_SECODNS); | ||
|
|
||
| if ($connection === false || $connection === null) { | ||
| if ($attempts >= $this->getRetryAttempts()) { | ||
| throw new Exception("Pool '{$this->name}' is empty (size {$this->size})"); | ||
| } | ||
|
|
||
| $totalSleepTime += $this->getRetrySleep(); | ||
| sleep($this->getRetrySleep()); | ||
| } else { | ||
| break; | ||
| } | ||
| } while ($attempts < $this->getRetryAttempts()); | ||
|
|
||
| if ($connection === true) { // Pool has space, create connection | ||
| $attempts = 0; | ||
|
|
||
| do { | ||
| try { | ||
| $attempts++; | ||
| $connection = new Connection(($this->init)()); | ||
| break; // leave loop if successful | ||
| } catch (\Exception $e) { | ||
| if ($attempts >= $this->getReconnectAttempts()) { | ||
| throw new \Exception('Failed to create connection: ' . $e->getMessage()); | ||
| } | ||
| $totalSleepTime += $this->getReconnectSleep(); | ||
| sleep($this->getReconnectSleep()); | ||
| if ($connection instanceof Connection) { | ||
| $this->active[$connection->getID()] = $connection; | ||
| return $connection; | ||
| } | ||
| } while ($attempts < $this->getReconnectAttempts()); | ||
| } | ||
|
|
||
| if ($connection instanceof Connection) { // connection is available, return it | ||
| if (empty($connection->getID())) { | ||
| $connection->setID($this->getName() . '-' . uniqid()); | ||
| } | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
|
|
||
| $connection->setPool($this); | ||
|
|
||
| $this->active[$connection->getID()] = $connection; | ||
| return $connection; | ||
| } | ||
| } while ($attempts < $this->getRetryAttempts()); | ||
|
|
||
| throw new Exception('Failed to get a connection from the pool'); | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
| } finally { | ||
|
|
@@ -273,14 +261,55 @@ public function pop(): Connection | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Create a new connection | ||
| * | ||
| * @return Connection<TResource> | ||
| * @throws \Exception | ||
| */ | ||
| protected function createConnection(): Connection | ||
| { | ||
| $this->connectionsCreated++; | ||
|
|
||
| $connection = null; | ||
| $attempts = 0; | ||
| do { | ||
| try { | ||
| $attempts++; | ||
| $connection = new Connection(($this->init)()); | ||
| break; | ||
| } catch (\Exception $e) { | ||
| if ($attempts >= $this->getReconnectAttempts()) { | ||
| $this->connectionsCreated--; | ||
| throw new \Exception('Failed to create connection: ' . $e->getMessage()); | ||
| } | ||
| sleep($this->getReconnectSleep()); | ||
| } | ||
| } while ($attempts < $this->getReconnectAttempts()); | ||
|
|
||
| if ($connection === null) { | ||
| $this->connectionsCreated--; | ||
| throw new \Exception('Failed to create connection'); | ||
| } | ||
|
|
||
| if (empty($connection->getID())) { | ||
| $connection->setID($this->getName() . '-' . uniqid()); | ||
| } | ||
|
|
||
| $connection->setPool($this); | ||
|
|
||
| return $connection; | ||
| } | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
|
|
||
| /** | ||
| * @param Connection<TResource> $connection | ||
| * @return $this<TResource> | ||
| */ | ||
| public function push(Connection $connection): static | ||
| { | ||
| try { | ||
| $this->pool[] = $connection; | ||
| // Push the actual connection back to the pool | ||
| $this->pool->push($connection); | ||
| unset($this->active[$connection->getID()]); | ||
|
|
||
| return $this; | ||
|
|
@@ -290,11 +319,14 @@ public function push(Connection $connection): static | |
| } | ||
|
|
||
| /** | ||
| * Returns the number of available connections (idle + not yet created) | ||
| * | ||
| * @return int | ||
| */ | ||
| public function count(): int | ||
| { | ||
| return count($this->pool); | ||
| // Available = idle connections in pool + connections not yet created | ||
| return $this->pool->count() + ($this->size - $this->connectionsCreated); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -323,14 +355,27 @@ public function destroy(?Connection $connection = null): static | |
| { | ||
| try { | ||
| if ($connection !== null) { | ||
| $this->pool[] = true; | ||
| $this->connectionsCreated--; | ||
| unset($this->active[$connection->getID()]); | ||
|
|
||
| // Create a new connection to maintain pool size | ||
| if ($this->connectionsCreated < $this->size) { | ||
| $newConnection = $this->createConnection(); | ||
| $this->pool->push($newConnection); | ||
| } | ||
|
|
||
| return $this; | ||
| } | ||
|
Comment on lines
+411
to
420
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is creating a new connection outside of a lock, isn't that risky?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually two reasons -> pattern and pool is channel in case of swoole
|
||
|
|
||
| foreach ($this->active as $connection) { | ||
| $this->pool[] = true; | ||
| $this->connectionsCreated--; | ||
| unset($this->active[$connection->getID()]); | ||
|
|
||
| // Create a new connection to maintain pool size | ||
| if ($this->connectionsCreated < $this->size) { | ||
| $newConnection = $this->createConnection(); | ||
| $this->pool->push($newConnection); | ||
| } | ||
| } | ||
|
coderabbitai[bot] marked this conversation as resolved.
Outdated
|
||
|
|
||
| return $this; | ||
|
|
@@ -344,26 +389,27 @@ public function destroy(?Connection $connection = null): static | |
| */ | ||
| public function isEmpty(): bool | ||
| { | ||
| return empty($this->pool); | ||
| return $this->pool->count() === 0; | ||
| } | ||
|
||
|
|
||
| /** | ||
| * @return bool | ||
| */ | ||
| public function isFull(): bool | ||
| { | ||
| return count($this->pool) === $this->size; | ||
| // Pool is full when all possible connections are available (idle or not created yet) | ||
| return count($this->active) === 0; | ||
| } | ||
|
|
||
| private function recordPoolTelemetry(): void | ||
| { | ||
| // Connections get removed from $this->pool when they are active | ||
| $activeConnections = count($this->active); | ||
| $existingConnections = count($this->pool); | ||
| $idleConnections = count(array_filter($this->pool, fn ($data) => $data instanceof Connection)); | ||
| $idleConnections = $this->pool->count(); // Connections in the pool (idle) | ||
| $openConnections = $activeConnections + $idleConnections; // Total connections in use or available | ||
|
|
||
| $this->telemetryActiveConnections->record($activeConnections, $this->telemetryAttributes); | ||
| $this->telemetryIdleConnections->record($idleConnections, $this->telemetryAttributes); | ||
| $this->telemetryOpenConnections->record($activeConnections + $idleConnections, $this->telemetryAttributes); | ||
| $this->telemetryPoolCapacity->record($activeConnections + $existingConnections, $this->telemetryAttributes); | ||
| $this->telemetryOpenConnections->record($openConnections, $this->telemetryAttributes); | ||
| $this->telemetryPoolCapacity->record($activeConnections + $this->pool->count(), $this->telemetryAttributes); | ||
|
||
| } | ||
|
ArnabChatterjee20k marked this conversation as resolved.
|
||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.