Skip to content

Commit e468f39

Browse files
authored
feat: Implement ReadN (#276)
* add readN * better implementation * more consistent * add test * add NextN test on cdciterator * fix lint * remove Read * fix typo * add combined iterator test * remove Next() * fetch until there's 0 records left * cleaner * better return an empty slice * better return a slice * more * and more * allocate based on `n` * allocate * no need The SDK checks this * save allocation * return nil insteaed * share context * use RecvTimeout to avoid unnecessary timeouts check errors * Revert "use RecvTimeout to avoid unnecessary timeouts" This reverts commit ad6f32e.
1 parent fbd882a commit e468f39

File tree

8 files changed

+454
-154
lines changed

8 files changed

+454
-154
lines changed

source.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,8 @@ func (s *Source) Open(ctx context.Context, pos opencdc.Position) error {
119119
return nil
120120
}
121121

122-
func (s *Source) Read(ctx context.Context) (opencdc.Record, error) {
123-
return s.iterator.Next(ctx)
122+
func (s *Source) ReadN(ctx context.Context, n int) ([]opencdc.Record, error) {
123+
return s.iterator.NextN(ctx, n)
124124
}
125125

126126
func (s *Source) Ack(ctx context.Context, pos opencdc.Position) error {

source/iterator.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ import (
2323

2424
// Iterator is an object that can iterate over a queue of records.
2525
type Iterator interface {
26-
// Next takes and returns the next record from the queue. Next is allowed to
27-
// block until either a record is available or the context gets canceled.
28-
Next(context.Context) (opencdc.Record, error)
26+
// NextN takes and returns up to n records from the queue. NextN is allowed to
27+
// block until either at least one record is available or the context gets canceled.
28+
NextN(context.Context, int) ([]opencdc.Record, error)
2929
// Ack signals that a record at a specific position was successfully
3030
// processed.
3131
Ack(context.Context, opencdc.Position) error

source/logrepl/cdc.go

+45-18
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ type CDCConfig struct {
3838
}
3939

4040
// CDCIterator asynchronously listens for events from the logical replication
41-
// slot and returns them to the caller through Next.
41+
// slot and returns them to the caller through NextN.
4242
type CDCIterator struct {
4343
config CDCConfig
4444
records chan opencdc.Record
@@ -113,35 +113,62 @@ func (i *CDCIterator) StartSubscriber(ctx context.Context) error {
113113
return nil
114114
}
115115

116-
// Next returns the next record retrieved from the subscription. This call will
117-
// block until either a record is returned from the subscription, the
118-
// subscription stops because of an error or the context gets canceled.
119-
// Returns error when the subscription has been started.
120-
func (i *CDCIterator) Next(ctx context.Context) (opencdc.Record, error) {
116+
// NextN takes and returns up to n records from the queue. NextN is allowed to
117+
// block until either at least one record is available or the context gets canceled.
118+
func (i *CDCIterator) NextN(ctx context.Context, n int) ([]opencdc.Record, error) {
121119
if !i.subscriberReady() {
122-
return opencdc.Record{}, errors.New("logical replication has not been started")
120+
return nil, errors.New("logical replication has not been started")
123121
}
124122

125-
for {
123+
if n <= 0 {
124+
return nil, fmt.Errorf("n must be greater than 0, got %d", n)
125+
}
126+
127+
recs := make([]opencdc.Record, 0, n)
128+
129+
// Block until at least one record is received or context is canceled
130+
select {
131+
case <-ctx.Done():
132+
return nil, ctx.Err()
133+
case <-i.sub.Done():
134+
if err := i.sub.Err(); err != nil {
135+
return nil, fmt.Errorf("logical replication error: %w", err)
136+
}
137+
if err := ctx.Err(); err != nil {
138+
// subscription is done because the context is canceled, we went
139+
// into the wrong case by chance
140+
return nil, err
141+
}
142+
// subscription stopped without an error and the context is still
143+
// open, this is a strange case, shouldn't actually happen
144+
return nil, fmt.Errorf("subscription stopped, no more data to fetch (this smells like a bug)")
145+
case rec := <-i.records:
146+
recs = append(recs, rec)
147+
}
148+
149+
for len(recs) < n {
126150
select {
151+
case rec := <-i.records:
152+
recs = append(recs, rec)
127153
case <-ctx.Done():
128-
return opencdc.Record{}, ctx.Err()
154+
return nil, ctx.Err()
129155
case <-i.sub.Done():
130156
if err := i.sub.Err(); err != nil {
131-
return opencdc.Record{}, fmt.Errorf("logical replication error: %w", err)
157+
return recs, fmt.Errorf("logical replication error: %w", err)
132158
}
133159
if err := ctx.Err(); err != nil {
134-
// subscription is done because the context is canceled, we went
135-
// into the wrong case by chance
136-
return opencdc.Record{}, err
160+
// Return what we have with context error
161+
return recs, err
137162
}
138-
// subscription stopped without an error and the context is still
139-
// open, this is a strange case, shouldn't actually happen
140-
return opencdc.Record{}, fmt.Errorf("subscription stopped, no more data to fetch (this smells like a bug)")
141-
case r := <-i.records:
142-
return r, nil
163+
// Return what we have with subscription stopped error
164+
return recs, fmt.Errorf("subscription stopped, no more data to fetch (this smells like a bug)")
165+
default:
166+
// No more records currently available
167+
return recs, nil
143168
}
144169
}
170+
171+
return recs, nil
145172
}
146173

147174
// Ack forwards the acknowledgment to the subscription.

source/logrepl/cdc_test.go

+152-40
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func TestCDCIterator_New(t *testing.T) {
119119
}
120120
}
121121

122-
func TestCDCIterator_Next(t *testing.T) {
122+
func TestCDCIterator_Operation_NextN(t *testing.T) {
123123
ctx := test.Context(t)
124124
is := is.New(t)
125125

@@ -343,9 +343,11 @@ func TestCDCIterator_Next(t *testing.T) {
343343
// fetch the change
344344
nextCtx, cancel := context.WithTimeout(ctx, time.Second*10)
345345
defer cancel()
346-
got, err := i.Next(nextCtx)
346+
records, err := i.NextN(nextCtx, 1)
347347
is.NoErr(err)
348348

349+
got := records[0]
350+
349351
readAt, err := got.Metadata.GetReadAt()
350352
is.NoErr(err)
351353
is.True(readAt.After(now)) // ReadAt should be after now
@@ -359,40 +361,6 @@ func TestCDCIterator_Next(t *testing.T) {
359361
}
360362
}
361363

362-
func TestCDCIterator_Next_Fail(t *testing.T) {
363-
ctx := test.Context(t)
364-
365-
pool := test.ConnectPool(ctx, t, test.RepmgrConnString)
366-
table := test.SetupTestTable(ctx, t, pool)
367-
368-
t.Run("fail when sub is done", func(t *testing.T) {
369-
is := is.New(t)
370-
371-
i := testCDCIterator(ctx, t, pool, table, true)
372-
<-i.sub.Ready()
373-
374-
is.NoErr(i.Teardown(ctx))
375-
376-
_, err := i.Next(ctx)
377-
expectErr := "logical replication error:"
378-
379-
match := strings.Contains(err.Error(), expectErr)
380-
if !match {
381-
t.Logf("%s != %s", err.Error(), expectErr)
382-
}
383-
is.True(match)
384-
})
385-
386-
t.Run("fail when subscriber is not started", func(t *testing.T) {
387-
is := is.New(t)
388-
389-
i := testCDCIterator(ctx, t, pool, table, false)
390-
391-
_, nexterr := i.Next(ctx)
392-
is.Equal(nexterr.Error(), "logical replication has not been started")
393-
})
394-
}
395-
396364
func TestCDCIterator_EnsureLSN(t *testing.T) {
397365
ctx := test.Context(t)
398366
is := is.New(t)
@@ -407,8 +375,11 @@ func TestCDCIterator_EnsureLSN(t *testing.T) {
407375
VALUES (6, 'bizz', 456, false, 12.3, 14)`, table))
408376
is.NoErr(err)
409377

410-
r, err := i.Next(ctx)
378+
rr, err := i.NextN(ctx, 1)
411379
is.NoErr(err)
380+
is.True(len(rr) > 0)
381+
382+
r := rr[0]
412383

413384
p, err := position.ParseSDKPosition(r.Position)
414385
is.NoErr(err)
@@ -485,6 +456,138 @@ func TestCDCIterator_Ack(t *testing.T) {
485456
})
486457
}
487458
}
459+
func TestCDCIterator_NextN(t *testing.T) {
460+
ctx := test.Context(t)
461+
pool := test.ConnectPool(ctx, t, test.RepmgrConnString)
462+
table := test.SetupTestTable(ctx, t, pool)
463+
464+
t.Run("retrieve exact N records", func(t *testing.T) {
465+
is := is.New(t)
466+
i := testCDCIterator(ctx, t, pool, table, true)
467+
<-i.sub.Ready()
468+
469+
for j := 1; j <= 3; j++ {
470+
_, err := pool.Exec(ctx, fmt.Sprintf(`INSERT INTO %s (id, column1, column2, column3, column4, column5)
471+
VALUES (%d, 'test-%d', %d, false, 12.3, 14)`, table, j+10, j, j*100))
472+
is.NoErr(err)
473+
}
474+
475+
var allRecords []opencdc.Record
476+
attemptCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
477+
defer cancel()
478+
479+
// Collect records until we have all 3
480+
for len(allRecords) < 3 {
481+
records, err := i.NextN(attemptCtx, 3-len(allRecords))
482+
is.NoErr(err)
483+
// Only proceed if we got at least one record
484+
is.True(len(records) > 0)
485+
allRecords = append(allRecords, records...)
486+
}
487+
488+
is.Equal(len(allRecords), 3)
489+
490+
for j, r := range allRecords {
491+
is.Equal(r.Operation, opencdc.OperationCreate)
492+
is.Equal(r.Key.(opencdc.StructuredData)["id"], int64(j+11))
493+
change := r.Payload
494+
data := change.After.(opencdc.StructuredData)
495+
is.Equal(data["column1"], fmt.Sprintf("test-%d", j+1))
496+
//nolint:gosec // no risk to overflow
497+
is.Equal(data["column2"], (int32(j)+1)*100)
498+
}
499+
})
500+
501+
t.Run("retrieve fewer records than requested", func(t *testing.T) {
502+
is := is.New(t)
503+
i := testCDCIterator(ctx, t, pool, table, true)
504+
<-i.sub.Ready()
505+
506+
for j := 1; j <= 2; j++ {
507+
_, err := pool.Exec(ctx, fmt.Sprintf(`INSERT INTO %s (id, column1, column2, column3, column4, column5)
508+
VALUES (%d, 'test-%d', %d, false, 12.3, 14)`, table, j+20, j, j*100))
509+
is.NoErr(err)
510+
}
511+
512+
// Will keep calling NextN until all records are received
513+
var records []opencdc.Record
514+
for len(records) < 2 {
515+
recordsTmp, err := i.NextN(ctx, 5)
516+
is.NoErr(err)
517+
records = append(records, recordsTmp...)
518+
}
519+
520+
// nothing else to fetch
521+
ctxWithTimeout, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
522+
defer cancel()
523+
_, err := i.NextN(ctxWithTimeout, 5)
524+
is.True(errors.Is(err, context.DeadlineExceeded))
525+
526+
for j, r := range records {
527+
is.Equal(r.Operation, opencdc.OperationCreate)
528+
is.Equal(r.Key.(opencdc.StructuredData)["id"], int64(j+21))
529+
change := r.Payload
530+
data := change.After.(opencdc.StructuredData)
531+
is.Equal(data["column1"], fmt.Sprintf("test-%d", j+1))
532+
//nolint:gosec // no risk to overflow
533+
is.Equal(data["column2"], (int32(j)+1)*100)
534+
}
535+
})
536+
537+
t.Run("context cancellation", func(t *testing.T) {
538+
is := is.New(t)
539+
i := testCDCIterator(ctx, t, pool, table, true)
540+
<-i.sub.Ready()
541+
542+
ctxTimeout, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
543+
defer cancel()
544+
545+
_, err := i.NextN(ctxTimeout, 5)
546+
is.True(errors.Is(err, context.DeadlineExceeded))
547+
})
548+
549+
t.Run("subscriber not started", func(t *testing.T) {
550+
is := is.New(t)
551+
i := testCDCIterator(ctx, t, pool, table, false)
552+
553+
_, err := i.NextN(ctx, 5)
554+
is.Equal(err.Error(), "logical replication has not been started")
555+
})
556+
557+
t.Run("invalid N values", func(t *testing.T) {
558+
is := is.New(t)
559+
i := testCDCIterator(ctx, t, pool, table, true)
560+
<-i.sub.Ready()
561+
562+
_, err := i.NextN(ctx, 0)
563+
is.True(strings.Contains(err.Error(), "n must be greater than 0"))
564+
565+
_, err = i.NextN(ctx, -1)
566+
is.True(strings.Contains(err.Error(), "n must be greater than 0"))
567+
})
568+
569+
t.Run("subscription termination", func(t *testing.T) {
570+
is := is.New(t)
571+
i := testCDCIterator(ctx, t, pool, table, true)
572+
<-i.sub.Ready()
573+
574+
_, err := pool.Exec(ctx, fmt.Sprintf(`INSERT INTO %s (id, column1, column2, column3, column4, column5)
575+
VALUES (30, 'test-1', 100, false, 12.3, 14)`, table))
576+
is.NoErr(err)
577+
578+
go func() {
579+
time.Sleep(100 * time.Millisecond)
580+
is.NoErr(i.Teardown(ctx))
581+
}()
582+
583+
records, err := i.NextN(ctx, 5)
584+
if err != nil {
585+
is.True(strings.Contains(err.Error(), "logical replication error"))
586+
} else {
587+
is.True(len(records) > 0)
588+
}
589+
})
590+
}
488591

489592
func testCDCIterator(ctx context.Context, t *testing.T, pool *pgxpool.Pool, table string, start bool) *CDCIterator {
490593
is := is.New(t)
@@ -560,8 +663,11 @@ func TestCDCIterator_Schema(t *testing.T) {
560663
)
561664
is.NoErr(err)
562665

563-
r, err := i.Next(ctx)
666+
rr, err := i.NextN(ctx, 1)
564667
is.NoErr(err)
668+
is.True(len(rr) > 0)
669+
670+
r := rr[0]
565671

566672
assertPayloadSchemaOK(ctx, is, test.TestTableAvroSchemaV1, table, r)
567673
assertKeySchemaOK(ctx, is, table, r)
@@ -580,8 +686,11 @@ func TestCDCIterator_Schema(t *testing.T) {
580686
)
581687
is.NoErr(err)
582688

583-
r, err := i.Next(ctx)
689+
rr, err := i.NextN(ctx, 1)
584690
is.NoErr(err)
691+
is.True(len(rr) > 0)
692+
693+
r := rr[0]
585694

586695
assertPayloadSchemaOK(ctx, is, test.TestTableAvroSchemaV2, table, r)
587696
assertKeySchemaOK(ctx, is, table, r)
@@ -600,8 +709,11 @@ func TestCDCIterator_Schema(t *testing.T) {
600709
)
601710
is.NoErr(err)
602711

603-
r, err := i.Next(ctx)
712+
rr, err := i.NextN(ctx, 1)
604713
is.NoErr(err)
714+
is.True(len(rr) > 0)
715+
716+
r := rr[0]
605717

606718
assertPayloadSchemaOK(ctx, is, test.TestTableAvroSchemaV3, table, r)
607719
assertKeySchemaOK(ctx, is, table, r)

0 commit comments

Comments
 (0)