Skip to content

Commit 47f8fd6

Browse files
authored
test(scho oss): add smigrating checks for new connections (#3186)
1 parent d784748 commit 47f8fd6

2 files changed

Lines changed: 86 additions & 3 deletions

File tree

packages/client/lib/tests/test-scenario/smart-client-handoffs-oss.e2e.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@ import diagnostics_channel from "node:diagnostics_channel";
3131

3232
import testUtils from "../../test-utils";
3333
import { DiagnosticsEvent } from "../../client/enterprise-maintenance-manager";
34-
import { FaultInjectorClient, ActionTrigger, ActionType, ActionRequest } from "@redis/test-utils/lib/fault-injector";
34+
import { FaultInjectorClient, ActionTrigger, ActionRequest } from "@redis/test-utils/lib/fault-injector";
3535
import { REClusterTestOptions } from "@redis/test-utils";
36-
import { blockCommand, filterTriggersByArgs } from "./test-scenario.util";
36+
import { blockCommand, newConnectionReceivedSmigraging, filterTriggersByArgs } from "./test-scenario.util";
3737

3838
type TestOptions = REClusterTestOptions<{}, {}, {}, 3, {}>
3939

@@ -167,9 +167,12 @@ const KEYS = [
167167
"should not have received any notifications yet"
168168
);
169169

170+
const newConnReceivedSmigratingPromise = newConnectionReceivedSmigraging(cluster);
170171

171172
await faultInjectorClient.triggerAction(ACTION, ACTION_OPTIONS);
172173

174+
assert.ok(await newConnReceivedSmigratingPromise, 'Did not receive SMIGRATING on new connection');
175+
173176
// Verify notifications were received
174177
const sMigratingEventCount = diagnosticEvents.filter(
175178
(event) => event.type === "SMIGRATING"
@@ -460,9 +463,13 @@ const KEYS = [
460463
"should not have received any notifications yet"
461464
);
462465

466+
const newConnReceivedSmigratingPromise = newConnectionReceivedSmigraging(cluster);
467+
463468
// Trigger migration
464469
await faultInjectorClient.triggerAction(ACTION, ACTION_OPTIONS);
465470

471+
assert.ok(await newConnReceivedSmigratingPromise, 'Did not receive SMIGRATING on new connection');
472+
466473
// Verify notifications were received
467474
const sMigratingEventCount = diagnosticEvents.filter(
468475
(event) => event.type === "SMIGRATING"
@@ -752,9 +759,15 @@ const KEYS = [
752759
"should not have received any notifications yet"
753760
);
754761

762+
763+
const newConnReceivedSmigratingPromise = newConnectionReceivedSmigraging(cluster);
764+
755765
// Trigger migration
756766
await faultInjectorClient.triggerAction(ACTION, ACTION_OPTIONS);
757767

768+
assert.ok(await newConnReceivedSmigratingPromise, 'Did not receive SMIGRATING on new connection');
769+
770+
758771
// Verify notifications were received
759772
const sMigratingEventCount = diagnosticEvents.filter(
760773
(event) => event.type === "SMIGRATING"
@@ -1044,9 +1057,13 @@ const KEYS = [
10441057
"should not have received any notifications yet"
10451058
);
10461059

1060+
const newConnReceivedSmigratingPromise = newConnectionReceivedSmigraging(cluster);
1061+
10471062
// Trigger migration
10481063
await faultInjectorClient.triggerAction(ACTION, ACTION_OPTIONS);
10491064

1065+
assert.ok(await newConnReceivedSmigratingPromise, 'Did not receive SMIGRATING on new connection');
1066+
10501067
// Verify notifications were received
10511068
const sMigratingEventCount = diagnosticEvents.filter(
10521069
(event) => event.type === "SMIGRATING"

packages/client/lib/tests/test-scenario/test-scenario.util.ts

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
import { readFileSync } from "fs";
2-
import { createClient, RedisClientOptions } from "../../..";
2+
import { createClient, RedisClientOptions } from "../../.."
33
import { stub } from "sinon";
44
import { ActionTrigger } from "@redis/test-utils/lib/fault-injector";
5+
import { DiagnosticsEvent } from "../../client/enterprise-maintenance-manager";
6+
import diagnostics_channel from "node:diagnostics_channel";
7+
import { setTimeout } from "timers/promises";
8+
import { RedisClusterType } from "../../cluster";
59

610
type DatabaseEndpoint = {
711
addr: string[];
@@ -233,3 +237,65 @@ export function filterTriggersByArgs(
233237
slotShuffleTriggers: filterTriggers(result.slotShuffleTriggers),
234238
};
235239
}
240+
241+
242+
/**
243+
* Waits for SMIGRATING on all existing master connections, then opens one new
244+
* standalone connection and checks if it also receives SMIGRATING.
245+
*
246+
* Assumptions/limitations:
247+
* - Uses a fixed timeout; resolves false on timeout regardless of progress.
248+
* - Assumes one connection per master (no replicas, no pubsub/sharded pubsub).
249+
* - Assumes the new connection observes SMIGRATING within a small wait window.
250+
*/
251+
export function newConnectionReceivedSmigraging(cluster: RedisClusterType<{}, {}, {}, 3, {}>, timeout = 500) {
252+
const mastersCount = cluster.masters.length;
253+
let received = 0;
254+
let settled = false;
255+
let resolve: (value: boolean) => void;
256+
257+
const finish = (value: boolean) => {
258+
if (settled) return;
259+
settled = true;
260+
diagnostics_channel.unsubscribe("redis.maintenance", onEvent);
261+
resolve(value);
262+
};
263+
264+
const onEvent = async (message: unknown) => {
265+
const event = message as DiagnosticsEvent;
266+
if (event.type !== "SMIGRATING") return;
267+
268+
received++;
269+
if (received !== mastersCount) return;
270+
271+
const [host, port] = cluster.masters[0].address.split(":");
272+
const username = cluster.masters[0].client?.options.username;
273+
const password = cluster.masters[0].client?.options.password;
274+
const client = createClient({
275+
socket: { host, port: Number(port) },
276+
username,
277+
password,
278+
RESP: 3,
279+
});
280+
281+
try {
282+
await client.connect();
283+
} catch {
284+
finish(false);
285+
return;
286+
}
287+
288+
await setTimeout(50);
289+
await client.close().catch(() => {});
290+
finish(received === mastersCount + 1);
291+
};
292+
293+
const promise = new Promise<boolean>((res) => {
294+
resolve = res;
295+
});
296+
297+
diagnostics_channel.subscribe("redis.maintenance", onEvent);
298+
setTimeout(timeout, () => finish(false));
299+
300+
return promise;
301+
}

0 commit comments

Comments
 (0)