diff --git a/src/bin/pgcopydb/ld_test_decoding.c b/src/bin/pgcopydb/ld_test_decoding.c index 593927903..06822d1ef 100644 --- a/src/bin/pgcopydb/ld_test_decoding.c +++ b/src/bin/pgcopydb/ld_test_decoding.c @@ -83,6 +83,8 @@ static bool prepareUpdateTuppleArrays(StreamContext *privateContext, TestDecodingHeader *header); +static bool findIdentifierEndPos(const char *message, char separator, int *position); + /* * prepareWal2jsonMessage prepares our internal JSON entry from a test_decoding * message. At this stage we only escape the message as a proper JSON string. @@ -300,6 +302,81 @@ parseTestDecodingMessage(StreamContext *privateContext, } +/* + * findIdentifierEndPos returns the first position of the separator in the given + * message after navigating through all the quotes. It assumes that the message + * contains a well-formed identifier and doesn't attempt to handle errors while + * parsing the message. + * + * This should always find an identifier end position, as we are parsing a + * well-formed message from the test_decoding plugin. + */ +static bool +findIdentifierEndPos(const char *message, char separator, int *position) +{ + if (message == NULL || position == NULL) + { + log_error("findIdentifierEndPos: message or position cannot be NULL"); + return false; + } + + /* + * The separator cannot be a quote, as we are looking for the first + * separator outside the quotes. + */ + if (separator == '"') + { + log_error("findIdentifierEndPos: separator cannot be a quote"); + return false; + } + + int quoteCount = 0; + + int pos = 0; + + for (pos = 0; message[pos] != '\0'; pos++) + { + if (message[pos] == '"') + { + quoteCount++; + } + + /* + * We are looking for the first 'separator' in the message which + * should be outside the quotes. + * + * When there is 'separator' inside quotes, quoteCount will be odd, + * and we need to account it as a part of the identifier. + * + * Here are some possible inputs and ^ indicates the position + * we want to find: + * + * separator: '.' + * "Foo.Bar"."Baz": UPDATE: + * ^ + * separator: '.' + * "Foo Bar.Baz".hello: UPDATE: + * ^ + * separator: ':' + * "Foo Bar.Baz": UPDATE: + * ^ + */ + else if (message[pos] == separator && (quoteCount % 2) == 0) + { + *position = pos; + return true; + } + } + + /* + * We should never reach the end of the message without finding the + * separator, as we are parsing a well-formed message from the test_decoding + * plugin. + */ + return false; +} + + /* * parseTestDecodingMessageHeader parses a raw test_decoding message to find * the header information only. It stops after having parsed the target table @@ -319,25 +396,37 @@ parseTestDecodingMessageHeader(TestDecodingHeader *header, const char *message) * idp dot sep acp end */ char *idp = (char *) message + strlen("table "); - char *dot = strchr(idp, '.'); - char *sep = strchr(idp, ':'); - char *acp = sep + 2; /* skip ": " */ - char *end = strchr(acp, ':'); - /* skip the last ":" of the header in the offset */ - header->offset = (end - message + 1) + 1; + int schemaEndPos = 0; + char schemaSeparator = '.'; - bool quoted = *idp == '"'; - - if (quoted) + if (!findIdentifierEndPos(idp, schemaSeparator, &schemaEndPos)) { - char ident[BUFSIZE] = { 0 }; - strlcpy(ident, idp, sep - idp); + log_error("Failed to parse schema name in test_decoding message: %s", + message); + return false; + } + + char *dot = idp + schemaEndPos; - log_error("Failed to parse quoted qualified identifer %s", ident); + int tableEndPos = 0; + char tableSeparator = ':'; + + if (!findIdentifierEndPos(dot, tableSeparator, &tableEndPos)) + { + log_error("Failed to parse table name in test_decoding message: %s", + message); return false; } + char *sep = dot + tableEndPos; + + char *acp = sep + 2; /* skip ": " */ + char *end = strchr(acp, ':'); + + /* skip the last ":" of the header in the offset */ + header->offset = (end - message + 1) + 1; + /* * The table schema.name is already escaped by the plugin using PostgreSQL's * internal quote_identifier function (see @@ -657,8 +746,42 @@ parseNextColumn(TestDecodingColumns *cols, } /* search for data type name separators (open/close, or A/B) */ - char *typA = strchr(ptr, '['); - char *typB = typA != NULL ? strchr(typA, ']') : NULL; + int columnEndPos = 0; + char columnSeparator = '['; + + /* + * Find the end of the column name, which is the first '[' character, which + * is also the start of the column type. + * e.g. "payment_id[integer]:23757" + * ^ + */ + if (!findIdentifierEndPos(ptr, columnSeparator, &columnEndPos)) + { + log_error("Failed to parse test_decoding column name in message: %s", + header->message); + + return false; + } + + char *typA = ptr + columnEndPos; + + int typeEndPos = 0; + char typeSeparator = ']'; + + /* + * Find the end of the column type, which is the first ']' character. + * e.g. "payment_id[integer]:23757" + * ^ + */ + if (!findIdentifierEndPos(typA, typeSeparator, &typeEndPos)) + { + log_error("Failed to parse test_decoding column type in message: %s", + header->message); + + return false; + } + + char *typB = typA + typeEndPos; /* * Postgres array data types are spelled like: "text[]". In test_decoding diff --git a/tests/cdc-test-decoding/000000010000000000000002.json b/tests/cdc-test-decoding/000000010000000000000002.json index a47d68e2e..b96c7e367 100644 --- a/tests/cdc-test-decoding/000000010000000000000002.json +++ b/tests/cdc-test-decoding/000000010000000000000002.json @@ -37,8 +37,12 @@ {"action":"I","xid":"0","lsn":"0/24DF6F0","timestamp":"2024-04-17 12:28:50.815952+0000","message":"table public.\"\"\"dqname\"\"\": INSERT: id[bigint]:1"} {"action":"C","xid":"497","lsn":"0/24DF760","timestamp":"2024-04-17 12:28:50.815962+0000","message":"COMMIT 497"} {"action":"B","xid":"498","lsn":"0/24DF760","timestamp":"2024-04-17 12:28:50.815979+0000","message":"BEGIN 498"} -{"action":"I","xid":"0","lsn":"0/24DF7C8","timestamp":"2024-04-17 12:28:50.816004+0000","message":"table public.identifer_as_column: INSERT: \"time\"[bigint]:1"} -{"action":"D","xid":"0","lsn":"0/24DF808","timestamp":"2024-04-17 12:28:50.816015+0000","message":"table public.identifer_as_column: DELETE: \"time\"[bigint]:1"} +{"action":"I","xid":"0","lsn":"0/24E33D8","timestamp":"2024-07-26 13:52:38.916787+0000","message":"table \"Foo\"\".Bar\".\":Identifer As \"\"Column\"\".$1:\": INSERT: \"time\"[bigint]:1 \"[column name]\"[text]:'foo'"} +{"action":"U","xid":"0","lsn":"0/24E34D0","timestamp":"2024-07-26 13:52:38.916806+0000","message":"table \"Foo\"\".Bar\".\":Identifer As \"\"Column\"\".$1:\": UPDATE: old-key: \"time\"[bigint]:1 \"[column name]\"[text]:'foo' new-tuple: \"time\"[bigint]:2 \"[column name]\"[text]:'[bar]'"} +{"action":"D","xid":"0","lsn":"0/24E3580","timestamp":"2024-07-26 13:52:38.916815+0000","message":"table \"Foo\"\".Bar\".\":Identifer As \"\"Column\"\".$1:\": DELETE: \"time\"[bigint]:2 \"[column name]\"[text]:'[bar]'"} +{"action":"I","xid":"0","lsn":"0/24EB1D8","timestamp":"2024-07-27 03:09:41.672406+0000","message":"table \"Unicode\"\"Test\".\"слон\": INSERT: id[bigint]:1 \"слон\"[\"[Status]\"]:'open' \"колонка\"[text]:'foo'"} +{"action":"U","xid":"0","lsn":"0/24EB2D0","timestamp":"2024-07-27 03:09:41.672433+0000","message":"table \"Unicode\"\"Test\".\"слон\": UPDATE: old-key: id[bigint]:1 \"слон\"[\"[Status]\"]:'open' new-tuple: id[bigint]:2 \"слон\"[\"[Status]\"]:'closed' \"колонка\"[text]:'[bar]'"} +{"action":"D","xid":"0","lsn":"0/24EB380","timestamp":"2024-07-27 03:09:41.672445+0000","message":"table \"Unicode\"\"Test\".\"слон\": DELETE: id[bigint]:2 \"слон\"[\"[Status]\"]:'closed'"} {"action":"C","xid":"498","lsn":"0/24DF880","timestamp":"2024-04-17 12:28:50.816023+0000","message":"COMMIT 498"} {"action":"B","xid":"499","lsn":"0/24DF880","timestamp":"2024-04-17 12:28:50.816038+0000","message":"BEGIN 499"} {"action":"I","xid":"0","lsn":"0/24DF880","timestamp":"2024-04-17 12:28:50.816083+0000","message":"table public.t_bit_types: INSERT: id[integer]:2 a[bit]:B'100' b[bit varying]:B'101'"} diff --git a/tests/cdc-test-decoding/000000010000000000000002.sql b/tests/cdc-test-decoding/000000010000000000000002.sql index 57c512b09..a1b4892c1 100644 --- a/tests/cdc-test-decoding/000000010000000000000002.sql +++ b/tests/cdc-test-decoding/000000010000000000000002.sql @@ -63,10 +63,18 @@ PREPARE 4835081e AS INSERT INTO public."""dqname""" (id) overriding system value EXECUTE 4835081e["1"]; COMMIT; -- {"xid":497,"lsn":"0/24DFFA8","timestamp":"2024-05-08 11:10:15.240285+0000"} BEGIN; -- {"xid":498,"lsn":"0/24DFFA8","timestamp":"2024-05-08 11:10:15.240338+0000","commit_lsn":"0/24E00E0"} -PREPARE 7a201c42 AS INSERT INTO public.identifer_as_column ("time") overriding system value VALUES ($1); -EXECUTE 7a201c42["1"]; -PREPARE df296f92 AS DELETE FROM public.identifer_as_column WHERE "time" = $1; -EXECUTE df296f92["1"]; +PREPARE 6035915f AS INSERT INTO "Foo"".Bar".":Identifer As ""Column"".$1:" ("time", "[column name]") overriding system value VALUES ($1, $2); +EXECUTE 6035915f["1","foo"]; +PREPARE b08f5996 AS UPDATE "Foo"".Bar".":Identifer As ""Column"".$1:" SET "time" = $1, "[column name]" = $2 WHERE "time" = $3 and "[column name]" = $4; +EXECUTE b08f5996["2","[bar]","1","foo"]; +PREPARE 9a0b3ec6 AS DELETE FROM "Foo"".Bar".":Identifer As ""Column"".$1:" WHERE "time" = $1 and "[column name]" = $2; +EXECUTE 9a0b3ec6["2","[bar]"]; +PREPARE 786445a1 AS INSERT INTO "Unicode""Test"."слон" (id, "слон", "колонка") overriding system value VALUES ($1, $2, $3); +EXECUTE 786445a1["1","open","foo"]; +PREPARE ca780274 AS UPDATE "Unicode""Test"."слон" SET id = $1, "слон" = $2, "колонка" = $3 WHERE id = $4 and "слон" = $5; +EXECUTE ca780274["2","closed","[bar]","1","open"]; +PREPARE a6b34a6f AS DELETE FROM "Unicode""Test"."слон" WHERE id = $1 and "слон" = $2; +EXECUTE a6b34a6f["2","closed"]; COMMIT; -- {"xid":498,"lsn":"0/24E00E0","timestamp":"2024-05-08 11:10:15.240338+0000"} BEGIN; -- {"xid":499,"lsn":"0/24E00E0","timestamp":"2024-05-08 11:10:15.240400+0000","commit_lsn":"0/24E0198"} PREPARE 15aec07e AS INSERT INTO public.t_bit_types (id, a, b) overriding system value VALUES ($1, $2, $3); diff --git a/tests/cdc-test-decoding/ddl.sql b/tests/cdc-test-decoding/ddl.sql index d3af3880f..4400fda8f 100644 --- a/tests/cdc-test-decoding/ddl.sql +++ b/tests/cdc-test-decoding/ddl.sql @@ -25,11 +25,26 @@ commit; begin; -CREATE TABLE IF NOT EXISTS public.identifer_as_column +CREATE SCHEMA IF NOT EXISTS "Foo"".Bar"; + +CREATE TABLE IF NOT EXISTS "Foo"".Bar".":Identifer As ""Column"".$1:" +( + time bigserial, + "[column name]" text, + primary key (time, "[column name]") +); + +CREATE SCHEMA IF NOT EXISTS "Unicode""Test"; + +CREATE TYPE "[Status]" AS ENUM ('new', 'open', 'closed'); + +CREATE TABLE IF NOT EXISTS "Unicode""Test".U&"\0441\043B\043E\043D" ( - time bigserial + id bigserial, + U&"!0441!043B!043E!043D" UESCAPE '!' "[Status]", + U&"!043A!043E!043B!043E!043D!043A!0430" UESCAPE '!' text, + primary key (id, U&"!0441!043B!043E!043D" UESCAPE '!') ); -alter table public.identifer_as_column replica identity full; commit; diff --git a/tests/cdc-test-decoding/dml.sql b/tests/cdc-test-decoding/dml.sql index 68a7df24f..65ca8ed18 100644 --- a/tests/cdc-test-decoding/dml.sql +++ b/tests/cdc-test-decoding/dml.sql @@ -74,12 +74,17 @@ commit; -- begin; -insert into public.identifer_as_column default values; +insert into "Foo"".Bar".":Identifer As ""Column"".$1:" ("time", "[column name]") values (1, 'foo'); -update public.identifer_as_column set "time" = 1 where "time" = 0; +update "Foo"".Bar".":Identifer As ""Column"".$1:" set "time" = 2, "[column name]" = '[bar]' where "time" = 1; -delete from public.identifer_as_column where "time" = 1; +delete from "Foo"".Bar".":Identifer As ""Column"".$1:" where "time" = 2; +insert into "Unicode""Test".U&"\0441\043B\043E\043D" (id, U&"!0441!043B!043E!043D" UESCAPE '!', U&"!043A!043E!043B!043E!043D!043A!0430" UESCAPE '!') values (1, 'open', 'foo'); + +update "Unicode""Test".U&"\0441\043B\043E\043D" set id = 2, U&"!0441!043B!043E!043D" UESCAPE '!' = 'closed', U&"\043A\043E\043B\043E\043D\043A\0430" = '[bar]' where id = 1; + +delete from "Unicode""Test".U&"\0441\043B\043E\043D" where id = 2; commit; --