Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test_decoding transform: skip toast columns with unchanged values #877

Merged
merged 2 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions src/bin/pgcopydb/ld_test_decoding.c
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ parseTestDecodingMessageActionAndXid(LogicalStreamContext *context)
#define TD_NEW_TUPLE_LEN strlen(TD_NEW_TUPLE)
#define TD_FOUND_NEW_TUPLE(ptr) (strncmp(ptr, TD_NEW_TUPLE, TD_NEW_TUPLE_LEN) == 0)

#define TD_UNCHANGED_TOAST "unchanged-toast-datum"
#define TD_UNCHANGED_TOAST_LEN strlen(TD_UNCHANGED_TOAST)
#define TD_FOUND_UNCHANGED_TOAST(ptr) \
(strncmp(ptr, TD_UNCHANGED_TOAST, TD_UNCHANGED_TOAST_LEN) == 0)

/*
* parseTestDecodingMessage parses a message as emitted by test_decoding into
Expand Down Expand Up @@ -681,6 +685,15 @@ SetColumnNamesAndValues(LogicalMessageTuple *tuple, TestDecodingHeader *header)
break;
}

/* when we find unchanged-toast-datum */
if (cur->valueStart == NULL)
{
/* skip the unchanged-toast-datum */
TestDecodingColumns emptyTestDecodingColumns = { 0 };
*cur = emptyTestDecodingColumns;
continue;
}

++count;

/* that might have been the last column */
Expand Down Expand Up @@ -910,6 +923,20 @@ parseNextColumn(TestDecodingColumns *cols,
cols->valueLen,
cols->valueStart);
}

/*
* Parse unchanged-toast-datum string literals
*/
else if (header->action == STREAM_ACTION_UPDATE && TD_FOUND_UNCHANGED_TOAST(ptr))
{
/* advance to past the value, skip the next space */
ptr = ptr + TD_UNCHANGED_TOAST_LEN;
header->pos = ptr - header->message + 1;

log_trace("parseNextColumn: unchanged-toast-datum for column \"%.*s\"",
cols->colnameLen,
cols->colnameStart);
}
else
{
cols->valueStart = ptr;
Expand Down Expand Up @@ -1071,6 +1098,14 @@ prepareUpdateTuppleArrays(StreamContext *privateContext,
return false;
}

if (cols->attributes.count == 0)
{
log_trace("prepareUpdateTuppleArrays: Skipping update message as all "
"the column values are unchanged. message: \"%s\"",
header->message);
return true;
}

/*
* Now lookup our internal catalogs to find out for every column if it is
* part of the pkey definition (WHERE clause) or not (SET clause).
Expand Down
11 changes: 8 additions & 3 deletions src/bin/pgcopydb/ld_transform.c
Original file line number Diff line number Diff line change
Expand Up @@ -2198,9 +2198,14 @@ stream_write_update(FILE *out, LogicalMessageUpdate *update)
LogicalMessageTuple *old = &(update->old.array[s]);
LogicalMessageTuple *new = &(update->new.array[s]);

if (old->values.count != new->values.count ||
old->values.count != 1 ||
new->values.count != 1)
if (old->values.count == 0 && new->values.count == 0)
{
log_trace("stream_write_update: Skipping empty UPDATE statement");
continue;
}
else if (old->values.count != new->values.count ||
old->values.count != 1 ||
new->values.count != 1)
{
log_error("Failed to write multi-values UPDATE statement "
"with %d old rows and %d new rows",
Expand Down
8 changes: 8 additions & 0 deletions tests/cdc-test-decoding/000000010000000000000002.json

Large diffs are not rendered by default.

14 changes: 12 additions & 2 deletions tests/cdc-test-decoding/000000010000000000000002.sql

Large diffs are not rendered by default.

24 changes: 24 additions & 0 deletions tests/cdc-test-decoding/ddl.sql
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,27 @@ begin
return 1;
end;
$$ language plpgsql;


--
-- See https://github.com/dimitri/pgcopydb/issues/710
-- Tables with toast columns may output `unchanged-toast-datum` values
begin;
create sequence xpto_rand_seq start 79 increment 1499; -- portable "random"

-- test table from PG regression tests
create table xpto (
id serial primary key,
toasted_col1 text,
rand1 float8 default nextval('xpto_rand_seq'),
toasted_col2 text,
rand2 float8 default nextval('xpto_rand_seq')
);

-- table with only toastable columns
create table xpto2 (
toasted_col1 text,
toasted_col2 text
);

commit;
35 changes: 35 additions & 0 deletions tests/cdc-test-decoding/dml.sql
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,38 @@ commit;
begin;
delete from generated_column_test where id = 2;
commit;


--
-- See https://github.com/dimitri/pgcopydb/issues/710
--
begin;

-- uncompressed external toast data
insert into xpto (toasted_col1, toasted_col2)
select string_agg(g.i::text, ''), string_agg((g.i*2)::text, '')
from generate_series(1, 2000) g(i);

-- compressed external toast data
insert into xpto (toasted_col2)
select repeat(string_agg(to_char(g.i, 'fm0000'), ''), 50)
from generate_series(1, 500) g(i);

-- update of existing column
update xpto
set toasted_col1 = (select string_agg(g.i::text, '')
from generate_series(1, 2000) g(i))
where id = 1;

update xpto set rand1 = 123.456 where id = 1;

-- test case where the only data is external toast data
insert into xpto2 (toasted_col1, toasted_col2)
select string_agg(g.i::text, ''), string_agg((g.i*2)::text, '')
from generate_series(1, 2000) g(i);

-- weird update clause where the data is unchanged
-- we expect to skip the update of the row as the data is unchanged
update xpto2 set toasted_col1 = toasted_col1, toasted_col2 = toasted_col2;

commit;
Loading