postgres replication: Eager string decoding #32020

Merged · 1 commit · Mar 27, 2025
61 changes: 17 additions & 44 deletions src/storage/src/source/postgres/replication.rs
@@ -79,7 +79,6 @@ use std::sync::LazyLock;
use std::time::Instant;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

-use bytes::Bytes;
use differential_dataflow::AsCollection;
use futures::{FutureExt, Stream as AsyncStream, StreamExt, TryStreamExt};
use mz_ore::cast::CastFrom;
@@ -441,8 +440,6 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
let mut data_upper = resume_lsn;
// A stash of reusable vectors to convert from bytes::Bytes based data, which is not
// compatible with `columnation`, to Vec<u8> data that is.
-let mut col_temp: Vec<Vec<u8>> = vec![];
-let mut row_temp = vec![];
while let Some(event) = stream.as_mut().next().await {
use LogicalReplicationMessage::*;
use ReplicationMessage::*;
@@ -480,23 +477,7 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
upper_cap_set.downgrade([&data_upper]);
while let Some((oid, output_index, event, diff)) = tx.try_next().await?
{
-let event = match event {
-Ok(cols) => {
-row_temp.clear();
-for c in cols {
-let c = c.map(|c| {
-let mut col_vec =
-col_temp.pop().unwrap_or_default();
-col_vec.clear();
-col_vec.extend_from_slice(&c);
-col_vec
-});
-row_temp.push(c);
-}
-Ok(std::mem::take(&mut row_temp))
-}
-Err(err) => Err(err.into()),
-};
+let event = event.map_err(Into::into);
let mut data = (oid, output_index, event);
if let Some(req) = rewinds.get(&output_index) {
if commit_lsn <= req.snapshot_lsn {
@@ -507,11 +488,6 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
}
let update = (data, commit_lsn, diff);
data_output.give_fueled(&data_cap_set[0], &update).await;
-// Store buffers for reuse
-if let Ok(mut row) = update.0 .2 {
-col_temp.extend(row.drain(..).flatten());
-row_temp = row;
-}
}
}
_ => return Err(TransientError::BareTransactionEvent),
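The hunks above delete a copy-and-recycle scheme: because `bytes::Bytes` is not compatible with `columnation`, every column had to be copied into a `Vec<u8>` drawn from a stash of reusable buffers before the update could be sent downstream, and the buffers were drained back into the stash after the send. Below is a minimal, self-contained sketch of that pattern, with plain byte slices standing in for `bytes::Bytes` and hypothetical names, not the real operator code:

```rust
/// Copies borrowed columns into owned buffers taken from a recycling stash.
/// Stand-in for the deleted logic; `cols` plays the role of the
/// `bytes::Bytes` columns arriving from the replication stream.
fn recycle_copy(
    cols: Vec<Option<&[u8]>>,
    col_temp: &mut Vec<Vec<u8>>,         // stash of spent column buffers
    row_temp: &mut Vec<Option<Vec<u8>>>, // reusable row buffer
) -> Vec<Option<Vec<u8>>> {
    row_temp.clear();
    for c in cols {
        let c = c.map(|c| {
            // Reuse a spent buffer if one is available, otherwise allocate.
            let mut col_vec = col_temp.pop().unwrap_or_default();
            col_vec.clear();
            col_vec.extend_from_slice(c);
            col_vec
        });
        row_temp.push(c);
    }
    std::mem::take(row_temp)
}

fn main() {
    let mut col_temp: Vec<Vec<u8>> = vec![];
    let mut row_temp = vec![];
    let row = recycle_copy(vec![Some(&b"hello"[..]), None], &mut col_temp, &mut row_temp);
    // After the downstream send, spent buffers went back into the stash.
    col_temp.extend(row.into_iter().flatten());
    assert_eq!(col_temp.len(), 1);
}
```

Decoding eagerly into a packed `Row` removes both the extra copy and this bookkeeping.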
@@ -599,12 +575,8 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
.get(&oid)
.and_then(|outputs| outputs.get(&output_index))
.expect("table_info contains all outputs");
-let event = event.as_ref().map_err(|e| e.clone()).and_then(|row| {
-let mut datums = datum_vec.borrow();
-for col in row.iter() {
-let datum = col.as_deref().map(super::decode_utf8_text).transpose()?;
-datums.push(datum.unwrap_or(Datum::Null));
-}
+let event = event.and_then(|row| {
+let datums = datum_vec.borrow_with(&row);
super::cast_row(&output.casts, &datums, &mut final_row)?;
Ok(SourceMessage {
key: Row::default(),
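With decoding moved upstream, the event at this point already carries a packed `Row`, so the render site only needs to borrow its datums through a reusable scratch vector; the old per-column `decode_utf8_text` loop disappears. A rough sketch of that `borrow_with` pattern, using simplified stand-ins for `Row`, `Datum`, and `DatumVec` rather than the real `mz_repr` types:

```rust
#[derive(Clone, Debug)]
enum Datum {
    Null,
    Int(i64),
}

#[derive(Clone, Debug, Default)]
struct Row {
    datums: Vec<Datum>,
}

/// Reusable scratch space for unpacked datums, playing the role of
/// `mz_repr::DatumVec`.
#[derive(Default)]
struct DatumVec {
    scratch: Vec<Datum>,
}

impl DatumVec {
    /// Clears the scratch buffer and refills it with the row's datums, so
    /// repeated calls reuse one allocation.
    fn borrow_with(&mut self, row: &Row) -> &[Datum] {
        self.scratch.clear();
        self.scratch.extend(row.datums.iter().cloned());
        &self.scratch
    }
}

fn main() {
    let mut datum_vec = DatumVec::default();
    let row = Row { datums: vec![Datum::Int(1), Datum::Null] };
    let datums = datum_vec.borrow_with(&row);
    assert_eq!(datums.len(), 2);
}
```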
@@ -871,10 +843,10 @@ fn extract_transaction<'a>(
metrics: &'a PgSourceMetrics,
publication: &'a str,
errored_outputs: &'a mut HashSet<usize>,
-) -> impl AsyncStream<
-Item = Result<(u32, usize, Result<Vec<Option<Bytes>>, DefiniteError>, Diff), TransientError>,
-> + 'a {
+) -> impl AsyncStream<Item = Result<(u32, usize, Result<Row, DefiniteError>, Diff), TransientError>> + 'a
+{
use LogicalReplicationMessage::*;
+let mut row = Row::default();
async_stream::try_stream!({
let mut stream = pin!(stream);
metrics.transactions.inc();
@@ -895,7 +867,7 @@ fn extract_transaction<'a>(
Relation(body) if !table_info.contains_key(&body.rel_id()) => continue,
Insert(body) => {
metrics.inserts.inc();
-let row = unpack_tuple(body.tuple().tuple_data());
+let row = unpack_tuple(body.tuple().tuple_data(), &mut row);
let rel = body.rel_id();
for ((output, _), row) in table_info
.get(&rel)
@@ -917,8 +889,8 @@ fn extract_transaction<'a>(
TupleData::UnchangedToast => old,
_ => new,
});
-let old_row = unpack_tuple(old_tuple.tuple_data());
-let new_row = unpack_tuple(new_tuple);
+let old_row = unpack_tuple(old_tuple.tuple_data(), &mut row);
+let new_row = unpack_tuple(new_tuple, &mut row);
let rel = body.rel_id();
for ((output, _), (old_row, new_row)) in table_info
.get(&rel)
@@ -946,7 +918,7 @@ fn extract_transaction<'a>(
Delete(body) => match body.old_tuple() {
Some(old_tuple) => {
metrics.deletes.inc();
-let row = unpack_tuple(old_tuple.tuple_data());
+let row = unpack_tuple(old_tuple.tuple_data(), &mut row);
let rel = body.rel_id();
for ((output, _), row) in table_info
.get(&rel)
@@ -1053,23 +1025,24 @@ fn extract_transaction<'a>(

/// Unpacks an iterator of TupleData into a list of nullable bytes or an error if this can't be
/// done.
-fn unpack_tuple<'a, I>(tuple_data: I) -> Result<Vec<Option<Bytes>>, DefiniteError>
+#[inline]
+fn unpack_tuple<'a, I>(tuple_data: I, row: &mut Row) -> Result<Row, DefiniteError>
where
I: IntoIterator<Item = &'a TupleData>,
I::IntoIter: ExactSizeIterator,
{
let iter = tuple_data.into_iter();
-let mut row = Vec::with_capacity(iter.len());
+let mut packer = row.packer();
for data in iter {
let datum = match data {
-TupleData::Text(bytes) => Some(bytes.clone()),
-TupleData::Null => None,
+TupleData::Text(bytes) => super::decode_utf8_text(bytes)?,
+TupleData::Null => Datum::Null,
TupleData::UnchangedToast => return Err(DefiniteError::MissingToast),
TupleData::Binary(_) => return Err(DefiniteError::UnexpectedBinaryData),
};
-row.push(datum);
+packer.push(datum);
}
-Ok(row)
+Ok(row.clone())
}

/// Ensures the publication exists on the server. It returns an outer transient error in case of
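The core of the change is the `unpack_tuple` rewrite above: instead of materializing a fresh `Vec<Option<Bytes>>` per tuple, it decodes each text column to a datum on the spot and packs it into a caller-provided, reusable `Row`, returning a clone of the packed result. A minimal, runnable sketch of the same pattern, with simplified stand-ins for `Row`, `Datum`, `TupleData`, and `DefiniteError` (the real types live in `mz_repr`, `postgres-protocol`, and this crate):

```rust
#[derive(Clone, Debug)]
enum Datum {
    Null,
    String(String),
}

/// Packed row of datums; stand-in for `mz_repr::Row`.
#[derive(Clone, Debug, Default)]
struct Row {
    datums: Vec<Datum>,
}

impl Row {
    /// Clears the row and returns something to append into, mirroring how
    /// `mz_repr::Row::packer` resets the row before packing.
    fn packer(&mut self) -> &mut Vec<Datum> {
        self.datums.clear();
        &mut self.datums
    }
}

/// Stand-in for `postgres_protocol::message::backend::TupleData`.
enum TupleData {
    Text(Vec<u8>),
    Null,
    UnchangedToast,
}

#[derive(Debug)]
enum DefiniteError {
    InvalidUtf8,
    MissingToast,
}

/// Decodes a tuple into the reusable `row` buffer and returns an owned copy,
/// avoiding a fresh `Vec<Option<Bytes>>` allocation per replication event.
fn unpack_tuple(tuple_data: &[TupleData], row: &mut Row) -> Result<Row, DefiniteError> {
    let packer = row.packer();
    for data in tuple_data {
        let datum = match data {
            TupleData::Text(bytes) => {
                let text = std::str::from_utf8(bytes).map_err(|_| DefiniteError::InvalidUtf8)?;
                Datum::String(text.to_owned())
            }
            TupleData::Null => Datum::Null,
            TupleData::UnchangedToast => return Err(DefiniteError::MissingToast),
        };
        packer.push(datum);
    }
    Ok(row.clone())
}

fn main() -> Result<(), DefiniteError> {
    let mut scratch = Row::default();
    let tuple = [TupleData::Text(b"42".to_vec()), TupleData::Null];
    let row = unpack_tuple(&tuple, &mut scratch)?;
    assert_eq!(row.datums.len(), 2);
    Ok(())
}
```

The apparent payoff: one decode pass and one clone of an already-packed row per event, instead of a `Vec` allocation per column plus a second decode pass at the render site.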