Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[copy_from]: Initial implementation, add OneshotSource and OneshotFormat, support appending Batches to Tables #30942

Merged
merged 12 commits into from
Jan 17, 2025
Prev Previous commit
Next Next commit
responding to GitHub feedback
* use the stage_batches initial capability instead of a CapabilitySet
* add comments describing existing behavior
ParkMyCar committed Jan 17, 2025
commit 1ae99e900a7fb991ab3b438474d4f800ae6548ce
41 changes: 15 additions & 26 deletions src/storage-operators/src/oneshot_source.rs
Original file line number Diff line number Diff line change
@@ -89,7 +89,6 @@ use std::fmt::{Debug, Display};
use std::future::Future;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::CapabilitySet;
use timely::dataflow::{Scope, Stream as TimelyStream};
use timely::progress::Antichain;
use tracing::info;
@@ -112,6 +111,12 @@ pub mod http_source;
/// 4. Decode the fetched byte blobs into [`Row`]s.
/// 5. Stage the [`Row`]s into Persist returning [`ProtoBatch`]es.
///
/// TODO(cf3): Benchmark combining operators 3, 4, and 5. Currently we keep them
/// separate for the [`CsvDecoder`]. CSV decoding is hard to do in parallel so we
/// currently have a single worker Fetch an entire file, and then distributes
/// chunks for parallel Decoding. We should benchmark if this is actually faster
/// than just a single worker both fetching and decoding.
///
pub fn render<G, F>(
scope: G,
persist_clients: Arc<PersistClientCache>,
@@ -217,16 +222,7 @@ where

info!(%collection_id, %worker_id, "CopyFrom Leader Discover");

// Spawn a separate task for listing to move any IO to a tokio thread.
let work =
mz_ore::task::spawn(
|| "discover",
async move { source.list().await.context("list") },
)
.await
.expect("failed to spawn task")
.context("discover");

let work = source.list().await.context("list");
match work {
Ok(objects) => objects
.into_iter()
@@ -454,7 +450,7 @@ where
let mut rows_handle = builder.new_input_for(rows_stream, Pipeline, &proto_batch_handle);

let shutdown = builder.build(move |caps| async move {
let [_proto_batch_cap] = caps.try_into().unwrap();
let [proto_batch_cap] = caps.try_into().unwrap();

// Open a Persist handle that we can use to stage a batch.
let persist_client = persist_clients
@@ -475,23 +471,19 @@ where
.await
.expect("could not open Persist shard");

// TODO(cf2): Should we use different timestamps here? Using MIN feels a bit weird
// and instead the Coordinator could provide timestamps, but that isn't obviously better?
// Create a batch using the minimum timestamp since these batches will
// get sent back to `environmentd` and their timestamps re-written
// before being finally appended.
let lower = mz_repr::Timestamp::MIN;
let upper = Antichain::from_elem(lower.step_forward());

let mut batch_builder = write_handle.builder(Antichain::from_elem(lower));
let mut caps = CapabilitySet::new();

while let Some(event) = rows_handle.next().await {
let AsyncEvent::Data(cap, row_batch) = event else {
let AsyncEvent::Data(_, row_batch) = event else {
continue;
};

// Note: In the current implementation of `COPY FROM` this set should
// only ever contain one capability.
caps.insert(cap);

// Pull Rows off our stream and stage them into a Batch.
for maybe_row in row_batch {
match maybe_row {
@@ -513,8 +505,8 @@ where
batch.delete().await;

// Pass on the error.
let send_cap = caps.first().expect("didn't get any capabilities?");
proto_batch_handle.give(send_cap, Err(err).context("stage batches"));
proto_batch_handle
.give(&proto_batch_cap, Err(err).context("stage batches"));
return;
}
}
@@ -535,10 +527,7 @@ where
// TODO(cf2): Make sure these batches get cleaned up if another
// worker encounters an error.
let proto_batch = batch.into_transmittable_batch();

// Use the first (lowest?) capability from our set.
let send_cap = caps.first().expect("didn't get any capabilities?");
proto_batch_handle.give(send_cap, Ok(proto_batch));
proto_batch_handle.give(&proto_batch_cap, Ok(proto_batch));
});

(proto_batch_stream, shutdown.press_on_drop())