respond to Github feedback

* update a lot of comments to use the TODO(cf) format for better tracking
* mostly leave comments around cancelation with a TODO
ParkMyCar committed Jan 15, 2025
1 parent 076a34e commit c407594
Showing 6 changed files with 30 additions and 17 deletions.
3 changes: 2 additions & 1 deletion src/adapter/src/coord/appends.rs
@@ -500,7 +500,8 @@ impl Coordinator {
appends.entry(table.id()).or_default();
}

- // Consolidate all Rows for a given table.
+ // Consolidate all Rows for a given table. We do not consolidate the
+ // staged batches; that's up to whoever staged them.
let mut all_appends = Vec::with_capacity(appends.len());
for (item_id, table_data) in appends.into_iter() {
let mut all_rows = Vec::new();
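For readers unfamiliar with the term, "consolidate" here means collapsing duplicate rows by summing their diffs and dropping rows whose net diff is zero. A minimal sketch of the idea, using a simplified row type; the coordinator uses existing consolidation machinery rather than this hypothetical helper:

```rust
use std::collections::BTreeMap;

/// Hypothetical helper: collapse duplicate rows by summing diffs, then
/// drop rows whose net diff is zero. `R` stands in for `mz_repr::Row`.
fn consolidate_updates<R: Ord>(updates: Vec<(R, i64)>) -> Vec<(R, i64)> {
    let mut acc: BTreeMap<R, i64> = BTreeMap::new();
    for (row, diff) in updates {
        *acc.entry(row).or_insert(0) += diff;
    }
    acc.into_iter().filter(|(_, diff)| *diff != 0).collect()
}
```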
5 changes: 5 additions & 0 deletions src/storage-client/src/client.rs
@@ -142,6 +142,9 @@ impl<T> StorageCommand<T> {
| AllowWrites
| UpdateConfiguration(_)
| AllowCompaction(_) => false,
+ // TODO(cf2): multi-replica oneshot ingestions. At the moment, returning
+ // true here means we can't run `COPY FROM` on multi-replica clusters;
+ // this should be easy enough to support, though.
RunIngestions(_) | RunSinks(_) | RunOneshotIngestion(_) => true,
}
}
@@ -963,6 +966,8 @@ pub struct Update<T = mz_repr::Timestamp> {
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
/// A batch of updates to be fed to a local input; however, the input must
/// determine the most appropriate timestamps to use.
+ ///
+ /// TODO(cf2): Can we remove this and use only [`TableData`]?
pub struct TimestamplessUpdate {
pub row: Row,
pub diff: Diff,
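The `TableData` the TODO points at is, roughly, a single enum covering both plain rows and already-staged batches, which would make a separate one-row `TimestamplessUpdate` type unnecessary. A sketch with placeholder types, not the crate's exact definition:

```rust
// All three aliases are placeholders for the real types (mz_repr::Row,
// mz_repr::Diff, and persist's ProtoBatch), which live in other crates.
type Row = Vec<u8>;
type Diff = i64;
type ProtoBatch = Vec<u8>;

/// Sketch: one type for both kinds of table input.
enum TableData {
    /// Plain updates that still need timestamps assigned.
    Rows(Vec<(Row, Diff)>),
    /// Batches already staged in persist, ready to be appended.
    Batches(Vec<ProtoBatch>),
}
```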
8 changes: 5 additions & 3 deletions src/storage-controller/src/history.rs
@@ -72,7 +72,7 @@ impl<T: std::fmt::Debug> CommandHistory<T> {
RunSinks(x) => metrics.run_sinks_count.add(x.len().cast_into()),
AllowCompaction(x) => metrics.allow_compaction_count.add(x.len().cast_into()),
RunOneshotIngestion(_) => {
- // TODO(parkmycar): Metrics.
+ // TODO(cf2): Add metrics for oneshot ingestions.
}
}
}
@@ -155,7 +155,8 @@ impl<T: std::fmt::Debug> CommandHistory<T> {
run_sinks.push(sink);
}

- // TODO(parkmycar): ???
+ // TODO(cf1): Add a CancelOneshotIngestion command, similar to CancelPeek,
+ // that will compact/reduce away the RunOneshotIngestion.
run_oneshot_ingestions.extend(final_oneshot_ingestions.into_values());

// Reconstitute the commands as a compact history.
@@ -191,7 +192,8 @@ impl<T: std::fmt::Debug> CommandHistory<T> {
self.commands.push(StorageCommand::RunSinks(run_sinks));
}

- // TODO(parkmycar): I think this is correct?
+ // TODO(cf1): Add a CancelOneshotIngestion command; make sure we prevent
+ // re-sending commands for ingestions that we've already responded to.
if !run_oneshot_ingestions.is_empty() {
self.commands.extend(
run_oneshot_ingestions
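To make the two cancelation TODOs concrete: one way the history reduction could treat a hypothetical CancelOneshotIngestion command, mirroring how CancelPeek compacts away its Peek. Names and shapes here are assumptions, not the eventual design:

```rust
use std::collections::BTreeMap;

// Placeholders; the real history tracks RunOneshotIngestion commands
// keyed by their ingestion id.
type IngestionId = u64;
struct RunOneshotIngestion;

/// Sketch: while reducing the command history, apply cancelations by
/// deleting the matching RunOneshotIngestion, so neither the run nor
/// the cancel command needs to be replayed to a reconnecting replica.
fn apply_cancelations(
    pending: &mut BTreeMap<IngestionId, RunOneshotIngestion>,
    canceled: &[IngestionId],
) {
    for id in canceled {
        pending.remove(id);
    }
}
```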
18 changes: 10 additions & 8 deletions src/storage-controller/src/lib.rs
@@ -161,7 +161,7 @@ pub struct Controller<T: Timestamp + Lattice + Codec64 + From<EpochMillis> + Tim
/// Channel for receiving table handle drops.
#[derivative(Debug = "ignore")]
pending_table_handle_drops_rx: mpsc::UnboundedReceiver<GlobalId>,
- /// Closues that can be used to send responses from oneshot ingestions.
+ /// Closures that can be used to send responses from oneshot ingestions.
#[derivative(Debug = "ignore")]
pending_oneshot_ingestions: BTreeMap<GlobalId, OneshotResultCallback<ProtoBatch>>,

@@ -1384,9 +1384,10 @@ where
.pending_oneshot_ingestions
.insert(ingestion_id, result_tx);
assert!(novel.is_none());
+ Ok(())
+ } else {
+ Err(StorageError::ReadOnly)
}
-
- Ok(())
}

async fn alter_export(
@@ -1982,11 +1983,12 @@ where
}
Some(StorageResponse::StagedBatches(batches)) => {
for (collection_id, batches) in batches {
- let sender = self
-     .pending_oneshot_ingestions
-     .remove(&collection_id)
-     .expect("TODO");
- (sender)(batches);
+ match self.pending_oneshot_ingestions.remove(&collection_id) {
+     Some(sender) => (sender)(batches),
+     // TODO(cf2): When we support running COPY FROM on multiple
+     // replicas, we can probably just ignore the case of `None`.
+     None => mz_ore::soft_panic_or_log!("no sender for {collection_id}!"),
+ }
}
}
}
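On the `soft_panic_or_log!` call above: the soft-panic pattern fails loudly in development but degrades to a log line in production, which suits a case that should be impossible today but becomes legal with multi-replica `COPY FROM`. A sketch of the idea, not mz_ore's actual macro:

```rust
/// Sketch of the soft-panic pattern: panic in debug builds so bugs
/// surface in tests and CI, but only log in release builds so
/// production keeps serving. Not mz_ore's exact implementation.
macro_rules! soft_panic_or_log {
    ($($arg:tt)*) => {
        if cfg!(debug_assertions) {
            panic!($($arg)*);
        } else {
            eprintln!($($arg)*);
        }
    };
}

fn main() {
    let collection_id = 42;
    // Panics under `cargo run`, logs under `cargo run --release`.
    soft_panic_or_log!("no sender for {collection_id}!");
}
```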
7 changes: 4 additions & 3 deletions src/storage/src/render.rs
@@ -483,9 +483,10 @@ pub(crate) fn build_oneshot_ingestion_dataflow<A: Allocate>(
) {
let (results_tx, results_rx) = tokio::sync::mpsc::unbounded_channel();
let callback = move |result| {
- // TODO(parkmycar) Do we care if the receiver has gone away?
- // How do we handle ProtoBatch cleanup if one is sitting in the channel
- // but the receiver has gone away?
+ // TODO(cf3): Do we care if the receiver has gone away?
+ //
+ // Persist is working on cleaning up leaked blobs; we could also use
+ // `OneshotReceiverExt` here, but that might run into the infamous
+ // async-Drop problem.
let _ = results_tx.send(result);
};

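One partial answer to the TODO(cf3) question: tokio's unbounded `send` reports a dropped receiver through its `Err` return, so the case can at least be detected and logged even if the staged batch itself still leaks. A sketch under that assumption:

```rust
use tokio::sync::mpsc;

fn main() {
    let (results_tx, results_rx) = mpsc::unbounded_channel::<String>();

    // Sketch: UnboundedSender::send returns Err when the receiver has
    // been dropped, so the leak can at least be observed. Actually
    // deleting the staged ProtoBatch would still need persist-side
    // cleanup (or OneshotReceiverExt, modulo the async-Drop problem).
    let callback = move |result: String| {
        if results_tx.send(result).is_err() {
            eprintln!("oneshot ingestion receiver gone; staged batch may leak");
        }
    };

    drop(results_rx);
    callback("staged batch".to_string());
}
```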
6 changes: 4 additions & 2 deletions src/storage/src/storage_state.rs
@@ -277,7 +277,7 @@ pub struct StorageState {
pub ingestions: BTreeMap<GlobalId, IngestionDescription<CollectionMetadata>>,
/// Descriptions of each installed export.
pub exports: BTreeMap<GlobalId, StorageSinkDesc<MetadataFilled, mz_repr::Timestamp>>,
- /// Descriptions of oneshot ingestsions that are currently running.
+ /// Descriptions of oneshot ingestions that are currently running.
pub oneshot_ingestions: BTreeMap<GlobalId, OneshotIngestionDescription<ProtoBatch>>,
/// Undocumented
pub now: NowFn,
@@ -956,7 +956,9 @@ impl<'w, A: Allocate> Worker<'w, A> {
}
StorageCommand::RunOneshotIngestion(oneshot) => {
info!(%worker_id, ?oneshot, "reconcile: received RunOneshotIngestion command");
- // nothing to do?
+ // TODO(cf1): Handle CancelOneshotIngestion; clean out stale oneshot
+ // ingestions from our state. Possibly we respond to the client here
+ // with a cancelation to make sure the client doesn't wait forever.
}
StorageCommand::RunSinks(exports) => {
info!(%worker_id, ?exports, "reconcile: received RunSinks command");
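A sketch of the reconciliation cleanup this last TODO(cf1) describes: keep only the oneshot ingestions that the reconnecting client's command stream still mentions, and report each dropped id so the client is not left waiting forever. Names and signatures are assumptions for illustration:

```rust
use std::collections::{BTreeMap, BTreeSet};

// Placeholders for the worker's real types.
type GlobalId = u64;
struct OneshotIngestionDescription;

/// Sketch: retain only ingestions present in the incoming command
/// stream; everything else is stale and gets a cancelation response.
fn reconcile_oneshot_ingestions(
    state: &mut BTreeMap<GlobalId, OneshotIngestionDescription>,
    incoming: &BTreeSet<GlobalId>,
    mut notify_canceled: impl FnMut(GlobalId),
) {
    state.retain(|id, _| {
        if incoming.contains(id) {
            true
        } else {
            notify_canceled(*id);
            false
        }
    });
}
```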
