Skip to content

Commit

Permalink
Merge pull request #2188 from josephschorr/fix-strict-reader
Browse files Browse the repository at this point in the history
Fix the strict read proxy
  • Loading branch information
josephschorr authored Jan 3, 2025
2 parents b8c4594 + 1826d8d commit d2f7721
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 14 deletions.
41 changes: 29 additions & 12 deletions internal/datastore/postgres/postgres_shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"errors"
"fmt"
"math"
"math/rand"
"strings"
"sync"
Expand Down Expand Up @@ -246,7 +245,6 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
GCInterval(veryLargeGCInterval),
WatchBufferLength(50),
MigrationPhase(config.migrationPhase),
ReadStrictMode(true),
))

t.Run("TestLocking", createMultiDatastoreTest(
Expand Down Expand Up @@ -338,17 +336,25 @@ func createDatastoreTest(b testdatastore.RunningEngineForTest, tf datastoreTestF
}
}

func createReplicaDatastoreTest(b testdatastore.RunningEngineForTest, tf datastoreTestFunc, options ...Option) func(*testing.T) {
func createReplicaDatastoreTest(b testdatastore.RunningEngineForTest, tf multiDatastoreTestFunc, options ...Option) func(*testing.T) {
return func(t *testing.T) {
ctx := context.Background()

var replicaDS datastore.Datastore

ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
ds, err := newPostgresDatastore(ctx, uri, 42, options...)
ds, err := newPostgresDatastore(ctx, uri, primaryInstanceID, options...)
require.NoError(t, err)

ds2, err := newPostgresDatastore(ctx, uri, 42, append(options, ReadStrictMode(true))...)
require.NoError(t, err)
replicaDS = ds2

return ds
})
defer ds.Close()

tf(t, ds)
tf(t, ds, replicaDS)
}
}

Expand Down Expand Up @@ -1572,44 +1578,55 @@ func LockingTest(t *testing.T, ds datastore.Datastore, ds2 datastore.Datastore)
require.NoError(t, err)
}

func StrictReadModeTest(t *testing.T, ds datastore.Datastore) {
func StrictReadModeTest(t *testing.T, primaryDS datastore.Datastore, replicaDS datastore.Datastore) {
require := require.New(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

lowestRevision, err := ds.HeadRevision(ctx)
// Write some relationships.
_, err := primaryDS.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error {
rtu := tuple.Touch(tuple.MustParse("resource:123#reader@user:456"))
return rwt.WriteRelationships(ctx, []tuple.RelationshipUpdate{rtu})
})
require.NoError(err)

// Get the HEAD revision.
lowestRevision, err := primaryDS.HeadRevision(ctx)
require.NoError(err)

// Perform a read at the head revision, which should succeed.
reader := ds.SnapshotReader(lowestRevision)
reader := replicaDS.SnapshotReader(lowestRevision)
it, err := reader.QueryRelationships(ctx, datastore.RelationshipsFilter{
OptionalResourceType: "resource",
})
require.NoError(err)

_, err = datastore.IteratorToSlice(it)
found, err := datastore.IteratorToSlice(it)
require.NoError(err)
require.NotEmpty(found)

// Perform a read at a manually constructed revision beyond head, which should fail.
badRev := postgresRevision{
snapshot: pgSnapshot{
// NOTE: the struct defines this value as uint64, but the underlying
// revision is defined as an int64, so we run into an overflow issue
// if we try and use a big uint64.
xmax: math.MaxInt64,
xmin: 123456789,
xmax: 123456789,
},
}

it, err = ds.SnapshotReader(badRev).QueryRelationships(ctx, datastore.RelationshipsFilter{
it, err = replicaDS.SnapshotReader(badRev).QueryRelationships(ctx, datastore.RelationshipsFilter{
OptionalResourceType: "resource",
})
require.NoError(err)

_, err = datastore.IteratorToSlice(it)
found2, err := datastore.IteratorToSlice(it)
require.Error(err)
require.ErrorContains(err, "is not available on the replica")
require.ErrorAs(err, &common.RevisionUnavailableError{})
require.Nil(found2)
}

func NullCaveatWatchTest(t *testing.T, ds datastore.Datastore) {
Expand Down
4 changes: 2 additions & 2 deletions internal/datastore/postgres/strictreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,6 @@ func (srqf strictReaderQueryFuncs) addAssertToSQL(sql string) string {
// argument error and a message indicating that the xid "is in the future". If the transaction
// does exist, but has not yet been committed (or aborted), the call to `pg_xact_status` will return
// "in progress". rewriteError will catch these cases and return a RevisionUnavailableError.
assertion := fmt.Sprintf(`do $$ begin assert (select pg_xact_status(%d::text::xid8) != 'in progress'), 'replica missing revision';end;$$;`, srqf.revision.snapshot.xmin-1)
return assertion + sql
assertion := fmt.Sprintf(`; do $$ begin assert (select pg_xact_status(%d::text::xid8) != 'in progress'), 'replica missing revision';end;$$`, srqf.revision.snapshot.xmin-1)
return sql + assertion
}

0 comments on commit d2f7721

Please sign in to comment.