-
Notifications
You must be signed in to change notification settings - Fork 66
test: events sent during remote join handshake should not be lost #847
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,7 @@ import ( | |
| "context" | ||
| "encoding/json" | ||
| "fmt" | ||
| "io" | ||
| "net/http" | ||
| "net/url" | ||
| "strings" | ||
|
|
@@ -574,6 +575,131 @@ func TestSendJoinPartialStateResponse(t *testing.T) { | |
| must.HaveInOrder(t, sendJoinResp.ServersInRoom, []string{"hs1"}) | ||
| } | ||
|
|
||
| // This test verifies that events sent into a room between a /make_join and | ||
| // /send_join are not lost to the joining server. When an event is created | ||
| // during the join handshake, the join event's prev_events (set at make_join | ||
| // time) won't reference it, creating two forward extremities. The server | ||
| // handling the join should ensure the joining server can discover the missed | ||
| // event, for example by sending a follow-up event that references both | ||
| // extremities, prompting the joining server to backfill. | ||
| // | ||
| // See https://github.com/element-hq/synapse/pull/19390 | ||
| func TestEventBetweenMakeJoinAndSendJoinIsNotLost(t *testing.T) { | ||
| deployment := complement.Deploy(t, 1) | ||
| defer deployment.Destroy(t) | ||
|
|
||
| alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{}) | ||
|
|
||
| // We track the message event ID sent between make_join and send_join. | ||
| // After send_join, we wait for hs1 to send us either: | ||
| // - the message event itself, or | ||
| // - any event whose prev_events reference the message (e.g. a dummy event) | ||
| var messageEventID string | ||
| messageDiscoverableWaiter := helpers.NewWaiter() | ||
|
|
||
| srv := federation.NewServer(t, deployment, | ||
| federation.HandleKeyRequests(), | ||
| ) | ||
| srv.UnexpectedRequestsAreErrors = false | ||
|
|
||
| // Custom /send handler: the Complement server won't be in the room until | ||
| // send_join completes, so we can't use HandleTransactionRequests (which | ||
| // requires the room in srv.rooms). Instead we parse the raw transaction. | ||
| srv.Mux().Handle("/_matrix/federation/v1/send/{transactionID}", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { | ||
|
Comment on lines
+605
to
+608
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Comment what we're trying to accomplish with this handler |
||
| body, _ := io.ReadAll(req.Body) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 Handling the error is prudent here |
||
| txn := gjson.ParseBytes(body) | ||
| txn.Get("pdus").ForEach(func(_, pdu gjson.Result) bool { | ||
| eventID := pdu.Get("event_id").String() | ||
| eventType := pdu.Get("type").String() | ||
| t.Logf("Received PDU via /send: type=%s id=%s", eventType, eventID) | ||
|
|
||
| if messageEventID == "" { | ||
| return true | ||
| } | ||
|
Comment on lines
+593
to
+618
Comment on lines
+616
to
+618
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Comment why we do this |
||
|
|
||
| // Check if this IS the message event (server pushed it directly). | ||
| if eventID == messageEventID { | ||
| messageDiscoverableWaiter.Finish() | ||
| return true | ||
| } | ||
|
|
||
| // Check if this event's prev_events reference the message | ||
| // (e.g. a dummy event tying the forward extremities together). | ||
| pdu.Get("prev_events").ForEach(func(_, prevEvent gjson.Result) bool { | ||
| if prevEvent.String() == messageEventID { | ||
| messageDiscoverableWaiter.Finish() | ||
| return false | ||
| } | ||
| return true | ||
| }) | ||
|
Comment on lines
+626
to
+634
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Comment why this is good enough. I assume because people can backfill to get the message. But then this brings up the question, what if it's a prev_event further back in a chain? (we currently don't detect that as good enough) - we need to at-least explain this edge case although perhaps we should
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like this is partially addressed below but we should also explain it here: |
||
|
|
||
| return true | ||
| }) | ||
| w.WriteHeader(200) | ||
| w.Write([]byte(`{"pdus":{}}`)) | ||
|
Comment on lines
+638
to
+639
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems like nonsense ⏩
Comment on lines
+638
to
+639
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Comment why responding this way |
||
| })).Methods("PUT") | ||
|
|
||
| cancel := srv.Listen() | ||
| defer cancel() | ||
|
|
||
| // Alice creates a room on hs1. | ||
| roomID := alice.MustCreateRoom(t, map[string]interface{}{ | ||
| "preset": "public_chat", | ||
| }) | ||
|
|
||
| charlie := srv.UserID("charlie") | ||
| origin := srv.ServerName() | ||
| fedClient := srv.FederationClient(deployment) | ||
|
|
||
| // Step 1: make_join, hs1 returns a join event template whose prev_events | ||
| // reflect the current room DAG tips. | ||
| makeJoinResp, err := fedClient.MakeJoin( | ||
| context.Background(), origin, | ||
| deployment.GetFullyQualifiedHomeserverName(t, "hs1"), | ||
| roomID, charlie, | ||
| ) | ||
| must.NotError(t, "MakeJoin", err) | ||
|
|
||
| // Step 2: Alice sends a message on hs1. This advances the DAG past the | ||
| // point captured by make_join's prev_events. The Complement server is not | ||
| // yet in the room, so it won't receive this event via normal federation. | ||
| messageEventID = alice.SendEventSynced(t, roomID, b.Event{ | ||
| Type: "m.room.message", | ||
| Content: map[string]interface{}{ | ||
| "msgtype": "m.text", | ||
| "body": "Message sent between make_join and send_join", | ||
| }, | ||
| }) | ||
| t.Logf("Alice sent message %s between make_join and send_join", messageEventID) | ||
|
|
||
| // Step 3: Build and sign the join event, then send_join. | ||
| // The join event's prev_events are from step 1 (before the message), | ||
| // so persisting it on hs1 creates two forward extremities: the message | ||
| // and the join. | ||
| verImpl, err := gomatrixserverlib.GetRoomVersion(makeJoinResp.RoomVersion) | ||
| must.NotError(t, "GetRoomVersion", err) | ||
| eb := verImpl.NewEventBuilderFromProtoEvent(&makeJoinResp.JoinEvent) | ||
| joinEvent, err := eb.Build(time.Now(), srv.ServerName(), srv.KeyID, srv.Priv) | ||
| must.NotError(t, "Build join event", err) | ||
|
|
||
| _, err = fedClient.SendJoin( | ||
| context.Background(), origin, | ||
| deployment.GetFullyQualifiedHomeserverName(t, "hs1"), | ||
| joinEvent, | ||
| ) | ||
| must.NotError(t, "SendJoin", err) | ||
|
|
||
| // Step 4: hs1 should make the missed message discoverable to the joining | ||
| // server. We accept either receiving the message event directly, or | ||
| // receiving any event whose prev_events reference it (allowing the | ||
| // joining server to backfill). | ||
| messageDiscoverableWaiter.Waitf(t, 5*time.Second, | ||
| "Timed out waiting for message event %s to become discoverable — "+ | ||
| "the event sent between make_join and send_join was lost to the "+ | ||
| "joining server", messageEventID, | ||
| ) | ||
| } | ||
|
|
||
| // given an event JSON, return the type and state_key, joined with a "|" | ||
| func typeAndStateKeyForEvent(result gjson.Result) string { | ||
| return strings.Join([]string{result.Map()["type"].Str, result.Map()["state_key"].Str}, "|") | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Which unexpected requests do we expect?
We should try to avoid this if possible. Otherwise, we should comment what kinds of requests are flying around