-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchansort.go
44 lines (39 loc) · 1.56 KB
/
chansort.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// Deprecated: chansort has now moved to github.com/jamesrom/order/chansort
package chansort
import (
	"constraints"
	"sync"
	"time"

	"github.com/jamesrom/priorityqueue"
)
// Less reports whether its first argument sorts before its second.
//
// Implementations must describe a transitive ordering:
//   - if Less(i, j) and Less(j, k) are both true, Less(i, k) must be true too.
//   - if Less(i, j) and Less(j, k) are both false, Less(i, k) must be false too.
type Less[T any] func(a, b T) bool
// SortOrderable is the ascending-order convenience form of
// SortWithComparator for any type that supports the < operator.
//
// Messages read from in are forwarded to SortWithComparator together with
// window and a comparator that orders two values with a < b, so the returned
// channel yields messages smallest-first within the sliding window: a message
// received from the returned channel at time Z is the smallest message seen
// since Z − window.
func SortOrderable[T constraints.Ordered](in <-chan T, window time.Duration) <-chan T {
	return SortWithComparator(in, window, func(x, y T) bool {
		return x < y
	})
}
// SortWithComparator sorts channel messages in the order defined by the given
// comparator function fn.
//
// Every message read from in is pushed onto a priority queue ordered by fn,
// and a timer is started for it. When the timer fires after window has
// elapsed, the message currently at the front of the queue is popped and sent
// on the returned channel. That is to say: a message received at time Z from
// the returned channel is the smallest message since Z − window.
//
// When in is closed, the messages still buffered are delivered as their timers
// expire and the returned channel is then closed, so callers may range over
// it. Sends on the returned channel are unbuffered: the caller must keep
// receiving until the channel is closed, otherwise the delivering goroutines
// (and the goroutine started here) remain blocked.
func SortWithComparator[T any](in <-chan T, window time.Duration, fn Less[T]) <-chan T {
	q := priorityqueue.NewWithComparator(fn)
	out := make(chan T)
	go func() {
		// pending tracks timers whose message has not been delivered yet, so
		// out is only closed after the last send has completed.
		var pending sync.WaitGroup

		// Range stops once in is closed and drained. A bare receive in an
		// endless loop would keep yielding zero values from the closed
		// channel, pushing them and scheduling timers without bound.
		for el := range in {
			// NOTE(review): q is pushed here and popped from timer goroutines,
			// so this relies on priorityqueue being safe for concurrent use —
			// confirm against the library.
			q.Push(el)
			pending.Add(1)
			time.AfterFunc(window, func() {
				defer pending.Done()
				out <- q.Pop()
			})
		}

		pending.Wait()
		close(out)
	}()
	return out
}