From 16e888ef3c20d2f9bce4b523ccfb7b80d361726e Mon Sep 17 00:00:00 2001
From: Moritz Hoffmann
Date: Wed, 26 Mar 2025 14:17:20 +0100
Subject: [PATCH] postgres replication: Eager string decoding

The postgres replication implementation represented rows of data as
`Vec<Option<Vec<u8>>>`, which is accurate but potentially inefficient.
Instead, we switch to a `Row` containing `Datum::String` or `Datum::Null`
values.

This causes us to do less work in total, with the slight caveat that it is
_different_ work on the replication worker. I'm not sure whether cloning
`Bytes` into the nested vectors is more work than constructing a `Row`, so
there is a possibility of performance changes in either direction. A `Row`
is certainly more compact in memory.

Related: MaterializeInc/database-issues#9125
Fixes: MaterializeInc/database-issues#9123

Signed-off-by: Moritz Hoffmann
---
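For reviewers, a minimal standalone sketch of the representation change (not
part of the patch): the old path kept each column as its own nested
allocation, while the new path decodes eagerly and packs all columns into a
single `Row` buffer. This assumes `mz_repr`'s `Row` and `Datum` types; the
`Row` calls are the ones visible in the diff below, and `decode` is a
hypothetical stand-in for this module's `decode_utf8_text`.

```rust
use mz_repr::{Datum, Row};

/// Hypothetical stand-in for `super::decode_utf8_text`: validates the bytes
/// as UTF-8 and borrows them as a string datum.
fn decode(bytes: &[u8]) -> Result<Datum<'_>, std::str::Utf8Error> {
    Ok(Datum::String(std::str::from_utf8(bytes)?))
}

/// Old shape: one heap allocation per non-null column, plus the outer vector.
fn row_as_vecs(cols: &[Option<&[u8]>]) -> Vec<Option<Vec<u8>>> {
    cols.iter()
        .map(|col| col.map(|bytes| bytes.to_vec()))
        .collect()
}

/// New shape: decode each column eagerly and pack it into a single `Row`
/// buffer, which is reused across calls; the caller gets a compact clone.
fn row_as_row(cols: &[Option<&[u8]>], row: &mut Row) -> Result<Row, std::str::Utf8Error> {
    let mut packer = row.packer();
    for col in cols {
        match col {
            Some(bytes) => packer.push(decode(bytes)?),
            None => packer.push(Datum::Null),
        }
    }
    Ok(row.clone())
}
```

The tradeoff the message describes is visible here: the old path amortized
its per-column allocations through the `col_temp`/`row_temp` pools removed
below, while the new path pays for UTF-8 decoding up front but produces a
single contiguous allocation per row.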
 .../src/source/postgres/replication.rs       | 61 ++++++-------------
 1 file changed, 17 insertions(+), 44 deletions(-)

diff --git a/src/storage/src/source/postgres/replication.rs b/src/storage/src/source/postgres/replication.rs
index 721469a63dbe1..5ac4b51cbe8c0 100644
--- a/src/storage/src/source/postgres/replication.rs
+++ b/src/storage/src/source/postgres/replication.rs
@@ -79,7 +79,6 @@
 use std::sync::LazyLock;
 use std::time::Instant;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
-use bytes::Bytes;
 use differential_dataflow::AsCollection;
 use futures::{FutureExt, Stream as AsyncStream, StreamExt, TryStreamExt};
 use mz_ore::cast::CastFrom;
@@ -441,8 +440,6 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
     let mut data_upper = resume_lsn;
     // A stash of reusable vectors to convert from bytes::Bytes based data, which is not
     // compatible with `columnation`, to Vec<u8> data that is.
-    let mut col_temp: Vec<Vec<u8>> = vec![];
-    let mut row_temp = vec![];
     while let Some(event) = stream.as_mut().next().await {
         use LogicalReplicationMessage::*;
         use ReplicationMessage::*;
@@ -480,23 +477,7 @@
                         upper_cap_set.downgrade([&data_upper]);
                         while let Some((oid, output_index, event, diff)) = tx.try_next().await?
                         {
-                            let event = match event {
-                                Ok(cols) => {
-                                    row_temp.clear();
-                                    for c in cols {
-                                        let c = c.map(|c| {
-                                            let mut col_vec =
-                                                col_temp.pop().unwrap_or_default();
-                                            col_vec.clear();
-                                            col_vec.extend_from_slice(&c);
-                                            col_vec
-                                        });
-                                        row_temp.push(c);
-                                    }
-                                    Ok(std::mem::take(&mut row_temp))
-                                }
-                                Err(err) => Err(err.into()),
-                            };
+                            let event = event.map_err(Into::into);
                             let mut data = (oid, output_index, event);
                             if let Some(req) = rewinds.get(&output_index) {
                                 if commit_lsn <= req.snapshot_lsn {
@@ -507,11 +488,6 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
                             }
                             let update = (data, commit_lsn, diff);
                             data_output.give_fueled(&data_cap_set[0], &update).await;
-                            // Store buffers for reuse
-                            if let Ok(mut row) = update.0 .2 {
-                                col_temp.extend(row.drain(..).flatten());
-                                row_temp = row;
-                            }
                         }
                     }
                     _ => return Err(TransientError::BareTransactionEvent),
@@ -599,12 +575,8 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
                     .get(&oid)
                     .and_then(|outputs| outputs.get(&output_index))
                     .expect("table_info contains all outputs");
-                let event = event.as_ref().map_err(|e| e.clone()).and_then(|row| {
-                    let mut datums = datum_vec.borrow();
-                    for col in row.iter() {
-                        let datum = col.as_deref().map(super::decode_utf8_text).transpose()?;
-                        datums.push(datum.unwrap_or(Datum::Null));
-                    }
+                let event = event.and_then(|row| {
+                    let datums = datum_vec.borrow_with(&row);
                     super::cast_row(&output.casts, &datums, &mut final_row)?;
                     Ok(SourceMessage {
                         key: Row::default(),
@@ -871,10 +843,10 @@ fn extract_transaction<'a>(
     metrics: &'a PgSourceMetrics,
     publication: &'a str,
     errored_outputs: &'a mut HashSet<usize>,
-) -> impl AsyncStream<
-    Item = Result<(u32, usize, Result<Vec<Option<Bytes>>, DefiniteError>, Diff), TransientError>,
-> + 'a {
+) -> impl AsyncStream<Item = Result<(u32, usize, Result<Row, DefiniteError>, Diff), TransientError>> + 'a
+{
     use LogicalReplicationMessage::*;
+    let mut row = Row::default();
     async_stream::try_stream!({
         let mut stream = pin!(stream);
         metrics.transactions.inc();
@@ -895,7 +867,7 @@ fn extract_transaction<'a>(
                 Relation(body) if !table_info.contains_key(&body.rel_id()) => continue,
                 Insert(body) => {
                     metrics.inserts.inc();
-                    let row = unpack_tuple(body.tuple().tuple_data());
+                    let row = unpack_tuple(body.tuple().tuple_data(), &mut row);
                     let rel = body.rel_id();
                     for ((output, _), row) in table_info
                         .get(&rel)
@@ -917,8 +889,8 @@
                         TupleData::UnchangedToast => old,
                         _ => new,
                     });
-                    let old_row = unpack_tuple(old_tuple.tuple_data());
-                    let new_row = unpack_tuple(new_tuple);
+                    let old_row = unpack_tuple(old_tuple.tuple_data(), &mut row);
+                    let new_row = unpack_tuple(new_tuple, &mut row);
                     let rel = body.rel_id();
                     for ((output, _), (old_row, new_row)) in table_info
                         .get(&rel)
@@ -946,7 +918,7 @@
                 Delete(body) => match body.old_tuple() {
                     Some(old_tuple) => {
                         metrics.deletes.inc();
-                        let row = unpack_tuple(old_tuple.tuple_data());
+                        let row = unpack_tuple(old_tuple.tuple_data(), &mut row);
                         let rel = body.rel_id();
                         for ((output, _), row) in table_info
                             .get(&rel)
@@ -1053,23 +1025,24 @@
 
 /// Unpacks an iterator of TupleData into a list of nullable bytes or an error if this can't be
 /// done.
-fn unpack_tuple<'a, I>(tuple_data: I) -> Result<Vec<Option<Bytes>>, DefiniteError>
+#[inline]
+fn unpack_tuple<'a, I>(tuple_data: I, row: &mut Row) -> Result<Row, DefiniteError>
 where
     I: IntoIterator<Item = &'a TupleData>,
     I::IntoIter: ExactSizeIterator,
 {
     let iter = tuple_data.into_iter();
-    let mut row = Vec::with_capacity(iter.len());
+    let mut packer = row.packer();
     for data in iter {
         let datum = match data {
-            TupleData::Text(bytes) => Some(bytes.clone()),
-            TupleData::Null => None,
+            TupleData::Text(bytes) => super::decode_utf8_text(bytes)?,
+            TupleData::Null => Datum::Null,
             TupleData::UnchangedToast => return Err(DefiniteError::MissingToast),
             TupleData::Binary(_) => return Err(DefiniteError::UnexpectedBinaryData),
         };
-        row.push(datum);
+        packer.push(datum);
     }
-    Ok(row)
+    Ok(row.clone())
 }
 
 /// Ensures the publication exists on the server. It returns an outer transient error in case of