Skip to content

Commit 4ce4163

Browse files
Merge pull request #81 from leancodepl/fix/fix-resubscribing
Fix mobile resubscribing
2 parents f52b3a2 + 46adfa4 commit 4ce4163

2 files changed

Lines changed: 67 additions & 1 deletion

File tree

dart/lib/leancode_pipe/pipe_client.dart

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ class PipeClient {
8989
for (final topicSub in _registeredTopicSubscriptions)
9090
topicSub.close(),
9191
]);
92+
_registeredTopicSubscriptions.clear();
9293
onClose?.call(error);
9394
})
9495
..on('subscriptionResult', _onSubscriptionResult)
@@ -120,9 +121,15 @@ class PipeClient {
120121
Topic<N> topic, {
121122
void Function()? onReconnect,
122123
}) async {
123-
final thisTopicSubscription =
124+
var thisTopicSubscription =
124125
_registeredTopicSubscriptions.firstWhereOrNull((e) => e.topic == topic);
125126

127+
// Defensive check: if subscription is closed, remove it and treat as non-existent
128+
if (thisTopicSubscription != null && thisTopicSubscription.isClosed) {
129+
_registeredTopicSubscriptions.remove(thisTopicSubscription);
130+
thisTopicSubscription = null;
131+
}
132+
126133
if (thisTopicSubscription != null) {
127134
final state = thisTopicSubscription.stateSubject.value;
128135
switch (state) {

dart/test/leancode_pipe/lean_pipe_client_connection_test.dart

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,5 +168,64 @@ void main() {
168168
expect(isSubscriptionCanceled, true);
169169
},
170170
);
171+
172+
test(
173+
'PipeClient allows subscribing to a topic after disconnect and reconnect',
174+
() async {
175+
// Prepare general config
176+
prepareConnect(connection);
177+
prepareTopic(topic);
178+
when(() => connection.state)
179+
.thenAnswer((_) => HubConnectionState.disconnected);
180+
181+
// Connect to catch on subscription result method
182+
final onSubscriptionResultMethod = await captureOnSubscriptionResult(
183+
connect: client.connect,
184+
connection: connection,
185+
);
186+
187+
// Prepare connection to simulate backend sending subscription confirmation
188+
prepareSubscribeToAnswerWithData(
189+
connection: connection,
190+
uuid: uuid,
191+
answerCallback: (subscribeResult) =>
192+
onSubscriptionResultMethod([subscribeResult.toJson()]),
193+
);
194+
195+
// First subscription
196+
final subscription1 = await client.subscribe(topic);
197+
verifySubscribeCalled(connection);
198+
expect(subscription1, isA<PipeSubscription>());
199+
200+
// Simulate connected state before disconnect
201+
when(() => connection.state)
202+
.thenAnswer((_) => HubConnectionState.connected);
203+
204+
// Capture and trigger onClose
205+
final onClose = await captureOnClose(
206+
connect: client.connect,
207+
connection: connection,
208+
);
209+
when(() => connection.stop()).thenAnswer((_) async {
210+
await onClose(null);
211+
});
212+
213+
// Disconnect
214+
await client.disconnect();
215+
verify(() => connection.stop()).called(1);
216+
217+
// Simulate disconnected state after disconnect
218+
when(() => connection.state)
219+
.thenAnswer((_) => HubConnectionState.disconnected);
220+
221+
// Reconnect
222+
await client.connect();
223+
224+
// Subscribe again to the same topic - this should work
225+
final subscription2 = await client.subscribe(topic);
226+
verifySubscribeCalled(connection);
227+
expect(subscription2, isA<PipeSubscription>());
228+
},
229+
);
171230
});
172231
}

0 commit comments

Comments
 (0)