@@ -16,7 +16,7 @@ use differential_dataflow::operators::arrange::arrangement::arrange_core;
1616use differential_dataflow:: operators:: arrange:: { Arranged , TraceAgent } ;
1717use differential_dataflow:: trace:: implementations:: spine_fueled:: Spine ;
1818use differential_dataflow:: trace:: { Batch , Batcher , Builder , Trace , TraceReader } ;
19- use differential_dataflow:: { Collection , Data , ExchangeData , Hashable } ;
19+ use differential_dataflow:: { Collection , Data , ExchangeData , Hashable , VecCollection } ;
2020use timely:: Container ;
2121use timely:: dataflow:: channels:: pact:: { Exchange , ParallelizationContract , Pipeline } ;
2222use timely:: dataflow:: operators:: Operator ;
@@ -112,7 +112,7 @@ where
112112 }
113113}
114114
115- impl < G , K , V , R > MzArrange for Collection < G , ( K , V ) , R >
115+ impl < G , K , V , R > MzArrange for VecCollection < G , ( K , V ) , R >
116116where
117117 G : Scope ,
118118 G :: Timestamp : Lattice ,
@@ -138,7 +138,7 @@ where
138138 }
139139}
140140
141- impl < G , K , V , R , C > MzArrangeCore for Collection < G , ( K , V ) , R , C >
141+ impl < G , C > MzArrangeCore for Collection < G , C >
142142where
143143 G : Scope ,
144144 G :: Timestamp : Lattice ,
@@ -167,10 +167,10 @@ where
167167/// A specialized collection where data only has a key, but no associated value.
168168///
169169/// Created by calling `collection.into()`.
170- pub struct KeyCollection < G : Scope , K , R = usize > ( Collection < G , K , R > ) ;
170+ pub struct KeyCollection < G : Scope , K , R = usize > ( VecCollection < G , K , R > ) ;
171171
172- impl < G : Scope , K , R : Semigroup > From < Collection < G , K , R > > for KeyCollection < G , K , R > {
173- fn from ( value : Collection < G , K , R > ) -> Self {
172+ impl < G : Scope , K , R : Semigroup > From < VecCollection < G , K , R > > for KeyCollection < G , K , R > {
173+ fn from ( value : VecCollection < G , K , R > ) -> Self {
174174 KeyCollection ( value)
175175 }
176176}
@@ -271,13 +271,13 @@ where
271271 let mut batches = BTreeMap :: new ( ) ;
272272
273273 move |input, output| {
274- while let Some ( ( time, data) ) = input . next ( ) {
274+ input . for_each ( | time, data| {
275275 batches. extend (
276276 data. iter ( )
277277 . map ( |batch| ( Rc :: as_ptr ( batch) , Rc :: downgrade ( batch) ) ) ,
278278 ) ;
279279 output. session ( & time) . give_container ( data) ;
280- }
280+ } ) ;
281281 let Some ( trace) = trace. upgrade ( ) else {
282282 return ;
283283 } ;
0 commit comments