diff --git a/cmd_stream.go b/cmd_stream.go index b6d0034..0362462 100644 --- a/cmd_stream.go +++ b/cmd_stream.go @@ -1536,7 +1536,7 @@ func xautoclaim( return nextCallId, nil } - msgs := g.pendingAfter(start) + msgs := g.pendingAfterOrEqual(start) var res []StreamEntry for i, p := range msgs { if minIdleTime > 0 && now.Before(p.lastDelivery.Add(minIdleTime)) { diff --git a/cmd_stream_test.go b/cmd_stream_test.go index 7e6d34d..05596e5 100644 --- a/cmd_stream_test.go +++ b/cmd_stream_test.go @@ -1322,6 +1322,21 @@ func TestStreamAutoClaim(t *testing.T) { proto.Array(), ), ) + + // read again using the ID returned from the last XAUTOCLAIM call as 'start'. + // the results include that starting message. unlike XREADGROUP, the results of XAUTOCLAIM + // are INCLUSIVE of the start ID. + mustDo(t, c, + "XAUTOCLAIM", "planets", "processing", "bob", "15000", "0-2", "COUNT", "1", + proto.Array( + proto.String("0-0"), + proto.Array( + proto.Array(proto.String("0-2"), proto.Strings("name", "Venus")), + ), + proto.Array(), + ), + ) + mustDo(t, c, "XINFO", "CONSUMERS", "planets", "processing", proto.Array( @@ -1350,8 +1365,8 @@ func TestStreamAutoClaim(t *testing.T) { proto.Array( proto.String("0-2"), proto.String("bob"), - proto.Int(20000), - proto.Int(4), + proto.Int(0), + proto.Int(5), ), ), ) diff --git a/stream.go b/stream.go index 5d3ec67..f2dd466 100644 --- a/stream.go +++ b/stream.go @@ -440,6 +440,13 @@ func (s *streamKey) delete(ids []string) (int, error) { return count, nil } +func (g *streamGroup) pendingAfterOrEqual(id string) []pendingEntry { + pos := sort.Search(len(g.pending), func(i int) bool { + return streamCmp(id, g.pending[i].id) <= 0 + }) + return g.pending[pos:] +} + func (g *streamGroup) pendingAfter(id string) []pendingEntry { pos := sort.Search(len(g.pending), func(i int) bool { return streamCmp(id, g.pending[i].id) < 0