diff --git a/src/storage-client/src/controller.rs b/src/storage-client/src/controller.rs
index d8970294310cf..6d427187c2db5 100644
--- a/src/storage-client/src/controller.rs
+++ b/src/storage-client/src/controller.rs
@@ -28,7 +28,6 @@ use std::time::Duration;
 
 use async_trait::async_trait;
 use mz_cluster_client::client::ClusterReplicaLocation;
-use mz_cluster_client::metrics::WallclockLagMetrics;
 use mz_cluster_client::ReplicaId;
 use mz_persist_client::batch::ProtoBatch;
 use mz_persist_types::{Codec64, Opaque, ShardId};
@@ -723,11 +722,6 @@ pub struct ExportState<T: TimelyTimestamp> {
 
     /// Reported write frontier.
     pub write_frontier: Antichain<T>,
-
-    /// Maximum frontier wallclock lag since the last introspection update.
-    pub wallclock_lag_max: Duration,
-    /// Frontier wallclock lag metrics tracked for this collection.
-    pub wallclock_lag_metrics: WallclockLagMetrics,
 }
 
 impl<T: Timestamp> ExportState<T> {
@@ -735,15 +729,12 @@ impl<T: Timestamp> ExportState<T> {
         description: ExportDescription<T>,
         read_hold: ReadHold<T>,
         read_policy: ReadPolicy<T>,
-        wallclock_lag_metrics: WallclockLagMetrics,
     ) -> Self {
         Self {
             description,
             read_hold,
             read_policy,
             write_frontier: Antichain::from_elem(Timestamp::minimum()),
-            wallclock_lag_max: Default::default(),
-            wallclock_lag_metrics,
         }
     }
 
diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs
index 943f8af6946e0..8daabea6a21e6 100644
--- a/src/storage-controller/src/lib.rs
+++ b/src/storage-controller/src/lib.rs
@@ -142,8 +142,6 @@ pub struct Controller<T: Timestamp + Lattice + Codec64 + From<EpochMillis> + Tim
     /// This is to prevent the re-binding of identifiers to other descriptions.
     pub(crate) collections: BTreeMap<GlobalId, CollectionState<T>>,
 
-    pub(crate) exports: BTreeMap<GlobalId, ExportState<T>>,
-
     /// Write handle for table shards.
     pub(crate) persist_table_worker: persist_handles::PersistTableWriteWorker<T>,
     /// A shared TxnsCache running in a task and communicated with over a channel.
@@ -358,6 +356,11 @@ where
 
         match &collection.extra_state {
             CollectionStateExtra::Ingestion(ingestion_state) => Ok(ingestion_state.hydrated),
+            CollectionStateExtra::Sink(_) => {
+                // For now, sinks are always considered hydrated.
+                // TODO(sinks): base this off of the sink shard's frontier?
+                Ok(true)
+            }
             CollectionStateExtra::None => {
                 // For now, objects that are not ingestions are always
                 // considered hydrated.
@@ -1300,8 +1303,12 @@ where
         &self,
         id: GlobalId,
     ) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
-        self.exports
+        self.collections
             .get(&id)
+            .and_then(|c| match &c.extra_state {
+                CollectionStateExtra::Sink(state) => Some(state),
+                _ => None,
+            })
             .ok_or(StorageError::IdentifierMissing(id))
     }
 
@@ -1309,8 +1316,12 @@ where
         &mut self,
         id: GlobalId,
     ) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>> {
-        self.exports
+        self.collections
             .get_mut(&id)
+            .and_then(|c| match &mut c.extra_state {
+                CollectionStateExtra::Sink(state) => Some(state),
+                _ => None,
+            })
             .ok_or(StorageError::IdentifierMissing(id))
     }
 
@@ -1362,13 +1373,18 @@ where
                 .metrics
                 .wallclock_lag_metrics(id, Some(description.instance_id));
 
-            let export_state = ExportState::new(
-                description.clone(),
-                read_hold,
-                read_policy,
-                wallclock_lag_metrics,
+            let export_state = ExportState::new(description.clone(), read_hold, read_policy);
+            let collection_metadata = self.storage_collections.collection_metadata(id)?;
+            self.collections.insert(
+                id,
+                CollectionState {
+                    data_source: DataSource::Other,
+                    collection_metadata,
+                    extra_state: CollectionStateExtra::Sink(export_state),
+                    wallclock_lag_max: Default::default(),
+                    wallclock_lag_metrics,
+                },
             );
-            self.exports.insert(id, export_state);
 
             // Just like with `new_source_statistic_entries`, we can probably
             // `insert` here, but in the interest of safety, never override
@@ -1443,10 +1459,7 @@ where
         let to_storage_metadata = self.storage_collections.collection_metadata(id)?;
 
         // Check whether the sink's write frontier is beyond the read hold we got
-        let cur_export = self
-            .exports
-            .get_mut(&id)
-            .ok_or_else(|| StorageError::IdentifierMissing(id))?;
+        let cur_export = self.export_mut(id)?;
         let input_readable = cur_export
             .write_frontier
             .iter()
@@ -1455,17 +1468,11 @@ where
             return Err(StorageError::ReadBeforeSince(from_id));
         }
 
-        let wallclock_lag_metrics = self
-            .metrics
-            .wallclock_lag_metrics(id, Some(new_description.instance_id));
-
         let new_export = ExportState {
             description: new_description.clone(),
             read_hold,
             read_policy: cur_export.read_policy.clone(),
             write_frontier: cur_export.write_frontier.clone(),
-            wallclock_lag_max: Default::default(),
-            wallclock_lag_metrics,
         };
         *cur_export = new_export;
 
@@ -1906,7 +1913,7 @@ where
                 for id in ids.iter() {
                     tracing::debug!("DroppedIds for collections {id}");
 
-                    if let Some(export) = self.exports.get_mut(id) {
+                    if let Some(export) = self.export_mut(*id).ok() {
                         // TODO: Current main never drops export state, so we
                         // also don't do that, because it would be yet more
                         // refactoring. Instead, we downgrade to the empty
@@ -1917,7 +1924,7 @@ where
                             .read_hold
                             .try_downgrade(Antichain::new())
                             .expect("must be possible");
-                    } else if let Some(_collection) = self.collections.remove(id) {
+                    } else if let Some(collection) = self.collections.remove(id) {
                         // Nothing to do, we already dropped read holds in
                         // `drop_sources_unvalidated`.
                     } else {
@@ -1989,6 +1996,9 @@ where
                                                 ingestion_state.hydrated = true;
                                             }
                                         }
+                                        CollectionStateExtra::Sink(_) => {
+                                            // TODO(sinks): track sink hydration?
+                                        }
                                         CollectionStateExtra::None => {
                                             // Nothing to do
                                         }
@@ -2053,36 +2063,41 @@ where
             let instance = cluster_id.and_then(|cluster_id| self.instances.get_mut(&cluster_id));
 
             if read_frontier.is_empty() {
-                if instance.is_some() && self.collections.contains_key(&id) {
-                    let collection = self.collections.get(&id).expect("known to exist");
-                    match collection.extra_state {
-                        CollectionStateExtra::Ingestion(_) => {
-                            pending_source_drops.push(id);
-                        }
-                        CollectionStateExtra::None => {
-                            // Nothing to do
-                        }
-                    }
-                } else if let Some(collection) = self.collections.get(&id) {
-                    match collection.data_source {
-                        DataSource::Table { .. } => {
-                            pending_collection_drops.push(id);
+                if let Some(collection) = self.collections.get(&id) {
+                    if instance.is_some() {
+                        match collection.extra_state {
+                            CollectionStateExtra::Ingestion(_) => {
+                                pending_source_drops.push(id);
+                            }
+                            CollectionStateExtra::None => {
+                                // Nothing to do
+                            }
+                            CollectionStateExtra::Sink(_) => {
+                                pending_sink_drops.push(id);
+                            }
                         }
-                        DataSource::Webhook => {
-                            pending_collection_drops.push(id);
-                            // TODO(parkmycar): The Collection Manager and PersistMonotonicWriter
-                            // could probably use some love and maybe get merged together?
-                            let fut = self.collection_manager.unregister_collection(id);
-                            mz_ore::task::spawn(|| format!("storage-webhook-cleanup-{id}"), fut);
+                    } else {
+                        match collection.data_source {
+                            DataSource::Table { .. } => {
+                                pending_collection_drops.push(id);
+                            }
+                            DataSource::Webhook => {
+                                pending_collection_drops.push(id);
+                                // TODO(parkmycar): The Collection Manager and PersistMonotonicWriter
+                                // could probably use some love and maybe get merged together?
+                                let fut = self.collection_manager.unregister_collection(id);
+                                mz_ore::task::spawn(
+                                    || format!("storage-webhook-cleanup-{id}"),
+                                    fut,
+                                );
+                            }
+                            DataSource::Ingestion(_) => (),
+                            DataSource::IngestionExport { .. } => (),
+                            DataSource::Introspection(_) => (),
+                            DataSource::Progress => (),
+                            DataSource::Other => (),
                         }
-                        DataSource::Ingestion(_) => (),
-                        DataSource::IngestionExport { .. } => (),
-                        DataSource::Introspection(_) => (),
-                        DataSource::Progress => (),
-                        DataSource::Other => (),
                     }
-                } else if instance.is_some() && self.exports.contains_key(&id) {
-                    pending_sink_drops.push(id);
                 } else if instance.is_none() {
                     tracing::info!("Compaction command for id {id}, but we don't have a client.");
                 } else {
@@ -2440,7 +2455,6 @@ where
         Self {
             build_info,
             collections: BTreeMap::default(),
-            exports: BTreeMap::default(),
             persist_table_worker,
             txns_read,
             txns_metrics,
@@ -2493,14 +2507,15 @@ where
         let mut read_capability_changes = BTreeMap::default();
 
         for (id, policy) in policies.into_iter() {
-            if let Some(_export) = self.exports.get_mut(&id) {
-                unreachable!("set_hold_policies is only called for ingestions");
-            } else if let Some(collection) = self.collections.get_mut(&id) {
+            if let Some(collection) = self.collections.get_mut(&id) {
                 let ingestion = match &mut collection.extra_state {
                     CollectionStateExtra::Ingestion(ingestion) => ingestion,
                     CollectionStateExtra::None => {
                         unreachable!("set_hold_policies is only called for ingestions");
                     }
+                    CollectionStateExtra::Sink(_) => {
+                        unreachable!("set_hold_policies is only called for ingestions");
+                    }
                 };
                 let mut new_derived_since = policy.frontier(ingestion.write_frontier.borrow());
 
@@ -2529,39 +2544,30 @@ where
         let mut read_capability_changes = BTreeMap::default();
 
         for (id, new_upper) in updates.iter() {
-            if let Ok(export) = self.export_mut(*id) {
-                if PartialOrder::less_than(&export.write_frontier, new_upper) {
-                    export.write_frontier.clone_from(new_upper);
-                }
+            if let Some(collection) = self.collections.get_mut(id) {
+                match &mut collection.extra_state {
+                    CollectionStateExtra::Ingestion(ingestion) => {
+                        if PartialOrder::less_than(&ingestion.write_frontier, new_upper) {
+                            ingestion.write_frontier.clone_from(new_upper);
+                        }
 
-                // Ignore read policy for sinks whose write frontiers are closed, which identifies
-                // the sink is being dropped; we need to advance the read frontier to the empty
-                // chain to signal to the dataflow machinery that they should deprovision this
-                // object.
-                let new_read_capability = if export.write_frontier.is_empty() {
-                    export.write_frontier.clone()
-                } else {
-                    export.read_policy.frontier(export.write_frontier.borrow())
-                };
+                        debug!(%id, ?ingestion, ?new_upper, "upper update for ingestion!");
 
-                if PartialOrder::less_equal(export.read_hold.since(), &new_read_capability) {
-                    let mut update = ChangeBatch::new();
-                    update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
-                    update.extend(
-                        export
-                            .read_hold
-                            .since()
-                            .iter()
-                            .map(|time| (time.clone(), -1)),
-                    );
+                        let mut new_derived_since = ingestion
+                            .hold_policy
+                            .frontier(ingestion.write_frontier.borrow());
 
-                    if !update.is_empty() {
-                        read_capability_changes.insert(*id, update);
+                        if PartialOrder::less_equal(&ingestion.derived_since, &new_derived_since) {
+                            let mut update = ChangeBatch::new();
+                            update.extend(new_derived_since.iter().map(|time| (time.clone(), 1)));
+                            std::mem::swap(&mut ingestion.derived_since, &mut new_derived_since);
+                            update.extend(new_derived_since.iter().map(|time| (time.clone(), -1)));
+
+                            if !update.is_empty() {
+                                read_capability_changes.insert(*id, update);
+                            }
+                        }
                     }
-                }
-            } else if let Some(collection) = self.collections.get_mut(id) {
-                let ingestion = match &mut collection.extra_state {
-                    CollectionStateExtra::Ingestion(ingestion) => ingestion,
                     CollectionStateExtra::None => {
                         if matches!(collection.data_source, DataSource::Progress) {
                             // We do get these, but can't do anything with it!
@@ -2574,26 +2580,37 @@ where
                         }
                         continue;
                     }
-                };
-
-                if PartialOrder::less_than(&ingestion.write_frontier, new_upper) {
-                    ingestion.write_frontier.clone_from(new_upper);
-                }
-
-                debug!(%id, ?ingestion, ?new_upper, "upper update for ingestion!");
+                    CollectionStateExtra::Sink(export) => {
+                        if PartialOrder::less_than(&export.write_frontier, new_upper) {
+                            export.write_frontier.clone_from(new_upper);
+                        }
 
-                let mut new_derived_since = ingestion
-                    .hold_policy
-                    .frontier(ingestion.write_frontier.borrow());
+                        // Ignore read policy for sinks whose write frontiers are closed, which identifies
+                        // the sink is being dropped; we need to advance the read frontier to the empty
+                        // chain to signal to the dataflow machinery that they should deprovision this
+                        // object.
+                        let new_read_capability = if export.write_frontier.is_empty() {
+                            export.write_frontier.clone()
+                        } else {
+                            export.read_policy.frontier(export.write_frontier.borrow())
+                        };
 
-                if PartialOrder::less_equal(&ingestion.derived_since, &new_derived_since) {
-                    let mut update = ChangeBatch::new();
-                    update.extend(new_derived_since.iter().map(|time| (time.clone(), 1)));
-                    std::mem::swap(&mut ingestion.derived_since, &mut new_derived_since);
-                    update.extend(new_derived_since.iter().map(|time| (time.clone(), -1)));
+                        if PartialOrder::less_equal(export.read_hold.since(), &new_read_capability)
+                        {
+                            let mut update = ChangeBatch::new();
+                            update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
+                            update.extend(
+                                export
+                                    .read_hold
+                                    .since()
+                                    .iter()
+                                    .map(|time| (time.clone(), -1)),
+                            );
 
-                    if !update.is_empty() {
-                        read_capability_changes.insert(*id, update);
+                            if !update.is_empty() {
+                                read_capability_changes.insert(*id, update);
+                            }
+                        }
                     }
                 }
             } else if self.storage_collections.check_exists(*id).is_ok() {
@@ -2636,31 +2653,24 @@ where
                 debug!(id = %key, ?update, "update_hold_capability");
             }
 
-            if let Ok(export) = self.export_mut(key) {
-                // Seed with our current read hold, then apply changes, to
-                // derive how we need to change our read hold.
-                let mut staged_read_hold = MutableAntichain::new();
-                staged_read_hold
-                    .update_iter(export.read_hold.since().iter().map(|t| (t.clone(), 1)));
-                let changes = staged_read_hold.update_iter(update.drain());
-                update.extend(changes);
-
-                // Make sure we also send `AllowCompaction` commands for sinks,
-                // which drives updating the sink's `as_of`, among other things.
-                let (changes, frontier, _cluster_id) =
-                    exports_net.entry(key).or_insert_with(|| {
-                        (
-                            <ChangeBatch<_>>::new(),
-                            Antichain::new(),
-                            export.cluster_id(),
-                        )
-                    });
-
-                changes.extend(update.drain());
-                *frontier = staged_read_hold.frontier().to_owned();
-            } else if let Some(collection) = self.collections.get_mut(&key) {
-                let ingestion = match &mut collection.extra_state {
-                    CollectionStateExtra::Ingestion(ingestion) => ingestion,
+            if let Some(collection) = self.collections.get_mut(&key) {
+                match &mut collection.extra_state {
+                    CollectionStateExtra::Ingestion(ingestion) => {
+                        let changes = ingestion.read_capabilities.update_iter(update.drain());
+                        update.extend(changes);
+
+                        let (changes, frontier, _cluster_id) =
+                            collections_net.entry(key).or_insert_with(|| {
+                                (
+                                    <ChangeBatch<_>>::new(),
+                                    Antichain::new(),
+                                    ingestion.instance_id,
+                                )
+                            });
+
+                        changes.extend(update.drain());
+                        *frontier = ingestion.read_capabilities.frontier().to_owned();
+                    }
                     CollectionStateExtra::None => {
                         // WIP: See if this ever panics in ci.
                         soft_panic_or_log!(
@@ -2669,22 +2679,30 @@ where
                         );
                         continue;
                     }
-                };
-
-                let changes = ingestion.read_capabilities.update_iter(update.drain());
-                update.extend(changes);
-
-                let (changes, frontier, _cluster_id) =
-                    collections_net.entry(key).or_insert_with(|| {
-                        (
-                            <ChangeBatch<_>>::new(),
-                            Antichain::new(),
-                            ingestion.instance_id,
-                        )
-                    });
-
-                changes.extend(update.drain());
-                *frontier = ingestion.read_capabilities.frontier().to_owned();
+                    CollectionStateExtra::Sink(export) => {
+                        // Seed with our current read hold, then apply changes, to
+                        // derive how we need to change our read hold.
+                        let mut staged_read_hold = MutableAntichain::new();
+                        staged_read_hold
+                            .update_iter(export.read_hold.since().iter().map(|t| (t.clone(), 1)));
+                        let changes = staged_read_hold.update_iter(update.drain());
+                        update.extend(changes);
+
+                        // Make sure we also send `AllowCompaction` commands for sinks,
+                        // which drives updating the sink's `as_of`, among other things.
+                        let (changes, frontier, _cluster_id) =
+                            exports_net.entry(key).or_insert_with(|| {
+                                (
+                                    <ChangeBatch<_>>::new(),
+                                    Antichain::new(),
+                                    export.cluster_id(),
+                                )
+                            });
+
+                        changes.extend(update.drain());
+                        *frontier = staged_read_hold.frontier().to_owned();
+                    }
+                }
             } else {
                 // This is confusing and we should probably error.
                 tracing::warn!(id = ?key, ?update, "update_hold_capabilities for unknown object");
@@ -2713,6 +2731,13 @@ where
 
                 let ingestion = match &mut collection.extra_state {
                     CollectionStateExtra::Ingestion(ingestion) => ingestion,
+                    CollectionStateExtra::Sink(_) => {
+                        soft_panic_or_log!(
+                            "trying to downgrade read holds for collection which is not an \
+                             ingestion: {collection:?}"
+                        );
+                        continue;
+                    }
                     CollectionStateExtra::None => {
                         soft_panic_or_log!(
                             "trying to downgrade read holds for collection which is not an \
@@ -2733,7 +2758,7 @@ where
         }
         for (key, (mut changes, frontier, cluster_id)) in exports_net {
             if !changes.is_empty() {
-                let export_state = self.exports.get_mut(&key).expect("missing export state");
+                let export_state = self.export_mut(key).expect("missing export state");
 
                 export_state
                     .read_hold
@@ -2778,14 +2803,6 @@ where
         Ok(())
     }
 
-    /// Iterate over exports that have not been dropped.
-    fn active_exports(&self) -> impl Iterator<Item = (GlobalId, &ExportState<T>)> {
-        self.exports
-            .iter()
-            .filter(|(_id, e)| !e.is_dropped())
-            .map(|(id, e)| (*id, e))
-    }
-
     /// Opens a write and critical since handles for the given `shard`.
     ///
     /// `since` is an optional `since` that the read handle will be forwarded to if it is less than
@@ -2949,7 +2966,7 @@ where
         self.sink_statistics
             .lock()
             .expect("poisoned")
-            .retain(|k, _| self.exports.contains_key(k));
+            .retain(|k, _| self.export(*k).is_ok());
     }
 
     /// Appends a new global ID, shard ID pair to the appropriate collection.
@@ -3059,7 +3076,7 @@ where
 
         for update in updates {
             let id = update.id;
-            if self.exports.contains_key(&id) {
+            if self.export(id).is_ok() {
                 sink_status_updates.push(update);
             } else if self.storage_collections.check_exists(id).is_ok() {
                 source_status_updates.push(update);
@@ -3211,25 +3228,11 @@ where
                 .collections
                 .get(&id)
                 .and_then(|c| match &c.extra_state {
-                    CollectionStateExtra::Ingestion(ingestion) => Some(ingestion),
+                    CollectionStateExtra::Ingestion(ingestion) => Some(ingestion.instance_id),
+                    CollectionStateExtra::Sink(export) => Some(export.cluster_id()),
                     CollectionStateExtra::None => None,
                 })
-                .and_then(|i| self.instances.get(&i.instance_id));
-            if let Some(instance) = instance {
-                for replica_id in instance.replica_ids() {
-                    replica_frontiers.insert((id, replica_id), upper.clone());
-                }
-            }
-
-            global_frontiers.insert(id, (since, upper));
-        }
-
-        for (id, export) in self.active_exports() {
-            // Exports cannot be read from, so their `since` is always the empty frontier.
-            let since = Antichain::new();
-            let upper = export.write_frontier.clone();
-
-            let instance = self.instances.get(&export.cluster_id());
+                .and_then(|i| self.instances.get(&i));
             if let Some(instance) = instance {
                 for replica_id in instance.replica_ids() {
                     replica_frontiers.insert((id, replica_id), upper.clone());
@@ -3355,20 +3358,6 @@ where
             collection.wallclock_lag_metrics.observe(lag);
         }
 
-        let active_exports = self.exports.iter_mut().filter(|(_id, e)| !e.is_dropped());
-        for (id, export) in active_exports {
-            let lag = frontier_lag(&export.write_frontier);
-            export.wallclock_lag_max = std::cmp::max(export.wallclock_lag_max, lag);
-
-            if let Some(updates) = &mut introspection_updates {
-                let lag = std::mem::take(&mut export.wallclock_lag_max);
-                let row = pack_row(*id, lag);
-                updates.push((row, 1));
-            }
-
-            export.wallclock_lag_metrics.observe(lag);
-        }
-
         if let Some(updates) = introspection_updates {
             self.append_introspection_updates(IntrospectionType::WallclockLagHistory, updates);
             self.wallclock_lag_last_refresh = Instant::now();
@@ -3499,6 +3488,7 @@ struct CollectionState<T: TimelyTimestamp> {
 #[derive(Debug)]
 enum CollectionStateExtra<T: TimelyTimestamp> {
     Ingestion(IngestionState<T>),
+    Sink(ExportState<T>),
     None,
 }