Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion packages/sqlite_async/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
## 0.14.3 (unreleased)
## 0.14.4 (unreleased)

- Web: Stop leaking dedicated web workers when databases are closed.

## 0.14.3

- Include identifier of mutexes when a navigator lock attempt is aborted.
- Web: Stop leaking dedicated web workers when databases are closed.

## 0.14.2

Expand Down
11 changes: 10 additions & 1 deletion packages/sqlite_async/lib/src/web/database.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import '../common/sqlite_database.dart';
import '../common/timeouts.dart';
import '../impl/context.dart';
import 'connection.dart';
import 'finalizer.dart';
import 'protocol.dart';
import 'web_mutex.dart';

Expand All @@ -29,6 +30,12 @@ final class WebDatabase extends SqliteDatabaseImpl
/// web broadcast channels to forward local update events to other tabs.
final BroadcastUpdates? broadcastUpdates;

/// The [WebSqlite] wrapper from which this database instance is derived.
///
/// When all databases derived from the same [WebSqlite] instances are closed
/// or no longer referenced, this automatically invoked [WebSqlite.close].
final FinalizableWebSqliteResource? _finalizableSource;

@override
bool closed = false;

Expand All @@ -39,11 +46,13 @@ final class WebDatabase extends SqliteDatabaseImpl
required this.profileQueries,
required this.updates,
this.broadcastUpdates,
});
FinalizableWebSqliteResource? finalizable,
}) : _finalizableSource = finalizable;

@override
Future<void> close() async {
await _database.dispose();
_finalizableSource?.close();
closed = true;
}

Expand Down
29 changes: 21 additions & 8 deletions packages/sqlite_async/lib/src/web/database/async_web_database.dart
Original file line number Diff line number Diff line change
Expand Up @@ -51,27 +51,40 @@ final class AsyncWebDatabaseImpl extends SqliteDatabaseImpl
_connection = await openFactory.openConnection(
SqliteOpenOptions(primaryConnection: true, readOnly: false));

final broadcastUpdates = _connection.broadcastUpdates;
_broadcastUpdatesSubscription =
_installUpdatesListener(_connection, updatesController);
}

// The updated might be coming from a single static stream controller, we want
// to avoid capturing `this` in a subscription.
static StreamSubscription<UpdateNotification>? _installUpdatesListener(
WebDatabase connection,
StreamController<UpdateNotification> updatesController) {
final broadcastUpdates = connection.broadcastUpdates;
final localUpdates = updatesController;
StreamSubscription<UpdateNotification>? broadcastUpdatesSubscription;

if (broadcastUpdates == null) {
// We can use updates directly from the database.
_connection.updates.forEach((update) {
updatesController.add(update);
connection.updates.forEach((update) {
localUpdates.add(update);
});
} else {
_connection.updates.forEach((update) {
updatesController.add(update);
connection.updates.forEach((update) {
localUpdates.add(update);

// Share local updates with other tabs
broadcastUpdates.send(update);
});

// Also add updates from other tabs, note that things we send aren't
// received by our tab.
_broadcastUpdatesSubscription =
broadcastUpdates.updates.listen((updates) {
updatesController.add(updates);
broadcastUpdatesSubscription = broadcastUpdates.updates.listen((updates) {
localUpdates.add(updates);
});
}

return broadcastUpdatesSubscription;
}

T _runZoned<T>(T Function() callback, {required String debugContext}) {
Expand Down
77 changes: 77 additions & 0 deletions packages/sqlite_async/lib/src/web/finalizer.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import 'package:sqlite3_web/sqlite3_web.dart';

import '../sqlite_options.dart';

/// A ref-counted [WebSqlite] wrapper.
///
/// Each reference is represented as [FinalizableWebSqliteResource], which have
/// a finalizer attached to them. When all resources pointing to this instance
/// are finalized, we can close the inner [WebSqlite] instance.
final class _RefCountedWebSqlite {
final Future<WebSqlite> instance;
int _users = 0;

_RefCountedWebSqlite._(this.instance);

static (_RefCountedWebSqlite, FinalizableWebSqliteResource) create(
Future<WebSqlite> instance) {
final ref = _RefCountedWebSqlite._(instance);
ref._users = 1;
return (ref, FinalizableWebSqliteResource._(ref));
}

FinalizableWebSqliteResource reference() {
assert(_users > 0);
_users++;
return FinalizableWebSqliteResource._(this);
}

void decrementUsers() {
_users--;
if (_users == 0) {
instance.then((sqlite) => sqlite.close());
}
}
}

final class FinalizableWebSqliteResource {
final _RefCountedWebSqlite _instance;
var _closed = false;

FinalizableWebSqliteResource._(this._instance) {
_finalizer.attach(this, _instance, detach: this);
}

Future<WebSqlite> get sqlite => _instance.instance;

void close() {
if (!_closed) {
_closed = true;
_instance.decrementUsers();
_finalizer.detach(this);
}
}

FinalizableWebSqliteResource clone() =>
FinalizableWebSqliteResource._(_instance);

static final Finalizer<_RefCountedWebSqlite> _finalizer =
Finalizer((r) => r.decrementUsers());
}

/// Active [WebSqlite] instance, keyed by options.
final Map<String, _RefCountedWebSqlite> _activeSqliteInstances = {};

FinalizableWebSqliteResource resolveWebSqliteResource(
WebSqliteOptions options, Future<WebSqlite> Function() open) {
final cacheKey = options.wasmUri + options.workerUri;

if (_activeSqliteInstances[cacheKey] case final instance?
when instance._users > 0) {
return instance.reference();
}

final (sqlite, ref) = _RefCountedWebSqlite.create(open());
_activeSqliteInstances[cacheKey] = sqlite;
return ref;
}
23 changes: 8 additions & 15 deletions packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,25 @@ import 'package:sqlite_async/src/web/web_mutex.dart';

import '../common/abstract_open_factory.dart';
import 'database.dart';
import 'finalizer.dart';
import 'update_notifications.dart';
import 'worker/worker_utils.dart';

final UpdateNotificationStreams _updateStreams = UpdateNotificationStreams();
Map<String, FutureOr<WebSqlite>> _webSQLiteImplementations = {};

/// [SqliteOpenFactory] implementation for the web.
///
/// This class can be extended to customize how databases are opened on the web.
base class WebSqliteOpenFactory extends InternalOpenFactory {
late final Future<WebSqlite> _initialized = Future.sync(() {
final cacheKey = sqliteOptions.webSqliteOptions.wasmUri +
sqliteOptions.webSqliteOptions.workerUri;

if (_webSQLiteImplementations.containsKey(cacheKey)) {
return _webSQLiteImplementations[cacheKey]!;
}

_webSQLiteImplementations[cacheKey] =
openWebSqlite(sqliteOptions.webSqliteOptions);
return _webSQLiteImplementations[cacheKey]!;
});
late final _openedWebSqlite = resolveWebSqliteResource(
sqliteOptions.webSqliteOptions,
() => openWebSqlite(sqliteOptions.webSqliteOptions),
);

WebSqliteOpenFactory(
{required super.path, super.sqliteOptions = const SqliteOptions()}) {
// Make sure initializer starts running immediately
_initialized;
_openedWebSqlite;
}

/// Opens a [WebSqlite] instance for the given [options].
Expand Down Expand Up @@ -73,7 +65,7 @@ base class WebSqliteOpenFactory extends InternalOpenFactory {
/// Due to being asynchronous, the under laying CommonDatabase is not
/// accessible
Future<WebDatabase> openConnection(SqliteOpenOptions options) async {
final workers = await _initialized;
final workers = await _openedWebSqlite.sqlite;
final connection = await connectToWorker(workers, path);

final pragmaStatements = this.pragmaStatements(options);
Expand Down Expand Up @@ -116,6 +108,7 @@ base class WebSqliteOpenFactory extends InternalOpenFactory {
broadcastUpdates: broadcastUpdates,
profileQueries: sqliteOptions.profileQueries,
updates: updatesFor(connection.database),
finalizable: _openedWebSqlite.clone(),
);
}

Expand Down
4 changes: 2 additions & 2 deletions packages/sqlite_async/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: sqlite_async
description: High-performance asynchronous interface for SQLite on Dart and Flutter.
version: 0.14.2
version: 0.14.3
resolution: workspace
repository: https://github.com/powersync-ja/sqlite_async.dart
environment:
Expand All @@ -14,7 +14,7 @@ topics:

dependencies:
sqlite3: ^3.2.0
sqlite3_web: ^0.8.0
sqlite3_web: ^0.9.0
sqlite3_connection_pool: ^0.2.4
async: ^2.10.0
collection: ^1.17.0
Expand Down
Loading