@@ -30,6 +30,35 @@ type Iterator interface {
3030 Close () // ends the iterator
3131}
3232
33+ // SourceIterator represents a sequence of nodes like [Iterator]
34+ // Each node also has a named 'source'.
35+ type SourceIterator interface {
36+ Iterator
37+ NodeSource () string // source of current node
38+ }
39+
40+ // WithSource attaches a 'source name' to an iterator.
41+ func WithSourceName (name string , it Iterator ) SourceIterator {
42+ return sourceIter {it , name }
43+ }
44+
45+ func ensureSourceIter (it Iterator ) SourceIterator {
46+ if si , ok := it .(SourceIterator ); ok {
47+ return si
48+ }
49+ return WithSourceName ("" , it )
50+ }
51+
52+ type sourceIter struct {
53+ Iterator
54+ name string
55+ }
56+
57+ // NodeSource implements IteratorSource.
58+ func (it sourceIter ) NodeSource () string {
59+ return it .name
60+ }
61+
3362// ReadNodes reads at most n nodes from the given iterator. The return value contains no
3463// duplicates and no nil values. To prevent looping indefinitely for small repeating node
3564// sequences, this function calls Next at most n times.
@@ -106,16 +135,16 @@ func (it *sliceIter) Close() {
106135// Filter wraps an iterator such that Next only returns nodes for which
107136// the 'check' function returns true.
108137func Filter (it Iterator , check func (* Node ) bool ) Iterator {
109- return & filterIter {it , check }
138+ return & filterIter {ensureSourceIter ( it ) , check }
110139}
111140
112141type filterIter struct {
113- Iterator
142+ SourceIterator
114143 check func (* Node ) bool
115144}
116145
117146func (f * filterIter ) Next () bool {
118- for f .Iterator .Next () {
147+ for f .SourceIterator .Next () {
119148 if f .check (f .Node ()) {
120149 return true
121150 }
@@ -135,9 +164,9 @@ func (f *filterIter) Next() bool {
135164// It's safe to call AddSource and Close concurrently with Next.
136165type FairMix struct {
137166 wg sync.WaitGroup
138- fromAny chan * Node
167+ fromAny chan mixItem
139168 timeout time.Duration
140- cur * Node
169+ cur mixItem
141170
142171 mu sync.Mutex
143172 closed chan struct {}
@@ -146,11 +175,16 @@ type FairMix struct {
146175}
147176
148177type mixSource struct {
149- it Iterator
150- next chan * Node
178+ it SourceIterator
179+ next chan mixItem
151180 timeout time.Duration
152181}
153182
183+ type mixItem struct {
184+ n * Node
185+ source string
186+ }
187+
154188// NewFairMix creates a mixer.
155189//
156190// The timeout specifies how long the mixer will wait for the next fairly-chosen source
@@ -159,7 +193,7 @@ type mixSource struct {
159193// timeout makes the mixer completely fair.
160194func NewFairMix (timeout time.Duration ) * FairMix {
161195 m := & FairMix {
162- fromAny : make (chan * Node ),
196+ fromAny : make (chan mixItem ),
163197 closed : make (chan struct {}),
164198 timeout : timeout ,
165199 }
@@ -175,7 +209,11 @@ func (m *FairMix) AddSource(it Iterator) {
175209 return
176210 }
177211 m .wg .Add (1 )
178- source := & mixSource {it , make (chan * Node ), m .timeout }
212+ source := & mixSource {
213+ it : ensureSourceIter (it ),
214+ next : make (chan mixItem ),
215+ timeout : m .timeout ,
216+ }
179217 m .sources = append (m .sources , source )
180218 go m .runSource (m .closed , source )
181219}
@@ -201,7 +239,7 @@ func (m *FairMix) Close() {
201239
202240// Next returns a node from a random source.
203241func (m * FairMix ) Next () bool {
204- m .cur = nil
242+ m .cur = mixItem {}
205243
206244 for {
207245 source := m .pickSource ()
@@ -217,12 +255,12 @@ func (m *FairMix) Next() bool {
217255 }
218256
219257 select {
220- case n , ok := <- source .next :
258+ case item , ok := <- source .next :
221259 if ok {
222260 // Here, the timeout is reset to the configured value
223261 // because the source delivered a node.
224262 source .timeout = m .timeout
225- m .cur = n
263+ m .cur = item
226264 return true
227265 }
228266 // This source has ended.
@@ -239,15 +277,20 @@ func (m *FairMix) Next() bool {
239277
240278// Node returns the current node.
241279func (m * FairMix ) Node () * Node {
242- return m .cur
280+ return m .cur .n
281+ }
282+
283+ // NodeSource returns the current node's source name.
284+ func (m * FairMix ) NodeSource () string {
285+ return m .cur .source
243286}
244287
245288// nextFromAny is used when there are no sources or when the 'fair' choice
246289// doesn't turn up a node quickly enough.
247290func (m * FairMix ) nextFromAny () bool {
248- n , ok := <- m .fromAny
291+ item , ok := <- m .fromAny
249292 if ok {
250- m .cur = n
293+ m .cur = item
251294 }
252295 return ok
253296}
@@ -284,10 +327,10 @@ func (m *FairMix) runSource(closed chan struct{}, s *mixSource) {
284327 defer m .wg .Done ()
285328 defer close (s .next )
286329 for s .it .Next () {
287- n := s .it .Node ()
330+ item := mixItem { s .it .Node (), s . it . NodeSource ()}
288331 select {
289- case s .next <- n :
290- case m .fromAny <- n :
332+ case s .next <- item :
333+ case m .fromAny <- item :
291334 case <- closed :
292335 return
293336 }
0 commit comments