Skip to content

storage: use persist-txn for "introspection sources" #24108

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions doc/developer/design/20230615_webhook_source.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ In the coordinator we'll do the following:
1. Map the `database`, `schema`, and `name` to a webhook source using the `Catalog`. Erroring if no
object can be found or the object is not a webhook source.
2. Get the sending side of a channel that can be used to send new data to the `StorageController`.
This would be using the new `StorageController::monotonic_appender(...)` detailed below.
This would be using the new `StorageController::webhook_appender(...)` detailed below.
3. Spawn a new `tokio::task`, do all further work in this task and off the `Coordinator` thread.
4. Decode the body of our request according to `BODY FORMAT`.
5. Pack our data into a `Row`.
Expand All @@ -115,7 +115,7 @@ Today we expose a method [`append`](https://github.com/MaterializeInc/materializ
on the `StorageController` trait, I would like to add a new method:

```
pub async fn monotonic_appender(&self, id: GlobalId) -> CollectionAppender;
pub async fn webhook_appender(&self, id: GlobalId) -> CollectionAppender;
```

This new method would return a struct called `CollectionAppender`, which essentially acts as a
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ impl Catalog {
let span = tracing::span!(tracing::Level::DEBUG, "builtin_non_indexes");
let _enter = span.enter();
for (builtin, id) in builtin_non_indexes {
tracing::info!("WIP GlobalId {} is {}", id, builtin.name());
let schema_id = state.ambient_schemas_by_name[builtin.schema()];
let name = QualifiedItemName {
qualifiers: ItemQualifiers {
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -898,7 +898,7 @@ impl Coordinator {
let row_tx = coord
.controller
.storage
.monotonic_appender(entry.id())
.webhook_appender(entry.id())
.map_err(|_| name)?;
let invalidator = coord
.active_webhooks
Expand Down
8 changes: 4 additions & 4 deletions src/adapter/src/webhook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use mz_repr::{ColumnType, Datum, Diff, Row, RowArena};
use mz_secrets::cache::CachingSecretsReader;
use mz_secrets::SecretsReader;
use mz_sql::plan::{WebhookHeaders, WebhookValidation, WebhookValidationSecret};
use mz_storage_client::controller::MonotonicAppender;
use mz_storage_client::controller::WebhookAppender as OldMonotonicAppender;
use mz_storage_types::controller::StorageError;
use tokio::sync::Semaphore;

Expand Down Expand Up @@ -232,11 +232,11 @@ pub struct AppendWebhookResponse {
pub validator: Option<AppendWebhookValidator>,
}

/// A wrapper around [`MonotonicAppender`] that can get closed by the `Coordinator` if the webhook
/// A wrapper around [`WebhookAppender`] that can get closed by the `Coordinator` if the webhook
/// gets modified.
#[derive(Clone, Debug)]
pub struct WebhookAppender {
tx: MonotonicAppender,
tx: OldMonotonicAppender,
guard: WebhookAppenderGuard,
}

Expand All @@ -255,7 +255,7 @@ impl WebhookAppender {
Ok(())
}

pub(crate) fn new(tx: MonotonicAppender, guard: WebhookAppenderGuard) -> Self {
pub(crate) fn new(tx: OldMonotonicAppender, guard: WebhookAppenderGuard) -> Self {
WebhookAppender { tx, guard }
}
}
Expand Down
7 changes: 7 additions & 0 deletions src/persist-client/src/internal/machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,13 @@ where
// error on the first attempt in tests.
mut indeterminate: Option<Indeterminate>,
) -> Result<Result<(SeqNo, WriterMaintenance<T>), InvalidUsage<T>>, (SeqNo, Upper<T>)> {
tracing::info!(
"WIP CaA {:.9} {} {:?}->{:?}",
self.shard_id(),
self.applier.shard_metrics.name,
batch.desc.lower().elements(),
batch.desc.upper().elements()
);
let metrics = Arc::clone(&self.applier.metrics);
let lease_duration_ms = self
.applier
Expand Down
2 changes: 2 additions & 0 deletions src/persist-client/src/internal/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1415,6 +1415,7 @@ impl ShardsMetrics {
#[derive(Debug)]
pub struct ShardMetrics {
pub shard_id: ShardId,
pub name: String,
pub since: DeleteOnDropGauge<'static, AtomicI64, Vec<String>>,
pub upper: DeleteOnDropGauge<'static, AtomicI64, Vec<String>>,
pub largest_batch_size: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
Expand Down Expand Up @@ -1457,6 +1458,7 @@ impl ShardMetrics {
let shard = shard_id.to_string();
ShardMetrics {
shard_id: *shard_id,
name: name.to_owned(),
since: shards_metrics
.since
.get_delete_on_drop_gauge(vec![shard.clone(), name.to_string()]),
Expand Down
12 changes: 6 additions & 6 deletions src/storage-client/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,9 +376,9 @@ pub trait StorageController: Debug {
commands: Vec<(GlobalId, Vec<TimestamplessUpdate>)>,
) -> Result<tokio::sync::oneshot::Receiver<Result<(), StorageError>>, StorageError>;

/// Returns a [`MonotonicAppender`] which is a channel that can be used to monotonically
/// Returns a [`WebhookAppender`] which is a channel that can be used to monotonically
/// append to the specified [`GlobalId`].
fn monotonic_appender(&self, id: GlobalId) -> Result<MonotonicAppender, StorageError>;
fn webhook_appender(&self, id: GlobalId) -> Result<WebhookAppender, StorageError>;

/// Returns the snapshot of the contents of the local input named `id` at `as_of`.
async fn snapshot(
Expand Down Expand Up @@ -658,18 +658,18 @@ impl<T: Timestamp> ExportState<T> {
}
/// A channel that allows you to append a set of updates to a pre-defined [`GlobalId`].
///
/// See `CollectionManager::monotonic_appender` to acquire a [`MonotonicAppender`].
/// See `CollectionManager::webhook_appender` to acquire a [`WebhookAppender`].
#[derive(Clone, Debug)]
pub struct MonotonicAppender {
pub struct WebhookAppender {
/// Channel that sends to a [`tokio::task`] which pushes updates to Persist.
tx: mpsc::Sender<(Vec<(Row, Diff)>, oneshot::Sender<Result<(), StorageError>>)>,
}

impl MonotonicAppender {
impl WebhookAppender {
pub fn new(
tx: mpsc::Sender<(Vec<(Row, Diff)>, oneshot::Sender<Result<(), StorageError>>)>,
) -> Self {
MonotonicAppender { tx }
WebhookAppender { tx }
}

pub async fn append(&self, updates: Vec<(Row, Diff)>) -> Result<(), StorageError> {
Expand Down
33 changes: 4 additions & 29 deletions src/storage-controller/src/collection_mgmt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use mz_ore::task::AbortOnDropHandle;
use mz_persist_types::Codec64;
use mz_repr::{Diff, GlobalId, Row, TimestampManipulation};
use mz_storage_client::client::TimestamplessUpdate;
use mz_storage_client::controller::MonotonicAppender;
use mz_storage_client::controller::WebhookAppender;
use timely::progress::Timestamp;
use tokio::sync::{mpsc, oneshot};
use tokio::time::{Duration, Instant};
Expand Down Expand Up @@ -125,41 +125,16 @@ where
}
}

/// Appends `updates` to the collection correlated with `id`, does not wait for the append to
/// complete.
///
/// # Panics
/// - If `id` does not belong to managed collections.
/// - If there is contention to write to the collection identified by `id`.
/// - If the collection closed.
pub(super) async fn append_to_collection(&self, id: GlobalId, updates: Vec<(Row, Diff)>) {
if !updates.is_empty() {
// Get the update channel in a block to make sure the Mutex lock is scoped.
let update_tx = {
let guard = self.collections.lock().expect("CollectionManager panicked");
let (update_tx, _, _) = guard.get(&id).expect("id to exist");
update_tx.clone()
};

// Specifically _do not_ wait for the append to complete, just for it to be sent.
let (tx, _rx) = oneshot::channel();
update_tx.send((updates, tx)).await.expect("rx hung up");
}
}

/// Returns a [`MonotonicAppender`] that can be used to monotonically append updates to the
/// Returns a [`WebhookAppender`] that can be used to monotonically append updates to the
/// collection correlated with `id`.
pub(super) fn monotonic_appender(
&self,
id: GlobalId,
) -> Result<MonotonicAppender, StorageError> {
pub(super) fn webhook_appender(&self, id: GlobalId) -> Result<WebhookAppender, StorageError> {
let guard = self.collections.lock().expect("CollectionManager panicked");
let tx = guard
.get(&id)
.map(|(tx, _, _)| tx.clone())
.ok_or(StorageError::IdentifierMissing(id))?;

Ok(MonotonicAppender::new(tx))
Ok(WebhookAppender::new(tx))
}
}

Expand Down
16 changes: 7 additions & 9 deletions src/storage-controller/src/healthcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use mz_persist_types::Codec64;
use mz_repr::{Datum, Diff, GlobalId, Row, TimestampManipulation};
use timely::progress::Timestamp;

use crate::collection_mgmt::CollectionManager;
use crate::persist_handles::PersistTableWriteWorker;
use crate::IntrospectionType;

pub use mz_storage_client::healthcheck::*;
Expand Down Expand Up @@ -85,7 +85,7 @@ where
T: Timestamp + Lattice + Codec64 + TimestampManipulation,
{
/// Managed collection interface
collection_manager: CollectionManager<T>,
collection_manager: PersistTableWriteWorker<T>,
/// A list of introspection IDs for managed collections
introspection_ids: Arc<std::sync::Mutex<BTreeMap<IntrospectionType, GlobalId>>>,
previous_statuses: BTreeMap<GlobalId, String>,
Expand All @@ -96,7 +96,7 @@ where
T: Timestamp + Lattice + Codec64 + TimestampManipulation + From<EpochMillis>,
{
pub fn new(
collection_manager: CollectionManager<T>,
collection_manager: PersistTableWriteWorker<T>,
introspection_ids: Arc<std::sync::Mutex<BTreeMap<IntrospectionType, GlobalId>>>,
) -> Self {
Self {
Expand Down Expand Up @@ -139,12 +139,10 @@ where
.cloned()
.collect();

self.collection_manager
.append_to_collection(
source_status_history_id,
Self::pack_status_updates(new.clone()),
)
.await;
self.collection_manager.append_next_txn(
source_status_history_id,
Self::pack_status_updates(new.clone()),
);

self.previous_statuses
.extend(new.into_iter().map(|r| (r.id, r.status_name)));
Expand Down
Loading
Loading