diff --git a/src/capture.rs b/src/capture.rs index 9ad7725ca..92133fc2c 100644 --- a/src/capture.rs +++ b/src/capture.rs @@ -542,8 +542,8 @@ pub mod source { let capability = capability.expect("Changes occurred, without surfacing a capability"); let mut changes = ChangeBatch::new(); - changes.extend(lower_bound.elements().iter().map(|t| (t.clone(), 1))); - changes.extend(reported_frontier.elements().iter().map(|t| (t.clone(), -1))); + changes.extend(lower_bound.iter().map(|t| (t.clone(), 1))); + changes.extend(reported_frontier.iter().map(|t| (t.clone(), -1))); let mut frontier_session = frontier.session(&capability); for peer in 0..workers { frontier_session.give((peer, changes.clone())); @@ -724,7 +724,7 @@ pub mod sink { // Announce the lower bound, upper bound, and timestamp counts. let progress = Progress { - lower: frontier.elements().to_vec(), + lower: frontier.to_vec(), upper: new_frontier.to_vec(), counts: announce, }; diff --git a/src/lattice.rs b/src/lattice.rs index 64fceffc9..1c2b979ce 100644 --- a/src/lattice.rs +++ b/src/lattice.rs @@ -268,7 +268,7 @@ implement_lattice!((), ()); /// let f1 = &[Product::new(3, 7), Product::new(5, 6)]; /// let f2 = &[Product::new(4, 6)]; /// let join = antichain_join(f1, f2); -/// assert_eq!(&*join.elements(), &[Product::new(4, 7), Product::new(5, 6)]); +/// assert_eq!(&*join, &[Product::new(4, 7), Product::new(5, 6)]); /// # } /// ``` pub fn antichain_join(one: &[T], other: &[T]) -> Antichain { @@ -302,7 +302,7 @@ pub fn antichain_join(one: &[T], other: &[T]) -> Antichain { /// let f1 = &[Product::new(3, 7), Product::new(5, 6)]; /// let f2 = &[Product::new(4, 6)]; /// antichain_join_into(f1, f2, &mut join); -/// assert_eq!(&*join.elements(), &[Product::new(4, 7), Product::new(5, 6)]); +/// assert_eq!(&*join, &[Product::new(4, 7), Product::new(5, 6)]); /// # } /// ``` pub fn antichain_join_into(one: &[T], other: &[T], upper: &mut Antichain) { @@ -334,7 +334,7 @@ pub fn antichain_join_into(one: &[T], other: &[T], upper: &mut Antic /// let f1 = &[Product::new(3, 7), Product::new(5, 6)]; /// let f2 = &[Product::new(4, 6)]; /// let meet = antichain_meet(f1, f2); -/// assert_eq!(&*meet.elements(), &[Product::new(3, 7), Product::new(4, 6)]); +/// assert_eq!(&*meet, &[Product::new(3, 7), Product::new(4, 6)]); /// # } /// ``` pub fn antichain_meet(one: &[T], other: &[T]) -> Antichain { @@ -351,8 +351,8 @@ pub fn antichain_meet(one: &[T], other: &[T]) -> Antichain impl Lattice for Antichain { fn join(&self, other: &Self) -> Self { let mut upper = Antichain::new(); - for time1 in self.elements().iter() { - for time2 in other.elements().iter() { + for time1 in self.iter() { + for time2 in other.iter() { upper.insert(time1.join(time2)); } } @@ -360,10 +360,10 @@ impl Lattice for Antichain { } fn meet(&self, other: &Self) -> Self { let mut upper = Antichain::new(); - for time1 in self.elements().iter() { + for time1 in self.iter() { upper.insert(time1.clone()); } - for time2 in other.elements().iter() { + for time2 in other.iter() { upper.insert(time2.clone()); } upper diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 2d4318e43..159610660 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -615,12 +615,12 @@ where // and feed this to the trace agent (but not along the timely output). // If there is at least one capability not in advance of the input frontier ... - if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) { + if capabilities.iter().any(|c| !input.frontier().less_equal(c.time())) { let mut upper = Antichain::new(); // re-used allocation for sealing batches. // For each capability not in advance of the input frontier ... - for (index, capability) in capabilities.elements().iter().enumerate() { + for (index, capability) in capabilities.iter().enumerate() { if !input.frontier().less_equal(capability.time()) { @@ -631,7 +631,7 @@ where for time in input.frontier().frontier().iter() { upper.insert(time.clone()); } - for other_capability in &capabilities.elements()[(index + 1) .. ] { + for other_capability in &capabilities[(index + 1) .. ] { upper.insert(other_capability.time().clone()); } @@ -641,7 +641,7 @@ where writer.insert(batch.clone(), Some(capability.time().clone())); // send the batch to downstream consumers, empty or not. - output.session(&capabilities.elements()[index]).give(batch); + output.session(&capabilities[index]).give(batch); } } @@ -652,7 +652,7 @@ where let mut new_capabilities = Antichain::new(); for time in batcher.frontier().iter() { - if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) { + if let Some(capability) = capabilities.iter().find(|c| c.time().less_equal(time)) { new_capabilities.insert(capability.delayed(time)); } else { diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 468398ae6..a493ac00f 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -210,12 +210,12 @@ where if prev_frontier.borrow() != input.frontier().frontier() { // If there is at least one capability not in advance of the input frontier ... - if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) { + if capabilities.iter().any(|c| !input.frontier().less_equal(c.time())) { let mut upper = Antichain::new(); // re-used allocation for sealing batches. // For each capability not in advance of the input frontier ... - for (index, capability) in capabilities.elements().iter().enumerate() { + for (index, capability) in capabilities.iter().enumerate() { if !input.frontier().less_equal(capability.time()) { @@ -226,7 +226,7 @@ where for time in input.frontier().frontier().iter() { upper.insert(time.clone()); } - for other_capability in &capabilities.elements()[(index + 1) .. ] { + for other_capability in &capabilities[(index + 1) .. ] { upper.insert(other_capability.time().clone()); } @@ -299,7 +299,7 @@ where // Communicate `batch` to the arrangement and the stream. writer.insert(batch.clone(), Some(capability.time().clone())); - output.session(&capabilities.elements()[index]).give(batch); + output.session(&capabilities[index]).give(batch); } } @@ -310,7 +310,7 @@ where let mut new_capabilities = Antichain::new(); if let Some(std::cmp::Reverse((time, _, _))) = priority_queue.peek() { - if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) { + if let Some(capability) = capabilities.iter().find(|c| c.time().less_equal(time)) { new_capabilities.insert(capability.delayed(time)); } else { diff --git a/src/trace/description.rs b/src/trace/description.rs index 2604cc522..5274d6759 100644 --- a/src/trace/description.rs +++ b/src/trace/description.rs @@ -78,7 +78,7 @@ pub struct Description