-
Notifications
You must be signed in to change notification settings - Fork 113
/
Copy pathpaginator.go
68 lines (54 loc) · 1.43 KB
/
paginator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package nostr
import (
"context"
"math"
"slices"
"time"
)
// PaginatorWithInterval returns a fetch function that pages backwards through
// the events matching a filter by calling pool.FetchMany repeatedly, waiting
// interval before every call.
//
// The returned function starts at filter.Until (or Now() when it is nil) and,
// after each page, lowers the "until" cursor to the smallest CreatedAt seen so
// far. Events whose ID already appeared on the previous page are skipped.
//
// The returned channel is closed when any of the following happens:
//   - filter.Limit events have been delivered (there is no cap when Limit is 0
//     and LimitZero is false);
//   - a page yields no event that was not already seen on the previous page;
//   - ctx is cancelled while waiting for the interval or for the consumer.
func (pool *SimplePool) PaginatorWithInterval(
	interval time.Duration,
) func(ctx context.Context, urls []string, filter Filter, opts ...SubscriptionOption) chan RelayEvent {
	return func(ctx context.Context, urls []string, filter Filter, opts ...SubscriptionOption) chan RelayEvent {
		nextUntil := Now()
		if filter.Until != nil {
			nextUntil = *filter.Until
		}

		globalLimit := uint64(filter.Limit)
		if globalLimit == 0 && !filter.LimitZero {
			globalLimit = math.MaxUint64
		}
		var globalCount uint64
		globalCh := make(chan RelayEvent)

		// Two independent buffers: one holds the IDs of the previous page, the
		// other collects the IDs of the page being read. They are swapped after
		// each page and must never share a backing array.
		repeatedCache := make([]string, 0, 300)
		nextRepeatedCache := make([]string, 0, 300)

		go func() {
			defer close(globalCh)

			for {
				// Give each request its own copy of the cursor so that moving
				// nextUntil below never mutates the filter of a request that
				// is still in flight.
				until := nextUntil
				filter.Until = &until

				// Wait between pages, but give up immediately on cancellation.
				select {
				case <-ctx.Done():
					return
				case <-time.After(interval):
				}

				keepGoing := false
				for evt := range pool.FetchMany(ctx, urls, filter, opts...) {
					seen := slices.Contains(repeatedCache, evt.ID)

					// Remember every ID of this page, including repeats: if the
					// cursor cannot advance (several events share the boundary
					// timestamp) the same events come back on the next page and
					// must still be recognised.
					nextRepeatedCache = append(nextRepeatedCache, evt.ID)
					if seen {
						continue
					}

					keepGoing = true // if we get one that isn't repeated, then keep trying to get more

					// Do not block forever on a consumer that went away.
					select {
					case globalCh <- evt:
					case <-ctx.Done():
						return
					}

					globalCount++
					if globalCount >= globalLimit {
						return
					}

					if evt.CreatedAt < nextUntil {
						nextUntil = evt.CreatedAt
					}
				}

				if !keepGoing {
					return
				}

				// Swap the buffers instead of re-slicing one of them, so that
				// appends for the next page cannot overwrite the IDs we are
				// about to compare against.
				repeatedCache, nextRepeatedCache = nextRepeatedCache, repeatedCache[:0]
			}
		}()

		return globalCh
	}
}