From 3133b5ae078a29deccf252ed9e3990d50e0d6310 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 28 Apr 2026 10:47:46 -0500 Subject: [PATCH 1/7] Fix state not showing up fast enough with `TestDelayedEvents` (#865) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to https://github.com/matrix-org/complement/pull/830 As experienced in https://github.com/element-hq/synapse-rust-apps/actions/runs/24910122124/job/72949760158?pr=360 (https://github.com/element-hq/synapse-rust-apps/pull/360) ``` ❌ TestDelayedEvents/delayed_state_events_are_kept_on_server_restart (10.12s) delayed_event_test.go:425: StopServer hs1 delayed_event_test.go:429: StartServer hs1 delayed_event_test.go:443: CSAPI.MustDo GET http://127.0.0.1:32978/_matrix/client/v3/rooms/%21MbDncghrqxTzEmQhCP:hs1/state/com.example.test/1 returned non-2xx code: 404 Not Found - body: {"errcode":"M_NOT_FOUND","error":"Event not found."} ``` This is most likely caused because the main process handles processing delayed events (and serving `/state`) but the state will be persisted on one of event_persister workers so the main process might be serving stale data until the invalidation comes through. --- tests/msc4140/delayed_event_test.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/tests/msc4140/delayed_event_test.go b/tests/msc4140/delayed_event_test.go index 4384411b..7bf7534a 100644 --- a/tests/msc4140/delayed_event_test.go +++ b/tests/msc4140/delayed_event_test.go @@ -371,6 +371,11 @@ func TestDelayedEvents(t *testing.T) { numberOfDelayedEvents := 0 + // Start a sync loop (initial sync) + since := user.MustSyncUntil( + t, client.SyncReq{}, + ) + // Send an initial delayed event that will be ready to send as soon as the server // comes back up. user.MustDo( @@ -440,7 +445,9 @@ func TestDelayedEvents(t *testing.T) { remainingDelayedEventCount := countDelayedEvents(t, delayedEventResponse) // Sanity check that the room state was updated correctly with the delayed events // that were sent. - user.MustDo(t, "GET", getPathForState(roomID, eventType, stateKey1)) + since = user.MustSyncUntil(t, client.SyncReq{Since: since}, client.SyncStateHas(roomID, func(ev gjson.Result) bool { + return ev.Get("type").Str == eventType && ev.Get("state_key").Str == stateKey1 + })) // Wait until we see another delayed event being sent (ensure things resumed and are continuing). time.Sleep(10 * time.Second) @@ -452,7 +459,9 @@ func TestDelayedEvents(t *testing.T) { // FIXME: Ideally, we'd check specifically for the last one that was sent but it // will be a bit of a juggle and fiddly to get this right so for now we just check // one. - user.MustDo(t, "GET", getPathForState(roomID, eventType, stateKey2)) + since = user.MustSyncUntil(t, client.SyncReq{Since: since}, client.SyncStateHas(roomID, func(ev gjson.Result) bool { + return ev.Get("type").Str == eventType && ev.Get("state_key").Str == stateKey2 + })) }) } From b2bb3494e159cda19e96e0de938235b54f81a430 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 30 Apr 2026 16:50:56 -0500 Subject: [PATCH 2/7] Fix state lookup assertion Traditional `/sync` expects you to use `state` as the base and layer state from the `timeline` on top (flawed because of state res). So we have to check in the `timeline` instead of `state` for the update. We can also consider this good here because we don't expect any state to be rejected because of state res issues. --- client/sync.go | 9 +++++---- tests/msc4140/delayed_event_test.go | 4 ++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/client/sync.go b/client/sync.go index 701f73af..233313d4 100644 --- a/client/sync.go +++ b/client/sync.go @@ -215,10 +215,11 @@ func SyncTimelineHasEventID(roomID string, eventID string) SyncCheckOpt { }) } -// Check that the state section for `roomID` has an event which passes the check function. -// Note that the state section of a sync response only contains the change in state up to the start -// of the timeline and will not contain the entire state of the room for incremental or -// `lazy_load_members` syncs. +// Check that the `state` section for `roomID` has an event which passes the check function. +// +// Note that the `state` section of a sync response only contains the change in state up +// to the start of the `timeline` and will not contain the entire state of the room for +// incremental or `lazy_load_members` syncs. func SyncStateHas(roomID string, check func(gjson.Result) bool) SyncCheckOpt { return func(clientUserID string, topLevelSyncJSON gjson.Result) error { err := checkArrayElements( diff --git a/tests/msc4140/delayed_event_test.go b/tests/msc4140/delayed_event_test.go index 7bf7534a..7420fb95 100644 --- a/tests/msc4140/delayed_event_test.go +++ b/tests/msc4140/delayed_event_test.go @@ -445,7 +445,7 @@ func TestDelayedEvents(t *testing.T) { remainingDelayedEventCount := countDelayedEvents(t, delayedEventResponse) // Sanity check that the room state was updated correctly with the delayed events // that were sent. - since = user.MustSyncUntil(t, client.SyncReq{Since: since}, client.SyncStateHas(roomID, func(ev gjson.Result) bool { + since = user.MustSyncUntil(t, client.SyncReq{Since: since}, client.SyncTimelineHas(roomID, func(ev gjson.Result) bool { return ev.Get("type").Str == eventType && ev.Get("state_key").Str == stateKey1 })) @@ -459,7 +459,7 @@ func TestDelayedEvents(t *testing.T) { // FIXME: Ideally, we'd check specifically for the last one that was sent but it // will be a bit of a juggle and fiddly to get this right so for now we just check // one. - since = user.MustSyncUntil(t, client.SyncReq{Since: since}, client.SyncStateHas(roomID, func(ev gjson.Result) bool { + since = user.MustSyncUntil(t, client.SyncReq{Since: since}, client.SyncTimelineHas(roomID, func(ev gjson.Result) bool { return ev.Get("type").Str == eventType && ev.Get("state_key").Str == stateKey2 })) }) From 3b19fdaf8faa578d001d7af2ca2cc6924b1876d1 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 30 Apr 2026 17:08:27 -0500 Subject: [PATCH 3/7] Use `SyncStateHas` which better illustrates what we want to do Use a sync filter so `timeline` doesn't mess with us Don't use incremental sync because no need --- tests/msc4140/delayed_event_test.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/tests/msc4140/delayed_event_test.go b/tests/msc4140/delayed_event_test.go index 7420fb95..d5789ab1 100644 --- a/tests/msc4140/delayed_event_test.go +++ b/tests/msc4140/delayed_event_test.go @@ -30,6 +30,16 @@ const ( DelayedEventActionSend = "send" ) +// A filter for `/sync` that excludes timeline events. +// +// This is useful if you want to see `state` in the `/sync` response without the pesky +// de-duplication with `timeline` that traditional `/sync` does. +const NoTimelineSyncFilter = `{ + "room": { + "timeline": { "limit": 0 } + } +}` + // TODO: Test pagination of `GET /_matrix/client/v1/delayed_events` once // it is implemented in a homeserver. @@ -371,11 +381,6 @@ func TestDelayedEvents(t *testing.T) { numberOfDelayedEvents := 0 - // Start a sync loop (initial sync) - since := user.MustSyncUntil( - t, client.SyncReq{}, - ) - // Send an initial delayed event that will be ready to send as soon as the server // comes back up. user.MustDo( @@ -445,7 +450,7 @@ func TestDelayedEvents(t *testing.T) { remainingDelayedEventCount := countDelayedEvents(t, delayedEventResponse) // Sanity check that the room state was updated correctly with the delayed events // that were sent. - since = user.MustSyncUntil(t, client.SyncReq{Since: since}, client.SyncTimelineHas(roomID, func(ev gjson.Result) bool { + user.MustSyncUntil(t, client.SyncReq{Filter: NoTimelineSyncFilter}, client.SyncStateHas(roomID, func(ev gjson.Result) bool { return ev.Get("type").Str == eventType && ev.Get("state_key").Str == stateKey1 })) @@ -459,7 +464,7 @@ func TestDelayedEvents(t *testing.T) { // FIXME: Ideally, we'd check specifically for the last one that was sent but it // will be a bit of a juggle and fiddly to get this right so for now we just check // one. - since = user.MustSyncUntil(t, client.SyncReq{Since: since}, client.SyncTimelineHas(roomID, func(ev gjson.Result) bool { + user.MustSyncUntil(t, client.SyncReq{Filter: NoTimelineSyncFilter}, client.SyncStateHas(roomID, func(ev gjson.Result) bool { return ev.Get("type").Str == eventType && ev.Get("state_key").Str == stateKey2 })) }) From f4be0998c2983807a39df8d30f9b4e6034a7e82a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 30 Apr 2026 17:17:57 -0500 Subject: [PATCH 4/7] Clean up another test --- tests/msc4140/delayed_event_test.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/tests/msc4140/delayed_event_test.go b/tests/msc4140/delayed_event_test.go index d5789ab1..31d8859a 100644 --- a/tests/msc4140/delayed_event_test.go +++ b/tests/msc4140/delayed_event_test.go @@ -149,6 +149,7 @@ func TestDelayedEvents(t *testing.T) { stateKey := "to_send_on_timeout" + // Schedule a delayed event setterKey := "setter" setterExpected := "on_timeout" user.MustDo( @@ -161,8 +162,11 @@ func TestDelayedEvents(t *testing.T) { getDelayQueryParam("900"), ) + // Ensure that a delayed event is now scheduled matchDelayedEvents(t, user, delayedEventsNumberEqual(1)) - + // And includes the correct content + // + // FIXME: This assertion seems superfluous to this test and should be it's own test res = getDelayedEvents(t, user) must.MatchResponse(t, res, match.HTTPResponse{ JSON: []match.JSON{ @@ -178,19 +182,29 @@ func TestDelayedEvents(t *testing.T) { }), }, }) + + // Sanity check that the room state hasn't changed yet res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey)) must.MatchResponse(t, res, match.HTTPResponse{ StatusCode: 404, }) + // Wait one second which will cause the delayed state event to be sent time.Sleep(1 * time.Second) - matchDelayedEvents(t, user, delayedEventsNumberEqual(0)) + + // Check for the state change from the delayed state event + user.MustSyncUntil(t, client.SyncReq{Filter: NoTimelineSyncFilter}, client.SyncStateHas(roomID, func(ev gjson.Result) bool { + return ev.Get("type").Str == eventType && ev.Get("state_key").Str == stateKey + })) + // Make sure the state looks as expected after res = user.MustDo(t, "GET", getPathForState(roomID, eventType, stateKey)) must.MatchResponse(t, res, match.HTTPResponse{ JSON: []match.JSON{ match.JSONKeyEqual(setterKey, setterExpected), }, }) + // No more delayed events + matchDelayedEvents(t, user, delayedEventsNumberEqual(0)) }) t.Run("cannot update a delayed event without an action", func(t *testing.T) { From ebc611c2fc41068e68fb62f8838a8dc540b5e612 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 30 Apr 2026 17:33:30 -0500 Subject: [PATCH 5/7] Update all state tests --- tests/msc4140/delayed_event_test.go | 64 ++++++++++++++++++++++++++--- 1 file changed, 59 insertions(+), 5 deletions(-) diff --git a/tests/msc4140/delayed_event_test.go b/tests/msc4140/delayed_event_test.go index 31d8859a..f8386bf7 100644 --- a/tests/msc4140/delayed_event_test.go +++ b/tests/msc4140/delayed_event_test.go @@ -192,7 +192,8 @@ func TestDelayedEvents(t *testing.T) { // Wait one second which will cause the delayed state event to be sent time.Sleep(1 * time.Second) - // Check for the state change from the delayed state event + // Check for the state change from the delayed state event (using `MustSyncUntil` to + // account for any processing or worker replication delays) user.MustSyncUntil(t, client.SyncReq{Filter: NoTimelineSyncFilter}, client.SyncStateHas(roomID, func(ev gjson.Result) bool { return ev.Get("type").Str == eventType && ev.Get("state_key").Str == stateKey })) @@ -256,6 +257,7 @@ func TestDelayedEvents(t *testing.T) { stateKey := "to_never_send" + // Schedule a delayed event setterKey := "setter" setterExpected := "none" res = user.MustDo( @@ -269,22 +271,33 @@ func TestDelayedEvents(t *testing.T) { ) delayID := client.GetJSONFieldStr(t, client.ParseJSON(t, res), "delay_id") + // Wait a bit but not long enough for the delayed state event to be sent time.Sleep(1 * time.Second) + // We should still see the scheduled delayed event (hasn't been sent yet) matchDelayedEvents(t, user, delayedEventsNumberEqual(1)) + + // Sanity check that the room state hasn't changed res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey)) must.MatchResponse(t, res, match.HTTPResponse{ StatusCode: 404, }) + // Cancel the delayed event unauthedClient.MustDo( t, "POST", getPathForUpdateDelayedEvent(delayID, DelayedEventActionCancel), client.WithJSONBody(t, map[string]interface{}{}), ) + // No more delayed events matchDelayedEvents(t, user, delayedEventsNumberEqual(0)) + // Sanity check that the previously scheduled delayed event doesn't end up being sent anyway + // + // Wait another second which would cause the previously scheduled delayed to be sent + // as we've waited a total of 2s now (> 1.5s delay) time.Sleep(1 * time.Second) + // Sanity check that the room state hasn't changed res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey)) must.MatchResponse(t, res, match.HTTPResponse{ StatusCode: 404, @@ -298,6 +311,7 @@ func TestDelayedEvents(t *testing.T) { stateKey := "to_send_on_request" + // Schedule a delayed event setterKey := "setter" setterExpected := "on_send" res = user.MustDo( @@ -311,26 +325,39 @@ func TestDelayedEvents(t *testing.T) { ) delayID := client.GetJSONFieldStr(t, client.ParseJSON(t, res), "delay_id") + // Wait a bit but not long enough for the delayed state event to be sent time.Sleep(1 * time.Second) + // We should still see the scheduled delayed event (hasn't been sent yet) matchDelayedEvents(t, user, delayedEventsNumberEqual(1)) + + // Sanity check that the room state hasn't changed yet res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey)) must.MatchResponse(t, res, match.HTTPResponse{ StatusCode: 404, }) + // Force the delayed event to be sent immediately unauthedClient.MustDo( t, "POST", getPathForUpdateDelayedEvent(delayID, DelayedEventActionSend), client.WithJSONBody(t, map[string]interface{}{}), ) - matchDelayedEvents(t, user, delayedEventsNumberEqual(0)) - res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey)) + + // Check for the state change from the delayed state event (using `MustSyncUntil` to + // account for any processing or worker replication delays) + user.MustSyncUntil(t, client.SyncReq{Filter: NoTimelineSyncFilter}, client.SyncStateHas(roomID, func(ev gjson.Result) bool { + return ev.Get("type").Str == eventType && ev.Get("state_key").Str == stateKey + })) + // Make sure the state looks as expected after + res = user.MustDo(t, "GET", getPathForState(roomID, eventType, stateKey)) must.MatchResponse(t, res, match.HTTPResponse{ JSON: []match.JSON{ match.JSONKeyEqual(setterKey, setterExpected), }, }) + // No more delayed events + matchDelayedEvents(t, user, delayedEventsNumberEqual(0)) }) t.Run("delayed state events can be restarted", func(t *testing.T) { @@ -340,6 +367,7 @@ func TestDelayedEvents(t *testing.T) { defer cleanupDelayedEvents(t, user) + // Schedule a delayed event setterKey := "setter" setterExpected := "on_timeout" res = user.MustDo( @@ -353,13 +381,18 @@ func TestDelayedEvents(t *testing.T) { ) delayID := client.GetJSONFieldStr(t, client.ParseJSON(t, res), "delay_id") + // Wait a bit but not long enough for the delayed state event to be sent time.Sleep(1 * time.Second) + // We should still see the scheduled delayed event (hasn't been sent yet) matchDelayedEvents(t, user, delayedEventsNumberEqual(1)) + + // Sanity check that the room state hasn't changed yet res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey)) must.MatchResponse(t, res, match.HTTPResponse{ StatusCode: 404, }) + // Restart the timer on the delayed event unauthedClient.MustDo( t, "POST", @@ -367,21 +400,37 @@ func TestDelayedEvents(t *testing.T) { client.WithJSONBody(t, map[string]interface{}{}), ) + // Wait a bit but not long enough for the delayed state event to be sent time.Sleep(1 * time.Second) + // We should still see the scheduled delayed event (hasn't been sent yet) matchDelayedEvents(t, user, delayedEventsNumberEqual(1)) + + // Sanity check that the room state hasn't changed yet res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey)) must.MatchResponse(t, res, match.HTTPResponse{ StatusCode: 404, }) + // Wait one second which will cause the delayed state event to be sent time.Sleep(1 * time.Second) - matchDelayedEvents(t, user, delayedEventsNumberEqual(0)) + + // Wait one second which will cause the delayed state event to be sent + time.Sleep(1 * time.Second) + + // Check for the state change from the delayed state event (using `MustSyncUntil` to + // account for any processing or worker replication delays) + user.MustSyncUntil(t, client.SyncReq{Filter: NoTimelineSyncFilter}, client.SyncStateHas(roomID, func(ev gjson.Result) bool { + return ev.Get("type").Str == eventType && ev.Get("state_key").Str == stateKey + })) + // Make sure the state looks as expected after res = user.MustDo(t, "GET", getPathForState(roomID, eventType, stateKey)) must.MatchResponse(t, res, match.HTTPResponse{ JSON: []match.JSON{ match.JSONKeyEqual(setterKey, setterExpected), }, }) + // No more delayed events + matchDelayedEvents(t, user, delayedEventsNumberEqual(0)) }) t.Run("delayed state events are kept on server restart", func(t *testing.T) { @@ -463,7 +512,8 @@ func TestDelayedEvents(t *testing.T) { // Capture whatever number of delayed events are remaining after the server restart. remainingDelayedEventCount := countDelayedEvents(t, delayedEventResponse) // Sanity check that the room state was updated correctly with the delayed events - // that were sent. + // that were sent. (using `MustSyncUntil` to account for any processing or worker + // replication delays) user.MustSyncUntil(t, client.SyncReq{Filter: NoTimelineSyncFilter}, client.SyncStateHas(roomID, func(ev gjson.Result) bool { return ev.Get("type").Str == eventType && ev.Get("state_key").Str == stateKey1 })) @@ -474,6 +524,8 @@ func TestDelayedEvents(t *testing.T) { delayedEventsNumberLessThan(remainingDelayedEventCount), ) // Sanity check that the other delayed events also updated the room state correctly. + // (using `MustSyncUntil` to account for any processing or worker replication + // delays) // // FIXME: Ideally, we'd check specifically for the last one that was sent but it // will be a bit of a juggle and fiddly to get this right so for now we just check @@ -605,6 +657,8 @@ func matchDelayedEvents(t *testing.T, user *client.CSAPI, checks ...delayedEvent ) } +// FIXME: Instead of using `cleanupDelayedEvents`, each test should just use their own +// room func cleanupDelayedEvents(t *testing.T, user *client.CSAPI) { t.Helper() res := getDelayedEvents(t, user) From b4cb936dc697be325ef2941b791de0ab52ad6209 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 30 Apr 2026 17:36:40 -0500 Subject: [PATCH 6/7] Add final fixme for the big blob --- tests/msc4140/delayed_event_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/msc4140/delayed_event_test.go b/tests/msc4140/delayed_event_test.go index f8386bf7..b8c3a78f 100644 --- a/tests/msc4140/delayed_event_test.go +++ b/tests/msc4140/delayed_event_test.go @@ -72,6 +72,7 @@ func TestDelayedEvents(t *testing.T) { }) }) + // FIXME: Too much mixing of tests that should be more independent t.Run("delayed message events are sent on timeout", func(t *testing.T) { var res *http.Response var countExpected uint64 From 90d94e8644aa2703e760fc8ba663c0d5c8d9eee9 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 5 May 2026 08:50:07 -0500 Subject: [PATCH 7/7] Remove duplicate sleep --- tests/msc4140/delayed_event_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/msc4140/delayed_event_test.go b/tests/msc4140/delayed_event_test.go index b8c3a78f..03557d88 100644 --- a/tests/msc4140/delayed_event_test.go +++ b/tests/msc4140/delayed_event_test.go @@ -412,9 +412,6 @@ func TestDelayedEvents(t *testing.T) { StatusCode: 404, }) - // Wait one second which will cause the delayed state event to be sent - time.Sleep(1 * time.Second) - // Wait one second which will cause the delayed state event to be sent time.Sleep(1 * time.Second)