Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
356fead
Refactor pool management system
ArnabChatterjee20k Dec 16, 2025
5467696
Refactor connection pool management and improve concurrency handling
ArnabChatterjee20k Dec 17, 2025
d3519cb
Implement withLock method for thread-safe operations and fix constant…
ArnabChatterjee20k Dec 17, 2025
51e7c0f
Merge remote-tracking branch 'origin/main' into dat-966
ArnabChatterjee20k Dec 17, 2025
d8ae4c9
Fix parameter type in withLock method and enhance connection pool man…
ArnabChatterjee20k Dec 17, 2025
5347ce0
fix new connection creation while pool is empty and now coonnections …
ArnabChatterjee20k Dec 17, 2025
b861f5b
linting
ArnabChatterjee20k Dec 17, 2025
a703aaf
typo fix
ArnabChatterjee20k Dec 17, 2025
2f12968
Refactor connection creation and destruction logic to improve concurr…
ArnabChatterjee20k Dec 17, 2025
22a8b38
Enhance documentation for withLock method and improve Pool class logi…
ArnabChatterjee20k Dec 17, 2025
1fc36c9
doc strings added
ArnabChatterjee20k Dec 17, 2025
fdd750f
Refactor Adapter methods to standardize initialization and locking; r…
ArnabChatterjee20k Dec 18, 2025
a70164f
fix active connection was not getting set under lock leading to concu…
ArnabChatterjee20k Dec 18, 2025
05d27a9
Add retry mechanism to use method for connection handling with tests
ArnabChatterjee20k Dec 22, 2025
c32a92a
linting
ArnabChatterjee20k Dec 22, 2025
c77ddd8
Revert "linting"
ArnabChatterjee20k Jan 5, 2026
c9d98e9
Revert "Add retry mechanism to use method for connection handling wit…
ArnabChatterjee20k Jan 5, 2026
3cc8f20
removed marked skipped for checking on cli
ArnabChatterjee20k Jan 8, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,20 @@
},
"require": {
"php": ">=8.4",
"utopia-php/telemetry": "0.1.*"
"utopia-php/telemetry": "0.1.*",
"ext-swoole": "*"
Comment thread
ArnabChatterjee20k marked this conversation as resolved.
Outdated
},
"require-dev": {
"phpunit/phpunit": "11.*",
"laravel/pint": "1.*",
"phpstan/phpstan": "1.*"
"phpstan/phpstan": "1.*",
"swoole/ide-helper": "5.1.2"
},
"suggests": {
"ext-mongodb": "Needed to support MongoDB database pools",
"ext-redis": "Needed to support Redis cache pools",
"ext-pdo": "Needed to support MariaDB, MySQL or SQLite database pools"
"ext-pdo": "Needed to support MariaDB, MySQL or SQLite database pools",
"ext-swoole" : "Needed to support Swoole based pool adapter"
},
"config": {
"platform": {
Expand Down
711 changes: 454 additions & 257 deletions composer.lock

Large diffs are not rendered by default.

18 changes: 18 additions & 0 deletions src/Pools/Adapter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

namespace Utopia\Pools;

abstract class Adapter
{
abstract public function fill(int $size, mixed $value): static;
Copy link
Copy Markdown
Contributor

@loks0n loks0n Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably fill should accept callable $value instead

Copy link
Copy Markdown
Contributor Author

@ArnabChatterjee20k ArnabChatterjee20k Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fill() was a bit misleading in this context. The pool does not eagerly create or insert values; connections are created lazily on demand. Because of that, the $value parameter was never actually used as a concrete value.

To better reflect the behavior, this has been changed to initialize(int $size), which only sets up the pool capacity and internal state.


abstract public function push(mixed $connection): static;

/**
* @param int $timeout
* @return mixed
*/
abstract public function pop(int $timeout): mixed;

abstract public function count(): int;
}
35 changes: 35 additions & 0 deletions src/Pools/Adapter/Stack.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

namespace Utopia\Pools\Adapter;

use Utopia\Pools\Adapter;

class Stack extends Adapter
{
/** @var array<mixed> $pool */
protected array $pool = [];

public function fill(int $size, mixed $value): static
{
// Initialize empty pool (no pre-filling)
$this->pool = [];
return $this;
}

public function push(mixed $connection): static
{
// Push connection to pool
$this->pool[] = $connection;
return $this;
}

public function pop(int $timeout): mixed
{
return array_pop($this->pool);
}
Comment thread
abnegate marked this conversation as resolved.

public function count(): int
{
return count($this->pool);
}
}
46 changes: 46 additions & 0 deletions src/Pools/Adapter/Swoole.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?php

namespace Utopia\Pools\Adapter;

use Utopia\Pools\Adapter;
use Swoole\Coroutine\Channel;
use Swoole\Lock;

class Swoole extends Adapter
{
protected Channel $pool;

/** @var Lock $lock */
protected Lock $lock;
Comment thread
ArnabChatterjee20k marked this conversation as resolved.
public function fill(int $size, mixed $value): static
{
// Create empty channel with capacity (no pre-filling)
$this->pool = new Channel($size);

// Initialize lock for thread-safe operations
$this->lock = new Lock(SWOOLE_MUTEX);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this lock the whole worker? Or just the coroutine? Which is desired?

Copy link
Copy Markdown
Contributor Author

@ArnabChatterjee20k ArnabChatterjee20k Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This mutex is shared across all coroutines in the worker process. When one coroutine acquires it, the entire worker is blocked until the lock is released. This ensures atomic updates (e.g., incrementing/decrementing counters or registering/deregistering active connections)
so all operations on the pool will be synchronized


return $this;
}

public function push(mixed $connection): static
{
// Push connection to channel
$this->pool->push($connection);
return $this;
}

public function pop(int $timeout): mixed
{
$result = $this->pool->pop($timeout);

// if pool is empty or timeout occured => result will be false
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a spelling error in the comment: "occured" should be "occurred".

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated via doc string

return $result;
}
Comment on lines +38 to +41
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pop method should document its timeout behavior more clearly. The method returns false on timeout or when the pool is empty, but this behavior is only mentioned in a comment on line 37. The PHPDoc should explicitly document the return type possibilities including false for timeout scenarios.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added



public function count(): int
{
return (int) $this->pool->length();
}
}
144 changes: 95 additions & 49 deletions src/Pools/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Utopia\Pools;

use Exception;
use Utopia\Pools\Adapter as PoolAdapter;
use Utopia\Telemetry\Adapter as Telemetry;
use Utopia\Telemetry\Adapter\None as NoTelemetry;
use Utopia\Telemetry\Gauge;
Expand All @@ -13,6 +14,7 @@
*/
class Pool
{
public const POP_TIMEOUT_IN_SECODNS = 3;
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
/**
* @var callable
*/
Expand All @@ -38,16 +40,18 @@ class Pool
*/
protected int $retrySleep = 1; // seconds

/**
* @var array<Connection<TResource>|true>
*/
protected array $pool = [];
protected PoolAdapter $pool;

/**
* @var array<string, Connection<TResource>>
*/
protected array $active = [];

/**
* Total number of connections created
*/
protected int $connectionsCreated = 0;

private Gauge $telemetryOpenConnections;
private Gauge $telemetryActiveConnections;
private Gauge $telemetryIdleConnections;
Expand All @@ -58,14 +62,17 @@ class Pool
private array $telemetryAttributes;

/**
* @param PoolAdapter $adapter
* @param string $name
* @param int $size
* @param callable(): TResource $init
*/
public function __construct(protected string $name, protected int $size, callable $init)
public function __construct(PoolAdapter $adapter, protected string $name, protected int $size, callable $init)
{
$this->init = $init;
$this->pool = array_fill(0, $this->size, true);
$this->pool = $adapter;
// Initialize empty channel (no pre-filling for lazy initialization)
$this->pool->fill($this->size, null);
Copy link
Copy Markdown
Contributor

@loks0n loks0n Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this standard? Not sure what best practice is here

Lazy initialisation vs prefilled connections

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pre filled was the previous the approach where we maintain the true based placeholders.
Lazy approach is inspired from the swoole connection pool implementation

It only consists of the connection object

$this->setTelemetry(new NoTelemetry());
}

Expand Down Expand Up @@ -223,48 +230,29 @@ public function pop(): Connection
try {
do {
$attempts++;
$connection = array_pop($this->pool);
// If pool is empty and size limit not reached, create new connection
if ($this->pool->count() === 0 && $this->connectionsCreated < $this->size) {
$connection = $this->createConnection();
$this->active[$connection->getID()] = $connection;
return $connection;
}

if (is_null($connection)) {
$connection = $this->pool->pop(self::POP_TIMEOUT_IN_SECODNS);

if ($connection === false || $connection === null) {
if ($attempts >= $this->getRetryAttempts()) {
throw new Exception("Pool '{$this->name}' is empty (size {$this->size})");
}

$totalSleepTime += $this->getRetrySleep();
sleep($this->getRetrySleep());
} else {
break;
}
} while ($attempts < $this->getRetryAttempts());

if ($connection === true) { // Pool has space, create connection
$attempts = 0;

do {
try {
$attempts++;
$connection = new Connection(($this->init)());
break; // leave loop if successful
} catch (\Exception $e) {
if ($attempts >= $this->getReconnectAttempts()) {
throw new \Exception('Failed to create connection: ' . $e->getMessage());
}
$totalSleepTime += $this->getReconnectSleep();
sleep($this->getReconnectSleep());
if ($connection instanceof Connection) {
$this->active[$connection->getID()] = $connection;
return $connection;
}
} while ($attempts < $this->getReconnectAttempts());
}

if ($connection instanceof Connection) { // connection is available, return it
if (empty($connection->getID())) {
$connection->setID($this->getName() . '-' . uniqid());
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

$connection->setPool($this);

$this->active[$connection->getID()] = $connection;
return $connection;
}
} while ($attempts < $this->getRetryAttempts());

throw new Exception('Failed to get a connection from the pool');
Comment thread
coderabbitai[bot] marked this conversation as resolved.
} finally {
Expand All @@ -273,14 +261,55 @@ public function pop(): Connection
}
}

/**
* Create a new connection
*
* @return Connection<TResource>
* @throws \Exception
*/
protected function createConnection(): Connection
{
$this->connectionsCreated++;

$connection = null;
$attempts = 0;
do {
try {
$attempts++;
$connection = new Connection(($this->init)());
break;
} catch (\Exception $e) {
if ($attempts >= $this->getReconnectAttempts()) {
$this->connectionsCreated--;
throw new \Exception('Failed to create connection: ' . $e->getMessage());
}
sleep($this->getReconnectSleep());
}
} while ($attempts < $this->getReconnectAttempts());

if ($connection === null) {
$this->connectionsCreated--;
throw new \Exception('Failed to create connection');
}

if (empty($connection->getID())) {
$connection->setID($this->getName() . '-' . uniqid());
}

$connection->setPool($this);

return $connection;
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

/**
* @param Connection<TResource> $connection
* @return $this<TResource>
*/
public function push(Connection $connection): static
{
try {
$this->pool[] = $connection;
// Push the actual connection back to the pool
$this->pool->push($connection);
unset($this->active[$connection->getID()]);

return $this;
Expand All @@ -290,11 +319,14 @@ public function push(Connection $connection): static
}

/**
* Returns the number of available connections (idle + not yet created)
*
* @return int
*/
public function count(): int
{
return count($this->pool);
// Available = idle connections in pool + connections not yet created
return $this->pool->count() + ($this->size - $this->connectionsCreated);
}

/**
Expand Down Expand Up @@ -323,14 +355,27 @@ public function destroy(?Connection $connection = null): static
{
try {
if ($connection !== null) {
$this->pool[] = true;
$this->connectionsCreated--;
unset($this->active[$connection->getID()]);

// Create a new connection to maintain pool size
if ($this->connectionsCreated < $this->size) {
$newConnection = $this->createConnection();
$this->pool->push($newConnection);
}

return $this;
}
Comment on lines +411 to 420
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is creating a new connection outside of a lock, isn't that risky?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually two reasons -> pattern and pool is channel in case of swoole

  1. Since pool is channel based so no issues of race condition

  2. $shouldCreate is done under the lock.
    Followed this pattern throughout the locking mechanism

Lock   → check + increment only
Unlock → create connection (no lock)
On failure → lock + decrement


foreach ($this->active as $connection) {
$this->pool[] = true;
$this->connectionsCreated--;
unset($this->active[$connection->getID()]);

// Create a new connection to maintain pool size
if ($this->connectionsCreated < $this->size) {
$newConnection = $this->createConnection();
$this->pool->push($newConnection);
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

return $this;
Expand All @@ -344,26 +389,27 @@ public function destroy(?Connection $connection = null): static
*/
public function isEmpty(): bool
{
return empty($this->pool);
return $this->pool->count() === 0;
}
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The isEmpty() method implementation may not align with user expectations. According to the method's logic, the pool is only considered empty when there are no idle connections in the pool storage, but it doesn't account for connections that haven't been created yet (when connectionsCreated < size). Consider documenting this behavior or updating the implementation to check if count() === 0 which includes not-yet-created connections.

Copilot uses AI. Check for mistakes.

/**
* @return bool
*/
public function isFull(): bool
{
return count($this->pool) === $this->size;
// Pool is full when all possible connections are available (idle or not created yet)
return count($this->active) === 0;
}

private function recordPoolTelemetry(): void
{
// Connections get removed from $this->pool when they are active
$activeConnections = count($this->active);
$existingConnections = count($this->pool);
$idleConnections = count(array_filter($this->pool, fn ($data) => $data instanceof Connection));
$idleConnections = $this->pool->count(); // Connections in the pool (idle)
$openConnections = $activeConnections + $idleConnections; // Total connections in use or available

$this->telemetryActiveConnections->record($activeConnections, $this->telemetryAttributes);
$this->telemetryIdleConnections->record($idleConnections, $this->telemetryAttributes);
$this->telemetryOpenConnections->record($activeConnections + $idleConnections, $this->telemetryAttributes);
$this->telemetryPoolCapacity->record($activeConnections + $existingConnections, $this->telemetryAttributes);
$this->telemetryOpenConnections->record($openConnections, $this->telemetryAttributes);
$this->telemetryPoolCapacity->record($activeConnections + $this->pool->count(), $this->telemetryAttributes);
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The telemetry capacity calculation is redundant. Line 440 calculates activeConnections + $this->pool->count(), which is the same as openConnections calculated on line 435. Consider reusing the $openConnections variable instead of recalculating the same value.

Copilot uses AI. Check for mistakes.
}
Comment thread
ArnabChatterjee20k marked this conversation as resolved.
}
Loading