Skip to content

Commit 62d6562

Browse files
Regression test for corrupted auth chains (#790)
* Regression test for corrupted auth chains * Apply suggestions from code review Co-authored-by: Eric Eastwood <[email protected]> * Compile failures * Comments and skip on dendrite * Review comments * Send a separate sentinel event separate from the main test --------- Co-authored-by: Eric Eastwood <[email protected]>
1 parent d2da5a6 commit 62d6562

File tree

1 file changed

+316
-0
lines changed

1 file changed

+316
-0
lines changed

tests/federation_room_get_missing_events_test.go

Lines changed: 316 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"testing"
1010
"time"
1111

12+
"github.com/gorilla/mux"
1213
"github.com/matrix-org/complement"
1314
"github.com/matrix-org/gomatrixserverlib"
1415
"github.com/matrix-org/gomatrixserverlib/fclient"
@@ -18,10 +19,12 @@ import (
1819

1920
"github.com/matrix-org/complement/b"
2021
"github.com/matrix-org/complement/client"
22+
"github.com/matrix-org/complement/ct"
2123
"github.com/matrix-org/complement/federation"
2224
"github.com/matrix-org/complement/helpers"
2325
"github.com/matrix-org/complement/match"
2426
"github.com/matrix-org/complement/must"
27+
"github.com/matrix-org/complement/runtime"
2528
)
2629

2730
// TODO:
@@ -598,3 +601,316 @@ func TestOutboundFederationEventSizeGetMissingEvents(t *testing.T) {
598601
// Alice should receive the sent event, even though the "bad" event has a too large state key
599602
alice.MustSyncUntil(t, client.SyncReq{}, client.SyncTimelineHasEventID(room.RoomID, sentEvent.EventID()))
600603
}
604+
605+
// Test that if you respond to /state_ids, and fail some /event requests, we end up
606+
// with correctly persisted auth information for the event. This creates an _auth graph_ like so:
607+
//
608+
// A <- B <- C <- D <- E m.room.member,bob
609+
//
610+
// Complement needs the HS to hit /state_ids and /event for missing events so it does some work to manipulate this:
611+
// - it sends 100 unrelated state events. This ensures that any statistical analysis done on the number of missing events
612+
// in /state_ids means we will bias to using /event and not /state. The test needs /event.
613+
// - it sends an unrelated event to the HS with unknown prev_events.
614+
// - it returns an unrelated event for /get_missing_events.
615+
// - then /state_ids should be hit.
616+
//
617+
// When /state_ids is hit, we will include A,B,C,D,E in the response. This will be the first time the HS sees these events.
618+
// Because we've gamed the number of state events in the room, HSes _should_ hit /event for each event ID.
619+
// Now the actual test can begin:
620+
// - We fail the /event request for B.
621+
// - We ensure that we do not see C,D,E in the final room state.
622+
//
623+
// This is a regression test where a HS could have code which does the following:
624+
// - Sort events topologically (A,B,C,D,E)
625+
// - for each event, check you have the auth events and then auth it.
626+
// - If you don't have the auth events, drop it, else persist it (incl. whether it was rejected).
627+
//
628+
// This has a subtle bug IF "check you have the auth events" uses an in-memory event map AND dropping the event doesn't remove
629+
// the entry from that event map. If this happens: A is processed, B is missing, C is dropped due to missing B,
630+
// crucially D and E ARE PERSISTED because C exists in-memory.
631+
// This breaks the auth chain for the room, which matters when doing state resolution.
632+
func TestCorruptedAuthChain(t *testing.T) {
633+
// Dendrite doesn't make exactly the same requests as it seems to fallback to /event_auth.
634+
// As this is intended for a synapse bugfix, we'll skip dendrite for now.
635+
runtime.SkipIf(t, runtime.Dendrite)
636+
deployment := complement.Deploy(t, 1)
637+
defer deployment.Destroy(t)
638+
639+
srv := federation.NewServer(t, deployment,
640+
federation.HandleKeyRequests(),
641+
federation.HandleMakeSendJoinRequests(),
642+
federation.HandleTransactionRequests(nil, nil),
643+
federation.HandleInviteRequests(nil),
644+
)
645+
// We expect to be pushed events that we don't care about responding to (not relevant to the test)
646+
srv.UnexpectedRequestsAreErrors = false
647+
cancel := srv.Listen()
648+
defer cancel()
649+
650+
alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{LocalpartSuffix: "alice"})
651+
// ensure the server under test remains in the room when alice rejoins
652+
sentinel := deployment.Register(t, "hs1", helpers.RegistrationOpts{LocalpartSuffix: "sentinel"})
653+
roomID := alice.MustCreateRoom(t, map[string]interface{}{
654+
"preset": "public_chat",
655+
"room_version": "10",
656+
})
657+
sentinel.MustJoinRoom(t, roomID, []spec.ServerName{"hs1"})
658+
// Pad out the room state
659+
for i := 0; i < 100; i++ {
660+
if i%2 == 0 {
661+
alice.MustLeaveRoom(t, roomID)
662+
} else {
663+
alice.MustJoinRoom(t, roomID, []spec.ServerName{"hs1"})
664+
}
665+
}
666+
bob := srv.UserID("bob")
667+
defaultImpl := federation.ServerRoomImplDefault{}
668+
var existingAuthChain []gomatrixserverlib.PDU
669+
srvRoom := srv.MustJoinRoom(t, deployment, spec.ServerName("hs1"), roomID, bob, federation.WithRoomOpts(federation.WithImpl(&federation.ServerRoomImplCustom{
670+
ServerRoomImplDefault: defaultImpl,
671+
PopulateFromSendJoinResponseFn: func(def federation.ServerRoomImpl, room *federation.ServerRoom, joinEvent gomatrixserverlib.PDU, resp fclient.RespSendJoin) {
672+
defaultImpl.PopulateFromSendJoinResponse(room, joinEvent, resp)
673+
existingAuthChain = resp.AuthEvents.TrustedEvents(joinEvent.Version(), false)
674+
},
675+
})))
676+
// we should have at least 100 events in the auth chain
677+
if len(existingAuthChain) < 100 {
678+
ct.Fatalf(t, "not enough events in the auth chain, got %d want >100", len(existingAuthChain))
679+
}
680+
createEvent := srvRoom.CurrentState(spec.MRoomCreate, "")
681+
plEvent := srvRoom.CurrentState(spec.MRoomPowerLevels, "")
682+
jrEvent := srvRoom.CurrentState(spec.MRoomJoinRules, "")
683+
bobOriginalJoinEvent := srvRoom.CurrentState(spec.MRoomMember, bob)
684+
685+
// Create A,B,C,D,E which will be profile changes for Bob (where each event is dependent on the next)
686+
eventA := srv.MustCreateEvent(t, srvRoom, federation.Event{
687+
Type: spec.MRoomMember,
688+
Sender: bob,
689+
StateKey: &bob,
690+
Content: map[string]interface{}{
691+
"membership": "join",
692+
"displayname": "A",
693+
},
694+
})
695+
eventB := srv.MustCreateEvent(t, srvRoom, federation.Event{
696+
Type: spec.MRoomMember,
697+
Sender: bob,
698+
StateKey: &bob,
699+
Content: map[string]interface{}{
700+
"membership": "join",
701+
"displayname": "B",
702+
},
703+
PrevEvents: []string{eventA.EventID()},
704+
AuthEvents: []string{createEvent.EventID(), plEvent.EventID(), jrEvent.EventID(), eventA.EventID()},
705+
})
706+
eventC := srv.MustCreateEvent(t, srvRoom, federation.Event{
707+
Type: spec.MRoomMember,
708+
Sender: bob,
709+
StateKey: &bob,
710+
Content: map[string]interface{}{
711+
"membership": "join",
712+
"displayname": "C",
713+
},
714+
PrevEvents: []string{eventB.EventID()},
715+
AuthEvents: []string{createEvent.EventID(), plEvent.EventID(), jrEvent.EventID(), eventB.EventID()},
716+
})
717+
eventD := srv.MustCreateEvent(t, srvRoom, federation.Event{
718+
Type: spec.MRoomMember,
719+
Sender: bob,
720+
StateKey: &bob,
721+
Content: map[string]interface{}{
722+
"membership": "join",
723+
"displayname": "D",
724+
},
725+
PrevEvents: []string{eventC.EventID()},
726+
AuthEvents: []string{createEvent.EventID(), plEvent.EventID(), jrEvent.EventID(), eventC.EventID()},
727+
})
728+
eventE := srv.MustCreateEvent(t, srvRoom, federation.Event{
729+
Type: spec.MRoomMember,
730+
Sender: bob,
731+
StateKey: &bob,
732+
Content: map[string]interface{}{
733+
"membership": "join",
734+
"displayname": "E",
735+
},
736+
PrevEvents: []string{eventD.EventID()},
737+
AuthEvents: []string{createEvent.EventID(), plEvent.EventID(), jrEvent.EventID(), eventD.EventID()},
738+
})
739+
// We include this in auth_events for subsequent events below.
740+
srvRoom.AddEvent(eventE)
741+
742+
// Create 3 unrelated events (one for /send, one for /gme, one for /state_ids snapshot)
743+
stateIDsEvent := srv.MustCreateEvent(t, srvRoom, federation.Event{
744+
Type: "m.room.message",
745+
Sender: bob,
746+
Content: map[string]interface{}{
747+
"msgtype": "m.text",
748+
"body": "for /state_ids",
749+
},
750+
PrevEvents: []string{eventE.EventID()},
751+
})
752+
srvRoom.AddEvent(stateIDsEvent)
753+
gmeEvent := srv.MustCreateEvent(t, srvRoom, federation.Event{
754+
Type: "m.room.message",
755+
Sender: bob,
756+
Content: map[string]interface{}{
757+
"msgtype": "m.text",
758+
"body": "for /get_missing_events",
759+
},
760+
PrevEvents: []string{stateIDsEvent.EventID()},
761+
})
762+
srvRoom.AddEvent(gmeEvent)
763+
sendTxnEvent := srv.MustCreateEvent(t, srvRoom, federation.Event{
764+
Type: "m.room.message",
765+
Sender: bob,
766+
Content: map[string]interface{}{
767+
"msgtype": "m.text",
768+
"body": "for /send",
769+
},
770+
PrevEvents: []string{gmeEvent.EventID()},
771+
})
772+
srvRoom.AddEvent(sendTxnEvent)
773+
774+
// the possible events to return in /event. This omits B.
775+
allEventsToShare := []gomatrixserverlib.PDU{
776+
stateIDsEvent, gmeEvent, sendTxnEvent, eventA, eventC, eventD, eventE,
777+
}
778+
t.Logf("event A: %s", eventA.EventID())
779+
t.Logf("event B: %s", eventB.EventID())
780+
t.Logf("event C: %s", eventC.EventID())
781+
t.Logf("event D: %s", eventD.EventID())
782+
t.Logf("event E: %s", eventE.EventID())
783+
t.Logf("event for /state_ids: %s", stateIDsEvent.EventID())
784+
t.Logf("event for /get_missing_events: %s", gmeEvent.EventID())
785+
t.Logf("event for /send: %s", sendTxnEvent.EventID())
786+
787+
// add handlers for them
788+
gmeWaiter := helpers.NewWaiter()
789+
// We will send 'sendTxnEvent' via /send. The homeserver will see the event has unknown prev_events and hit /get_missing_events
790+
srv.Mux().HandleFunc("/_matrix/federation/v1/get_missing_events/{roomID}", func(w http.ResponseWriter, req *http.Request) {
791+
defer gmeWaiter.Finish()
792+
body := must.ParseJSON(t, req.Body)
793+
t.Logf("/get_missing_events req for room %s => %s", mux.Vars(req)["roomID"], body.Raw)
794+
must.Equal(t, body.Get("latest_events").Array()[0].String(), sendTxnEvent.EventID(), "unexpected event provided to /get_missing_events")
795+
w.WriteHeader(200)
796+
res := struct {
797+
Events []gomatrixserverlib.PDU `json:"events"`
798+
}{
799+
Events: []gomatrixserverlib.PDU{gmeEvent},
800+
}
801+
t.Logf("/get_missing_events req for room %s responding with %s in room %s", mux.Vars(req)["roomID"], res.Events[0].EventID(), res.Events[0].RoomID())
802+
var responseBytes []byte
803+
responseBytes, err := json.Marshal(&res)
804+
must.NotError(t, "failed to marshal response", err)
805+
w.Write(responseBytes)
806+
})
807+
stateIDWaiter := helpers.NewWaiter()
808+
// The homeserver won't be able to link up the events returned via /get_missing_events to what it previously knew, so it will
809+
// ask for a state snapshot via /state_ids.
810+
srv.Mux().HandleFunc("/_matrix/federation/v1/state_ids/{roomID}", func(w http.ResponseWriter, req *http.Request) {
811+
defer stateIDWaiter.Finish()
812+
t.Logf("/state_ids req for room %s => %s", mux.Vars(req)["roomID"], req.URL.Query().Encode())
813+
reqEventID := req.URL.Query().Get("event_id")
814+
must.Equal(t, reqEventID, stateIDsEvent.EventID(), "unexpected event provided to /state_ids")
815+
w.WriteHeader(200)
816+
817+
var authChainIDs []string
818+
for _, ev := range existingAuthChain {
819+
authChainIDs = append(authChainIDs, ev.EventID())
820+
}
821+
// include A,B,C,D
822+
authChainIDs = append(authChainIDs, eventA.EventID(), eventB.EventID(), eventC.EventID(), eventD.EventID())
823+
// the current state is the same as before but with E as the member event for bob
824+
var pduIDs []string
825+
for _, ev := range srvRoom.AllCurrentState() {
826+
if ev.Type() == spec.MRoomMember && ev.StateKeyEquals(bob) {
827+
continue
828+
}
829+
pduIDs = append(pduIDs, ev.EventID())
830+
}
831+
pduIDs = append(pduIDs, eventE.EventID())
832+
res := struct {
833+
AuthChainIDs []string `json:"auth_chain_ids"`
834+
PDUIDs []string `json:"pdu_ids"`
835+
}{
836+
AuthChainIDs: authChainIDs,
837+
PDUIDs: pduIDs,
838+
}
839+
var responseBytes []byte
840+
responseBytes, err := json.Marshal(&res)
841+
must.NotError(t, "failed to marshal response", err)
842+
w.Write(responseBytes)
843+
})
844+
eventBWaiter := helpers.NewWaiter()
845+
// /state_ids will return some unknown events which the homeserver will try to fetch via /event
846+
srv.Mux().Handle("/_matrix/federation/v1/event/{eventID}", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
847+
vars := mux.Vars(req)
848+
eventID := vars["eventID"]
849+
var event gomatrixserverlib.PDU
850+
// find the event
851+
for _, ev := range allEventsToShare {
852+
if ev.EventID() == eventID {
853+
event = ev
854+
break
855+
}
856+
}
857+
// we should see a request for event B
858+
if eventID == eventB.EventID() {
859+
eventBWaiter.Finish()
860+
}
861+
862+
if event == nil {
863+
t.Logf("/event returning 404 for event %v", eventID)
864+
w.WriteHeader(404)
865+
w.Write([]byte(fmt.Sprintf(`complement: failed to find event: %s`, eventID)))
866+
return
867+
}
868+
869+
txn := gomatrixserverlib.Transaction{
870+
Origin: spec.ServerName(srv.ServerName()),
871+
OriginServerTS: spec.AsTimestamp(time.Now()),
872+
PDUs: []json.RawMessage{
873+
event.JSON(),
874+
},
875+
}
876+
resp, err := json.Marshal(txn)
877+
if err != nil {
878+
w.WriteHeader(500)
879+
w.Write([]byte(fmt.Sprintf(`complement: failed to marshal JSON response: %s`, err)))
880+
return
881+
}
882+
w.WriteHeader(200)
883+
w.Write(resp)
884+
}))
885+
886+
srv.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{sendTxnEvent.JSON()}, nil)
887+
888+
// wait for the server to make the requests
889+
gmeWaiter.Wait(t, 5*time.Second)
890+
stateIDWaiter.Wait(t, 5*time.Second)
891+
eventBWaiter.Wait(t, 5*time.Second)
892+
893+
// At this point all we know is that the server requested event B when doing /state_ids.
894+
// We don't know if sendTxnEvent has been fully processed / the room state has been updated.
895+
// If the server is functioning correctly, sendTxnEvent will never be delivered to the client
896+
// as the server will be unable to fetch room state for it. So send another event as a sentinel.
897+
// Wait until we see sendTxnEvent in the sync timeline before asserting that the room state is correct.
898+
sentinelEvent := srv.MustCreateEvent(t, srvRoom, federation.Event{
899+
Type: "m.room.message",
900+
Sender: bob,
901+
Content: map[string]interface{}{
902+
"msgtype": "m.text",
903+
"body": "finished",
904+
},
905+
PrevEvents: []string{bobOriginalJoinEvent.EventID()},
906+
AuthEvents: []string{createEvent.EventID(), plEvent.EventID(), bobOriginalJoinEvent.EventID()},
907+
})
908+
srv.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{sentinelEvent.JSON()}, nil)
909+
alice.MustSyncUntil(t, client.SyncReq{}, client.SyncTimelineHasEventID(roomID, sentinelEvent.EventID()))
910+
911+
// we should not see event E as the current state for bob.
912+
content := alice.MustGetStateEventContent(t, roomID, spec.MRoomMember, bob)
913+
t.Logf("bob's membership content: %v", content.Raw)
914+
// assert bob's member event was his initial join, not any of the others. Technically you can argue A should be valid.
915+
must.Equal(t, content.Get("displayname").Str, "", "Events C/D/E were processed when they should not have been as the server doesn't know B.")
916+
}

0 commit comments

Comments
 (0)