Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion dart/lib/leancode_pipe/pipe_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ class PipeClient {
_hubConnection.state,
);

Comment thread
Komoszek marked this conversation as resolved.
Stream<PipeConnectionState> get connectionStateStream =>
_hubConnection.connectionStateStream
.map(PipeConnectionStateMapper.fromHubConnectionState);

Future<void> connect() async {
if (connectionState != PipeConnectionState.disconnected) {
_logger.warning(
Expand Down Expand Up @@ -558,7 +562,7 @@ class PipeClient {

Future<void> dispose() async {
await Future.wait(_registeredTopicSubscriptions.map((e) => e.close()));
await _hubConnection.stop();
await _hubConnection.dispose();
}

Future<R> _sendPipeServiceMethod<R extends Object>({
Expand Down
30 changes: 23 additions & 7 deletions dart/lib/signalr_core/src/hub_connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class HubConnection {

HubConnectionState? _connectionState;
late bool _connectionStarted;
late StreamController<HubConnectionState> _connectionStateStreamController;
Comment thread
Komoszek marked this conversation as resolved.
Outdated
Future<void>? _startFuture;
Future<void>? _stopFuture;

Expand Down Expand Up @@ -93,6 +94,7 @@ class HubConnection {
_invocationId = 0;
_receivedHandshakeResponse = false;
_connectionState = HubConnectionState.disconnected;
_connectionStateStreamController = StreamController.broadcast();
_connectionStarted = false;
}

Expand All @@ -113,6 +115,15 @@ class HubConnection {
/// Indicates the state of the {@link HubConnection} to the server.
HubConnectionState? get state => _connectionState;

Comment thread
Komoszek marked this conversation as resolved.
/// Stream of the state changes of the {@link HubConnection} to the server.
Stream<HubConnectionState> get connectionStateStream =>
_connectionStateStreamController.stream;

void _updateConnectionState(HubConnectionState state) {
_connectionState = state;
_connectionStateStreamController.add(state);
Comment thread
PiotrRogulski marked this conversation as resolved.
Outdated
}

/// Represents the connection id of the [HubConnection] on the server. The
/// connection id will be null when the connection is either
/// in the disconnected state or if the negotiation step was skipped.
Expand Down Expand Up @@ -154,17 +165,17 @@ class HubConnection {
));
}

_connectionState = HubConnectionState.connecting;
_updateConnectionState(HubConnectionState.connecting);
_logger!(LogLevel.debug, 'Starting HubConnection.');

try {
await _startInternal();

_connectionState = HubConnectionState.connected;
_updateConnectionState(HubConnectionState.connected);
_connectionStarted = true;
_logger(LogLevel.debug, 'HubConnection connected successfully.');
} catch (e) {
_connectionState = HubConnectionState.disconnected;
_updateConnectionState(HubConnectionState.disconnected);
_logger(
LogLevel.debug,
'HubConnection failed to start successfully because of error '
Expand Down Expand Up @@ -241,6 +252,11 @@ class HubConnection {
}
}

Future<void> dispose() async {
await _connectionStateStreamController.close();
await stop();
}

/// Stops the connection.
Future<void> stop() async {
// Capture the start future before the connection might be restarted in an
Expand Down Expand Up @@ -278,7 +294,7 @@ class HubConnection {
return _stopFuture;
}

_connectionState = HubConnectionState.disconnecting;
_updateConnectionState(HubConnectionState.disconnecting);

_logger!(LogLevel.debug, 'Stopping HubConnection');

Expand Down Expand Up @@ -395,7 +411,7 @@ class HubConnection {

void _completeClose({Exception? exception}) {
if (_connectionStarted) {
_connectionState = HubConnectionState.disconnected;
_updateConnectionState(HubConnectionState.disconnected);
_connectionStarted = false;

try {
Expand Down Expand Up @@ -435,7 +451,7 @@ class HubConnection {
return;
}

_connectionState = HubConnectionState.reconnecting;
_updateConnectionState(HubConnectionState.reconnecting);

if (exception != null) {
_logger!(
Expand Down Expand Up @@ -497,7 +513,7 @@ class HubConnection {
try {
await _startInternal();

_connectionState = HubConnectionState.connected;
_updateConnectionState(HubConnectionState.connected);
_logger(
LogLevel.information, 'HubConnection reconnected successfully.');

Expand Down