Skip to content

Commit

Permalink
Fix test_decoding schema parsing with quoted identifier (#852)
Browse files Browse the repository at this point in the history
The current parser fails when the DML message table schema is escaped with quotes.

Remove the quote check, as the existing logic already covers schemas with quotes.

Signed-off-by: Arunprasad Rajkumar <[email protected]>
  • Loading branch information
arajkumar authored Jul 29, 2024
1 parent e0123a6 commit 30d7bcd
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 26 deletions.
151 changes: 137 additions & 14 deletions src/bin/pgcopydb/ld_test_decoding.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ static bool prepareUpdateTuppleArrays(StreamContext *privateContext,
TestDecodingHeader *header);


static bool findIdentifierEndPos(const char *message, char separator, int *position);

/*
* prepareWal2jsonMessage prepares our internal JSON entry from a test_decoding
* message. At this stage we only escape the message as a proper JSON string.
Expand Down Expand Up @@ -300,6 +302,81 @@ parseTestDecodingMessage(StreamContext *privateContext,
}


/*
* findIdentifierEndPos returns the first position of the separator in the given
* message after navigating through all the quotes. It assumes that the message
* contains a well-formed identifier and doesn't attempt to handle errors while
* parsing the message.
*
* This should always find an identifier end position, as we are parsing a
* well-formed message from the test_decoding plugin.
*/
static bool
findIdentifierEndPos(const char *message, char separator, int *position)
{
if (message == NULL || position == NULL)
{
log_error("findIdentifierEndPos: message or position cannot be NULL");
return false;
}

/*
* The separator cannot be a quote, as we are looking for the first
* separator outside the quotes.
*/
if (separator == '"')
{
log_error("findIdentifierEndPos: separator cannot be a quote");
return false;
}

int quoteCount = 0;

int pos = 0;

for (pos = 0; message[pos] != '\0'; pos++)
{
if (message[pos] == '"')
{
quoteCount++;
}

/*
* We are looking for the first 'separator' in the message which
* should be outside the quotes.
*
* When there is 'separator' inside quotes, quoteCount will be odd,
* and we need to account it as a part of the identifier.
*
* Here are some possible inputs and ^ indicates the position
* we want to find:
*
* separator: '.'
* "Foo.Bar"."Baz": UPDATE:
* ^
* separator: '.'
* "Foo Bar.Baz".hello: UPDATE:
* ^
* separator: ':'
* "Foo Bar.Baz": UPDATE:
* ^
*/
else if (message[pos] == separator && (quoteCount % 2) == 0)
{
*position = pos;
return true;
}
}

/*
* We should never reach the end of the message without finding the
* separator, as we are parsing a well-formed message from the test_decoding
* plugin.
*/
return false;
}


/*
* parseTestDecodingMessageHeader parses a raw test_decoding message to find
* the header information only. It stops after having parsed the target table
Expand All @@ -319,25 +396,37 @@ parseTestDecodingMessageHeader(TestDecodingHeader *header, const char *message)
* idp dot sep acp end
*/
char *idp = (char *) message + strlen("table ");
char *dot = strchr(idp, '.');
char *sep = strchr(idp, ':');
char *acp = sep + 2; /* skip ": " */
char *end = strchr(acp, ':');

/* skip the last ":" of the header in the offset */
header->offset = (end - message + 1) + 1;
int schemaEndPos = 0;
char schemaSeparator = '.';

bool quoted = *idp == '"';

if (quoted)
if (!findIdentifierEndPos(idp, schemaSeparator, &schemaEndPos))
{
char ident[BUFSIZE] = { 0 };
strlcpy(ident, idp, sep - idp);
log_error("Failed to parse schema name in test_decoding message: %s",
message);
return false;
}

char *dot = idp + schemaEndPos;

log_error("Failed to parse quoted qualified identifer %s", ident);
int tableEndPos = 0;
char tableSeparator = ':';

if (!findIdentifierEndPos(dot, tableSeparator, &tableEndPos))
{
log_error("Failed to parse table name in test_decoding message: %s",
message);
return false;
}

char *sep = dot + tableEndPos;

char *acp = sep + 2; /* skip ": " */
char *end = strchr(acp, ':');

/* skip the last ":" of the header in the offset */
header->offset = (end - message + 1) + 1;

/*
* The table schema.name is already escaped by the plugin using PostgreSQL's
* internal quote_identifier function (see
Expand Down Expand Up @@ -657,8 +746,42 @@ parseNextColumn(TestDecodingColumns *cols,
}

/* search for data type name separators (open/close, or A/B) */
char *typA = strchr(ptr, '[');
char *typB = typA != NULL ? strchr(typA, ']') : NULL;
int columnEndPos = 0;
char columnSeparator = '[';

/*
* Find the end of the column name, which is the first '[' character, which
* is also the start of the column type.
* e.g. "payment_id[integer]:23757"
* ^
*/
if (!findIdentifierEndPos(ptr, columnSeparator, &columnEndPos))
{
log_error("Failed to parse test_decoding column name in message: %s",
header->message);

return false;
}

char *typA = ptr + columnEndPos;

int typeEndPos = 0;
char typeSeparator = ']';

/*
* Find the end of the column type, which is the first ']' character.
* e.g. "payment_id[integer]:23757"
* ^
*/
if (!findIdentifierEndPos(typA, typeSeparator, &typeEndPos))
{
log_error("Failed to parse test_decoding column type in message: %s",
header->message);

return false;
}

char *typB = typA + typeEndPos;

/*
* Postgres array data types are spelled like: "text[]". In test_decoding
Expand Down
8 changes: 6 additions & 2 deletions tests/cdc-test-decoding/000000010000000000000002.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@
{"action":"I","xid":"0","lsn":"0/24DF6F0","timestamp":"2024-04-17 12:28:50.815952+0000","message":"table public.\"\"\"dqname\"\"\": INSERT: id[bigint]:1"}
{"action":"C","xid":"497","lsn":"0/24DF760","timestamp":"2024-04-17 12:28:50.815962+0000","message":"COMMIT 497"}
{"action":"B","xid":"498","lsn":"0/24DF760","timestamp":"2024-04-17 12:28:50.815979+0000","message":"BEGIN 498"}
{"action":"I","xid":"0","lsn":"0/24DF7C8","timestamp":"2024-04-17 12:28:50.816004+0000","message":"table public.identifer_as_column: INSERT: \"time\"[bigint]:1"}
{"action":"D","xid":"0","lsn":"0/24DF808","timestamp":"2024-04-17 12:28:50.816015+0000","message":"table public.identifer_as_column: DELETE: \"time\"[bigint]:1"}
{"action":"I","xid":"0","lsn":"0/24E33D8","timestamp":"2024-07-26 13:52:38.916787+0000","message":"table \"Foo\"\".Bar\".\":Identifer As \"\"Column\"\".$1:\": INSERT: \"time\"[bigint]:1 \"[column name]\"[text]:'foo'"}
{"action":"U","xid":"0","lsn":"0/24E34D0","timestamp":"2024-07-26 13:52:38.916806+0000","message":"table \"Foo\"\".Bar\".\":Identifer As \"\"Column\"\".$1:\": UPDATE: old-key: \"time\"[bigint]:1 \"[column name]\"[text]:'foo' new-tuple: \"time\"[bigint]:2 \"[column name]\"[text]:'[bar]'"}
{"action":"D","xid":"0","lsn":"0/24E3580","timestamp":"2024-07-26 13:52:38.916815+0000","message":"table \"Foo\"\".Bar\".\":Identifer As \"\"Column\"\".$1:\": DELETE: \"time\"[bigint]:2 \"[column name]\"[text]:'[bar]'"}
{"action":"I","xid":"0","lsn":"0/24EB1D8","timestamp":"2024-07-27 03:09:41.672406+0000","message":"table \"Unicode\"\"Test\".\"слон\": INSERT: id[bigint]:1 \"слон\"[\"[Status]\"]:'open' \"колонка\"[text]:'foo'"}
{"action":"U","xid":"0","lsn":"0/24EB2D0","timestamp":"2024-07-27 03:09:41.672433+0000","message":"table \"Unicode\"\"Test\".\"слон\": UPDATE: old-key: id[bigint]:1 \"слон\"[\"[Status]\"]:'open' new-tuple: id[bigint]:2 \"слон\"[\"[Status]\"]:'closed' \"колонка\"[text]:'[bar]'"}
{"action":"D","xid":"0","lsn":"0/24EB380","timestamp":"2024-07-27 03:09:41.672445+0000","message":"table \"Unicode\"\"Test\".\"слон\": DELETE: id[bigint]:2 \"слон\"[\"[Status]\"]:'closed'"}
{"action":"C","xid":"498","lsn":"0/24DF880","timestamp":"2024-04-17 12:28:50.816023+0000","message":"COMMIT 498"}
{"action":"B","xid":"499","lsn":"0/24DF880","timestamp":"2024-04-17 12:28:50.816038+0000","message":"BEGIN 499"}
{"action":"I","xid":"0","lsn":"0/24DF880","timestamp":"2024-04-17 12:28:50.816083+0000","message":"table public.t_bit_types: INSERT: id[integer]:2 a[bit]:B'100' b[bit varying]:B'101'"}
Expand Down
16 changes: 12 additions & 4 deletions tests/cdc-test-decoding/000000010000000000000002.sql
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,18 @@ PREPARE 4835081e AS INSERT INTO public."""dqname""" (id) overriding system value
EXECUTE 4835081e["1"];
COMMIT; -- {"xid":497,"lsn":"0/24DFFA8","timestamp":"2024-05-08 11:10:15.240285+0000"}
BEGIN; -- {"xid":498,"lsn":"0/24DFFA8","timestamp":"2024-05-08 11:10:15.240338+0000","commit_lsn":"0/24E00E0"}
PREPARE 7a201c42 AS INSERT INTO public.identifer_as_column ("time") overriding system value VALUES ($1);
EXECUTE 7a201c42["1"];
PREPARE df296f92 AS DELETE FROM public.identifer_as_column WHERE "time" = $1;
EXECUTE df296f92["1"];
PREPARE 6035915f AS INSERT INTO "Foo"".Bar".":Identifer As ""Column"".$1:" ("time", "[column name]") overriding system value VALUES ($1, $2);
EXECUTE 6035915f["1","foo"];
PREPARE b08f5996 AS UPDATE "Foo"".Bar".":Identifer As ""Column"".$1:" SET "time" = $1, "[column name]" = $2 WHERE "time" = $3 and "[column name]" = $4;
EXECUTE b08f5996["2","[bar]","1","foo"];
PREPARE 9a0b3ec6 AS DELETE FROM "Foo"".Bar".":Identifer As ""Column"".$1:" WHERE "time" = $1 and "[column name]" = $2;
EXECUTE 9a0b3ec6["2","[bar]"];
PREPARE 786445a1 AS INSERT INTO "Unicode""Test"."слон" (id, "слон", "колонка") overriding system value VALUES ($1, $2, $3);
EXECUTE 786445a1["1","open","foo"];
PREPARE ca780274 AS UPDATE "Unicode""Test"."слон" SET id = $1, "слон" = $2, "колонка" = $3 WHERE id = $4 and "слон" = $5;
EXECUTE ca780274["2","closed","[bar]","1","open"];
PREPARE a6b34a6f AS DELETE FROM "Unicode""Test"."слон" WHERE id = $1 and "слон" = $2;
EXECUTE a6b34a6f["2","closed"];
COMMIT; -- {"xid":498,"lsn":"0/24E00E0","timestamp":"2024-05-08 11:10:15.240338+0000"}
BEGIN; -- {"xid":499,"lsn":"0/24E00E0","timestamp":"2024-05-08 11:10:15.240400+0000","commit_lsn":"0/24E0198"}
PREPARE 15aec07e AS INSERT INTO public.t_bit_types (id, a, b) overriding system value VALUES ($1, $2, $3);
Expand Down
21 changes: 18 additions & 3 deletions tests/cdc-test-decoding/ddl.sql
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,26 @@ commit;

begin;

CREATE TABLE IF NOT EXISTS public.identifer_as_column
CREATE SCHEMA IF NOT EXISTS "Foo"".Bar";

CREATE TABLE IF NOT EXISTS "Foo"".Bar".":Identifer As ""Column"".$1:"
(
time bigserial,
"[column name]" text,
primary key (time, "[column name]")
);

CREATE SCHEMA IF NOT EXISTS "Unicode""Test";

CREATE TYPE "[Status]" AS ENUM ('new', 'open', 'closed');

CREATE TABLE IF NOT EXISTS "Unicode""Test".U&"\0441\043B\043E\043D"
(
time bigserial
id bigserial,
U&"!0441!043B!043E!043D" UESCAPE '!' "[Status]",
U&"!043A!043E!043B!043E!043D!043A!0430" UESCAPE '!' text,
primary key (id, U&"!0441!043B!043E!043D" UESCAPE '!')
);
alter table public.identifer_as_column replica identity full;

commit;

Expand Down
11 changes: 8 additions & 3 deletions tests/cdc-test-decoding/dml.sql
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,17 @@ commit;
--
begin;

insert into public.identifer_as_column default values;
insert into "Foo"".Bar".":Identifer As ""Column"".$1:" ("time", "[column name]") values (1, 'foo');

update public.identifer_as_column set "time" = 1 where "time" = 0;
update "Foo"".Bar".":Identifer As ""Column"".$1:" set "time" = 2, "[column name]" = '[bar]' where "time" = 1;

delete from public.identifer_as_column where "time" = 1;
delete from "Foo"".Bar".":Identifer As ""Column"".$1:" where "time" = 2;

insert into "Unicode""Test".U&"\0441\043B\043E\043D" (id, U&"!0441!043B!043E!043D" UESCAPE '!', U&"!043A!043E!043B!043E!043D!043A!0430" UESCAPE '!') values (1, 'open', 'foo');

update "Unicode""Test".U&"\0441\043B\043E\043D" set id = 2, U&"!0441!043B!043E!043D" UESCAPE '!' = 'closed', U&"\043A\043E\043B\043E\043D\043A\0430" = '[bar]' where id = 1;

delete from "Unicode""Test".U&"\0441\043B\043E\043D" where id = 2;
commit;

--
Expand Down

0 comments on commit 30d7bcd

Please sign in to comment.